From 53c8599b4788a2780d2fcd87a04fcbbd07a73cd2 Mon Sep 17 00:00:00 2001 From: Nicholas Griffin Date: Tue, 28 Nov 2023 21:21:44 +0000 Subject: [PATCH] feat: adding a flag to force consumer to always acknowledge (#446) * feat: adding a flag to force consumer to always acknowledge * fix: link * fix: typo * chore: moving note --- README.md | 2 +- src/consumer.ts | 10 +++++-- src/types.ts | 8 ++++++ test/tests/consumer.test.ts | 56 +++++++++++++++++++++++++++++++++++++ 4 files changed, 73 insertions(+), 3 deletions(-) diff --git a/README.md b/README.md index 7064e1e..92f7783 100644 --- a/README.md +++ b/README.md @@ -55,7 +55,7 @@ app.start(); - It's also important to await any processing that you are doing to ensure that messages are processed one at a time. - By default, messages that are sent to the `handleMessage` and `handleMessageBatch` functions will be considered as processed if they return without an error. - To acknowledge individual messages, please return the message that you want to acknowledge if you are using `handleMessage` or the messages for `handleMessageBatch`. - - To note, returning an empty object or an empty array will be considered an acknowledgement of no message(s) and will result in no messages being deleted. + - To note, returning an empty object or an empty array will be considered an acknowledgement of no message(s) and will result in no messages being deleted. If you would like to change this behaviour, please use the `alwaysAcknowledge` option [detailed here](https://bbc.github.io/sqs-consumer/interfaces/ConsumerOptions.html). - By default, if an object or an array is not returned, all messages will be acknowledged. - Messages are deleted from the queue once the handler function has completed successfully (the above items should also be taken into account). diff --git a/src/consumer.ts b/src/consumer.ts index 5351088..dc2df77 100644 --- a/src/consumer.ts +++ b/src/consumer.ts @@ -47,6 +47,7 @@ export class Consumer extends TypedEventEmitter { private attributeNames: QueueAttributeName[]; private messageAttributeNames: string[]; private shouldDeleteMessages: boolean; + private alwaysAcknowledge: boolean; private batchSize: number; private visibilityTimeout: number; private terminateVisibilityTimeout: boolean; @@ -77,6 +78,7 @@ export class Consumer extends TypedEventEmitter { options.authenticationErrorTimeout ?? 10000; this.pollingWaitTimeMs = options.pollingWaitTimeMs ?? 0; this.shouldDeleteMessages = options.shouldDeleteMessages ?? true; + this.alwaysAcknowledge = options.alwaysAcknowledge ?? false; this.sqs = options.sqs || new SQSClient({ @@ -453,7 +455,9 @@ export class Consumer extends TypedEventEmitter { result = await this.handleMessage(message); } - return result instanceof Object ? result : message; + return !this.alwaysAcknowledge && result instanceof Object + ? result + : message; } catch (err) { if (err instanceof TimeoutError) { throw toTimeoutError( @@ -482,7 +486,9 @@ export class Consumer extends TypedEventEmitter { try { const result = await this.handleMessageBatch(messages); - return result instanceof Object ? result : messages; + return !this.alwaysAcknowledge && result instanceof Object + ? result + : messages; } catch (err) { if (err instanceof Error) { throw toStandardError( diff --git a/src/types.ts b/src/types.ts index 3f7744f..b04e83d 100644 --- a/src/types.ts +++ b/src/types.ts @@ -84,6 +84,14 @@ export interface ConsumerOptions { * @defaultvalue `true` */ shouldDeleteMessages?: boolean; + /** + * By default, the consumer will treat an empty object or array from either of the + * handlers as a acknowledgement of no messages and will not delete those messages as + * a result. Set this to `true` to always acknowledge all messages no matter the returned + * value. + * @defaultvalue `false` + */ + alwaysAcknowledge?: boolean; /** * An `async` function (or function that returns a `Promise`) to be called whenever * a message is received. diff --git a/test/tests/consumer.test.ts b/test/tests/consumer.test.ts index 9577d10..e69bb67 100644 --- a/test/tests/consumer.test.ts +++ b/test/tests/consumer.test.ts @@ -1047,6 +1047,33 @@ describe('Consumer', () => { sandbox.assert.neverCalledWithMatch(sqs.send, mockDeleteMessage); }); + it('deletes the message if alwaysAcknowledge is `true` and handleMessage returns an empty object', async () => { + consumer = new Consumer({ + queueUrl: QUEUE_URL, + region: REGION, + handleMessage: async () => { + return {}; + }, + sqs, + alwaysAcknowledge: true + }); + + consumer.start(); + await pEvent(consumer, 'response_processed'); + consumer.stop(); + + sandbox.assert.callCount(sqs.send, 2); + sandbox.assert.calledWithMatch(sqs.send.firstCall, mockReceiveMessage); + sandbox.assert.calledWithMatch(sqs.send.secondCall, mockDeleteMessage); + sandbox.assert.match( + sqs.send.secondCall.args[0].input, + sinon.match({ + QueueUrl: QUEUE_URL, + ReceiptHandle: 'receipt-handle' + }) + ); + }); + it('does not call deleteMessageBatch if handleMessagesBatch returns an empty array', async () => { consumer = new Consumer({ queueUrl: QUEUE_URL, @@ -1064,6 +1091,35 @@ describe('Consumer', () => { sandbox.assert.neverCalledWithMatch(sqs.send, mockDeleteMessageBatch); }); + it('calls deleteMessageBatch if alwaysAcknowledge is `true` and handleMessagesBatch returns an empty array', async () => { + consumer = new Consumer({ + queueUrl: QUEUE_URL, + region: REGION, + handleMessageBatch: async () => [], + batchSize: 2, + sqs, + alwaysAcknowledge: true + }); + + consumer.start(); + await pEvent(consumer, 'response_processed'); + consumer.stop(); + + sandbox.assert.callCount(sqs.send, 2); + sandbox.assert.calledWithMatch(sqs.send.firstCall, mockReceiveMessage); + sandbox.assert.calledWithMatch( + sqs.send.secondCall, + mockDeleteMessageBatch + ); + sandbox.assert.match( + sqs.send.secondCall.args[0].input, + sinon.match({ + QueueUrl: QUEUE_URL, + Entries: [{ Id: '123', ReceiptHandle: 'receipt-handle' }] + }) + ); + }); + it('ack all messages if handleMessageBatch returns void', async () => { consumer = new Consumer({ queueUrl: QUEUE_URL,