jiazx0107@163.com
2023-11-08 a31a452b9999ba3c811c36b5cb1b3ec0c18d037d
提交MQTT相关功能
已删除3个文件
已修改7个文件
已添加5个文件
586 ■■■■■ 文件已修改
src/main/java/com/fzzy/gateway/hx2023/service/ApiInitService.java 12 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/fzzy/gateway/hx2023/service/HxGatewayRemoteServiceImpl.java 1 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/fzzy/gateway/hx2023/websocket/SocketService.java 102 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/fzzy/gateway/hx2023/websocket/WebSocketMqtt.java 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/fzzy/gateway/service/GatewayConfService.java 1 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/fzzy/gateway/view/GatewayConf.view.xml 10 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/fzzy/mqtt/MqttAcceptCallback.java 90 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/fzzy/mqtt/MqttAcceptClient.java 104 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/fzzy/mqtt/MqttCondition.java 25 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/fzzy/mqtt/MqttGatewayService.java 17 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/fzzy/mqtt/MqttInboundConfiguration.java 91 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/fzzy/mqtt/MqttOutboundConfiguration.java 58 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/fzzy/mqtt/MqttProperties.java 15 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/fzzy/mqtt/MqttPubController.java 35 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/resources/application-devGateway.yml 23 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/fzzy/gateway/hx2023/service/ApiInitService.java
@@ -1,19 +1,21 @@
package com.fzzy.gateway.hx2023.service;
import com.bstek.dorado.annotation.Expose;
import com.fzzy.gateway.api.GatewayRemoteManager;
import com.fzzy.gateway.api.GatewayRemoteService;
import com.fzzy.gateway.entity.GatewayConf;
import com.fzzy.gateway.service.GatewayConfService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.List;
/**
 * å½“前接口,初始化相关
 */
@Slf4j
@Component
public class ApiInitService {
@@ -23,6 +25,10 @@
    private GatewayRemoteManager gatewayRemoteManager;
    /**
     * apiInitService#init
     */
    @Expose
    public void init() {
        List<GatewayConf> list = confService.listAll();
@@ -38,5 +44,7 @@
        }
    }
}
src/main/java/com/fzzy/gateway/hx2023/service/HxGatewayRemoteServiceImpl.java
@@ -33,7 +33,6 @@
    @Resource
    private ApiLogRep apiLogRep;
    @Resource
    private GatewayConfService gatewayConfService;
src/main/java/com/fzzy/gateway/hx2023/websocket/SocketService.java
¶Ô±ÈÐÂÎļþ
@@ -0,0 +1,102 @@
package com.fzzy.gateway.hx2023.websocket;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.springframework.stereotype.Component;
import org.springframework.stereotype.Service;
import org.springframework.web.bind.annotation.CrossOrigin;
import javax.websocket.*;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArraySet;
@Component
@CrossOrigin
@Service
@ServerEndpoint(value = "/mqtt")
public class SocketService {
    /**
     * ç”¨æ¥å­˜æ”¾æ¯ä¸ªå®¢æˆ·ç«¯å¯¹åº”çš„MyWebSocket对象。
     **/
    private static CopyOnWriteArraySet<SocketService> socketSet = new CopyOnWriteArraySet<>();
    /**
     * ä¸ŽæŸä¸ªå®¢æˆ·ç«¯çš„连接会话,需要通过它来给客户端发送数据
     **/
    private Session session;
    /**
     * ç”¨æˆ·åç§°
     **/
    private String nickname;
    /**
     * ç”¨æ¥è®°å½•sessionId和该session进行绑定
     **/
    private static Map<String,Session> map = new HashMap<String, Session>();
    /**
     * è¿žæŽ¥å»ºç«‹æˆåŠŸè°ƒç”¨çš„æ–¹æ³•
     */
    @OnOpen
    public void onOpen(Session session,@PathParam("nickname") String nickname) {
        this.session = session;
        this.nickname=nickname;
        map.put(nickname, session);
        socketSet.add(this);
        System.out.println("有新连接加入:"+nickname+",当前在线人数为" + socketSet.size());
    }
    /**
     * è¿žæŽ¥å…³é—­è°ƒç”¨çš„æ–¹æ³•
     */
    @OnClose
    public void onClose() {
        socketSet.remove(this);
        List<String> nickname = this.session.getRequestParameterMap().get("nickname");
        for(String nick:nickname) {
            map.remove(nick);
        }
        System.out.println("有一连接关闭!当前在线人数为" + socketSet.size());
    }
    /**
     * æ”¶åˆ°å®¢æˆ·ç«¯æ¶ˆæ¯åŽè°ƒç”¨çš„æ–¹æ³•
     */
    @OnMessage
    public void onMessage(String message, Session session,@PathParam("nickname") String nickname) {
        System.out.println("来自客户端的消息-->"+nickname+": " + message);
        //将mqtt发送过来的数据返回给前端
        try {
//            JSONObject parse = JSONObject.parse(message);
//            Map<String,Object> valuesMap = (Map<String, Object>) parse.getByPath("values");
//            String tag1 = valuesMap.get("tag1").toString();
//            String replace = tag1.replace("\r\n", "");
//            String substring = replace.substring(replace.indexOf(":") + 1).trim().replace(" ",":");
//            String tag2 = valuesMap.get("tag2").toString();
//            String substring1=substring+"tag2:"+tag2;
//            String result="{"+substring1+"}";
//            //发送给前端,用作页面渲染
//            Session fromSession = map.get(nickname);
//            fromSession.getAsyncRemote().sendText(result);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    /**
     * å‘生错误时调用
     */
    @OnError
    public void onError(Session session, Throwable error) {
        System.out.println("发生错误");
        error.printStackTrace();
    }
}
src/main/java/com/fzzy/gateway/hx2023/websocket/WebSocketMqtt.java
@@ -21,7 +21,7 @@
 */
@Slf4j
@Component
@ServerEndpoint(value = "/mqtt")
//@ServerEndpoint(value = "/mqtt")
public class WebSocketMqtt {
    private static Map<String, Session> sessionPool = new ConcurrentHashMap<>();
src/main/java/com/fzzy/gateway/service/GatewayConfService.java
@@ -64,6 +64,7 @@
        gatewayConfRep.delete(data2);
        return null;
    }
    public void updateCache(GatewayConf conf) {
        String key = RedisConst.buildKey(RedisConst.KYE_CONF_GATEWAY, conf.getKqdm());
src/main/java/com/fzzy/gateway/view/GatewayConf.view.xml
@@ -166,6 +166,12 @@
        <Property name="iconClass">fa fa-search</Property>
        <Property name="exClassName">toolbar-button-push</Property>
      </ToolBarButton>
      <ToolBarButton>
        <Property name="caption">网关初始化</Property>
        <Property name="iconClass">fa fa-search</Property>
        <Property name="exClassName">toolbar-button-push</Property>
        <Property name="action">ajaxInit</Property>
      </ToolBarButton>
    </ToolBar>
    <DataGrid id="dgMain">
      <Property name="dataSet">dsMain</Property>
@@ -377,6 +383,10 @@
      <Property name="service">gatewayDeviceService#ajaxTestGrain</Property>
      <Property name="executingMessage">在努力执行中……</Property>
    </AjaxAction>
    <AjaxAction id="ajaxInit">
      <Property name="service">apiInitService#init</Property>
      <Property name="executingMessage">在努力执行中……</Property>
    </AjaxAction>
    <Dialog id="dialogWeight">
      <Property name="width">400</Property>
      <Property name="height">300</Property>
src/main/java/com/fzzy/mqtt/MqttAcceptCallback.java
ÎļþÒÑɾ³ý
src/main/java/com/fzzy/mqtt/MqttAcceptClient.java
ÎļþÒÑɾ³ý
src/main/java/com/fzzy/mqtt/MqttCondition.java
ÎļþÒÑɾ³ý
src/main/java/com/fzzy/mqtt/MqttGatewayService.java
¶Ô±ÈÐÂÎļþ
@@ -0,0 +1,17 @@
package com.fzzy.mqtt;
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Service;
/**
 * æŽ¨é€æŽ¥å£
 */
@Service
@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
public interface MqttGatewayService {
    void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic01, String sendData);
}
src/main/java/com/fzzy/mqtt/MqttInboundConfiguration.java
¶Ô±ÈÐÂÎļþ
@@ -0,0 +1,91 @@
package com.fzzy.mqtt;
import com.fzzy.gateway.hx2023.websocket.SocketService;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.IntegrationComponentScan;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessagingException;
/**
 * MQTT消费端
 */
@Slf4j
@Configuration
@IntegrationComponentScan
public class MqttInboundConfiguration {
    @Autowired
    private MqttProperties mqttProperties;
    @Bean
    public MessageChannel mqttInputChannel() {
        return new DirectChannel();
    }
    @Bean
    public MqttPahoClientFactory receiverMqttClientFactoryForSub() {
        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        String[] array = mqttProperties.getHost().split(",");
        MqttConnectOptions options = new MqttConnectOptions();
        options.setServerURIs(array);
        options.setUserName(mqttProperties.getClientUsername());
        options.setPassword(mqttProperties.getClientPassword().toCharArray());
        options.setKeepAliveInterval(mqttProperties.getClientAliveTime());
        //接受离线消息
        options.setCleanSession(false);
        options.setMqttVersion(4);
        factory.setConnectionOptions(options);
        return factory;
    }
    //配置client,监听的topic
    @Bean
    public MessageProducer inbound() {
        String[] inboundTopics = mqttProperties.getClientTopics().split(",");
        MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(
                mqttProperties.getClientId() + "_inbound", receiverMqttClientFactoryForSub(), inboundTopics);  //对inboundTopics主题进行监听
        adapter.setCompletionTimeout(mqttProperties.getClientTimeout());
        adapter.setQos(1);
        adapter.setConverter(new DefaultPahoMessageConverter());
        adapter.setOutputChannel(mqttInputChannel());
        return adapter;
    }
    //通过通道获取数据
    @Bean
    @ServiceActivator(inputChannel = "mqttInputChannel")  //异步处理
    public MessageHandler handler() {
        return new MessageHandler() {
            @Override
            public void handleMessage(Message<?> message) throws MessagingException {
                log.info("----------------------");
                //获取mqtt的topic
                String topic = (String) message.getHeaders().get(MqttHeaders.RECEIVED_TOPIC);
                //使用webSocket返回给前端
                SocketService socketService = new SocketService();
                socketService.onMessage(message.getPayload().toString(), null, topic);
                log.info("message:" + message.getPayload());
                log.info("PacketId:" + message.getHeaders().getId());
                log.info("Qos:" + message.getHeaders().get(MqttHeaders.QOS));
                log.info("topic:" + topic);
            }
        };
    }
}
src/main/java/com/fzzy/mqtt/MqttOutboundConfiguration.java
¶Ô±ÈÐÂÎļþ
@@ -0,0 +1,58 @@
package com.fzzy.mqtt;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.IntegrationComponentScan;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
/**
 * MQTT生产端
 */
@Slf4j
@Configuration
@IntegrationComponentScan
public class MqttOutboundConfiguration {
    @Autowired
    private MqttProperties mqttProperties;
    @Bean
    public MessageChannel mqttOutboundChannel() {
        return new DirectChannel();
    }
    @Bean
    public MqttPahoClientFactory getReceiverMqttConnectOptionsForSend() {
        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        String[] array = mqttProperties.getHost().split(",");
        MqttConnectOptions options = new MqttConnectOptions();
        options.setServerURIs(array);
        if(null == mqttProperties.getClientUsername()) mqttProperties.setClientUsername(" ");
        if(null == mqttProperties.getClientPassword()) mqttProperties.setClientPassword(" ");
        options.setUserName(mqttProperties.getClientUsername());
        options.setPassword(mqttProperties.getClientPassword().toCharArray());
        // æŽ¥å—离线消息
        options.setCleanSession(false); //告诉代理客户端是否要建立持久会话   false为建立持久会话
        options.setMqttVersion(4);
        factory.setConnectionOptions(options);
        return factory;
    }
    @Bean
    @ServiceActivator(inputChannel = "mqttOutboundChannel")
    public MessageHandler mqttOutbound() {
        MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(mqttProperties.getClientId() + "outbound", getReceiverMqttConnectOptionsForSend());
        messageHandler.setAsync(true);
        return messageHandler;
    }
}
src/main/java/com/fzzy/mqtt/MqttProperties.java
@@ -1,15 +1,17 @@
package com.fzzy.mqtt;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;
/**
 * MQTT é…ç½®ä¿¡æ¯
 */
@Component
@ConfigurationProperties("mqtt")
@Slf4j
@Data
@Component
@ConfigurationProperties("spring.mqtt")
public class MqttProperties {
    /**
@@ -37,13 +39,13 @@
    /**
     * è¶…æ—¶æ—¶é—´
     */
    private int clientTimeout;
    private int clientTimeout = 5000;
    /**
     * è®¾ç½®ä¼šè¯å¿ƒè·³æ—¶é—´ å•位为秒 æœåŠ¡å™¨ä¼šæ¯éš”1.5*20秒的时间向客户端
     * å‘送个消息判断客户端是否在线,但这个方法并没有重连的机制
     */
    private int clientAliveTime;
    private int clientAliveTime = 30000;
    private int clientMaxConnectTime;
@@ -52,12 +54,12 @@
    /**
     * è¿žæŽ¥æ–¹å¼
     */
    private Integer clientQos;
    private Integer clientQos = 0;
    /**
     * é»˜è®¤è¿žæŽ¥ä¸»é¢˜ï¼Œä»¥/#结尾表示订阅所有以test开头的主题
     */
    private String  defaultTopic;
    private String defaultTopic;
    /**
     * è®¾ç½®æ˜¯å¦æ¸…空session,这里如果设置为false表示服务器会保留客户端的连
@@ -75,4 +77,5 @@
     */
    private Boolean isOpen;
}
src/main/java/com/fzzy/mqtt/MqttPubController.java
¶Ô±ÈÐÂÎļþ
@@ -0,0 +1,35 @@
package com.fzzy.mqtt;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class MqttPubController {
    @Autowired
    private MqttGatewayService gatewayService;
    @RequestMapping("/hello")
    public String hello() {
        return "hello!";
    }
    @RequestMapping("/sendMqtt")
    public String sendMqtt(String sendData) {
        System.out.println(sendData);
        System.out.println("进入sendMqtt-------" + sendData);
        gatewayService.sendToMqtt("topic01", (String) sendData);
        return "Test is OK";
    }
    @RequestMapping("/sendMqttTopic")
    public String sendMqtt(String sendData, String topic) {
        //System.out.println(sendData+"   "+topic);
        //System.out.println("进入inbound发送:"+sendData);
        gatewayService.sendToMqtt(topic, (String) sendData);
        return "Test is OK";
    }
}
src/main/resources/application-devGateway.yml
@@ -69,15 +69,14 @@
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
mqtt:
  host: tcp://10.13.4.84:11883
  client-id:
  client-username:
  client-password:
  client-timeout: 10
  client-alive-time: 20
  client-max-connect-times: 5
  client-topics:
  client-qos: 0
  isOpen: false
  mqtt:
    host: tcp://10.13.4.84:11883
    client-id:
    client-username: admin
    client-password: 123456
    client-timeout: 10
    client-alive-time: 20
    client-max-connect-times: 5
    client-topics: "/device/${productId}/${deviceId}/message/property/report,/device-message-sender/${productId}/${deviceId}"
    client-qos: 0
    isOpen: false