jiazx0107@163.com
2024-01-05 7fe5e6110fb2516a81c2258e8d29fc19ee781d76
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
package com.fzzy.protocol.youxian1.client;
 
import com.ld.io.api.InvokeResult;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.oio.OioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.oio.OioSocketChannel;
import io.netty.handler.codec.bytes.ByteArrayDecoder;
import io.netty.handler.codec.bytes.ByteArrayEncoder;
import lombok.extern.slf4j.Slf4j;
 
import java.util.HashMap;
import java.util.Map;
 
/**
 * 游仙分库,通讯协议
 */
@Slf4j
public class ClientEngine {
 
    /**
     * 客户端模式创建的连接通道
     */
    public static Map<String, Channel> clientChannelMap = new HashMap<>();
 
 
    public static Channel defaultChannel;
 
    public static void start(String ip, Integer port) throws Exception {
        if (defaultChannel != null) {
            log.info("-----IP={},连接存在,直接使用", ip);
            return;
        }
        EventLoopGroup group = new OioEventLoopGroup();
        Bootstrap b = new Bootstrap();
        //默认长连接
        b.option(ChannelOption.SO_KEEPALIVE, true);
        b.group(group).channel(OioSocketChannel.class)
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch)
                            throws Exception {
                        ChannelPipeline p = ch.pipeline();
                        // 字符串解码 和 编码
                        p.addLast("decoder", new ByteArrayDecoder());
                        p.addLast("encoder", new ByteArrayEncoder());
                        // 自己的逻辑Handler
                        p.addLast("handler", new ClientHandler());
                    }
                });
 
        // 发起异步连接请求,绑定连接端口和host信息
        ChannelFuture channelFuture = b.connect(ip, port);
 
        defaultChannel = channelFuture.channel();
 
        // channelFuture.channel().closeFuture().sync();
 
        add2ChannelMap(ip, defaultChannel);
 
        channelFuture.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture arg0) throws Exception {
                if (channelFuture.isSuccess()) {
                    log.info("-----IP={},连接成功", ip);
                } else {
                    log.info("-----IP={},连接失败,自动关闭线程", ip);
                    channelFuture.cause().printStackTrace();
                    group.shutdownGracefully(); // 关闭线程组
                }
            }
        });
    }
 
    public static InvokeResult send(byte[] array) throws InterruptedException {
 
        if (null == defaultChannel) {
            return InvokeResult.SOCKET_NOT_CREATE;
        }
        if (!defaultChannel.isActive()) {
            return InvokeResult.CHANNEL_CLOSED;
        }
        defaultChannel.writeAndFlush(Unpooled.copiedBuffer(array)).sync();
        return InvokeResult.SUCCESS;
    }
 
 
    public static Channel getChannel() {
        return defaultChannel;
    }
 
    public static void add2ChannelMap(String key, Channel channel) {
        clientChannelMap.put(key, channel);
    }
 
    /**
     * 如果
     *
     * @param key
     * @return
     */
    public static Channel getChannel(String key) {
        Channel channel = clientChannelMap.get(key);
        if (null == channel) return null;
        if (channel.isActive()) {
            return channel;
        } else {
            channel.close();
        }
        return null;
    }
}