Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(core): Implement soft-deletions for executions #7092

Merged
merged 44 commits into from
Sep 20, 2023
Merged
Show file tree
Hide file tree
Changes from 31 commits
Commits
Show all changes
44 commits
Select commit Hold shift + click to select a range
29f1d11
refactor(core): Simplify executions and binary data pruning
netroy Aug 31, 2023
f7206fd
Merge branch 'master' into pay-771-implement-soft-deletion
ivov Sep 4, 2023
9c8efbc
Merge branch 'master' into pay-771-implement-soft-deletion
ivov Sep 5, 2023
05e3fef
Remove test code from service
ivov Sep 5, 2023
9dabbe8
Use time constants
ivov Sep 5, 2023
3f7de8e
Improve naming
ivov Sep 5, 2023
5583814
Use native typeorm soft deletion
ivov Sep 5, 2023
f50108a
Improve naming
ivov Sep 5, 2023
85ac53a
Make batch size class field
ivov Sep 5, 2023
1d87e9e
Cleanup
ivov Sep 5, 2023
dd16e45
Remove unused method
ivov Sep 5, 2023
619bff6
Cleanup
ivov Sep 5, 2023
8cace11
Filter out soft-deleted executions
ivov Sep 5, 2023
1fea046
Merge branch 'master' into pay-771-implement-soft-deletion
ivov Sep 5, 2023
c4ffe6d
Add tests
ivov Sep 5, 2023
738eb1b
Improve test
ivov Sep 6, 2023
259fe75
Soft-delete in single pass
ivov Sep 13, 2023
f3f5b27
Restore value
ivov Sep 13, 2023
65d17cb
Restore from debug value
ivov Sep 13, 2023
96ae7b4
Add clarifying comments
ivov Sep 13, 2023
21040b5
Update tests
ivov Sep 13, 2023
9a25301
Add clarifying comment
ivov Sep 13, 2023
0e075c2
Speed up pruning if high volume
ivov Sep 14, 2023
e8ccf05
Merge branch 'master' into pay-771-implement-soft-deletion
ivov Sep 14, 2023
2071c7f
Remove call from hook
ivov Sep 14, 2023
67e163f
Make execution ID non-nullable
ivov Sep 15, 2023
9665da1
Merge master
ivov Sep 15, 2023
12636dd
Readability improvements
ivov Sep 15, 2023
e5c8c72
Adjust types, followup to 67e163f
ivov Sep 15, 2023
b7062e5
Fix lint
ivov Sep 15, 2023
3bf3430
Merge master
ivov Sep 18, 2023
abe6d9d
Clear timers on shutdown
ivov Sep 18, 2023
2682b3d
Set timers only on main instance
ivov Sep 18, 2023
1c788fe
Also for clearing timers
ivov Sep 18, 2023
8823554
Add logging, refactor for readability
ivov Sep 19, 2023
a5def40
Ensure hard-deletion select includes soft-deleted rows
ivov Sep 19, 2023
77955e4
Switch `info` to `debug`
ivov Sep 19, 2023
b930e3e
Fix tests
ivov Sep 19, 2023
e91f3de
Remove redundant checks for `deletedAt` being `NULL`
ivov Sep 19, 2023
2c3704d
Fix lint
ivov Sep 19, 2023
215f58e
Fix last test
ivov Sep 19, 2023
c33d164
More missing loggers in tests
ivov Sep 19, 2023
69764f7
Add logger to even more test
ivov Sep 19, 2023
7c3e58d
Refactor logging for tests
ivov Sep 19, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion packages/cli/src/ActiveExecutions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import { createDeferredPromise, LoggerProxy } from 'n8n-workflow';
import type { ChildProcess } from 'child_process';
import type PCancelable from 'p-cancelable';
import type {
ExecutionPayload,
IExecutingWorkflowData,
IExecutionDb,
IExecutionsCurrentSummary,
Expand All @@ -38,7 +39,7 @@ export class ActiveExecutions {
if (executionId === undefined) {
// Is a new execution so save in DB

const fullExecutionData: IExecutionDb = {
const fullExecutionData: ExecutionPayload = {
data: executionData.executionData!,
mode: executionData.executionMode,
finished: false,
Expand Down
4 changes: 2 additions & 2 deletions packages/cli/src/GenericHelpers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import { Container } from 'typedi';
import { Like } from 'typeorm';
import config from '@/config';
import * as Db from '@/Db';
import type { ICredentialsDb, IExecutionDb, IWorkflowDb } from '@/Interfaces';
import type { ExecutionPayload, ICredentialsDb, IWorkflowDb } from '@/Interfaces';
import * as ResponseHelper from '@/ResponseHelper';
import type { WorkflowEntity } from '@db/entities/WorkflowEntity';
import type { CredentialsEntity } from '@db/entities/CredentialsEntity';
Expand Down Expand Up @@ -178,7 +178,7 @@ export async function createErrorExecution(
},
};

const fullExecutionData: IExecutionDb = {
const fullExecutionData: ExecutionPayload = {
data: executionData,
mode,
finished: false,
Expand Down
7 changes: 6 additions & 1 deletion packages/cli/src/Interfaces.ts
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ export type ICredentialsDecryptedResponse = ICredentialsDecryptedDb;
export type SaveExecutionDataType = 'all' | 'none';

export interface IExecutionBase {
id?: string;
id: string;
mode: WorkflowExecuteMode;
startedAt: Date;
stoppedAt?: Date; // empty value means execution is still running
Expand All @@ -189,6 +189,11 @@ export interface IExecutionDb extends IExecutionBase {
workflowData?: IWorkflowBase;
}

/**
* Payload for creating or updating an execution.
*/
export type ExecutionPayload = Omit<IExecutionDb, 'id'>;

export interface IExecutionPushResponse {
executionId?: string;
waitingForWebhook?: boolean;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,20 +1,14 @@
import type express from 'express';

import { BinaryDataManager } from 'n8n-core';

import {
getExecutions,
getExecutionInWorkflows,
deleteExecution,
getExecutionsCount,
} from './executions.service';
import { getExecutions, getExecutionInWorkflows, getExecutionsCount } from './executions.service';
import { ActiveExecutions } from '@/ActiveExecutions';
import { authorize, validCursor } from '../../shared/middlewares/global.middleware';
import type { ExecutionRequest } from '../../../types';
import { getSharedWorkflowIds } from '../workflows/workflows.service';
import { encodeNextCursor } from '../../shared/services/pagination.service';
import { Container } from 'typedi';
import { InternalHooks } from '@/InternalHooks';
import { ExecutionRepository } from '@/databases/repositories';

export = {
deleteExecution: [
Expand All @@ -37,9 +31,7 @@ export = {
return res.status(404).json({ message: 'Not Found' });
}

await BinaryDataManager.getInstance().deleteBinaryDataByExecutionIds([execution.id!]);

await deleteExecution(execution);
await Container.get(ExecutionRepository).softDelete(execution.id);

execution.id = id;

Expand Down Expand Up @@ -111,7 +103,7 @@ export = {

const executions = await getExecutions(filters);

const newLastId = !executions.length ? '0' : (executions.slice(-1)[0].id as string);
const newLastId = !executions.length ? '0' : executions.slice(-1)[0].id;

filters.lastId = newLastId;

Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import type { DeleteResult, FindOptionsWhere } from 'typeorm';
import type { FindOptionsWhere } from 'typeorm';
import { In, Not, Raw, LessThan } from 'typeorm';
import { Container } from 'typedi';
import type { ExecutionStatus } from 'n8n-workflow';
Expand Down Expand Up @@ -109,7 +109,3 @@ export async function getExecutionInWorkflows(
unflattenData: true,
});
}

export async function deleteExecution(execution: IExecutionBase): Promise<DeleteResult> {
return Container.get(ExecutionRepository).deleteExecution(execution.id as string);
}
1 change: 0 additions & 1 deletion packages/cli/src/Server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -396,7 +396,6 @@ export class Server extends AbstractServer {
),
executions_data_prune: config.getEnv('executions.pruneData'),
executions_data_max_age: config.getEnv('executions.pruneDataMaxAge'),
executions_data_prune_timeout: config.getEnv('executions.pruneDataTimeout'),
},
deploymentType: config.getEnv('deployment.type'),
binaryDataMode: binaryDataConfig.mode,
Expand Down
98 changes: 5 additions & 93 deletions packages/cli/src/WorkflowExecuteAdditionalData.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
/* eslint-disable @typescript-eslint/no-unused-vars */

/* eslint-disable @typescript-eslint/no-unsafe-assignment */
import { BinaryDataManager, UserSettings, WorkflowExecute } from 'n8n-core';
import { UserSettings, WorkflowExecute } from 'n8n-core';

import type {
IDataObject,
Expand Down Expand Up @@ -37,21 +37,16 @@ import {
} from 'n8n-workflow';

import { Container } from 'typedi';
import type { FindOptionsWhere } from 'typeorm';
import { LessThanOrEqual, In } from 'typeorm';
import { DateUtils } from 'typeorm/util/DateUtils';
import config from '@/config';
import * as Db from '@/Db';
import { ActiveExecutions } from '@/ActiveExecutions';
import { CredentialsHelper } from '@/CredentialsHelper';
import { ExternalHooks } from '@/ExternalHooks';
import type {
IExecutionDb,
IExecutionFlattedDb,
IPushDataExecutionFinished,
IWorkflowExecuteProcess,
IWorkflowExecutionDataProcess,
IWorkflowErrorData,
ExecutionPayload,
} from '@/Interfaces';
import { NodeTypes } from '@/NodeTypes';
import { Push } from '@/push';
Expand Down Expand Up @@ -184,77 +179,6 @@ export function executeErrorWorkflow(
}
}

/**
* Prunes Saved Execution which are older than configured.
* Throttled to be executed just once in configured timeframe.
* TODO: Consider moving this whole function to the repository or at least the queries
*/
let throttling = false;
async function pruneExecutionData(this: WorkflowHooks): Promise<void> {
if (!throttling) {
Logger.verbose('Pruning execution data from database');

throttling = true;
const timeout = config.getEnv('executions.pruneDataTimeout'); // in seconds
const maxAge = config.getEnv('executions.pruneDataMaxAge'); // in h
const maxCount = config.getEnv('executions.pruneDataMaxCount');
const date = new Date(); // today
date.setHours(date.getHours() - maxAge);

// date reformatting needed - see https://github.com/typeorm/typeorm/issues/2286

const utcDate = DateUtils.mixedDateToUtcDatetimeString(date);

const toPrune: Array<FindOptionsWhere<IExecutionFlattedDb>> = [
{ stoppedAt: LessThanOrEqual(utcDate) },
];

if (maxCount > 0) {
const executions = await Db.collections.Execution.find({
select: ['id'],
skip: maxCount,
take: 1,
order: { id: 'DESC' },
});

if (executions[0]) {
toPrune.push({ id: LessThanOrEqual(executions[0].id) });
}
}

try {
setTimeout(() => {
throttling = false;
}, timeout * 1000);
let executionIds: Array<IExecutionFlattedDb['id']>;
do {
executionIds = (
await Db.collections.Execution.find({
select: ['id'],
where: toPrune,
take: 100,
})
).map(({ id }) => id);
await Db.collections.Execution.delete({ id: In(executionIds) });
// Mark binary data for deletion for all executions
await BinaryDataManager.getInstance().markDataForDeletionByExecutionIds(executionIds);
} while (executionIds.length > 0);
} catch (error) {
ErrorReporter.error(error);
throttling = false;
Logger.error(
`Failed pruning execution data from database for execution ID ${this.executionId} (hookFunctionsSave)`,
{
...error,
executionId: this.executionId,
sessionId: this.sessionId,
workflowId: this.workflowData.id,
},
);
}
}
}

/**
* Returns hook functions to push data to Editor-UI
*
Expand Down Expand Up @@ -522,11 +446,6 @@ function hookFunctionsSave(parentProcessMode?: string): IWorkflowExecuteHooks {
workflowId: this.workflowData.id,
});

// Prune old execution data
if (config.getEnv('executions.pruneData')) {
await pruneExecutionData.call(this);
}

const isManualMode = [this.mode, parentProcessMode].includes('manual');

try {
Expand Down Expand Up @@ -554,8 +473,7 @@ function hookFunctionsSave(parentProcessMode?: string): IWorkflowExecuteHooks {
}

if (isManualMode && !saveManualExecutions && !fullRunData.waitTill) {
// Data is always saved, so we remove from database
await Container.get(ExecutionRepository).deleteExecution(this.executionId, true);
await Container.get(ExecutionRepository).softDelete(this.executionId);

return;
}
Expand Down Expand Up @@ -586,8 +504,7 @@ function hookFunctionsSave(parentProcessMode?: string): IWorkflowExecuteHooks {
this.executionId,
this.retryOf,
);
// Data is always saved, so we remove from database
await Container.get(ExecutionRepository).deleteExecution(this.executionId);
await Container.get(ExecutionRepository).softDelete(this.executionId);

return;
}
Expand Down Expand Up @@ -682,11 +599,6 @@ function hookFunctionsSaveWorker(): IWorkflowExecuteHooks {
workflowId: this.workflowData.id,
});
try {
// Prune old execution data
if (config.getEnv('executions.pruneData')) {
await pruneExecutionData.call(this);
}

if (isWorkflowIdValid(this.workflowData.id) && newStaticData) {
// Workflow is saved so update in database
try {
Expand Down Expand Up @@ -973,7 +885,7 @@ async function executeWorkflow(
// Therefore, database might not contain finished errors.
// Force an update to db as there should be no harm doing this

const fullExecutionData: IExecutionDb = {
const fullExecutionData: ExecutionPayload = {
data: fullRunData.data,
mode: fullRunData.mode,
finished: fullRunData.finished ? fullRunData.finished : false,
Expand Down
8 changes: 6 additions & 2 deletions packages/cli/src/WorkflowHelpers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,11 @@ import {
} from 'n8n-workflow';
import { v4 as uuid } from 'uuid';
import * as Db from '@/Db';
import type { IExecutionDb, IWorkflowErrorData, IWorkflowExecutionDataProcess } from '@/Interfaces';
import type {
ExecutionPayload,
IWorkflowErrorData,
IWorkflowExecutionDataProcess,
} from '@/Interfaces';
import { NodeTypes } from '@/NodeTypes';
// eslint-disable-next-line import/no-cycle
import { WorkflowRunner } from '@/WorkflowRunner';
Expand Down Expand Up @@ -186,7 +190,7 @@ export async function executeErrorWorkflow(
initialNode,
);

const fullExecutionData: IExecutionDb = {
const fullExecutionData: ExecutionPayload = {
data: fakeExecution.data,
mode: fakeExecution.mode,
finished: false,
Expand Down
2 changes: 1 addition & 1 deletion packages/cli/src/WorkflowRunner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -629,7 +629,7 @@ export class WorkflowRunner {
(workflowDidSucceed && saveDataSuccessExecution === 'none') ||
(!workflowDidSucceed && saveDataErrorExecution === 'none')
) {
await Container.get(ExecutionRepository).deleteExecution(executionId);
await Container.get(ExecutionRepository).softDelete(executionId);
}
// eslint-disable-next-line id-denylist
} catch (err) {
Expand Down
12 changes: 0 additions & 12 deletions packages/cli/src/config/schema.ts
Original file line number Diff line number Diff line change
Expand Up @@ -322,12 +322,6 @@ export const schema = {
default: 336,
env: 'EXECUTIONS_DATA_MAX_AGE',
},
pruneDataTimeout: {
doc: 'Timeout (seconds) after execution data has been pruned',
format: Number,
default: 3600,
env: 'EXECUTIONS_DATA_PRUNE_TIMEOUT',
},

// Additional pruning option to delete executions if total count exceeds the configured max.
// Deletes the oldest entries first
Expand Down Expand Up @@ -919,12 +913,6 @@ export const schema = {
env: 'N8N_BINARY_DATA_STORAGE_PATH',
doc: 'Path for binary data storage in "filesystem" mode',
},
binaryDataTTL: {
format: Number,
default: 60,
env: 'N8N_BINARY_DATA_TTL',
doc: 'TTL for binary data of unsaved executions in minutes',
},
},

deployment: {
Expand Down
9 changes: 9 additions & 0 deletions packages/cli/src/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -94,3 +94,12 @@ export const CREDENTIAL_BLANKING_VALUE = '__n8n_BLANK_VALUE_e5362baf-c777-4d57-a

export const UM_FIX_INSTRUCTION =
'Please fix the database by running ./packages/cli/bin/n8n user-management:reset';

/**
* Units of time in milliseconds
*/
export const TIME = {
ivov marked this conversation as resolved.
Show resolved Hide resolved
SECOND: 1000,
MINUTE: 60 * 1000,
HOUR: 60 * 60 * 1000,
};
3 changes: 2 additions & 1 deletion packages/cli/src/databases/entities/ExecutionEntity.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import {
OneToOne,
PrimaryColumn,
Relation,
DeleteDateColumn,
} from 'typeorm';
import { datetimeColumnType } from './AbstractEntity';
import { idStringifier } from '../utils/transformers';
Expand Down Expand Up @@ -49,7 +50,7 @@ export class ExecutionEntity {
@Column({ type: datetimeColumnType, nullable: true })
stoppedAt: Date;

@Column(datetimeColumnType)
@DeleteDateColumn({ type: datetimeColumnType, nullable: true })
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

are we really benefiting from this decorator?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@DeleteDateColumn is what enables softDelete to work, which is used in many places:

Capture 2023-09-18 at 14 48 19@2x

deletedAt: Date;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should this be

Suggested change
deletedAt: Date;
deletedAt?: Date;

Copy link
Contributor Author

@ivov ivov Sep 18, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not 100% on this, but I logged on startup webhook entities (which have webhookId and pathLength which are nullable and marked with ?) and they show up as null when missing:

> [email protected] start /Users/ivov/Development/n8n
> run-script-os


> [email protected] start:default
> cd packages/cli/bin && ./n8n

n8n ready on 0.0.0.0, port 5678
Initializing n8n process
Version: 1.7.0
 ================================
   Start Active Workflows:
 ================================
   - My workflow 21 (ID: HDemWLl4i60bkGsm)
allWebhooks [
  WebhookEntity {
    workflowId: 'HDemWLl4i60bkGsm',
    webhookPath: '9347adb3-289c-4508-8705-531575ac3e60',
    method: 'GET',
    node: 'Webhook',
    webhookId: null,
    pathLength: null
  }
]

So if anything no properties that are nullable columns should have ? right? (Always present, but may be null)


@Column({ nullable: true })
Expand Down
Loading