From 687d9418936f3ec6fa1422dd4f25aa2902c01644 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Iv=C3=A1n=20Ovejero?= Date: Tue, 4 Jun 2024 11:48:04 +0200 Subject: [PATCH 1/3] refactor(core): Revamp crash recovery mechanism for main mode --- jest.config.js | 7 +- packages/cli/src/constants.ts | 11 + packages/cli/src/errors/node-crashed.error.ts | 12 + .../cli/src/errors/workflow-crashed.error.ts | 7 + .../MessageEventBus/MessageEventBus.ts | 7 +- .../eventbus/executionDataRecovery.service.ts | 213 --------- .../execution-recovery.service.test.ts | 421 ++++++++++++++++++ .../executions/execution-recovery.service.ts | 190 ++++++++ .../cli/test/integration/eventbus.ee.test.ts | 4 +- .../cli/test/integration/eventbus.test.ts | 4 +- packages/cli/test/integration/metrics.test.ts | 4 +- packages/cli/tsconfig.build.json | 2 +- packages/cli/tsconfig.json | 4 +- 13 files changed, 660 insertions(+), 226 deletions(-) create mode 100644 packages/cli/src/errors/node-crashed.error.ts create mode 100644 packages/cli/src/errors/workflow-crashed.error.ts delete mode 100644 packages/cli/src/eventbus/executionDataRecovery.service.ts create mode 100644 packages/cli/src/executions/__tests__/execution-recovery.service.test.ts create mode 100644 packages/cli/src/executions/execution-recovery.service.ts diff --git a/jest.config.js b/jest.config.js index f3f7824c14253..1423038c54664 100644 --- a/jest.config.js +++ b/jest.config.js @@ -24,9 +24,14 @@ const config = { // This resolve the path mappings from the tsconfig relative to each jest.config.js moduleNameMapper: Object.entries(paths || {}).reduce((acc, [path, [mapping]]) => { path = `^${path.replace(/\/\*$/, '/(.*)$')}`; - mapping = mapping.replace(/^\.\/(?:(.*)\/)?\*$/, '$1'); + mapping = mapping.replace(/^\.?\.\/(?:(.*)\/)?\*$/, '$1'); mapping = mapping ? `/${mapping}` : ''; acc[path] = '' + (baseUrl ? `/${baseUrl.replace(/^\.\//, '')}` : '') + mapping + '/$1'; + + acc[path] = mapping.startsWith('/test') + ? 
'' + mapping + '/$1' + : '' + (baseUrl ? `/${baseUrl.replace(/^\.\//, '')}` : '') + mapping + '/$1'; + return acc; }, {}), setupFilesAfterEnv: ['jest-expect-message'], diff --git a/packages/cli/src/constants.ts b/packages/cli/src/constants.ts index aac5ac9f19560..7c1e7f69c511c 100644 --- a/packages/cli/src/constants.ts +++ b/packages/cli/src/constants.ts @@ -156,3 +156,14 @@ export const GENERIC_OAUTH2_CREDENTIALS_WITH_EDITABLE_SCOPE = [ 'microsoftOAuth2Api', 'highLevelOAuth2Api', ]; + +export const ARTIFICIAL_TASK_DATA = { + main: [ + [ + { + json: { isArtificialRecoveredEventItem: true }, + pairedItem: undefined, + }, + ], + ], +}; diff --git a/packages/cli/src/errors/node-crashed.error.ts b/packages/cli/src/errors/node-crashed.error.ts new file mode 100644 index 0000000000000..24433ff979e6d --- /dev/null +++ b/packages/cli/src/errors/node-crashed.error.ts @@ -0,0 +1,12 @@ +import type { INode } from 'n8n-workflow'; +import { NodeOperationError } from 'n8n-workflow'; + +export class NodeCrashedError extends NodeOperationError { + constructor(node: INode) { + super(node, 'Node crashed, possible out-of-memory issue', { + message: 'Execution stopped at this node', + description: + "n8n may have run out of memory while running this execution. 
More context and tips on how to avoid this in the docs", + }); + } +} diff --git a/packages/cli/src/errors/workflow-crashed.error.ts b/packages/cli/src/errors/workflow-crashed.error.ts new file mode 100644 index 0000000000000..122dd70ef286b --- /dev/null +++ b/packages/cli/src/errors/workflow-crashed.error.ts @@ -0,0 +1,7 @@ +import { WorkflowOperationError } from 'n8n-workflow'; + +export class WorkflowCrashedError extends WorkflowOperationError { + constructor() { + super('Workflow did not finish, possible out-of-memory issue'); + } +} diff --git a/packages/cli/src/eventbus/MessageEventBus/MessageEventBus.ts b/packages/cli/src/eventbus/MessageEventBus/MessageEventBus.ts index 6bc7d7b6a7d9c..141719c68d69e 100644 --- a/packages/cli/src/eventbus/MessageEventBus/MessageEventBus.ts +++ b/packages/cli/src/eventbus/MessageEventBus/MessageEventBus.ts @@ -32,7 +32,7 @@ import { import { METRICS_EVENT_NAME } from '../MessageEventBusDestination/Helpers.ee'; import type { AbstractEventMessageOptions } from '../EventMessageClasses/AbstractEventMessageOptions'; import { getEventMessageObjectByType } from '../EventMessageClasses/Helpers'; -import { ExecutionDataRecoveryService } from '../executionDataRecovery.service'; +import { ExecutionRecoveryService } from '../../executions/execution-recovery.service'; import { EventMessageAiNode, type EventMessageAiNodeOptions, @@ -68,7 +68,7 @@ export class MessageEventBus extends EventEmitter { private readonly eventDestinationsRepository: EventDestinationsRepository, private readonly workflowRepository: WorkflowRepository, private readonly orchestrationService: OrchestrationService, - private readonly recoveryService: ExecutionDataRecoveryService, + private readonly recoveryService: ExecutionRecoveryService, ) { super(); } @@ -185,10 +185,9 @@ export class MessageEventBus extends EventEmitter { ); await this.executionRepository.markAsCrashed([executionId]); } else { - await this.recoveryService.recoverExecutionData( + await 
this.recoveryService.recover( executionId, unsentAndUnfinished.unfinishedExecutions[executionId], - true, ); } } diff --git a/packages/cli/src/eventbus/executionDataRecovery.service.ts b/packages/cli/src/eventbus/executionDataRecovery.service.ts deleted file mode 100644 index 3ddee5edc71e4..0000000000000 --- a/packages/cli/src/eventbus/executionDataRecovery.service.ts +++ /dev/null @@ -1,213 +0,0 @@ -import { Container, Service } from 'typedi'; -import type { DateTime } from 'luxon'; -import { Push } from '@/push'; -import { InternalHooks } from '@/InternalHooks'; -import type { IRun, IRunExecutionData, ITaskData } from 'n8n-workflow'; -import { NodeOperationError, WorkflowOperationError, sleep } from 'n8n-workflow'; - -import { ExecutionRepository } from '@db/repositories/execution.repository'; -import { getWorkflowHooksMain } from '@/WorkflowExecuteAdditionalData'; -import type { EventMessageTypes, EventNamesTypes } from './EventMessageClasses'; - -@Service() -export class ExecutionDataRecoveryService { - constructor( - private readonly push: Push, - private readonly executionRepository: ExecutionRepository, - ) {} - - // eslint-disable-next-line complexity - async recoverExecutionData( - executionId: string, - messages: EventMessageTypes[], - applyToDb: boolean, - ): Promise { - const executionEntry = await this.executionRepository.findSingleExecution(executionId, { - includeData: true, - unflattenData: true, - }); - - if (executionEntry && messages) { - let executionData = executionEntry.data; - let workflowError: WorkflowOperationError | undefined; - if (!executionData) { - executionData = { resultData: { runData: {} } }; - } - let nodeNames: string[] = []; - if ( - executionData?.resultData?.runData && - Object.keys(executionData.resultData.runData).length > 0 - ) { - } else { - if (!executionData.resultData) { - executionData.resultData = { - runData: {}, - }; - } else { - if (!executionData.resultData.runData) { - executionData.resultData.runData = {}; - } 
- } - } - nodeNames = executionEntry.workflowData.nodes.map((n) => n.name); - - let lastNodeRunTimestamp: DateTime | undefined = undefined; - - for (const nodeName of nodeNames) { - const nodeByName = executionEntry?.workflowData.nodes.find((n) => n.name === nodeName); - - if (!nodeByName) continue; - - const nodeStartedMessage = messages.find( - (message) => - message.eventName === 'n8n.node.started' && message.payload.nodeName === nodeName, - ); - const nodeFinishedMessage = messages.find( - (message) => - message.eventName === 'n8n.node.finished' && message.payload.nodeName === nodeName, - ); - - const executionTime = - nodeStartedMessage && nodeFinishedMessage - ? nodeFinishedMessage.ts.diff(nodeStartedMessage.ts).toMillis() - : 0; - - let taskData: ITaskData; - if (executionData.resultData.runData[nodeName]?.length > 0) { - taskData = executionData.resultData.runData[nodeName][0]; - } else { - taskData = { - startTime: nodeStartedMessage ? nodeStartedMessage.ts.toUnixInteger() : 0, - executionTime, - source: [null], - executionStatus: 'unknown', - }; - } - - if (nodeStartedMessage && !nodeFinishedMessage) { - const nodeError = new NodeOperationError( - nodeByName, - 'Node crashed, possible out-of-memory issue', - { - message: 'Execution stopped at this node', - description: - "n8n may have run out of memory while executing it. 
More context and tips on how to avoid this in the docs", - }, - ); - workflowError = new WorkflowOperationError( - 'Workflow did not finish, possible out-of-memory issue', - ); - taskData.error = nodeError; - taskData.executionStatus = 'crashed'; - executionData.resultData.lastNodeExecuted = nodeName; - if (nodeStartedMessage) lastNodeRunTimestamp = nodeStartedMessage.ts; - } else if (nodeStartedMessage && nodeFinishedMessage) { - taskData.executionStatus = 'success'; - if (taskData.data === undefined) { - taskData.data = { - main: [ - [ - { - json: { - isArtificialRecoveredEventItem: true, - }, - pairedItem: undefined, - }, - ], - ], - }; - } - } - - if (!executionData.resultData.runData[nodeName]) { - executionData.resultData.runData[nodeName] = [taskData]; - } - } - - if (!lastNodeRunTimestamp) { - const workflowEndedMessage = messages.find((message) => - ( - [ - 'n8n.workflow.success', - 'n8n.workflow.crashed', - 'n8n.workflow.failed', - ] as EventNamesTypes[] - ).includes(message.eventName), - ); - if (workflowEndedMessage) { - lastNodeRunTimestamp = workflowEndedMessage.ts; - } else { - if (!workflowError) { - workflowError = new WorkflowOperationError( - 'Workflow did not finish, possible out-of-memory issue', - ); - } - const workflowStartedMessage = messages.find( - (message) => message.eventName === 'n8n.workflow.started', - ); - if (workflowStartedMessage) { - lastNodeRunTimestamp = workflowStartedMessage.ts; - } - } - } - - if (!executionData.resultData.error && workflowError) { - executionData.resultData.error = workflowError; - } - - if (applyToDb) { - const newStatus = executionEntry.status === 'error' ? 
'error' : 'crashed'; - await this.executionRepository.updateExistingExecution(executionId, { - data: executionData, - status: newStatus, - stoppedAt: lastNodeRunTimestamp?.toJSDate(), - }); - await Container.get(InternalHooks).onWorkflowPostExecute( - executionId, - executionEntry.workflowData, - { - data: executionData, - finished: false, - mode: executionEntry.mode, - waitTill: executionEntry.waitTill ?? undefined, - startedAt: executionEntry.startedAt, - stoppedAt: lastNodeRunTimestamp?.toJSDate(), - status: newStatus, - }, - ); - const iRunData: IRun = { - data: executionData, - finished: false, - mode: executionEntry.mode, - waitTill: executionEntry.waitTill ?? undefined, - startedAt: executionEntry.startedAt, - stoppedAt: lastNodeRunTimestamp?.toJSDate(), - status: newStatus, - }; - const workflowHooks = getWorkflowHooksMain( - { - userId: '', - workflowData: executionEntry.workflowData, - executionMode: executionEntry.mode, - executionData, - runData: executionData.resultData.runData, - retryOf: executionEntry.retryOf, - }, - executionId, - ); - - // execute workflowExecuteAfter hook to trigger error workflow - await workflowHooks.executeHookFunctions('workflowExecuteAfter', [iRunData]); - - // wait for UI to be back up and send the execution data - this.push.once('editorUiConnected', async () => { - // add a small timeout to make sure the UI is back up - await sleep(1000); - this.push.broadcast('executionRecovered', { executionId }); - }); - } - return executionData; - } - return; - } -} diff --git a/packages/cli/src/executions/__tests__/execution-recovery.service.test.ts b/packages/cli/src/executions/__tests__/execution-recovery.service.test.ts new file mode 100644 index 0000000000000..bd17851969eaf --- /dev/null +++ b/packages/cli/src/executions/__tests__/execution-recovery.service.test.ts @@ -0,0 +1,421 @@ +import Container from 'typedi'; +import { stringify } from 'flatted'; + +import { mockInstance } from '@test/mocking'; +import { randomInteger } from 
'@test-integration/random'; +import { createWorkflow } from '@test-integration/db/workflows'; +import { createExecution } from '@test-integration/db/executions'; +import * as testDb from '@test-integration/testDb'; + +import { ExecutionRecoveryService } from '@/executions/execution-recovery.service'; +import { ExecutionRepository } from '@/databases/repositories/execution.repository'; + +import { InternalHooks } from '@/InternalHooks'; +import { Push } from '@/push'; +import { ARTIFICIAL_TASK_DATA } from '@/constants'; +import { NodeCrashedError } from '@/errors/node-crashed.error'; +import { WorkflowCrashedError } from '@/errors/workflow-crashed.error'; +import { EventMessageNode } from '@/eventbus/EventMessageClasses/EventMessageNode'; +import { EventMessageWorkflow } from '@/eventbus/EventMessageClasses/EventMessageWorkflow'; +import type { EventMessageTypes as EventMessage } from '@/eventbus/EventMessageClasses'; +import type { WorkflowEntity } from '@/databases/entities/WorkflowEntity'; + +/** + * Workflow producing an execution whose data will be truncated by an instance crash. + */ +export const OOM_WORKFLOW: Partial = { + nodes: [ + { + parameters: {}, + id: '48ce17fe-9651-42ae-910c-48602a00f0bb', + name: 'When clicking "Test workflow"', + type: 'n8n-nodes-base.manualTrigger', + typeVersion: 1, + position: [640, 260], + }, + { + parameters: { + category: 'oom', + memorySizeValue: 1000, + }, + id: '07a48151-96d3-45eb-961c-1daf85fbe052', + name: 'DebugHelper', + type: 'n8n-nodes-base.debugHelper', + typeVersion: 1, + position: [840, 260], + }, + ], + connections: { + 'When clicking "Test workflow"': { + main: [ + [ + { + node: 'DebugHelper', + type: 'main', + index: 0, + }, + ], + ], + }, + }, + pinData: {}, +}; + +/** + * Snapshot of an execution that will be truncated by an instance crash. 
+ */ +export const IN_PROGRESS_EXECUTION_DATA = { + startData: {}, + resultData: { + runData: { + 'When clicking "Test workflow"': [ + { + hints: [], + startTime: 1716138610153, + executionTime: 1, + source: [], + executionStatus: 'success', + data: { + main: [ + [ + { + json: {}, + pairedItem: { + item: 0, + }, + }, + ], + ], + }, + }, + ], + }, + lastNodeExecuted: 'When clicking "Test workflow"', + }, + executionData: { + contextData: {}, + nodeExecutionStack: [ + { + node: { + parameters: { + category: 'oom', + memorySizeValue: 1000, + }, + id: '07a48151-96d3-45eb-961c-1daf85fbe052', + name: 'DebugHelper', + type: 'n8n-nodes-base.debugHelper', + typeVersion: 1, + position: [840, 260], + }, + data: { + main: [ + [ + { + json: {}, + pairedItem: { + item: 0, + }, + }, + ], + ], + }, + source: { + main: [ + { + previousNode: 'When clicking "Test workflow"', + }, + ], + }, + }, + ], + metadata: {}, + waitingExecution: {}, + waitingExecutionSource: {}, + }, +}; + +export const setupMessages = (executionId: string, workflowName: string): EventMessage[] => { + return [ + new EventMessageWorkflow({ + eventName: 'n8n.workflow.started', + payload: { executionId }, + }), + new EventMessageNode({ + eventName: 'n8n.node.started', + payload: { + executionId, + workflowName, + nodeName: 'When clicking "Test workflow"', + nodeType: 'n8n-nodes-base.manualTrigger', + }, + }), + new EventMessageNode({ + eventName: 'n8n.node.finished', + payload: { + executionId, + workflowName, + nodeName: 'When clicking "Test workflow"', + nodeType: 'n8n-nodes-base.manualTrigger', + }, + }), + new EventMessageNode({ + eventName: 'n8n.node.started', + payload: { + executionId, + workflowName, + nodeName: 'DebugHelper', + nodeType: 'n8n-nodes-base.debugHelper', + }, + }), + ]; +}; + +describe('ExecutionRecoveryService', () => { + let executionRecoveryService: ExecutionRecoveryService; + let push: Push; + + beforeAll(async () => { + await testDb.init(); + + mockInstance(InternalHooks); + push = 
mockInstance(Push); + + executionRecoveryService = new ExecutionRecoveryService( + push, + Container.get(ExecutionRepository), + ); + }); + + afterEach(async () => { + await testDb.truncate(['Execution', 'ExecutionData', 'Workflow']); + }); + + afterAll(async () => { + await testDb.terminate(); + }); + + describe('recover', () => { + it('should amend, persist, run hooks, broadcast', async () => { + /** + * Arrange + */ + const amendSpy = jest.spyOn(executionRecoveryService, 'amend'); + const executionRepository = Container.get(ExecutionRepository); + const dbUpdateSpy = jest.spyOn(executionRepository, 'update'); + // @ts-expect-error Private method + const runHooksSpy = jest.spyOn(executionRecoveryService, 'runHooks'); + + const workflow = await createWorkflow(OOM_WORKFLOW); + + const execution = await createExecution( + { + status: 'running', + data: stringify(IN_PROGRESS_EXECUTION_DATA), + }, + workflow, + ); + + const messages = setupMessages(execution.id, workflow.name); + + /** + * Act + */ + + await executionRecoveryService.recover(execution.id, messages); + + /** + * Assert + */ + + expect(amendSpy).toHaveBeenCalledTimes(1); + expect(amendSpy).toHaveBeenCalledWith(execution.id, messages); + expect(dbUpdateSpy).toHaveBeenCalledTimes(1); + expect(runHooksSpy).toHaveBeenCalledTimes(1); + // eslint-disable-next-line @typescript-eslint/unbound-method + expect(push.once).toHaveBeenCalledTimes(1); + }); + }); + + describe('amend', () => { + test('should amend a truncated execution where last node did not finish', async () => { + /** + * Arrange + */ + + const workflow = await createWorkflow(OOM_WORKFLOW); + + const execution = await createExecution( + { + status: 'running', + data: stringify(IN_PROGRESS_EXECUTION_DATA), + }, + workflow, + ); + + const messages = setupMessages(execution.id, workflow.name); + + /** + * Act + */ + + const amendedExecution = await executionRecoveryService.amend(execution.id, messages); + + /** + * Assert + */ + + const 
startOfLastNodeRun = messages + .find((m) => m.eventName === 'n8n.node.started' && m.payload.nodeName === 'DebugHelper') + ?.ts.toJSDate(); + + expect(amendedExecution).toEqual( + expect.objectContaining({ + status: 'crashed', + stoppedAt: startOfLastNodeRun, + }), + ); + + const resultData = amendedExecution?.data.resultData; + + if (!resultData) fail('Expected `resultData` to be defined'); + + expect(resultData.error).toBeInstanceOf(WorkflowCrashedError); + expect(resultData.lastNodeExecuted).toBe('DebugHelper'); + + const runData = resultData.runData; + + if (!runData) fail('Expected `runData` to be defined'); + + const manualTriggerTaskData = runData['When clicking "Test workflow"'].at(0); + const debugHelperTaskData = runData.DebugHelper.at(0); + + expect(manualTriggerTaskData?.executionStatus).toBe('success'); + expect(manualTriggerTaskData?.error).toBeUndefined(); + expect(manualTriggerTaskData?.startTime).not.toBe(ARTIFICIAL_TASK_DATA); + + expect(debugHelperTaskData?.executionStatus).toBe('crashed'); + expect(debugHelperTaskData?.error).toBeInstanceOf(NodeCrashedError); + }); + + test('should amend a truncated execution where last node finished', async () => { + /** + * Arrange + */ + + const workflow = await createWorkflow(OOM_WORKFLOW); + + const execution = await createExecution( + { + status: 'running', + data: stringify(IN_PROGRESS_EXECUTION_DATA), + }, + workflow, + ); + + const messages = setupMessages(execution.id, workflow.name); + messages.push( + new EventMessageNode({ + eventName: 'n8n.node.finished', + payload: { + executionId: execution.id, + workflowName: workflow.name, + nodeName: 'DebugHelper', + nodeType: 'n8n-nodes-base.debugHelper', + }, + }), + ); + + /** + * Act + */ + + const amendedExecution = await executionRecoveryService.amend(execution.id, messages); + + /** + * Assert + */ + + const endOfLastNoderun = messages + .find((m) => m.eventName === 'n8n.node.finished' && m.payload.nodeName === 'DebugHelper') + ?.ts.toJSDate(); + + 
expect(amendedExecution).toEqual( + expect.objectContaining({ + status: 'crashed', + stoppedAt: endOfLastNoderun, + }), + ); + + const resultData = amendedExecution?.data.resultData; + + if (!resultData) fail('Expected `resultData` to be defined'); + + expect(resultData.error).toBeUndefined(); + expect(resultData.lastNodeExecuted).toBe('DebugHelper'); + + const runData = resultData.runData; + + if (!runData) fail('Expected `runData` to be defined'); + + const manualTriggerTaskData = runData['When clicking "Test workflow"'].at(0); + const debugHelperTaskData = runData.DebugHelper.at(0); + + expect(manualTriggerTaskData?.executionStatus).toBe('success'); + expect(manualTriggerTaskData?.error).toBeUndefined(); + + expect(debugHelperTaskData?.executionStatus).toBe('success'); + expect(debugHelperTaskData?.error).toBeUndefined(); + expect(debugHelperTaskData?.data).toEqual(ARTIFICIAL_TASK_DATA); + }); + + test('should return `null` if no messages', async () => { + /** + * Arrange + */ + const workflow = await createWorkflow(OOM_WORKFLOW); + const execution = await createExecution( + { + status: 'running', + data: stringify(IN_PROGRESS_EXECUTION_DATA), + }, + workflow, + ); + const noMessages: EventMessage[] = []; + + /** + * Act + */ + + const amendedExecution = await executionRecoveryService.amend(execution.id, noMessages); + + /** + * Assert + */ + + expect(amendedExecution).toBeNull(); + }); + + test('should return `null` if no execution', async () => { + /** + * Arrange + */ + const inexistentExecutionId = randomInteger(100).toString(); + const messages = setupMessages(inexistentExecutionId, 'Some workflow'); + + /** + * Act + */ + + const amendedExecution = await executionRecoveryService.amend( + inexistentExecutionId, + messages, + ); + + /** + * Assert + */ + + expect(amendedExecution).toBeNull(); + }); + }); +}); diff --git a/packages/cli/src/executions/execution-recovery.service.ts b/packages/cli/src/executions/execution-recovery.service.ts new file mode 100644 
index 0000000000000..bd60bae2f05bb --- /dev/null +++ b/packages/cli/src/executions/execution-recovery.service.ts @@ -0,0 +1,190 @@ +import Container, { Service } from 'typedi'; +import { Push } from '@/push'; +import { sleep } from 'n8n-workflow'; +import { ExecutionRepository } from '@db/repositories/execution.repository'; +import { getWorkflowHooksMain } from '@/WorkflowExecuteAdditionalData'; // @TODO: Dependency cycle +import { InternalHooks } from '@/InternalHooks'; // @TODO: Dependency cycle if injected +import type { DateTime } from 'luxon'; +import type { IRun, ITaskData } from 'n8n-workflow'; +import type { EventMessageTypes } from '../eventbus/EventMessageClasses'; +import type { IExecutionResponse } from '@/Interfaces'; +import { NodeCrashedError } from '@/errors/node-crashed.error'; +import { WorkflowCrashedError } from '@/errors/workflow-crashed.error'; +import { ARTIFICIAL_TASK_DATA } from '@/constants'; + +/** + * Service for recovering truncated executions using event log messages. + * + * "Recovery" means (1) amending and persisting a few key details of an + * execution truncated by an instance crash, (2) running post-execution hooks, + * and (3) surfacing this amended execution to the client. "Recovery" does _not_ + * mean injecting actual execution data from the logs, or resuming from the + * point of truncation, or re-running the execution. + * + * Recovery is only possible if execution logs are available in the container. + * In main mode, if the container was not recycled, logs should be available. + * In queue mode, workers handle production executions and write logs to each of + * the workers' filesystems, whereas the main process handles manual executions + * and writes logs to its own filesystem, so in queue mode execution recovery + * is only ever possible for manual executions. 
+ */ +@Service() +export class ExecutionRecoveryService { + constructor( + private readonly push: Push, + private readonly executionRepository: ExecutionRepository, + ) {} + + async recover(executionId: string, messages: EventMessageTypes[]) { + if (messages.length === 0) return; + + const amendedExecution = await this.amend(executionId, messages); + + if (!amendedExecution) return null; + + await this.executionRepository.updateExistingExecution(executionId, amendedExecution); + + await this.runHooks(amendedExecution); + + this.push.once('editorUiConnected', async () => { + await sleep(1000); + this.push.broadcast('executionRecovered', { executionId }); + }); + + return amendedExecution; + } + + /** + * Amend `status`, `stoppedAt`, and `data` of an execution using event log messages. + */ + async amend(executionId: string, messages: EventMessageTypes[]) { + const { nodeMessages, workflowMessages } = this.toRelevantMessages(messages); + + if (nodeMessages.length === 0) return null; + + const execution = await this.executionRepository.findSingleExecution(executionId, { + includeData: true, + unflattenData: true, + }); + + if (!execution) return null; + + const runExecutionData = execution.data ?? 
{ resultData: { runData: {} } }; + + let lastNodeRunTimestamp: DateTime | undefined; + + for (const node of execution.workflowData.nodes) { + const nodeStartedMessage = nodeMessages.find( + (m) => m.payload.nodeName === node.name && m.eventName === 'n8n.node.started', + ); + + if (!nodeStartedMessage) continue; + + const nodeFinishedMessage = nodeMessages.find( + (m) => m.payload.nodeName === node.name && m.eventName === 'n8n.node.finished', + ); + + const taskData: ITaskData = { + startTime: nodeStartedMessage.ts.toUnixInteger(), + executionTime: -1, + source: [null], + }; + + if (nodeFinishedMessage) { + taskData.executionStatus = 'success'; + taskData.data ??= ARTIFICIAL_TASK_DATA; + taskData.executionTime = nodeFinishedMessage.ts.diff(nodeStartedMessage.ts).toMillis(); + lastNodeRunTimestamp = nodeFinishedMessage.ts; + } else { + taskData.executionStatus = 'crashed'; + taskData.error = new NodeCrashedError(node); + taskData.executionTime = 0; + runExecutionData.resultData.error = new WorkflowCrashedError(); + lastNodeRunTimestamp = nodeStartedMessage.ts; + } + + runExecutionData.resultData.lastNodeExecuted = node.name; + runExecutionData.resultData.runData[node.name] = [taskData]; + } + + return { + ...execution, + status: execution.status === 'error' ? 
'error' : 'crashed', + stoppedAt: this.toStoppedAt(lastNodeRunTimestamp, workflowMessages), + data: runExecutionData, + } as IExecutionResponse; + } + + /** + * Private + */ + + private toRelevantMessages(messages: EventMessageTypes[]) { + return messages.reduce<{ + nodeMessages: EventMessageTypes[]; + workflowMessages: EventMessageTypes[]; + }>( + (acc, cur) => { + if (cur.eventName.startsWith('n8n.node.')) { + acc.nodeMessages.push(cur); + } else if (cur.eventName.startsWith('n8n.workflow.')) { + acc.workflowMessages.push(cur); + } + + return acc; + }, + { nodeMessages: [], workflowMessages: [] }, + ); + } + + private toStoppedAt(timestamp: DateTime | undefined, messages: EventMessageTypes[]) { + if (timestamp) return timestamp.toJSDate(); + + const WORKFLOW_END_EVENTS = new Set([ + 'n8n.workflow.success', + 'n8n.workflow.crashed', + 'n8n.workflow.failed', + ]); + + return ( + messages.find((m) => WORKFLOW_END_EVENTS.has(m.eventName)) ?? + messages.find((m) => m.eventName === 'n8n.workflow.started') + )?.ts.toJSDate(); + } + + private async runHooks(execution: IExecutionResponse) { + await Container.get(InternalHooks).onWorkflowPostExecute(execution.id, execution.workflowData, { + data: execution.data, + finished: false, + mode: execution.mode, + waitTill: execution.waitTill, + startedAt: execution.startedAt, + stoppedAt: execution.stoppedAt, + status: execution.status, + }); + + const externalHooks = getWorkflowHooksMain( + { + userId: '', + workflowData: execution.workflowData, + executionMode: execution.mode, + executionData: execution.data, + runData: execution.data.resultData.runData, + retryOf: execution.retryOf, + }, + execution.id, + ); + + const run: IRun = { + data: execution.data, + finished: false, + mode: execution.mode, + waitTill: execution.waitTill ?? 
undefined, + startedAt: execution.startedAt, + stoppedAt: execution.stoppedAt, + status: execution.status, + }; + + await externalHooks.executeHookFunctions('workflowExecuteAfter', [run]); + } +} diff --git a/packages/cli/test/integration/eventbus.ee.test.ts b/packages/cli/test/integration/eventbus.ee.test.ts index 0c8627ac48a5c..b19f024f52080 100644 --- a/packages/cli/test/integration/eventbus.ee.test.ts +++ b/packages/cli/test/integration/eventbus.ee.test.ts @@ -22,7 +22,7 @@ import type { MessageEventBusDestinationWebhook } from '@/eventbus/MessageEventB import type { MessageEventBusDestinationSentry } from '@/eventbus/MessageEventBusDestination/MessageEventBusDestinationSentry.ee'; import { EventMessageAudit } from '@/eventbus/EventMessageClasses/EventMessageAudit'; import type { EventNamesTypes } from '@/eventbus/EventMessageClasses'; -import { ExecutionDataRecoveryService } from '@/eventbus/executionDataRecovery.service'; +import { ExecutionRecoveryService } from '@/executions/execution-recovery.service'; import * as utils from './shared/utils'; import { createUser } from './shared/db/users'; @@ -80,7 +80,7 @@ async function confirmIdSent(id: string) { expect(sent.find((msg) => msg.id === id)).toBeTruthy(); } -mockInstance(ExecutionDataRecoveryService); +mockInstance(ExecutionRecoveryService); const testServer = utils.setupTestServer({ endpointGroups: ['eventBus'], enabledFeatures: ['feat:logStreaming'], diff --git a/packages/cli/test/integration/eventbus.test.ts b/packages/cli/test/integration/eventbus.test.ts index 1acb8ffa36667..e4e58f96fa330 100644 --- a/packages/cli/test/integration/eventbus.test.ts +++ b/packages/cli/test/integration/eventbus.test.ts @@ -1,6 +1,6 @@ import type { User } from '@db/entities/User'; import { MessageEventBus } from '@/eventbus/MessageEventBus/MessageEventBus'; -import { ExecutionDataRecoveryService } from '@/eventbus/executionDataRecovery.service'; +import { ExecutionRecoveryService } from 
'@/executions/execution-recovery.service'; import * as utils from './shared/utils/'; import { createUser } from './shared/db/users'; @@ -17,7 +17,7 @@ let owner: User; let authOwnerAgent: SuperAgentTest; mockInstance(MessageEventBus); -mockInstance(ExecutionDataRecoveryService); +mockInstance(ExecutionRecoveryService); const testServer = utils.setupTestServer({ endpointGroups: ['eventBus'], enabledFeatures: [], // do not enable logstreaming diff --git a/packages/cli/test/integration/metrics.test.ts b/packages/cli/test/integration/metrics.test.ts index 394ec026c4dd5..0e62ddbae0832 100644 --- a/packages/cli/test/integration/metrics.test.ts +++ b/packages/cli/test/integration/metrics.test.ts @@ -5,12 +5,12 @@ import request from 'supertest'; import config from '@/config'; import { N8N_VERSION } from '@/constants'; import { MetricsService } from '@/services/metrics.service'; -import { ExecutionDataRecoveryService } from '@/eventbus/executionDataRecovery.service'; +import { ExecutionRecoveryService } from '@/executions/execution-recovery.service'; import { setupTestServer } from './shared/utils'; import { mockInstance } from '../shared/mocking'; -mockInstance(ExecutionDataRecoveryService); +mockInstance(ExecutionRecoveryService); jest.unmock('@/eventbus/MessageEventBus/MessageEventBus'); config.set('endpoints.metrics.enable', true); config.set('endpoints.metrics.includeDefaultMetrics', false); diff --git a/packages/cli/tsconfig.build.json b/packages/cli/tsconfig.build.json index 1e8a2ff7fa476..55afe8b40986a 100644 --- a/packages/cli/tsconfig.build.json +++ b/packages/cli/tsconfig.build.json @@ -6,5 +6,5 @@ "tsBuildInfoFile": "dist/build.tsbuildinfo" }, "include": ["src/**/*.ts"], - "exclude": ["test/**"] + "exclude": ["test/**", "src/**/__tests__/**"] } diff --git a/packages/cli/tsconfig.json b/packages/cli/tsconfig.json index 86b8d550e813a..8c4325a55bb2a 100644 --- a/packages/cli/tsconfig.json +++ b/packages/cli/tsconfig.json @@ -8,7 +8,9 @@ "baseUrl": "src", "paths": 
{ "@/*": ["./*"], - "@db/*": ["./databases/*"] + "@db/*": ["./databases/*"], + "@test/*": ["../test/shared/*"], + "@test-integration/*": ["../test/integration/shared/*"] }, "tsBuildInfoFile": "dist/typecheck.tsbuildinfo", // TODO: remove all options below this line From 9816048cb80a29ceb404fa6ee55eb1eb23e4caba Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Iv=C3=A1n=20Ovejero?= Date: Tue, 4 Jun 2024 12:03:53 +0200 Subject: [PATCH 2/3] Minor cleanup --- .../execution-recovery.service.test.ts | 4 +-- .../executions/execution-recovery.service.ts | 32 +++++++++---------- 2 files changed, 18 insertions(+), 18 deletions(-) diff --git a/packages/cli/src/executions/__tests__/execution-recovery.service.test.ts b/packages/cli/src/executions/__tests__/execution-recovery.service.test.ts index bd17851969eaf..c062c52787500 100644 --- a/packages/cli/src/executions/__tests__/execution-recovery.service.test.ts +++ b/packages/cli/src/executions/__tests__/execution-recovery.service.test.ts @@ -19,6 +19,7 @@ import { EventMessageNode } from '@/eventbus/EventMessageClasses/EventMessageNod import { EventMessageWorkflow } from '@/eventbus/EventMessageClasses/EventMessageWorkflow'; import type { EventMessageTypes as EventMessage } from '@/eventbus/EventMessageClasses'; import type { WorkflowEntity } from '@/databases/entities/WorkflowEntity'; +import { NodeConnectionType } from 'n8n-workflow'; /** * Workflow producing an execution whose data will be truncated by an instance crash. 
@@ -51,7 +52,7 @@ export const OOM_WORKFLOW: Partial<WorkflowEntity> = { [ { node: 'DebugHelper', - type: 'main', + type: NodeConnectionType.Main, index: 0, }, ], @@ -231,7 +232,6 @@ describe('ExecutionRecoveryService', () => { expect(amendSpy).toHaveBeenCalledWith(execution.id, messages); expect(dbUpdateSpy).toHaveBeenCalledTimes(1); expect(runHooksSpy).toHaveBeenCalledTimes(1); - // eslint-disable-next-line @typescript-eslint/unbound-method expect(push.once).toHaveBeenCalledTimes(1); }); }); diff --git a/packages/cli/src/executions/execution-recovery.service.ts b/packages/cli/src/executions/execution-recovery.service.ts index bd60bae2f05bb..28398a956cbeb 100644 --- a/packages/cli/src/executions/execution-recovery.service.ts +++ b/packages/cli/src/executions/execution-recovery.service.ts @@ -13,20 +13,8 @@ import { WorkflowCrashedError } from '@/errors/workflow-crashed.error'; import { ARTIFICIAL_TASK_DATA } from '@/constants'; /** - * Service for recovering truncated executions using event log messages. + * Service for recovering executions truncated by an instance crash. * - * "Recovery" means (1) amending and persisting a few key details of an - * execution truncated by an instance crash, (2) running post-execution hooks, - * and (3) surfacing this amended execution to the client. "Recovery" does _not_ - * mean injecting actual execution data from the logs, or resuming from the - * point of truncation, or re-running the execution. - * - * Recovery is only possible if execution logs are available in the container. - * In main mode, if the container was not recycled, logs should be available. - * In queue mode, workers handle production executions and write logs to each of - * the workers' filesystems, whereas the main process handles manual executions - * and writes logs to its own filesystem, so in queue mode execution recovery - * is only ever possible for manual executions.
*/ @Service() export class ExecutionRecoveryService { @@ -35,6 +23,18 @@ export class ExecutionRecoveryService { private readonly executionRepository: ExecutionRepository, ) {} + /** + * "Recovery" means (1) amending key properties of a truncated execution, + * (2) running post-execution hooks, and (3) returning the amended execution + * so the UI can reflect the error. "Recovery" does **not** mean injecting + * execution data from the logs (they hold none), or resuming the execution + * from the point of truncation, or re-running the whole execution. + * + * Recovery is only possible if event logs are available in the container. + * In regular mode, logs should but might not be available, e.g. due to container + * being recycled, max log size causing rotation, etc. In queue mode, as workers + * log to their own filesystems, only manual executions can be recovered. + */ async recover(executionId: string, messages: EventMessageTypes[]) { if (messages.length === 0) return; @@ -115,9 +115,9 @@ export class ExecutionRecoveryService { } as IExecutionResponse; } - /** - * Private - */ + // ---------------------------------- + // private + // ---------------------------------- private toRelevantMessages(messages: EventMessageTypes[]) { return messages.reduce<{ From 6ec9919039158cc655853c6819d1e8bc3381e6fa Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Iv=C3=A1n=20Ovejero?= Date: Tue, 4 Jun 2024 12:09:50 +0200 Subject: [PATCH 3/3] Make `amend` private --- .../__tests__/execution-recovery.service.test.ts | 11 +++++------ .../cli/src/executions/execution-recovery.service.ts | 5 ++--- 2 files changed, 7 insertions(+), 9 deletions(-) diff --git a/packages/cli/src/executions/__tests__/execution-recovery.service.test.ts b/packages/cli/src/executions/__tests__/execution-recovery.service.test.ts index c062c52787500..c9d75ee6a3243 100644 --- a/packages/cli/src/executions/__tests__/execution-recovery.service.test.ts +++
b/packages/cli/src/executions/__tests__/execution-recovery.service.test.ts @@ -200,6 +200,7 @@ describe('ExecutionRecoveryService', () => { /** * Arrange */ + // @ts-expect-error Private method const amendSpy = jest.spyOn(executionRecoveryService, 'amend'); const executionRepository = Container.get(ExecutionRepository); const dbUpdateSpy = jest.spyOn(executionRepository, 'update'); @@ -234,9 +235,7 @@ describe('ExecutionRecoveryService', () => { expect(runHooksSpy).toHaveBeenCalledTimes(1); expect(push.once).toHaveBeenCalledTimes(1); }); - }); - describe('amend', () => { test('should amend a truncated execution where last node did not finish', async () => { /** * Arrange @@ -258,7 +257,7 @@ describe('ExecutionRecoveryService', () => { * Act */ - const amendedExecution = await executionRecoveryService.amend(execution.id, messages); + const amendedExecution = await executionRecoveryService.recover(execution.id, messages); /** * Assert @@ -329,7 +328,7 @@ describe('ExecutionRecoveryService', () => { * Act */ - const amendedExecution = await executionRecoveryService.amend(execution.id, messages); + const amendedExecution = await executionRecoveryService.recover(execution.id, messages); /** * Assert @@ -386,7 +385,7 @@ describe('ExecutionRecoveryService', () => { * Act */ - const amendedExecution = await executionRecoveryService.amend(execution.id, noMessages); + const amendedExecution = await executionRecoveryService.recover(execution.id, noMessages); /** * Assert @@ -406,7 +405,7 @@ describe('ExecutionRecoveryService', () => { * Act */ - const amendedExecution = await executionRecoveryService.amend( + const amendedExecution = await executionRecoveryService.recover( inexistentExecutionId, messages, ); diff --git a/packages/cli/src/executions/execution-recovery.service.ts b/packages/cli/src/executions/execution-recovery.service.ts index 28398a956cbeb..1a57030561bb3 100644 --- a/packages/cli/src/executions/execution-recovery.service.ts +++
b/packages/cli/src/executions/execution-recovery.service.ts @@ -14,7 +14,6 @@ import { ARTIFICIAL_TASK_DATA } from '@/constants'; /** * Service for recovering executions truncated by an instance crash. - * */ @Service() export class ExecutionRecoveryService { @@ -36,7 +35,7 @@ export class ExecutionRecoveryService { * log to their own filesystems, only manual executions can be recovered. */ async recover(executionId: string, messages: EventMessageTypes[]) { - if (messages.length === 0) return; + if (messages.length === 0) return null; const amendedExecution = await this.amend(executionId, messages); @@ -57,7 +56,7 @@ export class ExecutionRecoveryService { /** * Amend `status`, `stoppedAt`, and `data` of an execution using event log messages. */ - async amend(executionId: string, messages: EventMessageTypes[]) { + private async amend(executionId: string, messages: EventMessageTypes[]) { const { nodeMessages, workflowMessages } = this.toRelevantMessages(messages); if (nodeMessages.length === 0) return null;