From a31a452b9999ba3c811c36b5cb1b3ec0c18d037d Mon Sep 17 00:00:00 2001 From: jiazx0107@163.com <jiazx0107@163.com> Date: 星期三, 08 十一月 2023 02:23:38 +0800 Subject: [PATCH] 提交MQTT相关功能 --- src/main/java/com/fzzy/gateway/hx2023/service/ApiInitService.java | 12 + src/main/java/com/fzzy/gateway/hx2023/service/HxGatewayRemoteServiceImpl.java | 1 src/main/java/com/fzzy/mqtt/MqttProperties.java | 15 + src/main/java/com/fzzy/mqtt/MqttInboundConfiguration.java | 91 +++++++++++++ src/main/java/com/fzzy/gateway/hx2023/websocket/WebSocketMqtt.java | 2 src/main/java/com/fzzy/gateway/service/GatewayConfService.java | 1 src/main/java/com/fzzy/gateway/hx2023/websocket/SocketService.java | 102 ++++++++++++++ /dev/null | 25 --- src/main/resources/application-devGateway.yml | 23 +- src/main/java/com/fzzy/mqtt/MqttOutboundConfiguration.java | 58 ++++++++ src/main/java/com/fzzy/gateway/view/GatewayConf.view.xml | 10 + src/main/java/com/fzzy/mqtt/MqttGatewayService.java | 17 ++ src/main/java/com/fzzy/mqtt/MqttPubController.java | 35 +++++ 13 files changed, 345 insertions(+), 47 deletions(-) diff --git a/src/main/java/com/fzzy/gateway/hx2023/service/ApiInitService.java b/src/main/java/com/fzzy/gateway/hx2023/service/ApiInitService.java index e98cafa..f9b093f 100644 --- a/src/main/java/com/fzzy/gateway/hx2023/service/ApiInitService.java +++ b/src/main/java/com/fzzy/gateway/hx2023/service/ApiInitService.java @@ -1,19 +1,21 @@ package com.fzzy.gateway.hx2023.service; +import com.bstek.dorado.annotation.Expose; import com.fzzy.gateway.api.GatewayRemoteManager; import com.fzzy.gateway.api.GatewayRemoteService; import com.fzzy.gateway.entity.GatewayConf; import com.fzzy.gateway.service.GatewayConfService; -import lombok.extern.slf4j.Slf4j; + + import org.springframework.stereotype.Component; import javax.annotation.Resource; + import java.util.List; /** * 褰撳墠鎺ュ彛锛屽垵濮嬪寲鐩稿叧 */ -@Slf4j @Component public class ApiInitService { @@ -23,6 +25,10 @@ private GatewayRemoteManager gatewayRemoteManager; + /** + * apiInitService#init + */ + @Expose public void init() { List<GatewayConf> list = confService.listAll(); @@ -38,5 +44,7 @@ } } + + } diff --git a/src/main/java/com/fzzy/gateway/hx2023/service/HxGatewayRemoteServiceImpl.java b/src/main/java/com/fzzy/gateway/hx2023/service/HxGatewayRemoteServiceImpl.java index a6088d8..c43f29e 100644 --- a/src/main/java/com/fzzy/gateway/hx2023/service/HxGatewayRemoteServiceImpl.java +++ b/src/main/java/com/fzzy/gateway/hx2023/service/HxGatewayRemoteServiceImpl.java @@ -33,7 +33,6 @@ @Resource private ApiLogRep apiLogRep; - @Resource private GatewayConfService gatewayConfService; diff --git a/src/main/java/com/fzzy/gateway/hx2023/websocket/SocketService.java b/src/main/java/com/fzzy/gateway/hx2023/websocket/SocketService.java new file mode 100644 index 0000000..1025124 --- /dev/null +++ b/src/main/java/com/fzzy/gateway/hx2023/websocket/SocketService.java @@ -0,0 +1,102 @@ +package com.fzzy.gateway.hx2023.websocket; + + +import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.JSONObject; +import org.springframework.stereotype.Component; +import org.springframework.stereotype.Service; +import org.springframework.web.bind.annotation.CrossOrigin; + +import javax.websocket.*; +import javax.websocket.server.PathParam; +import javax.websocket.server.ServerEndpoint; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CopyOnWriteArraySet; + +@Component +@CrossOrigin +@Service +@ServerEndpoint(value = "/mqtt") +public class SocketService { + + /** + * 鐢ㄦ潵瀛樻斁姣忎釜瀹㈡埛绔搴旂殑MyWebSocket瀵硅薄銆� + **/ + private static CopyOnWriteArraySet<SocketService> socketSet = new CopyOnWriteArraySet<>(); + /** + * 涓庢煇涓鎴风鐨勮繛鎺ヤ細璇濓紝闇�瑕侀�氳繃瀹冩潵缁欏鎴风鍙戦�佹暟鎹� + **/ + private Session session; + /** + * 鐢ㄦ埛鍚嶇О + **/ + private String nickname; + /** + * 鐢ㄦ潵璁板綍sessionId鍜岃session杩涜缁戝畾 + **/ + private static Map<String,Session> map = new HashMap<String, Session>(); + + /** + * 杩炴帴寤虹珛鎴愬姛璋冪敤鐨勬柟娉� + */ + @OnOpen + public void onOpen(Session session,@PathParam("nickname") String nickname) { + this.session = session; + this.nickname=nickname; + + map.put(nickname, session); + socketSet.add(this); + + System.out.println("鏈夋柊杩炴帴鍔犲叆:"+nickname+",褰撳墠鍦ㄧ嚎浜烘暟涓�" + socketSet.size()); + } + + /** + * 杩炴帴鍏抽棴璋冪敤鐨勬柟娉� + */ + @OnClose + public void onClose() { + socketSet.remove(this); + List<String> nickname = this.session.getRequestParameterMap().get("nickname"); + for(String nick:nickname) { + map.remove(nick); + } + System.out.println("鏈変竴杩炴帴鍏抽棴锛佸綋鍓嶅湪绾夸汉鏁颁负" + socketSet.size()); + } + + /** + * 鏀跺埌瀹㈡埛绔秷鎭悗璋冪敤鐨勬柟娉� + */ + @OnMessage + public void onMessage(String message, Session session,@PathParam("nickname") String nickname) { + System.out.println("鏉ヨ嚜瀹㈡埛绔殑娑堟伅-->"+nickname+": " + message); + //灏唌qtt鍙戦�佽繃鏉ョ殑鏁版嵁杩斿洖缁欏墠绔� + try { +// JSONObject parse = JSONObject.parse(message); +// Map<String,Object> valuesMap = (Map<String, Object>) parse.getByPath("values"); +// String tag1 = valuesMap.get("tag1").toString(); +// String replace = tag1.replace("\r\n", ""); +// String substring = replace.substring(replace.indexOf(":") + 1).trim().replace(" ",":"); +// String tag2 = valuesMap.get("tag2").toString(); +// String substring1=substring+"tag2:"+tag2; +// String result="{"+substring1+"}"; +// //鍙戦�佺粰鍓嶇锛岀敤浣滈〉闈㈡覆鏌� +// Session fromSession = map.get(nickname); +// fromSession.getAsyncRemote().sendText(result); + } catch (Exception e) { + e.printStackTrace(); + } + } + + /** + * 鍙戠敓閿欒鏃惰皟鐢� + */ + @OnError + public void onError(Session session, Throwable error) { + System.out.println("鍙戠敓閿欒"); + error.printStackTrace(); + } + + +} diff --git a/src/main/java/com/fzzy/gateway/hx2023/websocket/WebSocketMqtt.java b/src/main/java/com/fzzy/gateway/hx2023/websocket/WebSocketMqtt.java index 28e5666..9d9756c 100644 --- a/src/main/java/com/fzzy/gateway/hx2023/websocket/WebSocketMqtt.java +++ b/src/main/java/com/fzzy/gateway/hx2023/websocket/WebSocketMqtt.java @@ -21,7 +21,7 @@ */ @Slf4j @Component -@ServerEndpoint(value = "/mqtt") +//@ServerEndpoint(value = "/mqtt") public class WebSocketMqtt { private static Map<String, Session> sessionPool = new ConcurrentHashMap<>(); diff --git a/src/main/java/com/fzzy/gateway/service/GatewayConfService.java b/src/main/java/com/fzzy/gateway/service/GatewayConfService.java index 81ddd0a..2c8b6b8 100644 --- a/src/main/java/com/fzzy/gateway/service/GatewayConfService.java +++ b/src/main/java/com/fzzy/gateway/service/GatewayConfService.java @@ -64,6 +64,7 @@ gatewayConfRep.delete(data2); return null; } + public void updateCache(GatewayConf conf) { String key = RedisConst.buildKey(RedisConst.KYE_CONF_GATEWAY, conf.getKqdm()); diff --git a/src/main/java/com/fzzy/gateway/view/GatewayConf.view.xml b/src/main/java/com/fzzy/gateway/view/GatewayConf.view.xml index 033eef2..4a77418 100644 --- a/src/main/java/com/fzzy/gateway/view/GatewayConf.view.xml +++ b/src/main/java/com/fzzy/gateway/view/GatewayConf.view.xml @@ -166,6 +166,12 @@ <Property name="iconClass">fa fa-search</Property> <Property name="exClassName">toolbar-button-push</Property> </ToolBarButton> + <ToolBarButton> + <Property name="caption">缃戝叧鍒濆鍖�</Property> + <Property name="iconClass">fa fa-search</Property> + <Property name="exClassName">toolbar-button-push</Property> + <Property name="action">ajaxInit</Property> + </ToolBarButton> </ToolBar> <DataGrid id="dgMain"> <Property name="dataSet">dsMain</Property> @@ -377,6 +383,10 @@ <Property name="service">gatewayDeviceService#ajaxTestGrain</Property> <Property name="executingMessage">鍦ㄥ姫鍔涙墽琛屼腑鈥︹��</Property> </AjaxAction> + <AjaxAction id="ajaxInit"> + <Property name="service">apiInitService#init</Property> + <Property name="executingMessage">鍦ㄥ姫鍔涙墽琛屼腑鈥︹��</Property> + </AjaxAction> <Dialog id="dialogWeight"> <Property name="width">400</Property> <Property name="height">300</Property> diff --git a/src/main/java/com/fzzy/mqtt/MqttAcceptCallback.java b/src/main/java/com/fzzy/mqtt/MqttAcceptCallback.java deleted file mode 100644 index 94ace07..0000000 --- a/src/main/java/com/fzzy/mqtt/MqttAcceptCallback.java +++ /dev/null @@ -1,90 +0,0 @@ -package com.fzzy.mqtt; - -import lombok.extern.slf4j.Slf4j; -import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; -import org.eclipse.paho.client.mqttv3.MqttCallbackExtended; -import org.eclipse.paho.client.mqttv3.MqttMessage; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.stereotype.Component; - -/** - * @Description : MQTT鎺ュ彈鏈嶅姟鐨勫洖璋冪被 - */ -@Slf4j -@Component -public class MqttAcceptCallback implements MqttCallbackExtended { - - @Autowired - private MqttAcceptClient mqttAcceptClient; - - @Autowired - private MqttProperties mqttProperties; - - - /** - * 瀹㈡埛绔柇寮�鍚庤Е鍙� - * - * @param throwable - */ - @Override - public void connectionLost(Throwable throwable) { - log.info("杩炴帴鏂紑锛屽彲浠ラ噸杩�"); - if (MqttAcceptClient.client == null || !MqttAcceptClient.client.isConnected()) { - log.info("銆恊mqx閲嶆柊杩炴帴銆�...................................................."); - mqttAcceptClient.reconnection(); - } - } - - - /** - * 瀹㈡埛绔敹鍒版秷鎭Е鍙� - * - * @param topic 涓婚 - * @param mqttMessage 娑堟伅 - */ - @Override - public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception { - log.info("銆愭帴鏀舵秷鎭富棰樸��:" + topic); - log.info("銆愭帴鏀舵秷鎭疩os銆�:" + mqttMessage.getQos()); - log.info("銆愭帴鏀舵秷鎭唴瀹广��:" + new String(mqttMessage.getPayload())); - // int i = 1/0; - } - - - /** - * 鍙戝竷娑堟伅鎴愬姛 - * - * @param token token - */ - @Override - public void deliveryComplete(IMqttDeliveryToken token) { - String[] topics = token.getTopics(); - for (String topic : topics) { - log.info("鍚戜富棰樸��" + topic + "銆戝彂閫佹秷鎭垚鍔燂紒"); - } - try { - MqttMessage message = token.getMessage(); - byte[] payload = message.getPayload(); - String s = new String(payload, "UTF-8"); - log.info("銆愭秷鎭唴瀹广��:" + s); - } catch (Exception e) { - log.error("MqttAcceptCallback deliveryComplete error,message:{}", e.getMessage()); - e.printStackTrace(); - } - } - - /** - * 杩炴帴emq鏈嶅姟鍣ㄥ悗瑙﹀彂 - * - * @param b - * @param s - */ - @Override - public void connectComplete(boolean b, String s) { - log.info("============================= 瀹㈡埛绔��" + MqttAcceptClient.client.getClientId() + "銆戣繛鎺ユ垚鍔燂紒============================="); - // 浠�/#缁撳熬琛ㄧず璁㈤槄鎵�鏈変互test寮�澶寸殑涓婚 - // 璁㈤槄鎵�鏈夋満鏋勪富棰� - mqttAcceptClient.subscribe(mqttProperties.getDefaultTopic(), 0); - } - -} diff --git a/src/main/java/com/fzzy/mqtt/MqttAcceptClient.java b/src/main/java/com/fzzy/mqtt/MqttAcceptClient.java deleted file mode 100644 index a7823ad..0000000 --- a/src/main/java/com/fzzy/mqtt/MqttAcceptClient.java +++ /dev/null @@ -1,104 +0,0 @@ -package com.fzzy.mqtt; - -import lombok.extern.slf4j.Slf4j; -import org.eclipse.paho.client.mqttv3.MqttClient; -import org.eclipse.paho.client.mqttv3.MqttConnectOptions; -import org.eclipse.paho.client.mqttv3.MqttException; -import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.stereotype.Component; - -/** - * @Description : MQTT鎺ュ彈鏈嶅姟鐨勫鎴风 - */ -@Slf4j -@Component -public class MqttAcceptClient { - - @Autowired - private MqttAcceptCallback mqttAcceptCallback; - - @Autowired - private MqttProperties mqttProperties; - - public static MqttClient client; - - private static MqttClient getClient() { - return client; - } - - private static void setClient(MqttClient client) { - MqttAcceptClient.client = client; - } - - - /** - * 瀹㈡埛绔繛鎺� - */ - public void connect() { - MqttClient client; - try { - client = new MqttClient(mqttProperties.getHost(), mqttProperties.getClientId(), - new MemoryPersistence()); - - MqttConnectOptions options = new MqttConnectOptions(); - options.setUserName(mqttProperties.getClientUsername()); - options.setPassword(mqttProperties.getClientPassword().toCharArray()); - options.setConnectionTimeout(mqttProperties.getClientTimeout()); - options.setKeepAliveInterval(mqttProperties.getClientAliveTime()); - options.setAutomaticReconnect(mqttProperties.getReconnect()); - options.setCleanSession(mqttProperties.getCleanSession()); - MqttAcceptClient.setClient(client); - // 璁剧疆鍥炶皟 - client.setCallback(mqttAcceptCallback); - client.connect(options); - } catch (Exception e) { - log.error("MqttAcceptClient connect error,message:{}", e.getMessage()); - e.printStackTrace(); - } - } - - /** - * 閲嶆柊杩炴帴 - */ - public void reconnection() { - try { - client.connect(); - } catch (MqttException e) { - log.error("MqttAcceptClient reconnection error,message:{}", e.getMessage()); - e.printStackTrace(); - } - } - - /** - * 璁㈤槄鏌愪釜涓婚 - * - * @param topic 涓婚 - * @param qos 杩炴帴鏂瑰紡 - */ - public void subscribe(String topic, int qos) { - log.info("========================銆愬紑濮嬭闃呬富棰�:" + topic + "銆�========================"); - try { - client.subscribe(topic, qos); - } catch (MqttException e) { - log.error("MqttAcceptClient subscribe error,message:{}", e.getMessage()); - e.printStackTrace(); - } - } - - /** - * 鍙栨秷璁㈤槄鏌愪釜涓婚 - * - * @param topic - */ - public void unsubscribe(String topic) { - log.info("========================銆愬彇娑堣闃呬富棰�:" + topic + "銆�========================"); - try { - client.unsubscribe(topic); - } catch (MqttException e) { - log.error("MqttAcceptClient unsubscribe error,message:{}", e.getMessage()); - e.printStackTrace(); - } - } - -} diff --git a/src/main/java/com/fzzy/mqtt/MqttCondition.java b/src/main/java/com/fzzy/mqtt/MqttCondition.java deleted file mode 100644 index f335e7b..0000000 --- a/src/main/java/com/fzzy/mqtt/MqttCondition.java +++ /dev/null @@ -1,25 +0,0 @@ -package com.fzzy.mqtt; - -import org.springframework.beans.factory.config.ConfigurableListableBeanFactory; -import org.springframework.context.annotation.Condition; -import org.springframework.context.annotation.ConditionContext; -import org.springframework.core.env.Environment; -import org.springframework.core.type.AnnotatedTypeMetadata; - -/** - * @Description : 鑷畾涔夐厤缃�,閫氳繃杩欎釜閰嶇疆锛屾潵鎺у埗鍚姩椤圭洰鐨勬椂鍊欐槸鍚﹀惎鍔╩qtt - */ -public class MqttCondition implements Condition { - - @Override - public boolean matches(ConditionContext context, AnnotatedTypeMetadata annotatedTypeMetadata) { - //1銆佽兘鑾峰彇鍒癷oc浣跨敤鐨刡eanfactory - ConfigurableListableBeanFactory beanFactory = context.getBeanFactory(); - //2銆佽幏鍙栫被鍔犺浇鍣� - ClassLoader classLoader = context.getClassLoader(); - //3銆佽幏鍙栧綋鍓嶇幆澧冧俊鎭� - Environment environment = context.getEnvironment(); - String isOpen = environment.getProperty("mqtt.isOpen"); - return Boolean.valueOf(isOpen); - } -} diff --git a/src/main/java/com/fzzy/mqtt/MqttGatewayService.java b/src/main/java/com/fzzy/mqtt/MqttGatewayService.java new file mode 100644 index 0000000..caed4e8 --- /dev/null +++ b/src/main/java/com/fzzy/mqtt/MqttGatewayService.java @@ -0,0 +1,17 @@ +package com.fzzy.mqtt; + + +import org.springframework.integration.annotation.MessagingGateway; +import org.springframework.integration.mqtt.support.MqttHeaders; +import org.springframework.messaging.handler.annotation.Header; +import org.springframework.stereotype.Service; + +/** + * 鎺ㄩ�佹帴鍙� + */ +@Service +@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel") +public interface MqttGatewayService { + + void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic01, String sendData); +} diff --git a/src/main/java/com/fzzy/mqtt/MqttInboundConfiguration.java b/src/main/java/com/fzzy/mqtt/MqttInboundConfiguration.java new file mode 100644 index 0000000..874464c --- /dev/null +++ b/src/main/java/com/fzzy/mqtt/MqttInboundConfiguration.java @@ -0,0 +1,91 @@ +package com.fzzy.mqtt; + +import com.fzzy.gateway.hx2023.websocket.SocketService; +import lombok.extern.slf4j.Slf4j; +import org.eclipse.paho.client.mqttv3.MqttConnectOptions; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.integration.annotation.IntegrationComponentScan; +import org.springframework.integration.annotation.ServiceActivator; +import org.springframework.integration.channel.DirectChannel; +import org.springframework.integration.core.MessageProducer; +import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory; +import org.springframework.integration.mqtt.core.MqttPahoClientFactory; +import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter; +import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter; +import org.springframework.integration.mqtt.support.MqttHeaders; +import org.springframework.messaging.Message; +import org.springframework.messaging.MessageChannel; +import org.springframework.messaging.MessageHandler; +import org.springframework.messaging.MessagingException; + +/** + * MQTT娑堣垂绔� + */ +@Slf4j +@Configuration +@IntegrationComponentScan +public class MqttInboundConfiguration { + + @Autowired + private MqttProperties mqttProperties; + + @Bean + public MessageChannel mqttInputChannel() { + return new DirectChannel(); + } + + @Bean + public MqttPahoClientFactory receiverMqttClientFactoryForSub() { + DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory(); + String[] array = mqttProperties.getHost().split(","); + MqttConnectOptions options = new MqttConnectOptions(); + options.setServerURIs(array); + options.setUserName(mqttProperties.getClientUsername()); + options.setPassword(mqttProperties.getClientPassword().toCharArray()); + options.setKeepAliveInterval(mqttProperties.getClientAliveTime()); + //鎺ュ彈绂荤嚎娑堟伅 + options.setCleanSession(false); + options.setMqttVersion(4); + + factory.setConnectionOptions(options); + + return factory; + } + + //閰嶇疆client,鐩戝惉鐨則opic + @Bean + public MessageProducer inbound() { + String[] inboundTopics = mqttProperties.getClientTopics().split(","); + MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter( + mqttProperties.getClientId() + "_inbound", receiverMqttClientFactoryForSub(), inboundTopics); //瀵筰nboundTopics涓婚杩涜鐩戝惉 + adapter.setCompletionTimeout(mqttProperties.getClientTimeout()); + adapter.setQos(1); + adapter.setConverter(new DefaultPahoMessageConverter()); + adapter.setOutputChannel(mqttInputChannel()); + return adapter; + } + + //閫氳繃閫氶亾鑾峰彇鏁版嵁 + @Bean + @ServiceActivator(inputChannel = "mqttInputChannel") //寮傛澶勭悊 + public MessageHandler handler() { + return new MessageHandler() { + @Override + public void handleMessage(Message<?> message) throws MessagingException { + log.info("----------------------"); + //鑾峰彇mqtt鐨則opic + String topic = (String) message.getHeaders().get(MqttHeaders.RECEIVED_TOPIC); + //浣跨敤webSocket杩斿洖缁欏墠绔� + SocketService socketService = new SocketService(); + socketService.onMessage(message.getPayload().toString(), null, topic); + + log.info("message:" + message.getPayload()); + log.info("PacketId:" + message.getHeaders().getId()); + log.info("Qos:" + message.getHeaders().get(MqttHeaders.QOS)); + log.info("topic:" + topic); + } + }; + } +} diff --git a/src/main/java/com/fzzy/mqtt/MqttOutboundConfiguration.java b/src/main/java/com/fzzy/mqtt/MqttOutboundConfiguration.java new file mode 100644 index 0000000..ccbd0f1 --- /dev/null +++ b/src/main/java/com/fzzy/mqtt/MqttOutboundConfiguration.java @@ -0,0 +1,58 @@ +package com.fzzy.mqtt; + +import lombok.extern.slf4j.Slf4j; +import org.eclipse.paho.client.mqttv3.MqttConnectOptions; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.integration.annotation.IntegrationComponentScan; +import org.springframework.integration.annotation.ServiceActivator; +import org.springframework.integration.channel.DirectChannel; +import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory; +import org.springframework.integration.mqtt.core.MqttPahoClientFactory; +import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler; +import org.springframework.messaging.MessageChannel; +import org.springframework.messaging.MessageHandler; + +/** + * MQTT鐢熶骇绔� + */ +@Slf4j +@Configuration +@IntegrationComponentScan +public class MqttOutboundConfiguration { + @Autowired + private MqttProperties mqttProperties; + + @Bean + public MessageChannel mqttOutboundChannel() { + return new DirectChannel(); + } + + @Bean + public MqttPahoClientFactory getReceiverMqttConnectOptionsForSend() { + DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory(); + String[] array = mqttProperties.getHost().split(","); + MqttConnectOptions options = new MqttConnectOptions(); + options.setServerURIs(array); + + if(null == mqttProperties.getClientUsername()) mqttProperties.setClientUsername(" "); + if(null == mqttProperties.getClientPassword()) mqttProperties.setClientPassword(" "); + options.setUserName(mqttProperties.getClientUsername()); + options.setPassword(mqttProperties.getClientPassword().toCharArray()); + // 鎺ュ彈绂荤嚎娑堟伅 + options.setCleanSession(false); //鍛婅瘔浠g悊瀹㈡埛绔槸鍚﹁寤虹珛鎸佷箙浼氳瘽 false涓哄缓绔嬫寔涔呬細璇� + options.setMqttVersion(4); + factory.setConnectionOptions(options); + return factory; + } + + @Bean + @ServiceActivator(inputChannel = "mqttOutboundChannel") + public MessageHandler mqttOutbound() { + MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(mqttProperties.getClientId() + "outbound", getReceiverMqttConnectOptionsForSend()); + messageHandler.setAsync(true); + return messageHandler; + } + +} diff --git a/src/main/java/com/fzzy/mqtt/MqttProperties.java b/src/main/java/com/fzzy/mqtt/MqttProperties.java index 8a7a554..a7293c9 100644 --- a/src/main/java/com/fzzy/mqtt/MqttProperties.java +++ b/src/main/java/com/fzzy/mqtt/MqttProperties.java @@ -1,15 +1,17 @@ package com.fzzy.mqtt; import lombok.Data; +import lombok.extern.slf4j.Slf4j; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.stereotype.Component; /** * MQTT 閰嶇疆淇℃伅 */ -@Component -@ConfigurationProperties("mqtt") +@Slf4j @Data +@Component +@ConfigurationProperties("spring.mqtt") public class MqttProperties { /** @@ -37,13 +39,13 @@ /** * 瓒呮椂鏃堕棿 */ - private int clientTimeout; + private int clientTimeout = 5000; /** * 璁剧疆浼氳瘽蹇冭烦鏃堕棿 鍗曚綅涓虹 鏈嶅姟鍣ㄤ細姣忛殧1.5*20绉掔殑鏃堕棿鍚戝鎴风 * 鍙戦�佷釜娑堟伅鍒ゆ柇瀹㈡埛绔槸鍚﹀湪绾匡紝浣嗚繖涓柟娉曞苟娌℃湁閲嶈繛鐨勬満鍒� */ - private int clientAliveTime; + private int clientAliveTime = 30000; private int clientMaxConnectTime; @@ -52,12 +54,12 @@ /** * 杩炴帴鏂瑰紡 */ - private Integer clientQos; + private Integer clientQos = 0; /** * 榛樿杩炴帴涓婚锛屼互/#缁撳熬琛ㄧず璁㈤槄鎵�鏈変互test寮�澶寸殑涓婚 */ - private String defaultTopic; + private String defaultTopic; /** * 璁剧疆鏄惁娓呯┖session,杩欓噷濡傛灉璁剧疆涓篺alse琛ㄧず鏈嶅姟鍣ㄤ細淇濈暀瀹㈡埛绔殑杩� @@ -75,4 +77,5 @@ */ private Boolean isOpen; + } diff --git a/src/main/java/com/fzzy/mqtt/MqttPubController.java b/src/main/java/com/fzzy/mqtt/MqttPubController.java new file mode 100644 index 0000000..644928d --- /dev/null +++ b/src/main/java/com/fzzy/mqtt/MqttPubController.java @@ -0,0 +1,35 @@ +package com.fzzy.mqtt; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RestController; + +@RestController +public class MqttPubController { + + @Autowired + private MqttGatewayService gatewayService; + + + @RequestMapping("/hello") + public String hello() { + return "hello!"; + } + + @RequestMapping("/sendMqtt") + public String sendMqtt(String sendData) { + System.out.println(sendData); + System.out.println("杩涘叆sendMqtt-------" + sendData); + gatewayService.sendToMqtt("topic01", (String) sendData); + return "Test is OK"; + } + + @RequestMapping("/sendMqttTopic") + public String sendMqtt(String sendData, String topic) { + //System.out.println(sendData+" "+topic); + //System.out.println("杩涘叆inbound鍙戦�侊細"+sendData); + gatewayService.sendToMqtt(topic, (String) sendData); + return "Test is OK"; + } + +} diff --git a/src/main/resources/application-devGateway.yml b/src/main/resources/application-devGateway.yml index cc2c8df..09d7b84 100644 --- a/src/main/resources/application-devGateway.yml +++ b/src/main/resources/application-devGateway.yml @@ -69,15 +69,14 @@ key-deserializer: org.apache.kafka.common.serialization.StringDeserializer value-deserializer: org.apache.kafka.common.serialization.StringDeserializer - -mqtt: - host: tcp://10.13.4.84:11883 - client-id: - client-username: - client-password: - client-timeout: 10 - client-alive-time: 20 - client-max-connect-times: 5 - client-topics: - client-qos: 0 - isOpen: false + mqtt: + host: tcp://10.13.4.84:11883 + client-id: + client-username: admin + client-password: 123456 + client-timeout: 10 + client-alive-time: 20 + client-max-connect-times: 5 + client-topics: "/device/${productId}/${deviceId}/message/property/report,/device-message-sender/${productId}/${deviceId}" + client-qos: 0 + isOpen: false -- Gitblit v1.9.3