diff --git a/packages/amqp/package.json b/packages/amqp/package.json index 6ee6f870..746d67c4 100644 --- a/packages/amqp/package.json +++ b/packages/amqp/package.json @@ -25,7 +25,7 @@ "prepublishOnly": "npm run build:release" }, "dependencies": { - "@lokalise/node-core": "^12.0.0", + "@lokalise/node-core": "^12.1.0", "zod": "^3.23.8" }, "peerDependencies": { diff --git a/packages/core/lib/events/DomainEventEmitter.spec.ts b/packages/core/lib/events/DomainEventEmitter.spec.ts index 16334d59..d72d8167 100644 --- a/packages/core/lib/events/DomainEventEmitter.spec.ts +++ b/packages/core/lib/events/DomainEventEmitter.spec.ts @@ -1,16 +1,15 @@ import { randomUUID } from 'node:crypto' import { waitAndRetry } from '@lokalise/node-core' -import type { - CommonEventDefinitionPublisherSchemaType, - ConsumerMessageSchema, -} from '@message-queue-toolkit/schemas' +import type { CommonEventDefinitionPublisherSchemaType } from '@message-queue-toolkit/schemas' import type { AwilixContainer } from 'awilix' -import { afterEach, beforeEach, describe, expect, it } from 'vitest' +import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest' -import type { Dependencies } from '../../test/testContext' +import type { Dependencies, TestEventsType } from '../../test/testContext' import { TestEvents, registerDependencies } from '../../test/testContext' +import { ErroredFakeListener } from '../../test/fakes/ErroredFakeListener' +import type { DomainEventEmitter } from './DomainEventEmitter' import { FakeListener } from './fakes/FakeListener' const createdEventPayload: CommonEventDefinitionPublisherSchemaType = { @@ -53,38 +52,90 @@ const expectedUpdatedPayload = { describe('AutopilotEventEmitter', () => { let diContainer: AwilixContainer + let eventEmitter: DomainEventEmitter + beforeEach(async () => { diContainer = await registerDependencies() + eventEmitter = diContainer.cradle.eventEmitter }) afterEach(async () => { await diContainer.dispose() }) - it('emits event to anyListener', async () => { - const { eventEmitter } = diContainer.cradle - const fakeListener = new FakeListener(diContainer.cradle.eventRegistry.supportedEvents) + it('emits event to anyListener - foreground', async () => { + const fakeListener = new FakeListener() eventEmitter.onAny(fakeListener) + const transactionManagerStartSpy = vi.spyOn( + diContainer.cradle.transactionObservabilityManager, + 'startWithGroup', + ) + const transactionManagerStopSpy = vi.spyOn( + diContainer.cradle.transactionObservabilityManager, + 'stop', + ) const emittedEvent = await eventEmitter.emit(TestEvents.created, createdEventPayload) - const processedEvent = await eventEmitter.handlerSpy.waitForMessageWithId< - ConsumerMessageSchema - >(emittedEvent.id) + const processedEvent = await eventEmitter.handlerSpy.waitForMessageWithId(emittedEvent.id) expect(processedEvent.message.type).toBe(TestEvents.created.consumerSchema.shape.type.value) + expect(fakeListener.receivedEvents).toHaveLength(1) + expect(fakeListener.receivedEvents[0]).toMatchObject(expectedCreatedPayload) - await waitAndRetry(() => { - return fakeListener.receivedEvents.length > 0 - }) + expect(transactionManagerStartSpy).toHaveBeenCalledOnce() + expect(transactionManagerStartSpy).toHaveBeenCalledWith( + 'fg_event_listener:entity.created:FakeListener', + expect.any(String), + 'entity.created', + ) + + expect(transactionManagerStopSpy).toHaveBeenCalledOnce() + expect(transactionManagerStopSpy).toHaveBeenCalledWith( + transactionManagerStartSpy.mock.calls[0][1], + true, + ) + }) + + it('emits event to anyListener - background', async () => { + const fakeListener = new FakeListener(100) + eventEmitter.onAny(fakeListener, true) + const transactionManagerStartSpy = vi.spyOn( + diContainer.cradle.transactionObservabilityManager, + 'startWithGroup', + ) + const transactionManagerStopSpy = vi.spyOn( + diContainer.cradle.transactionObservabilityManager, + 'stop', + ) + + const emittedEvent = await eventEmitter.emit(TestEvents.created, createdEventPayload) + const processedEvent = await eventEmitter.handlerSpy.waitForMessageWithId(emittedEvent.id) + expect(processedEvent.message.type).toBe(TestEvents.created.consumerSchema.shape.type.value) + // even thought event is consumed, the listener is still processing + expect(fakeListener.receivedEvents).toHaveLength(0) + // Wait for the event to be processed + await waitAndRetry(() => fakeListener.receivedEvents.length > 0) expect(fakeListener.receivedEvents).toHaveLength(1) expect(fakeListener.receivedEvents[0]).toMatchObject(expectedCreatedPayload) + + expect(transactionManagerStartSpy).toHaveBeenCalledOnce() + expect(transactionManagerStartSpy).toHaveBeenCalledWith( + 'bg_event_listener:entity.created:FakeListener', + expect.any(String), + 'entity.created', + ) + + expect(transactionManagerStopSpy).toHaveBeenCalledOnce() + expect(transactionManagerStopSpy).toHaveBeenCalledWith( + transactionManagerStartSpy.mock.calls[0][1], + true, + ) }) it('emits event to anyListener and populates metadata', async () => { - const { eventEmitter } = diContainer.cradle - const fakeListener = new FakeListener(diContainer.cradle.eventRegistry.supportedEvents) + const fakeListener = new FakeListener() eventEmitter.onAny(fakeListener) const emittedEvent = await eventEmitter.emit(TestEvents.created, { @@ -93,16 +144,9 @@ describe('AutopilotEventEmitter', () => { }, }) - const processedEvent = await eventEmitter.handlerSpy.waitForMessageWithId< - ConsumerMessageSchema - >(emittedEvent.id) + const processedEvent = await eventEmitter.handlerSpy.waitForMessageWithId(emittedEvent.id) expect(processedEvent.message.type).toBe(TestEvents.created.consumerSchema.shape.type.value) - - await waitAndRetry(() => { - return fakeListener.receivedEvents.length > 0 - }) - expect(fakeListener.receivedEvents).toHaveLength(1) expect(fakeListener.receivedEvents[0]).toMatchObject({ id: expect.any(String), @@ -121,20 +165,15 @@ describe('AutopilotEventEmitter', () => { }) it('can check spy for messages not being sent', async () => { - const { eventEmitter } = diContainer.cradle - const fakeListener = new FakeListener(diContainer.cradle.eventRegistry.supportedEvents) + const fakeListener = new FakeListener() eventEmitter.onAny(fakeListener) await eventEmitter.emit(TestEvents.created, createdEventPayload) - const notEmittedEvent = eventEmitter.handlerSpy.checkForMessage< - ConsumerMessageSchema - >({ + const notEmittedEvent = eventEmitter.handlerSpy.checkForMessage({ type: 'entity.updated', }) - const emittedEvent = eventEmitter.handlerSpy.checkForMessage< - ConsumerMessageSchema - >({ + const emittedEvent = eventEmitter.handlerSpy.checkForMessage({ type: 'entity.created', }) @@ -143,8 +182,7 @@ describe('AutopilotEventEmitter', () => { }) it('emits event to anyListener with metadata', async () => { - const { eventEmitter } = diContainer.cradle - const fakeListener = new FakeListener(diContainer.cradle.eventRegistry.supportedEvents) + const fakeListener = new FakeListener() eventEmitter.onAny(fakeListener) const partialCreatedEventPayload = { @@ -193,35 +231,203 @@ describe('AutopilotEventEmitter', () => { }) }) - it('emits event to singleListener', async () => { - const { eventEmitter } = diContainer.cradle - const fakeListener = new FakeListener(diContainer.cradle.eventRegistry.supportedEvents) + it('emits event to singleListener - foreground', async () => { + const fakeListener = new FakeListener() eventEmitter.on('entity.created', fakeListener) + const transactionManagerStartSpy = vi.spyOn( + diContainer.cradle.transactionObservabilityManager, + 'startWithGroup', + ) + const transactionManagerStopSpy = vi.spyOn( + diContainer.cradle.transactionObservabilityManager, + 'stop', + ) await eventEmitter.emit(TestEvents.created, createdEventPayload) - await waitAndRetry(() => { - return fakeListener.receivedEvents.length > 0 - }) + expect(fakeListener.receivedEvents).toHaveLength(1) + expect(fakeListener.receivedEvents[0]).toMatchObject(expectedCreatedPayload) + + expect(transactionManagerStartSpy).toHaveBeenCalledOnce() + expect(transactionManagerStartSpy).toHaveBeenCalledWith( + 'fg_event_listener:entity.created:FakeListener', + expect.any(String), + 'entity.created', + ) + + expect(transactionManagerStopSpy).toHaveBeenCalledOnce() + expect(transactionManagerStopSpy).toHaveBeenCalledWith( + transactionManagerStartSpy.mock.calls[0][1], + true, + ) + }) + + it('emits event to singleListener - background', async () => { + const fakeListener = new FakeListener(100) + eventEmitter.on('entity.created', fakeListener, true) + + const transactionManagerStartSpy = vi.spyOn( + diContainer.cradle.transactionObservabilityManager, + 'startWithGroup', + ) + const transactionManagerStopSpy = vi.spyOn( + diContainer.cradle.transactionObservabilityManager, + 'stop', + ) + + await eventEmitter.emit(TestEvents.created, createdEventPayload) + // even thought event is consumed, the listener is still processing + expect(fakeListener.receivedEvents).toHaveLength(0) + // Wait for the event to be processed + await waitAndRetry(() => fakeListener.receivedEvents.length > 0) expect(fakeListener.receivedEvents).toHaveLength(1) expect(fakeListener.receivedEvents[0]).toMatchObject(expectedCreatedPayload) + + expect(transactionManagerStartSpy).toHaveBeenCalledOnce() + expect(transactionManagerStartSpy).toHaveBeenCalledWith( + 'bg_event_listener:entity.created:FakeListener', + expect.any(String), + 'entity.created', + ) + + expect(transactionManagerStopSpy).toHaveBeenCalledOnce() + expect(transactionManagerStopSpy).toHaveBeenCalledWith( + transactionManagerStartSpy.mock.calls[0][1], + true, + ) }) - it('emits event to manyListener', async () => { - const { eventEmitter } = diContainer.cradle - const fakeListener = new FakeListener(diContainer.cradle.eventRegistry.supportedEvents) + it('emits event to manyListener - foreground', async () => { + const fakeListener = new FakeListener() eventEmitter.onMany(['entity.created', 'entity.updated'], fakeListener) + const transactionManagerStartSpy = vi.spyOn( + diContainer.cradle.transactionObservabilityManager, + 'startWithGroup', + ) + const transactionManagerStopSpy = vi.spyOn( + diContainer.cradle.transactionObservabilityManager, + 'stop', + ) await eventEmitter.emit(TestEvents.created, createdEventPayload) await eventEmitter.emit(TestEvents.updated, updatedEventPayload) - await waitAndRetry(() => { - return fakeListener.receivedEvents.length === 2 - }) + expect(fakeListener.receivedEvents).toHaveLength(2) + expect(fakeListener.receivedEvents[0]).toMatchObject(expectedCreatedPayload) + expect(fakeListener.receivedEvents[1]).toMatchObject(expectedUpdatedPayload) + + expect(transactionManagerStartSpy).toHaveBeenCalledTimes(2) + expect(transactionManagerStopSpy).toHaveBeenCalledTimes(2) + }) + + it('emits event to manyListener - background', async () => { + const fakeListener = new FakeListener(100) + eventEmitter.onMany(['entity.created', 'entity.updated'], fakeListener, true) + const transactionManagerStartSpy = vi.spyOn( + diContainer.cradle.transactionObservabilityManager, + 'startWithGroup', + ) + const transactionManagerStopSpy = vi.spyOn( + diContainer.cradle.transactionObservabilityManager, + 'stop', + ) + + await eventEmitter.emit(TestEvents.created, createdEventPayload) + await eventEmitter.emit(TestEvents.updated, updatedEventPayload) + + expect(fakeListener.receivedEvents).toHaveLength(0) + await waitAndRetry(() => fakeListener.receivedEvents.length === 2) expect(fakeListener.receivedEvents).toHaveLength(2) expect(fakeListener.receivedEvents[0]).toMatchObject(expectedCreatedPayload) expect(fakeListener.receivedEvents[1]).toMatchObject(expectedUpdatedPayload) + + expect(transactionManagerStartSpy).toHaveBeenCalledTimes(2) + expect(transactionManagerStopSpy).toHaveBeenCalledTimes(2) + }) + + it('foreground listener error handling', async () => { + const fakeListener = new ErroredFakeListener() + eventEmitter.onAny(fakeListener) + const transactionManagerStartSpy = vi.spyOn( + diContainer.cradle.transactionObservabilityManager, + 'startWithGroup', + ) + const transactionManagerStopSpy = vi.spyOn( + diContainer.cradle.transactionObservabilityManager, + 'stop', + ) + + await expect(eventEmitter.emit(TestEvents.created, createdEventPayload)).rejects.toThrow( + 'ErroredFakeListener error', + ) + + expect(fakeListener.receivedEvents).toHaveLength(1) + expect(fakeListener.receivedEvents[0]).toMatchObject(expectedCreatedPayload) + + expect(transactionManagerStartSpy).toHaveBeenCalledOnce() + expect(transactionManagerStartSpy).toHaveBeenCalledWith( + 'fg_event_listener:entity.created:ErroredFakeListener', + expect.any(String), + 'entity.created', + ) + + expect(transactionManagerStopSpy).toHaveBeenCalledOnce() + expect(transactionManagerStopSpy).toHaveBeenCalledWith( + transactionManagerStartSpy.mock.calls[0][1], + false, + ) + }) + + it('background listener error handling', async () => { + const fakeListener = new ErroredFakeListener(100) + eventEmitter.onAny(fakeListener, true) + const reporterSpy = vi.spyOn(diContainer.cradle.errorReporter, 'report') + const logSpy = vi.spyOn(diContainer.cradle.logger, 'error') + const transactionManagerStartSpy = vi.spyOn( + diContainer.cradle.transactionObservabilityManager, + 'startWithGroup', + ) + const transactionManagerStopSpy = vi.spyOn( + diContainer.cradle.transactionObservabilityManager, + 'stop', + ) + + const emittedEvent = await eventEmitter.emit(TestEvents.created, createdEventPayload) + + expect(fakeListener.receivedEvents).toHaveLength(0) + await waitAndRetry(() => fakeListener.receivedEvents.length === 1) + + expect(fakeListener.receivedEvents).toHaveLength(1) + expect(fakeListener.receivedEvents[0]).toMatchObject(expectedCreatedPayload) + + const expectedContext = { + event: JSON.stringify(emittedEvent), + eventHandlerId: 'ErroredFakeListener', + 'x-request-id': emittedEvent.metadata?.correlationId, + } + expect(reporterSpy).toHaveBeenCalledWith({ + error: expect.any(Error), + context: expectedContext, + }) + expect(logSpy).toHaveBeenCalledWith({ + error: expect.anything(), + message: 'ErroredFakeListener error', + ...expectedContext, + }) + + expect(transactionManagerStartSpy).toHaveBeenCalledOnce() + expect(transactionManagerStartSpy).toHaveBeenCalledWith( + 'bg_event_listener:entity.created:ErroredFakeListener', + expect.any(String), + 'entity.created', + ) + + expect(transactionManagerStopSpy).toHaveBeenCalledOnce() + expect(transactionManagerStopSpy).toHaveBeenCalledWith( + transactionManagerStartSpy.mock.calls[0][1], + false, + ) }) }) diff --git a/packages/core/lib/events/DomainEventEmitter.ts b/packages/core/lib/events/DomainEventEmitter.ts index 37ebd36d..a367c5fc 100644 --- a/packages/core/lib/events/DomainEventEmitter.ts +++ b/packages/core/lib/events/DomainEventEmitter.ts @@ -1,10 +1,17 @@ -import { InternalError } from '@lokalise/node-core' +import { + type ErrorReporter, + InternalError, + type TransactionObservabilityManager, + resolveGlobalErrorLogObject, +} from '@lokalise/node-core' import type { MetadataFiller } from '../messages/MetadataFiller' import type { HandlerSpy, HandlerSpyParams, PublicHandlerSpy } from '../queues/HandlerSpy' import { resolveHandlerSpy } from '../queues/HandlerSpy' +import { randomUUID } from 'node:crypto' import type { ConsumerMessageMetadataType } from '@message-queue-toolkit/schemas' +import type { Logger } from '../types/MessageQueueTypes' import type { EventRegistry } from './EventRegistry' import type { AnyEventHandler, @@ -16,36 +23,49 @@ import type { SingleEventHandler, } from './eventTypes' +export type DomainEventEmitterDependencies = { + eventRegistry: EventRegistry + metadataFiller: MetadataFiller + logger: Logger + errorReporter?: ErrorReporter + transactionObservabilityManager?: TransactionObservabilityManager +} + +type Handlers = { + background: T[] + foreground: T[] +} + export class DomainEventEmitter { private readonly eventRegistry: EventRegistry + private readonly metadataFiller: MetadataFiller + private readonly logger: Logger + private readonly errorReporter?: ErrorReporter + private readonly transactionObservabilityManager?: TransactionObservabilityManager + private readonly _handlerSpy?: HandlerSpy< + CommonEventDefinitionConsumerSchemaType + > private readonly eventHandlerMap: Record< string, - EventHandler>[] - > = {} - private readonly anyHandlers: AnyEventHandler[] = [] - private readonly metadataFiller: MetadataFiller - private _handlerSpy: - | HandlerSpy> - | undefined - + Handlers>> + > constructor( - { - eventRegistry, - metadataFiller, - }: { - eventRegistry: EventRegistry - metadataFiller: MetadataFiller - }, + deps: DomainEventEmitterDependencies, options: { handlerSpy?: HandlerSpy | HandlerSpyParams | boolean } = {}, ) { - this.eventRegistry = eventRegistry - this.metadataFiller = metadataFiller + this.eventRegistry = deps.eventRegistry + this.metadataFiller = deps.metadataFiller + this.logger = deps.logger + this.errorReporter = deps.errorReporter + this.transactionObservabilityManager = deps.transactionObservabilityManager this._handlerSpy = resolveHandlerSpy>(options) + + this.eventHandlerMap = {} } get handlerSpy(): PublicHandlerSpy< @@ -64,13 +84,16 @@ export class DomainEventEmitter data: Omit, 'type'>, precedingMessageMetadata?: Partial, ): Promise, 'type'>> { - if (!data.timestamp) { - data.timestamp = this.metadataFiller.produceTimestamp() - } - if (!data.id) { - data.id = this.metadataFiller.produceId() + const eventTypeName = supportedEvent.publisherSchema.shape.type.value + if (!this.eventRegistry.isSupportedEvent(eventTypeName)) { + throw new InternalError({ + errorCode: 'UNKNOWN_EVENT', + message: `Unknown event ${eventTypeName}`, + }) } + if (!data.timestamp) data.timestamp = this.metadataFiller.produceTimestamp() + if (!data.id) data.id = this.metadataFiller.produceId() if (!data.metadata) { data.metadata = this.metadataFiller.produceMetadata( // @ts-ignore @@ -79,52 +102,19 @@ export class DomainEventEmitter precedingMessageMetadata ?? {}, ) } - - if (!data.metadata.correlationId) { - data.metadata.correlationId = this.metadataFiller.produceId() - } - - const eventTypeName = supportedEvent.publisherSchema.shape.type.value - - if (!this.eventRegistry.isSupportedEvent(eventTypeName)) { - throw new InternalError({ - errorCode: 'UNKNOWN_EVENT', - message: `Unknown event ${eventTypeName}`, - }) - } - - const eventHandlers = this.eventHandlerMap[eventTypeName] - - // No relevant handlers are registered, we can stop processing - if (!eventHandlers && this.anyHandlers.length === 0) { - // @ts-ignore - return data - } + if (!data.metadata.correlationId) data.metadata.correlationId = this.metadataFiller.produceId() const validatedEvent = this.eventRegistry .getEventDefinitionByTypeName(eventTypeName) - .publisherSchema.parse({ - type: eventTypeName, - ...data, - }) + .publisherSchema.parse({ type: eventTypeName, ...data }) - if (eventHandlers) { - for (const handler of eventHandlers) { - await handler.handleEvent(validatedEvent) - } - } - - for (const handler of this.anyHandlers) { - await handler.handleEvent(validatedEvent) - } + await this.handleEvent(validatedEvent) if (this._handlerSpy) { this._handlerSpy.addProcessedMessage( { // @ts-ignore - message: { - ...validatedEvent, - }, + message: validatedEvent, processingResult: 'consumed', }, validatedEvent.id, @@ -141,8 +131,14 @@ export class DomainEventEmitter public on>( eventTypeName: EventTypeName, handler: SingleEventHandler, + isBackgroundHandler = false, ) { - this.addOnHandler(eventTypeName, handler) + if (!this.eventHandlerMap[eventTypeName]) { + this.eventHandlerMap[eventTypeName] = { foreground: [], background: [] } + } + + if (isBackgroundHandler) this.eventHandlerMap[eventTypeName].background.push(handler) + else this.eventHandlerMap[eventTypeName].foreground.push(handler) } /** @@ -151,27 +147,79 @@ export class DomainEventEmitter public onMany>( eventTypeNames: EventTypeName[], handler: SingleEventHandler, + isBackgroundHandler = false, ) { for (const eventTypeName of eventTypeNames) { - this.on(eventTypeName, handler) + this.on(eventTypeName, handler, isBackgroundHandler) } } /** * Register handler for all events supported by the emitter */ - public onAny(handler: AnyEventHandler) { - this.anyHandlers.push(handler) + public onAny(handler: AnyEventHandler, isBackgroundHandler = false) { + for (const supportedEvent of this.eventRegistry.supportedEvents) { + this.on(supportedEvent.consumerSchema.shape.type.value, handler, isBackgroundHandler) + } } - private addOnHandler( - eventTypeName: EventTypeNames, - handler: EventHandler, - ) { - if (!this.eventHandlerMap[eventTypeName]) { - this.eventHandlerMap[eventTypeName] = [] + private async handleEvent( + event: CommonEventDefinitionPublisherSchemaType, + ): Promise { + const eventHandlers = this.eventHandlerMap[event.type] ?? { + foreground: [], + background: [], + } + + for (const handler of eventHandlers.foreground) { + const transactionId = randomUUID() + let isSuccessfull = false + try { + this.transactionObservabilityManager?.startWithGroup( + this.buildTransactionKey(event, handler, false), + transactionId, + event.type, + ) + await handler.handleEvent(event) + isSuccessfull = true + } finally { + this.transactionObservabilityManager?.stop(transactionId, isSuccessfull) + } + } + + for (const handler of eventHandlers.background) { + const transactionId = randomUUID() + this.transactionObservabilityManager?.startWithGroup( + this.buildTransactionKey(event, handler, true), + transactionId, + event.type, + ) + + Promise.resolve(handler.handleEvent(event)) + .then(() => { + this.transactionObservabilityManager?.stop(transactionId, true) + }) + .catch((error) => { + this.transactionObservabilityManager?.stop(transactionId, false) + const context = { + event: JSON.stringify(event), + eventHandlerId: handler.eventHandlerId, + 'x-request-id': event.metadata?.correlationId, + } + this.logger.error({ + ...resolveGlobalErrorLogObject(error), + ...context, + }) + this.errorReporter?.report({ error: error, context }) + }) } + } - this.eventHandlerMap[eventTypeName].push(handler) + private buildTransactionKey( + event: CommonEventDefinitionPublisherSchemaType, + handler: EventHandler>, + isBackgroundHandler: boolean, + ): string { + return `${isBackgroundHandler ? 'bg' : 'fg'}_event_listener:${event.type}:${handler.eventHandlerId}` } } diff --git a/packages/core/lib/events/fakes/FakeListener.ts b/packages/core/lib/events/fakes/FakeListener.ts index 46fac1f1..31e4803a 100644 --- a/packages/core/lib/events/fakes/FakeListener.ts +++ b/packages/core/lib/events/fakes/FakeListener.ts @@ -1,15 +1,21 @@ +import { setTimeout } from 'node:timers/promises' import type { AnyEventHandler, CommonEventDefinition } from '../eventTypes' export class FakeListener implements AnyEventHandler { + readonly eventHandlerId = this.constructor.name + public receivedEvents: SupportedEvents[number]['publisherSchema']['_output'][] = [] + private readonly delay: number - constructor(_supportedEvents: SupportedEvents) { + constructor(delayMs?: number) { this.receivedEvents = [] + this.delay = delayMs ?? 0 } - handleEvent(event: SupportedEvents[number]['publisherSchema']['_output']): void | Promise { + async handleEvent(event: SupportedEvents[number]['publisherSchema']['_output']): Promise { + if (this.delay > 0) await setTimeout(this.delay) this.receivedEvents.push(event) } } diff --git a/packages/core/package.json b/packages/core/package.json index f18c43ab..964746de 100644 --- a/packages/core/package.json +++ b/packages/core/package.json @@ -1,6 +1,6 @@ { "name": "@message-queue-toolkit/core", - "version": "16.0.1", + "version": "17.0.0", "private": false, "license": "MIT", "description": "Useful utilities, interfaces and base classes for message queue handling. Supports AMQP and SQS with a common abstraction on top currently", @@ -25,8 +25,8 @@ "prepublishOnly": "npm run build:release" }, "dependencies": { - "@lokalise/node-core": "^12.0.0", - "@message-queue-toolkit/schemas": "^3.0.0", + "@lokalise/node-core": "^12.1.0", + "@message-queue-toolkit/schemas": "^4.0.0", "fast-equals": "^5.0.1", "json-stream-stringify": "^3.1.4", "tmp": "^0.2.3", diff --git a/packages/core/test/fakes/ErroredFakeListener.ts b/packages/core/test/fakes/ErroredFakeListener.ts new file mode 100644 index 00000000..92d1df5b --- /dev/null +++ b/packages/core/test/fakes/ErroredFakeListener.ts @@ -0,0 +1,11 @@ +import type { CommonEventDefinition } from '@message-queue-toolkit/schemas' +import { FakeListener } from '../../lib/events/fakes/FakeListener' + +export class ErroredFakeListener< + SupportedEvents extends CommonEventDefinition[], +> extends FakeListener { + async handleEvent(event: SupportedEvents[number]['publisherSchema']['_output']): Promise { + await super.handleEvent(event) + throw new Error(`${this.constructor.name} error`) + } +} diff --git a/packages/core/test/fakes/FakeTransactionObservabilityManager.ts b/packages/core/test/fakes/FakeTransactionObservabilityManager.ts new file mode 100644 index 00000000..74d1ea03 --- /dev/null +++ b/packages/core/test/fakes/FakeTransactionObservabilityManager.ts @@ -0,0 +1,9 @@ +import type { TransactionObservabilityManager } from '@lokalise/node-core' + +export class FakeTransactionObservabilityManager implements TransactionObservabilityManager { + start() {} + + startWithGroup() {} + + stop() {} +} diff --git a/packages/core/test/testContext.ts b/packages/core/test/testContext.ts index 5d02d3ac..7f37ee62 100644 --- a/packages/core/test/testContext.ts +++ b/packages/core/test/testContext.ts @@ -1,18 +1,19 @@ import type { ErrorReporter } from '@lokalise/node-core' -import type { Resolver } from 'awilix' +import { type Resolver, asClass } from 'awilix' import { Lifetime, asFunction, createContainer } from 'awilix' import { AwilixManager } from 'awilix-manager' import type { Logger } from 'pino' import pino from 'pino' import { z } from 'zod' +import type { TransactionObservabilityManager } from '@lokalise/node-core' +import { enrichMessageSchemaWithBase } from '@message-queue-toolkit/schemas' import { DomainEventEmitter } from '../lib/events/DomainEventEmitter' import { EventRegistry } from '../lib/events/EventRegistry' import type { CommonEventDefinition } from '../lib/events/eventTypes' import type { MetadataFiller } from '../lib/messages/MetadataFiller' import { CommonMetadataFiller } from '../lib/messages/MetadataFiller' -import { enrichMessageSchemaWithBase } from '../lib/messages/baseMessageSchemas' -import type { TransactionObservabilityManager } from '../lib/types/MessageQueueTypes' +import { FakeTransactionObservabilityManager } from './fakes/FakeTransactionObservabilityManager' export const SINGLETON_CONFIG = { lifetime: Lifetime.SINGLETON } @@ -65,16 +66,10 @@ export async function registerDependencies(dependencyOverrides: DependencyOverri return new EventRegistry(Object.values(TestEvents)) }, SINGLETON_CONFIG), - eventEmitter: asFunction((dependencies: Dependencies) => { - return new DomainEventEmitter( - { - metadataFiller: dependencies.metadataFiller, - eventRegistry: dependencies.eventRegistry, - }, - { - handlerSpy: true, - }, - ) + eventEmitter: asFunction((deps: Dependencies) => { + return new DomainEventEmitter(deps, { + handlerSpy: true, + }) }, SINGLETON_CONFIG), metadataFiller: asFunction(() => { return new CommonMetadataFiller({ @@ -83,14 +78,12 @@ export async function registerDependencies(dependencyOverrides: DependencyOverri }, SINGLETON_CONFIG), // vendor-specific dependencies - newRelicBackgroundTransactionManager: asFunction(() => { - return undefined - }, SINGLETON_CONFIG), + transactionObservabilityManager: asClass(FakeTransactionObservabilityManager, SINGLETON_CONFIG), errorReporter: asFunction(() => { return { report: () => {}, } satisfies ErrorReporter - }), + }, SINGLETON_CONFIG), } diContainer.register(diConfig) @@ -111,7 +104,7 @@ export interface Dependencies { awilixManager: AwilixManager // vendor-specific dependencies - newRelicBackgroundTransactionManager: TransactionObservabilityManager + transactionObservabilityManager: TransactionObservabilityManager errorReporter: ErrorReporter eventRegistry: EventRegistry diff --git a/packages/schemas/lib/events/eventTypes.ts b/packages/schemas/lib/events/eventTypes.ts index 43f21b6a..92630b38 100644 --- a/packages/schemas/lib/events/eventTypes.ts +++ b/packages/schemas/lib/events/eventTypes.ts @@ -12,10 +12,14 @@ export function isCommonEventDefinition(entity: unknown): entity is CommonEventD export type CommonEventDefinition = { consumerSchema: ZodObject< - Omit<(typeof CONSUMER_BASE_EVENT_SCHEMA)['shape'], 'payload'> & { payload: ZodTypeAny } + Omit<(typeof CONSUMER_BASE_EVENT_SCHEMA)['shape'], 'payload'> & { + payload: ZodTypeAny + } > publisherSchema: ZodObject< - Omit<(typeof PUBLISHER_BASE_EVENT_SCHEMA)['shape'], 'payload'> & { payload: ZodTypeAny } + Omit<(typeof PUBLISHER_BASE_EVENT_SCHEMA)['shape'], 'payload'> & { + payload: ZodTypeAny + } > schemaVersion?: string @@ -39,6 +43,7 @@ export type EventHandler< EventDefinitionSchema extends CommonEventDefinitionPublisherSchemaType = CommonEventDefinitionPublisherSchemaType, > = { + readonly eventHandlerId: string handleEvent(event: EventDefinitionSchema): void | Promise } diff --git a/packages/schemas/package.json b/packages/schemas/package.json index 9e2a6c1f..9b5b54b7 100644 --- a/packages/schemas/package.json +++ b/packages/schemas/package.json @@ -1,6 +1,6 @@ { "name": "@message-queue-toolkit/schemas", - "version": "3.0.0", + "version": "4.0.0", "private": false, "license": "MIT", "description": "Useful utilities, interfaces and base classes for message queue handling. Supports AMQP and SQS with a common abstraction on top currently", @@ -30,8 +30,10 @@ "@biomejs/biome": "1.8.3", "@kibertoad/biome-config": "^1.2.1", "@types/node": "^22.0.0", + "@vitest/coverage-v8": "^2.0.4", "del-cli": "^5.1.0", - "typescript": "^5.5.3" + "typescript": "^5.5.3", + "vitest": "^2.0.4" }, "homepage": "https://github.com/kibertoad/message-queue-toolkit", "repository": { diff --git a/packages/sns/package.json b/packages/sns/package.json index 38ed386a..46d9d98a 100644 --- a/packages/sns/package.json +++ b/packages/sns/package.json @@ -25,7 +25,7 @@ "prepublishOnly": "npm run build:release" }, "dependencies": { - "@lokalise/node-core": "^12.0.0", + "@lokalise/node-core": "^12.1.0", "sqs-consumer": "^11.0.1", "zod": "^3.23.8" }, diff --git a/packages/sqs/package.json b/packages/sqs/package.json index 8fb3b80c..fa9b2b5b 100644 --- a/packages/sqs/package.json +++ b/packages/sqs/package.json @@ -25,7 +25,7 @@ "prepublishOnly": "npm run build:release" }, "dependencies": { - "@lokalise/node-core": "^12.0.0", + "@lokalise/node-core": "^12.1.0", "sqs-consumer": "^11.0.1", "zod": "^3.23.8" },