From 02ef12e77bf702e721507211c01a57b62f3e436f Mon Sep 17 00:00:00 2001 From: Nicholas Griffin Date: Fri, 24 Nov 2023 21:18:03 +0000 Subject: [PATCH 1/4] feat: adding a flag to force consumer to always acknowledge --- README.md | 2 +- src/consumer.ts | 10 +++++-- src/types.ts | 8 ++++++ test/tests/consumer.test.ts | 56 +++++++++++++++++++++++++++++++++++++ 4 files changed, 73 insertions(+), 3 deletions(-) diff --git a/README.md b/README.md index 7f4b1e2..1e89e85 100644 --- a/README.md +++ b/README.md @@ -56,7 +56,7 @@ app.start(); - By default, messages that are sent to the `handleMessage` and `handleMessageBatch` functions will be considered as processed if they return without an error. - To acknowledge individual messages, please return the message that you want to acknowledge if you are using `handleMessage` or the messages for `handleMessageBatch`. - To note, returning an empty object or an empty array will be considered an acknowledgement of no message(s) and will result in no messages being deleted. - - By default, if an object or an array is not returned, all messages will be acknowledged. + - By default, if an object or an array is not returned, all messages will be acknowledged. If you would like to change this behaviour, please use the `alwaysAcknowledge` option [detailed below](#options). - Messages are deleted from the queue once the handler function has completed successfully (the above items should also be taken into account). ### Credentials diff --git a/src/consumer.ts b/src/consumer.ts index 5351088..dc2df77 100644 --- a/src/consumer.ts +++ b/src/consumer.ts @@ -47,6 +47,7 @@ export class Consumer extends TypedEventEmitter { private attributeNames: QueueAttributeName[]; private messageAttributeNames: string[]; private shouldDeleteMessages: boolean; + private alwaysAcknowledge: boolean; private batchSize: number; private visibilityTimeout: number; private terminateVisibilityTimeout: boolean; @@ -77,6 +78,7 @@ export class Consumer extends TypedEventEmitter { options.authenticationErrorTimeout ?? 10000; this.pollingWaitTimeMs = options.pollingWaitTimeMs ?? 0; this.shouldDeleteMessages = options.shouldDeleteMessages ?? true; + this.alwaysAcknowledge = options.alwaysAcknowledge ?? false; this.sqs = options.sqs || new SQSClient({ @@ -453,7 +455,9 @@ export class Consumer extends TypedEventEmitter { result = await this.handleMessage(message); } - return result instanceof Object ? result : message; + return !this.alwaysAcknowledge && result instanceof Object + ? result + : message; } catch (err) { if (err instanceof TimeoutError) { throw toTimeoutError( @@ -482,7 +486,9 @@ export class Consumer extends TypedEventEmitter { try { const result = await this.handleMessageBatch(messages); - return result instanceof Object ? result : messages; + return !this.alwaysAcknowledge && result instanceof Object + ? result + : messages; } catch (err) { if (err instanceof Error) { throw toStandardError( diff --git a/src/types.ts b/src/types.ts index 3f7744f..b04e83d 100644 --- a/src/types.ts +++ b/src/types.ts @@ -84,6 +84,14 @@ export interface ConsumerOptions { * @defaultvalue `true` */ shouldDeleteMessages?: boolean; + /** + * By default, the consumer will treat an empty object or array from either of the + * handlers as a acknowledgement of no messages and will not delete those messages as + * a result. Set this to `true` to always acknowledge all messages no matter the returned + * value. + * @defaultvalue `false` + */ + alwaysAcknowledge?: boolean; /** * An `async` function (or function that returns a `Promise`) to be called whenever * a message is received. diff --git a/test/tests/consumer.test.ts b/test/tests/consumer.test.ts index 9577d10..e69bb67 100644 --- a/test/tests/consumer.test.ts +++ b/test/tests/consumer.test.ts @@ -1047,6 +1047,33 @@ describe('Consumer', () => { sandbox.assert.neverCalledWithMatch(sqs.send, mockDeleteMessage); }); + it('deletes the message if alwaysAcknowledge is `true` and handleMessage returns an empty object', async () => { + consumer = new Consumer({ + queueUrl: QUEUE_URL, + region: REGION, + handleMessage: async () => { + return {}; + }, + sqs, + alwaysAcknowledge: true + }); + + consumer.start(); + await pEvent(consumer, 'response_processed'); + consumer.stop(); + + sandbox.assert.callCount(sqs.send, 2); + sandbox.assert.calledWithMatch(sqs.send.firstCall, mockReceiveMessage); + sandbox.assert.calledWithMatch(sqs.send.secondCall, mockDeleteMessage); + sandbox.assert.match( + sqs.send.secondCall.args[0].input, + sinon.match({ + QueueUrl: QUEUE_URL, + ReceiptHandle: 'receipt-handle' + }) + ); + }); + it('does not call deleteMessageBatch if handleMessagesBatch returns an empty array', async () => { consumer = new Consumer({ queueUrl: QUEUE_URL, @@ -1064,6 +1091,35 @@ describe('Consumer', () => { sandbox.assert.neverCalledWithMatch(sqs.send, mockDeleteMessageBatch); }); + it('calls deleteMessageBatch if alwaysAcknowledge is `true` and handleMessagesBatch returns an empty array', async () => { + consumer = new Consumer({ + queueUrl: QUEUE_URL, + region: REGION, + handleMessageBatch: async () => [], + batchSize: 2, + sqs, + alwaysAcknowledge: true + }); + + consumer.start(); + await pEvent(consumer, 'response_processed'); + consumer.stop(); + + sandbox.assert.callCount(sqs.send, 2); + sandbox.assert.calledWithMatch(sqs.send.firstCall, mockReceiveMessage); + sandbox.assert.calledWithMatch( + sqs.send.secondCall, + mockDeleteMessageBatch + ); + sandbox.assert.match( + sqs.send.secondCall.args[0].input, + sinon.match({ + QueueUrl: QUEUE_URL, + Entries: [{ Id: '123', ReceiptHandle: 'receipt-handle' }] + }) + ); + }); + it('ack all messages if handleMessageBatch returns void', async () => { consumer = new Consumer({ queueUrl: QUEUE_URL, From 8ce64db0d2800853ca2a72cdc8a72aaad966c352 Mon Sep 17 00:00:00 2001 From: Nicholas Griffin Date: Fri, 24 Nov 2023 21:24:48 +0000 Subject: [PATCH 2/4] fix: link --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index 1e89e85..4b6b8dc 100644 --- a/README.md +++ b/README.md @@ -56,7 +56,7 @@ app.start(); - By default, messages that are sent to the `handleMessage` and `handleMessageBatch` functions will be considered as processed if they return without an error. - To acknowledge individual messages, please return the message that you want to acknowledge if you are using `handleMessage` or the messages for `handleMessageBatch`. - To note, returning an empty object or an empty array will be considered an acknowledgement of no message(s) and will result in no messages being deleted. - - By default, if an object or an array is not returned, all messages will be acknowledged. If you would like to change this behaviour, please use the `alwaysAcknowledge` option [detailed below](#options). + - By default, if an object or an array is not returned, all messages will be acknowledged. If you would like to change this behaviour, please use the `alwaysAcknowledge` option [detailed below](https://bbc.github.io/sqs-consumer/interfaces/ConsumerOptions.html). - Messages are deleted from the queue once the handler function has completed successfully (the above items should also be taken into account). ### Credentials From ef9d4aa370a221ba9412a3b49a504141bffea94e Mon Sep 17 00:00:00 2001 From: Nicholas Griffin Date: Fri, 24 Nov 2023 21:33:27 +0000 Subject: [PATCH 3/4] fix: typo --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index 4b6b8dc..f340819 100644 --- a/README.md +++ b/README.md @@ -56,7 +56,7 @@ app.start(); - By default, messages that are sent to the `handleMessage` and `handleMessageBatch` functions will be considered as processed if they return without an error. - To acknowledge individual messages, please return the message that you want to acknowledge if you are using `handleMessage` or the messages for `handleMessageBatch`. - To note, returning an empty object or an empty array will be considered an acknowledgement of no message(s) and will result in no messages being deleted. - - By default, if an object or an array is not returned, all messages will be acknowledged. If you would like to change this behaviour, please use the `alwaysAcknowledge` option [detailed below](https://bbc.github.io/sqs-consumer/interfaces/ConsumerOptions.html). + - By default, if an object or an array is not returned, all messages will be acknowledged. If you would like to change this behaviour, please use the `alwaysAcknowledge` option [detailed here](https://bbc.github.io/sqs-consumer/interfaces/ConsumerOptions.html). - Messages are deleted from the queue once the handler function has completed successfully (the above items should also be taken into account). ### Credentials From eac6af24c1074218fe65c48d04c9a248c6171ca1 Mon Sep 17 00:00:00 2001 From: Nicholas Griffin Date: Fri, 24 Nov 2023 21:34:36 +0000 Subject: [PATCH 4/4] chore: moving note --- README.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index d261cb9..92f7783 100644 --- a/README.md +++ b/README.md @@ -55,8 +55,8 @@ app.start(); - It's also important to await any processing that you are doing to ensure that messages are processed one at a time. - By default, messages that are sent to the `handleMessage` and `handleMessageBatch` functions will be considered as processed if they return without an error. - To acknowledge individual messages, please return the message that you want to acknowledge if you are using `handleMessage` or the messages for `handleMessageBatch`. - - To note, returning an empty object or an empty array will be considered an acknowledgement of no message(s) and will result in no messages being deleted. - - By default, if an object or an array is not returned, all messages will be acknowledged. If you would like to change this behaviour, please use the `alwaysAcknowledge` option [detailed here](https://bbc.github.io/sqs-consumer/interfaces/ConsumerOptions.html). + - To note, returning an empty object or an empty array will be considered an acknowledgement of no message(s) and will result in no messages being deleted. If you would like to change this behaviour, please use the `alwaysAcknowledge` option [detailed here](https://bbc.github.io/sqs-consumer/interfaces/ConsumerOptions.html). + - By default, if an object or an array is not returned, all messages will be acknowledged. - Messages are deleted from the queue once the handler function has completed successfully (the above items should also be taken into account). ### Credentials