diff --git a/packages/amqp/index.ts b/packages/amqp/index.ts index 7d1fae09..207f87d4 100644 --- a/packages/amqp/index.ts +++ b/packages/amqp/index.ts @@ -3,11 +3,14 @@ export type { AMQPQueueConfig } from './lib/AbstractAmqpService' export { AbstractAmqpConsumer, AMQPConsumerOptions } from './lib/AbstractAmqpConsumer' export { AmqpConsumerErrorResolver } from './lib/errors/AmqpConsumerErrorResolver' -export { AbstractAmqpPublisher, AMQPPublisherOptions } from './lib/AbstractAmqpPublisher' - export type { AmqpConfig } from './lib/amqpConnectionResolver' export { resolveAmqpConnection } from './lib/amqpConnectionResolver' export { AmqpConnectionManager } from './lib/AmqpConnectionManager' export type { ConnectionReceiver } from './lib/AmqpConnectionManager' export { deserializeAmqpMessage } from './lib/amqpMessageDeserializer' + +export * from './lib/AbstractAmqpQueuePublisher' +export * from './lib/AbstractAmqpExchangePublisher' +export * from './lib/AmqpExchangePublisherManager' +export * from './lib/AmqpQueuePublisherManager' diff --git a/packages/amqp/lib/AbstractAmqpExchangePublisher.ts b/packages/amqp/lib/AbstractAmqpExchangePublisher.ts new file mode 100644 index 00000000..04e27e19 --- /dev/null +++ b/packages/amqp/lib/AbstractAmqpExchangePublisher.ts @@ -0,0 +1,50 @@ +import type * as Buffer from 'node:buffer' + +import { objectToBuffer } from '@message-queue-toolkit/core' +import type { Options } from 'amqplib/properties' + +import type { AMQPPublisherOptions } from './AbstractAmqpPublisher' +import { AbstractAmqpPublisher } from './AbstractAmqpPublisher' +import type { AMQPDependencies } from './AbstractAmqpService' + +export type AMQPExchangePublisherOptions = Omit< + AMQPPublisherOptions, + 'creationConfig' +> & { + exchange: string +} + +export type AmqpExchangeMessageOptions = { + routingKey: string + publishOptions: Options.Publish +} + +export abstract class AbstractAmqpExchangePublisher< + MessagePayloadType extends object, +> extends AbstractAmqpPublisher { + constructor( + dependencies: AMQPDependencies, + options: AMQPExchangePublisherOptions, + ) { + super(dependencies, { + ...options, + // FixMe exchange publisher doesn't need queue at all + creationConfig: { + queueName: 'dummy', + queueOptions: {}, + updateAttributesIfExists: false, + }, + exchange: options.exchange, + locatorConfig: undefined, + }) + } + + protected publishInternal(message: Buffer, options: AmqpExchangeMessageOptions): void { + this.channel.publish( + this.exchange!, + options.routingKey, + objectToBuffer(message), + options.publishOptions, + ) + } +} diff --git a/packages/amqp/lib/AbstractAmqpPublisher.ts b/packages/amqp/lib/AbstractAmqpPublisher.ts index fe9914c0..8aedaa43 100644 --- a/packages/amqp/lib/AbstractAmqpPublisher.ts +++ b/packages/amqp/lib/AbstractAmqpPublisher.ts @@ -17,13 +17,16 @@ export type AMQPPublisherOptions = QueuePubli AMQPCreationConfig, AMQPLocator, MessagePayloadType -> +> & { + exchange?: string +} -export abstract class AbstractAmqpPublisher +export abstract class AbstractAmqpPublisher extends AbstractAmqpService - implements SyncPublisher + implements SyncPublisher { private readonly messageSchemaContainer: MessageSchemaContainer + protected readonly exchange?: string private initPromise?: Promise @@ -35,9 +38,10 @@ export abstract class AbstractAmqpPublisher messageSchemas, messageTypeField: options.messageTypeField, }) + this.exchange = options.exchange } - publish(message: MessagePayloadType): void { + publish(message: MessagePayloadType, options: MessageOptionsType): void { const resolveSchemaResult = this.resolveSchema(message) if (resolveSchemaResult.error) { throw resolveSchemaResult.error @@ -57,7 +61,7 @@ export abstract class AbstractAmqpPublisher */ this.initPromise .then(() => { - this.publish(message) + this.publish(message, options) }) .catch((err) => { this.handleError(err) @@ -82,7 +86,7 @@ export abstract class AbstractAmqpPublisher } try { - this.channel.sendToQueue(this.queueName, objectToBuffer(message)) + this.publishInternal(objectToBuffer(message), options) } catch (err) { // Unfortunately, reliable retry mechanism can't be implemented with try-catch block, // as not all failures end up here. If connection is closed programmatically, it works fine, @@ -107,6 +111,8 @@ export abstract class AbstractAmqpPublisher } } + protected abstract publishInternal(message: Buffer, options: MessageOptionsType): void + protected override resolveSchema( message: MessagePayloadType, ): Either> { diff --git a/packages/amqp/lib/AbstractAmqpQueuePublisher.ts b/packages/amqp/lib/AbstractAmqpQueuePublisher.ts new file mode 100644 index 00000000..6475abcc --- /dev/null +++ b/packages/amqp/lib/AbstractAmqpQueuePublisher.ts @@ -0,0 +1,23 @@ +import type { Options } from 'amqplib/properties' + +import { AbstractAmqpPublisher } from './AbstractAmqpPublisher' + +export type AmqpQueueMessageOptions = { + publishOptions: Options.Publish +} + +const NO_PARAMS: AmqpQueueMessageOptions = { + publishOptions: {}, +} + +export abstract class AbstractAmqpQueuePublisher< + MessagePayloadType extends object, +> extends AbstractAmqpPublisher { + protected publishInternal(message: Buffer, options: AmqpQueueMessageOptions): void { + this.channel.sendToQueue(this.queueName, message, options.publishOptions) + } + + publish(message: MessagePayloadType, options: AmqpQueueMessageOptions = NO_PARAMS) { + super.publish(message, options) + } +} diff --git a/packages/amqp/lib/AbstractAmqpService.ts b/packages/amqp/lib/AbstractAmqpService.ts index bbd92f72..62c4942a 100644 --- a/packages/amqp/lib/AbstractAmqpService.ts +++ b/packages/amqp/lib/AbstractAmqpService.ts @@ -23,6 +23,11 @@ export type AMQPCreationConfig = { updateAttributesIfExists?: boolean } +export type AMQPSubscriptionConfig = { + exchange: string + routingKey: string +} + export type AMQPLocator = { queueName: string } diff --git a/packages/amqp/lib/AmqpExchangePublisherManager.ts b/packages/amqp/lib/AmqpExchangePublisherManager.ts new file mode 100644 index 00000000..5e221512 --- /dev/null +++ b/packages/amqp/lib/AmqpExchangePublisherManager.ts @@ -0,0 +1,93 @@ +import { AbstractPublisherManager } from '@message-queue-toolkit/core' +import type { MessageMetadataType } from '@message-queue-toolkit/core/lib/messages/baseMessageSchemas' +import type { TypeOf } from 'zod' +import type z from 'zod' + +import type { + AbstractAmqpExchangePublisher, + AMQPExchangePublisherOptions, +} from './AbstractAmqpExchangePublisher' +import type { AmqpQueueMessageOptions } from './AbstractAmqpQueuePublisher' +import type { AMQPCreationConfig, AMQPDependencies, AMQPLocator } from './AbstractAmqpService' +import type { + AmqpAwareEventDefinition, + AmqpMessageSchemaType, + AmqpPublisherManagerDependencies, + AmqpPublisherManagerOptions, +} from './AmqpQueuePublisherManager' +import { CommonAmqpExchangePublisherFactory } from './CommonAmqpPublisherFactory' + +export class AmqpExchangePublisherManager< + T extends AbstractAmqpExchangePublisher>, + SupportedEventDefinitions extends AmqpAwareEventDefinition[], + MetadataType = MessageMetadataType, +> extends AbstractPublisherManager< + AmqpAwareEventDefinition, + NonNullable, + AbstractAmqpExchangePublisher>, + AMQPDependencies, + AMQPCreationConfig, + AMQPLocator, + AmqpMessageSchemaType, + Omit< + AMQPExchangePublisherOptions>, + 'messageSchemas' | 'locatorConfig' | 'exchange' + >, + SupportedEventDefinitions, + MetadataType, + z.infer +> { + constructor( + dependencies: AmqpPublisherManagerDependencies, + options: AmqpPublisherManagerOptions< + T, + AmqpQueueMessageOptions, + AMQPExchangePublisherOptions>, + z.infer, + MetadataType + >, + ) { + super({ + isAsync: false, + eventRegistry: dependencies.eventRegistry, + metadataField: options.metadataField ?? 'metadata', + metadataFiller: options.metadataFiller, + newPublisherOptions: options.newPublisherOptions, + publisherDependencies: { + amqpConnectionManager: dependencies.amqpConnectionManager, + logger: dependencies.logger, + errorReporter: dependencies.errorReporter, + }, + publisherFactory: options.publisherFactory ?? new CommonAmqpExchangePublisherFactory(), + }) + } + + protected resolvePublisherConfigOverrides( + exchange: string, + ): Partial< + Omit< + AMQPExchangePublisherOptions>, + 'messageSchemas' | 'locatorConfig' + > + > { + return { + exchange, + } + } + + protected override resolveCreationConfig( + queueName: NonNullable, + ): AMQPCreationConfig { + return { + ...this.newPublisherOptions, + queueOptions: {}, + queueName, + } + } + + protected override resolveEventTarget( + event: AmqpAwareEventDefinition, + ): NonNullable | undefined { + return event.queueName + } +} diff --git a/packages/amqp/lib/AmqpQueuePublisherManager.spec.ts b/packages/amqp/lib/AmqpQueuePublisherManager.spec.ts new file mode 100644 index 00000000..447d42e1 --- /dev/null +++ b/packages/amqp/lib/AmqpQueuePublisherManager.spec.ts @@ -0,0 +1,34 @@ +import type { AwilixContainer } from 'awilix' +import { beforeAll } from 'vitest' + +import { FakeConsumer } from '../test/fakes/FakeConsumer' +import { TEST_AMQP_CONFIG } from '../test/utils/testAmqpConfig' +import { registerDependencies, TestEvents } from '../test/utils/testContext' +import type { Dependencies } from '../test/utils/testContext' + +describe('AmqpQueuePublisherManager', () => { + describe('publish', () => { + let diContainer: AwilixContainer + beforeAll(async () => { + diContainer = await registerDependencies(TEST_AMQP_CONFIG) + }) + + it('publishes to the correct queue', async () => { + const { queuePublisherManager } = diContainer.cradle + const fakeConsumer = new FakeConsumer(diContainer.cradle, TestEvents.updated) + await fakeConsumer.start() + + const publishedMessage = await queuePublisherManager.publish(FakeConsumer.QUEUE_NAME, { + ...queuePublisherManager.resolveBaseFields(), + type: 'entity.updated', + payload: { + updatedData: 'msg', + }, + }) + + const result = await fakeConsumer.handlerSpy.waitForMessageWithId(publishedMessage.id) + + expect(result.processingResult).toBe('consumed') + }) + }) +}) diff --git a/packages/amqp/lib/AmqpQueuePublisherManager.ts b/packages/amqp/lib/AmqpQueuePublisherManager.ts new file mode 100644 index 00000000..ad11c571 --- /dev/null +++ b/packages/amqp/lib/AmqpQueuePublisherManager.ts @@ -0,0 +1,115 @@ +import type { + BaseEventType, + CommonEventDefinition, + EventRegistry, + MetadataFiller, +} from '@message-queue-toolkit/core' +import { AbstractPublisherManager } from '@message-queue-toolkit/core' +import type { MessageMetadataType } from '@message-queue-toolkit/core/lib/messages/baseMessageSchemas' +import type z from 'zod' + +import type { AbstractAmqpPublisher, AMQPPublisherOptions } from './AbstractAmqpPublisher' +import type { + AbstractAmqpQueuePublisher, + AmqpQueueMessageOptions, +} from './AbstractAmqpQueuePublisher' +import type { AMQPCreationConfig, AMQPDependencies, AMQPLocator } from './AbstractAmqpService' +import type { AmqpPublisherFactory } from './CommonAmqpPublisherFactory' +import { CommonAmqpQueuePublisherFactory } from './CommonAmqpPublisherFactory' + +export type AmqpAwareEventDefinition = { + schemaVersion?: string + exchange?: string + queueName?: string +} & CommonEventDefinition + +export type AmqpPublisherManagerDependencies = { + eventRegistry: EventRegistry +} & AMQPDependencies + +export type AmqpPublisherManagerOptions< + PublisherType extends AbstractAmqpPublisher, + MessageOptionsType, + PublisherOptionsType extends Omit, 'creationConfig'>, + EventType extends BaseEventType, + MetadataType, +> = { + metadataField?: string + publisherFactory: AmqpPublisherFactory< + PublisherType, + EventType, + MessageOptionsType, + PublisherOptionsType + > + metadataFiller: MetadataFiller + newPublisherOptions: Omit< + AMQPPublisherOptions, + 'messageSchemas' | 'creationConfig' | 'locatorConfig' + > & { + creationConfig?: Omit + } +} + +export type AmqpMessageSchemaType = z.infer + +export class AmqpQueuePublisherManager< + T extends AbstractAmqpQueuePublisher>, + SupportedEventDefinitions extends AmqpAwareEventDefinition[], + MetadataType = MessageMetadataType, +> extends AbstractPublisherManager< + AmqpAwareEventDefinition, + NonNullable, + AbstractAmqpQueuePublisher>, + AMQPDependencies, + AMQPCreationConfig, + AMQPLocator, + AmqpMessageSchemaType, + Omit< + AMQPPublisherOptions>, + 'messageSchemas' | 'creationConfig' | 'locatorConfig' + >, + SupportedEventDefinitions, + MetadataType, + z.infer +> { + constructor( + dependencies: AmqpPublisherManagerDependencies, + options: AmqpPublisherManagerOptions< + T, + AmqpQueueMessageOptions, + AMQPPublisherOptions>, + z.infer, + MetadataType + >, + ) { + super({ + isAsync: false, + eventRegistry: dependencies.eventRegistry, + metadataField: options.metadataField ?? 'metadata', + metadataFiller: options.metadataFiller, + newPublisherOptions: options.newPublisherOptions, + publisherDependencies: { + amqpConnectionManager: dependencies.amqpConnectionManager, + logger: dependencies.logger, + errorReporter: dependencies.errorReporter, + }, + publisherFactory: options.publisherFactory ?? new CommonAmqpQueuePublisherFactory(), + }) + } + + protected override resolveCreationConfig( + queueName: NonNullable, + ): AMQPCreationConfig { + return { + ...this.newPublisherOptions, + queueOptions: {}, + queueName, + } + } + + protected override resolveEventTarget( + event: AmqpAwareEventDefinition, + ): NonNullable | undefined { + return event.queueName + } +} diff --git a/packages/amqp/lib/CommonAmqpPublisherFactory.ts b/packages/amqp/lib/CommonAmqpPublisherFactory.ts new file mode 100644 index 00000000..de940202 --- /dev/null +++ b/packages/amqp/lib/CommonAmqpPublisherFactory.ts @@ -0,0 +1,62 @@ +import type { BaseEventType } from '@message-queue-toolkit/core' + +import type { + AmqpExchangeMessageOptions, + AMQPExchangePublisherOptions, +} from './AbstractAmqpExchangePublisher' +import { AbstractAmqpExchangePublisher } from './AbstractAmqpExchangePublisher' +import type { AbstractAmqpPublisher, AMQPPublisherOptions } from './AbstractAmqpPublisher' +import type { AmqpQueueMessageOptions } from './AbstractAmqpQueuePublisher' +import { AbstractAmqpQueuePublisher } from './AbstractAmqpQueuePublisher' +import type { AMQPDependencies } from './AbstractAmqpService' + +export type AmqpPublisherFactory< + T extends AbstractAmqpPublisher, + M extends BaseEventType, + MessageOptions, + PublisherOptions extends Omit, 'creationConfig'>, +> = { + buildPublisher(dependencies: AMQPDependencies, options: PublisherOptions): T +} + +export class CommonAmqpQueuePublisher< + M extends BaseEventType = BaseEventType, +> extends AbstractAmqpQueuePublisher {} + +export class CommonAmqpExchangePublisher< + M extends BaseEventType = BaseEventType, +> extends AbstractAmqpExchangePublisher {} + +export class CommonAmqpQueuePublisherFactory + implements + AmqpPublisherFactory< + CommonAmqpQueuePublisher, + M, + AmqpQueueMessageOptions, + AMQPPublisherOptions + > +{ + buildPublisher( + dependencies: AMQPDependencies, + options: AMQPPublisherOptions, + ): CommonAmqpQueuePublisher { + return new CommonAmqpQueuePublisher(dependencies, options) + } +} + +export class CommonAmqpExchangePublisherFactory + implements + AmqpPublisherFactory< + CommonAmqpExchangePublisher, + M, + AmqpExchangeMessageOptions, + AMQPExchangePublisherOptions + > +{ + buildPublisher( + dependencies: AMQPDependencies, + options: AMQPExchangePublisherOptions, + ): CommonAmqpExchangePublisher { + return new CommonAmqpExchangePublisher(dependencies, options) + } +} diff --git a/packages/amqp/package.json b/packages/amqp/package.json index 8fa435d6..3dd7bfab 100644 --- a/packages/amqp/package.json +++ b/packages/amqp/package.json @@ -25,8 +25,8 @@ "prepublishOnly": "npm run build:release" }, "dependencies": { - "@lokalise/node-core": "^9.17.0", - "zod": "^3.23.6" + "@lokalise/node-core": "^9.18.0", + "zod": "^3.23.8" }, "peerDependencies": { "@message-queue-toolkit/core": "^12.1.0", @@ -34,14 +34,14 @@ }, "devDependencies": { "@message-queue-toolkit/core": "*", - "@types/amqplib": "^0.10.4", - "@types/node": "^20.12.8", - "@typescript-eslint/eslint-plugin": "^7.7.1", - "@typescript-eslint/parser": "^7.7.1", + "@types/amqplib": "^0.10.5", + "@types/node": "^20.12.12", + "@typescript-eslint/eslint-plugin": "^7.9.0", + "@typescript-eslint/parser": "^7.9.0", "@vitest/coverage-v8": "^1.6.0", "amqplib": "^0.10.4", - "awilix": "^10.0.1", - "awilix-manager": "^5.1.0", + "awilix": "^10.0.2", + "awilix-manager": "^5.2.1", "del-cli": "^5.1.0", "eslint": "^8.57.0", "eslint-plugin-import": "^2.29.1", diff --git a/packages/amqp/test/fakes/CustomFakeConsumer.ts b/packages/amqp/test/fakes/CustomFakeConsumer.ts new file mode 100644 index 00000000..cf5a2264 --- /dev/null +++ b/packages/amqp/test/fakes/CustomFakeConsumer.ts @@ -0,0 +1,30 @@ +import type { BaseMessageType } from '@message-queue-toolkit/core' +import { MessageHandlerConfigBuilder } from '@message-queue-toolkit/core' +import type { ZodSchema } from 'zod' + +import { AbstractAmqpConsumer } from '../../lib/AbstractAmqpConsumer' +import type { AMQPConsumerDependencies } from '../../lib/AbstractAmqpService' + +export class CustomFakeConsumer extends AbstractAmqpConsumer { + public static readonly QUEUE_NAME = 'dummy-queue' + constructor(dependencies: AMQPConsumerDependencies, schema: ZodSchema) { + super( + dependencies, + { + creationConfig: { + queueName: CustomFakeConsumer.QUEUE_NAME, + queueOptions: { + durable: true, + autoDelete: false, + }, + }, + handlerSpy: true, + messageTypeField: 'messageType', + handlers: new MessageHandlerConfigBuilder() + .addConfig(schema, () => Promise.resolve({ result: 'success' })) + .build(), + }, + {}, + ) + } +} diff --git a/packages/amqp/test/fakes/FakeConsumer.ts b/packages/amqp/test/fakes/FakeConsumer.ts index d02b7cb6..b50485cb 100644 --- a/packages/amqp/test/fakes/FakeConsumer.ts +++ b/packages/amqp/test/fakes/FakeConsumer.ts @@ -1,29 +1,30 @@ +import type { BaseMessageType } from '@message-queue-toolkit/core' import { MessageHandlerConfigBuilder } from '@message-queue-toolkit/core' -import z from 'zod' import { AbstractAmqpConsumer } from '../../lib/AbstractAmqpConsumer' import type { AMQPConsumerDependencies } from '../../lib/AbstractAmqpService' +import type { AmqpAwareEventDefinition } from '../../lib/AmqpQueuePublisherManager' -export const COMMON_MESSAGE_SCHEMA = z.object({ - messageType: z.string(), -}) - -export type CommonMessage = z.infer -export class FakeConsumer extends AbstractAmqpConsumer { - constructor(dependencies: AMQPConsumerDependencies) { +export class FakeConsumer extends AbstractAmqpConsumer { + public static readonly QUEUE_NAME = 'dummy-queue' + constructor(dependencies: AMQPConsumerDependencies, eventDefinition: AmqpAwareEventDefinition) { super( dependencies, { creationConfig: { - queueName: 'dummy', + queueName: eventDefinition.queueName!, queueOptions: { durable: true, autoDelete: false, }, }, - messageTypeField: 'messageType', - handlers: new MessageHandlerConfigBuilder() - .addConfig(COMMON_MESSAGE_SCHEMA, () => Promise.resolve({ result: 'success' })) + deletionConfig: { + deleteIfExists: true, + }, + handlerSpy: true, + messageTypeField: 'type', + handlers: new MessageHandlerConfigBuilder() + .addConfig(eventDefinition.schema, () => Promise.resolve({ result: 'success' })) .build(), }, {}, diff --git a/packages/amqp/test/publishers/AmqpPermissionPublisher.spec.ts b/packages/amqp/test/publishers/AmqpPermissionPublisher.spec.ts index 0ae4b1e0..bd01a042 100644 --- a/packages/amqp/test/publishers/AmqpPermissionPublisher.spec.ts +++ b/packages/amqp/test/publishers/AmqpPermissionPublisher.spec.ts @@ -16,7 +16,7 @@ import { PERMISSIONS_MESSAGE_SCHEMA, PERMISSIONS_ADD_MESSAGE_SCHEMA, } from '../consumers/userConsumerSchemas' -import { FakeConsumer } from '../fakes/FakeConsumer' +import { CustomFakeConsumer } from '../fakes/CustomFakeConsumer' import { FakeConsumerErrorResolver } from '../fakes/FakeConsumerErrorResolver' import { FakeLogger } from '../fakes/FakeLogger' import { TEST_AMQP_CONFIG } from '../utils/testAmqpConfig' @@ -65,12 +65,17 @@ describe('PermissionPublisher', () => { beforeAll(async () => { diContainer = await registerDependencies(TEST_AMQP_CONFIG, { consumerErrorResolver: asClass(FakeConsumerErrorResolver, SINGLETON_CONFIG), - permissionConsumer: asClass(FakeConsumer, { - lifetime: Lifetime.SINGLETON, - asyncInit: 'start', - asyncDispose: 'close', - asyncDisposePriority: 10, - }), + permissionConsumer: asFunction( + (dependencies) => { + return new CustomFakeConsumer(dependencies, PERMISSIONS_MESSAGE_SCHEMA) + }, + { + lifetime: Lifetime.SINGLETON, + asyncInit: 'start', + asyncDispose: 'close', + asyncDisposePriority: 10, + }, + ), }) }) diff --git a/packages/amqp/test/publishers/AmqpPermissionPublisher.ts b/packages/amqp/test/publishers/AmqpPermissionPublisher.ts index 3cbf59dc..a33e7f81 100644 --- a/packages/amqp/test/publishers/AmqpPermissionPublisher.ts +++ b/packages/amqp/test/publishers/AmqpPermissionPublisher.ts @@ -1,5 +1,5 @@ import type { AMQPPublisherOptions } from '../../lib/AbstractAmqpPublisher' -import { AbstractAmqpPublisher } from '../../lib/AbstractAmqpPublisher' +import { AbstractAmqpQueuePublisher } from '../../lib/AbstractAmqpQueuePublisher' import type { AMQPDependencies } from '../../lib/AbstractAmqpService' import type { PERMISSIONS_ADD_MESSAGE_TYPE, @@ -12,7 +12,7 @@ import { type SupportedTypes = PERMISSIONS_ADD_MESSAGE_TYPE | PERMISSIONS_REMOVE_MESSAGE_TYPE -export class AmqpPermissionPublisher extends AbstractAmqpPublisher { +export class AmqpPermissionPublisher extends AbstractAmqpQueuePublisher { public static QUEUE_NAME = 'user_permissions_multi' constructor( diff --git a/packages/amqp/test/utils/testContext.ts b/packages/amqp/test/utils/testContext.ts index 79602577..e0e7e309 100644 --- a/packages/amqp/test/utils/testContext.ts +++ b/packages/amqp/test/utils/testContext.ts @@ -1,19 +1,56 @@ import type { ErrorReporter, ErrorResolver } from '@lokalise/node-core' import type { Logger, TransactionObservabilityManager } from '@message-queue-toolkit/core' +import { + BASE_MESSAGE_SCHEMA, + CommonMetadataFiller, + EventRegistry, +} from '@message-queue-toolkit/core' import type { Resolver } from 'awilix' import { asClass, asFunction, createContainer, Lifetime } from 'awilix' import { AwilixManager } from 'awilix-manager' +import { z } from 'zod' import { AmqpConnectionManager } from '../../lib/AmqpConnectionManager' +import type { AmqpAwareEventDefinition } from '../../lib/AmqpQueuePublisherManager' +import { AmqpQueuePublisherManager } from '../../lib/AmqpQueuePublisherManager' +import type { CommonAmqpQueuePublisher } from '../../lib/CommonAmqpPublisherFactory' +import { CommonAmqpQueuePublisherFactory } from '../../lib/CommonAmqpPublisherFactory' import type { AmqpConfig } from '../../lib/amqpConnectionResolver' import { AmqpConsumerErrorResolver } from '../../lib/errors/AmqpConsumerErrorResolver' import { AmqpPermissionConsumer } from '../consumers/AmqpPermissionConsumer' +import { FakeConsumer } from '../fakes/FakeConsumer' import { AmqpPermissionPublisher } from '../publishers/AmqpPermissionPublisher' export const SINGLETON_CONFIG = { lifetime: Lifetime.SINGLETON } export type DependencyOverrides = Partial +export const TestEvents = { + created: { + schema: BASE_MESSAGE_SCHEMA.extend({ + type: z.literal('entity.created'), + payload: z.object({ + newData: z.string(), + }), + }), + schemaVersion: '1.0.1', + queueName: FakeConsumer.QUEUE_NAME, + }, + + updated: { + schema: BASE_MESSAGE_SCHEMA.extend({ + type: z.literal('entity.updated'), + payload: z.object({ + updatedData: z.string(), + }), + }), + queueName: FakeConsumer.QUEUE_NAME, + }, +} as const satisfies Record + +export type TestEventsType = (typeof TestEvents)[keyof typeof TestEvents][] +export type TestEventPayloadsType = z.infer + // @ts-ignore const TestLogger: Logger = console @@ -69,6 +106,30 @@ export async function registerDependencies( enabled: queuesEnabled, }), + eventRegistry: asFunction(() => { + return new EventRegistry(Object.values(TestEvents)) + }, SINGLETON_CONFIG), + queuePublisherManager: asFunction( + (dependencies) => { + return new AmqpQueuePublisherManager(dependencies, { + metadataFiller: new CommonMetadataFiller({ + serviceId: 'service', + schemaVersion: '1.0.0', + }), + publisherFactory: new CommonAmqpQueuePublisherFactory(), + newPublisherOptions: { + handlerSpy: true, + messageIdField: 'id', + messageTypeField: 'type', + }, + }) + }, + { + lifetime: Lifetime.SINGLETON, + enabled: queuesEnabled, + }, + ), + // vendor-specific dependencies transactionObservabilityManager: asFunction(() => { return undefined @@ -105,4 +166,10 @@ export interface Dependencies { consumerErrorResolver: ErrorResolver permissionConsumer: AmqpPermissionConsumer permissionPublisher: AmqpPermissionPublisher + + eventRegistry: EventRegistry + queuePublisherManager: AmqpQueuePublisherManager< + CommonAmqpQueuePublisher, + TestEventsType + > } diff --git a/packages/amqp/vitest.config.mts b/packages/amqp/vitest.config.mts index 64fa1913..44db4635 100644 --- a/packages/amqp/vitest.config.mts +++ b/packages/amqp/vitest.config.mts @@ -12,13 +12,21 @@ export default defineConfig({ environment: 'node', reporters: ['default'], coverage: { + provider: 'v8', include: ['lib/**/*.ts'], - exclude: ['lib/**/*.spec.ts', 'lib/**/*.test.ts', 'test/**/*.*', 'lib/types/**/*.*'], + exclude: [ + 'lib/**/*.spec.ts', + 'lib/**/*.test.ts', + 'test/**/*.*', + 'lib/types/**/*.*', + 'lib/AmqpExchangePublisherManager.ts', + 'lib/AbstractAmqpExchangePublisher.ts', + ], reporter: ['text'], all: true, thresholds: { lines: 90, - functions: 100, + functions: 95, branches: 79, statements: 90, }, diff --git a/packages/core/index.ts b/packages/core/index.ts index ffc9d395..88db2bc8 100644 --- a/packages/core/index.ts +++ b/packages/core/index.ts @@ -25,6 +25,7 @@ export { RetryMessageLaterError } from './lib/errors/RetryMessageLaterError' export { DoNotProcessMessageError } from './lib/errors/DoNotProcessError' export { + PrehandlerResult, HandlerContainer, MessageHandlerConfig, MessageHandlerConfigBuilder, diff --git a/packages/core/lib/queues/AbstractPublisherManager.ts b/packages/core/lib/queues/AbstractPublisherManager.ts index dbad5579..64978feb 100644 --- a/packages/core/lib/queues/AbstractPublisherManager.ts +++ b/packages/core/lib/queues/AbstractPublisherManager.ts @@ -1,25 +1,18 @@ -import type { - AsyncPublisher, - CommonEventDefinition, - QueuePublisherOptions, -} from '@message-queue-toolkit/core' import type { TypeOf, z } from 'zod' import type { EventRegistry } from '../events/EventRegistry' import type { BaseEventType } from '../events/baseEventSchemas' +import type { CommonEventDefinition } from '../events/eventTypes' import type { MetadataFiller } from '../messages/MetadataFiller' -import type { SyncPublisher } from '../types/MessageQueueTypes' -import type { CommonCreationConfigType } from '../types/queueOptionsTypes' +import type { AsyncPublisher, SyncPublisher } from '../types/MessageQueueTypes' +import type { CommonCreationConfigType, QueuePublisherOptions } from '../types/queueOptionsTypes' -export type MessagePublishType = Pick< - z.infer, - 'type' | 'payload' -> & { id?: string } +export type MessagePublishType = z.infer export type MessageSchemaType = z.infer export type AbstractPublisherFactory< - PublisherType extends AsyncPublisher | SyncPublisher, + PublisherType extends AsyncPublisher | SyncPublisher, DependenciesType, CreationConfigType extends CommonCreationConfigType, QueueLocatorType extends object, @@ -35,7 +28,7 @@ export type AbstractPublisherFactory< export abstract class AbstractPublisherManager< EventDefinitionType extends CommonEventDefinition, EventTargets extends string, - PublisherType extends AsyncPublisher | SyncPublisher, + PublisherType extends AsyncPublisher | SyncPublisher, DependenciesType, CreationConfigType extends CommonCreationConfigType, QueueLocatorType extends object, @@ -116,6 +109,10 @@ export abstract class AbstractPublisherManager< } protected abstract resolveEventTarget(event: EventDefinitionType): EventTargets | undefined + protected abstract resolveCreationConfig(eventTarget: string): CreationConfigType + protected resolvePublisherConfigOverrides(_eventTarget: string): Partial { + return {} + } private registerEvents(events: SupportedEventDefinitions) { for (const supportedEvent of events) { @@ -132,9 +129,6 @@ export abstract class AbstractPublisherManager< this.targetToEventMap[eventTarget].push(supportedEvent) } } - - protected abstract resolveCreationConfig(eventTarget: string): CreationConfigType - private registerPublishers() { for (const eventTarget in this.targetToEventMap) { if (this.targetToPublisherMap[eventTarget]) { @@ -145,6 +139,7 @@ export abstract class AbstractPublisherManager< return entry.schema }) const creationConfig = this.resolveCreationConfig(eventTarget) + const configOverrides = this.resolvePublisherConfigOverrides(eventTarget) this.targetToPublisherMap[eventTarget] = this.publisherFactory.buildPublisher( this.publisherDependencies, @@ -152,6 +147,7 @@ export abstract class AbstractPublisherManager< ...this.newPublisherOptions, creationConfig, messageSchemas, + ...configOverrides, }, ) } @@ -184,12 +180,27 @@ export abstract class AbstractPublisherManager< if (!publisher) { throw new Error(`No publisher for target ${eventTarget}`) } - // ToDo optimize the lookup const messageDefinition = this.targetToEventMap[eventTarget].find( (entry) => entry.schema.shape.type.value === message.type, ) + const resolvedMessage = this.resolveMessage(messageDefinition, message, precedingEventMetadata) + + if (this.isAsync) { + await (publisher as AsyncPublisher).publish(resolvedMessage, messageOptions) + } else { + (publisher as SyncPublisher).publish(resolvedMessage, messageOptions) + } + + return resolvedMessage + } + + protected resolveMessage( + messageDefinition: EventDefinitionType | undefined, + message: MessagePublishType, + precedingEventMetadata?: MetadataType, + ): MessageSchemaType { // @ts-ignore const resolvedMetadata = message[this.metadataField] ? // @ts-ignore @@ -197,21 +208,17 @@ export abstract class AbstractPublisherManager< : // @ts-ignore this.metadataFiller.produceMetadata(message, messageDefinition, precedingEventMetadata) - const resolvedMessage: MessageSchemaType = { - id: message.id ? message.id : this.metadataFiller.produceId(), - timestamp: this.metadataFiller.produceTimestamp(), + return { ...message, - // @ts-ignore metadata: resolvedMetadata, } + } - if (this.isAsync) { - await (publisher as AsyncPublisher).publish(resolvedMessage, messageOptions) - } else { - (publisher as SyncPublisher).publish(resolvedMessage) + public resolveBaseFields() { + return { + id: this.metadataFiller.produceId(), + timestamp: this.metadataFiller.produceTimestamp(), } - - return resolvedMessage } /** diff --git a/packages/core/lib/types/MessageQueueTypes.ts b/packages/core/lib/types/MessageQueueTypes.ts index 89fcd62b..9c5ef0f0 100644 --- a/packages/core/lib/types/MessageQueueTypes.ts +++ b/packages/core/lib/types/MessageQueueTypes.ts @@ -15,9 +15,9 @@ export type MessageProcessingResult = | 'error' | 'invalid_message' -export interface SyncPublisher { +export interface SyncPublisher { handlerSpy: PublicHandlerSpy - publish(message: MessagePayloadType): void + publish(message: MessagePayloadType, options: MessageOptions): void } export interface AsyncPublisher { diff --git a/packages/sns/lib/sns/SnsPublisherManager.spec.ts b/packages/sns/lib/sns/SnsPublisherManager.spec.ts index 1c962c31..7848512e 100644 --- a/packages/sns/lib/sns/SnsPublisherManager.spec.ts +++ b/packages/sns/lib/sns/SnsPublisherManager.spec.ts @@ -12,7 +12,7 @@ import type { import { registerDependencies, TestEvents } from '../../test/utils/testContext' import { CommonSnsPublisher } from './CommonSnsPublisherFactory' -import type { SnsMessagePublishType, SnsPublisherManager } from './SnsPublisherManager' +import type { SnsPublisherManager } from './SnsPublisherManager' import { FakeConsumer } from './fakes/FakeConsumer' describe('SnsPublisherManager', () => { @@ -34,13 +34,6 @@ describe('SnsPublisherManager', () => { describe('publish', () => { it('publishes to a correct publisher', async () => { // Given - const message = { - payload: { - message: 'msg', - }, - type: 'entity.created', - } satisfies SnsMessagePublishType - const fakeConsumer = new FakeConsumer( diContainer.cradle, 'queue', @@ -50,7 +43,13 @@ describe('SnsPublisherManager', () => { await fakeConsumer.start() // When - const publishedMessage = await publisherManager.publish(TestEvents.created.snsTopic, message) + const publishedMessage = await publisherManager.publish(TestEvents.created.snsTopic, { + ...publisherManager.resolveBaseFields(), + payload: { + newData: 'msg', + }, + type: 'entity.created', + }) const handlerSpyPromise = publisherManager .handlerSpy(TestEvents.created.snsTopic) @@ -71,7 +70,7 @@ describe('SnsPublisherManager', () => { schemaVersion: '1.0.1', }, payload: { - message: 'msg', + newData: 'msg', }, timestamp: expect.any(String), type: 'entity.created', @@ -80,13 +79,26 @@ describe('SnsPublisherManager', () => { await fakeConsumer.close() }) + it('message publishing is type-safe', async () => { + await expect( + publisherManager.publish(TestEvents.created.snsTopic, { + ...publisherManager.resolveBaseFields(), + payload: { + // @ts-expect-error This should be causing a compilation error + updatedData: 'edwe', + }, + type: 'entity.created', + }), + ).rejects.toThrow(/invalid_type/) + }) + it('publish to a non-existing topic will throw error', async () => { await expect( // @ts-expect-error Testing error scenario publisherManager.publish('non-existing-topic', { type: 'entity.created', payload: { - message: 'msg', + newData: 'msg', }, }), ).rejects.toThrow('No publisher for target non-existing-topic') @@ -136,10 +148,11 @@ describe('SnsPublisherManager', () => { // @ts-expect-error Testing injected publisher await publisherManager.publish(topic, { + ...publisherManager.resolveBaseFields(), id: messageId, type: 'entity.created', payload: { - message: 'msg', + newData: 'msg', }, }) diff --git a/packages/sns/lib/sns/SnsPublisherManager.ts b/packages/sns/lib/sns/SnsPublisherManager.ts index 40081c11..202d37a6 100644 --- a/packages/sns/lib/sns/SnsPublisherManager.ts +++ b/packages/sns/lib/sns/SnsPublisherManager.ts @@ -8,7 +8,11 @@ import { AbstractPublisherManager } from '@message-queue-toolkit/core' import type { MessageMetadataType } from '@message-queue-toolkit/core/lib/messages/baseMessageSchemas' import type z from 'zod' -import type { AbstractSnsPublisher, SNSPublisherOptions } from './AbstractSnsPublisher' +import type { + AbstractSnsPublisher, + SNSMessageOptions, + SNSPublisherOptions, +} from './AbstractSnsPublisher' import type { SNSCreationConfig, SNSDependencies, SNSQueueLocatorType } from './AbstractSnsService' import type { SnsPublisherFactory } from './CommonSnsPublisherFactory' import { CommonSnsPublisherFactory } from './CommonSnsPublisherFactory' @@ -40,11 +44,6 @@ export type SnsPublisherManagerOptions< export type SnsMessageSchemaType = z.infer -export type SnsMessagePublishType = Pick< - z.infer, - 'type' | 'payload' -> & { id?: string } - export class SnsPublisherManager< T extends AbstractSnsPublisher>, SupportedEventDefinitions extends SnsAwareEventDefinition[], @@ -63,33 +62,8 @@ export class SnsPublisherManager< >, SupportedEventDefinitions, MetadataType, - z.infer + SNSMessageOptions > { - /* - private readonly publisherFactory: SnsPublisherFactory< - T, - z.infer - > - */ - - /* - private readonly topicToEventMap: Record = {} - private topicToPublisherMap: Record = {} - - */ - - /* - private readonly newPublisherOptions: Omit< - SNSPublisherOptions>, - 'messageSchemas' | 'creationConfig' | 'locatorConfig' - > - - private readonly metadataFiller: MetadataFiller< - z.infer, - MetadataType - > - */ - constructor( dependencies: SnsPublisherManagerDependencies, options: SnsPublisherManagerOptions< diff --git a/packages/sns/package.json b/packages/sns/package.json index cf8d450d..8242be7b 100644 --- a/packages/sns/package.json +++ b/packages/sns/package.json @@ -18,7 +18,7 @@ "test": "vitest", "test:coverage": "npm test -- --coverage", "test:ci": "npm run docker:start:dev && npm run test:coverage && npm run docker:stop:dev", - "lint": "eslint . --ext .ts", + "lint": "eslint . --ext .ts && tsc --noEmit", "lint:fix": "prettier --write . && eslint . --ext .ts --fix", "docker:start:dev": "docker compose up -d", "docker:stop:dev": "docker compose down", diff --git a/packages/sns/test/consumers/SnsSqsPermissionConsumer.ts b/packages/sns/test/consumers/SnsSqsPermissionConsumer.ts index d410ddd6..9072310e 100644 --- a/packages/sns/test/consumers/SnsSqsPermissionConsumer.ts +++ b/packages/sns/test/consumers/SnsSqsPermissionConsumer.ts @@ -1,11 +1,11 @@ import type { Either } from '@lokalise/node-core' -import { - type BarrierResult, - MessageHandlerConfigBuilder, - type Prehandler, - type PreHandlingOutputs, -} from '@message-queue-toolkit/core' -import type { PrehandlerResult } from '@message-queue-toolkit/core/dist/lib/queues/HandlerContainer' +import type { + PrehandlerResult, + BarrierResult, + Prehandler, + PreHandlingOutputs} from '@message-queue-toolkit/core'; +import { MessageHandlerConfigBuilder, +} from '@message-queue-toolkit/core'; import type { SNSSQSConsumerDependencies, diff --git a/packages/sns/test/utils/testContext.ts b/packages/sns/test/utils/testContext.ts index a7d8723a..7fc05614 100644 --- a/packages/sns/test/utils/testContext.ts +++ b/packages/sns/test/utils/testContext.ts @@ -34,7 +34,7 @@ export const TestEvents = { schema: BASE_MESSAGE_SCHEMA.extend({ type: z.literal('entity.created'), payload: z.object({ - message: z.string(), + newData: z.string(), }), }), schemaVersion: '1.0.1', @@ -45,7 +45,7 @@ export const TestEvents = { schema: BASE_MESSAGE_SCHEMA.extend({ type: z.literal('entity.updated'), payload: z.object({ - message: z.string(), + updatedData: z.string(), }), }), snsTopic: 'dummy',