jiazx0107@163.com
2023-11-08 0a8ef890592a9c0389c42daeebb9a3f0974c37e6
调整MQTT配置
已删除5个文件
已修改13个文件
已添加8个文件
1170 ■■■■ 文件已修改
src/main/java/com/fzzy/gateway/entity/GatewayDevice.java 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/fzzy/gateway/hx2023/controller/GatewayController.java 42 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/fzzy/gateway/hx2023/websocket/SocketService.java 102 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/fzzy/gateway/hx2023/websocket/WebSockDeviceMessageReport.java 50 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/fzzy/gateway/hx2023/websocket/WebSockDeviceMessageSender.java 50 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/fzzy/gateway/hx2023/websocket/WebSocketDeviceLed.java 117 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/fzzy/gateway/hx2023/websocket/WebSocketMqtt.java 135 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/fzzy/gateway/service/GatewayDeviceService.java 36 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/fzzy/gateway/service/GatewayDeviceService2.java 62 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/fzzy/gateway/util/GatewayHttpUtil.java 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/fzzy/gateway/util/GatewayRSAUtils.java 3 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/fzzy/gateway/view/GatewayConf.view.xml 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/fzzy/gateway/view/GatewayDevice.view.xml 30 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/fzzy/mqtt/MqttConsumerCallBack.java 36 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/fzzy/mqtt/MqttConsumerConfig.java 89 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/fzzy/mqtt/MqttController.java 30 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/fzzy/mqtt/MqttGatewayService.java 1 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/fzzy/mqtt/MqttInboundConfiguration.java 91 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/fzzy/mqtt/MqttOutboundConfiguration.java 58 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/fzzy/mqtt/MqttProperties.java 23 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/fzzy/mqtt/MqttProviderCallBack.java 37 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/fzzy/mqtt/MqttProviderConfig.java 101 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/fzzy/mqtt/MqttPubController.java 33 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/fzzy/mqtt/MqttPublishService.java 1 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/resources/application-dev.yml 12 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/resources/application-devGateway.yml 21 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/fzzy/gateway/entity/GatewayDevice.java
@@ -37,6 +37,10 @@
    @PropertyDef(label = "名称")
    private String deviceName;
    @Column(name = "PRODUCT_ID_", length = 50)
    @PropertyDef(label = "设备类型KEY")
    private String productId;
    @Column(name = "TYPE_", length = 10)
    @PropertyDef(label = "设备类型")
    private String type;
src/main/java/com/fzzy/gateway/hx2023/controller/GatewayController.java
@@ -4,12 +4,15 @@
import com.fzzy.api.utils.ContextUtil;
import com.fzzy.api.utils.RedisConst;
import com.fzzy.api.utils.RedisUtil;
import com.fzzy.gateway.entity.GatewayConf;
import com.fzzy.gateway.hx2023.data.GatewayAuthData;
import com.fzzy.gateway.service.GatewayConfService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.*;
import javax.annotation.Resource;
import java.util.List;
/**
 *
@@ -21,6 +24,9 @@
    @Resource
    private RedisUtil redisUtil;
    @Resource
    private GatewayConfService confService;
    /**
     * 鉴权接口
@@ -39,28 +45,44 @@
    public @ResponseBody
    JSONObject authorize(@RequestBody GatewayAuthData data) {
        log.debug("============鉴权==========={}--{}", data.getUsername(), data.getPassword());
        List<GatewayConf> list = confService.getCacheConfList();
        JSONObject json = new JSONObject();
        json.put("timestamp", System.currentTimeMillis());
        if (null == list || list.isEmpty()) {
            json.put("code", 500);
            json.put("message", "未获取网关信息");
            return json;
        }
        String gatewayId = null;
        for (GatewayConf conf : list) {
            if (data.getUsername().equals(conf.getGatewayUsername()) && data.getPassword().equals(conf.getGatewayPassword())) {
                gatewayId = conf.getGatewayId();
                break;
            }
        }
        if (null == gatewayId) {
            json.put("code", 500);
            json.put("message", "未匹配到用户名和密码");
            return json;
        }
        //TODO 验证用户名和密码
        String token = "fzzy-" + gatewayId;
        String token = ContextUtil.getUUID();
        log.debug("============鉴权==========={}--{}--{}", data.getUsername(), data.getPassword(), token);
        this.updateGatewayToken(token, data.getUsername());
        JSONObject json = new JSONObject();
        JSONObject result = new JSONObject();
        result.put("token", token);
        json.put("result", result);
        json.put("message", "成功");
        json.put("status", 0);
        json.put("code", 200);
        json.put("timestamp", System.currentTimeMillis());
        return json;
    }
src/main/java/com/fzzy/gateway/hx2023/websocket/SocketService.java
文件已删除
src/main/java/com/fzzy/gateway/hx2023/websocket/WebSockDeviceMessageReport.java
对比新文件
@@ -0,0 +1,50 @@
package com.fzzy.gateway.hx2023.websocket;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import javax.websocket.*;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
/**
 * ä¸‹å‘调用粮温检测接口指令 topic
 */
@Slf4j
@Component
@ServerEndpoint(value = "/{productId}/{deviceId}/properties/report")
public class WebSockDeviceMessageReport {
    @OnOpen
    public void onOpen(Session session,
                       @PathParam("productId") String productId,
                       @PathParam("deviceId") String deviceId
    ) throws Exception {
        log.info("--------下发调用粮温检测接口指令 topic------");
    }
    @OnClose
    public void onClose() {
        log.info("WebSocket连接关闭={}");
    }
    /**
     * æ”¶åˆ°å‰ç«¯å‘送的信息
     *
     * @param message
     * @param session
     */
    @OnMessage
    public void onMessage(String message, Session session) {
        log.info("来自前端的信息:\n" + message);
    }
    @OnError
    public void onError(Session session, Throwable error) {
        log.error("发生错误");
    }
}
src/main/java/com/fzzy/gateway/hx2023/websocket/WebSockDeviceMessageSender.java
对比新文件
@@ -0,0 +1,50 @@
package com.fzzy.gateway.hx2023.websocket;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import javax.websocket.*;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
/**
 * ä¸‹å‘调用粮温检测接口指令 topic
 */
@Slf4j
@Component
@ServerEndpoint(value = "/device-message-sender/{productId}/{deviceId}")
public class WebSockDeviceMessageSender {
    @OnOpen
    public void onOpen(Session session,
                       @PathParam("productId") String productId,
                       @PathParam("deviceId") String deviceId
    ) throws Exception {
        log.info("--------下发调用粮温检测接口指令 topic------");
    }
    @OnClose
    public void onClose() {
        log.info("WebSocket连接关闭={}");
    }
    /**
     * æ”¶åˆ°å‰ç«¯å‘送的信息
     *
     * @param message
     * @param session
     */
    @OnMessage
    public void onMessage(String message, Session session) {
        log.info("来自前端的信息:\n" + message);
    }
    @OnError
    public void onError(Session session, Throwable error) {
        log.error("发生错误");
    }
}
src/main/java/com/fzzy/gateway/hx2023/websocket/WebSocketDeviceLed.java
文件已删除
src/main/java/com/fzzy/gateway/hx2023/websocket/WebSocketMqtt.java
文件已删除
src/main/java/com/fzzy/gateway/service/GatewayDeviceService.java
@@ -16,7 +16,6 @@
import com.fzzy.gateway.entity.GatewayDevice;
import com.fzzy.gateway.hx2023.ScConstant;
import com.fzzy.gateway.hx2023.data.*;
import com.fzzy.gateway.hx2023.websocket.WebSocketDeviceReport;
import com.fzzy.gateway.service.repository.GatewayDeviceRep;
import com.fzzy.mqtt.MqttPublishService;
@@ -62,28 +61,27 @@
    /**
     * gatewayDeviceService#updateSave
     *
     * @param entity
     * @param data
     */
    @DataResolver
    public void updateSave(GatewayDevice entity) {
        GatewayDevice data = new GatewayDevice();
        BeanUtils.copyProperties(entity, data);
        if (null == data.getId()) {
            data.setId(ContextUtil.getUUID());
        }
        if (null == data.getDeviceSn()) {
            if (null != entity.getIp()) {
                data.setDeviceSn(entity.getIp());
    public void updateSave(GatewayDevice data) {
        GatewayDevice data2 = new GatewayDevice();
        BeanUtils.copyProperties(data, data2);
        if (null == data2.getDeviceSn()) {
            if (null != data2.getIp()) {
                data.setDeviceSn(data2.getIp());
            } else {
                data.setDeviceSn(data.getDeviceId());
                data.setDeviceSn(data2.getDeviceId());
            }
        }
        gatewayDeviceRep.save(data);
        if (null == data2.getId()) {
            data2.setId(ContextUtil.getUUID());
            gatewayDeviceRep.save(data2);
        }else{
            gatewayDeviceRep.save(data2);
        }
        flushCache();
    }
@@ -105,10 +103,6 @@
    }
    /**
     * gatewayDeviceService#flushCache
     */
    @Expose
    public void flushCache() {
        List<GatewayDevice> list = listAll();
        if (null == list || list.isEmpty()) return;
src/main/java/com/fzzy/gateway/service/GatewayDeviceService2.java
对比新文件
@@ -0,0 +1,62 @@
package com.fzzy.gateway.service;
import com.bstek.dorado.annotation.DataResolver;
import com.fzzy.api.utils.ContextUtil;
import com.fzzy.gateway.GatewayUtils;
import com.fzzy.gateway.entity.GatewayDevice;
import com.fzzy.gateway.service.repository.GatewayDeviceRep;
import org.springframework.beans.BeanUtils;
import org.springframework.data.domain.Sort;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.List;
@Component
public class GatewayDeviceService2 {
    @Resource
    private GatewayDeviceRep gatewayDeviceRep;
    public List<GatewayDevice> listAll() {
        Sort sort = new Sort(Sort.Direction.ASC, "deviceId");
        return gatewayDeviceRep.findAll(sort);
    }
    /**
     * gatewayDeviceService2#updateSave
     *
     * @param data
     */
    @DataResolver
    public void updateSave(GatewayDevice data) {
        GatewayDevice data2 = new GatewayDevice();
        BeanUtils.copyProperties(data, data2);
        if (null == data2.getDeviceSn()) {
            if (null != data2.getIp()) {
                data.setDeviceSn(data2.getIp());
            } else {
                data.setDeviceSn(data2.getDeviceId());
            }
        }
        if (null == data2.getId()) {
            data2.setId(ContextUtil.getUUID());
            gatewayDeviceRep.save(data2);
        } else {
            gatewayDeviceRep.save(data2);
        }
        flushCache();
    }
    public void flushCache() {
        List<GatewayDevice> list = listAll();
        if (null == list || list.isEmpty()) return;
        for (GatewayDevice device : list) {
            GatewayUtils.add2Cache(device);
        }
    }
}
src/main/java/com/fzzy/gateway/util/GatewayHttpUtil.java
@@ -9,12 +9,8 @@
import org.apache.http.impl.client.HttpClients;
import org.apache.http.util.EntityUtils;
import java.io.*;
import java.net.HttpURLConnection;
import java.net.URL;
import java.net.URLEncoder;
import java.util.Map;
import java.util.Set;
/**
 * 网关专用HTTP请求工具类
src/main/java/com/fzzy/gateway/util/GatewayRSAUtils.java
@@ -1,8 +1,6 @@
package com.fzzy.gateway.util;
import lombok.extern.slf4j.Slf4j;
import javax.crypto.Cipher;
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
@@ -15,7 +13,6 @@
import java.util.HashMap;
import java.util.Map;
@Slf4j
public class GatewayRSAUtils {
    /**
     * RSA最大加密明文大小 2048/8-11
src/main/java/com/fzzy/gateway/view/GatewayConf.view.xml
@@ -392,7 +392,7 @@
      <Buttons>
        <Button>
          <ClientEvent name="onClick">var data = view.get(&quot;#dsQuery.data&quot;);&#xD;
view.get(&quot;#ajaxTestWeight&quot;).set(&quot;parameter&quot;,data.get("weight")).execute(function(result){&#xD;
view.get(&quot;#ajaxTestWeight&quot;).set(&quot;parameter&quot;,data.get(&quot;weight&quot;)).execute(function(result){&#xD;
    self.get(&quot;parent&quot;).hide();&#xD;
    $alert(result);&#xD;
});</ClientEvent>
src/main/java/com/fzzy/gateway/view/GatewayDevice.view.xml
@@ -107,8 +107,12 @@
        <Property name="label">设备密码</Property>
      </PropertyDef>
      <PropertyDef name="depotIdSys">
        <Property></Property>
        <Property name="label">自定义仓库编码</Property>
      </PropertyDef>
      <PropertyDef name="productId">
        <Property/>
        <Property name="label">库区系统仓库编码</Property>
        <Property name="label">设备类型KEY</Property>
      </PropertyDef>
    </DataType>
  </Model>
@@ -182,6 +186,9 @@
        <Property name="property">type</Property>
        <Property name="align">center</Property>
      </DataColumn>
      <DataColumn name="productId">
        <Property name="property">productId</Property>
      </DataColumn>
      <DataColumn name="deviceId">
        <Property name="property">deviceId</Property>
        <Property name="align">center</Property>
@@ -203,8 +210,7 @@
      <Property name="closeable">false</Property>
      <Buttons>
        <Button>
          <ClientEvent name="onClick">var cur = view.get(&quot;#dgMain&quot;).getCurrentItem();&#xD;
view.get(&quot;#updateSave&quot;).execute(function(){&#xD;
          <ClientEvent name="onClick">view.get(&quot;#updateSave&quot;).execute(function(){&#xD;
    self.get(&quot;parent&quot;).hide();&#xD;
});</ClientEvent>
          <Property name="caption">保存修改</Property>
@@ -250,6 +256,11 @@
              <Editor/>
            </AutoFormElement>
            <AutoFormElement>
              <Property name="name">productId</Property>
              <Property name="property">productId</Property>
              <Editor/>
            </AutoFormElement>
            <AutoFormElement>
              <Property name="name">orgId</Property>
              <Property name="property">orgId</Property>
              <Editor/>
@@ -257,6 +268,11 @@
            <AutoFormElement>
              <Property name="name">depotId</Property>
              <Property name="property">depotId</Property>
              <Editor/>
            </AutoFormElement>
            <AutoFormElement>
              <Property name="name">depotIdSys</Property>
              <Property name="property">depotIdSys</Property>
              <Editor/>
            </AutoFormElement>
            <AutoFormElement>
@@ -310,11 +326,6 @@
              <Property name="property">httpUrl</Property>
              <Editor/>
            </AutoFormElement>
            <AutoFormElement>
              <Property name="name">depotIdSys</Property>
              <Property name="property">depotIdSys</Property>
              <Editor/>
            </AutoFormElement>
            <AutoFormElement layoutConstraint="colSpan:3">
              <Property name="name">remark</Property>
              <Property name="property">remark</Property>
@@ -327,10 +338,11 @@
      <Tools/>
    </Dialog>
    <UpdateAction id="updateSave">
      <Property name="dataResolver">gatewayDeviceService#updateSave</Property>
      <Property name="dataResolver">gatewayDeviceService2#updateSave</Property>
      <UpdateItem>
        <Property name="dataPath">[#current]</Property>
        <Property name="dataSet">dsMain</Property>
        <Property name="alias">data</Property>
      </UpdateItem>
    </UpdateAction>
    <AjaxAction id="ajaxDel">
src/main/java/com/fzzy/mqtt/MqttConsumerCallBack.java
对比新文件
@@ -0,0 +1,36 @@
package com.fzzy.mqtt;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;
public class MqttConsumerCallBack implements MqttCallback {
    /**
     * å®¢æˆ·ç«¯æ–­å¼€è¿žæŽ¥çš„回调
     */
    @Override
    public void connectionLost(Throwable throwable) {
        System.out.println("与服务器断开连接,可重连");
    }
    /**
     * æ¶ˆæ¯åˆ°è¾¾çš„回调
     */
    @Override
    public void messageArrived(String topic, MqttMessage message) throws Exception {
        System.out.println(String.format("接收消息主题 : %s", topic));
        System.out.println(String.format("接收消息Qos : %d", message.getQos()));
        System.out.println(String.format("接收消息内容 : %s", new String(message.getPayload())));
        System.out.println(String.format("接收消息retained : %b", message.isRetained()));
    }
    /**
     * æ¶ˆæ¯å‘布成功的回调
     */
    @Override
    public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
        System.out.println(String.format("接收消息成功"));
    }
}
src/main/java/com/fzzy/mqtt/MqttConsumerConfig.java
对比新文件
@@ -0,0 +1,89 @@
package com.fzzy.mqtt;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import javax.annotation.PostConstruct;
@Configuration
@Slf4j
public class MqttConsumerConfig {
    @Autowired
    private MqttProperties mqttProperties;
    /**
     * å®¢æˆ·ç«¯å¯¹è±¡
     */
    private MqttClient client;
    /**
     * åœ¨bean初始化后连接到服务器
     */
    @PostConstruct
    public void init() {
        connect();
    }
    /**
     * å®¢æˆ·ç«¯è¿žæŽ¥æœåŠ¡ç«¯
     */
    public void connect() {
        try {
            //创建MQTT客户端对象
            client = new MqttClient(mqttProperties.getHost(), mqttProperties.getClientInId(), new MemoryPersistence());
            //连接设置
            MqttConnectOptions options = new MqttConnectOptions();
            //是否清空session,设置为false表示服务器会保留客户端的连接记录,客户端重连之后能获取到服务器在客户端断开连接期间推送的消息
            //设置为true表示每次连接到服务端都是以新的身份
            options.setCleanSession(true);
            //设置连接用户名
            options.setUserName(mqttProperties.getUsername());
            //设置连接密码
            options.setPassword(mqttProperties.getPassword().toCharArray());
            //设置超时时间,单位为秒
            options.setConnectionTimeout(10);
            //设置心跳时间 å•位为秒,表示服务器每隔1.5*20秒的时间向客户端发送心跳判断客户端是否在线
            options.setKeepAliveInterval(20);
            //设置遗嘱消息的话题,若客户端和服务器之间的连接意外断开,服务器将发布客户端的遗嘱信息
            options.setWill("willTopic", (mqttProperties.getClientInId() + "与服务器断开连接").getBytes(), 0, false);
            //设置回调
            client.setCallback(new MqttConsumerCallBack());
            client.connect(options);
            //订阅主题
            //消息等级,和主题数组一一对应,服务端将按照指定等级给订阅了主题的客户端推送消息
            int[] qos = {1, 1};
            //主题
            String[] topics = mqttProperties.getTopics().split(",");
            //订阅主题
            client.subscribe(topics, qos);
        } catch (MqttException e) {
            e.printStackTrace();
        }
    }
    /**
     * æ–­å¼€è¿žæŽ¥
     */
    public void disConnect() {
        try {
            client.disconnect();
        } catch (MqttException e) {
            e.printStackTrace();
        }
    }
    /**
     * è®¢é˜…主题
     */
    public void subscribe(String topic, int qos) {
        try {
            client.subscribe(topic, qos);
        } catch (MqttException e) {
            e.printStackTrace();
        }
    }
}
src/main/java/com/fzzy/mqtt/MqttController.java
对比新文件
@@ -0,0 +1,30 @@
package com.fzzy.mqtt;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;
@Controller
public class MqttController {
    @Autowired
    private MqttConsumerConfig client;
    @Autowired
    private MqttProperties mqttProperties;
    @RequestMapping("/connect")
    public @ResponseBody
    String connect() {
        client.connect();
        return mqttProperties.getClientOutId() + "连接到服务器";
    }
    @RequestMapping("/disConnect")
    @ResponseBody
    public String disConnect() {
        client.disConnect();
        return mqttProperties.getClientOutId() + "与服务器断开连接";
    }
}
src/main/java/com/fzzy/mqtt/MqttGatewayService.java
@@ -1,6 +1,5 @@
package com.fzzy.mqtt;
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.messaging.handler.annotation.Header;
src/main/java/com/fzzy/mqtt/MqttInboundConfiguration.java
文件已删除
src/main/java/com/fzzy/mqtt/MqttOutboundConfiguration.java
文件已删除
src/main/java/com/fzzy/mqtt/MqttProperties.java
@@ -22,39 +22,44 @@
    /**
     * 用户名
     */
    private String clientUsername;
    private String username;
    /**
     * 密码
     */
    private String clientPassword;
    private String password;
    /**
     * 客户端Id,同一台服务器下,不允许出现重复的客户端id
     * 客户端Id-发布者Id
     */
    private String clientId;
    private String clientOutId;
    /**
     * 客户端Id-被订阅者Id
     */
    private String clientInId;
    /**
     * 超时时间
     */
    private int clientTimeout = 5000;
    private int timout = 5000;
    /**
     * 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端
     * 发送个消息判断客户端是否在线,但这个方法并没有重连的机制
     */
    private int clientAliveTime = 30000;
    private int keepAliveInterval = 20;
    private int clientMaxConnectTime;
    private int maxConnectTimes = 5;
    private String clientTopics;
    private String topics;
    /**
     * 连接方式
     */
    private Integer clientQos = 0;
    private Integer qos = 0;
    /**
     * 默认连接主题,以/#结尾表示订阅所有以test开头的主题
src/main/java/com/fzzy/mqtt/MqttProviderCallBack.java
对比新文件
@@ -0,0 +1,37 @@
package com.fzzy.mqtt;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;
@Slf4j
public class MqttProviderCallBack implements MqttCallback {
    /**
     * å®¢æˆ·ç«¯æ–­å¼€è¿žæŽ¥çš„回调
     */
    @Override
    public void connectionLost(Throwable throwable) {
        System.out.println("与服务器断开连接,可重连");
    }
    /**
     * æ¶ˆæ¯åˆ°è¾¾çš„回调
     */
    @Override
    public void messageArrived(String topic, MqttMessage message) throws Exception {
        System.out.println(String.format("接收消息主题 : %s", topic));
        System.out.println(String.format("接收消息Qos : %d", message.getQos()));
        System.out.println(String.format("接收消息内容 : %s", new String(message.getPayload())));
        System.out.println(String.format("接收消息retained : %b", message.isRetained()));
    }
    /**
     * æ¶ˆæ¯å‘布成功的回调
     */
    @Override
    public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
        System.out.println(String.format("接收消息成功"));
    }
}
src/main/java/com/fzzy/mqtt/MqttProviderConfig.java
对比新文件
@@ -0,0 +1,101 @@
package com.fzzy.mqtt;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import javax.annotation.PostConstruct;
@Configuration
@Slf4j
public class MqttProviderConfig {
    @Autowired
    private MqttProperties mqttProperties;
    /**
     * å®¢æˆ·ç«¯å¯¹è±¡
     */
    private MqttClient client;
    /**
     *
     * åœ¨bean初始化后连接到服务器
     */
    @PostConstruct
    public void init(){
        connect();
    }
    /**
     * å®¢æˆ·ç«¯è¿žæŽ¥æœåŠ¡ç«¯
     */
    public void connect(){
        try{
            //创建MQTT客户端对象
            client = new MqttClient(mqttProperties.getHost(),mqttProperties.getClientOutId(),new MemoryPersistence());
            //连接设置
            MqttConnectOptions options = new MqttConnectOptions();
            //是否清空session,设置false表示服务器会保留客户端的连接记录(订阅主题,qos),客户端重连之后能获取到服务器在客户端断开连接期间推送的消息
            //设置为true表示每次连接服务器都是以新的身份
            options.setCleanSession(true);
            //设置连接用户名
            options.setUserName(mqttProperties.getUsername());
            //设置连接密码
            options.setPassword(mqttProperties.getPassword().toCharArray());
            //设置超时时间,单位为秒
            options.setConnectionTimeout(100);
            //设置心跳时间 å•位为秒,表示服务器每隔 1.5*20秒的时间向客户端发送心跳判断客户端是否在线
            options.setKeepAliveInterval(20);
            //设置遗嘱消息的话题,若客户端和服务器之间的连接意外断开,服务器将发布客户端的遗嘱信息
            options.setWill("willTopic",(mqttProperties.getClientOutId() + "与服务器断开连接").getBytes(),0,false);
            //设置回调
            client.setCallback(new MqttProviderCallBack());
            client.connect(options);
        } catch(MqttException e){
            e.printStackTrace();
        }
    }
    public void publish(String topic,String message){
        MqttMessage mqttMessage = new MqttMessage();
        mqttMessage.setQos(mqttProperties.getQos());
        mqttMessage.setRetained(true);
        mqttMessage.setPayload(message.getBytes());
        //主题的目的地,用于发布/订阅信息
        MqttTopic mqttTopic = client.getTopic(topic);
        //提供一种机制来跟踪消息的传递进度
        //用于在以非阻塞方式(在后台运行)执行发布是跟踪消息的传递进度
        MqttDeliveryToken token;
        try {
            //将指定消息发布到主题,但不等待消息传递完成,返回的token可用于跟踪消息的传递状态
            //一旦此方法干净地返回,消息就已被客户端接受发布,当连接可用,将在后台完成消息传递。
            token = mqttTopic.publish(mqttMessage);
            token.waitForCompletion();
        } catch (MqttException e) {
            e.printStackTrace();
        }
    }
    public void publish(int qos,boolean retained,String topic,String message){
        MqttMessage mqttMessage = new MqttMessage();
        mqttMessage.setQos(qos);
        mqttMessage.setRetained(retained);
        mqttMessage.setPayload(message.getBytes());
        //主题的目的地,用于发布/订阅信息
        MqttTopic mqttTopic = client.getTopic(topic);
        //提供一种机制来跟踪消息的传递进度
        //用于在以非阻塞方式(在后台运行)执行发布是跟踪消息的传递进度
        MqttDeliveryToken token;
        try {
            //将指定消息发布到主题,但不等待消息传递完成,返回的token可用于跟踪消息的传递状态
            //一旦此方法干净地返回,消息就已被客户端接受发布,当连接可用,将在后台完成消息传递。
            token = mqttTopic.publish(mqttMessage);
            token.waitForCompletion();
        } catch (MqttException e) {
            e.printStackTrace();
        }
    }
}
src/main/java/com/fzzy/mqtt/MqttPubController.java
@@ -2,34 +2,25 @@
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class MqttPubController {
    @Autowired
    private MqttGatewayService gatewayService;
    private MqttProviderConfig providerClient;
    @RequestMapping("/hello")
    public String hello() {
        return "hello!";
    @RequestMapping("/sendMessage")
    public @ResponseBody
    String sendMessage(String topic, String message) {
        try {
            providerClient.publish(topic, message);
            return "发送成功";
        } catch (Exception e) {
            e.printStackTrace();
            return "发送失败";
        }
    }
    @RequestMapping("/sendMqtt")
    public String sendMqtt(String sendData) {
        System.out.println(sendData);
        System.out.println("进入sendMqtt-------" + sendData);
        gatewayService.sendToMqtt("topic01", (String) sendData);
        return "Test is OK";
    }
    @RequestMapping("/sendMqttTopic")
    public String sendMqtt(String sendData, String topic) {
        //System.out.println(sendData+"   "+topic);
        //System.out.println("进入inbound发送:"+sendData);
        gatewayService.sendToMqtt(topic, (String) sendData);
        return "Test is OK";
    }
}
src/main/java/com/fzzy/mqtt/MqttPublishService.java
@@ -9,6 +9,7 @@
@Service
public class MqttPublishService {
    private static MqttClient client ;
src/main/resources/application-dev.yml
@@ -96,15 +96,3 @@
      auto-offset-reset: latest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
mqtt:
  host: tcp://10.13.4.84:11883
  client-id:
  client-username:
  client-password:
  client-timeout: 10
  client-alive-time: 20
  client-max-connect-times: 5
  client-topics:
  client-qos: 0
  isOpen: false
src/main/resources/application-devGateway.yml
@@ -70,13 +70,16 @@
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
  mqtt:
    host: tcp://10.13.4.84:11883
    client-id:
    client-username: admin
    client-password: 123456
    client-timeout: 10
    client-alive-time: 20
    client-max-connect-times: 5
    client-topics: "/device/${productId}/${deviceId}/message/property/report,/device-message-sender/${productId}/${deviceId}"
    client-qos: 0
    host: tcp://127.0.0.1:1883
    username: admin
    password: pwdmqtt..
    client-outId: fzzy-customer-id
    client-inId: fzzy-6e3d92ff71b911eea5e50250f2000002
    timeout: 10
    keep-alive-interval: 20
    max-connect-times: 5
    qos: 0
    isOpen: false
    default:
      topic: testTopic
    topics: "/+/+/properties/report,/device-message-sender/+/+"