From 02ef12e77bf702e721507211c01a57b62f3e436f Mon Sep 17 00:00:00 2001 From: Nicholas Griffin Date: Fri, 24 Nov 2023 21:18:03 +0000 Subject: [PATCH] feat: adding a flag to force consumer to always acknowledge --- README.md | 2 +- src/consumer.ts | 10 +++++-- src/types.ts | 8 ++++++ test/tests/consumer.test.ts | 56 +++++++++++++++++++++++++++++++++++++ 4 files changed, 73 insertions(+), 3 deletions(-) diff --git a/README.md b/README.md index 7f4b1e2..1e89e85 100644 --- a/README.md +++ b/README.md @@ -56,7 +56,7 @@ app.start(); - By default, messages that are sent to the `handleMessage` and `handleMessageBatch` functions will be considered as processed if they return without an error. - To acknowledge individual messages, please return the message that you want to acknowledge if you are using `handleMessage` or the messages for `handleMessageBatch`. - To note, returning an empty object or an empty array will be considered an acknowledgement of no message(s) and will result in no messages being deleted. - - By default, if an object or an array is not returned, all messages will be acknowledged. + - By default, if an object or an array is not returned, all messages will be acknowledged. If you would like to change this behaviour, please use the `alwaysAcknowledge` option [detailed below](#options). - Messages are deleted from the queue once the handler function has completed successfully (the above items should also be taken into account). ### Credentials diff --git a/src/consumer.ts b/src/consumer.ts index 5351088..dc2df77 100644 --- a/src/consumer.ts +++ b/src/consumer.ts @@ -47,6 +47,7 @@ export class Consumer extends TypedEventEmitter { private attributeNames: QueueAttributeName[]; private messageAttributeNames: string[]; private shouldDeleteMessages: boolean; + private alwaysAcknowledge: boolean; private batchSize: number; private visibilityTimeout: number; private terminateVisibilityTimeout: boolean; @@ -77,6 +78,7 @@ export class Consumer extends TypedEventEmitter { options.authenticationErrorTimeout ?? 10000; this.pollingWaitTimeMs = options.pollingWaitTimeMs ?? 0; this.shouldDeleteMessages = options.shouldDeleteMessages ?? true; + this.alwaysAcknowledge = options.alwaysAcknowledge ?? false; this.sqs = options.sqs || new SQSClient({ @@ -453,7 +455,9 @@ export class Consumer extends TypedEventEmitter { result = await this.handleMessage(message); } - return result instanceof Object ? result : message; + return !this.alwaysAcknowledge && result instanceof Object + ? result + : message; } catch (err) { if (err instanceof TimeoutError) { throw toTimeoutError( @@ -482,7 +486,9 @@ export class Consumer extends TypedEventEmitter { try { const result = await this.handleMessageBatch(messages); - return result instanceof Object ? result : messages; + return !this.alwaysAcknowledge && result instanceof Object + ? result + : messages; } catch (err) { if (err instanceof Error) { throw toStandardError( diff --git a/src/types.ts b/src/types.ts index 3f7744f..b04e83d 100644 --- a/src/types.ts +++ b/src/types.ts @@ -84,6 +84,14 @@ export interface ConsumerOptions { * @defaultvalue `true` */ shouldDeleteMessages?: boolean; + /** + * By default, the consumer will treat an empty object or array from either of the + * handlers as a acknowledgement of no messages and will not delete those messages as + * a result. Set this to `true` to always acknowledge all messages no matter the returned + * value. + * @defaultvalue `false` + */ + alwaysAcknowledge?: boolean; /** * An `async` function (or function that returns a `Promise`) to be called whenever * a message is received. diff --git a/test/tests/consumer.test.ts b/test/tests/consumer.test.ts index 9577d10..e69bb67 100644 --- a/test/tests/consumer.test.ts +++ b/test/tests/consumer.test.ts @@ -1047,6 +1047,33 @@ describe('Consumer', () => { sandbox.assert.neverCalledWithMatch(sqs.send, mockDeleteMessage); }); + it('deletes the message if alwaysAcknowledge is `true` and handleMessage returns an empty object', async () => { + consumer = new Consumer({ + queueUrl: QUEUE_URL, + region: REGION, + handleMessage: async () => { + return {}; + }, + sqs, + alwaysAcknowledge: true + }); + + consumer.start(); + await pEvent(consumer, 'response_processed'); + consumer.stop(); + + sandbox.assert.callCount(sqs.send, 2); + sandbox.assert.calledWithMatch(sqs.send.firstCall, mockReceiveMessage); + sandbox.assert.calledWithMatch(sqs.send.secondCall, mockDeleteMessage); + sandbox.assert.match( + sqs.send.secondCall.args[0].input, + sinon.match({ + QueueUrl: QUEUE_URL, + ReceiptHandle: 'receipt-handle' + }) + ); + }); + it('does not call deleteMessageBatch if handleMessagesBatch returns an empty array', async () => { consumer = new Consumer({ queueUrl: QUEUE_URL, @@ -1064,6 +1091,35 @@ describe('Consumer', () => { sandbox.assert.neverCalledWithMatch(sqs.send, mockDeleteMessageBatch); }); + it('calls deleteMessageBatch if alwaysAcknowledge is `true` and handleMessagesBatch returns an empty array', async () => { + consumer = new Consumer({ + queueUrl: QUEUE_URL, + region: REGION, + handleMessageBatch: async () => [], + batchSize: 2, + sqs, + alwaysAcknowledge: true + }); + + consumer.start(); + await pEvent(consumer, 'response_processed'); + consumer.stop(); + + sandbox.assert.callCount(sqs.send, 2); + sandbox.assert.calledWithMatch(sqs.send.firstCall, mockReceiveMessage); + sandbox.assert.calledWithMatch( + sqs.send.secondCall, + mockDeleteMessageBatch + ); + sandbox.assert.match( + sqs.send.secondCall.args[0].input, + sinon.match({ + QueueUrl: QUEUE_URL, + Entries: [{ Id: '123', ReceiptHandle: 'receipt-handle' }] + }) + ); + }); + it('ack all messages if handleMessageBatch returns void', async () => { consumer = new Consumer({ queueUrl: QUEUE_URL,