From 9704f54e61be680a72b807ee42581ee8d6ecddd6 Mon Sep 17 00:00:00 2001 From: Igor Savin Date: Fri, 24 May 2024 18:25:06 +0300 Subject: [PATCH] Make AMQP publisher sync --- .../lib/AmqpQueuePublisherManager.spec.ts | 2 +- .../amqp/lib/AmqpQueuePublisherManager.ts | 20 ++++++++++++++----- .../lib/queues/AbstractPublisherManager.ts | 16 ++++++++++----- 3 files changed, 27 insertions(+), 11 deletions(-) diff --git a/packages/amqp/lib/AmqpQueuePublisherManager.spec.ts b/packages/amqp/lib/AmqpQueuePublisherManager.spec.ts index c4fd56ec..6a5079ec 100644 --- a/packages/amqp/lib/AmqpQueuePublisherManager.spec.ts +++ b/packages/amqp/lib/AmqpQueuePublisherManager.spec.ts @@ -18,7 +18,7 @@ describe('AmqpQueuePublisherManager', () => { const fakeConsumer = new FakeConsumer(diContainer.cradle, TestEvents.updated) await fakeConsumer.start() - const publishedMessage = await queuePublisherManager.publish(FakeConsumer.QUEUE_NAME, { + const publishedMessage = queuePublisherManager.publishSync(FakeConsumer.QUEUE_NAME, { type: 'entity.updated', payload: { updatedData: 'msg', diff --git a/packages/amqp/lib/AmqpQueuePublisherManager.ts b/packages/amqp/lib/AmqpQueuePublisherManager.ts index 8fb291c0..a5154184 100644 --- a/packages/amqp/lib/AmqpQueuePublisherManager.ts +++ b/packages/amqp/lib/AmqpQueuePublisherManager.ts @@ -113,15 +113,25 @@ export class AmqpQueuePublisherManager< } } - publish( + publish(): Promise> { + throw new Error('Please use `publishSync` method for AMQP publisher managers') + } + + publishSync( queue: NonNullable, message: MessagePublishType, precedingEventMetadata?: Partial, messageOptions?: AmqpQueueMessageOptions, - ): Promise> { - // Purpose of this override is to provide better name for the first argument - // For AMQP Queues it is going to be queue - return super.publish(queue, message, precedingEventMetadata, messageOptions) + ): MessageSchemaType { + const publisher = this.targetToPublisherMap[queue] + if (!publisher) { + throw new Error(`No publisher for queue ${queue}`) + } + + const messageDefinition = this.resolveMessageDefinition(queue, message) + const resolvedMessage = this.resolveMessage(messageDefinition, message, precedingEventMetadata) + publisher.publish(resolvedMessage, messageOptions) + return resolvedMessage } protected override resolveEventTarget( diff --git a/packages/core/lib/queues/AbstractPublisherManager.ts b/packages/core/lib/queues/AbstractPublisherManager.ts index 0f447f73..739dbeaf 100644 --- a/packages/core/lib/queues/AbstractPublisherManager.ts +++ b/packages/core/lib/queues/AbstractPublisherManager.ts @@ -180,11 +180,7 @@ export abstract class AbstractPublisherManager< if (!publisher) { throw new Error(`No publisher for target ${eventTarget}`) } - // ToDo optimize the lookup - const messageDefinition = this.targetToEventMap[eventTarget].find( - (entry) => entry.consumerSchema.shape.type.value === message.type, - ) - + const messageDefinition = this.resolveMessageDefinition(eventTarget, message) const resolvedMessage = this.resolveMessage(messageDefinition, message, precedingEventMetadata) if (this.isAsync) { @@ -196,6 +192,16 @@ export abstract class AbstractPublisherManager< return resolvedMessage } + protected resolveMessageDefinition( + eventTarget: EventTargets, + message: MessagePublishType, + ) { + // ToDo optimize the lookup + return this.targetToEventMap[eventTarget].find( + (entry) => entry.consumerSchema.shape.type.value === message.type, + ) + } + protected resolveMessage( messageDefinition: EventDefinitionType | undefined, message: MessagePublishType,