Skip to content

Commit

Permalink
refactor(core): Flatten Redis pubsub class hierarchy (no-changelog) (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
ivov authored and riascho committed Sep 23, 2024
1 parent 518e967 commit 18ac442
Show file tree
Hide file tree
Showing 24 changed files with 392 additions and 335 deletions.
2 changes: 1 addition & 1 deletion packages/cli/src/__tests__/wait-tracker.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ jest.useFakeTimers();
describe('WaitTracker', () => {
const executionRepository = mock<ExecutionRepository>();
const multiMainSetup = mock<MultiMainSetup>();
const orchestrationService = new OrchestrationService(mock(), mock(), mock(), multiMainSetup);
const orchestrationService = new OrchestrationService(mock(), mock(), multiMainSetup);

const execution = mock<IExecutionResponse>({
id: '123',
Expand Down
3 changes: 2 additions & 1 deletion packages/cli/src/commands/start.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import { MessageEventBus } from '@/eventbus/message-event-bus/message-event-bus'
import { EventService } from '@/events/event.service';
import { ExecutionService } from '@/executions/execution.service';
import { License } from '@/license';
import { Publisher } from '@/scaling/pubsub/publisher.service';
import { Server } from '@/server';
import { OrchestrationHandlerMainService } from '@/services/orchestration/main/orchestration.handler.main.service';
import { OrchestrationService } from '@/services/orchestration.service';
Expand Down Expand Up @@ -240,7 +241,7 @@ export class Start extends BaseCommand {

await Container.get(OrchestrationHandlerMainService).initWithOptions({
queueModeId: this.queueModeId,
redisPublisher: Container.get(OrchestrationService).redisPublisher,
publisher: Container.get(Publisher),
});

if (!orchestrationService.isMultiMainSetupEnabled) return;
Expand Down
6 changes: 2 additions & 4 deletions packages/cli/src/commands/worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,10 @@ import { EventMessageGeneric } from '@/eventbus/event-message-classes/event-mess
import { MessageEventBus } from '@/eventbus/message-event-bus/message-event-bus';
import { LogStreamingEventRelay } from '@/events/log-streaming-event-relay';
import { JobProcessor } from '@/scaling/job-processor';
import { Publisher } from '@/scaling/pubsub/publisher.service';
import type { ScalingService } from '@/scaling/scaling.service';
import { OrchestrationHandlerWorkerService } from '@/services/orchestration/worker/orchestration.handler.worker.service';
import { OrchestrationWorkerService } from '@/services/orchestration/worker/orchestration.worker.service';
import type { RedisServicePubSubSubscriber } from '@/services/redis/redis-service-pub-sub-subscriber';

import { BaseCommand } from './base-command';

Expand Down Expand Up @@ -40,8 +40,6 @@ export class Worker extends BaseCommand {

jobProcessor: JobProcessor;

redisSubscriber: RedisServicePubSubSubscriber;

override needsCommunityPackages = true;

/**
Expand Down Expand Up @@ -131,7 +129,7 @@ export class Worker extends BaseCommand {
await Container.get(OrchestrationWorkerService).init();
await Container.get(OrchestrationHandlerWorkerService).initWithOptions({
queueModeId: this.queueModeId,
redisPublisher: Container.get(OrchestrationWorkerService).redisPublisher,
publisher: Container.get(Publisher),
getRunningJobIds: () => this.jobProcessor.getRunningJobIds(),
getRunningJobsSummary: () => this.jobProcessor.getRunningJobsSummary(),
});
Expand Down
13 changes: 2 additions & 11 deletions packages/cli/src/license.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@ import {
UNLIMITED_LICENSE_QUOTA,
} from './constants';
import type { BooleanLicenseFeature, NumericLicenseFeature } from './interfaces';
import type { RedisServicePubSubPublisher } from './services/redis/redis-service-pub-sub-publisher';
import { RedisService } from './services/redis.service';

export type FeatureReturnType = Partial<
{
Expand All @@ -31,8 +29,6 @@ export type FeatureReturnType = Partial<
export class License {
private manager: LicenseManager | undefined;

private redisPublisher: RedisServicePubSubPublisher;

private isShuttingDown = false;

constructor(
Expand Down Expand Up @@ -163,13 +159,8 @@ export class License {
}

if (config.getEnv('executions.mode') === 'queue') {
if (!this.redisPublisher) {
this.logger.debug('Initializing Redis publisher for License Service');
this.redisPublisher = await Container.get(RedisService).getPubSubPublisher();
}
await this.redisPublisher.publishToCommandChannel({
command: 'reloadLicense',
});
const { Publisher } = await import('@/scaling/pubsub/publisher.service');
await Container.get(Publisher).publishCommand({ command: 'reloadLicense' });
}

const isS3Selected = config.getEnv('binaryDataManager.mode') === 's3';
Expand Down
75 changes: 75 additions & 0 deletions packages/cli/src/scaling/__tests__/publisher.service.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
import type { Redis as SingleNodeClient } from 'ioredis';
import { mock } from 'jest-mock-extended';

import config from '@/config';
import { generateNanoId } from '@/databases/utils/generators';
import type { RedisClientService } from '@/services/redis/redis-client.service';
import type {
RedisServiceCommandObject,
RedisServiceWorkerResponseObject,
} from '@/services/redis/redis-service-commands';

import { Publisher } from '../pubsub/publisher.service';

describe('Publisher', () => {
let queueModeId: string;

beforeEach(() => {
config.set('executions.mode', 'queue');
queueModeId = generateNanoId();
config.set('redis.queueModeId', queueModeId);
});

const client = mock<SingleNodeClient>();
const redisClientService = mock<RedisClientService>({ createClient: () => client });

describe('constructor', () => {
it('should init Redis client in scaling mode', () => {
const publisher = new Publisher(mock(), redisClientService);

expect(publisher.getClient()).toEqual(client);
});

it('should not init Redis client in regular mode', () => {
config.set('executions.mode', 'regular');
const publisher = new Publisher(mock(), redisClientService);

expect(publisher.getClient()).toBeUndefined();
});
});

describe('shutdown', () => {
it('should disconnect Redis client', () => {
const publisher = new Publisher(mock(), redisClientService);
publisher.shutdown();
expect(client.disconnect).toHaveBeenCalled();
});
});

describe('publishCommand', () => {
it('should publish command into `n8n.commands` pubsub channel', async () => {
const publisher = new Publisher(mock(), redisClientService);
const msg = mock<RedisServiceCommandObject>({ command: 'reloadLicense' });

await publisher.publishCommand(msg);

expect(client.publish).toHaveBeenCalledWith(
'n8n.commands',
JSON.stringify({ ...msg, senderId: queueModeId }),
);
});
});

describe('publishWorkerResponse', () => {
it('should publish worker response into `n8n.worker-response` pubsub channel', async () => {
const publisher = new Publisher(mock(), redisClientService);
const msg = mock<RedisServiceWorkerResponseObject>({
command: 'reloadExternalSecretsProviders',
});

await publisher.publishWorkerResponse(msg);

expect(client.publish).toHaveBeenCalledWith('n8n.worker-response', JSON.stringify(msg));
});
});
});
60 changes: 60 additions & 0 deletions packages/cli/src/scaling/__tests__/subscriber.service.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
import type { Redis as SingleNodeClient } from 'ioredis';
import { mock } from 'jest-mock-extended';

import config from '@/config';
import type { RedisClientService } from '@/services/redis/redis-client.service';

import { Subscriber } from '../pubsub/subscriber.service';

describe('Subscriber', () => {
beforeEach(() => {
config.set('executions.mode', 'queue');
});

const client = mock<SingleNodeClient>();
const redisClientService = mock<RedisClientService>({ createClient: () => client });

describe('constructor', () => {
it('should init Redis client in scaling mode', () => {
const subscriber = new Subscriber(mock(), redisClientService);

expect(subscriber.getClient()).toEqual(client);
});

it('should not init Redis client in regular mode', () => {
config.set('executions.mode', 'regular');
const subscriber = new Subscriber(mock(), redisClientService);

expect(subscriber.getClient()).toBeUndefined();
});
});

describe('shutdown', () => {
it('should disconnect Redis client', () => {
const subscriber = new Subscriber(mock(), redisClientService);
subscriber.shutdown();
expect(client.disconnect).toHaveBeenCalled();
});
});

describe('subscribe', () => {
it('should subscribe to pubsub channel', async () => {
const subscriber = new Subscriber(mock(), redisClientService);

await subscriber.subscribe('n8n.commands');

expect(client.subscribe).toHaveBeenCalledWith('n8n.commands', expect.any(Function));
});
});

describe('setHandler', () => {
it('should set handler function', () => {
const subscriber = new Subscriber(mock(), redisClientService);
const handlerFn = jest.fn();

subscriber.addMessageHandler(handlerFn);

expect(client.on).toHaveBeenCalledWith('message', handlerFn);
});
});
});
88 changes: 88 additions & 0 deletions packages/cli/src/scaling/pubsub/publisher.service.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
import type { Redis as SingleNodeClient, Cluster as MultiNodeClient } from 'ioredis';
import { Service } from 'typedi';

import config from '@/config';
import { Logger } from '@/logger';
import { RedisClientService } from '@/services/redis/redis-client.service';
import type {
RedisServiceCommandObject,
RedisServiceWorkerResponseObject,
} from '@/services/redis/redis-service-commands';

/**
* Responsible for publishing messages into the pubsub channels used by scaling mode.
*/
@Service()
export class Publisher {
private readonly client: SingleNodeClient | MultiNodeClient;

// #region Lifecycle

constructor(
private readonly logger: Logger,
private readonly redisClientService: RedisClientService,
) {
// @TODO: Once this class is only ever initialized in scaling mode, throw in the next line instead.
if (config.getEnv('executions.mode') !== 'queue') return;

this.client = this.redisClientService.createClient({ type: 'publisher(n8n)' });

this.client.on('error', (error) => this.logger.error(error.message));
}

getClient() {
return this.client;
}

// @TODO: Use `@OnShutdown()` decorator
shutdown() {
this.client.disconnect();
}

// #endregion

// #region Publishing

/** Publish a command into the `n8n.commands` channel. */
async publishCommand(msg: Omit<RedisServiceCommandObject, 'senderId'>) {
await this.client.publish(
'n8n.commands',
JSON.stringify({ ...msg, senderId: config.getEnv('redis.queueModeId') }),
);

this.logger.debug(`Published ${msg.command} to command channel`);
}

/** Publish a response for a command into the `n8n.worker-response` channel. */
async publishWorkerResponse(msg: RedisServiceWorkerResponseObject) {
await this.client.publish('n8n.worker-response', JSON.stringify(msg));

this.logger.debug(`Published response for ${msg.command} to worker response channel`);
}

// #endregion

// #region Utils for multi-main setup

// @TODO: The following methods are not pubsub-specific. Consider a dedicated client for multi-main setup.

async setIfNotExists(key: string, value: string) {
const success = await this.client.setnx(key, value);

return !!success;
}

async setExpiration(key: string, ttl: number) {
await this.client.expire(key, ttl);
}

async get(key: string) {
return await this.client.get(key);
}

async clear(key: string) {
await this.client?.del(key);
}

// #endregion
}
14 changes: 14 additions & 0 deletions packages/cli/src/scaling/pubsub/pubsub.types.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
import type {
COMMAND_REDIS_CHANNEL,
WORKER_RESPONSE_REDIS_CHANNEL,
} from '@/services/redis/redis-constants';

/**
* Pubsub channel used by scaling mode:
*
* - `n8n.commands` for messages sent by a main process to command workers or other main processes
* - `n8n.worker-response` for messages sent by workers in response to commands from main processes
*/
export type ScalingPubSubChannel =
| typeof COMMAND_REDIS_CHANNEL
| typeof WORKER_RESPONSE_REDIS_CHANNEL;
60 changes: 60 additions & 0 deletions packages/cli/src/scaling/pubsub/subscriber.service.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
import type { Redis as SingleNodeClient, Cluster as MultiNodeClient } from 'ioredis';
import { Service } from 'typedi';

import config from '@/config';
import { Logger } from '@/logger';
import { RedisClientService } from '@/services/redis/redis-client.service';

import type { ScalingPubSubChannel } from './pubsub.types';

/**
* Responsible for subscribing to the pubsub channels used by scaling mode.
*/
@Service()
export class Subscriber {
private readonly client: SingleNodeClient | MultiNodeClient;

// #region Lifecycle

constructor(
private readonly logger: Logger,
private readonly redisClientService: RedisClientService,
) {
// @TODO: Once this class is only ever initialized in scaling mode, throw in the next line instead.
if (config.getEnv('executions.mode') !== 'queue') return;

this.client = this.redisClientService.createClient({ type: 'subscriber(n8n)' });

this.client.on('error', (error) => this.logger.error(error.message));
}

getClient() {
return this.client;
}

// @TODO: Use `@OnShutdown()` decorator
shutdown() {
this.client.disconnect();
}

// #endregion

// #region Subscribing

async subscribe(channel: ScalingPubSubChannel) {
await this.client.subscribe(channel, (error) => {
if (error) {
this.logger.error('Failed to subscribe to channel', { channel, cause: error });
return;
}

this.logger.debug('Subscribed to channel', { channel });
});
}

addMessageHandler(handlerFn: (channel: string, msg: string) => void) {
this.client.on('message', handlerFn);
}

// #endregion
}
Loading

0 comments on commit 18ac442

Please sign in to comment.