From 3dbbe4401407e96ee16ce97ff57c73bb7c5af26b Mon Sep 17 00:00:00 2001 From: Igor Savin Date: Fri, 17 May 2024 16:21:54 +0300 Subject: [PATCH 1/5] Lazy init for AMQP --- README.md | 3 -- docker-compose.yml | 2 +- packages/amqp/.eslintrc.json | 2 +- packages/amqp/docker-compose.yml | 2 +- packages/amqp/lib/AbstractAmqpPublisher.ts | 23 +++++++++ packages/amqp/lib/AbstractAmqpService.ts | 2 + .../AmqpPermissionPublisher.spec.ts | 50 ++++++++++++++++++- .../lib/events/DomainEventEmitter.spec.ts | 2 +- packages/core/lib/events/baseEventSchemas.ts | 2 +- packages/core/lib/messages/MetadataFiller.ts | 6 +-- .../core/lib/queues/AbstractQueueService.ts | 2 + packages/sns/lib/sns/AbstractSnsPublisher.ts | 2 - packages/sns/lib/sns/AbstractSnsService.ts | 2 + .../sns/lib/sns/SnsPublisherManager.spec.ts | 4 +- packages/sqs/lib/sqs/AbstractSqsPublisher.ts | 2 - packages/sqs/lib/sqs/AbstractSqsService.ts | 7 ++- ...ermisssionConsumer.deadLetterQueue.spec.ts | 1 - 17 files changed, 93 insertions(+), 21 deletions(-) diff --git a/README.md b/README.md index 59f53631..d721527c 100644 --- a/README.md +++ b/README.md @@ -40,9 +40,6 @@ They implement the following public methods: > **_NOTE:_** See [SqsPermissionPublisher.ts](./packages/sqs/test/publishers/SqsPermissionPublisher.ts) for a practical example. -> **_NOTE:_** Lazy loading is not supported for AMQP publishers. - - ### Consumers `message-queue-toolkit` provides base classes for implementing consumers for each of the supported protocol. diff --git a/docker-compose.yml b/docker-compose.yml index c9d93ae7..e8645ab9 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -1,6 +1,6 @@ services: rabbitmq: - image: rabbitmq:3.11.20 + image: rabbitmq:3.12.14 ports: - ${DOCKER_RABBITMQ_CLIENT_PORT:-5672}:5672 - ${DOCKER_RABBITMQ_MANAGEMENT_PORT:-15672}:15672 diff --git a/packages/amqp/.eslintrc.json b/packages/amqp/.eslintrc.json index 9f36aefe..9414065b 100644 --- a/packages/amqp/.eslintrc.json +++ b/packages/amqp/.eslintrc.json @@ -60,7 +60,7 @@ ], "max-lines": ["error", { "max": 600 }], "max-params": ["error", { "max": 4 }], - "max-statements": ["error", { "max": 20 }], + "max-statements": ["error", { "max": 30 }], "complexity": ["error", { "max": 20 }] }, "overrides": [ diff --git a/packages/amqp/docker-compose.yml b/packages/amqp/docker-compose.yml index 028712ad..e5c587d8 100644 --- a/packages/amqp/docker-compose.yml +++ b/packages/amqp/docker-compose.yml @@ -1,6 +1,6 @@ services: rabbitmq: - image: rabbitmq:3.11.20 + image: rabbitmq:3.12.14 ports: - ${DOCKER_RABBITMQ_CLIENT_PORT:-5672}:5672 - ${DOCKER_RABBITMQ_MANAGEMENT_PORT:-15672}:15672 diff --git a/packages/amqp/lib/AbstractAmqpPublisher.ts b/packages/amqp/lib/AbstractAmqpPublisher.ts index 16962f1a..b252bfaa 100644 --- a/packages/amqp/lib/AbstractAmqpPublisher.ts +++ b/packages/amqp/lib/AbstractAmqpPublisher.ts @@ -25,6 +25,8 @@ export abstract class AbstractAmqpPublisher { private readonly messageSchemaContainer: MessageSchemaContainer + private initPromise?: Promise + constructor(dependencies: AMQPDependencies, options: AMQPPublisherOptions) { super(dependencies, options) @@ -42,6 +44,22 @@ export abstract class AbstractAmqpPublisher } resolveSchemaResult.result.parse(message) + // If it's not initted yet, do the lazy init + if (!this.isInitted) { + // avoid multiple concurrent inits + if (!this.initPromise) { + this.initPromise = this.init() + } + this.initPromise + .then(() => { + this.publish(message) + }) + .catch((err) => { + this.handleError(err) + }) + return + } + /** * If the message doesn't have a timestamp field -> add it * will be used on the consumer to prevent infinite retries on the same message @@ -108,6 +126,11 @@ export abstract class AbstractAmqpPublisher throw new Error('Not implemented for publisher') } + async close(): Promise { + this.initPromise = undefined + await super.close() + } + override processMessage(): Promise> { throw new Error('Not implemented for publisher') } diff --git a/packages/amqp/lib/AbstractAmqpService.ts b/packages/amqp/lib/AbstractAmqpService.ts index 4fa8243a..bbd92f72 100644 --- a/packages/amqp/lib/AbstractAmqpService.ts +++ b/packages/amqp/lib/AbstractAmqpService.ts @@ -158,6 +158,7 @@ export abstract class AbstractAmqpService< if (this.connection) { await this.receiveNewConnection(this.connection) } + this.isInitted = true } public async reconnect() { @@ -167,5 +168,6 @@ export abstract class AbstractAmqpService< async close(): Promise { this.isShuttingDown = true await this.destroyChannel() + this.isInitted = false } } diff --git a/packages/amqp/test/publishers/AmqpPermissionPublisher.spec.ts b/packages/amqp/test/publishers/AmqpPermissionPublisher.spec.ts index 5e1312e1..0ae4b1e0 100644 --- a/packages/amqp/test/publishers/AmqpPermissionPublisher.spec.ts +++ b/packages/amqp/test/publishers/AmqpPermissionPublisher.spec.ts @@ -159,7 +159,8 @@ describe('PermissionPublisher', () => { it('return details if publish failed', async () => { expect.assertions(3) try { - await permissionPublisher.close() + // @ts-ignore + permissionPublisher.channel = undefined permissionPublisher.publish({ id: '11', messageType: 'add', @@ -243,6 +244,53 @@ describe('PermissionPublisher', () => { }) }) + it('publishes a message with lazy init', async () => { + await permissionConsumer.close() + await permissionPublisher.close() + + const message = { + id: '1', + messageType: 'add', + userIds: [1], + permissions: ['100'], + timestamp: new Date(), + } satisfies PERMISSIONS_MESSAGE_TYPE + + let receivedMessage: unknown + await channel.consume(AmqpPermissionPublisher.QUEUE_NAME, (message) => { + if (message === null) { + return + } + const decodedMessage = deserializeAmqpMessage( + message, + PERMISSIONS_MESSAGE_SCHEMA, + new FakeConsumerErrorResolver(), + ) + receivedMessage = decodedMessage.result! + }) + + permissionPublisher.publish(message) + + await waitAndRetry(() => !!receivedMessage) + + expect(receivedMessage).toEqual({ + parsedMessage: { + id: '1', + messageType: 'add', + userIds: [1], + permissions: ['100'], + timestamp: message.timestamp.toISOString(), + }, + originalMessage: { + id: '1', + messageType: 'add', + userIds: [1], + permissions: ['100'], + timestamp: message.timestamp.toISOString(), + }, + }) + }) + it('publishes a message auto-filling timestamp', async () => { await permissionConsumer.close() diff --git a/packages/core/lib/events/DomainEventEmitter.spec.ts b/packages/core/lib/events/DomainEventEmitter.spec.ts index b1fe7398..e2534205 100644 --- a/packages/core/lib/events/DomainEventEmitter.spec.ts +++ b/packages/core/lib/events/DomainEventEmitter.spec.ts @@ -16,7 +16,7 @@ const createdEventPayload: CommonEventDefinitionSchemaType('').describe('event type name'), payload: z.optional(z.object({})).describe('event payload based on type'), }) diff --git a/packages/core/lib/messages/MetadataFiller.ts b/packages/core/lib/messages/MetadataFiller.ts index efb2f255..2e10064a 100644 --- a/packages/core/lib/messages/MetadataFiller.ts +++ b/packages/core/lib/messages/MetadataFiller.ts @@ -6,7 +6,7 @@ import type { CommonEventDefinition } from '../events/eventTypes' import type { MessageMetadataType } from './baseMessageSchemas' export type IdGenerator = () => string -export type TimestampGenerator = () => string +export type TimestampGenerator = () => Date export type MetadataFillerOptions = { serviceId: string @@ -23,7 +23,7 @@ export type MetadataFiller< > = { produceMetadata(currentMessage: T, eventDefinition: D, precedingMessageMetadata?: M): M produceId(): string - produceTimestamp(): string + produceTimestamp(): Date } export class CommonMetadataFiller implements MetadataFiller { @@ -43,7 +43,7 @@ export class CommonMetadataFiller implements MetadataFiller { this.produceTimestamp = options.timestampGenerator ?? (() => { - return new Date().toISOString() + return new Date() }) } diff --git a/packages/core/lib/queues/AbstractQueueService.ts b/packages/core/lib/queues/AbstractQueueService.ts index d2794d07..5327281a 100644 --- a/packages/core/lib/queues/AbstractQueueService.ts +++ b/packages/core/lib/queues/AbstractQueueService.ts @@ -52,6 +52,7 @@ export abstract class AbstractQueueService< protected readonly locatorConfig?: QueueLocatorType protected readonly deletionConfig?: DeletionConfig protected readonly _handlerSpy?: HandlerSpy + protected isInitted: boolean get handlerSpy(): PublicHandlerSpy { if (!this._handlerSpy) { @@ -75,6 +76,7 @@ export abstract class AbstractQueueService< this.logMessages = options.logMessages ?? false this._handlerSpy = resolveHandlerSpy(options) + this.isInitted = false } protected abstract resolveSchema( diff --git a/packages/sns/lib/sns/AbstractSnsPublisher.ts b/packages/sns/lib/sns/AbstractSnsPublisher.ts index a36a13ba..9dc472c2 100644 --- a/packages/sns/lib/sns/AbstractSnsPublisher.ts +++ b/packages/sns/lib/sns/AbstractSnsPublisher.ts @@ -31,7 +31,6 @@ export abstract class AbstractSnsPublisher { private readonly messageSchemaContainer: MessageSchemaContainer - private isInitted: boolean private initPromise?: Promise constructor(dependencies: SNSDependencies, options: SNSPublisherOptions) { @@ -42,7 +41,6 @@ export abstract class AbstractSnsPublisher messageSchemas, messageTypeField: options.messageTypeField, }) - this.isInitted = false } async publish(message: MessagePayloadType, options: SNSMessageOptions = {}): Promise { diff --git a/packages/sns/lib/sns/AbstractSnsService.ts b/packages/sns/lib/sns/AbstractSnsService.ts index 190d21bd..95f081c5 100644 --- a/packages/sns/lib/sns/AbstractSnsService.ts +++ b/packages/sns/lib/sns/AbstractSnsService.ts @@ -71,9 +71,11 @@ export abstract class AbstractSnsService< const initResult = await initSns(this.snsClient, this.locatorConfig, this.creationConfig) this.topicArn = initResult.topicArn + this.isInitted = true } public override close(): Promise { + this.isInitted = false return Promise.resolve() } } diff --git a/packages/sns/lib/sns/SnsPublisherManager.spec.ts b/packages/sns/lib/sns/SnsPublisherManager.spec.ts index d37ecb01..a73873c4 100644 --- a/packages/sns/lib/sns/SnsPublisherManager.spec.ts +++ b/packages/sns/lib/sns/SnsPublisherManager.spec.ts @@ -15,7 +15,7 @@ import { CommonSnsPublisher } from './CommonSnsPublisherFactory' import type { SnsMessagePublishType, SnsPublisherManager } from './SnsPublisherManager' import { FakeConsumer } from './fakes/FakeConsumer' -describe('AutopilotPublisherManager', () => { +describe('SnsPublisherManager', () => { let diContainer: AwilixContainer let publisherManager: SnsPublisherManager< CommonSnsPublisher, @@ -80,7 +80,7 @@ describe('AutopilotPublisherManager', () => { await fakeConsumer.close() }) - it('publish to a non existing topic will throw error', async () => { + it('publish to a non-existing topic will throw error', async () => { await expect( publisherManager.publish('non-existing-topic', { type: 'entity.created', diff --git a/packages/sqs/lib/sqs/AbstractSqsPublisher.ts b/packages/sqs/lib/sqs/AbstractSqsPublisher.ts index c0c2640e..74eb0cbb 100644 --- a/packages/sqs/lib/sqs/AbstractSqsPublisher.ts +++ b/packages/sqs/lib/sqs/AbstractSqsPublisher.ts @@ -26,7 +26,6 @@ export abstract class AbstractSqsPublisher implements AsyncPublisher { private readonly messageSchemaContainer: MessageSchemaContainer - private isInitted: boolean private initPromise?: Promise constructor( @@ -40,7 +39,6 @@ export abstract class AbstractSqsPublisher messageSchemas, messageTypeField: options.messageTypeField, }) - this.isInitted = false } async publish(message: MessagePayloadType, options: SQSMessageOptions = {}): Promise { diff --git a/packages/sqs/lib/sqs/AbstractSqsService.ts b/packages/sqs/lib/sqs/AbstractSqsService.ts index c00eccca..37bf1966 100644 --- a/packages/sqs/lib/sqs/AbstractSqsService.ts +++ b/packages/sqs/lib/sqs/AbstractSqsService.ts @@ -70,8 +70,11 @@ export abstract class AbstractSqsService< this.queueName = queueName this.queueUrl = queueUrl this.queueArn = queueArn + this.isInitted = true } - // eslint-disable-next-line @typescript-eslint/require-await - public override async close(): Promise {} + public override close(): Promise { + this.isInitted = false + return Promise.resolve() + } } diff --git a/packages/sqs/test/consumers/SqsPermisssionConsumer.deadLetterQueue.spec.ts b/packages/sqs/test/consumers/SqsPermisssionConsumer.deadLetterQueue.spec.ts index fac7b95a..dd4c3ece 100644 --- a/packages/sqs/test/consumers/SqsPermisssionConsumer.deadLetterQueue.spec.ts +++ b/packages/sqs/test/consumers/SqsPermisssionConsumer.deadLetterQueue.spec.ts @@ -4,7 +4,6 @@ import { waitAndRetry } from '@lokalise/node-core' import type { AwilixContainer } from 'awilix' import { Consumer } from 'sqs-consumer' import { afterEach, beforeEach, describe, expect, it } from 'vitest' -import z from 'zod' import type { SQSMessage } from '../../lib/types/MessageTypes' import { assertQueue, deleteQueue, getQueueAttributes } from '../../lib/utils/sqsUtils' From 35851f2349c9fefd1e89bc681229d1ee5524b54d Mon Sep 17 00:00:00 2001 From: Igor Savin Date: Fri, 17 May 2024 16:25:33 +0300 Subject: [PATCH 2/5] Add comment --- packages/amqp/lib/AbstractAmqpPublisher.ts | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/packages/amqp/lib/AbstractAmqpPublisher.ts b/packages/amqp/lib/AbstractAmqpPublisher.ts index b252bfaa..fe9914c0 100644 --- a/packages/amqp/lib/AbstractAmqpPublisher.ts +++ b/packages/amqp/lib/AbstractAmqpPublisher.ts @@ -50,6 +50,11 @@ export abstract class AbstractAmqpPublisher if (!this.initPromise) { this.initPromise = this.init() } + + /** + * it is intentional that this promise is not awaited, that's how we keep the method invocation synchronous + * RabbitMQ publish by itself doesn't guarantee that your message is delivered successfully, so this kind of fire-and-forget is not strongly different from how amqp-lib behaves in the first place. + */ this.initPromise .then(() => { this.publish(message) From 286212c300669591e6a1c2e384f1cba2d6360a59 Mon Sep 17 00:00:00 2001 From: Igor Savin Date: Fri, 17 May 2024 16:27:42 +0300 Subject: [PATCH 3/5] revert date changes --- packages/core/lib/events/DomainEventEmitter.spec.ts | 2 +- packages/core/lib/events/baseEventSchemas.ts | 2 +- packages/core/lib/messages/MetadataFiller.ts | 6 +++--- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/packages/core/lib/events/DomainEventEmitter.spec.ts b/packages/core/lib/events/DomainEventEmitter.spec.ts index e2534205..b1fe7398 100644 --- a/packages/core/lib/events/DomainEventEmitter.spec.ts +++ b/packages/core/lib/events/DomainEventEmitter.spec.ts @@ -16,7 +16,7 @@ const createdEventPayload: CommonEventDefinitionSchemaType('').describe('event type name'), payload: z.optional(z.object({})).describe('event payload based on type'), }) diff --git a/packages/core/lib/messages/MetadataFiller.ts b/packages/core/lib/messages/MetadataFiller.ts index 2e10064a..efb2f255 100644 --- a/packages/core/lib/messages/MetadataFiller.ts +++ b/packages/core/lib/messages/MetadataFiller.ts @@ -6,7 +6,7 @@ import type { CommonEventDefinition } from '../events/eventTypes' import type { MessageMetadataType } from './baseMessageSchemas' export type IdGenerator = () => string -export type TimestampGenerator = () => Date +export type TimestampGenerator = () => string export type MetadataFillerOptions = { serviceId: string @@ -23,7 +23,7 @@ export type MetadataFiller< > = { produceMetadata(currentMessage: T, eventDefinition: D, precedingMessageMetadata?: M): M produceId(): string - produceTimestamp(): Date + produceTimestamp(): string } export class CommonMetadataFiller implements MetadataFiller { @@ -43,7 +43,7 @@ export class CommonMetadataFiller implements MetadataFiller { this.produceTimestamp = options.timestampGenerator ?? (() => { - return new Date() + return new Date().toISOString() }) } From 4096cd7344e79038032af64f05200a08dbab58ec Mon Sep 17 00:00:00 2001 From: Igor Savin Date: Fri, 17 May 2024 18:01:03 +0300 Subject: [PATCH 4/5] Fix test --- packages/sns/lib/sns/AbstractSnsPublisher.ts | 1 + packages/sqs/lib/sqs/AbstractSqsPublisher.ts | 18 ++++++++++++++++-- .../publishers/SqsPermissionPublisher.spec.ts | 6 +----- 3 files changed, 18 insertions(+), 7 deletions(-) diff --git a/packages/sns/lib/sns/AbstractSnsPublisher.ts b/packages/sns/lib/sns/AbstractSnsPublisher.ts index 9dc472c2..b0336c57 100644 --- a/packages/sns/lib/sns/AbstractSnsPublisher.ts +++ b/packages/sns/lib/sns/AbstractSnsPublisher.ts @@ -56,6 +56,7 @@ export abstract class AbstractSnsPublisher this.initPromise = this.init() } await this.initPromise + this.initPromise = undefined } try { diff --git a/packages/sqs/lib/sqs/AbstractSqsPublisher.ts b/packages/sqs/lib/sqs/AbstractSqsPublisher.ts index 74eb0cbb..44db7800 100644 --- a/packages/sqs/lib/sqs/AbstractSqsPublisher.ts +++ b/packages/sqs/lib/sqs/AbstractSqsPublisher.ts @@ -1,6 +1,7 @@ import type { SendMessageCommandInput } from '@aws-sdk/client-sqs' import { SendMessageCommand } from '@aws-sdk/client-sqs' import type { Either } from '@lokalise/node-core' +import { InternalError } from '@lokalise/node-core' import type { AsyncPublisher, MessageInvalidFormatError, @@ -54,6 +55,7 @@ export abstract class AbstractSqsPublisher this.initPromise = this.init() } await this.initPromise + this.initPromise = undefined } try { @@ -85,8 +87,20 @@ export abstract class AbstractSqsPublisher await this.sqsClient.send(command) this.handleMessageProcessed(message, 'published') } catch (error) { - this.handleError(error) - throw error + const err = error as Error + this.handleError(err) + throw new InternalError({ + message: `Error while publishing to SQS: ${err.message}`, + errorCode: 'SQS_PUBLISH_ERROR', + details: { + publisher: this.constructor.name, + queueArn: this.queueArn, + queueName: this.queueName, + // @ts-ignore + messageType: message[this.messageTypeField] ?? 'unknown', + }, + cause: err, + }) } } diff --git a/packages/sqs/test/publishers/SqsPermissionPublisher.spec.ts b/packages/sqs/test/publishers/SqsPermissionPublisher.spec.ts index 05d00942..6b544757 100644 --- a/packages/sqs/test/publishers/SqsPermissionPublisher.spec.ts +++ b/packages/sqs/test/publishers/SqsPermissionPublisher.spec.ts @@ -145,16 +145,12 @@ describe('SqsPermissionPublisher', () => { describe('publish', () => { let diContainer: AwilixContainer - let sqsClient: SQSClient let permissionPublisher: SqsPermissionPublisher beforeEach(async () => { diContainer = await registerDependencies() - sqsClient = diContainer.cradle.sqsClient await diContainer.cradle.permissionConsumer.close() permissionPublisher = diContainer.cradle.permissionPublisher - - await deleteQueue(sqsClient, SqsPermissionPublisher.QUEUE_NAME) }) afterEach(async () => { @@ -188,7 +184,7 @@ describe('SqsPermissionPublisher', () => { const spy = await permissionPublisher.handlerSpy.waitForMessageWithId('1', 'published') expect(spy.message).toEqual(message) expect(spy.processingResult).toBe('published') - }) + }, 99999999) it('publish a message auto-filling timestamp', async () => { const { permissionPublisher } = diContainer.cradle From 3790bdb1db3dcc2ebeb1166360ff540217aa8df4 Mon Sep 17 00:00:00 2001 From: Igor Savin Date: Fri, 17 May 2024 18:15:42 +0300 Subject: [PATCH 5/5] remove temp code --- packages/sqs/test/publishers/SqsPermissionPublisher.spec.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/sqs/test/publishers/SqsPermissionPublisher.spec.ts b/packages/sqs/test/publishers/SqsPermissionPublisher.spec.ts index 6b544757..fd24a3bc 100644 --- a/packages/sqs/test/publishers/SqsPermissionPublisher.spec.ts +++ b/packages/sqs/test/publishers/SqsPermissionPublisher.spec.ts @@ -184,7 +184,7 @@ describe('SqsPermissionPublisher', () => { const spy = await permissionPublisher.handlerSpy.waitForMessageWithId('1', 'published') expect(spy.message).toEqual(message) expect(spy.processingResult).toBe('published') - }, 99999999) + }) it('publish a message auto-filling timestamp', async () => { const { permissionPublisher } = diContainer.cradle