Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Domain event emitter V2 #134

Merged
merged 6 commits into from
May 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions packages/core/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -50,3 +50,9 @@ export { waitAndRetry } from './lib/utils/waitUtils'
export { parseMessage } from './lib/utils/parseUtils'

export { reloadConfig, isProduction } from './lib/utils/envUtils'

export { DomainEventEmitter } from './lib/events/DomainEventEmitter'
export { EventRegistry } from './lib/events/EventRegistry'
export { FakeListener } from './lib/events/fakes/FakeListener'
export * from './lib/events/eventTypes'
export * from './lib/messages/baseMessageSchemas'
109 changes: 109 additions & 0 deletions packages/core/lib/events/DomainEventEmitter.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
import { randomUUID } from 'node:crypto'

import { waitAndRetry } from '@lokalise/node-core'
import type { AwilixContainer } from 'awilix'
import { afterAll, beforeAll, expect } from 'vitest'

import type { Dependencies } from '../../test/testContext'
import { registerDependencies, TestEvents } from '../../test/testContext'

import type { CommonEventDefinitionSchemaType } from './eventTypes'
import { FakeListener } from './fakes/FakeListener'

const createdEventPayload: CommonEventDefinitionSchemaType<typeof TestEvents.created> = {
payload: {
message: 'msg',
},
type: 'entity.created',
id: randomUUID(),
timestamp: new Date().toISOString(),
metadata: {
originatedFrom: 'service',
producedBy: 'producer',
schemaVersion: '1',
correlationId: randomUUID(),
},
}

const updatedEventPayload: CommonEventDefinitionSchemaType<typeof TestEvents.updated> = {
...createdEventPayload,
type: 'entity.updated',
}

const expectedCreatedPayload = {
id: expect.any(String),
timestamp: expect.any(String),
payload: {
message: 'msg',
},
type: 'entity.created',
metadata: {
correlationId: expect.any(String),
originatedFrom: 'service',
producedBy: 'producer',
schemaVersion: '1',
},
}

const expectedUpdatedPayload = {
...expectedCreatedPayload,
type: 'entity.updated',
}

describe('AutopilotEventEmitter', () => {
let diContainer: AwilixContainer<Dependencies>
beforeAll(async () => {
diContainer = await registerDependencies()
})

afterAll(async () => {
await diContainer.dispose()
})

it('emits event to anyListener', async () => {
const { eventEmitter } = diContainer.cradle
const fakeListener = new FakeListener(diContainer.cradle.eventRegistry.supportedEvents)
eventEmitter.onAny(fakeListener)

await eventEmitter.emit(TestEvents.created, createdEventPayload)

await waitAndRetry(() => {
return fakeListener.receivedEvents.length > 0
})

expect(fakeListener.receivedEvents).toHaveLength(1)
expect(fakeListener.receivedEvents[0]).toMatchObject(expectedCreatedPayload)
})

it('emits event to singleListener', async () => {
const { eventEmitter } = diContainer.cradle
const fakeListener = new FakeListener(diContainer.cradle.eventRegistry.supportedEvents)
eventEmitter.on('entity.created', fakeListener)

await eventEmitter.emit(TestEvents.created, createdEventPayload)

await waitAndRetry(() => {
return fakeListener.receivedEvents.length > 0
})

expect(fakeListener.receivedEvents).toHaveLength(1)
expect(fakeListener.receivedEvents[0]).toMatchObject(expectedCreatedPayload)
})

it('emits event to manyListener', async () => {
const { eventEmitter } = diContainer.cradle
const fakeListener = new FakeListener(diContainer.cradle.eventRegistry.supportedEvents)
eventEmitter.onMany(['entity.created', 'entity.updated'], fakeListener)

await eventEmitter.emit(TestEvents.created, createdEventPayload)
await eventEmitter.emit(TestEvents.updated, updatedEventPayload)

await waitAndRetry(() => {
return fakeListener.receivedEvents.length === 2
})

expect(fakeListener.receivedEvents).toHaveLength(2)
expect(fakeListener.receivedEvents[0]).toMatchObject(expectedCreatedPayload)
expect(fakeListener.receivedEvents[1]).toMatchObject(expectedUpdatedPayload)
})
})
103 changes: 103 additions & 0 deletions packages/core/lib/events/DomainEventEmitter.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
import { InternalError } from '@lokalise/node-core'

import type { EventRegistry } from './EventRegistry'
import type {
EventHandler,
AnyEventHandler,
SingleEventHandler,
CommonEventDefinition,
CommonEventDefinitionSchemaType,
EventTypeNames,
} from './eventTypes'

export class DomainEventEmitter<SupportedEvents extends CommonEventDefinition[]> {
private readonly eventRegistry: EventRegistry<SupportedEvents>

private readonly eventHandlerMap: Record<
string,
EventHandler<CommonEventDefinitionSchemaType<SupportedEvents[number]>>[]
> = {}
private readonly anyHandlers: AnyEventHandler<SupportedEvents>[] = []

constructor({ eventRegistry }: { eventRegistry: EventRegistry<SupportedEvents> }) {
this.eventRegistry = eventRegistry
}

public async emit<SupportedEvent extends SupportedEvents[number]>(
supportedEvent: SupportedEvent,
data: Omit<CommonEventDefinitionSchemaType<SupportedEvent>, 'type'>,
) {
const eventTypeName = supportedEvent.schema.shape.type.value

if (!this.eventRegistry.isSupportedEvent(eventTypeName)) {
throw new InternalError({
errorCode: 'UNKNOWN_EVENT',
message: `Unknown event ${eventTypeName}`,
})
}

const eventHandlers = this.eventHandlerMap[eventTypeName]

// No relevant handlers are registered, we can stop processing
if (!eventHandlers && this.anyHandlers.length === 0) {
return
}

const validatedEvent = this.eventRegistry
.getEventDefinitionByTypeName(eventTypeName)
.schema.parse({
type: eventTypeName,
...data,
})

if (eventHandlers) {
for (const handler of eventHandlers) {
await handler.handleEvent(validatedEvent)
}
}

for (const handler of this.anyHandlers) {
await handler.handleEvent(validatedEvent)
}
}

/**
* Register handler for a specific event
*/
public on<EventTypeName extends EventTypeNames<SupportedEvents[number]>>(
eventTypeName: EventTypeName,
handler: SingleEventHandler<SupportedEvents, EventTypeName>,
) {
this.addOnHandler(eventTypeName, handler)
}

/**
* Register handler for multiple events
*/
public onMany<EventTypeName extends EventTypeNames<SupportedEvents[number]>>(
eventTypeNames: EventTypeName[],
handler: SingleEventHandler<SupportedEvents, EventTypeName>,
) {
for (const eventTypeName of eventTypeNames) {
this.on(eventTypeName, handler)
}
}

/**
* Register handler for all events supported by the emitter
*/
public onAny(handler: AnyEventHandler<SupportedEvents>) {
this.anyHandlers.push(handler)
}

private addOnHandler(
eventTypeName: EventTypeNames<SupportedEvents[number]>,
handler: EventHandler,
) {
if (!this.eventHandlerMap[eventTypeName]) {
this.eventHandlerMap[eventTypeName] = []
}

this.eventHandlerMap[eventTypeName].push(handler)
}
}
29 changes: 29 additions & 0 deletions packages/core/lib/events/EventRegistry.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
import type { CommonEventDefinition, EventTypeNames } from './eventTypes'

export class EventRegistry<SupportedEvents extends CommonEventDefinition[]> {
public readonly supportedEvents: SupportedEvents
private readonly supportedEventsSet: Set<string>
private readonly supportedEventMap: Record<string, CommonEventDefinition> = {}

constructor(supportedEvents: SupportedEvents) {
this.supportedEvents = supportedEvents
this.supportedEventsSet = new Set<string>()

for (const supportedEvent of supportedEvents) {
this.supportedEventMap[supportedEvent.schema.shape.type.value] = supportedEvent
this.supportedEventsSet.add(supportedEvent.schema.shape.type.value)
}
}

public getEventDefinitionByTypeName = <
EventTypeName extends EventTypeNames<SupportedEvents[number]>,
>(
eventTypeName: EventTypeName,
): CommonEventDefinition => {
return this.supportedEventMap[eventTypeName]
}

public isSupportedEvent(eventTypeName: string) {
return this.supportedEventsSet.has(eventTypeName)
}
}
37 changes: 37 additions & 0 deletions packages/core/lib/events/eventTypes.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
import type { ZodObject, ZodTypeAny } from 'zod'
import type z from 'zod'

import type { BASE_MESSAGE_SCHEMA } from '../messages/baseMessageSchemas'

export type EventTypeNames<EventDefinition extends CommonEventDefinition> =
CommonEventDefinitionSchemaType<EventDefinition>['type']

// To be extended with transport-specific fields, e. g. "snsTopic" in specific libraries
export type CommonEventDefinition = {
schema: ZodObject<
Omit<(typeof BASE_MESSAGE_SCHEMA)['shape'], 'payload'> & { payload: ZodTypeAny }
>
}

export type CommonEventDefinitionSchemaType<T extends CommonEventDefinition> = z.infer<T['schema']>

export type EventHandler<
EventDefinitionSchema extends
CommonEventDefinitionSchemaType<CommonEventDefinition> = CommonEventDefinitionSchemaType<CommonEventDefinition>,
> = {
handleEvent(event: EventDefinitionSchema): void | Promise<void>
}

export type AnyEventHandler<EventDefinitions extends CommonEventDefinition[]> = EventHandler<
CommonEventDefinitionSchemaType<EventDefinitions[number]>
>

export type SingleEventHandler<
EventDefinition extends CommonEventDefinition[],
EventTypeName extends EventTypeNames<EventDefinition[number]>,
> = EventHandler<EventFromArrayByTypeName<EventDefinition, EventTypeName>>

type EventFromArrayByTypeName<
EventDefinition extends CommonEventDefinition[],
EventTypeName extends EventTypeNames<EventDefinition[number]>,
> = Extract<CommonEventDefinitionSchemaType<EventDefinition[number]>, { type: EventTypeName }>
15 changes: 15 additions & 0 deletions packages/core/lib/events/fakes/FakeListener.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
import type { AnyEventHandler, CommonEventDefinition } from '../eventTypes'

export class FakeListener<SupportedEvents extends CommonEventDefinition[]>
implements AnyEventHandler<SupportedEvents>
{
public receivedEvents: SupportedEvents[number]['schema']['_output'][] = []

constructor(_supportedEvents: SupportedEvents) {
this.receivedEvents = []
}

handleEvent(event: SupportedEvents[number]['schema']['_output']): void | Promise<void> {
this.receivedEvents.push(event)
}
}
28 changes: 28 additions & 0 deletions packages/core/lib/messages/baseMessageSchemas.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
import z from 'zod'

// Core fields that describe event
export const BASE_MESSAGE_SCHEMA = z.object({
kibertoad marked this conversation as resolved.
Show resolved Hide resolved
id: z.string().describe('event unique identifier'),
timestamp: z.string().datetime().describe('iso 8601 datetime'),
type: z.literal<string>('<replace.me>').describe('event type name'),
payload: z.optional(z.object({})).describe('event payload based on type'),
})

// Extra fields that are optional for the event processing
export const EXTENDED_MESSAGE_SCHEMA = BASE_MESSAGE_SCHEMA.extend({
Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is the most important part to review and criticize

Copy link
Collaborator

@CarlosGamero CarlosGamero May 2, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't the specific schema extension be part of apps using the library? I understand we want to use this schema but not sure if having it as part of the library is a good idea.
Also, if we want to add/modify a property we will need to release a new version (which is a potential cause of major releases) or create our extension so having our extension from the beginning could be beneficial.
What do you think?

Btw, no strong opinion from my side, I am also fine keeping it here but just raising the topic in case it can be useful

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't the specific schema extension be part of apps using the library?

Yes, definitely. Which is why there is an optional extended schema for those who want a reasonable default (e. g. us), but the library itself doesn't rely on it and supports having your own.

If we ever decide to change it, I would expect we would be adding it as a separate schema, e. g. EXTENDED_MESSAGE_SCHEMA_V2, so that those who are happy with the original one, can keep using it.

Copy link
Collaborator

@CarlosGamero CarlosGamero May 2, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, the point was that I am not sure if the library should just define the minimum it needs to work and be open to extensions (it is already) without providing things the users can define based on their use cases, and are potential points of change. Also, to avoid having EXTENDED_MESSAGE_SCHEMA_V{{N}} schemas for backward compatibility.

I think in our case, I would suggest going with creating our own extension (could be MY_SCHEMA = EXTENDED_MESSAGE_SCHEMA) to have flexibility and not have to modify the library to just modify the schema

But as I mentioned, I am not sure if this concern is valid or if it is stupid, so if you think it is fine, I am also fine and please feel free to resolve this thread if it is the case

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think in our case, I would suggest going with creating our own extension (could be MY_SCHEMA = EXTENDED_MESSAGE_SCHEMA) to have flexibility and not have to modify the library to just modify the schema (same problem we have with WS rooms, where to add a room we need a new library version)

Very fair point. But also for standartization sake I would like to have one single place where our common schema is defined, so that each service doesn't have to do it over and over again. It doesn't have to be this library, though, we could put it e. g. in shared-ts-libs as a part of events-common.

I would still provide EXTENDED_MESSAGE_SCHEMA as an opinionated starting point for people/teams who might benefit from it, but won't couple to it anywhere in message-queue-toolkit itself.

I agree that when we need to adjust our schema, we should be able to do it outside of message-queue-toolkit.

metadata: z
.object({
schemaVersion: z.string().min(1).describe('message schema version'),
producedBy: z.string().min(1).describe('app/service that produced the message'),
originatedFrom: z
.string()
.min(1)
.describe('app/service that initiated entire workflow that led to creating this message'),
correlationId: z
.string()
.describe('unique identifier passed to all events in workflow chain'),
})
.describe('event metadata'),
})

export type BaseMessageType = z.infer<typeof BASE_MESSAGE_SCHEMA>
3 changes: 2 additions & 1 deletion packages/core/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@
"dependencies": {
"@lokalise/node-core": "^9.14.0",
"fast-equals": "^5.0.1",
"toad-cache": "^3.7.0"
"toad-cache": "^3.7.0",
"zod": "^3.23.5"
},
"devDependencies": {
"@types/node": "^20.11.25",
Expand Down
26 changes: 26 additions & 0 deletions packages/core/test/fakes/FakeLogger.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
import type { Logger } from '../../lib/types/MessageQueueTypes'

export class FakeLogger implements Logger {
public readonly loggedMessages: unknown[] = []
public readonly loggedWarnings: unknown[] = []
public readonly loggedErrors: unknown[] = []

debug(obj: unknown) {
this.loggedMessages.push(obj)
}
error(obj: unknown) {
this.loggedErrors.push(obj)
}
fatal(obj: unknown) {
this.loggedErrors.push(obj)
}
info(obj: unknown) {
this.loggedMessages.push(obj)
}
trace(obj: unknown) {
this.loggedMessages.push(obj)
}
warn(obj: unknown) {
this.loggedWarnings.push(obj)
}
}
Loading
Loading