Skip to content

Commit

Permalink
perf: track pool busy and back pressure lifecycle via events
Browse files Browse the repository at this point in the history
Signed-off-by: Jérôme Benoit <[email protected]>
  • Loading branch information
jerome-benoit committed Aug 27, 2024
1 parent 00e2a3f commit a58ab3b
Show file tree
Hide file tree
Showing 7 changed files with 159 additions and 40 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ for more details**:
<!-- deno-fmt-ignore -->

```js
<script type="module">import { ThreadWorker } from 'https://cdn.jsdelivr.net/npm/[email protected]/browser/mod.js'</script>
<script type="module">import {ThreadWorker} from 'https://cdn.jsdelivr.net/npm/[email protected]/browser/mod.js'</script>
```

```js
Expand Down
95 changes: 75 additions & 20 deletions src/pools/abstract-pool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,14 @@ export abstract class AbstractPool<
* Whether the pool ready event has been emitted or not.
*/
private readyEventEmitted: boolean
/**
* Whether the pool back pressure event has been emitted or not.
*/
private backPressureEventEmitted: boolean
/**
* Whether the pool busy event has been emitted or not.
*/
private busyEventEmitted: boolean
/**
* The start timestamp of the pool.
*/
Expand Down Expand Up @@ -189,6 +197,8 @@ export abstract class AbstractPool<
this.destroying = false
this.startingMinimumNumberOfWorkers = false
this.readyEventEmitted = false
this.busyEventEmitted = false
this.backPressureEventEmitted = false
if (this.opts.startWorkers === true) {
this.start()
}
Expand Down Expand Up @@ -326,8 +336,10 @@ export abstract class AbstractPool<
}),
...(this.opts.enableTasksQueue === true && {
backPressureWorkerNodes: this.workerNodes.reduce(
(accumulator, workerNode) =>
workerNode.info.backPressure ? accumulator + 1 : accumulator,
(accumulator, _, workerNodeKey) =>
this.isWorkerNodeBackPressured(workerNodeKey)
? accumulator + 1
: accumulator,
0,
),
}),
Expand Down Expand Up @@ -775,6 +787,11 @@ export abstract class AbstractPool<
return workerNode.info.ready && workerNode.usage.tasks.executing === 0
}

private isWorkerNodeBackPressured(workerNodeKey: number): boolean {
const workerNode = this.workerNodes[workerNodeKey]
return workerNode.info.ready && workerNode.info.backPressure
}

private isWorkerNodeBusy(workerNodeKey: number): boolean {
const workerNode = this.workerNodes[workerNodeKey]
if (this.opts.enableTasksQueue === true) {
Expand Down Expand Up @@ -1221,10 +1238,14 @@ export abstract class AbstractPool<
await this.destroyWorkerNode(workerNodeKey)
}),
)
this.eventTarget?.dispatchEvent(
new CustomEvent<PoolInfo>(PoolEvents.destroy, { detail: this.info }),
)
this.readyEventEmitted = false
if (this.eventTarget != null) {
this.eventTarget.dispatchEvent(
new CustomEvent<PoolInfo>(PoolEvents.destroy, { detail: this.info }),
)
this.readyEventEmitted = false
this.busyEventEmitted = false
this.backPressureEventEmitted = false
}
delete this.startTimestamp
this.destroying = false
this.started = false
Expand Down Expand Up @@ -2001,8 +2022,8 @@ export abstract class AbstractPool<
}

private checkAndEmitReadyEvent(): void {
if (!this.readyEventEmitted && this.ready) {
this.eventTarget?.dispatchEvent(
if (this.eventTarget != null && !this.readyEventEmitted && this.ready) {
this.eventTarget.dispatchEvent(
new CustomEvent<PoolInfo>(PoolEvents.ready, { detail: this.info }),
)
this.readyEventEmitted = true
Expand Down Expand Up @@ -2038,6 +2059,7 @@ export abstract class AbstractPool<
resolve(data!)
}
this.afterTaskExecutionHook(workerNodeKey, message)
this.checkAndEmitTaskExecutionFinishedEvents()
this.promiseResponseMap.delete(taskId!)
if (this.opts.enableTasksQueue === true && !this.destroying) {
if (
Expand All @@ -2063,20 +2085,50 @@ export abstract class AbstractPool<
}

private checkAndEmitTaskExecutionEvents(): void {
if (this.busy) {
this.eventTarget?.dispatchEvent(
if (this.eventTarget != null && !this.busyEventEmitted && this.busy) {
this.eventTarget.dispatchEvent(
new CustomEvent<PoolInfo>(PoolEvents.busy, { detail: this.info }),
)
this.busyEventEmitted = true
}
}

private checkAndEmitTaskExecutionFinishedEvents(): void {
if (this.eventTarget != null && this.busyEventEmitted && !this.busy) {
this.eventTarget.dispatchEvent(
new CustomEvent<PoolInfo>(PoolEvents.busyEnd, { detail: this.info }),
)
this.busyEventEmitted = false
}
}

private checkAndEmitTaskQueuingEvents(): void {
if (this.backPressure) {
this.eventTarget?.dispatchEvent(
if (
this.eventTarget != null &&
!this.backPressureEventEmitted &&
this.backPressure
) {
this.eventTarget.dispatchEvent(
new CustomEvent<PoolInfo>(PoolEvents.backPressure, {
detail: this.info,
}),
)
this.backPressureEventEmitted = true
}
}

private checkAndEmitTaskDequeuingEvents(): void {
if (
this.eventTarget != null &&
this.backPressureEventEmitted &&
!this.backPressure
) {
this.eventTarget.dispatchEvent(
new CustomEvent<PoolInfo>(PoolEvents.backPressureEnd, {
detail: this.info,
}),
)
this.backPressureEventEmitted = false
}
}

Expand Down Expand Up @@ -2140,8 +2192,8 @@ export abstract class AbstractPool<
}

private checkAndEmitEmptyEvent(): void {
if (this.empty) {
this.eventTarget?.dispatchEvent(
if (this.eventTarget != null && this.empty) {
this.eventTarget.dispatchEvent(
new CustomEvent<PoolInfo>(PoolEvents.empty, { detail: this.info }),
)
}
Expand Down Expand Up @@ -2175,12 +2227,13 @@ export abstract class AbstractPool<
*/
protected internalBackPressure(): boolean {
return (
this.opts.enableTasksQueue === true &&
this.workerNodes.reduce(
(accumulator, workerNode) =>
workerNode.info.backPressure ? accumulator + 1 : accumulator,
0,
) === this.workerNodes.length
(accumulator, _, workerNodeKey) =>
this.isWorkerNodeBackPressured(workerNodeKey)
? accumulator + 1
: accumulator,
0,
) === this.workerNodes.length
)
}

Expand All @@ -2203,7 +2256,9 @@ export abstract class AbstractPool<
}

private dequeueTask(workerNodeKey: number): Task<Data> | undefined {
return this.workerNodes[workerNodeKey].dequeueTask()
const task = this.workerNodes[workerNodeKey].dequeueTask()
this.checkAndEmitTaskDequeuingEvents()
return task
}

private tasksQueueSize(workerNodeKey: number): number {
Expand Down
6 changes: 6 additions & 0 deletions src/pools/pool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,24 +44,28 @@ export type PoolType = keyof typeof PoolTypes
export const PoolEvents: Readonly<{
ready: 'ready'
busy: 'busy'
busyEnd: 'busyEnd'
full: 'full'
empty: 'empty'
destroy: 'destroy'
error: 'error'
messageerror: 'messageerror'
taskError: 'taskError'
backPressure: 'backPressure'
backPressureEnd: 'backPressureEnd'
}> = Object.freeze(
{
ready: 'ready',
busy: 'busy',
busyEnd: 'busyEnd',
full: 'full',
empty: 'empty',
destroy: 'destroy',
error: 'error',
messageerror: 'messageerror',
taskError: 'taskError',
backPressure: 'backPressure',
backPressureEnd: 'backPressureEnd',
} as const,
)

Expand Down Expand Up @@ -253,13 +257,15 @@ export interface IPool<
*
* - `'ready'`: Emitted when the number of workers created in the pool has reached the minimum size expected and are ready. If the pool is dynamic with a minimum number of workers is set to zero, this event is emitted when at least one dynamic worker is ready.
* - `'busy'`: Emitted when the number of workers created in the pool has reached the maximum size expected and are executing concurrently their tasks quota.
* - `'busyEnd'`: Emitted when the number of workers created in the pool has reached the maximum size expected and are no longer executing concurrently their tasks quota.
* - `'full'`: Emitted when the pool is dynamic and the number of workers created has reached the maximum size expected.
* - `'empty'`: Emitted when the pool is dynamic with a minimum number of workers set to zero and the number of workers has reached the minimum size expected.
* - `'destroy'`: Emitted when the pool is destroyed.
* - `'error'`: Emitted when an uncaught error occurs.
* - `'messageerror'`: Emitted when an error occurs while processing a message event.
* - `'taskError'`: Emitted when an error occurs while executing a task.
* - `'backPressure'`: Emitted when the number of workers created in the pool has reached the maximum size expected and are back pressured (i.e. their tasks queue is full: queue size \>= maximum queue size).
* - `'backPressureEnd'`: Emitted when the number of workers created in the pool has reached the maximum size expected and are no longer back pressured (i.e. their tasks queue is no longer full: queue size \< maximum queue size).
*/
readonly eventTarget?: EventTarget
/**
Expand Down
4 changes: 2 additions & 2 deletions src/pools/thread/dynamic.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,8 @@ export class DynamicThreadPool<

/** @inheritDoc */
protected checkAndEmitDynamicWorkerCreationEvents(): void {
if (this.full) {
this.eventTarget?.dispatchEvent(
if (this.eventTarget != null && this.full) {
this.eventTarget.dispatchEvent(
new CustomEvent<PoolInfo>(PoolEvents.full, { detail: this.info }),
)
}
Expand Down
Loading

0 comments on commit a58ab3b

Please sign in to comment.