Skip to content

Commit

Permalink
perf(core): Introduce concurrency control for main mode (n8n-io#9453)
Browse files Browse the repository at this point in the history
  • Loading branch information
ivov authored and adrian-martinez-onestic committed Jun 20, 2024
1 parent 70f6e31 commit 65fc1ad
Show file tree
Hide file tree
Showing 31 changed files with 920 additions and 58 deletions.
16 changes: 15 additions & 1 deletion packages/cli/src/ActiveExecutions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ import type {
import { isWorkflowIdValid } from '@/utils';
import { ExecutionRepository } from '@db/repositories/execution.repository';
import { Logger } from '@/Logger';
import { ConcurrencyControlService } from './concurrency/concurrency-control.service';
import config from './config';

@Service()
export class ActiveExecutions {
Expand All @@ -31,19 +33,21 @@ export class ActiveExecutions {
constructor(
private readonly logger: Logger,
private readonly executionRepository: ExecutionRepository,
private readonly concurrencyControl: ConcurrencyControlService,
) {}

/**
* Add a new active execution
*/
async add(executionData: IWorkflowExecutionDataProcess, executionId?: string): Promise<string> {
let executionStatus: ExecutionStatus = executionId ? 'running' : 'new';
const mode = executionData.executionMode;
if (executionId === undefined) {
// Is a new execution so save in DB

const fullExecutionData: ExecutionPayload = {
data: executionData.executionData!,
mode: executionData.executionMode,
mode,
finished: false,
startedAt: new Date(),
workflowData: executionData.workflowData,
Expand All @@ -64,10 +68,14 @@ export class ActiveExecutions {
if (executionId === undefined) {
throw new ApplicationError('There was an issue assigning an execution id to the execution');
}

await this.concurrencyControl.throttle({ mode, executionId });
executionStatus = 'running';
} else {
// Is an existing execution we want to finish so update in DB

await this.concurrencyControl.throttle({ mode, executionId });

const execution: Pick<IExecutionDb, 'id' | 'data' | 'waitTill' | 'status'> = {
id: executionId,
data: executionData.executionData!,
Expand Down Expand Up @@ -128,6 +136,8 @@ export class ActiveExecutions {

// Remove from the list of active executions
delete this.activeExecutions[executionId];

this.concurrencyControl.release({ mode: execution.executionData.executionMode });
}

/**
Expand Down Expand Up @@ -191,6 +201,10 @@ export class ActiveExecutions {
let executionIds = Object.keys(this.activeExecutions);

if (cancelAll) {
if (config.getEnv('executions.mode') === 'regular') {
await this.concurrencyControl.removeAll(this.activeExecutions);
}

const stopPromises = executionIds.map(
async (executionId) => await this.stopExecution(executionId),
);
Expand Down
4 changes: 4 additions & 0 deletions packages/cli/src/InternalHooks.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1261,4 +1261,8 @@ export class InternalHooks {
}) {
return await this.telemetry.track('Project settings updated', data);
}

async onConcurrencyLimitHit({ threshold }: { threshold: number }) {
await this.telemetry.track('User hit concurrency limit', { threshold });
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import { getSharedWorkflowIds } from '../workflows/workflows.service';
import { encodeNextCursor } from '../../shared/services/pagination.service';
import { InternalHooks } from '@/InternalHooks';
import { ExecutionRepository } from '@db/repositories/execution.repository';
import { ConcurrencyControlService } from '@/concurrency/concurrency-control.service';

export = {
deleteExecution: [
Expand All @@ -32,6 +33,19 @@ export = {
return res.status(404).json({ message: 'Not Found' });
}

if (execution.status === 'running') {
return res.status(400).json({
message: 'Cannot delete a running execution',
});
}

if (execution.status === 'new') {
Container.get(ConcurrencyControlService).remove({
executionId: execution.id,
mode: execution.mode,
});
}

await Container.get(ExecutionRepository).hardDelete({
workflowId: execution.workflowId,
executionId: execution.id,
Expand Down
42 changes: 42 additions & 0 deletions packages/cli/src/commands/start.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@ import { ExecutionRepository } from '@db/repositories/execution.repository';
import { FeatureNotLicensedError } from '@/errors/feature-not-licensed.error';
import { WaitTracker } from '@/WaitTracker';
import { BaseCommand } from './BaseCommand';
import type { IWorkflowExecutionDataProcess } from '@/Interfaces';
import { ExecutionService } from '@/executions/execution.service';
import { OwnershipService } from '@/services/ownership.service';
import { WorkflowRunner } from '@/WorkflowRunner';

// eslint-disable-next-line @typescript-eslint/no-unsafe-assignment, @typescript-eslint/no-var-requires
const open = require('open');
Expand Down Expand Up @@ -288,6 +292,10 @@ export class Start extends BaseCommand {

await this.initPruning();

if (config.getEnv('executions.mode') === 'regular') {
await this.runEnqueuedExecutions();
}

// Start to get active workflows and run their triggers
await this.activeWorkflowManager.init();

Expand Down Expand Up @@ -347,4 +355,38 @@ export class Start extends BaseCommand {
if (error.stack) this.logger.error(error.stack);
await this.exitWithCrash('Exiting due to an error.', error);
}

/**
* During startup, we may find executions that had been enqueued at the time of shutdown.
*
* If so, start running any such executions concurrently up to the concurrency limit, and
* enqueue any remaining ones until we have spare concurrency capacity again.
*/
private async runEnqueuedExecutions() {
const executions = await Container.get(ExecutionService).findAllEnqueuedExecutions();

if (executions.length === 0) return;

this.logger.debug(
'[Startup] Found enqueued executions to run',
executions.map((e) => e.id),
);

const ownershipService = Container.get(OwnershipService);
const workflowRunner = Container.get(WorkflowRunner);

for (const execution of executions) {
const project = await ownershipService.getWorkflowProjectCached(execution.workflowId);

const data: IWorkflowExecutionDataProcess = {
executionMode: execution.mode,
executionData: execution.data,
workflowData: execution.workflowData,
projectId: project.id,
};

// do not block - each execution either runs concurrently or is queued
void workflowRunner.run(data, undefined, false, execution.id);
}
}
}
6 changes: 5 additions & 1 deletion packages/cli/src/commands/worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -317,8 +317,12 @@ export class Worker extends BaseCommand {
Worker.jobQueue = Container.get(Queue);
await Worker.jobQueue.init();
this.logger.debug('Queue singleton ready');

const envConcurrency = config.getEnv('executions.concurrency.productionLimit');
const concurrency = envConcurrency !== -1 ? envConcurrency : flags.concurrency;

void Worker.jobQueue.process(
flags.concurrency,
concurrency,
async (job) => await this.runJob(job, this.nodeTypes),
);

Expand Down
Loading

0 comments on commit 65fc1ad

Please sign in to comment.