package com.ld.io.netty; import com.ld.io.api.*; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.codec.ByteToMessageDecoder; import io.netty.handler.codec.bytes.ByteArrayDecoder; import io.netty.handler.codec.bytes.ByteArrayEncoder; import io.netty.handler.logging.LogLevel; import io.netty.handler.logging.LoggingHandler; import io.netty.handler.timeout.IdleStateHandler; public class NettyServer implements Runnable, IoServer { private EventLoopGroup bossGroup = new NioEventLoopGroup(1); private EventLoopGroup workerGroup = new NioEventLoopGroup(); private IoServerOption ioServerOption; private NettySessionFactory nettySessionFactory; private ReadMessageHandler readMessageHandler; private SplitDecoderFactory splitDecoderFactory; public NettyServer(IoServerOption ioServerOption, IoMsgConsumer ioMsgConsumer, IoSessionListener sessionListener) { this(ioServerOption, ioMsgConsumer, sessionListener, null); } public NettyServer(IoServerOption ioServerOption, IoMsgConsumer ioMsgConsumer, IoSessionListener sessionListener, HeartbeatProvider heartbeatMessageProvider) { this.splitDecoderFactory = new SplitDecoderFactory(); this.nettySessionFactory = new NettySessionFactory(sessionListener); this.ioServerOption = ioServerOption; this.readMessageHandler = new ReadMessageHandler(ioMsgConsumer, nettySessionFactory, heartbeatMessageProvider); } public void startup() { Thread thread = new Thread(this); thread.start(); } public void shutdown() { workerGroup.shutdownGracefully(); bossGroup.shutdownGracefully(); } @Override public void run() { try { ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .handler(new LoggingHandler(LogLevel.INFO)) .option(ChannelOption.SO_KEEPALIVE, true) .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, ioServerOption.getConnectTimeoutMillis()) .childHandler(new ChannelInitializer() { @Override public void initChannel(SocketChannel ch) { ChannelPipeline p = ch.pipeline(); ByteToMessageDecoder splitDecoder = splitDecoderFactory.build(ioServerOption); if(splitDecoder != null) p.addLast("splitDecoder", splitDecoder); p.addLast("idleStateHandler", new IdleStateHandler(ioServerOption.getReaderIdleTime(), 0, 0)); p.addLast("byteArrayDecoder", new ByteArrayDecoder());//字符串解码 和 编码 p.addLast("byteArrayEncoder", new ByteArrayEncoder()); p.addLast("readMessageHandler", readMessageHandler);//读取信息 } }); // Bind and start to accept incoming connections. ChannelFuture channelFuture = b.bind(ioServerOption.getPort()).sync(); channelFuture.channel().closeFuture().sync(); } catch (InterruptedException e) { e.printStackTrace(); } finally { workerGroup.shutdownGracefully(); bossGroup.shutdownGracefully(); } } public IoSessionQuery getSessionQuery() { return nettySessionFactory; } }