From 083eb4417a04201f1eb755faab8ee7121b3c7f16 Mon Sep 17 00:00:00 2001 From: jiazx0107@163.com <jiazx0107@163.com> Date: 星期五, 17 十一月 2023 16:19:05 +0800 Subject: [PATCH] Merge remote-tracking branch 'origin/igds-api-gateway' into igds-api-gateway --- src/main/java/com/fzzy/gateway/hx2023/service/DeviceReportServiceImpl.java | 87 +++++++++++++++++++++++++++++++------------ 1 files changed, 62 insertions(+), 25 deletions(-) diff --git a/src/main/java/com/fzzy/gateway/hx2023/service/DeviceReportServiceImpl.java b/src/main/java/com/fzzy/gateway/hx2023/service/DeviceReportServiceImpl.java index 4cca0c6..56683c8 100644 --- a/src/main/java/com/fzzy/gateway/hx2023/service/DeviceReportServiceImpl.java +++ b/src/main/java/com/fzzy/gateway/hx2023/service/DeviceReportServiceImpl.java @@ -1,15 +1,17 @@ package com.fzzy.gateway.hx2023.service; import com.alibaba.fastjson2.JSONObject; -import com.fzzy.api.data.GatewayProtocol; import com.fzzy.api.data.PushProtocol; -import com.fzzy.gateway.api.DeviceReportService; +import com.fzzy.gateway.api.GatewayDeviceReportService; +import com.fzzy.gateway.data.BaseReqData; +import com.fzzy.gateway.data.BaseResp; import com.fzzy.gateway.entity.GatewayDevice; import com.fzzy.gateway.hx2023.ScConstant; import com.fzzy.gateway.hx2023.data.WebSocketPacket; import com.fzzy.gateway.hx2023.data.WebSocketPacketHeader; import com.fzzy.gateway.hx2023.data.WeightInfo; -import com.fzzy.gateway.hx2023.websocket.WebSocketDeviceReport; +import com.fzzy.gateway.hx2023.kafka.KafkaDeviceReportService; +import com.fzzy.mqtt.MqttGatewayService; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; @@ -17,43 +19,58 @@ @Slf4j @Component -public class DeviceReportServiceImpl implements DeviceReportService { - +public class DeviceReportServiceImpl implements GatewayDeviceReportService { @Resource - private WebSocketDeviceReport webSocketDeviceReport; + private KafkaDeviceReportService kafkaDeviceReportService; + @Resource + private MqttGatewayService publishService; @Override - public String getProvinceProtocol() { + public String getProtocol() { return PushProtocol.GATEWAY_SC_2023.getCode(); } @Override - public String report2GatewayBySn(double weigh, GatewayDevice device) { + public BaseResp reportGrainData(BaseReqData reqData) { - if (null == device) { - log.error("-----------娌℃湁鑾峰彇鍒拌澶囬厤缃俊鎭�-----"); - return "ERROR:娌℃湁鑾峰彇鍒拌澶囬厤缃俊鎭�"; - } + String topic = ScConstant.TOPIC_REPORT; + topic = topic.replace("${productId}", reqData.getProductId()).replace("${deviceId}", reqData.getDeviceId()); - //浣跨敤WEBSOCKET杩斿洖 - if (GatewayProtocol.GATE_WEBSOCKET.equals(device.getPushProtocol())) { + publishService.publishMqttWithTopic(reqData.getData(), topic); + + log.info("----------------------------鎺ㄩ�丮QTT绮儏淇℃伅---------------------------"); + log.info("-----TOPIC-----{}", topic); + log.info("-----Message-----{}", reqData.getData()); + + return new BaseResp(); + } + + @Override + public BaseResp reportWeightData(BaseReqData reqData) { + + String topic = ScConstant.TOPIC_MESSAGE_REPORT; + + topic = topic.replace("${productId}", reqData.getProductId()).replace("${deviceId}", reqData.getDeviceId()); + + if (null == reqData.getData()) { + GatewayDevice device = reqData.getDevice(); WebSocketPacket packet = new WebSocketPacket(); - //productId鐨勫�煎湪webSocket鎵ц鍙戦�佹柟娉曚腑琛ュ厖 WebSocketPacketHeader header = new WebSocketPacketHeader(); header.setDeviceName(device.getDeviceName()); + header.setProductId(device.getProductId()); packet.setHeaders(header); packet.setMessageType(ScConstant.MESSAGE_TYPE_REPORT_PROPERTY); packet.setDeviceId(device.getDeviceId()); //璁剧疆淇℃伅涓讳綋 WeightInfo weightInfo = new WeightInfo(); - weightInfo.setGrossWeight(weigh); - weightInfo.setNetWeight(weigh); - weightInfo.setNetWeight(weigh); - + weightInfo.setGrossWeight(reqData.getWeight()); + weightInfo.setNetWeight(reqData.getWeight()); + weightInfo.setNetWeight(reqData.getWeight()); + weightInfo.setWeightUnit("KG"); JSONObject jsonObject = new JSONObject(); jsonObject.put("weightInfo", JSONObject.toJSONString(weightInfo)); @@ -61,14 +78,34 @@ packet.setTimestamp(System.currentTimeMillis()); - - webSocketDeviceReport.sendByPacket(packet); + reqData.setData(JSONObject.toJSONString(packet)); } - if (GatewayProtocol.GATE_MQTT.equals(device.getPushProtocol())) { - //TODO ---->>>鍚庢湡娣诲姞鏀寔 - } + publishService.publishMqttWithTopic(reqData.getData(), topic); - return null; + log.info("----------------------------鎺ㄩ�丮QTT鍦扮淇℃伅---------------------------"); + log.info("-----TOPIC-----{}", topic); + log.info("-----Message-----{}", reqData.getData()); + return new BaseResp(); + } + + @Override + public BaseResp reportLprData(BaseReqData reqData) { + String topic = ScConstant.TOPIC_MESSAGE_REPORT; + topic = topic.replace("${productId}", reqData.getProductId()).replace("${deviceId}", reqData.getDeviceId()); + + publishService.publishMqttWithTopic(reqData.getData(), topic); + + log.info("----------------------------鎺ㄩ�丮QTT杞︾墝璇嗗埆淇℃伅---------------------------"); + log.info("-----TOPIC-----{}", topic); + log.info("-----Message-----{}", reqData.getData()); + return new BaseResp(); + } + + @Override + public BaseResp reportGrainDataByKafka(BaseReqData reqData) { + String topic = ScConstant.TOPIC_MESSAGE_REPORT; + kafkaDeviceReportService.publishWithTopic(reqData.getData(), topic); + return new BaseResp(); } } -- Gitblit v1.9.3