From 19860e76e91baf3cfce3c45bfa3ca886788c4ec8 Mon Sep 17 00:00:00 2001 From: jiazx0107@163.com <jiazx0107@163.com> Date: 星期四, 09 十一月 2023 11:59:59 +0800 Subject: [PATCH] 调整MQTT --- src/main/java/com/fzzy/gateway/hx2023/service/ApiInitService.java | 9 + /dev/null | 26 --- src/main/resources/application-devGateway.yml | 14 +- src/main/java/com/fzzy/gateway/service/GatewayDeviceService2.java | 18 ++ src/main/java/com/fzzy/mqtt/MqttConsumerConfig.java | 147 ++++++++++---------- src/main/java/com/fzzy/mqtt/MqttProviderConfig.java | 116 +++++----------- src/main/java/com/fzzy/mqtt/MqttGatewayService.java | 31 ++++ src/main/java/com/fzzy/mqtt/MqttProperties.java | 15 +- src/main/java/com/fzzy/gateway/hx2023/service/OnReceiveMqttService.java | 13 + src/main/java/com/fzzy/gateway/GatewayRunner.java | 3 src/main/java/com/fzzy/mqtt/MqttPublishService.java | 2 11 files changed, 192 insertions(+), 202 deletions(-) diff --git a/src/main/java/com/fzzy/gateway/GatewayRunner.java b/src/main/java/com/fzzy/gateway/GatewayRunner.java index 9b85744..1f05550 100644 --- a/src/main/java/com/fzzy/gateway/GatewayRunner.java +++ b/src/main/java/com/fzzy/gateway/GatewayRunner.java @@ -36,6 +36,9 @@ //鑾峰彇姘旇薄淇℃伅 scheduled.doWeatherExe(); + //鏇存柊璁惧缂撳瓨 + apiInitService.updateDeviceCache(); + } } diff --git a/src/main/java/com/fzzy/gateway/hx2023/service/ApiInitService.java b/src/main/java/com/fzzy/gateway/hx2023/service/ApiInitService.java index f9b093f..75a354b 100644 --- a/src/main/java/com/fzzy/gateway/hx2023/service/ApiInitService.java +++ b/src/main/java/com/fzzy/gateway/hx2023/service/ApiInitService.java @@ -7,6 +7,7 @@ import com.fzzy.gateway.service.GatewayConfService; +import com.fzzy.gateway.service.GatewayDeviceService; import org.springframework.stereotype.Component; import javax.annotation.Resource; @@ -21,6 +22,8 @@ @Resource private GatewayConfService confService; + @Resource + private GatewayDeviceService deviceService; @Resource private GatewayRemoteManager gatewayRemoteManager; @@ -44,7 +47,9 @@ } } - - + + public void updateDeviceCache() { + deviceService.flushCache(); + } } diff --git a/src/main/java/com/fzzy/gateway/hx2023/service/OnReceiveMqttService.java b/src/main/java/com/fzzy/gateway/hx2023/service/OnReceiveMqttService.java index e782bd8..f2eed2f 100644 --- a/src/main/java/com/fzzy/gateway/hx2023/service/OnReceiveMqttService.java +++ b/src/main/java/com/fzzy/gateway/hx2023/service/OnReceiveMqttService.java @@ -8,6 +8,7 @@ import com.fzzy.gateway.hx2023.ScConstant; import com.fzzy.gateway.hx2023.data.CloudSendData; import com.fzzy.gateway.hx2023.data.SyncReqData; +import com.fzzy.mqtt.MqttGatewayService; import com.fzzy.mqtt.MqttProviderConfig; import com.fzzy.mqtt.MqttPublishService; import lombok.extern.slf4j.Slf4j; @@ -25,7 +26,7 @@ @Resource private GatewayRemoteManager gatewayRemoteManager; @Resource - private MqttPublishService publishService; + private MqttGatewayService mqttGatewayService; /** @@ -33,7 +34,7 @@ * * @param message */ - public void onReceiveMessage(String message) { + public void onReceiveMessage(String topic,String message) { try { CloudSendData cloudSendData = JSONObject.parseObject(message, CloudSendData.class); @@ -47,7 +48,7 @@ } } catch (Exception e) { - + log.error("--------鎵ц寮傚父-----{}",e); } } @@ -74,9 +75,11 @@ topic = topic.replace("${productId}", device.getProductId()).replace("${deviceId}", device.getDeviceId()); - publishService.publishMsg(topic, resp.getData()); + mqttGatewayService.publishMqttWithTopic(resp.getData(), topic); - log.info("=======绮儏鎺ㄩ��==========={}", resp.getData()); + log.info("----------------------------鎺ㄩ�丮QTT淇℃伅---------------------------"); + log.info("-----TOPIC-----{}",topic); + log.info("-----Message-----{}",resp.getData()); } } } diff --git a/src/main/java/com/fzzy/gateway/service/GatewayDeviceService2.java b/src/main/java/com/fzzy/gateway/service/GatewayDeviceService2.java index c17e815..f30ac51 100644 --- a/src/main/java/com/fzzy/gateway/service/GatewayDeviceService2.java +++ b/src/main/java/com/fzzy/gateway/service/GatewayDeviceService2.java @@ -11,9 +11,11 @@ import com.fzzy.gateway.hx2023.ScConstant; import com.fzzy.gateway.hx2023.data.SyncReqData; import com.fzzy.gateway.service.repository.GatewayDeviceRep; +import com.fzzy.mqtt.MqttGatewayService; import com.fzzy.mqtt.MqttProviderConfig; import com.fzzy.mqtt.MqttPublishService; import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang.StringUtils; import org.springframework.beans.BeanUtils; import org.springframework.data.domain.Sort; import org.springframework.stereotype.Component; @@ -31,7 +33,7 @@ @Resource private GatewayRemoteManager gatewayRemoteManager; @Resource - private MqttPublishService publishService; + private MqttGatewayService publishService; public List<GatewayDevice> listAll() { Sort sort = new Sort(Sort.Direction.ASC, "deviceId"); @@ -86,6 +88,7 @@ @Expose public String ajaxTestGrain2(GatewayDevice data) { + SyncReqData reqData = new SyncReqData(); reqData.setDevice(data); reqData.setAutoReplay(true); @@ -98,6 +101,10 @@ return "ERROR锛氬綋鍓嶈澶囬潪绮儏璁惧涓嶆敮鎸佸綋鍓嶆搷浣�"; } + if(StringUtils.isEmpty(data.getCableRule())){ + return "ERROR锛氬綋鍓嶈澶囨病鏈夐厤缃竷绾胯鍒欙紝鏃犳硶鎵ц"; + } + BaseResp resp = gatewayRemoteManager.getSyncGrainService(data.getSyncProtocol()).syncGrain2(reqData); //鑷姩鎺ㄩ�� @@ -105,10 +112,13 @@ String topic = ScConstant.TOPIC_REPORT; topic = topic.replace("${productId}", data.getProductId()).replace("${deviceId}", data.getDeviceId()); - publishService.publishMsg(topic, resp.getData()); - } + publishService.publishMqttWithTopic(resp.getData(),topic); - log.info("=======鎵嬪姩娴嬭瘯绮儏鎺ㄩ��==========={}", resp.getData()); + log.info("----------------------------鎵嬪姩鎺ㄩ�丮QTT绮儏淇℃伅---------------------------"); + log.info("-----TOPIC-----{}",topic); + log.info("-----Message-----{}",resp.getData()); + + } return "SUCCESS锛氭墽琛屽畬鎴�"; } diff --git a/src/main/java/com/fzzy/mqtt/MqttConsumerCallBack.java b/src/main/java/com/fzzy/mqtt/MqttConsumerCallBack.java deleted file mode 100644 index 127cfe8..0000000 --- a/src/main/java/com/fzzy/mqtt/MqttConsumerCallBack.java +++ /dev/null @@ -1,53 +0,0 @@ -package com.fzzy.mqtt; - -import com.fzzy.gateway.hx2023.service.OnReceiveMqttService; -import lombok.extern.slf4j.Slf4j; -import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; -import org.eclipse.paho.client.mqttv3.MqttCallback; -import org.eclipse.paho.client.mqttv3.MqttMessage; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.stereotype.Component; - -@Slf4j -@Component -public class MqttConsumerCallBack implements MqttCallback { - - - @Autowired - private OnReceiveMqttService onReceiveMqttService; - - /** - * 瀹㈡埛绔柇寮�杩炴帴鐨勫洖璋� - */ - @Override - public void connectionLost(Throwable throwable) { - log.info("涓庢湇鍔″櫒鏂紑杩炴帴锛屽彲閲嶈繛"); - } - - /** - * 娑堟伅鍒拌揪鐨勫洖璋� - */ - @Override - public void messageArrived(String topic, MqttMessage message) throws Exception { - - - String messageStr = new String(message.getPayload()); - - - log.info(String.format("鎺ユ敹娑堟伅涓婚 : %s", topic)); - log.info(String.format("鎺ユ敹娑堟伅Qos : %d", message.getQos())); - log.info(String.format("鎺ユ敹娑堟伅鍐呭 : %s", messageStr)); - - log.info(String.format("鎺ユ敹娑堟伅retained : %b", message.isRetained())); - - onReceiveMqttService.onReceiveMessage(messageStr); - } - - /** - * 娑堟伅鍙戝竷鎴愬姛鐨勫洖璋� - */ - @Override - public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) { - log.info(String.format("鎺ユ敹娑堟伅鎴愬姛")); - } -} diff --git a/src/main/java/com/fzzy/mqtt/MqttConsumerConfig.java b/src/main/java/com/fzzy/mqtt/MqttConsumerConfig.java index 076e3f7..854d9e5 100644 --- a/src/main/java/com/fzzy/mqtt/MqttConsumerConfig.java +++ b/src/main/java/com/fzzy/mqtt/MqttConsumerConfig.java @@ -1,93 +1,98 @@ package com.fzzy.mqtt; +import com.fzzy.gateway.hx2023.service.OnReceiveMqttService; import lombok.extern.slf4j.Slf4j; -import org.eclipse.paho.client.mqttv3.*; -import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; +import org.eclipse.paho.client.mqttv3.MqttConnectOptions; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; +import org.springframework.integration.annotation.IntegrationComponentScan; +import org.springframework.integration.annotation.ServiceActivator; +import org.springframework.integration.channel.DirectChannel; +import org.springframework.integration.core.MessageProducer; +import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory; +import org.springframework.integration.mqtt.core.MqttPahoClientFactory; +import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter; +import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter; +import org.springframework.messaging.Message; +import org.springframework.messaging.MessageChannel; +import org.springframework.messaging.MessageHandler; +import org.springframework.messaging.MessagingException; -import javax.annotation.PostConstruct; +import java.util.Arrays; +import java.util.List; @Configuration @Slf4j +@IntegrationComponentScan public class MqttConsumerConfig { @Autowired - private MqttProperties mqttProperties; + private OnReceiveMqttService onReceiveMqttService; + @Autowired - private MqttConsumerCallBack mqttConsumerCallBack; + private MqttProperties mqttProperties; - /** - * 瀹㈡埛绔璞� - */ - private MqttClient client; - - /** - * 鍦╞ean鍒濆鍖栧悗杩炴帴鍒版湇鍔″櫒 - */ - @PostConstruct - public void init() { - connect(); + @Bean + public MqttConnectOptions getReceiverMqttConnectOptionsForSub() { + MqttConnectOptions mqttConnectOptions = new MqttConnectOptions(); + mqttConnectOptions.setUserName(mqttProperties.getUsername()); + mqttConnectOptions.setPassword(mqttProperties.getPassword().toCharArray()); + List<String> hostList = Arrays.asList(mqttProperties.getHost().trim().split(",")); + String[] serverURIs = new String[hostList.size()]; + hostList.toArray(serverURIs); + mqttConnectOptions.setServerURIs(serverURIs); + mqttConnectOptions.setKeepAliveInterval(mqttProperties.getKeepAliveInterval()); + return mqttConnectOptions; } - /** - * 瀹㈡埛绔繛鎺ユ湇鍔$ - */ - public void connect() { - try { - //鍒涘缓MQTT瀹㈡埛绔璞� - client = new MqttClient(mqttProperties.getHost(), mqttProperties.getClientId(), new MemoryPersistence()); - //杩炴帴璁剧疆 - MqttConnectOptions options = new MqttConnectOptions(); - //鏄惁娓呯┖session锛岃缃负false琛ㄧず鏈嶅姟鍣ㄤ細淇濈暀瀹㈡埛绔殑杩炴帴璁板綍锛屽鎴风閲嶈繛涔嬪悗鑳借幏鍙栧埌鏈嶅姟鍣ㄥ湪瀹㈡埛绔柇寮�杩炴帴鏈熼棿鎺ㄩ�佺殑娑堟伅 - //璁剧疆涓簍rue琛ㄧず姣忔杩炴帴鍒版湇鍔$閮芥槸浠ユ柊鐨勮韩浠� - options.setCleanSession(true); - //璁剧疆杩炴帴鐢ㄦ埛鍚� - options.setUserName(mqttProperties.getUsername()); - //璁剧疆杩炴帴瀵嗙爜 - options.setPassword(mqttProperties.getPassword().toCharArray()); - //璁剧疆瓒呮椂鏃堕棿锛屽崟浣嶄负绉� - options.setConnectionTimeout(10); - //璁剧疆蹇冭烦鏃堕棿 鍗曚綅涓虹锛岃〃绀烘湇鍔″櫒姣忛殧1.5*20绉掔殑鏃堕棿鍚戝鎴风鍙戦�佸績璺冲垽鏂鎴风鏄惁鍦ㄧ嚎 - options.setKeepAliveInterval(20); - //璁剧疆閬楀槺娑堟伅鐨勮瘽棰橈紝鑻ュ鎴风鍜屾湇鍔″櫒涔嬮棿鐨勮繛鎺ユ剰澶栨柇寮�锛屾湇鍔″櫒灏嗗彂甯冨鎴风鐨勯仐鍢变俊鎭� - options.setWill("willTopic", (mqttProperties.getClientId() + "涓庢湇鍔″櫒鏂紑杩炴帴").getBytes(), 0, false); - //璁剧疆鍥炶皟 - // client.setCallback(new MqttConsumerCallBack()); - client.setCallback(mqttConsumerCallBack); - client.connect(options); - //璁㈤槄涓婚 - //娑堟伅绛夌骇锛屽拰涓婚鏁扮粍涓�涓�瀵瑰簲锛屾湇鍔$灏嗘寜鐓ф寚瀹氱瓑绾х粰璁㈤槄浜嗕富棰樼殑瀹㈡埛绔帹閫佹秷鎭� - int[] qos = {1, 1}; - //涓婚 - String[] topics = mqttProperties.getTopics().split(","); - //璁㈤槄涓婚 - client.subscribe(topics, qos); - } catch (MqttException e) { - e.printStackTrace(); - } + @Bean + public MqttPahoClientFactory receiverMqttClientFactoryForSub() { + DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory(); + factory.setConnectionOptions(getReceiverMqttConnectOptionsForSub()); + return factory; } - /** - * 鏂紑杩炴帴 - */ - public void disConnect() { - try { - client.disconnect(); - } catch (MqttException e) { - e.printStackTrace(); - } + //鎺ユ敹閫氶亾 + @Bean + public MessageChannel mqttInputChannel() { + return new DirectChannel(); } + //閰嶇疆client,鐩戝惉鐨則opic + @Bean + public MessageProducer inbound() { - /** - * 璁㈤槄涓婚 - */ - public void subscribe(String topic, int qos) { - try { - client.subscribe(topic, qos); - } catch (MqttException e) { - e.printStackTrace(); - } + // List<String> topicList = Arrays.asList(defaultTopic.trim().split(",")); + String[] topics = mqttProperties.getTopics().split(","); + //topicList.toArray(topics); + + MqttPahoMessageDrivenChannelAdapter adapter = + new MqttPahoMessageDrivenChannelAdapter(mqttProperties.getClientInId(), receiverMqttClientFactoryForSub(), topics); + adapter.setCompletionTimeout(mqttProperties.getCompletionTimeout()); + adapter.setConverter(new DefaultPahoMessageConverter()); + adapter.setQos(mqttProperties.getQos()); + adapter.setOutputChannel(mqttInputChannel()); + return adapter; } -} + + //閫氳繃閫氶亾鑾峰彇鏁版嵁 + @Bean + @ServiceActivator(inputChannel = "mqttInputChannel") + public MessageHandler handler() { + return new MessageHandler() { + @Override + public void handleMessage(Message<?> message) throws MessagingException { + String topic = message.getHeaders().get("mqtt_receivedTopic").toString(); + String msg = message.getPayload().toString(); + + // 杩欓噷鍙互澶勭悊鎺ユ敹鐨勬暟鎹� + log.info("----------------------------鏀跺埌璁㈤槄鍐呭---------------------------"); + log.info("-----TOPIC-----{}", topic); + log.info("-----Message-----{}", msg); + + onReceiveMqttService.onReceiveMessage(topic, msg); + } + }; + } +} \ No newline at end of file diff --git a/src/main/java/com/fzzy/mqtt/MqttController.java b/src/main/java/com/fzzy/mqtt/MqttController.java deleted file mode 100644 index e945120..0000000 --- a/src/main/java/com/fzzy/mqtt/MqttController.java +++ /dev/null @@ -1,30 +0,0 @@ -package com.fzzy.mqtt; - -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.stereotype.Controller; -import org.springframework.web.bind.annotation.RequestMapping; -import org.springframework.web.bind.annotation.ResponseBody; - -@Controller -public class MqttController { - @Autowired - private MqttConsumerConfig client; - - @Autowired - private MqttProperties mqttProperties; - - @RequestMapping("/connect") - public @ResponseBody - String connect() { - client.connect(); - return mqttProperties.getClientId() + "杩炴帴鍒版湇鍔″櫒"; - } - - @RequestMapping("/disConnect") - @ResponseBody - public String disConnect() { - client.disConnect(); - return mqttProperties.getClientId() + "涓庢湇鍔″櫒鏂紑杩炴帴"; - } - -} diff --git a/src/main/java/com/fzzy/mqtt/MqttGatewayService.java b/src/main/java/com/fzzy/mqtt/MqttGatewayService.java new file mode 100644 index 0000000..f86ede5 --- /dev/null +++ b/src/main/java/com/fzzy/mqtt/MqttGatewayService.java @@ -0,0 +1,31 @@ +package com.fzzy.mqtt; + +import org.springframework.integration.annotation.MessagingGateway; +import org.springframework.integration.mqtt.support.MqttHeaders; +import org.springframework.messaging.handler.annotation.Header; + + +@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel") +public interface MqttGatewayService { + + /** + * 鍙戦�佷俊鎭埌MQTT鏈嶅姟鍣� + * + * @param topic 涓婚 + * @param message 娑堟伅涓讳綋 + */ + void publishMqttWithTopic(String message, @Header(MqttHeaders.TOPIC) String topic); + + /** + * 鍙戦�佷俊鎭埌MQTT鏈嶅姟鍣� + * + * @param topic 涓婚 + * @param qos 瀵规秷鎭鐞嗙殑鍑犵鏈哄埗銆�<br> 0 琛ㄧず鐨勬槸璁㈤槄鑰呮病鏀跺埌娑堟伅涓嶄細鍐嶆鍙戦�侊紝娑堟伅浼氫涪澶便��<br> + * 1 琛ㄧず鐨勬槸浼氬皾璇曢噸璇曪紝涓�鐩村埌鎺ユ敹鍒版秷鎭紝浣嗚繖绉嶆儏鍐靛彲鑳藉鑷磋闃呰�呮敹鍒板娆¢噸澶嶆秷鎭��<br> + * 2 澶氫簡涓�娆″幓閲嶇殑鍔ㄤ綔锛岀‘淇濊闃呰�呮敹鍒扮殑娑堟伅鏈変竴娆°�� + * @param message 娑堟伅涓讳綋 + */ + void publishMqttWithTopic(String message, @Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos); + + +} diff --git a/src/main/java/com/fzzy/mqtt/MqttProperties.java b/src/main/java/com/fzzy/mqtt/MqttProperties.java index 4d35f80..d35ea0c 100644 --- a/src/main/java/com/fzzy/mqtt/MqttProperties.java +++ b/src/main/java/com/fzzy/mqtt/MqttProperties.java @@ -33,12 +33,17 @@ /** * 瀹㈡埛绔疘d */ - private String clientId; + private String clientInId; + + /** + * 瀹㈡埛绔疘d + */ + private String clientOutId; /** * 瓒呮椂鏃堕棿 */ - private int timout = 5000; + private int completionTimeout = 5000; /** * 璁剧疆浼氳瘽蹇冭烦鏃堕棿 鍗曚綅涓虹 鏈嶅姟鍣ㄤ細姣忛殧1.5*20绉掔殑鏃堕棿鍚戝鎴风 @@ -59,12 +64,6 @@ * 榛樿杩炴帴涓婚锛屼互/#缁撳熬琛ㄧず璁㈤槄鎵�鏈変互test寮�澶寸殑涓婚 */ private String defaultTopic; - - /** - * 璁剧疆鏄惁娓呯┖session,杩欓噷濡傛灉璁剧疆涓篺alse琛ㄧず鏈嶅姟鍣ㄤ細淇濈暀瀹㈡埛绔殑杩� - * 鎺ヨ褰曪紝杩欓噷璁剧疆涓簍rue琛ㄧず姣忔杩炴帴鍒版湇鍔″櫒閮戒互鏂扮殑韬唤杩炴帴 - */ - private Boolean cleanSession; /** * 鏄惁鏂嚎閲嶈繛 diff --git a/src/main/java/com/fzzy/mqtt/MqttProviderCallBack.java b/src/main/java/com/fzzy/mqtt/MqttProviderCallBack.java deleted file mode 100644 index 3b4de1c..0000000 --- a/src/main/java/com/fzzy/mqtt/MqttProviderCallBack.java +++ /dev/null @@ -1,39 +0,0 @@ -package com.fzzy.mqtt; - -import lombok.extern.slf4j.Slf4j; -import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; -import org.eclipse.paho.client.mqttv3.MqttCallback; -import org.eclipse.paho.client.mqttv3.MqttMessage; -import org.springframework.stereotype.Component; - -@Slf4j -@Component -public class MqttProviderCallBack implements MqttCallback { - - /** - * 瀹㈡埛绔柇寮�杩炴帴鐨勫洖璋� - */ - @Override - public void connectionLost(Throwable throwable) { - System.out.println("涓庢湇鍔″櫒鏂紑杩炴帴锛屽彲閲嶈繛"); - } - - /** - * 娑堟伅鍒拌揪鐨勫洖璋� - */ - @Override - public void messageArrived(String topic, MqttMessage message) throws Exception { - System.out.println(String.format("鎺ユ敹娑堟伅涓婚 : %s", topic)); - System.out.println(String.format("鎺ユ敹娑堟伅Qos : %d", message.getQos())); - System.out.println(String.format("鎺ユ敹娑堟伅鍐呭 : %s", new String(message.getPayload()))); - System.out.println(String.format("鎺ユ敹娑堟伅retained : %b", message.isRetained())); - } - - /** - * 娑堟伅鍙戝竷鎴愬姛鐨勫洖璋� - */ - @Override - public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) { - System.out.println(String.format("鎺ユ敹娑堟伅鎴愬姛")); - } -} diff --git a/src/main/java/com/fzzy/mqtt/MqttProviderConfig.java b/src/main/java/com/fzzy/mqtt/MqttProviderConfig.java index 04cca6a..7369600 100644 --- a/src/main/java/com/fzzy/mqtt/MqttProviderConfig.java +++ b/src/main/java/com/fzzy/mqtt/MqttProviderConfig.java @@ -4,100 +4,60 @@ import org.eclipse.paho.client.mqttv3.*; import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; +import org.springframework.integration.annotation.IntegrationComponentScan; +import org.springframework.integration.annotation.ServiceActivator; +import org.springframework.integration.channel.DirectChannel; +import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory; +import org.springframework.integration.mqtt.core.MqttPahoClientFactory; +import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler; +import org.springframework.messaging.MessageChannel; +import org.springframework.messaging.MessageHandler; import javax.annotation.PostConstruct; +import java.util.Arrays; +import java.util.List; @Configuration @Slf4j +@IntegrationComponentScan public class MqttProviderConfig { @Autowired private MqttProperties mqttProperties; - @Autowired - private MqttProviderCallBack mqttProviderCallBack; - /** - * 瀹㈡埛绔璞� - */ - private MqttClient client; - /** - * - * 鍦╞ean鍒濆鍖栧悗杩炴帴鍒版湇鍔″櫒 - */ - @PostConstruct - public void init(){ - connect(); + @Bean + public MqttConnectOptions getReceiverMqttConnectOptionsForSend() { + MqttConnectOptions mqttConnectOptions = new MqttConnectOptions(); + mqttConnectOptions.setUserName(mqttProperties.getUsername()); + mqttConnectOptions.setPassword(mqttProperties.getPassword().toCharArray()); + List<String> hostList = Arrays.asList(mqttProperties.getHost().trim().split(",")); + String[] serverURIs = new String[hostList.size()]; + hostList.toArray(serverURIs); + mqttConnectOptions.setServerURIs(serverURIs); + mqttConnectOptions.setKeepAliveInterval(mqttProperties.getKeepAliveInterval()); + return mqttConnectOptions; } - /** - * 瀹㈡埛绔繛鎺ユ湇鍔$ - */ - public void connect(){ - try{ - //鍒涘缓MQTT瀹㈡埛绔璞� - client = new MqttClient(mqttProperties.getHost(),mqttProperties.getClientId(),new MemoryPersistence()); - //杩炴帴璁剧疆 - MqttConnectOptions options = new MqttConnectOptions(); - //鏄惁娓呯┖session锛岃缃甪alse琛ㄧず鏈嶅姟鍣ㄤ細淇濈暀瀹㈡埛绔殑杩炴帴璁板綍锛堣闃呬富棰橈紝qos锛�,瀹㈡埛绔噸杩炰箣鍚庤兘鑾峰彇鍒版湇鍔″櫒鍦ㄥ鎴风鏂紑杩炴帴鏈熼棿鎺ㄩ�佺殑娑堟伅 - //璁剧疆涓簍rue琛ㄧず姣忔杩炴帴鏈嶅姟鍣ㄩ兘鏄互鏂扮殑韬唤 - options.setCleanSession(true); - //璁剧疆杩炴帴鐢ㄦ埛鍚� - options.setUserName(mqttProperties.getUsername()); - //璁剧疆杩炴帴瀵嗙爜 - options.setPassword(mqttProperties.getPassword().toCharArray()); - //璁剧疆瓒呮椂鏃堕棿锛屽崟浣嶄负绉� - options.setConnectionTimeout(100); - //璁剧疆蹇冭烦鏃堕棿 鍗曚綅涓虹锛岃〃绀烘湇鍔″櫒姣忛殧 1.5*20绉掔殑鏃堕棿鍚戝鎴风鍙戦�佸績璺冲垽鏂鎴风鏄惁鍦ㄧ嚎 - options.setKeepAliveInterval(20); - //璁剧疆閬楀槺娑堟伅鐨勮瘽棰橈紝鑻ュ鎴风鍜屾湇鍔″櫒涔嬮棿鐨勮繛鎺ユ剰澶栨柇寮�锛屾湇鍔″櫒灏嗗彂甯冨鎴风鐨勯仐鍢变俊鎭� - options.setWill("willTopic",(mqttProperties.getClientId()+ "涓庢湇鍔″櫒鏂紑杩炴帴").getBytes(),0,false); - //璁剧疆鍥炶皟 - client.setCallback(mqttProviderCallBack); - client.connect(options); - } catch(MqttException e){ - e.printStackTrace(); - } + @Bean + public MqttPahoClientFactory receiverMqttClientFactoryForSend() { + DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory(); + factory.setConnectionOptions(getReceiverMqttConnectOptionsForSend()); + return factory; } - public void publish(String topic,String message){ - MqttMessage mqttMessage = new MqttMessage(); - mqttMessage.setQos(mqttProperties.getQos()); - mqttMessage.setRetained(true); - mqttMessage.setPayload(message.getBytes()); - //涓婚鐨勭洰鐨勫湴锛岀敤浜庡彂甯�/璁㈤槄淇℃伅 - MqttTopic mqttTopic = client.getTopic(topic); - //鎻愪緵涓�绉嶆満鍒舵潵璺熻釜娑堟伅鐨勪紶閫掕繘搴� - //鐢ㄤ簬鍦ㄤ互闈為樆濉炴柟寮忥紙鍦ㄥ悗鍙拌繍琛岋級鎵ц鍙戝竷鏄窡韪秷鎭殑浼犻�掕繘搴� - MqttDeliveryToken token; - try { - //灏嗘寚瀹氭秷鎭彂甯冨埌涓婚锛屼絾涓嶇瓑寰呮秷鎭紶閫掑畬鎴愶紝杩斿洖鐨則oken鍙敤浜庤窡韪秷鎭殑浼犻�掔姸鎬� - //涓�鏃︽鏂规硶骞插噣鍦拌繑鍥烇紝娑堟伅灏卞凡琚鎴风鎺ュ彈鍙戝竷锛屽綋杩炴帴鍙敤锛屽皢鍦ㄥ悗鍙板畬鎴愭秷鎭紶閫掋�� - token = mqttTopic.publish(mqttMessage); - token.waitForCompletion(); - } catch (MqttException e) { - e.printStackTrace(); - } + @Bean + @ServiceActivator(inputChannel = "mqttOutboundChannel") + public MessageHandler mqttOutbound() { + MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(mqttProperties.getClientOutId(), receiverMqttClientFactoryForSend()); + messageHandler.setAsync(false); + messageHandler.setDefaultTopic(mqttProperties.getDefaultTopic()); + return messageHandler; } - public void publish(int qos,boolean retained,String topic,String message){ - MqttMessage mqttMessage = new MqttMessage(); - mqttMessage.setQos(qos); - mqttMessage.setRetained(retained); - mqttMessage.setPayload(message.getBytes()); - //涓婚鐨勭洰鐨勫湴锛岀敤浜庡彂甯�/璁㈤槄淇℃伅 - MqttTopic mqttTopic = client.getTopic(topic); - //鎻愪緵涓�绉嶆満鍒舵潵璺熻釜娑堟伅鐨勪紶閫掕繘搴� - //鐢ㄤ簬鍦ㄤ互闈為樆濉炴柟寮忥紙鍦ㄥ悗鍙拌繍琛岋級鎵ц鍙戝竷鏄窡韪秷鎭殑浼犻�掕繘搴� - MqttDeliveryToken token; - try { - //灏嗘寚瀹氭秷鎭彂甯冨埌涓婚锛屼絾涓嶇瓑寰呮秷鎭紶閫掑畬鎴愶紝杩斿洖鐨則oken鍙敤浜庤窡韪秷鎭殑浼犻�掔姸鎬� - //涓�鏃︽鏂规硶骞插噣鍦拌繑鍥烇紝娑堟伅灏卞凡琚鎴风鎺ュ彈鍙戝竷锛屽綋杩炴帴鍙敤锛屽皢鍦ㄥ悗鍙板畬鎴愭秷鎭紶閫掋�� - token = mqttTopic.publish(mqttMessage); - token.waitForCompletion(); - } catch (MqttException e) { - e.printStackTrace(); - } + @Bean + public MessageChannel mqttOutboundChannel() { + return new DirectChannel(); } - } diff --git a/src/main/java/com/fzzy/mqtt/MqttPubController.java b/src/main/java/com/fzzy/mqtt/MqttPubController.java deleted file mode 100644 index c03b424..0000000 --- a/src/main/java/com/fzzy/mqtt/MqttPubController.java +++ /dev/null @@ -1,26 +0,0 @@ -package com.fzzy.mqtt; - -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.web.bind.annotation.RequestMapping; -import org.springframework.web.bind.annotation.ResponseBody; -import org.springframework.web.bind.annotation.RestController; - -@RestController -public class MqttPubController { - - @Autowired - private MqttProviderConfig providerClient; - - - @RequestMapping("/sendMessage") - public @ResponseBody - String sendMessage(String topic, String message) { - try { - providerClient.publish(topic, message); - return "鍙戦�佹垚鍔�"; - } catch (Exception e) { - e.printStackTrace(); - return "鍙戦�佸け璐�"; - } - } -} diff --git a/src/main/java/com/fzzy/mqtt/MqttPublishService.java b/src/main/java/com/fzzy/mqtt/MqttPublishService.java index 136ed84..fdbb845 100644 --- a/src/main/java/com/fzzy/mqtt/MqttPublishService.java +++ b/src/main/java/com/fzzy/mqtt/MqttPublishService.java @@ -26,7 +26,7 @@ //String topic = "/device/hx-weigh-big-01/91511424746940066Y001_91511424746940066Y0010000_002_004_002_001/message/property/report"; String username = mqttProperties.getUsername(); String password = mqttProperties.getPassword(); - String clientid = mqttProperties.getClientId(); + String clientid = mqttProperties.getClientOutId(); String broker = mqttProperties.getHost(); //String content = " { \"headers\":{ \"productId\":\"hx-weigh-big-01\", \"keepOnlineTimeoutSeconds\":600, \"keepOnline\":true, \"deviceName\":\"鍦扮绉伴噸\" }, \"messageType\":\"REPORT_PROPERTY\", \"deviceId\":\"91511424746940066Y001_91511424746940066Y0010000_002_004_002_001\", \"properties\":{ \"weightInfo\":\"{\\\"exceed\\\":false,\\\"grossWeight\\\":3000.0,\\\"netWeight\\\":3000.0,\\\"static\\\":false,\\\"tareWeight\\\":3000.0,\\\"weightUnit\\\":\\\"KG\\\"}\" }, \"timestamp\":1698336020044 }"; int qos = 0; diff --git a/src/main/resources/application-devGateway.yml b/src/main/resources/application-devGateway.yml index 4774c3a..044667c 100644 --- a/src/main/resources/application-devGateway.yml +++ b/src/main/resources/application-devGateway.yml @@ -72,13 +72,13 @@ mqtt: host: tcp://127.0.0.1:1883 username: admin - password: pwdmqtt.. - client-id: fzzy-customer-igds-api - timeout: 10 - keep-alive-interval: 20 + password: admin123321 + client-inId: fzzy-clientInId-igds-api + client-outId: fzzy-clientInOutId-igds-api + completionTimeout: 3000 + keep-alive-interval: 2 max-connect-times: 5 qos: 0 isOpen: false - default: - topic: testTopic - topics: "/+/+/properties/report,/device-message-sender/+/+" \ No newline at end of file + default-topic: mqtt/+/test1 + topics: /device-message-sender/# \ No newline at end of file -- Gitblit v1.9.3