/**
 * dxEventBus - Multi-threaded Event Bus for DejaOS.
 *
 * This module provides a multi-threaded event bus for communication between the main thread
 * and workers in the QuickJS environment. It uses the main thread as a message relay,
 * enabling full-duplex event notifications across all threads.
 *
 * Features:
 * - Communication between workers via the main thread.
 * - Main-to-worker, worker-to-main, and main-to-main communication.
 * - Dynamic creation and termination of event-aware workers.
 *
 * Usage:
 * // In main thread
 * import dxEventBus from './dxmodules/dxEventBus.js';
 * dxEventBus.newWorker('my_worker', '/app/code/src/worker.js');
 * dxEventBus.fire('some_topic', { data: 123 });
 *
 * // In worker.js
 * import dxEventBus from './dxmodules/dxEventBus.js';
 * dxEventBus.on('some_topic', (data) => { console.log(data); });
 */
|
import std from './dxStd.js'
|
import * as os from "os";
|
|
// The event-bus object that this module exports; its methods are attached below.
const bus = {};
// Registry of workers created through bus.newWorker, keyed by worker id.
const all = {};
// Subscription table: topic -> array of subscriber ids ('__main' or a worker id).
const subs = {};
// A thread is treated as the main thread when os.Worker.parent is undefined.
const isMain = typeof os.Worker.parent === 'undefined';
// The main thread has a fixed id; in a worker the id stays null until it is assigned
// by the bootstrap code that bus.newWorker appends to the worker script.
bus.id = isMain ? '__main' : null;
|
/**
 * Creates and registers a new worker on the event bus. Must be called from the main thread.
 *
 * The script at `file` is loaded, a bootstrap epilogue is appended to it, and the result is
 * saved as `<file>_<id>.js`; the worker is started from that generated file. The epilogue
 * assigns the worker's bus id, reports every topic found in `handlers` back to the main
 * thread, and installs the handler that dispatches events relayed by the main thread.
 *
 * @param {string} id - A unique identifier for the worker. Cannot be empty, duplicated,
 *                      or '__main' (reserved for the main thread).
 * @param {string} file - The absolute path to the worker's script file.
 * @throws {Error} If id is invalid, the file doesn't exist, or not called from the main thread.
 */
bus.newWorker = function (id, file) {
    if (!id) {
        throw new Error("eventbus newWorker:'id' should not be empty");
    }
    if (!file) {
        throw new Error("eventbus newWorker:'file' should not be empty");
    }
    if (!isMain) {
        throw new Error("eventbus newWorker should be invoked in main thread");
    }
    if (!std.exist(file)) {
        throw new Error("eventbus newWorker: file not found:" + file);
    }
    // '__main' is the subscriber id of the main thread itself; a worker with that id would
    // be indistinguishable from the main thread when events are dispatched.
    if (String(id) === '__main') {
        throw new Error("eventbus newWorker: '__main' is reserved for the main thread");
    }
    // Own-property check so ids such as 'toString' are not mistaken for existing workers.
    if (Object.prototype.hasOwnProperty.call(all, id)) {
        throw new Error("eventbus newWorker: worker already exists:" + id);
    }

    // The id is embedded as a JSON string literal so quotes, backslashes or backticks in it
    // cannot break (or inject code into) the generated worker source. The error log reads
    // __bus.id at runtime for the same reason.
    const idLiteral = JSON.stringify(String(id));
    const content = std.loadFile(file) + '\n' + `
import __bus from '/app/code/dxmodules/dxEventBus.js'
import __std from '/app/code/dxmodules/dxStd.js'
__bus.id=${idLiteral}
Object.keys(__bus.handlers).forEach(key => {
    __bus.os.Worker.parent.postMessage({ __sub: key, id: __bus.id })
})
__bus.os.Worker.parent.onmessage = function (e) {
    if(!e.data){
        return
    }
    e = e.data
    if (!e || !e.topic) {
        return
    }
    let fun = __bus.handlers[e.topic]
    if (fun) {
        try {
            fun(e.data)
        } catch (err) {
            __std.err.puts(\`[dxEventBus] Error in worker '\${__bus.id}' event handler for topic \${e.topic}: \${err.message || err}\\n\`);
            __std.err.flush();
        }
    }
}
`;
    const newfile = file + '_' + id + '.js';
    std.saveFile(newfile, content);

    const worker = new os.Worker(newfile);
    all[id] = worker;
    worker.onmessage = function (event) {
        const message = event.data;
        if (!message) {
            return;
        }
        if (message.__sub) {
            // Subscription request coming from the worker.
            sub(message.__sub, message.id);
            return;
        }
        // Data sent by a worker is re-fired on the main thread: it is either consumed by
        // the main thread itself or forwarded to other workers.
        bus.fire(message.topic, message.data);
    };
};
|
/**
 * Removes a worker from the event bus and cleans up its bookkeeping.
 *
 * The worker's relay handler is detached, the worker is dropped from the registry, and its
 * id is removed from every topic's subscriber list (topics left without subscribers are
 * deleted). Unknown or empty ids are ignored.
 *
 * NOTE(review): no explicit terminate call is made here; whether the underlying thread
 * actually exits is up to the runtime once the handler is detached — confirm for the target.
 *
 * @param {string} id - The unique identifier of the worker to remove.
 */
bus.delWorker = function (id) {
    if (!id || !Object.prototype.hasOwnProperty.call(all, id)) {
        return;
    }

    // Detach the main-side handler first; otherwise messages still arriving from the
    // removed worker would keep re-registering its subscriptions and being relayed.
    all[id].onmessage = null;
    delete all[id];

    // Object.keys yields own keys only and, unlike subs.hasOwnProperty(...), keeps working
    // when a subscribed topic happens to be named 'hasOwnProperty'.
    for (const topic of Object.keys(subs)) {
        const remaining = subs[topic].filter((subscriberId) => subscriberId !== id);
        if (remaining.length === 0) {
            delete subs[topic];
        } else {
            subs[topic] = remaining;
        }
    }
};
|
/**
 * Fires an event to notify all subscribers for a given topic.
 *
 * This is a fire-and-forget operation. On the main thread, the main thread's own handler is
 * called synchronously and worker subscribers receive the event via `postMessage`. In a
 * worker, the event is posted to the main thread, which relays it. Errors raised by a
 * handler or by `postMessage` are written to stderr and do not stop delivery to the
 * remaining subscribers.
 *
 * Delivery uses a snapshot of the subscriber list taken when the call starts, so a handler
 * that subscribes or removes workers during dispatch cannot cause subscribers to be skipped.
 *
 * @param {string} topic - The event topic to fire.
 * @param {*} data - The data to pass to the event subscribers.
 * @throws {Error} If topic is empty or not a string.
 */
bus.fire = function (topic, data) {
    if (!topic || typeof topic !== 'string') {
        throw new Error("eventbus :'topic' should not be null");
    }

    if (!isMain) {
        // Workers never dispatch locally; the main thread acts as the relay.
        try {
            os.Worker.parent.postMessage({ topic: topic, data: data });
        } catch (e) {
            std.err.puts(`[dxEventBus] Error in worker '${bus.id}' posting message for topic '${topic}': ${e.message || e}\n`);
            std.err.flush();
        }
        return;
    }

    const subscribers = subs[topic];
    // Array.isArray also rejects inherited Object.prototype members (e.g. topic 'constructor').
    if (!Array.isArray(subscribers) || subscribers.length === 0) {
        return;
    }

    for (const id of subscribers.slice()) {
        if (id === '__main' && bus.handlers[topic]) {
            try {
                bus.handlers[topic](data);
            } catch (e) {
                std.err.puts(`[dxEventBus] Error in main thread event handler for topic '${topic}': ${e.message || e}\n`);
                std.err.flush();
            }
            continue;
        }

        // Workers removed since the snapshot was taken are no longer in the registry.
        const worker = all[id];
        if (!worker) {
            continue;
        }
        try {
            worker.postMessage({ topic: topic, data: data });
        } catch (e) {
            std.err.puts(`[dxEventBus] Error posting message to worker '${id}' for topic '${topic}': ${e.message || e}\n`);
            std.err.flush();
        }
    }
};
|
|
|
// Handlers registered by the current thread, keyed by topic (one handler per topic).
bus.handlers = {};

/**
 * Subscribes to an event topic.
 *
 * Only one callback is kept per topic in each thread: subscribing again to the same topic
 * replaces the previous callback.
 *
 * @param {string} topic - The event topic to subscribe to.
 * @param {function} callback - The function to execute when the event is fired. It receives event data as its only argument.
 * @throws {Error} If topic is not a non-empty string or callback is not a function.
 */
bus.on = function (topic, callback) {
    if (!topic || typeof topic !== 'string') {
        throw new Error("The 'topic' should not be null");
    }
    if (typeof callback !== 'function') {
        throw new Error("The 'callback' should be a function");
    }
    sub(topic, bus.id);
    // Use `bus` rather than `this` so the method keeps working when it is called detached,
    // e.g. `const { on } = dxEventBus; on(topic, cb)`.
    bus.handlers[topic] = callback;
};
|
/**
 * Reports the event-bus id of the calling thread.
 *
 * @returns {string|null} '__main' on the main thread, otherwise the id assigned to the worker.
 * @note Inside a worker the id is assigned by bootstrap code that runs after the worker
 *       script's own top-level code, so a top-level call may still see `null`. Within
 *       event handlers the value is reliable.
 */
bus.getId = () => bus.id;
|
/**
 * Registers `id` as a subscriber of `topic`.
 *
 * On the main thread the subscription table is updated directly (duplicates are ignored).
 * In a worker the request is forwarded to the main thread via `postMessage`; nothing is
 * sent while the worker's id is still unknown (null/undefined).
 *
 * @param {string} topic - The topic being subscribed to.
 * @param {string|null} id - Subscriber id: '__main' or a worker id.
 */
function sub(topic, id) {
    if (!isMain) {
        if (id != null) {
            os.Worker.parent.postMessage({ __sub: topic, id: id });
        }
        return;
    }

    // Own-property check: a plain `subs[topic]` lookup would pick up inherited members for
    // topics such as 'toString' or 'constructor' and then fail on `.includes`.
    if (!Object.prototype.hasOwnProperty.call(subs, topic)) {
        subs[topic] = [];
    }
    const subscribers = subs[topic];
    if (!subscribers.includes(id)) {
        subscribers.push(id);
    }
}
|
// Expose the `os` module on the bus: the bootstrap code that newWorker appends to worker
// scripts reaches os.Worker.parent through `__bus.os`.
bus.os = os;

export default bus;
|