diff --git a/packages/@n8n/task-runner/src/config/base-runner-config.ts b/packages/@n8n/task-runner/src/config/base-runner-config.ts index e7949d9704214..22051450046a8 100644 --- a/packages/@n8n/task-runner/src/config/base-runner-config.ts +++ b/packages/@n8n/task-runner/src/config/base-runner-config.ts @@ -26,6 +26,14 @@ export class BaseRunnerConfig { @Env('N8N_RUNNERS_MAX_CONCURRENCY') maxConcurrency: number = 5; + /** + * How long (in seconds) a runner may be idle for before exit. Intended + * for use in `external` mode - launcher must pass the env var when launching + * the runner. Disabled with `0` on `internal` mode. + */ + @Env('N8N_RUNNERS_AUTO_SHUTDOWN_TIMEOUT') + idleTimeout: number = 0; + @Nested healthcheckServer!: HealthcheckServerConfig; } diff --git a/packages/@n8n/task-runner/src/js-task-runner/__tests__/js-task-runner.test.ts b/packages/@n8n/task-runner/src/js-task-runner/__tests__/js-task-runner.test.ts index cd966ef8ac4c6..736bcf16e7367 100644 --- a/packages/@n8n/task-runner/src/js-task-runner/__tests__/js-task-runner.test.ts +++ b/packages/@n8n/task-runner/src/js-task-runner/__tests__/js-task-runner.test.ts @@ -3,6 +3,7 @@ import type { CodeExecutionMode, IDataObject } from 'n8n-workflow'; import fs from 'node:fs'; import { builtinModules } from 'node:module'; +import type { BaseRunnerConfig } from '@/config/base-runner-config'; import type { JsRunnerConfig } from '@/config/js-runner-config'; import { MainConfig } from '@/config/main-config'; import { ExecutionError } from '@/js-task-runner/errors/execution-error'; @@ -24,17 +25,21 @@ jest.mock('ws'); const defaultConfig = new MainConfig(); describe('JsTaskRunner', () => { - const createRunnerWithOpts = (opts: Partial = {}) => + const createRunnerWithOpts = ( + jsRunnerOpts: Partial = {}, + baseRunnerOpts: Partial = {}, + ) => new JsTaskRunner({ baseRunnerConfig: { ...defaultConfig.baseRunnerConfig, grantToken: 'grantToken', maxConcurrency: 1, n8nUri: 'localhost', + ...baseRunnerOpts, }, jsRunnerConfig: { ...defaultConfig.jsRunnerConfig, - ...opts, + ...jsRunnerOpts, }, sentryConfig: { sentryDsn: '', @@ -825,4 +830,100 @@ describe('JsTaskRunner', () => { }); }); }); + + describe('idle timeout', () => { + beforeEach(() => { + jest.useFakeTimers(); + }); + + afterEach(() => { + jest.useRealTimers(); + }); + + it('should set idle timer when instantiated', () => { + const idleTimeout = 5; + const runner = createRunnerWithOpts({}, { idleTimeout }); + const emitSpy = jest.spyOn(runner, 'emit'); + + jest.advanceTimersByTime(idleTimeout * 1000 - 100); + expect(emitSpy).not.toHaveBeenCalledWith('runner:reached-idle-timeout'); + + jest.advanceTimersByTime(idleTimeout * 1000); + expect(emitSpy).toHaveBeenCalledWith('runner:reached-idle-timeout'); + }); + + it('should reset idle timer when accepting a task', () => { + const idleTimeout = 5; + const runner = createRunnerWithOpts({}, { idleTimeout }); + const taskId = '123'; + const offerId = 'offer123'; + const emitSpy = jest.spyOn(runner, 'emit'); + + jest.advanceTimersByTime(idleTimeout * 1000 - 100); + expect(emitSpy).not.toHaveBeenCalledWith('runner:reached-idle-timeout'); + + runner.openOffers.set(offerId, { + offerId, + validUntil: process.hrtime.bigint() + BigInt(idleTimeout * 1000 * 1_000_000), + }); + runner.offerAccepted(offerId, taskId); + + jest.advanceTimersByTime(200); + expect(emitSpy).not.toHaveBeenCalledWith('runner:reached-idle-timeout'); // because timer was reset + + runner.runningTasks.clear(); + + jest.advanceTimersByTime(idleTimeout * 1000); + expect(emitSpy).toHaveBeenCalledWith('runner:reached-idle-timeout'); + }); + + it('should reset idle timer when finishing a task', async () => { + const idleTimeout = 5; + const runner = createRunnerWithOpts({}, { idleTimeout }); + const taskId = '123'; + const emitSpy = jest.spyOn(runner, 'emit'); + jest.spyOn(runner, 'executeTask').mockResolvedValue({ result: [] }); + + runner.runningTasks.set(taskId, { + taskId, + active: true, + cancelled: false, + }); + + jest.advanceTimersByTime(idleTimeout * 1000 - 100); + expect(emitSpy).not.toHaveBeenCalledWith('runner:reached-idle-timeout'); + + await runner.receivedSettings(taskId, {}); + + jest.advanceTimersByTime(200); + expect(emitSpy).not.toHaveBeenCalledWith('runner:reached-idle-timeout'); // because timer was reset + + jest.advanceTimersByTime(idleTimeout * 1000); + expect(emitSpy).toHaveBeenCalledWith('runner:reached-idle-timeout'); + }); + + it('should never reach idle timeout if idle timeout is set to 0', () => { + const runner = createRunnerWithOpts({}, { idleTimeout: 0 }); + const emitSpy = jest.spyOn(runner, 'emit'); + + jest.advanceTimersByTime(999999); + expect(emitSpy).not.toHaveBeenCalledWith('runner:reached-idle-timeout'); + }); + + it('should not reach idle timeout if there are running tasks', () => { + const idleTimeout = 5; + const runner = createRunnerWithOpts({}, { idleTimeout }); + const taskId = '123'; + const emitSpy = jest.spyOn(runner, 'emit'); + + runner.runningTasks.set(taskId, { + taskId, + active: true, + cancelled: false, + }); + + jest.advanceTimersByTime(idleTimeout * 1000); + expect(emitSpy).not.toHaveBeenCalledWith('runner:reached-idle-timeout'); + }); + }); }); diff --git a/packages/@n8n/task-runner/src/start.ts b/packages/@n8n/task-runner/src/start.ts index e09ddf33321df..005cbfc840e8d 100644 --- a/packages/@n8n/task-runner/src/start.ts +++ b/packages/@n8n/task-runner/src/start.ts @@ -51,6 +51,9 @@ void (async function start() { } runner = new JsTaskRunner(config); + runner.on('runner:reached-idle-timeout', () => { + void createSignalHandler('IDLE_TIMEOUT')(); + }); const { enabled, host, port } = config.baseRunnerConfig.healthcheckServer; diff --git a/packages/@n8n/task-runner/src/task-runner.ts b/packages/@n8n/task-runner/src/task-runner.ts index d1bf8a6d71f4e..8af8aeeb0836e 100644 --- a/packages/@n8n/task-runner/src/task-runner.ts +++ b/packages/@n8n/task-runner/src/task-runner.ts @@ -1,5 +1,6 @@ import { ApplicationError, ensureError } from 'n8n-workflow'; import { nanoid } from 'nanoid'; +import { EventEmitter } from 'node:events'; import { type MessageEvent, WebSocket } from 'ws'; import type { BaseRunnerConfig } from '@/config/base-runner-config'; @@ -49,7 +50,7 @@ export interface TaskRunnerOpts extends BaseRunnerConfig { name?: string; } -export abstract class TaskRunner { +export abstract class TaskRunner extends EventEmitter { id: string = nanoid(); ws: WebSocket; @@ -76,10 +77,17 @@ export abstract class TaskRunner { name: string; + private idleTimer: NodeJS.Timeout | undefined; + + /** How long (in seconds) a runner may be idle for before exit. */ + private readonly idleTimeout: number; + constructor(opts: TaskRunnerOpts) { + super(); this.taskType = opts.taskType; this.name = opts.name ?? 'Node.js Task Runner SDK'; this.maxConcurrency = opts.maxConcurrency; + this.idleTimeout = opts.idleTimeout; const wsUrl = `ws://${opts.n8nUri}/runners/_ws?id=${this.id}`; this.ws = new WebSocket(wsUrl, { @@ -108,6 +116,17 @@ export abstract class TaskRunner { }); this.ws.addEventListener('message', this.receiveMessage); this.ws.addEventListener('close', this.stopTaskOffers); + this.resetIdleTimer(); + } + + private resetIdleTimer() { + if (this.idleTimeout === 0) return; + + this.clearIdleTimer(); + + this.idleTimer = setTimeout(() => { + if (this.runningTasks.size === 0) this.emit('runner:reached-idle-timeout'); + }, this.idleTimeout * 1000); } private receiveMessage = (message: MessageEvent) => { @@ -244,6 +263,7 @@ export abstract class TaskRunner { this.openOffers.delete(offerId); } + this.resetIdleTimer(); this.runningTasks.set(taskId, { taskId, active: false, @@ -306,6 +326,8 @@ export abstract class TaskRunner { this.taskDone(taskId, data); } catch (error) { this.taskErrored(taskId, error); + } finally { + this.resetIdleTimer(); } } @@ -432,6 +454,8 @@ export abstract class TaskRunner { /** Close the connection gracefully and wait until has been closed */ async stop() { + this.clearIdleTimer(); + this.stopTaskOffers(); await this.waitUntilAllTasksAreDone(); @@ -439,6 +463,11 @@ export abstract class TaskRunner { await this.closeConnection(); } + clearIdleTimer() { + if (this.idleTimer) clearTimeout(this.idleTimer); + this.idleTimer = undefined; + } + private async closeConnection() { // 1000 is the standard close code // https://www.rfc-editor.org/rfc/rfc6455.html#section-7.1.5 diff --git a/packages/cli/src/runners/runner-ws-server.ts b/packages/cli/src/runners/runner-ws-server.ts index 195490589d4d7..8de395d971e66 100644 --- a/packages/cli/src/runners/runner-ws-server.ts +++ b/packages/cli/src/runners/runner-ws-server.ts @@ -126,7 +126,7 @@ export class TaskRunnerWsServer { this.sendMessage.bind(this, id) as MessageCallback, ); - this.logger.info(`Runner "${message.name}" (${id}) has been registered`); + this.logger.info(`Registered runner "${message.name}" (${id}) `); return; } @@ -166,6 +166,7 @@ export class TaskRunnerWsServer { heartbeatInterval: this.taskTunnersConfig.heartbeatInterval, }); this.taskBroker.deregisterRunner(id, disconnectError); + this.logger.debug(`Deregistered runner "${id}"`); connection.close(code); this.runnerConnections.delete(id); } diff --git a/packages/cli/src/runners/task-broker.service.ts b/packages/cli/src/runners/task-broker.service.ts index af76fb6cac3c5..80e918b47a718 100644 --- a/packages/cli/src/runners/task-broker.service.ts +++ b/packages/cli/src/runners/task-broker.service.ts @@ -525,7 +525,7 @@ export class TaskBroker { return; } if (e instanceof TaskDeferredError) { - this.logger.info(`Task (${taskId}) deferred until runner is ready`); + this.logger.debug(`Task (${taskId}) deferred until runner is ready`); this.pendingTaskRequests.push(request); // will settle on receiving task offer from runner return; }