diff --git a/go.mod b/go.mod index d27c803f..303799a6 100644 --- a/go.mod +++ b/go.mod @@ -9,10 +9,9 @@ require ( github.com/dogmatiq/configkit v0.14.0 github.com/dogmatiq/discoverkit v0.1.2 github.com/dogmatiq/dogma v0.14.3 - github.com/dogmatiq/enginekit v0.12.0 + github.com/dogmatiq/enginekit v0.12.1 github.com/dogmatiq/example v0.0.0-20240928215850-83b14743c287 github.com/dogmatiq/ferrite v1.4.0 - github.com/dogmatiq/marshalkit v0.9.0 github.com/dogmatiq/persistencekit v0.10.0 github.com/dogmatiq/primo v0.3.1 github.com/dogmatiq/spruce v0.2.2 @@ -38,6 +37,7 @@ require ( github.com/go-logr/stdr v1.2.2 // indirect github.com/google/uuid v1.6.0 // indirect github.com/mattn/go-runewidth v0.0.16 // indirect + github.com/onsi/ginkgo/v2 v2.20.2 // indirect github.com/rivo/uniseg v0.4.7 // indirect go.uber.org/multierr v1.11.0 // indirect golang.org/x/net v0.29.0 // indirect diff --git a/go.sum b/go.sum index 4c21ef0d..d7a05e8e 100644 --- a/go.sum +++ b/go.sum @@ -14,8 +14,8 @@ github.com/dogmatiq/dogma v0.14.3 h1:qwZqU1yqp80toUJcJBdFxLlh6xvlFd7jb7rycmriRUo github.com/dogmatiq/dogma v0.14.3/go.mod h1:9lyVA+6V2+E/exV0IrBOrkUiyFwIATEhv+b0vnB2umQ= github.com/dogmatiq/dyad v1.0.0 h1:T50WQ9rAzCMcT941wVH5RW5+tAGRssuVlNog+ZMQR54= github.com/dogmatiq/dyad v1.0.0/go.mod h1:PrUQr7OCSEUjAkhi7ad8BjxFPKe0I9yP2f1kGimPcJE= -github.com/dogmatiq/enginekit v0.12.0 h1:w16TGKVrvvfTejd6HJ2rhVQGZBW3TSK+o3heo7gTEog= -github.com/dogmatiq/enginekit v0.12.0/go.mod h1:o0xikkm3INnWBjIXvJ9LffjdYRhIG3jwFNDyhTk1hcI= +github.com/dogmatiq/enginekit v0.12.1 h1:EcW7LPWTer+notttjwkCCsjlywaRg8r/MWbfaKgZe1g= +github.com/dogmatiq/enginekit v0.12.1/go.mod h1:aMBlieoE3An8SVW3vxUcxOOAVaz5Pxc9BPY37Jdg9M0= github.com/dogmatiq/example v0.0.0-20240928215850-83b14743c287 h1:MS/wfudIjXb0vFiALKpS1IZmNMu9AoDzdghrjqB4vPI= github.com/dogmatiq/example v0.0.0-20240928215850-83b14743c287/go.mod h1:T/G0HRH6OSo5cIIH31dLBYZiFjsL8RQsaN2vv8XjoTw= github.com/dogmatiq/ferrite v1.4.0 h1:cXmorC8NdSieWhPBfgmnj4cmWIeaCQK2sflGa2H0xXY= @@ -28,8 +28,6 @@ github.com/dogmatiq/jumble v0.1.0 h1:Cb3ExfxY+AoUP4G9/sOwoOdYX8o+kOLK8+dhXAry+QA github.com/dogmatiq/jumble v0.1.0/go.mod h1:FCGV2ImXu8zvThxhd4QLstiEdu74vbIVw9bFJSBcKr4= github.com/dogmatiq/linger v1.1.0 h1:kGL9sL79qRa6Cr8PhadeJ/ptbum+b48pAaNWWlyVVKg= github.com/dogmatiq/linger v1.1.0/go.mod h1:OOWJUwTxNkFolhuVdaTYjO4FmFLjZHZ8EMc5H5qOJ7Q= -github.com/dogmatiq/marshalkit v0.9.0 h1:YjBYoxRCSfLyRc5SUIDoCX0DTZ/RRCUkiUKEM49iINQ= -github.com/dogmatiq/marshalkit v0.9.0/go.mod h1:dQ4yRVYT6FhAF+DOPwUh4wqcqmzbpGITluPeH1stpA0= github.com/dogmatiq/persistencekit v0.10.0 h1:i87Bv0riDAxDWrCqxyGIqzpymEHeL7WmBaKGXflihWc= github.com/dogmatiq/persistencekit v0.10.0/go.mod h1:F2M0TCqf/mkH5Nno1aILZfiR00I4LDQg/BE9s01qXfc= github.com/dogmatiq/primo v0.3.1 h1:JSqiCh1ma9CbIVzPf8k1vhzQ2Zn/d/WupzElDoiYZw0= @@ -88,8 +86,6 @@ github.com/jackc/pgtype v1.14.0 h1:y+xUdabmyMkJLyApYuPj38mW+aAIqCe5uuBB51rH3Vw= github.com/jackc/pgtype v1.14.0/go.mod h1:LUMuVrfsFfdKGLw+AFFVv6KtHOFMwRgDDzBt76IqCA4= github.com/jackc/pgx/v4 v4.18.3 h1:dE2/TrEsGX3RBprb3qryqSV9Y60iZN1C6i8IrmW9/BA= github.com/jackc/pgx/v4 v4.18.3/go.mod h1:Ey4Oru5tH5sB6tV7hDmfWFahwF15Eb7DNXlRKx2CkVw= -github.com/jmalloc/gomegax v0.0.0-20200507221434-64fca4c0e03a h1:Gk7Gkwl1KUJII/FiAjvBjRgEz/lpvTV8kNYp+9jdpuk= -github.com/jmalloc/gomegax v0.0.0-20200507221434-64fca4c0e03a/go.mod h1:TZpc8ObQEKqTuy1/VXpPRfcMU80QFDU4zK3nchXts/k= github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= diff --git a/internal/engineconfig/app.go b/internal/engineconfig/app.go index 9c20950d..2b79c75f 100644 --- a/internal/engineconfig/app.go +++ b/internal/engineconfig/app.go @@ -6,21 +6,20 @@ import ( "github.com/dogmatiq/configkit" "github.com/dogmatiq/dogma" + "github.com/dogmatiq/enginekit/marshaler" + "github.com/dogmatiq/enginekit/marshaler/codecs/json" + "github.com/dogmatiq/enginekit/marshaler/codecs/protobuf" + "github.com/dogmatiq/enginekit/protobuf/envelopepb" "github.com/dogmatiq/enginekit/protobuf/identitypb" "github.com/dogmatiq/enginekit/protobuf/uuidpb" - "github.com/dogmatiq/marshalkit" - "github.com/dogmatiq/marshalkit/codec" - "github.com/dogmatiq/marshalkit/codec/json" - "github.com/dogmatiq/marshalkit/codec/protobuf" - "github.com/dogmatiq/veracity/internal/envelope" "github.com/dogmatiq/veracity/internal/integration" ) type applicationVisitor struct { *Config - marshaler marshalkit.ValueMarshaler - packer *envelope.Packer + marshaler marshaler.Marshaler + packer *envelopepb.Packer } func (c applicationVisitor) VisitRichApplication(ctx context.Context, cfg configkit.RichApplication) error { @@ -29,9 +28,9 @@ func (c applicationVisitor) VisitRichApplication(ctx context.Context, cfg config types = append(types, t.ReflectType()) } - m, err := codec.NewMarshaler( + m, err := marshaler.New( types, - []codec.Codec{ + []marshaler.Codec{ protobuf.DefaultNativeCodec, json.DefaultCodec, }, @@ -42,7 +41,7 @@ func (c applicationVisitor) VisitRichApplication(ctx context.Context, cfg config c.marshaler = m - c.packer = &envelope.Packer{ + c.packer = &envelopepb.Packer{ Site: c.SiteID, Application: marshalIdentity(cfg.Identity()), Marshaler: c.marshaler, @@ -90,7 +89,7 @@ func (c applicationVisitor) VisitRichProjection(context.Context, configkit.RichP } func marshalIdentity(id configkit.Identity) *identitypb.Identity { - key, err := uuidpb.FromString(id.Key) + key, err := uuidpb.Parse(id.Key) if err != nil { panic(err) } diff --git a/internal/engineconfig/cluster.go b/internal/engineconfig/cluster.go index 267d1ca0..89cb588e 100644 --- a/internal/engineconfig/cluster.go +++ b/internal/engineconfig/cluster.go @@ -10,7 +10,7 @@ var nodeID = ferrite. WithConstraint( "must be a UUID", func(v string) bool { - id, err := uuidpb.FromString(v) + id, err := uuidpb.Parse(v) if err != nil { return false } @@ -27,11 +27,7 @@ func (c *Config) finalizeNodeID() { if c.UseEnv { if v, ok := nodeID.Value(); ok { - id, err := uuidpb.FromString(v) - if err != nil { - panic(err) - } - c.NodeID = id + c.NodeID = uuidpb.MustParse(v) return } } diff --git a/internal/envelope/doc.go b/internal/envelope/doc.go deleted file mode 100644 index 95f5fa61..00000000 --- a/internal/envelope/doc.go +++ /dev/null @@ -1,2 +0,0 @@ -// Package envelope contains tools for working with message envelopes. -package envelope diff --git a/internal/envelope/packer.go b/internal/envelope/packer.go deleted file mode 100644 index 7956247f..00000000 --- a/internal/envelope/packer.go +++ /dev/null @@ -1,155 +0,0 @@ -package envelope - -import ( - "fmt" - "time" - - "github.com/dogmatiq/dogma" - "github.com/dogmatiq/enginekit/protobuf/envelopepb" - "github.com/dogmatiq/enginekit/protobuf/identitypb" - "github.com/dogmatiq/enginekit/protobuf/uuidpb" - "github.com/dogmatiq/marshalkit" - "google.golang.org/protobuf/types/known/timestamppb" -) - -// A Packer puts messages into envelopes. -type Packer struct { - // Site is the (optional) identity of the site that the source application - // is running within. - // - // The site is used to disambiguate between messages from different - // installations of the same application. - Site *identitypb.Identity - - // Application is the identity of the application that is the source of the - // messages. - Application *identitypb.Identity - - // Marshaler is used to marshal messages into envelopes. - Marshaler marshalkit.ValueMarshaler - - // GenerateID is a function used to generate new message IDs. If it is nil, - // a UUID is generated. - GenerateID func() *uuidpb.UUID - - // Now is a function used to get the current time. If it is nil, time.Now() - // is used. - Now func() time.Time -} - -// Pack returns an envelope containing the given message. -func (p *Packer) Pack( - m dogma.Message, - options ...PackOption, -) *envelopepb.Envelope { - packet := marshalkit.MustMarshal(p.Marshaler, m) - _, name, err := packet.ParseMediaType() - if err != nil { - // CODE COVERAGE: This branch would require the marshaler to violate its - // own requirements on the format of the media-type. - panic(err) - } - - id := p.generateID() - - env := &envelopepb.Envelope{ - MessageId: id, - CorrelationId: id, - CausationId: id, - SourceSite: p.Site, - SourceApplication: p.Application, - CreatedAt: p.now(), - Description: m.MessageDescription(), - PortableName: name, - MediaType: packet.MediaType, - Data: packet.Data, - } - - for _, opt := range options { - opt(env) - } - - if err := env.Validate(); err != nil { - panic(err) - } - - return env -} - -// Unpack returns the message contained within an envelope. -func (p *Packer) Unpack(env *envelopepb.Envelope) (dogma.Message, error) { - packet := marshalkit.Packet{ - MediaType: env.MediaType, - Data: env.Data, - } - - m, err := p.Marshaler.Unmarshal(packet) - if err != nil { - return nil, err - } - - if m, ok := m.(dogma.Message); ok { - return m, nil - } - - return nil, fmt.Errorf("'%T' is not a dogma message", m) -} - -// now returns the current time. -func (p *Packer) now() *timestamppb.Timestamp { - if p.Now == nil { - return timestamppb.Now() - } - - return timestamppb.New(p.Now()) -} - -// generateID generates a new message ID. -func (p *Packer) generateID() *uuidpb.UUID { - if p.GenerateID != nil { - return p.GenerateID() - } - - return uuidpb.Generate() -} - -// PackOption is an option that alters the behavior of a Pack operation. -type PackOption func(*envelopepb.Envelope) - -// WithCause sets env as the "cause" of the message being packed. -func WithCause(env *envelopepb.Envelope) PackOption { - return func(e *envelopepb.Envelope) { - e.CausationId = env.MessageId - e.CorrelationId = env.CorrelationId - } -} - -// WithHandler sets h as the identity of the handler that is the source of the -// message. -func WithHandler(h *identitypb.Identity) PackOption { - return func(e *envelopepb.Envelope) { - e.SourceHandler = h - } -} - -// WithInstanceID sets the aggregate or process instance ID that is the -// source of the message. -func WithInstanceID(id string) PackOption { - return func(e *envelopepb.Envelope) { - e.SourceInstanceId = id - } -} - -// WithCreatedAt sets the creation time of a message. -func WithCreatedAt(t time.Time) PackOption { - return func(e *envelopepb.Envelope) { - e.CreatedAt = timestamppb.New(t) - } -} - -// WithScheduledFor sets the scheduled time of a timeout message. -func WithScheduledFor(t time.Time) PackOption { - return func(e *envelopepb.Envelope) { - e.ScheduledFor = timestamppb.New(t) - } -} diff --git a/internal/envelope/transcoder.go b/internal/envelope/transcoder.go deleted file mode 100644 index f1c3f90b..00000000 --- a/internal/envelope/transcoder.go +++ /dev/null @@ -1,55 +0,0 @@ -package envelope - -import ( - "strings" - - "github.com/dogmatiq/enginekit/protobuf/envelopepb" - "github.com/dogmatiq/marshalkit" - "google.golang.org/protobuf/proto" -) - -// Transcoder re-encodes messages to different media-types on the fly. -type Transcoder struct { - // MediaTypes is a map of the message's "portable name" to a list of - // supported media-types, in order of preference. - MediaTypes map[string][]string - - // Marshaler is the marshaler to use to unmarshal and marshal messages. - Marshaler marshalkit.Marshaler -} - -// Transcode re-encodes the message in env to one of the supported media-types. -func (t *Transcoder) Transcode(env *envelopepb.Envelope) (*envelopepb.Envelope, bool, error) { - candidates := t.MediaTypes[env.PortableName] - - // If the existing encoding is supported by the consumer use the envelope - // without any re-encoding. - for _, candidate := range candidates { - if strings.EqualFold(env.MediaType, candidate) { - return env, true, nil - } - } - - packet := marshalkit.Packet{ - MediaType: env.MediaType, - Data: env.Data, - } - - m, err := t.Marshaler.Unmarshal(packet) - if err != nil { - return nil, false, err - } - - // Otherwise, attempt to marshal the message using the client's requested - // media-types in order of preference. - packet, ok, err := t.Marshaler.MarshalAs(m, candidates) - if !ok || err != nil { - return nil, ok, err - } - - env = proto.Clone(env).(*envelopepb.Envelope) - env.MediaType = packet.MediaType - env.Data = packet.Data - - return env, true, nil -} diff --git a/internal/eventstream/append_test.go b/internal/eventstream/append_test.go index 10881974..c1f9517a 100644 --- a/internal/eventstream/append_test.go +++ b/internal/eventstream/append_test.go @@ -9,11 +9,9 @@ import ( "github.com/dogmatiq/enginekit/protobuf/envelopepb" "github.com/dogmatiq/enginekit/protobuf/identitypb" "github.com/dogmatiq/enginekit/protobuf/uuidpb" - . "github.com/dogmatiq/marshalkit/fixtures" "github.com/dogmatiq/persistencekit/driver/memory/memoryjournal" "github.com/dogmatiq/persistencekit/journal" "github.com/dogmatiq/spruce" - "github.com/dogmatiq/veracity/internal/envelope" . "github.com/dogmatiq/veracity/internal/eventstream" "github.com/dogmatiq/veracity/internal/eventstream/internal/eventstreamjournal" "github.com/dogmatiq/veracity/internal/test" @@ -25,7 +23,7 @@ func TestAppend(t *testing.T) { type dependencies struct { Journals *memoryjournal.BinaryStore Supervisor *Supervisor - Packer *envelope.Packer + Packer *envelopepb.Packer } setup := func(t test.TestingT) (deps dependencies) { @@ -36,7 +34,7 @@ func TestAppend(t *testing.T) { Logger: spruce.NewTestLogger(t), } - deps.Packer = &envelope.Packer{ + deps.Packer = &envelopepb.Packer{ Application: identitypb.New("", uuidpb.Generate()), Marshaler: Marshaler, } diff --git a/internal/integration/executor.go b/internal/integration/executor.go index afef29b4..e46eceeb 100644 --- a/internal/integration/executor.go +++ b/internal/integration/executor.go @@ -4,7 +4,7 @@ import ( "context" "github.com/dogmatiq/dogma" - "github.com/dogmatiq/veracity/internal/envelope" + "github.com/dogmatiq/enginekit/protobuf/envelopepb" "github.com/dogmatiq/veracity/internal/messaging" ) @@ -12,7 +12,7 @@ import ( // dispatches the command to an exchange queue. type CommandExecutor struct { ExecuteQueue *messaging.ExchangeQueue[ExecuteRequest, ExecuteResponse] - Packer *envelope.Packer + Packer *envelopepb.Packer } // ExecuteCommand enqueues a command. diff --git a/internal/integration/packer_test.go b/internal/integration/packer_test.go index 46b4bfe6..0a054972 100644 --- a/internal/integration/packer_test.go +++ b/internal/integration/packer_test.go @@ -4,15 +4,15 @@ import ( "sync/atomic" "time" + . "github.com/dogmatiq/enginekit/enginetest/stubs" + "github.com/dogmatiq/enginekit/protobuf/envelopepb" "github.com/dogmatiq/enginekit/protobuf/identitypb" "github.com/dogmatiq/enginekit/protobuf/uuidpb" - . "github.com/dogmatiq/marshalkit/fixtures" - "github.com/dogmatiq/veracity/internal/envelope" ) -func newPacker() *envelope.Packer { +func newPacker() *envelopepb.Packer { var counter atomic.Uint64 - return &envelope.Packer{ + return &envelopepb.Packer{ Application: identitypb.New("", uuidpb.Generate()), Marshaler: Marshaler, Now: func() time.Time { diff --git a/internal/integration/supervisor.go b/internal/integration/supervisor.go index f650b6f4..e8852c2b 100644 --- a/internal/integration/supervisor.go +++ b/internal/integration/supervisor.go @@ -10,7 +10,6 @@ import ( "github.com/dogmatiq/enginekit/protobuf/uuidpb" "github.com/dogmatiq/persistencekit/journal" "github.com/dogmatiq/persistencekit/kv" - "github.com/dogmatiq/veracity/internal/envelope" "github.com/dogmatiq/veracity/internal/eventstream" "github.com/dogmatiq/veracity/internal/fsm" "github.com/dogmatiq/veracity/internal/integration/internal/integrationjournal" @@ -36,7 +35,7 @@ type Supervisor struct { HandlerIdentity *identitypb.Identity Journals journal.BinaryStore Keyspaces kv.BinaryStore - Packer *envelope.Packer + Packer *envelopepb.Packer EventRecorder EventRecorder eventStreamID *uuidpb.UUID @@ -184,7 +183,14 @@ func (s *Supervisor) handleCommand(ctx context.Context, cmd *envelopepb.Envelope var envs []*envelopepb.Envelope for _, ev := range sc.evs { - envs = append(envs, s.Packer.Pack(ev, envelope.WithCause(cmd), envelope.WithHandler(s.HandlerIdentity))) + envs = append( + envs, + s.Packer.Pack( + ev, + envelopepb.WithCause(cmd), + envelopepb.WithHandler(s.HandlerIdentity), + ), + ) } if s.eventStreamID == nil { diff --git a/internal/integration/supervisor_test.go b/internal/integration/supervisor_test.go index 7eeab055..553a10e9 100644 --- a/internal/integration/supervisor_test.go +++ b/internal/integration/supervisor_test.go @@ -16,7 +16,6 @@ import ( "github.com/dogmatiq/enginekit/protobuf/uuidpb" "github.com/dogmatiq/persistencekit/driver/memory/memoryjournal" "github.com/dogmatiq/persistencekit/driver/memory/memorykv" - "github.com/dogmatiq/veracity/internal/envelope" "github.com/dogmatiq/veracity/internal/eventstream" . "github.com/dogmatiq/veracity/internal/integration" "github.com/dogmatiq/veracity/internal/integration/internal/integrationjournal" @@ -28,7 +27,7 @@ import ( func TestSupervisor(t *testing.T) { type dependencies struct { - Packer *envelope.Packer + Packer *envelopepb.Packer Journals *memoryjournal.BinaryStore Keyspaces *memorykv.BinaryStore Handler *IntegrationMessageHandlerStub