Skip to content

Commit

Permalink
rewrite events handling to remove watermill
Browse files Browse the repository at this point in the history
As discussed in the community call, watermill doesn't give us the
necessary features we'd like to utilize with the underlying nats message.
We decided to switch to using nats directly but still wanted some
support for possibly changing this out later.

This rewrites events to use our own interfaces to allow for the
possibility of a different event driver later.

Additionally this switches to using pull subscriptions instead of push,
supports Ack, Nak and Term as well as Request/Reply semantics.

Due to the Request/Reply semantics, no longer are there separate
Publisher and Subscriber configurations as the driver needs to be able
to handle both.

Signed-off-by: Mike Mason <[email protected]>
  • Loading branch information
mikemrm committed Jul 27, 2023
1 parent 49f17be commit 9fff497
Show file tree
Hide file tree
Showing 16 changed files with 1,221 additions and 672 deletions.
63 changes: 10 additions & 53 deletions events/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,62 +19,19 @@ import (

"github.com/spf13/pflag"
"github.com/spf13/viper"

"go.infratographer.com/x/viperx"
)

var defaultTimeout = time.Second * 10

// PublisherConfig handles reading in all the config values available for setting up a pubsub publisher
type PublisherConfig struct {
URL string `mapstructure:"url"`
Timeout time.Duration `mapstructure:"timeout"`
Prefix string `mapstructure:"prefix"`
Source string `mapstructure:"source"`
NATSConfig NATSConfig `mapstructure:"nats"`
}

// SubscriberConfig handles reading in all the config values available for setting up a pubsub publisher
type SubscriberConfig struct {
URL string `mapstructure:"url"`
Timeout time.Duration `mapstructure:"timeout"`
Prefix string `mapstructure:"prefix"`
QueueGroup string `mapstructure:"queueGroup"`
NATSConfig NATSConfig `mapstructure:"nats"`
}

// NATSConfig handles reading in all pubsub values specific to NATS
type NATSConfig struct {
Token string `mapstructure:"token"`
CredsFile string `mapstructure:"credsFile"`
}

// MustViperFlagsForPublisher returns the cobra flags and viper config for an event publisher
func MustViperFlagsForPublisher(v *viper.Viper, flags *pflag.FlagSet, appName string) {
flags.String("events-publisher-url", "nats://nats:4222", "nats server connection url")
viperx.MustBindFlag(v, "events.publisher.url", flags.Lookup("events-publisher-url"))

v.MustBindEnv("events.publisher.timeout")
v.MustBindEnv("events.publisher.prefix")
v.MustBindEnv("events.publisher.source")
v.MustBindEnv("events.publisher.nats.token")
v.MustBindEnv("events.publisher.nats.credsFile")
const (
defaultTimeout = time.Second * 10
tracerName = "go.infratographer.com/x/events"
)

v.SetDefault("events.publisher.timeout", defaultTimeout)
v.SetDefault("events.publisher.source", appName)
// Config contains event provider configs.
type Config struct {
NATS NatsConfig `mapstructure:"nats"`
}

// MustViperFlagsForSubscriber returns the cobra flags and viper config for an event subscriber
func MustViperFlagsForSubscriber(v *viper.Viper, flags *pflag.FlagSet) {
flags.String("events-subscriber-url", "nats://nats:4222", "nats server connection url")
viperx.MustBindFlag(v, "events.subscriber.url", flags.Lookup("events-subscriber-url"))
flags.String("events-subscriber-queuegroup", "", "subscriber queue group")
viperx.MustBindFlag(v, "events.subscriber.queueGroup", flags.Lookup("events-subscriber-queuegroup"))

v.MustBindEnv("events.subscriber.timeout")
v.MustBindEnv("events.subscriber.prefix")
v.MustBindEnv("events.subscriber.nats.token")
v.MustBindEnv("events.subscriber.nats.credsFile")

v.SetDefault("events.subscriber.timeout", defaultTimeout)
// MustViperFlags returns the cobra flags and viper config for events.
func MustViperFlags(v *viper.Viper, flags *pflag.FlagSet, appName string) {
MustViperFlagsForNats(v, flags, appName)
}
47 changes: 47 additions & 0 deletions events/connection.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package events

import "context"

// Connection defines a connection handler.
type Connection interface {
// Gracefully close the connection.
Close(ctx context.Context) error

// Source gives you the raw underlying connection object.
Source() any
}

// Subscriber specifies subscriber methods.
type Subscriber interface {
Connection

// Subscribe subscribes to the provided topic responding a message.
Subscribe(ctx context.Context, topic string) (<-chan Message[any], error)
// SubscribeAuthRelationshipRequests subscribes to the provided topic responding with an AuthRelationshipRequest message.
SubscribeAuthRelationshipRequests(ctx context.Context, topic string) (<-chan Message[AuthRelationshipRequest], error)
// SubscribeChanges subscribes to the provided topic responding with an ChangeMessage message.
SubscribeChanges(ctx context.Context, topic string) (<-chan Message[ChangeMessage], error)
// SubscribeEvents subscribes to the provided topic responding with an EventMessage message.
SubscribeEvents(ctx context.Context, topic string) (<-chan Message[EventMessage], error)
}

// Publisher specifies publisher methods.
type Publisher interface {
Connection

// Publish publishes to the specified topic with the message given.
Publish(ctx context.Context, topic string, message any) (Message[any], error)
// PublishAuthRelationshipRequest publishes to the specified topic with the message given.
PublishAuthRelationshipRequest(ctx context.Context, topic string, message AuthRelationshipRequest) (Message[AuthRelationshipResponse], error)
// PublishChange publishes to the specified topic with the message given.
PublishChange(ctx context.Context, topic string, message ChangeMessage) (Message[ChangeMessage], error)
// PublishEvent publishes to the specified topic with the message given.
PublishEvent(ctx context.Context, topic string, message EventMessage) (Message[EventMessage], error)
}

// PubSub includes both publisher and subscriber methods.
type PubSub interface {
Connection
Subscriber
Publisher
}
150 changes: 150 additions & 0 deletions events/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,64 @@
package events

import (
"context"
"encoding/json"
"errors"
"time"

"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/propagation"
"go.uber.org/multierr"

"go.infratographer.com/x/gidx"
)

var (
// ErrInvalidMessageAction is returned when the event message has the incorrect field Action value.
ErrInvalidMessageAction = errors.New("event message Action field must be write or delete")
// ErrMissingMessageObjectID is returned when the event message has the incorrect field ObjectID value.
ErrMissingMessageObjectID = errors.New("event message ObjectID field required")
// ErrMissingMessageRelationshipName is returned when the event message has the incorrect field RelationshipName value.
ErrMissingMessageRelationshipName = errors.New("event message RelationshipName field required")
// ErrMissingMessageSubjectID is returned when the event message has the incorrect field SubjectID value.
ErrMissingMessageSubjectID = errors.New("event message SubjectID field required")
// ErrMissingMessageEventType is returned when the event message has the incorrect field EventType value.
ErrMissingMessageEventType = errors.New("event message EventType field required")
)

// Message contains a message which has been published or received from a subscription.
type Message[T any] interface {
// Connection returns the underlying connection the message was received on.
Connection() Connection

// ID returns the unique message id.
ID() string
// Topic returns the topic the message was sent to.
Topic() string
// Message returns the decoded message object.
Message() T
// Ack acks the message.
Ack() error
// Nak nacks the message.
Nak(delay time.Duration) error
// Term terminates the message.
Term() error
// Timestamp returns the time the message was submitted.
Timestamp() time.Time
// Deliveries returns the number of times the message was delivered.
Deliveries() uint64

// Error returns any error encountered while decoding the message
Error() error

// ReplyAuthRelationshipRequest publishes an AuthRelationshipResponse message.
// An error is returned if the message is not an AuthRelationshipRequest.
ReplyAuthRelationshipRequest(ctx context.Context, message AuthRelationshipResponse) (Message[AuthRelationshipResponse], error)

// Source returns the underlying message object.
Source() any
}

// ChangeType represents the possible event types for a ChangeMessage
type ChangeType string

Expand Down Expand Up @@ -82,6 +134,32 @@ type ChangeMessage struct {
AdditionalData map[string]interface{} `json:"additionalData"`
}

// GetTraceContext creates a new OpenTelementry context for the message.
func (m ChangeMessage) GetTraceContext(ctx context.Context) context.Context {
tp := otel.GetTextMapPropagator()

return tp.Extract(ctx, propagation.MapCarrier(m.TraceContext))
}

// Validate ensures the message has all the required fields.
func (m ChangeMessage) Validate() error {
var errors []error

if m.SubjectID == "" {
errors = append(errors, ErrMissingMessageSubjectID)
}

if m.EventType == "" {
errors = append(errors, ErrMissingMessageEventType)
}

if len(errors) != 0 {
return multierr.Combine(errors...)
}

return nil
}

// EventMessage contains the data structure expected to be received when picking
// an event from an events message queue
type EventMessage struct {
Expand All @@ -107,6 +185,32 @@ type EventMessage struct {
Data map[string]interface{} `json:"data"`
}

// GetTraceContext creates a new OpenTelementry context for the message.
func (m EventMessage) GetTraceContext(ctx context.Context) context.Context {
tp := otel.GetTextMapPropagator()

return tp.Extract(ctx, propagation.MapCarrier(m.TraceContext))
}

// Validate ensures the message has all the required fields.
func (m EventMessage) Validate() error {
var errors []error

if m.SubjectID == "" {
errors = append(errors, ErrMissingMessageSubjectID)
}

if m.EventType == "" {
errors = append(errors, ErrMissingMessageEventType)
}

if len(errors) != 0 {
return multierr.Combine(errors...)
}

return nil
}

// AuthRelationshipRequest contains the data structure expected to be used to write or delete
// an auth relationship from PermissionsAPI
type AuthRelationshipRequest struct {
Expand All @@ -133,6 +237,40 @@ type AuthRelationshipRequest struct {
SpanID string `json:"spanID"`
}

// GetTraceContext creates a new OpenTelementry context for the message.
func (m AuthRelationshipRequest) GetTraceContext(ctx context.Context) context.Context {
tp := otel.GetTextMapPropagator()

return tp.Extract(ctx, propagation.MapCarrier(m.TraceContext))
}

// Validate ensures the message has all the required fields.
func (m AuthRelationshipRequest) Validate() error {
var errors []error

if m.Action == "" || m.Action != "write" && m.Action != "delete" {
errors = append(errors, ErrInvalidMessageAction)
}

if m.ObjectID == "" {
errors = append(errors, ErrMissingMessageObjectID)
}

if m.RelationshipName == "" {
errors = append(errors, ErrMissingMessageRelationshipName)
}

if m.SubjectID == "" {
errors = append(errors, ErrMissingMessageSubjectID)
}

if len(errors) != 0 {
return multierr.Combine(errors...)
}

return nil
}

// AuthRelationshipResponse contains the data structure expected to be received from an AuthRelationshipRequest
// message to write or delete an auth relationship from PermissionsAPI
type AuthRelationshipResponse struct {
Expand All @@ -148,6 +286,18 @@ type AuthRelationshipResponse struct {
SpanID string `json:"spanID"`
}

// GetTraceContext creates a new OpenTelementry context for the message.
func (m AuthRelationshipResponse) GetTraceContext(ctx context.Context) context.Context {
tp := otel.GetTextMapPropagator()

return tp.Extract(ctx, propagation.MapCarrier(m.TraceContext))
}

// Validate ensures the message has all the required fields.
func (m AuthRelationshipResponse) Validate() error {
return nil
}

// UnmarshalChangeMessage returns a ChangeMessage from a json []byte.
func UnmarshalChangeMessage(b []byte) (ChangeMessage, error) {
var c ChangeMessage
Expand Down
Loading

0 comments on commit 9fff497

Please sign in to comment.