src/main/java/com/fzzy/gateway/GatewayRunner.java
@@ -36,6 +36,9 @@ //è·åæ°è±¡ä¿¡æ¯ scheduled.doWeatherExe(); //æ´æ°è®¾å¤ç¼å apiInitService.updateDeviceCache(); } } src/main/java/com/fzzy/gateway/hx2023/service/ApiInitService.java
@@ -7,6 +7,7 @@ import com.fzzy.gateway.service.GatewayConfService; import com.fzzy.gateway.service.GatewayDeviceService; import org.springframework.stereotype.Component; import javax.annotation.Resource; @@ -21,6 +22,8 @@ @Resource private GatewayConfService confService; @Resource private GatewayDeviceService deviceService; @Resource private GatewayRemoteManager gatewayRemoteManager; @@ -46,5 +49,7 @@ } public void updateDeviceCache() { deviceService.flushCache(); } } src/main/java/com/fzzy/gateway/hx2023/service/OnReceiveMqttService.java
@@ -8,6 +8,7 @@ import com.fzzy.gateway.hx2023.ScConstant; import com.fzzy.gateway.hx2023.data.CloudSendData; import com.fzzy.gateway.hx2023.data.SyncReqData; import com.fzzy.mqtt.MqttGatewayService; import com.fzzy.mqtt.MqttProviderConfig; import com.fzzy.mqtt.MqttPublishService; import lombok.extern.slf4j.Slf4j; @@ -25,7 +26,7 @@ @Resource private GatewayRemoteManager gatewayRemoteManager; @Resource private MqttPublishService publishService; private MqttGatewayService mqttGatewayService; /** @@ -33,7 +34,7 @@ * * @param message */ public void onReceiveMessage(String message) { public void onReceiveMessage(String topic,String message) { try { CloudSendData cloudSendData = JSONObject.parseObject(message, CloudSendData.class); @@ -47,7 +48,7 @@ } } catch (Exception e) { log.error("--------æ§è¡å¼å¸¸-----{}",e); } } @@ -74,9 +75,11 @@ topic = topic.replace("${productId}", device.getProductId()).replace("${deviceId}", device.getDeviceId()); publishService.publishMsg(topic, resp.getData()); mqttGatewayService.publishMqttWithTopic(resp.getData(), topic); log.info("=======ç²®æ æ¨é==========={}", resp.getData()); log.info("----------------------------æ¨éMQTTä¿¡æ¯---------------------------"); log.info("-----TOPIC-----{}",topic); log.info("-----Message-----{}",resp.getData()); } } } src/main/java/com/fzzy/gateway/service/GatewayDeviceService2.java
@@ -11,9 +11,11 @@ import com.fzzy.gateway.hx2023.ScConstant; import com.fzzy.gateway.hx2023.data.SyncReqData; import com.fzzy.gateway.service.repository.GatewayDeviceRep; import com.fzzy.mqtt.MqttGatewayService; import com.fzzy.mqtt.MqttProviderConfig; import com.fzzy.mqtt.MqttPublishService; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang.StringUtils; import org.springframework.beans.BeanUtils; import org.springframework.data.domain.Sort; import org.springframework.stereotype.Component; @@ -31,7 +33,7 @@ @Resource private GatewayRemoteManager gatewayRemoteManager; @Resource private MqttPublishService publishService; private MqttGatewayService publishService; public List<GatewayDevice> listAll() { Sort sort = new Sort(Sort.Direction.ASC, "deviceId"); @@ -86,6 +88,7 @@ @Expose public String ajaxTestGrain2(GatewayDevice data) { SyncReqData reqData = new SyncReqData(); reqData.setDevice(data); reqData.setAutoReplay(true); @@ -98,6 +101,10 @@ return "ERRORï¼å½å设å¤éç²®æ 设å¤ä¸æ¯æå½åæä½"; } if(StringUtils.isEmpty(data.getCableRule())){ return "ERRORï¼å½åè®¾å¤æ²¡æé ç½®å¸çº¿è§åï¼æ æ³æ§è¡"; } BaseResp resp = gatewayRemoteManager.getSyncGrainService(data.getSyncProtocol()).syncGrain2(reqData); //èªå¨æ¨é @@ -105,10 +112,13 @@ String topic = ScConstant.TOPIC_REPORT; topic = topic.replace("${productId}", data.getProductId()).replace("${deviceId}", data.getDeviceId()); publishService.publishMsg(topic, resp.getData()); } publishService.publishMqttWithTopic(resp.getData(),topic); log.info("=======æå¨æµè¯ç²®æ æ¨é==========={}", resp.getData()); log.info("----------------------------æå¨æ¨éMQTTç²®æ ä¿¡æ¯---------------------------"); log.info("-----TOPIC-----{}",topic); log.info("-----Message-----{}",resp.getData()); } return "SUCCESSï¼æ§è¡å®æ"; } src/main/java/com/fzzy/mqtt/MqttConsumerCallBack.java
ÎļþÒÑɾ³ý src/main/java/com/fzzy/mqtt/MqttConsumerConfig.java
@@ -1,93 +1,98 @@ package com.fzzy.mqtt; import com.fzzy.gateway.hx2023.service.OnReceiveMqttService; import lombok.extern.slf4j.Slf4j; import org.eclipse.paho.client.mqttv3.*; import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; import org.eclipse.paho.client.mqttv3.MqttConnectOptions; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.integration.annotation.IntegrationComponentScan; import org.springframework.integration.annotation.ServiceActivator; import org.springframework.integration.channel.DirectChannel; import org.springframework.integration.core.MessageProducer; import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory; import org.springframework.integration.mqtt.core.MqttPahoClientFactory; import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter; import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter; import org.springframework.messaging.Message; import org.springframework.messaging.MessageChannel; import org.springframework.messaging.MessageHandler; import org.springframework.messaging.MessagingException; import javax.annotation.PostConstruct; import java.util.Arrays; import java.util.List; @Configuration @Slf4j @IntegrationComponentScan public class MqttConsumerConfig { @Autowired private MqttProperties mqttProperties; private OnReceiveMqttService onReceiveMqttService; @Autowired private MqttConsumerCallBack mqttConsumerCallBack; private MqttProperties mqttProperties; /** * 客æ·ç«¯å¯¹è±¡ */ private MqttClient client; /** * å¨beanåå§ååè¿æ¥å°æå¡å¨ */ @PostConstruct public void init() { connect(); @Bean public MqttConnectOptions getReceiverMqttConnectOptionsForSub() { MqttConnectOptions mqttConnectOptions = new MqttConnectOptions(); mqttConnectOptions.setUserName(mqttProperties.getUsername()); mqttConnectOptions.setPassword(mqttProperties.getPassword().toCharArray()); List<String> hostList = Arrays.asList(mqttProperties.getHost().trim().split(",")); String[] serverURIs = new String[hostList.size()]; hostList.toArray(serverURIs); mqttConnectOptions.setServerURIs(serverURIs); mqttConnectOptions.setKeepAliveInterval(mqttProperties.getKeepAliveInterval()); return mqttConnectOptions; } /** * 客æ·ç«¯è¿æ¥æå¡ç«¯ */ public void connect() { try { //å建MQTT客æ·ç«¯å¯¹è±¡ client = new MqttClient(mqttProperties.getHost(), mqttProperties.getClientId(), new MemoryPersistence()); //è¿æ¥è®¾ç½® MqttConnectOptions options = new MqttConnectOptions(); //æ¯å¦æ¸ 空sessionï¼è®¾ç½®ä¸ºfalse表示æå¡å¨ä¼ä¿ç客æ·ç«¯çè¿æ¥è®°å½ï¼å®¢æ·ç«¯éè¿ä¹åè½è·åå°æå¡å¨å¨å®¢æ·ç«¯æå¼è¿æ¥æé´æ¨éçæ¶æ¯ //设置为trueè¡¨ç¤ºæ¯æ¬¡è¿æ¥å°æå¡ç«¯é½æ¯ä»¥æ°ç身份 options.setCleanSession(true); //è®¾ç½®è¿æ¥ç¨æ·å options.setUserName(mqttProperties.getUsername()); //è®¾ç½®è¿æ¥å¯ç options.setPassword(mqttProperties.getPassword().toCharArray()); //è®¾ç½®è¶ æ¶æ¶é´ï¼åä½ä¸ºç§ options.setConnectionTimeout(10); //设置å¿è·³æ¶é´ åä½ä¸ºç§ï¼è¡¨ç¤ºæå¡å¨æ¯é1.5*20ç§çæ¶é´å客æ·ç«¯åéå¿è·³å¤æå®¢æ·ç«¯æ¯å¦å¨çº¿ options.setKeepAliveInterval(20); //设置é屿¶æ¯çè¯é¢ï¼è¥å®¢æ·ç«¯åæå¡å¨ä¹é´çè¿æ¥æå¤æå¼ï¼æå¡å¨å°åå¸å®¢æ·ç«¯çéå±ä¿¡æ¯ options.setWill("willTopic", (mqttProperties.getClientId() + "䏿å¡å¨æå¼è¿æ¥").getBytes(), 0, false); //设置åè° // client.setCallback(new MqttConsumerCallBack()); client.setCallback(mqttConsumerCallBack); client.connect(options); //订é ä¸»é¢ //æ¶æ¯ç级ï¼å䏻颿°ç»ä¸ä¸å¯¹åºï¼æå¡ç«¯å°æç §æå®ç级ç»è®¢é äºä¸»é¢ç客æ·ç«¯æ¨éæ¶æ¯ int[] qos = {1, 1}; //ä¸»é¢ @Bean public MqttPahoClientFactory receiverMqttClientFactoryForSub() { DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory(); factory.setConnectionOptions(getReceiverMqttConnectOptionsForSub()); return factory; } //æ¥æ¶éé @Bean public MessageChannel mqttInputChannel() { return new DirectChannel(); } //é ç½®client,çå¬çtopic @Bean public MessageProducer inbound() { // List<String> topicList = Arrays.asList(defaultTopic.trim().split(",")); String[] topics = mqttProperties.getTopics().split(","); //订é ä¸»é¢ client.subscribe(topics, qos); } catch (MqttException e) { e.printStackTrace(); } //topicList.toArray(topics); MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(mqttProperties.getClientInId(), receiverMqttClientFactoryForSub(), topics); adapter.setCompletionTimeout(mqttProperties.getCompletionTimeout()); adapter.setConverter(new DefaultPahoMessageConverter()); adapter.setQos(mqttProperties.getQos()); adapter.setOutputChannel(mqttInputChannel()); return adapter; } /** * æå¼è¿æ¥ */ public void disConnect() { try { client.disconnect(); } catch (MqttException e) { e.printStackTrace(); } } //éè¿ééè·åæ°æ® @Bean @ServiceActivator(inputChannel = "mqttInputChannel") public MessageHandler handler() { return new MessageHandler() { @Override public void handleMessage(Message<?> message) throws MessagingException { String topic = message.getHeaders().get("mqtt_receivedTopic").toString(); String msg = message.getPayload().toString(); // è¿éå¯ä»¥å¤çæ¥æ¶çæ°æ® log.info("----------------------------æ¶å°è®¢é å 容---------------------------"); log.info("-----TOPIC-----{}", topic); log.info("-----Message-----{}", msg); /** * 订é ä¸»é¢ */ public void subscribe(String topic, int qos) { try { client.subscribe(topic, qos); } catch (MqttException e) { e.printStackTrace(); onReceiveMqttService.onReceiveMessage(topic, msg); } }; } } src/main/java/com/fzzy/mqtt/MqttController.java
ÎļþÒÑɾ³ý src/main/java/com/fzzy/mqtt/MqttGatewayService.java
¶Ô±ÈÐÂÎļþ @@ -0,0 +1,31 @@ package com.fzzy.mqtt; import org.springframework.integration.annotation.MessagingGateway; import org.springframework.integration.mqtt.support.MqttHeaders; import org.springframework.messaging.handler.annotation.Header; @MessagingGateway(defaultRequestChannel = "mqttOutboundChannel") public interface MqttGatewayService { /** * åéä¿¡æ¯å°MQTTæå¡å¨ * * @param topic ä¸»é¢ * @param message æ¶æ¯ä¸»ä½ */ void publishMqttWithTopic(String message, @Header(MqttHeaders.TOPIC) String topic); /** * åéä¿¡æ¯å°MQTTæå¡å¨ * * @param topic ä¸»é¢ * @param qos å¯¹æ¶æ¯å¤ççå ç§æºå¶ã<br> 0 è¡¨ç¤ºçæ¯è®¢é è æ²¡æ¶å°æ¶æ¯ä¸ä¼å次åéï¼æ¶æ¯ä¼ä¸¢å¤±ã<br> * 1 è¡¨ç¤ºçæ¯ä¼å°è¯éè¯ï¼ä¸ç´å°æ¥æ¶å°æ¶æ¯ï¼ä½è¿ç§æ åµå¯è½å¯¼è´è®¢é è æ¶å°å¤æ¬¡é夿¶æ¯ã<br> * 2 å¤äºä¸æ¬¡å»éçå¨ä½ï¼ç¡®ä¿è®¢é è æ¶å°çæ¶æ¯æä¸æ¬¡ã * @param message æ¶æ¯ä¸»ä½ */ void publishMqttWithTopic(String message, @Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos); } src/main/java/com/fzzy/mqtt/MqttProperties.java
@@ -33,12 +33,17 @@ /** * 客æ·ç«¯Id */ private String clientId; private String clientInId; /** * 客æ·ç«¯Id */ private String clientOutId; /** * è¶ æ¶æ¶é´ */ private int timout = 5000; private int completionTimeout = 5000; /** * 设置ä¼è¯å¿è·³æ¶é´ åä½ä¸ºç§ æå¡å¨ä¼æ¯é1.5*20ç§çæ¶é´å客æ·ç«¯ @@ -59,12 +64,6 @@ * é»è®¤è¿æ¥ä¸»é¢ï¼ä»¥/#ç»å°¾è¡¨ç¤ºè®¢é ææä»¥testå¼å¤´çä¸»é¢ */ private String defaultTopic; /** * 设置æ¯å¦æ¸ 空session,è¿éå¦æè®¾ç½®ä¸ºfalse表示æå¡å¨ä¼ä¿ç客æ·ç«¯çè¿ * æ¥è®°å½ï¼è¿é设置为trueè¡¨ç¤ºæ¯æ¬¡è¿æ¥å°æå¡å¨é½ä»¥æ°çèº«ä»½è¿æ¥ */ private Boolean cleanSession; /** * æ¯å¦æçº¿éè¿ src/main/java/com/fzzy/mqtt/MqttProviderCallBack.java
ÎļþÒÑɾ³ý src/main/java/com/fzzy/mqtt/MqttProviderConfig.java
@@ -4,100 +4,60 @@ import org.eclipse.paho.client.mqttv3.*; import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.integration.annotation.IntegrationComponentScan; import org.springframework.integration.annotation.ServiceActivator; import org.springframework.integration.channel.DirectChannel; import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory; import org.springframework.integration.mqtt.core.MqttPahoClientFactory; import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler; import org.springframework.messaging.MessageChannel; import org.springframework.messaging.MessageHandler; import javax.annotation.PostConstruct; import java.util.Arrays; import java.util.List; @Configuration @Slf4j @IntegrationComponentScan public class MqttProviderConfig { @Autowired private MqttProperties mqttProperties; @Autowired private MqttProviderCallBack mqttProviderCallBack; /** * 客æ·ç«¯å¯¹è±¡ */ private MqttClient client; /** * * å¨beanåå§ååè¿æ¥å°æå¡å¨ */ @PostConstruct public void init(){ connect(); @Bean public MqttConnectOptions getReceiverMqttConnectOptionsForSend() { MqttConnectOptions mqttConnectOptions = new MqttConnectOptions(); mqttConnectOptions.setUserName(mqttProperties.getUsername()); mqttConnectOptions.setPassword(mqttProperties.getPassword().toCharArray()); List<String> hostList = Arrays.asList(mqttProperties.getHost().trim().split(",")); String[] serverURIs = new String[hostList.size()]; hostList.toArray(serverURIs); mqttConnectOptions.setServerURIs(serverURIs); mqttConnectOptions.setKeepAliveInterval(mqttProperties.getKeepAliveInterval()); return mqttConnectOptions; } /** * 客æ·ç«¯è¿æ¥æå¡ç«¯ */ public void connect(){ try{ //å建MQTT客æ·ç«¯å¯¹è±¡ client = new MqttClient(mqttProperties.getHost(),mqttProperties.getClientId(),new MemoryPersistence()); //è¿æ¥è®¾ç½® MqttConnectOptions options = new MqttConnectOptions(); //æ¯å¦æ¸ 空sessionï¼è®¾ç½®false表示æå¡å¨ä¼ä¿ç客æ·ç«¯çè¿æ¥è®°å½ï¼è®¢é 主é¢ï¼qosï¼,客æ·ç«¯éè¿ä¹åè½è·åå°æå¡å¨å¨å®¢æ·ç«¯æå¼è¿æ¥æé´æ¨éçæ¶æ¯ //设置为trueè¡¨ç¤ºæ¯æ¬¡è¿æ¥æå¡å¨é½æ¯ä»¥æ°ç身份 options.setCleanSession(true); //è®¾ç½®è¿æ¥ç¨æ·å options.setUserName(mqttProperties.getUsername()); //è®¾ç½®è¿æ¥å¯ç options.setPassword(mqttProperties.getPassword().toCharArray()); //è®¾ç½®è¶ æ¶æ¶é´ï¼åä½ä¸ºç§ options.setConnectionTimeout(100); //设置å¿è·³æ¶é´ åä½ä¸ºç§ï¼è¡¨ç¤ºæå¡å¨æ¯é 1.5*20ç§çæ¶é´å客æ·ç«¯åéå¿è·³å¤æå®¢æ·ç«¯æ¯å¦å¨çº¿ options.setKeepAliveInterval(20); //设置é屿¶æ¯çè¯é¢ï¼è¥å®¢æ·ç«¯åæå¡å¨ä¹é´çè¿æ¥æå¤æå¼ï¼æå¡å¨å°åå¸å®¢æ·ç«¯çéå±ä¿¡æ¯ options.setWill("willTopic",(mqttProperties.getClientId()+ "䏿å¡å¨æå¼è¿æ¥").getBytes(),0,false); //设置åè° client.setCallback(mqttProviderCallBack); client.connect(options); } catch(MqttException e){ e.printStackTrace(); } @Bean public MqttPahoClientFactory receiverMqttClientFactoryForSend() { DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory(); factory.setConnectionOptions(getReceiverMqttConnectOptionsForSend()); return factory; } public void publish(String topic,String message){ MqttMessage mqttMessage = new MqttMessage(); mqttMessage.setQos(mqttProperties.getQos()); mqttMessage.setRetained(true); mqttMessage.setPayload(message.getBytes()); //主é¢çç®çå°ï¼ç¨äºåå¸/订é ä¿¡æ¯ MqttTopic mqttTopic = client.getTopic(topic); //æä¾ä¸ç§æºå¶æ¥è·è¸ªæ¶æ¯çä¼ éè¿åº¦ //ç¨äºå¨ä»¥éé»å¡æ¹å¼ï¼å¨åå°è¿è¡ï¼æ§è¡å叿¯è·è¸ªæ¶æ¯çä¼ éè¿åº¦ MqttDeliveryToken token; try { //尿宿¶æ¯åå¸å°ä¸»é¢ï¼ä½ä¸çå¾ æ¶æ¯ä¼ é宿ï¼è¿åçtokenå¯ç¨äºè·è¸ªæ¶æ¯çä¼ éç¶æ //䏿¦æ¤æ¹æ³å¹²åå°è¿åï¼æ¶æ¯å°±å·²è¢«å®¢æ·ç«¯æ¥ååå¸ï¼å½è¿æ¥å¯ç¨ï¼å°å¨åå°å®ææ¶æ¯ä¼ éã token = mqttTopic.publish(mqttMessage); token.waitForCompletion(); } catch (MqttException e) { e.printStackTrace(); } @Bean @ServiceActivator(inputChannel = "mqttOutboundChannel") public MessageHandler mqttOutbound() { MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(mqttProperties.getClientOutId(), receiverMqttClientFactoryForSend()); messageHandler.setAsync(false); messageHandler.setDefaultTopic(mqttProperties.getDefaultTopic()); return messageHandler; } public void publish(int qos,boolean retained,String topic,String message){ MqttMessage mqttMessage = new MqttMessage(); mqttMessage.setQos(qos); mqttMessage.setRetained(retained); mqttMessage.setPayload(message.getBytes()); //主é¢çç®çå°ï¼ç¨äºåå¸/订é ä¿¡æ¯ MqttTopic mqttTopic = client.getTopic(topic); //æä¾ä¸ç§æºå¶æ¥è·è¸ªæ¶æ¯çä¼ éè¿åº¦ //ç¨äºå¨ä»¥éé»å¡æ¹å¼ï¼å¨åå°è¿è¡ï¼æ§è¡å叿¯è·è¸ªæ¶æ¯çä¼ éè¿åº¦ MqttDeliveryToken token; try { //尿宿¶æ¯åå¸å°ä¸»é¢ï¼ä½ä¸çå¾ æ¶æ¯ä¼ é宿ï¼è¿åçtokenå¯ç¨äºè·è¸ªæ¶æ¯çä¼ éç¶æ //䏿¦æ¤æ¹æ³å¹²åå°è¿åï¼æ¶æ¯å°±å·²è¢«å®¢æ·ç«¯æ¥ååå¸ï¼å½è¿æ¥å¯ç¨ï¼å°å¨åå°å®ææ¶æ¯ä¼ éã token = mqttTopic.publish(mqttMessage); token.waitForCompletion(); } catch (MqttException e) { e.printStackTrace(); @Bean public MessageChannel mqttOutboundChannel() { return new DirectChannel(); } } } src/main/java/com/fzzy/mqtt/MqttPubController.java
ÎļþÒÑɾ³ý src/main/java/com/fzzy/mqtt/MqttPublishService.java
@@ -26,7 +26,7 @@ //String topic = "/device/hx-weigh-big-01/91511424746940066Y001_91511424746940066Y0010000_002_004_002_001/message/property/report"; String username = mqttProperties.getUsername(); String password = mqttProperties.getPassword(); String clientid = mqttProperties.getClientId(); String clientid = mqttProperties.getClientOutId(); String broker = mqttProperties.getHost(); //String content = " { \"headers\":{ \"productId\":\"hx-weigh-big-01\", \"keepOnlineTimeoutSeconds\":600, \"keepOnline\":true, \"deviceName\":\"å°ç£ ç§°é\" }, \"messageType\":\"REPORT_PROPERTY\", \"deviceId\":\"91511424746940066Y001_91511424746940066Y0010000_002_004_002_001\", \"properties\":{ \"weightInfo\":\"{\\\"exceed\\\":false,\\\"grossWeight\\\":3000.0,\\\"netWeight\\\":3000.0,\\\"static\\\":false,\\\"tareWeight\\\":3000.0,\\\"weightUnit\\\":\\\"KG\\\"}\" }, \"timestamp\":1698336020044 }"; int qos = 0; src/main/resources/application-devGateway.yml
@@ -72,13 +72,13 @@ mqtt: host: tcp://127.0.0.1:1883 username: admin password: pwdmqtt.. client-id: fzzy-customer-igds-api timeout: 10 keep-alive-interval: 20 password: admin123321 client-inId: fzzy-clientInId-igds-api client-outId: fzzy-clientInOutId-igds-api completionTimeout: 3000 keep-alive-interval: 2 max-connect-times: 5 qos: 0 isOpen: false default: topic: testTopic topics: "/+/+/properties/report,/device-message-sender/+/+" default-topic: mqtt/+/test1 topics: /device-message-sender/#