diff --git a/internal/eventstream/reader.go b/internal/eventstream/reader.go
new file mode 100644
index 00000000..2d04baa2
--- /dev/null
+++ b/internal/eventstream/reader.go
@@ -0,0 +1,171 @@
+package eventstream
+
+import (
+	"context"
+	"fmt"
+	"time"
+
+	"github.com/dogmatiq/enginekit/protobuf/envelopepb"
+	"github.com/dogmatiq/enginekit/protobuf/uuidpb"
+	"github.com/dogmatiq/persistencekit/journal"
+	"github.com/dogmatiq/veracity/internal/eventstream/internal/eventstreamjournal"
+	"github.com/dogmatiq/veracity/internal/messaging"
+	"github.com/dogmatiq/veracity/internal/signaling"
+)
+
+// A Subscriber is sent events from a stream, by way of a [Reader].
+type Subscriber struct {
+	// StreamID is the ID of the stream from which events are read.
+	StreamID *uuidpb.UUID
+
+	// Offset is the offset of the next event to read.
+	//
+	// It must not be read or modified while the subscription is active. It is
+	// incremented as events are sent to the subscriber.
+	Offset Offset
+
+	// Filter is a predicate function that returns true if the subscriber should
+	// receive the event in the given envelope.
+	//
+	// It is used to avoid filling the subscriber's channel with events they are
+	// not interested in. It is called by the event stream worker in its own
+	// goroutine, and hence must not block.
+	Filter func(*envelopepb.Envelope) bool
+
+	// Events is the channel to which the subscriber's events are sent.
+	Events chan<- Event
+
+	canceled signaling.Event
+}
+
+// A Reader reads ordered events from a stream.
+type Reader struct {
+	Journals         journal.BinaryStore
+	SubscribeQueue   *messaging.RequestQueue[*Subscriber]
+	UnsubscribeQueue *messaging.RequestQueue[*Subscriber]
+}
+
+// Read reads events from a stream and sends them to the given subscriber.
+//
+// It starts by reading events directly from the stream's journal records. Once
+// it has "caught up" to the end of the journal it receives events in
+// "real-time" from the supervisor of that stream.
+//
+// If the subscriber's channel becomes full, it reverts to reading from the
+// journal until it catches up again.
+func (r *Reader) Read(ctx context.Context, sub *Subscriber) error {
+	for {
+		if err := r.readHistorical(ctx, sub); err != nil {
+			return err
+		}
+
+		if err := r.readContemporary(ctx, sub); err != nil {
+			return err
+		}
+	}
+}
+
+func (r *Reader) readHistorical(ctx context.Context, sub *Subscriber) error {
+	j, err := eventstreamjournal.Open(ctx, r.Journals, sub.StreamID)
+	if err != nil {
+		return err
+	}
+	defer j.Close()
+
+	searchBegin, searchEnd, err := j.Bounds(ctx)
+	if err != nil {
+		return err
+	}
+
+	return journal.RangeFromSearchResult(
+		ctx,
+		j,
+		searchBegin, searchEnd,
+		eventstreamjournal.SearchByOffset(uint64(sub.Offset)),
+		func(
+			ctx context.Context,
+			pos journal.Position,
+			rec *eventstreamjournal.Record,
+		) (bool, error) {
+			begin := Offset(rec.StreamOffsetBefore)
+			end := Offset(rec.StreamOffsetAfter)
+
+			if begin == end {
+				// no events in this record
+				return true, nil
+			}
+
+			if sub.Offset < begin || sub.Offset >= end {
+				return false, fmt.Errorf(
+					"event stream integrity error at journal position %d: expected event at offset %d, but found offset range [%d, %d)",
+					pos,
+					sub.Offset,
+					begin,
+					end,
+				)
+			}
+
+			index := sub.Offset - begin
+
+			for _, env := range rec.GetEventsAppended().Events[index:] {
+				if !sub.Filter(env) {
+					sub.Offset++
+					continue
+				}
+
+				select {
+				case <-ctx.Done():
+					return false, ctx.Err()
+				case sub.Events <- Event{sub.StreamID, sub.Offset, env}:
+					sub.Offset++
+				}
+			}
+
+			return true, nil
+		},
+	)
+}
+
+func (r *Reader) readContemporary(ctx context.Context, sub *Subscriber) error {
+	// TODO: remote read
+
+	if err := r.subscribe(ctx, sub); err != nil {
+		return err
+	}
+	defer r.unsubscribe(ctx, sub)
+
+	select {
+	case <-ctx.Done():
+		return ctx.Err()
+	case <-sub.canceled.Signaled():
+		return nil
+	}
+}
+
+func (r *Reader) subscribe(ctx context.Context, sub *Subscriber) error {
+	ctx, cancel := context.WithTimeout(ctx, 10*time.Second) // TODO: make configurable
+	defer cancel()
+
+	if err := r.SubscribeQueue.Do(ctx, sub); err != nil {
+		return fmt.Errorf("cannot subscribe to event stream: %w", err)
+	}
+
+	return nil
+}
+
+func (r *Reader) unsubscribe(ctx context.Context, sub *Subscriber) error {
+	ctx, cancel := context.WithCancel(context.WithoutCancel(ctx))
+	defer cancel()
+
+	// Cancel the unsubscribe context when the subscription is canceled,
+	// regardless of the reason.
+	//
+	// This handles the situation where the subscription is canceled because the
+	// worker has shut down (and hence won't service the unsubscribe request).
+	go func() {
+		<-sub.canceled.Signaled()
+		cancel()
+	}()
+
+	return r.UnsubscribeQueue.Do(ctx, sub)
+}
diff --git a/internal/eventstream/supervisor.go b/internal/eventstream/supervisor.go
index dd5e0fc9..d514435f 100644
--- a/internal/eventstream/supervisor.go
+++ b/internal/eventstream/supervisor.go
@@ -13,15 +13,17 @@ import (
 	"github.com/dogmatiq/veracity/internal/signaling"
 )
 
-// errShuttingDown is sent in response to append requests that are not serviced
+// errShuttingDown is sent in response to requests that are not serviced
 // because of an error within the event stream supervisor or a worker.
 var errShuttingDown = errors.New("event stream sub-system is shutting down")
 
 // A Supervisor coordinates event stream workers.
 type Supervisor struct {
-	Journals    journal.BinaryStore
-	AppendQueue messaging.ExchangeQueue[AppendRequest, AppendResponse]
-	Logger      *slog.Logger
+	Journals         journal.BinaryStore
+	AppendQueue      messaging.ExchangeQueue[AppendRequest, AppendResponse]
+	SubscribeQueue   messaging.RequestQueue[*Subscriber]
+	UnsubscribeQueue messaging.RequestQueue[*Subscriber]
+	Logger           *slog.Logger
 
 	shutdown signaling.Latch
 	workers  uuidpb.Map[*worker]
@@ -64,12 +66,18 @@ func (s *Supervisor) idleState(ctx context.Context) fsm.Action {
 		return fsm.StayInCurrentState()
 
 	case ex := <-s.AppendQueue.Recv():
-		return fsm.With(ex).EnterState(s.forwardAppendState)
+		return fsm.With(ex).EnterState(s.appendState)
+
+	case req := <-s.SubscribeQueue.Recv():
+		return fsm.With(req).EnterState(s.subscribeState)
+
+	case req := <-s.UnsubscribeQueue.Recv():
+		return fsm.With(req).EnterState(s.unsubscribeState)
 	}
 }
 
-// forwardAppendState forwards an append request to the appropriate worker.
-func (s *Supervisor) forwardAppendState(
+// appendState forwards an append request to the appropriate worker.
+func (s *Supervisor) appendState(
 	ctx context.Context,
 	ex messaging.Exchange[AppendRequest, AppendResponse],
 ) fsm.Action {
@@ -79,20 +87,59 @@ func (s *Supervisor) forwardAppendState(
 		return fsm.Fail(err)
 	}
 
+	return forwardToWorker(ctx, s, w.AppendQueue.Send(), ex)
+}
+
+// subscribeState forwards a subscribe request to the appropriate worker.
+func (s *Supervisor) subscribeState(
+	ctx context.Context,
+	req messaging.Request[*Subscriber],
+) fsm.Action {
+	w, err := s.workerByStreamID(ctx, req.Request.StreamID)
+	if err != nil {
+		req.Err(errShuttingDown)
+		return fsm.Fail(err)
+	}
+
+	return forwardToWorker(ctx, s, w.SubscribeQueue.Send(), req)
+}
+
+// unsubscribeState forwards an unsubscribe request to the appropriate worker.
+func (s *Supervisor) unsubscribeState(
+	ctx context.Context,
+	req messaging.Request[*Subscriber],
+) fsm.Action {
+	w, ok := s.workers.TryGet(req.Request.StreamID)
+	if !ok {
+		req.Ok()
+		return fsm.EnterState(s.idleState)
+	}
+
+	return forwardToWorker(ctx, s, w.UnsubscribeQueue.Send(), req)
+}
+
+func forwardToWorker[
+	T interface{ Err(error) },
+](
+	ctx context.Context,
+	s *Supervisor,
+	q chan<- T,
+	v T,
+) fsm.Action {
 	select {
 	case <-ctx.Done():
-		ex.Err(errShuttingDown)
+		v.Err(errShuttingDown)
 		return fsm.Stop()
 
 	case res := <-s.workerStopped:
 		s.workers.Delete(res.StreamID)
 
 		if res.Err != nil {
-			ex.Err(errShuttingDown)
+			v.Err(errShuttingDown)
 			return fsm.Fail(res.Err)
 		}
 		return fsm.StayInCurrentState()
 
-	case w.AppendQueue.Send() <- ex:
+	case q <- v:
 		return fsm.EnterState(s.idleState)
 	}
 }
@@ -142,7 +189,8 @@ func (s *Supervisor) startWorkerForStreamID(
 	}
 
 	w := &worker{
-		Journal: j,
+		StreamID: streamID,
+		Journal:  j,
 		Logger: s.Logger.With(
 			slog.String("stream_id", streamID.AsString()),
 		),
diff --git a/internal/eventstream/worker.go b/internal/eventstream/worker.go
index 3d2dab45..b8c51f60 100644
--- a/internal/eventstream/worker.go
+++ b/internal/eventstream/worker.go
@@ -5,44 +5,63 @@ import (
 	"log/slog"
 	"time"
 
+	"github.com/dogmatiq/enginekit/protobuf/uuidpb"
 	"github.com/dogmatiq/persistencekit/journal"
 	"github.com/dogmatiq/veracity/internal/eventstream/internal/eventstreamjournal"
-	"github.com/dogmatiq/veracity/internal/fsm"
 	"github.com/dogmatiq/veracity/internal/messaging"
 	"github.com/dogmatiq/veracity/internal/signaling"
 )
 
-const defaultIdleTimeout = 5 * time.Minute
-
 // A worker manages the state of an event stream.
 type worker struct {
+	// StreamID is the ID of the event stream that the worker manages.
+	StreamID *uuidpb.UUID
+
 	// Journal stores the event stream's state.
 	Journal journal.Journal[*eventstreamjournal.Record]
 
 	// AppendQueue is a queue of requests to append events to the stream.
 	AppendQueue messaging.ExchangeQueue[AppendRequest, AppendResponse]
 
+	// SubscribeQueue is a queue of requests to subscribe to the stream.
+	SubscribeQueue messaging.RequestQueue[*Subscriber]
+
+	// UnsubscribeQueue is a queue of requests to unsubscribe from the stream.
+	UnsubscribeQueue messaging.RequestQueue[*Subscriber]
+
 	// Shutdown signals the worker to stop when it next becomes idle.
 	Shutdown signaling.Latch
 
-	// IdleTimeout is the maximum amount of time the worker will sit idle before
-	// shutting down. If it is non-positive, defaultIdleTimeout is used.
-	IdleTimeout time.Duration
-
 	// Logger is the target for log messages about the stream.
 	Logger *slog.Logger
 
-	pos journal.Position
-	off Offset
+	nextPos      journal.Position
+	nextOffset   Offset
+	recentEvents []Event
+	idleTimer    *time.Timer
+	subscribers  map[*Subscriber]struct{}
 }
 
 // Run starts the worker.
 //
-// It processes requests until ctx is canceled, r.Shutdown is latched, or
-// an error occurrs.
+// It processes requests until ctx is canceled, an error occurs, the worker is
+// shut down by the supervisor, or the idle timeout expires.
 func (w *worker) Run(ctx context.Context) (err error) {
-	w.Logger.DebugContext(ctx, "event stream worker started")
-	defer w.Logger.DebugContext(ctx, "event stream worker stopped")
+	defer func() {
+		if err != nil {
+			w.Logger.Debug(
+				"event stream worker stopped due to an error",
+				slog.String("error", err.Error()),
+				slog.Uint64("next_journal_position", uint64(w.nextPos)),
+				slog.Uint64("next_stream_offset", uint64(w.nextOffset)),
+				slog.Int("subscriber_count", len(w.subscribers)),
+			)
+		}
+
+		for sub := range w.subscribers {
+			sub.canceled.Signal()
+		}
+	}()
 
 	pos, rec, ok, err := journal.LastRecord(ctx, w.Journal)
 	if err != nil {
@@ -50,165 +69,115 @@ func (w *worker) Run(ctx context.Context) (err error) {
 	}
 
 	if ok {
-		w.pos = pos + 1
-		w.off = Offset(rec.StreamOffsetAfter)
+		w.nextPos = pos + 1
+		w.nextOffset = Offset(rec.StreamOffsetAfter)
+
+		w.Logger.Debug(
+			"event stream journal has existing records",
+			slog.Uint64("next_journal_position", uint64(w.nextPos)),
+			slog.Uint64("next_stream_offset", uint64(w.nextOffset)),
+		)
+	} else {
+		w.Logger.Debug("event stream journal is empty")
 	}
 
-	return fsm.Start(ctx, w.idleState)
-}
+	w.resetIdleTimer()
+	defer w.idleTimer.Stop()
 
-// idleState waits for a request or the shutdown signal.
-func (w *worker) idleState(ctx context.Context) fsm.Action {
-	duration := w.IdleTimeout
-	if duration <= 0 {
-		duration = defaultIdleTimeout
+	for {
+		ok, err := w.tick(ctx)
+		if !ok || err != nil {
+			return err
+		}
 	}
+}
 
-	timeout := time.NewTimer(duration)
-	defer timeout.Stop()
-
+// tick handles a single event stream operation.
+func (w *worker) tick(ctx context.Context) (bool, error) {
 	select {
 	case <-ctx.Done():
-		return fsm.Stop()
-
-	case <-w.Shutdown.Signaled():
-		return fsm.Stop()
-
-	case <-timeout.C:
-		return fsm.Stop()
+		return false, ctx.Err()
 
 	case ex := <-w.AppendQueue.Recv():
-		return fsm.With(ex).EnterState(w.handleAppendState)
-	}
-}
+		res, err := w.handleAppend(ctx, ex.Request)
+		if err != nil {
+			ex.Err(errShuttingDown)
+		} else {
+			ex.Ok(res)
+		}
+		return true, err
 
-// handleAppendState appends events to the stream.
-func (w *worker) handleAppendState(
-	ctx context.Context,
-	ex messaging.Exchange[AppendRequest, AppendResponse],
-) fsm.Action {
-	n := len(ex.Request.Events)
-	if n == 0 {
-		panic("cannot record zero events")
-	}
+	case ex := <-w.SubscribeQueue.Recv():
+		w.handleSubscribe(ex.Request)
+		ex.Ok()
+		return true, nil
 
-	res, err := w.appendEvents(ctx, ex.Request)
-	if err != nil {
-		ex.Err(err)
-		return fsm.Fail(err)
-	}
+	case ex := <-w.UnsubscribeQueue.Recv():
+		w.handleUnsubscribe(ex.Request)
+		ex.Ok()
+		return true, nil
 
-	ex.Ok(res)
+	case <-w.idleTimer.C:
+		return w.handleIdle(ctx)
 
-	if res.AppendedByPriorAttempt {
-		return fsm.EnterState(w.idleState)
+	case <-w.Shutdown.Signaled():
+		w.Logger.Debug(
+			"event stream worker stopped by supervisor",
+			slog.Uint64("next_journal_position", uint64(w.nextPos)),
+			slog.Uint64("next_stream_offset", uint64(w.nextOffset)),
+			slog.Int("subscriber_count", len(w.subscribers)),
+		)
+		return false, nil
 	}
-
-	return fsm.EnterState(w.idleState)
 }
 
-// appendEvents writes the events in req to the journal if they have not been
-// written already. It returns the offset of the first event.
-func (w *worker) appendEvents(
-	ctx context.Context,
-	req AppendRequest,
-) (AppendResponse, error) {
-	if w.mightBeDuplicates(req) {
-		if rec, err := w.findAppendRecord(ctx, req); err == nil {
-			for i, e := range req.Events {
-				w.Logger.WarnContext(
-					ctx,
-					"ignored event that has already been appended to the stream",
-					slog.Uint64("stream_offset", uint64(rec.StreamOffsetBefore)+uint64(i)),
-					slog.String("message_id", e.MessageId.AsString()),
-					slog.String("description", e.Description),
-				)
+// catchUpWithJournal reads the journal to catch up with any records that have
+// been appended by other nodes.
+//
+// It is called whenever the worker has some indication that it may be out of
+// date, such as when there is an OCC conflict. It is also called periodically
+// by otherwise idle workers.
+func (w *worker) catchUpWithJournal(ctx context.Context) error {
+	recordCount := 0
+	eventCount := 0
+
+	if err := w.Journal.Range(
+		ctx,
+		w.nextPos,
+		func(
+			ctx context.Context,
+			pos journal.Position,
+			rec *eventstreamjournal.Record,
+		) (ok bool, err error) {
+			recordCount++
+
+			events := rec.GetEventsAppended().GetEvents()
+			if len(events) != 0 {
+				if eventCount == 0 {
+					w.Logger.Warn("event stream contains events that were not appended by this worker")
+				}
+				w.publishEvents(Offset(rec.StreamOffsetBefore), events)
+				eventCount += len(events)
 			}
-
-			return AppendResponse{
-				BeginOffset:            Offset(rec.StreamOffsetBefore),
-				EndOffset:              Offset(rec.StreamOffsetAfter),
-				AppendedByPriorAttempt: true,
-			}, nil
-		} else if err != journal.ErrNotFound {
-			return AppendResponse{}, err
-		}
-	}
-
-	before := w.off
-	after := w.off + Offset(len(req.Events))
+
+			w.nextPos = pos + 1
+			w.nextOffset = Offset(rec.StreamOffsetAfter)
 
-	if err := w.Journal.Append(
-		ctx,
-		w.pos,
-		eventstreamjournal.
-			NewRecordBuilder().
-			WithStreamOffsetBefore(uint64(before)).
-			WithStreamOffsetAfter(uint64(after)).
-			WithEventsAppended(&eventstreamjournal.EventsAppended{
-				Events: req.Events,
-			}).
-			Build(),
+			return true, nil
+		},
 	); err != nil {
-		return AppendResponse{}, err
+		return err
 	}
 
-	for i, e := range req.Events {
-		w.Logger.InfoContext(
-			ctx,
-			"appended event to the stream",
-			slog.Uint64("stream_offset", uint64(before)+uint64(i)),
-			slog.String("message_id", e.MessageId.AsString()),
-			slog.String("description", e.Description),
+	if recordCount != 0 {
+		w.Logger.Debug(
+			"caught up to the end of the event stream journal",
+			slog.Int("record_count", recordCount),
+			slog.Int("event_count", eventCount),
+			slog.Uint64("next_journal_position", uint64(w.nextPos)),
+			slog.Uint64("next_stream_offset", uint64(w.nextOffset)),
 		)
 	}
 
-	w.pos++
-	w.off = after
-
-	return AppendResponse{
-		BeginOffset:            before,
-		EndOffset:              after,
-		AppendedByPriorAttempt: false,
-	}, nil
-}
-
-// mightBeDuplicates returns true if it's possible that the events in req have
-// already been appended to the stream.
-func (w *worker) mightBeDuplicates(req AppendRequest) bool {
-	// The events can't be duplicates if the lowest possible offset that
-	// they could have been appended is the current end of the stream.
-	return req.LowestPossibleOffset < w.off
-}
-
-// findAppendRecord searches the journal to find the record that contains the
-// append operation for the given events.
-//
-// TODO: This is a brute-force approach that searches the journal directly
-// (though efficiently). We could improve upon this approach by keeping some
-// in-memory state of recent event IDs (either explicitly, or via a bloom
-// filter, for example).
-func (w *worker) findAppendRecord(
-	ctx context.Context,
-	req AppendRequest,
-) (*eventstreamjournal.Record, error) {
-	return journal.ScanFromSearchResult(
-		ctx,
-		w.Journal,
-		0,
-		w.pos,
-		eventstreamjournal.SearchByOffset(uint64(req.LowestPossibleOffset)),
-		func(
-			ctx context.Context,
-			_ journal.Position,
-			rec *eventstreamjournal.Record,
-		) (*eventstreamjournal.Record, bool, error) {
-			if op := rec.GetEventsAppended(); op != nil {
-				targetID := req.Events[0].MessageId
-				candidateID := op.Events[0].MessageId
-				return rec, candidateID.Equal(targetID), nil
-			}
-			return nil, false, nil
-		},
-	)
+	return nil
 }
diff --git a/internal/eventstream/workerappend.go b/internal/eventstream/workerappend.go
new file mode 100644
index 00000000..ad0d4158
--- /dev/null
+++ b/internal/eventstream/workerappend.go
@@ -0,0 +1,226 @@
+package eventstream
+
+import (
+	"context"
+	"log/slog"
+
+	"github.com/dogmatiq/enginekit/protobuf/envelopepb"
+	"github.com/dogmatiq/persistencekit/journal"
+	"github.com/dogmatiq/veracity/internal/eventstream/internal/eventstreamjournal"
+)
+
+func (w *worker) handleAppend(
+	ctx context.Context,
+	req AppendRequest,
+) (AppendResponse, error) {
+	if !req.StreamID.Equal(w.StreamID) {
+		panic("received request for a different stream ID")
+	}
+
+	if len(req.Events) == 0 {
+		// We panic rather than just failing the exchange because we never want
+		// empty requests to occupy space in the worker's queue. The sender
+		// should simply not send empty requests.
+		panic("received append request with no events")
+	}
+
+	defer w.resetIdleTimer()
+
+	if req.LowestPossibleOffset > w.nextOffset {
+		if err := w.catchUpWithJournal(ctx); err != nil {
+			return AppendResponse{}, err
+		}
+	}
+
+	for {
+		res, err := w.findPriorAppend(ctx, req)
+		if err != nil {
+			return AppendResponse{}, err
+		}
+
+		if res.AppendedByPriorAttempt {
+			for index, event := range req.Events {
+				w.Logger.Info(
+					"discarded duplicate event",
+					slog.Uint64("stream_offset", uint64(res.BeginOffset)+uint64(index)),
+					slog.String("message_id", event.MessageId.AsString()),
+					slog.String("description", event.Description),
+				)
+			}
+			return res, nil
+		}
+
+		res, err = w.writeEventsToJournal(ctx, req)
+		if err == nil {
+			w.publishEvents(res.BeginOffset, req.Events)
+			return res, nil
+		}
+
+		if err != journal.ErrConflict {
+			return AppendResponse{}, err
+		}
+
+		if err := w.catchUpWithJournal(ctx); err != nil {
+			return AppendResponse{}, err
+		}
+	}
+}
+
+// findPriorAppend returns an [AppendResponse] if the given [AppendRequest] has
+// already been handled.
+func (w *worker) findPriorAppend(
+	ctx context.Context,
+	req AppendRequest,
+) (AppendResponse, error) {
+	// If the lowest possible offset is ahead of the next offset, the request
+	// is malformed. Either there's a bug in Veracity, or the journal has
+	// suffered catastrophic data loss.
+	if req.LowestPossibleOffset > w.nextOffset {
+		panic("lowest possible offset is greater than the next offset")
+	}
+
+	// If the lowest possible offset is equal to the next offset, no events
+	// have been recorded since the request was created, and hence there
+	// can be no prior append attempt.
+	if req.LowestPossibleOffset == w.nextOffset {
+		return AppendResponse{}, nil
+	}
+
+	// If the lowest possible offset is in the cache, we can check for
+	// duplicates without using the journal. We search using the last event in
+	// the request as it's the most likely to still be in the cache.
+	lowestPossibleOffset := req.LowestPossibleOffset + Offset(len(req.Events))
+
+	if cacheIndex := w.findInCache(lowestPossibleOffset); cacheIndex != -1 {
+		lastMessageIndex := len(req.Events) - 1
+		lastMessageID := req.Events[lastMessageIndex].MessageId
+
+		for _, event := range w.recentEvents[cacheIndex:] {
+			if event.Envelope.MessageId.Equal(lastMessageID) {
+				return AppendResponse{
+					// We know the offset of the last message in the request, so
+					// we can compute the offset of the first message, even if
+					// it's no longer in the cache.
+					BeginOffset:            event.Offset - Offset(lastMessageIndex),
+					EndOffset:              event.Offset + 1,
+					AppendedByPriorAttempt: true,
+				}, nil
+			}
+		}
+	}
+
+	// Finally, we search the journal for the record containing the events.
+	rec, err := journal.ScanFromSearchResult(
+		ctx,
+		w.Journal,
+		0,
+		w.nextPos,
+		eventstreamjournal.SearchByOffset(uint64(req.LowestPossibleOffset)),
+		func(
+			_ context.Context,
+			_ journal.Position,
+			rec *eventstreamjournal.Record,
+		) (*eventstreamjournal.Record, bool, error) {
+			if op := rec.GetEventsAppended(); op != nil {
+				targetID := req.Events[0].MessageId
+				candidateID := op.Events[0].MessageId
+				return rec, candidateID.Equal(targetID), nil
+			}
+			return nil, false, nil
+		},
+	)
+	if err != nil {
+		return AppendResponse{}, journal.IgnoreNotFound(err)
+	}
+
+	return AppendResponse{
+		BeginOffset:            Offset(rec.StreamOffsetBefore),
+		EndOffset:              Offset(rec.StreamOffsetAfter),
+		AppendedByPriorAttempt: true,
+	}, nil
+}
+
+func (w *worker) writeEventsToJournal(
+	ctx context.Context,
+	req AppendRequest,
+) (AppendResponse, error) {
+	before := w.nextOffset
+	after := w.nextOffset + Offset(len(req.Events))
+
+	if err := w.Journal.Append(
+		ctx,
+		w.nextPos,
+		eventstreamjournal.
+			NewRecordBuilder().
+			WithStreamOffsetBefore(uint64(before)).
+			WithStreamOffsetAfter(uint64(after)).
+			WithEventsAppended(&eventstreamjournal.EventsAppended{
+				Events: req.Events,
+			}).
+			Build(),
+	); err != nil {
+		return AppendResponse{}, err
+	}
+
+	for index, event := range req.Events {
+		w.Logger.Info(
+			"appended event to the stream",
+			slog.Uint64("journal_position", uint64(w.nextPos)),
+			slog.Uint64("stream_offset", uint64(before)+uint64(index)),
+			slog.String("message_id", event.MessageId.AsString()),
+			slog.String("description", event.Description),
+		)
+	}
+
+	w.nextPos++
+	w.nextOffset = after
+
+	return AppendResponse{
+		BeginOffset:            before,
+		EndOffset:              after,
+		AppendedByPriorAttempt: false,
+	}, nil
+}
+
+// publishEvents publishes the events to both the recent event cache and any
+// interested subscribers.
+func (w *worker) publishEvents(
+	offset Offset,
+	events []*envelopepb.Envelope,
+) {
+	w.growCache(len(events))
+
+	for _, env := range events {
+		event := Event{w.StreamID, offset, env}
+		offset++
+
+		w.appendEventToCache(event)
+
+		if len(w.subscribers) == 0 {
+			continue
+		}
+
+		var delivered, filtered, canceled int
+
+		for sub := range w.subscribers {
+			switch w.deliverEventToSubscriber(event, sub) {
+			case eventDelivered:
+				delivered++
+			case eventFiltered:
+				filtered++
+			case subscriptionCanceled:
+				canceled++
+			}
+		}
+
+		w.Logger.Debug(
+			"event published to subscribers",
+			slog.Uint64("stream_offset", uint64(event.Offset)),
+			slog.String("message_id", env.MessageId.AsString()),
+			slog.String("description", env.Description),
+			slog.Int("delivered_count", delivered),
+			slog.Int("filtered_count", filtered),
+			slog.Int("canceled_count", canceled),
+		)
+	}
+}
diff --git a/internal/eventstream/workercache.go b/internal/eventstream/workercache.go
new file mode 100644
index 00000000..072ecb5e
--- /dev/null
+++ b/internal/eventstream/workercache.go
@@ -0,0 +1,80 @@
+package eventstream
+
+import (
+	"slices"
+	"time"
+)
+
+const (
+	// maxCacheAge is the maximum age of an event that will be retained in the
+	// cache of recent events.
+	maxCacheAge = 1 * time.Minute
+
+	// maxCacheCapacity is the maximum number of events that will be retained in
+	// the cache of recent events.
+	maxCacheCapacity = 1000
+)
+
+// findInCache returns the index of the event with the given offset in the cache
+// of recent events, or -1 if the event is not in the cache.
+func (w *worker) findInCache(offset Offset) int {
+	begin := w.nextOffset - Offset(len(w.recentEvents))
+
+	if begin <= offset && offset < w.nextOffset {
+		return int(offset - begin)
+	}
+
+	return -1
+}
+
+// growCache grows the cache capacity to fit an additional n events. It removes
+// old events if necessary.
+//
+// It returns the number of events that may be added to the cache.
+func (w *worker) growCache(n int) int {
+	begin := 0
+	end := len(w.recentEvents)
+
+	if end > maxCacheCapacity {
+		panic("cache is over capacity, always use appendEventToCache() to add events")
+	}
+
+	if n >= maxCacheCapacity {
+		// We've requested the entire cache, so just clear it entirely.
+		end = 0
+		n = maxCacheCapacity
+	} else {
+		// Otherwise, first remove any events that are older than the cache TTL.
+		for index, event := range w.recentEvents[begin:end] {
+			createdAt := event.Envelope.CreatedAt.AsTime()
+
+			if time.Since(createdAt) < maxCacheAge {
+				begin += index
+				break
+			}
+		}
+
+		// Then, if we still don't have enough space, remove the oldest events.
+		capacity := end - begin + n
+		if capacity > maxCacheCapacity {
+			begin += capacity - maxCacheCapacity
+		}
+	}
+
+	// Note, the slice indices are computed without modifying the slice so that
+	// we only perform a single copy operation.
+	copy(w.recentEvents, w.recentEvents[begin:end])
+
+	w.recentEvents = w.recentEvents[:end-begin]
+	w.recentEvents = slices.Grow(w.recentEvents, n)
+
+	return n
+}
+
+// appendEventToCache appends the given event to the cache of recent events.
+func (w *worker) appendEventToCache(event Event) {
+	if len(w.recentEvents) == cap(w.recentEvents) {
+		panic("cache is at capacity, call growCache() before appending")
+	}
+	w.recentEvents = append(w.recentEvents, event)
+}
diff --git a/internal/eventstream/workeridle.go b/internal/eventstream/workeridle.go
new file mode 100644
index 00000000..49e20550
--- /dev/null
+++ b/internal/eventstream/workeridle.go
@@ -0,0 +1,64 @@
+package eventstream
+
+import (
+	"context"
+	"log/slog"
+	"time"
+)
+
+const (
+	// shutdownTimeout is the amount of time a worker WITH NO SUBSCRIBERS will
+	// wait after appending events before shutting down.
+	shutdownTimeout = 5 * time.Minute
+
+	// catchUpTimeout is the amount of time a worker WITH SUBSCRIBERS will wait
+	// after appending events before "catching up" with any journal records that
+	// have been appended by other nodes.
+	catchUpTimeout = 10 * time.Second
+)
+
+// resetIdleTimer starts or resets the idle timer.
+func (w *worker) resetIdleTimer() {
+	timeout := shutdownTimeout
+	if len(w.subscribers) > 0 {
+		timeout = catchUpTimeout
+	}
+
+	if w.idleTimer == nil {
+		w.idleTimer = time.NewTimer(timeout)
+	} else {
+		if !w.idleTimer.Stop() {
+			// Drain without blocking; the expired value may already have been
+			// received by tick's idle case.
+			select {
+			case <-w.idleTimer.C:
+			default:
+			}
+		}
+		w.idleTimer.Reset(timeout)
+	}
+}
+
+// handleIdle is called when the worker has not appended any new events for some
+// period of time.
+//
+// If there are no subscribers, it returns false, indicating that the worker
+// should shut down. Otherwise, it reads the journal to see if there are new
+// events to deliver to the subscribers.
+func (w *worker) handleIdle(ctx context.Context) (bool, error) {
+	if len(w.subscribers) == 0 {
+		w.Logger.Debug(
+			"event stream worker stopped due to inactivity",
+			slog.Uint64("next_journal_position", uint64(w.nextPos)),
+			slog.Uint64("next_stream_offset", uint64(w.nextOffset)),
+		)
+		return false, nil
+	}
+
+	if err := w.catchUpWithJournal(ctx); err != nil {
+		return false, err
+	}
+
+	w.resetIdleTimer()
+
+	return true, nil
+}
diff --git a/internal/eventstream/workersubscriber.go b/internal/eventstream/workersubscriber.go
new file mode 100644
index 00000000..eb8a2325
--- /dev/null
+++ b/internal/eventstream/workersubscriber.go
@@ -0,0 +1,139 @@
+package eventstream
+
+import (
+	"fmt"
+	"log/slog"
+)
+
+// handleSubscribe adds sub to the subscriber list.
+//
+// It delivers any cached events that the subscriber has not yet seen. If the
+// subscriber's requested event is older than the events in the cache, the
+// subscription is canceled immediately.
+func (w *worker) handleSubscribe(sub *Subscriber) {
+	if !sub.StreamID.Equal(w.StreamID) {
+		panic("received request for a different stream ID")
+	}
+
+	if w.subscribers == nil {
+		w.subscribers = map[*Subscriber]struct{}{}
+	}
+	w.subscribers[sub] = struct{}{}
+
+	w.Logger.Debug(
+		"subscription activated",
+		slog.String("channel_address", fmt.Sprint(sub.Events)),
+		slog.Int("channel_capacity", cap(sub.Events)),
+		slog.Int("channel_headroom", cap(sub.Events)-len(sub.Events)),
+		slog.Int("subscriber_count", len(w.subscribers)),
+		slog.Uint64("requested_stream_offset", uint64(sub.Offset)),
+		slog.Uint64("next_stream_offset", uint64(w.nextOffset)),
+	)
+
+	if sub.Offset >= w.nextOffset {
+		return
+	}
+
+	index := w.findInCache(sub.Offset)
+
+	if index == -1 {
+		sub.canceled.Signal()
+		w.Logger.Warn(
+			"subscription canceled immediately due to a request for historical events",
+			slog.String("channel_address", fmt.Sprint(sub.Events)),
+			slog.Int("channel_capacity", cap(sub.Events)),
+			slog.Int("channel_headroom", cap(sub.Events)-len(sub.Events)),
+			slog.Int("subscriber_count", len(w.subscribers)),
+			slog.Int("cached_event_count", len(w.recentEvents)),
+			slog.Uint64("requested_stream_offset", uint64(sub.Offset)),
+			slog.Uint64("next_stream_offset", uint64(w.nextOffset)),
+		)
+		return
+	}
+
+	for _, event := range w.recentEvents[index:] {
+		if w.deliverEventToSubscriber(event, sub) == subscriptionCanceled {
+			return
+		}
+	}
+}
+
+// handleUnsubscribe removes sub from the subscriber list.
+func (w *worker) handleUnsubscribe(sub *Subscriber) {
+	if !sub.StreamID.Equal(w.StreamID) {
+		panic("received request for a different stream ID")
+	}
+
+	before := len(w.subscribers)
+	delete(w.subscribers, sub)
+	after := len(w.subscribers)
+
+	if before > after {
+		sub.canceled.Signal()
+
+		w.Logger.Debug(
+			"subscription canceled by subscriber",
+			slog.String("channel_address", fmt.Sprint(sub.Events)),
+			slog.Int("channel_capacity", cap(sub.Events)),
+			slog.Int("channel_headroom", cap(sub.Events)-len(sub.Events)),
+			slog.Int("subscriber_count", after),
+		)
+	}
+}
+
+// deliverResult is an enumeration of the possible outcomes of delivering an
+// event to a subscriber.
+type deliverResult int
+
+const (
+	// eventDelivered means that the event was sent to the subscriber's events
+	// channel, which may or may not be buffered.
+	eventDelivered deliverResult = iota
+
+	// eventFiltered means that the event was filtered by the subscriber's
+	// filter function, and did not need to be delivered.
+	eventFiltered
+
+	// subscriptionCanceled means that an attempt was made to send the event to
+	// the subscriber's event channel, but the channel buffer was full (or
+	// unbuffered and not ready to read), and so the subscription was canceled.
+	subscriptionCanceled
+)
+
+// deliverEventToSubscriber attempts to deliver an event to a subscriber's event
+// channel.
+func (w *worker) deliverEventToSubscriber(event Event, sub *Subscriber) deliverResult {
+	if event.Offset > sub.Offset {
+		panic("event is out of order")
+	}
+
+	if event.Offset < sub.Offset {
+		return eventFiltered
+	}
+
+	if !sub.Filter(event.Envelope) {
+		sub.Offset++
+		return eventFiltered
+	}
+
+	select {
+	case sub.Events <- event:
+		sub.Offset++
+		return eventDelivered
+
+	default:
+		delete(w.subscribers, sub)
+		sub.canceled.Signal()
+
+		w.Logger.Warn(
+			"subscription canceled because the subscriber cannot keep up with the event stream",
+			slog.String("channel_address", fmt.Sprint(sub.Events)),
+			slog.Int("channel_capacity", cap(sub.Events)),
+			slog.Int("channel_headroom", 0),
+			slog.Int("subscriber_count", len(w.subscribers)),
+			slog.Uint64("stream_offset", uint64(event.Offset)),
+		)
+
+		return subscriptionCanceled
+	}
+}
diff --git a/internal/messaging/request.go b/internal/messaging/request.go
new file mode 100644
index 00000000..d5ff6a23
--- /dev/null
+++ b/internal/messaging/request.go
@@ -0,0 +1,64 @@
+package messaging
+
+import (
+	"context"
+	"sync"
+)
+
+// Request encapsulates a request.
+type Request[Req any] struct {
+	Context context.Context
+	Request Req
+	Error   chan<- error
+}
+
+// Ok sends a successful response.
+func (e Request[Req]) Ok() {
+	e.Error <- nil
+}
+
+// Err sends an error response.
+func (e Request[Req]) Err(err error) {
+	e.Error <- err
+}
+
+// RequestQueue is a queue of requests.
+type RequestQueue[Req any] struct {
+	init  sync.Once
+	queue chan Request[Req]
+}
+
+// Recv returns a channel that, when read, dequeues the next request.
+func (q *RequestQueue[Req]) Recv() <-chan Request[Req] {
+	return q.getQueue()
+}
+
+// Send returns a channel that, when written, enqueues a request.
+func (q *RequestQueue[Req]) Send() chan<- Request[Req] {
+	return q.getQueue()
+}
+
+// Do performs a synchronous request.
+func (q *RequestQueue[Req]) Do(ctx context.Context, req Req) error {
+	response := make(chan error, 1)
+
+	select {
+	case <-ctx.Done():
+		return ctx.Err()
+	case q.Send() <- Request[Req]{ctx, req, response}:
+	}
+
+	select {
+	case <-ctx.Done():
+		return ctx.Err()
+	case err := <-response:
+		return err
+	}
+}
+
+func (q *RequestQueue[Req]) getQueue() chan Request[Req] {
+	q.init.Do(func() {
+		q.queue = make(chan Request[Req])
+	})
+	return q.queue
+}
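Usage sketch (not part of the patch above): the calling pattern implied by Reader and Subscriber in reader.go, assuming a Reader already wired to the same journal store and subscribe/unsubscribe queues as the Supervisor. The consume function, channel capacity, and always-true filter are illustrative assumptions, not APIs introduced by this change.

package example // hypothetical wiring, for illustration only

import (
	"context"

	"github.com/dogmatiq/enginekit/protobuf/envelopepb"
	"github.com/dogmatiq/enginekit/protobuf/uuidpb"
	"github.com/dogmatiq/veracity/internal/eventstream"
)

// consume drains one stream from offset 0 until ctx is canceled or Read fails.
func consume(ctx context.Context, r *eventstream.Reader, streamID *uuidpb.UUID) error {
	// A buffered channel gives the worker some headroom before it cancels the
	// subscription and the reader falls back to reading the journal.
	events := make(chan eventstream.Event, 128)

	sub := &eventstream.Subscriber{
		StreamID: streamID,
		Offset:   0,
		Filter:   func(*envelopepb.Envelope) bool { return true },
		Events:   events,
	}

	// Read blocks until ctx is canceled or an error occurs, so run it in its
	// own goroutine and drain the channel here.
	result := make(chan error, 1)
	go func() { result <- r.Read(ctx, sub) }()

	for {
		select {
		case err := <-result:
			return err
		case ev := <-events:
			_ = ev // handle the event's offset and envelope here
		}
	}
}

Because the subscriber's Offset is advanced as events are delivered, reusing the same Subscriber value after Read returns should resume from the next unread offset, though that resumption pattern is an inference from the field documentation rather than something exercised in this diff.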