From bf57ab9e4db58dbee018268dd8b593ee564bc7ee Mon Sep 17 00:00:00 2001
From: vince <757871790@qq.com>
Date: 星期四, 09 十一月 2023 12:07:09 +0800
Subject: [PATCH] Merge remote-tracking branch 'orgin/igds-api-gateway' into igds-api-gateway

---
 src/main/java/com/fzzy/mqtt/MqttProviderConfig.java |  116 +++++++++++++++++++---------------------------------------
 1 files changed, 38 insertions(+), 78 deletions(-)

diff --git a/src/main/java/com/fzzy/mqtt/MqttProviderConfig.java b/src/main/java/com/fzzy/mqtt/MqttProviderConfig.java
index 04cca6a..7369600 100644
--- a/src/main/java/com/fzzy/mqtt/MqttProviderConfig.java
+++ b/src/main/java/com/fzzy/mqtt/MqttProviderConfig.java
@@ -4,100 +4,60 @@
 import org.eclipse.paho.client.mqttv3.*;
 import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
 import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
+import org.springframework.integration.annotation.IntegrationComponentScan;
+import org.springframework.integration.annotation.ServiceActivator;
+import org.springframework.integration.channel.DirectChannel;
+import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
+import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
+import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
+import org.springframework.messaging.MessageChannel;
+import org.springframework.messaging.MessageHandler;
 
 import javax.annotation.PostConstruct;
+import java.util.Arrays;
+import java.util.List;
 
 @Configuration
 @Slf4j
+@IntegrationComponentScan
 public class MqttProviderConfig {
 
     @Autowired
     private MqttProperties mqttProperties;
-    @Autowired
-    private MqttProviderCallBack mqttProviderCallBack;
-    /**
-     * 瀹㈡埛绔璞�
-     */
-    private MqttClient client;
 
-    /**
-     *
-     * 鍦╞ean鍒濆鍖栧悗杩炴帴鍒版湇鍔″櫒
-     */
-    @PostConstruct
-    public void init(){
-        connect();
+    @Bean
+    public MqttConnectOptions getReceiverMqttConnectOptionsForSend() {
+        MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
+        mqttConnectOptions.setUserName(mqttProperties.getUsername());
+        mqttConnectOptions.setPassword(mqttProperties.getPassword().toCharArray());
+        List<String> hostList = Arrays.asList(mqttProperties.getHost().trim().split(","));
+        String[] serverURIs = new String[hostList.size()];
+        hostList.toArray(serverURIs);
+        mqttConnectOptions.setServerURIs(serverURIs);
+        mqttConnectOptions.setKeepAliveInterval(mqttProperties.getKeepAliveInterval());
+        return mqttConnectOptions;
     }
 
-    /**
-     * 瀹㈡埛绔繛鎺ユ湇鍔$
-     */
-    public void connect(){
-        try{
-            //鍒涘缓MQTT瀹㈡埛绔璞�
-            client = new MqttClient(mqttProperties.getHost(),mqttProperties.getClientId(),new MemoryPersistence());
-            //杩炴帴璁剧疆
-            MqttConnectOptions options = new MqttConnectOptions();
-            //鏄惁娓呯┖session锛岃缃甪alse琛ㄧず鏈嶅姟鍣ㄤ細淇濈暀瀹㈡埛绔殑杩炴帴璁板綍锛堣闃呬富棰橈紝qos锛�,瀹㈡埛绔噸杩炰箣鍚庤兘鑾峰彇鍒版湇鍔″櫒鍦ㄥ鎴风鏂紑杩炴帴鏈熼棿鎺ㄩ�佺殑娑堟伅
-            //璁剧疆涓簍rue琛ㄧず姣忔杩炴帴鏈嶅姟鍣ㄩ兘鏄互鏂扮殑韬唤
-            options.setCleanSession(true);
-            //璁剧疆杩炴帴鐢ㄦ埛鍚�
-            options.setUserName(mqttProperties.getUsername());
-            //璁剧疆杩炴帴瀵嗙爜
-            options.setPassword(mqttProperties.getPassword().toCharArray());
-            //璁剧疆瓒呮椂鏃堕棿锛屽崟浣嶄负绉�
-            options.setConnectionTimeout(100);
-            //璁剧疆蹇冭烦鏃堕棿 鍗曚綅涓虹锛岃〃绀烘湇鍔″櫒姣忛殧 1.5*20绉掔殑鏃堕棿鍚戝鎴风鍙戦�佸績璺冲垽鏂鎴风鏄惁鍦ㄧ嚎
-            options.setKeepAliveInterval(20);
-            //璁剧疆閬楀槺娑堟伅鐨勮瘽棰橈紝鑻ュ鎴风鍜屾湇鍔″櫒涔嬮棿鐨勮繛鎺ユ剰澶栨柇寮�锛屾湇鍔″櫒灏嗗彂甯冨鎴风鐨勯仐鍢变俊鎭�
-            options.setWill("willTopic",(mqttProperties.getClientId()+ "涓庢湇鍔″櫒鏂紑杩炴帴").getBytes(),0,false);
-            //璁剧疆鍥炶皟
-            client.setCallback(mqttProviderCallBack);
-            client.connect(options);
-        } catch(MqttException e){
-            e.printStackTrace();
-        }
+    @Bean
+    public MqttPahoClientFactory receiverMqttClientFactoryForSend() {
+        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
+        factory.setConnectionOptions(getReceiverMqttConnectOptionsForSend());
+        return factory;
     }
 
-    public void publish(String topic,String message){
-        MqttMessage mqttMessage = new MqttMessage();
-        mqttMessage.setQos(mqttProperties.getQos());
-        mqttMessage.setRetained(true);
-        mqttMessage.setPayload(message.getBytes());
-        //涓婚鐨勭洰鐨勫湴锛岀敤浜庡彂甯�/璁㈤槄淇℃伅
-        MqttTopic mqttTopic = client.getTopic(topic);
-        //鎻愪緵涓�绉嶆満鍒舵潵璺熻釜娑堟伅鐨勪紶閫掕繘搴�
-        //鐢ㄤ簬鍦ㄤ互闈為樆濉炴柟寮忥紙鍦ㄥ悗鍙拌繍琛岋級鎵ц鍙戝竷鏄窡韪秷鎭殑浼犻�掕繘搴�
-        MqttDeliveryToken token;
-        try {
-            //灏嗘寚瀹氭秷鎭彂甯冨埌涓婚锛屼絾涓嶇瓑寰呮秷鎭紶閫掑畬鎴愶紝杩斿洖鐨則oken鍙敤浜庤窡韪秷鎭殑浼犻�掔姸鎬�
-            //涓�鏃︽鏂规硶骞插噣鍦拌繑鍥烇紝娑堟伅灏卞凡琚鎴风鎺ュ彈鍙戝竷锛屽綋杩炴帴鍙敤锛屽皢鍦ㄥ悗鍙板畬鎴愭秷鎭紶閫掋��
-            token = mqttTopic.publish(mqttMessage);
-            token.waitForCompletion();
-        } catch (MqttException e) {
-            e.printStackTrace();
-        }
+    @Bean
+    @ServiceActivator(inputChannel = "mqttOutboundChannel")
+    public MessageHandler mqttOutbound() {
+        MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(mqttProperties.getClientOutId(), receiverMqttClientFactoryForSend());
+        messageHandler.setAsync(false);
+        messageHandler.setDefaultTopic(mqttProperties.getDefaultTopic());
+        return messageHandler;
     }
 
-    public void publish(int qos,boolean retained,String topic,String message){
-        MqttMessage mqttMessage = new MqttMessage();
-        mqttMessage.setQos(qos);
-        mqttMessage.setRetained(retained);
-        mqttMessage.setPayload(message.getBytes());
-        //涓婚鐨勭洰鐨勫湴锛岀敤浜庡彂甯�/璁㈤槄淇℃伅
-        MqttTopic mqttTopic = client.getTopic(topic);
-        //鎻愪緵涓�绉嶆満鍒舵潵璺熻釜娑堟伅鐨勪紶閫掕繘搴�
-        //鐢ㄤ簬鍦ㄤ互闈為樆濉炴柟寮忥紙鍦ㄥ悗鍙拌繍琛岋級鎵ц鍙戝竷鏄窡韪秷鎭殑浼犻�掕繘搴�
-        MqttDeliveryToken token;
-        try {
-            //灏嗘寚瀹氭秷鎭彂甯冨埌涓婚锛屼絾涓嶇瓑寰呮秷鎭紶閫掑畬鎴愶紝杩斿洖鐨則oken鍙敤浜庤窡韪秷鎭殑浼犻�掔姸鎬�
-            //涓�鏃︽鏂规硶骞插噣鍦拌繑鍥烇紝娑堟伅灏卞凡琚鎴风鎺ュ彈鍙戝竷锛屽綋杩炴帴鍙敤锛屽皢鍦ㄥ悗鍙板畬鎴愭秷鎭紶閫掋��
-            token = mqttTopic.publish(mqttMessage);
-            token.waitForCompletion();
-        } catch (MqttException e) {
-            e.printStackTrace();
-        }
+    @Bean
+    public MessageChannel mqttOutboundChannel() {
+        return new DirectChannel();
     }
-
 }

--
Gitblit v1.9.3