package com.fzzy.gateway.hx2023.websocket;

import com.alibaba.fastjson.JSONObject;
import com.fzzy.gateway.GatewayUtils;
import com.fzzy.gateway.hx2023.data.WebSocketPacket;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang.StringUtils;
import org.springframework.stereotype.Component;

import javax.websocket.*;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * WebSocket handler that keeps a registry of client sessions and pushes
 * {@link WebSocketPacket} messages, serialized as JSON, to the sessions whose
 * registry key contains the packet's device id.
 *
 * <p>The {@code @ServerEndpoint} annotation below is currently commented out, so
 * the lifecycle callbacks are only invoked if something else wires this class up
 * as an endpoint.
 */
@Slf4j
@Component
//@ServerEndpoint(value = "/mqtt")
public class WebSocketMqtt {

    /** Registered sessions, keyed by the client id passed to {@link #onOpen2}. */
    private static final Map<String, Session> sessionPool = new ConcurrentHashMap<>();

    /** Reverse index: WebSocket session id -> key used in {@link #sessionPool}. */
    private static final Map<String, String> sessionIds = new ConcurrentHashMap<>();

    /**
     * Session of the most recently opened connection; intended for sending data to
     * that client. It is only assigned in this class, never read.
     */
    private Session session;

    /**
     * Called when a connection is opened. Remembers the session and logs the
     * request parameters; the session is NOT added to the registry here.
     *
     * @param session the newly opened session
     * @throws Exception declared for compatibility with existing callers
     */
    @OnOpen
    public void onOpen(Session session) throws Exception {
        this.session = session;

        Map<String, List<String>> params = session.getRequestParameterMap();
        log.info("new webSocket,params={}", params);
    }

    /**
     * Alternative open handler that registers the session under {@code clientId}
     * and marks the client as online via {@link GatewayUtils#updateOnline}.
     * Only {@code session} and {@code clientId} are used; the remaining
     * parameters are accepted but ignored.
     *
     * @param session          the newly opened session
     * @param keepalive        unused
     * @param clientId         registry key for this session
     * @param protocolId       unused
     * @param protocolVersion  unused
     * @param clean            unused
     * @param reconnectPeriod  unused
     * @param connectTimeout   unused
     * @throws Exception declared for compatibility with existing callers
     */
    public void onOpen2(Session session,
                        @PathParam("keepalive") String keepalive,
                        @PathParam("clientId") String clientId,
                        @PathParam("protocolId") String protocolId,
                        @PathParam("protocolVersion") String protocolVersion,
                        @PathParam("clean") boolean clean,
                        @PathParam("reconnectPeriod") int reconnectPeriod,
                        @PathParam("connectTimeout") int connectTimeout
    ) throws Exception {
        this.session = session;

        sessionPool.put(clientId, session);
        sessionIds.put(session.getId(), clientId);

        GatewayUtils.updateOnline(clientId);

        log.info("new webSocket,clientId={}", clientId);
    }

    /**
     * Called when a connection is closed. Only logs the event: this callback
     * receives no {@link Session}, so the registry is not cleaned up here.
     * Closed sessions are pruned lazily in {@link #sendByPacket}.
     */
    @OnClose
    public void onClose() {
        log.info("WebSocket连接关闭={}","----------");
    }

    /**
     * Handles a text message received from the client; the message is only logged.
     *
     * @param message the text payload
     * @param session the session the message arrived on
     */
    @OnMessage
    public void onMessage(String message, Session session) {
        log.info("来自客户端信息:\n{}", message);
    }

    /**
     * Called when an error occurs on a connection. Logs the error together with
     * its stack trace through the logger; the registry is left untouched.
     *
     * @param session the session on which the error occurred
     * @param error   the error that was raised
     */
    @OnError
    public void onError(Session session, Throwable error) {
        // Route the stack trace through the logger instead of printStackTrace().
        log.error("发生错误", error);
    }

    /**
     * Pushes a packet to every registered session whose key contains the packet's
     * device id. Closed sessions encountered while iterating are skipped and
     * removed from the registry.
     *
     * @param packet the packet to push; its headers' product id is overwritten
     *               with each registry key as the registry is traversed
     */
    public void sendByPacket(WebSocketPacket packet) {
        if (StringUtils.isEmpty(packet.getDeviceId())) {
            log.error("WebSocket信息推送失败,设备编码为空。");
            return;
        }

        String tag = packet.getDeviceId();

        // Walk the whole registry and push to every matching session.
        for (Map.Entry<String, Session> entry : sessionPool.entrySet()) {
            String key = entry.getKey();
            Session target = entry.getValue();

            // NOTE(review): headers are stamped for every key, matching or not,
            // exactly as before — confirm this is intended.
            packet.getHeaders().setProductId(key);
            log.debug("----------websocket返回信息-----{}", packet);

            if (!target.isOpen()) {
                // Conditional removes so a session that reconnected under the
                // same key in the meantime is not dropped by mistake.
                sessionPool.remove(key, target);
                sessionIds.remove(target.getId(), key);
                continue;
            }

            if (key.contains(tag)) {
                target.getAsyncRemote().sendText(JSONObject.toJSONString(packet));
            }
        }
    }
}