From ca99c9810560c2be771f2d3695a1cc99452850bd Mon Sep 17 00:00:00 2001 From: Igor Savin Date: Sat, 18 May 2024 14:16:05 +0300 Subject: [PATCH 1/3] Implement AbstractPublisherManager --- packages/core/index.ts | 2 + .../lib/queues/AbstractPublisherManager.ts | 223 ++++++++++++++++++ packages/core/lib/types/MessageQueueTypes.ts | 8 +- packages/core/lib/types/queueOptionsTypes.ts | 2 +- .../sns/lib/sns/SnsPublisherManager.spec.ts | 8 +- packages/sns/lib/sns/SnsPublisherManager.ts | 169 ++++--------- 6 files changed, 289 insertions(+), 123 deletions(-) create mode 100644 packages/core/lib/queues/AbstractPublisherManager.ts diff --git a/packages/core/index.ts b/packages/core/index.ts index 8a96ad53..ffc9d395 100644 --- a/packages/core/index.ts +++ b/packages/core/index.ts @@ -62,3 +62,5 @@ export * from './lib/events/baseEventSchemas' export * from './lib/messages/baseMessageSchemas' export * from './lib/messages/MetadataFiller' + +export * from './lib/queues/AbstractPublisherManager' diff --git a/packages/core/lib/queues/AbstractPublisherManager.ts b/packages/core/lib/queues/AbstractPublisherManager.ts new file mode 100644 index 00000000..eb72354a --- /dev/null +++ b/packages/core/lib/queues/AbstractPublisherManager.ts @@ -0,0 +1,223 @@ +import type { + AsyncPublisher, + CommonEventDefinition, + QueuePublisherOptions, +} from '@message-queue-toolkit/core' +import type { TypeOf, z } from 'zod' + +import type { EventRegistry } from '../events/EventRegistry' +import type { BaseEventType } from '../events/baseEventSchemas' +import type { MetadataFiller } from '../messages/MetadataFiller' +import type { SyncPublisher } from '../types/MessageQueueTypes' +import type { CommonCreationConfigType } from '../types/queueOptionsTypes' + +export type MessagePublishType = Pick< + z.infer, + 'type' | 'payload' +> & { id?: string } + +export type MessageSchemaType = z.infer + +export type AbstractPublisherFactory< + PublisherType extends AsyncPublisher | SyncPublisher, + DependenciesType, + CreationConfigType extends CommonCreationConfigType, + QueueLocatorType extends object, + EventType extends BaseEventType, + OptionsType extends Omit< + QueuePublisherOptions, + 'messageSchemas' | 'creationConfig' | 'locatorConfig' + >, +> = { + buildPublisher(dependencies: DependenciesType, options: OptionsType): PublisherType +} + +export abstract class AbstractPublisherManager< + EventDefinitionType extends CommonEventDefinition, + EventTargets extends string, + PublisherType extends AsyncPublisher | SyncPublisher, + DependenciesType, + CreationConfigType extends CommonCreationConfigType, + QueueLocatorType extends object, + EventType extends BaseEventType, + OptionsType extends Omit< + QueuePublisherOptions, + 'messageSchemas' | 'creationConfig' | 'locatorConfig' + >, + SupportedEventDefinitions extends EventDefinitionType[], + MetadataType, + MessageOptionsType, +> { + private readonly publisherFactory: AbstractPublisherFactory< + PublisherType, + DependenciesType, + CreationConfigType, + QueueLocatorType, + EventType, + OptionsType + > + + protected readonly newPublisherOptions: OptionsType + + protected readonly metadataFiller: MetadataFiller< + z.infer, + MetadataType + > + protected readonly metadataField: string + + // In this context "target" can be a topic or an exchange, depending on the transport + protected readonly targetToEventMap: Record = {} as Record + protected readonly isAsync: boolean + protected targetToPublisherMap: Record = {} as Record + private readonly publisherDependencies: DependenciesType + + protected constructor({ + publisherFactory, + newPublisherOptions, + publisherDependencies, + metadataFiller, + eventRegistry, + metadataField, + isAsync, + }: { + publisherFactory: AbstractPublisherFactory< + PublisherType, + DependenciesType, + CreationConfigType, + QueueLocatorType, + EventType, + OptionsType + > + newPublisherOptions: OptionsType + publisherDependencies: DependenciesType + metadataFiller: MetadataFiller< + TypeOf, + MetadataType + > + eventRegistry: EventRegistry + metadataField: string + isAsync: boolean + }) { + this.publisherFactory = publisherFactory + this.newPublisherOptions = newPublisherOptions + this.metadataFiller = metadataFiller + this.metadataField = metadataField + this.isAsync = isAsync + this.publisherDependencies = publisherDependencies + + this.registerEvents(eventRegistry.supportedEvents) + this.registerPublishers() + } + + protected abstract resolveEventTarget(event: EventDefinitionType): EventTargets | undefined + + private registerEvents(events: SupportedEventDefinitions) { + for (const supportedEvent of events) { + const eventTarget = this.resolveEventTarget(supportedEvent) + + if (!eventTarget) { + continue + } + + if (!this.targetToEventMap[eventTarget]) { + this.targetToEventMap[eventTarget] = [] + } + + this.targetToEventMap[eventTarget].push(supportedEvent) + } + } + + protected abstract resolveCreationConfig(eventTarget: string): CreationConfigType + + private registerPublishers() { + for (const eventTarget in this.targetToEventMap) { + if (this.targetToPublisherMap[eventTarget]) { + continue + } + + const messageSchemas = this.targetToEventMap[eventTarget].map((entry) => { + return entry.schema + }) + const creationConfig = this.resolveCreationConfig(eventTarget) + + this.targetToPublisherMap[eventTarget] = this.publisherFactory.buildPublisher( + this.publisherDependencies, + { + ...this.newPublisherOptions, + creationConfig, + messageSchemas, + }, + ) + } + } + + public injectPublisher(eventTarget: EventTargets, publisher: PublisherType) { + this.targetToPublisherMap[eventTarget] = publisher + } + + public injectEventDefinition(eventDefinition: EventDefinitionType) { + const eventTarget = this.resolveEventTarget(eventDefinition) + if (!eventTarget) { + throw new Error('eventTarget could not be resolved for the event') + } + + if (!this.targetToEventMap[eventTarget]) { + this.targetToEventMap[eventTarget] = [] + } + + this.targetToEventMap[eventTarget].push(eventDefinition) + } + + public async publish( + eventTarget: EventTargets, + message: MessagePublishType, + precedingEventMetadata?: MetadataType, + messageOptions?: MessageOptionsType, + ): Promise> { + const publisher = this.targetToPublisherMap[eventTarget] + if (!publisher) { + throw new Error(`No publisher for target ${eventTarget}`) + } + + // ToDo optimize the lookup + const messageDefinition = this.targetToEventMap[eventTarget].find( + (entry) => entry.schema.shape.type.value === message.type, + ) + + // @ts-ignore + const resolvedMetadata = message[this.metadataField] + ? // @ts-ignore + message[this.metadataField] + : // @ts-ignore + this.metadataFiller.produceMetadata(message, messageDefinition, precedingEventMetadata) + + const resolvedMessage: MessageSchemaType = { + id: message.id ? message.id : this.metadataFiller.produceId(), + timestamp: this.metadataFiller.produceTimestamp(), + ...message, + // @ts-ignore + metadata: resolvedMetadata, + } + + if (this.isAsync) { + await (publisher as AsyncPublisher).publish(resolvedMessage, messageOptions) + } else { + (publisher as SyncPublisher).publish(resolvedMessage) + } + + return resolvedMessage + } + + /** + * @param eventTarget - topic or exchange + */ + public handlerSpy(eventTarget: EventTargets) { + const publisher = this.targetToPublisherMap[eventTarget] + + if (!publisher) { + throw new Error(`No publisher for target ${eventTarget}`) + } + + return publisher.handlerSpy + } +} diff --git a/packages/core/lib/types/MessageQueueTypes.ts b/packages/core/lib/types/MessageQueueTypes.ts index c7ea6732..89fcd62b 100644 --- a/packages/core/lib/types/MessageQueueTypes.ts +++ b/packages/core/lib/types/MessageQueueTypes.ts @@ -1,6 +1,8 @@ import type { TransactionObservabilityManager } from '@lokalise/node-core' import type { ZodSchema } from 'zod' +import type { PublicHandlerSpy } from '../queues/HandlerSpy' + export interface QueueConsumer { start(): Promise // subscribe and start listening close(): Promise @@ -13,11 +15,13 @@ export type MessageProcessingResult = | 'error' | 'invalid_message' -export interface SyncPublisher { +export interface SyncPublisher { + handlerSpy: PublicHandlerSpy publish(message: MessagePayloadType): void } -export interface AsyncPublisher { +export interface AsyncPublisher { + handlerSpy: PublicHandlerSpy publish(message: MessagePayloadType, options: MessageOptions): Promise } diff --git a/packages/core/lib/types/queueOptionsTypes.ts b/packages/core/lib/types/queueOptionsTypes.ts index e1bbc99a..d2d2503c 100644 --- a/packages/core/lib/types/queueOptionsTypes.ts +++ b/packages/core/lib/types/queueOptionsTypes.ts @@ -25,7 +25,7 @@ export type CommonQueueOptions = { deletionConfig?: DeletionConfig } -type CommonCreationConfigType = { +export type CommonCreationConfigType = { updateAttributesIfExists?: boolean } diff --git a/packages/sns/lib/sns/SnsPublisherManager.spec.ts b/packages/sns/lib/sns/SnsPublisherManager.spec.ts index a73873c4..1c962c31 100644 --- a/packages/sns/lib/sns/SnsPublisherManager.spec.ts +++ b/packages/sns/lib/sns/SnsPublisherManager.spec.ts @@ -82,13 +82,14 @@ describe('SnsPublisherManager', () => { it('publish to a non-existing topic will throw error', async () => { await expect( + // @ts-expect-error Testing error scenario publisherManager.publish('non-existing-topic', { type: 'entity.created', payload: { message: 'msg', }, }), - ).rejects.toThrow('No publisher for topic non-existing-topic') + ).rejects.toThrow('No publisher for target non-existing-topic') }) }) @@ -99,8 +100,9 @@ describe('SnsPublisherManager', () => { }) it('returns error when no publisher for topic', () => { + // @ts-expect-error Testing incorrect scenario expect(() => publisherManager.handlerSpy('non-existing-topic')).toThrow( - 'No publisher for topic non-existing-topic', + 'No publisher for target non-existing-topic', ) }) }) @@ -132,6 +134,7 @@ describe('SnsPublisherManager', () => { schemaVersion: '2.0.0', }) + // @ts-expect-error Testing injected publisher await publisherManager.publish(topic, { id: messageId, type: 'entity.created', @@ -142,6 +145,7 @@ describe('SnsPublisherManager', () => { // Then const spyRes = await publisherManager + // @ts-expect-error Testing injected publisher .handlerSpy(topic) .waitForMessageWithId(messageId, 'published') expect(spyRes.processingResult).toBe('published') diff --git a/packages/sns/lib/sns/SnsPublisherManager.ts b/packages/sns/lib/sns/SnsPublisherManager.ts index 3645b68b..40081c11 100644 --- a/packages/sns/lib/sns/SnsPublisherManager.ts +++ b/packages/sns/lib/sns/SnsPublisherManager.ts @@ -1,18 +1,15 @@ -import type { SNSClient } from '@aws-sdk/client-sns' -import type { ErrorReporter } from '@lokalise/node-core' import type { BaseEventType, CommonEventDefinition, EventRegistry, - Logger, MetadataFiller, } from '@message-queue-toolkit/core' +import { AbstractPublisherManager } from '@message-queue-toolkit/core' import type { MessageMetadataType } from '@message-queue-toolkit/core/lib/messages/baseMessageSchemas' -import type { SNSDependencies } from '@message-queue-toolkit/sns' import type z from 'zod' import type { AbstractSnsPublisher, SNSPublisherOptions } from './AbstractSnsPublisher' -import type { SNSCreationConfig } from './AbstractSnsService' +import type { SNSCreationConfig, SNSDependencies, SNSQueueLocatorType } from './AbstractSnsService' import type { SnsPublisherFactory } from './CommonSnsPublisherFactory' import { CommonSnsPublisherFactory } from './CommonSnsPublisherFactory' @@ -52,26 +49,46 @@ export class SnsPublisherManager< T extends AbstractSnsPublisher>, SupportedEventDefinitions extends SnsAwareEventDefinition[], MetadataType = MessageMetadataType, +> extends AbstractPublisherManager< + SnsAwareEventDefinition, + NonNullable, + AbstractSnsPublisher>, + SNSDependencies, + SNSCreationConfig, + SNSQueueLocatorType, + SnsMessageSchemaType, + Omit< + SNSPublisherOptions>, + 'messageSchemas' | 'creationConfig' | 'locatorConfig' + >, + SupportedEventDefinitions, + MetadataType, + z.infer > { + /* private readonly publisherFactory: SnsPublisherFactory< T, z.infer > - private readonly snsClient: SNSClient - private readonly logger: Logger - private readonly errorReporter: ErrorReporter + */ + /* private readonly topicToEventMap: Record = {} private topicToPublisherMap: Record = {} + + */ + + /* private readonly newPublisherOptions: Omit< SNSPublisherOptions>, 'messageSchemas' | 'creationConfig' | 'locatorConfig' > + private readonly metadataFiller: MetadataFiller< z.infer, MetadataType > - private readonly metadataField: string + */ constructor( dependencies: SnsPublisherManagerDependencies, @@ -81,119 +98,35 @@ export class SnsPublisherManager< MetadataType >, ) { - this.snsClient = dependencies.snsClient - this.errorReporter = dependencies.errorReporter - this.logger = dependencies.logger - this.publisherFactory = options.publisherFactory ?? new CommonSnsPublisherFactory() - this.newPublisherOptions = options.newPublisherOptions - this.metadataFiller = options.metadataFiller - this.metadataField = options.metadataField ?? 'metadata' - - this.registerEvents(dependencies.eventRegistry.supportedEvents) - this.registerPublishers() - } - - private registerEvents(events: SupportedEventDefinitions) { - for (const supportedEvent of events) { - if (!supportedEvent.snsTopic) { - continue - } - - if (!this.topicToEventMap[supportedEvent.snsTopic]) { - this.topicToEventMap[supportedEvent.snsTopic] = [] - } - - this.topicToEventMap[supportedEvent.snsTopic].push(supportedEvent) - } + super({ + isAsync: true, + eventRegistry: dependencies.eventRegistry, + metadataField: options.metadataField ?? 'metadata', + metadataFiller: options.metadataFiller, + newPublisherOptions: options.newPublisherOptions, + publisherDependencies: { + snsClient: dependencies.snsClient, + logger: dependencies.logger, + errorReporter: dependencies.errorReporter, + }, + publisherFactory: options.publisherFactory ?? new CommonSnsPublisherFactory(), + }) } - private registerPublishers() { - const dependencies: SNSDependencies = { - snsClient: this.snsClient, - logger: this.logger, - errorReporter: this.errorReporter, + protected override resolveCreationConfig( + eventTarget: NonNullable, + ): SNSCreationConfig { + return { + ...this.newPublisherOptions, + topic: { + Name: eventTarget, + }, } - for (const snsTopic in this.topicToEventMap) { - if (this.topicToPublisherMap[snsTopic]) { - continue - } - - const messageSchemas = this.topicToEventMap[snsTopic].map((entry) => { - return entry.schema - }) - const creationConfig: SNSCreationConfig = { - ...this.newPublisherOptions, - topic: { - Name: snsTopic, - }, - } - - this.topicToPublisherMap[snsTopic] = this.publisherFactory.buildPublisher(dependencies, { - ...this.newPublisherOptions, - creationConfig, - messageSchemas, - }) - } - } - - public injectPublisher(snsTopic: string, publisher: T) { - this.topicToPublisherMap[snsTopic] = publisher } - public injectEventDefinition(eventDefinition: SnsAwareEventDefinition) { - if (!eventDefinition.snsTopic) { - throw new Error('snsTopic is mandatory') - } - - if (!this.topicToEventMap[eventDefinition.snsTopic]) { - this.topicToEventMap[eventDefinition.snsTopic] = [] - } - - this.topicToEventMap[eventDefinition.snsTopic].push(eventDefinition) - } - - public async publish( - snsTopic: string, - message: SnsMessagePublishType, - precedingEventMetadata?: MetadataType, - ): Promise> { - const publisher = this.topicToPublisherMap[snsTopic] - if (!publisher) { - throw new Error(`No publisher for topic ${snsTopic}`) - } - - // ToDo optimize the lookup - const messageDefinition = this.topicToEventMap[snsTopic].find( - (entry) => entry.schema.shape.type.value === message.type, - ) - - // @ts-ignore - const resolvedMetadata = message[this.metadataField] - ? // @ts-ignore - message[this.metadataField] - : // @ts-ignore - this.metadataFiller.produceMetadata(message, messageDefinition, precedingEventMetadata) - - const resolvedMessage: SnsMessageSchemaType = { - id: message.id ? message.id : this.metadataFiller.produceId(), - timestamp: this.metadataFiller.produceTimestamp(), - ...message, - // @ts-ignore - metadata: resolvedMetadata, - } - - await publisher.publish(resolvedMessage) - - return resolvedMessage - } - - public handlerSpy(snsTopic: string) { - const publisher = this.topicToPublisherMap[snsTopic] - - if (!publisher) { - throw new Error(`No publisher for topic ${snsTopic}`) - } - - return publisher.handlerSpy + protected override resolveEventTarget( + event: SnsAwareEventDefinition, + ): NonNullable | undefined { + return event.snsTopic } } From 837baedea163f083beefc9ec165d131a8eb72c07 Mon Sep 17 00:00:00 2001 From: Igor Savin Date: Sat, 18 May 2024 14:22:40 +0300 Subject: [PATCH 2/3] Try to avoid flakines --- .../consumers/SqsPermissionConsumer.spec.ts | 87 ++++++++++--------- 1 file changed, 47 insertions(+), 40 deletions(-) diff --git a/packages/sqs/test/consumers/SqsPermissionConsumer.spec.ts b/packages/sqs/test/consumers/SqsPermissionConsumer.spec.ts index 41c3dd08..74c19a41 100644 --- a/packages/sqs/test/consumers/SqsPermissionConsumer.spec.ts +++ b/packages/sqs/test/consumers/SqsPermissionConsumer.spec.ts @@ -511,45 +511,52 @@ describe('SqsPermissionConsumer', () => { ) }) - it.each([false, true])('using 2 consumers with heartbeat -> %s', async (heartbeatEnabled) => { - let consumer1IsProcessing = false - let consumer1Counter = 0 - let consumer2Counter = 0 - - const consumer1 = new SqsPermissionConsumer(diContainer.cradle, { - creationConfig: { queue: { QueueName: queueName, Attributes: { VisibilityTimeout: '2' } } }, - consumerOverrides: { heartbeatInterval: heartbeatEnabled ? 1 : undefined }, - removeHandlerOverride: async () => { - consumer1IsProcessing = true - await setTimeout(3100) // Wait to the visibility timeout to expire - consumer1Counter++ - consumer1IsProcessing = false - return { result: 'success' } - }, - }) - await consumer1.start() - - const consumer2 = new SqsPermissionConsumer(diContainer.cradle, { - locatorConfig: { queueUrl: consumer1.queueProps.url }, - removeHandlerOverride: async () => { - consumer2Counter++ - return { result: 'success' } - }, - }) - const publisher = new SqsPermissionPublisher(diContainer.cradle, { - locatorConfig: { queueUrl: consumer1.queueProps.url }, - }) - - await publisher.publish({ id: '10', messageType: 'remove' }) - // wait for consumer1 to start processing to start second consumer - await waitAndRetry(() => consumer1IsProcessing, 5, 5) - await consumer2.start() - - // wait for both consumers to process message - await waitAndRetry(() => consumer1Counter > 0 && consumer2Counter > 0, 100, 40) - - expect(consumer1Counter).toBe(1) - expect(consumer2Counter).toBe(heartbeatEnabled ? 0 : 1) - }) + it.each([false, true])( + 'using 2 consumers with heartbeat -> %s', + async (heartbeatEnabled) => { + let consumer1IsProcessing = false + let consumer1Counter = 0 + let consumer2Counter = 0 + + const consumer1 = new SqsPermissionConsumer(diContainer.cradle, { + creationConfig: { + queue: { QueueName: queueName, Attributes: { VisibilityTimeout: '2' } }, + }, + consumerOverrides: { heartbeatInterval: heartbeatEnabled ? 1 : undefined }, + removeHandlerOverride: async () => { + consumer1IsProcessing = true + await setTimeout(3100) // Wait to the visibility timeout to expire + consumer1Counter++ + consumer1IsProcessing = false + return { result: 'success' } + }, + }) + await consumer1.start() + + const consumer2 = new SqsPermissionConsumer(diContainer.cradle, { + locatorConfig: { queueUrl: consumer1.queueProps.url }, + removeHandlerOverride: async () => { + consumer2Counter++ + return { result: 'success' } + }, + }) + const publisher = new SqsPermissionPublisher(diContainer.cradle, { + locatorConfig: { queueUrl: consumer1.queueProps.url }, + }) + + await publisher.publish({ id: '10', messageType: 'remove' }) + // wait for consumer1 to start processing to start second consumer + await waitAndRetry(() => consumer1IsProcessing, 5, 5) + await consumer2.start() + + // wait for both consumers to process message + await waitAndRetry(() => consumer1Counter > 0 && consumer2Counter > 0, 100, 40) + + expect(consumer1Counter).toBe(1) + expect(consumer2Counter).toBe(heartbeatEnabled ? 0 : 1) + }, + // This reduces flakiness in CI + 10000, + ) }) }) From 02a5c2761d3babb389c7395dfc83e96db8715cc7 Mon Sep 17 00:00:00 2001 From: Igor Savin Date: Sat, 18 May 2024 14:28:14 +0300 Subject: [PATCH 3/3] Ignore coverage for an abstract class --- packages/core/lib/queues/AbstractPublisherManager.ts | 10 ++++++++-- packages/core/vitest.config.mts | 9 ++++++++- 2 files changed, 16 insertions(+), 3 deletions(-) diff --git a/packages/core/lib/queues/AbstractPublisherManager.ts b/packages/core/lib/queues/AbstractPublisherManager.ts index eb72354a..dbad5579 100644 --- a/packages/core/lib/queues/AbstractPublisherManager.ts +++ b/packages/core/lib/queues/AbstractPublisherManager.ts @@ -66,9 +66,15 @@ export abstract class AbstractPublisherManager< protected readonly metadataField: string // In this context "target" can be a topic or an exchange, depending on the transport - protected readonly targetToEventMap: Record = {} as Record + protected readonly targetToEventMap: Record = {} as Record< + EventTargets, + EventDefinitionType[] + > protected readonly isAsync: boolean - protected targetToPublisherMap: Record = {} as Record + protected targetToPublisherMap: Record = {} as Record< + EventTargets, + PublisherType + > private readonly publisherDependencies: DependenciesType protected constructor({ diff --git a/packages/core/vitest.config.mts b/packages/core/vitest.config.mts index 3afa8ed2..25ed77d6 100644 --- a/packages/core/vitest.config.mts +++ b/packages/core/vitest.config.mts @@ -7,8 +7,15 @@ export default defineConfig({ environment: 'node', reporters: ['default'], coverage: { + provider: 'v8', include: ['lib/**/*.ts'], - exclude: ['lib/**/*.spec.ts', 'lib/**/*.test.ts', 'test/**/*.*', 'lib/types/**/*.*'], + exclude: [ + 'lib/**/*.spec.ts', + 'lib/**/*.test.ts', + 'test/**/*.*', + 'lib/types/**/*.*', + 'lib/queues/AbstractPublisherManager.ts', + ], reporter: ['text'], all: true, thresholds: {