diff --git a/packages/cli/src/constants.ts b/packages/cli/src/constants.ts index aac5ac9f195607..7c1e7f69c511c1 100644 --- a/packages/cli/src/constants.ts +++ b/packages/cli/src/constants.ts @@ -156,3 +156,14 @@ export const GENERIC_OAUTH2_CREDENTIALS_WITH_EDITABLE_SCOPE = [ 'microsoftOAuth2Api', 'highLevelOAuth2Api', ]; + +export const ARTIFICIAL_TASK_DATA = { + main: [ + [ + { + json: { isArtificialRecoveredEventItem: true }, + pairedItem: undefined, + }, + ], + ], +}; diff --git a/packages/cli/src/errors/node-crashed.error.ts b/packages/cli/src/errors/node-crashed.error.ts new file mode 100644 index 00000000000000..24433ff979e6d8 --- /dev/null +++ b/packages/cli/src/errors/node-crashed.error.ts @@ -0,0 +1,12 @@ +import type { INode } from 'n8n-workflow'; +import { NodeOperationError } from 'n8n-workflow'; + +export class NodeCrashedError extends NodeOperationError { + constructor(node: INode) { + super(node, 'Node crashed, possible out-of-memory issue', { + message: 'Execution stopped at this node', + description: + "n8n may have run out of memory while running this execution. More context and tips on how to avoid this in the docs", + }); + } +} diff --git a/packages/cli/src/errors/workflow-crashed.error.ts b/packages/cli/src/errors/workflow-crashed.error.ts new file mode 100644 index 00000000000000..122dd70ef286b3 --- /dev/null +++ b/packages/cli/src/errors/workflow-crashed.error.ts @@ -0,0 +1,7 @@ +import { WorkflowOperationError } from 'n8n-workflow'; + +export class WorkflowCrashedError extends WorkflowOperationError { + constructor() { + super('Workflow did not finish, possible out-of-memory issue'); + } +} diff --git a/packages/cli/src/eventbus/MessageEventBus/MessageEventBus.ts b/packages/cli/src/eventbus/MessageEventBus/MessageEventBus.ts index 6bc7d7b6a7d9c7..141719c68d69ed 100644 --- a/packages/cli/src/eventbus/MessageEventBus/MessageEventBus.ts +++ b/packages/cli/src/eventbus/MessageEventBus/MessageEventBus.ts @@ -32,7 +32,7 @@ import { import { METRICS_EVENT_NAME } from '../MessageEventBusDestination/Helpers.ee'; import type { AbstractEventMessageOptions } from '../EventMessageClasses/AbstractEventMessageOptions'; import { getEventMessageObjectByType } from '../EventMessageClasses/Helpers'; -import { ExecutionDataRecoveryService } from '../executionDataRecovery.service'; +import { ExecutionRecoveryService } from '../../executions/execution-recovery.service'; import { EventMessageAiNode, type EventMessageAiNodeOptions, @@ -68,7 +68,7 @@ export class MessageEventBus extends EventEmitter { private readonly eventDestinationsRepository: EventDestinationsRepository, private readonly workflowRepository: WorkflowRepository, private readonly orchestrationService: OrchestrationService, - private readonly recoveryService: ExecutionDataRecoveryService, + private readonly recoveryService: ExecutionRecoveryService, ) { super(); } @@ -185,10 +185,9 @@ export class MessageEventBus extends EventEmitter { ); await this.executionRepository.markAsCrashed([executionId]); } else { - await this.recoveryService.recoverExecutionData( + await this.recoveryService.recover( executionId, unsentAndUnfinished.unfinishedExecutions[executionId], - true, ); } } diff --git a/packages/cli/src/eventbus/executionDataRecovery.service.ts b/packages/cli/src/eventbus/executionDataRecovery.service.ts deleted file mode 100644 index 3ddee5edc71e47..00000000000000 --- a/packages/cli/src/eventbus/executionDataRecovery.service.ts +++ /dev/null @@ -1,213 +0,0 @@ -import { Container, Service } from 'typedi'; -import type { DateTime } from 'luxon'; -import { Push } from '@/push'; -import { InternalHooks } from '@/InternalHooks'; -import type { IRun, IRunExecutionData, ITaskData } from 'n8n-workflow'; -import { NodeOperationError, WorkflowOperationError, sleep } from 'n8n-workflow'; - -import { ExecutionRepository } from '@db/repositories/execution.repository'; -import { getWorkflowHooksMain } from '@/WorkflowExecuteAdditionalData'; -import type { EventMessageTypes, EventNamesTypes } from './EventMessageClasses'; - -@Service() -export class ExecutionDataRecoveryService { - constructor( - private readonly push: Push, - private readonly executionRepository: ExecutionRepository, - ) {} - - // eslint-disable-next-line complexity - async recoverExecutionData( - executionId: string, - messages: EventMessageTypes[], - applyToDb: boolean, - ): Promise { - const executionEntry = await this.executionRepository.findSingleExecution(executionId, { - includeData: true, - unflattenData: true, - }); - - if (executionEntry && messages) { - let executionData = executionEntry.data; - let workflowError: WorkflowOperationError | undefined; - if (!executionData) { - executionData = { resultData: { runData: {} } }; - } - let nodeNames: string[] = []; - if ( - executionData?.resultData?.runData && - Object.keys(executionData.resultData.runData).length > 0 - ) { - } else { - if (!executionData.resultData) { - executionData.resultData = { - runData: {}, - }; - } else { - if (!executionData.resultData.runData) { - executionData.resultData.runData = {}; - } - } - } - nodeNames = executionEntry.workflowData.nodes.map((n) => n.name); - - let lastNodeRunTimestamp: DateTime | undefined = undefined; - - for (const nodeName of nodeNames) { - const nodeByName = executionEntry?.workflowData.nodes.find((n) => n.name === nodeName); - - if (!nodeByName) continue; - - const nodeStartedMessage = messages.find( - (message) => - message.eventName === 'n8n.node.started' && message.payload.nodeName === nodeName, - ); - const nodeFinishedMessage = messages.find( - (message) => - message.eventName === 'n8n.node.finished' && message.payload.nodeName === nodeName, - ); - - const executionTime = - nodeStartedMessage && nodeFinishedMessage - ? nodeFinishedMessage.ts.diff(nodeStartedMessage.ts).toMillis() - : 0; - - let taskData: ITaskData; - if (executionData.resultData.runData[nodeName]?.length > 0) { - taskData = executionData.resultData.runData[nodeName][0]; - } else { - taskData = { - startTime: nodeStartedMessage ? nodeStartedMessage.ts.toUnixInteger() : 0, - executionTime, - source: [null], - executionStatus: 'unknown', - }; - } - - if (nodeStartedMessage && !nodeFinishedMessage) { - const nodeError = new NodeOperationError( - nodeByName, - 'Node crashed, possible out-of-memory issue', - { - message: 'Execution stopped at this node', - description: - "n8n may have run out of memory while executing it. More context and tips on how to avoid this in the docs", - }, - ); - workflowError = new WorkflowOperationError( - 'Workflow did not finish, possible out-of-memory issue', - ); - taskData.error = nodeError; - taskData.executionStatus = 'crashed'; - executionData.resultData.lastNodeExecuted = nodeName; - if (nodeStartedMessage) lastNodeRunTimestamp = nodeStartedMessage.ts; - } else if (nodeStartedMessage && nodeFinishedMessage) { - taskData.executionStatus = 'success'; - if (taskData.data === undefined) { - taskData.data = { - main: [ - [ - { - json: { - isArtificialRecoveredEventItem: true, - }, - pairedItem: undefined, - }, - ], - ], - }; - } - } - - if (!executionData.resultData.runData[nodeName]) { - executionData.resultData.runData[nodeName] = [taskData]; - } - } - - if (!lastNodeRunTimestamp) { - const workflowEndedMessage = messages.find((message) => - ( - [ - 'n8n.workflow.success', - 'n8n.workflow.crashed', - 'n8n.workflow.failed', - ] as EventNamesTypes[] - ).includes(message.eventName), - ); - if (workflowEndedMessage) { - lastNodeRunTimestamp = workflowEndedMessage.ts; - } else { - if (!workflowError) { - workflowError = new WorkflowOperationError( - 'Workflow did not finish, possible out-of-memory issue', - ); - } - const workflowStartedMessage = messages.find( - (message) => message.eventName === 'n8n.workflow.started', - ); - if (workflowStartedMessage) { - lastNodeRunTimestamp = workflowStartedMessage.ts; - } - } - } - - if (!executionData.resultData.error && workflowError) { - executionData.resultData.error = workflowError; - } - - if (applyToDb) { - const newStatus = executionEntry.status === 'error' ? 'error' : 'crashed'; - await this.executionRepository.updateExistingExecution(executionId, { - data: executionData, - status: newStatus, - stoppedAt: lastNodeRunTimestamp?.toJSDate(), - }); - await Container.get(InternalHooks).onWorkflowPostExecute( - executionId, - executionEntry.workflowData, - { - data: executionData, - finished: false, - mode: executionEntry.mode, - waitTill: executionEntry.waitTill ?? undefined, - startedAt: executionEntry.startedAt, - stoppedAt: lastNodeRunTimestamp?.toJSDate(), - status: newStatus, - }, - ); - const iRunData: IRun = { - data: executionData, - finished: false, - mode: executionEntry.mode, - waitTill: executionEntry.waitTill ?? undefined, - startedAt: executionEntry.startedAt, - stoppedAt: lastNodeRunTimestamp?.toJSDate(), - status: newStatus, - }; - const workflowHooks = getWorkflowHooksMain( - { - userId: '', - workflowData: executionEntry.workflowData, - executionMode: executionEntry.mode, - executionData, - runData: executionData.resultData.runData, - retryOf: executionEntry.retryOf, - }, - executionId, - ); - - // execute workflowExecuteAfter hook to trigger error workflow - await workflowHooks.executeHookFunctions('workflowExecuteAfter', [iRunData]); - - // wait for UI to be back up and send the execution data - this.push.once('editorUiConnected', async () => { - // add a small timeout to make sure the UI is back up - await sleep(1000); - this.push.broadcast('executionRecovered', { executionId }); - }); - } - return executionData; - } - return; - } -} diff --git a/packages/cli/src/executions/__tests__/execution-recovery.service.test.ts b/packages/cli/src/executions/__tests__/execution-recovery.service.test.ts new file mode 100644 index 00000000000000..c9d75ee6a3243c --- /dev/null +++ b/packages/cli/src/executions/__tests__/execution-recovery.service.test.ts @@ -0,0 +1,420 @@ +import Container from 'typedi'; +import { stringify } from 'flatted'; + +import { mockInstance } from '@test/mocking'; +import { randomInteger } from '@test-integration/random'; +import { createWorkflow } from '@test-integration/db/workflows'; +import { createExecution } from '@test-integration/db/executions'; +import * as testDb from '@test-integration/testDb'; + +import { ExecutionRecoveryService } from '@/executions/execution-recovery.service'; +import { ExecutionRepository } from '@/databases/repositories/execution.repository'; + +import { InternalHooks } from '@/InternalHooks'; +import { Push } from '@/push'; +import { ARTIFICIAL_TASK_DATA } from '@/constants'; +import { NodeCrashedError } from '@/errors/node-crashed.error'; +import { WorkflowCrashedError } from '@/errors/workflow-crashed.error'; +import { EventMessageNode } from '@/eventbus/EventMessageClasses/EventMessageNode'; +import { EventMessageWorkflow } from '@/eventbus/EventMessageClasses/EventMessageWorkflow'; +import type { EventMessageTypes as EventMessage } from '@/eventbus/EventMessageClasses'; +import type { WorkflowEntity } from '@/databases/entities/WorkflowEntity'; +import { NodeConnectionType } from 'n8n-workflow'; + +/** + * Workflow producing an execution whose data will be truncated by an instance crash. + */ +export const OOM_WORKFLOW: Partial = { + nodes: [ + { + parameters: {}, + id: '48ce17fe-9651-42ae-910c-48602a00f0bb', + name: 'When clicking "Test workflow"', + type: 'n8n-nodes-base.manualTrigger', + typeVersion: 1, + position: [640, 260], + }, + { + parameters: { + category: 'oom', + memorySizeValue: 1000, + }, + id: '07a48151-96d3-45eb-961c-1daf85fbe052', + name: 'DebugHelper', + type: 'n8n-nodes-base.debugHelper', + typeVersion: 1, + position: [840, 260], + }, + ], + connections: { + 'When clicking "Test workflow"': { + main: [ + [ + { + node: 'DebugHelper', + type: NodeConnectionType.Main, + index: 0, + }, + ], + ], + }, + }, + pinData: {}, +}; + +/** + * Snapshot of an execution that will be truncated by an instance crash. + */ +export const IN_PROGRESS_EXECUTION_DATA = { + startData: {}, + resultData: { + runData: { + 'When clicking "Test workflow"': [ + { + hints: [], + startTime: 1716138610153, + executionTime: 1, + source: [], + executionStatus: 'success', + data: { + main: [ + [ + { + json: {}, + pairedItem: { + item: 0, + }, + }, + ], + ], + }, + }, + ], + }, + lastNodeExecuted: 'When clicking "Test workflow"', + }, + executionData: { + contextData: {}, + nodeExecutionStack: [ + { + node: { + parameters: { + category: 'oom', + memorySizeValue: 1000, + }, + id: '07a48151-96d3-45eb-961c-1daf85fbe052', + name: 'DebugHelper', + type: 'n8n-nodes-base.debugHelper', + typeVersion: 1, + position: [840, 260], + }, + data: { + main: [ + [ + { + json: {}, + pairedItem: { + item: 0, + }, + }, + ], + ], + }, + source: { + main: [ + { + previousNode: 'When clicking "Test workflow"', + }, + ], + }, + }, + ], + metadata: {}, + waitingExecution: {}, + waitingExecutionSource: {}, + }, +}; + +export const setupMessages = (executionId: string, workflowName: string): EventMessage[] => { + return [ + new EventMessageWorkflow({ + eventName: 'n8n.workflow.started', + payload: { executionId }, + }), + new EventMessageNode({ + eventName: 'n8n.node.started', + payload: { + executionId, + workflowName, + nodeName: 'When clicking "Test workflow"', + nodeType: 'n8n-nodes-base.manualTrigger', + }, + }), + new EventMessageNode({ + eventName: 'n8n.node.finished', + payload: { + executionId, + workflowName, + nodeName: 'When clicking "Test workflow"', + nodeType: 'n8n-nodes-base.manualTrigger', + }, + }), + new EventMessageNode({ + eventName: 'n8n.node.started', + payload: { + executionId, + workflowName, + nodeName: 'DebugHelper', + nodeType: 'n8n-nodes-base.debugHelper', + }, + }), + ]; +}; + +describe('ExecutionRecoveryService', () => { + let executionRecoveryService: ExecutionRecoveryService; + let push: Push; + + beforeAll(async () => { + await testDb.init(); + + mockInstance(InternalHooks); + push = mockInstance(Push); + + executionRecoveryService = new ExecutionRecoveryService( + push, + Container.get(ExecutionRepository), + ); + }); + + afterEach(async () => { + await testDb.truncate(['Execution', 'ExecutionData', 'Workflow']); + }); + + afterAll(async () => { + await testDb.terminate(); + }); + + describe('recover', () => { + it('should amend, persist, run hooks, broadcast', async () => { + /** + * Arrange + */ + // @ts-expect-error Private method + const amendSpy = jest.spyOn(executionRecoveryService, 'amend'); + const executionRepository = Container.get(ExecutionRepository); + const dbUpdateSpy = jest.spyOn(executionRepository, 'update'); + // @ts-expect-error Private method + const runHooksSpy = jest.spyOn(executionRecoveryService, 'runHooks'); + + const workflow = await createWorkflow(OOM_WORKFLOW); + + const execution = await createExecution( + { + status: 'running', + data: stringify(IN_PROGRESS_EXECUTION_DATA), + }, + workflow, + ); + + const messages = setupMessages(execution.id, workflow.name); + + /** + * Act + */ + + await executionRecoveryService.recover(execution.id, messages); + + /** + * Assert + */ + + expect(amendSpy).toHaveBeenCalledTimes(1); + expect(amendSpy).toHaveBeenCalledWith(execution.id, messages); + expect(dbUpdateSpy).toHaveBeenCalledTimes(1); + expect(runHooksSpy).toHaveBeenCalledTimes(1); + expect(push.once).toHaveBeenCalledTimes(1); + }); + + test('should amend a truncated execution where last node did not finish', async () => { + /** + * Arrange + */ + + const workflow = await createWorkflow(OOM_WORKFLOW); + + const execution = await createExecution( + { + status: 'running', + data: stringify(IN_PROGRESS_EXECUTION_DATA), + }, + workflow, + ); + + const messages = setupMessages(execution.id, workflow.name); + + /** + * Act + */ + + const amendedExecution = await executionRecoveryService.recover(execution.id, messages); + + /** + * Assert + */ + + const startOfLastNodeRun = messages + .find((m) => m.eventName === 'n8n.node.started' && m.payload.nodeName === 'DebugHelper') + ?.ts.toJSDate(); + + expect(amendedExecution).toEqual( + expect.objectContaining({ + status: 'crashed', + stoppedAt: startOfLastNodeRun, + }), + ); + + const resultData = amendedExecution?.data.resultData; + + if (!resultData) fail('Expected `resultData` to be defined'); + + expect(resultData.error).toBeInstanceOf(WorkflowCrashedError); + expect(resultData.lastNodeExecuted).toBe('DebugHelper'); + + const runData = resultData.runData; + + if (!runData) fail('Expected `runData` to be defined'); + + const manualTriggerTaskData = runData['When clicking "Test workflow"'].at(0); + const debugHelperTaskData = runData.DebugHelper.at(0); + + expect(manualTriggerTaskData?.executionStatus).toBe('success'); + expect(manualTriggerTaskData?.error).toBeUndefined(); + expect(manualTriggerTaskData?.startTime).not.toBe(ARTIFICIAL_TASK_DATA); + + expect(debugHelperTaskData?.executionStatus).toBe('crashed'); + expect(debugHelperTaskData?.error).toBeInstanceOf(NodeCrashedError); + }); + + test('should amend a truncated execution where last node finished', async () => { + /** + * Arrange + */ + + const workflow = await createWorkflow(OOM_WORKFLOW); + + const execution = await createExecution( + { + status: 'running', + data: stringify(IN_PROGRESS_EXECUTION_DATA), + }, + workflow, + ); + + const messages = setupMessages(execution.id, workflow.name); + messages.push( + new EventMessageNode({ + eventName: 'n8n.node.finished', + payload: { + executionId: execution.id, + workflowName: workflow.name, + nodeName: 'DebugHelper', + nodeType: 'n8n-nodes-base.debugHelper', + }, + }), + ); + + /** + * Act + */ + + const amendedExecution = await executionRecoveryService.recover(execution.id, messages); + + /** + * Assert + */ + + const endOfLastNoderun = messages + .find((m) => m.eventName === 'n8n.node.finished' && m.payload.nodeName === 'DebugHelper') + ?.ts.toJSDate(); + + expect(amendedExecution).toEqual( + expect.objectContaining({ + status: 'crashed', + stoppedAt: endOfLastNoderun, + }), + ); + + const resultData = amendedExecution?.data.resultData; + + if (!resultData) fail('Expected `resultData` to be defined'); + + expect(resultData.error).toBeUndefined(); + expect(resultData.lastNodeExecuted).toBe('DebugHelper'); + + const runData = resultData.runData; + + if (!runData) fail('Expected `runData` to be defined'); + + const manualTriggerTaskData = runData['When clicking "Test workflow"'].at(0); + const debugHelperTaskData = runData.DebugHelper.at(0); + + expect(manualTriggerTaskData?.executionStatus).toBe('success'); + expect(manualTriggerTaskData?.error).toBeUndefined(); + + expect(debugHelperTaskData?.executionStatus).toBe('success'); + expect(debugHelperTaskData?.error).toBeUndefined(); + expect(debugHelperTaskData?.data).toEqual(ARTIFICIAL_TASK_DATA); + }); + + test('should return `null` if no messages', async () => { + /** + * Arrange + */ + const workflow = await createWorkflow(OOM_WORKFLOW); + const execution = await createExecution( + { + status: 'running', + data: stringify(IN_PROGRESS_EXECUTION_DATA), + }, + workflow, + ); + const noMessages: EventMessage[] = []; + + /** + * Act + */ + + const amendedExecution = await executionRecoveryService.recover(execution.id, noMessages); + + /** + * Assert + */ + + expect(amendedExecution).toBeNull(); + }); + + test('should return `null` if no execution', async () => { + /** + * Arrange + */ + const inexistentExecutionId = randomInteger(100).toString(); + const messages = setupMessages(inexistentExecutionId, 'Some workflow'); + + /** + * Act + */ + + const amendedExecution = await executionRecoveryService.recover( + inexistentExecutionId, + messages, + ); + + /** + * Assert + */ + + expect(amendedExecution).toBeNull(); + }); + }); +}); diff --git a/packages/cli/src/executions/execution-recovery.service.ts b/packages/cli/src/executions/execution-recovery.service.ts new file mode 100644 index 00000000000000..1a57030561bb37 --- /dev/null +++ b/packages/cli/src/executions/execution-recovery.service.ts @@ -0,0 +1,189 @@ +import Container, { Service } from 'typedi'; +import { Push } from '@/push'; +import { sleep } from 'n8n-workflow'; +import { ExecutionRepository } from '@db/repositories/execution.repository'; +import { getWorkflowHooksMain } from '@/WorkflowExecuteAdditionalData'; // @TODO: Dependency cycle +import { InternalHooks } from '@/InternalHooks'; // @TODO: Dependency cycle if injected +import type { DateTime } from 'luxon'; +import type { IRun, ITaskData } from 'n8n-workflow'; +import type { EventMessageTypes } from '../eventbus/EventMessageClasses'; +import type { IExecutionResponse } from '@/Interfaces'; +import { NodeCrashedError } from '@/errors/node-crashed.error'; +import { WorkflowCrashedError } from '@/errors/workflow-crashed.error'; +import { ARTIFICIAL_TASK_DATA } from '@/constants'; + +/** + * Service for recovering executions truncated by an instance crash. + */ +@Service() +export class ExecutionRecoveryService { + constructor( + private readonly push: Push, + private readonly executionRepository: ExecutionRepository, + ) {} + + /** + * "Recovery" means (1) amending key properties of a truncated execution, + * (2) running post-execution hooks, and (3) returning the amended execution + * so the UI can reflect the error. "Recovery" does **not** mean injecting + * execution data from the logs (they hold none), or resuming the execution + * from the point of truncation, or re-running the whole execution. + * + * Recovery is only possible if event logs are available in the container. + * In regular mode, logs should but might not be available, e.g. due to container + * being recycled, max log size causing rotation, etc. In queue mode, as workers + * log to their own filesystems, only manual exections can be recovered. + */ + async recover(executionId: string, messages: EventMessageTypes[]) { + if (messages.length === 0) return null; + + const amendedExecution = await this.amend(executionId, messages); + + if (!amendedExecution) return null; + + await this.executionRepository.updateExistingExecution(executionId, amendedExecution); + + await this.runHooks(amendedExecution); + + this.push.once('editorUiConnected', async () => { + await sleep(1000); + this.push.broadcast('executionRecovered', { executionId }); + }); + + return amendedExecution; + } + + /** + * Amend `status`, `stoppedAt`, and `data` of an execution using event log messages. + */ + private async amend(executionId: string, messages: EventMessageTypes[]) { + const { nodeMessages, workflowMessages } = this.toRelevantMessages(messages); + + if (nodeMessages.length === 0) return null; + + const execution = await this.executionRepository.findSingleExecution(executionId, { + includeData: true, + unflattenData: true, + }); + + if (!execution) return null; + + const runExecutionData = execution.data ?? { resultData: { runData: {} } }; + + let lastNodeRunTimestamp: DateTime | undefined; + + for (const node of execution.workflowData.nodes) { + const nodeStartedMessage = nodeMessages.find( + (m) => m.payload.nodeName === node.name && m.eventName === 'n8n.node.started', + ); + + if (!nodeStartedMessage) continue; + + const nodeFinishedMessage = nodeMessages.find( + (m) => m.payload.nodeName === node.name && m.eventName === 'n8n.node.finished', + ); + + const taskData: ITaskData = { + startTime: nodeStartedMessage.ts.toUnixInteger(), + executionTime: -1, + source: [null], + }; + + if (nodeFinishedMessage) { + taskData.executionStatus = 'success'; + taskData.data ??= ARTIFICIAL_TASK_DATA; + taskData.executionTime = nodeFinishedMessage.ts.diff(nodeStartedMessage.ts).toMillis(); + lastNodeRunTimestamp = nodeFinishedMessage.ts; + } else { + taskData.executionStatus = 'crashed'; + taskData.error = new NodeCrashedError(node); + taskData.executionTime = 0; + runExecutionData.resultData.error = new WorkflowCrashedError(); + lastNodeRunTimestamp = nodeStartedMessage.ts; + } + + runExecutionData.resultData.lastNodeExecuted = node.name; + runExecutionData.resultData.runData[node.name] = [taskData]; + } + + return { + ...execution, + status: execution.status === 'error' ? 'error' : 'crashed', + stoppedAt: this.toStoppedAt(lastNodeRunTimestamp, workflowMessages), + data: runExecutionData, + } as IExecutionResponse; + } + + // ---------------------------------- + // private + // ---------------------------------- + + private toRelevantMessages(messages: EventMessageTypes[]) { + return messages.reduce<{ + nodeMessages: EventMessageTypes[]; + workflowMessages: EventMessageTypes[]; + }>( + (acc, cur) => { + if (cur.eventName.startsWith('n8n.node.')) { + acc.nodeMessages.push(cur); + } else if (cur.eventName.startsWith('n8n.workflow.')) { + acc.workflowMessages.push(cur); + } + + return acc; + }, + { nodeMessages: [], workflowMessages: [] }, + ); + } + + private toStoppedAt(timestamp: DateTime | undefined, messages: EventMessageTypes[]) { + if (timestamp) return timestamp.toJSDate(); + + const WORKFLOW_END_EVENTS = new Set([ + 'n8n.workflow.success', + 'n8n.workflow.crashed', + 'n8n.workflow.failed', + ]); + + return ( + messages.find((m) => WORKFLOW_END_EVENTS.has(m.eventName)) ?? + messages.find((m) => m.eventName === 'n8n.workflow.started') + )?.ts.toJSDate(); + } + + private async runHooks(execution: IExecutionResponse) { + await Container.get(InternalHooks).onWorkflowPostExecute(execution.id, execution.workflowData, { + data: execution.data, + finished: false, + mode: execution.mode, + waitTill: execution.waitTill, + startedAt: execution.startedAt, + stoppedAt: execution.stoppedAt, + status: execution.status, + }); + + const externalHooks = getWorkflowHooksMain( + { + userId: '', + workflowData: execution.workflowData, + executionMode: execution.mode, + executionData: execution.data, + runData: execution.data.resultData.runData, + retryOf: execution.retryOf, + }, + execution.id, + ); + + const run: IRun = { + data: execution.data, + finished: false, + mode: execution.mode, + waitTill: execution.waitTill ?? undefined, + startedAt: execution.startedAt, + stoppedAt: execution.stoppedAt, + status: execution.status, + }; + + await externalHooks.executeHookFunctions('workflowExecuteAfter', [run]); + } +} diff --git a/packages/cli/test/integration/eventbus.ee.test.ts b/packages/cli/test/integration/eventbus.ee.test.ts index 0c8627ac48a5c8..b19f024f520808 100644 --- a/packages/cli/test/integration/eventbus.ee.test.ts +++ b/packages/cli/test/integration/eventbus.ee.test.ts @@ -22,7 +22,7 @@ import type { MessageEventBusDestinationWebhook } from '@/eventbus/MessageEventB import type { MessageEventBusDestinationSentry } from '@/eventbus/MessageEventBusDestination/MessageEventBusDestinationSentry.ee'; import { EventMessageAudit } from '@/eventbus/EventMessageClasses/EventMessageAudit'; import type { EventNamesTypes } from '@/eventbus/EventMessageClasses'; -import { ExecutionDataRecoveryService } from '@/eventbus/executionDataRecovery.service'; +import { ExecutionRecoveryService } from '@/executions/execution-recovery.service'; import * as utils from './shared/utils'; import { createUser } from './shared/db/users'; @@ -80,7 +80,7 @@ async function confirmIdSent(id: string) { expect(sent.find((msg) => msg.id === id)).toBeTruthy(); } -mockInstance(ExecutionDataRecoveryService); +mockInstance(ExecutionRecoveryService); const testServer = utils.setupTestServer({ endpointGroups: ['eventBus'], enabledFeatures: ['feat:logStreaming'], diff --git a/packages/cli/test/integration/eventbus.test.ts b/packages/cli/test/integration/eventbus.test.ts index 1acb8ffa366677..e4e58f96fa3305 100644 --- a/packages/cli/test/integration/eventbus.test.ts +++ b/packages/cli/test/integration/eventbus.test.ts @@ -1,6 +1,6 @@ import type { User } from '@db/entities/User'; import { MessageEventBus } from '@/eventbus/MessageEventBus/MessageEventBus'; -import { ExecutionDataRecoveryService } from '@/eventbus/executionDataRecovery.service'; +import { ExecutionRecoveryService } from '@/executions/execution-recovery.service'; import * as utils from './shared/utils/'; import { createUser } from './shared/db/users'; @@ -17,7 +17,7 @@ let owner: User; let authOwnerAgent: SuperAgentTest; mockInstance(MessageEventBus); -mockInstance(ExecutionDataRecoveryService); +mockInstance(ExecutionRecoveryService); const testServer = utils.setupTestServer({ endpointGroups: ['eventBus'], enabledFeatures: [], // do not enable logstreaming diff --git a/packages/cli/test/integration/metrics.test.ts b/packages/cli/test/integration/metrics.test.ts index 394ec026c4dd5d..0e62ddbae08326 100644 --- a/packages/cli/test/integration/metrics.test.ts +++ b/packages/cli/test/integration/metrics.test.ts @@ -5,12 +5,12 @@ import request from 'supertest'; import config from '@/config'; import { N8N_VERSION } from '@/constants'; import { MetricsService } from '@/services/metrics.service'; -import { ExecutionDataRecoveryService } from '@/eventbus/executionDataRecovery.service'; +import { ExecutionRecoveryService } from '@/executions/execution-recovery.service'; import { setupTestServer } from './shared/utils'; import { mockInstance } from '../shared/mocking'; -mockInstance(ExecutionDataRecoveryService); +mockInstance(ExecutionRecoveryService); jest.unmock('@/eventbus/MessageEventBus/MessageEventBus'); config.set('endpoints.metrics.enable', true); config.set('endpoints.metrics.includeDefaultMetrics', false);