/** * Worker Pool Module (dxWorkerPool) * * Features: * - Manages a pool of worker threads to handle concurrent tasks, preventing performance bottlenecks. * - Automatically distributes tasks from a shared queue to available workers. * - Buffers incoming tasks in a queue when all workers are busy. * - Designed as a global singleton; should be initialized only once. * * Usage (Main Thread): * - Initialize the pool once using `pool.init()`, providing the worker script, event bus, and topics. * * Usage (Worker Thread): * - In your worker script, import `dxWorkerPool` and register your task handling function using `pool.callback()`. * * @example * // --- In main.js (Main Thread) --- * import pool from 'dxmodules/dxWorkerPool.js'; * import eventBus from 'dxmodules/dxEventBus.js'; * * // Initialize the pool with 2 workers for the 'heavy_tasks' topic. * pool.init('/app/code/src/my_worker.js', eventBus, ['heavy_tasks'], 2); * * // Later, fire an event to be processed by a worker. * eventBus.fire('heavy_tasks', { data: 'some payload' }); * * // --- In my_worker.js (Worker Thread) --- * import pool from 'dxmodules/dxWorkerPool.js'; * import log from 'dxmodules/dxLogger.js'; * * // Register the callback function to handle incoming tasks. * pool.callback(function(task) { * log.info(`Worker ${pool.getWorkerId()} is processing task on topic '${task.topic}':`, task.data); * // ... perform heavy computation ... * }); * * Doc/Demo: https://github.com/DejaOS/DejaOS */ import std from './dxStd.js' import logger from './dxLogger.js' import * as os from "os"; //-------------------------variable-------------------- const pool = {} const isMain = (os.Worker.parent === undefined) let queueSize = 100 const queue = [] const all = {} pool.os = os /** * Initializes the worker pool. This must be called once from the main thread. * @param {string} file The absolute path to the worker script file (e.g., '/app/code/src/worker.js'). * @param {Object} bus The dxEventBus instance for task distribution. * @param {string[]} topics An array of event bus topics to subscribe to. Tasks from these topics will be processed by the pool. * @param {number} [count=2] The number of worker threads in the pool. Must be a positive number. * @param {number} [maxsize=100] The maximum size of the task queue. If the queue is full, the oldest task is discarded. * @throws {Error} If called from a worker thread, initialized more than once, or if parameters are invalid. */ pool.init = function (file, bus, topics, count = 2, maxsize = 100) { if (Object.keys(all).length > 0) { throw new Error("Worker pool has already been initialized.") } if (!file) { throw new Error("pool init:'file' should not be empty") } if (!bus) { throw new Error("pool init:'bus' should not be empty") } if (!topics || topics.length === 0) { throw new Error("pool init:'topics' should not be empty") } if (!isMain) { throw new Error("pool init should be invoked in main thread") } if (!std.exist(file)) { throw new Error("pool init: file not found:" + file) } queueSize = maxsize if (count < 1) { throw new Error("pool init: 'count' must be a positive number") } for (let i = 0; i < count; i++) { const id = 'pool__id' + i let content = std.loadFile(file) + ` import __pool from '/app/code/dxmodules/dxWorkerPool.js' __pool.id = '${id}' const __parent = __pool.os.Worker.parent __parent.onmessage = function (e) { if (!e.data) { return } let fun = __pool.callbackFunc if (fun) { try { fun(e.data) __parent.postMessage({ id: __pool.id })// Notify main thread that task is done and worker is idle. } catch (err) { __parent.postMessage({ id: __pool.id, error: err.stack })// Notify main thread about the failure. } } } ` let newfile = file + '_' + id + '.js' std.saveFile(newfile, content) let worker = new os.Worker(newfile) all[id] = { isIdle: true, worker: worker } worker.onmessage = function (data) { if (!data.data) { return } const id = data.data.id if (id) {// A worker has finished its task. all[id].isIdle = true if (data.data.error) { logger.error(`worker ${id} callback error:${data.data.error}`) } // A worker has become free, try to dispatch any pending tasks. dispatchTasks() } else { const topic = data.data.topic if (topic) {// A message to be fired on the event bus. bus.fire(topic, data.data.data) } } } } for (let topic of topics) { bus.on(topic, function (d) { push({ topic: topic, data: d }) }) } } function dispatchTasks() { Object.keys(all).forEach(key => { const workerInfo = all[key]; // Find an idle worker and a pending task. if (workerInfo.isIdle) { const task = take(); if (task) { workerInfo.isIdle = false; workerInfo.worker.postMessage(task); } } }); } /** * Returns the unique ID of the current worker thread, or 'main' if called from the main thread. * This is useful for logging and debugging within worker scripts. * @returns {string} The worker's unique ID (e.g., 'pool__id0') or 'main'. */ pool.getWorkerId = function () { if (isMain) { return 'main' } else { return pool.id } } /** * @deprecated This function is obsolete. Topic subscription is now handled directly within `pool.init()`. * Subscribes the pool to additional event bus topics. Must be called from the main thread. * @param {Object} bus The dxEventBus instance. * @param {string[]} topics An array of topics to subscribe to. */ pool.on = function (bus, topics) { if (!bus) { throw new Error("pool onEventBus:'bus' should not be empty") } if (!topics || topics.length === 0) { throw new Error("pool onEventBus:'topics' should not be empty") } if (!isMain) { throw new Error("pool onEventBus should be invoked in main thread") } } pool.callbackFunc = null /** * Registers the task handler function for a worker thread. This must be called from within the worker script. * The provided callback will be invoked with a single argument: the task object. * @param {function(Object): void} cb The callback function to execute for each task. The task object contains `{ topic: string, data: * }`. * @throws {Error} If not a function, or if called from the main thread. */ pool.callback = function (cb) { if (!cb || (typeof cb) != 'function') { throw new Error("pool on :The 'callback' should be a function"); } if (isMain) { throw new Error("pool on should not be invoked in main thread") } pool.callbackFunc = cb } function push(item) { if (queue.length >= queueSize) { const first = JSON.stringify(queue[0]) logger.error(`pool queue is full, removing oldest element: ${first}`) queue.shift(); // Remove the oldest element. } queue.push(item); // A new task has arrived, try to dispatch it immediately. dispatchTasks() } function take() { if (queue.length === 0) { return null; // Return null if the queue is empty. } return queue.shift(); // Remove and return the earliest added element. } export default pool