Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bugfix/spy for bg listeners #206

Merged
merged 4 commits into from
Sep 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 17 additions & 18 deletions packages/core/lib/events/DomainEventEmitter.spec.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import { randomUUID } from 'node:crypto'

import { waitAndRetry } from '@lokalise/node-core'
import type { CommonEventDefinitionPublisherSchemaType } from '@message-queue-toolkit/schemas'
import type { AwilixContainer } from 'awilix'
import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'
Expand Down Expand Up @@ -77,11 +76,14 @@ describe('AutopilotEventEmitter', () => {

const emittedEvent = await eventEmitter.emit(TestEvents.created, createdEventPayload)

const processedEvent = await eventEmitter.handlerSpy.waitForMessageWithId(emittedEvent.id)
expect(fakeListener.receivedEvents).toHaveLength(1) // is processed synchronously so no need to wait
expect(fakeListener.receivedEvents[0]).toMatchObject(expectedCreatedPayload)

const processedEvent = await eventEmitter.handlerSpy.waitForMessageWithId(
emittedEvent.id,
'consumed',
)
expect(processedEvent.message.type).toBe(TestEvents.created.consumerSchema.shape.type.value)
expect(fakeListener.receivedEvents).toHaveLength(1)
expect(fakeListener.receivedEvents[0]).toMatchObject(expectedCreatedPayload)

expect(transactionManagerStartSpy).toHaveBeenCalledOnce()
expect(transactionManagerStartSpy).toHaveBeenCalledWith(
Expand Down Expand Up @@ -110,13 +112,12 @@ describe('AutopilotEventEmitter', () => {
)

const emittedEvent = await eventEmitter.emit(TestEvents.created, createdEventPayload)
const processedEvent = await eventEmitter.handlerSpy.waitForMessageWithId(emittedEvent.id)
expect(fakeListener.receivedEvents).toHaveLength(0)

const processedEvent = await eventEmitter.handlerSpy.waitForMessageWithId(emittedEvent.id)
expect(processedEvent.message.type).toBe(TestEvents.created.consumerSchema.shape.type.value)
// even thought event is consumed, the listener is still processing
expect(fakeListener.receivedEvents).toHaveLength(0)
// Wait for the event to be processed
await waitAndRetry(() => fakeListener.receivedEvents.length > 0)
expect(fakeListener.receivedEvents).toHaveLength(1)
expect(fakeListener.receivedEvents[0]).toMatchObject(expectedCreatedPayload)

Expand Down Expand Up @@ -275,12 +276,10 @@ describe('AutopilotEventEmitter', () => {
'stop',
)

await eventEmitter.emit(TestEvents.created, createdEventPayload)

// even thought event is consumed, the listener is still processing
const emittedEvent = await eventEmitter.emit(TestEvents.created, createdEventPayload)
expect(fakeListener.receivedEvents).toHaveLength(0)
// Wait for the event to be processed
await waitAndRetry(() => fakeListener.receivedEvents.length > 0)

await eventEmitter.handlerSpy.waitForMessageWithId(emittedEvent.id, 'consumed')
expect(fakeListener.receivedEvents).toHaveLength(1)
expect(fakeListener.receivedEvents[0]).toMatchObject(expectedCreatedPayload)

Expand Down Expand Up @@ -333,11 +332,12 @@ describe('AutopilotEventEmitter', () => {
'stop',
)

await eventEmitter.emit(TestEvents.created, createdEventPayload)
await eventEmitter.emit(TestEvents.updated, updatedEventPayload)

const eventEmitted1 = await eventEmitter.emit(TestEvents.created, createdEventPayload)
const emittedEvent2 = await eventEmitter.emit(TestEvents.updated, updatedEventPayload)
expect(fakeListener.receivedEvents).toHaveLength(0)
await waitAndRetry(() => fakeListener.receivedEvents.length === 2)

await eventEmitter.handlerSpy.waitForMessageWithId(eventEmitted1.id, 'consumed')
await eventEmitter.handlerSpy.waitForMessageWithId(emittedEvent2.id, 'consumed')

expect(fakeListener.receivedEvents).toHaveLength(2)
expect(fakeListener.receivedEvents[0]).toMatchObject(expectedCreatedPayload)
Expand Down Expand Up @@ -395,10 +395,9 @@ describe('AutopilotEventEmitter', () => {
)

const emittedEvent = await eventEmitter.emit(TestEvents.created, createdEventPayload)

expect(fakeListener.receivedEvents).toHaveLength(0)
await waitAndRetry(() => fakeListener.receivedEvents.length === 1)

await eventEmitter.handlerSpy.waitForMessageWithId(emittedEvent.id, 'consumed')
expect(fakeListener.receivedEvents).toHaveLength(1)
expect(fakeListener.receivedEvents[0]).toMatchObject(expectedCreatedPayload)

Expand Down
93 changes: 45 additions & 48 deletions packages/core/lib/events/DomainEventEmitter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -110,17 +110,6 @@ export class DomainEventEmitter<SupportedEvents extends CommonEventDefinition[]>

await this.handleEvent(validatedEvent)

if (this._handlerSpy) {
this._handlerSpy.addProcessedMessage(
{
// @ts-ignore
message: validatedEvent,
processingResult: 'consumed',
},
validatedEvent.id,
)
}

// @ts-ignore
return validatedEvent
}
Expand Down Expand Up @@ -158,9 +147,7 @@ export class DomainEventEmitter<SupportedEvents extends CommonEventDefinition[]>
* Register handler for all events supported by the emitter
*/
public onAny(handler: AnyEventHandler<SupportedEvents>, isBackgroundHandler = false) {
for (const supportedEvent of this.eventRegistry.supportedEvents) {
this.on(supportedEvent.consumerSchema.shape.type.value, handler, isBackgroundHandler)
}
this.onMany(Array.from(this.eventRegistry.supportedEventTypes), handler, isBackgroundHandler)
}

private async handleEvent<SupportedEvent extends SupportedEvents[number]>(
Expand All @@ -172,46 +159,56 @@ export class DomainEventEmitter<SupportedEvents extends CommonEventDefinition[]>
}

for (const handler of eventHandlers.foreground) {
const transactionId = randomUUID()
let isSuccessfull = false
try {
this.transactionObservabilityManager?.startWithGroup(
this.buildTransactionKey(event, handler, false),
transactionId,
event.type,
)
await handler.handleEvent(event)
isSuccessfull = true
} finally {
this.transactionObservabilityManager?.stop(transactionId, isSuccessfull)
}
await this.executeEventHandler(event, handler, false)
}

for (const handler of eventHandlers.background) {
const transactionId = randomUUID()
const bgPromises = eventHandlers.background.map((handler) =>
kibertoad marked this conversation as resolved.
Show resolved Hide resolved
this.executeEventHandler(event, handler, true),
)
Promise.all(bgPromises).then(() => {
if (!this._handlerSpy) return
this._handlerSpy.addProcessedMessage(
{
// @ts-ignore
message: event,
processingResult: 'consumed',
},
event.id,
)
})
}

private async executeEventHandler<SupportedEvent extends SupportedEvents[number]>(
event: CommonEventDefinitionPublisherSchemaType<SupportedEvent>,
handler: EventHandler<CommonEventDefinitionPublisherSchemaType<SupportedEvent>>,
isBackgroundHandler: boolean,
) {
const transactionId = randomUUID()
let isSuccessful = false
try {
this.transactionObservabilityManager?.startWithGroup(
this.buildTransactionKey(event, handler, true),
this.buildTransactionKey(event, handler, isBackgroundHandler),
transactionId,
event.type,
)

Promise.resolve(handler.handleEvent(event))
.then(() => {
this.transactionObservabilityManager?.stop(transactionId, true)
})
.catch((error) => {
this.transactionObservabilityManager?.stop(transactionId, false)
const context = {
event: JSON.stringify(event),
eventHandlerId: handler.eventHandlerId,
'x-request-id': event.metadata?.correlationId,
}
this.logger.error({
...resolveGlobalErrorLogObject(error),
...context,
})
this.errorReporter?.report({ error: error, context })
})
await handler.handleEvent(event)
isSuccessful = true
} catch (error) {
if (!isBackgroundHandler) throw error

const context = {
event: JSON.stringify(event),
eventHandlerId: handler.eventHandlerId,
'x-request-id': event.metadata?.correlationId,
}
this.logger.error({
...resolveGlobalErrorLogObject(error),
...context,
})
// biome-ignore lint/suspicious/noExplicitAny: TODO: improve error type
this.errorReporter?.report({ error: error as any, context })
} finally {
this.transactionObservabilityManager?.stop(transactionId, isSuccessful)
}
}

Expand Down
8 changes: 4 additions & 4 deletions packages/core/lib/events/EventRegistry.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,16 @@ import type { CommonEventDefinition, EventTypeNames } from './eventTypes'

export class EventRegistry<SupportedEvents extends CommonEventDefinition[]> {
public readonly supportedEvents: SupportedEvents
private readonly supportedEventsSet: Set<string>
public readonly supportedEventTypes: Set<string>
private readonly supportedEventMap: Record<string, CommonEventDefinition> = {}

constructor(supportedEvents: SupportedEvents) {
this.supportedEvents = supportedEvents
this.supportedEventsSet = new Set<string>()
this.supportedEventTypes = new Set<string>()

for (const supportedEvent of supportedEvents) {
this.supportedEventMap[supportedEvent.consumerSchema.shape.type.value] = supportedEvent
this.supportedEventsSet.add(supportedEvent.consumerSchema.shape.type.value)
this.supportedEventTypes.add(supportedEvent.consumerSchema.shape.type.value)
}
}

Expand All @@ -24,6 +24,6 @@ export class EventRegistry<SupportedEvents extends CommonEventDefinition[]> {
}

public isSupportedEvent(eventTypeName: string) {
return this.supportedEventsSet.has(eventTypeName)
return this.supportedEventTypes.has(eventTypeName)
}
}
2 changes: 1 addition & 1 deletion packages/core/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@message-queue-toolkit/core",
"version": "17.0.1",
"version": "17.0.2",
"private": false,
"license": "MIT",
"description": "Useful utilities, interfaces and base classes for message queue handling. Supports AMQP and SQS with a common abstraction on top currently",
Expand Down
Loading