Skip to content

Commit

Permalink
fix(core): Make execution and its data creation atomic
Browse files Browse the repository at this point in the history
Executions and their data are inserted in two separate insert statements. Before this
PR, this wasn't done in a transaction, which could cause executions to be created
without execution data. This caused issues like the one fixed in #9903.
  • Loading branch information
tomi committed Aug 1, 2024
1 parent d8688bd commit e115015
Show file tree
Hide file tree
Showing 7 changed files with 99 additions and 14 deletions.
13 changes: 9 additions & 4 deletions packages/cli/src/ActiveExecutions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import {
ExecutionCancelledError,
sleep,
} from 'n8n-workflow';
import { strict as assert } from 'node:assert';

import type {
ExecutionPayload,
Expand All @@ -23,6 +24,7 @@ import type {
import { isWorkflowIdValid } from '@/utils';
import { ExecutionRepository } from '@db/repositories/execution.repository';
import { Logger } from '@/Logger';
import * as Db from '@/Db';
import { ConcurrencyControlService } from './concurrency/concurrency-control.service';
import config from './config';

Expand Down Expand Up @@ -73,10 +75,13 @@ export class ActiveExecutions {
fullExecutionData.workflowId = workflowId;
}

executionId = await this.executionRepository.createNewExecution(fullExecutionData);
if (executionId === undefined) {
throw new ApplicationError('There was an issue assigning an execution id to the execution');
}
executionId = await Db.transaction(async (transactionManager) => {
return await this.executionRepository.createNewExecution(
fullExecutionData,
transactionManager,
);
});
assert(executionId);

await this.concurrencyControl.throttle({ mode, executionId });
executionStatus = 'running';
Expand Down
27 changes: 19 additions & 8 deletions packages/cli/src/databases/repositories/execution.repository.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import {
} from '@n8n/typeorm';
import { DateUtils } from '@n8n/typeorm/util/DateUtils';
import type {
EntityManager,
FindManyOptions,
FindOneOptions,
FindOperator,
Expand Down Expand Up @@ -270,16 +271,26 @@ export class ExecutionRepository extends Repository<ExecutionEntity> {
return rest;
}

async createNewExecution(execution: ExecutionPayload): Promise<string> {
/**
* Insert a new execution and its execution data
*/
async createNewExecution(execution: ExecutionPayload, trx?: EntityManager): Promise<string> {
trx = trx ?? this.manager;

const { data, workflowData, ...rest } = execution;
const { identifiers: inserted } = await this.insert(rest);
const { id: executionId } = inserted[0] as { id: string };
const insertResult = await trx.insert(ExecutionEntity, rest);
const { id: executionId } = insertResult.identifiers[0] as { id: string };

const { connections, nodes, name, settings } = workflowData ?? {};
await this.executionDataRepository.insert({
executionId,
workflowData: { connections, nodes, name, settings, id: workflowData.id },
data: stringify(data),
});
await this.executionDataRepository.createExecutionDataForExecution(
{
executionId,
workflowData: { connections, nodes, name, settings, id: workflowData.id },
data: stringify(data),
},
trx,
);

return String(executionId);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,33 @@
import { Service } from 'typedi';
import type { EntityManager } from '@n8n/typeorm';
import type { IWorkflowBase } from 'n8n-workflow';
import { DataSource, In, Repository } from '@n8n/typeorm';
import { ExecutionData } from '../entities/ExecutionData';

export interface CreateExecutionDataOpts extends Pick<ExecutionData, 'data' | 'executionId'> {
workflowData: Pick<IWorkflowBase, 'connections' | 'nodes' | 'name' | 'settings' | 'id'>;
}

@Service()
export class ExecutionDataRepository extends Repository<ExecutionData> {
constructor(dataSource: DataSource) {
super(ExecutionData, dataSource.manager);
}

async createExecutionDataForExecution(
executionData: CreateExecutionDataOpts,
trx?: EntityManager,
) {
trx = trx ?? this.manager;
const { data, executionId, workflowData } = executionData;

return await trx.insert(ExecutionData, {
executionId,
data,
workflowData,
});
}

async findByExecutionIds(executionIds: string[]) {
return await this.find({
select: ['workflowData'],
Expand Down
5 changes: 4 additions & 1 deletion packages/cli/src/executions/execution.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import { QueuedExecutionRetryError } from '@/errors/queued-execution-retry.error
import { ConcurrencyControlService } from '@/concurrency/concurrency-control.service';
import { AbortedExecutionRetryError } from '@/errors/aborted-execution-retry.error';
import { License } from '@/License';
import * as Db from '@/Db';

export const schemaGetExecutionsQueryFilter = {
$id: '/IGetExecutionsQueryFilter',
Expand Down Expand Up @@ -323,7 +324,9 @@ export class ExecutionService {
status: 'error',
};

await this.executionRepository.createNewExecution(fullExecutionData);
await Db.transaction(async (transactionManager) => {
await this.executionRepository.createNewExecution(fullExecutionData, transactionManager);
});
}

// ----------------------------------
Expand Down
8 changes: 7 additions & 1 deletion packages/cli/src/workflows/workflowExecution.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import { Logger } from '@/Logger';
import type { Project } from '@/databases/entities/Project';
import { GlobalConfig } from '@n8n/config';
import { SubworkflowPolicyChecker } from '@/subworkflows/subworkflow-policy-checker.service';
import * as Db from '@/Db';

@Service()
export class WorkflowExecutionService {
Expand Down Expand Up @@ -220,7 +221,12 @@ export class WorkflowExecutionService {
workflowId: workflowData.id,
};

await this.executionRepository.createNewExecution(fullExecutionData);
await Db.transaction(async (transactionManager) => {
await this.executionRepository.createNewExecution(
fullExecutionData,
transactionManager,
);
});
}
this.logger.info('Error workflow execution blocked due to subworkflow settings', {
erroredWorkflowId: workflowErrorData.workflow.id,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import Container from 'typedi';
import { ExecutionRepository } from '@db/repositories/execution.repository';
import { ExecutionDataRepository } from '@db/repositories/executionData.repository';
import * as Db from '@/Db';
import * as testDb from '../../shared/testDb';
import { createWorkflow } from '../../shared/db/workflows';

Expand Down Expand Up @@ -52,5 +53,39 @@ describe('ExecutionRepository', () => {
});
expect(executionData?.data).toEqual('[{"resultData":"1"},{}]');
});

it('should not create execution if execution data insert fails', async () => {
const executionRepo = Container.get(ExecutionRepository);
const executionDataRepo = Container.get(ExecutionDataRepository);

const workflow = await createWorkflow({ settings: { executionOrder: 'v1' } });
jest
.spyOn(executionDataRepo, 'createExecutionDataForExecution')
.mockRejectedValueOnce(new Error());

await expect(
async () =>
await Db.transaction(async (transactionManager) => {
await executionRepo.createNewExecution(
{
workflowId: workflow.id,
data: {
//@ts-expect-error This is not needed for tests
resultData: {},
},
workflowData: workflow,
mode: 'manual',
startedAt: new Date(),
status: 'new',
finished: false,
},
transactionManager,
);
}),
).rejects.toThrow();

const executionEntities = await executionRepo.find();
expect(executionEntities).toBeEmptyArray();
});
});
});
5 changes: 5 additions & 0 deletions packages/cli/test/unit/ActiveExecutions.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import type { ExecutionRepository } from '@db/repositories/execution.repository'
import { mock } from 'jest-mock-extended';
import { ConcurrencyControlService } from '@/concurrency/concurrency-control.service';
import { mockInstance } from '@test/mocking';
import type { EntityManager } from '@n8n/typeorm';

const FAKE_EXECUTION_ID = '15';
const FAKE_SECOND_EXECUTION_ID = '20';
Expand All @@ -25,6 +26,10 @@ const concurrencyControl = mockInstance(ConcurrencyControlService, {
isEnabled: false,
});

jest.mock('@/Db', () => ({
transaction: (fn: (entityManager: EntityManager) => unknown) => fn(mock<EntityManager>()),
}));

describe('ActiveExecutions', () => {
let activeExecutions: ActiveExecutions;

Expand Down

0 comments on commit e115015

Please sign in to comment.