| | |
| | | package com.fzzy.gateway.hx2023.websocket; |
| | | |
| | | import com.alibaba.fastjson.JSONObject; |
| | | import com.fzzy.gateway.GatewayUtils; |
| | | import com.fzzy.gateway.hx2023.data.WebSocketPacket; |
| | | import lombok.extern.slf4j.Slf4j; |
| | | |
| | | import org.apache.commons.lang.StringUtils; |
| | | import org.springframework.stereotype.Component; |
| | | |
| | | import javax.websocket.*; |
| | | import javax.websocket.server.PathParam; |
| | | import javax.websocket.server.ServerEndpoint; |
| | | |
| | | import java.util.HashMap; |
| | | import java.util.List; |
| | | import java.util.Map; |
| | | import java.util.concurrent.ConcurrentHashMap; |
| | | |
| | | /** |
| | | * |
| | |
| | | @ServerEndpoint(value = "/mqtt") |
| | | public class WebSocketMqtt { |
| | | |
| | | private static Map<String, Session> sessionPool = new HashMap<>(); |
| | | private static Map<String, Session> sessionPool = new ConcurrentHashMap<>(); |
| | | private static Map<String, String> sessionIds = new ConcurrentHashMap<>(); |
| | | |
| | | // 与某个客户端的连接会话,需要通过它来给客户端发送数据 |
| | | private Session session; |
| | | private String key; |
| | | |
| | | |
| | | @OnOpen |
| | | public void onOpen(Session session, |
| | | public void onOpen(Session session) throws Exception { |
| | | |
| | | this.session = session; |
| | | |
| | | Map<String, List<String>> params = session.getRequestParameterMap(); |
| | | |
| | | log.info("new webSocket,params={}", params); |
| | | } |
| | | |
| | | public void onOpen2(Session session, |
| | | @PathParam("keepalive") String keepalive, |
| | | @PathParam("clientId") String clientId, |
| | | @PathParam("protocolId") String protocolId, |
| | | @PathParam("protocolVersion") String protocolVersion, |
| | | @PathParam("clean") String clean, |
| | | @PathParam("reconnectPeriod") String reconnectPeriod, |
| | | @PathParam("reconnectTimeout") String reconnectTimeout |
| | | @PathParam("clean") boolean clean, |
| | | @PathParam("reconnectPeriod") int reconnectPeriod, |
| | | @PathParam("connectTimeout") int connectTimeout |
| | | ) throws Exception { |
| | | |
| | | this.session = session; |
| | | |
| | | key = clientId; |
| | | String key = clientId; |
| | | |
| | | sessionPool.put(key, session); |
| | | sessionIds.put(session.getId(), key); |
| | | |
| | | GatewayUtils.updateOnline(clientId); |
| | | |
| | | log.info("new webSocket,clientId={}", key); |
| | | } |
| | |
| | | @OnClose |
| | | public void onClose() { |
| | | |
| | | sessionPool.remove(key); |
| | | // String key = sessionIds.get(session.getId()); |
| | | // |
| | | // String deviceId = key.substring(key.indexOf("-")); |
| | | // |
| | | // GatewayUtils.updateOffOnline(deviceId); |
| | | // |
| | | // sessionPool.remove(key); |
| | | // sessionIds.remove(session.getId()); |
| | | |
| | | log.info("WebSocket连接关闭={}", key); |
| | | log.info("WebSocket连接关闭={}","----------"); |
| | | |
| | | } |
| | | |
| | |
| | | public void onError(Session session, Throwable error) { |
| | | log.error("发生错误"); |
| | | |
| | | sessionPool.remove(key); |
| | | |
| | | // String clientId = sessionIds.get(session.getId()); |
| | | // |
| | | // sessionPool.remove(clientId); |
| | | // sessionIds.remove(session.getId()); |
| | | error.printStackTrace(); |
| | | } |
| | | |
| | | |
| | | /** |
| | | * 推送信息到前端 |
| | | * |
| | | * @param packet |
| | | */ |
| | | public void sendByPacket(WebSocketPacket packet) { |
| | | if (StringUtils.isEmpty(packet.getDeviceId())) { |
| | | log.error("WebSocket信息推送失败,设备编码为空。"); |
| | | return; |
| | | } |
| | | |
| | | String tag = packet.getDeviceId(); |
| | | |
| | | // 遍历推送 |
| | | Session session; |
| | | for (String key : sessionPool.keySet()) { |
| | | |
| | | packet.getHeaders().setProductId(key); |
| | | |
| | | log.debug("----------websocket返回信息-----{}", packet); |
| | | |
| | | if (key.indexOf(tag) != -1) { |
| | | session = sessionPool.get(key); |
| | | session.getAsyncRemote().sendText(JSONObject.toJSONString(packet)); |
| | | } |
| | | } |
| | | } |
| | | |
| | | |
| | | } |