From 0a8ef890592a9c0389c42daeebb9a3f0974c37e6 Mon Sep 17 00:00:00 2001 From: jiazx0107@163.com <jiazx0107@163.com> Date: 星期三, 08 十一月 2023 14:59:41 +0800 Subject: [PATCH] 调整MQTT配置 --- src/main/java/com/fzzy/gateway/hx2023/websocket/WebSockDeviceMessageSender.java | 50 +++ src/main/java/com/fzzy/gateway/service/GatewayDeviceService2.java | 62 ++++ src/main/java/com/fzzy/mqtt/MqttConsumerCallBack.java | 36 ++ src/main/java/com/fzzy/mqtt/MqttConsumerConfig.java | 89 ++++++ src/main/java/com/fzzy/gateway/util/GatewayHttpUtil.java | 4 src/main/java/com/fzzy/gateway/service/GatewayDeviceService.java | 36 +- src/main/java/com/fzzy/mqtt/MqttProviderConfig.java | 101 +++++++ src/main/java/com/fzzy/mqtt/MqttProperties.java | 23 + src/main/java/com/fzzy/gateway/view/GatewayDevice.view.xml | 30 + src/main/java/com/fzzy/gateway/entity/GatewayDevice.java | 4 src/main/java/com/fzzy/gateway/hx2023/websocket/WebSockDeviceMessageReport.java | 50 +++ /dev/null | 58 ---- src/main/resources/application-devGateway.yml | 21 src/main/java/com/fzzy/mqtt/MqttProviderCallBack.java | 37 ++ src/main/java/com/fzzy/mqtt/MqttController.java | 30 ++ src/main/java/com/fzzy/gateway/hx2023/controller/GatewayController.java | 42 ++ src/main/java/com/fzzy/gateway/util/GatewayRSAUtils.java | 3 src/main/java/com/fzzy/gateway/view/GatewayConf.view.xml | 2 src/main/java/com/fzzy/mqtt/MqttGatewayService.java | 1 src/main/java/com/fzzy/mqtt/MqttPubController.java | 33 - src/main/resources/application-dev.yml | 12 src/main/java/com/fzzy/mqtt/MqttPublishService.java | 1 22 files changed, 567 insertions(+), 158 deletions(-) diff --git a/src/main/java/com/fzzy/gateway/entity/GatewayDevice.java b/src/main/java/com/fzzy/gateway/entity/GatewayDevice.java index 32531b0..c5fc2e3 100644 --- a/src/main/java/com/fzzy/gateway/entity/GatewayDevice.java +++ b/src/main/java/com/fzzy/gateway/entity/GatewayDevice.java @@ -37,6 +37,10 @@ @PropertyDef(label = "鍚嶇О") private String deviceName; + @Column(name = "PRODUCT_ID_", length = 50) + @PropertyDef(label = "璁惧绫诲瀷KEY") + private String productId; + @Column(name = "TYPE_", length = 10) @PropertyDef(label = "璁惧绫诲瀷") private String type; diff --git a/src/main/java/com/fzzy/gateway/hx2023/controller/GatewayController.java b/src/main/java/com/fzzy/gateway/hx2023/controller/GatewayController.java index 8ad1617..20cdbc2 100644 --- a/src/main/java/com/fzzy/gateway/hx2023/controller/GatewayController.java +++ b/src/main/java/com/fzzy/gateway/hx2023/controller/GatewayController.java @@ -4,12 +4,15 @@ import com.fzzy.api.utils.ContextUtil; import com.fzzy.api.utils.RedisConst; import com.fzzy.api.utils.RedisUtil; +import com.fzzy.gateway.entity.GatewayConf; import com.fzzy.gateway.hx2023.data.GatewayAuthData; +import com.fzzy.gateway.service.GatewayConfService; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Controller; import org.springframework.web.bind.annotation.*; import javax.annotation.Resource; +import java.util.List; /** * @@ -21,6 +24,9 @@ @Resource private RedisUtil redisUtil; + @Resource + private GatewayConfService confService; + /** * 閴存潈鎺ュ彛 @@ -39,28 +45,44 @@ public @ResponseBody JSONObject authorize(@RequestBody GatewayAuthData data) { - log.debug("============閴存潈==========={}--{}", data.getUsername(), data.getPassword()); + + List<GatewayConf> list = confService.getCacheConfList(); + + JSONObject json = new JSONObject(); + json.put("timestamp", System.currentTimeMillis()); + if (null == list || list.isEmpty()) { + json.put("code", 500); + json.put("message", "鏈幏鍙栫綉鍏充俊鎭�"); + return json; + } + + String gatewayId = null; + for (GatewayConf conf : list) { + if (data.getUsername().equals(conf.getGatewayUsername()) && data.getPassword().equals(conf.getGatewayPassword())) { + gatewayId = conf.getGatewayId(); + break; + } + } + + if (null == gatewayId) { + json.put("code", 500); + json.put("message", "鏈尮閰嶅埌鐢ㄦ埛鍚嶅拰瀵嗙爜"); + return json; + } - //TODO 楠岃瘉鐢ㄦ埛鍚嶅拰瀵嗙爜 + String token = "fzzy-" + gatewayId; - - String token = ContextUtil.getUUID(); + log.debug("============閴存潈==========={}--{}--{}", data.getUsername(), data.getPassword(), token); this.updateGatewayToken(token, data.getUsername()); - JSONObject json = new JSONObject(); - JSONObject result = new JSONObject(); - result.put("token", token); - json.put("result", result); json.put("message", "鎴愬姛"); json.put("status", 0); json.put("code", 200); - json.put("timestamp", System.currentTimeMillis()); - return json; } diff --git a/src/main/java/com/fzzy/gateway/hx2023/websocket/SocketService.java b/src/main/java/com/fzzy/gateway/hx2023/websocket/SocketService.java deleted file mode 100644 index 1025124..0000000 --- a/src/main/java/com/fzzy/gateway/hx2023/websocket/SocketService.java +++ /dev/null @@ -1,102 +0,0 @@ -package com.fzzy.gateway.hx2023.websocket; - - -import com.alibaba.fastjson.JSON; -import com.alibaba.fastjson.JSONObject; -import org.springframework.stereotype.Component; -import org.springframework.stereotype.Service; -import org.springframework.web.bind.annotation.CrossOrigin; - -import javax.websocket.*; -import javax.websocket.server.PathParam; -import javax.websocket.server.ServerEndpoint; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.concurrent.CopyOnWriteArraySet; - -@Component -@CrossOrigin -@Service -@ServerEndpoint(value = "/mqtt") -public class SocketService { - - /** - * 鐢ㄦ潵瀛樻斁姣忎釜瀹㈡埛绔搴旂殑MyWebSocket瀵硅薄銆� - **/ - private static CopyOnWriteArraySet<SocketService> socketSet = new CopyOnWriteArraySet<>(); - /** - * 涓庢煇涓鎴风鐨勮繛鎺ヤ細璇濓紝闇�瑕侀�氳繃瀹冩潵缁欏鎴风鍙戦�佹暟鎹� - **/ - private Session session; - /** - * 鐢ㄦ埛鍚嶇О - **/ - private String nickname; - /** - * 鐢ㄦ潵璁板綍sessionId鍜岃session杩涜缁戝畾 - **/ - private static Map<String,Session> map = new HashMap<String, Session>(); - - /** - * 杩炴帴寤虹珛鎴愬姛璋冪敤鐨勬柟娉� - */ - @OnOpen - public void onOpen(Session session,@PathParam("nickname") String nickname) { - this.session = session; - this.nickname=nickname; - - map.put(nickname, session); - socketSet.add(this); - - System.out.println("鏈夋柊杩炴帴鍔犲叆:"+nickname+",褰撳墠鍦ㄧ嚎浜烘暟涓�" + socketSet.size()); - } - - /** - * 杩炴帴鍏抽棴璋冪敤鐨勬柟娉� - */ - @OnClose - public void onClose() { - socketSet.remove(this); - List<String> nickname = this.session.getRequestParameterMap().get("nickname"); - for(String nick:nickname) { - map.remove(nick); - } - System.out.println("鏈変竴杩炴帴鍏抽棴锛佸綋鍓嶅湪绾夸汉鏁颁负" + socketSet.size()); - } - - /** - * 鏀跺埌瀹㈡埛绔秷鎭悗璋冪敤鐨勬柟娉� - */ - @OnMessage - public void onMessage(String message, Session session,@PathParam("nickname") String nickname) { - System.out.println("鏉ヨ嚜瀹㈡埛绔殑娑堟伅-->"+nickname+": " + message); - //灏唌qtt鍙戦�佽繃鏉ョ殑鏁版嵁杩斿洖缁欏墠绔� - try { -// JSONObject parse = JSONObject.parse(message); -// Map<String,Object> valuesMap = (Map<String, Object>) parse.getByPath("values"); -// String tag1 = valuesMap.get("tag1").toString(); -// String replace = tag1.replace("\r\n", ""); -// String substring = replace.substring(replace.indexOf(":") + 1).trim().replace(" ",":"); -// String tag2 = valuesMap.get("tag2").toString(); -// String substring1=substring+"tag2:"+tag2; -// String result="{"+substring1+"}"; -// //鍙戦�佺粰鍓嶇锛岀敤浣滈〉闈㈡覆鏌� -// Session fromSession = map.get(nickname); -// fromSession.getAsyncRemote().sendText(result); - } catch (Exception e) { - e.printStackTrace(); - } - } - - /** - * 鍙戠敓閿欒鏃惰皟鐢� - */ - @OnError - public void onError(Session session, Throwable error) { - System.out.println("鍙戠敓閿欒"); - error.printStackTrace(); - } - - -} diff --git a/src/main/java/com/fzzy/gateway/hx2023/websocket/WebSockDeviceMessageReport.java b/src/main/java/com/fzzy/gateway/hx2023/websocket/WebSockDeviceMessageReport.java new file mode 100644 index 0000000..a59ba47 --- /dev/null +++ b/src/main/java/com/fzzy/gateway/hx2023/websocket/WebSockDeviceMessageReport.java @@ -0,0 +1,50 @@ +package com.fzzy.gateway.hx2023.websocket; + +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; + +import javax.websocket.*; +import javax.websocket.server.PathParam; +import javax.websocket.server.ServerEndpoint; + + +/** + * 涓嬪彂璋冪敤绮俯妫�娴嬫帴鍙f寚浠� topic + */ +@Slf4j +@Component +@ServerEndpoint(value = "/{productId}/{deviceId}/properties/report") +public class WebSockDeviceMessageReport { + + @OnOpen + public void onOpen(Session session, + @PathParam("productId") String productId, + @PathParam("deviceId") String deviceId + ) throws Exception { + + log.info("--------涓嬪彂璋冪敤绮俯妫�娴嬫帴鍙f寚浠� topic------"); + } + + @OnClose + public void onClose() { + + log.info("WebSocket杩炴帴鍏抽棴={}"); + } + + /** + * 鏀跺埌鍓嶇鍙戦�佺殑淇℃伅 + * + * @param message + * @param session + */ + @OnMessage + public void onMessage(String message, Session session) { + + log.info("鏉ヨ嚜鍓嶇鐨勪俊鎭�:\n" + message); + } + + @OnError + public void onError(Session session, Throwable error) { + log.error("鍙戠敓閿欒"); + } +} diff --git a/src/main/java/com/fzzy/gateway/hx2023/websocket/WebSockDeviceMessageSender.java b/src/main/java/com/fzzy/gateway/hx2023/websocket/WebSockDeviceMessageSender.java new file mode 100644 index 0000000..a1fbce5 --- /dev/null +++ b/src/main/java/com/fzzy/gateway/hx2023/websocket/WebSockDeviceMessageSender.java @@ -0,0 +1,50 @@ +package com.fzzy.gateway.hx2023.websocket; + +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; + +import javax.websocket.*; +import javax.websocket.server.PathParam; +import javax.websocket.server.ServerEndpoint; + + +/** + * 涓嬪彂璋冪敤绮俯妫�娴嬫帴鍙f寚浠� topic + */ +@Slf4j +@Component +@ServerEndpoint(value = "/device-message-sender/{productId}/{deviceId}") +public class WebSockDeviceMessageSender { + + @OnOpen + public void onOpen(Session session, + @PathParam("productId") String productId, + @PathParam("deviceId") String deviceId + ) throws Exception { + + log.info("--------涓嬪彂璋冪敤绮俯妫�娴嬫帴鍙f寚浠� topic------"); + } + + @OnClose + public void onClose() { + + log.info("WebSocket杩炴帴鍏抽棴={}"); + } + + /** + * 鏀跺埌鍓嶇鍙戦�佺殑淇℃伅 + * + * @param message + * @param session + */ + @OnMessage + public void onMessage(String message, Session session) { + + log.info("鏉ヨ嚜鍓嶇鐨勪俊鎭�:\n" + message); + } + + @OnError + public void onError(Session session, Throwable error) { + log.error("鍙戠敓閿欒"); + } +} diff --git a/src/main/java/com/fzzy/gateway/hx2023/websocket/WebSocketDeviceLed.java b/src/main/java/com/fzzy/gateway/hx2023/websocket/WebSocketDeviceLed.java deleted file mode 100644 index 70bed3b..0000000 --- a/src/main/java/com/fzzy/gateway/hx2023/websocket/WebSocketDeviceLed.java +++ /dev/null @@ -1,117 +0,0 @@ -package com.fzzy.gateway.hx2023.websocket; - -import com.alibaba.fastjson.JSONObject; -import com.fzzy.gateway.GatewayUtils; -import com.fzzy.gateway.hx2023.data.WebSocketPacket; -import lombok.extern.slf4j.Slf4j; -import org.apache.commons.lang.StringUtils; -import org.springframework.stereotype.Component; - -import javax.websocket.*; -import javax.websocket.server.PathParam; -import javax.websocket.server.ServerEndpoint; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; - - -/** - * 缃戝叧鎺ュ彈绯荤粺鍙戦�佺殑鎶ユ枃淇℃伅 - */ -@Slf4j -@Component -@ServerEndpoint(value = "/device-message-sender/{productId}/{deviceId}") -public class WebSocketDeviceLed { - - private static Map<String, Session> sessionPool = new ConcurrentHashMap<>(); - private static Map<String, String> sessionIds = new ConcurrentHashMap<>(); - - // 涓庢煇涓鎴风鐨勮繛鎺ヤ細璇濓紝闇�瑕侀�氳繃瀹冩潵缁欏鎴风鍙戦�佹暟鎹� - private Session session; - - - @OnOpen - public void onOpen(Session session, - @PathParam("productId") String productId, - @PathParam("deviceId") String deviceId, - @PathParam("clientId") String clientId - ) throws Exception { - - this.session = session; - - String key = productId + "-" + deviceId; - - sessionPool.put(key, session); - sessionIds.put(session.getId(), key); - - GatewayUtils.updateOnline(deviceId); - - log.info("new webSocket,clientId={}", key); - } - - @OnClose - public void onClose() { - - String key = sessionIds.get(session.getId()); - - sessionPool.remove(key); - sessionIds.remove(session.getId()); - - String deviceId = key.substring(0, key.indexOf("-")); - - GatewayUtils.updateOffOnline(deviceId); - - log.info("WebSocket杩炴帴鍏抽棴={}", key); - - - } - - /** - * 鏀跺埌鍓嶇鍙戦�佺殑淇℃伅 - * - * @param message - * @param session - */ - @OnMessage - public void onMessage(String message, Session session) { - - log.info("鏉ヨ嚜鍓嶇鐨勪俊鎭�:\n" + message); - } - - @OnError - public void onError(Session session, Throwable error) { - log.error("鍙戠敓閿欒"); - - String key = sessionIds.get(session.getId()); - - String deviceId = key.substring(0, key.indexOf("-")); - - GatewayUtils.updateOffOnline(deviceId); - - sessionPool.remove(key); - sessionIds.remove(session.getId()); - error.printStackTrace(); - } - - - /** - * @param packet - */ - public static void sendByPacket(WebSocketPacket packet) { - if (StringUtils.isEmpty(packet.getDeviceId())) { - log.error("WebSocket淇℃伅鎺ㄩ�佸け璐ワ紝璁惧缂栫爜涓虹┖銆�"); - return; - } - - String tag = packet.getDeviceId(); - - // 閬嶅巻鎺ㄩ�� - Session session; - for (String key : sessionPool.keySet()) { - if (key.indexOf(tag) != -1) { - session = sessionPool.get(key); - session.getAsyncRemote().sendText( - JSONObject.toJSONString(packet)); - } - } - } -} diff --git a/src/main/java/com/fzzy/gateway/hx2023/websocket/WebSocketMqtt.java b/src/main/java/com/fzzy/gateway/hx2023/websocket/WebSocketMqtt.java deleted file mode 100644 index 9d9756c..0000000 --- a/src/main/java/com/fzzy/gateway/hx2023/websocket/WebSocketMqtt.java +++ /dev/null @@ -1,135 +0,0 @@ -package com.fzzy.gateway.hx2023.websocket; - -import com.alibaba.fastjson.JSONObject; -import com.fzzy.gateway.GatewayUtils; -import com.fzzy.gateway.hx2023.data.WebSocketPacket; -import lombok.extern.slf4j.Slf4j; - -import org.apache.commons.lang.StringUtils; -import org.springframework.stereotype.Component; - -import javax.websocket.*; -import javax.websocket.server.PathParam; -import javax.websocket.server.ServerEndpoint; - -import java.util.List; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; - -/** - * - */ -@Slf4j -@Component -//@ServerEndpoint(value = "/mqtt") -public class WebSocketMqtt { - - private static Map<String, Session> sessionPool = new ConcurrentHashMap<>(); - private static Map<String, String> sessionIds = new ConcurrentHashMap<>(); - - // 涓庢煇涓鎴风鐨勮繛鎺ヤ細璇濓紝闇�瑕侀�氳繃瀹冩潵缁欏鎴风鍙戦�佹暟鎹� - private Session session; - - - @OnOpen - public void onOpen(Session session) throws Exception { - - this.session = session; - - Map<String, List<String>> params = session.getRequestParameterMap(); - - log.info("new webSocket,params={}", params); - } - - public void onOpen2(Session session, - @PathParam("keepalive") String keepalive, - @PathParam("clientId") String clientId, - @PathParam("protocolId") String protocolId, - @PathParam("protocolVersion") String protocolVersion, - @PathParam("clean") boolean clean, - @PathParam("reconnectPeriod") int reconnectPeriod, - @PathParam("connectTimeout") int connectTimeout - ) throws Exception { - - this.session = session; - - String key = clientId; - - sessionPool.put(key, session); - sessionIds.put(session.getId(), key); - - GatewayUtils.updateOnline(clientId); - - log.info("new webSocket,clientId={}", key); - } - - @OnClose - public void onClose() { - -// String key = sessionIds.get(session.getId()); -// -// String deviceId = key.substring(key.indexOf("-")); -// -// GatewayUtils.updateOffOnline(deviceId); -// -// sessionPool.remove(key); -// sessionIds.remove(session.getId()); - - log.info("WebSocket杩炴帴鍏抽棴={}","----------"); - - } - - /** - * 鏀跺埌鍓嶇鍙戦�佺殑淇℃伅 - * - * @param message - * @param session - */ - @OnMessage - public void onMessage(String message, Session session) { - - log.info("鏉ヨ嚜瀹㈡埛绔俊鎭�:\n" + message); - } - - @OnError - public void onError(Session session, Throwable error) { - log.error("鍙戠敓閿欒"); - -// String clientId = sessionIds.get(session.getId()); -// -// sessionPool.remove(clientId); -// sessionIds.remove(session.getId()); - error.printStackTrace(); - } - - - /** - * 鎺ㄩ�佷俊鎭埌鍓嶇 - * - * @param packet - */ - public void sendByPacket(WebSocketPacket packet) { - if (StringUtils.isEmpty(packet.getDeviceId())) { - log.error("WebSocket淇℃伅鎺ㄩ�佸け璐ワ紝璁惧缂栫爜涓虹┖銆�"); - return; - } - - String tag = packet.getDeviceId(); - - // 閬嶅巻鎺ㄩ�� - Session session; - for (String key : sessionPool.keySet()) { - - packet.getHeaders().setProductId(key); - - log.debug("----------websocket杩斿洖淇℃伅-----{}", packet); - - if (key.indexOf(tag) != -1) { - session = sessionPool.get(key); - session.getAsyncRemote().sendText(JSONObject.toJSONString(packet)); - } - } - } - - -} diff --git a/src/main/java/com/fzzy/gateway/service/GatewayDeviceService.java b/src/main/java/com/fzzy/gateway/service/GatewayDeviceService.java index 5e9b45c..2fc86ac 100644 --- a/src/main/java/com/fzzy/gateway/service/GatewayDeviceService.java +++ b/src/main/java/com/fzzy/gateway/service/GatewayDeviceService.java @@ -16,7 +16,6 @@ import com.fzzy.gateway.entity.GatewayDevice; import com.fzzy.gateway.hx2023.ScConstant; import com.fzzy.gateway.hx2023.data.*; -import com.fzzy.gateway.hx2023.websocket.WebSocketDeviceReport; import com.fzzy.gateway.service.repository.GatewayDeviceRep; import com.fzzy.mqtt.MqttPublishService; @@ -62,28 +61,27 @@ /** * gatewayDeviceService#updateSave * - * @param entity + * @param data */ @DataResolver - public void updateSave(GatewayDevice entity) { - GatewayDevice data = new GatewayDevice(); - BeanUtils.copyProperties(entity, data); - - if (null == data.getId()) { - data.setId(ContextUtil.getUUID()); - } - - - if (null == data.getDeviceSn()) { - if (null != entity.getIp()) { - data.setDeviceSn(entity.getIp()); + public void updateSave(GatewayDevice data) { + GatewayDevice data2 = new GatewayDevice(); + BeanUtils.copyProperties(data, data2); + + if (null == data2.getDeviceSn()) { + if (null != data2.getIp()) { + data.setDeviceSn(data2.getIp()); } else { - data.setDeviceSn(data.getDeviceId()); + data.setDeviceSn(data2.getDeviceId()); } } - gatewayDeviceRep.save(data); - + if (null == data2.getId()) { + data2.setId(ContextUtil.getUUID()); + gatewayDeviceRep.save(data2); + }else{ + gatewayDeviceRep.save(data2); + } flushCache(); } @@ -105,10 +103,6 @@ } - /** - * gatewayDeviceService#flushCache - */ - @Expose public void flushCache() { List<GatewayDevice> list = listAll(); if (null == list || list.isEmpty()) return; diff --git a/src/main/java/com/fzzy/gateway/service/GatewayDeviceService2.java b/src/main/java/com/fzzy/gateway/service/GatewayDeviceService2.java new file mode 100644 index 0000000..42f0067 --- /dev/null +++ b/src/main/java/com/fzzy/gateway/service/GatewayDeviceService2.java @@ -0,0 +1,62 @@ +package com.fzzy.gateway.service; + +import com.bstek.dorado.annotation.DataResolver; +import com.fzzy.api.utils.ContextUtil; +import com.fzzy.gateway.GatewayUtils; +import com.fzzy.gateway.entity.GatewayDevice; +import com.fzzy.gateway.service.repository.GatewayDeviceRep; +import org.springframework.beans.BeanUtils; +import org.springframework.data.domain.Sort; +import org.springframework.stereotype.Component; + +import javax.annotation.Resource; +import java.util.List; + +@Component +public class GatewayDeviceService2 { + + @Resource + private GatewayDeviceRep gatewayDeviceRep; + + public List<GatewayDevice> listAll() { + Sort sort = new Sort(Sort.Direction.ASC, "deviceId"); + return gatewayDeviceRep.findAll(sort); + } + + + /** + * gatewayDeviceService2#updateSave + * + * @param data + */ + @DataResolver + public void updateSave(GatewayDevice data) { + GatewayDevice data2 = new GatewayDevice(); + BeanUtils.copyProperties(data, data2); + + if (null == data2.getDeviceSn()) { + if (null != data2.getIp()) { + data.setDeviceSn(data2.getIp()); + } else { + data.setDeviceSn(data2.getDeviceId()); + } + } + + if (null == data2.getId()) { + data2.setId(ContextUtil.getUUID()); + gatewayDeviceRep.save(data2); + } else { + gatewayDeviceRep.save(data2); + } + flushCache(); + } + + public void flushCache() { + List<GatewayDevice> list = listAll(); + if (null == list || list.isEmpty()) return; + for (GatewayDevice device : list) { + GatewayUtils.add2Cache(device); + } + } + +} diff --git a/src/main/java/com/fzzy/gateway/util/GatewayHttpUtil.java b/src/main/java/com/fzzy/gateway/util/GatewayHttpUtil.java index 30359de..8a3b796 100644 --- a/src/main/java/com/fzzy/gateway/util/GatewayHttpUtil.java +++ b/src/main/java/com/fzzy/gateway/util/GatewayHttpUtil.java @@ -9,12 +9,8 @@ import org.apache.http.impl.client.HttpClients; import org.apache.http.util.EntityUtils; -import java.io.*; -import java.net.HttpURLConnection; -import java.net.URL; import java.net.URLEncoder; import java.util.Map; -import java.util.Set; /** * 缃戝叧涓撶敤HTTP璇锋眰宸ュ叿绫� diff --git a/src/main/java/com/fzzy/gateway/util/GatewayRSAUtils.java b/src/main/java/com/fzzy/gateway/util/GatewayRSAUtils.java index c81de2f..d504333 100644 --- a/src/main/java/com/fzzy/gateway/util/GatewayRSAUtils.java +++ b/src/main/java/com/fzzy/gateway/util/GatewayRSAUtils.java @@ -1,8 +1,6 @@ package com.fzzy.gateway.util; -import lombok.extern.slf4j.Slf4j; - import javax.crypto.Cipher; import java.io.ByteArrayOutputStream; import java.nio.charset.StandardCharsets; @@ -15,7 +13,6 @@ import java.util.HashMap; import java.util.Map; -@Slf4j public class GatewayRSAUtils { /** * RSA鏈�澶у姞瀵嗘槑鏂囧ぇ灏� 2048/8-11 diff --git a/src/main/java/com/fzzy/gateway/view/GatewayConf.view.xml b/src/main/java/com/fzzy/gateway/view/GatewayConf.view.xml index 324c4ba..10ffeb0 100644 --- a/src/main/java/com/fzzy/gateway/view/GatewayConf.view.xml +++ b/src/main/java/com/fzzy/gateway/view/GatewayConf.view.xml @@ -392,7 +392,7 @@ <Buttons> <Button> <ClientEvent name="onClick">var data = view.get("#dsQuery.data");
 -view.get("#ajaxTestWeight").set("parameter",data.get("weight")).execute(function(result){
 +view.get("#ajaxTestWeight").set("parameter",data.get("weight")).execute(function(result){
 self.get("parent").hide();
 $alert(result);
 });</ClientEvent> diff --git a/src/main/java/com/fzzy/gateway/view/GatewayDevice.view.xml b/src/main/java/com/fzzy/gateway/view/GatewayDevice.view.xml index fc41c88..cccd044 100644 --- a/src/main/java/com/fzzy/gateway/view/GatewayDevice.view.xml +++ b/src/main/java/com/fzzy/gateway/view/GatewayDevice.view.xml @@ -107,8 +107,12 @@ <Property name="label">璁惧瀵嗙爜</Property> </PropertyDef> <PropertyDef name="depotIdSys"> + <Property></Property> + <Property name="label">鑷畾涔変粨搴撶紪鐮�</Property> + </PropertyDef> + <PropertyDef name="productId"> <Property/> - <Property name="label">搴撳尯绯荤粺浠撳簱缂栫爜</Property> + <Property name="label">璁惧绫诲瀷KEY</Property> </PropertyDef> </DataType> </Model> @@ -182,6 +186,9 @@ <Property name="property">type</Property> <Property name="align">center</Property> </DataColumn> + <DataColumn name="productId"> + <Property name="property">productId</Property> + </DataColumn> <DataColumn name="deviceId"> <Property name="property">deviceId</Property> <Property name="align">center</Property> @@ -203,8 +210,7 @@ <Property name="closeable">false</Property> <Buttons> <Button> - <ClientEvent name="onClick">var cur = view.get("#dgMain").getCurrentItem();
 -view.get("#updateSave").execute(function(){
 + <ClientEvent name="onClick">view.get("#updateSave").execute(function(){
 self.get("parent").hide();
 });</ClientEvent> <Property name="caption">淇濆瓨淇敼</Property> @@ -250,6 +256,11 @@ <Editor/> </AutoFormElement> <AutoFormElement> + <Property name="name">productId</Property> + <Property name="property">productId</Property> + <Editor/> + </AutoFormElement> + <AutoFormElement> <Property name="name">orgId</Property> <Property name="property">orgId</Property> <Editor/> @@ -257,6 +268,11 @@ <AutoFormElement> <Property name="name">depotId</Property> <Property name="property">depotId</Property> + <Editor/> + </AutoFormElement> + <AutoFormElement> + <Property name="name">depotIdSys</Property> + <Property name="property">depotIdSys</Property> <Editor/> </AutoFormElement> <AutoFormElement> @@ -310,11 +326,6 @@ <Property name="property">httpUrl</Property> <Editor/> </AutoFormElement> - <AutoFormElement> - <Property name="name">depotIdSys</Property> - <Property name="property">depotIdSys</Property> - <Editor/> - </AutoFormElement> <AutoFormElement layoutConstraint="colSpan:3"> <Property name="name">remark</Property> <Property name="property">remark</Property> @@ -327,10 +338,11 @@ <Tools/> </Dialog> <UpdateAction id="updateSave"> - <Property name="dataResolver">gatewayDeviceService#updateSave</Property> + <Property name="dataResolver">gatewayDeviceService2#updateSave</Property> <UpdateItem> <Property name="dataPath">[#current]</Property> <Property name="dataSet">dsMain</Property> + <Property name="alias">data</Property> </UpdateItem> </UpdateAction> <AjaxAction id="ajaxDel"> diff --git a/src/main/java/com/fzzy/mqtt/MqttConsumerCallBack.java b/src/main/java/com/fzzy/mqtt/MqttConsumerCallBack.java new file mode 100644 index 0000000..cbf2aca --- /dev/null +++ b/src/main/java/com/fzzy/mqtt/MqttConsumerCallBack.java @@ -0,0 +1,36 @@ +package com.fzzy.mqtt; + +import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; +import org.eclipse.paho.client.mqttv3.MqttCallback; +import org.eclipse.paho.client.mqttv3.MqttMessage; + +public class MqttConsumerCallBack implements MqttCallback { + + /** + * 瀹㈡埛绔柇寮�杩炴帴鐨勫洖璋� + */ + @Override + public void connectionLost(Throwable throwable) { + System.out.println("涓庢湇鍔″櫒鏂紑杩炴帴锛屽彲閲嶈繛"); + } + + /** + * 娑堟伅鍒拌揪鐨勫洖璋� + */ + @Override + public void messageArrived(String topic, MqttMessage message) throws Exception { + + System.out.println(String.format("鎺ユ敹娑堟伅涓婚 : %s", topic)); + System.out.println(String.format("鎺ユ敹娑堟伅Qos : %d", message.getQos())); + System.out.println(String.format("鎺ユ敹娑堟伅鍐呭 : %s", new String(message.getPayload()))); + System.out.println(String.format("鎺ユ敹娑堟伅retained : %b", message.isRetained())); + } + + /** + * 娑堟伅鍙戝竷鎴愬姛鐨勫洖璋� + */ + @Override + public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) { + System.out.println(String.format("鎺ユ敹娑堟伅鎴愬姛")); + } +} diff --git a/src/main/java/com/fzzy/mqtt/MqttConsumerConfig.java b/src/main/java/com/fzzy/mqtt/MqttConsumerConfig.java new file mode 100644 index 0000000..b4c6f6c --- /dev/null +++ b/src/main/java/com/fzzy/mqtt/MqttConsumerConfig.java @@ -0,0 +1,89 @@ +package com.fzzy.mqtt; + +import lombok.extern.slf4j.Slf4j; +import org.eclipse.paho.client.mqttv3.*; +import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.Configuration; + +import javax.annotation.PostConstruct; + +@Configuration +@Slf4j +public class MqttConsumerConfig { + + @Autowired + private MqttProperties mqttProperties; + /** + * 瀹㈡埛绔璞� + */ + private MqttClient client; + + /** + * 鍦╞ean鍒濆鍖栧悗杩炴帴鍒版湇鍔″櫒 + */ + @PostConstruct + public void init() { + connect(); + } + + /** + * 瀹㈡埛绔繛鎺ユ湇鍔$ + */ + public void connect() { + try { + //鍒涘缓MQTT瀹㈡埛绔璞� + client = new MqttClient(mqttProperties.getHost(), mqttProperties.getClientInId(), new MemoryPersistence()); + //杩炴帴璁剧疆 + MqttConnectOptions options = new MqttConnectOptions(); + //鏄惁娓呯┖session锛岃缃负false琛ㄧず鏈嶅姟鍣ㄤ細淇濈暀瀹㈡埛绔殑杩炴帴璁板綍锛屽鎴风閲嶈繛涔嬪悗鑳借幏鍙栧埌鏈嶅姟鍣ㄥ湪瀹㈡埛绔柇寮�杩炴帴鏈熼棿鎺ㄩ�佺殑娑堟伅 + //璁剧疆涓簍rue琛ㄧず姣忔杩炴帴鍒版湇鍔$閮芥槸浠ユ柊鐨勮韩浠� + options.setCleanSession(true); + //璁剧疆杩炴帴鐢ㄦ埛鍚� + options.setUserName(mqttProperties.getUsername()); + //璁剧疆杩炴帴瀵嗙爜 + options.setPassword(mqttProperties.getPassword().toCharArray()); + //璁剧疆瓒呮椂鏃堕棿锛屽崟浣嶄负绉� + options.setConnectionTimeout(10); + //璁剧疆蹇冭烦鏃堕棿 鍗曚綅涓虹锛岃〃绀烘湇鍔″櫒姣忛殧1.5*20绉掔殑鏃堕棿鍚戝鎴风鍙戦�佸績璺冲垽鏂鎴风鏄惁鍦ㄧ嚎 + options.setKeepAliveInterval(20); + //璁剧疆閬楀槺娑堟伅鐨勮瘽棰橈紝鑻ュ鎴风鍜屾湇鍔″櫒涔嬮棿鐨勮繛鎺ユ剰澶栨柇寮�锛屾湇鍔″櫒灏嗗彂甯冨鎴风鐨勯仐鍢变俊鎭� + options.setWill("willTopic", (mqttProperties.getClientInId() + "涓庢湇鍔″櫒鏂紑杩炴帴").getBytes(), 0, false); + //璁剧疆鍥炶皟 + client.setCallback(new MqttConsumerCallBack()); + client.connect(options); + //璁㈤槄涓婚 + //娑堟伅绛夌骇锛屽拰涓婚鏁扮粍涓�涓�瀵瑰簲锛屾湇鍔$灏嗘寜鐓ф寚瀹氱瓑绾х粰璁㈤槄浜嗕富棰樼殑瀹㈡埛绔帹閫佹秷鎭� + int[] qos = {1, 1}; + //涓婚 + String[] topics = mqttProperties.getTopics().split(","); + //璁㈤槄涓婚 + client.subscribe(topics, qos); + } catch (MqttException e) { + e.printStackTrace(); + } + } + + /** + * 鏂紑杩炴帴 + */ + public void disConnect() { + try { + client.disconnect(); + } catch (MqttException e) { + e.printStackTrace(); + } + } + + + /** + * 璁㈤槄涓婚 + */ + public void subscribe(String topic, int qos) { + try { + client.subscribe(topic, qos); + } catch (MqttException e) { + e.printStackTrace(); + } + } +} diff --git a/src/main/java/com/fzzy/mqtt/MqttController.java b/src/main/java/com/fzzy/mqtt/MqttController.java new file mode 100644 index 0000000..d327c26 --- /dev/null +++ b/src/main/java/com/fzzy/mqtt/MqttController.java @@ -0,0 +1,30 @@ +package com.fzzy.mqtt; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Controller; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.ResponseBody; + +@Controller +public class MqttController { + @Autowired + private MqttConsumerConfig client; + + @Autowired + private MqttProperties mqttProperties; + + @RequestMapping("/connect") + public @ResponseBody + String connect() { + client.connect(); + return mqttProperties.getClientOutId() + "杩炴帴鍒版湇鍔″櫒"; + } + + @RequestMapping("/disConnect") + @ResponseBody + public String disConnect() { + client.disConnect(); + return mqttProperties.getClientOutId() + "涓庢湇鍔″櫒鏂紑杩炴帴"; + } + +} diff --git a/src/main/java/com/fzzy/mqtt/MqttGatewayService.java b/src/main/java/com/fzzy/mqtt/MqttGatewayService.java index caed4e8..6457db5 100644 --- a/src/main/java/com/fzzy/mqtt/MqttGatewayService.java +++ b/src/main/java/com/fzzy/mqtt/MqttGatewayService.java @@ -1,6 +1,5 @@ package com.fzzy.mqtt; - import org.springframework.integration.annotation.MessagingGateway; import org.springframework.integration.mqtt.support.MqttHeaders; import org.springframework.messaging.handler.annotation.Header; diff --git a/src/main/java/com/fzzy/mqtt/MqttInboundConfiguration.java b/src/main/java/com/fzzy/mqtt/MqttInboundConfiguration.java deleted file mode 100644 index 874464c..0000000 --- a/src/main/java/com/fzzy/mqtt/MqttInboundConfiguration.java +++ /dev/null @@ -1,91 +0,0 @@ -package com.fzzy.mqtt; - -import com.fzzy.gateway.hx2023.websocket.SocketService; -import lombok.extern.slf4j.Slf4j; -import org.eclipse.paho.client.mqttv3.MqttConnectOptions; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Configuration; -import org.springframework.integration.annotation.IntegrationComponentScan; -import org.springframework.integration.annotation.ServiceActivator; -import org.springframework.integration.channel.DirectChannel; -import org.springframework.integration.core.MessageProducer; -import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory; -import org.springframework.integration.mqtt.core.MqttPahoClientFactory; -import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter; -import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter; -import org.springframework.integration.mqtt.support.MqttHeaders; -import org.springframework.messaging.Message; -import org.springframework.messaging.MessageChannel; -import org.springframework.messaging.MessageHandler; -import org.springframework.messaging.MessagingException; - -/** - * MQTT娑堣垂绔� - */ -@Slf4j -@Configuration -@IntegrationComponentScan -public class MqttInboundConfiguration { - - @Autowired - private MqttProperties mqttProperties; - - @Bean - public MessageChannel mqttInputChannel() { - return new DirectChannel(); - } - - @Bean - public MqttPahoClientFactory receiverMqttClientFactoryForSub() { - DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory(); - String[] array = mqttProperties.getHost().split(","); - MqttConnectOptions options = new MqttConnectOptions(); - options.setServerURIs(array); - options.setUserName(mqttProperties.getClientUsername()); - options.setPassword(mqttProperties.getClientPassword().toCharArray()); - options.setKeepAliveInterval(mqttProperties.getClientAliveTime()); - //鎺ュ彈绂荤嚎娑堟伅 - options.setCleanSession(false); - options.setMqttVersion(4); - - factory.setConnectionOptions(options); - - return factory; - } - - //閰嶇疆client,鐩戝惉鐨則opic - @Bean - public MessageProducer inbound() { - String[] inboundTopics = mqttProperties.getClientTopics().split(","); - MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter( - mqttProperties.getClientId() + "_inbound", receiverMqttClientFactoryForSub(), inboundTopics); //瀵筰nboundTopics涓婚杩涜鐩戝惉 - adapter.setCompletionTimeout(mqttProperties.getClientTimeout()); - adapter.setQos(1); - adapter.setConverter(new DefaultPahoMessageConverter()); - adapter.setOutputChannel(mqttInputChannel()); - return adapter; - } - - //閫氳繃閫氶亾鑾峰彇鏁版嵁 - @Bean - @ServiceActivator(inputChannel = "mqttInputChannel") //寮傛澶勭悊 - public MessageHandler handler() { - return new MessageHandler() { - @Override - public void handleMessage(Message<?> message) throws MessagingException { - log.info("----------------------"); - //鑾峰彇mqtt鐨則opic - String topic = (String) message.getHeaders().get(MqttHeaders.RECEIVED_TOPIC); - //浣跨敤webSocket杩斿洖缁欏墠绔� - SocketService socketService = new SocketService(); - socketService.onMessage(message.getPayload().toString(), null, topic); - - log.info("message:" + message.getPayload()); - log.info("PacketId:" + message.getHeaders().getId()); - log.info("Qos:" + message.getHeaders().get(MqttHeaders.QOS)); - log.info("topic:" + topic); - } - }; - } -} diff --git a/src/main/java/com/fzzy/mqtt/MqttOutboundConfiguration.java b/src/main/java/com/fzzy/mqtt/MqttOutboundConfiguration.java deleted file mode 100644 index ccbd0f1..0000000 --- a/src/main/java/com/fzzy/mqtt/MqttOutboundConfiguration.java +++ /dev/null @@ -1,58 +0,0 @@ -package com.fzzy.mqtt; - -import lombok.extern.slf4j.Slf4j; -import org.eclipse.paho.client.mqttv3.MqttConnectOptions; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Configuration; -import org.springframework.integration.annotation.IntegrationComponentScan; -import org.springframework.integration.annotation.ServiceActivator; -import org.springframework.integration.channel.DirectChannel; -import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory; -import org.springframework.integration.mqtt.core.MqttPahoClientFactory; -import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler; -import org.springframework.messaging.MessageChannel; -import org.springframework.messaging.MessageHandler; - -/** - * MQTT鐢熶骇绔� - */ -@Slf4j -@Configuration -@IntegrationComponentScan -public class MqttOutboundConfiguration { - @Autowired - private MqttProperties mqttProperties; - - @Bean - public MessageChannel mqttOutboundChannel() { - return new DirectChannel(); - } - - @Bean - public MqttPahoClientFactory getReceiverMqttConnectOptionsForSend() { - DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory(); - String[] array = mqttProperties.getHost().split(","); - MqttConnectOptions options = new MqttConnectOptions(); - options.setServerURIs(array); - - if(null == mqttProperties.getClientUsername()) mqttProperties.setClientUsername(" "); - if(null == mqttProperties.getClientPassword()) mqttProperties.setClientPassword(" "); - options.setUserName(mqttProperties.getClientUsername()); - options.setPassword(mqttProperties.getClientPassword().toCharArray()); - // 鎺ュ彈绂荤嚎娑堟伅 - options.setCleanSession(false); //鍛婅瘔浠g悊瀹㈡埛绔槸鍚﹁寤虹珛鎸佷箙浼氳瘽 false涓哄缓绔嬫寔涔呬細璇� - options.setMqttVersion(4); - factory.setConnectionOptions(options); - return factory; - } - - @Bean - @ServiceActivator(inputChannel = "mqttOutboundChannel") - public MessageHandler mqttOutbound() { - MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(mqttProperties.getClientId() + "outbound", getReceiverMqttConnectOptionsForSend()); - messageHandler.setAsync(true); - return messageHandler; - } - -} diff --git a/src/main/java/com/fzzy/mqtt/MqttProperties.java b/src/main/java/com/fzzy/mqtt/MqttProperties.java index a7293c9..094df8b 100644 --- a/src/main/java/com/fzzy/mqtt/MqttProperties.java +++ b/src/main/java/com/fzzy/mqtt/MqttProperties.java @@ -22,39 +22,44 @@ /** * 鐢ㄦ埛鍚� */ - private String clientUsername; + private String username; /** * 瀵嗙爜 */ - private String clientPassword; + private String password; /** - * 瀹㈡埛绔疘d锛屽悓涓�鍙版湇鍔″櫒涓嬶紝涓嶅厑璁稿嚭鐜伴噸澶嶇殑瀹㈡埛绔痠d + * 瀹㈡埛绔疘d-鍙戝竷鑰匢d */ - private String clientId; + private String clientOutId; + + /** + * 瀹㈡埛绔疘d-琚闃呰�匢d + */ + private String clientInId; /** * 瓒呮椂鏃堕棿 */ - private int clientTimeout = 5000; + private int timout = 5000; /** * 璁剧疆浼氳瘽蹇冭烦鏃堕棿 鍗曚綅涓虹 鏈嶅姟鍣ㄤ細姣忛殧1.5*20绉掔殑鏃堕棿鍚戝鎴风 * 鍙戦�佷釜娑堟伅鍒ゆ柇瀹㈡埛绔槸鍚﹀湪绾匡紝浣嗚繖涓柟娉曞苟娌℃湁閲嶈繛鐨勬満鍒� */ - private int clientAliveTime = 30000; + private int keepAliveInterval = 20; - private int clientMaxConnectTime; + private int maxConnectTimes = 5; - private String clientTopics; + private String topics; /** * 杩炴帴鏂瑰紡 */ - private Integer clientQos = 0; + private Integer qos = 0; /** * 榛樿杩炴帴涓婚锛屼互/#缁撳熬琛ㄧず璁㈤槄鎵�鏈変互test寮�澶寸殑涓婚 diff --git a/src/main/java/com/fzzy/mqtt/MqttProviderCallBack.java b/src/main/java/com/fzzy/mqtt/MqttProviderCallBack.java new file mode 100644 index 0000000..0191efe --- /dev/null +++ b/src/main/java/com/fzzy/mqtt/MqttProviderCallBack.java @@ -0,0 +1,37 @@ +package com.fzzy.mqtt; + +import lombok.extern.slf4j.Slf4j; +import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; +import org.eclipse.paho.client.mqttv3.MqttCallback; +import org.eclipse.paho.client.mqttv3.MqttMessage; + +@Slf4j +public class MqttProviderCallBack implements MqttCallback { + + /** + * 瀹㈡埛绔柇寮�杩炴帴鐨勫洖璋� + */ + @Override + public void connectionLost(Throwable throwable) { + System.out.println("涓庢湇鍔″櫒鏂紑杩炴帴锛屽彲閲嶈繛"); + } + + /** + * 娑堟伅鍒拌揪鐨勫洖璋� + */ + @Override + public void messageArrived(String topic, MqttMessage message) throws Exception { + System.out.println(String.format("鎺ユ敹娑堟伅涓婚 : %s", topic)); + System.out.println(String.format("鎺ユ敹娑堟伅Qos : %d", message.getQos())); + System.out.println(String.format("鎺ユ敹娑堟伅鍐呭 : %s", new String(message.getPayload()))); + System.out.println(String.format("鎺ユ敹娑堟伅retained : %b", message.isRetained())); + } + + /** + * 娑堟伅鍙戝竷鎴愬姛鐨勫洖璋� + */ + @Override + public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) { + System.out.println(String.format("鎺ユ敹娑堟伅鎴愬姛")); + } +} diff --git a/src/main/java/com/fzzy/mqtt/MqttProviderConfig.java b/src/main/java/com/fzzy/mqtt/MqttProviderConfig.java new file mode 100644 index 0000000..8198dbf --- /dev/null +++ b/src/main/java/com/fzzy/mqtt/MqttProviderConfig.java @@ -0,0 +1,101 @@ +package com.fzzy.mqtt; + +import lombok.extern.slf4j.Slf4j; +import org.eclipse.paho.client.mqttv3.*; +import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.Configuration; + +import javax.annotation.PostConstruct; + +@Configuration +@Slf4j +public class MqttProviderConfig { + + @Autowired + private MqttProperties mqttProperties; + /** + * 瀹㈡埛绔璞� + */ + private MqttClient client; + + /** + * + * 鍦╞ean鍒濆鍖栧悗杩炴帴鍒版湇鍔″櫒 + */ + @PostConstruct + public void init(){ + connect(); + } + + /** + * 瀹㈡埛绔繛鎺ユ湇鍔$ + */ + public void connect(){ + try{ + //鍒涘缓MQTT瀹㈡埛绔璞� + client = new MqttClient(mqttProperties.getHost(),mqttProperties.getClientOutId(),new MemoryPersistence()); + //杩炴帴璁剧疆 + MqttConnectOptions options = new MqttConnectOptions(); + //鏄惁娓呯┖session锛岃缃甪alse琛ㄧず鏈嶅姟鍣ㄤ細淇濈暀瀹㈡埛绔殑杩炴帴璁板綍锛堣闃呬富棰橈紝qos锛�,瀹㈡埛绔噸杩炰箣鍚庤兘鑾峰彇鍒版湇鍔″櫒鍦ㄥ鎴风鏂紑杩炴帴鏈熼棿鎺ㄩ�佺殑娑堟伅 + //璁剧疆涓簍rue琛ㄧず姣忔杩炴帴鏈嶅姟鍣ㄩ兘鏄互鏂扮殑韬唤 + options.setCleanSession(true); + //璁剧疆杩炴帴鐢ㄦ埛鍚� + options.setUserName(mqttProperties.getUsername()); + //璁剧疆杩炴帴瀵嗙爜 + options.setPassword(mqttProperties.getPassword().toCharArray()); + //璁剧疆瓒呮椂鏃堕棿锛屽崟浣嶄负绉� + options.setConnectionTimeout(100); + //璁剧疆蹇冭烦鏃堕棿 鍗曚綅涓虹锛岃〃绀烘湇鍔″櫒姣忛殧 1.5*20绉掔殑鏃堕棿鍚戝鎴风鍙戦�佸績璺冲垽鏂鎴风鏄惁鍦ㄧ嚎 + options.setKeepAliveInterval(20); + //璁剧疆閬楀槺娑堟伅鐨勮瘽棰橈紝鑻ュ鎴风鍜屾湇鍔″櫒涔嬮棿鐨勮繛鎺ユ剰澶栨柇寮�锛屾湇鍔″櫒灏嗗彂甯冨鎴风鐨勯仐鍢变俊鎭� + options.setWill("willTopic",(mqttProperties.getClientOutId() + "涓庢湇鍔″櫒鏂紑杩炴帴").getBytes(),0,false); + //璁剧疆鍥炶皟 + client.setCallback(new MqttProviderCallBack()); + client.connect(options); + } catch(MqttException e){ + e.printStackTrace(); + } + } + + public void publish(String topic,String message){ + MqttMessage mqttMessage = new MqttMessage(); + mqttMessage.setQos(mqttProperties.getQos()); + mqttMessage.setRetained(true); + mqttMessage.setPayload(message.getBytes()); + //涓婚鐨勭洰鐨勫湴锛岀敤浜庡彂甯�/璁㈤槄淇℃伅 + MqttTopic mqttTopic = client.getTopic(topic); + //鎻愪緵涓�绉嶆満鍒舵潵璺熻釜娑堟伅鐨勪紶閫掕繘搴� + //鐢ㄤ簬鍦ㄤ互闈為樆濉炴柟寮忥紙鍦ㄥ悗鍙拌繍琛岋級鎵ц鍙戝竷鏄窡韪秷鎭殑浼犻�掕繘搴� + MqttDeliveryToken token; + try { + //灏嗘寚瀹氭秷鎭彂甯冨埌涓婚锛屼絾涓嶇瓑寰呮秷鎭紶閫掑畬鎴愶紝杩斿洖鐨則oken鍙敤浜庤窡韪秷鎭殑浼犻�掔姸鎬� + //涓�鏃︽鏂规硶骞插噣鍦拌繑鍥烇紝娑堟伅灏卞凡琚鎴风鎺ュ彈鍙戝竷锛屽綋杩炴帴鍙敤锛屽皢鍦ㄥ悗鍙板畬鎴愭秷鎭紶閫掋�� + token = mqttTopic.publish(mqttMessage); + token.waitForCompletion(); + } catch (MqttException e) { + e.printStackTrace(); + } + } + + public void publish(int qos,boolean retained,String topic,String message){ + MqttMessage mqttMessage = new MqttMessage(); + mqttMessage.setQos(qos); + mqttMessage.setRetained(retained); + mqttMessage.setPayload(message.getBytes()); + //涓婚鐨勭洰鐨勫湴锛岀敤浜庡彂甯�/璁㈤槄淇℃伅 + MqttTopic mqttTopic = client.getTopic(topic); + //鎻愪緵涓�绉嶆満鍒舵潵璺熻釜娑堟伅鐨勪紶閫掕繘搴� + //鐢ㄤ簬鍦ㄤ互闈為樆濉炴柟寮忥紙鍦ㄥ悗鍙拌繍琛岋級鎵ц鍙戝竷鏄窡韪秷鎭殑浼犻�掕繘搴� + MqttDeliveryToken token; + try { + //灏嗘寚瀹氭秷鎭彂甯冨埌涓婚锛屼絾涓嶇瓑寰呮秷鎭紶閫掑畬鎴愶紝杩斿洖鐨則oken鍙敤浜庤窡韪秷鎭殑浼犻�掔姸鎬� + //涓�鏃︽鏂规硶骞插噣鍦拌繑鍥烇紝娑堟伅灏卞凡琚鎴风鎺ュ彈鍙戝竷锛屽綋杩炴帴鍙敤锛屽皢鍦ㄥ悗鍙板畬鎴愭秷鎭紶閫掋�� + token = mqttTopic.publish(mqttMessage); + token.waitForCompletion(); + } catch (MqttException e) { + e.printStackTrace(); + } + } + +} diff --git a/src/main/java/com/fzzy/mqtt/MqttPubController.java b/src/main/java/com/fzzy/mqtt/MqttPubController.java index 644928d..c03b424 100644 --- a/src/main/java/com/fzzy/mqtt/MqttPubController.java +++ b/src/main/java/com/fzzy/mqtt/MqttPubController.java @@ -2,34 +2,25 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.ResponseBody; import org.springframework.web.bind.annotation.RestController; @RestController public class MqttPubController { @Autowired - private MqttGatewayService gatewayService; + private MqttProviderConfig providerClient; - @RequestMapping("/hello") - public String hello() { - return "hello!"; + @RequestMapping("/sendMessage") + public @ResponseBody + String sendMessage(String topic, String message) { + try { + providerClient.publish(topic, message); + return "鍙戦�佹垚鍔�"; + } catch (Exception e) { + e.printStackTrace(); + return "鍙戦�佸け璐�"; + } } - - @RequestMapping("/sendMqtt") - public String sendMqtt(String sendData) { - System.out.println(sendData); - System.out.println("杩涘叆sendMqtt-------" + sendData); - gatewayService.sendToMqtt("topic01", (String) sendData); - return "Test is OK"; - } - - @RequestMapping("/sendMqttTopic") - public String sendMqtt(String sendData, String topic) { - //System.out.println(sendData+" "+topic); - //System.out.println("杩涘叆inbound鍙戦�侊細"+sendData); - gatewayService.sendToMqtt(topic, (String) sendData); - return "Test is OK"; - } - } diff --git a/src/main/java/com/fzzy/mqtt/MqttPublishService.java b/src/main/java/com/fzzy/mqtt/MqttPublishService.java index 8191a33..f02df27 100644 --- a/src/main/java/com/fzzy/mqtt/MqttPublishService.java +++ b/src/main/java/com/fzzy/mqtt/MqttPublishService.java @@ -9,6 +9,7 @@ @Service public class MqttPublishService { + private static MqttClient client ; diff --git a/src/main/resources/application-dev.yml b/src/main/resources/application-dev.yml index edd09d6..d4804f2 100644 --- a/src/main/resources/application-dev.yml +++ b/src/main/resources/application-dev.yml @@ -96,15 +96,3 @@ auto-offset-reset: latest key-deserializer: org.apache.kafka.common.serialization.StringDeserializer value-deserializer: org.apache.kafka.common.serialization.StringDeserializer - -mqtt: - host: tcp://10.13.4.84:11883 - client-id: - client-username: - client-password: - client-timeout: 10 - client-alive-time: 20 - client-max-connect-times: 5 - client-topics: - client-qos: 0 - isOpen: false diff --git a/src/main/resources/application-devGateway.yml b/src/main/resources/application-devGateway.yml index 09d7b84..f46809d 100644 --- a/src/main/resources/application-devGateway.yml +++ b/src/main/resources/application-devGateway.yml @@ -70,13 +70,16 @@ value-deserializer: org.apache.kafka.common.serialization.StringDeserializer mqtt: - host: tcp://10.13.4.84:11883 - client-id: - client-username: admin - client-password: 123456 - client-timeout: 10 - client-alive-time: 20 - client-max-connect-times: 5 - client-topics: "/device/${productId}/${deviceId}/message/property/report,/device-message-sender/${productId}/${deviceId}" - client-qos: 0 + host: tcp://127.0.0.1:1883 + username: admin + password: pwdmqtt.. + client-outId: fzzy-customer-id + client-inId: fzzy-6e3d92ff71b911eea5e50250f2000002 + timeout: 10 + keep-alive-interval: 20 + max-connect-times: 5 + qos: 0 isOpen: false + default: + topic: testTopic + topics: "/+/+/properties/report,/device-message-sender/+/+" \ No newline at end of file -- Gitblit v1.9.3