Skip to content

Commit

Permalink
feat(core): Shut down runner on idle timeout (no-changelog) (#11820)
Browse files Browse the repository at this point in the history
  • Loading branch information
ivov authored Nov 22, 2024
1 parent cd3598a commit 0cbb46c
Show file tree
Hide file tree
Showing 6 changed files with 147 additions and 5 deletions.
8 changes: 8 additions & 0 deletions packages/@n8n/task-runner/src/config/base-runner-config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,14 @@ export class BaseRunnerConfig {
@Env('N8N_RUNNERS_MAX_CONCURRENCY')
maxConcurrency: number = 5;

/**
* How long (in seconds) a runner may be idle for before exit. Intended
* for use in `external` mode - launcher must pass the env var when launching
* the runner. Disabled with `0` on `internal` mode.
*/
@Env('N8N_RUNNERS_AUTO_SHUTDOWN_TIMEOUT')
idleTimeout: number = 0;

@Nested
healthcheckServer!: HealthcheckServerConfig;
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import type { CodeExecutionMode, IDataObject } from 'n8n-workflow';
import fs from 'node:fs';
import { builtinModules } from 'node:module';

import type { BaseRunnerConfig } from '@/config/base-runner-config';
import type { JsRunnerConfig } from '@/config/js-runner-config';
import { MainConfig } from '@/config/main-config';
import { ExecutionError } from '@/js-task-runner/errors/execution-error';
Expand All @@ -24,17 +25,21 @@ jest.mock('ws');
const defaultConfig = new MainConfig();

describe('JsTaskRunner', () => {
const createRunnerWithOpts = (opts: Partial<JsRunnerConfig> = {}) =>
const createRunnerWithOpts = (
jsRunnerOpts: Partial<JsRunnerConfig> = {},
baseRunnerOpts: Partial<BaseRunnerConfig> = {},
) =>
new JsTaskRunner({
baseRunnerConfig: {
...defaultConfig.baseRunnerConfig,
grantToken: 'grantToken',
maxConcurrency: 1,
n8nUri: 'localhost',
...baseRunnerOpts,
},
jsRunnerConfig: {
...defaultConfig.jsRunnerConfig,
...opts,
...jsRunnerOpts,
},
sentryConfig: {
sentryDsn: '',
Expand Down Expand Up @@ -825,4 +830,100 @@ describe('JsTaskRunner', () => {
});
});
});

describe('idle timeout', () => {
beforeEach(() => {
jest.useFakeTimers();
});

afterEach(() => {
jest.useRealTimers();
});

it('should set idle timer when instantiated', () => {
const idleTimeout = 5;
const runner = createRunnerWithOpts({}, { idleTimeout });
const emitSpy = jest.spyOn(runner, 'emit');

jest.advanceTimersByTime(idleTimeout * 1000 - 100);
expect(emitSpy).not.toHaveBeenCalledWith('runner:reached-idle-timeout');

jest.advanceTimersByTime(idleTimeout * 1000);
expect(emitSpy).toHaveBeenCalledWith('runner:reached-idle-timeout');
});

it('should reset idle timer when accepting a task', () => {
const idleTimeout = 5;
const runner = createRunnerWithOpts({}, { idleTimeout });
const taskId = '123';
const offerId = 'offer123';
const emitSpy = jest.spyOn(runner, 'emit');

jest.advanceTimersByTime(idleTimeout * 1000 - 100);
expect(emitSpy).not.toHaveBeenCalledWith('runner:reached-idle-timeout');

runner.openOffers.set(offerId, {
offerId,
validUntil: process.hrtime.bigint() + BigInt(idleTimeout * 1000 * 1_000_000),
});
runner.offerAccepted(offerId, taskId);

jest.advanceTimersByTime(200);
expect(emitSpy).not.toHaveBeenCalledWith('runner:reached-idle-timeout'); // because timer was reset

runner.runningTasks.clear();

jest.advanceTimersByTime(idleTimeout * 1000);
expect(emitSpy).toHaveBeenCalledWith('runner:reached-idle-timeout');
});

it('should reset idle timer when finishing a task', async () => {
const idleTimeout = 5;
const runner = createRunnerWithOpts({}, { idleTimeout });
const taskId = '123';
const emitSpy = jest.spyOn(runner, 'emit');
jest.spyOn(runner, 'executeTask').mockResolvedValue({ result: [] });

runner.runningTasks.set(taskId, {
taskId,
active: true,
cancelled: false,
});

jest.advanceTimersByTime(idleTimeout * 1000 - 100);
expect(emitSpy).not.toHaveBeenCalledWith('runner:reached-idle-timeout');

await runner.receivedSettings(taskId, {});

jest.advanceTimersByTime(200);
expect(emitSpy).not.toHaveBeenCalledWith('runner:reached-idle-timeout'); // because timer was reset

jest.advanceTimersByTime(idleTimeout * 1000);
expect(emitSpy).toHaveBeenCalledWith('runner:reached-idle-timeout');
});

it('should never reach idle timeout if idle timeout is set to 0', () => {
const runner = createRunnerWithOpts({}, { idleTimeout: 0 });
const emitSpy = jest.spyOn(runner, 'emit');

jest.advanceTimersByTime(999999);
expect(emitSpy).not.toHaveBeenCalledWith('runner:reached-idle-timeout');
});

it('should not reach idle timeout if there are running tasks', () => {
const idleTimeout = 5;
const runner = createRunnerWithOpts({}, { idleTimeout });
const taskId = '123';
const emitSpy = jest.spyOn(runner, 'emit');

runner.runningTasks.set(taskId, {
taskId,
active: true,
cancelled: false,
});

jest.advanceTimersByTime(idleTimeout * 1000);
expect(emitSpy).not.toHaveBeenCalledWith('runner:reached-idle-timeout');
});
});
});
3 changes: 3 additions & 0 deletions packages/@n8n/task-runner/src/start.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,9 @@ void (async function start() {
}

runner = new JsTaskRunner(config);
runner.on('runner:reached-idle-timeout', () => {
void createSignalHandler('IDLE_TIMEOUT')();
});

const { enabled, host, port } = config.baseRunnerConfig.healthcheckServer;

Expand Down
31 changes: 30 additions & 1 deletion packages/@n8n/task-runner/src/task-runner.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { ApplicationError, ensureError } from 'n8n-workflow';
import { nanoid } from 'nanoid';
import { EventEmitter } from 'node:events';
import { type MessageEvent, WebSocket } from 'ws';

import type { BaseRunnerConfig } from '@/config/base-runner-config';
Expand Down Expand Up @@ -49,7 +50,7 @@ export interface TaskRunnerOpts extends BaseRunnerConfig {
name?: string;
}

export abstract class TaskRunner {
export abstract class TaskRunner extends EventEmitter {
id: string = nanoid();

ws: WebSocket;
Expand All @@ -76,10 +77,17 @@ export abstract class TaskRunner {

name: string;

private idleTimer: NodeJS.Timeout | undefined;

/** How long (in seconds) a runner may be idle for before exit. */
private readonly idleTimeout: number;

constructor(opts: TaskRunnerOpts) {
super();
this.taskType = opts.taskType;
this.name = opts.name ?? 'Node.js Task Runner SDK';
this.maxConcurrency = opts.maxConcurrency;
this.idleTimeout = opts.idleTimeout;

const wsUrl = `ws://${opts.n8nUri}/runners/_ws?id=${this.id}`;
this.ws = new WebSocket(wsUrl, {
Expand Down Expand Up @@ -108,6 +116,17 @@ export abstract class TaskRunner {
});
this.ws.addEventListener('message', this.receiveMessage);
this.ws.addEventListener('close', this.stopTaskOffers);
this.resetIdleTimer();
}

private resetIdleTimer() {
if (this.idleTimeout === 0) return;

this.clearIdleTimer();

this.idleTimer = setTimeout(() => {
if (this.runningTasks.size === 0) this.emit('runner:reached-idle-timeout');
}, this.idleTimeout * 1000);
}

private receiveMessage = (message: MessageEvent) => {
Expand Down Expand Up @@ -244,6 +263,7 @@ export abstract class TaskRunner {
this.openOffers.delete(offerId);
}

this.resetIdleTimer();
this.runningTasks.set(taskId, {
taskId,
active: false,
Expand Down Expand Up @@ -306,6 +326,8 @@ export abstract class TaskRunner {
this.taskDone(taskId, data);
} catch (error) {
this.taskErrored(taskId, error);
} finally {
this.resetIdleTimer();
}
}

Expand Down Expand Up @@ -432,13 +454,20 @@ export abstract class TaskRunner {

/** Close the connection gracefully and wait until has been closed */
async stop() {
this.clearIdleTimer();

this.stopTaskOffers();

await this.waitUntilAllTasksAreDone();

await this.closeConnection();
}

clearIdleTimer() {
if (this.idleTimer) clearTimeout(this.idleTimer);
this.idleTimer = undefined;
}

private async closeConnection() {
// 1000 is the standard close code
// https://www.rfc-editor.org/rfc/rfc6455.html#section-7.1.5
Expand Down
3 changes: 2 additions & 1 deletion packages/cli/src/runners/runner-ws-server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ export class TaskRunnerWsServer {
this.sendMessage.bind(this, id) as MessageCallback,
);

this.logger.info(`Runner "${message.name}" (${id}) has been registered`);
this.logger.info(`Registered runner "${message.name}" (${id}) `);
return;
}

Expand Down Expand Up @@ -166,6 +166,7 @@ export class TaskRunnerWsServer {
heartbeatInterval: this.taskTunnersConfig.heartbeatInterval,
});
this.taskBroker.deregisterRunner(id, disconnectError);
this.logger.debug(`Deregistered runner "${id}"`);
connection.close(code);
this.runnerConnections.delete(id);
}
Expand Down
2 changes: 1 addition & 1 deletion packages/cli/src/runners/task-broker.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -525,7 +525,7 @@ export class TaskBroker {
return;
}
if (e instanceof TaskDeferredError) {
this.logger.info(`Task (${taskId}) deferred until runner is ready`);
this.logger.debug(`Task (${taskId}) deferred until runner is ready`);
this.pendingTaskRequests.push(request); // will settle on receiving task offer from runner
return;
}
Expand Down

0 comments on commit 0cbb46c

Please sign in to comment.