Skip to content

Commit

Permalink
feat(RabbitMQ Node): Add mode for acknowledging and deleting from que…
Browse files Browse the repository at this point in the history
…ue later in workflow (#6225)

* Add later in workflow mode

* Add new operation

* Acknowledge message in next node

* Add response and emit for responsePromiseHook

* Remove double success message, close channel correctly

* Answser messages correctly

* Remove option from delete operation

* move operation name to camelCase

* Fix versioning

* To remove: add action item in v1

* Add notice for delete from queue

* Correctly only execute only the delete operation

* Refactor delete from queue operator and add return last items

---------

Co-authored-by: Marcus <[email protected]>
  • Loading branch information
agobrech and maspio authored May 22, 2023
1 parent 2d13b3f commit f5950b2
Show file tree
Hide file tree
Showing 2 changed files with 110 additions and 17 deletions.
83 changes: 75 additions & 8 deletions packages/nodes-base/nodes/RabbitMQ/RabbitMQ.node.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ export class RabbitMQ implements INodeType {
// eslint-disable-next-line n8n-nodes-base/node-class-description-icon-not-svg
icon: 'file:rabbitmq.png',
group: ['transform'],
version: 1,
version: [1, 1.1],
description: 'Sends messages to a RabbitMQ topic',
defaults: {
name: 'RabbitMQ',
Expand All @@ -43,18 +43,71 @@ export class RabbitMQ implements INodeType {
name: 'operation',
type: 'hidden',
noDataExpression: true,
default: 'send_message',
default: 'sendMessage',
displayOptions: {
show: {
'@version': [1],
},
},
// To remove when action view is fixed
options: [
{
name: 'Send a Message to RabbitMQ',
value: 'sendMessage',
action: 'Send a Message to RabbitMQ',
},
{
name: 'Delete From Queue',
value: 'deleteMessage',
action: 'Delete From Queue',
},
],
},
{
displayName: 'Operation',
name: 'operation',
type: 'options',
noDataExpression: true,
default: 'sendMessage',
displayOptions: {
show: {
'@version': [1.1],
},
},
options: [
{
name: 'Send a Message to RabbitMQ',
value: 'send_message',
value: 'sendMessage',
action: 'Send a Message to RabbitMQ',
},
{
name: 'Delete From Queue',
value: 'deleteMessage',
action: 'Delete From Queue',
},
],
},
{
displayName:
'Will delete an item from the queue triggered earlier in the workflow by a RabbitMQ Trigger node',
name: 'deleteMessage',
type: 'notice',
default: '',
displayOptions: {
show: {
operation: ['deleteMessage'],
},
},
},
{
displayName: 'Mode',
name: 'mode',
type: 'options',
displayOptions: {
hide: {
operation: ['deleteMessage'],
},
},
options: [
{
name: 'Queue',
Expand Down Expand Up @@ -82,6 +135,9 @@ export class RabbitMQ implements INodeType {
show: {
mode: ['queue'],
},
hide: {
operation: ['deleteMessage'],
},
},
default: '',
placeholder: 'queue-name',
Expand Down Expand Up @@ -161,6 +217,11 @@ export class RabbitMQ implements INodeType {
displayName: 'Send Input Data',
name: 'sendInputData',
type: 'boolean',
displayOptions: {
show: {
operation: ['sendMessage'],
},
},
default: true,
description: 'Whether to send the the data the node receives as JSON',
},
Expand All @@ -181,6 +242,11 @@ export class RabbitMQ implements INodeType {
name: 'options',
type: 'collection',
default: {},
displayOptions: {
show: {
operation: ['sendMessage'],
},
},
placeholder: 'Add Option',
options: [
{
Expand Down Expand Up @@ -341,10 +407,13 @@ export class RabbitMQ implements INodeType {
let channel, options: IDataObject;
try {
const items = this.getInputData();
const mode = this.getNodeParameter('mode', 0) as string;

const operation = this.getNodeParameter('operation', 0);
if (operation === 'deleteMessage') {
this.sendResponse(items[0].json);
return await this.prepareOutputData(items);
}
const mode = (this.getNodeParameter('mode', 0) as string) || 'queue';
const returnItems: INodeExecutionData[] = [];

if (mode === 'queue') {
const queue = this.getNodeParameter('queue', 0) as string;

Expand All @@ -355,7 +424,6 @@ export class RabbitMQ implements INodeType {
const sendInputData = this.getNodeParameter('sendInputData', 0) as boolean;

let message: string;

const queuePromises = [];
for (let i = 0; i < items.length; i++) {
if (sendInputData) {
Expand All @@ -378,7 +446,6 @@ export class RabbitMQ implements INodeType {
);
headers = additionalHeaders;
}

queuePromises.push(channel.sendToQueue(queue, Buffer.from(message), { headers }));
}

Expand Down
44 changes: 35 additions & 9 deletions packages/nodes-base/nodes/RabbitMQ/RabbitMQTrigger.node.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import type {
IDataObject,
IDeferredPromise,
IExecuteResponsePromiseData,
INodeExecutionData,
INodeProperties,
INodeType,
Expand Down Expand Up @@ -45,7 +46,6 @@ export class RabbitMQTrigger implements INodeType {
placeholder: 'queue-name',
description: 'The name of the queue to read from',
},

{
displayName: 'Options',
name: 'options',
Expand Down Expand Up @@ -81,6 +81,11 @@ export class RabbitMQTrigger implements INodeType {
value: 'immediately',
description: 'As soon as the message got received',
},
{
name: 'Specified Later in Workflow',
value: 'laterMessageNode',
description: 'Using a RabbitMQ node to remove the item from the queue',
},
],
default: 'immediately',
description: 'When to acknowledge the message',
Expand Down Expand Up @@ -139,6 +144,18 @@ export class RabbitMQTrigger implements INodeType {
return 0;
}) as INodeProperties[],
},
{
displayName:
"To delete an item from the queue, insert a RabbitMQ node later in the workflow and use the 'Delete from queue' operation",
name: 'laterMessageNode',
type: 'notice',
displayOptions: {
show: {
'/options.acknowledge': ['laterMessageNode'],
},
},
default: '',
},
],
};

Expand Down Expand Up @@ -201,7 +218,6 @@ export class RabbitMQTrigger implements INodeType {
const item: INodeExecutionData = {
json: {},
};

if (options.contentIsBinary === true) {
item.binary = {
data: await this.helpers.prepareBinaryData(message.content),
Expand All @@ -222,13 +238,20 @@ export class RabbitMQTrigger implements INodeType {
}

let responsePromise: IDeferredPromise<IRun> | undefined = undefined;
if (acknowledgeMode !== 'immediately') {
let responsePromiseHook: IDeferredPromise<IExecuteResponsePromiseData> | undefined =
undefined;
if (acknowledgeMode !== 'immediately' && acknowledgeMode !== 'laterMessageNode') {
responsePromise = await this.helpers.createDeferredPromise();
} else if (acknowledgeMode === 'laterMessageNode') {
responsePromiseHook =
await this.helpers.createDeferredPromise<IExecuteResponsePromiseData>();
}

this.emit([[item]], undefined, responsePromise);

if (responsePromise) {
if (responsePromiseHook) {
this.emit([[item]], responsePromiseHook, undefined);
} else {
this.emit([[item]], undefined, responsePromise);
}
if (responsePromise && acknowledgeMode !== 'laterMessageNode') {
// Acknowledge message after the execution finished
await responsePromise.promise().then(async (data: IRun) => {
if (data.data.resultData.error) {
Expand All @@ -239,7 +262,11 @@ export class RabbitMQTrigger implements INodeType {
return;
}
}

channel.ack(message);
messageTracker.answered(message);
});
} else if (responsePromiseHook && acknowledgeMode === 'laterMessageNode') {
await responsePromiseHook.promise().then(() => {
channel.ack(message);
messageTracker.answered(message);
});
Expand All @@ -266,7 +293,6 @@ export class RabbitMQTrigger implements INodeType {
});
consumerTag = consumerInfo.consumerTag;
};

await startConsumer();

// The "closeFunction" function gets called by n8n whenever
Expand Down

0 comments on commit f5950b2

Please sign in to comment.