Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: pool.recycleWorkers() method #63

Merged
merged 1 commit into from
Jul 5, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ We have a similar API to Piscina, so for more information, you can read Piscina'
#### Pool methods
- `cancelPendingTasks()`: Gracefully cancels all pending tasks without stopping or interfering with on-going tasks. This method is useful when your tasks may have side effects and should not be terminated forcefully during task execution. If your tasks don't have any side effects you may want to use [`{ signal }`](https://github.com/piscinajs/piscina#cancelable-tasks) option for forcefully terminating all tasks, including the on-going ones, instead.
- `recycleWorkers()`: Waits for all current tasks to finish and re-creates all workers. Can be used to force isolation imperatively even when `isolateWorkers` is disabled.
#### Exports
Expand Down
48 changes: 45 additions & 3 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -446,6 +446,7 @@ class WorkerInfo extends AsynchronouslyCreatedResource {
lastSeenResponseCount: number = 0
usedMemory?: number
onMessage: ResponseCallback
shouldRecycle?: boolean

constructor(
worker: Worker,
Expand Down Expand Up @@ -995,17 +996,27 @@ class ThreadPool {
}

shouldRecycleWorker(taskInfo?: TaskInfo): boolean {
// Worker could be set to recycle by pool's imperative methods
if (taskInfo?.workerInfo?.shouldRecycle) {
return true
}

// When `isolateWorkers` is enabled, remove the worker after task is finished
const isWorkerIsolated = this.options.isolateWorkers && taskInfo?.workerInfo
if (this.options.isolateWorkers && taskInfo?.workerInfo) {
return true
}

// When `maxMemoryLimitBeforeRecycle` is enabled, remove workers that have exceeded the memory limit
const isWorkersMemoryLimitReached =
if (
!this.options.isolateWorkers &&
this.options.maxMemoryLimitBeforeRecycle !== undefined &&
(taskInfo?.workerInfo?.usedMemory || 0) >
this.options.maxMemoryLimitBeforeRecycle
) {
return true
}

return Boolean(isWorkerIsolated || isWorkersMemoryLimitReached)
return false
}

pendingCapacity(): number {
Expand Down Expand Up @@ -1041,6 +1052,33 @@ class ThreadPool {

await Promise.all(exitEvents)
}

async recycleWorkers() {
// Worker's are automatically recycled when isolateWorkers is enabled
if (this.options.isolateWorkers) {
return
}

const exitEvents: Promise<any[]>[] = []

Array.from(this.workers).filter((workerInfo) => {
// Remove idle workers
if (workerInfo.currentUsage() === 0) {
exitEvents.push(once(workerInfo!.worker, 'exit'))
this._removeWorker(workerInfo!)
}
// Mark on-going workers for recycling.
// Note that we don't need to wait for these ones to finish
// as pool.shouldRecycleWorker will do it once task has finished
else {
workerInfo.shouldRecycle = true
}
})

await Promise.all(exitEvents)

this._ensureMinimumWorkers()
}
}

class Tinypool extends EventEmitterAsyncResource {
Expand Down Expand Up @@ -1116,6 +1154,10 @@ class Tinypool extends EventEmitterAsyncResource {
pool.taskQueue.cancel()
}

async recycleWorkers() {
await this.#pool.recycleWorkers()
}

get completed(): number {
return this.#pool.completed
}
Expand Down
81 changes: 81 additions & 0 deletions test/isolation.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
import { dirname, resolve } from 'path'
import { Tinypool } from 'tinypool'
import { fileURLToPath } from 'url'

const __dirname = dirname(fileURLToPath(import.meta.url))

test('idle workers can be recycled', async () => {
const pool = new Tinypool({
filename: resolve(__dirname, 'fixtures/sleep.js'),
minThreads: 4,
maxThreads: 4,
isolateWorkers: false,
})

function getThreadIds() {
return pool.threads.map((thread) => thread!.threadId).sort((a, b) => a - b)
}

expect(pool.threads).toHaveLength(4)
const initialThreadIds = getThreadIds()

await Promise.all(times(4)(() => pool.run({})))
expect(getThreadIds()).toStrictEqual(initialThreadIds)

await pool.recycleWorkers()
expect(pool.threads).toHaveLength(4)

const newThreadIds = getThreadIds()
initialThreadIds.forEach((id) => expect(newThreadIds).not.toContain(id))

await Promise.all(times(4)(() => pool.run({})))
initialThreadIds.forEach((id) => expect(newThreadIds).not.toContain(id))
expect(getThreadIds()).toStrictEqual(newThreadIds)
})

test('running workers can recycle after task execution finishes', async () => {
const pool = new Tinypool({
filename: resolve(__dirname, 'fixtures/sleep.js'),
minThreads: 4,
maxThreads: 4,
isolateWorkers: false,
})

function getThreadIds() {
return pool.threads.map((thread) => thread!.threadId).sort((a, b) => a - b)
}

expect(pool.threads).toHaveLength(4)
const initialThreadIds = getThreadIds()

const tasks = [
...times(2)(() => pool.run({ time: 1 })),
...times(2)(() => pool.run({ time: 2000 })),
]

// Wait for first two tasks to finish
await Promise.all(tasks.slice(0, 2))

await pool.recycleWorkers()
AriPerkkio marked this conversation as resolved.
Show resolved Hide resolved
const threadIds = getThreadIds()

// Idle workers should have been recycled immediately
expect(threadIds).not.toContain(initialThreadIds[0])
expect(threadIds).not.toContain(initialThreadIds[1])

// Running workers should not have recycled yet
expect(threadIds).toContain(initialThreadIds[2])
expect(threadIds).toContain(initialThreadIds[3])

await Promise.all(tasks)

// All workers should have recycled now
const newThreadIds = getThreadIds()
initialThreadIds.forEach((id) => expect(newThreadIds).not.toContain(id))
})

function times(count: number) {
return function run<T>(fn: () => T): T[] {
return Array(count).fill(0).map(fn)
}
}