From e115015a7cc119c7466ed01ce92f78e2d2678b7f Mon Sep 17 00:00:00 2001 From: Tomi Turtiainen <10324676+tomi@users.noreply.github.com> Date: Thu, 1 Aug 2024 16:07:36 +0300 Subject: [PATCH] fix(core): Make execution and its data creation atomic Executions and their data are inserted in two separate insert statements. Before this PR, this wasn't done in a transaction, which could cause executions to be created without execution data. This caused issues like the one fixed in #9903. --- packages/cli/src/ActiveExecutions.ts | 13 ++++--- .../repositories/execution.repository.ts | 27 +++++++++----- .../repositories/executionData.repository.ts | 20 +++++++++++ .../cli/src/executions/execution.service.ts | 5 ++- .../workflows/workflowExecution.service.ts | 8 ++++- .../repositories/execution.repository.test.ts | 35 +++++++++++++++++++ .../cli/test/unit/ActiveExecutions.test.ts | 5 +++ 7 files changed, 99 insertions(+), 14 deletions(-) diff --git a/packages/cli/src/ActiveExecutions.ts b/packages/cli/src/ActiveExecutions.ts index 97313d5cb2f07..e3fc4023b72d3 100644 --- a/packages/cli/src/ActiveExecutions.ts +++ b/packages/cli/src/ActiveExecutions.ts @@ -12,6 +12,7 @@ import { ExecutionCancelledError, sleep, } from 'n8n-workflow'; +import { strict as assert } from 'node:assert'; import type { ExecutionPayload, @@ -23,6 +24,7 @@ import type { import { isWorkflowIdValid } from '@/utils'; import { ExecutionRepository } from '@db/repositories/execution.repository'; import { Logger } from '@/Logger'; +import * as Db from '@/Db'; import { ConcurrencyControlService } from './concurrency/concurrency-control.service'; import config from './config'; @@ -73,10 +75,13 @@ export class ActiveExecutions { fullExecutionData.workflowId = workflowId; } - executionId = await this.executionRepository.createNewExecution(fullExecutionData); - if (executionId === undefined) { - throw new ApplicationError('There was an issue assigning an execution id to the execution'); - } + executionId = await Db.transaction(async (transactionManager) => { + return await this.executionRepository.createNewExecution( + fullExecutionData, + transactionManager, + ); + }); + assert(executionId); await this.concurrencyControl.throttle({ mode, executionId }); executionStatus = 'running'; diff --git a/packages/cli/src/databases/repositories/execution.repository.ts b/packages/cli/src/databases/repositories/execution.repository.ts index 1ebb22d8eb4bc..47c210c4b6d83 100644 --- a/packages/cli/src/databases/repositories/execution.repository.ts +++ b/packages/cli/src/databases/repositories/execution.repository.ts @@ -13,6 +13,7 @@ import { } from '@n8n/typeorm'; import { DateUtils } from '@n8n/typeorm/util/DateUtils'; import type { + EntityManager, FindManyOptions, FindOneOptions, FindOperator, @@ -270,16 +271,26 @@ export class ExecutionRepository extends Repository { return rest; } - async createNewExecution(execution: ExecutionPayload): Promise { + /** + * Insert a new execution and its execution data + */ + async createNewExecution(execution: ExecutionPayload, trx?: EntityManager): Promise { + trx = trx ?? this.manager; + const { data, workflowData, ...rest } = execution; - const { identifiers: inserted } = await this.insert(rest); - const { id: executionId } = inserted[0] as { id: string }; + const insertResult = await trx.insert(ExecutionEntity, rest); + const { id: executionId } = insertResult.identifiers[0] as { id: string }; + const { connections, nodes, name, settings } = workflowData ?? {}; - await this.executionDataRepository.insert({ - executionId, - workflowData: { connections, nodes, name, settings, id: workflowData.id }, - data: stringify(data), - }); + await this.executionDataRepository.createExecutionDataForExecution( + { + executionId, + workflowData: { connections, nodes, name, settings, id: workflowData.id }, + data: stringify(data), + }, + trx, + ); + return String(executionId); } diff --git a/packages/cli/src/databases/repositories/executionData.repository.ts b/packages/cli/src/databases/repositories/executionData.repository.ts index 5872f9888cd66..b1378a2164044 100644 --- a/packages/cli/src/databases/repositories/executionData.repository.ts +++ b/packages/cli/src/databases/repositories/executionData.repository.ts @@ -1,13 +1,33 @@ import { Service } from 'typedi'; +import type { EntityManager } from '@n8n/typeorm'; +import type { IWorkflowBase } from 'n8n-workflow'; import { DataSource, In, Repository } from '@n8n/typeorm'; import { ExecutionData } from '../entities/ExecutionData'; +export interface CreateExecutionDataOpts extends Pick { + workflowData: Pick; +} + @Service() export class ExecutionDataRepository extends Repository { constructor(dataSource: DataSource) { super(ExecutionData, dataSource.manager); } + async createExecutionDataForExecution( + executionData: CreateExecutionDataOpts, + trx?: EntityManager, + ) { + trx = trx ?? this.manager; + const { data, executionId, workflowData } = executionData; + + return await trx.insert(ExecutionData, { + executionId, + data, + workflowData, + }); + } + async findByExecutionIds(executionIds: string[]) { return await this.find({ select: ['workflowData'], diff --git a/packages/cli/src/executions/execution.service.ts b/packages/cli/src/executions/execution.service.ts index bb8650e99f1d6..3694c10a75f7f 100644 --- a/packages/cli/src/executions/execution.service.ts +++ b/packages/cli/src/executions/execution.service.ts @@ -40,6 +40,7 @@ import { QueuedExecutionRetryError } from '@/errors/queued-execution-retry.error import { ConcurrencyControlService } from '@/concurrency/concurrency-control.service'; import { AbortedExecutionRetryError } from '@/errors/aborted-execution-retry.error'; import { License } from '@/License'; +import * as Db from '@/Db'; export const schemaGetExecutionsQueryFilter = { $id: '/IGetExecutionsQueryFilter', @@ -323,7 +324,9 @@ export class ExecutionService { status: 'error', }; - await this.executionRepository.createNewExecution(fullExecutionData); + await Db.transaction(async (transactionManager) => { + await this.executionRepository.createNewExecution(fullExecutionData, transactionManager); + }); } // ---------------------------------- diff --git a/packages/cli/src/workflows/workflowExecution.service.ts b/packages/cli/src/workflows/workflowExecution.service.ts index 8ebe7aed0c142..f76efda93ee3a 100644 --- a/packages/cli/src/workflows/workflowExecution.service.ts +++ b/packages/cli/src/workflows/workflowExecution.service.ts @@ -35,6 +35,7 @@ import { Logger } from '@/Logger'; import type { Project } from '@/databases/entities/Project'; import { GlobalConfig } from '@n8n/config'; import { SubworkflowPolicyChecker } from '@/subworkflows/subworkflow-policy-checker.service'; +import * as Db from '@/Db'; @Service() export class WorkflowExecutionService { @@ -220,7 +221,12 @@ export class WorkflowExecutionService { workflowId: workflowData.id, }; - await this.executionRepository.createNewExecution(fullExecutionData); + await Db.transaction(async (transactionManager) => { + await this.executionRepository.createNewExecution( + fullExecutionData, + transactionManager, + ); + }); } this.logger.info('Error workflow execution blocked due to subworkflow settings', { erroredWorkflowId: workflowErrorData.workflow.id, diff --git a/packages/cli/test/integration/database/repositories/execution.repository.test.ts b/packages/cli/test/integration/database/repositories/execution.repository.test.ts index cfb897d627a56..3408b0aef1466 100644 --- a/packages/cli/test/integration/database/repositories/execution.repository.test.ts +++ b/packages/cli/test/integration/database/repositories/execution.repository.test.ts @@ -1,6 +1,7 @@ import Container from 'typedi'; import { ExecutionRepository } from '@db/repositories/execution.repository'; import { ExecutionDataRepository } from '@db/repositories/executionData.repository'; +import * as Db from '@/Db'; import * as testDb from '../../shared/testDb'; import { createWorkflow } from '../../shared/db/workflows'; @@ -52,5 +53,39 @@ describe('ExecutionRepository', () => { }); expect(executionData?.data).toEqual('[{"resultData":"1"},{}]'); }); + + it('should not create execution if execution data insert fails', async () => { + const executionRepo = Container.get(ExecutionRepository); + const executionDataRepo = Container.get(ExecutionDataRepository); + + const workflow = await createWorkflow({ settings: { executionOrder: 'v1' } }); + jest + .spyOn(executionDataRepo, 'createExecutionDataForExecution') + .mockRejectedValueOnce(new Error()); + + await expect( + async () => + await Db.transaction(async (transactionManager) => { + await executionRepo.createNewExecution( + { + workflowId: workflow.id, + data: { + //@ts-expect-error This is not needed for tests + resultData: {}, + }, + workflowData: workflow, + mode: 'manual', + startedAt: new Date(), + status: 'new', + finished: false, + }, + transactionManager, + ); + }), + ).rejects.toThrow(); + + const executionEntities = await executionRepo.find(); + expect(executionEntities).toBeEmptyArray(); + }); }); }); diff --git a/packages/cli/test/unit/ActiveExecutions.test.ts b/packages/cli/test/unit/ActiveExecutions.test.ts index 0cc70a10a4e61..771ca63ab64f3 100644 --- a/packages/cli/test/unit/ActiveExecutions.test.ts +++ b/packages/cli/test/unit/ActiveExecutions.test.ts @@ -8,6 +8,7 @@ import type { ExecutionRepository } from '@db/repositories/execution.repository' import { mock } from 'jest-mock-extended'; import { ConcurrencyControlService } from '@/concurrency/concurrency-control.service'; import { mockInstance } from '@test/mocking'; +import type { EntityManager } from '@n8n/typeorm'; const FAKE_EXECUTION_ID = '15'; const FAKE_SECOND_EXECUTION_ID = '20'; @@ -25,6 +26,10 @@ const concurrencyControl = mockInstance(ConcurrencyControlService, { isEnabled: false, }); +jest.mock('@/Db', () => ({ + transaction: (fn: (entityManager: EntityManager) => unknown) => fn(mock()), +})); + describe('ActiveExecutions', () => { let activeExecutions: ActiveExecutions;