Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for passing full event definitions #158

Merged
merged 3 commits into from
May 31, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 2 additions & 5 deletions packages/amqp/lib/AbstractAmqpConsumer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
QueueConsumerOptions,
TransactionObservabilityManager,
} from '@message-queue-toolkit/core'
import {

Check warning on line 12 in packages/amqp/lib/AbstractAmqpConsumer.ts

View workflow job for this annotation

GitHub Actions / linting

Imports "MessageSchemaContainer" are only used as type
isMessageError,
parseMessage,
HandlerContainer,
Expand Down Expand Up @@ -100,11 +100,8 @@
? options.locatorConfig.queueName
: options.creationConfig!.queueName

const messageSchemas = options.handlers.map((entry) => entry.schema)
this.messageSchemaContainer = new MessageSchemaContainer<MessagePayloadType>({
messageSchemas,
messageTypeField: options.messageTypeField,
})
kibertoad marked this conversation as resolved.
Show resolved Hide resolved
this.messageSchemaContainer = this.resolveConsumerMessageSchemaContainer(options)

this.handlerContainer = new HandlerContainer<
MessagePayloadType,
ExecutionContext,
Expand Down
6 changes: 1 addition & 5 deletions packages/amqp/lib/AbstractAmqpPublisher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
QueuePublisherOptions,
SyncPublisher,
} from '@message-queue-toolkit/core'
import { objectToBuffer, MessageSchemaContainer } from '@message-queue-toolkit/core'

Check warning on line 11 in packages/amqp/lib/AbstractAmqpPublisher.ts

View workflow job for this annotation

GitHub Actions / linting

Imports "MessageSchemaContainer" are only used as type
import type { ZodSchema } from 'zod'

import type { AMQPDependencies } from './AbstractAmqpService'
Expand Down Expand Up @@ -49,11 +49,7 @@
) {
super(dependencies, options)

const messageSchemas = options.messageSchemas
this.messageSchemaContainer = new MessageSchemaContainer<MessagePayloadType>({
messageSchemas,
messageTypeField: options.messageTypeField,
})
this.messageSchemaContainer = this.resolvePublisherMessageSchemaContainer(options)
kibertoad marked this conversation as resolved.
Show resolved Hide resolved
this.exchange = options.exchange
}

Expand Down
46 changes: 38 additions & 8 deletions packages/core/lib/queues/AbstractQueueService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import { types } from 'node:util'

import type { ErrorReporter, ErrorResolver, Either } from '@lokalise/node-core'
import { resolveGlobalErrorLogObject } from '@lokalise/node-core'
import type { CommonEventDefinition } from '@message-queue-toolkit/schemas'
import type { ZodSchema, ZodType } from 'zod'

import type { MessageInvalidFormatError, MessageValidationError } from '../errors/Errors'
Expand All @@ -13,12 +14,14 @@ import { toDatePreprocessor } from '../utils/toDateProcessor'
import type {
BarrierCallback,
BarrierResult,
MessageHandlerConfig,
Prehandler,
PrehandlerResult,
PreHandlingOutputs,
} from './HandlerContainer'
import type { HandlerSpy, PublicHandlerSpy } from './HandlerSpy'
import { resolveHandlerSpy } from './HandlerSpy'
import { MessageSchemaContainer } from './MessageSchemaContainer'

export type Deserializer<MessagePayloadType extends object> = (
message: unknown,
Expand Down Expand Up @@ -90,6 +93,37 @@ export abstract class AbstractQueueService<
this.isInitted = false
}

protected resolveConsumerMessageSchemaContainer(options: {
handlers: MessageHandlerConfig<MessagePayloadSchemas, ExecutionContext, PrehandlerOutput>[]
messageTypeField: string
}) {
const messageSchemas = options.handlers.map((entry) => entry.schema)
// @ts-expect-error This should no longer be necessary in upcoming TypeScript updates, filter will narrow down the type
const messageDefinitions: CommonEventDefinition[] = options.handlers
.map((entry) => entry.definition)
.filter((entry) => entry !== undefined)

return new MessageSchemaContainer<MessagePayloadSchemas>({
messageSchemas,
messageDefinitions,
messageTypeField: options.messageTypeField,
})
}

protected resolvePublisherMessageSchemaContainer(options: {
messageSchemas: readonly ZodSchema<MessagePayloadSchemas>[]
messageTypeField: string
}) {
const messageSchemas = options.messageSchemas
const messageDefinitions: readonly CommonEventDefinition[] = []

return new MessageSchemaContainer<MessagePayloadSchemas>({
messageSchemas,
messageDefinitions,
messageTypeField: options.messageTypeField,
})
}

protected abstract resolveSchema(
message: MessagePayloadSchemas,
): Either<Error, ZodSchema<MessagePayloadSchemas>>
Expand Down Expand Up @@ -128,14 +162,10 @@ export abstract class AbstractQueueService<

protected handleError(err: unknown, context?: Record<string, unknown>) {
const logObject = resolveGlobalErrorLogObject(err)
if (logObject === 'string') {
this.logger.error(context, logObject)
} else if (typeof logObject === 'object') {
this.logger.error({
...logObject,
...context,
})
}
this.logger.error({
...logObject,
...context,
})
if (types.isNativeError(err)) {
this.errorReporter.report({ error: err, context })
}
Expand Down
17 changes: 15 additions & 2 deletions packages/core/lib/queues/HandlerContainer.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
import type { Either } from '@lokalise/node-core'
import type { CommonEventDefinition } from '@message-queue-toolkit/schemas'
import { isCommonEventDefinition } from '@message-queue-toolkit/schemas'
import type { ZodSchema } from 'zod'

import type { DoNotProcessMessageError } from '../errors/DoNotProcessError'
Expand Down Expand Up @@ -70,6 +72,7 @@ export class MessageHandlerConfig<
const BarrierOutput = unknown,
> {
public readonly schema: ZodSchema<MessagePayloadSchema>
public readonly definition?: CommonEventDefinition
public readonly handler: Handler<
MessagePayloadSchema,
ExecutionContext,
Expand Down Expand Up @@ -98,8 +101,10 @@ export class MessageHandlerConfig<
PrehandlerOutput,
BarrierOutput
>,
eventDefinition?: CommonEventDefinition,
) {
this.schema = schema
this.definition = eventDefinition
this.handler = handler
this.messageLogFormatter = options?.messageLogFormatter ?? defaultLogFormatter
this.preHandlerBarrier = options?.preHandlerBarrier
Expand All @@ -125,7 +130,7 @@ export class MessageHandlerConfigBuilder<
}

addConfig<MessagePayloadSchema extends MessagePayloadSchemas, const BarrierOutput>(
schema: ZodSchema<MessagePayloadSchema>,
schema: ZodSchema<MessagePayloadSchema> | CommonEventDefinition,
handler: Handler<MessagePayloadSchema, ExecutionContext, PrehandlerOutput, BarrierOutput>,
options?: HandlerConfigOptions<
MessagePayloadSchema,
Expand All @@ -134,6 +139,12 @@ export class MessageHandlerConfigBuilder<
BarrierOutput
>,
) {
const resolvedSchema: ZodSchema<MessagePayloadSchema> = isCommonEventDefinition(schema)
? // @ts-ignore
(schema.consumerSchema as ZodSchema<MessagePayloadSchema>)
: schema
const definition = isCommonEventDefinition(schema) ? schema : undefined

this.configs.push(
// @ts-ignore
new MessageHandlerConfig<
Expand All @@ -142,10 +153,11 @@ export class MessageHandlerConfigBuilder<
PrehandlerOutput,
BarrierOutput
>(
schema,
resolvedSchema,
// @ts-ignore
handler,
options,
definition,
),
)
return this
Expand All @@ -165,6 +177,7 @@ export type Handler<
message: MessagePayloadSchemas,
context: ExecutionContext,
preHandlingOutputs: PreHandlingOutputs<PrehandlerOutput, BarrierOutput>,
definition?: CommonEventDefinition,
) => Promise<Either<'retryLater', 'success'>>

export type HandlerContainerOptions<
Expand Down
17 changes: 17 additions & 0 deletions packages/core/lib/queues/MessageSchemaContainer.ts
Original file line number Diff line number Diff line change
@@ -1,18 +1,22 @@
import type { Either } from '@lokalise/node-core'
import type { CommonEventDefinition } from '@message-queue-toolkit/schemas'
import type { ZodSchema } from 'zod'

export type MessageSchemaContainerOptions<MessagePayloadSchemas extends object> = {
messageDefinitions: readonly CommonEventDefinition[]
messageSchemas: readonly ZodSchema<MessagePayloadSchemas>[]
messageTypeField: string
}

export class MessageSchemaContainer<MessagePayloadSchemas extends object> {
public readonly messageDefinitions: Record<string, CommonEventDefinition>
private readonly messageSchemas: Record<string, ZodSchema<MessagePayloadSchemas>>
private readonly messageTypeField: string

constructor(options: MessageSchemaContainerOptions<MessagePayloadSchemas>) {
this.messageTypeField = options.messageTypeField
this.messageSchemas = this.resolveSchemaMap(options.messageSchemas)
this.messageDefinitions = this.resolveDefinitionsMap(options.messageDefinitions ?? [])
}

public resolveSchema(
Expand Down Expand Up @@ -42,4 +46,17 @@ export class MessageSchemaContainer<MessagePayloadSchemas extends object> {
{} as Record<string, ZodSchema<MessagePayloadSchemas>>,
)
}

private resolveDefinitionsMap(
supportedDefinitions: readonly CommonEventDefinition[],
): Record<string, CommonEventDefinition> {
return supportedDefinitions.reduce(
(acc, definition) => {
// @ts-ignore
acc[definition.publisherSchema.shape[this.messageTypeField].value] = definition
return acc
},
{} as Record<string, CommonEventDefinition>,
)
}
}
2 changes: 1 addition & 1 deletion packages/core/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
"prepublishOnly": "npm run build:release"
},
"dependencies": {
"@lokalise/node-core": "^9.17.0",
"@lokalise/node-core": "^9.21.0",
"@message-queue-toolkit/schemas": "^1.0.0",
"fast-equals": "^5.0.1",
"toad-cache": "^3.7.0",
Expand Down
11 changes: 11 additions & 0 deletions packages/schemas/lib/events/eventTypes.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,10 @@ import type { CONSUMER_BASE_EVENT_SCHEMA, PUBLISHER_BASE_EVENT_SCHEMA } from './
export type EventTypeNames<EventDefinition extends CommonEventDefinition> =
CommonEventDefinitionConsumerSchemaType<EventDefinition>['type']

export function isCommonEventDefinition(entity: unknown): entity is CommonEventDefinition {
return (entity as CommonEventDefinition).publisherSchema !== undefined
}

export type CommonEventDefinition = {
consumerSchema: ZodObject<
Omit<(typeof CONSUMER_BASE_EVENT_SCHEMA)['shape'], 'payload'> & { payload: ZodTypeAny }
Expand All @@ -16,6 +20,13 @@ export type CommonEventDefinition = {
Omit<(typeof PUBLISHER_BASE_EVENT_SCHEMA)['shape'], 'payload'> & { payload: ZodTypeAny }
>
schemaVersion?: string

//
// Metadata used for automated documentation generation
//
producedBy?: readonly string[] // Service ids for all the producers of this event.
domain?: string // Domain of the event
tags?: readonly string[] // Free-form tags for the event
}

export type CommonEventDefinitionConsumerSchemaType<T extends CommonEventDefinition> = z.infer<
Expand Down
8 changes: 2 additions & 6 deletions packages/sns/lib/sns/AbstractSnsPublisher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ import type {
MessageInvalidFormatError,
MessageValidationError,
QueuePublisherOptions,
MessageSchemaContainer,
} from '@message-queue-toolkit/core'
import { MessageSchemaContainer } from '@message-queue-toolkit/core'

import type { SNSCreationConfig, SNSDependencies, SNSQueueLocatorType } from './AbstractSnsService'
import { AbstractSnsService } from './AbstractSnsService'
Expand All @@ -36,11 +36,7 @@ export abstract class AbstractSnsPublisher<MessagePayloadType extends object>
constructor(dependencies: SNSDependencies, options: SNSPublisherOptions<MessagePayloadType>) {
super(dependencies, options)

const messageSchemas = options.messageSchemas
this.messageSchemaContainer = new MessageSchemaContainer<MessagePayloadType>({
messageSchemas,
messageTypeField: options.messageTypeField,
})
this.messageSchemaContainer = this.resolvePublisherMessageSchemaContainer(options)
}

async publish(message: MessagePayloadType, options: SNSMessageOptions = {}): Promise<void> {
Expand Down
17 changes: 6 additions & 11 deletions packages/sns/lib/sns/SnsPublisherManager.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import { enrichMessageSchemaWithBase } from '@message-queue-toolkit/core'
import type { AwilixContainer } from 'awilix'
import z from 'zod'

import { SnsSqsEntityConsumer } from '../../test/consumers/SnsSqsEntityConsumer'
import type {
Dependencies,
TestEventPublishPayloadsType,
Expand All @@ -13,7 +14,6 @@ import { registerDependencies, TestEvents } from '../../test/utils/testContext'

import { CommonSnsPublisher } from './CommonSnsPublisherFactory'
import type { SnsPublisherManager } from './SnsPublisherManager'
import { FakeConsumer } from './fakes/FakeConsumer'

describe('SnsPublisherManager', () => {
let diContainer: AwilixContainer<Dependencies>
Expand All @@ -23,7 +23,7 @@ describe('SnsPublisherManager', () => {
>

beforeAll(async () => {
diContainer = await registerDependencies()
diContainer = await registerDependencies({}, false)
publisherManager = diContainer.cradle.publisherManager
})

Expand All @@ -34,13 +34,8 @@ describe('SnsPublisherManager', () => {
describe('publish', () => {
it('publishes to a correct publisher', async () => {
// Given
const fakeConsumer = new FakeConsumer(
diContainer.cradle,
'queue',
TestEvents.created.snsTopic,
TestEvents.created.consumerSchema,
)
await fakeConsumer.start()
const consumer = new SnsSqsEntityConsumer(diContainer.cradle)
await consumer.start()

// When
const publishedMessage = await publisherManager.publish(TestEvents.created.snsTopic, {
Expand All @@ -54,7 +49,7 @@ describe('SnsPublisherManager', () => {
.handlerSpy(TestEvents.created.snsTopic)
.waitForMessageWithId(publishedMessage.id)

const consumerResult = await fakeConsumer.handlerSpy.waitForMessageWithId(publishedMessage.id)
const consumerResult = await consumer.handlerSpy.waitForMessageWithId(publishedMessage.id)
const publishedMessageResult = await handlerSpyPromise

expect(consumerResult.processingResult).toBe('consumed')
Expand All @@ -75,7 +70,7 @@ describe('SnsPublisherManager', () => {
type: 'entity.created',
})

await fakeConsumer.close()
await consumer.close()
})

it('message publishing is type-safe', async () => {
Expand Down
Loading
Loading