diff --git a/go.mod b/go.mod index 89d06cec..3775ccc7 100644 --- a/go.mod +++ b/go.mod @@ -11,7 +11,7 @@ require ( github.com/dogmatiq/example v0.0.0-20230606031437-2bd84c72050b github.com/dogmatiq/ferrite v1.3.0 github.com/dogmatiq/marshalkit v0.7.3 - github.com/dogmatiq/persistencekit v0.7.1-0.20240317044652-8e6daf523e36 + github.com/dogmatiq/persistencekit v0.8.0 github.com/dogmatiq/primo v0.2.0 github.com/dogmatiq/spruce v0.1.0 github.com/google/go-cmp v0.6.0 diff --git a/go.sum b/go.sum index c02027f9..10af4c61 100644 --- a/go.sum +++ b/go.sum @@ -28,8 +28,8 @@ github.com/dogmatiq/linger v1.1.0 h1:kGL9sL79qRa6Cr8PhadeJ/ptbum+b48pAaNWWlyVVKg github.com/dogmatiq/linger v1.1.0/go.mod h1:OOWJUwTxNkFolhuVdaTYjO4FmFLjZHZ8EMc5H5qOJ7Q= github.com/dogmatiq/marshalkit v0.7.3 h1:kBymR5txcHFBJcYvzld6kFWshqL9YqfBfnFzl9KwEaI= github.com/dogmatiq/marshalkit v0.7.3/go.mod h1:gGiQXt9aHidlR1GIgHlZxJU8QAd004kFxU6beq+MPmI= -github.com/dogmatiq/persistencekit v0.7.1-0.20240317044652-8e6daf523e36 h1:ARp+NI7z3iNTd6f/eujKh65eCDe7eXJzP94RH0Pl0jM= -github.com/dogmatiq/persistencekit v0.7.1-0.20240317044652-8e6daf523e36/go.mod h1:ZzkP9/iVL3nvAo5MWUlCRpA8WQEIe5Dq4nycKzJj5GM= +github.com/dogmatiq/persistencekit v0.8.0 h1:Xvam51Kl2/wLU78CU5u3ClYFPPVdL5aLRznHmcR8RBI= +github.com/dogmatiq/persistencekit v0.8.0/go.mod h1:ZzkP9/iVL3nvAo5MWUlCRpA8WQEIe5Dq4nycKzJj5GM= github.com/dogmatiq/primo v0.2.0 h1:XSgal1oykHCFtHvHXdsaSDvQ2x/V/h+clDS1YIqtwHM= github.com/dogmatiq/primo v0.2.0/go.mod h1:c1EGDvqJQSaIlTxpT1jPgXMBhOXLnQ3jtKPRH5nuUis= github.com/dogmatiq/projectionkit v0.6.5 h1:3Ues+QL5oVtJcx4WogMA6XjJF1QyOlcx1uRmUrl2ghI= diff --git a/internal/cluster/persistence.go b/internal/cluster/persistence.go new file mode 100644 index 00000000..41ac259f --- /dev/null +++ b/internal/cluster/persistence.go @@ -0,0 +1,20 @@ +package cluster + +import ( + "github.com/dogmatiq/enginekit/protobuf/uuidpb" + "github.com/dogmatiq/persistencekit/kv" + "github.com/dogmatiq/persistencekit/marshaler" + "github.com/dogmatiq/veracity/internal/cluster/internal/registrypb" +) + +// newKVStore returns a new [kv.Store] that uses s for its underlying storage. +func newKVStore(s kv.BinaryStore) kv.Store[*uuidpb.UUID, *registrypb.Registration] { + return kv.NewMarshalingStore( + s, + marshaler.NewProto[*uuidpb.UUID](), + marshaler.NewProto[*registrypb.Registration](), + ) +} + +// registryKeyspace is the name of the keyspace that contains registry data. +const registryKeyspace = "cluster.registry" diff --git a/internal/cluster/registry.go b/internal/cluster/registry.go index 86965c1e..825935b0 100644 --- a/internal/cluster/registry.go +++ b/internal/cluster/registry.go @@ -11,7 +11,6 @@ import ( "github.com/dogmatiq/persistencekit/kv" "github.com/dogmatiq/veracity/internal/cluster/internal/registrypb" "github.com/dogmatiq/veracity/internal/fsm" - "github.com/dogmatiq/veracity/internal/protobuf/protokv" "github.com/dogmatiq/veracity/internal/signaling" "google.golang.org/protobuf/types/known/timestamppb" ) @@ -23,9 +22,6 @@ const ( // The registration period is always set to 2 * DefaultRenewInterval. DefaultRenewInterval = 10 * time.Second - // RegistryKeyspace is the name of the keyspace that contains registry data. - RegistryKeyspace = "cluster.registry" - // DefaultRegistryPollInterval is the default interval at which the registry // polls the underlying key-value store for changes. DefaultRegistryPollInterval = 3 * time.Second @@ -40,14 +36,14 @@ type Registrar struct { Shutdown signaling.Latch Logger *slog.Logger - keyspace kv.BinaryKeyspace + keyspace kv.Keyspace[*uuidpb.UUID, *registrypb.Registration] interval time.Duration } // Run starts the registrar. func (r *Registrar) Run(ctx context.Context) error { var err error - r.keyspace, err = r.Keyspaces.Open(ctx, RegistryKeyspace) + r.keyspace, err = newKVStore(r.Keyspaces).Open(ctx, registryKeyspace) if err != nil { return err } @@ -113,15 +109,11 @@ func (r *Registrar) deregister(ctx context.Context) error { // renew updates a node's registration expiry time. func (r *Registrar) renew(ctx context.Context) error { - reg, ok, err := protokv.Get[*registrypb.Registration]( - ctx, - r.keyspace, - r.Node.ID.AsBytes(), - ) + reg, err := r.keyspace.Get(ctx, r.Node.ID) if err != nil { return err } - if !ok { + if reg == nil { return errors.New("cluster node not registered") } @@ -153,10 +145,10 @@ func (r *Registrar) saveRegistration( ctx context.Context, ) (time.Time, error) { expiresAt := time.Now().Add(r.interval * 2) - err := protokv.Set( + + return expiresAt, r.keyspace.Set( ctx, - r.keyspace, - r.Node.ID.AsBytes(), + r.Node.ID, ®istrypb.Registration{ Node: ®istrypb.Node{ Id: r.Node.ID, @@ -165,19 +157,13 @@ func (r *Registrar) saveRegistration( ExpiresAt: timestamppb.New(expiresAt), }, ) - - return expiresAt, err } // deleteRegistration removes a registration from the registry. func (r *Registrar) deleteRegistration( ctx context.Context, ) error { - return r.keyspace.Set( - ctx, - r.Node.ID.AsBytes(), - nil, - ) + return r.keyspace.Set(ctx, r.Node.ID, nil) } // RegistryObserver emits events about changes to the nodes in the registry. @@ -187,7 +173,7 @@ type RegistryObserver struct { Shutdown signaling.Latch PollInterval time.Duration - keyspace kv.BinaryKeyspace + keyspace kv.Keyspace[*uuidpb.UUID, *registrypb.Registration] nodes uuidpb.Map[Node] readyForPoll *time.Ticker } @@ -195,7 +181,7 @@ type RegistryObserver struct { // Run starts the observer. func (o *RegistryObserver) Run(ctx context.Context) error { var err error - o.keyspace, err = o.Keyspaces.Open(ctx, RegistryKeyspace) + o.keyspace, err = newKVStore(o.Keyspaces).Open(ctx, registryKeyspace) if err != nil { return err } @@ -279,12 +265,11 @@ func (o *RegistryObserver) publishState(ctx context.Context, ev MembershipChange func (o *RegistryObserver) loadNodes(ctx context.Context) (uuidpb.Map[Node], error) { nodes := uuidpb.Map[Node]{} - return nodes, protokv.Range( + return nodes, o.keyspace.Range( ctx, - o.keyspace, func( ctx context.Context, - k []byte, + id *uuidpb.UUID, reg *registrypb.Registration, ) (bool, error) { if reg.ExpiresAt.AsTime().After(time.Now()) { @@ -295,7 +280,7 @@ func (o *RegistryObserver) loadNodes(ctx context.Context) (uuidpb.Map[Node], err Addresses: reg.Node.Addresses, }, ) - } else if err := o.keyspace.Set(ctx, k, nil); err != nil { + } else if err := o.keyspace.Set(ctx, id, nil); err != nil { return false, err } diff --git a/internal/engineconfig/app.go b/internal/engineconfig/app.go index 3fa10c6b..9c20950d 100644 --- a/internal/engineconfig/app.go +++ b/internal/engineconfig/app.go @@ -63,7 +63,7 @@ func (c applicationVisitor) VisitRichIntegration(_ context.Context, cfg configki sup := &integration.Supervisor{ Handler: cfg.Handler(), HandlerIdentity: marshalIdentity(cfg.Identity()), - Journals: integration.NewJournalStore(c.Persistence.Journals), + Journals: c.Persistence.Journals, Packer: c.packer, } diff --git a/internal/envelope/transcoder.go b/internal/envelope/transcoder.go index 87005d90..f1c3f90b 100644 --- a/internal/envelope/transcoder.go +++ b/internal/envelope/transcoder.go @@ -5,7 +5,7 @@ import ( "github.com/dogmatiq/enginekit/protobuf/envelopepb" "github.com/dogmatiq/marshalkit" - "github.com/dogmatiq/veracity/internal/protobuf/typedproto" + "google.golang.org/protobuf/proto" ) // Transcoder re-encodes messages to different media-types on the fly. @@ -47,7 +47,7 @@ func (t *Transcoder) Transcode(env *envelopepb.Envelope) (*envelopepb.Envelope, return nil, ok, err } - env = typedproto.Clone(env) + env = proto.Clone(env).(*envelopepb.Envelope) env.MediaType = packet.MediaType env.Data = packet.Data diff --git a/internal/eventstream/append_test.go b/internal/eventstream/append_test.go index 1baf81a8..6ee9c574 100644 --- a/internal/eventstream/append_test.go +++ b/internal/eventstream/append_test.go @@ -23,14 +23,14 @@ func TestAppend(t *testing.T) { t.Parallel() type dependencies struct { - Journals *memoryjournal.Store[*journalpb.Record] + Journals *memoryjournal.BinaryStore Supervisor *Supervisor Events <-chan Event Packer *envelope.Packer } setup := func(t test.TestingT) (deps dependencies) { - deps.Journals = &memoryjournal.Store[*journalpb.Record]{} + deps.Journals = &memoryjournal.BinaryStore{} events := make(chan Event, 100) @@ -168,7 +168,7 @@ func TestAppend(t *testing.T) { t.Log("ensure that the event was appended to the stream exactly once") - j, err := deps.Journals.Open(tctx, JournalName(streamID)) + j, err := NewJournalStore(deps.Journals).Open(tctx, JournalName(streamID)) if err != nil { t.Fatal(err) } diff --git a/internal/eventstream/journal.go b/internal/eventstream/persistence.go similarity index 56% rename from internal/eventstream/journal.go rename to internal/eventstream/persistence.go index d7c85e10..a1411390 100644 --- a/internal/eventstream/journal.go +++ b/internal/eventstream/persistence.go @@ -5,29 +5,22 @@ import ( "github.com/dogmatiq/enginekit/protobuf/uuidpb" "github.com/dogmatiq/persistencekit/journal" - "github.com/dogmatiq/persistencekit/marshal" + "github.com/dogmatiq/persistencekit/marshaler" "github.com/dogmatiq/veracity/internal/eventstream/internal/journalpb" ) -// JournalStore is the [journal.Store] type used to store event stream state. -// Each event stream has its own journal. -type JournalStore = journal.Store[*journalpb.Record] - -// Journal is a journal of event stream records for a single event stream. -type Journal = journal.Journal[*journalpb.Record] - -// NewJournalStore returns a new [JournalStore] that uses s for its underlying +// newJournalStore returns a new [journal.Store] that uses s for its underlying // storage. -func NewJournalStore(s journal.BinaryStore) JournalStore { +func newJournalStore(s journal.BinaryStore) journal.Store[*journalpb.Record] { return journal.NewMarshalingStore( s, - marshal.ProtocolBuffers[*journalpb.Record]{}, + marshaler.NewProto[*journalpb.Record](), ) } -// JournalName returns the name of the journal that contains the state +// journalName returns the name of the journal that contains the state // of the event stream with the given ID. -func JournalName(streamID *uuidpb.UUID) string { +func journalName(streamID *uuidpb.UUID) string { return "eventstream:" + streamID.AsString() } diff --git a/internal/eventstream/persistence_test.go b/internal/eventstream/persistence_test.go new file mode 100644 index 00000000..622a3278 --- /dev/null +++ b/internal/eventstream/persistence_test.go @@ -0,0 +1,6 @@ +package eventstream + +var ( + JournalName = journalName + NewJournalStore = newJournalStore +) diff --git a/internal/eventstream/supervisor.go b/internal/eventstream/supervisor.go index 2a2cf3e0..c7b16b63 100644 --- a/internal/eventstream/supervisor.go +++ b/internal/eventstream/supervisor.go @@ -6,6 +6,8 @@ import ( "log/slog" "github.com/dogmatiq/enginekit/protobuf/uuidpb" + "github.com/dogmatiq/persistencekit/journal" + "github.com/dogmatiq/veracity/internal/eventstream/internal/journalpb" "github.com/dogmatiq/veracity/internal/fsm" "github.com/dogmatiq/veracity/internal/messaging" "github.com/dogmatiq/veracity/internal/signaling" @@ -17,11 +19,12 @@ var errShuttingDown = errors.New("event stream sub-system is shutting down") // A Supervisor coordinates event stream workers. type Supervisor struct { - Journals JournalStore + Journals journal.BinaryStore AppendQueue messaging.ExchangeQueue[AppendRequest, AppendResponse] Events chan<- Event Logger *slog.Logger + journals journal.Store[*journalpb.Record] shutdown signaling.Latch workers uuidpb.Map[*worker] workerStopped chan workerResult @@ -135,7 +138,11 @@ func (s *Supervisor) startWorkerForStreamID( ctx context.Context, streamID *uuidpb.UUID, ) (*worker, error) { - j, err := s.Journals.Open(ctx, JournalName(streamID)) + if s.journals == nil { + s.journals = newJournalStore(s.Journals) + } + + j, err := s.journals.Open(ctx, journalName(streamID)) if err != nil { return nil, err } diff --git a/internal/eventstream/worker.go b/internal/eventstream/worker.go index 65b8315c..b5eb7816 100644 --- a/internal/eventstream/worker.go +++ b/internal/eventstream/worker.go @@ -17,7 +17,7 @@ const defaultIdleTimeout = 5 * time.Minute // A worker manages the state of an event stream. type worker struct { // Journal stores the event stream's state. - Journal Journal + Journal journal.Journal[*journalpb.Record] // AppendQueue is a queue of requests to append events to the stream. AppendQueue messaging.ExchangeQueue[AppendRequest, AppendResponse] diff --git a/internal/integration/journal.go b/internal/integration/journal.go deleted file mode 100644 index 467cf803..00000000 --- a/internal/integration/journal.go +++ /dev/null @@ -1,29 +0,0 @@ -package integration - -import ( - "github.com/dogmatiq/enginekit/protobuf/uuidpb" - "github.com/dogmatiq/persistencekit/journal" - "github.com/dogmatiq/persistencekit/marshal" - "github.com/dogmatiq/veracity/internal/integration/internal/journalpb" -) - -// JournalStore is the [journal.Store] type used to integration handler state. -type JournalStore = journal.Store[*journalpb.Record] - -// Journal is a journal of records for a specific integration handler. -type Journal = journal.Journal[*journalpb.Record] - -// NewJournalStore returns a new [JournalStore] that uses s for its underlying -// storage. -func NewJournalStore(s journal.BinaryStore) JournalStore { - return journal.NewMarshalingStore( - s, - marshal.ProtocolBuffers[*journalpb.Record]{}, - ) -} - -// JournalName returns the name of the journal that contains the state -// of the integration handler with the given key. -func JournalName(key *uuidpb.UUID) string { - return "integration:" + key.AsString() -} diff --git a/internal/integration/persistence.go b/internal/integration/persistence.go new file mode 100644 index 00000000..a3faa3a5 --- /dev/null +++ b/internal/integration/persistence.go @@ -0,0 +1,17 @@ +package integration + +import ( + "github.com/dogmatiq/enginekit/protobuf/uuidpb" +) + +// journalName returns the name of the journal that contains the state +// of the integration handler with the given key. +func journalName(key *uuidpb.UUID) string { + return "integration:" + key.AsString() +} + +// handledCommandsKeyspaceName returns the name of the keyspace that contains +// the set of handled command IDs. +func handledCommandsKeyspaceName(key *uuidpb.UUID) string { + return "integration:" + key.AsString() + ":handled-commands" +} diff --git a/internal/integration/persistence_test.go b/internal/integration/persistence_test.go new file mode 100644 index 00000000..60d801fe --- /dev/null +++ b/internal/integration/persistence_test.go @@ -0,0 +1,6 @@ +package integration + +var ( + JournalName = journalName + HandledCommandsKeyspaceName = handledCommandsKeyspaceName +) diff --git a/internal/integration/supervisor.go b/internal/integration/supervisor.go index 54a4197c..e11f7b4e 100644 --- a/internal/integration/supervisor.go +++ b/internal/integration/supervisor.go @@ -10,6 +10,7 @@ import ( "github.com/dogmatiq/enginekit/protobuf/uuidpb" "github.com/dogmatiq/persistencekit/journal" "github.com/dogmatiq/persistencekit/kv" + "github.com/dogmatiq/persistencekit/marshaler" "github.com/dogmatiq/veracity/internal/envelope" "github.com/dogmatiq/veracity/internal/eventstream" "github.com/dogmatiq/veracity/internal/fsm" @@ -37,16 +38,16 @@ type Supervisor struct { ExecuteQueue messaging.ExchangeQueue[ExecuteRequest, ExecuteResponse] Handler dogma.IntegrationMessageHandler HandlerIdentity *identitypb.Identity - Journals JournalStore + Journals journal.BinaryStore Keyspaces kv.BinaryStore Packer *envelope.Packer EventRecorder EventRecorder eventStreamID *uuidpb.UUID lowestPossibleEventOffset eventstream.Offset - journal Journal + journal journal.Journal[*journalpb.Record] pos journal.Position - handledCmds kv.BinaryKeyspace + handled kv.Keyspace[*uuidpb.UUID, bool] shutdown signaling.Latch } @@ -55,17 +56,28 @@ type Supervisor struct { func (s *Supervisor) Run(ctx context.Context) error { var err error - s.journal, err = s.Journals.Open(ctx, JournalName(s.HandlerIdentity.Key)) + journals := journal.NewMarshalingStore( + s.Journals, + marshaler.NewProto[*journalpb.Record](), + ) + + s.journal, err = journals.Open(ctx, journalName(s.HandlerIdentity.Key)) if err != nil { return err } defer s.journal.Close() - s.handledCmds, err = s.Keyspaces.Open(ctx, HandledCommandsKeyspaceName(s.HandlerIdentity.Key)) + keyspaces := kv.NewMarshalingStore( + s.Keyspaces, + marshaler.NewProto[*uuidpb.UUID](), + marshaler.Bool, + ) + + s.handled, err = keyspaces.Open(ctx, handledCommandsKeyspaceName(s.HandlerIdentity.Key)) if err != nil { return err } - defer s.journal.Close() + defer s.handled.Close() return fsm.Start(ctx, s.initState) } @@ -181,11 +193,7 @@ func (s *Supervisor) handleCommand(ctx context.Context, cmd *envelopepb.Envelope return err } - if err := s.handledCmds.Set( - ctx, - cmd.GetMessageId().AsBytes(), - []byte{1}, - ); err != nil { + if err := s.handled.Set(ctx, cmd.GetMessageId(), true); err != nil { return err } @@ -275,10 +283,7 @@ func (s *Supervisor) handleCommandState( ex messaging.Exchange[ExecuteRequest, ExecuteResponse], ) fsm.Action { if !ex.Request.IsFirstAttempt { - alreadyHandled, err := s.handledCmds.Has( - ctx, - ex.Request.Command.GetMessageId().AsBytes(), - ) + alreadyHandled, err := s.handled.Has(ctx, ex.Request.Command.GetMessageId()) if err != nil { ex.Err(err) return fsm.Fail(err) @@ -314,9 +319,3 @@ func (s *Supervisor) handleCommandState( return fsm.EnterState(s.idleState) } - -// HandledCommandsKeyspaceName returns the name of the keyspace that contains -// the set of handled command IDs. -func HandledCommandsKeyspaceName(key *uuidpb.UUID) string { - return "integration:" + key.AsString() + ":handled-commands" -} diff --git a/internal/integration/supervisor_test.go b/internal/integration/supervisor_test.go index 299914bc..faf425c1 100644 --- a/internal/integration/supervisor_test.go +++ b/internal/integration/supervisor_test.go @@ -30,7 +30,7 @@ import ( func TestSupervisor(t *testing.T) { type dependencies struct { Packer *envelope.Packer - Journals *memoryjournal.Store[*journalpb.Record] + Journals *memoryjournal.BinaryStore Keyspaces *memorykv.BinaryStore Handler *IntegrationMessageHandler EventRecorder *eventRecorderStub @@ -41,7 +41,7 @@ func TestSupervisor(t *testing.T) { setup := func(test.TestingT) (deps dependencies) { deps.Packer = newPacker() - deps.Journals = &memoryjournal.Store[*journalpb.Record]{} + deps.Journals = &memoryjournal.BinaryStore{} deps.Keyspaces = &memorykv.BinaryStore{} diff --git a/internal/protobuf/protojournal/append.go b/internal/protobuf/protojournal/append.go deleted file mode 100644 index 96155dbd..00000000 --- a/internal/protobuf/protojournal/append.go +++ /dev/null @@ -1,27 +0,0 @@ -package protojournal - -import ( - "context" - "fmt" - - "github.com/dogmatiq/persistencekit/journal" - "github.com/dogmatiq/veracity/internal/protobuf/typedproto" -) - -// Append adds a record to the journal. -func Append[ - Record typedproto.Message[Struct], - Struct typedproto.MessageStruct, -]( - ctx context.Context, - j journal.BinaryJournal, - end journal.Position, - rec Record, -) error { - data, err := typedproto.Marshal(rec) - if err != nil { - return fmt.Errorf("unable to marshal record: %w", err) - } - - return j.Append(ctx, end, data) -} diff --git a/internal/protobuf/protojournal/doc.go b/internal/protobuf/protojournal/doc.go deleted file mode 100644 index 06899207..00000000 --- a/internal/protobuf/protojournal/doc.go +++ /dev/null @@ -1,3 +0,0 @@ -// Package protojournal provides utilities for storing Protocol Buffers messages -// in journals. -package protojournal diff --git a/internal/protobuf/protojournal/get.go b/internal/protobuf/protojournal/get.go deleted file mode 100644 index f3641e55..00000000 --- a/internal/protobuf/protojournal/get.go +++ /dev/null @@ -1,59 +0,0 @@ -package protojournal - -import ( - "context" - "errors" - "fmt" - - "github.com/dogmatiq/persistencekit/journal" - "github.com/dogmatiq/veracity/internal/protobuf/typedproto" -) - -// Get returns the record at the given position. -func Get[ - Record typedproto.Message[Struct], - Struct typedproto.MessageStruct, -]( - ctx context.Context, - j journal.BinaryJournal, - pos journal.Position, -) (Record, error) { - data, err := j.Get(ctx, pos) - if err != nil { - return nil, err - } - - rec, err := typedproto.Unmarshal[Record](data) - if err != nil { - return nil, fmt.Errorf("unable to unmarshal record: %w", err) - } - - return rec, nil -} - -// GetLatest returns the most recent record in the journal. -func GetLatest[ - Record typedproto.Message[Struct], - Struct typedproto.MessageStruct, -]( - ctx context.Context, - j journal.BinaryJournal, -) (journal.Position, Record, bool, error) { - for { - begin, end, err := j.Bounds(ctx) - if begin == end || err != nil { - return 0, nil, false, err - } - - pos := end - 1 - rec, err := Get[Record](ctx, j, pos) - - if !errors.Is(err, journal.ErrNotFound) { - return pos, rec, true, err - } - - // We didn't find the record, assuming the journal is not corrupted, - // that means that it was truncated after the call to Bounds() but - // before the call to Get(), so we re-read the bounds and try again. - } -} diff --git a/internal/protobuf/protojournal/range.go b/internal/protobuf/protojournal/range.go deleted file mode 100644 index ef8ee70f..00000000 --- a/internal/protobuf/protojournal/range.go +++ /dev/null @@ -1,45 +0,0 @@ -package protojournal - -import ( - "context" - "fmt" - - "github.com/dogmatiq/persistencekit/journal" - "github.com/dogmatiq/veracity/internal/protobuf/typedproto" - "google.golang.org/protobuf/proto" -) - -// A RangeFunc is called by [Range] for each record in a [journal.Journal]. -type RangeFunc[Record proto.Message] func( - ctx context.Context, - pos journal.Position, - rec Record, -) (ok bool, err error) - -// Range invokes fn for each record in the journal, in order, beginning at the -// given position. -func Range[ - Record typedproto.Message[Struct], - Struct typedproto.MessageStruct, -]( - ctx context.Context, - j journal.BinaryJournal, - begin journal.Position, - fn RangeFunc[Record], -) error { - return j.Range( - ctx, - begin, - func( - ctx context.Context, - pos journal.Position, - data []byte, - ) (bool, error) { - rec, err := typedproto.Unmarshal[Record](data) - if err != nil { - return false, fmt.Errorf("unable to unmarshal record: %w", err) - } - return fn(ctx, pos, rec) - }, - ) -} diff --git a/internal/protobuf/protojournal/scan.go b/internal/protobuf/protojournal/scan.go deleted file mode 100644 index 5464f5ba..00000000 --- a/internal/protobuf/protojournal/scan.go +++ /dev/null @@ -1,67 +0,0 @@ -package protojournal - -import ( - "context" - - "github.com/dogmatiq/persistencekit/journal" - "github.com/dogmatiq/veracity/internal/protobuf/typedproto" - "google.golang.org/protobuf/proto" -) - -// ScanFunc is a predicate function that produces a value of type T from a -// record. -type ScanFunc[T any, Record proto.Message] func( - ctx context.Context, - pos journal.Position, - rec Record, -) (value T, ok bool, err error) - -// Scan finds a value within the journal by scanning all records -// beginning with the record at the given position. -func Scan[ - T any, - Record typedproto.Message[Struct], - Struct typedproto.MessageStruct, -]( - ctx context.Context, - j journal.BinaryJournal, - begin journal.Position, - scan ScanFunc[T, Record], -) (value T, ok bool, err error) { - err = Range( - ctx, - j, - begin, - func(ctx context.Context, pos journal.Position, rec Record) (bool, error) { - value, ok, err = scan(ctx, pos, rec) - return !ok, err - }, - ) - return value, ok, err -} - -// ScanFromSearchResult finds a value within the journal by scanning all records -// beginning with the record for which cmp() returns true. -func ScanFromSearchResult[ - T any, - Record typedproto.Message[Struct], - Struct typedproto.MessageStruct, -]( - ctx context.Context, - j journal.BinaryJournal, - begin, end journal.Position, - cmp CompareFunc[Record], - scan ScanFunc[T, Record], -) (value T, ok bool, err error) { - pos, rec, ok, err := Search(ctx, j, begin, end, cmp) - if !ok || err != nil { - return value, false, err - } - - value, ok, err = scan(ctx, pos, rec) - if ok || err != nil { - return value, ok, err - } - - return Scan(ctx, j, pos+1, scan) -} diff --git a/internal/protobuf/protojournal/search.go b/internal/protobuf/protojournal/search.go deleted file mode 100644 index 747e7b37..00000000 --- a/internal/protobuf/protojournal/search.go +++ /dev/null @@ -1,59 +0,0 @@ -package protojournal - -import ( - "context" - "errors" - - "github.com/dogmatiq/persistencekit/journal" - "github.com/dogmatiq/veracity/internal/protobuf/typedproto" - "google.golang.org/protobuf/proto" -) - -// CompareFunc is a function that compares a record to some datum. -// -// If the record is less than the datum, cmp is negative. If the record is -// greater than the datum, cmp is positive. Otherwise, the record is -// considered equal to the datum. -type CompareFunc[Record proto.Message] func( - ctx context.Context, - pos journal.Position, - rec Record, -) (cmp int, err error) - -// Search performs a binary search of the journal to find the position of -// the record for which cmp() returns zero. -func Search[ - Record typedproto.Message[Struct], - Struct typedproto.MessageStruct, -]( - ctx context.Context, - j journal.BinaryJournal, - begin, end journal.Position, - cmp CompareFunc[Record], -) (journal.Position, Record, bool, error) { - for begin < end { - pos := (begin >> 1) + (end >> 1) - - rec, err := Get[Record](ctx, j, pos) - if errors.Is(err, journal.ErrNotFound) { - break - } else if err != nil { - return 0, nil, false, err - } - - result, err := cmp(ctx, pos, rec) - if err != nil { - return 0, nil, false, err - } - - if result < 0 { - end = pos - } else if result > 0 { - begin = pos + 1 - } else { - return pos, rec, true, nil - } - } - - return 0, nil, false, nil -} diff --git a/internal/protobuf/protokv/doc.go b/internal/protobuf/protokv/doc.go deleted file mode 100644 index 1be1c3ae..00000000 --- a/internal/protobuf/protokv/doc.go +++ /dev/null @@ -1,3 +0,0 @@ -// Package protokv provides utilities for storing Protocol Buffers messages in -// key/value stores. -package protokv diff --git a/internal/protobuf/protokv/kv.go b/internal/protobuf/protokv/kv.go deleted file mode 100644 index dfe72884..00000000 --- a/internal/protobuf/protokv/kv.go +++ /dev/null @@ -1,74 +0,0 @@ -package protokv - -import ( - "context" - "fmt" - - "github.com/dogmatiq/persistencekit/kv" - "github.com/dogmatiq/veracity/internal/protobuf/typedproto" - "google.golang.org/protobuf/proto" -) - -// A RangeFunc is a function used to range over the key/value pairs in a -// [Keyspace]. -type RangeFunc[T proto.Message] func(ctx context.Context, k []byte, v T) (ok bool, err error) - -// Get returns the value associated with k. -func Get[ - T typedproto.Message[S], - S typedproto.MessageStruct, -]( - ctx context.Context, - ks kv.BinaryKeyspace, - k []byte, -) (T, bool, error) { - data, err := ks.Get(ctx, k) - if err != nil || len(data) == 0 { - return nil, false, err - } - - v, err := typedproto.Unmarshal[T](data) - if err != nil { - return nil, false, fmt.Errorf("unable to unmarshal value: %w", err) - } - - return v, true, nil -} - -// Set associates a value with k. -func Set[ - T typedproto.Message[S], - S typedproto.MessageStruct, -]( - ctx context.Context, - ks kv.BinaryKeyspace, - k []byte, - v T, -) error { - data, err := typedproto.Marshal(v) - if err != nil { - return fmt.Errorf("unable to marshal value: %w", err) - } - return ks.Set(ctx, k, data) -} - -// Range invokes fn for each key in ks. -func Range[ - T typedproto.Message[S], - S typedproto.MessageStruct, -]( - ctx context.Context, - ks kv.BinaryKeyspace, - fn RangeFunc[T], -) error { - return ks.Range( - ctx, - func(ctx context.Context, k, data []byte) (bool, error) { - v, err := typedproto.Unmarshal[T](data) - if err != nil { - return false, fmt.Errorf("unable to unmarshal value: %w", err) - } - return fn(ctx, k, v) - }, - ) -} diff --git a/internal/protobuf/prototest/doc.go b/internal/protobuf/prototest/doc.go deleted file mode 100644 index b5c37f20..00000000 --- a/internal/protobuf/prototest/doc.go +++ /dev/null @@ -1,2 +0,0 @@ -// Package grpctest contains utilities for testing gRPC services. -package prototest diff --git a/internal/protobuf/prototest/grpc.go b/internal/protobuf/prototest/grpc.go deleted file mode 100644 index 4e87539e..00000000 --- a/internal/protobuf/prototest/grpc.go +++ /dev/null @@ -1,55 +0,0 @@ -package prototest - -import ( - "net" - "testing" - - "google.golang.org/grpc" - "google.golang.org/grpc/credentials/insecure" -) - -// RunGRPCServer starts a gRPC server that listens on a random port and returns a -// client connection to it. -// -// register is called to register the server's gRPC services. -func RunGRPCServer( - t *testing.T, - reg func(grpc.ServiceRegistrar), -) grpc.ClientConnInterface { - server := grpc.NewServer() - reg(server) - - lis, err := net.Listen("tcp", ":0") - if err != nil { - t.Fatal(err) - } - t.Cleanup(func() { - lis.Close() - }) - - result := make(chan error, 1) - go func() { - result <- server.Serve(lis) - }() - t.Cleanup(func() { - server.Stop() - if err := <-result; err != nil { - t.Fatal(err) - } - }) - - conn, err := grpc.Dial( - lis.Addr().String(), - grpc.WithTransportCredentials( - insecure.NewCredentials(), - ), - ) - if err != nil { - t.Fatal(err) - } - t.Cleanup(func() { - conn.Close() - }) - - return conn -} diff --git a/internal/protobuf/typedproto/doc.go b/internal/protobuf/typedproto/doc.go deleted file mode 100644 index f5773195..00000000 --- a/internal/protobuf/typedproto/doc.go +++ /dev/null @@ -1,3 +0,0 @@ -// Package typedproto provides "type-safe" (generic) utilities for working with -// Protocol Buffers messages. -package typedproto diff --git a/internal/protobuf/typedproto/message.go b/internal/protobuf/typedproto/message.go deleted file mode 100644 index bd9ce634..00000000 --- a/internal/protobuf/typedproto/message.go +++ /dev/null @@ -1,59 +0,0 @@ -package typedproto - -import "google.golang.org/protobuf/proto" - -// MessageStruct is a "place-holder constraint" that indicates a type parameter -// should be a generated struct, a pointer to which implements implements -// [proto.Message]. -type MessageStruct interface{} - -// Message is a constraint for a [proto.Message] implemented by type *S. -// -// Even though Protocol Buffers generates structs that use pointer receivers, S -// is the "raw" struct type, not a pointer to it. -type Message[S MessageStruct] interface { - proto.Message - *S -} - -// New returns a new zero-valued message of type T. -func New[ - T Message[S], - S MessageStruct, -]() T { - var m S - return &m -} - -// Unmarshal unmarshals data into a new message of type T. -func Unmarshal[ - T Message[S], - S MessageStruct, -](data []byte) (T, error) { - m := New[T, S]() - - if err := proto.Unmarshal(data, m); err != nil { - return nil, err - } - - return m, nil -} - -// Marshal marshals m to its wire representation. -// -// This function is provided for symmetry with [Unmarshal], but is otherwise -// functionally an alias for [proto.Marshal]. -func Marshal[ - T Message[S], - S MessageStruct, -](m T) ([]byte, error) { - return proto.Marshal(m) -} - -// Clone returns a deep copy of m. -func Clone[ - T Message[S], - S MessageStruct, -](m T) T { - return proto.Clone(m).(T) -} diff --git a/internal/test/journal.go b/internal/test/journal.go index 432822df..b5eb3ba6 100644 --- a/internal/test/journal.go +++ b/internal/test/journal.go @@ -1,13 +1,16 @@ package test import ( + "reflect" + "github.com/dogmatiq/persistencekit/driver/memory/memoryjournal" + "google.golang.org/protobuf/proto" ) // FailOnJournalOpen configures the journal with the given name to return an // error on the next call to Open(). -func FailOnJournalOpen[T any]( - s *memoryjournal.Store[T], +func FailOnJournalOpen( + s *memoryjournal.BinaryStore, name string, err error, ) { @@ -25,8 +28,8 @@ func FailOnJournalOpen[T any]( // predicate function. // // The error is returned before the append is actually performed. -func FailBeforeJournalAppend[T any]( - s *memoryjournal.Store[T], +func FailBeforeJournalAppend[T proto.Message]( + s *memoryjournal.BinaryStore, name string, pred func(T) bool, err error, @@ -39,8 +42,8 @@ func FailBeforeJournalAppend[T any]( // predicate function. // // The error is returned after the append is actually performed. -func FailAfterJournalAppend[T any]( - s *memoryjournal.Store[T], +func FailAfterJournalAppend[T proto.Message]( + s *memoryjournal.BinaryStore, name string, pred func(T) bool, err error, @@ -48,22 +51,30 @@ func FailAfterJournalAppend[T any]( s.AfterAppend = failAppendOnce(name, pred, err) } -func failAppendOnce[T any]( +func failAppendOnce[T proto.Message]( name string, pred func(T) bool, err error, -) func(string, T) error { +) func(string, []byte) error { fail := FailOnce(err) - return func(n string, rec T) error { + return func(n string, data []byte) error { if n != name { return nil } + var rec T + rec = reflect.New( + reflect.TypeOf(rec).Elem(), + ).Interface().(T) + + if err := proto.Unmarshal(data, rec); err != nil { + panic(err) + } + if pred(rec) { return fail() } - return nil } }