From d52795fc5de0b6ed748cd2ef217dcd1371e4b8e9 Mon Sep 17 00:00:00 2001 From: jiazx0107@163.com <jiazx0107@163.com> Date: 星期五, 10 十一月 2023 01:06:13 +0800 Subject: [PATCH] 重新调整代码结构和实现方式 --- src/main/java/com/fzzy/gateway/hx2023/service/DeviceReportServiceImpl.java | 105 +++++++++++++++++++++++++++------------------------- 1 files changed, 55 insertions(+), 50 deletions(-) diff --git a/src/main/java/com/fzzy/gateway/hx2023/service/DeviceReportServiceImpl.java b/src/main/java/com/fzzy/gateway/hx2023/service/DeviceReportServiceImpl.java index b6cd355..f91b0b8 100644 --- a/src/main/java/com/fzzy/gateway/hx2023/service/DeviceReportServiceImpl.java +++ b/src/main/java/com/fzzy/gateway/hx2023/service/DeviceReportServiceImpl.java @@ -1,17 +1,13 @@ package com.fzzy.gateway.hx2023.service; -import com.alibaba.fastjson2.JSONObject; -import com.fzzy.api.data.GatewayProtocol; import com.fzzy.api.data.PushProtocol; -import com.fzzy.gateway.api.DeviceReportService; +import com.fzzy.gateway.api.GatewayDeviceReportService; +import com.fzzy.gateway.data.BaseReqData; +import com.fzzy.gateway.data.BaseResp; import com.fzzy.gateway.entity.GatewayDevice; import com.fzzy.gateway.hx2023.ScConstant; -import com.fzzy.gateway.hx2023.data.KafaGrainData; -import com.fzzy.gateway.hx2023.data.WebSocketPacket; -import com.fzzy.gateway.hx2023.data.WebSocketPacketHeader; -import com.fzzy.gateway.hx2023.data.WeightInfo; -import com.fzzy.gateway.hx2023.kafka.KafkaDeviceReport; -import com.fzzy.gateway.hx2023.websocket.WebSocketDeviceReport; +import com.fzzy.gateway.hx2023.kafka.KafkaDeviceReportService; +import com.fzzy.mqtt.MqttGatewayService; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; @@ -19,16 +15,15 @@ @Slf4j @Component -public class DeviceReportServiceImpl implements DeviceReportService { - +public class DeviceReportServiceImpl implements GatewayDeviceReportService { @Resource - private WebSocketDeviceReport webSocketDeviceReport; + private KafkaDeviceReportService kafkaDeviceReportService; @Resource - private KafkaDeviceReport kafkaDeviceReport; + private MqttGatewayService publishService; @Override - public String getProvinceProtocol() { + public String getProtocol() { return PushProtocol.GATEWAY_SC_2023.getCode(); } @@ -40,47 +35,57 @@ return "ERROR:娌℃湁鑾峰彇鍒拌澶囬厤缃俊鎭�"; } - //浣跨敤WEBSOCKET杩斿洖 - if (GatewayProtocol.GATE_WEBSOCKET.equals(device.getPushProtocol())) { - - WebSocketPacket packet = new WebSocketPacket(); - - //productId鐨勫�煎湪webSocket鎵ц鍙戦�佹柟娉曚腑琛ュ厖 - WebSocketPacketHeader header = new WebSocketPacketHeader(); - header.setDeviceName(device.getDeviceName()); - header.setProductId("hx-weigh-big-01"); - //header.set - packet.setHeaders(header); - packet.setMessageType(ScConstant.MESSAGE_TYPE_REPORT_PROPERTY); - packet.setDeviceId(device.getDeviceId()); - - //璁剧疆淇℃伅涓讳綋 - WeightInfo weightInfo = new WeightInfo(); - weightInfo.setGrossWeight(weigh); - weightInfo.setNetWeight(weigh); - weightInfo.setNetWeight(weigh); - weightInfo.setWeightUnit("KG"); - JSONObject jsonObject = new JSONObject(); - jsonObject.put("weightInfo", JSONObject.toJSONString(weightInfo)); - - packet.setProperties(jsonObject); - - packet.setTimestamp(System.currentTimeMillis()); - - - webSocketDeviceReport.sendByPacket(packet); - } - - if (GatewayProtocol.GATE_MQTT.equals(device.getPushProtocol())) { - //TODO ---->>>鍚庢湡娣诲姞鏀寔 - } return null; } @Override - public String pushGrainData2Cloud(KafaGrainData data) { + public BaseResp reportGrainData(BaseReqData reqData) { - return kafkaDeviceReport.sendGrainData2Cloud(data); + String topic = ScConstant.TOPIC_REPORT; + topic = topic.replace("${productId}", reqData.getProductId()).replace("${deviceId}", reqData.getDeviceId()); + + publishService.publishMqttWithTopic(reqData.getData(), topic); + + log.info("----------------------------鎺ㄩ�丮QTT绮儏淇℃伅---------------------------"); + log.info("-----TOPIC-----{}", topic); + log.info("-----Message-----{}", reqData.getData()); + + return new BaseResp(); + } + + @Override + public BaseResp reportWeightData(BaseReqData reqData) { + + String topic = ScConstant.TOPIC_MESSAGE_REPORT; + + topic = topic.replace("${productId}", reqData.getProductId()).replace("{deviceId}", reqData.getDeviceId()); + + publishService.publishMqttWithTopic(reqData.getData(), topic); + + log.info("----------------------------鎺ㄩ�丮QTT鍦扮淇℃伅---------------------------"); + log.info("-----TOPIC-----{}", topic); + log.info("-----Message-----{}", reqData.getData()); + return new BaseResp(); + } + + @Override + public BaseResp reportLprData(BaseReqData reqData) { + String topic = ScConstant.TOPIC_MESSAGE_REPORT; + topic = topic.replace("${productId}", reqData.getProductId()).replace("{deviceId}", reqData.getDeviceId()); + + publishService.publishMqttWithTopic(reqData.getData(), topic); + + log.info("----------------------------鎺ㄩ�丮QTT杞︾墝璇嗗埆淇℃伅---------------------------"); + log.info("-----TOPIC-----{}", topic); + log.info("-----Message-----{}", reqData.getData()); + return new BaseResp(); + } + + @Override + public BaseResp reportGrainDataByKafka(BaseReqData reqData) { + String topic = ScConstant.TOPIC_MESSAGE_REPORT; + kafkaDeviceReportService.publishWithTopic(reqData.getData(), topic); + return new BaseResp(); } } -- Gitblit v1.9.3