src/main/java/com/fzzy/gateway/entity/GatewayDevice.java
@@ -37,6 +37,10 @@ @PropertyDef(label = "åç§°") private String deviceName; @Column(name = "PRODUCT_ID_", length = 50) @PropertyDef(label = "设å¤ç±»åKEY") private String productId; @Column(name = "TYPE_", length = 10) @PropertyDef(label = "设å¤ç±»å") private String type; src/main/java/com/fzzy/gateway/hx2023/controller/GatewayController.java
@@ -4,12 +4,15 @@ import com.fzzy.api.utils.ContextUtil; import com.fzzy.api.utils.RedisConst; import com.fzzy.api.utils.RedisUtil; import com.fzzy.gateway.entity.GatewayConf; import com.fzzy.gateway.hx2023.data.GatewayAuthData; import com.fzzy.gateway.service.GatewayConfService; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Controller; import org.springframework.web.bind.annotation.*; import javax.annotation.Resource; import java.util.List; /** * @@ -21,6 +24,9 @@ @Resource private RedisUtil redisUtil; @Resource private GatewayConfService confService; /** * é´ææ¥å£ @@ -39,28 +45,44 @@ public @ResponseBody JSONObject authorize(@RequestBody GatewayAuthData data) { log.debug("============é´æ==========={}--{}", data.getUsername(), data.getPassword()); List<GatewayConf> list = confService.getCacheConfList(); JSONObject json = new JSONObject(); json.put("timestamp", System.currentTimeMillis()); if (null == list || list.isEmpty()) { json.put("code", 500); json.put("message", "æªè·åç½å ³ä¿¡æ¯"); return json; } String gatewayId = null; for (GatewayConf conf : list) { if (data.getUsername().equals(conf.getGatewayUsername()) && data.getPassword().equals(conf.getGatewayPassword())) { gatewayId = conf.getGatewayId(); break; } } if (null == gatewayId) { json.put("code", 500); json.put("message", "æªå¹é å°ç¨æ·ååå¯ç "); return json; } //TODO éªè¯ç¨æ·ååå¯ç String token = "fzzy-" + gatewayId; String token = ContextUtil.getUUID(); log.debug("============é´æ==========={}--{}--{}", data.getUsername(), data.getPassword(), token); this.updateGatewayToken(token, data.getUsername()); JSONObject json = new JSONObject(); JSONObject result = new JSONObject(); result.put("token", token); json.put("result", result); json.put("message", "æå"); json.put("status", 0); json.put("code", 200); json.put("timestamp", System.currentTimeMillis()); return json; } src/main/java/com/fzzy/gateway/hx2023/websocket/SocketService.java
ÎļþÒÑɾ³ý src/main/java/com/fzzy/gateway/hx2023/websocket/WebSockDeviceMessageReport.java
¶Ô±ÈÐÂÎļþ @@ -0,0 +1,50 @@ package com.fzzy.gateway.hx2023.websocket; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; import javax.websocket.*; import javax.websocket.server.PathParam; import javax.websocket.server.ServerEndpoint; /** * ä¸åè°ç¨ç²®æ¸©æ£æµæ¥å£æä»¤ topic */ @Slf4j @Component @ServerEndpoint(value = "/{productId}/{deviceId}/properties/report") public class WebSockDeviceMessageReport { @OnOpen public void onOpen(Session session, @PathParam("productId") String productId, @PathParam("deviceId") String deviceId ) throws Exception { log.info("--------ä¸åè°ç¨ç²®æ¸©æ£æµæ¥å£æä»¤ topic------"); } @OnClose public void onClose() { log.info("WebSocketè¿æ¥å ³é={}"); } /** * æ¶å°å端åéçä¿¡æ¯ * * @param message * @param session */ @OnMessage public void onMessage(String message, Session session) { log.info("æ¥èªå端çä¿¡æ¯:\n" + message); } @OnError public void onError(Session session, Throwable error) { log.error("åçé误"); } } src/main/java/com/fzzy/gateway/hx2023/websocket/WebSockDeviceMessageSender.java
¶Ô±ÈÐÂÎļþ @@ -0,0 +1,50 @@ package com.fzzy.gateway.hx2023.websocket; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; import javax.websocket.*; import javax.websocket.server.PathParam; import javax.websocket.server.ServerEndpoint; /** * ä¸åè°ç¨ç²®æ¸©æ£æµæ¥å£æä»¤ topic */ @Slf4j @Component @ServerEndpoint(value = "/device-message-sender/{productId}/{deviceId}") public class WebSockDeviceMessageSender { @OnOpen public void onOpen(Session session, @PathParam("productId") String productId, @PathParam("deviceId") String deviceId ) throws Exception { log.info("--------ä¸åè°ç¨ç²®æ¸©æ£æµæ¥å£æä»¤ topic------"); } @OnClose public void onClose() { log.info("WebSocketè¿æ¥å ³é={}"); } /** * æ¶å°å端åéçä¿¡æ¯ * * @param message * @param session */ @OnMessage public void onMessage(String message, Session session) { log.info("æ¥èªå端çä¿¡æ¯:\n" + message); } @OnError public void onError(Session session, Throwable error) { log.error("åçé误"); } } src/main/java/com/fzzy/gateway/hx2023/websocket/WebSocketDeviceLed.java
ÎļþÒÑɾ³ý src/main/java/com/fzzy/gateway/hx2023/websocket/WebSocketMqtt.java
ÎļþÒÑɾ³ý src/main/java/com/fzzy/gateway/service/GatewayDeviceService.java
@@ -16,7 +16,6 @@ import com.fzzy.gateway.entity.GatewayDevice; import com.fzzy.gateway.hx2023.ScConstant; import com.fzzy.gateway.hx2023.data.*; import com.fzzy.gateway.hx2023.websocket.WebSocketDeviceReport; import com.fzzy.gateway.service.repository.GatewayDeviceRep; import com.fzzy.mqtt.MqttPublishService; @@ -62,28 +61,27 @@ /** * gatewayDeviceService#updateSave * * @param entity * @param data */ @DataResolver public void updateSave(GatewayDevice entity) { GatewayDevice data = new GatewayDevice(); BeanUtils.copyProperties(entity, data); if (null == data.getId()) { data.setId(ContextUtil.getUUID()); } if (null == data.getDeviceSn()) { if (null != entity.getIp()) { data.setDeviceSn(entity.getIp()); public void updateSave(GatewayDevice data) { GatewayDevice data2 = new GatewayDevice(); BeanUtils.copyProperties(data, data2); if (null == data2.getDeviceSn()) { if (null != data2.getIp()) { data.setDeviceSn(data2.getIp()); } else { data.setDeviceSn(data.getDeviceId()); data.setDeviceSn(data2.getDeviceId()); } } gatewayDeviceRep.save(data); if (null == data2.getId()) { data2.setId(ContextUtil.getUUID()); gatewayDeviceRep.save(data2); }else{ gatewayDeviceRep.save(data2); } flushCache(); } @@ -105,10 +103,6 @@ } /** * gatewayDeviceService#flushCache */ @Expose public void flushCache() { List<GatewayDevice> list = listAll(); if (null == list || list.isEmpty()) return; src/main/java/com/fzzy/gateway/service/GatewayDeviceService2.java
¶Ô±ÈÐÂÎļþ @@ -0,0 +1,62 @@ package com.fzzy.gateway.service; import com.bstek.dorado.annotation.DataResolver; import com.fzzy.api.utils.ContextUtil; import com.fzzy.gateway.GatewayUtils; import com.fzzy.gateway.entity.GatewayDevice; import com.fzzy.gateway.service.repository.GatewayDeviceRep; import org.springframework.beans.BeanUtils; import org.springframework.data.domain.Sort; import org.springframework.stereotype.Component; import javax.annotation.Resource; import java.util.List; @Component public class GatewayDeviceService2 { @Resource private GatewayDeviceRep gatewayDeviceRep; public List<GatewayDevice> listAll() { Sort sort = new Sort(Sort.Direction.ASC, "deviceId"); return gatewayDeviceRep.findAll(sort); } /** * gatewayDeviceService2#updateSave * * @param data */ @DataResolver public void updateSave(GatewayDevice data) { GatewayDevice data2 = new GatewayDevice(); BeanUtils.copyProperties(data, data2); if (null == data2.getDeviceSn()) { if (null != data2.getIp()) { data.setDeviceSn(data2.getIp()); } else { data.setDeviceSn(data2.getDeviceId()); } } if (null == data2.getId()) { data2.setId(ContextUtil.getUUID()); gatewayDeviceRep.save(data2); } else { gatewayDeviceRep.save(data2); } flushCache(); } public void flushCache() { List<GatewayDevice> list = listAll(); if (null == list || list.isEmpty()) return; for (GatewayDevice device : list) { GatewayUtils.add2Cache(device); } } } src/main/java/com/fzzy/gateway/util/GatewayHttpUtil.java
@@ -9,12 +9,8 @@ import org.apache.http.impl.client.HttpClients; import org.apache.http.util.EntityUtils; import java.io.*; import java.net.HttpURLConnection; import java.net.URL; import java.net.URLEncoder; import java.util.Map; import java.util.Set; /** * ç½å ³ä¸ç¨HTTP请æ±å·¥å ·ç±» src/main/java/com/fzzy/gateway/util/GatewayRSAUtils.java
@@ -1,8 +1,6 @@ package com.fzzy.gateway.util; import lombok.extern.slf4j.Slf4j; import javax.crypto.Cipher; import java.io.ByteArrayOutputStream; import java.nio.charset.StandardCharsets; @@ -15,7 +13,6 @@ import java.util.HashMap; import java.util.Map; @Slf4j public class GatewayRSAUtils { /** * RSAæå¤§å 坿æå¤§å° 2048/8-11 src/main/java/com/fzzy/gateway/view/GatewayConf.view.xml
@@ -392,7 +392,7 @@ <Buttons> <Button> <ClientEvent name="onClick">var data = view.get("#dsQuery.data");
 view.get("#ajaxTestWeight").set("parameter",data.get("weight")).execute(function(result){
 view.get("#ajaxTestWeight").set("parameter",data.get("weight")).execute(function(result){
 self.get("parent").hide();
 $alert(result);
 });</ClientEvent> src/main/java/com/fzzy/gateway/view/GatewayDevice.view.xml
@@ -107,8 +107,12 @@ <Property name="label">设å¤å¯ç </Property> </PropertyDef> <PropertyDef name="depotIdSys"> <Property></Property> <Property name="label">èªå®ä¹ä»åºç¼ç </Property> </PropertyDef> <PropertyDef name="productId"> <Property/> <Property name="label">åºåºç³»ç»ä»åºç¼ç </Property> <Property name="label">设å¤ç±»åKEY</Property> </PropertyDef> </DataType> </Model> @@ -182,6 +186,9 @@ <Property name="property">type</Property> <Property name="align">center</Property> </DataColumn> <DataColumn name="productId"> <Property name="property">productId</Property> </DataColumn> <DataColumn name="deviceId"> <Property name="property">deviceId</Property> <Property name="align">center</Property> @@ -203,8 +210,7 @@ <Property name="closeable">false</Property> <Buttons> <Button> <ClientEvent name="onClick">var cur = view.get("#dgMain").getCurrentItem();
 view.get("#updateSave").execute(function(){
 <ClientEvent name="onClick">view.get("#updateSave").execute(function(){
 self.get("parent").hide();
 });</ClientEvent> <Property name="caption">ä¿åä¿®æ¹</Property> @@ -250,6 +256,11 @@ <Editor/> </AutoFormElement> <AutoFormElement> <Property name="name">productId</Property> <Property name="property">productId</Property> <Editor/> </AutoFormElement> <AutoFormElement> <Property name="name">orgId</Property> <Property name="property">orgId</Property> <Editor/> @@ -257,6 +268,11 @@ <AutoFormElement> <Property name="name">depotId</Property> <Property name="property">depotId</Property> <Editor/> </AutoFormElement> <AutoFormElement> <Property name="name">depotIdSys</Property> <Property name="property">depotIdSys</Property> <Editor/> </AutoFormElement> <AutoFormElement> @@ -310,11 +326,6 @@ <Property name="property">httpUrl</Property> <Editor/> </AutoFormElement> <AutoFormElement> <Property name="name">depotIdSys</Property> <Property name="property">depotIdSys</Property> <Editor/> </AutoFormElement> <AutoFormElement layoutConstraint="colSpan:3"> <Property name="name">remark</Property> <Property name="property">remark</Property> @@ -327,10 +338,11 @@ <Tools/> </Dialog> <UpdateAction id="updateSave"> <Property name="dataResolver">gatewayDeviceService#updateSave</Property> <Property name="dataResolver">gatewayDeviceService2#updateSave</Property> <UpdateItem> <Property name="dataPath">[#current]</Property> <Property name="dataSet">dsMain</Property> <Property name="alias">data</Property> </UpdateItem> </UpdateAction> <AjaxAction id="ajaxDel"> src/main/java/com/fzzy/mqtt/MqttConsumerCallBack.java
¶Ô±ÈÐÂÎļþ @@ -0,0 +1,36 @@ package com.fzzy.mqtt; import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; import org.eclipse.paho.client.mqttv3.MqttCallback; import org.eclipse.paho.client.mqttv3.MqttMessage; public class MqttConsumerCallBack implements MqttCallback { /** * 客æ·ç«¯æå¼è¿æ¥çåè° */ @Override public void connectionLost(Throwable throwable) { System.out.println("䏿å¡å¨æå¼è¿æ¥ï¼å¯éè¿"); } /** * æ¶æ¯å°è¾¾çåè° */ @Override public void messageArrived(String topic, MqttMessage message) throws Exception { System.out.println(String.format("æ¥æ¶æ¶æ¯ä¸»é¢ : %s", topic)); System.out.println(String.format("æ¥æ¶æ¶æ¯Qos : %d", message.getQos())); System.out.println(String.format("æ¥æ¶æ¶æ¯å 容 : %s", new String(message.getPayload()))); System.out.println(String.format("æ¥æ¶æ¶æ¯retained : %b", message.isRetained())); } /** * æ¶æ¯å叿åçåè° */ @Override public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) { System.out.println(String.format("æ¥æ¶æ¶æ¯æå")); } } src/main/java/com/fzzy/mqtt/MqttConsumerConfig.java
¶Ô±ÈÐÂÎļþ @@ -0,0 +1,89 @@ package com.fzzy.mqtt; import lombok.extern.slf4j.Slf4j; import org.eclipse.paho.client.mqttv3.*; import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Configuration; import javax.annotation.PostConstruct; @Configuration @Slf4j public class MqttConsumerConfig { @Autowired private MqttProperties mqttProperties; /** * 客æ·ç«¯å¯¹è±¡ */ private MqttClient client; /** * å¨beanåå§ååè¿æ¥å°æå¡å¨ */ @PostConstruct public void init() { connect(); } /** * 客æ·ç«¯è¿æ¥æå¡ç«¯ */ public void connect() { try { //å建MQTT客æ·ç«¯å¯¹è±¡ client = new MqttClient(mqttProperties.getHost(), mqttProperties.getClientInId(), new MemoryPersistence()); //è¿æ¥è®¾ç½® MqttConnectOptions options = new MqttConnectOptions(); //æ¯å¦æ¸ 空sessionï¼è®¾ç½®ä¸ºfalse表示æå¡å¨ä¼ä¿ç客æ·ç«¯çè¿æ¥è®°å½ï¼å®¢æ·ç«¯éè¿ä¹åè½è·åå°æå¡å¨å¨å®¢æ·ç«¯æå¼è¿æ¥æé´æ¨éçæ¶æ¯ //设置为trueè¡¨ç¤ºæ¯æ¬¡è¿æ¥å°æå¡ç«¯é½æ¯ä»¥æ°ç身份 options.setCleanSession(true); //è®¾ç½®è¿æ¥ç¨æ·å options.setUserName(mqttProperties.getUsername()); //è®¾ç½®è¿æ¥å¯ç options.setPassword(mqttProperties.getPassword().toCharArray()); //è®¾ç½®è¶ æ¶æ¶é´ï¼åä½ä¸ºç§ options.setConnectionTimeout(10); //设置å¿è·³æ¶é´ åä½ä¸ºç§ï¼è¡¨ç¤ºæå¡å¨æ¯é1.5*20ç§çæ¶é´å客æ·ç«¯åéå¿è·³å¤æå®¢æ·ç«¯æ¯å¦å¨çº¿ options.setKeepAliveInterval(20); //设置é屿¶æ¯çè¯é¢ï¼è¥å®¢æ·ç«¯åæå¡å¨ä¹é´çè¿æ¥æå¤æå¼ï¼æå¡å¨å°åå¸å®¢æ·ç«¯çéå±ä¿¡æ¯ options.setWill("willTopic", (mqttProperties.getClientInId() + "䏿å¡å¨æå¼è¿æ¥").getBytes(), 0, false); //设置åè° client.setCallback(new MqttConsumerCallBack()); client.connect(options); //订é ä¸»é¢ //æ¶æ¯ç级ï¼å䏻颿°ç»ä¸ä¸å¯¹åºï¼æå¡ç«¯å°æç §æå®ç级ç»è®¢é äºä¸»é¢ç客æ·ç«¯æ¨éæ¶æ¯ int[] qos = {1, 1}; //ä¸»é¢ String[] topics = mqttProperties.getTopics().split(","); //订é ä¸»é¢ client.subscribe(topics, qos); } catch (MqttException e) { e.printStackTrace(); } } /** * æå¼è¿æ¥ */ public void disConnect() { try { client.disconnect(); } catch (MqttException e) { e.printStackTrace(); } } /** * 订é ä¸»é¢ */ public void subscribe(String topic, int qos) { try { client.subscribe(topic, qos); } catch (MqttException e) { e.printStackTrace(); } } } src/main/java/com/fzzy/mqtt/MqttController.java
¶Ô±ÈÐÂÎļþ @@ -0,0 +1,30 @@ package com.fzzy.mqtt; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Controller; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.ResponseBody; @Controller public class MqttController { @Autowired private MqttConsumerConfig client; @Autowired private MqttProperties mqttProperties; @RequestMapping("/connect") public @ResponseBody String connect() { client.connect(); return mqttProperties.getClientOutId() + "è¿æ¥å°æå¡å¨"; } @RequestMapping("/disConnect") @ResponseBody public String disConnect() { client.disConnect(); return mqttProperties.getClientOutId() + "䏿å¡å¨æå¼è¿æ¥"; } } src/main/java/com/fzzy/mqtt/MqttGatewayService.java
@@ -1,6 +1,5 @@ package com.fzzy.mqtt; import org.springframework.integration.annotation.MessagingGateway; import org.springframework.integration.mqtt.support.MqttHeaders; import org.springframework.messaging.handler.annotation.Header; src/main/java/com/fzzy/mqtt/MqttInboundConfiguration.java
ÎļþÒÑɾ³ý src/main/java/com/fzzy/mqtt/MqttOutboundConfiguration.java
ÎļþÒÑɾ³ý src/main/java/com/fzzy/mqtt/MqttProperties.java
@@ -22,39 +22,44 @@ /** * ç¨æ·å */ private String clientUsername; private String username; /** * å¯ç */ private String clientPassword; private String password; /** * 客æ·ç«¯Idï¼åä¸å°æå¡å¨ä¸ï¼ä¸å 许åºç°éå¤ç客æ·ç«¯id * 客æ·ç«¯Id-åå¸è Id */ private String clientId; private String clientOutId; /** * 客æ·ç«¯Id-被订é è Id */ private String clientInId; /** * è¶ æ¶æ¶é´ */ private int clientTimeout = 5000; private int timout = 5000; /** * 设置ä¼è¯å¿è·³æ¶é´ åä½ä¸ºç§ æå¡å¨ä¼æ¯é1.5*20ç§çæ¶é´å客æ·ç«¯ * åéä¸ªæ¶æ¯å¤æå®¢æ·ç«¯æ¯å¦å¨çº¿ï¼ä½è¿ä¸ªæ¹æ³å¹¶æ²¡æéè¿çæºå¶ */ private int clientAliveTime = 30000; private int keepAliveInterval = 20; private int clientMaxConnectTime; private int maxConnectTimes = 5; private String clientTopics; private String topics; /** * è¿æ¥æ¹å¼ */ private Integer clientQos = 0; private Integer qos = 0; /** * é»è®¤è¿æ¥ä¸»é¢ï¼ä»¥/#ç»å°¾è¡¨ç¤ºè®¢é ææä»¥testå¼å¤´çä¸»é¢ src/main/java/com/fzzy/mqtt/MqttProviderCallBack.java
¶Ô±ÈÐÂÎļþ @@ -0,0 +1,37 @@ package com.fzzy.mqtt; import lombok.extern.slf4j.Slf4j; import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; import org.eclipse.paho.client.mqttv3.MqttCallback; import org.eclipse.paho.client.mqttv3.MqttMessage; @Slf4j public class MqttProviderCallBack implements MqttCallback { /** * 客æ·ç«¯æå¼è¿æ¥çåè° */ @Override public void connectionLost(Throwable throwable) { System.out.println("䏿å¡å¨æå¼è¿æ¥ï¼å¯éè¿"); } /** * æ¶æ¯å°è¾¾çåè° */ @Override public void messageArrived(String topic, MqttMessage message) throws Exception { System.out.println(String.format("æ¥æ¶æ¶æ¯ä¸»é¢ : %s", topic)); System.out.println(String.format("æ¥æ¶æ¶æ¯Qos : %d", message.getQos())); System.out.println(String.format("æ¥æ¶æ¶æ¯å 容 : %s", new String(message.getPayload()))); System.out.println(String.format("æ¥æ¶æ¶æ¯retained : %b", message.isRetained())); } /** * æ¶æ¯å叿åçåè° */ @Override public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) { System.out.println(String.format("æ¥æ¶æ¶æ¯æå")); } } src/main/java/com/fzzy/mqtt/MqttProviderConfig.java
¶Ô±ÈÐÂÎļþ @@ -0,0 +1,101 @@ package com.fzzy.mqtt; import lombok.extern.slf4j.Slf4j; import org.eclipse.paho.client.mqttv3.*; import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Configuration; import javax.annotation.PostConstruct; @Configuration @Slf4j public class MqttProviderConfig { @Autowired private MqttProperties mqttProperties; /** * 客æ·ç«¯å¯¹è±¡ */ private MqttClient client; /** * * å¨beanåå§ååè¿æ¥å°æå¡å¨ */ @PostConstruct public void init(){ connect(); } /** * 客æ·ç«¯è¿æ¥æå¡ç«¯ */ public void connect(){ try{ //å建MQTT客æ·ç«¯å¯¹è±¡ client = new MqttClient(mqttProperties.getHost(),mqttProperties.getClientOutId(),new MemoryPersistence()); //è¿æ¥è®¾ç½® MqttConnectOptions options = new MqttConnectOptions(); //æ¯å¦æ¸ 空sessionï¼è®¾ç½®false表示æå¡å¨ä¼ä¿ç客æ·ç«¯çè¿æ¥è®°å½ï¼è®¢é 主é¢ï¼qosï¼,客æ·ç«¯éè¿ä¹åè½è·åå°æå¡å¨å¨å®¢æ·ç«¯æå¼è¿æ¥æé´æ¨éçæ¶æ¯ //设置为trueè¡¨ç¤ºæ¯æ¬¡è¿æ¥æå¡å¨é½æ¯ä»¥æ°ç身份 options.setCleanSession(true); //è®¾ç½®è¿æ¥ç¨æ·å options.setUserName(mqttProperties.getUsername()); //è®¾ç½®è¿æ¥å¯ç options.setPassword(mqttProperties.getPassword().toCharArray()); //è®¾ç½®è¶ æ¶æ¶é´ï¼åä½ä¸ºç§ options.setConnectionTimeout(100); //设置å¿è·³æ¶é´ åä½ä¸ºç§ï¼è¡¨ç¤ºæå¡å¨æ¯é 1.5*20ç§çæ¶é´å客æ·ç«¯åéå¿è·³å¤æå®¢æ·ç«¯æ¯å¦å¨çº¿ options.setKeepAliveInterval(20); //设置é屿¶æ¯çè¯é¢ï¼è¥å®¢æ·ç«¯åæå¡å¨ä¹é´çè¿æ¥æå¤æå¼ï¼æå¡å¨å°åå¸å®¢æ·ç«¯çéå±ä¿¡æ¯ options.setWill("willTopic",(mqttProperties.getClientOutId() + "䏿å¡å¨æå¼è¿æ¥").getBytes(),0,false); //设置åè° client.setCallback(new MqttProviderCallBack()); client.connect(options); } catch(MqttException e){ e.printStackTrace(); } } public void publish(String topic,String message){ MqttMessage mqttMessage = new MqttMessage(); mqttMessage.setQos(mqttProperties.getQos()); mqttMessage.setRetained(true); mqttMessage.setPayload(message.getBytes()); //主é¢çç®çå°ï¼ç¨äºåå¸/订é ä¿¡æ¯ MqttTopic mqttTopic = client.getTopic(topic); //æä¾ä¸ç§æºå¶æ¥è·è¸ªæ¶æ¯çä¼ éè¿åº¦ //ç¨äºå¨ä»¥éé»å¡æ¹å¼ï¼å¨åå°è¿è¡ï¼æ§è¡å叿¯è·è¸ªæ¶æ¯çä¼ éè¿åº¦ MqttDeliveryToken token; try { //尿宿¶æ¯åå¸å°ä¸»é¢ï¼ä½ä¸çå¾ æ¶æ¯ä¼ é宿ï¼è¿åçtokenå¯ç¨äºè·è¸ªæ¶æ¯çä¼ éç¶æ //䏿¦æ¤æ¹æ³å¹²åå°è¿åï¼æ¶æ¯å°±å·²è¢«å®¢æ·ç«¯æ¥ååå¸ï¼å½è¿æ¥å¯ç¨ï¼å°å¨åå°å®ææ¶æ¯ä¼ éã token = mqttTopic.publish(mqttMessage); token.waitForCompletion(); } catch (MqttException e) { e.printStackTrace(); } } public void publish(int qos,boolean retained,String topic,String message){ MqttMessage mqttMessage = new MqttMessage(); mqttMessage.setQos(qos); mqttMessage.setRetained(retained); mqttMessage.setPayload(message.getBytes()); //主é¢çç®çå°ï¼ç¨äºåå¸/订é ä¿¡æ¯ MqttTopic mqttTopic = client.getTopic(topic); //æä¾ä¸ç§æºå¶æ¥è·è¸ªæ¶æ¯çä¼ éè¿åº¦ //ç¨äºå¨ä»¥éé»å¡æ¹å¼ï¼å¨åå°è¿è¡ï¼æ§è¡å叿¯è·è¸ªæ¶æ¯çä¼ éè¿åº¦ MqttDeliveryToken token; try { //尿宿¶æ¯åå¸å°ä¸»é¢ï¼ä½ä¸çå¾ æ¶æ¯ä¼ é宿ï¼è¿åçtokenå¯ç¨äºè·è¸ªæ¶æ¯çä¼ éç¶æ //䏿¦æ¤æ¹æ³å¹²åå°è¿åï¼æ¶æ¯å°±å·²è¢«å®¢æ·ç«¯æ¥ååå¸ï¼å½è¿æ¥å¯ç¨ï¼å°å¨åå°å®ææ¶æ¯ä¼ éã token = mqttTopic.publish(mqttMessage); token.waitForCompletion(); } catch (MqttException e) { e.printStackTrace(); } } } src/main/java/com/fzzy/mqtt/MqttPubController.java
@@ -2,34 +2,25 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.ResponseBody; import org.springframework.web.bind.annotation.RestController; @RestController public class MqttPubController { @Autowired private MqttGatewayService gatewayService; private MqttProviderConfig providerClient; @RequestMapping("/hello") public String hello() { return "hello!"; @RequestMapping("/sendMessage") public @ResponseBody String sendMessage(String topic, String message) { try { providerClient.publish(topic, message); return "åéæå"; } catch (Exception e) { e.printStackTrace(); return "åé失败"; } } @RequestMapping("/sendMqtt") public String sendMqtt(String sendData) { System.out.println(sendData); System.out.println("è¿å ¥sendMqtt-------" + sendData); gatewayService.sendToMqtt("topic01", (String) sendData); return "Test is OK"; } @RequestMapping("/sendMqttTopic") public String sendMqtt(String sendData, String topic) { //System.out.println(sendData+" "+topic); //System.out.println("è¿å ¥inboundåéï¼"+sendData); gatewayService.sendToMqtt(topic, (String) sendData); return "Test is OK"; } } src/main/java/com/fzzy/mqtt/MqttPublishService.java
@@ -9,6 +9,7 @@ @Service public class MqttPublishService { private static MqttClient client ; src/main/resources/application-dev.yml
@@ -96,15 +96,3 @@ auto-offset-reset: latest key-deserializer: org.apache.kafka.common.serialization.StringDeserializer value-deserializer: org.apache.kafka.common.serialization.StringDeserializer mqtt: host: tcp://10.13.4.84:11883 client-id: client-username: client-password: client-timeout: 10 client-alive-time: 20 client-max-connect-times: 5 client-topics: client-qos: 0 isOpen: false src/main/resources/application-devGateway.yml
@@ -70,13 +70,16 @@ value-deserializer: org.apache.kafka.common.serialization.StringDeserializer mqtt: host: tcp://10.13.4.84:11883 client-id: client-username: admin client-password: 123456 client-timeout: 10 client-alive-time: 20 client-max-connect-times: 5 client-topics: "/device/${productId}/${deviceId}/message/property/report,/device-message-sender/${productId}/${deviceId}" client-qos: 0 host: tcp://127.0.0.1:1883 username: admin password: pwdmqtt.. client-outId: fzzy-customer-id client-inId: fzzy-6e3d92ff71b911eea5e50250f2000002 timeout: 10 keep-alive-interval: 20 max-connect-times: 5 qos: 0 isOpen: false default: topic: testTopic topics: "/+/+/properties/report,/device-message-sender/+/+"