diff --git a/packages/cli/src/Queue.ts b/packages/cli/src/Queue.ts index b96143dafab37..1618485b68650 100644 --- a/packages/cli/src/Queue.ts +++ b/packages/cli/src/Queue.ts @@ -101,6 +101,15 @@ export class Queue { return await this.jobQueue.getJobs(jobTypes); } + /** + * Get IDs of executions that are currently in progress in the queue. + */ + async getInProgressExecutionIds() { + const inProgressJobs = await this.getJobs(['active', 'waiting']); + + return new Set(inProgressJobs.map((job) => job.data.executionId)); + } + async process(concurrency: number, fn: Bull.ProcessCallbackFunction): Promise { return await this.jobQueue.process(concurrency, fn); } diff --git a/packages/cli/src/commands/start.ts b/packages/cli/src/commands/start.ts index 1fc3690d470d4..8b58549c0dfb9 100644 --- a/packages/cli/src/commands/start.ts +++ b/packages/cli/src/commands/start.ts @@ -31,6 +31,7 @@ import type { IWorkflowExecutionDataProcess } from '@/Interfaces'; import { ExecutionService } from '@/executions/execution.service'; import { OwnershipService } from '@/services/ownership.service'; import { WorkflowRunner } from '@/WorkflowRunner'; +import { ExecutionRecoveryService } from '@/executions/execution-recovery.service'; // eslint-disable-next-line @typescript-eslint/no-unsafe-assignment, @typescript-eslint/no-var-requires const open = require('open'); @@ -65,8 +66,6 @@ export class Start extends BaseCommand { protected server = Container.get(Server); - private pruningService: PruningService; - constructor(argv: string[], cmdConfig: Config) { super(argv, cmdConfig); this.setInstanceType('main'); @@ -294,6 +293,7 @@ export class Start extends BaseCommand { await this.server.start(); Container.get(PruningService).init(); + Container.get(ExecutionRecoveryService).init(); if (config.getEnv('executions.mode') === 'regular') { await this.runEnqueuedExecutions(); diff --git a/packages/cli/src/config/schema.ts b/packages/cli/src/config/schema.ts index 69b29372da338..730d2792125ab 100644 --- a/packages/cli/src/config/schema.ts +++ b/packages/cli/src/config/schema.ts @@ -371,6 +371,21 @@ export const schema = { default: 10000, env: 'EXECUTIONS_DATA_PRUNE_MAX_COUNT', }, + + queueRecovery: { + interval: { + doc: 'How often (minutes) to check for queue recovery', + format: Number, + default: 180, + env: 'N8N_EXECUTIONS_QUEUE_RECOVERY_INTERVAL', + }, + batchSize: { + doc: 'Size of batch of executions to check for queue recovery', + format: Number, + default: 100, + env: 'N8N_EXECUTIONS_QUEUE_RECOVERY_BATCH', + }, + }, }, queue: { diff --git a/packages/cli/src/databases/repositories/execution.repository.ts b/packages/cli/src/databases/repositories/execution.repository.ts index 57bdcbc802580..03a2a8ae82e73 100644 --- a/packages/cli/src/databases/repositories/execution.repository.ts +++ b/packages/cli/src/databases/repositories/execution.repository.ts @@ -275,9 +275,7 @@ export class ExecutionRepository extends Repository { }, ); - this.logger.info('[Execution Recovery] Marked executions as `crashed`', { - executionIds, - }); + this.logger.info('Marked executions as `crashed`', { executionIds }); } /** @@ -773,4 +771,18 @@ export class ExecutionRepository extends Repository { return executions.map(({ id }) => id); } + + /** + * Retrieve a batch of execution IDs with `new` or `running` status, in most recent order. + */ + async getInProgressExecutionIds(batchSize: number) { + const executions = await this.find({ + select: ['id'], + where: { status: In(['new', 'running']) }, + order: { startedAt: 'DESC' }, + take: batchSize, + }); + + return executions.map(({ id }) => id); + } } diff --git a/packages/cli/src/executions/__tests__/execution-recovery.service.test.ts b/packages/cli/src/executions/__tests__/execution-recovery.service.test.ts index 6508a7812911c..36597bfe9854e 100644 --- a/packages/cli/src/executions/__tests__/execution-recovery.service.test.ts +++ b/packages/cli/src/executions/__tests__/execution-recovery.service.test.ts @@ -7,9 +7,12 @@ import { createWorkflow } from '@test-integration/db/workflows'; import { createExecution } from '@test-integration/db/executions'; import * as testDb from '@test-integration/testDb'; +import { NodeConnectionType } from 'n8n-workflow'; +import { mock } from 'jest-mock-extended'; +import { OrchestrationService } from '@/services/orchestration.service'; +import config from '@/config'; import { ExecutionRecoveryService } from '@/executions/execution-recovery.service'; import { ExecutionRepository } from '@/databases/repositories/execution.repository'; - import { InternalHooks } from '@/InternalHooks'; import { Push } from '@/push'; import { ARTIFICIAL_TASK_DATA } from '@/constants'; @@ -17,11 +20,10 @@ import { NodeCrashedError } from '@/errors/node-crashed.error'; import { WorkflowCrashedError } from '@/errors/workflow-crashed.error'; import { EventMessageNode } from '@/eventbus/EventMessageClasses/EventMessageNode'; import { EventMessageWorkflow } from '@/eventbus/EventMessageClasses/EventMessageWorkflow'; + import type { EventMessageTypes as EventMessage } from '@/eventbus/EventMessageClasses'; import type { WorkflowEntity } from '@/databases/entities/WorkflowEntity'; -import { NodeConnectionType } from 'n8n-workflow'; -import { OrchestrationService } from '@/services/orchestration.service'; -import config from '@/config'; +import type { Logger } from '@/Logger'; /** * Workflow producing an execution whose data will be truncated by an instance crash. @@ -174,20 +176,20 @@ export const setupMessages = (executionId: string, workflowName: string): EventM }; describe('ExecutionRecoveryService', () => { - let executionRecoveryService: ExecutionRecoveryService; let push: Push; - let executionRepository: ExecutionRepository; + let executionRecoveryService: ExecutionRecoveryService; let orchestrationService: OrchestrationService; + let executionRepository: ExecutionRepository; beforeAll(async () => { await testDb.init(); - - mockInstance(InternalHooks); push = mockInstance(Push); executionRepository = Container.get(ExecutionRepository); orchestrationService = Container.get(OrchestrationService); + mockInstance(InternalHooks); executionRecoveryService = new ExecutionRecoveryService( + mock(), push, executionRepository, orchestrationService, @@ -199,13 +201,78 @@ describe('ExecutionRecoveryService', () => { }); afterEach(async () => { + config.load(config.default); + jest.restoreAllMocks(); await testDb.truncate(['Execution', 'ExecutionData', 'Workflow']); + executionRecoveryService.shutdown(); }); afterAll(async () => { await testDb.terminate(); }); + describe('scheduleQueueRecovery', () => { + describe('queue mode', () => { + it('if leader, should schedule queue recovery', () => { + /** + * Arrange + */ + config.set('executions.mode', 'queue'); + jest.spyOn(orchestrationService, 'isLeader', 'get').mockReturnValue(true); + const scheduleSpy = jest.spyOn(executionRecoveryService, 'scheduleQueueRecovery'); + + /** + * Act + */ + executionRecoveryService.init(); + + /** + * Assert + */ + expect(scheduleSpy).toHaveBeenCalled(); + }); + + it('if follower, should do nothing', () => { + /** + * Arrange + */ + config.set('executions.mode', 'queue'); + jest.spyOn(orchestrationService, 'isLeader', 'get').mockReturnValue(false); + const scheduleSpy = jest.spyOn(executionRecoveryService, 'scheduleQueueRecovery'); + + /** + * Act + */ + executionRecoveryService.init(); + + /** + * Assert + */ + expect(scheduleSpy).not.toHaveBeenCalled(); + }); + }); + + describe('regular mode', () => { + it('should do nothing', () => { + /** + * Arrange + */ + config.set('executions.mode', 'regular'); + const scheduleSpy = jest.spyOn(executionRecoveryService, 'scheduleQueueRecovery'); + + /** + * Act + */ + executionRecoveryService.init(); + + /** + * Assert + */ + expect(scheduleSpy).not.toHaveBeenCalled(); + }); + }); + }); + describe('recoverFromLogs', () => { describe('if follower', () => { test('should do nothing', async () => { diff --git a/packages/cli/src/executions/execution-recovery.service.ts b/packages/cli/src/executions/execution-recovery.service.ts index 935f33e8d871d..a6ea90e55e274 100644 --- a/packages/cli/src/executions/execution-recovery.service.ts +++ b/packages/cli/src/executions/execution-recovery.service.ts @@ -1,6 +1,6 @@ import Container, { Service } from 'typedi'; import { Push } from '@/push'; -import { sleep } from 'n8n-workflow'; +import { jsonStringify, sleep } from 'n8n-workflow'; import { ExecutionRepository } from '@db/repositories/execution.repository'; import { getWorkflowHooksMain } from '@/WorkflowExecuteAdditionalData'; // @TODO: Dependency cycle import { InternalHooks } from '@/InternalHooks'; // @TODO: Dependency cycle if injected @@ -11,6 +11,10 @@ import type { IExecutionResponse } from '@/Interfaces'; import { NodeCrashedError } from '@/errors/node-crashed.error'; import { WorkflowCrashedError } from '@/errors/workflow-crashed.error'; import { ARTIFICIAL_TASK_DATA } from '@/constants'; +import { Logger } from '@/Logger'; +import config from '@/config'; +import { OnShutdown } from '@/decorators/OnShutdown'; +import type { QueueRecoverySettings } from './execution.types'; import { OrchestrationService } from '@/services/orchestration.service'; /** @@ -19,11 +23,36 @@ import { OrchestrationService } from '@/services/orchestration.service'; @Service() export class ExecutionRecoveryService { constructor( + private readonly logger: Logger, private readonly push: Push, private readonly executionRepository: ExecutionRepository, private readonly orchestrationService: OrchestrationService, ) {} + /** + * @important Requires `OrchestrationService` to be initialized on queue mode. + */ + init() { + if (config.getEnv('executions.mode') === 'regular') return; + + const { isLeader, isMultiMainSetupEnabled } = this.orchestrationService; + + if (isLeader) this.scheduleQueueRecovery(); + + if (isMultiMainSetupEnabled) { + this.orchestrationService.multiMainSetup + .on('leader-takeover', () => this.scheduleQueueRecovery()) + .on('leader-stepdown', () => this.stopQueueRecovery()); + } + } + + private readonly queueRecoverySettings: QueueRecoverySettings = { + batchSize: config.getEnv('executions.queueRecovery.batchSize'), + waitMs: config.getEnv('executions.queueRecovery.interval') * 60 * 1000, + }; + + private isShuttingDown = false; + /** * Recover key properties of a truncated execution using event logs. */ @@ -34,6 +63,10 @@ export class ExecutionRecoveryService { if (!amendedExecution) return null; + this.logger.info('[Recovery] Logs available, amended execution', { + executionId: amendedExecution.id, + }); + await this.executionRepository.updateExistingExecution(executionId, amendedExecution); await this.runHooks(amendedExecution); @@ -46,12 +79,89 @@ export class ExecutionRecoveryService { return amendedExecution; } + /** + * Schedule a cycle to mark dangling executions as crashed in queue mode. + */ + scheduleQueueRecovery(waitMs = this.queueRecoverySettings.waitMs) { + if (!this.shouldScheduleQueueRecovery()) return; + + this.queueRecoverySettings.timeout = setTimeout(async () => { + try { + const nextWaitMs = await this.recoverFromQueue(); + this.scheduleQueueRecovery(nextWaitMs); + } catch (error) { + const msg = this.toErrorMsg(error); + + this.logger.error('[Recovery] Failed to recover dangling executions from queue', { msg }); + this.logger.error('[Recovery] Retrying...'); + + this.scheduleQueueRecovery(); + } + }, waitMs); + + const wait = [this.queueRecoverySettings.waitMs / (60 * 1000), 'min'].join(' '); + + this.logger.debug(`[Recovery] Scheduled queue recovery check for next ${wait}`); + } + + stopQueueRecovery() { + clearTimeout(this.queueRecoverySettings.timeout); + } + + @OnShutdown() + shutdown() { + this.isShuttingDown = true; + this.stopQueueRecovery(); + } + // ---------------------------------- // private // ---------------------------------- /** - * Amend `status`, `stoppedAt`, and (if possible) `data` properties of an execution. + * Mark in-progress executions as `crashed` if stored in DB as `new` or `running` + * but absent from the queue. Return time until next recovery cycle. + */ + private async recoverFromQueue() { + const { waitMs, batchSize } = this.queueRecoverySettings; + + const storedIds = await this.executionRepository.getInProgressExecutionIds(batchSize); + + if (storedIds.length === 0) { + this.logger.debug('[Recovery] Completed queue recovery check, no dangling executions'); + return waitMs; + } + + const { Queue } = await import('@/Queue'); + + const queuedIds = await Container.get(Queue).getInProgressExecutionIds(); + + if (queuedIds.size === 0) { + this.logger.debug('[Recovery] Completed queue recovery check, no dangling executions'); + return waitMs; + } + + const danglingIds = storedIds.filter((id) => !queuedIds.has(id)); + + if (danglingIds.length === 0) { + this.logger.debug('[Recovery] Completed queue recovery check, no dangling executions'); + return waitMs; + } + + await this.executionRepository.markAsCrashed(danglingIds); + + this.logger.info('[Recovery] Completed queue recovery check, recovered dangling executions', { + danglingIds, + }); + + // if this cycle used up the whole batch size, it is possible for there to be + // dangling executions outside this check, so speed up next cycle + + return storedIds.length >= this.queueRecoverySettings.batchSize ? waitMs / 2 : waitMs; + } + + /** + * Amend `status`, `stoppedAt`, and (if possible) `data` of an execution using event logs. */ private async amend(executionId: string, messages: EventMessageTypes[]) { if (messages.length === 0) return await this.amendWithoutLogs(executionId); @@ -198,4 +308,18 @@ export class ExecutionRecoveryService { await externalHooks.executeHookFunctions('workflowExecuteAfter', [run]); } + + private toErrorMsg(error: unknown) { + return error instanceof Error + ? error.message + : jsonStringify(error, { replaceCircularRefs: true }); + } + + private shouldScheduleQueueRecovery() { + return ( + config.getEnv('executions.mode') === 'queue' && + config.getEnv('multiMainSetup.instanceType') === 'leader' && + !this.isShuttingDown + ); + } } diff --git a/packages/cli/src/executions/execution.types.ts b/packages/cli/src/executions/execution.types.ts index 9a1bdc88c8302..95b9a3cdec4f1 100644 --- a/packages/cli/src/executions/execution.types.ts +++ b/packages/cli/src/executions/execution.types.ts @@ -84,3 +84,20 @@ export namespace ExecutionSummaries { }; }; } + +export type QueueRecoverySettings = { + /** + * ID of timeout for next scheduled recovery cycle. + */ + timeout?: NodeJS.Timeout; + + /** + * Number of in-progress executions to check per cycle. + */ + batchSize: number; + + /** + * Time (in milliseconds) to wait before the next cycle. + */ + waitMs: number; +}; diff --git a/packages/cli/src/services/orchestration/main/MultiMainSetup.ee.ts b/packages/cli/src/services/orchestration/main/MultiMainSetup.ee.ts index 5f74f1931bd73..7498a0c03b5a2 100644 --- a/packages/cli/src/services/orchestration/main/MultiMainSetup.ee.ts +++ b/packages/cli/src/services/orchestration/main/MultiMainSetup.ee.ts @@ -62,7 +62,7 @@ export class MultiMainSetup extends EventEmitter { if (config.getEnv('multiMainSetup.instanceType') === 'leader') { config.set('multiMainSetup.instanceType', 'follower'); - this.emit('leader-stepdown'); // lost leadership - stop triggers, pollers, pruning, wait-tracking + this.emit('leader-stepdown'); // lost leadership - stop triggers, pollers, pruning, wait-tracking, queue recovery EventReporter.info('[Multi-main setup] Leader failed to renew leader key'); } @@ -78,7 +78,7 @@ export class MultiMainSetup extends EventEmitter { config.set('multiMainSetup.instanceType', 'follower'); /** - * Lost leadership - stop triggers, pollers, pruning, wait tracking, license renewal + * Lost leadership - stop triggers, pollers, pruning, wait tracking, license renewal, queue recovery */ this.emit('leader-stepdown'); @@ -101,7 +101,7 @@ export class MultiMainSetup extends EventEmitter { await this.redisPublisher.setExpiration(this.leaderKey, this.leaderKeyTtl); /** - * Gained leadership - start triggers, pollers, pruning, wait-tracking, license renewal + * Gained leadership - start triggers, pollers, pruning, wait-tracking, license renewal, queue recovery */ this.emit('leader-takeover'); } else {