/** * MQTT Client Module based on Eclipse Paho MQTT C library. * This module provides a singleton interface to an MQTT client. * * Features: * - Connect/disconnect to an MQTT broker. * - Publish messages and subscribe to topics. * - Supports MQTTS (MQTT over SSL/TLS). * - Handles connection loss and message delivery callbacks. * * Usage: * - Initialize once with `init()`. * - Use the methods to interact with the broker. * * Doc/Demo: https://github.com/DejaOS/DejaOS */ import { mqttclientClass } from './libvbar-m-dxmqttclient.so'; import log from './dxLogger.js' let client = null; const dxmqttclient = {}; function checkClientInitialized() { if (!client) { throw new Error('MQTT client is not initialized. Call init(serverURI, clientId) first.'); } } /** * Validates QoS level * @param {number} qos QoS level to validate * @param {string} context Context for error message */ function validateQoS(qos, context = 'QoS') { if (typeof qos !== 'number' || !Number.isInteger(qos) || qos < 0 || qos > 2) { throw new Error(`${context} must be an integer between 0 and 2, got: ${qos}`); } } /** * Validates MQTT topic format * @param {string} topic Topic to validate * @param {string} context Context for error message */ function validateTopic(topic, context = 'Topic') { if (typeof topic !== 'string' || topic.length === 0) { throw new Error(`${context} must be a non-empty string`); } if (topic.length > 65535) { throw new Error(`${context} must be less than 65535 characters`); } // Check for invalid characters (null character) if (topic.includes('\0')) { throw new Error(`${context} cannot contain null characters`); } } /** * Validates server URI format * @param {string} serverURI Server URI to validate */ function validateServerURI(serverURI) { if (typeof serverURI !== 'string' || serverURI.length === 0) { throw new Error('Server URI must be a non-empty string'); } const validProtocols = ['tcp://', 'ssl://', 'mqtt://', 'mqtts://']; if (!validProtocols.some(protocol => serverURI.startsWith(protocol))) { throw new Error(`Server URI must start with one of: ${validProtocols.join(', ')}`); } } /** * Validates client ID * @param {string} clientId Client ID to validate */ function validateClientId(clientId) { if (typeof clientId !== 'string') { throw new Error('Client ID must be a string'); } if (clientId.length > 23) { // MQTT 3.1.1 spec limits to 23 chars, but many modern brokers accept longer IDs log.info('Client ID is longer than 23 characters, which is not recommended by the MQTT spec.'); } } /** * Validates connection options * @param {object} options Connection options to validate */ function validateConnectOptions(options) { if (options && typeof options !== 'object') { throw new Error('Connection options must be an object'); } if (options) { // Validate keepAlive if (options.keepAlive !== undefined) { if (typeof options.keepAlive !== 'number' || options.keepAlive < 0 || options.keepAlive > 65535) { throw new Error('keepAlive must be a number between 0 and 65535'); } } // Validate cleanSession if (options.cleanSession !== undefined && typeof options.cleanSession !== 'boolean') { throw new Error('cleanSession must be a boolean'); } // Validate username and password if (options.username !== undefined && typeof options.username !== 'string') { throw new Error('username must be a string'); } if (options.password !== undefined && typeof options.password !== 'string') { throw new Error('password must be a string'); } // Validate will options if (options.will !== undefined) { if (typeof options.will !== 'object') { throw new Error('will options must be an object'); } if (!options.will.topic || typeof options.will.topic !== 'string') { throw new Error('will.topic must be a non-empty string'); } validateTopic(options.will.topic, 'will.topic'); if (options.will.payload !== undefined && typeof options.will.payload !== 'string') { throw new Error('will.payload must be a string'); } if (options.will.qos !== undefined) { validateQoS(options.will.qos, 'will.qos'); } if (options.will.retained !== undefined && typeof options.will.retained !== 'boolean') { throw new Error('will.retained must be a boolean'); } } // Validate SSL options if (options.ssl !== undefined) { if (typeof options.ssl !== 'object') { throw new Error('SSL options must be an object'); } const sslStringFields = ['caFile', 'certFile', 'keyFile', 'keyPassword']; sslStringFields.forEach(field => { if (options.ssl[field] !== undefined && typeof options.ssl[field] !== 'string') { throw new Error(`ssl.${field} must be a string`); } }); if (options.ssl.enableServerCertAuth !== undefined && typeof options.ssl.enableServerCertAuth !== 'boolean') { throw new Error('ssl.enableServerCertAuth must be a boolean'); } } } } /** * Validates publish options * @param {object} options Publish options to validate */ function validatePublishOptions(options) { if (options && typeof options !== 'object') { throw new Error('Publish options must be an object'); } if (options) { if (options.qos !== undefined) { validateQoS(options.qos, 'publish qos'); } if (options.retained !== undefined && typeof options.retained !== 'boolean') { throw new Error('retained must be a boolean'); } } } /** * Validates subscribe options * @param {object} options Subscribe options to validate */ function validateSubscribeOptions(options) { if (options && typeof options !== 'object') { throw new Error('Subscribe options must be an object'); } if (options && options.qos !== undefined) { validateQoS(options.qos, 'subscribe qos'); } } /** * Initializes the MQTT client. This must be called once before any other operations. * @param {string} serverURI The URI of the MQTT broker. Examples: "tcp://localhost:1883", "ssl://test.mosquitto.org:8883". * @param {string} clientId A unique identifier for this client. * @example * dxmqttclient.init('tcp://test.mosquitto.org:1883', 'my-device-123'); */ dxmqttclient.init = function (serverURI, clientId) { if (client) { log.info('MQTT client already initialized.'); return; } validateServerURI(serverURI); validateClientId(clientId); client = new mqttclientClass(serverURI, clientId); }; /** * Connects the client to the MQTT broker. * @param {object} [options] Connection options. * @param {string} [options.username] The username for authentication. * @param {string} [options.password] The password for authentication. * @param {number} [options.keepAlive=60] The keep-alive interval in seconds. * @param {boolean} [options.cleanSession=true] Whether to establish a clean session. * @param {object} [options.will] The "will" message (last will and testament) to be sent if the client disconnects unexpectedly. * @param {string} options.will.topic The topic for the will message. * @param {string} options.will.payload The payload of the will message. * @param {number} [options.will.qos=0] The Quality of Service level for the will message. * @param {boolean} [options.will.retained=false] Whether the will message should be retained. * @param {object} [options.ssl] SSL/TLS options, required for 'ssl://' or 'mqtts://' URIs. * @param {string} [options.ssl.caFile] Path to the CA certificate file for server verification. * @param {string} [options.ssl.certFile] Path to the client's certificate file. * @param {string} [options.ssl.keyFile] Path to the client's private key file. * @param {string} [options.ssl.keyPassword] Password for the client's private key. * @returns {void} * @example * dxmqttclient.connect({ * username: 'user', * password: 'password', * will: { * topic: 'client/status', * payload: 'offline', * qos: 1, * retained: true * } * }); */ dxmqttclient.connect = function (options) { checkClientInitialized(); validateConnectOptions(options); client.connect(options || {}); }; /** * Disconnects the client from the MQTT broker. * @param {number} [timeout=1000] Timeout in milliseconds to wait for disconnection to complete. * @returns {void} */ dxmqttclient.disconnect = function (timeout) { checkClientInitialized(); if (timeout !== undefined) { if (typeof timeout !== 'number' || timeout < 0) { throw new Error('Disconnect timeout must be a non-negative number'); } } client.disconnect(timeout || 1000); }; /** * Publishes a message to a topic. * @param {string} topic The topic to publish the message to. * @param {string|ArrayBuffer} payload The message payload. * @param {object} [options] Publishing options. * @param {number} [options.qos=0] The Quality of Service (QoS) level (0, 1, or 2). * @param {boolean} [options.retained=false] Whether the message should be retained by the broker. * @returns {number} The delivery token for tracking message delivery (for QoS > 0). * @example * dxmqttclient.publish('device/status', 'online', { qos: 1 }); */ dxmqttclient.publish = function (topic, payload, options) { checkClientInitialized(); validateTopic(topic, 'Publish topic'); if (payload === undefined || payload === null) { throw new Error('Payload cannot be undefined or null'); } if (typeof payload !== 'string' && !(payload instanceof ArrayBuffer)) { throw new Error('Payload must be a string or ArrayBuffer'); } validatePublishOptions(options); return client.publish(topic, payload, options || {}); }; /** * Subscribes to a topic. * @param {string} topic The topic filter to subscribe to. * @param {object} [options] Subscription options. * @param {number} [options.qos=0] The maximum QoS level at which to receive messages. * @returns {void} * @example * dxmqttclient.subscribe('commands/light', { qos: 1 }); */ dxmqttclient.subscribe = function (topic, options) { checkClientInitialized(); validateTopic(topic, 'Subscribe topic'); validateSubscribeOptions(options); const qos = (options && options.qos) || 0; client.subscribe(topic, qos); }; /** * Unsubscribes from a topic. * @param {string} topic The topic filter to unsubscribe from. * @returns {void} */ dxmqttclient.unsubscribe = function (topic) { checkClientInitialized(); validateTopic(topic, 'Unsubscribe topic'); client.unsubscribe(topic); }; /** * Sets the callback handlers for MQTT events. * @param {object} callbacks An object containing the callback functions. * @param {function()} [callbacks.onConnectSuccess] Fired when the client successfully connects to the broker. * @param {function(string, string, number, boolean)} [callbacks.onMessage] Fired when a message is received. * @param {function(number)} [callbacks.onDelivery] Fired when a published message has been delivered (for QoS > 0). * @param {function(string)} [callbacks.onConnectionLost] Fired when the connection to the broker is lost. */ dxmqttclient.setCallbacks = function (callbacks) { checkClientInitialized(); if (!callbacks || typeof callbacks !== 'object') { throw new Error('Callbacks must be an object'); } const callbackNames = ['onConnectSuccess', 'onMessage', 'onDelivery', 'onConnectionLost']; callbackNames.forEach(name => { if (callbacks[name] !== undefined && typeof callbacks[name] !== 'function') { throw new Error(`${name} must be a function`); } }); client.setCallbacks(callbacks); }; /** * Processes events from the MQTT event queue. * This should be called periodically to handle message arrivals, * delivery confirmations, and connection loss events. * It's recommended to use this with `setInterval`. * @example * // In your application thread loop: * setInterval(() => { * dxmqttclient.loop(); * },50); // Process events every 50ms */ dxmqttclient.loop = function () { try { checkClientInitialized(); client.loop(); } catch (e) { log.error('Error in MQTT loop:', e); } }; /** * Checks if the client is currently connected to the broker. * @returns {boolean} `true` if connected, `false` otherwise. */ dxmqttclient.isConnected = function () { checkClientInitialized(); return client.isConnected(); }; /** * Deinitializes the client instance, allowing for re-initialization. */ dxmqttclient.deinit = function () { if (client) { // The C++ finalizer handles disconnection and resource cleanup. // Setting client to null allows the garbage collector to reclaim it. client = null; } }; /** * Get the native client object. * @returns {Object|null} The native client object, or null if not initialized. */ dxmqttclient.getNative = function () { return client; }; export default dxmqttclient;