From ebaaa34038ebda73630c9ab82465c5f76692b5c2 Mon Sep 17 00:00:00 2001
From: jiazx0107@163.com <jiazx0107@163.com>
Date: 星期二, 05 十二月 2023 00:01:48 +0800
Subject: [PATCH] 提交粮情协议2,完成

---
 src/main/java/com/fzzy/mqtt/MqttConsumerConfig.java |  140 +++++++++++++++++++++++-----------------------
 1 files changed, 69 insertions(+), 71 deletions(-)

diff --git a/src/main/java/com/fzzy/mqtt/MqttConsumerConfig.java b/src/main/java/com/fzzy/mqtt/MqttConsumerConfig.java
index 076e3f7..4057450 100644
--- a/src/main/java/com/fzzy/mqtt/MqttConsumerConfig.java
+++ b/src/main/java/com/fzzy/mqtt/MqttConsumerConfig.java
@@ -1,93 +1,91 @@
 package com.fzzy.mqtt;
 
+import com.fzzy.gateway.hx2023.service.OnReceiveMqttService;
 import lombok.extern.slf4j.Slf4j;
-import org.eclipse.paho.client.mqttv3.*;
-import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
+import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
 import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
+import org.springframework.integration.annotation.IntegrationComponentScan;
+import org.springframework.integration.annotation.ServiceActivator;
+import org.springframework.integration.channel.DirectChannel;
+import org.springframework.integration.core.MessageProducer;
+import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
+import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
+import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
+import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
+import org.springframework.messaging.Message;
+import org.springframework.messaging.MessageChannel;
+import org.springframework.messaging.MessageHandler;
+import org.springframework.messaging.MessagingException;
 
-import javax.annotation.PostConstruct;
+import java.util.Arrays;
+import java.util.List;
 
 @Configuration
 @Slf4j
+@IntegrationComponentScan
 public class MqttConsumerConfig {
 
     @Autowired
-    private MqttProperties mqttProperties;
+    private OnReceiveMqttService onReceiveMqttService;
+
     @Autowired
-    private MqttConsumerCallBack mqttConsumerCallBack;
+    private MqttProperties mqttProperties;
 
-    /**
-     * 瀹㈡埛绔璞�
-     */
-    private MqttClient client;
-
-    /**
-     * 鍦╞ean鍒濆鍖栧悗杩炴帴鍒版湇鍔″櫒
-     */
-    @PostConstruct
-    public void init() {
-        connect();
+    @Bean
+    public MqttConnectOptions getReceiverMqttConnectOptionsForSub() {
+        MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
+        mqttConnectOptions.setUserName(mqttProperties.getUsername());
+        mqttConnectOptions.setPassword(mqttProperties.getPassword().toCharArray());
+        List<String> hostList = Arrays.asList(mqttProperties.getHost().trim().split(","));
+        String[] serverURIs = new String[hostList.size()];
+        hostList.toArray(serverURIs);
+        mqttConnectOptions.setServerURIs(serverURIs);
+        mqttConnectOptions.setKeepAliveInterval(mqttProperties.getKeepAliveInterval());
+        return mqttConnectOptions;
     }
 
-    /**
-     * 瀹㈡埛绔繛鎺ユ湇鍔$
-     */
-    public void connect() {
-        try {
-            //鍒涘缓MQTT瀹㈡埛绔璞�
-            client = new MqttClient(mqttProperties.getHost(), mqttProperties.getClientId(), new MemoryPersistence());
-            //杩炴帴璁剧疆
-            MqttConnectOptions options = new MqttConnectOptions();
-            //鏄惁娓呯┖session锛岃缃负false琛ㄧず鏈嶅姟鍣ㄤ細淇濈暀瀹㈡埛绔殑杩炴帴璁板綍锛屽鎴风閲嶈繛涔嬪悗鑳借幏鍙栧埌鏈嶅姟鍣ㄥ湪瀹㈡埛绔柇寮�杩炴帴鏈熼棿鎺ㄩ�佺殑娑堟伅
-            //璁剧疆涓簍rue琛ㄧず姣忔杩炴帴鍒版湇鍔$閮芥槸浠ユ柊鐨勮韩浠�
-            options.setCleanSession(true);
-            //璁剧疆杩炴帴鐢ㄦ埛鍚�
-            options.setUserName(mqttProperties.getUsername());
-            //璁剧疆杩炴帴瀵嗙爜
-            options.setPassword(mqttProperties.getPassword().toCharArray());
-            //璁剧疆瓒呮椂鏃堕棿锛屽崟浣嶄负绉�
-            options.setConnectionTimeout(10);
-            //璁剧疆蹇冭烦鏃堕棿 鍗曚綅涓虹锛岃〃绀烘湇鍔″櫒姣忛殧1.5*20绉掔殑鏃堕棿鍚戝鎴风鍙戦�佸績璺冲垽鏂鎴风鏄惁鍦ㄧ嚎
-            options.setKeepAliveInterval(20);
-            //璁剧疆閬楀槺娑堟伅鐨勮瘽棰橈紝鑻ュ鎴风鍜屾湇鍔″櫒涔嬮棿鐨勮繛鎺ユ剰澶栨柇寮�锛屾湇鍔″櫒灏嗗彂甯冨鎴风鐨勯仐鍢变俊鎭�
-            options.setWill("willTopic", (mqttProperties.getClientId() + "涓庢湇鍔″櫒鏂紑杩炴帴").getBytes(), 0, false);
-            //璁剧疆鍥炶皟
-            // client.setCallback(new MqttConsumerCallBack());
-            client.setCallback(mqttConsumerCallBack);
-            client.connect(options);
-            //璁㈤槄涓婚
-            //娑堟伅绛夌骇锛屽拰涓婚鏁扮粍涓�涓�瀵瑰簲锛屾湇鍔$灏嗘寜鐓ф寚瀹氱瓑绾х粰璁㈤槄浜嗕富棰樼殑瀹㈡埛绔帹閫佹秷鎭�
-            int[] qos = {1, 1};
-            //涓婚
-            String[] topics = mqttProperties.getTopics().split(",");
-            //璁㈤槄涓婚
-            client.subscribe(topics, qos);
-        } catch (MqttException e) {
-            e.printStackTrace();
-        }
+    @Bean
+    public MqttPahoClientFactory receiverMqttClientFactoryForSub() {
+        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
+        factory.setConnectionOptions(getReceiverMqttConnectOptionsForSub());
+        return factory;
     }
 
-    /**
-     * 鏂紑杩炴帴
-     */
-    public void disConnect() {
-        try {
-            client.disconnect();
-        } catch (MqttException e) {
-            e.printStackTrace();
-        }
+    //鎺ユ敹閫氶亾
+    @Bean
+    public MessageChannel mqttInputChannel() {
+        return new DirectChannel();
     }
 
+    //閰嶇疆client,鐩戝惉鐨則opic
+    @Bean
+    public MessageProducer inbound() {
 
-    /**
-     * 璁㈤槄涓婚
-     */
-    public void subscribe(String topic, int qos) {
-        try {
-            client.subscribe(topic, qos);
-        } catch (MqttException e) {
-            e.printStackTrace();
-        }
+        // List<String> topicList = Arrays.asList(defaultTopic.trim().split(","));
+        String[] topics = mqttProperties.getTopics().split(",");
+        //topicList.toArray(topics);
+
+        MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(mqttProperties.getClientInId(), receiverMqttClientFactoryForSub(), topics);
+        adapter.setCompletionTimeout(mqttProperties.getCompletionTimeout());
+        adapter.setConverter(new DefaultPahoMessageConverter());
+        adapter.setQos(mqttProperties.getQos());
+        adapter.setOutputChannel(mqttInputChannel());
+        return adapter;
     }
-}
+
+    //閫氳繃閫氶亾鑾峰彇鏁版嵁
+    @Bean
+    @ServiceActivator(inputChannel = "mqttInputChannel")
+    public MessageHandler handler() {
+        return new MessageHandler() {
+            @Override
+            public void handleMessage(Message<?> message) throws MessagingException {
+                String topic = message.getHeaders().get("mqtt_receivedTopic").toString();
+                String msg = message.getPayload().toString();
+                onReceiveMqttService.onReceiveMessage(topic, msg);
+            }
+        };
+    }
+}
\ No newline at end of file

--
Gitblit v1.9.3