Skip to content

Commit

Permalink
📝 update documentation
Browse files Browse the repository at this point in the history
  • Loading branch information
drieshooghe committed Oct 1, 2024
1 parent c2fe706 commit 5eeb986
Show file tree
Hide file tree
Showing 9 changed files with 175 additions and 27 deletions.
34 changes: 26 additions & 8 deletions docs/pages/advanced/event_pubsub.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,34 @@
title: Event Pub/Sub
---

# Event Publishers and Listeners
# Event Publishers and Subscribers

import { Callout } from 'nextra/components'

{/*
TODO:
- describe event publishers
- describe event listeners
*/}

<Callout type="warning" emoji="🚧">
This page is under construction.
</Callout>
</Callout>

## Event publishers
Whenever the EventStore appends events, the produced EventEnvelopes get published by the EventPublishers that are registered in the EventBus. A default EventPublisher takes care of publishing events internally, which allows us to create and register EventSubscribers that automatically listen for these events.

```typescript
@EventSubscriber(AccountOpenedEvent)
export class AccountOpenedEventSubscriber implements IEventSubscriber {
handle(envelope: EventEnvelope<AccountOpenedEvent>) {
...
}
}
```

To register an additional EventPublisher to push your EventEnvelopes to Redis, SNS, Kafka, etc. simply create one and register it as a provider.
This doesn't replace the default EventPublisher, but adds an additional one to the EventBus.

```typescript
@EventPublisher()
export class CustomEventPublisher implements IEventPublisher {
async publish(envelope: EventEnvelope<IEvent>): Promise<void> {
...
}
}
```
32 changes: 31 additions & 1 deletion docs/pages/advanced/event_serialization.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,34 @@ import { Callout } from 'nextra/components'

<Callout type="warning" emoji="🚧">
This page is under construction.
</Callout>
</Callout>

```typescript
@Event('account-opened')
export class AccountOpenedEvent implements IEvent {
constructor(
public readonly accountId: AccountId,
public readonly openedOn: Date,
public readonly accountOwnerIds?: AccountOwnerId[]
) {}
}

@EventSerializer(AccountOpenedEvent)
export class AccountOpenedEventSerializer implements IEventSerializer {
serialize({ accountId, openedOn, accountOwnerIds }: AccountOpenedEvent): IEventPayload<AccountOpenedEvent> {
return {
accountId: accountId.value,
openedOn: openedOn.toISOString(),
accountOwnerIds: accountOwnerIds?.map((id) => id.value)
};
}

deserialize({ id, openedOn, accountOwnerIds }: IEventPayload<AccountOpenedEvent>): AccountOpenedEvent {
const accountId = AccountId.from(id);
const openedOnDate = openedOn && new Date(openedOn);
const ownerIds = accountOwnerIds?.map((id) => AccountOwnerId.from(id));

return new AccountOpenedEvent(accountId, openedOnDate, ownerIds);
}
}
```
1 change: 1 addition & 0 deletions docs/pages/advanced/multitenancy.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ By default the event- and snapshot-store create a shared collection of streams f

```typescript copy
await this.eventStore.appendEvents(stream, account.version, events, 'tenant-1');
await this.accountSnapshotRepository.save(accountId, account, 'tenant-1');
await this.accountSnapshotRepository.load(accountId, 'tenant-1');
```

Expand Down
5 changes: 3 additions & 2 deletions docs/pages/start/events.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,9 @@ The difference between event handlers and event subscribers is crucial. Event ha

## Event Serialization

By default events are serialized and deserialized using the [class-transformer](https://www.npmjs.com/package/class-transformer) library. It is however possible to provide your own serialization logic.
For more information on how event serialization works, please refer to the [Event Serialization](/advanced/event_serialization) documentation.
By default events are serialized and deserialized using the [class-transformer](https://www.npmjs.com/package/class-transformer) library. This works well for simple events that only contain primitive types.
If you need more advanced events, for example containing Value Objects, it's possible to provide your own serialization logic by creating an EventSerializer.
For more information on how to create an Event Serializer, please refer to the [Event Serialization](/advanced/event_serialization) documentation.

## Guidelines for creating events

Expand Down
114 changes: 108 additions & 6 deletions docs/pages/start/snapshots.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,112 @@
title: Snapshots
---

# Snapshots & Snapshot Handlers
# Snapshots

import { Callout } from 'nextra/components'

<Callout type="warning" emoji="🚧">
This page is under construction.
</Callout>
Snapshots are an optimization that is completely optional. However, they become useful when event streams grow large, and reading them becomes slow. Snapshots allow you to store the state of an aggregate at a particular point in time, so you don't need to replay all events from the beginning of the stream each time the aggregate is loaded.

To start using snapshots using this library you will need to create a `SnapshotRepository`.

## Creating a Snapshot Repository

A `SnapshotRepository` is responsible for saving and loading snapshots, which are essentially instances of an aggregate at a certain revision. To create a snapshot repository for a specific aggregate you will need to make a snapshot repository class that extends the base `SnapshotRepository` class and uses the store behind the scenes to manage snapshots.

The base class provides the following methods:
- `save(id: Id, aggregate: A, pool?: ISnapshotPool): Promise<void>`: Saves a snapshot of the aggregate.
- `load(id: Id, pool?: ISnapshotPool): Promise<A>`: Loads the latest snapshot of the aggregate or returns a blank aggregate.
- `loadMany(ids: Id[], pool?: ISnapshotPool): Promise<A[]>`: Loads the latest snapshots of multiple aggregates.
- `*loadAll(filter?: { fromId?: Id; limit?: number; pool?: string }): AsyncGenerator<SnapshotEnvelope<A>[]>`: Search all latest snapshots from the store. Returns an async iterator.

The two methods you will need to implement are **`serialize`** and **`deserialize`**.

```typescript {3-4,5,15} copy
import { SnapshotRepository, Snapshot } from '@ocoda/event-sourcing';

@Snapshot(Account, { name: 'account', interval: 5 })
export class AccountSnapshotRepository extends SnapshotRepository<Account> {
serialize({ id, ownerIds, balance, openedOn, closedOn }: Account): ISnapshot<Account> {
return {
id: id.value,
ownerIds: ownerIds.map(({ value }) => value),
balance,
openedOn: openedOn ? openedOn.toISOString() : undefined,
closedOn: closedOn ? closedOn.toISOString() : undefined,
};
}

deserialize({ id, ownerIds, balance, openedOn, closedOn }: ISnapshot<Account>): Account {
const account = new Account();
account.id = AccountId.from(id);
account.ownerIds = ownerIds.map(AccountOwnerId.from);
account.balance = balance;
account.openedOn = openedOn && new Date(openedOn);
account.closedOn = closedOn && new Date(closedOn);

return account;
}
}
```

#### Breakdown of the code

- **`@Snapshot(Account, { name: 'account', interval: 5 })`:**
- The `@Snapshot()` decorator marks the class as a snapshot handler and specifies the aggregate type and the snapshot name.
- The `interval` option specifies how often a snapshot should be taken. In this case, a snapshot will be taken every 5 events.

- **`serialize({ id, ownerIds, balance, openedOn, closedOn }: Account)`:**
- The `serialize` method is responsible for converting an aggregate instance into a snapshot object. This method is called when saving a snapshot.

- **`deserialize({ id, ownerIds, balance, openedOn, closedOn }: ISnapshot<Account>): Account`:**
- The `deserialize` method is responsible for converting a snapshot object into an aggregate instance. This method is called when loading a snapshot.

## Plugging it into you aggregate repository

If you created a snapshot repository and registered it as a provider, you can now plug it into your aggregate repository to optimize the loading process.

```typescript {5,11,25,41} copy
@Injectable()
export class AccountRepository {
constructor(
private readonly eventStore: EventStore,
private readonly accountSnapshotRepository: AccountSnapshotRepository,
) {}

async getById(accountId: AccountId) {
const eventStream = EventStream.for<Account>(Account, accountId);

const account = await this.accountSnapshotRepository.load(accountId);

const events = this.eventStore.getEvents(eventStream, { fromVersion: account.version + 1 });

await account.loadFromHistory(events);

if (account.version < 1) {
throw new AccountNotFoundException(accountId.value);
}

return account;
}

async getByIds(accountIds: AccountId[]) {
const accounts = await this.accountSnapshotRepository.loadMany(accountIds, 'e2e');

for (const account of accounts) {
const eventStream = EventStream.for<Account>(Account, account.id);
const eventCursor = this.eventStore.getEvents(eventStream, { pool: 'e2e', fromVersion: account.version + 1 });
await account.loadFromHistory(eventCursor);
}

return accounts;
}

async save(account: Account): Promise<void> {
const events = account.commit();
const stream = EventStream.for<Account>(Account, account.id);

// Append the events to the event store
await this.eventStore.appendEvents(stream, account.version, events);
// Save a snapshot of the account
await this.accountSnapshotRepository.save(account.id, account);
}
}
```
6 changes: 2 additions & 4 deletions example/src/application/repositories/account.repository.ts
Original file line number Diff line number Diff line change
Expand Up @@ -67,9 +67,7 @@ export class AccountRepository {
const events = account.commit();
const stream = EventStream.for<Account>(Account, account.id);

await Promise.all([
this.accountSnapshotRepository.save(account.id, account),
this.eventStore.appendEvents(stream, account.version, events),
]);
await this.eventStore.appendEvents(stream, account.version, events);
await this.accountSnapshotRepository.save(account.id, account);
}
}
2 changes: 1 addition & 1 deletion example/src/domain/models/account.snapshot-repository.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import { Account, AccountId, AccountOwnerId } from './account.aggregate';

@Snapshot(Account, { name: 'account', interval: 5 })
export class AccountSnapshotRepository extends SnapshotRepository<Account> {
serialize({ id, ownerIds, balance, openedOn, closedOn }: Account) {
serialize({ id, ownerIds, balance, openedOn, closedOn }: Account): ISnapshot<Account> {
return {
id: id.value,
ownerIds: ownerIds.map(({ value }) => value),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,7 @@ export class AccountRepository {
const events = account.commit();
const stream = EventStream.for<Account>(Account, account.id);

await Promise.all([
this.accountSnapshotRepository.save(account.id, account, 'e2e'),
this.eventStore.appendEvents(stream, account.version, events, 'e2e'),
]);
await this.eventStore.appendEvents(stream, account.version, events, 'e2e');
await this.accountSnapshotRepository.save(account.id, account, 'e2e');
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import { Account, AccountId, AccountOwnerId } from './account.aggregate';

@Snapshot(Account, { name: 'account', interval: 5 })
export class AccountSnapshotRepository extends SnapshotRepository<Account> {
serialize({ id, ownerIds, balance, openedOn, closedOn }: Account) {
serialize({ id, ownerIds, balance, openedOn, closedOn }: Account): ISnapshot<Account> {
return {
id: id.value,
ownerIds: ownerIds.map(({ value }) => value),
Expand Down

0 comments on commit 5eeb986

Please sign in to comment.