Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

AP-4870 async event handlers support #201

Merged
merged 28 commits into from
Sep 2, 2024
Merged
Changes from 5 commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
3082a3f
AP-4870 Extracting event building to a private method
CarlosGamero Aug 28, 2024
0aef8cd
AP-4870 Lint warning fix
CarlosGamero Aug 28, 2024
632fc16
AP-4870 Moving event handling to a separate method
CarlosGamero Aug 28, 2024
fd879b1
AP-4870 simplification of main method
CarlosGamero Aug 28, 2024
2a43e9c
AP-4870 API to support async handlers
CarlosGamero Aug 28, 2024
7ca9296
Merge branch 'main' into feat/AP-4870-async_event_handlers_support
CarlosGamero Aug 28, 2024
14d5f80
AP-4870 fixing sync handling
CarlosGamero Aug 28, 2024
5daff93
AP-4870 Improving naming
CarlosGamero Aug 28, 2024
a5100e6
AP-4870 Issue fix
CarlosGamero Aug 28, 2024
648510d
AP-4870 Minor test changes
CarlosGamero Aug 28, 2024
c4b3af9
AP-4870 FakeDelayedListener
CarlosGamero Aug 28, 2024
9aa598f
AP-4870 BG listeners simple handling
CarlosGamero Aug 28, 2024
aa16cbe
AP-4870 Adding bg listeners tests
CarlosGamero Aug 28, 2024
9e92cae
AP-4870 Reporting error + logging
CarlosGamero Aug 28, 2024
a9d3928
AP-4870 Improving error logging
CarlosGamero Aug 28, 2024
b609464
AP-4870 Making logger and errorReporter mandatory
CarlosGamero Aug 29, 2024
c922895
Release prepare
CarlosGamero Aug 29, 2024
4b9d773
node-core update
CarlosGamero Aug 29, 2024
c7d0985
schemas release prepare
CarlosGamero Aug 29, 2024
9d02746
Core using new schema version
CarlosGamero Aug 29, 2024
18b52aa
Build fix
CarlosGamero Aug 29, 2024
789f708
AP-4870 Implementing bg listenner observability
CarlosGamero Aug 29, 2024
0f0ec74
AP-4870 test fixes
CarlosGamero Aug 29, 2024
ff88629
AP-4870 Adding tests
CarlosGamero Aug 29, 2024
892cf5b
AP-4870 PR suggestions
CarlosGamero Aug 30, 2024
74ff154
AP-4870 Transaction observability on bg listeners
CarlosGamero Aug 30, 2024
7f6c902
Simplify any handlers
CarlosGamero Sep 2, 2024
c4c6362
Removing comment
CarlosGamero Sep 2, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
137 changes: 65 additions & 72 deletions packages/core/lib/events/DomainEventEmitter.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
import { InternalError } from '@lokalise/node-core'
import { type ErrorReporter, InternalError } from '@lokalise/node-core'

import type { MetadataFiller } from '../messages/MetadataFiller'
import type { HandlerSpy, HandlerSpyParams, PublicHandlerSpy } from '../queues/HandlerSpy'
import { resolveHandlerSpy } from '../queues/HandlerSpy'

import type { ConsumerMessageMetadataType } from '@message-queue-toolkit/schemas'
import type { Logger } from '../types/MessageQueueTypes'
import type { EventRegistry } from './EventRegistry'
import type {
AnyEventHandler,
Expand All @@ -16,36 +17,51 @@ import type {
SingleEventHandler,
} from './eventTypes'

export type DomainEventEmitterDependencies<SupportedEvents extends CommonEventDefinition[]> = {
eventRegistry: EventRegistry<SupportedEvents>
metadataFiller: MetadataFiller
// TODO: make them mandatory is a breaking change, decide if we are fine with that
logger?: Logger
CarlosGamero marked this conversation as resolved.
Show resolved Hide resolved
errorReporter?: ErrorReporter
}

// TODO: not sure how to call it
type Handlers<T> = {
sync: T[]
async: T[]
}

export class DomainEventEmitter<SupportedEvents extends CommonEventDefinition[]> {
private readonly eventRegistry: EventRegistry<SupportedEvents>
private readonly metadataFiller: MetadataFiller
private readonly logger?: Logger
private readonly errorReporter?: ErrorReporter
private readonly _handlerSpy?: HandlerSpy<
CommonEventDefinitionConsumerSchemaType<SupportedEvents[number]>
>

private readonly eventHandlerMap: Record<
string,
EventHandler<CommonEventDefinitionPublisherSchemaType<SupportedEvents[number]>>[]
> = {}
private readonly anyHandlers: AnyEventHandler<SupportedEvents>[] = []
private readonly metadataFiller: MetadataFiller
private _handlerSpy:
| HandlerSpy<CommonEventDefinitionConsumerSchemaType<SupportedEvents[number]>>
| undefined
Handlers<EventHandler<CommonEventDefinitionPublisherSchemaType<SupportedEvents[number]>>>
>
private readonly anyHandlers: Handlers<AnyEventHandler<SupportedEvents>>

constructor(
{
eventRegistry,
metadataFiller,
}: {
eventRegistry: EventRegistry<SupportedEvents>
metadataFiller: MetadataFiller
},
deps: DomainEventEmitterDependencies<SupportedEvents>,
options: {
handlerSpy?: HandlerSpy<object> | HandlerSpyParams | boolean
} = {},
) {
this.eventRegistry = eventRegistry
this.metadataFiller = metadataFiller
this.eventRegistry = deps.eventRegistry
this.metadataFiller = deps.metadataFiller
this.logger = deps.logger
this.errorReporter = deps.errorReporter

this._handlerSpy =
resolveHandlerSpy<CommonEventDefinitionConsumerSchemaType<SupportedEvents[number]>>(options)

this.eventHandlerMap = {}
this.anyHandlers = { sync: [], async: [] }
}

get handlerSpy(): PublicHandlerSpy<
Expand All @@ -64,13 +80,16 @@ export class DomainEventEmitter<SupportedEvents extends CommonEventDefinition[]>
data: Omit<CommonEventDefinitionPublisherSchemaType<SupportedEvent>, 'type'>,
precedingMessageMetadata?: Partial<ConsumerMessageMetadataType>,
): Promise<Omit<CommonEventDefinitionConsumerSchemaType<SupportedEvent>, 'type'>> {
if (!data.timestamp) {
data.timestamp = this.metadataFiller.produceTimestamp()
}
if (!data.id) {
data.id = this.metadataFiller.produceId()
const eventTypeName = supportedEvent.publisherSchema.shape.type.value
if (!this.eventRegistry.isSupportedEvent(eventTypeName)) {
throw new InternalError({
errorCode: 'UNKNOWN_EVENT',
message: `Unknown event ${eventTypeName}`,
})
}

if (!data.timestamp) data.timestamp = this.metadataFiller.produceTimestamp()
if (!data.id) data.id = this.metadataFiller.produceId()
if (!data.metadata) {
data.metadata = this.metadataFiller.produceMetadata(
// @ts-ignore
Expand All @@ -79,52 +98,19 @@ export class DomainEventEmitter<SupportedEvents extends CommonEventDefinition[]>
precedingMessageMetadata ?? {},
)
}

if (!data.metadata.correlationId) {
data.metadata.correlationId = this.metadataFiller.produceId()
}

const eventTypeName = supportedEvent.publisherSchema.shape.type.value

if (!this.eventRegistry.isSupportedEvent(eventTypeName)) {
throw new InternalError({
errorCode: 'UNKNOWN_EVENT',
message: `Unknown event ${eventTypeName}`,
})
}

const eventHandlers = this.eventHandlerMap[eventTypeName]

// No relevant handlers are registered, we can stop processing
if (!eventHandlers && this.anyHandlers.length === 0) {
// @ts-ignore
return data
}
if (!data.metadata.correlationId) data.metadata.correlationId = this.metadataFiller.produceId()
kibertoad marked this conversation as resolved.
Show resolved Hide resolved

const validatedEvent = this.eventRegistry
.getEventDefinitionByTypeName(eventTypeName)
.publisherSchema.parse({
type: eventTypeName,
...data,
})
.publisherSchema.parse({ type: eventTypeName, ...data })

if (eventHandlers) {
for (const handler of eventHandlers) {
await handler.handleEvent(validatedEvent)
}
}

for (const handler of this.anyHandlers) {
await handler.handleEvent(validatedEvent)
}
await this.handleEvent(validatedEvent)

if (this._handlerSpy) {
this._handlerSpy.addProcessedMessage(
{
// @ts-ignore
message: {
...validatedEvent,
},
message: validatedEvent,
processingResult: 'consumed',
},
validatedEvent.id,
Expand All @@ -141,8 +127,14 @@ export class DomainEventEmitter<SupportedEvents extends CommonEventDefinition[]>
public on<EventTypeName extends EventTypeNames<SupportedEvents[number]>>(
eventTypeName: EventTypeName,
handler: SingleEventHandler<SupportedEvents, EventTypeName>,
asyncHandler = false,
) {
this.addOnHandler(eventTypeName, handler)
if (!this.eventHandlerMap[eventTypeName]) {
this.eventHandlerMap[eventTypeName] = { sync: [], async: [] }
}

if (asyncHandler) this.eventHandlerMap[eventTypeName].async.push(handler)
else this.eventHandlerMap[eventTypeName].sync.push(handler)
}

/**
Expand All @@ -151,27 +143,28 @@ export class DomainEventEmitter<SupportedEvents extends CommonEventDefinition[]>
public onMany<EventTypeName extends EventTypeNames<SupportedEvents[number]>>(
eventTypeNames: EventTypeName[],
handler: SingleEventHandler<SupportedEvents, EventTypeName>,
asyncHandler = false,
) {
for (const eventTypeName of eventTypeNames) {
this.on(eventTypeName, handler)
this.on(eventTypeName, handler, asyncHandler)
}
}

/**
* Register handler for all events supported by the emitter
*/
public onAny(handler: AnyEventHandler<SupportedEvents>) {
this.anyHandlers.push(handler)
public onAny(handler: AnyEventHandler<SupportedEvents>, asyncHandler = false) {
if (asyncHandler) this.anyHandlers.async.push(handler)
else this.anyHandlers.sync.push(handler)
}

private addOnHandler(
eventTypeName: EventTypeNames<SupportedEvents[number]>,
handler: EventHandler,
) {
if (!this.eventHandlerMap[eventTypeName]) {
this.eventHandlerMap[eventTypeName] = []
}

this.eventHandlerMap[eventTypeName].push(handler)
private async handleEvent<SupportedEvent extends SupportedEvents[number]>(
_event: CommonEventDefinitionPublisherSchemaType<SupportedEvent>,
): Promise<void> {
// TODO: implement
//const handlers = [...(this.eventHandlerMap[event.type] ?? []), ...this.anyHandlers]
//for (const handler of handlers) {
// await handler.handleEvent(event)
//}
}
}
Loading