From 4d745e6b141b66175bd46b5e75613ffa255a169b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Iv=C3=A1n=20Ovejero?= Date: Tue, 9 Jan 2024 10:49:25 +0100 Subject: [PATCH 01/23] Initial setup --- packages/cli/src/push/abstract.push.ts | 32 ++++++++++++++++--- packages/cli/src/push/sse.push.ts | 9 ++++-- packages/cli/src/push/websocket.push.ts | 5 +-- .../orchestration/main/MultiMainSetup.ee.ts | 25 ++++----------- .../main/handleCommandMessageMain.ts | 29 ++++++++++++++++- .../services/redis/RedisServiceCommands.ts | 5 +-- .../cli/src/workflows/workflow.service.ts | 2 +- 7 files changed, 75 insertions(+), 32 deletions(-) diff --git a/packages/cli/src/push/abstract.push.ts b/packages/cli/src/push/abstract.push.ts index 42adadaa7beae..6914771703737 100644 --- a/packages/cli/src/push/abstract.push.ts +++ b/packages/cli/src/push/abstract.push.ts @@ -3,6 +3,8 @@ import { assert, jsonStringify } from 'n8n-workflow'; import type { IPushDataType } from '@/Interfaces'; import type { Logger } from '@/Logger'; import type { User } from '@db/entities/User'; +import type { MultiMainSetup } from '@/services/orchestration/main/MultiMainSetup.ee'; +import type { RedisServiceBaseCommand } from '@/services/redis/RedisServiceCommands'; /** * Abstract class for two-way push communication. @@ -18,10 +20,17 @@ export abstract class AbstractPush extends EventEmitter { protected abstract close(connection: T): void; protected abstract sendToOne(connection: T, data: string): void; - constructor(protected readonly logger: Logger) { + constructor( + protected readonly logger: Logger, + private readonly multiMainSetup: MultiMainSetup, + ) { super(); } + hasSessionId(sessionId: string) { + return this.connections[sessionId] !== undefined; + } + protected add(sessionId: string, userId: User['id'], connection: T): void { const { connections, userIdBySessionId: userIdsBySessionId } = this; this.logger.debug('Add editor-UI session', { sessionId }); @@ -69,13 +78,26 @@ export abstract class AbstractPush extends EventEmitter { } } + /** + * Send the given data to all connected users. + */ broadcast(type: IPushDataType, data?: D) { this.sendToSessions(type, data, Object.keys(this.connections)); } + /** + * Send the given data to one specific user. + */ send(type: IPushDataType, data: D, sessionId: string) { - const { connections } = this; - if (connections[sessionId] === undefined) { + if (this.multiMainSetup.isEnabled) { + void this.multiMainSetup.publish( + 'executionLifecycleHook', + data as RedisServiceBaseCommand['payload'], // @TODO: Prevent assertion + ); + return; + } + + if (this.connections[sessionId] === undefined) { this.logger.error(`The session "${sessionId}" is not registered.`, { sessionId }); return; } @@ -84,7 +106,7 @@ export abstract class AbstractPush extends EventEmitter { } /** - * Sends the given data to given users' connections + * Send the given data to specific users. */ sendToUsers(type: IPushDataType, data: D, userIds: Array) { const { connections } = this; @@ -96,7 +118,7 @@ export abstract class AbstractPush extends EventEmitter { } /** - * Closes all push existing connections + * Close all push existing connections. */ closeAllConnections() { for (const sessionId in this.connections) { diff --git a/packages/cli/src/push/sse.push.ts b/packages/cli/src/push/sse.push.ts index f4c75a3205df9..f24af06d02672 100644 --- a/packages/cli/src/push/sse.push.ts +++ b/packages/cli/src/push/sse.push.ts @@ -4,6 +4,7 @@ import { Logger } from '@/Logger'; import { AbstractPush } from './abstract.push'; import type { PushRequest, PushResponse } from './types'; import type { User } from '@db/entities/User'; +import { MultiMainSetup } from '@/services/orchestration/main/MultiMainSetup.ee'; type Connection = { req: PushRequest; res: PushResponse }; @@ -13,13 +14,17 @@ export class SSEPush extends AbstractPush { readonly connections: Record = {}; - constructor(logger: Logger) { - super(logger); + constructor(logger: Logger, multiMainSetup: MultiMainSetup) { + super(logger, multiMainSetup); this.channel.on('disconnect', (channel, { req }) => { this.remove(req?.query?.sessionId); }); } + hasSessionId(sessionId: string) { + return this.connections[sessionId] !== undefined; + } + add(sessionId: string, userId: User['id'], connection: Connection) { super.add(sessionId, userId, connection); this.channel.addClient(connection.req, connection.res); diff --git a/packages/cli/src/push/websocket.push.ts b/packages/cli/src/push/websocket.push.ts index 08ebad2e9d611..9cef37dd6fc19 100644 --- a/packages/cli/src/push/websocket.push.ts +++ b/packages/cli/src/push/websocket.push.ts @@ -3,6 +3,7 @@ import { Service } from 'typedi'; import { Logger } from '@/Logger'; import { AbstractPush } from './abstract.push'; import type { User } from '@db/entities/User'; +import { MultiMainSetup } from '@/services/orchestration/main/MultiMainSetup.ee'; function heartbeat(this: WebSocket) { this.isAlive = true; @@ -10,8 +11,8 @@ function heartbeat(this: WebSocket) { @Service() export class WebSocketPush extends AbstractPush { - constructor(logger: Logger) { - super(logger); + constructor(logger: Logger, multiMainSetup: MultiMainSetup) { + super(logger, multiMainSetup); // Ping all connected clients every 60 seconds setInterval(() => this.pingAll(), 60 * 1000); diff --git a/packages/cli/src/services/orchestration/main/MultiMainSetup.ee.ts b/packages/cli/src/services/orchestration/main/MultiMainSetup.ee.ts index b032b1e979845..5544ca80ecf8a 100644 --- a/packages/cli/src/services/orchestration/main/MultiMainSetup.ee.ts +++ b/packages/cli/src/services/orchestration/main/MultiMainSetup.ee.ts @@ -4,6 +4,10 @@ import { TIME } from '@/constants'; import { SingleMainSetup } from '@/services/orchestration/main/SingleMainSetup'; import { getRedisPrefix } from '@/services/redis/RedisServiceHelper'; import { ErrorReporterProxy as EventReporter } from 'n8n-workflow'; +import type { + RedisServiceCommand, + RedisServiceBaseCommand, +} from '@/services/redis/RedisServiceCommands'; @Service() export class MultiMainSetup extends SingleMainSetup { @@ -122,27 +126,10 @@ export class MultiMainSetup extends SingleMainSetup { } } - async broadcastWorkflowActiveStateChanged(payload: { - workflowId: string; - oldState: boolean; - newState: boolean; - versionId: string; - }) { + async publish(event: RedisServiceCommand, payload: RedisServiceBaseCommand['payload']) { if (!this.sanityCheck()) return; - await this.redisPublisher.publishToCommandChannel({ - command: 'workflowActiveStateChanged', - payload, - }); - } - - async broadcastWorkflowFailedToActivate(payload: { workflowId: string; errorMessage: string }) { - if (!this.sanityCheck()) return; - - await this.redisPublisher.publishToCommandChannel({ - command: 'workflowFailedToActivate', - payload, - }); + await this.redisPublisher.publishToCommandChannel({ command: event, payload }); } async fetchLeaderKey() { diff --git a/packages/cli/src/services/orchestration/main/handleCommandMessageMain.ts b/packages/cli/src/services/orchestration/main/handleCommandMessageMain.ts index 9a2752e5d6c8e..583bb34881bc9 100644 --- a/packages/cli/src/services/orchestration/main/handleCommandMessageMain.ts +++ b/packages/cli/src/services/orchestration/main/handleCommandMessageMain.ts @@ -40,6 +40,7 @@ export async function handleCommandMessageMain(messageString: string) { return message; } + // @TODO: Should we be checking for multi-main here? if (isMainInstance && !config.getEnv('multiMainSetup.enabled')) { // at this point in time, only a single main instance is supported, thus this command _should_ never be caught currently logger.error( @@ -98,7 +99,7 @@ export async function handleCommandMessageMain(messageString: string) { versionId, }); - await Container.get(MultiMainSetup).broadcastWorkflowFailedToActivate({ + await Container.get(MultiMainSetup).publish('workflowFailedToActivate', { workflowId, errorMessage: error.message, }); @@ -127,6 +128,32 @@ export async function handleCommandMessageMain(messageString: string) { Container.get(Push).broadcast('workflowFailedToActivate', { workflowId, errorMessage }); } + case 'executionLifecycleHook': { + if (!debounceMessageReceiver(message, 100)) { + message.payload = { result: 'debounced' }; + return message; + } + + const { eventName, args, sessionId } = message.payload ?? {}; + + if ( + typeof eventName !== 'string' || + typeof args !== 'object' || + typeof sessionId !== 'string' + ) { + break; + } + + const push = Container.get(Push); + + push.getBackend().hasSessionId(sessionId); + + // check that session ID in payload matches this main's session ID + + // @ts-ignore @TODO: Fix type + push.send(eventName, args, sessionId); + } + default: break; } diff --git a/packages/cli/src/services/redis/RedisServiceCommands.ts b/packages/cli/src/services/redis/RedisServiceCommands.ts index 4c622e3ac9389..7eadcbae6e997 100644 --- a/packages/cli/src/services/redis/RedisServiceCommands.ts +++ b/packages/cli/src/services/redis/RedisServiceCommands.ts @@ -8,7 +8,8 @@ export type RedisServiceCommand = | 'reloadLicense' | 'reloadExternalSecretsProviders' | 'workflowActiveStateChanged' // multi-main only - | 'workflowFailedToActivate'; // multi-main only + | 'workflowFailedToActivate' // multi-main only + | 'executionLifecycleHook'; // multi-main only /** * An object to be sent via Redis pub/sub from the main process to the workers. @@ -16,7 +17,7 @@ export type RedisServiceCommand = * @field targets: The targets to execute the command on. Leave empty to execute on all workers or specify worker ids. * @field payload: Optional arguments to be sent with the command. */ -type RedisServiceBaseCommand = { +export type RedisServiceBaseCommand = { senderId: string; command: RedisServiceCommand; payload?: { diff --git a/packages/cli/src/workflows/workflow.service.ts b/packages/cli/src/workflows/workflow.service.ts index 5a6a321d7f7ec..8949bab741a9f 100644 --- a/packages/cli/src/workflows/workflow.service.ts +++ b/packages/cli/src/workflows/workflow.service.ts @@ -282,7 +282,7 @@ export class WorkflowService { const newState = updatedWorkflow.active; if (this.multiMainSetup.isEnabled && oldState !== newState) { - await this.multiMainSetup.broadcastWorkflowActiveStateChanged({ + await this.multiMainSetup.publish('workflowActiveStateChanged', { workflowId, oldState, newState, From e44919c2253fd822b43d577bc254b5eb76253333 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Iv=C3=A1n=20Ovejero?= Date: Tue, 9 Jan 2024 10:52:38 +0100 Subject: [PATCH 02/23] Fix check --- .../services/orchestration/main/handleCommandMessageMain.ts | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/packages/cli/src/services/orchestration/main/handleCommandMessageMain.ts b/packages/cli/src/services/orchestration/main/handleCommandMessageMain.ts index 583bb34881bc9..3f6d21e045cab 100644 --- a/packages/cli/src/services/orchestration/main/handleCommandMessageMain.ts +++ b/packages/cli/src/services/orchestration/main/handleCommandMessageMain.ts @@ -146,9 +146,7 @@ export async function handleCommandMessageMain(messageString: string) { const push = Container.get(Push); - push.getBackend().hasSessionId(sessionId); - - // check that session ID in payload matches this main's session ID + if (!push.getBackend().hasSessionId(sessionId)) break; // @ts-ignore @TODO: Fix type push.send(eventName, args, sessionId); From 11a60785b8213fe73409454658ef9d89498c68aa Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Iv=C3=A1n=20Ovejero?= Date: Tue, 9 Jan 2024 11:11:22 +0100 Subject: [PATCH 03/23] Better naming --- packages/cli/src/push/abstract.push.ts | 12 +++++++----- .../orchestration/main/handleCommandMessageMain.ts | 2 +- .../cli/src/services/redis/RedisServiceCommands.ts | 10 +++++++++- 3 files changed, 17 insertions(+), 7 deletions(-) diff --git a/packages/cli/src/push/abstract.push.ts b/packages/cli/src/push/abstract.push.ts index 6914771703737..10ae139b6fed8 100644 --- a/packages/cli/src/push/abstract.push.ts +++ b/packages/cli/src/push/abstract.push.ts @@ -4,7 +4,6 @@ import type { IPushDataType } from '@/Interfaces'; import type { Logger } from '@/Logger'; import type { User } from '@db/entities/User'; import type { MultiMainSetup } from '@/services/orchestration/main/MultiMainSetup.ee'; -import type { RedisServiceBaseCommand } from '@/services/redis/RedisServiceCommands'; /** * Abstract class for two-way push communication. @@ -89,11 +88,14 @@ export abstract class AbstractPush extends EventEmitter { * Send the given data to one specific user. */ send(type: IPushDataType, data: D, sessionId: string) { + // @TODO: Skip if the webhook call reaches the correct main on multi-main setup if (this.multiMainSetup.isEnabled) { - void this.multiMainSetup.publish( - 'executionLifecycleHook', - data as RedisServiceBaseCommand['payload'], // @TODO: Prevent assertion - ); + void this.multiMainSetup.publish('multi-main-setup:relay-execution-lifecycle-event', { + eventName: type, + // @ts-ignore // @TODO + args: data, + sessionId, + }); return; } diff --git a/packages/cli/src/services/orchestration/main/handleCommandMessageMain.ts b/packages/cli/src/services/orchestration/main/handleCommandMessageMain.ts index 3f6d21e045cab..e05ec91e773c9 100644 --- a/packages/cli/src/services/orchestration/main/handleCommandMessageMain.ts +++ b/packages/cli/src/services/orchestration/main/handleCommandMessageMain.ts @@ -128,7 +128,7 @@ export async function handleCommandMessageMain(messageString: string) { Container.get(Push).broadcast('workflowFailedToActivate', { workflowId, errorMessage }); } - case 'executionLifecycleHook': { + case 'multi-main-setup:relay-execution-lifecycle-event': { if (!debounceMessageReceiver(message, 100)) { message.payload = { result: 'debounced' }; return message; diff --git a/packages/cli/src/services/redis/RedisServiceCommands.ts b/packages/cli/src/services/redis/RedisServiceCommands.ts index 7eadcbae6e997..603c670a44c71 100644 --- a/packages/cli/src/services/redis/RedisServiceCommands.ts +++ b/packages/cli/src/services/redis/RedisServiceCommands.ts @@ -9,7 +9,7 @@ export type RedisServiceCommand = | 'reloadExternalSecretsProviders' | 'workflowActiveStateChanged' // multi-main only | 'workflowFailedToActivate' // multi-main only - | 'executionLifecycleHook'; // multi-main only + | 'multi-main-setup:relay-execution-lifecycle-event'; /** * An object to be sent via Redis pub/sub from the main process to the workers. @@ -61,6 +61,14 @@ export type RedisServiceWorkerResponseObject = { workflowId: string; }; } + | { + command: 'multi-main-setup:relay-execution-lifecycle-event'; + payload: { + eventName: string; + args: Record; + sessionId: string; + }; + } ); export type RedisServiceCommandObject = { From 7e11074d0ba32130b2748d792487ef1752417fe4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Iv=C3=A1n=20Ovejero?= Date: Tue, 9 Jan 2024 15:43:39 +0100 Subject: [PATCH 04/23] Clean up push code --- packages/cli/src/push/abstract.push.ts | 70 ++++++++++--------- packages/cli/src/push/index.ts | 19 +++-- packages/cli/src/push/sse.push.ts | 5 +- packages/cli/src/push/websocket.push.ts | 2 +- .../cli/test/unit/push/websocket.push.test.ts | 2 +- 5 files changed, 52 insertions(+), 46 deletions(-) diff --git a/packages/cli/src/push/abstract.push.ts b/packages/cli/src/push/abstract.push.ts index 10ae139b6fed8..91a7ccf1a56df 100644 --- a/packages/cli/src/push/abstract.push.ts +++ b/packages/cli/src/push/abstract.push.ts @@ -17,7 +17,7 @@ export abstract class AbstractPush extends EventEmitter { protected userIdBySessionId: Record = {}; protected abstract close(connection: T): void; - protected abstract sendToOne(connection: T, data: string): void; + protected abstract sendToConnection(connection: T, data: string): void; constructor( protected readonly logger: Logger, @@ -30,13 +30,14 @@ export abstract class AbstractPush extends EventEmitter { return this.connections[sessionId] !== undefined; } - protected add(sessionId: string, userId: User['id'], connection: T): void { + protected add(sessionId: string, userId: User['id'], connection: T) { const { connections, userIdBySessionId: userIdsBySessionId } = this; this.logger.debug('Add editor-UI session', { sessionId }); const existingConnection = connections[sessionId]; + if (existingConnection) { - // Make sure to remove existing connection with the same id + // Make sure to remove existing connection with the same ID this.close(existingConnection); } @@ -44,56 +45,61 @@ export abstract class AbstractPush extends EventEmitter { userIdsBySessionId[sessionId] = userId; } - protected onMessageReceived(sessionId: string, msg: unknown): void { + protected onMessageReceived(sessionId: string, msg: unknown) { this.logger.debug('Received message from editor-UI', { sessionId, msg }); + const userId = this.userIdBySessionId[sessionId]; - this.emit('message', { - sessionId, - userId, - msg, - }); + + this.emit('message', { sessionId, userId, msg }); } - protected remove(sessionId?: string): void { - if (sessionId !== undefined) { - this.logger.debug('Remove editor-UI session', { sessionId }); - delete this.connections[sessionId]; - delete this.userIdBySessionId[sessionId]; - } + protected remove(sessionId?: string) { + if (!sessionId) return; + + this.logger.debug('Removed editor-UI session', { sessionId }); + + delete this.connections[sessionId]; + delete this.userIdBySessionId[sessionId]; } - private sendToSessions(type: IPushDataType, data: D, sessionIds: string[]) { - this.logger.debug(`Send data of type "${type}" to editor-UI`, { - dataType: type, + /** + * Send a push type and payload to multiple sessions. + */ + private sendToSessions(pushType: IPushDataType, payload: unknown, sessionIds: string[]) { + this.logger.debug(`Send data of type "${pushType}" to editor-UI`, { + dataType: pushType, sessionIds: sessionIds.join(', '), }); - const sendData = jsonStringify({ type, data }, { replaceCircularRefs: true }); + const stringifiedPayload = jsonStringify( + { type: pushType, data: payload }, + { replaceCircularRefs: true }, + ); for (const sessionId of sessionIds) { const connection = this.connections[sessionId]; assert(connection); - this.sendToOne(connection, sendData); + this.sendToConnection(connection, stringifiedPayload); } } /** - * Send the given data to all connected users. + * Send a push type and payload to all sessions. */ - broadcast(type: IPushDataType, data?: D) { - this.sendToSessions(type, data, Object.keys(this.connections)); + broadcast(pushType: IPushDataType, payload?: D) { + this.sendToSessions(pushType, payload, Object.keys(this.connections)); } /** - * Send the given data to one specific user. + * Send a push type and payload to one session. */ - send(type: IPushDataType, data: D, sessionId: string) { + sendToSession(pushType: IPushDataType, payload: unknown, sessionId: string) { // @TODO: Skip if the webhook call reaches the correct main on multi-main setup if (this.multiMainSetup.isEnabled) { void this.multiMainSetup.publish('multi-main-setup:relay-execution-lifecycle-event', { - eventName: type, + eventName: pushType, // @ts-ignore // @TODO - args: data, + args: payload, sessionId, }); return; @@ -104,23 +110,23 @@ export abstract class AbstractPush extends EventEmitter { return; } - this.sendToSessions(type, data, [sessionId]); + this.sendToSessions(pushType, payload, [sessionId]); } /** - * Send the given data to specific users. + * Send a push type and payload to multiple users. */ - sendToUsers(type: IPushDataType, data: D, userIds: Array) { + sendToUsers(pushType: IPushDataType, payload: unknown, userIds: Array) { const { connections } = this; const userSessionIds = Object.keys(connections).filter((sessionId) => userIds.includes(this.userIdBySessionId[sessionId]), ); - this.sendToSessions(type, data, userSessionIds); + this.sendToSessions(pushType, payload, userSessionIds); } /** - * Close all push existing connections. + * Close all existing push connections. */ closeAllConnections() { for (const sessionId in this.connections) { diff --git a/packages/cli/src/push/index.ts b/packages/cli/src/push/index.ts index d8705a475c5fe..77c716bb7980e 100644 --- a/packages/cli/src/push/index.ts +++ b/packages/cli/src/push/index.ts @@ -34,9 +34,7 @@ export class Push extends EventEmitter { constructor() { super(); - if (useWebSockets) { - this.backend.on('message', (msg) => this.emit('message', msg)); - } + if (useWebSockets) this.backend.on('message', (msg) => this.emit('message', msg)); } handleRequest(req: SSEPushRequest | WebSocketPushRequest, res: PushResponse) { @@ -44,6 +42,7 @@ export class Push extends EventEmitter { userId, query: { sessionId }, } = req; + if (req.ws) { (this.backend as WebSocketPush).add(sessionId, userId, req.ws); } else if (!useWebSockets) { @@ -56,24 +55,24 @@ export class Push extends EventEmitter { this.emit('editorUiConnected', sessionId); } - broadcast(type: IPushDataType, data?: D) { - this.backend.broadcast(type, data); + broadcast(pushType: IPushDataType, payload?: unknown) { + this.backend.broadcast(pushType, payload); } - send(type: IPushDataType, data: D, sessionId: string) { - this.backend.send(type, data, sessionId); + send(type: IPushDataType, payload: unknown, sessionId: string) { + this.backend.sendToSession(type, payload, sessionId); } getBackend() { return this.backend; } - sendToUsers(type: IPushDataType, data: D, userIds: Array) { - this.backend.sendToUsers(type, data, userIds); + sendToUsers(pushType: IPushDataType, payload: unknown, userIds: Array) { + this.backend.sendToUsers(pushType, payload, userIds); } @OnShutdown() - onShutdown(): void { + onShutdown() { this.backend.closeAllConnections(); } } diff --git a/packages/cli/src/push/sse.push.ts b/packages/cli/src/push/sse.push.ts index f24af06d02672..4076701f4ab4b 100644 --- a/packages/cli/src/push/sse.push.ts +++ b/packages/cli/src/push/sse.push.ts @@ -16,6 +16,7 @@ export class SSEPush extends AbstractPush { constructor(logger: Logger, multiMainSetup: MultiMainSetup) { super(logger, multiMainSetup); + this.channel.on('disconnect', (channel, { req }) => { this.remove(req?.query?.sessionId); }); @@ -30,12 +31,12 @@ export class SSEPush extends AbstractPush { this.channel.addClient(connection.req, connection.res); } - protected close({ res }: Connection): void { + protected close({ res }: Connection) { res.end(); this.channel.removeClient(res); } - protected sendToOne(connection: Connection, data: string): void { + protected sendToConnection(connection: Connection, data: string) { this.channel.send(data, [connection.res]); } } diff --git a/packages/cli/src/push/websocket.push.ts b/packages/cli/src/push/websocket.push.ts index 9cef37dd6fc19..c7c7660cfc096 100644 --- a/packages/cli/src/push/websocket.push.ts +++ b/packages/cli/src/push/websocket.push.ts @@ -52,7 +52,7 @@ export class WebSocketPush extends AbstractPush { connection.close(); } - protected sendToOne(connection: WebSocket, data: string): void { + protected sendToConnection(connection: WebSocket, data: string): void { connection.send(data); } diff --git a/packages/cli/test/unit/push/websocket.push.test.ts b/packages/cli/test/unit/push/websocket.push.test.ts index 2e072c3857558..9d8a530bcc14a 100644 --- a/packages/cli/test/unit/push/websocket.push.test.ts +++ b/packages/cli/test/unit/push/websocket.push.test.ts @@ -65,7 +65,7 @@ describe('WebSocketPush', () => { }, }; - webSocketPush.send('executionRecovered', data, sessionId1); + webSocketPush.sendToSession('executionRecovered', data, sessionId1); expect(mockWebSocket1.send).toHaveBeenCalledWith( JSON.stringify({ From 65fa543abc904ec35561059e7eafa2703d424fc9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Iv=C3=A1n=20Ovejero?= Date: Tue, 9 Jan 2024 16:01:57 +0100 Subject: [PATCH 05/23] Improve types --- packages/cli/src/push/abstract.push.ts | 31 +++++++++---------- packages/cli/src/push/index.ts | 12 +++---- .../orchestration/main/MultiMainSetup.ee.ts | 9 ++---- .../services/redis/RedisServiceCommands.ts | 11 ++++--- 4 files changed, 29 insertions(+), 34 deletions(-) diff --git a/packages/cli/src/push/abstract.push.ts b/packages/cli/src/push/abstract.push.ts index 91a7ccf1a56df..47c1d95f1f55b 100644 --- a/packages/cli/src/push/abstract.push.ts +++ b/packages/cli/src/push/abstract.push.ts @@ -65,16 +65,13 @@ export abstract class AbstractPush extends EventEmitter { /** * Send a push type and payload to multiple sessions. */ - private sendToSessions(pushType: IPushDataType, payload: unknown, sessionIds: string[]) { - this.logger.debug(`Send data of type "${pushType}" to editor-UI`, { - dataType: pushType, + private sendToSessions(type: IPushDataType, data: unknown, sessionIds: string[]) { + this.logger.debug(`Send data of type "${type}" to editor-UI`, { + dataType: type, sessionIds: sessionIds.join(', '), }); - const stringifiedPayload = jsonStringify( - { type: pushType, data: payload }, - { replaceCircularRefs: true }, - ); + const stringifiedPayload = jsonStringify({ type, data }, { replaceCircularRefs: true }); for (const sessionId of sessionIds) { const connection = this.connections[sessionId]; @@ -86,22 +83,22 @@ export abstract class AbstractPush extends EventEmitter { /** * Send a push type and payload to all sessions. */ - broadcast(pushType: IPushDataType, payload?: D) { - this.sendToSessions(pushType, payload, Object.keys(this.connections)); + broadcast(type: IPushDataType, data?: unknown) { + this.sendToSessions(type, data, Object.keys(this.connections)); } /** * Send a push type and payload to one session. */ - sendToSession(pushType: IPushDataType, payload: unknown, sessionId: string) { + sendToSession(type: IPushDataType, data: unknown, sessionId: string) { // @TODO: Skip if the webhook call reaches the correct main on multi-main setup if (this.multiMainSetup.isEnabled) { - void this.multiMainSetup.publish('multi-main-setup:relay-execution-lifecycle-event', { - eventName: pushType, - // @ts-ignore // @TODO - args: payload, + void this.multiMainSetup.publish('relay-execution-lifecycle-event', { + event: type, + eventArgs: data, sessionId, }); + return; } @@ -110,19 +107,19 @@ export abstract class AbstractPush extends EventEmitter { return; } - this.sendToSessions(pushType, payload, [sessionId]); + this.sendToSessions(type, data, [sessionId]); } /** * Send a push type and payload to multiple users. */ - sendToUsers(pushType: IPushDataType, payload: unknown, userIds: Array) { + sendToUsers(type: IPushDataType, data: unknown, userIds: Array) { const { connections } = this; const userSessionIds = Object.keys(connections).filter((sessionId) => userIds.includes(this.userIdBySessionId[sessionId]), ); - this.sendToSessions(pushType, payload, userSessionIds); + this.sendToSessions(type, data, userSessionIds); } /** diff --git a/packages/cli/src/push/index.ts b/packages/cli/src/push/index.ts index 77c716bb7980e..8be5f46af56e6 100644 --- a/packages/cli/src/push/index.ts +++ b/packages/cli/src/push/index.ts @@ -55,20 +55,20 @@ export class Push extends EventEmitter { this.emit('editorUiConnected', sessionId); } - broadcast(pushType: IPushDataType, payload?: unknown) { - this.backend.broadcast(pushType, payload); + broadcast(type: IPushDataType, data?: unknown) { + this.backend.broadcast(type, data); } - send(type: IPushDataType, payload: unknown, sessionId: string) { - this.backend.sendToSession(type, payload, sessionId); + send(type: IPushDataType, data: unknown, sessionId: string) { + this.backend.sendToSession(type, data, sessionId); } getBackend() { return this.backend; } - sendToUsers(pushType: IPushDataType, payload: unknown, userIds: Array) { - this.backend.sendToUsers(pushType, payload, userIds); + sendToUsers(pushType: IPushDataType, data: unknown, userIds: Array) { + this.backend.sendToUsers(pushType, data, userIds); } @OnShutdown() diff --git a/packages/cli/src/services/orchestration/main/MultiMainSetup.ee.ts b/packages/cli/src/services/orchestration/main/MultiMainSetup.ee.ts index 5544ca80ecf8a..15dae3a8d5d7e 100644 --- a/packages/cli/src/services/orchestration/main/MultiMainSetup.ee.ts +++ b/packages/cli/src/services/orchestration/main/MultiMainSetup.ee.ts @@ -4,10 +4,7 @@ import { TIME } from '@/constants'; import { SingleMainSetup } from '@/services/orchestration/main/SingleMainSetup'; import { getRedisPrefix } from '@/services/redis/RedisServiceHelper'; import { ErrorReporterProxy as EventReporter } from 'n8n-workflow'; -import type { - RedisServiceCommand, - RedisServiceBaseCommand, -} from '@/services/redis/RedisServiceCommands'; +import type { RedisServiceCommand } from '@/services/redis/RedisServiceCommands'; @Service() export class MultiMainSetup extends SingleMainSetup { @@ -126,10 +123,10 @@ export class MultiMainSetup extends SingleMainSetup { } } - async publish(event: RedisServiceCommand, payload: RedisServiceBaseCommand['payload']) { + async publish(command: RedisServiceCommand, payload: unknown) { if (!this.sanityCheck()) return; - await this.redisPublisher.publishToCommandChannel({ command: event, payload }); + await this.redisPublisher.publishToCommandChannel({ command, payload }); } async fetchLeaderKey() { diff --git a/packages/cli/src/services/redis/RedisServiceCommands.ts b/packages/cli/src/services/redis/RedisServiceCommands.ts index 603c670a44c71..a9b8435ca7b59 100644 --- a/packages/cli/src/services/redis/RedisServiceCommands.ts +++ b/packages/cli/src/services/redis/RedisServiceCommands.ts @@ -9,7 +9,7 @@ export type RedisServiceCommand = | 'reloadExternalSecretsProviders' | 'workflowActiveStateChanged' // multi-main only | 'workflowFailedToActivate' // multi-main only - | 'multi-main-setup:relay-execution-lifecycle-event'; + | 'relay-execution-lifecycle-event'; // multi-main only /** * An object to be sent via Redis pub/sub from the main process to the workers. @@ -20,9 +20,10 @@ export type RedisServiceCommand = export type RedisServiceBaseCommand = { senderId: string; command: RedisServiceCommand; - payload?: { - [key: string]: string | number | boolean | string[] | number[] | boolean[]; - }; + // payload?: { + // [key: string]: string | number | boolean | string[] | number[] | boolean[]; + // }; + payload: unknown; }; export type RedisServiceWorkerResponseObject = { @@ -65,7 +66,7 @@ export type RedisServiceWorkerResponseObject = { command: 'multi-main-setup:relay-execution-lifecycle-event'; payload: { eventName: string; - args: Record; + eventArgs: Record; sessionId: string; }; } From a53ce372ebc1df878520b949a16940d5a01b381d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Iv=C3=A1n=20Ovejero?= Date: Tue, 9 Jan 2024 16:03:40 +0100 Subject: [PATCH 06/23] Missed a spot --- packages/cli/src/push/index.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/packages/cli/src/push/index.ts b/packages/cli/src/push/index.ts index 8be5f46af56e6..c7452398a7269 100644 --- a/packages/cli/src/push/index.ts +++ b/packages/cli/src/push/index.ts @@ -67,8 +67,8 @@ export class Push extends EventEmitter { return this.backend; } - sendToUsers(pushType: IPushDataType, data: unknown, userIds: Array) { - this.backend.sendToUsers(pushType, data, userIds); + sendToUsers(type: IPushDataType, data: unknown, userIds: Array) { + this.backend.sendToUsers(type, data, userIds); } @OnShutdown() From d4f28bcd9ebad767be6a2bef9119ea737134510d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Iv=C3=A1n=20Ovejero?= Date: Tue, 9 Jan 2024 16:06:22 +0100 Subject: [PATCH 07/23] Cleanup --- packages/cli/src/push/abstract.push.ts | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/packages/cli/src/push/abstract.push.ts b/packages/cli/src/push/abstract.push.ts index 47c1d95f1f55b..dc8a7ddcf2764 100644 --- a/packages/cli/src/push/abstract.push.ts +++ b/packages/cli/src/push/abstract.push.ts @@ -26,10 +26,6 @@ export abstract class AbstractPush extends EventEmitter { super(); } - hasSessionId(sessionId: string) { - return this.connections[sessionId] !== undefined; - } - protected add(sessionId: string, userId: User['id'], connection: T) { const { connections, userIdBySessionId: userIdsBySessionId } = this; this.logger.debug('Add editor-UI session', { sessionId }); @@ -45,6 +41,10 @@ export abstract class AbstractPush extends EventEmitter { userIdsBySessionId[sessionId] = userId; } + hasSessionId(sessionId: string) { + return this.connections[sessionId] !== undefined; + } + protected onMessageReceived(sessionId: string, msg: unknown) { this.logger.debug('Received message from editor-UI', { sessionId, msg }); @@ -63,7 +63,7 @@ export abstract class AbstractPush extends EventEmitter { } /** - * Send a push type and payload to multiple sessions. + * Send data to multiple sessions. */ private sendToSessions(type: IPushDataType, data: unknown, sessionIds: string[]) { this.logger.debug(`Send data of type "${type}" to editor-UI`, { @@ -81,14 +81,14 @@ export abstract class AbstractPush extends EventEmitter { } /** - * Send a push type and payload to all sessions. + * Send data to all sessions. */ broadcast(type: IPushDataType, data?: unknown) { this.sendToSessions(type, data, Object.keys(this.connections)); } /** - * Send a push type and payload to one session. + * Send data to one session. */ sendToSession(type: IPushDataType, data: unknown, sessionId: string) { // @TODO: Skip if the webhook call reaches the correct main on multi-main setup @@ -111,7 +111,7 @@ export abstract class AbstractPush extends EventEmitter { } /** - * Send a push type and payload to multiple users. + * Send data to multiple users. */ sendToUsers(type: IPushDataType, data: unknown, userIds: Array) { const { connections } = this; From 524df8dbbacb2a6f5c9e3ac291a55f32430173ac Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Iv=C3=A1n=20Ovejero?= Date: Tue, 9 Jan 2024 16:36:45 +0100 Subject: [PATCH 08/23] Complete circuit --- packages/cli/src/push/abstract.push.ts | 28 +++++++++---------- packages/cli/src/push/index.ts | 4 +-- .../orchestration/main/MultiMainSetup.ee.ts | 9 ++++-- .../main/handleCommandMessageMain.ts | 15 +++------- .../services/redis/RedisServiceCommands.ts | 26 ++++++++++------- .../cli/test/unit/push/websocket.push.test.ts | 4 +-- 6 files changed, 44 insertions(+), 42 deletions(-) diff --git a/packages/cli/src/push/abstract.push.ts b/packages/cli/src/push/abstract.push.ts index dc8a7ddcf2764..acd2e0dd95014 100644 --- a/packages/cli/src/push/abstract.push.ts +++ b/packages/cli/src/push/abstract.push.ts @@ -80,24 +80,22 @@ export abstract class AbstractPush extends EventEmitter { } } - /** - * Send data to all sessions. - */ - broadcast(type: IPushDataType, data?: unknown) { + sendToAllSessions(type: IPushDataType, data?: unknown) { this.sendToSessions(type, data, Object.keys(this.connections)); } - /** - * Send data to one session. - */ - sendToSession(type: IPushDataType, data: unknown, sessionId: string) { - // @TODO: Skip if the webhook call reaches the correct main on multi-main setup - if (this.multiMainSetup.isEnabled) { - void this.multiMainSetup.publish('relay-execution-lifecycle-event', { - event: type, - eventArgs: data, - sessionId, - }); + sendToOneSession(type: IPushDataType, data: unknown, sessionId: string) { + /** + * In a manual webhook execution in multi-main setup, the main process that handles + * a webhook might not be the same as the main process that created the webhook. + * + * If so, the handler process commands the creator process to relay execution + * lifecyle events occurring at the handler to the creator's frontend. + */ + if (this.multiMainSetup.isEnabled && !this.hasSessionId(sessionId)) { + const payload = { type, args: data, sessionId }; + + void this.multiMainSetup.publish('relay-execution-lifecycle-event', payload); return; } diff --git a/packages/cli/src/push/index.ts b/packages/cli/src/push/index.ts index c7452398a7269..e740821321cd0 100644 --- a/packages/cli/src/push/index.ts +++ b/packages/cli/src/push/index.ts @@ -56,11 +56,11 @@ export class Push extends EventEmitter { } broadcast(type: IPushDataType, data?: unknown) { - this.backend.broadcast(type, data); + this.backend.sendToAllSessions(type, data); } send(type: IPushDataType, data: unknown, sessionId: string) { - this.backend.sendToSession(type, data, sessionId); + this.backend.sendToOneSession(type, data, sessionId); } getBackend() { diff --git a/packages/cli/src/services/orchestration/main/MultiMainSetup.ee.ts b/packages/cli/src/services/orchestration/main/MultiMainSetup.ee.ts index 15dae3a8d5d7e..9427022cfbac3 100644 --- a/packages/cli/src/services/orchestration/main/MultiMainSetup.ee.ts +++ b/packages/cli/src/services/orchestration/main/MultiMainSetup.ee.ts @@ -4,7 +4,10 @@ import { TIME } from '@/constants'; import { SingleMainSetup } from '@/services/orchestration/main/SingleMainSetup'; import { getRedisPrefix } from '@/services/redis/RedisServiceHelper'; import { ErrorReporterProxy as EventReporter } from 'n8n-workflow'; -import type { RedisServiceCommand } from '@/services/redis/RedisServiceCommands'; +import type { + RedisServiceBaseCommand, + RedisServiceCommand, +} from '@/services/redis/RedisServiceCommands'; @Service() export class MultiMainSetup extends SingleMainSetup { @@ -123,9 +126,11 @@ export class MultiMainSetup extends SingleMainSetup { } } - async publish(command: RedisServiceCommand, payload: unknown) { + async publish(command: RedisServiceCommand, data: unknown) { if (!this.sanityCheck()) return; + const payload = data as RedisServiceBaseCommand['payload']; + await this.redisPublisher.publishToCommandChannel({ command, payload }); } diff --git a/packages/cli/src/services/orchestration/main/handleCommandMessageMain.ts b/packages/cli/src/services/orchestration/main/handleCommandMessageMain.ts index e05ec91e773c9..7a878ce270db3 100644 --- a/packages/cli/src/services/orchestration/main/handleCommandMessageMain.ts +++ b/packages/cli/src/services/orchestration/main/handleCommandMessageMain.ts @@ -128,28 +128,21 @@ export async function handleCommandMessageMain(messageString: string) { Container.get(Push).broadcast('workflowFailedToActivate', { workflowId, errorMessage }); } - case 'multi-main-setup:relay-execution-lifecycle-event': { + case 'relay-execution-lifecycle-event': { if (!debounceMessageReceiver(message, 100)) { message.payload = { result: 'debounced' }; return message; } - const { eventName, args, sessionId } = message.payload ?? {}; + if (message.command !== 'relay-execution-lifecycle-event') break; - if ( - typeof eventName !== 'string' || - typeof args !== 'object' || - typeof sessionId !== 'string' - ) { - break; - } + const { type, args, sessionId } = message.payload; const push = Container.get(Push); if (!push.getBackend().hasSessionId(sessionId)) break; - // @ts-ignore @TODO: Fix type - push.send(eventName, args, sessionId); + push.send(type, args, sessionId); } default: diff --git a/packages/cli/src/services/redis/RedisServiceCommands.ts b/packages/cli/src/services/redis/RedisServiceCommands.ts index a9b8435ca7b59..c777939fcf856 100644 --- a/packages/cli/src/services/redis/RedisServiceCommands.ts +++ b/packages/cli/src/services/redis/RedisServiceCommands.ts @@ -1,4 +1,4 @@ -import type { IPushDataWorkerStatusPayload } from '@/Interfaces'; +import type { IPushDataType, IPushDataWorkerStatusPayload } from '@/Interfaces'; export type RedisServiceCommand = | 'getStatus' @@ -17,14 +17,20 @@ export type RedisServiceCommand = * @field targets: The targets to execute the command on. Leave empty to execute on all workers or specify worker ids. * @field payload: Optional arguments to be sent with the command. */ -export type RedisServiceBaseCommand = { - senderId: string; - command: RedisServiceCommand; - // payload?: { - // [key: string]: string | number | boolean | string[] | number[] | boolean[]; - // }; - payload: unknown; -}; +export type RedisServiceBaseCommand = + | { + senderId: string; + command: Exclude; + payload?: { + [key: string]: string | number | boolean | string[] | number[] | boolean[]; + }; + } + | { + senderId: string; + command: 'relay-execution-lifecycle-event'; + payload: { type: IPushDataType; args: Record; sessionId: string }; + targets: string[]; + }; export type RedisServiceWorkerResponseObject = { workerId: string; @@ -63,7 +69,7 @@ export type RedisServiceWorkerResponseObject = { }; } | { - command: 'multi-main-setup:relay-execution-lifecycle-event'; + command: 'relay-execution-lifecycle-event'; payload: { eventName: string; eventArgs: Record; diff --git a/packages/cli/test/unit/push/websocket.push.test.ts b/packages/cli/test/unit/push/websocket.push.test.ts index 9d8a530bcc14a..ab55605ba9efa 100644 --- a/packages/cli/test/unit/push/websocket.push.test.ts +++ b/packages/cli/test/unit/push/websocket.push.test.ts @@ -65,7 +65,7 @@ describe('WebSocketPush', () => { }, }; - webSocketPush.sendToSession('executionRecovered', data, sessionId1); + webSocketPush.sendToOneSession('executionRecovered', data, sessionId1); expect(mockWebSocket1.send).toHaveBeenCalledWith( JSON.stringify({ @@ -91,7 +91,7 @@ describe('WebSocketPush', () => { }, }; - webSocketPush.broadcast('executionRecovered', data); + webSocketPush.sendToAllSessions('executionRecovered', data); const expectedMsg = JSON.stringify({ type: 'executionRecovered', From 94bf1fa40c8dd2ccbe0bd420fb75c8381c6b4be7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Iv=C3=A1n=20Ovejero?= Date: Tue, 9 Jan 2024 16:38:55 +0100 Subject: [PATCH 09/23] Cleanup --- packages/cli/src/push/abstract.push.ts | 17 ++++------------- 1 file changed, 4 insertions(+), 13 deletions(-) diff --git a/packages/cli/src/push/abstract.push.ts b/packages/cli/src/push/abstract.push.ts index acd2e0dd95014..e500d185f2d66 100644 --- a/packages/cli/src/push/abstract.push.ts +++ b/packages/cli/src/push/abstract.push.ts @@ -41,10 +41,6 @@ export abstract class AbstractPush extends EventEmitter { userIdsBySessionId[sessionId] = userId; } - hasSessionId(sessionId: string) { - return this.connections[sessionId] !== undefined; - } - protected onMessageReceived(sessionId: string, msg: unknown) { this.logger.debug('Received message from editor-UI', { sessionId, msg }); @@ -62,9 +58,6 @@ export abstract class AbstractPush extends EventEmitter { delete this.userIdBySessionId[sessionId]; } - /** - * Send data to multiple sessions. - */ private sendToSessions(type: IPushDataType, data: unknown, sessionIds: string[]) { this.logger.debug(`Send data of type "${type}" to editor-UI`, { dataType: type, @@ -108,9 +101,6 @@ export abstract class AbstractPush extends EventEmitter { this.sendToSessions(type, data, [sessionId]); } - /** - * Send data to multiple users. - */ sendToUsers(type: IPushDataType, data: unknown, userIds: Array) { const { connections } = this; const userSessionIds = Object.keys(connections).filter((sessionId) => @@ -120,9 +110,6 @@ export abstract class AbstractPush extends EventEmitter { this.sendToSessions(type, data, userSessionIds); } - /** - * Close all existing push connections. - */ closeAllConnections() { for (const sessionId in this.connections) { // Signal the connection that we want to close it. @@ -132,4 +119,8 @@ export abstract class AbstractPush extends EventEmitter { this.close(this.connections[sessionId]); } } + + hasSessionId(sessionId: string) { + return this.connections[sessionId] !== undefined; + } } From 38f7256327f5701bb1e55ef96383fab4e34e88b9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Iv=C3=A1n=20Ovejero?= Date: Tue, 9 Jan 2024 16:40:01 +0100 Subject: [PATCH 10/23] Better naming --- packages/cli/src/push/abstract.push.ts | 4 ++-- packages/cli/src/push/sse.push.ts | 2 +- packages/cli/src/push/websocket.push.ts | 2 +- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/packages/cli/src/push/abstract.push.ts b/packages/cli/src/push/abstract.push.ts index e500d185f2d66..51f8f9b3dd93f 100644 --- a/packages/cli/src/push/abstract.push.ts +++ b/packages/cli/src/push/abstract.push.ts @@ -17,7 +17,7 @@ export abstract class AbstractPush extends EventEmitter { protected userIdBySessionId: Record = {}; protected abstract close(connection: T): void; - protected abstract sendToConnection(connection: T, data: string): void; + protected abstract sendToOneConnection(connection: T, data: string): void; constructor( protected readonly logger: Logger, @@ -69,7 +69,7 @@ export abstract class AbstractPush extends EventEmitter { for (const sessionId of sessionIds) { const connection = this.connections[sessionId]; assert(connection); - this.sendToConnection(connection, stringifiedPayload); + this.sendToOneConnection(connection, stringifiedPayload); } } diff --git a/packages/cli/src/push/sse.push.ts b/packages/cli/src/push/sse.push.ts index 4076701f4ab4b..f3db0773bb4e9 100644 --- a/packages/cli/src/push/sse.push.ts +++ b/packages/cli/src/push/sse.push.ts @@ -36,7 +36,7 @@ export class SSEPush extends AbstractPush { this.channel.removeClient(res); } - protected sendToConnection(connection: Connection, data: string) { + protected sendToOneConnection(connection: Connection, data: string) { this.channel.send(data, [connection.res]); } } diff --git a/packages/cli/src/push/websocket.push.ts b/packages/cli/src/push/websocket.push.ts index c7c7660cfc096..6f47b1fb62942 100644 --- a/packages/cli/src/push/websocket.push.ts +++ b/packages/cli/src/push/websocket.push.ts @@ -52,7 +52,7 @@ export class WebSocketPush extends AbstractPush { connection.close(); } - protected sendToConnection(connection: WebSocket, data: string): void { + protected sendToOneConnection(connection: WebSocket, data: string): void { connection.send(data); } From 21c2226c92ea37549713a887c5e3e1431917fdad Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Iv=C3=A1n=20Ovejero?= Date: Tue, 9 Jan 2024 16:41:36 +0100 Subject: [PATCH 11/23] Expand comment --- .../services/orchestration/main/handleCommandMessageMain.ts | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/packages/cli/src/services/orchestration/main/handleCommandMessageMain.ts b/packages/cli/src/services/orchestration/main/handleCommandMessageMain.ts index 7a878ce270db3..f83db9a506df6 100644 --- a/packages/cli/src/services/orchestration/main/handleCommandMessageMain.ts +++ b/packages/cli/src/services/orchestration/main/handleCommandMessageMain.ts @@ -40,7 +40,9 @@ export async function handleCommandMessageMain(messageString: string) { return message; } - // @TODO: Should we be checking for multi-main here? + // @TODO + // Should we be enforcing that only the leader is entitled to reload the license? + // Check with Omar if (isMainInstance && !config.getEnv('multiMainSetup.enabled')) { // at this point in time, only a single main instance is supported, thus this command _should_ never be caught currently logger.error( From 06aadc72092ba6ac8e35656d10e68dbba8c1e244 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Iv=C3=A1n=20Ovejero?= Date: Tue, 9 Jan 2024 16:44:47 +0100 Subject: [PATCH 12/23] Cleanup --- .../services/orchestration/main/handleCommandMessageMain.ts | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/packages/cli/src/services/orchestration/main/handleCommandMessageMain.ts b/packages/cli/src/services/orchestration/main/handleCommandMessageMain.ts index f83db9a506df6..d2063a084a745 100644 --- a/packages/cli/src/services/orchestration/main/handleCommandMessageMain.ts +++ b/packages/cli/src/services/orchestration/main/handleCommandMessageMain.ts @@ -40,9 +40,8 @@ export async function handleCommandMessageMain(messageString: string) { return message; } - // @TODO - // Should we be enforcing that only the leader is entitled to reload the license? - // Check with Omar + // @TODO - Check with Omar + // What's up with this multi-main check? if (isMainInstance && !config.getEnv('multiMainSetup.enabled')) { // at this point in time, only a single main instance is supported, thus this command _should_ never be caught currently logger.error( From 961af880f6936d87576a6211ef1386190dfcc7aa Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Iv=C3=A1n=20Ovejero?= Date: Tue, 9 Jan 2024 16:51:14 +0100 Subject: [PATCH 13/23] Optimize deregistration --- packages/cli/src/TestWebhooks.ts | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/packages/cli/src/TestWebhooks.ts b/packages/cli/src/TestWebhooks.ts index cffff29faab29..0923e5550a40f 100644 --- a/packages/cli/src/TestWebhooks.ts +++ b/packages/cli/src/TestWebhooks.ts @@ -137,8 +137,6 @@ export class TestWebhooks implements IWebhookManager { // Delete webhook also if an error is thrown if (timeout) clearTimeout(timeout); - await this.registrations.deregisterAll(); - await this.deactivateWebhooks(workflow); }); } @@ -342,9 +340,9 @@ export class TestWebhooks implements IWebhookManager { for (const webhook of webhooks) { await workflow.deleteWebhook(webhook, NodeExecuteFunctions, 'internal', 'update'); - - await this.registrations.deregister(webhook); } + + await this.registrations.deregisterAll(); } /** From f02d2bafeac9d3d90c6985dce0da9e200db510c8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Iv=C3=A1n=20Ovejero?= Date: Tue, 9 Jan 2024 17:17:57 +0100 Subject: [PATCH 14/23] Implement IPC for clearing test webhooks --- packages/cli/src/TestWebhooks.ts | 35 ++++++++++++++---- packages/cli/src/push/abstract.push.ts | 2 +- .../main/handleCommandMessageMain.ts | 37 ++++++++++++++++--- .../services/redis/RedisServiceCommands.ts | 16 ++++++-- 4 files changed, 72 insertions(+), 18 deletions(-) diff --git a/packages/cli/src/TestWebhooks.ts b/packages/cli/src/TestWebhooks.ts index 0923e5550a40f..8d42a9ca497aa 100644 --- a/packages/cli/src/TestWebhooks.ts +++ b/packages/cli/src/TestWebhooks.ts @@ -24,6 +24,7 @@ import { WebhookNotFoundError } from '@/errors/response-errors/webhook-not-found import * as NodeExecuteFunctions from 'n8n-core'; import { removeTrailingSlash } from './utils'; import { TestWebhookRegistrationsService } from '@/services/test-webhook-registrations.service'; +import { MultiMainSetup } from './services/orchestration/main/MultiMainSetup.ee'; @Service() export class TestWebhooks implements IWebhookManager { @@ -31,6 +32,7 @@ export class TestWebhooks implements IWebhookManager { private readonly push: Push, private readonly nodeTypes: NodeTypes, private readonly registrations: TestWebhookRegistrationsService, + private readonly multiMainSetup: MultiMainSetup, ) {} private timeouts: { [webhookKey: string]: NodeJS.Timeout } = {}; @@ -88,7 +90,6 @@ export class TestWebhooks implements IWebhookManager { } const { destinationNode, sessionId, workflowEntity } = registration; - const timeout = this.timeouts[key]; const workflow = this.toWorkflow(workflowEntity); @@ -134,13 +135,35 @@ export class TestWebhooks implements IWebhookManager { } } catch {} - // Delete webhook also if an error is thrown - if (timeout) clearTimeout(timeout); + /** + * Multi-main setup: In a manual webhook execution, the main process that handles + * a webhook might not be the same as the main process that created the webhook. + * + * If so, after the test webhook has been successfully executed, the handler process + * commands the creator process to clear its test webhooks. + */ + if ( + this.multiMainSetup.isEnabled && + sessionId && + !this.push.getBackend().hasSessionId(sessionId) + ) { + const payload = { webhookKey: key, workflowEntity, sessionId }; + void this.multiMainSetup.publish('clear-test-webhooks', payload); + return; + } + + this.clearTimeout(key); await this.deactivateWebhooks(workflow); }); } + clearTimeout(key: string) { + const timeout = this.timeouts[key]; + + if (timeout) clearTimeout(timeout); + } + async getWebhookMethods(path: string) { const allKeys = await this.registrations.getAllKeys(); @@ -259,13 +282,11 @@ export class TestWebhooks implements IWebhookManager { const { sessionId, workflowEntity } = registration; - const timeout = this.timeouts[key]; - const workflow = this.toWorkflow(workflowEntity); if (workflowEntity.id !== workflowId) continue; - clearTimeout(timeout); + this.clearTimeout(key); if (sessionId !== undefined) { try { @@ -348,7 +369,7 @@ export class TestWebhooks implements IWebhookManager { /** * Convert a `WorkflowEntity` from `typeorm` to a `Workflow` from `n8n-workflow`. */ - private toWorkflow(workflowEntity: IWorkflowDb) { + toWorkflow(workflowEntity: IWorkflowDb) { return new Workflow({ id: workflowEntity.id, name: workflowEntity.name, diff --git a/packages/cli/src/push/abstract.push.ts b/packages/cli/src/push/abstract.push.ts index 51f8f9b3dd93f..bb231427ebe33 100644 --- a/packages/cli/src/push/abstract.push.ts +++ b/packages/cli/src/push/abstract.push.ts @@ -79,7 +79,7 @@ export abstract class AbstractPush extends EventEmitter { sendToOneSession(type: IPushDataType, data: unknown, sessionId: string) { /** - * In a manual webhook execution in multi-main setup, the main process that handles + * Multi-main setup: In a manual webhook execution, the main process that handles * a webhook might not be the same as the main process that created the webhook. * * If so, the handler process commands the creator process to relay execution diff --git a/packages/cli/src/services/orchestration/main/handleCommandMessageMain.ts b/packages/cli/src/services/orchestration/main/handleCommandMessageMain.ts index d2063a084a745..2a6645e814cdd 100644 --- a/packages/cli/src/services/orchestration/main/handleCommandMessageMain.ts +++ b/packages/cli/src/services/orchestration/main/handleCommandMessageMain.ts @@ -9,6 +9,7 @@ import { ActiveWorkflowRunner } from '@/ActiveWorkflowRunner'; import { Push } from '@/push'; import { MultiMainSetup } from './MultiMainSetup.ee'; import { WorkflowRepository } from '@/databases/repositories/workflow.repository'; +import { TestWebhooks } from '@/TestWebhooks'; export async function handleCommandMessageMain(messageString: string) { const queueModeId = config.getEnv('redis.queueModeId'); @@ -31,6 +32,9 @@ export async function handleCommandMessageMain(messageString: string) { ); return message; } + + const push = Container.get(Push); + switch (message.command) { case 'reloadLicense': if (!debounceMessageReceiver(message, 500)) { @@ -86,8 +90,6 @@ export async function handleCommandMessageMain(messageString: string) { break; } - const push = Container.get(Push); - if (!oldState && newState) { try { await activeWorkflowRunner.add(workflowId, 'activate'); @@ -127,23 +129,46 @@ export async function handleCommandMessageMain(messageString: string) { if (typeof workflowId !== 'string' || typeof errorMessage !== 'string') break; Container.get(Push).broadcast('workflowFailedToActivate', { workflowId, errorMessage }); + + break; } case 'relay-execution-lifecycle-event': { if (!debounceMessageReceiver(message, 100)) { + // @ts-expect-error Legacy typing message.payload = { result: 'debounced' }; return message; } - if (message.command !== 'relay-execution-lifecycle-event') break; - const { type, args, sessionId } = message.payload; - const push = Container.get(Push); - if (!push.getBackend().hasSessionId(sessionId)) break; push.send(type, args, sessionId); + + break; + } + + case 'clear-test-webhooks': { + if (!debounceMessageReceiver(message, 100)) { + // @ts-expect-error Legacy typing + message.payload = { result: 'debounced' }; + return message; + } + + const { webhookKey, workflowEntity, sessionId } = message.payload; + + if (!push.getBackend().hasSessionId(sessionId)) break; + + const testWebhooks = Container.get(TestWebhooks); + + testWebhooks.clearTimeout(webhookKey); + + const workflow = testWebhooks.toWorkflow(workflowEntity); + + await testWebhooks.deactivateWebhooks(workflow); + + break; } default: diff --git a/packages/cli/src/services/redis/RedisServiceCommands.ts b/packages/cli/src/services/redis/RedisServiceCommands.ts index c777939fcf856..457a8f76de601 100644 --- a/packages/cli/src/services/redis/RedisServiceCommands.ts +++ b/packages/cli/src/services/redis/RedisServiceCommands.ts @@ -1,4 +1,4 @@ -import type { IPushDataType, IPushDataWorkerStatusPayload } from '@/Interfaces'; +import type { IPushDataType, IPushDataWorkerStatusPayload, IWorkflowDb } from '@/Interfaces'; export type RedisServiceCommand = | 'getStatus' @@ -9,7 +9,8 @@ export type RedisServiceCommand = | 'reloadExternalSecretsProviders' | 'workflowActiveStateChanged' // multi-main only | 'workflowFailedToActivate' // multi-main only - | 'relay-execution-lifecycle-event'; // multi-main only + | 'relay-execution-lifecycle-event' // multi-main only + | 'clear-test-webhooks'; // multi-main only /** * An object to be sent via Redis pub/sub from the main process to the workers. @@ -20,7 +21,10 @@ export type RedisServiceCommand = export type RedisServiceBaseCommand = | { senderId: string; - command: Exclude; + command: Exclude< + RedisServiceCommand, + 'relay-execution-lifecycle-event' | 'clear-test-webhooks' + >; payload?: { [key: string]: string | number | boolean | string[] | number[] | boolean[]; }; @@ -29,7 +33,11 @@ export type RedisServiceBaseCommand = senderId: string; command: 'relay-execution-lifecycle-event'; payload: { type: IPushDataType; args: Record; sessionId: string }; - targets: string[]; + } + | { + senderId: string; + command: 'clear-test-webhooks'; + payload: { webhookKey: string; workflowEntity: IWorkflowDb; sessionId: string }; }; export type RedisServiceWorkerResponseObject = { From acba608a710feb9bce0901dfb129770f8530becf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Iv=C3=A1n=20Ovejero?= Date: Tue, 9 Jan 2024 17:21:28 +0100 Subject: [PATCH 15/23] Fix some tests --- .../test/integration/workflow.service.test.ts | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) diff --git a/packages/cli/test/integration/workflow.service.test.ts b/packages/cli/test/integration/workflow.service.test.ts index 923fc71756cbd..b98fb8ff742e8 100644 --- a/packages/cli/test/integration/workflow.service.test.ts +++ b/packages/cli/test/integration/workflow.service.test.ts @@ -90,22 +90,30 @@ describe('update()', () => { const owner = await createOwner(); const workflow = await createWorkflow({ active: true }, owner); - const broadcastSpy = jest.spyOn(multiMainSetup, 'broadcastWorkflowActiveStateChanged'); + const publishSpy = jest.spyOn(multiMainSetup, 'publish'); workflow.active = false; await workflowService.update(owner, workflow, workflow.id); - expect(broadcastSpy).toHaveBeenCalledTimes(1); + expect(publishSpy).toHaveBeenCalledTimes(1); + expect(publishSpy).toHaveBeenCalledWith( + 'workflowActiveStateChanged', + expect.objectContaining({ + newState: false, + oldState: true, + workflowId: workflow.id, + }), + ); }); test('should not broadcast active workflow state change if state did not change', async () => { const owner = await createOwner(); const workflow = await createWorkflow({ active: true }, owner); - const broadcastSpy = jest.spyOn(multiMainSetup, 'broadcastWorkflowActiveStateChanged'); + const publishSpy = jest.spyOn(multiMainSetup, 'publish'); await workflowService.update(owner, workflow, workflow.id); - expect(broadcastSpy).not.toHaveBeenCalled(); + expect(publishSpy).not.toHaveBeenCalled(); }); }); From 532d2a1373923d4c84d097f94d4a67191e802030 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Iv=C3=A1n=20Ovejero?= Date: Wed, 10 Jan 2024 09:36:15 +0100 Subject: [PATCH 16/23] Cleanup --- packages/cli/src/TestWebhooks.ts | 21 +++++++++---------- packages/cli/src/push/abstract.push.ts | 9 ++++---- packages/cli/src/push/sse.push.ts | 4 ---- .../orchestration/main/MultiMainSetup.ee.ts | 2 ++ .../main/handleCommandMessageMain.ts | 2 +- .../services/redis/RedisServiceCommands.ts | 8 ------- packages/cli/test/unit/TestWebhooks.test.ts | 4 ++-- 7 files changed, 19 insertions(+), 31 deletions(-) diff --git a/packages/cli/src/TestWebhooks.ts b/packages/cli/src/TestWebhooks.ts index 0bf65835dd44f..4b0d169555a43 100644 --- a/packages/cli/src/TestWebhooks.ts +++ b/packages/cli/src/TestWebhooks.ts @@ -92,7 +92,7 @@ export class TestWebhooks implements IWebhookManager { const { destinationNode, sessionId, workflowEntity } = registration; - const workflow = this.toWorkflow(workflowEntity); + const workflow = this.toTempWorkflow(workflowEntity); const workflowStartNode = workflow.getNode(webhook.node); @@ -137,11 +137,10 @@ export class TestWebhooks implements IWebhookManager { } catch {} /** - * Multi-main setup: In a manual webhook execution, the main process that handles - * a webhook might not be the same as the main process that created the webhook. - * - * If so, after the test webhook has been successfully executed, the handler process - * commands the creator process to clear its test webhooks. + * Multi-main setup: In a manual webhook execution, the main process that + * handles a webhook might not be the same as the main process that created + * the webhook. If so, after the test webhook has been successfully executed, + * the handler process commands the creator process to clear its test webhooks. */ if ( this.multiMainSetup.isEnabled && @@ -190,7 +189,7 @@ export class TestWebhooks implements IWebhookManager { const { workflowEntity } = registration; - const workflow = this.toWorkflow(workflowEntity); + const workflow = this.toTempWorkflow(workflowEntity); const webhookNode = Object.values(workflow.nodes).find( ({ type, parameters, typeVersion }) => @@ -215,7 +214,7 @@ export class TestWebhooks implements IWebhookManager { ) { if (!workflowEntity.id) throw new WorkflowMissingIdError(workflowEntity); - const workflow = this.toWorkflow(workflowEntity); + const workflow = this.toTempWorkflow(workflowEntity); const webhooks = WebhookHelpers.getWorkflowWebhooks( workflow, @@ -286,7 +285,7 @@ export class TestWebhooks implements IWebhookManager { const { sessionId, workflowEntity } = registration; - const workflow = this.toWorkflow(workflowEntity); + const workflow = this.toTempWorkflow(workflowEntity); if (workflowEntity.id !== workflowId) continue; @@ -379,9 +378,9 @@ export class TestWebhooks implements IWebhookManager { } /** - * Convert a `WorkflowEntity` from `typeorm` to a `Workflow` from `n8n-workflow`. + * Convert a `WorkflowEntity` from `typeorm` to a temporary `Workflow` from `n8n-workflow`. */ - toWorkflow(workflowEntity: IWorkflowDb) { + toTempWorkflow(workflowEntity: IWorkflowDb) { return new Workflow({ id: workflowEntity.id, name: workflowEntity.name, diff --git a/packages/cli/src/push/abstract.push.ts b/packages/cli/src/push/abstract.push.ts index bb231427ebe33..655e5486b5179 100644 --- a/packages/cli/src/push/abstract.push.ts +++ b/packages/cli/src/push/abstract.push.ts @@ -79,11 +79,10 @@ export abstract class AbstractPush extends EventEmitter { sendToOneSession(type: IPushDataType, data: unknown, sessionId: string) { /** - * Multi-main setup: In a manual webhook execution, the main process that handles - * a webhook might not be the same as the main process that created the webhook. - * - * If so, the handler process commands the creator process to relay execution - * lifecyle events occurring at the handler to the creator's frontend. + * Multi-main setup: In a manual webhook execution, the main process that + * handles a webhook might not be the same as the main process that created + * the webhook. If so, the handler process commands the creator process to + * relay the former's execution lifecyle events to the creator's frontend. */ if (this.multiMainSetup.isEnabled && !this.hasSessionId(sessionId)) { const payload = { type, args: data, sessionId }; diff --git a/packages/cli/src/push/sse.push.ts b/packages/cli/src/push/sse.push.ts index f3db0773bb4e9..17f4c7ad9ec7b 100644 --- a/packages/cli/src/push/sse.push.ts +++ b/packages/cli/src/push/sse.push.ts @@ -22,10 +22,6 @@ export class SSEPush extends AbstractPush { }); } - hasSessionId(sessionId: string) { - return this.connections[sessionId] !== undefined; - } - add(sessionId: string, userId: User['id'], connection: Connection) { super.add(sessionId, userId, connection); this.channel.addClient(connection.req, connection.res); diff --git a/packages/cli/src/services/orchestration/main/MultiMainSetup.ee.ts b/packages/cli/src/services/orchestration/main/MultiMainSetup.ee.ts index 9427022cfbac3..55822c9ac84b0 100644 --- a/packages/cli/src/services/orchestration/main/MultiMainSetup.ee.ts +++ b/packages/cli/src/services/orchestration/main/MultiMainSetup.ee.ts @@ -131,6 +131,8 @@ export class MultiMainSetup extends SingleMainSetup { const payload = data as RedisServiceBaseCommand['payload']; + this.logger.debug(`[Instance ID ${this.id}] Publishing command "${command}"`, payload); + await this.redisPublisher.publishToCommandChannel({ command, payload }); } diff --git a/packages/cli/src/services/orchestration/main/handleCommandMessageMain.ts b/packages/cli/src/services/orchestration/main/handleCommandMessageMain.ts index 2a6645e814cdd..bb47af9e74148 100644 --- a/packages/cli/src/services/orchestration/main/handleCommandMessageMain.ts +++ b/packages/cli/src/services/orchestration/main/handleCommandMessageMain.ts @@ -164,7 +164,7 @@ export async function handleCommandMessageMain(messageString: string) { testWebhooks.clearTimeout(webhookKey); - const workflow = testWebhooks.toWorkflow(workflowEntity); + const workflow = testWebhooks.toTempWorkflow(workflowEntity); await testWebhooks.deactivateWebhooks(workflow); diff --git a/packages/cli/src/services/redis/RedisServiceCommands.ts b/packages/cli/src/services/redis/RedisServiceCommands.ts index 457a8f76de601..e1c20d71a6f3b 100644 --- a/packages/cli/src/services/redis/RedisServiceCommands.ts +++ b/packages/cli/src/services/redis/RedisServiceCommands.ts @@ -76,14 +76,6 @@ export type RedisServiceWorkerResponseObject = { workflowId: string; }; } - | { - command: 'relay-execution-lifecycle-event'; - payload: { - eventName: string; - eventArgs: Record; - sessionId: string; - }; - } ); export type RedisServiceCommandObject = { diff --git a/packages/cli/test/unit/TestWebhooks.test.ts b/packages/cli/test/unit/TestWebhooks.test.ts index ecb3fe8044d92..6b6d14530f9da 100644 --- a/packages/cli/test/unit/TestWebhooks.test.ts +++ b/packages/cli/test/unit/TestWebhooks.test.ts @@ -39,7 +39,7 @@ let testWebhooks: TestWebhooks; describe('TestWebhooks', () => { beforeAll(() => { - testWebhooks = new TestWebhooks(mock(), mock(), registrations); + testWebhooks = new TestWebhooks(mock(), mock(), registrations, mock()); jest.useFakeTimers(); }); @@ -117,7 +117,7 @@ describe('TestWebhooks', () => { test('should add additional data to workflow', async () => { registrations.getAllRegistrations.mockResolvedValue([{ workflowEntity, webhook }]); - const workflow = testWebhooks.toWorkflow(workflowEntity); + const workflow = testWebhooks.toTempWorkflow(workflowEntity); await testWebhooks.deactivateWebhooks(workflow); From ea7b4628020cb71912ab4598a32e5503905ce0b5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Iv=C3=A1n=20Ovejero?= Date: Wed, 10 Jan 2024 09:43:56 +0100 Subject: [PATCH 17/23] Renaming --- packages/cli/src/TestWebhooks.ts | 10 +++++----- .../orchestration/main/handleCommandMessageMain.ts | 2 +- packages/cli/test/unit/TestWebhooks.test.ts | 2 +- 3 files changed, 7 insertions(+), 7 deletions(-) diff --git a/packages/cli/src/TestWebhooks.ts b/packages/cli/src/TestWebhooks.ts index 4b0d169555a43..eec8257119295 100644 --- a/packages/cli/src/TestWebhooks.ts +++ b/packages/cli/src/TestWebhooks.ts @@ -92,7 +92,7 @@ export class TestWebhooks implements IWebhookManager { const { destinationNode, sessionId, workflowEntity } = registration; - const workflow = this.toTempWorkflow(workflowEntity); + const workflow = this.toWorkflow(workflowEntity); const workflowStartNode = workflow.getNode(webhook.node); @@ -189,7 +189,7 @@ export class TestWebhooks implements IWebhookManager { const { workflowEntity } = registration; - const workflow = this.toTempWorkflow(workflowEntity); + const workflow = this.toWorkflow(workflowEntity); const webhookNode = Object.values(workflow.nodes).find( ({ type, parameters, typeVersion }) => @@ -214,7 +214,7 @@ export class TestWebhooks implements IWebhookManager { ) { if (!workflowEntity.id) throw new WorkflowMissingIdError(workflowEntity); - const workflow = this.toTempWorkflow(workflowEntity); + const workflow = this.toWorkflow(workflowEntity); const webhooks = WebhookHelpers.getWorkflowWebhooks( workflow, @@ -285,7 +285,7 @@ export class TestWebhooks implements IWebhookManager { const { sessionId, workflowEntity } = registration; - const workflow = this.toTempWorkflow(workflowEntity); + const workflow = this.toWorkflow(workflowEntity); if (workflowEntity.id !== workflowId) continue; @@ -380,7 +380,7 @@ export class TestWebhooks implements IWebhookManager { /** * Convert a `WorkflowEntity` from `typeorm` to a temporary `Workflow` from `n8n-workflow`. */ - toTempWorkflow(workflowEntity: IWorkflowDb) { + toWorkflow(workflowEntity: IWorkflowDb) { return new Workflow({ id: workflowEntity.id, name: workflowEntity.name, diff --git a/packages/cli/src/services/orchestration/main/handleCommandMessageMain.ts b/packages/cli/src/services/orchestration/main/handleCommandMessageMain.ts index bb47af9e74148..2a6645e814cdd 100644 --- a/packages/cli/src/services/orchestration/main/handleCommandMessageMain.ts +++ b/packages/cli/src/services/orchestration/main/handleCommandMessageMain.ts @@ -164,7 +164,7 @@ export async function handleCommandMessageMain(messageString: string) { testWebhooks.clearTimeout(webhookKey); - const workflow = testWebhooks.toTempWorkflow(workflowEntity); + const workflow = testWebhooks.toWorkflow(workflowEntity); await testWebhooks.deactivateWebhooks(workflow); diff --git a/packages/cli/test/unit/TestWebhooks.test.ts b/packages/cli/test/unit/TestWebhooks.test.ts index 6b6d14530f9da..6d2a561f106d7 100644 --- a/packages/cli/test/unit/TestWebhooks.test.ts +++ b/packages/cli/test/unit/TestWebhooks.test.ts @@ -117,7 +117,7 @@ describe('TestWebhooks', () => { test('should add additional data to workflow', async () => { registrations.getAllRegistrations.mockResolvedValue([{ workflowEntity, webhook }]); - const workflow = testWebhooks.toTempWorkflow(workflowEntity); + const workflow = testWebhooks.toWorkflow(workflowEntity); await testWebhooks.deactivateWebhooks(workflow); From c248357e1d1e80c7722bc69d141098f225b59685 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Iv=C3=A1n=20Ovejero?= Date: Wed, 10 Jan 2024 11:50:37 +0100 Subject: [PATCH 18/23] Remove debounding --- .../orchestration/main/handleCommandMessageMain.ts | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/packages/cli/src/services/orchestration/main/handleCommandMessageMain.ts b/packages/cli/src/services/orchestration/main/handleCommandMessageMain.ts index 2a6645e814cdd..1e9081939b2f3 100644 --- a/packages/cli/src/services/orchestration/main/handleCommandMessageMain.ts +++ b/packages/cli/src/services/orchestration/main/handleCommandMessageMain.ts @@ -134,11 +134,9 @@ export async function handleCommandMessageMain(messageString: string) { } case 'relay-execution-lifecycle-event': { - if (!debounceMessageReceiver(message, 100)) { - // @ts-expect-error Legacy typing - message.payload = { result: 'debounced' }; - return message; - } + /** + * Do not debounce this - all events share the same message name. + */ const { type, args, sessionId } = message.payload; From bde1ecf1e674aef4683869deec88c4928ca226f1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Iv=C3=A1n=20Ovejero?= Date: Wed, 10 Jan 2024 11:50:53 +0100 Subject: [PATCH 19/23] Sanity check on worker --- .../orchestration/worker/handleCommandMessageWorker.ts | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/packages/cli/src/services/orchestration/worker/handleCommandMessageWorker.ts b/packages/cli/src/services/orchestration/worker/handleCommandMessageWorker.ts index 32c8a5c6318ce..ec68be49a8b8f 100644 --- a/packages/cli/src/services/orchestration/worker/handleCommandMessageWorker.ts +++ b/packages/cli/src/services/orchestration/worker/handleCommandMessageWorker.ts @@ -122,6 +122,13 @@ export function getWorkerCommandReceivedHandler(options: WorkerCommandReceivedHa // await this.stopProcess(); break; default: + if ( + message.command === 'relay-execution-lifecycle-event' || + message.command === 'clear-test-webhooks' + ) { + break; // meant only for main + } + logger.debug( // eslint-disable-next-line @typescript-eslint/restrict-template-expressions `Received unknown command via channel ${COMMAND_REDIS_CHANNEL}: "${message.command}"`, From 9ddc31058ce0f5542acd5abf9f23ff0e633eb630 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Iv=C3=A1n=20Ovejero?= Date: Wed, 10 Jan 2024 11:51:03 +0100 Subject: [PATCH 20/23] Set up for expiration --- packages/cli/src/TestWebhooks.ts | 4 ++-- packages/cli/src/constants.ts | 2 ++ .../services/test-webhook-registrations.service.ts | 11 +++++++++++ 3 files changed, 15 insertions(+), 2 deletions(-) diff --git a/packages/cli/src/TestWebhooks.ts b/packages/cli/src/TestWebhooks.ts index eec8257119295..e6004a84394fe 100644 --- a/packages/cli/src/TestWebhooks.ts +++ b/packages/cli/src/TestWebhooks.ts @@ -17,7 +17,7 @@ import type { import { Push } from '@/push'; import { NodeTypes } from '@/NodeTypes'; import * as WebhookHelpers from '@/WebhookHelpers'; -import { TIME } from '@/constants'; +import { TEST_WEBHOOK_TIMEOUT } from '@/constants'; import { NotFoundError } from '@/errors/response-errors/not-found.error'; import { WorkflowMissingIdError } from '@/errors/workflow-missing-id.error'; import { WebhookNotFoundError } from '@/errors/response-errors/webhook-not-found.error'; @@ -227,7 +227,7 @@ export class TestWebhooks implements IWebhookManager { return false; // no webhooks found to start a workflow } - const timeout = setTimeout(async () => this.cancelWebhook(workflow.id), 2 * TIME.MINUTE); + const timeout = setTimeout(async () => this.cancelWebhook(workflow.id), TEST_WEBHOOK_TIMEOUT); for (const webhook of webhooks) { const key = this.registrations.toKey(webhook); diff --git a/packages/cli/src/constants.ts b/packages/cli/src/constants.ts index 7d4e4fbf6191b..edd8fb361f438 100644 --- a/packages/cli/src/constants.ts +++ b/packages/cli/src/constants.ts @@ -114,3 +114,5 @@ export const TIME = { export const MIN_PASSWORD_CHAR_LENGTH = 8; export const MAX_PASSWORD_CHAR_LENGTH = 64; + +export const TEST_WEBHOOK_TIMEOUT = 2 * TIME.MINUTE; diff --git a/packages/cli/src/services/test-webhook-registrations.service.ts b/packages/cli/src/services/test-webhook-registrations.service.ts index 7c0d05b2d53e0..fa2655b31e440 100644 --- a/packages/cli/src/services/test-webhook-registrations.service.ts +++ b/packages/cli/src/services/test-webhook-registrations.service.ts @@ -20,6 +20,17 @@ export class TestWebhookRegistrationsService { const hashKey = this.toKey(registration.webhook); await this.cacheService.setHash(this.cacheKey, { [hashKey]: registration }); + + /** + * @TODO: Redis lib does not expose `EXPIRE` + * + * Multi-main setup: In a manual webhook execution, the main process that + * handles a webhook might not be the same as the main process that created + * the webhook. If so, after the test webhook has been successfully executed, + * the handler process commands the creator process to clear its test webhooks. + * Even on creator process crash, test webhooks must be cleared from Redis. + */ + // await this.cacheService.expire(this.cacheKey, TEST_WEBHOOK_TIMEOUT); } async deregister(arg: IWebhookData | string) { From d32d039b2d79f915d422ac12bbc20a8b0a9e41ca Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Iv=C3=A1n=20Ovejero?= Date: Wed, 10 Jan 2024 12:01:36 +0100 Subject: [PATCH 21/23] Remove outdated comment --- .../src/services/orchestration/main/handleCommandMessageMain.ts | 2 -- 1 file changed, 2 deletions(-) diff --git a/packages/cli/src/services/orchestration/main/handleCommandMessageMain.ts b/packages/cli/src/services/orchestration/main/handleCommandMessageMain.ts index 1e9081939b2f3..f41106ca2d10f 100644 --- a/packages/cli/src/services/orchestration/main/handleCommandMessageMain.ts +++ b/packages/cli/src/services/orchestration/main/handleCommandMessageMain.ts @@ -44,8 +44,6 @@ export async function handleCommandMessageMain(messageString: string) { return message; } - // @TODO - Check with Omar - // What's up with this multi-main check? if (isMainInstance && !config.getEnv('multiMainSetup.enabled')) { // at this point in time, only a single main instance is supported, thus this command _should_ never be caught currently logger.error( From 61637c5d7d28593c2bea7c0981de783fc8c43528 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Iv=C3=A1n=20Ovejero?= Date: Wed, 10 Jan 2024 12:14:26 +0100 Subject: [PATCH 22/23] Implement expiration --- packages/cli/src/constants.ts | 2 +- packages/cli/src/services/cache/cache.service.ts | 16 ++++++++++++++++ .../src/services/cache/redis.cache-manager.ts | 4 ++++ .../test-webhook-registrations.service.ts | 7 +++---- 4 files changed, 24 insertions(+), 5 deletions(-) diff --git a/packages/cli/src/constants.ts b/packages/cli/src/constants.ts index edd8fb361f438..11d92ce4f7350 100644 --- a/packages/cli/src/constants.ts +++ b/packages/cli/src/constants.ts @@ -109,7 +109,7 @@ export const TIME = { MINUTE: 60 * 1000, HOUR: 60 * 60 * 1000, DAY: 24 * 60 * 60 * 1000, -}; +} as const; export const MIN_PASSWORD_CHAR_LENGTH = 8; diff --git a/packages/cli/src/services/cache/cache.service.ts b/packages/cli/src/services/cache/cache.service.ts index d9ea8b04b15a7..421597a17219e 100644 --- a/packages/cli/src/services/cache/cache.service.ts +++ b/packages/cli/src/services/cache/cache.service.ts @@ -15,6 +15,7 @@ import type { MaybeHash, Hash, } from '@/services/cache/cache.types'; +import { TIME } from '@/constants'; @Service() export class CacheService extends EventEmitter { @@ -130,6 +131,21 @@ export class CacheService extends EventEmitter { await this.set(key, hashObject); } + async expire(key: string, ttlMs: number) { + if (!this.cache) await this.init(); + + if (!key?.length) return; + + if (this.cache.kind === 'memory') { + setTimeout(async () => { + await this.cache.store.del(key); + }, ttlMs); + return; + } + + await this.cache.store.expire(key, ttlMs / TIME.SECOND); + } + // ---------------------------------- // retrieving // ---------------------------------- diff --git a/packages/cli/src/services/cache/redis.cache-manager.ts b/packages/cli/src/services/cache/redis.cache-manager.ts index ca3e920dc27e5..d556dacdc744b 100644 --- a/packages/cli/src/services/cache/redis.cache-manager.ts +++ b/packages/cli/src/services/cache/redis.cache-manager.ts @@ -39,6 +39,7 @@ export interface RedisStore extends Store { hvals(key: string): Promise; hexists(key: string, field: string): Promise; hdel(key: string, field: string): Promise; + expire(key: string, ttlSeconds: number): Promise; } function builder( @@ -56,6 +57,9 @@ function builder( if (val === undefined || val === null) return undefined; else return jsonParse(val); }, + async expire(key: string, ttlSeconds: number) { + await redisCache.expire(key, ttlSeconds); + }, async set(key, value, ttl) { // eslint-disable-next-line @typescript-eslint/no-throw-literal, @typescript-eslint/restrict-template-expressions if (!isCacheable(value)) throw new NoCacheableError(`"${value}" is not a cacheable value`); diff --git a/packages/cli/src/services/test-webhook-registrations.service.ts b/packages/cli/src/services/test-webhook-registrations.service.ts index fa2655b31e440..643acd24eda66 100644 --- a/packages/cli/src/services/test-webhook-registrations.service.ts +++ b/packages/cli/src/services/test-webhook-registrations.service.ts @@ -2,6 +2,7 @@ import { Service } from 'typedi'; import { CacheService } from '@/services/cache/cache.service'; import { type IWebhookData } from 'n8n-workflow'; import type { IWorkflowDb } from '@/Interfaces'; +import { TEST_WEBHOOK_TIMEOUT } from '@/constants'; export type TestWebhookRegistration = { sessionId?: string; @@ -22,15 +23,13 @@ export class TestWebhookRegistrationsService { await this.cacheService.setHash(this.cacheKey, { [hashKey]: registration }); /** - * @TODO: Redis lib does not expose `EXPIRE` - * * Multi-main setup: In a manual webhook execution, the main process that * handles a webhook might not be the same as the main process that created * the webhook. If so, after the test webhook has been successfully executed, * the handler process commands the creator process to clear its test webhooks. - * Even on creator process crash, test webhooks must be cleared from Redis. + * We set a TTL on the key so that it is cleared even on creator process crash. */ - // await this.cacheService.expire(this.cacheKey, TEST_WEBHOOK_TIMEOUT); + await this.cacheService.expire(this.cacheKey, TEST_WEBHOOK_TIMEOUT); } async deregister(arg: IWebhookData | string) { From 302103eeb2629a4b0247109d406013d108e048cc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Iv=C3=A1n=20Ovejero?= Date: Fri, 12 Jan 2024 11:23:35 +0100 Subject: [PATCH 23/23] Add buffer --- packages/cli/src/constants.ts | 2 ++ .../src/services/test-webhook-registrations.service.ts | 10 +++++++--- 2 files changed, 9 insertions(+), 3 deletions(-) diff --git a/packages/cli/src/constants.ts b/packages/cli/src/constants.ts index 11d92ce4f7350..8603d2996330d 100644 --- a/packages/cli/src/constants.ts +++ b/packages/cli/src/constants.ts @@ -116,3 +116,5 @@ export const MIN_PASSWORD_CHAR_LENGTH = 8; export const MAX_PASSWORD_CHAR_LENGTH = 64; export const TEST_WEBHOOK_TIMEOUT = 2 * TIME.MINUTE; + +export const TEST_WEBHOOK_TIMEOUT_BUFFER = 30 * TIME.SECOND; diff --git a/packages/cli/src/services/test-webhook-registrations.service.ts b/packages/cli/src/services/test-webhook-registrations.service.ts index 643acd24eda66..58a80dd758559 100644 --- a/packages/cli/src/services/test-webhook-registrations.service.ts +++ b/packages/cli/src/services/test-webhook-registrations.service.ts @@ -2,7 +2,7 @@ import { Service } from 'typedi'; import { CacheService } from '@/services/cache/cache.service'; import { type IWebhookData } from 'n8n-workflow'; import type { IWorkflowDb } from '@/Interfaces'; -import { TEST_WEBHOOK_TIMEOUT } from '@/constants'; +import { TEST_WEBHOOK_TIMEOUT, TEST_WEBHOOK_TIMEOUT_BUFFER } from '@/constants'; export type TestWebhookRegistration = { sessionId?: string; @@ -27,9 +27,13 @@ export class TestWebhookRegistrationsService { * handles a webhook might not be the same as the main process that created * the webhook. If so, after the test webhook has been successfully executed, * the handler process commands the creator process to clear its test webhooks. - * We set a TTL on the key so that it is cleared even on creator process crash. + * We set a TTL on the key so that it is cleared even on creator process crash, + * with an additional buffer to ensure this safeguard expiration will not delete + * the key before the regular test webhook timeout fetches the key to delete it. */ - await this.cacheService.expire(this.cacheKey, TEST_WEBHOOK_TIMEOUT); + const ttl = TEST_WEBHOOK_TIMEOUT + TEST_WEBHOOK_TIMEOUT_BUFFER; + + await this.cacheService.expire(this.cacheKey, ttl); } async deregister(arg: IWebhookData | string) {