diff --git a/packages/cli/src/commands/start.ts b/packages/cli/src/commands/start.ts
index 0ebfbbc1a6dae..cc4555dc5e65b 100644
--- a/packages/cli/src/commands/start.ts
+++ b/packages/cli/src/commands/start.ts
@@ -30,7 +30,6 @@ import type { IWorkflowExecutionDataProcess } from '@/Interfaces';
 import { ExecutionService } from '@/executions/execution.service';
 import { OwnershipService } from '@/services/ownership.service';
 import { WorkflowRunner } from '@/WorkflowRunner';
-import { ExecutionRecoveryService } from '@/executions/execution-recovery.service';
 import { EventService } from '@/events/event.service';
 
 // eslint-disable-next-line @typescript-eslint/no-unsafe-assignment, @typescript-eslint/no-var-requires
@@ -305,7 +304,6 @@ export class Start extends BaseCommand {
 		await this.server.start();
 
 		Container.get(PruningService).init();
-		Container.get(ExecutionRecoveryService).init();
 
 		if (config.getEnv('executions.mode') === 'regular') {
 			await this.runEnqueuedExecutions();
diff --git a/packages/cli/src/executions/__tests__/execution-recovery.service.test.ts b/packages/cli/src/executions/__tests__/execution-recovery.service.test.ts
index 965e7da6fecda..f72c81a3cad53 100644
--- a/packages/cli/src/executions/__tests__/execution-recovery.service.test.ts
+++ b/packages/cli/src/executions/__tests__/execution-recovery.service.test.ts
@@ -9,8 +9,6 @@ import { createExecution } from '@test-integration/db/executions';
 import * as testDb from '@test-integration/testDb';
 import { mock } from 'jest-mock-extended';
 
-import { OrchestrationService } from '@/services/orchestration.service';
-import config from '@/config';
 import { ExecutionRecoveryService } from '@/executions/execution-recovery.service';
 import { ExecutionRepository } from '@/databases/repositories/execution.repository';
 import { Push } from '@/push';
@@ -28,20 +26,17 @@ describe('ExecutionRecoveryService', () => {
 	const instanceSettings = new InstanceSettings();
 
 	let executionRecoveryService: ExecutionRecoveryService;
-	let orchestrationService: OrchestrationService;
 	let executionRepository: ExecutionRepository;
 
 	beforeAll(async () => {
 		await testDb.init();
 
 		executionRepository = Container.get(ExecutionRepository);
-		orchestrationService = Container.get(OrchestrationService);
 		executionRecoveryService = new ExecutionRecoveryService(
 			mock(),
 			instanceSettings,
 			push,
 			executionRepository,
-			orchestrationService,
 			mock(),
 		);
 	});
@@ -53,74 +48,12 @@ describe('ExecutionRecoveryService', () => {
 	afterEach(async () => {
 		jest.restoreAllMocks();
 		await testDb.truncate(['Execution', 'ExecutionData', 'Workflow']);
-		executionRecoveryService.shutdown();
 	});
 
 	afterAll(async () => {
 		await testDb.terminate();
 	});
 
-	describe('scheduleQueueRecovery', () => {
-		describe('queue mode', () => {
-			it('if leader, should schedule queue recovery', () => {
-				/**
-				 * Arrange
-				 */
-				config.set('executions.mode', 'queue');
-				const scheduleSpy = jest.spyOn(executionRecoveryService, 'scheduleQueueRecovery');
-
-				/**
-				 * Act
-				 */
-				executionRecoveryService.init();
-
-				/**
-				 * Assert
-				 */
-				expect(scheduleSpy).toHaveBeenCalled();
-			});
-
-			it('if follower, should do nothing', () => {
-				/**
-				 * Arrange
-				 */
-				config.set('executions.mode', 'queue');
-				instanceSettings.markAsFollower();
-				const scheduleSpy = jest.spyOn(executionRecoveryService, 'scheduleQueueRecovery');
-
-				/**
-				 * Act
-				 */
-				executionRecoveryService.init();
-
-				/**
-				 * Assert
-				 */
-				expect(scheduleSpy).not.toHaveBeenCalled();
-			});
-		});
-
-		describe('regular mode', () => {
-			it('should do nothing', () => {
-				/**
-				 * Arrange
-				 */
-				config.set('executions.mode', 'regular');
-				const scheduleSpy = jest.spyOn(executionRecoveryService, 'scheduleQueueRecovery');
-
-				/**
-				 * Act
-				 */
-				executionRecoveryService.init();
-
-				/**
-				 * Assert
-				 */
-				expect(scheduleSpy).not.toHaveBeenCalled();
-			});
-		});
-	});
-
 	describe('recoverFromLogs', () => {
 		describe('if follower', () => {
 			test('should do nothing', async () => {
diff --git a/packages/cli/src/executions/execution-recovery.service.ts b/packages/cli/src/executions/execution-recovery.service.ts
index d691df1b3ce36..8e29127737792 100644
--- a/packages/cli/src/executions/execution-recovery.service.ts
+++ b/packages/cli/src/executions/execution-recovery.service.ts
@@ -1,6 +1,6 @@
-import Container, { Service } from 'typedi';
+import { Service } from 'typedi';
 import { Push } from '@/push';
-import { jsonStringify, sleep } from 'n8n-workflow';
+import { sleep } from 'n8n-workflow';
 import { ExecutionRepository } from '@db/repositories/execution.repository';
 import { getWorkflowHooksMain } from '@/WorkflowExecuteAdditionalData'; // @TODO: Dependency cycle
 import type { DateTime } from 'luxon';
@@ -12,10 +12,6 @@ import { NodeCrashedError } from '@/errors/node-crashed.error';
 import { WorkflowCrashedError } from '@/errors/workflow-crashed.error';
 import { ARTIFICIAL_TASK_DATA } from '@/constants';
 import { Logger } from '@/Logger';
-import config from '@/config';
-import { OnShutdown } from '@/decorators/OnShutdown';
-import type { QueueRecoverySettings } from './execution.types';
-import { OrchestrationService } from '@/services/orchestration.service';
 import { EventService } from '@/events/event.service';
 
 /**
@@ -28,34 +24,9 @@ export class ExecutionRecoveryService {
 		private readonly instanceSettings: InstanceSettings,
 		private readonly push: Push,
 		private readonly executionRepository: ExecutionRepository,
-		private readonly orchestrationService: OrchestrationService,
 		private readonly eventService: EventService,
 	) {}
 
-	/**
-	 * @important Requires `OrchestrationService` to be initialized on queue mode.
-	 */
-	init() {
-		if (config.getEnv('executions.mode') === 'regular') return;
-
-		const { isLeader } = this.instanceSettings;
-		if (isLeader) this.scheduleQueueRecovery();
-
-		const { isMultiMainSetupEnabled } = this.orchestrationService;
-		if (isMultiMainSetupEnabled) {
-			this.orchestrationService.multiMainSetup
-				.on('leader-takeover', () => this.scheduleQueueRecovery())
-				.on('leader-stepdown', () => this.stopQueueRecovery());
-		}
-	}
-
-	private readonly queueRecoverySettings: QueueRecoverySettings = {
-		batchSize: config.getEnv('executions.queueRecovery.batchSize'),
-		waitMs: config.getEnv('executions.queueRecovery.interval') * 60 * 1000,
-	};
-
-	private isShuttingDown = false;
-
 	/**
 	 * Recover key properties of a truncated execution using event logs.
 	 */
@@ -82,89 +53,10 @@ export class ExecutionRecoveryService {
 		return amendedExecution;
 	}
 
-	/**
-	 * Schedule a cycle to mark dangling executions as crashed in queue mode.
-	 */
-	scheduleQueueRecovery(waitMs = this.queueRecoverySettings.waitMs) {
-		if (!this.shouldScheduleQueueRecovery()) return;
-
-		this.queueRecoverySettings.timeout = setTimeout(async () => {
-			try {
-				const nextWaitMs = await this.recoverFromQueue();
-				this.scheduleQueueRecovery(nextWaitMs);
-			} catch (error) {
-				const msg = this.toErrorMsg(error);
-
-				this.logger.error('[Recovery] Failed to recover dangling executions from queue', { msg });
-				this.logger.error('[Recovery] Retrying...');
-
-				this.scheduleQueueRecovery();
-			}
-		}, waitMs);
-
-		const wait = [this.queueRecoverySettings.waitMs / (60 * 1000), 'min'].join(' ');
-
-		this.logger.debug(`[Recovery] Scheduled queue recovery check for next ${wait}`);
-	}
-
-	stopQueueRecovery() {
-		clearTimeout(this.queueRecoverySettings.timeout);
-	}
-
-	@OnShutdown()
-	shutdown() {
-		this.isShuttingDown = true;
-		this.stopQueueRecovery();
-	}
-
 	// ----------------------------------
 	//             private
 	// ----------------------------------
 
-	/**
-	 * Mark in-progress executions as `crashed` if stored in DB as `new` or `running`
-	 * but absent from the queue. Return time until next recovery cycle.
-	 */
-	private async recoverFromQueue() {
-		const { waitMs, batchSize } = this.queueRecoverySettings;
-
-		const storedIds = await this.executionRepository.getInProgressExecutionIds(batchSize);
-
-		if (storedIds.length === 0) {
-			this.logger.debug('[Recovery] Completed queue recovery check, no dangling executions');
-			return waitMs;
-		}
-
-		const { ScalingService } = await import('@/scaling/scaling.service');
-
-		const runningJobs = await Container.get(ScalingService).findJobsByStatus(['active', 'waiting']);
-
-		const queuedIds = new Set(runningJobs.map((job) => job.data.executionId));
-
-		if (queuedIds.size === 0) {
-			this.logger.debug('[Recovery] Completed queue recovery check, no dangling executions');
-			return waitMs;
-		}
-
-		const danglingIds = storedIds.filter((id) => !queuedIds.has(id));
-
-		if (danglingIds.length === 0) {
-			this.logger.debug('[Recovery] Completed queue recovery check, no dangling executions');
-			return waitMs;
-		}
-
-		await this.executionRepository.markAsCrashed(danglingIds);
-
-		this.logger.info('[Recovery] Completed queue recovery check, recovered dangling executions', {
-			danglingIds,
-		});
-
-		// if this cycle used up the whole batch size, it is possible for there to be
-		// dangling executions outside this check, so speed up next cycle
-
-		return storedIds.length >= this.queueRecoverySettings.batchSize ? waitMs / 2 : waitMs;
-	}
-
 	/**
 	 * Amend `status`, `stoppedAt`, and (if possible) `data` of an execution using event logs.
 	 */
@@ -313,18 +205,4 @@ export class ExecutionRecoveryService {
 
 		await externalHooks.executeHookFunctions('workflowExecuteAfter', [run]);
 	}
-
-	private toErrorMsg(error: unknown) {
-		return error instanceof Error
-			? error.message
-			: jsonStringify(error, { replaceCircularRefs: true });
-	}
-
-	private shouldScheduleQueueRecovery() {
-		return (
-			config.getEnv('executions.mode') === 'queue' &&
-			this.instanceSettings.isLeader &&
-			!this.isShuttingDown
-		);
-	}
 }
diff --git a/packages/cli/src/executions/execution.types.ts b/packages/cli/src/executions/execution.types.ts
index fd5024adbd718..15c27261fc1a3 100644
--- a/packages/cli/src/executions/execution.types.ts
+++ b/packages/cli/src/executions/execution.types.ts
@@ -93,23 +93,6 @@ export namespace ExecutionSummaries {
 	export type ExecutionSummaryWithScopes = ExecutionSummary & { scopes: Scope[] };
 }
 
-export type QueueRecoverySettings = {
-	/**
-	 * ID of timeout for next scheduled recovery cycle.
-	 */
-	timeout?: NodeJS.Timeout;
-
-	/**
-	 * Number of in-progress executions to check per cycle.
-	 */
-	batchSize: number;
-
-	/**
-	 * Time (in milliseconds) to wait before the next cycle.
-	 */
-	waitMs: number;
-};
-
 export type StopResult = {
 	mode: WorkflowExecuteMode;
 	startedAt: Date;
diff --git a/packages/cli/src/scaling/__tests__/scaling.service.test.ts b/packages/cli/src/scaling/__tests__/scaling.service.test.ts
index 0603309e2039d..adbf5ebde2f0e 100644
--- a/packages/cli/src/scaling/__tests__/scaling.service.test.ts
+++ b/packages/cli/src/scaling/__tests__/scaling.service.test.ts
@@ -7,6 +7,9 @@ import type { Job, JobData, JobOptions, JobQueue } from '../types';
 import { ApplicationError } from 'n8n-workflow';
 import { mockInstance } from '@test/mocking';
 import { GlobalConfig } from '@n8n/config';
+import { InstanceSettings } from 'n8n-core';
+import type { OrchestrationService } from '@/services/orchestration.service';
+import Container from 'typedi';
 import type { JobProcessor } from '../job-processor';
 
 const queue = mock<JobQueue>({
@@ -34,9 +37,27 @@ describe('ScalingService', () => {
 		},
 	});
 
+	const instanceSettings = Container.get(InstanceSettings);
+	const orchestrationService = mock<OrchestrationService>({ isMultiMainSetupEnabled: false });
+	const jobProcessor = mock<JobProcessor>();
+	let scalingService: ScalingService;
+
 	beforeEach(() => {
 		jest.clearAllMocks();
 		config.set('generic.instanceType', 'main');
+		scalingService = new ScalingService(
+			mock(),
+			mock(),
+			jobProcessor,
+			globalConfig,
+			mock(),
+			instanceSettings,
+			orchestrationService,
+		);
+	});
+
+	afterEach(() => {
+		scalingService.stopQueueRecovery();
 	});
 
 	describe('setupQueue', () => {
@@ -44,7 +65,6 @@
 			/**
 			 * Arrange
 			 */
-			const scalingService = new ScalingService(mock(), mock(), mock(), globalConfig);
 			const { prefix, settings } = globalConfig.queue.bull;
 			const Bull = jest.mocked(BullModule.default);
 
@@ -72,7 +92,15 @@
 			 * Arrange
 			 */
 			config.set('generic.instanceType', 'worker');
-			const scalingService = new ScalingService(mock(), mock(), mock(), globalConfig);
+			const scalingService = new ScalingService(
+				mock(),
+				mock(),
+				mock(),
+				globalConfig,
+				mock(),
+				instanceSettings,
+				orchestrationService,
+			);
 			await scalingService.setupQueue();
 			const concurrency = 5;
 
@@ -91,7 +119,6 @@
 			/**
 			 * Arrange
 			 */
-			const scalingService = new ScalingService(mock(), mock(), mock(), globalConfig);
 			await scalingService.setupQueue();
 
 			/**
@@ -102,14 +129,13 @@
 	});
 
 	describe('stop', () => {
-		it('should pause the queue and check for running jobs', async () => {
+		it('should pause the queue, check for running jobs, and stop queue recovery', async () => {
 			/**
 			 * Arrange
 			 */
-			const jobProcessor = mock<JobProcessor>();
-			const scalingService = new ScalingService(mock(), mock(), jobProcessor, globalConfig);
 			await scalingService.setupQueue();
 			jobProcessor.getRunningJobIds.mockReturnValue([]);
+			const stopQueueRecoverySpy = jest.spyOn(scalingService, 'stopQueueRecovery');
 			const getRunningJobsCountSpy = jest.spyOn(scalingService, 'getRunningJobsCount');
 
 			/**
@@ -121,6 +147,7 @@
 			 * Assert
 			 */
 			expect(queue.pause).toHaveBeenCalledWith(true, true);
+			expect(stopQueueRecoverySpy).toHaveBeenCalled();
 			expect(getRunningJobsCountSpy).toHaveBeenCalled();
 		});
 	});
@@ -130,7 +157,6 @@
 			/**
 			 * Arrange
 			 */
-			const scalingService = new ScalingService(mock(), mock(), mock(), globalConfig);
 			await scalingService.setupQueue();
 
 			/**
@@ -150,7 +176,6 @@
 			/**
 			 * Arrange
 			 */
-			const scalingService = new ScalingService(mock(), mock(), mock(), globalConfig);
 			await scalingService.setupQueue();
 			queue.add.mockResolvedValue(mock<Job>({ id: '456' }));
 
@@ -173,7 +198,6 @@
 			/**
 			 * Arrange
 			 */
-			const scalingService = new ScalingService(mock(), mock(), mock(), globalConfig);
 			await scalingService.setupQueue();
 			const jobId = '123';
 			queue.getJob.mockResolvedValue(mock<Job>({ id: jobId }));
@@ -196,7 +220,6 @@
 			/**
 			 * Arrange
 			 */
-			const scalingService = new ScalingService(mock(), mock(), mock(), globalConfig);
 			await scalingService.setupQueue();
 			queue.getJobs.mockResolvedValue([mock<Job>({ id: '123' })]);
 
@@ -217,7 +240,6 @@
 			/**
 			 * Arrange
 			 */
-			const scalingService = new ScalingService(mock(), mock(), mock(), globalConfig);
 			await scalingService.setupQueue();
 			// @ts-expect-error - Untyped but possible Redis response
 			queue.getJobs.mockResolvedValue([mock<Job>(), null]);
@@ -239,7 +261,6 @@
 			/**
 			 * Arrange
 			 */
-			const scalingService = new ScalingService(mock(), mock(), mock(), globalConfig);
 			await scalingService.setupQueue();
 			const job = mock<Job>({ isActive: jest.fn().mockResolvedValue(true) });
 
@@ -259,7 +280,6 @@
 			/**
 			 * Arrange
 			 */
-			const scalingService = new ScalingService(mock(), mock(), mock(), globalConfig);
 			await scalingService.setupQueue();
 			const job = mock<Job>({ isActive: jest.fn().mockResolvedValue(false) });
 
@@ -279,7 +299,6 @@
 			/**
 			 * Arrange
 			 */
-			const scalingService = new ScalingService(mock(), mock(), mock(), globalConfig);
 			await scalingService.setupQueue();
 			const job = mock<Job>({
 				isActive: jest.fn().mockImplementation(() => {
@@ -298,4 +317,42 @@
 			expect(result).toBe(false);
 		});
 	});
+
+	describe('scheduleQueueRecovery', () => {
+		it('if leader, should schedule queue recovery', async () => {
+			/**
+			 * Arrange
+			 */
+			const scheduleSpy = jest.spyOn(scalingService, 'scheduleQueueRecovery');
+			instanceSettings.markAsLeader();
+
+			/**
+			 * Act
+			 */
+			await scalingService.setupQueue();
+
+			/**
+			 * Assert
+			 */
+			expect(scheduleSpy).toHaveBeenCalled();
+		});
+
+		it('if follower, should not schedule queue recovery', async () => {
+			/**
+			 * Arrange
+			 */
+			const scheduleSpy = jest.spyOn(scalingService, 'scheduleQueueRecovery');
+			instanceSettings.markAsFollower();
+
+			/**
+			 * Act
+			 */
+			await scalingService.setupQueue();
+
+			/**
+			 * Assert
+			 */
+			expect(scheduleSpy).not.toHaveBeenCalled();
+		});
+	});
 });
diff --git a/packages/cli/src/scaling/scaling.service.ts b/packages/cli/src/scaling/scaling.service.ts
index a30e797ccba48..1a8bc9e173798 100644
--- a/packages/cli/src/scaling/scaling.service.ts
+++ b/packages/cli/src/scaling/scaling.service.ts
@@ -1,16 +1,28 @@
 import Container, { Service } from 'typedi';
-import { ApplicationError, BINARY_ENCODING, sleep } from 'n8n-workflow';
+import { ApplicationError, BINARY_ENCODING, sleep, jsonStringify } from 'n8n-workflow';
 import { ActiveExecutions } from '@/ActiveExecutions';
 import config from '@/config';
 import { Logger } from '@/Logger';
 import { MaxStalledCountError } from '@/errors/max-stalled-count.error';
-import { HIGHEST_SHUTDOWN_PRIORITY } from '@/constants';
+import { HIGHEST_SHUTDOWN_PRIORITY, Time } from '@/constants';
 import { OnShutdown } from '@/decorators/OnShutdown';
 import { JOB_TYPE_NAME, QUEUE_NAME } from './constants';
 import { JobProcessor } from './job-processor';
-import type { JobQueue, Job, JobData, JobOptions, JobMessage, JobStatus, JobId } from './types';
+import type {
+	JobQueue,
+	Job,
+	JobData,
+	JobOptions,
+	JobMessage,
+	JobStatus,
+	JobId,
+	QueueRecoveryContext,
+} from './types';
 import type { IExecuteResponsePromiseData } from 'n8n-workflow';
 import { GlobalConfig } from '@n8n/config';
+import { ExecutionRepository } from '@/databases/repositories/execution.repository';
+import { InstanceSettings } from 'n8n-core';
+import { OrchestrationService } from '@/services/orchestration.service';
 
 @Service()
 export class ScalingService {
@@ -23,6 +35,9 @@
 		private readonly activeExecutions: ActiveExecutions,
 		private readonly jobProcessor: JobProcessor,
 		private readonly globalConfig: GlobalConfig,
+		private readonly executionRepository: ExecutionRepository,
+		private readonly instanceSettings: InstanceSettings,
+		private readonly orchestrationService: OrchestrationService,
 	) {}
 
 	// #region Lifecycle
@@ -43,6 +58,14 @@
 
 		this.registerListeners();
 
+		if (this.instanceSettings.isLeader) this.scheduleQueueRecovery();
+
+		if (this.orchestrationService.isMultiMainSetupEnabled) {
+			this.orchestrationService.multiMainSetup
+				.on('leader-takeover', () => this.scheduleQueueRecovery())
+				.on('leader-stepdown', () => this.stopQueueRecovery());
+		}
+
 		this.logger.debug('[ScalingService] Queue setup completed');
 	}
 
@@ -64,6 +87,10 @@
 
 		this.logger.debug('[ScalingService] Queue paused');
 
+		this.stopQueueRecovery();
+
+		this.logger.debug('[ScalingService] Queue recovery stopped');
+
 		let count = 0;
 
 		while (this.getRunningJobsCount() !== 0) {
@@ -230,4 +257,86 @@
 
 		throw new ApplicationError('This method must be called on a `worker` instance');
 	}
+
+	// #region Queue recovery
+
+	private readonly queueRecoveryContext: QueueRecoveryContext = {
+		batchSize: config.getEnv('executions.queueRecovery.batchSize'),
+		waitMs: config.getEnv('executions.queueRecovery.interval') * 60 * 1000,
+	};
+
+	scheduleQueueRecovery(waitMs = this.queueRecoveryContext.waitMs) {
+		this.queueRecoveryContext.timeout = setTimeout(async () => {
+			try {
+				const nextWaitMs = await this.recoverFromQueue();
+				this.scheduleQueueRecovery(nextWaitMs);
+			} catch (error) {
+				this.logger.error('[ScalingService] Failed to recover dangling executions from queue', {
+					msg: this.toErrorMsg(error),
+				});
+				this.logger.error('[ScalingService] Retrying...');
+
+				this.scheduleQueueRecovery();
+			}
+		}, waitMs);
+
+		const wait = [this.queueRecoveryContext.waitMs / Time.minutes.toMilliseconds, 'min'].join(' ');
+
+		this.logger.debug(`[ScalingService] Scheduled queue recovery check for next ${wait}`);
+	}
+
+	stopQueueRecovery() {
+		clearTimeout(this.queueRecoveryContext.timeout);
+	}
+
+	/**
+	 * Mark in-progress executions as `crashed` if stored in DB as `new` or `running`
+	 * but absent from the queue. Return time until next recovery cycle.
+	 */
+	private async recoverFromQueue() {
+		const { waitMs, batchSize } = this.queueRecoveryContext;
+
+		const storedIds = await this.executionRepository.getInProgressExecutionIds(batchSize);
+
+		if (storedIds.length === 0) {
+			this.logger.debug('[ScalingService] Completed queue recovery check, no dangling executions');
+			return waitMs;
+		}
+
+		const runningJobs = await this.findJobsByStatus(['active', 'waiting']);
+
+		const queuedIds = new Set(runningJobs.map((job) => job.data.executionId));
+
+		if (queuedIds.size === 0) {
+			this.logger.debug('[ScalingService] Completed queue recovery check, no dangling executions');
+			return waitMs;
+		}
+
+		const danglingIds = storedIds.filter((id) => !queuedIds.has(id));
+
+		if (danglingIds.length === 0) {
+			this.logger.debug('[ScalingService] Completed queue recovery check, no dangling executions');
+			return waitMs;
+		}
+
+		await this.executionRepository.markAsCrashed(danglingIds);
+
+		this.logger.info(
+			'[ScalingService] Completed queue recovery check, recovered dangling executions',
+			{ danglingIds },
+		);
+
+		// if this cycle used up the whole batch size, it is possible for there to be
+		// dangling executions outside this check, so speed up next cycle
+
+		return storedIds.length >= this.queueRecoveryContext.batchSize ? waitMs / 2 : waitMs;
+	}
+
+	private toErrorMsg(error: unknown) {
+		return error instanceof Error
+			? error.message
+			: jsonStringify(error, { replaceCircularRefs: true });
+	}
+
+	// #endregion
 }
diff --git a/packages/cli/src/scaling/types.ts b/packages/cli/src/scaling/types.ts
index 55d49b8c48487..b35d1d109d7c4 100644
--- a/packages/cli/src/scaling/types.ts
+++ b/packages/cli/src/scaling/types.ts
@@ -53,3 +53,14 @@ export type RunningJob = {
 };
 
 export type RunningJobSummary = Omit;
+
+export type QueueRecoveryContext = {
+	/** ID of timeout for next scheduled recovery cycle. */
+	timeout?: NodeJS.Timeout;
+
+	/** Number of in-progress executions to check per cycle. */
+	batchSize: number;
+
+	/** Time (in milliseconds) to wait until the next cycle. */
+	waitMs: number;
+};
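Note for reviewers: the logic being relocated is a self-rescheduling recovery cycle — a `setTimeout` loop that compares execution IDs stored as in-progress in the database against the jobs currently `active` or `waiting` in the queue, marks the leftovers as crashed, and halves the wait before the next check whenever a full batch came back (more dangling executions may lie beyond the batch). The sketch below is a minimal, standalone illustration of that pattern, not the n8n implementation; the dependency names (`getInProgressIds`, `getQueuedIds`, `markCrashed`) and the default interval are placeholders for the repository and queue calls that `ScalingService.recoverFromQueue()` uses.

```ts
// Minimal sketch of a self-rescheduling queue-recovery cycle (assumed, simplified API).
type RecoveryDeps = {
	/** Execution IDs stored as `new`/`running`, capped at batchSize. */
	getInProgressIds: (batchSize: number) => Promise<string[]>;
	/** Execution IDs of jobs currently enqueued or being processed. */
	getQueuedIds: () => Promise<string[]>;
	/** Mark the given executions as crashed in storage. */
	markCrashed: (ids: string[]) => Promise<void>;
};

class QueueRecovery {
	private timeout?: NodeJS.Timeout;

	constructor(
		private readonly deps: RecoveryDeps,
		private readonly batchSize = 100,
		private readonly waitMs = 60 * 60 * 1000, // placeholder default interval between checks
	) {}

	/** Schedule the next check; call on startup or leader takeover. */
	schedule(waitMs = this.waitMs) {
		this.timeout = setTimeout(async () => {
			try {
				const nextWaitMs = await this.runCycle();
				this.schedule(nextWaitMs);
			} catch {
				this.schedule(); // on failure, retry on the default interval
			}
		}, waitMs);
	}

	/** Stop the cycle; call on shutdown or leader stepdown. */
	stop() {
		clearTimeout(this.timeout);
	}

	private async runCycle() {
		const stored = await this.deps.getInProgressIds(this.batchSize);
		if (stored.length === 0) return this.waitMs;

		const queued = new Set(await this.deps.getQueuedIds());
		const dangling = stored.filter((id) => !queued.has(id));
		if (dangling.length > 0) await this.deps.markCrashed(dangling);

		// A full batch may hide more dangling executions beyond it, so check again sooner.
		return stored.length >= this.batchSize ? this.waitMs / 2 : this.waitMs;
	}
}
```

Because `ScalingService.setupQueue()` only runs where the Bull queue exists, scheduling the cycle there keeps it scoped to queue mode by construction — which appears to be why the `executions.mode === 'regular'` guard, the `@OnShutdown` handler, and the separate `ExecutionRecoveryService.init()` call in the `start` command are dropped in this diff.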