From a31a452b9999ba3c811c36b5cb1b3ec0c18d037d Mon Sep 17 00:00:00 2001
From: jiazx0107@163.com <jiazx0107@163.com>
Date: 星期三, 08 十一月 2023 02:23:38 +0800
Subject: [PATCH] 提交MQTT相关功能

---
 src/main/java/com/fzzy/gateway/hx2023/service/ApiInitService.java             |   12 +
 src/main/java/com/fzzy/gateway/hx2023/service/HxGatewayRemoteServiceImpl.java |    1 
 src/main/java/com/fzzy/mqtt/MqttProperties.java                               |   15 +
 src/main/java/com/fzzy/mqtt/MqttInboundConfiguration.java                     |   91 +++++++++++++
 src/main/java/com/fzzy/gateway/hx2023/websocket/WebSocketMqtt.java            |    2 
 src/main/java/com/fzzy/gateway/service/GatewayConfService.java                |    1 
 src/main/java/com/fzzy/gateway/hx2023/websocket/SocketService.java            |  102 ++++++++++++++
 /dev/null                                                                     |   25 ---
 src/main/resources/application-devGateway.yml                                 |   23 +-
 src/main/java/com/fzzy/mqtt/MqttOutboundConfiguration.java                    |   58 ++++++++
 src/main/java/com/fzzy/gateway/view/GatewayConf.view.xml                      |   10 +
 src/main/java/com/fzzy/mqtt/MqttGatewayService.java                           |   17 ++
 src/main/java/com/fzzy/mqtt/MqttPubController.java                            |   35 +++++
 13 files changed, 345 insertions(+), 47 deletions(-)

diff --git a/src/main/java/com/fzzy/gateway/hx2023/service/ApiInitService.java b/src/main/java/com/fzzy/gateway/hx2023/service/ApiInitService.java
index e98cafa..f9b093f 100644
--- a/src/main/java/com/fzzy/gateway/hx2023/service/ApiInitService.java
+++ b/src/main/java/com/fzzy/gateway/hx2023/service/ApiInitService.java
@@ -1,19 +1,21 @@
 package com.fzzy.gateway.hx2023.service;
 
+import com.bstek.dorado.annotation.Expose;
 import com.fzzy.gateway.api.GatewayRemoteManager;
 import com.fzzy.gateway.api.GatewayRemoteService;
 import com.fzzy.gateway.entity.GatewayConf;
 import com.fzzy.gateway.service.GatewayConfService;
-import lombok.extern.slf4j.Slf4j;
+
+
 import org.springframework.stereotype.Component;
 
 import javax.annotation.Resource;
+
 import java.util.List;
 
 /**
  * 褰撳墠鎺ュ彛锛屽垵濮嬪寲鐩稿叧
  */
-@Slf4j
 @Component
 public class ApiInitService {
 
@@ -23,6 +25,10 @@
     private GatewayRemoteManager gatewayRemoteManager;
 
 
+    /**
+     * apiInitService#init
+     */
+    @Expose
     public void init() {
 
         List<GatewayConf> list = confService.listAll();
@@ -38,5 +44,7 @@
         }
 
     }
+    
+    
 
 }
diff --git a/src/main/java/com/fzzy/gateway/hx2023/service/HxGatewayRemoteServiceImpl.java b/src/main/java/com/fzzy/gateway/hx2023/service/HxGatewayRemoteServiceImpl.java
index a6088d8..c43f29e 100644
--- a/src/main/java/com/fzzy/gateway/hx2023/service/HxGatewayRemoteServiceImpl.java
+++ b/src/main/java/com/fzzy/gateway/hx2023/service/HxGatewayRemoteServiceImpl.java
@@ -33,7 +33,6 @@
     @Resource
     private ApiLogRep apiLogRep;
 
-
     @Resource
     private GatewayConfService gatewayConfService;
 
diff --git a/src/main/java/com/fzzy/gateway/hx2023/websocket/SocketService.java b/src/main/java/com/fzzy/gateway/hx2023/websocket/SocketService.java
new file mode 100644
index 0000000..1025124
--- /dev/null
+++ b/src/main/java/com/fzzy/gateway/hx2023/websocket/SocketService.java
@@ -0,0 +1,102 @@
+package com.fzzy.gateway.hx2023.websocket;
+
+
+import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.JSONObject;
+import org.springframework.stereotype.Component;
+import org.springframework.stereotype.Service;
+import org.springframework.web.bind.annotation.CrossOrigin;
+
+import javax.websocket.*;
+import javax.websocket.server.PathParam;
+import javax.websocket.server.ServerEndpoint;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CopyOnWriteArraySet;
+
+@Component
+@CrossOrigin
+@Service
+@ServerEndpoint(value = "/mqtt")
+public class SocketService {
+
+    /**
+     * 鐢ㄦ潵瀛樻斁姣忎釜瀹㈡埛绔搴旂殑MyWebSocket瀵硅薄銆�
+     **/
+    private static CopyOnWriteArraySet<SocketService> socketSet = new CopyOnWriteArraySet<>();
+    /**
+     * 涓庢煇涓鎴风鐨勮繛鎺ヤ細璇濓紝闇�瑕侀�氳繃瀹冩潵缁欏鎴风鍙戦�佹暟鎹�
+     **/
+    private Session session;
+    /**
+     * 鐢ㄦ埛鍚嶇О
+     **/
+    private String nickname;
+    /**
+     * 鐢ㄦ潵璁板綍sessionId鍜岃session杩涜缁戝畾
+     **/
+    private static Map<String,Session> map = new HashMap<String, Session>();
+
+    /**
+     * 杩炴帴寤虹珛鎴愬姛璋冪敤鐨勬柟娉�
+     */
+    @OnOpen
+    public void onOpen(Session session,@PathParam("nickname") String nickname) {
+        this.session = session;
+        this.nickname=nickname;
+
+        map.put(nickname, session);
+        socketSet.add(this);
+
+        System.out.println("鏈夋柊杩炴帴鍔犲叆:"+nickname+",褰撳墠鍦ㄧ嚎浜烘暟涓�" + socketSet.size());
+    }
+
+    /**
+     * 杩炴帴鍏抽棴璋冪敤鐨勬柟娉�
+     */
+    @OnClose
+    public void onClose() {
+        socketSet.remove(this);
+        List<String> nickname = this.session.getRequestParameterMap().get("nickname");
+        for(String nick:nickname) {
+            map.remove(nick);
+        }
+        System.out.println("鏈変竴杩炴帴鍏抽棴锛佸綋鍓嶅湪绾夸汉鏁颁负" + socketSet.size());
+    }
+
+    /**
+     * 鏀跺埌瀹㈡埛绔秷鎭悗璋冪敤鐨勬柟娉�
+     */
+    @OnMessage
+    public void onMessage(String message, Session session,@PathParam("nickname") String nickname) {
+        System.out.println("鏉ヨ嚜瀹㈡埛绔殑娑堟伅-->"+nickname+": " + message);
+        //灏唌qtt鍙戦�佽繃鏉ョ殑鏁版嵁杩斿洖缁欏墠绔�
+        try {
+//            JSONObject parse = JSONObject.parse(message);
+//            Map<String,Object> valuesMap = (Map<String, Object>) parse.getByPath("values");
+//            String tag1 = valuesMap.get("tag1").toString();
+//            String replace = tag1.replace("\r\n", "");
+//            String substring = replace.substring(replace.indexOf(":") + 1).trim().replace(" ",":");
+//            String tag2 = valuesMap.get("tag2").toString();
+//            String substring1=substring+"tag2:"+tag2;
+//            String result="{"+substring1+"}";
+//            //鍙戦�佺粰鍓嶇锛岀敤浣滈〉闈㈡覆鏌�
+//            Session fromSession = map.get(nickname);
+//            fromSession.getAsyncRemote().sendText(result);
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+    }
+
+    /**
+     * 鍙戠敓閿欒鏃惰皟鐢�
+     */
+    @OnError
+    public void onError(Session session, Throwable error) {
+        System.out.println("鍙戠敓閿欒");
+        error.printStackTrace();
+    }
+
+
+}
diff --git a/src/main/java/com/fzzy/gateway/hx2023/websocket/WebSocketMqtt.java b/src/main/java/com/fzzy/gateway/hx2023/websocket/WebSocketMqtt.java
index 28e5666..9d9756c 100644
--- a/src/main/java/com/fzzy/gateway/hx2023/websocket/WebSocketMqtt.java
+++ b/src/main/java/com/fzzy/gateway/hx2023/websocket/WebSocketMqtt.java
@@ -21,7 +21,7 @@
  */
 @Slf4j
 @Component
-@ServerEndpoint(value = "/mqtt")
+//@ServerEndpoint(value = "/mqtt")
 public class WebSocketMqtt {
 
     private static Map<String, Session> sessionPool = new ConcurrentHashMap<>();
diff --git a/src/main/java/com/fzzy/gateway/service/GatewayConfService.java b/src/main/java/com/fzzy/gateway/service/GatewayConfService.java
index 81ddd0a..2c8b6b8 100644
--- a/src/main/java/com/fzzy/gateway/service/GatewayConfService.java
+++ b/src/main/java/com/fzzy/gateway/service/GatewayConfService.java
@@ -64,6 +64,7 @@
         gatewayConfRep.delete(data2);
         return null;
     }
+    
 
     public void updateCache(GatewayConf conf) {
         String key = RedisConst.buildKey(RedisConst.KYE_CONF_GATEWAY, conf.getKqdm());
diff --git a/src/main/java/com/fzzy/gateway/view/GatewayConf.view.xml b/src/main/java/com/fzzy/gateway/view/GatewayConf.view.xml
index 033eef2..4a77418 100644
--- a/src/main/java/com/fzzy/gateway/view/GatewayConf.view.xml
+++ b/src/main/java/com/fzzy/gateway/view/GatewayConf.view.xml
@@ -166,6 +166,12 @@
         <Property name="iconClass">fa fa-search</Property>
         <Property name="exClassName">toolbar-button-push</Property>
       </ToolBarButton>
+      <ToolBarButton>
+        <Property name="caption">缃戝叧鍒濆鍖�</Property>
+        <Property name="iconClass">fa fa-search</Property>
+        <Property name="exClassName">toolbar-button-push</Property>
+        <Property name="action">ajaxInit</Property>
+      </ToolBarButton>
     </ToolBar>
     <DataGrid id="dgMain">
       <Property name="dataSet">dsMain</Property>
@@ -377,6 +383,10 @@
       <Property name="service">gatewayDeviceService#ajaxTestGrain</Property>
       <Property name="executingMessage">鍦ㄥ姫鍔涙墽琛屼腑鈥︹��</Property>
     </AjaxAction>
+    <AjaxAction id="ajaxInit">
+      <Property name="service">apiInitService#init</Property>
+      <Property name="executingMessage">鍦ㄥ姫鍔涙墽琛屼腑鈥︹��</Property>
+    </AjaxAction>
     <Dialog id="dialogWeight">
       <Property name="width">400</Property>
       <Property name="height">300</Property>
diff --git a/src/main/java/com/fzzy/mqtt/MqttAcceptCallback.java b/src/main/java/com/fzzy/mqtt/MqttAcceptCallback.java
deleted file mode 100644
index 94ace07..0000000
--- a/src/main/java/com/fzzy/mqtt/MqttAcceptCallback.java
+++ /dev/null
@@ -1,90 +0,0 @@
-package com.fzzy.mqtt;
-
-import lombok.extern.slf4j.Slf4j;
-import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
-import org.eclipse.paho.client.mqttv3.MqttCallbackExtended;
-import org.eclipse.paho.client.mqttv3.MqttMessage;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Component;
-
-/**
- * @Description : MQTT鎺ュ彈鏈嶅姟鐨勫洖璋冪被
- */
-@Slf4j
-@Component
-public class MqttAcceptCallback implements MqttCallbackExtended {
-
-    @Autowired
-    private MqttAcceptClient mqttAcceptClient;
-
-    @Autowired
-    private MqttProperties mqttProperties;
-
-
-    /**
-     * 瀹㈡埛绔柇寮�鍚庤Е鍙�
-     *
-     * @param throwable
-     */
-    @Override
-    public void connectionLost(Throwable throwable) {
-        log.info("杩炴帴鏂紑锛屽彲浠ラ噸杩�");
-        if (MqttAcceptClient.client == null || !MqttAcceptClient.client.isConnected()) {
-            log.info("銆恊mqx閲嶆柊杩炴帴銆�....................................................");
-            mqttAcceptClient.reconnection();
-        }
-    }
-
-
-    /**
-     * 瀹㈡埛绔敹鍒版秷鎭Е鍙�
-     *
-     * @param topic       涓婚
-     * @param mqttMessage 娑堟伅
-     */
-    @Override
-    public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
-        log.info("銆愭帴鏀舵秷鎭富棰樸��:" + topic);
-        log.info("銆愭帴鏀舵秷鎭疩os銆�:" + mqttMessage.getQos());
-        log.info("銆愭帴鏀舵秷鎭唴瀹广��:" + new String(mqttMessage.getPayload()));
-        //        int i = 1/0;
-    }
-
-
-    /**
-     * 鍙戝竷娑堟伅鎴愬姛
-     *
-     * @param token token
-     */
-    @Override
-    public void deliveryComplete(IMqttDeliveryToken token) {
-        String[] topics = token.getTopics();
-        for (String topic : topics) {
-            log.info("鍚戜富棰樸��" + topic + "銆戝彂閫佹秷鎭垚鍔燂紒");
-        }
-        try {
-            MqttMessage message = token.getMessage();
-            byte[] payload = message.getPayload();
-            String s = new String(payload, "UTF-8");
-            log.info("銆愭秷鎭唴瀹广��:" + s);
-        } catch (Exception e) {
-            log.error("MqttAcceptCallback deliveryComplete error,message:{}", e.getMessage());
-            e.printStackTrace();
-        }
-    }
-
-    /**
-     * 杩炴帴emq鏈嶅姟鍣ㄥ悗瑙﹀彂
-     *
-     * @param b
-     * @param s
-     */
-    @Override
-    public void connectComplete(boolean b, String s) {
-        log.info("============================= 瀹㈡埛绔��" + MqttAcceptClient.client.getClientId() + "銆戣繛鎺ユ垚鍔燂紒=============================");
-        // 浠�/#缁撳熬琛ㄧず璁㈤槄鎵�鏈変互test寮�澶寸殑涓婚
-        // 璁㈤槄鎵�鏈夋満鏋勪富棰�
-        mqttAcceptClient.subscribe(mqttProperties.getDefaultTopic(), 0);
-    }
-
-}
diff --git a/src/main/java/com/fzzy/mqtt/MqttAcceptClient.java b/src/main/java/com/fzzy/mqtt/MqttAcceptClient.java
deleted file mode 100644
index a7823ad..0000000
--- a/src/main/java/com/fzzy/mqtt/MqttAcceptClient.java
+++ /dev/null
@@ -1,104 +0,0 @@
-package com.fzzy.mqtt;
-
-import lombok.extern.slf4j.Slf4j;
-import org.eclipse.paho.client.mqttv3.MqttClient;
-import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
-import org.eclipse.paho.client.mqttv3.MqttException;
-import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Component;
-
-/**
- * @Description : MQTT鎺ュ彈鏈嶅姟鐨勫鎴风
- */
-@Slf4j
-@Component
-public class MqttAcceptClient {
-
-    @Autowired
-    private MqttAcceptCallback mqttAcceptCallback;
-
-    @Autowired
-    private MqttProperties mqttProperties;
-
-    public static MqttClient client;
-
-    private static MqttClient getClient() {
-        return client;
-    }
-
-    private static void setClient(MqttClient client) {
-        MqttAcceptClient.client = client;
-    }
-
-
-    /**
-     * 瀹㈡埛绔繛鎺�
-     */
-    public void connect() {
-        MqttClient client;
-        try {
-            client = new MqttClient(mqttProperties.getHost(), mqttProperties.getClientId(),
-                    new MemoryPersistence());
-
-            MqttConnectOptions options = new MqttConnectOptions();
-            options.setUserName(mqttProperties.getClientUsername());
-            options.setPassword(mqttProperties.getClientPassword().toCharArray());
-            options.setConnectionTimeout(mqttProperties.getClientTimeout());
-            options.setKeepAliveInterval(mqttProperties.getClientAliveTime());
-            options.setAutomaticReconnect(mqttProperties.getReconnect());
-            options.setCleanSession(mqttProperties.getCleanSession());
-            MqttAcceptClient.setClient(client);
-            // 璁剧疆鍥炶皟
-            client.setCallback(mqttAcceptCallback);
-            client.connect(options);
-        } catch (Exception e) {
-            log.error("MqttAcceptClient connect error,message:{}", e.getMessage());
-            e.printStackTrace();
-        }
-    }
-
-    /**
-     * 閲嶆柊杩炴帴
-     */
-    public void reconnection() {
-        try {
-            client.connect();
-        } catch (MqttException e) {
-            log.error("MqttAcceptClient reconnection error,message:{}", e.getMessage());
-            e.printStackTrace();
-        }
-    }
-
-    /**
-     * 璁㈤槄鏌愪釜涓婚
-     *
-     * @param topic 涓婚
-     * @param qos   杩炴帴鏂瑰紡
-     */
-    public void subscribe(String topic, int qos) {
-        log.info("========================銆愬紑濮嬭闃呬富棰�:" + topic + "銆�========================");
-        try {
-            client.subscribe(topic, qos);
-        } catch (MqttException e) {
-            log.error("MqttAcceptClient subscribe error,message:{}", e.getMessage());
-            e.printStackTrace();
-        }
-    }
-
-    /**
-     * 鍙栨秷璁㈤槄鏌愪釜涓婚
-     *
-     * @param topic
-     */
-    public void unsubscribe(String topic) {
-        log.info("========================銆愬彇娑堣闃呬富棰�:" + topic + "銆�========================");
-        try {
-            client.unsubscribe(topic);
-        } catch (MqttException e) {
-            log.error("MqttAcceptClient unsubscribe error,message:{}", e.getMessage());
-            e.printStackTrace();
-        }
-    }
-
-}
diff --git a/src/main/java/com/fzzy/mqtt/MqttCondition.java b/src/main/java/com/fzzy/mqtt/MqttCondition.java
deleted file mode 100644
index f335e7b..0000000
--- a/src/main/java/com/fzzy/mqtt/MqttCondition.java
+++ /dev/null
@@ -1,25 +0,0 @@
-package com.fzzy.mqtt;
-
-import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;
-import org.springframework.context.annotation.Condition;
-import org.springframework.context.annotation.ConditionContext;
-import org.springframework.core.env.Environment;
-import org.springframework.core.type.AnnotatedTypeMetadata;
-
-/**
- * @Description : 鑷畾涔夐厤缃�,閫氳繃杩欎釜閰嶇疆锛屾潵鎺у埗鍚姩椤圭洰鐨勬椂鍊欐槸鍚﹀惎鍔╩qtt
- */
-public class MqttCondition implements Condition {
-
-    @Override
-    public boolean matches(ConditionContext context, AnnotatedTypeMetadata annotatedTypeMetadata) {
-        //1銆佽兘鑾峰彇鍒癷oc浣跨敤鐨刡eanfactory
-        ConfigurableListableBeanFactory beanFactory = context.getBeanFactory();
-        //2銆佽幏鍙栫被鍔犺浇鍣�
-        ClassLoader classLoader = context.getClassLoader();
-        //3銆佽幏鍙栧綋鍓嶇幆澧冧俊鎭�
-        Environment environment = context.getEnvironment();
-        String isOpen = environment.getProperty("mqtt.isOpen");
-        return Boolean.valueOf(isOpen);
-    }
-}
diff --git a/src/main/java/com/fzzy/mqtt/MqttGatewayService.java b/src/main/java/com/fzzy/mqtt/MqttGatewayService.java
new file mode 100644
index 0000000..caed4e8
--- /dev/null
+++ b/src/main/java/com/fzzy/mqtt/MqttGatewayService.java
@@ -0,0 +1,17 @@
+package com.fzzy.mqtt;
+
+
+import org.springframework.integration.annotation.MessagingGateway;
+import org.springframework.integration.mqtt.support.MqttHeaders;
+import org.springframework.messaging.handler.annotation.Header;
+import org.springframework.stereotype.Service;
+
+/**
+ * 鎺ㄩ�佹帴鍙�
+ */
+@Service
+@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
+public interface MqttGatewayService {
+
+    void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic01, String sendData);
+}
diff --git a/src/main/java/com/fzzy/mqtt/MqttInboundConfiguration.java b/src/main/java/com/fzzy/mqtt/MqttInboundConfiguration.java
new file mode 100644
index 0000000..874464c
--- /dev/null
+++ b/src/main/java/com/fzzy/mqtt/MqttInboundConfiguration.java
@@ -0,0 +1,91 @@
+package com.fzzy.mqtt;
+
+import com.fzzy.gateway.hx2023.websocket.SocketService;
+import lombok.extern.slf4j.Slf4j;
+import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.integration.annotation.IntegrationComponentScan;
+import org.springframework.integration.annotation.ServiceActivator;
+import org.springframework.integration.channel.DirectChannel;
+import org.springframework.integration.core.MessageProducer;
+import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
+import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
+import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
+import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
+import org.springframework.integration.mqtt.support.MqttHeaders;
+import org.springframework.messaging.Message;
+import org.springframework.messaging.MessageChannel;
+import org.springframework.messaging.MessageHandler;
+import org.springframework.messaging.MessagingException;
+
+/**
+ * MQTT娑堣垂绔�
+ */
+@Slf4j
+@Configuration
+@IntegrationComponentScan
+public class MqttInboundConfiguration {
+
+    @Autowired
+    private MqttProperties mqttProperties;
+
+    @Bean
+    public MessageChannel mqttInputChannel() {
+        return new DirectChannel();
+    }
+
+    @Bean
+    public MqttPahoClientFactory receiverMqttClientFactoryForSub() {
+        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
+        String[] array = mqttProperties.getHost().split(",");
+        MqttConnectOptions options = new MqttConnectOptions();
+        options.setServerURIs(array);
+        options.setUserName(mqttProperties.getClientUsername());
+        options.setPassword(mqttProperties.getClientPassword().toCharArray());
+        options.setKeepAliveInterval(mqttProperties.getClientAliveTime());
+        //鎺ュ彈绂荤嚎娑堟伅
+        options.setCleanSession(false);
+        options.setMqttVersion(4);
+
+        factory.setConnectionOptions(options);
+
+        return factory;
+    }
+
+    //閰嶇疆client,鐩戝惉鐨則opic
+    @Bean
+    public MessageProducer inbound() {
+        String[] inboundTopics = mqttProperties.getClientTopics().split(",");
+        MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(
+                mqttProperties.getClientId() + "_inbound", receiverMqttClientFactoryForSub(), inboundTopics);  //瀵筰nboundTopics涓婚杩涜鐩戝惉
+        adapter.setCompletionTimeout(mqttProperties.getClientTimeout());
+        adapter.setQos(1);
+        adapter.setConverter(new DefaultPahoMessageConverter());
+        adapter.setOutputChannel(mqttInputChannel());
+        return adapter;
+    }
+
+    //閫氳繃閫氶亾鑾峰彇鏁版嵁
+    @Bean
+    @ServiceActivator(inputChannel = "mqttInputChannel")  //寮傛澶勭悊
+    public MessageHandler handler() {
+        return new MessageHandler() {
+            @Override
+            public void handleMessage(Message<?> message) throws MessagingException {
+                log.info("----------------------");
+                //鑾峰彇mqtt鐨則opic
+                String topic = (String) message.getHeaders().get(MqttHeaders.RECEIVED_TOPIC);
+                //浣跨敤webSocket杩斿洖缁欏墠绔�
+                SocketService socketService = new SocketService();
+                socketService.onMessage(message.getPayload().toString(), null, topic);
+
+                log.info("message:" + message.getPayload());
+                log.info("PacketId:" + message.getHeaders().getId());
+                log.info("Qos:" + message.getHeaders().get(MqttHeaders.QOS));
+                log.info("topic:" + topic);
+            }
+        };
+    }
+}
diff --git a/src/main/java/com/fzzy/mqtt/MqttOutboundConfiguration.java b/src/main/java/com/fzzy/mqtt/MqttOutboundConfiguration.java
new file mode 100644
index 0000000..ccbd0f1
--- /dev/null
+++ b/src/main/java/com/fzzy/mqtt/MqttOutboundConfiguration.java
@@ -0,0 +1,58 @@
+package com.fzzy.mqtt;
+
+import lombok.extern.slf4j.Slf4j;
+import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.integration.annotation.IntegrationComponentScan;
+import org.springframework.integration.annotation.ServiceActivator;
+import org.springframework.integration.channel.DirectChannel;
+import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
+import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
+import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
+import org.springframework.messaging.MessageChannel;
+import org.springframework.messaging.MessageHandler;
+
+/**
+ * MQTT鐢熶骇绔�
+ */
+@Slf4j
+@Configuration
+@IntegrationComponentScan
+public class MqttOutboundConfiguration {
+    @Autowired
+    private MqttProperties mqttProperties;
+
+    @Bean
+    public MessageChannel mqttOutboundChannel() {
+        return new DirectChannel();
+    }
+
+    @Bean
+    public MqttPahoClientFactory getReceiverMqttConnectOptionsForSend() {
+        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
+        String[] array = mqttProperties.getHost().split(",");
+        MqttConnectOptions options = new MqttConnectOptions();
+        options.setServerURIs(array);
+
+        if(null == mqttProperties.getClientUsername()) mqttProperties.setClientUsername(" ");
+        if(null == mqttProperties.getClientPassword()) mqttProperties.setClientPassword(" ");
+        options.setUserName(mqttProperties.getClientUsername());
+        options.setPassword(mqttProperties.getClientPassword().toCharArray());
+        // 鎺ュ彈绂荤嚎娑堟伅
+        options.setCleanSession(false); //鍛婅瘔浠g悊瀹㈡埛绔槸鍚﹁寤虹珛鎸佷箙浼氳瘽   false涓哄缓绔嬫寔涔呬細璇�
+        options.setMqttVersion(4);
+        factory.setConnectionOptions(options);
+        return factory;
+    }
+
+    @Bean
+    @ServiceActivator(inputChannel = "mqttOutboundChannel")
+    public MessageHandler mqttOutbound() {
+        MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(mqttProperties.getClientId() + "outbound", getReceiverMqttConnectOptionsForSend());
+        messageHandler.setAsync(true);
+        return messageHandler;
+    }
+
+}
diff --git a/src/main/java/com/fzzy/mqtt/MqttProperties.java b/src/main/java/com/fzzy/mqtt/MqttProperties.java
index 8a7a554..a7293c9 100644
--- a/src/main/java/com/fzzy/mqtt/MqttProperties.java
+++ b/src/main/java/com/fzzy/mqtt/MqttProperties.java
@@ -1,15 +1,17 @@
 package com.fzzy.mqtt;
 
 import lombok.Data;
+import lombok.extern.slf4j.Slf4j;
 import org.springframework.boot.context.properties.ConfigurationProperties;
 import org.springframework.stereotype.Component;
 
 /**
  * MQTT 閰嶇疆淇℃伅
  */
-@Component
-@ConfigurationProperties("mqtt")
+@Slf4j
 @Data
+@Component
+@ConfigurationProperties("spring.mqtt")
 public class MqttProperties {
 
     /**
@@ -37,13 +39,13 @@
     /**
      * 瓒呮椂鏃堕棿
      */
-    private int clientTimeout;
+    private int clientTimeout = 5000;
 
     /**
      * 璁剧疆浼氳瘽蹇冭烦鏃堕棿 鍗曚綅涓虹 鏈嶅姟鍣ㄤ細姣忛殧1.5*20绉掔殑鏃堕棿鍚戝鎴风
      * 鍙戦�佷釜娑堟伅鍒ゆ柇瀹㈡埛绔槸鍚﹀湪绾匡紝浣嗚繖涓柟娉曞苟娌℃湁閲嶈繛鐨勬満鍒�
      */
-    private int clientAliveTime;
+    private int clientAliveTime = 30000;
 
     private int clientMaxConnectTime;
 
@@ -52,12 +54,12 @@
     /**
      * 杩炴帴鏂瑰紡
      */
-    private Integer clientQos;
+    private Integer clientQos = 0;
 
     /**
      * 榛樿杩炴帴涓婚锛屼互/#缁撳熬琛ㄧず璁㈤槄鎵�鏈変互test寮�澶寸殑涓婚
      */
-    private String  defaultTopic;
+    private String defaultTopic;
 
     /**
      * 璁剧疆鏄惁娓呯┖session,杩欓噷濡傛灉璁剧疆涓篺alse琛ㄧず鏈嶅姟鍣ㄤ細淇濈暀瀹㈡埛绔殑杩�
@@ -75,4 +77,5 @@
      */
     private Boolean isOpen;
 
+
 }
diff --git a/src/main/java/com/fzzy/mqtt/MqttPubController.java b/src/main/java/com/fzzy/mqtt/MqttPubController.java
new file mode 100644
index 0000000..644928d
--- /dev/null
+++ b/src/main/java/com/fzzy/mqtt/MqttPubController.java
@@ -0,0 +1,35 @@
+package com.fzzy.mqtt;
+
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RestController;
+
+@RestController
+public class MqttPubController {
+
+    @Autowired
+    private MqttGatewayService gatewayService;
+
+
+    @RequestMapping("/hello")
+    public String hello() {
+        return "hello!";
+    }
+
+    @RequestMapping("/sendMqtt")
+    public String sendMqtt(String sendData) {
+        System.out.println(sendData);
+        System.out.println("杩涘叆sendMqtt-------" + sendData);
+        gatewayService.sendToMqtt("topic01", (String) sendData);
+        return "Test is OK";
+    }
+
+    @RequestMapping("/sendMqttTopic")
+    public String sendMqtt(String sendData, String topic) {
+        //System.out.println(sendData+"   "+topic);
+        //System.out.println("杩涘叆inbound鍙戦�侊細"+sendData);
+        gatewayService.sendToMqtt(topic, (String) sendData);
+        return "Test is OK";
+    }
+
+}
diff --git a/src/main/resources/application-devGateway.yml b/src/main/resources/application-devGateway.yml
index cc2c8df..09d7b84 100644
--- a/src/main/resources/application-devGateway.yml
+++ b/src/main/resources/application-devGateway.yml
@@ -69,15 +69,14 @@
       key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
       value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
 
-
-mqtt:
-  host: tcp://10.13.4.84:11883
-  client-id:
-  client-username:
-  client-password:
-  client-timeout: 10
-  client-alive-time: 20
-  client-max-connect-times: 5
-  client-topics:
-  client-qos: 0
-  isOpen: false
+  mqtt:
+    host: tcp://10.13.4.84:11883
+    client-id:
+    client-username: admin
+    client-password: 123456
+    client-timeout: 10
+    client-alive-time: 20
+    client-max-connect-times: 5
+    client-topics: "/device/${productId}/${deviceId}/message/property/report,/device-message-sender/${productId}/${deviceId}"
+    client-qos: 0
+    isOpen: false

--
Gitblit v1.9.3