Skip to content
/ qp-n8n Public
forked from n8n-io/n8n

Commit

Permalink
fix(core): Fix calling error workflows in main mode recovery (n8n-io#…
Browse files Browse the repository at this point in the history
…5698)

* fix calling error workflows in main mode recovery

* cleanup

* remove WorkflowExecuteAdditionalData export from index

* revert refactor to fix test

* Update index.ts

---------

Co-authored-by: कारतोफ्फेलस्क्रिप्ट™ <[email protected]>
  • Loading branch information
2 people authored and sunilrr committed Apr 24, 2023
1 parent 91bfb19 commit fce5967
Showing 1 changed file with 25 additions and 3 deletions.
28 changes: 25 additions & 3 deletions packages/cli/src/eventbus/MessageEventBus/recoverEvents.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import { workflowExecutionCompleted } from '../../events/WorkflowStatistics';
import { eventBus } from './MessageEventBus';
import { Container } from 'typedi';
import { InternalHooks } from '@/InternalHooks';
import { getWorkflowHooksMain } from '@/WorkflowExecuteAdditionalData';

export async function recoverExecutionDataFromEventLogMessages(
executionId: string,
Expand Down Expand Up @@ -122,9 +123,6 @@ export async function recoverExecutionDataFromEventLogMessages(
}
}

if (!executionData.resultData.error && workflowError) {
executionData.resultData.error = workflowError;
}
if (!lastNodeRunTimestamp) {
const workflowEndedMessage = messages.find((message) =>
(
Expand All @@ -138,6 +136,11 @@ export async function recoverExecutionDataFromEventLogMessages(
if (workflowEndedMessage) {
lastNodeRunTimestamp = workflowEndedMessage.ts;
} else {
if (!workflowError) {
workflowError = new WorkflowOperationError(
'Workflow did not finish, possible out-of-memory issue',
);
}
const workflowStartedMessage = messages.find(
(message) => message.eventName === 'n8n.workflow.started',
);
Expand All @@ -146,6 +149,11 @@ export async function recoverExecutionDataFromEventLogMessages(
}
}
}

if (!executionData.resultData.error && workflowError) {
executionData.resultData.error = workflowError;
}

if (applyToDb) {
await Db.collections.Execution.update(executionId, {
data: stringify(executionData),
Expand Down Expand Up @@ -174,6 +182,20 @@ export async function recoverExecutionDataFromEventLogMessages(
stoppedAt: lastNodeRunTimestamp?.toJSDate(),
status: 'crashed',
};
const workflowHooks = getWorkflowHooksMain(
{
userId: '',
workflowData: executionEntry.workflowData,
executionMode: executionEntry.mode,
executionData,
runData: executionData.resultData.runData,
retryOf: executionEntry.retryOf,
},
executionId,
);

// execute workflowExecuteAfter hook to trigger error workflow
await workflowHooks.executeHookFunctions('workflowExecuteAfter', [iRunData]);

// calling workflowExecutionCompleted directly because the eventEmitter is not up yet at this point
await workflowExecutionCompleted(executionEntry.workflowData, iRunData);
Expand Down

0 comments on commit fce5967

Please sign in to comment.