From 0ae1e8ee655d056c9795431082ba8a420de42a20 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Iv=C3=A1n=20Ovejero?= Date: Mon, 17 Jun 2024 18:19:40 +0200 Subject: [PATCH] fix(core): Ensure followers do not recover executions from logs https://linear.app/n8n/issue/PAY-1682 --- .../execution-recovery.service.test.ts | 41 +++++++++++++++++-- .../executions/execution-recovery.service.ts | 4 ++ 2 files changed, 42 insertions(+), 3 deletions(-) diff --git a/packages/cli/src/executions/__tests__/execution-recovery.service.test.ts b/packages/cli/src/executions/__tests__/execution-recovery.service.test.ts index e40f4e78acb2e..6508a7812911c 100644 --- a/packages/cli/src/executions/__tests__/execution-recovery.service.test.ts +++ b/packages/cli/src/executions/__tests__/execution-recovery.service.test.ts @@ -20,6 +20,8 @@ import { EventMessageWorkflow } from '@/eventbus/EventMessageClasses/EventMessag import type { EventMessageTypes as EventMessage } from '@/eventbus/EventMessageClasses'; import type { WorkflowEntity } from '@/databases/entities/WorkflowEntity'; import { NodeConnectionType } from 'n8n-workflow'; +import { OrchestrationService } from '@/services/orchestration.service'; +import config from '@/config'; /** * Workflow producing an execution whose data will be truncated by an instance crash. @@ -175,6 +177,7 @@ describe('ExecutionRecoveryService', () => { let executionRecoveryService: ExecutionRecoveryService; let push: Push; let executionRepository: ExecutionRepository; + let orchestrationService: OrchestrationService; beforeAll(async () => { await testDb.init(); @@ -182,7 +185,17 @@ describe('ExecutionRecoveryService', () => { mockInstance(InternalHooks); push = mockInstance(Push); executionRepository = Container.get(ExecutionRepository); - executionRecoveryService = new ExecutionRecoveryService(push, executionRepository); + orchestrationService = Container.get(OrchestrationService); + + executionRecoveryService = new ExecutionRecoveryService( + push, + executionRepository, + orchestrationService, + ); + }); + + beforeEach(() => { + config.set('multiMainSetup.instanceType', 'leader'); }); afterEach(async () => { @@ -194,7 +207,29 @@ describe('ExecutionRecoveryService', () => { }); describe('recoverFromLogs', () => { - describe('if no messages', () => { + describe('if follower', () => { + test('should do nothing', async () => { + /** + * Arrange + */ + config.set('multiMainSetup.instanceType', 'follower'); + // @ts-expect-error Private method + const amendSpy = jest.spyOn(executionRecoveryService, 'amend'); + const messages = setupMessages('123', 'Some workflow'); + + /** + * Act + */ + await executionRecoveryService.recoverFromLogs('123', messages); + + /** + * Assert + */ + expect(amendSpy).not.toHaveBeenCalled(); + }); + }); + + describe('if leader, with 0 messages', () => { test('should return `null` if no execution found', async () => { /** * Arrange @@ -244,7 +279,7 @@ describe('ExecutionRecoveryService', () => { }); }); - describe('if messages', () => { + describe('if leader, with 1+ messages', () => { test('should return `null` if no execution found', async () => { /** * Arrange diff --git a/packages/cli/src/executions/execution-recovery.service.ts b/packages/cli/src/executions/execution-recovery.service.ts index 4f6123403ad9b..935f33e8d871d 100644 --- a/packages/cli/src/executions/execution-recovery.service.ts +++ b/packages/cli/src/executions/execution-recovery.service.ts @@ -11,6 +11,7 @@ import type { IExecutionResponse } from '@/Interfaces'; import { NodeCrashedError } from '@/errors/node-crashed.error'; import { WorkflowCrashedError } from '@/errors/workflow-crashed.error'; import { ARTIFICIAL_TASK_DATA } from '@/constants'; +import { OrchestrationService } from '@/services/orchestration.service'; /** * Service for recovering key properties in executions. @@ -20,12 +21,15 @@ export class ExecutionRecoveryService { constructor( private readonly push: Push, private readonly executionRepository: ExecutionRepository, + private readonly orchestrationService: OrchestrationService, ) {} /** * Recover key properties of a truncated execution using event logs. */ async recoverFromLogs(executionId: string, messages: EventMessageTypes[]) { + if (this.orchestrationService.isFollower) return; + const amendedExecution = await this.amend(executionId, messages); if (!amendedExecution) return null;