diff --git a/README.md b/README.md index fd5c50a6..96287183 100644 --- a/README.md +++ b/README.md @@ -34,7 +34,7 @@ Mono-schema publishers only support a single message type and are simpler to imp * `messageTypeField` - which field in the message describes the type of a message. This field needs to be defined as `z.literal` in the schema; * `locatorConfig` - configuration for resolving existing queue and/or topic. Should not be specified together with the `creationConfig`. * `creationConfig` - configuration for queue and/or topic to create, if one does not exist. Should not be specified together with the `locatorConfig`. -* `init()`, prepare publisher for use (e. g. establish all necessary connections); +* `init()`, prepare publisher for use (e. g. establish all necessary connections), it will be called automatically by `publish()` if not called before explicitly (lazy loading). * `close()`, stop publisher use (e. g. disconnect); * `publish()`, send a message to a queue or topic. It accepts the following parameters: * `message` – a message following a `zod` schema; @@ -42,6 +42,8 @@ Mono-schema publishers only support a single message type and are simpler to imp > **_NOTE:_** See [SqsPermissionPublisherMonoSchema.ts](./packages/sqs/test/publishers/SqsPermissionPublisherMonoSchema.ts) for a practical example. +> **_NOTE:_** Lazy loading is not supported for AMQP publishers. + #### Multi-schema publishers Multi-schema publishers support multiple messages types. They implement the following public methods: diff --git a/packages/sns/lib/sns/AbstractSnsPublisherMonoSchema.ts b/packages/sns/lib/sns/AbstractSnsPublisherMonoSchema.ts index 71085216..7e8acb3f 100644 --- a/packages/sns/lib/sns/AbstractSnsPublisherMonoSchema.ts +++ b/packages/sns/lib/sns/AbstractSnsPublisherMonoSchema.ts @@ -1,5 +1,3 @@ -import { PublishCommand } from '@aws-sdk/client-sns' -import type { PublishCommandInput } from '@aws-sdk/client-sns/dist-types/commands/PublishCommand' import type { Either } from '@lokalise/node-core' import type { AsyncPublisher, @@ -37,28 +35,8 @@ export abstract class AbstractSnsPublisherMonoSchema { - try { - this.messageSchema.parse(message) - - if (this.logMessages) { - // @ts-ignore - const resolvedLogMessage = this.resolveMessageLog(message, message[this.messageTypeField]) - this.logMessage(resolvedLogMessage) - } - - const input = { - Message: JSON.stringify(message), - TopicArn: this.topicArn, - ...options, - } satisfies PublishCommandInput - const command = new PublishCommand(input) - await this.snsClient.send(command) - this.handleMessageProcessed(message, 'published') - } catch (error) { - this.handleError(error) - throw error - } + publish(message: MessagePayloadType, options: SNSMessageOptions = {}): Promise { + return this.internalPublish(message, this.messageSchema, options) } /* c8 ignore start */ diff --git a/packages/sns/lib/sns/AbstractSnsPublisherMultiSchema.ts b/packages/sns/lib/sns/AbstractSnsPublisherMultiSchema.ts index f55b7b31..f8ea2b3f 100644 --- a/packages/sns/lib/sns/AbstractSnsPublisherMultiSchema.ts +++ b/packages/sns/lib/sns/AbstractSnsPublisherMultiSchema.ts @@ -1,5 +1,3 @@ -import { PublishCommand } from '@aws-sdk/client-sns' -import type { PublishCommandInput } from '@aws-sdk/client-sns/dist-types/commands/PublishCommand' import type { Either } from '@lokalise/node-core' import type { AsyncPublisher, @@ -40,31 +38,12 @@ export abstract class AbstractSnsPublisherMultiSchema { - try { - const resolveSchemaResult = this.resolveSchema(message) - if (resolveSchemaResult.error) { - throw resolveSchemaResult.error - } - resolveSchemaResult.result.parse(message) - - if (this.logMessages) { - // @ts-ignore - const resolvedLogMessage = this.resolveMessageLog(message, message[this.messageTypeField]) - this.logMessage(resolvedLogMessage) - } - - const input = { - Message: JSON.stringify(message), - TopicArn: this.topicArn, - ...options, - } satisfies PublishCommandInput - const command = new PublishCommand(input) - await this.snsClient.send(command) - this.handleMessageProcessed(message, 'published') - } catch (error) { - this.handleError(error) - throw error + const messageSchemaResult = this.resolveSchema(message) + if (messageSchemaResult.error) { + throw messageSchemaResult.error } + + return this.internalPublish(message, messageSchemaResult.result, options) } protected override resolveMessage(): Either< diff --git a/packages/sns/lib/sns/AbstractSnsService.ts b/packages/sns/lib/sns/AbstractSnsService.ts index a72d8cc6..70474e06 100644 --- a/packages/sns/lib/sns/AbstractSnsService.ts +++ b/packages/sns/lib/sns/AbstractSnsService.ts @@ -1,4 +1,6 @@ import type { SNSClient, CreateTopicCommandInput, Tag } from '@aws-sdk/client-sns' +import { PublishCommand } from '@aws-sdk/client-sns' +import type { PublishCommandInput } from '@aws-sdk/client-sns/dist-types/commands/PublishCommand' import type { QueueConsumerDependencies, QueueDependencies, @@ -8,10 +10,13 @@ import type { ExistingQueueOptionsMultiSchema, } from '@message-queue-toolkit/core' import { AbstractQueueService } from '@message-queue-toolkit/core' +import type { ZodSchema } from 'zod' import type { SNS_MESSAGE_BODY_TYPE } from '../types/MessageTypes' import { deleteSns, initSns } from '../utils/snsInitter' +import type { SNSMessageOptions } from './AbstractSnsPublisherMonoSchema' + export type SNSDependencies = QueueDependencies & { snsClient: SNSClient } @@ -97,4 +102,37 @@ export abstract class AbstractSnsService< // eslint-disable-next-line @typescript-eslint/require-await public override async close(): Promise {} + + protected async internalPublish( + message: MessagePayloadType, + messageSchema: ZodSchema, + options: SNSMessageOptions = {}, + ): Promise { + if (this.topicArn === undefined) { + // Lazy loading + await this.init() + } + + try { + messageSchema.parse(message) + + if (this.logMessages) { + // @ts-ignore + const resolvedLogMessage = this.resolveMessageLog(message, message[this.messageTypeField]) + this.logMessage(resolvedLogMessage) + } + + const input = { + Message: JSON.stringify(message), + TopicArn: this.topicArn, + ...options, + } satisfies PublishCommandInput + const command = new PublishCommand(input) + await this.snsClient.send(command) + this.handleMessageProcessed(message, 'published') + } catch (error) { + this.handleError(error) + throw error + } + } } diff --git a/packages/sns/package.json b/packages/sns/package.json index 30fe9902..4e3bc4b8 100644 --- a/packages/sns/package.json +++ b/packages/sns/package.json @@ -1,6 +1,6 @@ { "name": "@message-queue-toolkit/sns", - "version": "9.0.0", + "version": "9.1.0", "private": false, "license": "MIT", "description": "SNS adapter for message-queue-toolkit", diff --git a/packages/sns/test/publishers/SnsPermissionPublisher.spec.ts b/packages/sns/test/publishers/SnsPermissionPublisher.spec.ts index 78aa9577..5e27ab29 100644 --- a/packages/sns/test/publishers/SnsPermissionPublisher.spec.ts +++ b/packages/sns/test/publishers/SnsPermissionPublisher.spec.ts @@ -1,6 +1,6 @@ import type { SNSClient } from '@aws-sdk/client-sns' import type { SQSClient } from '@aws-sdk/client-sqs' -import { waitAndRetry } from '@message-queue-toolkit/core' +import { waitAndRetry } from '@lokalise/node-core' import type { SQSMessage } from '@message-queue-toolkit/sqs' import { assertQueue, deleteQueue, FakeConsumerErrorResolver } from '@message-queue-toolkit/sqs' import type { AwilixContainer } from 'awilix' @@ -167,5 +167,21 @@ describe('SNSPermissionPublisher', () => { consumer.stop() }) + + it('publish message with lazy loading', async () => { + const newPublisher = new SnsPermissionPublisherMonoSchema(diContainer.cradle) + + const message = { + id: '1', + userIds, + messageType: 'add', + permissions: perms, + } satisfies PERMISSIONS_MESSAGE_TYPE + + await newPublisher.publish(message) + + const res = await newPublisher.handlerSpy.waitForMessageWithId('1', 'published') + expect(res.message).toEqual(message) + }) }) }) diff --git a/packages/sqs/lib/sqs/AbstractSqsPublisherMonoSchema.ts b/packages/sqs/lib/sqs/AbstractSqsPublisherMonoSchema.ts index 2b050e94..0a3b9644 100644 --- a/packages/sqs/lib/sqs/AbstractSqsPublisherMonoSchema.ts +++ b/packages/sqs/lib/sqs/AbstractSqsPublisherMonoSchema.ts @@ -1,5 +1,3 @@ -import type { SendMessageCommandInput } from '@aws-sdk/client-sqs' -import { SendMessageCommand } from '@aws-sdk/client-sqs' import type { Either } from '@lokalise/node-core' import type { AsyncPublisher, @@ -41,28 +39,7 @@ export abstract class AbstractSqsPublisherMonoSchema { - try { - this.messageSchema.parse(message) - - if (this.logMessages) { - // @ts-ignore - const resolvedLogMessage = this.resolveMessageLog(message, message[this.messageTypeField]) - this.logMessage(resolvedLogMessage) - } - - const input = { - // SendMessageRequest - QueueUrl: this.queueUrl, - MessageBody: JSON.stringify(message), - ...options, - } satisfies SendMessageCommandInput - const command = new SendMessageCommand(input) - await this.sqsClient.send(command) - this.handleMessageProcessed(message, 'published') - } catch (error) { - this.handleError(error) - throw error - } + return this.internalPublish(message, this.messageSchema, options) } /* c8 ignore start */ diff --git a/packages/sqs/lib/sqs/AbstractSqsPublisherMultiSchema.ts b/packages/sqs/lib/sqs/AbstractSqsPublisherMultiSchema.ts index a28c8546..a4de0e93 100644 --- a/packages/sqs/lib/sqs/AbstractSqsPublisherMultiSchema.ts +++ b/packages/sqs/lib/sqs/AbstractSqsPublisherMultiSchema.ts @@ -1,5 +1,3 @@ -import type { SendMessageCommandInput } from '@aws-sdk/client-sqs' -import { SendMessageCommand } from '@aws-sdk/client-sqs' import type { Either } from '@lokalise/node-core' import type { AsyncPublisher, @@ -44,32 +42,12 @@ export abstract class AbstractSqsPublisherMultiSchema { - try { - const resolveSchemaResult = this.resolveSchema(message) - if (resolveSchemaResult.error) { - throw resolveSchemaResult.error - } - resolveSchemaResult.result.parse(message) - - if (this.logMessages) { - // @ts-ignore - const resolvedLogMessage = this.resolveMessageLog(message, message[this.messageTypeField]) - this.logMessage(resolvedLogMessage) - } - - const input = { - // SendMessageRequest - QueueUrl: this.queueUrl, - MessageBody: JSON.stringify(message), - ...options, - } satisfies SendMessageCommandInput - const command = new SendMessageCommand(input) - await this.sqsClient.send(command) - this.handleMessageProcessed(message, 'published') - } catch (error) { - this.handleError(error) - throw error + const messageSchemaResult = this.resolveSchema(message) + if (messageSchemaResult.error) { + throw messageSchemaResult.error } + + return this.internalPublish(message, messageSchemaResult.result, options) } /* c8 ignore start */ diff --git a/packages/sqs/lib/sqs/AbstractSqsService.ts b/packages/sqs/lib/sqs/AbstractSqsService.ts index 4c360061..5930956b 100644 --- a/packages/sqs/lib/sqs/AbstractSqsService.ts +++ b/packages/sqs/lib/sqs/AbstractSqsService.ts @@ -1,4 +1,5 @@ -import type { SQSClient, CreateQueueRequest } from '@aws-sdk/client-sqs' +import type { SQSClient, CreateQueueRequest, SendMessageCommandInput } from '@aws-sdk/client-sqs' +import { SendMessageCommand } from '@aws-sdk/client-sqs' import type { QueueConsumerDependencies, QueueDependencies, @@ -6,11 +7,13 @@ import type { ExistingQueueOptions, } from '@message-queue-toolkit/core' import { AbstractQueueService } from '@message-queue-toolkit/core' +import type { ZodSchema } from 'zod' import type { SQSMessage } from '../types/MessageTypes' import { deleteSqs, initSqs } from '../utils/sqsInitter' import type { SQSCreationConfig } from './AbstractSqsConsumer' +import type { SQSMessageOptions } from './AbstractSqsPublisherMonoSchema' export type SQSDependencies = QueueDependencies & { sqsClient: SQSClient @@ -71,6 +74,40 @@ export abstract class AbstractSqsService< this.queueName = queueName } + protected async internalPublish( + message: MessagePayloadType, + messageSchema: ZodSchema, + options: SQSMessageOptions = {}, + ): Promise { + if (!this.queueArn) { + // Lazy loading + await this.init() + } + + try { + messageSchema.parse(message) + + if (this.logMessages) { + // @ts-ignore + const resolvedLogMessage = this.resolveMessageLog(message, message[this.messageTypeField]) + this.logMessage(resolvedLogMessage) + } + + const input = { + // SendMessageRequest + QueueUrl: this.queueUrl, + MessageBody: JSON.stringify(message), + ...options, + } satisfies SendMessageCommandInput + const command = new SendMessageCommand(input) + await this.sqsClient.send(command) + this.handleMessageProcessed(message, 'published') + } catch (error) { + this.handleError(error) + throw error + } + } + // eslint-disable-next-line @typescript-eslint/require-await public override async close(): Promise {} } diff --git a/packages/sqs/package.json b/packages/sqs/package.json index 78c0cccd..c054a757 100644 --- a/packages/sqs/package.json +++ b/packages/sqs/package.json @@ -1,6 +1,6 @@ { "name": "@message-queue-toolkit/sqs", - "version": "9.0.1", + "version": "9.1.0", "private": false, "license": "MIT", "description": "SQS adapter for message-queue-toolkit", diff --git a/packages/sqs/test/publishers/SqsPermissionPublisherMonoSchema.spec.ts b/packages/sqs/test/publishers/SqsPermissionPublisherMonoSchema.spec.ts index f659f718..c0e1f005 100644 --- a/packages/sqs/test/publishers/SqsPermissionPublisherMonoSchema.spec.ts +++ b/packages/sqs/test/publishers/SqsPermissionPublisherMonoSchema.spec.ts @@ -16,7 +16,7 @@ import { userPermissionMap } from '../repositories/PermissionRepository' import { registerDependencies, SINGLETON_CONFIG } from '../utils/testContext' import type { Dependencies } from '../utils/testContext' -import type { SqsPermissionPublisherMonoSchema } from './SqsPermissionPublisherMonoSchema' +import { SqsPermissionPublisherMonoSchema } from './SqsPermissionPublisherMonoSchema' const perms: [string, ...string[]] = ['perm1', 'perm2'] const userIds = [100, 200, 300] @@ -116,5 +116,21 @@ describe('SqsPermissionPublisher', () => { consumer.stop() }) + + it('publish message with lazy loading', async () => { + const newPublisher = new SqsPermissionPublisherMonoSchema(diContainer.cradle) + + const message = { + id: '1', + userIds, + messageType: 'add', + permissions: perms, + } satisfies PERMISSIONS_MESSAGE_TYPE + + await newPublisher.publish(message) + + const res = await newPublisher.handlerSpy.waitForMessageWithId('1', 'published') + expect(res.message).toEqual(message) + }) }) })