package com.fzzy.gateway.service; import com.alibaba.fastjson2.JSONObject; import com.bstek.dorado.annotation.Expose; import com.fzzy.api.data.GatewayDeviceType; import com.fzzy.api.utils.DateUtil; import com.fzzy.async.fzzy40.entity.Fz40Grain; import com.fzzy.gateway.api.GatewayDeviceReportService; import com.fzzy.gateway.api.GatewayRemoteManager; import com.fzzy.gateway.data.BaseResp; import com.fzzy.gateway.data.BaseReqData; import com.fzzy.gateway.entity.GatewayDevice; import com.fzzy.gateway.hx2023.ScConstant; import com.fzzy.gateway.hx2023.data.*; import com.fzzy.gateway.service.repository.GatewayDeviceRep; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang.StringUtils; import org.apache.commons.lang.time.DateFormatUtils; import org.springframework.data.domain.Sort; import org.springframework.stereotype.Component; import javax.annotation.Resource; import java.util.ArrayList; import java.util.Date; import java.util.List; import java.util.Map; import java.util.stream.Collectors; /** * */ @Slf4j @Component public class GatewayDeviceTestPR { @Resource private GatewayDeviceRep gatewayDeviceRep; // @Resource // private Fzzy40CommonService fzzy40CommonService; @Resource private GatewayRemoteManager gatewayRemoteManager; // @Resource // private MqttGatewayService publishService; public List listAll() { Sort sort = new Sort(Sort.Direction.ASC, "deviceId"); return gatewayDeviceRep.findAll(sort); } /** * 测试MQTT粮情检测 * 粮情推送测试 * * @param data * @return */ @Expose public String ajaxTestGrain2(GatewayDevice data) { BaseReqData reqData = new BaseReqData(); reqData.setDevice(data); reqData.setAutoReplay(true); reqData.setMessageType(ScConstant.MESSAGE_TYPE_INVOKE_FUNCTION); reqData.setMessageId(ScConstant.getMessageId()); reqData.setFunctionId(ScConstant.FUNCTION_getTAndRHInfo); if (!GatewayDeviceType.TYPE_07.getCode().equals(data.getType())) { return "ERROR:当前设备非粮情设备不支持当前操作"; } if (StringUtils.isEmpty(data.getCableRule())) { return "ERROR:当前设备没有配置布线规则,无法执行"; } BaseResp resp = gatewayRemoteManager.getSyncGrainService(data.getSyncProtocol()).syncGrain(reqData); //自动推送 if (200 == resp.getCode() && reqData.isAutoReplay()) { String topic = ScConstant.TOPIC_REPORT; topic = topic.replace("${productId}", data.getProductId()).replace("${deviceId}", data.getDeviceId()); // publishService.publishMqttWithTopic(resp.getData(), topic); log.info("----------------------------手动推送MQTT粮情信息---------------------------"); log.info("-----TOPIC-----{}", topic); log.info("-----Message-----{}", resp.getData()); } return "SUCCESS:执行完成"; } /** * 粮情推送测试KAFKA方式 * * @param data * @return */ @Expose public String ajaxTestKafkaGrain(Map data) throws Exception { //获取设备配置,只针对粮情设备进行执行 List list = this.listAll(); if (null == list || list.isEmpty()) { return "ERROR:为获取到系统中设备配置,取消执行"; } Date dayTime = (Date) data.get("dayTime"); if (null == dayTime) dayTime = new Date(); Date start = DateUtil.getCurZero(dayTime); Date end = DateUtil.getNextZero(dayTime); //如果部署FZZY-IGDS-V40版本系统 return this.pushByV40(list, start, end); } private String pushByV40(List list, Date start, Date end) { String depotIdSys; List listGrain; Fz40Grain lastData; KafaGrainData pushData; GatewayDeviceReportService deviceReportService = null; for (GatewayDevice device : list) { depotIdSys = device.getDepotIdSys(); if (StringUtils.isEmpty(depotIdSys)) { log.info("--------设备--{}-未配置系统相关仓库编码,无法执行当前操作", device.getDeviceName()); continue; } //TODO ----》 // listGrain = fzzy40CommonService.listGrain(depotIdSys, start, end); listGrain = null; if (null == listGrain || listGrain.isEmpty()) { log.info("---------设备---{}--未同步到粮情信息,请确认当前条件下是否有数据", device.getDeviceName()); continue; } //获取最后一条粮情作为当前数据 lastData = listGrain.get(listGrain.size() - 1); pushData = this.lastData2PushData(lastData, device); if (null == deviceReportService) { deviceReportService = gatewayRemoteManager.getDeviceReportService(device.getPushProtocol()); } // deviceReportService.pushGrainData2Cloud(pushData); } return "SUCCESS"; } /** * 将粮情数据转换为推送需要的数据格式 * * @param lastData * @return */ private KafaGrainData lastData2PushData(Fz40Grain lastData, GatewayDevice device) { KafaGrainData result = new KafaGrainData(); result.setMessageId(ScConstant.getMessageId()); result.setMessgeId(result.getMessageId()); result.setDeviceID(device.getDeviceId()); result.setAvgTemperature(lastData.getTempAve() + ""); result.setMinTemperature(lastData.getTempMin() + ""); result.setMaxTemperature(lastData.getTempMax() + ""); result.setCollectTime(DateFormatUtils.format(lastData.getReceiveDate(), "yyyy-MM-dd HH:mm:ss")); //层-行-列 String[] attrCable = lastData.getCable().split("-"); if (StringUtils.isNotEmpty(lastData.getCableCir())) { return lastData2PushData2(lastData, device); } //层行列 int cableZ = Integer.valueOf(attrCable[0]); int cableY = Integer.valueOf(attrCable[1]); int cableX = Integer.valueOf(attrCable[2]); //温度集合 String[] attr = lastData.getPoints().split(","); //根号 int cableNum = 1, position = 0; String curTemp; List temperature = new ArrayList<>(); int x = 0, y = 0, z = 0; for (int i = 0; i < attr.length; i++) { position = i; z = i % cableZ + 1; x = i / (cableZ * cableY); y = x * (cableZ * cableY); y = (i - y) / cableZ; // 倒转X轴 x = cableX - 1 - x; //根号 cableNum = (i / cableZ) + 1; curTemp = attr[i]; //判断最大 if (curTemp.equals(result.getMaxTemperature())) { result.setMaxX(x + ""); result.setMaxY(y + ""); result.setMaxZ(position + ""); } //判断最小 if (curTemp.equals(result.getMinTemperature())) { result.setMinX(x + ""); result.setMinY(y + ""); result.setMinZ(position + ""); } temperature.add(new KafkaGrainDataDetail1(cableNum + "", z + "", curTemp, position + "", x + "", y + "")); } //粮温信息 JSONObject trhInfo = new JSONObject(); // TRHInfo trhInfo = new TRHInfo(); trhInfo.put("temperature", temperature); //仓温度信息 KafkaGrainTH grainTH = new KafkaGrainTH(); grainTH.setHumidity(lastData.getHumidityIn() + ""); grainTH.setTemperature(lastData.getTempIn() + ""); grainTH.setAirHumidity(lastData.getHumidityOut() + ""); grainTH.setAirTemperature(lastData.getTempOut() + ""); List temperatureAndhumidity = new ArrayList<>(); temperatureAndhumidity.add(grainTH); trhInfo.put("temperatureAndhumidity", temperatureAndhumidity); //trhInfo.put("temperatureAndhumidity",grainTH); JSONObject params = new JSONObject(); params.put("TRHInfo", trhInfo); result.setParams(params); return result; } /** * 针对筒仓 TODO ----- * * @param lastData * @param device * @return */ private KafaGrainData lastData2PushData2(Fz40Grain lastData, GatewayDevice device) { return null; } /** * 地磅推送测试 * * @param data * @return */ @Expose public String ajaxTestWeight(Map data) throws Exception { double weigh = (double) data.get("weight"); double deviceId = (double) data.get("deviceId"); List devices = listAll(); if (devices == null || devices.size() <= 0) { return "ERROR:没有配置设备信息,执行失败"; } List weights = devices.stream().filter(s -> (GatewayDeviceType.TYPE_01.getCode().equals(s.getType()))).collect(Collectors.toList()); if (weights == null || weights.size() <= 0) { return "ERROR:ERROR:没有配置设备信息,执行失败"; } String topic; for (GatewayDevice device : weights) { WebSocketPacket packet = new WebSocketPacket(); //productId的值在webSocket执行发送方法中补充 WebSocketPacketHeader header = new WebSocketPacketHeader(); header.setDeviceName(device.getDeviceName()); header.setProductId(device.getProductId()); //header.set packet.setHeaders(header); packet.setMessageType(ScConstant.MESSAGE_TYPE_REPORT_PROPERTY); packet.setDeviceId(device.getDeviceId()); //设置信息主体 WeightInfo weightInfo = new WeightInfo(); weightInfo.setGrossWeight(weigh); weightInfo.setNetWeight(weigh); weightInfo.setNetWeight(weigh); weightInfo.setWeightUnit("KG"); JSONObject jsonObject = new JSONObject(); jsonObject.put("weightInfo", JSONObject.toJSONString(weightInfo)); packet.setProperties(jsonObject); packet.setTimestamp(System.currentTimeMillis()); topic = ScConstant.TOPIC_MESSAGE_REPORT; topic = topic.replace("{${productId}}", device.getProductId()).replace("{deviceId}", device.getDeviceId()); // topic = "/device/" + header.getProductId() + "/" + device.getDeviceId() + "/message/property/report"; // publishService.publishMqttWithTopic(JSON.toJSONString(packet), topic); } return "SUCCESS"; } /** * 地磅推送测试 * * @return */ @Expose public String ajaxTestLpr(Map data) throws Exception { String carNumber = (String) data.get("carNumber"); // String carNumber = "川A12345"; List devices = listAll(); if (devices == null || devices.size() <= 0) { return "ERROR:没有配置设备信息,执行失败"; } List weights = devices.stream().filter(s -> (GatewayDeviceType.TYPE_02.getCode().equals(s.getType()))).collect(Collectors.toList()); if (weights == null || weights.size() <= 0) { return "ERROR:ERROR:没有配置设备信息,执行失败"; } String topic; for (GatewayDevice device : weights) { WebSocketPacket packet = new WebSocketPacket(); //productId的值在webSocket执行发送方法中补充 WebSocketPacketHeader header = new WebSocketPacketHeader(); header.setDeviceName(device.getDeviceName()); header.setProductId(device.getProductId()); //header.set packet.setHeaders(header); packet.setMessageType(ScConstant.MESSAGE_TYPE_REPORT_PROPERTY); packet.setDeviceId(device.getDeviceId()); packet.setMessageId(System.currentTimeMillis() + ""); //设置信息主体 LprData lpr = new LprData(); lpr.setDeviceId(device.getDeviceId()); lpr.setCarNumber(carNumber); JSONObject jsonObject = new JSONObject(); jsonObject.put("carNumber", carNumber); jsonObject.put("position", "big"); packet.setProperties(jsonObject); packet.setTimestamp(System.currentTimeMillis()); topic = ScConstant.TOPIC_MESSAGE_REPORT; topic = topic.replace("{${productId}}", device.getProductId()).replace("{deviceId}", device.getDeviceId()); // topic = "/device/" + header.getProductId() + "/" + device.getDeviceId() + "/message/property/report"; // publishService.publishMqttWithTopic(JSON.toJSONString(packet), topic); } return "SUCCESS"; } @Expose public String test(GatewayDevice data) { log.info("-----------test-------------------"); return "SUCCESS"; } }