Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(core): Set up worker server #10814

Merged
merged 6 commits into from
Sep 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
120 changes: 6 additions & 114 deletions packages/cli/src/commands/worker.ts
Original file line number Diff line number Diff line change
@@ -1,22 +1,15 @@
import { Flags, type Config } from '@oclif/core';
import express from 'express';
import http from 'http';
import { ApplicationError } from 'n8n-workflow';
import { Container } from 'typedi';

import config from '@/config';
import { N8N_VERSION, inTest } from '@/constants';
import { CredentialsOverwrites } from '@/credentials-overwrites';
import * as Db from '@/db';
import { ServiceUnavailableError } from '@/errors/response-errors/service-unavailable.error';
import { EventMessageGeneric } from '@/eventbus/event-message-classes/event-message-generic';
import { MessageEventBus } from '@/eventbus/message-event-bus/message-event-bus';
import { LogStreamingEventRelay } from '@/events/log-streaming-event-relay';
import type { ICredentialsOverwrite } from '@/interfaces';
import { rawBodyReader, bodyParser } from '@/middlewares';
import * as ResponseHelper from '@/response-helper';
import { JobProcessor } from '@/scaling/job-processor';
import type { ScalingService } from '@/scaling/scaling.service';
import { WorkerServer } from '@/scaling/worker-server';
import { OrchestrationHandlerWorkerService } from '@/services/orchestration/worker/orchestration.handler.worker.service';
import { OrchestrationWorkerService } from '@/services/orchestration/worker/orchestration.worker.service';
import type { RedisServicePubSubSubscriber } from '@/services/redis/redis-service-pub-sub-subscriber';
Expand Down Expand Up @@ -165,118 +158,17 @@ export class Worker extends BaseCommand {
this.jobProcessor = Container.get(JobProcessor);
}

async setupHealthMonitor() {
const { port } = this.globalConfig.queue.health;

const app = express();
app.disable('x-powered-by');

const server = http.createServer(app);

app.get('/healthz/readiness', async (_req, res) => {
return Db.connectionState.connected && Db.connectionState.migrated
? res.status(200).send({ status: 'ok' })
: res.status(503).send({ status: 'error' });
});

app.get(
'/healthz',

async (_req: express.Request, res: express.Response) => {
this.logger.debug('Health check started!');

const connection = Db.getConnection();

try {
if (!connection.isInitialized) {
// Connection is not active
throw new ApplicationError('No active database connection');
}
// DB ping
await connection.query('SELECT 1');
} catch (e) {
this.logger.error('No Database connection!', e as Error);
const error = new ServiceUnavailableError('No Database connection!');
return ResponseHelper.sendErrorResponse(res, error);
}

// Just to be complete, generally will the worker stop automatically
// if it loses the connection to redis
try {
// Redis ping
await this.scalingService.pingQueue();
} catch (e) {
this.logger.error('No Redis connection!', e as Error);
const error = new ServiceUnavailableError('No Redis connection!');
return ResponseHelper.sendErrorResponse(res, error);
}

// Everything fine
const responseData = {
status: 'ok',
};

this.logger.debug('Health check completed successfully!');

ResponseHelper.sendSuccessResponse(res, responseData, true, 200);
},
);

let presetCredentialsLoaded = false;

const endpointPresetCredentials = this.globalConfig.credentials.overwrite.endpoint;
if (endpointPresetCredentials !== '') {
// POST endpoint to set preset credentials
app.post(
`/${endpointPresetCredentials}`,
rawBodyReader,
bodyParser,
async (req: express.Request, res: express.Response) => {
if (!presetCredentialsLoaded) {
const body = req.body as ICredentialsOverwrite;

if (req.contentType !== 'application/json') {
ResponseHelper.sendErrorResponse(
res,
new Error(
'Body must be a valid JSON, make sure the content-type is application/json',
),
);
return;
}

Container.get(CredentialsOverwrites).setData(body);
presetCredentialsLoaded = true;
ResponseHelper.sendSuccessResponse(res, { success: true }, true, 200);
} else {
ResponseHelper.sendErrorResponse(res, new Error('Preset credentials can be set once'));
}
},
);
}

server.on('error', (error: Error & { code: string }) => {
if (error.code === 'EADDRINUSE') {
this.logger.error(
`n8n's port ${port} is already in use. Do you have the n8n main process running on that port?`,
);
process.exit(1);
}
});

await new Promise<void>((resolve) => server.listen(port, () => resolve()));
await this.externalHooks?.run('worker.ready');
this.logger.info(`\nn8n worker health check via, port ${port}`);
}

async run() {
this.logger.info('\nn8n worker is now ready');
this.logger.info(` * Version: ${N8N_VERSION}`);
this.logger.info(` * Concurrency: ${this.concurrency}`);
this.logger.info('');

if (this.globalConfig.queue.health.active) {
await this.setupHealthMonitor();
if (
this.globalConfig.queue.health.active ||
this.globalConfig.credentials.overwrite.endpoint !== ''
) {
await Container.get(WorkerServer).init();
ivov marked this conversation as resolved.
Show resolved Hide resolved
}

if (!inTest && process.stdout.isTTY) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
import { ApplicationError } from 'n8n-workflow';

export class CredentialsOverwritesAlreadySetError extends ApplicationError {
constructor() {
super('Credentials overwrites may not be set more than once.');
}
}
7 changes: 7 additions & 0 deletions packages/cli/src/errors/non-json-body.error.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
import { ApplicationError } from 'n8n-workflow';

export class NonJsonBodyError extends ApplicationError {
ivov marked this conversation as resolved.
Show resolved Hide resolved
constructor() {
super('Body must be valid JSON. Please make sure `content-type` is `application/json`.');
}
}
9 changes: 9 additions & 0 deletions packages/cli/src/errors/port-taken.error.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
import { ApplicationError } from 'n8n-workflow';

export class PortTakenError extends ApplicationError {
constructor(port: number) {
super(
`Port ${port} is already in use. Do you already have the n8n main process running on that port?`,
);
}
}
127 changes: 127 additions & 0 deletions packages/cli/src/scaling/__tests__/worker-server.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
import type { GlobalConfig } from '@n8n/config';
import type express from 'express';
import { mock } from 'jest-mock-extended';
import { AssertionError } from 'node:assert';
import * as http from 'node:http';

import config from '@/config';
import { PortTakenError } from '@/errors/port-taken.error';
import type { ExternalHooks } from '@/external-hooks';
import { bodyParser, rawBodyReader } from '@/middlewares';

import { WorkerServer } from '../worker-server';

const app = mock<express.Application>();

jest.mock('node:http');
jest.mock('express', () => ({ __esModule: true, default: () => app }));

const addressInUseError = () => {
const error: NodeJS.ErrnoException = new Error('Port already in use');
error.code = 'EADDRINUSE';

return error;
};

describe('WorkerServer', () => {
let globalConfig: GlobalConfig;

const externalHooks = mock<ExternalHooks>();

beforeEach(() => {
config.set('generic.instanceType', 'worker');
globalConfig = mock<GlobalConfig>({
queue: {
health: { active: true, port: 5678 },
},
credentials: {
overwrite: { endpoint: '' },
},
});
jest.restoreAllMocks();
});

describe('constructor', () => {
it('should throw if non-worker instance type', () => {
config.set('generic.instanceType', 'webhook');

expect(
() => new WorkerServer(globalConfig, mock(), mock(), mock(), externalHooks),
).toThrowError(AssertionError);
});

it('should throw if port taken', async () => {
const server = mock<http.Server>();

jest.spyOn(http, 'createServer').mockReturnValue(server);

server.on.mockImplementation((event: string, callback: (arg?: unknown) => void) => {
if (event === 'error') callback(addressInUseError());
return server;
});

expect(
() => new WorkerServer(globalConfig, mock(), mock(), mock(), externalHooks),
).toThrowError(PortTakenError);
});

it('should set up `/healthz` if health check is enabled', async () => {
jest.spyOn(http, 'createServer').mockReturnValue(mock<http.Server>());

new WorkerServer(globalConfig, mock(), mock(), mock(), externalHooks);

expect(app.get).toHaveBeenCalledWith('/healthz', expect.any(Function));
});

it('should not set up `/healthz` if health check is disabled', async () => {
globalConfig.queue.health.active = false;

jest.spyOn(http, 'createServer').mockReturnValue(mock<http.Server>());

new WorkerServer(globalConfig, mock(), mock(), mock(), externalHooks);

expect(app.get).not.toHaveBeenCalled();
});

it('should set up `/:endpoint` if overwrites endpoint is set', async () => {
jest.spyOn(http, 'createServer').mockReturnValue(mock<http.Server>());

const CREDENTIALS_OVERWRITE_ENDPOINT = 'credentials/overwrites';
globalConfig.credentials.overwrite.endpoint = CREDENTIALS_OVERWRITE_ENDPOINT;

new WorkerServer(globalConfig, mock(), mock(), mock(), externalHooks);

expect(app.post).toHaveBeenCalledWith(
`/${CREDENTIALS_OVERWRITE_ENDPOINT}`,
rawBodyReader,
bodyParser,
expect.any(Function),
);
});

it('should not set up `/:endpoint` if overwrites endpoint is not set', async () => {
jest.spyOn(http, 'createServer').mockReturnValue(mock<http.Server>());

new WorkerServer(globalConfig, mock(), mock(), mock(), externalHooks);

expect(app.post).not.toHaveBeenCalled();
});
});

describe('init', () => {
it('should call `worker.ready` external hook', async () => {
const server = mock<http.Server>();
jest.spyOn(http, 'createServer').mockReturnValue(server);

server.listen.mockImplementation((_port, callback: () => void) => {
callback();
return server;
});

const workerServer = new WorkerServer(globalConfig, mock(), mock(), mock(), externalHooks);
await workerServer.init();

expect(externalHooks.run).toHaveBeenCalledWith('worker.ready');
});
});
});
Loading
Loading