From 97c75a868e9fca03598dfa862bdd7ad94fd5fdcb Mon Sep 17 00:00:00 2001
From: jiazx0107@163.com <jiazx0107@163.com>
Date: 星期四, 09 十一月 2023 00:54:08 +0800
Subject: [PATCH] 调整MQTT
---
src/main/java/com/fzzy/gateway/service/GatewayDeviceService2.java | 54 ++++++++++
src/main/java/com/fzzy/mqtt/MqttConsumerConfig.java | 4
src/main/java/com/fzzy/gateway/service/GatewayDeviceService.java | 66 +++++++------
src/main/java/com/fzzy/mqtt/MqttProviderConfig.java | 8 +
src/main/java/com/fzzy/mqtt/MqttProperties.java | 10 -
src/main/java/com/fzzy/gateway/hx2023/ScConstant.java | 6 +
src/main/java/com/fzzy/gateway/hx2023/service/OnReceiveMqttService.java | 7
src/main/java/com/fzzy/gateway/view/GatewayDevice.view.xml | 19 +++
/dev/null | 48 ---------
src/main/resources/application-devGateway.yml | 3
src/main/java/com/fzzy/mqtt/MqttProviderCallBack.java | 2
src/main/java/com/fzzy/mqtt/MqttController.java | 4
src/main/java/com/fzzy/gateway/GatewayTimerScheduled.java | 2
src/main/java/com/fzzy/mqtt/MqttPublishService.java | 65 +++++++-----
14 files changed, 172 insertions(+), 126 deletions(-)
diff --git a/src/main/java/com/fzzy/gateway/GatewayTimerScheduled.java b/src/main/java/com/fzzy/gateway/GatewayTimerScheduled.java
index 4bb2a3e..379583d 100644
--- a/src/main/java/com/fzzy/gateway/GatewayTimerScheduled.java
+++ b/src/main/java/com/fzzy/gateway/GatewayTimerScheduled.java
@@ -72,6 +72,8 @@
String url = DEFAULT_URL;
url = url.replace("{appId}", DEFAULT_APP_ID).replace("{appsecret}", DEFAULT_APP_SECRET).replace("{cityid}", DEFAULT_CITYID);
+
+ log.debug("------姘旇薄URL---{}",url);
String result = GatewayHttpUtil.doGet(url, null);
if (null == result) {
diff --git a/src/main/java/com/fzzy/gateway/hx2023/ScConstant.java b/src/main/java/com/fzzy/gateway/hx2023/ScConstant.java
index badcdf6..03b0885 100644
--- a/src/main/java/com/fzzy/gateway/hx2023/ScConstant.java
+++ b/src/main/java/com/fzzy/gateway/hx2023/ScConstant.java
@@ -26,4 +26,10 @@
public static int CODE_200 = 200;
+
+ /**
+ * 涓嬪彂鎸囦护鍥炲鎶ユ枃topic
+ */
+ public static String TOPIC_REPORT = "/${productId}/${deviceId}/properties/report";
+
}
diff --git a/src/main/java/com/fzzy/gateway/hx2023/service/OnReceiveMqttService.java b/src/main/java/com/fzzy/gateway/hx2023/service/OnReceiveMqttService.java
index d863ab5..e782bd8 100644
--- a/src/main/java/com/fzzy/gateway/hx2023/service/OnReceiveMqttService.java
+++ b/src/main/java/com/fzzy/gateway/hx2023/service/OnReceiveMqttService.java
@@ -9,6 +9,7 @@
import com.fzzy.gateway.hx2023.data.CloudSendData;
import com.fzzy.gateway.hx2023.data.SyncReqData;
import com.fzzy.mqtt.MqttProviderConfig;
+import com.fzzy.mqtt.MqttPublishService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
@@ -24,7 +25,7 @@
@Resource
private GatewayRemoteManager gatewayRemoteManager;
@Resource
- private MqttProviderConfig providerClient;
+ private MqttPublishService publishService;
/**
@@ -69,11 +70,11 @@
//鑷姩鎺ㄩ��
if (200 == resp.getCode() && syncReqData.isAutoReplay()) {
- String topic = "/${productId}/${deviceId}/properties/report";
+ String topic = ScConstant.TOPIC_REPORT;
topic = topic.replace("${productId}", device.getProductId()).replace("${deviceId}", device.getDeviceId());
- providerClient.publish(topic, resp.getData());
+ publishService.publishMsg(topic, resp.getData());
log.info("=======绮儏鎺ㄩ��==========={}", resp.getData());
}
diff --git a/src/main/java/com/fzzy/gateway/service/GatewayDeviceService.java b/src/main/java/com/fzzy/gateway/service/GatewayDeviceService.java
index 312a426..d571fc3 100644
--- a/src/main/java/com/fzzy/gateway/service/GatewayDeviceService.java
+++ b/src/main/java/com/fzzy/gateway/service/GatewayDeviceService.java
@@ -13,17 +13,20 @@
import com.fzzy.gateway.GatewayUtils;
import com.fzzy.gateway.api.DeviceReportService;
import com.fzzy.gateway.api.GatewayRemoteManager;
+import com.fzzy.gateway.data.BaseResp;
import com.fzzy.gateway.entity.GatewayDevice;
import com.fzzy.gateway.hx2023.ScConstant;
import com.fzzy.gateway.hx2023.data.*;
import com.fzzy.gateway.service.repository.GatewayDeviceRep;
+import com.fzzy.mqtt.MqttProviderConfig;
import com.fzzy.mqtt.MqttPublishService;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang.time.DateFormatUtils;
import org.springframework.beans.BeanUtils;
+import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.domain.Sort;
import org.springframework.stereotype.Component;
@@ -47,6 +50,9 @@
private GatewayRemoteManager gatewayRemoteManager;
@Resource
private MqttPublishService mqttPublishService;
+ @Resource
+ private MqttProviderConfig providerClient;
+
/**
* gatewayDeviceService#listAll
*
@@ -67,7 +73,7 @@
public void updateSave(GatewayDevice data) {
GatewayDevice data2 = new GatewayDevice();
BeanUtils.copyProperties(data, data2);
-
+
if (null == data2.getDeviceSn()) {
if (null != data2.getIp()) {
data.setDeviceSn(data2.getIp());
@@ -77,10 +83,10 @@
}
if (null == data2.getId()) {
- data2.setId(ContextUtil.getUUID());
+ data2.setId(ContextUtil.getUUID());
gatewayDeviceRep.save(data2);
- }else{
- gatewayDeviceRep.save(data2);
+ } else {
+ gatewayDeviceRep.save(data2);
}
flushCache();
}
@@ -112,10 +118,6 @@
}
-
-
-
-
/**
* gatewayDeviceService#ajaxTestWeight
* 鍦扮鎺ㄩ�佹祴璇�
@@ -128,16 +130,16 @@
double weigh = (double) parameter.getWeight();
//double weigh = Double.parseDouble("3500.0");
List<GatewayDevice> devices = listAll();
- if(devices == null || devices.size()<= 0){
+ if (devices == null || devices.size() <= 0) {
return "娌℃湁璁惧";
}
- List<GatewayDevice> weights = devices.stream().filter(s ->(GatewayDeviceType.TYPE_01.getCode().equals(s.getType()))).collect(Collectors.toList());
- if(weights == null || weights.size()<= 0){
- return "娌℃湁鑾峰彇鍒板湴纾呰澶�";
- }
+ List<GatewayDevice> weights = devices.stream().filter(s -> (GatewayDeviceType.TYPE_01.getCode().equals(s.getType()))).collect(Collectors.toList());
+ if (weights == null || weights.size() <= 0) {
+ return "娌℃湁鑾峰彇鍒板湴纾呰澶�";
+ }
String topic = "/device/hx-weigh-big-01/91511424746940066Y001_91511424746940066Y0010000_002_004_002_001/message/property/report";
- for (GatewayDevice device: weights) {
+ for (GatewayDevice device : weights) {
WebSocketPacket packet = new WebSocketPacket();
@@ -163,7 +165,7 @@
packet.setTimestamp(System.currentTimeMillis());
- topic = "/device/"+header.getProductId()+"/"+device.getDeviceId()+"/message/property/report";
+ topic = "/device/" + header.getProductId() + "/" + device.getDeviceId() + "/message/property/report";
mqttPublishService.publishMsg(topic, JSON.toJSONString(packet));
}
return "SUCCESS";
@@ -172,6 +174,7 @@
/**
* gatewayDeviceService#ajaxTestLpr
* 鍦扮鎺ㄩ�佹祴璇�
+ *
* @return
*/
@Expose
@@ -179,16 +182,16 @@
//String carNumber = parameter.getCarNumber();
String carNumber = "宸滱12345";
List<GatewayDevice> devices = listAll();
- if(devices == null || devices.size()<= 0){
+ if (devices == null || devices.size() <= 0) {
return "娌℃湁璁惧";
}
- List<GatewayDevice> weights = devices.stream().filter(s ->(GatewayDeviceType.TYPE_02.getCode().equals(s.getType()))).collect(Collectors.toList());
- if(weights == null || weights.size()<= 0){
+ List<GatewayDevice> weights = devices.stream().filter(s -> (GatewayDeviceType.TYPE_02.getCode().equals(s.getType()))).collect(Collectors.toList());
+ if (weights == null || weights.size() <= 0) {
return "娌℃湁鑾峰彇鍒拌澶�";
}
String topic = "/device/hx-weigh-big-01/91511424746940066Y001_91511424746940066Y0010000_002_004_002_001/message/property/report";
- for (GatewayDevice device: weights) {
+ for (GatewayDevice device : weights) {
WebSocketPacket packet = new WebSocketPacket();
@@ -212,7 +215,7 @@
packet.setTimestamp(System.currentTimeMillis());
- topic = "/device/"+header.getProductId()+"/"+device.getDeviceId()+"/message/property/report";
+ topic = "/device/" + header.getProductId() + "/" + device.getDeviceId() + "/message/property/report";
mqttPublishService.publishMsg(topic, JSON.toJSONString(packet));
}
return "SUCCESS";
@@ -247,6 +250,9 @@
return this.pushByV40(list, start, end);
}
+
+
+
private String pushByV40(List<GatewayDevice> list, Date start, Date end) {
@@ -352,27 +358,27 @@
//鍒ゆ柇鏈�澶�
if (curTemp.equals(result.getMaxTemperature())) {
- result.setMaxX(x+"");
- result.setMaxY(y+"");
- result.setMaxZ(position+"");
+ result.setMaxX(x + "");
+ result.setMaxY(y + "");
+ result.setMaxZ(position + "");
}
//鍒ゆ柇鏈�灏�
if (curTemp.equals(result.getMinTemperature())) {
- result.setMinX(x+"");
- result.setMinY(y+"");
- result.setMinZ(position+"");
+ result.setMinX(x + "");
+ result.setMinY(y + "");
+ result.setMinZ(position + "");
}
- temperature.add(new KafkaGrainDataDetail1(cableNum + "", z + "", curTemp, position + "", x+"", y+""));
+ temperature.add(new KafkaGrainDataDetail1(cableNum + "", z + "", curTemp, position + "", x + "", y + ""));
}
//绮俯淇℃伅
JSONObject trhInfo = new JSONObject();
- // TRHInfo trhInfo = new TRHInfo();
- trhInfo.put("temperature",temperature);
+ // TRHInfo trhInfo = new TRHInfo();
+ trhInfo.put("temperature", temperature);
//浠撴俯搴︿俊鎭�
@@ -385,7 +391,7 @@
List<KafkaGrainTH> temperatureAndhumidity = new ArrayList<>();
temperatureAndhumidity.add(grainTH);
- trhInfo.put("temperatureAndhumidity",temperatureAndhumidity);
+ trhInfo.put("temperatureAndhumidity", temperatureAndhumidity);
//trhInfo.put("temperatureAndhumidity",grainTH);
JSONObject params = new JSONObject();
diff --git a/src/main/java/com/fzzy/gateway/service/GatewayDeviceService2.java b/src/main/java/com/fzzy/gateway/service/GatewayDeviceService2.java
index 42f0067..c17e815 100644
--- a/src/main/java/com/fzzy/gateway/service/GatewayDeviceService2.java
+++ b/src/main/java/com/fzzy/gateway/service/GatewayDeviceService2.java
@@ -1,10 +1,19 @@
package com.fzzy.gateway.service;
import com.bstek.dorado.annotation.DataResolver;
+import com.bstek.dorado.annotation.Expose;
+import com.fzzy.api.data.GatewayDeviceType;
import com.fzzy.api.utils.ContextUtil;
import com.fzzy.gateway.GatewayUtils;
+import com.fzzy.gateway.api.GatewayRemoteManager;
+import com.fzzy.gateway.data.BaseResp;
import com.fzzy.gateway.entity.GatewayDevice;
+import com.fzzy.gateway.hx2023.ScConstant;
+import com.fzzy.gateway.hx2023.data.SyncReqData;
import com.fzzy.gateway.service.repository.GatewayDeviceRep;
+import com.fzzy.mqtt.MqttProviderConfig;
+import com.fzzy.mqtt.MqttPublishService;
+import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeanUtils;
import org.springframework.data.domain.Sort;
import org.springframework.stereotype.Component;
@@ -12,11 +21,17 @@
import javax.annotation.Resource;
import java.util.List;
+@Slf4j
@Component
public class GatewayDeviceService2 {
@Resource
private GatewayDeviceRep gatewayDeviceRep;
+
+ @Resource
+ private GatewayRemoteManager gatewayRemoteManager;
+ @Resource
+ private MqttPublishService publishService;
public List<GatewayDevice> listAll() {
Sort sort = new Sort(Sort.Direction.ASC, "deviceId");
@@ -59,4 +74,43 @@
}
}
+
+ /**
+ * 娴嬭瘯MQTT绮儏妫�娴�
+ * gatewayDeviceService#ajaxTestGrain2
+ * 绮儏鎺ㄩ�佹祴璇�
+ *
+ * @param data
+ * @return
+ */
+ @Expose
+ public String ajaxTestGrain2(GatewayDevice data) {
+
+ SyncReqData reqData = new SyncReqData();
+ reqData.setDevice(data);
+ reqData.setAutoReplay(true);
+ reqData.setMessageType(ScConstant.MESSAGE_TYPE_INVOKE_FUNCTION);
+ reqData.setMessageId(ScConstant.getMessageId());
+ reqData.setFunctionId(ScConstant.FUNCTION_getTAndRHInfo);
+
+
+ if (!GatewayDeviceType.TYPE_07.getCode().equals(data.getType())) {
+ return "ERROR锛氬綋鍓嶈澶囬潪绮儏璁惧涓嶆敮鎸佸綋鍓嶆搷浣�";
+ }
+
+ BaseResp resp = gatewayRemoteManager.getSyncGrainService(data.getSyncProtocol()).syncGrain2(reqData);
+
+ //鑷姩鎺ㄩ��
+ if (200 == resp.getCode() && reqData.isAutoReplay()) {
+ String topic = ScConstant.TOPIC_REPORT;
+ topic = topic.replace("${productId}", data.getProductId()).replace("${deviceId}", data.getDeviceId());
+
+ publishService.publishMsg(topic, resp.getData());
+ }
+
+ log.info("=======鎵嬪姩娴嬭瘯绮儏鎺ㄩ��==========={}", resp.getData());
+
+ return "SUCCESS锛氭墽琛屽畬鎴�";
+ }
+
}
diff --git a/src/main/java/com/fzzy/gateway/view/GatewayDevice.view.xml b/src/main/java/com/fzzy/gateway/view/GatewayDevice.view.xml
index 9db29e6..75ba1bf 100644
--- a/src/main/java/com/fzzy/gateway/view/GatewayDevice.view.xml
+++ b/src/main/java/com/fzzy/gateway/view/GatewayDevice.view.xml
@@ -115,11 +115,11 @@
<Property name="label">璁惧绫诲瀷KEY</Property>
</PropertyDef>
<PropertyDef name="cableRule">
- <Property/>
+ <Property></Property>
<Property name="label">甯冪嚎瑙勫垯</Property>
</PropertyDef>
<PropertyDef name="cableCir">
- <Property/>
+ <Property></Property>
<Property name="label">绛掍粨灞傝鍒�</Property>
</PropertyDef>
</DataType>
@@ -179,6 +179,17 @@
<Property name="caption">鍒犻櫎</Property>
<Property name="iconClass">fa fa-minus</Property>
<Property name="width">90</Property>
+ <Property name="exClassName">toolbar-button-warn</Property>
+ </ToolBarButton>
+ <Separator/>
+ <ToolBarButton>
+ <ClientEvent name="onClick">var cur = view.get("#dgMain").getCurrentItem();
+view.get("#ajaxTestGrain").set("parameter",cur).execute(function(result){
+ $alert(result);
+});
+</ClientEvent>
+ <Property name="caption">娴嬭瘯绮儏</Property>
+ <Property name="iconClass">fa fa-minus</Property>
<Property name="exClassName">toolbar-button-warn</Property>
</ToolBarButton>
</ToolBar>
@@ -367,5 +378,9 @@
<Property name="service">gatewayDeviceService#delData</Property>
<Property name="confirmMessage">纭畾瑕佸垹闄や箞锛�</Property>
</AjaxAction>
+ <AjaxAction id="ajaxTestGrain">
+ <Property name="service">gatewayDeviceService2#ajaxTestGrain2</Property>
+ <Property name="confirmMessage">纭畾瑕佹墜鍔ㄦ墽琛岀伯鎯呬箞锛�</Property>
+ </AjaxAction>
</View>
</ViewConfig>
diff --git a/src/main/java/com/fzzy/mqtt/MqttConsumerConfig.java b/src/main/java/com/fzzy/mqtt/MqttConsumerConfig.java
index ca83cb1..076e3f7 100644
--- a/src/main/java/com/fzzy/mqtt/MqttConsumerConfig.java
+++ b/src/main/java/com/fzzy/mqtt/MqttConsumerConfig.java
@@ -36,7 +36,7 @@
public void connect() {
try {
//鍒涘缓MQTT瀹㈡埛绔璞�
- client = new MqttClient(mqttProperties.getHost(), mqttProperties.getClientInId(), new MemoryPersistence());
+ client = new MqttClient(mqttProperties.getHost(), mqttProperties.getClientId(), new MemoryPersistence());
//杩炴帴璁剧疆
MqttConnectOptions options = new MqttConnectOptions();
//鏄惁娓呯┖session锛岃缃负false琛ㄧず鏈嶅姟鍣ㄤ細淇濈暀瀹㈡埛绔殑杩炴帴璁板綍锛屽鎴风閲嶈繛涔嬪悗鑳借幏鍙栧埌鏈嶅姟鍣ㄥ湪瀹㈡埛绔柇寮�杩炴帴鏈熼棿鎺ㄩ�佺殑娑堟伅
@@ -51,7 +51,7 @@
//璁剧疆蹇冭烦鏃堕棿 鍗曚綅涓虹锛岃〃绀烘湇鍔″櫒姣忛殧1.5*20绉掔殑鏃堕棿鍚戝鎴风鍙戦�佸績璺冲垽鏂鎴风鏄惁鍦ㄧ嚎
options.setKeepAliveInterval(20);
//璁剧疆閬楀槺娑堟伅鐨勮瘽棰橈紝鑻ュ鎴风鍜屾湇鍔″櫒涔嬮棿鐨勮繛鎺ユ剰澶栨柇寮�锛屾湇鍔″櫒灏嗗彂甯冨鎴风鐨勯仐鍢变俊鎭�
- options.setWill("willTopic", (mqttProperties.getClientInId() + "涓庢湇鍔″櫒鏂紑杩炴帴").getBytes(), 0, false);
+ options.setWill("willTopic", (mqttProperties.getClientId() + "涓庢湇鍔″櫒鏂紑杩炴帴").getBytes(), 0, false);
//璁剧疆鍥炶皟
// client.setCallback(new MqttConsumerCallBack());
client.setCallback(mqttConsumerCallBack);
diff --git a/src/main/java/com/fzzy/mqtt/MqttController.java b/src/main/java/com/fzzy/mqtt/MqttController.java
index d327c26..e945120 100644
--- a/src/main/java/com/fzzy/mqtt/MqttController.java
+++ b/src/main/java/com/fzzy/mqtt/MqttController.java
@@ -17,14 +17,14 @@
public @ResponseBody
String connect() {
client.connect();
- return mqttProperties.getClientOutId() + "杩炴帴鍒版湇鍔″櫒";
+ return mqttProperties.getClientId() + "杩炴帴鍒版湇鍔″櫒";
}
@RequestMapping("/disConnect")
@ResponseBody
public String disConnect() {
client.disConnect();
- return mqttProperties.getClientOutId() + "涓庢湇鍔″櫒鏂紑杩炴帴";
+ return mqttProperties.getClientId() + "涓庢湇鍔″櫒鏂紑杩炴帴";
}
}
diff --git a/src/main/java/com/fzzy/mqtt/MqttGatewayService.java b/src/main/java/com/fzzy/mqtt/MqttGatewayService.java
deleted file mode 100644
index 6457db5..0000000
--- a/src/main/java/com/fzzy/mqtt/MqttGatewayService.java
+++ /dev/null
@@ -1,16 +0,0 @@
-package com.fzzy.mqtt;
-
-import org.springframework.integration.annotation.MessagingGateway;
-import org.springframework.integration.mqtt.support.MqttHeaders;
-import org.springframework.messaging.handler.annotation.Header;
-import org.springframework.stereotype.Service;
-
-/**
- * 鎺ㄩ�佹帴鍙�
- */
-@Service
-@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
-public interface MqttGatewayService {
-
- void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic01, String sendData);
-}
diff --git a/src/main/java/com/fzzy/mqtt/MqttProperties.java b/src/main/java/com/fzzy/mqtt/MqttProperties.java
index 094df8b..4d35f80 100644
--- a/src/main/java/com/fzzy/mqtt/MqttProperties.java
+++ b/src/main/java/com/fzzy/mqtt/MqttProperties.java
@@ -31,15 +31,9 @@
/**
- * 瀹㈡埛绔疘d-鍙戝竷鑰匢d
+ * 瀹㈡埛绔疘d
*/
- private String clientOutId;
-
- /**
- * 瀹㈡埛绔疘d-琚闃呰�匢d
- */
- private String clientInId;
-
+ private String clientId;
/**
* 瓒呮椂鏃堕棿
diff --git a/src/main/java/com/fzzy/mqtt/MqttProviderCallBack.java b/src/main/java/com/fzzy/mqtt/MqttProviderCallBack.java
index 0191efe..3b4de1c 100644
--- a/src/main/java/com/fzzy/mqtt/MqttProviderCallBack.java
+++ b/src/main/java/com/fzzy/mqtt/MqttProviderCallBack.java
@@ -4,8 +4,10 @@
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;
+import org.springframework.stereotype.Component;
@Slf4j
+@Component
public class MqttProviderCallBack implements MqttCallback {
/**
diff --git a/src/main/java/com/fzzy/mqtt/MqttProviderConfig.java b/src/main/java/com/fzzy/mqtt/MqttProviderConfig.java
index 8198dbf..04cca6a 100644
--- a/src/main/java/com/fzzy/mqtt/MqttProviderConfig.java
+++ b/src/main/java/com/fzzy/mqtt/MqttProviderConfig.java
@@ -14,6 +14,8 @@
@Autowired
private MqttProperties mqttProperties;
+ @Autowired
+ private MqttProviderCallBack mqttProviderCallBack;
/**
* 瀹㈡埛绔璞�
*/
@@ -34,7 +36,7 @@
public void connect(){
try{
//鍒涘缓MQTT瀹㈡埛绔璞�
- client = new MqttClient(mqttProperties.getHost(),mqttProperties.getClientOutId(),new MemoryPersistence());
+ client = new MqttClient(mqttProperties.getHost(),mqttProperties.getClientId(),new MemoryPersistence());
//杩炴帴璁剧疆
MqttConnectOptions options = new MqttConnectOptions();
//鏄惁娓呯┖session锛岃缃甪alse琛ㄧず鏈嶅姟鍣ㄤ細淇濈暀瀹㈡埛绔殑杩炴帴璁板綍锛堣闃呬富棰橈紝qos锛�,瀹㈡埛绔噸杩炰箣鍚庤兘鑾峰彇鍒版湇鍔″櫒鍦ㄥ鎴风鏂紑杩炴帴鏈熼棿鎺ㄩ�佺殑娑堟伅
@@ -49,9 +51,9 @@
//璁剧疆蹇冭烦鏃堕棿 鍗曚綅涓虹锛岃〃绀烘湇鍔″櫒姣忛殧 1.5*20绉掔殑鏃堕棿鍚戝鎴风鍙戦�佸績璺冲垽鏂鎴风鏄惁鍦ㄧ嚎
options.setKeepAliveInterval(20);
//璁剧疆閬楀槺娑堟伅鐨勮瘽棰橈紝鑻ュ鎴风鍜屾湇鍔″櫒涔嬮棿鐨勮繛鎺ユ剰澶栨柇寮�锛屾湇鍔″櫒灏嗗彂甯冨鎴风鐨勯仐鍢变俊鎭�
- options.setWill("willTopic",(mqttProperties.getClientOutId() + "涓庢湇鍔″櫒鏂紑杩炴帴").getBytes(),0,false);
+ options.setWill("willTopic",(mqttProperties.getClientId()+ "涓庢湇鍔″櫒鏂紑杩炴帴").getBytes(),0,false);
//璁剧疆鍥炶皟
- client.setCallback(new MqttProviderCallBack());
+ client.setCallback(mqttProviderCallBack);
client.connect(options);
} catch(MqttException e){
e.printStackTrace();
diff --git a/src/main/java/com/fzzy/mqtt/MqttPublishService.java b/src/main/java/com/fzzy/mqtt/MqttPublishService.java
index f02df27..136ed84 100644
--- a/src/main/java/com/fzzy/mqtt/MqttPublishService.java
+++ b/src/main/java/com/fzzy/mqtt/MqttPublishService.java
@@ -1,24 +1,33 @@
package com.fzzy.mqtt;
+import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
+import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
+import javax.annotation.Resource;
+
+@Slf4j
@Service
public class MqttPublishService {
- private static MqttClient client ;
+
+ @Resource
+ private MqttProperties mqttProperties;
+
+ private static MqttClient client;
public void init() throws MqttException {
//String topic = "/device/hx-weigh-big-01/91511424746940066Y001_91511424746940066Y0010000_002_004_002_001/message/property/report";
- String username = "admin";
- String password = "admin123321";
- String clientid = "FZZY-gateway";
- String broker = "tcp://127.0.0.1:1883";
+ String username = mqttProperties.getUsername();
+ String password = mqttProperties.getPassword();
+ String clientid = mqttProperties.getClientId();
+ String broker = mqttProperties.getHost();
//String content = " { \"headers\":{ \"productId\":\"hx-weigh-big-01\", \"keepOnlineTimeoutSeconds\":600, \"keepOnline\":true, \"deviceName\":\"鍦扮绉伴噸\" }, \"messageType\":\"REPORT_PROPERTY\", \"deviceId\":\"91511424746940066Y001_91511424746940066Y0010000_002_004_002_001\", \"properties\":{ \"weightInfo\":\"{\\\"exceed\\\":false,\\\"grossWeight\\\":3000.0,\\\"netWeight\\\":3000.0,\\\"static\\\":false,\\\"tareWeight\\\":3000.0,\\\"weightUnit\\\":\\\"KG\\\"}\" }, \"timestamp\":1698336020044 }";
int qos = 0;
try {
@@ -38,27 +47,31 @@
}
}
- public void publishMsg(String topic,String content) {
- // String topic = "/device/hx-weigh-big-01/91511424746940066Y001_91511424746940066Y0010000_002_004_002_001/message/property/report";
- //String content = " { \"headers\":{ \"productId\":\"hx-weigh-big-01\", \"keepOnlineTimeoutSeconds\":600, \"keepOnline\":true, \"deviceName\":\"鍦扮绉伴噸\" }, \"messageType\":\"REPORT_PROPERTY\", \"deviceId\":\"91511424746940066Y001_91511424746940066Y0010000_002_004_002_001\", \"properties\":{ \"weightInfo\":\"{\\\"exceed\\\":false,\\\"grossWeight\\\":3000.0,\\\"netWeight\\\":3000.0,\\\"static\\\":false,\\\"tareWeight\\\":3000.0,\\\"weightUnit\\\":\\\"KG\\\"}\" }, \"timestamp\":1698336020044 }";
- int qos = 0;
+ public void publishMsg(String topic, String content) {
- try {
- // 鍒涘缓娑堟伅骞惰缃� QoS
- MqttMessage message = new MqttMessage(content.getBytes());
- message.setQos(qos);
- // 鍙戝竷娑堟伅
- client.publish(topic, message);
- System.out.println("Message published");
- System.out.println("topic: " + topic);
- System.out.println("message content: " + content);
- // 鍏抽棴杩炴帴
- //client.disconnect();
- // 鍏抽棴瀹㈡埛绔�
- //client.close();
- } catch (MqttException e) {
- throw new RuntimeException(e);
- }
- }
+ // String topic = "/device/hx-weigh-big-01/91511424746940066Y001_91511424746940066Y0010000_002_004_002_001/message/property/report";
+ //String content = " { \"headers\":{ \"productId\":\"hx-weigh-big-01\", \"keepOnlineTimeoutSeconds\":600, \"keepOnline\":true, \"deviceName\":\"鍦扮绉伴噸\" }, \"messageType\":\"REPORT_PROPERTY\", \"deviceId\":\"91511424746940066Y001_91511424746940066Y0010000_002_004_002_001\", \"properties\":{ \"weightInfo\":\"{\\\"exceed\\\":false,\\\"grossWeight\\\":3000.0,\\\"netWeight\\\":3000.0,\\\"static\\\":false,\\\"tareWeight\\\":3000.0,\\\"weightUnit\\\":\\\"KG\\\"}\" }, \"timestamp\":1698336020044 }";
+ int qos = mqttProperties.getQos();
+
+ try {
+ // 鍒涘缓娑堟伅骞惰缃� QoS
+ MqttMessage message = new MqttMessage(content.getBytes());
+ message.setQos(qos);
+ // 鍙戝竷娑堟伅
+ client.publish(topic, message);
+
+
+ log.info("------------Message published-------------");
+ log.info("topic: " + topic);
+ log.info("message content: " + content);
+
+ // 鍏抽棴杩炴帴
+ //client.disconnect();
+ // 鍏抽棴瀹㈡埛绔�
+ //client.close();
+ } catch (MqttException e) {
+ throw new RuntimeException(e);
+ }
+ }
}
diff --git a/src/main/java/com/fzzy/mqtt/PublishSample.java b/src/main/java/com/fzzy/mqtt/PublishSample.java
deleted file mode 100644
index 765a81c..0000000
--- a/src/main/java/com/fzzy/mqtt/PublishSample.java
+++ /dev/null
@@ -1,48 +0,0 @@
-package com.fzzy.mqtt;
-
-import org.eclipse.paho.client.mqttv3.MqttClient;
-import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
-import org.eclipse.paho.client.mqttv3.MqttException;
-import org.eclipse.paho.client.mqttv3.MqttMessage;
-import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
-
-public class PublishSample {
-
- public static void main(String[] args) {
-
- String broker = "tcp://127.0.0.1:1883";
- String topic = "/device/hx-weigh-big-01/91511424746940066Y001_91511424746940066Y0010000_002_004_002_001/message/property/report";
- String username = "admin";
- String password = "admin123321";
- String clientid = "FZZY-gateway";
- String content = " { \"headers\":{ \"productId\":\"hx-weigh-big-01\", \"keepOnlineTimeoutSeconds\":600, \"keepOnline\":true, \"deviceName\":\"鍦扮绉伴噸\" }, \"messageType\":\"REPORT_PROPERTY\", \"deviceId\":\"91511424746940066Y001_91511424746940066Y0010000_002_004_002_001\", \"properties\":{ \"weightInfo\":\"{\\\"exceed\\\":false,\\\"grossWeight\\\":3000.0,\\\"netWeight\\\":3000.0,\\\"static\\\":false,\\\"tareWeight\\\":3000.0,\\\"weightUnit\\\":\\\"KG\\\"}\" }, \"timestamp\":1698336020044 }";
- int qos = 0;
-
- try {
- MqttClient client = new MqttClient(broker, clientid, new MemoryPersistence());
- // 杩炴帴鍙傛暟
- MqttConnectOptions options = new MqttConnectOptions();
- // 璁剧疆鐢ㄦ埛鍚嶅拰瀵嗙爜
- options.setUserName(username);
- options.setPassword(password.toCharArray());
- options.setConnectionTimeout(60);
- options.setKeepAliveInterval(60);
- // 杩炴帴
- client.connect(options);
- // 鍒涘缓娑堟伅骞惰缃� QoS
- MqttMessage message = new MqttMessage(content.getBytes());
- message.setQos(qos);
- // 鍙戝竷娑堟伅
- client.publish(topic, message);
- System.out.println("Message published");
- System.out.println("topic: " + topic);
- System.out.println("message content: " + content);
- // 鍏抽棴杩炴帴
- client.disconnect();
- // 鍏抽棴瀹㈡埛绔�
- client.close();
- } catch (MqttException e) {
- throw new RuntimeException(e);
- }
- }
-}
diff --git a/src/main/resources/application-devGateway.yml b/src/main/resources/application-devGateway.yml
index f46809d..4774c3a 100644
--- a/src/main/resources/application-devGateway.yml
+++ b/src/main/resources/application-devGateway.yml
@@ -73,8 +73,7 @@
host: tcp://127.0.0.1:1883
username: admin
password: pwdmqtt..
- client-outId: fzzy-customer-id
- client-inId: fzzy-6e3d92ff71b911eea5e50250f2000002
+ client-id: fzzy-customer-igds-api
timeout: 10
keep-alive-interval: 20
max-connect-times: 5
--
Gitblit v1.9.3