package com.fzzy.gateway.hx2023.websocket;
|
|
import com.alibaba.fastjson.JSONObject;
|
import com.fzzy.gateway.GatewayUtils;
|
import com.fzzy.gateway.hx2023.data.WebSocketPacket;
|
import lombok.extern.slf4j.Slf4j;
|
|
import org.apache.commons.lang.StringUtils;
|
import org.springframework.stereotype.Component;
|
|
import javax.websocket.*;
|
import javax.websocket.server.PathParam;
|
import javax.websocket.server.ServerEndpoint;
|
|
import java.util.List;
|
import java.util.Map;
|
import java.util.concurrent.ConcurrentHashMap;
|
|
/**
|
*
|
*/
|
@Slf4j
|
@Component
|
//@ServerEndpoint(value = "/mqtt")
|
public class WebSocketMqtt {
|
|
private static Map<String, Session> sessionPool = new ConcurrentHashMap<>();
|
private static Map<String, String> sessionIds = new ConcurrentHashMap<>();
|
|
// 与某个客户端的连接会话,需要通过它来给客户端发送数据
|
private Session session;
|
|
|
@OnOpen
|
public void onOpen(Session session) throws Exception {
|
|
this.session = session;
|
|
Map<String, List<String>> params = session.getRequestParameterMap();
|
|
log.info("new webSocket,params={}", params);
|
}
|
|
public void onOpen2(Session session,
|
@PathParam("keepalive") String keepalive,
|
@PathParam("clientId") String clientId,
|
@PathParam("protocolId") String protocolId,
|
@PathParam("protocolVersion") String protocolVersion,
|
@PathParam("clean") boolean clean,
|
@PathParam("reconnectPeriod") int reconnectPeriod,
|
@PathParam("connectTimeout") int connectTimeout
|
) throws Exception {
|
|
this.session = session;
|
|
String key = clientId;
|
|
sessionPool.put(key, session);
|
sessionIds.put(session.getId(), key);
|
|
GatewayUtils.updateOnline(clientId);
|
|
log.info("new webSocket,clientId={}", key);
|
}
|
|
@OnClose
|
public void onClose() {
|
|
// String key = sessionIds.get(session.getId());
|
//
|
// String deviceId = key.substring(key.indexOf("-"));
|
//
|
// GatewayUtils.updateOffOnline(deviceId);
|
//
|
// sessionPool.remove(key);
|
// sessionIds.remove(session.getId());
|
|
log.info("WebSocket连接关闭={}","----------");
|
|
}
|
|
/**
|
* 收到前端发送的信息
|
*
|
* @param message
|
* @param session
|
*/
|
@OnMessage
|
public void onMessage(String message, Session session) {
|
|
log.info("来自客户端信息:\n" + message);
|
}
|
|
@OnError
|
public void onError(Session session, Throwable error) {
|
log.error("发生错误");
|
|
// String clientId = sessionIds.get(session.getId());
|
//
|
// sessionPool.remove(clientId);
|
// sessionIds.remove(session.getId());
|
error.printStackTrace();
|
}
|
|
|
/**
|
* 推送信息到前端
|
*
|
* @param packet
|
*/
|
public void sendByPacket(WebSocketPacket packet) {
|
if (StringUtils.isEmpty(packet.getDeviceId())) {
|
log.error("WebSocket信息推送失败,设备编码为空。");
|
return;
|
}
|
|
String tag = packet.getDeviceId();
|
|
// 遍历推送
|
Session session;
|
for (String key : sessionPool.keySet()) {
|
|
packet.getHeaders().setProductId(key);
|
|
log.debug("----------websocket返回信息-----{}", packet);
|
|
if (key.indexOf(tag) != -1) {
|
session = sessionPool.get(key);
|
session.getAsyncRemote().sendText(JSONObject.toJSONString(packet));
|
}
|
}
|
}
|
|
|
}
|