Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add tracing to events package, deprecate TraceID/SpanID #123

Merged
merged 4 commits into from
Jul 20, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions events/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,13 @@ type ChangeMessage struct {
Source string `json:"source"`
// Timestamp is the time representing when the message was created
Timestamp time.Time `json:"timestamp"`
// TraceContext is a map of values used for OpenTelemetry context propagation.
TraceContext map[string]string `json:"traceContext"`
// TraceID is the ID of the trace for this event
// Deprecated: Use TraceContext with OpenTelemetry context propagation instead.
TraceID string `json:"traceID"`
// SpanID is the ID of the span that additional traces should based off of
// Deprecated: Use TraceContext with OpenTelemetry context propagation instead.
SpanID string `json:"spanID"`
// SubjectFields is a map of the fields on the subject
SubjectFields map[string]string `json:"subjectFields"`
Expand All @@ -91,9 +95,13 @@ type EventMessage struct {
Source string `json:"source"`
// Timestamp is the time representing when the message was created
Timestamp time.Time `json:"timestamp"`
// TraceContext is a map of values used for OpenTelemetry context propagation.
TraceContext map[string]string `json:"traceContext"`

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a reason to use the underlying type of propogation.MapCarrier instead of that type directly?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Like having TraceContext be a propagation.MapCarrier? If so, we could do that. I kind of like the idea of minimizing third-party dependencies in our message formats though.

Copy link

@adammohammed adammohammed Jul 20, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, my thinking is that a service using this type should not be free to just write arbitrary keys, or should need to pass through the interface the carrier provides. Looking at the otel SDK again, propogation.MapCarrier is just one implementation, so I think this is the best way?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thinking about it more (and also condensing an offline discussion with @adammohammed): My main argument for using builtin types like map[string]string in here is that it makes the message formats clearer for consumers that aren't using Go for writing Infratographer integrations. While it's conventional in Go to use things like time.Time for JSON data, that doesn't really tell an outside party what the shape of the data should be - to figure that out, they would need to go to the relevant documentation. Since we're not using protobuf or a similar language-agnostic method for publishing these, it's probably good to keep the barrier to understanding the message format as low as possible.

// TraceID is the ID of the trace for this event
// Deprecated: Use TraceContext with OpenTelemetry context propagation instead.
TraceID string `json:"traceID"`
// SpanID is the ID of the span that additional traces should based off of
// Deprecated: Use TraceContext with OpenTelemetry context propagation instead.
SpanID string `json:"spanID"`
// Data is a field to store any information that may be important to include about the event
Data map[string]interface{} `json:"data"`
Expand Down
1 change: 1 addition & 0 deletions events/nats_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,7 @@ func testChange(eventType string) events.ChangeMessage {
CurrentValue: string(js),
},
},
TraceContext: map[string]string{},
}
}

Expand Down
142 changes: 139 additions & 3 deletions events/publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,19 @@ import (

"github.com/ThreeDotsLabs/watermill"
"github.com/ThreeDotsLabs/watermill/message"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/propagation"
"go.opentelemetry.io/otel/trace"
"go.uber.org/zap"

"go.infratographer.com/x/echojwtx"
"go.infratographer.com/x/gidx"
)

const instrumentationName = "go.infratographer.com/x/events"

// ErrUnsupportedPubsub is returned when the pubsub URL is not a supported provider
var ErrUnsupportedPubsub = errors.New("unsupported pubsub provider")

Expand All @@ -40,14 +47,18 @@ type Publisher struct {
source string
publisher message.Publisher
logger *zap.SugaredLogger
tracer trace.Tracer
}

// NewPublisherWithLogger returns a publisher for the given config provided
func NewPublisherWithLogger(cfg PublisherConfig, logger *zap.SugaredLogger) (*Publisher, error) {
tracer := otel.GetTracerProvider().Tracer(instrumentationName)

p := &Publisher{
prefix: cfg.Prefix,
source: cfg.Source,
logger: logger,
tracer: tracer,
}

switch {
Expand All @@ -72,12 +83,54 @@ func NewPublisher(cfg PublisherConfig) (*Publisher, error) {

// PublishChange will publish a ChangeMessage to the topic for the change
func (p *Publisher) PublishChange(ctx context.Context, subjectType string, change ChangeMessage) error {
ctx, span := p.tracer.Start(
ctx,
"events.publishChange",
trace.WithAttributes(
attribute.String(
"events.subject_type",
subjectType,
),
attribute.String(
"events.subject_id",
change.SubjectID.String(),
),
attribute.String(
"events.event_type",
change.EventType,
),
attribute.String(
"events.source",
p.source,
jnschaeffer marked this conversation as resolved.
Show resolved Hide resolved
),
),
)

defer span.End()

// Propagate trace context into the message for the subscriber
var mapCarrier propagation.MapCarrier = make(map[string]string)

otel.GetTextMapPropagator().Inject(ctx, mapCarrier)

change.TraceContext = mapCarrier

if change.EventType == "" {
span.RecordError(ErrMissingEventType)
span.SetStatus(codes.Error, ErrMissingEventType.Error())

return ErrMissingEventType
}

topic := strings.Join([]string{p.prefix, "changes", change.EventType, subjectType}, ".")

span.SetAttributes(
attribute.String(
"events.topic",
topic,
),
)

change.Source = p.source
if change.ActorID == gidx.NullPrefixedID {
id, ok := ctx.Value(echojwtx.ActorCtxKey).(string)
Expand All @@ -88,34 +141,117 @@ func (p *Publisher) PublishChange(ctx context.Context, subjectType string, chang
}
}

span.SetAttributes(
attribute.String(
"events.actor_id",
change.ActorID.String(),
),
)

v, err := json.Marshal(change)
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())

return err
}

msg := message.NewMessage(watermill.NewUUID(), v)

return p.publisher.Publish(topic, msg)
span.SetAttributes(
attribute.String(
"events.message_id",
msg.UUID,
),
)

if err := p.publisher.Publish(topic, msg); err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())

return err
}

return nil
}

// PublishEvent will publish an EventMessage to the proper topic for that event
func (p *Publisher) PublishEvent(_ context.Context, subjectType string, event EventMessage) error {
func (p *Publisher) PublishEvent(ctx context.Context, subjectType string, event EventMessage) error {
ctx, span := p.tracer.Start(
ctx,
"events.publishEvent",
trace.WithAttributes(
attribute.String(
"events.subject_type",
subjectType,
),
attribute.String(
"events.subject_id",
event.SubjectID.String(),
),
attribute.String(
"events.event_type",
event.EventType,
),
attribute.String(
"events.source",
p.source,
jnschaeffer marked this conversation as resolved.
Show resolved Hide resolved
),
),
)

defer span.End()

// Propagate trace context into the message for the subscriber
var mapCarrier propagation.MapCarrier = make(map[string]string)

otel.GetTextMapPropagator().Inject(ctx, mapCarrier)

event.TraceContext = mapCarrier

if event.EventType == "" {
span.RecordError(ErrMissingEventType)
span.SetStatus(codes.Error, ErrMissingEventType.Error())

return ErrMissingEventType
}

topic := strings.Join([]string{p.prefix, "events", subjectType, event.EventType}, ".")

span.SetAttributes(
attribute.String(
"events.topic",
topic,
),
)

event.Source = p.source

v, err := json.Marshal(event)
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())

return err
}

msg := message.NewMessage(watermill.NewUUID(), v)

return p.publisher.Publish(topic, msg)
span.SetAttributes(
attribute.String(
"events.message_id",
msg.UUID,
),
)

if err := p.publisher.Publish(topic, msg); err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())

return err
}

return nil
}

// Close will close the publisher
Expand Down
16 changes: 16 additions & 0 deletions events/subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ import (

"github.com/ThreeDotsLabs/watermill/message"
"github.com/nats-io/nats.go"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/propagation"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -77,3 +79,17 @@ func (s *Subscriber) SubscribeEvents(ctx context.Context, topic string) (<-chan
func (s *Subscriber) Close() error {
return s.subscriber.Close()
}

// TraceContextFromChangeMessage creates a new OpenTelemetry context from the given ChangeMessage.
func TraceContextFromChangeMessage(ctx context.Context, msg ChangeMessage) context.Context {
tp := otel.GetTextMapPropagator()

return tp.Extract(ctx, propagation.MapCarrier(msg.TraceContext))
}

// TraceContextFromEventMessage creates a new OpenTelemetry context from the given ChangeMessage.
func TraceContextFromEventMessage(ctx context.Context, msg EventMessage) context.Context {
tp := otel.GetTextMapPropagator()

return tp.Extract(ctx, propagation.MapCarrier(msg.TraceContext))
}
1 change: 1 addition & 0 deletions testing/eventtools/nats_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ func testChange(eventType string) events.ChangeMessage {
CurrentValue: string(js),
},
},
TraceContext: map[string]string{},
}
}

Expand Down
Loading