package com.ld.igds.websocket;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

import javax.websocket.OnClose;
import javax.websocket.OnError;
import javax.websocket.OnMessage;
import javax.websocket.OnOpen;
import javax.websocket.Session;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;

import lombok.extern.slf4j.Slf4j;

import org.apache.commons.lang3.StringUtils;
import org.springframework.stereotype.Component;

import com.alibaba.fastjson.JSONObject;

/**
 * WebSocket service shared by the whole system, including the large-screen
 * dashboard, the grain-condition monitoring page and the gas operation page.
 *
 * <p>Request path: {@code /websocket/{deptId}/{bizType}/{bizTag}/{userId}},
 * i.e. depot code / business code / custom tag / user id.
 *
 * <p>Every connection is registered under the key
 * {@code deptId-bizType-bizTag-userId}. The static {@code sendBy...} methods
 * select target sessions by matching whole leading segments of that key.
 *
 * @author jiazx
 */
@Slf4j
@Component
@ServerEndpoint(value = "/websocket/{deptId}/{bizType}/{bizTag}/{userId}")
public class WebSocketServer {

    /** Separator placed between the segments of a registry key. */
    private static final String KEY_SEPARATOR = "-";

    /** Registry key -> live session. */
    private static final ConcurrentMap<String, Session> sessionPool = new ConcurrentHashMap<>();

    /** Container session id -> registry key, used to find the key again on close/error. */
    private static final ConcurrentMap<String, String> sessionIds = new ConcurrentHashMap<>();

    /**
     * Online status of configuration modules.
     *
     * <p>Written from WebSocket callbacks, so the map is wrapped to make single
     * operations safe across threads. Callers that iterate it must synchronize
     * on the map themselves.
     */
    public static Map<String, Boolean> contextOnLineMap =
            Collections.synchronizedMap(new HashMap<String, Boolean>());

    /** Session of the client this endpoint instance is bound to; set in {@link #onOpen}. */
    private Session session;

    /**
     * Registers a newly opened connection under its composite key.
     *
     * @param session the session that was just opened
     * @param deptId  depot code taken from the request path
     * @param bizType business code taken from the request path
     * @param bizTag  custom tag taken from the request path
     * @param userId  user id taken from the request path
     */
    @OnOpen
    public void onOpen(Session session,
                       @PathParam("deptId") String deptId,
                       @PathParam("bizType") String bizType,
                       @PathParam("bizTag") String bizTag,
                       @PathParam("userId") String userId) throws Exception {
        this.session = session;

        String key = deptId + KEY_SEPARATOR + bizType + KEY_SEPARATOR + bizTag
                + KEY_SEPARATOR + userId;

        sessionPool.put(key, session);
        sessionIds.put(session.getId(), key);

        log.info("new webSocket,key={}", key);
    }

    /**
     * Unregisters the connection and flags the third key segment as offline in
     * {@link #contextOnLineMap}.
     */
    @OnClose
    public void onClose() {
        String key = unregister(session);
        log.info("WebSocket连接关闭={}", key);

        if (key == null) {
            // Already unregistered (for example by onError) or never registered.
            return;
        }

        // NOTE(review): index 2 is the bizTag segment of the key, although the
        // original local variable was called "bizType". Behavior is kept as-is;
        // confirm which segment the clients actually report in onMessage.
        String[] segments = key.split(KEY_SEPARATOR);
        if (segments.length > 2) {
            contextOnLineMap.put(segments[2], false);
        }
    }

    /**
     * Handles a text message sent by the front end: the message text itself is
     * used as the key that gets flagged online in {@link #contextOnLineMap}.
     *
     * @param message text received from the client
     * @param session session the message arrived on
     */
    @OnMessage
    public void onMessage(String message, Session session) {
        contextOnLineMap.put(message, true);
        log.info("来自客户端信息:\n{}", message);
    }

    /**
     * Logs a connection error together with its cause and unregisters the
     * affected session.
     *
     * @param session session on which the error occurred
     * @param error   the reported failure
     */
    @OnError
    public void onError(Session session, Throwable error) {
        log.error("发生错误", error);
        unregister(session);
    }

    /**
     * Pushes a packet to everyone working on the same business type.
     *
     * <p>When the packet carries a bizTag the push is narrowed to that tag;
     * otherwise every session of the packet's deptId/bizType receives it.
     *
     * @param packet packet to push; its bizType must not be empty
     */
    public static void sendByPocket(WebSocketPacket packet) {
        if (StringUtils.isEmpty(packet.getBizType())) {
            log.error("WebSocket信息推送失败,业务类型不可为空。");
            return;
        }

        String prefix = packet.getDeptId() + KEY_SEPARATOR + packet.getBizType() + KEY_SEPARATOR;
        if (StringUtils.isNotEmpty(packet.getBizTag())) {
            prefix = prefix + packet.getBizTag() + KEY_SEPARATOR;
        }

        pushToPrefix(prefix, packet);
    }

    /**
     * Pushes a packet to one specific user.
     *
     * @param packet packet to push; deptId, bizType and userId must not be empty
     */
    public static void sendByUser(WebSocketPacket packet) {
        if (StringUtils.isEmpty(packet.getBizType())
                || StringUtils.isEmpty(packet.getDeptId())
                || StringUtils.isEmpty(packet.getUserId())) {
            log.error("WebSocket信息推送失败,组织编码和业务类型或者指定人信息参数没有获取到!");
            return;
        }

        String key = packet.getDeptId() + KEY_SEPARATOR + packet.getBizType() + KEY_SEPARATOR
                + packet.getBizTag() + KEY_SEPARATOR + packet.getUserId();

        // The key is unique, so a direct lookup replaces scanning every entry.
        Session target = sessionPool.get(key);
        if (target != null) {
            deliver(key, target, JSONObject.toJSONString(packet));
        }
    }

    /**
     * Pushes a packet to every session registered under the given business tag.
     *
     * @param packet packet to push; deptId, bizType and bizTag must not be empty
     */
    public static void sendByBizTag(WebSocketPacket packet) {
        if (StringUtils.isEmpty(packet.getBizType())
                || StringUtils.isEmpty(packet.getDeptId())
                || StringUtils.isEmpty(packet.getBizTag())) {
            log.error("WebSocket信息推送失败,组织编码和业务类型或者业务标签参数没有获取到!");
            return;
        }

        String prefix = packet.getDeptId() + KEY_SEPARATOR + packet.getBizType() + KEY_SEPARATOR
                + packet.getBizTag() + KEY_SEPARATOR;

        pushToPrefix(prefix, packet);
    }

    /**
     * Removes a session from both registries.
     *
     * @param closing session being torn down; may be {@code null}
     * @return the registry key the session was stored under, or {@code null}
     *         when it was not registered
     */
    private static String unregister(Session closing) {
        if (closing == null) {
            return null;
        }

        String key = sessionIds.remove(closing.getId());
        if (key != null) {
            // Conditional remove: a newer session that reconnected with the
            // same key must not be evicted by the old session going away.
            sessionPool.remove(key, closing);
        }
        return key;
    }

    /** Sends the packet to every registered session whose key starts with {@code prefix}. */
    private static void pushToPrefix(String prefix, WebSocketPacket packet) {
        String payload = null;
        for (Map.Entry<String, Session> entry : sessionPool.entrySet()) {
            if (!entry.getKey().startsWith(prefix)) {
                continue;
            }
            if (payload == null) {
                // Serialize once, and only when there is at least one receiver.
                payload = JSONObject.toJSONString(packet);
            }
            deliver(entry.getKey(), entry.getValue(), payload);
        }
    }

    /** Sends one text frame asynchronously, skipping closed sessions and isolating failures. */
    private static void deliver(String key, Session target, String payload) {
        if (target == null || !target.isOpen()) {
            return;
        }
        try {
            target.getAsyncRemote().sendText(payload);
        } catch (RuntimeException e) {
            // One broken session must not abort the push to the remaining ones.
            log.error("WebSocket send failed, key={}", key, e);
        }
    }
}