package com.ld.io.netty;
|
|
import com.ld.io.api.HeartbeatProvider;
|
import com.ld.io.api.IoMsgConsumer;
|
import com.ld.io.api.IoSession;
|
import io.netty.buffer.ByteBuf;
|
import io.netty.buffer.Unpooled;
|
import io.netty.channel.Channel;
|
import io.netty.channel.ChannelFutureListener;
|
import io.netty.channel.ChannelHandler.Sharable;
|
import io.netty.channel.ChannelHandlerContext;
|
import io.netty.channel.SimpleChannelInboundHandler;
|
import io.netty.handler.timeout.IdleStateEvent;
|
import io.netty.util.internal.logging.InternalLogger;
|
import io.netty.util.internal.logging.InternalLoggerFactory;
|
|
import java.io.IOException;
|
|
@Sharable
|
public class ReadMessageHandler extends SimpleChannelInboundHandler<Object> {
|
private final InternalLogger logger = InternalLoggerFactory.getInstance(this.getClass());
|
private ReceiveMessageThreadPool threadPool;//消息处理线程池
|
private NettySessionFactory nettySessionFactory;//会话工厂
|
private HeartbeatProvider heartbeatMessageProvider;//心跳数据提供者
|
|
public ReadMessageHandler(IoMsgConsumer messageConsumer, NettySessionFactory nettySessionFactory, HeartbeatProvider heartbeatMessageProvider) {
|
this.threadPool = new ReceiveMessageThreadPool(messageConsumer);
|
this.nettySessionFactory = nettySessionFactory;
|
this.heartbeatMessageProvider = heartbeatMessageProvider;
|
}
|
|
@Override
|
public void channelActive(ChannelHandlerContext ctx) throws Exception {
|
nettySessionFactory.create(new NettyChannel(ctx));
|
super.channelActive(ctx);
|
}
|
|
@Override
|
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
|
nettySessionFactory.destroy(ctx);
|
super.channelInactive(ctx);
|
}
|
|
@Override
|
public void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
|
IoSession session = nettySessionFactory.getSession(ctx);
|
threadPool.execute(session, (byte[]) msg);
|
}
|
|
@Override
|
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
|
logger.error("happened error!", cause);
|
Channel channel = ctx.channel();
|
if (cause instanceof IOException && channel.isActive()) {
|
nettySessionFactory.destroy(ctx);
|
ctx.close();
|
}
|
}
|
|
/**
|
* 心跳处理
|
* 超时后尝试向客户端发送消息,若发送失败关闭channel
|
*/
|
@Override
|
public void userEventTriggered(final ChannelHandlerContext ctx, Object evt) throws Exception {
|
if (evt instanceof IdleStateEvent) {
|
if (heartbeatMessageProvider == null) {
|
return;
|
}
|
|
logger.info("Health check time out, the event was triggered");
|
IoSession session = nettySessionFactory.getSession(ctx);
|
if(null == session){
|
logger.error("no Session by HeartBeat");
|
return;
|
}
|
byte[] bytes = heartbeatMessageProvider.provide(session);
|
|
if(null == bytes){
|
logger.error("no msg by HeartBeat");
|
return;
|
}
|
|
ByteBuf buf = Unpooled.copiedBuffer(bytes);
|
ctx.writeAndFlush(buf).addListener((ChannelFutureListener) future -> {
|
if (!future.isSuccess()) {
|
nettySessionFactory.destroy(ctx);
|
future.channel().close();
|
}
|
});
|
} else {
|
super.userEventTriggered(ctx, evt);
|
}
|
}
|
}
|