From e120eebb025b2dcaf77050a2075442fea12444d9 Mon Sep 17 00:00:00 2001 From: Adam Rensel <118287406+adamrensel-artera@users.noreply.github.com> Date: Mon, 16 Oct 2023 09:34:26 -0400 Subject: [PATCH] Allow consumers stopped with abort: true to be restarted (#429) * Create a new AbortController each time the consumer starts * Return a new un aborted signal if the local abort controller doesn't exist * Better comment --- src/consumer.ts | 24 +++++++++++++++--------- src/controllers.ts | 1 - test/tests/consumer.test.ts | 30 +++++++++++++++++++++++++++--- 3 files changed, 42 insertions(+), 13 deletions(-) delete mode 100644 src/controllers.ts diff --git a/src/consumer.ts b/src/consumer.ts index 02c589bd..e54eec15 100644 --- a/src/consumer.ts +++ b/src/consumer.ts @@ -26,7 +26,6 @@ import { isConnectionError } from './errors'; import { validateOption, assertOptions, hasMessages } from './validation'; -import { abortController } from './controllers'; import { logger } from './logger'; /** @@ -52,6 +51,7 @@ export class Consumer extends TypedEventEmitter { private authenticationErrorTimeout: number; private pollingWaitTimeMs: number; private heartbeatInterval: number; + public abortController: AbortController; constructor(options: ConsumerOptions) { super(); @@ -94,6 +94,8 @@ export class Consumer extends TypedEventEmitter { */ public start(): void { if (this.stopped) { + // Create a new abort controller each time the consumer is started + this.abortController = new AbortController(); logger.debug('starting'); this.stopped = false; this.emit('started'); @@ -101,6 +103,17 @@ export class Consumer extends TypedEventEmitter { } } + /** + * A reusable options object for sqs.send that's used to avoid duplication. + */ + private get sqsSendOptions(): { abortSignal: AbortSignal } { + return { + // return the current abortController signal or a fresh signal that has not been aborted. + // This effectively defaults the signal sent to the AWS SDK to not aborted + abortSignal: this.abortController?.signal || new AbortController().signal + }; + } + /** * Stop polling the queue for messages (pre existing requests will still be made until concluded). */ @@ -120,7 +133,7 @@ export class Consumer extends TypedEventEmitter { if (options?.abort) { logger.debug('aborting'); - abortController.abort(); + this.abortController.abort(); this.emit('aborted'); } @@ -167,13 +180,6 @@ export class Consumer extends TypedEventEmitter { } } - /** - * A reusable options object for sqs.send that's used to avoid duplication. - */ - private sqsSendOptions = { - abortSignal: abortController.signal - }; - /** * Poll for new messages from SQS */ diff --git a/src/controllers.ts b/src/controllers.ts deleted file mode 100644 index 9417ee67..00000000 --- a/src/controllers.ts +++ /dev/null @@ -1 +0,0 @@ -export const abortController = new AbortController(); diff --git a/test/tests/consumer.test.ts b/test/tests/consumer.test.ts index 8f7e0fdc..6a5eefa0 100644 --- a/test/tests/consumer.test.ts +++ b/test/tests/consumer.test.ts @@ -12,7 +12,6 @@ import * as pEvent from 'p-event'; import { AWSError } from '../../src/types'; import { Consumer } from '../../src/consumer'; -import { abortController } from '../../src/controllers'; import { logger } from '../../src/logger'; const sandbox = sinon.createSandbox(); @@ -168,6 +167,32 @@ describe('Consumer', () => { }); describe('.start', () => { + it('uses the correct abort signal', async () => { + sqs.send + .withArgs(mockReceiveMessage) + .resolves(new Promise((res) => setTimeout(res, 100))); + + // Starts and abort is false + consumer.start(); + assert.isFalse(sqs.send.lastCall.lastArg.abortSignal.aborted); + + // normal stop without an abort and abort is false + consumer.stop(); + assert.isFalse(sqs.send.lastCall.lastArg.abortSignal.aborted); + + // Starts and abort is false + consumer.start(); + assert.isFalse(sqs.send.lastCall.lastArg.abortSignal.aborted); + + // Stop with abort and abort is true + consumer.stop({ abort: true }); + assert.isTrue(sqs.send.lastCall.lastArg.abortSignal.aborted); + + // Starts and abort is false + consumer.start(); + assert.isFalse(sqs.send.lastCall.lastArg.abortSignal.aborted); + }); + it('fires an event when the consumer is started', async () => { const handleStart = sandbox.stub().returns(null); @@ -1325,7 +1350,6 @@ describe('Consumer', () => { it('aborts requests when the abort param is true', async () => { const handleStop = sandbox.stub().returns(null); const handleAbort = sandbox.stub().returns(null); - const abortControllerAbort = sandbox.stub(abortController, 'abort'); consumer.on('stopped', handleStop); consumer.on('aborted', handleAbort); @@ -1335,8 +1359,8 @@ describe('Consumer', () => { await clock.runAllAsync(); + assert.isTrue(consumer.abortController.signal.aborted); sandbox.assert.calledOnce(handleMessage); - sandbox.assert.calledOnce(abortControllerAbort); sandbox.assert.calledOnce(handleAbort); sandbox.assert.calledOnce(handleStop); });