Skip to content

Commit

Permalink
Better naming
Browse files Browse the repository at this point in the history
  • Loading branch information
ivov committed Sep 17, 2024
1 parent 67df195 commit 9543d88
Show file tree
Hide file tree
Showing 6 changed files with 23 additions and 23 deletions.
2 changes: 1 addition & 1 deletion packages/cli/src/license.ts
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ export class License {

if (config.getEnv('executions.mode') === 'queue') {
const { Publisher } = await import('@/scaling/pubsub/publisher.service');
await Container.get(Publisher).sendCommand({ command: 'reloadLicense' });
await Container.get(Publisher).publishCommand({ command: 'reloadLicense' });
}

const isS3Selected = config.getEnv('binaryDataManager.mode') === 's3';
Expand Down
12 changes: 6 additions & 6 deletions packages/cli/src/scaling/__tests__/publisher.service.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,12 +44,12 @@ describe('Publisher', () => {
});
});

describe('sendCommand', () => {
it('should send command to `n8n.commands` pubsub channel', async () => {
describe('publishCommand', () => {
it('should publish command into `n8n.commands` pubsub channel', async () => {
const publisher = new Publisher(mock(), redisClientService);
const msg = mock<RedisServiceCommandObject>({ command: 'reloadLicense' });

await publisher.sendCommand(msg);
await publisher.publishCommand(msg);

expect(client.publish).toHaveBeenCalledWith(
'n8n.commands',
Expand All @@ -58,14 +58,14 @@ describe('Publisher', () => {
});
});

describe('sendResponse', () => {
it('should send response to `n8n.worker-response` pubsub channel', async () => {
describe('publishResponse', () => {
it('should publish response into `n8n.worker-response` pubsub channel', async () => {
const publisher = new Publisher(mock(), redisClientService);
const msg = mock<RedisServiceWorkerResponseObject>({
command: 'reloadExternalSecretsProviders',
});

await publisher.sendResponse(msg);
await publisher.publishResponse(msg);

expect(client.publish).toHaveBeenCalledWith('n8n.worker-response', JSON.stringify(msg));
});
Expand Down
8 changes: 4 additions & 4 deletions packages/cli/src/scaling/pubsub/publisher.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,8 @@ export class Publisher {

// #region Publishing

/** Send a command into the `n8n.commands` channel. */
async sendCommand(msg: Omit<RedisServiceCommandObject, 'senderId'>) {
/** Publish a command into the `n8n.commands` channel. */
async publishCommand(msg: Omit<RedisServiceCommandObject, 'senderId'>) {
await this.client.publish(
'n8n.commands',
JSON.stringify({ ...msg, senderId: config.getEnv('redis.queueModeId') }),
Expand All @@ -53,8 +53,8 @@ export class Publisher {
this.logger.debug(`Published ${msg.command} to command channel`);
}

/** Send a response for a command into the `n8n.worker-response` channel. */
async sendResponse(msg: RedisServiceWorkerResponseObject) {
/** Publish a response for a command into the `n8n.worker-response` channel. */
async publishResponse(msg: RedisServiceWorkerResponseObject) {
await this.client.publish('n8n.worker-response', JSON.stringify(msg));

this.logger.debug(`Published response for ${msg.command} to worker response channel`);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,12 +105,12 @@ describe('Orchestration Service', () => {

test('should send command messages', async () => {
// @ts-expect-error Private field
jest.spyOn(os.publisher, 'sendCommand').mockImplementation(async () => {});
jest.spyOn(os.publisher, 'publishCommand').mockImplementation(async () => {});
await os.getWorkerIds();
// @ts-expect-error Private field
expect(os.publisher.sendCommand).toHaveBeenCalled();
expect(os.publisher.publishCommand).toHaveBeenCalled();
// @ts-expect-error Private field
jest.spyOn(os.publisher, 'sendCommand').mockRestore();
jest.spyOn(os.publisher, 'publishCommand').mockRestore();
});

test('should prevent receiving commands too often', async () => {
Expand Down
6 changes: 3 additions & 3 deletions packages/cli/src/services/orchestration.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ export class OrchestrationService {

this.logger.debug(`[Instance ID ${this.instanceId}] Publishing command "${command}"`, payload);

await this.publisher.sendCommand({ command, payload });
await this.publisher.publishCommand({ command, payload });
}

// ----------------------------------
Expand All @@ -118,7 +118,7 @@ export class OrchestrationService {

this.logger.debug(`Sending "${command}" to command channel`);

await this.publisher.sendCommand({
await this.publisher.publishCommand({
command,
targets: id ? [id] : undefined,
});
Expand All @@ -131,7 +131,7 @@ export class OrchestrationService {

this.logger.debug(`Sending "${command}" to command channel`);

await this.publisher.sendCommand({ command });
await this.publisher.publishCommand({ command });
}

// ----------------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ export function getWorkerCommandReceivedHandler(options: WorkerCommandReceivedHa
switch (message.command) {
case 'getStatus':
if (!debounceMessageReceiver(message, 500)) return;
await options.publisher.sendResponse({
await options.publisher.publishResponse({
workerId: options.queueModeId,
command: 'getStatus',
payload: {
Expand All @@ -66,7 +66,7 @@ export function getWorkerCommandReceivedHandler(options: WorkerCommandReceivedHa
break;
case 'getId':
if (!debounceMessageReceiver(message, 500)) return;
await options.publisher.sendResponse({
await options.publisher.publishResponse({
workerId: options.queueModeId,
command: 'getId',
});
Expand All @@ -75,15 +75,15 @@ export function getWorkerCommandReceivedHandler(options: WorkerCommandReceivedHa
if (!debounceMessageReceiver(message, 500)) return;
try {
await Container.get(MessageEventBus).restart();
await options.publisher.sendResponse({
await options.publisher.publishResponse({
workerId: options.queueModeId,
command: 'restartEventBus',
payload: {
result: 'success',
},
});
} catch (error) {
await options.publisher.sendResponse({
await options.publisher.publishResponse({
workerId: options.queueModeId,
command: 'restartEventBus',
payload: {
Expand All @@ -97,15 +97,15 @@ export function getWorkerCommandReceivedHandler(options: WorkerCommandReceivedHa
if (!debounceMessageReceiver(message, 500)) return;
try {
await Container.get(ExternalSecretsManager).reloadAllProviders();
await options.publisher.sendResponse({
await options.publisher.publishResponse({
workerId: options.queueModeId,
command: 'reloadExternalSecretsProviders',
payload: {
result: 'success',
},
});
} catch (error) {
await options.publisher.sendResponse({
await options.publisher.publishResponse({
workerId: options.queueModeId,
command: 'reloadExternalSecretsProviders',
payload: {
Expand Down

0 comments on commit 9543d88

Please sign in to comment.