Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Lazy init for AMQP #142

Merged
merged 5 commits into from
May 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 0 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,6 @@ They implement the following public methods:

> **_NOTE:_** See [SqsPermissionPublisher.ts](./packages/sqs/test/publishers/SqsPermissionPublisher.ts) for a practical example.

> **_NOTE:_** Lazy loading is not supported for AMQP publishers.


### Consumers

`message-queue-toolkit` provides base classes for implementing consumers for each of the supported protocol.
Expand Down
2 changes: 1 addition & 1 deletion docker-compose.yml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
services:
rabbitmq:
image: rabbitmq:3.11.20
image: rabbitmq:3.12.14
ports:
- ${DOCKER_RABBITMQ_CLIENT_PORT:-5672}:5672
- ${DOCKER_RABBITMQ_MANAGEMENT_PORT:-15672}:15672
Expand Down
2 changes: 1 addition & 1 deletion packages/amqp/.eslintrc.json
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@
],
"max-lines": ["error", { "max": 600 }],
"max-params": ["error", { "max": 4 }],
"max-statements": ["error", { "max": 20 }],
"max-statements": ["error", { "max": 30 }],
"complexity": ["error", { "max": 20 }]
},
"overrides": [
Expand Down
2 changes: 1 addition & 1 deletion packages/amqp/docker-compose.yml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
services:
rabbitmq:
image: rabbitmq:3.11.20
image: rabbitmq:3.12.14
ports:
- ${DOCKER_RABBITMQ_CLIENT_PORT:-5672}:5672
- ${DOCKER_RABBITMQ_MANAGEMENT_PORT:-15672}:15672
Expand Down
28 changes: 28 additions & 0 deletions packages/amqp/lib/AbstractAmqpPublisher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ export abstract class AbstractAmqpPublisher<MessagePayloadType extends object>
{
private readonly messageSchemaContainer: MessageSchemaContainer<MessagePayloadType>

private initPromise?: Promise<void>

constructor(dependencies: AMQPDependencies, options: AMQPPublisherOptions<MessagePayloadType>) {
super(dependencies, options)

Expand All @@ -42,6 +44,27 @@ export abstract class AbstractAmqpPublisher<MessagePayloadType extends object>
}
resolveSchemaResult.result.parse(message)

// If it's not initted yet, do the lazy init
if (!this.isInitted) {
// avoid multiple concurrent inits
if (!this.initPromise) {
this.initPromise = this.init()
}

/**
* it is intentional that this promise is not awaited, that's how we keep the method invocation synchronous
* RabbitMQ publish by itself doesn't guarantee that your message is delivered successfully, so this kind of fire-and-forget is not strongly different from how amqp-lib behaves in the first place.
*/
this.initPromise
kibertoad marked this conversation as resolved.
Show resolved Hide resolved
.then(() => {
this.publish(message)
})
.catch((err) => {
this.handleError(err)
})
return
CarlosGamero marked this conversation as resolved.
Show resolved Hide resolved
}

/**
* If the message doesn't have a timestamp field -> add it
* will be used on the consumer to prevent infinite retries on the same message
Expand Down Expand Up @@ -108,6 +131,11 @@ export abstract class AbstractAmqpPublisher<MessagePayloadType extends object>
throw new Error('Not implemented for publisher')
}

async close(): Promise<void> {
this.initPromise = undefined
await super.close()
}

override processMessage(): Promise<Either<'retryLater', 'success'>> {
throw new Error('Not implemented for publisher')
}
Expand Down
2 changes: 2 additions & 0 deletions packages/amqp/lib/AbstractAmqpService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,7 @@ export abstract class AbstractAmqpService<
if (this.connection) {
await this.receiveNewConnection(this.connection)
}
this.isInitted = true
}

public async reconnect() {
Expand All @@ -167,5 +168,6 @@ export abstract class AbstractAmqpService<
async close(): Promise<void> {
this.isShuttingDown = true
await this.destroyChannel()
this.isInitted = false
}
}
50 changes: 49 additions & 1 deletion packages/amqp/test/publishers/AmqpPermissionPublisher.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,8 @@ describe('PermissionPublisher', () => {
it('return details if publish failed', async () => {
expect.assertions(3)
try {
await permissionPublisher.close()
// @ts-ignore
permissionPublisher.channel = undefined
permissionPublisher.publish({
id: '11',
messageType: 'add',
Expand Down Expand Up @@ -243,6 +244,53 @@ describe('PermissionPublisher', () => {
})
})

it('publishes a message with lazy init', async () => {
await permissionConsumer.close()
await permissionPublisher.close()

const message = {
id: '1',
messageType: 'add',
userIds: [1],
permissions: ['100'],
timestamp: new Date(),
} satisfies PERMISSIONS_MESSAGE_TYPE

let receivedMessage: unknown
await channel.consume(AmqpPermissionPublisher.QUEUE_NAME, (message) => {
if (message === null) {
return
}
const decodedMessage = deserializeAmqpMessage(
message,
PERMISSIONS_MESSAGE_SCHEMA,
new FakeConsumerErrorResolver(),
)
receivedMessage = decodedMessage.result!
})

permissionPublisher.publish(message)

await waitAndRetry(() => !!receivedMessage)

expect(receivedMessage).toEqual({
parsedMessage: {
id: '1',
messageType: 'add',
userIds: [1],
permissions: ['100'],
timestamp: message.timestamp.toISOString(),
},
originalMessage: {
id: '1',
messageType: 'add',
userIds: [1],
permissions: ['100'],
timestamp: message.timestamp.toISOString(),
},
})
})

it('publishes a message auto-filling timestamp', async () => {
await permissionConsumer.close()

Expand Down
2 changes: 2 additions & 0 deletions packages/core/lib/queues/AbstractQueueService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ export abstract class AbstractQueueService<
protected readonly locatorConfig?: QueueLocatorType
protected readonly deletionConfig?: DeletionConfig
protected readonly _handlerSpy?: HandlerSpy<MessagePayloadSchemas>
protected isInitted: boolean

get handlerSpy(): PublicHandlerSpy<MessagePayloadSchemas> {
if (!this._handlerSpy) {
Expand All @@ -75,6 +76,7 @@ export abstract class AbstractQueueService<

this.logMessages = options.logMessages ?? false
this._handlerSpy = resolveHandlerSpy<MessagePayloadSchemas>(options)
this.isInitted = false
}

protected abstract resolveSchema(
Expand Down
3 changes: 1 addition & 2 deletions packages/sns/lib/sns/AbstractSnsPublisher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ export abstract class AbstractSnsPublisher<MessagePayloadType extends object>
{
private readonly messageSchemaContainer: MessageSchemaContainer<MessagePayloadType>

private isInitted: boolean
private initPromise?: Promise<void>

constructor(dependencies: SNSDependencies, options: SNSPublisherOptions<MessagePayloadType>) {
Expand All @@ -42,7 +41,6 @@ export abstract class AbstractSnsPublisher<MessagePayloadType extends object>
messageSchemas,
messageTypeField: options.messageTypeField,
})
this.isInitted = false
}

async publish(message: MessagePayloadType, options: SNSMessageOptions = {}): Promise<void> {
Expand All @@ -58,6 +56,7 @@ export abstract class AbstractSnsPublisher<MessagePayloadType extends object>
this.initPromise = this.init()
}
await this.initPromise
this.initPromise = undefined
}

try {
Expand Down
2 changes: 2 additions & 0 deletions packages/sns/lib/sns/AbstractSnsService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -71,9 +71,11 @@ export abstract class AbstractSnsService<

const initResult = await initSns(this.snsClient, this.locatorConfig, this.creationConfig)
this.topicArn = initResult.topicArn
this.isInitted = true
}

public override close(): Promise<void> {
this.isInitted = false
return Promise.resolve()
}
}
4 changes: 2 additions & 2 deletions packages/sns/lib/sns/SnsPublisherManager.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import { CommonSnsPublisher } from './CommonSnsPublisherFactory'
import type { SnsMessagePublishType, SnsPublisherManager } from './SnsPublisherManager'
import { FakeConsumer } from './fakes/FakeConsumer'

describe('AutopilotPublisherManager', () => {
describe('SnsPublisherManager', () => {
let diContainer: AwilixContainer<Dependencies>
let publisherManager: SnsPublisherManager<
CommonSnsPublisher<TestEventPayloadsType>,
Expand Down Expand Up @@ -80,7 +80,7 @@ describe('AutopilotPublisherManager', () => {
await fakeConsumer.close()
})

it('publish to a non existing topic will throw error', async () => {
it('publish to a non-existing topic will throw error', async () => {
await expect(
publisherManager.publish('non-existing-topic', {
type: 'entity.created',
Expand Down
20 changes: 16 additions & 4 deletions packages/sqs/lib/sqs/AbstractSqsPublisher.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import type { SendMessageCommandInput } from '@aws-sdk/client-sqs'
import { SendMessageCommand } from '@aws-sdk/client-sqs'
import type { Either } from '@lokalise/node-core'
import { InternalError } from '@lokalise/node-core'
import type {
AsyncPublisher,
MessageInvalidFormatError,
Expand All @@ -26,7 +27,6 @@ export abstract class AbstractSqsPublisher<MessagePayloadType extends object>
implements AsyncPublisher<MessagePayloadType, SQSMessageOptions>
{
private readonly messageSchemaContainer: MessageSchemaContainer<MessagePayloadType>
private isInitted: boolean
private initPromise?: Promise<void>

constructor(
Expand All @@ -40,7 +40,6 @@ export abstract class AbstractSqsPublisher<MessagePayloadType extends object>
messageSchemas,
messageTypeField: options.messageTypeField,
})
this.isInitted = false
}

async publish(message: MessagePayloadType, options: SQSMessageOptions = {}): Promise<void> {
Expand All @@ -56,6 +55,7 @@ export abstract class AbstractSqsPublisher<MessagePayloadType extends object>
this.initPromise = this.init()
}
await this.initPromise
this.initPromise = undefined
}

try {
Expand Down Expand Up @@ -87,8 +87,20 @@ export abstract class AbstractSqsPublisher<MessagePayloadType extends object>
await this.sqsClient.send(command)
this.handleMessageProcessed(message, 'published')
} catch (error) {
this.handleError(error)
throw error
const err = error as Error
this.handleError(err)
throw new InternalError({
message: `Error while publishing to SQS: ${err.message}`,
errorCode: 'SQS_PUBLISH_ERROR',
details: {
publisher: this.constructor.name,
queueArn: this.queueArn,
queueName: this.queueName,
// @ts-ignore
messageType: message[this.messageTypeField] ?? 'unknown',
},
cause: err,
})
}
}

Expand Down
7 changes: 5 additions & 2 deletions packages/sqs/lib/sqs/AbstractSqsService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,11 @@ export abstract class AbstractSqsService<
this.queueName = queueName
this.queueUrl = queueUrl
this.queueArn = queueArn
this.isInitted = true
}

// eslint-disable-next-line @typescript-eslint/require-await
public override async close(): Promise<void> {}
public override close(): Promise<void> {
this.isInitted = false
return Promise.resolve()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import { waitAndRetry } from '@lokalise/node-core'
import type { AwilixContainer } from 'awilix'
import { Consumer } from 'sqs-consumer'
import { afterEach, beforeEach, describe, expect, it } from 'vitest'
import z from 'zod'

import type { SQSMessage } from '../../lib/types/MessageTypes'
import { assertQueue, deleteQueue, getQueueAttributes } from '../../lib/utils/sqsUtils'
Expand Down
4 changes: 0 additions & 4 deletions packages/sqs/test/publishers/SqsPermissionPublisher.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -145,16 +145,12 @@ describe('SqsPermissionPublisher', () => {

describe('publish', () => {
let diContainer: AwilixContainer<Dependencies>
let sqsClient: SQSClient
let permissionPublisher: SqsPermissionPublisher

beforeEach(async () => {
diContainer = await registerDependencies()
sqsClient = diContainer.cradle.sqsClient
await diContainer.cradle.permissionConsumer.close()
permissionPublisher = diContainer.cradle.permissionPublisher

await deleteQueue(sqsClient, SqsPermissionPublisher.QUEUE_NAME)
})

afterEach(async () => {
Expand Down
Loading