Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(core): Flatten Redis pubsub class hierarchy (no-changelog) #10616

Merged
merged 22 commits into from
Sep 17, 2024
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion packages/cli/src/__tests__/wait-tracker.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ jest.useFakeTimers();
describe('WaitTracker', () => {
const executionRepository = mock<ExecutionRepository>();
const multiMainSetup = mock<MultiMainSetup>();
const orchestrationService = new OrchestrationService(mock(), mock(), mock(), multiMainSetup);
const orchestrationService = new OrchestrationService(mock(), mock(), multiMainSetup);

const execution = mock<IExecutionResponse>({
id: '123',
Expand Down
3 changes: 2 additions & 1 deletion packages/cli/src/commands/start.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import { MessageEventBus } from '@/eventbus/message-event-bus/message-event-bus'
import { EventService } from '@/events/event.service';
import { ExecutionService } from '@/executions/execution.service';
import { License } from '@/license';
import { Publisher } from '@/scaling/pubsub/publisher.service';
import { Server } from '@/server';
import { OrchestrationHandlerMainService } from '@/services/orchestration/main/orchestration.handler.main.service';
import { OrchestrationService } from '@/services/orchestration.service';
Expand Down Expand Up @@ -241,7 +242,7 @@ export class Start extends BaseCommand {

await Container.get(OrchestrationHandlerMainService).initWithOptions({
queueModeId: this.queueModeId,
redisPublisher: Container.get(OrchestrationService).redisPublisher,
publisher: Container.get(Publisher),
});

if (!orchestrationService.isMultiMainSetupEnabled) return;
Expand Down
6 changes: 2 additions & 4 deletions packages/cli/src/commands/worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,10 @@ import type { ICredentialsOverwrite } from '@/interfaces';
import { rawBodyReader, bodyParser } from '@/middlewares';
import * as ResponseHelper from '@/response-helper';
import { JobProcessor } from '@/scaling/job-processor';
import { Publisher } from '@/scaling/pubsub/publisher.service';
import type { ScalingService } from '@/scaling/scaling.service';
import { OrchestrationHandlerWorkerService } from '@/services/orchestration/worker/orchestration.handler.worker.service';
import { OrchestrationWorkerService } from '@/services/orchestration/worker/orchestration.worker.service';
import type { RedisServicePubSubSubscriber } from '@/services/redis/redis-service-pub-sub-subscriber';

import { BaseCommand } from './base-command';

Expand Down Expand Up @@ -48,8 +48,6 @@ export class Worker extends BaseCommand {

jobProcessor: JobProcessor;

redisSubscriber: RedisServicePubSubSubscriber;

override needsCommunityPackages = true;

/**
Expand Down Expand Up @@ -140,7 +138,7 @@ export class Worker extends BaseCommand {
await Container.get(OrchestrationWorkerService).init();
await Container.get(OrchestrationHandlerWorkerService).initWithOptions({
queueModeId: this.queueModeId,
redisPublisher: Container.get(OrchestrationWorkerService).redisPublisher,
publisher: Container.get(Publisher),
getRunningJobIds: () => this.jobProcessor.getRunningJobIds(),
getRunningJobsSummary: () => this.jobProcessor.getRunningJobsSummary(),
});
Expand Down
13 changes: 2 additions & 11 deletions packages/cli/src/license.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@ import {
UNLIMITED_LICENSE_QUOTA,
} from './constants';
import type { BooleanLicenseFeature, N8nInstanceType, NumericLicenseFeature } from './interfaces';
import type { RedisServicePubSubPublisher } from './services/redis/redis-service-pub-sub-publisher';
import { RedisService } from './services/redis.service';

type FeatureReturnType = Partial<
{
Expand All @@ -31,8 +29,6 @@ type FeatureReturnType = Partial<
export class License {
private manager: LicenseManager | undefined;

private redisPublisher: RedisServicePubSubPublisher;

private isShuttingDown = false;

constructor(
Expand Down Expand Up @@ -162,13 +158,8 @@ export class License {
}

if (config.getEnv('executions.mode') === 'queue') {
if (!this.redisPublisher) {
this.logger.debug('Initializing Redis publisher for License Service');
this.redisPublisher = await Container.get(RedisService).getPubSubPublisher();
}
await this.redisPublisher.publishToCommandChannel({
command: 'reloadLicense',
});
const { Publisher } = await import('@/scaling/pubsub/publisher.service');
await Container.get(Publisher).sendCommand({ command: 'reloadLicense' });
}

const isS3Selected = config.getEnv('binaryDataManager.mode') === 's3';
Expand Down
73 changes: 73 additions & 0 deletions packages/cli/src/scaling/__tests__/publisher.service.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
import type { Redis as SingleNodeClient } from 'ioredis';
import { mock } from 'jest-mock-extended';
import { v4 as uuid } from 'uuid';

import config from '@/config';
import type { RedisClientService } from '@/services/redis/redis-client.service';
import type {
RedisServiceCommandObject,
RedisServiceWorkerResponseObject,
} from '@/services/redis/redis-service-commands';

import { Publisher } from '../pubsub/publisher.service';

describe('Publisher', () => {
config.set('executions.mode', 'queue');
const queueModeId = uuid();
ivov marked this conversation as resolved.
Show resolved Hide resolved
config.set('redis.queueModeId', queueModeId);

const client = mock<SingleNodeClient>();
const redisClientService = mock<RedisClientService>({ createClient: () => client });

describe('constructor', () => {
it('should init Redis client in scaling mode', () => {
const publisher = new Publisher(mock(), redisClientService);

expect(publisher.getClient()).toEqual(client);
});

it('should not init Redis client in regular mode', () => {
config.set('executions.mode', 'regular');
const publisher = new Publisher(mock(), redisClientService);

expect(publisher.getClient()).toBeUndefined();

config.set('executions.mode', 'queue');
ivov marked this conversation as resolved.
Show resolved Hide resolved
});
});

describe('shutdown', () => {
it('should disconnect Redis client', () => {
const publisher = new Publisher(mock(), redisClientService);
publisher.shutdown();
expect(client.disconnect).toHaveBeenCalled();
});
});

describe('sendCommand', () => {
it('should send command to `n8n.commands` pubsub channel', async () => {
const publisher = new Publisher(mock(), redisClientService);
const msg = mock<RedisServiceCommandObject>({ command: 'reloadLicense' });

await publisher.sendCommand(msg);

expect(client.publish).toHaveBeenCalledWith(
'n8n.commands',
JSON.stringify({ ...msg, senderId: queueModeId }),
);
});
});

describe('sendResponse', () => {
it('should send response to `n8n.worker-response` pubsub channel', async () => {
const publisher = new Publisher(mock(), redisClientService);
const msg = mock<RedisServiceWorkerResponseObject>({
command: 'reloadExternalSecretsProviders',
});

await publisher.sendResponse(msg);

expect(client.publish).toHaveBeenCalledWith('n8n.worker-response', JSON.stringify(msg));
});
});
});
60 changes: 60 additions & 0 deletions packages/cli/src/scaling/__tests__/subscriber.service.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
import type { Redis as SingleNodeClient } from 'ioredis';
import { mock } from 'jest-mock-extended';

import config from '@/config';
import type { RedisClientService } from '@/services/redis/redis-client.service';

import { Subscriber } from '../pubsub/subscriber.service';

describe('Subscriber', () => {
config.set('executions.mode', 'queue');

const client = mock<SingleNodeClient>();
const redisClientService = mock<RedisClientService>({ createClient: () => client });

describe('constructor', () => {
it('should init Redis client in scaling mode', () => {
const subscriber = new Subscriber(mock(), redisClientService);

expect(subscriber.getClient()).toEqual(client);
});

it('should not init Redis client in regular mode', () => {
config.set('executions.mode', 'regular');
const subscriber = new Subscriber(mock(), redisClientService);

expect(subscriber.getClient()).toBeUndefined();

config.set('executions.mode', 'queue');
ivov marked this conversation as resolved.
Show resolved Hide resolved
});
});

describe('shutdown', () => {
it('should disconnect Redis client', () => {
const subscriber = new Subscriber(mock(), redisClientService);
subscriber.shutdown();
expect(client.disconnect).toHaveBeenCalled();
});
});

describe('subscribe', () => {
it('should subscribe to pubsub channel', async () => {
const subscriber = new Subscriber(mock(), redisClientService);

await subscriber.subscribe('n8n.commands');

expect(client.subscribe).toHaveBeenCalledWith('n8n.commands', expect.any(Function));
});
});

describe('setHandler', () => {
it('should set handler function', () => {
const subscriber = new Subscriber(mock(), redisClientService);
const handlerFn = jest.fn();

subscriber.setHandler(handlerFn);

expect(client.on).toHaveBeenCalledWith('message', handlerFn);
});
});
});
88 changes: 88 additions & 0 deletions packages/cli/src/scaling/pubsub/publisher.service.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
import type { Redis as SingleNodeClient, Cluster as MultiNodeClient } from 'ioredis';
import { Service } from 'typedi';

import config from '@/config';
import { Logger } from '@/logger';
import { RedisClientService } from '@/services/redis/redis-client.service';
import type {
RedisServiceCommandObject,
RedisServiceWorkerResponseObject,
} from '@/services/redis/redis-service-commands';

/**
* Responsible for publishing messages into the pubsub channels used by scaling mode.
*/
@Service()
export class Publisher {
private readonly client: SingleNodeClient | MultiNodeClient;

// #region Lifecycle

constructor(
private readonly logger: Logger,
private readonly redisClientService: RedisClientService,
) {
// @TODO: Once this class is only ever initialized in scaling mode, throw in the next line instead.
if (config.getEnv('executions.mode') !== 'queue') return;

this.client = this.redisClientService.createClient({ type: 'publisher(n8n)' });

this.client.on('error', (error) => this.logger.error(error.message));
}

getClient() {
return this.client;
}
Comment on lines +33 to +35
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to expose the client publicly?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For testing only, it's an open discussion on what's best practice for this.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah I keep wavering between the both options. On one hand it's nice to test certain things that require private parts, but then again tests should test the public interface or it's too tied to implementation details 🤷 No need to change this now


// @TODO: Use `@OnShutdown()` decorator
shutdown() {
this.client.disconnect();
}

// #endregion

// #region Publishing

/** Send a command into the `n8n.commands` channel. */
async sendCommand(msg: Omit<RedisServiceCommandObject, 'senderId'>) {
ivov marked this conversation as resolved.
Show resolved Hide resolved
await this.client.publish(
'n8n.commands',
JSON.stringify({ ...msg, senderId: config.getEnv('redis.queueModeId') }),
);

this.logger.debug(`Published ${msg.command} to command channel`);
}

/** Send a response for a command into the `n8n.worker-response` channel. */
async sendResponse(msg: RedisServiceWorkerResponseObject) {
ivov marked this conversation as resolved.
Show resolved Hide resolved
await this.client.publish('n8n.worker-response', JSON.stringify(msg));
ivov marked this conversation as resolved.
Show resolved Hide resolved

this.logger.debug(`Published response for ${msg.command} to worker response channel`);
}

// #endregion

// #region Utils for multi-main setup

// @TODO: The following methods are not pubsub-specific. Consider a dedicated client for multi-main setup.
ivov marked this conversation as resolved.
Show resolved Hide resolved

async setIfNotExists(key: string, value: string) {
const success = await this.client.setnx(key, value);

return !!success;
}

async setExpiration(key: string, ttl: number) {
await this.client.expire(key, ttl);
}

async get(key: string) {
return await this.client.get(key);
}

async clear(key: string) {
await this.client?.del(key);
}

// #endregion
}
7 changes: 7 additions & 0 deletions packages/cli/src/scaling/pubsub/pubsub.types.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
/**
* Pubsub channel used by scaling mode:
*
* - `n8n.commands` for messages sent by a main process to command workers or other main processes
* - `n8n.worker-response` for messages sent by workers in response to commands from main processes
*/
export type PubSubChannel = 'n8n.commands' | 'n8n.worker-response';
ivov marked this conversation as resolved.
Show resolved Hide resolved
60 changes: 60 additions & 0 deletions packages/cli/src/scaling/pubsub/subscriber.service.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
import type { Redis as SingleNodeClient, Cluster as MultiNodeClient } from 'ioredis';
import { Service } from 'typedi';

import config from '@/config';
import { Logger } from '@/logger';
import { RedisClientService } from '@/services/redis/redis-client.service';

import type { PubSubChannel } from './pubsub.types';

/**
* Responsible for subscribing to the pubsub channels used by scaling mode.
*/
@Service()
export class Subscriber {
private readonly client: SingleNodeClient | MultiNodeClient;

// #region Lifecycle

constructor(
private readonly logger: Logger,
private readonly redisClientService: RedisClientService,
) {
// @TODO: Once this class is only ever initialized in scaling mode, throw in the next line instead.
if (config.getEnv('executions.mode') !== 'queue') return;

this.client = this.redisClientService.createClient({ type: 'subscriber(n8n)' });

this.client.on('error', (error) => this.logger.error(error.message));
}

getClient() {
return this.client;
}
Comment on lines +31 to +33
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to expose this?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For testing only, it's an open discussion on what's best practice for this.


// @TODO: Use `@OnShutdown()` decorator
shutdown() {
this.client.disconnect();
}

// #endregion

// #region Subscribing

async subscribe(channel: PubSubChannel) {
await this.client.subscribe(channel, (error) => {
if (error) {
this.logger.error('Failed to subscribe to channel', { channel, cause: error });
return;
}

this.logger.debug('Subscribed to channel', { channel });
});
}

setHandler(handlerFn: (channel: string, msg: string) => void) {
this.client.on('message', handlerFn);
}
ivov marked this conversation as resolved.
Show resolved Hide resolved

// #endregion
}
Loading