jiazx0107@163.com
2023-11-07 f75ee38163880a4a12373bf1d4750056a72272d2
src/main/java/com/fzzy/gateway/hx2023/websocket/WebSocketMqtt.java
@@ -1,15 +1,20 @@
package com.fzzy.gateway.hx2023.websocket;
import com.alibaba.fastjson.JSONObject;
import com.fzzy.gateway.GatewayUtils;
import com.fzzy.gateway.hx2023.data.WebSocketPacket;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang.StringUtils;
import org.springframework.stereotype.Component;
import javax.websocket.*;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
 *
@@ -19,30 +24,41 @@
@ServerEndpoint(value = "/mqtt")
public class WebSocketMqtt {
    private static Map<String, Session> sessionPool = new HashMap<>();
    private static Map<String, Session> sessionPool = new ConcurrentHashMap<>();
    private static Map<String, String> sessionIds = new ConcurrentHashMap<>();
    // 与某个客户端的连接会话,需要通过它来给客户端发送数据
    private Session session;
    private String key;
    @OnOpen
    public void onOpen(Session session,
    public void onOpen(Session session) throws Exception {
        this.session = session;
        Map<String, List<String>> params =  session.getRequestParameterMap();
        log.info("new webSocket,params={}", params);
    }
    public void onOpen2(Session session,
                       @PathParam("keepalive") String keepalive,
                       @PathParam("clientId") String clientId,
                       @PathParam("protocolId") String protocolId,
                       @PathParam("protocolVersion") String protocolVersion,
                       @PathParam("clean") String clean,
                       @PathParam("reconnectPeriod") String reconnectPeriod,
                       @PathParam("reconnectTimeout") String reconnectTimeout
                       @PathParam("clean") boolean clean,
                       @PathParam("reconnectPeriod") int reconnectPeriod,
                       @PathParam("connectTimeout") int connectTimeout
    ) throws Exception {
        this.session = session;
        key = clientId;
        String key = clientId;
        sessionPool.put(key, session);
        sessionIds.put(session.getId(), key);
        GatewayUtils.updateOnline(clientId);
        log.info("new webSocket,clientId={}", key);
    }
@@ -50,9 +66,16 @@
    @OnClose
    public void onClose() {
        sessionPool.remove(key);
//        String key = sessionIds.get(session.getId());
//
//        String deviceId = key.substring(key.indexOf("-"));
//
//        GatewayUtils.updateOffOnline(deviceId);
//
//        sessionPool.remove(key);
//        sessionIds.remove(session.getId());
        log.info("WebSocket连接关闭={}", key);
        log.info("WebSocket连接关闭={}","----------");
    }
@@ -72,9 +95,41 @@
    public void onError(Session session, Throwable error) {
        log.error("发生错误");
        sessionPool.remove(key);
//        String clientId = sessionIds.get(session.getId());
//
//        sessionPool.remove(clientId);
//        sessionIds.remove(session.getId());
        error.printStackTrace();
    }
    /**
     * 推送信息到前端
     *
     * @param packet
     */
    public void sendByPacket(WebSocketPacket packet) {
        if (StringUtils.isEmpty(packet.getDeviceId())) {
            log.error("WebSocket信息推送失败,设备编码为空。");
            return;
        }
        String tag = packet.getDeviceId();
        // 遍历推送
        Session session;
        for (String key : sessionPool.keySet()) {
            packet.getHeaders().setProductId(key);
            log.debug("----------websocket返回信息-----{}", packet);
            if (key.indexOf(tag) != -1) {
                session = sessionPool.get(key);
                session.getAsyncRemote().sendText(JSONObject.toJSONString(packet));
            }
        }
    }
}