From 97c75a868e9fca03598dfa862bdd7ad94fd5fdcb Mon Sep 17 00:00:00 2001 From: jiazx0107@163.com <jiazx0107@163.com> Date: 星期四, 09 十一月 2023 00:54:08 +0800 Subject: [PATCH] 调整MQTT --- src/main/java/com/fzzy/gateway/service/GatewayDeviceService2.java | 54 ++++++++++ src/main/java/com/fzzy/mqtt/MqttConsumerConfig.java | 4 src/main/java/com/fzzy/gateway/service/GatewayDeviceService.java | 66 +++++++------ src/main/java/com/fzzy/mqtt/MqttProviderConfig.java | 8 + src/main/java/com/fzzy/mqtt/MqttProperties.java | 10 - src/main/java/com/fzzy/gateway/hx2023/ScConstant.java | 6 + src/main/java/com/fzzy/gateway/hx2023/service/OnReceiveMqttService.java | 7 src/main/java/com/fzzy/gateway/view/GatewayDevice.view.xml | 19 +++ /dev/null | 48 --------- src/main/resources/application-devGateway.yml | 3 src/main/java/com/fzzy/mqtt/MqttProviderCallBack.java | 2 src/main/java/com/fzzy/mqtt/MqttController.java | 4 src/main/java/com/fzzy/gateway/GatewayTimerScheduled.java | 2 src/main/java/com/fzzy/mqtt/MqttPublishService.java | 65 +++++++----- 14 files changed, 172 insertions(+), 126 deletions(-) diff --git a/src/main/java/com/fzzy/gateway/GatewayTimerScheduled.java b/src/main/java/com/fzzy/gateway/GatewayTimerScheduled.java index 4bb2a3e..379583d 100644 --- a/src/main/java/com/fzzy/gateway/GatewayTimerScheduled.java +++ b/src/main/java/com/fzzy/gateway/GatewayTimerScheduled.java @@ -72,6 +72,8 @@ String url = DEFAULT_URL; url = url.replace("{appId}", DEFAULT_APP_ID).replace("{appsecret}", DEFAULT_APP_SECRET).replace("{cityid}", DEFAULT_CITYID); + + log.debug("------姘旇薄URL---{}",url); String result = GatewayHttpUtil.doGet(url, null); if (null == result) { diff --git a/src/main/java/com/fzzy/gateway/hx2023/ScConstant.java b/src/main/java/com/fzzy/gateway/hx2023/ScConstant.java index badcdf6..03b0885 100644 --- a/src/main/java/com/fzzy/gateway/hx2023/ScConstant.java +++ b/src/main/java/com/fzzy/gateway/hx2023/ScConstant.java @@ -26,4 +26,10 @@ public static int CODE_200 = 200; + + /** + * 涓嬪彂鎸囦护鍥炲鎶ユ枃topic + */ + public static String TOPIC_REPORT = "/${productId}/${deviceId}/properties/report"; + } diff --git a/src/main/java/com/fzzy/gateway/hx2023/service/OnReceiveMqttService.java b/src/main/java/com/fzzy/gateway/hx2023/service/OnReceiveMqttService.java index d863ab5..e782bd8 100644 --- a/src/main/java/com/fzzy/gateway/hx2023/service/OnReceiveMqttService.java +++ b/src/main/java/com/fzzy/gateway/hx2023/service/OnReceiveMqttService.java @@ -9,6 +9,7 @@ import com.fzzy.gateway.hx2023.data.CloudSendData; import com.fzzy.gateway.hx2023.data.SyncReqData; import com.fzzy.mqtt.MqttProviderConfig; +import com.fzzy.mqtt.MqttPublishService; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; @@ -24,7 +25,7 @@ @Resource private GatewayRemoteManager gatewayRemoteManager; @Resource - private MqttProviderConfig providerClient; + private MqttPublishService publishService; /** @@ -69,11 +70,11 @@ //鑷姩鎺ㄩ�� if (200 == resp.getCode() && syncReqData.isAutoReplay()) { - String topic = "/${productId}/${deviceId}/properties/report"; + String topic = ScConstant.TOPIC_REPORT; topic = topic.replace("${productId}", device.getProductId()).replace("${deviceId}", device.getDeviceId()); - providerClient.publish(topic, resp.getData()); + publishService.publishMsg(topic, resp.getData()); log.info("=======绮儏鎺ㄩ��==========={}", resp.getData()); } diff --git a/src/main/java/com/fzzy/gateway/service/GatewayDeviceService.java b/src/main/java/com/fzzy/gateway/service/GatewayDeviceService.java index 312a426..d571fc3 100644 --- a/src/main/java/com/fzzy/gateway/service/GatewayDeviceService.java +++ b/src/main/java/com/fzzy/gateway/service/GatewayDeviceService.java @@ -13,17 +13,20 @@ import com.fzzy.gateway.GatewayUtils; import com.fzzy.gateway.api.DeviceReportService; import com.fzzy.gateway.api.GatewayRemoteManager; +import com.fzzy.gateway.data.BaseResp; import com.fzzy.gateway.entity.GatewayDevice; import com.fzzy.gateway.hx2023.ScConstant; import com.fzzy.gateway.hx2023.data.*; import com.fzzy.gateway.service.repository.GatewayDeviceRep; +import com.fzzy.mqtt.MqttProviderConfig; import com.fzzy.mqtt.MqttPublishService; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang.StringUtils; import org.apache.commons.lang.time.DateFormatUtils; import org.springframework.beans.BeanUtils; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.domain.Sort; import org.springframework.stereotype.Component; @@ -47,6 +50,9 @@ private GatewayRemoteManager gatewayRemoteManager; @Resource private MqttPublishService mqttPublishService; + @Resource + private MqttProviderConfig providerClient; + /** * gatewayDeviceService#listAll * @@ -67,7 +73,7 @@ public void updateSave(GatewayDevice data) { GatewayDevice data2 = new GatewayDevice(); BeanUtils.copyProperties(data, data2); - + if (null == data2.getDeviceSn()) { if (null != data2.getIp()) { data.setDeviceSn(data2.getIp()); @@ -77,10 +83,10 @@ } if (null == data2.getId()) { - data2.setId(ContextUtil.getUUID()); + data2.setId(ContextUtil.getUUID()); gatewayDeviceRep.save(data2); - }else{ - gatewayDeviceRep.save(data2); + } else { + gatewayDeviceRep.save(data2); } flushCache(); } @@ -112,10 +118,6 @@ } - - - - /** * gatewayDeviceService#ajaxTestWeight * 鍦扮鎺ㄩ�佹祴璇� @@ -128,16 +130,16 @@ double weigh = (double) parameter.getWeight(); //double weigh = Double.parseDouble("3500.0"); List<GatewayDevice> devices = listAll(); - if(devices == null || devices.size()<= 0){ + if (devices == null || devices.size() <= 0) { return "娌℃湁璁惧"; } - List<GatewayDevice> weights = devices.stream().filter(s ->(GatewayDeviceType.TYPE_01.getCode().equals(s.getType()))).collect(Collectors.toList()); - if(weights == null || weights.size()<= 0){ - return "娌℃湁鑾峰彇鍒板湴纾呰澶�"; - } + List<GatewayDevice> weights = devices.stream().filter(s -> (GatewayDeviceType.TYPE_01.getCode().equals(s.getType()))).collect(Collectors.toList()); + if (weights == null || weights.size() <= 0) { + return "娌℃湁鑾峰彇鍒板湴纾呰澶�"; + } String topic = "/device/hx-weigh-big-01/91511424746940066Y001_91511424746940066Y0010000_002_004_002_001/message/property/report"; - for (GatewayDevice device: weights) { + for (GatewayDevice device : weights) { WebSocketPacket packet = new WebSocketPacket(); @@ -163,7 +165,7 @@ packet.setTimestamp(System.currentTimeMillis()); - topic = "/device/"+header.getProductId()+"/"+device.getDeviceId()+"/message/property/report"; + topic = "/device/" + header.getProductId() + "/" + device.getDeviceId() + "/message/property/report"; mqttPublishService.publishMsg(topic, JSON.toJSONString(packet)); } return "SUCCESS"; @@ -172,6 +174,7 @@ /** * gatewayDeviceService#ajaxTestLpr * 鍦扮鎺ㄩ�佹祴璇� + * * @return */ @Expose @@ -179,16 +182,16 @@ //String carNumber = parameter.getCarNumber(); String carNumber = "宸滱12345"; List<GatewayDevice> devices = listAll(); - if(devices == null || devices.size()<= 0){ + if (devices == null || devices.size() <= 0) { return "娌℃湁璁惧"; } - List<GatewayDevice> weights = devices.stream().filter(s ->(GatewayDeviceType.TYPE_02.getCode().equals(s.getType()))).collect(Collectors.toList()); - if(weights == null || weights.size()<= 0){ + List<GatewayDevice> weights = devices.stream().filter(s -> (GatewayDeviceType.TYPE_02.getCode().equals(s.getType()))).collect(Collectors.toList()); + if (weights == null || weights.size() <= 0) { return "娌℃湁鑾峰彇鍒拌澶�"; } String topic = "/device/hx-weigh-big-01/91511424746940066Y001_91511424746940066Y0010000_002_004_002_001/message/property/report"; - for (GatewayDevice device: weights) { + for (GatewayDevice device : weights) { WebSocketPacket packet = new WebSocketPacket(); @@ -212,7 +215,7 @@ packet.setTimestamp(System.currentTimeMillis()); - topic = "/device/"+header.getProductId()+"/"+device.getDeviceId()+"/message/property/report"; + topic = "/device/" + header.getProductId() + "/" + device.getDeviceId() + "/message/property/report"; mqttPublishService.publishMsg(topic, JSON.toJSONString(packet)); } return "SUCCESS"; @@ -247,6 +250,9 @@ return this.pushByV40(list, start, end); } + + + private String pushByV40(List<GatewayDevice> list, Date start, Date end) { @@ -352,27 +358,27 @@ //鍒ゆ柇鏈�澶� if (curTemp.equals(result.getMaxTemperature())) { - result.setMaxX(x+""); - result.setMaxY(y+""); - result.setMaxZ(position+""); + result.setMaxX(x + ""); + result.setMaxY(y + ""); + result.setMaxZ(position + ""); } //鍒ゆ柇鏈�灏� if (curTemp.equals(result.getMinTemperature())) { - result.setMinX(x+""); - result.setMinY(y+""); - result.setMinZ(position+""); + result.setMinX(x + ""); + result.setMinY(y + ""); + result.setMinZ(position + ""); } - temperature.add(new KafkaGrainDataDetail1(cableNum + "", z + "", curTemp, position + "", x+"", y+"")); + temperature.add(new KafkaGrainDataDetail1(cableNum + "", z + "", curTemp, position + "", x + "", y + "")); } //绮俯淇℃伅 JSONObject trhInfo = new JSONObject(); - // TRHInfo trhInfo = new TRHInfo(); - trhInfo.put("temperature",temperature); + // TRHInfo trhInfo = new TRHInfo(); + trhInfo.put("temperature", temperature); //浠撴俯搴︿俊鎭� @@ -385,7 +391,7 @@ List<KafkaGrainTH> temperatureAndhumidity = new ArrayList<>(); temperatureAndhumidity.add(grainTH); - trhInfo.put("temperatureAndhumidity",temperatureAndhumidity); + trhInfo.put("temperatureAndhumidity", temperatureAndhumidity); //trhInfo.put("temperatureAndhumidity",grainTH); JSONObject params = new JSONObject(); diff --git a/src/main/java/com/fzzy/gateway/service/GatewayDeviceService2.java b/src/main/java/com/fzzy/gateway/service/GatewayDeviceService2.java index 42f0067..c17e815 100644 --- a/src/main/java/com/fzzy/gateway/service/GatewayDeviceService2.java +++ b/src/main/java/com/fzzy/gateway/service/GatewayDeviceService2.java @@ -1,10 +1,19 @@ package com.fzzy.gateway.service; import com.bstek.dorado.annotation.DataResolver; +import com.bstek.dorado.annotation.Expose; +import com.fzzy.api.data.GatewayDeviceType; import com.fzzy.api.utils.ContextUtil; import com.fzzy.gateway.GatewayUtils; +import com.fzzy.gateway.api.GatewayRemoteManager; +import com.fzzy.gateway.data.BaseResp; import com.fzzy.gateway.entity.GatewayDevice; +import com.fzzy.gateway.hx2023.ScConstant; +import com.fzzy.gateway.hx2023.data.SyncReqData; import com.fzzy.gateway.service.repository.GatewayDeviceRep; +import com.fzzy.mqtt.MqttProviderConfig; +import com.fzzy.mqtt.MqttPublishService; +import lombok.extern.slf4j.Slf4j; import org.springframework.beans.BeanUtils; import org.springframework.data.domain.Sort; import org.springframework.stereotype.Component; @@ -12,11 +21,17 @@ import javax.annotation.Resource; import java.util.List; +@Slf4j @Component public class GatewayDeviceService2 { @Resource private GatewayDeviceRep gatewayDeviceRep; + + @Resource + private GatewayRemoteManager gatewayRemoteManager; + @Resource + private MqttPublishService publishService; public List<GatewayDevice> listAll() { Sort sort = new Sort(Sort.Direction.ASC, "deviceId"); @@ -59,4 +74,43 @@ } } + + /** + * 娴嬭瘯MQTT绮儏妫�娴� + * gatewayDeviceService#ajaxTestGrain2 + * 绮儏鎺ㄩ�佹祴璇� + * + * @param data + * @return + */ + @Expose + public String ajaxTestGrain2(GatewayDevice data) { + + SyncReqData reqData = new SyncReqData(); + reqData.setDevice(data); + reqData.setAutoReplay(true); + reqData.setMessageType(ScConstant.MESSAGE_TYPE_INVOKE_FUNCTION); + reqData.setMessageId(ScConstant.getMessageId()); + reqData.setFunctionId(ScConstant.FUNCTION_getTAndRHInfo); + + + if (!GatewayDeviceType.TYPE_07.getCode().equals(data.getType())) { + return "ERROR锛氬綋鍓嶈澶囬潪绮儏璁惧涓嶆敮鎸佸綋鍓嶆搷浣�"; + } + + BaseResp resp = gatewayRemoteManager.getSyncGrainService(data.getSyncProtocol()).syncGrain2(reqData); + + //鑷姩鎺ㄩ�� + if (200 == resp.getCode() && reqData.isAutoReplay()) { + String topic = ScConstant.TOPIC_REPORT; + topic = topic.replace("${productId}", data.getProductId()).replace("${deviceId}", data.getDeviceId()); + + publishService.publishMsg(topic, resp.getData()); + } + + log.info("=======鎵嬪姩娴嬭瘯绮儏鎺ㄩ��==========={}", resp.getData()); + + return "SUCCESS锛氭墽琛屽畬鎴�"; + } + } diff --git a/src/main/java/com/fzzy/gateway/view/GatewayDevice.view.xml b/src/main/java/com/fzzy/gateway/view/GatewayDevice.view.xml index 9db29e6..75ba1bf 100644 --- a/src/main/java/com/fzzy/gateway/view/GatewayDevice.view.xml +++ b/src/main/java/com/fzzy/gateway/view/GatewayDevice.view.xml @@ -115,11 +115,11 @@ <Property name="label">璁惧绫诲瀷KEY</Property> </PropertyDef> <PropertyDef name="cableRule"> - <Property/> + <Property></Property> <Property name="label">甯冪嚎瑙勫垯</Property> </PropertyDef> <PropertyDef name="cableCir"> - <Property/> + <Property></Property> <Property name="label">绛掍粨灞傝鍒�</Property> </PropertyDef> </DataType> @@ -179,6 +179,17 @@ <Property name="caption">鍒犻櫎</Property> <Property name="iconClass">fa fa-minus</Property> <Property name="width">90</Property> + <Property name="exClassName">toolbar-button-warn</Property> + </ToolBarButton> + <Separator/> + <ToolBarButton> + <ClientEvent name="onClick">var cur = view.get("#dgMain").getCurrentItem();
 +view.get("#ajaxTestGrain").set("parameter",cur).execute(function(result){
 + $alert(result);
 +});
 +</ClientEvent> + <Property name="caption">娴嬭瘯绮儏</Property> + <Property name="iconClass">fa fa-minus</Property> <Property name="exClassName">toolbar-button-warn</Property> </ToolBarButton> </ToolBar> @@ -367,5 +378,9 @@ <Property name="service">gatewayDeviceService#delData</Property> <Property name="confirmMessage">纭畾瑕佸垹闄や箞锛�</Property> </AjaxAction> + <AjaxAction id="ajaxTestGrain"> + <Property name="service">gatewayDeviceService2#ajaxTestGrain2</Property> + <Property name="confirmMessage">纭畾瑕佹墜鍔ㄦ墽琛岀伯鎯呬箞锛�</Property> + </AjaxAction> </View> </ViewConfig> diff --git a/src/main/java/com/fzzy/mqtt/MqttConsumerConfig.java b/src/main/java/com/fzzy/mqtt/MqttConsumerConfig.java index ca83cb1..076e3f7 100644 --- a/src/main/java/com/fzzy/mqtt/MqttConsumerConfig.java +++ b/src/main/java/com/fzzy/mqtt/MqttConsumerConfig.java @@ -36,7 +36,7 @@ public void connect() { try { //鍒涘缓MQTT瀹㈡埛绔璞� - client = new MqttClient(mqttProperties.getHost(), mqttProperties.getClientInId(), new MemoryPersistence()); + client = new MqttClient(mqttProperties.getHost(), mqttProperties.getClientId(), new MemoryPersistence()); //杩炴帴璁剧疆 MqttConnectOptions options = new MqttConnectOptions(); //鏄惁娓呯┖session锛岃缃负false琛ㄧず鏈嶅姟鍣ㄤ細淇濈暀瀹㈡埛绔殑杩炴帴璁板綍锛屽鎴风閲嶈繛涔嬪悗鑳借幏鍙栧埌鏈嶅姟鍣ㄥ湪瀹㈡埛绔柇寮�杩炴帴鏈熼棿鎺ㄩ�佺殑娑堟伅 @@ -51,7 +51,7 @@ //璁剧疆蹇冭烦鏃堕棿 鍗曚綅涓虹锛岃〃绀烘湇鍔″櫒姣忛殧1.5*20绉掔殑鏃堕棿鍚戝鎴风鍙戦�佸績璺冲垽鏂鎴风鏄惁鍦ㄧ嚎 options.setKeepAliveInterval(20); //璁剧疆閬楀槺娑堟伅鐨勮瘽棰橈紝鑻ュ鎴风鍜屾湇鍔″櫒涔嬮棿鐨勮繛鎺ユ剰澶栨柇寮�锛屾湇鍔″櫒灏嗗彂甯冨鎴风鐨勯仐鍢变俊鎭� - options.setWill("willTopic", (mqttProperties.getClientInId() + "涓庢湇鍔″櫒鏂紑杩炴帴").getBytes(), 0, false); + options.setWill("willTopic", (mqttProperties.getClientId() + "涓庢湇鍔″櫒鏂紑杩炴帴").getBytes(), 0, false); //璁剧疆鍥炶皟 // client.setCallback(new MqttConsumerCallBack()); client.setCallback(mqttConsumerCallBack); diff --git a/src/main/java/com/fzzy/mqtt/MqttController.java b/src/main/java/com/fzzy/mqtt/MqttController.java index d327c26..e945120 100644 --- a/src/main/java/com/fzzy/mqtt/MqttController.java +++ b/src/main/java/com/fzzy/mqtt/MqttController.java @@ -17,14 +17,14 @@ public @ResponseBody String connect() { client.connect(); - return mqttProperties.getClientOutId() + "杩炴帴鍒版湇鍔″櫒"; + return mqttProperties.getClientId() + "杩炴帴鍒版湇鍔″櫒"; } @RequestMapping("/disConnect") @ResponseBody public String disConnect() { client.disConnect(); - return mqttProperties.getClientOutId() + "涓庢湇鍔″櫒鏂紑杩炴帴"; + return mqttProperties.getClientId() + "涓庢湇鍔″櫒鏂紑杩炴帴"; } } diff --git a/src/main/java/com/fzzy/mqtt/MqttGatewayService.java b/src/main/java/com/fzzy/mqtt/MqttGatewayService.java deleted file mode 100644 index 6457db5..0000000 --- a/src/main/java/com/fzzy/mqtt/MqttGatewayService.java +++ /dev/null @@ -1,16 +0,0 @@ -package com.fzzy.mqtt; - -import org.springframework.integration.annotation.MessagingGateway; -import org.springframework.integration.mqtt.support.MqttHeaders; -import org.springframework.messaging.handler.annotation.Header; -import org.springframework.stereotype.Service; - -/** - * 鎺ㄩ�佹帴鍙� - */ -@Service -@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel") -public interface MqttGatewayService { - - void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic01, String sendData); -} diff --git a/src/main/java/com/fzzy/mqtt/MqttProperties.java b/src/main/java/com/fzzy/mqtt/MqttProperties.java index 094df8b..4d35f80 100644 --- a/src/main/java/com/fzzy/mqtt/MqttProperties.java +++ b/src/main/java/com/fzzy/mqtt/MqttProperties.java @@ -31,15 +31,9 @@ /** - * 瀹㈡埛绔疘d-鍙戝竷鑰匢d + * 瀹㈡埛绔疘d */ - private String clientOutId; - - /** - * 瀹㈡埛绔疘d-琚闃呰�匢d - */ - private String clientInId; - + private String clientId; /** * 瓒呮椂鏃堕棿 diff --git a/src/main/java/com/fzzy/mqtt/MqttProviderCallBack.java b/src/main/java/com/fzzy/mqtt/MqttProviderCallBack.java index 0191efe..3b4de1c 100644 --- a/src/main/java/com/fzzy/mqtt/MqttProviderCallBack.java +++ b/src/main/java/com/fzzy/mqtt/MqttProviderCallBack.java @@ -4,8 +4,10 @@ import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; import org.eclipse.paho.client.mqttv3.MqttCallback; import org.eclipse.paho.client.mqttv3.MqttMessage; +import org.springframework.stereotype.Component; @Slf4j +@Component public class MqttProviderCallBack implements MqttCallback { /** diff --git a/src/main/java/com/fzzy/mqtt/MqttProviderConfig.java b/src/main/java/com/fzzy/mqtt/MqttProviderConfig.java index 8198dbf..04cca6a 100644 --- a/src/main/java/com/fzzy/mqtt/MqttProviderConfig.java +++ b/src/main/java/com/fzzy/mqtt/MqttProviderConfig.java @@ -14,6 +14,8 @@ @Autowired private MqttProperties mqttProperties; + @Autowired + private MqttProviderCallBack mqttProviderCallBack; /** * 瀹㈡埛绔璞� */ @@ -34,7 +36,7 @@ public void connect(){ try{ //鍒涘缓MQTT瀹㈡埛绔璞� - client = new MqttClient(mqttProperties.getHost(),mqttProperties.getClientOutId(),new MemoryPersistence()); + client = new MqttClient(mqttProperties.getHost(),mqttProperties.getClientId(),new MemoryPersistence()); //杩炴帴璁剧疆 MqttConnectOptions options = new MqttConnectOptions(); //鏄惁娓呯┖session锛岃缃甪alse琛ㄧず鏈嶅姟鍣ㄤ細淇濈暀瀹㈡埛绔殑杩炴帴璁板綍锛堣闃呬富棰橈紝qos锛�,瀹㈡埛绔噸杩炰箣鍚庤兘鑾峰彇鍒版湇鍔″櫒鍦ㄥ鎴风鏂紑杩炴帴鏈熼棿鎺ㄩ�佺殑娑堟伅 @@ -49,9 +51,9 @@ //璁剧疆蹇冭烦鏃堕棿 鍗曚綅涓虹锛岃〃绀烘湇鍔″櫒姣忛殧 1.5*20绉掔殑鏃堕棿鍚戝鎴风鍙戦�佸績璺冲垽鏂鎴风鏄惁鍦ㄧ嚎 options.setKeepAliveInterval(20); //璁剧疆閬楀槺娑堟伅鐨勮瘽棰橈紝鑻ュ鎴风鍜屾湇鍔″櫒涔嬮棿鐨勮繛鎺ユ剰澶栨柇寮�锛屾湇鍔″櫒灏嗗彂甯冨鎴风鐨勯仐鍢变俊鎭� - options.setWill("willTopic",(mqttProperties.getClientOutId() + "涓庢湇鍔″櫒鏂紑杩炴帴").getBytes(),0,false); + options.setWill("willTopic",(mqttProperties.getClientId()+ "涓庢湇鍔″櫒鏂紑杩炴帴").getBytes(),0,false); //璁剧疆鍥炶皟 - client.setCallback(new MqttProviderCallBack()); + client.setCallback(mqttProviderCallBack); client.connect(options); } catch(MqttException e){ e.printStackTrace(); diff --git a/src/main/java/com/fzzy/mqtt/MqttPublishService.java b/src/main/java/com/fzzy/mqtt/MqttPublishService.java index f02df27..136ed84 100644 --- a/src/main/java/com/fzzy/mqtt/MqttPublishService.java +++ b/src/main/java/com/fzzy/mqtt/MqttPublishService.java @@ -1,24 +1,33 @@ package com.fzzy.mqtt; +import lombok.extern.slf4j.Slf4j; import org.eclipse.paho.client.mqttv3.MqttClient; import org.eclipse.paho.client.mqttv3.MqttConnectOptions; import org.eclipse.paho.client.mqttv3.MqttException; import org.eclipse.paho.client.mqttv3.MqttMessage; import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; +import javax.annotation.Resource; + +@Slf4j @Service public class MqttPublishService { - private static MqttClient client ; + + @Resource + private MqttProperties mqttProperties; + + private static MqttClient client; public void init() throws MqttException { //String topic = "/device/hx-weigh-big-01/91511424746940066Y001_91511424746940066Y0010000_002_004_002_001/message/property/report"; - String username = "admin"; - String password = "admin123321"; - String clientid = "FZZY-gateway"; - String broker = "tcp://127.0.0.1:1883"; + String username = mqttProperties.getUsername(); + String password = mqttProperties.getPassword(); + String clientid = mqttProperties.getClientId(); + String broker = mqttProperties.getHost(); //String content = " { \"headers\":{ \"productId\":\"hx-weigh-big-01\", \"keepOnlineTimeoutSeconds\":600, \"keepOnline\":true, \"deviceName\":\"鍦扮绉伴噸\" }, \"messageType\":\"REPORT_PROPERTY\", \"deviceId\":\"91511424746940066Y001_91511424746940066Y0010000_002_004_002_001\", \"properties\":{ \"weightInfo\":\"{\\\"exceed\\\":false,\\\"grossWeight\\\":3000.0,\\\"netWeight\\\":3000.0,\\\"static\\\":false,\\\"tareWeight\\\":3000.0,\\\"weightUnit\\\":\\\"KG\\\"}\" }, \"timestamp\":1698336020044 }"; int qos = 0; try { @@ -38,27 +47,31 @@ } } - public void publishMsg(String topic,String content) { - // String topic = "/device/hx-weigh-big-01/91511424746940066Y001_91511424746940066Y0010000_002_004_002_001/message/property/report"; - //String content = " { \"headers\":{ \"productId\":\"hx-weigh-big-01\", \"keepOnlineTimeoutSeconds\":600, \"keepOnline\":true, \"deviceName\":\"鍦扮绉伴噸\" }, \"messageType\":\"REPORT_PROPERTY\", \"deviceId\":\"91511424746940066Y001_91511424746940066Y0010000_002_004_002_001\", \"properties\":{ \"weightInfo\":\"{\\\"exceed\\\":false,\\\"grossWeight\\\":3000.0,\\\"netWeight\\\":3000.0,\\\"static\\\":false,\\\"tareWeight\\\":3000.0,\\\"weightUnit\\\":\\\"KG\\\"}\" }, \"timestamp\":1698336020044 }"; - int qos = 0; + public void publishMsg(String topic, String content) { - try { - // 鍒涘缓娑堟伅骞惰缃� QoS - MqttMessage message = new MqttMessage(content.getBytes()); - message.setQos(qos); - // 鍙戝竷娑堟伅 - client.publish(topic, message); - System.out.println("Message published"); - System.out.println("topic: " + topic); - System.out.println("message content: " + content); - // 鍏抽棴杩炴帴 - //client.disconnect(); - // 鍏抽棴瀹㈡埛绔� - //client.close(); - } catch (MqttException e) { - throw new RuntimeException(e); - } - } + // String topic = "/device/hx-weigh-big-01/91511424746940066Y001_91511424746940066Y0010000_002_004_002_001/message/property/report"; + //String content = " { \"headers\":{ \"productId\":\"hx-weigh-big-01\", \"keepOnlineTimeoutSeconds\":600, \"keepOnline\":true, \"deviceName\":\"鍦扮绉伴噸\" }, \"messageType\":\"REPORT_PROPERTY\", \"deviceId\":\"91511424746940066Y001_91511424746940066Y0010000_002_004_002_001\", \"properties\":{ \"weightInfo\":\"{\\\"exceed\\\":false,\\\"grossWeight\\\":3000.0,\\\"netWeight\\\":3000.0,\\\"static\\\":false,\\\"tareWeight\\\":3000.0,\\\"weightUnit\\\":\\\"KG\\\"}\" }, \"timestamp\":1698336020044 }"; + int qos = mqttProperties.getQos(); + + try { + // 鍒涘缓娑堟伅骞惰缃� QoS + MqttMessage message = new MqttMessage(content.getBytes()); + message.setQos(qos); + // 鍙戝竷娑堟伅 + client.publish(topic, message); + + + log.info("------------Message published-------------"); + log.info("topic: " + topic); + log.info("message content: " + content); + + // 鍏抽棴杩炴帴 + //client.disconnect(); + // 鍏抽棴瀹㈡埛绔� + //client.close(); + } catch (MqttException e) { + throw new RuntimeException(e); + } + } } diff --git a/src/main/java/com/fzzy/mqtt/PublishSample.java b/src/main/java/com/fzzy/mqtt/PublishSample.java deleted file mode 100644 index 765a81c..0000000 --- a/src/main/java/com/fzzy/mqtt/PublishSample.java +++ /dev/null @@ -1,48 +0,0 @@ -package com.fzzy.mqtt; - -import org.eclipse.paho.client.mqttv3.MqttClient; -import org.eclipse.paho.client.mqttv3.MqttConnectOptions; -import org.eclipse.paho.client.mqttv3.MqttException; -import org.eclipse.paho.client.mqttv3.MqttMessage; -import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; - -public class PublishSample { - - public static void main(String[] args) { - - String broker = "tcp://127.0.0.1:1883"; - String topic = "/device/hx-weigh-big-01/91511424746940066Y001_91511424746940066Y0010000_002_004_002_001/message/property/report"; - String username = "admin"; - String password = "admin123321"; - String clientid = "FZZY-gateway"; - String content = " { \"headers\":{ \"productId\":\"hx-weigh-big-01\", \"keepOnlineTimeoutSeconds\":600, \"keepOnline\":true, \"deviceName\":\"鍦扮绉伴噸\" }, \"messageType\":\"REPORT_PROPERTY\", \"deviceId\":\"91511424746940066Y001_91511424746940066Y0010000_002_004_002_001\", \"properties\":{ \"weightInfo\":\"{\\\"exceed\\\":false,\\\"grossWeight\\\":3000.0,\\\"netWeight\\\":3000.0,\\\"static\\\":false,\\\"tareWeight\\\":3000.0,\\\"weightUnit\\\":\\\"KG\\\"}\" }, \"timestamp\":1698336020044 }"; - int qos = 0; - - try { - MqttClient client = new MqttClient(broker, clientid, new MemoryPersistence()); - // 杩炴帴鍙傛暟 - MqttConnectOptions options = new MqttConnectOptions(); - // 璁剧疆鐢ㄦ埛鍚嶅拰瀵嗙爜 - options.setUserName(username); - options.setPassword(password.toCharArray()); - options.setConnectionTimeout(60); - options.setKeepAliveInterval(60); - // 杩炴帴 - client.connect(options); - // 鍒涘缓娑堟伅骞惰缃� QoS - MqttMessage message = new MqttMessage(content.getBytes()); - message.setQos(qos); - // 鍙戝竷娑堟伅 - client.publish(topic, message); - System.out.println("Message published"); - System.out.println("topic: " + topic); - System.out.println("message content: " + content); - // 鍏抽棴杩炴帴 - client.disconnect(); - // 鍏抽棴瀹㈡埛绔� - client.close(); - } catch (MqttException e) { - throw new RuntimeException(e); - } - } -} diff --git a/src/main/resources/application-devGateway.yml b/src/main/resources/application-devGateway.yml index f46809d..4774c3a 100644 --- a/src/main/resources/application-devGateway.yml +++ b/src/main/resources/application-devGateway.yml @@ -73,8 +73,7 @@ host: tcp://127.0.0.1:1883 username: admin password: pwdmqtt.. - client-outId: fzzy-customer-id - client-inId: fzzy-6e3d92ff71b911eea5e50250f2000002 + client-id: fzzy-customer-igds-api timeout: 10 keep-alive-interval: 20 max-connect-times: 5 -- Gitblit v1.9.3