From eef9e90f2e3102f4d4885af3772b976ff93f07ac Mon Sep 17 00:00:00 2001 From: James Harris Date: Thu, 11 Apr 2024 06:55:10 +1000 Subject: [PATCH] WIP [ci skip] --- go.mod | 5 +- go.sum | 2 + internal/eventstream/append.go | 51 ++-- internal/eventstream/reader.go | 434 ++++++++++++---------------- internal/eventstream/reader_test.go | 346 ++++++++++++++++++++++ internal/eventstream/subscriber.go | 246 ++++++++++++++++ internal/eventstream/worker.go | 18 +- internal/messaging/failable.go | 4 +- internal/messaging/request.go | 40 +-- internal/optional/doc.go | 3 + internal/optional/optional.go | 60 ++++ internal/test/expect.go | 4 +- 12 files changed, 904 insertions(+), 309 deletions(-) create mode 100644 internal/eventstream/reader_test.go create mode 100644 internal/eventstream/subscriber.go create mode 100644 internal/optional/doc.go create mode 100644 internal/optional/optional.go diff --git a/go.mod b/go.mod index 4fe4b758..2f64f248 100644 --- a/go.mod +++ b/go.mod @@ -5,6 +5,7 @@ go 1.22 require ( github.com/cespare/xxhash/v2 v2.3.0 github.com/dogmatiq/configkit v0.13.0 + github.com/dogmatiq/dapper v0.5.3 github.com/dogmatiq/discoverkit v0.1.2 github.com/dogmatiq/dogma v0.13.0 github.com/dogmatiq/enginekit v0.10.2 @@ -13,7 +14,7 @@ require ( github.com/dogmatiq/marshalkit v0.7.3 github.com/dogmatiq/persistencekit v0.9.3 github.com/dogmatiq/primo v0.2.0 - github.com/dogmatiq/spruce v0.1.0 + github.com/dogmatiq/spruce v0.1.1 github.com/google/go-cmp v0.6.0 go.opentelemetry.io/otel v1.25.0 go.opentelemetry.io/otel/metric v1.25.0 @@ -27,10 +28,10 @@ require ( require ( github.com/dogmatiq/cosyne v0.2.0 // indirect - github.com/dogmatiq/dapper v0.5.3 // indirect github.com/dogmatiq/dyad v1.0.0 // indirect github.com/dogmatiq/iago v0.4.0 // indirect github.com/dogmatiq/interopspec v0.5.3 // indirect + github.com/dogmatiq/jumble v0.1.0 // indirect github.com/dogmatiq/linger v1.1.0 // indirect github.com/dogmatiq/projectionkit v0.6.5 // indirect 
github.com/fxamacker/cbor/v2 v2.4.0 // indirect diff --git a/go.sum b/go.sum index 2499cb1a..195ce5e0 100644 --- a/go.sum +++ b/go.sum @@ -78,6 +78,8 @@ github.com/dogmatiq/projectionkit v0.6.5 h1:3Ues+QL5oVtJcx4WogMA6XjJF1QyOlcx1uRm github.com/dogmatiq/projectionkit v0.6.5/go.mod h1:FfbWIzePx6RDAl0yl/FZ/9UaGq6wkEDKvfeDi3dg4EE= github.com/dogmatiq/spruce v0.1.0 h1:xIcWPJA33Et+qIC1RORjP+8gSNtErdcTr0eEbtFk2oU= github.com/dogmatiq/spruce v0.1.0/go.mod h1:0+zqOtlidouuzQr2k2Od7RRFR7Sk4p373kVyTtH6ovw= +github.com/dogmatiq/spruce v0.1.1 h1:n4Jbv6MAZvcjowIR21b/4G9opiTKfp3m3+e3W3PTjbM= +github.com/dogmatiq/spruce v0.1.1/go.mod h1:0+zqOtlidouuzQr2k2Od7RRFR7Sk4p373kVyTtH6ovw= github.com/dogmatiq/sqltest v0.3.0 h1:DCwyLWfVk/ZHsqq5Itq3H/Lqsh/CIQ6nIRwI4YLywFc= github.com/dogmatiq/sqltest v0.3.0/go.mod h1:a8Da8NhU4m3lq5Sybhiv+ZQowSnGHWTIJHFNInVtffg= github.com/dogmatiq/testkit v0.13.11 h1:ikXg/Cxq58tzHL27JKCkVqUUElJCHcso7N/ymd3Wins= diff --git a/internal/eventstream/append.go b/internal/eventstream/append.go index 045b7b8c..324a065e 100644 --- a/internal/eventstream/append.go +++ b/internal/eventstream/append.go @@ -78,9 +78,9 @@ func (w *worker) handleAppend( return res, nil } - res, err = w.writeEventsToJournal(ctx, req) + pos, rec, err := w.writeEventsToJournal(ctx, req) if err == nil { - w.publishEvents(res.BeginOffset, req.Events) + w.publishEvents(pos, rec) return res, nil } @@ -171,41 +171,32 @@ func (w *worker) findPriorAppend( func (w *worker) writeEventsToJournal( ctx context.Context, req AppendRequest, -) (AppendResponse, error) { - before := w.nextOffset - after := w.nextOffset + Offset(len(req.Events)) - - if err := w.Journal.Append( - ctx, - w.nextPos, - eventstreamjournal. - NewRecordBuilder(). - WithStreamOffsetBefore(uint64(before)). - WithStreamOffsetAfter(uint64(after)). - WithEventsAppended(&eventstreamjournal.EventsAppended{ - Events: req.Events, - }). 
- Build(), - ); err != nil { - return AppendResponse{}, err +) (journal.Position, *eventstreamjournal.Record, error) { + pos := w.nextPos + rec := eventstreamjournal. + NewRecordBuilder(). + WithStreamOffsetBefore(uint64(w.nextOffset)). + WithStreamOffsetAfter(uint64(w.nextOffset) + uint64(len(req.Events))). + WithEventsAppended(&eventstreamjournal.EventsAppended{Events: req.Events}). + Build() + + if err := w.Journal.Append(ctx, pos, rec); err != nil { + return 0, nil, err } - for index, event := range req.Events { + w.nextPos++ + + for _, event := range req.Events { w.Logger.Info( "appended event to the stream", - slog.Uint64("journal_position", uint64(w.nextPos)), - slog.Uint64("stream_offset", uint64(before)+uint64(index)), + slog.Uint64("journal_position", uint64(pos)), + slog.Uint64("stream_offset", uint64(w.nextOffset)), slog.String("message_id", event.MessageId.AsString()), slog.String("description", event.Description), ) - } - w.nextPos++ - w.nextOffset = after + w.nextOffset++ + } - return AppendResponse{ - BeginOffset: before, - EndOffset: after, - AppendedByPriorAttempt: false, - }, nil + return pos, rec, nil } diff --git a/internal/eventstream/reader.go b/internal/eventstream/reader.go index 17f3873a..04ed36f9 100644 --- a/internal/eventstream/reader.go +++ b/internal/eventstream/reader.go @@ -2,135 +2,79 @@ package eventstream import ( "context" + "errors" "fmt" "log/slog" "time" - "github.com/dogmatiq/enginekit/protobuf/envelopepb" "github.com/dogmatiq/enginekit/protobuf/uuidpb" "github.com/dogmatiq/persistencekit/journal" "github.com/dogmatiq/veracity/internal/eventstream/internal/eventstreamjournal" "github.com/dogmatiq/veracity/internal/messaging" - "github.com/dogmatiq/veracity/internal/signaling" ) -// A Subscriber is sent events from a stream, by way of a [Reader]. -type Subscriber struct { - // StreamID is the ID of the stream from which events are read. - StreamID *uuidpb.UUID - - // Offset is the offset of the next event to read. 
- // - // It must not be read or modified while the subscription is active. It is - // incremented as events are sent to the subscriber. - Offset Offset - - // Filter is a predicate function that returns true if the subscriber should - // receive the event in the given envelope. - // - // It is used to avoid filling the subscriber's channel with events they are - // not interested in. It is called by the event stream worker in its own - // goroutine, and hence must not block. - Filter func(*envelopepb.Envelope) bool - - // Events is the channel to which the subscriber's events are sent. - Events chan<- Event - - canceled signaling.Event -} +// defaultSubscribeTimeout is the default maximum time to wait for a +// subscription to be acknowledged by the supervisor before reverting to reading +// from the journal. +const defaultSubscribeTimeout = 3 * time.Second // A Reader reads ordered events from a stream. type Reader struct { Journals journal.BinaryStore SubscribeQueue *messaging.RequestQueue[*Subscriber] UnsubscribeQueue *messaging.RequestQueue[*Subscriber] + SubscribeTimeout time.Duration + Logger *slog.Logger } -// Read reads events from a stream and sends them to the given subscriber. +// Read sends events from a stream to the given subscriber's events channel. // -// It starts by reading events directly from the stream's journal records. Once -// it has "caught up" to the end of the journal it receives events in -// "real-time" from the supervisor of that stream. -// -// If the subscriber's channel becomes full, it reverts to reading from the -// journal until it catches up again. +// It first attempts to "sync" with the supervisor of the event stream to +// receive contemporary events in "real-time". If the subscriber's requested +// offset is too old to be obtained from the supervisor, or if the events +// channel becomes full, the reader obtains events directly from the journal +// until the last record is reached, then the process repeats. 
func (r *Reader) Read(ctx context.Context, sub *Subscriber) error { + if sub.id == nil { + sub.id = uuidpb.Generate() + } + for { - if err := r.readHistorical(ctx, sub); err != nil { + if err := r.readContemporary(ctx, sub); err != nil { return err } - if err := r.readContemporary(ctx, sub); err != nil { + if err := r.readHistorical(ctx, sub); err != nil { return err } } } -func (r *Reader) readHistorical(ctx context.Context, sub *Subscriber) error { - j, err := eventstreamjournal.Open(ctx, r.Journals, sub.StreamID) - if err != nil { - return err - } - defer j.Close() - - searchBegin, searchEnd, err := j.Bounds(ctx) - if err != nil { - return err - } - - return journal.RangeFromSearchResult( - ctx, - j, - searchBegin, searchEnd, - eventstreamjournal.SearchByOffset(uint64(sub.Offset)), - func( - ctx context.Context, - pos journal.Position, - rec *eventstreamjournal.Record, - ) (bool, error) { - begin := Offset(rec.StreamOffsetBefore) - end := Offset(rec.StreamOffsetAfter) - - if begin == end { - // no events in this record - return true, nil - } - - if sub.Offset < begin || sub.Offset >= end { - return false, fmt.Errorf( - "event stream integrity error at journal position %d: expected event at offset %d, but found offset range [%d, %d)", - pos, - sub.Offset, - begin, - end, - ) - } - - index := sub.Offset - begin - - for _, env := range rec.GetEventsAppended().Events[index:] { - if !sub.Filter(env) { - sub.Offset++ - continue - } - - select { - case <-ctx.Done(): - return false, ctx.Err() - case sub.Events <- Event{sub.StreamID, sub.Offset, env}: - sub.Offset++ - } - } - - return true, nil - }, - ) -} - func (r *Reader) readContemporary(ctx context.Context, sub *Subscriber) error { // TODO: remote read + r.Logger.Debug( + "subscribing to receive contemporary events", + slog.String("stream_id", sub.StreamID.AsString()), + slog.String("subscriber_id", sub.id.AsString()), + slog.Int("subscriber_headroom", cap(sub.Events)-len(sub.Events)), + 
slog.Uint64("subscriber_stream_offset", uint64(sub.Offset)), + ) + if err := r.subscribe(ctx, sub); err != nil { + // If the subscription request times out, but the parent context isn't + // canceled we revert to reading from the journal (by returning nil). + if errors.Is(err, context.DeadlineExceeded) && ctx.Err() == nil { + r.Logger.Warn( + "timed-out waiting for supervisor to acknowledge subscription", + slog.String("stream_id", sub.StreamID.AsString()), + slog.String("subscriber_id", sub.id.AsString()), + slog.Int("subscriber_headroom", cap(sub.Events)-len(sub.Events)), + slog.Uint64("subscriber_stream_offset", uint64(sub.Offset)), + ) + return nil + } + return err } defer r.unsubscribe(ctx, sub) @@ -139,13 +83,33 @@ func (r *Reader) readContemporary(ctx context.Context, sub *Subscriber) error { case <-ctx.Done(): return ctx.Err() case <-sub.canceled.Signaled(): + r.Logger.Debug( + "subscription canceled by supervisor", + slog.String("stream_id", sub.StreamID.AsString()), + slog.String("subscriber_id", sub.id.AsString()), + slog.Int("subscriber_headroom", cap(sub.Events)-len(sub.Events)), + slog.Uint64("subscriber_stream_offset", uint64(sub.Offset)), + ) return nil } } func (r *Reader) subscribe(ctx context.Context, sub *Subscriber) error { - ctx, cancel := context.WithTimeout(ctx, 10*time.Second) // TODO: make configurable - cancel() + // Impose our own subscription timeout. This handles the case that the + // supervisor is not running or cannot service our subscription request in a + // timely manner, in which case we will revert to reading from the journal. 
+ timeout := r.SubscribeTimeout + if timeout <= 0 { + timeout = defaultSubscribeTimeout + } + + ctx, cancel := context.WithTimeout(ctx, timeout) + defer cancel() + + select { + case <-ctx.Done(): + return ctx.Err() + case r.SubscribeQueue.Send() <- Request[sub]: if err := r.SubscribeQueue.Do(ctx, sub); err != nil { return fmt.Errorf("cannot subscribe to event stream: %w", err) @@ -171,180 +135,160 @@ func (r *Reader) unsubscribe(ctx context.Context, sub *Subscriber) error { return r.UnsubscribeQueue.Do(ctx, sub) } -// handleSubscribe adds sub to the subscriber list. -// -// It delivers any cached events that the subscriber has not yet seen. If the -// subscriber's requested event is older than the events in the cache the -// subscription is canceled immediately. -func (w *worker) handleSubscribe(sub *Subscriber) { - if !sub.StreamID.Equal(w.StreamID) { - panic("received request for a different stream ID") +func (r *Reader) readHistorical(ctx context.Context, sub *Subscriber) error { + j, err := eventstreamjournal.Open(ctx, r.Journals, sub.StreamID) + if err != nil { + return err } + defer j.Close() - if w.subscribers == nil { - w.subscribers = map[*Subscriber]struct{}{} - } - w.subscribers[sub] = struct{}{} - - w.Logger.Debug( - "subscription activated", - slog.String("channel_address", fmt.Sprint(sub.Events)), - slog.Int("channel_capacity", cap(sub.Events)), - slog.Int("channel_headroom", cap(sub.Events)-len(sub.Events)), - slog.Int("subscriber_count", len(w.subscribers)), - slog.Uint64("requested_stream_offset", uint64(sub.Offset)), - slog.Uint64("next_stream_offset", uint64(w.nextOffset)), - ) + var records, delivered, filtered int - if sub.Offset >= w.nextOffset { - return - } + fn := func( + ctx context.Context, + pos journal.Position, + rec *eventstreamjournal.Record, + ) (bool, error) { + records++ - index := w.findInCache(sub.Offset) - - if index == -1 { - sub.canceled.Signal() - w.Logger.Warn( - "subscription canceled immediately due request for historical 
events", - slog.String("channel_address", fmt.Sprint(sub.Events)), - slog.Int("channel_capacity", cap(sub.Events)), - slog.Int("channel_headroom", cap(sub.Events)-len(sub.Events)), - slog.Int("subscriber_count", len(w.subscribers)), - slog.Int("cached_event_count", len(w.recentEvents)), - slog.Uint64("requested_stream_offset", uint64(sub.Offset)), - slog.Uint64("next_stream_offset", uint64(w.nextOffset)), - ) - return - } + begin := Offset(rec.StreamOffsetBefore) + end := Offset(rec.StreamOffsetAfter) - for _, event := range w.recentEvents[index:] { - if w.deliverEventToSubscriber(event, sub) == subscriptionCanceled { - return + if begin == end { + // This condition is not producible at the moment, but is present to + // provide forward compatibility with future journal record types. + sub.beginPos = pos + 1 + return true, nil } - } -} - -// handleUnsubscribe removes sub from the subscriber list. -func (w *worker) handleUnsubscribe(sub *Subscriber) { - if !sub.StreamID.Equal(w.StreamID) { - panic("received request for a different stream ID") - } - - before := len(w.subscribers) - delete(w.subscribers, sub) - after := len(w.subscribers) - if before > after { - sub.canceled.Signal() + if sub.Offset < begin || sub.Offset >= end { + return false, fmt.Errorf( + "event stream integrity error at journal position %d: expected event at offset %d, but found offset range [%d, %d)", + pos, + sub.Offset, + begin, + end, + ) + } - w.Logger.Debug( - "subscription canceled by subscriber", - slog.String("channel_address", fmt.Sprint(sub.Events)), - slog.Int("channel_capacity", cap(sub.Events)), - slog.Int("channel_headroom", cap(sub.Events)-len(sub.Events)), - slog.Int("subscriber_count", after), - ) - } -} + sub.beginPos = pos + sub.beginPosIsDefinitive = true -// deliverResult is an enumeration of the possible outcomes of delivering an -// event to a subscriber. 
-type deliverResult int + index := sub.Offset - begin -const ( - // eventDelivered means that the event was sent to the subscriber's events - // channel, which may or may not be buffered. - eventDelivered deliverResult = iota + for _, env := range rec.GetEventsAppended().Events[index:] { + if sub.Filter != nil && !sub.Filter(env) { + sub.Offset++ + filtered++ + continue + } - // eventFiltered means that the event was filtered by the subscriber's - // filter function, and did not need to be delivered. - eventFiltered + select { + case <-ctx.Done(): + return false, ctx.Err() + case sub.Events <- Event{sub.StreamID, sub.Offset, env}: + sub.Offset++ + delivered++ + } + } - // subscriptionCanceled means that an attempt was made to send the event to - // the subscriber's event channel, but the channel buffer was full (or - // unbuffered and not ready to read), and so the subscription was canceled. - subscriptionCanceled -) + sub.beginPos++ -// deliverEventToSubscriber attempts to deliver an event to a subscriber's event -// channel. 
-func (w *worker) deliverEventToSubscriber(event Event, sub *Subscriber) deliverResult { - if event.Offset > sub.Offset { - panic("event is out of order") + return true, nil } - if event.Offset < sub.Offset { - return eventFiltered + iter := r.searchHistorical + if sub.beginPosIsDefinitive { + iter = r.rangeHistorical } - if !sub.Filter(event.Envelope) { - sub.Offset++ - return eventFiltered + if err := iter(ctx, sub, j, fn); err != nil { + return err } - select { - case sub.Events <- event: - sub.Offset++ - return eventDelivered - - default: - delete(w.subscribers, sub) - sub.canceled.Signal() - - w.Logger.Warn( - "subscription canceled because the subscriber can not keep up with the event stream", - slog.String("channel_address", fmt.Sprint(sub.Events)), - slog.Int("channel_capacity", cap(sub.Events)), - slog.Int("channel_headroom", 0), - slog.Int("subscriber_count", len(w.subscribers)), - slog.Uint64("stream_offset", uint64(event.Offset)), - ) + r.Logger.Debug( + "finished reading historical events from journal", + slog.String("stream_id", sub.StreamID.AsString()), + slog.String("subscriber_id", sub.id.AsString()), + slog.Int("subscriber_headroom", cap(sub.Events)-len(sub.Events)), + slog.Uint64("subscriber_stream_offset", uint64(sub.Offset)), + slog.Int("journal_record_count", records), + slog.Int("events_delivered_count", delivered), + slog.Int("events_filtered_count", filtered), + ) - return subscriptionCanceled - } + return nil } -// publishEvents publishes the events to both the recent event cache and any -// interested subscribers. -func (w *worker) publishEvents( - offset Offset, - events []*envelopepb.Envelope, -) { - skip := w.growCache(len(events)) +// rangeHistorical delivers all (relevent) events to the subscriber, starting +// with the events in the record at position sub.beginPos. 
+func (r *Reader) rangeHistorical( + ctx context.Context, + sub *Subscriber, + j journal.Journal[*eventstreamjournal.Record], + fn journal.RangeFunc[*eventstreamjournal.Record], +) error { + r.Logger.Debug( + "ranging over historical events in journal", + slog.String("stream_id", sub.StreamID.AsString()), + slog.String("subscriber_id", sub.id.AsString()), + slog.Int("subscriber_headroom", cap(sub.Events)-len(sub.Events)), + slog.Uint64("subscriber_stream_offset", uint64(sub.Offset)), + slog.Uint64("journal_begin_position", uint64(sub.beginPos)), + ) - for i, env := range events { - event := Event{w.StreamID, offset, env} - offset++ + err := j.Range(ctx, sub.beginPos, fn) - if i >= skip { - w.appendEventToCache(event) - } + if errors.Is(err, journal.ErrNotFound) { + // If we're ranging over a journal record that does not exist it means + // we've delivered all events but were unable to subscribe to receive + // contemporary events from the supervisor. + return nil + } - if len(w.subscribers) == 0 { - continue - } + return err +} - var delivered, filtered, canceled int +// searchHistorical performs a binary search to find the journal record that +// contains the next event to deliver to sub, then delivers that event an all +// subsequent events until the end of the journal. 
+func (r *Reader) searchHistorical( + ctx context.Context, + sub *Subscriber, + j journal.Journal[*eventstreamjournal.Record], + fn journal.RangeFunc[*eventstreamjournal.Record], +) error { + begin, end, err := j.Bounds(ctx) + if err != nil { + return err + } - for sub := range w.subscribers { - switch w.deliverEventToSubscriber(event, sub) { - case eventDelivered: - delivered++ - case eventFiltered: - filtered++ - case subscriptionCanceled: - canceled++ - } - } + begin = max(begin, sub.beginPos) - w.Logger.Debug( - "event published to subscribers", - slog.Uint64("stream_offset", uint64(event.Offset)), - slog.String("message_id", env.MessageId.AsString()), - slog.String("description", env.Description), - slog.Int("delivered_count", delivered), - slog.Int("filtered_count", filtered), - slog.Int("canceled_count", canceled), - ) + r.Logger.Debug( + "searching for historical events in journal", + slog.String("stream_id", sub.StreamID.AsString()), + slog.String("subscriber_id", sub.id.AsString()), + slog.Int("subscriber_headroom", cap(sub.Events)-len(sub.Events)), + slog.Uint64("subscriber_stream_offset", uint64(sub.Offset)), + slog.Uint64("journal_begin_position", uint64(begin)), + slog.Uint64("journal_end_position", uint64(end)), + ) + + err = journal.RangeFromSearchResult( + ctx, + j, + begin, end, + eventstreamjournal.SearchByOffset(uint64(sub.Offset)), + fn, + ) + + if errors.Is(err, journal.ErrNotFound) { + // If the event is not in the journal then we don't want to re-search + // these same records in the future. + sub.beginPos = end + return nil } + + return err } diff --git a/internal/eventstream/reader_test.go b/internal/eventstream/reader_test.go new file mode 100644 index 00000000..faa28445 --- /dev/null +++ b/internal/eventstream/reader_test.go @@ -0,0 +1,346 @@ +package eventstream_test + +import ( + "context" + "errors" + "testing" + + . 
"github.com/dogmatiq/dogma/fixtures" + "github.com/dogmatiq/enginekit/protobuf/envelopepb" + "github.com/dogmatiq/enginekit/protobuf/identitypb" + "github.com/dogmatiq/enginekit/protobuf/uuidpb" + . "github.com/dogmatiq/marshalkit/fixtures" + "github.com/dogmatiq/persistencekit/driver/memory/memoryjournal" + "github.com/dogmatiq/spruce" + "github.com/dogmatiq/veracity/internal/envelope" + . "github.com/dogmatiq/veracity/internal/eventstream" + "github.com/dogmatiq/veracity/internal/test" +) + +func TestReader(t *testing.T) { + t.Parallel() + + type dependencies struct { + Journals *memoryjournal.BinaryStore + Supervisor *Supervisor + Packer *envelope.Packer + Reader *Reader + } + + setup := func(t test.TestingT) (deps dependencies) { + deps.Journals = &memoryjournal.BinaryStore{} + + logger := spruce.NewLogger(t) + + deps.Supervisor = &Supervisor{ + Journals: deps.Journals, + Logger: logger, + } + + deps.Packer = &envelope.Packer{ + Application: identitypb.New("", uuidpb.Generate()), + Marshaler: Marshaler, + } + + deps.Reader = &Reader{ + Journals: deps.Journals, + SubscribeQueue: &deps.Supervisor.SubscribeQueue, + UnsubscribeQueue: &deps.Supervisor.UnsubscribeQueue, + Logger: logger, + } + + return deps + } + + t.Run("it reads historical events", func(t *testing.T) { + streamID := uuidpb.Generate() + tctx := test.WithContext(t) + deps := setup(tctx) + + supervisor := test. + RunInBackground(t, "supervisor", deps.Supervisor.Run). + UntilStopped() + + env1 := deps.Packer.Pack(MessageE1) + env2 := deps.Packer.Pack(MessageE2) + env3 := deps.Packer.Pack(MessageE3) + + // Journal record with a single event. + if _, err := deps.Supervisor.AppendQueue.Do( + tctx, + AppendRequest{ + StreamID: streamID, + Events: []*envelopepb.Envelope{ + env1, + }, + }, + ); err != nil { + t.Fatal(err) + } + + // Journal record with multiple events. 
+ if _, err := deps.Supervisor.AppendQueue.Do( + tctx, + AppendRequest{ + StreamID: streamID, + Events: []*envelopepb.Envelope{ + env2, + env3, + }, + }, + ); err != nil { + t.Fatal(err) + } + + // Stop the supervisor now to verify that it's not necessary to read + // historical events. We set the timeout to the minimum value possible + // to reduce the test run-time. + supervisor.StopAndWait() + deps.Reader.SubscribeTimeout = 1 + + events := make(chan Event) + sub := &Subscriber{ + StreamID: streamID, + Events: events, + } + + ctx, cancel := context.WithCancel(tctx) + defer cancel() + + result := make(chan error, 1) + go func() { + result <- deps.Reader.Read(ctx, sub) + }() + + for offset, env := range []*envelopepb.Envelope{env1, env2, env3} { + test.ExpectChannelToReceive( + tctx, + events, + Event{ + StreamID: streamID, + Offset: Offset(offset), + Envelope: env, + }, + ) + } + + cancel() + err := <-result + if !errors.Is(err, context.Canceled) { + t.Fatalf("unexpected error: got %v, want %v", err, context.Canceled) + } + }) + + // t.Run("it reads contemporary events", func(t *testing.T) { + // streamID := uuidpb.Generate() + // tctx := test.WithContext(t) + // deps := setup(tctx) + + // test. + // RunInBackground(t, "supervisor", deps.Supervisor.Run). + // UntilTestEnds() + + // events := make(chan Event, 100) + // sub := &Subscriber{ + // StreamID: streamID, + // Events: events, + // } + + // ctx, cancel := context.WithCancel(tctx) + // defer cancel() + + // // Give the reader a different journal store so we know it isn't reading + // // the events from the journal. 
+ // deps.Reader.Journals = &memoryjournal.BinaryStore{} + + // result := make(chan error, 1) + // go func() { + // result <- deps.Reader.Read(ctx, sub) + // }() + + // envelopes := []*envelopepb.Envelope{ + // deps.Packer.Pack(MessageE1), + // deps.Packer.Pack(MessageE2), + // deps.Packer.Pack(MessageE3), + // } + + // go func() { + // for _, env := range envelopes { + // _, err := deps.Supervisor.AppendQueue.Do( + // ctx, + // AppendRequest{ + // StreamID: streamID, + // Events: []*envelopepb.Envelope{env}, + // }, + // ) + // if err != nil { + // t.Error(err) + // } + // } + // }() + + // for offset, env := range envelopes { + // test.ExpectChannelToReceive( + // tctx, + // events, + // Event{ + // StreamID: streamID, + // Offset: Offset(offset), + // Envelope: env, + // }, + // ) + // } + + // cancel() + // err := <-result + // if !errors.Is(err, context.Canceled) { + // t.Fatalf("unexpected error: got %v, want %v", err, context.Canceled) + // } + // }) + + // t.Run("it reverts to the journal if it cannot keep up with the event stream", func(t *testing.T) { + // streamID := uuidpb.Generate() + // tctx := test.WithContext(t) + // deps := setup(tctx) + + // test. + // RunInBackground(t, "supervisor", deps.Supervisor.Run). 
+ // UntilTestEnds() + + // events := make(chan Event) // note: unbuffered + // sub := &Subscriber{ + // StreamID: streamID, + // Events: events, + // } + + // ctx, cancel := context.WithCancel(tctx) + // defer cancel() + + // result := make(chan error, 1) + // go func() { + // result <- deps.Reader.Read(ctx, sub) + // }() + + // envelopes := []*envelopepb.Envelope{ + // deps.Packer.Pack(MessageE1), + // deps.Packer.Pack(MessageE2), + // deps.Packer.Pack(MessageE3), + // } + + // go func() { + // for _, env := range envelopes { + // _, err := deps.Supervisor.AppendQueue.Do( + // ctx, + // AppendRequest{ + // StreamID: streamID, + // Events: []*envelopepb.Envelope{env}, + // }, + // ) + // if err != nil { + // t.Error(err) + // } + // } + // }() + + // for offset, env := range envelopes { + // time.Sleep(500 * time.Microsecond) + // test.ExpectChannelToReceive( + // tctx, + // events, + // Event{ + // StreamID: streamID, + // Offset: Offset(offset), + // Envelope: env, + // }, + // ) + // } + + // cancel() + // err := <-result + // if !errors.Is(err, context.Canceled) { + // t.Fatalf("unexpected error: got %v, want %v", err, context.Canceled) + // } + // }) + + // t.Run("it does not duplicate or mis-order events", func(t *testing.T) { + // t.Parallel() + + // streamID := uuidpb.Generate() + // tctx := test.WithContext(t) + // deps := setup(tctx) + + // conflictingSupervisor := &Supervisor{ + // Journals: deps.Journals, + // Logger: spruce.NewLogger(t), + // } + + // test. + // RunInBackground(t, "supervisor-under-test", deps.Supervisor.Run). + // UntilTestEnds() + + // test. + // RunInBackground(t, "conflict-generating-supervisor", conflictingSupervisor.Run). 
+ // UntilTestEnds() + + // appendLoop := func(ctx context.Context, s *Supervisor, base int) error { + // for n := range 100 { + // n += base + + // if _, err := s.AppendQueue.Do( + // ctx, + // AppendRequest{ + // StreamID: streamID, + // Events: []*envelopepb.Envelope{ + // deps.Packer.Pack(MessageE{ + // Value: n, + // }), + // }, + // }, + // ); err != nil { + // t.Logf("failed to append event #%d: %s", n, err) + // return err + // } + // } + + // return nil + // } + + // events := make(chan Event) + + // sub := &Subscriber{ + // StreamID: streamID, + // Events: events, + // } + + // ctx, cancel := context.WithCancel(tctx) + // g, ctx := errgroup.WithContext(ctx) + + // g.Go(func() error { return appendLoop(ctx, deps.Supervisor, 1000) }) + // g.Go(func() error { return appendLoop(ctx, conflictingSupervisor, 2000) }) + // g.Go(func() error { + // if err := deps.Reader.Read(ctx, sub); err != nil { + // t.Logf("failed to read events: %s", err) + // return err + // } + // return nil + // }) + + // for offset := Offset(0); offset < 200; offset++ { + // select { + // case <-ctx.Done(): + // if err := g.Wait(); err != nil { + // t.Fatal(err) + // } + // case e := <-events: + // if e.Offset != offset { + // t.Fatalf("expected offset: got %d, want %d", e.Offset, offset) + // } + // } + // } + + // cancel() + // if err := g.Wait(); err != nil { + // t.Log(err) + // } + // }) +} diff --git a/internal/eventstream/subscriber.go b/internal/eventstream/subscriber.go new file mode 100644 index 00000000..325d4561 --- /dev/null +++ b/internal/eventstream/subscriber.go @@ -0,0 +1,246 @@ +package eventstream + +import ( + "fmt" + "log/slog" + + "github.com/dogmatiq/enginekit/protobuf/envelopepb" + "github.com/dogmatiq/enginekit/protobuf/uuidpb" + "github.com/dogmatiq/persistencekit/journal" + "github.com/dogmatiq/veracity/internal/eventstream/internal/eventstreamjournal" + "github.com/dogmatiq/veracity/internal/signaling" +) + +// A Subscriber is sent events from a stream, by way 
of a [Reader]. +type Subscriber struct { + // StreamID is the ID of the stream from which events are read. + StreamID *uuidpb.UUID + + // Offset is the offset of the next event to read. + // + // It must not be read or modified while the subscription is active. It is + // incremented as events are sent to the subscriber. + Offset Offset + + // Filter is a predicate function that returns true if the subscriber should + // receive the event in the given envelope. + // + // It is used to avoid filling the subscriber's channel with events they are + // not interested in. It is called by the event stream worker in its own + // goroutine, and hence must not block. + Filter func(*envelopepb.Envelope) bool + + // Events is the channel to which the subscriber's events are sent. + Events chan<- Event + + // canceled indicates that the [Supervisor] has canceled the subscription. + canceled signaling.Event + + // id is a unique identifier for the subscriber. + id *uuidpb.UUID + + // beginPos is the journal position to begin ranging or search for the next + // event to deliver to the subscriber. + beginPos journal.Position + + // beginPosIsDefinitive is true if beginPos "definitive", meaning that it + // represents the exact position of the record containing the next event to + // deliver to the subscriber. + beginPosIsDefinitive bool +} + +// handleSubscribe adds sub to the subscriber list. +// +// It delivers any cached events that the subscriber has not yet seen. If the +// subscriber's requested event is older than the events in the cache the +// subscription is canceled immediately. 
+func (w *worker) handleSubscribe(sub *Subscriber) { + if !sub.StreamID.Equal(w.StreamID) { + panic("received request for a different stream ID") + } + + if w.subscribers == nil { + w.subscribers = map[*Subscriber]struct{}{} + } + w.subscribers[sub] = struct{}{} + + w.Logger.Debug( + "subscription activated", + slog.Uint64("next_stream_offset", uint64(w.nextOffset)), + slog.Int("subscriber_count", len(w.subscribers)), + slog.String("subscriber_id", sub.id.AsString()), + slog.Int("subscriber_headroom", cap(sub.Events)-len(sub.Events)), + slog.Uint64("subscriber_stream_offset", uint64(sub.Offset)), + ) + + if sub.Offset >= w.nextOffset { + return + } + + index := w.findInCache(sub.Offset) + + if index == -1 { + w.Logger.Warn( + "subscription canceled immediately due to request for uncached historical events", + slog.Uint64("next_stream_offset", uint64(w.nextOffset)), + slog.Int("cached_event_count", len(w.recentEvents)), + slog.Int("subscriber_count", len(w.subscribers)), + slog.String("subscriber_id", sub.id.AsString()), + slog.Int("subscriber_headroom", cap(sub.Events)-len(sub.Events)), + slog.Uint64("subscriber_stream_offset", uint64(sub.Offset)), + ) + + sub.canceled.Signal() + return + } + + for _, event := range w.recentEvents[index:] { + if w.deliverEventToSubscriber(event, sub) == subscriptionCanceled { + return + } + } +} + +// handleUnsubscribe removes sub from the subscriber list. 
+func (w *worker) handleUnsubscribe(sub *Subscriber) { + if !sub.StreamID.Equal(w.StreamID) { + panic("received request for a different stream ID") + } + + before := len(w.subscribers) + delete(w.subscribers, sub) + after := len(w.subscribers) + + if before > after { + w.Logger.Debug( + "subscription canceled by subscriber", + slog.Uint64("next_stream_offset", uint64(w.nextOffset)), + slog.Int("subscriber_count", after), + slog.String("subscriber_id", sub.id.AsString()), + slog.Int("subscriber_headroom", cap(sub.Events)-len(sub.Events)), + slog.Uint64("subscriber_stream_offset", uint64(sub.Offset)), + ) + sub.canceled.Signal() + } +} + +// deliverResult is an enumeration of the possible outcomes of delivering an +// event to a subscriber. +type deliverResult int + +const ( + // eventDelivered means that the event was sent to the subscriber's events + // channel, which may or may not be buffered. + eventDelivered deliverResult = iota + + // eventFiltered means that the event was filtered by the subscriber's + // filter function, and did not need to be delivered. + eventFiltered + + // subscriptionCanceled means that an attempt was made to send the event to + // the subscriber's event channel, but the channel buffer was full (or + // unbuffered and not ready to read), and so the subscription was canceled. + subscriptionCanceled +) + +// deliverEventToSubscriber attempts to deliver an event to a subscriber's event +// channel. 
+func (w *worker) deliverEventToSubscriber(event Event, sub *Subscriber) deliverResult { + if event.Offset > sub.Offset { + panic("event is out of order") + } + + if event.Offset < sub.Offset { + return eventFiltered + } + + if sub.Filter != nil && !sub.Filter(event.Envelope) { + sub.Offset++ + return eventFiltered + } + + select { + case sub.Events <- event: + sub.Offset++ + return eventDelivered + + default: + delete(w.subscribers, sub) + + w.Logger.Warn( + "subscription canceled because the subscriber cannot keep up with the event stream", + slog.Int("subscriber_count", len(w.subscribers)), + slog.String("channel_address", fmt.Sprint(sub.Events)), + slog.String("subscriber_id", sub.id.AsString()), + slog.Int("subscriber_headroom", cap(sub.Events)-len(sub.Events)), + slog.Uint64("subscriber_stream_offset", uint64(sub.Offset)), + ) + + sub.canceled.Signal() + + return subscriptionCanceled + } +} + +// publishEvents publishes the events to both the recent event cache and any +// interested subscribers. +func (w *worker) publishEvents(pos journal.Position, rec *eventstreamjournal.Record) int { + op := rec.GetEventsAppended() + if op == nil { + return 0 + } + + skip := w.growCache(len(op.Events)) + + // Update the subscriber's position to refer to the record containing the + // events we're about to deliver. 
+ for sub := range w.subscribers { + sub.beginPos = pos + sub.beginPosIsDefinitive = true + } + + offset := Offset(rec.StreamOffsetBefore) + for i, env := range op.Events { + event := Event{w.StreamID, offset, env} + offset++ + + if i >= skip { + w.appendEventToCache(event) + } + + if len(w.subscribers) == 0 { + continue + } + + var delivered, filtered, canceled int + + for sub := range w.subscribers { + switch w.deliverEventToSubscriber(event, sub) { + case eventDelivered: + delivered++ + case eventFiltered: + filtered++ + case subscriptionCanceled: + canceled++ + } + } + + w.Logger.Debug( + "event published to subscribers", + slog.Uint64("stream_offset", uint64(event.Offset)), + slog.String("message_id", env.MessageId.AsString()), + slog.String("description", env.Description), + slog.Int("subscriber_delivered_count", delivered), + slog.Int("subscriber_filtered_count", filtered), + slog.Int("subscriber_canceled_count", canceled), + ) + } + + // Any remaining (i.e. uncanceled) subscribers should now look for the + // following journal record. + for sub := range w.subscribers { + sub.beginPos++ + } + + return len(op.Events) +} diff --git a/internal/eventstream/worker.go b/internal/eventstream/worker.go index 4d6c66c6..25ef0e76 100644 --- a/internal/eventstream/worker.go +++ b/internal/eventstream/worker.go @@ -108,11 +108,13 @@ func (w *worker) tick(ctx context.Context) (bool, error) { return true, err case ex := <-w.SubscribeQueue.Recv(): + // TODO(jmalloc): Handle ex.Context cancelation. w.handleSubscribe(ex.Request) ex.Ok() return true, nil case ex := <-w.UnsubscribeQueue.Recv(): + // TODO(jmalloc): Handle ex.Context cancelation. 
w.handleUnsubscribe(ex.Request) ex.Ok() return true, nil @@ -151,15 +153,11 @@ func (w *worker) catchUpWithJournal(ctx context.Context) error { ) (ok bool, err error) { recordCount++ - events := rec.GetEventsAppended().GetEvents() - if len(events) != 0 { - if eventCount == 0 { - w.Logger.Warn("event stream contains events that were not appended by this worker") - } - w.publishEvents(Offset(rec.StreamOffsetBefore), events) - eventCount += len(events) + n := w.publishEvents(pos, rec) + if eventCount == 0 && n != 0 { + w.Logger.Warn("event stream contains events that were not appended by this worker") } - + eventCount += n w.nextPos = pos + 1 w.nextOffset = Offset(rec.StreamOffsetAfter) @@ -172,10 +170,10 @@ func (w *worker) catchUpWithJournal(ctx context.Context) error { if recordCount != 0 { w.Logger.Debug( "caught up to the end of the event stream journal", - slog.Int("record_count", recordCount), - slog.Int("event_count", eventCount), slog.Uint64("next_journal_position", uint64(w.nextPos)), slog.Uint64("next_stream_offset", uint64(w.nextOffset)), + slog.Int("journal_record_count", recordCount), + slog.Int("event_count", eventCount), ) } diff --git a/internal/messaging/failable.go b/internal/messaging/failable.go index cdefa4a7..4f1df9d3 100644 --- a/internal/messaging/failable.go +++ b/internal/messaging/failable.go @@ -1,13 +1,13 @@ package messaging // Failable encapsulates a value of type T, or an error indicating that the -// value can not be obtained. +// value cannot be obtained. type Failable[T any] struct { value T err error } -// Get returns the value, or an error if the value can not be obtained. +// Get returns the value, or an error if the value cannot be obtained. 
func (f Failable[T]) Get() (T, error) { return f.value, f.err } diff --git a/internal/messaging/request.go b/internal/messaging/request.go index d5ff6a23..e56259a6 100644 --- a/internal/messaging/request.go +++ b/internal/messaging/request.go @@ -6,46 +6,45 @@ import ( ) // Request encapsulates a request. -type Request[Req any] struct { - Context context.Context - Request Req - Error chan<- error +type Request[T any] struct { + Request T + Response chan<- error } // Ok sends a successful response. -func (e Request[Req]) Ok() { - e.Error <- nil +func (e Request[T]) Ok() { + e.Response <- nil } // Err sends an error response. -func (e Request[Req]) Err(err error) { - e.Error <- err +func (e Request[T]) Err(err error) { + e.Response <- err } -// RequestQueue is a queue of requests. -type RequestQueue[Req any] struct { +// RequestQueue is a queue of requests of type T. +type RequestQueue[T any] struct { init sync.Once - queue chan Request[Req] + queue chan Request[T] } // Recv returns a channel that, when read, dequeues the next request. -func (q *RequestQueue[Req]) Recv() <-chan Request[Req] { +func (q *RequestQueue[T]) Recv() <-chan Request[T] { return q.getQueue() } // Send returns a channel that, when written, enqueues an request. -func (q *RequestQueue[Req]) Send() chan<- Request[Req] { +func (q *RequestQueue[T]) Send() chan<- Request[T] { return q.getQueue() } // Do performs a synchronous request. 
-func (q *RequestQueue[Req]) Do(ctx context.Context, req Req) error {
-	response := make(chan error, 1)
+func (q *RequestQueue[T]) Do(ctx context.Context, v T) error {
+	req, response := q.NewRequest(v)
 
 	select {
 	case <-ctx.Done():
 		return ctx.Err()
-	case q.Send() <- Request[Req]{ctx, req, response}:
+	case q.Send() <- req:
 	}
 
 	select {
@@ -56,9 +55,14 @@ func (q *RequestQueue[Req]) Do(ctx context.Context, req Req) error {
 	}
 }
 
-func (q *RequestQueue[Req]) getQueue() chan Request[Req] {
+func (q *RequestQueue[T]) NewRequest(v T) (Request[T], <-chan error) {
+	response := make(chan error, 1)
+	return Request[T]{v, response}, response
+}
+
+func (q *RequestQueue[T]) getQueue() chan Request[T] {
 	q.init.Do(func() {
-		q.queue = make(chan Request[Req])
+		q.queue = make(chan Request[T])
 	})
 	return q.queue
 }
diff --git a/internal/optional/doc.go b/internal/optional/doc.go
new file mode 100644
index 00000000..a222688e
--- /dev/null
+++ b/internal/optional/doc.go
@@ -0,0 +1,3 @@
+// Package optional provides an "optional" type that can be used to represent a
+// value that may or may not be present.
+package optional
diff --git a/internal/optional/optional.go b/internal/optional/optional.go
new file mode 100644
index 00000000..f3a8861a
--- /dev/null
+++ b/internal/optional/optional.go
@@ -0,0 +1,60 @@
+package optional
+
+// Optional represents an optional value of type T.
+type Optional[T any] struct {
+	v  T
+	ok bool
+}
+
+// Zero returns an [Optional] that contains the zero-value of type T.
+func Zero[T any]() Optional[T] {
+	return Optional[T]{ok: true}
+}
+
+// Eq returns an [Optional] that contains the given value.
+func Eq[T any](v T) Optional[T] {
+	return Optional[T]{v, true}
+}
+
+// None returns an [Optional] that does not contain a value.
+func None[T any]() Optional[T] {
+	return Optional[T]{}
+}
+
+// Get returns the value. It panics if the value is not present.
+func (o *Optional[T]) Get() T { + if !o.ok { + panic("optional value is not present") + } + return o.v +} + +// TryGet returns the value and a boolean indicating whether the value is +// present. +func (o *Optional[T]) TryGet() (v T, ok bool) { + return o.v, o.ok +} + +// IsPresent returns true if the value is present. +func (o *Optional[T]) IsPresent() bool { + return o.ok +} + +// Set sets the value. +func (o *Optional[T]) Set(v T) { + o.v = v + o.ok = true +} + +// Addr returns a pointer to the value, or nil if the value is not present. +func (o *Optional[T]) Addr() *T { + if o.ok { + return &o.v + } + return nil +} + +// Reset clears the value. +func (o *Optional[T]) Reset() { + *o = None[T]() +} diff --git a/internal/test/expect.go b/internal/test/expect.go index c121c9f3..ba9068d6 100644 --- a/internal/test/expect.go +++ b/internal/test/expect.go @@ -3,6 +3,7 @@ package test import ( "time" + "github.com/dogmatiq/dapper" "github.com/google/go-cmp/cmp" "github.com/google/go-cmp/cmp/cmpopts" "google.golang.org/protobuf/testing/protocmp" @@ -62,11 +63,10 @@ func ExpectChannelToReceive[T any]( want, options..., ) - t.Log("received the expected value on the channel") + t.Log("received the expected value on the channel:\n" + dapper.Format(got)) } else { t.Fatal("channel closed while expecting to receive a value") } - } return got