Skip to content

Commit

Permalink
fix: Await final poll for graceful shutdown
Browse files Browse the repository at this point in the history
  • Loading branch information
barwin committed Jan 24, 2024
1 parent 5ee2fc2 commit a1a1c76
Show file tree
Hide file tree
Showing 7 changed files with 105 additions and 28 deletions.
8 changes: 6 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -125,9 +125,13 @@ By default, the value of `abort` is set to `false` which means pre existing requ
`consumer.stop({ abort: true })`
Optionally, you can wait for any in flight messages to complete before stopping the consumer by passing `waitForInFlightMessages: true` in the options object.
Optionally, you can wait for in flight messages to complete before stopping the consumer
by passing a timeout `waitForInFlightMessagesMs` in the options object.
`consumer.stop({ waitForInFlightMessages: true, waitTimeMs: 30000 });`
This will allow the final in-progress `poll` to complete as well as any messages received during that poll.
Be sure to account for the polling `waitTimeSeconds` plus any message processing time you want to allow.
`consumer.stop({ waitForInFlightMessagesMs: 30000 });`
### `consumer.isRunning`
Expand Down
26 changes: 14 additions & 12 deletions src/consumer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ export class Consumer extends TypedEventEmitter {
private authenticationErrorTimeout: number;
private pollingWaitTimeMs: number;
private heartbeatInterval: number;
private areMessagesInFlight: boolean;
private isPolling: boolean;
public abortController: AbortController;

constructor(options: ConsumerOptions) {
Expand Down Expand Up @@ -143,12 +143,12 @@ export class Consumer extends TypedEventEmitter {
this.emit('aborted');
}

if (options?.waitForInFlightMessages) {
this.waitForInFlightMessagesToComplete(options?.waitTimeoutMs || 0).then(
() => {
this.emit('stopped');
}
);
if (options?.waitForInFlightMessagesMs > 0) {
this.waitForInFlightMessagesToComplete(
options.waitForInFlightMessagesMs
).then(() => {
this.emit('stopped');
});
} else {
this.emit('stopped');
}
Expand All @@ -163,8 +163,8 @@ export class Consumer extends TypedEventEmitter {
waitTimeoutMs: number
): Promise<void> {
const startedAt = Date.now();
while (this.areMessagesInFlight) {
if (waitTimeoutMs && Date.now() - startedAt > waitTimeoutMs) {
while (this.isPolling) {
if (Date.now() - startedAt > waitTimeoutMs) {
logger.debug(
'waiting_for_in_flight_messages_to_complete_wait_timeout_exceeded'
);
Expand Down Expand Up @@ -228,6 +228,8 @@ export class Consumer extends TypedEventEmitter {

logger.debug('polling');

this.isPolling = true;

let currentPollingTimeout = this.pollingWaitTimeMs;
this.receiveMessage({
QueueUrl: this.queueUrl,
Expand Down Expand Up @@ -257,6 +259,9 @@ export class Consumer extends TypedEventEmitter {
})
.catch((err) => {
this.emitError(err);
})
.finally(() => {
this.isPolling = false;
});
}

Expand Down Expand Up @@ -300,8 +305,6 @@ export class Consumer extends TypedEventEmitter {
});
}, 1000);

this.areMessagesInFlight = true;

if (this.handleMessageBatch) {
await this.processMessageBatch(response.Messages);
} else {
Expand All @@ -311,7 +314,6 @@ export class Consumer extends TypedEventEmitter {
clearInterval(handlerProcessingDebugger);

this.emit('response_processed');
this.areMessagesInFlight = false;
} else if (response) {
this.emit('empty');
}
Expand Down
15 changes: 4 additions & 11 deletions src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -143,18 +143,11 @@ export interface StopOptions {
abort?: boolean;

/**
* Default to `false`, if you want the stop action to wait for in-flight messages
* to be processed before emitting 'stopped' set this to `true`.
* @defaultvalue `false`
*/
waitForInFlightMessages?: boolean;

/**
* if `waitForInFlightMessages` is set to `true`, this option will be used to
* determine how long to wait for in-flight messages to be processed before
* emitting 'stopped'.
* If you want the stop action to wait for the final poll to complete and in-flight messages
* to be processed before emitting 'stopped' set this to the max amount of time to wait.
* @defaultvalue `undefined`
*/
waitTimeoutMs?: number;
waitForInFlightMessagesMs?: number;
}

export interface Events {
Expand Down
6 changes: 6 additions & 0 deletions test/features/gracefulShutdown.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
Feature: Graceful shutdown

Scenario: Several messages in flight
Given Several messages are sent to the SQS queue
Then the application is stopped while messages are in flight
Then the in-flight messages should be processed before stopped is emitted
56 changes: 56 additions & 0 deletions test/features/step_definitions/gracefulShutdown.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
const { Given, Then, After } = require('@cucumber/cucumber');
const assert = require('assert');
const { PurgeQueueCommand } = require('@aws-sdk/client-sqs');
const pEvent = require('p-event');

const { consumer } = require('../utils/consumer/gracefulShutdown');
const { producer } = require('../utils/producer');
const { sqs, QUEUE_URL } = require('../utils/sqs');

Given('Several messages are sent to the SQS queue', async () => {
const params = {
QueueUrl: QUEUE_URL
};
const command = new PurgeQueueCommand(params);
const response = await sqs.send(command);

assert.strictEqual(response['$metadata'].httpStatusCode, 200);

const size = await producer.queueSize();
assert.strictEqual(size, 0);

await producer.send(['msg1', 'msg2', 'msg3']);

const size2 = await producer.queueSize();

assert.strictEqual(size2, 3);
});

Then('the application is stopped while messages are in flight', async () => {
consumer.start();

consumer.stop({ waitForInFlightMessagesMs: 5000 });

assert.strictEqual(consumer.isRunning, false);
});

Then(
'the in-flight messages should be processed before stopped is emitted',
async () => {
let numProcessed = 0;
consumer.on('message_processed', () => {
numProcessed++;
});

await pEvent(consumer, 'stopped');

assert.strictEqual(numProcessed, 3);

const size = await producer.queueSize();
assert.strictEqual(size, 0);
}
);

After(() => {
return consumer.stop();
});
16 changes: 16 additions & 0 deletions test/features/utils/consumer/gracefulShutdown.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
const { Consumer } = require('../../../../dist/consumer');

const { QUEUE_URL, sqs } = require('../sqs');

const consumer = Consumer.create({
queueUrl: QUEUE_URL,
sqs,
pollingWaitTimeMs: 1000,
batchSize: 10,
handleMessage: async (message) => {
await new Promise((resolve) => setTimeout(resolve, 100));
return message;
}
});

exports.consumer = consumer;
6 changes: 3 additions & 3 deletions test/tests/consumer.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1482,7 +1482,7 @@ describe('Consumer', () => {
sandbox.assert.calledOnce(handleStop);
});

it('waits for in-flight messages before emitting stopped (no timeout)', async () => {
it('waits for in-flight messages before emitting stopped (within timeout)', async () => {
const handleStop = sandbox.stub().returns(null);
const handleResponseProcessed = sandbox.stub().returns(null);

Expand All @@ -1504,7 +1504,7 @@ describe('Consumer', () => {

consumer.start();
await clock.nextAsync();
consumer.stop({ waitForInFlightMessages: true });
consumer.stop({ waitForInFlightMessagesMs: 5000 });

await clock.runAllAsync();

Expand Down Expand Up @@ -1540,7 +1540,7 @@ describe('Consumer', () => {

consumer.start();
await clock.nextAsync();
consumer.stop({ waitForInFlightMessages: true, waitTimeoutMs: 500 });
consumer.stop({ waitForInFlightMessagesMs: 500 });

await clock.runAllAsync();

Expand Down

0 comments on commit a1a1c76

Please sign in to comment.