Skip to content

Commit

Permalink
feat(core): Add commands to workers to respond with current state (n8…
Browse files Browse the repository at this point in the history
…n-io#7029)

This PR adds new endpoints to the REST API:
`/orchestration/worker/status` and `/orchestration/worker/id`

Currently these just trigger the return of status / ids from the workers
via the redis back channel, this still needs to be handled and passed
through to the frontend.

It also adds the eventbus to each worker, and triggers a reload of those
eventbus instances when the configuration changes on the main instances.
  • Loading branch information
flipswitchingmonkey authored Sep 7, 2023
1 parent 0a35025 commit 7b49cf2
Show file tree
Hide file tree
Showing 14 changed files with 790 additions and 221 deletions.
83 changes: 4 additions & 79 deletions packages/cli/src/AbstractServer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import type { Server } from 'http';
import express from 'express';
import compression from 'compression';
import isbot from 'isbot';
import { jsonParse, LoggerProxy as Logger } from 'n8n-workflow';
import { LoggerProxy as Logger } from 'n8n-workflow';

import config from '@/config';
import { N8N_VERSION, inDevelopment, inTest } from '@/constants';
Expand All @@ -18,16 +18,8 @@ import { rawBodyReader, bodyParser, corsMiddleware } from '@/middlewares';
import { TestWebhooks } from '@/TestWebhooks';
import { WaitingWebhooks } from '@/WaitingWebhooks';
import { webhookRequestHandler } from '@/WebhookHelpers';
import { RedisService } from '@/services/redis.service';
import { eventBus } from './eventbus';
import type { AbstractEventMessageOptions } from './eventbus/EventMessageClasses/AbstractEventMessageOptions';
import { getEventMessageObjectByType } from './eventbus/EventMessageClasses/Helpers';
import type { RedisServiceWorkerResponseObject } from './services/redis/RedisServiceCommands';
import {
EVENT_BUS_REDIS_CHANNEL,
WORKER_RESPONSE_REDIS_CHANNEL,
} from './services/redis/RedisServiceHelper';
import { generateHostInstanceId } from './databases/utils/generators';
import { OrchestrationService } from './services/orchestration.service';

export abstract class AbstractServer {
protected server: Server;
Expand Down Expand Up @@ -124,78 +116,11 @@ export abstract class AbstractServer {
});

if (config.getEnv('executions.mode') === 'queue') {
await this.setupRedis();
// will start the redis connections
await Container.get(OrchestrationService).init(this.uniqueInstanceId);
}
}

// This connection is going to be our heartbeat
// IORedis automatically pings redis and tries to reconnect
// We will be using a retryStrategy to control how and when to exit.
// We are also subscribing to the event log channel to receive events from workers
private async setupRedis() {
const redisService = Container.get(RedisService);
const redisSubscriber = await redisService.getPubSubSubscriber();

// TODO: these are all proof of concept implementations for the moment
// until worker communication is implemented
// #region proof of concept
await redisSubscriber.subscribeToEventLog();
await redisSubscriber.subscribeToWorkerResponseChannel();
redisSubscriber.addMessageHandler(
'AbstractServerReceiver',
async (channel: string, message: string) => {
// TODO: this is a proof of concept implementation to forward events to the main instance's event bus
// Events are arriving through a pub/sub channel and are forwarded to the eventBus
// In the future, a stream should probably replace this implementation entirely
if (channel === EVENT_BUS_REDIS_CHANNEL) {
const eventData = jsonParse<AbstractEventMessageOptions>(message);
if (eventData) {
const eventMessage = getEventMessageObjectByType(eventData);
if (eventMessage) {
await eventBus.send(eventMessage);
}
}
} else if (channel === WORKER_RESPONSE_REDIS_CHANNEL) {
// The back channel from the workers as a pub/sub channel
const workerResponse = jsonParse<RedisServiceWorkerResponseObject>(message);
if (workerResponse) {
// TODO: Handle worker response
console.log('Received worker response', workerResponse);
}
}
},
);
// TODO: Leave comments for now as implementation example
// const redisStreamListener = await redisService.getStreamConsumer();
// void redisStreamListener.listenToStream('teststream');
// redisStreamListener.addMessageHandler(
// 'MessageLogger',
// async (stream: string, id: string, message: string[]) => {
// // TODO: this is a proof of concept implementation of a stream consumer
// switch (stream) {
// case EVENT_BUS_REDIS_STREAM:
// case COMMAND_REDIS_STREAM:
// case WORKER_RESPONSE_REDIS_STREAM:
// default:
// LoggerProxy.debug(
// `Received message from stream ${stream} with id ${id} and message ${message.join(
// ',',
// )}`,
// );
// break;
// }
// },
// );

// const redisListReceiver = await redisService.getListReceiver();
// await redisListReceiver.init();

// setInterval(async () => {
// await redisListReceiver.popLatestWorkerResponse();
// }, 1000);
// #endregion
}

async init(): Promise<void> {
const { app, protocol, sslKey, sslCert } = this;

Expand Down
2 changes: 2 additions & 0 deletions packages/cli/src/Server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,7 @@ import { handleMfaDisable, isMfaFeatureEnabled } from './Mfa/helpers';
import { JwtService } from './services/jwt.service';
import { RoleService } from './services/role.service';
import { UserService } from './services/user.service';
import { OrchestrationController } from './controllers/orchestration.controller';

const exec = promisify(callbackExec);

Expand Down Expand Up @@ -551,6 +552,7 @@ export class Server extends AbstractServer {
Container.get(SourceControlController),
Container.get(WorkflowStatisticsController),
Container.get(ExternalSecretsController),
Container.get(OrchestrationController),
];

if (isLdapEnabled()) {
Expand Down
4 changes: 2 additions & 2 deletions packages/cli/src/commands/BaseCommand.ts
Original file line number Diff line number Diff line change
Expand Up @@ -103,12 +103,12 @@ export abstract class BaseCommand extends Command {
process.exit(1);
}

protected async initBinaryManager() {
async initBinaryManager() {
const binaryDataConfig = config.getEnv('binaryDataManager');
await BinaryDataManager.init(binaryDataConfig, true);
}

protected async initExternalHooks() {
async initExternalHooks() {
this.externalHooks = Container.get(ExternalHooks);
await this.externalHooks.init();
}
Expand Down
Loading

0 comments on commit 7b49cf2

Please sign in to comment.