Skip to content

Commit

Permalink
Add tracing to events package, deprecate TraceID/SpanID
Browse files Browse the repository at this point in the history
This commit adds tracing using a new field, TraceContext, in
ChangeMessage and EventMessage. While we could stick with TraceID and
SpanID (and those fields still exist in the message and will be
marshaled/unmarshaled), there are other OpenTelemetry features like
the Baggage API and W3C traceparent/tracestate values that we can
leverage using TraceContext instead. Moving to this allows for a
tighter integration with OpenTelemetry's Go SDK as well and makes it
easier for other services using the events package to add
observability to their event systems.

Signed-off-by: John Schaeffer <[email protected]>
  • Loading branch information
jnschaeffer committed Jul 19, 2023
1 parent bd9575a commit d481531
Show file tree
Hide file tree
Showing 5 changed files with 163 additions and 3 deletions.
8 changes: 8 additions & 0 deletions events/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,13 @@ type ChangeMessage struct {
Source string `json:"source"`
// Timestamp is the time representing when the message was created
Timestamp time.Time `json:"timestamp"`
// TraceContext is a map of values used for OpenTelemetry context propagation.
TraceContext map[string]string `json:"traceContext"`
// TraceID is the ID of the trace for this event
// Deprecated: Use TraceContext with OpenTelemetry context propagation instead.
TraceID string `json:"traceID"`
// SpanID is the ID of the span that additional traces should based off of
// Deprecated: Use TraceContext with OpenTelemetry context propagation instead.
SpanID string `json:"spanID"`
// SubjectFields is a map of the fields on the subject
SubjectFields map[string]string `json:"subjectFields"`
Expand All @@ -91,9 +95,13 @@ type EventMessage struct {
Source string `json:"source"`
// Timestamp is the time representing when the message was created
Timestamp time.Time `json:"timestamp"`
// TraceContext is a map of values used for OpenTelemetry context propagation.
TraceContext map[string]string `json:"traceContext"`
// TraceID is the ID of the trace for this event
// Deprecated: Use TraceContext with OpenTelemetry context propagation instead.
TraceID string `json:"traceID"`
// SpanID is the ID of the span that additional traces should based off of
// Deprecated: Use TraceContext with OpenTelemetry context propagation instead.
SpanID string `json:"spanID"`
// Data is a field to store any information that may be important to include about the event
Data map[string]interface{} `json:"data"`
Expand Down
1 change: 1 addition & 0 deletions events/nats_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,7 @@ func testChange(eventType string) events.ChangeMessage {
CurrentValue: string(js),
},
},
TraceContext: map[string]string{},
}
}

Expand Down
140 changes: 137 additions & 3 deletions events/publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,21 @@ import (

"github.com/ThreeDotsLabs/watermill"
"github.com/ThreeDotsLabs/watermill/message"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/propagation"
"go.opentelemetry.io/otel/trace"
"go.uber.org/zap"

"go.infratographer.com/x/echojwtx"
"go.infratographer.com/x/gidx"
)

const instrumentationName = "go.infratographer.com/x/events"

var tracer = otel.GetTracerProvider().Tracer(instrumentationName)

// ErrUnsupportedPubsub is returned when the pubsub URL is not a supported provider
var ErrUnsupportedPubsub = errors.New("unsupported pubsub provider")

Expand Down Expand Up @@ -72,12 +81,54 @@ func NewPublisher(cfg PublisherConfig) (*Publisher, error) {

// PublishChange will publish a ChangeMessage to the topic for the change
func (p *Publisher) PublishChange(ctx context.Context, subjectType string, change ChangeMessage) error {
ctx, span := tracer.Start(
ctx,
"events.publishChange",
trace.WithAttributes(
attribute.String(
"events.subject_type",
subjectType,
),
attribute.String(
"events.subject_id",
change.SubjectID.String(),
),
attribute.String(
"events.event_type",
change.EventType,
),
attribute.String(
"events.source",
p.source,
),
),
)

defer span.End()

// Propagate trace context into the message for the subscriber
var mapCarrier propagation.MapCarrier = make(map[string]string)

otel.GetTextMapPropagator().Inject(ctx, mapCarrier)

change.TraceContext = mapCarrier

if change.EventType == "" {
span.RecordError(ErrMissingEventType)
span.SetStatus(codes.Error, ErrMissingEventType.Error())

return ErrMissingEventType
}

topic := strings.Join([]string{p.prefix, "changes", change.EventType, subjectType}, ".")

span.SetAttributes(
attribute.String(
"events.topic",
topic,
),
)

change.Source = p.source
if change.ActorID == gidx.NullPrefixedID {
id, ok := ctx.Value(echojwtx.ActorCtxKey).(string)
Expand All @@ -88,34 +139,117 @@ func (p *Publisher) PublishChange(ctx context.Context, subjectType string, chang
}
}

span.SetAttributes(
attribute.String(
"events.actor_id",
change.ActorID.String(),
),
)

v, err := json.Marshal(change)
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())

return err
}

msg := message.NewMessage(watermill.NewUUID(), v)

return p.publisher.Publish(topic, msg)
span.SetAttributes(
attribute.String(
"events.message_id",
msg.UUID,
),
)

if err := p.publisher.Publish(topic, msg); err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())

return err
}

return nil
}

// PublishEvent will publish an EventMessage to the proper topic for that event
func (p *Publisher) PublishEvent(_ context.Context, subjectType string, event EventMessage) error {
func (p *Publisher) PublishEvent(ctx context.Context, subjectType string, event EventMessage) error {
ctx, span := tracer.Start(
ctx,
"events.publishEvent",
trace.WithAttributes(
attribute.String(
"events.subject_type",
subjectType,
),
attribute.String(
"events.subject_id",
event.SubjectID.String(),
),
attribute.String(
"events.event_type",
event.EventType,
),
attribute.String(
"events.source",
p.source,
),
),
)

defer span.End()

// Propagate trace context into the message for the subscriber
var mapCarrier propagation.MapCarrier = make(map[string]string)

otel.GetTextMapPropagator().Inject(ctx, mapCarrier)

event.TraceContext = mapCarrier

if event.EventType == "" {
span.RecordError(ErrMissingEventType)
span.SetStatus(codes.Error, ErrMissingEventType.Error())

return ErrMissingEventType
}

topic := strings.Join([]string{p.prefix, "events", subjectType, event.EventType}, ".")

span.SetAttributes(
attribute.String(
"events.topic",
topic,
),
)

event.Source = p.source

v, err := json.Marshal(event)
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())

return err
}

msg := message.NewMessage(watermill.NewUUID(), v)

return p.publisher.Publish(topic, msg)
span.SetAttributes(
attribute.String(
"events.message_id",
msg.UUID,
),
)

if err := p.publisher.Publish(topic, msg); err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())

return err
}

return nil
}

// Close will close the publisher
Expand Down
16 changes: 16 additions & 0 deletions events/subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ import (

"github.com/ThreeDotsLabs/watermill/message"
"github.com/nats-io/nats.go"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/propagation"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -77,3 +79,17 @@ func (s *Subscriber) SubscribeEvents(ctx context.Context, topic string) (<-chan
func (s *Subscriber) Close() error {
return s.subscriber.Close()
}

// TraceContextFromChangeMessage creates a new OpenTelemetry context from the given ChangeMessage.
func TraceContextFromChangeMessage(ctx context.Context, msg ChangeMessage) context.Context {
tp := otel.GetTextMapPropagator()

return tp.Extract(ctx, propagation.MapCarrier(msg.TraceContext))
}

// TraceContextFromEventMessage creates a new OpenTelemetry context from the given ChangeMessage.
func TraceContextFromEventMessage(ctx context.Context, msg EventMessage) context.Context {
tp := otel.GetTextMapPropagator()

return tp.Extract(ctx, propagation.MapCarrier(msg.TraceContext))
}
1 change: 1 addition & 0 deletions testing/eventtools/nats_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ func testChange(eventType string) events.ChangeMessage {
CurrentValue: string(js),
},
},
TraceContext: map[string]string{},
}
}

Expand Down

0 comments on commit d481531

Please sign in to comment.