From 4ce85c135eca49aba9abf9b96e1a8233178ec0b4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E0=A4=95=E0=A4=BE=E0=A4=B0=E0=A4=A4=E0=A5=8B=E0=A4=AB?= =?UTF-8?q?=E0=A5=8D=E0=A4=AB=E0=A5=87=E0=A4=B2=E0=A4=B8=E0=A5=8D=E0=A4=95?= =?UTF-8?q?=E0=A5=8D=E0=A4=B0=E0=A4=BF=E0=A4=AA=E0=A5=8D=E0=A4=9F=E2=84=A2?= Date: Thu, 21 Sep 2023 17:24:17 +0200 Subject: [PATCH] revert(core): Add command to trigger license refresh on workers (#7184) This reverts commit 9f797b96d818a5ae74ad82917347c99f3c249688. --- packages/cli/package.json | 2 +- packages/cli/src/License.ts | 40 +------ packages/cli/src/Server.ts | 4 +- packages/cli/src/commands/BaseCommand.ts | 8 +- packages/cli/src/commands/start.ts | 2 +- packages/cli/src/commands/webhook.ts | 2 +- packages/cli/src/commands/worker.ts | 4 +- .../cli/src/controllers/e2e.controller.ts | 2 +- .../controllers/orchestration.controller.ts | 5 +- .../MessageEventBus/MessageEventBus.ts | 104 ++---------------- .../cli/src/services/orchestration.service.ts | 87 +++++++++++++-- .../orchestration/handleCommandMessage.ts | 29 ----- .../handleWorkerResponseMessage.ts | 11 -- .../cli/src/services/orchestration/helpers.ts | 17 --- .../services/redis/RedisServiceBaseClasses.ts | 1 - .../services/redis/RedisServiceCommands.ts | 9 +- .../redis/RedisServicePubSubSubscriber.ts | 25 ----- .../cli/src/worker/workerCommandHandler.ts | 10 +- .../cli/test/integration/eventbus.ee.test.ts | 4 +- packages/cli/test/unit/License.test.ts | 19 ---- .../services/orchestration.service.test.ts | 42 +++---- pnpm-lock.yaml | 8 +- 22 files changed, 139 insertions(+), 296 deletions(-) delete mode 100644 packages/cli/src/services/orchestration/handleCommandMessage.ts delete mode 100644 packages/cli/src/services/orchestration/handleWorkerResponseMessage.ts delete mode 100644 packages/cli/src/services/orchestration/helpers.ts diff --git a/packages/cli/package.json b/packages/cli/package.json index d87035ec7b899..b9dba97afd660 100644 --- a/packages/cli/package.json +++ b/packages/cli/package.json @@ -100,7 +100,7 @@ }, "dependencies": { "@n8n/client-oauth2": "workspace:*", - "@n8n_io/license-sdk": "~2.6.0", + "@n8n_io/license-sdk": "~2.5.1", "@oclif/command": "^1.8.16", "@oclif/core": "^1.16.4", "@oclif/errors": "^1.3.6", diff --git a/packages/cli/src/License.ts b/packages/cli/src/License.ts index bf384d0247329..18f4c16fd07b9 100644 --- a/packages/cli/src/License.ts +++ b/packages/cli/src/License.ts @@ -11,10 +11,8 @@ import { SETTINGS_LICENSE_CERT_KEY, UNLIMITED_LICENSE_QUOTA, } from './constants'; -import Container, { Service } from 'typedi'; -import type { BooleanLicenseFeature, N8nInstanceType, NumericLicenseFeature } from './Interfaces'; -import type { RedisServicePubSubPublisher } from './services/redis/RedisServicePubSubPublisher'; -import { RedisService } from './services/redis.service'; +import { Service } from 'typedi'; +import type { BooleanLicenseFeature, NumericLicenseFeature } from './Interfaces'; type FeatureReturnType = Partial< { @@ -28,28 +26,18 @@ export class License { private manager: LicenseManager | undefined; - instanceId: string | undefined; - - private redisPublisher: RedisServicePubSubPublisher; - constructor() { this.logger = getLogger(); } - async init(instanceId: string, instanceType: N8nInstanceType = 'main') { + async init(instanceId: string) { if (this.manager) { return; } - this.instanceId = instanceId; - const isMainInstance = instanceType === 'main'; const server = config.getEnv('license.serverUrl'); - const autoRenewEnabled = isMainInstance && config.getEnv('license.autoRenewEnabled'); - const offlineMode = !isMainInstance; + const autoRenewEnabled = config.getEnv('license.autoRenewEnabled'); const autoRenewOffset = config.getEnv('license.autoRenewOffset'); - const saveCertStr = isMainInstance - ? async (value: TLicenseBlock) => this.saveCertStr(value) - : async () => {}; try { this.manager = new LicenseManager({ @@ -59,10 +47,9 @@ export class License { autoRenewEnabled, renewOnInit: autoRenewEnabled, autoRenewOffset, - offlineMode, logger: this.logger, loadCertStr: async () => this.loadCertStr(), - saveCertStr, + saveCertStr: async (value: TLicenseBlock) => this.saveCertStr(value), deviceFingerprint: () => instanceId, }); @@ -100,15 +87,6 @@ export class License { }, ['key'], ); - if (config.getEnv('executions.mode') === 'queue') { - if (!this.redisPublisher) { - this.logger.debug('Initializing Redis publisher for License Service'); - this.redisPublisher = await Container.get(RedisService).getPubSubPublisher(); - } - await this.redisPublisher.publishToCommandChannel({ - command: 'reloadLicense', - }); - } } async activate(activationKey: string): Promise { @@ -119,14 +97,6 @@ export class License { await this.manager.activate(activationKey); } - async reload(): Promise { - if (!this.manager) { - return; - } - this.logger.debug('Reloading license'); - await this.manager.reload(); - } - async renew() { if (!this.manager) { return; diff --git a/packages/cli/src/Server.ts b/packages/cli/src/Server.ts index b17b2d8bb9b9f..a4446b2f7bd4a 100644 --- a/packages/cli/src/Server.ts +++ b/packages/cli/src/Server.ts @@ -1472,9 +1472,7 @@ export class Server extends AbstractServer { // ---------------------------------------- if (!eventBus.isInitialized) { - await eventBus.initialize({ - uniqueInstanceId: this.uniqueInstanceId, - }); + await eventBus.initialize(); } if (this.endpointPresetCredentials !== '') { diff --git a/packages/cli/src/commands/BaseCommand.ts b/packages/cli/src/commands/BaseCommand.ts index 039660843a674..fbb75de3c936d 100644 --- a/packages/cli/src/commands/BaseCommand.ts +++ b/packages/cli/src/commands/BaseCommand.ts @@ -16,7 +16,7 @@ import { initErrorHandling } from '@/ErrorReporting'; import { ExternalHooks } from '@/ExternalHooks'; import { NodeTypes } from '@/NodeTypes'; import { LoadNodesAndCredentials } from '@/LoadNodesAndCredentials'; -import type { IExternalHooksClass, N8nInstanceType } from '@/Interfaces'; +import type { IExternalHooksClass } from '@/Interfaces'; import { InternalHooks } from '@/InternalHooks'; import { PostHogClient } from '@/posthog'; import { License } from '@/License'; @@ -115,11 +115,9 @@ export abstract class BaseCommand extends Command { await this.externalHooks.init(); } - async initLicense(instanceType: N8nInstanceType = 'main'): Promise { - config.set('generic.instanceType', instanceType); - + async initLicense(): Promise { const license = Container.get(License); - await license.init(this.instanceId, instanceType); + await license.init(this.instanceId); const activationKey = config.getEnv('license.activationKey'); diff --git a/packages/cli/src/commands/start.ts b/packages/cli/src/commands/start.ts index 0cebdb633ec8e..01c45987b3805 100644 --- a/packages/cli/src/commands/start.ts +++ b/packages/cli/src/commands/start.ts @@ -200,7 +200,7 @@ export class Start extends BaseCommand { this.logger.info('Initializing n8n process'); this.activeWorkflowRunner = Container.get(ActiveWorkflowRunner); - await this.initLicense('main'); + await this.initLicense(); await this.initBinaryManager(); await this.initExternalHooks(); await this.initExternalSecrets(); diff --git a/packages/cli/src/commands/webhook.ts b/packages/cli/src/commands/webhook.ts index 912f8a22762ce..7d5bf4630232e 100644 --- a/packages/cli/src/commands/webhook.ts +++ b/packages/cli/src/commands/webhook.ts @@ -77,7 +77,7 @@ export class Webhook extends BaseCommand { await this.initCrashJournal(); await super.init(); - await this.initLicense('webhook'); + await this.initLicense(); await this.initBinaryManager(); await this.initExternalHooks(); await this.initExternalSecrets(); diff --git a/packages/cli/src/commands/worker.ts b/packages/cli/src/commands/worker.ts index bffaea3e4a0a7..30cf4be73e5c1 100644 --- a/packages/cli/src/commands/worker.ts +++ b/packages/cli/src/commands/worker.ts @@ -256,7 +256,7 @@ export class Worker extends BaseCommand { this.logger.debug(`Worker ID: ${this.uniqueInstanceId}`); this.logger.debug('Starting n8n worker...'); - await this.initLicense('worker'); + await this.initLicense(); await this.initBinaryManager(); await this.initExternalHooks(); await this.initExternalSecrets(); @@ -268,7 +268,6 @@ export class Worker extends BaseCommand { async initEventBus() { await eventBus.initialize({ workerId: this.uniqueInstanceId, - uniqueInstanceId: this.uniqueInstanceId, }); } @@ -296,7 +295,6 @@ export class Worker extends BaseCommand { // eslint-disable-next-line @typescript-eslint/no-unsafe-argument getWorkerCommandReceivedHandler({ uniqueInstanceId: this.uniqueInstanceId, - instanceId: this.instanceId, redisPublisher: this.redisPublisher, getRunningJobIds: () => Object.keys(Worker.runningJobs), }), diff --git a/packages/cli/src/controllers/e2e.controller.ts b/packages/cli/src/controllers/e2e.controller.ts index f4e7a744994cb..0957ea95ee773 100644 --- a/packages/cli/src/controllers/e2e.controller.ts +++ b/packages/cli/src/controllers/e2e.controller.ts @@ -109,7 +109,7 @@ export class E2EController { private async resetLogStreaming() { for (const id in eventBus.destinations) { - await eventBus.removeDestination(id, false); + await eventBus.removeDestination(id); } } diff --git a/packages/cli/src/controllers/orchestration.controller.ts b/packages/cli/src/controllers/orchestration.controller.ts index 5386b75698635..fd6b68ad4e167 100644 --- a/packages/cli/src/controllers/orchestration.controller.ts +++ b/packages/cli/src/controllers/orchestration.controller.ts @@ -1,12 +1,15 @@ +import config from '@/config'; import { Authorized, Get, RestController } from '@/decorators'; import { OrchestrationRequest } from '@/requests'; import { Service } from 'typedi'; -import { OrchestrationService } from '@/services/orchestration.service'; +import { OrchestrationService } from '../services/orchestration.service'; @Authorized(['global', 'owner']) @RestController('/orchestration') @Service() export class OrchestrationController { + private config = config; + constructor(private readonly orchestrationService: OrchestrationService) {} /** diff --git a/packages/cli/src/eventbus/MessageEventBus/MessageEventBus.ts b/packages/cli/src/eventbus/MessageEventBus/MessageEventBus.ts index 5a6738c5f84fb..43059bf15954b 100644 --- a/packages/cli/src/eventbus/MessageEventBus/MessageEventBus.ts +++ b/packages/cli/src/eventbus/MessageEventBus/MessageEventBus.ts @@ -1,4 +1,4 @@ -import { LoggerProxy, jsonParse } from 'n8n-workflow'; +import { LoggerProxy } from 'n8n-workflow'; import type { MessageEventBusDestinationOptions } from 'n8n-workflow'; import type { DeleteResult } from 'typeorm'; import type { @@ -27,18 +27,9 @@ import { } from '../EventMessageClasses/EventMessageGeneric'; import { recoverExecutionDataFromEventLogMessages } from './recoverEvents'; import { METRICS_EVENT_NAME } from '../MessageEventBusDestination/Helpers.ee'; -import Container, { Service } from 'typedi'; +import Container from 'typedi'; import { ExecutionRepository, WorkflowRepository } from '@/databases/repositories'; -import { RedisService } from '@/services/redis.service'; -import type { RedisServicePubSubPublisher } from '@/services/redis/RedisServicePubSubPublisher'; -import type { RedisServicePubSubSubscriber } from '@/services/redis/RedisServicePubSubSubscriber'; -import { - COMMAND_REDIS_CHANNEL, - EVENT_BUS_REDIS_CHANNEL, -} from '@/services/redis/RedisServiceHelper'; -import type { AbstractEventMessageOptions } from '../EventMessageClasses/AbstractEventMessageOptions'; -import { getEventMessageObjectByType } from '../EventMessageClasses/Helpers'; -import { messageToRedisServiceCommandObject } from '@/services/orchestration/helpers'; +import { OrchestrationService } from '../../services/orchestration.service'; export type EventMessageReturnMode = 'sent' | 'unsent' | 'all' | 'unfinished'; @@ -50,21 +41,13 @@ export interface MessageWithCallback { export interface MessageEventBusInitializeOptions { skipRecoveryPass?: boolean; workerId?: string; - uniqueInstanceId?: string; } -@Service() export class MessageEventBus extends EventEmitter { private static instance: MessageEventBus; isInitialized: boolean; - uniqueInstanceId: string; - - redisPublisher: RedisServicePubSubPublisher; - - redisSubscriber: RedisServicePubSubSubscriber; - logWriter: MessageEventBusLogWriter; destinations: { @@ -93,30 +76,11 @@ export class MessageEventBus extends EventEmitter { * * Sets `isInitialized` to `true` once finished. */ - async initialize(options: MessageEventBusInitializeOptions): Promise { + async initialize(options?: MessageEventBusInitializeOptions): Promise { if (this.isInitialized) { return; } - this.uniqueInstanceId = options?.uniqueInstanceId ?? ''; - - if (config.getEnv('executions.mode') === 'queue') { - this.redisPublisher = await Container.get(RedisService).getPubSubPublisher(); - this.redisSubscriber = await Container.get(RedisService).getPubSubSubscriber(); - await this.redisSubscriber.subscribeToEventLog(); - await this.redisSubscriber.subscribeToCommandChannel(); - this.redisSubscriber.addMessageHandler( - 'MessageEventBusMessageReceiver', - async (channel: string, messageString: string) => { - if (channel === EVENT_BUS_REDIS_CHANNEL) { - await this.handleRedisEventBusMessage(messageString); - } else if (channel === COMMAND_REDIS_CHANNEL) { - await this.handleRedisCommandMessage(messageString); - } - }, - ); - } - LoggerProxy.debug('Initializing event bus...'); const savedEventDestinations = await Db.collections.EventDestinations.find({}); @@ -125,7 +89,7 @@ export class MessageEventBus extends EventEmitter { try { const destination = messageEventBusDestinationFromDb(this, destinationData); if (destination) { - await this.addDestination(destination, false); + await this.addDestination(destination); } } catch (error) { // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access @@ -218,13 +182,10 @@ export class MessageEventBus extends EventEmitter { this.isInitialized = true; } - async addDestination(destination: MessageEventBusDestination, notifyWorkers: boolean = true) { - await this.removeDestination(destination.getId(), false); + async addDestination(destination: MessageEventBusDestination) { + await this.removeDestination(destination.getId()); this.destinations[destination.getId()] = destination; this.destinations[destination.getId()].startListening(); - if (notifyWorkers) { - await this.broadcastRestartEventbusAfterDestinationUpdate(); - } return destination; } @@ -238,62 +199,19 @@ export class MessageEventBus extends EventEmitter { return result.sort((a, b) => (a.__type ?? '').localeCompare(b.__type ?? '')); } - async removeDestination( - id: string, - notifyWorkers: boolean = true, - ): Promise { + async removeDestination(id: string): Promise { let result; if (Object.keys(this.destinations).includes(id)) { await this.destinations[id].close(); result = await this.destinations[id].deleteFromDb(); delete this.destinations[id]; } - if (notifyWorkers) { - await this.broadcastRestartEventbusAfterDestinationUpdate(); - } return result; } - async handleRedisEventBusMessage(messageString: string) { - const eventData = jsonParse(messageString); - if (eventData) { - const eventMessage = getEventMessageObjectByType(eventData); - if (eventMessage) { - await Container.get(MessageEventBus).send(eventMessage); - } - } - return eventData; - } - - async handleRedisCommandMessage(messageString: string) { - const message = messageToRedisServiceCommandObject(messageString); - if (message) { - if ( - message.senderId === this.uniqueInstanceId || - (message.targets && !message.targets.includes(this.uniqueInstanceId)) - ) { - LoggerProxy.debug( - `Skipping command message ${message.command} because it's not for this instance.`, - ); - return message; - } - switch (message.command) { - case 'restartEventBus': - await this.restart(); - default: - break; - } - return message; - } - return; - } - async broadcastRestartEventbusAfterDestinationUpdate() { if (config.getEnv('executions.mode') === 'queue') { - await this.redisPublisher.publishToCommandChannel({ - senderId: this.uniqueInstanceId, - command: 'restartEventBus', - }); + await Container.get(OrchestrationService).restartEventBus(); } } @@ -317,8 +235,6 @@ export class MessageEventBus extends EventEmitter { ); await this.destinations[destinationName].close(); } - await this.redisSubscriber?.unSubscribeFromCommandChannel(); - await this.redisSubscriber?.unSubscribeFromEventLog(); this.isInitialized = false; LoggerProxy.debug('EventBus shut down.'); } @@ -501,4 +417,4 @@ export class MessageEventBus extends EventEmitter { } } -export const eventBus = Container.get(MessageEventBus); +export const eventBus = MessageEventBus.getInstance(); diff --git a/packages/cli/src/services/orchestration.service.ts b/packages/cli/src/services/orchestration.service.ts index 17fcab79dd4fe..d3f8c7e27eb9f 100644 --- a/packages/cli/src/services/orchestration.service.ts +++ b/packages/cli/src/services/orchestration.service.ts @@ -2,9 +2,19 @@ import { Service } from 'typedi'; import { RedisService } from './redis.service'; import type { RedisServicePubSubPublisher } from './redis/RedisServicePubSubPublisher'; import type { RedisServicePubSubSubscriber } from './redis/RedisServicePubSubSubscriber'; -import { COMMAND_REDIS_CHANNEL, WORKER_RESPONSE_REDIS_CHANNEL } from './redis/RedisServiceHelper'; -import { handleWorkerResponseMessage } from './orchestration/handleWorkerResponseMessage'; -import { handleCommandMessage } from './orchestration/handleCommandMessage'; +import { LoggerProxy, jsonParse } from 'n8n-workflow'; +import { eventBus } from '../eventbus'; +import type { AbstractEventMessageOptions } from '../eventbus/EventMessageClasses/AbstractEventMessageOptions'; +import { getEventMessageObjectByType } from '../eventbus/EventMessageClasses/Helpers'; +import type { + RedisServiceCommandObject, + RedisServiceWorkerResponseObject, +} from './redis/RedisServiceCommands'; +import { + COMMAND_REDIS_CHANNEL, + EVENT_BUS_REDIS_CHANNEL, + WORKER_RESPONSE_REDIS_CHANNEL, +} from './redis/RedisServiceHelper'; @Service() export class OrchestrationService { @@ -41,21 +51,81 @@ export class OrchestrationService { private async initSubscriber() { this.redisSubscriber = await this.redisService.getPubSubSubscriber(); + // TODO: these are all proof of concept implementations for the moment + // until worker communication is implemented + // #region proof of concept + await this.redisSubscriber.subscribeToEventLog(); await this.redisSubscriber.subscribeToWorkerResponseChannel(); await this.redisSubscriber.subscribeToCommandChannel(); this.redisSubscriber.addMessageHandler( 'OrchestrationMessageReceiver', async (channel: string, messageString: string) => { - if (channel === WORKER_RESPONSE_REDIS_CHANNEL) { - await handleWorkerResponseMessage(messageString); + // TODO: this is a proof of concept implementation to forward events to the main instance's event bus + // Events are arriving through a pub/sub channel and are forwarded to the eventBus + // In the future, a stream should probably replace this implementation entirely + if (channel === EVENT_BUS_REDIS_CHANNEL) { + await this.handleEventBusMessage(messageString); + } else if (channel === WORKER_RESPONSE_REDIS_CHANNEL) { + await this.handleWorkerResponseMessage(messageString); } else if (channel === COMMAND_REDIS_CHANNEL) { - await handleCommandMessage(messageString, this.uniqueInstanceId); + await this.handleCommandMessage(messageString); } }, ); } + async handleWorkerResponseMessage(messageString: string) { + const workerResponse = jsonParse(messageString); + if (workerResponse) { + // TODO: Handle worker response + LoggerProxy.debug('Received worker response', workerResponse); + } + return workerResponse; + } + + async handleEventBusMessage(messageString: string) { + const eventData = jsonParse(messageString); + if (eventData) { + const eventMessage = getEventMessageObjectByType(eventData); + if (eventMessage) { + await eventBus.send(eventMessage); + } + } + return eventData; + } + + async handleCommandMessage(messageString: string) { + if (!messageString) return; + let message: RedisServiceCommandObject; + try { + message = jsonParse(messageString); + } catch { + LoggerProxy.debug( + `Received invalid message via channel ${COMMAND_REDIS_CHANNEL}: "${messageString}"`, + ); + return; + } + if (message) { + if ( + message.senderId === this.uniqueInstanceId || + (message.targets && !message.targets.includes(this.uniqueInstanceId)) + ) { + LoggerProxy.debug( + `Skipping command message ${message.command} because it's not for this instance.`, + ); + return message; + } + switch (message.command) { + case 'restartEventBus': + await eventBus.restart(); + break; + } + return message; + } + return; + } + async getWorkerStatus(id?: string) { if (!this.initialized) { throw new Error('OrchestrationService not initialized'); @@ -89,14 +159,13 @@ export class OrchestrationService { }); } - // reload the license on workers after it was changed on the main instance - async reloadLicense(id?: string) { + async restartEventBus(id?: string) { if (!this.initialized) { throw new Error('OrchestrationService not initialized'); } await this.redisPublisher.publishToCommandChannel({ senderId: this.uniqueInstanceId, - command: 'reloadLicense', + command: 'restartEventBus', targets: id ? [id] : undefined, }); } diff --git a/packages/cli/src/services/orchestration/handleCommandMessage.ts b/packages/cli/src/services/orchestration/handleCommandMessage.ts deleted file mode 100644 index 8a04cb3ba7035..0000000000000 --- a/packages/cli/src/services/orchestration/handleCommandMessage.ts +++ /dev/null @@ -1,29 +0,0 @@ -import { LoggerProxy } from 'n8n-workflow'; -import { messageToRedisServiceCommandObject } from './helpers'; -import Container from 'typedi'; -import { License } from '@/License'; - -// this function handles commands sent to the MAIN instance. the workers handle their own commands -export async function handleCommandMessage(messageString: string, uniqueInstanceId: string) { - const message = messageToRedisServiceCommandObject(messageString); - if (message) { - if ( - message.senderId === uniqueInstanceId || - (message.targets && !message.targets.includes(uniqueInstanceId)) - ) { - LoggerProxy.debug( - `Skipping command message ${message.command} because it's not for this instance.`, - ); - return message; - } - switch (message.command) { - case 'reloadLicense': - await Container.get(License).reload(); - break; - default: - break; - } - return message; - } - return; -} diff --git a/packages/cli/src/services/orchestration/handleWorkerResponseMessage.ts b/packages/cli/src/services/orchestration/handleWorkerResponseMessage.ts deleted file mode 100644 index ee22318638e89..0000000000000 --- a/packages/cli/src/services/orchestration/handleWorkerResponseMessage.ts +++ /dev/null @@ -1,11 +0,0 @@ -import { jsonParse, LoggerProxy } from 'n8n-workflow'; -import type { RedisServiceWorkerResponseObject } from '../redis/RedisServiceCommands'; - -export async function handleWorkerResponseMessage(messageString: string) { - const workerResponse = jsonParse(messageString); - if (workerResponse) { - // TODO: Handle worker response - LoggerProxy.debug('Received worker response', workerResponse); - } - return workerResponse; -} diff --git a/packages/cli/src/services/orchestration/helpers.ts b/packages/cli/src/services/orchestration/helpers.ts deleted file mode 100644 index 6996391f40db0..0000000000000 --- a/packages/cli/src/services/orchestration/helpers.ts +++ /dev/null @@ -1,17 +0,0 @@ -import { LoggerProxy, jsonParse } from 'n8n-workflow'; -import type { RedisServiceCommandObject } from '../redis/RedisServiceCommands'; -import { COMMAND_REDIS_CHANNEL } from '../redis/RedisServiceHelper'; - -export function messageToRedisServiceCommandObject(messageString: string) { - if (!messageString) return; - let message: RedisServiceCommandObject; - try { - message = jsonParse(messageString); - } catch { - LoggerProxy.debug( - `Received invalid message via channel ${COMMAND_REDIS_CHANNEL}: "${messageString}"`, - ); - return; - } - return message; -} diff --git a/packages/cli/src/services/redis/RedisServiceBaseClasses.ts b/packages/cli/src/services/redis/RedisServiceBaseClasses.ts index da16aa25e741c..2ed9d94eee913 100644 --- a/packages/cli/src/services/redis/RedisServiceBaseClasses.ts +++ b/packages/cli/src/services/redis/RedisServiceBaseClasses.ts @@ -49,7 +49,6 @@ class RedisServiceBase { return; } await this.redisClient.quit(); - this.isInitialized = false; this.redisClient = undefined; } } diff --git a/packages/cli/src/services/redis/RedisServiceCommands.ts b/packages/cli/src/services/redis/RedisServiceCommands.ts index a57e190047e0c..5796560d4b1e2 100644 --- a/packages/cli/src/services/redis/RedisServiceCommands.ts +++ b/packages/cli/src/services/redis/RedisServiceCommands.ts @@ -1,9 +1,4 @@ -export type RedisServiceCommand = - | 'getStatus' - | 'getId' - | 'restartEventBus' - | 'stopWorker' - | 'reloadLicense'; +export type RedisServiceCommand = 'getStatus' | 'getId' | 'restartEventBus' | 'stopWorker'; // TODO: add more commands /** * An object to be sent via Redis pub/sub from the main process to the workers. @@ -12,7 +7,7 @@ export type RedisServiceCommand = * @field payload: Optional arguments to be sent with the command. */ type RedisServiceBaseCommand = { - senderId?: string; + senderId: string; command: RedisServiceCommand; payload?: { [key: string]: string | number | boolean | string[] | number[] | boolean[]; diff --git a/packages/cli/src/services/redis/RedisServicePubSubSubscriber.ts b/packages/cli/src/services/redis/RedisServicePubSubSubscriber.ts index 240fe7e1afd39..404544d6f9e93 100644 --- a/packages/cli/src/services/redis/RedisServicePubSubSubscriber.ts +++ b/packages/cli/src/services/redis/RedisServicePubSubSubscriber.ts @@ -32,19 +32,6 @@ export class RedisServicePubSubSubscriber extends RedisServiceBaseReceiver { }); } - async unsubscribe(channel: string): Promise { - if (!this.redisClient) { - return; - } - await this.redisClient?.unsubscribe(channel, (error, _count: number) => { - if (error) { - Logger.error(`Error unsubscribing from channel ${channel}`); - } else { - Logger.debug(`Unsubscribed Redis PubSub client from channel: ${channel}`); - } - }); - } - async subscribeToEventLog(): Promise { await this.subscribe(EVENT_BUS_REDIS_CHANNEL); } @@ -56,16 +43,4 @@ export class RedisServicePubSubSubscriber extends RedisServiceBaseReceiver { async subscribeToWorkerResponseChannel(): Promise { await this.subscribe(WORKER_RESPONSE_REDIS_CHANNEL); } - - async unSubscribeFromEventLog(): Promise { - await this.unsubscribe(EVENT_BUS_REDIS_CHANNEL); - } - - async unSubscribeFromCommandChannel(): Promise { - await this.unsubscribe(COMMAND_REDIS_CHANNEL); - } - - async unSubscribeFromWorkerResponseChannel(): Promise { - await this.unsubscribe(WORKER_RESPONSE_REDIS_CHANNEL); - } } diff --git a/packages/cli/src/worker/workerCommandHandler.ts b/packages/cli/src/worker/workerCommandHandler.ts index acd488624a04c..874ead410c542 100644 --- a/packages/cli/src/worker/workerCommandHandler.ts +++ b/packages/cli/src/worker/workerCommandHandler.ts @@ -1,14 +1,12 @@ import { jsonParse, LoggerProxy } from 'n8n-workflow'; +import { eventBus } from '../eventbus'; import type { RedisServiceCommandObject } from '@/services/redis/RedisServiceCommands'; import { COMMAND_REDIS_CHANNEL } from '@/services/redis/RedisServiceHelper'; -import type { RedisServicePubSubPublisher } from '@/services/redis/RedisServicePubSubPublisher'; +import type { RedisServicePubSubPublisher } from '../services/redis/RedisServicePubSubPublisher'; import * as os from 'os'; -import Container from 'typedi'; -import { License } from '@/License'; export function getWorkerCommandReceivedHandler(options: { uniqueInstanceId: string; - instanceId: string; redisPublisher: RedisServicePubSubPublisher; getRunningJobIds: () => string[]; }) { @@ -58,6 +56,7 @@ export function getWorkerCommandReceivedHandler(options: { }); break; case 'restartEventBus': + await eventBus.restart(); await options.redisPublisher.publishToWorkerChannel({ workerId: options.uniqueInstanceId, command: message.command, @@ -66,9 +65,6 @@ export function getWorkerCommandReceivedHandler(options: { }, }); break; - case 'reloadLicense': - await Container.get(License).reload(); - break; case 'stopWorker': // TODO: implement proper shutdown // await this.stopProcess(); diff --git a/packages/cli/test/integration/eventbus.ee.test.ts b/packages/cli/test/integration/eventbus.ee.test.ts index 264c2a5835975..e06e10f6c0a5d 100644 --- a/packages/cli/test/integration/eventbus.ee.test.ts +++ b/packages/cli/test/integration/eventbus.ee.test.ts @@ -91,9 +91,7 @@ beforeAll(async () => { config.set('eventBus.logWriter.logBaseName', 'n8n-test-logwriter'); config.set('eventBus.logWriter.keepLogCount', 1); - await eventBus.initialize({ - uniqueInstanceId: 'test', - }); + await eventBus.initialize(); }); afterAll(async () => { diff --git a/packages/cli/test/unit/License.test.ts b/packages/cli/test/unit/License.test.ts index b9f942e365080..624d07438d7d5 100644 --- a/packages/cli/test/unit/License.test.ts +++ b/packages/cli/test/unit/License.test.ts @@ -31,7 +31,6 @@ describe('License', () => { expect(LicenseManager).toHaveBeenCalledWith({ autoRenewEnabled: true, autoRenewOffset: MOCK_RENEW_OFFSET, - offlineMode: false, renewOnInit: true, deviceFingerprint: expect.any(Function), productIdentifier: `n8n-${N8N_VERSION}`, @@ -43,24 +42,6 @@ describe('License', () => { }); }); - test('initializes license manager for worker', async () => { - license = new License(); - await license.init(MOCK_INSTANCE_ID, 'worker'); - expect(LicenseManager).toHaveBeenCalledWith({ - autoRenewEnabled: false, - autoRenewOffset: MOCK_RENEW_OFFSET, - offlineMode: true, - renewOnInit: false, - deviceFingerprint: expect.any(Function), - productIdentifier: `n8n-${N8N_VERSION}`, - logger: expect.anything(), - loadCertStr: expect.any(Function), - saveCertStr: expect.any(Function), - server: MOCK_SERVER_URL, - tenantId: 1, - }); - }); - test('attempts to activate license with provided key', async () => { await license.activate(MOCK_ACTIVATION_KEY); diff --git a/packages/cli/test/unit/services/orchestration.service.test.ts b/packages/cli/test/unit/services/orchestration.service.test.ts index 127aac690d0f2..18204cea2b81c 100644 --- a/packages/cli/test/unit/services/orchestration.service.test.ts +++ b/packages/cli/test/unit/services/orchestration.service.test.ts @@ -6,11 +6,9 @@ import { OrchestrationService } from '@/services/orchestration.service'; import type { RedisServiceWorkerResponseObject } from '@/services/redis/RedisServiceCommands'; import { EventMessageWorkflow } from '@/eventbus/EventMessageClasses/EventMessageWorkflow'; import { eventBus } from '@/eventbus'; +import * as EventHelpers from '@/eventbus/EventMessageClasses/Helpers'; import { RedisService } from '@/services/redis.service'; import { mockInstance } from '../../integration/shared/utils'; -import { handleWorkerResponseMessage } from '../../../src/services/orchestration/handleWorkerResponseMessage'; -import { handleCommandMessage } from '../../../src/services/orchestration/handleCommandMessage'; -import { License } from '../../../src/License'; const os = Container.get(OrchestrationService); @@ -79,7 +77,6 @@ describe('Orchestration Service', () => { afterAll(async () => { jest.mock('../../../src/services/redis/RedisServicePubSubPublisher').restoreAllMocks(); jest.mock('../../../src/services/redis/RedisServicePubSubSubscriber').restoreAllMocks(); - await os.shutdown(); }); test('should initialize', async () => { @@ -90,35 +87,38 @@ describe('Orchestration Service', () => { }); test('should handle worker responses', async () => { - const response = await handleWorkerResponseMessage( + const response = await os.handleWorkerResponseMessage( JSON.stringify(workerRestartEventbusResponse), ); expect(response.command).toEqual('restartEventBus'); }); + test('should handle event messages', async () => { + const response = await os.handleEventBusMessage(JSON.stringify(eventBusMessage)); + jest.spyOn(eventBus, 'send'); + jest.spyOn(EventHelpers, 'getEventMessageObjectByType'); + expect(eventBus.send).toHaveBeenCalled(); + expect(response.eventName).toEqual('n8n.workflow.success'); + jest.spyOn(eventBus, 'send').mockRestore(); + jest.spyOn(EventHelpers, 'getEventMessageObjectByType').mockRestore(); + }); + test('should handle command messages from others', async () => { - const license = Container.get(License); - license.instanceId = 'test'; - jest.spyOn(license, 'reload'); - const responseFalseId = await handleCommandMessage( - JSON.stringify({ - senderId: 'test', - command: 'reloadLicense', - }), - os.uniqueInstanceId, + jest.spyOn(eventBus, 'restart'); + const responseFalseId = await os.handleCommandMessage( + JSON.stringify(workerRestartEventbusResponse), ); expect(responseFalseId).toBeDefined(); - expect(responseFalseId!.command).toEqual('reloadLicense'); + expect(responseFalseId!.command).toEqual('restartEventBus'); expect(responseFalseId!.senderId).toEqual('test'); - expect(license.reload).toHaveBeenCalled(); - jest.spyOn(license, 'reload').mockRestore(); + expect(eventBus.restart).toHaveBeenCalled(); + jest.spyOn(eventBus, 'restart').mockRestore(); }); test('should reject command messages from iteslf', async () => { jest.spyOn(eventBus, 'restart'); - const response = await handleCommandMessage( + const response = await os.handleCommandMessage( JSON.stringify({ ...workerRestartEventbusResponse, senderId: os.uniqueInstanceId }), - os.uniqueInstanceId, ); expect(response).toBeDefined(); expect(response!.command).toEqual('restartEventBus'); @@ -133,4 +133,8 @@ describe('Orchestration Service', () => { expect(os.redisPublisher.publishToCommandChannel).toHaveBeenCalled(); jest.spyOn(os.redisPublisher, 'publishToCommandChannel').mockRestore(); }); + + afterAll(async () => { + await os.shutdown(); + }); }); diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 7ae16a82e49a9..ae98c27544043 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -195,8 +195,8 @@ importers: specifier: workspace:* version: link:../@n8n/client-oauth2 '@n8n_io/license-sdk': - specifier: ~2.6.0 - version: 2.6.0 + specifier: ~2.5.1 + version: 2.5.1 '@oclif/command': specifier: ^1.8.16 version: 1.8.18(@oclif/config@1.18.5)(supports-color@8.1.1) @@ -4657,8 +4657,8 @@ packages: acorn-walk: 8.2.0 dev: false - /@n8n_io/license-sdk@2.6.0: - resolution: {integrity: sha512-jPUn8xKAZMWgFw8w6BwqbdlZ1Et4tZcPUdOfEzxpWxEmgtCEAdbl3V0ygP3pTXyWY0hblvv8QzbHOUrK25hQSA==} + /@n8n_io/license-sdk@2.5.1: + resolution: {integrity: sha512-CL4JVJS8nvI8qPFQ1jSG7CiPnNkeKJSgbDxWOLVX4MRjTKrwL8Cpd1LeYMx5g5StmHzkoxz2TDqL8WT6qyMlrQ==} engines: {node: '>=14.0.0', npm: '>=7.10.0'} dependencies: crypto-js: 4.1.1