1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
//build:20240717
//线程池,里面加载多个worker,线程池接收任务或事务后然后派发给线程池里面空闲的worker来执行任务,用于解决多事务处理的瓶颈
//设备资源有限,线程数量不宜太多,另外也不考虑多个线程池的情况,全局只一个
//组件依赖 dxLogger,dxCommon,dxStd
import std from './dxStd.js'
import logger from './dxLogger.js'
import * as os from "os";
//-------------------------variable--------------------
const pool = {}
const isMain = (os.Worker.parent === undefined)
let queueSize = 100
const queue = []
const all = {}
pool.os = os
/**
 * 初始化线程池,设置worker个数和缓存队列大小,有可能多个worker都没有空闲,缓存队列可以缓存来不及处理的事务
 * 因为worker只能通过主线程创建,所以init函数也只能在主线程里执行
 * 注意: worker对应的文件里不能包含while(true)这种死循环,可以用setInteval来实现循环
 * @param {string} file worker对应的文件名,必填,绝对路径,通常以'/app/code/src'开始
 * @param {Object} bus EventBus对象 必填
 * @param {Array} topics 要订阅的主题组 必填
 * @param {number} count 线程的个数,非必填,不能小于1,缺省2,
 * @param {number} maxsize 事务缓存的大小,非必填,缺省100,如果超过100,最老的事务被抛弃
 */
pool.init = function (file, bus, topics, count = 2, maxsize = 100) {
    if (!file) {
        throw new Error("pool init:'file' should not be empty")
    }
    if (!bus) {
        throw new Error("pool init:'bus' should not be empty")
    }
    if (!topics) {
        throw new Error("pool init:'topics' should not be empty")
    }
    if (!isMain) {
        throw new Error("pool init should be invoked in main thread")
    }
    if (!std.exist(file)) {
        throw new Error("pool init: file not found:" + file)
    }
    queueSize = maxsize
    if (count <= 1) {
        count = 1
    }
    for (let i = 0; i < count; i++) {
        const id = 'pool__id' + i
        let content = std.loadFile(file) + `
import __pool from '/app/code/dxmodules/dxWorkerPool.js'
__pool.id = '${id}'
const __parent = __pool.os.Worker.parent
__parent.onmessage = function (e) {
    if (!e.data) {
        return
    }
    let fun = __pool.callbackFunc
    if (fun) {
        try {
            fun(e.data)
            __parent.postMessage({ id: __pool.id })//通知处理完了idle
        } catch (err) {
            __parent.postMessage({ id: __pool.id, error: err.stack })//通知处理完了idle,但是失败了    
        }
    }
}
            `
        let newfile = file + '_' + id + '.js'
        std.saveFile(newfile, content)
        let worker = new os.Worker(newfile)
        all[id] = { isIdle: true, worker: worker }
        worker.onmessage = function (data) {
            if (!data.data) {
                return
            }
            const id = data.data.id
            if (id) {//通知处理完成的消息
                all[id].isIdle = true
                if (data.data.error) {
                    logger.error(`worker ${id} callback error:${data.data.error}`)
                }
            } else {
                const topic = data.data.topic
                if (topic) {//bus.fire出来的消息
                    bus.fire(topic, data.data.data)
                }
            }
        }
    }
    for (let topic of topics) {
        bus.on(topic, function (d) {
            push({ topic: topic, data: d })
        })
    }
 
    std.setInterval(function () {
        Object.keys(all).forEach(key => {
            const obj = all[key]
            if (obj.isIdle) {
                let event = take()
                if (event) {
                    obj.isIdle = false
                    obj.worker.postMessage(event)
                }
            }
        });
    }, 5)
}
/**
 * 返回线程的唯一标识id
 * @returns worker唯一标识
 */
pool.getWorkerId = function () {
    if (isMain) {
        return 'main'
    } else {
        return pool.id
    }
}
/**
 * 订阅EventBus 上的事务主题,可以订阅多个主题,这个函数也只能在主线程里执行
 * @param {Object} bus EventBus对象
 * @param {Array} topics 要订阅的主题组
 */
pool.on = function (bus, topics) {
    if (!bus) {
        throw new Error("pool onEventBus:'bus' should not be empty")
    }
    if (!topics) {
        throw new Error("pool onEventBus:'topics' should not be empty")
    }
    if (!isMain) {
        throw new Error("pool onEventBus should be invoked in main thread")
    }
 
}
 
pool.callbackFunc = null
/**
 * worker线程订阅线程池的事件,不用选择特定的主题,线程池关注的所有事件都会处理,
 * 这个函数必须在worker线程里执行,不能在主线程执行
 * @param {function} cb 事件处理的回调函数,必填
 */
pool.callback = function (cb) {
    if (!cb || (typeof cb) != 'function') {
        throw new Error("pool on :The 'callback' should be a function");
    }
    if (isMain) {
        throw new Error("pool on should not be invoked in main thread")
    }
    pool.callbackFunc = cb
}
 
function push(item) {
    if (queue.length >= queueSize) {
        const first = JSON.stringify(queue[0])
        logger.error(`pool queue is full,removing oldest element: ${first}`)
        queue.shift(); // 移除最老的元素
    }
    queue.push(item);
}
 
function take() {
    if (queue.length === 0) {
        return null; // 队列为空时返回 null
    }
    return queue.shift(); // 移除并返回最早添加的元素
}
export default pool