Skip to content

Commit

Permalink
Publishers lazy loading (#90)
Browse files Browse the repository at this point in the history
* SNS publisher lazy loading

* Minor change

* Adding SNS lazy loading test

* Trying to fix install

* Resolving TODOs

* Minor name adjust

* SQS publisher lazy loading

* doc update

* version update

* AMQP note on readme

* Removing unused imports
  • Loading branch information
CarlosGamero authored Feb 23, 2024
1 parent e8564c7 commit 006c67c
Show file tree
Hide file tree
Showing 11 changed files with 128 additions and 107 deletions.
4 changes: 3 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,14 +34,16 @@ Mono-schema publishers only support a single message type and are simpler to imp
* `messageTypeField` - which field in the message describes the type of a message. This field needs to be defined as `z.literal` in the schema;
* `locatorConfig` - configuration for resolving existing queue and/or topic. Should not be specified together with the `creationConfig`.
* `creationConfig` - configuration for queue and/or topic to create, if one does not exist. Should not be specified together with the `locatorConfig`.
* `init()`, prepare publisher for use (e. g. establish all necessary connections);
* `init()`, prepare publisher for use (e. g. establish all necessary connections), it will be called automatically by `publish()` if not called before explicitly (lazy loading).
* `close()`, stop publisher use (e. g. disconnect);
* `publish()`, send a message to a queue or topic. It accepts the following parameters:
* `message` – a message following a `zod` schema;
* `options` – a protocol-dependent set of message parameters. For more information please check documentation for options for each protocol: [AMQP](https://amqp-node.github.io/amqplib/channel_api.html#channel_sendToQueue), [SQS](https://docs.aws.amazon.com/AWSJavaScriptSDK/v3/latest/clients/client-sqs/interfaces/sendmessagecommandinput.html) and [SNS](https://docs.aws.amazon.com/AWSJavaScriptSDK/v3/latest/clients/client-sns/interfaces/publishcommandinput.html).

> **_NOTE:_** See [SqsPermissionPublisherMonoSchema.ts](./packages/sqs/test/publishers/SqsPermissionPublisherMonoSchema.ts) for a practical example.
> **_NOTE:_** Lazy loading is not supported for AMQP publishers.
#### Multi-schema publishers

Multi-schema publishers support multiple messages types. They implement the following public methods:
Expand Down
26 changes: 2 additions & 24 deletions packages/sns/lib/sns/AbstractSnsPublisherMonoSchema.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
import { PublishCommand } from '@aws-sdk/client-sns'
import type { PublishCommandInput } from '@aws-sdk/client-sns/dist-types/commands/PublishCommand'
import type { Either } from '@lokalise/node-core'
import type {
AsyncPublisher,
Expand Down Expand Up @@ -37,28 +35,8 @@ export abstract class AbstractSnsPublisherMonoSchema<MessagePayloadType extends
}
}

async publish(message: MessagePayloadType, options: SNSMessageOptions = {}): Promise<void> {
try {
this.messageSchema.parse(message)

if (this.logMessages) {
// @ts-ignore
const resolvedLogMessage = this.resolveMessageLog(message, message[this.messageTypeField])
this.logMessage(resolvedLogMessage)
}

const input = {
Message: JSON.stringify(message),
TopicArn: this.topicArn,
...options,
} satisfies PublishCommandInput
const command = new PublishCommand(input)
await this.snsClient.send(command)
this.handleMessageProcessed(message, 'published')
} catch (error) {
this.handleError(error)
throw error
}
publish(message: MessagePayloadType, options: SNSMessageOptions = {}): Promise<void> {
return this.internalPublish(message, this.messageSchema, options)
}

/* c8 ignore start */
Expand Down
31 changes: 5 additions & 26 deletions packages/sns/lib/sns/AbstractSnsPublisherMultiSchema.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
import { PublishCommand } from '@aws-sdk/client-sns'
import type { PublishCommandInput } from '@aws-sdk/client-sns/dist-types/commands/PublishCommand'
import type { Either } from '@lokalise/node-core'
import type {
AsyncPublisher,
Expand Down Expand Up @@ -40,31 +38,12 @@ export abstract class AbstractSnsPublisherMultiSchema<MessagePayloadType extends
}

async publish(message: MessagePayloadType, options: SNSMessageOptions = {}): Promise<void> {
try {
const resolveSchemaResult = this.resolveSchema(message)
if (resolveSchemaResult.error) {
throw resolveSchemaResult.error
}
resolveSchemaResult.result.parse(message)

if (this.logMessages) {
// @ts-ignore
const resolvedLogMessage = this.resolveMessageLog(message, message[this.messageTypeField])
this.logMessage(resolvedLogMessage)
}

const input = {
Message: JSON.stringify(message),
TopicArn: this.topicArn,
...options,
} satisfies PublishCommandInput
const command = new PublishCommand(input)
await this.snsClient.send(command)
this.handleMessageProcessed(message, 'published')
} catch (error) {
this.handleError(error)
throw error
const messageSchemaResult = this.resolveSchema(message)
if (messageSchemaResult.error) {
throw messageSchemaResult.error
}

return this.internalPublish(message, messageSchemaResult.result, options)
}

protected override resolveMessage(): Either<
Expand Down
38 changes: 38 additions & 0 deletions packages/sns/lib/sns/AbstractSnsService.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
import type { SNSClient, CreateTopicCommandInput, Tag } from '@aws-sdk/client-sns'
import { PublishCommand } from '@aws-sdk/client-sns'
import type { PublishCommandInput } from '@aws-sdk/client-sns/dist-types/commands/PublishCommand'
import type {
QueueConsumerDependencies,
QueueDependencies,
Expand All @@ -8,10 +10,13 @@ import type {
ExistingQueueOptionsMultiSchema,
} from '@message-queue-toolkit/core'
import { AbstractQueueService } from '@message-queue-toolkit/core'
import type { ZodSchema } from 'zod'

import type { SNS_MESSAGE_BODY_TYPE } from '../types/MessageTypes'
import { deleteSns, initSns } from '../utils/snsInitter'

import type { SNSMessageOptions } from './AbstractSnsPublisherMonoSchema'

export type SNSDependencies = QueueDependencies & {
snsClient: SNSClient
}
Expand Down Expand Up @@ -97,4 +102,37 @@ export abstract class AbstractSnsService<

// eslint-disable-next-line @typescript-eslint/require-await
public override async close(): Promise<void> {}

protected async internalPublish(
message: MessagePayloadType,
messageSchema: ZodSchema<MessagePayloadType>,
options: SNSMessageOptions = {},
): Promise<void> {
if (this.topicArn === undefined) {
// Lazy loading
await this.init()
}

try {
messageSchema.parse(message)

if (this.logMessages) {
// @ts-ignore
const resolvedLogMessage = this.resolveMessageLog(message, message[this.messageTypeField])
this.logMessage(resolvedLogMessage)
}

const input = {
Message: JSON.stringify(message),
TopicArn: this.topicArn,
...options,
} satisfies PublishCommandInput
const command = new PublishCommand(input)
await this.snsClient.send(command)
this.handleMessageProcessed(message, 'published')
} catch (error) {
this.handleError(error)
throw error
}
}
}
2 changes: 1 addition & 1 deletion packages/sns/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@message-queue-toolkit/sns",
"version": "9.0.0",
"version": "9.1.0",
"private": false,
"license": "MIT",
"description": "SNS adapter for message-queue-toolkit",
Expand Down
18 changes: 17 additions & 1 deletion packages/sns/test/publishers/SnsPermissionPublisher.spec.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import type { SNSClient } from '@aws-sdk/client-sns'
import type { SQSClient } from '@aws-sdk/client-sqs'
import { waitAndRetry } from '@message-queue-toolkit/core'
import { waitAndRetry } from '@lokalise/node-core'
import type { SQSMessage } from '@message-queue-toolkit/sqs'
import { assertQueue, deleteQueue, FakeConsumerErrorResolver } from '@message-queue-toolkit/sqs'
import type { AwilixContainer } from 'awilix'
Expand Down Expand Up @@ -167,5 +167,21 @@ describe('SNSPermissionPublisher', () => {

consumer.stop()
})

it('publish message with lazy loading', async () => {
const newPublisher = new SnsPermissionPublisherMonoSchema(diContainer.cradle)

const message = {
id: '1',
userIds,
messageType: 'add',
permissions: perms,
} satisfies PERMISSIONS_MESSAGE_TYPE

await newPublisher.publish(message)

const res = await newPublisher.handlerSpy.waitForMessageWithId('1', 'published')
expect(res.message).toEqual(message)
})
})
})
25 changes: 1 addition & 24 deletions packages/sqs/lib/sqs/AbstractSqsPublisherMonoSchema.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
import type { SendMessageCommandInput } from '@aws-sdk/client-sqs'
import { SendMessageCommand } from '@aws-sdk/client-sqs'
import type { Either } from '@lokalise/node-core'
import type {
AsyncPublisher,
Expand Down Expand Up @@ -41,28 +39,7 @@ export abstract class AbstractSqsPublisherMonoSchema<MessagePayloadType extends
}

async publish(message: MessagePayloadType, options: SQSMessageOptions = {}): Promise<void> {
try {
this.messageSchema.parse(message)

if (this.logMessages) {
// @ts-ignore
const resolvedLogMessage = this.resolveMessageLog(message, message[this.messageTypeField])
this.logMessage(resolvedLogMessage)
}

const input = {
// SendMessageRequest
QueueUrl: this.queueUrl,
MessageBody: JSON.stringify(message),
...options,
} satisfies SendMessageCommandInput
const command = new SendMessageCommand(input)
await this.sqsClient.send(command)
this.handleMessageProcessed(message, 'published')
} catch (error) {
this.handleError(error)
throw error
}
return this.internalPublish(message, this.messageSchema, options)
}

/* c8 ignore start */
Expand Down
32 changes: 5 additions & 27 deletions packages/sqs/lib/sqs/AbstractSqsPublisherMultiSchema.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
import type { SendMessageCommandInput } from '@aws-sdk/client-sqs'
import { SendMessageCommand } from '@aws-sdk/client-sqs'
import type { Either } from '@lokalise/node-core'
import type {
AsyncPublisher,
Expand Down Expand Up @@ -44,32 +42,12 @@ export abstract class AbstractSqsPublisherMultiSchema<MessagePayloadType extends
}

async publish(message: MessagePayloadType, options: SQSMessageOptions = {}): Promise<void> {
try {
const resolveSchemaResult = this.resolveSchema(message)
if (resolveSchemaResult.error) {
throw resolveSchemaResult.error
}
resolveSchemaResult.result.parse(message)

if (this.logMessages) {
// @ts-ignore
const resolvedLogMessage = this.resolveMessageLog(message, message[this.messageTypeField])
this.logMessage(resolvedLogMessage)
}

const input = {
// SendMessageRequest
QueueUrl: this.queueUrl,
MessageBody: JSON.stringify(message),
...options,
} satisfies SendMessageCommandInput
const command = new SendMessageCommand(input)
await this.sqsClient.send(command)
this.handleMessageProcessed(message, 'published')
} catch (error) {
this.handleError(error)
throw error
const messageSchemaResult = this.resolveSchema(message)
if (messageSchemaResult.error) {
throw messageSchemaResult.error
}

return this.internalPublish(message, messageSchemaResult.result, options)
}

/* c8 ignore start */
Expand Down
39 changes: 38 additions & 1 deletion packages/sqs/lib/sqs/AbstractSqsService.ts
Original file line number Diff line number Diff line change
@@ -1,16 +1,19 @@
import type { SQSClient, CreateQueueRequest } from '@aws-sdk/client-sqs'
import type { SQSClient, CreateQueueRequest, SendMessageCommandInput } from '@aws-sdk/client-sqs'
import { SendMessageCommand } from '@aws-sdk/client-sqs'
import type {
QueueConsumerDependencies,
QueueDependencies,
NewQueueOptions,
ExistingQueueOptions,
} from '@message-queue-toolkit/core'
import { AbstractQueueService } from '@message-queue-toolkit/core'
import type { ZodSchema } from 'zod'

import type { SQSMessage } from '../types/MessageTypes'
import { deleteSqs, initSqs } from '../utils/sqsInitter'

import type { SQSCreationConfig } from './AbstractSqsConsumer'
import type { SQSMessageOptions } from './AbstractSqsPublisherMonoSchema'

export type SQSDependencies = QueueDependencies & {
sqsClient: SQSClient
Expand Down Expand Up @@ -71,6 +74,40 @@ export abstract class AbstractSqsService<
this.queueName = queueName
}

protected async internalPublish(
message: MessagePayloadType,
messageSchema: ZodSchema<MessagePayloadType>,
options: SQSMessageOptions = {},
): Promise<void> {
if (!this.queueArn) {
// Lazy loading
await this.init()
}

try {
messageSchema.parse(message)

if (this.logMessages) {
// @ts-ignore
const resolvedLogMessage = this.resolveMessageLog(message, message[this.messageTypeField])
this.logMessage(resolvedLogMessage)
}

const input = {
// SendMessageRequest
QueueUrl: this.queueUrl,
MessageBody: JSON.stringify(message),
...options,
} satisfies SendMessageCommandInput
const command = new SendMessageCommand(input)
await this.sqsClient.send(command)
this.handleMessageProcessed(message, 'published')
} catch (error) {
this.handleError(error)
throw error
}
}

// eslint-disable-next-line @typescript-eslint/require-await
public override async close(): Promise<void> {}
}
2 changes: 1 addition & 1 deletion packages/sqs/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@message-queue-toolkit/sqs",
"version": "9.0.1",
"version": "9.1.0",
"private": false,
"license": "MIT",
"description": "SQS adapter for message-queue-toolkit",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import { userPermissionMap } from '../repositories/PermissionRepository'
import { registerDependencies, SINGLETON_CONFIG } from '../utils/testContext'
import type { Dependencies } from '../utils/testContext'

import type { SqsPermissionPublisherMonoSchema } from './SqsPermissionPublisherMonoSchema'
import { SqsPermissionPublisherMonoSchema } from './SqsPermissionPublisherMonoSchema'

const perms: [string, ...string[]] = ['perm1', 'perm2']
const userIds = [100, 200, 300]
Expand Down Expand Up @@ -116,5 +116,21 @@ describe('SqsPermissionPublisher', () => {

consumer.stop()
})

it('publish message with lazy loading', async () => {
const newPublisher = new SqsPermissionPublisherMonoSchema(diContainer.cradle)

const message = {
id: '1',
userIds,
messageType: 'add',
permissions: perms,
} satisfies PERMISSIONS_MESSAGE_TYPE

await newPublisher.publish(message)

const res = await newPublisher.handlerSpy.waitForMessageWithId('1', 'published')
expect(res.message).toEqual(message)
})
})
})

0 comments on commit 006c67c

Please sign in to comment.