Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: adding a flag to force consumer to always acknowledge #446

Merged
merged 5 commits into from
Nov 28, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ app.start();
- It's also important to await any processing that you are doing to ensure that messages are processed one at a time.
- By default, messages that are sent to the `handleMessage` and `handleMessageBatch` functions will be considered as processed if they return without an error.
- To acknowledge individual messages, please return the message that you want to acknowledge if you are using `handleMessage` or the messages for `handleMessageBatch`.
- To note, returning an empty object or an empty array will be considered an acknowledgement of no message(s) and will result in no messages being deleted.
- To note, returning an empty object or an empty array will be considered an acknowledgement of no message(s) and will result in no messages being deleted. If you would like to change this behaviour, please use the `alwaysAcknowledge` option [detailed here](https://bbc.github.io/sqs-consumer/interfaces/ConsumerOptions.html).
- By default, if an object or an array is not returned, all messages will be acknowledged.
- Messages are deleted from the queue once the handler function has completed successfully (the above items should also be taken into account).

Expand Down
10 changes: 8 additions & 2 deletions src/consumer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ export class Consumer extends TypedEventEmitter {
private attributeNames: QueueAttributeName[];
private messageAttributeNames: string[];
private shouldDeleteMessages: boolean;
private alwaysAcknowledge: boolean;
private batchSize: number;
private visibilityTimeout: number;
private terminateVisibilityTimeout: boolean;
Expand Down Expand Up @@ -77,6 +78,7 @@ export class Consumer extends TypedEventEmitter {
options.authenticationErrorTimeout ?? 10000;
this.pollingWaitTimeMs = options.pollingWaitTimeMs ?? 0;
this.shouldDeleteMessages = options.shouldDeleteMessages ?? true;
this.alwaysAcknowledge = options.alwaysAcknowledge ?? false;
this.sqs =
options.sqs ||
new SQSClient({
Expand Down Expand Up @@ -453,7 +455,9 @@ export class Consumer extends TypedEventEmitter {
result = await this.handleMessage(message);
}

return result instanceof Object ? result : message;
return !this.alwaysAcknowledge && result instanceof Object
? result
: message;
} catch (err) {
if (err instanceof TimeoutError) {
throw toTimeoutError(
Expand Down Expand Up @@ -482,7 +486,9 @@ export class Consumer extends TypedEventEmitter {
try {
const result = await this.handleMessageBatch(messages);

return result instanceof Object ? result : messages;
return !this.alwaysAcknowledge && result instanceof Object
? result
: messages;
} catch (err) {
if (err instanceof Error) {
throw toStandardError(
Expand Down
8 changes: 8 additions & 0 deletions src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,14 @@ export interface ConsumerOptions {
* @defaultvalue `true`
*/
shouldDeleteMessages?: boolean;
/**
* By default, the consumer will treat an empty object or array from either of the
* handlers as a acknowledgement of no messages and will not delete those messages as
* a result. Set this to `true` to always acknowledge all messages no matter the returned
* value.
* @defaultvalue `false`
*/
alwaysAcknowledge?: boolean;
/**
* An `async` function (or function that returns a `Promise`) to be called whenever
* a message is received.
Expand Down
56 changes: 56 additions & 0 deletions test/tests/consumer.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1047,6 +1047,33 @@ describe('Consumer', () => {
sandbox.assert.neverCalledWithMatch(sqs.send, mockDeleteMessage);
});

it('deletes the message if alwaysAcknowledge is `true` and handleMessage returns an empty object', async () => {
consumer = new Consumer({
queueUrl: QUEUE_URL,
region: REGION,
handleMessage: async () => {
return {};
},
sqs,
alwaysAcknowledge: true
});

consumer.start();
await pEvent(consumer, 'response_processed');
consumer.stop();

sandbox.assert.callCount(sqs.send, 2);
sandbox.assert.calledWithMatch(sqs.send.firstCall, mockReceiveMessage);
sandbox.assert.calledWithMatch(sqs.send.secondCall, mockDeleteMessage);
sandbox.assert.match(
sqs.send.secondCall.args[0].input,
sinon.match({
QueueUrl: QUEUE_URL,
ReceiptHandle: 'receipt-handle'
})
);
});

it('does not call deleteMessageBatch if handleMessagesBatch returns an empty array', async () => {
consumer = new Consumer({
queueUrl: QUEUE_URL,
Expand All @@ -1064,6 +1091,35 @@ describe('Consumer', () => {
sandbox.assert.neverCalledWithMatch(sqs.send, mockDeleteMessageBatch);
});

it('calls deleteMessageBatch if alwaysAcknowledge is `true` and handleMessagesBatch returns an empty array', async () => {
consumer = new Consumer({
queueUrl: QUEUE_URL,
region: REGION,
handleMessageBatch: async () => [],
batchSize: 2,
sqs,
alwaysAcknowledge: true
});

consumer.start();
await pEvent(consumer, 'response_processed');
consumer.stop();

sandbox.assert.callCount(sqs.send, 2);
sandbox.assert.calledWithMatch(sqs.send.firstCall, mockReceiveMessage);
sandbox.assert.calledWithMatch(
sqs.send.secondCall,
mockDeleteMessageBatch
);
sandbox.assert.match(
sqs.send.secondCall.args[0].input,
sinon.match({
QueueUrl: QUEUE_URL,
Entries: [{ Id: '123', ReceiptHandle: 'receipt-handle' }]
})
);
});

it('ack all messages if handleMessageBatch returns void', async () => {
consumer = new Consumer({
queueUrl: QUEUE_URL,
Expand Down