Skip to content

Commit

Permalink
Change integration to use generic journal.Journal type instead of…
Browse files Browse the repository at this point in the history
… `protojournal` package.
  • Loading branch information
jmalloc committed Mar 17, 2024
1 parent b8db736 commit bb14828
Show file tree
Hide file tree
Showing 5 changed files with 49 additions and 91 deletions.
2 changes: 1 addition & 1 deletion internal/engineconfig/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func (c applicationVisitor) VisitRichIntegration(_ context.Context, cfg configki
sup := &integration.Supervisor{
Handler: cfg.Handler(),
HandlerIdentity: marshalIdentity(cfg.Identity()),
Journals: c.Persistence.Journals,
Journals: integration.NewJournalStore(c.Persistence.Journals),
Packer: c.packer,
}

Expand Down
29 changes: 29 additions & 0 deletions internal/integration/journal.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package integration

import (
"github.com/dogmatiq/enginekit/protobuf/uuidpb"
"github.com/dogmatiq/persistencekit/journal"
"github.com/dogmatiq/persistencekit/marshal"
"github.com/dogmatiq/veracity/internal/integration/internal/journalpb"
)

// JournalStore is the [journal.Store] type used to integration handler state.
type JournalStore = journal.Store[*journalpb.Record]

// Journal is a journal of records for a specific integration handler.
type Journal = journal.Journal[*journalpb.Record]

// NewJournalStore returns a new [JournalStore] that uses s for its underlying
// storage.
func NewJournalStore(s journal.BinaryStore) JournalStore {
return journal.NewMarshalingStore(
s,
marshal.ProtocolBuffers[*journalpb.Record]{},
)
}

// JournalName returns the name of the journal that contains the state
// of the integration handler with the given key.
func JournalName(key *uuidpb.UUID) string {
return "integration:" + key.AsString()
}
34 changes: 11 additions & 23 deletions internal/integration/supervisor.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (
"github.com/dogmatiq/veracity/internal/fsm"
"github.com/dogmatiq/veracity/internal/integration/internal/journalpb"
"github.com/dogmatiq/veracity/internal/messaging"
"github.com/dogmatiq/veracity/internal/protobuf/protojournal"
"github.com/dogmatiq/veracity/internal/signaling"
"google.golang.org/protobuf/proto"
)
Expand All @@ -38,14 +37,14 @@ type Supervisor struct {
ExecuteQueue messaging.ExchangeQueue[ExecuteRequest, ExecuteResponse]
Handler dogma.IntegrationMessageHandler
HandlerIdentity *identitypb.Identity
Journals journal.BinaryStore
Journals JournalStore
Keyspaces kv.BinaryStore
Packer *envelope.Packer
EventRecorder EventRecorder

eventStreamID *uuidpb.UUID
lowestPossibleEventOffset eventstream.Offset
journal journal.BinaryJournal
journal Journal
pos journal.Position
handledCmds kv.BinaryKeyspace
shutdown signaling.Latch
Expand Down Expand Up @@ -91,9 +90,8 @@ func (s *Supervisor) initState(ctx context.Context) fsm.Action {
// Range over the journal to build a list of pending work consisting of:
// - enqueued but unhandled commands
// - events that have not been recorded to an event stream
if err := protojournal.Range(
if err := s.journal.Range(
ctx,
s.journal,
s.pos,
func(
ctx context.Context,
Expand Down Expand Up @@ -135,13 +133,13 @@ func (s *Supervisor) initState(ctx context.Context) fsm.Action {
}

for _, op := range unrecorded {
if err := s.recordEvents(ctx, s.journal, op, false); err != nil {
if err := s.recordEvents(ctx, op, false); err != nil {
return fsm.Fail(err)
}
}

for _, cmd := range unhandled {
if err := s.handleCommand(ctx, cmd, s.journal); err != nil {
if err := s.handleCommand(ctx, cmd); err != nil {
return fsm.Fail(err)
}
}
Expand Down Expand Up @@ -171,7 +169,7 @@ func (s *Supervisor) Shutdown() {
s.shutdown.Signal()
}

func (s *Supervisor) handleCommand(ctx context.Context, cmd *envelopepb.Envelope, j journal.BinaryJournal) error {
func (s *Supervisor) handleCommand(ctx context.Context, cmd *envelopepb.Envelope) error {
c, err := s.Packer.Unpack(cmd)
if err != nil {
return err
Expand Down Expand Up @@ -210,9 +208,8 @@ func (s *Supervisor) handleCommand(ctx context.Context, cmd *envelopepb.Envelope
LowestPossibleEventOffset: uint64(s.lowestPossibleEventOffset),
}

if err := protojournal.Append(
if err := s.journal.Append(
ctx,
j,
s.pos,
journalpb.
NewRecordBuilder().
Expand All @@ -223,12 +220,11 @@ func (s *Supervisor) handleCommand(ctx context.Context, cmd *envelopepb.Envelope
}
s.pos++

return s.recordEvents(ctx, j, op, true)
return s.recordEvents(ctx, op, true)
}

func (s *Supervisor) recordEvents(
ctx context.Context,
j journal.BinaryJournal,
op *journalpb.CommandHandled,
isFirstAttempt bool,
) error {
Expand All @@ -254,9 +250,8 @@ func (s *Supervisor) recordEvents(
s.lowestPossibleEventOffset = res.EndOffset
}

if err := protojournal.Append(
if err := s.journal.Append(
ctx,
j,
s.pos,
journalpb.
NewRecordBuilder().
Expand Down Expand Up @@ -294,9 +289,8 @@ func (s *Supervisor) handleCommandState(
}
}

if err := protojournal.Append(
if err := s.journal.Append(
ctx,
s.journal,
s.pos,
journalpb.
NewRecordBuilder().
Expand All @@ -314,19 +308,13 @@ func (s *Supervisor) handleCommandState(
s.pos++
ex.Ok(ExecuteResponse{})

if err := s.handleCommand(ctx, ex.Request.Command, s.journal); err != nil {
if err := s.handleCommand(ctx, ex.Request.Command); err != nil {
return fsm.Fail(err)
}

return fsm.EnterState(s.idleState)
}

// JournalName returns the name of the journal that contains the state
// of the integration handler with the given key.
func JournalName(key *uuidpb.UUID) string {
return "integration:" + key.AsString()
}

// HandledCommandsKeyspaceName returns the name of the keyspace that contains
// the set of handled command IDs.
func HandledCommandsKeyspaceName(key *uuidpb.UUID) string {
Expand Down
16 changes: 8 additions & 8 deletions internal/integration/supervisor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import (
func TestSupervisor(t *testing.T) {
type dependencies struct {
Packer *envelope.Packer
Journals *memoryjournal.BinaryStore
Journals *memoryjournal.Store[*journalpb.Record]
Keyspaces *memorykv.BinaryStore
Handler *IntegrationMessageHandler
EventRecorder *eventRecorderStub
Expand All @@ -41,7 +41,7 @@ func TestSupervisor(t *testing.T) {
setup := func(test.TestingT) (deps dependencies) {
deps.Packer = newPacker()

deps.Journals = &memoryjournal.BinaryStore{}
deps.Journals = &memoryjournal.Store[*journalpb.Record]{}

deps.Keyspaces = &memorykv.BinaryStore{}

Expand Down Expand Up @@ -92,7 +92,7 @@ func TestSupervisor(t *testing.T) {
{
Desc: "failure before appending CommandEnqueued record to the journal",
InduceFailure: func(deps *dependencies) {
test.XXX_FailBeforeJournalAppend(
test.FailBeforeJournalAppend(
deps.Journals,
JournalName(deps.Supervisor.HandlerIdentity.Key),
func(r *journalpb.Record) bool {
Expand All @@ -105,7 +105,7 @@ func TestSupervisor(t *testing.T) {
{
Desc: "failure after appending CommandEnqueued record to the journal",
InduceFailure: func(deps *dependencies) {
test.XXX_FailAfterJournalAppend(
test.FailAfterJournalAppend(
deps.Journals,
JournalName(deps.Supervisor.HandlerIdentity.Key),
func(r *journalpb.Record) bool {
Expand Down Expand Up @@ -159,7 +159,7 @@ func TestSupervisor(t *testing.T) {
{
Desc: "failure before appending CommandHandled record to the journal",
InduceFailure: func(deps *dependencies) {
test.XXX_FailBeforeJournalAppend(
test.FailBeforeJournalAppend(
deps.Journals,
JournalName(deps.Supervisor.HandlerIdentity.Key),
func(r *journalpb.Record) bool {
Expand All @@ -172,7 +172,7 @@ func TestSupervisor(t *testing.T) {
{
Desc: "failure after appending CommandHandled record to the journal",
InduceFailure: func(deps *dependencies) {
test.XXX_FailAfterJournalAppend(
test.FailAfterJournalAppend(
deps.Journals,
JournalName(deps.Supervisor.HandlerIdentity.Key),
func(r *journalpb.Record) bool {
Expand Down Expand Up @@ -228,7 +228,7 @@ func TestSupervisor(t *testing.T) {
Desc: "failure before appending EventsAppendedToStream record to the journal",
ExpectMultipleEventAppendRequests: true,
InduceFailure: func(deps *dependencies) {
test.XXX_FailBeforeJournalAppend(
test.FailBeforeJournalAppend(
deps.Journals,
JournalName(deps.Supervisor.HandlerIdentity.Key),
func(r *journalpb.Record) bool {
Expand All @@ -241,7 +241,7 @@ func TestSupervisor(t *testing.T) {
{
Desc: "failure after appending EventsAppendedToStream record to the journal",
InduceFailure: func(deps *dependencies) {
test.XXX_FailAfterJournalAppend(
test.FailAfterJournalAppend(
deps.Journals,
JournalName(deps.Supervisor.HandlerIdentity.Key),
func(r *journalpb.Record) bool {
Expand Down
59 changes: 0 additions & 59 deletions internal/test/journal.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,7 @@
package test

import (
"reflect"

"github.com/dogmatiq/persistencekit/driver/memory/memoryjournal"
"google.golang.org/protobuf/proto"
)

// FailOnJournalOpen configures the journal with the given name to return an
Expand Down Expand Up @@ -70,59 +67,3 @@ func failAppendOnce[T any](
return nil
}
}

// XXX_FailBeforeJournalAppend configures the journal with the given name to return
// an error on the next call to Append() with a record that satisifies the given
// predicate function.
//
// The error is returned before the append is actually performed.
func XXX_FailBeforeJournalAppend[R proto.Message](
s *memoryjournal.BinaryStore,
name string,
pred func(R) bool,
err error,
) {
s.BeforeAppend = XXX_failAppendOnce(name, pred, err)
}

// XXX_FailAfterJournalAppend configures the journal with the given name to return
// an error on the next call to Append() with a record that satisifies the given
// predicate function.
//
// The error is returned after the append is actually performed.
func XXX_FailAfterJournalAppend[R proto.Message](
s *memoryjournal.BinaryStore,
name string,
pred func(R) bool,
err error,
) {
s.AfterAppend = XXX_failAppendOnce(name, pred, err)
}

func XXX_failAppendOnce[R proto.Message](
name string,
pred func(R) bool,
err error,
) func(string, []byte) error {
fail := FailOnce(err)

return func(n string, data []byte) error {
if n != name {
return nil
}

var rec R
rec = reflect.New(
reflect.TypeOf(rec).Elem(),
).Interface().(R)

if err := proto.Unmarshal(data, rec); err != nil {
panic(err)
}

if pred(rec) {
return fail()
}
return nil
}
}

0 comments on commit bb14828

Please sign in to comment.