From bf57ab9e4db58dbee018268dd8b593ee564bc7ee Mon Sep 17 00:00:00 2001 From: vince <757871790@qq.com> Date: 星期四, 09 十一月 2023 12:07:09 +0800 Subject: [PATCH] Merge remote-tracking branch 'orgin/igds-api-gateway' into igds-api-gateway --- src/main/java/com/fzzy/mqtt/MqttProviderConfig.java | 116 +++++++++++++++++++--------------------------------------- 1 files changed, 38 insertions(+), 78 deletions(-) diff --git a/src/main/java/com/fzzy/mqtt/MqttProviderConfig.java b/src/main/java/com/fzzy/mqtt/MqttProviderConfig.java index 04cca6a..7369600 100644 --- a/src/main/java/com/fzzy/mqtt/MqttProviderConfig.java +++ b/src/main/java/com/fzzy/mqtt/MqttProviderConfig.java @@ -4,100 +4,60 @@ import org.eclipse.paho.client.mqttv3.*; import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; +import org.springframework.integration.annotation.IntegrationComponentScan; +import org.springframework.integration.annotation.ServiceActivator; +import org.springframework.integration.channel.DirectChannel; +import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory; +import org.springframework.integration.mqtt.core.MqttPahoClientFactory; +import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler; +import org.springframework.messaging.MessageChannel; +import org.springframework.messaging.MessageHandler; import javax.annotation.PostConstruct; +import java.util.Arrays; +import java.util.List; @Configuration @Slf4j +@IntegrationComponentScan public class MqttProviderConfig { @Autowired private MqttProperties mqttProperties; - @Autowired - private MqttProviderCallBack mqttProviderCallBack; - /** - * 瀹㈡埛绔璞� - */ - private MqttClient client; - /** - * - * 鍦╞ean鍒濆鍖栧悗杩炴帴鍒版湇鍔″櫒 - */ - @PostConstruct - public void init(){ - connect(); + @Bean + public MqttConnectOptions getReceiverMqttConnectOptionsForSend() { + MqttConnectOptions mqttConnectOptions = new MqttConnectOptions(); + mqttConnectOptions.setUserName(mqttProperties.getUsername()); + mqttConnectOptions.setPassword(mqttProperties.getPassword().toCharArray()); + List<String> hostList = Arrays.asList(mqttProperties.getHost().trim().split(",")); + String[] serverURIs = new String[hostList.size()]; + hostList.toArray(serverURIs); + mqttConnectOptions.setServerURIs(serverURIs); + mqttConnectOptions.setKeepAliveInterval(mqttProperties.getKeepAliveInterval()); + return mqttConnectOptions; } - /** - * 瀹㈡埛绔繛鎺ユ湇鍔$ - */ - public void connect(){ - try{ - //鍒涘缓MQTT瀹㈡埛绔璞� - client = new MqttClient(mqttProperties.getHost(),mqttProperties.getClientId(),new MemoryPersistence()); - //杩炴帴璁剧疆 - MqttConnectOptions options = new MqttConnectOptions(); - //鏄惁娓呯┖session锛岃缃甪alse琛ㄧず鏈嶅姟鍣ㄤ細淇濈暀瀹㈡埛绔殑杩炴帴璁板綍锛堣闃呬富棰橈紝qos锛�,瀹㈡埛绔噸杩炰箣鍚庤兘鑾峰彇鍒版湇鍔″櫒鍦ㄥ鎴风鏂紑杩炴帴鏈熼棿鎺ㄩ�佺殑娑堟伅 - //璁剧疆涓簍rue琛ㄧず姣忔杩炴帴鏈嶅姟鍣ㄩ兘鏄互鏂扮殑韬唤 - options.setCleanSession(true); - //璁剧疆杩炴帴鐢ㄦ埛鍚� - options.setUserName(mqttProperties.getUsername()); - //璁剧疆杩炴帴瀵嗙爜 - options.setPassword(mqttProperties.getPassword().toCharArray()); - //璁剧疆瓒呮椂鏃堕棿锛屽崟浣嶄负绉� - options.setConnectionTimeout(100); - //璁剧疆蹇冭烦鏃堕棿 鍗曚綅涓虹锛岃〃绀烘湇鍔″櫒姣忛殧 1.5*20绉掔殑鏃堕棿鍚戝鎴风鍙戦�佸績璺冲垽鏂鎴风鏄惁鍦ㄧ嚎 - options.setKeepAliveInterval(20); - //璁剧疆閬楀槺娑堟伅鐨勮瘽棰橈紝鑻ュ鎴风鍜屾湇鍔″櫒涔嬮棿鐨勮繛鎺ユ剰澶栨柇寮�锛屾湇鍔″櫒灏嗗彂甯冨鎴风鐨勯仐鍢变俊鎭� - options.setWill("willTopic",(mqttProperties.getClientId()+ "涓庢湇鍔″櫒鏂紑杩炴帴").getBytes(),0,false); - //璁剧疆鍥炶皟 - client.setCallback(mqttProviderCallBack); - client.connect(options); - } catch(MqttException e){ - e.printStackTrace(); - } + @Bean + public MqttPahoClientFactory receiverMqttClientFactoryForSend() { + DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory(); + factory.setConnectionOptions(getReceiverMqttConnectOptionsForSend()); + return factory; } - public void publish(String topic,String message){ - MqttMessage mqttMessage = new MqttMessage(); - mqttMessage.setQos(mqttProperties.getQos()); - mqttMessage.setRetained(true); - mqttMessage.setPayload(message.getBytes()); - //涓婚鐨勭洰鐨勫湴锛岀敤浜庡彂甯�/璁㈤槄淇℃伅 - MqttTopic mqttTopic = client.getTopic(topic); - //鎻愪緵涓�绉嶆満鍒舵潵璺熻釜娑堟伅鐨勪紶閫掕繘搴� - //鐢ㄤ簬鍦ㄤ互闈為樆濉炴柟寮忥紙鍦ㄥ悗鍙拌繍琛岋級鎵ц鍙戝竷鏄窡韪秷鎭殑浼犻�掕繘搴� - MqttDeliveryToken token; - try { - //灏嗘寚瀹氭秷鎭彂甯冨埌涓婚锛屼絾涓嶇瓑寰呮秷鎭紶閫掑畬鎴愶紝杩斿洖鐨則oken鍙敤浜庤窡韪秷鎭殑浼犻�掔姸鎬� - //涓�鏃︽鏂规硶骞插噣鍦拌繑鍥烇紝娑堟伅灏卞凡琚鎴风鎺ュ彈鍙戝竷锛屽綋杩炴帴鍙敤锛屽皢鍦ㄥ悗鍙板畬鎴愭秷鎭紶閫掋�� - token = mqttTopic.publish(mqttMessage); - token.waitForCompletion(); - } catch (MqttException e) { - e.printStackTrace(); - } + @Bean + @ServiceActivator(inputChannel = "mqttOutboundChannel") + public MessageHandler mqttOutbound() { + MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(mqttProperties.getClientOutId(), receiverMqttClientFactoryForSend()); + messageHandler.setAsync(false); + messageHandler.setDefaultTopic(mqttProperties.getDefaultTopic()); + return messageHandler; } - public void publish(int qos,boolean retained,String topic,String message){ - MqttMessage mqttMessage = new MqttMessage(); - mqttMessage.setQos(qos); - mqttMessage.setRetained(retained); - mqttMessage.setPayload(message.getBytes()); - //涓婚鐨勭洰鐨勫湴锛岀敤浜庡彂甯�/璁㈤槄淇℃伅 - MqttTopic mqttTopic = client.getTopic(topic); - //鎻愪緵涓�绉嶆満鍒舵潵璺熻釜娑堟伅鐨勪紶閫掕繘搴� - //鐢ㄤ簬鍦ㄤ互闈為樆濉炴柟寮忥紙鍦ㄥ悗鍙拌繍琛岋級鎵ц鍙戝竷鏄窡韪秷鎭殑浼犻�掕繘搴� - MqttDeliveryToken token; - try { - //灏嗘寚瀹氭秷鎭彂甯冨埌涓婚锛屼絾涓嶇瓑寰呮秷鎭紶閫掑畬鎴愶紝杩斿洖鐨則oken鍙敤浜庤窡韪秷鎭殑浼犻�掔姸鎬� - //涓�鏃︽鏂规硶骞插噣鍦拌繑鍥烇紝娑堟伅灏卞凡琚鎴风鎺ュ彈鍙戝竷锛屽綋杩炴帴鍙敤锛屽皢鍦ㄥ悗鍙板畬鎴愭秷鎭紶閫掋�� - token = mqttTopic.publish(mqttMessage); - token.waitForCompletion(); - } catch (MqttException e) { - e.printStackTrace(); - } + @Bean + public MessageChannel mqttOutboundChannel() { + return new DirectChannel(); } - } -- Gitblit v1.9.3