Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Lazy init for AMQP #142

Merged
merged 5 commits into from
May 17, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 0 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,6 @@ They implement the following public methods:

> **_NOTE:_** See [SqsPermissionPublisher.ts](./packages/sqs/test/publishers/SqsPermissionPublisher.ts) for a practical example.

> **_NOTE:_** Lazy loading is not supported for AMQP publishers.


### Consumers

`message-queue-toolkit` provides base classes for implementing consumers for each of the supported protocol.
Expand Down
2 changes: 1 addition & 1 deletion docker-compose.yml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
services:
rabbitmq:
image: rabbitmq:3.11.20
image: rabbitmq:3.12.14
ports:
- ${DOCKER_RABBITMQ_CLIENT_PORT:-5672}:5672
- ${DOCKER_RABBITMQ_MANAGEMENT_PORT:-15672}:15672
Expand Down
2 changes: 1 addition & 1 deletion packages/amqp/.eslintrc.json
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@
],
"max-lines": ["error", { "max": 600 }],
"max-params": ["error", { "max": 4 }],
"max-statements": ["error", { "max": 20 }],
"max-statements": ["error", { "max": 30 }],
"complexity": ["error", { "max": 20 }]
},
"overrides": [
Expand Down
2 changes: 1 addition & 1 deletion packages/amqp/docker-compose.yml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
services:
rabbitmq:
image: rabbitmq:3.11.20
image: rabbitmq:3.12.14
ports:
- ${DOCKER_RABBITMQ_CLIENT_PORT:-5672}:5672
- ${DOCKER_RABBITMQ_MANAGEMENT_PORT:-15672}:15672
Expand Down
23 changes: 23 additions & 0 deletions packages/amqp/lib/AbstractAmqpPublisher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ export abstract class AbstractAmqpPublisher<MessagePayloadType extends object>
{
private readonly messageSchemaContainer: MessageSchemaContainer<MessagePayloadType>

private initPromise?: Promise<void>

constructor(dependencies: AMQPDependencies, options: AMQPPublisherOptions<MessagePayloadType>) {
super(dependencies, options)

Expand All @@ -42,6 +44,22 @@ export abstract class AbstractAmqpPublisher<MessagePayloadType extends object>
}
resolveSchemaResult.result.parse(message)

// If it's not initted yet, do the lazy init
if (!this.isInitted) {
// avoid multiple concurrent inits
if (!this.initPromise) {
this.initPromise = this.init()
}
this.initPromise
kibertoad marked this conversation as resolved.
Show resolved Hide resolved
.then(() => {
this.publish(message)
})
.catch((err) => {
this.handleError(err)
})
return
CarlosGamero marked this conversation as resolved.
Show resolved Hide resolved
}

/**
* If the message doesn't have a timestamp field -> add it
* will be used on the consumer to prevent infinite retries on the same message
Expand Down Expand Up @@ -108,6 +126,11 @@ export abstract class AbstractAmqpPublisher<MessagePayloadType extends object>
throw new Error('Not implemented for publisher')
}

async close(): Promise<void> {
this.initPromise = undefined
await super.close()
}

override processMessage(): Promise<Either<'retryLater', 'success'>> {
throw new Error('Not implemented for publisher')
}
Expand Down
2 changes: 2 additions & 0 deletions packages/amqp/lib/AbstractAmqpService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,7 @@ export abstract class AbstractAmqpService<
if (this.connection) {
await this.receiveNewConnection(this.connection)
}
this.isInitted = true
}

public async reconnect() {
Expand All @@ -167,5 +168,6 @@ export abstract class AbstractAmqpService<
async close(): Promise<void> {
this.isShuttingDown = true
await this.destroyChannel()
this.isInitted = false
}
}
50 changes: 49 additions & 1 deletion packages/amqp/test/publishers/AmqpPermissionPublisher.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,8 @@ describe('PermissionPublisher', () => {
it('return details if publish failed', async () => {
expect.assertions(3)
try {
await permissionPublisher.close()
// @ts-ignore
permissionPublisher.channel = undefined
permissionPublisher.publish({
id: '11',
messageType: 'add',
Expand Down Expand Up @@ -243,6 +244,53 @@ describe('PermissionPublisher', () => {
})
})

it('publishes a message with lazy init', async () => {
await permissionConsumer.close()
await permissionPublisher.close()

const message = {
id: '1',
messageType: 'add',
userIds: [1],
permissions: ['100'],
timestamp: new Date(),
} satisfies PERMISSIONS_MESSAGE_TYPE

let receivedMessage: unknown
await channel.consume(AmqpPermissionPublisher.QUEUE_NAME, (message) => {
if (message === null) {
return
}
const decodedMessage = deserializeAmqpMessage(
message,
PERMISSIONS_MESSAGE_SCHEMA,
new FakeConsumerErrorResolver(),
)
receivedMessage = decodedMessage.result!
})

permissionPublisher.publish(message)

await waitAndRetry(() => !!receivedMessage)

expect(receivedMessage).toEqual({
parsedMessage: {
id: '1',
messageType: 'add',
userIds: [1],
permissions: ['100'],
timestamp: message.timestamp.toISOString(),
},
originalMessage: {
id: '1',
messageType: 'add',
userIds: [1],
permissions: ['100'],
timestamp: message.timestamp.toISOString(),
},
})
})

it('publishes a message auto-filling timestamp', async () => {
await permissionConsumer.close()

Expand Down
2 changes: 1 addition & 1 deletion packages/core/lib/events/DomainEventEmitter.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ const createdEventPayload: CommonEventDefinitionSchemaType<typeof TestEvents.cre
},
type: 'entity.created',
id: randomUUID(),
timestamp: new Date().toISOString(),
timestamp: new Date(),
metadata: {
originatedFrom: 'service',
producedBy: 'producer',
Expand Down
2 changes: 1 addition & 1 deletion packages/core/lib/events/baseEventSchemas.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import { z } from 'zod'
// Core fields that describe either internal event or external message
export const BASE_EVENT_SCHEMA = z.object({
id: z.string().describe('event unique identifier'),
timestamp: z.string().datetime().describe('iso 8601 datetime'),
timestamp: z.date().describe('event creation date'),
type: z.literal<string>('<replace.me>').describe('event type name'),
payload: z.optional(z.object({})).describe('event payload based on type'),
})
Expand Down
6 changes: 3 additions & 3 deletions packages/core/lib/messages/MetadataFiller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import type { CommonEventDefinition } from '../events/eventTypes'
import type { MessageMetadataType } from './baseMessageSchemas'

export type IdGenerator = () => string
export type TimestampGenerator = () => string
export type TimestampGenerator = () => Date

export type MetadataFillerOptions = {
serviceId: string
Expand All @@ -23,7 +23,7 @@ export type MetadataFiller<
> = {
produceMetadata(currentMessage: T, eventDefinition: D, precedingMessageMetadata?: M): M
produceId(): string
produceTimestamp(): string
produceTimestamp(): Date
}

export class CommonMetadataFiller implements MetadataFiller {
Expand All @@ -43,7 +43,7 @@ export class CommonMetadataFiller implements MetadataFiller {
this.produceTimestamp =
options.timestampGenerator ??
(() => {
return new Date().toISOString()
return new Date()
})
}

Expand Down
2 changes: 2 additions & 0 deletions packages/core/lib/queues/AbstractQueueService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ export abstract class AbstractQueueService<
protected readonly locatorConfig?: QueueLocatorType
protected readonly deletionConfig?: DeletionConfig
protected readonly _handlerSpy?: HandlerSpy<MessagePayloadSchemas>
protected isInitted: boolean

get handlerSpy(): PublicHandlerSpy<MessagePayloadSchemas> {
if (!this._handlerSpy) {
Expand All @@ -75,6 +76,7 @@ export abstract class AbstractQueueService<

this.logMessages = options.logMessages ?? false
this._handlerSpy = resolveHandlerSpy<MessagePayloadSchemas>(options)
this.isInitted = false
}

protected abstract resolveSchema(
Expand Down
2 changes: 0 additions & 2 deletions packages/sns/lib/sns/AbstractSnsPublisher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ export abstract class AbstractSnsPublisher<MessagePayloadType extends object>
{
private readonly messageSchemaContainer: MessageSchemaContainer<MessagePayloadType>

private isInitted: boolean
private initPromise?: Promise<void>

constructor(dependencies: SNSDependencies, options: SNSPublisherOptions<MessagePayloadType>) {
Expand All @@ -42,7 +41,6 @@ export abstract class AbstractSnsPublisher<MessagePayloadType extends object>
messageSchemas,
messageTypeField: options.messageTypeField,
})
this.isInitted = false
}

async publish(message: MessagePayloadType, options: SNSMessageOptions = {}): Promise<void> {
Expand Down
2 changes: 2 additions & 0 deletions packages/sns/lib/sns/AbstractSnsService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -71,9 +71,11 @@ export abstract class AbstractSnsService<

const initResult = await initSns(this.snsClient, this.locatorConfig, this.creationConfig)
this.topicArn = initResult.topicArn
this.isInitted = true
}

public override close(): Promise<void> {
this.isInitted = false
return Promise.resolve()
}
}
4 changes: 2 additions & 2 deletions packages/sns/lib/sns/SnsPublisherManager.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import { CommonSnsPublisher } from './CommonSnsPublisherFactory'
import type { SnsMessagePublishType, SnsPublisherManager } from './SnsPublisherManager'
import { FakeConsumer } from './fakes/FakeConsumer'

describe('AutopilotPublisherManager', () => {
describe('SnsPublisherManager', () => {
let diContainer: AwilixContainer<Dependencies>
let publisherManager: SnsPublisherManager<
CommonSnsPublisher<TestEventPayloadsType>,
Expand Down Expand Up @@ -80,7 +80,7 @@ describe('AutopilotPublisherManager', () => {
await fakeConsumer.close()
})

it('publish to a non existing topic will throw error', async () => {
it('publish to a non-existing topic will throw error', async () => {
await expect(
publisherManager.publish('non-existing-topic', {
type: 'entity.created',
Expand Down
2 changes: 0 additions & 2 deletions packages/sqs/lib/sqs/AbstractSqsPublisher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ export abstract class AbstractSqsPublisher<MessagePayloadType extends object>
implements AsyncPublisher<MessagePayloadType, SQSMessageOptions>
{
private readonly messageSchemaContainer: MessageSchemaContainer<MessagePayloadType>
private isInitted: boolean
private initPromise?: Promise<void>

constructor(
Expand All @@ -40,7 +39,6 @@ export abstract class AbstractSqsPublisher<MessagePayloadType extends object>
messageSchemas,
messageTypeField: options.messageTypeField,
})
this.isInitted = false
}

async publish(message: MessagePayloadType, options: SQSMessageOptions = {}): Promise<void> {
Expand Down
7 changes: 5 additions & 2 deletions packages/sqs/lib/sqs/AbstractSqsService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,11 @@ export abstract class AbstractSqsService<
this.queueName = queueName
this.queueUrl = queueUrl
this.queueArn = queueArn
this.isInitted = true
}

// eslint-disable-next-line @typescript-eslint/require-await
public override async close(): Promise<void> {}
public override close(): Promise<void> {
this.isInitted = false
return Promise.resolve()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import { waitAndRetry } from '@lokalise/node-core'
import type { AwilixContainer } from 'awilix'
import { Consumer } from 'sqs-consumer'
import { afterEach, beforeEach, describe, expect, it } from 'vitest'
import z from 'zod'

import type { SQSMessage } from '../../lib/types/MessageTypes'
import { assertQueue, deleteQueue, getQueueAttributes } from '../../lib/utils/sqsUtils'
Expand Down
Loading