Skip to content

Commit

Permalink
Add type-safe Domain event emitter and event registry
Browse files Browse the repository at this point in the history
  • Loading branch information
kibertoad committed May 1, 2024
1 parent 1e0612c commit 08bd24c
Show file tree
Hide file tree
Showing 11 changed files with 453 additions and 10 deletions.
5 changes: 5 additions & 0 deletions packages/core/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -50,3 +50,8 @@ export { waitAndRetry } from './lib/utils/waitUtils'
export { parseMessage } from './lib/utils/parseUtils'

export { reloadConfig, isProduction } from './lib/utils/envUtils'

export { DomainEventEmitter } from './lib/events/DomainEventEmitter'
export { EventRegistry } from './lib/events/EventRegistry'
export { FakeListener } from './lib/events/fakes/FakeListener'
export * from './lib/events/eventTypes'
109 changes: 109 additions & 0 deletions packages/core/lib/events/DomainEventEmitter.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
import { randomUUID } from 'node:crypto'

import { waitAndRetry } from '@lokalise/node-core'
import type { AwilixContainer } from 'awilix'
import { afterAll, beforeAll, expect } from 'vitest'

import type { Dependencies } from '../../test/testContext'
import { registerDependencies, TestEvents } from '../../test/testContext'

import type { CommonEventDefinitionSchemaType } from './eventTypes'
import { FakeListener } from './fakes/FakeListener'

const createdEventPayload: CommonEventDefinitionSchemaType<typeof TestEvents.created> = {
payload: {
message: 'msg',
},
type: 'entity.created',
id: randomUUID(),
metadata: {
originApp: 'de',
producerApp: 'dede',
schemaVersion: '1',
correlationId: randomUUID(),
},
timestamp: new Date().toISOString(),
}

const updatedEventPayload: CommonEventDefinitionSchemaType<typeof TestEvents.updated> = {
...createdEventPayload,
type: 'entity.updated',
}

const expectedCreatedPayload = {
id: expect.any(String),
metadata: {
correlationId: expect.any(String),
originApp: 'de',
producerApp: 'dede',
schemaVersion: '1',
},
payload: {
message: 'msg',
},
timestamp: expect.any(String),
type: 'entity.created',
}

const expectedUpdatedPayload = {
...expectedCreatedPayload,
type: 'entity.updated',
}

describe('AutopilotEventEmitter', () => {
let diContainer: AwilixContainer<Dependencies>
beforeAll(async () => {
diContainer = await registerDependencies()
})

afterAll(async () => {
await diContainer.dispose()
})

it('emits event to anyListener', async () => {
const { eventEmitter } = diContainer.cradle
const fakeListener = new FakeListener(diContainer.cradle.eventRegistry.supportedEvents)
eventEmitter.onAny(fakeListener)

await eventEmitter.emit(TestEvents.created, createdEventPayload)

await waitAndRetry(() => {
return fakeListener.receivedEvents.length > 0
})

expect(fakeListener.receivedEvents).toHaveLength(1)
expect(fakeListener.receivedEvents[0]).toMatchObject(expectedCreatedPayload)
})

it('emits event to singleListener', async () => {
const { eventEmitter } = diContainer.cradle
const fakeListener = new FakeListener(diContainer.cradle.eventRegistry.supportedEvents)
eventEmitter.on('entity.created', fakeListener)

await eventEmitter.emit(TestEvents.created, createdEventPayload)

await waitAndRetry(() => {
return fakeListener.receivedEvents.length > 0
})

expect(fakeListener.receivedEvents).toHaveLength(1)
expect(fakeListener.receivedEvents[0]).toMatchObject(expectedCreatedPayload)
})

it('emits event to manyListener', async () => {
const { eventEmitter } = diContainer.cradle
const fakeListener = new FakeListener(diContainer.cradle.eventRegistry.supportedEvents)
eventEmitter.onMany(['entity.created', 'entity.updated'], fakeListener)

await eventEmitter.emit(TestEvents.created, createdEventPayload)
await eventEmitter.emit(TestEvents.updated, updatedEventPayload)

await waitAndRetry(() => {
return fakeListener.receivedEvents.length === 2
})

expect(fakeListener.receivedEvents).toHaveLength(2)
expect(fakeListener.receivedEvents[0]).toMatchObject(expectedCreatedPayload)
expect(fakeListener.receivedEvents[1]).toMatchObject(expectedUpdatedPayload)
})
})
103 changes: 103 additions & 0 deletions packages/core/lib/events/DomainEventEmitter.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
import { InternalError } from '@lokalise/node-core'

import type { EventRegistry } from './EventRegistry'
import type {
EventHandler,
AnyEventHandler,
SingleEventHandler,
CommonEventDefinition,
CommonEventDefinitionSchemaType,
EventTypeNames,
} from './eventTypes'

export class DomainEventEmitter<SupportedEvents extends CommonEventDefinition[]> {
private readonly eventRegistry: EventRegistry<SupportedEvents>

private readonly eventHandlerMap: Record<
string,
EventHandler<CommonEventDefinitionSchemaType<SupportedEvents[number]>>[]
> = {}
private readonly anyHandlers: AnyEventHandler<SupportedEvents>[] = []

constructor({ eventRegistry }: { eventRegistry: EventRegistry<SupportedEvents> }) {
this.eventRegistry = eventRegistry
}

public async emit<SupportedEvent extends SupportedEvents[number]>(
supportedEvent: SupportedEvent,
data: Omit<CommonEventDefinitionSchemaType<SupportedEvent>, 'type'>,
) {
const eventTypeName = supportedEvent.schema.shape.type.value

if (!this.eventRegistry.isSupportedEvent(eventTypeName)) {
throw new InternalError({
errorCode: 'UNKNOWN_EVENT',
message: `Unknown event ${eventTypeName}`,
})
}

const eventHandlers = this.eventHandlerMap[eventTypeName]

// No relevant handlers are registered, we can stop processing
if (!eventHandlers && this.anyHandlers.length === 0) {
return
}

const validatedEvent = this.eventRegistry
.getEventDefinitionByTypeName(eventTypeName)
.schema.parse({
type: eventTypeName,
...data,
})

if (eventHandlers) {
for (const handler of eventHandlers) {
await handler.handleEvent(validatedEvent)
}
}

for (const handler of this.anyHandlers) {
await handler.handleEvent(validatedEvent)
}
}

/**
* Register handler for a specific event
*/
public on<EventTypeName extends EventTypeNames<SupportedEvents[number]>>(
eventTypeName: EventTypeName,
handler: SingleEventHandler<SupportedEvents, EventTypeName>,
) {
this.addOnHandler(eventTypeName, handler)
}

/**
* Register handler for multiple events
*/
public onMany<EventTypeName extends EventTypeNames<SupportedEvents[number]>>(
eventTypeNames: EventTypeName[],
handler: SingleEventHandler<SupportedEvents, EventTypeName>,
) {
for (const eventTypeName of eventTypeNames) {
this.on(eventTypeName, handler)
}
}

/**
* Register handler for all events supported by the emitter
*/
public onAny(handler: AnyEventHandler<SupportedEvents>) {
this.anyHandlers.push(handler)
}

private addOnHandler(
eventTypeName: EventTypeNames<SupportedEvents[number]>,
handler: EventHandler,
) {
if (!this.eventHandlerMap[eventTypeName]) {
this.eventHandlerMap[eventTypeName] = []
}

this.eventHandlerMap[eventTypeName].push(handler)
}
}
29 changes: 29 additions & 0 deletions packages/core/lib/events/EventRegistry.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
import type { CommonEventDefinition, EventTypeNames } from './eventTypes'

export class EventRegistry<SupportedEvents extends CommonEventDefinition[]> {
public readonly supportedEvents: SupportedEvents
private readonly supportedEventsSet: Set<string>
private readonly supportedEventMap: Record<string, CommonEventDefinition> = {}

constructor(supportedEvents: SupportedEvents) {
this.supportedEvents = supportedEvents
this.supportedEventsSet = new Set<string>()

for (const supportedEvent of supportedEvents) {
this.supportedEventMap[supportedEvent.schema.shape.type.value] = supportedEvent
this.supportedEventsSet.add(supportedEvent.schema.shape.type.value)
}
}

public getEventDefinitionByTypeName = <
EventTypeName extends EventTypeNames<SupportedEvents[number]>,
>(
eventTypeName: EventTypeName,
): CommonEventDefinition => {
return this.supportedEventMap[eventTypeName]
}

public isSupportedEvent(eventTypeName: string) {
return this.supportedEventsSet.has(eventTypeName)
}
}
37 changes: 37 additions & 0 deletions packages/core/lib/events/eventTypes.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
import type { ZodObject, ZodTypeAny } from 'zod'
import type z from 'zod'

import type { BASE_MESSAGE_SCHEMA } from '../messages/baseMessageSchemas'

export type EventTypeNames<EventDefinition extends CommonEventDefinition> =
CommonEventDefinitionSchemaType<EventDefinition>['type']

// To be extended with transport-specific fields, e. g. "snsTopic" in specific libraries
export type CommonEventDefinition = {
schema: ZodObject<
Omit<(typeof BASE_MESSAGE_SCHEMA)['shape'], 'payload'> & { payload: ZodTypeAny }
>
}

export type CommonEventDefinitionSchemaType<T extends CommonEventDefinition> = z.infer<T['schema']>

export type EventHandler<
EventDefinitionSchema extends
CommonEventDefinitionSchemaType<CommonEventDefinition> = CommonEventDefinitionSchemaType<CommonEventDefinition>,
> = {
handleEvent(event: EventDefinitionSchema): void | Promise<void>
}

export type AnyEventHandler<EventDefinitions extends CommonEventDefinition[]> = EventHandler<
CommonEventDefinitionSchemaType<EventDefinitions[number]>
>

export type SingleEventHandler<
EventDefinition extends CommonEventDefinition[],
EventTypeName extends EventTypeNames<EventDefinition[number]>,
> = EventHandler<EventFromArrayByTypeName<EventDefinition, EventTypeName>>

type EventFromArrayByTypeName<
EventDefinition extends CommonEventDefinition[],
EventTypeName extends EventTypeNames<EventDefinition[number]>,
> = Extract<CommonEventDefinitionSchemaType<EventDefinition[number]>, { type: EventTypeName }>
15 changes: 15 additions & 0 deletions packages/core/lib/events/fakes/FakeListener.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
import type { AnyEventHandler, CommonEventDefinition } from '../eventTypes'

export class FakeListener<SupportedEvents extends CommonEventDefinition[]>
implements AnyEventHandler<SupportedEvents>
{
public receivedEvents: SupportedEvents[number]['schema']['_output'][] = []

constructor(_supportedEvents: SupportedEvents) {
this.receivedEvents = []
}

handleEvent(event: SupportedEvents[number]['schema']['_output']): void | Promise<void> {
this.receivedEvents.push(event)
}
}
22 changes: 22 additions & 0 deletions packages/core/lib/messages/baseMessageSchemas.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
import z from 'zod'

export const BASE_MESSAGE_SCHEMA = z.object({
id: z.string().describe('event unique identifier'),
type: z.literal<string>('<replace.me>').describe('event type name'),
timestamp: z.string().datetime().describe('iso 8601 datetime'),
payload: z.optional(z.object({})).describe('event payload based on type'),
metadata: z
.object({
schemaVersion: z.string().min(1).describe('base event schema version'),
producerApp: z.string().min(1).describe('app/service that produced the event'),
originApp: z.string().min(1).describe('app/service that initiated the workflow'),
correlationId: z
.string()
.describe('unique identifier passed to all events in workflow chain'),
})
.describe('event metadata'),
})

export type BaseMessageType = z.infer<typeof BASE_MESSAGE_SCHEMA>

export type MessageMetadata = 'id' | 'timestamp' | 'type' | 'metadata'
3 changes: 2 additions & 1 deletion packages/core/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@
"dependencies": {
"@lokalise/node-core": "^9.14.0",
"fast-equals": "^5.0.1",
"toad-cache": "^3.7.0"
"toad-cache": "^3.7.0",
"zod": "^3.23.5"
},
"devDependencies": {
"@types/node": "^20.11.25",
Expand Down
26 changes: 26 additions & 0 deletions packages/core/test/fakes/FakeLogger.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
import type { Logger } from '../../lib/types/MessageQueueTypes'

export class FakeLogger implements Logger {
public readonly loggedMessages: unknown[] = []
public readonly loggedWarnings: unknown[] = []
public readonly loggedErrors: unknown[] = []

debug(obj: unknown) {
this.loggedMessages.push(obj)
}
error(obj: unknown) {
this.loggedErrors.push(obj)
}
fatal(obj: unknown) {
this.loggedErrors.push(obj)
}
info(obj: unknown) {
this.loggedMessages.push(obj)
}
trace(obj: unknown) {
this.loggedMessages.push(obj)
}
warn(obj: unknown) {
this.loggedWarnings.push(obj)
}
}
Loading

0 comments on commit 08bd24c

Please sign in to comment.