Skip to content

Commit

Permalink
Update to persistencekit v0.8.0, remove internal journal/kv protobuf …
Browse files Browse the repository at this point in the history
…utilities.
  • Loading branch information
jmalloc committed Mar 17, 2024
1 parent bb14828 commit dec335c
Show file tree
Hide file tree
Showing 29 changed files with 130 additions and 571 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ require (
github.com/dogmatiq/example v0.0.0-20230606031437-2bd84c72050b
github.com/dogmatiq/ferrite v1.3.0
github.com/dogmatiq/marshalkit v0.7.3
github.com/dogmatiq/persistencekit v0.7.1-0.20240317044652-8e6daf523e36
github.com/dogmatiq/persistencekit v0.8.0
github.com/dogmatiq/primo v0.2.0
github.com/dogmatiq/spruce v0.1.0
github.com/google/go-cmp v0.6.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ github.com/dogmatiq/linger v1.1.0 h1:kGL9sL79qRa6Cr8PhadeJ/ptbum+b48pAaNWWlyVVKg
github.com/dogmatiq/linger v1.1.0/go.mod h1:OOWJUwTxNkFolhuVdaTYjO4FmFLjZHZ8EMc5H5qOJ7Q=
github.com/dogmatiq/marshalkit v0.7.3 h1:kBymR5txcHFBJcYvzld6kFWshqL9YqfBfnFzl9KwEaI=
github.com/dogmatiq/marshalkit v0.7.3/go.mod h1:gGiQXt9aHidlR1GIgHlZxJU8QAd004kFxU6beq+MPmI=
github.com/dogmatiq/persistencekit v0.7.1-0.20240317044652-8e6daf523e36 h1:ARp+NI7z3iNTd6f/eujKh65eCDe7eXJzP94RH0Pl0jM=
github.com/dogmatiq/persistencekit v0.7.1-0.20240317044652-8e6daf523e36/go.mod h1:ZzkP9/iVL3nvAo5MWUlCRpA8WQEIe5Dq4nycKzJj5GM=
github.com/dogmatiq/persistencekit v0.8.0 h1:Xvam51Kl2/wLU78CU5u3ClYFPPVdL5aLRznHmcR8RBI=
github.com/dogmatiq/persistencekit v0.8.0/go.mod h1:ZzkP9/iVL3nvAo5MWUlCRpA8WQEIe5Dq4nycKzJj5GM=
github.com/dogmatiq/primo v0.2.0 h1:XSgal1oykHCFtHvHXdsaSDvQ2x/V/h+clDS1YIqtwHM=
github.com/dogmatiq/primo v0.2.0/go.mod h1:c1EGDvqJQSaIlTxpT1jPgXMBhOXLnQ3jtKPRH5nuUis=
github.com/dogmatiq/projectionkit v0.6.5 h1:3Ues+QL5oVtJcx4WogMA6XjJF1QyOlcx1uRmUrl2ghI=
Expand Down
20 changes: 20 additions & 0 deletions internal/cluster/persistence.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package cluster

import (
"github.com/dogmatiq/enginekit/protobuf/uuidpb"
"github.com/dogmatiq/persistencekit/kv"
"github.com/dogmatiq/persistencekit/marshaler"
"github.com/dogmatiq/veracity/internal/cluster/internal/registrypb"
)

// newKVStore returns a new [kv.Store] that uses s for its underlying storage.
func newKVStore(s kv.BinaryStore) kv.Store[*uuidpb.UUID, *registrypb.Registration] {
return kv.NewMarshalingStore(
s,
marshaler.NewProto[*uuidpb.UUID](),
marshaler.NewProto[*registrypb.Registration](),
)
}

// registryKeyspace is the name of the keyspace that contains registry data.
const registryKeyspace = "cluster.registry"
41 changes: 13 additions & 28 deletions internal/cluster/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
"github.com/dogmatiq/persistencekit/kv"
"github.com/dogmatiq/veracity/internal/cluster/internal/registrypb"
"github.com/dogmatiq/veracity/internal/fsm"
"github.com/dogmatiq/veracity/internal/protobuf/protokv"
"github.com/dogmatiq/veracity/internal/signaling"
"google.golang.org/protobuf/types/known/timestamppb"
)
Expand All @@ -23,9 +22,6 @@ const (
// The registration period is always set to 2 * DefaultRenewInterval.
DefaultRenewInterval = 10 * time.Second

// RegistryKeyspace is the name of the keyspace that contains registry data.
RegistryKeyspace = "cluster.registry"

// DefaultRegistryPollInterval is the default interval at which the registry
// polls the underlying key-value store for changes.
DefaultRegistryPollInterval = 3 * time.Second
Expand All @@ -40,14 +36,14 @@ type Registrar struct {
Shutdown signaling.Latch
Logger *slog.Logger

keyspace kv.BinaryKeyspace
keyspace kv.Keyspace[*uuidpb.UUID, *registrypb.Registration]
interval time.Duration
}

// Run starts the registrar.
func (r *Registrar) Run(ctx context.Context) error {
var err error
r.keyspace, err = r.Keyspaces.Open(ctx, RegistryKeyspace)
r.keyspace, err = newKVStore(r.Keyspaces).Open(ctx, registryKeyspace)
if err != nil {
return err
}
Expand Down Expand Up @@ -113,15 +109,11 @@ func (r *Registrar) deregister(ctx context.Context) error {

// renew updates a node's registration expiry time.
func (r *Registrar) renew(ctx context.Context) error {
reg, ok, err := protokv.Get[*registrypb.Registration](
ctx,
r.keyspace,
r.Node.ID.AsBytes(),
)
reg, err := r.keyspace.Get(ctx, r.Node.ID)
if err != nil {
return err
}
if !ok {
if reg == nil {
return errors.New("cluster node not registered")
}

Expand Down Expand Up @@ -153,10 +145,10 @@ func (r *Registrar) saveRegistration(
ctx context.Context,
) (time.Time, error) {
expiresAt := time.Now().Add(r.interval * 2)
err := protokv.Set(

return expiresAt, r.keyspace.Set(
ctx,
r.keyspace,
r.Node.ID.AsBytes(),
r.Node.ID,
&registrypb.Registration{
Node: &registrypb.Node{
Id: r.Node.ID,
Expand All @@ -165,19 +157,13 @@ func (r *Registrar) saveRegistration(
ExpiresAt: timestamppb.New(expiresAt),
},
)

return expiresAt, err
}

// deleteRegistration removes a registration from the registry.
func (r *Registrar) deleteRegistration(
ctx context.Context,
) error {
return r.keyspace.Set(
ctx,
r.Node.ID.AsBytes(),
nil,
)
return r.keyspace.Set(ctx, r.Node.ID, nil)
}

// RegistryObserver emits events about changes to the nodes in the registry.
Expand All @@ -187,15 +173,15 @@ type RegistryObserver struct {
Shutdown signaling.Latch
PollInterval time.Duration

keyspace kv.BinaryKeyspace
keyspace kv.Keyspace[*uuidpb.UUID, *registrypb.Registration]
nodes uuidpb.Map[Node]
readyForPoll *time.Ticker
}

// Run starts the observer.
func (o *RegistryObserver) Run(ctx context.Context) error {
var err error
o.keyspace, err = o.Keyspaces.Open(ctx, RegistryKeyspace)
o.keyspace, err = newKVStore(o.Keyspaces).Open(ctx, registryKeyspace)
if err != nil {
return err
}
Expand Down Expand Up @@ -279,12 +265,11 @@ func (o *RegistryObserver) publishState(ctx context.Context, ev MembershipChange
func (o *RegistryObserver) loadNodes(ctx context.Context) (uuidpb.Map[Node], error) {
nodes := uuidpb.Map[Node]{}

return nodes, protokv.Range(
return nodes, o.keyspace.Range(
ctx,
o.keyspace,
func(
ctx context.Context,
k []byte,
id *uuidpb.UUID,
reg *registrypb.Registration,
) (bool, error) {
if reg.ExpiresAt.AsTime().After(time.Now()) {
Expand All @@ -295,7 +280,7 @@ func (o *RegistryObserver) loadNodes(ctx context.Context) (uuidpb.Map[Node], err
Addresses: reg.Node.Addresses,
},
)
} else if err := o.keyspace.Set(ctx, k, nil); err != nil {
} else if err := o.keyspace.Set(ctx, id, nil); err != nil {
return false, err
}

Expand Down
2 changes: 1 addition & 1 deletion internal/engineconfig/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func (c applicationVisitor) VisitRichIntegration(_ context.Context, cfg configki
sup := &integration.Supervisor{
Handler: cfg.Handler(),
HandlerIdentity: marshalIdentity(cfg.Identity()),
Journals: integration.NewJournalStore(c.Persistence.Journals),
Journals: c.Persistence.Journals,
Packer: c.packer,
}

Expand Down
4 changes: 2 additions & 2 deletions internal/envelope/transcoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import (

"github.com/dogmatiq/enginekit/protobuf/envelopepb"
"github.com/dogmatiq/marshalkit"
"github.com/dogmatiq/veracity/internal/protobuf/typedproto"
"google.golang.org/protobuf/proto"
)

// Transcoder re-encodes messages to different media-types on the fly.
Expand Down Expand Up @@ -47,7 +47,7 @@ func (t *Transcoder) Transcode(env *envelopepb.Envelope) (*envelopepb.Envelope,
return nil, ok, err
}

env = typedproto.Clone(env)
env = proto.Clone(env).(*envelopepb.Envelope)
env.MediaType = packet.MediaType
env.Data = packet.Data

Expand Down
6 changes: 3 additions & 3 deletions internal/eventstream/append_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,14 @@ func TestAppend(t *testing.T) {
t.Parallel()

type dependencies struct {
Journals *memoryjournal.Store[*journalpb.Record]
Journals *memoryjournal.BinaryStore
Supervisor *Supervisor
Events <-chan Event
Packer *envelope.Packer
}

setup := func(t test.TestingT) (deps dependencies) {
deps.Journals = &memoryjournal.Store[*journalpb.Record]{}
deps.Journals = &memoryjournal.BinaryStore{}

events := make(chan Event, 100)

Expand Down Expand Up @@ -168,7 +168,7 @@ func TestAppend(t *testing.T) {

t.Log("ensure that the event was appended to the stream exactly once")

j, err := deps.Journals.Open(tctx, JournalName(streamID))
j, err := NewJournalStore(deps.Journals).Open(tctx, JournalName(streamID))
if err != nil {
t.Fatal(err)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,29 +5,22 @@ import (

"github.com/dogmatiq/enginekit/protobuf/uuidpb"
"github.com/dogmatiq/persistencekit/journal"
"github.com/dogmatiq/persistencekit/marshal"
"github.com/dogmatiq/persistencekit/marshaler"
"github.com/dogmatiq/veracity/internal/eventstream/internal/journalpb"
)

// JournalStore is the [journal.Store] type used to store event stream state.
// Each event stream has its own journal.
type JournalStore = journal.Store[*journalpb.Record]

// Journal is a journal of event stream records for a single event stream.
type Journal = journal.Journal[*journalpb.Record]

// NewJournalStore returns a new [JournalStore] that uses s for its underlying
// newJournalStore returns a new [journal.Store] that uses s for its underlying
// storage.
func NewJournalStore(s journal.BinaryStore) JournalStore {
func newJournalStore(s journal.BinaryStore) journal.Store[*journalpb.Record] {
return journal.NewMarshalingStore(
s,
marshal.ProtocolBuffers[*journalpb.Record]{},
marshaler.NewProto[*journalpb.Record](),
)
}

// JournalName returns the name of the journal that contains the state
// journalName returns the name of the journal that contains the state
// of the event stream with the given ID.
func JournalName(streamID *uuidpb.UUID) string {
func journalName(streamID *uuidpb.UUID) string {
return "eventstream:" + streamID.AsString()
}

Expand Down
6 changes: 6 additions & 0 deletions internal/eventstream/persistence_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package eventstream

var (
JournalName = journalName
NewJournalStore = newJournalStore
)
11 changes: 9 additions & 2 deletions internal/eventstream/supervisor.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (
"log/slog"

"github.com/dogmatiq/enginekit/protobuf/uuidpb"
"github.com/dogmatiq/persistencekit/journal"
"github.com/dogmatiq/veracity/internal/eventstream/internal/journalpb"
"github.com/dogmatiq/veracity/internal/fsm"
"github.com/dogmatiq/veracity/internal/messaging"
"github.com/dogmatiq/veracity/internal/signaling"
Expand All @@ -17,11 +19,12 @@ var errShuttingDown = errors.New("event stream sub-system is shutting down")

// A Supervisor coordinates event stream workers.
type Supervisor struct {
Journals JournalStore
Journals journal.BinaryStore
AppendQueue messaging.ExchangeQueue[AppendRequest, AppendResponse]
Events chan<- Event
Logger *slog.Logger

journals journal.Store[*journalpb.Record]
shutdown signaling.Latch
workers uuidpb.Map[*worker]
workerStopped chan workerResult
Expand Down Expand Up @@ -135,7 +138,11 @@ func (s *Supervisor) startWorkerForStreamID(
ctx context.Context,
streamID *uuidpb.UUID,
) (*worker, error) {
j, err := s.Journals.Open(ctx, JournalName(streamID))
if s.journals == nil {
s.journals = newJournalStore(s.Journals)
}

j, err := s.journals.Open(ctx, journalName(streamID))
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion internal/eventstream/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ const defaultIdleTimeout = 5 * time.Minute
// A worker manages the state of an event stream.
type worker struct {
// Journal stores the event stream's state.
Journal Journal
Journal journal.Journal[*journalpb.Record]

// AppendQueue is a queue of requests to append events to the stream.
AppendQueue messaging.ExchangeQueue[AppendRequest, AppendResponse]
Expand Down
29 changes: 0 additions & 29 deletions internal/integration/journal.go

This file was deleted.

17 changes: 17 additions & 0 deletions internal/integration/persistence.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package integration

import (
"github.com/dogmatiq/enginekit/protobuf/uuidpb"
)

// journalName returns the name of the journal that contains the state
// of the integration handler with the given key.
func journalName(key *uuidpb.UUID) string {
return "integration:" + key.AsString()
}

// handledCommandsKeyspaceName returns the name of the keyspace that contains
// the set of handled command IDs.
func handledCommandsKeyspaceName(key *uuidpb.UUID) string {
return "integration:" + key.AsString() + ":handled-commands"
}
6 changes: 6 additions & 0 deletions internal/integration/persistence_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package integration

var (
JournalName = journalName
HandledCommandsKeyspaceName = handledCommandsKeyspaceName
)
Loading

0 comments on commit dec335c

Please sign in to comment.