Skip to content

Commit

Permalink
Implement event subscriptions.
Browse files Browse the repository at this point in the history
  • Loading branch information
jmalloc committed Apr 3, 2024
1 parent 8aa86c7 commit ae1b153
Show file tree
Hide file tree
Showing 7 changed files with 816 additions and 159 deletions.
146 changes: 146 additions & 0 deletions internal/eventstream/reader.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
package eventstream

import (
"context"
"fmt"
"time"

"github.com/dogmatiq/enginekit/protobuf/envelopepb"
"github.com/dogmatiq/enginekit/protobuf/uuidpb"
"github.com/dogmatiq/persistencekit/journal"
"github.com/dogmatiq/veracity/internal/eventstream/internal/eventstreamjournal"
"github.com/dogmatiq/veracity/internal/messaging"
"github.com/dogmatiq/veracity/internal/signaling"
)

type Subscriber struct {
StreamID *uuidpb.UUID
Offset Offset
Filter func(*envelopepb.Envelope) bool
Events chan<- Event

canceled signaling.Event
}

type Reader struct {
Journals journal.BinaryStore
SubscribeQueue *messaging.ExchangeQueue[*Subscriber, struct{}]
UnsubscribeQueue *messaging.ExchangeQueue[*Subscriber, struct{}]
}

func (r *Reader) Read(ctx context.Context, sub *Subscriber) error {
for {
if err := r.readHistorical(ctx, sub); err != nil {
return err
}

if err := r.readContemporary(ctx, sub); err != nil {
return err
}
}
}

func (r *Reader) readHistorical(ctx context.Context, sub *Subscriber) error {
j, err := eventstreamjournal.Open(ctx, r.Journals, sub.StreamID)
if err != nil {
return err
}
defer j.Close()

searchBegin, searchEnd, err := j.Bounds(ctx)
if err != nil {
return err
}

return journal.RangeFromSearchResult(
ctx,
j,
searchBegin, searchEnd,
eventstreamjournal.SearchByOffset(uint64(sub.Offset)),
func(
ctx context.Context,
pos journal.Position,
rec *eventstreamjournal.Record,
) (bool, error) {
begin := Offset(rec.StreamOffsetBefore)
end := Offset(rec.StreamOffsetAfter)

if begin == end {
// no events in this record
return true, nil
}

if sub.Offset < begin || sub.Offset >= end {
return false, fmt.Errorf(
"event stream integrity error at journal position %d: expected event at offset %d, but found offset range [%d, %d)",
pos,
sub.Offset,
begin,
end,
)
}

index := sub.Offset - begin

for _, env := range rec.GetEventsAppended().Events[index:] {
if sub.Filter(env) {
sub.Offset++
continue
}

select {
case <-ctx.Done():
return false, ctx.Err()
case sub.Events <- Event{sub.StreamID, sub.Offset, env}:
sub.Offset++
}
}

return true, nil
},
)
}

func (r *Reader) readContemporary(ctx context.Context, cur *Subscriber) error {
// TODO: remote read

if err := r.subscribe(ctx, cur); err != nil {
return err
}
defer r.unsubscribe(cur)

select {
case <-ctx.Done():
return ctx.Err()
case <-cur.canceled.Signaled():
return nil
}
}

func (r *Reader) subscribe(ctx context.Context, cur *Subscriber) error {
ctx, cancel := context.WithTimeout(ctx, 10*time.Second) // TODO: make configurable
cancel()

if _, err := r.SubscribeQueue.Exchange(ctx, cur); err != nil {
return fmt.Errorf("cannot subscribe to event stream: %w", err)
}

return nil
}

func (r *Reader) unsubscribe(cur *Subscriber) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

// Cancel the unsubscribe context when the subscription is canceled,
// regardless of the reason.
//
// This handles the situation where the subscription is canceled because the
// worker shutdown (and hence wont service the unsubscribe request).
go func() {
<-cur.canceled.Signaled()
cancel()
}()

r.UnsubscribeQueue.Exchange(ctx, cur)
}
81 changes: 71 additions & 10 deletions internal/eventstream/supervisor.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,17 @@ import (
"github.com/dogmatiq/veracity/internal/signaling"
)

// errShuttingDown is sent in response to append requests that are not serviced
// errShuttingDown is sent in response to requests that are not serviced
// because of an error within the event stream supervisor or a worker.
var errShuttingDown = errors.New("event stream sub-system is shutting down")

// A Supervisor coordinates event stream workers.
type Supervisor struct {
Journals journal.BinaryStore
AppendQueue messaging.ExchangeQueue[AppendRequest, AppendResponse]
Logger *slog.Logger
Journals journal.BinaryStore
AppendQueue messaging.ExchangeQueue[AppendRequest, AppendResponse]
SubscribeQueue messaging.ExchangeQueue[*Subscriber, struct{}]
UnsubscribeQueue messaging.ExchangeQueue[*Subscriber, struct{}]
Logger *slog.Logger

shutdown signaling.Latch
workers uuidpb.Map[*worker]
Expand Down Expand Up @@ -64,16 +66,74 @@ func (s *Supervisor) idleState(ctx context.Context) fsm.Action {
return fsm.StayInCurrentState()

case ex := <-s.AppendQueue.Recv():
return fsm.With(ex).EnterState(s.forwardAppendState)
return fsm.With(ex).EnterState(s.appendState)

case ex := <-s.SubscribeQueue.Recv():
return fsm.With(ex).EnterState(s.subscribeState)

case ex := <-s.UnsubscribeQueue.Recv():
return fsm.With(ex).EnterState(s.unsubscribeState)
}
}

// forwardAppendState forwards an append request to the appropriate worker.
func (s *Supervisor) forwardAppendState(
// appendState forwards an append request to the appropriate worker.
func (s *Supervisor) appendState(
ctx context.Context,
ex messaging.Exchange[AppendRequest, AppendResponse],
) fsm.Action {
w, err := s.workerByStreamID(ctx, ex.Request.StreamID)
return forwardToWorker(
ctx,
s,
ex.Request.StreamID,
ex,
func(w *worker) *messaging.ExchangeQueue[AppendRequest, AppendResponse] {
return &w.AppendQueue
},
)
}

// subscribeState forwards a subscribe request to the appropriate worker.
func (s *Supervisor) subscribeState(
ctx context.Context,
ex messaging.Exchange[*Subscriber, struct{}],
) fsm.Action {
return forwardToWorker(
ctx,
s,
ex.Request.StreamID,
ex,
func(w *worker) *messaging.ExchangeQueue[*Subscriber, struct{}] {
return &w.SubscribeQueue
},
)
}

// unsubscribeState forwards an unsubscribe request to the appropriate worker.
func (s *Supervisor) unsubscribeState(
ctx context.Context,
ex messaging.Exchange[*Subscriber, struct{}],
) fsm.Action {
// TODO: this will currently start a worker even if it's not already running
// (and hence the subscription cannot be active)
return forwardToWorker(
ctx,
s,
ex.Request.StreamID,
ex,
func(w *worker) *messaging.ExchangeQueue[*Subscriber, struct{}] {
return &w.UnsubscribeQueue
},
)
}

func forwardToWorker[Req, Res any](
ctx context.Context,
s *Supervisor,
streamID *uuidpb.UUID,
ex messaging.Exchange[Req, Res],
queue func(w *worker) *messaging.ExchangeQueue[Req, Res],
) fsm.Action {
w, err := s.workerByStreamID(ctx, streamID)
if err != nil {
ex.Err(errShuttingDown)
return fsm.Fail(err)
Expand All @@ -92,7 +152,7 @@ func (s *Supervisor) forwardAppendState(
}
return fsm.StayInCurrentState()

case w.AppendQueue.Send() <- ex:
case queue(w).Send() <- ex:
return fsm.EnterState(s.idleState)
}
}
Expand Down Expand Up @@ -142,7 +202,8 @@ func (s *Supervisor) startWorkerForStreamID(
}

w := &worker{
Journal: j,
StreamID: streamID,
Journal: j,
Logger: s.Logger.With(
slog.String("stream_id", streamID.AsString()),
),
Expand Down
Loading

0 comments on commit ae1b153

Please sign in to comment.