Skip to content

Commit

Permalink
feat: Wait for msgs to be processed before emitting stopped
Browse files Browse the repository at this point in the history
  • Loading branch information
barwin committed Jan 23, 2024
1 parent 6b95edb commit bc1a360
Show file tree
Hide file tree
Showing 3 changed files with 119 additions and 1 deletion.
35 changes: 34 additions & 1 deletion src/consumer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ export class Consumer extends TypedEventEmitter {
private authenticationErrorTimeout: number;
private pollingWaitTimeMs: number;
private heartbeatInterval: number;
private areMessagesInFlight: boolean;
public abortController: AbortController;

constructor(options: ConsumerOptions) {
Expand Down Expand Up @@ -142,7 +143,36 @@ export class Consumer extends TypedEventEmitter {
this.emit('aborted');
}

this.emit('stopped');
if (options?.waitForInFlightMessages) {
this.waitForInFlightMessagesToComplete(options?.waitTimeoutMs || 0).then(
() => {
this.emit('stopped');
}
);
} else {
this.emit('stopped');
}
}

/**
* Wait for in flight messages to complete.
* @param {number} waitTimeoutMs
* @private
*/
private async waitForInFlightMessagesToComplete(
waitTimeoutMs: number
): Promise<void> {
const startedAt = Date.now();
while (this.areMessagesInFlight) {
if (waitTimeoutMs && Date.now() - startedAt > waitTimeoutMs) {
logger.debug(
'waiting_for_in_flight_messages_to_complete_wait_timeout_exceeded'
);
return;
}
logger.debug('waiting_for_in_flight_messages_to_complete');
await new Promise((resolve) => setTimeout(resolve, 1000));
}
}

/**
Expand Down Expand Up @@ -270,6 +300,8 @@ export class Consumer extends TypedEventEmitter {
});
}, 1000);

this.areMessagesInFlight = true;

if (this.handleMessageBatch) {
await this.processMessageBatch(response.Messages);
} else {
Expand All @@ -279,6 +311,7 @@ export class Consumer extends TypedEventEmitter {
clearInterval(handlerProcessingDebugger);

this.emit('response_processed');
this.areMessagesInFlight = false;
} else if (response) {
this.emit('empty');
}
Expand Down
14 changes: 14 additions & 0 deletions src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,20 @@ export interface StopOptions {
* @defaultvalue `false`
*/
abort?: boolean;

/**
* Default to `false`, if you want the stop action to wait for in-flight messages
* to be processed before emitting 'stopped' set this to `true`.
* @defaultvalue `false`
*/
waitForInFlightMessages?: boolean;

/**
* if `waitForInFlightMessages` is set to `true`, this option will be used to
* determine how long to wait for in-flight messages to be processed before
* emitting 'stopped'.
*/
waitTimeoutMs?: number;
}

export interface Events {
Expand Down
71 changes: 71 additions & 0 deletions test/tests/consumer.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1481,6 +1481,77 @@ describe('Consumer', () => {
sandbox.assert.calledOnce(handleAbort);
sandbox.assert.calledOnce(handleStop);
});

it('waits for in-flight messages before emitting stopped (no timeout)', async () => {
const handleStop = sandbox.stub().returns(null);
const handleResponseProcessed = sandbox.stub().returns(null);

// A slow message handler
handleMessage = sandbox.stub().callsFake(async () => {
await new Promise((resolve) => setTimeout(resolve, 5000));
});

consumer = new Consumer({
queueUrl: QUEUE_URL,
region: REGION,
handleMessage,
sqs,
authenticationErrorTimeout: AUTHENTICATION_ERROR_TIMEOUT
});

consumer.on('stopped', handleStop);
consumer.on('response_processed', handleResponseProcessed);

consumer.start();
await clock.nextAsync();
consumer.stop({ waitForInFlightMessages: true });

await clock.runAllAsync();

sandbox.assert.calledOnce(handleStop);
sandbox.assert.calledOnce(handleResponseProcessed);
sandbox.assert.calledOnce(handleMessage);
assert.ok(handleMessage.calledBefore(handleStop));

// handleResponseProcessed is called after handleMessage, indicating
// messages were allowed to complete before 'stopped' was emitted
assert.ok(handleResponseProcessed.calledBefore(handleStop));
});

it('waits for in-flight messages before emitting stopped (timeout reached)', async () => {
const handleStop = sandbox.stub().returns(null);
const handleResponseProcessed = sandbox.stub().returns(null);

// A slow message handler
handleMessage = sandbox.stub().callsFake(async () => {
await new Promise((resolve) => setTimeout(resolve, 5000));
});

consumer = new Consumer({
queueUrl: QUEUE_URL,
region: REGION,
handleMessage,
sqs,
authenticationErrorTimeout: AUTHENTICATION_ERROR_TIMEOUT
});

consumer.on('stopped', handleStop);
consumer.on('response_processed', handleResponseProcessed);

consumer.start();
await clock.nextAsync();
consumer.stop({ waitForInFlightMessages: true, waitTimeoutMs: 500 });

await clock.runAllAsync();

sandbox.assert.calledOnce(handleStop);
sandbox.assert.calledOnce(handleResponseProcessed);
sandbox.assert.calledOnce(handleMessage);
assert(handleMessage.calledBefore(handleStop));

// Stop was called before the message could be processed, because we reached timeout.
assert(handleStop.calledBefore(handleResponseProcessed));
});
});

describe('isRunning', async () => {
Expand Down

0 comments on commit bc1a360

Please sign in to comment.