diff --git a/CHANGELOG.md b/CHANGELOG.md index 0aca1c7..b2bd9c4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,13 @@ The format is based on [Keep a Changelog], and this project adheres to [Keep a Changelog]: https://keepachangelog.com/en/1.0.0/ [Semantic Versioning]: https://semver.org/spec/v2.0.0.html +## [Unreleased] + +### Added + +- Added `identitypb.Identity.Equal()`. +- Added `envelopepb.Packer` and `Transcoder` types. + ## [0.12.0] - 2024-09-29 ### Changed diff --git a/go.mod b/go.mod index d30cb69..3691649 100644 --- a/go.mod +++ b/go.mod @@ -6,6 +6,7 @@ require ( github.com/dogmatiq/dapper v0.6.0 github.com/dogmatiq/dogma v0.14.3 github.com/dogmatiq/primo v0.3.1 + github.com/google/go-cmp v0.6.0 google.golang.org/grpc v1.67.0 google.golang.org/protobuf v1.34.2 pgregory.net/rapid v1.1.0 diff --git a/internal/test/expect.go b/internal/test/expect.go new file mode 100644 index 0000000..0bdeda2 --- /dev/null +++ b/internal/test/expect.go @@ -0,0 +1,37 @@ +package test + +import ( + "testing" + + "github.com/google/go-cmp/cmp" + "github.com/google/go-cmp/cmp/cmpopts" + "google.golang.org/protobuf/testing/protocmp" +) + +// Expect compares two values and fails the test if they are different. +func Expect[T any]( + t *testing.T, + failMessage string, + got, want T, + options ...cmp.Option, +) { + t.Helper() + + options = append( + []cmp.Option{ + protocmp.Transform(), + cmpopts.EquateEmpty(), + cmpopts.EquateErrors(), + }, + options..., + ) + + if diff := cmp.Diff( + want, + got, + options..., + ); diff != "" { + t.Log(failMessage) + t.Fatal(diff) + } +} diff --git a/protobuf/envelopepb/packer.go b/protobuf/envelopepb/packer.go new file mode 100644 index 0000000..806a510 --- /dev/null +++ b/protobuf/envelopepb/packer.go @@ -0,0 +1,152 @@ +package envelopepb + +import ( + "fmt" + "time" + + "github.com/dogmatiq/dogma" + "github.com/dogmatiq/enginekit/marshaler" + "github.com/dogmatiq/enginekit/protobuf/identitypb" + "github.com/dogmatiq/enginekit/protobuf/uuidpb" + "google.golang.org/protobuf/types/known/timestamppb" +) + +// A Packer puts messages into envelopes. +type Packer struct { + // Site is the (optional) identity of the site that the source application + // is running within. + // + // The site is used to disambiguate between messages from different + // installations of the same application. + Site *identitypb.Identity + + // Application is the identity of the application that is the source of the + // messages. + Application *identitypb.Identity + + // Marshaler is used to marshal messages into envelopes. + Marshaler marshaler.Marshaler + + // GenerateID is a function used to generate new message IDs. + // + // If it is nil, a random UUID is generated. + GenerateID func() *uuidpb.UUID + + // Now is a function used to get the current time. If it is nil, time.Now() + // is used. + Now func() time.Time +} + +// Pack returns an envelope containing the given message. +func (p *Packer) Pack(m dogma.Message, options ...PackOption) *Envelope { + packet, err := p.Marshaler.Marshal(m) + if err != nil { + panic(err) + } + + id := p.generateID() + + env := &Envelope{ + MessageId: id, + CorrelationId: id, + CausationId: id, + SourceSite: p.Site, + SourceApplication: p.Application, + Description: m.MessageDescription(), + PortableName: packet.PortableName(), + MediaType: packet.MediaType, + Data: packet.Data, + } + + for _, opt := range options { + opt(env) + } + + if env.CreatedAt == nil { + env.CreatedAt = p.now() + } + + if err := env.Validate(); err != nil { + panic(err) + } + + return env +} + +// Unpack returns the message contained within an envelope. +func (p *Packer) Unpack(env *Envelope) (dogma.Message, error) { + packet := marshaler.Packet{ + MediaType: env.MediaType, + Data: env.Data, + } + + m, err := p.Marshaler.Unmarshal(packet) + if err != nil { + return nil, err + } + + if m, ok := m.(dogma.Message); ok { + return m, nil + } + + return nil, fmt.Errorf("'%T' does not implement dogma.Message", m) +} + +// now returns the current time. +func (p *Packer) now() *timestamppb.Timestamp { + if p.Now == nil { + return timestamppb.Now() + } + + return timestamppb.New(p.Now()) +} + +// generateID generates a new message ID. +func (p *Packer) generateID() *uuidpb.UUID { + if p.GenerateID != nil { + return p.GenerateID() + } + + return uuidpb.Generate() +} + +// PackOption is an option that alters the behavior of a Pack operation. +type PackOption func(*Envelope) + +// WithCause sets env as the "cause" of the message being packed. +func WithCause(env *Envelope) PackOption { + return func(e *Envelope) { + e.CausationId = env.MessageId + e.CorrelationId = env.CorrelationId + } +} + +// WithHandler sets h as the identity of the handler that is the source of the +// message. +func WithHandler(h *identitypb.Identity) PackOption { + return func(e *Envelope) { + e.SourceHandler = h + } +} + +// WithInstanceID sets the aggregate or process instance ID that is the +// source of the message. +func WithInstanceID(id string) PackOption { + return func(e *Envelope) { + e.SourceInstanceId = id + } +} + +// WithCreatedAt sets the creation time of a message. +func WithCreatedAt(t time.Time) PackOption { + return func(e *Envelope) { + e.CreatedAt = timestamppb.New(t) + } +} + +// WithScheduledFor sets the scheduled time of a timeout message. +func WithScheduledFor(t time.Time) PackOption { + return func(e *Envelope) { + e.ScheduledFor = timestamppb.New(t) + } +} diff --git a/protobuf/envelopepb/packer_test.go b/protobuf/envelopepb/packer_test.go new file mode 100644 index 0000000..2022807 --- /dev/null +++ b/protobuf/envelopepb/packer_test.go @@ -0,0 +1,199 @@ +package envelopepb_test + +import ( + "testing" + "time" + + "github.com/dogmatiq/dogma" + . "github.com/dogmatiq/enginekit/enginetest/stubs" + "github.com/dogmatiq/enginekit/internal/test" + . "github.com/dogmatiq/enginekit/protobuf/envelopepb" + "github.com/dogmatiq/enginekit/protobuf/identitypb" + "github.com/dogmatiq/enginekit/protobuf/uuidpb" + "google.golang.org/protobuf/types/known/timestamppb" +) + +func TestPacker_packAndUnpack(t *testing.T) { + id := uuidpb.Generate() + now := time.Now() + + packer := &Packer{ + Marshaler: Marshaler, + Site: &identitypb.Identity{ + Name: "site", + Key: uuidpb.Generate(), + }, + Application: &identitypb.Identity{ + Name: "app", + Key: uuidpb.Generate(), + }, + GenerateID: func() *uuidpb.UUID { + return id + }, + Now: func() time.Time { + return now + }, + } + + got := packer.Pack(CommandA1) + + if err := got.Validate(); err != nil { + t.Fatalf("packer produced an invalid envelope: %v", err) + } + + want := &Envelope{ + MessageId: id, + CausationId: id, + CorrelationId: id, + SourceSite: packer.Site, + SourceApplication: packer.Application, + SourceInstanceId: "", + CreatedAt: timestamppb.New(now), + Description: `command(stubs.TypeA:A1, valid)`, + PortableName: `CommandStub[TypeA]`, + MediaType: `application/json; type="CommandStub[TypeA]"`, + Data: []byte(`{"content":"A1"}`), + } + + test.Expect( + t, + "unexpected envelope", + got, + want, + ) + + gotMessage, err := packer.Unpack(want) + if err != nil { + t.Fatal(err) + } + + test.Expect( + t, + "unexpected message", + gotMessage, + dogma.Message(CommandA1), + ) +} + +func TestWithCause(t *testing.T) { + packer := &Packer{ + Marshaler: Marshaler, + Application: &identitypb.Identity{ + Name: "app", + Key: uuidpb.Generate(), + }, + } + + root := packer.Pack(CommandA1) + cause := packer.Pack(EventA1, WithCause(root)) + got := packer.Pack(CommandA2, WithCause(cause)) + + if !got.CausationId.Equal(cause.MessageId) { + t.Fatalf("unexpected causation ID: got %s, want %s", got.CausationId, cause.MessageId) + } + + if !got.CorrelationId.Equal(cause.CorrelationId) { + t.Fatalf("unexpected correlation ID: got %s, want %s", got.CorrelationId, root.MessageId) + } +} + +func TestWithHandler(t *testing.T) { + packer := &Packer{ + Marshaler: Marshaler, + Application: &identitypb.Identity{ + Name: "app", + Key: uuidpb.Generate(), + }, + } + + handler := &identitypb.Identity{ + Name: "handler", + Key: uuidpb.Generate(), + } + + got := packer.Pack(CommandA1, WithHandler(handler)) + + if !got.SourceHandler.Equal(handler) { + t.Fatalf("unexpected handler: got %s, want %s", got.SourceHandler, handler) + } +} + +func TestWithInstanceID(t *testing.T) { + packer := &Packer{ + Marshaler: Marshaler, + Application: &identitypb.Identity{ + Name: "app", + Key: uuidpb.Generate(), + }, + } + + got := packer.Pack( + CommandA1, + WithInstanceID("instance"), + + // We cannot have an instance ID without saying which handler the + // instance is managed by. + WithHandler(&identitypb.Identity{ + Name: "handler", + Key: uuidpb.Generate(), + }), + ) + + if got.SourceInstanceId != "instance" { + t.Fatalf("unexpected instance ID: got %s, want instance", got.SourceInstanceId) + } +} + +func TestWithCreatedAt(t *testing.T) { + packer := &Packer{ + Marshaler: Marshaler, + Application: &identitypb.Identity{ + Name: "app", + Key: uuidpb.Generate(), + }, + Now: func() time.Time { + t.Fatal("unexpected call") + return time.Time{} + }, + } + + want := time.Now().Add(-time.Hour) + + got := packer.Pack( + CommandA1, + WithCreatedAt(want), + ).CreatedAt.AsTime() + + if !got.Equal(want) { + t.Fatalf("unexpected creation time: got %s, want %s", got, want) + } +} + +func TestWithScheduledFor(t *testing.T) { + packer := &Packer{ + Marshaler: Marshaler, + Application: &identitypb.Identity{ + Name: "app", + Key: uuidpb.Generate(), + }, + } + + want := time.Now().Add(time.Hour) + + got := packer.Pack( + TimeoutA1, + WithScheduledFor(want), + + // We cannot have a "scheduled-for" time without saying which handler + // and process instance produced the timeout message. + WithHandler(&identitypb.Identity{ + Name: "handler", + Key: uuidpb.Generate(), + }), + WithInstanceID("instance"), + ).ScheduledFor.AsTime() + + if !got.Equal(want) { + t.Fatalf("unexpected scheduled time: got %s, want %s", got, want) + } +} diff --git a/protobuf/envelopepb/transcoder.go b/protobuf/envelopepb/transcoder.go new file mode 100644 index 0000000..77c689a --- /dev/null +++ b/protobuf/envelopepb/transcoder.go @@ -0,0 +1,60 @@ +package envelopepb + +import ( + "strings" + + "github.com/dogmatiq/enginekit/marshaler" + "google.golang.org/protobuf/proto" +) + +// Transcoder re-encodes messages to different media-types on the fly. +type Transcoder struct { + // MediaTypes is a map of the message's "portable name" to a list of + // supported media-types, in order of preference. + MediaTypes map[string][]string + + // Marshaler is the marshaler to use to unmarshal and marshal messages. + Marshaler marshaler.Marshaler +} + +// Transcode re-encodes the message in env to one of the supported media-types. +func (t *Transcoder) Transcode(env *Envelope) (*Envelope, bool, error) { + candidates := t.MediaTypes[env.PortableName] + + // If the existing encoding is supported by the consumer use the envelope + // without any re-encoding. + for _, candidate := range candidates { + if mediaTypeEqual(env.MediaType, candidate) { + return env, true, nil + } + } + + packet := marshaler.Packet{ + MediaType: env.MediaType, + Data: env.Data, + } + + m, err := t.Marshaler.Unmarshal(packet) + if err != nil { + return nil, false, err + } + + // Otherwise, attempt to marshal the message using the client's requested + // media-types in order of preference. + packet, ok, err := t.Marshaler.MarshalAs(m, candidates) + if !ok || err != nil { + return nil, ok, err + } + + env = proto.Clone(env).(*Envelope) + env.MediaType = packet.MediaType + env.Data = packet.Data + + return env, true, nil +} + +func mediaTypeEqual(a, b string) bool { + // TODO(jmalloc): We should use mime.ParseMediaType() here to compare the + // media-types semantically, rather than using a naive string comparison. + return strings.EqualFold(a, b) +} diff --git a/protobuf/identitypb/identity.go b/protobuf/identitypb/identity.go index 7d8209b..d771c19 100644 --- a/protobuf/identitypb/identity.go +++ b/protobuf/identitypb/identity.go @@ -48,3 +48,8 @@ func (x *Identity) Format(f fmt.State, verb rune) { fmt.Sprintf("%s/%s", x.GetName(), x.GetKey()), ) } + +// Equal returns true if x and id are equal. +func (x *Identity) Equal(id *Identity) bool { + return x.GetName() == id.GetName() && x.GetKey() == id.GetKey() +} diff --git a/protobuf/identitypb/identity_test.go b/protobuf/identitypb/identity_test.go index 1bfccc4..2d62cc1 100644 --- a/protobuf/identitypb/identity_test.go +++ b/protobuf/identitypb/identity_test.go @@ -90,3 +90,25 @@ func TestIdentity_Format(t *testing.T) { t.Fatalf("got %q, want %q", actual, expect) } } + +func TestIdentity_Equal(t *testing.T) { + t.Parallel() + + a := &Identity{ + Name: "", + Key: uuidpb.Generate(), + } + + b := &Identity{ + Name: "", + Key: uuidpb.Generate(), + } + + if a.Equal(b) { + t.Fatal("did not expect a == b") + } + + if !a.Equal(a) { + t.Fatal("did not expect a != b") + } +}