From 00b8a817cd53758418755da4eea560f5fd6d333d Mon Sep 17 00:00:00 2001 From: CarlosGamero <101278162+CarlosGamero@users.noreply.github.com> Date: Tue, 1 Oct 2024 11:22:00 +0200 Subject: [PATCH] Feat/bg handler gracefully shutdown (#212) * Adding dispose method and awaiting pending promises * Cleaning spy on dispose * Adding test * Promise all on dispose * Cleaning handlers on dispose * Release prepare --- .../lib/events/DomainEventEmitter.spec.ts | 718 +++++++++--------- .../core/lib/events/DomainEventEmitter.ts | 44 +- packages/core/package.json | 2 +- 3 files changed, 406 insertions(+), 358 deletions(-) diff --git a/packages/core/lib/events/DomainEventEmitter.spec.ts b/packages/core/lib/events/DomainEventEmitter.spec.ts index 7ef2e763..9cfb7c0c 100644 --- a/packages/core/lib/events/DomainEventEmitter.spec.ts +++ b/packages/core/lib/events/DomainEventEmitter.spec.ts @@ -59,374 +59,412 @@ describe('AutopilotEventEmitter', () => { }) afterEach(async () => { + await eventEmitter.dispose() await diContainer.dispose() }) - it('emits event to anyListener - foreground', async () => { - const fakeListener = new FakeListener() - eventEmitter.onAny(fakeListener) - const transactionManagerStartSpy = vi.spyOn( - diContainer.cradle.transactionObservabilityManager, - 'startWithGroup', - ) - const transactionManagerStopSpy = vi.spyOn( - diContainer.cradle.transactionObservabilityManager, - 'stop', - ) - - const emittedEvent = await eventEmitter.emit(TestEvents.created, createdEventPayload) - - expect(fakeListener.receivedEvents).toHaveLength(1) // is processed synchronously so no need to wait - expect(fakeListener.receivedEvents[0]).toMatchObject(expectedCreatedPayload) - - const processedEvent = await eventEmitter.handlerSpy.waitForMessageWithId( - emittedEvent.id, - 'consumed', - ) - expect(processedEvent.message.type).toBe(TestEvents.created.consumerSchema.shape.type.value) - - expect(transactionManagerStartSpy).toHaveBeenCalledOnce() - expect(transactionManagerStartSpy).toHaveBeenCalledWith( - 'fg_event_listener:entity.created:FakeListener', - expect.any(String), - 'entity.created', - ) - - expect(transactionManagerStopSpy).toHaveBeenCalledOnce() - expect(transactionManagerStopSpy).toHaveBeenCalledWith( - transactionManagerStartSpy.mock.calls[0][1], - true, - ) - }) - - it('emits event to anyListener - background', async () => { - const fakeListener = new FakeListener(100) - eventEmitter.onAny(fakeListener, true) - const transactionManagerStartSpy = vi.spyOn( - diContainer.cradle.transactionObservabilityManager, - 'startWithGroup', - ) - const transactionManagerStopSpy = vi.spyOn( - diContainer.cradle.transactionObservabilityManager, - 'stop', - ) - - const emittedEvent = await eventEmitter.emit(TestEvents.created, createdEventPayload) - expect(fakeListener.receivedEvents).toHaveLength(0) - - const processedEvent = await eventEmitter.handlerSpy.waitForMessageWithId(emittedEvent.id) - expect(processedEvent.message.type).toBe(TestEvents.created.consumerSchema.shape.type.value) - // even thought event is consumed, the listener is still processing - // Wait for the event to be processed - expect(fakeListener.receivedEvents).toHaveLength(1) - expect(fakeListener.receivedEvents[0]).toMatchObject(expectedCreatedPayload) - - expect(transactionManagerStartSpy).toHaveBeenCalledOnce() - expect(transactionManagerStartSpy).toHaveBeenCalledWith( - 'bg_event_listener:entity.created:FakeListener', - expect.any(String), - 'entity.created', - ) - - expect(transactionManagerStopSpy).toHaveBeenCalledOnce() - expect(transactionManagerStopSpy).toHaveBeenCalledWith( - transactionManagerStartSpy.mock.calls[0][1], - true, - ) - }) - - it('emits event to anyListener and populates metadata', async () => { - const fakeListener = new FakeListener() - eventEmitter.onAny(fakeListener) - - const emittedEvent = await eventEmitter.emit(TestEvents.created, { - payload: { - message: 'msg', - }, + describe('emit', () => { + it('emits event to anyListener - foreground', async () => { + const fakeListener = new FakeListener() + eventEmitter.onAny(fakeListener) + const transactionManagerStartSpy = vi.spyOn( + diContainer.cradle.transactionObservabilityManager, + 'startWithGroup', + ) + const transactionManagerStopSpy = vi.spyOn( + diContainer.cradle.transactionObservabilityManager, + 'stop', + ) + + const emittedEvent = await eventEmitter.emit(TestEvents.created, createdEventPayload) + + expect(fakeListener.receivedEvents).toHaveLength(1) // is processed synchronously so no need to wait + expect(fakeListener.receivedEvents[0]).toMatchObject(expectedCreatedPayload) + + const processedEvent = await eventEmitter.handlerSpy.waitForMessageWithId( + emittedEvent.id, + 'consumed', + ) + expect(processedEvent.message.type).toBe(TestEvents.created.consumerSchema.shape.type.value) + + expect(transactionManagerStartSpy).toHaveBeenCalledOnce() + expect(transactionManagerStartSpy).toHaveBeenCalledWith( + 'fg_event_listener:entity.created:FakeListener', + expect.any(String), + 'entity.created', + ) + + expect(transactionManagerStopSpy).toHaveBeenCalledOnce() + expect(transactionManagerStopSpy).toHaveBeenCalledWith( + transactionManagerStartSpy.mock.calls[0][1], + true, + ) }) - const processedEvent = await eventEmitter.handlerSpy.waitForMessageWithId(emittedEvent.id) - - expect(processedEvent.message.type).toBe(TestEvents.created.consumerSchema.shape.type.value) - expect(fakeListener.receivedEvents).toHaveLength(1) - expect(fakeListener.receivedEvents[0]).toMatchObject({ - id: expect.any(String), - payload: { - message: 'msg', - }, - timestamp: expect.any(String), - type: 'entity.created', + it('emits event to anyListener - background', async () => { + const fakeListener = new FakeListener(100) + eventEmitter.onAny(fakeListener, true) + const transactionManagerStartSpy = vi.spyOn( + diContainer.cradle.transactionObservabilityManager, + 'startWithGroup', + ) + const transactionManagerStopSpy = vi.spyOn( + diContainer.cradle.transactionObservabilityManager, + 'stop', + ) + + const emittedEvent = await eventEmitter.emit(TestEvents.created, createdEventPayload) + expect(fakeListener.receivedEvents).toHaveLength(0) + + const processedEvent = await eventEmitter.handlerSpy.waitForMessageWithId(emittedEvent.id) + expect(processedEvent.message.type).toBe(TestEvents.created.consumerSchema.shape.type.value) + // even thought event is consumed, the listener is still processing + // Wait for the event to be processed + expect(fakeListener.receivedEvents).toHaveLength(1) + expect(fakeListener.receivedEvents[0]).toMatchObject(expectedCreatedPayload) + + expect(transactionManagerStartSpy).toHaveBeenCalledOnce() + expect(transactionManagerStartSpy).toHaveBeenCalledWith( + 'bg_event_listener:entity.created:FakeListener', + expect.any(String), + 'entity.created', + ) + + expect(transactionManagerStopSpy).toHaveBeenCalledOnce() + expect(transactionManagerStopSpy).toHaveBeenCalledWith( + transactionManagerStartSpy.mock.calls[0][1], + true, + ) }) - expect(fakeListener.receivedEvents[0].metadata).toMatchObject({ - correlationId: expect.any(String), - schemaVersion: '1.0.0', - producedBy: 'test', - originatedFrom: 'test', + + it('emits event to anyListener and populates metadata', async () => { + const fakeListener = new FakeListener() + eventEmitter.onAny(fakeListener) + + const emittedEvent = await eventEmitter.emit(TestEvents.created, { + payload: { + message: 'msg', + }, + }) + + const processedEvent = await eventEmitter.handlerSpy.waitForMessageWithId(emittedEvent.id) + + expect(processedEvent.message.type).toBe(TestEvents.created.consumerSchema.shape.type.value) + expect(fakeListener.receivedEvents).toHaveLength(1) + expect(fakeListener.receivedEvents[0]).toMatchObject({ + id: expect.any(String), + payload: { + message: 'msg', + }, + timestamp: expect.any(String), + type: 'entity.created', + }) + expect(fakeListener.receivedEvents[0].metadata).toMatchObject({ + correlationId: expect.any(String), + schemaVersion: '1.0.0', + producedBy: 'test', + originatedFrom: 'test', + }) }) - }) - it('can check spy for messages not being sent', async () => { - const fakeListener = new FakeListener() - eventEmitter.onAny(fakeListener) + it('can check spy for messages not being sent', async () => { + const fakeListener = new FakeListener() + eventEmitter.onAny(fakeListener) + + await eventEmitter.emit(TestEvents.created, createdEventPayload) - await eventEmitter.emit(TestEvents.created, createdEventPayload) + const notEmittedEvent = eventEmitter.handlerSpy.checkForMessage({ + type: 'entity.updated', + }) + const emittedEvent = eventEmitter.handlerSpy.checkForMessage({ + type: 'entity.created', + }) - const notEmittedEvent = eventEmitter.handlerSpy.checkForMessage({ - type: 'entity.updated', + expect(notEmittedEvent).toBeUndefined() + expect(emittedEvent).toBeDefined() }) - const emittedEvent = eventEmitter.handlerSpy.checkForMessage({ - type: 'entity.created', + + it('emits event to anyListener with metadata', async () => { + const fakeListener = new FakeListener() + eventEmitter.onAny(fakeListener) + + const partialCreatedEventPayload = { + ...createdEventPayload, + metadata: { + ...createdEventPayload.metadata, + producedBy: undefined, + }, + } + await eventEmitter.emit(TestEvents.created, partialCreatedEventPayload, { + correlationId: 'dummy', + }) + + const emitResult = await eventEmitter.handlerSpy.waitForMessage({ + type: 'entity.created', + }) + + expect(emitResult.message).toEqual({ + id: expect.any(String), + metadata: { + correlationId: createdEventPayload.metadata!.correlationId!, + originatedFrom: 'service', + producedBy: undefined, + schemaVersion: '1', + }, + payload: { + message: 'msg', + }, + timestamp: expect.any(String), + type: 'entity.created', + }) + expect(fakeListener.receivedEvents).toHaveLength(1) + expect(fakeListener.receivedEvents[0]).toMatchObject({ + id: expect.any(String), + metadata: { + correlationId: expect.any(String), + originatedFrom: 'service', + producedBy: undefined, + schemaVersion: '1', + }, + payload: { + message: 'msg', + }, + timestamp: expect.any(String), + type: 'entity.created', + }) }) - expect(notEmittedEvent).toBeUndefined() - expect(emittedEvent).toBeDefined() - }) + it('emits event to singleListener - foreground', async () => { + const fakeListener = new FakeListener() + eventEmitter.on('entity.created', fakeListener) + const transactionManagerStartSpy = vi.spyOn( + diContainer.cradle.transactionObservabilityManager, + 'startWithGroup', + ) + const transactionManagerStopSpy = vi.spyOn( + diContainer.cradle.transactionObservabilityManager, + 'stop', + ) + + await eventEmitter.emit(TestEvents.created, createdEventPayload) + + expect(fakeListener.receivedEvents).toHaveLength(1) + expect(fakeListener.receivedEvents[0]).toMatchObject(expectedCreatedPayload) + + expect(transactionManagerStartSpy).toHaveBeenCalledOnce() + expect(transactionManagerStartSpy).toHaveBeenCalledWith( + 'fg_event_listener:entity.created:FakeListener', + expect.any(String), + 'entity.created', + ) + + expect(transactionManagerStopSpy).toHaveBeenCalledOnce() + expect(transactionManagerStopSpy).toHaveBeenCalledWith( + transactionManagerStartSpy.mock.calls[0][1], + true, + ) + }) - it('emits event to anyListener with metadata', async () => { - const fakeListener = new FakeListener() - eventEmitter.onAny(fakeListener) - - const partialCreatedEventPayload = { - ...createdEventPayload, - metadata: { - ...createdEventPayload.metadata, - producedBy: undefined, - }, - } - await eventEmitter.emit(TestEvents.created, partialCreatedEventPayload, { - correlationId: 'dummy', + it('emits event to singleListener - background', async () => { + const fakeListener = new FakeListener(100) + eventEmitter.on('entity.created', fakeListener, true) + + const transactionManagerStartSpy = vi.spyOn( + diContainer.cradle.transactionObservabilityManager, + 'startWithGroup', + ) + const transactionManagerStopSpy = vi.spyOn( + diContainer.cradle.transactionObservabilityManager, + 'stop', + ) + + const emittedEvent = await eventEmitter.emit(TestEvents.created, createdEventPayload) + expect(fakeListener.receivedEvents).toHaveLength(0) + + await eventEmitter.handlerSpy.waitForMessageWithId(emittedEvent.id, 'consumed') + expect(fakeListener.receivedEvents).toHaveLength(1) + expect(fakeListener.receivedEvents[0]).toMatchObject(expectedCreatedPayload) + + expect(transactionManagerStartSpy).toHaveBeenCalledOnce() + expect(transactionManagerStartSpy).toHaveBeenCalledWith( + 'bg_event_listener:entity.created:FakeListener', + expect.any(String), + 'entity.created', + ) + + expect(transactionManagerStopSpy).toHaveBeenCalledOnce() + expect(transactionManagerStopSpy).toHaveBeenCalledWith( + transactionManagerStartSpy.mock.calls[0][1], + true, + ) }) - const emitResult = await eventEmitter.handlerSpy.waitForMessage({ - type: 'entity.created', + it('emits event to manyListener - foreground', async () => { + const fakeListener = new FakeListener() + eventEmitter.onMany(['entity.created', 'entity.updated'], fakeListener) + const transactionManagerStartSpy = vi.spyOn( + diContainer.cradle.transactionObservabilityManager, + 'startWithGroup', + ) + const transactionManagerStopSpy = vi.spyOn( + diContainer.cradle.transactionObservabilityManager, + 'stop', + ) + + await eventEmitter.emit(TestEvents.created, createdEventPayload) + await eventEmitter.emit(TestEvents.updated, updatedEventPayload) + + expect(fakeListener.receivedEvents).toHaveLength(2) + expect(fakeListener.receivedEvents[0]).toMatchObject(expectedCreatedPayload) + expect(fakeListener.receivedEvents[1]).toMatchObject(expectedUpdatedPayload) + + expect(transactionManagerStartSpy).toHaveBeenCalledTimes(2) + expect(transactionManagerStopSpy).toHaveBeenCalledTimes(2) }) - expect(emitResult.message).toEqual({ - id: expect.any(String), - metadata: { - correlationId: createdEventPayload.metadata!.correlationId!, - originatedFrom: 'service', - producedBy: undefined, - schemaVersion: '1', - }, - payload: { - message: 'msg', - }, - timestamp: expect.any(String), - type: 'entity.created', + it('emits event to manyListener - background', async () => { + const fakeListener = new FakeListener(100) + eventEmitter.onMany(['entity.created', 'entity.updated'], fakeListener, true) + const transactionManagerStartSpy = vi.spyOn( + diContainer.cradle.transactionObservabilityManager, + 'startWithGroup', + ) + const transactionManagerStopSpy = vi.spyOn( + diContainer.cradle.transactionObservabilityManager, + 'stop', + ) + + const eventEmitted1 = await eventEmitter.emit(TestEvents.created, createdEventPayload) + const emittedEvent2 = await eventEmitter.emit(TestEvents.updated, updatedEventPayload) + expect(fakeListener.receivedEvents).toHaveLength(0) + + await eventEmitter.handlerSpy.waitForMessageWithId(eventEmitted1.id, 'consumed') + await eventEmitter.handlerSpy.waitForMessageWithId(emittedEvent2.id, 'consumed') + + expect(fakeListener.receivedEvents).toHaveLength(2) + expect(fakeListener.receivedEvents[0]).toMatchObject(expectedCreatedPayload) + expect(fakeListener.receivedEvents[1]).toMatchObject(expectedUpdatedPayload) + + expect(transactionManagerStartSpy).toHaveBeenCalledTimes(2) + expect(transactionManagerStopSpy).toHaveBeenCalledTimes(2) }) - expect(fakeListener.receivedEvents).toHaveLength(1) - expect(fakeListener.receivedEvents[0]).toMatchObject({ - id: expect.any(String), - metadata: { - correlationId: expect.any(String), - originatedFrom: 'service', - producedBy: undefined, - schemaVersion: '1', - }, - payload: { - message: 'msg', - }, - timestamp: expect.any(String), - type: 'entity.created', + + it('foreground listener error handling', async () => { + const fakeListener = new ErroredFakeListener() + eventEmitter.onAny(fakeListener) + const transactionManagerStartSpy = vi.spyOn( + diContainer.cradle.transactionObservabilityManager, + 'startWithGroup', + ) + const transactionManagerStopSpy = vi.spyOn( + diContainer.cradle.transactionObservabilityManager, + 'stop', + ) + + await expect(eventEmitter.emit(TestEvents.created, createdEventPayload)).rejects.toThrow( + 'ErroredFakeListener error', + ) + + expect(fakeListener.receivedEvents).toHaveLength(1) + expect(fakeListener.receivedEvents[0]).toMatchObject(expectedCreatedPayload) + + expect(transactionManagerStartSpy).toHaveBeenCalledOnce() + expect(transactionManagerStartSpy).toHaveBeenCalledWith( + 'fg_event_listener:entity.created:ErroredFakeListener', + expect.any(String), + 'entity.created', + ) + + expect(transactionManagerStopSpy).toHaveBeenCalledOnce() + expect(transactionManagerStopSpy).toHaveBeenCalledWith( + transactionManagerStartSpy.mock.calls[0][1], + false, + ) }) - }) - it('emits event to singleListener - foreground', async () => { - const fakeListener = new FakeListener() - eventEmitter.on('entity.created', fakeListener) - const transactionManagerStartSpy = vi.spyOn( - diContainer.cradle.transactionObservabilityManager, - 'startWithGroup', - ) - const transactionManagerStopSpy = vi.spyOn( - diContainer.cradle.transactionObservabilityManager, - 'stop', - ) - - await eventEmitter.emit(TestEvents.created, createdEventPayload) - - expect(fakeListener.receivedEvents).toHaveLength(1) - expect(fakeListener.receivedEvents[0]).toMatchObject(expectedCreatedPayload) - - expect(transactionManagerStartSpy).toHaveBeenCalledOnce() - expect(transactionManagerStartSpy).toHaveBeenCalledWith( - 'fg_event_listener:entity.created:FakeListener', - expect.any(String), - 'entity.created', - ) - - expect(transactionManagerStopSpy).toHaveBeenCalledOnce() - expect(transactionManagerStopSpy).toHaveBeenCalledWith( - transactionManagerStartSpy.mock.calls[0][1], - true, - ) + it('background listener error handling', async () => { + const fakeListener = new ErroredFakeListener(100) + eventEmitter.onAny(fakeListener, true) + const reporterSpy = vi.spyOn(diContainer.cradle.errorReporter, 'report') + const logSpy = vi.spyOn(diContainer.cradle.logger, 'error') + const transactionManagerStartSpy = vi.spyOn( + diContainer.cradle.transactionObservabilityManager, + 'startWithGroup', + ) + const transactionManagerStopSpy = vi.spyOn( + diContainer.cradle.transactionObservabilityManager, + 'stop', + ) + + const emittedEvent = await eventEmitter.emit(TestEvents.created, createdEventPayload) + expect(fakeListener.receivedEvents).toHaveLength(0) + + await eventEmitter.handlerSpy.waitForMessageWithId(emittedEvent.id, 'consumed') + expect(fakeListener.receivedEvents).toHaveLength(1) + expect(fakeListener.receivedEvents[0]).toMatchObject(expectedCreatedPayload) + + const expectedContext = { + event: JSON.stringify(emittedEvent), + eventHandlerId: 'ErroredFakeListener', + 'x-request-id': emittedEvent.metadata?.correlationId, + } + expect(reporterSpy).toHaveBeenCalledWith({ + error: expect.any(Error), + context: expectedContext, + }) + expect(logSpy).toHaveBeenCalledWith({ + error: expect.anything(), + message: 'ErroredFakeListener error', + ...expectedContext, + }) + + expect(transactionManagerStartSpy).toHaveBeenCalledOnce() + expect(transactionManagerStartSpy).toHaveBeenCalledWith( + 'bg_event_listener:entity.created:ErroredFakeListener', + expect.any(String), + 'entity.created', + ) + + expect(transactionManagerStopSpy).toHaveBeenCalledOnce() + expect(transactionManagerStopSpy).toHaveBeenCalledWith( + transactionManagerStartSpy.mock.calls[0][1], + false, + ) + }) }) - it('emits event to singleListener - background', async () => { - const fakeListener = new FakeListener(100) - eventEmitter.on('entity.created', fakeListener, true) - - const transactionManagerStartSpy = vi.spyOn( - diContainer.cradle.transactionObservabilityManager, - 'startWithGroup', - ) - const transactionManagerStopSpy = vi.spyOn( - diContainer.cradle.transactionObservabilityManager, - 'stop', - ) - - const emittedEvent = await eventEmitter.emit(TestEvents.created, createdEventPayload) - expect(fakeListener.receivedEvents).toHaveLength(0) - - await eventEmitter.handlerSpy.waitForMessageWithId(emittedEvent.id, 'consumed') - expect(fakeListener.receivedEvents).toHaveLength(1) - expect(fakeListener.receivedEvents[0]).toMatchObject(expectedCreatedPayload) - - expect(transactionManagerStartSpy).toHaveBeenCalledOnce() - expect(transactionManagerStartSpy).toHaveBeenCalledWith( - 'bg_event_listener:entity.created:FakeListener', - expect.any(String), - 'entity.created', - ) - - expect(transactionManagerStopSpy).toHaveBeenCalledOnce() - expect(transactionManagerStopSpy).toHaveBeenCalledWith( - transactionManagerStartSpy.mock.calls[0][1], - true, - ) - }) + describe('dispose', () => { + it('nothing in progress and error is thrown', async () => { + await expect(eventEmitter.dispose()).resolves.toBeUndefined() + }) - it('emits event to manyListener - foreground', async () => { - const fakeListener = new FakeListener() - eventEmitter.onMany(['entity.created', 'entity.updated'], fakeListener) - const transactionManagerStartSpy = vi.spyOn( - diContainer.cradle.transactionObservabilityManager, - 'startWithGroup', - ) - const transactionManagerStopSpy = vi.spyOn( - diContainer.cradle.transactionObservabilityManager, - 'stop', - ) - - await eventEmitter.emit(TestEvents.created, createdEventPayload) - await eventEmitter.emit(TestEvents.updated, updatedEventPayload) - - expect(fakeListener.receivedEvents).toHaveLength(2) - expect(fakeListener.receivedEvents[0]).toMatchObject(expectedCreatedPayload) - expect(fakeListener.receivedEvents[1]).toMatchObject(expectedUpdatedPayload) - - expect(transactionManagerStartSpy).toHaveBeenCalledTimes(2) - expect(transactionManagerStopSpy).toHaveBeenCalledTimes(2) - }) + it('BG work in progress so dispose should wait for handlers to be done', async () => { + const fakeListener1 = new FakeListener(100) + const fakeListener2 = new FakeListener(200) + eventEmitter.onMany(['entity.created'], fakeListener1, true) + eventEmitter.onMany(['entity.updated'], fakeListener2, true) - it('emits event to manyListener - background', async () => { - const fakeListener = new FakeListener(100) - eventEmitter.onMany(['entity.created', 'entity.updated'], fakeListener, true) - const transactionManagerStartSpy = vi.spyOn( - diContainer.cradle.transactionObservabilityManager, - 'startWithGroup', - ) - const transactionManagerStopSpy = vi.spyOn( - diContainer.cradle.transactionObservabilityManager, - 'stop', - ) - - const eventEmitted1 = await eventEmitter.emit(TestEvents.created, createdEventPayload) - const emittedEvent2 = await eventEmitter.emit(TestEvents.updated, updatedEventPayload) - expect(fakeListener.receivedEvents).toHaveLength(0) - - await eventEmitter.handlerSpy.waitForMessageWithId(eventEmitted1.id, 'consumed') - await eventEmitter.handlerSpy.waitForMessageWithId(emittedEvent2.id, 'consumed') - - expect(fakeListener.receivedEvents).toHaveLength(2) - expect(fakeListener.receivedEvents[0]).toMatchObject(expectedCreatedPayload) - expect(fakeListener.receivedEvents[1]).toMatchObject(expectedUpdatedPayload) - - expect(transactionManagerStartSpy).toHaveBeenCalledTimes(2) - expect(transactionManagerStopSpy).toHaveBeenCalledTimes(2) - }) + await eventEmitter.emit(TestEvents.created, createdEventPayload) + await eventEmitter.emit(TestEvents.updated, updatedEventPayload) + expect(fakeListener1.receivedEvents).toHaveLength(0) + expect(fakeListener2.receivedEvents).toHaveLength(0) - it('foreground listener error handling', async () => { - const fakeListener = new ErroredFakeListener() - eventEmitter.onAny(fakeListener) - const transactionManagerStartSpy = vi.spyOn( - diContainer.cradle.transactionObservabilityManager, - 'startWithGroup', - ) - const transactionManagerStopSpy = vi.spyOn( - diContainer.cradle.transactionObservabilityManager, - 'stop', - ) - - await expect(eventEmitter.emit(TestEvents.created, createdEventPayload)).rejects.toThrow( - 'ErroredFakeListener error', - ) - - expect(fakeListener.receivedEvents).toHaveLength(1) - expect(fakeListener.receivedEvents[0]).toMatchObject(expectedCreatedPayload) - - expect(transactionManagerStartSpy).toHaveBeenCalledOnce() - expect(transactionManagerStartSpy).toHaveBeenCalledWith( - 'fg_event_listener:entity.created:ErroredFakeListener', - expect.any(String), - 'entity.created', - ) - - expect(transactionManagerStopSpy).toHaveBeenCalledOnce() - expect(transactionManagerStopSpy).toHaveBeenCalledWith( - transactionManagerStartSpy.mock.calls[0][1], - false, - ) - }) + await eventEmitter.dispose() - it('background listener error handling', async () => { - const fakeListener = new ErroredFakeListener(100) - eventEmitter.onAny(fakeListener, true) - const reporterSpy = vi.spyOn(diContainer.cradle.errorReporter, 'report') - const logSpy = vi.spyOn(diContainer.cradle.logger, 'error') - const transactionManagerStartSpy = vi.spyOn( - diContainer.cradle.transactionObservabilityManager, - 'startWithGroup', - ) - const transactionManagerStopSpy = vi.spyOn( - diContainer.cradle.transactionObservabilityManager, - 'stop', - ) - - const emittedEvent = await eventEmitter.emit(TestEvents.created, createdEventPayload) - expect(fakeListener.receivedEvents).toHaveLength(0) - - await eventEmitter.handlerSpy.waitForMessageWithId(emittedEvent.id, 'consumed') - expect(fakeListener.receivedEvents).toHaveLength(1) - expect(fakeListener.receivedEvents[0]).toMatchObject(expectedCreatedPayload) - - const expectedContext = { - event: JSON.stringify(emittedEvent), - eventHandlerId: 'ErroredFakeListener', - 'x-request-id': emittedEvent.metadata?.correlationId, - } - expect(reporterSpy).toHaveBeenCalledWith({ - error: expect.any(Error), - context: expectedContext, - }) - expect(logSpy).toHaveBeenCalledWith({ - error: expect.anything(), - message: 'ErroredFakeListener error', - ...expectedContext, + expect(fakeListener1.receivedEvents).toHaveLength(1) + expect(fakeListener2.receivedEvents).toHaveLength(1) + expect(fakeListener1.receivedEvents[0]).toMatchObject(expectedCreatedPayload) + expect(fakeListener2.receivedEvents[0]).toMatchObject(expectedUpdatedPayload) }) - expect(transactionManagerStartSpy).toHaveBeenCalledOnce() - expect(transactionManagerStartSpy).toHaveBeenCalledWith( - 'bg_event_listener:entity.created:ErroredFakeListener', - expect.any(String), - 'entity.created', - ) - - expect(transactionManagerStopSpy).toHaveBeenCalledOnce() - expect(transactionManagerStopSpy).toHaveBeenCalledWith( - transactionManagerStartSpy.mock.calls[0][1], - false, - ) + it('after dispose handlers are not called', async () => { + const fakeListener = new FakeListener() + eventEmitter.onAny(fakeListener) + + await eventEmitter.dispose() + + await eventEmitter.emit(TestEvents.created, createdEventPayload) + expect(fakeListener.receivedEvents).toHaveLength(0) + }) }) }) diff --git a/packages/core/lib/events/DomainEventEmitter.ts b/packages/core/lib/events/DomainEventEmitter.ts index c7ccf95d..f177c668 100644 --- a/packages/core/lib/events/DomainEventEmitter.ts +++ b/packages/core/lib/events/DomainEventEmitter.ts @@ -46,10 +46,12 @@ export class DomainEventEmitter CommonEventDefinitionConsumerSchemaType > - private readonly eventHandlerMap: Record< + private readonly eventHandlerMap: Map< string, Handlers>> > + private readonly inProgressBackgroundHandlerByEventId: Map> + constructor( deps: DomainEventEmitterDependencies, options: { @@ -65,7 +67,8 @@ export class DomainEventEmitter this._handlerSpy = resolveHandlerSpy>(options) - this.eventHandlerMap = {} + this.eventHandlerMap = new Map() + this.inProgressBackgroundHandlerByEventId = new Map() } get handlerSpy(): PublicHandlerSpy< @@ -79,6 +82,13 @@ export class DomainEventEmitter return this._handlerSpy } + public async dispose(): Promise { + await Promise.all(this.inProgressBackgroundHandlerByEventId.values()) + this.inProgressBackgroundHandlerByEventId.clear() + this.eventHandlerMap.clear() + this._handlerSpy?.clear() + } + public async emit( supportedEvent: SupportedEvent, data: Omit, 'type'>, @@ -108,6 +118,7 @@ export class DomainEventEmitter .getEventDefinitionByTypeName(eventTypeName) .publisherSchema.parse({ type: eventTypeName, ...data }) + // @ts-ignore await this.handleEvent(validatedEvent) // @ts-ignore @@ -122,12 +133,12 @@ export class DomainEventEmitter handler: SingleEventHandler, isBackgroundHandler = false, ) { - if (!this.eventHandlerMap[eventTypeName]) { - this.eventHandlerMap[eventTypeName] = { foreground: [], background: [] } + if (!this.eventHandlerMap.has(eventTypeName)) { + this.eventHandlerMap.set(eventTypeName, { foreground: [], background: [] }) } - if (isBackgroundHandler) this.eventHandlerMap[eventTypeName].background.push(handler) - else this.eventHandlerMap[eventTypeName].foreground.push(handler) + if (isBackgroundHandler) this.eventHandlerMap.get(eventTypeName)?.background.push(handler) + else this.eventHandlerMap.get(eventTypeName)?.foreground.push(handler) } /** @@ -151,21 +162,19 @@ export class DomainEventEmitter } private async handleEvent( - event: CommonEventDefinitionPublisherSchemaType, + event: CommonEventDefinitionConsumerSchemaType, ): Promise { - const eventHandlers = this.eventHandlerMap[event.type] ?? { - foreground: [], - background: [], - } + const eventHandlers = this.eventHandlerMap.get(event.type) + if (!eventHandlers) return for (const handler of eventHandlers.foreground) { await this.executeEventHandler(event, handler, false) } - const bgPromises = eventHandlers.background.map((handler) => - this.executeEventHandler(event, handler, true), - ) - Promise.all(bgPromises).then(() => { + const bgPromise = Promise.all( + eventHandlers.background.map((handler) => this.executeEventHandler(event, handler, true)), + ).then(() => { + this.inProgressBackgroundHandlerByEventId.delete(event.id) if (!this._handlerSpy) return this._handlerSpy.addProcessedMessage( { @@ -176,11 +185,12 @@ export class DomainEventEmitter event.id, ) }) + this.inProgressBackgroundHandlerByEventId.set(event.id, bgPromise) } private async executeEventHandler( - event: CommonEventDefinitionPublisherSchemaType, - handler: EventHandler>, + event: CommonEventDefinitionConsumerSchemaType, + handler: EventHandler>, isBackgroundHandler: boolean, ) { const transactionId = randomUUID() diff --git a/packages/core/package.json b/packages/core/package.json index 1fed1855..fb3ebf0a 100644 --- a/packages/core/package.json +++ b/packages/core/package.json @@ -1,6 +1,6 @@ { "name": "@message-queue-toolkit/core", - "version": "17.1.0", + "version": "17.2.0", "private": false, "license": "MIT", "description": "Useful utilities, interfaces and base classes for message queue handling. Supports AMQP and SQS with a common abstraction on top currently",