package com.ld.io.netty;
|
|
import com.ld.io.api.*;
|
|
import io.netty.bootstrap.ServerBootstrap;
|
import io.netty.channel.*;
|
import io.netty.channel.nio.NioEventLoopGroup;
|
import io.netty.channel.socket.SocketChannel;
|
import io.netty.channel.socket.nio.NioServerSocketChannel;
|
import io.netty.handler.codec.ByteToMessageDecoder;
|
import io.netty.handler.codec.bytes.ByteArrayDecoder;
|
import io.netty.handler.codec.bytes.ByteArrayEncoder;
|
import io.netty.handler.logging.LogLevel;
|
import io.netty.handler.logging.LoggingHandler;
|
import io.netty.handler.timeout.IdleStateHandler;
|
|
public class NettyServer implements Runnable, IoServer {
|
private EventLoopGroup bossGroup = new NioEventLoopGroup(1);
|
private EventLoopGroup workerGroup = new NioEventLoopGroup();
|
|
private IoServerOption ioServerOption;
|
private NettySessionFactory nettySessionFactory;
|
private ReadMessageHandler readMessageHandler;
|
private SplitDecoderFactory splitDecoderFactory;
|
|
public NettyServer(IoServerOption ioServerOption,
|
IoMsgConsumer ioMsgConsumer,
|
IoSessionListener sessionListener) {
|
this(ioServerOption, ioMsgConsumer, sessionListener, null);
|
}
|
|
public NettyServer(IoServerOption ioServerOption,
|
IoMsgConsumer ioMsgConsumer,
|
IoSessionListener sessionListener,
|
HeartbeatProvider heartbeatMessageProvider) {
|
this.splitDecoderFactory = new SplitDecoderFactory();
|
this.nettySessionFactory = new NettySessionFactory(sessionListener);
|
this.ioServerOption = ioServerOption;
|
|
this.readMessageHandler = new ReadMessageHandler(ioMsgConsumer, nettySessionFactory, heartbeatMessageProvider);
|
}
|
|
public void startup() {
|
Thread thread = new Thread(this);
|
thread.start();
|
}
|
|
public void shutdown() {
|
workerGroup.shutdownGracefully();
|
bossGroup.shutdownGracefully();
|
}
|
|
@Override
|
public void run() {
|
try {
|
ServerBootstrap b = new ServerBootstrap();
|
b.group(bossGroup, workerGroup)
|
.channel(NioServerSocketChannel.class)
|
.handler(new LoggingHandler(LogLevel.INFO))
|
.option(ChannelOption.SO_KEEPALIVE, true)
|
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, ioServerOption.getConnectTimeoutMillis())
|
.childHandler(new ChannelInitializer<SocketChannel>() {
|
@Override
|
public void initChannel(SocketChannel ch) {
|
ChannelPipeline p = ch.pipeline();
|
ByteToMessageDecoder splitDecoder = splitDecoderFactory.build(ioServerOption);
|
if(splitDecoder != null)
|
p.addLast("splitDecoder", splitDecoder);
|
p.addLast("idleStateHandler", new IdleStateHandler(ioServerOption.getReaderIdleTime(), 0, 0));
|
p.addLast("byteArrayDecoder", new ByteArrayDecoder());//字符串解码 和 编码
|
p.addLast("byteArrayEncoder", new ByteArrayEncoder());
|
p.addLast("readMessageHandler", readMessageHandler);//读取信息
|
}
|
});
|
|
// Bind and start to accept incoming connections.
|
ChannelFuture channelFuture = b.bind(ioServerOption.getPort()).sync();
|
channelFuture.channel().closeFuture().sync();
|
} catch (InterruptedException e) {
|
e.printStackTrace();
|
} finally {
|
workerGroup.shutdownGracefully();
|
bossGroup.shutdownGracefully();
|
}
|
}
|
|
public IoSessionQuery getSessionQuery() {
|
return nettySessionFactory;
|
}
|
}
|