package com.fzzy.gateway.hx2023.service;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.fzzy.api.data.DepotType;
import com.fzzy.api.data.PushProtocol;
import com.fzzy.async.fzzy40.entity.Fz40Grain;
import com.fzzy.data.ConfigData;
import com.fzzy.gateway.GatewayUtils;
import com.fzzy.gateway.api.GatewayDeviceReportService;
import com.fzzy.gateway.data.BaseReqData;
import com.fzzy.gateway.data.BaseResp;
import com.fzzy.gateway.data.GrainCableData;
import com.fzzy.gateway.entity.GatewayDevice;
import com.fzzy.gateway.hx2023.ScConstant;
import com.fzzy.gateway.hx2023.data.*;
import com.fzzy.gateway.hx2023.kafka.KafkaDeviceReportService;
import com.fzzy.mqtt.MqttGatewayService;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang.time.DateFormatUtils;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;

/**
 * {@link GatewayDeviceReportService} implementation for the
 * {@code PushProtocol.GATEWAY_SC_2023} protocol.
 *
 * <p>Publishes grain-condition, weighbridge and licence-plate messages over MQTT or Kafka and
 * converts collected grain data into the JSON message that is pushed to the platform.
 */
@Slf4j
@Component
public class DeviceReportServiceImpl implements GatewayDeviceReportService {

    /** Pattern used for the {@code collectTime} field of grain messages. */
    private static final String COLLECT_TIME_PATTERN = "yyyy-MM-dd HH:mm:ss";

    /** Readings strictly below this value are treated as sensor faults. */
    private static final double FAULT_READING_THRESHOLD = -99.9;

    @Resource
    private KafkaDeviceReportService kafkaDeviceReportService;
    @Resource
    private MqttGatewayService publishService;
    @Resource
    private ConfigData configData;

    /**
     * @return the code of the push protocol handled by this implementation
     */
    @Override
    public String getProtocol() {
        return PushProtocol.GATEWAY_SC_2023.getCode();
    }

    /**
     * Publishes a grain-condition message over MQTT.
     *
     * <p>When the active profile contains {@code "dev"} the message is only logged, not pushed.
     *
     * @param reqData request carrying the product id, device id and the message payload
     * @return an empty {@link BaseResp}
     */
    @Override
    public BaseResp reportGrainData(BaseReqData reqData) {
        String topic = resolveTopic(ScConstant.TOPIC_REPORT, reqData);

        // Debug mode: log only, do not push.
        if (isDevProfile()) {
            log.info("----------------------------推送MQTT粮情信息,注:调试模式不推送---------------------------");
            logTopicAndMessage(topic, reqData);
            return new BaseResp();
        }

        publishService.publishMqttWithTopic(reqData.getData(), topic);

        log.info("----------------------------推送MQTT粮情信息---------------------------");
        logTopicAndMessage(topic, reqData);
        return new BaseResp();
    }

    /**
     * Publishes a grain-condition message to the grain-temperature Kafka topic.
     *
     * <p>When the active profile contains {@code "dev"} the message is only logged, not pushed.
     *
     * @param reqData request carrying the message payload
     * @return an empty {@link BaseResp}
     */
    @Override
    public BaseResp reportGrainDataByKafka(BaseReqData reqData) {
        return publishGrainToKafka(reqData);
    }

    /**
     * Publishes a weighbridge message over MQTT.
     *
     * <p>When the request carries no payload ({@code getData()} is {@code null}) one is built from
     * the request's device and weight; gross and net weight are both set to the reported weight.
     *
     * @param reqData request carrying the product id, device id and either a ready payload or
     *                the device and weight to build one from
     * @return an empty {@link BaseResp}
     */
    @Override
    public BaseResp reportWeightData(BaseReqData reqData) {
        String topic = resolveTopic(ScConstant.TOPIC_MESSAGE_REPORT, reqData);

        if (null == reqData.getData()) {
            GatewayDevice device = reqData.getDevice();

            WebSocketPacketHeader header = new WebSocketPacketHeader();
            header.setDeviceName(device.getDeviceName());
            header.setProductId(device.getProductId());

            WebSocketPacket packet = new WebSocketPacket();
            packet.setHeaders(header);
            packet.setMessageType(ScConstant.MESSAGE_TYPE_REPORT_PROPERTY);
            packet.setDeviceId(device.getDeviceId());

            // Message body.
            WeightInfo weightInfo = new WeightInfo();
            weightInfo.setGrossWeight(reqData.getWeight());
            weightInfo.setNetWeight(reqData.getWeight());
            weightInfo.setWeightUnit("KG");

            JSONObject properties = new JSONObject();
            properties.put("weightInfo", JSON.toJSONString(weightInfo));

            packet.setProperties(properties);
            packet.setTimestamp(System.currentTimeMillis());

            reqData.setData(JSONObject.toJSONString(packet));
        }

        publishService.publishMqttWithTopic(reqData.getData(), topic);

        log.info("----------------------------推送MQTT地磅信息---------------------------");
        logTopicAndMessage(topic, reqData);
        return new BaseResp();
    }

    /**
     * Publishes a licence-plate-recognition message over MQTT.
     *
     * <p>When the request carries no payload (empty {@code getData()}) one is built from the
     * request's car number and the position of the request's device.
     *
     * @param reqData request carrying the product id, device id and either a ready payload or
     *                the car number and device to build one from
     * @return an empty {@link BaseResp}
     */
    @Override
    public BaseResp reportLprData(BaseReqData reqData) {
        String topic = resolveTopic(ScConstant.TOPIC_MESSAGE_REPORT, reqData);

        if (StringUtils.isEmpty(reqData.getData())) {
            GatewayDevice device = reqData.getDevice();

            WebSocketPacketHeader header = new WebSocketPacketHeader();
            header.setDeviceName(reqData.getDeviceName());
            header.setProductId(reqData.getProductId());

            WebSocketPacket packet = new WebSocketPacket();
            packet.setHeaders(header);
            packet.setMessageType(ScConstant.MESSAGE_TYPE_REPORT_PROPERTY);
            packet.setDeviceId(reqData.getDeviceId());
            packet.setMessageId(System.currentTimeMillis() + "");

            // Message body.
            JSONObject properties = new JSONObject();
            properties.put("carNumber", reqData.getCarNumber());
            properties.put("position", device.getPosition());

            packet.setProperties(properties);
            packet.setTimestamp(System.currentTimeMillis());

            reqData.setData(JSONObject.toJSONString(packet));
        }

        publishService.publishMqttWithTopic(reqData.getData(), topic);

        log.info("----------------------------推送MQTT车牌识别信息---------------------------");
        logTopicAndMessage(topic, reqData);
        return new BaseResp();
    }

    /**
     * Publishes a manually triggered grain-condition message to the grain-temperature Kafka
     * topic. Behaves exactly like {@link #reportGrainDataByKafka(BaseReqData)}.
     *
     * @param reqData request carrying the message payload
     * @return an empty {@link BaseResp}
     */
    @Override
    public BaseResp reportGrainDataByHand(BaseReqData reqData) {
        return publishGrainToKafka(reqData);
    }

    /**
     * Converts collected grain data into the JSON grain message for the platform.
     *
     * <p>A device without a depot type is assigned {@code DepotType.TYPE_01} (the device object
     * is modified). Depot types {@code TYPE_02} and {@code TYPE_04} use the silo layout; every
     * other type, including {@code TYPE_03}, uses the row/column/layer grid layout.
     *
     * @param grainData collected grain data; its points are a comma separated list of readings
     * @param device    device the data belongs to
     * @return a response whose data is the serialized grain message
     */
    @Override
    public BaseResp grainData2GatewayApiInfo(Fz40Grain grainData, GatewayDevice device) {
        GrainCableData cableData = GatewayUtils.getCableData(device);

        if (null == device.getDepotType()) {
            device.setDepotType(DepotType.TYPE_01.getCode());
        }

        // Silo.
        if (DepotType.TYPE_02.getCode().equals(device.getDepotType())
                || DepotType.TYPE_04.getCode().equals(device.getDepotType())) {
            return grainData2GatewayApiInfo2(grainData, device, cableData);
        }

        // Oil tank (TYPE_03): for the time being (since 2024-01-20) it is reported with the same
        // message as a flat warehouse, so it shares the grid layout with the default case.
        return buildGridReport(grainData, device, cableData);
    }

    /**
     * Converts grain data taken from a Kafka-style {@link GrainData} record into the JSON grain
     * message for the platform.
     *
     * <p>A device without a depot type is assigned {@code DepotType.TYPE_01} (the device object
     * is modified). Depot-type specific layouts are not applied on this path: every depot type
     * uses the grid layout. Fault readings are passed through unchanged, and the collect time is
     * the current time.
     *
     * @param grainData record whose {@code output} and {@code weatherStation} fields hold JSON
     * @param device    device the data belongs to
     * @return a response whose data is the serialized grain message
     */
    @Override
    public BaseResp grainData2GatewayApiInfoKafka(GrainData grainData, GatewayDevice device) {
        GrainCableData cableData = GatewayUtils.getCableData(device);

        if (null == device.getDepotType()) {
            device.setDepotType(DepotType.TYPE_01.getCode());
        }

        GrainOutPut output = JSONObject.parseObject(grainData.getOutput(), GrainOutPut.class);

        KafaGrainData result = newGrainMessage(device,
                output.getAvgTemperature(),
                output.getMinTemperature(),
                output.getMaxTemperature(),
                DateFormatUtils.format(new Date(), COLLECT_TIME_PATTERN));

        // The element types of the output lists are declared outside this file, so the values
        // are read through the getters instead of through a typed local variable.
        String[] readings = new String[output.getTemperature().size()];
        for (int i = 0; i < readings.length; i++) {
            readings[i] = output.getTemperature().get(i).getTemperature();
        }

        // null replacement: fault readings are reported as received.
        List<KafkaGrainDataDetail1> temperature = buildGridDetails(readings, cableData, result, null);

        // Temperature / humidity inside the depot: first entry of the output, when present.
        KafkaGrainTH grainTH = new KafkaGrainTH();
        if (output.getTemperatureAndhumidity() != null && output.getTemperatureAndhumidity().size() > 0) {
            grainTH.setHumidity(output.getTemperatureAndhumidity().get(0).getHumidity());
            grainTH.setTemperature(output.getTemperatureAndhumidity().get(0).getTemperature());
        }

        // Outside air temperature / humidity from the weather station, when present.
        GrainWeather weather = JSON.parseObject(grainData.getWeatherStation(), GrainWeather.class);
        if (weather != null) {
            grainTH.setAirHumidity(weather.getHumidity());
            grainTH.setAirTemperature(weather.getTemperature());
        }

        return toResponse(result, temperature, grainTH);
    }

    /**
     * Builds the grain message for a silo: every point carries its cable number, layer, circle
     * and the silo's circle summary instead of x/y coordinates.
     *
     * @param grainData collected grain data
     * @param device    device the data belongs to
     * @param cableData cable layout of the depot
     * @return a response whose data is the serialized grain message
     */
    private BaseResp grainData2GatewayApiInfo2(Fz40Grain grainData, GatewayDevice device, GrainCableData cableData) {
        KafaGrainData result = newGrainMessage(grainData, device);

        // Points per cable.
        int cableZ = cableData.getCableZ();

        // Readings.
        String[] attr = grainData.getPoints().split(",");

        JSONObject totalCircle = new JSONObject();
        totalCircle.put("totalCircle", cableData.getTotalCircle() + "");
        totalCircle.put("smallCircle", cableData.getSmallCircle());
        String totalCircleStr = totalCircle.toJSONString();

        List<KafkaGrainDataDetail1> temperature = new ArrayList<>();
        for (int position = 0; position < attr.length; position++) {
            int z = position % cableZ + 1;
            int cableNum = (position / cableZ) + 1;
            int circle = resolveCircle(cableNum, cableData);

            String curTemp = attr[position];
            if (isFaultReading(curTemp)) {
                // Fault reading: report the average temperature instead.
                curTemp = grainData.getTempAve() + "";
            } else {
                // For silos the extreme is located by cable number; y is always 1.
                markExtremes(result, curTemp, cableNum + "", "1", position + "");
            }

            temperature.add(new KafkaGrainDataDetail1(cableNum + "", z + "", curTemp, position + "",
                    null, null, circle + "", totalCircleStr));
        }

        return toResponse(result, temperature, ambientOf(grainData));
    }

    /**
     * Builds the grain message using the row/column/layer grid layout (flat warehouse).
     *
     * @param grainData collected grain data
     * @param device    device the data belongs to
     * @param cableData cable layout of the depot
     * @return a response whose data is the serialized grain message
     */
    private BaseResp buildGridReport(Fz40Grain grainData, GatewayDevice device, GrainCableData cableData) {
        KafaGrainData result = newGrainMessage(grainData, device);

        String[] attr = grainData.getPoints().split(",");

        // Fault readings are replaced by the average temperature.
        List<KafkaGrainDataDetail1> temperature =
                buildGridDetails(attr, cableData, result, grainData.getTempAve() + "");

        return toResponse(result, temperature, ambientOf(grainData));
    }

    /**
     * Creates the per-point details for the grid layout and records on {@code message} where the
     * maximum and minimum temperatures are located.
     *
     * @param readings         one temperature reading per point, in collection order
     * @param cableData        cable layout (layers, columns, rows)
     * @param message          message whose max/min location fields are updated
     * @param faultReplacement value reported instead of a fault reading, or {@code null} to
     *                         report fault readings unchanged
     * @return the point details, one per reading
     */
    private List<KafkaGrainDataDetail1> buildGridDetails(String[] readings, GrainCableData cableData,
                                                         KafaGrainData message, String faultReplacement) {
        // Layers, columns, rows.
        int cableZ = cableData.getCableZ();
        int cableY = cableData.getCableY();
        int cableX = cableData.getCableX();
        int pointsPerRow = cableZ * cableY;

        List<KafkaGrainDataDetail1> details = new ArrayList<>();
        for (int position = 0; position < readings.length; position++) {
            int z = position % cableZ + 1;
            int row = position / pointsPerRow;
            int y = (position - row * pointsPerRow) / cableZ;
            // The X axis is reversed.
            int x = cableX - 1 - row;
            int cableNum = (position / cableZ) + 1;

            String curTemp = readings[position];
            if (isFaultReading(curTemp)) {
                if (faultReplacement != null) {
                    curTemp = faultReplacement;
                }
            } else {
                markExtremes(message, curTemp, x + "", y + "", position + "");
            }

            details.add(new KafkaGrainDataDetail1(cableNum + "", z + "", curTemp, position + "", x + "", y + ""));
        }
        return details;
    }

    /**
     * Determines which circle of a silo a cable belongs to.
     *
     * <p>The cable rule is a {@code "-"} separated list with the number of cables of each circle,
     * starting with circle 1. Only 2 to 4 circles are resolved; any other circle count yields 1,
     * and in that case the cable rule is not read at all.
     *
     * @param cableNum  1-based cable number
     * @param cableData cable layout providing the circle count and the cable rule
     * @return the 1-based circle number
     */
    private int resolveCircle(int cableNum, GrainCableData cableData) {
        int totalCircle = cableData.getTotalCircle();
        if (totalCircle < 2 || totalCircle > 4) {
            return 1;
        }

        String[] cablesPerCircle = cableData.getCableRule().split("-");
        int lastCableOfCircle = 0;
        for (int circle = 1; circle < totalCircle; circle++) {
            lastCableOfCircle += Integer.parseInt(cablesPerCircle[circle - 1]);
            if (cableNum <= lastCableOfCircle) {
                return circle;
            }
        }
        // Everything beyond the listed circles belongs to the last one.
        return totalCircle;
    }

    /**
     * Creates a grain message header from collected grain data.
     */
    private KafaGrainData newGrainMessage(Fz40Grain grainData, GatewayDevice device) {
        return newGrainMessage(device,
                grainData.getTempAve() + "",
                grainData.getTempMin() + "",
                grainData.getTempMax() + "",
                DateFormatUtils.format(grainData.getReceiveDate(), COLLECT_TIME_PATTERN));
    }

    /**
     * Creates a grain message with a fresh message id and the given summary values.
     */
    private KafaGrainData newGrainMessage(GatewayDevice device, String avgTemperature, String minTemperature,
                                          String maxTemperature, String collectTime) {
        KafaGrainData message = new KafaGrainData();
        message.setMessageId(ScConstant.getMessageId());
        message.setMessgeId(message.getMessageId());
        message.setDeviceID(device.getDeviceId());
        message.setAvgTemperature(avgTemperature);
        message.setMinTemperature(minTemperature);
        message.setMaxTemperature(maxTemperature);
        message.setCollectTime(collectTime);
        return message;
    }

    /**
     * Records the location of a reading on the message when the reading's text equals the
     * message's maximum and/or minimum temperature.
     *
     * <p>NOTE(review): the comparison is textual, so "25" and "25.0" do not match, and the Z
     * field receives the point's position rather than its layer. Both are kept as the original
     * behaviour; confirm against the platform's message specification.
     */
    private void markExtremes(KafaGrainData message, String reading, String x, String y, String position) {
        if (reading.equals(message.getMaxTemperature())) {
            message.setMaxX(x);
            message.setMaxY(y);
            message.setMaxZ(position);
        }
        if (reading.equals(message.getMinTemperature())) {
            message.setMinX(x);
            message.setMinY(y);
            message.setMinZ(position);
        }
    }

    /**
     * @return {@code true} when the reading is below the fault threshold
     * @throws NumberFormatException when the reading is not a number
     */
    private boolean isFaultReading(String reading) {
        return Double.parseDouble(reading) < FAULT_READING_THRESHOLD;
    }

    /**
     * Builds the depot / outside-air temperature and humidity entry from collected grain data.
     */
    private KafkaGrainTH ambientOf(Fz40Grain grainData) {
        KafkaGrainTH grainTH = new KafkaGrainTH();
        grainTH.setHumidity(grainData.getHumidityIn() + "");
        grainTH.setTemperature(grainData.getTempIn() + "");
        grainTH.setAirHumidity(grainData.getHumidityOut() + "");
        grainTH.setAirTemperature(grainData.getTempOut() + "");
        return grainTH;
    }

    /**
     * Attaches the point details and the temperature/humidity entry to the message under
     * {@code params.TRHInfo} and wraps the serialized message in a response.
     */
    private BaseResp toResponse(KafaGrainData message, List<KafkaGrainDataDetail1> temperature,
                                KafkaGrainTH grainTH) {
        JSONObject trhInfo = new JSONObject();
        trhInfo.put("temperature", temperature);

        List<KafkaGrainTH> temperatureAndhumidity = new ArrayList<>();
        temperatureAndhumidity.add(grainTH);
        trhInfo.put("temperatureAndhumidity", temperatureAndhumidity);

        JSONObject params = new JSONObject();
        params.put("TRHInfo", trhInfo);
        message.setParams(params);

        BaseResp resp = new BaseResp();
        resp.setData(JSONObject.toJSONString(message));
        return resp;
    }

    /**
     * Pushes the request payload to the grain-temperature Kafka topic, or only logs it when the
     * active profile contains {@code "dev"}.
     */
    private BaseResp publishGrainToKafka(BaseReqData reqData) {
        String topic = ScConstant.TOPIC_ZLJ_GRAIN_TEMPERATURE;

        // Debug mode: log only, do not push.
        if (isDevProfile()) {
            log.info("----------------------------推送KAFKA粮情信息,注:调试模式不推送---------------------------");
            logTopicAndMessage(topic, reqData);
            return new BaseResp();
        }

        kafkaDeviceReportService.publishWithTopic(reqData.getData(), topic);
        return new BaseResp();
    }

    /**
     * Fills the {@code ${productId}} and {@code ${deviceId}} placeholders of a topic template
     * with the values of the request.
     */
    private String resolveTopic(String template, BaseReqData reqData) {
        return template.replace("${productId}", reqData.getProductId())
                .replace("${deviceId}", reqData.getDeviceId());
    }

    /**
     * @return {@code true} when the active profile contains {@code "dev"}
     */
    private boolean isDevProfile() {
        return configData.getActive().indexOf("dev") >= 0;
    }

    /**
     * Logs the topic and the payload of a request.
     */
    private void logTopicAndMessage(String topic, BaseReqData reqData) {
        log.info("-----TOPIC-----{}", topic);
        log.info("-----Message-----{}", reqData.getData());
    }
}