From f75ee38163880a4a12373bf1d4750056a72272d2 Mon Sep 17 00:00:00 2001
From: jiazx0107@163.com <jiazx0107@163.com>
Date: 星期二, 07 十一月 2023 14:06:55 +0800
Subject: [PATCH] 四川网关-提交测试

---
 src/main/java/com/fzzy/gateway/hx2023/websocket/WebSocketMqtt.java |   83 ++++++++++++++++++++++++++++++++++-------
 1 files changed, 69 insertions(+), 14 deletions(-)

diff --git a/src/main/java/com/fzzy/gateway/hx2023/websocket/WebSocketMqtt.java b/src/main/java/com/fzzy/gateway/hx2023/websocket/WebSocketMqtt.java
index affbda3..28e5666 100644
--- a/src/main/java/com/fzzy/gateway/hx2023/websocket/WebSocketMqtt.java
+++ b/src/main/java/com/fzzy/gateway/hx2023/websocket/WebSocketMqtt.java
@@ -1,15 +1,20 @@
 package com.fzzy.gateway.hx2023.websocket;
 
+import com.alibaba.fastjson.JSONObject;
+import com.fzzy.gateway.GatewayUtils;
+import com.fzzy.gateway.hx2023.data.WebSocketPacket;
 import lombok.extern.slf4j.Slf4j;
 
+import org.apache.commons.lang.StringUtils;
 import org.springframework.stereotype.Component;
 
 import javax.websocket.*;
 import javax.websocket.server.PathParam;
 import javax.websocket.server.ServerEndpoint;
 
-import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 
 /**
  *
@@ -19,30 +24,41 @@
 @ServerEndpoint(value = "/mqtt")
 public class WebSocketMqtt {
 
-    private static Map<String, Session> sessionPool = new HashMap<>();
+    private static Map<String, Session> sessionPool = new ConcurrentHashMap<>();
+    private static Map<String, String> sessionIds = new ConcurrentHashMap<>();
 
     // 涓庢煇涓鎴风鐨勮繛鎺ヤ細璇濓紝闇�瑕侀�氳繃瀹冩潵缁欏鎴风鍙戦�佹暟鎹�
     private Session session;
-    private String key;
-    
-    
+
+
     @OnOpen
-    public void onOpen(Session session,
+    public void onOpen(Session session) throws Exception {
+
+        this.session = session;
+
+        Map<String, List<String>> params =  session.getRequestParameterMap();
+
+        log.info("new webSocket,params={}", params);
+    }
+
+    public void onOpen2(Session session,
                        @PathParam("keepalive") String keepalive,
                        @PathParam("clientId") String clientId,
                        @PathParam("protocolId") String protocolId,
                        @PathParam("protocolVersion") String protocolVersion,
-                       @PathParam("clean") String clean,
-                       @PathParam("reconnectPeriod") String reconnectPeriod,
-                       @PathParam("reconnectTimeout") String reconnectTimeout
+                       @PathParam("clean") boolean clean,
+                       @PathParam("reconnectPeriod") int reconnectPeriod,
+                       @PathParam("connectTimeout") int connectTimeout
     ) throws Exception {
 
         this.session = session;
 
-        key = clientId;
+        String key = clientId;
 
         sessionPool.put(key, session);
+        sessionIds.put(session.getId(), key);
 
+        GatewayUtils.updateOnline(clientId);
 
         log.info("new webSocket,clientId={}", key);
     }
@@ -50,9 +66,16 @@
     @OnClose
     public void onClose() {
 
-        sessionPool.remove(key);
+//        String key = sessionIds.get(session.getId());
+//
+//        String deviceId = key.substring(key.indexOf("-"));
+//
+//        GatewayUtils.updateOffOnline(deviceId);
+//
+//        sessionPool.remove(key);
+//        sessionIds.remove(session.getId());
 
-        log.info("WebSocket杩炴帴鍏抽棴={}", key);
+        log.info("WebSocket杩炴帴鍏抽棴={}","----------");
 
     }
 
@@ -72,9 +95,41 @@
     public void onError(Session session, Throwable error) {
         log.error("鍙戠敓閿欒");
 
-        sessionPool.remove(key);
-
+//        String clientId = sessionIds.get(session.getId());
+//
+//        sessionPool.remove(clientId);
+//        sessionIds.remove(session.getId());
         error.printStackTrace();
     }
 
+
+    /**
+     * 鎺ㄩ�佷俊鎭埌鍓嶇
+     *
+     * @param packet
+     */
+    public void sendByPacket(WebSocketPacket packet) {
+        if (StringUtils.isEmpty(packet.getDeviceId())) {
+            log.error("WebSocket淇℃伅鎺ㄩ�佸け璐ワ紝璁惧缂栫爜涓虹┖銆�");
+            return;
+        }
+
+        String tag = packet.getDeviceId();
+
+        // 閬嶅巻鎺ㄩ��
+        Session session;
+        for (String key : sessionPool.keySet()) {
+
+            packet.getHeaders().setProductId(key);
+
+            log.debug("----------websocket杩斿洖淇℃伅-----{}", packet);
+
+            if (key.indexOf(tag) != -1) {
+                session = sessionPool.get(key);
+                session.getAsyncRemote().sendText(JSONObject.toJSONString(packet));
+            }
+        }
+    }
+
+
 }

--
Gitblit v1.9.3