Skip to content

Commit

Permalink
fix(core): Make execution and its data creation atomic (#10276)
Browse files Browse the repository at this point in the history
Co-authored-by: कारतोफ्फेलस्क्रिप्ट™ <[email protected]>
  • Loading branch information
tomi and netroy authored Aug 2, 2024
1 parent c3e2e84 commit ae50bb9
Show file tree
Hide file tree
Showing 4 changed files with 69 additions and 12 deletions.
5 changes: 2 additions & 3 deletions packages/cli/src/ActiveExecutions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import {
ExecutionCancelledError,
sleep,
} from 'n8n-workflow';
import { strict as assert } from 'node:assert';

import type {
ExecutionPayload,
Expand Down Expand Up @@ -74,9 +75,7 @@ export class ActiveExecutions {
}

executionId = await this.executionRepository.createNewExecution(fullExecutionData);
if (executionId === undefined) {
throw new ApplicationError('There was an issue assigning an execution id to the execution');
}
assert(executionId);

await this.concurrencyControl.throttle({ mode, executionId });
executionStatus = 'running';
Expand Down
28 changes: 19 additions & 9 deletions packages/cli/src/databases/repositories/execution.repository.ts
Original file line number Diff line number Diff line change
Expand Up @@ -270,17 +270,27 @@ export class ExecutionRepository extends Repository<ExecutionEntity> {
return rest;
}

/**
* Insert a new execution and its execution data using a transaction.
*/
async createNewExecution(execution: ExecutionPayload): Promise<string> {
const { data, workflowData, ...rest } = execution;
const { identifiers: inserted } = await this.insert(rest);
const { id: executionId } = inserted[0] as { id: string };
const { connections, nodes, name, settings } = workflowData ?? {};
await this.executionDataRepository.insert({
executionId,
workflowData: { connections, nodes, name, settings, id: workflowData.id },
data: stringify(data),
return await this.manager.transaction(async (transactionManager) => {
const { data, workflowData, ...rest } = execution;
const insertResult = await transactionManager.insert(ExecutionEntity, rest);
const { id: executionId } = insertResult.identifiers[0] as { id: string };

const { connections, nodes, name, settings } = workflowData ?? {};
await this.executionDataRepository.createExecutionDataForExecution(
{
executionId,
workflowData: { connections, nodes, name, settings, id: workflowData.id },
data: stringify(data),
},
transactionManager,
);

return String(executionId);
});
return String(executionId);
}

async markAsCrashed(executionIds: string | string[]) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,32 @@
import { Service } from 'typedi';
import type { EntityManager } from '@n8n/typeorm';
import type { IWorkflowBase } from 'n8n-workflow';
import { DataSource, In, Repository } from '@n8n/typeorm';
import { ExecutionData } from '../entities/ExecutionData';

export interface CreateExecutionDataOpts extends Pick<ExecutionData, 'data' | 'executionId'> {
workflowData: Pick<IWorkflowBase, 'connections' | 'nodes' | 'name' | 'settings' | 'id'>;
}

@Service()
export class ExecutionDataRepository extends Repository<ExecutionData> {
constructor(dataSource: DataSource) {
super(ExecutionData, dataSource.manager);
}

async createExecutionDataForExecution(
executionData: CreateExecutionDataOpts,
transactionManager: EntityManager,
) {
const { data, executionId, workflowData } = executionData;

return await transactionManager.insert(ExecutionData, {
executionId,
data,
workflowData,
});
}

async findByExecutionIds(executionIds: string[]) {
return await this.find({
select: ['workflowData'],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,5 +52,34 @@ describe('ExecutionRepository', () => {
});
expect(executionData?.data).toEqual('[{"resultData":"1"},{}]');
});

it('should not create execution if execution data insert fails', async () => {
const executionRepo = Container.get(ExecutionRepository);
const executionDataRepo = Container.get(ExecutionDataRepository);

const workflow = await createWorkflow({ settings: { executionOrder: 'v1' } });
jest
.spyOn(executionDataRepo, 'createExecutionDataForExecution')
.mockRejectedValueOnce(new Error());

await expect(
async () =>
await executionRepo.createNewExecution({
workflowId: workflow.id,
data: {
//@ts-expect-error This is not needed for tests
resultData: {},
},
workflowData: workflow,
mode: 'manual',
startedAt: new Date(),
status: 'new',
finished: false,
}),
).rejects.toThrow();

const executionEntities = await executionRepo.find();
expect(executionEntities).toBeEmptyArray();
});
});
});

0 comments on commit ae50bb9

Please sign in to comment.