package com.fzzy.gateway.hx2023.service; import com.alibaba.fastjson2.JSONObject; import com.fzzy.api.Constant; import com.fzzy.api.data.GatewayDeviceType; import com.fzzy.api.data.PushProtocol; import com.fzzy.api.utils.ContextUtil; import com.fzzy.api.view.repository.ApiLogRep; import com.fzzy.gateway.GatewayUtils; import com.fzzy.gateway.api.GatewayRemoteService; import com.fzzy.gateway.data.BaseResp; import com.fzzy.gateway.entity.GatewayConf; import com.fzzy.gateway.entity.GatewayDevice; import com.fzzy.gateway.hx2023.ScConstant; import com.fzzy.gateway.hx2023.data.CloudResp; import com.fzzy.gateway.hx2023.data.DeviceStatusData; import com.fzzy.gateway.hx2023.data.HeartBeatData; import com.fzzy.gateway.hx2023.kafka.KafkaDeviceReportService; import com.fzzy.gateway.hx2023.util.OpenApiSignatureUtils; import com.fzzy.gateway.service.GatewayConfService; import com.fzzy.gateway.util.GatewayHttpUtil; import lombok.Data; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang.StringUtils; import org.apache.commons.lang.time.DateFormatUtils; import org.springframework.stereotype.Component; import javax.annotation.Resource; import java.util.Collection; import java.util.Date; import java.util.HashMap; import java.util.Map; @Slf4j @Data @Component public class HxGatewayRemoteServiceImpl implements GatewayRemoteService { String charset = "utf-8"; @Resource private ApiLogRep apiLogRep; @Resource private GatewayConfService gatewayConfService; @Resource private KafkaDeviceReportService kafkaDeviceReportService; @Override public String getProtocol() { return PushProtocol.GATEWAY_SC_2023.getCode(); } @Override public void init(GatewayConf gatewayConf) { try { if (StringUtils.isEmpty(gatewayConf.getApiUrl())) { return; } //获取公私钥接口 Map params = new HashMap<>(); params.put("appId", gatewayConf.getGatewayId()); String url = gatewayConf.getApiUrl() + "reserver/api/key/apply"; String jsonStr = GatewayHttpUtil.doGet(url, params); log.info("---获取公私钥接口-返回---{}", jsonStr); CloudResp respKey = JSONObject.parseObject(jsonStr, CloudResp.class); if (BaseResp.CODE_200 == respKey.getCode()) { JSONObject object = respKey.getData(); String pubKey = (String) object.get("pubKey"); String priKey = (String) object.get("priKey"); gatewayConf.setPublicKey(pubKey); gatewayConf.setPrivateKey(priKey); } //获取 AccessToken 接口 String sign = getSign(params, gatewayConf); params.put("sign", sign); url = gatewayConf.getApiUrl() + "reserver/api/token/apply"; jsonStr = GatewayHttpUtil.doGet(url, params); log.info("---获取AccessToken接口-返回---{}", jsonStr); CloudResp respToken = JSONObject.parseObject(jsonStr, CloudResp.class); if (BaseResp.CODE_200 == respToken.getCode()) { JSONObject object = respKey.getData(); if (null != object) { String token = (String) object.get("token"); gatewayConf.setAccessToken(token); } } //更新缓存 updateAuthToken(gatewayConf); } catch (Exception e) { log.error("------初始化失败-----{}", e); } } @Override public void heartbeat(GatewayConf gatewayConf) { log.info("------------定时心跳执行---------{}", gatewayConf.getKqmc()); try { gatewayConf = getCacheConf(gatewayConf.getKqdm()); HeartBeatData heartBeatData = new HeartBeatData(); heartBeatData.setMessageId(ContextUtil.getUUID()); heartBeatData.setApISource(ScConstant.API_SOURCE_TARGET_EQUIPMENT_HEARTBEAT); heartBeatData.setGatewayId(gatewayConf.getGatewayId()); heartBeatData.setGatewayIp(gatewayConf.getGatewayIp()); heartBeatData.setGatewayMac(null == gatewayConf.getGatewayMac() ? "无" : gatewayConf.getGatewayMac()); heartBeatData.setHeartbeat(1); heartBeatData.setReportTime(DateFormatUtils.format(new Date(), "yyyy-MM-dd hh:mm:ss")); heartBeatData.setMemo("FZZY"); String topic = ScConstant.TOPIC_EQUIPMENT_HEARTBEAT; topic = topic.replace("{kqdm}", gatewayConf.getKqdm()); String messageInfo = JSONObject.toJSONString(heartBeatData); kafkaDeviceReportService.publishWithTopic(messageInfo, topic); } catch (Exception e) { log.error("------网关心跳接口--执行失败-----{}", e); } //执行当前网关的设备状态推送 log.info("------------定时设备状态执行---------{}", gatewayConf.getKqmc()); pushDeviceStatus(gatewayConf); } @Override public void pushInfo(GatewayConf gatewayConf) { //DO NOTHING } @Override public void pushDeviceStatus(GatewayConf gatewayConf) { //获取设备列表 Collection list = GatewayUtils.allCacheDevice(); if (null == list || list.isEmpty()) { log.info("--------系统未获取到当前系统设备列表,不执行状态推送-----"); return; } //封装设备报文信息 DeviceStatusData statusData; String messageInfo; String topic = ScConstant.TOPIC_EQUIPMENT_STATUS; topic = topic.replace("{kqdm}", gatewayConf.getKqdm()); for (GatewayDevice device : list) { //如果设备没有配置productId直接跳过 if (StringUtils.isEmpty(device.getProductId())) continue; statusData = new DeviceStatusData(); statusData.setMessageId(ContextUtil.getUUID()); statusData.setApISource(ScConstant.API_SOURCE_TARGET_EQUIPMENT_STATUS); statusData.setGatewayId(gatewayConf.getGatewayId()); statusData.setGatewayIp(gatewayConf.getGatewayIp()); statusData.setProductId(device.getProductId()); statusData.setEquipId(device.getDeviceId()); statusData.setReportTime(DateFormatUtils.format(new Date(), "yyyy-MM-dd hh:mm:ss")); statusData.setMemo("FZZY"); //针对粮情设备 if (GatewayDeviceType.TYPE_07.getCode().equals(device.getType())) { if (Constant.YN_N.equals(device.getStatus())) statusData.setStatus(0); } messageInfo = JSONObject.toJSONString(statusData); //推送设备状态 kafkaDeviceReportService.publishWithTopic(messageInfo, topic); } } public String getSign(Map parames, GatewayConf gatewayConf) throws Exception { //参数调整 String signContent = OpenApiSignatureUtils.getSignContent(parames); log.debug("------待加密信息-----{}", signContent); //MD5加密 String md5sign = OpenApiSignatureUtils.getMd5Content(signContent, charset); //RSA加密 String singValue = OpenApiSignatureUtils.doSignByHex(gatewayConf.getGatewayId(), md5sign, charset, gatewayConf.getPublicKey(), gatewayConf.getPrivateKey()); log.debug("------RSA加密签名-----{}", singValue); return singValue; } /** * @param kqdm * @return */ public GatewayConf getCacheConf(String kqdm) { return gatewayConfService.getCacheConf(kqdm); } private void updateAuthToken(GatewayConf conf) { gatewayConfService.updateCache(conf); } }