Skip to content

Commit

Permalink
WIP [ci skip]
Browse files Browse the repository at this point in the history
  • Loading branch information
jmalloc committed Apr 10, 2024
1 parent 6eacb83 commit eef9e90
Show file tree
Hide file tree
Showing 12 changed files with 904 additions and 309 deletions.
5 changes: 3 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ go 1.22
require (
github.com/cespare/xxhash/v2 v2.3.0
github.com/dogmatiq/configkit v0.13.0
github.com/dogmatiq/dapper v0.5.3
github.com/dogmatiq/discoverkit v0.1.2
github.com/dogmatiq/dogma v0.13.0
github.com/dogmatiq/enginekit v0.10.2
Expand All @@ -13,7 +14,7 @@ require (
github.com/dogmatiq/marshalkit v0.7.3
github.com/dogmatiq/persistencekit v0.9.3
github.com/dogmatiq/primo v0.2.0
github.com/dogmatiq/spruce v0.1.0
github.com/dogmatiq/spruce v0.1.1
github.com/google/go-cmp v0.6.0
go.opentelemetry.io/otel v1.25.0
go.opentelemetry.io/otel/metric v1.25.0
Expand All @@ -27,10 +28,10 @@ require (

require (
github.com/dogmatiq/cosyne v0.2.0 // indirect
github.com/dogmatiq/dapper v0.5.3 // indirect
github.com/dogmatiq/dyad v1.0.0 // indirect
github.com/dogmatiq/iago v0.4.0 // indirect
github.com/dogmatiq/interopspec v0.5.3 // indirect
github.com/dogmatiq/jumble v0.1.0 // indirect
github.com/dogmatiq/linger v1.1.0 // indirect
github.com/dogmatiq/projectionkit v0.6.5 // indirect
github.com/fxamacker/cbor/v2 v2.4.0 // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,8 @@ github.com/dogmatiq/projectionkit v0.6.5 h1:3Ues+QL5oVtJcx4WogMA6XjJF1QyOlcx1uRm
github.com/dogmatiq/projectionkit v0.6.5/go.mod h1:FfbWIzePx6RDAl0yl/FZ/9UaGq6wkEDKvfeDi3dg4EE=
github.com/dogmatiq/spruce v0.1.0 h1:xIcWPJA33Et+qIC1RORjP+8gSNtErdcTr0eEbtFk2oU=
github.com/dogmatiq/spruce v0.1.0/go.mod h1:0+zqOtlidouuzQr2k2Od7RRFR7Sk4p373kVyTtH6ovw=
github.com/dogmatiq/spruce v0.1.1 h1:n4Jbv6MAZvcjowIR21b/4G9opiTKfp3m3+e3W3PTjbM=
github.com/dogmatiq/spruce v0.1.1/go.mod h1:0+zqOtlidouuzQr2k2Od7RRFR7Sk4p373kVyTtH6ovw=
github.com/dogmatiq/sqltest v0.3.0 h1:DCwyLWfVk/ZHsqq5Itq3H/Lqsh/CIQ6nIRwI4YLywFc=
github.com/dogmatiq/sqltest v0.3.0/go.mod h1:a8Da8NhU4m3lq5Sybhiv+ZQowSnGHWTIJHFNInVtffg=
github.com/dogmatiq/testkit v0.13.11 h1:ikXg/Cxq58tzHL27JKCkVqUUElJCHcso7N/ymd3Wins=
Expand Down
51 changes: 21 additions & 30 deletions internal/eventstream/append.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,9 +78,9 @@ func (w *worker) handleAppend(
return res, nil
}

res, err = w.writeEventsToJournal(ctx, req)
pos, rec, err := w.writeEventsToJournal(ctx, req)
if err == nil {
w.publishEvents(res.BeginOffset, req.Events)
w.publishEvents(pos, rec)
return res, nil
}

Expand Down Expand Up @@ -171,41 +171,32 @@ func (w *worker) findPriorAppend(
func (w *worker) writeEventsToJournal(
ctx context.Context,
req AppendRequest,
) (AppendResponse, error) {
before := w.nextOffset
after := w.nextOffset + Offset(len(req.Events))

if err := w.Journal.Append(
ctx,
w.nextPos,
eventstreamjournal.
NewRecordBuilder().
WithStreamOffsetBefore(uint64(before)).
WithStreamOffsetAfter(uint64(after)).
WithEventsAppended(&eventstreamjournal.EventsAppended{
Events: req.Events,
}).
Build(),
); err != nil {
return AppendResponse{}, err
) (journal.Position, *eventstreamjournal.Record, error) {
pos := w.nextPos
rec := eventstreamjournal.
NewRecordBuilder().
WithStreamOffsetBefore(uint64(w.nextOffset)).
WithStreamOffsetAfter(uint64(w.nextOffset) + uint64(len(req.Events))).
WithEventsAppended(&eventstreamjournal.EventsAppended{Events: req.Events}).
Build()

if err := w.Journal.Append(ctx, pos, rec); err != nil {
return 0, nil, err
}

for index, event := range req.Events {
w.nextPos++

for _, event := range req.Events {
w.Logger.Info(
"appended event to the stream",
slog.Uint64("journal_position", uint64(w.nextPos)),
slog.Uint64("stream_offset", uint64(before)+uint64(index)),
slog.Uint64("journal_position", uint64(pos)),
slog.Uint64("stream_offset", uint64(w.nextOffset)),
slog.String("message_id", event.MessageId.AsString()),
slog.String("description", event.Description),
)
}

w.nextPos++
w.nextOffset = after
w.nextOffset++
}

return AppendResponse{
BeginOffset: before,
EndOffset: after,
AppendedByPriorAttempt: false,
}, nil
return pos, rec, nil
}
Loading

0 comments on commit eef9e90

Please sign in to comment.