Skip to content

Commit

Permalink
feat(core): Implement inter-main communication for test webhooks in m…
Browse files Browse the repository at this point in the history
…ulti-main setup (#8267)
  • Loading branch information
ivov authored Jan 12, 2024
1 parent 135553b commit 1a0e285
Show file tree
Hide file tree
Showing 17 changed files with 231 additions and 96 deletions.
42 changes: 30 additions & 12 deletions packages/cli/src/TestWebhooks.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,14 @@ import type {
import { Push } from '@/push';
import { NodeTypes } from '@/NodeTypes';
import * as WebhookHelpers from '@/WebhookHelpers';
import { TIME } from '@/constants';
import { TEST_WEBHOOK_TIMEOUT } from '@/constants';
import { NotFoundError } from '@/errors/response-errors/not-found.error';
import { WorkflowMissingIdError } from '@/errors/workflow-missing-id.error';
import { WebhookNotFoundError } from '@/errors/response-errors/webhook-not-found.error';
import * as NodeExecuteFunctions from 'n8n-core';
import { removeTrailingSlash } from './utils';
import { TestWebhookRegistrationsService } from '@/services/test-webhook-registrations.service';
import { MultiMainSetup } from './services/orchestration/main/MultiMainSetup.ee';
import * as WorkflowExecuteAdditionalData from '@/WorkflowExecuteAdditionalData';

@Service()
Expand All @@ -32,6 +33,7 @@ export class TestWebhooks implements IWebhookManager {
private readonly push: Push,
private readonly nodeTypes: NodeTypes,
private readonly registrations: TestWebhookRegistrationsService,
private readonly multiMainSetup: MultiMainSetup,
) {}

private timeouts: { [webhookKey: string]: NodeJS.Timeout } = {};
Expand Down Expand Up @@ -89,7 +91,6 @@ export class TestWebhooks implements IWebhookManager {
}

const { destinationNode, sessionId, workflowEntity } = registration;
const timeout = this.timeouts[key];

const workflow = this.toWorkflow(workflowEntity);

Expand Down Expand Up @@ -135,15 +136,34 @@ export class TestWebhooks implements IWebhookManager {
}
} catch {}

// Delete webhook also if an error is thrown
if (timeout) clearTimeout(timeout);
/**
* Multi-main setup: In a manual webhook execution, the main process that
* handles a webhook might not be the same as the main process that created
* the webhook. If so, after the test webhook has been successfully executed,
* the handler process commands the creator process to clear its test webhooks.
*/
if (
this.multiMainSetup.isEnabled &&
sessionId &&
!this.push.getBackend().hasSessionId(sessionId)
) {
const payload = { webhookKey: key, workflowEntity, sessionId };
void this.multiMainSetup.publish('clear-test-webhooks', payload);
return;
}

await this.registrations.deregisterAll();
this.clearTimeout(key);

await this.deactivateWebhooks(workflow);
});
}

clearTimeout(key: string) {
const timeout = this.timeouts[key];

if (timeout) clearTimeout(timeout);
}

async getWebhookMethods(path: string) {
const allKeys = await this.registrations.getAllKeys();

Expand Down Expand Up @@ -208,7 +228,7 @@ export class TestWebhooks implements IWebhookManager {
return false; // no webhooks found to start a workflow
}

const timeout = setTimeout(async () => this.cancelWebhook(workflow.id), 2 * TIME.MINUTE);
const timeout = setTimeout(async () => this.cancelWebhook(workflow.id), TEST_WEBHOOK_TIMEOUT);

for (const webhook of webhooks) {
const key = this.registrations.toKey(webhook);
Expand Down Expand Up @@ -270,13 +290,11 @@ export class TestWebhooks implements IWebhookManager {

const { sessionId, workflowEntity } = registration;

const timeout = this.timeouts[key];

const workflow = this.toWorkflow(workflowEntity);

if (workflowEntity.id !== workflowId) continue;

clearTimeout(timeout);
this.clearTimeout(key);

if (sessionId !== undefined) {
try {
Expand Down Expand Up @@ -359,13 +377,13 @@ export class TestWebhooks implements IWebhookManager {
if (staticData) workflow.staticData = staticData;

await workflow.deleteWebhook(webhook, NodeExecuteFunctions, 'internal', 'update');

await this.registrations.deregister(webhook);
}

await this.registrations.deregisterAll();
}

/**
* Convert a `WorkflowEntity` from `typeorm` to a `Workflow` from `n8n-workflow`.
* Convert a `WorkflowEntity` from `typeorm` to a temporary `Workflow` from `n8n-workflow`.
*/
toWorkflow(workflowEntity: IWorkflowDb) {
return new Workflow({
Expand Down
6 changes: 5 additions & 1 deletion packages/cli/src/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -109,8 +109,12 @@ export const TIME = {
MINUTE: 60 * 1000,
HOUR: 60 * 60 * 1000,
DAY: 24 * 60 * 60 * 1000,
};
} as const;

export const MIN_PASSWORD_CHAR_LENGTH = 8;

export const MAX_PASSWORD_CHAR_LENGTH = 64;

export const TEST_WEBHOOK_TIMEOUT = 2 * TIME.MINUTE;

export const TEST_WEBHOOK_TIMEOUT_BUFFER = 30 * TIME.SECOND;
75 changes: 45 additions & 30 deletions packages/cli/src/push/abstract.push.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import { assert, jsonStringify } from 'n8n-workflow';
import type { IPushDataType } from '@/Interfaces';
import type { Logger } from '@/Logger';
import type { User } from '@db/entities/User';
import type { MultiMainSetup } from '@/services/orchestration/main/MultiMainSetup.ee';

/**
* Abstract class for two-way push communication.
Expand All @@ -16,77 +17,90 @@ export abstract class AbstractPush<T> extends EventEmitter {
protected userIdBySessionId: Record<string, string> = {};

protected abstract close(connection: T): void;
protected abstract sendToOne(connection: T, data: string): void;
protected abstract sendToOneConnection(connection: T, data: string): void;

constructor(protected readonly logger: Logger) {
constructor(
protected readonly logger: Logger,
private readonly multiMainSetup: MultiMainSetup,
) {
super();
}

protected add(sessionId: string, userId: User['id'], connection: T): void {
protected add(sessionId: string, userId: User['id'], connection: T) {
const { connections, userIdBySessionId: userIdsBySessionId } = this;
this.logger.debug('Add editor-UI session', { sessionId });

const existingConnection = connections[sessionId];

if (existingConnection) {
// Make sure to remove existing connection with the same id
// Make sure to remove existing connection with the same ID
this.close(existingConnection);
}

connections[sessionId] = connection;
userIdsBySessionId[sessionId] = userId;
}

protected onMessageReceived(sessionId: string, msg: unknown): void {
protected onMessageReceived(sessionId: string, msg: unknown) {
this.logger.debug('Received message from editor-UI', { sessionId, msg });

const userId = this.userIdBySessionId[sessionId];
this.emit('message', {
sessionId,
userId,
msg,
});

this.emit('message', { sessionId, userId, msg });
}

protected remove(sessionId?: string): void {
if (sessionId !== undefined) {
this.logger.debug('Remove editor-UI session', { sessionId });
delete this.connections[sessionId];
delete this.userIdBySessionId[sessionId];
}
protected remove(sessionId?: string) {
if (!sessionId) return;

this.logger.debug('Removed editor-UI session', { sessionId });

delete this.connections[sessionId];
delete this.userIdBySessionId[sessionId];
}

private sendToSessions<D>(type: IPushDataType, data: D, sessionIds: string[]) {
private sendToSessions(type: IPushDataType, data: unknown, sessionIds: string[]) {
this.logger.debug(`Send data of type "${type}" to editor-UI`, {
dataType: type,
sessionIds: sessionIds.join(', '),
});

const sendData = jsonStringify({ type, data }, { replaceCircularRefs: true });
const stringifiedPayload = jsonStringify({ type, data }, { replaceCircularRefs: true });

for (const sessionId of sessionIds) {
const connection = this.connections[sessionId];
assert(connection);
this.sendToOne(connection, sendData);
this.sendToOneConnection(connection, stringifiedPayload);
}
}

broadcast<D>(type: IPushDataType, data?: D) {
sendToAllSessions(type: IPushDataType, data?: unknown) {
this.sendToSessions(type, data, Object.keys(this.connections));
}

send<D>(type: IPushDataType, data: D, sessionId: string) {
const { connections } = this;
if (connections[sessionId] === undefined) {
sendToOneSession(type: IPushDataType, data: unknown, sessionId: string) {
/**
* Multi-main setup: In a manual webhook execution, the main process that
* handles a webhook might not be the same as the main process that created
* the webhook. If so, the handler process commands the creator process to
* relay the former's execution lifecyle events to the creator's frontend.
*/
if (this.multiMainSetup.isEnabled && !this.hasSessionId(sessionId)) {
const payload = { type, args: data, sessionId };

void this.multiMainSetup.publish('relay-execution-lifecycle-event', payload);

return;
}

if (this.connections[sessionId] === undefined) {
this.logger.error(`The session "${sessionId}" is not registered.`, { sessionId });
return;
}

this.sendToSessions(type, data, [sessionId]);
}

/**
* Sends the given data to given users' connections
*/
sendToUsers<D>(type: IPushDataType, data: D, userIds: Array<User['id']>) {
sendToUsers(type: IPushDataType, data: unknown, userIds: Array<User['id']>) {
const { connections } = this;
const userSessionIds = Object.keys(connections).filter((sessionId) =>
userIds.includes(this.userIdBySessionId[sessionId]),
Expand All @@ -95,9 +109,6 @@ export abstract class AbstractPush<T> extends EventEmitter {
this.sendToSessions(type, data, userSessionIds);
}

/**
* Closes all push existing connections
*/
closeAllConnections() {
for (const sessionId in this.connections) {
// Signal the connection that we want to close it.
Expand All @@ -107,4 +118,8 @@ export abstract class AbstractPush<T> extends EventEmitter {
this.close(this.connections[sessionId]);
}
}

hasSessionId(sessionId: string) {
return this.connections[sessionId] !== undefined;
}
}
17 changes: 8 additions & 9 deletions packages/cli/src/push/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,16 +34,15 @@ export class Push extends EventEmitter {
constructor() {
super();

if (useWebSockets) {
this.backend.on('message', (msg) => this.emit('message', msg));
}
if (useWebSockets) this.backend.on('message', (msg) => this.emit('message', msg));
}

handleRequest(req: SSEPushRequest | WebSocketPushRequest, res: PushResponse) {
const {
userId,
query: { sessionId },
} = req;

if (req.ws) {
(this.backend as WebSocketPush).add(sessionId, userId, req.ws);
} else if (!useWebSockets) {
Expand All @@ -56,24 +55,24 @@ export class Push extends EventEmitter {
this.emit('editorUiConnected', sessionId);
}

broadcast<D>(type: IPushDataType, data?: D) {
this.backend.broadcast(type, data);
broadcast(type: IPushDataType, data?: unknown) {
this.backend.sendToAllSessions(type, data);
}

send<D>(type: IPushDataType, data: D, sessionId: string) {
this.backend.send(type, data, sessionId);
send(type: IPushDataType, data: unknown, sessionId: string) {
this.backend.sendToOneSession(type, data, sessionId);
}

getBackend() {
return this.backend;
}

sendToUsers<D>(type: IPushDataType, data: D, userIds: Array<User['id']>) {
sendToUsers(type: IPushDataType, data: unknown, userIds: Array<User['id']>) {
this.backend.sendToUsers(type, data, userIds);
}

@OnShutdown()
onShutdown(): void {
onShutdown() {
this.backend.closeAllConnections();
}
}
Expand Down
10 changes: 6 additions & 4 deletions packages/cli/src/push/sse.push.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import { Logger } from '@/Logger';
import { AbstractPush } from './abstract.push';
import type { PushRequest, PushResponse } from './types';
import type { User } from '@db/entities/User';
import { MultiMainSetup } from '@/services/orchestration/main/MultiMainSetup.ee';

type Connection = { req: PushRequest; res: PushResponse };

Expand All @@ -13,8 +14,9 @@ export class SSEPush extends AbstractPush<Connection> {

readonly connections: Record<string, Connection> = {};

constructor(logger: Logger) {
super(logger);
constructor(logger: Logger, multiMainSetup: MultiMainSetup) {
super(logger, multiMainSetup);

this.channel.on('disconnect', (channel, { req }) => {
this.remove(req?.query?.sessionId);
});
Expand All @@ -25,12 +27,12 @@ export class SSEPush extends AbstractPush<Connection> {
this.channel.addClient(connection.req, connection.res);
}

protected close({ res }: Connection): void {
protected close({ res }: Connection) {
res.end();
this.channel.removeClient(res);
}

protected sendToOne(connection: Connection, data: string): void {
protected sendToOneConnection(connection: Connection, data: string) {
this.channel.send(data, [connection.res]);
}
}
7 changes: 4 additions & 3 deletions packages/cli/src/push/websocket.push.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,16 @@ import { Service } from 'typedi';
import { Logger } from '@/Logger';
import { AbstractPush } from './abstract.push';
import type { User } from '@db/entities/User';
import { MultiMainSetup } from '@/services/orchestration/main/MultiMainSetup.ee';

function heartbeat(this: WebSocket) {
this.isAlive = true;
}

@Service()
export class WebSocketPush extends AbstractPush<WebSocket> {
constructor(logger: Logger) {
super(logger);
constructor(logger: Logger, multiMainSetup: MultiMainSetup) {
super(logger, multiMainSetup);

// Ping all connected clients every 60 seconds
setInterval(() => this.pingAll(), 60 * 1000);
Expand Down Expand Up @@ -51,7 +52,7 @@ export class WebSocketPush extends AbstractPush<WebSocket> {
connection.close();
}

protected sendToOne(connection: WebSocket, data: string): void {
protected sendToOneConnection(connection: WebSocket, data: string): void {
connection.send(data);
}

Expand Down
Loading

0 comments on commit 1a0e285

Please sign in to comment.