diff --git a/README.md b/README.md
index 7b7a6b5..83dc022 100644
--- a/README.md
+++ b/README.md
@@ -46,8 +46,9 @@ We have a similar API to Piscina, so for more information, you can read Piscina'
 
 #### Pool constructor options
 
-- `isolateWorkers`: Default to `false`. Always starts with a fresh worker when running tasks to isolate the environment.
-- `terminateTimeout`: Defaults to `null`. If terminating a worker takes `terminateTimeout` amount of milliseconds to execute, an error is raised.
+- `isolateWorkers`: Disabled by default. Always starts with a fresh worker when running tasks to isolate the environment.
+- `terminateTimeout`: Disabled by default. If terminating a worker takes longer than `terminateTimeout` milliseconds, an error is raised.
+- `maxMemoryLimitBeforeRecycle`: Disabled by default. When defined, the worker's heap memory usage is compared against this value after a task has finished. If the current memory usage exceeds this limit, the worker is terminated and a new one is started to take its place. This option is useful when your tasks leak memory and you don't want to enable the `isolateWorkers` option.
 
 #### Pool methods
 
diff --git a/src/common.ts b/src/common.ts
index 13cc6d2..b731b4e 100644
--- a/src/common.ts
+++ b/src/common.ts
@@ -23,6 +23,7 @@ export interface ResponseMessage {
   taskId: number
   result: any
   error: unknown | null
+  usedMemory: number
 }
 
 export interface TinypoolPrivateData {
diff --git a/src/index.ts b/src/index.ts
index 03a77f9..dda66cd 100644
--- a/src/index.ts
+++ b/src/index.ts
@@ -144,6 +144,7 @@ interface Options {
   concurrentTasksPerWorker?: number
   useAtomics?: boolean
   resourceLimits?: ResourceLimits
+  maxMemoryLimitBeforeRecycle?: number
   argv?: string[]
   execArgv?: string[]
   env?: EnvSpecifier
@@ -443,6 +444,7 @@ class WorkerInfo extends AsynchronouslyCreatedResource {
   port: MessagePort
   sharedBuffer: Int32Array
   lastSeenResponseCount: number = 0
+  usedMemory?: number
   onMessage: ResponseCallback
 
   constructor(
@@ -521,6 +523,7 @@ class WorkerInfo extends AsynchronouslyCreatedResource {
   }
 
   _handleResponse(message: ResponseMessage): void {
+    this.usedMemory = message.usedMemory
     this.onMessage(message)
 
     if (this.taskInfos.size === 0) {
@@ -892,8 +895,18 @@ class ThreadPool {
       }
 
       // When `isolateWorkers` is enabled, remove the worker after task is finished
-      if (this.options.isolateWorkers && taskInfo.workerInfo) {
-        this._removeWorker(taskInfo.workerInfo)
+      const shouldIsolateWorker =
+        this.options.isolateWorkers && taskInfo.workerInfo
+
+      // When `maxMemoryLimitBeforeRecycle` is enabled, remove workers that have exceeded the memory limit
+      const shouldRecycleWorker =
+        !this.options.isolateWorkers &&
+        this.options.maxMemoryLimitBeforeRecycle !== undefined &&
+        (taskInfo.workerInfo?.usedMemory || 0) >
+          this.options.maxMemoryLimitBeforeRecycle
+
+      if (shouldIsolateWorker || shouldRecycleWorker) {
+        this._removeWorker(taskInfo.workerInfo!)
          .then(() => this._ensureMinimumWorkers())
          .then(() => this._ensureEnoughWorkersForTaskQueue())
          .then(() => resolve(result))
diff --git a/src/worker.ts b/src/worker.ts
index ad8d747..1b6f481 100644
--- a/src/worker.ts
+++ b/src/worker.ts
@@ -172,6 +172,7 @@ function onMessage(
       taskId,
       result: result,
       error: null,
+      usedMemory: process.memoryUsage().heapUsed,
     }
 
     // If the task used e.g. console.log(), wait for the stream to drain
@@ -191,6 +192,7 @@ function onMessage(
       // It may be worth taking a look at the error cloning algorithm we
      // use in Node.js core here, it's quite a bit more flexible
      error,
+      usedMemory: process.memoryUsage().heapUsed,
    }
  }
  currentTasks--
diff --git a/test/fixtures/leak-memory.js b/test/fixtures/leak-memory.js
new file mode 100644
index 0000000..7f6b1a7
--- /dev/null
+++ b/test/fixtures/leak-memory.js
@@ -0,0 +1,17 @@
+export let leaks = []
+
+/**
+ * Leak some memory to test `maxMemoryLimitBeforeRecycle`. Allocates `bytes`
+ * buffers of 1 KiB each, so roughly `bytes * 1024` bytes leak. Not exact, but close enough.
+ */
+export default function run(bytes) {
+  const before = process.memoryUsage().heapUsed
+
+  for (let i = 0; i < bytes; i++) {
+    leaks.push(new SharedArrayBuffer(1024))
+  }
+  const after = process.memoryUsage().heapUsed
+  const diff = after - before
+
+  console.log(`Leaked: ${diff}. Heap used: ${process.memoryUsage().heapUsed}`)
+}
diff --git a/test/resource-limits.test.ts b/test/resource-limits.test.ts
index 81749ec..84360e1 100644
--- a/test/resource-limits.test.ts
+++ b/test/resource-limits.test.ts
@@ -8,7 +8,6 @@ test('resourceLimits causes task to reject', async () => {
   const worker = new Tinypool({
     filename: resolve(__dirname, 'fixtures/resource-limits.js'),
     resourceLimits: {
-      maxOldGenerationSizeMb: 4,
       maxYoungGenerationSizeMb: 2,
       codeRangeSizeMb: 4,
@@ -36,3 +35,42 @@ test('resourceLimits causes task to reject', async () => {
     /Worker terminated due to reaching memory limit: JS heap out of memory/
   )
 })
+
+test('worker is recycled after reaching maxMemoryLimitBeforeRecycle', async () => {
+  const pool = new Tinypool({
+    filename: resolve(__dirname, 'fixtures/leak-memory.js'),
+    maxMemoryLimitBeforeRecycle: 10_000_000,
+    isolateWorkers: false,
+    minThreads: 1,
+    maxThreads: 1,
+  })
+
+  const originalWorkerId = pool.threads[0]?.threadId
+  expect(originalWorkerId).toBeGreaterThan(0)
+
+  let finalThreadId = originalWorkerId
+  let rounds = 0
+
+  // Leaking memory like this is only an estimate - the exact heap growth is not deterministic.
+  // Running 100 rounds should be more than enough for the worker to exceed the memory limit and be recycled.
+  // Use `rounds` to verify that the limit is not reached on the very first rounds.
+  for (let i = 0; i < 100; i++) {
+    await pool.run(10_000)
+
+    if (pool.threads[0]) {
+      finalThreadId = pool.threads[0].threadId
+    }
+
+    if (finalThreadId !== originalWorkerId) {
+      break
+    }
+
+    rounds++
+  }
+
+  // The test setup should not reach the memory limit on the first rounds
+  expect(rounds).toBeGreaterThan(1)
+
+  // Thread should have been recycled
+  expect(finalThreadId).not.toBe(originalWorkerId)
+})
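
For reviewers, here is a minimal usage sketch of the new option. This is not part of the diff: the fixture path and the 10 MB threshold are example values, and `pool.threads` is the same accessor the test above uses to observe worker identity.

```ts
import { resolve } from 'path'
import Tinypool from 'tinypool'

const pool = new Tinypool({
  // Any task file that keeps references alive between runs works here;
  // test/fixtures/leak-memory.js from this diff is one example.
  filename: resolve(__dirname, 'fixtures/leak-memory.js'),
  // Recycle a worker once its heap usage after a task exceeds ~10 MB.
  maxMemoryLimitBeforeRecycle: 10 * 1024 * 1024,
})

const before = pool.threads[0]?.threadId
await pool.run(10_000)
const after = pool.threads[0]?.threadId

// If the task pushed the worker's heap over the limit, the old worker was
// terminated after responding, and `after` points at a fresh worker.
console.log(before !== after ? 'worker was recycled' : 'worker was kept')
```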
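The recycling decision itself reduces to a small predicate. The sketch below is a simplified restatement of the check added to `ThreadPool` in `src/index.ts`; the standalone function and its parameter names are illustrative, since the real code reads these values from `this.options` and `taskInfo.workerInfo`.

```ts
// `usedMemory` is the `process.memoryUsage().heapUsed` snapshot that the
// worker attaches to every ResponseMessage once it finishes a task.
function shouldRecycleWorker(
  usedMemory: number | undefined,
  maxMemoryLimitBeforeRecycle: number | undefined,
  isolateWorkers: boolean
): boolean {
  // `isolateWorkers` already replaces the worker after every task, so the
  // memory check only applies when isolation is disabled.
  if (isolateWorkers || maxMemoryLimitBeforeRecycle === undefined) {
    return false
  }
  return (usedMemory ?? 0) > maxMemoryLimitBeforeRecycle
}
```

Note that the comparison runs on the pool side, after the response arrives, so a worker can still overshoot the limit during a task; the option bounds how long a leaky worker survives, not its peak memory use.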