From d21395a0871706c7cd74d2bf1921eec4687eb8e1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Iv=C3=A1n=20Ovejero?= Date: Fri, 7 Jun 2024 10:50:47 +0200 Subject: [PATCH 1/3] refactor(core): Remove event bus channel (no-changelog) --- packages/cli/src/commands/worker.ts | 2 +- .../eventbus/MessageEventBus/MessageEventBus.ts | 11 ----------- .../main/orchestration.handler.main.service.ts | 7 +------ .../worker/orchestration.worker.service.ts | 6 ------ .../cli/src/services/redis/RedisServiceHelper.ts | 1 - .../services/redis/RedisServicePubSubPublisher.ts | 11 +---------- .../services/redis/RedisServicePubSubSubscriber.ts | 14 +------------- .../test/integration/commands/worker.cmd.test.ts | 11 +++++------ 8 files changed, 9 insertions(+), 54 deletions(-) diff --git a/packages/cli/src/commands/worker.ts b/packages/cli/src/commands/worker.ts index 7182e328502bc..46d7d32315b42 100644 --- a/packages/cli/src/commands/worker.ts +++ b/packages/cli/src/commands/worker.ts @@ -273,7 +273,7 @@ export class Worker extends BaseCommand { await this.initOrchestration(); this.logger.debug('Orchestration init complete'); - await Container.get(OrchestrationWorkerService).publishToEventLog( + await Container.get(MessageEventBus).send( new EventMessageGeneric({ eventName: 'n8n.worker.started', payload: { diff --git a/packages/cli/src/eventbus/MessageEventBus/MessageEventBus.ts b/packages/cli/src/eventbus/MessageEventBus/MessageEventBus.ts index 6bc7d7b6a7d9c..cb28dd50fc5a6 100644 --- a/packages/cli/src/eventbus/MessageEventBus/MessageEventBus.ts +++ b/packages/cli/src/eventbus/MessageEventBus/MessageEventBus.ts @@ -247,17 +247,6 @@ export class MessageEventBus extends EventEmitter { return result; } - async handleRedisEventBusMessage(messageString: string) { - const eventData = jsonParse(messageString); - if (eventData) { - const eventMessage = getEventMessageObjectByType(eventData); - if (eventMessage) { - await this.send(eventMessage); - } - } - return eventData; - } - private async trySendingUnsent(msgs?: EventMessageTypes[]) { const unsentMessages = msgs ?? (await this.getEventsUnsent()); if (unsentMessages.length > 0) { diff --git a/packages/cli/src/services/orchestration/main/orchestration.handler.main.service.ts b/packages/cli/src/services/orchestration/main/orchestration.handler.main.service.ts index 4a57d140d72b6..9206f9db15ab3 100644 --- a/packages/cli/src/services/orchestration/main/orchestration.handler.main.service.ts +++ b/packages/cli/src/services/orchestration/main/orchestration.handler.main.service.ts @@ -1,12 +1,10 @@ -import Container, { Service } from 'typedi'; +import { Service } from 'typedi'; import { COMMAND_REDIS_CHANNEL, - EVENT_BUS_REDIS_CHANNEL, WORKER_RESPONSE_REDIS_CHANNEL, } from '../../redis/RedisServiceHelper'; import { handleWorkerResponseMessageMain } from './handleWorkerResponseMessageMain'; import { handleCommandMessageMain } from './handleCommandMessageMain'; -import { MessageEventBus } from '@/eventbus/MessageEventBus/MessageEventBus'; import { OrchestrationHandlerService } from '../../orchestration.handler.base.service'; @Service() @@ -16,7 +14,6 @@ export class OrchestrationHandlerMainService extends OrchestrationHandlerService await this.redisSubscriber.subscribeToCommandChannel(); await this.redisSubscriber.subscribeToWorkerResponseChannel(); - await this.redisSubscriber.subscribeToEventLog(); this.redisSubscriber.addMessageHandler( 'OrchestrationMessageReceiver', @@ -25,8 +22,6 @@ export class OrchestrationHandlerMainService extends OrchestrationHandlerService await handleWorkerResponseMessageMain(messageString); } else if (channel === COMMAND_REDIS_CHANNEL) { await handleCommandMessageMain(messageString); - } else if (channel === EVENT_BUS_REDIS_CHANNEL) { - await Container.get(MessageEventBus).handleRedisEventBusMessage(messageString); } }, ); diff --git a/packages/cli/src/services/orchestration/worker/orchestration.worker.service.ts b/packages/cli/src/services/orchestration/worker/orchestration.worker.service.ts index 9a00d312b21f2..fc5bb931ea09b 100644 --- a/packages/cli/src/services/orchestration/worker/orchestration.worker.service.ts +++ b/packages/cli/src/services/orchestration/worker/orchestration.worker.service.ts @@ -1,5 +1,4 @@ import { Service } from 'typedi'; -import type { AbstractEventMessage } from '@/eventbus/EventMessageClasses/AbstractEventMessage'; import { OrchestrationService } from '../../orchestration.service'; import config from '@/config'; @@ -12,9 +11,4 @@ export class OrchestrationWorkerService extends OrchestrationService { config.get('generic.instanceType') === 'worker' ); } - - async publishToEventLog(message: AbstractEventMessage) { - if (!this.sanityCheck()) return; - await this.redisPublisher.publishToEventLog(message); - } } diff --git a/packages/cli/src/services/redis/RedisServiceHelper.ts b/packages/cli/src/services/redis/RedisServiceHelper.ts index 257a826b76729..32a72fb22f977 100644 --- a/packages/cli/src/services/redis/RedisServiceHelper.ts +++ b/packages/cli/src/services/redis/RedisServiceHelper.ts @@ -8,7 +8,6 @@ import { Logger } from '@/Logger'; export const EVENT_BUS_REDIS_STREAM = 'n8n:eventstream'; export const COMMAND_REDIS_STREAM = 'n8n:commandstream'; export const WORKER_RESPONSE_REDIS_STREAM = 'n8n:workerstream'; -export const EVENT_BUS_REDIS_CHANNEL = 'n8n.events'; export const COMMAND_REDIS_CHANNEL = 'n8n.commands'; export const WORKER_RESPONSE_REDIS_CHANNEL = 'n8n.worker-response'; export const WORKER_RESPONSE_REDIS_LIST = 'n8n:list:worker-response'; diff --git a/packages/cli/src/services/redis/RedisServicePubSubPublisher.ts b/packages/cli/src/services/redis/RedisServicePubSubPublisher.ts index fe080d8e0f674..b029b6546acc2 100644 --- a/packages/cli/src/services/redis/RedisServicePubSubPublisher.ts +++ b/packages/cli/src/services/redis/RedisServicePubSubPublisher.ts @@ -1,10 +1,5 @@ import { Service } from 'typedi'; -import type { AbstractEventMessage } from '@/eventbus/EventMessageClasses/AbstractEventMessage'; -import { - COMMAND_REDIS_CHANNEL, - EVENT_BUS_REDIS_CHANNEL, - WORKER_RESPONSE_REDIS_CHANNEL, -} from './RedisServiceHelper'; +import { COMMAND_REDIS_CHANNEL, WORKER_RESPONSE_REDIS_CHANNEL } from './RedisServiceHelper'; import type { RedisServiceCommandObject, RedisServiceWorkerResponseObject, @@ -24,10 +19,6 @@ export class RedisServicePubSubPublisher extends RedisServiceBaseSender { await this.redisClient?.publish(channel, message); } - async publishToEventLog(message: AbstractEventMessage): Promise { - await this.publish(EVENT_BUS_REDIS_CHANNEL, message.toString()); - } - async publishToCommandChannel( message: Omit, ): Promise { diff --git a/packages/cli/src/services/redis/RedisServicePubSubSubscriber.ts b/packages/cli/src/services/redis/RedisServicePubSubSubscriber.ts index 79371b6c12be3..a3474c314956a 100644 --- a/packages/cli/src/services/redis/RedisServicePubSubSubscriber.ts +++ b/packages/cli/src/services/redis/RedisServicePubSubSubscriber.ts @@ -1,9 +1,5 @@ import { Service } from 'typedi'; -import { - COMMAND_REDIS_CHANNEL, - EVENT_BUS_REDIS_CHANNEL, - WORKER_RESPONSE_REDIS_CHANNEL, -} from './RedisServiceHelper'; +import { COMMAND_REDIS_CHANNEL, WORKER_RESPONSE_REDIS_CHANNEL } from './RedisServiceHelper'; import { RedisServiceBaseReceiver } from './RedisServiceBaseClasses'; @Service() @@ -44,10 +40,6 @@ export class RedisServicePubSubSubscriber extends RedisServiceBaseReceiver { }); } - async subscribeToEventLog(): Promise { - await this.subscribe(EVENT_BUS_REDIS_CHANNEL); - } - async subscribeToCommandChannel(): Promise { await this.subscribe(COMMAND_REDIS_CHANNEL); } @@ -56,10 +48,6 @@ export class RedisServicePubSubSubscriber extends RedisServiceBaseReceiver { await this.subscribe(WORKER_RESPONSE_REDIS_CHANNEL); } - async unSubscribeFromEventLog(): Promise { - await this.unsubscribe(EVENT_BUS_REDIS_CHANNEL); - } - async unSubscribeFromCommandChannel(): Promise { await this.unsubscribe(COMMAND_REDIS_CHANNEL); } diff --git a/packages/cli/test/integration/commands/worker.cmd.test.ts b/packages/cli/test/integration/commands/worker.cmd.test.ts index 8f26b6a683308..da2a1fd5b255e 100644 --- a/packages/cli/test/integration/commands/worker.cmd.test.ts +++ b/packages/cli/test/integration/commands/worker.cmd.test.ts @@ -15,7 +15,6 @@ import { InternalHooks } from '@/InternalHooks'; import { PostHogClient } from '@/posthog'; import { RedisService } from '@/services/redis.service'; import { OrchestrationHandlerWorkerService } from '@/services/orchestration/worker/orchestration.handler.worker.service'; -import { OrchestrationWorkerService } from '@/services/orchestration/worker/orchestration.worker.service'; import { OrchestrationService } from '@/services/orchestration.service'; import * as testDb from '../shared/testDb'; @@ -23,6 +22,8 @@ import { mockInstance } from '../../shared/mocking'; const oclifConfig = new Config({ root: __dirname }); +let eventBus: MessageEventBus; + beforeAll(async () => { config.set('executions.mode', 'queue'); config.set('binaryDataManager.availableModes', 'filesystem'); @@ -32,7 +33,7 @@ beforeAll(async () => { mockInstance(CacheService); mockInstance(ExternalSecretsManager); mockInstance(BinaryDataService); - mockInstance(MessageEventBus); + eventBus = mockInstance(MessageEventBus); mockInstance(LoadNodesAndCredentials); mockInstance(CredentialTypes); mockInstance(NodeTypes); @@ -58,9 +59,7 @@ test('worker initializes all its components', async () => { jest.spyOn(worker, 'initExternalSecrets').mockImplementation(async () => {}); jest.spyOn(worker, 'initEventBus').mockImplementation(async () => {}); jest.spyOn(worker, 'initOrchestration'); - jest - .spyOn(OrchestrationWorkerService.prototype, 'publishToEventLog') - .mockImplementation(async () => {}); + // jest.spyOn(MessageEventBus.prototype, 'send').mockImplementation(async () => {}); jest .spyOn(OrchestrationHandlerWorkerService.prototype, 'initSubscriber') .mockImplementation(async () => {}); @@ -79,7 +78,7 @@ test('worker initializes all its components', async () => { expect(worker.initEventBus).toHaveBeenCalledTimes(1); expect(worker.initOrchestration).toHaveBeenCalledTimes(1); expect(OrchestrationHandlerWorkerService.prototype.initSubscriber).toHaveBeenCalledTimes(1); - expect(OrchestrationWorkerService.prototype.publishToEventLog).toHaveBeenCalledTimes(1); + expect(eventBus.send).toHaveBeenCalledTimes(1); expect(worker.initQueue).toHaveBeenCalledTimes(1); jest.restoreAllMocks(); From 5583e162d8933c3f16b64b8cf257d9638372ccf2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Iv=C3=A1n=20Ovejero?= Date: Fri, 7 Jun 2024 11:03:35 +0200 Subject: [PATCH 2/3] Remove unused imports --- packages/cli/src/eventbus/MessageEventBus/MessageEventBus.ts | 4 ---- 1 file changed, 4 deletions(-) diff --git a/packages/cli/src/eventbus/MessageEventBus/MessageEventBus.ts b/packages/cli/src/eventbus/MessageEventBus/MessageEventBus.ts index cb28dd50fc5a6..2e4b62f25cb7a 100644 --- a/packages/cli/src/eventbus/MessageEventBus/MessageEventBus.ts +++ b/packages/cli/src/eventbus/MessageEventBus/MessageEventBus.ts @@ -3,7 +3,6 @@ import type { DeleteResult } from '@n8n/typeorm'; import { In } from '@n8n/typeorm'; import EventEmitter from 'events'; import uniqby from 'lodash/uniqBy'; -import { jsonParse } from 'n8n-workflow'; import type { MessageEventBusDestinationOptions } from 'n8n-workflow'; import config from '@/config'; @@ -30,9 +29,6 @@ import { eventMessageGenericDestinationTestEvent, } from '../EventMessageClasses/EventMessageGeneric'; import { METRICS_EVENT_NAME } from '../MessageEventBusDestination/Helpers.ee'; -import type { AbstractEventMessageOptions } from '../EventMessageClasses/AbstractEventMessageOptions'; -import { getEventMessageObjectByType } from '../EventMessageClasses/Helpers'; -import { ExecutionDataRecoveryService } from '../executionDataRecovery.service'; import { EventMessageAiNode, type EventMessageAiNodeOptions, From b6ed7d24a04da806b6a6914e5658109a8f8aac59 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Iv=C3=A1n=20Ovejero?= Date: Fri, 7 Jun 2024 11:08:09 +0200 Subject: [PATCH 3/3] Fix over-deletion --- packages/cli/src/eventbus/MessageEventBus/MessageEventBus.ts | 1 + 1 file changed, 1 insertion(+) diff --git a/packages/cli/src/eventbus/MessageEventBus/MessageEventBus.ts b/packages/cli/src/eventbus/MessageEventBus/MessageEventBus.ts index 2e4b62f25cb7a..7d14e72fec10a 100644 --- a/packages/cli/src/eventbus/MessageEventBus/MessageEventBus.ts +++ b/packages/cli/src/eventbus/MessageEventBus/MessageEventBus.ts @@ -33,6 +33,7 @@ import { EventMessageAiNode, type EventMessageAiNodeOptions, } from '../EventMessageClasses/EventMessageAiNode'; +import { ExecutionDataRecoveryService } from '../executionDataRecovery.service'; export type EventMessageReturnMode = 'sent' | 'unsent' | 'all' | 'unfinished';