Skip to content

Commit

Permalink
Change eventstream to use generic journal.Journal type instead of…
Browse files Browse the repository at this point in the history
… `protojournal` package.
  • Loading branch information
jmalloc committed Mar 17, 2024
1 parent 3a7964a commit b8db736
Show file tree
Hide file tree
Showing 8 changed files with 95 additions and 113 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ require (
github.com/dogmatiq/example v0.0.0-20230606031437-2bd84c72050b
github.com/dogmatiq/ferrite v1.3.0
github.com/dogmatiq/marshalkit v0.7.3
github.com/dogmatiq/persistencekit v0.7.1-0.20240317034053-ba7e57ef6507
github.com/dogmatiq/persistencekit v0.7.1-0.20240317044652-8e6daf523e36
github.com/dogmatiq/primo v0.2.0
github.com/dogmatiq/spruce v0.1.0
github.com/google/go-cmp v0.6.0
Expand Down
78 changes: 2 additions & 76 deletions go.sum

Large diffs are not rendered by default.

8 changes: 3 additions & 5 deletions internal/eventstream/append_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,22 +16,21 @@ import (
"github.com/dogmatiq/veracity/internal/envelope"
. "github.com/dogmatiq/veracity/internal/eventstream"
"github.com/dogmatiq/veracity/internal/eventstream/internal/journalpb"
"github.com/dogmatiq/veracity/internal/protobuf/protojournal"
"github.com/dogmatiq/veracity/internal/test"
)

func TestAppend(t *testing.T) {
t.Parallel()

type dependencies struct {
Journals *memoryjournal.BinaryStore
Journals *memoryjournal.Store[*journalpb.Record]
Supervisor *Supervisor
Events <-chan Event
Packer *envelope.Packer
}

setup := func(t test.TestingT) (deps dependencies) {
deps.Journals = &memoryjournal.BinaryStore{}
deps.Journals = &memoryjournal.Store[*journalpb.Record]{}

events := make(chan Event, 100)

Expand Down Expand Up @@ -176,9 +175,8 @@ func TestAppend(t *testing.T) {

var events []*envelopepb.Envelope

if err := protojournal.Range(
if err := j.Range(
tctx,
j,
1,
func(
ctx context.Context,
Expand Down
20 changes: 18 additions & 2 deletions internal/eventstream/journal.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,26 @@ import (

"github.com/dogmatiq/enginekit/protobuf/uuidpb"
"github.com/dogmatiq/persistencekit/journal"
"github.com/dogmatiq/persistencekit/marshal"
"github.com/dogmatiq/veracity/internal/eventstream/internal/journalpb"
"github.com/dogmatiq/veracity/internal/protobuf/protojournal"
)

// JournalStore is the [journal.Store] type used to store event stream state.
// Each event stream has its own journal.
type JournalStore = journal.Store[*journalpb.Record]

// Journal is a journal of event stream records for a single event stream.
type Journal = journal.Journal[*journalpb.Record]

// NewJournalStore returns a new [JournalStore] that uses s for its underlying
// storage.
func NewJournalStore(s journal.BinaryStore) JournalStore {
return journal.NewMarshalingStore(
s,
marshal.ProtocolBuffers[*journalpb.Record]{},
)
}

// JournalName returns the name of the journal that contains the state
// of the event stream with the given ID.
func JournalName(streamID *uuidpb.UUID) string {
Expand All @@ -17,7 +33,7 @@ func JournalName(streamID *uuidpb.UUID) string {

// searchByOffset returns a compare function that searches for the journal
// record that contains the event with the given offset.
func searchByOffset(off Offset) protojournal.CompareFunc[*journalpb.Record] {
func searchByOffset(off Offset) journal.CompareFunc[*journalpb.Record] {
return func(
ctx context.Context,
pos journal.Position,
Expand Down
3 changes: 1 addition & 2 deletions internal/eventstream/supervisor.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"log/slog"

"github.com/dogmatiq/enginekit/protobuf/uuidpb"
"github.com/dogmatiq/persistencekit/journal"
"github.com/dogmatiq/veracity/internal/fsm"
"github.com/dogmatiq/veracity/internal/messaging"
"github.com/dogmatiq/veracity/internal/signaling"
Expand All @@ -18,7 +17,7 @@ var errShuttingDown = errors.New("event stream sub-system is shutting down")

// A Supervisor coordinates event stream workers.
type Supervisor struct {
Journals journal.BinaryStore
Journals JournalStore
AppendQueue messaging.ExchangeQueue[AppendRequest, AppendResponse]
Events chan<- Event
Logger *slog.Logger
Expand Down
21 changes: 8 additions & 13 deletions internal/eventstream/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"github.com/dogmatiq/veracity/internal/eventstream/internal/journalpb"
"github.com/dogmatiq/veracity/internal/fsm"
"github.com/dogmatiq/veracity/internal/messaging"
"github.com/dogmatiq/veracity/internal/protobuf/protojournal"
"github.com/dogmatiq/veracity/internal/signaling"
)

Expand All @@ -18,7 +17,7 @@ const defaultIdleTimeout = 5 * time.Minute
// A worker manages the state of an event stream.
type worker struct {
// Journal stores the event stream's state.
Journal journal.BinaryJournal
Journal Journal

// AppendQueue is a queue of requests to append events to the stream.
AppendQueue messaging.ExchangeQueue[AppendRequest, AppendResponse]
Expand Down Expand Up @@ -49,7 +48,7 @@ func (w *worker) Run(ctx context.Context) (err error) {
w.Logger.DebugContext(ctx, "event stream worker started")
defer w.Logger.DebugContext(ctx, "event stream worker stopped")

pos, rec, ok, err := protojournal.GetLatest[*journalpb.Record](ctx, w.Journal)
pos, rec, ok, err := journal.LastRecord(ctx, w.Journal)
if err != nil {
return err
}
Expand Down Expand Up @@ -144,12 +143,7 @@ func (w *worker) appendEvents(
req AppendRequest,
) (AppendResponse, error) {
if w.mightBeDuplicates(req) {
rec, ok, err := w.findAppendRecord(ctx, req)
if err != nil {
return AppendResponse{}, err
}

if ok {
if rec, err := w.findAppendRecord(ctx, req); err == nil {
for i, e := range req.Events {
w.Logger.WarnContext(
ctx,
Expand All @@ -165,15 +159,16 @@ func (w *worker) appendEvents(
EndOffset: Offset(rec.StreamOffsetAfter),
AppendedByPriorAttempt: true,
}, nil
} else if err != journal.ErrNotFound {
return AppendResponse{}, err
}
}

before := w.off
after := w.off + Offset(len(req.Events))

if err := protojournal.Append(
if err := w.Journal.Append(
ctx,
w.Journal,
w.pos,
journalpb.
NewRecordBuilder().
Expand Down Expand Up @@ -235,8 +230,8 @@ func (w *worker) mightBeDuplicates(req AppendRequest) bool {
func (w *worker) findAppendRecord(
ctx context.Context,
req AppendRequest,
) (*journalpb.Record, bool, error) {
return protojournal.ScanFromSearchResult(
) (*journalpb.Record, error) {
return journal.ScanFromSearchResult(
ctx,
w.Journal,
0,
Expand Down
12 changes: 6 additions & 6 deletions internal/integration/supervisor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ func TestSupervisor(t *testing.T) {
{
Desc: "failure before appending CommandEnqueued record to the journal",
InduceFailure: func(deps *dependencies) {
test.FailBeforeJournalAppend(
test.XXX_FailBeforeJournalAppend(
deps.Journals,
JournalName(deps.Supervisor.HandlerIdentity.Key),
func(r *journalpb.Record) bool {
Expand All @@ -105,7 +105,7 @@ func TestSupervisor(t *testing.T) {
{
Desc: "failure after appending CommandEnqueued record to the journal",
InduceFailure: func(deps *dependencies) {
test.FailAfterJournalAppend(
test.XXX_FailAfterJournalAppend(
deps.Journals,
JournalName(deps.Supervisor.HandlerIdentity.Key),
func(r *journalpb.Record) bool {
Expand Down Expand Up @@ -159,7 +159,7 @@ func TestSupervisor(t *testing.T) {
{
Desc: "failure before appending CommandHandled record to the journal",
InduceFailure: func(deps *dependencies) {
test.FailBeforeJournalAppend(
test.XXX_FailBeforeJournalAppend(
deps.Journals,
JournalName(deps.Supervisor.HandlerIdentity.Key),
func(r *journalpb.Record) bool {
Expand All @@ -172,7 +172,7 @@ func TestSupervisor(t *testing.T) {
{
Desc: "failure after appending CommandHandled record to the journal",
InduceFailure: func(deps *dependencies) {
test.FailAfterJournalAppend(
test.XXX_FailAfterJournalAppend(
deps.Journals,
JournalName(deps.Supervisor.HandlerIdentity.Key),
func(r *journalpb.Record) bool {
Expand Down Expand Up @@ -228,7 +228,7 @@ func TestSupervisor(t *testing.T) {
Desc: "failure before appending EventsAppendedToStream record to the journal",
ExpectMultipleEventAppendRequests: true,
InduceFailure: func(deps *dependencies) {
test.FailBeforeJournalAppend(
test.XXX_FailBeforeJournalAppend(
deps.Journals,
JournalName(deps.Supervisor.HandlerIdentity.Key),
func(r *journalpb.Record) bool {
Expand All @@ -241,7 +241,7 @@ func TestSupervisor(t *testing.T) {
{
Desc: "failure after appending EventsAppendedToStream record to the journal",
InduceFailure: func(deps *dependencies) {
test.FailAfterJournalAppend(
test.XXX_FailAfterJournalAppend(
deps.Journals,
JournalName(deps.Supervisor.HandlerIdentity.Key),
func(r *journalpb.Record) bool {
Expand Down
64 changes: 56 additions & 8 deletions internal/test/journal.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ import (

// FailOnJournalOpen configures the journal with the given name to return an
// error on the next call to Open().
func FailOnJournalOpen(
s *memoryjournal.BinaryStore,
func FailOnJournalOpen[T any](
s *memoryjournal.Store[T],
name string,
err error,
) {
Expand All @@ -28,10 +28,10 @@ func FailOnJournalOpen(
// predicate function.
//
// The error is returned before the append is actually performed.
func FailBeforeJournalAppend[R proto.Message](
s *memoryjournal.BinaryStore,
func FailBeforeJournalAppend[T any](
s *memoryjournal.Store[T],
name string,
pred func(R) bool,
pred func(T) bool,
err error,
) {
s.BeforeAppend = failAppendOnce(name, pred, err)
Expand All @@ -42,16 +42,64 @@ func FailBeforeJournalAppend[R proto.Message](
// predicate function.
//
// The error is returned after the append is actually performed.
func FailAfterJournalAppend[R proto.Message](
func FailAfterJournalAppend[T any](
s *memoryjournal.Store[T],
name string,
pred func(T) bool,
err error,
) {
s.AfterAppend = failAppendOnce(name, pred, err)
}

func failAppendOnce[T any](
name string,
pred func(T) bool,
err error,
) func(string, T) error {
fail := FailOnce(err)

return func(n string, rec T) error {
if n != name {
return nil
}

if pred(rec) {
return fail()
}

return nil
}
}

// XXX_FailBeforeJournalAppend configures the journal with the given name to return
// an error on the next call to Append() with a record that satisifies the given
// predicate function.
//
// The error is returned before the append is actually performed.
func XXX_FailBeforeJournalAppend[R proto.Message](
s *memoryjournal.BinaryStore,
name string,
pred func(R) bool,
err error,
) {
s.AfterAppend = failAppendOnce(name, pred, err)
s.BeforeAppend = XXX_failAppendOnce(name, pred, err)
}

// XXX_FailAfterJournalAppend configures the journal with the given name to return
// an error on the next call to Append() with a record that satisifies the given
// predicate function.
//
// The error is returned after the append is actually performed.
func XXX_FailAfterJournalAppend[R proto.Message](
s *memoryjournal.BinaryStore,
name string,
pred func(R) bool,
err error,
) {
s.AfterAppend = XXX_failAppendOnce(name, pred, err)
}

func failAppendOnce[R proto.Message](
func XXX_failAppendOnce[R proto.Message](
name string,
pred func(R) bool,
err error,
Expand Down

0 comments on commit b8db736

Please sign in to comment.