Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(core): Make execution and its data creation atomic #11392

Merged
merged 1 commit into from
Oct 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
import { DataSource, In, Repository } from '@n8n/typeorm';
import type { EntityManager } from '@n8n/typeorm';
import type { QueryDeepPartialEntity } from '@n8n/typeorm/query-builder/QueryPartialEntity';
import { Service } from 'typedi';

import { ExecutionData } from '../entities/execution-data';
Expand All @@ -9,6 +11,13 @@ export class ExecutionDataRepository extends Repository<ExecutionData> {
super(ExecutionData, dataSource.manager);
}

async createExecutionDataForExecution(
data: QueryDeepPartialEntity<ExecutionData>,
transactionManager: EntityManager,
) {
return await transactionManager.insert(ExecutionData, data);
}

async findByExecutionIds(executionIds: string[]) {
return await this.find({
select: ['workflowData'],
Expand Down
38 changes: 28 additions & 10 deletions packages/cli/src/databases/repositories/execution.repository.ts
Original file line number Diff line number Diff line change
Expand Up @@ -304,16 +304,34 @@ export class ExecutionRepository extends Repository<ExecutionEntity> {
* Insert a new execution and its execution data using a transaction.
*/
async createNewExecution(execution: CreateExecutionPayload): Promise<string> {
const { data, workflowData, ...rest } = execution;
const { identifiers: inserted } = await this.insert({ ...rest, createdAt: new Date() });
const { id: executionId } = inserted[0] as { id: string };
const { connections, nodes, name, settings } = workflowData ?? {};
await this.executionDataRepository.insert({
executionId,
workflowData: { connections, nodes, name, settings, id: workflowData.id },
data: stringify(data),
});
return String(executionId);
const { data: dataObj, workflowData: currentWorkflow, ...rest } = execution;
const { connections, nodes, name, settings } = currentWorkflow ?? {};
const workflowData = { connections, nodes, name, settings, id: currentWorkflow.id };
const data = stringify(dataObj);

const { type: dbType, sqlite: sqliteConfig } = this.globalConfig.database;
if (dbType === 'sqlite' && sqliteConfig.poolSize === 0) {
// TODO: Delete this block of code once the sqlite legacy (non-pooling) driver is dropped.
// In the non-pooling sqlite driver we can't use transactions, because that creates nested transactions under highly concurrent loads, leading to errors in the database
const { identifiers: inserted } = await this.insert({ ...rest, createdAt: new Date() });
const { id: executionId } = inserted[0] as { id: string };
await this.executionDataRepository.insert({ executionId, workflowData, data });
return String(executionId);
} else {
// All other database drivers should create executions and execution-data atomically
return await this.manager.transaction(async (transactionManager) => {
const { identifiers: inserted } = await transactionManager.insert(ExecutionEntity, {
...rest,
createdAt: new Date(),
});
const { id: executionId } = inserted[0] as { id: string };
await this.executionDataRepository.createExecutionDataForExecution(
{ executionId, workflowData, data },
transactionManager,
);
return String(executionId);
});
}
}

async markAsCrashed(executionIds: string | string[]) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { GlobalConfig } from '@n8n/config';
import Container from 'typedi';

import { ExecutionDataRepository } from '@/databases/repositories/execution-data.repository';
Expand Down Expand Up @@ -54,5 +55,38 @@ describe('ExecutionRepository', () => {
});
expect(executionData?.data).toEqual('[{"resultData":"1"},{}]');
});

it('should not create execution if execution data insert fails', async () => {
const { type: dbType, sqlite: sqliteConfig } = Container.get(GlobalConfig).database;
// Do not run this test for the legacy sqlite driver
if (dbType === 'sqlite' && sqliteConfig.poolSize === 0) return;

const executionRepo = Container.get(ExecutionRepository);
const executionDataRepo = Container.get(ExecutionDataRepository);

const workflow = await createWorkflow({ settings: { executionOrder: 'v1' } });
jest
.spyOn(executionDataRepo, 'createExecutionDataForExecution')
.mockRejectedValueOnce(new Error());

await expect(
async () =>
await executionRepo.createNewExecution({
workflowId: workflow.id,
data: {
//@ts-expect-error This is not needed for tests
resultData: {},
},
workflowData: workflow,
mode: 'manual',
startedAt: new Date(),
status: 'new',
finished: false,
}),
).rejects.toThrow();

const executionEntities = await executionRepo.find();
expect(executionEntities).toBeEmptyArray();
});
});
});
Loading