jiazx0107@163.com
2023-12-12 a9098372191b3c51995d41ee28404d1b71244d98
src/main/java/com/fzzy/gateway/hx2023/service/HxGatewayRemoteServiceImpl.java
@@ -1,23 +1,33 @@
package com.fzzy.gateway.hx2023.service;
import com.fzzy.api.data.AuthToken;
import com.alibaba.fastjson2.JSONObject;
import com.fzzy.api.Constant;
import com.fzzy.api.data.GatewayDeviceType;
import com.fzzy.api.data.PushProtocol;
import com.fzzy.api.entity.ApiLog;
import com.fzzy.api.utils.ContextUtil;
import com.fzzy.api.utils.RedisConst;
import com.fzzy.api.utils.RedisUtil;
import com.fzzy.api.view.repository.ApiLogRep;
import com.fzzy.gateway.GatewayUtils;
import com.fzzy.gateway.api.GatewayRemoteService;
import com.fzzy.gateway.data.GatewayResponse;
import com.fzzy.gateway.data.BaseResp;
import com.fzzy.gateway.entity.GatewayConf;
import com.fzzy.gateway.entity.GatewayDevice;
import com.fzzy.gateway.hx2023.ScConstant;
import com.fzzy.push.gb2022.HttpClientUtil;
import com.fzzy.gateway.hx2023.data.CloudResp;
import com.fzzy.gateway.hx2023.data.DeviceStatusData;
import com.fzzy.gateway.hx2023.data.HeartBeatData;
import com.fzzy.gateway.hx2023.kafka.KafkaDeviceReportService;
import com.fzzy.gateway.hx2023.util.OpenApiSignatureUtils;
import com.fzzy.gateway.service.GatewayConfService;
import com.fzzy.gateway.util.GatewayHttpUtil;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang.time.DateFormatUtils;
import org.springframework.stereotype.Component;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.util.Collection;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
@@ -28,10 +38,15 @@
public class HxGatewayRemoteServiceImpl implements GatewayRemoteService {
    String charset = "utf-8";
    @Resource
    private ApiLogRep apiLogRep;
    @Resource
    private RedisUtil redisUtil;
    private GatewayConfService gatewayConfService;
    @Resource
    private KafkaDeviceReportService kafkaDeviceReportService;
    @Override
@@ -40,79 +55,172 @@
    }
    @Override
    public GatewayResponse authorize(GatewayConf conf) {
    public void init(GatewayConf gatewayConf) {
        try {
        //添加LOG
        ApiLog apiLog = new ApiLog();
        apiLog.setData("鉴权接口");
        apiLog.setId(ContextUtil.getUUID());
        apiLog.setKqdm(conf.getKqdm());
            if (StringUtils.isEmpty(gatewayConf.getApiUrl())) {
                return;
            }
            //获取公私钥接口
            Map<String, String> params = new HashMap<>();
            params.put("appId", gatewayConf.getGatewayId());
            String url = gatewayConf.getApiUrl() + "reserver/api/key/apply";
            String jsonStr = GatewayHttpUtil.doGet(url, params);
            log.info("---获取公私钥接口-返回---{}", jsonStr);
            CloudResp respKey = JSONObject.parseObject(jsonStr, CloudResp.class);
            if (BaseResp.CODE_200 == respKey.getCode()) {
                JSONObject object = respKey.getData();
                String pubKey = (String) object.get("pubKey");
                String priKey = (String) object.get("priKey");
                gatewayConf.setPublicKey(pubKey);
                gatewayConf.setPrivateKey(priKey);
            }
            //获取 AccessToken 接口
            String sign = getSign(params, gatewayConf);
            params.put("sign", sign);
            url = gatewayConf.getApiUrl() + "reserver/api/token/apply";
            jsonStr = GatewayHttpUtil.doGet(url, params);
            log.info("---获取AccessToken接口-返回---{}", jsonStr);
            CloudResp respToken = JSONObject.parseObject(jsonStr, CloudResp.class);
            if (BaseResp.CODE_200 == respToken.getCode()) {
                JSONObject object = respKey.getData();
                if (null != object) {
                    String token = (String) object.get("token");
                    gatewayConf.setAccessToken(token);
                }
            }
            //更新缓存
            updateAuthToken(gatewayConf);
        } catch (Exception e) {
            log.error("------初始化失败-----{}", e);
        }
    }
    @Override
    public void heartbeat(GatewayConf gatewayConf) {
        log.info("------------定时心跳执行---------{}", gatewayConf.getKqmc());
        try {
            Map<String, Object> map = new HashMap<>();
            map.put("username", conf.getUserName());
            map.put("username", conf.getPassword());
            log.debug("-----------------数据报文----------------{}", map);
            gatewayConf = getCacheConf(gatewayConf.getKqdm());
            String url = conf.getApiUrl() + ScConstant.API_URL_AUTH;
            HeartBeatData heartBeatData = new HeartBeatData();
            heartBeatData.setMessageId(ContextUtil.getUUID());
            heartBeatData.setApISource(ScConstant.API_SOURCE_TARGET_EQUIPMENT_HEARTBEAT);
            heartBeatData.setGatewayId(gatewayConf.getGatewayId());
            heartBeatData.setGatewayIp(gatewayConf.getGatewayIp());
            heartBeatData.setGatewayMac(null == gatewayConf.getGatewayMac() ? "无" : gatewayConf.getGatewayMac());
            heartBeatData.setHeartbeat(1);
            heartBeatData.setReportTime(DateFormatUtils.format(new Date(), "yyyy-MM-dd hh:mm:ss"));
            heartBeatData.setMemo("FZZY");
            String topic = ScConstant.TOPIC_EQUIPMENT_HEARTBEAT;
            topic = topic.replace("{kqdm}", gatewayConf.getKqdm());
            String messageInfo = JSONObject.toJSONString(heartBeatData);
            kafkaDeviceReportService.publishWithTopic(messageInfo, topic);
            GatewayResponse responseDto = HttpClientUtil.pushGateway(url, map);
            apiLog.setStatus(responseDto.getStatus());
            apiLog.setResult(responseDto.getMessage());
            apiLogRep.save(apiLog);
            updateAuthToken(responseDto, conf);
            return responseDto;
        } catch (Exception e) {
            apiLog.setStatus(99);
            apiLog.setResult("失败:" + e.getMessage());
            apiLogRep.save(apiLog);
            log.error(e.getMessage(), e);
            return new GatewayResponse(99, e.getMessage());
            log.error("------网关心跳接口--执行失败-----{}", e);
        }
        //执行当前网关的设备状态推送
        log.info("------------定时设备状态执行---------{}", gatewayConf.getKqmc());
        pushDeviceStatus(gatewayConf);
    }
    @Override
    public void pushInfo(GatewayConf gatewayConf) {
        //DO NOTHING
    }
    @Override
    public void pushDeviceStatus(GatewayConf gatewayConf) {
        //获取设备列表
        Collection<GatewayDevice> list = GatewayUtils.allCacheDevice();
        if (null == list || list.isEmpty()) {
            log.info("--------系统未获取到当前系统设备列表,不执行状态推送-----");
            return;
        }
        //封装设备报文信息
        DeviceStatusData statusData;
        String messageInfo;
        String topic = ScConstant.TOPIC_EQUIPMENT_STATUS;
        topic = topic.replace("{kqdm}", gatewayConf.getKqdm());
        for (GatewayDevice device : list) {
            //如果设备没有配置productId直接跳过
            if (StringUtils.isEmpty(device.getProductId())) continue;
            statusData = new DeviceStatusData();
            statusData.setMessageId(ContextUtil.getUUID());
            statusData.setApISource(ScConstant.API_SOURCE_TARGET_EQUIPMENT_STATUS);
            statusData.setGatewayId(gatewayConf.getGatewayId());
            statusData.setGatewayIp(gatewayConf.getGatewayIp());
            statusData.setProductId(device.getProductId());
            statusData.setEquipId(device.getDeviceId());
            statusData.setReportTime(DateFormatUtils.format(new Date(), "yyyy-MM-dd hh:mm:ss"));
            statusData.setMemo("FZZY");
            //针对粮情设备
            if (GatewayDeviceType.TYPE_07.getCode().equals(device.getType())) {
                if (Constant.YN_N.equals(device.getStatus())) statusData.setStatus(0);
            }
            messageInfo = JSONObject.toJSONString(statusData);
            //推送设备状态
            kafkaDeviceReportService.publishWithTopic(messageInfo, topic);
        }
    }
    public String getSign(Map<String, String> parames, GatewayConf gatewayConf) throws Exception {
        //参数调整
        String signContent = OpenApiSignatureUtils.getSignContent(parames);
        log.debug("------待加密信息-----{}", signContent);
        //MD5加密
        String md5sign = OpenApiSignatureUtils.getMd5Content(signContent, charset);
        //RSA加密
        String singValue = OpenApiSignatureUtils.doSignByHex(gatewayConf.getGatewayId(), md5sign, charset, gatewayConf.getPublicKey(), gatewayConf.getPrivateKey());
        log.debug("------RSA加密签名-----{}", singValue);
        return singValue;
    }
    /**
     * @param kqdm
     * @return
     */
    public AuthToken getAuthToken(String kqdm) {
        try {
            String key = RedisConst.buildKey(RedisConst.KYE_TOKEN, kqdm);
            AuthToken token = (AuthToken) redisUtil.get(key);
            if (null == token) {
                log.error("------------------未获取到TOKEN---------------");
                return null;
            }
            return token;
        } catch (Exception e) {
            log.error(e.getMessage(), e);
            return null;
        }
    public GatewayConf getCacheConf(String kqdm) {
        return gatewayConfService.getCacheConf(kqdm);
    }
    private void updateAuthToken(GatewayResponse dto, GatewayConf conf) {
        String key = RedisConst.buildKey(RedisConst.KYE_TOKEN, conf.getKqdm());
        AuthToken token = getAuthToken(conf.getKqdm());
        if (null == token) {
            token = new AuthToken();
            token.setKqdm(conf.getKqdm());
        }
        if (null != dto.getResult()) {
            token.setToken(dto.getResult().getToken());
        }
        redisUtil.set(key, token);
    private void updateAuthToken(GatewayConf conf) {
        gatewayConfService.updateCache(conf);
    }