From bf57ab9e4db58dbee018268dd8b593ee564bc7ee Mon Sep 17 00:00:00 2001 From: vince <757871790@qq.com> Date: 星期四, 09 十一月 2023 12:07:09 +0800 Subject: [PATCH] Merge remote-tracking branch 'orgin/igds-api-gateway' into igds-api-gateway --- src/main/java/com/fzzy/mqtt/MqttConsumerConfig.java | 147 +++++++++++++++++++++++++----------------------- 1 files changed, 76 insertions(+), 71 deletions(-) diff --git a/src/main/java/com/fzzy/mqtt/MqttConsumerConfig.java b/src/main/java/com/fzzy/mqtt/MqttConsumerConfig.java index 076e3f7..854d9e5 100644 --- a/src/main/java/com/fzzy/mqtt/MqttConsumerConfig.java +++ b/src/main/java/com/fzzy/mqtt/MqttConsumerConfig.java @@ -1,93 +1,98 @@ package com.fzzy.mqtt; +import com.fzzy.gateway.hx2023.service.OnReceiveMqttService; import lombok.extern.slf4j.Slf4j; -import org.eclipse.paho.client.mqttv3.*; -import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; +import org.eclipse.paho.client.mqttv3.MqttConnectOptions; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; +import org.springframework.integration.annotation.IntegrationComponentScan; +import org.springframework.integration.annotation.ServiceActivator; +import org.springframework.integration.channel.DirectChannel; +import org.springframework.integration.core.MessageProducer; +import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory; +import org.springframework.integration.mqtt.core.MqttPahoClientFactory; +import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter; +import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter; +import org.springframework.messaging.Message; +import org.springframework.messaging.MessageChannel; +import org.springframework.messaging.MessageHandler; +import org.springframework.messaging.MessagingException; -import javax.annotation.PostConstruct; +import java.util.Arrays; +import java.util.List; @Configuration @Slf4j +@IntegrationComponentScan public class MqttConsumerConfig { @Autowired - private MqttProperties mqttProperties; + private OnReceiveMqttService onReceiveMqttService; + @Autowired - private MqttConsumerCallBack mqttConsumerCallBack; + private MqttProperties mqttProperties; - /** - * 瀹㈡埛绔璞� - */ - private MqttClient client; - - /** - * 鍦╞ean鍒濆鍖栧悗杩炴帴鍒版湇鍔″櫒 - */ - @PostConstruct - public void init() { - connect(); + @Bean + public MqttConnectOptions getReceiverMqttConnectOptionsForSub() { + MqttConnectOptions mqttConnectOptions = new MqttConnectOptions(); + mqttConnectOptions.setUserName(mqttProperties.getUsername()); + mqttConnectOptions.setPassword(mqttProperties.getPassword().toCharArray()); + List<String> hostList = Arrays.asList(mqttProperties.getHost().trim().split(",")); + String[] serverURIs = new String[hostList.size()]; + hostList.toArray(serverURIs); + mqttConnectOptions.setServerURIs(serverURIs); + mqttConnectOptions.setKeepAliveInterval(mqttProperties.getKeepAliveInterval()); + return mqttConnectOptions; } - /** - * 瀹㈡埛绔繛鎺ユ湇鍔$ - */ - public void connect() { - try { - //鍒涘缓MQTT瀹㈡埛绔璞� - client = new MqttClient(mqttProperties.getHost(), mqttProperties.getClientId(), new MemoryPersistence()); - //杩炴帴璁剧疆 - MqttConnectOptions options = new MqttConnectOptions(); - //鏄惁娓呯┖session锛岃缃负false琛ㄧず鏈嶅姟鍣ㄤ細淇濈暀瀹㈡埛绔殑杩炴帴璁板綍锛屽鎴风閲嶈繛涔嬪悗鑳借幏鍙栧埌鏈嶅姟鍣ㄥ湪瀹㈡埛绔柇寮�杩炴帴鏈熼棿鎺ㄩ�佺殑娑堟伅 - //璁剧疆涓簍rue琛ㄧず姣忔杩炴帴鍒版湇鍔$閮芥槸浠ユ柊鐨勮韩浠� - options.setCleanSession(true); - //璁剧疆杩炴帴鐢ㄦ埛鍚� - options.setUserName(mqttProperties.getUsername()); - //璁剧疆杩炴帴瀵嗙爜 - options.setPassword(mqttProperties.getPassword().toCharArray()); - //璁剧疆瓒呮椂鏃堕棿锛屽崟浣嶄负绉� - options.setConnectionTimeout(10); - //璁剧疆蹇冭烦鏃堕棿 鍗曚綅涓虹锛岃〃绀烘湇鍔″櫒姣忛殧1.5*20绉掔殑鏃堕棿鍚戝鎴风鍙戦�佸績璺冲垽鏂鎴风鏄惁鍦ㄧ嚎 - options.setKeepAliveInterval(20); - //璁剧疆閬楀槺娑堟伅鐨勮瘽棰橈紝鑻ュ鎴风鍜屾湇鍔″櫒涔嬮棿鐨勮繛鎺ユ剰澶栨柇寮�锛屾湇鍔″櫒灏嗗彂甯冨鎴风鐨勯仐鍢变俊鎭� - options.setWill("willTopic", (mqttProperties.getClientId() + "涓庢湇鍔″櫒鏂紑杩炴帴").getBytes(), 0, false); - //璁剧疆鍥炶皟 - // client.setCallback(new MqttConsumerCallBack()); - client.setCallback(mqttConsumerCallBack); - client.connect(options); - //璁㈤槄涓婚 - //娑堟伅绛夌骇锛屽拰涓婚鏁扮粍涓�涓�瀵瑰簲锛屾湇鍔$灏嗘寜鐓ф寚瀹氱瓑绾х粰璁㈤槄浜嗕富棰樼殑瀹㈡埛绔帹閫佹秷鎭� - int[] qos = {1, 1}; - //涓婚 - String[] topics = mqttProperties.getTopics().split(","); - //璁㈤槄涓婚 - client.subscribe(topics, qos); - } catch (MqttException e) { - e.printStackTrace(); - } + @Bean + public MqttPahoClientFactory receiverMqttClientFactoryForSub() { + DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory(); + factory.setConnectionOptions(getReceiverMqttConnectOptionsForSub()); + return factory; } - /** - * 鏂紑杩炴帴 - */ - public void disConnect() { - try { - client.disconnect(); - } catch (MqttException e) { - e.printStackTrace(); - } + //鎺ユ敹閫氶亾 + @Bean + public MessageChannel mqttInputChannel() { + return new DirectChannel(); } + //閰嶇疆client,鐩戝惉鐨則opic + @Bean + public MessageProducer inbound() { - /** - * 璁㈤槄涓婚 - */ - public void subscribe(String topic, int qos) { - try { - client.subscribe(topic, qos); - } catch (MqttException e) { - e.printStackTrace(); - } + // List<String> topicList = Arrays.asList(defaultTopic.trim().split(",")); + String[] topics = mqttProperties.getTopics().split(","); + //topicList.toArray(topics); + + MqttPahoMessageDrivenChannelAdapter adapter = + new MqttPahoMessageDrivenChannelAdapter(mqttProperties.getClientInId(), receiverMqttClientFactoryForSub(), topics); + adapter.setCompletionTimeout(mqttProperties.getCompletionTimeout()); + adapter.setConverter(new DefaultPahoMessageConverter()); + adapter.setQos(mqttProperties.getQos()); + adapter.setOutputChannel(mqttInputChannel()); + return adapter; } -} + + //閫氳繃閫氶亾鑾峰彇鏁版嵁 + @Bean + @ServiceActivator(inputChannel = "mqttInputChannel") + public MessageHandler handler() { + return new MessageHandler() { + @Override + public void handleMessage(Message<?> message) throws MessagingException { + String topic = message.getHeaders().get("mqtt_receivedTopic").toString(); + String msg = message.getPayload().toString(); + + // 杩欓噷鍙互澶勭悊鎺ユ敹鐨勬暟鎹� + log.info("----------------------------鏀跺埌璁㈤槄鍐呭---------------------------"); + log.info("-----TOPIC-----{}", topic); + log.info("-----Message-----{}", msg); + + onReceiveMqttService.onReceiveMessage(topic, msg); + } + }; + } +} \ No newline at end of file -- Gitblit v1.9.3