From 629fa36c438e14dd317fb675b3b7dfba468fa33c Mon Sep 17 00:00:00 2001 From: Igor Savin Date: Sat, 25 May 2024 18:44:28 +0300 Subject: [PATCH 1/2] Implement topic consumers --- packages/amqp/index.ts | 5 +- packages/amqp/lib/AbstractAmqpConsumer.ts | 33 ++++++-- .../amqp/lib/AbstractAmqpExchangePublisher.ts | 55 ------------- packages/amqp/lib/AbstractAmqpPublisher.ts | 38 ++++++--- .../amqp/lib/AbstractAmqpQueueConsumer.ts | 10 ++- .../amqp/lib/AbstractAmqpQueuePublisher.ts | 27 ++++++- packages/amqp/lib/AbstractAmqpService.ts | 36 +++++---- .../amqp/lib/AbstractAmqpTopicConsumer.ts | 24 ++++++ .../amqp/lib/AbstractAmqpTopicPublisher.ts | 57 ++++++++++++++ .../lib/AmqpQueuePublisherManager.spec.ts | 6 +- .../amqp/lib/AmqpQueuePublisherManager.ts | 46 ++++++++--- .../lib/AmqpTopicPublisherManager.spec.ts | 56 ++++++++++++++ ...anager.ts => AmqpTopicPublisherManager.ts} | 76 ++++++++++-------- .../amqp/lib/CommonAmqpPublisherFactory.ts | 47 ++++++----- packages/amqp/lib/utils/amqpQueueUtils.ts | 77 +++++++++++++++++-- .../{FakeConsumer.ts => FakeQueueConsumer.ts} | 2 +- packages/amqp/test/fakes/FakeTopicConsumer.ts | 52 +++++++++++++ .../publishers/AmqpPermissionPublisher.ts | 8 +- packages/amqp/test/utils/testContext.ts | 51 ++++++++++-- 19 files changed, 533 insertions(+), 173 deletions(-) delete mode 100644 packages/amqp/lib/AbstractAmqpExchangePublisher.ts create mode 100644 packages/amqp/lib/AbstractAmqpTopicConsumer.ts create mode 100644 packages/amqp/lib/AbstractAmqpTopicPublisher.ts create mode 100644 packages/amqp/lib/AmqpTopicPublisherManager.spec.ts rename packages/amqp/lib/{AmqpExchangePublisherManager.ts => AmqpTopicPublisherManager.ts} (52%) rename packages/amqp/test/fakes/{FakeConsumer.ts => FakeQueueConsumer.ts} (92%) create mode 100644 packages/amqp/test/fakes/FakeTopicConsumer.ts diff --git a/packages/amqp/index.ts b/packages/amqp/index.ts index 06b171a0..bd55236a 100644 --- a/packages/amqp/index.ts +++ b/packages/amqp/index.ts @@ -1,6 +1,7 @@ export type { AMQPQueueConfig } from './lib/AbstractAmqpService' export { AbstractAmqpQueueConsumer } from './lib/AbstractAmqpQueueConsumer' +export { AbstractAmqpTopicConsumer } from './lib/AbstractAmqpTopicConsumer' export { AbstractAmqpConsumer, AMQPConsumerOptions } from './lib/AbstractAmqpConsumer' export { AmqpConsumerErrorResolver } from './lib/errors/AmqpConsumerErrorResolver' @@ -13,6 +14,6 @@ export type { ConnectionReceiver } from './lib/AmqpConnectionManager' export { deserializeAmqpMessage } from './lib/amqpMessageDeserializer' export * from './lib/AbstractAmqpQueuePublisher' -export * from './lib/AbstractAmqpExchangePublisher' -export * from './lib/AmqpExchangePublisherManager' +export * from './lib/AbstractAmqpTopicPublisher' +export * from './lib/AmqpTopicPublisherManager' export * from './lib/AmqpQueuePublisherManager' diff --git a/packages/amqp/lib/AbstractAmqpConsumer.ts b/packages/amqp/lib/AbstractAmqpConsumer.ts index bacef374..fec7a123 100644 --- a/packages/amqp/lib/AbstractAmqpConsumer.ts +++ b/packages/amqp/lib/AbstractAmqpConsumer.ts @@ -20,8 +20,8 @@ import type { Connection, Message } from 'amqplib' import type { AMQPConsumerDependencies, - AMQPLocator, - AMQPCreationConfig, + AMQPQueueLocator, + AMQPQueueCreationConfig, } from './AbstractAmqpService' import { AbstractAmqpService } from './AbstractAmqpService' import { readAmqpMessage } from './amqpMessageReader' @@ -33,9 +33,11 @@ export type AMQPConsumerOptions< MessagePayloadType extends object, ExecutionContext = undefined, PrehandlerOutput = undefined, + CreationConfig extends AMQPQueueCreationConfig = AMQPQueueCreationConfig, + LocatorConfig extends AMQPQueueLocator = AMQPQueueLocator, > = QueueConsumerOptions< - AMQPCreationConfig, - AMQPLocator, + CreationConfig, + LocatorConfig, NonNullable, // DeadLetterQueueIntegrationOptions -> empty object for now MessagePayloadType, ExecutionContext, @@ -46,12 +48,16 @@ export abstract class AbstractAmqpConsumer< MessagePayloadType extends object, ExecutionContext, PrehandlerOutput = undefined, + CreationConfig extends AMQPQueueCreationConfig = AMQPQueueCreationConfig, + LocatorConfig extends AMQPQueueLocator = AMQPQueueLocator, > extends AbstractAmqpService< MessagePayloadType, AMQPConsumerDependencies, ExecutionContext, - PrehandlerOutput + PrehandlerOutput, + CreationConfig, + LocatorConfig > implements QueueConsumer { @@ -59,8 +65,8 @@ export abstract class AbstractAmqpConsumer< private readonly errorResolver: ErrorResolver private readonly executionContext: ExecutionContext private readonly deadLetterQueueOptions?: DeadLetterQueueOptions< - AMQPCreationConfig, - AMQPLocator, + AMQPQueueCreationConfig, + AMQPQueueLocator, NonNullable > private readonly maxRetryDuration: number @@ -71,10 +77,17 @@ export abstract class AbstractAmqpConsumer< ExecutionContext, PrehandlerOutput > + protected readonly queueName: string constructor( dependencies: AMQPConsumerDependencies, - options: AMQPConsumerOptions, + options: AMQPConsumerOptions< + MessagePayloadType, + ExecutionContext, + PrehandlerOutput, + CreationConfig, + LocatorConfig + >, executionContext: ExecutionContext, ) { super(dependencies, options) @@ -84,6 +97,10 @@ export abstract class AbstractAmqpConsumer< this.deadLetterQueueOptions = options.deadLetterQueue this.maxRetryDuration = options.maxRetryDuration ?? DEFAULT_MAX_RETRY_DURATION + this.queueName = options.locatorConfig + ? options.locatorConfig.queueName + : options.creationConfig!.queueName + const messageSchemas = options.handlers.map((entry) => entry.schema) this.messageSchemaContainer = new MessageSchemaContainer({ messageSchemas, diff --git a/packages/amqp/lib/AbstractAmqpExchangePublisher.ts b/packages/amqp/lib/AbstractAmqpExchangePublisher.ts deleted file mode 100644 index c66613b0..00000000 --- a/packages/amqp/lib/AbstractAmqpExchangePublisher.ts +++ /dev/null @@ -1,55 +0,0 @@ -import type * as Buffer from 'node:buffer' - -import { objectToBuffer } from '@message-queue-toolkit/core' -import type { Options } from 'amqplib/properties' - -import type { AMQPPublisherOptions } from './AbstractAmqpPublisher' -import { AbstractAmqpPublisher } from './AbstractAmqpPublisher' -import type { AMQPDependencies } from './AbstractAmqpService' - -export type AMQPExchangePublisherOptions = Omit< - AMQPPublisherOptions, - 'creationConfig' -> & { - exchange: string -} - -export type AmqpExchangeMessageOptions = { - routingKey: string - publishOptions: Options.Publish -} - -export abstract class AbstractAmqpExchangePublisher< - MessagePayloadType extends object, -> extends AbstractAmqpPublisher { - constructor( - dependencies: AMQPDependencies, - options: AMQPExchangePublisherOptions, - ) { - super(dependencies, { - ...options, - // FixMe exchange publisher doesn't need queue at all - creationConfig: { - queueName: 'dummy', - queueOptions: {}, - updateAttributesIfExists: false, - }, - exchange: options.exchange, - locatorConfig: undefined, - }) - } - - protected publishInternal(message: Buffer, options: AmqpExchangeMessageOptions): void { - this.channel.publish( - this.exchange!, - options.routingKey, - objectToBuffer(message), - options.publishOptions, - ) - } - - protected createMissingEntities(): Promise { - this.logger.warn('Missing entity creation is not implemented') - return Promise.resolve() - } -} diff --git a/packages/amqp/lib/AbstractAmqpPublisher.ts b/packages/amqp/lib/AbstractAmqpPublisher.ts index 8aedaa43..303b5414 100644 --- a/packages/amqp/lib/AbstractAmqpPublisher.ts +++ b/packages/amqp/lib/AbstractAmqpPublisher.ts @@ -2,27 +2,40 @@ import type { Either } from '@lokalise/node-core' import { InternalError } from '@lokalise/node-core' import type { BarrierResult, + CommonCreationConfigType, MessageInvalidFormatError, MessageValidationError, QueuePublisherOptions, SyncPublisher, } from '@message-queue-toolkit/core' -import { MessageSchemaContainer, objectToBuffer } from '@message-queue-toolkit/core' +import { objectToBuffer, MessageSchemaContainer } from '@message-queue-toolkit/core' import type { ZodSchema } from 'zod' -import type { AMQPLocator, AMQPCreationConfig, AMQPDependencies } from './AbstractAmqpService' +import type { AMQPDependencies } from './AbstractAmqpService' import { AbstractAmqpService } from './AbstractAmqpService' -export type AMQPPublisherOptions = QueuePublisherOptions< - AMQPCreationConfig, - AMQPLocator, - MessagePayloadType -> & { +export type AMQPPublisherOptions< + MessagePayloadType extends object, + CreationConfig extends CommonCreationConfigType, + LocatorConfig extends object, +> = QueuePublisherOptions & { exchange?: string } -export abstract class AbstractAmqpPublisher - extends AbstractAmqpService +export abstract class AbstractAmqpPublisher< + MessagePayloadType extends object, + MessageOptionsType, + CreationConfig extends CommonCreationConfigType, + LocatorConfig extends object, + > + extends AbstractAmqpService< + MessagePayloadType, + AMQPDependencies, + unknown, + unknown, + CreationConfig, + LocatorConfig + > implements SyncPublisher { private readonly messageSchemaContainer: MessageSchemaContainer @@ -30,7 +43,10 @@ export abstract class AbstractAmqpPublisher - constructor(dependencies: AMQPDependencies, options: AMQPPublisherOptions) { + constructor( + dependencies: AMQPDependencies, + options: AMQPPublisherOptions, + ) { super(dependencies, options) const messageSchemas = options.messageSchemas @@ -101,7 +117,9 @@ export abstract class AbstractAmqpPublisher extends AbstractAmqpConsumer { - protected override createMissingEntities(): Promise { - return ensureAmqpQueue(this.connection!, this.channel, this.creationConfig, this.locatorConfig) + protected override async createMissingEntities(): Promise { + if (this.deletionConfig && this.creationConfig) { + await deleteAmqpQueue(this.channel, this.deletionConfig, this.creationConfig) + } + + await ensureAmqpQueue(this.connection!, this.channel, this.creationConfig, this.locatorConfig) } } diff --git a/packages/amqp/lib/AbstractAmqpQueuePublisher.ts b/packages/amqp/lib/AbstractAmqpQueuePublisher.ts index b3e0f035..ae1bc810 100644 --- a/packages/amqp/lib/AbstractAmqpQueuePublisher.ts +++ b/packages/amqp/lib/AbstractAmqpQueuePublisher.ts @@ -1,6 +1,14 @@ +import type * as Buffer from 'node:buffer' + import type { Options } from 'amqplib/properties' +import type { AMQPPublisherOptions } from './AbstractAmqpPublisher' import { AbstractAmqpPublisher } from './AbstractAmqpPublisher' +import type { + AMQPDependencies, + AMQPQueueCreationConfig, + AMQPQueueLocator, +} from './AbstractAmqpService' import { ensureAmqpQueue } from './utils/amqpQueueUtils' export type AmqpQueueMessageOptions = { @@ -13,7 +21,24 @@ const NO_PARAMS: AmqpQueueMessageOptions = { export abstract class AbstractAmqpQueuePublisher< MessagePayloadType extends object, -> extends AbstractAmqpPublisher { +> extends AbstractAmqpPublisher< + MessagePayloadType, + AmqpQueueMessageOptions, + AMQPQueueCreationConfig, + AMQPQueueLocator +> { + protected readonly queueName: string + + constructor( + dependencies: AMQPDependencies, + options: AMQPPublisherOptions, + ) { + super(dependencies, options) + this.queueName = options.locatorConfig + ? options.locatorConfig.queueName + : options.creationConfig.queueName + } + protected publishInternal(message: Buffer, options: AmqpQueueMessageOptions): void { this.channel.sendToQueue(this.queueName, message, options.publishOptions) } diff --git a/packages/amqp/lib/AbstractAmqpService.ts b/packages/amqp/lib/AbstractAmqpService.ts index d9d142df..58b24fdb 100644 --- a/packages/amqp/lib/AbstractAmqpService.ts +++ b/packages/amqp/lib/AbstractAmqpService.ts @@ -1,4 +1,5 @@ import type { + CommonCreationConfigType, QueueConsumerDependencies, QueueDependencies, QueueOptions, @@ -8,7 +9,6 @@ import type { Channel, Connection, Message } from 'amqplib' import type { Options } from 'amqplib/properties' import type { AmqpConnectionManager, ConnectionReceiver } from './AmqpConnectionManager' -import { deleteAmqpQueue } from './utils/amqpQueueUtils' export type AMQPDependencies = QueueDependencies & { amqpConnectionManager: AmqpConnectionManager @@ -17,34 +17,44 @@ export type AMQPDependencies = QueueDependencies & { export type AMQPConsumerDependencies = AMQPDependencies & QueueConsumerDependencies export type AMQPQueueConfig = Options.AssertQueue -export type AMQPCreationConfig = { +export type AMQPQueueCreationConfig = { queueOptions: AMQPQueueConfig queueName: string updateAttributesIfExists?: boolean } -export type AMQPSubscriptionConfig = { +export type AMQPTopicCreationConfig = AMQPQueueCreationConfig & { exchange: string - routingKey: string + topicPattern: string } -export type AMQPLocator = { +export type AMQPTopicPublisherConfig = { + exchange: string +} & CommonCreationConfigType + +export type AMQPQueueLocator = { queueName: string } +export type AMQPTopicLocator = AMQPQueueLocator & { + exchange: string +} + export abstract class AbstractAmqpService< MessagePayloadType extends object, DependenciesType extends AMQPDependencies = AMQPDependencies, ExecutionContext = unknown, PrehandlerOutput = unknown, + CreationConfig extends CommonCreationConfigType = AMQPQueueCreationConfig, + LocatorConfig extends object = AMQPQueueLocator, > extends AbstractQueueService< MessagePayloadType, Message, DependenciesType, - AMQPCreationConfig, - AMQPLocator, - QueueOptions, + CreationConfig, + LocatorConfig, + QueueOptions, ExecutionContext, PrehandlerOutput > @@ -55,17 +65,13 @@ export abstract class AbstractAmqpService< // @ts-ignore protected channel: Channel private isShuttingDown: boolean - protected readonly queueName: string constructor( dependencies: DependenciesType, - options: QueueOptions, + options: QueueOptions, ) { super(dependencies, options) - this.queueName = options.locatorConfig - ? options.locatorConfig.queueName - : options.creationConfig?.queueName this.isShuttingDown = false this.connectionManager = dependencies.amqpConnectionManager this.connection = this.connectionManager.getConnectionSync() @@ -98,10 +104,6 @@ export abstract class AbstractAmqpService< this.isShuttingDown = false } - if (this.deletionConfig && this.creationConfig) { - await deleteAmqpQueue(this.channel, this.deletionConfig, this.creationConfig) - } - this.channel.on('close', () => { if (!this.isShuttingDown) { this.logger.error(`AMQP connection lost!`) diff --git a/packages/amqp/lib/AbstractAmqpTopicConsumer.ts b/packages/amqp/lib/AbstractAmqpTopicConsumer.ts new file mode 100644 index 00000000..beec58a5 --- /dev/null +++ b/packages/amqp/lib/AbstractAmqpTopicConsumer.ts @@ -0,0 +1,24 @@ +import { AbstractAmqpConsumer } from './AbstractAmqpConsumer' +import type { AMQPTopicCreationConfig, AMQPTopicLocator } from './AbstractAmqpService' +import { ensureAmqpTopicSubscription } from './utils/amqpQueueUtils' + +export class AbstractAmqpTopicConsumer< + MessagePayloadType extends object, + ExecutionContext, + PrehandlerOutput = undefined, +> extends AbstractAmqpConsumer< + MessagePayloadType, + ExecutionContext, + PrehandlerOutput, + AMQPTopicCreationConfig, + AMQPTopicLocator +> { + protected override createMissingEntities(): Promise { + return ensureAmqpTopicSubscription( + this.connection!, + this.channel, + this.creationConfig, + this.locatorConfig, + ) + } +} diff --git a/packages/amqp/lib/AbstractAmqpTopicPublisher.ts b/packages/amqp/lib/AbstractAmqpTopicPublisher.ts new file mode 100644 index 00000000..b6c3b52c --- /dev/null +++ b/packages/amqp/lib/AbstractAmqpTopicPublisher.ts @@ -0,0 +1,57 @@ +import type * as Buffer from 'node:buffer' + +import type { Options } from 'amqplib/properties' + +import type { AMQPPublisherOptions } from './AbstractAmqpPublisher' +import { AbstractAmqpPublisher } from './AbstractAmqpPublisher' +import type { + AMQPDependencies, + AMQPTopicCreationConfig, + AMQPTopicLocator, + AMQPTopicPublisherConfig, +} from './AbstractAmqpService' +import { ensureExchange } from './utils/amqpQueueUtils' + +export type AMQPTopicPublisherOptions = Omit< + AMQPPublisherOptions, + 'creationConfig' +> & { + exchange: string +} + +export type AmqpTopicMessageOptions = { + routingKey: string + publishOptions?: Options.Publish +} + +export abstract class AbstractAmqpTopicPublisher< + MessagePayloadType extends object, +> extends AbstractAmqpPublisher< + MessagePayloadType, + AmqpTopicMessageOptions, + AMQPTopicPublisherConfig, + AMQPTopicPublisherConfig +> { + constructor( + dependencies: AMQPDependencies, + options: AMQPTopicPublisherOptions, + ) { + super(dependencies, { + ...options, + creationConfig: { + exchange: options.exchange, + updateAttributesIfExists: false, + }, + exchange: options.exchange, + locatorConfig: undefined, + }) + } + + protected publishInternal(message: Buffer, options: AmqpTopicMessageOptions): void { + this.channel.publish(this.exchange!, options.routingKey, message, options.publishOptions) + } + + protected override async createMissingEntities(): Promise { + await ensureExchange(this.connection!, this.channel, this.creationConfig, this.locatorConfig) + } +} diff --git a/packages/amqp/lib/AmqpQueuePublisherManager.spec.ts b/packages/amqp/lib/AmqpQueuePublisherManager.spec.ts index 6a5079ec..ffe9d270 100644 --- a/packages/amqp/lib/AmqpQueuePublisherManager.spec.ts +++ b/packages/amqp/lib/AmqpQueuePublisherManager.spec.ts @@ -1,7 +1,7 @@ import type { AwilixContainer } from 'awilix' import { beforeAll } from 'vitest' -import { FakeConsumer } from '../test/fakes/FakeConsumer' +import { FakeQueueConsumer } from '../test/fakes/FakeQueueConsumer' import { TEST_AMQP_CONFIG } from '../test/utils/testAmqpConfig' import { registerDependencies, TestEvents } from '../test/utils/testContext' import type { Dependencies } from '../test/utils/testContext' @@ -15,10 +15,10 @@ describe('AmqpQueuePublisherManager', () => { it('publishes to the correct queue', async () => { const { queuePublisherManager } = diContainer.cradle - const fakeConsumer = new FakeConsumer(diContainer.cradle, TestEvents.updated) + const fakeConsumer = new FakeQueueConsumer(diContainer.cradle, TestEvents.updated) await fakeConsumer.start() - const publishedMessage = queuePublisherManager.publishSync(FakeConsumer.QUEUE_NAME, { + const publishedMessage = queuePublisherManager.publishSync(FakeQueueConsumer.QUEUE_NAME, { type: 'entity.updated', payload: { updatedData: 'msg', diff --git a/packages/amqp/lib/AmqpQueuePublisherManager.ts b/packages/amqp/lib/AmqpQueuePublisherManager.ts index a5154184..3a162303 100644 --- a/packages/amqp/lib/AmqpQueuePublisherManager.ts +++ b/packages/amqp/lib/AmqpQueuePublisherManager.ts @@ -6,6 +6,7 @@ import type { MessageMetadataType, MessagePublishType, MessageSchemaType, + CommonCreationConfigType, } from '@message-queue-toolkit/core' import { AbstractPublisherManager } from '@message-queue-toolkit/core' import type z from 'zod' @@ -15,7 +16,11 @@ import type { AbstractAmqpQueuePublisher, AmqpQueueMessageOptions, } from './AbstractAmqpQueuePublisher' -import type { AMQPCreationConfig, AMQPDependencies, AMQPLocator } from './AbstractAmqpService' +import type { + AMQPQueueCreationConfig, + AMQPDependencies, + AMQPQueueLocator, +} from './AbstractAmqpService' import type { AmqpPublisherFactory } from './CommonAmqpPublisherFactory' import { CommonAmqpQueuePublisherFactory } from './CommonAmqpPublisherFactory' @@ -30,11 +35,21 @@ export type AmqpPublisherManagerDependencies, + PublisherType extends AbstractAmqpPublisher< + EventType, + MessageOptionsType, + CreationConfig, + LocatorConfig + >, MessageOptionsType, - PublisherOptionsType extends Omit, 'creationConfig'>, + PublisherOptionsType extends Omit< + AMQPPublisherOptions, + 'creationConfig' + >, EventType extends PublisherBaseEventType, MetadataType, + CreationConfig extends CommonCreationConfigType = AMQPQueueCreationConfig, + LocatorConfig extends object = AMQPQueueLocator, > = { metadataField?: string publisherFactory: AmqpPublisherFactory< @@ -45,10 +60,10 @@ export type AmqpPublisherManagerOptions< > metadataFiller: MetadataFiller newPublisherOptions: Omit< - AMQPPublisherOptions, + AMQPPublisherOptions, 'messageSchemas' | 'creationConfig' | 'locatorConfig' > & { - creationConfig?: Omit + creationConfig?: Omit } } @@ -67,11 +82,15 @@ export class AmqpQueuePublisherManager< NonNullable, AbstractAmqpQueuePublisher>, AMQPDependencies, - AMQPCreationConfig, - AMQPLocator, + AMQPQueueCreationConfig, + AMQPQueueLocator, AmqpMessageSchemaType, Omit< - AMQPPublisherOptions>, + AMQPPublisherOptions< + z.infer, + AMQPQueueCreationConfig, + AMQPQueueLocator + >, 'messageSchemas' | 'creationConfig' | 'locatorConfig' >, SupportedEventDefinitions, @@ -83,7 +102,11 @@ export class AmqpQueuePublisherManager< options: AmqpPublisherManagerOptions< T, AmqpQueueMessageOptions, - AMQPPublisherOptions>, + AMQPPublisherOptions< + z.infer, + AMQPQueueCreationConfig, + AMQPQueueLocator + >, z.infer, MetadataType >, @@ -105,7 +128,7 @@ export class AmqpQueuePublisherManager< protected override resolveCreationConfig( queueName: NonNullable, - ): AMQPCreationConfig { + ): AMQPQueueCreationConfig { return { ...this.newPublisherOptions, queueOptions: {}, @@ -113,6 +136,9 @@ export class AmqpQueuePublisherManager< } } + /** + * @deprecated use `publishSync` instead. + */ publish(): Promise> { throw new Error('Please use `publishSync` method for AMQP publisher managers') } diff --git a/packages/amqp/lib/AmqpTopicPublisherManager.spec.ts b/packages/amqp/lib/AmqpTopicPublisherManager.spec.ts new file mode 100644 index 00000000..d284dfca --- /dev/null +++ b/packages/amqp/lib/AmqpTopicPublisherManager.spec.ts @@ -0,0 +1,56 @@ +import type { AwilixContainer } from 'awilix' +import { beforeAll } from 'vitest' + +import { FakeTopicConsumer } from '../test/fakes/FakeTopicConsumer' +import { TEST_AMQP_CONFIG } from '../test/utils/testAmqpConfig' +import { registerDependencies, TestEvents } from '../test/utils/testContext' +import type { Dependencies } from '../test/utils/testContext' + +describe('AmqpTopicPublisherManager', () => { + describe('publish', () => { + let diContainer: AwilixContainer + beforeAll(async () => { + diContainer = await registerDependencies(TEST_AMQP_CONFIG) + }) + + it('publishes to the correct queues', async () => { + const { topicPublisherManager } = diContainer.cradle + const fakeConsumer = new FakeTopicConsumer(diContainer.cradle, TestEvents.updatedPubSub, { + queueName: 'queue1', + topicPattern: 'topic1', + }) + await fakeConsumer.start() + const fakeConsumer2 = new FakeTopicConsumer(diContainer.cradle, TestEvents.updatedPubSub, { + queueName: 'queue2', + topicPattern: 'topic2', + }) + await fakeConsumer2.start() + const fakeConsumer3 = new FakeTopicConsumer(diContainer.cradle, TestEvents.updatedPubSub, { + queueName: 'queue3', + topicPattern: 'topic1', + }) + await fakeConsumer3.start() + + const publishedMessage = topicPublisherManager.publishSync( + TestEvents.updatedPubSub.exchange, + { + type: 'entity.updated', + payload: { + updatedData: 'msg', + }, + }, + { + routingKey: 'topic1', + }, + ) + + const result = await fakeConsumer.handlerSpy.waitForMessageWithId(publishedMessage.id) + const result2 = await fakeConsumer3.handlerSpy.waitForMessageWithId(publishedMessage.id) + expect(result.processingResult).toBe('consumed') + expect(result2.processingResult).toBe('consumed') + expect(fakeConsumer.messageCounter).toEqual(1) + expect(fakeConsumer2.messageCounter).toEqual(0) + expect(fakeConsumer3.messageCounter).toEqual(1) + }) + }) +}) diff --git a/packages/amqp/lib/AmqpExchangePublisherManager.ts b/packages/amqp/lib/AmqpTopicPublisherManager.ts similarity index 52% rename from packages/amqp/lib/AmqpExchangePublisherManager.ts rename to packages/amqp/lib/AmqpTopicPublisherManager.ts index 9ce90609..95308cb5 100644 --- a/packages/amqp/lib/AmqpExchangePublisherManager.ts +++ b/packages/amqp/lib/AmqpTopicPublisherManager.ts @@ -6,23 +6,22 @@ import type { } from '@message-queue-toolkit/core' import type z from 'zod' +import type { AMQPDependencies, AMQPTopicPublisherConfig } from './AbstractAmqpService' import type { - AbstractAmqpExchangePublisher, - AmqpExchangeMessageOptions, - AMQPExchangePublisherOptions, -} from './AbstractAmqpExchangePublisher' -import type { AmqpQueueMessageOptions } from './AbstractAmqpQueuePublisher' -import type { AMQPCreationConfig, AMQPDependencies, AMQPLocator } from './AbstractAmqpService' + AbstractAmqpTopicPublisher, + AmqpTopicMessageOptions, + AMQPTopicPublisherOptions, +} from './AbstractAmqpTopicPublisher' import type { AmqpAwareEventDefinition, AmqpMessageSchemaType, AmqpPublisherManagerDependencies, AmqpPublisherManagerOptions, } from './AmqpQueuePublisherManager' -import { CommonAmqpExchangePublisherFactory } from './CommonAmqpPublisherFactory' +import { CommonAmqpTopicPublisherFactory } from './CommonAmqpPublisherFactory' -export class AmqpExchangePublisherManager< - T extends AbstractAmqpExchangePublisher< +export class AmqpTopicPublisherManager< + PublisherType extends AbstractAmqpTopicPublisher< z.infer >, SupportedEventDefinitions extends AmqpAwareEventDefinition[], @@ -30,27 +29,29 @@ export class AmqpExchangePublisherManager< > extends AbstractPublisherManager< AmqpAwareEventDefinition, NonNullable, - AbstractAmqpExchangePublisher>, + AbstractAmqpTopicPublisher>, AMQPDependencies, - AMQPCreationConfig, - AMQPLocator, + AMQPTopicPublisherConfig, + AMQPTopicPublisherConfig, AmqpMessageSchemaType, Omit< - AMQPExchangePublisherOptions>, + AMQPTopicPublisherOptions>, 'messageSchemas' | 'locatorConfig' | 'exchange' >, SupportedEventDefinitions, MetadataType, - AmqpExchangeMessageOptions + AmqpTopicMessageOptions > { constructor( dependencies: AmqpPublisherManagerDependencies, options: AmqpPublisherManagerOptions< - T, - AmqpQueueMessageOptions, - AMQPExchangePublisherOptions>, + PublisherType, + AmqpTopicMessageOptions, + AMQPTopicPublisherOptions>, z.infer, - MetadataType + MetadataType, + AMQPTopicPublisherConfig, + AMQPTopicPublisherConfig >, ) { super({ @@ -64,7 +65,7 @@ export class AmqpExchangePublisherManager< logger: dependencies.logger, errorReporter: dependencies.errorReporter, }, - publisherFactory: options.publisherFactory ?? new CommonAmqpExchangePublisherFactory(), + publisherFactory: options.publisherFactory ?? new CommonAmqpTopicPublisherFactory(), }) } @@ -72,7 +73,7 @@ export class AmqpExchangePublisherManager< exchange: string, ): Partial< Omit< - AMQPExchangePublisherOptions>, + AMQPTopicPublisherOptions>, 'messageSchemas' | 'locatorConfig' > > { @@ -82,27 +83,42 @@ export class AmqpExchangePublisherManager< } protected override resolveCreationConfig( - queueName: NonNullable, - ): AMQPCreationConfig { + exchange: NonNullable, + ): AMQPTopicPublisherConfig { return { ...this.newPublisherOptions, - queueOptions: {}, - queueName, + exchange, + updateAttributesIfExists: false, } } - publish( - eventTarget: NonNullable, + /** + * @deprecated use `publishSync` instead. + */ + publish(): Promise> { + throw new Error('Please use `publishSync` method for AMQP publisher managers') + } + + publishSync( + exchange: NonNullable, message: MessagePublishType, + messageOptions: AmqpTopicMessageOptions, precedingEventMetadata?: Partial, - messageOptions?: AmqpExchangeMessageOptions, - ): Promise> { - return super.publish(eventTarget, message, precedingEventMetadata, messageOptions) + ): MessageSchemaType { + const publisher = this.targetToPublisherMap[exchange] + if (!publisher) { + throw new Error(`No publisher for exchange ${exchange}`) + } + + const messageDefinition = this.resolveMessageDefinition(exchange, message) + const resolvedMessage = this.resolveMessage(messageDefinition, message, precedingEventMetadata) + publisher.publish(resolvedMessage, messageOptions) + return resolvedMessage } protected override resolveEventTarget( event: AmqpAwareEventDefinition, ): NonNullable | undefined { - return event.queueName + return event.exchange } } diff --git a/packages/amqp/lib/CommonAmqpPublisherFactory.ts b/packages/amqp/lib/CommonAmqpPublisherFactory.ts index 710ffcc2..897e0aff 100644 --- a/packages/amqp/lib/CommonAmqpPublisherFactory.ts +++ b/packages/amqp/lib/CommonAmqpPublisherFactory.ts @@ -1,20 +1,27 @@ -import type { PublisherBaseEventType } from '@message-queue-toolkit/core' +import type { CommonCreationConfigType, PublisherBaseEventType } from '@message-queue-toolkit/core' -import type { - AmqpExchangeMessageOptions, - AMQPExchangePublisherOptions, -} from './AbstractAmqpExchangePublisher' -import { AbstractAmqpExchangePublisher } from './AbstractAmqpExchangePublisher' import type { AbstractAmqpPublisher, AMQPPublisherOptions } from './AbstractAmqpPublisher' import type { AmqpQueueMessageOptions } from './AbstractAmqpQueuePublisher' import { AbstractAmqpQueuePublisher } from './AbstractAmqpQueuePublisher' -import type { AMQPDependencies } from './AbstractAmqpService' +import type { + AMQPDependencies, + AMQPQueueCreationConfig, + AMQPQueueLocator, +} from './AbstractAmqpService' +import { AbstractAmqpTopicPublisher } from './AbstractAmqpTopicPublisher' +import type { + AmqpTopicMessageOptions, + AMQPTopicPublisherOptions, +} from './AbstractAmqpTopicPublisher' export type AmqpPublisherFactory< - T extends AbstractAmqpPublisher, + T extends AbstractAmqpPublisher, M extends PublisherBaseEventType, MessageOptions, - PublisherOptions extends Omit, 'creationConfig'>, + PublisherOptions extends Omit< + AMQPPublisherOptions, + 'creationConfig' + >, > = { buildPublisher(dependencies: AMQPDependencies, options: PublisherOptions): T } @@ -23,9 +30,9 @@ export class CommonAmqpQueuePublisher< M extends PublisherBaseEventType = PublisherBaseEventType, > extends AbstractAmqpQueuePublisher {} -export class CommonAmqpExchangePublisher< +export class CommonAmqpTopicPublisher< M extends PublisherBaseEventType = PublisherBaseEventType, -> extends AbstractAmqpExchangePublisher {} +> extends AbstractAmqpTopicPublisher {} export class CommonAmqpQueuePublisherFactory< M extends PublisherBaseEventType = PublisherBaseEventType, @@ -34,31 +41,31 @@ export class CommonAmqpQueuePublisherFactory< CommonAmqpQueuePublisher, M, AmqpQueueMessageOptions, - AMQPPublisherOptions + AMQPPublisherOptions > { buildPublisher( dependencies: AMQPDependencies, - options: AMQPPublisherOptions, + options: AMQPPublisherOptions, ): CommonAmqpQueuePublisher { return new CommonAmqpQueuePublisher(dependencies, options) } } -export class CommonAmqpExchangePublisherFactory< +export class CommonAmqpTopicPublisherFactory< M extends PublisherBaseEventType = PublisherBaseEventType, > implements AmqpPublisherFactory< - CommonAmqpExchangePublisher, + CommonAmqpTopicPublisher, M, - AmqpExchangeMessageOptions, - AMQPExchangePublisherOptions + AmqpTopicMessageOptions, + AMQPTopicPublisherOptions > { buildPublisher( dependencies: AMQPDependencies, - options: AMQPExchangePublisherOptions, - ): CommonAmqpExchangePublisher { - return new CommonAmqpExchangePublisher(dependencies, options) + options: AMQPTopicPublisherOptions, + ): CommonAmqpTopicPublisher { + return new CommonAmqpTopicPublisher(dependencies, options) } } diff --git a/packages/amqp/lib/utils/amqpQueueUtils.ts b/packages/amqp/lib/utils/amqpQueueUtils.ts index f3e54759..b056ae00 100644 --- a/packages/amqp/lib/utils/amqpQueueUtils.ts +++ b/packages/amqp/lib/utils/amqpQueueUtils.ts @@ -2,9 +2,18 @@ import type { DeletionConfig } from '@message-queue-toolkit/core' import { isProduction } from '@message-queue-toolkit/core' import type { Channel, Connection } from 'amqplib' -import type { AMQPCreationConfig, AMQPLocator } from '../AbstractAmqpService' +import type { + AMQPQueueCreationConfig, + AMQPQueueLocator, + AMQPTopicCreationConfig, + AMQPTopicLocator, + AMQPTopicPublisherConfig, +} from '../AbstractAmqpService' -export async function checkQueueExists(connection: Connection, locatorConfig: AMQPLocator) { +export async function checkQueueExists( + connection: Connection, + locatorConfig: AMQPQueueLocator, +): Promise { // queue check breaks channel if not successful const checkChannel = await connection.createChannel() checkChannel.on('error', () => { @@ -18,12 +27,29 @@ export async function checkQueueExists(connection: Connection, locatorConfig: AM } } +export async function checkExchangeExists( + connection: Connection, + locatorConfig: AMQPTopicPublisherConfig, +): Promise { + // exchange check breaks channel if not successful + const checkChannel = await connection.createChannel() + checkChannel.on('error', () => { + // it's OK + }) + try { + await checkChannel.checkExchange(locatorConfig.exchange) + await checkChannel.close() + } catch (err) { + throw new Error(`Exchange ${locatorConfig.exchange} does not exist.`) + } +} + export async function ensureAmqpQueue( connection: Connection, channel: Channel, - creationConfig?: AMQPCreationConfig, - locatorConfig?: AMQPLocator, -) { + creationConfig?: AMQPQueueCreationConfig, + locatorConfig?: AMQPQueueLocator, +): Promise { if (creationConfig) { await channel.assertQueue(creationConfig.queueName, creationConfig.queueOptions) } else { @@ -34,10 +60,49 @@ export async function ensureAmqpQueue( } } +export async function ensureAmqpTopicSubscription( + connection: Connection, + channel: Channel, + creationConfig?: AMQPTopicCreationConfig, + locatorConfig?: AMQPTopicLocator, +): Promise { + await ensureAmqpQueue(connection, channel, creationConfig, locatorConfig) + + if (creationConfig) { + await channel.assertExchange(creationConfig.exchange, 'topic') + await channel.bindQueue( + creationConfig.queueName, + creationConfig.exchange, + creationConfig.topicPattern, + ) + } else { + if (!locatorConfig) { + throw new Error('locatorConfig is mandatory when creationConfig is not set') + } + await checkExchangeExists(connection, locatorConfig) + } +} + +export async function ensureExchange( + connection: Connection, + channel: Channel, + creationConfig?: AMQPTopicPublisherConfig, + locatorConfig?: AMQPTopicPublisherConfig, +): Promise { + if (creationConfig) { + await channel.assertExchange(creationConfig.exchange, 'topic') + } else { + if (!locatorConfig) { + throw new Error('locatorConfig is mandatory when creationConfig is not set') + } + await checkExchangeExists(connection, locatorConfig) + } +} + export async function deleteAmqpQueue( channel: Channel, deletionConfig: DeletionConfig, - creationConfig: AMQPCreationConfig, + creationConfig: AMQPQueueCreationConfig, ) { if (!deletionConfig.deleteIfExists) { return diff --git a/packages/amqp/test/fakes/FakeConsumer.ts b/packages/amqp/test/fakes/FakeQueueConsumer.ts similarity index 92% rename from packages/amqp/test/fakes/FakeConsumer.ts rename to packages/amqp/test/fakes/FakeQueueConsumer.ts index 12fc5b20..18d95d1e 100644 --- a/packages/amqp/test/fakes/FakeConsumer.ts +++ b/packages/amqp/test/fakes/FakeQueueConsumer.ts @@ -5,7 +5,7 @@ import { AbstractAmqpQueueConsumer } from '../../lib/AbstractAmqpQueueConsumer' import type { AMQPConsumerDependencies } from '../../lib/AbstractAmqpService' import type { AmqpAwareEventDefinition } from '../../lib/AmqpQueuePublisherManager' -export class FakeConsumer extends AbstractAmqpQueueConsumer { +export class FakeQueueConsumer extends AbstractAmqpQueueConsumer { public static readonly QUEUE_NAME = 'dummy-queue' constructor(dependencies: AMQPConsumerDependencies, eventDefinition: AmqpAwareEventDefinition) { super( diff --git a/packages/amqp/test/fakes/FakeTopicConsumer.ts b/packages/amqp/test/fakes/FakeTopicConsumer.ts new file mode 100644 index 00000000..ab73a973 --- /dev/null +++ b/packages/amqp/test/fakes/FakeTopicConsumer.ts @@ -0,0 +1,52 @@ +import type { BaseMessageType } from '@message-queue-toolkit/core' +import { MessageHandlerConfigBuilder } from '@message-queue-toolkit/core' + +import type { AMQPConsumerDependencies } from '../../lib/AbstractAmqpService' +import { AbstractAmqpTopicConsumer } from '../../lib/AbstractAmqpTopicConsumer' +import type { AmqpAwareEventDefinition } from '../../lib/AmqpQueuePublisherManager' + +export class FakeTopicConsumer extends AbstractAmqpTopicConsumer { + public messageCounter = 0 + + constructor( + dependencies: AMQPConsumerDependencies, + eventDefinition: AmqpAwareEventDefinition, + options: { + queueName: string + topicPattern: string + }, + ) { + if (!eventDefinition.exchange) { + throw new Error( + `No exchange defined for event ${eventDefinition.consumerSchema.shape.type.value}`, + ) + } + + super( + dependencies, + { + creationConfig: { + queueName: options.queueName, + queueOptions: { + durable: true, + autoDelete: false, + }, + topicPattern: options.topicPattern, + exchange: eventDefinition.exchange, + }, + deletionConfig: { + deleteIfExists: true, + }, + handlerSpy: true, + messageTypeField: 'type', + handlers: new MessageHandlerConfigBuilder() + .addConfig(eventDefinition.consumerSchema, () => { + this.messageCounter++ + return Promise.resolve({ result: 'success' }) + }) + .build(), + }, + {}, + ) + } +} diff --git a/packages/amqp/test/publishers/AmqpPermissionPublisher.ts b/packages/amqp/test/publishers/AmqpPermissionPublisher.ts index a33e7f81..f7994cec 100644 --- a/packages/amqp/test/publishers/AmqpPermissionPublisher.ts +++ b/packages/amqp/test/publishers/AmqpPermissionPublisher.ts @@ -1,6 +1,10 @@ import type { AMQPPublisherOptions } from '../../lib/AbstractAmqpPublisher' import { AbstractAmqpQueuePublisher } from '../../lib/AbstractAmqpQueuePublisher' -import type { AMQPDependencies } from '../../lib/AbstractAmqpService' +import type { + AMQPDependencies, + AMQPQueueCreationConfig, + AMQPQueueLocator, +} from '../../lib/AbstractAmqpService' import type { PERMISSIONS_ADD_MESSAGE_TYPE, PERMISSIONS_REMOVE_MESSAGE_TYPE, @@ -18,7 +22,7 @@ export class AmqpPermissionPublisher extends AbstractAmqpQueuePublisher, + AMQPPublisherOptions, 'creationConfig' | 'logMessages' | 'locatorConfig' > = { creationConfig: { diff --git a/packages/amqp/test/utils/testContext.ts b/packages/amqp/test/utils/testContext.ts index 25aa0765..6a4f00f3 100644 --- a/packages/amqp/test/utils/testContext.ts +++ b/packages/amqp/test/utils/testContext.ts @@ -13,12 +13,19 @@ import { z } from 'zod' import { AmqpConnectionManager } from '../../lib/AmqpConnectionManager' import type { AmqpAwareEventDefinition } from '../../lib/AmqpQueuePublisherManager' import { AmqpQueuePublisherManager } from '../../lib/AmqpQueuePublisherManager' -import type { CommonAmqpQueuePublisher } from '../../lib/CommonAmqpPublisherFactory' -import { CommonAmqpQueuePublisherFactory } from '../../lib/CommonAmqpPublisherFactory' +import { AmqpTopicPublisherManager } from '../../lib/AmqpTopicPublisherManager' +import type { + CommonAmqpQueuePublisher, + CommonAmqpTopicPublisher, +} from '../../lib/CommonAmqpPublisherFactory' +import { + CommonAmqpTopicPublisherFactory, + CommonAmqpQueuePublisherFactory, +} from '../../lib/CommonAmqpPublisherFactory' import type { AmqpConfig } from '../../lib/amqpConnectionResolver' import { AmqpConsumerErrorResolver } from '../../lib/errors/AmqpConsumerErrorResolver' import { AmqpPermissionConsumer } from '../consumers/AmqpPermissionConsumer' -import { FakeConsumer } from '../fakes/FakeConsumer' +import { FakeQueueConsumer } from '../fakes/FakeQueueConsumer' import { AmqpPermissionPublisher } from '../publishers/AmqpPermissionPublisher' export const SINGLETON_CONFIG = { lifetime: Lifetime.SINGLETON } @@ -34,7 +41,7 @@ export const TestEvents = { }), ), schemaVersion: '1.0.1', - queueName: FakeConsumer.QUEUE_NAME, + queueName: FakeQueueConsumer.QUEUE_NAME, }, updated: { @@ -44,7 +51,17 @@ export const TestEvents = { updatedData: z.string(), }), ), - queueName: FakeConsumer.QUEUE_NAME, + queueName: FakeQueueConsumer.QUEUE_NAME, + }, + + updatedPubSub: { + ...enrichMessageSchemaWithBase( + 'entity.updated', + z.object({ + updatedData: z.string(), + }), + ), + exchange: 'entity', }, } as const satisfies Record @@ -128,6 +145,25 @@ export async function registerDependencies( enabled: queuesEnabled, }, ), + topicPublisherManager: asFunction( + (dependencies) => { + return new AmqpTopicPublisherManager(dependencies, { + metadataFiller: new CommonMetadataFiller({ + serviceId: 'service', + }), + publisherFactory: new CommonAmqpTopicPublisherFactory(), + newPublisherOptions: { + handlerSpy: true, + messageIdField: 'id', + messageTypeField: 'type', + }, + }) + }, + { + lifetime: Lifetime.SINGLETON, + enabled: queuesEnabled, + }, + ), // vendor-specific dependencies transactionObservabilityManager: asFunction(() => { @@ -171,4 +207,9 @@ export interface Dependencies { CommonAmqpQueuePublisher, TestEventsType > + + topicPublisherManager: AmqpTopicPublisherManager< + CommonAmqpTopicPublisher, + TestEventsType + > } From e24a792e8c8ad2c98228fb26223139e96f121629 Mon Sep 17 00:00:00 2001 From: Igor Savin Date: Sat, 25 May 2024 18:49:55 +0300 Subject: [PATCH 2/2] fix tests --- packages/amqp/lib/AbstractAmqpPublisher.ts | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/packages/amqp/lib/AbstractAmqpPublisher.ts b/packages/amqp/lib/AbstractAmqpPublisher.ts index 303b5414..3b230215 100644 --- a/packages/amqp/lib/AbstractAmqpPublisher.ts +++ b/packages/amqp/lib/AbstractAmqpPublisher.ts @@ -1,5 +1,5 @@ -import type { Either } from '@lokalise/node-core' -import { InternalError } from '@lokalise/node-core' +import type { Either } from '@lokalise/node-core'; +import { copyWithoutUndefined , InternalError } from '@lokalise/node-core' import type { BarrierResult, CommonCreationConfigType, @@ -115,14 +115,14 @@ export abstract class AbstractAmqpPublisher< throw new InternalError({ message: `Error while publishing to AMQP ${(err as Error).message}`, errorCode: 'AMQP_PUBLISH_ERROR', - details: { + details: copyWithoutUndefined({ publisher: this.constructor.name, // @ts-ignore queueName: this.queueName, exchange: this.exchange, // @ts-ignore messageType: message[this.messageTypeField] ?? 'unknown', - }, + }), cause: err as Error, }) }