Skip to content

Commit

Permalink
Implement projection sub system
Browse files Browse the repository at this point in the history
Co-authored-by: Kevin Millar <[email protected]>
Co-authored-by: Troy Parker <[email protected]>
Co-authored-by: Ben Robey <[email protected]>
Co-authored-by: Danil Petrov <[email protected]>
Co-authored-by: Steve Stenzel <[email protected]>
  • Loading branch information
6 people authored and jmalloc committed Mar 27, 2024
1 parent 9005e18 commit 673cfe1
Show file tree
Hide file tree
Showing 8 changed files with 649 additions and 0 deletions.
355 changes: 355 additions & 0 deletions internal/projection/consume_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,355 @@
package projection_test

import (
"bytes"
"context"
"errors"
"sync"
"sync/atomic"
"testing"

"github.com/dogmatiq/dogma"
. "github.com/dogmatiq/dogma/fixtures"
"github.com/dogmatiq/enginekit/protobuf/envelopepb"
"github.com/dogmatiq/enginekit/protobuf/uuidpb"
"github.com/dogmatiq/veracity/internal/envelope"
"github.com/dogmatiq/veracity/internal/eventstream"
. "github.com/dogmatiq/veracity/internal/projection"
"github.com/dogmatiq/veracity/internal/test"
)

func TestConsume(t *testing.T) {
t.Parallel()

type dependencies struct {
Packer *envelope.Packer
Handler *ProjectionMessageHandler
EventConsumer *eventConsumerStub
Supervisor *Supervisor
}

setup := func(t test.TestingT) (deps dependencies) {
deps.Packer = newPacker()
deps.Handler = &ProjectionMessageHandler{}
deps.EventConsumer = &eventConsumerStub{}

deps.Supervisor = &Supervisor{
Handler: deps.Handler,
EventConsumer: deps.EventConsumer,
Packer: deps.Packer,
}

return deps
}

t.Run("it applies events exactly once, in order regardless of errors", func(t *testing.T) {
t.Parallel()

cases := []struct {
Desc string
InduceFailure func(*dependencies)
}{
{
Desc: "no faults",
InduceFailure: func(*dependencies) {
},
},
{
Desc: "failure before handling event at offset 0",
InduceFailure: func(deps *dependencies) {
var done atomic.Bool
handle := deps.Handler.HandleEventFunc

deps.Handler.HandleEventFunc = func(
ctx context.Context,
r, c, n []byte,
s dogma.ProjectionEventScope,
e dogma.Event,
) (bool, error) {
if e == MessageE1 && done.CompareAndSwap(false, true) {
return false, errors.New("<error>")
}

return handle(ctx, r, c, n, s, e)
}
},
},
{
Desc: "failure after handling event at offset 0",
InduceFailure: func(deps *dependencies) {
var done atomic.Bool
handle := deps.Handler.HandleEventFunc

deps.Handler.HandleEventFunc = func(
ctx context.Context,
r, c, n []byte,
s dogma.ProjectionEventScope,
e dogma.Event,
) (bool, error) {
ok, err := handle(ctx, r, c, n, s, e)
if !ok || err != nil {
return ok, err
}
if e == MessageE1 && done.CompareAndSwap(false, true) {
return false, errors.New("<error>")
}

return true, nil
}
},
},
{
Desc: "failure before handling event at offset 1",
InduceFailure: func(deps *dependencies) {
var done atomic.Bool
handle := deps.Handler.HandleEventFunc

deps.Handler.HandleEventFunc = func(
ctx context.Context,
r, c, n []byte,
s dogma.ProjectionEventScope,
e dogma.Event,
) (bool, error) {
if e == MessageE2 && done.CompareAndSwap(false, true) {
return false, errors.New("<error>")
}

return handle(ctx, r, c, n, s, e)
}
},
},
{
Desc: "failure after handling event at offset 1",
InduceFailure: func(deps *dependencies) {
var done atomic.Bool
handle := deps.Handler.HandleEventFunc

deps.Handler.HandleEventFunc = func(
ctx context.Context,
r, c, n []byte,
s dogma.ProjectionEventScope,
e dogma.Event,
) (bool, error) {
ok, err := handle(ctx, r, c, n, s, e)
if !ok || err != nil {
return ok, err
}
if e == MessageE2 && done.CompareAndSwap(false, true) {
return false, errors.New("<error>")
}

return true, nil
}
},
},
{
Desc: "occ failure at offset 0",
InduceFailure: func(deps *dependencies) {
var done atomic.Bool
resourceVersionFunc := deps.Handler.ResourceVersionFunc

deps.Handler.ResourceVersionFunc = func(ctx context.Context, r []byte) ([]byte, error) {
if done.CompareAndSwap(false, true) {
return []byte{0, 0, 0, 0, 0, 0, 0, 1}, nil
}

return resourceVersionFunc(ctx, r)
}
},
},
{
Desc: "occ failure at offset 1",
InduceFailure: func(deps *dependencies) {
var done atomic.Bool
handle := deps.Handler.HandleEventFunc

deps.Handler.HandleEventFunc = func(
ctx context.Context,
r, c, n []byte,
s dogma.ProjectionEventScope,
e dogma.Event,
) (bool, error) {
if e == MessageE2 && done.CompareAndSwap(false, true) {
return false, nil
}

return handle(ctx, r, c, n, s, e)
}
},
},
}

for _, c := range cases {
c := c

t.Run(c.Desc, func(t *testing.T) {
tctx := test.WithContext(t)

deps := setup(tctx)

var (
mu sync.Mutex
appliedResources = map[string][]byte{}
appliedEvents = make(chan dogma.Event, 100)
)

deps.Handler.HandleEventFunc = func(
ctx context.Context,
r, c, n []byte,
s dogma.ProjectionEventScope,
e dogma.Event,
) (bool, error) {
mu.Lock()
defer mu.Unlock()

v := appliedResources[string(r)]
if !bytes.Equal(v, c) {
t.Logf("[%T] resource %x occ conflict: %x != %x", e, r, c, v)
return false, nil
}

select {
case <-ctx.Done():
return false, ctx.Err()
case appliedEvents <- e:
t.Logf("[%T] resource %x updated: %x -> %x", e, r, c, n)
appliedResources[string(r)] = n
return true, nil
}
}

deps.Handler.ResourceVersionFunc = func(ctx context.Context, r []byte) ([]byte, error) {
mu.Lock()
defer mu.Unlock()

v := appliedResources[string(r)]
t.Logf("resource %x loaded: %x", r, v)

return v, nil
}

expectedStreamID := uuidpb.Generate()
expectedEvents := []*envelopepb.Envelope{
deps.Packer.Pack(MessageE1),
deps.Packer.Pack(MessageE2),
deps.Packer.Pack(MessageE3),
}

deps.EventConsumer.ConsumeFunc = func(
ctx context.Context,
streamID *uuidpb.UUID,
offset eventstream.Offset,
events chan<- eventstream.Event,
) error {
var matching []*envelopepb.Envelope

if streamID.Equal(expectedStreamID) && offset < eventstream.Offset(len(expectedEvents)) {
matching = expectedEvents[offset:]
}

for i, env := range matching {
ese := eventstream.Event{
StreamID: streamID,
Offset: eventstream.Offset(i),
Envelope: env,
}

select {
case <-ctx.Done():
return ctx.Err()
case events <- ese:
}
}

<-ctx.Done()
return ctx.Err()
}

deps.Supervisor.StreamIDs = []*uuidpb.UUID{expectedStreamID}

c.InduceFailure(&deps)

supervisorTask := test.
RunInBackground(t, "supervisor", deps.Supervisor.Run).
RepeatedlyUntilSuccess()

for _, env := range expectedEvents {
expected, err := deps.Packer.Unpack(env)
if err != nil {
t.Fatal(err)
}

test.
ExpectChannelToReceive(
tctx,
appliedEvents,
expected,
)
}

test.
ExpectChannelWouldBlock(
tctx,
appliedEvents,
)
deps.Supervisor.Shutdown()
supervisorTask.WaitForSuccess()
})
}
})

t.Run("it makes the event type available via the scope", func(t *testing.T) {
tctx := test.WithContext(t)

deps := setup(tctx)

env := deps.Packer.Pack(MessageE1)

var supervisorTask *test.Task

deps.Handler.HandleEventFunc = func(
ctx context.Context,
r, c, n []byte,
s dogma.ProjectionEventScope,
e dogma.Event,
) (bool, error) {
expected := env.GetCreatedAt().AsTime()
if !s.RecordedAt().Equal(expected) {
t.Fatalf("unexpected recorded at time: got %s, want %s", s.RecordedAt(), expected)
}

supervisorTask.Stop()

return true, nil
}

deps.EventConsumer.ConsumeFunc = func(
ctx context.Context,
streamID *uuidpb.UUID,
offset eventstream.Offset,
events chan<- eventstream.Event,
) error {
ese := eventstream.Event{
StreamID: streamID,
Offset: 0,
Envelope: env,
}

select {
case <-ctx.Done():
return ctx.Err()
case events <- ese:
}

<-ctx.Done()
return ctx.Err()
}

deps.Supervisor.StreamIDs = []*uuidpb.UUID{uuidpb.Generate()}

supervisorTask = test.
RunInBackground(t, "supervisor", deps.Supervisor.Run).
UntilStopped()
supervisorTask.WaitUntilStopped()
})
}
2 changes: 2 additions & 0 deletions internal/projection/doc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
// Package projection dispatches events to projection message handlers.
package projection
17 changes: 17 additions & 0 deletions internal/projection/eventconsumer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package projection

import (
"context"

"github.com/dogmatiq/enginekit/protobuf/uuidpb"
"github.com/dogmatiq/veracity/internal/eventstream"
)

type EventConsumer interface {
Consume(
ctx context.Context,
streamID *uuidpb.UUID,
offset eventstream.Offset,
events chan<- eventstream.Event,
) error
}
19 changes: 19 additions & 0 deletions internal/projection/eventconsumer_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package projection_test

import (
"context"

"github.com/dogmatiq/enginekit/protobuf/uuidpb"
"github.com/dogmatiq/veracity/internal/eventstream"
)

type eventConsumerStub struct {
ConsumeFunc func(ctx context.Context, streamID *uuidpb.UUID, offset eventstream.Offset, events chan<- eventstream.Event) error
}

func (s *eventConsumerStub) Consume(ctx context.Context, streamID *uuidpb.UUID, offset eventstream.Offset, events chan<- eventstream.Event) error {
if s.ConsumeFunc != nil {
return s.ConsumeFunc(ctx, streamID, offset, events)
}
return nil
}
Loading

0 comments on commit 673cfe1

Please sign in to comment.