-
-
Notifications
You must be signed in to change notification settings - Fork 19
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[MongoDB Event Store] Projection Registration & Helper for single stream projections #127
base: main
Are you sure you want to change the base?
Conversation
src/packages/emmett-mongodb/src/eventStore/mongoDBEventStore.ts
Outdated
Show resolved
Hide resolved
src/packages/emmett-mongodb/src/eventStore/mongoDBEventStore.ts
Outdated
Show resolved
Hide resolved
projections?: Array<Projection<EventType>>, | ||
) { | ||
return Promise.all((projections ?? []).map((project) => project(params))); | ||
async function handleProjections< |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
FIX: So we have here two types of projections:
- Projections to the same document (let's call them "inline").
- Projections to external collection.
For the first type of projections, it'd be better to use $set
and store them in the same update as appending events. Then you have the certainty that you won't have race condition and change will be atomic.
For the second part, what you did here should be fine, of course keeping in mind, that process may die in between and those other projections won't be updated, and that we need to apply retries, as mentioned in the first PR.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I had the same thought for inline projections, but got a little stuck on aggregating the events that are being appended and getting the correct initial state. Should inline projections return the state to be added to the document? That way the appendToStream func has something to add to the update body or is there a better way to handle that?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think that there are multiple options to handle inline projections:
- return the updated projections object,
- return a Map of updated projections,
- return the updated syntax of MongoDB.
I think that we can start with what's the easiest to support. I would vote to do the following when the projections are registered:
- Read the projections subdocument (so it contains all inline projections).
- Run handle methods on top of it. The single inline projection registration would need to contain key of the projection in the projection subdocument. This key could be optional and fall back to "default" if not provided, as my guess is that most of the people would just have one document.
- Update it in the same change using
$set
operator.
@alex-laycalvert, thoughts?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That makes sense, I think the name
that we register the projection with should suffice as a key, stored in details.name
on the projection. We could also add a separate key property.
I'll play around with this
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@alex-laycalvert Sounds good. 👌
It would be also good to double check if our structure is good for indexing strategies. I'm not sure what plays well in Mongo 👍
src/packages/emmett-mongodb/src/eventStore/mongoDBEventStore.ts
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@alex-laycalvert thanks for sending the PR! Could you check my comments? :)
@alex-laycalvert thank you for applying suggestions. I’ll go through them tomorrow 👌 |
src/packages/emmett-mongodb/src/eventStore/mongoDBEventStore.e2e.spec.ts
Outdated
Show resolved
Hide resolved
src/packages/emmett-mongodb/src/eventStore/mongoDBEventStore.ts
Outdated
Show resolved
Hide resolved
} | ||
export type EventStreamEvent<EventType extends Event = Event> = | ||
EventStream<EventType>['events'][number]; | ||
type MongoDBInlineProjection< |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
CONCERN: @alex-laycalvert is there a reason why you not extend the ProjectionDefinition from the regular Emmett package? See
export interface TypedProjectionDefinition< |
And how PostgreSQL projection does it:
export type PostgreSQLProjectionDefinition< |
Also, best if projection definition is distinct from the registration (inline, async) so someone could change whether it's run inline or async without changing the projection definition.
src/packages/emmett-mongodb/src/eventStore/mongoDBEventStore.ts
Outdated
Show resolved
Hide resolved
src/packages/emmett-mongodb/src/eventStore/mongoDBEventStore.ts
Outdated
Show resolved
Hide resolved
src/packages/emmett-mongodb/src/eventStore/mongoDBEventStore.ts
Outdated
Show resolved
Hide resolved
name: options.name, | ||
canHandle: options.canHandle, | ||
handle: async (events, { streamName, collection }) => { | ||
const stream = await collection.findOne({ streamName }); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should not be loading anything additionally in inline projection, as we already should have already this data in the stream state.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Think I might need your help on this for inline projections. I can't seem to figure out how we can have the projection handler be passed the stream state if we run the inline projections before the update so that we can have a single update instead of multiple when appending to the stream.
It feels like there's kind of a dependency circle between the handler needing to know the initial state, either current short projection on doc (requires a read), or the inline projections would be passed all events on the stream and not just the ones about to append, but that would need to be done after the stream update meaining a second update required for the projection handler if that makes sense.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'll adjust the type definition in #132. I think that the best way will be to return inline projections together with the event stream and allow the reading result to be passed to append as it was (e.g. { expectedStreamState: ReadResult }
then we can read that from read result and assume that's the current projection state. Together with optimistic concurrency and atomic update, it should be sufficient.
For that, I'll need to adjust the command handler and event store definition to allow the extended event stream result to be taken (specific for event store type).
@alex-laycalvert, I added some follow up comments/concerns. If they're unclear feel free to ping me here, on Discord. I'm also happy to add commits on my own to apply them if that would be easier for you. |
@alex-laycalvert thank you for the changes; I'll work in the next few days on the projections stuff, after I adjust the event store and command handler types definition in #132 👍 |
appendToStream
callshortInfoProjection
as a helper to create projections on event streams for a read modelBigInt
.The
streamPosition
is stored as astring
value since MongoDB doesn't directly support storingBigInt
objects from JS.