Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add support for processing more than 10 messages concurrently #271

Closed
wants to merge 9 commits into from
4 changes: 4 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,10 @@ Creates a new SQS consumer.
- `attributeNames` - _Array_ - List of queue attributes to retrieve (i.e. `['All', 'ApproximateFirstReceiveTimestamp', 'ApproximateReceiveCount']`).
- `messageAttributeNames` - _Array_ - List of message attributes to retrieve (i.e. `['name', 'address']`).
- `batchSize` - _Number_ - The number of messages to request from SQS when polling (default `1`). This cannot be higher than the AWS limit of 10.

* `concurrency` - _Number_ - The number of messages (or batches if `handleMessageBatch` is set) to process concurrently. This **can** be higher than the `batchSize` limit.
* `bufferMessages` - _Boolean_ - When enabled, maintains a buffer (up to the `batchSize`) of messages in an internal queue. When this option is enabled, the consumer does not need to wait for an entire batch of messages to be processed before moving on to the next one. Enabled by default when `concurrency` is set.

- `visibilityTimeout` - _Number_ - The duration (in seconds) that the received messages are hidden from subsequent retrieve requests after being retrieved by a ReceiveMessage request.
- `heartbeatInterval` - _Number_ - The interval (in seconds) between requests to extend the message visibility timeout. On each heartbeat the visibility is extended by adding `visibilityTimeout` to the number of seconds since the start of the handler function. This value must less than `visibilityTimeout`.
- `terminateVisibilityTimeout` - _Boolean_ - If true, sets the message visibility timeout to 0 after a `processing_error` (defaults to `false`).
Expand Down
13 changes: 5 additions & 8 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,8 @@
},
"dependencies": {
"aws-sdk": "^2.1271.0",
"debug": "^4.3.4"
"debug": "^4.3.4",
"fastq": "^1.14.0"
},
"peerDependencies": {
"aws-sdk": "^2.1271.0"
Expand Down
74 changes: 68 additions & 6 deletions src/consumer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import Debug from 'debug';
import { EventEmitter } from 'events';
import { autoBind } from './bind';
import { SQSError, TimeoutError } from './errors';
import fastq from 'fastq';

const debug = Debug('sqs-consumer');

Expand Down Expand Up @@ -90,6 +91,8 @@ export interface ConsumerOptions {
messageAttributeNames?: string[];
stopped?: boolean;
batchSize?: number;
concurrency?: number;
bufferMessages?: boolean;
visibilityTimeout?: number;
waitTimeSeconds?: number;
authenticationErrorTimeout?: number;
Expand All @@ -115,6 +118,13 @@ interface Events {
stopped: [];
}

enum POLLING_STATUS {
ACTIVE,
WAITING,
INACTIVE,
READY
}

export class Consumer extends EventEmitter {
private queueUrl: string;
private handleMessage: (message: SQSMessage) => Promise<void>;
Expand All @@ -124,13 +134,17 @@ export class Consumer extends EventEmitter {
private messageAttributeNames: string[];
private stopped: boolean;
private batchSize: number;
private concurrency: number;
private bufferMessages: boolean;
private visibilityTimeout: number;
private waitTimeSeconds: number;
private authenticationErrorTimeout: number;
private pollingWaitTimeMs: number;
private terminateVisibilityTimeout: boolean;
private heartbeatInterval: number;
private sqs: SQS;
private workQueue: fastq.queueAsPromised;
private pollingStatus: POLLING_STATUS;
private shouldDeleteMessages: boolean;

constructor(options: ConsumerOptions) {
Expand All @@ -144,15 +158,22 @@ export class Consumer extends EventEmitter {
this.messageAttributeNames = options.messageAttributeNames || [];
this.stopped = true;
this.batchSize = options.batchSize || 1;
this.visibilityTimeout = options.visibilityTimeout;
this.concurrency =
options.concurrency || (this.handleMessageBatch ? 1 : this.batchSize);
(this.bufferMessages = options.bufferMessages ?? !!options.concurrency),
(this.visibilityTimeout = options.visibilityTimeout);
this.terminateVisibilityTimeout =
options.terminateVisibilityTimeout || false;
this.heartbeatInterval = options.heartbeatInterval;
this.waitTimeSeconds = options.waitTimeSeconds ?? 20;
this.authenticationErrorTimeout =
options.authenticationErrorTimeout ?? 10000;
this.pollingWaitTimeMs = options.pollingWaitTimeMs ?? 0;
this.pollingStatus = POLLING_STATUS.INACTIVE;
this.shouldDeleteMessages = options.shouldDeleteMessages ?? true;
this.workQueue = this.handleMessageBatch
? fastq.promise(this.executeBatchHandler.bind(this), this.concurrency)
: fastq.promise(this.executeHandler.bind(this), this.concurrency);

this.sqs =
options.sqs ||
Expand Down Expand Up @@ -233,7 +254,9 @@ export class Consumer extends EventEmitter {
return this.changeVisibilityTimeout(message, this.visibilityTimeout);
});
}
await this.executeHandler(message);
debug('pushed');
await this.workQueue.push(message);
debug('done');
await this.deleteMessage(message);
this.emit('message_processed', message);
} catch (err) {
Expand All @@ -244,6 +267,7 @@ export class Consumer extends EventEmitter {
}
} finally {
clearInterval(heartbeat);
this.queuePoll();
}
}

Expand Down Expand Up @@ -333,10 +357,35 @@ export class Consumer extends EventEmitter {

private poll(): void {
if (this.stopped) {
this.pollingStatus === POLLING_STATUS.INACTIVE;
this.emit('stopped');
return;
}

if (this.pollingStatus === POLLING_STATUS.ACTIVE) {
debug('sqs polling already in progress');
return;
}

if (!this.bufferMessages && (this.workQueue as any).running() > 0) {
debug('work queue is not yet empty. not polling');
this.pollingStatus = POLLING_STATUS.READY;
return;
}

if (this.workQueue.length() > 0) {
debug('unstarted work in queue. not polling');
this.pollingStatus = POLLING_STATUS.READY;
return;
}

if ((this.workQueue as any).running() >= this.concurrency) {
debug('work queue at capacity, no need to poll');
this.pollingStatus = POLLING_STATUS.READY;
return;
}

this.pollingStatus = POLLING_STATUS.ACTIVE;
debug('Polling for messages');
const receiveParams = {
QueueUrl: this.queueUrl,
Expand All @@ -349,7 +398,6 @@ export class Consumer extends EventEmitter {

let currentPollingTimeout = this.pollingWaitTimeMs;
this.receiveMessage(receiveParams)
.then(this.handleSqsResponse)
.catch((err) => {
this.emit('error', err);
if (isConnectionError(err)) {
Expand All @@ -358,14 +406,27 @@ export class Consumer extends EventEmitter {
}
return;
})
.then(() => {
setTimeout(this.poll, currentPollingTimeout);
.then((message) => {
this.queuePoll(currentPollingTimeout);
if (message) return this.handleSqsResponse(message);
})
.catch((err) => {
this.emit('error', err);
})
.finally(() => {
if (this.pollingStatus === POLLING_STATUS.ACTIVE) {
this.pollingStatus = POLLING_STATUS.INACTIVE;
}
});
}

private queuePoll(timeout?: number) {
if (this.pollingStatus !== POLLING_STATUS.WAITING) {
this.pollingStatus = POLLING_STATUS.WAITING;
setTimeout(this.poll, timeout ?? this.pollingWaitTimeMs);
}
}

private async processMessageBatch(messages: SQSMessage[]): Promise<void> {
messages.forEach((message) => {
this.emit('message_received', message);
Expand All @@ -381,7 +442,7 @@ export class Consumer extends EventEmitter {
);
});
}
await this.executeBatchHandler(messages);
await this.workQueue.push(messages);
await this.deleteMessageBatch(messages);
messages.forEach((message) => {
this.emit('message_processed', message);
Expand All @@ -394,6 +455,7 @@ export class Consumer extends EventEmitter {
}
} finally {
clearInterval(heartbeat);
this.queuePoll();
}
}

Expand Down
36 changes: 28 additions & 8 deletions test/consumer.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -352,7 +352,7 @@ describe('Consumer', () => {
handleMessage,
sqs,
authenticationErrorTimeout: 20,
pollingWaitTimeMs: 100
pollingWaitTimeMs: POLLING_TIMEOUT
});

consumer.start();
Expand Down Expand Up @@ -712,15 +712,34 @@ describe('Consumer', () => {
{ Id: '3', ReceiptHandle: 'receipt-handle-3', VisibilityTimeout: 40 }
]
});
sandbox.assert.calledWith(sqs.changeMessageVisibilityBatch, {
QueueUrl: 'some-queue-url',
Entries: [
{ Id: '1', ReceiptHandle: 'receipt-handle-1', VisibilityTimeout: 40 },
{ Id: '2', ReceiptHandle: 'receipt-handle-2', VisibilityTimeout: 40 },
{ Id: '3', ReceiptHandle: 'receipt-handle-3', VisibilityTimeout: 40 }
sandbox.assert.calledOnce(clearIntervalSpy);
});

it('can process more messages than the batch limit', async () => {
sqs.receiveMessage = stubResolve({
Messages: [
{ MessageId: '1', ReceiptHandle: 'receipt-handle-1', Body: 'body-1' },
{ MessageId: '2', ReceiptHandle: 'receipt-handle-2', Body: 'body-2' },
{ MessageId: '3', ReceiptHandle: 'receipt-handle-3', Body: 'body-3' }
]
});
sandbox.assert.calledOnce(clearIntervalSpy);
(handleMessage = sinon
.stub()
.callsFake(() => new Promise((resolve) => setTimeout(resolve, 100)))),
(consumer = new Consumer({
queueUrl: 'some-queue-url',
region: 'some-region',
handleMessage,
batchSize: 3,
concurrency: 6,
sqs
}));

consumer.start();
await clock.tickAsync(100);
consumer.stop();

sandbox.assert.callCount(handleMessage, 6);
});

it('emit error when changing visibility timeout fails', async () => {
Expand Down Expand Up @@ -823,6 +842,7 @@ describe('Consumer', () => {

consumer.start();
consumer.stop();
await clock.runAllAsync();
consumer.start();
consumer.stop();
await clock.runAllAsync();
Expand Down
3 changes: 2 additions & 1 deletion tsconfig.json
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@
"moduleResolution": "node",
"allowJs": false,
"noUnusedLocals": true,
"declaration": true
"declaration": true,
"esModuleInterop": true
},
"include": ["src/**/*"],
"exclude": ["node_modules", "dist"]
Expand Down