//build:20240411
|
//利用mqtt协议实现和mqtt服务端的通信或通过mqtt broker实现和其它mqtt客户端的通信
|
//依赖组件 dxMap,dxLogger,dxDriver,dxCommon,dxEventBus,dxNet
|
import { mqttClass } from './libvbar-m-dxmqtt.so'
|
import * as os from "os"
|
import std from './dxStd.js'
|
import dxMap from './dxMap.js'
|
import dxCommon from './dxCommon.js'
|
import bus from './dxEventBus.js'
|
const map = dxMap.get("default")
|
const mqttObj = new mqttClass();
|
const mqtt = {}
|
/**
|
* 初始化mqtt相关属性并创建连接,请在worker里使用dxMqtt组件或使用简化函数dxMqtt.run
|
* @param {string} mqttAddr mqtt服务地址,必填,以tcp://开头,格式是tcp://ip:port
|
* @param {string} clientId 客户端id,必填,不同的设备请使用不同的客户端id
|
* @param {string} username 非必填,mqtt用户名
|
* @param {string} password 非必填,mqtt密码
|
* @param {string} prefix 非必填,缺省为空字符串,这个表示自动在主题前加上一个前缀
|
* @param {number} qos 0,1,2 非必填,缺省是1. 其中0表示消息最多发送一次,发送后消息就被丢弃;1表示消息至少发送一次,可以保证消息被接收方收到,但是会存在接收方收到重复消息的情况;2表示消息发送成功且只发送一次,资源开销大
|
* @param {string} willTopic 非必填,遗嘱主题,通过broker通信的时候设备断开会自动触发一个mqtt遗嘱消息,这个是遗嘱消息的主题
|
* @param {string} willMessage 非必填,遗嘱内容,通过broker通信的时候设备断开会自动触发一个mqtt遗嘱消息,这个是遗嘱消息的内容
|
* @param {string} id 句柄id,非必填(若初始化多个实例需要传入唯一id)
|
*/
|
mqtt.init = function (mqttAddr, clientId, username, password, prefix = "", qos = 1, willTopic, willMessage, id) {
|
|
if (mqttAddr === undefined || mqttAddr.length === 0) {
|
throw new Error("dxMqtt.init: 'mqttAddr' parameter should not be null or empty")
|
}
|
if (clientId === undefined || clientId.length === 0) {
|
throw new Error("dxMqtt.init: 'clientId' parameter should not be null or empty")
|
}
|
// 如果mqttAddr不以tcp://开头,自动添加前缀
|
if (!mqttAddr.startsWith('tcp://')) {
|
mqttAddr = 'tcp://' + mqttAddr
|
}
|
let pointer = mqttObj.init(mqttAddr, clientId, username, password, prefix, qos, willTopic, willMessage);
|
if (pointer === undefined || pointer === null) {
|
throw new Error("dxMqtt.init: mqtt init failed")
|
}
|
|
dxCommon.handleId("mqtt", id, pointer)
|
}
|
|
/**
|
* 重新连接,比如连接成功后突然网络断开,无需重新init,直接重连即可
|
* @param {string} willTopic 非必填,遗嘱主题,通过broker通信的时候设备断开会自动触发一个mqtt遗嘱消息,这个是遗嘱消息的主题
|
* @param {string} willMessage 非必填,遗嘱内容,通过broker通信的时候设备断开会自动触发一个mqtt遗嘱消息,这个是遗嘱消息的内容
|
* @param {string} id 句柄id,非必填(需保持和init中的id一致)
|
*/
|
mqtt.reconnect = function (willTopic, willMessage, id) {
|
let pointer = dxCommon.handleId("mqtt", id)
|
return mqttObj.recreate(pointer, willTopic, willMessage);
|
}
|
|
/**
|
* 订阅多主题
|
* @param {array} topics 必填, 要订阅的主题数组,可以同时订阅多个
|
* @param {number} qos 非必填,缺省是1. 其中0表示消息最多发送一次,发送后消息就被丢弃;1表示消息至少发送一次,可以保证消息被接收方收到,但是会存在接收方收到重复消息的情况;2表示消息发送成功且只发送一次,资源开销大
|
* @param {string} id 句柄id,非必填(需保持和init中的id一致)
|
* @returns
|
*/
|
mqtt.subscribes = function (topics, qos, id) {
|
if (topics === undefined || topics.length === 0) {
|
throw new Error("dxMqtt.subscribes: 'topics' parameter should not be null or empty")
|
}
|
|
if (qos === undefined) {
|
qos = 1
|
}
|
let pointer = dxCommon.handleId("mqtt", id)
|
return mqttObj.subscribes(pointer, topics, qos);
|
}
|
|
/**
|
* 判断mqtt是否连接,连接成功后如果网络断开,连接也会断开
|
* @param {string} id 句柄id,非必填(需保持和init中的id一致)
|
* @returns false失败; true成功
|
*/
|
mqtt.isConnected = function (id) {
|
let pointer = dxCommon.handleId("mqtt", id)
|
return mqttObj.isConnected(pointer);
|
}
|
|
/**
|
* 查询mqtt配置
|
* @param {string} id 句柄id,非必填(需保持和init中的id一致)
|
* @returns mqtt配置
|
*/
|
mqtt.getConfig = function (id) {
|
let pointer = dxCommon.handleId("mqtt", id)
|
return mqttObj.getConfig(pointer);
|
}
|
|
/**
|
* mqtt配置更新
|
* @param {object} options 配置参数,大部分可以用默认值
|
* @param {string} options.mqttAddr mqtt服务地址,必填,以tcp://开头,格式是tcp://ip:port
|
* @param {string} options.clientId 客户端id,必填,不同的设备请使用不同的客户端id
|
* @param {string} options.userName 非必填,mqtt用户名
|
* @param {string} options.password 非必填,mqtt密码
|
* @param {string} options.prefix 非必填,缺省为空字符串,这个表示自动在主题前加上一个前缀
|
* @param {number} options.qos 0,1,2 非必填,缺省是1. 其中0表示消息最多发送一次,发送后消息就被丢弃;1表示消息至少发送一次,可以保证消息被接收方收到,但是会存在接收方收到重复消息的情况;2表示消息发送成功且只发送一次,资源开销大
|
* @param {string} options.ssl 非必填,ssl配置类
|
*/
|
mqtt.updateConfig = function (options, id) {
|
if (!options) {
|
throw new Error("dxMqtt.updateConfig: 'options' parameter should not be null or empty")
|
}
|
if (options.mqttAddr === undefined || options.mqttAddr.length === 0) {
|
throw new Error("dxMqtt.updateConfig: 'options.mqttAddr' parameter should not be null or empty")
|
}
|
if (options.clientId === undefined || options.clientId.length === 0) {
|
throw new Error("dxMqtt.updateConfig: 'options.clientId' parameter should not be null or empty")
|
}
|
if (options.qos === undefined || options.qos == null) {
|
throw new Error("dxMqtt.updateConfig: 'options.qos' parameter should not be null or empty")
|
}
|
// 如果mqttAddr不以tcp://开头,自动添加前缀
|
if (!options.mqttAddr.startsWith('tcp://')) {
|
options.mqttAddr = 'tcp://' + options.mqttAddr
|
}
|
let pointer = dxCommon.handleId("mqtt", id)
|
let res = mqttObj.setConfig(pointer, options);
|
return res;
|
}
|
|
/**
|
* 发送mqtt请求
|
* @param {string} topic 主题,必填
|
* @param {string} payload 消息体内容,必填
|
* @param {string} id 句柄id,非必填(需保持和init中的id一致)
|
*/
|
mqtt.send = function (topic, payload, id) {
|
if (topic === undefined || topic.length === 0) {
|
throw new Error("dxMqtt.send:'topic' parameter should not be null or empty")
|
}
|
if (payload === undefined || payload.length === 0) {
|
throw new Error("dxMqtt.send:'payload' parameter should not be null or empty")
|
}
|
let pointer = dxCommon.handleId("mqtt", id)
|
return mqttObj.sendMsg(pointer, topic, payload);
|
}
|
|
/**
|
* 接收mqtt数据,需要轮询去获取
|
* @param {string} id 句柄id,非必填(需保持和init中的id一致)
|
* @return mqtt请求数据,结构是: {topic:'主题',payload:'内容'}
|
*/
|
mqtt.receive = function (id) {
|
let msg = mqttObj.msgReceive(id);
|
return JSON.parse(msg);
|
}
|
|
/**
|
* 判断是否有新的数据,一般先判断有数据后再调用receive去获取数据
|
* @param {string} id 句柄id,非必填(需保持和init中的id一致)
|
* @returns false 有数据;true 没有数据
|
*/
|
mqtt.msgIsEmpty = function (id) {
|
return mqttObj.msgIsEmpty(id);
|
}
|
|
/**
|
* 销毁mqtt实例
|
* @param {string} id 句柄id,非必填(需保持和init中的id一致)
|
*/
|
mqtt.destroy = function (id) {
|
let pointer = dxCommon.handleId("mqtt", id)
|
mqttObj.deinit(pointer);
|
}
|
|
mqtt.RECEIVE_MSG = '__mqtt__MsgReceive'
|
mqtt.CONNECTED_CHANGED = '__mqtt__Connect_changed'
|
mqtt.RECONNECT = '__mqtt__Reconnect'
|
|
/**
|
* 用简单的方式实现mqtt客户端,只需要调用这一个函数就可以实现mqtt客户端,
|
* 收到消息会触发给 dxEventBus发送一个事件,事件的主题是mqtt.RECEIVE_MQTT_MSG,内容是{topic:'',payload:''}格式
|
* 如果需要发送消息,直接使用 mqtt.send方法 mqtt发送的数据格式类似: { topic: "sendtopic1", payload: JSON.stringify({ a: i, b: "ssss" }) }
|
* mqtt的连接状态发生变化会触发给 dxEventBus发送一个事件,事件的主题是mqtt.CONNECTED_CHANGED,内容是'connected'或者'disconnect'
|
* mqtt需要有网络,所以必须在使用之前确保dxNet组件完成初始化
|
* @param {object} options mqtt相关参数,必填
|
* @param {string} options.mqttAddr mqtt服务地址,必填,以tcp://开头,格式是tcp://ip:port
|
* @param {string} options.clientId 客户端id,必填,不同的设备请使用不同的客户端id
|
* @param {string} options.username 非必填,mqtt用户名
|
* @param {string} options.password 非必填,mqtt密码
|
* @param {string} options.prefix 非必填,缺省为空字符串,这个表示自动在主题前加上一个前缀
|
* @param {number} options.qos 0,1,2 非必填,缺省是1. 其中0表示消息最多发送一次,发送后消息就被丢弃;1表示消息至少发送一次,可以保证消息被接收方收到,但是会存在接收方收到重复消息的情况;2表示消息发送成功且只发送一次,资源开销大
|
* @param {string} options.willTopic 非必填,遗嘱主题,通过broker通信的时候设备断开会自动触发一个mqtt遗嘱消息,这个是遗嘱消息的主题
|
* @param {string} options.willMessage 非必填,遗嘱内容,通过broker通信的时候设备断开会自动触发一个mqtt遗嘱消息,这个是遗嘱消息的内容
|
* @param {array} options.subs 非必填,要订阅的主题组
|
* @param {string} options.id 句柄id,非必填(若初始化多个实例需要传入唯一id)
|
*/
|
mqtt.run = function (options) {
|
if (options === undefined || options.length === 0) {
|
throw new Error("dxmqtt.run:'options' parameter should not be null or empty")
|
}
|
if (options.id === undefined || options.id === null || typeof options.id !== 'string') {
|
// 句柄id
|
options.id = ""
|
}
|
let oldfilepre = '/app/code/dxmodules/mqttWorker'
|
let content = std.loadFile(oldfilepre + '.js').replace("{{id}}", options.id)
|
let newfile = oldfilepre + options.id + '.js'
|
std.saveFile(newfile, content)
|
let init = map.get("__mqtt__run_init" + options.id)
|
if (!init) {//确保只初始化一次
|
map.put("__mqtt__run_init" + options.id, options)
|
bus.newWorker(options.id || "__mqtt", newfile)
|
}
|
}
|
/**
|
* 获取当前mqtt连接的状态
|
* @param {string} id 句柄id,非必填(需保持和init中的id一致)
|
* @returns 'connected' 或者 'disconnected'
|
*/
|
mqtt.getConnected = function (id) {
|
if (id == undefined || id == null) {
|
id = ""
|
}
|
return mqtt.isConnected(id) ? "connected" : "disconnected"
|
}
|
|
export default mqtt;
|