diff --git a/internal/projection/consume_test.go b/internal/projection/consume_test.go
new file mode 100644
index 00000000..3f0201a3
--- /dev/null
+++ b/internal/projection/consume_test.go
@@ -0,0 +1,357 @@
+package projection_test
+
+import (
+	"bytes"
+	"context"
+	"errors"
+	"sync"
+	"sync/atomic"
+	"testing"
+
+	"github.com/dogmatiq/dogma"
+	. "github.com/dogmatiq/dogma/fixtures"
+	"github.com/dogmatiq/enginekit/protobuf/envelopepb"
+	"github.com/dogmatiq/enginekit/protobuf/uuidpb"
+	"github.com/dogmatiq/veracity/internal/envelope"
+	"github.com/dogmatiq/veracity/internal/eventstream"
+	. "github.com/dogmatiq/veracity/internal/projection"
+	"github.com/dogmatiq/veracity/internal/test"
+)
+
+func TestConsume(t *testing.T) {
+	t.Parallel()
+
+	type dependencies struct {
+		Packer        *envelope.Packer
+		Handler       *ProjectionMessageHandler
+		EventConsumer *eventConsumerStub
+		Supervisor    *Supervisor
+	}
+
+	setup := func(t test.TestingT) (deps dependencies) {
+		deps.Packer = newPacker()
+		deps.Handler = &ProjectionMessageHandler{}
+		deps.EventConsumer = &eventConsumerStub{}
+
+		deps.Supervisor = &Supervisor{
+			Handler:       deps.Handler,
+			EventConsumer: deps.EventConsumer,
+			Packer:        deps.Packer,
+		}
+
+		return deps
+	}
+
+	t.Run("it applies events exactly once, in order regardless of errors", func(t *testing.T) {
+		t.Parallel()
+
+		cases := []struct {
+			Desc          string
+			InduceFailure func(*dependencies)
+		}{
+			{
+				Desc: "no faults",
+				InduceFailure: func(*dependencies) {
+				},
+			},
+			{
+				Desc: "failure before handling event at offset 0",
+				InduceFailure: func(deps *dependencies) {
+					var done atomic.Bool
+					handle := deps.Handler.HandleEventFunc
+
+					deps.Handler.HandleEventFunc = func(
+						ctx context.Context,
+						r, c, n []byte,
+						s dogma.ProjectionEventScope,
+						e dogma.Event,
+					) (bool, error) {
+						if e == MessageE1 && done.CompareAndSwap(false, true) {
+							return false, errors.New("<error>")
+						}
+
+						return handle(ctx, r, c, n, s, e)
+					}
+				},
+			},
+			{
+				Desc: "failure after handling event at offset 0",
+				InduceFailure: func(deps *dependencies) {
+					var done atomic.Bool
+					handle := deps.Handler.HandleEventFunc
+
+					deps.Handler.HandleEventFunc = func(
+						ctx context.Context,
+						r, c, n []byte,
+						s dogma.ProjectionEventScope,
+						e dogma.Event,
+					) (bool, error) {
+						ok, err := handle(ctx, r, c, n, s, e)
+						if !ok || err != nil {
+							return ok, err
+						}
+						if e == MessageE1 && done.CompareAndSwap(false, true) {
+							return false, errors.New("<error>")
+						}
+
+						return true, nil
+					}
+				},
+			},
+			{
+				Desc: "failure before handling event at offset 1",
+				InduceFailure: func(deps *dependencies) {
+					var done atomic.Bool
+					handle := deps.Handler.HandleEventFunc
+
+					deps.Handler.HandleEventFunc = func(
+						ctx context.Context,
+						r, c, n []byte,
+						s dogma.ProjectionEventScope,
+						e dogma.Event,
+					) (bool, error) {
+						if e == MessageE2 && done.CompareAndSwap(false, true) {
+							return false, errors.New("<error>")
+						}
+
+						return handle(ctx, r, c, n, s, e)
+					}
+				},
+			},
+			{
+				Desc: "failure after handling event at offset 1",
+				InduceFailure: func(deps *dependencies) {
+					var done atomic.Bool
+					handle := deps.Handler.HandleEventFunc
+
+					deps.Handler.HandleEventFunc = func(
+						ctx context.Context,
+						r, c, n []byte,
+						s dogma.ProjectionEventScope,
+						e dogma.Event,
+					) (bool, error) {
+						ok, err := handle(ctx, r, c, n, s, e)
+						if !ok || err != nil {
+							return ok, err
+						}
+						if e == MessageE2 && done.CompareAndSwap(false, true) {
+							return false, errors.New("<error>")
+						}
+
+						return true, nil
+					}
+				},
+			},
+			{
+				Desc: "occ failure at offset 0",
+				InduceFailure: func(deps *dependencies) {
+					var done atomic.Bool
+					resourceVersionFunc := deps.Handler.ResourceVersionFunc
+
+					deps.Handler.ResourceVersionFunc = func(ctx context.Context, r []byte) ([]byte, error) {
+						if done.CompareAndSwap(false, true) {
+							return []byte{0, 0, 0, 0, 0, 0, 0, 1}, nil
+						}
+
+						return resourceVersionFunc(ctx, r)
+					}
+				},
+			},
+			{
+				Desc: "occ failure at offset 1",
+				InduceFailure: func(deps *dependencies) {
+					var done atomic.Bool
+					handle := deps.Handler.HandleEventFunc
+
+					deps.Handler.HandleEventFunc = func(
+						ctx context.Context,
+						r, c, n []byte,
+						s dogma.ProjectionEventScope,
+						e dogma.Event,
+					) (bool, error) {
+						if e == MessageE2 && done.CompareAndSwap(false, true) {
+							return false, nil
+						}
+
+						return handle(ctx, r, c, n, s, e)
+					}
+				},
+			},
+		}
+
+		for _, c := range cases {
+			c := c
+
+			t.Run(c.Desc, func(t *testing.T) {
+				tctx := test.WithContext(t)
+
+				deps := setup(tctx)
+
+				var (
+					mu               sync.Mutex
+					appliedResources = map[string][]byte{}
+					appliedEvents    = make(chan dogma.Event, 100)
+				)
+
+				deps.Handler.HandleEventFunc = func(
+					ctx context.Context,
+					r, c, n []byte,
+					s dogma.ProjectionEventScope,
+					e dogma.Event,
+				) (bool, error) {
+					mu.Lock()
+					defer mu.Unlock()
+
+					v := appliedResources[string(r)]
+					if !bytes.Equal(v, c) {
+						t.Logf("[%T] resource %x occ conflict: %x != %x", e, r, c, v)
+						return false, nil
+					}
+
+					select {
+					case <-ctx.Done():
+						return false, ctx.Err()
+					case appliedEvents <- e:
+						t.Logf("[%T] resource %x updated: %x -> %x", e, r, c, n)
+						appliedResources[string(r)] = n
+						return true, nil
+					}
+				}
+
+				deps.Handler.ResourceVersionFunc = func(ctx context.Context, r []byte) ([]byte, error) {
+					mu.Lock()
+					defer mu.Unlock()
+
+					v := appliedResources[string(r)]
+					t.Logf("resource %x loaded: %x", r, v)
+
+					return v, nil
+				}
+
+				expectedStreamID := uuidpb.Generate()
+				expectedEvents := []*envelopepb.Envelope{
+					deps.Packer.Pack(MessageE1),
+					deps.Packer.Pack(MessageE2),
+					deps.Packer.Pack(MessageE3),
+				}
+
+				deps.EventConsumer.ConsumeFunc = func(
+					ctx context.Context,
+					streamID *uuidpb.UUID,
+					offset eventstream.Offset,
+					events chan<- eventstream.Event,
+				) error {
+					var matching []*envelopepb.Envelope
+
+					if streamID.Equal(expectedStreamID) && offset < eventstream.Offset(len(expectedEvents)) {
+						matching = expectedEvents[offset:]
+					}
+
+					for i, env := range matching {
+						// Report the event's absolute position within the stream,
+						// not its position relative to the resume point, so that
+						// versions derived from it remain consistent across restarts.
+						ese := eventstream.Event{
+							StreamID: streamID,
+							Offset:   offset + eventstream.Offset(i),
+							Envelope: env,
+						}
+
+						select {
+						case <-ctx.Done():
+							return ctx.Err()
+						case events <- ese:
+						}
+					}
+
+					<-ctx.Done()
+					return ctx.Err()
+				}
+
+				deps.Supervisor.StreamIDs = []*uuidpb.UUID{expectedStreamID}
+
+				c.InduceFailure(&deps)
+
+				supervisorTask := test.
+					RunInBackground(t, "supervisor", deps.Supervisor.Run).
+					RepeatedlyUntilSuccess()
+
+				for _, env := range expectedEvents {
+					expected, err := deps.Packer.Unpack(env)
+					if err != nil {
+						t.Fatal(err)
+					}
+
+					test.
+						ExpectChannelToReceive(
+							tctx,
+							appliedEvents,
+							expected,
+						)
+				}
+
+				test.
+					ExpectChannelWouldBlock(
+						tctx,
+						appliedEvents,
+					)
+				deps.Supervisor.Shutdown()
+				supervisorTask.WaitForSuccess()
+			})
+		}
+	})
+
+	t.Run("it makes the time the event was recorded available via the scope", func(t *testing.T) {
+		tctx := test.WithContext(t)
+
+		deps := setup(tctx)
+
+		env := deps.Packer.Pack(MessageE1)
+
+		var supervisorTask *test.Task
+
+		deps.Handler.HandleEventFunc = func(
+			ctx context.Context,
+			r, c, n []byte,
+			s dogma.ProjectionEventScope,
+			e dogma.Event,
+		) (bool, error) {
+			expected := env.GetCreatedAt().AsTime()
+			if !s.RecordedAt().Equal(expected) {
+				t.Fatalf("unexpected recorded at time: got %s, want %s", s.RecordedAt(), expected)
+			}
+
+			supervisorTask.Stop()
+
+			return true, nil
+		}
+
+		deps.EventConsumer.ConsumeFunc = func(
+			ctx context.Context,
+			streamID *uuidpb.UUID,
+			offset eventstream.Offset,
+			events chan<- eventstream.Event,
+		) error {
+			ese := eventstream.Event{
+				StreamID: streamID,
+				Offset:   0,
+				Envelope: env,
+			}
+
+			select {
+			case <-ctx.Done():
+				return ctx.Err()
+			case events <- ese:
+			}
+
+			<-ctx.Done()
+			return ctx.Err()
+		}
+
+		deps.Supervisor.StreamIDs = []*uuidpb.UUID{uuidpb.Generate()}
+
+		supervisorTask = test.
+			RunInBackground(t, "supervisor", deps.Supervisor.Run).
+			UntilStopped()
+		supervisorTask.WaitUntilStopped()
+	})
+}
diff --git a/internal/projection/doc.go b/internal/projection/doc.go
new file mode 100644
index 00000000..f393e022
--- /dev/null
+++ b/internal/projection/doc.go
@@ -0,0 +1,2 @@
+// Package projection dispatches events to projection message handlers.
+package projection
diff --git a/internal/projection/eventconsumer.go b/internal/projection/eventconsumer.go
new file mode 100644
index 00000000..4a069517
--- /dev/null
+++ b/internal/projection/eventconsumer.go
@@ -0,0 +1,22 @@
+package projection
+
+import (
+	"context"
+
+	"github.com/dogmatiq/enginekit/protobuf/uuidpb"
+	"github.com/dogmatiq/veracity/internal/eventstream"
+)
+
+// An EventConsumer supplies ordered events from event streams to the
+// projection subsystem.
+type EventConsumer interface {
+	// Consume sends events from the stream with the given ID, beginning at
+	// the given offset, to the events channel until an error occurs or ctx is
+	// canceled.
+	Consume(
+		ctx context.Context,
+		streamID *uuidpb.UUID,
+		offset eventstream.Offset,
+		events chan<- eventstream.Event,
+	) error
+}
diff --git a/internal/projection/eventconsumer_test.go b/internal/projection/eventconsumer_test.go
new file mode 100644
index 00000000..ea451767
--- /dev/null
+++ b/internal/projection/eventconsumer_test.go
@@ -0,0 +1,19 @@
+package projection_test
+
+import (
+	"context"
+
+	"github.com/dogmatiq/enginekit/protobuf/uuidpb"
+	"github.com/dogmatiq/veracity/internal/eventstream"
+)
+
+type eventConsumerStub struct {
+	ConsumeFunc func(ctx context.Context, streamID *uuidpb.UUID, offset eventstream.Offset, events chan<- eventstream.Event) error
+}
+
+func (s *eventConsumerStub) Consume(ctx context.Context, streamID *uuidpb.UUID, offset eventstream.Offset, events chan<- eventstream.Event) error {
+	if s.ConsumeFunc != nil {
+		return s.ConsumeFunc(ctx, streamID, offset, events)
+	}
+	return nil
+}
diff --git a/internal/projection/packer_test.go b/internal/projection/packer_test.go
new file mode 100644
index 00000000..81742105
--- /dev/null
+++ b/internal/projection/packer_test.go
@@ -0,0 +1,36 @@
+package projection_test
+
+import (
+	"sync/atomic"
+	"time"
+
+	"github.com/dogmatiq/enginekit/protobuf/identitypb"
+	"github.com/dogmatiq/enginekit/protobuf/uuidpb"
+	. "github.com/dogmatiq/marshalkit/fixtures"
+	"github.com/dogmatiq/veracity/internal/envelope"
+)
+
+func newPacker() *envelope.Packer {
+	var counter atomic.Uint64
+	return &envelope.Packer{
+		Application: identitypb.New("", uuidpb.Generate()),
+		Marshaler:   Marshaler,
+		Now: func() time.Time {
+			return time.Date(2000, 1, 1, 0, 0, 0, 0, time.UTC)
+		},
+		GenerateID: func() *uuidpb.UUID {
+			return deterministicUUID(counter.Add(1))
+		},
+	}
+}
+
+func deterministicUUID(counter uint64) *uuidpb.UUID {
+	var data [16]byte
+	data[6] = (data[6] & 0x0f) | 0x40 // Version 4
+	data[8] = (data[8] & 0x3f) | 0x80 // Variant is 10 (RFC 4122)
+
+	id := uuidpb.FromByteArray(data)
+	id.Lower |= counter
+
+	return id
+}
diff --git a/internal/projection/scope.go b/internal/projection/scope.go
new file mode 100644
index 00000000..8f264262
--- /dev/null
+++ b/internal/projection/scope.go
@@ -0,0 +1,23 @@
+package projection
+
+import (
+	"time"
+)
+
+// scope is the dogma.ProjectionEventScope implementation passed to handlers.
+type scope struct {
+	recordedAt time.Time
+}
+
+func (s *scope) RecordedAt() time.Time {
+	return s.recordedAt
+}
+
+func (s *scope) IsPrimaryDelivery() bool {
+	// TODO
+	return true
+}
+
+func (s *scope) Log(string, ...any) {
+	// TODO
+}
diff --git a/internal/projection/supervisor.go b/internal/projection/supervisor.go
new file mode 100644
index 00000000..fc823434
--- /dev/null
+++ b/internal/projection/supervisor.go
@@ -0,0 +1,51 @@
+package projection
+
+import (
+	"context"
+
+	"github.com/dogmatiq/dogma"
+	"github.com/dogmatiq/enginekit/protobuf/uuidpb"
+	"github.com/dogmatiq/veracity/internal/envelope"
+	"github.com/dogmatiq/veracity/internal/signaling"
+	"golang.org/x/sync/errgroup"
+)
+
+// A Supervisor coordinates projection workers.
+type Supervisor struct {
+	Handler       dogma.ProjectionMessageHandler
+	Packer        *envelope.Packer
+	EventConsumer EventConsumer
+	StreamIDs     []*uuidpb.UUID
+
+	shutdown signaling.Latch
+}
+
+// Run starts a worker for each of the supervisor's streams. It blocks until
+// ctx is canceled, Shutdown() is called, or a worker fails.
+func (s *Supervisor) Run(ctx context.Context) error {
+	eg, ctx := errgroup.WithContext(ctx)
+
+	for _, streamID := range s.StreamIDs {
+		streamID := streamID // capture loop variable
+
+		eg.Go(
+			func() error {
+				worker := &worker{
+					Handler:       s.Handler,
+					EventConsumer: s.EventConsumer,
+					Packer:        s.Packer,
+					StreamID:      streamID,
+					Shutdown:      &s.shutdown,
+				}
+				return worker.Run(ctx)
+			},
+		)
+	}
+
+	return eg.Wait()
+}
+
+// Shutdown stops the supervisor when it next becomes idle.
+func (s *Supervisor) Shutdown() {
+	s.shutdown.Signal()
+}
diff --git a/internal/projection/worker.go b/internal/projection/worker.go
new file mode 100644
index 00000000..247f2c6e
--- /dev/null
+++ b/internal/projection/worker.go
@@ -0,0 +1,156 @@
+package projection
+
+import (
+	"context"
+	"encoding/binary"
+	"errors"
+
+	"github.com/dogmatiq/dogma"
+	"github.com/dogmatiq/enginekit/protobuf/uuidpb"
+	"github.com/dogmatiq/veracity/internal/envelope"
+	"github.com/dogmatiq/veracity/internal/eventstream"
+	"github.com/dogmatiq/veracity/internal/fsm"
+	"github.com/dogmatiq/veracity/internal/signaling"
+)
+
+// A worker dispatches events from a single eventstream to a projection handler.
+type worker struct {
+	Handler       dogma.ProjectionMessageHandler
+	Packer        *envelope.Packer
+	EventConsumer EventConsumer
+	StreamID      *uuidpb.UUID
+	Shutdown      *signaling.Latch
+
+	resource       []byte
+	currentVersion []byte
+	events         chan eventstream.Event
+	consumerError  chan error
+	cancelConsumer context.CancelFunc
+}
+
+// resourceFromStream returns the resource ID for a stream.
+func resourceFromStream(streamID *uuidpb.UUID) []byte {
+	return streamID.AsBytes()
+}
+
+// offsetFromVersion returns the offset for a version.
+func offsetFromVersion(version []byte) (eventstream.Offset, error) {
+	switch len(version) {
+	case 0:
+		return 0, nil
+	case 8:
+		return eventstream.Offset(binary.BigEndian.Uint64(version)), nil
+	default:
+		return 0, errors.New("invalid version")
+	}
+}
+
+// offsetToVersion returns the version for an offset.
+func offsetToVersion(offset eventstream.Offset) []byte {
+	b := make([]byte, 8)
+	binary.BigEndian.PutUint64(b, uint64(offset))
+	return b
+}
+
+// Run handles events from the worker's stream until ctx is canceled, the
+// shutdown latch is signaled, or an error occurs.
+func (w *worker) Run(ctx context.Context) error {
+	defer func() {
+		if w.cancelConsumer != nil {
+			w.cancelConsumer()
+		}
+	}()
+	return fsm.Start(ctx, w.initState)
+}
+
+// initState (re)starts event consumption at the offset that corresponds to
+// the handler's current resource version.
+func (w *worker) initState(ctx context.Context) fsm.Action {
+	if w.cancelConsumer != nil {
+		w.cancelConsumer()
+
+		select {
+		case <-ctx.Done():
+		case <-w.consumerError:
+		}
+	}
+
+	w.resource = resourceFromStream(w.StreamID)
+	w.events = make(chan eventstream.Event)
+	w.consumerError = make(chan error, 1)
+
+	var err error
+	w.currentVersion, err = w.Handler.ResourceVersion(ctx, w.resource)
+	if err != nil {
+		return fsm.Fail(err)
+	}
+
+	offset, err := offsetFromVersion(w.currentVersion)
+	if err != nil {
+		return fsm.Fail(err)
+	}
+
+	var consumeCtx context.Context
+	consumeCtx, w.cancelConsumer = context.WithCancel(ctx)
+	go func() {
+		w.consumerError <- w.EventConsumer.Consume(consumeCtx, w.StreamID, offset, w.events)
+	}()
+
+	return fsm.EnterState(w.idleState)
+}
+
+// idleState waits for the next event, a consumer failure, or a signal to stop.
+func (w *worker) idleState(ctx context.Context) fsm.Action {
+	select {
+	case <-ctx.Done():
+		return fsm.Stop()
+
+	case <-w.Shutdown.Signaled():
+		return fsm.Stop()
+
+	case err := <-w.consumerError:
+		if err == nil {
+			panic("consumer returned nil")
+		}
+		return fsm.Fail(err)
+
+	case ese := <-w.events:
+		return fsm.With(ese).EnterState(w.handleEventState)
+	}
+}
+
+// handleEventState dispatches a single event to the projection handler.
+func (w *worker) handleEventState(
+	ctx context.Context,
+	ese eventstream.Event,
+) fsm.Action {
+	e, err := w.Packer.Unpack(ese.Envelope)
+	if err != nil {
+		return fsm.Fail(err)
+	}
+
+	n := offsetToVersion(ese.Offset + 1)
+
+	ok, err := w.Handler.HandleEvent(
+		ctx,
+		w.resource,
+		w.currentVersion,
+		n,
+		&scope{
+			recordedAt: ese.Envelope.GetCreatedAt().AsTime(),
+		},
+		e,
+	)
+	if err != nil {
+		return fsm.Fail(err)
+	}
+	if !ok {
+		// The handler rejected the update (OCC conflict); re-read the resource
+		// version and resume from the corresponding offset.
+		return fsm.EnterState(w.initState)
+	}
+
+	w.currentVersion = n
+
+	return fsm.EnterState(w.idleState)
+}