From 08bd24cf4f583784e29918c188c0086e6bb2fae5 Mon Sep 17 00:00:00 2001 From: Igor Savin Date: Wed, 1 May 2024 15:40:29 +0300 Subject: [PATCH] Add type-safe Domain event emitter and event registry --- packages/core/index.ts | 5 + .../lib/events/DomainEventEmitter.spec.ts | 109 ++++++++++++++++++ .../core/lib/events/DomainEventEmitter.ts | 103 +++++++++++++++++ packages/core/lib/events/EventRegistry.ts | 29 +++++ packages/core/lib/events/eventTypes.ts | 37 ++++++ .../core/lib/events/fakes/FakeListener.ts | 15 +++ .../core/lib/messages/baseMessageSchemas.ts | 22 ++++ packages/core/package.json | 3 +- packages/core/test/fakes/FakeLogger.ts | 26 +++++ packages/core/test/testContext.ts | 101 ++++++++++++++++ packages/core/vitest.config.mts | 13 +-- 11 files changed, 453 insertions(+), 10 deletions(-) create mode 100644 packages/core/lib/events/DomainEventEmitter.spec.ts create mode 100644 packages/core/lib/events/DomainEventEmitter.ts create mode 100644 packages/core/lib/events/EventRegistry.ts create mode 100644 packages/core/lib/events/eventTypes.ts create mode 100644 packages/core/lib/events/fakes/FakeListener.ts create mode 100644 packages/core/lib/messages/baseMessageSchemas.ts create mode 100644 packages/core/test/fakes/FakeLogger.ts create mode 100644 packages/core/test/testContext.ts diff --git a/packages/core/index.ts b/packages/core/index.ts index 8df915e5..35bbcf87 100644 --- a/packages/core/index.ts +++ b/packages/core/index.ts @@ -50,3 +50,8 @@ export { waitAndRetry } from './lib/utils/waitUtils' export { parseMessage } from './lib/utils/parseUtils' export { reloadConfig, isProduction } from './lib/utils/envUtils' + +export { DomainEventEmitter } from './lib/events/DomainEventEmitter' +export { EventRegistry } from './lib/events/EventRegistry' +export { FakeListener } from './lib/events/fakes/FakeListener' +export * from './lib/events/eventTypes' diff --git a/packages/core/lib/events/DomainEventEmitter.spec.ts b/packages/core/lib/events/DomainEventEmitter.spec.ts new file mode 100644 index 00000000..7f1ca626 --- /dev/null +++ b/packages/core/lib/events/DomainEventEmitter.spec.ts @@ -0,0 +1,109 @@ +import { randomUUID } from 'node:crypto' + +import { waitAndRetry } from '@lokalise/node-core' +import type { AwilixContainer } from 'awilix' +import { afterAll, beforeAll, expect } from 'vitest' + +import type { Dependencies } from '../../test/testContext' +import { registerDependencies, TestEvents } from '../../test/testContext' + +import type { CommonEventDefinitionSchemaType } from './eventTypes' +import { FakeListener } from './fakes/FakeListener' + +const createdEventPayload: CommonEventDefinitionSchemaType = { + payload: { + message: 'msg', + }, + type: 'entity.created', + id: randomUUID(), + metadata: { + originApp: 'de', + producerApp: 'dede', + schemaVersion: '1', + correlationId: randomUUID(), + }, + timestamp: new Date().toISOString(), +} + +const updatedEventPayload: CommonEventDefinitionSchemaType = { + ...createdEventPayload, + type: 'entity.updated', +} + +const expectedCreatedPayload = { + id: expect.any(String), + metadata: { + correlationId: expect.any(String), + originApp: 'de', + producerApp: 'dede', + schemaVersion: '1', + }, + payload: { + message: 'msg', + }, + timestamp: expect.any(String), + type: 'entity.created', +} + +const expectedUpdatedPayload = { + ...expectedCreatedPayload, + type: 'entity.updated', +} + +describe('AutopilotEventEmitter', () => { + let diContainer: AwilixContainer + beforeAll(async () => { + diContainer = await registerDependencies() + }) + + afterAll(async () => { + await diContainer.dispose() + }) + + it('emits event to anyListener', async () => { + const { eventEmitter } = diContainer.cradle + const fakeListener = new FakeListener(diContainer.cradle.eventRegistry.supportedEvents) + eventEmitter.onAny(fakeListener) + + await eventEmitter.emit(TestEvents.created, createdEventPayload) + + await waitAndRetry(() => { + return fakeListener.receivedEvents.length > 0 + }) + + expect(fakeListener.receivedEvents).toHaveLength(1) + expect(fakeListener.receivedEvents[0]).toMatchObject(expectedCreatedPayload) + }) + + it('emits event to singleListener', async () => { + const { eventEmitter } = diContainer.cradle + const fakeListener = new FakeListener(diContainer.cradle.eventRegistry.supportedEvents) + eventEmitter.on('entity.created', fakeListener) + + await eventEmitter.emit(TestEvents.created, createdEventPayload) + + await waitAndRetry(() => { + return fakeListener.receivedEvents.length > 0 + }) + + expect(fakeListener.receivedEvents).toHaveLength(1) + expect(fakeListener.receivedEvents[0]).toMatchObject(expectedCreatedPayload) + }) + + it('emits event to manyListener', async () => { + const { eventEmitter } = diContainer.cradle + const fakeListener = new FakeListener(diContainer.cradle.eventRegistry.supportedEvents) + eventEmitter.onMany(['entity.created', 'entity.updated'], fakeListener) + + await eventEmitter.emit(TestEvents.created, createdEventPayload) + await eventEmitter.emit(TestEvents.updated, updatedEventPayload) + + await waitAndRetry(() => { + return fakeListener.receivedEvents.length === 2 + }) + + expect(fakeListener.receivedEvents).toHaveLength(2) + expect(fakeListener.receivedEvents[0]).toMatchObject(expectedCreatedPayload) + expect(fakeListener.receivedEvents[1]).toMatchObject(expectedUpdatedPayload) + }) +}) diff --git a/packages/core/lib/events/DomainEventEmitter.ts b/packages/core/lib/events/DomainEventEmitter.ts new file mode 100644 index 00000000..f017f256 --- /dev/null +++ b/packages/core/lib/events/DomainEventEmitter.ts @@ -0,0 +1,103 @@ +import { InternalError } from '@lokalise/node-core' + +import type { EventRegistry } from './EventRegistry' +import type { + EventHandler, + AnyEventHandler, + SingleEventHandler, + CommonEventDefinition, + CommonEventDefinitionSchemaType, + EventTypeNames, +} from './eventTypes' + +export class DomainEventEmitter { + private readonly eventRegistry: EventRegistry + + private readonly eventHandlerMap: Record< + string, + EventHandler>[] + > = {} + private readonly anyHandlers: AnyEventHandler[] = [] + + constructor({ eventRegistry }: { eventRegistry: EventRegistry }) { + this.eventRegistry = eventRegistry + } + + public async emit( + supportedEvent: SupportedEvent, + data: Omit, 'type'>, + ) { + const eventTypeName = supportedEvent.schema.shape.type.value + + if (!this.eventRegistry.isSupportedEvent(eventTypeName)) { + throw new InternalError({ + errorCode: 'UNKNOWN_EVENT', + message: `Unknown event ${eventTypeName}`, + }) + } + + const eventHandlers = this.eventHandlerMap[eventTypeName] + + // No relevant handlers are registered, we can stop processing + if (!eventHandlers && this.anyHandlers.length === 0) { + return + } + + const validatedEvent = this.eventRegistry + .getEventDefinitionByTypeName(eventTypeName) + .schema.parse({ + type: eventTypeName, + ...data, + }) + + if (eventHandlers) { + for (const handler of eventHandlers) { + await handler.handleEvent(validatedEvent) + } + } + + for (const handler of this.anyHandlers) { + await handler.handleEvent(validatedEvent) + } + } + + /** + * Register handler for a specific event + */ + public on>( + eventTypeName: EventTypeName, + handler: SingleEventHandler, + ) { + this.addOnHandler(eventTypeName, handler) + } + + /** + * Register handler for multiple events + */ + public onMany>( + eventTypeNames: EventTypeName[], + handler: SingleEventHandler, + ) { + for (const eventTypeName of eventTypeNames) { + this.on(eventTypeName, handler) + } + } + + /** + * Register handler for all events supported by the emitter + */ + public onAny(handler: AnyEventHandler) { + this.anyHandlers.push(handler) + } + + private addOnHandler( + eventTypeName: EventTypeNames, + handler: EventHandler, + ) { + if (!this.eventHandlerMap[eventTypeName]) { + this.eventHandlerMap[eventTypeName] = [] + } + + this.eventHandlerMap[eventTypeName].push(handler) + } +} diff --git a/packages/core/lib/events/EventRegistry.ts b/packages/core/lib/events/EventRegistry.ts new file mode 100644 index 00000000..e8182330 --- /dev/null +++ b/packages/core/lib/events/EventRegistry.ts @@ -0,0 +1,29 @@ +import type { CommonEventDefinition, EventTypeNames } from './eventTypes' + +export class EventRegistry { + public readonly supportedEvents: SupportedEvents + private readonly supportedEventsSet: Set + private readonly supportedEventMap: Record = {} + + constructor(supportedEvents: SupportedEvents) { + this.supportedEvents = supportedEvents + this.supportedEventsSet = new Set() + + for (const supportedEvent of supportedEvents) { + this.supportedEventMap[supportedEvent.schema.shape.type.value] = supportedEvent + this.supportedEventsSet.add(supportedEvent.schema.shape.type.value) + } + } + + public getEventDefinitionByTypeName = < + EventTypeName extends EventTypeNames, + >( + eventTypeName: EventTypeName, + ): CommonEventDefinition => { + return this.supportedEventMap[eventTypeName] + } + + public isSupportedEvent(eventTypeName: string) { + return this.supportedEventsSet.has(eventTypeName) + } +} diff --git a/packages/core/lib/events/eventTypes.ts b/packages/core/lib/events/eventTypes.ts new file mode 100644 index 00000000..6ec46494 --- /dev/null +++ b/packages/core/lib/events/eventTypes.ts @@ -0,0 +1,37 @@ +import type { ZodObject, ZodTypeAny } from 'zod' +import type z from 'zod' + +import type { BASE_MESSAGE_SCHEMA } from '../messages/baseMessageSchemas' + +export type EventTypeNames = + CommonEventDefinitionSchemaType['type'] + +// To be extended with transport-specific fields, e. g. "snsTopic" in specific libraries +export type CommonEventDefinition = { + schema: ZodObject< + Omit<(typeof BASE_MESSAGE_SCHEMA)['shape'], 'payload'> & { payload: ZodTypeAny } + > +} + +export type CommonEventDefinitionSchemaType = z.infer + +export type EventHandler< + EventDefinitionSchema extends + CommonEventDefinitionSchemaType = CommonEventDefinitionSchemaType, +> = { + handleEvent(event: EventDefinitionSchema): void | Promise +} + +export type AnyEventHandler = EventHandler< + CommonEventDefinitionSchemaType +> + +export type SingleEventHandler< + EventDefinition extends CommonEventDefinition[], + EventTypeName extends EventTypeNames, +> = EventHandler> + +type EventFromArrayByTypeName< + EventDefinition extends CommonEventDefinition[], + EventTypeName extends EventTypeNames, +> = Extract, { type: EventTypeName }> diff --git a/packages/core/lib/events/fakes/FakeListener.ts b/packages/core/lib/events/fakes/FakeListener.ts new file mode 100644 index 00000000..b6e3f300 --- /dev/null +++ b/packages/core/lib/events/fakes/FakeListener.ts @@ -0,0 +1,15 @@ +import type { AnyEventHandler, CommonEventDefinition } from '../eventTypes' + +export class FakeListener + implements AnyEventHandler +{ + public receivedEvents: SupportedEvents[number]['schema']['_output'][] = [] + + constructor(_supportedEvents: SupportedEvents) { + this.receivedEvents = [] + } + + handleEvent(event: SupportedEvents[number]['schema']['_output']): void | Promise { + this.receivedEvents.push(event) + } +} diff --git a/packages/core/lib/messages/baseMessageSchemas.ts b/packages/core/lib/messages/baseMessageSchemas.ts new file mode 100644 index 00000000..a038e61e --- /dev/null +++ b/packages/core/lib/messages/baseMessageSchemas.ts @@ -0,0 +1,22 @@ +import z from 'zod' + +export const BASE_MESSAGE_SCHEMA = z.object({ + id: z.string().describe('event unique identifier'), + type: z.literal('').describe('event type name'), + timestamp: z.string().datetime().describe('iso 8601 datetime'), + payload: z.optional(z.object({})).describe('event payload based on type'), + metadata: z + .object({ + schemaVersion: z.string().min(1).describe('base event schema version'), + producerApp: z.string().min(1).describe('app/service that produced the event'), + originApp: z.string().min(1).describe('app/service that initiated the workflow'), + correlationId: z + .string() + .describe('unique identifier passed to all events in workflow chain'), + }) + .describe('event metadata'), +}) + +export type BaseMessageType = z.infer + +export type MessageMetadata = 'id' | 'timestamp' | 'type' | 'metadata' diff --git a/packages/core/package.json b/packages/core/package.json index 14e2fe9d..55b84aa8 100644 --- a/packages/core/package.json +++ b/packages/core/package.json @@ -27,7 +27,8 @@ "dependencies": { "@lokalise/node-core": "^9.14.0", "fast-equals": "^5.0.1", - "toad-cache": "^3.7.0" + "toad-cache": "^3.7.0", + "zod": "^3.23.5" }, "devDependencies": { "@types/node": "^20.11.25", diff --git a/packages/core/test/fakes/FakeLogger.ts b/packages/core/test/fakes/FakeLogger.ts new file mode 100644 index 00000000..fe7f5a16 --- /dev/null +++ b/packages/core/test/fakes/FakeLogger.ts @@ -0,0 +1,26 @@ +import type { Logger } from '../../lib/types/MessageQueueTypes' + +export class FakeLogger implements Logger { + public readonly loggedMessages: unknown[] = [] + public readonly loggedWarnings: unknown[] = [] + public readonly loggedErrors: unknown[] = [] + + debug(obj: unknown) { + this.loggedMessages.push(obj) + } + error(obj: unknown) { + this.loggedErrors.push(obj) + } + fatal(obj: unknown) { + this.loggedErrors.push(obj) + } + info(obj: unknown) { + this.loggedMessages.push(obj) + } + trace(obj: unknown) { + this.loggedMessages.push(obj) + } + warn(obj: unknown) { + this.loggedWarnings.push(obj) + } +} diff --git a/packages/core/test/testContext.ts b/packages/core/test/testContext.ts new file mode 100644 index 00000000..76762d46 --- /dev/null +++ b/packages/core/test/testContext.ts @@ -0,0 +1,101 @@ +import type { ErrorReporter } from '@lokalise/node-core' +import type { Resolver } from 'awilix' +import { asClass, asFunction, createContainer, Lifetime } from 'awilix' +import { AwilixManager } from 'awilix-manager' +import type { Logger } from 'pino' +import pino from 'pino' +import { z } from 'zod' + +import { DomainEventEmitter } from '../lib/events/DomainEventEmitter' +import { EventRegistry } from '../lib/events/EventRegistry' +import { BASE_MESSAGE_SCHEMA } from '../lib/messages/baseMessageSchemas' +import type { TransactionObservabilityManager } from '../lib/types/MessageQueueTypes' + +export const SINGLETON_CONFIG = { lifetime: Lifetime.SINGLETON } + +export type DependencyOverrides = Partial + +const TestLogger: Logger = pino() + +export const TestEvents = { + created: { + schema: BASE_MESSAGE_SCHEMA.extend({ + type: z.literal('entity.created'), + payload: z.object({ + message: z.string(), + }), + }), + }, + + updated: { + schema: BASE_MESSAGE_SCHEMA.extend({ + type: z.literal('entity.updated'), + payload: z.object({ + message: z.string(), + }), + }), + }, +} as const + +export type TestEventsType = (typeof TestEvents)[keyof typeof TestEvents][] + +export async function registerDependencies(dependencyOverrides: DependencyOverrides = {}) { + const diContainer = createContainer({ + injectionMode: 'PROXY', + }) + const awilixManager = new AwilixManager({ + diContainer, + asyncDispose: true, + asyncInit: true, + eagerInject: true, + }) + + const diConfig: DiConfig = { + logger: asFunction(() => { + return TestLogger + }, SINGLETON_CONFIG), + awilixManager: asFunction(() => { + return awilixManager + }, SINGLETON_CONFIG), + + eventRegistry: asFunction(() => { + return new EventRegistry(Object.values(TestEvents)) + }, SINGLETON_CONFIG), + + eventEmitter: asClass(DomainEventEmitter, SINGLETON_CONFIG), + + // vendor-specific dependencies + newRelicBackgroundTransactionManager: asFunction(() => { + return undefined + }, SINGLETON_CONFIG), + errorReporter: asFunction(() => { + return { + report: () => {}, + } satisfies ErrorReporter + }), + } + diContainer.register(diConfig) + + for (const [dependencyKey, dependencyValue] of Object.entries(dependencyOverrides)) { + diContainer.register(dependencyKey, dependencyValue) + } + + await awilixManager.executeInit() + + return diContainer +} + +// eslint-disable-next-line @typescript-eslint/no-explicit-any +type DiConfig = Record> + +export interface Dependencies { + logger: Logger + awilixManager: AwilixManager + + // vendor-specific dependencies + newRelicBackgroundTransactionManager: TransactionObservabilityManager + + errorReporter: ErrorReporter + eventRegistry: EventRegistry + eventEmitter: DomainEventEmitter +} diff --git a/packages/core/vitest.config.mts b/packages/core/vitest.config.mts index 13dabfe3..3afa8ed2 100644 --- a/packages/core/vitest.config.mts +++ b/packages/core/vitest.config.mts @@ -3,11 +3,6 @@ import { defineConfig } from 'vitest/config' export default defineConfig({ test: { globals: true, - poolOptions: { - threads: { - singleThread: true, - }, - }, watch: false, environment: 'node', reporters: ['default'], @@ -17,10 +12,10 @@ export default defineConfig({ reporter: ['text'], all: true, thresholds: { - lines: 20, - functions: 42, - branches: 65, - statements: 20, + lines: 35, + functions: 65, + branches: 80, + statements: 35, }, }, },