From d0fc9dee0e17211c1ed130b19286e9573c9ebfbd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Iv=C3=A1n=20Ovejero?= Date: Tue, 20 Aug 2024 12:32:31 +0200 Subject: [PATCH] feat(core): Support bidirectional communication between specific mains and specific workers (#10377) --- packages/cli/src/commands/start.ts | 5 +- .../__tests__/orchestration.service.test.ts | 4 +- .../orchestration.handler.base.service.ts | 9 +++- .../main/handleWorkerResponseMessageMain.ts | 51 ++++++++++++------- .../orchestration.handler.main.service.ts | 5 +- .../src/services/orchestration/main/types.ts | 6 +++ .../services/redis/RedisServiceCommands.ts | 2 +- 7 files changed, 57 insertions(+), 25 deletions(-) create mode 100644 packages/cli/src/services/orchestration/main/types.ts diff --git a/packages/cli/src/commands/start.ts b/packages/cli/src/commands/start.ts index cc4555dc5e65b..16a227e52694a 100644 --- a/packages/cli/src/commands/start.ts +++ b/packages/cli/src/commands/start.ts @@ -239,7 +239,10 @@ export class Start extends BaseCommand { await orchestrationService.init(); - await Container.get(OrchestrationHandlerMainService).init(); + await Container.get(OrchestrationHandlerMainService).initWithOptions({ + queueModeId: this.queueModeId, + redisPublisher: Container.get(OrchestrationService).redisPublisher, + }); if (!orchestrationService.isMultiMainSetupEnabled) return; diff --git a/packages/cli/src/services/__tests__/orchestration.service.test.ts b/packages/cli/src/services/__tests__/orchestration.service.test.ts index c69e674613e25..ed4883768ce08 100644 --- a/packages/cli/src/services/__tests__/orchestration.service.test.ts +++ b/packages/cli/src/services/__tests__/orchestration.service.test.ts @@ -19,6 +19,7 @@ import { Push } from '@/push'; import { ActiveWorkflowManager } from '@/ActiveWorkflowManager'; import { mockInstance } from '@test/mocking'; import { RedisClientService } from '@/services/redis/redis-client.service'; +import type { MainResponseReceivedHandlerOptions } from '../orchestration/main/types'; const instanceSettings = Container.get(InstanceSettings); const redisClientService = mockInstance(RedisClientService); @@ -96,8 +97,9 @@ describe('Orchestration Service', () => { test('should handle worker responses', async () => { const response = await handleWorkerResponseMessageMain( JSON.stringify(workerRestartEventBusResponse), + mock(), ); - expect(response.command).toEqual('restartEventBus'); + expect(response?.command).toEqual('restartEventBus'); }); test('should handle command messages from others', async () => { diff --git a/packages/cli/src/services/orchestration.handler.base.service.ts b/packages/cli/src/services/orchestration.handler.base.service.ts index 6a706434e43bb..b2507151ce410 100644 --- a/packages/cli/src/services/orchestration.handler.base.service.ts +++ b/packages/cli/src/services/orchestration.handler.base.service.ts @@ -2,6 +2,7 @@ import Container from 'typedi'; import { RedisService } from './redis.service'; import type { RedisServicePubSubSubscriber } from './redis/RedisServicePubSubSubscriber'; import type { WorkerCommandReceivedHandlerOptions } from './orchestration/worker/types'; +import type { MainResponseReceivedHandlerOptions } from './orchestration/main/types'; export abstract class OrchestrationHandlerService { protected initialized = false; @@ -19,7 +20,9 @@ export abstract class OrchestrationHandlerService { this.initialized = true; } - async initWithOptions(options: WorkerCommandReceivedHandlerOptions) { + async initWithOptions( + options: WorkerCommandReceivedHandlerOptions | MainResponseReceivedHandlerOptions, + ) { await this.initSubscriber(options); this.initialized = true; } @@ -29,5 +32,7 @@ export abstract class OrchestrationHandlerService { this.initialized = false; } - protected abstract initSubscriber(options?: WorkerCommandReceivedHandlerOptions): Promise; + protected abstract initSubscriber( + options?: WorkerCommandReceivedHandlerOptions | MainResponseReceivedHandlerOptions, + ): Promise; } diff --git a/packages/cli/src/services/orchestration/main/handleWorkerResponseMessageMain.ts b/packages/cli/src/services/orchestration/main/handleWorkerResponseMessageMain.ts index 47a1a080192f6..4e60d53741333 100644 --- a/packages/cli/src/services/orchestration/main/handleWorkerResponseMessageMain.ts +++ b/packages/cli/src/services/orchestration/main/handleWorkerResponseMessageMain.ts @@ -3,25 +3,40 @@ import Container from 'typedi'; import { Logger } from '@/Logger'; import { Push } from '../../../push'; import type { RedisServiceWorkerResponseObject } from '../../redis/RedisServiceCommands'; +import { WORKER_RESPONSE_REDIS_CHANNEL } from '@/services/redis/RedisConstants'; +import type { MainResponseReceivedHandlerOptions } from './types'; -export async function handleWorkerResponseMessageMain(messageString: string) { - const workerResponse = jsonParse(messageString); - if (workerResponse) { - switch (workerResponse.command) { - case 'getStatus': - const push = Container.get(Push); - push.broadcast('sendWorkerStatusMessage', { - workerId: workerResponse.workerId, - status: workerResponse.payload, - }); - break; - case 'getId': - break; - default: - Container.get(Logger).debug( - `Received worker response ${workerResponse.command} from ${workerResponse.workerId}`, - ); - } +export async function handleWorkerResponseMessageMain( + messageString: string, + options: MainResponseReceivedHandlerOptions, +) { + const workerResponse = jsonParse(messageString, { + fallbackValue: null, + }); + + if (!workerResponse) { + Container.get(Logger).debug( + `Received invalid message via channel ${WORKER_RESPONSE_REDIS_CHANNEL}: "${messageString}"`, + ); + return; } + + if (workerResponse.targets && !workerResponse.targets.includes(options.queueModeId)) return; + + switch (workerResponse.command) { + case 'getStatus': + Container.get(Push).broadcast('sendWorkerStatusMessage', { + workerId: workerResponse.workerId, + status: workerResponse.payload, + }); + break; + case 'getId': + break; + default: + Container.get(Logger).debug( + `Received worker response ${workerResponse.command} from ${workerResponse.workerId}`, + ); + } + return workerResponse; } diff --git a/packages/cli/src/services/orchestration/main/orchestration.handler.main.service.ts b/packages/cli/src/services/orchestration/main/orchestration.handler.main.service.ts index 6cc86c9f51eec..983c39fd3333d 100644 --- a/packages/cli/src/services/orchestration/main/orchestration.handler.main.service.ts +++ b/packages/cli/src/services/orchestration/main/orchestration.handler.main.service.ts @@ -3,10 +3,11 @@ import { COMMAND_REDIS_CHANNEL, WORKER_RESPONSE_REDIS_CHANNEL } from '../../redi import { handleWorkerResponseMessageMain } from './handleWorkerResponseMessageMain'; import { handleCommandMessageMain } from './handleCommandMessageMain'; import { OrchestrationHandlerService } from '../../orchestration.handler.base.service'; +import type { MainResponseReceivedHandlerOptions } from './types'; @Service() export class OrchestrationHandlerMainService extends OrchestrationHandlerService { - async initSubscriber() { + async initSubscriber(options: MainResponseReceivedHandlerOptions) { this.redisSubscriber = await this.redisService.getPubSubSubscriber(); await this.redisSubscriber.subscribeToCommandChannel(); @@ -16,7 +17,7 @@ export class OrchestrationHandlerMainService extends OrchestrationHandlerService 'OrchestrationMessageReceiver', async (channel: string, messageString: string) => { if (channel === WORKER_RESPONSE_REDIS_CHANNEL) { - await handleWorkerResponseMessageMain(messageString); + await handleWorkerResponseMessageMain(messageString, options); } else if (channel === COMMAND_REDIS_CHANNEL) { await handleCommandMessageMain(messageString); } diff --git a/packages/cli/src/services/orchestration/main/types.ts b/packages/cli/src/services/orchestration/main/types.ts new file mode 100644 index 0000000000000..d189d7cdf7b13 --- /dev/null +++ b/packages/cli/src/services/orchestration/main/types.ts @@ -0,0 +1,6 @@ +import type { RedisServicePubSubPublisher } from '@/services/redis/RedisServicePubSubPublisher'; + +export type MainResponseReceivedHandlerOptions = { + queueModeId: string; + redisPublisher: RedisServicePubSubPublisher; +}; diff --git a/packages/cli/src/services/redis/RedisServiceCommands.ts b/packages/cli/src/services/redis/RedisServiceCommands.ts index a8ae41c11390b..b7786adad31ef 100644 --- a/packages/cli/src/services/redis/RedisServiceCommands.ts +++ b/packages/cli/src/services/redis/RedisServiceCommands.ts @@ -94,7 +94,7 @@ export type RedisServiceWorkerResponseObject = { workflowId: string; }; } -); +) & { targets?: string[] }; export type RedisServiceCommandObject = { targets?: string[];