Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(core): Move typeorm operators from PruningService to ExecutionRepository (no-changelog) #8145

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
84 changes: 83 additions & 1 deletion packages/cli/src/databases/repositories/execution.repository.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,14 @@
import { Service } from 'typedi';
import { DataSource, In, LessThanOrEqual, MoreThanOrEqual, Repository } from 'typeorm';
import {
Brackets,
DataSource,
In,
IsNull,
LessThanOrEqual,
MoreThanOrEqual,
Not,
Repository,
} from 'typeorm';
import { DateUtils } from 'typeorm/util/DateUtils';
import type {
FindManyOptions,
Expand Down Expand Up @@ -434,4 +443,77 @@ export class ExecutionRepository extends Repository<ExecutionEntity> {
},
}).then((executions) => executions.map(({ id }) => id));
}

async softDeletePrunableExecutions() {
const maxAge = config.getEnv('executions.pruneDataMaxAge'); // in h
const maxCount = config.getEnv('executions.pruneDataMaxCount');

// Find ids of all executions that were stopped longer that pruneDataMaxAge ago
const date = new Date();
date.setHours(date.getHours() - maxAge);

const toPrune: Array<FindOptionsWhere<ExecutionEntity>> = [
// date reformatting needed - see https://github.com/typeorm/typeorm/issues/2286
{ stoppedAt: LessThanOrEqual(DateUtils.mixedDateToUtcDatetimeString(date)) },
];

if (maxCount > 0) {
const executions = await this.find({
select: ['id'],
skip: maxCount,
take: 1,
order: { id: 'DESC' },
});

if (executions[0]) {
toPrune.push({ id: LessThanOrEqual(executions[0].id) });
}
}

const [timeBasedWhere, countBasedWhere] = toPrune;

return this.createQueryBuilder()
.update(ExecutionEntity)
.set({ deletedAt: new Date() })
.where({
deletedAt: IsNull(),
// Only mark executions as deleted if they are in an end state
status: Not(In(['new', 'running', 'waiting'])),
})
.andWhere(
new Brackets((qb) =>
countBasedWhere
? qb.where(timeBasedWhere).orWhere(countBasedWhere)
: qb.where(timeBasedWhere),
),
)
.execute();
}

async hardDeleteSoftDeletedExecutions() {
const date = new Date();
date.setHours(date.getHours() - config.getEnv('executions.pruneDataHardDeleteBuffer'));

const workflowIdsAndExecutionIds = (
await this.find({
select: ['workflowId', 'id'],
where: {
deletedAt: LessThanOrEqual(DateUtils.mixedDateToUtcDatetimeString(date)),
},
take: this.hardDeletionBatchSize,

/**
* @important This ensures soft-deleted executions are included,
* else `@DeleteDateColumn()` at `deletedAt` will exclude them.
*/
withDeleted: true,
})
).map(({ id: executionId, workflowId }) => ({ workflowId, executionId }));

return workflowIdsAndExecutionIds;
}

async deleteByIds(executionIds: string[]) {
return this.delete({ id: In(executionIds) });
}
}
86 changes: 10 additions & 76 deletions packages/cli/src/services/pruning.service.ts
Original file line number Diff line number Diff line change
@@ -1,14 +1,9 @@
import { Service } from 'typedi';
import { BinaryDataService } from 'n8n-core';
import type { FindOptionsWhere } from 'typeorm';
import { Brackets, In, IsNull, LessThanOrEqual, Not } from 'typeorm';
import { DateUtils } from 'typeorm/util/DateUtils';

import { inTest, TIME } from '@/constants';
import config from '@/config';
import { ExecutionRepository } from '@db/repositories/execution.repository';
import { Logger } from '@/Logger';
import { ExecutionEntity } from '@db/entities/ExecutionEntity';
import { jsonStringify } from 'n8n-workflow';
import { OnShutdown } from '@/decorators/OnShutdown';

Expand Down Expand Up @@ -113,50 +108,7 @@ export class PruningService {
async softDeleteOnPruningCycle() {
this.logger.debug('[Pruning] Starting soft-deletion of executions');

const maxAge = config.getEnv('executions.pruneDataMaxAge'); // in h
const maxCount = config.getEnv('executions.pruneDataMaxCount');

// Find ids of all executions that were stopped longer that pruneDataMaxAge ago
const date = new Date();
date.setHours(date.getHours() - maxAge);

const toPrune: Array<FindOptionsWhere<ExecutionEntity>> = [
// date reformatting needed - see https://github.com/typeorm/typeorm/issues/2286
{ stoppedAt: LessThanOrEqual(DateUtils.mixedDateToUtcDatetimeString(date)) },
];

if (maxCount > 0) {
const executions = await this.executionRepository.find({
select: ['id'],
skip: maxCount,
take: 1,
order: { id: 'DESC' },
});

if (executions[0]) {
toPrune.push({ id: LessThanOrEqual(executions[0].id) });
}
}

const [timeBasedWhere, countBasedWhere] = toPrune;

const result = await this.executionRepository
.createQueryBuilder()
.update(ExecutionEntity)
.set({ deletedAt: new Date() })
.where({
deletedAt: IsNull(),
// Only mark executions as deleted if they are in an end state
status: Not(In(['new', 'running', 'waiting'])),
})
.andWhere(
new Brackets((qb) =>
countBasedWhere
? qb.where(timeBasedWhere).orWhere(countBasedWhere)
: qb.where(timeBasedWhere),
),
)
.execute();
const result = await this.executionRepository.softDeletePrunableExecutions();

if (result.affected === 0) {
this.logger.debug('[Pruning] Found no executions to soft-delete');
Expand All @@ -177,40 +129,22 @@ export class PruningService {
* @return Delay in ms after which the next cycle should be started
*/
private async hardDeleteOnPruningCycle() {
const date = new Date();
date.setHours(date.getHours() - config.getEnv('executions.pruneDataHardDeleteBuffer'));

const workflowIdsAndExecutionIds = (
await this.executionRepository.find({
select: ['workflowId', 'id'],
where: {
deletedAt: LessThanOrEqual(DateUtils.mixedDateToUtcDatetimeString(date)),
},
take: this.hardDeletionBatchSize,

/**
* @important This ensures soft-deleted executions are included,
* else `@DeleteDateColumn()` at `deletedAt` will exclude them.
*/
withDeleted: true,
})
).map(({ id: executionId, workflowId }) => ({ workflowId, executionId }));

const executionIds = workflowIdsAndExecutionIds.map((o) => o.executionId);
const ids = await this.executionRepository.hardDeleteSoftDeletedExecutions();

const executionIds = ids.map((o) => o.executionId);

if (executionIds.length === 0) {
this.logger.debug('[Pruning] Found no executions to hard-delete');

return this.rates.hardDeletion;
}

try {
this.logger.debug('[Pruning] Starting hard-deletion of executions', {
executionIds,
});
this.logger.debug('[Pruning] Starting hard-deletion of executions', { executionIds });

await this.binaryDataService.deleteMany(workflowIdsAndExecutionIds);
await this.binaryDataService.deleteMany(ids);

await this.executionRepository.delete({ id: In(executionIds) });
await this.executionRepository.deleteByIds(executionIds);

this.logger.debug('[Pruning] Hard-deleted executions', { executionIds });
} catch (error) {
Expand All @@ -225,7 +159,7 @@ export class PruningService {
* to prevent high concurrency from causing duplicate deletions.
*/
const isHighVolume = executionIds.length >= this.hardDeletionBatchSize;
const rate = isHighVolume ? 1 * TIME.SECOND : this.rates.hardDeletion;
return rate;

return isHighVolume ? 1 * TIME.SECOND : this.rates.hardDeletion;
}
}
Loading