jiazx0107@163.com
2023-11-07 f75ee38163880a4a12373bf1d4750056a72272d2
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
package com.fzzy.gateway.hx2023.websocket;
 
import com.alibaba.fastjson.JSONObject;
import com.fzzy.gateway.GatewayUtils;
import com.fzzy.gateway.hx2023.data.WebSocketPacket;
import lombok.extern.slf4j.Slf4j;
 
import org.apache.commons.lang.StringUtils;
import org.springframework.stereotype.Component;
 
import javax.websocket.*;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
 
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
 
/**
 *
 */
@Slf4j
@Component
@ServerEndpoint(value = "/mqtt")
public class WebSocketMqtt {
 
    private static Map<String, Session> sessionPool = new ConcurrentHashMap<>();
    private static Map<String, String> sessionIds = new ConcurrentHashMap<>();
 
    // 与某个客户端的连接会话,需要通过它来给客户端发送数据
    private Session session;
 
 
    @OnOpen
    public void onOpen(Session session) throws Exception {
 
        this.session = session;
 
        Map<String, List<String>> params =  session.getRequestParameterMap();
 
        log.info("new webSocket,params={}", params);
    }
 
    public void onOpen2(Session session,
                       @PathParam("keepalive") String keepalive,
                       @PathParam("clientId") String clientId,
                       @PathParam("protocolId") String protocolId,
                       @PathParam("protocolVersion") String protocolVersion,
                       @PathParam("clean") boolean clean,
                       @PathParam("reconnectPeriod") int reconnectPeriod,
                       @PathParam("connectTimeout") int connectTimeout
    ) throws Exception {
 
        this.session = session;
 
        String key = clientId;
 
        sessionPool.put(key, session);
        sessionIds.put(session.getId(), key);
 
        GatewayUtils.updateOnline(clientId);
 
        log.info("new webSocket,clientId={}", key);
    }
 
    @OnClose
    public void onClose() {
 
//        String key = sessionIds.get(session.getId());
//
//        String deviceId = key.substring(key.indexOf("-"));
//
//        GatewayUtils.updateOffOnline(deviceId);
//
//        sessionPool.remove(key);
//        sessionIds.remove(session.getId());
 
        log.info("WebSocket连接关闭={}","----------");
 
    }
 
    /**
     * 收到前端发送的信息
     *
     * @param message
     * @param session
     */
    @OnMessage
    public void onMessage(String message, Session session) {
 
        log.info("来自客户端信息:\n" + message);
    }
 
    @OnError
    public void onError(Session session, Throwable error) {
        log.error("发生错误");
 
//        String clientId = sessionIds.get(session.getId());
//
//        sessionPool.remove(clientId);
//        sessionIds.remove(session.getId());
        error.printStackTrace();
    }
 
 
    /**
     * 推送信息到前端
     *
     * @param packet
     */
    public void sendByPacket(WebSocketPacket packet) {
        if (StringUtils.isEmpty(packet.getDeviceId())) {
            log.error("WebSocket信息推送失败,设备编码为空。");
            return;
        }
 
        String tag = packet.getDeviceId();
 
        // 遍历推送
        Session session;
        for (String key : sessionPool.keySet()) {
 
            packet.getHeaders().setProductId(key);
 
            log.debug("----------websocket返回信息-----{}", packet);
 
            if (key.indexOf(tag) != -1) {
                session = sessionPool.get(key);
                session.getAsyncRemote().sendText(JSONObject.toJSONString(packet));
            }
        }
    }
 
 
}