Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(RabbitMQ Node): Add mode for acknowledging and deleting from queue later in workflow #6225

Merged
merged 14 commits into from
May 22, 2023
Merged
72 changes: 60 additions & 12 deletions packages/nodes-base/nodes/RabbitMQ/RabbitMQ.node.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ export class RabbitMQ implements INodeType {
// eslint-disable-next-line n8n-nodes-base/node-class-description-icon-not-svg
icon: 'file:rabbitmq.png',
group: ['transform'],
version: 1,
version: 1.1,
agobrech marked this conversation as resolved.
Show resolved Hide resolved
description: 'Sends messages to a RabbitMQ topic',
defaults: {
name: 'RabbitMQ',
Expand All @@ -43,18 +43,52 @@ export class RabbitMQ implements INodeType {
name: 'operation',
type: 'hidden',
noDataExpression: true,
default: 'send_message',
default: 'sendMessage',
displayOptions: {
show: {
'@version': [1],
},
},
options: [
{
name: 'Send a Message to RabbitMQ',
value: 'send_message',
value: 'sendMessage',
},
],
},
{
displayName: 'Operation',
name: 'operation',
type: 'options',
noDataExpression: true,
default: 'sendMessage',
displayOptions: {
show: {
'@version': [1.1],
},
},
options: [
{
name: 'Send a Message to RabbitMQ',
value: 'sendMessage',
action: 'Send a Message to RabbitMQ',
},
{
name: 'Delete From Queue',
value: 'deleteMessage',
action: 'Delete From Queue',
},
],
},
{
displayName: 'Mode',
name: 'mode',
type: 'options',
displayOptions: {
hide: {
operation: ['deleteMessage'],
},
},
options: [
{
name: 'Queue',
Expand Down Expand Up @@ -82,6 +116,9 @@ export class RabbitMQ implements INodeType {
show: {
mode: ['queue'],
},
hide: {
operation: ['deleteMessage'],
},
},
default: '',
placeholder: 'queue-name',
Expand Down Expand Up @@ -161,6 +198,11 @@ export class RabbitMQ implements INodeType {
displayName: 'Send Input Data',
name: 'sendInputData',
type: 'boolean',
displayOptions: {
show: {
operation: ['sendMessage'],
},
},
default: true,
description: 'Whether to send the the data the node receives as JSON',
},
Expand All @@ -181,6 +223,11 @@ export class RabbitMQ implements INodeType {
name: 'options',
type: 'collection',
default: {},
displayOptions: {
show: {
operation: ['sendMessage'],
},
},
placeholder: 'Add Option',
options: [
{
Expand Down Expand Up @@ -339,25 +386,27 @@ export class RabbitMQ implements INodeType {

async execute(this: IExecuteFunctions): Promise<INodeExecutionData[][]> {
let channel, options: IDataObject;
let mode = 'queue';
try {
mode = (this.getNodeParameter('mode', 0) as string) || 'queue';
} catch (error) {}
try {
const items = this.getInputData();
const mode = this.getNodeParameter('mode', 0) as string;

const operation = this.getNodeParameter('operation', 0);
const returnItems: INodeExecutionData[] = [];

if (mode === 'queue') {
const queue = this.getNodeParameter('queue', 0) as string;

const queue = this.getNodeParameter('queue', 0, '') as string;
const sendInputData = this.getNodeParameter('sendInputData', 0, {}) as boolean;
options = this.getNodeParameter('options', 0, {});

channel = await rabbitmqConnectQueue.call(this, queue, options);

const sendInputData = this.getNodeParameter('sendInputData', 0) as boolean;

let message: string;

const queuePromises = [];
for (let i = 0; i < items.length; i++) {
if (operation === 'deleteMessage') {
this.sendResponse(items[0].json);
}
if (sendInputData) {
message = JSON.stringify(items[i].json);
} else {
Expand All @@ -378,7 +427,6 @@ export class RabbitMQ implements INodeType {
);
headers = additionalHeaders;
}

queuePromises.push(channel.sendToQueue(queue, Buffer.from(message), { headers }));
}

Expand Down
44 changes: 35 additions & 9 deletions packages/nodes-base/nodes/RabbitMQ/RabbitMQTrigger.node.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import type {
IDataObject,
IDeferredPromise,
IExecuteResponsePromiseData,
INodeExecutionData,
INodeProperties,
INodeType,
Expand Down Expand Up @@ -45,7 +46,6 @@ export class RabbitMQTrigger implements INodeType {
placeholder: 'queue-name',
description: 'The name of the queue to read from',
},

{
displayName: 'Options',
name: 'options',
Expand Down Expand Up @@ -81,6 +81,11 @@ export class RabbitMQTrigger implements INodeType {
value: 'immediately',
description: 'As soon as the message got received',
},
{
name: 'Specified Later in Workflow',
value: 'laterMessageNode',
description: 'Using a RabbitMQ node to remove the item from the queue',
},
],
default: 'immediately',
description: 'When to acknowledge the message',
Expand Down Expand Up @@ -139,6 +144,18 @@ export class RabbitMQTrigger implements INodeType {
return 0;
}) as INodeProperties[],
},
{
displayName:
"To delete an item from the queue, insert a RabbitMQ node later in the workflow and use the 'Delete from queue' operation",
name: 'laterMessageNode',
type: 'notice',
displayOptions: {
show: {
'/options.acknowledge': ['laterMessageNode'],
},
},
default: '',
},
],
};

Expand Down Expand Up @@ -201,7 +218,6 @@ export class RabbitMQTrigger implements INodeType {
const item: INodeExecutionData = {
json: {},
};

if (options.contentIsBinary === true) {
item.binary = {
data: await this.helpers.prepareBinaryData(message.content),
Expand All @@ -222,13 +238,20 @@ export class RabbitMQTrigger implements INodeType {
}

let responsePromise: IDeferredPromise<IRun> | undefined = undefined;
if (acknowledgeMode !== 'immediately') {
let responsePromiseHook: IDeferredPromise<IExecuteResponsePromiseData> | undefined =
undefined;
if (acknowledgeMode !== 'immediately' && acknowledgeMode !== 'laterMessageNode') {
responsePromise = await this.helpers.createDeferredPromise();
} else if (acknowledgeMode === 'laterMessageNode') {
responsePromiseHook =
await this.helpers.createDeferredPromise<IExecuteResponsePromiseData>();
}

this.emit([[item]], undefined, responsePromise);

if (responsePromise) {
if (responsePromiseHook) {
this.emit([[item]], responsePromiseHook, undefined);
} else {
this.emit([[item]], undefined, responsePromise);
}
if (responsePromise && acknowledgeMode !== 'laterMessageNode') {
// Acknowledge message after the execution finished
await responsePromise.promise().then(async (data: IRun) => {
if (data.data.resultData.error) {
Expand All @@ -239,7 +262,11 @@ export class RabbitMQTrigger implements INodeType {
return;
}
}

channel.ack(message);
messageTracker.answered(message);
});
} else if (responsePromiseHook && acknowledgeMode === 'laterMessageNode') {
await responsePromiseHook.promise().then(() => {
channel.ack(message);
messageTracker.answered(message);
});
Expand All @@ -266,7 +293,6 @@ export class RabbitMQTrigger implements INodeType {
});
consumerTag = consumerInfo.consumerTag;
};

await startConsumer();

// The "closeFunction" function gets called by n8n whenever
Expand Down