jiazx0107@163.com
2023-11-09 19860e76e91baf3cfce3c45bfa3ca886788c4ec8
调整MQTT
已删除4个文件
已修改9个文件
已添加1个文件
516 ■■■■■ 文件已修改
src/main/java/com/fzzy/gateway/GatewayRunner.java 3 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/fzzy/gateway/hx2023/service/ApiInitService.java 9 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/fzzy/gateway/hx2023/service/OnReceiveMqttService.java 13 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/fzzy/gateway/service/GatewayDeviceService2.java 18 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/fzzy/mqtt/MqttConsumerCallBack.java 53 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/fzzy/mqtt/MqttConsumerConfig.java 147 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/fzzy/mqtt/MqttController.java 30 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/fzzy/mqtt/MqttGatewayService.java 31 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/fzzy/mqtt/MqttProperties.java 15 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/fzzy/mqtt/MqttProviderCallBack.java 39 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/fzzy/mqtt/MqttProviderConfig.java 116 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/fzzy/mqtt/MqttPubController.java 26 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/fzzy/mqtt/MqttPublishService.java 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/resources/application-devGateway.yml 14 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/fzzy/gateway/GatewayRunner.java
@@ -36,6 +36,9 @@
        //获取气象信息
        scheduled.doWeatherExe();
        //更新设备缓存
        apiInitService.updateDeviceCache();
    }
}
src/main/java/com/fzzy/gateway/hx2023/service/ApiInitService.java
@@ -7,6 +7,7 @@
import com.fzzy.gateway.service.GatewayConfService;
import com.fzzy.gateway.service.GatewayDeviceService;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
@@ -21,6 +22,8 @@
    @Resource
    private GatewayConfService confService;
    @Resource
    private GatewayDeviceService deviceService;
    @Resource
    private GatewayRemoteManager gatewayRemoteManager;
@@ -44,7 +47,9 @@
        }
    }
    /**
     * Updates the device cache by delegating to
     * {@code deviceService.flushCache()}.
     */
    public void updateDeviceCache() {
        deviceService.flushCache();
    }
}
src/main/java/com/fzzy/gateway/hx2023/service/OnReceiveMqttService.java
@@ -8,6 +8,7 @@
import com.fzzy.gateway.hx2023.ScConstant;
import com.fzzy.gateway.hx2023.data.CloudSendData;
import com.fzzy.gateway.hx2023.data.SyncReqData;
import com.fzzy.mqtt.MqttGatewayService;
import com.fzzy.mqtt.MqttProviderConfig;
import com.fzzy.mqtt.MqttPublishService;
import lombok.extern.slf4j.Slf4j;
@@ -25,7 +26,7 @@
    @Resource
    private GatewayRemoteManager gatewayRemoteManager;
    @Resource
    private MqttPublishService publishService;
    private MqttGatewayService mqttGatewayService;
    /**
@@ -33,7 +34,7 @@
     *
     * @param message
     */
    public void onReceiveMessage(String message) {
    public void onReceiveMessage(String topic,String message) {
        try {
            CloudSendData cloudSendData = JSONObject.parseObject(message, CloudSendData.class);
@@ -47,7 +48,7 @@
            }
        } catch (Exception e) {
            log.error("--------执行异常-----{}",e);
        }
    }
@@ -74,9 +75,11 @@
            topic = topic.replace("${productId}", device.getProductId()).replace("${deviceId}", device.getDeviceId());
            publishService.publishMsg(topic, resp.getData());
            mqttGatewayService.publishMqttWithTopic(resp.getData(), topic);
            log.info("=======粮情推送==========={}", resp.getData());
            log.info("----------------------------推送MQTT信息---------------------------");
            log.info("-----TOPIC-----{}",topic);
            log.info("-----Message-----{}",resp.getData());
        }
    }
}
src/main/java/com/fzzy/gateway/service/GatewayDeviceService2.java
@@ -11,9 +11,11 @@
import com.fzzy.gateway.hx2023.ScConstant;
import com.fzzy.gateway.hx2023.data.SyncReqData;
import com.fzzy.gateway.service.repository.GatewayDeviceRep;
import com.fzzy.mqtt.MqttGatewayService;
import com.fzzy.mqtt.MqttProviderConfig;
import com.fzzy.mqtt.MqttPublishService;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang.StringUtils;
import org.springframework.beans.BeanUtils;
import org.springframework.data.domain.Sort;
import org.springframework.stereotype.Component;
@@ -31,7 +33,7 @@
    @Resource
    private GatewayRemoteManager gatewayRemoteManager;
    @Resource
    private MqttPublishService publishService;
    private MqttGatewayService publishService;
    public List<GatewayDevice> listAll() {
        Sort sort = new Sort(Sort.Direction.ASC, "deviceId");
@@ -86,6 +88,7 @@
    @Expose
    public String ajaxTestGrain2(GatewayDevice data) {
        SyncReqData reqData = new SyncReqData();
        reqData.setDevice(data);
        reqData.setAutoReplay(true);
@@ -98,6 +101,10 @@
            return "ERROR:当前设备非粮情设备不支持当前操作";
        }
        if(StringUtils.isEmpty(data.getCableRule())){
            return "ERROR:当前设备没有配置布线规则,无法执行";
        }
        BaseResp resp = gatewayRemoteManager.getSyncGrainService(data.getSyncProtocol()).syncGrain2(reqData);
        //自动推送
@@ -105,10 +112,13 @@
            String topic = ScConstant.TOPIC_REPORT;
            topic = topic.replace("${productId}", data.getProductId()).replace("${deviceId}", data.getDeviceId());
            publishService.publishMsg(topic, resp.getData());
        }
            publishService.publishMqttWithTopic(resp.getData(),topic);
        log.info("=======手动测试粮情推送==========={}", resp.getData());
            log.info("----------------------------手动推送MQTT粮情信息---------------------------");
            log.info("-----TOPIC-----{}",topic);
            log.info("-----Message-----{}",resp.getData());
        }
        return "SUCCESS:执行完成";
    }
src/main/java/com/fzzy/mqtt/MqttConsumerCallBack.java
文件已删除
src/main/java/com/fzzy/mqtt/MqttConsumerConfig.java
@@ -1,93 +1,98 @@
package com.fzzy.mqtt;
import com.fzzy.gateway.hx2023.service.OnReceiveMqttService;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.IntegrationComponentScan;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessagingException;
import javax.annotation.PostConstruct;
import java.util.Arrays;
import java.util.List;
@Configuration
@Slf4j
@IntegrationComponentScan
public class MqttConsumerConfig {
    @Autowired
    private MqttProperties mqttProperties;
    private OnReceiveMqttService onReceiveMqttService;
    @Autowired
    private MqttConsumerCallBack mqttConsumerCallBack;
    private MqttProperties mqttProperties;
    /**
     * 客户端对象
     */
    private MqttClient client;
    /**
     * 在bean初始化后连接到服务器
     */
    @PostConstruct
    public void init() {
        connect();
    @Bean
    public MqttConnectOptions getReceiverMqttConnectOptionsForSub() {
        MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
        mqttConnectOptions.setUserName(mqttProperties.getUsername());
        mqttConnectOptions.setPassword(mqttProperties.getPassword().toCharArray());
        List<String> hostList = Arrays.asList(mqttProperties.getHost().trim().split(","));
        String[] serverURIs = new String[hostList.size()];
        hostList.toArray(serverURIs);
        mqttConnectOptions.setServerURIs(serverURIs);
        mqttConnectOptions.setKeepAliveInterval(mqttProperties.getKeepAliveInterval());
        return mqttConnectOptions;
    }
    /**
     * 客户端连接服务端
     */
    public void connect() {
        try {
            //创建MQTT客户端对象
            client = new MqttClient(mqttProperties.getHost(), mqttProperties.getClientId(), new MemoryPersistence());
            //连接设置
            MqttConnectOptions options = new MqttConnectOptions();
            //是否清空session,设置为false表示服务器会保留客户端的连接记录,客户端重连之后能获取到服务器在客户端断开连接期间推送的消息
            //设置为true表示每次连接到服务端都是以新的身份
            options.setCleanSession(true);
            //设置连接用户名
            options.setUserName(mqttProperties.getUsername());
            //设置连接密码
            options.setPassword(mqttProperties.getPassword().toCharArray());
            //设置超时时间,单位为秒
            options.setConnectionTimeout(10);
            //设置心跳时间 单位为秒,表示服务器每隔1.5*20秒的时间向客户端发送心跳判断客户端是否在线
            options.setKeepAliveInterval(20);
            //设置遗嘱消息的话题,若客户端和服务器之间的连接意外断开,服务器将发布客户端的遗嘱信息
            options.setWill("willTopic", (mqttProperties.getClientId() + "与服务器断开连接").getBytes(), 0, false);
            //设置回调
            // client.setCallback(new MqttConsumerCallBack());
            client.setCallback(mqttConsumerCallBack);
            client.connect(options);
            //订阅主题
            //消息等级,和主题数组一一对应,服务端将按照指定等级给订阅了主题的客户端推送消息
            int[] qos = {1, 1};
            //主题
            String[] topics = mqttProperties.getTopics().split(",");
            //订阅主题
            client.subscribe(topics, qos);
        } catch (MqttException e) {
            e.printStackTrace();
        }
    @Bean
    public MqttPahoClientFactory receiverMqttClientFactoryForSub() {
        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        factory.setConnectionOptions(getReceiverMqttConnectOptionsForSub());
        return factory;
    }
    /**
     * 断开连接
     */
    public void disConnect() {
        try {
            client.disconnect();
        } catch (MqttException e) {
            e.printStackTrace();
        }
    /**
     * Receiving channel: returns a new {@link DirectChannel}.
     */
    @Bean
    public MessageChannel mqttInputChannel() {
        return new DirectChannel();
    }
    //配置client,监听的topic
    @Bean
    public MessageProducer inbound() {
    /**
     * 订阅主题
     */
    public void subscribe(String topic, int qos) {
        try {
            client.subscribe(topic, qos);
        } catch (MqttException e) {
            e.printStackTrace();
        }
        // List<String> topicList = Arrays.asList(defaultTopic.trim().split(","));
        String[] topics = mqttProperties.getTopics().split(",");
        //topicList.toArray(topics);
        MqttPahoMessageDrivenChannelAdapter adapter =
                new MqttPahoMessageDrivenChannelAdapter(mqttProperties.getClientInId(), receiverMqttClientFactoryForSub(), topics);
        adapter.setCompletionTimeout(mqttProperties.getCompletionTimeout());
        adapter.setConverter(new DefaultPahoMessageConverter());
        adapter.setQos(mqttProperties.getQos());
        adapter.setOutputChannel(mqttInputChannel());
        return adapter;
    }
}
    //通过通道获取数据
    @Bean
    @ServiceActivator(inputChannel = "mqttInputChannel")
    public MessageHandler handler() {
        return new MessageHandler() {
            @Override
            public void handleMessage(Message<?> message) throws MessagingException {
                String topic = message.getHeaders().get("mqtt_receivedTopic").toString();
                String msg = message.getPayload().toString();
                // è¿™é‡Œå¯ä»¥å¤„理接收的数据
                log.info("----------------------------收到订阅内容---------------------------");
                log.info("-----TOPIC-----{}", topic);
                log.info("-----Message-----{}", msg);
                onReceiveMqttService.onReceiveMessage(topic, msg);
            }
        };
    }
}
src/main/java/com/fzzy/mqtt/MqttController.java
文件已删除
src/main/java/com/fzzy/mqtt/MqttGatewayService.java
对比新文件
@@ -0,0 +1,31 @@
package com.fzzy.mqtt;
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.messaging.handler.annotation.Header;
/**
 * Messaging gateway for publishing to MQTT; its default request channel is
 * {@code mqttOutboundChannel}.
 */
@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
public interface MqttGatewayService {
    /**
     * Sends a message to the MQTT server.
     *
     * @param message message body
     * @param topic   topic, passed as the {@code MqttHeaders.TOPIC} header
     */
    void publishMqttWithTopic(String message, @Header(MqttHeaders.TOPIC) String topic);
    /**
     * Sends a message to the MQTT server with an explicit QoS.
     *
     * @param message message body
     * @param topic   topic, passed as the {@code MqttHeaders.TOPIC} header
     * @param qos     message handling mechanism, passed as the {@code MqttHeaders.QOS} header.<br>
     *                0 means the message is not sent again if the subscriber did not receive it, so it can be lost.<br>
     *                1 means delivery is retried until the message is received, which may give the subscriber duplicates.<br>
     *                2 adds a de-duplication step so that the subscriber receives the message once.
     */
    void publishMqttWithTopic(String message, @Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos);
}
src/main/java/com/fzzy/mqtt/MqttProperties.java
@@ -33,12 +33,17 @@
    /**
     * 客户端Id
     */
    private String clientId;
    private String clientInId;
    /**
     * 客户端Id
     */
    private String clientOutId;
    /**
     * 超时时间
     */
    private int timout = 5000;
    private int completionTimeout = 5000;
    /**
     * 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端
@@ -59,12 +64,6 @@
     * 默认连接主题,以/#结尾表示订阅所有以test开头的主题
     */
    private String defaultTopic;
    /**
     * 设置是否清空session,这里如果设置为false表示服务器会保留客户端的连
     * 接记录,这里设置为true表示每次连接到服务器都以新的身份连接
     */
    private Boolean cleanSession;
    /**
     * 是否断线重连
src/main/java/com/fzzy/mqtt/MqttProviderCallBack.java
文件已删除
src/main/java/com/fzzy/mqtt/MqttProviderConfig.java
@@ -4,100 +4,60 @@
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.IntegrationComponentScan;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import javax.annotation.PostConstruct;
import java.util.Arrays;
import java.util.List;
@Configuration
@Slf4j
@IntegrationComponentScan
public class MqttProviderConfig {
    @Autowired
    private MqttProperties mqttProperties;
    @Autowired
    private MqttProviderCallBack mqttProviderCallBack;
    /**
     * 客户端对象
     */
    private MqttClient client;
    /**
     *
     * 在bean初始化后连接到服务器
     */
    @PostConstruct
    public void init(){
        connect();
    @Bean
    public MqttConnectOptions getReceiverMqttConnectOptionsForSend() {
        MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
        mqttConnectOptions.setUserName(mqttProperties.getUsername());
        mqttConnectOptions.setPassword(mqttProperties.getPassword().toCharArray());
        List<String> hostList = Arrays.asList(mqttProperties.getHost().trim().split(","));
        String[] serverURIs = new String[hostList.size()];
        hostList.toArray(serverURIs);
        mqttConnectOptions.setServerURIs(serverURIs);
        mqttConnectOptions.setKeepAliveInterval(mqttProperties.getKeepAliveInterval());
        return mqttConnectOptions;
    }
    /**
     * 客户端连接服务端
     */
    public void connect(){
        try{
            //创建MQTT客户端对象
            client = new MqttClient(mqttProperties.getHost(),mqttProperties.getClientId(),new MemoryPersistence());
            //连接设置
            MqttConnectOptions options = new MqttConnectOptions();
            //是否清空session,设置false表示服务器会保留客户端的连接记录(订阅主题,qos),客户端重连之后能获取到服务器在客户端断开连接期间推送的消息
            //设置为true表示每次连接服务器都是以新的身份
            options.setCleanSession(true);
            //设置连接用户名
            options.setUserName(mqttProperties.getUsername());
            //设置连接密码
            options.setPassword(mqttProperties.getPassword().toCharArray());
            //设置超时时间,单位为秒
            options.setConnectionTimeout(100);
            //设置心跳时间 单位为秒,表示服务器每隔 1.5*20秒的时间向客户端发送心跳判断客户端是否在线
            options.setKeepAliveInterval(20);
            //设置遗嘱消息的话题,若客户端和服务器之间的连接意外断开,服务器将发布客户端的遗嘱信息
            options.setWill("willTopic",(mqttProperties.getClientId()+ "与服务器断开连接").getBytes(),0,false);
            //设置回调
            client.setCallback(mqttProviderCallBack);
            client.connect(options);
        } catch(MqttException e){
            e.printStackTrace();
        }
    @Bean
    public MqttPahoClientFactory receiverMqttClientFactoryForSend() {
        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        factory.setConnectionOptions(getReceiverMqttConnectOptionsForSend());
        return factory;
    }
    public void publish(String topic,String message){
        MqttMessage mqttMessage = new MqttMessage();
        mqttMessage.setQos(mqttProperties.getQos());
        mqttMessage.setRetained(true);
        mqttMessage.setPayload(message.getBytes());
        //主题的目的地,用于发布/订阅信息
        MqttTopic mqttTopic = client.getTopic(topic);
        //提供一种机制来跟踪消息的传递进度
        //用于在以非阻塞方式(在后台运行)执行发布是跟踪消息的传递进度
        MqttDeliveryToken token;
        try {
            //将指定消息发布到主题,但不等待消息传递完成,返回的token可用于跟踪消息的传递状态
            //一旦此方法干净地返回,消息就已被客户端接受发布,当连接可用,将在后台完成消息传递。
            token = mqttTopic.publish(mqttMessage);
            token.waitForCompletion();
        } catch (MqttException e) {
            e.printStackTrace();
        }
    @Bean
    @ServiceActivator(inputChannel = "mqttOutboundChannel")
    public MessageHandler mqttOutbound() {
        MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(mqttProperties.getClientOutId(), receiverMqttClientFactoryForSend());
        messageHandler.setAsync(false);
        messageHandler.setDefaultTopic(mqttProperties.getDefaultTopic());
        return messageHandler;
    }
    public void publish(int qos,boolean retained,String topic,String message){
        MqttMessage mqttMessage = new MqttMessage();
        mqttMessage.setQos(qos);
        mqttMessage.setRetained(retained);
        mqttMessage.setPayload(message.getBytes());
        //主题的目的地,用于发布/订阅信息
        MqttTopic mqttTopic = client.getTopic(topic);
        //提供一种机制来跟踪消息的传递进度
        //用于在以非阻塞方式(在后台运行)执行发布是跟踪消息的传递进度
        MqttDeliveryToken token;
        try {
            //将指定消息发布到主题,但不等待消息传递完成,返回的token可用于跟踪消息的传递状态
            //一旦此方法干净地返回,消息就已被客户端接受发布,当连接可用,将在后台完成消息传递。
            token = mqttTopic.publish(mqttMessage);
            token.waitForCompletion();
        } catch (MqttException e) {
            e.printStackTrace();
        }
    /**
     * Outbound channel bean: returns a new {@link DirectChannel}.
     */
    @Bean
    public MessageChannel mqttOutboundChannel() {
        return new DirectChannel();
    }
}
src/main/java/com/fzzy/mqtt/MqttPubController.java
文件已删除
src/main/java/com/fzzy/mqtt/MqttPublishService.java
@@ -26,7 +26,7 @@
        //String topic = "/device/hx-weigh-big-01/91511424746940066Y001_91511424746940066Y0010000_002_004_002_001/message/property/report";
        String username = mqttProperties.getUsername();
        String password = mqttProperties.getPassword();
        String clientid = mqttProperties.getClientId();
        String clientid = mqttProperties.getClientOutId();
        String broker = mqttProperties.getHost();
        //String content = " { \"headers\":{ \"productId\":\"hx-weigh-big-01\", \"keepOnlineTimeoutSeconds\":600, \"keepOnline\":true, \"deviceName\":\"地磅称重\" }, \"messageType\":\"REPORT_PROPERTY\", \"deviceId\":\"91511424746940066Y001_91511424746940066Y0010000_002_004_002_001\", \"properties\":{ \"weightInfo\":\"{\\\"exceed\\\":false,\\\"grossWeight\\\":3000.0,\\\"netWeight\\\":3000.0,\\\"static\\\":false,\\\"tareWeight\\\":3000.0,\\\"weightUnit\\\":\\\"KG\\\"}\" }, \"timestamp\":1698336020044     }";
        int qos = 0;
src/main/resources/application-devGateway.yml
@@ -72,13 +72,13 @@
  mqtt:
    host: tcp://127.0.0.1:1883
    username: admin
    password: pwdmqtt..
    client-id: fzzy-customer-igds-api
    timeout: 10
    keep-alive-interval: 20
    password: admin123321
    client-inId: fzzy-clientInId-igds-api
    client-outId: fzzy-clientInOutId-igds-api
    completionTimeout: 3000
    keep-alive-interval: 2
    max-connect-times: 5
    qos: 0
    isOpen: false
    default:
      topic: testTopic
    topics: "/+/+/properties/report,/device-message-sender/+/+"
    default-topic: mqtt/+/test1
    topics: /device-message-sender/#