diff --git a/packages/core/lib/events/DomainEventEmitter.spec.ts b/packages/core/lib/events/DomainEventEmitter.spec.ts index d72d8167..7ef2e763 100644 --- a/packages/core/lib/events/DomainEventEmitter.spec.ts +++ b/packages/core/lib/events/DomainEventEmitter.spec.ts @@ -1,6 +1,5 @@ import { randomUUID } from 'node:crypto' -import { waitAndRetry } from '@lokalise/node-core' import type { CommonEventDefinitionPublisherSchemaType } from '@message-queue-toolkit/schemas' import type { AwilixContainer } from 'awilix' import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest' @@ -77,11 +76,14 @@ describe('AutopilotEventEmitter', () => { const emittedEvent = await eventEmitter.emit(TestEvents.created, createdEventPayload) - const processedEvent = await eventEmitter.handlerSpy.waitForMessageWithId(emittedEvent.id) + expect(fakeListener.receivedEvents).toHaveLength(1) // is processed synchronously so no need to wait + expect(fakeListener.receivedEvents[0]).toMatchObject(expectedCreatedPayload) + const processedEvent = await eventEmitter.handlerSpy.waitForMessageWithId( + emittedEvent.id, + 'consumed', + ) expect(processedEvent.message.type).toBe(TestEvents.created.consumerSchema.shape.type.value) - expect(fakeListener.receivedEvents).toHaveLength(1) - expect(fakeListener.receivedEvents[0]).toMatchObject(expectedCreatedPayload) expect(transactionManagerStartSpy).toHaveBeenCalledOnce() expect(transactionManagerStartSpy).toHaveBeenCalledWith( @@ -110,13 +112,12 @@ describe('AutopilotEventEmitter', () => { ) const emittedEvent = await eventEmitter.emit(TestEvents.created, createdEventPayload) - const processedEvent = await eventEmitter.handlerSpy.waitForMessageWithId(emittedEvent.id) + expect(fakeListener.receivedEvents).toHaveLength(0) + const processedEvent = await eventEmitter.handlerSpy.waitForMessageWithId(emittedEvent.id) expect(processedEvent.message.type).toBe(TestEvents.created.consumerSchema.shape.type.value) // even thought event is consumed, the listener is still processing - expect(fakeListener.receivedEvents).toHaveLength(0) // Wait for the event to be processed - await waitAndRetry(() => fakeListener.receivedEvents.length > 0) expect(fakeListener.receivedEvents).toHaveLength(1) expect(fakeListener.receivedEvents[0]).toMatchObject(expectedCreatedPayload) @@ -275,12 +276,10 @@ describe('AutopilotEventEmitter', () => { 'stop', ) - await eventEmitter.emit(TestEvents.created, createdEventPayload) - - // even thought event is consumed, the listener is still processing + const emittedEvent = await eventEmitter.emit(TestEvents.created, createdEventPayload) expect(fakeListener.receivedEvents).toHaveLength(0) - // Wait for the event to be processed - await waitAndRetry(() => fakeListener.receivedEvents.length > 0) + + await eventEmitter.handlerSpy.waitForMessageWithId(emittedEvent.id, 'consumed') expect(fakeListener.receivedEvents).toHaveLength(1) expect(fakeListener.receivedEvents[0]).toMatchObject(expectedCreatedPayload) @@ -333,11 +332,12 @@ describe('AutopilotEventEmitter', () => { 'stop', ) - await eventEmitter.emit(TestEvents.created, createdEventPayload) - await eventEmitter.emit(TestEvents.updated, updatedEventPayload) - + const eventEmitted1 = await eventEmitter.emit(TestEvents.created, createdEventPayload) + const emittedEvent2 = await eventEmitter.emit(TestEvents.updated, updatedEventPayload) expect(fakeListener.receivedEvents).toHaveLength(0) - await waitAndRetry(() => fakeListener.receivedEvents.length === 2) + + await eventEmitter.handlerSpy.waitForMessageWithId(eventEmitted1.id, 'consumed') + await eventEmitter.handlerSpy.waitForMessageWithId(emittedEvent2.id, 'consumed') expect(fakeListener.receivedEvents).toHaveLength(2) expect(fakeListener.receivedEvents[0]).toMatchObject(expectedCreatedPayload) @@ -395,10 +395,9 @@ describe('AutopilotEventEmitter', () => { ) const emittedEvent = await eventEmitter.emit(TestEvents.created, createdEventPayload) - expect(fakeListener.receivedEvents).toHaveLength(0) - await waitAndRetry(() => fakeListener.receivedEvents.length === 1) + await eventEmitter.handlerSpy.waitForMessageWithId(emittedEvent.id, 'consumed') expect(fakeListener.receivedEvents).toHaveLength(1) expect(fakeListener.receivedEvents[0]).toMatchObject(expectedCreatedPayload) diff --git a/packages/core/lib/events/DomainEventEmitter.ts b/packages/core/lib/events/DomainEventEmitter.ts index a367c5fc..c7ccf95d 100644 --- a/packages/core/lib/events/DomainEventEmitter.ts +++ b/packages/core/lib/events/DomainEventEmitter.ts @@ -110,17 +110,6 @@ export class DomainEventEmitter await this.handleEvent(validatedEvent) - if (this._handlerSpy) { - this._handlerSpy.addProcessedMessage( - { - // @ts-ignore - message: validatedEvent, - processingResult: 'consumed', - }, - validatedEvent.id, - ) - } - // @ts-ignore return validatedEvent } @@ -158,9 +147,7 @@ export class DomainEventEmitter * Register handler for all events supported by the emitter */ public onAny(handler: AnyEventHandler, isBackgroundHandler = false) { - for (const supportedEvent of this.eventRegistry.supportedEvents) { - this.on(supportedEvent.consumerSchema.shape.type.value, handler, isBackgroundHandler) - } + this.onMany(Array.from(this.eventRegistry.supportedEventTypes), handler, isBackgroundHandler) } private async handleEvent( @@ -172,46 +159,56 @@ export class DomainEventEmitter } for (const handler of eventHandlers.foreground) { - const transactionId = randomUUID() - let isSuccessfull = false - try { - this.transactionObservabilityManager?.startWithGroup( - this.buildTransactionKey(event, handler, false), - transactionId, - event.type, - ) - await handler.handleEvent(event) - isSuccessfull = true - } finally { - this.transactionObservabilityManager?.stop(transactionId, isSuccessfull) - } + await this.executeEventHandler(event, handler, false) } - for (const handler of eventHandlers.background) { - const transactionId = randomUUID() + const bgPromises = eventHandlers.background.map((handler) => + this.executeEventHandler(event, handler, true), + ) + Promise.all(bgPromises).then(() => { + if (!this._handlerSpy) return + this._handlerSpy.addProcessedMessage( + { + // @ts-ignore + message: event, + processingResult: 'consumed', + }, + event.id, + ) + }) + } + + private async executeEventHandler( + event: CommonEventDefinitionPublisherSchemaType, + handler: EventHandler>, + isBackgroundHandler: boolean, + ) { + const transactionId = randomUUID() + let isSuccessful = false + try { this.transactionObservabilityManager?.startWithGroup( - this.buildTransactionKey(event, handler, true), + this.buildTransactionKey(event, handler, isBackgroundHandler), transactionId, event.type, ) - - Promise.resolve(handler.handleEvent(event)) - .then(() => { - this.transactionObservabilityManager?.stop(transactionId, true) - }) - .catch((error) => { - this.transactionObservabilityManager?.stop(transactionId, false) - const context = { - event: JSON.stringify(event), - eventHandlerId: handler.eventHandlerId, - 'x-request-id': event.metadata?.correlationId, - } - this.logger.error({ - ...resolveGlobalErrorLogObject(error), - ...context, - }) - this.errorReporter?.report({ error: error, context }) - }) + await handler.handleEvent(event) + isSuccessful = true + } catch (error) { + if (!isBackgroundHandler) throw error + + const context = { + event: JSON.stringify(event), + eventHandlerId: handler.eventHandlerId, + 'x-request-id': event.metadata?.correlationId, + } + this.logger.error({ + ...resolveGlobalErrorLogObject(error), + ...context, + }) + // biome-ignore lint/suspicious/noExplicitAny: TODO: improve error type + this.errorReporter?.report({ error: error as any, context }) + } finally { + this.transactionObservabilityManager?.stop(transactionId, isSuccessful) } } diff --git a/packages/core/lib/events/EventRegistry.ts b/packages/core/lib/events/EventRegistry.ts index f46f0572..a2601b38 100644 --- a/packages/core/lib/events/EventRegistry.ts +++ b/packages/core/lib/events/EventRegistry.ts @@ -2,16 +2,16 @@ import type { CommonEventDefinition, EventTypeNames } from './eventTypes' export class EventRegistry { public readonly supportedEvents: SupportedEvents - private readonly supportedEventsSet: Set + public readonly supportedEventTypes: Set private readonly supportedEventMap: Record = {} constructor(supportedEvents: SupportedEvents) { this.supportedEvents = supportedEvents - this.supportedEventsSet = new Set() + this.supportedEventTypes = new Set() for (const supportedEvent of supportedEvents) { this.supportedEventMap[supportedEvent.consumerSchema.shape.type.value] = supportedEvent - this.supportedEventsSet.add(supportedEvent.consumerSchema.shape.type.value) + this.supportedEventTypes.add(supportedEvent.consumerSchema.shape.type.value) } } @@ -24,6 +24,6 @@ export class EventRegistry { } public isSupportedEvent(eventTypeName: string) { - return this.supportedEventsSet.has(eventTypeName) + return this.supportedEventTypes.has(eventTypeName) } } diff --git a/packages/core/package.json b/packages/core/package.json index d07a6ae9..b0f8bf9b 100644 --- a/packages/core/package.json +++ b/packages/core/package.json @@ -1,6 +1,6 @@ { "name": "@message-queue-toolkit/core", - "version": "17.0.1", + "version": "17.0.2", "private": false, "license": "MIT", "description": "Useful utilities, interfaces and base classes for message queue handling. Supports AMQP and SQS with a common abstraction on top currently",