From 53a7502d20eb95055e842e0450e9daea308443a1 Mon Sep 17 00:00:00 2001 From: Michael Auerswald Date: Thu, 28 Sep 2023 12:57:35 +0200 Subject: [PATCH] feat(core): Add secrets provider reload and refactor (#7277) This PR adds a message for queue mode which triggers an external secrets provider reload inside the workers if the configuration has changed on the main instance. It also refactors some of the message handler code to remove cyclic dependencies, as well as remove unnecessary duplicate redis clients inside services (thanks to no more cyclic deps) --- packages/cli/src/AbstractServer.ts | 2 + .../ExternalSecretsManager.ee.ts | 19 ++++++ packages/cli/src/commands/BaseCommand.ts | 12 ++-- .../MessageEventBus/MessageEventBus.ts | 36 +---------- .../services/orchestration.handler.service.ts | 47 +++++++++++++++ .../cli/src/services/orchestration.service.ts | 59 +++++++++++-------- .../orchestration/handleCommandMessage.ts | 28 ++++++--- .../services/redis/RedisServiceCommands.ts | 10 +++- .../cli/src/worker/workerCommandHandler.ts | 52 +++++++++++++--- .../services/orchestration.service.test.ts | 11 ++-- 10 files changed, 190 insertions(+), 86 deletions(-) create mode 100644 packages/cli/src/services/orchestration.handler.service.ts diff --git a/packages/cli/src/AbstractServer.ts b/packages/cli/src/AbstractServer.ts index 092b1a95a8675..0170b8a9f9691 100644 --- a/packages/cli/src/AbstractServer.ts +++ b/packages/cli/src/AbstractServer.ts @@ -20,6 +20,7 @@ import { WaitingWebhooks } from '@/WaitingWebhooks'; import { webhookRequestHandler } from '@/WebhookHelpers'; import { generateHostInstanceId } from './databases/utils/generators'; import { OrchestrationService } from './services/orchestration.service'; +import { OrchestrationHandlerService } from './services/orchestration.handler.service'; export abstract class AbstractServer { protected server: Server; @@ -118,6 +119,7 @@ export abstract class AbstractServer { if (config.getEnv('executions.mode') === 'queue') { // will start the redis connections await Container.get(OrchestrationService).init(); + await Container.get(OrchestrationHandlerService).init(); } } diff --git a/packages/cli/src/ExternalSecrets/ExternalSecretsManager.ee.ts b/packages/cli/src/ExternalSecrets/ExternalSecretsManager.ee.ts index 6a0b422377f60..e8b0e00589b92 100644 --- a/packages/cli/src/ExternalSecrets/ExternalSecretsManager.ee.ts +++ b/packages/cli/src/ExternalSecrets/ExternalSecretsManager.ee.ts @@ -20,6 +20,7 @@ import { import { License } from '@/License'; import { InternalHooks } from '@/InternalHooks'; import { ExternalSecretsProviders } from './ExternalSecretsProviders.ee'; +import { OrchestrationService } from '@/services/orchestration.service'; const logger = getLogger(); @@ -70,6 +71,21 @@ export class ExternalSecretsManager { Object.values(this.initRetryTimeouts).forEach((v) => clearTimeout(v)); } + async reloadAllProviders(backoff?: number) { + logger.debug('Reloading all external secrets providers'); + const providers = this.getProviderNames(); + if (!providers) { + return; + } + for (const provider of providers) { + await this.reloadProvider(provider, backoff); + } + } + + async broadcastReloadExternalSecretsProviders() { + await Container.get(OrchestrationService).broadcastReloadExternalSecretsProviders(); + } + private async getEncryptionKey(): Promise { return UserSettings.getEncryptionKey(); } @@ -274,6 +290,7 @@ export class ExternalSecretsManager { await this.saveAndSetSettings(settings, this.settingsRepo); this.cachedSettings = settings; await this.reloadProvider(provider); + await this.broadcastReloadExternalSecretsProviders(); void this.trackProviderSave(provider, isNewProvider, userId); } @@ -293,6 +310,7 @@ export class ExternalSecretsManager { this.cachedSettings = settings; await this.reloadProvider(provider); await this.updateSecrets(); + await this.broadcastReloadExternalSecretsProviders(); } private async trackProviderSave(vaultType: string, isNew: boolean, userId?: string) { @@ -373,6 +391,7 @@ export class ExternalSecretsManager { } try { await this.providers[provider].update(); + await this.broadcastReloadExternalSecretsProviders(); return true; } catch { return false; diff --git a/packages/cli/src/commands/BaseCommand.ts b/packages/cli/src/commands/BaseCommand.ts index 75446f511f25e..764ee0c31fa99 100644 --- a/packages/cli/src/commands/BaseCommand.ts +++ b/packages/cli/src/commands/BaseCommand.ts @@ -94,14 +94,12 @@ export abstract class BaseCommand extends Command { } protected setInstanceQueueModeId() { - if (config.getEnv('executions.mode') === 'queue') { - if (config.get('redis.queueModeId')) { - this.queueModeId = config.get('redis.queueModeId'); - return; - } - this.queueModeId = generateHostInstanceId(this.instanceType); - config.set('redis.queueModeId', this.queueModeId); + if (config.get('redis.queueModeId')) { + this.queueModeId = config.get('redis.queueModeId'); + return; } + this.queueModeId = generateHostInstanceId(this.instanceType); + config.set('redis.queueModeId', this.queueModeId); } protected async stopProcess() { diff --git a/packages/cli/src/eventbus/MessageEventBus/MessageEventBus.ts b/packages/cli/src/eventbus/MessageEventBus/MessageEventBus.ts index 7d674379dca0f..05c0f7fee379f 100644 --- a/packages/cli/src/eventbus/MessageEventBus/MessageEventBus.ts +++ b/packages/cli/src/eventbus/MessageEventBus/MessageEventBus.ts @@ -29,12 +29,9 @@ import { recoverExecutionDataFromEventLogMessages } from './recoverEvents'; import { METRICS_EVENT_NAME } from '../MessageEventBusDestination/Helpers.ee'; import Container, { Service } from 'typedi'; import { ExecutionRepository, WorkflowRepository } from '@/databases/repositories'; -import { RedisService } from '@/services/redis.service'; -import type { RedisServicePubSubPublisher } from '@/services/redis/RedisServicePubSubPublisher'; -import type { RedisServicePubSubSubscriber } from '@/services/redis/RedisServicePubSubSubscriber'; -import { EVENT_BUS_REDIS_CHANNEL } from '@/services/redis/RedisServiceHelper'; import type { AbstractEventMessageOptions } from '../EventMessageClasses/AbstractEventMessageOptions'; import { getEventMessageObjectByType } from '../EventMessageClasses/Helpers'; +import { OrchestrationService } from '../../services/orchestration.service'; export type EventMessageReturnMode = 'sent' | 'unsent' | 'all' | 'unfinished'; @@ -54,10 +51,6 @@ export class MessageEventBus extends EventEmitter { isInitialized: boolean; - redisPublisher: RedisServicePubSubPublisher; - - redisSubscriber: RedisServicePubSubSubscriber; - logWriter: MessageEventBusLogWriter; destinations: { @@ -91,20 +84,6 @@ export class MessageEventBus extends EventEmitter { return; } - if (config.getEnv('executions.mode') === 'queue') { - this.redisPublisher = await Container.get(RedisService).getPubSubPublisher(); - this.redisSubscriber = await Container.get(RedisService).getPubSubSubscriber(); - await this.redisSubscriber.subscribeToEventLog(); - this.redisSubscriber.addMessageHandler( - 'MessageEventBusMessageReceiver', - async (channel: string, messageString: string) => { - if (channel === EVENT_BUS_REDIS_CHANNEL) { - await this.handleRedisEventBusMessage(messageString); - } - }, - ); - } - LoggerProxy.debug('Initializing event bus...'); const savedEventDestinations = await Db.collections.EventDestinations.find({}); @@ -211,7 +190,7 @@ export class MessageEventBus extends EventEmitter { this.destinations[destination.getId()] = destination; this.destinations[destination.getId()].startListening(); if (notifyWorkers) { - await this.broadcastRestartEventbusAfterDestinationUpdate(); + await Container.get(OrchestrationService).broadcastRestartEventbusAfterDestinationUpdate(); } return destination; } @@ -237,7 +216,7 @@ export class MessageEventBus extends EventEmitter { delete this.destinations[id]; } if (notifyWorkers) { - await this.broadcastRestartEventbusAfterDestinationUpdate(); + await Container.get(OrchestrationService).broadcastRestartEventbusAfterDestinationUpdate(); } return result; } @@ -253,14 +232,6 @@ export class MessageEventBus extends EventEmitter { return eventData; } - async broadcastRestartEventbusAfterDestinationUpdate() { - if (config.getEnv('executions.mode') === 'queue') { - await this.redisPublisher.publishToCommandChannel({ - command: 'restartEventBus', - }); - } - } - private async trySendingUnsent(msgs?: EventMessageTypes[]) { const unsentMessages = msgs ?? (await this.getEventsUnsent()); if (unsentMessages.length > 0) { @@ -281,7 +252,6 @@ export class MessageEventBus extends EventEmitter { ); await this.destinations[destinationName].close(); } - await this.redisSubscriber?.unSubscribeFromEventLog(); this.isInitialized = false; LoggerProxy.debug('EventBus shut down.'); } diff --git a/packages/cli/src/services/orchestration.handler.service.ts b/packages/cli/src/services/orchestration.handler.service.ts new file mode 100644 index 0000000000000..aa64926068909 --- /dev/null +++ b/packages/cli/src/services/orchestration.handler.service.ts @@ -0,0 +1,47 @@ +import Container, { Service } from 'typedi'; +import { RedisService } from './redis.service'; +import type { RedisServicePubSubSubscriber } from './redis/RedisServicePubSubSubscriber'; +import { + COMMAND_REDIS_CHANNEL, + EVENT_BUS_REDIS_CHANNEL, + WORKER_RESPONSE_REDIS_CHANNEL, +} from './redis/RedisServiceHelper'; +import { handleWorkerResponseMessage } from './orchestration/handleWorkerResponseMessage'; +import { handleCommandMessage } from './orchestration/handleCommandMessage'; +import { MessageEventBus } from '../eventbus/MessageEventBus/MessageEventBus'; + +@Service() +export class OrchestrationHandlerService { + redisSubscriber: RedisServicePubSubSubscriber; + + constructor(readonly redisService: RedisService) {} + + async init() { + await this.initSubscriber(); + } + + async shutdown() { + await this.redisSubscriber?.destroy(); + } + + private async initSubscriber() { + this.redisSubscriber = await this.redisService.getPubSubSubscriber(); + + await this.redisSubscriber.subscribeToWorkerResponseChannel(); + await this.redisSubscriber.subscribeToCommandChannel(); + await this.redisSubscriber.subscribeToEventLog(); + + this.redisSubscriber.addMessageHandler( + 'OrchestrationMessageReceiver', + async (channel: string, messageString: string) => { + if (channel === WORKER_RESPONSE_REDIS_CHANNEL) { + await handleWorkerResponseMessage(messageString); + } else if (channel === COMMAND_REDIS_CHANNEL) { + await handleCommandMessage(messageString); + } else if (channel === EVENT_BUS_REDIS_CHANNEL) { + await Container.get(MessageEventBus).handleRedisEventBusMessage(messageString); + } + }, + ); + } +} diff --git a/packages/cli/src/services/orchestration.service.ts b/packages/cli/src/services/orchestration.service.ts index 0cd240235220e..c81874ad9d29b 100644 --- a/packages/cli/src/services/orchestration.service.ts +++ b/packages/cli/src/services/orchestration.service.ts @@ -1,10 +1,7 @@ import { Service } from 'typedi'; import { RedisService } from './redis.service'; import type { RedisServicePubSubPublisher } from './redis/RedisServicePubSubPublisher'; -import type { RedisServicePubSubSubscriber } from './redis/RedisServicePubSubSubscriber'; -import { COMMAND_REDIS_CHANNEL, WORKER_RESPONSE_REDIS_CHANNEL } from './redis/RedisServiceHelper'; -import { handleWorkerResponseMessage } from './orchestration/handleWorkerResponseMessage'; -import { handleCommandMessage } from './orchestration/handleCommandMessage'; +import config from '@/config'; @Service() export class OrchestrationService { @@ -12,44 +9,29 @@ export class OrchestrationService { redisPublisher: RedisServicePubSubPublisher; - redisSubscriber: RedisServicePubSubSubscriber; + get isQueueMode() { + return config.getEnv('executions.mode') === 'queue'; + } constructor(readonly redisService: RedisService) {} async init() { await this.initPublisher(); - await this.initSubscriber(); this.initialized = true; } async shutdown() { await this.redisPublisher?.destroy(); - await this.redisSubscriber?.destroy(); } private async initPublisher() { this.redisPublisher = await this.redisService.getPubSubPublisher(); } - private async initSubscriber() { - this.redisSubscriber = await this.redisService.getPubSubSubscriber(); - - await this.redisSubscriber.subscribeToWorkerResponseChannel(); - await this.redisSubscriber.subscribeToCommandChannel(); - - this.redisSubscriber.addMessageHandler( - 'OrchestrationMessageReceiver', - async (channel: string, messageString: string) => { - if (channel === WORKER_RESPONSE_REDIS_CHANNEL) { - await handleWorkerResponseMessage(messageString); - } else if (channel === COMMAND_REDIS_CHANNEL) { - await handleCommandMessage(messageString); - } - }, - ); - } - async getWorkerStatus(id?: string) { + if (!this.isQueueMode) { + return; + } if (!this.initialized) { throw new Error('OrchestrationService not initialized'); } @@ -60,6 +42,9 @@ export class OrchestrationService { } async getWorkerIds() { + if (!this.isQueueMode) { + return; + } if (!this.initialized) { throw new Error('OrchestrationService not initialized'); } @@ -67,4 +52,28 @@ export class OrchestrationService { command: 'getId', }); } + + async broadcastRestartEventbusAfterDestinationUpdate() { + if (!this.isQueueMode) { + return; + } + if (!this.initialized) { + throw new Error('OrchestrationService not initialized'); + } + await this.redisPublisher.publishToCommandChannel({ + command: 'restartEventBus', + }); + } + + async broadcastReloadExternalSecretsProviders() { + if (!this.isQueueMode) { + return; + } + if (!this.initialized) { + throw new Error('OrchestrationService not initialized'); + } + await this.redisPublisher.publishToCommandChannel({ + command: 'reloadExternalSecretsProviders', + }); + } } diff --git a/packages/cli/src/services/orchestration/handleCommandMessage.ts b/packages/cli/src/services/orchestration/handleCommandMessage.ts index 06e08977e806e..6939555cdcfbf 100644 --- a/packages/cli/src/services/orchestration/handleCommandMessage.ts +++ b/packages/cli/src/services/orchestration/handleCommandMessage.ts @@ -1,14 +1,23 @@ import { LoggerProxy } from 'n8n-workflow'; import { messageToRedisServiceCommandObject } from './helpers'; import config from '@/config'; -import { MessageEventBus } from '../../eventbus/MessageEventBus/MessageEventBus'; +import { MessageEventBus } from '@/eventbus/MessageEventBus/MessageEventBus'; import Container from 'typedi'; +import { ExternalSecretsManager } from '@/ExternalSecrets/ExternalSecretsManager.ee'; +import type { N8nInstanceType } from '@/Interfaces'; +import { License } from '@/License'; // this function handles commands sent to the MAIN instance. the workers handle their own commands export async function handleCommandMessage(messageString: string) { const queueModeId = config.get('redis.queueModeId'); + const instanceType = config.get('generic.instanceType') as N8nInstanceType; + const isMainInstance = instanceType === 'main'; const message = messageToRedisServiceCommandObject(messageString); + if (message) { + LoggerProxy.debug( + `RedisCommandHandler(main): Received command message ${message.command} from ${message.senderId}`, + ); if ( message.senderId === queueModeId || (message.targets && !message.targets.includes(queueModeId)) @@ -21,16 +30,19 @@ export async function handleCommandMessage(messageString: string) { } switch (message.command) { case 'reloadLicense': - // at this point in time, only a single main instance is supported, thus this - // command _should_ never be caught currently (which is why we log a warning) - LoggerProxy.warn( - 'Received command to reload license via Redis, but this should not have happened and is not supported on the main instance yet.', - ); - // once multiple main instances are supported, this command should be handled - // await Container.get(License).reload(); + if (isMainInstance) { + // at this point in time, only a single main instance is supported, thus this command _should_ never be caught currently + LoggerProxy.error( + 'Received command to reload license via Redis, but this should not have happened and is not supported on the main instance yet.', + ); + return message; + } + await Container.get(License).reload(); break; case 'restartEventBus': await Container.get(MessageEventBus).restart(); + case 'reloadExternalSecretsProviders': + await Container.get(ExternalSecretsManager).reloadAllProviders(); default: break; } diff --git a/packages/cli/src/services/redis/RedisServiceCommands.ts b/packages/cli/src/services/redis/RedisServiceCommands.ts index 634450b2e7b1d..602b5646f51bd 100644 --- a/packages/cli/src/services/redis/RedisServiceCommands.ts +++ b/packages/cli/src/services/redis/RedisServiceCommands.ts @@ -3,7 +3,8 @@ export type RedisServiceCommand = | 'getId' | 'restartEventBus' | 'stopWorker' - | 'reloadLicense'; + | 'reloadLicense' + | 'reloadExternalSecretsProviders'; /** * An object to be sent via Redis pub/sub from the main process to the workers. @@ -49,6 +50,13 @@ export type RedisServiceWorkerResponseObject = { error?: string; }; } + | { + command: 'reloadExternalSecretsProviders'; + payload: { + result: 'success' | 'error'; + error?: string; + }; + } | { command: 'stopWorker'; } diff --git a/packages/cli/src/worker/workerCommandHandler.ts b/packages/cli/src/worker/workerCommandHandler.ts index 285a222586458..63866fda78f26 100644 --- a/packages/cli/src/worker/workerCommandHandler.ts +++ b/packages/cli/src/worker/workerCommandHandler.ts @@ -6,6 +6,7 @@ import * as os from 'os'; import Container from 'typedi'; import { License } from '@/License'; import { MessageEventBus } from '../eventbus/MessageEventBus/MessageEventBus'; +import { ExternalSecretsManager } from '../ExternalSecrets/ExternalSecretsManager.ee'; export function getWorkerCommandReceivedHandler(options: { queueModeId: string; @@ -26,6 +27,9 @@ export function getWorkerCommandReceivedHandler(options: { return; } if (message) { + LoggerProxy.debug( + `RedisCommandHandler(worker): Received command message ${message.command} from ${message.senderId}`, + ); if (message.targets && !message.targets.includes(options.queueModeId)) { return; // early return if the message is not for this worker } @@ -59,14 +63,46 @@ export function getWorkerCommandReceivedHandler(options: { }); break; case 'restartEventBus': - await Container.get(MessageEventBus).restart(); - await options.redisPublisher.publishToWorkerChannel({ - workerId: options.queueModeId, - command: message.command, - payload: { - result: 'success', - }, - }); + try { + await Container.get(MessageEventBus).restart(); + await options.redisPublisher.publishToWorkerChannel({ + workerId: options.queueModeId, + command: message.command, + payload: { + result: 'success', + }, + }); + } catch (error) { + await options.redisPublisher.publishToWorkerChannel({ + workerId: options.queueModeId, + command: message.command, + payload: { + result: 'error', + error: (error as Error).message, + }, + }); + } + break; + case 'reloadExternalSecretsProviders': + try { + await Container.get(ExternalSecretsManager).reloadAllProviders(); + await options.redisPublisher.publishToWorkerChannel({ + workerId: options.queueModeId, + command: message.command, + payload: { + result: 'success', + }, + }); + } catch (error) { + await options.redisPublisher.publishToWorkerChannel({ + workerId: options.queueModeId, + command: message.command, + payload: { + result: 'error', + error: (error as Error).message, + }, + }); + } break; case 'reloadLicense': await Container.get(License).reload(); diff --git a/packages/cli/test/unit/services/orchestration.service.test.ts b/packages/cli/test/unit/services/orchestration.service.test.ts index 9367e4da7bddd..a39c4bd789c25 100644 --- a/packages/cli/test/unit/services/orchestration.service.test.ts +++ b/packages/cli/test/unit/services/orchestration.service.test.ts @@ -9,8 +9,10 @@ import { RedisService } from '@/services/redis.service'; import { mockInstance } from '../../integration/shared/utils'; import { handleWorkerResponseMessage } from '../../../src/services/orchestration/handleWorkerResponseMessage'; import { handleCommandMessage } from '../../../src/services/orchestration/handleCommandMessage'; +import { OrchestrationHandlerService } from '../../../src/services/orchestration.handler.service'; const os = Container.get(OrchestrationService); +const handler = Container.get(OrchestrationHandlerService); let queueModeId: string; @@ -76,8 +78,9 @@ describe('Orchestration Service', () => { test('should initialize', async () => { await os.init(); + await handler.init(); expect(os.redisPublisher).toBeDefined(); - expect(os.redisSubscriber).toBeDefined(); + expect(handler.redisSubscriber).toBeDefined(); expect(queueModeId).toBeDefined(); }); @@ -89,7 +92,7 @@ describe('Orchestration Service', () => { }); test('should handle command messages from others', async () => { - jest.spyOn(LoggerProxy, 'warn'); + jest.spyOn(LoggerProxy, 'error'); const responseFalseId = await handleCommandMessage( JSON.stringify({ senderId: 'test', @@ -99,8 +102,8 @@ describe('Orchestration Service', () => { expect(responseFalseId).toBeDefined(); expect(responseFalseId!.command).toEqual('reloadLicense'); expect(responseFalseId!.senderId).toEqual('test'); - expect(LoggerProxy.warn).toHaveBeenCalled(); - jest.spyOn(LoggerProxy, 'warn').mockRestore(); + expect(LoggerProxy.error).toHaveBeenCalled(); + jest.spyOn(LoggerProxy, 'error').mockRestore(); }); test('should reject command messages from iteslf', async () => {