Skip to content

Commit

Permalink
Allow consumers stopped with abort: true to be restarted (#429)
Browse files Browse the repository at this point in the history
* Create a new AbortController each time the consumer starts

* Return a new un aborted signal if the local abort controller doesn't exist

* Better comment
  • Loading branch information
adamrensel-artera authored Oct 16, 2023
1 parent e73df6d commit e120eeb
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 13 deletions.
24 changes: 15 additions & 9 deletions src/consumer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import {
isConnectionError
} from './errors';
import { validateOption, assertOptions, hasMessages } from './validation';
import { abortController } from './controllers';
import { logger } from './logger';

/**
Expand All @@ -52,6 +51,7 @@ export class Consumer extends TypedEventEmitter {
private authenticationErrorTimeout: number;
private pollingWaitTimeMs: number;
private heartbeatInterval: number;
public abortController: AbortController;

constructor(options: ConsumerOptions) {
super();
Expand Down Expand Up @@ -94,13 +94,26 @@ export class Consumer extends TypedEventEmitter {
*/
public start(): void {
if (this.stopped) {
// Create a new abort controller each time the consumer is started
this.abortController = new AbortController();
logger.debug('starting');
this.stopped = false;
this.emit('started');
this.poll();
}
}

/**
* A reusable options object for sqs.send that's used to avoid duplication.
*/
private get sqsSendOptions(): { abortSignal: AbortSignal } {
return {
// return the current abortController signal or a fresh signal that has not been aborted.
// This effectively defaults the signal sent to the AWS SDK to not aborted
abortSignal: this.abortController?.signal || new AbortController().signal
};
}

/**
* Stop polling the queue for messages (pre existing requests will still be made until concluded).
*/
Expand All @@ -120,7 +133,7 @@ export class Consumer extends TypedEventEmitter {

if (options?.abort) {
logger.debug('aborting');
abortController.abort();
this.abortController.abort();
this.emit('aborted');
}

Expand Down Expand Up @@ -167,13 +180,6 @@ export class Consumer extends TypedEventEmitter {
}
}

/**
* A reusable options object for sqs.send that's used to avoid duplication.
*/
private sqsSendOptions = {
abortSignal: abortController.signal
};

/**
* Poll for new messages from SQS
*/
Expand Down
1 change: 0 additions & 1 deletion src/controllers.ts

This file was deleted.

30 changes: 27 additions & 3 deletions test/tests/consumer.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import * as pEvent from 'p-event';

import { AWSError } from '../../src/types';
import { Consumer } from '../../src/consumer';
import { abortController } from '../../src/controllers';
import { logger } from '../../src/logger';

const sandbox = sinon.createSandbox();
Expand Down Expand Up @@ -168,6 +167,32 @@ describe('Consumer', () => {
});

describe('.start', () => {
it('uses the correct abort signal', async () => {
sqs.send
.withArgs(mockReceiveMessage)
.resolves(new Promise((res) => setTimeout(res, 100)));

// Starts and abort is false
consumer.start();
assert.isFalse(sqs.send.lastCall.lastArg.abortSignal.aborted);

// normal stop without an abort and abort is false
consumer.stop();
assert.isFalse(sqs.send.lastCall.lastArg.abortSignal.aborted);

// Starts and abort is false
consumer.start();
assert.isFalse(sqs.send.lastCall.lastArg.abortSignal.aborted);

// Stop with abort and abort is true
consumer.stop({ abort: true });
assert.isTrue(sqs.send.lastCall.lastArg.abortSignal.aborted);

// Starts and abort is false
consumer.start();
assert.isFalse(sqs.send.lastCall.lastArg.abortSignal.aborted);
});

it('fires an event when the consumer is started', async () => {
const handleStart = sandbox.stub().returns(null);

Expand Down Expand Up @@ -1325,7 +1350,6 @@ describe('Consumer', () => {
it('aborts requests when the abort param is true', async () => {
const handleStop = sandbox.stub().returns(null);
const handleAbort = sandbox.stub().returns(null);
const abortControllerAbort = sandbox.stub(abortController, 'abort');

consumer.on('stopped', handleStop);
consumer.on('aborted', handleAbort);
Expand All @@ -1335,8 +1359,8 @@ describe('Consumer', () => {

await clock.runAllAsync();

assert.isTrue(consumer.abortController.signal.aborted);
sandbox.assert.calledOnce(handleMessage);
sandbox.assert.calledOnce(abortControllerAbort);
sandbox.assert.calledOnce(handleAbort);
sandbox.assert.calledOnce(handleStop);
});
Expand Down

0 comments on commit e120eeb

Please sign in to comment.