Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for passing full event definitions #158

Merged
merged 3 commits into from
May 31, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 21 additions & 20 deletions packages/amqp/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,7 @@

The library provides support for both direct exchanges and topic exchanges.

> **_NOTE:_** Check [README.md](../../README.md) for transport-agnostic library documentation.
>
> **_NOTE:_** Check [README.md](../../README.md) for transport-agnostic library documentation.

## Publishers

Expand All @@ -25,42 +24,44 @@ Example:

```ts
export const TEST_AMQP_CONFIG: AmqpConfig = {
vhost: '',
hostname: 'localhost',
username: 'guest',
password: 'guest',
port: 5672,
useTls: false,
vhost: '',
hostname: 'localhost',
username: 'guest',
password: 'guest',
port: 5672,
useTls: false,
}

const amqpConnectionManager = new AmqpConnectionManager(config, logger)
await amqpConnectionManager.init()

const publisher = new TestAmqpPublisher(
{ amqpConnectionManager },
{
// other amqp options
})
{ amqpConnectionManager },
{
// other amqp options
},
)
await publisher.init()

const consumer = new TestAmqpConsumer(
{ amqpConnectionManager },
{
// other amqp options
})
{ amqpConnectionManager },
{
// other amqp options
},
)
await consumer.start()

// break connection, to simulate unexpected disconnection in production
await (await amqpConnectionManager.getConnection()).close()

const message = {
// some test message
const message = {
// some test message
}

// This will fail, but will trigger reconnection within amqpConnectionManager
publisher.publish(message)
// eventually connection is reestablished and propagated across all the AMQP services that use same amqpConnectionManager

// eventually connection is reestablished and propagated across all the AMQP services that use same amqpConnectionManager

// This will succeed and consumer, which also received new connection, will be able to consume it
publisher.publish(message)
Expand Down
13 changes: 3 additions & 10 deletions packages/amqp/lib/AbstractAmqpConsumer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,9 @@ import type {
QueueConsumer,
QueueConsumerOptions,
TransactionObservabilityManager,
} from '@message-queue-toolkit/core'
import {
isMessageError,
parseMessage,
HandlerContainer,
MessageSchemaContainer,
} from '@message-queue-toolkit/core'
import { isMessageError, parseMessage, HandlerContainer } from '@message-queue-toolkit/core'
import type { Connection, Message } from 'amqplib'

import type {
Expand Down Expand Up @@ -100,11 +96,8 @@ export abstract class AbstractAmqpConsumer<
? options.locatorConfig.queueName
: options.creationConfig!.queueName

const messageSchemas = options.handlers.map((entry) => entry.schema)
this.messageSchemaContainer = new MessageSchemaContainer<MessagePayloadType>({
messageSchemas,
messageTypeField: options.messageTypeField,
})
kibertoad marked this conversation as resolved.
Show resolved Hide resolved
this.messageSchemaContainer = this.resolveConsumerMessageSchemaContainer(options)

this.handlerContainer = new HandlerContainer<
MessagePayloadType,
ExecutionContext,
Expand Down
9 changes: 3 additions & 6 deletions packages/amqp/lib/AbstractAmqpPublisher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,12 @@ import type {
BarrierResult,
CommonCreationConfigType,
MessageInvalidFormatError,
MessageSchemaContainer,
MessageValidationError,
QueuePublisherOptions,
SyncPublisher,
} from '@message-queue-toolkit/core'
import { objectToBuffer, MessageSchemaContainer } from '@message-queue-toolkit/core'
import { objectToBuffer } from '@message-queue-toolkit/core'
import type { ZodSchema } from 'zod'

import type { AMQPDependencies } from './AbstractAmqpService'
Expand Down Expand Up @@ -49,11 +50,7 @@ export abstract class AbstractAmqpPublisher<
) {
super(dependencies, options)

const messageSchemas = options.messageSchemas
this.messageSchemaContainer = new MessageSchemaContainer<MessagePayloadType>({
messageSchemas,
messageTypeField: options.messageTypeField,
})
this.messageSchemaContainer = this.resolvePublisherMessageSchemaContainer(options)
kibertoad marked this conversation as resolved.
Show resolved Hide resolved
this.exchange = options.exchange
}

Expand Down
46 changes: 38 additions & 8 deletions packages/core/lib/queues/AbstractQueueService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import { types } from 'node:util'

import type { ErrorReporter, ErrorResolver, Either } from '@lokalise/node-core'
import { resolveGlobalErrorLogObject } from '@lokalise/node-core'
import type { CommonEventDefinition } from '@message-queue-toolkit/schemas'
import type { ZodSchema, ZodType } from 'zod'

import type { MessageInvalidFormatError, MessageValidationError } from '../errors/Errors'
Expand All @@ -13,12 +14,14 @@ import { toDatePreprocessor } from '../utils/toDateProcessor'
import type {
BarrierCallback,
BarrierResult,
MessageHandlerConfig,
Prehandler,
PrehandlerResult,
PreHandlingOutputs,
} from './HandlerContainer'
import type { HandlerSpy, PublicHandlerSpy } from './HandlerSpy'
import { resolveHandlerSpy } from './HandlerSpy'
import { MessageSchemaContainer } from './MessageSchemaContainer'

export type Deserializer<MessagePayloadType extends object> = (
message: unknown,
Expand Down Expand Up @@ -90,6 +93,37 @@ export abstract class AbstractQueueService<
this.isInitted = false
}

protected resolveConsumerMessageSchemaContainer(options: {
handlers: MessageHandlerConfig<MessagePayloadSchemas, ExecutionContext, PrehandlerOutput>[]
messageTypeField: string
}) {
const messageSchemas = options.handlers.map((entry) => entry.schema)
// @ts-expect-error This should no longer be necessary in upcoming TypeScript updates, filter will narrow down the type
const messageDefinitions: CommonEventDefinition[] = options.handlers
.map((entry) => entry.definition)
.filter((entry) => entry !== undefined)

return new MessageSchemaContainer<MessagePayloadSchemas>({
messageSchemas,
messageDefinitions,
messageTypeField: options.messageTypeField,
})
}

protected resolvePublisherMessageSchemaContainer(options: {
messageSchemas: readonly ZodSchema<MessagePayloadSchemas>[]
messageTypeField: string
}) {
const messageSchemas = options.messageSchemas
const messageDefinitions: readonly CommonEventDefinition[] = []

return new MessageSchemaContainer<MessagePayloadSchemas>({
messageSchemas,
messageDefinitions,
messageTypeField: options.messageTypeField,
})
}

protected abstract resolveSchema(
message: MessagePayloadSchemas,
): Either<Error, ZodSchema<MessagePayloadSchemas>>
Expand Down Expand Up @@ -128,14 +162,10 @@ export abstract class AbstractQueueService<

protected handleError(err: unknown, context?: Record<string, unknown>) {
const logObject = resolveGlobalErrorLogObject(err)
if (logObject === 'string') {
this.logger.error(context, logObject)
} else if (typeof logObject === 'object') {
this.logger.error({
...logObject,
...context,
})
}
this.logger.error({
...logObject,
...context,
})
if (types.isNativeError(err)) {
this.errorReporter.report({ error: err, context })
}
Expand Down
17 changes: 15 additions & 2 deletions packages/core/lib/queues/HandlerContainer.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
import type { Either } from '@lokalise/node-core'
import type { CommonEventDefinition } from '@message-queue-toolkit/schemas'
import { isCommonEventDefinition } from '@message-queue-toolkit/schemas'
import type { ZodSchema } from 'zod'

import type { DoNotProcessMessageError } from '../errors/DoNotProcessError'
Expand Down Expand Up @@ -70,6 +72,7 @@ export class MessageHandlerConfig<
const BarrierOutput = unknown,
> {
public readonly schema: ZodSchema<MessagePayloadSchema>
public readonly definition?: CommonEventDefinition
public readonly handler: Handler<
MessagePayloadSchema,
ExecutionContext,
Expand Down Expand Up @@ -98,8 +101,10 @@ export class MessageHandlerConfig<
PrehandlerOutput,
BarrierOutput
>,
eventDefinition?: CommonEventDefinition,
) {
this.schema = schema
this.definition = eventDefinition
this.handler = handler
this.messageLogFormatter = options?.messageLogFormatter ?? defaultLogFormatter
this.preHandlerBarrier = options?.preHandlerBarrier
Expand All @@ -125,7 +130,7 @@ export class MessageHandlerConfigBuilder<
}

addConfig<MessagePayloadSchema extends MessagePayloadSchemas, const BarrierOutput>(
schema: ZodSchema<MessagePayloadSchema>,
schema: ZodSchema<MessagePayloadSchema> | CommonEventDefinition,
handler: Handler<MessagePayloadSchema, ExecutionContext, PrehandlerOutput, BarrierOutput>,
options?: HandlerConfigOptions<
MessagePayloadSchema,
Expand All @@ -134,6 +139,12 @@ export class MessageHandlerConfigBuilder<
BarrierOutput
>,
) {
const resolvedSchema: ZodSchema<MessagePayloadSchema> = isCommonEventDefinition(schema)
? // @ts-ignore
(schema.consumerSchema as ZodSchema<MessagePayloadSchema>)
: schema
const definition = isCommonEventDefinition(schema) ? schema : undefined

this.configs.push(
// @ts-ignore
new MessageHandlerConfig<
Expand All @@ -142,10 +153,11 @@ export class MessageHandlerConfigBuilder<
PrehandlerOutput,
BarrierOutput
>(
schema,
resolvedSchema,
// @ts-ignore
handler,
options,
definition,
),
)
return this
Expand All @@ -165,6 +177,7 @@ export type Handler<
message: MessagePayloadSchemas,
context: ExecutionContext,
preHandlingOutputs: PreHandlingOutputs<PrehandlerOutput, BarrierOutput>,
definition?: CommonEventDefinition,
) => Promise<Either<'retryLater', 'success'>>

export type HandlerContainerOptions<
Expand Down
17 changes: 17 additions & 0 deletions packages/core/lib/queues/MessageSchemaContainer.ts
Original file line number Diff line number Diff line change
@@ -1,18 +1,22 @@
import type { Either } from '@lokalise/node-core'
import type { CommonEventDefinition } from '@message-queue-toolkit/schemas'
import type { ZodSchema } from 'zod'

export type MessageSchemaContainerOptions<MessagePayloadSchemas extends object> = {
messageDefinitions: readonly CommonEventDefinition[]
messageSchemas: readonly ZodSchema<MessagePayloadSchemas>[]
messageTypeField: string
}

export class MessageSchemaContainer<MessagePayloadSchemas extends object> {
public readonly messageDefinitions: Record<string, CommonEventDefinition>
private readonly messageSchemas: Record<string, ZodSchema<MessagePayloadSchemas>>
private readonly messageTypeField: string

constructor(options: MessageSchemaContainerOptions<MessagePayloadSchemas>) {
this.messageTypeField = options.messageTypeField
this.messageSchemas = this.resolveSchemaMap(options.messageSchemas)
this.messageDefinitions = this.resolveDefinitionsMap(options.messageDefinitions ?? [])
}

public resolveSchema(
Expand Down Expand Up @@ -42,4 +46,17 @@ export class MessageSchemaContainer<MessagePayloadSchemas extends object> {
{} as Record<string, ZodSchema<MessagePayloadSchemas>>,
)
}

private resolveDefinitionsMap(
supportedDefinitions: readonly CommonEventDefinition[],
): Record<string, CommonEventDefinition> {
return supportedDefinitions.reduce(
(acc, definition) => {
// @ts-ignore
acc[definition.publisherSchema.shape[this.messageTypeField].value] = definition
return acc
},
{} as Record<string, CommonEventDefinition>,
)
}
}
2 changes: 1 addition & 1 deletion packages/core/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
"prepublishOnly": "npm run build:release"
},
"dependencies": {
"@lokalise/node-core": "^9.17.0",
"@lokalise/node-core": "^9.21.0",
"@message-queue-toolkit/schemas": "^1.0.0",
"fast-equals": "^5.0.1",
"toad-cache": "^3.7.0",
Expand Down
11 changes: 11 additions & 0 deletions packages/schemas/lib/events/eventTypes.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,10 @@ import type { CONSUMER_BASE_EVENT_SCHEMA, PUBLISHER_BASE_EVENT_SCHEMA } from './
export type EventTypeNames<EventDefinition extends CommonEventDefinition> =
CommonEventDefinitionConsumerSchemaType<EventDefinition>['type']

export function isCommonEventDefinition(entity: unknown): entity is CommonEventDefinition {
return (entity as CommonEventDefinition).publisherSchema !== undefined
}

export type CommonEventDefinition = {
consumerSchema: ZodObject<
Omit<(typeof CONSUMER_BASE_EVENT_SCHEMA)['shape'], 'payload'> & { payload: ZodTypeAny }
Expand All @@ -16,6 +20,13 @@ export type CommonEventDefinition = {
Omit<(typeof PUBLISHER_BASE_EVENT_SCHEMA)['shape'], 'payload'> & { payload: ZodTypeAny }
>
schemaVersion?: string

//
// Metadata used for automated documentation generation
//
producedBy?: readonly string[] // Service ids for all the producers of this event.
domain?: string // Domain of the event
tags?: readonly string[] // Free-form tags for the event
}

export type CommonEventDefinitionConsumerSchemaType<T extends CommonEventDefinition> = z.infer<
Expand Down
8 changes: 2 additions & 6 deletions packages/sns/lib/sns/AbstractSnsPublisher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ import type {
MessageInvalidFormatError,
MessageValidationError,
QueuePublisherOptions,
MessageSchemaContainer,
} from '@message-queue-toolkit/core'
import { MessageSchemaContainer } from '@message-queue-toolkit/core'

import type { SNSCreationConfig, SNSDependencies, SNSQueueLocatorType } from './AbstractSnsService'
import { AbstractSnsService } from './AbstractSnsService'
Expand All @@ -36,11 +36,7 @@ export abstract class AbstractSnsPublisher<MessagePayloadType extends object>
constructor(dependencies: SNSDependencies, options: SNSPublisherOptions<MessagePayloadType>) {
super(dependencies, options)

const messageSchemas = options.messageSchemas
this.messageSchemaContainer = new MessageSchemaContainer<MessagePayloadType>({
messageSchemas,
messageTypeField: options.messageTypeField,
})
this.messageSchemaContainer = this.resolvePublisherMessageSchemaContainer(options)
}

async publish(message: MessagePayloadType, options: SNSMessageOptions = {}): Promise<void> {
Expand Down
Loading
Loading