package com.ld.igds.websocket;
|
|
import java.util.HashMap;
|
import java.util.Map;
|
import java.util.concurrent.ConcurrentHashMap;
|
|
import javax.websocket.OnClose;
|
import javax.websocket.OnError;
|
import javax.websocket.OnMessage;
|
import javax.websocket.OnOpen;
|
import javax.websocket.Session;
|
import javax.websocket.server.PathParam;
|
import javax.websocket.server.ServerEndpoint;
|
|
import lombok.extern.slf4j.Slf4j;
|
|
import org.apache.commons.lang3.StringUtils;
|
import org.springframework.stereotype.Component;
|
|
import com.alibaba.fastjson.JSONObject;
|
|
/**
|
* WEB-Socket服务,该服务用于整个系统,包括大屏,粮情检测页面,气体操作页面
|
*
|
* @author jiazx
|
*
|
* 请求路径定义:"/websocket/{库区编码}/{业务代码}/{自定义标签}/{用户ID}"
|
*
|
* 说明:通知以bizType为单位进行推送,同一个bizType接到相同的推送信息
|
*
|
*/
|
@Slf4j
|
@Component
|
@ServerEndpoint(value = "/websocket/{deptId}/{bizType}/{bizTag}/{userId}")
|
public class WebSocketServer {
|
|
private static Map<String, Session> sessionPool = new ConcurrentHashMap<>();
|
private static Map<String, String> sessionIds = new ConcurrentHashMap<>();
|
|
/**
|
* 配置模块在线状态
|
*/
|
public static Map<String,Boolean> contextOnLineMap = new HashMap<String, Boolean>();
|
|
// 与某个客户端的连接会话,需要通过它来给客户端发送数据
|
private Session session;
|
|
@OnOpen
|
public void onOpen(Session session,
|
@PathParam("deptId") String deptId,
|
@PathParam("bizType") String bizType,
|
@PathParam("bizTag") String bizTag,
|
@PathParam("userId") String userId) throws Exception {
|
|
this.session = session;
|
|
String key = deptId + "-" + bizType + "-" + bizTag + "-" + userId;
|
|
sessionPool.put(key, session);
|
sessionIds.put(session.getId(), key);
|
|
log.info("new webSocket,key={}", key);
|
}
|
|
@OnClose
|
public void onClose() {
|
String key = sessionIds.get(session.getId());
|
|
sessionPool.remove(key);
|
sessionIds.remove(session.getId());
|
|
log.info("WebSocket连接关闭={}", key);
|
|
String bizType = key.split("-")[2];
|
|
contextOnLineMap.put(bizType, false);
|
}
|
|
/**
|
* 收到前端发送的信息
|
*
|
* @param message
|
* @param session
|
*/
|
@OnMessage
|
public void onMessage(String message, Session session) {
|
|
contextOnLineMap.put(message,true);
|
|
log.info("来自客户端信息:\n" + message);
|
}
|
|
@OnError
|
public void onError(Session session, Throwable error) {
|
log.error("发生错误");
|
|
String key = sessionIds.get(session.getId());
|
|
sessionPool.remove(key);
|
sessionIds.remove(session.getId());
|
error.printStackTrace();
|
}
|
|
/**
|
* 后端向前端推送信息,接受者为同一个业务类型操作所有人员
|
*
|
* @param packet
|
*/
|
public static void sendByPocket(WebSocketPacket packet) {
|
if (StringUtils.isEmpty(packet.getBizType())) {
|
log.error("WebSocket信息推送失败,业务类型不可为空。");
|
return;
|
}
|
|
String tag = packet.getDeptId() + "-" + packet.getBizType() + "-" + packet.getBizTag();
|
|
// 遍历推送,只要是bizType一致的均推送
|
Session session;
|
for (String key : sessionPool.keySet()) {
|
if (key.indexOf(tag) != -1) {
|
session = sessionPool.get(key);
|
session.getAsyncRemote().sendText(
|
JSONObject.toJSONString(packet));
|
}
|
}
|
}
|
|
/**
|
* 发送给指定人员
|
*
|
* @param packet
|
*/
|
public static void sendByUser(WebSocketPacket packet) {
|
if (StringUtils.isEmpty(packet.getBizType())
|
|| StringUtils.isEmpty(packet.getDeptId())
|
|| StringUtils.isEmpty(packet.getUserId())) {
|
log.error("WebSocket信息推送失败,组织编码和业务类型或者指定人信息参数没有获取到!");
|
|
return;
|
}
|
|
String tag = packet.getDeptId() + "-" + packet.getBizType() + "-"
|
+ packet.getBizTag() + "-" + packet.getUserId();
|
|
Session session;
|
for (String key : sessionPool.keySet()) {
|
if (tag.equals(key)) {
|
session = sessionPool.get(key);
|
session.getAsyncRemote().sendText(
|
JSONObject.toJSONString(packet));
|
}
|
}
|
}
|
|
/**
|
* 根据指定的BizId标签推送
|
*
|
* @param packet
|
*/
|
public static void sendByBizTag(WebSocketPacket packet) {
|
if (StringUtils.isEmpty(packet.getBizType())
|
|| StringUtils.isEmpty(packet.getDeptId())
|
|| StringUtils.isEmpty(packet.getBizTag())) {
|
log.error("WebSocket信息推送失败,组织编码和业务类型或者指定人信息参数没有获取到!");
|
|
return;
|
}
|
|
String tag = packet.getDeptId() + "-" + packet.getBizType() + "-" + packet.getBizTag();
|
|
// 遍历推送,只要是bizType一致的均推送
|
Session session;
|
for (String key : sessionPool.keySet()) {
|
if (key.indexOf(tag) != -1) {
|
session = sessionPool.get(key);
|
session.getAsyncRemote().sendText(
|
JSONObject.toJSONString(packet));
|
}
|
}
|
|
}
|
}
|