package com.fzzy.gateway.hx2023.service; import com.alibaba.fastjson2.JSONObject; import com.fzzy.api.data.PushProtocol; import com.fzzy.async.fzzy40.entity.Fz40Grain; import com.fzzy.data.ConfigData; import com.fzzy.gateway.api.GatewayDeviceReportService; import com.fzzy.gateway.data.BaseReqData; import com.fzzy.gateway.data.BaseResp; import com.fzzy.gateway.entity.GatewayDevice; import com.fzzy.gateway.hx2023.ScConstant; import com.fzzy.gateway.hx2023.data.*; import com.fzzy.gateway.hx2023.kafka.KafkaDeviceReportService; import com.fzzy.mqtt.MqttGatewayService; import jdk.nashorn.internal.runtime.regexp.joni.Config; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang.StringUtils; import org.apache.commons.lang.time.DateFormatUtils; import org.springframework.stereotype.Component; import javax.annotation.Resource; import java.util.ArrayList; import java.util.List; @Slf4j @Component public class DeviceReportServiceImpl implements GatewayDeviceReportService { @Resource private KafkaDeviceReportService kafkaDeviceReportService; @Resource private MqttGatewayService publishService; @Resource private ConfigData configData; @Override public String getProtocol() { return PushProtocol.GATEWAY_SC_2023.getCode(); } @Override public BaseResp reportGrainData(BaseReqData reqData) { String topic = ScConstant.TOPIC_REPORT; topic = topic.replace("${productId}", reqData.getProductId()).replace("${deviceId}", reqData.getDeviceId()); //如果是测试模式不执行推送 if (configData.getActive().indexOf("dev") >= 0) { log.info("----------------------------推送MQTT粮情信息,注:调试模式不推送---------------------------"); log.info("-----TOPIC-----{}", topic); log.info("-----Message-----{}", reqData.getData()); return new BaseResp(); } publishService.publishMqttWithTopic(reqData.getData(), topic); log.info("----------------------------推送MQTT粮情信息---------------------------"); log.info("-----TOPIC-----{}", topic); log.info("-----Message-----{}", reqData.getData()); return new BaseResp(); } @Override public BaseResp reportWeightData(BaseReqData reqData) { String topic = ScConstant.TOPIC_MESSAGE_REPORT; topic = topic.replace("${productId}", reqData.getProductId()).replace("${deviceId}", reqData.getDeviceId()); if (null == reqData.getData()) { GatewayDevice device = reqData.getDevice(); WebSocketPacket packet = new WebSocketPacket(); WebSocketPacketHeader header = new WebSocketPacketHeader(); header.setDeviceName(device.getDeviceName()); header.setProductId(device.getProductId()); packet.setHeaders(header); packet.setMessageType(ScConstant.MESSAGE_TYPE_REPORT_PROPERTY); packet.setDeviceId(device.getDeviceId()); //设置信息主体 WeightInfo weightInfo = new WeightInfo(); weightInfo.setGrossWeight(reqData.getWeight()); weightInfo.setNetWeight(reqData.getWeight()); weightInfo.setNetWeight(reqData.getWeight()); weightInfo.setWeightUnit("KG"); JSONObject jsonObject = new JSONObject(); jsonObject.put("weightInfo", JSONObject.toJSONString(weightInfo)); packet.setProperties(jsonObject); packet.setTimestamp(System.currentTimeMillis()); reqData.setData(JSONObject.toJSONString(packet)); } publishService.publishMqttWithTopic(reqData.getData(), topic); log.info("----------------------------推送MQTT地磅信息---------------------------"); log.info("-----TOPIC-----{}", topic); log.info("-----Message-----{}", reqData.getData()); return new BaseResp(); } @Override public BaseResp reportLprData(BaseReqData reqData) { String topic = ScConstant.TOPIC_MESSAGE_REPORT; topic = topic.replace("${productId}", reqData.getProductId()).replace("${deviceId}", reqData.getDeviceId()); GatewayDevice device = reqData.getDevice(); if (StringUtils.isEmpty(reqData.getData())) { WebSocketPacket packet = new WebSocketPacket(); WebSocketPacketHeader header = new WebSocketPacketHeader(); header.setDeviceName(reqData.getDeviceName()); header.setProductId(reqData.getProductId()); packet.setHeaders(header); packet.setMessageType(ScConstant.MESSAGE_TYPE_REPORT_PROPERTY); packet.setDeviceId(reqData.getDeviceId()); packet.setMessageId(System.currentTimeMillis() + ""); //设置信息主体 LprData lpr = new LprData(); lpr.setDeviceId(reqData.getDeviceId()); lpr.setCarNumber(reqData.getCarNumber()); JSONObject jsonObject = new JSONObject(); jsonObject.put("carNumber", reqData.getCarNumber()); jsonObject.put("position", device.getPosition()); packet.setProperties(jsonObject); packet.setTimestamp(System.currentTimeMillis()); reqData.setData(JSONObject.toJSONString(packet)); } publishService.publishMqttWithTopic(reqData.getData(), topic); log.info("----------------------------推送MQTT车牌识别信息---------------------------"); log.info("-----TOPIC-----{}", topic); log.info("-----Message-----{}", reqData.getData()); return new BaseResp(); } @Override public BaseResp reportGrainDataByHand(BaseReqData reqData) { String topic = ScConstant.TOPIC_ZLJ_GRAIN_TEMPERATURE; //如果是测试模式不执行推送 if (configData.getActive().indexOf("dev") >= 0) { log.info("----------------------------推送KAFKA粮情信息,注:调试模式不推送---------------------------"); log.info("-----TOPIC-----{}", topic); log.info("-----Message-----{}", reqData.getData()); return new BaseResp(); } kafkaDeviceReportService.publishWithTopic(reqData.getData(), topic); return new BaseResp(); } @Override public BaseResp grainData2GatewayApiInfo(Fz40Grain grainData, GatewayDevice device) { BaseResp resp = new BaseResp(); KafaGrainData result = new KafaGrainData(); result.setMessageId(ScConstant.getMessageId()); result.setMessgeId(result.getMessageId()); result.setDeviceID(device.getDeviceId()); result.setAvgTemperature(grainData.getTempAve() + ""); result.setMinTemperature(grainData.getTempMin() + ""); result.setMaxTemperature(grainData.getTempMax() + ""); result.setCollectTime(DateFormatUtils.format(grainData.getReceiveDate(), "yyyy-MM-dd HH:mm:ss")); //层-行-列 String[] attrCable = grainData.getCable().split("-"); if (StringUtils.isNotEmpty(grainData.getCableCir())) { return grainData2GatewayApiInfo2(grainData, device); } //层行列 int cableZ = Integer.valueOf(attrCable[0]); int cableY = Integer.valueOf(attrCable[1]); int cableX = Integer.valueOf(attrCable[2]); //温度集合 String[] attr = grainData.getPoints().split(","); //根号 int cableNum = 1, position = 0; String curTemp; List temperature = new ArrayList<>(); int x = 0, y = 0, z = 0; for (int i = 0; i < attr.length; i++) { position = i; z = i % cableZ + 1; x = i / (cableZ * cableY); y = x * (cableZ * cableY); y = (i - y) / cableZ; // 倒转X轴 x = cableX - 1 - x; //根号 cableNum = (i / cableZ) + 1; curTemp = attr[i]; //如果是异常值,执行调整数据 TODO if (Double.valueOf(curTemp) < -99.9) { curTemp = grainData.getTempAve() + ""; } else { //判断最大 if (curTemp.equals(result.getMaxTemperature())) { result.setMaxX(x + ""); result.setMaxY(y + ""); result.setMaxZ(position + ""); } //判断最小 if (curTemp.equals(result.getMinTemperature())) { result.setMinX(x + ""); result.setMinY(y + ""); result.setMinZ(position + ""); } } temperature.add(new KafkaGrainDataDetail1(cableNum + "", z + "", curTemp, position + "", x + "", y + "")); } //粮温信息 JSONObject trhInfo = new JSONObject(); // TRHInfo trhInfo = new TRHInfo(); trhInfo.put("temperature", temperature); //仓温度信息 KafkaGrainTH grainTH = new KafkaGrainTH(); grainTH.setHumidity(grainData.getHumidityIn() + ""); grainTH.setTemperature(grainData.getTempIn() + ""); grainTH.setAirHumidity(grainData.getHumidityOut() + ""); grainTH.setAirTemperature(grainData.getTempOut() + ""); List temperatureAndhumidity = new ArrayList<>(); temperatureAndhumidity.add(grainTH); trhInfo.put("temperatureAndhumidity", temperatureAndhumidity); //trhInfo.put("temperatureAndhumidity",grainTH); JSONObject params = new JSONObject(); params.put("TRHInfo", trhInfo); result.setParams(params); //resp.setObj(result); resp.setData(JSONObject.toJSONString(result)); return resp; } private BaseResp grainData2GatewayApiInfo2(Fz40Grain grainData, GatewayDevice device) { BaseResp resp = new BaseResp(); resp.setCode(BaseResp.CODE_500); resp.setMsg("筒仓解析暂未实现"); return resp; } }