From f2b439f230503982f8579130f6b95701e7588ba4 Mon Sep 17 00:00:00 2001 From: agobrech <45268029+agobrech@users.noreply.github.com> Date: Wed, 16 Aug 2023 13:06:47 +0200 Subject: [PATCH] feat: Enable parallel processing on multiple queue nodes (#6295) * Add non-parallel execution * Add parallel processing for MQTT * Fix logic expression for trigger * fixes * remove unused import * fix MQTT parallel processing * fix AMQPTrigger node parallelProcessing * MQTTTrigger node default parallelProcessing to true * add AMQP credential test * improve error handling --------- Co-authored-by: Marcus --- .../credentials/Amqp.credentials.ts | 2 +- packages/nodes-base/nodes/Amqp/Amqp.node.ts | 53 ++++++++++++++++++- .../nodes-base/nodes/Amqp/AmqpTrigger.node.ts | 23 +++++++- .../nodes-base/nodes/MQTT/MqttTrigger.node.ts | 23 +++++++- 4 files changed, 95 insertions(+), 6 deletions(-) diff --git a/packages/nodes-base/credentials/Amqp.credentials.ts b/packages/nodes-base/credentials/Amqp.credentials.ts index 7f0487f839c1e5..1b20baf38eb4d7 100644 --- a/packages/nodes-base/credentials/Amqp.credentials.ts +++ b/packages/nodes-base/credentials/Amqp.credentials.ts @@ -40,7 +40,7 @@ export class Amqp implements ICredentialType { name: 'transportType', type: 'string', default: '', - description: 'Optional Transport Type to use', + description: 'Optional Transport Type to use. Either tcp or tls.', }, ]; } diff --git a/packages/nodes-base/nodes/Amqp/Amqp.node.ts b/packages/nodes-base/nodes/Amqp/Amqp.node.ts index 38746077f27acb..5a7bb808b9f4a9 100644 --- a/packages/nodes-base/nodes/Amqp/Amqp.node.ts +++ b/packages/nodes-base/nodes/Amqp/Amqp.node.ts @@ -1,4 +1,4 @@ -import type { ContainerOptions, Dictionary, EventContext } from 'rhea'; +import type { Connection, ContainerOptions, Dictionary, EventContext } from 'rhea'; import { create_container } from 'rhea'; import type { @@ -7,6 +7,10 @@ import type { INodeExecutionData, INodeType, INodeTypeDescription, + ICredentialTestFunctions, + INodeCredentialTestResult, + ICredentialsDecrypted, + ICredentialDataDecryptedObject, } from 'n8n-workflow'; import { NodeOperationError } from 'n8n-workflow'; @@ -28,6 +32,7 @@ export class Amqp implements INodeType { { name: 'amqp', required: true, + testedBy: 'amqpConnectionTest', }, ], properties: [ @@ -95,6 +100,52 @@ export class Amqp implements INodeType { ], }; + methods = { + credentialTest: { + async amqpConnectionTest( + this: ICredentialTestFunctions, + credential: ICredentialsDecrypted, + ): Promise { + const credentials = credential.data as ICredentialDataDecryptedObject; + const connectOptions: ContainerOptions = { + reconnect: false, + host: credentials.hostname as string, + hostname: credentials.hostname as string, + port: credentials.port as number, + username: credentials.username ? (credentials.username as string) : undefined, + password: credentials.password ? (credentials.password as string) : undefined, + transport: credentials.transportType ? (credentials.transportType as string) : undefined, + }; + + let conn: Connection | undefined = undefined; + try { + const container = create_container(); + await new Promise((resolve, reject) => { + container.on('connection_open', function (_contex: EventContext) { + resolve(); + }); + container.on('disconnected', function (context: EventContext) { + reject(context.error ?? new Error('unknown error')); + }); + conn = container.connect(connectOptions); + }); + } catch (error) { + return { + status: 'Error', + message: (error as Error).message, + }; + } finally { + if (conn) (conn as Connection).close(); + } + + return { + status: 'OK', + message: 'Connection successful!', + }; + }, + }, + }; + async execute(this: IExecuteFunctions): Promise { try { const credentials = await this.getCredentials('amqp'); diff --git a/packages/nodes-base/nodes/Amqp/AmqpTrigger.node.ts b/packages/nodes-base/nodes/Amqp/AmqpTrigger.node.ts index d69fa908b60644..6531918c26afc6 100644 --- a/packages/nodes-base/nodes/Amqp/AmqpTrigger.node.ts +++ b/packages/nodes-base/nodes/Amqp/AmqpTrigger.node.ts @@ -7,6 +7,8 @@ import type { INodeType, INodeTypeDescription, ITriggerResponse, + IDeferredPromise, + IRun, } from 'n8n-workflow'; import { deepCopy, jsonParse, NodeOperationError } from 'n8n-workflow'; @@ -100,6 +102,13 @@ export class AmqpTrigger implements INodeType { default: false, description: 'Whether to return only the body property', }, + { + displayName: 'Parallel Processing', + name: 'parallelProcessing', + type: 'boolean', + default: true, + description: 'Whether to process messages in parallel', + }, { displayName: 'Reconnect', name: 'reconnect', @@ -133,6 +142,7 @@ export class AmqpTrigger implements INodeType { const clientname = this.getNodeParameter('clientname', '') as string; const subscription = this.getNodeParameter('subscription', '') as string; const options = this.getNodeParameter('options', {}) as IDataObject; + const parallelProcessing = this.getNodeParameter('options.parallelProcessing', true) as boolean; const pullMessagesNumber = (options.pullMessagesNumber as number) || 100; const containerId = options.containerId as string; const containerReconnect = (options.reconnect as boolean) || true; @@ -156,7 +166,7 @@ export class AmqpTrigger implements INodeType { context.receiver?.add_credit(pullMessagesNumber); }); - container.on('message', (context: EventContext) => { + container.on('message', async (context: EventContext) => { // No message in the context if (!context.message) { return; @@ -195,7 +205,16 @@ export class AmqpTrigger implements INodeType { data = data.body; } - this.emit([this.helpers.returnJsonArray([data as any])]); + let responsePromise: IDeferredPromise | undefined = undefined; + if (!parallelProcessing) { + responsePromise = await this.helpers.createDeferredPromise(); + } + if (responsePromise) { + this.emit([this.helpers.returnJsonArray([data as any])], undefined, responsePromise); + await responsePromise.promise(); + } else { + this.emit([this.helpers.returnJsonArray([data as any])]); + } if (!context.receiver?.has_credit()) { setTimeout(() => { diff --git a/packages/nodes-base/nodes/MQTT/MqttTrigger.node.ts b/packages/nodes-base/nodes/MQTT/MqttTrigger.node.ts index f6d4ed1e21275f..24aa3e2d82907c 100644 --- a/packages/nodes-base/nodes/MQTT/MqttTrigger.node.ts +++ b/packages/nodes-base/nodes/MQTT/MqttTrigger.node.ts @@ -4,6 +4,8 @@ import type { INodeType, INodeTypeDescription, ITriggerResponse, + IDeferredPromise, + IRun, } from 'n8n-workflow'; import { NodeOperationError } from 'n8n-workflow'; @@ -71,6 +73,14 @@ export class MqttTrigger implements INodeType { default: false, description: 'Whether to return only the message property', }, + { + displayName: 'Parallel Processing', + name: 'parallelProcessing', + type: 'boolean', + default: true, + description: + 'Whether to process messages in parallel or by keeping the message in order', + }, ], }, ], @@ -89,6 +99,7 @@ export class MqttTrigger implements INodeType { } const options = this.getNodeParameter('options') as IDataObject; + const parallelProcessing = this.getNodeParameter('options.parallelProcessing', true) as boolean; if (!topics) { throw new NodeOperationError(this.getNode(), 'Topics are mandatory!'); @@ -147,7 +158,7 @@ export class MqttTrigger implements INodeType { if (error) { reject(error); } - client.on('message', (topic: string, message: Buffer | string) => { + client.on('message', async (topic: string, message: Buffer | string) => { let result: IDataObject = {}; message = message.toString(); @@ -165,7 +176,15 @@ export class MqttTrigger implements INodeType { //@ts-ignore result = [message as string]; } - this.emit([this.helpers.returnJsonArray(result)]); + + let responsePromise: IDeferredPromise | undefined; + if (!parallelProcessing) { + responsePromise = await this.helpers.createDeferredPromise(); + } + this.emit([this.helpers.returnJsonArray([result])], undefined, responsePromise); + if (responsePromise) { + await responsePromise.promise(); + } resolve(true); }); });