Skip to content

Commit

Permalink
fix(core): Fix pruning of non-finished executions (#7333)
Browse files Browse the repository at this point in the history
This fixes a bug in the pruning (soft-delete). The pruning was a bit too
aggressive, as it also pruned executions that weren't in an end state
yet. This only becomes an issue if there are long-running executions
(e.g. workflow with Wait node) or the prune parameters are set to keep
only a tiny number of executions.
  • Loading branch information
tomi authored Oct 4, 2023
1 parent 942d0b9 commit 1b4848a
Show file tree
Hide file tree
Showing 6 changed files with 251 additions and 92 deletions.
4 changes: 2 additions & 2 deletions packages/cli/src/config/schema.ts
Original file line number Diff line number Diff line change
Expand Up @@ -310,7 +310,7 @@ export const schema = {
env: 'EXECUTIONS_DATA_PRUNE',
},
pruneDataMaxAge: {
doc: 'How old (hours) the execution data has to be to get deleted',
doc: 'How old (hours) the finished execution data has to be to get deleted',
format: Number,
default: 336,
env: 'EXECUTIONS_DATA_MAX_AGE',
Expand All @@ -320,7 +320,7 @@ export const schema = {
// Deletes the oldest entries first
// Set to 0 for No limit
pruneDataMaxCount: {
doc: 'Maximum number of executions to keep in DB. 0 = no limit',
doc: "Maximum number of finished executions to keep in DB. Doesn't necessarily prune exactly to max number. 0 = no limit",
format: Number,
default: 10000,
env: 'EXECUTIONS_DATA_PRUNE_MAX_COUNT',
Expand Down
1 change: 1 addition & 0 deletions packages/cli/src/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -103,4 +103,5 @@ export const TIME = {
SECOND: 1000,
MINUTE: 60 * 1000,
HOUR: 60 * 60 * 1000,
DAY: 24 * 60 * 60 * 1000,
};
32 changes: 27 additions & 5 deletions packages/cli/src/databases/repositories/execution.repository.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,14 @@
import { Service } from 'typedi';
import { Brackets, DataSource, In, LessThanOrEqual, MoreThanOrEqual, Repository } from 'typeorm';
import {
Brackets,
DataSource,
Not,
In,
IsNull,
LessThanOrEqual,
MoreThanOrEqual,
Repository,
} from 'typeorm';
import { DateUtils } from 'typeorm/util/DateUtils';
import type {
FindManyOptions,
Expand Down Expand Up @@ -110,13 +119,21 @@ export class ExecutionRepository extends Repository<ExecutionEntity> {
}

setSoftDeletionInterval() {
this.logger.debug('Setting soft-deletion interval (pruning) for executions');
this.logger.debug(
`Setting soft-deletion interval (pruning) for executions every ${
this.rates.softDeletion / TIME.MINUTE
} min`,
);

this.intervals.softDeletion = setInterval(async () => this.prune(), this.rates.hardDeletion);
this.intervals.softDeletion = setInterval(async () => this.prune(), this.rates.softDeletion);
}

setHardDeletionInterval() {
this.logger.debug('Setting hard-deletion interval for executions');
this.logger.debug(
`Setting hard-deletion interval for executions every ${
this.rates.hardDeletion / TIME.MINUTE
} min`,
);

this.intervals.hardDeletion = setInterval(
async () => this.hardDelete(),
Expand Down Expand Up @@ -487,7 +504,12 @@ export class ExecutionRepository extends Repository<ExecutionEntity> {
await this.createQueryBuilder()
.update(ExecutionEntity)
.set({ deletedAt: new Date() })
.where(
.where({
deletedAt: IsNull(),
// Only mark executions as deleted if they are in an end state
status: Not(In(['new', 'running', 'waiting'])),
})
.andWhere(
new Brackets((qb) =>
countBasedWhere
? qb.where(timeBasedWhere).orWhere(countBasedWhere)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,218 @@
import config from '@/config';
import * as Db from '@/Db';

import * as testDb from '../shared/testDb';
import type { ExecutionStatus } from 'n8n-workflow';
import { LoggerProxy } from 'n8n-workflow';
import { getLogger } from '@/Logger';
import type { ExecutionRepository } from '../../../src/databases/repositories';
import type { ExecutionEntity } from '../../../src/databases/entities/ExecutionEntity';
import { TIME } from '../../../src/constants';

describe('ExecutionRepository.prune()', () => {
const now = new Date();
const yesterday = new Date(Date.now() - TIME.DAY);
let executionRepository: ExecutionRepository;
let workflow: Awaited<ReturnType<typeof testDb.createWorkflow>>;

beforeAll(async () => {
LoggerProxy.init(getLogger());
await testDb.init();

const { Execution } = Db.collections;

executionRepository = Execution;
workflow = await testDb.createWorkflow();
});

beforeEach(async () => {
await testDb.truncate(['Execution']);
});

afterAll(async () => {
await testDb.terminate();
});

afterEach(() => {
config.load(config.default);
});

async function findAllExecutions() {
return Db.collections.Execution.find({
order: { id: 'asc' },
withDeleted: true,
});
}

describe('when EXECUTIONS_DATA_PRUNE_MAX_COUNT is set', () => {
beforeEach(() => {
config.set('executions.pruneDataMaxCount', 1);
config.set('executions.pruneDataMaxAge', 336);
});

test('should mark as deleted based on EXECUTIONS_DATA_PRUNE_MAX_COUNT', async () => {
const executions = [
await testDb.createSuccessfulExecution(workflow),
await testDb.createSuccessfulExecution(workflow),
await testDb.createSuccessfulExecution(workflow),
];

await executionRepository.prune();

const result = await findAllExecutions();
expect(result).toEqual([
expect.objectContaining({ id: executions[0].id, deletedAt: expect.any(Date) }),
expect.objectContaining({ id: executions[1].id, deletedAt: expect.any(Date) }),
expect.objectContaining({ id: executions[2].id, deletedAt: null }),
]);
});

test('should not re-mark already marked executions', async () => {
const executions = [
await testDb.createExecution(
{ status: 'success', finished: true, startedAt: now, stoppedAt: now, deletedAt: now },
workflow,
),
await testDb.createSuccessfulExecution(workflow),
];

await executionRepository.prune();

const result = await findAllExecutions();
expect(result).toEqual([
expect.objectContaining({ id: executions[0].id, deletedAt: now }),
expect.objectContaining({ id: executions[1].id, deletedAt: null }),
]);
});

test.each<[ExecutionStatus, Partial<ExecutionEntity>]>([
['warning', { startedAt: now, stoppedAt: now }],
['unknown', { startedAt: now, stoppedAt: now }],
['canceled', { startedAt: now, stoppedAt: now }],
['crashed', { startedAt: now, stoppedAt: now }],
['failed', { startedAt: now, stoppedAt: now }],
['success', { finished: true, startedAt: now, stoppedAt: now }],
])('should prune %s executions', async (status, attributes) => {
const executions = [
await testDb.createExecution({ status, ...attributes }, workflow),
await testDb.createSuccessfulExecution(workflow),
];

await executionRepository.prune();

const result = await findAllExecutions();
expect(result).toEqual([
expect.objectContaining({ id: executions[0].id, deletedAt: expect.any(Date) }),
expect.objectContaining({ id: executions[1].id, deletedAt: null }),
]);
});

test.each<[ExecutionStatus, Partial<ExecutionEntity>]>([
['new', {}],
['running', { startedAt: now }],
['waiting', { startedAt: now, stoppedAt: now, waitTill: now }],
])('should not prune %s executions', async (status, attributes) => {
const executions = [
await testDb.createExecution({ status, ...attributes }, workflow),
await testDb.createSuccessfulExecution(workflow),
];

await executionRepository.prune();

const result = await findAllExecutions();
expect(result).toEqual([
expect.objectContaining({ id: executions[0].id, deletedAt: null }),
expect.objectContaining({ id: executions[1].id, deletedAt: null }),
]);
});
});

describe('when EXECUTIONS_DATA_MAX_AGE is set', () => {
beforeEach(() => {
config.set('executions.pruneDataMaxAge', 1); // 1h
config.set('executions.pruneDataMaxCount', 0);
});

test('should mark as deleted based on EXECUTIONS_DATA_MAX_AGE', async () => {
const executions = [
await testDb.createExecution(
{ finished: true, startedAt: yesterday, stoppedAt: yesterday, status: 'success' },
workflow,
),
await testDb.createExecution(
{ finished: true, startedAt: now, stoppedAt: now, status: 'success' },
workflow,
),
];

await executionRepository.prune();

const result = await findAllExecutions();
expect(result).toEqual([
expect.objectContaining({ id: executions[0].id, deletedAt: expect.any(Date) }),
expect.objectContaining({ id: executions[1].id, deletedAt: null }),
]);
});

test('should not re-mark already marked executions', async () => {
const executions = [
await testDb.createExecution(
{
status: 'success',
finished: true,
startedAt: yesterday,
stoppedAt: yesterday,
deletedAt: yesterday,
},
workflow,
),
await testDb.createSuccessfulExecution(workflow),
];

await executionRepository.prune();

const result = await findAllExecutions();
expect(result).toEqual([
expect.objectContaining({ id: executions[0].id, deletedAt: yesterday }),
expect.objectContaining({ id: executions[1].id, deletedAt: null }),
]);
});

test.each<[ExecutionStatus, Partial<ExecutionEntity>]>([
['warning', { startedAt: yesterday, stoppedAt: yesterday }],
['unknown', { startedAt: yesterday, stoppedAt: yesterday }],
['canceled', { startedAt: yesterday, stoppedAt: yesterday }],
['crashed', { startedAt: yesterday, stoppedAt: yesterday }],
['failed', { startedAt: yesterday, stoppedAt: yesterday }],
['success', { finished: true, startedAt: yesterday, stoppedAt: yesterday }],
])('should prune %s executions', async (status, attributes) => {
const execution = await testDb.createExecution({ status, ...attributes }, workflow);

await executionRepository.prune();

const result = await findAllExecutions();
expect(result).toEqual([
expect.objectContaining({ id: execution.id, deletedAt: expect.any(Date) }),
]);
});

test.each<[ExecutionStatus, Partial<ExecutionEntity>]>([
['new', {}],
['running', { startedAt: yesterday }],
['waiting', { startedAt: yesterday, stoppedAt: yesterday, waitTill: yesterday }],
])('should not prune %s executions', async (status, attributes) => {
const executions = [
await testDb.createExecution({ status, ...attributes }, workflow),
await testDb.createSuccessfulExecution(workflow),
];

await executionRepository.prune();

const result = await findAllExecutions();
expect(result).toEqual([
expect.objectContaining({ id: executions[0].id, deletedAt: null }),
expect.objectContaining({ id: executions[1].id, deletedAt: null }),
]);
});
});
});
5 changes: 3 additions & 2 deletions packages/cli/test/integration/shared/testDb.ts
Original file line number Diff line number Diff line change
Expand Up @@ -360,11 +360,11 @@ export async function createManyExecutions(
/**
* Store a execution in the DB and assign it to a workflow.
*/
async function createExecution(
export async function createExecution(
attributes: Partial<ExecutionEntity & ExecutionData>,
workflow: WorkflowEntity,
) {
const { data, finished, mode, startedAt, stoppedAt, waitTill, status } = attributes;
const { data, finished, mode, startedAt, stoppedAt, waitTill, status, deletedAt } = attributes;

const execution = await Db.collections.Execution.save({
finished: finished ?? true,
Expand All @@ -374,6 +374,7 @@ async function createExecution(
stoppedAt: stoppedAt ?? new Date(),
waitTill: waitTill ?? null,
status,
deletedAt,
});

await Db.collections.ExecutionData.save({
Expand Down
Loading

0 comments on commit 1b4848a

Please sign in to comment.