jiazx0107@163.com
2023-11-07 f75ee38163880a4a12373bf1d4750056a72272d2
四川网关-提交测试
已修改6个文件
129 ■■■■■ 文件已修改
src/main/java/com/fzzy/conf/KafkaProviderConfig.java 20 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/fzzy/gateway/entity/GatewayConf.java 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/fzzy/gateway/hx2023/controller/GatewayController.java 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/fzzy/gateway/hx2023/websocket/WebSocketDeviceReport.java 5 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/fzzy/gateway/hx2023/websocket/WebSocketMqtt.java 83 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/fzzy/gateway/service/GatewayConfService.java 15 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/fzzy/conf/KafkaProviderConfig.java
@@ -39,9 +39,6 @@
    @Value("${spring.kafka.properties.sasl.password}")
    private String saslPassword;
//    @Value("${spring.kafka.producer.transaction-id-prefix}")
//    private String transactionIdPrefix;
    @Bean
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
@@ -59,10 +56,6 @@
        props.put("sasl.username", saslUsername);
        props.put("sasl.password", saslPassword);
        //  props.put("sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule required username='sc001' password='wCV0ISwmoKwbx1lpBKMW';");
        props.put("sasl.jaas.config", saslJaasConfig);
        return props;
@@ -73,19 +66,6 @@
        DefaultKafkaProducerFactory<String, String> producerFactory = new DefaultKafkaProducerFactory<>(producerConfigs());
        return producerFactory;
    }
//    @Bean
//    public ProducerFactory<Object, Object> producerFactory() {
//        DefaultKafkaProducerFactory<Object, Object> factory = new DefaultKafkaProducerFactory<>(producerConfigs());
//        //开启事务,会导致 LINGER_MS_CONFIG 配置失效
//       // factory.setTransactionIdPrefix(transactionIdPrefix);
//        return factory;
//    }
//    @Bean
//    public KafkaTransactionManager<Object, Object> kafkaTransactionManager(ProducerFactory<Object, Object> producerFactory) {
//        return new KafkaTransactionManager<>(producerFactory);
//    }
    @Bean
    public KafkaTemplate<String, String> kafkaTemplate(ProducerFactory<String, String> producerFactory) {
src/main/java/com/fzzy/gateway/entity/GatewayConf.java
@@ -76,10 +76,6 @@
    @Column(name = "apiUrl", length = 200)
    private String apiUrl;
    @PropertyDef(label = "appId")
    @Column(name = "appId", length = 50)
    private String appId;
    @PropertyDef(label = "推送协议")
    @Column(name = "pushProtocol", length = 20)
    private String pushProtocol;
src/main/java/com/fzzy/gateway/hx2023/controller/GatewayController.java
@@ -16,7 +16,7 @@
 */
@Slf4j
@Controller
@RequestMapping
@RequestMapping("/sc2023/gateway")
public class GatewayController {
    @Resource
src/main/java/com/fzzy/gateway/hx2023/websocket/WebSocketDeviceReport.java
@@ -14,7 +14,7 @@
import java.util.concurrent.ConcurrentHashMap;
/**
 * 设备恢复信息的WEBSOCKET
 * 基于前端 websocket 订阅地磅数据 topic:
 */
@Slf4j
@Component
@@ -113,8 +113,7 @@
            packet.getHeaders().setProductId(productId);
            log.debug("----------返回信息-----{}", packet);
            log.debug("----------websocket返回信息-----{}", packet);
            if (key.indexOf(tag) != -1) {
                session = sessionPool.get(key);
src/main/java/com/fzzy/gateway/hx2023/websocket/WebSocketMqtt.java
@@ -1,15 +1,20 @@
package com.fzzy.gateway.hx2023.websocket;
import com.alibaba.fastjson.JSONObject;
import com.fzzy.gateway.GatewayUtils;
import com.fzzy.gateway.hx2023.data.WebSocketPacket;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang.StringUtils;
import org.springframework.stereotype.Component;
import javax.websocket.*;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
 *
@@ -19,30 +24,41 @@
@ServerEndpoint(value = "/mqtt")
public class WebSocketMqtt {
    private static Map<String, Session> sessionPool = new HashMap<>();
    private static Map<String, Session> sessionPool = new ConcurrentHashMap<>();
    private static Map<String, String> sessionIds = new ConcurrentHashMap<>();
    // 与某个客户端的连接会话,需要通过它来给客户端发送数据
    private Session session;
    private String key;
    @OnOpen
    public void onOpen(Session session,
    public void onOpen(Session session) throws Exception {
        this.session = session;
        Map<String, List<String>> params =  session.getRequestParameterMap();
        log.info("new webSocket,params={}", params);
    }
    public void onOpen2(Session session,
                       @PathParam("keepalive") String keepalive,
                       @PathParam("clientId") String clientId,
                       @PathParam("protocolId") String protocolId,
                       @PathParam("protocolVersion") String protocolVersion,
                       @PathParam("clean") String clean,
                       @PathParam("reconnectPeriod") String reconnectPeriod,
                       @PathParam("reconnectTimeout") String reconnectTimeout
                       @PathParam("clean") boolean clean,
                       @PathParam("reconnectPeriod") int reconnectPeriod,
                       @PathParam("connectTimeout") int connectTimeout
    ) throws Exception {
        this.session = session;
        key = clientId;
        String key = clientId;
        sessionPool.put(key, session);
        sessionIds.put(session.getId(), key);
        GatewayUtils.updateOnline(clientId);
        log.info("new webSocket,clientId={}", key);
    }
@@ -50,9 +66,16 @@
    @OnClose
    public void onClose() {
        sessionPool.remove(key);
//        String key = sessionIds.get(session.getId());
//
//        String deviceId = key.substring(key.indexOf("-"));
//
//        GatewayUtils.updateOffOnline(deviceId);
//
//        sessionPool.remove(key);
//        sessionIds.remove(session.getId());
        log.info("WebSocket连接关闭={}", key);
        log.info("WebSocket连接关闭={}","----------");
    }
@@ -72,9 +95,41 @@
    public void onError(Session session, Throwable error) {
        log.error("发生错误");
        sessionPool.remove(key);
//        String clientId = sessionIds.get(session.getId());
//
//        sessionPool.remove(clientId);
//        sessionIds.remove(session.getId());
        error.printStackTrace();
    }
    /**
     * 推送信息到前端
     *
     * @param packet
     */
    public void sendByPacket(WebSocketPacket packet) {
        if (StringUtils.isEmpty(packet.getDeviceId())) {
            log.error("WebSocket信息推送失败,设备编码为空。");
            return;
        }
        String tag = packet.getDeviceId();
        // 遍历推送
        Session session;
        for (String key : sessionPool.keySet()) {
            packet.getHeaders().setProductId(key);
            log.debug("----------websocket返回信息-----{}", packet);
            if (key.indexOf(tag) != -1) {
                session = sessionPool.get(key);
                session.getAsyncRemote().sendText(JSONObject.toJSONString(packet));
            }
        }
    }
}
src/main/java/com/fzzy/gateway/service/GatewayConfService.java
@@ -56,4 +56,19 @@
        gatewayConfRep.delete(data2);
        return null;
    }
    /**
     * gatewayConfService#delData
     *
     * @param data
     */
    @Expose
    public String flush(GatewayConf data) {
        GatewayConf data2 = new GatewayConf();
        BeanUtils.copyProperties(data, data2);
        gatewayConfRep.delete(data2);
        return null;
    }
}