Skip to content

Commit

Permalink
refactor(core): Implement soft-deletions for executions (#7092)
Browse files Browse the repository at this point in the history
Based on #7065 | Story: https://linear.app/n8n/issue/PAY-771

n8n on filesystem mode marks binary data to delete on manual execution
deletion, on unsaved execution completion, and on every execution
pruning cycle. We later prune binary data in a separate cycle via these
marker files, based on the configured TTL. In the context of introducing
an S3 client to manage binary data, the filesystem mode's mark-and-prune
setup is too tightly coupled to the general binary data management
client interface.

This PR...
- Ensures the deletion of an execution causes the deletion of any binary
data associated to it. This does away with the need for binary data TTL
and simplifies the filesystem mode's mark-and-prune setup.
- Refactors all execution deletions (including pruning) to cause soft
deletions, hard-deletes soft-deleted executions based on the existing
pruning config, and adjusts execution endpoints to filter out
soft-deleted executions. This reduces DB load, and keeps binary data
around long enough for users to access it when building workflows with
unsaved executions.
- Moves all execution pruning work from an execution lifecycle hook to
`execution.repository.ts`. This keeps related logic in a single place.
- Removes all marking logic from the binary data manager. This
simplifies the interface that the S3 client will meet.
- Adds basic sanity-check tests to pruning logic and execution deletion.

Out of scope:

- Improving existing pruning logic.
- Improving existing execution repository logic.
- Adjusting dir structure for filesystem mode.

---------

Co-authored-by: कारतोफ्फेलस्क्रिप्ट™ <[email protected]>
  • Loading branch information
ivov and netroy authored Sep 20, 2023
1 parent 09a7cf0 commit cd08c8e
Show file tree
Hide file tree
Showing 36 changed files with 411 additions and 253 deletions.
3 changes: 2 additions & 1 deletion packages/cli/src/ActiveExecutions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import { createDeferredPromise, LoggerProxy } from 'n8n-workflow';
import type { ChildProcess } from 'child_process';
import type PCancelable from 'p-cancelable';
import type {
ExecutionPayload,
IExecutingWorkflowData,
IExecutionDb,
IExecutionsCurrentSummary,
Expand All @@ -38,7 +39,7 @@ export class ActiveExecutions {
if (executionId === undefined) {
// Is a new execution so save in DB

const fullExecutionData: IExecutionDb = {
const fullExecutionData: ExecutionPayload = {
data: executionData.executionData!,
mode: executionData.executionMode,
finished: false,
Expand Down
4 changes: 2 additions & 2 deletions packages/cli/src/GenericHelpers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import { Container } from 'typedi';
import { Like } from 'typeorm';
import config from '@/config';
import * as Db from '@/Db';
import type { ICredentialsDb, IExecutionDb, IWorkflowDb } from '@/Interfaces';
import type { ExecutionPayload, ICredentialsDb, IWorkflowDb } from '@/Interfaces';
import * as ResponseHelper from '@/ResponseHelper';
import type { WorkflowEntity } from '@db/entities/WorkflowEntity';
import type { CredentialsEntity } from '@db/entities/CredentialsEntity';
Expand Down Expand Up @@ -178,7 +178,7 @@ export async function createErrorExecution(
},
};

const fullExecutionData: IExecutionDb = {
const fullExecutionData: ExecutionPayload = {
data: executionData,
mode,
finished: false,
Expand Down
7 changes: 6 additions & 1 deletion packages/cli/src/Interfaces.ts
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ export type ICredentialsDecryptedResponse = ICredentialsDecryptedDb;
export type SaveExecutionDataType = 'all' | 'none';

export interface IExecutionBase {
id?: string;
id: string;
mode: WorkflowExecuteMode;
startedAt: Date;
stoppedAt?: Date; // empty value means execution is still running
Expand All @@ -189,6 +189,11 @@ export interface IExecutionDb extends IExecutionBase {
workflowData?: IWorkflowBase;
}

/**
* Payload for creating or updating an execution.
*/
export type ExecutionPayload = Omit<IExecutionDb, 'id'>;

export interface IExecutionPushResponse {
executionId?: string;
waitingForWebhook?: boolean;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,20 +1,14 @@
import type express from 'express';

import { BinaryDataManager } from 'n8n-core';

import {
getExecutions,
getExecutionInWorkflows,
deleteExecution,
getExecutionsCount,
} from './executions.service';
import { getExecutions, getExecutionInWorkflows, getExecutionsCount } from './executions.service';
import { ActiveExecutions } from '@/ActiveExecutions';
import { authorize, validCursor } from '../../shared/middlewares/global.middleware';
import type { ExecutionRequest } from '../../../types';
import { getSharedWorkflowIds } from '../workflows/workflows.service';
import { encodeNextCursor } from '../../shared/services/pagination.service';
import { Container } from 'typedi';
import { InternalHooks } from '@/InternalHooks';
import { ExecutionRepository } from '@/databases/repositories';

export = {
deleteExecution: [
Expand All @@ -37,9 +31,7 @@ export = {
return res.status(404).json({ message: 'Not Found' });
}

await BinaryDataManager.getInstance().deleteBinaryDataByExecutionIds([execution.id!]);

await deleteExecution(execution);
await Container.get(ExecutionRepository).softDelete(execution.id);

execution.id = id;

Expand Down Expand Up @@ -111,7 +103,7 @@ export = {

const executions = await getExecutions(filters);

const newLastId = !executions.length ? '0' : (executions.slice(-1)[0].id as string);
const newLastId = !executions.length ? '0' : executions.slice(-1)[0].id;

filters.lastId = newLastId;

Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import type { DeleteResult, FindOptionsWhere } from 'typeorm';
import type { FindOptionsWhere } from 'typeorm';
import { In, Not, Raw, LessThan } from 'typeorm';
import { Container } from 'typedi';
import type { ExecutionStatus } from 'n8n-workflow';
Expand Down Expand Up @@ -109,7 +109,3 @@ export async function getExecutionInWorkflows(
unflattenData: true,
});
}

export async function deleteExecution(execution: IExecutionBase): Promise<DeleteResult> {
return Container.get(ExecutionRepository).deleteExecution(execution.id as string);
}
1 change: 0 additions & 1 deletion packages/cli/src/Server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -396,7 +396,6 @@ export class Server extends AbstractServer {
),
executions_data_prune: config.getEnv('executions.pruneData'),
executions_data_max_age: config.getEnv('executions.pruneDataMaxAge'),
executions_data_prune_timeout: config.getEnv('executions.pruneDataTimeout'),
},
deploymentType: config.getEnv('deployment.type'),
binaryDataMode: binaryDataConfig.mode,
Expand Down
98 changes: 5 additions & 93 deletions packages/cli/src/WorkflowExecuteAdditionalData.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
/* eslint-disable @typescript-eslint/no-unused-vars */

/* eslint-disable @typescript-eslint/no-unsafe-assignment */
import { BinaryDataManager, UserSettings, WorkflowExecute } from 'n8n-core';
import { UserSettings, WorkflowExecute } from 'n8n-core';

import type {
IDataObject,
Expand Down Expand Up @@ -37,21 +37,16 @@ import {
} from 'n8n-workflow';

import { Container } from 'typedi';
import type { FindOptionsWhere } from 'typeorm';
import { LessThanOrEqual, In } from 'typeorm';
import { DateUtils } from 'typeorm/util/DateUtils';
import config from '@/config';
import * as Db from '@/Db';
import { ActiveExecutions } from '@/ActiveExecutions';
import { CredentialsHelper } from '@/CredentialsHelper';
import { ExternalHooks } from '@/ExternalHooks';
import type {
IExecutionDb,
IExecutionFlattedDb,
IPushDataExecutionFinished,
IWorkflowExecuteProcess,
IWorkflowExecutionDataProcess,
IWorkflowErrorData,
ExecutionPayload,
} from '@/Interfaces';
import { NodeTypes } from '@/NodeTypes';
import { Push } from '@/push';
Expand Down Expand Up @@ -184,77 +179,6 @@ export function executeErrorWorkflow(
}
}

/**
* Prunes Saved Execution which are older than configured.
* Throttled to be executed just once in configured timeframe.
* TODO: Consider moving this whole function to the repository or at least the queries
*/
let throttling = false;
async function pruneExecutionData(this: WorkflowHooks): Promise<void> {
if (!throttling) {
Logger.verbose('Pruning execution data from database');

throttling = true;
const timeout = config.getEnv('executions.pruneDataTimeout'); // in seconds
const maxAge = config.getEnv('executions.pruneDataMaxAge'); // in h
const maxCount = config.getEnv('executions.pruneDataMaxCount');
const date = new Date(); // today
date.setHours(date.getHours() - maxAge);

// date reformatting needed - see https://github.com/typeorm/typeorm/issues/2286

const utcDate = DateUtils.mixedDateToUtcDatetimeString(date);

const toPrune: Array<FindOptionsWhere<IExecutionFlattedDb>> = [
{ stoppedAt: LessThanOrEqual(utcDate) },
];

if (maxCount > 0) {
const executions = await Db.collections.Execution.find({
select: ['id'],
skip: maxCount,
take: 1,
order: { id: 'DESC' },
});

if (executions[0]) {
toPrune.push({ id: LessThanOrEqual(executions[0].id) });
}
}

try {
setTimeout(() => {
throttling = false;
}, timeout * 1000);
let executionIds: Array<IExecutionFlattedDb['id']>;
do {
executionIds = (
await Db.collections.Execution.find({
select: ['id'],
where: toPrune,
take: 100,
})
).map(({ id }) => id);
await Db.collections.Execution.delete({ id: In(executionIds) });
// Mark binary data for deletion for all executions
await BinaryDataManager.getInstance().markDataForDeletionByExecutionIds(executionIds);
} while (executionIds.length > 0);
} catch (error) {
ErrorReporter.error(error);
throttling = false;
Logger.error(
`Failed pruning execution data from database for execution ID ${this.executionId} (hookFunctionsSave)`,
{
...error,
executionId: this.executionId,
sessionId: this.sessionId,
workflowId: this.workflowData.id,
},
);
}
}
}

/**
* Returns hook functions to push data to Editor-UI
*
Expand Down Expand Up @@ -522,11 +446,6 @@ function hookFunctionsSave(parentProcessMode?: string): IWorkflowExecuteHooks {
workflowId: this.workflowData.id,
});

// Prune old execution data
if (config.getEnv('executions.pruneData')) {
await pruneExecutionData.call(this);
}

const isManualMode = [this.mode, parentProcessMode].includes('manual');

try {
Expand Down Expand Up @@ -554,8 +473,7 @@ function hookFunctionsSave(parentProcessMode?: string): IWorkflowExecuteHooks {
}

if (isManualMode && !saveManualExecutions && !fullRunData.waitTill) {
// Data is always saved, so we remove from database
await Container.get(ExecutionRepository).deleteExecution(this.executionId, true);
await Container.get(ExecutionRepository).softDelete(this.executionId);

return;
}
Expand Down Expand Up @@ -586,8 +504,7 @@ function hookFunctionsSave(parentProcessMode?: string): IWorkflowExecuteHooks {
this.executionId,
this.retryOf,
);
// Data is always saved, so we remove from database
await Container.get(ExecutionRepository).deleteExecution(this.executionId);
await Container.get(ExecutionRepository).softDelete(this.executionId);

return;
}
Expand Down Expand Up @@ -682,11 +599,6 @@ function hookFunctionsSaveWorker(): IWorkflowExecuteHooks {
workflowId: this.workflowData.id,
});
try {
// Prune old execution data
if (config.getEnv('executions.pruneData')) {
await pruneExecutionData.call(this);
}

if (isWorkflowIdValid(this.workflowData.id) && newStaticData) {
// Workflow is saved so update in database
try {
Expand Down Expand Up @@ -973,7 +885,7 @@ async function executeWorkflow(
// Therefore, database might not contain finished errors.
// Force an update to db as there should be no harm doing this

const fullExecutionData: IExecutionDb = {
const fullExecutionData: ExecutionPayload = {
data: fullRunData.data,
mode: fullRunData.mode,
finished: fullRunData.finished ? fullRunData.finished : false,
Expand Down
8 changes: 6 additions & 2 deletions packages/cli/src/WorkflowHelpers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,11 @@ import {
} from 'n8n-workflow';
import { v4 as uuid } from 'uuid';
import * as Db from '@/Db';
import type { IExecutionDb, IWorkflowErrorData, IWorkflowExecutionDataProcess } from '@/Interfaces';
import type {
ExecutionPayload,
IWorkflowErrorData,
IWorkflowExecutionDataProcess,
} from '@/Interfaces';
import { NodeTypes } from '@/NodeTypes';
// eslint-disable-next-line import/no-cycle
import { WorkflowRunner } from '@/WorkflowRunner';
Expand Down Expand Up @@ -186,7 +190,7 @@ export async function executeErrorWorkflow(
initialNode,
);

const fullExecutionData: IExecutionDb = {
const fullExecutionData: ExecutionPayload = {
data: fakeExecution.data,
mode: fakeExecution.mode,
finished: false,
Expand Down
2 changes: 1 addition & 1 deletion packages/cli/src/WorkflowRunner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -629,7 +629,7 @@ export class WorkflowRunner {
(workflowDidSucceed && saveDataSuccessExecution === 'none') ||
(!workflowDidSucceed && saveDataErrorExecution === 'none')
) {
await Container.get(ExecutionRepository).deleteExecution(executionId);
await Container.get(ExecutionRepository).softDelete(executionId);
}
// eslint-disable-next-line id-denylist
} catch (err) {
Expand Down
2 changes: 2 additions & 0 deletions packages/cli/src/commands/BaseCommand.ts
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,8 @@ export abstract class BaseCommand extends Command {
}

async initLicense(instanceType: N8nInstanceType = 'main'): Promise<void> {
config.set('generic.instanceType', instanceType);

const license = Container.get(License);
await license.init(this.instanceId, instanceType);

Expand Down
3 changes: 3 additions & 0 deletions packages/cli/src/commands/start.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import { eventBus } from '@/eventbus';
import { BaseCommand } from './BaseCommand';
import { InternalHooks } from '@/InternalHooks';
import { License } from '@/License';
import { ExecutionRepository } from '@/databases/repositories/execution.repository';

// eslint-disable-next-line @typescript-eslint/no-unsafe-assignment, @typescript-eslint/no-var-requires
const open = require('open');
Expand Down Expand Up @@ -103,6 +104,8 @@ export class Start extends BaseCommand {
// Note: While this saves a new license cert to DB, the previous entitlements are still kept in memory so that the shutdown process can complete
await Container.get(License).shutdown();

Container.get(ExecutionRepository).clearTimers();

await Container.get(InternalHooks).onN8nStop();

const skipWebhookDeregistration = config.getEnv(
Expand Down
18 changes: 6 additions & 12 deletions packages/cli/src/config/schema.ts
Original file line number Diff line number Diff line change
Expand Up @@ -322,12 +322,6 @@ export const schema = {
default: 336,
env: 'EXECUTIONS_DATA_MAX_AGE',
},
pruneDataTimeout: {
doc: 'Timeout (seconds) after execution data has been pruned',
format: Number,
default: 3600,
env: 'EXECUTIONS_DATA_PRUNE_TIMEOUT',
},

// Additional pruning option to delete executions if total count exceeds the configured max.
// Deletes the oldest entries first
Expand Down Expand Up @@ -438,6 +432,12 @@ export const schema = {
default: 'America/New_York',
env: 'GENERIC_TIMEZONE',
},

instanceType: {
doc: 'Type of n8n instance',
format: ['main', 'webhook', 'worker'] as const,
default: 'main',
},
},

// How n8n can be reached (Editor & REST-API)
Expand Down Expand Up @@ -919,12 +919,6 @@ export const schema = {
env: 'N8N_BINARY_DATA_STORAGE_PATH',
doc: 'Path for binary data storage in "filesystem" mode',
},
binaryDataTTL: {
format: Number,
default: 60,
env: 'N8N_BINARY_DATA_TTL',
doc: 'TTL for binary data of unsaved executions in minutes',
},
},

deployment: {
Expand Down
9 changes: 9 additions & 0 deletions packages/cli/src/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -94,3 +94,12 @@ export const CREDENTIAL_BLANKING_VALUE = '__n8n_BLANK_VALUE_e5362baf-c777-4d57-a

export const UM_FIX_INSTRUCTION =
'Please fix the database by running ./packages/cli/bin/n8n user-management:reset';

/**
* Units of time in milliseconds
*/
export const TIME = {
SECOND: 1000,
MINUTE: 60 * 1000,
HOUR: 60 * 60 * 1000,
};
Loading

0 comments on commit cd08c8e

Please sign in to comment.