From f75ee38163880a4a12373bf1d4750056a72272d2 Mon Sep 17 00:00:00 2001
From: jiazx0107@163.com <jiazx0107@163.com>
Date: 星期二, 07 十一月 2023 14:06:55 +0800
Subject: [PATCH] 四川网关-提交测试
---
src/main/java/com/fzzy/gateway/hx2023/websocket/WebSocketDeviceReport.java | 5 -
src/main/java/com/fzzy/gateway/hx2023/controller/GatewayController.java | 2
src/main/java/com/fzzy/gateway/entity/GatewayConf.java | 4 -
src/main/java/com/fzzy/conf/KafkaProviderConfig.java | 20 ------
src/main/java/com/fzzy/gateway/hx2023/websocket/WebSocketMqtt.java | 83 +++++++++++++++++++++++----
src/main/java/com/fzzy/gateway/service/GatewayConfService.java | 15 +++++
6 files changed, 87 insertions(+), 42 deletions(-)
diff --git a/src/main/java/com/fzzy/conf/KafkaProviderConfig.java b/src/main/java/com/fzzy/conf/KafkaProviderConfig.java
index d7a3c40..ed7fecd 100644
--- a/src/main/java/com/fzzy/conf/KafkaProviderConfig.java
+++ b/src/main/java/com/fzzy/conf/KafkaProviderConfig.java
@@ -39,9 +39,6 @@
@Value("${spring.kafka.properties.sasl.password}")
private String saslPassword;
-// @Value("${spring.kafka.producer.transaction-id-prefix}")
-// private String transactionIdPrefix;
-
@Bean
public Map<String, Object> producerConfigs() {
Map<String, Object> props = new HashMap<>();
@@ -59,10 +56,6 @@
props.put("sasl.username", saslUsername);
props.put("sasl.password", saslPassword);
-
- // props.put("sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule required username='sc001' password='wCV0ISwmoKwbx1lpBKMW';");
-
-
props.put("sasl.jaas.config", saslJaasConfig);
return props;
@@ -73,19 +66,6 @@
DefaultKafkaProducerFactory<String, String> producerFactory = new DefaultKafkaProducerFactory<>(producerConfigs());
return producerFactory;
}
-
-// @Bean
-// public ProducerFactory<Object, Object> producerFactory() {
-// DefaultKafkaProducerFactory<Object, Object> factory = new DefaultKafkaProducerFactory<>(producerConfigs());
-// //寮�鍚簨鍔★紝浼氬鑷� LINGER_MS_CONFIG 閰嶇疆澶辨晥
-// // factory.setTransactionIdPrefix(transactionIdPrefix);
-// return factory;
-// }
-
-// @Bean
-// public KafkaTransactionManager<Object, Object> kafkaTransactionManager(ProducerFactory<Object, Object> producerFactory) {
-// return new KafkaTransactionManager<>(producerFactory);
-// }
@Bean
public KafkaTemplate<String, String> kafkaTemplate(ProducerFactory<String, String> producerFactory) {
diff --git a/src/main/java/com/fzzy/gateway/entity/GatewayConf.java b/src/main/java/com/fzzy/gateway/entity/GatewayConf.java
index 622cc8d..c697ffe 100644
--- a/src/main/java/com/fzzy/gateway/entity/GatewayConf.java
+++ b/src/main/java/com/fzzy/gateway/entity/GatewayConf.java
@@ -76,10 +76,6 @@
@Column(name = "apiUrl", length = 200)
private String apiUrl;
- @PropertyDef(label = "appId")
- @Column(name = "appId", length = 50)
- private String appId;
-
@PropertyDef(label = "鎺ㄩ�佸崗璁�")
@Column(name = "pushProtocol", length = 20)
private String pushProtocol;
diff --git a/src/main/java/com/fzzy/gateway/hx2023/controller/GatewayController.java b/src/main/java/com/fzzy/gateway/hx2023/controller/GatewayController.java
index a6d3166..8ad1617 100644
--- a/src/main/java/com/fzzy/gateway/hx2023/controller/GatewayController.java
+++ b/src/main/java/com/fzzy/gateway/hx2023/controller/GatewayController.java
@@ -16,7 +16,7 @@
*/
@Slf4j
@Controller
-@RequestMapping
+@RequestMapping("/sc2023/gateway")
public class GatewayController {
@Resource
diff --git a/src/main/java/com/fzzy/gateway/hx2023/websocket/WebSocketDeviceReport.java b/src/main/java/com/fzzy/gateway/hx2023/websocket/WebSocketDeviceReport.java
index 8d7622a..6edc518 100644
--- a/src/main/java/com/fzzy/gateway/hx2023/websocket/WebSocketDeviceReport.java
+++ b/src/main/java/com/fzzy/gateway/hx2023/websocket/WebSocketDeviceReport.java
@@ -14,7 +14,7 @@
import java.util.concurrent.ConcurrentHashMap;
/**
- * 璁惧鎭㈠淇℃伅鐨刉EBSOCKET
+ * 鍩轰簬鍓嶇 websocket 璁㈤槄鍦扮鏁版嵁 topic:
*/
@Slf4j
@Component
@@ -113,8 +113,7 @@
packet.getHeaders().setProductId(productId);
-
- log.debug("----------杩斿洖淇℃伅-----{}", packet);
+ log.debug("----------websocket杩斿洖淇℃伅-----{}", packet);
if (key.indexOf(tag) != -1) {
session = sessionPool.get(key);
diff --git a/src/main/java/com/fzzy/gateway/hx2023/websocket/WebSocketMqtt.java b/src/main/java/com/fzzy/gateway/hx2023/websocket/WebSocketMqtt.java
index affbda3..28e5666 100644
--- a/src/main/java/com/fzzy/gateway/hx2023/websocket/WebSocketMqtt.java
+++ b/src/main/java/com/fzzy/gateway/hx2023/websocket/WebSocketMqtt.java
@@ -1,15 +1,20 @@
package com.fzzy.gateway.hx2023.websocket;
+import com.alibaba.fastjson.JSONObject;
+import com.fzzy.gateway.GatewayUtils;
+import com.fzzy.gateway.hx2023.data.WebSocketPacket;
import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang.StringUtils;
import org.springframework.stereotype.Component;
import javax.websocket.*;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
-import java.util.HashMap;
+import java.util.List;
import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
/**
*
@@ -19,30 +24,41 @@
@ServerEndpoint(value = "/mqtt")
public class WebSocketMqtt {
- private static Map<String, Session> sessionPool = new HashMap<>();
+ private static Map<String, Session> sessionPool = new ConcurrentHashMap<>();
+ private static Map<String, String> sessionIds = new ConcurrentHashMap<>();
// 涓庢煇涓鎴风鐨勮繛鎺ヤ細璇濓紝闇�瑕侀�氳繃瀹冩潵缁欏鎴风鍙戦�佹暟鎹�
private Session session;
- private String key;
-
-
+
+
@OnOpen
- public void onOpen(Session session,
+ public void onOpen(Session session) throws Exception {
+
+ this.session = session;
+
+ Map<String, List<String>> params = session.getRequestParameterMap();
+
+ log.info("new webSocket,params={}", params);
+ }
+
+ public void onOpen2(Session session,
@PathParam("keepalive") String keepalive,
@PathParam("clientId") String clientId,
@PathParam("protocolId") String protocolId,
@PathParam("protocolVersion") String protocolVersion,
- @PathParam("clean") String clean,
- @PathParam("reconnectPeriod") String reconnectPeriod,
- @PathParam("reconnectTimeout") String reconnectTimeout
+ @PathParam("clean") boolean clean,
+ @PathParam("reconnectPeriod") int reconnectPeriod,
+ @PathParam("connectTimeout") int connectTimeout
) throws Exception {
this.session = session;
- key = clientId;
+ String key = clientId;
sessionPool.put(key, session);
+ sessionIds.put(session.getId(), key);
+ GatewayUtils.updateOnline(clientId);
log.info("new webSocket,clientId={}", key);
}
@@ -50,9 +66,16 @@
@OnClose
public void onClose() {
- sessionPool.remove(key);
+// String key = sessionIds.get(session.getId());
+//
+// String deviceId = key.substring(key.indexOf("-"));
+//
+// GatewayUtils.updateOffOnline(deviceId);
+//
+// sessionPool.remove(key);
+// sessionIds.remove(session.getId());
- log.info("WebSocket杩炴帴鍏抽棴={}", key);
+ log.info("WebSocket杩炴帴鍏抽棴={}","----------");
}
@@ -72,9 +95,41 @@
public void onError(Session session, Throwable error) {
log.error("鍙戠敓閿欒");
- sessionPool.remove(key);
-
+// String clientId = sessionIds.get(session.getId());
+//
+// sessionPool.remove(clientId);
+// sessionIds.remove(session.getId());
error.printStackTrace();
}
+
+ /**
+ * 鎺ㄩ�佷俊鎭埌鍓嶇
+ *
+ * @param packet
+ */
+ public void sendByPacket(WebSocketPacket packet) {
+ if (StringUtils.isEmpty(packet.getDeviceId())) {
+ log.error("WebSocket淇℃伅鎺ㄩ�佸け璐ワ紝璁惧缂栫爜涓虹┖銆�");
+ return;
+ }
+
+ String tag = packet.getDeviceId();
+
+ // 閬嶅巻鎺ㄩ��
+ Session session;
+ for (String key : sessionPool.keySet()) {
+
+ packet.getHeaders().setProductId(key);
+
+ log.debug("----------websocket杩斿洖淇℃伅-----{}", packet);
+
+ if (key.indexOf(tag) != -1) {
+ session = sessionPool.get(key);
+ session.getAsyncRemote().sendText(JSONObject.toJSONString(packet));
+ }
+ }
+ }
+
+
}
diff --git a/src/main/java/com/fzzy/gateway/service/GatewayConfService.java b/src/main/java/com/fzzy/gateway/service/GatewayConfService.java
index 68baebd..eabbf30 100644
--- a/src/main/java/com/fzzy/gateway/service/GatewayConfService.java
+++ b/src/main/java/com/fzzy/gateway/service/GatewayConfService.java
@@ -56,4 +56,19 @@
gatewayConfRep.delete(data2);
return null;
}
+
+
+ /**
+ * gatewayConfService#delData
+ *
+ * @param data
+ */
+ @Expose
+ public String flush(GatewayConf data) {
+
+ GatewayConf data2 = new GatewayConf();
+ BeanUtils.copyProperties(data, data2);
+ gatewayConfRep.delete(data2);
+ return null;
+ }
}
--
Gitblit v1.9.3