/**
 * dxEventBus - Multi-threaded Event Bus for DejaOS.
 *
 * This module provides a multi-threaded event bus for communication between the main thread
 * and workers in the QuickJS environment. It uses the main thread as a message relay,
 * enabling full-duplex event notifications across all threads.
 *
 * Features:
 * - Communication between workers via the main thread.
 * - Main-to-worker, worker-to-main, and main-to-main communication.
 * - Dynamic creation and removal of event-aware workers.
 *
 * Usage:
 * // In main thread
 * import dxEventBus from './dxmodules/dxEventBus.js';
 * dxEventBus.newWorker('my_worker', '/app/code/src/worker.js');
 * dxEventBus.fire('some_topic', { data: 123 });
 *
 * // In worker.js
 * import dxEventBus from './dxmodules/dxEventBus.js';
 * dxEventBus.on('some_topic', (data) => { console.log(data); });
 */
import std from './dxStd.js';
import * as os from "os";

// Subscriber id used for handlers registered in the main thread.
const MAIN_ID = '__main';

const bus = {};
// Worker id -> os.Worker instance (main thread only). Prototype-less so that ids such as
// 'constructor' or 'toString' are not mistaken for existing entries.
const all = Object.create(null);
// Topic -> array of subscriber ids (main thread only). Prototype-less for the same reason.
const subs = Object.create(null);
// A thread without a parent worker handle is the main thread.
const isMain = (os.Worker.parent === undefined);

// In a worker the id stays null until the bootstrap code appended by newWorker assigns it.
bus.id = isMain ? MAIN_ID : null;

/**
 * Writes a diagnostic message to stderr and flushes it immediately.
 * @param {string} message - The text to write (callers include the trailing newline).
 */
function logError(message) {
    std.err.puts(message);
    std.err.flush();
}

/**
 * Creates and registers a new worker on the event bus. Must be called from the main thread.
 *
 * The worker script is copied to `<file>_<id>.js` with bootstrap code appended. The bootstrap
 * assigns the worker's bus id, reports every topic the script subscribed to at load time, and
 * installs the message handler that dispatches incoming events to the worker's handlers.
 *
 * @param {string} id - A unique identifier for the worker. Cannot be empty, duplicated, or '__main'.
 * @param {string} file - The absolute path to the worker's script file.
 * @throws {Error} If id is invalid, the file doesn't exist, or not called from the main thread.
 */
bus.newWorker = function (id, file) {
    if (!id) {
        throw new Error("eventbus newWorker:'id' should not be empty");
    }
    if (!file) {
        throw new Error("eventbus newWorker:'file' should not be empty");
    }
    if (!isMain) {
        throw new Error("eventbus newWorker should be invoked in main thread");
    }
    if (id === MAIN_ID) {
        // A worker with this id would be indistinguishable from the main-thread subscriber.
        throw new Error("eventbus newWorker: '__main' is reserved for the main thread");
    }
    if (!std.exist(file)) {
        throw new Error("eventbus newWorker: file not found:" + file);
    }
    if (all[id]) {
        throw new Error("eventbus newWorker: worker already exists:" + id);
    }

    // Emit the id as a properly escaped string literal so quotes or backslashes in it
    // cannot break (or inject code into) the generated worker script.
    const idLiteral = JSON.stringify(String(id));
    const bootstrap = `
import __bus from '/app/code/dxmodules/dxEventBus.js';
import __std from '/app/code/dxmodules/dxStd.js';
__bus.id = ${idLiteral};
Object.keys(__bus.handlers).forEach(key => {
    __bus.os.Worker.parent.postMessage({ __sub: key, id: __bus.id });
});
__bus.os.Worker.parent.onmessage = function (e) {
    if (!e.data) {
        return;
    }
    e = e.data;
    if (!e || !e.topic) {
        return;
    }
    let fun = __bus.handlers[e.topic];
    if (fun) {
        try {
            fun(e.data);
        } catch (err) {
            __std.err.puts(\`[dxEventBus] Error in worker '\${__bus.id}' event handler for topic \${e.topic}: \${err.message || err}\\n\`);
            __std.err.flush();
        }
    }
};
`;
    const content = std.loadFile(file) + '\n' + bootstrap;
    const newfile = file + '_' + id + '.js';
    std.saveFile(newfile, content);

    const worker = new os.Worker(newfile);
    all[id] = worker;
    worker.onmessage = function (event) {
        const message = event.data;
        if (!message) {
            return;
        }
        if (message.__sub) {
            // Subscription request from the worker: record it in the main-thread registry.
            sub(message.__sub, message.id);
            return;
        }
        if (!message.topic || typeof message.topic !== 'string') {
            // Not an event-bus message; ignore it instead of letting fire() throw in the handler.
            return;
        }
        // Data sent by a worker is re-fired on the main thread, which either consumes it
        // itself or forwards it to the other subscribed workers.
        bus.fire(message.topic, message.data);
    };
};

/**
 * Removes a worker from the event bus and cleans up its bus-side resources.
 *
 * The worker is dropped from the registry, its main-thread message handler is detached so
 * that late messages from it are ignored, and all of its subscriptions are removed.
 * Does nothing when the id is empty or unknown.
 * NOTE(review): no explicit thread-termination call is made here; whether the underlying
 * worker thread exits is up to the runtime - confirm.
 *
 * @param {string} id - The unique identifier of the worker to remove.
 */
bus.delWorker = function (id) {
    if (!id || !all[id]) {
        return;
    }
    const worker = all[id];
    delete all[id];
    // Stop relaying messages from the removed worker; otherwise it could still fire events
    // or re-register subscriptions after removal.
    worker.onmessage = null;

    // Subscriber ids arrive from the worker as strings, so compare against the string form.
    const workerId = String(id);
    for (const topic of Object.keys(subs)) {
        // Build a new array rather than splicing, so an in-progress fire() is not disturbed.
        const remaining = subs[topic].filter((subscriber) => subscriber !== workerId);
        if (remaining.length === 0) {
            delete subs[topic];
        } else {
            subs[topic] = remaining;
        }
    }
};

/**
 * Fires an event to notify all subscribers for a given topic.
 *
 * This is a fire-and-forget operation. Callbacks for subscribers in the main thread are
 * executed synchronously and sequentially. For subscribers in workers, the event is sent
 * asynchronously via `postMessage`. When called from a worker, the event is posted to the
 * main thread, which relays it. Errors raised by handlers or by `postMessage` are written
 * to stderr and do not propagate to the caller.
 *
 * Dispatch uses a snapshot of the subscriber list, so subscriptions added while the event
 * is being delivered take effect from the next fire.
 *
 * @param {string} topic - The event topic to fire.
 * @param {*} data - The data to pass to the event subscribers.
 * @throws {Error} If topic is empty or not a string.
 */
bus.fire = function (topic, data) {
    if (!topic || typeof topic !== 'string') {
        throw new Error("eventbus :'topic' should not be null");
    }

    if (!isMain) {
        try {
            os.Worker.parent.postMessage({ topic: topic, data: data });
        } catch (e) {
            logError(`[dxEventBus] Error in worker '${bus.id}' posting message for topic '${topic}': ${e.message || e}\n`);
        }
        return;
    }

    const subscribers = subs[topic];
    if (!subscribers || subscribers.length === 0) {
        return;
    }
    // Iterate over a copy: a handler may call on()/delWorker() and change the registry.
    for (const id of subscribers.slice()) {
        if (id === MAIN_ID && bus.handlers[topic]) {
            try {
                bus.handlers[topic](data);
            } catch (e) {
                logError(`[dxEventBus] Error in main thread event handler for topic '${topic}': ${e.message || e}\n`);
            }
            continue;
        }
        const worker = all[id];
        if (!worker) {
            // The worker was removed (possibly during this dispatch); nothing to deliver to.
            continue;
        }
        try {
            worker.postMessage({ topic: topic, data: data });
        } catch (e) {
            logError(`[dxEventBus] Error posting message to worker '${id}' for topic '${topic}': ${e.message || e}\n`);
        }
    }
};

// Topic -> callback for the current thread. Read by the worker bootstrap code as well.
bus.handlers = {};

/**
 * Subscribes to an event topic. Each thread keeps a single callback per topic; subscribing
 * again to the same topic replaces the previous callback.
 * @param {string} topic - The event topic to subscribe to.
 * @param {function} callback - The function to execute when the event is fired. It receives event data as its only argument.
 * @throws {Error} If topic is not a non-empty string or callback is not a function.
 */
bus.on = function (topic, callback) {
    if (!topic || typeof topic !== 'string') {
        throw new Error("The 'topic' should not be null");
    }
    if (!callback || typeof callback !== 'function') {
        throw new Error("The 'callback' should be a function");
    }
    sub(topic, bus.id);
    // Reference `bus` rather than `this` so a detached `on` (e.g. destructured) still works.
    bus.handlers[topic] = callback;
};

/**
 * Returns the ID of the current thread (either '__main' or the worker's assigned ID).
 * @returns {string|null} The ID of the current thread.
 * @note In a worker, if this function is called at the top level of the script before the
 * event bus has fully initialized, it may return `null`. It is reliable within event handlers.
 */
bus.getId = function () {
    return bus.id;
};

/**
 * Registers `id` as a subscriber of `topic`.
 *
 * In the main thread the subscription is recorded directly (duplicates are ignored). In a
 * worker the request is posted to the main thread; if the worker's id is not assigned yet,
 * nothing is sent here and the bootstrap code reports the handlers once the id is set.
 *
 * @param {string} topic - The topic being subscribed to.
 * @param {string|null} id - The subscriber id ('__main' or a worker id).
 */
function sub(topic, id) {
    if (!isMain) {
        if (id != null) {
            os.Worker.parent.postMessage({ __sub: topic, id: id });
        }
        return;
    }
    if (!subs[topic]) {
        subs[topic] = [];
    }
    if (!subs[topic].includes(id)) {
        subs[topic].push(id);
    }
}

// Exposed so the worker bootstrap code can reach os.Worker.parent through the bus module.
bus.os = os;

export default bus;