package com.fzzy.gateway.hx2023.service;
|
|
import com.alibaba.fastjson2.JSONObject;
|
import com.fzzy.api.Constant;
|
import com.fzzy.api.data.GatewayDeviceType;
|
import com.fzzy.api.data.PushProtocol;
|
import com.fzzy.api.utils.ContextUtil;
|
import com.fzzy.api.view.repository.ApiLogRep;
|
import com.fzzy.gateway.GatewayUtils;
|
import com.fzzy.gateway.api.GatewayRemoteService;
|
import com.fzzy.gateway.data.BaseResp;
|
import com.fzzy.gateway.entity.GatewayConf;
|
import com.fzzy.gateway.entity.GatewayDevice;
|
import com.fzzy.gateway.hx2023.ScConstant;
|
import com.fzzy.gateway.hx2023.data.CloudResp;
|
import com.fzzy.gateway.hx2023.data.DeviceStatusData;
|
import com.fzzy.gateway.hx2023.data.HeartBeatData;
|
import com.fzzy.gateway.hx2023.kafka.KafkaDeviceReportService;
|
import com.fzzy.gateway.hx2023.util.OpenApiSignatureUtils;
|
import com.fzzy.gateway.service.GatewayConfService;
|
import com.fzzy.gateway.util.GatewayHttpUtil;
|
import lombok.Data;
|
import lombok.extern.slf4j.Slf4j;
|
import org.apache.commons.lang.StringUtils;
|
import org.apache.commons.lang.time.DateFormatUtils;
|
import org.springframework.stereotype.Component;
|
|
import javax.annotation.Resource;
|
import java.util.Collection;
|
import java.util.Date;
|
import java.util.HashMap;
|
import java.util.Map;
|
|
|
@Slf4j
|
@Data
|
@Component
|
public class HxGatewayRemoteServiceImpl implements GatewayRemoteService {
|
|
|
String charset = "utf-8";
|
|
@Resource
|
private ApiLogRep apiLogRep;
|
|
@Resource
|
private GatewayConfService gatewayConfService;
|
@Resource
|
private KafkaDeviceReportService kafkaDeviceReportService;
|
|
|
@Override
|
public String getProtocol() {
|
return PushProtocol.GATEWAY_SC_2023.getCode();
|
}
|
|
@Override
|
public void init(GatewayConf gatewayConf) {
|
try {
|
|
if (StringUtils.isEmpty(gatewayConf.getApiUrl())) {
|
return;
|
}
|
|
//获取公私钥接口
|
Map<String, String> params = new HashMap<>();
|
params.put("appId", gatewayConf.getGatewayId());
|
|
String url = gatewayConf.getApiUrl() + "reserver/api/key/apply";
|
String jsonStr = GatewayHttpUtil.doGet(url, params);
|
|
log.info("---获取公私钥接口-返回---{}", jsonStr);
|
|
CloudResp respKey = JSONObject.parseObject(jsonStr, CloudResp.class);
|
|
if (BaseResp.CODE_200 == respKey.getCode()) {
|
JSONObject object = respKey.getData();
|
String pubKey = (String) object.get("pubKey");
|
String priKey = (String) object.get("priKey");
|
|
gatewayConf.setPublicKey(pubKey);
|
gatewayConf.setPrivateKey(priKey);
|
}
|
|
|
//获取 AccessToken 接口
|
String sign = getSign(params, gatewayConf);
|
params.put("sign", sign);
|
url = gatewayConf.getApiUrl() + "reserver/api/token/apply";
|
|
jsonStr = GatewayHttpUtil.doGet(url, params);
|
log.info("---获取AccessToken接口-返回---{}", jsonStr);
|
CloudResp respToken = JSONObject.parseObject(jsonStr, CloudResp.class);
|
|
if (BaseResp.CODE_200 == respToken.getCode()) {
|
JSONObject object = respKey.getData();
|
if (null != object) {
|
String token = (String) object.get("token");
|
gatewayConf.setAccessToken(token);
|
}
|
}
|
|
//更新缓存
|
updateAuthToken(gatewayConf);
|
|
} catch (Exception e) {
|
log.error("------初始化失败-----{}", e);
|
}
|
}
|
|
@Override
|
public void heartbeat(GatewayConf gatewayConf) {
|
|
log.info("------------定时心跳执行---------{}", gatewayConf.getKqmc());
|
|
try {
|
|
|
gatewayConf = getCacheConf(gatewayConf.getKqdm());
|
|
|
HeartBeatData heartBeatData = new HeartBeatData();
|
|
heartBeatData.setMessageId(ContextUtil.getUUID());
|
heartBeatData.setApISource(ScConstant.API_SOURCE_TARGET_EQUIPMENT_HEARTBEAT);
|
heartBeatData.setGatewayId(gatewayConf.getGatewayId());
|
heartBeatData.setGatewayIp(gatewayConf.getGatewayIp());
|
heartBeatData.setGatewayMac(null == gatewayConf.getGatewayMac() ? "无" : gatewayConf.getGatewayMac());
|
heartBeatData.setHeartbeat(1);
|
heartBeatData.setReportTime(DateFormatUtils.format(new Date(), "yyyy-MM-dd hh:mm:ss"));
|
heartBeatData.setMemo("FZZY");
|
|
String topic = ScConstant.TOPIC_EQUIPMENT_HEARTBEAT;
|
topic = topic.replace("{kqdm}", gatewayConf.getKqdm());
|
|
String messageInfo = JSONObject.toJSONString(heartBeatData);
|
|
log.info("---网关心跳推送--{}-{}", topic, messageInfo);
|
|
kafkaDeviceReportService.publishWithTopic(messageInfo, topic);
|
|
|
} catch (Exception e) {
|
log.error("------网关心跳接口--执行失败-----{}", e);
|
}
|
|
//执行当前网关的设备状态推送
|
pushDeviceStatus(gatewayConf);
|
}
|
|
@Override
|
public void pushInfo(GatewayConf gatewayConf) {
|
try {
|
|
if (StringUtils.isEmpty(gatewayConf.getApiUrl())) {
|
return;
|
}
|
|
gatewayConf = getCacheConf(gatewayConf.getKqdm());
|
//网关心跳接口
|
Map<String, String> params = new HashMap<>();
|
params.put("token", gatewayConf.getAccessToken());
|
params.put("gatewayId", gatewayConf.getGatewayId());
|
params.put("gatewayIp", gatewayConf.getGatewayIp());
|
params.put("gatewayMac", null == gatewayConf.getGatewayMac() ? "无" : gatewayConf.getGatewayMac());
|
params.put("gatewayCPU", null == gatewayConf.getGatewayCPU() ? "无" : gatewayConf.getGatewayCPU());
|
params.put("gatewayMem", null == gatewayConf.getGatewayMem() ? "无" : gatewayConf.getGatewayMem());
|
params.put("gatewayHardDisk", null == gatewayConf.getGatewayHardDisk() ? "无" : gatewayConf.getGatewayHardDisk());
|
params.put("timestamp", System.currentTimeMillis() + "");
|
|
String sign = getSign(params, gatewayConf);
|
params.put("sign", sign);
|
|
String url = gatewayConf.getApiUrl() + "reserver/api/iot/equipment/heartbeat";
|
|
String jsonStr = GatewayHttpUtil.doGet(url, params);
|
log.info("---推送网关信息-返回---{}", jsonStr);
|
|
} catch (Exception e) {
|
log.error("------推送网关信息--执行失败-----{}", e);
|
}
|
}
|
|
|
@Override
|
public void pushDeviceStatus(GatewayConf gatewayConf) {
|
|
//获取设备列表
|
Collection<GatewayDevice> list = GatewayUtils.allCacheDevice();
|
if (null == list || list.isEmpty()) {
|
log.info("--------系统未获取到当前系统设备列表,不执行状态推送-----");
|
return;
|
}
|
|
//封装设备报文信息
|
DeviceStatusData statusData;
|
String messageInfo;
|
String topic = ScConstant.TOPIC_EQUIPMENT_STATUS;
|
topic = topic.replace("{kqdm}", gatewayConf.getKqdm());
|
|
for (GatewayDevice device : list) {
|
//如果设备没有配置productId直接跳过
|
if (StringUtils.isEmpty(device.getProductId())) continue;
|
statusData = new DeviceStatusData();
|
statusData.setMessageId(ContextUtil.getUUID());
|
statusData.setApISource(ScConstant.API_SOURCE_TARGET_EQUIPMENT_STATUS);
|
statusData.setGatewayId(gatewayConf.getGatewayId());
|
statusData.setGatewayIp(gatewayConf.getGatewayIp());
|
statusData.setProductId(device.getProductId());
|
statusData.setEquipId(device.getDeviceId());
|
statusData.setReportTime(DateFormatUtils.format(new Date(), "yyyy-MM-dd hh:mm:ss"));
|
statusData.setMemo("FZZY");
|
|
//针对粮情设备
|
if (GatewayDeviceType.TYPE_07.getCode().equals(device.getType())) {
|
if (Constant.YN_N.equals(device.getStatus())) statusData.setStatus(0);
|
}
|
|
messageInfo = JSONObject.toJSONString(statusData);
|
//推送设备状态
|
kafkaDeviceReportService.publishWithTopic(messageInfo, topic);
|
|
}
|
}
|
|
|
public String getSign(Map<String, String> parames, GatewayConf gatewayConf) throws Exception {
|
|
//参数调整
|
String signContent = OpenApiSignatureUtils.getSignContent(parames);
|
log.debug("------待加密信息-----{}", signContent);
|
|
//MD5加密
|
String md5sign = OpenApiSignatureUtils.getMd5Content(signContent, charset);
|
|
//RSA加密
|
String singValue = OpenApiSignatureUtils.doSignByHex(gatewayConf.getGatewayId(), md5sign, charset, gatewayConf.getPublicKey(), gatewayConf.getPrivateKey());
|
log.debug("------RSA加密签名-----{}", singValue);
|
|
return singValue;
|
}
|
|
/**
|
* @param kqdm
|
* @return
|
*/
|
public GatewayConf getCacheConf(String kqdm) {
|
return gatewayConfService.getCacheConf(kqdm);
|
}
|
|
private void updateAuthToken(GatewayConf conf) {
|
gatewayConfService.updateCache(conf);
|
}
|
|
|
}
|