diff --git a/internal/eventstream/reader.go b/internal/eventstream/reader.go
new file mode 100644
index 00000000..4cbc05ed
--- /dev/null
+++ b/internal/eventstream/reader.go
@@ -0,0 +1,150 @@
+package eventstream
+
+import (
+	"context"
+	"fmt"
+	"time"
+
+	"github.com/dogmatiq/enginekit/protobuf/envelopepb"
+	"github.com/dogmatiq/enginekit/protobuf/uuidpb"
+	"github.com/dogmatiq/persistencekit/journal"
+	"github.com/dogmatiq/veracity/internal/eventstream/internal/eventstreamjournal"
+	"github.com/dogmatiq/veracity/internal/messaging"
+	"github.com/dogmatiq/veracity/internal/signaling"
+)
+
+// A Subscriber receives events from an event stream via a [Reader].
+type Subscriber struct {
+	StreamID *uuidpb.UUID
+	Offset   Offset
+	// Filter returns true if the subscriber should receive the given event.
+	Filter   func(*envelopepb.Envelope) bool
+	Events   chan<- Event
+
+	canceled signaling.Event
+}
+
+// A Reader delivers events from an event stream to subscribers.
+type Reader struct {
+	Journals         journal.BinaryStore
+	SubscribeQueue   *messaging.ExchangeQueue[*Subscriber, struct{}]
+	UnsubscribeQueue *messaging.ExchangeQueue[*Subscriber, struct{}]
+}
+
+// Read sends events to sub.Events until ctx is canceled or an error occurs.
+func (r *Reader) Read(ctx context.Context, sub *Subscriber) error {
+	for {
+		if err := r.readHistorical(ctx, sub); err != nil {
+			return err
+		}
+
+		if err := r.readContemporary(ctx, sub); err != nil {
+			return err
+		}
+	}
+}
+
+func (r *Reader) readHistorical(ctx context.Context, sub *Subscriber) error {
+	j, err := eventstreamjournal.Open(ctx, r.Journals, sub.StreamID)
+	if err != nil {
+		return err
+	}
+	defer j.Close()
+
+	searchBegin, searchEnd, err := j.Bounds(ctx)
+	if err != nil {
+		return err
+	}
+
+	return journal.RangeFromSearchResult(
+		ctx,
+		j,
+		searchBegin, searchEnd,
+		eventstreamjournal.SearchByOffset(uint64(sub.Offset)),
+		func(
+			ctx context.Context,
+			pos journal.Position,
+			rec *eventstreamjournal.Record,
+		) (bool, error) {
+			begin := Offset(rec.StreamOffsetBefore)
+			end := Offset(rec.StreamOffsetAfter)
+
+			if begin == end {
+				// no events in this record
+				return true, nil
+			}
+
+			if sub.Offset < begin || sub.Offset >= end {
+				return false, fmt.Errorf(
+					"event stream integrity error at journal position %d: expected event at offset %d, but found offset range [%d, %d)",
+					pos,
+					sub.Offset,
+					begin,
+					end,
+				)
+			}
+
+			index := sub.Offset - begin
+
+			for _, env := range rec.GetEventsAppended().Events[index:] {
+				if !sub.Filter(env) {
+					sub.Offset++
+					continue
+				}
+
+				select {
+				case <-ctx.Done():
+					return false, ctx.Err()
+				case sub.Events <- Event{sub.StreamID, sub.Offset, env}:
+					sub.Offset++
+				}
+			}
+
+			return true, nil
+		},
+	)
+}
+
+func (r *Reader) readContemporary(ctx context.Context, cur *Subscriber) error {
+	// TODO: remote read
+
+	if err := r.subscribe(ctx, cur); err != nil {
+		return err
+	}
+	defer r.unsubscribe(cur)
+
+	select {
+	case <-ctx.Done():
+		return ctx.Err()
+	case <-cur.canceled.Signaled():
+		return nil
+	}
+}
+
+func (r *Reader) subscribe(ctx context.Context, cur *Subscriber) error {
+	ctx, cancel := context.WithTimeout(ctx, 10*time.Second) // TODO: make configurable
+	defer cancel()
+
+	if _, err := r.SubscribeQueue.Exchange(ctx, cur); err != nil {
+		return fmt.Errorf("cannot subscribe to event stream: %w", err)
+	}
+
+	return nil
+}
+
+func (r *Reader) unsubscribe(cur *Subscriber) {
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	// Cancel the unsubscribe context when the subscription is canceled,
+	// regardless of the reason.
+	//
+	// This handles the situation where the subscription is canceled because the
+	// worker has shut down (and hence won't service the unsubscribe request).
+ go func() { + <-cur.canceled.Signaled() + cancel() + }() + + r.UnsubscribeQueue.Exchange(ctx, cur) +} diff --git a/internal/eventstream/subscriber.go b/internal/eventstream/subscriber.go deleted file mode 100644 index 909f18ea..00000000 --- a/internal/eventstream/subscriber.go +++ /dev/null @@ -1,39 +0,0 @@ -package eventstream - -import ( - "github.com/dogmatiq/enginekit/protobuf/uuidpb" -) - -// SubscribeRequest is a request to receive notifications when events are -// appended to a stream. -type SubscribeRequest struct { - // StreamID is the ID of the stream from which events are received. - StreamID *uuidpb.UUID -} - -// SubscribeResponse is the successful response to a [SubscribeRequest]. -type SubscribeResponse struct { - // Offset is the offset of the first event that will be delivered to the - // subscriber. - Offset Offset - - // Events is the channel to which events are sent. - // - // The channel is closed when the subscription is canceled, either because - // an [UnsubscribeRequest] was issued, the event stream supervisor is - // shutdown, or the channel's buffer is full. - Events chan<- Event -} - -// UnsubscribeRequest is a request to stop receiving notifications when events -// are appended to a stream. -type UnsubscribeRequest struct { - // StreamID is the ID of the stream from which events are being received. - StreamID *uuidpb.UUID - - // Events is the channel to be unsubscribed. - Events chan<- Event -} - -// UnsubscribeResponse is the successful response to an [UnsubscribeRequest]. -type UnsubscribeResponse struct{} diff --git a/internal/eventstream/supervisor.go b/internal/eventstream/supervisor.go index 0e6efdb1..ed238e1d 100644 --- a/internal/eventstream/supervisor.go +++ b/internal/eventstream/supervisor.go @@ -13,7 +13,7 @@ import ( "github.com/dogmatiq/veracity/internal/signaling" ) -// errShuttingDown is sent in response to append requests that are not serviced +// errShuttingDown is sent in response to requests that are not serviced // because of an error within the event stream supervisor or a worker. var errShuttingDown = errors.New("event stream sub-system is shutting down") @@ -21,8 +21,8 @@ var errShuttingDown = errors.New("event stream sub-system is shutting down") type Supervisor struct { Journals journal.BinaryStore AppendQueue messaging.ExchangeQueue[AppendRequest, AppendResponse] - SubscribeQueue messaging.ExchangeQueue[SubscribeRequest, SubscribeResponse] - UnsubscribeQueue messaging.ExchangeQueue[UnsubscribeRequest, UnsubscribeResponse] + SubscribeQueue messaging.ExchangeQueue[*Subscriber, struct{}] + UnsubscribeQueue messaging.ExchangeQueue[*Subscriber, struct{}] Logger *slog.Logger shutdown signaling.Latch @@ -95,14 +95,14 @@ func (s *Supervisor) appendState( // subscribeState forwards a subscribe request to the appropriate worker. func (s *Supervisor) subscribeState( ctx context.Context, - ex messaging.Exchange[SubscribeRequest, SubscribeResponse], + ex messaging.Exchange[*Subscriber, struct{}], ) fsm.Action { return forwardToWorker( ctx, s, ex.Request.StreamID, ex, - func(w *worker) *messaging.ExchangeQueue[SubscribeRequest, SubscribeResponse] { + func(w *worker) *messaging.ExchangeQueue[*Subscriber, struct{}] { return &w.SubscribeQueue }, ) @@ -111,14 +111,16 @@ func (s *Supervisor) subscribeState( // unsubscribeState forwards an unsubscribe request to the appropriate worker. 
func (s *Supervisor) unsubscribeState( ctx context.Context, - ex messaging.Exchange[UnsubscribeRequest, UnsubscribeResponse], + ex messaging.Exchange[*Subscriber, struct{}], ) fsm.Action { + // TODO: this will currently start a worker even if it's not already running + // (and hence the subscription cannot be active) return forwardToWorker( ctx, s, ex.Request.StreamID, ex, - func(w *worker) *messaging.ExchangeQueue[UnsubscribeRequest, UnsubscribeResponse] { + func(w *worker) *messaging.ExchangeQueue[*Subscriber, struct{}] { return &w.UnsubscribeQueue }, ) @@ -200,7 +202,8 @@ func (s *Supervisor) startWorkerForStreamID( } w := &worker{ - Journal: j, + StreamID: streamID, + Journal: j, Logger: s.Logger.With( slog.String("stream_id", streamID.AsString()), ), diff --git a/internal/eventstream/worker.go b/internal/eventstream/worker.go index 1414aa79..6be42b6f 100644 --- a/internal/eventstream/worker.go +++ b/internal/eventstream/worker.go @@ -2,11 +2,9 @@ package eventstream import ( "context" - "fmt" "log/slog" "time" - "github.com/dogmatiq/enginekit/protobuf/envelopepb" "github.com/dogmatiq/enginekit/protobuf/uuidpb" "github.com/dogmatiq/persistencekit/journal" "github.com/dogmatiq/veracity/internal/eventstream/internal/eventstreamjournal" @@ -14,14 +12,9 @@ import ( "github.com/dogmatiq/veracity/internal/signaling" ) -const ( - defaultIdleTimeout = 5 * time.Minute - defaultSubscriberCapacity = 128 // TODO(jmalloc): make this configurable -) - // A worker manages the state of an event stream. type worker struct { - // streamID is the ID of the event stream that the worker manages. + // StreamID is the ID of the event stream that the worker manages. StreamID *uuidpb.UUID // Journal stores the event stream's state. @@ -31,26 +24,22 @@ type worker struct { AppendQueue messaging.ExchangeQueue[AppendRequest, AppendResponse] // SubscribeQueue is a queue of requests to subscribe to the stream. - SubscribeQueue messaging.ExchangeQueue[SubscribeRequest, SubscribeResponse] + SubscribeQueue messaging.ExchangeQueue[*Subscriber, struct{}] // UnsubscribeQueue is a queue of requests to unsubscribe from the stream. - UnsubscribeQueue messaging.ExchangeQueue[UnsubscribeRequest, UnsubscribeResponse] + UnsubscribeQueue messaging.ExchangeQueue[*Subscriber, struct{}] // Shutdown signals the worker to stop when it next becomes idle. Shutdown signaling.Latch - // IdleTimeout is the maximum amount of time the worker will wait to receive - // an append request before polling the journal and/or shutting down. If it - // is non-positive, defaultIdleTimeout is used. - IdleTimeout time.Duration - // Logger is the target for log messages about the stream. Logger *slog.Logger - pos journal.Position - offset Offset - idleTimer *time.Timer - subscribers map[chan<- Event]struct{} + nextPos journal.Position + nextOffset Offset + recentEvents []Event + idleTimer *time.Timer + subscribers map[*Subscriber]struct{} } // Run starts the worker. 
@@ -61,16 +50,16 @@ func (w *worker) Run(ctx context.Context) (err error) { defer func() { if err != nil { w.Logger.Debug( - "event stream worker stopped due to error", + "event stream worker stopped due to an error", slog.String("error", err.Error()), - slog.Uint64("next_journal_position", uint64(w.pos)), - slog.Uint64("next_stream_offset", uint64(w.offset)), + slog.Uint64("next_journal_position", uint64(w.nextPos)), + slog.Uint64("next_stream_offset", uint64(w.nextOffset)), slog.Int("subscriber_count", len(w.subscribers)), ) } for sub := range w.subscribers { - close(sub) + sub.canceled.Signal() } }() @@ -80,13 +69,13 @@ func (w *worker) Run(ctx context.Context) (err error) { } if ok { - w.pos = pos + 1 - w.offset = Offset(rec.StreamOffsetAfter) + w.nextPos = pos + 1 + w.nextOffset = Offset(rec.StreamOffsetAfter) w.Logger.Debug( "event stream journal has existing records", - slog.Uint64("next_journal_position", uint64(w.pos)), - slog.Uint64("next_stream_offset", uint64(w.offset)), + slog.Uint64("next_journal_position", uint64(w.nextPos)), + slog.Uint64("next_stream_offset", uint64(w.nextOffset)), ) } else { w.Logger.Debug("event stream journal is empty") @@ -96,350 +85,58 @@ func (w *worker) Run(ctx context.Context) (err error) { defer w.idleTimer.Stop() for { - select { - case <-ctx.Done(): - return ctx.Err() - - case ex := <-w.AppendQueue.Recv(): - if err := w.append(ctx, ex); err != nil { - return err - } - - case ex := <-w.SubscribeQueue.Recv(): - w.subscribe(ex) - - case ex := <-w.UnsubscribeQueue.Recv(): - w.unsubscribe(ex) - - case <-w.idleTimer.C: - if ok, err := w.idle(ctx); !ok || err != nil { - return err - } - - case <-w.Shutdown.Signaled(): - w.Logger.Debug( - "event stream worker stopped by supervisor", - slog.Uint64("next_journal_position", uint64(w.pos)), - slog.Uint64("next_stream_offset", uint64(w.offset)), - slog.Int("subscriber_count", len(w.subscribers)), - ) - return nil + ok, err := w.tick(ctx) + if !ok || err != nil { + return err } } } -// append processes an [AppendRequest]. -func (w *worker) append( - ctx context.Context, - ex messaging.Exchange[AppendRequest, AppendResponse], -) (err error) { - if len(ex.Request.Events) == 0 { - panic("cannot record zero events") - } - - defer func() { - w.resetIdleTimer() +func (w *worker) tick(ctx context.Context) (bool, error) { + select { + case <-ctx.Done(): + return false, ctx.Err() + case ex := <-w.AppendQueue.Recv(): + res, err := w.handleAppend(ctx, ex.Request) if err != nil { - ex.Err(err) - } - }() - - for { - // Record the events in the journal. - res, err := w.writeEvents(ctx, ex.Request) - if err != nil { - if err != journal.ErrConflict { - return err - } - - // If there was a conflict, we first read the conflicting records, - // then retry to write these events. - if err := w.readEvents(ctx); err == nil { - return err - } - - continue - } - - // Respond with an ok regardless of whether the events were appended by this - // request or a prior one, making the operation is idempotent from the - // perspective of the request sender. - ex.Ok(res) - - // Dispatch the events to subscribers only if they were appended by this - // request, we never "redeliver" them. - if !res.AppendedByPriorAttempt { - w.deliverEvents(res.BeginOffset, ex.Request.Events) - } - - return nil - } -} - -// subscribe processes a [SubscribeRequest]. 
-func (w *worker) subscribe( - ex messaging.Exchange[SubscribeRequest, SubscribeResponse], -) { - if w.subscribers == nil { - w.subscribers = map[chan<- Event]struct{}{} - } - - sub := make(chan Event, defaultSubscriberCapacity) - w.subscribers[sub] = struct{}{} - - w.Logger.Debug( - "subscription activated", - slog.String("channel_address", fmt.Sprint(sub)), - slog.Int("channel_capacity", cap(sub)), - slog.Int("channel_headroom", cap(sub)-len(sub)), - slog.Int("subscriber_count", len(w.subscribers)), - slog.Uint64("next_stream_offset", uint64(w.offset)), - ) - - ex.Ok(SubscribeResponse{ - Offset: w.offset, - Events: sub, - }) -} - -func (w *worker) unsubscribe( - ex messaging.Exchange[UnsubscribeRequest, UnsubscribeResponse], -) { - sub := ex.Request.Events - - before := len(w.subscribers) - delete(w.subscribers, sub) - after := len(w.subscribers) - - if before > after { - close(sub) - - w.Logger.Debug( - "subscription canceled by subscriber", - slog.String("channel_address", fmt.Sprint(sub)), - slog.Int("channel_capacity", cap(sub)), - slog.Int("channel_headroom", cap(sub)-len(sub)), - slog.Int("subscriber_count", after), - ) - } - - ex.Ok(UnsubscribeResponse{}) -} - -// writeEvents records the events in req to the journal if they have not been -// recorded already. It returns the offset of the first event. -func (w *worker) writeEvents( - ctx context.Context, - req AppendRequest, -) (AppendResponse, error) { - if w.mightBeDuplicates(req) { - if pos, rec, err := w.findAppendRecord(ctx, req); err == nil { - for i, e := range req.Events { - w.Logger.Warn( - "ignored event that has already been appended to the stream", - slog.Uint64("journal_position", uint64(pos)), - slog.Uint64("stream_offset", uint64(rec.StreamOffsetBefore)+uint64(i)), - slog.String("message_id", e.MessageId.AsString()), - slog.String("description", e.Description), - ) - } - - return AppendResponse{ - BeginOffset: Offset(rec.StreamOffsetBefore), - EndOffset: Offset(rec.StreamOffsetAfter), - AppendedByPriorAttempt: true, - }, nil - } else if err != journal.ErrNotFound { - return AppendResponse{}, err - } - } - - before := w.offset - after := w.offset + Offset(len(req.Events)) - - if err := w.Journal.Append( - ctx, - w.pos, - eventstreamjournal. - NewRecordBuilder(). - WithStreamOffsetBefore(uint64(before)). - WithStreamOffsetAfter(uint64(after)). - WithEventsAppended(&eventstreamjournal.EventsAppended{ - Events: req.Events, - }). - Build(), - ); err != nil { - return AppendResponse{}, err - } - - for i, e := range req.Events { - w.Logger.Info( - "appended event to the stream", - slog.Uint64("journal_position", uint64(w.pos)), - slog.Uint64("stream_offset", uint64(before)+uint64(i)), - slog.String("message_id", e.MessageId.AsString()), - slog.String("description", e.Description), - ) - } - - w.pos++ - w.offset = after - - return AppendResponse{ - BeginOffset: before, - EndOffset: after, - AppendedByPriorAttempt: false, - }, nil -} - -// mightBeDuplicates returns true if it's possible that the events in req have -// already been appended to the stream. -func (w *worker) mightBeDuplicates(req AppendRequest) bool { - // TODO: We could improve upon this by keeping some in-memory representation - // of recent event IDs (either explicitly, or a bloom filter, for example). - - // The events can't be duplicates if the lowest possible offset that - // they could have been appended is the current end of the stream. 
- return req.LowestPossibleOffset < w.offset -} - -// findAppendRecord searches the journal to find the record that contains the -// append operation for the given events. -func (w *worker) findAppendRecord( - ctx context.Context, - req AppendRequest, -) (pos journal.Position, rec *eventstreamjournal.Record, err error) { - rec, err = journal.ScanFromSearchResult( - ctx, - w.Journal, - 0, - w.pos, - eventstreamjournal.SearchByOffset(uint64(req.LowestPossibleOffset)), - func( - ctx context.Context, - p journal.Position, - rec *eventstreamjournal.Record, - ) (*eventstreamjournal.Record, bool, error) { - if op := rec.GetEventsAppended(); op != nil { - targetID := req.Events[0].MessageId - candidateID := op.Events[0].MessageId - - if candidateID.Equal(targetID) { - pos = p - return rec, true, nil - } - } - - return nil, false, nil - }, - ) - - return pos, rec, err -} - -// deliverEvents delivers events that have been appended to the stream to all -// subscribers. -func (w *worker) deliverEvents( - offset Offset, - events []*envelopepb.Envelope, -) { - for _, env := range events { - if len(w.subscribers) == 0 { - // There are no subscribers left, either because there were never - // any, or they've all descynchronized. - break + ex.Err(errShuttingDown) + } else { + ex.Ok(res) } + return true, err - e := Event{ - StreamID: w.StreamID, - Offset: offset, - Envelope: env, - } - offset++ + case ex := <-w.SubscribeQueue.Recv(): + w.handleSubscribe(ex.Request) + ex.Ok(struct{}{}) + return true, nil - var delivered, canceled int + case ex := <-w.UnsubscribeQueue.Recv(): + w.handleUnsubscribe(ex.Request) + ex.Ok(struct{}{}) + return true, nil - for sub := range w.subscribers { - select { - case sub <- e: - delivered++ - default: - canceled++ - - delete(w.subscribers, sub) - close(sub) - - w.Logger.Warn( - "subscription canceled due to insufficient channel headroom", - slog.String("channel_address", fmt.Sprint(sub)), - slog.Int("channel_capacity", cap(sub)), - slog.Int("channel_headroom", cap(sub)-len(sub)), - slog.Int("subscriber_count", len(w.subscribers)), - slog.Uint64("stream_offset", uint64(e.Offset)), - ) - } - } + case <-w.idleTimer.C: + return w.handleIdle(ctx) + case <-w.Shutdown.Signaled(): w.Logger.Debug( - "event delivered to subscribers", - slog.Uint64("stream_offset", uint64(e.Offset)), - slog.String("message_id", env.MessageId.AsString()), - slog.String("description", env.Description), - slog.Int("delivery_count", delivered), - slog.Int("cancelation_count", canceled), - ) - } -} - -// resetIdleTimer resets the idle timer to the configured timeout. -func (w *worker) resetIdleTimer() { - d := w.IdleTimeout - if d <= 0 { - d = defaultIdleTimeout - } - - if w.idleTimer == nil { - w.idleTimer = time.NewTimer(d) - } else { - if !w.idleTimer.Stop() { - <-w.idleTimer.C - } - w.idleTimer.Reset(d) - } -} - -func (w *worker) idle(ctx context.Context) (bool, error) { - // If there are no subscribers there's literally nothing to do, so the - // worker can shutdown. 
- if len(w.subscribers) == 0 { - w.Logger.Debug( - "event stream worker stopped due to inactivity", - slog.Uint64("next_journal_position", uint64(w.pos)), - slog.Uint64("next_stream_offset", uint64(w.offset)), + "event stream worker stopped by supervisor", + slog.Uint64("next_journal_position", uint64(w.nextPos)), + slog.Uint64("next_stream_offset", uint64(w.nextOffset)), + slog.Int("subscriber_count", len(w.subscribers)), ) return false, nil } - - // Otherwise, we poll the journal to see if some other node has appended new - // events that we can deliver to the subscribers. - if err := w.readEvents(ctx); err != nil { - return false, err - } - - w.resetIdleTimer() - - return true, nil } -// readEvents reads events from the journal and delivers them to subscribers. -func (w *worker) readEvents(ctx context.Context) error { +func (w *worker) catchUpWithJournal(ctx context.Context) error { recordCount := 0 eventCount := 0 if err := w.Journal.Range( ctx, - w.pos, + w.nextPos, func( ctx context.Context, pos journal.Position, @@ -452,12 +149,12 @@ func (w *worker) readEvents(ctx context.Context) error { if eventCount == 0 { w.Logger.Warn("event stream contains events that were not appended by this worker") } - w.deliverEvents(Offset(rec.StreamOffsetBefore), events) + w.publishEvents(Offset(rec.StreamOffsetBefore), events) eventCount += len(events) } - w.pos = pos + 1 - w.offset = Offset(rec.StreamOffsetAfter) + w.nextPos = pos + 1 + w.nextOffset = Offset(rec.StreamOffsetAfter) return true, nil }, @@ -470,8 +167,8 @@ func (w *worker) readEvents(ctx context.Context) error { "processed journal records that were not appended by this worker", slog.Int("record_count", recordCount), slog.Int("event_count", eventCount), - slog.Uint64("next_journal_position", uint64(w.pos)), - slog.Uint64("next_stream_offset", uint64(w.offset)), + slog.Uint64("next_journal_position", uint64(w.nextPos)), + slog.Uint64("next_stream_offset", uint64(w.nextOffset)), ) } diff --git a/internal/eventstream/workerappend.go b/internal/eventstream/workerappend.go new file mode 100644 index 00000000..ad0d4158 --- /dev/null +++ b/internal/eventstream/workerappend.go @@ -0,0 +1,226 @@ +package eventstream + +import ( + "context" + "log/slog" + + "github.com/dogmatiq/enginekit/protobuf/envelopepb" + "github.com/dogmatiq/persistencekit/journal" + "github.com/dogmatiq/veracity/internal/eventstream/internal/eventstreamjournal" +) + +func (w *worker) handleAppend( + ctx context.Context, + req AppendRequest, +) (AppendResponse, error) { + if !req.StreamID.Equal(w.StreamID) { + panic("received request for a different stream ID") + } + + if len(req.Events) == 0 { + // We panic rather than just failing the exchange because we never want + // empty requests to occupy space in the worker's queue. The sender + // should simply not send empty requests. 
+		panic("received append request with no events")
+	}
+
+	defer w.resetIdleTimer()
+
+	if req.LowestPossibleOffset > w.nextOffset {
+		if err := w.catchUpWithJournal(ctx); err != nil {
+			return AppendResponse{}, err
+		}
+	}
+
+	for {
+		res, err := w.findPriorAppend(ctx, req)
+		if err != nil {
+			return AppendResponse{}, err
+		}
+
+		if res.AppendedByPriorAttempt {
+			for index, event := range req.Events {
+				w.Logger.Info(
+					"discarded duplicate event",
+					slog.Uint64("stream_offset", uint64(res.BeginOffset)+uint64(index)),
+					slog.String("message_id", event.MessageId.AsString()),
+					slog.String("description", event.Description),
+				)
+			}
+			return res, nil
+		}
+
+		res, err = w.writeEventsToJournal(ctx, req)
+		if err == nil {
+			w.publishEvents(res.BeginOffset, req.Events)
+			return res, nil
+		}
+
+		if err != journal.ErrConflict {
+			return AppendResponse{}, err
+		}
+
+		if err := w.catchUpWithJournal(ctx); err != nil {
+			return AppendResponse{}, err
+		}
+	}
+}
+
+// findPriorAppend returns an [AppendResponse] if the given [AppendRequest] has
+// already been handled.
+func (w *worker) findPriorAppend(
+	ctx context.Context,
+	req AppendRequest,
+) (AppendResponse, error) {
+	// If the lowest possible offset is ahead of the next offset, the request
+	// is malformed. Either there's a bug in Veracity, or the journal has
+	// suffered catastrophic data loss.
+	if req.LowestPossibleOffset > w.nextOffset {
+		panic("lowest possible offset is greater than the next offset")
+	}
+
+	// If the lowest possible offset is equal to the next offset, no events
+	// have been recorded since the request was created, and hence there
+	// can be no prior append attempt.
+	if req.LowestPossibleOffset == w.nextOffset {
+		return AppendResponse{}, nil
+	}
+
+	// If the lowest possible offset is in the cache, we can check for
+	// duplicates without using the journal. We search for the last event in
+	// the request, as it's the most likely to still be in the cache.
+	lowestPossibleOffsetOfLastEvent := req.LowestPossibleOffset + Offset(len(req.Events)) - 1
+
+	if cacheIndex := w.findInCache(lowestPossibleOffsetOfLastEvent); cacheIndex != -1 {
+		lastMessageIndex := len(req.Events) - 1
+		lastMessageID := req.Events[lastMessageIndex].MessageId
+
+		for _, event := range w.recentEvents[cacheIndex:] {
+			if event.Envelope.MessageId.Equal(lastMessageID) {
+				return AppendResponse{
+					// We know the offset of the last message in the request, so
+					// we can compute the offset of the first message, even if
+					// it's no longer in the cache.
+					BeginOffset:            event.Offset - Offset(lastMessageIndex),
+					EndOffset:              event.Offset + 1,
+					AppendedByPriorAttempt: true,
+				}, nil
+			}
+		}
+	}
+
+	// Finally, we search the journal for the record containing the events.
+ rec, err := journal.ScanFromSearchResult( + ctx, + w.Journal, + 0, + w.nextPos, + eventstreamjournal.SearchByOffset(uint64(req.LowestPossibleOffset)), + func( + _ context.Context, + _ journal.Position, + rec *eventstreamjournal.Record, + ) (*eventstreamjournal.Record, bool, error) { + if op := rec.GetEventsAppended(); op != nil { + targetID := req.Events[0].MessageId + candidateID := op.Events[0].MessageId + return rec, candidateID.Equal(targetID), nil + } + return nil, false, nil + }, + ) + if err != nil { + return AppendResponse{}, journal.IgnoreNotFound(err) + } + + return AppendResponse{ + BeginOffset: Offset(rec.StreamOffsetBefore), + EndOffset: Offset(rec.StreamOffsetAfter), + AppendedByPriorAttempt: true, + }, nil +} + +func (w *worker) writeEventsToJournal( + ctx context.Context, + req AppendRequest, +) (AppendResponse, error) { + before := w.nextOffset + after := w.nextOffset + Offset(len(req.Events)) + + if err := w.Journal.Append( + ctx, + w.nextPos, + eventstreamjournal. + NewRecordBuilder(). + WithStreamOffsetBefore(uint64(before)). + WithStreamOffsetAfter(uint64(after)). + WithEventsAppended(&eventstreamjournal.EventsAppended{ + Events: req.Events, + }). + Build(), + ); err != nil { + return AppendResponse{}, err + } + + for index, event := range req.Events { + w.Logger.Info( + "appended event to the stream", + slog.Uint64("journal_position", uint64(w.nextPos)), + slog.Uint64("stream_offset", uint64(before)+uint64(index)), + slog.String("message_id", event.MessageId.AsString()), + slog.String("description", event.Description), + ) + } + + w.nextPos++ + w.nextOffset = after + + return AppendResponse{ + BeginOffset: before, + EndOffset: after, + AppendedByPriorAttempt: false, + }, nil +} + +// publishEvents publishes the events to both the recent event cache and any +// interested subscribers. +func (w *worker) publishEvents( + offset Offset, + events []*envelopepb.Envelope, +) { + w.growCache(len(events)) + + for _, env := range events { + event := Event{w.StreamID, offset, env} + offset++ + + w.appendEventToCache(event) + + if len(w.subscribers) == 0 { + continue + } + + var delivered, filtered, canceled int + + for sub := range w.subscribers { + switch w.deliverEventToSubscriber(event, sub) { + case eventDelivered: + delivered++ + case eventFiltered: + filtered++ + case subscriptionCanceled: + canceled++ + } + } + + w.Logger.Debug( + "event published to subscribers", + slog.Uint64("stream_offset", uint64(event.Offset)), + slog.String("message_id", env.MessageId.AsString()), + slog.String("description", env.Description), + slog.Int("delivered_count", delivered), + slog.Int("filtered_count", filtered), + slog.Int("canceled_count", canceled), + ) + } +} diff --git a/internal/eventstream/workercache.go b/internal/eventstream/workercache.go new file mode 100644 index 00000000..1d7d6d95 --- /dev/null +++ b/internal/eventstream/workercache.go @@ -0,0 +1,86 @@ +package eventstream + +import ( + "slices" + "time" +) + +const ( + // maxCacheAge is the maximum age of an event that will be retained in the + // cache of recent events. + maxCacheAge = 1 * time.Minute + + // maxCacheCapacity is the maximum number of events that will be retained in + // the cache of recent events. + maxCacheCapacity = 1000 +) + +// findInCache returns the index of the event with the given offset in the cache +// of recent events, or -1 if the event is not in the cache. 
+func (w *worker) findInCache(offset Offset) int {
+	begin := w.nextOffset - Offset(len(w.recentEvents))
+
+	if begin <= offset && offset < w.nextOffset {
+		return int(offset - begin)
+	}
+
+	return -1
+}
+
+// growCache grows the cache capacity to fit an additional n events. It removes
+// old events if necessary.
+//
+// It returns the number of events that may be added to the cache.
+func (w *worker) growCache(n int) int {
+	begin := 0
+	end := len(w.recentEvents)
+
+	if end > maxCacheCapacity {
+		// This should never happen, as any code that is appending to the cache
+		// should first call growCache(n) to ensure there is enough space
+		// without exceeding the capacity, then use appendEventToCache().
+		panic("cache exceeds capacity")
+	}
+
+	if n >= maxCacheCapacity {
+		// We need the entire cache, so just clear it.
+		end = 0
+		n = maxCacheCapacity
+	} else {
+		// Otherwise, first remove any events that are older than the cache TTL.
+		for index, event := range w.recentEvents[begin:end] {
+			createdAt := event.Envelope.CreatedAt.AsTime()
+
+			if time.Since(createdAt) < maxCacheAge {
+				begin += index
+				break
+			}
+		}
+
+		// Then, if we still don't have enough space, remove the oldest events.
+		capacity := end - begin + n
+		if capacity > maxCacheCapacity {
+			begin += capacity - maxCacheCapacity
+		}
+	}
+
+	// Note that the slice indices are computed without modifying the slice so
+	// that we only perform a single copy operation.
+	copy(w.recentEvents, w.recentEvents[begin:end])
+
+	w.recentEvents = w.recentEvents[:end-begin]
+	w.recentEvents = slices.Grow(w.recentEvents, n)
+
+	return n
+}
+
+// appendEventToCache appends the given event to the cache of recent events.
+func (w *worker) appendEventToCache(event Event) {
+	if len(w.recentEvents) == cap(w.recentEvents) {
+		// This should never happen, as any code that is appending to the cache
+		// should first call growCache(n) to ensure there is enough space
+		// without exceeding the capacity.
+		panic("cache exceeds capacity")
+	}
+	w.recentEvents = append(w.recentEvents, event)
+}
diff --git a/internal/eventstream/workeridle.go b/internal/eventstream/workeridle.go
new file mode 100644
index 00000000..b1daeecc
--- /dev/null
+++ b/internal/eventstream/workeridle.go
@@ -0,0 +1,59 @@
+package eventstream
+
+import (
+	"context"
+	"log/slog"
+	"time"
+)
+
+const (
+	// shutdownTimeout is the amount of time a worker WITH NO SUBSCRIBERS will
+	// wait after appending events before shutting down.
+	shutdownTimeout = 5 * time.Minute
+
+	// catchUpTimeout is the amount of time a worker WITH SUBSCRIBERS will wait
+	// after appending events before "catching up" with any journal records that
+	// have been appended by other nodes.
+	catchUpTimeout = 10 * time.Second
+)
+
+// resetIdleTimer starts or resets the idle timer.
+func (w *worker) resetIdleTimer() {
+	timeout := shutdownTimeout
+	if len(w.subscribers) > 0 {
+		timeout = catchUpTimeout
+	}
+
+	if w.idleTimer == nil {
+		w.idleTimer = time.NewTimer(timeout)
+	} else {
+		if !w.idleTimer.Stop() {
+			<-w.idleTimer.C
+		}
+		w.idleTimer.Reset(timeout)
+	}
+}
+
+// handleIdle handles the idle state. If there are no subscribers, the worker
+// will shut down. Otherwise, it will poll the journal to see if there are new
+// events to deliver to the subscribers.
+//
+// It returns false if the worker should stop.
+func (w *worker) handleIdle(ctx context.Context) (bool, error) {
+	if len(w.subscribers) == 0 {
+		w.Logger.Debug(
+			"event stream worker stopped due to inactivity",
+			slog.Uint64("next_journal_position", uint64(w.nextPos)),
+			slog.Uint64("next_stream_offset", uint64(w.nextOffset)),
+		)
+		return false, nil
+	}
+
+	if err := w.catchUpWithJournal(ctx); err != nil {
+		return false, err
+	}
+
+	w.resetIdleTimer()
+
+	return true, nil
+}
diff --git a/internal/eventstream/workersubscriber.go b/internal/eventstream/workersubscriber.go
new file mode 100644
index 00000000..b1bb2cd5
--- /dev/null
+++ b/internal/eventstream/workersubscriber.go
@@ -0,0 +1,123 @@
+package eventstream
+
+import (
+	"fmt"
+	"log/slog"
+)
+
+func (w *worker) handleSubscribe(sub *Subscriber) {
+	if !sub.StreamID.Equal(w.StreamID) {
+		panic("received request for a different stream ID")
+	}
+
+	if w.subscribers == nil {
+		w.subscribers = map[*Subscriber]struct{}{}
+	}
+	w.subscribers[sub] = struct{}{}
+
+	w.Logger.Debug(
+		"subscription activated",
+		slog.String("channel_address", fmt.Sprint(sub.Events)),
+		slog.Int("channel_capacity", cap(sub.Events)),
+		slog.Int("channel_headroom", cap(sub.Events)-len(sub.Events)),
+		slog.Int("subscriber_count", len(w.subscribers)),
+		slog.Uint64("requested_stream_offset", uint64(sub.Offset)),
+		slog.Uint64("next_stream_offset", uint64(w.nextOffset)),
+	)
+
+	if sub.Offset >= w.nextOffset {
+		return
+	}
+
+	index := w.findInCache(sub.Offset)
+
+	if index == -1 {
+		sub.canceled.Signal()
+		w.Logger.Warn(
+			"subscription canceled immediately due to a request for historical events",
+			slog.String("channel_address", fmt.Sprint(sub.Events)),
+			slog.Int("channel_capacity", cap(sub.Events)),
+			slog.Int("channel_headroom", cap(sub.Events)-len(sub.Events)),
+			slog.Int("subscriber_count", len(w.subscribers)),
+			slog.Int("cached_event_count", len(w.recentEvents)),
+			slog.Uint64("requested_stream_offset", uint64(sub.Offset)),
+			slog.Uint64("next_stream_offset", uint64(w.nextOffset)),
+		)
+		return
+	}
+
+	for _, event := range w.recentEvents[index:] {
+		if w.deliverEventToSubscriber(event, sub) == subscriptionCanceled {
+			return
+		}
+	}
+}
+
+func (w *worker) handleUnsubscribe(sub *Subscriber) {
+	if !sub.StreamID.Equal(w.StreamID) {
+		panic("received request for a different stream ID")
+	}
+
+	before := len(w.subscribers)
+	delete(w.subscribers, sub)
+	after := len(w.subscribers)
+
+	if before > after {
+		sub.canceled.Signal()
+
+		w.Logger.Debug(
+			"subscription canceled by subscriber",
+			slog.String("channel_address", fmt.Sprint(sub.Events)),
+			slog.Int("channel_capacity", cap(sub.Events)),
+			slog.Int("channel_headroom", cap(sub.Events)-len(sub.Events)),
+			slog.Int("subscriber_count", after),
+		)
+	}
+}
+
+// deliverResult is an enumeration of the possible outcomes of delivering an
+// event to a subscriber.
+type deliverResult int
+
+const (
+	eventDelivered deliverResult = iota
+	eventFiltered
+	subscriptionCanceled
+)
+
+// deliverEventToSubscriber attempts to deliver an event to a subscriber.
+func (w *worker) deliverEventToSubscriber(event Event, sub *Subscriber) deliverResult { + if event.Offset > sub.Offset { + panic("event is out of order") + } + + if event.Offset < sub.Offset { + return eventFiltered + } + + if !sub.Filter(event.Envelope) { + sub.Offset++ + return eventFiltered + } + + select { + case sub.Events <- event: + sub.Offset++ + return eventDelivered + + default: + delete(w.subscribers, sub) + sub.canceled.Signal() + + w.Logger.Warn( + "subscription canceled due to insufficient channel headroom", + slog.String("channel_address", fmt.Sprint(sub.Events)), + slog.Int("channel_capacity", cap(sub.Events)), + slog.Int("channel_headroom", cap(sub.Events)-len(sub.Events)), + slog.Int("subscriber_count", len(w.subscribers)), + slog.Uint64("stream_offset", uint64(event.Offset)), + ) + + return subscriptionCanceled + } +} diff --git a/internal/messaging/queue.go b/internal/messaging/queue.go new file mode 100644 index 00000000..2134de4f --- /dev/null +++ b/internal/messaging/queue.go @@ -0,0 +1,31 @@ +package messaging + +import ( + "sync" +) + +// Queue encapsulates an unbuffered request queue. +// +// It is ultimately just a self-initializing channel, but carries with it the +// semantic meaning of a queue. +type Queue[Request any] struct { + init sync.Once + queue chan Request +} + +// Recv returns a channel that, when read, dequeues the next request. +func (q *Queue[Request]) Recv() <-chan Request { + return q.getQueue() +} + +// Send returns a channel that, when written, enqueues a request. +func (q *Queue[Request]) Send() chan<- Request { + return q.getQueue() +} + +func (q *Queue[Request]) getQueue() chan Request { + q.init.Do(func() { + q.queue = make(chan Request) + }) + return q.queue +}
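For orientation, the sketch below shows how the Reader/Subscriber API introduced in this change-set might be driven from elsewhere inside the engine. It is illustrative only and is not part of the patch: the consume function, the channel capacity of 128, and the catch-all filter are assumptions; only the Subscriber and Event types and Reader.Read come from the diff above.

package eventstream

import (
	"context"

	"github.com/dogmatiq/enginekit/protobuf/envelopepb"
	"github.com/dogmatiq/enginekit/protobuf/uuidpb"
)

// consume is a hypothetical caller that tails a stream from offset zero,
// assuming r's SubscribeQueue and UnsubscribeQueue are the same queues
// serviced by the stream's Supervisor.
func consume(ctx context.Context, r *Reader, streamID *uuidpb.UUID) error {
	// Capacity chosen arbitrarily; headroom reduces the chance of the worker
	// canceling the subscription because the channel is full.
	events := make(chan Event, 128)

	sub := &Subscriber{
		StreamID: streamID,
		Offset:   0, // start at the beginning of the stream
		Filter: func(*envelopepb.Envelope) bool {
			return true // receive every event
		},
		Events: events,
	}

	go func() {
		for {
			select {
			case <-ctx.Done():
				return
			case ev := <-events:
				_ = ev // handle the event, e.g. apply it to a read-model
			}
		}
	}()

	// Read delivers historical events from the journal, then live events via
	// the worker, resuming from the journal whenever the subscription is
	// canceled (e.g. due to insufficient channel headroom).
	return r.Read(ctx, sub)
}

Note that Filter returning true means "deliver"; events that are filtered out still advance the subscriber's offset, which is why both the reader and the worker increment sub.Offset for filtered events.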
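Similarly, the new messaging.Queue type at the end of the diff is a lazily initialized, unbuffered channel, so its zero value is usable as-is. A minimal, hypothetical test-style sketch (not part of the patch):

package messaging

import "testing"

// TestQueueZeroValue is a hypothetical test demonstrating that the zero value
// of Queue is usable without explicit construction.
func TestQueueZeroValue(t *testing.T) {
	var q Queue[string] // zero value; the channel is created on first use

	go func() {
		// Send() yields the underlying chan<- string; the send blocks until a
		// receiver is ready because the channel is unbuffered.
		q.Send() <- "hello"
	}()

	if got := <-q.Recv(); got != "hello" {
		t.Fatalf("unexpected value: %q", got)
	}
}

Because the channel is unbuffered, a send always blocks until a receiver is ready, so the queue itself never holds requests.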