src/main/java/com/fzzy/gateway/hx2023/service/ApiInitService.java
@@ -1,19 +1,21 @@ package com.fzzy.gateway.hx2023.service; import com.bstek.dorado.annotation.Expose; import com.fzzy.gateway.api.GatewayRemoteManager; import com.fzzy.gateway.api.GatewayRemoteService; import com.fzzy.gateway.entity.GatewayConf; import com.fzzy.gateway.service.GatewayConfService; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; import javax.annotation.Resource; import java.util.List; /** * å½åæ¥å£ï¼åå§åç¸å ³ */ @Slf4j @Component public class ApiInitService { @@ -23,6 +25,10 @@ private GatewayRemoteManager gatewayRemoteManager; /** * apiInitService#init */ @Expose public void init() { List<GatewayConf> list = confService.listAll(); @@ -39,4 +45,6 @@ } } src/main/java/com/fzzy/gateway/hx2023/service/HxGatewayRemoteServiceImpl.java
@@ -33,7 +33,6 @@ @Resource private ApiLogRep apiLogRep; @Resource private GatewayConfService gatewayConfService; src/main/java/com/fzzy/gateway/hx2023/websocket/SocketService.java
¶Ô±ÈÐÂÎļþ @@ -0,0 +1,102 @@ package com.fzzy.gateway.hx2023.websocket; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; import org.springframework.stereotype.Component; import org.springframework.stereotype.Service; import org.springframework.web.bind.annotation.CrossOrigin; import javax.websocket.*; import javax.websocket.server.PathParam; import javax.websocket.server.ServerEndpoint; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.CopyOnWriteArraySet; @Component @CrossOrigin @Service @ServerEndpoint(value = "/mqtt") public class SocketService { /** * ç¨æ¥åæ¾æ¯ä¸ªå®¢æ·ç«¯å¯¹åºçMyWebSocket对象ã **/ private static CopyOnWriteArraySet<SocketService> socketSet = new CopyOnWriteArraySet<>(); /** * ä¸æä¸ªå®¢æ·ç«¯çè¿æ¥ä¼è¯ï¼éè¦éè¿å®æ¥ç»å®¢æ·ç«¯åéæ°æ® **/ private Session session; /** * ç¨æ·åç§° **/ private String nickname; /** * ç¨æ¥è®°å½sessionIdå该sessionè¿è¡ç»å® **/ private static Map<String,Session> map = new HashMap<String, Session>(); /** * è¿æ¥å»ºç«æåè°ç¨çæ¹æ³ */ @OnOpen public void onOpen(Session session,@PathParam("nickname") String nickname) { this.session = session; this.nickname=nickname; map.put(nickname, session); socketSet.add(this); System.out.println("ææ°è¿æ¥å å ¥:"+nickname+",å½åå¨çº¿äººæ°ä¸º" + socketSet.size()); } /** * è¿æ¥å ³éè°ç¨çæ¹æ³ */ @OnClose public void onClose() { socketSet.remove(this); List<String> nickname = this.session.getRequestParameterMap().get("nickname"); for(String nick:nickname) { map.remove(nick); } System.out.println("æä¸è¿æ¥å ³éï¼å½åå¨çº¿äººæ°ä¸º" + socketSet.size()); } /** * æ¶å°å®¢æ·ç«¯æ¶æ¯åè°ç¨çæ¹æ³ */ @OnMessage public void onMessage(String message, Session session,@PathParam("nickname") String nickname) { System.out.println("æ¥èªå®¢æ·ç«¯çæ¶æ¯-->"+nickname+": " + message); //å°mqttåéè¿æ¥çæ°æ®è¿åç»å端 try { // JSONObject parse = JSONObject.parse(message); // Map<String,Object> valuesMap = (Map<String, Object>) parse.getByPath("values"); // String tag1 = valuesMap.get("tag1").toString(); // String replace = tag1.replace("\r\n", ""); // String substring = replace.substring(replace.indexOf(":") + 1).trim().replace(" ",":"); // String tag2 = valuesMap.get("tag2").toString(); // String substring1=substring+"tag2:"+tag2; // String result="{"+substring1+"}"; // //åéç»å端ï¼ç¨ä½é¡µé¢æ¸²æ // Session fromSession = map.get(nickname); // fromSession.getAsyncRemote().sendText(result); } catch (Exception e) { e.printStackTrace(); } } /** * åçé误æ¶è°ç¨ */ @OnError public void onError(Session session, Throwable error) { System.out.println("åçé误"); error.printStackTrace(); } } src/main/java/com/fzzy/gateway/hx2023/websocket/WebSocketMqtt.java
@@ -21,7 +21,7 @@ */ @Slf4j @Component @ServerEndpoint(value = "/mqtt") //@ServerEndpoint(value = "/mqtt") public class WebSocketMqtt { private static Map<String, Session> sessionPool = new ConcurrentHashMap<>(); src/main/java/com/fzzy/gateway/service/GatewayConfService.java
@@ -64,6 +64,7 @@ gatewayConfRep.delete(data2); return null; } public void updateCache(GatewayConf conf) { String key = RedisConst.buildKey(RedisConst.KYE_CONF_GATEWAY, conf.getKqdm()); src/main/java/com/fzzy/gateway/view/GatewayConf.view.xml
@@ -166,6 +166,12 @@ <Property name="iconClass">fa fa-search</Property> <Property name="exClassName">toolbar-button-push</Property> </ToolBarButton> <ToolBarButton> <Property name="caption">ç½å ³åå§å</Property> <Property name="iconClass">fa fa-search</Property> <Property name="exClassName">toolbar-button-push</Property> <Property name="action">ajaxInit</Property> </ToolBarButton> </ToolBar> <DataGrid id="dgMain"> <Property name="dataSet">dsMain</Property> @@ -377,6 +383,10 @@ <Property name="service">gatewayDeviceService#ajaxTestGrain</Property> <Property name="executingMessage">å¨åªåæ§è¡ä¸â¦â¦</Property> </AjaxAction> <AjaxAction id="ajaxInit"> <Property name="service">apiInitService#init</Property> <Property name="executingMessage">å¨åªåæ§è¡ä¸â¦â¦</Property> </AjaxAction> <Dialog id="dialogWeight"> <Property name="width">400</Property> <Property name="height">300</Property> src/main/java/com/fzzy/mqtt/MqttAcceptCallback.java
ÎļþÒÑɾ³ý src/main/java/com/fzzy/mqtt/MqttAcceptClient.java
ÎļþÒÑɾ³ý src/main/java/com/fzzy/mqtt/MqttCondition.java
ÎļþÒÑɾ³ý src/main/java/com/fzzy/mqtt/MqttGatewayService.java
¶Ô±ÈÐÂÎļþ @@ -0,0 +1,17 @@ package com.fzzy.mqtt; import org.springframework.integration.annotation.MessagingGateway; import org.springframework.integration.mqtt.support.MqttHeaders; import org.springframework.messaging.handler.annotation.Header; import org.springframework.stereotype.Service; /** * æ¨éæ¥å£ */ @Service @MessagingGateway(defaultRequestChannel = "mqttOutboundChannel") public interface MqttGatewayService { void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic01, String sendData); } src/main/java/com/fzzy/mqtt/MqttInboundConfiguration.java
¶Ô±ÈÐÂÎļþ @@ -0,0 +1,91 @@ package com.fzzy.mqtt; import com.fzzy.gateway.hx2023.websocket.SocketService; import lombok.extern.slf4j.Slf4j; import org.eclipse.paho.client.mqttv3.MqttConnectOptions; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.integration.annotation.IntegrationComponentScan; import org.springframework.integration.annotation.ServiceActivator; import org.springframework.integration.channel.DirectChannel; import org.springframework.integration.core.MessageProducer; import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory; import org.springframework.integration.mqtt.core.MqttPahoClientFactory; import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter; import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter; import org.springframework.integration.mqtt.support.MqttHeaders; import org.springframework.messaging.Message; import org.springframework.messaging.MessageChannel; import org.springframework.messaging.MessageHandler; import org.springframework.messaging.MessagingException; /** * MQTTæ¶è´¹ç«¯ */ @Slf4j @Configuration @IntegrationComponentScan public class MqttInboundConfiguration { @Autowired private MqttProperties mqttProperties; @Bean public MessageChannel mqttInputChannel() { return new DirectChannel(); } @Bean public MqttPahoClientFactory receiverMqttClientFactoryForSub() { DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory(); String[] array = mqttProperties.getHost().split(","); MqttConnectOptions options = new MqttConnectOptions(); options.setServerURIs(array); options.setUserName(mqttProperties.getClientUsername()); options.setPassword(mqttProperties.getClientPassword().toCharArray()); options.setKeepAliveInterval(mqttProperties.getClientAliveTime()); //æ¥åç¦»çº¿æ¶æ¯ options.setCleanSession(false); options.setMqttVersion(4); factory.setConnectionOptions(options); return factory; } //é ç½®client,çå¬çtopic @Bean public MessageProducer inbound() { String[] inboundTopics = mqttProperties.getClientTopics().split(","); MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter( mqttProperties.getClientId() + "_inbound", receiverMqttClientFactoryForSub(), inboundTopics); //对inboundTopics主é¢è¿è¡çå¬ adapter.setCompletionTimeout(mqttProperties.getClientTimeout()); adapter.setQos(1); adapter.setConverter(new DefaultPahoMessageConverter()); adapter.setOutputChannel(mqttInputChannel()); return adapter; } //éè¿ééè·åæ°æ® @Bean @ServiceActivator(inputChannel = "mqttInputChannel") //弿¥å¤ç public MessageHandler handler() { return new MessageHandler() { @Override public void handleMessage(Message<?> message) throws MessagingException { log.info("----------------------"); //è·åmqttçtopic String topic = (String) message.getHeaders().get(MqttHeaders.RECEIVED_TOPIC); //使ç¨webSocketè¿åç»å端 SocketService socketService = new SocketService(); socketService.onMessage(message.getPayload().toString(), null, topic); log.info("message:" + message.getPayload()); log.info("PacketId:" + message.getHeaders().getId()); log.info("Qos:" + message.getHeaders().get(MqttHeaders.QOS)); log.info("topic:" + topic); } }; } } src/main/java/com/fzzy/mqtt/MqttOutboundConfiguration.java
¶Ô±ÈÐÂÎļþ @@ -0,0 +1,58 @@ package com.fzzy.mqtt; import lombok.extern.slf4j.Slf4j; import org.eclipse.paho.client.mqttv3.MqttConnectOptions; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.integration.annotation.IntegrationComponentScan; import org.springframework.integration.annotation.ServiceActivator; import org.springframework.integration.channel.DirectChannel; import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory; import org.springframework.integration.mqtt.core.MqttPahoClientFactory; import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler; import org.springframework.messaging.MessageChannel; import org.springframework.messaging.MessageHandler; /** * MQTTç产端 */ @Slf4j @Configuration @IntegrationComponentScan public class MqttOutboundConfiguration { @Autowired private MqttProperties mqttProperties; @Bean public MessageChannel mqttOutboundChannel() { return new DirectChannel(); } @Bean public MqttPahoClientFactory getReceiverMqttConnectOptionsForSend() { DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory(); String[] array = mqttProperties.getHost().split(","); MqttConnectOptions options = new MqttConnectOptions(); options.setServerURIs(array); if(null == mqttProperties.getClientUsername()) mqttProperties.setClientUsername(" "); if(null == mqttProperties.getClientPassword()) mqttProperties.setClientPassword(" "); options.setUserName(mqttProperties.getClientUsername()); options.setPassword(mqttProperties.getClientPassword().toCharArray()); // æ¥åç¦»çº¿æ¶æ¯ options.setCleanSession(false); //åè¯ä»£ç客æ·ç«¯æ¯å¦è¦å»ºç«æä¹ ä¼è¯ falseä¸ºå»ºç«æä¹ ä¼è¯ options.setMqttVersion(4); factory.setConnectionOptions(options); return factory; } @Bean @ServiceActivator(inputChannel = "mqttOutboundChannel") public MessageHandler mqttOutbound() { MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(mqttProperties.getClientId() + "outbound", getReceiverMqttConnectOptionsForSend()); messageHandler.setAsync(true); return messageHandler; } } src/main/java/com/fzzy/mqtt/MqttProperties.java
@@ -1,15 +1,17 @@ package com.fzzy.mqtt; import lombok.Data; import lombok.extern.slf4j.Slf4j; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.stereotype.Component; /** * MQTT é ç½®ä¿¡æ¯ */ @Component @ConfigurationProperties("mqtt") @Slf4j @Data @Component @ConfigurationProperties("spring.mqtt") public class MqttProperties { /** @@ -37,13 +39,13 @@ /** * è¶ æ¶æ¶é´ */ private int clientTimeout; private int clientTimeout = 5000; /** * 设置ä¼è¯å¿è·³æ¶é´ åä½ä¸ºç§ æå¡å¨ä¼æ¯é1.5*20ç§çæ¶é´å客æ·ç«¯ * åéä¸ªæ¶æ¯å¤æå®¢æ·ç«¯æ¯å¦å¨çº¿ï¼ä½è¿ä¸ªæ¹æ³å¹¶æ²¡æéè¿çæºå¶ */ private int clientAliveTime; private int clientAliveTime = 30000; private int clientMaxConnectTime; @@ -52,7 +54,7 @@ /** * è¿æ¥æ¹å¼ */ private Integer clientQos; private Integer clientQos = 0; /** * é»è®¤è¿æ¥ä¸»é¢ï¼ä»¥/#ç»å°¾è¡¨ç¤ºè®¢é ææä»¥testå¼å¤´çä¸»é¢ @@ -75,4 +77,5 @@ */ private Boolean isOpen; } src/main/java/com/fzzy/mqtt/MqttPubController.java
¶Ô±ÈÐÂÎļþ @@ -0,0 +1,35 @@ package com.fzzy.mqtt; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; @RestController public class MqttPubController { @Autowired private MqttGatewayService gatewayService; @RequestMapping("/hello") public String hello() { return "hello!"; } @RequestMapping("/sendMqtt") public String sendMqtt(String sendData) { System.out.println(sendData); System.out.println("è¿å ¥sendMqtt-------" + sendData); gatewayService.sendToMqtt("topic01", (String) sendData); return "Test is OK"; } @RequestMapping("/sendMqttTopic") public String sendMqtt(String sendData, String topic) { //System.out.println(sendData+" "+topic); //System.out.println("è¿å ¥inboundåéï¼"+sendData); gatewayService.sendToMqtt(topic, (String) sendData); return "Test is OK"; } } src/main/resources/application-devGateway.yml
@@ -69,15 +69,14 @@ key-deserializer: org.apache.kafka.common.serialization.StringDeserializer value-deserializer: org.apache.kafka.common.serialization.StringDeserializer mqtt: host: tcp://10.13.4.84:11883 client-id: client-username: client-password: client-username: admin client-password: 123456 client-timeout: 10 client-alive-time: 20 client-max-connect-times: 5 client-topics: client-topics: "/device/${productId}/${deviceId}/message/property/report,/device-message-sender/${productId}/${deviceId}" client-qos: 0 isOpen: false