Skip to content

Commit

Permalink
feat(core): Add secrets provider reload and refactor (#7277)
Browse files Browse the repository at this point in the history
This PR adds a message for queue mode which triggers an external secrets
provider reload inside the workers if the configuration has changed on
the main instance.

It also refactors some of the message handler code to remove cyclic
dependencies, as well as remove unnecessary duplicate redis clients
inside services (thanks to no more cyclic deps)
  • Loading branch information
flipswitchingmonkey authored Sep 28, 2023
1 parent a80abad commit 53a7502
Show file tree
Hide file tree
Showing 10 changed files with 190 additions and 86 deletions.
2 changes: 2 additions & 0 deletions packages/cli/src/AbstractServer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import { WaitingWebhooks } from '@/WaitingWebhooks';
import { webhookRequestHandler } from '@/WebhookHelpers';
import { generateHostInstanceId } from './databases/utils/generators';
import { OrchestrationService } from './services/orchestration.service';
import { OrchestrationHandlerService } from './services/orchestration.handler.service';

export abstract class AbstractServer {
protected server: Server;
Expand Down Expand Up @@ -118,6 +119,7 @@ export abstract class AbstractServer {
if (config.getEnv('executions.mode') === 'queue') {
// will start the redis connections
await Container.get(OrchestrationService).init();
await Container.get(OrchestrationHandlerService).init();
}
}

Expand Down
19 changes: 19 additions & 0 deletions packages/cli/src/ExternalSecrets/ExternalSecretsManager.ee.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import {
import { License } from '@/License';
import { InternalHooks } from '@/InternalHooks';
import { ExternalSecretsProviders } from './ExternalSecretsProviders.ee';
import { OrchestrationService } from '@/services/orchestration.service';

const logger = getLogger();

Expand Down Expand Up @@ -70,6 +71,21 @@ export class ExternalSecretsManager {
Object.values(this.initRetryTimeouts).forEach((v) => clearTimeout(v));
}

async reloadAllProviders(backoff?: number) {
logger.debug('Reloading all external secrets providers');
const providers = this.getProviderNames();
if (!providers) {
return;
}
for (const provider of providers) {
await this.reloadProvider(provider, backoff);
}
}

async broadcastReloadExternalSecretsProviders() {
await Container.get(OrchestrationService).broadcastReloadExternalSecretsProviders();
}

private async getEncryptionKey(): Promise<string> {
return UserSettings.getEncryptionKey();
}
Expand Down Expand Up @@ -274,6 +290,7 @@ export class ExternalSecretsManager {
await this.saveAndSetSettings(settings, this.settingsRepo);
this.cachedSettings = settings;
await this.reloadProvider(provider);
await this.broadcastReloadExternalSecretsProviders();

void this.trackProviderSave(provider, isNewProvider, userId);
}
Expand All @@ -293,6 +310,7 @@ export class ExternalSecretsManager {
this.cachedSettings = settings;
await this.reloadProvider(provider);
await this.updateSecrets();
await this.broadcastReloadExternalSecretsProviders();
}

private async trackProviderSave(vaultType: string, isNew: boolean, userId?: string) {
Expand Down Expand Up @@ -373,6 +391,7 @@ export class ExternalSecretsManager {
}
try {
await this.providers[provider].update();
await this.broadcastReloadExternalSecretsProviders();
return true;
} catch {
return false;
Expand Down
12 changes: 5 additions & 7 deletions packages/cli/src/commands/BaseCommand.ts
Original file line number Diff line number Diff line change
Expand Up @@ -94,14 +94,12 @@ export abstract class BaseCommand extends Command {
}

protected setInstanceQueueModeId() {
if (config.getEnv('executions.mode') === 'queue') {
if (config.get('redis.queueModeId')) {
this.queueModeId = config.get('redis.queueModeId');
return;
}
this.queueModeId = generateHostInstanceId(this.instanceType);
config.set('redis.queueModeId', this.queueModeId);
if (config.get('redis.queueModeId')) {
this.queueModeId = config.get('redis.queueModeId');
return;
}
this.queueModeId = generateHostInstanceId(this.instanceType);
config.set('redis.queueModeId', this.queueModeId);
}

protected async stopProcess() {
Expand Down
36 changes: 3 additions & 33 deletions packages/cli/src/eventbus/MessageEventBus/MessageEventBus.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,9 @@ import { recoverExecutionDataFromEventLogMessages } from './recoverEvents';
import { METRICS_EVENT_NAME } from '../MessageEventBusDestination/Helpers.ee';
import Container, { Service } from 'typedi';
import { ExecutionRepository, WorkflowRepository } from '@/databases/repositories';
import { RedisService } from '@/services/redis.service';
import type { RedisServicePubSubPublisher } from '@/services/redis/RedisServicePubSubPublisher';
import type { RedisServicePubSubSubscriber } from '@/services/redis/RedisServicePubSubSubscriber';
import { EVENT_BUS_REDIS_CHANNEL } from '@/services/redis/RedisServiceHelper';
import type { AbstractEventMessageOptions } from '../EventMessageClasses/AbstractEventMessageOptions';
import { getEventMessageObjectByType } from '../EventMessageClasses/Helpers';
import { OrchestrationService } from '../../services/orchestration.service';

export type EventMessageReturnMode = 'sent' | 'unsent' | 'all' | 'unfinished';

Expand All @@ -54,10 +51,6 @@ export class MessageEventBus extends EventEmitter {

isInitialized: boolean;

redisPublisher: RedisServicePubSubPublisher;

redisSubscriber: RedisServicePubSubSubscriber;

logWriter: MessageEventBusLogWriter;

destinations: {
Expand Down Expand Up @@ -91,20 +84,6 @@ export class MessageEventBus extends EventEmitter {
return;
}

if (config.getEnv('executions.mode') === 'queue') {
this.redisPublisher = await Container.get(RedisService).getPubSubPublisher();
this.redisSubscriber = await Container.get(RedisService).getPubSubSubscriber();
await this.redisSubscriber.subscribeToEventLog();
this.redisSubscriber.addMessageHandler(
'MessageEventBusMessageReceiver',
async (channel: string, messageString: string) => {
if (channel === EVENT_BUS_REDIS_CHANNEL) {
await this.handleRedisEventBusMessage(messageString);
}
},
);
}

LoggerProxy.debug('Initializing event bus...');

const savedEventDestinations = await Db.collections.EventDestinations.find({});
Expand Down Expand Up @@ -211,7 +190,7 @@ export class MessageEventBus extends EventEmitter {
this.destinations[destination.getId()] = destination;
this.destinations[destination.getId()].startListening();
if (notifyWorkers) {
await this.broadcastRestartEventbusAfterDestinationUpdate();
await Container.get(OrchestrationService).broadcastRestartEventbusAfterDestinationUpdate();
}
return destination;
}
Expand All @@ -237,7 +216,7 @@ export class MessageEventBus extends EventEmitter {
delete this.destinations[id];
}
if (notifyWorkers) {
await this.broadcastRestartEventbusAfterDestinationUpdate();
await Container.get(OrchestrationService).broadcastRestartEventbusAfterDestinationUpdate();
}
return result;
}
Expand All @@ -253,14 +232,6 @@ export class MessageEventBus extends EventEmitter {
return eventData;
}

async broadcastRestartEventbusAfterDestinationUpdate() {
if (config.getEnv('executions.mode') === 'queue') {
await this.redisPublisher.publishToCommandChannel({
command: 'restartEventBus',
});
}
}

private async trySendingUnsent(msgs?: EventMessageTypes[]) {
const unsentMessages = msgs ?? (await this.getEventsUnsent());
if (unsentMessages.length > 0) {
Expand All @@ -281,7 +252,6 @@ export class MessageEventBus extends EventEmitter {
);
await this.destinations[destinationName].close();
}
await this.redisSubscriber?.unSubscribeFromEventLog();
this.isInitialized = false;
LoggerProxy.debug('EventBus shut down.');
}
Expand Down
47 changes: 47 additions & 0 deletions packages/cli/src/services/orchestration.handler.service.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
import Container, { Service } from 'typedi';
import { RedisService } from './redis.service';
import type { RedisServicePubSubSubscriber } from './redis/RedisServicePubSubSubscriber';
import {
COMMAND_REDIS_CHANNEL,
EVENT_BUS_REDIS_CHANNEL,
WORKER_RESPONSE_REDIS_CHANNEL,
} from './redis/RedisServiceHelper';
import { handleWorkerResponseMessage } from './orchestration/handleWorkerResponseMessage';
import { handleCommandMessage } from './orchestration/handleCommandMessage';
import { MessageEventBus } from '../eventbus/MessageEventBus/MessageEventBus';

@Service()
export class OrchestrationHandlerService {
redisSubscriber: RedisServicePubSubSubscriber;

constructor(readonly redisService: RedisService) {}

async init() {
await this.initSubscriber();
}

async shutdown() {
await this.redisSubscriber?.destroy();
}

private async initSubscriber() {
this.redisSubscriber = await this.redisService.getPubSubSubscriber();

await this.redisSubscriber.subscribeToWorkerResponseChannel();
await this.redisSubscriber.subscribeToCommandChannel();
await this.redisSubscriber.subscribeToEventLog();

this.redisSubscriber.addMessageHandler(
'OrchestrationMessageReceiver',
async (channel: string, messageString: string) => {
if (channel === WORKER_RESPONSE_REDIS_CHANNEL) {
await handleWorkerResponseMessage(messageString);
} else if (channel === COMMAND_REDIS_CHANNEL) {
await handleCommandMessage(messageString);
} else if (channel === EVENT_BUS_REDIS_CHANNEL) {
await Container.get(MessageEventBus).handleRedisEventBusMessage(messageString);
}
},
);
}
}
59 changes: 34 additions & 25 deletions packages/cli/src/services/orchestration.service.ts
Original file line number Diff line number Diff line change
@@ -1,55 +1,37 @@
import { Service } from 'typedi';
import { RedisService } from './redis.service';
import type { RedisServicePubSubPublisher } from './redis/RedisServicePubSubPublisher';
import type { RedisServicePubSubSubscriber } from './redis/RedisServicePubSubSubscriber';
import { COMMAND_REDIS_CHANNEL, WORKER_RESPONSE_REDIS_CHANNEL } from './redis/RedisServiceHelper';
import { handleWorkerResponseMessage } from './orchestration/handleWorkerResponseMessage';
import { handleCommandMessage } from './orchestration/handleCommandMessage';
import config from '@/config';

@Service()
export class OrchestrationService {
private initialized = false;

redisPublisher: RedisServicePubSubPublisher;

redisSubscriber: RedisServicePubSubSubscriber;
get isQueueMode() {
return config.getEnv('executions.mode') === 'queue';
}

constructor(readonly redisService: RedisService) {}

async init() {
await this.initPublisher();
await this.initSubscriber();
this.initialized = true;
}

async shutdown() {
await this.redisPublisher?.destroy();
await this.redisSubscriber?.destroy();
}

private async initPublisher() {
this.redisPublisher = await this.redisService.getPubSubPublisher();
}

private async initSubscriber() {
this.redisSubscriber = await this.redisService.getPubSubSubscriber();

await this.redisSubscriber.subscribeToWorkerResponseChannel();
await this.redisSubscriber.subscribeToCommandChannel();

this.redisSubscriber.addMessageHandler(
'OrchestrationMessageReceiver',
async (channel: string, messageString: string) => {
if (channel === WORKER_RESPONSE_REDIS_CHANNEL) {
await handleWorkerResponseMessage(messageString);
} else if (channel === COMMAND_REDIS_CHANNEL) {
await handleCommandMessage(messageString);
}
},
);
}

async getWorkerStatus(id?: string) {
if (!this.isQueueMode) {
return;
}
if (!this.initialized) {
throw new Error('OrchestrationService not initialized');
}
Expand All @@ -60,11 +42,38 @@ export class OrchestrationService {
}

async getWorkerIds() {
if (!this.isQueueMode) {
return;
}
if (!this.initialized) {
throw new Error('OrchestrationService not initialized');
}
await this.redisPublisher.publishToCommandChannel({
command: 'getId',
});
}

async broadcastRestartEventbusAfterDestinationUpdate() {
if (!this.isQueueMode) {
return;
}
if (!this.initialized) {
throw new Error('OrchestrationService not initialized');
}
await this.redisPublisher.publishToCommandChannel({
command: 'restartEventBus',
});
}

async broadcastReloadExternalSecretsProviders() {
if (!this.isQueueMode) {
return;
}
if (!this.initialized) {
throw new Error('OrchestrationService not initialized');
}
await this.redisPublisher.publishToCommandChannel({
command: 'reloadExternalSecretsProviders',
});
}
}
28 changes: 20 additions & 8 deletions packages/cli/src/services/orchestration/handleCommandMessage.ts
Original file line number Diff line number Diff line change
@@ -1,14 +1,23 @@
import { LoggerProxy } from 'n8n-workflow';
import { messageToRedisServiceCommandObject } from './helpers';
import config from '@/config';
import { MessageEventBus } from '../../eventbus/MessageEventBus/MessageEventBus';
import { MessageEventBus } from '@/eventbus/MessageEventBus/MessageEventBus';
import Container from 'typedi';
import { ExternalSecretsManager } from '@/ExternalSecrets/ExternalSecretsManager.ee';
import type { N8nInstanceType } from '@/Interfaces';
import { License } from '@/License';

// this function handles commands sent to the MAIN instance. the workers handle their own commands
export async function handleCommandMessage(messageString: string) {
const queueModeId = config.get('redis.queueModeId');
const instanceType = config.get('generic.instanceType') as N8nInstanceType;
const isMainInstance = instanceType === 'main';
const message = messageToRedisServiceCommandObject(messageString);

if (message) {
LoggerProxy.debug(
`RedisCommandHandler(main): Received command message ${message.command} from ${message.senderId}`,
);
if (
message.senderId === queueModeId ||
(message.targets && !message.targets.includes(queueModeId))
Expand All @@ -21,16 +30,19 @@ export async function handleCommandMessage(messageString: string) {
}
switch (message.command) {
case 'reloadLicense':
// at this point in time, only a single main instance is supported, thus this
// command _should_ never be caught currently (which is why we log a warning)
LoggerProxy.warn(
'Received command to reload license via Redis, but this should not have happened and is not supported on the main instance yet.',
);
// once multiple main instances are supported, this command should be handled
// await Container.get(License).reload();
if (isMainInstance) {
// at this point in time, only a single main instance is supported, thus this command _should_ never be caught currently
LoggerProxy.error(
'Received command to reload license via Redis, but this should not have happened and is not supported on the main instance yet.',
);
return message;
}
await Container.get(License).reload();
break;
case 'restartEventBus':
await Container.get(MessageEventBus).restart();
case 'reloadExternalSecretsProviders':
await Container.get(ExternalSecretsManager).reloadAllProviders();
default:
break;
}
Expand Down
Loading

0 comments on commit 53a7502

Please sign in to comment.