From bf57ab9e4db58dbee018268dd8b593ee564bc7ee Mon Sep 17 00:00:00 2001
From: vince <757871790@qq.com>
Date: 星期四, 09 十一月 2023 12:07:09 +0800
Subject: [PATCH] Merge remote-tracking branch 'orgin/igds-api-gateway' into igds-api-gateway

---
 src/main/java/com/fzzy/gateway/hx2023/service/ApiInitService.java       |    9 +
 /dev/null                                                               |   26 ---
 src/main/resources/application-devGateway.yml                           |   14 +-
 src/main/java/com/fzzy/gateway/service/GatewayDeviceService2.java       |   18 ++
 src/main/java/com/fzzy/mqtt/MqttConsumerConfig.java                     |  147 ++++++++++----------
 src/main/java/com/fzzy/mqtt/MqttProviderConfig.java                     |  116 +++++-----------
 src/main/java/com/fzzy/mqtt/MqttGatewayService.java                     |   31 ++++
 src/main/java/com/fzzy/mqtt/MqttProperties.java                         |   15 +-
 src/main/java/com/fzzy/gateway/hx2023/service/OnReceiveMqttService.java |   13 +
 src/main/java/com/fzzy/gateway/GatewayRunner.java                       |    3 
 src/main/java/com/fzzy/mqtt/MqttPublishService.java                     |    2 
 11 files changed, 192 insertions(+), 202 deletions(-)

diff --git a/src/main/java/com/fzzy/gateway/GatewayRunner.java b/src/main/java/com/fzzy/gateway/GatewayRunner.java
index 9b85744..1f05550 100644
--- a/src/main/java/com/fzzy/gateway/GatewayRunner.java
+++ b/src/main/java/com/fzzy/gateway/GatewayRunner.java
@@ -36,6 +36,9 @@
         //鑾峰彇姘旇薄淇℃伅
         scheduled.doWeatherExe();
 
+        //鏇存柊璁惧缂撳瓨
+        apiInitService.updateDeviceCache();
+
     }
 
 }
diff --git a/src/main/java/com/fzzy/gateway/hx2023/service/ApiInitService.java b/src/main/java/com/fzzy/gateway/hx2023/service/ApiInitService.java
index f9b093f..75a354b 100644
--- a/src/main/java/com/fzzy/gateway/hx2023/service/ApiInitService.java
+++ b/src/main/java/com/fzzy/gateway/hx2023/service/ApiInitService.java
@@ -7,6 +7,7 @@
 import com.fzzy.gateway.service.GatewayConfService;
 
 
+import com.fzzy.gateway.service.GatewayDeviceService;
 import org.springframework.stereotype.Component;
 
 import javax.annotation.Resource;
@@ -21,6 +22,8 @@
 
     @Resource
     private GatewayConfService confService;
+    @Resource
+    private GatewayDeviceService deviceService;
     @Resource
     private GatewayRemoteManager gatewayRemoteManager;
 
@@ -44,7 +47,9 @@
         }
 
     }
-    
-    
 
+
+    public void updateDeviceCache() {
+        deviceService.flushCache();
+    }
 }
diff --git a/src/main/java/com/fzzy/gateway/hx2023/service/OnReceiveMqttService.java b/src/main/java/com/fzzy/gateway/hx2023/service/OnReceiveMqttService.java
index e782bd8..f2eed2f 100644
--- a/src/main/java/com/fzzy/gateway/hx2023/service/OnReceiveMqttService.java
+++ b/src/main/java/com/fzzy/gateway/hx2023/service/OnReceiveMqttService.java
@@ -8,6 +8,7 @@
 import com.fzzy.gateway.hx2023.ScConstant;
 import com.fzzy.gateway.hx2023.data.CloudSendData;
 import com.fzzy.gateway.hx2023.data.SyncReqData;
+import com.fzzy.mqtt.MqttGatewayService;
 import com.fzzy.mqtt.MqttProviderConfig;
 import com.fzzy.mqtt.MqttPublishService;
 import lombok.extern.slf4j.Slf4j;
@@ -25,7 +26,7 @@
     @Resource
     private GatewayRemoteManager gatewayRemoteManager;
     @Resource
-    private MqttPublishService publishService;
+    private MqttGatewayService mqttGatewayService;
 
 
     /**
@@ -33,7 +34,7 @@
      *
      * @param message
      */
-    public void onReceiveMessage(String message) {
+    public void onReceiveMessage(String topic,String message) {
 
         try {
             CloudSendData cloudSendData = JSONObject.parseObject(message, CloudSendData.class);
@@ -47,7 +48,7 @@
             }
 
         } catch (Exception e) {
-
+            log.error("--------鎵ц寮傚父-----{}",e);
         }
 
     }
@@ -74,9 +75,11 @@
 
             topic = topic.replace("${productId}", device.getProductId()).replace("${deviceId}", device.getDeviceId());
 
-            publishService.publishMsg(topic, resp.getData());
+            mqttGatewayService.publishMqttWithTopic(resp.getData(), topic);
 
-            log.info("=======绮儏鎺ㄩ��==========={}", resp.getData());
+            log.info("----------------------------鎺ㄩ�丮QTT淇℃伅---------------------------");
+            log.info("-----TOPIC-----{}",topic);
+            log.info("-----Message-----{}",resp.getData());
         }
     }
 }
diff --git a/src/main/java/com/fzzy/gateway/service/GatewayDeviceService2.java b/src/main/java/com/fzzy/gateway/service/GatewayDeviceService2.java
index c17e815..f30ac51 100644
--- a/src/main/java/com/fzzy/gateway/service/GatewayDeviceService2.java
+++ b/src/main/java/com/fzzy/gateway/service/GatewayDeviceService2.java
@@ -11,9 +11,11 @@
 import com.fzzy.gateway.hx2023.ScConstant;
 import com.fzzy.gateway.hx2023.data.SyncReqData;
 import com.fzzy.gateway.service.repository.GatewayDeviceRep;
+import com.fzzy.mqtt.MqttGatewayService;
 import com.fzzy.mqtt.MqttProviderConfig;
 import com.fzzy.mqtt.MqttPublishService;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang.StringUtils;
 import org.springframework.beans.BeanUtils;
 import org.springframework.data.domain.Sort;
 import org.springframework.stereotype.Component;
@@ -31,7 +33,7 @@
     @Resource
     private GatewayRemoteManager gatewayRemoteManager;
     @Resource
-    private MqttPublishService publishService;
+    private MqttGatewayService publishService;
 
     public List<GatewayDevice> listAll() {
         Sort sort = new Sort(Sort.Direction.ASC, "deviceId");
@@ -86,6 +88,7 @@
     @Expose
     public String ajaxTestGrain2(GatewayDevice data) {
 
+
         SyncReqData reqData = new SyncReqData();
         reqData.setDevice(data);
         reqData.setAutoReplay(true);
@@ -98,6 +101,10 @@
             return "ERROR锛氬綋鍓嶈澶囬潪绮儏璁惧涓嶆敮鎸佸綋鍓嶆搷浣�";
         }
 
+        if(StringUtils.isEmpty(data.getCableRule())){
+            return "ERROR锛氬綋鍓嶈澶囨病鏈夐厤缃竷绾胯鍒欙紝鏃犳硶鎵ц";
+        }
+
         BaseResp resp = gatewayRemoteManager.getSyncGrainService(data.getSyncProtocol()).syncGrain2(reqData);
 
         //鑷姩鎺ㄩ��
@@ -105,10 +112,13 @@
             String topic = ScConstant.TOPIC_REPORT;
             topic = topic.replace("${productId}", data.getProductId()).replace("${deviceId}", data.getDeviceId());
 
-            publishService.publishMsg(topic, resp.getData());
-        }
+            publishService.publishMqttWithTopic(resp.getData(),topic);
 
-        log.info("=======鎵嬪姩娴嬭瘯绮儏鎺ㄩ��==========={}", resp.getData());
+            log.info("----------------------------鎵嬪姩鎺ㄩ�丮QTT绮儏淇℃伅---------------------------");
+            log.info("-----TOPIC-----{}",topic);
+            log.info("-----Message-----{}",resp.getData());
+
+        }
 
         return "SUCCESS锛氭墽琛屽畬鎴�";
     }
diff --git a/src/main/java/com/fzzy/mqtt/MqttConsumerCallBack.java b/src/main/java/com/fzzy/mqtt/MqttConsumerCallBack.java
deleted file mode 100644
index 127cfe8..0000000
--- a/src/main/java/com/fzzy/mqtt/MqttConsumerCallBack.java
+++ /dev/null
@@ -1,53 +0,0 @@
-package com.fzzy.mqtt;
-
-import com.fzzy.gateway.hx2023.service.OnReceiveMqttService;
-import lombok.extern.slf4j.Slf4j;
-import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
-import org.eclipse.paho.client.mqttv3.MqttCallback;
-import org.eclipse.paho.client.mqttv3.MqttMessage;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Component;
-
-@Slf4j
-@Component
-public class MqttConsumerCallBack implements MqttCallback {
-
-
-    @Autowired
-    private OnReceiveMqttService onReceiveMqttService;
-
-    /**
-     * 瀹㈡埛绔柇寮�杩炴帴鐨勫洖璋�
-     */
-    @Override
-    public void connectionLost(Throwable throwable) {
-        log.info("涓庢湇鍔″櫒鏂紑杩炴帴锛屽彲閲嶈繛");
-    }
-
-    /**
-     * 娑堟伅鍒拌揪鐨勫洖璋�
-     */
-    @Override
-    public void messageArrived(String topic, MqttMessage message) throws Exception {
-
-
-        String messageStr = new String(message.getPayload());
-
-
-        log.info(String.format("鎺ユ敹娑堟伅涓婚 : %s", topic));
-        log.info(String.format("鎺ユ敹娑堟伅Qos : %d", message.getQos()));
-        log.info(String.format("鎺ユ敹娑堟伅鍐呭 : %s", messageStr));
-
-        log.info(String.format("鎺ユ敹娑堟伅retained : %b", message.isRetained()));
-
-        onReceiveMqttService.onReceiveMessage(messageStr);
-    }
-
-    /**
-     * 娑堟伅鍙戝竷鎴愬姛鐨勫洖璋�
-     */
-    @Override
-    public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
-        log.info(String.format("鎺ユ敹娑堟伅鎴愬姛"));
-    }
-}
diff --git a/src/main/java/com/fzzy/mqtt/MqttConsumerConfig.java b/src/main/java/com/fzzy/mqtt/MqttConsumerConfig.java
index 076e3f7..854d9e5 100644
--- a/src/main/java/com/fzzy/mqtt/MqttConsumerConfig.java
+++ b/src/main/java/com/fzzy/mqtt/MqttConsumerConfig.java
@@ -1,93 +1,98 @@
 package com.fzzy.mqtt;
 
+import com.fzzy.gateway.hx2023.service.OnReceiveMqttService;
 import lombok.extern.slf4j.Slf4j;
-import org.eclipse.paho.client.mqttv3.*;
-import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
+import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
 import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
+import org.springframework.integration.annotation.IntegrationComponentScan;
+import org.springframework.integration.annotation.ServiceActivator;
+import org.springframework.integration.channel.DirectChannel;
+import org.springframework.integration.core.MessageProducer;
+import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
+import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
+import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
+import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
+import org.springframework.messaging.Message;
+import org.springframework.messaging.MessageChannel;
+import org.springframework.messaging.MessageHandler;
+import org.springframework.messaging.MessagingException;
 
-import javax.annotation.PostConstruct;
+import java.util.Arrays;
+import java.util.List;
 
 @Configuration
 @Slf4j
+@IntegrationComponentScan
 public class MqttConsumerConfig {
 
     @Autowired
-    private MqttProperties mqttProperties;
+    private OnReceiveMqttService onReceiveMqttService;
+
     @Autowired
-    private MqttConsumerCallBack mqttConsumerCallBack;
+    private MqttProperties mqttProperties;
 
-    /**
-     * 瀹㈡埛绔璞�
-     */
-    private MqttClient client;
-
-    /**
-     * 鍦╞ean鍒濆鍖栧悗杩炴帴鍒版湇鍔″櫒
-     */
-    @PostConstruct
-    public void init() {
-        connect();
+    @Bean
+    public MqttConnectOptions getReceiverMqttConnectOptionsForSub() {
+        MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
+        mqttConnectOptions.setUserName(mqttProperties.getUsername());
+        mqttConnectOptions.setPassword(mqttProperties.getPassword().toCharArray());
+        List<String> hostList = Arrays.asList(mqttProperties.getHost().trim().split(","));
+        String[] serverURIs = new String[hostList.size()];
+        hostList.toArray(serverURIs);
+        mqttConnectOptions.setServerURIs(serverURIs);
+        mqttConnectOptions.setKeepAliveInterval(mqttProperties.getKeepAliveInterval());
+        return mqttConnectOptions;
     }
 
-    /**
-     * 瀹㈡埛绔繛鎺ユ湇鍔$
-     */
-    public void connect() {
-        try {
-            //鍒涘缓MQTT瀹㈡埛绔璞�
-            client = new MqttClient(mqttProperties.getHost(), mqttProperties.getClientId(), new MemoryPersistence());
-            //杩炴帴璁剧疆
-            MqttConnectOptions options = new MqttConnectOptions();
-            //鏄惁娓呯┖session锛岃缃负false琛ㄧず鏈嶅姟鍣ㄤ細淇濈暀瀹㈡埛绔殑杩炴帴璁板綍锛屽鎴风閲嶈繛涔嬪悗鑳借幏鍙栧埌鏈嶅姟鍣ㄥ湪瀹㈡埛绔柇寮�杩炴帴鏈熼棿鎺ㄩ�佺殑娑堟伅
-            //璁剧疆涓簍rue琛ㄧず姣忔杩炴帴鍒版湇鍔$閮芥槸浠ユ柊鐨勮韩浠�
-            options.setCleanSession(true);
-            //璁剧疆杩炴帴鐢ㄦ埛鍚�
-            options.setUserName(mqttProperties.getUsername());
-            //璁剧疆杩炴帴瀵嗙爜
-            options.setPassword(mqttProperties.getPassword().toCharArray());
-            //璁剧疆瓒呮椂鏃堕棿锛屽崟浣嶄负绉�
-            options.setConnectionTimeout(10);
-            //璁剧疆蹇冭烦鏃堕棿 鍗曚綅涓虹锛岃〃绀烘湇鍔″櫒姣忛殧1.5*20绉掔殑鏃堕棿鍚戝鎴风鍙戦�佸績璺冲垽鏂鎴风鏄惁鍦ㄧ嚎
-            options.setKeepAliveInterval(20);
-            //璁剧疆閬楀槺娑堟伅鐨勮瘽棰橈紝鑻ュ鎴风鍜屾湇鍔″櫒涔嬮棿鐨勮繛鎺ユ剰澶栨柇寮�锛屾湇鍔″櫒灏嗗彂甯冨鎴风鐨勯仐鍢变俊鎭�
-            options.setWill("willTopic", (mqttProperties.getClientId() + "涓庢湇鍔″櫒鏂紑杩炴帴").getBytes(), 0, false);
-            //璁剧疆鍥炶皟
-            // client.setCallback(new MqttConsumerCallBack());
-            client.setCallback(mqttConsumerCallBack);
-            client.connect(options);
-            //璁㈤槄涓婚
-            //娑堟伅绛夌骇锛屽拰涓婚鏁扮粍涓�涓�瀵瑰簲锛屾湇鍔$灏嗘寜鐓ф寚瀹氱瓑绾х粰璁㈤槄浜嗕富棰樼殑瀹㈡埛绔帹閫佹秷鎭�
-            int[] qos = {1, 1};
-            //涓婚
-            String[] topics = mqttProperties.getTopics().split(",");
-            //璁㈤槄涓婚
-            client.subscribe(topics, qos);
-        } catch (MqttException e) {
-            e.printStackTrace();
-        }
+    @Bean
+    public MqttPahoClientFactory receiverMqttClientFactoryForSub() {
+        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
+        factory.setConnectionOptions(getReceiverMqttConnectOptionsForSub());
+        return factory;
     }
 
-    /**
-     * 鏂紑杩炴帴
-     */
-    public void disConnect() {
-        try {
-            client.disconnect();
-        } catch (MqttException e) {
-            e.printStackTrace();
-        }
+    //鎺ユ敹閫氶亾
+    @Bean
+    public MessageChannel mqttInputChannel() {
+        return new DirectChannel();
     }
 
+    //閰嶇疆client,鐩戝惉鐨則opic
+    @Bean
+    public MessageProducer inbound() {
 
-    /**
-     * 璁㈤槄涓婚
-     */
-    public void subscribe(String topic, int qos) {
-        try {
-            client.subscribe(topic, qos);
-        } catch (MqttException e) {
-            e.printStackTrace();
-        }
+        // List<String> topicList = Arrays.asList(defaultTopic.trim().split(","));
+        String[] topics = mqttProperties.getTopics().split(",");
+        //topicList.toArray(topics);
+
+        MqttPahoMessageDrivenChannelAdapter adapter =
+                new MqttPahoMessageDrivenChannelAdapter(mqttProperties.getClientInId(), receiverMqttClientFactoryForSub(), topics);
+        adapter.setCompletionTimeout(mqttProperties.getCompletionTimeout());
+        adapter.setConverter(new DefaultPahoMessageConverter());
+        adapter.setQos(mqttProperties.getQos());
+        adapter.setOutputChannel(mqttInputChannel());
+        return adapter;
     }
-}
+
+    //閫氳繃閫氶亾鑾峰彇鏁版嵁
+    @Bean
+    @ServiceActivator(inputChannel = "mqttInputChannel")
+    public MessageHandler handler() {
+        return new MessageHandler() {
+            @Override
+            public void handleMessage(Message<?> message) throws MessagingException {
+                String topic = message.getHeaders().get("mqtt_receivedTopic").toString();
+                String msg = message.getPayload().toString();
+
+                // 杩欓噷鍙互澶勭悊鎺ユ敹鐨勬暟鎹�
+                log.info("----------------------------鏀跺埌璁㈤槄鍐呭---------------------------");
+                log.info("-----TOPIC-----{}", topic);
+                log.info("-----Message-----{}", msg);
+
+                onReceiveMqttService.onReceiveMessage(topic, msg);
+            }
+        };
+    }
+}
\ No newline at end of file
diff --git a/src/main/java/com/fzzy/mqtt/MqttController.java b/src/main/java/com/fzzy/mqtt/MqttController.java
deleted file mode 100644
index e945120..0000000
--- a/src/main/java/com/fzzy/mqtt/MqttController.java
+++ /dev/null
@@ -1,30 +0,0 @@
-package com.fzzy.mqtt;
-
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Controller;
-import org.springframework.web.bind.annotation.RequestMapping;
-import org.springframework.web.bind.annotation.ResponseBody;
-
-@Controller
-public class MqttController {
-    @Autowired
-    private MqttConsumerConfig client;
-
-    @Autowired
-    private MqttProperties mqttProperties;
-
-    @RequestMapping("/connect")
-    public @ResponseBody
-    String connect() {
-        client.connect();
-        return mqttProperties.getClientId() + "杩炴帴鍒版湇鍔″櫒";
-    }
-
-    @RequestMapping("/disConnect")
-    @ResponseBody
-    public String disConnect() {
-        client.disConnect();
-        return mqttProperties.getClientId() + "涓庢湇鍔″櫒鏂紑杩炴帴";
-    }
-
-}
diff --git a/src/main/java/com/fzzy/mqtt/MqttGatewayService.java b/src/main/java/com/fzzy/mqtt/MqttGatewayService.java
new file mode 100644
index 0000000..f86ede5
--- /dev/null
+++ b/src/main/java/com/fzzy/mqtt/MqttGatewayService.java
@@ -0,0 +1,31 @@
+package com.fzzy.mqtt;
+
+import org.springframework.integration.annotation.MessagingGateway;
+import org.springframework.integration.mqtt.support.MqttHeaders;
+import org.springframework.messaging.handler.annotation.Header;
+
+
+@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
+public interface MqttGatewayService {
+
+    /**
+     * 鍙戦�佷俊鎭埌MQTT鏈嶅姟鍣�
+     *
+     * @param topic   涓婚
+     * @param message 娑堟伅涓讳綋
+     */
+    void publishMqttWithTopic(String message, @Header(MqttHeaders.TOPIC) String topic);
+
+    /**
+     * 鍙戦�佷俊鎭埌MQTT鏈嶅姟鍣�
+     *
+     * @param topic   涓婚
+     * @param qos     瀵规秷鎭鐞嗙殑鍑犵鏈哄埗銆�<br> 0 琛ㄧず鐨勬槸璁㈤槄鑰呮病鏀跺埌娑堟伅涓嶄細鍐嶆鍙戦�侊紝娑堟伅浼氫涪澶便��<br>
+     *                1 琛ㄧず鐨勬槸浼氬皾璇曢噸璇曪紝涓�鐩村埌鎺ユ敹鍒版秷鎭紝浣嗚繖绉嶆儏鍐靛彲鑳藉鑷磋闃呰�呮敹鍒板娆¢噸澶嶆秷鎭��<br>
+     *                2 澶氫簡涓�娆″幓閲嶇殑鍔ㄤ綔锛岀‘淇濊闃呰�呮敹鍒扮殑娑堟伅鏈変竴娆°��
+     * @param message 娑堟伅涓讳綋
+     */
+    void publishMqttWithTopic(String message, @Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos);
+
+
+}
diff --git a/src/main/java/com/fzzy/mqtt/MqttProperties.java b/src/main/java/com/fzzy/mqtt/MqttProperties.java
index 4d35f80..d35ea0c 100644
--- a/src/main/java/com/fzzy/mqtt/MqttProperties.java
+++ b/src/main/java/com/fzzy/mqtt/MqttProperties.java
@@ -33,12 +33,17 @@
     /**
      * 瀹㈡埛绔疘d
      */
-    private String clientId;
+    private String clientInId;
+
+    /**
+     * 瀹㈡埛绔疘d
+     */
+    private String clientOutId;
 
     /**
      * 瓒呮椂鏃堕棿
      */
-    private int timout = 5000;
+    private int completionTimeout = 5000;
 
     /**
      * 璁剧疆浼氳瘽蹇冭烦鏃堕棿 鍗曚綅涓虹 鏈嶅姟鍣ㄤ細姣忛殧1.5*20绉掔殑鏃堕棿鍚戝鎴风
@@ -59,12 +64,6 @@
      * 榛樿杩炴帴涓婚锛屼互/#缁撳熬琛ㄧず璁㈤槄鎵�鏈変互test寮�澶寸殑涓婚
      */
     private String defaultTopic;
-
-    /**
-     * 璁剧疆鏄惁娓呯┖session,杩欓噷濡傛灉璁剧疆涓篺alse琛ㄧず鏈嶅姟鍣ㄤ細淇濈暀瀹㈡埛绔殑杩�
-     * 鎺ヨ褰曪紝杩欓噷璁剧疆涓簍rue琛ㄧず姣忔杩炴帴鍒版湇鍔″櫒閮戒互鏂扮殑韬唤杩炴帴
-     */
-    private Boolean cleanSession;
 
     /**
      * 鏄惁鏂嚎閲嶈繛
diff --git a/src/main/java/com/fzzy/mqtt/MqttProviderCallBack.java b/src/main/java/com/fzzy/mqtt/MqttProviderCallBack.java
deleted file mode 100644
index 3b4de1c..0000000
--- a/src/main/java/com/fzzy/mqtt/MqttProviderCallBack.java
+++ /dev/null
@@ -1,39 +0,0 @@
-package com.fzzy.mqtt;
-
-import lombok.extern.slf4j.Slf4j;
-import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
-import org.eclipse.paho.client.mqttv3.MqttCallback;
-import org.eclipse.paho.client.mqttv3.MqttMessage;
-import org.springframework.stereotype.Component;
-
-@Slf4j
-@Component
-public class MqttProviderCallBack implements MqttCallback {
-
-    /**
-     * 瀹㈡埛绔柇寮�杩炴帴鐨勫洖璋�
-     */
-    @Override
-    public void connectionLost(Throwable throwable) {
-        System.out.println("涓庢湇鍔″櫒鏂紑杩炴帴锛屽彲閲嶈繛");
-    }
-
-    /**
-     * 娑堟伅鍒拌揪鐨勫洖璋�
-     */
-    @Override
-    public void messageArrived(String topic, MqttMessage message) throws Exception {
-        System.out.println(String.format("鎺ユ敹娑堟伅涓婚 : %s", topic));
-        System.out.println(String.format("鎺ユ敹娑堟伅Qos : %d", message.getQos()));
-        System.out.println(String.format("鎺ユ敹娑堟伅鍐呭 : %s", new String(message.getPayload())));
-        System.out.println(String.format("鎺ユ敹娑堟伅retained : %b", message.isRetained()));
-    }
-
-    /**
-     * 娑堟伅鍙戝竷鎴愬姛鐨勫洖璋�
-     */
-    @Override
-    public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
-        System.out.println(String.format("鎺ユ敹娑堟伅鎴愬姛"));
-    }
-}
diff --git a/src/main/java/com/fzzy/mqtt/MqttProviderConfig.java b/src/main/java/com/fzzy/mqtt/MqttProviderConfig.java
index 04cca6a..7369600 100644
--- a/src/main/java/com/fzzy/mqtt/MqttProviderConfig.java
+++ b/src/main/java/com/fzzy/mqtt/MqttProviderConfig.java
@@ -4,100 +4,60 @@
 import org.eclipse.paho.client.mqttv3.*;
 import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
 import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
+import org.springframework.integration.annotation.IntegrationComponentScan;
+import org.springframework.integration.annotation.ServiceActivator;
+import org.springframework.integration.channel.DirectChannel;
+import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
+import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
+import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
+import org.springframework.messaging.MessageChannel;
+import org.springframework.messaging.MessageHandler;
 
 import javax.annotation.PostConstruct;
+import java.util.Arrays;
+import java.util.List;
 
 @Configuration
 @Slf4j
+@IntegrationComponentScan
 public class MqttProviderConfig {
 
     @Autowired
     private MqttProperties mqttProperties;
-    @Autowired
-    private MqttProviderCallBack mqttProviderCallBack;
-    /**
-     * 瀹㈡埛绔璞�
-     */
-    private MqttClient client;
 
-    /**
-     *
-     * 鍦╞ean鍒濆鍖栧悗杩炴帴鍒版湇鍔″櫒
-     */
-    @PostConstruct
-    public void init(){
-        connect();
+    @Bean
+    public MqttConnectOptions getReceiverMqttConnectOptionsForSend() {
+        MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
+        mqttConnectOptions.setUserName(mqttProperties.getUsername());
+        mqttConnectOptions.setPassword(mqttProperties.getPassword().toCharArray());
+        List<String> hostList = Arrays.asList(mqttProperties.getHost().trim().split(","));
+        String[] serverURIs = new String[hostList.size()];
+        hostList.toArray(serverURIs);
+        mqttConnectOptions.setServerURIs(serverURIs);
+        mqttConnectOptions.setKeepAliveInterval(mqttProperties.getKeepAliveInterval());
+        return mqttConnectOptions;
     }
 
-    /**
-     * 瀹㈡埛绔繛鎺ユ湇鍔$
-     */
-    public void connect(){
-        try{
-            //鍒涘缓MQTT瀹㈡埛绔璞�
-            client = new MqttClient(mqttProperties.getHost(),mqttProperties.getClientId(),new MemoryPersistence());
-            //杩炴帴璁剧疆
-            MqttConnectOptions options = new MqttConnectOptions();
-            //鏄惁娓呯┖session锛岃缃甪alse琛ㄧず鏈嶅姟鍣ㄤ細淇濈暀瀹㈡埛绔殑杩炴帴璁板綍锛堣闃呬富棰橈紝qos锛�,瀹㈡埛绔噸杩炰箣鍚庤兘鑾峰彇鍒版湇鍔″櫒鍦ㄥ鎴风鏂紑杩炴帴鏈熼棿鎺ㄩ�佺殑娑堟伅
-            //璁剧疆涓簍rue琛ㄧず姣忔杩炴帴鏈嶅姟鍣ㄩ兘鏄互鏂扮殑韬唤
-            options.setCleanSession(true);
-            //璁剧疆杩炴帴鐢ㄦ埛鍚�
-            options.setUserName(mqttProperties.getUsername());
-            //璁剧疆杩炴帴瀵嗙爜
-            options.setPassword(mqttProperties.getPassword().toCharArray());
-            //璁剧疆瓒呮椂鏃堕棿锛屽崟浣嶄负绉�
-            options.setConnectionTimeout(100);
-            //璁剧疆蹇冭烦鏃堕棿 鍗曚綅涓虹锛岃〃绀烘湇鍔″櫒姣忛殧 1.5*20绉掔殑鏃堕棿鍚戝鎴风鍙戦�佸績璺冲垽鏂鎴风鏄惁鍦ㄧ嚎
-            options.setKeepAliveInterval(20);
-            //璁剧疆閬楀槺娑堟伅鐨勮瘽棰橈紝鑻ュ鎴风鍜屾湇鍔″櫒涔嬮棿鐨勮繛鎺ユ剰澶栨柇寮�锛屾湇鍔″櫒灏嗗彂甯冨鎴风鐨勯仐鍢变俊鎭�
-            options.setWill("willTopic",(mqttProperties.getClientId()+ "涓庢湇鍔″櫒鏂紑杩炴帴").getBytes(),0,false);
-            //璁剧疆鍥炶皟
-            client.setCallback(mqttProviderCallBack);
-            client.connect(options);
-        } catch(MqttException e){
-            e.printStackTrace();
-        }
+    @Bean
+    public MqttPahoClientFactory receiverMqttClientFactoryForSend() {
+        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
+        factory.setConnectionOptions(getReceiverMqttConnectOptionsForSend());
+        return factory;
     }
 
-    public void publish(String topic,String message){
-        MqttMessage mqttMessage = new MqttMessage();
-        mqttMessage.setQos(mqttProperties.getQos());
-        mqttMessage.setRetained(true);
-        mqttMessage.setPayload(message.getBytes());
-        //涓婚鐨勭洰鐨勫湴锛岀敤浜庡彂甯�/璁㈤槄淇℃伅
-        MqttTopic mqttTopic = client.getTopic(topic);
-        //鎻愪緵涓�绉嶆満鍒舵潵璺熻釜娑堟伅鐨勪紶閫掕繘搴�
-        //鐢ㄤ簬鍦ㄤ互闈為樆濉炴柟寮忥紙鍦ㄥ悗鍙拌繍琛岋級鎵ц鍙戝竷鏄窡韪秷鎭殑浼犻�掕繘搴�
-        MqttDeliveryToken token;
-        try {
-            //灏嗘寚瀹氭秷鎭彂甯冨埌涓婚锛屼絾涓嶇瓑寰呮秷鎭紶閫掑畬鎴愶紝杩斿洖鐨則oken鍙敤浜庤窡韪秷鎭殑浼犻�掔姸鎬�
-            //涓�鏃︽鏂规硶骞插噣鍦拌繑鍥烇紝娑堟伅灏卞凡琚鎴风鎺ュ彈鍙戝竷锛屽綋杩炴帴鍙敤锛屽皢鍦ㄥ悗鍙板畬鎴愭秷鎭紶閫掋��
-            token = mqttTopic.publish(mqttMessage);
-            token.waitForCompletion();
-        } catch (MqttException e) {
-            e.printStackTrace();
-        }
+    @Bean
+    @ServiceActivator(inputChannel = "mqttOutboundChannel")
+    public MessageHandler mqttOutbound() {
+        MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(mqttProperties.getClientOutId(), receiverMqttClientFactoryForSend());
+        messageHandler.setAsync(false);
+        messageHandler.setDefaultTopic(mqttProperties.getDefaultTopic());
+        return messageHandler;
     }
 
-    public void publish(int qos,boolean retained,String topic,String message){
-        MqttMessage mqttMessage = new MqttMessage();
-        mqttMessage.setQos(qos);
-        mqttMessage.setRetained(retained);
-        mqttMessage.setPayload(message.getBytes());
-        //涓婚鐨勭洰鐨勫湴锛岀敤浜庡彂甯�/璁㈤槄淇℃伅
-        MqttTopic mqttTopic = client.getTopic(topic);
-        //鎻愪緵涓�绉嶆満鍒舵潵璺熻釜娑堟伅鐨勪紶閫掕繘搴�
-        //鐢ㄤ簬鍦ㄤ互闈為樆濉炴柟寮忥紙鍦ㄥ悗鍙拌繍琛岋級鎵ц鍙戝竷鏄窡韪秷鎭殑浼犻�掕繘搴�
-        MqttDeliveryToken token;
-        try {
-            //灏嗘寚瀹氭秷鎭彂甯冨埌涓婚锛屼絾涓嶇瓑寰呮秷鎭紶閫掑畬鎴愶紝杩斿洖鐨則oken鍙敤浜庤窡韪秷鎭殑浼犻�掔姸鎬�
-            //涓�鏃︽鏂规硶骞插噣鍦拌繑鍥烇紝娑堟伅灏卞凡琚鎴风鎺ュ彈鍙戝竷锛屽綋杩炴帴鍙敤锛屽皢鍦ㄥ悗鍙板畬鎴愭秷鎭紶閫掋��
-            token = mqttTopic.publish(mqttMessage);
-            token.waitForCompletion();
-        } catch (MqttException e) {
-            e.printStackTrace();
-        }
+    @Bean
+    public MessageChannel mqttOutboundChannel() {
+        return new DirectChannel();
     }
-
 }
diff --git a/src/main/java/com/fzzy/mqtt/MqttPubController.java b/src/main/java/com/fzzy/mqtt/MqttPubController.java
deleted file mode 100644
index c03b424..0000000
--- a/src/main/java/com/fzzy/mqtt/MqttPubController.java
+++ /dev/null
@@ -1,26 +0,0 @@
-package com.fzzy.mqtt;
-
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.web.bind.annotation.RequestMapping;
-import org.springframework.web.bind.annotation.ResponseBody;
-import org.springframework.web.bind.annotation.RestController;
-
-@RestController
-public class MqttPubController {
-
-    @Autowired
-    private MqttProviderConfig providerClient;
-
-
-    @RequestMapping("/sendMessage")
-    public @ResponseBody
-    String sendMessage(String topic, String message) {
-        try {
-            providerClient.publish(topic, message);
-            return "鍙戦�佹垚鍔�";
-        } catch (Exception e) {
-            e.printStackTrace();
-            return "鍙戦�佸け璐�";
-        }
-    }
-}
diff --git a/src/main/java/com/fzzy/mqtt/MqttPublishService.java b/src/main/java/com/fzzy/mqtt/MqttPublishService.java
index 136ed84..fdbb845 100644
--- a/src/main/java/com/fzzy/mqtt/MqttPublishService.java
+++ b/src/main/java/com/fzzy/mqtt/MqttPublishService.java
@@ -26,7 +26,7 @@
         //String topic = "/device/hx-weigh-big-01/91511424746940066Y001_91511424746940066Y0010000_002_004_002_001/message/property/report";
         String username = mqttProperties.getUsername();
         String password = mqttProperties.getPassword();
-        String clientid = mqttProperties.getClientId();
+        String clientid = mqttProperties.getClientOutId();
         String broker = mqttProperties.getHost();
         //String content = " { \"headers\":{ \"productId\":\"hx-weigh-big-01\", \"keepOnlineTimeoutSeconds\":600, \"keepOnline\":true, \"deviceName\":\"鍦扮绉伴噸\" }, \"messageType\":\"REPORT_PROPERTY\", \"deviceId\":\"91511424746940066Y001_91511424746940066Y0010000_002_004_002_001\", \"properties\":{ \"weightInfo\":\"{\\\"exceed\\\":false,\\\"grossWeight\\\":3000.0,\\\"netWeight\\\":3000.0,\\\"static\\\":false,\\\"tareWeight\\\":3000.0,\\\"weightUnit\\\":\\\"KG\\\"}\" }, \"timestamp\":1698336020044     }";
         int qos = 0;
diff --git a/src/main/resources/application-devGateway.yml b/src/main/resources/application-devGateway.yml
index 4774c3a..044667c 100644
--- a/src/main/resources/application-devGateway.yml
+++ b/src/main/resources/application-devGateway.yml
@@ -72,13 +72,13 @@
   mqtt:
     host: tcp://127.0.0.1:1883
     username: admin
-    password: pwdmqtt..
-    client-id: fzzy-customer-igds-api
-    timeout: 10
-    keep-alive-interval: 20
+    password: admin123321
+    client-inId: fzzy-clientInId-igds-api
+    client-outId: fzzy-clientInOutId-igds-api
+    completionTimeout: 3000
+    keep-alive-interval: 2
     max-connect-times: 5
     qos: 0
     isOpen: false
-    default:
-      topic: testTopic
-    topics: "/+/+/properties/report,/device-message-sender/+/+"
\ No newline at end of file
+    default-topic: mqtt/+/test1
+    topics: /device-message-sender/#
\ No newline at end of file

--
Gitblit v1.9.3