Skip to content

Commit

Permalink
feat(core): Implement new blocking event handler API
Browse files Browse the repository at this point in the history
Relates to #2735
  • Loading branch information
michaelbromley committed Mar 19, 2024
1 parent c69e4ac commit 1c69499
Show file tree
Hide file tree
Showing 2 changed files with 399 additions and 28 deletions.
258 changes: 231 additions & 27 deletions packages/core/src/event-bus/event-bus.spec.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { firstValueFrom, Subject } from 'rxjs';
import { QueryRunner } from 'typeorm';
import { beforeEach, describe, expect, it, vi } from 'vitest';

Expand All @@ -20,7 +21,7 @@ describe('EventBus', () => {
it('can publish without subscribers', () => {
const event = new TestEvent('foo');

expect(() => eventBus.publish(event)).not.toThrow();
expect(async () => await eventBus.publish(event)).not.toThrow();
});

describe('ofType()', () => {
Expand All @@ -29,7 +30,7 @@ describe('EventBus', () => {
const event = new TestEvent('foo');
eventBus.ofType(TestEvent).subscribe(handler);

eventBus.publish(event);
await eventBus.publish(event);
await new Promise(resolve => setImmediate(resolve));

expect(handler).toHaveBeenCalledTimes(1);
Expand All @@ -43,9 +44,9 @@ describe('EventBus', () => {
const event3 = new TestEvent('baz');
eventBus.ofType(TestEvent).subscribe(handler);

eventBus.publish(event1);
eventBus.publish(event2);
eventBus.publish(event3);
await eventBus.publish(event1);
await eventBus.publish(event2);
await eventBus.publish(event3);
await new Promise(resolve => setImmediate(resolve));

expect(handler).toHaveBeenCalledTimes(3);
Expand All @@ -63,7 +64,7 @@ describe('EventBus', () => {
eventBus.ofType(TestEvent).subscribe(handler2);
eventBus.ofType(TestEvent).subscribe(handler3);

eventBus.publish(event);
await eventBus.publish(event);
await new Promise(resolve => setImmediate(resolve));

expect(handler1).toHaveBeenCalledWith(event);
Expand All @@ -76,7 +77,7 @@ describe('EventBus', () => {
const event = new OtherTestEvent('foo');
eventBus.ofType(TestEvent).subscribe(handler);

eventBus.publish(event);
await eventBus.publish(event);
await new Promise(resolve => setImmediate(resolve));

expect(handler).not.toHaveBeenCalled();
Expand All @@ -87,15 +88,15 @@ describe('EventBus', () => {
const event = new TestEvent('foo');
const subscription = eventBus.ofType(TestEvent).subscribe(handler);

eventBus.publish(event);
await eventBus.publish(event);
await new Promise(resolve => setImmediate(resolve));

expect(handler).toHaveBeenCalledTimes(1);

subscription.unsubscribe();

eventBus.publish(event);
eventBus.publish(event);
await eventBus.publish(event);
await eventBus.publish(event);
await new Promise(resolve => setImmediate(resolve));

expect(handler).toHaveBeenCalledTimes(1);
Expand All @@ -108,16 +109,16 @@ describe('EventBus', () => {
const subscription1 = eventBus.ofType(TestEvent).subscribe(handler1);
const subscription2 = eventBus.ofType(TestEvent).subscribe(handler2);

eventBus.publish(event);
await eventBus.publish(event);
await new Promise(resolve => setImmediate(resolve));

expect(handler1).toHaveBeenCalledTimes(1);
expect(handler2).toHaveBeenCalledTimes(1);

subscription1.unsubscribe();

eventBus.publish(event);
eventBus.publish(event);
await eventBus.publish(event);
await eventBus.publish(event);
await new Promise(resolve => setImmediate(resolve));

expect(handler1).toHaveBeenCalledTimes(1);
Expand All @@ -131,7 +132,7 @@ describe('EventBus', () => {
const event = new TestEvent('foo');
eventBus.filter(vendureEvent => vendureEvent instanceof TestEvent).subscribe(handler);

eventBus.publish(event);
await eventBus.publish(event);
await new Promise(resolve => setImmediate(resolve));

expect(handler).toHaveBeenCalledTimes(1);
Expand All @@ -145,9 +146,9 @@ describe('EventBus', () => {
const event3 = new TestEvent('baz');
eventBus.filter(vendureEvent => vendureEvent instanceof TestEvent).subscribe(handler);

eventBus.publish(event1);
eventBus.publish(event2);
eventBus.publish(event3);
await eventBus.publish(event1);
await eventBus.publish(event2);
await eventBus.publish(event3);
await new Promise(resolve => setImmediate(resolve));

expect(handler).toHaveBeenCalledTimes(3);
Expand All @@ -165,7 +166,7 @@ describe('EventBus', () => {
eventBus.filter(vendureEvent => vendureEvent instanceof TestEvent).subscribe(handler2);
eventBus.filter(vendureEvent => vendureEvent instanceof TestEvent).subscribe(handler3);

eventBus.publish(event);
await eventBus.publish(event);
await new Promise(resolve => setImmediate(resolve));

expect(handler1).toHaveBeenCalledWith(event);
Expand All @@ -178,7 +179,7 @@ describe('EventBus', () => {
const event = new OtherTestEvent('foo');
eventBus.filter(vendureEvent => vendureEvent instanceof TestEvent).subscribe(handler);

eventBus.publish(event);
await eventBus.publish(event);
await new Promise(resolve => setImmediate(resolve));

expect(handler).not.toHaveBeenCalled();
Expand All @@ -189,7 +190,7 @@ describe('EventBus', () => {
const event = new ChildTestEvent('bar', 'foo');
eventBus.filter(vendureEvent => vendureEvent instanceof TestEvent).subscribe(handler);

eventBus.publish(event);
await eventBus.publish(event);
await new Promise(resolve => setImmediate(resolve));

expect(handler).toHaveBeenCalled();
Expand All @@ -202,15 +203,15 @@ describe('EventBus', () => {
.filter(vendureEvent => vendureEvent instanceof TestEvent)
.subscribe(handler);

eventBus.publish(event);
await eventBus.publish(event);
await new Promise(resolve => setImmediate(resolve));

expect(handler).toHaveBeenCalledTimes(1);

subscription.unsubscribe();

eventBus.publish(event);
eventBus.publish(event);
await eventBus.publish(event);
await eventBus.publish(event);
await new Promise(resolve => setImmediate(resolve));

expect(handler).toHaveBeenCalledTimes(1);
Expand All @@ -227,22 +228,222 @@ describe('EventBus', () => {
.filter(vendureEvent => vendureEvent instanceof TestEvent)
.subscribe(handler2);

eventBus.publish(event);
await eventBus.publish(event);
await new Promise(resolve => setImmediate(resolve));

expect(handler1).toHaveBeenCalledTimes(1);
expect(handler2).toHaveBeenCalledTimes(1);

subscription1.unsubscribe();

eventBus.publish(event);
eventBus.publish(event);
await eventBus.publish(event);
await eventBus.publish(event);
await new Promise(resolve => setImmediate(resolve));

expect(handler1).toHaveBeenCalledTimes(1);
expect(handler2).toHaveBeenCalledTimes(3);
});
});

describe('blocking event handlers', () => {
it('calls the handler function', async () => {
const event = new TestEvent('foo');
const spy = vi.fn((e: VendureEvent) => undefined);
eventBus.registerBlockingEventHandler({
handler: e => spy(e),
id: 'test-handler',
event: TestEvent,
});

await eventBus.publish(event);

expect(spy).toHaveBeenCalledTimes(1);
expect(spy).toHaveBeenCalledWith(event);
});

it('throws when attempting to register with a duplicate id', () => {
eventBus.registerBlockingEventHandler({
handler: e => undefined,
id: 'test-handler',
event: TestEvent,
});
expect(() => {
eventBus.registerBlockingEventHandler({
handler: e => undefined,
id: 'test-handler',
event: TestEvent,
});
}).toThrowError(
'A handler with the id "test-handler" is already registered for the event TestEvent',
);
});

it('calls multiple handler functions', async () => {
const event = new TestEvent('foo');
const spy1 = vi.fn((e: VendureEvent) => undefined);
const spy2 = vi.fn((e: VendureEvent) => undefined);
eventBus.registerBlockingEventHandler({
handler: e => spy1(e),
id: 'test-handler1',
event: TestEvent,
});
eventBus.registerBlockingEventHandler({
handler: e => spy2(e),
id: 'test-handler2',
event: TestEvent,
});

await eventBus.publish(event);

expect(spy1).toHaveBeenCalledTimes(1);
expect(spy1).toHaveBeenCalledWith(event);
expect(spy2).toHaveBeenCalledTimes(1);
expect(spy2).toHaveBeenCalledWith(event);
});

it('handles multiple events', async () => {
const event1 = new TestEvent('foo');
const event2 = new OtherTestEvent('bar');
const spy = vi.fn((e: VendureEvent) => undefined);
eventBus.registerBlockingEventHandler({
handler: e => spy(e),
id: 'test-handler',
event: [TestEvent, OtherTestEvent],
});

await eventBus.publish(event1);
expect(spy).toHaveBeenCalledTimes(1);
expect(spy).toHaveBeenCalledWith(event1);

await eventBus.publish(event2);
expect(spy).toHaveBeenCalledTimes(2);
expect(spy).toHaveBeenCalledWith(event2);
});

it('publish method throws in a handler throws', async () => {
const event = new TestEvent('foo');
eventBus.registerBlockingEventHandler({
handler: () => {
throw new Error('test error');
},
id: 'test-handler',
event: TestEvent,
});

await expect(eventBus.publish(event)).rejects.toThrow('test error');
});

it('order of execution with "before" property', async () => {
const event = new TestEvent('foo');
const spy = vi.fn((input: string) => undefined);
eventBus.registerBlockingEventHandler({
handler: e => spy('test-handler1'),
id: 'test-handler1',
event: TestEvent,
});
eventBus.registerBlockingEventHandler({
handler: e => spy('test-handler2'),
id: 'test-handler2',
event: TestEvent,
before: 'test-handler1',
});

await eventBus.publish(event);

expect(spy).toHaveBeenCalledTimes(2);
expect(spy).toHaveBeenNthCalledWith(1, 'test-handler2');
expect(spy).toHaveBeenNthCalledWith(2, 'test-handler1');
});

it('order of execution with "after" property', async () => {
const event = new TestEvent('foo');
const spy = vi.fn((input: string) => undefined);
eventBus.registerBlockingEventHandler({
handler: e => spy('test-handler1'),
id: 'test-handler1',
event: TestEvent,
after: 'test-handler2',
});
eventBus.registerBlockingEventHandler({
handler: e => spy('test-handler2'),
id: 'test-handler2',
event: TestEvent,
});

await eventBus.publish(event);

expect(spy).toHaveBeenCalledTimes(2);
expect(spy).toHaveBeenNthCalledWith(1, 'test-handler2');
expect(spy).toHaveBeenNthCalledWith(2, 'test-handler1');
});

it('throws if there is a cycle in before ordering', () => {
const spy = vi.fn((input: string) => undefined);
eventBus.registerBlockingEventHandler({
handler: e => spy('test-handler1'),
id: 'test-handler1',
event: TestEvent,
before: 'test-handler2',
});

expect(() =>
eventBus.registerBlockingEventHandler({
handler: e => spy('test-handler2'),
id: 'test-handler2',
event: TestEvent,
before: 'test-handler1',
}),
).toThrowError(
'Circular dependency detected between event handlers test-handler1 and test-handler2',
);
});

it('throws if there is a cycle in after ordering', () => {
const spy = vi.fn((input: string) => undefined);
eventBus.registerBlockingEventHandler({
handler: e => spy('test-handler1'),
id: 'test-handler1',
event: TestEvent,
after: 'test-handler2',
});

expect(() =>
eventBus.registerBlockingEventHandler({
handler: e => spy('test-handler2'),
id: 'test-handler2',
event: TestEvent,
after: 'test-handler1',
}),
).toThrowError(
'Circular dependency detected between event handlers test-handler1 and test-handler2',
);
});

it('blocks execution of the publish method', async () => {
const event = new TestEvent('foo');
const subject = new Subject<void>();
eventBus.registerBlockingEventHandler({
handler: e => firstValueFrom(subject.asObservable()),
id: 'test-handler',
event: TestEvent,
});
const publishPromise = eventBus.publish(event);
expect(publishPromise).toBeInstanceOf(Promise);

let resolved = false;
void publishPromise.then(() => (resolved = true));

expect(resolved).toBe(false);
await new Promise(resolve => setTimeout(resolve, 50));
expect(resolved).toBe(false);
// Handler only resolves after the subject emits
subject.next();
// Allow the event loop to tick
await new Promise(resolve => setTimeout(resolve, 0));
// Now the promise should be resolved
expect(resolved).toBe(true);
});
});
});

class TestEvent extends VendureEvent {
Expand All @@ -252,7 +453,10 @@ class TestEvent extends VendureEvent {
}

class ChildTestEvent extends TestEvent {
constructor(public childPayload: string, payload: string) {
constructor(
public childPayload: string,
payload: string,
) {
super(payload);
}
}
Expand Down
Loading

0 comments on commit 1c69499

Please sign in to comment.