Skip to content

Commit

Permalink
fix(core): Prevent false stalled jobs in queue mode from displaying a…
Browse files Browse the repository at this point in the history
…s errored (#7435)

This is related to an issue with how Bull handles stalled jobs, see
OptimalBits/bull#1415 for reference.

CPU intensive workflows can in certain cases take a long while to finish
up, thereby blocking the thread and causing Bull queue to think the job
has stalled, even though it finished successfully. In these cases the
error handling could then overwrite the successful execution data with
the error message.
  • Loading branch information
flipswitchingmonkey authored and elsmr committed Oct 19, 2023
1 parent f7fc76f commit 0854d3f
Show file tree
Hide file tree
Showing 2 changed files with 101 additions and 0 deletions.
18 changes: 18 additions & 0 deletions packages/cli/src/WorkflowRunner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,24 @@ export class WorkflowRunner {
) {
ErrorReporter.error(error);

const isQueueMode = config.getEnv('executions.mode') === 'queue';

// in queue mode, first do a sanity run for the edge case that the execution was not marked as stalled
// by Bull even though it executed successfully, see https://github.com/OptimalBits/bull/issues/1415

if (isQueueMode && executionMode !== 'manual') {
const executionWithoutData = await Container.get(ExecutionRepository).findSingleExecution(
executionId,
{
includeData: false,
},
);
if (executionWithoutData?.finished === true && executionWithoutData?.status === 'success') {
// false positive, execution was successful
return;
}
}

const fullRunData: IRun = {
data: {
resultData: {
Expand Down
83 changes: 83 additions & 0 deletions packages/cli/test/unit/WorkflowRunner.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
import type { User } from '@db/entities/User';
import * as testDb from '../integration/shared/testDb';
import * as utils from '../integration/shared/utils/';
import { createWorkflow, createExecution } from '../integration/shared/testDb';
import { WorkflowRunner } from '@/WorkflowRunner';
import { WorkflowHooks, type ExecutionError, type IWorkflowExecuteHooks } from 'n8n-workflow';
import { Push } from '../../src/push';
import { mockInstance } from '../integration/shared/utils';
import Container from 'typedi';
import config from '../../src/config';

let owner: User;
let runner: WorkflowRunner;
let hookFunctions: IWorkflowExecuteHooks;
utils.setupTestServer({ endpointGroups: [] });

class Watchers {
workflowExecuteAfter = jest.fn();
}
const watchers = new Watchers();
const watchedWorkflowExecuteAfter = jest.spyOn(watchers, 'workflowExecuteAfter');

beforeAll(async () => {
const globalOwnerRole = await testDb.getGlobalOwnerRole();
owner = await testDb.createUser({ globalRole: globalOwnerRole });

mockInstance(Push);
Container.set(Push, new Push());

runner = new WorkflowRunner();

hookFunctions = {
workflowExecuteAfter: [watchers.workflowExecuteAfter],
};
});

afterAll(() => {
jest.restoreAllMocks();
});

beforeEach(async () => {
await testDb.truncate(['Workflow', 'SharedWorkflow']);
});

test('processError should return early in Bull stalled edge case', async () => {
const workflow = await createWorkflow({}, owner);
const execution = await createExecution(
{
status: 'success',
finished: true,
},
workflow,
);
config.set('executions.mode', 'queue');
await runner.processError(
new Error('test') as ExecutionError,
new Date(),
'webhook',
execution.id,
new WorkflowHooks(hookFunctions, 'webhook', execution.id, workflow),
);
expect(watchedWorkflowExecuteAfter).toHaveBeenCalledTimes(0);
});

test('processError should process error', async () => {
const workflow = await createWorkflow({}, owner);
const execution = await createExecution(
{
status: 'success',
finished: true,
},
workflow,
);
config.set('executions.mode', 'regular');
await runner.processError(
new Error('test') as ExecutionError,
new Date(),
'webhook',
execution.id,
new WorkflowHooks(hookFunctions, 'webhook', execution.id, workflow),
);
expect(watchedWorkflowExecuteAfter).toHaveBeenCalledTimes(1);
});

0 comments on commit 0854d3f

Please sign in to comment.