package com.ld.io.netty; import com.ld.io.api.IoMsgConsumer; import com.ld.io.api.IoSession; import io.netty.util.concurrent.DefaultThreadFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.concurrent.*; /** * 线程池 处理接收到的信息, 不占用netty的工作线程 */ class ReceiveMessageThreadPool { private static final String POOL_NAME = "netty-receive-message"; private final Logger logger = LoggerFactory.getLogger(getClass()); private static ExecutorService executorService; private IoMsgConsumer messageConsumer; public ReceiveMessageThreadPool(IoMsgConsumer messageConsumer) { this.messageConsumer = messageConsumer; } static { ThreadFactory threadFactory = new DefaultThreadFactory(POOL_NAME); executorService = new ThreadPoolExecutor(100, 100, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue(500), threadFactory, new ThreadPoolExecutor.DiscardOldestPolicy()); } void execute(IoSession session, byte[] bytes) { executorService .submit(() -> { if (messageConsumer != null) { String msg; try { //msg = new String(bytes, "UTF-8"); //logger.info("临时打印查看报文:"+ msg); messageConsumer.consume(session, bytes); } catch (Exception e) { try { msg = new String(bytes, "UTF-8"); } catch (Exception e1) { msg = "转换为字符串异常"; } logger.error( "Consume message happened error, business key=" + session.getBusinessKey() + ", msg=" + msg, e); } } else { logger.error("Message consumer not config! All request was ignored!"); } }); } }