From f75ee38163880a4a12373bf1d4750056a72272d2 Mon Sep 17 00:00:00 2001 From: jiazx0107@163.com <jiazx0107@163.com> Date: 星期二, 07 十一月 2023 14:06:55 +0800 Subject: [PATCH] 四川网关-提交测试 --- src/main/java/com/fzzy/gateway/hx2023/websocket/WebSocketMqtt.java | 83 ++++++++++++++++++++++++++++++++++------- 1 files changed, 69 insertions(+), 14 deletions(-) diff --git a/src/main/java/com/fzzy/gateway/hx2023/websocket/WebSocketMqtt.java b/src/main/java/com/fzzy/gateway/hx2023/websocket/WebSocketMqtt.java index affbda3..28e5666 100644 --- a/src/main/java/com/fzzy/gateway/hx2023/websocket/WebSocketMqtt.java +++ b/src/main/java/com/fzzy/gateway/hx2023/websocket/WebSocketMqtt.java @@ -1,15 +1,20 @@ package com.fzzy.gateway.hx2023.websocket; +import com.alibaba.fastjson.JSONObject; +import com.fzzy.gateway.GatewayUtils; +import com.fzzy.gateway.hx2023.data.WebSocketPacket; import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang.StringUtils; import org.springframework.stereotype.Component; import javax.websocket.*; import javax.websocket.server.PathParam; import javax.websocket.server.ServerEndpoint; -import java.util.HashMap; +import java.util.List; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; /** * @@ -19,30 +24,41 @@ @ServerEndpoint(value = "/mqtt") public class WebSocketMqtt { - private static Map<String, Session> sessionPool = new HashMap<>(); + private static Map<String, Session> sessionPool = new ConcurrentHashMap<>(); + private static Map<String, String> sessionIds = new ConcurrentHashMap<>(); // 涓庢煇涓鎴风鐨勮繛鎺ヤ細璇濓紝闇�瑕侀�氳繃瀹冩潵缁欏鎴风鍙戦�佹暟鎹� private Session session; - private String key; - - + + @OnOpen - public void onOpen(Session session, + public void onOpen(Session session) throws Exception { + + this.session = session; + + Map<String, List<String>> params = session.getRequestParameterMap(); + + log.info("new webSocket,params={}", params); + } + + public void onOpen2(Session session, @PathParam("keepalive") String keepalive, @PathParam("clientId") String clientId, @PathParam("protocolId") String protocolId, @PathParam("protocolVersion") String protocolVersion, - @PathParam("clean") String clean, - @PathParam("reconnectPeriod") String reconnectPeriod, - @PathParam("reconnectTimeout") String reconnectTimeout + @PathParam("clean") boolean clean, + @PathParam("reconnectPeriod") int reconnectPeriod, + @PathParam("connectTimeout") int connectTimeout ) throws Exception { this.session = session; - key = clientId; + String key = clientId; sessionPool.put(key, session); + sessionIds.put(session.getId(), key); + GatewayUtils.updateOnline(clientId); log.info("new webSocket,clientId={}", key); } @@ -50,9 +66,16 @@ @OnClose public void onClose() { - sessionPool.remove(key); +// String key = sessionIds.get(session.getId()); +// +// String deviceId = key.substring(key.indexOf("-")); +// +// GatewayUtils.updateOffOnline(deviceId); +// +// sessionPool.remove(key); +// sessionIds.remove(session.getId()); - log.info("WebSocket杩炴帴鍏抽棴={}", key); + log.info("WebSocket杩炴帴鍏抽棴={}","----------"); } @@ -72,9 +95,41 @@ public void onError(Session session, Throwable error) { log.error("鍙戠敓閿欒"); - sessionPool.remove(key); - +// String clientId = sessionIds.get(session.getId()); +// +// sessionPool.remove(clientId); +// sessionIds.remove(session.getId()); error.printStackTrace(); } + + /** + * 鎺ㄩ�佷俊鎭埌鍓嶇 + * + * @param packet + */ + public void sendByPacket(WebSocketPacket packet) { + if (StringUtils.isEmpty(packet.getDeviceId())) { + log.error("WebSocket淇℃伅鎺ㄩ�佸け璐ワ紝璁惧缂栫爜涓虹┖銆�"); + return; + } + + String tag = packet.getDeviceId(); + + // 閬嶅巻鎺ㄩ�� + Session session; + for (String key : sessionPool.keySet()) { + + packet.getHeaders().setProductId(key); + + log.debug("----------websocket杩斿洖淇℃伅-----{}", packet); + + if (key.indexOf(tag) != -1) { + session = sessionPool.get(key); + session.getAsyncRemote().sendText(JSONObject.toJSONString(packet)); + } + } + } + + } -- Gitblit v1.9.3