Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ack partial of a batch of messages #255

Merged
merged 8 commits into from
Dec 16, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 18 additions & 18 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -106,24 +106,24 @@ Creates a new SQS consumer.

#### Options

| Option | Type | Description |
| ---------------------------- | --------- | --------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| `queueUrl` | String | The SQS queue URL |
| `region` | String | The AWS region (default `eu-west-1`) |
| `handleMessage` | Function | An `async` function (or function that returns a `Promise`) to be called whenever a message is received. Receives an SQS message object as it's first argument. |
| `handleMessageBatch` | Function | An `async` function (or function that returns a `Promise`) to be called whenever a batch of messages is received. Similar to `handleMessage` but will receive the list of messages, not each message individually. **If both are set, `handleMessageBatch` overrides `handleMessage`**. |
| `handleMessageTimeout` | Number | Time in ms to wait for `handleMessage` to process a message before timing out. Emits `timeout_error` on timeout. By default, if `handleMessage` times out, the unprocessed message returns to the end of the queue. |
| `attributeNames` | Array | List of queue attributes to retrieve (i.e. `['All', 'ApproximateFirstReceiveTimestamp', 'ApproximateReceiveCount']`). |
| `messageAttributeNames` | Array | List of message attributes to retrieve (i.e. `['name', 'address']`). |
| `batchSize` | Number | The number of messages to request from SQS when polling (default `1`). This cannot be higher than the [AWS limit of 10](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/quotas-messages.html). |
| `visibilityTimeout` | Number | The duration (in seconds) that the received messages are hidden from subsequent retrieve requests after being retrieved by a ReceiveMessage request. |
| `heartbeatInterval` | Number | The interval (in seconds) between requests to extend the message visibility timeout. On each heartbeat the visibility is extended by adding `visibilityTimeout` to the number of seconds since the start of the handler function. This value must less than `visibilityTimeout`. |
| `terminateVisibilityTimeout` | Boolean | If true, sets the message visibility timeout to 0 after a `processing_error` (defaults to `false`). |
| `waitTimeSeconds` | Number | The duration (in seconds) for which the call will wait for a message to arrive in the queue before returning (defaults to `20`). |
| `authenticationErrorTimeout` | Number | The duration (in milliseconds) to wait before retrying after an authentication error (defaults to `10000`). |
| `pollingWaitTimeMs` | Number | The duration (in milliseconds) to wait before repolling the queue (defaults to `0`). |
| `sqs` | SQSClient | An optional [SQS Client](https://docs.aws.amazon.com/AWSJavaScriptSDK/v3/latest/clients/client-sqs/classes/sqsclient.html) object to use if you need to configure the client manually |
| `shouldDeleteMessages` | Boolean | Default to `true`, if you don't want the package to delete messages from sqs set this to `false` |
| Option | Type | Description |
| ---------------------------- | --------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------ |
| `queueUrl` | String | The SQS queue URL |
| `region` | String | The AWS region (default `eu-west-1`) |
| `handleMessage` | Function | An `async` function (or function that returns a `Promise`) to be called whenever a message is received. Receives an SQS message object as it's first argument. |
| `handleMessageBatch` | Function | An `async` function (or function that returns a `Promise`) to be called whenever a batch of messages is received. Similar to `handleMessage` but will receive the list of messages, not each message individually. **If both are set, `handleMessageBatch` overrides `handleMessage`**. In the case that you need to ack only some of the messages, return an array with the successful messages only. |
| `handleMessageTimeout` | Number | Time in ms to wait for `handleMessage` to process a message before timing out. Emits `timeout_error` on timeout. By default, if `handleMessage` times out, the unprocessed message returns to the end of the queue. |
| `attributeNames` | Array | List of queue attributes to retrieve (i.e. `['All', 'ApproximateFirstReceiveTimestamp', 'ApproximateReceiveCount']`). |
| `messageAttributeNames` | Array | List of message attributes to retrieve (i.e. `['name', 'address']`). |
| `batchSize` | Number | The number of messages to request from SQS when polling (default `1`). This cannot be higher than the [AWS limit of 10](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/quotas-messages.html). |
| `visibilityTimeout` | Number | The duration (in seconds) that the received messages are hidden from subsequent retrieve requests after being retrieved by a ReceiveMessage request. |
| `heartbeatInterval` | Number | The interval (in seconds) between requests to extend the message visibility timeout. On each heartbeat the visibility is extended by adding `visibilityTimeout` to the number of seconds since the start of the handler function. This value must less than `visibilityTimeout`. |
| `terminateVisibilityTimeout` | Boolean | If true, sets the message visibility timeout to 0 after a `processing_error` (defaults to `false`). |
| `waitTimeSeconds` | Number | The duration (in seconds) for which the call will wait for a message to arrive in the queue before returning (defaults to `20`). |
| `authenticationErrorTimeout` | Number | The duration (in milliseconds) to wait before retrying after an authentication error (defaults to `10000`). |
| `pollingWaitTimeMs` | Number | The duration (in milliseconds) to wait before repolling the queue (defaults to `0`). |
| `sqs` | SQSClient | An optional [SQS Client](https://docs.aws.amazon.com/AWSJavaScriptSDK/v3/latest/clients/client-sqs/classes/sqsclient.html) object to use if you need to configure the client manually |
| `shouldDeleteMessages` | Boolean | Default to `true`, if you don't want the package to delete messages from sqs set this to `false` |

### `consumer.start()`

Expand Down
24 changes: 17 additions & 7 deletions src/consumer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ export interface ConsumerOptions {
handleMessageTimeout?: number;
shouldDeleteMessages?: boolean;
handleMessage?(message: Message): Promise<void>;
handleMessageBatch?(messages: Message[]): Promise<void>;
handleMessageBatch?(messages: Message[]): Promise<Message[] | void>;
}

interface Events {
Expand All @@ -128,7 +128,7 @@ interface Events {
export class Consumer extends EventEmitter {
private queueUrl: string;
private handleMessage: (message: Message) => Promise<void>;
private handleMessageBatch: (message: Message[]) => Promise<void>;
private handleMessageBatch: (message: Message[]) => Promise<Message[] | void>;
private handleMessageTimeout: number;
private attributeNames: string[];
private messageAttributeNames: string[];
Expand Down Expand Up @@ -390,9 +390,13 @@ export class Consumer extends EventEmitter {
);
});
}
await this.executeBatchHandler(messages);
await this.deleteMessageBatch(messages);
messages.forEach((message) => {
const ackedMessages = await this.executeBatchHandler(messages);

if (ackedMessages.length > 0) {
await this.deleteMessageBatch(ackedMessages);
}

ackedMessages.forEach((message) => {
this.emit('message_processed', message);
});
} catch (err) {
Expand Down Expand Up @@ -433,9 +437,15 @@ export class Consumer extends EventEmitter {
}
}

private async executeBatchHandler(messages: Message[]): Promise<void> {
private async executeBatchHandler(messages: Message[]): Promise<Message[]> {
try {
await this.handleMessageBatch(messages);
const result = await this.handleMessageBatch(messages);

if (result instanceof Object) {
return result;
}

return messages;
} catch (err) {
err.message = `Unexpected message handler failure: ${err.message}`;
throw err;
Expand Down
77 changes: 77 additions & 0 deletions test/consumer.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -702,6 +702,83 @@ describe('Consumer', () => {
sandbox.assert.callCount(handleMessage, 0);
});

it('does not call deleteMessageBatch if handleMessagesBatch returns an empty array', async () => {
consumer = new Consumer({
queueUrl: 'some-queue-url',
messageAttributeNames: ['attribute-1', 'attribute-2'],
region: 'some-region',
handleMessageBatch: async () => [],
batchSize: 2,
sqs
});

consumer.start();
await pEvent(consumer, 'response_processed');
consumer.stop();

sandbox.assert.neverCalledWithMatch(sqs.send, mockDeleteMessageBatch);
});

it('ack all messages if handleMessageBatch returns void', async () => {
consumer = new Consumer({
queueUrl: 'some-queue-url',
messageAttributeNames: ['attribute-1', 'attribute-2'],
region: 'some-region',
handleMessageBatch: async () => {},
batchSize: 2,
sqs
});

consumer.start();
await pEvent(consumer, 'response_processed');
consumer.stop();

sandbox.assert.callCount(sqs.send, 2);
sandbox.assert.calledWithMatch(sqs.send.firstCall, mockReceiveMessage);
sandbox.assert.calledWithMatch(
sqs.send.secondCall,
mockDeleteMessageBatch
);
sandbox.assert.match(
sqs.send.secondCall.args[0].input,
sinon.match({
QueueUrl: QUEUE_URL,
Entries: [{ Id: '123', ReceiptHandle: 'receipt-handle' }]
})
);
});

it('ack only returned messages if handleMessagesBatch returns an array', async () => {
consumer = new Consumer({
queueUrl: 'some-queue-url',
messageAttributeNames: ['attribute-1', 'attribute-2'],
region: 'some-region',
handleMessageBatch: async () => [
{ MessageId: '123', ReceiptHandle: 'receipt-handle' }
],
batchSize: 2,
sqs
});

consumer.start();
await pEvent(consumer, 'response_processed');
consumer.stop();

sandbox.assert.callCount(sqs.send, 2);
sandbox.assert.calledWithMatch(sqs.send.firstCall, mockReceiveMessage);
sandbox.assert.calledWithMatch(
sqs.send.secondCall,
mockDeleteMessageBatch
);
sandbox.assert.match(
sqs.send.secondCall.args[0].input,
sinon.match({
QueueUrl: QUEUE_URL,
Entries: [{ Id: '123', ReceiptHandle: 'receipt-handle' }]
})
);
});

it('uses the correct visibility timeout for long running handler functions', async () => {
consumer = new Consumer({
queueUrl: QUEUE_URL,
Expand Down