From f75ee38163880a4a12373bf1d4750056a72272d2 Mon Sep 17 00:00:00 2001 From: jiazx0107@163.com <jiazx0107@163.com> Date: 星期二, 07 十一月 2023 14:06:55 +0800 Subject: [PATCH] 四川网关-提交测试 --- src/main/java/com/fzzy/gateway/hx2023/websocket/WebSocketDeviceReport.java | 5 - src/main/java/com/fzzy/gateway/hx2023/controller/GatewayController.java | 2 src/main/java/com/fzzy/gateway/entity/GatewayConf.java | 4 - src/main/java/com/fzzy/conf/KafkaProviderConfig.java | 20 ------ src/main/java/com/fzzy/gateway/hx2023/websocket/WebSocketMqtt.java | 83 +++++++++++++++++++++++---- src/main/java/com/fzzy/gateway/service/GatewayConfService.java | 15 +++++ 6 files changed, 87 insertions(+), 42 deletions(-) diff --git a/src/main/java/com/fzzy/conf/KafkaProviderConfig.java b/src/main/java/com/fzzy/conf/KafkaProviderConfig.java index d7a3c40..ed7fecd 100644 --- a/src/main/java/com/fzzy/conf/KafkaProviderConfig.java +++ b/src/main/java/com/fzzy/conf/KafkaProviderConfig.java @@ -39,9 +39,6 @@ @Value("${spring.kafka.properties.sasl.password}") private String saslPassword; -// @Value("${spring.kafka.producer.transaction-id-prefix}") -// private String transactionIdPrefix; - @Bean public Map<String, Object> producerConfigs() { Map<String, Object> props = new HashMap<>(); @@ -59,10 +56,6 @@ props.put("sasl.username", saslUsername); props.put("sasl.password", saslPassword); - - // props.put("sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule required username='sc001' password='wCV0ISwmoKwbx1lpBKMW';"); - - props.put("sasl.jaas.config", saslJaasConfig); return props; @@ -73,19 +66,6 @@ DefaultKafkaProducerFactory<String, String> producerFactory = new DefaultKafkaProducerFactory<>(producerConfigs()); return producerFactory; } - -// @Bean -// public ProducerFactory<Object, Object> producerFactory() { -// DefaultKafkaProducerFactory<Object, Object> factory = new DefaultKafkaProducerFactory<>(producerConfigs()); -// //寮�鍚簨鍔★紝浼氬鑷� LINGER_MS_CONFIG 閰嶇疆澶辨晥 -// // factory.setTransactionIdPrefix(transactionIdPrefix); -// return factory; -// } - -// @Bean -// public KafkaTransactionManager<Object, Object> kafkaTransactionManager(ProducerFactory<Object, Object> producerFactory) { -// return new KafkaTransactionManager<>(producerFactory); -// } @Bean public KafkaTemplate<String, String> kafkaTemplate(ProducerFactory<String, String> producerFactory) { diff --git a/src/main/java/com/fzzy/gateway/entity/GatewayConf.java b/src/main/java/com/fzzy/gateway/entity/GatewayConf.java index 622cc8d..c697ffe 100644 --- a/src/main/java/com/fzzy/gateway/entity/GatewayConf.java +++ b/src/main/java/com/fzzy/gateway/entity/GatewayConf.java @@ -76,10 +76,6 @@ @Column(name = "apiUrl", length = 200) private String apiUrl; - @PropertyDef(label = "appId") - @Column(name = "appId", length = 50) - private String appId; - @PropertyDef(label = "鎺ㄩ�佸崗璁�") @Column(name = "pushProtocol", length = 20) private String pushProtocol; diff --git a/src/main/java/com/fzzy/gateway/hx2023/controller/GatewayController.java b/src/main/java/com/fzzy/gateway/hx2023/controller/GatewayController.java index a6d3166..8ad1617 100644 --- a/src/main/java/com/fzzy/gateway/hx2023/controller/GatewayController.java +++ b/src/main/java/com/fzzy/gateway/hx2023/controller/GatewayController.java @@ -16,7 +16,7 @@ */ @Slf4j @Controller -@RequestMapping +@RequestMapping("/sc2023/gateway") public class GatewayController { @Resource diff --git a/src/main/java/com/fzzy/gateway/hx2023/websocket/WebSocketDeviceReport.java b/src/main/java/com/fzzy/gateway/hx2023/websocket/WebSocketDeviceReport.java index 8d7622a..6edc518 100644 --- a/src/main/java/com/fzzy/gateway/hx2023/websocket/WebSocketDeviceReport.java +++ b/src/main/java/com/fzzy/gateway/hx2023/websocket/WebSocketDeviceReport.java @@ -14,7 +14,7 @@ import java.util.concurrent.ConcurrentHashMap; /** - * 璁惧鎭㈠淇℃伅鐨刉EBSOCKET + * 鍩轰簬鍓嶇 websocket 璁㈤槄鍦扮鏁版嵁 topic: */ @Slf4j @Component @@ -113,8 +113,7 @@ packet.getHeaders().setProductId(productId); - - log.debug("----------杩斿洖淇℃伅-----{}", packet); + log.debug("----------websocket杩斿洖淇℃伅-----{}", packet); if (key.indexOf(tag) != -1) { session = sessionPool.get(key); diff --git a/src/main/java/com/fzzy/gateway/hx2023/websocket/WebSocketMqtt.java b/src/main/java/com/fzzy/gateway/hx2023/websocket/WebSocketMqtt.java index affbda3..28e5666 100644 --- a/src/main/java/com/fzzy/gateway/hx2023/websocket/WebSocketMqtt.java +++ b/src/main/java/com/fzzy/gateway/hx2023/websocket/WebSocketMqtt.java @@ -1,15 +1,20 @@ package com.fzzy.gateway.hx2023.websocket; +import com.alibaba.fastjson.JSONObject; +import com.fzzy.gateway.GatewayUtils; +import com.fzzy.gateway.hx2023.data.WebSocketPacket; import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang.StringUtils; import org.springframework.stereotype.Component; import javax.websocket.*; import javax.websocket.server.PathParam; import javax.websocket.server.ServerEndpoint; -import java.util.HashMap; +import java.util.List; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; /** * @@ -19,30 +24,41 @@ @ServerEndpoint(value = "/mqtt") public class WebSocketMqtt { - private static Map<String, Session> sessionPool = new HashMap<>(); + private static Map<String, Session> sessionPool = new ConcurrentHashMap<>(); + private static Map<String, String> sessionIds = new ConcurrentHashMap<>(); // 涓庢煇涓鎴风鐨勮繛鎺ヤ細璇濓紝闇�瑕侀�氳繃瀹冩潵缁欏鎴风鍙戦�佹暟鎹� private Session session; - private String key; - - + + @OnOpen - public void onOpen(Session session, + public void onOpen(Session session) throws Exception { + + this.session = session; + + Map<String, List<String>> params = session.getRequestParameterMap(); + + log.info("new webSocket,params={}", params); + } + + public void onOpen2(Session session, @PathParam("keepalive") String keepalive, @PathParam("clientId") String clientId, @PathParam("protocolId") String protocolId, @PathParam("protocolVersion") String protocolVersion, - @PathParam("clean") String clean, - @PathParam("reconnectPeriod") String reconnectPeriod, - @PathParam("reconnectTimeout") String reconnectTimeout + @PathParam("clean") boolean clean, + @PathParam("reconnectPeriod") int reconnectPeriod, + @PathParam("connectTimeout") int connectTimeout ) throws Exception { this.session = session; - key = clientId; + String key = clientId; sessionPool.put(key, session); + sessionIds.put(session.getId(), key); + GatewayUtils.updateOnline(clientId); log.info("new webSocket,clientId={}", key); } @@ -50,9 +66,16 @@ @OnClose public void onClose() { - sessionPool.remove(key); +// String key = sessionIds.get(session.getId()); +// +// String deviceId = key.substring(key.indexOf("-")); +// +// GatewayUtils.updateOffOnline(deviceId); +// +// sessionPool.remove(key); +// sessionIds.remove(session.getId()); - log.info("WebSocket杩炴帴鍏抽棴={}", key); + log.info("WebSocket杩炴帴鍏抽棴={}","----------"); } @@ -72,9 +95,41 @@ public void onError(Session session, Throwable error) { log.error("鍙戠敓閿欒"); - sessionPool.remove(key); - +// String clientId = sessionIds.get(session.getId()); +// +// sessionPool.remove(clientId); +// sessionIds.remove(session.getId()); error.printStackTrace(); } + + /** + * 鎺ㄩ�佷俊鎭埌鍓嶇 + * + * @param packet + */ + public void sendByPacket(WebSocketPacket packet) { + if (StringUtils.isEmpty(packet.getDeviceId())) { + log.error("WebSocket淇℃伅鎺ㄩ�佸け璐ワ紝璁惧缂栫爜涓虹┖銆�"); + return; + } + + String tag = packet.getDeviceId(); + + // 閬嶅巻鎺ㄩ�� + Session session; + for (String key : sessionPool.keySet()) { + + packet.getHeaders().setProductId(key); + + log.debug("----------websocket杩斿洖淇℃伅-----{}", packet); + + if (key.indexOf(tag) != -1) { + session = sessionPool.get(key); + session.getAsyncRemote().sendText(JSONObject.toJSONString(packet)); + } + } + } + + } diff --git a/src/main/java/com/fzzy/gateway/service/GatewayConfService.java b/src/main/java/com/fzzy/gateway/service/GatewayConfService.java index 68baebd..eabbf30 100644 --- a/src/main/java/com/fzzy/gateway/service/GatewayConfService.java +++ b/src/main/java/com/fzzy/gateway/service/GatewayConfService.java @@ -56,4 +56,19 @@ gatewayConfRep.delete(data2); return null; } + + + /** + * gatewayConfService#delData + * + * @param data + */ + @Expose + public String flush(GatewayConf data) { + + GatewayConf data2 = new GatewayConf(); + BeanUtils.copyProperties(data, data2); + gatewayConfRep.delete(data2); + return null; + } } -- Gitblit v1.9.3