Skip to content

Commit

Permalink
WIP [ci skip]
Browse files Browse the repository at this point in the history
  • Loading branch information
jmalloc committed Apr 10, 2024
1 parent 386c22a commit f2485ab
Show file tree
Hide file tree
Showing 12 changed files with 901 additions and 382 deletions.
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ go 1.22
require (
github.com/cespare/xxhash/v2 v2.3.0
github.com/dogmatiq/configkit v0.13.0
github.com/dogmatiq/dapper v0.5.3
github.com/dogmatiq/discoverkit v0.1.2
github.com/dogmatiq/dogma v0.13.0
github.com/dogmatiq/enginekit v0.10.2
Expand All @@ -27,10 +28,10 @@ require (

require (
github.com/dogmatiq/cosyne v0.2.0 // indirect
github.com/dogmatiq/dapper v0.5.3 // indirect
github.com/dogmatiq/dyad v1.0.0 // indirect
github.com/dogmatiq/iago v0.4.0 // indirect
github.com/dogmatiq/interopspec v0.5.3 // indirect
github.com/dogmatiq/jumble v0.1.0 // indirect
github.com/dogmatiq/linger v1.1.0 // indirect
github.com/dogmatiq/projectionkit v0.6.5 // indirect
github.com/fxamacker/cbor/v2 v2.4.0 // indirect
Expand Down
74 changes: 0 additions & 74 deletions go.sum

Large diffs are not rendered by default.

51 changes: 21 additions & 30 deletions internal/eventstream/append.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,9 +78,9 @@ func (w *worker) handleAppend(
return res, nil
}

res, err = w.writeEventsToJournal(ctx, req)
pos, rec, err := w.writeEventsToJournal(ctx, req)
if err == nil {
w.publishEvents(res.BeginOffset, req.Events)
w.publishEvents(pos, rec)
return res, nil
}

Expand Down Expand Up @@ -171,41 +171,32 @@ func (w *worker) findPriorAppend(
func (w *worker) writeEventsToJournal(
ctx context.Context,
req AppendRequest,
) (AppendResponse, error) {
before := w.nextOffset
after := w.nextOffset + Offset(len(req.Events))

if err := w.Journal.Append(
ctx,
w.nextPos,
eventstreamjournal.
NewRecordBuilder().
WithStreamOffsetBefore(uint64(before)).
WithStreamOffsetAfter(uint64(after)).
WithEventsAppended(&eventstreamjournal.EventsAppended{
Events: req.Events,
}).
Build(),
); err != nil {
return AppendResponse{}, err
) (journal.Position, *eventstreamjournal.Record, error) {
pos := w.nextPos
rec := eventstreamjournal.
NewRecordBuilder().
WithStreamOffsetBefore(uint64(w.nextOffset)).
WithStreamOffsetAfter(uint64(w.nextOffset) + uint64(len(req.Events))).
WithEventsAppended(&eventstreamjournal.EventsAppended{Events: req.Events}).
Build()

if err := w.Journal.Append(ctx, pos, rec); err != nil {
return 0, nil, err
}

for index, event := range req.Events {
w.nextPos++

for _, event := range req.Events {
w.Logger.Info(
"appended event to the stream",
slog.Uint64("journal_position", uint64(w.nextPos)),
slog.Uint64("stream_offset", uint64(before)+uint64(index)),
slog.Uint64("journal_position", uint64(pos)),
slog.Uint64("stream_offset", uint64(w.nextOffset)),
slog.String("message_id", event.MessageId.AsString()),
slog.String("description", event.Description),
)
}

w.nextPos++
w.nextOffset = after
w.nextOffset++
}

return AppendResponse{
BeginOffset: before,
EndOffset: after,
AppendedByPriorAttempt: false,
}, nil
return pos, rec, nil
}
Loading

0 comments on commit f2485ab

Please sign in to comment.