Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(core): Prevent false stalled jobs in queue mode from displaying as errored #7435

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions packages/cli/src/WorkflowRunner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,24 @@ export class WorkflowRunner {
) {
ErrorReporter.error(error);

const isQueueMode = config.getEnv('executions.mode') === 'queue';

// in queue mode, first do a sanity run for the edge case that the execution was not marked as stalled
// by Bull even though it executed successfully, see https://github.com/OptimalBits/bull/issues/1415

if (isQueueMode && executionMode !== 'manual') {
const executionWithoutData = await Container.get(ExecutionRepository).findSingleExecution(
executionId,
{
includeData: false,
},
);
if (executionWithoutData?.finished === true && executionWithoutData?.status === 'success') {
// false positive, execution was successful
return;
}
}

const fullRunData: IRun = {
data: {
resultData: {
Expand Down
83 changes: 83 additions & 0 deletions packages/cli/test/unit/WorkflowRunner.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
import type { User } from '@db/entities/User';
import * as testDb from '../integration/shared/testDb';
import * as utils from '../integration/shared/utils/';
import { createWorkflow, createExecution } from '../integration/shared/testDb';
import { WorkflowRunner } from '@/WorkflowRunner';
import { WorkflowHooks, type ExecutionError, type IWorkflowExecuteHooks } from 'n8n-workflow';
import { Push } from '../../src/push';
import { mockInstance } from '../integration/shared/utils';
import Container from 'typedi';
import config from '../../src/config';

let owner: User;
let runner: WorkflowRunner;
let hookFunctions: IWorkflowExecuteHooks;
utils.setupTestServer({ endpointGroups: [] });

class Watchers {
workflowExecuteAfter = jest.fn();
}
const watchers = new Watchers();
const watchedWorkflowExecuteAfter = jest.spyOn(watchers, 'workflowExecuteAfter');

beforeAll(async () => {
const globalOwnerRole = await testDb.getGlobalOwnerRole();
owner = await testDb.createUser({ globalRole: globalOwnerRole });

mockInstance(Push);
Container.set(Push, new Push());

runner = new WorkflowRunner();

hookFunctions = {
workflowExecuteAfter: [watchers.workflowExecuteAfter],
};
});

afterAll(() => {
jest.restoreAllMocks();
});

beforeEach(async () => {
await testDb.truncate(['Workflow', 'SharedWorkflow']);
});

test('processError should return early in Bull stalled edge case', async () => {
const workflow = await createWorkflow({}, owner);
const execution = await createExecution(
{
status: 'success',
finished: true,
},
workflow,
);
config.set('executions.mode', 'queue');
await runner.processError(
new Error('test') as ExecutionError,
new Date(),
'webhook',
execution.id,
new WorkflowHooks(hookFunctions, 'webhook', execution.id, workflow),
);
expect(watchedWorkflowExecuteAfter).toHaveBeenCalledTimes(0);
});

test('processError should process error', async () => {
const workflow = await createWorkflow({}, owner);
const execution = await createExecution(
{
status: 'success',
finished: true,
},
workflow,
);
config.set('executions.mode', 'regular');
await runner.processError(
new Error('test') as ExecutionError,
new Date(),
'webhook',
execution.id,
new WorkflowHooks(hookFunctions, 'webhook', execution.id, workflow),
);
expect(watchedWorkflowExecuteAfter).toHaveBeenCalledTimes(1);
});
Loading