From e115015a7cc119c7466ed01ce92f78e2d2678b7f Mon Sep 17 00:00:00 2001 From: Tomi Turtiainen <10324676+tomi@users.noreply.github.com> Date: Thu, 1 Aug 2024 16:07:36 +0300 Subject: [PATCH 1/4] fix(core): Make execution and its data creation atomic Executions and their data are inserted in two separate insert statements. Before this PR, this wasn't done in a transaction, which could cause executions to be created without execution data. This caused issues like the one fixed in #9903. --- packages/cli/src/ActiveExecutions.ts | 13 ++++--- .../repositories/execution.repository.ts | 27 +++++++++----- .../repositories/executionData.repository.ts | 20 +++++++++++ .../cli/src/executions/execution.service.ts | 5 ++- .../workflows/workflowExecution.service.ts | 8 ++++- .../repositories/execution.repository.test.ts | 35 +++++++++++++++++++ .../cli/test/unit/ActiveExecutions.test.ts | 5 +++ 7 files changed, 99 insertions(+), 14 deletions(-) diff --git a/packages/cli/src/ActiveExecutions.ts b/packages/cli/src/ActiveExecutions.ts index 97313d5cb2f07..e3fc4023b72d3 100644 --- a/packages/cli/src/ActiveExecutions.ts +++ b/packages/cli/src/ActiveExecutions.ts @@ -12,6 +12,7 @@ import { ExecutionCancelledError, sleep, } from 'n8n-workflow'; +import { strict as assert } from 'node:assert'; import type { ExecutionPayload, @@ -23,6 +24,7 @@ import type { import { isWorkflowIdValid } from '@/utils'; import { ExecutionRepository } from '@db/repositories/execution.repository'; import { Logger } from '@/Logger'; +import * as Db from '@/Db'; import { ConcurrencyControlService } from './concurrency/concurrency-control.service'; import config from './config'; @@ -73,10 +75,13 @@ export class ActiveExecutions { fullExecutionData.workflowId = workflowId; } - executionId = await this.executionRepository.createNewExecution(fullExecutionData); - if (executionId === undefined) { - throw new ApplicationError('There was an issue assigning an execution id to the execution'); - } + executionId = await Db.transaction(async (transactionManager) => { + return await this.executionRepository.createNewExecution( + fullExecutionData, + transactionManager, + ); + }); + assert(executionId); await this.concurrencyControl.throttle({ mode, executionId }); executionStatus = 'running'; diff --git a/packages/cli/src/databases/repositories/execution.repository.ts b/packages/cli/src/databases/repositories/execution.repository.ts index 1ebb22d8eb4bc..47c210c4b6d83 100644 --- a/packages/cli/src/databases/repositories/execution.repository.ts +++ b/packages/cli/src/databases/repositories/execution.repository.ts @@ -13,6 +13,7 @@ import { } from '@n8n/typeorm'; import { DateUtils } from '@n8n/typeorm/util/DateUtils'; import type { + EntityManager, FindManyOptions, FindOneOptions, FindOperator, @@ -270,16 +271,26 @@ export class ExecutionRepository extends Repository { return rest; } - async createNewExecution(execution: ExecutionPayload): Promise { + /** + * Insert a new execution and its execution data + */ + async createNewExecution(execution: ExecutionPayload, trx?: EntityManager): Promise { + trx = trx ?? this.manager; + const { data, workflowData, ...rest } = execution; - const { identifiers: inserted } = await this.insert(rest); - const { id: executionId } = inserted[0] as { id: string }; + const insertResult = await trx.insert(ExecutionEntity, rest); + const { id: executionId } = insertResult.identifiers[0] as { id: string }; + const { connections, nodes, name, settings } = workflowData ?? {}; - await this.executionDataRepository.insert({ - executionId, - workflowData: { connections, nodes, name, settings, id: workflowData.id }, - data: stringify(data), - }); + await this.executionDataRepository.createExecutionDataForExecution( + { + executionId, + workflowData: { connections, nodes, name, settings, id: workflowData.id }, + data: stringify(data), + }, + trx, + ); + return String(executionId); } diff --git a/packages/cli/src/databases/repositories/executionData.repository.ts b/packages/cli/src/databases/repositories/executionData.repository.ts index 5872f9888cd66..b1378a2164044 100644 --- a/packages/cli/src/databases/repositories/executionData.repository.ts +++ b/packages/cli/src/databases/repositories/executionData.repository.ts @@ -1,13 +1,33 @@ import { Service } from 'typedi'; +import type { EntityManager } from '@n8n/typeorm'; +import type { IWorkflowBase } from 'n8n-workflow'; import { DataSource, In, Repository } from '@n8n/typeorm'; import { ExecutionData } from '../entities/ExecutionData'; +export interface CreateExecutionDataOpts extends Pick { + workflowData: Pick; +} + @Service() export class ExecutionDataRepository extends Repository { constructor(dataSource: DataSource) { super(ExecutionData, dataSource.manager); } + async createExecutionDataForExecution( + executionData: CreateExecutionDataOpts, + trx?: EntityManager, + ) { + trx = trx ?? this.manager; + const { data, executionId, workflowData } = executionData; + + return await trx.insert(ExecutionData, { + executionId, + data, + workflowData, + }); + } + async findByExecutionIds(executionIds: string[]) { return await this.find({ select: ['workflowData'], diff --git a/packages/cli/src/executions/execution.service.ts b/packages/cli/src/executions/execution.service.ts index bb8650e99f1d6..3694c10a75f7f 100644 --- a/packages/cli/src/executions/execution.service.ts +++ b/packages/cli/src/executions/execution.service.ts @@ -40,6 +40,7 @@ import { QueuedExecutionRetryError } from '@/errors/queued-execution-retry.error import { ConcurrencyControlService } from '@/concurrency/concurrency-control.service'; import { AbortedExecutionRetryError } from '@/errors/aborted-execution-retry.error'; import { License } from '@/License'; +import * as Db from '@/Db'; export const schemaGetExecutionsQueryFilter = { $id: '/IGetExecutionsQueryFilter', @@ -323,7 +324,9 @@ export class ExecutionService { status: 'error', }; - await this.executionRepository.createNewExecution(fullExecutionData); + await Db.transaction(async (transactionManager) => { + await this.executionRepository.createNewExecution(fullExecutionData, transactionManager); + }); } // ---------------------------------- diff --git a/packages/cli/src/workflows/workflowExecution.service.ts b/packages/cli/src/workflows/workflowExecution.service.ts index 8ebe7aed0c142..f76efda93ee3a 100644 --- a/packages/cli/src/workflows/workflowExecution.service.ts +++ b/packages/cli/src/workflows/workflowExecution.service.ts @@ -35,6 +35,7 @@ import { Logger } from '@/Logger'; import type { Project } from '@/databases/entities/Project'; import { GlobalConfig } from '@n8n/config'; import { SubworkflowPolicyChecker } from '@/subworkflows/subworkflow-policy-checker.service'; +import * as Db from '@/Db'; @Service() export class WorkflowExecutionService { @@ -220,7 +221,12 @@ export class WorkflowExecutionService { workflowId: workflowData.id, }; - await this.executionRepository.createNewExecution(fullExecutionData); + await Db.transaction(async (transactionManager) => { + await this.executionRepository.createNewExecution( + fullExecutionData, + transactionManager, + ); + }); } this.logger.info('Error workflow execution blocked due to subworkflow settings', { erroredWorkflowId: workflowErrorData.workflow.id, diff --git a/packages/cli/test/integration/database/repositories/execution.repository.test.ts b/packages/cli/test/integration/database/repositories/execution.repository.test.ts index cfb897d627a56..3408b0aef1466 100644 --- a/packages/cli/test/integration/database/repositories/execution.repository.test.ts +++ b/packages/cli/test/integration/database/repositories/execution.repository.test.ts @@ -1,6 +1,7 @@ import Container from 'typedi'; import { ExecutionRepository } from '@db/repositories/execution.repository'; import { ExecutionDataRepository } from '@db/repositories/executionData.repository'; +import * as Db from '@/Db'; import * as testDb from '../../shared/testDb'; import { createWorkflow } from '../../shared/db/workflows'; @@ -52,5 +53,39 @@ describe('ExecutionRepository', () => { }); expect(executionData?.data).toEqual('[{"resultData":"1"},{}]'); }); + + it('should not create execution if execution data insert fails', async () => { + const executionRepo = Container.get(ExecutionRepository); + const executionDataRepo = Container.get(ExecutionDataRepository); + + const workflow = await createWorkflow({ settings: { executionOrder: 'v1' } }); + jest + .spyOn(executionDataRepo, 'createExecutionDataForExecution') + .mockRejectedValueOnce(new Error()); + + await expect( + async () => + await Db.transaction(async (transactionManager) => { + await executionRepo.createNewExecution( + { + workflowId: workflow.id, + data: { + //@ts-expect-error This is not needed for tests + resultData: {}, + }, + workflowData: workflow, + mode: 'manual', + startedAt: new Date(), + status: 'new', + finished: false, + }, + transactionManager, + ); + }), + ).rejects.toThrow(); + + const executionEntities = await executionRepo.find(); + expect(executionEntities).toBeEmptyArray(); + }); }); }); diff --git a/packages/cli/test/unit/ActiveExecutions.test.ts b/packages/cli/test/unit/ActiveExecutions.test.ts index 0cc70a10a4e61..771ca63ab64f3 100644 --- a/packages/cli/test/unit/ActiveExecutions.test.ts +++ b/packages/cli/test/unit/ActiveExecutions.test.ts @@ -8,6 +8,7 @@ import type { ExecutionRepository } from '@db/repositories/execution.repository' import { mock } from 'jest-mock-extended'; import { ConcurrencyControlService } from '@/concurrency/concurrency-control.service'; import { mockInstance } from '@test/mocking'; +import type { EntityManager } from '@n8n/typeorm'; const FAKE_EXECUTION_ID = '15'; const FAKE_SECOND_EXECUTION_ID = '20'; @@ -25,6 +26,10 @@ const concurrencyControl = mockInstance(ConcurrencyControlService, { isEnabled: false, }); +jest.mock('@/Db', () => ({ + transaction: (fn: (entityManager: EntityManager) => unknown) => fn(mock()), +})); + describe('ActiveExecutions', () => { let activeExecutions: ActiveExecutions; From 67d7132a1838a7b0cabd4ac04a147a50771f85ff Mon Sep 17 00:00:00 2001 From: Tomi Turtiainen <10324676+tomi@users.noreply.github.com> Date: Fri, 2 Aug 2024 12:38:21 +0300 Subject: [PATCH 2/4] refactor: Start transaction within the repository --- packages/cli/src/ActiveExecutions.ts | 8 +--- .../repositories/execution.repository.ts | 38 +++++++++---------- .../cli/src/executions/execution.service.ts | 5 +-- .../workflows/workflowExecution.service.ts | 8 +--- .../repositories/execution.repository.test.ts | 28 ++++++-------- .../cli/test/unit/ActiveExecutions.test.ts | 5 --- 6 files changed, 33 insertions(+), 59 deletions(-) diff --git a/packages/cli/src/ActiveExecutions.ts b/packages/cli/src/ActiveExecutions.ts index e3fc4023b72d3..c1a6e8ffd65a9 100644 --- a/packages/cli/src/ActiveExecutions.ts +++ b/packages/cli/src/ActiveExecutions.ts @@ -24,7 +24,6 @@ import type { import { isWorkflowIdValid } from '@/utils'; import { ExecutionRepository } from '@db/repositories/execution.repository'; import { Logger } from '@/Logger'; -import * as Db from '@/Db'; import { ConcurrencyControlService } from './concurrency/concurrency-control.service'; import config from './config'; @@ -75,12 +74,7 @@ export class ActiveExecutions { fullExecutionData.workflowId = workflowId; } - executionId = await Db.transaction(async (transactionManager) => { - return await this.executionRepository.createNewExecution( - fullExecutionData, - transactionManager, - ); - }); + executionId = await this.executionRepository.createNewExecution(fullExecutionData); assert(executionId); await this.concurrencyControl.throttle({ mode, executionId }); diff --git a/packages/cli/src/databases/repositories/execution.repository.ts b/packages/cli/src/databases/repositories/execution.repository.ts index 47c210c4b6d83..ee1bf5a803d2d 100644 --- a/packages/cli/src/databases/repositories/execution.repository.ts +++ b/packages/cli/src/databases/repositories/execution.repository.ts @@ -13,7 +13,6 @@ import { } from '@n8n/typeorm'; import { DateUtils } from '@n8n/typeorm/util/DateUtils'; import type { - EntityManager, FindManyOptions, FindOneOptions, FindOperator, @@ -30,6 +29,7 @@ import { } from 'n8n-workflow'; import { BinaryDataService } from 'n8n-core'; import { ExecutionCancelledError, ErrorReporterProxy as ErrorReporter } from 'n8n-workflow'; +import * as Db from '@/Db'; import type { ExecutionPayload, @@ -272,26 +272,26 @@ export class ExecutionRepository extends Repository { } /** - * Insert a new execution and its execution data + * Insert a new execution and its execution data using a transaction. */ - async createNewExecution(execution: ExecutionPayload, trx?: EntityManager): Promise { - trx = trx ?? this.manager; - - const { data, workflowData, ...rest } = execution; - const insertResult = await trx.insert(ExecutionEntity, rest); - const { id: executionId } = insertResult.identifiers[0] as { id: string }; - - const { connections, nodes, name, settings } = workflowData ?? {}; - await this.executionDataRepository.createExecutionDataForExecution( - { - executionId, - workflowData: { connections, nodes, name, settings, id: workflowData.id }, - data: stringify(data), - }, - trx, - ); + async createNewExecution(execution: ExecutionPayload): Promise { + return await Db.transaction(async (transactionManager) => { + const { data, workflowData, ...rest } = execution; + const insertResult = await transactionManager.insert(ExecutionEntity, rest); + const { id: executionId } = insertResult.identifiers[0] as { id: string }; + + const { connections, nodes, name, settings } = workflowData ?? {}; + await this.executionDataRepository.createExecutionDataForExecution( + { + executionId, + workflowData: { connections, nodes, name, settings, id: workflowData.id }, + data: stringify(data), + }, + transactionManager, + ); - return String(executionId); + return String(executionId); + }); } async markAsCrashed(executionIds: string | string[]) { diff --git a/packages/cli/src/executions/execution.service.ts b/packages/cli/src/executions/execution.service.ts index 3694c10a75f7f..bb8650e99f1d6 100644 --- a/packages/cli/src/executions/execution.service.ts +++ b/packages/cli/src/executions/execution.service.ts @@ -40,7 +40,6 @@ import { QueuedExecutionRetryError } from '@/errors/queued-execution-retry.error import { ConcurrencyControlService } from '@/concurrency/concurrency-control.service'; import { AbortedExecutionRetryError } from '@/errors/aborted-execution-retry.error'; import { License } from '@/License'; -import * as Db from '@/Db'; export const schemaGetExecutionsQueryFilter = { $id: '/IGetExecutionsQueryFilter', @@ -324,9 +323,7 @@ export class ExecutionService { status: 'error', }; - await Db.transaction(async (transactionManager) => { - await this.executionRepository.createNewExecution(fullExecutionData, transactionManager); - }); + await this.executionRepository.createNewExecution(fullExecutionData); } // ---------------------------------- diff --git a/packages/cli/src/workflows/workflowExecution.service.ts b/packages/cli/src/workflows/workflowExecution.service.ts index f76efda93ee3a..8ebe7aed0c142 100644 --- a/packages/cli/src/workflows/workflowExecution.service.ts +++ b/packages/cli/src/workflows/workflowExecution.service.ts @@ -35,7 +35,6 @@ import { Logger } from '@/Logger'; import type { Project } from '@/databases/entities/Project'; import { GlobalConfig } from '@n8n/config'; import { SubworkflowPolicyChecker } from '@/subworkflows/subworkflow-policy-checker.service'; -import * as Db from '@/Db'; @Service() export class WorkflowExecutionService { @@ -221,12 +220,7 @@ export class WorkflowExecutionService { workflowId: workflowData.id, }; - await Db.transaction(async (transactionManager) => { - await this.executionRepository.createNewExecution( - fullExecutionData, - transactionManager, - ); - }); + await this.executionRepository.createNewExecution(fullExecutionData); } this.logger.info('Error workflow execution blocked due to subworkflow settings', { erroredWorkflowId: workflowErrorData.workflow.id, diff --git a/packages/cli/test/integration/database/repositories/execution.repository.test.ts b/packages/cli/test/integration/database/repositories/execution.repository.test.ts index 3408b0aef1466..e777f624299aa 100644 --- a/packages/cli/test/integration/database/repositories/execution.repository.test.ts +++ b/packages/cli/test/integration/database/repositories/execution.repository.test.ts @@ -1,7 +1,6 @@ import Container from 'typedi'; import { ExecutionRepository } from '@db/repositories/execution.repository'; import { ExecutionDataRepository } from '@db/repositories/executionData.repository'; -import * as Db from '@/Db'; import * as testDb from '../../shared/testDb'; import { createWorkflow } from '../../shared/db/workflows'; @@ -65,22 +64,17 @@ describe('ExecutionRepository', () => { await expect( async () => - await Db.transaction(async (transactionManager) => { - await executionRepo.createNewExecution( - { - workflowId: workflow.id, - data: { - //@ts-expect-error This is not needed for tests - resultData: {}, - }, - workflowData: workflow, - mode: 'manual', - startedAt: new Date(), - status: 'new', - finished: false, - }, - transactionManager, - ); + await executionRepo.createNewExecution({ + workflowId: workflow.id, + data: { + //@ts-expect-error This is not needed for tests + resultData: {}, + }, + workflowData: workflow, + mode: 'manual', + startedAt: new Date(), + status: 'new', + finished: false, }), ).rejects.toThrow(); diff --git a/packages/cli/test/unit/ActiveExecutions.test.ts b/packages/cli/test/unit/ActiveExecutions.test.ts index 771ca63ab64f3..0cc70a10a4e61 100644 --- a/packages/cli/test/unit/ActiveExecutions.test.ts +++ b/packages/cli/test/unit/ActiveExecutions.test.ts @@ -8,7 +8,6 @@ import type { ExecutionRepository } from '@db/repositories/execution.repository' import { mock } from 'jest-mock-extended'; import { ConcurrencyControlService } from '@/concurrency/concurrency-control.service'; import { mockInstance } from '@test/mocking'; -import type { EntityManager } from '@n8n/typeorm'; const FAKE_EXECUTION_ID = '15'; const FAKE_SECOND_EXECUTION_ID = '20'; @@ -26,10 +25,6 @@ const concurrencyControl = mockInstance(ConcurrencyControlService, { isEnabled: false, }); -jest.mock('@/Db', () => ({ - transaction: (fn: (entityManager: EntityManager) => unknown) => fn(mock()), -})); - describe('ActiveExecutions', () => { let activeExecutions: ActiveExecutions; From faa04829f9b19b03c63fc89ac1183e9b8bb3397a Mon Sep 17 00:00:00 2001 From: Tomi Turtiainen <10324676+tomi@users.noreply.github.com> Date: Fri, 2 Aug 2024 13:19:01 +0300 Subject: [PATCH 3/4] refactor: Use local manager to start transaction --- .../cli/src/databases/repositories/execution.repository.ts | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/packages/cli/src/databases/repositories/execution.repository.ts b/packages/cli/src/databases/repositories/execution.repository.ts index ee1bf5a803d2d..a8605147aa751 100644 --- a/packages/cli/src/databases/repositories/execution.repository.ts +++ b/packages/cli/src/databases/repositories/execution.repository.ts @@ -29,7 +29,6 @@ import { } from 'n8n-workflow'; import { BinaryDataService } from 'n8n-core'; import { ExecutionCancelledError, ErrorReporterProxy as ErrorReporter } from 'n8n-workflow'; -import * as Db from '@/Db'; import type { ExecutionPayload, @@ -275,7 +274,7 @@ export class ExecutionRepository extends Repository { * Insert a new execution and its execution data using a transaction. */ async createNewExecution(execution: ExecutionPayload): Promise { - return await Db.transaction(async (transactionManager) => { + return await this.manager.transaction(async (transactionManager) => { const { data, workflowData, ...rest } = execution; const insertResult = await transactionManager.insert(ExecutionEntity, rest); const { id: executionId } = insertResult.identifiers[0] as { id: string }; From 182b25190b6d259b24c0e60c99fefd47237bdd1b Mon Sep 17 00:00:00 2001 From: Tomi Turtiainen <10324676+tomi@users.noreply.github.com> Date: Fri, 2 Aug 2024 13:19:13 +0300 Subject: [PATCH 4/4] refactor: Make transaction required parameter --- .../src/databases/repositories/executionData.repository.ts | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/packages/cli/src/databases/repositories/executionData.repository.ts b/packages/cli/src/databases/repositories/executionData.repository.ts index b1378a2164044..013453d998e42 100644 --- a/packages/cli/src/databases/repositories/executionData.repository.ts +++ b/packages/cli/src/databases/repositories/executionData.repository.ts @@ -16,12 +16,11 @@ export class ExecutionDataRepository extends Repository { async createExecutionDataForExecution( executionData: CreateExecutionDataOpts, - trx?: EntityManager, + transactionManager: EntityManager, ) { - trx = trx ?? this.manager; const { data, executionId, workflowData } = executionData; - return await trx.insert(ExecutionData, { + return await transactionManager.insert(ExecutionData, { executionId, data, workflowData,