Add eventstream.Reader #526

Draft: wants to merge 4 commits into main
1 change: 0 additions & 1 deletion go.mod
@@ -27,7 +27,6 @@ require (

require (
github.com/dogmatiq/cosyne v0.2.0 // indirect
github.com/dogmatiq/dapper v0.5.3 // indirect
github.com/dogmatiq/dyad v1.0.0 // indirect
github.com/dogmatiq/iago v0.4.0 // indirect
github.com/dogmatiq/interopspec v0.5.3 // indirect
78 changes: 76 additions & 2 deletions go.sum

Large diffs are not rendered by default.

171 changes: 171 additions & 0 deletions internal/eventstream/append.go
@@ -1,8 +1,13 @@
package eventstream

import (
"context"
"log/slog"

"github.com/dogmatiq/enginekit/protobuf/envelopepb"
"github.com/dogmatiq/enginekit/protobuf/uuidpb"
"github.com/dogmatiq/persistencekit/journal"
"github.com/dogmatiq/veracity/internal/eventstream/internal/eventstreamjournal"
)

// AppendRequest is a request to append events to an event stream.
@@ -31,3 +36,169 @@ type AppendResponse struct {
// [AppendRequest] and hence deduplicated.
AppendedByPriorAttempt bool
}

func (w *worker) handleAppend(
ctx context.Context,
req AppendRequest,
) (AppendResponse, error) {
if !req.StreamID.Equal(w.StreamID) {
panic("received request for a different stream ID")
}

if len(req.Events) == 0 {
// We panic rather than just failing the exchange because we never want
// empty requests to occupy space in the worker's queue. The sender
// should simply not send empty requests.
panic("received append request with no events")
}

// Reset the idle timer _after_ whatever work is done so its duration is
// not "eaten up" by the work.
defer w.resetIdleTimer()

if req.LowestPossibleOffset > w.nextOffset {
if err := w.catchUpWithJournal(ctx); err != nil {
return AppendResponse{}, err
}
}

for {
res, err := w.findPriorAppend(ctx, req)
if err != nil {
return AppendResponse{}, err
}

if res.AppendedByPriorAttempt {
for index, event := range req.Events {
w.Logger.Info(
"discarded duplicate event",
slog.Uint64("stream_offset", uint64(res.BeginOffset)+uint64(index)),
slog.String("message_id", event.MessageId.AsString()),
slog.String("description", event.Description),
)
}
return res, nil
}

pos, rec, err := w.writeEventsToJournal(ctx, req)
if err == nil {
w.publishEvents(pos, rec)
return res, nil
}

if err != journal.ErrConflict {
return AppendResponse{}, err
}

if err := w.catchUpWithJournal(ctx); err != nil {
return AppendResponse{}, err
}
}
}

// findPriorAppend returns an [AppendResponse] if the given [AppendRequest] has
// already been handled.
func (w *worker) findPriorAppend(
ctx context.Context,
req AppendRequest,
) (AppendResponse, error) {
// If the lowest possible offset is ahead of the next offset, the request
// is malformed. Either there's a bug in Veracity, or the journal has
// suffered catastrophic data loss.
if req.LowestPossibleOffset > w.nextOffset {
panic("lowest possible offset is greater than the next offset")
}

// If the lowest possible offset is equal to the next offset, no events
// have been recorded since the request was created, and hence there
// can be no prior append attempt.
if req.LowestPossibleOffset == w.nextOffset {
return AppendResponse{}, nil
}

// If the lowest possible offset is in the cache, we can check for
// duplicates without using the journal. We search using the last event in
// the request as it's the most likely to still be in the cache.
lowestPossibleOffset := req.LowestPossibleOffset + Offset(len(req.Events))

if cacheIndex := w.findInCache(lowestPossibleOffset); cacheIndex != -1 {
lastMessageIndex := len(req.Events) - 1
lastMessageID := req.Events[lastMessageIndex].MessageId

for _, event := range w.recentEvents[cacheIndex:] {
if event.Envelope.MessageId.Equal(lastMessageID) {
return AppendResponse{
// We know the offset of the last message in the request, so
// we can compute the offset of the first message, even if
// it's no longer in the cache.
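// For example, a request of three events whose last event sits
// in the cache at offset 12 must have begun at offset 12 - 2 = 10.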
BeginOffset: event.Offset - Offset(lastMessageIndex),
EndOffset: event.Offset + 1,
AppendedByPriorAttempt: true,
}, nil
}
}
}

// Finally, we search the journal for the record containing the events.
rec, err := journal.ScanFromSearchResult(
ctx,
w.Journal,
0,
w.nextPos,
eventstreamjournal.SearchByOffset(uint64(req.LowestPossibleOffset)),
func(
_ context.Context,
_ journal.Position,
rec *eventstreamjournal.Record,
) (*eventstreamjournal.Record, bool, error) {
if op := rec.GetEventsAppended(); op != nil {
targetID := req.Events[0].MessageId
candidateID := op.Events[0].MessageId
return rec, candidateID.Equal(targetID), nil
}
return nil, false, nil
},
)
if err != nil {
return AppendResponse{}, journal.IgnoreNotFound(err)
}

return AppendResponse{
BeginOffset: Offset(rec.StreamOffsetBefore),
EndOffset: Offset(rec.StreamOffsetAfter),
AppendedByPriorAttempt: true,
}, nil
}

func (w *worker) writeEventsToJournal(
ctx context.Context,
req AppendRequest,
) (journal.Position, *eventstreamjournal.Record, error) {
pos := w.nextPos
rec := eventstreamjournal.
NewRecordBuilder().
WithStreamOffsetBefore(uint64(w.nextOffset)).
WithStreamOffsetAfter(uint64(w.nextOffset) + uint64(len(req.Events))).
WithEventsAppended(&eventstreamjournal.EventsAppended{Events: req.Events}).
Build()

if err := w.Journal.Append(ctx, pos, rec); err != nil {
return 0, nil, err
}

w.nextPos++

for _, event := range req.Events {
w.Logger.Info(
"event appended to stream",
slog.Uint64("journal_position", uint64(pos)),
slog.Uint64("stream_offset", uint64(w.nextOffset)),
slog.String("message_id", event.MessageId.AsString()),
slog.String("description", event.Description),
)

w.nextOffset++
}

return pos, rec, nil
}
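
The retry loop in handleAppend above follows the classic optimistic-concurrency pattern: attempt the write at the position you believe is next, and if another writer got there first, catch up with the journal and try again. The sketch below distills that pattern into a minimal runnable form; journalStub, appendWithRetry, and errConflict are hypothetical names invented for illustration, not part of the persistencekit API.

package main

import (
	"errors"
	"fmt"
)

var errConflict = errors.New("optimistic concurrency conflict")

// journalStub is a hypothetical in-memory journal used only for this sketch.
type journalStub struct {
	records []string
}

// append fails with errConflict if pos is not the next free position,
// mimicking a conditional write that reports journal.ErrConflict.
func (j *journalStub) append(pos int, rec string) error {
	if pos != len(j.records) {
		return errConflict
	}
	j.records = append(j.records, rec)
	return nil
}

// appendWithRetry keeps trying until the write lands. After each conflict it
// "catches up" by refreshing its view of the next position, just as
// handleAppend re-reads the journal before retrying.
func appendWithRetry(j *journalStub, next *int, rec string) error {
	for {
		err := j.append(*next, rec)
		if err == nil {
			*next++
			return nil
		}
		if !errors.Is(err, errConflict) {
			return err // a real failure; do not retry
		}
		*next = len(j.records) // catch up with concurrent writers
	}
}

func main() {
	j := &journalStub{records: []string{"written by another worker"}}
	next := 0 // a stale view of the journal
	if err := appendWithRetry(j, &next, "our event"); err != nil {
		panic(err)
	}
	fmt.Println(j.records, "next =", next)
}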
109 changes: 90 additions & 19 deletions internal/eventstream/append_test.go
@@ -26,6 +26,7 @@ func TestAppend(t *testing.T) {
Journals *memoryjournal.BinaryStore
Supervisor *Supervisor
Packer *envelope.Packer
Barrier chan struct{}
}

setup := func(t test.TestingT) (deps dependencies) {
@@ -41,6 +42,8 @@
Marshaler: Marshaler,
}

deps.Barrier = make(chan struct{})

return deps
}

@@ -51,26 +54,26 @@

cases := []struct {
Desc string
InduceFailure func(context.Context, *testing.T, *dependencies)
}{
{
Desc: "no faults",
},
{
Desc: "failure to open journal",
InduceFailure: func(_ context.Context, t *testing.T, deps *dependencies) {
test.FailOnJournalOpen(
deps.Journals,
eventstreamjournal.Name(streamID),
errors.New("<error>"),
)
t.Log("configured journal store to fail when opening the journal")
close(deps.Barrier)
},
},
{
Desc: "failure before appending to journal",
InduceFailure: func(_ context.Context, t *testing.T, deps *dependencies) {
test.FailBeforeJournalAppend(
deps.Journals,
eventstreamjournal.Name(streamID),
@@ -79,11 +82,13 @@
},
errors.New("<error>"),
)
t.Log("configured journal store to fail before appending a record")
close(deps.Barrier)
},
},
{
Desc: "failure after appending to journal",
InduceFailure: func(_ context.Context, t *testing.T, deps *dependencies) {
test.FailAfterJournalAppend(
deps.Journals,
eventstreamjournal.Name(streamID),
@@ -92,6 +97,56 @@
},
errors.New("<error>"),
)
t.Log("configured journal store to fail after appending a record")
close(deps.Barrier)
},
},
{
Desc: "optimistic concurrency conflict",
InduceFailure: func(ctx context.Context, t *testing.T, deps *dependencies) {
go func() {
if _, err := deps.Supervisor.Append.Do(
ctx,
AppendRequest{
StreamID: streamID,
Events: []*envelopepb.Envelope{
deps.Packer.Pack(MessageX1),
},
},
); err != nil {
t.Error(err)
return
}

t.Log("confirmed that the supervisor-under-test is running")

s := &Supervisor{
Journals: deps.Journals,
Logger: spruce.NewLogger(t),
}

defer test.
RunInBackground(t, "conflict-generating-supervisor", s.Run).
UntilStopped().
Stop()

if _, err := s.Append.Do(
ctx,
AppendRequest{
StreamID: streamID,
Events: []*envelopepb.Envelope{
deps.Packer.Pack(MessageX2),
},
},
); err != nil {
t.Error(err)
return
}

t.Log("appended events using a different supervisor to induce a journal conflict")

close(deps.Barrier)
}()
},
},
}
@@ -101,13 +156,13 @@
tctx := test.WithContext(t)
deps := setup(tctx)

t.Log("append some initial events to the stream")
t.Log("seeding the event stream with some initial events")

supervisor := test.
RunInBackground(t, "supervisor", deps.Supervisor.Run).
RunInBackground(t, "event-seeding-supervisor", deps.Supervisor.Run).
UntilStopped()

res, err := deps.Supervisor.Append.Do(
tctx,
AppendRequest{
StreamID: streamID,
@@ -124,14 +179,35 @@

supervisor.StopAndWait()

t.Log("induce a failure")
// Open a journal that we can use for verifying results
// _before_ inducing any failure.
j, err := eventstreamjournal.Open(tctx, deps.Journals, streamID)
if err != nil {
t.Fatal(err)
}
defer j.Close()

c.InduceFailure(&deps)
if c.InduceFailure != nil {
c.InduceFailure(tctx, t, &deps)
} else {
close(deps.Barrier)
}

supervisor = test.
RunInBackground(t, "supervisor", deps.Supervisor.Run).
RunInBackground(t, "supervisor-under-test", deps.Supervisor.Run).
RepeatedlyUntilStopped()

<-deps.Barrier

// Read the journal bounds as they exist before the test
// commences.
begin, end, err := j.Bounds(tctx)
if err != nil {
t.Fatal(err)
}

t.Logf("proceeding with test, journal bounds are [%d, %d)", begin, end)

event := deps.Packer.Pack(MessageE1)

req := AppendRequest{
@@ -144,7 +220,7 @@
t.Logf("append an event, attempt #%d", attempt)
attempt++

_, err := deps.Supervisor.Append.Do(tctx, req)
if err == nil {
break
}
@@ -159,16 +235,11 @@

t.Log("ensure that the event was appended to the stream exactly once")

var events []*envelopepb.Envelope

if err := j.Range(
tctx,
end, // only read the records appended during the test
func(
ctx context.Context,
_ journal.Position,
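
The test cases coordinate failure injection through a channel barrier: each InduceFailure callback closes deps.Barrier once its fault is armed, and the test body blocks on the channel before capturing the journal bounds and appending the event under test. A minimal, runnable sketch of that coordination pattern, with invented names, might look like this:

package main

import (
	"fmt"
	"time"
)

func main() {
	barrier := make(chan struct{})

	// The failure injector runs concurrently and signals readiness by
	// closing the channel; closing (rather than sending) unblocks every
	// current and future waiter exactly once.
	go func() {
		time.Sleep(10 * time.Millisecond) // stand-in for arming a fault
		fmt.Println("fault armed")
		close(barrier)
	}()

	<-barrier // proceed only once the fault is in place
	fmt.Println("proceeding with test")
}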