From 96f7af2f3bf9a36dd48e0e6bf4f8a8ca1e31ed7d Mon Sep 17 00:00:00 2001 From: vince <757871790@qq.com> Date: 星期三, 08 十一月 2023 17:49:56 +0800 Subject: [PATCH] Merge remote-tracking branch 'orgin/igds-api-gateway' into igds-api-gateway --- src/main/java/com/fzzy/gateway/hx2023/websocket/WebSockDeviceMessageSender.java | 50 ++ src/main/java/com/fzzy/gateway/hx2023/data/ClientHeaders.java | 12 src/main/java/com/fzzy/mqtt/MqttConsumerCallBack.java | 53 ++ src/main/java/com/fzzy/gateway/hx2023/data/GrainOutPut.java | 42 + src/main/java/com/fzzy/gateway/util/GatewayHttpUtil.java | 4 src/main/java/com/fzzy/gateway/service/GatewayDeviceService.java | 39 src/main/java/com/fzzy/gateway/hx2023/data/CloudSendData.java | 28 + src/main/java/com/fzzy/gateway/hx2023/ScConstant.java | 7 src/main/java/com/fzzy/gateway/hx2023/service/OnReceiveMqttService.java | 82 +++ src/main/java/com/fzzy/gateway/view/GatewayDevice.view.xml | 48 + src/main/java/com/fzzy/gateway/entity/GatewayDevice.java | 13 src/main/java/com/fzzy/gateway/hx2023/websocket/WebSockDeviceMessageReport.java | 50 ++ src/main/resources/application-devGateway.yml | 21 src/main/java/com/fzzy/mqtt/MqttController.java | 30 + src/main/java/com/fzzy/gateway/util/GatewayRSAUtils.java | 3 src/main/java/com/fzzy/mqtt/MqttGatewayService.java | 1 src/main/java/com/fzzy/mqtt/MqttPubController.java | 33 src/main/resources/application-dev.yml | 12 src/main/java/com/fzzy/gateway/GatewayTimerScheduled.java | 50 ++ src/main/java/com/fzzy/gateway/hx2023/data/GrainTemp.java | 45 + src/main/java/com/fzzy/gateway/hx2023/service/HxGatewaySyncGrainImpl.java | 110 ++++ src/main/java/com/fzzy/gateway/hx2023/data/GrainData.java | 29 + src/main/java/com/fzzy/gateway/hx2023/data/SyncReqData.java | 5 src/main/java/com/fzzy/gateway/service/GatewayDeviceService2.java | 62 ++ src/main/java/com/fzzy/mqtt/MqttConsumerConfig.java | 93 ++++ src/main/java/com/fzzy/mqtt/MqttProviderConfig.java | 101 ++++ src/main/java/com/fzzy/gateway/api/GatewaySyncGranService.java | 8 src/main/java/com/fzzy/mqtt/MqttProperties.java | 23 src/main/java/com/fzzy/gateway/data/BaseResp.java | 14 /dev/null | 58 -- src/main/java/com/fzzy/gateway/hx2023/data/GrainWeather.java | 31 + src/main/java/com/fzzy/gateway/hx2023/data/CloudHeaders.java | 11 src/main/java/com/fzzy/mqtt/MqttProviderCallBack.java | 37 + src/main/java/com/fzzy/gateway/data/WeatherWebDto.java | 51 ++ src/main/java/com/fzzy/gateway/hx2023/controller/GatewayController.java | 42 + src/main/java/com/fzzy/gateway/hx2023/data/InputData.java | 10 src/main/java/com/fzzy/gateway/view/GatewayConf.view.xml | 10 src/main/java/com/fzzy/api/data/GatewayDeviceProtocol.java | 2 src/main/java/com/fzzy/gateway/GatewayRunner.java | 9 src/main/java/com/fzzy/mqtt/MqttPublishService.java | 1 src/main/java/com/fzzy/api/utils/NumberUtil.java | 49 ++ 41 files changed, 1,209 insertions(+), 170 deletions(-) diff --git a/src/main/java/com/fzzy/api/data/GatewayDeviceProtocol.java b/src/main/java/com/fzzy/api/data/GatewayDeviceProtocol.java index 3b8b65c..05f205c 100644 --- a/src/main/java/com/fzzy/api/data/GatewayDeviceProtocol.java +++ b/src/main/java/com/fzzy/api/data/GatewayDeviceProtocol.java @@ -18,7 +18,7 @@ DEVICE_IDCARD_HTTP("DEVICE_IDCARD_HTTP", "韬唤璇�-HTTP鍗忚"), DEVICE_LED_HTTP("DEVICE_LED_HTTP", "LED-HTTP鍗忚"), DEVICE_LPR_SDK_HK("DEVICE_LPR_SDK_HK", "杞︾墝璇嗗埆-SDK娴峰悍"), - DEVICE_TEST("DEVICE_TEST", "绌哄崗璁�"); + DEVICE_TEST("DEVICE_TEST", "娴嬭瘯鍗忚"); private String code; diff --git a/src/main/java/com/fzzy/api/utils/NumberUtil.java b/src/main/java/com/fzzy/api/utils/NumberUtil.java new file mode 100644 index 0000000..4582084 --- /dev/null +++ b/src/main/java/com/fzzy/api/utils/NumberUtil.java @@ -0,0 +1,49 @@ +package com.fzzy.api.utils; + +import java.math.BigDecimal; +import java.text.DecimalFormat; + +/** + * 鏁板瓧鏍煎紡鍖栧伐鍏风被 + */ +public class NumberUtil { + + /** + * 瀵筪ouble绫诲瀷鐨勬暟鍊间繚鐣欐寚瀹氫綅鏁扮殑灏忔暟銆�<br> + * 璇ユ柟娉曡垗鍏ユā寮忥細鍚戔�滄渶鎺ヨ繎鐨勨�濇暟瀛楄垗鍏ワ紝濡傛灉涓庝袱涓浉閭绘暟瀛楃殑璺濈鐩哥瓑锛屽垯涓哄悜涓婅垗鍏ョ殑鑸嶅叆妯″紡銆�<br> + * <b>娉ㄦ剰锛�</b>濡傛灉绮惧害瑕佹眰姣旇緝绮剧‘璇蜂娇鐢� keepPrecision(String number, int precision)鏂规硶 + * @param number 瑕佷繚鐣欏皬鏁扮殑鏁板瓧 + * @param precision 灏忔暟浣嶆暟 + * @return double 濡傛灉鏁板�艰緝澶э紝鍒欎娇鐢ㄧ瀛﹁鏁版硶琛ㄧず + */ + public static double keepPrecision(Double number, int precision) { + if(null == number || 0.0 == number ) return 0.0; + BigDecimal bg = new BigDecimal(number); + return bg.setScale(precision, BigDecimal.ROUND_HALF_UP).doubleValue(); + } + + /** + * 瀵筬loat绫诲瀷鐨勬暟鍊间繚鐣欐寚瀹氫綅鏁扮殑灏忔暟銆�<br> + * 璇ユ柟娉曡垗鍏ユā寮忥細鍚戔�滄渶鎺ヨ繎鐨勨�濇暟瀛楄垗鍏ワ紝濡傛灉涓庝袱涓浉閭绘暟瀛楃殑璺濈鐩哥瓑锛屽垯涓哄悜涓婅垗鍏ョ殑鑸嶅叆妯″紡銆�<br> + * <b>娉ㄦ剰锛�</b>濡傛灉绮惧害瑕佹眰姣旇緝绮剧‘璇蜂娇鐢� keepPrecision(String number, int precision)鏂规硶 + * @param number 瑕佷繚鐣欏皬鏁扮殑鏁板瓧 + * @param precision 灏忔暟浣嶆暟 + * @return float 濡傛灉鏁板�艰緝澶э紝鍒欎娇鐢ㄧ瀛﹁鏁版硶琛ㄧず + */ + public static float keepPrecision(Float number, int precision) { + if(null == number) return 0f; + BigDecimal bg = new BigDecimal(number); + return bg.setScale(precision, BigDecimal.ROUND_HALF_UP).floatValue(); + } + /** + * double杞瓧绗︿覆锛岄伩鍏嶅嚭鐜扮瀛﹁鏁版硶 + * @param d + * @return + */ + public static String doubleToStr(Double d) { + if(null == d) return ""; + DecimalFormat df = new DecimalFormat("0.0"); + return df.format(d); + } + +} diff --git a/src/main/java/com/fzzy/gateway/GatewayRunner.java b/src/main/java/com/fzzy/gateway/GatewayRunner.java index 9dbdcd8..9b85744 100644 --- a/src/main/java/com/fzzy/gateway/GatewayRunner.java +++ b/src/main/java/com/fzzy/gateway/GatewayRunner.java @@ -22,11 +22,20 @@ private ApiInitService apiInitService; @Autowired private MqttPublishService mqttPublishService; + + @Autowired + private GatewayTimerScheduled scheduled; + @Override public void run(String... args) throws Exception { mqttPublishService.init(); //鎵ц鍒濆鍖栨柟妗� apiInitService.init(); + + + //鑾峰彇姘旇薄淇℃伅 + scheduled.doWeatherExe(); + } } diff --git a/src/main/java/com/fzzy/gateway/GatewayTimerScheduled.java b/src/main/java/com/fzzy/gateway/GatewayTimerScheduled.java index b26ff83..4bb2a3e 100644 --- a/src/main/java/com/fzzy/gateway/GatewayTimerScheduled.java +++ b/src/main/java/com/fzzy/gateway/GatewayTimerScheduled.java @@ -1,14 +1,18 @@ package com.fzzy.gateway; +import com.alibaba.fastjson.JSON; import com.fzzy.api.data.ApiParam; import com.fzzy.api.entity.ApiConfs; import com.fzzy.api.service.*; import com.fzzy.api.utils.ContextUtil; import com.fzzy.api.utils.RedisUtil; import com.fzzy.gateway.api.GatewayRemoteManager; +import com.fzzy.gateway.data.WeatherWebDto; import com.fzzy.gateway.entity.GatewayConf; import com.fzzy.gateway.service.GatewayConfService; +import com.fzzy.gateway.util.GatewayHttpUtil; import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang.StringUtils; import org.apache.commons.lang.time.DateUtils; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.scheduling.annotation.Scheduled; @@ -24,6 +28,12 @@ @Slf4j @Component(GatewayTimerScheduled.BEAN_ID) public class GatewayTimerScheduled { + + public static String DEFAULT_URL = "https://v0.yiketianqi.com/api?unescape=1&version=v61&appid={appId}&appsecret={appsecret}&cityid={cityid}"; + public static String DEFAULT_APP_ID = "49421971"; + public static String DEFAULT_APP_SECRET = "JmJE48Fv"; + public static String DEFAULT_CITYID = "101270101";//鎴愰兘 + public static final String BEAN_ID = "gateway.timerScheduled"; @@ -46,6 +56,44 @@ } + /** + * 姣忛棿闅�30鍒嗛挓鎵ц涓�娆� + */ + @Scheduled(cron = "0 0/30 * * * ?") + public void scheduled30() { + + //瀹氭椂鑾峰彇姘旇薄淇℃伅 + doWeatherExe(); + } + + public void doWeatherExe() { + + try { + String url = DEFAULT_URL; + url = url.replace("{appId}", DEFAULT_APP_ID).replace("{appsecret}", DEFAULT_APP_SECRET).replace("{cityid}", DEFAULT_CITYID); + + String result = GatewayHttpUtil.doGet(url, null); + + if (null == result) { + log.error("褰撳墠澶栫綉鑾峰彇姘旇薄淇℃伅澶辫触鈥︹��"); + return; + } + WeatherWebDto dto = JSON.parseObject(result, WeatherWebDto.class); + if (StringUtils.isNotEmpty(dto.getErrcode())) { + log.error("褰撳墠澶栫綉鑾峰彇姘旇薄淇℃伅寮傚父:{}", dto.getErrmsg()); + return; + } + + WeatherWebDto.contextMap.put("default", dto); + + + log.info("===========================绯荤粺瀹氭椂鑾疯幏鍙栨皵璞′俊鎭�===={}==================",dto); + + } catch (Exception e) { + + } + } + /** * 鎵ц缃戝叧蹇冭烦 @@ -63,5 +111,5 @@ gatewayRemoteManager.getRemoteService(conf.getPushProtocol()).heartbeat(conf); } } - + } diff --git a/src/main/java/com/fzzy/gateway/api/GatewaySyncGranService.java b/src/main/java/com/fzzy/gateway/api/GatewaySyncGranService.java index 343555f..d1ba9d3 100644 --- a/src/main/java/com/fzzy/gateway/api/GatewaySyncGranService.java +++ b/src/main/java/com/fzzy/gateway/api/GatewaySyncGranService.java @@ -1,6 +1,7 @@ package com.fzzy.gateway.api; +import com.fzzy.gateway.data.BaseResp; import com.fzzy.gateway.hx2023.data.KafaGrainData; import com.fzzy.gateway.hx2023.data.*; @@ -25,4 +26,11 @@ public KafaGrainData syncGrain(SyncReqData reqData); + /** + * 鍚屾绮儏淇℃伅杩斿洖JSON鎶ユ枃 + * + * @param reqData + * @return + */ + public BaseResp syncGrain2(SyncReqData reqData); } diff --git a/src/main/java/com/fzzy/gateway/data/BaseResp.java b/src/main/java/com/fzzy/gateway/data/BaseResp.java new file mode 100644 index 0000000..b6e7ec2 --- /dev/null +++ b/src/main/java/com/fzzy/gateway/data/BaseResp.java @@ -0,0 +1,14 @@ +package com.fzzy.gateway.data; + +import lombok.Data; + +@Data +public class BaseResp { + + private int code = 200; + + private String msg = "鎴愬姛"; + + + private String data; +} diff --git a/src/main/java/com/fzzy/gateway/data/WeatherWebDto.java b/src/main/java/com/fzzy/gateway/data/WeatherWebDto.java new file mode 100644 index 0000000..ead022a --- /dev/null +++ b/src/main/java/com/fzzy/gateway/data/WeatherWebDto.java @@ -0,0 +1,51 @@ +package com.fzzy.gateway.data; + +import lombok.Data; + +import java.io.Serializable; +import java.util.HashMap; +import java.util.Map; + +/** + * + * @author: andy.jia + * @description: 澶栫綉姘旇薄瑙f瀽浣跨敤DTO + * @version: + * @data:2019骞�12鏈�7鏃� + * + */ +@Data +public class WeatherWebDto implements Serializable { + + + public static Map<String,WeatherWebDto> contextMap = new HashMap<>(); + + /** + * + */ + private static final long serialVersionUID = 1L; + + + private String errcode;//閿欒缂栫爜 + private String errmsg;//閿欒淇℃伅 + + private String cityid;// 褰撳墠鍩庡競ID + private String city;//鍩庡競鍚嶇О + private String update_time;// 姘旇薄鍙版洿鏂版椂闂� + private String date;// 鏃ユ湡 + private String week;// 鏄熸湡 + private String wea;// 澶╂皵鎯呭喌 + private String wea_img;// 澶╂皵瀵瑰簲鍥炬爣(xue, lei, shachen, wu, bingbao, yun, yu,yin, qing) + private String tem;// 褰撳墠娓╁害 + private String air;// 绌烘皵璐ㄩ噺 + private String air_pm25;// PM2.5 + private String air_level;// 绌烘皵璐ㄩ噺绛夌骇 + private String air_tips;// 绌烘皵璐ㄩ噺鎻忚堪 + private String humidity;// 婀垮害 + private String visibility;// 鑳借搴� + private String pressure;// 姘斿帇hPa + private String win;// 椋庡悜 + private String win_speed;// 椋庨�熺瓑绾� + private String win_meter;// 椋庨�� 濡�: 12km/h + private String alarm;// 棰勮淇℃伅 +} diff --git a/src/main/java/com/fzzy/gateway/entity/GatewayDevice.java b/src/main/java/com/fzzy/gateway/entity/GatewayDevice.java index 32531b0..50a19c0 100644 --- a/src/main/java/com/fzzy/gateway/entity/GatewayDevice.java +++ b/src/main/java/com/fzzy/gateway/entity/GatewayDevice.java @@ -37,6 +37,10 @@ @PropertyDef(label = "鍚嶇О") private String deviceName; + @Column(name = "PRODUCT_ID_", length = 50) + @PropertyDef(label = "璁惧绫诲瀷KEY") + private String productId; + @Column(name = "TYPE_", length = 10) @PropertyDef(label = "璁惧绫诲瀷") private String type; @@ -101,4 +105,13 @@ @PropertyDef(label = "澶囨敞", description = "澶囨敞淇℃伅") private String remark; + + @Column(name = "CABLE_RULE_", length = 20) + @PropertyDef(label = "甯冪嚎瑙勫垯", description = "骞虫柟浠撹〃绀哄眰琛屽垪锛岀瓛浠撹〃绀烘瘡鍦堢殑鍒楁暟") + private String cableRule; + + @Column(name = "CABLE_CIR_", length = 20) + @PropertyDef(label = "绛掍粨灞傝鍒�", description = "閽堝绛掍粨") + private String cableCir; + } diff --git a/src/main/java/com/fzzy/gateway/hx2023/ScConstant.java b/src/main/java/com/fzzy/gateway/hx2023/ScConstant.java index 2786342..badcdf6 100644 --- a/src/main/java/com/fzzy/gateway/hx2023/ScConstant.java +++ b/src/main/java/com/fzzy/gateway/hx2023/ScConstant.java @@ -12,6 +12,13 @@ public static String MESSAGE_TYPE_REPORT_PROPERTY = "REPORT_PROPERTY"; + public static String MESSAGE_TYPE_INVOKE_FUNCTION = "INVOKE_FUNCTION"; + + /** + * 绮儏閲囨寚浠� + */ + public static String FUNCTION_getTAndRHInfo = "getTAndRHInfo"; + public static String getMessageId() { return System.currentTimeMillis() + RandomUtils.nextInt(1000) + ""; diff --git a/src/main/java/com/fzzy/gateway/hx2023/controller/GatewayController.java b/src/main/java/com/fzzy/gateway/hx2023/controller/GatewayController.java index 8ad1617..20cdbc2 100644 --- a/src/main/java/com/fzzy/gateway/hx2023/controller/GatewayController.java +++ b/src/main/java/com/fzzy/gateway/hx2023/controller/GatewayController.java @@ -4,12 +4,15 @@ import com.fzzy.api.utils.ContextUtil; import com.fzzy.api.utils.RedisConst; import com.fzzy.api.utils.RedisUtil; +import com.fzzy.gateway.entity.GatewayConf; import com.fzzy.gateway.hx2023.data.GatewayAuthData; +import com.fzzy.gateway.service.GatewayConfService; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Controller; import org.springframework.web.bind.annotation.*; import javax.annotation.Resource; +import java.util.List; /** * @@ -21,6 +24,9 @@ @Resource private RedisUtil redisUtil; + @Resource + private GatewayConfService confService; + /** * 閴存潈鎺ュ彛 @@ -39,28 +45,44 @@ public @ResponseBody JSONObject authorize(@RequestBody GatewayAuthData data) { - log.debug("============閴存潈==========={}--{}", data.getUsername(), data.getPassword()); + + List<GatewayConf> list = confService.getCacheConfList(); + + JSONObject json = new JSONObject(); + json.put("timestamp", System.currentTimeMillis()); + if (null == list || list.isEmpty()) { + json.put("code", 500); + json.put("message", "鏈幏鍙栫綉鍏充俊鎭�"); + return json; + } + + String gatewayId = null; + for (GatewayConf conf : list) { + if (data.getUsername().equals(conf.getGatewayUsername()) && data.getPassword().equals(conf.getGatewayPassword())) { + gatewayId = conf.getGatewayId(); + break; + } + } + + if (null == gatewayId) { + json.put("code", 500); + json.put("message", "鏈尮閰嶅埌鐢ㄦ埛鍚嶅拰瀵嗙爜"); + return json; + } - //TODO 楠岃瘉鐢ㄦ埛鍚嶅拰瀵嗙爜 + String token = "fzzy-" + gatewayId; - - String token = ContextUtil.getUUID(); + log.debug("============閴存潈==========={}--{}--{}", data.getUsername(), data.getPassword(), token); this.updateGatewayToken(token, data.getUsername()); - JSONObject json = new JSONObject(); - JSONObject result = new JSONObject(); - result.put("token", token); - json.put("result", result); json.put("message", "鎴愬姛"); json.put("status", 0); json.put("code", 200); - json.put("timestamp", System.currentTimeMillis()); - return json; } diff --git a/src/main/java/com/fzzy/gateway/hx2023/data/ClientHeaders.java b/src/main/java/com/fzzy/gateway/hx2023/data/ClientHeaders.java new file mode 100644 index 0000000..ffcf80f --- /dev/null +++ b/src/main/java/com/fzzy/gateway/hx2023/data/ClientHeaders.java @@ -0,0 +1,12 @@ +package com.fzzy.gateway.hx2023.data; + +import lombok.Data; + +@Data +public class ClientHeaders { + + private String productId; + private String deviceName; + private String orgId; + private String msgId; +} diff --git a/src/main/java/com/fzzy/gateway/hx2023/data/CloudHeaders.java b/src/main/java/com/fzzy/gateway/hx2023/data/CloudHeaders.java new file mode 100644 index 0000000..bac0c20 --- /dev/null +++ b/src/main/java/com/fzzy/gateway/hx2023/data/CloudHeaders.java @@ -0,0 +1,11 @@ +package com.fzzy.gateway.hx2023.data; + +import lombok.Data; + +@Data +public class CloudHeaders { + + private String productId; + private String deviceName; + private String orgId; +} diff --git a/src/main/java/com/fzzy/gateway/hx2023/data/CloudSendData.java b/src/main/java/com/fzzy/gateway/hx2023/data/CloudSendData.java new file mode 100644 index 0000000..efca0cb --- /dev/null +++ b/src/main/java/com/fzzy/gateway/hx2023/data/CloudSendData.java @@ -0,0 +1,28 @@ +package com.fzzy.gateway.hx2023.data; + +import lombok.Data; + +import java.util.List; + +/** + * 浜戠鍙戝姩鐨勬姤鏂� + */ +@Data +public class CloudSendData { + + private CloudHeaders headers; + + private String functionId; + + private String messageType; + + private String messageId; + + private String deviceId; + + private String timestamp; + + private boolean success; + + private List<InputData> inputs; +} diff --git a/src/main/java/com/fzzy/gateway/hx2023/data/GrainData.java b/src/main/java/com/fzzy/gateway/hx2023/data/GrainData.java new file mode 100644 index 0000000..90a1c85 --- /dev/null +++ b/src/main/java/com/fzzy/gateway/hx2023/data/GrainData.java @@ -0,0 +1,29 @@ +package com.fzzy.gateway.hx2023.data; + +import lombok.Data; + + +/** + * 绮儏淇℃伅 + */ +@Data +public class GrainData { + + //璁惧缂栫爜 + private String deviceId; + private ClientHeaders headers; + + //娑堟伅 ID + private String messageId; + + private String messageType = "INVOKE_FUNCTION_REPLY"; + + private boolean success = true; + + private String timestamp; + + private GrainOutPut outPut; + + private GrainWeather weatherStation; + +} diff --git a/src/main/java/com/fzzy/gateway/hx2023/data/GrainOutPut.java b/src/main/java/com/fzzy/gateway/hx2023/data/GrainOutPut.java new file mode 100644 index 0000000..565fef1 --- /dev/null +++ b/src/main/java/com/fzzy/gateway/hx2023/data/GrainOutPut.java @@ -0,0 +1,42 @@ +package com.fzzy.gateway.hx2023.data; + +import com.alibaba.fastjson2.JSONObject; +import lombok.Data; + +import java.util.List; + + +/** + * 绮儏淇℃伅 + */ +@Data +public class GrainOutPut { + + //妯″潡璇嗗埆鐮�-甯搁噺锛屽浐瀹氫紶 apiTemperature + private String apISource = "apiTemperature"; + + //鏁翠粨骞冲潎娓╁害 + private String avgTemperature; + + //鏁翠粨鏈�楂樻俯搴� + private String maxTemperature; + + //鏁翠粨鏈�浣庢俯搴� + private String minTemperature; + + private String minX = "0"; + + private String minY = "0"; + + private String minZ = "0"; + + private String maxX = "0"; + + private String maxY = "0"; + + private String maxZ = "0"; + + + private List<GrainTemp> temperature; + +} diff --git a/src/main/java/com/fzzy/gateway/hx2023/data/GrainTemp.java b/src/main/java/com/fzzy/gateway/hx2023/data/GrainTemp.java new file mode 100644 index 0000000..61def1a --- /dev/null +++ b/src/main/java/com/fzzy/gateway/hx2023/data/GrainTemp.java @@ -0,0 +1,45 @@ +package com.fzzy.gateway.hx2023.data; + +import lombok.Data; + +/** + * 绮儏淇℃伅璇︾粏 + */ +@Data +public class GrainTemp { + + //鏍瑰彿 + private String cableNum; + + //灞傚彿 + private String layerNumber; + + //娓╁害鍊� + private String temperature; + + //绱㈠紩锛屼粠0寮�濮� + private String position; + +// //璇ユ俯搴︾偣鎵�鍦ㄧ殑鍒�,骞虫柟浠撱�佸湴涓嬩粨蹇呭~ +// private String linex; +// +// //璇ユ俯搴︾偣鎵�鍦ㄧ殑琛�,骞虫柟浠撱�佸湴涓嬩粨蹇呭~ +// private String rowy; + + //娴呭渾浠撱�佺瓛浠撳繀濉紝绀轰緥锛歿\"totalCircle\":3,\"smallCircle\":\"4,10,16\"}锛宼otalCircle锛氭�诲湀鏁帮紝smallCircle锛氭瘡鍦堟湁鍑犳牴缂� + private String total_circle; + + + //鍏蜂綋鍦堟暟--娴呭渾浠撱�佺瓛浠撳繀濉� + private String circle; + + public GrainTemp() { + } + + public GrainTemp(String cableNum, String layerNumber, String temperature, String position) { + this.cableNum = cableNum; + this.layerNumber = layerNumber; + this.temperature = temperature; + this.position = position; + } +} diff --git a/src/main/java/com/fzzy/gateway/hx2023/data/GrainWeather.java b/src/main/java/com/fzzy/gateway/hx2023/data/GrainWeather.java new file mode 100644 index 0000000..a9c7e68 --- /dev/null +++ b/src/main/java/com/fzzy/gateway/hx2023/data/GrainWeather.java @@ -0,0 +1,31 @@ +package com.fzzy.gateway.hx2023.data; + +import lombok.Data; + +@Data +public class GrainWeather { + + private String airPressure; + + private String humidity; + + private String id; + + private String messageId; + + private String pm; + + private String radiation; + + private String rainfallAmount; + + private String temperature; + + private String windAngle; + + private String windDirection; + + private String windPower; + + private String windSpeed; +} diff --git a/src/main/java/com/fzzy/gateway/hx2023/data/InputData.java b/src/main/java/com/fzzy/gateway/hx2023/data/InputData.java new file mode 100644 index 0000000..d6d89ff --- /dev/null +++ b/src/main/java/com/fzzy/gateway/hx2023/data/InputData.java @@ -0,0 +1,10 @@ +package com.fzzy.gateway.hx2023.data; + +import lombok.Data; + +@Data +public class InputData { + + private String name; + private String value; +} diff --git a/src/main/java/com/fzzy/gateway/hx2023/data/SyncReqData.java b/src/main/java/com/fzzy/gateway/hx2023/data/SyncReqData.java index 31a2512..a260ba3 100644 --- a/src/main/java/com/fzzy/gateway/hx2023/data/SyncReqData.java +++ b/src/main/java/com/fzzy/gateway/hx2023/data/SyncReqData.java @@ -1,5 +1,6 @@ package com.fzzy.gateway.hx2023.data; +import com.fzzy.gateway.entity.GatewayDevice; import lombok.Data; @Data @@ -16,4 +17,8 @@ private String functionId; private String jsonData; + + private GatewayDevice device; + + private boolean autoReplay; } diff --git a/src/main/java/com/fzzy/gateway/hx2023/service/HxGatewaySyncGrainImpl.java b/src/main/java/com/fzzy/gateway/hx2023/service/HxGatewaySyncGrainImpl.java index 424ff4b..729570c 100644 --- a/src/main/java/com/fzzy/gateway/hx2023/service/HxGatewaySyncGrainImpl.java +++ b/src/main/java/com/fzzy/gateway/hx2023/service/HxGatewaySyncGrainImpl.java @@ -1,11 +1,21 @@ package com.fzzy.gateway.hx2023.service; +import com.alibaba.fastjson2.JSONObject; +import com.fzzy.api.data.GatewayDeviceProtocol; +import com.fzzy.api.utils.NumberUtil; import com.fzzy.gateway.api.GatewaySyncGranService; -import com.fzzy.gateway.hx2023.data.KafaGrainData; -import com.fzzy.gateway.hx2023.data.SyncReqData; +import com.fzzy.gateway.data.BaseResp; +import com.fzzy.gateway.data.WeatherWebDto; +import com.fzzy.gateway.entity.GatewayDevice; +import com.fzzy.gateway.hx2023.ScConstant; +import com.fzzy.gateway.hx2023.data.*; import lombok.Data; import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang.StringUtils; import org.springframework.stereotype.Component; + +import java.util.ArrayList; +import java.util.List; /** * 缃戝叧涓庣伯鎯呭垎鏈洪�氳鍜岃В鏋� @@ -18,11 +28,105 @@ @Override public String getGrainProtocol() { - return null; + return GatewayDeviceProtocol.DEVICE_TEST.getCode(); } @Override public KafaGrainData syncGrain(SyncReqData reqData) { + + return null; } + + + @Override + public BaseResp syncGrain2(SyncReqData reqData) { + + GatewayDevice device = reqData.getDevice(); + + + if (StringUtils.isEmpty(device.getCableCir())) { + return getGrainTest1(reqData, device); + } + + + BaseResp resp = new BaseResp(); + resp.setCode(500); + resp.setMsg("娌℃湁鍖归厤鍒拌鍒�"); + return resp; + } + + private BaseResp getGrainTest1(SyncReqData reqData, GatewayDevice device) { + + String[] cableRule = device.getCableRule().split("-"); + + int cableZ = Integer.valueOf(cableRule[0]); + int cableY = Integer.valueOf(cableRule[1]); + int cableX = Integer.valueOf(cableRule[2]); + int sumNum = cableZ * cableY * cableX; + + + WeatherWebDto weather = WeatherWebDto.contextMap.get("default"); + double tMIn = 20, tMax = 25; + if (null != weather) { + double tOut = Double.valueOf(weather.getTem()); + tMIn = tOut - 2; + tMax = tOut + 3; + } + + + //鏁版嵁灏佽 + GrainData grain = new GrainData(); + grain.setMessageId(ScConstant.getMessageId()); + grain.setDeviceId(device.getDeviceId()); + grain.setTimestamp(System.currentTimeMillis() + ""); + + ClientHeaders headers = new ClientHeaders(); + headers.setDeviceName(device.getDeviceName()); + headers.setProductId(device.getProductId()); + headers.setOrgId(device.getOrgId()); + headers.setMsgId(ScConstant.getMessageId()); + grain.setHeaders(headers); + + GrainOutPut outPut = new GrainOutPut(); + + outPut.setAvgTemperature(NumberUtil.keepPrecision((tMax + tMIn) / 2, 1) + ""); + outPut.setMinTemperature(tMax + ""); + outPut.setMaxTemperature(tMIn + ""); + + + List<GrainTemp> temperature = new ArrayList<>(); + //鏍瑰彿 + int cableNum = 1, position = 0; + + double curTemp = tMIn; + double randomNumber = tMIn; + int x = 0, y = 0, z = 0; + for (int i = 0; i < sumNum; i++) { + randomNumber = Math.random() * (tMax - tMIn + 1) + tMIn; + curTemp = NumberUtil.keepPrecision(randomNumber, 1); + position = i; + z = i % cableZ + 1; + x = i / (cableZ * cableY); + y = x * (cableZ * cableY); + y = (i - y) / cableZ; + + //鏍瑰彿 + cableNum = (i / cableZ) + 1; + + + temperature.add(new GrainTemp(cableNum + "", z + "", curTemp + "", position + "")); + } + + outPut.setTemperature(temperature); + + grain.setOutPut(outPut); + + + BaseResp resp = new BaseResp(); + resp.setData(JSONObject.toJSONString(grain)); + + return resp; + } } + diff --git a/src/main/java/com/fzzy/gateway/hx2023/service/OnReceiveMqttService.java b/src/main/java/com/fzzy/gateway/hx2023/service/OnReceiveMqttService.java new file mode 100644 index 0000000..742bbf0 --- /dev/null +++ b/src/main/java/com/fzzy/gateway/hx2023/service/OnReceiveMqttService.java @@ -0,0 +1,82 @@ +package com.fzzy.gateway.hx2023.service; + +import com.alibaba.fastjson2.JSONObject; +import com.fzzy.gateway.GatewayUtils; +import com.fzzy.gateway.api.GatewayRemoteManager; +import com.fzzy.gateway.hx2023.data.CloudSendData; +import com.fzzy.gateway.entity.GatewayDevice; +import com.fzzy.gateway.hx2023.ScConstant; +import com.fzzy.gateway.data.BaseResp; +import com.fzzy.gateway.hx2023.data.SyncReqData; +import com.fzzy.mqtt.MqttProviderConfig; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +import javax.annotation.Resource; + +/** + * 鏍规嵁鎺ュ彈鍒扮殑MQTT淇℃伅鎵ц + */ +@Slf4j +@Component +public class OnReceiveMqttService { + + @Resource + private GatewayRemoteManager gatewayRemoteManager; + @Resource + private MqttProviderConfig providerClient; + + + /** + * 褰撳墠鎺ユ敹鍒颁簯绔彂閫佷俊鎭� + * + * @param message + */ + public void onReceiveMessage(String message) { + + try { + CloudSendData cloudSendData = JSONObject.parseObject(message, CloudSendData.class); + + + String functionId = cloudSendData.getFunctionId(); + + //绮儏閲囬泦 + if (ScConstant.FUNCTION_getTAndRHInfo.equals(functionId)) { + getTAndRHInfo(cloudSendData); + } + + } catch (Exception e) { + + } + + } + + private void getTAndRHInfo(CloudSendData cloudSendData) { + + String deviceId = cloudSendData.getDeviceId(); + + GatewayDevice device = GatewayUtils.getCacheByDeviceId(deviceId); + + SyncReqData syncReqData = new SyncReqData(); + syncReqData.setDeviceId(deviceId); + syncReqData.setMessageId(cloudSendData.getMessageId()); + syncReqData.setMessageType(cloudSendData.getMessageType()); + syncReqData.setFunctionId(cloudSendData.getFunctionId()); + syncReqData.setAutoReplay(true); + syncReqData.setDevice(device); + + BaseResp resp = gatewayRemoteManager.getSyncGrainService(device.getSyncProtocol()).syncGrain2(syncReqData); + + //鑷姩鎺ㄩ�� + if (200 == resp.getCode() && syncReqData.isAutoReplay()) { + String topic = "/${productId}/${deviceId}/properties/report"; + + topic = topic.replace("${productId}", device.getProductId()).replace("${deviceId}", device.getDeviceId()); + + providerClient.publish(topic, resp.getData()); + + log.info("=======绮儏鎺ㄩ��==========={}", resp.getData()); + } + } +} diff --git a/src/main/java/com/fzzy/gateway/hx2023/websocket/SocketService.java b/src/main/java/com/fzzy/gateway/hx2023/websocket/SocketService.java deleted file mode 100644 index 1025124..0000000 --- a/src/main/java/com/fzzy/gateway/hx2023/websocket/SocketService.java +++ /dev/null @@ -1,102 +0,0 @@ -package com.fzzy.gateway.hx2023.websocket; - - -import com.alibaba.fastjson.JSON; -import com.alibaba.fastjson.JSONObject; -import org.springframework.stereotype.Component; -import org.springframework.stereotype.Service; -import org.springframework.web.bind.annotation.CrossOrigin; - -import javax.websocket.*; -import javax.websocket.server.PathParam; -import javax.websocket.server.ServerEndpoint; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.concurrent.CopyOnWriteArraySet; - -@Component -@CrossOrigin -@Service -@ServerEndpoint(value = "/mqtt") -public class SocketService { - - /** - * 鐢ㄦ潵瀛樻斁姣忎釜瀹㈡埛绔搴旂殑MyWebSocket瀵硅薄銆� - **/ - private static CopyOnWriteArraySet<SocketService> socketSet = new CopyOnWriteArraySet<>(); - /** - * 涓庢煇涓鎴风鐨勮繛鎺ヤ細璇濓紝闇�瑕侀�氳繃瀹冩潵缁欏鎴风鍙戦�佹暟鎹� - **/ - private Session session; - /** - * 鐢ㄦ埛鍚嶇О - **/ - private String nickname; - /** - * 鐢ㄦ潵璁板綍sessionId鍜岃session杩涜缁戝畾 - **/ - private static Map<String,Session> map = new HashMap<String, Session>(); - - /** - * 杩炴帴寤虹珛鎴愬姛璋冪敤鐨勬柟娉� - */ - @OnOpen - public void onOpen(Session session,@PathParam("nickname") String nickname) { - this.session = session; - this.nickname=nickname; - - map.put(nickname, session); - socketSet.add(this); - - System.out.println("鏈夋柊杩炴帴鍔犲叆:"+nickname+",褰撳墠鍦ㄧ嚎浜烘暟涓�" + socketSet.size()); - } - - /** - * 杩炴帴鍏抽棴璋冪敤鐨勬柟娉� - */ - @OnClose - public void onClose() { - socketSet.remove(this); - List<String> nickname = this.session.getRequestParameterMap().get("nickname"); - for(String nick:nickname) { - map.remove(nick); - } - System.out.println("鏈変竴杩炴帴鍏抽棴锛佸綋鍓嶅湪绾夸汉鏁颁负" + socketSet.size()); - } - - /** - * 鏀跺埌瀹㈡埛绔秷鎭悗璋冪敤鐨勬柟娉� - */ - @OnMessage - public void onMessage(String message, Session session,@PathParam("nickname") String nickname) { - System.out.println("鏉ヨ嚜瀹㈡埛绔殑娑堟伅-->"+nickname+": " + message); - //灏唌qtt鍙戦�佽繃鏉ョ殑鏁版嵁杩斿洖缁欏墠绔� - try { -// JSONObject parse = JSONObject.parse(message); -// Map<String,Object> valuesMap = (Map<String, Object>) parse.getByPath("values"); -// String tag1 = valuesMap.get("tag1").toString(); -// String replace = tag1.replace("\r\n", ""); -// String substring = replace.substring(replace.indexOf(":") + 1).trim().replace(" ",":"); -// String tag2 = valuesMap.get("tag2").toString(); -// String substring1=substring+"tag2:"+tag2; -// String result="{"+substring1+"}"; -// //鍙戦�佺粰鍓嶇锛岀敤浣滈〉闈㈡覆鏌� -// Session fromSession = map.get(nickname); -// fromSession.getAsyncRemote().sendText(result); - } catch (Exception e) { - e.printStackTrace(); - } - } - - /** - * 鍙戠敓閿欒鏃惰皟鐢� - */ - @OnError - public void onError(Session session, Throwable error) { - System.out.println("鍙戠敓閿欒"); - error.printStackTrace(); - } - - -} diff --git a/src/main/java/com/fzzy/gateway/hx2023/websocket/WebSockDeviceMessageReport.java b/src/main/java/com/fzzy/gateway/hx2023/websocket/WebSockDeviceMessageReport.java new file mode 100644 index 0000000..a59ba47 --- /dev/null +++ b/src/main/java/com/fzzy/gateway/hx2023/websocket/WebSockDeviceMessageReport.java @@ -0,0 +1,50 @@ +package com.fzzy.gateway.hx2023.websocket; + +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; + +import javax.websocket.*; +import javax.websocket.server.PathParam; +import javax.websocket.server.ServerEndpoint; + + +/** + * 涓嬪彂璋冪敤绮俯妫�娴嬫帴鍙f寚浠� topic + */ +@Slf4j +@Component +@ServerEndpoint(value = "/{productId}/{deviceId}/properties/report") +public class WebSockDeviceMessageReport { + + @OnOpen + public void onOpen(Session session, + @PathParam("productId") String productId, + @PathParam("deviceId") String deviceId + ) throws Exception { + + log.info("--------涓嬪彂璋冪敤绮俯妫�娴嬫帴鍙f寚浠� topic------"); + } + + @OnClose + public void onClose() { + + log.info("WebSocket杩炴帴鍏抽棴={}"); + } + + /** + * 鏀跺埌鍓嶇鍙戦�佺殑淇℃伅 + * + * @param message + * @param session + */ + @OnMessage + public void onMessage(String message, Session session) { + + log.info("鏉ヨ嚜鍓嶇鐨勪俊鎭�:\n" + message); + } + + @OnError + public void onError(Session session, Throwable error) { + log.error("鍙戠敓閿欒"); + } +} diff --git a/src/main/java/com/fzzy/gateway/hx2023/websocket/WebSockDeviceMessageSender.java b/src/main/java/com/fzzy/gateway/hx2023/websocket/WebSockDeviceMessageSender.java new file mode 100644 index 0000000..a1fbce5 --- /dev/null +++ b/src/main/java/com/fzzy/gateway/hx2023/websocket/WebSockDeviceMessageSender.java @@ -0,0 +1,50 @@ +package com.fzzy.gateway.hx2023.websocket; + +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; + +import javax.websocket.*; +import javax.websocket.server.PathParam; +import javax.websocket.server.ServerEndpoint; + + +/** + * 涓嬪彂璋冪敤绮俯妫�娴嬫帴鍙f寚浠� topic + */ +@Slf4j +@Component +@ServerEndpoint(value = "/device-message-sender/{productId}/{deviceId}") +public class WebSockDeviceMessageSender { + + @OnOpen + public void onOpen(Session session, + @PathParam("productId") String productId, + @PathParam("deviceId") String deviceId + ) throws Exception { + + log.info("--------涓嬪彂璋冪敤绮俯妫�娴嬫帴鍙f寚浠� topic------"); + } + + @OnClose + public void onClose() { + + log.info("WebSocket杩炴帴鍏抽棴={}"); + } + + /** + * 鏀跺埌鍓嶇鍙戦�佺殑淇℃伅 + * + * @param message + * @param session + */ + @OnMessage + public void onMessage(String message, Session session) { + + log.info("鏉ヨ嚜鍓嶇鐨勪俊鎭�:\n" + message); + } + + @OnError + public void onError(Session session, Throwable error) { + log.error("鍙戠敓閿欒"); + } +} diff --git a/src/main/java/com/fzzy/gateway/hx2023/websocket/WebSocketDeviceLed.java b/src/main/java/com/fzzy/gateway/hx2023/websocket/WebSocketDeviceLed.java deleted file mode 100644 index 70bed3b..0000000 --- a/src/main/java/com/fzzy/gateway/hx2023/websocket/WebSocketDeviceLed.java +++ /dev/null @@ -1,117 +0,0 @@ -package com.fzzy.gateway.hx2023.websocket; - -import com.alibaba.fastjson.JSONObject; -import com.fzzy.gateway.GatewayUtils; -import com.fzzy.gateway.hx2023.data.WebSocketPacket; -import lombok.extern.slf4j.Slf4j; -import org.apache.commons.lang.StringUtils; -import org.springframework.stereotype.Component; - -import javax.websocket.*; -import javax.websocket.server.PathParam; -import javax.websocket.server.ServerEndpoint; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; - - -/** - * 缃戝叧鎺ュ彈绯荤粺鍙戦�佺殑鎶ユ枃淇℃伅 - */ -@Slf4j -@Component -@ServerEndpoint(value = "/device-message-sender/{productId}/{deviceId}") -public class WebSocketDeviceLed { - - private static Map<String, Session> sessionPool = new ConcurrentHashMap<>(); - private static Map<String, String> sessionIds = new ConcurrentHashMap<>(); - - // 涓庢煇涓鎴风鐨勮繛鎺ヤ細璇濓紝闇�瑕侀�氳繃瀹冩潵缁欏鎴风鍙戦�佹暟鎹� - private Session session; - - - @OnOpen - public void onOpen(Session session, - @PathParam("productId") String productId, - @PathParam("deviceId") String deviceId, - @PathParam("clientId") String clientId - ) throws Exception { - - this.session = session; - - String key = productId + "-" + deviceId; - - sessionPool.put(key, session); - sessionIds.put(session.getId(), key); - - GatewayUtils.updateOnline(deviceId); - - log.info("new webSocket,clientId={}", key); - } - - @OnClose - public void onClose() { - - String key = sessionIds.get(session.getId()); - - sessionPool.remove(key); - sessionIds.remove(session.getId()); - - String deviceId = key.substring(0, key.indexOf("-")); - - GatewayUtils.updateOffOnline(deviceId); - - log.info("WebSocket杩炴帴鍏抽棴={}", key); - - - } - - /** - * 鏀跺埌鍓嶇鍙戦�佺殑淇℃伅 - * - * @param message - * @param session - */ - @OnMessage - public void onMessage(String message, Session session) { - - log.info("鏉ヨ嚜鍓嶇鐨勪俊鎭�:\n" + message); - } - - @OnError - public void onError(Session session, Throwable error) { - log.error("鍙戠敓閿欒"); - - String key = sessionIds.get(session.getId()); - - String deviceId = key.substring(0, key.indexOf("-")); - - GatewayUtils.updateOffOnline(deviceId); - - sessionPool.remove(key); - sessionIds.remove(session.getId()); - error.printStackTrace(); - } - - - /** - * @param packet - */ - public static void sendByPacket(WebSocketPacket packet) { - if (StringUtils.isEmpty(packet.getDeviceId())) { - log.error("WebSocket淇℃伅鎺ㄩ�佸け璐ワ紝璁惧缂栫爜涓虹┖銆�"); - return; - } - - String tag = packet.getDeviceId(); - - // 閬嶅巻鎺ㄩ�� - Session session; - for (String key : sessionPool.keySet()) { - if (key.indexOf(tag) != -1) { - session = sessionPool.get(key); - session.getAsyncRemote().sendText( - JSONObject.toJSONString(packet)); - } - } - } -} diff --git a/src/main/java/com/fzzy/gateway/hx2023/websocket/WebSocketMqtt.java b/src/main/java/com/fzzy/gateway/hx2023/websocket/WebSocketMqtt.java deleted file mode 100644 index 9d9756c..0000000 --- a/src/main/java/com/fzzy/gateway/hx2023/websocket/WebSocketMqtt.java +++ /dev/null @@ -1,135 +0,0 @@ -package com.fzzy.gateway.hx2023.websocket; - -import com.alibaba.fastjson.JSONObject; -import com.fzzy.gateway.GatewayUtils; -import com.fzzy.gateway.hx2023.data.WebSocketPacket; -import lombok.extern.slf4j.Slf4j; - -import org.apache.commons.lang.StringUtils; -import org.springframework.stereotype.Component; - -import javax.websocket.*; -import javax.websocket.server.PathParam; -import javax.websocket.server.ServerEndpoint; - -import java.util.List; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; - -/** - * - */ -@Slf4j -@Component -//@ServerEndpoint(value = "/mqtt") -public class WebSocketMqtt { - - private static Map<String, Session> sessionPool = new ConcurrentHashMap<>(); - private static Map<String, String> sessionIds = new ConcurrentHashMap<>(); - - // 涓庢煇涓鎴风鐨勮繛鎺ヤ細璇濓紝闇�瑕侀�氳繃瀹冩潵缁欏鎴风鍙戦�佹暟鎹� - private Session session; - - - @OnOpen - public void onOpen(Session session) throws Exception { - - this.session = session; - - Map<String, List<String>> params = session.getRequestParameterMap(); - - log.info("new webSocket,params={}", params); - } - - public void onOpen2(Session session, - @PathParam("keepalive") String keepalive, - @PathParam("clientId") String clientId, - @PathParam("protocolId") String protocolId, - @PathParam("protocolVersion") String protocolVersion, - @PathParam("clean") boolean clean, - @PathParam("reconnectPeriod") int reconnectPeriod, - @PathParam("connectTimeout") int connectTimeout - ) throws Exception { - - this.session = session; - - String key = clientId; - - sessionPool.put(key, session); - sessionIds.put(session.getId(), key); - - GatewayUtils.updateOnline(clientId); - - log.info("new webSocket,clientId={}", key); - } - - @OnClose - public void onClose() { - -// String key = sessionIds.get(session.getId()); -// -// String deviceId = key.substring(key.indexOf("-")); -// -// GatewayUtils.updateOffOnline(deviceId); -// -// sessionPool.remove(key); -// sessionIds.remove(session.getId()); - - log.info("WebSocket杩炴帴鍏抽棴={}","----------"); - - } - - /** - * 鏀跺埌鍓嶇鍙戦�佺殑淇℃伅 - * - * @param message - * @param session - */ - @OnMessage - public void onMessage(String message, Session session) { - - log.info("鏉ヨ嚜瀹㈡埛绔俊鎭�:\n" + message); - } - - @OnError - public void onError(Session session, Throwable error) { - log.error("鍙戠敓閿欒"); - -// String clientId = sessionIds.get(session.getId()); -// -// sessionPool.remove(clientId); -// sessionIds.remove(session.getId()); - error.printStackTrace(); - } - - - /** - * 鎺ㄩ�佷俊鎭埌鍓嶇 - * - * @param packet - */ - public void sendByPacket(WebSocketPacket packet) { - if (StringUtils.isEmpty(packet.getDeviceId())) { - log.error("WebSocket淇℃伅鎺ㄩ�佸け璐ワ紝璁惧缂栫爜涓虹┖銆�"); - return; - } - - String tag = packet.getDeviceId(); - - // 閬嶅巻鎺ㄩ�� - Session session; - for (String key : sessionPool.keySet()) { - - packet.getHeaders().setProductId(key); - - log.debug("----------websocket杩斿洖淇℃伅-----{}", packet); - - if (key.indexOf(tag) != -1) { - session = sessionPool.get(key); - session.getAsyncRemote().sendText(JSONObject.toJSONString(packet)); - } - } - } - - -} diff --git a/src/main/java/com/fzzy/gateway/service/GatewayDeviceService.java b/src/main/java/com/fzzy/gateway/service/GatewayDeviceService.java index 87fb25a..312a426 100644 --- a/src/main/java/com/fzzy/gateway/service/GatewayDeviceService.java +++ b/src/main/java/com/fzzy/gateway/service/GatewayDeviceService.java @@ -16,7 +16,6 @@ import com.fzzy.gateway.entity.GatewayDevice; import com.fzzy.gateway.hx2023.ScConstant; import com.fzzy.gateway.hx2023.data.*; -import com.fzzy.gateway.hx2023.websocket.WebSocketDeviceReport; import com.fzzy.gateway.service.repository.GatewayDeviceRep; import com.fzzy.mqtt.MqttPublishService; @@ -62,28 +61,27 @@ /** * gatewayDeviceService#updateSave * - * @param entity + * @param data */ @DataResolver - public void updateSave(GatewayDevice entity) { - GatewayDevice data = new GatewayDevice(); - BeanUtils.copyProperties(entity, data); - - if (null == data.getId()) { - data.setId(ContextUtil.getUUID()); - } - - - if (null == data.getDeviceSn()) { - if (null != entity.getIp()) { - data.setDeviceSn(entity.getIp()); + public void updateSave(GatewayDevice data) { + GatewayDevice data2 = new GatewayDevice(); + BeanUtils.copyProperties(data, data2); + + if (null == data2.getDeviceSn()) { + if (null != data2.getIp()) { + data.setDeviceSn(data2.getIp()); } else { - data.setDeviceSn(data.getDeviceId()); + data.setDeviceSn(data2.getDeviceId()); } } - gatewayDeviceRep.save(data); - + if (null == data2.getId()) { + data2.setId(ContextUtil.getUUID()); + gatewayDeviceRep.save(data2); + }else{ + gatewayDeviceRep.save(data2); + } flushCache(); } @@ -105,10 +103,6 @@ } - /** - * gatewayDeviceService#flushCache - */ - @Expose public void flushCache() { List<GatewayDevice> list = listAll(); if (null == list || list.isEmpty()) return; @@ -119,6 +113,9 @@ + + + /** * gatewayDeviceService#ajaxTestWeight * 鍦扮鎺ㄩ�佹祴璇� diff --git a/src/main/java/com/fzzy/gateway/service/GatewayDeviceService2.java b/src/main/java/com/fzzy/gateway/service/GatewayDeviceService2.java new file mode 100644 index 0000000..42f0067 --- /dev/null +++ b/src/main/java/com/fzzy/gateway/service/GatewayDeviceService2.java @@ -0,0 +1,62 @@ +package com.fzzy.gateway.service; + +import com.bstek.dorado.annotation.DataResolver; +import com.fzzy.api.utils.ContextUtil; +import com.fzzy.gateway.GatewayUtils; +import com.fzzy.gateway.entity.GatewayDevice; +import com.fzzy.gateway.service.repository.GatewayDeviceRep; +import org.springframework.beans.BeanUtils; +import org.springframework.data.domain.Sort; +import org.springframework.stereotype.Component; + +import javax.annotation.Resource; +import java.util.List; + +@Component +public class GatewayDeviceService2 { + + @Resource + private GatewayDeviceRep gatewayDeviceRep; + + public List<GatewayDevice> listAll() { + Sort sort = new Sort(Sort.Direction.ASC, "deviceId"); + return gatewayDeviceRep.findAll(sort); + } + + + /** + * gatewayDeviceService2#updateSave + * + * @param data + */ + @DataResolver + public void updateSave(GatewayDevice data) { + GatewayDevice data2 = new GatewayDevice(); + BeanUtils.copyProperties(data, data2); + + if (null == data2.getDeviceSn()) { + if (null != data2.getIp()) { + data.setDeviceSn(data2.getIp()); + } else { + data.setDeviceSn(data2.getDeviceId()); + } + } + + if (null == data2.getId()) { + data2.setId(ContextUtil.getUUID()); + gatewayDeviceRep.save(data2); + } else { + gatewayDeviceRep.save(data2); + } + flushCache(); + } + + public void flushCache() { + List<GatewayDevice> list = listAll(); + if (null == list || list.isEmpty()) return; + for (GatewayDevice device : list) { + GatewayUtils.add2Cache(device); + } + } + +} diff --git a/src/main/java/com/fzzy/gateway/util/GatewayHttpUtil.java b/src/main/java/com/fzzy/gateway/util/GatewayHttpUtil.java index 30359de..8a3b796 100644 --- a/src/main/java/com/fzzy/gateway/util/GatewayHttpUtil.java +++ b/src/main/java/com/fzzy/gateway/util/GatewayHttpUtil.java @@ -9,12 +9,8 @@ import org.apache.http.impl.client.HttpClients; import org.apache.http.util.EntityUtils; -import java.io.*; -import java.net.HttpURLConnection; -import java.net.URL; import java.net.URLEncoder; import java.util.Map; -import java.util.Set; /** * 缃戝叧涓撶敤HTTP璇锋眰宸ュ叿绫� diff --git a/src/main/java/com/fzzy/gateway/util/GatewayRSAUtils.java b/src/main/java/com/fzzy/gateway/util/GatewayRSAUtils.java index c81de2f..d504333 100644 --- a/src/main/java/com/fzzy/gateway/util/GatewayRSAUtils.java +++ b/src/main/java/com/fzzy/gateway/util/GatewayRSAUtils.java @@ -1,8 +1,6 @@ package com.fzzy.gateway.util; -import lombok.extern.slf4j.Slf4j; - import javax.crypto.Cipher; import java.io.ByteArrayOutputStream; import java.nio.charset.StandardCharsets; @@ -15,7 +13,6 @@ import java.util.HashMap; import java.util.Map; -@Slf4j public class GatewayRSAUtils { /** * RSA鏈�澶у姞瀵嗘槑鏂囧ぇ灏� 2048/8-11 diff --git a/src/main/java/com/fzzy/gateway/view/GatewayConf.view.xml b/src/main/java/com/fzzy/gateway/view/GatewayConf.view.xml index ee8e87e..dc60b1b 100644 --- a/src/main/java/com/fzzy/gateway/view/GatewayConf.view.xml +++ b/src/main/java/com/fzzy/gateway/view/GatewayConf.view.xml @@ -171,12 +171,6 @@ <Property name="exClassName">toolbar-button-push</Property> </ToolBarButton> <ToolBarButton> - <ClientEvent name="onClick">view.get("#dialogLpr").show();</ClientEvent> - <Property name="caption">杞︾墝鎺ㄩ�佹祴璇�</Property> - <Property name="iconClass">fa fa-search</Property> - <Property name="exClassName">toolbar-button-push</Property> - </ToolBarButton> - <ToolBarButton> <Property name="caption">缃戝叧鍒濆鍖�</Property> <Property name="iconClass">fa fa-search</Property> <Property name="exClassName">toolbar-button-push</Property> @@ -365,7 +359,7 @@ <Buttons> <Button> <ClientEvent name="onClick">var data = view.get("#dsQuery.data");
 -view.get("#ajaxTestGrain").set("parameter",data).execute(function(result){
 +view.get("#ajaxTestGrain").set("parameter",data.toJSON()).execute(function(result){
 self.get("parent").hide();
 $alert(result);
 });</ClientEvent> @@ -404,7 +398,7 @@ <Buttons> <Button> <ClientEvent name="onClick">var data = view.get("#dsQuery.data");
 -view.get("#ajaxTestWeight").set("parameter",data).execute(function(result){
 +view.get("#ajaxTestWeight").set("parameter",data.get("weight")).execute(function(result){
 self.get("parent").hide();
 $alert(result);
 });</ClientEvent> diff --git a/src/main/java/com/fzzy/gateway/view/GatewayDevice.view.xml b/src/main/java/com/fzzy/gateway/view/GatewayDevice.view.xml index fc41c88..9db29e6 100644 --- a/src/main/java/com/fzzy/gateway/view/GatewayDevice.view.xml +++ b/src/main/java/com/fzzy/gateway/view/GatewayDevice.view.xml @@ -107,8 +107,20 @@ <Property name="label">璁惧瀵嗙爜</Property> </PropertyDef> <PropertyDef name="depotIdSys"> + <Property></Property> + <Property name="label">鑷畾涔変粨搴撶紪鐮�</Property> + </PropertyDef> + <PropertyDef name="productId"> + <Property></Property> + <Property name="label">璁惧绫诲瀷KEY</Property> + </PropertyDef> + <PropertyDef name="cableRule"> <Property/> - <Property name="label">搴撳尯绯荤粺浠撳簱缂栫爜</Property> + <Property name="label">甯冪嚎瑙勫垯</Property> + </PropertyDef> + <PropertyDef name="cableCir"> + <Property/> + <Property name="label">绛掍粨灞傝鍒�</Property> </PropertyDef> </DataType> </Model> @@ -182,6 +194,9 @@ <Property name="property">type</Property> <Property name="align">center</Property> </DataColumn> + <DataColumn name="productId"> + <Property name="property">productId</Property> + </DataColumn> <DataColumn name="deviceId"> <Property name="property">deviceId</Property> <Property name="align">center</Property> @@ -203,8 +218,7 @@ <Property name="closeable">false</Property> <Buttons> <Button> - <ClientEvent name="onClick">var cur = view.get("#dgMain").getCurrentItem();
 -view.get("#updateSave").execute(function(){
 + <ClientEvent name="onClick">view.get("#updateSave").execute(function(){
 self.get("parent").hide();
 });</ClientEvent> <Property name="caption">淇濆瓨淇敼</Property> @@ -250,6 +264,11 @@ <Editor/> </AutoFormElement> <AutoFormElement> + <Property name="name">productId</Property> + <Property name="property">productId</Property> + <Editor/> + </AutoFormElement> + <AutoFormElement> <Property name="name">orgId</Property> <Property name="property">orgId</Property> <Editor/> @@ -257,6 +276,11 @@ <AutoFormElement> <Property name="name">depotId</Property> <Property name="property">depotId</Property> + <Editor/> + </AutoFormElement> + <AutoFormElement> + <Property name="name">depotIdSys</Property> + <Property name="property">depotIdSys</Property> <Editor/> </AutoFormElement> <AutoFormElement> @@ -310,15 +334,20 @@ <Property name="property">httpUrl</Property> <Editor/> </AutoFormElement> - <AutoFormElement> - <Property name="name">depotIdSys</Property> - <Property name="property">depotIdSys</Property> - <Editor/> - </AutoFormElement> <AutoFormElement layoutConstraint="colSpan:3"> <Property name="name">remark</Property> <Property name="property">remark</Property> <Property name="editorType">TextArea</Property> + <Editor/> + </AutoFormElement> + <AutoFormElement> + <Property name="name">cableRule</Property> + <Property name="property">cableRule</Property> + <Editor/> + </AutoFormElement> + <AutoFormElement> + <Property name="name">cableCir</Property> + <Property name="property">cableCir</Property> <Editor/> </AutoFormElement> </AutoForm> @@ -327,10 +356,11 @@ <Tools/> </Dialog> <UpdateAction id="updateSave"> - <Property name="dataResolver">gatewayDeviceService#updateSave</Property> + <Property name="dataResolver">gatewayDeviceService2#updateSave</Property> <UpdateItem> <Property name="dataPath">[#current]</Property> <Property name="dataSet">dsMain</Property> + <Property name="alias">data</Property> </UpdateItem> </UpdateAction> <AjaxAction id="ajaxDel"> diff --git a/src/main/java/com/fzzy/mqtt/MqttConsumerCallBack.java b/src/main/java/com/fzzy/mqtt/MqttConsumerCallBack.java new file mode 100644 index 0000000..127cfe8 --- /dev/null +++ b/src/main/java/com/fzzy/mqtt/MqttConsumerCallBack.java @@ -0,0 +1,53 @@ +package com.fzzy.mqtt; + +import com.fzzy.gateway.hx2023.service.OnReceiveMqttService; +import lombok.extern.slf4j.Slf4j; +import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; +import org.eclipse.paho.client.mqttv3.MqttCallback; +import org.eclipse.paho.client.mqttv3.MqttMessage; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +@Slf4j +@Component +public class MqttConsumerCallBack implements MqttCallback { + + + @Autowired + private OnReceiveMqttService onReceiveMqttService; + + /** + * 瀹㈡埛绔柇寮�杩炴帴鐨勫洖璋� + */ + @Override + public void connectionLost(Throwable throwable) { + log.info("涓庢湇鍔″櫒鏂紑杩炴帴锛屽彲閲嶈繛"); + } + + /** + * 娑堟伅鍒拌揪鐨勫洖璋� + */ + @Override + public void messageArrived(String topic, MqttMessage message) throws Exception { + + + String messageStr = new String(message.getPayload()); + + + log.info(String.format("鎺ユ敹娑堟伅涓婚 : %s", topic)); + log.info(String.format("鎺ユ敹娑堟伅Qos : %d", message.getQos())); + log.info(String.format("鎺ユ敹娑堟伅鍐呭 : %s", messageStr)); + + log.info(String.format("鎺ユ敹娑堟伅retained : %b", message.isRetained())); + + onReceiveMqttService.onReceiveMessage(messageStr); + } + + /** + * 娑堟伅鍙戝竷鎴愬姛鐨勫洖璋� + */ + @Override + public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) { + log.info(String.format("鎺ユ敹娑堟伅鎴愬姛")); + } +} diff --git a/src/main/java/com/fzzy/mqtt/MqttConsumerConfig.java b/src/main/java/com/fzzy/mqtt/MqttConsumerConfig.java new file mode 100644 index 0000000..ca83cb1 --- /dev/null +++ b/src/main/java/com/fzzy/mqtt/MqttConsumerConfig.java @@ -0,0 +1,93 @@ +package com.fzzy.mqtt; + +import lombok.extern.slf4j.Slf4j; +import org.eclipse.paho.client.mqttv3.*; +import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.Configuration; + +import javax.annotation.PostConstruct; + +@Configuration +@Slf4j +public class MqttConsumerConfig { + + @Autowired + private MqttProperties mqttProperties; + @Autowired + private MqttConsumerCallBack mqttConsumerCallBack; + + /** + * 瀹㈡埛绔璞� + */ + private MqttClient client; + + /** + * 鍦╞ean鍒濆鍖栧悗杩炴帴鍒版湇鍔″櫒 + */ + @PostConstruct + public void init() { + connect(); + } + + /** + * 瀹㈡埛绔繛鎺ユ湇鍔$ + */ + public void connect() { + try { + //鍒涘缓MQTT瀹㈡埛绔璞� + client = new MqttClient(mqttProperties.getHost(), mqttProperties.getClientInId(), new MemoryPersistence()); + //杩炴帴璁剧疆 + MqttConnectOptions options = new MqttConnectOptions(); + //鏄惁娓呯┖session锛岃缃负false琛ㄧず鏈嶅姟鍣ㄤ細淇濈暀瀹㈡埛绔殑杩炴帴璁板綍锛屽鎴风閲嶈繛涔嬪悗鑳借幏鍙栧埌鏈嶅姟鍣ㄥ湪瀹㈡埛绔柇寮�杩炴帴鏈熼棿鎺ㄩ�佺殑娑堟伅 + //璁剧疆涓簍rue琛ㄧず姣忔杩炴帴鍒版湇鍔$閮芥槸浠ユ柊鐨勮韩浠� + options.setCleanSession(true); + //璁剧疆杩炴帴鐢ㄦ埛鍚� + options.setUserName(mqttProperties.getUsername()); + //璁剧疆杩炴帴瀵嗙爜 + options.setPassword(mqttProperties.getPassword().toCharArray()); + //璁剧疆瓒呮椂鏃堕棿锛屽崟浣嶄负绉� + options.setConnectionTimeout(10); + //璁剧疆蹇冭烦鏃堕棿 鍗曚綅涓虹锛岃〃绀烘湇鍔″櫒姣忛殧1.5*20绉掔殑鏃堕棿鍚戝鎴风鍙戦�佸績璺冲垽鏂鎴风鏄惁鍦ㄧ嚎 + options.setKeepAliveInterval(20); + //璁剧疆閬楀槺娑堟伅鐨勮瘽棰橈紝鑻ュ鎴风鍜屾湇鍔″櫒涔嬮棿鐨勮繛鎺ユ剰澶栨柇寮�锛屾湇鍔″櫒灏嗗彂甯冨鎴风鐨勯仐鍢变俊鎭� + options.setWill("willTopic", (mqttProperties.getClientInId() + "涓庢湇鍔″櫒鏂紑杩炴帴").getBytes(), 0, false); + //璁剧疆鍥炶皟 + // client.setCallback(new MqttConsumerCallBack()); + client.setCallback(mqttConsumerCallBack); + client.connect(options); + //璁㈤槄涓婚 + //娑堟伅绛夌骇锛屽拰涓婚鏁扮粍涓�涓�瀵瑰簲锛屾湇鍔$灏嗘寜鐓ф寚瀹氱瓑绾х粰璁㈤槄浜嗕富棰樼殑瀹㈡埛绔帹閫佹秷鎭� + int[] qos = {1, 1}; + //涓婚 + String[] topics = mqttProperties.getTopics().split(","); + //璁㈤槄涓婚 + client.subscribe(topics, qos); + } catch (MqttException e) { + e.printStackTrace(); + } + } + + /** + * 鏂紑杩炴帴 + */ + public void disConnect() { + try { + client.disconnect(); + } catch (MqttException e) { + e.printStackTrace(); + } + } + + + /** + * 璁㈤槄涓婚 + */ + public void subscribe(String topic, int qos) { + try { + client.subscribe(topic, qos); + } catch (MqttException e) { + e.printStackTrace(); + } + } +} diff --git a/src/main/java/com/fzzy/mqtt/MqttController.java b/src/main/java/com/fzzy/mqtt/MqttController.java new file mode 100644 index 0000000..d327c26 --- /dev/null +++ b/src/main/java/com/fzzy/mqtt/MqttController.java @@ -0,0 +1,30 @@ +package com.fzzy.mqtt; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Controller; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.ResponseBody; + +@Controller +public class MqttController { + @Autowired + private MqttConsumerConfig client; + + @Autowired + private MqttProperties mqttProperties; + + @RequestMapping("/connect") + public @ResponseBody + String connect() { + client.connect(); + return mqttProperties.getClientOutId() + "杩炴帴鍒版湇鍔″櫒"; + } + + @RequestMapping("/disConnect") + @ResponseBody + public String disConnect() { + client.disConnect(); + return mqttProperties.getClientOutId() + "涓庢湇鍔″櫒鏂紑杩炴帴"; + } + +} diff --git a/src/main/java/com/fzzy/mqtt/MqttGatewayService.java b/src/main/java/com/fzzy/mqtt/MqttGatewayService.java index caed4e8..6457db5 100644 --- a/src/main/java/com/fzzy/mqtt/MqttGatewayService.java +++ b/src/main/java/com/fzzy/mqtt/MqttGatewayService.java @@ -1,6 +1,5 @@ package com.fzzy.mqtt; - import org.springframework.integration.annotation.MessagingGateway; import org.springframework.integration.mqtt.support.MqttHeaders; import org.springframework.messaging.handler.annotation.Header; diff --git a/src/main/java/com/fzzy/mqtt/MqttInboundConfiguration.java b/src/main/java/com/fzzy/mqtt/MqttInboundConfiguration.java deleted file mode 100644 index 874464c..0000000 --- a/src/main/java/com/fzzy/mqtt/MqttInboundConfiguration.java +++ /dev/null @@ -1,91 +0,0 @@ -package com.fzzy.mqtt; - -import com.fzzy.gateway.hx2023.websocket.SocketService; -import lombok.extern.slf4j.Slf4j; -import org.eclipse.paho.client.mqttv3.MqttConnectOptions; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Configuration; -import org.springframework.integration.annotation.IntegrationComponentScan; -import org.springframework.integration.annotation.ServiceActivator; -import org.springframework.integration.channel.DirectChannel; -import org.springframework.integration.core.MessageProducer; -import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory; -import org.springframework.integration.mqtt.core.MqttPahoClientFactory; -import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter; -import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter; -import org.springframework.integration.mqtt.support.MqttHeaders; -import org.springframework.messaging.Message; -import org.springframework.messaging.MessageChannel; -import org.springframework.messaging.MessageHandler; -import org.springframework.messaging.MessagingException; - -/** - * MQTT娑堣垂绔� - */ -@Slf4j -@Configuration -@IntegrationComponentScan -public class MqttInboundConfiguration { - - @Autowired - private MqttProperties mqttProperties; - - @Bean - public MessageChannel mqttInputChannel() { - return new DirectChannel(); - } - - @Bean - public MqttPahoClientFactory receiverMqttClientFactoryForSub() { - DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory(); - String[] array = mqttProperties.getHost().split(","); - MqttConnectOptions options = new MqttConnectOptions(); - options.setServerURIs(array); - options.setUserName(mqttProperties.getClientUsername()); - options.setPassword(mqttProperties.getClientPassword().toCharArray()); - options.setKeepAliveInterval(mqttProperties.getClientAliveTime()); - //鎺ュ彈绂荤嚎娑堟伅 - options.setCleanSession(false); - options.setMqttVersion(4); - - factory.setConnectionOptions(options); - - return factory; - } - - //閰嶇疆client,鐩戝惉鐨則opic - @Bean - public MessageProducer inbound() { - String[] inboundTopics = mqttProperties.getClientTopics().split(","); - MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter( - mqttProperties.getClientId() + "_inbound", receiverMqttClientFactoryForSub(), inboundTopics); //瀵筰nboundTopics涓婚杩涜鐩戝惉 - adapter.setCompletionTimeout(mqttProperties.getClientTimeout()); - adapter.setQos(1); - adapter.setConverter(new DefaultPahoMessageConverter()); - adapter.setOutputChannel(mqttInputChannel()); - return adapter; - } - - //閫氳繃閫氶亾鑾峰彇鏁版嵁 - @Bean - @ServiceActivator(inputChannel = "mqttInputChannel") //寮傛澶勭悊 - public MessageHandler handler() { - return new MessageHandler() { - @Override - public void handleMessage(Message<?> message) throws MessagingException { - log.info("----------------------"); - //鑾峰彇mqtt鐨則opic - String topic = (String) message.getHeaders().get(MqttHeaders.RECEIVED_TOPIC); - //浣跨敤webSocket杩斿洖缁欏墠绔� - SocketService socketService = new SocketService(); - socketService.onMessage(message.getPayload().toString(), null, topic); - - log.info("message:" + message.getPayload()); - log.info("PacketId:" + message.getHeaders().getId()); - log.info("Qos:" + message.getHeaders().get(MqttHeaders.QOS)); - log.info("topic:" + topic); - } - }; - } -} diff --git a/src/main/java/com/fzzy/mqtt/MqttOutboundConfiguration.java b/src/main/java/com/fzzy/mqtt/MqttOutboundConfiguration.java deleted file mode 100644 index ccbd0f1..0000000 --- a/src/main/java/com/fzzy/mqtt/MqttOutboundConfiguration.java +++ /dev/null @@ -1,58 +0,0 @@ -package com.fzzy.mqtt; - -import lombok.extern.slf4j.Slf4j; -import org.eclipse.paho.client.mqttv3.MqttConnectOptions; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Configuration; -import org.springframework.integration.annotation.IntegrationComponentScan; -import org.springframework.integration.annotation.ServiceActivator; -import org.springframework.integration.channel.DirectChannel; -import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory; -import org.springframework.integration.mqtt.core.MqttPahoClientFactory; -import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler; -import org.springframework.messaging.MessageChannel; -import org.springframework.messaging.MessageHandler; - -/** - * MQTT鐢熶骇绔� - */ -@Slf4j -@Configuration -@IntegrationComponentScan -public class MqttOutboundConfiguration { - @Autowired - private MqttProperties mqttProperties; - - @Bean - public MessageChannel mqttOutboundChannel() { - return new DirectChannel(); - } - - @Bean - public MqttPahoClientFactory getReceiverMqttConnectOptionsForSend() { - DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory(); - String[] array = mqttProperties.getHost().split(","); - MqttConnectOptions options = new MqttConnectOptions(); - options.setServerURIs(array); - - if(null == mqttProperties.getClientUsername()) mqttProperties.setClientUsername(" "); - if(null == mqttProperties.getClientPassword()) mqttProperties.setClientPassword(" "); - options.setUserName(mqttProperties.getClientUsername()); - options.setPassword(mqttProperties.getClientPassword().toCharArray()); - // 鎺ュ彈绂荤嚎娑堟伅 - options.setCleanSession(false); //鍛婅瘔浠g悊瀹㈡埛绔槸鍚﹁寤虹珛鎸佷箙浼氳瘽 false涓哄缓绔嬫寔涔呬細璇� - options.setMqttVersion(4); - factory.setConnectionOptions(options); - return factory; - } - - @Bean - @ServiceActivator(inputChannel = "mqttOutboundChannel") - public MessageHandler mqttOutbound() { - MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(mqttProperties.getClientId() + "outbound", getReceiverMqttConnectOptionsForSend()); - messageHandler.setAsync(true); - return messageHandler; - } - -} diff --git a/src/main/java/com/fzzy/mqtt/MqttProperties.java b/src/main/java/com/fzzy/mqtt/MqttProperties.java index a7293c9..094df8b 100644 --- a/src/main/java/com/fzzy/mqtt/MqttProperties.java +++ b/src/main/java/com/fzzy/mqtt/MqttProperties.java @@ -22,39 +22,44 @@ /** * 鐢ㄦ埛鍚� */ - private String clientUsername; + private String username; /** * 瀵嗙爜 */ - private String clientPassword; + private String password; /** - * 瀹㈡埛绔疘d锛屽悓涓�鍙版湇鍔″櫒涓嬶紝涓嶅厑璁稿嚭鐜伴噸澶嶇殑瀹㈡埛绔痠d + * 瀹㈡埛绔疘d-鍙戝竷鑰匢d */ - private String clientId; + private String clientOutId; + + /** + * 瀹㈡埛绔疘d-琚闃呰�匢d + */ + private String clientInId; /** * 瓒呮椂鏃堕棿 */ - private int clientTimeout = 5000; + private int timout = 5000; /** * 璁剧疆浼氳瘽蹇冭烦鏃堕棿 鍗曚綅涓虹 鏈嶅姟鍣ㄤ細姣忛殧1.5*20绉掔殑鏃堕棿鍚戝鎴风 * 鍙戦�佷釜娑堟伅鍒ゆ柇瀹㈡埛绔槸鍚﹀湪绾匡紝浣嗚繖涓柟娉曞苟娌℃湁閲嶈繛鐨勬満鍒� */ - private int clientAliveTime = 30000; + private int keepAliveInterval = 20; - private int clientMaxConnectTime; + private int maxConnectTimes = 5; - private String clientTopics; + private String topics; /** * 杩炴帴鏂瑰紡 */ - private Integer clientQos = 0; + private Integer qos = 0; /** * 榛樿杩炴帴涓婚锛屼互/#缁撳熬琛ㄧず璁㈤槄鎵�鏈変互test寮�澶寸殑涓婚 diff --git a/src/main/java/com/fzzy/mqtt/MqttProviderCallBack.java b/src/main/java/com/fzzy/mqtt/MqttProviderCallBack.java new file mode 100644 index 0000000..0191efe --- /dev/null +++ b/src/main/java/com/fzzy/mqtt/MqttProviderCallBack.java @@ -0,0 +1,37 @@ +package com.fzzy.mqtt; + +import lombok.extern.slf4j.Slf4j; +import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; +import org.eclipse.paho.client.mqttv3.MqttCallback; +import org.eclipse.paho.client.mqttv3.MqttMessage; + +@Slf4j +public class MqttProviderCallBack implements MqttCallback { + + /** + * 瀹㈡埛绔柇寮�杩炴帴鐨勫洖璋� + */ + @Override + public void connectionLost(Throwable throwable) { + System.out.println("涓庢湇鍔″櫒鏂紑杩炴帴锛屽彲閲嶈繛"); + } + + /** + * 娑堟伅鍒拌揪鐨勫洖璋� + */ + @Override + public void messageArrived(String topic, MqttMessage message) throws Exception { + System.out.println(String.format("鎺ユ敹娑堟伅涓婚 : %s", topic)); + System.out.println(String.format("鎺ユ敹娑堟伅Qos : %d", message.getQos())); + System.out.println(String.format("鎺ユ敹娑堟伅鍐呭 : %s", new String(message.getPayload()))); + System.out.println(String.format("鎺ユ敹娑堟伅retained : %b", message.isRetained())); + } + + /** + * 娑堟伅鍙戝竷鎴愬姛鐨勫洖璋� + */ + @Override + public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) { + System.out.println(String.format("鎺ユ敹娑堟伅鎴愬姛")); + } +} diff --git a/src/main/java/com/fzzy/mqtt/MqttProviderConfig.java b/src/main/java/com/fzzy/mqtt/MqttProviderConfig.java new file mode 100644 index 0000000..8198dbf --- /dev/null +++ b/src/main/java/com/fzzy/mqtt/MqttProviderConfig.java @@ -0,0 +1,101 @@ +package com.fzzy.mqtt; + +import lombok.extern.slf4j.Slf4j; +import org.eclipse.paho.client.mqttv3.*; +import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.Configuration; + +import javax.annotation.PostConstruct; + +@Configuration +@Slf4j +public class MqttProviderConfig { + + @Autowired + private MqttProperties mqttProperties; + /** + * 瀹㈡埛绔璞� + */ + private MqttClient client; + + /** + * + * 鍦╞ean鍒濆鍖栧悗杩炴帴鍒版湇鍔″櫒 + */ + @PostConstruct + public void init(){ + connect(); + } + + /** + * 瀹㈡埛绔繛鎺ユ湇鍔$ + */ + public void connect(){ + try{ + //鍒涘缓MQTT瀹㈡埛绔璞� + client = new MqttClient(mqttProperties.getHost(),mqttProperties.getClientOutId(),new MemoryPersistence()); + //杩炴帴璁剧疆 + MqttConnectOptions options = new MqttConnectOptions(); + //鏄惁娓呯┖session锛岃缃甪alse琛ㄧず鏈嶅姟鍣ㄤ細淇濈暀瀹㈡埛绔殑杩炴帴璁板綍锛堣闃呬富棰橈紝qos锛�,瀹㈡埛绔噸杩炰箣鍚庤兘鑾峰彇鍒版湇鍔″櫒鍦ㄥ鎴风鏂紑杩炴帴鏈熼棿鎺ㄩ�佺殑娑堟伅 + //璁剧疆涓簍rue琛ㄧず姣忔杩炴帴鏈嶅姟鍣ㄩ兘鏄互鏂扮殑韬唤 + options.setCleanSession(true); + //璁剧疆杩炴帴鐢ㄦ埛鍚� + options.setUserName(mqttProperties.getUsername()); + //璁剧疆杩炴帴瀵嗙爜 + options.setPassword(mqttProperties.getPassword().toCharArray()); + //璁剧疆瓒呮椂鏃堕棿锛屽崟浣嶄负绉� + options.setConnectionTimeout(100); + //璁剧疆蹇冭烦鏃堕棿 鍗曚綅涓虹锛岃〃绀烘湇鍔″櫒姣忛殧 1.5*20绉掔殑鏃堕棿鍚戝鎴风鍙戦�佸績璺冲垽鏂鎴风鏄惁鍦ㄧ嚎 + options.setKeepAliveInterval(20); + //璁剧疆閬楀槺娑堟伅鐨勮瘽棰橈紝鑻ュ鎴风鍜屾湇鍔″櫒涔嬮棿鐨勮繛鎺ユ剰澶栨柇寮�锛屾湇鍔″櫒灏嗗彂甯冨鎴风鐨勯仐鍢变俊鎭� + options.setWill("willTopic",(mqttProperties.getClientOutId() + "涓庢湇鍔″櫒鏂紑杩炴帴").getBytes(),0,false); + //璁剧疆鍥炶皟 + client.setCallback(new MqttProviderCallBack()); + client.connect(options); + } catch(MqttException e){ + e.printStackTrace(); + } + } + + public void publish(String topic,String message){ + MqttMessage mqttMessage = new MqttMessage(); + mqttMessage.setQos(mqttProperties.getQos()); + mqttMessage.setRetained(true); + mqttMessage.setPayload(message.getBytes()); + //涓婚鐨勭洰鐨勫湴锛岀敤浜庡彂甯�/璁㈤槄淇℃伅 + MqttTopic mqttTopic = client.getTopic(topic); + //鎻愪緵涓�绉嶆満鍒舵潵璺熻釜娑堟伅鐨勪紶閫掕繘搴� + //鐢ㄤ簬鍦ㄤ互闈為樆濉炴柟寮忥紙鍦ㄥ悗鍙拌繍琛岋級鎵ц鍙戝竷鏄窡韪秷鎭殑浼犻�掕繘搴� + MqttDeliveryToken token; + try { + //灏嗘寚瀹氭秷鎭彂甯冨埌涓婚锛屼絾涓嶇瓑寰呮秷鎭紶閫掑畬鎴愶紝杩斿洖鐨則oken鍙敤浜庤窡韪秷鎭殑浼犻�掔姸鎬� + //涓�鏃︽鏂规硶骞插噣鍦拌繑鍥烇紝娑堟伅灏卞凡琚鎴风鎺ュ彈鍙戝竷锛屽綋杩炴帴鍙敤锛屽皢鍦ㄥ悗鍙板畬鎴愭秷鎭紶閫掋�� + token = mqttTopic.publish(mqttMessage); + token.waitForCompletion(); + } catch (MqttException e) { + e.printStackTrace(); + } + } + + public void publish(int qos,boolean retained,String topic,String message){ + MqttMessage mqttMessage = new MqttMessage(); + mqttMessage.setQos(qos); + mqttMessage.setRetained(retained); + mqttMessage.setPayload(message.getBytes()); + //涓婚鐨勭洰鐨勫湴锛岀敤浜庡彂甯�/璁㈤槄淇℃伅 + MqttTopic mqttTopic = client.getTopic(topic); + //鎻愪緵涓�绉嶆満鍒舵潵璺熻釜娑堟伅鐨勪紶閫掕繘搴� + //鐢ㄤ簬鍦ㄤ互闈為樆濉炴柟寮忥紙鍦ㄥ悗鍙拌繍琛岋級鎵ц鍙戝竷鏄窡韪秷鎭殑浼犻�掕繘搴� + MqttDeliveryToken token; + try { + //灏嗘寚瀹氭秷鎭彂甯冨埌涓婚锛屼絾涓嶇瓑寰呮秷鎭紶閫掑畬鎴愶紝杩斿洖鐨則oken鍙敤浜庤窡韪秷鎭殑浼犻�掔姸鎬� + //涓�鏃︽鏂规硶骞插噣鍦拌繑鍥烇紝娑堟伅灏卞凡琚鎴风鎺ュ彈鍙戝竷锛屽綋杩炴帴鍙敤锛屽皢鍦ㄥ悗鍙板畬鎴愭秷鎭紶閫掋�� + token = mqttTopic.publish(mqttMessage); + token.waitForCompletion(); + } catch (MqttException e) { + e.printStackTrace(); + } + } + +} diff --git a/src/main/java/com/fzzy/mqtt/MqttPubController.java b/src/main/java/com/fzzy/mqtt/MqttPubController.java index 644928d..c03b424 100644 --- a/src/main/java/com/fzzy/mqtt/MqttPubController.java +++ b/src/main/java/com/fzzy/mqtt/MqttPubController.java @@ -2,34 +2,25 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.ResponseBody; import org.springframework.web.bind.annotation.RestController; @RestController public class MqttPubController { @Autowired - private MqttGatewayService gatewayService; + private MqttProviderConfig providerClient; - @RequestMapping("/hello") - public String hello() { - return "hello!"; + @RequestMapping("/sendMessage") + public @ResponseBody + String sendMessage(String topic, String message) { + try { + providerClient.publish(topic, message); + return "鍙戦�佹垚鍔�"; + } catch (Exception e) { + e.printStackTrace(); + return "鍙戦�佸け璐�"; + } } - - @RequestMapping("/sendMqtt") - public String sendMqtt(String sendData) { - System.out.println(sendData); - System.out.println("杩涘叆sendMqtt-------" + sendData); - gatewayService.sendToMqtt("topic01", (String) sendData); - return "Test is OK"; - } - - @RequestMapping("/sendMqttTopic") - public String sendMqtt(String sendData, String topic) { - //System.out.println(sendData+" "+topic); - //System.out.println("杩涘叆inbound鍙戦�侊細"+sendData); - gatewayService.sendToMqtt(topic, (String) sendData); - return "Test is OK"; - } - } diff --git a/src/main/java/com/fzzy/mqtt/MqttPublishService.java b/src/main/java/com/fzzy/mqtt/MqttPublishService.java index 8191a33..f02df27 100644 --- a/src/main/java/com/fzzy/mqtt/MqttPublishService.java +++ b/src/main/java/com/fzzy/mqtt/MqttPublishService.java @@ -9,6 +9,7 @@ @Service public class MqttPublishService { + private static MqttClient client ; diff --git a/src/main/resources/application-dev.yml b/src/main/resources/application-dev.yml index edd09d6..d4804f2 100644 --- a/src/main/resources/application-dev.yml +++ b/src/main/resources/application-dev.yml @@ -96,15 +96,3 @@ auto-offset-reset: latest key-deserializer: org.apache.kafka.common.serialization.StringDeserializer value-deserializer: org.apache.kafka.common.serialization.StringDeserializer - -mqtt: - host: tcp://10.13.4.84:11883 - client-id: - client-username: - client-password: - client-timeout: 10 - client-alive-time: 20 - client-max-connect-times: 5 - client-topics: - client-qos: 0 - isOpen: false diff --git a/src/main/resources/application-devGateway.yml b/src/main/resources/application-devGateway.yml index 09d7b84..f46809d 100644 --- a/src/main/resources/application-devGateway.yml +++ b/src/main/resources/application-devGateway.yml @@ -70,13 +70,16 @@ value-deserializer: org.apache.kafka.common.serialization.StringDeserializer mqtt: - host: tcp://10.13.4.84:11883 - client-id: - client-username: admin - client-password: 123456 - client-timeout: 10 - client-alive-time: 20 - client-max-connect-times: 5 - client-topics: "/device/${productId}/${deviceId}/message/property/report,/device-message-sender/${productId}/${deviceId}" - client-qos: 0 + host: tcp://127.0.0.1:1883 + username: admin + password: pwdmqtt.. + client-outId: fzzy-customer-id + client-inId: fzzy-6e3d92ff71b911eea5e50250f2000002 + timeout: 10 + keep-alive-interval: 20 + max-connect-times: 5 + qos: 0 isOpen: false + default: + topic: testTopic + topics: "/+/+/properties/report,/device-message-sender/+/+" \ No newline at end of file -- Gitblit v1.9.3