Skip to content

Commit

Permalink
feat: add pre and post receive message callbacks (#426)
Browse files Browse the repository at this point in the history
* feat: add pre and post receive message callbacks

* fix: formatting
  • Loading branch information
karolsojko authored Oct 16, 2023
1 parent 7d73fc9 commit 63d146a
Show file tree
Hide file tree
Showing 3 changed files with 53 additions and 1 deletion.
14 changes: 13 additions & 1 deletion src/consumer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ export class Consumer extends TypedEventEmitter {
private queueUrl: string;
private handleMessage: (message: Message) => Promise<Message | void>;
private handleMessageBatch: (message: Message[]) => Promise<Message[] | void>;
private preReceiveMessageCallback?: () => Promise<void>;
private postReceiveMessageCallback?: () => Promise<void>;
private sqs: SQSClient;
private handleMessageTimeout: number;
private attributeNames: string[];
Expand All @@ -57,6 +59,8 @@ export class Consumer extends TypedEventEmitter {
this.queueUrl = options.queueUrl;
this.handleMessage = options.handleMessage;
this.handleMessageBatch = options.handleMessageBatch;
this.preReceiveMessageCallback = options.preReceiveMessageCallback;
this.postReceiveMessageCallback = options.postReceiveMessageCallback;
this.handleMessageTimeout = options.handleMessageTimeout;
this.attributeNames = options.attributeNames || [];
this.messageAttributeNames = options.messageAttributeNames || [];
Expand Down Expand Up @@ -223,10 +227,18 @@ export class Consumer extends TypedEventEmitter {
params: ReceiveMessageCommandInput
): Promise<ReceiveMessageCommandOutput> {
try {
return await this.sqs.send(
if (this.preReceiveMessageCallback) {
await this.preReceiveMessageCallback();
}
const result = await this.sqs.send(
new ReceiveMessageCommand(params),
this.sqsSendOptions
);
if (this.postReceiveMessageCallback) {
await this.postReceiveMessageCallback();
}

return result;
} catch (err) {
throw toSQSError(err, `SQS receive message failed: ${err.message}`);
}
Expand Down
16 changes: 16 additions & 0 deletions src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,22 @@ export interface ConsumerOptions {
* the successful messages only.
*/
handleMessageBatch?(messages: Message[]): Promise<Message[] | void>;
/**
* An `async` function (or function that returns a `Promise`) to be called right
* before the SQS Client sends a receive message command.
*
* This function is usefull if SQS Client module exports have been modified, for
* example to add middlewares.
*/
preReceiveMessageCallback?(): Promise<void>;
/**
* An `async` function (or function that returns a `Promise`) to be called right
* after the SQS Client sends a receive message command.
*
* This function is usefull if SQS Client module exports have been modified, for
* example to add middlewares.
*/
postReceiveMessageCallback?(): Promise<void>;
}

export type UpdatableOptions =
Expand Down
24 changes: 24 additions & 0 deletions test/tests/consumer.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -479,6 +479,30 @@ describe('Consumer', () => {
sandbox.assert.calledWith(handleMessage, response.Messages[0]);
});

it('calls the preReceiveMessageCallback and postReceiveMessageCallback function before receiving a message', async () => {
let callbackCalls = 0;

consumer = new Consumer({
queueUrl: QUEUE_URL,
region: REGION,
handleMessage,
sqs,
authenticationErrorTimeout: AUTHENTICATION_ERROR_TIMEOUT,
preReceiveMessageCallback: async () => {
callbackCalls++;
},
postReceiveMessageCallback: async () => {
callbackCalls++;
}
});

consumer.start();
await pEvent(consumer, 'message_processed');
consumer.stop();

assert.equal(callbackCalls, 2);
});

it('deletes the message when the handleMessage function is called', async () => {
handleMessage.resolves();

Expand Down

0 comments on commit 63d146a

Please sign in to comment.