/**
 * MQTT worker module (mqttWorker.js)
 *
 * Responsibilities:
 * - Manage the MQTT client connection
 * - Automatic reconnection
 * - Heartbeat keep-alive
 * - Topic subscription and message publishing
 * - Event notification
 */
import std from '../../dxmodules/dxStd.js'
import bus from '../../dxmodules/dxEventBus.js'
import config from '../../dxmodules/dxConfig.js'
import mqtt from '../../dxmodules/dxMqttClient.js'
import logger from '../../dxmodules/dxLogger.js'
import map from '../../dxmodules/dxMap.js'
import weComService from '../service/weComService.js'
import driver from '../driver.js'

// Set by the REINIT event and consumed by the 5 s timer in listener().
let shouldBeReinit = false
const mqtt_map = map.get("MQTT")
const net_map = map.get("NET")
const client_map = map.get("CLIENT")

/**
 * Builds the JSON status payload used for both the last-will message and
 * the heartbeat message.
 *
 * @returns {string} JSON string with serialNo, uuid, sign, code and time
 *                   (time is the current Unix time in whole seconds)
 */
function buildStatusPayload() {
    return JSON.stringify({
        serialNo: std.genRandomStr(10),
        uuid: config.get("sys.uuid"),
        sign: "",
        code: "000000",
        time: Math.floor(Date.now() / 1000)
    })
}

/**
 * Announces an MQTT connection status: fires CONNECTED_CHANGED on the bus
 * and then stores the status under "MQTT_STATUS" in the MQTT map.
 *
 * @param {string} status - "connected" or "disconnected"
 */
function publishStatus(status) {
    bus.fire(driver.mqtt.CONNECTED_CHANGED, status)
    mqtt_map.put("MQTT_STATUS", status)
}

/**
 * Returns every MQTT topic this device subscribes to.
 *
 * Command topics live under `<prefix>access_device/v2/cmd/<sn>/` and event
 * reply topics under `<prefix>access_device/v2/event/<sn>/`.
 *
 * @returns {Array} list of topic strings (commands first, then event replies)
 */
function getTopics() {
    const sn = config.get("sys.sn")
    const commands = [
        "control", "getConfig", "setConfig", "upgradeFirmware", "test",
        "getPermission", "insertPermission", "delPermission", "clearPermission", "modifyPermission",
        "getKey", "insertKey", "delKey", "clearKey", "modifyKey",
        "getUser", "insertUser", "delUser", "clearUser", "modifyUser",
        "getSecurity", "insertSecurity", "delSecurity", "clearSecurity",
        "getRecords", "delRecords"
    ]
    const eventReplies = ["connect_reply", "alarm_reply", "access_reply", "access_online_reply", "wecom_reply"]
    // NOTE(review): every other key in this file is "mqtt.*"; confirm that
    // "mqttInfo.prefix" is the intended config key.
    const prefix = config.get("mqttInfo.prefix") || ''
    const cmdBase = `${prefix}access_device/v2/cmd/${sn}/`
    const eventBase = `${prefix}access_device/v2/event/${sn}/`
    return commands.map(name => cmdBase + name)
        .concat(eventReplies.map(name => eventBase + name))
}

/**
 * Attempts to connect to the broker and subscribe to all topics.
 *
 * Does nothing while the NET map reports the network as not connected.
 * Any failure is logged and not rethrown; the 5 s timer in listener()
 * calls connect() again while the client is not connected.
 */
function connect() {
    try {
        if (net_map.get("NET_STATUS") !== "connected") {
            return
        }
        // Connection settings
        const username = config.get("mqtt.username")
        const password = config.get("mqtt.password")
        const willTopic = config.get("mqtt.willTopic")
        const qos = config.get("mqtt.qos")
        mqtt.connect({
            username,
            password,
            will: {
                topic: willTopic,
                payload: buildStatusPayload(),
                qos,
                retained: true,
            },
            cleanSession: Boolean(config.get("mqtt.cleanSession"))
        })
        // Subscriptions must be re-established after every (re)connect.
        getTopics().forEach(topic => {
            mqtt.subscribe(topic, { qos })
        })
    } catch (error) {
        // Include the error itself; the message alone gives no clue why the attempt failed.
        logger.error("MQTT connection error,retry in 5s:", error)
    }
}

let heartTimer = null

/**
 * (Re)starts the heartbeat timer.
 *
 * Always clears the running timer first. A new timer is created only when
 * "sys.heart_en" is truthy and "sys.heart_time" (seconds) yields a positive
 * interval. Each tick publishes a status payload to the heartbeat topic,
 * but only while the client exists and is connected.
 */
function restartHeartTimer() {
    if (heartTimer) {
        std.clearInterval(heartTimer)
        heartTimer = null
    }
    if (!config.get("sys.heart_en")) {
        return
    }
    const heartTime = config.get("sys.heart_time")
    const intervalMs = heartTime * 1000
    // Rejects 0, negative and NaN: such a period would not be a usable heartbeat interval.
    if (!(intervalMs > 0)) {
        logger.error("MQTT heartbeat not started, invalid sys.heart_time:", heartTime)
        return
    }
    heartTimer = std.setInterval(() => {
        try {
            // Same guard as the SEND_MSG handler: the timer keeps running
            // across connection loss, so skip ticks while disconnected.
            if (mqtt.getNative() && mqtt.isConnected()) {
                // NOTE(review): unlike getTopics(), this topic carries neither
                // the configured prefix nor the serial number; confirm intended.
                mqtt.publish("access_device/v2/event/heartbeat", buildStatusPayload())
            }
        } catch (error) {
            logger.error("MQTT heartbeat error:", error)
        }
    }, intervalMs)
}

/**
 * Initializes the MQTT client, installs its callbacks and makes the first
 * connection attempt.
 */
function run() {
    // Loose equality kept on purpose: the stored type of the config value
    // (number 1 vs string "1") is not visible from this file.
    const suffix = config.get("mqtt.clientIdSuffix") == 1 ? std.genRandomStr(3) : ""
    const clientId = config.get('mqtt.clientId') + suffix
    const addr = weComService.isWeCom() ? weComService.getAddr() : config.get("mqtt.addr")
    logger.info("MQTT CONNECTION INFO\naddr:", addr + "\nclientId:" + clientId)

    // Initialize the MQTT client
    mqtt.init(addr, clientId)

    // Install the MQTT callbacks
    mqtt.setCallbacks({
        onMessage: (topic, payload) => {
            // Forward every incoming message onto the event bus.
            bus.fire(driver.mqtt.RECEIVE_MSG, { topic, payload })
        },
        onDelivery: (token) => {
            logger.info(`Message delivery confirmed, token: ${token}`)
        },
        // Original author's note: an explicit disconnect() does not trigger
        // onConnectionLost, which is why callers of disconnect() publish the
        // "disconnected" status themselves.
        onConnectionLost: (cause) => {
            logger.error(`Connection lost: ${cause}`)
            publishStatus("disconnected")
        },
        onConnectSuccess: () => {
            logger.info("Connected to broker")
            publishStatus("connected")
            client_map.put("CLIENT_ID", clientId)
            restartHeartTimer()
        }
    })

    // Establish the connection
    connect()
}

/**
 * Tears the client down (disconnect + deinit), publishes the
 * "disconnected" status and initializes it again via run().
 */
function reconnect() {
    logger.info("MQTT reconnect")
    mqtt.disconnect()
    publishStatus("disconnected")
    mqtt.deinit()
    // Re-initialize MQTT
    run()
}

/**
 * Registers the event-bus listeners of this worker.
 */
function events() {
    // Re-init request: only flag it here; the 5 s timer in listener() acts on it.
    bus.on(driver.mqtt.REINIT, () => {
        shouldBeReinit = true
    })

    // Outgoing message request: publish only while a connected client exists.
    bus.on(driver.mqtt.SEND_MSG, (data) => {
        const { topic, payload } = data
        if (mqtt.getNative() && mqtt.isConnected()) {
            mqtt.publish(topic, payload)
        }
    })

    // Heartbeat restart request
    bus.on(driver.mqtt.RESTART_HEARTBEAT, restartHeartTimer)

    // Follow the network state: drop the connection when the network goes
    // away, rebuild the client when it comes back.
    bus.on(driver.net.CONNECTED_CHANGED, (status) => {
        try {
            if (status === "disconnected") {
                publishStatus("disconnected")
                mqtt.disconnect()
            } else if (status === "connected") {
                reconnect()
            }
        } catch (error) {
            logger.error("MQTT net status handling error:", error)
        }
    })
}

/**
 * Starts the two periodic timers of this worker: the 50 ms client loop and
 * the 5 s re-init / reconnect check.
 */
function listener() {
    // Call mqtt.loop() every 50 ms while a native client exists.
    std.setInterval(() => {
        try {
            if (mqtt.getNative()) {
                mqtt.loop()
            }
        } catch (error) {
            logger.error("MQTT loop error:", error)
        }
    }, 50)

    // Every 5 s: honour a pending re-init request, otherwise retry the
    // connection if the client exists but is not connected.
    std.setInterval(() => {
        try {
            if (shouldBeReinit) {
                shouldBeReinit = false
                reconnect()
            } else if (mqtt.getNative() && !mqtt.isConnected()) {
                connect()
            }
        } catch (error) {
            logger.error("MQTT connect loop error:", error)
        }
    }, 5000)
}

// Module initialization
try {
    events() // register event listeners
    run() // initialize MQTT
    listener() // start the periodic timers
} catch (error) {
    logger.error("MQTT worker initialization error:", error)
}