Skip to content

Commit

Permalink
feat: adding a flag to force consumer to always acknowledge (#446)
Browse files Browse the repository at this point in the history
* feat: adding a flag to force consumer to always acknowledge

* fix: link

* fix: typo

* chore: moving note
  • Loading branch information
nicholasgriffintn authored Nov 28, 2023
1 parent ccaf358 commit 53c8599
Show file tree
Hide file tree
Showing 4 changed files with 73 additions and 3 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ app.start();
- It's also important to await any processing that you are doing to ensure that messages are processed one at a time.
- By default, messages that are sent to the `handleMessage` and `handleMessageBatch` functions will be considered as processed if they return without an error.
- To acknowledge individual messages, please return the message that you want to acknowledge if you are using `handleMessage` or the messages for `handleMessageBatch`.
- To note, returning an empty object or an empty array will be considered an acknowledgement of no message(s) and will result in no messages being deleted.
- To note, returning an empty object or an empty array will be considered an acknowledgement of no message(s) and will result in no messages being deleted. If you would like to change this behaviour, please use the `alwaysAcknowledge` option [detailed here](https://bbc.github.io/sqs-consumer/interfaces/ConsumerOptions.html).
- By default, if an object or an array is not returned, all messages will be acknowledged.
- Messages are deleted from the queue once the handler function has completed successfully (the above items should also be taken into account).
Expand Down
10 changes: 8 additions & 2 deletions src/consumer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ export class Consumer extends TypedEventEmitter {
private attributeNames: QueueAttributeName[];
private messageAttributeNames: string[];
private shouldDeleteMessages: boolean;
private alwaysAcknowledge: boolean;
private batchSize: number;
private visibilityTimeout: number;
private terminateVisibilityTimeout: boolean;
Expand Down Expand Up @@ -77,6 +78,7 @@ export class Consumer extends TypedEventEmitter {
options.authenticationErrorTimeout ?? 10000;
this.pollingWaitTimeMs = options.pollingWaitTimeMs ?? 0;
this.shouldDeleteMessages = options.shouldDeleteMessages ?? true;
this.alwaysAcknowledge = options.alwaysAcknowledge ?? false;
this.sqs =
options.sqs ||
new SQSClient({
Expand Down Expand Up @@ -453,7 +455,9 @@ export class Consumer extends TypedEventEmitter {
result = await this.handleMessage(message);
}

return result instanceof Object ? result : message;
return !this.alwaysAcknowledge && result instanceof Object
? result
: message;
} catch (err) {
if (err instanceof TimeoutError) {
throw toTimeoutError(
Expand Down Expand Up @@ -482,7 +486,9 @@ export class Consumer extends TypedEventEmitter {
try {
const result = await this.handleMessageBatch(messages);

return result instanceof Object ? result : messages;
return !this.alwaysAcknowledge && result instanceof Object
? result
: messages;
} catch (err) {
if (err instanceof Error) {
throw toStandardError(
Expand Down
8 changes: 8 additions & 0 deletions src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,14 @@ export interface ConsumerOptions {
* @defaultvalue `true`
*/
shouldDeleteMessages?: boolean;
/**
* By default, the consumer will treat an empty object or array from either of the
* handlers as a acknowledgement of no messages and will not delete those messages as
* a result. Set this to `true` to always acknowledge all messages no matter the returned
* value.
* @defaultvalue `false`
*/
alwaysAcknowledge?: boolean;
/**
* An `async` function (or function that returns a `Promise`) to be called whenever
* a message is received.
Expand Down
56 changes: 56 additions & 0 deletions test/tests/consumer.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1047,6 +1047,33 @@ describe('Consumer', () => {
sandbox.assert.neverCalledWithMatch(sqs.send, mockDeleteMessage);
});

it('deletes the message if alwaysAcknowledge is `true` and handleMessage returns an empty object', async () => {
consumer = new Consumer({
queueUrl: QUEUE_URL,
region: REGION,
handleMessage: async () => {
return {};
},
sqs,
alwaysAcknowledge: true
});

consumer.start();
await pEvent(consumer, 'response_processed');
consumer.stop();

sandbox.assert.callCount(sqs.send, 2);
sandbox.assert.calledWithMatch(sqs.send.firstCall, mockReceiveMessage);
sandbox.assert.calledWithMatch(sqs.send.secondCall, mockDeleteMessage);
sandbox.assert.match(
sqs.send.secondCall.args[0].input,
sinon.match({
QueueUrl: QUEUE_URL,
ReceiptHandle: 'receipt-handle'
})
);
});

it('does not call deleteMessageBatch if handleMessagesBatch returns an empty array', async () => {
consumer = new Consumer({
queueUrl: QUEUE_URL,
Expand All @@ -1064,6 +1091,35 @@ describe('Consumer', () => {
sandbox.assert.neverCalledWithMatch(sqs.send, mockDeleteMessageBatch);
});

it('calls deleteMessageBatch if alwaysAcknowledge is `true` and handleMessagesBatch returns an empty array', async () => {
consumer = new Consumer({
queueUrl: QUEUE_URL,
region: REGION,
handleMessageBatch: async () => [],
batchSize: 2,
sqs,
alwaysAcknowledge: true
});

consumer.start();
await pEvent(consumer, 'response_processed');
consumer.stop();

sandbox.assert.callCount(sqs.send, 2);
sandbox.assert.calledWithMatch(sqs.send.firstCall, mockReceiveMessage);
sandbox.assert.calledWithMatch(
sqs.send.secondCall,
mockDeleteMessageBatch
);
sandbox.assert.match(
sqs.send.secondCall.args[0].input,
sinon.match({
QueueUrl: QUEUE_URL,
Entries: [{ Id: '123', ReceiptHandle: 'receipt-handle' }]
})
);
});

it('ack all messages if handleMessageBatch returns void', async () => {
consumer = new Consumer({
queueUrl: QUEUE_URL,
Expand Down

0 comments on commit 53c8599

Please sign in to comment.