jiazx0107@163.com
2023-11-09 97c75a868e9fca03598dfa862bdd7ad94fd5fdcb
调整MQTT
已删除2个文件
已修改13个文件
314 ■■■■ 文件已修改
src/main/java/com/fzzy/gateway/GatewayTimerScheduled.java 2 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/fzzy/gateway/hx2023/ScConstant.java 6 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/fzzy/gateway/hx2023/service/OnReceiveMqttService.java 7 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/fzzy/gateway/service/GatewayDeviceService.java 66 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/fzzy/gateway/service/GatewayDeviceService2.java 54 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/fzzy/gateway/view/GatewayDevice.view.xml 19 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/fzzy/mqtt/MqttConsumerConfig.java 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/fzzy/mqtt/MqttController.java 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/fzzy/mqtt/MqttGatewayService.java 16 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/fzzy/mqtt/MqttProperties.java 10 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/fzzy/mqtt/MqttProviderCallBack.java 2 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/fzzy/mqtt/MqttProviderConfig.java 8 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/fzzy/mqtt/MqttPublishService.java 65 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/fzzy/mqtt/PublishSample.java 48 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/resources/application-devGateway.yml 3 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/fzzy/gateway/GatewayTimerScheduled.java
@@ -72,6 +72,8 @@
            String url = DEFAULT_URL;
            url = url.replace("{appId}", DEFAULT_APP_ID).replace("{appsecret}", DEFAULT_APP_SECRET).replace("{cityid}", DEFAULT_CITYID);
            log.debug("------气象URL---{}",url);
            String result = GatewayHttpUtil.doGet(url, null);
            if (null == result) {
src/main/java/com/fzzy/gateway/hx2023/ScConstant.java
@@ -26,4 +26,10 @@
    public static int CODE_200 = 200;
    /**
     * ä¸‹å‘指令回复报文topic
     */
    public static String TOPIC_REPORT = "/${productId}/${deviceId}/properties/report";
}
src/main/java/com/fzzy/gateway/hx2023/service/OnReceiveMqttService.java
@@ -9,6 +9,7 @@
import com.fzzy.gateway.hx2023.data.CloudSendData;
import com.fzzy.gateway.hx2023.data.SyncReqData;
import com.fzzy.mqtt.MqttProviderConfig;
import com.fzzy.mqtt.MqttPublishService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
@@ -24,7 +25,7 @@
    @Resource
    private GatewayRemoteManager gatewayRemoteManager;
    @Resource
    private MqttProviderConfig providerClient;
    private MqttPublishService publishService;
    /**
@@ -69,11 +70,11 @@
        //自动推送
        if (200 == resp.getCode() && syncReqData.isAutoReplay()) {
            String topic = "/${productId}/${deviceId}/properties/report";
            String topic = ScConstant.TOPIC_REPORT;
            topic = topic.replace("${productId}", device.getProductId()).replace("${deviceId}", device.getDeviceId());
            providerClient.publish(topic, resp.getData());
            publishService.publishMsg(topic, resp.getData());
            log.info("=======粮情推送==========={}", resp.getData());
        }
src/main/java/com/fzzy/gateway/service/GatewayDeviceService.java
@@ -13,17 +13,20 @@
import com.fzzy.gateway.GatewayUtils;
import com.fzzy.gateway.api.DeviceReportService;
import com.fzzy.gateway.api.GatewayRemoteManager;
import com.fzzy.gateway.data.BaseResp;
import com.fzzy.gateway.entity.GatewayDevice;
import com.fzzy.gateway.hx2023.ScConstant;
import com.fzzy.gateway.hx2023.data.*;
import com.fzzy.gateway.service.repository.GatewayDeviceRep;
import com.fzzy.mqtt.MqttProviderConfig;
import com.fzzy.mqtt.MqttPublishService;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang.time.DateFormatUtils;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.domain.Sort;
import org.springframework.stereotype.Component;
@@ -47,6 +50,9 @@
    private GatewayRemoteManager gatewayRemoteManager;
    @Resource
    private MqttPublishService mqttPublishService;
    @Resource
    private MqttProviderConfig providerClient;
    /**
     * gatewayDeviceService#listAll
     *
@@ -67,7 +73,7 @@
    public void updateSave(GatewayDevice data) {
        GatewayDevice data2 = new GatewayDevice();
        BeanUtils.copyProperties(data, data2);
        if (null == data2.getDeviceSn()) {
            if (null != data2.getIp()) {
                data.setDeviceSn(data2.getIp());
@@ -77,10 +83,10 @@
        }
        if (null == data2.getId()) {
            data2.setId(ContextUtil.getUUID());
            data2.setId(ContextUtil.getUUID());
            gatewayDeviceRep.save(data2);
        }else{
            gatewayDeviceRep.save(data2);
        } else {
            gatewayDeviceRep.save(data2);
        }
        flushCache();
    }
@@ -112,10 +118,6 @@
    }
    /**
     * gatewayDeviceService#ajaxTestWeight
     * åœ°ç£…推送测试
@@ -128,16 +130,16 @@
        double weigh = (double) parameter.getWeight();
        //double weigh = Double.parseDouble("3500.0");
        List<GatewayDevice> devices = listAll();
        if(devices == null || devices.size()<= 0){
        if (devices == null || devices.size() <= 0) {
            return "没有设备";
        }
        List<GatewayDevice> weights = devices.stream().filter(s ->(GatewayDeviceType.TYPE_01.getCode().equals(s.getType()))).collect(Collectors.toList());
       if(weights == null || weights.size()<= 0){
           return "没有获取到地磅设备";
       }
        List<GatewayDevice> weights = devices.stream().filter(s -> (GatewayDeviceType.TYPE_01.getCode().equals(s.getType()))).collect(Collectors.toList());
        if (weights == null || weights.size() <= 0) {
            return "没有获取到地磅设备";
        }
        String topic = "/device/hx-weigh-big-01/91511424746940066Y001_91511424746940066Y0010000_002_004_002_001/message/property/report";
        for (GatewayDevice device: weights) {
        for (GatewayDevice device : weights) {
            WebSocketPacket packet = new WebSocketPacket();
@@ -163,7 +165,7 @@
            packet.setTimestamp(System.currentTimeMillis());
            topic = "/device/"+header.getProductId()+"/"+device.getDeviceId()+"/message/property/report";
            topic = "/device/" + header.getProductId() + "/" + device.getDeviceId() + "/message/property/report";
            mqttPublishService.publishMsg(topic, JSON.toJSONString(packet));
        }
        return "SUCCESS";
@@ -172,6 +174,7 @@
    /**
     * gatewayDeviceService#ajaxTestLpr
     * åœ°ç£…推送测试
     *
     * @return
     */
    @Expose
@@ -179,16 +182,16 @@
        //String carNumber = parameter.getCarNumber();
        String carNumber = "川A12345";
        List<GatewayDevice> devices = listAll();
        if(devices == null || devices.size()<= 0){
        if (devices == null || devices.size() <= 0) {
            return "没有设备";
        }
        List<GatewayDevice> weights = devices.stream().filter(s ->(GatewayDeviceType.TYPE_02.getCode().equals(s.getType()))).collect(Collectors.toList());
        if(weights == null || weights.size()<= 0){
        List<GatewayDevice> weights = devices.stream().filter(s -> (GatewayDeviceType.TYPE_02.getCode().equals(s.getType()))).collect(Collectors.toList());
        if (weights == null || weights.size() <= 0) {
            return "没有获取到设备";
        }
        String topic = "/device/hx-weigh-big-01/91511424746940066Y001_91511424746940066Y0010000_002_004_002_001/message/property/report";
        for (GatewayDevice device: weights) {
        for (GatewayDevice device : weights) {
            WebSocketPacket packet = new WebSocketPacket();
@@ -212,7 +215,7 @@
            packet.setTimestamp(System.currentTimeMillis());
            topic = "/device/"+header.getProductId()+"/"+device.getDeviceId()+"/message/property/report";
            topic = "/device/" + header.getProductId() + "/" + device.getDeviceId() + "/message/property/report";
            mqttPublishService.publishMsg(topic, JSON.toJSONString(packet));
        }
        return "SUCCESS";
@@ -247,6 +250,9 @@
        return this.pushByV40(list, start, end);
    }
    private String pushByV40(List<GatewayDevice> list, Date start, Date end) {
@@ -352,27 +358,27 @@
            //判断最大
            if (curTemp.equals(result.getMaxTemperature())) {
                result.setMaxX(x+"");
                result.setMaxY(y+"");
                result.setMaxZ(position+"");
                result.setMaxX(x + "");
                result.setMaxY(y + "");
                result.setMaxZ(position + "");
            }
            //判断最小
            if (curTemp.equals(result.getMinTemperature())) {
                result.setMinX(x+"");
                result.setMinY(y+"");
                result.setMinZ(position+"");
                result.setMinX(x + "");
                result.setMinY(y + "");
                result.setMinZ(position + "");
            }
            temperature.add(new KafkaGrainDataDetail1(cableNum + "", z + "", curTemp, position + "", x+"", y+""));
            temperature.add(new KafkaGrainDataDetail1(cableNum + "", z + "", curTemp, position + "", x + "", y + ""));
        }
        //粮温信息
        JSONObject trhInfo = new JSONObject();
       // TRHInfo trhInfo = new TRHInfo();
        trhInfo.put("temperature",temperature);
        // TRHInfo trhInfo = new TRHInfo();
        trhInfo.put("temperature", temperature);
        //仓温度信息
@@ -385,7 +391,7 @@
        List<KafkaGrainTH> temperatureAndhumidity = new ArrayList<>();
        temperatureAndhumidity.add(grainTH);
        trhInfo.put("temperatureAndhumidity",temperatureAndhumidity);
        trhInfo.put("temperatureAndhumidity", temperatureAndhumidity);
        //trhInfo.put("temperatureAndhumidity",grainTH);
        JSONObject params = new JSONObject();
src/main/java/com/fzzy/gateway/service/GatewayDeviceService2.java
@@ -1,10 +1,19 @@
package com.fzzy.gateway.service;
import com.bstek.dorado.annotation.DataResolver;
import com.bstek.dorado.annotation.Expose;
import com.fzzy.api.data.GatewayDeviceType;
import com.fzzy.api.utils.ContextUtil;
import com.fzzy.gateway.GatewayUtils;
import com.fzzy.gateway.api.GatewayRemoteManager;
import com.fzzy.gateway.data.BaseResp;
import com.fzzy.gateway.entity.GatewayDevice;
import com.fzzy.gateway.hx2023.ScConstant;
import com.fzzy.gateway.hx2023.data.SyncReqData;
import com.fzzy.gateway.service.repository.GatewayDeviceRep;
import com.fzzy.mqtt.MqttProviderConfig;
import com.fzzy.mqtt.MqttPublishService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeanUtils;
import org.springframework.data.domain.Sort;
import org.springframework.stereotype.Component;
@@ -12,11 +21,17 @@
import javax.annotation.Resource;
import java.util.List;
@Slf4j
@Component
public class GatewayDeviceService2 {
    @Resource
    private GatewayDeviceRep gatewayDeviceRep;
    @Resource
    private GatewayRemoteManager gatewayRemoteManager;
    @Resource
    private MqttPublishService publishService;
    public List<GatewayDevice> listAll() {
        Sort sort = new Sort(Sort.Direction.ASC, "deviceId");
@@ -59,4 +74,43 @@
        }
    }
    /**
     * æµ‹è¯•MQTT粮情检测
     * gatewayDeviceService#ajaxTestGrain2
     * ç²®æƒ…推送测试
     *
     * @param data
     * @return
     */
    @Expose
    public String ajaxTestGrain2(GatewayDevice data) {
        SyncReqData reqData = new SyncReqData();
        reqData.setDevice(data);
        reqData.setAutoReplay(true);
        reqData.setMessageType(ScConstant.MESSAGE_TYPE_INVOKE_FUNCTION);
        reqData.setMessageId(ScConstant.getMessageId());
        reqData.setFunctionId(ScConstant.FUNCTION_getTAndRHInfo);
        if (!GatewayDeviceType.TYPE_07.getCode().equals(data.getType())) {
            return "ERROR:当前设备非粮情设备不支持当前操作";
        }
        BaseResp resp = gatewayRemoteManager.getSyncGrainService(data.getSyncProtocol()).syncGrain2(reqData);
        //自动推送
        if (200 == resp.getCode() && reqData.isAutoReplay()) {
            String topic = ScConstant.TOPIC_REPORT;
            topic = topic.replace("${productId}", data.getProductId()).replace("${deviceId}", data.getDeviceId());
            publishService.publishMsg(topic, resp.getData());
        }
        log.info("=======手动测试粮情推送==========={}", resp.getData());
        return "SUCCESS:执行完成";
    }
}
src/main/java/com/fzzy/gateway/view/GatewayDevice.view.xml
@@ -115,11 +115,11 @@
        <Property name="label">设备类型KEY</Property>
      </PropertyDef>
      <PropertyDef name="cableRule">
        <Property/>
        <Property></Property>
        <Property name="label">布线规则</Property>
      </PropertyDef>
      <PropertyDef name="cableCir">
        <Property/>
        <Property></Property>
        <Property name="label">筒仓层规则</Property>
      </PropertyDef>
    </DataType>
@@ -179,6 +179,17 @@
        <Property name="caption">删除</Property>
        <Property name="iconClass">fa fa-minus</Property>
        <Property name="width">90</Property>
        <Property name="exClassName">toolbar-button-warn</Property>
      </ToolBarButton>
      <Separator/>
      <ToolBarButton>
        <ClientEvent name="onClick">var cur = view.get(&quot;#dgMain&quot;).getCurrentItem();&#xD;
view.get(&quot;#ajaxTestGrain&quot;).set(&quot;parameter&quot;,cur).execute(function(result){&#xD;
    $alert(result);&#xD;
});&#xD;
</ClientEvent>
        <Property name="caption">测试粮情</Property>
        <Property name="iconClass">fa fa-minus</Property>
        <Property name="exClassName">toolbar-button-warn</Property>
      </ToolBarButton>
    </ToolBar>
@@ -367,5 +378,9 @@
      <Property name="service">gatewayDeviceService#delData</Property>
      <Property name="confirmMessage">确定要删除么?</Property>
    </AjaxAction>
    <AjaxAction id="ajaxTestGrain">
      <Property name="service">gatewayDeviceService2#ajaxTestGrain2</Property>
      <Property name="confirmMessage">确定要手动执行粮情么?</Property>
    </AjaxAction>
  </View>
</ViewConfig>
src/main/java/com/fzzy/mqtt/MqttConsumerConfig.java
@@ -36,7 +36,7 @@
    public void connect() {
        try {
            //创建MQTT客户端对象
            client = new MqttClient(mqttProperties.getHost(), mqttProperties.getClientInId(), new MemoryPersistence());
            client = new MqttClient(mqttProperties.getHost(), mqttProperties.getClientId(), new MemoryPersistence());
            //连接设置
            MqttConnectOptions options = new MqttConnectOptions();
            //是否清空session,设置为false表示服务器会保留客户端的连接记录,客户端重连之后能获取到服务器在客户端断开连接期间推送的消息
@@ -51,7 +51,7 @@
            //设置心跳时间 å•位为秒,表示服务器每隔1.5*20秒的时间向客户端发送心跳判断客户端是否在线
            options.setKeepAliveInterval(20);
            //设置遗嘱消息的话题,若客户端和服务器之间的连接意外断开,服务器将发布客户端的遗嘱信息
            options.setWill("willTopic", (mqttProperties.getClientInId() + "与服务器断开连接").getBytes(), 0, false);
            options.setWill("willTopic", (mqttProperties.getClientId() + "与服务器断开连接").getBytes(), 0, false);
            //设置回调
            // client.setCallback(new MqttConsumerCallBack());
            client.setCallback(mqttConsumerCallBack);
src/main/java/com/fzzy/mqtt/MqttController.java
@@ -17,14 +17,14 @@
    public @ResponseBody
    String connect() {
        client.connect();
        return mqttProperties.getClientOutId() + "连接到服务器";
        return mqttProperties.getClientId() + "连接到服务器";
    }
    @RequestMapping("/disConnect")
    @ResponseBody
    public String disConnect() {
        client.disConnect();
        return mqttProperties.getClientOutId() + "与服务器断开连接";
        return mqttProperties.getClientId() + "与服务器断开连接";
    }
}
src/main/java/com/fzzy/mqtt/MqttGatewayService.java
ÎļþÒÑɾ³ý
src/main/java/com/fzzy/mqtt/MqttProperties.java
@@ -31,15 +31,9 @@
    /**
     * å®¢æˆ·ç«¯Id-发布者Id
     * å®¢æˆ·ç«¯Id
     */
    private String clientOutId;
    /**
     * å®¢æˆ·ç«¯Id-被订阅者Id
     */
    private String clientInId;
    private String clientId;
    /**
     * è¶…æ—¶æ—¶é—´
src/main/java/com/fzzy/mqtt/MqttProviderCallBack.java
@@ -4,8 +4,10 @@
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.springframework.stereotype.Component;
@Slf4j
@Component
public class MqttProviderCallBack implements MqttCallback {
    /**
src/main/java/com/fzzy/mqtt/MqttProviderConfig.java
@@ -14,6 +14,8 @@
    @Autowired
    private MqttProperties mqttProperties;
    @Autowired
    private MqttProviderCallBack mqttProviderCallBack;
    /**
     * å®¢æˆ·ç«¯å¯¹è±¡
     */
@@ -34,7 +36,7 @@
    public void connect(){
        try{
            //创建MQTT客户端对象
            client = new MqttClient(mqttProperties.getHost(),mqttProperties.getClientOutId(),new MemoryPersistence());
            client = new MqttClient(mqttProperties.getHost(),mqttProperties.getClientId(),new MemoryPersistence());
            //连接设置
            MqttConnectOptions options = new MqttConnectOptions();
            //是否清空session,设置false表示服务器会保留客户端的连接记录(订阅主题,qos),客户端重连之后能获取到服务器在客户端断开连接期间推送的消息
@@ -49,9 +51,9 @@
            //设置心跳时间 å•位为秒,表示服务器每隔 1.5*20秒的时间向客户端发送心跳判断客户端是否在线
            options.setKeepAliveInterval(20);
            //设置遗嘱消息的话题,若客户端和服务器之间的连接意外断开,服务器将发布客户端的遗嘱信息
            options.setWill("willTopic",(mqttProperties.getClientOutId() + "与服务器断开连接").getBytes(),0,false);
            options.setWill("willTopic",(mqttProperties.getClientId()+ "与服务器断开连接").getBytes(),0,false);
            //设置回调
            client.setCallback(new MqttProviderCallBack());
            client.setCallback(mqttProviderCallBack);
            client.connect(options);
        } catch(MqttException e){
            e.printStackTrace();
src/main/java/com/fzzy/mqtt/MqttPublishService.java
@@ -1,24 +1,33 @@
package com.fzzy.mqtt;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
@Slf4j
@Service
public class MqttPublishService {
    private static MqttClient client ;
    @Resource
    private MqttProperties mqttProperties;
    private static MqttClient client;
    public void init() throws MqttException {
        //String topic = "/device/hx-weigh-big-01/91511424746940066Y001_91511424746940066Y0010000_002_004_002_001/message/property/report";
        String username = "admin";
        String password = "admin123321";
        String clientid = "FZZY-gateway";
        String broker = "tcp://127.0.0.1:1883";
        String username = mqttProperties.getUsername();
        String password = mqttProperties.getPassword();
        String clientid = mqttProperties.getClientId();
        String broker = mqttProperties.getHost();
        //String content = " { \"headers\":{ \"productId\":\"hx-weigh-big-01\", \"keepOnlineTimeoutSeconds\":600, \"keepOnline\":true, \"deviceName\":\"地磅称重\" }, \"messageType\":\"REPORT_PROPERTY\", \"deviceId\":\"91511424746940066Y001_91511424746940066Y0010000_002_004_002_001\", \"properties\":{ \"weightInfo\":\"{\\\"exceed\\\":false,\\\"grossWeight\\\":3000.0,\\\"netWeight\\\":3000.0,\\\"static\\\":false,\\\"tareWeight\\\":3000.0,\\\"weightUnit\\\":\\\"KG\\\"}\" }, \"timestamp\":1698336020044     }";
        int qos = 0;
        try {
@@ -38,27 +47,31 @@
        }
    }
   public  void publishMsg(String topic,String content) {
      // String topic = "/device/hx-weigh-big-01/91511424746940066Y001_91511424746940066Y0010000_002_004_002_001/message/property/report";
       //String content = " { \"headers\":{ \"productId\":\"hx-weigh-big-01\", \"keepOnlineTimeoutSeconds\":600, \"keepOnline\":true, \"deviceName\":\"地磅称重\" }, \"messageType\":\"REPORT_PROPERTY\", \"deviceId\":\"91511424746940066Y001_91511424746940066Y0010000_002_004_002_001\", \"properties\":{ \"weightInfo\":\"{\\\"exceed\\\":false,\\\"grossWeight\\\":3000.0,\\\"netWeight\\\":3000.0,\\\"static\\\":false,\\\"tareWeight\\\":3000.0,\\\"weightUnit\\\":\\\"KG\\\"}\" }, \"timestamp\":1698336020044     }";
       int qos = 0;
    public void publishMsg(String topic, String content) {
       try {
           // åˆ›å»ºæ¶ˆæ¯å¹¶è®¾ç½® QoS
           MqttMessage message = new MqttMessage(content.getBytes());
           message.setQos(qos);
           // å‘布消息
           client.publish(topic, message);
           System.out.println("Message published");
           System.out.println("topic: " + topic);
           System.out.println("message content: " + content);
           // å…³é—­è¿žæŽ¥
           //client.disconnect();
           // å…³é—­å®¢æˆ·ç«¯
           //client.close();
      } catch (MqttException e) {
           throw new RuntimeException(e);
      }
  }
        // String topic = "/device/hx-weigh-big-01/91511424746940066Y001_91511424746940066Y0010000_002_004_002_001/message/property/report";
        //String content = " { \"headers\":{ \"productId\":\"hx-weigh-big-01\", \"keepOnlineTimeoutSeconds\":600, \"keepOnline\":true, \"deviceName\":\"地磅称重\" }, \"messageType\":\"REPORT_PROPERTY\", \"deviceId\":\"91511424746940066Y001_91511424746940066Y0010000_002_004_002_001\", \"properties\":{ \"weightInfo\":\"{\\\"exceed\\\":false,\\\"grossWeight\\\":3000.0,\\\"netWeight\\\":3000.0,\\\"static\\\":false,\\\"tareWeight\\\":3000.0,\\\"weightUnit\\\":\\\"KG\\\"}\" }, \"timestamp\":1698336020044     }";
        int qos = mqttProperties.getQos();
        try {
            // åˆ›å»ºæ¶ˆæ¯å¹¶è®¾ç½® QoS
            MqttMessage message = new MqttMessage(content.getBytes());
            message.setQos(qos);
            // å‘布消息
            client.publish(topic, message);
            log.info("------------Message published-------------");
            log.info("topic: " + topic);
            log.info("message content: " + content);
            // å…³é—­è¿žæŽ¥
            //client.disconnect();
            // å…³é—­å®¢æˆ·ç«¯
            //client.close();
        } catch (MqttException e) {
            throw new RuntimeException(e);
        }
    }
}
src/main/java/com/fzzy/mqtt/PublishSample.java
ÎļþÒÑɾ³ý
src/main/resources/application-devGateway.yml
@@ -73,8 +73,7 @@
    host: tcp://127.0.0.1:1883
    username: admin
    password: pwdmqtt..
    client-outId: fzzy-customer-id
    client-inId: fzzy-6e3d92ff71b911eea5e50250f2000002
    client-id: fzzy-customer-igds-api
    timeout: 10
    keep-alive-interval: 20
    max-connect-times: 5