Skip to content

Commit

Permalink
feat: adding a flag to force consumer to always acknowledge
Browse files Browse the repository at this point in the history
  • Loading branch information
nicholasgriffintn committed Nov 24, 2023
1 parent 83313e0 commit 02ef12e
Show file tree
Hide file tree
Showing 4 changed files with 73 additions and 3 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ app.start();
- By default, messages that are sent to the `handleMessage` and `handleMessageBatch` functions will be considered as processed if they return without an error.
- To acknowledge individual messages, please return the message that you want to acknowledge if you are using `handleMessage` or the messages for `handleMessageBatch`.
- To note, returning an empty object or an empty array will be considered an acknowledgement of no message(s) and will result in no messages being deleted.
- By default, if an object or an array is not returned, all messages will be acknowledged.
- By default, if an object or an array is not returned, all messages will be acknowledged. If you would like to change this behaviour, please use the `alwaysAcknowledge` option [detailed below](#options).
- Messages are deleted from the queue once the handler function has completed successfully (the above items should also be taken into account).
### Credentials
Expand Down
10 changes: 8 additions & 2 deletions src/consumer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ export class Consumer extends TypedEventEmitter {
private attributeNames: QueueAttributeName[];
private messageAttributeNames: string[];
private shouldDeleteMessages: boolean;
private alwaysAcknowledge: boolean;
private batchSize: number;
private visibilityTimeout: number;
private terminateVisibilityTimeout: boolean;
Expand Down Expand Up @@ -77,6 +78,7 @@ export class Consumer extends TypedEventEmitter {
options.authenticationErrorTimeout ?? 10000;
this.pollingWaitTimeMs = options.pollingWaitTimeMs ?? 0;
this.shouldDeleteMessages = options.shouldDeleteMessages ?? true;
this.alwaysAcknowledge = options.alwaysAcknowledge ?? false;
this.sqs =
options.sqs ||
new SQSClient({
Expand Down Expand Up @@ -453,7 +455,9 @@ export class Consumer extends TypedEventEmitter {
result = await this.handleMessage(message);
}

return result instanceof Object ? result : message;
return !this.alwaysAcknowledge && result instanceof Object
? result
: message;
} catch (err) {
if (err instanceof TimeoutError) {
throw toTimeoutError(
Expand Down Expand Up @@ -482,7 +486,9 @@ export class Consumer extends TypedEventEmitter {
try {
const result = await this.handleMessageBatch(messages);

return result instanceof Object ? result : messages;
return !this.alwaysAcknowledge && result instanceof Object
? result
: messages;
} catch (err) {
if (err instanceof Error) {
throw toStandardError(
Expand Down
8 changes: 8 additions & 0 deletions src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,14 @@ export interface ConsumerOptions {
* @defaultvalue `true`
*/
shouldDeleteMessages?: boolean;
/**
* By default, the consumer will treat an empty object or array from either of the
* handlers as a acknowledgement of no messages and will not delete those messages as
* a result. Set this to `true` to always acknowledge all messages no matter the returned
* value.
* @defaultvalue `false`
*/
alwaysAcknowledge?: boolean;
/**
* An `async` function (or function that returns a `Promise`) to be called whenever
* a message is received.
Expand Down
56 changes: 56 additions & 0 deletions test/tests/consumer.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1047,6 +1047,33 @@ describe('Consumer', () => {
sandbox.assert.neverCalledWithMatch(sqs.send, mockDeleteMessage);
});

it('deletes the message if alwaysAcknowledge is `true` and handleMessage returns an empty object', async () => {
consumer = new Consumer({
queueUrl: QUEUE_URL,
region: REGION,
handleMessage: async () => {
return {};
},
sqs,
alwaysAcknowledge: true
});

consumer.start();
await pEvent(consumer, 'response_processed');
consumer.stop();

sandbox.assert.callCount(sqs.send, 2);
sandbox.assert.calledWithMatch(sqs.send.firstCall, mockReceiveMessage);
sandbox.assert.calledWithMatch(sqs.send.secondCall, mockDeleteMessage);
sandbox.assert.match(
sqs.send.secondCall.args[0].input,
sinon.match({
QueueUrl: QUEUE_URL,
ReceiptHandle: 'receipt-handle'
})
);
});

it('does not call deleteMessageBatch if handleMessagesBatch returns an empty array', async () => {
consumer = new Consumer({
queueUrl: QUEUE_URL,
Expand All @@ -1064,6 +1091,35 @@ describe('Consumer', () => {
sandbox.assert.neverCalledWithMatch(sqs.send, mockDeleteMessageBatch);
});

it('calls deleteMessageBatch if alwaysAcknowledge is `true` and handleMessagesBatch returns an empty array', async () => {
consumer = new Consumer({
queueUrl: QUEUE_URL,
region: REGION,
handleMessageBatch: async () => [],
batchSize: 2,
sqs,
alwaysAcknowledge: true
});

consumer.start();
await pEvent(consumer, 'response_processed');
consumer.stop();

sandbox.assert.callCount(sqs.send, 2);
sandbox.assert.calledWithMatch(sqs.send.firstCall, mockReceiveMessage);
sandbox.assert.calledWithMatch(
sqs.send.secondCall,
mockDeleteMessageBatch
);
sandbox.assert.match(
sqs.send.secondCall.args[0].input,
sinon.match({
QueueUrl: QUEUE_URL,
Entries: [{ Id: '123', ReceiptHandle: 'receipt-handle' }]
})
);
});

it('ack all messages if handleMessageBatch returns void', async () => {
consumer = new Consumer({
queueUrl: QUEUE_URL,
Expand Down

0 comments on commit 02ef12e

Please sign in to comment.