lgq
3 天以前 081f12a52906abe6c2d139fdc144135978681009
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
/**
 * MQTT Client Module based on Eclipse Paho MQTT C library.
 * This module provides a singleton interface to an MQTT client.
 *
 * Features:
 * - Connect/disconnect to an MQTT broker.
 * - Publish messages and subscribe to topics.
 * - Supports MQTTS (MQTT over SSL/TLS).
 * - Handles connection loss and message delivery callbacks.
 *
 * Usage:
 * - Initialize once with `init()`.
 * - Use the methods to interact with the broker.
 *
 * Doc/Demo: https://github.com/DejaOS/DejaOS
 */
import { mqttclientClass } from './libvbar-m-dxmqttclient.so';
import log from './dxLogger.js'
 
let client = null;
const dxmqttclient = {};
 
function checkClientInitialized() {
    if (!client) {
        throw new Error('MQTT client is not initialized. Call init(serverURI, clientId) first.');
    }
}
 
/**
 * Validates QoS level
 * @param {number} qos QoS level to validate
 * @param {string} context Context for error message
 */
function validateQoS(qos, context = 'QoS') {
    if (typeof qos !== 'number' || !Number.isInteger(qos) || qos < 0 || qos > 2) {
        throw new Error(`${context} must be an integer between 0 and 2, got: ${qos}`);
    }
}
 
/**
 * Validates MQTT topic format
 * @param {string} topic Topic to validate
 * @param {string} context Context for error message
 */
function validateTopic(topic, context = 'Topic') {
    if (typeof topic !== 'string' || topic.length === 0) {
        throw new Error(`${context} must be a non-empty string`);
    }
    if (topic.length > 65535) {
        throw new Error(`${context} must be less than 65535 characters`);
    }
    // Check for invalid characters (null character)
    if (topic.includes('\0')) {
        throw new Error(`${context} cannot contain null characters`);
    }
}
 
/**
 * Validates server URI format
 * @param {string} serverURI Server URI to validate
 */
function validateServerURI(serverURI) {
    if (typeof serverURI !== 'string' || serverURI.length === 0) {
        throw new Error('Server URI must be a non-empty string');
    }
    const validProtocols = ['tcp://', 'ssl://', 'mqtt://', 'mqtts://'];
    if (!validProtocols.some(protocol => serverURI.startsWith(protocol))) {
        throw new Error(`Server URI must start with one of: ${validProtocols.join(', ')}`);
    }
}
 
/**
 * Validates client ID
 * @param {string} clientId Client ID to validate
 */
function validateClientId(clientId) {
    if (typeof clientId !== 'string') {
        throw new Error('Client ID must be a string');
    }
    if (clientId.length > 23) {
        // MQTT 3.1.1 spec limits to 23 chars, but many modern brokers accept longer IDs
        log.info('Client ID is longer than 23 characters, which is not recommended by the MQTT spec.');
    }
}
 
/**
 * Validates connection options
 * @param {object} options Connection options to validate
 */
function validateConnectOptions(options) {
    if (options && typeof options !== 'object') {
        throw new Error('Connection options must be an object');
    }
 
    if (options) {
        // Validate keepAlive
        if (options.keepAlive !== undefined) {
            if (typeof options.keepAlive !== 'number' || options.keepAlive < 0 || options.keepAlive > 65535) {
                throw new Error('keepAlive must be a number between 0 and 65535');
            }
        }
 
        // Validate cleanSession
        if (options.cleanSession !== undefined && typeof options.cleanSession !== 'boolean') {
            throw new Error('cleanSession must be a boolean');
        }
 
        // Validate username and password
        if (options.username !== undefined && typeof options.username !== 'string') {
            throw new Error('username must be a string');
        }
        if (options.password !== undefined && typeof options.password !== 'string') {
            throw new Error('password must be a string');
        }
 
        // Validate will options
        if (options.will !== undefined) {
            if (typeof options.will !== 'object') {
                throw new Error('will options must be an object');
            }
            if (!options.will.topic || typeof options.will.topic !== 'string') {
                throw new Error('will.topic must be a non-empty string');
            }
            validateTopic(options.will.topic, 'will.topic');
 
            if (options.will.payload !== undefined && typeof options.will.payload !== 'string') {
                throw new Error('will.payload must be a string');
            }
            if (options.will.qos !== undefined) {
                validateQoS(options.will.qos, 'will.qos');
            }
            if (options.will.retained !== undefined && typeof options.will.retained !== 'boolean') {
                throw new Error('will.retained must be a boolean');
            }
        }
 
        // Validate SSL options
        if (options.ssl !== undefined) {
            if (typeof options.ssl !== 'object') {
                throw new Error('SSL options must be an object');
            }
            const sslStringFields = ['caFile', 'certFile', 'keyFile', 'keyPassword'];
            sslStringFields.forEach(field => {
                if (options.ssl[field] !== undefined && typeof options.ssl[field] !== 'string') {
                    throw new Error(`ssl.${field} must be a string`);
                }
            });
            if (options.ssl.enableServerCertAuth !== undefined && typeof options.ssl.enableServerCertAuth !== 'boolean') {
                throw new Error('ssl.enableServerCertAuth must be a boolean');
            }
        }
    }
}
 
/**
 * Validates publish options
 * @param {object} options Publish options to validate
 */
function validatePublishOptions(options) {
    if (options && typeof options !== 'object') {
        throw new Error('Publish options must be an object');
    }
 
    if (options) {
        if (options.qos !== undefined) {
            validateQoS(options.qos, 'publish qos');
        }
        if (options.retained !== undefined && typeof options.retained !== 'boolean') {
            throw new Error('retained must be a boolean');
        }
    }
}
 
/**
 * Validates subscribe options
 * @param {object} options Subscribe options to validate
 */
function validateSubscribeOptions(options) {
    if (options && typeof options !== 'object') {
        throw new Error('Subscribe options must be an object');
    }
 
    if (options && options.qos !== undefined) {
        validateQoS(options.qos, 'subscribe qos');
    }
}
 
/**
 * Initializes the MQTT client. This must be called once before any other operations.
 * @param {string} serverURI The URI of the MQTT broker. Examples: "tcp://localhost:1883", "ssl://test.mosquitto.org:8883".
 * @param {string} clientId A unique identifier for this client.
 * @example
 * dxmqttclient.init('tcp://test.mosquitto.org:1883', 'my-device-123');
 */
dxmqttclient.init = function (serverURI, clientId) {
    if (client) {
        log.info('MQTT client already initialized.');
        return;
    }
 
    validateServerURI(serverURI);
    validateClientId(clientId);
 
    client = new mqttclientClass(serverURI, clientId);
};
 
/**
 * Connects the client to the MQTT broker.
 * @param {object} [options] Connection options.
 * @param {string} [options.username] The username for authentication.
 * @param {string} [options.password] The password for authentication.
 * @param {number} [options.keepAlive=60] The keep-alive interval in seconds.
 * @param {boolean} [options.cleanSession=true] Whether to establish a clean session.
 * @param {object} [options.will] The "will" message (last will and testament) to be sent if the client disconnects unexpectedly.
 * @param {string} options.will.topic The topic for the will message.
 * @param {string} options.will.payload The payload of the will message.
 * @param {number} [options.will.qos=0] The Quality of Service level for the will message.
 * @param {boolean} [options.will.retained=false] Whether the will message should be retained.
 * @param {object} [options.ssl] SSL/TLS options, required for 'ssl://' or 'mqtts://' URIs.
 * @param {string} [options.ssl.caFile] Path to the CA certificate file for server verification.
 * @param {string} [options.ssl.certFile] Path to the client's certificate file.
 * @param {string} [options.ssl.keyFile] Path to the client's private key file.
 * @param {string} [options.ssl.keyPassword] Password for the client's private key.
 * @returns {void}
 * @example
 * dxmqttclient.connect({
 *   username: 'user',
 *   password: 'password',
 *   will: {
 *     topic: 'client/status',
 *     payload: 'offline',
 *     qos: 1,
 *     retained: true
 *   }
 * });
 */
dxmqttclient.connect = function (options) {
    checkClientInitialized();
    validateConnectOptions(options);
    client.connect(options || {});
};
 
/**
 * Disconnects the client from the MQTT broker.
 * @param {number} [timeout=1000] Timeout in milliseconds to wait for disconnection to complete.
 * @returns {void}
 */
dxmqttclient.disconnect = function (timeout) {
    checkClientInitialized();
    if (timeout !== undefined) {
        if (typeof timeout !== 'number' || timeout < 0) {
            throw new Error('Disconnect timeout must be a non-negative number');
        }
    }
    client.disconnect(timeout || 1000);
};
 
/**
 * Publishes a message to a topic.
 * @param {string} topic The topic to publish the message to.
 * @param {string|ArrayBuffer} payload The message payload.
 * @param {object} [options] Publishing options.
 * @param {number} [options.qos=0] The Quality of Service (QoS) level (0, 1, or 2).
 * @param {boolean} [options.retained=false] Whether the message should be retained by the broker.
 * @returns {number} The delivery token for tracking message delivery (for QoS > 0).
 * @example
 * dxmqttclient.publish('device/status', 'online', { qos: 1 });
 */
dxmqttclient.publish = function (topic, payload, options) {
    checkClientInitialized();
    validateTopic(topic, 'Publish topic');
 
    if (payload === undefined || payload === null) {
        throw new Error('Payload cannot be undefined or null');
    }
    if (typeof payload !== 'string' && !(payload instanceof ArrayBuffer)) {
        throw new Error('Payload must be a string or ArrayBuffer');
    }
 
    validatePublishOptions(options);
 
    return client.publish(topic, payload, options || {});
};
 
/**
 * Subscribes to a topic.
 * @param {string} topic The topic filter to subscribe to.
 * @param {object} [options] Subscription options.
 * @param {number} [options.qos=0] The maximum QoS level at which to receive messages.
 * @returns {void}
 * @example
 * dxmqttclient.subscribe('commands/light', { qos: 1 });
 */
dxmqttclient.subscribe = function (topic, options) {
    checkClientInitialized();
    validateTopic(topic, 'Subscribe topic');
    validateSubscribeOptions(options);
 
    const qos = (options && options.qos) || 0;
    client.subscribe(topic, qos);
};
 
/**
 * Unsubscribes from a topic.
 * @param {string} topic The topic filter to unsubscribe from.
 * @returns {void}
 */
dxmqttclient.unsubscribe = function (topic) {
    checkClientInitialized();
    validateTopic(topic, 'Unsubscribe topic');
    client.unsubscribe(topic);
};
 
/**
 * Sets the callback handlers for MQTT events.
 * @param {object} callbacks An object containing the callback functions.
 * @param {function()} [callbacks.onConnectSuccess] Fired when the client successfully connects to the broker.
 * @param {function(string, string, number, boolean)} [callbacks.onMessage] Fired when a message is received.
 * @param {function(number)} [callbacks.onDelivery] Fired when a published message has been delivered (for QoS > 0).
 * @param {function(string)} [callbacks.onConnectionLost] Fired when the connection to the broker is lost.
 */
dxmqttclient.setCallbacks = function (callbacks) {
    checkClientInitialized();
 
    if (!callbacks || typeof callbacks !== 'object') {
        throw new Error('Callbacks must be an object');
    }
 
    const callbackNames = ['onConnectSuccess', 'onMessage', 'onDelivery', 'onConnectionLost'];
    callbackNames.forEach(name => {
        if (callbacks[name] !== undefined && typeof callbacks[name] !== 'function') {
            throw new Error(`${name} must be a function`);
        }
    });
 
    client.setCallbacks(callbacks);
};
 
/**
 * Processes events from the MQTT event queue.
 * This should be called periodically to handle message arrivals,
 * delivery confirmations, and connection loss events.
 * It's recommended to use this with `setInterval`.
 * @example
 * // In your application thread loop:
 * setInterval(() => {
 *   dxmqttclient.loop();
 * },50); // Process events every 50ms
 */
dxmqttclient.loop = function () {
    try {
        checkClientInitialized();
        client.loop();
    } catch (e) {
        log.error('Error in MQTT loop:', e);
    }
};
 
/**
 * Checks if the client is currently connected to the broker.
 * @returns {boolean} `true` if connected, `false` otherwise.
 */
dxmqttclient.isConnected = function () {
    checkClientInitialized();
    return client.isConnected();
};
 
/**
 * Deinitializes the client instance, allowing for re-initialization.
 */
dxmqttclient.deinit = function () {
    if (client) {
        // The C++ finalizer handles disconnection and resource cleanup.
        // Setting client to null allows the garbage collector to reclaim it.
        client = null;
    }
};
 
/**
 * Get the native client object.
 * @returns {Object|null} The native client object, or null if not initialized.
 */
dxmqttclient.getNative = function () {
    return client;
};
 
export default dxmqttclient;