From a1a1c761efc2ee3740c85c5cb652a63b94840af8 Mon Sep 17 00:00:00 2001 From: Ben Arwin Date: Wed, 24 Jan 2024 10:47:13 -0800 Subject: [PATCH] fix: Await final poll for graceful shutdown --- README.md | 8 ++- src/consumer.ts | 26 +++++---- src/types.ts | 15 ++--- test/features/gracefulShutdown.feature | 6 ++ .../step_definitions/gracefulShutdown.js | 56 +++++++++++++++++++ .../utils/consumer/gracefulShutdown.js | 16 ++++++ test/tests/consumer.test.ts | 6 +- 7 files changed, 105 insertions(+), 28 deletions(-) create mode 100644 test/features/gracefulShutdown.feature create mode 100644 test/features/step_definitions/gracefulShutdown.js create mode 100644 test/features/utils/consumer/gracefulShutdown.js diff --git a/README.md b/README.md index 7d29a29d..7c6a5009 100644 --- a/README.md +++ b/README.md @@ -125,9 +125,13 @@ By default, the value of `abort` is set to `false` which means pre existing requ `consumer.stop({ abort: true })` -Optionally, you can wait for any in flight messages to complete before stopping the consumer by passing `waitForInFlightMessages: true` in the options object. +Optionally, you can wait for in flight messages to complete before stopping the consumer +by passing a timeout `waitForInFlightMessagesMs` in the options object. -`consumer.stop({ waitForInFlightMessages: true, waitTimeMs: 30000 });` +This will allow the final in-progress `poll` to complete as well as any messages received during that poll. +Be sure to account for the polling `waitTimeSeconds` plus any message processing time you want to allow. + +`consumer.stop({ waitForInFlightMessagesMs: 30000 });` ### `consumer.isRunning` diff --git a/src/consumer.ts b/src/consumer.ts index 11e081cb..31ad1220 100644 --- a/src/consumer.ts +++ b/src/consumer.ts @@ -55,7 +55,7 @@ export class Consumer extends TypedEventEmitter { private authenticationErrorTimeout: number; private pollingWaitTimeMs: number; private heartbeatInterval: number; - private areMessagesInFlight: boolean; + private isPolling: boolean; public abortController: AbortController; constructor(options: ConsumerOptions) { @@ -143,12 +143,12 @@ export class Consumer extends TypedEventEmitter { this.emit('aborted'); } - if (options?.waitForInFlightMessages) { - this.waitForInFlightMessagesToComplete(options?.waitTimeoutMs || 0).then( - () => { - this.emit('stopped'); - } - ); + if (options?.waitForInFlightMessagesMs > 0) { + this.waitForInFlightMessagesToComplete( + options.waitForInFlightMessagesMs + ).then(() => { + this.emit('stopped'); + }); } else { this.emit('stopped'); } @@ -163,8 +163,8 @@ export class Consumer extends TypedEventEmitter { waitTimeoutMs: number ): Promise { const startedAt = Date.now(); - while (this.areMessagesInFlight) { - if (waitTimeoutMs && Date.now() - startedAt > waitTimeoutMs) { + while (this.isPolling) { + if (Date.now() - startedAt > waitTimeoutMs) { logger.debug( 'waiting_for_in_flight_messages_to_complete_wait_timeout_exceeded' ); @@ -228,6 +228,8 @@ export class Consumer extends TypedEventEmitter { logger.debug('polling'); + this.isPolling = true; + let currentPollingTimeout = this.pollingWaitTimeMs; this.receiveMessage({ QueueUrl: this.queueUrl, @@ -257,6 +259,9 @@ export class Consumer extends TypedEventEmitter { }) .catch((err) => { this.emitError(err); + }) + .finally(() => { + this.isPolling = false; }); } @@ -300,8 +305,6 @@ export class Consumer extends TypedEventEmitter { }); }, 1000); - this.areMessagesInFlight = true; - if (this.handleMessageBatch) { await this.processMessageBatch(response.Messages); } else { @@ -311,7 +314,6 @@ export class Consumer extends TypedEventEmitter { clearInterval(handlerProcessingDebugger); this.emit('response_processed'); - this.areMessagesInFlight = false; } else if (response) { this.emit('empty'); } diff --git a/src/types.ts b/src/types.ts index a73df349..f388ad5f 100644 --- a/src/types.ts +++ b/src/types.ts @@ -143,18 +143,11 @@ export interface StopOptions { abort?: boolean; /** - * Default to `false`, if you want the stop action to wait for in-flight messages - * to be processed before emitting 'stopped' set this to `true`. - * @defaultvalue `false` - */ - waitForInFlightMessages?: boolean; - - /** - * if `waitForInFlightMessages` is set to `true`, this option will be used to - * determine how long to wait for in-flight messages to be processed before - * emitting 'stopped'. + * If you want the stop action to wait for the final poll to complete and in-flight messages + * to be processed before emitting 'stopped' set this to the max amount of time to wait. + * @defaultvalue `undefined` */ - waitTimeoutMs?: number; + waitForInFlightMessagesMs?: number; } export interface Events { diff --git a/test/features/gracefulShutdown.feature b/test/features/gracefulShutdown.feature new file mode 100644 index 00000000..8cb4e61a --- /dev/null +++ b/test/features/gracefulShutdown.feature @@ -0,0 +1,6 @@ +Feature: Graceful shutdown + + Scenario: Several messages in flight + Given Several messages are sent to the SQS queue + Then the application is stopped while messages are in flight + Then the in-flight messages should be processed before stopped is emitted \ No newline at end of file diff --git a/test/features/step_definitions/gracefulShutdown.js b/test/features/step_definitions/gracefulShutdown.js new file mode 100644 index 00000000..b31741de --- /dev/null +++ b/test/features/step_definitions/gracefulShutdown.js @@ -0,0 +1,56 @@ +const { Given, Then, After } = require('@cucumber/cucumber'); +const assert = require('assert'); +const { PurgeQueueCommand } = require('@aws-sdk/client-sqs'); +const pEvent = require('p-event'); + +const { consumer } = require('../utils/consumer/gracefulShutdown'); +const { producer } = require('../utils/producer'); +const { sqs, QUEUE_URL } = require('../utils/sqs'); + +Given('Several messages are sent to the SQS queue', async () => { + const params = { + QueueUrl: QUEUE_URL + }; + const command = new PurgeQueueCommand(params); + const response = await sqs.send(command); + + assert.strictEqual(response['$metadata'].httpStatusCode, 200); + + const size = await producer.queueSize(); + assert.strictEqual(size, 0); + + await producer.send(['msg1', 'msg2', 'msg3']); + + const size2 = await producer.queueSize(); + + assert.strictEqual(size2, 3); +}); + +Then('the application is stopped while messages are in flight', async () => { + consumer.start(); + + consumer.stop({ waitForInFlightMessagesMs: 5000 }); + + assert.strictEqual(consumer.isRunning, false); +}); + +Then( + 'the in-flight messages should be processed before stopped is emitted', + async () => { + let numProcessed = 0; + consumer.on('message_processed', () => { + numProcessed++; + }); + + await pEvent(consumer, 'stopped'); + + assert.strictEqual(numProcessed, 3); + + const size = await producer.queueSize(); + assert.strictEqual(size, 0); + } +); + +After(() => { + return consumer.stop(); +}); diff --git a/test/features/utils/consumer/gracefulShutdown.js b/test/features/utils/consumer/gracefulShutdown.js new file mode 100644 index 00000000..87883386 --- /dev/null +++ b/test/features/utils/consumer/gracefulShutdown.js @@ -0,0 +1,16 @@ +const { Consumer } = require('../../../../dist/consumer'); + +const { QUEUE_URL, sqs } = require('../sqs'); + +const consumer = Consumer.create({ + queueUrl: QUEUE_URL, + sqs, + pollingWaitTimeMs: 1000, + batchSize: 10, + handleMessage: async (message) => { + await new Promise((resolve) => setTimeout(resolve, 100)); + return message; + } +}); + +exports.consumer = consumer; diff --git a/test/tests/consumer.test.ts b/test/tests/consumer.test.ts index 0865c236..8d79a0b2 100644 --- a/test/tests/consumer.test.ts +++ b/test/tests/consumer.test.ts @@ -1482,7 +1482,7 @@ describe('Consumer', () => { sandbox.assert.calledOnce(handleStop); }); - it('waits for in-flight messages before emitting stopped (no timeout)', async () => { + it('waits for in-flight messages before emitting stopped (within timeout)', async () => { const handleStop = sandbox.stub().returns(null); const handleResponseProcessed = sandbox.stub().returns(null); @@ -1504,7 +1504,7 @@ describe('Consumer', () => { consumer.start(); await clock.nextAsync(); - consumer.stop({ waitForInFlightMessages: true }); + consumer.stop({ waitForInFlightMessagesMs: 5000 }); await clock.runAllAsync(); @@ -1540,7 +1540,7 @@ describe('Consumer', () => { consumer.start(); await clock.nextAsync(); - consumer.stop({ waitForInFlightMessages: true, waitTimeoutMs: 500 }); + consumer.stop({ waitForInFlightMessagesMs: 500 }); await clock.runAllAsync();