Skip to content

Commit

Permalink
fix(core): Make senderId required for all command messages (#7252)
Browse files Browse the repository at this point in the history
all commands sent between main instance and workers need to contain a
server id to prevent senders from reacting to their own messages,
causing loops

this PR makes sure all sent messages contain a sender id by default as
part of constructing a sending redis client.

---------

Co-authored-by: कारतोफ्फेलस्क्रिप्ट™ <[email protected]>
  • Loading branch information
flipswitchingmonkey and netroy authored Sep 26, 2023
1 parent 77d6e3f commit 4b01428
Show file tree
Hide file tree
Showing 23 changed files with 212 additions and 184 deletions.
1 change: 1 addition & 0 deletions packages/cli/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@
"@n8n/client-oauth2": "workspace:*",
"@n8n_io/license-sdk": "~2.6.0",
"@oclif/command": "^1.8.16",
"@oclif/config": "^1.18.17",
"@oclif/core": "^1.16.4",
"@oclif/errors": "^1.3.6",
"@rudderstack/rudder-sdk-node": "1.0.6",
Expand Down
2 changes: 1 addition & 1 deletion packages/cli/src/AbstractServer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ export abstract class AbstractServer {

if (config.getEnv('executions.mode') === 'queue') {
// will start the redis connections
await Container.get(OrchestrationService).init(this.uniqueInstanceId);
await Container.get(OrchestrationService).init();
}
}

Expand Down
27 changes: 17 additions & 10 deletions packages/cli/src/License.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import type { TEntitlement, TLicenseBlock } from '@n8n_io/license-sdk';
import type { TEntitlement, TFeatures, TLicenseBlock } from '@n8n_io/license-sdk';
import { LicenseManager } from '@n8n_io/license-sdk';
import type { ILogger } from 'n8n-workflow';
import { getLogger } from './Logger';
Expand Down Expand Up @@ -50,6 +50,9 @@ export class License {
const saveCertStr = isMainInstance
? async (value: TLicenseBlock) => this.saveCertStr(value)
: async () => {};
const onFeatureChange = isMainInstance
? async (features: TFeatures) => this.onFeatureChange(features)
: async () => {};

try {
this.manager = new LicenseManager({
Expand All @@ -64,6 +67,7 @@ export class License {
loadCertStr: async () => this.loadCertStr(),
saveCertStr,
deviceFingerprint: () => instanceId,
onFeatureChange,
});

await this.manager.initialize();
Expand All @@ -89,6 +93,18 @@ export class License {
return databaseSettings?.value ?? '';
}

async onFeatureChange(_features: TFeatures): Promise<void> {
if (config.getEnv('executions.mode') === 'queue') {
if (!this.redisPublisher) {
this.logger.debug('Initializing Redis publisher for License Service');
this.redisPublisher = await Container.get(RedisService).getPubSubPublisher();
}
await this.redisPublisher.publishToCommandChannel({
command: 'reloadLicense',
});
}
}

async saveCertStr(value: TLicenseBlock): Promise<void> {
// if we have an ephemeral license, we don't want to save it to the database
if (config.get('license.cert')) return;
Expand All @@ -100,15 +116,6 @@ export class License {
},
['key'],
);
if (config.getEnv('executions.mode') === 'queue') {
if (!this.redisPublisher) {
this.logger.debug('Initializing Redis publisher for License Service');
this.redisPublisher = await Container.get(RedisService).getPubSubPublisher();
}
await this.redisPublisher.publishToCommandChannel({
command: 'reloadLicense',
});
}
}

async activate(activationKey: string): Promise<void> {
Expand Down
4 changes: 1 addition & 3 deletions packages/cli/src/Server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1474,9 +1474,7 @@ export class Server extends AbstractServer {
// ----------------------------------------

if (!eventBus.isInitialized) {
await eventBus.initialize({
uniqueInstanceId: this.uniqueInstanceId,
});
await eventBus.initialize();
}

if (this.endpointPresetCredentials !== '') {
Expand Down
27 changes: 23 additions & 4 deletions packages/cli/src/commands/BaseCommand.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import { PostHogClient } from '@/posthog';
import { License } from '@/License';
import { ExternalSecretsManager } from '@/ExternalSecrets/ExternalSecretsManager.ee';
import { initExpressionEvaluator } from '@/ExpressionEvalator';
import { generateHostInstanceId } from '../databases/utils/generators';

export abstract class BaseCommand extends Command {
protected logger = LoggerProxy.init(getLogger());
Expand All @@ -36,6 +37,10 @@ export abstract class BaseCommand extends Command {

protected instanceId: string;

instanceType: N8nInstanceType = 'main';

queueModeId: string;

protected server?: AbstractServer;

async init(): Promise<void> {
Expand Down Expand Up @@ -83,6 +88,22 @@ export abstract class BaseCommand extends Command {
await Container.get(InternalHooks).init(this.instanceId);
}

protected setInstanceType(instanceType: N8nInstanceType) {
this.instanceType = instanceType;
config.set('generic.instanceType', instanceType);
}

protected setInstanceQueueModeId() {
if (config.getEnv('executions.mode') === 'queue') {
if (config.get('redis.queueModeId')) {
this.queueModeId = config.get('redis.queueModeId');
return;
}
this.queueModeId = generateHostInstanceId(this.instanceType);
config.set('redis.queueModeId', this.queueModeId);
}
}

protected async stopProcess() {
// This needs to be overridden
}
Expand Down Expand Up @@ -115,11 +136,9 @@ export abstract class BaseCommand extends Command {
await this.externalHooks.init();
}

async initLicense(instanceType: N8nInstanceType = 'main'): Promise<void> {
config.set('generic.instanceType', instanceType);

async initLicense(): Promise<void> {
const license = Container.get(License);
await license.init(this.instanceId, instanceType);
await license.init(this.instanceId, this.instanceType ?? 'main');

const activationKey = config.getEnv('license.activationKey');

Expand Down
16 changes: 14 additions & 2 deletions packages/cli/src/commands/start.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import { BaseCommand } from './BaseCommand';
import { InternalHooks } from '@/InternalHooks';
import { License } from '@/License';
import { ExecutionRepository } from '@/databases/repositories/execution.repository';
import { IConfig } from '@oclif/config';

// eslint-disable-next-line @typescript-eslint/no-unsafe-assignment, @typescript-eslint/no-var-requires
const open = require('open');
Expand Down Expand Up @@ -65,6 +66,12 @@ export class Start extends BaseCommand {

protected server = new Server();

constructor(argv: string[], cmdConfig: IConfig) {
super(argv, cmdConfig);
this.setInstanceType('main');
this.setInstanceQueueModeId();
}

/**
* Opens the UI in browser
*/
Expand Down Expand Up @@ -196,11 +203,16 @@ export class Start extends BaseCommand {
async init() {
await this.initCrashJournal();

await super.init();
this.logger.info('Initializing n8n process');
if (config.getEnv('executions.mode') === 'queue') {
this.logger.debug('Main Instance running in queue mode');
this.logger.debug(`Queue mode id: ${this.queueModeId}`);
}

await super.init();
this.activeWorkflowRunner = Container.get(ActiveWorkflowRunner);

await this.initLicense('main');
await this.initLicense();
await this.initBinaryDataService();
await this.initExternalHooks();
await this.initExternalSecrets();
Expand Down
16 changes: 15 additions & 1 deletion packages/cli/src/commands/webhook.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import { WebhookServer } from '@/WebhookServer';
import { Queue } from '@/Queue';
import { BaseCommand } from './BaseCommand';
import { Container } from 'typedi';
import { IConfig } from '@oclif/config';

export class Webhook extends BaseCommand {
static description = 'Starts n8n webhook process. Intercepts only production URLs.';
Expand All @@ -18,6 +19,15 @@ export class Webhook extends BaseCommand {

protected server = new WebhookServer();

constructor(argv: string[], cmdConfig: IConfig) {
super(argv, cmdConfig);
this.setInstanceType('webhook');
if (this.queueModeId) {
this.logger.debug(`Webhook Instance queue mode id: ${this.queueModeId}`);
}
this.setInstanceQueueModeId();
}

/**
* Stops n8n in a graceful way.
* Make for example sure that all the webhooks from third party services
Expand Down Expand Up @@ -75,9 +85,13 @@ export class Webhook extends BaseCommand {
}

await this.initCrashJournal();

this.logger.info('Initializing n8n webhook process');
this.logger.debug(`Queue mode id: ${this.queueModeId}`);

await super.init();

await this.initLicense('webhook');
await this.initLicense();
await this.initBinaryDataService();
await this.initExternalHooks();
await this.initExternalSecrets();
Expand Down
26 changes: 16 additions & 10 deletions packages/cli/src/commands/worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ import { N8N_VERSION } from '@/constants';
import { BaseCommand } from './BaseCommand';
import { ExecutionRepository } from '@db/repositories';
import { OwnershipService } from '@/services/ownership.service';
import { generateHostInstanceId } from '@/databases/utils/generators';
import type { ICredentialsOverwrite } from '@/Interfaces';
import { CredentialsOverwrites } from '@/CredentialsOverwrites';
import { rawBodyReader, bodyParser } from '@/middlewares';
Expand All @@ -38,6 +37,7 @@ import { RedisServicePubSubPublisher } from '../services/redis/RedisServicePubSu
import { RedisServicePubSubSubscriber } from '../services/redis/RedisServicePubSubSubscriber';
import { EventMessageGeneric } from '../eventbus/EventMessageClasses/EventMessageGeneric';
import { getWorkerCommandReceivedHandler } from '../worker/workerCommandHandler';
import { IConfig } from '@oclif/config';

export class Worker extends BaseCommand {
static description = '\nStarts a n8n worker';
Expand All @@ -58,8 +58,6 @@ export class Worker extends BaseCommand {

static jobQueue: JobQueue;

readonly uniqueInstanceId = generateHostInstanceId('worker');

redisPublisher: RedisServicePubSubPublisher;

redisSubscriber: RedisServicePubSubSubscriber;
Expand Down Expand Up @@ -250,13 +248,22 @@ export class Worker extends BaseCommand {
};
}

constructor(argv: string[], cmdConfig: IConfig) {
super(argv, cmdConfig);
this.setInstanceType('worker');
this.setInstanceQueueModeId();
}

async init() {
await this.initCrashJournal();
await super.init();
this.logger.debug(`Worker ID: ${this.uniqueInstanceId}`);

this.logger.debug('Starting n8n worker...');
this.logger.debug(`Queue mode id: ${this.queueModeId}`);

await super.init();

await this.initLicense();

await this.initLicense('worker');
await this.initBinaryDataService();
await this.initExternalHooks();
await this.initExternalSecrets();
Expand All @@ -267,8 +274,7 @@ export class Worker extends BaseCommand {

async initEventBus() {
await eventBus.initialize({
workerId: this.uniqueInstanceId,
uniqueInstanceId: this.uniqueInstanceId,
workerId: this.queueModeId,
});
}

Expand All @@ -286,7 +292,7 @@ export class Worker extends BaseCommand {
new EventMessageGeneric({
eventName: 'n8n.worker.started',
payload: {
workerId: this.uniqueInstanceId,
workerId: this.queueModeId,
},
}),
);
Expand All @@ -295,7 +301,7 @@ export class Worker extends BaseCommand {
'WorkerCommandReceivedHandler',
// eslint-disable-next-line @typescript-eslint/no-unsafe-argument
getWorkerCommandReceivedHandler({
uniqueInstanceId: this.uniqueInstanceId,
queueModeId: this.queueModeId,
instanceId: this.instanceId,
redisPublisher: this.redisPublisher,
getRunningJobIds: () => Object.keys(Worker.runningJobs),
Expand Down
5 changes: 5 additions & 0 deletions packages/cli/src/config/schema.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1138,6 +1138,11 @@ export const schema = {
default: 'n8n',
env: 'N8N_REDIS_KEY_PREFIX',
},
queueModeId: {
doc: 'Unique ID for this n8n instance, is usually set automatically by n8n during startup',
format: String,
default: '',
},
},

cache: {
Expand Down
Loading

0 comments on commit 4b01428

Please sign in to comment.