Skip to content

Commit

Permalink
fix(core): Ensure job processor does not reprocess amended executions (
Browse files Browse the repository at this point in the history
  • Loading branch information
ivov authored Oct 29, 2024
1 parent ad29235 commit c152a3a
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 12 deletions.
21 changes: 21 additions & 0 deletions packages/cli/src/scaling/__tests__/job-processor.service.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
import { mock } from 'jest-mock-extended';

import type { ExecutionRepository } from '@/databases/repositories/execution.repository';
import type { IExecutionResponse } from '@/interfaces';

import { JobProcessor } from '../job-processor';
import type { Job } from '../scaling.types';

describe('JobProcessor', () => {
it('should refrain from processing a crashed execution', async () => {
const executionRepository = mock<ExecutionRepository>();
executionRepository.findSingleExecution.mockResolvedValue(
mock<IExecutionResponse>({ status: 'crashed' }),
);
const jobProcessor = new JobProcessor(mock(), executionRepository, mock(), mock(), mock());

const result = await jobProcessor.processJob(mock<Job>());

expect(result).toEqual({ success: false });
});
});
7 changes: 7 additions & 0 deletions packages/cli/src/scaling/job-processor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,13 @@ export class JobProcessor {
);
}

/**
* Bull's implicit retry mechanism and n8n's execution recovery mechanism may
* cause a crashed execution to be enqueued. We refrain from processing it,
* until we have reworked both mechanisms to prevent this scenario.
*/
if (execution.status === 'crashed') return { success: false };

const workflowId = execution.workflowData.id;

this.logger.info(`Worker started execution ${executionId} (job ${job.id})`, {
Expand Down
12 changes: 0 additions & 12 deletions packages/cli/src/workflow-runner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import type {
IWorkflowExecutionDataProcess,
} from 'n8n-workflow';
import {
ApplicationError,
ErrorReporterProxy as ErrorReporter,
ExecutionCancelledError,
Workflow,
Expand Down Expand Up @@ -381,17 +380,6 @@ export class WorkflowRunner {
let job: Job;
let hooks: WorkflowHooks;
try {
// check to help diagnose PAY-2100
if (
data.executionData?.executionData?.nodeExecutionStack?.length === 0 &&
config.getEnv('deployment.type') === 'internal'
) {
await this.executionRepository.setRunning(executionId); // set `startedAt` so we display it correctly in UI
throw new ApplicationError('Execution to enqueue has empty node execution stack', {
extra: { executionData: data.executionData },
});
}

job = await this.scalingService.addJob(jobData, { priority: realtime ? 50 : 100 });

hooks = WorkflowExecuteAdditionalData.getWorkflowHooksWorkerMain(
Expand Down

0 comments on commit c152a3a

Please sign in to comment.