package com.ld.io.netty; import com.ld.io.api.HeartbeatProvider; import com.ld.io.api.IoMsgConsumer; import com.ld.io.api.IoSession; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.Channel; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandler.Sharable; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.handler.timeout.IdleStateEvent; import io.netty.util.internal.logging.InternalLogger; import io.netty.util.internal.logging.InternalLoggerFactory; import java.io.IOException; @Sharable public class ReadMessageHandler extends SimpleChannelInboundHandler { private final InternalLogger logger = InternalLoggerFactory.getInstance(this.getClass()); private ReceiveMessageThreadPool threadPool;//消息处理线程池 private NettySessionFactory nettySessionFactory;//会话工厂 private HeartbeatProvider heartbeatMessageProvider;//心跳数据提供者 public ReadMessageHandler(IoMsgConsumer messageConsumer, NettySessionFactory nettySessionFactory, HeartbeatProvider heartbeatMessageProvider) { this.threadPool = new ReceiveMessageThreadPool(messageConsumer); this.nettySessionFactory = nettySessionFactory; this.heartbeatMessageProvider = heartbeatMessageProvider; } @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { nettySessionFactory.create(new NettyChannel(ctx)); super.channelActive(ctx); } @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { nettySessionFactory.destroy(ctx); super.channelInactive(ctx); } @Override public void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception { IoSession session = nettySessionFactory.getSession(ctx); threadPool.execute(session, (byte[]) msg); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { logger.error("happened error!", cause); Channel channel = ctx.channel(); if (cause instanceof IOException && channel.isActive()) { nettySessionFactory.destroy(ctx); ctx.close(); } } /** * 心跳处理 * 超时后尝试向客户端发送消息,若发送失败关闭channel */ @Override public void userEventTriggered(final ChannelHandlerContext ctx, Object evt) throws Exception { if (evt instanceof IdleStateEvent) { if (heartbeatMessageProvider == null) { return; } logger.info("Health check time out, the event was triggered"); IoSession session = nettySessionFactory.getSession(ctx); if(null == session){ logger.error("no Session by HeartBeat"); return; } byte[] bytes = heartbeatMessageProvider.provide(session); if(null == bytes){ logger.error("no msg by HeartBeat"); return; } ByteBuf buf = Unpooled.copiedBuffer(bytes); ctx.writeAndFlush(buf).addListener((ChannelFutureListener) future -> { if (!future.isSuccess()) { nettySessionFactory.destroy(ctx); future.channel().close(); } }); } else { super.userEventTriggered(ctx, evt); } } }