From 7b49cf2a2c750d685af6cff464401f38482dac5a Mon Sep 17 00:00:00 2001 From: Michael Auerswald Date: Thu, 7 Sep 2023 14:44:19 +0200 Subject: [PATCH] feat(core): Add commands to workers to respond with current state (#7029) This PR adds new endpoints to the REST API: `/orchestration/worker/status` and `/orchestration/worker/ids`. Currently these just trigger the return of status / ids from the workers via the Redis back channel; the responses still need to be handled and passed through to the frontend. It also adds the eventbus to each worker, and triggers a reload of those eventbus instances when the configuration changes on the main instance. --- packages/cli/src/AbstractServer.ts | 83 +------ packages/cli/src/Server.ts | 2 + packages/cli/src/commands/BaseCommand.ts | 4 +- packages/cli/src/commands/worker.ts | 222 +++++++++++------- .../cli/src/commands/workerCommandHandler.ts | 82 +++++++ .../controllers/orchestration.controller.ts | 35 +++ .../src/eventbus/EventMessageClasses/index.ts | 10 +- .../MessageEventBus/MessageEventBus.ts | 129 ++++++---- packages/cli/src/requests.ts | 9 + .../cli/src/services/orchestration.service.ts | 172 ++++++++++++++ .../services/redis/RedisServiceCommands.ts | 38 ++- .../redis/RedisServicePubSubSubscriber.ts | 4 +- .../integration/commands/worker.cmd.test.ts | 81 +++++++ .../services/orchestration.service.test.ts | 140 +++++++++++ 14 files changed, 790 insertions(+), 221 deletions(-) create mode 100644 packages/cli/src/commands/workerCommandHandler.ts create mode 100644 packages/cli/src/controllers/orchestration.controller.ts create mode 100644 packages/cli/src/services/orchestration.service.ts create mode 100644 packages/cli/test/integration/commands/worker.cmd.test.ts create mode 100644 packages/cli/test/unit/services/orchestration.service.test.ts diff --git a/packages/cli/src/AbstractServer.ts b/packages/cli/src/AbstractServer.ts index fa321a6724b3d..2e25342bc1fcc 100644 --- a/packages/cli/src/AbstractServer.ts +++ b/packages/cli/src/AbstractServer.ts @@ -4,7 +4,7 @@ import type { Server } from 'http'; import express from 'express'; import compression from 'compression'; import isbot from 'isbot'; -import { jsonParse, LoggerProxy as Logger } from 'n8n-workflow'; +import { LoggerProxy as Logger } from 'n8n-workflow'; import config from '@/config'; import { N8N_VERSION, inDevelopment, inTest } from '@/constants'; @@ -18,16 +18,8 @@ import { rawBodyReader, bodyParser, corsMiddleware } from '@/middlewares'; import { TestWebhooks } from '@/TestWebhooks'; import { WaitingWebhooks } from '@/WaitingWebhooks'; import { webhookRequestHandler } from '@/WebhookHelpers'; -import { RedisService } from '@/services/redis.service'; -import { eventBus } from './eventbus'; -import type { AbstractEventMessageOptions } from './eventbus/EventMessageClasses/AbstractEventMessageOptions'; -import { getEventMessageObjectByType } from './eventbus/EventMessageClasses/Helpers'; -import type { RedisServiceWorkerResponseObject } from './services/redis/RedisServiceCommands'; -import { - EVENT_BUS_REDIS_CHANNEL, - WORKER_RESPONSE_REDIS_CHANNEL, -} from './services/redis/RedisServiceHelper'; import { generateHostInstanceId } from './databases/utils/generators'; +import { OrchestrationService } from './services/orchestration.service'; export abstract class AbstractServer { protected server: Server; @@ -124,78 +116,11 @@ export abstract class AbstractServer { }); if (config.getEnv('executions.mode') === 'queue') { - await this.setupRedis(); + // will start the redis connections + await
Container.get(OrchestrationService).init(this.uniqueInstanceId); } } - // This connection is going to be our heartbeat - // IORedis automatically pings redis and tries to reconnect - // We will be using a retryStrategy to control how and when to exit. - // We are also subscribing to the event log channel to receive events from workers - private async setupRedis() { - const redisService = Container.get(RedisService); - const redisSubscriber = await redisService.getPubSubSubscriber(); - - // TODO: these are all proof of concept implementations for the moment - // until worker communication is implemented - // #region proof of concept - await redisSubscriber.subscribeToEventLog(); - await redisSubscriber.subscribeToWorkerResponseChannel(); - redisSubscriber.addMessageHandler( - 'AbstractServerReceiver', - async (channel: string, message: string) => { - // TODO: this is a proof of concept implementation to forward events to the main instance's event bus - // Events are arriving through a pub/sub channel and are forwarded to the eventBus - // In the future, a stream should probably replace this implementation entirely - if (channel === EVENT_BUS_REDIS_CHANNEL) { - const eventData = jsonParse(message); - if (eventData) { - const eventMessage = getEventMessageObjectByType(eventData); - if (eventMessage) { - await eventBus.send(eventMessage); - } - } - } else if (channel === WORKER_RESPONSE_REDIS_CHANNEL) { - // The back channel from the workers as a pub/sub channel - const workerResponse = jsonParse(message); - if (workerResponse) { - // TODO: Handle worker response - console.log('Received worker response', workerResponse); - } - } - }, - ); - // TODO: Leave comments for now as implementation example - // const redisStreamListener = await redisService.getStreamConsumer(); - // void redisStreamListener.listenToStream('teststream'); - // redisStreamListener.addMessageHandler( - // 'MessageLogger', - // async (stream: string, id: string, message: string[]) => { - // // TODO: this is a proof of concept implementation of a stream consumer - // switch (stream) { - // case EVENT_BUS_REDIS_STREAM: - // case COMMAND_REDIS_STREAM: - // case WORKER_RESPONSE_REDIS_STREAM: - // default: - // LoggerProxy.debug( - // `Received message from stream ${stream} with id ${id} and message ${message.join( - // ',', - // )}`, - // ); - // break; - // } - // }, - // ); - - // const redisListReceiver = await redisService.getListReceiver(); - // await redisListReceiver.init(); - - // setInterval(async () => { - // await redisListReceiver.popLatestWorkerResponse(); - // }, 1000); - // #endregion - } - async init(): Promise { const { app, protocol, sslKey, sslCert } = this; diff --git a/packages/cli/src/Server.ts b/packages/cli/src/Server.ts index 3d8f8de2aefa1..3903373c71e35 100644 --- a/packages/cli/src/Server.ts +++ b/packages/cli/src/Server.ts @@ -177,6 +177,7 @@ import { handleMfaDisable, isMfaFeatureEnabled } from './Mfa/helpers'; import { JwtService } from './services/jwt.service'; import { RoleService } from './services/role.service'; import { UserService } from './services/user.service'; +import { OrchestrationController } from './controllers/orchestration.controller'; const exec = promisify(callbackExec); @@ -551,6 +552,7 @@ export class Server extends AbstractServer { Container.get(SourceControlController), Container.get(WorkflowStatisticsController), Container.get(ExternalSecretsController), + Container.get(OrchestrationController), ]; if (isLdapEnabled()) { diff --git a/packages/cli/src/commands/BaseCommand.ts 
b/packages/cli/src/commands/BaseCommand.ts index 78be7cc23f04c..008e9f4e93435 100644 --- a/packages/cli/src/commands/BaseCommand.ts +++ b/packages/cli/src/commands/BaseCommand.ts @@ -103,12 +103,12 @@ export abstract class BaseCommand extends Command { process.exit(1); } - protected async initBinaryManager() { + async initBinaryManager() { const binaryDataConfig = config.getEnv('binaryDataManager'); await BinaryDataManager.init(binaryDataConfig, true); } - protected async initExternalHooks() { + async initExternalHooks() { this.externalHooks = Container.get(ExternalHooks); await this.externalHooks.init(); } diff --git a/packages/cli/src/commands/worker.ts b/packages/cli/src/commands/worker.ts index dfaf0e34c0c62..320c08aa9c8af 100644 --- a/packages/cli/src/commands/worker.ts +++ b/packages/cli/src/commands/worker.ts @@ -27,6 +27,11 @@ import { generateHostInstanceId } from '@/databases/utils/generators'; import type { ICredentialsOverwrite } from '@/Interfaces'; import { CredentialsOverwrites } from '@/CredentialsOverwrites'; import { rawBodyReader, bodyParser } from '@/middlewares'; +import { eventBus } from '../eventbus'; +import { RedisServicePubSubPublisher } from '../services/redis/RedisServicePubSubPublisher'; +import { RedisServicePubSubSubscriber } from '../services/redis/RedisServicePubSubSubscriber'; +import { EventMessageGeneric } from '../eventbus/EventMessageClasses/EventMessageGeneric'; +import { getWorkerCommandReceivedHandler } from './workerCommandHandler'; export class Worker extends BaseCommand { static description = '\nStarts a n8n worker'; @@ -49,6 +54,10 @@ export class Worker extends BaseCommand { readonly uniqueInstanceId = generateHostInstanceId('worker'); + redisPublisher: RedisServicePubSubPublisher; + + redisSubscriber: RedisServicePubSubSubscriber; + /** * Stop n8n in a graceful way. 
* Make for example sure that all the webhooks from third party services @@ -240,9 +249,48 @@ export class Worker extends BaseCommand { await this.initBinaryManager(); await this.initExternalHooks(); await this.initExternalSecrets(); + await this.initEventBus(); + await this.initRedis(); + await this.initQueue(); } - async run() { + async initEventBus() { + await eventBus.initialize({ + workerId: this.uniqueInstanceId, + }); + } + + /** + * Initializes the Redis connections: + * a publishing connection, used to publish events to the event log, and + * a subscription connection, used to receive commands from the main process. + * The subscription connection registers a handler that processes incoming command messages. + */ + async initRedis() { + this.redisPublisher = Container.get(RedisServicePubSubPublisher); + this.redisSubscriber = Container.get(RedisServicePubSubSubscriber); + await this.redisPublisher.init(); + await this.redisPublisher.publishToEventLog( + new EventMessageGeneric({ + eventName: 'n8n.worker.started', + payload: { + workerId: this.uniqueInstanceId, + }, + }), + ); + await this.redisSubscriber.subscribeToCommandChannel(); + this.redisSubscriber.addMessageHandler( + 'WorkerCommandReceivedHandler', + // eslint-disable-next-line @typescript-eslint/no-unsafe-argument + getWorkerCommandReceivedHandler({ + uniqueInstanceId: this.uniqueInstanceId, + redisPublisher: this.redisPublisher, + getRunningJobIds: () => Object.keys(Worker.runningJobs), + }), + ); + } + + async initQueue() { // eslint-disable-next-line @typescript-eslint/no-shadow const { flags } = this.parse(Worker); @@ -255,11 +303,6 @@ export class Worker extends BaseCommand { this.runJob(job, this.nodeTypes), ); - this.logger.info('\nn8n worker is now ready'); - this.logger.info(` * Version: ${N8N_VERSION}`); - this.logger.info(` * Concurrency: ${flags.concurrency}`); - this.logger.info(''); - Worker.jobQueue.on('global:progress', (jobId: JobId, progress) => { // Progress of a job got updated which does get used // to communicate that a job got canceled.
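For reviewers, the round trip that initRedis() wires up can be sketched from the main instance's side as follows (a minimal sketch based only on APIs in this patch; the instance IDs are illustrative):

```typescript
import Container from 'typedi';
import { OrchestrationService } from '@/services/orchestration.service';

// Main instance side: init() creates the Redis publisher/subscriber pair.
const orchestration = Container.get(OrchestrationService);
await orchestration.init('main-instance-id'); // illustrative ID

// Publishes { senderId, command: 'getStatus' } on the command channel.
// Workers answer asynchronously on the worker-response channel; for now
// the reply is only logged by handleWorkerResponseMessage().
await orchestration.getWorkerStatus();

// Passing an ID sets `targets`, so only that worker responds.
await orchestration.getWorkerStatus('worker-instance-id');
```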
@@ -305,105 +348,116 @@ export class Worker extends BaseCommand { throw error; } }); + } - if (config.getEnv('queue.health.active')) { - const port = config.getEnv('queue.health.port'); + async setupHealthMonitor() { + const port = config.getEnv('queue.health.port'); - const app = express(); - app.disable('x-powered-by'); + const app = express(); + app.disable('x-powered-by'); - const server = http.createServer(app); + const server = http.createServer(app); - app.get( - '/healthz', + app.get( + '/healthz', - async (req: express.Request, res: express.Response) => { - LoggerProxy.debug('Health check started!'); + async (req: express.Request, res: express.Response) => { + LoggerProxy.debug('Health check started!'); - const connection = Db.getConnection(); + const connection = Db.getConnection(); - try { - if (!connection.isInitialized) { - // Connection is not active - throw new Error('No active database connection!'); - } - // DB ping - await connection.query('SELECT 1'); - } catch (e) { - LoggerProxy.error('No Database connection!', e as Error); - const error = new ResponseHelper.ServiceUnavailableError('No Database connection!'); - return ResponseHelper.sendErrorResponse(res, error); + try { + if (!connection.isInitialized) { + // Connection is not active + throw new Error('No active database connection!'); } + // DB ping + await connection.query('SELECT 1'); + } catch (e) { + LoggerProxy.error('No Database connection!', e as Error); + const error = new ResponseHelper.ServiceUnavailableError('No Database connection!'); + return ResponseHelper.sendErrorResponse(res, error); + } - // Just to be complete, generally will the worker stop automatically - // if it loses the connection to redis - try { - // Redis ping - await Worker.jobQueue.client.ping(); - } catch (e) { - LoggerProxy.error('No Redis connection!', e as Error); - const error = new ResponseHelper.ServiceUnavailableError('No Redis connection!'); - return ResponseHelper.sendErrorResponse(res, error); - } + // Just to be complete: generally, the worker will stop automatically + // if it loses the connection to redis + try { + // Redis ping + await Worker.jobQueue.client.ping(); + } catch (e) { + LoggerProxy.error('No Redis connection!', e as Error); + const error = new ResponseHelper.ServiceUnavailableError('No Redis connection!'); + return ResponseHelper.sendErrorResponse(res, error); + } - // Everything fine - const responseData = { - status: 'ok', - }; + // Everything fine + const responseData = { + status: 'ok', + }; - LoggerProxy.debug('Health check completed successfully!'); + LoggerProxy.debug('Health check completed successfully!'); - ResponseHelper.sendSuccessResponse(res, responseData, true, 200); - }, - ); + ResponseHelper.sendSuccessResponse(res, responseData, true, 200); + }, + ); - let presetCredentialsLoaded = false; - const endpointPresetCredentials = config.getEnv('credentials.overwrite.endpoint'); - if (endpointPresetCredentials !== '') { - // POST endpoint to set preset credentials - app.post( - `/${endpointPresetCredentials}`, - rawBodyReader, - bodyParser, - async (req: express.Request, res: express.Response) => { - if (!presetCredentialsLoaded) { - const body = req.body as ICredentialsOverwrite; - - if (req.contentType !== 'application/json') { - ResponseHelper.sendErrorResponse( - res, - new Error( - 'Body must be a valid JSON, make sure the content-type is application/json', - ), - ); - return; - } - - CredentialsOverwrites().setData(body); - presetCredentialsLoaded = true; -
ResponseHelper.sendSuccessResponse(res, { success: true }, true, 200); - } else { + let presetCredentialsLoaded = false; + const endpointPresetCredentials = config.getEnv('credentials.overwrite.endpoint'); + if (endpointPresetCredentials !== '') { + // POST endpoint to set preset credentials + app.post( + `/${endpointPresetCredentials}`, + rawBodyReader, + bodyParser, + async (req: express.Request, res: express.Response) => { + if (!presetCredentialsLoaded) { + const body = req.body as ICredentialsOverwrite; + + if (req.contentType !== 'application/json') { ResponseHelper.sendErrorResponse( res, - new Error('Preset credentials can be set once'), + new Error( + 'Body must be a valid JSON, make sure the content-type is application/json', + ), ); + return; } - }, + + CredentialsOverwrites().setData(body); + presetCredentialsLoaded = true; + ResponseHelper.sendSuccessResponse(res, { success: true }, true, 200); + } else { + ResponseHelper.sendErrorResponse(res, new Error('Preset credentials can be set once')); + } + }, + ); + } + + server.on('error', (error: Error & { code: string }) => { + if (error.code === 'EADDRINUSE') { + this.logger.error( + `n8n's port ${port} is already in use. Do you have the n8n main process running on that port?`, ); + process.exit(1); } + }); - server.on('error', (error: Error & { code: string }) => { - if (error.code === 'EADDRINUSE') { - this.logger.error( - `n8n's port ${port} is already in use. Do you have the n8n main process running on that port?`, - ); - process.exit(1); - } - }); + await new Promise<void>((resolve) => server.listen(port, () => resolve())); + await this.externalHooks.run('worker.ready'); + this.logger.info(`\nn8n worker health check via port ${port}`); + } + + async run() { + // eslint-disable-next-line @typescript-eslint/no-shadow + const { flags } = this.parse(Worker); - await new Promise<void>((resolve) => server.listen(port, () => resolve())); - await this.externalHooks.run('worker.ready'); - this.logger.info(`\nn8n worker health check via, port ${port}`); + this.logger.info('\nn8n worker is now ready'); + this.logger.info(` * Version: ${N8N_VERSION}`); + this.logger.info(` * Concurrency: ${flags.concurrency}`); + this.logger.info(''); + + if (config.getEnv('queue.health.active')) { + await this.setupHealthMonitor(); } // Make sure that the process does not close diff --git a/packages/cli/src/commands/workerCommandHandler.ts b/packages/cli/src/commands/workerCommandHandler.ts new file mode 100644 index 0000000000000..874ead410c542 --- /dev/null +++ b/packages/cli/src/commands/workerCommandHandler.ts @@ -0,0 +1,82 @@ +import { jsonParse, LoggerProxy } from 'n8n-workflow'; +import { eventBus } from '../eventbus'; +import type { RedisServiceCommandObject } from '@/services/redis/RedisServiceCommands'; +import { COMMAND_REDIS_CHANNEL } from '@/services/redis/RedisServiceHelper'; +import type { RedisServicePubSubPublisher } from '../services/redis/RedisServicePubSubPublisher'; +import * as os from 'os'; + +export function getWorkerCommandReceivedHandler(options: { + uniqueInstanceId: string; + redisPublisher: RedisServicePubSubPublisher; + getRunningJobIds: () => string[]; +}) { + return async (channel: string, messageString: string) => { + if (channel === COMMAND_REDIS_CHANNEL) { + if (!messageString) return; + let message: RedisServiceCommandObject; + try { + message = jsonParse<RedisServiceCommandObject>(messageString); + } catch { + LoggerProxy.debug( + `Received invalid message via channel ${COMMAND_REDIS_CHANNEL}: "${messageString}"`, + ); + return; + } + if (message) {
if (message.targets && !message.targets.includes(options.uniqueInstanceId)) { + return; // early return if the message is not for this worker + } + switch (message.command) { + case 'getStatus': + await options.redisPublisher.publishToWorkerChannel({ + workerId: options.uniqueInstanceId, + command: message.command, + payload: { + workerId: options.uniqueInstanceId, + runningJobs: options.getRunningJobIds(), + freeMem: os.freemem(), + totalMem: os.totalmem(), + uptime: process.uptime(), + loadAvg: os.loadavg(), + cpus: os.cpus().map((cpu) => `${cpu.model} - speed: ${cpu.speed}`), + arch: os.arch(), + platform: os.platform(), + hostname: os.hostname(), + net: Object.values(os.networkInterfaces()).flatMap( + (interfaces) => + interfaces?.map((net) => `${net.family} - address: ${net.address}`) ?? '', + ), + }, + }); + break; + case 'getId': + await options.redisPublisher.publishToWorkerChannel({ + workerId: options.uniqueInstanceId, + command: message.command, + }); + break; + case 'restartEventBus': + await eventBus.restart(); + await options.redisPublisher.publishToWorkerChannel({ + workerId: options.uniqueInstanceId, + command: message.command, + payload: { + result: 'success', + }, + }); + break; + case 'stopWorker': + // TODO: implement proper shutdown + // await this.stopProcess(); + break; + default: + LoggerProxy.debug( + // eslint-disable-next-line @typescript-eslint/restrict-template-expressions + `Received unknown command via channel ${COMMAND_REDIS_CHANNEL}: "${message.command}"`, + ); + break; + } + } + } + }; +} diff --git a/packages/cli/src/controllers/orchestration.controller.ts b/packages/cli/src/controllers/orchestration.controller.ts new file mode 100644 index 0000000000000..fd6b68ad4e167 --- /dev/null +++ b/packages/cli/src/controllers/orchestration.controller.ts @@ -0,0 +1,35 @@ +import config from '@/config'; +import { Authorized, Get, RestController } from '@/decorators'; +import { OrchestrationRequest } from '@/requests'; +import { Service } from 'typedi'; +import { OrchestrationService } from '../services/orchestration.service'; + +@Authorized(['global', 'owner']) +@RestController('/orchestration') +@Service() +export class OrchestrationController { + private config = config; + + constructor(private readonly orchestrationService: OrchestrationService) {} + + /** + * These endpoints currently do not return anything; they just trigger the message to + the workers to respond on Redis with their status.
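+ * For example, `GET /orchestration/worker/status` broadcasts a getStatus command to all workers, while `/worker/status/:id` targets a single worker (both mounted under the server's REST base path).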
+ * TODO: these responses need to be forwarded to and handled by the frontend + */ + @Get('/worker/status/:id') + async getWorkersStatus(req: OrchestrationRequest.Get) { + const id = req.params.id; + return this.orchestrationService.getWorkerStatus(id); + } + + @Get('/worker/status') + async getWorkersStatusAll() { + return this.orchestrationService.getWorkerStatus(); + } + + @Get('/worker/ids') + async getWorkerIdsAll() { + return this.orchestrationService.getWorkerIds(); + } +} diff --git a/packages/cli/src/eventbus/EventMessageClasses/index.ts b/packages/cli/src/eventbus/EventMessageClasses/index.ts index 28da7c5eccaa4..c6a0f85bd99ff 100644 --- a/packages/cli/src/eventbus/EventMessageClasses/index.ts +++ b/packages/cli/src/eventbus/EventMessageClasses/index.ts @@ -9,6 +9,7 @@ export const eventNamesWorkflow = [ 'n8n.workflow.failed', 'n8n.workflow.crashed', ] as const; +export const eventNamesGeneric = ['n8n.worker.started', 'n8n.worker.stopped'] as const; export const eventNamesNode = ['n8n.node.started', 'n8n.node.finished'] as const; export const eventNamesAudit = [ 'n8n.audit.user.login.success', @@ -37,14 +38,21 @@ export const eventNamesAudit = [ export type EventNamesWorkflowType = (typeof eventNamesWorkflow)[number]; export type EventNamesAuditType = (typeof eventNamesAudit)[number]; export type EventNamesNodeType = (typeof eventNamesNode)[number]; +export type EventNamesGenericType = (typeof eventNamesGeneric)[number]; export type EventNamesTypes = | EventNamesAuditType | EventNamesWorkflowType | EventNamesNodeType + | EventNamesGenericType | 'n8n.destination.test'; -export const eventNamesAll = [...eventNamesAudit, ...eventNamesWorkflow, ...eventNamesNode]; +export const eventNamesAll = [ + ...eventNamesAudit, + ...eventNamesWorkflow, + ...eventNamesNode, + ...eventNamesGeneric, +]; export type EventMessageTypes = | EventMessageGeneric diff --git a/packages/cli/src/eventbus/MessageEventBus/MessageEventBus.ts b/packages/cli/src/eventbus/MessageEventBus/MessageEventBus.ts index c6d14bf9a3180..43059bf15954b 100644 --- a/packages/cli/src/eventbus/MessageEventBus/MessageEventBus.ts +++ b/packages/cli/src/eventbus/MessageEventBus/MessageEventBus.ts @@ -29,6 +29,7 @@ import { recoverExecutionDataFromEventLogMessages } from './recoverEvents'; import { METRICS_EVENT_NAME } from '../MessageEventBusDestination/Helpers.ee'; import Container from 'typedi'; import { ExecutionRepository, WorkflowRepository } from '@/databases/repositories'; +import { OrchestrationService } from '../../services/orchestration.service'; export type EventMessageReturnMode = 'sent' | 'unsent' | 'all' | 'unfinished'; @@ -37,6 +38,11 @@ export interface MessageWithCallback { confirmCallback: (message: EventMessageTypes, src: EventMessageConfirmSource) => void; } +export interface MessageEventBusInitializeOptions { + skipRecoveryPass?: boolean; + workerId?: string; +} + export class MessageEventBus extends EventEmitter { private static instance: MessageEventBus; @@ -70,7 +76,7 @@ export class MessageEventBus extends EventEmitter { * * Sets `isInitialized` to `true` once finished. 
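+ * On workers this is called as `initialize({ workerId })`; `restart()` passes `{ skipRecoveryPass: true }` to skip the unsent-message recovery check.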
*/ - async initialize() { + async initialize(options?: MessageEventBusInitializeOptions): Promise<void> { if (this.isInitialized) { return; } @@ -93,64 +99,75 @@ } LoggerProxy.debug('Initializing event writer'); - this.logWriter = await MessageEventBusLogWriter.getInstance(); + if (options?.workerId) { + // only add 'worker' to the log file name: the worker ID changes on every start, so + // without a fixed name we could not recover the log files from the previous run + const logBaseName = config.getEnv('eventBus.logWriter.logBaseName') + '-worker'; + this.logWriter = await MessageEventBusLogWriter.getInstance({ + logBaseName, + }); + } else { + this.logWriter = await MessageEventBusLogWriter.getInstance(); + } if (!this.logWriter) { LoggerProxy.warn('Could not initialize event writer'); } - // unsent event check: - // - find unsent messages in current event log(s) - // - cycle event logs and start the logging to a fresh file - // - retry sending events - LoggerProxy.debug('Checking for unsent event messages'); - const unsentAndUnfinished = await this.getUnsentAndUnfinishedExecutions(); - LoggerProxy.debug( - `Start logging into ${this.logWriter?.getLogFileName() ?? 'unknown filename'} `, - ); - this.logWriter?.startLogging(); - await this.send(unsentAndUnfinished.unsentMessages); - - const unfinishedExecutionIds = Object.keys(unsentAndUnfinished.unfinishedExecutions); - - if (unfinishedExecutionIds.length > 0) { - LoggerProxy.warn(`Found unfinished executions: ${unfinishedExecutionIds.join(', ')}`); - LoggerProxy.info('This could be due to a crash of an active workflow or a restart of n8n.'); - const activeWorkflows = await Container.get(WorkflowRepository).find({ - where: { active: true }, - select: ['id', 'name'], - }); - if (activeWorkflows.length > 0) { - LoggerProxy.info('Currently active workflows:'); - for (const workflowData of activeWorkflows) { - LoggerProxy.info(` - ${workflowData.name} (ID: ${workflowData.id})`); + if (options?.skipRecoveryPass) { + LoggerProxy.debug('Skipping unsent event check'); + } else { + // unsent event check: + // - find unsent messages in current event log(s) + // - cycle event logs and start the logging to a fresh file + // - retry sending events + LoggerProxy.debug('Checking for unsent event messages'); + const unsentAndUnfinished = await this.getUnsentAndUnfinishedExecutions(); + LoggerProxy.debug( + `Start logging into ${this.logWriter?.getLogFileName() ??
'unknown filename'} `, + ); + this.logWriter?.startLogging(); + await this.send(unsentAndUnfinished.unsentMessages); + + const unfinishedExecutionIds = Object.keys(unsentAndUnfinished.unfinishedExecutions); + + if (unfinishedExecutionIds.length > 0) { + LoggerProxy.warn(`Found unfinished executions: ${unfinishedExecutionIds.join(', ')}`); + LoggerProxy.info('This could be due to a crash of an active workflow or a restart of n8n.'); + const activeWorkflows = await Container.get(WorkflowRepository).find({ + where: { active: true }, + select: ['id', 'name'], + }); + if (activeWorkflows.length > 0) { + LoggerProxy.info('Currently active workflows:'); + for (const workflowData of activeWorkflows) { + LoggerProxy.info(` - ${workflowData.name} (ID: ${workflowData.id})`); + } } - } - - const recoveryAlreadyAttempted = this.logWriter?.isRecoveryProcessRunning(); - if (recoveryAlreadyAttempted || config.getEnv('eventBus.crashRecoveryMode') === 'simple') { - await Container.get(ExecutionRepository).markAsCrashed(unfinishedExecutionIds); - // if we end up here, it means that the previous recovery process did not finish - // a possible reason would be that recreating the workflow data itself caused e.g an OOM error - // in that case, we do not want to retry the recovery process, but rather mark the executions as crashed - if (recoveryAlreadyAttempted) - LoggerProxy.warn('Skipped recovery process since it previously failed.'); - } else { - // start actual recovery process and write recovery process flag file - this.logWriter?.startRecoveryProcess(); - for (const executionId of unfinishedExecutionIds) { - LoggerProxy.warn(`Attempting to recover execution ${executionId}`); - await recoverExecutionDataFromEventLogMessages( - executionId, - unsentAndUnfinished.unfinishedExecutions[executionId], - true, - ); + const recoveryAlreadyAttempted = this.logWriter?.isRecoveryProcessRunning(); + if (recoveryAlreadyAttempted || config.getEnv('eventBus.crashRecoveryMode') === 'simple') { + await Container.get(ExecutionRepository).markAsCrashed(unfinishedExecutionIds); + // if we end up here, it means that the previous recovery process did not finish + // a possible reason would be that recreating the workflow data itself caused e.g an OOM error + // in that case, we do not want to retry the recovery process, but rather mark the executions as crashed + if (recoveryAlreadyAttempted) + LoggerProxy.warn('Skipped recovery process since it previously failed.'); + } else { + // start actual recovery process and write recovery process flag file + this.logWriter?.startRecoveryProcess(); + for (const executionId of unfinishedExecutionIds) { + LoggerProxy.warn(`Attempting to recover execution ${executionId}`); + await recoverExecutionDataFromEventLogMessages( + executionId, + unsentAndUnfinished.unfinishedExecutions[executionId], + true, + ); + } } + // remove the recovery process flag file + this.logWriter?.endRecoveryProcess(); } - // remove the recovery process flag file - this.logWriter?.endRecoveryProcess(); } - // if configured, run this test every n ms if (config.getEnv('eventBus.checkUnsentInterval') > 0) { if (this.pushIntervalTimer) { @@ -192,6 +209,12 @@ export class MessageEventBus extends EventEmitter { return result; } + async broadcastRestartEventbusAfterDestinationUpdate() { + if (config.getEnv('executions.mode') === 'queue') { + await Container.get(OrchestrationService).restartEventBus(); + } + } + private async trySendingUnsent(msgs?: EventMessageTypes[]) { const unsentMessages = msgs ?? 
(await this.getEventsUnsent()); if (unsentMessages.length > 0) { @@ -212,9 +235,15 @@ export class MessageEventBus extends EventEmitter { ); await this.destinations[destinationName].close(); } + this.isInitialized = false; LoggerProxy.debug('EventBus shut down.'); } + async restart() { + await this.close(); + await this.initialize({ skipRecoveryPass: true }); + } + async send(msgs: EventMessageTypes | EventMessageTypes[]) { if (!Array.isArray(msgs)) { msgs = [msgs]; diff --git a/packages/cli/src/requests.ts b/packages/cli/src/requests.ts index 7a63bc889ffdc..82f2c6a7a1860 100644 --- a/packages/cli/src/requests.ts +++ b/packages/cli/src/requests.ts @@ -535,3 +535,12 @@ export declare namespace ExternalSecretsRequest { type UpdateProvider = AuthenticatedRequest<{ provider: string }>; } + +// ---------------------------------- +// /orchestration +// ---------------------------------- +// +export declare namespace OrchestrationRequest { + type GetAll = AuthenticatedRequest; + type Get = AuthenticatedRequest<{ id: string }, {}, {}, {}>; +} diff --git a/packages/cli/src/services/orchestration.service.ts b/packages/cli/src/services/orchestration.service.ts new file mode 100644 index 0000000000000..d3f8c7e27eb9f --- /dev/null +++ b/packages/cli/src/services/orchestration.service.ts @@ -0,0 +1,172 @@ +import { Service } from 'typedi'; +import { RedisService } from './redis.service'; +import type { RedisServicePubSubPublisher } from './redis/RedisServicePubSubPublisher'; +import type { RedisServicePubSubSubscriber } from './redis/RedisServicePubSubSubscriber'; +import { LoggerProxy, jsonParse } from 'n8n-workflow'; +import { eventBus } from '../eventbus'; +import type { AbstractEventMessageOptions } from '../eventbus/EventMessageClasses/AbstractEventMessageOptions'; +import { getEventMessageObjectByType } from '../eventbus/EventMessageClasses/Helpers'; +import type { + RedisServiceCommandObject, + RedisServiceWorkerResponseObject, +} from './redis/RedisServiceCommands'; +import { + COMMAND_REDIS_CHANNEL, + EVENT_BUS_REDIS_CHANNEL, + WORKER_RESPONSE_REDIS_CHANNEL, +} from './redis/RedisServiceHelper'; + +@Service() +export class OrchestrationService { + private initialized = false; + + private _uniqueInstanceId = ''; + + get uniqueInstanceId(): string { + return this._uniqueInstanceId; + } + + redisPublisher: RedisServicePubSubPublisher; + + redisSubscriber: RedisServicePubSubSubscriber; + + constructor(readonly redisService: RedisService) {} + + async init(uniqueInstanceId: string) { + this._uniqueInstanceId = uniqueInstanceId; + await this.initPublisher(); + await this.initSubscriber(); + this.initialized = true; + } + + async shutdown() { + await this.redisPublisher?.destroy(); + await this.redisSubscriber?.destroy(); + } + + private async initPublisher() { + this.redisPublisher = await this.redisService.getPubSubPublisher(); + } + + private async initSubscriber() { + this.redisSubscriber = await this.redisService.getPubSubSubscriber(); + + // TODO: these are all proof of concept implementations for the moment + // until worker communication is implemented + // #region proof of concept + await this.redisSubscriber.subscribeToEventLog(); + await this.redisSubscriber.subscribeToWorkerResponseChannel(); + await this.redisSubscriber.subscribeToCommandChannel(); + + this.redisSubscriber.addMessageHandler( + 'OrchestrationMessageReceiver', + async (channel: string, messageString: string) => { + // TODO: this is a proof of concept implementation to forward events to the main instance's event bus + // 
Events are arriving through a pub/sub channel and are forwarded to the eventBus + // In the future, a stream should probably replace this implementation entirely + if (channel === EVENT_BUS_REDIS_CHANNEL) { + await this.handleEventBusMessage(messageString); + } else if (channel === WORKER_RESPONSE_REDIS_CHANNEL) { + await this.handleWorkerResponseMessage(messageString); + } else if (channel === COMMAND_REDIS_CHANNEL) { + await this.handleCommandMessage(messageString); + } + }, + ); + } + + async handleWorkerResponseMessage(messageString: string) { + const workerResponse = jsonParse(messageString); + if (workerResponse) { + // TODO: Handle worker response + LoggerProxy.debug('Received worker response', workerResponse); + } + return workerResponse; + } + + async handleEventBusMessage(messageString: string) { + const eventData = jsonParse(messageString); + if (eventData) { + const eventMessage = getEventMessageObjectByType(eventData); + if (eventMessage) { + await eventBus.send(eventMessage); + } + } + return eventData; + } + + async handleCommandMessage(messageString: string) { + if (!messageString) return; + let message: RedisServiceCommandObject; + try { + message = jsonParse(messageString); + } catch { + LoggerProxy.debug( + `Received invalid message via channel ${COMMAND_REDIS_CHANNEL}: "${messageString}"`, + ); + return; + } + if (message) { + if ( + message.senderId === this.uniqueInstanceId || + (message.targets && !message.targets.includes(this.uniqueInstanceId)) + ) { + LoggerProxy.debug( + `Skipping command message ${message.command} because it's not for this instance.`, + ); + return message; + } + switch (message.command) { + case 'restartEventBus': + await eventBus.restart(); + break; + } + return message; + } + return; + } + + async getWorkerStatus(id?: string) { + if (!this.initialized) { + throw new Error('OrchestrationService not initialized'); + } + await this.redisPublisher.publishToCommandChannel({ + senderId: this.uniqueInstanceId, + command: 'getStatus', + targets: id ? [id] : undefined, + }); + } + + async getWorkerIds() { + if (!this.initialized) { + throw new Error('OrchestrationService not initialized'); + } + await this.redisPublisher.publishToCommandChannel({ + senderId: this.uniqueInstanceId, + command: 'getId', + }); + } + + // TODO: not implemented yet on worker side + async stopWorker(id?: string) { + if (!this.initialized) { + throw new Error('OrchestrationService not initialized'); + } + await this.redisPublisher.publishToCommandChannel({ + senderId: this.uniqueInstanceId, + command: 'stopWorker', + targets: id ? [id] : undefined, + }); + } + + async restartEventBus(id?: string) { + if (!this.initialized) { + throw new Error('OrchestrationService not initialized'); + } + await this.redisPublisher.publishToCommandChannel({ + senderId: this.uniqueInstanceId, + command: 'restartEventBus', + targets: id ? [id] : undefined, + }); + } +} diff --git a/packages/cli/src/services/redis/RedisServiceCommands.ts b/packages/cli/src/services/redis/RedisServiceCommands.ts index cd70a32d6e853..5796560d4b1e2 100644 --- a/packages/cli/src/services/redis/RedisServiceCommands.ts +++ b/packages/cli/src/services/redis/RedisServiceCommands.ts @@ -1,12 +1,13 @@ -export type RedisServiceCommand = 'getStatus' | 'restartEventBus' | 'stopWorker'; // TODO: add more commands +export type RedisServiceCommand = 'getStatus' | 'getId' | 'restartEventBus' | 'stopWorker'; // TODO: add more commands /** * An object to be sent via Redis pub/sub from the main process to the workers. 
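* For example (IDs are illustrative): `{ senderId: 'main-1', command: 'getStatus', targets: ['worker-1'] }`; omitting `targets` broadcasts the command to all workers.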
* @field command: The command to be executed. * @field targets: The targets to execute the command on. Leave empty to execute on all workers or specify worker ids. - * @field args: Optional arguments to be passed to the command. + * @field payload: Optional arguments to be sent with the command. */ type RedisServiceBaseCommand = { + senderId: string; command: RedisServiceCommand; payload?: { [key: string]: string | number | boolean | string[] | number[] | boolean[]; @@ -15,7 +16,38 @@ type RedisServiceBaseCommand = { export type RedisServiceWorkerResponseObject = { workerId: string; -} & RedisServiceBaseCommand; +} & ( + | RedisServiceBaseCommand + | { + command: 'getStatus'; + payload: { + workerId: string; + runningJobs: string[]; + freeMem: number; + totalMem: number; + uptime: number; + loadAvg: number[]; + cpus: string[]; + arch: string; + platform: NodeJS.Platform; + hostname: string; + net: string[]; + }; + } + | { + command: 'getId'; + } + | { + command: 'restartEventBus'; + payload: { + result: 'success' | 'error'; + error?: string; + }; + } + | { + command: 'stopWorker'; + } +); export type RedisServiceCommandObject = { targets?: string[]; diff --git a/packages/cli/src/services/redis/RedisServicePubSubSubscriber.ts b/packages/cli/src/services/redis/RedisServicePubSubSubscriber.ts index cb7b05d41fa68..404544d6f9e93 100644 --- a/packages/cli/src/services/redis/RedisServicePubSubSubscriber.ts +++ b/packages/cli/src/services/redis/RedisServicePubSubSubscriber.ts @@ -23,11 +23,11 @@ export class RedisServicePubSubSubscriber extends RedisServiceBaseReceiver { if (!this.redisClient) { await this.init(); } - await this.redisClient?.subscribe(channel, (error, count: number) => { + await this.redisClient?.subscribe(channel, (error, _count: number) => { if (error) { Logger.error(`Error subscribing to channel ${channel}`); } else { - Logger.debug(`Subscribed ${count.toString()} to eventlog channel`); + Logger.debug(`Subscribed Redis PubSub client to channel: ${channel}`); } }); } diff --git a/packages/cli/test/integration/commands/worker.cmd.test.ts b/packages/cli/test/integration/commands/worker.cmd.test.ts new file mode 100644 index 0000000000000..d860579ee392b --- /dev/null +++ b/packages/cli/test/integration/commands/worker.cmd.test.ts @@ -0,0 +1,81 @@ +import { mockInstance } from '../shared/utils/'; +import { Worker } from '@/commands/worker'; +import * as Config from '@oclif/config'; +import { LoggerProxy } from 'n8n-workflow'; +import { Telemetry } from '@/telemetry'; +import { getLogger } from '@/Logger'; +import { ExternalSecretsManager } from '@/ExternalSecrets/ExternalSecretsManager.ee'; +import { BinaryDataManager } from 'n8n-core'; +import { CacheService } from '@/services/cache.service'; +import { RedisServicePubSubPublisher } from '@/services/redis/RedisServicePubSubPublisher'; +import { RedisServicePubSubSubscriber } from '@/services/redis/RedisServicePubSubSubscriber'; +import { MessageEventBus } from '@/eventbus/MessageEventBus/MessageEventBus'; +import { LoadNodesAndCredentials } from '@/LoadNodesAndCredentials'; +import { CredentialTypes } from '@/CredentialTypes'; +import { NodeTypes } from '@/NodeTypes'; +import { InternalHooks } from '@/InternalHooks'; +import { PostHogClient } from '@/posthog'; +import { RedisService } from '@/services/redis.service'; + +const config: Config.IConfig = new Config.Config({ root: __dirname }); + +beforeAll(async () => { + LoggerProxy.init(getLogger()); + mockInstance(Telemetry); + mockInstance(PostHogClient); + 
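// mockInstance (from the shared test utils) replaces each class's singleton in the DI container with a jest mock, so worker.init() can run without real Redis or database connections.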
mockInstance(InternalHooks); + mockInstance(CacheService); + mockInstance(ExternalSecretsManager); + mockInstance(BinaryDataManager); + mockInstance(MessageEventBus); + mockInstance(LoadNodesAndCredentials); + mockInstance(CredentialTypes); + mockInstance(NodeTypes); + mockInstance(RedisService); + mockInstance(RedisServicePubSubPublisher); + mockInstance(RedisServicePubSubSubscriber); +}); + +test('worker initializes all its components', async () => { + const worker = new Worker([], config); + + jest.spyOn(worker, 'init'); + jest.spyOn(worker, 'initLicense').mockImplementation(async () => {}); + jest.spyOn(worker, 'initBinaryManager').mockImplementation(async () => {}); + jest.spyOn(worker, 'initExternalHooks').mockImplementation(async () => {}); + jest.spyOn(worker, 'initExternalSecrets').mockImplementation(async () => {}); + jest.spyOn(worker, 'initEventBus').mockImplementation(async () => {}); + jest.spyOn(worker, 'initRedis'); + jest.spyOn(RedisServicePubSubPublisher.prototype, 'init').mockImplementation(async () => {}); + jest + .spyOn(RedisServicePubSubPublisher.prototype, 'publishToEventLog') + .mockImplementation(async () => {}); + jest + .spyOn(RedisServicePubSubSubscriber.prototype, 'subscribeToCommandChannel') + .mockImplementation(async () => {}); + jest + .spyOn(RedisServicePubSubSubscriber.prototype, 'addMessageHandler') + .mockImplementation(async () => {}); + jest.spyOn(worker, 'initQueue').mockImplementation(async () => {}); + + await worker.init(); + + expect(worker.uniqueInstanceId).toBeDefined(); + expect(worker.uniqueInstanceId).toContain('worker'); + expect(worker.uniqueInstanceId.length).toBeGreaterThan(15); + expect(worker.initLicense).toHaveBeenCalled(); + expect(worker.initBinaryManager).toHaveBeenCalled(); + expect(worker.initExternalHooks).toHaveBeenCalled(); + expect(worker.initExternalSecrets).toHaveBeenCalled(); + expect(worker.initEventBus).toHaveBeenCalled(); + expect(worker.initRedis).toHaveBeenCalled(); + expect(worker.redisPublisher).toBeDefined(); + expect(worker.redisPublisher.init).toHaveBeenCalled(); + expect(worker.redisPublisher.publishToEventLog).toHaveBeenCalled(); + expect(worker.redisSubscriber).toBeDefined(); + expect(worker.redisSubscriber.subscribeToCommandChannel).toHaveBeenCalled(); + expect(worker.redisSubscriber.addMessageHandler).toHaveBeenCalled(); + expect(worker.initQueue).toHaveBeenCalled(); + + jest.restoreAllMocks(); +}); diff --git a/packages/cli/test/unit/services/orchestration.service.test.ts b/packages/cli/test/unit/services/orchestration.service.test.ts new file mode 100644 index 0000000000000..18204cea2b81c --- /dev/null +++ b/packages/cli/test/unit/services/orchestration.service.test.ts @@ -0,0 +1,140 @@ +import Container from 'typedi'; +import config from '@/config'; +import { LoggerProxy } from 'n8n-workflow'; +import { getLogger } from '@/Logger'; +import { OrchestrationService } from '@/services/orchestration.service'; +import type { RedisServiceWorkerResponseObject } from '@/services/redis/RedisServiceCommands'; +import { EventMessageWorkflow } from '@/eventbus/EventMessageClasses/EventMessageWorkflow'; +import { eventBus } from '@/eventbus'; +import * as EventHelpers from '@/eventbus/EventMessageClasses/Helpers'; +import { RedisService } from '@/services/redis.service'; +import { mockInstance } from '../../integration/shared/utils'; + +const os = Container.get(OrchestrationService); + +function setDefaultConfig() { + config.set('executions.mode', 'queue'); +} + +const workerRestartEventbusResponse: 
RedisServiceWorkerResponseObject = { + senderId: 'test', + workerId: 'test', + command: 'restartEventBus', + payload: { + result: 'success', + }, +}; + +const eventBusMessage = new EventMessageWorkflow({ + eventName: 'n8n.workflow.success', + id: 'test', + message: 'test', + payload: { + test: 'test', + }, +}); + +describe('Orchestration Service', () => { + beforeAll(async () => { + mockInstance(RedisService); + LoggerProxy.init(getLogger()); + jest.mock('ioredis', () => { + const Redis = require('ioredis-mock'); + if (typeof Redis === 'object') { + // the first mock is an ioredis shim because ioredis-mock depends on it + // https://github.com/stipsan/ioredis-mock/blob/master/src/index.js#L101-L111 + return { + Command: { _transformer: { argument: {}, reply: {} } }, + }; + } + // second mock for our code + // eslint-disable-next-line @typescript-eslint/no-explicit-any + return function (...args: any) { + return new Redis(args); + }; + }); + jest.mock('../../../src/services/redis/RedisServicePubSubPublisher', () => { + return jest.fn().mockImplementation(() => { + return { + init: jest.fn(), + publishToEventLog: jest.fn(), + publishToWorkerChannel: jest.fn(), + destroy: jest.fn(), + }; + }); + }); + jest.mock('../../../src/services/redis/RedisServicePubSubSubscriber', () => { + return jest.fn().mockImplementation(() => { + return { + subscribeToCommandChannel: jest.fn(), + destroy: jest.fn(), + }; + }); + }); + setDefaultConfig(); + }); + + afterAll(async () => { + jest.mock('../../../src/services/redis/RedisServicePubSubPublisher').restoreAllMocks(); + jest.mock('../../../src/services/redis/RedisServicePubSubSubscriber').restoreAllMocks(); + }); + + test('should initialize', async () => { + await os.init('test-orchestration-service'); + expect(os.redisPublisher).toBeDefined(); + expect(os.redisSubscriber).toBeDefined(); + expect(os.uniqueInstanceId).toBeDefined(); + }); + + test('should handle worker responses', async () => { + const response = await os.handleWorkerResponseMessage( + JSON.stringify(workerRestartEventbusResponse), + ); + expect(response.command).toEqual('restartEventBus'); + }); + + test('should handle event messages', async () => { + const response = await os.handleEventBusMessage(JSON.stringify(eventBusMessage)); + jest.spyOn(eventBus, 'send'); + jest.spyOn(EventHelpers, 'getEventMessageObjectByType'); + expect(eventBus.send).toHaveBeenCalled(); + expect(response.eventName).toEqual('n8n.workflow.success'); + jest.spyOn(eventBus, 'send').mockRestore(); + jest.spyOn(EventHelpers, 'getEventMessageObjectByType').mockRestore(); + }); + + test('should handle command messages from others', async () => { + jest.spyOn(eventBus, 'restart'); + const responseFalseId = await os.handleCommandMessage( + JSON.stringify(workerRestartEventbusResponse), + ); + expect(responseFalseId).toBeDefined(); + expect(responseFalseId!.command).toEqual('restartEventBus'); + expect(responseFalseId!.senderId).toEqual('test'); + expect(eventBus.restart).toHaveBeenCalled(); + jest.spyOn(eventBus, 'restart').mockRestore(); + }); + + test('should reject command messages from itself', async () => { + jest.spyOn(eventBus, 'restart'); + const response = await os.handleCommandMessage( + JSON.stringify({ ...workerRestartEventbusResponse, senderId: os.uniqueInstanceId }), + ); + expect(response).toBeDefined(); + expect(response!.command).toEqual('restartEventBus'); + expect(response!.senderId).toEqual(os.uniqueInstanceId); + expect(eventBus.restart).not.toHaveBeenCalled(); + jest.spyOn(eventBus,
'restart').mockRestore(); + }); + + test('should send command messages', async () => { + jest.spyOn(os.redisPublisher, 'publishToCommandChannel'); + await os.getWorkerIds(); + expect(os.redisPublisher.publishToCommandChannel).toHaveBeenCalled(); + jest.spyOn(os.redisPublisher, 'publishToCommandChannel').mockRestore(); + }); + + afterAll(async () => { + await os.shutdown(); + }); +});
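Once a main instance and at least one worker run in queue mode, the new endpoints can be exercised roughly as follows (a sketch; the base URL, `/rest` prefix, and auth cookie are assumptions about a default setup, not part of this patch):

```typescript
// The HTTP responses carry no worker data yet; the actual replies arrive
// on the Redis worker-response channel (see the TODO in the controller).
const base = 'http://localhost:5678/rest/orchestration'; // assumed default
const headers = { cookie: 'n8n-auth=...' }; // placeholder owner session cookie

await fetch(`${base}/worker/status`, { headers }); // broadcast getStatus
await fetch(`${base}/worker/ids`, { headers }); // broadcast getId
await fetch(`${base}/worker/status/worker-instance-id`, { headers }); // targeted; ID illustrative
```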