From 2f807b3a5018d6d2779bea4f1b8a5ed0c09d5250 Mon Sep 17 00:00:00 2001 From: James Harris Date: Tue, 16 Jul 2024 07:52:51 +1000 Subject: [PATCH] WIP [ci skip] --- go.mod | 1 - go.sum | 4 +- internal/eventstream/append.go | 55 ++-- internal/eventstream/append_test.go | 8 +- internal/eventstream/idle.go | 25 +- internal/eventstream/reader.go | 474 ++++++++++++---------------- internal/eventstream/reader_test.go | 390 +++++++++++++++++++++++ internal/eventstream/subscriber.go | 251 +++++++++++++++ internal/eventstream/supervisor.go | 20 +- internal/eventstream/worker.go | 44 ++- internal/messaging/request.go | 64 ---- 11 files changed, 929 insertions(+), 407 deletions(-) create mode 100644 internal/eventstream/reader_test.go create mode 100644 internal/eventstream/subscriber.go delete mode 100644 internal/messaging/request.go diff --git a/go.mod b/go.mod index 87901a36..df6c58a8 100644 --- a/go.mod +++ b/go.mod @@ -27,7 +27,6 @@ require ( require ( github.com/dogmatiq/cosyne v0.2.0 // indirect - github.com/dogmatiq/dapper v0.5.3 // indirect github.com/dogmatiq/dyad v1.0.0 // indirect github.com/dogmatiq/iago v0.4.0 // indirect github.com/dogmatiq/interopspec v0.5.3 // indirect diff --git a/go.sum b/go.sum index bf0534d5..63a0eb8d 100644 --- a/go.sum +++ b/go.sum @@ -46,8 +46,8 @@ github.com/dogmatiq/configkit v0.13.0 h1:pV2Pz0iBUBnRfOm6tbWVRXvuh2bWHBScOh8KfVp github.com/dogmatiq/configkit v0.13.0/go.mod h1:9Sx3e0G9o/wPvRfhpKcS7+3bhYHmOyRBqKdXRZdDx7M= github.com/dogmatiq/cosyne v0.2.0 h1:tO957BpS4I9kqSw31ds6Ef4CXvV8zPAqWzbXKElsGWg= github.com/dogmatiq/cosyne v0.2.0/go.mod h1:dD8EZjbRX7FFw9t6P7l1nwoZbA7YxtOCfl9ZZAHPucU= -github.com/dogmatiq/dapper v0.5.3 h1:DZkitO0TiokaiZt+9J7UNnagW2ezSYmJUlDTXLWGf8g= -github.com/dogmatiq/dapper v0.5.3/go.mod h1:lrBXvNri2wXkk1T0muaTUqd5lVDwIBRKeOzVRU46XI0= +github.com/dogmatiq/dapper v0.5.2 h1:/pjwTEa/tLosrxuahsGa/LzOcbpnXZE+sQNv8YSr7ZI= +github.com/dogmatiq/dapper v0.5.2/go.mod h1:lrBXvNri2wXkk1T0muaTUqd5lVDwIBRKeOzVRU46XI0= github.com/dogmatiq/discoverkit v0.1.2 h1:NFgFe151bINH3/mNrIS6w0fiEWToSVwIHrjCDiEHw/Y= github.com/dogmatiq/discoverkit v0.1.2/go.mod h1:mUFlbosF4i5papOkUa+OfTLv09AU/1cAU7GvN0Qd+VI= github.com/dogmatiq/dogma v0.13.0 h1:MKk9MHErGKD53Y+43I4fcoPZMQjX0N2DUZEc4rLp+Hk= diff --git a/internal/eventstream/append.go b/internal/eventstream/append.go index 045b7b8c..c329fb25 100644 --- a/internal/eventstream/append.go +++ b/internal/eventstream/append.go @@ -52,6 +52,8 @@ func (w *worker) handleAppend( panic("received append request with no events") } + // Reset the idle timer _after_ whatever work is done so it's duration is + // not "eaten up" by the work. defer w.resetIdleTimer() if req.LowestPossibleOffset > w.nextOffset { @@ -78,9 +80,9 @@ func (w *worker) handleAppend( return res, nil } - res, err = w.writeEventsToJournal(ctx, req) + pos, rec, err := w.writeEventsToJournal(ctx, req) if err == nil { - w.publishEvents(res.BeginOffset, req.Events) + w.publishEvents(pos, rec) return res, nil } @@ -171,41 +173,32 @@ func (w *worker) findPriorAppend( func (w *worker) writeEventsToJournal( ctx context.Context, req AppendRequest, -) (AppendResponse, error) { - before := w.nextOffset - after := w.nextOffset + Offset(len(req.Events)) - - if err := w.Journal.Append( - ctx, - w.nextPos, - eventstreamjournal. - NewRecordBuilder(). - WithStreamOffsetBefore(uint64(before)). - WithStreamOffsetAfter(uint64(after)). - WithEventsAppended(&eventstreamjournal.EventsAppended{ - Events: req.Events, - }). 
- Build(), - ); err != nil { - return AppendResponse{}, err +) (journal.Position, *eventstreamjournal.Record, error) { + pos := w.nextPos + rec := eventstreamjournal. + NewRecordBuilder(). + WithStreamOffsetBefore(uint64(w.nextOffset)). + WithStreamOffsetAfter(uint64(w.nextOffset) + uint64(len(req.Events))). + WithEventsAppended(&eventstreamjournal.EventsAppended{Events: req.Events}). + Build() + + if err := w.Journal.Append(ctx, pos, rec); err != nil { + return 0, nil, err } - for index, event := range req.Events { + w.nextPos++ + + for _, event := range req.Events { w.Logger.Info( - "appended event to the stream", - slog.Uint64("journal_position", uint64(w.nextPos)), - slog.Uint64("stream_offset", uint64(before)+uint64(index)), + "event appended to stream", + slog.Uint64("journal_position", uint64(pos)), + slog.Uint64("stream_offset", uint64(w.nextOffset)), slog.String("message_id", event.MessageId.AsString()), slog.String("description", event.Description), ) - } - w.nextPos++ - w.nextOffset = after + w.nextOffset++ + } - return AppendResponse{ - BeginOffset: before, - EndOffset: after, - AppendedByPriorAttempt: false, - }, nil + return pos, rec, nil } diff --git a/internal/eventstream/append_test.go b/internal/eventstream/append_test.go index 0e19c28e..2aa00ef0 100644 --- a/internal/eventstream/append_test.go +++ b/internal/eventstream/append_test.go @@ -105,7 +105,7 @@ func TestAppend(t *testing.T) { Desc: "optimistic concurrency conflict", InduceFailure: func(ctx context.Context, t *testing.T, deps *dependencies) { go func() { - if _, err := deps.Supervisor.AppendQueue.Do( + if _, err := deps.Supervisor.Append.Do( ctx, AppendRequest{ StreamID: streamID, @@ -130,7 +130,7 @@ func TestAppend(t *testing.T) { UntilStopped(). Stop() - if _, err := s.AppendQueue.Do( + if _, err := s.Append.Do( ctx, AppendRequest{ StreamID: streamID, @@ -162,7 +162,7 @@ func TestAppend(t *testing.T) { RunInBackground(t, "event-seeding-supervisor", deps.Supervisor.Run). UntilStopped() - res, err := deps.Supervisor.AppendQueue.Do( + res, err := deps.Supervisor.Append.Do( tctx, AppendRequest{ StreamID: streamID, @@ -220,7 +220,7 @@ func TestAppend(t *testing.T) { t.Logf("append an event, attempt #%d", attempt) attempt++ - _, err := deps.Supervisor.AppendQueue.Do(tctx, req) + _, err := deps.Supervisor.Append.Do(tctx, req) if err == nil { break } diff --git a/internal/eventstream/idle.go b/internal/eventstream/idle.go index 49e20550..d82fe4b5 100644 --- a/internal/eventstream/idle.go +++ b/internal/eventstream/idle.go @@ -14,10 +14,10 @@ const ( // catchUpTimeout is the amount of time a worker WITH SUBSCRIBERS will wait // after appending events before "catching up" with any journal records that // have been appended by other nodes. - catchUpTimeout = 10 * time.Second + catchUpTimeout = 1 * time.Millisecond ) -// resetIdleTimer starts or resets the idle timer. +// resetIdleTimer (re)starts the idle timer. 
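+//
+// The timeout is catchUpTimeout while the worker has subscribers, after which
+// it polls the journal for records appended by other nodes; otherwise it is
+// shutdownTimeout, after which the worker stops due to inactivity.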
func (w *worker) resetIdleTimer() { timeout := shutdownTimeout if len(w.subscribers) > 0 { @@ -26,12 +26,14 @@ func (w *worker) resetIdleTimer() { if w.idleTimer == nil { w.idleTimer = time.NewTimer(timeout) - } else { - if !w.idleTimer.Stop() { - <-w.idleTimer.C - } - w.idleTimer.Reset(timeout) + return } + + if !w.idleTimer.Stop() { + <-w.idleTimer.C + } + + w.idleTimer.Reset(timeout) } // handleIdle is called when the worker has not appended any new events for some @@ -43,13 +45,20 @@ func (w *worker) resetIdleTimer() { func (w *worker) handleIdle(ctx context.Context) (bool, error) { if len(w.subscribers) == 0 { w.Logger.Debug( - "event stream worker stopped due to inactivity", + "event stream worker is idle, shutting down", slog.Uint64("next_journal_position", uint64(w.nextPos)), slog.Uint64("next_stream_offset", uint64(w.nextOffset)), ) return false, nil } + w.Logger.Debug( + "event stream worker is idle with subscribers, polling journal", + slog.Uint64("next_journal_position", uint64(w.nextPos)), + slog.Uint64("next_stream_offset", uint64(w.nextOffset)), + slog.Int("subscriber_count", len(w.subscribers)), + ) + if err := w.catchUpWithJournal(ctx); err != nil { return false, err } diff --git a/internal/eventstream/reader.go b/internal/eventstream/reader.go index 17f3873a..cea58ee1 100644 --- a/internal/eventstream/reader.go +++ b/internal/eventstream/reader.go @@ -2,349 +2,295 @@ package eventstream import ( "context" + "errors" "fmt" "log/slog" "time" - "github.com/dogmatiq/enginekit/protobuf/envelopepb" "github.com/dogmatiq/enginekit/protobuf/uuidpb" "github.com/dogmatiq/persistencekit/journal" "github.com/dogmatiq/veracity/internal/eventstream/internal/eventstreamjournal" "github.com/dogmatiq/veracity/internal/messaging" - "github.com/dogmatiq/veracity/internal/signaling" ) -// A Subscriber is sent events from a stream, by way of a [Reader]. -type Subscriber struct { - // StreamID is the ID of the stream from which events are read. - StreamID *uuidpb.UUID - - // Offset is the offset of the next event to read. - // - // It must not be read or modified while the subscription is active. It is - // incremented as events are sent to the subscriber. - Offset Offset - - // Filter is a predicate function that returns true if the subscriber should - // receive the event in the given envelope. - // - // It is used to avoid filling the subscriber's channel with events they are - // not interested in. It is called by the event stream worker in its own - // goroutine, and hence must not block. - Filter func(*envelopepb.Envelope) bool - - // Events is the channel to which the subscriber's events are sent. - Events chan<- Event - - canceled signaling.Event -} +// defaultSubscribeTimeout is the default maximum time to wait for a +// subscription to be acknowledged by the worker before reverting to reading +// from the journal. +const defaultSubscribeTimeout = 3 * time.Second // A Reader reads ordered events from a stream. type Reader struct { Journals journal.BinaryStore - SubscribeQueue *messaging.RequestQueue[*Subscriber] - UnsubscribeQueue *messaging.RequestQueue[*Subscriber] + SubscribeQueue *messaging.ExchangeQueue[*Subscriber, messaging.None] + UnsubscribeQueue *messaging.ExchangeQueue[*Subscriber, messaging.None] + SubscribeTimeout time.Duration + Logger *slog.Logger } -// Read reads events from a stream and sends them to the given subscriber. -// -// It starts by reading events directly from the stream's journal records. 
Once -// it has "caught up" to the end of the journal it receives events in -// "real-time" from the supervisor of that stream. +// Read sends events from a stream to the given subscriber's events channel. // -// If the subscriber's channel becomes full, it reverts to reading from the -// journal until it catches up again. +// It first attempts to "sync" with the local worker to receive contemporary +// events in "real-time". If the subscriber's requested offset is too old to be +// obtained from the worker, or if the events channel becomes full, the reader +// obtains events directly from the journal until the last record is reached, +// then the process repeats. func (r *Reader) Read(ctx context.Context, sub *Subscriber) error { + if sub.id == nil { + sub.id = uuidpb.Generate() + } + for { - if err := r.readHistorical(ctx, sub); err != nil { + if err := r.readContemporary(ctx, sub); err != nil { return err } - if err := r.readContemporary(ctx, sub); err != nil { + if err := r.readHistorical(ctx, sub); err != nil { return err } } } -func (r *Reader) readHistorical(ctx context.Context, sub *Subscriber) error { - j, err := eventstreamjournal.Open(ctx, r.Journals, sub.StreamID) - if err != nil { - return err - } - defer j.Close() - - searchBegin, searchEnd, err := j.Bounds(ctx) - if err != nil { - return err - } - - return journal.RangeFromSearchResult( - ctx, - j, - searchBegin, searchEnd, - eventstreamjournal.SearchByOffset(uint64(sub.Offset)), - func( - ctx context.Context, - pos journal.Position, - rec *eventstreamjournal.Record, - ) (bool, error) { - begin := Offset(rec.StreamOffsetBefore) - end := Offset(rec.StreamOffsetAfter) - - if begin == end { - // no events in this record - return true, nil - } - - if sub.Offset < begin || sub.Offset >= end { - return false, fmt.Errorf( - "event stream integrity error at journal position %d: expected event at offset %d, but found offset range [%d, %d)", - pos, - sub.Offset, - begin, - end, - ) - } - - index := sub.Offset - begin - - for _, env := range rec.GetEventsAppended().Events[index:] { - if !sub.Filter(env) { - sub.Offset++ - continue - } - - select { - case <-ctx.Done(): - return false, ctx.Err() - case sub.Events <- Event{sub.StreamID, sub.Offset, env}: - sub.Offset++ - } - } - - return true, nil - }, - ) -} - func (r *Reader) readContemporary(ctx context.Context, sub *Subscriber) error { // TODO: remote read + r.Logger.Debug( + "subscribing to receive contemporary events from local event stream worker", + slog.String("stream_id", sub.StreamID.AsString()), + slog.String("subscriber_id", sub.id.AsString()), + slog.Int("subscriber_headroom", cap(sub.Events)-len(sub.Events)), + slog.Uint64("subscriber_stream_offset", uint64(sub.Offset)), + ) + if err := r.subscribe(ctx, sub); err != nil { + // If the subscription request times out, but the parent context isn't + // canceled we revert to reading from the journal (by returning nil). 
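+ // Read() will then call readHistorical() before attempting to subscribe
+ // again, so the subscriber keeps receiving events in the meantime.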
+ if errors.Is(err, context.DeadlineExceeded) && ctx.Err() == nil { + r.Logger.Warn( + "timed-out waiting for local event stream worker to acknowledge subscription", + slog.String("stream_id", sub.StreamID.AsString()), + slog.String("subscriber_id", sub.id.AsString()), + slog.Int("subscriber_headroom", cap(sub.Events)-len(sub.Events)), + slog.Uint64("subscriber_stream_offset", uint64(sub.Offset)), + ) + return nil + } + return err } - defer r.unsubscribe(ctx, sub) select { case <-ctx.Done(): + r.unsubscribe(sub) return ctx.Err() case <-sub.canceled.Signaled(): + r.Logger.Debug( + "subscription canceled by local event stream worker", + slog.String("stream_id", sub.StreamID.AsString()), + slog.String("subscriber_id", sub.id.AsString()), + slog.Int("subscriber_headroom", cap(sub.Events)-len(sub.Events)), + slog.Uint64("subscriber_stream_offset", uint64(sub.Offset)), + ) return nil } } func (r *Reader) subscribe(ctx context.Context, sub *Subscriber) error { - ctx, cancel := context.WithTimeout(ctx, 10*time.Second) // TODO: make configurable - cancel() - - if err := r.SubscribeQueue.Do(ctx, sub); err != nil { - return fmt.Errorf("cannot subscribe to event stream: %w", err) + // Impose our own subscription timeout. This handles the case that the + // supervisor/worker is not running or cannot service our subscription + // request in a timely manner, in which case we will revert to reading from + // the journal. + timeout := r.SubscribeTimeout + if timeout <= 0 { + timeout = defaultSubscribeTimeout } - return nil -} - -func (r *Reader) unsubscribe(ctx context.Context, sub *Subscriber) error { - ctx, cancel := context.WithCancel(context.WithoutCancel(ctx)) + ctx, cancel := context.WithTimeout(ctx, timeout) defer cancel() - // Cancel the unsubscribe context when the subscription is canceled, - // regardless of the reason. - // - // This handles the situation where the subscription is canceled because the - // worker shutdown (and hence wont service the unsubscribe request). - go func() { - <-sub.canceled.Signaled() - cancel() - }() - - return r.UnsubscribeQueue.Do(ctx, sub) -} + req, done := r.SubscribeQueue.New(sub) -// handleSubscribe adds sub to the subscriber list. -// -// It delivers any cached events that the subscriber has not yet seen. If the -// subscriber's requested event is older than the events in the cache the -// subscription is canceled immediately. -func (w *worker) handleSubscribe(sub *Subscriber) { - if !sub.StreamID.Equal(w.StreamID) { - panic("received request for a different stream ID") + select { + case <-ctx.Done(): + return ctx.Err() + case r.SubscribeQueue.Send() <- req: } - if w.subscribers == nil { - w.subscribers = map[*Subscriber]struct{}{} - } - w.subscribers[sub] = struct{}{} - - w.Logger.Debug( - "subscription activated", - slog.String("channel_address", fmt.Sprint(sub.Events)), - slog.Int("channel_capacity", cap(sub.Events)), - slog.Int("channel_headroom", cap(sub.Events)-len(sub.Events)), - slog.Int("subscriber_count", len(w.subscribers)), - slog.Uint64("requested_stream_offset", uint64(sub.Offset)), - slog.Uint64("next_stream_offset", uint64(w.nextOffset)), - ) + // We don't want to use the context timeout waiting for the response, + // because then we wont know if the subscription was actually accepted. The + // worker ALWAYS sends a response to any subscription request it receives + // from the queue. 
+ res := <-done - if sub.Offset >= w.nextOffset { - return + if _, err := res.Get(); err != nil { + return fmt.Errorf("cannot subscribe to event stream: %w", err) } - index := w.findInCache(sub.Offset) - - if index == -1 { - sub.canceled.Signal() - w.Logger.Warn( - "subscription canceled immediately due request for historical events", - slog.String("channel_address", fmt.Sprint(sub.Events)), - slog.Int("channel_capacity", cap(sub.Events)), - slog.Int("channel_headroom", cap(sub.Events)-len(sub.Events)), - slog.Int("subscriber_count", len(w.subscribers)), - slog.Int("cached_event_count", len(w.recentEvents)), - slog.Uint64("requested_stream_offset", uint64(sub.Offset)), - slog.Uint64("next_stream_offset", uint64(w.nextOffset)), - ) - return - } + return nil +} - for _, event := range w.recentEvents[index:] { - if w.deliverEventToSubscriber(event, sub) == subscriptionCanceled { - return - } - } +func (r *Reader) unsubscribe(sub *Subscriber) { + // TODO: use a latch to indicate unsubscribing? + req, _ := r.UnsubscribeQueue.New(sub) + r.UnsubscribeQueue.Send() <- req + <-sub.canceled.Signaled() } -// handleUnsubscribe removes sub from the subscriber list. -func (w *worker) handleUnsubscribe(sub *Subscriber) { - if !sub.StreamID.Equal(w.StreamID) { - panic("received request for a different stream ID") +func (r *Reader) readHistorical(ctx context.Context, sub *Subscriber) error { + j, err := eventstreamjournal.Open(ctx, r.Journals, sub.StreamID) + if err != nil { + return fmt.Errorf("unable to open journal: %w", err) } + defer j.Close() - before := len(w.subscribers) - delete(w.subscribers, sub) - after := len(w.subscribers) + var records, delivered, filtered int - if before > after { - sub.canceled.Signal() + fn := func( + ctx context.Context, + pos journal.Position, + rec *eventstreamjournal.Record, + ) (bool, error) { + records++ - w.Logger.Debug( - "subscription canceled by subscriber", - slog.String("channel_address", fmt.Sprint(sub.Events)), - slog.Int("channel_capacity", cap(sub.Events)), - slog.Int("channel_headroom", cap(sub.Events)-len(sub.Events)), - slog.Int("subscriber_count", after), - ) - } -} + begin := Offset(rec.StreamOffsetBefore) + end := Offset(rec.StreamOffsetAfter) -// deliverResult is an enumeration of the possible outcomes of delivering an -// event to a subscriber. -type deliverResult int + if begin == end { + // This condition is not producible at the moment, but is present to + // provide forward compatibility with future journal record types. + sub.beginPos = pos + 1 + return true, nil + } -const ( - // eventDelivered means that the event was sent to the subscriber's events - // channel, which may or may not be buffered. - eventDelivered deliverResult = iota + if sub.Offset < begin || sub.Offset >= end { + return false, fmt.Errorf( + "event stream integrity error at journal position %d: expected event at offset %d, but found offset range [%d, %d)", + pos, + sub.Offset, + begin, + end, + ) + } - // eventFiltered means that the event was filtered by the subscriber's - // filter function, and did not need to be delivered. - eventFiltered + sub.beginPos = pos + sub.beginPosIsDefinitive = true - // subscriptionCanceled means that an attempt was made to send the event to - // the subscriber's event channel, but the channel buffer was full (or - // unbuffered and not ready to read), and so the subscription was canceled. 
- subscriptionCanceled -) + index := sub.Offset - begin + + for _, env := range rec.GetEventsAppended().Events[index:] { + if sub.Filter != nil && !sub.Filter(env) { + sub.Offset++ + filtered++ + continue + } -// deliverEventToSubscriber attempts to deliver an event to a subscriber's event -// channel. -func (w *worker) deliverEventToSubscriber(event Event, sub *Subscriber) deliverResult { - if event.Offset > sub.Offset { - panic("event is out of order") + select { + case <-ctx.Done(): + return false, ctx.Err() + case sub.Events <- Event{sub.StreamID, sub.Offset, env}: + sub.Offset++ + delivered++ + } + } + + sub.beginPos++ + + return true, nil } - if event.Offset < sub.Offset { - return eventFiltered + iter := r.searchHistorical + if sub.beginPosIsDefinitive { + iter = r.rangeHistorical } - if !sub.Filter(event.Envelope) { - sub.Offset++ - return eventFiltered + if err := iter(ctx, sub, j, fn); err != nil { + return err } - select { - case sub.Events <- event: - sub.Offset++ - return eventDelivered - - default: - delete(w.subscribers, sub) - sub.canceled.Signal() - - w.Logger.Warn( - "subscription canceled because the subscriber can not keep up with the event stream", - slog.String("channel_address", fmt.Sprint(sub.Events)), - slog.Int("channel_capacity", cap(sub.Events)), - slog.Int("channel_headroom", 0), - slog.Int("subscriber_count", len(w.subscribers)), - slog.Uint64("stream_offset", uint64(event.Offset)), - ) + r.Logger.Debug( + "finished reading historical events from journal", + slog.String("stream_id", sub.StreamID.AsString()), + slog.String("subscriber_id", sub.id.AsString()), + slog.Int("subscriber_headroom", cap(sub.Events)-len(sub.Events)), + slog.Uint64("subscriber_stream_offset", uint64(sub.Offset)), + slog.Int("journal_record_count", records), + slog.Int("events_delivered_count", delivered), + slog.Int("events_filtered_count", filtered), + ) - return subscriptionCanceled - } + return nil } -// publishEvents publishes the events to both the recent event cache and any -// interested subscribers. -func (w *worker) publishEvents( - offset Offset, - events []*envelopepb.Envelope, -) { - skip := w.growCache(len(events)) - - for i, env := range events { - event := Event{w.StreamID, offset, env} - offset++ +// rangeHistorical delivers all (relevent) events to the subscriber, starting +// with the events in the record at position sub.beginPos. +func (r *Reader) rangeHistorical( + ctx context.Context, + sub *Subscriber, + j journal.Journal[*eventstreamjournal.Record], + fn journal.RangeFunc[*eventstreamjournal.Record], +) error { + r.Logger.Debug( + "ranging over historical events in journal", + slog.String("stream_id", sub.StreamID.AsString()), + slog.String("subscriber_id", sub.id.AsString()), + slog.Int("subscriber_headroom", cap(sub.Events)-len(sub.Events)), + slog.Uint64("subscriber_stream_offset", uint64(sub.Offset)), + slog.Uint64("journal_begin_position", uint64(sub.beginPos)), + ) - if i >= skip { - w.appendEventToCache(event) + if err := j.Range(ctx, sub.beginPos, fn); err != nil { + if errors.Is(err, journal.ErrNotFound) { + // If we're ranging over a journal record that does not exist it + // means we've delivered all events but were unable to subscribe to + // receive contemporary events from the worker. 
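+ // Returning nil (rather than an error) lets Read() retry the
+ // subscription via readContemporary().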
+ return nil } + return fmt.Errorf("unable to range over journal: %w", err) + } - if len(w.subscribers) == 0 { - continue - } + return nil +} + +// searchHistorical performs a binary search to find the journal record that +// contains the next event to deliver to sub, then delivers that event an all +// subsequent events until the end of the journal. +func (r *Reader) searchHistorical( + ctx context.Context, + sub *Subscriber, + j journal.Journal[*eventstreamjournal.Record], + fn journal.RangeFunc[*eventstreamjournal.Record], +) error { + begin, end, err := j.Bounds(ctx) + if err != nil { + return fmt.Errorf("unable to read journal bounds: %w", err) + } - var delivered, filtered, canceled int + begin = max(begin, sub.beginPos) - for sub := range w.subscribers { - switch w.deliverEventToSubscriber(event, sub) { - case eventDelivered: - delivered++ - case eventFiltered: - filtered++ - case subscriptionCanceled: - canceled++ - } - } + r.Logger.Debug( + "searching for historical events in journal", + slog.String("stream_id", sub.StreamID.AsString()), + slog.String("subscriber_id", sub.id.AsString()), + slog.Int("subscriber_headroom", cap(sub.Events)-len(sub.Events)), + slog.Uint64("subscriber_stream_offset", uint64(sub.Offset)), + slog.Uint64("journal_begin_position", uint64(begin)), + slog.Uint64("journal_end_position", uint64(end)), + ) - w.Logger.Debug( - "event published to subscribers", - slog.Uint64("stream_offset", uint64(event.Offset)), - slog.String("message_id", env.MessageId.AsString()), - slog.String("description", env.Description), - slog.Int("delivered_count", delivered), - slog.Int("filtered_count", filtered), - slog.Int("canceled_count", canceled), - ) + if err := journal.RangeFromSearchResult( + ctx, + j, + begin, end, + eventstreamjournal.SearchByOffset(uint64(sub.Offset)), + fn, + ); err != nil { + if errors.Is(err, journal.ErrNotFound) { + // If the event is not in the journal then we don't want to + // re-search these same records in the future. + sub.beginPos = end + return nil + } + return fmt.Errorf("unable to search journal: %w", err) } + + return nil } diff --git a/internal/eventstream/reader_test.go b/internal/eventstream/reader_test.go new file mode 100644 index 00000000..02073926 --- /dev/null +++ b/internal/eventstream/reader_test.go @@ -0,0 +1,390 @@ +package eventstream_test + +import ( + "context" + "testing" + "time" + + . "github.com/dogmatiq/dogma/fixtures" + "github.com/dogmatiq/enginekit/protobuf/envelopepb" + "github.com/dogmatiq/enginekit/protobuf/identitypb" + "github.com/dogmatiq/enginekit/protobuf/uuidpb" + . "github.com/dogmatiq/marshalkit/fixtures" + "github.com/dogmatiq/persistencekit/driver/memory/memoryjournal" + "github.com/dogmatiq/spruce" + "github.com/dogmatiq/veracity/internal/envelope" + . 
"github.com/dogmatiq/veracity/internal/eventstream" + "github.com/dogmatiq/veracity/internal/test" +) + +func TestReader(t *testing.T) { + t.Parallel() + + type dependencies struct { + Journals *memoryjournal.BinaryStore + Supervisor *Supervisor + Packer *envelope.Packer + Reader *Reader + } + + setup := func(t test.TestingT) (deps dependencies) { + deps.Journals = &memoryjournal.BinaryStore{} + + logger := spruce.NewLogger(t) + + deps.Supervisor = &Supervisor{ + Journals: deps.Journals, + Logger: logger, + } + + deps.Packer = &envelope.Packer{ + Application: identitypb.New("", uuidpb.Generate()), + Marshaler: Marshaler, + } + + deps.Reader = &Reader{ + Journals: deps.Journals, + SubscribeQueue: &deps.Supervisor.SubscribeQueue, + UnsubscribeQueue: &deps.Supervisor.UnsubscribeQueue, + Logger: logger, + } + + return deps + } + + t.Run("it reads historical events from the journal", func(t *testing.T) { + t.Parallel() + + streamID := uuidpb.Generate() + tctx := test.WithContext(t) + deps := setup(tctx) + + supervisor := test. + RunInBackground(t, "supervisor", deps.Supervisor.Run). + UntilStopped() + + env1 := deps.Packer.Pack(MessageE1) + env2 := deps.Packer.Pack(MessageE2) + env3 := deps.Packer.Pack(MessageE3) + + // Journal record with a single event. + if _, err := deps.Supervisor.AppendQueue.Do( + tctx, + AppendRequest{ + StreamID: streamID, + Events: []*envelopepb.Envelope{ + env1, + }, + }, + ); err != nil { + t.Fatal(err) + } + + // Journal record with multiple events. + if _, err := deps.Supervisor.AppendQueue.Do( + tctx, + AppendRequest{ + StreamID: streamID, + Events: []*envelopepb.Envelope{ + env2, + env3, + }, + }, + ); err != nil { + t.Fatal(err) + } + + // Stop the supervisor now to verify that it's not necessary to read + // historical events. We set the timeout to the minimum value possible + // to reduce the test run-time. + supervisor.StopAndWait() + deps.Reader.SubscribeTimeout = 1 + + events := make(chan Event) + sub := &Subscriber{ + StreamID: streamID, + Events: events, + } + + test. + RunInBackground(t, "reader", func(ctx context.Context) error { + return deps.Reader.Read(ctx, sub) + }). + UntilTestEnds() + + for offset, env := range []*envelopepb.Envelope{env1, env2, env3} { + test.ExpectChannelToReceive( + tctx, + events, + Event{ + StreamID: streamID, + Offset: Offset(offset), + Envelope: env, + }, + ) + } + }) + + t.Run("it reads contemporary events via a supervisor subscription", func(t *testing.T) { + t.Parallel() + + streamID := uuidpb.Generate() + tctx := test.WithContext(t) + deps := setup(tctx) + + test. + RunInBackground(t, "supervisor", deps.Supervisor.Run). + UntilTestEnds() + + // Give the reader a different journal store so we know it isn't reading + // the getting events from the journal. + deps.Reader.Journals = &memoryjournal.BinaryStore{} + + events := make(chan Event, 100) + sub := &Subscriber{ + StreamID: streamID, + Events: events, + } + + test. + RunInBackground(t, "reader", func(ctx context.Context) error { + return deps.Reader.Read(ctx, sub) + }). + UntilTestEnds() + + env1 := deps.Packer.Pack(MessageE1) + env2 := deps.Packer.Pack(MessageE2) + env3 := deps.Packer.Pack(MessageE3) + + go func() { + // Journal record with a single event. + if _, err := deps.Supervisor.AppendQueue.Do( + tctx, + AppendRequest{ + StreamID: streamID, + Events: []*envelopepb.Envelope{ + env1, + }, + }, + ); err != nil { + t.Error(err) + return + } + + // Journal record with multiple events. 
+ if _, err := deps.Supervisor.AppendQueue.Do( + tctx, + AppendRequest{ + StreamID: streamID, + Events: []*envelopepb.Envelope{ + env2, + env3, + }, + }, + ); err != nil { + t.Error(err) + return + } + }() + + for offset, env := range []*envelopepb.Envelope{env1, env2, env3} { + test.ExpectChannelToReceive( + tctx, + events, + Event{ + StreamID: streamID, + Offset: Offset(offset), + Envelope: env, + }, + ) + } + }) + + t.Run("it reverts to the journal if it cannot keep up with the event stream", func(t *testing.T) { + t.Parallel() + + streamID := uuidpb.Generate() + tctx := test.WithContext(t) + deps := setup(tctx) + + test. + RunInBackground(t, "supervisor", deps.Supervisor.Run). + UntilTestEnds() + + events := make(chan Event) // note: unbuffered + sub := &Subscriber{ + StreamID: streamID, + Events: events, + } + + test. + RunInBackground(t, "reader", func(ctx context.Context) error { + return deps.Reader.Read(ctx, sub) + }). + UntilTestEnds() + + envelopes := []*envelopepb.Envelope{ + deps.Packer.Pack(MessageE1), + deps.Packer.Pack(MessageE2), + deps.Packer.Pack(MessageE3), + } + + go func() { + for _, env := range envelopes { + _, err := deps.Supervisor.AppendQueue.Do( + tctx, + AppendRequest{ + StreamID: streamID, + Events: []*envelopepb.Envelope{env}, + }, + ) + if err != nil { + t.Error(err) + return + } + } + }() + + for offset, env := range envelopes { + time.Sleep(500 * time.Microsecond) // make the consumer slow + test.ExpectChannelToReceive( + tctx, + events, + Event{ + StreamID: streamID, + Offset: Offset(offset), + Envelope: env, + }, + ) + } + }) + + t.Run("it does not duplicate or mis-order events when there are competing supervisors", func(t *testing.T) { + t.Parallel() + + streamID := uuidpb.Generate() + tctx := test.WithContext(t) + deps := setup(tctx) + + // emulate a supervisor running on another node + remoteSupervisor := &Supervisor{ + Journals: deps.Journals, + Logger: deps.Supervisor.Logger.With("supervisor", "remote"), + } + + deps.Supervisor.Logger = deps.Supervisor.Logger.With("supervisor", "local") + + test. + RunInBackground(t, "local-supervisor", deps.Supervisor.Run). + UntilTestEnds() + + test. + RunInBackground(t, "remote-supervisor", remoteSupervisor.Run). + UntilTestEnds() + + // use a small buffer, allowing it to revert to reading from the + // journal as an added sanity check. + events := make(chan Event, 5) + sub := &Subscriber{ + StreamID: streamID, + Events: events, + } + + test. + RunInBackground(t, "reader", func(ctx context.Context) error { + return deps.Reader.Read(ctx, sub) + }). + UntilTestEnds() + + const eventsPerSupervisor = 1 + + appendLoop := func(ctx context.Context, s *Supervisor, source string) error { + handlerID := uuidpb.Generate() + + for n := range eventsPerSupervisor { + env := deps.Packer.Pack( + MessageE{Value: n}, + envelope.WithHandler( + // Abuse the "source handler" field of the envelope to + // discriminate between events produced by our local and + // remote supervisors. + identitypb.New(source, handlerID), + ), + ) + + if _, err := s.AppendQueue.Do( + ctx, + AppendRequest{ + StreamID: streamID, + Events: []*envelopepb.Envelope{env}, + }, + ); err != nil { + t.Logf("failed to append event #%d: %s", n, err) + return err + } + } + + return nil + } + + test. + RunInBackground(t, "local-append-loop", func(ctx context.Context) error { + return appendLoop(ctx, deps.Supervisor, "local") + }). + BeforeTestEnds() + + test. 
+ RunInBackground(t, "remote-append-loop", func(ctx context.Context) error { + return appendLoop(ctx, remoteSupervisor, "remote") + }). + BeforeTestEnds() + + nextLocal := 0 + nextRemote := 0 + + for offset := Offset(0); offset < eventsPerSupervisor*2; offset++ { + t.Logf("waiting for event at offset %d", offset) + + select { + case <-tctx.Done(): + t.Fatal(tctx.Err()) + + case e := <-events: + if e.Offset != offset { + t.Fatalf("unexpected offset: got %d, want %d", e.Offset, offset) + } + + next := &nextLocal + if e.Envelope.SourceHandler.Name == "remote" { + next = &nextRemote + } + + m, err := deps.Packer.Unpack(e.Envelope) + if err != nil { + t.Fatalf("unable to unpack event: %s", err) + } + + got := m.(MessageE) + want := MessageE{Value: float64(*next)} + + test.Expect( + t, + "unexpected message from "+e.Envelope.SourceHandler.Name+" supervisor", + got, + want, + ) + + *next++ + t.Logf("received expected event %q from %q supervisor", e.Envelope.Description, e.Envelope.SourceHandler.Name) + } + } + + if nextLocal != eventsPerSupervisor { + t.Errorf("unexpected number of events from local supervisor: got %d, want %d", nextLocal, eventsPerSupervisor) + } + + if nextRemote != eventsPerSupervisor { + t.Errorf("unexpected number of events from remote supervisor: got %d, want %d", nextRemote, eventsPerSupervisor) + } + }) +} diff --git a/internal/eventstream/subscriber.go b/internal/eventstream/subscriber.go new file mode 100644 index 00000000..034111f3 --- /dev/null +++ b/internal/eventstream/subscriber.go @@ -0,0 +1,251 @@ +package eventstream + +import ( + "log/slog" + + "github.com/dogmatiq/enginekit/protobuf/envelopepb" + "github.com/dogmatiq/enginekit/protobuf/uuidpb" + "github.com/dogmatiq/persistencekit/journal" + "github.com/dogmatiq/veracity/internal/eventstream/internal/eventstreamjournal" + "github.com/dogmatiq/veracity/internal/signaling" +) + +// A Subscriber is sent events from a stream, by way of a [Reader]. +type Subscriber struct { + // StreamID is the ID of the stream from which events are read. + StreamID *uuidpb.UUID + + // Offset is the offset of the next event to read. + // + // It must not be read or modified while the subscription is active. It is + // incremented as events are sent to the subscriber. + Offset Offset + + // Filter is a predicate function that returns true if the subscriber should + // receive the event in the given envelope. + // + // It is used to avoid filling the subscriber's channel with events they are + // not interested in. It is called by the event stream worker in its own + // goroutine, and hence must not block. + Filter func(*envelopepb.Envelope) bool + + // Events is the channel to which the subscriber's events are sent. + Events chan<- Event + + // canceled indicates that the [Supervisor] has canceled the subscription. + canceled signaling.Event + + // id is a unique identifier for the subscriber. + id *uuidpb.UUID + + // beginPos is the journal position to begin ranging or search for the next + // event to deliver to the subscriber. + beginPos journal.Position + + // beginPosIsDefinitive is true if beginPos "definitive", meaning that it + // represents the exact position of the record containing the next event to + // deliver to the subscriber. + beginPosIsDefinitive bool +} + +// handleSubscribe adds sub to the subscriber list. +// +// It delivers any cached events that the subscriber has not yet seen. If the +// subscriber's requested event is older than the events in the cache the +// subscription is canceled immediately. 
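+//
+// When the subscription is canceled this way the [Reader] falls back to
+// reading historical events directly from the journal.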
+func (w *worker) handleSubscribe(sub *Subscriber) { + if !sub.StreamID.Equal(w.StreamID) { + panic("received request for a different stream ID") + } + + if w.subscribers == nil { + w.subscribers = map[*Subscriber]struct{}{} + } + w.subscribers[sub] = struct{}{} + defer w.resetIdleTimer() + + if sub.Offset >= w.nextOffset { + w.Logger.Debug( + "subscription activated, waiting for new events", + slog.Uint64("next_stream_offset", uint64(w.nextOffset)), + slog.Int("subscriber_count", len(w.subscribers)), + slog.String("subscriber_id", sub.id.AsString()), + slog.Int("subscriber_headroom", cap(sub.Events)-len(sub.Events)), + slog.Uint64("subscriber_stream_offset", uint64(sub.Offset)), + ) + return + } + + index := w.findInCache(sub.Offset) + + if index == -1 { + delete(w.subscribers, sub) + w.Logger.Warn( + "subscription not activated due to request for uncached historical events", + slog.Uint64("next_stream_offset", uint64(w.nextOffset)), + slog.Int("cached_event_count", len(w.recentEvents)), + slog.Int("subscriber_count", len(w.subscribers)), + slog.String("subscriber_id", sub.id.AsString()), + slog.Int("subscriber_headroom", cap(sub.Events)-len(sub.Events)), + slog.Uint64("subscriber_stream_offset", uint64(sub.Offset)), + ) + sub.canceled.Signal() + return + } + + w.Logger.Debug( + "subscription activated, delivering cached events", + slog.Uint64("next_stream_offset", uint64(w.nextOffset)), + slog.Int("subscriber_count", len(w.subscribers)), + slog.String("subscriber_id", sub.id.AsString()), + slog.Int("subscriber_headroom", cap(sub.Events)-len(sub.Events)), + slog.Uint64("subscriber_stream_offset", uint64(sub.Offset)), + ) + + for _, event := range w.recentEvents[index:] { + if w.deliverEventToSubscriber(event, sub) == subscriptionCanceled { + return + } + } +} + +// handleUnsubscribe removes sub from the subscriber list. +func (w *worker) handleUnsubscribe(sub *Subscriber) { + if !sub.StreamID.Equal(w.StreamID) { + panic("received request for a different stream ID") + } + + before := len(w.subscribers) + delete(w.subscribers, sub) + after := len(w.subscribers) + + if before > after { + w.Logger.Debug( + "subscription canceled by subscriber", + slog.Uint64("next_stream_offset", uint64(w.nextOffset)), + slog.Int("subscriber_count", after), + slog.String("subscriber_id", sub.id.AsString()), + slog.Int("subscriber_headroom", cap(sub.Events)-len(sub.Events)), + slog.Uint64("subscriber_stream_offset", uint64(sub.Offset)), + ) + sub.canceled.Signal() + w.resetIdleTimer() + } +} + +// deliverResult is an enumeration of the possible outcomes of delivering an +// event to a subscriber. +type deliverResult int + +const ( + // eventDelivered means that the event was sent to the subscriber's events + // channel, which may or may not be buffered. + eventDelivered deliverResult = iota + + // eventFiltered means that the event was filtered by the subscriber's + // filter function, and did not need to be delivered. + eventFiltered + + // subscriptionCanceled means that an attempt was made to send the event to + // the subscriber's event channel, but the channel buffer was full (or + // unbuffered and not ready to read), and so the subscription was canceled. + subscriptionCanceled +) + +// deliverEventToSubscriber attempts to deliver an event to a subscriber's event +// channel. 
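+//
+// It never blocks: if the channel cannot accept the event immediately the
+// subscription is canceled and the subscriber must catch up by reading from
+// the journal.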
+func (w *worker) deliverEventToSubscriber(event Event, sub *Subscriber) deliverResult { + if event.Offset > sub.Offset { + panic("event is out of order") + } + + if event.Offset < sub.Offset { + return eventFiltered + } + + if sub.Filter != nil && !sub.Filter(event.Envelope) { + sub.Offset++ + return eventFiltered + } + + select { + case sub.Events <- event: + sub.Offset++ + return eventDelivered + + default: + delete(w.subscribers, sub) + w.Logger.Warn( + "subscription canceled because the subscriber cannot keep up with the event stream", + slog.Int("subscriber_count", len(w.subscribers)), + slog.String("subscriber_id", sub.id.AsString()), + slog.Int("subscriber_headroom", cap(sub.Events)-len(sub.Events)), + slog.Uint64("subscriber_stream_offset", uint64(sub.Offset)), + ) + sub.canceled.Signal() + w.resetIdleTimer() + + return subscriptionCanceled + } +} + +// publishEvents publishes the events to both the recent event cache and any +// interested subscribers. +func (w *worker) publishEvents(pos journal.Position, rec *eventstreamjournal.Record) { + op := rec.GetEventsAppended() + if op == nil { + return + } + + skip := w.growCache(len(op.Events)) + + // Update the subscriber's position to refer to the record containing the + // events we're about to deliver. + for sub := range w.subscribers { + sub.beginPos = pos + sub.beginPosIsDefinitive = true + } + + offset := Offset(rec.StreamOffsetBefore) + for i, env := range op.Events { + event := Event{w.StreamID, offset, env} + offset++ + + if i >= skip { + w.appendEventToCache(event) + } + + if len(w.subscribers) == 0 { + continue + } + + var delivered, filtered, canceled int + + for sub := range w.subscribers { + switch w.deliverEventToSubscriber(event, sub) { + case eventDelivered: + delivered++ + case eventFiltered: + filtered++ + case subscriptionCanceled: + canceled++ + } + } + + w.Logger.Debug( + "event published to subscribers", + slog.Uint64("stream_offset", uint64(event.Offset)), + slog.String("message_id", env.MessageId.AsString()), + slog.String("description", env.Description), + slog.Int("subscriber_delivered_count", delivered), + slog.Int("subscriber_filtered_count", filtered), + slog.Int("subscriber_canceled_count", canceled), + ) + } + + // Any remaining (i.e. uncanceled) subscribers should now look for the + // following journal record. + for sub := range w.subscribers { + sub.beginPos++ + } +} diff --git a/internal/eventstream/supervisor.go b/internal/eventstream/supervisor.go index d514435f..22490fd2 100644 --- a/internal/eventstream/supervisor.go +++ b/internal/eventstream/supervisor.go @@ -21,8 +21,8 @@ var errShuttingDown = errors.New("event stream sub-system is shutting down") type Supervisor struct { Journals journal.BinaryStore AppendQueue messaging.ExchangeQueue[AppendRequest, AppendResponse] - SubscribeQueue messaging.RequestQueue[*Subscriber] - UnsubscribeQueue messaging.RequestQueue[*Subscriber] + SubscribeQueue messaging.ExchangeQueue[*Subscriber, messaging.None] + UnsubscribeQueue messaging.ExchangeQueue[*Subscriber, messaging.None] Logger *slog.Logger shutdown signaling.Latch @@ -93,29 +93,29 @@ func (s *Supervisor) appendState( // subscribeState forwards a subscribe request to the appropriate worker. 
func (s *Supervisor) subscribeState( ctx context.Context, - req messaging.Request[*Subscriber], + ex messaging.Exchange[*Subscriber, messaging.None], ) fsm.Action { - w, err := s.workerByStreamID(ctx, req.Request.StreamID) + w, err := s.workerByStreamID(ctx, ex.Request.StreamID) if err != nil { - req.Err(errShuttingDown) + ex.Err(errShuttingDown) return fsm.Fail(err) } - return forwardToWorker(ctx, s, w.SubscribeQueue.Send(), req) + return forwardToWorker(ctx, s, w.SubscribeQueue.Send(), ex) } // unsubscribeState forwards an unsubscribe request to the appropriate worker. func (s *Supervisor) unsubscribeState( ctx context.Context, - req messaging.Request[*Subscriber], + ex messaging.Exchange[*Subscriber, messaging.None], ) fsm.Action { - w, ok := s.workers.TryGet(req.Request.StreamID) + w, ok := s.workers.TryGet(ex.Request.StreamID) if !ok { - req.Ok() + ex.Zero() return fsm.EnterState(s.idleState) } - return forwardToWorker(ctx, s, w.UnsubscribeQueue.Send(), req) + return forwardToWorker(ctx, s, w.UnsubscribeQueue.Send(), ex) } func forwardToWorker[ diff --git a/internal/eventstream/worker.go b/internal/eventstream/worker.go index 4d6c66c6..9a513baa 100644 --- a/internal/eventstream/worker.go +++ b/internal/eventstream/worker.go @@ -2,6 +2,7 @@ package eventstream import ( "context" + "fmt" "log/slog" "time" @@ -24,10 +25,10 @@ type worker struct { AppendQueue messaging.ExchangeQueue[AppendRequest, AppendResponse] // SubscribeQueue is a queue of requests to subscribe to the stream. - SubscribeQueue messaging.RequestQueue[*Subscriber] + SubscribeQueue messaging.ExchangeQueue[*Subscriber, messaging.None] // UnsubscribeQueue is a queue of requests to unsubscribe from the stream. - UnsubscribeQueue messaging.RequestQueue[*Subscriber] + UnsubscribeQueue messaging.ExchangeQueue[*Subscriber, messaging.None] // Shutdown signals the worker to stop when it next becomes idle. 
Shutdown signaling.Latch @@ -65,22 +66,20 @@ func (w *worker) Run(ctx context.Context) (err error) { pos, rec, ok, err := journal.LastRecord(ctx, w.Journal) if err != nil { - return err + return fmt.Errorf("unable to find most recent journal record: %w", err) } if ok { w.nextPos = pos + 1 w.nextOffset = Offset(rec.StreamOffsetAfter) - - w.Logger.Debug( - "event stream journal has existing records", - slog.Uint64("next_journal_position", uint64(w.nextPos)), - slog.Uint64("next_stream_offset", uint64(w.nextOffset)), - ) - } else { - w.Logger.Debug("event stream journal is empty") } + w.Logger.Debug( + "event stream worker started", + slog.Uint64("next_journal_position", uint64(w.nextPos)), + slog.Uint64("next_stream_offset", uint64(w.nextOffset)), + ) + w.resetIdleTimer() defer w.idleTimer.Stop() @@ -109,12 +108,12 @@ func (w *worker) tick(ctx context.Context) (bool, error) { case ex := <-w.SubscribeQueue.Recv(): w.handleSubscribe(ex.Request) - ex.Ok() + ex.Zero() return true, nil case ex := <-w.UnsubscribeQueue.Recv(): w.handleUnsubscribe(ex.Request) - ex.Ok() + ex.Zero() return true, nil case <-w.idleTimer.C: @@ -151,13 +150,12 @@ func (w *worker) catchUpWithJournal(ctx context.Context) error { ) (ok bool, err error) { recordCount++ - events := rec.GetEventsAppended().GetEvents() - if len(events) != 0 { + if n := int(rec.StreamOffsetAfter - rec.StreamOffsetBefore); n != 0 { if eventCount == 0 { - w.Logger.Warn("event stream contains events that were not appended by this worker") + w.Logger.Warn("event stream journal contains records with undelivered events") } - w.publishEvents(Offset(rec.StreamOffsetBefore), events) - eventCount += len(events) + w.publishEvents(pos, rec) + eventCount += n } w.nextPos = pos + 1 @@ -165,17 +163,17 @@ func (w *worker) catchUpWithJournal(ctx context.Context) error { return true, nil }, - ); err != nil { - return err + ); journal.IgnoreNotFound(err) != nil { + return fmt.Errorf("unable to range over journal: %w", err) } if recordCount != 0 { w.Logger.Debug( - "caught up to the end of the event stream journal", - slog.Int("record_count", recordCount), - slog.Int("event_count", eventCount), + "processed latest records from event stream journal", slog.Uint64("next_journal_position", uint64(w.nextPos)), slog.Uint64("next_stream_offset", uint64(w.nextOffset)), + slog.Int("journal_record_count", recordCount), + slog.Int("event_count", eventCount), ) } diff --git a/internal/messaging/request.go b/internal/messaging/request.go deleted file mode 100644 index d5ff6a23..00000000 --- a/internal/messaging/request.go +++ /dev/null @@ -1,64 +0,0 @@ -package messaging - -import ( - "context" - "sync" -) - -// Request encapsulates a request. -type Request[Req any] struct { - Context context.Context - Request Req - Error chan<- error -} - -// Ok sends a successful response. -func (e Request[Req]) Ok() { - e.Error <- nil -} - -// Err sends an error response. -func (e Request[Req]) Err(err error) { - e.Error <- err -} - -// RequestQueue is a queue of requests. -type RequestQueue[Req any] struct { - init sync.Once - queue chan Request[Req] -} - -// Recv returns a channel that, when read, dequeues the next request. -func (q *RequestQueue[Req]) Recv() <-chan Request[Req] { - return q.getQueue() -} - -// Send returns a channel that, when written, enqueues an request. -func (q *RequestQueue[Req]) Send() chan<- Request[Req] { - return q.getQueue() -} - -// Do performs a synchronous request. 
-func (q *RequestQueue[Req]) Do(ctx context.Context, req Req) error { - response := make(chan error, 1) - - select { - case <-ctx.Done(): - return ctx.Err() - case q.Send() <- Request[Req]{ctx, req, response}: - } - - select { - case <-ctx.Done(): - return ctx.Err() - case err := <-response: - return err - } -} - -func (q *RequestQueue[Req]) getQueue() chan Request[Req] { - q.init.Do(func() { - q.queue = make(chan Request[Req]) - }) - return q.queue -}