lgq
3 天以前 081f12a52906abe6c2d139fdc144135978681009
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
/**
 * Worker Pool Module (dxWorkerPool)
 *
 * Features:
 * - Manages a pool of worker threads to handle concurrent tasks, preventing performance bottlenecks.
 * - Automatically distributes tasks from a shared queue to available workers.
 * - Buffers incoming tasks in a queue when all workers are busy.
 * - Designed as a global singleton; should be initialized only once.
 *
 * Usage (Main Thread):
 * - Initialize the pool once using `pool.init()`, providing the worker script, event bus, and topics.
 *
 * Usage (Worker Thread):
 * - In your worker script, import `dxWorkerPool` and register your task handling function using `pool.callback()`.
 *
 * @example
 * // --- In main.js (Main Thread) ---
 * import pool from 'dxmodules/dxWorkerPool.js';
 * import eventBus from 'dxmodules/dxEventBus.js';
 *
 * // Initialize the pool with 2 workers for the 'heavy_tasks' topic.
 * pool.init('/app/code/src/my_worker.js', eventBus, ['heavy_tasks'], 2);
 *
 * // Later, fire an event to be processed by a worker.
 * eventBus.fire('heavy_tasks', { data: 'some payload' });
 *
 * // --- In my_worker.js (Worker Thread) ---
 * import pool from 'dxmodules/dxWorkerPool.js';
 * import log from 'dxmodules/dxLogger.js';
 *
 * // Register the callback function to handle incoming tasks.
 * pool.callback(function(task) {
 *     log.info(`Worker ${pool.getWorkerId()} is processing task on topic '${task.topic}':`, task.data);
 *     // ... perform heavy computation ...
 * });
 *
 * Doc/Demo: https://github.com/DejaOS/DejaOS
 */
import std from './dxStd.js'
import logger from './dxLogger.js'
import * as os from "os";
//-------------------------variable--------------------
const pool = {}
const isMain = (os.Worker.parent === undefined)
let queueSize = 100
const queue = []
const all = {}
pool.os = os
/**
 * Initializes the worker pool. This must be called once from the main thread.
 * @param {string} file The absolute path to the worker script file (e.g., '/app/code/src/worker.js').
 * @param {Object} bus The dxEventBus instance for task distribution.
 * @param {string[]} topics An array of event bus topics to subscribe to. Tasks from these topics will be processed by the pool.
 * @param {number} [count=2] The number of worker threads in the pool. Must be a positive number.
 * @param {number} [maxsize=100] The maximum size of the task queue. If the queue is full, the oldest task is discarded.
 * @throws {Error} If called from a worker thread, initialized more than once, or if parameters are invalid.
 */
pool.init = function (file, bus, topics, count = 2, maxsize = 100) {
    if (Object.keys(all).length > 0) {
        throw new Error("Worker pool has already been initialized.")
    }
    if (!file) {
        throw new Error("pool init:'file' should not be empty")
    }
    if (!bus) {
        throw new Error("pool init:'bus' should not be empty")
    }
    if (!topics || topics.length === 0) {
        throw new Error("pool init:'topics' should not be empty")
    }
    if (!isMain) {
        throw new Error("pool init should be invoked in main thread")
    }
    if (!std.exist(file)) {
        throw new Error("pool init: file not found:" + file)
    }
    queueSize = maxsize
    if (count < 1) {
        throw new Error("pool init: 'count' must be a positive number")
    }
    for (let i = 0; i < count; i++) {
        const id = 'pool__id' + i
        let content = std.loadFile(file) + `
import __pool from '/app/code/dxmodules/dxWorkerPool.js'
__pool.id = '${id}'
const __parent = __pool.os.Worker.parent
__parent.onmessage = function (e) {
    if (!e.data) {
        return
    }
    let fun = __pool.callbackFunc
    if (fun) {
        try {
            fun(e.data)
            __parent.postMessage({ id: __pool.id })// Notify main thread that task is done and worker is idle.
        } catch (err) {
            __parent.postMessage({ id: __pool.id, error: err.stack })// Notify main thread about the failure.
        }
    }
}
            `
        let newfile = file + '_' + id + '.js'
        std.saveFile(newfile, content)
        let worker = new os.Worker(newfile)
        all[id] = { isIdle: true, worker: worker }
        worker.onmessage = function (data) {
            if (!data.data) {
                return
            }
            const id = data.data.id
            if (id) {// A worker has finished its task.
                all[id].isIdle = true
                if (data.data.error) {
                    logger.error(`worker ${id} callback error:${data.data.error}`)
                }
                // A worker has become free, try to dispatch any pending tasks.
                dispatchTasks()
            } else {
                const topic = data.data.topic
                if (topic) {// A message to be fired on the event bus.
                    bus.fire(topic, data.data.data)
                }
            }
        }
    }
    for (let topic of topics) {
        bus.on(topic, function (d) {
            push({ topic: topic, data: d })
        })
    }
}
 
function dispatchTasks() {
    Object.keys(all).forEach(key => {
        const workerInfo = all[key];
        // Find an idle worker and a pending task.
        if (workerInfo.isIdle) {
            const task = take();
            if (task) {
                workerInfo.isIdle = false;
                workerInfo.worker.postMessage(task);
            }
        }
    });
}
/**
 * Returns the unique ID of the current worker thread, or 'main' if called from the main thread.
 * This is useful for logging and debugging within worker scripts.
 * @returns {string} The worker's unique ID (e.g., 'pool__id0') or 'main'.
 */
pool.getWorkerId = function () {
    if (isMain) {
        return 'main'
    } else {
        return pool.id
    }
}
/**
 * @deprecated This function is obsolete. Topic subscription is now handled directly within `pool.init()`.
 * Subscribes the pool to additional event bus topics. Must be called from the main thread.
 * @param {Object} bus The dxEventBus instance.
 * @param {string[]} topics An array of topics to subscribe to.
 */
pool.on = function (bus, topics) {
    if (!bus) {
        throw new Error("pool onEventBus:'bus' should not be empty")
    }
    if (!topics || topics.length === 0) {
        throw new Error("pool onEventBus:'topics' should not be empty")
    }
    if (!isMain) {
        throw new Error("pool onEventBus should be invoked in main thread")
    }
 
}
 
pool.callbackFunc = null
/**
 * Registers the task handler function for a worker thread. This must be called from within the worker script.
 * The provided callback will be invoked with a single argument: the task object.
 * @param {function(Object): void} cb The callback function to execute for each task. The task object contains `{ topic: string, data: * }`.
 * @throws {Error} If not a function, or if called from the main thread.
 */
pool.callback = function (cb) {
    if (!cb || (typeof cb) != 'function') {
        throw new Error("pool on :The 'callback' should be a function");
    }
    if (isMain) {
        throw new Error("pool on should not be invoked in main thread")
    }
    pool.callbackFunc = cb
}
 
function push(item) {
    if (queue.length >= queueSize) {
        const first = JSON.stringify(queue[0])
        logger.error(`pool queue is full, removing oldest element: ${first}`)
        queue.shift(); // Remove the oldest element.
    }
    queue.push(item);
    // A new task has arrived, try to dispatch it immediately.
    dispatchTasks()
}
 
function take() {
    if (queue.length === 0) {
        return null; // Return null if the queue is empty.
    }
    return queue.shift(); // Remove and return the earliest added element.
}
export default pool