Skip to content

Commit

Permalink
Add pre and post message callbacks
Browse files Browse the repository at this point in the history
  • Loading branch information
mglagola committed Dec 15, 2023
1 parent bed4d2d commit ffdf93c
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 2 deletions.
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@vercel/sqs-consumer",
"version": "5.5.0",
"version": "5.6.0",
"description": "Build SQS-based Node applications without the boilerplate",
"main": "dist/index.js",
"types": "dist/index.d.ts",
Expand Down
32 changes: 31 additions & 1 deletion src/consumer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,22 @@ export interface ConsumerOptions {
handleMessageTimeout?: number;
handleMessage?(message: SQSMessage): Promise<void>;
handleMessageBatch?(messages: SQSMessage[]): Promise<void>;
/**
* An `async` function (or function that returns a `Promise`) to be called right
* before the SQS Client sends a receive message command.
*
* This function is usefull if SQS Client module exports have been modified, for
* example to add middlewares.
*/
preReceiveMessageCallback?(): Promise<void>;
/**
* An `async` function (or function that returns a `Promise`) to be called right
* after the SQS Client sends a receive message command.
*
* This function is usefull if SQS Client module exports have been modified, for
* example to add middlewares.
*/
postReceiveMessageCallback?(): Promise<void>;
}

interface Events {
Expand Down Expand Up @@ -119,6 +135,8 @@ export class Consumer extends EventEmitter {
private terminateVisibilityTimeout: boolean;
private heartbeatInterval: number;
private sqs: SQS;
private preReceiveMessageCallback?: () => Promise<void>;
private postReceiveMessageCallback?: () => Promise<void>;

constructor(options: ConsumerOptions) {
super();
Expand All @@ -142,6 +160,9 @@ export class Consumer extends EventEmitter {
region: options.region || process.env.AWS_REGION || 'eu-west-1'
});

this.preReceiveMessageCallback = options.preReceiveMessageCallback;
this.postReceiveMessageCallback = options.postReceiveMessageCallback;

autoBind(this);
}

Expand Down Expand Up @@ -223,9 +244,18 @@ export class Consumer extends EventEmitter {

private async receiveMessage(params: ReceiveMessageRequest): Promise<ReceieveMessageResponse> {
try {
return await this.sqs
if (this.preReceiveMessageCallback) {
await this.preReceiveMessageCallback();
}

const result = await this.sqs
.receiveMessage(params)
.promise();

if (this.postReceiveMessageCallback) {
await this.postReceiveMessageCallback();
}
return result;
} catch (err) {
throw toSQSError(err, `SQS receive message failed: ${err.message}`);
}
Expand Down
24 changes: 24 additions & 0 deletions test/consumer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -367,6 +367,30 @@ describe('Consumer', () => {
sandbox.assert.calledWith(handleMessage, response.Messages[0]);
});

it('calls the preReceiveMessageCallback and postReceiveMessageCallback function before receiving a message', async () => {
let callbackCalls = 0;

consumer = new Consumer({
queueUrl: 'some-queue-url',
region: 'some-region',
handleMessage,
sqs,
authenticationErrorTimeout: AUTHENTICATION_ERROR_TIMEOUT,
preReceiveMessageCallback: async () => {
callbackCalls++;
},
postReceiveMessageCallback: async () => {
callbackCalls++;
}
});

consumer.start();
await pEvent(consumer, 'message_processed');
consumer.stop();

assert.equal(callbackCalls, 2);
});

it('deletes the message when the handleMessage function is called', async () => {
handleMessage.resolves();

Expand Down

0 comments on commit ffdf93c

Please sign in to comment.