diff --git a/packages/core/src/event-bus/event-bus.spec.ts b/packages/core/src/event-bus/event-bus.spec.ts index f749077e36..3bc4f04426 100644 --- a/packages/core/src/event-bus/event-bus.spec.ts +++ b/packages/core/src/event-bus/event-bus.spec.ts @@ -1,3 +1,4 @@ +import { firstValueFrom, Subject } from 'rxjs'; import { QueryRunner } from 'typeorm'; import { beforeEach, describe, expect, it, vi } from 'vitest'; @@ -20,7 +21,7 @@ describe('EventBus', () => { it('can publish without subscribers', () => { const event = new TestEvent('foo'); - expect(() => eventBus.publish(event)).not.toThrow(); + expect(async () => await eventBus.publish(event)).not.toThrow(); }); describe('ofType()', () => { @@ -29,7 +30,7 @@ describe('EventBus', () => { const event = new TestEvent('foo'); eventBus.ofType(TestEvent).subscribe(handler); - eventBus.publish(event); + await eventBus.publish(event); await new Promise(resolve => setImmediate(resolve)); expect(handler).toHaveBeenCalledTimes(1); @@ -43,9 +44,9 @@ describe('EventBus', () => { const event3 = new TestEvent('baz'); eventBus.ofType(TestEvent).subscribe(handler); - eventBus.publish(event1); - eventBus.publish(event2); - eventBus.publish(event3); + await eventBus.publish(event1); + await eventBus.publish(event2); + await eventBus.publish(event3); await new Promise(resolve => setImmediate(resolve)); expect(handler).toHaveBeenCalledTimes(3); @@ -63,7 +64,7 @@ describe('EventBus', () => { eventBus.ofType(TestEvent).subscribe(handler2); eventBus.ofType(TestEvent).subscribe(handler3); - eventBus.publish(event); + await eventBus.publish(event); await new Promise(resolve => setImmediate(resolve)); expect(handler1).toHaveBeenCalledWith(event); @@ -76,7 +77,7 @@ describe('EventBus', () => { const event = new OtherTestEvent('foo'); eventBus.ofType(TestEvent).subscribe(handler); - eventBus.publish(event); + await eventBus.publish(event); await new Promise(resolve => setImmediate(resolve)); expect(handler).not.toHaveBeenCalled(); @@ -87,15 +88,15 @@ describe('EventBus', () => { const event = new TestEvent('foo'); const subscription = eventBus.ofType(TestEvent).subscribe(handler); - eventBus.publish(event); + await eventBus.publish(event); await new Promise(resolve => setImmediate(resolve)); expect(handler).toHaveBeenCalledTimes(1); subscription.unsubscribe(); - eventBus.publish(event); - eventBus.publish(event); + await eventBus.publish(event); + await eventBus.publish(event); await new Promise(resolve => setImmediate(resolve)); expect(handler).toHaveBeenCalledTimes(1); @@ -108,7 +109,7 @@ describe('EventBus', () => { const subscription1 = eventBus.ofType(TestEvent).subscribe(handler1); const subscription2 = eventBus.ofType(TestEvent).subscribe(handler2); - eventBus.publish(event); + await eventBus.publish(event); await new Promise(resolve => setImmediate(resolve)); expect(handler1).toHaveBeenCalledTimes(1); @@ -116,8 +117,8 @@ describe('EventBus', () => { subscription1.unsubscribe(); - eventBus.publish(event); - eventBus.publish(event); + await eventBus.publish(event); + await eventBus.publish(event); await new Promise(resolve => setImmediate(resolve)); expect(handler1).toHaveBeenCalledTimes(1); @@ -131,7 +132,7 @@ describe('EventBus', () => { const event = new TestEvent('foo'); eventBus.filter(vendureEvent => vendureEvent instanceof TestEvent).subscribe(handler); - eventBus.publish(event); + await eventBus.publish(event); await new Promise(resolve => setImmediate(resolve)); expect(handler).toHaveBeenCalledTimes(1); @@ -145,9 +146,9 @@ describe('EventBus', () => { const event3 = new TestEvent('baz'); eventBus.filter(vendureEvent => vendureEvent instanceof TestEvent).subscribe(handler); - eventBus.publish(event1); - eventBus.publish(event2); - eventBus.publish(event3); + await eventBus.publish(event1); + await eventBus.publish(event2); + await eventBus.publish(event3); await new Promise(resolve => setImmediate(resolve)); expect(handler).toHaveBeenCalledTimes(3); @@ -165,7 +166,7 @@ describe('EventBus', () => { eventBus.filter(vendureEvent => vendureEvent instanceof TestEvent).subscribe(handler2); eventBus.filter(vendureEvent => vendureEvent instanceof TestEvent).subscribe(handler3); - eventBus.publish(event); + await eventBus.publish(event); await new Promise(resolve => setImmediate(resolve)); expect(handler1).toHaveBeenCalledWith(event); @@ -178,7 +179,7 @@ describe('EventBus', () => { const event = new OtherTestEvent('foo'); eventBus.filter(vendureEvent => vendureEvent instanceof TestEvent).subscribe(handler); - eventBus.publish(event); + await eventBus.publish(event); await new Promise(resolve => setImmediate(resolve)); expect(handler).not.toHaveBeenCalled(); @@ -189,7 +190,7 @@ describe('EventBus', () => { const event = new ChildTestEvent('bar', 'foo'); eventBus.filter(vendureEvent => vendureEvent instanceof TestEvent).subscribe(handler); - eventBus.publish(event); + await eventBus.publish(event); await new Promise(resolve => setImmediate(resolve)); expect(handler).toHaveBeenCalled(); @@ -202,15 +203,15 @@ describe('EventBus', () => { .filter(vendureEvent => vendureEvent instanceof TestEvent) .subscribe(handler); - eventBus.publish(event); + await eventBus.publish(event); await new Promise(resolve => setImmediate(resolve)); expect(handler).toHaveBeenCalledTimes(1); subscription.unsubscribe(); - eventBus.publish(event); - eventBus.publish(event); + await eventBus.publish(event); + await eventBus.publish(event); await new Promise(resolve => setImmediate(resolve)); expect(handler).toHaveBeenCalledTimes(1); @@ -227,7 +228,7 @@ describe('EventBus', () => { .filter(vendureEvent => vendureEvent instanceof TestEvent) .subscribe(handler2); - eventBus.publish(event); + await eventBus.publish(event); await new Promise(resolve => setImmediate(resolve)); expect(handler1).toHaveBeenCalledTimes(1); @@ -235,14 +236,214 @@ describe('EventBus', () => { subscription1.unsubscribe(); - eventBus.publish(event); - eventBus.publish(event); + await eventBus.publish(event); + await eventBus.publish(event); await new Promise(resolve => setImmediate(resolve)); expect(handler1).toHaveBeenCalledTimes(1); expect(handler2).toHaveBeenCalledTimes(3); }); }); + + describe('blocking event handlers', () => { + it('calls the handler function', async () => { + const event = new TestEvent('foo'); + const spy = vi.fn((e: VendureEvent) => undefined); + eventBus.registerBlockingEventHandler({ + handler: e => spy(e), + id: 'test-handler', + event: TestEvent, + }); + + await eventBus.publish(event); + + expect(spy).toHaveBeenCalledTimes(1); + expect(spy).toHaveBeenCalledWith(event); + }); + + it('throws when attempting to register with a duplicate id', () => { + eventBus.registerBlockingEventHandler({ + handler: e => undefined, + id: 'test-handler', + event: TestEvent, + }); + expect(() => { + eventBus.registerBlockingEventHandler({ + handler: e => undefined, + id: 'test-handler', + event: TestEvent, + }); + }).toThrowError( + 'A handler with the id "test-handler" is already registered for the event TestEvent', + ); + }); + + it('calls multiple handler functions', async () => { + const event = new TestEvent('foo'); + const spy1 = vi.fn((e: VendureEvent) => undefined); + const spy2 = vi.fn((e: VendureEvent) => undefined); + eventBus.registerBlockingEventHandler({ + handler: e => spy1(e), + id: 'test-handler1', + event: TestEvent, + }); + eventBus.registerBlockingEventHandler({ + handler: e => spy2(e), + id: 'test-handler2', + event: TestEvent, + }); + + await eventBus.publish(event); + + expect(spy1).toHaveBeenCalledTimes(1); + expect(spy1).toHaveBeenCalledWith(event); + expect(spy2).toHaveBeenCalledTimes(1); + expect(spy2).toHaveBeenCalledWith(event); + }); + + it('handles multiple events', async () => { + const event1 = new TestEvent('foo'); + const event2 = new OtherTestEvent('bar'); + const spy = vi.fn((e: VendureEvent) => undefined); + eventBus.registerBlockingEventHandler({ + handler: e => spy(e), + id: 'test-handler', + event: [TestEvent, OtherTestEvent], + }); + + await eventBus.publish(event1); + expect(spy).toHaveBeenCalledTimes(1); + expect(spy).toHaveBeenCalledWith(event1); + + await eventBus.publish(event2); + expect(spy).toHaveBeenCalledTimes(2); + expect(spy).toHaveBeenCalledWith(event2); + }); + + it('publish method throws in a handler throws', async () => { + const event = new TestEvent('foo'); + eventBus.registerBlockingEventHandler({ + handler: () => { + throw new Error('test error'); + }, + id: 'test-handler', + event: TestEvent, + }); + + await expect(eventBus.publish(event)).rejects.toThrow('test error'); + }); + + it('order of execution with "before" property', async () => { + const event = new TestEvent('foo'); + const spy = vi.fn((input: string) => undefined); + eventBus.registerBlockingEventHandler({ + handler: e => spy('test-handler1'), + id: 'test-handler1', + event: TestEvent, + }); + eventBus.registerBlockingEventHandler({ + handler: e => spy('test-handler2'), + id: 'test-handler2', + event: TestEvent, + before: 'test-handler1', + }); + + await eventBus.publish(event); + + expect(spy).toHaveBeenCalledTimes(2); + expect(spy).toHaveBeenNthCalledWith(1, 'test-handler2'); + expect(spy).toHaveBeenNthCalledWith(2, 'test-handler1'); + }); + + it('order of execution with "after" property', async () => { + const event = new TestEvent('foo'); + const spy = vi.fn((input: string) => undefined); + eventBus.registerBlockingEventHandler({ + handler: e => spy('test-handler1'), + id: 'test-handler1', + event: TestEvent, + after: 'test-handler2', + }); + eventBus.registerBlockingEventHandler({ + handler: e => spy('test-handler2'), + id: 'test-handler2', + event: TestEvent, + }); + + await eventBus.publish(event); + + expect(spy).toHaveBeenCalledTimes(2); + expect(spy).toHaveBeenNthCalledWith(1, 'test-handler2'); + expect(spy).toHaveBeenNthCalledWith(2, 'test-handler1'); + }); + + it('throws if there is a cycle in before ordering', () => { + const spy = vi.fn((input: string) => undefined); + eventBus.registerBlockingEventHandler({ + handler: e => spy('test-handler1'), + id: 'test-handler1', + event: TestEvent, + before: 'test-handler2', + }); + + expect(() => + eventBus.registerBlockingEventHandler({ + handler: e => spy('test-handler2'), + id: 'test-handler2', + event: TestEvent, + before: 'test-handler1', + }), + ).toThrowError( + 'Circular dependency detected between event handlers test-handler1 and test-handler2', + ); + }); + + it('throws if there is a cycle in after ordering', () => { + const spy = vi.fn((input: string) => undefined); + eventBus.registerBlockingEventHandler({ + handler: e => spy('test-handler1'), + id: 'test-handler1', + event: TestEvent, + after: 'test-handler2', + }); + + expect(() => + eventBus.registerBlockingEventHandler({ + handler: e => spy('test-handler2'), + id: 'test-handler2', + event: TestEvent, + after: 'test-handler1', + }), + ).toThrowError( + 'Circular dependency detected between event handlers test-handler1 and test-handler2', + ); + }); + + it('blocks execution of the publish method', async () => { + const event = new TestEvent('foo'); + const subject = new Subject(); + eventBus.registerBlockingEventHandler({ + handler: e => firstValueFrom(subject.asObservable()), + id: 'test-handler', + event: TestEvent, + }); + const publishPromise = eventBus.publish(event); + expect(publishPromise).toBeInstanceOf(Promise); + + let resolved = false; + void publishPromise.then(() => (resolved = true)); + + expect(resolved).toBe(false); + await new Promise(resolve => setTimeout(resolve, 50)); + expect(resolved).toBe(false); + // Handler only resolves after the subject emits + subject.next(); + // Allow the event loop to tick + await new Promise(resolve => setTimeout(resolve, 0)); + // Now the promise should be resolved + expect(resolved).toBe(true); + }); + }); }); class TestEvent extends VendureEvent { @@ -252,7 +453,10 @@ class TestEvent extends VendureEvent { } class ChildTestEvent extends TestEvent { - constructor(public childPayload: string, payload: string) { + constructor( + public childPayload: string, + payload: string, + ) { super(payload); } } diff --git a/packages/core/src/event-bus/event-bus.ts b/packages/core/src/event-bus/event-bus.ts index 7ef726c98c..a421225632 100644 --- a/packages/core/src/event-bus/event-bus.ts +++ b/packages/core/src/event-bus/event-bus.ts @@ -7,10 +7,50 @@ import { EntityManager } from 'typeorm'; import { RequestContext } from '../api/common/request-context'; import { TRANSACTION_MANAGER_KEY } from '../common/constants'; +import { Logger } from '../config/logger/vendure-logger'; import { TransactionSubscriber, TransactionSubscriberError } from '../connection/transaction-subscriber'; import { VendureEvent } from './vendure-event'; +/** + * @description + * Options for registering a blocking event handler. + * + * @since 2.2.0 + * @docsCategory events + */ +export type BlockingEventHandlerOptions = { + /** + * @description + * The event type to which the handler should listen. + * Can be a single event type or an array of event types. + */ + event: Type | Array>; + /** + * @description + * The handler function which will be executed when the event is published. + * If the handler returns a Promise, the event publishing code will wait for the Promise to resolve + * before continuing. Any errors thrown by the handler will cause the event publishing code to fail. + */ + handler: (event: T) => void | Promise; + /** + * @description + * A unique identifier for the handler. This can then be used to specify the order in which + * handlers should be executed using the `before` and `after` options in other handlers. + */ + id: string; + /** + * @description + * The ID of another handler which this handler should execute before. + */ + before?: string; + /** + * @description + * The ID of another handler which this handler should execute after. + */ + after?: string; +}; + /** * @description * The EventBus is used to globally publish events which can then be subscribed to. @@ -58,6 +98,7 @@ import { VendureEvent } from './vendure-event'; export class EventBus implements OnModuleDestroy { private eventStream = new Subject(); private destroy$ = new Subject(); + private blockingEventHandlers = new Map, Array>>(); constructor(private transactionSubscriber: TransactionSubscriber) {} @@ -65,8 +106,9 @@ export class EventBus implements OnModuleDestroy { * @description * Publish an event which any subscribers can react to. */ - publish(event: T): void { + async publish(event: T): Promise { this.eventStream.next(event); + await this.executeBlockingEventHandlers(event); } /** @@ -105,11 +147,136 @@ export class EventBus implements OnModuleDestroy { ) as Observable; } + /** + * @description + * Register an event handler function which will be executed when an event of the given type is published, + * and will block execution of the code which published the event until the handler has completed. + * + * This is useful when you need assurance that the event handler has successfully completed, and you want + * the triggering code to fail if the handler fails. + * + * This API should be used with caution, as errors or performance issues in the handler can cause the + * associated operation to be slow or fail entirely. For this reason, any handler which takes longer than + * 100ms to execute will log a warning. Any non-trivial task to be performed in a blocking event handler + * should be offloaded to a background job using the {@link JobQueueService}. + * + * @example + * ```ts + * eventBus.registerBlockingEventHandler({ + * event: OrderStateTransitionEvent, + * id: 'my-order-state-transition-handler', + * handler: async (event) => { + * // perform some synchronous task + * } + * }); + * ``` + * + * @since 2.2.0 + */ + registerBlockingEventHandler(handlerOptions: BlockingEventHandlerOptions) { + const events = Array.isArray(handlerOptions.event) ? handlerOptions.event : [handlerOptions.event]; + + for (const event of events) { + let handlers = this.blockingEventHandlers.get(event); + const handlerWithIdAlreadyExists = handlers?.some(h => h.id === handlerOptions.id); + if (handlerWithIdAlreadyExists) { + throw new Error( + `A handler with the id "${handlerOptions.id}" is already registered for the event ${event.name}`, + ); + } + + if (handlers) { + handlers.push(handlerOptions); + } else { + handlers = [handlerOptions]; + } + const orderedHandlers = this.orderEventHandlers(handlers); + this.blockingEventHandlers.set(event, orderedHandlers); + } + } + /** @internal */ onModuleDestroy(): any { this.destroy$.next(); } + private async executeBlockingEventHandlers(event: T): Promise { + const blockingHandlers = this.blockingEventHandlers.get(event.constructor as Type); + for (const options of blockingHandlers || []) { + const timeStart = new Date().getTime(); + await options.handler(event); + const timeEnd = new Date().getTime(); + const timeTaken = timeEnd - timeStart; + Logger.debug(`Blocking event handler ${options.id} took ${timeTaken}ms`); + if (timeTaken > 100) { + Logger.warn( + [ + `Blocking event handler ${options.id} took ${timeTaken}ms`, + `Consider optimizing the handler by moving the logic to a background job or using a more efficient algorithm.`, + ].join('\n'), + ); + } + } + } + + private orderEventHandlers( + handlers: Array>, + ): Array> { + let orderedHandlers: Array> = []; + const handlerMap: Map> = new Map(); + + // Create a map of handlers by ID for efficient lookup + for (const handler of handlers) { + handlerMap.set(handler.id, handler); + } + + // Helper function to recursively add handlers in correct order + const addHandler = (handler: BlockingEventHandlerOptions) => { + // If the handler is already in the ordered list, skip it + if (orderedHandlers.includes(handler)) { + return; + } + + // If an "after" handler is specified, add it recursively + if (handler.after) { + const afterHandler = handlerMap.get(handler.after); + if (afterHandler) { + if (afterHandler.after === handler.id) { + throw new Error( + `Circular dependency detected between event handlers ${handler.id} and ${afterHandler.id}`, + ); + } + orderedHandlers = orderedHandlers.filter(h => h.id !== afterHandler.id); + addHandler(afterHandler); + } + } + + // Add the current handler + orderedHandlers.push(handler); + + // If a "before" handler is specified, add it recursively + if (handler.before) { + const beforeHandler = handlerMap.get(handler.before); + if (beforeHandler) { + if (beforeHandler.before === handler.id) { + throw new Error( + `Circular dependency detected between event handlers ${handler.id} and ${beforeHandler.id}`, + ); + } + orderedHandlers = orderedHandlers.filter(h => h.id !== beforeHandler.id); + addHandler(beforeHandler); + } + } + }; + + // Start adding handlers from the original list + for (const handler of handlers) { + addHandler(handler); + } + + return orderedHandlers; + } + /** * If the Event includes a RequestContext property, we need to check for any active transaction * associated with it, and if there is one, we await that transaction to either commit or rollback