Skip to content

Commit

Permalink
WIP [ci skip]
Browse files Browse the repository at this point in the history
  • Loading branch information
jmalloc committed Apr 3, 2024
1 parent d4e9f7f commit 228cf8d
Show file tree
Hide file tree
Showing 9 changed files with 696 additions and 445 deletions.
146 changes: 146 additions & 0 deletions internal/eventstream/reader.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
package eventstream

import (
"context"
"fmt"
"time"

"github.com/dogmatiq/enginekit/protobuf/envelopepb"
"github.com/dogmatiq/enginekit/protobuf/uuidpb"
"github.com/dogmatiq/persistencekit/journal"
"github.com/dogmatiq/veracity/internal/eventstream/internal/eventstreamjournal"
"github.com/dogmatiq/veracity/internal/messaging"
"github.com/dogmatiq/veracity/internal/signaling"
)

type Subscriber struct {
StreamID *uuidpb.UUID
Offset Offset
Filter func(*envelopepb.Envelope) bool
Events chan<- Event

canceled signaling.Event
}

type Reader struct {
Journals journal.BinaryStore
SubscribeQueue *messaging.ExchangeQueue[*Subscriber, struct{}]
UnsubscribeQueue *messaging.ExchangeQueue[*Subscriber, struct{}]
}

func (r *Reader) Read(ctx context.Context, sub *Subscriber) error {
for {
if err := r.readHistorical(ctx, sub); err != nil {
return err
}

if err := r.readContemporary(ctx, sub); err != nil {
return err
}
}
}

func (r *Reader) readHistorical(ctx context.Context, sub *Subscriber) error {
j, err := eventstreamjournal.Open(ctx, r.Journals, sub.StreamID)
if err != nil {
return err
}
defer j.Close()

searchBegin, searchEnd, err := j.Bounds(ctx)
if err != nil {
return err
}

return journal.RangeFromSearchResult(
ctx,
j,
searchBegin, searchEnd,
eventstreamjournal.SearchByOffset(uint64(sub.Offset)),
func(
ctx context.Context,
pos journal.Position,
rec *eventstreamjournal.Record,
) (bool, error) {
begin := Offset(rec.StreamOffsetBefore)
end := Offset(rec.StreamOffsetAfter)

if begin == end {
// no events in this record
return true, nil
}

if sub.Offset < begin || sub.Offset >= end {
return false, fmt.Errorf(
"event stream integrity error at journal position %d: expected event at offset %d, but found offset range [%d, %d)",
pos,
sub.Offset,
begin,
end,
)
}

index := sub.Offset - begin

for _, env := range rec.GetEventsAppended().Events[index:] {
if sub.Filter(env) {
sub.Offset++
continue
}

select {
case <-ctx.Done():
return false, ctx.Err()
case sub.Events <- Event{sub.StreamID, sub.Offset, env}:
sub.Offset++
}
}

return true, nil
},
)
}

func (r *Reader) readContemporary(ctx context.Context, cur *Subscriber) error {
// TODO: remote read

if err := r.subscribe(ctx, cur); err != nil {
return err
}
defer r.unsubscribe(cur)

select {
case <-ctx.Done():
return ctx.Err()
case <-cur.canceled.Signaled():
return nil
}
}

func (r *Reader) subscribe(ctx context.Context, cur *Subscriber) error {
ctx, cancel := context.WithTimeout(ctx, 10*time.Second) // TODO: make configurable
cancel()

if _, err := r.SubscribeQueue.Exchange(ctx, cur); err != nil {
return fmt.Errorf("cannot subscribe to event stream: %w", err)
}

return nil
}

func (r *Reader) unsubscribe(cur *Subscriber) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

// Cancel the unsubscribe context when the subscription is canceled,
// regardless of the reason.
//
// This handles the situation where the subscription is canceled because the
// worker shutdown (and hence wont service the unsubscribe request).
go func() {
<-cur.canceled.Signaled()
cancel()
}()

r.UnsubscribeQueue.Exchange(ctx, cur)
}
39 changes: 0 additions & 39 deletions internal/eventstream/subscriber.go

This file was deleted.

19 changes: 11 additions & 8 deletions internal/eventstream/supervisor.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,16 @@ import (
"github.com/dogmatiq/veracity/internal/signaling"
)

// errShuttingDown is sent in response to append requests that are not serviced
// errShuttingDown is sent in response to requests that are not serviced
// because of an error within the event stream supervisor or a worker.
var errShuttingDown = errors.New("event stream sub-system is shutting down")

// A Supervisor coordinates event stream workers.
type Supervisor struct {
Journals journal.BinaryStore
AppendQueue messaging.ExchangeQueue[AppendRequest, AppendResponse]
SubscribeQueue messaging.ExchangeQueue[SubscribeRequest, SubscribeResponse]
UnsubscribeQueue messaging.ExchangeQueue[UnsubscribeRequest, UnsubscribeResponse]
SubscribeQueue messaging.ExchangeQueue[*Subscriber, struct{}]
UnsubscribeQueue messaging.ExchangeQueue[*Subscriber, struct{}]
Logger *slog.Logger

shutdown signaling.Latch
Expand Down Expand Up @@ -95,14 +95,14 @@ func (s *Supervisor) appendState(
// subscribeState forwards a subscribe request to the appropriate worker.
func (s *Supervisor) subscribeState(
ctx context.Context,
ex messaging.Exchange[SubscribeRequest, SubscribeResponse],
ex messaging.Exchange[*Subscriber, struct{}],
) fsm.Action {
return forwardToWorker(
ctx,
s,
ex.Request.StreamID,
ex,
func(w *worker) *messaging.ExchangeQueue[SubscribeRequest, SubscribeResponse] {
func(w *worker) *messaging.ExchangeQueue[*Subscriber, struct{}] {
return &w.SubscribeQueue
},
)
Expand All @@ -111,14 +111,16 @@ func (s *Supervisor) subscribeState(
// unsubscribeState forwards an unsubscribe request to the appropriate worker.
func (s *Supervisor) unsubscribeState(
ctx context.Context,
ex messaging.Exchange[UnsubscribeRequest, UnsubscribeResponse],
ex messaging.Exchange[*Subscriber, struct{}],
) fsm.Action {
// TODO: this will currently start a worker even if it's not already running
// (and hence the subscription cannot be active)
return forwardToWorker(
ctx,
s,
ex.Request.StreamID,
ex,
func(w *worker) *messaging.ExchangeQueue[UnsubscribeRequest, UnsubscribeResponse] {
func(w *worker) *messaging.ExchangeQueue[*Subscriber, struct{}] {
return &w.UnsubscribeQueue
},
)
Expand Down Expand Up @@ -200,7 +202,8 @@ func (s *Supervisor) startWorkerForStreamID(
}

w := &worker{
Journal: j,
StreamID: streamID,
Journal: j,
Logger: s.Logger.With(
slog.String("stream_id", streamID.AsString()),
),
Expand Down
Loading

0 comments on commit 228cf8d

Please sign in to comment.