Skip to content

Commit

Permalink
fix(core): Stop re-enqueing executions on startup
Browse files Browse the repository at this point in the history
  • Loading branch information
ivov committed Jul 1, 2024
1 parent 41c47a2 commit 18e453b
Show file tree
Hide file tree
Showing 2 changed files with 0 additions and 53 deletions.
42 changes: 0 additions & 42 deletions packages/cli/src/commands/start.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,6 @@ import { ExecutionRepository } from '@db/repositories/execution.repository';
import { FeatureNotLicensedError } from '@/errors/feature-not-licensed.error';
import { WaitTracker } from '@/WaitTracker';
import { BaseCommand } from './BaseCommand';
import type { IWorkflowExecutionDataProcess } from '@/Interfaces';
import { ExecutionService } from '@/executions/execution.service';
import { OwnershipService } from '@/services/ownership.service';
import { WorkflowRunner } from '@/WorkflowRunner';
import { ExecutionRecoveryService } from '@/executions/execution-recovery.service';

// eslint-disable-next-line @typescript-eslint/no-unsafe-assignment, @typescript-eslint/no-var-requires
Expand Down Expand Up @@ -300,10 +296,6 @@ export class Start extends BaseCommand {
Container.get(PruningService).init();
Container.get(ExecutionRecoveryService).init();

if (config.getEnv('executions.mode') === 'regular') {
await this.runEnqueuedExecutions();
}

// Start to get active workflows and run their triggers
await this.activeWorkflowManager.init();

Expand Down Expand Up @@ -345,38 +337,4 @@ export class Start extends BaseCommand {
if (error.stack) this.logger.error(error.stack);
await this.exitWithCrash('Exiting due to an error.', error);
}

/**
* During startup, we may find executions that had been enqueued at the time of shutdown.
*
* If so, start running any such executions concurrently up to the concurrency limit, and
* enqueue any remaining ones until we have spare concurrency capacity again.
*/
private async runEnqueuedExecutions() {
const executions = await Container.get(ExecutionService).findAllEnqueuedExecutions();

if (executions.length === 0) return;

this.logger.debug(
'[Startup] Found enqueued executions to run',
executions.map((e) => e.id),
);

const ownershipService = Container.get(OwnershipService);
const workflowRunner = Container.get(WorkflowRunner);

for (const execution of executions) {
const project = await ownershipService.getWorkflowProjectCached(execution.workflowId);

const data: IWorkflowExecutionDataProcess = {
executionMode: execution.mode,
executionData: execution.data,
workflowData: execution.workflowData,
projectId: project.id,
};

// do not block - each execution either runs concurrently or is queued
void workflowRunner.run(data, undefined, false, execution.id);
}
}
}
11 changes: 0 additions & 11 deletions packages/cli/src/executions/execution.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -390,17 +390,6 @@ export class ExecutionService {
};
}

async findAllEnqueuedExecutions() {
return await this.executionRepository.findMultipleExecutions(
{
select: ['id', 'mode'],
where: { status: 'new' },
order: { id: 'ASC' },
},
{ includeData: true, unflattenData: true },
);
}

async stop(executionId: string): Promise<StopResult> {
const execution = await this.executionRepository.findSingleExecution(executionId, {
includeData: true,
Expand Down

0 comments on commit 18e453b

Please sign in to comment.