package com.ld.igds.protocol.es.dlt645.client; import com.ld.io.api.InvokeResult; import io.netty.bootstrap.Bootstrap; import io.netty.buffer.Unpooled; import io.netty.channel.*; import io.netty.channel.oio.OioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.oio.OioSocketChannel; import io.netty.handler.codec.bytes.ByteArrayDecoder; import io.netty.handler.codec.bytes.ByteArrayEncoder; import lombok.extern.slf4j.Slf4j; /** * DL/T 645-2007多功能电表 * * @author czt */ @Slf4j public class Dlt645ClientEngine implements Runnable { private String host; private int port; public Channel channel; public Dlt645ClientEngine(String host, int port) { this.host = host; this.port = port; } public void start() { Thread thread = new Thread(this); thread.start(); } @Override public void run() { try { startRun(); } catch (Exception e) { e.printStackTrace(); } } public void startRun() throws Exception { EventLoopGroup group = new OioEventLoopGroup(); Bootstrap b = new Bootstrap(); // b.option(ChannelOption.SO_KEEPALIVE, true); b.group(group).channel(OioSocketChannel.class) .handler(new ChannelInitializer() { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline p = ch.pipeline(); // 字符串解码 和 编码 p.addLast("decoder", new ByteArrayDecoder()); p.addLast("encoder", new ByteArrayEncoder()); // 自己的逻辑Handler p.addLast("handler", new ClientHandler()); } }); // 发起异步连接请求,绑定连接端口和host信息 ChannelFuture channelFuture = b.connect(host, port); this.channel = channelFuture.channel(); // channelFuture.channel().closeFuture().sync(); channelFuture.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture arg0) throws Exception { if (channelFuture.isSuccess()) { log.info("-----连接电表服务器成功,IP={},PORT={}", host, port); } else { log.info("-----连接电表服务器失败,IP={},PORT={}", host, port); channelFuture.cause().printStackTrace(); group.shutdownGracefully(); // 关闭线程组 } } }); } public InvokeResult send(byte[] array) throws InterruptedException { if (null == channel) { return InvokeResult.SOCKET_NOT_CREATE; } if (!channel.isActive()) { return InvokeResult.CHANNEL_CLOSED; } channel.writeAndFlush(Unpooled.copiedBuffer(array)).sync(); return InvokeResult.SUCCESS; } public Channel getChannel() { return channel; } }