Ensure that the supervisor recovers from optimistic concurrency conflicts.

jmalloc committed Apr 10, 2024
1 parent 8bafe85 commit 386c22a
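
The commit teaches the event-stream worker to retry an append whenever the journal rejects the write with an optimistic concurrency conflict, catching up with the journal before each retry. The following self-contained Go sketch illustrates that recovery pattern in isolation; it is not part of this commit, and the journal, worker, and appendAt names are illustrative stand-ins rather than the Veracity or persistencekit API.

package main

import (
	"errors"
	"fmt"
	"sync"
)

// errConflict stands in for a conflict error such as journal.ErrConflict.
var errConflict = errors.New("optimistic concurrency conflict")

// journal is a minimal in-memory append-only log that rejects writes
// positioned anywhere other than its current end.
type journal struct {
	mu      sync.Mutex
	records []string
}

// appendAt succeeds only if pos equals the journal's current end position.
func (j *journal) appendAt(pos int, rec string) error {
	j.mu.Lock()
	defer j.mu.Unlock()
	if pos != len(j.records) {
		return errConflict
	}
	j.records = append(j.records, rec)
	return nil
}

// end reports the position at which the next record must be written.
func (j *journal) end() int {
	j.mu.Lock()
	defer j.mu.Unlock()
	return len(j.records)
}

// worker caches its view of the journal end and, like the supervisor's
// worker, recovers from a conflict by catching up with the journal before
// retrying the append.
type worker struct {
	journal *journal
	nextPos int
}

func (w *worker) append(rec string) (int, error) {
	for {
		if err := w.journal.appendAt(w.nextPos, rec); err == nil {
			pos := w.nextPos
			w.nextPos++
			return pos, nil
		} else if !errors.Is(err, errConflict) {
			return 0, err // a real failure; do not retry
		}

		// Another writer appended first; refresh our view and try again.
		w.nextPos = w.journal.end()
	}
}

func main() {
	j := &journal{}
	w := &worker{journal: j}

	// Simulate a competing writer that appends behind the worker's back,
	// making the worker's cached position stale.
	_ = j.appendAt(0, "event from another supervisor")

	pos, err := w.append("event from this worker")
	if err != nil {
		panic(err)
	}
	fmt.Printf("append landed at position %d after recovering from a conflict\n", pos)
}
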
Showing 9 changed files with 529 additions and 391 deletions.
74 changes: 74 additions & 0 deletions go.sum

Large diffs are not rendered by default.

178 changes: 178 additions & 0 deletions internal/eventstream/append.go
@@ -1,8 +1,13 @@
package eventstream

import (
	"context"
	"log/slog"

	"github.com/dogmatiq/enginekit/protobuf/envelopepb"
	"github.com/dogmatiq/enginekit/protobuf/uuidpb"
	"github.com/dogmatiq/persistencekit/journal"
	"github.com/dogmatiq/veracity/internal/eventstream/internal/eventstreamjournal"
)

// AppendRequest is a request to append events to an event stream.
@@ -31,3 +36,176 @@ type AppendResponse struct {
	// [AppendRequest] and hence deduplicated.
	AppendedByPriorAttempt bool
}

func (w *worker) handleAppend(
	ctx context.Context,
	req AppendRequest,
) (AppendResponse, error) {
	if !req.StreamID.Equal(w.StreamID) {
		panic("received request for a different stream ID")
	}

	if len(req.Events) == 0 {
		// We panic rather than just failing the exchange because we never want
		// empty requests to occupy space in the worker's queue. The sender
		// should simply not send empty requests.
		panic("received append request with no events")
	}

	defer w.resetIdleTimer()

	if req.LowestPossibleOffset > w.nextOffset {
		if err := w.catchUpWithJournal(ctx); err != nil {
			return AppendResponse{}, err
		}
	}

	for {
		res, err := w.findPriorAppend(ctx, req)
		if err != nil {
			return AppendResponse{}, err
		}

		if res.AppendedByPriorAttempt {
			for index, event := range req.Events {
				w.Logger.Info(
					"discarded duplicate event",
					slog.Uint64("stream_offset", uint64(res.BeginOffset)+uint64(index)),
					slog.String("message_id", event.MessageId.AsString()),
					slog.String("description", event.Description),
				)
			}
			return res, nil
		}

		res, err = w.writeEventsToJournal(ctx, req)
		if err == nil {
			w.publishEvents(res.BeginOffset, req.Events)
			return res, nil
		}

		if err != journal.ErrConflict {
			return AppendResponse{}, err
		}

		if err := w.catchUpWithJournal(ctx); err != nil {
			return AppendResponse{}, err
		}
	}
}

// findPriorAppend returns an [AppendResponse] if the given [AppendRequest] has
// already been handled.
func (w *worker) findPriorAppend(
	ctx context.Context,
	req AppendRequest,
) (AppendResponse, error) {
	// If the lowest possible offset is ahead of the next offset, the request
	// is malformed. Either there's a bug in Veracity, or the journal has
	// suffered catastrophic data loss.
	if req.LowestPossibleOffset > w.nextOffset {
		panic("lowest possible offset is greater than the next offset")
	}

	// If the lowest possible offset is equal to the next offset, no events
	// have been recorded since the request was created, and hence there can
	// be no prior append attempt.
	if req.LowestPossibleOffset == w.nextOffset {
		return AppendResponse{}, nil
	}

	// If the lowest possible offset is in the cache, we can check for
	// duplicates without using the journal. We search using the last event in
	// the request as it's the most likely to still be in the cache.
	lowestPossibleOffset := req.LowestPossibleOffset + Offset(len(req.Events))

	if cacheIndex := w.findInCache(lowestPossibleOffset); cacheIndex != -1 {
		lastMessageIndex := len(req.Events) - 1
		lastMessageID := req.Events[lastMessageIndex].MessageId

		for _, event := range w.recentEvents[cacheIndex:] {
			if event.Envelope.MessageId.Equal(lastMessageID) {
				return AppendResponse{
					// We know the offset of the last message in the request, so
					// we can compute the offset of the first message, even if
					// it's no longer in the cache.
					BeginOffset:            event.Offset - Offset(lastMessageIndex),
					EndOffset:              event.Offset + 1,
					AppendedByPriorAttempt: true,
				}, nil
			}
		}
	}

	// Finally, we search the journal for the record containing the events.
	rec, err := journal.ScanFromSearchResult(
		ctx,
		w.Journal,
		0,
		w.nextPos,
		eventstreamjournal.SearchByOffset(uint64(req.LowestPossibleOffset)),
		func(
			_ context.Context,
			_ journal.Position,
			rec *eventstreamjournal.Record,
		) (*eventstreamjournal.Record, bool, error) {
			if op := rec.GetEventsAppended(); op != nil {
				targetID := req.Events[0].MessageId
				candidateID := op.Events[0].MessageId
				return rec, candidateID.Equal(targetID), nil
			}
			return nil, false, nil
		},
	)
	if err != nil {
		return AppendResponse{}, journal.IgnoreNotFound(err)
	}

	return AppendResponse{
		BeginOffset:            Offset(rec.StreamOffsetBefore),
		EndOffset:              Offset(rec.StreamOffsetAfter),
		AppendedByPriorAttempt: true,
	}, nil
}

func (w *worker) writeEventsToJournal(
	ctx context.Context,
	req AppendRequest,
) (AppendResponse, error) {
	before := w.nextOffset
	after := w.nextOffset + Offset(len(req.Events))

	if err := w.Journal.Append(
		ctx,
		w.nextPos,
		eventstreamjournal.
			NewRecordBuilder().
			WithStreamOffsetBefore(uint64(before)).
			WithStreamOffsetAfter(uint64(after)).
			WithEventsAppended(&eventstreamjournal.EventsAppended{
				Events: req.Events,
			}).
			Build(),
	); err != nil {
		return AppendResponse{}, err
	}

	for index, event := range req.Events {
		w.Logger.Info(
			"appended event to the stream",
			slog.Uint64("journal_position", uint64(w.nextPos)),
			slog.Uint64("stream_offset", uint64(before)+uint64(index)),
			slog.String("message_id", event.MessageId.AsString()),
			slog.String("description", event.Description),
		)
	}

	w.nextPos++
	w.nextOffset = after

	return AppendResponse{
		BeginOffset:            before,
		EndOffset:              after,
		AppendedByPriorAttempt: false,
	}, nil
}
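
In the loop above, findPriorAppend deduplicates retried requests: it looks for the request's last message ID among recently cached events, and only falls back to scanning the journal when the cache cannot answer. Below is a rough sketch of the cache-side check, assuming a simple slice of cached events; the types and names are illustrative, not the Veracity API.

package dedup

// cachedEvent pairs a stream offset with the ID of the event stored there.
// These types are illustrative, not part of Veracity.
type cachedEvent struct {
	Offset    int
	MessageID string
}

// priorAppend reports whether the last message of a request already appears
// in the cache and, if so, the offset range the whole request occupies. The
// last message is used because it is the most likely to still be cached.
func priorAppend(cache []cachedEvent, requestIDs []string) (begin, end int, found bool) {
	if len(requestIDs) == 0 {
		return 0, 0, false
	}

	lastIndex := len(requestIDs) - 1
	lastID := requestIDs[lastIndex]

	for _, event := range cache {
		if event.MessageID == lastID {
			// Knowing where the last message landed pins down the first,
			// even if the first has already been evicted from the cache.
			return event.Offset - lastIndex, event.Offset + 1, true
		}
	}

	return 0, 0, false
}
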
105 changes: 88 additions & 17 deletions internal/eventstream/append_test.go
@@ -26,6 +26,7 @@ func TestAppend(t *testing.T) {
		Journals   *memoryjournal.BinaryStore
		Supervisor *Supervisor
		Packer     *envelope.Packer
		Barrier    chan struct{}
	}

	setup := func(t test.TestingT) (deps dependencies) {
@@ -41,6 +42,8 @@
			Marshaler: Marshaler,
		}

		deps.Barrier = make(chan struct{})

		return deps
	}

@@ -51,26 +54,26 @@

	cases := []struct {
		Desc          string
		InduceFailure func(*dependencies)
		InduceFailure func(context.Context, *testing.T, *dependencies)
	}{
		{
			Desc: "no faults",
			InduceFailure: func(*dependencies) {
			},
		},
		{
			Desc: "failure to open journal",
			InduceFailure: func(deps *dependencies) {
			InduceFailure: func(_ context.Context, t *testing.T, deps *dependencies) {
				test.FailOnJournalOpen(
					deps.Journals,
					eventstreamjournal.Name(streamID),
					errors.New("<error>"),
				)
				t.Log("configured journal store to fail when opening the journal")
				close(deps.Barrier)
			},
		},
		{
			Desc: "failure before appending to journal",
			InduceFailure: func(deps *dependencies) {
			InduceFailure: func(_ context.Context, t *testing.T, deps *dependencies) {
				test.FailBeforeJournalAppend(
					deps.Journals,
					eventstreamjournal.Name(streamID),
@@ -79,11 +82,13 @@
					},
					errors.New("<error>"),
				)
				t.Log("configured journal store to fail before appending a record")
				close(deps.Barrier)
			},
		},
		{
			Desc: "failure after appending to journal",
			InduceFailure: func(deps *dependencies) {
			InduceFailure: func(_ context.Context, t *testing.T, deps *dependencies) {
				test.FailAfterJournalAppend(
					deps.Journals,
					eventstreamjournal.Name(streamID),
@@ -92,6 +97,56 @@
					},
					errors.New("<error>"),
				)
				t.Log("configured journal store to fail after appending a record")
				close(deps.Barrier)
			},
		},
		{
			Desc: "optimistic concurrency conflict",
			InduceFailure: func(ctx context.Context, t *testing.T, deps *dependencies) {
				go func() {
					if _, err := deps.Supervisor.AppendQueue.Do(
						ctx,
						AppendRequest{
							StreamID: streamID,
							Events: []*envelopepb.Envelope{
								deps.Packer.Pack(MessageX1),
							},
						},
					); err != nil {
						t.Error(err)
						return
					}

					t.Log("confirmed that the supervisor-under-test is running")

					s := &Supervisor{
						Journals: deps.Journals,
						Logger:   spruce.NewLogger(t),
					}

					defer test.
						RunInBackground(t, "conflict-generating-supervisor", s.Run).
						UntilStopped().
						Stop()

					if _, err := s.AppendQueue.Do(
						ctx,
						AppendRequest{
							StreamID: streamID,
							Events: []*envelopepb.Envelope{
								deps.Packer.Pack(MessageX2),
							},
						},
					); err != nil {
						t.Error(err)
						return
					}

					t.Log("appended events using a different supervisor to induce a journal conflict")

					close(deps.Barrier)
				}()
			},
		},
	}
@@ -101,10 +156,10 @@
			tctx := test.WithContext(t)
			deps := setup(tctx)

			t.Log("append some initial events to the stream")
			t.Log("seeding the event stream with some initial events")

			supervisor := test.
				RunInBackground(t, "supervisor", deps.Supervisor.Run).
				RunInBackground(t, "event-seeding-supervisor", deps.Supervisor.Run).
				UntilStopped()

			res, err := deps.Supervisor.AppendQueue.Do(
@@ -124,14 +179,35 @@

			supervisor.StopAndWait()

			t.Log("induce a failure")
			// Open a journal that we can use for verifying results
			// _before_ inducing any failure.
			j, err := eventstreamjournal.Open(tctx, deps.Journals, streamID)
			if err != nil {
				t.Fatal(err)
			}
			defer j.Close()

			c.InduceFailure(&deps)
			if c.InduceFailure != nil {
				c.InduceFailure(tctx, t, &deps)
			} else {
				close(deps.Barrier)
			}

			supervisor = test.
				RunInBackground(t, "supervisor", deps.Supervisor.Run).
				RunInBackground(t, "supervisor-under-test", deps.Supervisor.Run).
				RepeatedlyUntilStopped()

			<-deps.Barrier

			// Read the journal bounds as they exist before the test
			// commences.
			begin, end, err := j.Bounds(tctx)
			if err != nil {
				t.Fatal(err)
			}

			t.Logf("proceeding with test, journal bounds are [%d, %d)", begin, end)

			event := deps.Packer.Pack(MessageE1)

			req := AppendRequest{
@@ -159,16 +235,11 @@

			t.Log("ensure that the event was appended to the stream exactly once")

			j, err := eventstreamjournal.Open(tctx, deps.Journals, streamID)
			if err != nil {
				t.Fatal(err)
			}

			var events []*envelopepb.Envelope

			if err := j.Range(
				tctx,
				1,
				end, // only read the records appended during the test
				func(
					ctx context.Context,
					_ journal.Position,
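
The new test case coordinates with the supervisor-under-test through the Barrier channel: the failure (or the competing append that provokes a conflict) is induced in the background, and the test proceeds only after the barrier is closed. A minimal sketch of that gating pattern follows, using illustrative names rather than the real test helpers.

package example

import (
	"testing"
	"time"
)

// TestBarrierGating shows the shape of the synchronization used by the test:
// the fault-injection goroutine closes the barrier once the conflicting work
// has been done, and the main test goroutine blocks until then.
func TestBarrierGating(t *testing.T) {
	barrier := make(chan struct{})

	go func() {
		// Stand-in for inducing the failure, e.g. appending events via a
		// second supervisor to create a journal conflict.
		time.Sleep(10 * time.Millisecond)
		close(barrier)
	}()

	select {
	case <-barrier:
		// The fault has been set up; the test can now exercise the
		// supervisor-under-test and assert exactly-once behaviour.
	case <-time.After(time.Second):
		t.Fatal("timed out waiting for the induced failure to be set up")
	}
}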