Skip to content

Commit

Permalink
Add tracing to events package, deprecate TraceID/SpanID (#123)
Browse files Browse the repository at this point in the history
This PR adds tracing using a new field, TraceContext, in ChangeMessage
and EventMessage. While we could stick with TraceID and SpanID (and
those fields still exist in the message and will be
marshaled/unmarshaled), there are other OpenTelemetry features like the
Baggage API and W3C traceparent/tracestate values that we can leverage
using TraceContext instead. Moving to this allows for a tighter
integration with OpenTelemetry's Go SDK as well and makes it easier for
other services using the events package to add observability to their
event systems.

---------

Signed-off-by: John Schaeffer <[email protected]>
  • Loading branch information
jnschaeffer authored Jul 20, 2023
1 parent 7f209fd commit 49f17be
Show file tree
Hide file tree
Showing 5 changed files with 173 additions and 3 deletions.
16 changes: 16 additions & 0 deletions events/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,13 @@ type ChangeMessage struct {
Source string `json:"source"`
// Timestamp is the time representing when the message was created
Timestamp time.Time `json:"timestamp"`
// TraceContext is a map of values used for OpenTelemetry context propagation.
TraceContext map[string]string `json:"traceContext"`
// TraceID is the ID of the trace for this event
// Deprecated: Use TraceContext with OpenTelemetry context propagation instead.
TraceID string `json:"traceID"`
// SpanID is the ID of the span that additional traces should based off of
// Deprecated: Use TraceContext with OpenTelemetry context propagation instead.
SpanID string `json:"spanID"`
// SubjectFields is a map of the fields on the subject
SubjectFields map[string]string `json:"subjectFields"`
Expand All @@ -91,9 +95,13 @@ type EventMessage struct {
Source string `json:"source"`
// Timestamp is the time representing when the message was created
Timestamp time.Time `json:"timestamp"`
// TraceContext is a map of values used for OpenTelemetry context propagation.
TraceContext map[string]string `json:"traceContext"`
// TraceID is the ID of the trace for this event
// Deprecated: Use TraceContext with OpenTelemetry context propagation instead.
TraceID string `json:"traceID"`
// SpanID is the ID of the span that additional traces should based off of
// Deprecated: Use TraceContext with OpenTelemetry context propagation instead.
SpanID string `json:"spanID"`
// Data is a field to store any information that may be important to include about the event
Data map[string]interface{} `json:"data"`
Expand All @@ -115,9 +123,13 @@ type AuthRelationshipRequest struct {
ConditionName string `json:"conditionName"`
// ConditionValues are the condition values to be used on the condition check. (Optional)
ConditionValues map[string]interface{} `json:"conditionValue"`
// TraceContext is a map of values used for OpenTelemetry context propagation.
TraceContext map[string]string `json:"traceContext"`
// TraceID is the ID of the trace for this event
// Deprecated: Use TraceContext with OpenTelemetry context propagation instead.
TraceID string `json:"traceID"`
// SpanID is the ID of the span that additional traces should based off of
// Deprecated: Use TraceContext with OpenTelemetry context propagation instead.
SpanID string `json:"spanID"`
}

Expand All @@ -126,9 +138,13 @@ type AuthRelationshipRequest struct {
type AuthRelationshipResponse struct {
// Errors contains any errors, if empty the request was successful
Errors []error `json:"errors"`
// TraceContext is a map of values used for OpenTelemetry context propagation.
TraceContext map[string]string `json:"traceContext"`
// TraceID is the ID of the trace for this event
// Deprecated: Use TraceContext with OpenTelemetry context propagation instead.
TraceID string `json:"traceID"`
// SpanID is the ID of the span that additional traces should based off of
// Deprecated: Use TraceContext with OpenTelemetry context propagation instead.
SpanID string `json:"spanID"`
}

Expand Down
1 change: 1 addition & 0 deletions events/nats_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,7 @@ func testChange(eventType string) events.ChangeMessage {
CurrentValue: string(js),
},
},
TraceContext: map[string]string{},
}
}

Expand Down
142 changes: 139 additions & 3 deletions events/publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,19 @@ import (

"github.com/ThreeDotsLabs/watermill"
"github.com/ThreeDotsLabs/watermill/message"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/propagation"
"go.opentelemetry.io/otel/trace"
"go.uber.org/zap"

"go.infratographer.com/x/echojwtx"
"go.infratographer.com/x/gidx"
)

const instrumentationName = "go.infratographer.com/x/events"

// ErrUnsupportedPubsub is returned when the pubsub URL is not a supported provider
var ErrUnsupportedPubsub = errors.New("unsupported pubsub provider")

Expand All @@ -40,14 +47,18 @@ type Publisher struct {
source string
publisher message.Publisher
logger *zap.SugaredLogger
tracer trace.Tracer
}

// NewPublisherWithLogger returns a publisher for the given config provided
func NewPublisherWithLogger(cfg PublisherConfig, logger *zap.SugaredLogger) (*Publisher, error) {
tracer := otel.GetTracerProvider().Tracer(instrumentationName)

p := &Publisher{
prefix: cfg.Prefix,
source: cfg.Source,
logger: logger,
tracer: tracer,
}

switch {
Expand All @@ -72,12 +83,54 @@ func NewPublisher(cfg PublisherConfig) (*Publisher, error) {

// PublishChange will publish a ChangeMessage to the topic for the change
func (p *Publisher) PublishChange(ctx context.Context, subjectType string, change ChangeMessage) error {
ctx, span := p.tracer.Start(
ctx,
"events.publishChange",
trace.WithAttributes(
attribute.String(
"events.subject_type",
subjectType,
),
attribute.String(
"events.subject_id",
change.SubjectID.String(),
),
attribute.String(
"events.event_type",
change.EventType,
),
attribute.String(
"events.source",
change.Source,
),
),
)

defer span.End()

// Propagate trace context into the message for the subscriber
var mapCarrier propagation.MapCarrier = make(map[string]string)

otel.GetTextMapPropagator().Inject(ctx, mapCarrier)

change.TraceContext = mapCarrier

if change.EventType == "" {
span.RecordError(ErrMissingEventType)
span.SetStatus(codes.Error, ErrMissingEventType.Error())

return ErrMissingEventType
}

topic := strings.Join([]string{p.prefix, "changes", change.EventType, subjectType}, ".")

span.SetAttributes(
attribute.String(
"events.topic",
topic,
),
)

change.Source = p.source
if change.ActorID == gidx.NullPrefixedID {
id, ok := ctx.Value(echojwtx.ActorCtxKey).(string)
Expand All @@ -88,34 +141,117 @@ func (p *Publisher) PublishChange(ctx context.Context, subjectType string, chang
}
}

span.SetAttributes(
attribute.String(
"events.actor_id",
change.ActorID.String(),
),
)

v, err := json.Marshal(change)
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())

return err
}

msg := message.NewMessage(watermill.NewUUID(), v)

return p.publisher.Publish(topic, msg)
span.SetAttributes(
attribute.String(
"events.message_id",
msg.UUID,
),
)

if err := p.publisher.Publish(topic, msg); err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())

return err
}

return nil
}

// PublishEvent will publish an EventMessage to the proper topic for that event
func (p *Publisher) PublishEvent(_ context.Context, subjectType string, event EventMessage) error {
func (p *Publisher) PublishEvent(ctx context.Context, subjectType string, event EventMessage) error {
ctx, span := p.tracer.Start(
ctx,
"events.publishEvent",
trace.WithAttributes(
attribute.String(
"events.subject_type",
subjectType,
),
attribute.String(
"events.subject_id",
event.SubjectID.String(),
),
attribute.String(
"events.event_type",
event.EventType,
),
attribute.String(
"events.source",
event.Source,
),
),
)

defer span.End()

// Propagate trace context into the message for the subscriber
var mapCarrier propagation.MapCarrier = make(map[string]string)

otel.GetTextMapPropagator().Inject(ctx, mapCarrier)

event.TraceContext = mapCarrier

if event.EventType == "" {
span.RecordError(ErrMissingEventType)
span.SetStatus(codes.Error, ErrMissingEventType.Error())

return ErrMissingEventType
}

topic := strings.Join([]string{p.prefix, "events", subjectType, event.EventType}, ".")

span.SetAttributes(
attribute.String(
"events.topic",
topic,
),
)

event.Source = p.source

v, err := json.Marshal(event)
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())

return err
}

msg := message.NewMessage(watermill.NewUUID(), v)

return p.publisher.Publish(topic, msg)
span.SetAttributes(
attribute.String(
"events.message_id",
msg.UUID,
),
)

if err := p.publisher.Publish(topic, msg); err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())

return err
}

return nil
}

// Close will close the publisher
Expand Down
16 changes: 16 additions & 0 deletions events/subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ import (

"github.com/ThreeDotsLabs/watermill/message"
"github.com/nats-io/nats.go"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/propagation"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -77,3 +79,17 @@ func (s *Subscriber) SubscribeEvents(ctx context.Context, topic string) (<-chan
func (s *Subscriber) Close() error {
return s.subscriber.Close()
}

// TraceContextFromChangeMessage creates a new OpenTelemetry context from the given ChangeMessage.
func TraceContextFromChangeMessage(ctx context.Context, msg ChangeMessage) context.Context {
tp := otel.GetTextMapPropagator()

return tp.Extract(ctx, propagation.MapCarrier(msg.TraceContext))
}

// TraceContextFromEventMessage creates a new OpenTelemetry context from the given ChangeMessage.
func TraceContextFromEventMessage(ctx context.Context, msg EventMessage) context.Context {
tp := otel.GetTextMapPropagator()

return tp.Extract(ctx, propagation.MapCarrier(msg.TraceContext))
}
1 change: 1 addition & 0 deletions testing/eventtools/nats_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ func testChange(eventType string) events.ChangeMessage {
CurrentValue: string(js),
},
},
TraceContext: map[string]string{},
}
}

Expand Down

0 comments on commit 49f17be

Please sign in to comment.