From 66fe8b8a9e08e9e0013274d2885488ae6adc2526 Mon Sep 17 00:00:00 2001 From: Hadi Date: Mon, 17 Jun 2024 15:15:12 +0100 Subject: [PATCH] feat: add retry and back-off when creating receivers and senders --- src/service/queue/queue.service.spec.ts | 70 +++++++++++++++++++++++++ src/service/queue/queue.service.ts | 36 ++++++++----- 2 files changed, 93 insertions(+), 13 deletions(-) diff --git a/src/service/queue/queue.service.spec.ts b/src/service/queue/queue.service.spec.ts index 4b00821..d99464f 100644 --- a/src/service/queue/queue.service.spec.ts +++ b/src/service/queue/queue.service.spec.ts @@ -241,6 +241,42 @@ describe('QueueService', () => { const messageControl = getInternallyCreatedMessageControl(); expect(messageControl.accept).toHaveBeenCalled(); }); + + it('should return an existing receiver if already created', async () => { + const receiver = {} as Receiver; + const source = 'test-queue'; + queueService['receivers'].set('default:test-queue', receiver); + + const result = await queueService['getReceiver'](source, 1, jest.fn(), 'default'); + + expect(result).toBe(receiver); + expect(amqpService.createReceiver).not.toHaveBeenCalled(); + }); + + it('should create a new receiver if not already created', async () => { + const receiver = {} as Receiver; + const source = 'test-queue'; + const messageHandler = jest.fn(); + + (amqpService as any).createReceiver.mockResolvedValue(receiver); + + const result = await queueService['getReceiver'](source, 1, messageHandler, 'default'); + + expect(result).toBe(receiver); + expect(amqpService.createReceiver).toHaveBeenCalledWith(source, 1, expect.any(Function), 'default'); + }); + + it('should retry creating a receiver on failure', async () => { + const source = 'test-queue'; + const messageHandler = jest.fn(); + + (amqpService as any).createReceiver.mockRejectedValueOnce(new Error('Test error')).mockResolvedValueOnce({} as Receiver); + + const result = await queueService['getReceiver'](source, 1, messageHandler, 'default'); + + expect(result).toBeDefined(); + expect(amqpService.createReceiver).toHaveBeenCalledTimes(2); + }); }); }); @@ -347,6 +383,40 @@ describe('QueueService', () => { expect(sender.send).toHaveBeenCalledWith({ body: 'null', message_annotations: { 'x-opt-delivery-delay': delay * 1000 } }); }); + + it('should return an existing sender if already created', async () => { + const sender = {} as AwaitableSender; + const target = 'test-queue'; + queueService['senders'].set('default:test-queue', sender); + + const result = await queueService['getSender'](target, 'default'); + + expect(result).toBe(sender); + expect(amqpService.createSender).not.toHaveBeenCalled(); + }); + + it('should create a new sender if not already created', async () => { + const sender = {} as AwaitableSender; + const target = 'test-queue'; + + (amqpService as any).createSender.mockResolvedValue(sender); + + const result = await queueService['getSender'](target, 'default'); + + expect(result).toBe(sender); + expect(amqpService.createSender).toHaveBeenCalledWith(target, 'default'); + }); + + it('should retry creating a sender on failure', async () => { + const target = 'test-queue'; + + (amqpService as any).createSender.mockRejectedValueOnce(new Error('Test error')).mockResolvedValueOnce({} as AwaitableSender); + + const result = await queueService['getSender'](target, 'default'); + + expect(result).toBeDefined(); + expect(amqpService.createSender).toHaveBeenCalledTimes(2); + }); }); describe('removeListener()', () => { diff --git a/src/service/queue/queue.service.ts b/src/service/queue/queue.service.ts index 8c4c24b..b5898cd 100644 --- a/src/service/queue/queue.service.ts +++ b/src/service/queue/queue.service.ts @@ -4,12 +4,12 @@ import { AwaitableSender, Delivery, EventContext, Message, Receiver, Source } fr import { extendObject, + getLoggerContext, + Logger, sleep, tryParseJSON, - ValidationNullObjectException, - Logger, - getLoggerContext, ValidationException, + ValidationNullObjectException, } from '../../util'; import { MessageControl } from '../../domain'; import { SendState } from '../../enum'; @@ -29,6 +29,7 @@ const toString = Object.prototype.toString; export class QueueService { private readonly receivers: Map; private readonly senders: Map; + private readonly reconnectDelay: number = 5000; // 5 seconds constructor(private readonly amqpService: AMQPService, private readonly objectValidatorService: ObjectValidatorService) { // this means only one sender and receiver / app / queue @@ -334,28 +335,37 @@ export class QueueService { const receiverToken = this.getLinkToken(sourceToken, connection); - if (!this.receivers.has(receiverToken)) { + if (this.receivers.has(receiverToken)) { + return this.receivers.get(receiverToken); + } + + try { const receiver = await this.amqpService.createReceiver(source, credit, messageHandler.bind(this), connection); this.receivers.set(receiverToken, receiver); + return receiver; + } catch (error) { + logger.error(`Error creating receiver: ${error.message}`, error.stack); + await sleep(this.reconnectDelay); + return this.getReceiver(source, credit, messageHandler, connection); } - - return this.receivers.get(receiverToken); } private async getSender(target: string, connection: string): Promise { - let sender; - const senderToken = this.getLinkToken(target, connection); if (this.senders.has(senderToken)) { - sender = this.senders.get(senderToken); - } else { - sender = await this.amqpService.createSender(target, connection); + return this.senders.get(senderToken); + } + try { + const sender = await this.amqpService.createSender(target, connection); this.senders.set(senderToken, sender); + return sender; + } catch (error) { + logger.error(`Error creating sender: ${error.message}`, error.stack); + await sleep(this.reconnectDelay); + return this.getSender(target, connection); } - - return sender; } private encodeMessage(message: any): string {