From a31a452b9999ba3c811c36b5cb1b3ec0c18d037d Mon Sep 17 00:00:00 2001
From: jiazx0107@163.com <jiazx0107@163.com>
Date: 星期三, 08 十一月 2023 02:23:38 +0800
Subject: [PATCH] 提交MQTT相关功能
---
src/main/java/com/fzzy/gateway/hx2023/service/ApiInitService.java | 12 +
src/main/java/com/fzzy/gateway/hx2023/service/HxGatewayRemoteServiceImpl.java | 1
src/main/java/com/fzzy/mqtt/MqttProperties.java | 15 +
src/main/java/com/fzzy/mqtt/MqttInboundConfiguration.java | 91 +++++++++++++
src/main/java/com/fzzy/gateway/hx2023/websocket/WebSocketMqtt.java | 2
src/main/java/com/fzzy/gateway/service/GatewayConfService.java | 1
src/main/java/com/fzzy/gateway/hx2023/websocket/SocketService.java | 102 ++++++++++++++
/dev/null | 25 ---
src/main/resources/application-devGateway.yml | 23 +-
src/main/java/com/fzzy/mqtt/MqttOutboundConfiguration.java | 58 ++++++++
src/main/java/com/fzzy/gateway/view/GatewayConf.view.xml | 10 +
src/main/java/com/fzzy/mqtt/MqttGatewayService.java | 17 ++
src/main/java/com/fzzy/mqtt/MqttPubController.java | 35 +++++
13 files changed, 345 insertions(+), 47 deletions(-)
diff --git a/src/main/java/com/fzzy/gateway/hx2023/service/ApiInitService.java b/src/main/java/com/fzzy/gateway/hx2023/service/ApiInitService.java
index e98cafa..f9b093f 100644
--- a/src/main/java/com/fzzy/gateway/hx2023/service/ApiInitService.java
+++ b/src/main/java/com/fzzy/gateway/hx2023/service/ApiInitService.java
@@ -1,19 +1,21 @@
package com.fzzy.gateway.hx2023.service;
+import com.bstek.dorado.annotation.Expose;
import com.fzzy.gateway.api.GatewayRemoteManager;
import com.fzzy.gateway.api.GatewayRemoteService;
import com.fzzy.gateway.entity.GatewayConf;
import com.fzzy.gateway.service.GatewayConfService;
-import lombok.extern.slf4j.Slf4j;
+
+
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
+
import java.util.List;
/**
* 褰撳墠鎺ュ彛锛屽垵濮嬪寲鐩稿叧
*/
-@Slf4j
@Component
public class ApiInitService {
@@ -23,6 +25,10 @@
private GatewayRemoteManager gatewayRemoteManager;
+ /**
+ * apiInitService#init
+ */
+ @Expose
public void init() {
List<GatewayConf> list = confService.listAll();
@@ -38,5 +44,7 @@
}
}
+
+
}
diff --git a/src/main/java/com/fzzy/gateway/hx2023/service/HxGatewayRemoteServiceImpl.java b/src/main/java/com/fzzy/gateway/hx2023/service/HxGatewayRemoteServiceImpl.java
index a6088d8..c43f29e 100644
--- a/src/main/java/com/fzzy/gateway/hx2023/service/HxGatewayRemoteServiceImpl.java
+++ b/src/main/java/com/fzzy/gateway/hx2023/service/HxGatewayRemoteServiceImpl.java
@@ -33,7 +33,6 @@
@Resource
private ApiLogRep apiLogRep;
-
@Resource
private GatewayConfService gatewayConfService;
diff --git a/src/main/java/com/fzzy/gateway/hx2023/websocket/SocketService.java b/src/main/java/com/fzzy/gateway/hx2023/websocket/SocketService.java
new file mode 100644
index 0000000..1025124
--- /dev/null
+++ b/src/main/java/com/fzzy/gateway/hx2023/websocket/SocketService.java
@@ -0,0 +1,102 @@
+package com.fzzy.gateway.hx2023.websocket;
+
+
+import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.JSONObject;
+import org.springframework.stereotype.Component;
+import org.springframework.stereotype.Service;
+import org.springframework.web.bind.annotation.CrossOrigin;
+
+import javax.websocket.*;
+import javax.websocket.server.PathParam;
+import javax.websocket.server.ServerEndpoint;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CopyOnWriteArraySet;
+
+@Component
+@CrossOrigin
+@Service
+@ServerEndpoint(value = "/mqtt")
+public class SocketService {
+
+ /**
+ * 鐢ㄦ潵瀛樻斁姣忎釜瀹㈡埛绔搴旂殑MyWebSocket瀵硅薄銆�
+ **/
+ private static CopyOnWriteArraySet<SocketService> socketSet = new CopyOnWriteArraySet<>();
+ /**
+ * 涓庢煇涓鎴风鐨勮繛鎺ヤ細璇濓紝闇�瑕侀�氳繃瀹冩潵缁欏鎴风鍙戦�佹暟鎹�
+ **/
+ private Session session;
+ /**
+ * 鐢ㄦ埛鍚嶇О
+ **/
+ private String nickname;
+ /**
+ * 鐢ㄦ潵璁板綍sessionId鍜岃session杩涜缁戝畾
+ **/
+ private static Map<String,Session> map = new HashMap<String, Session>();
+
+ /**
+ * 杩炴帴寤虹珛鎴愬姛璋冪敤鐨勬柟娉�
+ */
+ @OnOpen
+ public void onOpen(Session session,@PathParam("nickname") String nickname) {
+ this.session = session;
+ this.nickname=nickname;
+
+ map.put(nickname, session);
+ socketSet.add(this);
+
+ System.out.println("鏈夋柊杩炴帴鍔犲叆:"+nickname+",褰撳墠鍦ㄧ嚎浜烘暟涓�" + socketSet.size());
+ }
+
+ /**
+ * 杩炴帴鍏抽棴璋冪敤鐨勬柟娉�
+ */
+ @OnClose
+ public void onClose() {
+ socketSet.remove(this);
+ List<String> nickname = this.session.getRequestParameterMap().get("nickname");
+ for(String nick:nickname) {
+ map.remove(nick);
+ }
+ System.out.println("鏈変竴杩炴帴鍏抽棴锛佸綋鍓嶅湪绾夸汉鏁颁负" + socketSet.size());
+ }
+
+ /**
+ * 鏀跺埌瀹㈡埛绔秷鎭悗璋冪敤鐨勬柟娉�
+ */
+ @OnMessage
+ public void onMessage(String message, Session session,@PathParam("nickname") String nickname) {
+ System.out.println("鏉ヨ嚜瀹㈡埛绔殑娑堟伅-->"+nickname+": " + message);
+ //灏唌qtt鍙戦�佽繃鏉ョ殑鏁版嵁杩斿洖缁欏墠绔�
+ try {
+// JSONObject parse = JSONObject.parse(message);
+// Map<String,Object> valuesMap = (Map<String, Object>) parse.getByPath("values");
+// String tag1 = valuesMap.get("tag1").toString();
+// String replace = tag1.replace("\r\n", "");
+// String substring = replace.substring(replace.indexOf(":") + 1).trim().replace(" ",":");
+// String tag2 = valuesMap.get("tag2").toString();
+// String substring1=substring+"tag2:"+tag2;
+// String result="{"+substring1+"}";
+// //鍙戦�佺粰鍓嶇锛岀敤浣滈〉闈㈡覆鏌�
+// Session fromSession = map.get(nickname);
+// fromSession.getAsyncRemote().sendText(result);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+
+ /**
+ * 鍙戠敓閿欒鏃惰皟鐢�
+ */
+ @OnError
+ public void onError(Session session, Throwable error) {
+ System.out.println("鍙戠敓閿欒");
+ error.printStackTrace();
+ }
+
+
+}
diff --git a/src/main/java/com/fzzy/gateway/hx2023/websocket/WebSocketMqtt.java b/src/main/java/com/fzzy/gateway/hx2023/websocket/WebSocketMqtt.java
index 28e5666..9d9756c 100644
--- a/src/main/java/com/fzzy/gateway/hx2023/websocket/WebSocketMqtt.java
+++ b/src/main/java/com/fzzy/gateway/hx2023/websocket/WebSocketMqtt.java
@@ -21,7 +21,7 @@
*/
@Slf4j
@Component
-@ServerEndpoint(value = "/mqtt")
+//@ServerEndpoint(value = "/mqtt")
public class WebSocketMqtt {
private static Map<String, Session> sessionPool = new ConcurrentHashMap<>();
diff --git a/src/main/java/com/fzzy/gateway/service/GatewayConfService.java b/src/main/java/com/fzzy/gateway/service/GatewayConfService.java
index 81ddd0a..2c8b6b8 100644
--- a/src/main/java/com/fzzy/gateway/service/GatewayConfService.java
+++ b/src/main/java/com/fzzy/gateway/service/GatewayConfService.java
@@ -64,6 +64,7 @@
gatewayConfRep.delete(data2);
return null;
}
+
public void updateCache(GatewayConf conf) {
String key = RedisConst.buildKey(RedisConst.KYE_CONF_GATEWAY, conf.getKqdm());
diff --git a/src/main/java/com/fzzy/gateway/view/GatewayConf.view.xml b/src/main/java/com/fzzy/gateway/view/GatewayConf.view.xml
index 033eef2..4a77418 100644
--- a/src/main/java/com/fzzy/gateway/view/GatewayConf.view.xml
+++ b/src/main/java/com/fzzy/gateway/view/GatewayConf.view.xml
@@ -166,6 +166,12 @@
<Property name="iconClass">fa fa-search</Property>
<Property name="exClassName">toolbar-button-push</Property>
</ToolBarButton>
+ <ToolBarButton>
+ <Property name="caption">缃戝叧鍒濆鍖�</Property>
+ <Property name="iconClass">fa fa-search</Property>
+ <Property name="exClassName">toolbar-button-push</Property>
+ <Property name="action">ajaxInit</Property>
+ </ToolBarButton>
</ToolBar>
<DataGrid id="dgMain">
<Property name="dataSet">dsMain</Property>
@@ -377,6 +383,10 @@
<Property name="service">gatewayDeviceService#ajaxTestGrain</Property>
<Property name="executingMessage">鍦ㄥ姫鍔涙墽琛屼腑鈥︹��</Property>
</AjaxAction>
+ <AjaxAction id="ajaxInit">
+ <Property name="service">apiInitService#init</Property>
+ <Property name="executingMessage">鍦ㄥ姫鍔涙墽琛屼腑鈥︹��</Property>
+ </AjaxAction>
<Dialog id="dialogWeight">
<Property name="width">400</Property>
<Property name="height">300</Property>
diff --git a/src/main/java/com/fzzy/mqtt/MqttAcceptCallback.java b/src/main/java/com/fzzy/mqtt/MqttAcceptCallback.java
deleted file mode 100644
index 94ace07..0000000
--- a/src/main/java/com/fzzy/mqtt/MqttAcceptCallback.java
+++ /dev/null
@@ -1,90 +0,0 @@
-package com.fzzy.mqtt;
-
-import lombok.extern.slf4j.Slf4j;
-import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
-import org.eclipse.paho.client.mqttv3.MqttCallbackExtended;
-import org.eclipse.paho.client.mqttv3.MqttMessage;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Component;
-
-/**
- * @Description : MQTT鎺ュ彈鏈嶅姟鐨勫洖璋冪被
- */
-@Slf4j
-@Component
-public class MqttAcceptCallback implements MqttCallbackExtended {
-
- @Autowired
- private MqttAcceptClient mqttAcceptClient;
-
- @Autowired
- private MqttProperties mqttProperties;
-
-
- /**
- * 瀹㈡埛绔柇寮�鍚庤Е鍙�
- *
- * @param throwable
- */
- @Override
- public void connectionLost(Throwable throwable) {
- log.info("杩炴帴鏂紑锛屽彲浠ラ噸杩�");
- if (MqttAcceptClient.client == null || !MqttAcceptClient.client.isConnected()) {
- log.info("銆恊mqx閲嶆柊杩炴帴銆�....................................................");
- mqttAcceptClient.reconnection();
- }
- }
-
-
- /**
- * 瀹㈡埛绔敹鍒版秷鎭Е鍙�
- *
- * @param topic 涓婚
- * @param mqttMessage 娑堟伅
- */
- @Override
- public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
- log.info("銆愭帴鏀舵秷鎭富棰樸��:" + topic);
- log.info("銆愭帴鏀舵秷鎭疩os銆�:" + mqttMessage.getQos());
- log.info("銆愭帴鏀舵秷鎭唴瀹广��:" + new String(mqttMessage.getPayload()));
- // int i = 1/0;
- }
-
-
- /**
- * 鍙戝竷娑堟伅鎴愬姛
- *
- * @param token token
- */
- @Override
- public void deliveryComplete(IMqttDeliveryToken token) {
- String[] topics = token.getTopics();
- for (String topic : topics) {
- log.info("鍚戜富棰樸��" + topic + "銆戝彂閫佹秷鎭垚鍔燂紒");
- }
- try {
- MqttMessage message = token.getMessage();
- byte[] payload = message.getPayload();
- String s = new String(payload, "UTF-8");
- log.info("銆愭秷鎭唴瀹广��:" + s);
- } catch (Exception e) {
- log.error("MqttAcceptCallback deliveryComplete error,message:{}", e.getMessage());
- e.printStackTrace();
- }
- }
-
- /**
- * 杩炴帴emq鏈嶅姟鍣ㄥ悗瑙﹀彂
- *
- * @param b
- * @param s
- */
- @Override
- public void connectComplete(boolean b, String s) {
- log.info("============================= 瀹㈡埛绔��" + MqttAcceptClient.client.getClientId() + "銆戣繛鎺ユ垚鍔燂紒=============================");
- // 浠�/#缁撳熬琛ㄧず璁㈤槄鎵�鏈変互test寮�澶寸殑涓婚
- // 璁㈤槄鎵�鏈夋満鏋勪富棰�
- mqttAcceptClient.subscribe(mqttProperties.getDefaultTopic(), 0);
- }
-
-}
diff --git a/src/main/java/com/fzzy/mqtt/MqttAcceptClient.java b/src/main/java/com/fzzy/mqtt/MqttAcceptClient.java
deleted file mode 100644
index a7823ad..0000000
--- a/src/main/java/com/fzzy/mqtt/MqttAcceptClient.java
+++ /dev/null
@@ -1,104 +0,0 @@
-package com.fzzy.mqtt;
-
-import lombok.extern.slf4j.Slf4j;
-import org.eclipse.paho.client.mqttv3.MqttClient;
-import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
-import org.eclipse.paho.client.mqttv3.MqttException;
-import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Component;
-
-/**
- * @Description : MQTT鎺ュ彈鏈嶅姟鐨勫鎴风
- */
-@Slf4j
-@Component
-public class MqttAcceptClient {
-
- @Autowired
- private MqttAcceptCallback mqttAcceptCallback;
-
- @Autowired
- private MqttProperties mqttProperties;
-
- public static MqttClient client;
-
- private static MqttClient getClient() {
- return client;
- }
-
- private static void setClient(MqttClient client) {
- MqttAcceptClient.client = client;
- }
-
-
- /**
- * 瀹㈡埛绔繛鎺�
- */
- public void connect() {
- MqttClient client;
- try {
- client = new MqttClient(mqttProperties.getHost(), mqttProperties.getClientId(),
- new MemoryPersistence());
-
- MqttConnectOptions options = new MqttConnectOptions();
- options.setUserName(mqttProperties.getClientUsername());
- options.setPassword(mqttProperties.getClientPassword().toCharArray());
- options.setConnectionTimeout(mqttProperties.getClientTimeout());
- options.setKeepAliveInterval(mqttProperties.getClientAliveTime());
- options.setAutomaticReconnect(mqttProperties.getReconnect());
- options.setCleanSession(mqttProperties.getCleanSession());
- MqttAcceptClient.setClient(client);
- // 璁剧疆鍥炶皟
- client.setCallback(mqttAcceptCallback);
- client.connect(options);
- } catch (Exception e) {
- log.error("MqttAcceptClient connect error,message:{}", e.getMessage());
- e.printStackTrace();
- }
- }
-
- /**
- * 閲嶆柊杩炴帴
- */
- public void reconnection() {
- try {
- client.connect();
- } catch (MqttException e) {
- log.error("MqttAcceptClient reconnection error,message:{}", e.getMessage());
- e.printStackTrace();
- }
- }
-
- /**
- * 璁㈤槄鏌愪釜涓婚
- *
- * @param topic 涓婚
- * @param qos 杩炴帴鏂瑰紡
- */
- public void subscribe(String topic, int qos) {
- log.info("========================銆愬紑濮嬭闃呬富棰�:" + topic + "銆�========================");
- try {
- client.subscribe(topic, qos);
- } catch (MqttException e) {
- log.error("MqttAcceptClient subscribe error,message:{}", e.getMessage());
- e.printStackTrace();
- }
- }
-
- /**
- * 鍙栨秷璁㈤槄鏌愪釜涓婚
- *
- * @param topic
- */
- public void unsubscribe(String topic) {
- log.info("========================銆愬彇娑堣闃呬富棰�:" + topic + "銆�========================");
- try {
- client.unsubscribe(topic);
- } catch (MqttException e) {
- log.error("MqttAcceptClient unsubscribe error,message:{}", e.getMessage());
- e.printStackTrace();
- }
- }
-
-}
diff --git a/src/main/java/com/fzzy/mqtt/MqttCondition.java b/src/main/java/com/fzzy/mqtt/MqttCondition.java
deleted file mode 100644
index f335e7b..0000000
--- a/src/main/java/com/fzzy/mqtt/MqttCondition.java
+++ /dev/null
@@ -1,25 +0,0 @@
-package com.fzzy.mqtt;
-
-import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;
-import org.springframework.context.annotation.Condition;
-import org.springframework.context.annotation.ConditionContext;
-import org.springframework.core.env.Environment;
-import org.springframework.core.type.AnnotatedTypeMetadata;
-
-/**
- * @Description : 鑷畾涔夐厤缃�,閫氳繃杩欎釜閰嶇疆锛屾潵鎺у埗鍚姩椤圭洰鐨勬椂鍊欐槸鍚﹀惎鍔╩qtt
- */
-public class MqttCondition implements Condition {
-
- @Override
- public boolean matches(ConditionContext context, AnnotatedTypeMetadata annotatedTypeMetadata) {
- //1銆佽兘鑾峰彇鍒癷oc浣跨敤鐨刡eanfactory
- ConfigurableListableBeanFactory beanFactory = context.getBeanFactory();
- //2銆佽幏鍙栫被鍔犺浇鍣�
- ClassLoader classLoader = context.getClassLoader();
- //3銆佽幏鍙栧綋鍓嶇幆澧冧俊鎭�
- Environment environment = context.getEnvironment();
- String isOpen = environment.getProperty("mqtt.isOpen");
- return Boolean.valueOf(isOpen);
- }
-}
diff --git a/src/main/java/com/fzzy/mqtt/MqttGatewayService.java b/src/main/java/com/fzzy/mqtt/MqttGatewayService.java
new file mode 100644
index 0000000..caed4e8
--- /dev/null
+++ b/src/main/java/com/fzzy/mqtt/MqttGatewayService.java
@@ -0,0 +1,17 @@
+package com.fzzy.mqtt;
+
+
+import org.springframework.integration.annotation.MessagingGateway;
+import org.springframework.integration.mqtt.support.MqttHeaders;
+import org.springframework.messaging.handler.annotation.Header;
+import org.springframework.stereotype.Service;
+
+/**
+ * 鎺ㄩ�佹帴鍙�
+ */
+@Service
+@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
+public interface MqttGatewayService {
+
+ void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic01, String sendData);
+}
diff --git a/src/main/java/com/fzzy/mqtt/MqttInboundConfiguration.java b/src/main/java/com/fzzy/mqtt/MqttInboundConfiguration.java
new file mode 100644
index 0000000..874464c
--- /dev/null
+++ b/src/main/java/com/fzzy/mqtt/MqttInboundConfiguration.java
@@ -0,0 +1,91 @@
+package com.fzzy.mqtt;
+
+import com.fzzy.gateway.hx2023.websocket.SocketService;
+import lombok.extern.slf4j.Slf4j;
+import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.integration.annotation.IntegrationComponentScan;
+import org.springframework.integration.annotation.ServiceActivator;
+import org.springframework.integration.channel.DirectChannel;
+import org.springframework.integration.core.MessageProducer;
+import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
+import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
+import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
+import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
+import org.springframework.integration.mqtt.support.MqttHeaders;
+import org.springframework.messaging.Message;
+import org.springframework.messaging.MessageChannel;
+import org.springframework.messaging.MessageHandler;
+import org.springframework.messaging.MessagingException;
+
+/**
+ * MQTT娑堣垂绔�
+ */
+@Slf4j
+@Configuration
+@IntegrationComponentScan
+public class MqttInboundConfiguration {
+
+ @Autowired
+ private MqttProperties mqttProperties;
+
+ @Bean
+ public MessageChannel mqttInputChannel() {
+ return new DirectChannel();
+ }
+
+ @Bean
+ public MqttPahoClientFactory receiverMqttClientFactoryForSub() {
+ DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
+ String[] array = mqttProperties.getHost().split(",");
+ MqttConnectOptions options = new MqttConnectOptions();
+ options.setServerURIs(array);
+ options.setUserName(mqttProperties.getClientUsername());
+ options.setPassword(mqttProperties.getClientPassword().toCharArray());
+ options.setKeepAliveInterval(mqttProperties.getClientAliveTime());
+ //鎺ュ彈绂荤嚎娑堟伅
+ options.setCleanSession(false);
+ options.setMqttVersion(4);
+
+ factory.setConnectionOptions(options);
+
+ return factory;
+ }
+
+ //閰嶇疆client,鐩戝惉鐨則opic
+ @Bean
+ public MessageProducer inbound() {
+ String[] inboundTopics = mqttProperties.getClientTopics().split(",");
+ MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(
+ mqttProperties.getClientId() + "_inbound", receiverMqttClientFactoryForSub(), inboundTopics); //瀵筰nboundTopics涓婚杩涜鐩戝惉
+ adapter.setCompletionTimeout(mqttProperties.getClientTimeout());
+ adapter.setQos(1);
+ adapter.setConverter(new DefaultPahoMessageConverter());
+ adapter.setOutputChannel(mqttInputChannel());
+ return adapter;
+ }
+
+ //閫氳繃閫氶亾鑾峰彇鏁版嵁
+ @Bean
+ @ServiceActivator(inputChannel = "mqttInputChannel") //寮傛澶勭悊
+ public MessageHandler handler() {
+ return new MessageHandler() {
+ @Override
+ public void handleMessage(Message<?> message) throws MessagingException {
+ log.info("----------------------");
+ //鑾峰彇mqtt鐨則opic
+ String topic = (String) message.getHeaders().get(MqttHeaders.RECEIVED_TOPIC);
+ //浣跨敤webSocket杩斿洖缁欏墠绔�
+ SocketService socketService = new SocketService();
+ socketService.onMessage(message.getPayload().toString(), null, topic);
+
+ log.info("message:" + message.getPayload());
+ log.info("PacketId:" + message.getHeaders().getId());
+ log.info("Qos:" + message.getHeaders().get(MqttHeaders.QOS));
+ log.info("topic:" + topic);
+ }
+ };
+ }
+}
diff --git a/src/main/java/com/fzzy/mqtt/MqttOutboundConfiguration.java b/src/main/java/com/fzzy/mqtt/MqttOutboundConfiguration.java
new file mode 100644
index 0000000..ccbd0f1
--- /dev/null
+++ b/src/main/java/com/fzzy/mqtt/MqttOutboundConfiguration.java
@@ -0,0 +1,58 @@
+package com.fzzy.mqtt;
+
+import lombok.extern.slf4j.Slf4j;
+import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.integration.annotation.IntegrationComponentScan;
+import org.springframework.integration.annotation.ServiceActivator;
+import org.springframework.integration.channel.DirectChannel;
+import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
+import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
+import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
+import org.springframework.messaging.MessageChannel;
+import org.springframework.messaging.MessageHandler;
+
+/**
+ * MQTT鐢熶骇绔�
+ */
+@Slf4j
+@Configuration
+@IntegrationComponentScan
+public class MqttOutboundConfiguration {
+ @Autowired
+ private MqttProperties mqttProperties;
+
+ @Bean
+ public MessageChannel mqttOutboundChannel() {
+ return new DirectChannel();
+ }
+
+ @Bean
+ public MqttPahoClientFactory getReceiverMqttConnectOptionsForSend() {
+ DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
+ String[] array = mqttProperties.getHost().split(",");
+ MqttConnectOptions options = new MqttConnectOptions();
+ options.setServerURIs(array);
+
+ if(null == mqttProperties.getClientUsername()) mqttProperties.setClientUsername(" ");
+ if(null == mqttProperties.getClientPassword()) mqttProperties.setClientPassword(" ");
+ options.setUserName(mqttProperties.getClientUsername());
+ options.setPassword(mqttProperties.getClientPassword().toCharArray());
+ // 鎺ュ彈绂荤嚎娑堟伅
+ options.setCleanSession(false); //鍛婅瘔浠g悊瀹㈡埛绔槸鍚﹁寤虹珛鎸佷箙浼氳瘽 false涓哄缓绔嬫寔涔呬細璇�
+ options.setMqttVersion(4);
+ factory.setConnectionOptions(options);
+ return factory;
+ }
+
+ @Bean
+ @ServiceActivator(inputChannel = "mqttOutboundChannel")
+ public MessageHandler mqttOutbound() {
+ MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(mqttProperties.getClientId() + "outbound", getReceiverMqttConnectOptionsForSend());
+ messageHandler.setAsync(true);
+ return messageHandler;
+ }
+
+}
diff --git a/src/main/java/com/fzzy/mqtt/MqttProperties.java b/src/main/java/com/fzzy/mqtt/MqttProperties.java
index 8a7a554..a7293c9 100644
--- a/src/main/java/com/fzzy/mqtt/MqttProperties.java
+++ b/src/main/java/com/fzzy/mqtt/MqttProperties.java
@@ -1,15 +1,17 @@
package com.fzzy.mqtt;
import lombok.Data;
+import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;
/**
* MQTT 閰嶇疆淇℃伅
*/
-@Component
-@ConfigurationProperties("mqtt")
+@Slf4j
@Data
+@Component
+@ConfigurationProperties("spring.mqtt")
public class MqttProperties {
/**
@@ -37,13 +39,13 @@
/**
* 瓒呮椂鏃堕棿
*/
- private int clientTimeout;
+ private int clientTimeout = 5000;
/**
* 璁剧疆浼氳瘽蹇冭烦鏃堕棿 鍗曚綅涓虹 鏈嶅姟鍣ㄤ細姣忛殧1.5*20绉掔殑鏃堕棿鍚戝鎴风
* 鍙戦�佷釜娑堟伅鍒ゆ柇瀹㈡埛绔槸鍚﹀湪绾匡紝浣嗚繖涓柟娉曞苟娌℃湁閲嶈繛鐨勬満鍒�
*/
- private int clientAliveTime;
+ private int clientAliveTime = 30000;
private int clientMaxConnectTime;
@@ -52,12 +54,12 @@
/**
* 杩炴帴鏂瑰紡
*/
- private Integer clientQos;
+ private Integer clientQos = 0;
/**
* 榛樿杩炴帴涓婚锛屼互/#缁撳熬琛ㄧず璁㈤槄鎵�鏈変互test寮�澶寸殑涓婚
*/
- private String defaultTopic;
+ private String defaultTopic;
/**
* 璁剧疆鏄惁娓呯┖session,杩欓噷濡傛灉璁剧疆涓篺alse琛ㄧず鏈嶅姟鍣ㄤ細淇濈暀瀹㈡埛绔殑杩�
@@ -75,4 +77,5 @@
*/
private Boolean isOpen;
+
}
diff --git a/src/main/java/com/fzzy/mqtt/MqttPubController.java b/src/main/java/com/fzzy/mqtt/MqttPubController.java
new file mode 100644
index 0000000..644928d
--- /dev/null
+++ b/src/main/java/com/fzzy/mqtt/MqttPubController.java
@@ -0,0 +1,35 @@
+package com.fzzy.mqtt;
+
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RestController;
+
+@RestController
+public class MqttPubController {
+
+ @Autowired
+ private MqttGatewayService gatewayService;
+
+
+ @RequestMapping("/hello")
+ public String hello() {
+ return "hello!";
+ }
+
+ @RequestMapping("/sendMqtt")
+ public String sendMqtt(String sendData) {
+ System.out.println(sendData);
+ System.out.println("杩涘叆sendMqtt-------" + sendData);
+ gatewayService.sendToMqtt("topic01", (String) sendData);
+ return "Test is OK";
+ }
+
+ @RequestMapping("/sendMqttTopic")
+ public String sendMqtt(String sendData, String topic) {
+ //System.out.println(sendData+" "+topic);
+ //System.out.println("杩涘叆inbound鍙戦�侊細"+sendData);
+ gatewayService.sendToMqtt(topic, (String) sendData);
+ return "Test is OK";
+ }
+
+}
diff --git a/src/main/resources/application-devGateway.yml b/src/main/resources/application-devGateway.yml
index cc2c8df..09d7b84 100644
--- a/src/main/resources/application-devGateway.yml
+++ b/src/main/resources/application-devGateway.yml
@@ -69,15 +69,14 @@
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
-
-mqtt:
- host: tcp://10.13.4.84:11883
- client-id:
- client-username:
- client-password:
- client-timeout: 10
- client-alive-time: 20
- client-max-connect-times: 5
- client-topics:
- client-qos: 0
- isOpen: false
+ mqtt:
+ host: tcp://10.13.4.84:11883
+ client-id:
+ client-username: admin
+ client-password: 123456
+ client-timeout: 10
+ client-alive-time: 20
+ client-max-connect-times: 5
+ client-topics: "/device/${productId}/${deviceId}/message/property/report,/device-message-sender/${productId}/${deviceId}"
+ client-qos: 0
+ isOpen: false
--
Gitblit v1.9.3