vince
2023-11-08 96f7af2f3bf9a36dd48e0e6bf4f8a8ca1e31ed7d
Merge remote-tracking branch 'orgin/igds-api-gateway' into igds-api-gateway

# Conflicts:
# src/main/java/com/fzzy/gateway/view/GatewayConf.view.xml
已删除5个文件
已修改20个文件
已添加20个文件
1818 ■■■■■ 文件已修改
src/main/java/com/fzzy/api/data/GatewayDeviceProtocol.java 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/fzzy/api/utils/NumberUtil.java 49 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/fzzy/gateway/GatewayRunner.java 9 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/fzzy/gateway/GatewayTimerScheduled.java 48 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/fzzy/gateway/api/GatewaySyncGranService.java 8 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/fzzy/gateway/data/BaseResp.java 14 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/fzzy/gateway/data/WeatherWebDto.java 51 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/fzzy/gateway/entity/GatewayDevice.java 13 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/fzzy/gateway/hx2023/ScConstant.java 7 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/fzzy/gateway/hx2023/controller/GatewayController.java 42 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/fzzy/gateway/hx2023/data/ClientHeaders.java 12 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/fzzy/gateway/hx2023/data/CloudHeaders.java 11 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/fzzy/gateway/hx2023/data/CloudSendData.java 28 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/fzzy/gateway/hx2023/data/GrainData.java 29 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/fzzy/gateway/hx2023/data/GrainOutPut.java 42 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/fzzy/gateway/hx2023/data/GrainTemp.java 45 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/fzzy/gateway/hx2023/data/GrainWeather.java 31 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/fzzy/gateway/hx2023/data/InputData.java 10 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/fzzy/gateway/hx2023/data/SyncReqData.java 5 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/fzzy/gateway/hx2023/service/HxGatewaySyncGrainImpl.java 110 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/fzzy/gateway/hx2023/service/OnReceiveMqttService.java 82 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/fzzy/gateway/hx2023/websocket/SocketService.java 102 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/fzzy/gateway/hx2023/websocket/WebSockDeviceMessageReport.java 50 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/fzzy/gateway/hx2023/websocket/WebSockDeviceMessageSender.java 50 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/fzzy/gateway/hx2023/websocket/WebSocketDeviceLed.java 117 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/fzzy/gateway/hx2023/websocket/WebSocketMqtt.java 135 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/fzzy/gateway/service/GatewayDeviceService.java 37 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/fzzy/gateway/service/GatewayDeviceService2.java 62 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/fzzy/gateway/util/GatewayHttpUtil.java 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/fzzy/gateway/util/GatewayRSAUtils.java 3 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/fzzy/gateway/view/GatewayConf.view.xml 10 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/fzzy/gateway/view/GatewayDevice.view.xml 48 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/fzzy/mqtt/MqttConsumerCallBack.java 53 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/fzzy/mqtt/MqttConsumerConfig.java 93 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/fzzy/mqtt/MqttController.java 30 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/fzzy/mqtt/MqttGatewayService.java 1 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/fzzy/mqtt/MqttInboundConfiguration.java 91 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/fzzy/mqtt/MqttOutboundConfiguration.java 58 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/fzzy/mqtt/MqttProperties.java 23 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/fzzy/mqtt/MqttProviderCallBack.java 37 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/fzzy/mqtt/MqttProviderConfig.java 101 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/fzzy/mqtt/MqttPubController.java 31 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/fzzy/mqtt/MqttPublishService.java 1 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/resources/application-dev.yml 12 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/resources/application-devGateway.yml 21 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/fzzy/api/data/GatewayDeviceProtocol.java
@@ -18,7 +18,7 @@
    DEVICE_IDCARD_HTTP("DEVICE_IDCARD_HTTP", "身份证-HTTP协议"),
    DEVICE_LED_HTTP("DEVICE_LED_HTTP", "LED-HTTP协议"),
    DEVICE_LPR_SDK_HK("DEVICE_LPR_SDK_HK", "车牌识别-SDK海康"),
    DEVICE_TEST("DEVICE_TEST", "空协议");
    DEVICE_TEST("DEVICE_TEST", "测试协议");
    private String code;
src/main/java/com/fzzy/api/utils/NumberUtil.java
对比新文件
@@ -0,0 +1,49 @@
package com.fzzy.api.utils;
import java.math.BigDecimal;
import java.text.DecimalFormat;
/**
 * Number formatting helpers.
 */
public class NumberUtil {
    /**
     * Rounds a {@code Double} to the given number of decimal places.
     * <p>
     * Rounding mode is HALF_UP: round to the nearest neighbour, ties round away from zero.
     * The value is converted through its shortest decimal representation
     * ({@link BigDecimal#valueOf(double)}) so that a literal such as {@code 1.005} rounds to
     * {@code 1.01}; constructing the {@code BigDecimal} directly from the binary double would
     * round {@code 1.00499999...} down instead.
     *
     * @param number    the value to round; {@code null} is treated as {@code 0.0}
     * @param precision number of fraction digits to keep
     * @return the rounded value; NaN and infinite inputs are returned unchanged because they
     *         have no decimal representation
     */
    public static double keepPrecision(Double number, int precision) {
        if (null == number || 0.0 == number) {
            return 0.0;
        }
        if (number.isNaN() || number.isInfinite()) {
            return number;
        }
        return BigDecimal.valueOf(number)
                .setScale(precision, java.math.RoundingMode.HALF_UP)
                .doubleValue();
    }

    /**
     * Rounds a {@code Float} to the given number of decimal places.
     * <p>
     * Rounding mode is HALF_UP: round to the nearest neighbour, ties round away from zero.
     * The value is converted through {@link Float#toString(float)} so that the float's own
     * shortest decimal form is rounded, not its widened binary double expansion
     * (e.g. {@code 1.15f} rounds to {@code 1.2f}).
     *
     * @param number    the value to round; {@code null} is treated as {@code 0f}
     * @param precision number of fraction digits to keep
     * @return the rounded value; NaN and infinite inputs are returned unchanged because they
     *         have no decimal representation
     */
    public static float keepPrecision(Float number, int precision) {
        if (null == number) {
            return 0f;
        }
        if (number.isNaN() || number.isInfinite()) {
            return number;
        }
        return new BigDecimal(Float.toString(number))
                .setScale(precision, java.math.RoundingMode.HALF_UP)
                .floatValue();
    }

    /**
     * Converts a {@code Double} to a plain decimal string, avoiding scientific notation.
     * <p>
     * Uses the {@link DecimalFormat} pattern {@code "0.0"}, i.e. exactly one fraction digit,
     * formatted with the JVM's default locale.
     *
     * @param d the value to format
     * @return the formatted value, or an empty string when {@code d} is {@code null}
     */
    public static String doubleToStr(Double d) {
        if (null == d) {
            return "";
        }
        DecimalFormat df = new DecimalFormat("0.0");
        return df.format(d);
    }
}
src/main/java/com/fzzy/gateway/GatewayRunner.java
@@ -22,11 +22,20 @@
    private ApiInitService apiInitService;
    @Autowired
    private MqttPublishService mqttPublishService;
    @Autowired
    private  GatewayTimerScheduled scheduled;
    /**
     * Start-up hook: initialises the MQTT publisher, runs the API initialisation,
     * then performs a first weather fetch.
     *
     * @param args command-line arguments (not used by this method)
     * @throws Exception if any of the initialisation calls throws
     */
    @Override
    public void run(String... args) throws Exception {
        mqttPublishService.init();
        // Run the initialisation plan
        apiInitService.init();
        // Fetch the weather information
        scheduled.doWeatherExe();
    }
}
src/main/java/com/fzzy/gateway/GatewayTimerScheduled.java
@@ -1,14 +1,18 @@
package com.fzzy.gateway;
import com.alibaba.fastjson.JSON;
import com.fzzy.api.data.ApiParam;
import com.fzzy.api.entity.ApiConfs;
import com.fzzy.api.service.*;
import com.fzzy.api.utils.ContextUtil;
import com.fzzy.api.utils.RedisUtil;
import com.fzzy.gateway.api.GatewayRemoteManager;
import com.fzzy.gateway.data.WeatherWebDto;
import com.fzzy.gateway.entity.GatewayConf;
import com.fzzy.gateway.service.GatewayConfService;
import com.fzzy.gateway.util.GatewayHttpUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang.time.DateUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
@@ -24,6 +28,12 @@
@Slf4j
@Component(GatewayTimerScheduled.BEAN_ID)
public class GatewayTimerScheduled {
    public static String DEFAULT_URL = "https://v0.yiketianqi.com/api?unescape=1&version=v61&appid={appId}&appsecret={appsecret}&cityid={cityid}";
    public static String DEFAULT_APP_ID = "49421971";
    public static String DEFAULT_APP_SECRET = "JmJE48Fv";
    public static String DEFAULT_CITYID = "101270101";//成都
    public static final String BEAN_ID = "gateway.timerScheduled";
@@ -46,6 +56,44 @@
    }
    /**
     * Runs once every 30 minutes (cron {@code 0 0/30 * * * ?}).
     */
    @Scheduled(cron = "0 0/30 * * * ?")
    public void scheduled30() {
        // Periodically fetch the weather information
        doWeatherExe();
    }
    public void doWeatherExe() {
        try {
            String url = DEFAULT_URL;
            url = url.replace("{appId}", DEFAULT_APP_ID).replace("{appsecret}", DEFAULT_APP_SECRET).replace("{cityid}", DEFAULT_CITYID);
            String result = GatewayHttpUtil.doGet(url, null);
            if (null == result) {
                log.error("当前外网获取气象信息失败……");
                return;
            }
            WeatherWebDto dto = JSON.parseObject(result, WeatherWebDto.class);
            if (StringUtils.isNotEmpty(dto.getErrcode())) {
                log.error("当前外网获取气象信息异常:{}", dto.getErrmsg());
                return;
            }
            WeatherWebDto.contextMap.put("default", dto);
            log.info("===========================系统定时获获取气象信息===={}==================",dto);
        } catch (Exception e) {
        }
    }
    /**
     * æ‰§è¡Œç½‘关心跳
src/main/java/com/fzzy/gateway/api/GatewaySyncGranService.java
@@ -1,6 +1,7 @@
package com.fzzy.gateway.api;
import com.fzzy.gateway.data.BaseResp;
import com.fzzy.gateway.hx2023.data.KafaGrainData;
import com.fzzy.gateway.hx2023.data.*;
@@ -25,4 +26,11 @@
    public KafaGrainData syncGrain(SyncReqData reqData);
    /**
     * Synchronizes grain-condition information and returns it as a JSON message.
     *
     * @param reqData the synchronization request
     * @return the response wrapper; presumably the JSON payload is carried in its
     *         {@code data} field (confirm against implementations)
     */
    public BaseResp syncGrain2(SyncReqData reqData);
}
src/main/java/com/fzzy/gateway/data/BaseResp.java
对比新文件
@@ -0,0 +1,14 @@
package com.fzzy.gateway.data;
import lombok.Data;
/**
 * Generic response wrapper: a result code, a message and a string payload.
 * A freshly created instance represents success (code 200).
 */
@Data
public class BaseResp {
    // Result code; defaults to 200
    private int code = 200;
    // Result message; defaults to the success text
    private String msg = "成功";
    // Payload as a string
    private String data;
}
src/main/java/com/fzzy/gateway/data/WeatherWebDto.java
对比新文件
@@ -0,0 +1,51 @@
package com.fzzy.gateway.data;
import lombok.Data;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
/**
 *
 * @author: andy.jia
 * @description: å¤–网气象解析使用DTO
 * @version:
 * @data:2019å¹´12月7日
 *
 */
@Data
public class WeatherWebDto implements Serializable {
    public static Map<String,WeatherWebDto> contextMap = new HashMap<>();
    /**
     *
     */
    private static final long serialVersionUID = 1L;
    private String errcode;//错误编码
    private String errmsg;//错误信息
    private String cityid;// å½“前城市ID
    private String city;//城市名称
    private String update_time;// æ°”象台更新时间
    private String date;// æ—¥æœŸ
    private String week;// æ˜ŸæœŸ
    private String wea;// å¤©æ°”情况
    private String wea_img;// å¤©æ°”对应图标(xue, lei, shachen, wu, bingbao, yun, yu,yin, qing)
    private String tem;// å½“前温度
    private String air;// ç©ºæ°”质量
    private String air_pm25;// PM2.5
    private String air_level;// ç©ºæ°”质量等级
    private String air_tips;// ç©ºæ°”质量描述
    private String humidity;// æ¹¿åº¦
    private String visibility;// èƒ½è§åº¦
    private String pressure;// æ°”压hPa
    private String win;// é£Žå‘
    private String win_speed;// é£Žé€Ÿç­‰çº§
    private String win_meter;// é£Žé€Ÿ å¦‚: 12km/h
    private String alarm;// é¢„警信息
}
src/main/java/com/fzzy/gateway/entity/GatewayDevice.java
@@ -37,6 +37,10 @@
    @PropertyDef(label = "名称")
    private String deviceName;
    @Column(name = "PRODUCT_ID_", length = 50)
    @PropertyDef(label = "设备类型KEY")
    private String productId;
    @Column(name = "TYPE_", length = 10)
    @PropertyDef(label = "设备类型")
    private String type;
@@ -101,4 +105,13 @@
    @PropertyDef(label = "备注", description = "备注信息")
    private String remark;
    @Column(name = "CABLE_RULE_", length = 20)
    @PropertyDef(label = "布线规则", description = "平方仓表示层行列,筒仓表示每圈的列数")
    private String cableRule;
    @Column(name = "CABLE_CIR_", length = 20)
    @PropertyDef(label = "筒仓层规则", description = "针对筒仓")
    private String cableCir;
}
src/main/java/com/fzzy/gateway/hx2023/ScConstant.java
@@ -12,6 +12,13 @@
    public static String MESSAGE_TYPE_REPORT_PROPERTY = "REPORT_PROPERTY";
    public static String MESSAGE_TYPE_INVOKE_FUNCTION = "INVOKE_FUNCTION";
    /**
     * ç²®æƒ…采指令
     */
    public static String FUNCTION_getTAndRHInfo = "getTAndRHInfo";
    public static String getMessageId() {
        return System.currentTimeMillis() + RandomUtils.nextInt(1000) + "";
src/main/java/com/fzzy/gateway/hx2023/controller/GatewayController.java
@@ -4,12 +4,15 @@
import com.fzzy.api.utils.ContextUtil;
import com.fzzy.api.utils.RedisConst;
import com.fzzy.api.utils.RedisUtil;
import com.fzzy.gateway.entity.GatewayConf;
import com.fzzy.gateway.hx2023.data.GatewayAuthData;
import com.fzzy.gateway.service.GatewayConfService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.*;
import javax.annotation.Resource;
import java.util.List;
/**
 *
@@ -21,6 +24,9 @@
    @Resource
    private RedisUtil redisUtil;
    @Resource
    private GatewayConfService confService;
    /**
     * é‰´æƒæŽ¥å£
@@ -39,28 +45,44 @@
    public @ResponseBody
    JSONObject authorize(@RequestBody GatewayAuthData data) {
        log.debug("============鉴权==========={}--{}", data.getUsername(), data.getPassword());
        List<GatewayConf> list = confService.getCacheConfList();
        JSONObject json = new JSONObject();
        json.put("timestamp", System.currentTimeMillis());
        if (null == list || list.isEmpty()) {
            json.put("code", 500);
            json.put("message", "未获取网关信息");
            return json;
        }
        String gatewayId = null;
        for (GatewayConf conf : list) {
            if (data.getUsername().equals(conf.getGatewayUsername()) && data.getPassword().equals(conf.getGatewayPassword())) {
                gatewayId = conf.getGatewayId();
                break;
            }
        }
        if (null == gatewayId) {
            json.put("code", 500);
            json.put("message", "未匹配到用户名和密码");
            return json;
        }
        //TODO éªŒè¯ç”¨æˆ·åå’Œå¯†ç 
        String token = "fzzy-" + gatewayId;
        String token = ContextUtil.getUUID();
        log.debug("============鉴权==========={}--{}--{}", data.getUsername(), data.getPassword(), token);
        this.updateGatewayToken(token, data.getUsername());
        JSONObject json = new JSONObject();
        JSONObject result = new JSONObject();
        result.put("token", token);
        json.put("result", result);
        json.put("message", "成功");
        json.put("status", 0);
        json.put("code", 200);
        json.put("timestamp", System.currentTimeMillis());
        return json;
    }
src/main/java/com/fzzy/gateway/hx2023/data/ClientHeaders.java
对比新文件
@@ -0,0 +1,12 @@
package com.fzzy.gateway.hx2023.data;
import lombok.Data;
/**
 * Header section of a {@code GrainData} message.
 */
@Data
public class ClientHeaders {
    private String productId;
    private String deviceName;
    private String orgId;
    private String msgId;
}
src/main/java/com/fzzy/gateway/hx2023/data/CloudHeaders.java
对比新文件
@@ -0,0 +1,11 @@
package com.fzzy.gateway.hx2023.data;
import lombok.Data;
/**
 * Header section of a {@code CloudSendData} message.
 */
@Data
public class CloudHeaders {
    private String productId;
    private String deviceName;
    private String orgId;
}
src/main/java/com/fzzy/gateway/hx2023/data/CloudSendData.java
对比新文件
@@ -0,0 +1,28 @@
package com.fzzy.gateway.hx2023.data;
import lombok.Data;
import java.util.List;
/**
 * Message sent by the cloud side.
 */
@Data
public class CloudSendData {
    // Message headers
    private CloudHeaders headers;
    // Identifier of the function being invoked
    private String functionId;
    private String messageType;
    private String messageId;
    private String deviceId;
    private String timestamp;
    private boolean success;
    // Name/value input arguments
    private List<InputData> inputs;
}
src/main/java/com/fzzy/gateway/hx2023/data/GrainData.java
对比新文件
@@ -0,0 +1,29 @@
package com.fzzy.gateway.hx2023.data;
import lombok.Data;
/**
 * Grain-condition information message.
 */
@Data
public class GrainData {
    // Device code
    private String deviceId;
    private ClientHeaders headers;
    // Message ID
    private String messageId;
    // Message type; defaults to INVOKE_FUNCTION_REPLY
    private String messageType = "INVOKE_FUNCTION_REPLY";
    // Success flag; defaults to true
    private boolean success = true;
    private String timestamp;
    // Temperature summary and per-point readings
    private GrainOutPut outPut;
    private GrainWeather weatherStation;
}
src/main/java/com/fzzy/gateway/hx2023/data/GrainOutPut.java
对比新文件
@@ -0,0 +1,42 @@
package com.fzzy.gateway.hx2023.data;
import com.alibaba.fastjson2.JSONObject;
import lombok.Data;
import java.util.List;
/**
 * Grain-condition information: whole-bin temperature summary plus the per-point readings.
 */
@Data
public class GrainOutPut {
    // Module identifier - constant, always "apiTemperature"
    private String apISource = "apiTemperature";
    // Average temperature of the whole bin
    private String avgTemperature;
    // Highest temperature of the whole bin
    private String maxTemperature;
    // Lowest temperature of the whole bin
    private String minTemperature;
    // NOTE(review): presumably the coordinates of the min/max temperature points; all default to "0" - confirm
    private String minX = "0";
    private String minY = "0";
    private String minZ = "0";
    private String maxX = "0";
    private String maxY = "0";
    private String maxZ = "0";
    // Individual temperature points
    private List<GrainTemp> temperature;
}
src/main/java/com/fzzy/gateway/hx2023/data/GrainTemp.java
对比新文件
@@ -0,0 +1,45 @@
package com.fzzy.gateway.hx2023.data;
import lombok.Data;
/**
 * Grain-condition detail: a single temperature point.
 */
@Data
public class GrainTemp {
    // Cable number
    private String cableNum;
    // Layer number
    private String layerNumber;
    // Temperature value
    private String temperature;
    // Index, starting from 0
    private String position;
//    // Column of this temperature point; required for flat warehouses and underground bins
//    private String linex;
//
//    // Row of this temperature point; required for flat warehouses and underground bins
//    private String rowy;
    // Required for squat silos and silos, e.g. {\"totalCircle\":3,\"smallCircle\":\"4,10,16\"}; totalCircle: total number of circles, smallCircle: number of cables in each circle
    private String total_circle;
    // The specific circle number - required for squat silos and silos
    private String circle;
    /**
     * Creates an empty temperature point.
     */
    public GrainTemp() {
    }
    /**
     * Creates a temperature point; {@code total_circle} and {@code circle} are left unset.
     *
     * @param cableNum    cable number
     * @param layerNumber layer number
     * @param temperature temperature value
     * @param position    index, starting from 0
     */
    public GrainTemp(String cableNum, String layerNumber, String temperature, String position) {
        this.cableNum = cableNum;
        this.layerNumber = layerNumber;
        this.temperature = temperature;
        this.position = position;
    }
}
src/main/java/com/fzzy/gateway/hx2023/data/GrainWeather.java
对比新文件
@@ -0,0 +1,31 @@
package com.fzzy.gateway.hx2023.data;
import lombok.Data;
/**
 * Weather-station section of a {@code GrainData} message; all values are carried as strings.
 */
@Data
public class GrainWeather {
    private String airPressure;
    private String humidity;
    private String id;
    private String messageId;
    private String pm;
    private String radiation;
    private String rainfallAmount;
    private String temperature;
    private String windAngle;
    private String windDirection;
    private String windPower;
    private String windSpeed;
}
src/main/java/com/fzzy/gateway/hx2023/data/InputData.java
对比新文件
@@ -0,0 +1,10 @@
package com.fzzy.gateway.hx2023.data;
import lombok.Data;
/**
 * A single name/value input entry of a {@code CloudSendData} message.
 */
@Data
public class InputData {
    private String name;
    private String value;
}
src/main/java/com/fzzy/gateway/hx2023/data/SyncReqData.java
@@ -1,5 +1,6 @@
package com.fzzy.gateway.hx2023.data;
import com.fzzy.gateway.entity.GatewayDevice;
import lombok.Data;
@Data
@@ -16,4 +17,8 @@
    private String functionId;
    private String jsonData;
    private GatewayDevice device;
    private boolean autoReplay;
}
src/main/java/com/fzzy/gateway/hx2023/service/HxGatewaySyncGrainImpl.java
@@ -1,11 +1,21 @@
package com.fzzy.gateway.hx2023.service;
import com.alibaba.fastjson2.JSONObject;
import com.fzzy.api.data.GatewayDeviceProtocol;
import com.fzzy.api.utils.NumberUtil;
import com.fzzy.gateway.api.GatewaySyncGranService;
import com.fzzy.gateway.hx2023.data.KafaGrainData;
import com.fzzy.gateway.hx2023.data.SyncReqData;
import com.fzzy.gateway.data.BaseResp;
import com.fzzy.gateway.data.WeatherWebDto;
import com.fzzy.gateway.entity.GatewayDevice;
import com.fzzy.gateway.hx2023.ScConstant;
import com.fzzy.gateway.hx2023.data.*;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang.StringUtils;
import org.springframework.stereotype.Component;
import java.util.ArrayList;
import java.util.List;
/**
 * ç½‘关与粮情分机通讯和解析
@@ -18,11 +28,105 @@
    @Override
    public String getGrainProtocol() {
        return null;
        return GatewayDeviceProtocol.DEVICE_TEST.getCode();
    }
    @Override
    public KafaGrainData syncGrain(SyncReqData reqData) {
        return null;
    }
    @Override
    public BaseResp syncGrain2(SyncReqData reqData) {
        GatewayDevice device = reqData.getDevice();
        if (StringUtils.isEmpty(device.getCableCir())) {
            return getGrainTest1(reqData, device);
}
        BaseResp resp = new BaseResp();
        resp.setCode(500);
        resp.setMsg("没有匹配到规则");
        return resp;
    }
    private BaseResp getGrainTest1(SyncReqData reqData, GatewayDevice device) {
        String[] cableRule = device.getCableRule().split("-");
        int cableZ = Integer.valueOf(cableRule[0]);
        int cableY = Integer.valueOf(cableRule[1]);
        int cableX = Integer.valueOf(cableRule[2]);
        int sumNum = cableZ * cableY * cableX;
        WeatherWebDto weather = WeatherWebDto.contextMap.get("default");
        double tMIn = 20, tMax = 25;
        if (null != weather) {
            double tOut = Double.valueOf(weather.getTem());
            tMIn = tOut - 2;
            tMax = tOut + 3;
        }
        //数据封装
        GrainData grain = new GrainData();
        grain.setMessageId(ScConstant.getMessageId());
        grain.setDeviceId(device.getDeviceId());
        grain.setTimestamp(System.currentTimeMillis() + "");
        ClientHeaders headers = new ClientHeaders();
        headers.setDeviceName(device.getDeviceName());
        headers.setProductId(device.getProductId());
        headers.setOrgId(device.getOrgId());
        headers.setMsgId(ScConstant.getMessageId());
        grain.setHeaders(headers);
        GrainOutPut outPut = new GrainOutPut();
        outPut.setAvgTemperature(NumberUtil.keepPrecision((tMax + tMIn) / 2, 1) + "");
        outPut.setMinTemperature(tMax + "");
        outPut.setMaxTemperature(tMIn + "");
        List<GrainTemp> temperature = new ArrayList<>();
        //根号
        int cableNum = 1, position = 0;
        double curTemp = tMIn;
        double randomNumber = tMIn;
        int x = 0, y = 0, z = 0;
        for (int i = 0; i < sumNum; i++) {
            randomNumber = Math.random() * (tMax - tMIn + 1) + tMIn;
            curTemp = NumberUtil.keepPrecision(randomNumber, 1);
            position = i;
            z = i % cableZ + 1;
            x = i / (cableZ * cableY);
            y = x * (cableZ * cableY);
            y = (i - y) / cableZ;
            //根号
            cableNum = (i / cableZ) + 1;
            temperature.add(new GrainTemp(cableNum + "", z + "", curTemp + "", position + ""));
        }
        outPut.setTemperature(temperature);
        grain.setOutPut(outPut);
        BaseResp resp = new BaseResp();
        resp.setData(JSONObject.toJSONString(grain));
        return resp;
    }
}
src/main/java/com/fzzy/gateway/hx2023/service/OnReceiveMqttService.java
对比新文件
@@ -0,0 +1,82 @@
package com.fzzy.gateway.hx2023.service;
import com.alibaba.fastjson2.JSONObject;
import com.fzzy.gateway.GatewayUtils;
import com.fzzy.gateway.api.GatewayRemoteManager;
import com.fzzy.gateway.hx2023.data.CloudSendData;
import com.fzzy.gateway.entity.GatewayDevice;
import com.fzzy.gateway.hx2023.ScConstant;
import com.fzzy.gateway.data.BaseResp;
import com.fzzy.gateway.hx2023.data.SyncReqData;
import com.fzzy.mqtt.MqttProviderConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
/**
 * æ ¹æ®æŽ¥å—到的MQTT信息执行
 */
@Slf4j
@Component
public class OnReceiveMqttService {
    @Resource
    private GatewayRemoteManager gatewayRemoteManager;
    @Resource
    private MqttProviderConfig providerClient;
    /**
     * å½“前接收到云端发送信息
     *
     * @param message
     */
    public void onReceiveMessage(String message) {
        try {
            CloudSendData cloudSendData = JSONObject.parseObject(message, CloudSendData.class);
            String functionId = cloudSendData.getFunctionId();
            //粮情采集
            if (ScConstant.FUNCTION_getTAndRHInfo.equals(functionId)) {
                getTAndRHInfo(cloudSendData);
            }
        } catch (Exception e) {
        }
    }
    private void getTAndRHInfo(CloudSendData cloudSendData) {
        String deviceId = cloudSendData.getDeviceId();
        GatewayDevice device = GatewayUtils.getCacheByDeviceId(deviceId);
        SyncReqData syncReqData = new SyncReqData();
        syncReqData.setDeviceId(deviceId);
        syncReqData.setMessageId(cloudSendData.getMessageId());
        syncReqData.setMessageType(cloudSendData.getMessageType());
        syncReqData.setFunctionId(cloudSendData.getFunctionId());
        syncReqData.setAutoReplay(true);
        syncReqData.setDevice(device);
        BaseResp resp = gatewayRemoteManager.getSyncGrainService(device.getSyncProtocol()).syncGrain2(syncReqData);
        //自动推送
        if (200 == resp.getCode() && syncReqData.isAutoReplay()) {
            String topic = "/${productId}/${deviceId}/properties/report";
            topic = topic.replace("${productId}", device.getProductId()).replace("${deviceId}", device.getDeviceId());
            providerClient.publish(topic, resp.getData());
            log.info("=======粮情推送==========={}", resp.getData());
        }
    }
}
src/main/java/com/fzzy/gateway/hx2023/websocket/SocketService.java
文件已删除
src/main/java/com/fzzy/gateway/hx2023/websocket/WebSockDeviceMessageReport.java
对比新文件
@@ -0,0 +1,50 @@
package com.fzzy.gateway.hx2023.websocket;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import javax.websocket.*;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
/**
 * ä¸‹å‘调用粮温检测接口指令 topic
 */
@Slf4j
@Component
@ServerEndpoint(value = "/{productId}/{deviceId}/properties/report")
public class WebSockDeviceMessageReport {
    @OnOpen
    public void onOpen(Session session,
                       @PathParam("productId") String productId,
                       @PathParam("deviceId") String deviceId
    ) throws Exception {
        log.info("--------下发调用粮温检测接口指令 topic------");
    }
    @OnClose
    public void onClose() {
        log.info("WebSocket连接关闭={}");
    }
    /**
     * æ”¶åˆ°å‰ç«¯å‘送的信息
     *
     * @param message
     * @param session
     */
    @OnMessage
    public void onMessage(String message, Session session) {
        log.info("来自前端的信息:\n" + message);
    }
    @OnError
    public void onError(Session session, Throwable error) {
        log.error("发生错误");
    }
}
src/main/java/com/fzzy/gateway/hx2023/websocket/WebSockDeviceMessageSender.java
对比新文件
@@ -0,0 +1,50 @@
package com.fzzy.gateway.hx2023.websocket;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import javax.websocket.*;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
/**
 * ä¸‹å‘调用粮温检测接口指令 topic
 */
@Slf4j
@Component
@ServerEndpoint(value = "/device-message-sender/{productId}/{deviceId}")
public class WebSockDeviceMessageSender {
    @OnOpen
    public void onOpen(Session session,
                       @PathParam("productId") String productId,
                       @PathParam("deviceId") String deviceId
    ) throws Exception {
        log.info("--------下发调用粮温检测接口指令 topic------");
    }
    @OnClose
    public void onClose() {
        log.info("WebSocket连接关闭={}");
    }
    /**
     * æ”¶åˆ°å‰ç«¯å‘送的信息
     *
     * @param message
     * @param session
     */
    @OnMessage
    public void onMessage(String message, Session session) {
        log.info("来自前端的信息:\n" + message);
    }
    @OnError
    public void onError(Session session, Throwable error) {
        log.error("发生错误");
    }
}
src/main/java/com/fzzy/gateway/hx2023/websocket/WebSocketDeviceLed.java
文件已删除
src/main/java/com/fzzy/gateway/hx2023/websocket/WebSocketMqtt.java
文件已删除
src/main/java/com/fzzy/gateway/service/GatewayDeviceService.java
@@ -16,7 +16,6 @@
import com.fzzy.gateway.entity.GatewayDevice;
import com.fzzy.gateway.hx2023.ScConstant;
import com.fzzy.gateway.hx2023.data.*;
import com.fzzy.gateway.hx2023.websocket.WebSocketDeviceReport;
import com.fzzy.gateway.service.repository.GatewayDeviceRep;
import com.fzzy.mqtt.MqttPublishService;
@@ -62,28 +61,27 @@
    /**
     * gatewayDeviceService#updateSave
     *
     * @param entity
     * @param data
     */
    @DataResolver
    public void updateSave(GatewayDevice entity) {
        GatewayDevice data = new GatewayDevice();
        BeanUtils.copyProperties(entity, data);
    public void updateSave(GatewayDevice data) {
        GatewayDevice data2 = new GatewayDevice();
        BeanUtils.copyProperties(data, data2);
        if (null == data.getId()) {
            data.setId(ContextUtil.getUUID());
        }
        if (null == data.getDeviceSn()) {
            if (null != entity.getIp()) {
                data.setDeviceSn(entity.getIp());
        if (null == data2.getDeviceSn()) {
            if (null != data2.getIp()) {
                data.setDeviceSn(data2.getIp());
            } else {
                data.setDeviceSn(data.getDeviceId());
                data.setDeviceSn(data2.getDeviceId());
            }
        }
        gatewayDeviceRep.save(data);
        if (null == data2.getId()) {
            data2.setId(ContextUtil.getUUID());
            gatewayDeviceRep.save(data2);
        }else{
            gatewayDeviceRep.save(data2);
        }
        flushCache();
    }
@@ -105,10 +103,6 @@
    }
    /**
     * gatewayDeviceService#flushCache
     */
    @Expose
    public void flushCache() {
        List<GatewayDevice> list = listAll();
        if (null == list || list.isEmpty()) return;
@@ -119,6 +113,9 @@
    /**
     * gatewayDeviceService#ajaxTestWeight
     * åœ°ç£…推送测试
src/main/java/com/fzzy/gateway/service/GatewayDeviceService2.java
对比新文件
@@ -0,0 +1,62 @@
package com.fzzy.gateway.service;
import com.bstek.dorado.annotation.DataResolver;
import com.fzzy.api.utils.ContextUtil;
import com.fzzy.gateway.GatewayUtils;
import com.fzzy.gateway.entity.GatewayDevice;
import com.fzzy.gateway.service.repository.GatewayDeviceRep;
import org.springframework.beans.BeanUtils;
import org.springframework.data.domain.Sort;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.List;
@Component
public class GatewayDeviceService2 {
    @Resource
    private GatewayDeviceRep gatewayDeviceRep;
    public List<GatewayDevice> listAll() {
        Sort sort = new Sort(Sort.Direction.ASC, "deviceId");
        return gatewayDeviceRep.findAll(sort);
    }
    /**
     * gatewayDeviceService2#updateSave
     *
     * @param data
     */
    @DataResolver
    public void updateSave(GatewayDevice data) {
        GatewayDevice data2 = new GatewayDevice();
        BeanUtils.copyProperties(data, data2);
        if (null == data2.getDeviceSn()) {
            if (null != data2.getIp()) {
                data.setDeviceSn(data2.getIp());
            } else {
                data.setDeviceSn(data2.getDeviceId());
            }
        }
        if (null == data2.getId()) {
            data2.setId(ContextUtil.getUUID());
            gatewayDeviceRep.save(data2);
        } else {
            gatewayDeviceRep.save(data2);
        }
        flushCache();
    }
    public void flushCache() {
        List<GatewayDevice> list = listAll();
        if (null == list || list.isEmpty()) return;
        for (GatewayDevice device : list) {
            GatewayUtils.add2Cache(device);
        }
    }
}
src/main/java/com/fzzy/gateway/util/GatewayHttpUtil.java
@@ -9,12 +9,8 @@
import org.apache.http.impl.client.HttpClients;
import org.apache.http.util.EntityUtils;
import java.io.*;
import java.net.HttpURLConnection;
import java.net.URL;
import java.net.URLEncoder;
import java.util.Map;
import java.util.Set;
/**
 * ç½‘关专用HTTP请求工具类
src/main/java/com/fzzy/gateway/util/GatewayRSAUtils.java
@@ -1,8 +1,6 @@
package com.fzzy.gateway.util;
import lombok.extern.slf4j.Slf4j;
import javax.crypto.Cipher;
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
@@ -15,7 +13,6 @@
import java.util.HashMap;
import java.util.Map;
@Slf4j
public class GatewayRSAUtils {
    /**
     * RSA最大加密明文大小 2048/8-11
src/main/java/com/fzzy/gateway/view/GatewayConf.view.xml
@@ -171,12 +171,6 @@
        <Property name="exClassName">toolbar-button-push</Property>
      </ToolBarButton>
      <ToolBarButton>
        <ClientEvent name="onClick">view.get(&quot;#dialogLpr&quot;).show();</ClientEvent>
        <Property name="caption">车牌推送测试</Property>
        <Property name="iconClass">fa fa-search</Property>
        <Property name="exClassName">toolbar-button-push</Property>
      </ToolBarButton>
      <ToolBarButton>
        <Property name="caption">网关初始化</Property>
        <Property name="iconClass">fa fa-search</Property>
        <Property name="exClassName">toolbar-button-push</Property>
@@ -365,7 +359,7 @@
      <Buttons>
        <Button>
          <ClientEvent name="onClick">var data = view.get(&quot;#dsQuery.data&quot;);&#xD;
view.get(&quot;#ajaxTestGrain&quot;).set(&quot;parameter&quot;,data).execute(function(result){&#xD;
view.get(&quot;#ajaxTestGrain&quot;).set(&quot;parameter&quot;,data.toJSON()).execute(function(result){&#xD;
    self.get(&quot;parent&quot;).hide();&#xD;
    $alert(result);&#xD;
});</ClientEvent>
@@ -404,7 +398,7 @@
      <Buttons>
        <Button>
          <ClientEvent name="onClick">var data = view.get(&quot;#dsQuery.data&quot;);&#xD;
view.get(&quot;#ajaxTestWeight&quot;).set(&quot;parameter&quot;,data).execute(function(result){&#xD;
view.get(&quot;#ajaxTestWeight&quot;).set(&quot;parameter&quot;,data.get(&quot;weight&quot;)).execute(function(result){&#xD;
    self.get(&quot;parent&quot;).hide();&#xD;
    $alert(result);&#xD;
});</ClientEvent>
src/main/java/com/fzzy/gateway/view/GatewayDevice.view.xml
@@ -107,8 +107,20 @@
        <Property name="label">设备密码</Property>
      </PropertyDef>
      <PropertyDef name="depotIdSys">
        <Property></Property>
        <Property name="label">自定义仓库编码</Property>
      </PropertyDef>
      <PropertyDef name="productId">
        <Property></Property>
        <Property name="label">设备类型KEY</Property>
      </PropertyDef>
      <PropertyDef name="cableRule">
        <Property/>
        <Property name="label">库区系统仓库编码</Property>
        <Property name="label">布线规则</Property>
      </PropertyDef>
      <PropertyDef name="cableCir">
        <Property/>
        <Property name="label">筒仓层规则</Property>
      </PropertyDef>
    </DataType>
  </Model>
@@ -182,6 +194,9 @@
        <Property name="property">type</Property>
        <Property name="align">center</Property>
      </DataColumn>
      <DataColumn name="productId">
        <Property name="property">productId</Property>
      </DataColumn>
      <DataColumn name="deviceId">
        <Property name="property">deviceId</Property>
        <Property name="align">center</Property>
@@ -203,8 +218,7 @@
      <Property name="closeable">false</Property>
      <Buttons>
        <Button>
          <ClientEvent name="onClick">var cur = view.get(&quot;#dgMain&quot;).getCurrentItem();&#xD;
view.get(&quot;#updateSave&quot;).execute(function(){&#xD;
          <ClientEvent name="onClick">view.get(&quot;#updateSave&quot;).execute(function(){&#xD;
    self.get(&quot;parent&quot;).hide();&#xD;
});</ClientEvent>
          <Property name="caption">保存修改</Property>
@@ -250,6 +264,11 @@
              <Editor/>
            </AutoFormElement>
            <AutoFormElement>
              <Property name="name">productId</Property>
              <Property name="property">productId</Property>
              <Editor/>
            </AutoFormElement>
            <AutoFormElement>
              <Property name="name">orgId</Property>
              <Property name="property">orgId</Property>
              <Editor/>
@@ -257,6 +276,11 @@
            <AutoFormElement>
              <Property name="name">depotId</Property>
              <Property name="property">depotId</Property>
              <Editor/>
            </AutoFormElement>
            <AutoFormElement>
              <Property name="name">depotIdSys</Property>
              <Property name="property">depotIdSys</Property>
              <Editor/>
            </AutoFormElement>
            <AutoFormElement>
@@ -310,15 +334,20 @@
              <Property name="property">httpUrl</Property>
              <Editor/>
            </AutoFormElement>
            <AutoFormElement>
              <Property name="name">depotIdSys</Property>
              <Property name="property">depotIdSys</Property>
              <Editor/>
            </AutoFormElement>
            <AutoFormElement layoutConstraint="colSpan:3">
              <Property name="name">remark</Property>
              <Property name="property">remark</Property>
              <Property name="editorType">TextArea</Property>
              <Editor/>
            </AutoFormElement>
            <AutoFormElement>
              <Property name="name">cableRule</Property>
              <Property name="property">cableRule</Property>
              <Editor/>
            </AutoFormElement>
            <AutoFormElement>
              <Property name="name">cableCir</Property>
              <Property name="property">cableCir</Property>
              <Editor/>
            </AutoFormElement>
          </AutoForm>
@@ -327,10 +356,11 @@
      <Tools/>
    </Dialog>
    <UpdateAction id="updateSave">
      <Property name="dataResolver">gatewayDeviceService#updateSave</Property>
      <Property name="dataResolver">gatewayDeviceService2#updateSave</Property>
      <UpdateItem>
        <Property name="dataPath">[#current]</Property>
        <Property name="dataSet">dsMain</Property>
        <Property name="alias">data</Property>
      </UpdateItem>
    </UpdateAction>
    <AjaxAction id="ajaxDel">
src/main/java/com/fzzy/mqtt/MqttConsumerCallBack.java
对比新文件
@@ -0,0 +1,53 @@
package com.fzzy.mqtt;
import com.fzzy.gateway.hx2023.service.OnReceiveMqttService;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Slf4j
@Component
public class MqttConsumerCallBack implements MqttCallback {
    @Autowired
    private OnReceiveMqttService onReceiveMqttService;
    /**
     * å®¢æˆ·ç«¯æ–­å¼€è¿žæŽ¥çš„回调
     */
    @Override
    public void connectionLost(Throwable throwable) {
        log.info("与服务器断开连接,可重连");
    }
    /**
     * æ¶ˆæ¯åˆ°è¾¾çš„回调
     */
    @Override
    public void messageArrived(String topic, MqttMessage message) throws Exception {
        String messageStr = new String(message.getPayload());
        log.info(String.format("接收消息主题 : %s", topic));
        log.info(String.format("接收消息Qos : %d", message.getQos()));
        log.info(String.format("接收消息内容 : %s", messageStr));
        log.info(String.format("接收消息retained : %b", message.isRetained()));
        onReceiveMqttService.onReceiveMessage(messageStr);
    }
    /**
     * æ¶ˆæ¯å‘布成功的回调
     */
    @Override
    public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
        log.info(String.format("接收消息成功"));
    }
}
src/main/java/com/fzzy/mqtt/MqttConsumerConfig.java
对比新文件
@@ -0,0 +1,93 @@
package com.fzzy.mqtt;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import javax.annotation.PostConstruct;
@Configuration
@Slf4j
public class MqttConsumerConfig {
    @Autowired
    private MqttProperties mqttProperties;
    @Autowired
    private MqttConsumerCallBack mqttConsumerCallBack;
    /**
     * å®¢æˆ·ç«¯å¯¹è±¡
     */
    private MqttClient client;
    /**
     * åœ¨bean初始化后连接到服务器
     */
    @PostConstruct
    public void init() {
        connect();
    }
    /**
     * å®¢æˆ·ç«¯è¿žæŽ¥æœåŠ¡ç«¯
     */
    public void connect() {
        try {
            //创建MQTT客户端对象
            client = new MqttClient(mqttProperties.getHost(), mqttProperties.getClientInId(), new MemoryPersistence());
            //连接设置
            MqttConnectOptions options = new MqttConnectOptions();
            //是否清空session,设置为false表示服务器会保留客户端的连接记录,客户端重连之后能获取到服务器在客户端断开连接期间推送的消息
            //设置为true表示每次连接到服务端都是以新的身份
            options.setCleanSession(true);
            //设置连接用户名
            options.setUserName(mqttProperties.getUsername());
            //设置连接密码
            options.setPassword(mqttProperties.getPassword().toCharArray());
            //设置超时时间,单位为秒
            options.setConnectionTimeout(10);
            //设置心跳时间 å•位为秒,表示服务器每隔1.5*20秒的时间向客户端发送心跳判断客户端是否在线
            options.setKeepAliveInterval(20);
            //设置遗嘱消息的话题,若客户端和服务器之间的连接意外断开,服务器将发布客户端的遗嘱信息
            options.setWill("willTopic", (mqttProperties.getClientInId() + "与服务器断开连接").getBytes(), 0, false);
            //设置回调
            // client.setCallback(new MqttConsumerCallBack());
            client.setCallback(mqttConsumerCallBack);
            client.connect(options);
            //订阅主题
            //消息等级,和主题数组一一对应,服务端将按照指定等级给订阅了主题的客户端推送消息
            int[] qos = {1, 1};
            //主题
            String[] topics = mqttProperties.getTopics().split(",");
            //订阅主题
            client.subscribe(topics, qos);
        } catch (MqttException e) {
            e.printStackTrace();
        }
    }
    /**
     * æ–­å¼€è¿žæŽ¥
     */
    public void disConnect() {
        try {
            client.disconnect();
        } catch (MqttException e) {
            e.printStackTrace();
        }
    }
    /**
     * è®¢é˜…主题
     */
    public void subscribe(String topic, int qos) {
        try {
            client.subscribe(topic, qos);
        } catch (MqttException e) {
            e.printStackTrace();
        }
    }
}
src/main/java/com/fzzy/mqtt/MqttController.java
对比新文件
@@ -0,0 +1,30 @@
package com.fzzy.mqtt;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;
@Controller
public class MqttController {
    @Autowired
    private MqttConsumerConfig client;
    @Autowired
    private MqttProperties mqttProperties;
    @RequestMapping("/connect")
    public @ResponseBody
    String connect() {
        client.connect();
        return mqttProperties.getClientOutId() + "连接到服务器";
    }
    @RequestMapping("/disConnect")
    @ResponseBody
    public String disConnect() {
        client.disConnect();
        return mqttProperties.getClientOutId() + "与服务器断开连接";
    }
}
src/main/java/com/fzzy/mqtt/MqttGatewayService.java
@@ -1,6 +1,5 @@
package com.fzzy.mqtt;
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.messaging.handler.annotation.Header;
src/main/java/com/fzzy/mqtt/MqttInboundConfiguration.java
文件已删除
src/main/java/com/fzzy/mqtt/MqttOutboundConfiguration.java
文件已删除
src/main/java/com/fzzy/mqtt/MqttProperties.java
@@ -22,39 +22,44 @@
    /**
     * 用户名
     */
    private String clientUsername;
    private String username;
    /**
     * 密码
     */
    private String clientPassword;
    private String password;
    /**
     * 客户端Id,同一台服务器下,不允许出现重复的客户端id
     * 客户端Id-发布者Id
     */
    private String clientId;
    private String clientOutId;
    /**
     * 客户端Id-被订阅者Id
     */
    private String clientInId;
    /**
     * 超时时间
     */
    private int clientTimeout = 5000;
    private int timout = 5000;
    /**
     * 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端
     * 发送个消息判断客户端是否在线,但这个方法并没有重连的机制
     */
    private int clientAliveTime = 30000;
    private int keepAliveInterval = 20;
    private int clientMaxConnectTime;
    private int maxConnectTimes = 5;
    private String clientTopics;
    private String topics;
    /**
     * 连接方式
     */
    private Integer clientQos = 0;
    private Integer qos = 0;
    /**
     * 默认连接主题,以/#结尾表示订阅所有以test开头的主题
src/main/java/com/fzzy/mqtt/MqttProviderCallBack.java
对比新文件
@@ -0,0 +1,37 @@
package com.fzzy.mqtt;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;
@Slf4j
public class MqttProviderCallBack implements MqttCallback {
    /**
     * å®¢æˆ·ç«¯æ–­å¼€è¿žæŽ¥çš„回调
     */
    @Override
    public void connectionLost(Throwable throwable) {
        System.out.println("与服务器断开连接,可重连");
    }
    /**
     * æ¶ˆæ¯åˆ°è¾¾çš„回调
     */
    @Override
    public void messageArrived(String topic, MqttMessage message) throws Exception {
        System.out.println(String.format("接收消息主题 : %s", topic));
        System.out.println(String.format("接收消息Qos : %d", message.getQos()));
        System.out.println(String.format("接收消息内容 : %s", new String(message.getPayload())));
        System.out.println(String.format("接收消息retained : %b", message.isRetained()));
    }
    /**
     * æ¶ˆæ¯å‘布成功的回调
     */
    @Override
    public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
        System.out.println(String.format("接收消息成功"));
    }
}
src/main/java/com/fzzy/mqtt/MqttProviderConfig.java
对比新文件
@@ -0,0 +1,101 @@
package com.fzzy.mqtt;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import javax.annotation.PostConstruct;
@Configuration
@Slf4j
public class MqttProviderConfig {
    @Autowired
    private MqttProperties mqttProperties;
    /**
     * å®¢æˆ·ç«¯å¯¹è±¡
     */
    private MqttClient client;
    /**
     *
     * åœ¨bean初始化后连接到服务器
     */
    @PostConstruct
    public void init(){
        connect();
    }
    /**
     * å®¢æˆ·ç«¯è¿žæŽ¥æœåŠ¡ç«¯
     */
    public void connect(){
        try{
            //创建MQTT客户端对象
            client = new MqttClient(mqttProperties.getHost(),mqttProperties.getClientOutId(),new MemoryPersistence());
            //连接设置
            MqttConnectOptions options = new MqttConnectOptions();
            //是否清空session,设置false表示服务器会保留客户端的连接记录(订阅主题,qos),客户端重连之后能获取到服务器在客户端断开连接期间推送的消息
            //设置为true表示每次连接服务器都是以新的身份
            options.setCleanSession(true);
            //设置连接用户名
            options.setUserName(mqttProperties.getUsername());
            //设置连接密码
            options.setPassword(mqttProperties.getPassword().toCharArray());
            //设置超时时间,单位为秒
            options.setConnectionTimeout(100);
            //设置心跳时间 å•位为秒,表示服务器每隔 1.5*20秒的时间向客户端发送心跳判断客户端是否在线
            options.setKeepAliveInterval(20);
            //设置遗嘱消息的话题,若客户端和服务器之间的连接意外断开,服务器将发布客户端的遗嘱信息
            options.setWill("willTopic",(mqttProperties.getClientOutId() + "与服务器断开连接").getBytes(),0,false);
            //设置回调
            client.setCallback(new MqttProviderCallBack());
            client.connect(options);
        } catch(MqttException e){
            e.printStackTrace();
        }
    }
    public void publish(String topic,String message){
        MqttMessage mqttMessage = new MqttMessage();
        mqttMessage.setQos(mqttProperties.getQos());
        mqttMessage.setRetained(true);
        mqttMessage.setPayload(message.getBytes());
        //主题的目的地,用于发布/订阅信息
        MqttTopic mqttTopic = client.getTopic(topic);
        //提供一种机制来跟踪消息的传递进度
        //用于在以非阻塞方式(在后台运行)执行发布是跟踪消息的传递进度
        MqttDeliveryToken token;
        try {
            //将指定消息发布到主题,但不等待消息传递完成,返回的token可用于跟踪消息的传递状态
            //一旦此方法干净地返回,消息就已被客户端接受发布,当连接可用,将在后台完成消息传递。
            token = mqttTopic.publish(mqttMessage);
            token.waitForCompletion();
        } catch (MqttException e) {
            e.printStackTrace();
        }
    }
    public void publish(int qos,boolean retained,String topic,String message){
        MqttMessage mqttMessage = new MqttMessage();
        mqttMessage.setQos(qos);
        mqttMessage.setRetained(retained);
        mqttMessage.setPayload(message.getBytes());
        //主题的目的地,用于发布/订阅信息
        MqttTopic mqttTopic = client.getTopic(topic);
        //提供一种机制来跟踪消息的传递进度
        //用于在以非阻塞方式(在后台运行)执行发布是跟踪消息的传递进度
        MqttDeliveryToken token;
        try {
            //将指定消息发布到主题,但不等待消息传递完成,返回的token可用于跟踪消息的传递状态
            //一旦此方法干净地返回,消息就已被客户端接受发布,当连接可用,将在后台完成消息传递。
            token = mqttTopic.publish(mqttMessage);
            token.waitForCompletion();
        } catch (MqttException e) {
            e.printStackTrace();
        }
    }
}
src/main/java/com/fzzy/mqtt/MqttPubController.java
@@ -2,34 +2,25 @@
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class MqttPubController {
    @Autowired
    private MqttGatewayService gatewayService;
    private MqttProviderConfig providerClient;
    @RequestMapping("/hello")
    public String hello() {
        return "hello!";
    @RequestMapping("/sendMessage")
    public @ResponseBody
    String sendMessage(String topic, String message) {
        try {
            providerClient.publish(topic, message);
            return "发送成功";
        } catch (Exception e) {
            e.printStackTrace();
            return "发送失败";
    }
    @RequestMapping("/sendMqtt")
    public String sendMqtt(String sendData) {
        System.out.println(sendData);
        System.out.println("进入sendMqtt-------" + sendData);
        gatewayService.sendToMqtt("topic01", (String) sendData);
        return "Test is OK";
    }
    @RequestMapping("/sendMqttTopic")
    public String sendMqtt(String sendData, String topic) {
        //System.out.println(sendData+"   "+topic);
        //System.out.println("进入inbound发送:"+sendData);
        gatewayService.sendToMqtt(topic, (String) sendData);
        return "Test is OK";
    }
}
src/main/java/com/fzzy/mqtt/MqttPublishService.java
@@ -9,6 +9,7 @@
@Service
public class MqttPublishService {
    private static MqttClient client ;
src/main/resources/application-dev.yml
@@ -96,15 +96,3 @@
      auto-offset-reset: latest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
mqtt:
  host: tcp://10.13.4.84:11883
  client-id:
  client-username:
  client-password:
  client-timeout: 10
  client-alive-time: 20
  client-max-connect-times: 5
  client-topics:
  client-qos: 0
  isOpen: false
src/main/resources/application-devGateway.yml
@@ -70,13 +70,16 @@
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
  mqtt:
    host: tcp://10.13.4.84:11883
    client-id:
    client-username: admin
    client-password: 123456
    client-timeout: 10
    client-alive-time: 20
    client-max-connect-times: 5
    client-topics: "/device/${productId}/${deviceId}/message/property/report,/device-message-sender/${productId}/${deviceId}"
    client-qos: 0
    host: tcp://127.0.0.1:1883
    username: admin
    password: pwdmqtt..
    client-outId: fzzy-customer-id
    client-inId: fzzy-6e3d92ff71b911eea5e50250f2000002
    timeout: 10
    keep-alive-interval: 20
    max-connect-times: 5
    qos: 0
    isOpen: false
    default:
      topic: testTopic
    topics: "/+/+/properties/report,/device-message-sender/+/+"