package com.ld.io.netty;
|
|
import com.ld.io.api.IoMsgConsumer;
|
import com.ld.io.api.IoSession;
|
|
import io.netty.util.concurrent.DefaultThreadFactory;
|
|
import org.slf4j.Logger;
|
import org.slf4j.LoggerFactory;
|
|
import java.util.concurrent.*;
|
|
/**
|
* 线程池 处理接收到的信息, 不占用netty的工作线程
|
*/
|
class ReceiveMessageThreadPool {
|
private static final String POOL_NAME = "netty-receive-message";
|
private final Logger logger = LoggerFactory.getLogger(getClass());
|
private static ExecutorService executorService;
|
private IoMsgConsumer messageConsumer;
|
|
public ReceiveMessageThreadPool(IoMsgConsumer messageConsumer) {
|
this.messageConsumer = messageConsumer;
|
}
|
|
static {
|
ThreadFactory threadFactory = new DefaultThreadFactory(POOL_NAME);
|
executorService = new ThreadPoolExecutor(100, 100, 0L,
|
TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(500),
|
threadFactory, new ThreadPoolExecutor.DiscardOldestPolicy());
|
}
|
|
void execute(IoSession session, byte[] bytes) {
|
executorService
|
.submit(() -> {
|
if (messageConsumer != null) {
|
String msg;
|
try {
|
//msg = new String(bytes, "UTF-8");
|
//logger.info("临时打印查看报文:"+ msg);
|
messageConsumer.consume(session, bytes);
|
} catch (Exception e) {
|
try {
|
msg = new String(bytes, "UTF-8");
|
} catch (Exception e1) {
|
msg = "转换为字符串异常";
|
}
|
logger.error(
|
"Consume message happened error, business key="
|
+ session.getBusinessKey()
|
+ ", msg=" + msg, e);
|
}
|
} else {
|
logger.error("Message consumer not config! All request was ignored!");
|
}
|
});
|
}
|
}
|