Skip to content

Commit

Permalink
refactor(core): Move typeorm operators from PruningService to `Ex…
Browse files Browse the repository at this point in the history
…ecutionRepository` (no-changelog) (#8145)

Follow-up to #8143
  • Loading branch information
ivov authored Dec 22, 2023
1 parent a59d78d commit 7b26a7a
Show file tree
Hide file tree
Showing 2 changed files with 93 additions and 77 deletions.
84 changes: 83 additions & 1 deletion packages/cli/src/databases/repositories/execution.repository.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,14 @@
import { Service } from 'typedi';
import { DataSource, In, LessThanOrEqual, MoreThanOrEqual, Repository } from 'typeorm';
import {
Brackets,
DataSource,
In,
IsNull,
LessThanOrEqual,
MoreThanOrEqual,
Not,
Repository,
} from 'typeorm';
import { DateUtils } from 'typeorm/util/DateUtils';
import type {
FindManyOptions,
Expand Down Expand Up @@ -434,4 +443,77 @@ export class ExecutionRepository extends Repository<ExecutionEntity> {
},
}).then((executions) => executions.map(({ id }) => id));
}

async softDeletePrunableExecutions() {
const maxAge = config.getEnv('executions.pruneDataMaxAge'); // in h
const maxCount = config.getEnv('executions.pruneDataMaxCount');

// Find ids of all executions that were stopped longer that pruneDataMaxAge ago
const date = new Date();
date.setHours(date.getHours() - maxAge);

const toPrune: Array<FindOptionsWhere<ExecutionEntity>> = [
// date reformatting needed - see https://github.com/typeorm/typeorm/issues/2286
{ stoppedAt: LessThanOrEqual(DateUtils.mixedDateToUtcDatetimeString(date)) },
];

if (maxCount > 0) {
const executions = await this.find({
select: ['id'],
skip: maxCount,
take: 1,
order: { id: 'DESC' },
});

if (executions[0]) {
toPrune.push({ id: LessThanOrEqual(executions[0].id) });
}
}

const [timeBasedWhere, countBasedWhere] = toPrune;

return this.createQueryBuilder()
.update(ExecutionEntity)
.set({ deletedAt: new Date() })
.where({
deletedAt: IsNull(),
// Only mark executions as deleted if they are in an end state
status: Not(In(['new', 'running', 'waiting'])),
})
.andWhere(
new Brackets((qb) =>
countBasedWhere
? qb.where(timeBasedWhere).orWhere(countBasedWhere)
: qb.where(timeBasedWhere),
),
)
.execute();
}

async hardDeleteSoftDeletedExecutions() {
const date = new Date();
date.setHours(date.getHours() - config.getEnv('executions.pruneDataHardDeleteBuffer'));

const workflowIdsAndExecutionIds = (
await this.find({
select: ['workflowId', 'id'],
where: {
deletedAt: LessThanOrEqual(DateUtils.mixedDateToUtcDatetimeString(date)),
},
take: this.hardDeletionBatchSize,

/**
* @important This ensures soft-deleted executions are included,
* else `@DeleteDateColumn()` at `deletedAt` will exclude them.
*/
withDeleted: true,
})
).map(({ id: executionId, workflowId }) => ({ workflowId, executionId }));

return workflowIdsAndExecutionIds;
}

async deleteByIds(executionIds: string[]) {
return this.delete({ id: In(executionIds) });
}
}
86 changes: 10 additions & 76 deletions packages/cli/src/services/pruning.service.ts
Original file line number Diff line number Diff line change
@@ -1,14 +1,9 @@
import { Service } from 'typedi';
import { BinaryDataService } from 'n8n-core';
import type { FindOptionsWhere } from 'typeorm';
import { Brackets, In, IsNull, LessThanOrEqual, Not } from 'typeorm';
import { DateUtils } from 'typeorm/util/DateUtils';

import { inTest, TIME } from '@/constants';
import config from '@/config';
import { ExecutionRepository } from '@db/repositories/execution.repository';
import { Logger } from '@/Logger';
import { ExecutionEntity } from '@db/entities/ExecutionEntity';
import { jsonStringify } from 'n8n-workflow';
import { OnShutdown } from '@/decorators/OnShutdown';

Expand Down Expand Up @@ -113,50 +108,7 @@ export class PruningService {
async softDeleteOnPruningCycle() {
this.logger.debug('[Pruning] Starting soft-deletion of executions');

const maxAge = config.getEnv('executions.pruneDataMaxAge'); // in h
const maxCount = config.getEnv('executions.pruneDataMaxCount');

// Find ids of all executions that were stopped longer that pruneDataMaxAge ago
const date = new Date();
date.setHours(date.getHours() - maxAge);

const toPrune: Array<FindOptionsWhere<ExecutionEntity>> = [
// date reformatting needed - see https://github.com/typeorm/typeorm/issues/2286
{ stoppedAt: LessThanOrEqual(DateUtils.mixedDateToUtcDatetimeString(date)) },
];

if (maxCount > 0) {
const executions = await this.executionRepository.find({
select: ['id'],
skip: maxCount,
take: 1,
order: { id: 'DESC' },
});

if (executions[0]) {
toPrune.push({ id: LessThanOrEqual(executions[0].id) });
}
}

const [timeBasedWhere, countBasedWhere] = toPrune;

const result = await this.executionRepository
.createQueryBuilder()
.update(ExecutionEntity)
.set({ deletedAt: new Date() })
.where({
deletedAt: IsNull(),
// Only mark executions as deleted if they are in an end state
status: Not(In(['new', 'running', 'waiting'])),
})
.andWhere(
new Brackets((qb) =>
countBasedWhere
? qb.where(timeBasedWhere).orWhere(countBasedWhere)
: qb.where(timeBasedWhere),
),
)
.execute();
const result = await this.executionRepository.softDeletePrunableExecutions();

if (result.affected === 0) {
this.logger.debug('[Pruning] Found no executions to soft-delete');
Expand All @@ -177,40 +129,22 @@ export class PruningService {
* @return Delay in ms after which the next cycle should be started
*/
private async hardDeleteOnPruningCycle() {
const date = new Date();
date.setHours(date.getHours() - config.getEnv('executions.pruneDataHardDeleteBuffer'));

const workflowIdsAndExecutionIds = (
await this.executionRepository.find({
select: ['workflowId', 'id'],
where: {
deletedAt: LessThanOrEqual(DateUtils.mixedDateToUtcDatetimeString(date)),
},
take: this.hardDeletionBatchSize,

/**
* @important This ensures soft-deleted executions are included,
* else `@DeleteDateColumn()` at `deletedAt` will exclude them.
*/
withDeleted: true,
})
).map(({ id: executionId, workflowId }) => ({ workflowId, executionId }));

const executionIds = workflowIdsAndExecutionIds.map((o) => o.executionId);
const ids = await this.executionRepository.hardDeleteSoftDeletedExecutions();

const executionIds = ids.map((o) => o.executionId);

if (executionIds.length === 0) {
this.logger.debug('[Pruning] Found no executions to hard-delete');

return this.rates.hardDeletion;
}

try {
this.logger.debug('[Pruning] Starting hard-deletion of executions', {
executionIds,
});
this.logger.debug('[Pruning] Starting hard-deletion of executions', { executionIds });

await this.binaryDataService.deleteMany(workflowIdsAndExecutionIds);
await this.binaryDataService.deleteMany(ids);

await this.executionRepository.delete({ id: In(executionIds) });
await this.executionRepository.deleteByIds(executionIds);

this.logger.debug('[Pruning] Hard-deleted executions', { executionIds });
} catch (error) {
Expand All @@ -225,7 +159,7 @@ export class PruningService {
* to prevent high concurrency from causing duplicate deletions.
*/
const isHighVolume = executionIds.length >= this.hardDeletionBatchSize;
const rate = isHighVolume ? 1 * TIME.SECOND : this.rates.hardDeletion;
return rate;

return isHighVolume ? 1 * TIME.SECOND : this.rates.hardDeletion;
}
}

0 comments on commit 7b26a7a

Please sign in to comment.