src/main/java/com/fzzy/conf/KafkaProviderConfig.java | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
src/main/java/com/fzzy/gateway/entity/GatewayConf.java | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
src/main/java/com/fzzy/gateway/hx2023/controller/GatewayController.java | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
src/main/java/com/fzzy/gateway/hx2023/websocket/WebSocketDeviceReport.java | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
src/main/java/com/fzzy/gateway/hx2023/websocket/WebSocketMqtt.java | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
src/main/java/com/fzzy/gateway/service/GatewayConfService.java | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 |
src/main/java/com/fzzy/conf/KafkaProviderConfig.java
@@ -39,9 +39,6 @@ @Value("${spring.kafka.properties.sasl.password}") private String saslPassword; // @Value("${spring.kafka.producer.transaction-id-prefix}") // private String transactionIdPrefix; @Bean public Map<String, Object> producerConfigs() { Map<String, Object> props = new HashMap<>(); @@ -59,10 +56,6 @@ props.put("sasl.username", saslUsername); props.put("sasl.password", saslPassword); // props.put("sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule required username='sc001' password='wCV0ISwmoKwbx1lpBKMW';"); props.put("sasl.jaas.config", saslJaasConfig); return props; @@ -73,19 +66,6 @@ DefaultKafkaProducerFactory<String, String> producerFactory = new DefaultKafkaProducerFactory<>(producerConfigs()); return producerFactory; } // @Bean // public ProducerFactory<Object, Object> producerFactory() { // DefaultKafkaProducerFactory<Object, Object> factory = new DefaultKafkaProducerFactory<>(producerConfigs()); // //开启事务,会导致 LINGER_MS_CONFIG 配置失效 // // factory.setTransactionIdPrefix(transactionIdPrefix); // return factory; // } // @Bean // public KafkaTransactionManager<Object, Object> kafkaTransactionManager(ProducerFactory<Object, Object> producerFactory) { // return new KafkaTransactionManager<>(producerFactory); // } @Bean public KafkaTemplate<String, String> kafkaTemplate(ProducerFactory<String, String> producerFactory) { src/main/java/com/fzzy/gateway/entity/GatewayConf.java
@@ -76,10 +76,6 @@ @Column(name = "apiUrl", length = 200) private String apiUrl; @PropertyDef(label = "appId") @Column(name = "appId", length = 50) private String appId; @PropertyDef(label = "推送协议") @Column(name = "pushProtocol", length = 20) private String pushProtocol; src/main/java/com/fzzy/gateway/hx2023/controller/GatewayController.java
@@ -16,7 +16,7 @@ */ @Slf4j @Controller @RequestMapping @RequestMapping("/sc2023/gateway") public class GatewayController { @Resource src/main/java/com/fzzy/gateway/hx2023/websocket/WebSocketDeviceReport.java
@@ -14,7 +14,7 @@ import java.util.concurrent.ConcurrentHashMap; /** * 设备恢复信息的WEBSOCKET * 基于前端 websocket 订阅地磅数据 topic: */ @Slf4j @Component @@ -113,8 +113,7 @@ packet.getHeaders().setProductId(productId); log.debug("----------返回信息-----{}", packet); log.debug("----------websocket返回信息-----{}", packet); if (key.indexOf(tag) != -1) { session = sessionPool.get(key); src/main/java/com/fzzy/gateway/hx2023/websocket/WebSocketMqtt.java
@@ -1,15 +1,20 @@ package com.fzzy.gateway.hx2023.websocket; import com.alibaba.fastjson.JSONObject; import com.fzzy.gateway.GatewayUtils; import com.fzzy.gateway.hx2023.data.WebSocketPacket; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang.StringUtils; import org.springframework.stereotype.Component; import javax.websocket.*; import javax.websocket.server.PathParam; import javax.websocket.server.ServerEndpoint; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; /** * @@ -19,30 +24,41 @@ @ServerEndpoint(value = "/mqtt") public class WebSocketMqtt { private static Map<String, Session> sessionPool = new HashMap<>(); private static Map<String, Session> sessionPool = new ConcurrentHashMap<>(); private static Map<String, String> sessionIds = new ConcurrentHashMap<>(); // 与某个客户端的连接会话,需要通过它来给客户端发送数据 private Session session; private String key; @OnOpen public void onOpen(Session session, public void onOpen(Session session) throws Exception { this.session = session; Map<String, List<String>> params = session.getRequestParameterMap(); log.info("new webSocket,params={}", params); } public void onOpen2(Session session, @PathParam("keepalive") String keepalive, @PathParam("clientId") String clientId, @PathParam("protocolId") String protocolId, @PathParam("protocolVersion") String protocolVersion, @PathParam("clean") String clean, @PathParam("reconnectPeriod") String reconnectPeriod, @PathParam("reconnectTimeout") String reconnectTimeout @PathParam("clean") boolean clean, @PathParam("reconnectPeriod") int reconnectPeriod, @PathParam("connectTimeout") int connectTimeout ) throws Exception { this.session = session; key = clientId; String key = clientId; sessionPool.put(key, session); sessionIds.put(session.getId(), key); GatewayUtils.updateOnline(clientId); log.info("new webSocket,clientId={}", key); } @@ -50,9 +66,16 @@ @OnClose public void onClose() { sessionPool.remove(key); // String key = sessionIds.get(session.getId()); // // String deviceId = key.substring(key.indexOf("-")); // // GatewayUtils.updateOffOnline(deviceId); // // sessionPool.remove(key); // sessionIds.remove(session.getId()); log.info("WebSocket连接关闭={}", key); log.info("WebSocket连接关闭={}","----------"); } @@ -72,9 +95,41 @@ public void onError(Session session, Throwable error) { log.error("发生错误"); sessionPool.remove(key); // String clientId = sessionIds.get(session.getId()); // // sessionPool.remove(clientId); // sessionIds.remove(session.getId()); error.printStackTrace(); } /** * 推送信息到前端 * * @param packet */ public void sendByPacket(WebSocketPacket packet) { if (StringUtils.isEmpty(packet.getDeviceId())) { log.error("WebSocket信息推送失败,设备编码为空。"); return; } String tag = packet.getDeviceId(); // 遍历推送 Session session; for (String key : sessionPool.keySet()) { packet.getHeaders().setProductId(key); log.debug("----------websocket返回信息-----{}", packet); if (key.indexOf(tag) != -1) { session = sessionPool.get(key); session.getAsyncRemote().sendText(JSONObject.toJSONString(packet)); } } } } src/main/java/com/fzzy/gateway/service/GatewayConfService.java
@@ -56,4 +56,19 @@ gatewayConfRep.delete(data2); return null; } /** * gatewayConfService#delData * * @param data */ @Expose public String flush(GatewayConf data) { GatewayConf data2 = new GatewayConf(); BeanUtils.copyProperties(data, data2); gatewayConfRep.delete(data2); return null; } }