//build:20240717
|
//线程池,里面加载多个worker,线程池接收任务或事务后然后派发给线程池里面空闲的worker来执行任务,用于解决多事务处理的瓶颈
|
//设备资源有限,线程数量不宜太多,另外也不考虑多个线程池的情况,全局只一个
|
//组件依赖 dxLogger,dxCommon,dxStd
|
import std from './dxStd.js'
|
import logger from './dxLogger.js'
|
import * as os from "os";
|
//-------------------------variable--------------------
|
const pool = {}
|
const isMain = (os.Worker.parent === undefined)
|
let queueSize = 100
|
const queue = []
|
const all = {}
|
pool.os = os
|
/**
|
* 初始化线程池,设置worker个数和缓存队列大小,有可能多个worker都没有空闲,缓存队列可以缓存来不及处理的事务
|
* 因为worker只能通过主线程创建,所以init函数也只能在主线程里执行
|
* 注意: worker对应的文件里不能包含while(true)这种死循环,可以用setInteval来实现循环
|
* @param {string} file worker对应的文件名,必填,绝对路径,通常以'/app/code/src'开始
|
* @param {Object} bus EventBus对象 必填
|
* @param {Array} topics 要订阅的主题组 必填
|
* @param {number} count 线程的个数,非必填,不能小于1,缺省2,
|
* @param {number} maxsize 事务缓存的大小,非必填,缺省100,如果超过100,最老的事务被抛弃
|
*/
|
pool.init = function (file, bus, topics, count = 2, maxsize = 100) {
|
if (!file) {
|
throw new Error("pool init:'file' should not be empty")
|
}
|
if (!bus) {
|
throw new Error("pool init:'bus' should not be empty")
|
}
|
if (!topics) {
|
throw new Error("pool init:'topics' should not be empty")
|
}
|
if (!isMain) {
|
throw new Error("pool init should be invoked in main thread")
|
}
|
if (!std.exist(file)) {
|
throw new Error("pool init: file not found:" + file)
|
}
|
queueSize = maxsize
|
if (count <= 1) {
|
count = 1
|
}
|
for (let i = 0; i < count; i++) {
|
const id = 'pool__id' + i
|
let content = std.loadFile(file) + `
|
import __pool from '/app/code/dxmodules/dxWorkerPool.js'
|
__pool.id = '${id}'
|
const __parent = __pool.os.Worker.parent
|
__parent.onmessage = function (e) {
|
if (!e.data) {
|
return
|
}
|
let fun = __pool.callbackFunc
|
if (fun) {
|
try {
|
fun(e.data)
|
__parent.postMessage({ id: __pool.id })//通知处理完了idle
|
} catch (err) {
|
__parent.postMessage({ id: __pool.id, error: err.stack })//通知处理完了idle,但是失败了
|
}
|
}
|
}
|
`
|
let newfile = file + '_' + id + '.js'
|
std.saveFile(newfile, content)
|
let worker = new os.Worker(newfile)
|
all[id] = { isIdle: true, worker: worker }
|
worker.onmessage = function (data) {
|
if (!data.data) {
|
return
|
}
|
const id = data.data.id
|
if (id) {//通知处理完成的消息
|
all[id].isIdle = true
|
if (data.data.error) {
|
logger.error(`worker ${id} callback error:${data.data.error}`)
|
}
|
} else {
|
const topic = data.data.topic
|
if (topic) {//bus.fire出来的消息
|
bus.fire(topic, data.data.data)
|
}
|
}
|
}
|
}
|
for (let topic of topics) {
|
bus.on(topic, function (d) {
|
push({ topic: topic, data: d })
|
})
|
}
|
|
std.setInterval(function () {
|
Object.keys(all).forEach(key => {
|
const obj = all[key]
|
if (obj.isIdle) {
|
let event = take()
|
if (event) {
|
obj.isIdle = false
|
obj.worker.postMessage(event)
|
}
|
}
|
});
|
}, 5)
|
}
|
/**
|
* 返回线程的唯一标识id
|
* @returns worker唯一标识
|
*/
|
pool.getWorkerId = function () {
|
if (isMain) {
|
return 'main'
|
} else {
|
return pool.id
|
}
|
}
|
/**
|
* 订阅EventBus 上的事务主题,可以订阅多个主题,这个函数也只能在主线程里执行
|
* @param {Object} bus EventBus对象
|
* @param {Array} topics 要订阅的主题组
|
*/
|
pool.on = function (bus, topics) {
|
if (!bus) {
|
throw new Error("pool onEventBus:'bus' should not be empty")
|
}
|
if (!topics) {
|
throw new Error("pool onEventBus:'topics' should not be empty")
|
}
|
if (!isMain) {
|
throw new Error("pool onEventBus should be invoked in main thread")
|
}
|
|
}
|
|
pool.callbackFunc = null
|
/**
|
* worker线程订阅线程池的事件,不用选择特定的主题,线程池关注的所有事件都会处理,
|
* 这个函数必须在worker线程里执行,不能在主线程执行
|
* @param {function} cb 事件处理的回调函数,必填
|
*/
|
pool.callback = function (cb) {
|
if (!cb || (typeof cb) != 'function') {
|
throw new Error("pool on :The 'callback' should be a function");
|
}
|
if (isMain) {
|
throw new Error("pool on should not be invoked in main thread")
|
}
|
pool.callbackFunc = cb
|
}
|
|
function push(item) {
|
if (queue.length >= queueSize) {
|
const first = JSON.stringify(queue[0])
|
logger.error(`pool queue is full,removing oldest element: ${first}`)
|
queue.shift(); // 移除最老的元素
|
}
|
queue.push(item);
|
}
|
|
function take() {
|
if (queue.length === 0) {
|
return null; // 队列为空时返回 null
|
}
|
return queue.shift(); // 移除并返回最早添加的元素
|
}
|
export default pool
|