From ffdf93c957a620842124608a26a23f10981af812 Mon Sep 17 00:00:00 2001 From: Mark Glagola <1508008+mglagola@users.noreply.github.com> Date: Fri, 15 Dec 2023 10:43:59 -0600 Subject: [PATCH] Add pre and post message callbacks --- package.json | 2 +- src/consumer.ts | 32 +++++++++++++++++++++++++++++++- test/consumer.ts | 24 ++++++++++++++++++++++++ 3 files changed, 56 insertions(+), 2 deletions(-) diff --git a/package.json b/package.json index 6a45b724..407baa45 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "@vercel/sqs-consumer", - "version": "5.5.0", + "version": "5.6.0", "description": "Build SQS-based Node applications without the boilerplate", "main": "dist/index.js", "types": "dist/index.d.ts", diff --git a/src/consumer.ts b/src/consumer.ts index 1b941730..6729d4e7 100644 --- a/src/consumer.ts +++ b/src/consumer.ts @@ -90,6 +90,22 @@ export interface ConsumerOptions { handleMessageTimeout?: number; handleMessage?(message: SQSMessage): Promise; handleMessageBatch?(messages: SQSMessage[]): Promise; + /** + * An `async` function (or function that returns a `Promise`) to be called right + * before the SQS Client sends a receive message command. + * + * This function is usefull if SQS Client module exports have been modified, for + * example to add middlewares. + */ + preReceiveMessageCallback?(): Promise; + /** + * An `async` function (or function that returns a `Promise`) to be called right + * after the SQS Client sends a receive message command. + * + * This function is usefull if SQS Client module exports have been modified, for + * example to add middlewares. + */ + postReceiveMessageCallback?(): Promise; } interface Events { @@ -119,6 +135,8 @@ export class Consumer extends EventEmitter { private terminateVisibilityTimeout: boolean; private heartbeatInterval: number; private sqs: SQS; + private preReceiveMessageCallback?: () => Promise; + private postReceiveMessageCallback?: () => Promise; constructor(options: ConsumerOptions) { super(); @@ -142,6 +160,9 @@ export class Consumer extends EventEmitter { region: options.region || process.env.AWS_REGION || 'eu-west-1' }); + this.preReceiveMessageCallback = options.preReceiveMessageCallback; + this.postReceiveMessageCallback = options.postReceiveMessageCallback; + autoBind(this); } @@ -223,9 +244,18 @@ export class Consumer extends EventEmitter { private async receiveMessage(params: ReceiveMessageRequest): Promise { try { - return await this.sqs + if (this.preReceiveMessageCallback) { + await this.preReceiveMessageCallback(); + } + + const result = await this.sqs .receiveMessage(params) .promise(); + + if (this.postReceiveMessageCallback) { + await this.postReceiveMessageCallback(); + } + return result; } catch (err) { throw toSQSError(err, `SQS receive message failed: ${err.message}`); } diff --git a/test/consumer.ts b/test/consumer.ts index acc91f05..569dcf25 100644 --- a/test/consumer.ts +++ b/test/consumer.ts @@ -367,6 +367,30 @@ describe('Consumer', () => { sandbox.assert.calledWith(handleMessage, response.Messages[0]); }); + it('calls the preReceiveMessageCallback and postReceiveMessageCallback function before receiving a message', async () => { + let callbackCalls = 0; + + consumer = new Consumer({ + queueUrl: 'some-queue-url', + region: 'some-region', + handleMessage, + sqs, + authenticationErrorTimeout: AUTHENTICATION_ERROR_TIMEOUT, + preReceiveMessageCallback: async () => { + callbackCalls++; + }, + postReceiveMessageCallback: async () => { + callbackCalls++; + } + }); + + consumer.start(); + await pEvent(consumer, 'message_processed'); + consumer.stop(); + + assert.equal(callbackCalls, 2); + }); + it('deletes the message when the handleMessage function is called', async () => { handleMessage.resolves();