feat: add maxMemoryLimitBeforeRecycle options #58

Merged
merged 1 commit into from
Jun 27, 2023
5 changes: 3 additions & 2 deletions README.md
@@ -46,8 +46,9 @@ We have a similar API to Piscina, so for more information, you can read Piscina'

#### Pool constructor options

- - `isolateWorkers`: Default to `false`. Always starts with a fresh worker when running tasks to isolate the environment.
- - `terminateTimeout`: Defaults to `null`. If terminating a worker takes `terminateTimeout` amount of milliseconds to execute, an error is raised.
+ - `isolateWorkers`: Disabled by default. Always starts with a fresh worker when running tasks to isolate the environment.
+ - `terminateTimeout`: Disabled by default. If terminating a worker takes `terminateTimeout` amount of milliseconds to execute, an error is raised.
+ - `maxMemoryLimitBeforeRecycle`: Disabled by default. When defined, the worker's heap memory usage is compared against this value after task has been finished. If the current memory usage exceeds this limit, worker is terminated and a new one is started to take its place. This option is useful when your tasks leak memory and you don't want to enable `isolateWorkers` option.
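
The README entry does not state a unit for `maxMemoryLimitBeforeRecycle`. Since the worker in this PR reports `process.memoryUsage().heapUsed`, which Node.js returns in bytes, the limit is presumably given in bytes as well. A minimal sketch of an options object under that assumption follows; the `megabytes` helper, the `PoolOptionsSketch` interface, and the worker path are hypothetical and not part of Tinypool.

```typescript
// Hypothetical helper (not part of Tinypool): convert megabytes to bytes.
const megabytes = (mb: number): number => mb * 1024 * 1024

// Hypothetical subset of the pool options discussed in the README hunk.
interface PoolOptionsSketch {
  filename: string
  isolateWorkers?: boolean
  maxMemoryLimitBeforeRecycle?: number // assumed to be bytes of used heap
}

const options: PoolOptionsSketch = {
  filename: '/path/to/worker.js', // placeholder path, adjust to your project
  // Recycle a worker once its reported heap usage exceeds roughly 50 MB.
  maxMemoryLimitBeforeRecycle: megabytes(50),
}

// With Tinypool installed, these options would be passed to the constructor:
// const pool = new Tinypool(options)
```

Leaving `isolateWorkers` unset matters here: according to the `src/index.ts` change in this PR, the memory check only applies when `isolateWorkers` is not enabled.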

#### Pool methods

1 change: 1 addition & 0 deletions src/common.ts
@@ -23,6 +23,7 @@ export interface ResponseMessage {
taskId: number
result: any
error: unknown | null
usedMemory: number
}

export interface TinypoolPrivateData {
17 changes: 15 additions & 2 deletions src/index.ts
@@ -144,6 +144,7 @@ interface Options {
concurrentTasksPerWorker?: number
useAtomics?: boolean
resourceLimits?: ResourceLimits
maxMemoryLimitBeforeRecycle?: number
argv?: string[]
execArgv?: string[]
env?: EnvSpecifier
@@ -443,6 +444,7 @@ class WorkerInfo extends AsynchronouslyCreatedResource {
port: MessagePort
sharedBuffer: Int32Array
lastSeenResponseCount: number = 0
usedMemory?: number
onMessage: ResponseCallback

constructor(
@@ -521,6 +523,7 @@ class WorkerInfo extends AsynchronouslyCreatedResource {
}

_handleResponse(message: ResponseMessage): void {
this.usedMemory = message.usedMemory
this.onMessage(message)

if (this.taskInfos.size === 0) {
@@ -892,8 +895,18 @@ class ThreadPool {
}

  // When `isolateWorkers` is enabled, remove the worker after task is finished
- if (this.options.isolateWorkers && taskInfo.workerInfo) {
-   this._removeWorker(taskInfo.workerInfo)
+ const shouldIsolateWorker =
+   this.options.isolateWorkers && taskInfo.workerInfo
+
+ // When `maxMemoryLimitBeforeRecycle` is enabled, remove workers that have exceeded the memory limit
+ const shouldRecycleWorker =
+   !this.options.isolateWorkers &&
+   this.options.maxMemoryLimitBeforeRecycle !== undefined &&
+   (taskInfo.workerInfo?.usedMemory || 0) >
+     this.options.maxMemoryLimitBeforeRecycle
+
+ if (shouldIsolateWorker || shouldRecycleWorker) {
+   this._removeWorker(taskInfo.workerInfo!)
      .then(() => this._ensureMinimumWorkers())
      .then(() => this._ensureEnoughWorkersForTaskQueue())
      .then(() => resolve(result))
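
The removal decision in the hunk above can be restated as a standalone function, which makes the edge cases easier to see. This is a simplified sketch, not the code from the PR: `RecycleOptions` and `shouldRemoveWorker` are hypothetical names, and the check that `taskInfo.workerInfo` exists is left out.

```typescript
// Hypothetical, simplified subset of the pool options touched by this change.
interface RecycleOptions {
  isolateWorkers?: boolean
  maxMemoryLimitBeforeRecycle?: number
}

// Returns true when the worker that just finished a task should be removed.
function shouldRemoveWorker(
  options: RecycleOptions,
  usedMemory: number | undefined
): boolean {
  // `isolateWorkers` replaces the worker after every task, regardless of memory.
  if (options.isolateWorkers) return true

  // Without a configured limit, workers are never recycled for memory reasons.
  if (options.maxMemoryLimitBeforeRecycle === undefined) return false

  // A worker that has not reported memory yet is treated as using 0 bytes.
  // The comparison is strict: usage equal to the limit does not trigger recycling.
  return (usedMemory ?? 0) > options.maxMemoryLimitBeforeRecycle
}
```

For example, `shouldRemoveWorker({ maxMemoryLimitBeforeRecycle: 100 }, 100)` is `false`, while a reported usage of `101` gives `true`.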
2 changes: 2 additions & 0 deletions src/worker.ts
@@ -172,6 +172,7 @@ function onMessage(
taskId,
result: result,
error: null,
usedMemory: process.memoryUsage().heapUsed,
}

// If the task used e.g. console.log(), wait for the stream to drain
@@ -191,6 +192,7 @@ function onMessage(
// It may be worth taking a look at the error cloning algorithm we
// use in Node.js core here, it's quite a bit more flexible
error,
usedMemory: process.memoryUsage().heapUsed,
}
}
currentTasks--
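
Both branches of the worker change attach the same measurement, so the heap usage is reported whether the task succeeds or throws. A minimal sketch of that pattern follows, assuming a Node.js runtime; `ResponseMessageSketch` and `runAndReport` are hypothetical names, and setting `result` to `null` in the error branch is an assumption, since the hunk does not show that field.

```typescript
// Illustrative copy of the `ResponseMessage` shape after this change.
interface ResponseMessageSketch {
  taskId: number
  result: unknown
  error: unknown | null
  usedMemory: number
}

// Hypothetical helper: run a task and attach the heap usage measured afterwards.
async function runAndReport(
  taskId: number,
  task: () => unknown | Promise<unknown>
): Promise<ResponseMessageSketch> {
  try {
    const result = await task()
    // Success path: measure the heap after the task has completed.
    return { taskId, result, error: null, usedMemory: process.memoryUsage().heapUsed }
  } catch (error) {
    // Failure path: a failing task may still have leaked memory, so measure here too.
    return { taskId, result: null, error, usedMemory: process.memoryUsage().heapUsed }
  }
}
```

Measuring in the failure path as well means a task that leaks and then throws can still push its worker over the limit.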
17 changes: 17 additions & 0 deletions test/fixtures/leak-memory.js
@@ -0,0 +1,17 @@
export let leaks = []

/**
* Leak some memory to test memory limit usage.
* The argument `bytes` is not 100% accurate of the leaked bytes but good enough.
*/
export default function run(bytes) {
const before = process.memoryUsage().heapUsed

for (const _ of Array(bytes).fill()) {
leaks.push(new SharedArrayBuffer(1024))
}
const after = process.memoryUsage().heapUsed
const diff = after - before

console.log(`Leaked: ${diff}. Heap used: ${process.memoryUsage().heapUsed}`)
}
40 changes: 39 additions & 1 deletion test/resource-limits.test.ts
@@ -8,7 +8,6 @@ test('resourceLimits causes task to reject', async () => {
const worker = new Tinypool({
filename: resolve(__dirname, 'fixtures/resource-limits.js'),
resourceLimits: {

maxOldGenerationSizeMb: 4,
maxYoungGenerationSizeMb: 2,
codeRangeSizeMb: 4,
@@ -36,3 +35,42 @@ test('resourceLimits causes task to reject', async () => {
/Worker terminated due to reaching memory limit: JS heap out of memory/
)
})

test('worker is recycled after reaching maxMemoryLimitBeforeRecycle', async () => {
const pool = new Tinypool({
filename: resolve(__dirname, 'fixtures/leak-memory.js'),
maxMemoryLimitBeforeRecycle: 10_000_000,
isolateWorkers: false,
minThreads: 1,
maxThreads: 1,
})

const originalWorkerId = pool.threads[0]?.threadId
expect(originalWorkerId).toBeGreaterThan(0)

let finalThreadId = originalWorkerId
let rounds = 0

// This is just an estimate of how to leak "some" memory - it's not accurate.
// Running 100 loops should be enough to make the worker reach memory limit and be recycled.
// Use the `rounds` to make sure we don't reach the limit on the first round.
for (const _ of Array(100).fill(0)) {
await pool.run(10_000)

if (pool.threads[0]) {
finalThreadId = pool.threads[0].threadId
}

if (finalThreadId !== originalWorkerId) {
break
}

rounds++
}

// Test setup should not reach max memory on first round
expect(rounds).toBeGreaterThan(1)

// Thread should have been recycled
expect(finalThreadId).not.toBe(originalWorkerId)
})