-
Notifications
You must be signed in to change notification settings - Fork 0
/
WorkerQueue.js
116 lines (94 loc) · 2.48 KB
/
WorkerQueue.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
export class WorkerQueue {
#concurrency = 1
#id = crypto.randomUUID()
#workerPool = []
#resolveQueueEmpty = null
#queueEmptyPromise = null
#messagesQueue = []
#concurrentMessageCount = 0
#uri
constructor( concurrency, uri ) {
this.#concurrency = concurrency || this.#concurrency
this.#uri = uri
for ( let i = 0; i < this.#concurrency; i++ ) {
this.#workerPool.push( createWorker( this.#uri, this.#finishTask ))
}
}
#finishTask( worker ) {
worker.currentTask = null
this.#concurrentMessageCount--
this.#next()
}
#next() {
if (
this.#concurrentMessageCount >= this.concurrency ||
!this.messagesQueue.length
) {
if (
this.#concurrentMessageCount === 0
&& this.#resolveQueueEmpty
) {
this.#resolveQueueEmpty( false )
}
return
}
const availableWorker = this.#workerPool.find( w => !w.currentTask )
if ( !availableWorker ) return
this.#concurrentMessageCount++
const task = this.messagesQueue.shift()
availableWorker.currentTask = task
availableWorker.postMessage( task.message )
}
get concurrency() {
return this.#concurrency
}
get concurrentMessageCount() {
return this.#concurrentMessageCount
}
get finished() {
if ( !this.messagesQueue.length && this.#concurrentMessageCount === 0 ) {
return Promise.resolve( true )
}
return (
this.#queueEmptyPromise ||
( this.#queueEmptyPromise = new Promise(( resolve ) => {
this.#resolveQueueEmpty = resolve
}))
)
}
get messagesQueue() {
return this.#messagesQueue
}
add( message ) {
this.#queueEmptyPromise = null
this.#resolveQueueEmpty = null
return new Promise(( resolve, reject ) => {
this.messagesQueue.push({ message, resolve, reject })
this.#next()
})
}
terminate() { this.#workerPool.forEach( worker => worker.terminate())}
}
/**
* Create a worker.
* @param {string} uri The URI of the worker.
* @param {Function} callback The callback to call when the worker finishes a
* task.
* @returns {Worker} The worker.
*/
function createWorker( uri, callback ) {
const worker = new Worker( uri )
worker.onmessage = ( event ) => {
if ( worker.currentTask ) {
worker.currentTask.resolve( event.data )
callback( worker )
}
}
worker.onerror = ( error ) => {
if ( worker.currentTask ) {
worker.currentTask.reject( error )
callback( worker )
}
}
return worker
}