Skip to content

Commit

Permalink
WIP [ci skip]
Browse files Browse the repository at this point in the history
  • Loading branch information
jmalloc committed Jul 15, 2024
1 parent cbd3d60 commit 2f807b3
Show file tree
Hide file tree
Showing 11 changed files with 929 additions and 407 deletions.
1 change: 0 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ require (

require (
github.com/dogmatiq/cosyne v0.2.0 // indirect
github.com/dogmatiq/dapper v0.5.3 // indirect
github.com/dogmatiq/dyad v1.0.0 // indirect
github.com/dogmatiq/iago v0.4.0 // indirect
github.com/dogmatiq/interopspec v0.5.3 // indirect
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,8 @@ github.com/dogmatiq/configkit v0.13.0 h1:pV2Pz0iBUBnRfOm6tbWVRXvuh2bWHBScOh8KfVp
github.com/dogmatiq/configkit v0.13.0/go.mod h1:9Sx3e0G9o/wPvRfhpKcS7+3bhYHmOyRBqKdXRZdDx7M=
github.com/dogmatiq/cosyne v0.2.0 h1:tO957BpS4I9kqSw31ds6Ef4CXvV8zPAqWzbXKElsGWg=
github.com/dogmatiq/cosyne v0.2.0/go.mod h1:dD8EZjbRX7FFw9t6P7l1nwoZbA7YxtOCfl9ZZAHPucU=
github.com/dogmatiq/dapper v0.5.3 h1:DZkitO0TiokaiZt+9J7UNnagW2ezSYmJUlDTXLWGf8g=
github.com/dogmatiq/dapper v0.5.3/go.mod h1:lrBXvNri2wXkk1T0muaTUqd5lVDwIBRKeOzVRU46XI0=
github.com/dogmatiq/dapper v0.5.2 h1:/pjwTEa/tLosrxuahsGa/LzOcbpnXZE+sQNv8YSr7ZI=
github.com/dogmatiq/dapper v0.5.2/go.mod h1:lrBXvNri2wXkk1T0muaTUqd5lVDwIBRKeOzVRU46XI0=
github.com/dogmatiq/discoverkit v0.1.2 h1:NFgFe151bINH3/mNrIS6w0fiEWToSVwIHrjCDiEHw/Y=
github.com/dogmatiq/discoverkit v0.1.2/go.mod h1:mUFlbosF4i5papOkUa+OfTLv09AU/1cAU7GvN0Qd+VI=
github.com/dogmatiq/dogma v0.13.0 h1:MKk9MHErGKD53Y+43I4fcoPZMQjX0N2DUZEc4rLp+Hk=
Expand Down
55 changes: 24 additions & 31 deletions internal/eventstream/append.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ func (w *worker) handleAppend(
panic("received append request with no events")
}

// Reset the idle timer _after_ whatever work is done so it's duration is
// not "eaten up" by the work.
defer w.resetIdleTimer()

if req.LowestPossibleOffset > w.nextOffset {
Expand All @@ -78,9 +80,9 @@ func (w *worker) handleAppend(
return res, nil
}

res, err = w.writeEventsToJournal(ctx, req)
pos, rec, err := w.writeEventsToJournal(ctx, req)
if err == nil {
w.publishEvents(res.BeginOffset, req.Events)
w.publishEvents(pos, rec)
return res, nil
}

Expand Down Expand Up @@ -171,41 +173,32 @@ func (w *worker) findPriorAppend(
func (w *worker) writeEventsToJournal(
ctx context.Context,
req AppendRequest,
) (AppendResponse, error) {
before := w.nextOffset
after := w.nextOffset + Offset(len(req.Events))

if err := w.Journal.Append(
ctx,
w.nextPos,
eventstreamjournal.
NewRecordBuilder().
WithStreamOffsetBefore(uint64(before)).
WithStreamOffsetAfter(uint64(after)).
WithEventsAppended(&eventstreamjournal.EventsAppended{
Events: req.Events,
}).
Build(),
); err != nil {
return AppendResponse{}, err
) (journal.Position, *eventstreamjournal.Record, error) {
pos := w.nextPos
rec := eventstreamjournal.
NewRecordBuilder().
WithStreamOffsetBefore(uint64(w.nextOffset)).
WithStreamOffsetAfter(uint64(w.nextOffset) + uint64(len(req.Events))).
WithEventsAppended(&eventstreamjournal.EventsAppended{Events: req.Events}).
Build()

if err := w.Journal.Append(ctx, pos, rec); err != nil {
return 0, nil, err
}

for index, event := range req.Events {
w.nextPos++

for _, event := range req.Events {
w.Logger.Info(
"appended event to the stream",
slog.Uint64("journal_position", uint64(w.nextPos)),
slog.Uint64("stream_offset", uint64(before)+uint64(index)),
"event appended to stream",
slog.Uint64("journal_position", uint64(pos)),
slog.Uint64("stream_offset", uint64(w.nextOffset)),
slog.String("message_id", event.MessageId.AsString()),
slog.String("description", event.Description),
)
}

w.nextPos++
w.nextOffset = after
w.nextOffset++
}

return AppendResponse{
BeginOffset: before,
EndOffset: after,
AppendedByPriorAttempt: false,
}, nil
return pos, rec, nil
}
8 changes: 4 additions & 4 deletions internal/eventstream/append_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ func TestAppend(t *testing.T) {
Desc: "optimistic concurrency conflict",
InduceFailure: func(ctx context.Context, t *testing.T, deps *dependencies) {
go func() {
if _, err := deps.Supervisor.AppendQueue.Do(
if _, err := deps.Supervisor.Append.Do(
ctx,
AppendRequest{
StreamID: streamID,
Expand All @@ -130,7 +130,7 @@ func TestAppend(t *testing.T) {
UntilStopped().
Stop()

if _, err := s.AppendQueue.Do(
if _, err := s.Append.Do(
ctx,
AppendRequest{
StreamID: streamID,
Expand Down Expand Up @@ -162,7 +162,7 @@ func TestAppend(t *testing.T) {
RunInBackground(t, "event-seeding-supervisor", deps.Supervisor.Run).
UntilStopped()

res, err := deps.Supervisor.AppendQueue.Do(
res, err := deps.Supervisor.Append.Do(
tctx,
AppendRequest{
StreamID: streamID,
Expand Down Expand Up @@ -220,7 +220,7 @@ func TestAppend(t *testing.T) {
t.Logf("append an event, attempt #%d", attempt)
attempt++

_, err := deps.Supervisor.AppendQueue.Do(tctx, req)
_, err := deps.Supervisor.Append.Do(tctx, req)
if err == nil {
break
}
Expand Down
25 changes: 17 additions & 8 deletions internal/eventstream/idle.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,10 @@ const (
// catchUpTimeout is the amount of time a worker WITH SUBSCRIBERS will wait
// after appending events before "catching up" with any journal records that
// have been appended by other nodes.
catchUpTimeout = 10 * time.Second
catchUpTimeout = 1 * time.Millisecond
)

// resetIdleTimer starts or resets the idle timer.
// resetIdleTimer (re)starts the idle timer.
func (w *worker) resetIdleTimer() {
timeout := shutdownTimeout
if len(w.subscribers) > 0 {
Expand All @@ -26,12 +26,14 @@ func (w *worker) resetIdleTimer() {

if w.idleTimer == nil {
w.idleTimer = time.NewTimer(timeout)
} else {
if !w.idleTimer.Stop() {
<-w.idleTimer.C
}
w.idleTimer.Reset(timeout)
return
}

if !w.idleTimer.Stop() {
<-w.idleTimer.C
}

w.idleTimer.Reset(timeout)
}

// handleIdle is called when the worker has not appended any new events for some
Expand All @@ -43,13 +45,20 @@ func (w *worker) resetIdleTimer() {
func (w *worker) handleIdle(ctx context.Context) (bool, error) {
if len(w.subscribers) == 0 {
w.Logger.Debug(
"event stream worker stopped due to inactivity",
"event stream worker is idle, shutting down",
slog.Uint64("next_journal_position", uint64(w.nextPos)),
slog.Uint64("next_stream_offset", uint64(w.nextOffset)),
)
return false, nil
}

w.Logger.Debug(
"event stream worker is idle with subscribers, polling journal",
slog.Uint64("next_journal_position", uint64(w.nextPos)),
slog.Uint64("next_stream_offset", uint64(w.nextOffset)),
slog.Int("subscriber_count", len(w.subscribers)),
)

if err := w.catchUpWithJournal(ctx); err != nil {
return false, err
}
Expand Down
Loading

0 comments on commit 2f807b3

Please sign in to comment.