/**
|
* Worker Pool Module (dxWorkerPool)
|
*
|
* Features:
|
* - Manages a pool of worker threads to handle concurrent tasks, preventing performance bottlenecks.
|
* - Automatically distributes tasks from a shared queue to available workers.
|
* - Buffers incoming tasks in a queue when all workers are busy.
|
* - Designed as a global singleton; should be initialized only once.
|
*
|
* Usage (Main Thread):
|
* - Initialize the pool once using `pool.init()`, providing the worker script, event bus, and topics.
|
*
|
* Usage (Worker Thread):
|
* - In your worker script, import `dxWorkerPool` and register your task handling function using `pool.callback()`.
|
*
|
* @example
|
* // --- In main.js (Main Thread) ---
|
* import pool from 'dxmodules/dxWorkerPool.js';
|
* import eventBus from 'dxmodules/dxEventBus.js';
|
*
|
* // Initialize the pool with 2 workers for the 'heavy_tasks' topic.
|
* pool.init('/app/code/src/my_worker.js', eventBus, ['heavy_tasks'], 2);
|
*
|
* // Later, fire an event to be processed by a worker.
|
* eventBus.fire('heavy_tasks', { data: 'some payload' });
|
*
|
* // --- In my_worker.js (Worker Thread) ---
|
* import pool from 'dxmodules/dxWorkerPool.js';
|
* import log from 'dxmodules/dxLogger.js';
|
*
|
* // Register the callback function to handle incoming tasks.
|
* pool.callback(function(task) {
|
* log.info(`Worker ${pool.getWorkerId()} is processing task on topic '${task.topic}':`, task.data);
|
* // ... perform heavy computation ...
|
* });
|
*
|
* Doc/Demo: https://github.com/DejaOS/DejaOS
|
*/
|
import std from './dxStd.js'
|
import logger from './dxLogger.js'
|
import * as os from "os";
|
//-------------------------variable--------------------
|
const pool = {}
|
const isMain = (os.Worker.parent === undefined)
|
let queueSize = 100
|
const queue = []
|
const all = {}
|
pool.os = os
|
/**
|
* Initializes the worker pool. This must be called once from the main thread.
|
* @param {string} file The absolute path to the worker script file (e.g., '/app/code/src/worker.js').
|
* @param {Object} bus The dxEventBus instance for task distribution.
|
* @param {string[]} topics An array of event bus topics to subscribe to. Tasks from these topics will be processed by the pool.
|
* @param {number} [count=2] The number of worker threads in the pool. Must be a positive number.
|
* @param {number} [maxsize=100] The maximum size of the task queue. If the queue is full, the oldest task is discarded.
|
* @throws {Error} If called from a worker thread, initialized more than once, or if parameters are invalid.
|
*/
|
pool.init = function (file, bus, topics, count = 2, maxsize = 100) {
|
if (Object.keys(all).length > 0) {
|
throw new Error("Worker pool has already been initialized.")
|
}
|
if (!file) {
|
throw new Error("pool init:'file' should not be empty")
|
}
|
if (!bus) {
|
throw new Error("pool init:'bus' should not be empty")
|
}
|
if (!topics || topics.length === 0) {
|
throw new Error("pool init:'topics' should not be empty")
|
}
|
if (!isMain) {
|
throw new Error("pool init should be invoked in main thread")
|
}
|
if (!std.exist(file)) {
|
throw new Error("pool init: file not found:" + file)
|
}
|
queueSize = maxsize
|
if (count < 1) {
|
throw new Error("pool init: 'count' must be a positive number")
|
}
|
for (let i = 0; i < count; i++) {
|
const id = 'pool__id' + i
|
let content = std.loadFile(file) + `
|
import __pool from '/app/code/dxmodules/dxWorkerPool.js'
|
__pool.id = '${id}'
|
const __parent = __pool.os.Worker.parent
|
__parent.onmessage = function (e) {
|
if (!e.data) {
|
return
|
}
|
let fun = __pool.callbackFunc
|
if (fun) {
|
try {
|
fun(e.data)
|
__parent.postMessage({ id: __pool.id })// Notify main thread that task is done and worker is idle.
|
} catch (err) {
|
__parent.postMessage({ id: __pool.id, error: err.stack })// Notify main thread about the failure.
|
}
|
}
|
}
|
`
|
let newfile = file + '_' + id + '.js'
|
std.saveFile(newfile, content)
|
let worker = new os.Worker(newfile)
|
all[id] = { isIdle: true, worker: worker }
|
worker.onmessage = function (data) {
|
if (!data.data) {
|
return
|
}
|
const id = data.data.id
|
if (id) {// A worker has finished its task.
|
all[id].isIdle = true
|
if (data.data.error) {
|
logger.error(`worker ${id} callback error:${data.data.error}`)
|
}
|
// A worker has become free, try to dispatch any pending tasks.
|
dispatchTasks()
|
} else {
|
const topic = data.data.topic
|
if (topic) {// A message to be fired on the event bus.
|
bus.fire(topic, data.data.data)
|
}
|
}
|
}
|
}
|
for (let topic of topics) {
|
bus.on(topic, function (d) {
|
push({ topic: topic, data: d })
|
})
|
}
|
}
|
|
function dispatchTasks() {
|
Object.keys(all).forEach(key => {
|
const workerInfo = all[key];
|
// Find an idle worker and a pending task.
|
if (workerInfo.isIdle) {
|
const task = take();
|
if (task) {
|
workerInfo.isIdle = false;
|
workerInfo.worker.postMessage(task);
|
}
|
}
|
});
|
}
|
/**
|
* Returns the unique ID of the current worker thread, or 'main' if called from the main thread.
|
* This is useful for logging and debugging within worker scripts.
|
* @returns {string} The worker's unique ID (e.g., 'pool__id0') or 'main'.
|
*/
|
pool.getWorkerId = function () {
|
if (isMain) {
|
return 'main'
|
} else {
|
return pool.id
|
}
|
}
|
/**
|
* @deprecated This function is obsolete. Topic subscription is now handled directly within `pool.init()`.
|
* Subscribes the pool to additional event bus topics. Must be called from the main thread.
|
* @param {Object} bus The dxEventBus instance.
|
* @param {string[]} topics An array of topics to subscribe to.
|
*/
|
pool.on = function (bus, topics) {
|
if (!bus) {
|
throw new Error("pool onEventBus:'bus' should not be empty")
|
}
|
if (!topics || topics.length === 0) {
|
throw new Error("pool onEventBus:'topics' should not be empty")
|
}
|
if (!isMain) {
|
throw new Error("pool onEventBus should be invoked in main thread")
|
}
|
|
}
|
|
pool.callbackFunc = null
|
/**
|
* Registers the task handler function for a worker thread. This must be called from within the worker script.
|
* The provided callback will be invoked with a single argument: the task object.
|
* @param {function(Object): void} cb The callback function to execute for each task. The task object contains `{ topic: string, data: * }`.
|
* @throws {Error} If not a function, or if called from the main thread.
|
*/
|
pool.callback = function (cb) {
|
if (!cb || (typeof cb) != 'function') {
|
throw new Error("pool on :The 'callback' should be a function");
|
}
|
if (isMain) {
|
throw new Error("pool on should not be invoked in main thread")
|
}
|
pool.callbackFunc = cb
|
}
|
|
function push(item) {
|
if (queue.length >= queueSize) {
|
const first = JSON.stringify(queue[0])
|
logger.error(`pool queue is full, removing oldest element: ${first}`)
|
queue.shift(); // Remove the oldest element.
|
}
|
queue.push(item);
|
// A new task has arrived, try to dispatch it immediately.
|
dispatchTasks()
|
}
|
|
function take() {
|
if (queue.length === 0) {
|
return null; // Return null if the queue is empty.
|
}
|
return queue.shift(); // Remove and return the earliest added element.
|
}
|
export default pool
|