From 18e453b4f542d51185f6be3e41ec25fdd164b987 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Iv=C3=A1n=20Ovejero?= Date: Mon, 1 Jul 2024 16:47:09 +0200 Subject: [PATCH 1/6] fix(core): Stop re-enqueing executions on startup --- packages/cli/src/commands/start.ts | 42 ------------------- .../cli/src/executions/execution.service.ts | 11 ----- 2 files changed, 53 deletions(-) diff --git a/packages/cli/src/commands/start.ts b/packages/cli/src/commands/start.ts index 62762adbb85db..42431556400c8 100644 --- a/packages/cli/src/commands/start.ts +++ b/packages/cli/src/commands/start.ts @@ -27,10 +27,6 @@ import { ExecutionRepository } from '@db/repositories/execution.repository'; import { FeatureNotLicensedError } from '@/errors/feature-not-licensed.error'; import { WaitTracker } from '@/WaitTracker'; import { BaseCommand } from './BaseCommand'; -import type { IWorkflowExecutionDataProcess } from '@/Interfaces'; -import { ExecutionService } from '@/executions/execution.service'; -import { OwnershipService } from '@/services/ownership.service'; -import { WorkflowRunner } from '@/WorkflowRunner'; import { ExecutionRecoveryService } from '@/executions/execution-recovery.service'; // eslint-disable-next-line @typescript-eslint/no-unsafe-assignment, @typescript-eslint/no-var-requires @@ -300,10 +296,6 @@ export class Start extends BaseCommand { Container.get(PruningService).init(); Container.get(ExecutionRecoveryService).init(); - if (config.getEnv('executions.mode') === 'regular') { - await this.runEnqueuedExecutions(); - } - // Start to get active workflows and run their triggers await this.activeWorkflowManager.init(); @@ -345,38 +337,4 @@ export class Start extends BaseCommand { if (error.stack) this.logger.error(error.stack); await this.exitWithCrash('Exiting due to an error.', error); } - - /** - * During startup, we may find executions that had been enqueued at the time of shutdown. - * - * If so, start running any such executions concurrently up to the concurrency limit, and - * enqueue any remaining ones until we have spare concurrency capacity again. - */ - private async runEnqueuedExecutions() { - const executions = await Container.get(ExecutionService).findAllEnqueuedExecutions(); - - if (executions.length === 0) return; - - this.logger.debug( - '[Startup] Found enqueued executions to run', - executions.map((e) => e.id), - ); - - const ownershipService = Container.get(OwnershipService); - const workflowRunner = Container.get(WorkflowRunner); - - for (const execution of executions) { - const project = await ownershipService.getWorkflowProjectCached(execution.workflowId); - - const data: IWorkflowExecutionDataProcess = { - executionMode: execution.mode, - executionData: execution.data, - workflowData: execution.workflowData, - projectId: project.id, - }; - - // do not block - each execution either runs concurrently or is queued - void workflowRunner.run(data, undefined, false, execution.id); - } - } } diff --git a/packages/cli/src/executions/execution.service.ts b/packages/cli/src/executions/execution.service.ts index 546443ec4afca..21109dc125b85 100644 --- a/packages/cli/src/executions/execution.service.ts +++ b/packages/cli/src/executions/execution.service.ts @@ -390,17 +390,6 @@ export class ExecutionService { }; } - async findAllEnqueuedExecutions() { - return await this.executionRepository.findMultipleExecutions( - { - select: ['id', 'mode'], - where: { status: 'new' }, - order: { id: 'ASC' }, - }, - { includeData: true, unflattenData: true }, - ); - } - async stop(executionId: string): Promise { const execution = await this.executionRepository.findSingleExecution(executionId, { includeData: true, From a02a72148b7022ddf16b61fa925964c633b2cc13 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Iv=C3=A1n=20Ovejero?= Date: Mon, 1 Jul 2024 17:10:08 +0200 Subject: [PATCH 2/6] Revert "fix(core): Stop re-enqueing executions on startup" This reverts commit 18e453b4f542d51185f6be3e41ec25fdd164b987. --- packages/cli/src/commands/start.ts | 42 +++++++++++++++++++ .../cli/src/executions/execution.service.ts | 11 +++++ 2 files changed, 53 insertions(+) diff --git a/packages/cli/src/commands/start.ts b/packages/cli/src/commands/start.ts index 42431556400c8..62762adbb85db 100644 --- a/packages/cli/src/commands/start.ts +++ b/packages/cli/src/commands/start.ts @@ -27,6 +27,10 @@ import { ExecutionRepository } from '@db/repositories/execution.repository'; import { FeatureNotLicensedError } from '@/errors/feature-not-licensed.error'; import { WaitTracker } from '@/WaitTracker'; import { BaseCommand } from './BaseCommand'; +import type { IWorkflowExecutionDataProcess } from '@/Interfaces'; +import { ExecutionService } from '@/executions/execution.service'; +import { OwnershipService } from '@/services/ownership.service'; +import { WorkflowRunner } from '@/WorkflowRunner'; import { ExecutionRecoveryService } from '@/executions/execution-recovery.service'; // eslint-disable-next-line @typescript-eslint/no-unsafe-assignment, @typescript-eslint/no-var-requires @@ -296,6 +300,10 @@ export class Start extends BaseCommand { Container.get(PruningService).init(); Container.get(ExecutionRecoveryService).init(); + if (config.getEnv('executions.mode') === 'regular') { + await this.runEnqueuedExecutions(); + } + // Start to get active workflows and run their triggers await this.activeWorkflowManager.init(); @@ -337,4 +345,38 @@ export class Start extends BaseCommand { if (error.stack) this.logger.error(error.stack); await this.exitWithCrash('Exiting due to an error.', error); } + + /** + * During startup, we may find executions that had been enqueued at the time of shutdown. + * + * If so, start running any such executions concurrently up to the concurrency limit, and + * enqueue any remaining ones until we have spare concurrency capacity again. + */ + private async runEnqueuedExecutions() { + const executions = await Container.get(ExecutionService).findAllEnqueuedExecutions(); + + if (executions.length === 0) return; + + this.logger.debug( + '[Startup] Found enqueued executions to run', + executions.map((e) => e.id), + ); + + const ownershipService = Container.get(OwnershipService); + const workflowRunner = Container.get(WorkflowRunner); + + for (const execution of executions) { + const project = await ownershipService.getWorkflowProjectCached(execution.workflowId); + + const data: IWorkflowExecutionDataProcess = { + executionMode: execution.mode, + executionData: execution.data, + workflowData: execution.workflowData, + projectId: project.id, + }; + + // do not block - each execution either runs concurrently or is queued + void workflowRunner.run(data, undefined, false, execution.id); + } + } } diff --git a/packages/cli/src/executions/execution.service.ts b/packages/cli/src/executions/execution.service.ts index 21109dc125b85..546443ec4afca 100644 --- a/packages/cli/src/executions/execution.service.ts +++ b/packages/cli/src/executions/execution.service.ts @@ -390,6 +390,17 @@ export class ExecutionService { }; } + async findAllEnqueuedExecutions() { + return await this.executionRepository.findMultipleExecutions( + { + select: ['id', 'mode'], + where: { status: 'new' }, + order: { id: 'ASC' }, + }, + { includeData: true, unflattenData: true }, + ); + } + async stop(executionId: string): Promise { const execution = await this.executionRepository.findSingleExecution(executionId, { includeData: true, From 7ed7c36a6f2a77ca857cf0c6b5db12e3488964ed Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Iv=C3=A1n=20Ovejero?= Date: Mon, 1 Jul 2024 17:24:56 +0200 Subject: [PATCH 3/6] Throw on missing execution data --- packages/cli/src/ActiveExecutions.ts | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/packages/cli/src/ActiveExecutions.ts b/packages/cli/src/ActiveExecutions.ts index 67616ee4b3f63..ce2c1339b9c53 100644 --- a/packages/cli/src/ActiveExecutions.ts +++ b/packages/cli/src/ActiveExecutions.ts @@ -46,11 +46,20 @@ export class ActiveExecutions { async add(executionData: IWorkflowExecutionDataProcess, executionId?: string): Promise { let executionStatus: ExecutionStatus = executionId ? 'running' : 'new'; const mode = executionData.executionMode; + + const runExecutionData = executionData.executionData; + + if (!runExecutionData) { + throw new ApplicationError('Cannot add active execution without execution data', { + extra: { workflowId: executionData.workflowData.id }, + }); + } + if (executionId === undefined) { // Is a new execution so save in DB const fullExecutionData: ExecutionPayload = { - data: executionData.executionData!, + data: runExecutionData, mode, finished: false, startedAt: new Date(), @@ -82,7 +91,7 @@ export class ActiveExecutions { const execution: Pick = { id: executionId, - data: executionData.executionData!, + data: runExecutionData, waitTill: null, status: executionStatus, }; From 274cff9d31aaa3aab30db0f481d529a8df60c2a8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Iv=C3=A1n=20Ovejero?= Date: Mon, 1 Jul 2024 17:38:28 +0200 Subject: [PATCH 4/6] Fix tests --- packages/cli/test/unit/ActiveExecutions.test.ts | 1 + 1 file changed, 1 insertion(+) diff --git a/packages/cli/test/unit/ActiveExecutions.test.ts b/packages/cli/test/unit/ActiveExecutions.test.ts index b2454de87c5e1..81c525531ed39 100644 --- a/packages/cli/test/unit/ActiveExecutions.test.ts +++ b/packages/cli/test/unit/ActiveExecutions.test.ts @@ -137,6 +137,7 @@ function mockExecutionData(): IWorkflowExecutionDataProcess { nodes: [], connections: {}, }, + executionData: { resultData: { runData: {} } }, userId: uuid(), }; } From e6ac65c3ab56ecaf4f447a4c2f787c82df7efd3c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Iv=C3=A1n=20Ovejero?= Date: Tue, 2 Jul 2024 09:16:34 +0200 Subject: [PATCH 5/6] refactor: Report from execution repository --- packages/cli/src/ActiveExecutions.ts | 12 ++--------- .../repositories/execution.repository.ts | 20 +++++++++++++++++-- .../cli/test/unit/ActiveExecutions.test.ts | 1 - 3 files changed, 20 insertions(+), 13 deletions(-) diff --git a/packages/cli/src/ActiveExecutions.ts b/packages/cli/src/ActiveExecutions.ts index ce2c1339b9c53..7ee4f3570290a 100644 --- a/packages/cli/src/ActiveExecutions.ts +++ b/packages/cli/src/ActiveExecutions.ts @@ -47,19 +47,11 @@ export class ActiveExecutions { let executionStatus: ExecutionStatus = executionId ? 'running' : 'new'; const mode = executionData.executionMode; - const runExecutionData = executionData.executionData; - - if (!runExecutionData) { - throw new ApplicationError('Cannot add active execution without execution data', { - extra: { workflowId: executionData.workflowData.id }, - }); - } - if (executionId === undefined) { // Is a new execution so save in DB const fullExecutionData: ExecutionPayload = { - data: runExecutionData, + data: executionData.executionData!, mode, finished: false, startedAt: new Date(), @@ -91,7 +83,7 @@ export class ActiveExecutions { const execution: Pick = { id: executionId, - data: runExecutionData, + data: executionData.executionData!, waitTill: null, status: executionStatus, }; diff --git a/packages/cli/src/databases/repositories/execution.repository.ts b/packages/cli/src/databases/repositories/execution.repository.ts index 4e9977c4042f5..94a1a520e1888 100644 --- a/packages/cli/src/databases/repositories/execution.repository.ts +++ b/packages/cli/src/databases/repositories/execution.repository.ts @@ -43,6 +43,8 @@ import { ExecutionDataRepository } from './executionData.repository'; import { Logger } from '@/Logger'; import type { ExecutionSummaries } from '@/executions/execution.types'; import { PostgresLiveRowsRetrievalError } from '@/errors/postgres-live-rows-retrieval.error'; +import { separate } from '@/utils'; +import { ErrorReporterProxy as ErrorReporter } from 'n8n-workflow'; export interface IGetExecutionsQueryFilter { id?: FindOperator | string; @@ -156,7 +158,9 @@ export class ExecutionRepository extends Repository { const executions = await this.find(queryParams); if (options?.includeData && options?.unflattenData) { - return executions.map((execution) => { + const [valid, invalid] = separate(executions, (e) => e.executionData !== null); + this.reportInvalidExecutions(invalid); + return valid.map((execution) => { const { executionData, metadata, ...rest } = execution; return { ...rest, @@ -166,7 +170,9 @@ export class ExecutionRepository extends Repository { } as IExecutionResponse; }); } else if (options?.includeData) { - return executions.map((execution) => { + const [valid, invalid] = separate(executions, (e) => e.executionData !== null); + this.reportInvalidExecutions(invalid); + return valid.map((execution) => { const { executionData, metadata, ...rest } = execution; return { ...rest, @@ -183,6 +189,16 @@ export class ExecutionRepository extends Repository { }); } + reportInvalidExecutions(executions: ExecutionEntity[]) { + if (executions.length === 0) return; + + ErrorReporter.error( + new ApplicationError('Found executions without executionData', { + extra: { executionIds: executions.map(({ id }) => id) }, + }), + ); + } + async findSingleExecution( id: string, options?: { diff --git a/packages/cli/test/unit/ActiveExecutions.test.ts b/packages/cli/test/unit/ActiveExecutions.test.ts index 81c525531ed39..b2454de87c5e1 100644 --- a/packages/cli/test/unit/ActiveExecutions.test.ts +++ b/packages/cli/test/unit/ActiveExecutions.test.ts @@ -137,7 +137,6 @@ function mockExecutionData(): IWorkflowExecutionDataProcess { nodes: [], connections: {}, }, - executionData: { resultData: { runData: {} } }, userId: uuid(), }; } From 165342237e72cbe6d8e3be9fca349da6ca88f470 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Iv=C3=A1n=20Ovejero?= Date: Tue, 2 Jul 2024 09:17:21 +0200 Subject: [PATCH 6/6] Reduce diff --- packages/cli/src/ActiveExecutions.ts | 1 - 1 file changed, 1 deletion(-) diff --git a/packages/cli/src/ActiveExecutions.ts b/packages/cli/src/ActiveExecutions.ts index 7ee4f3570290a..67616ee4b3f63 100644 --- a/packages/cli/src/ActiveExecutions.ts +++ b/packages/cli/src/ActiveExecutions.ts @@ -46,7 +46,6 @@ export class ActiveExecutions { async add(executionData: IWorkflowExecutionDataProcess, executionId?: string): Promise { let executionStatus: ExecutionStatus = executionId ? 'running' : 'new'; const mode = executionData.executionMode; - if (executionId === undefined) { // Is a new execution so save in DB