From 96f7af2f3bf9a36dd48e0e6bf4f8a8ca1e31ed7d Mon Sep 17 00:00:00 2001
From: vince <757871790@qq.com>
Date: 星期三, 08 十一月 2023 17:49:56 +0800
Subject: [PATCH] Merge remote-tracking branch 'orgin/igds-api-gateway' into igds-api-gateway

---
 src/main/java/com/fzzy/mqtt/MqttConsumerConfig.java |   93 ++++++++++++++++++++++++++++++++++++++++++++++
 1 files changed, 93 insertions(+), 0 deletions(-)

diff --git a/src/main/java/com/fzzy/mqtt/MqttConsumerConfig.java b/src/main/java/com/fzzy/mqtt/MqttConsumerConfig.java
new file mode 100644
index 0000000..ca83cb1
--- /dev/null
+++ b/src/main/java/com/fzzy/mqtt/MqttConsumerConfig.java
@@ -0,0 +1,93 @@
+package com.fzzy.mqtt;
+
+import lombok.extern.slf4j.Slf4j;
+import org.eclipse.paho.client.mqttv3.*;
+import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.annotation.Configuration;
+
+import javax.annotation.PostConstruct;
+
+@Configuration
+@Slf4j
+public class MqttConsumerConfig {
+
+    @Autowired
+    private MqttProperties mqttProperties;
+    @Autowired
+    private MqttConsumerCallBack mqttConsumerCallBack;
+
+    /**
+     * 瀹㈡埛绔璞�
+     */
+    private MqttClient client;
+
+    /**
+     * 鍦╞ean鍒濆鍖栧悗杩炴帴鍒版湇鍔″櫒
+     */
+    @PostConstruct
+    public void init() {
+        connect();
+    }
+
+    /**
+     * 瀹㈡埛绔繛鎺ユ湇鍔$
+     */
+    public void connect() {
+        try {
+            //鍒涘缓MQTT瀹㈡埛绔璞�
+            client = new MqttClient(mqttProperties.getHost(), mqttProperties.getClientInId(), new MemoryPersistence());
+            //杩炴帴璁剧疆
+            MqttConnectOptions options = new MqttConnectOptions();
+            //鏄惁娓呯┖session锛岃缃负false琛ㄧず鏈嶅姟鍣ㄤ細淇濈暀瀹㈡埛绔殑杩炴帴璁板綍锛屽鎴风閲嶈繛涔嬪悗鑳借幏鍙栧埌鏈嶅姟鍣ㄥ湪瀹㈡埛绔柇寮�杩炴帴鏈熼棿鎺ㄩ�佺殑娑堟伅
+            //璁剧疆涓簍rue琛ㄧず姣忔杩炴帴鍒版湇鍔$閮芥槸浠ユ柊鐨勮韩浠�
+            options.setCleanSession(true);
+            //璁剧疆杩炴帴鐢ㄦ埛鍚�
+            options.setUserName(mqttProperties.getUsername());
+            //璁剧疆杩炴帴瀵嗙爜
+            options.setPassword(mqttProperties.getPassword().toCharArray());
+            //璁剧疆瓒呮椂鏃堕棿锛屽崟浣嶄负绉�
+            options.setConnectionTimeout(10);
+            //璁剧疆蹇冭烦鏃堕棿 鍗曚綅涓虹锛岃〃绀烘湇鍔″櫒姣忛殧1.5*20绉掔殑鏃堕棿鍚戝鎴风鍙戦�佸績璺冲垽鏂鎴风鏄惁鍦ㄧ嚎
+            options.setKeepAliveInterval(20);
+            //璁剧疆閬楀槺娑堟伅鐨勮瘽棰橈紝鑻ュ鎴风鍜屾湇鍔″櫒涔嬮棿鐨勮繛鎺ユ剰澶栨柇寮�锛屾湇鍔″櫒灏嗗彂甯冨鎴风鐨勯仐鍢变俊鎭�
+            options.setWill("willTopic", (mqttProperties.getClientInId() + "涓庢湇鍔″櫒鏂紑杩炴帴").getBytes(), 0, false);
+            //璁剧疆鍥炶皟
+            // client.setCallback(new MqttConsumerCallBack());
+            client.setCallback(mqttConsumerCallBack);
+            client.connect(options);
+            //璁㈤槄涓婚
+            //娑堟伅绛夌骇锛屽拰涓婚鏁扮粍涓�涓�瀵瑰簲锛屾湇鍔$灏嗘寜鐓ф寚瀹氱瓑绾х粰璁㈤槄浜嗕富棰樼殑瀹㈡埛绔帹閫佹秷鎭�
+            int[] qos = {1, 1};
+            //涓婚
+            String[] topics = mqttProperties.getTopics().split(",");
+            //璁㈤槄涓婚
+            client.subscribe(topics, qos);
+        } catch (MqttException e) {
+            e.printStackTrace();
+        }
+    }
+
+    /**
+     * 鏂紑杩炴帴
+     */
+    public void disConnect() {
+        try {
+            client.disconnect();
+        } catch (MqttException e) {
+            e.printStackTrace();
+        }
+    }
+
+
+    /**
+     * 璁㈤槄涓婚
+     */
+    public void subscribe(String topic, int qos) {
+        try {
+            client.subscribe(topic, qos);
+        } catch (MqttException e) {
+            e.printStackTrace();
+        }
+    }
+}

--
Gitblit v1.9.3