Skip to content

Commit

Permalink
Adding Span Link support for compound header tag extractions with con…
Browse files Browse the repository at this point in the history
…flicting trace IDs (#2948)

Co-authored-by: Mikayla Toffler <[email protected]>
Co-authored-by: Mikayla Toffler <[email protected]>
Co-authored-by: Dario Castañé <[email protected]>
  • Loading branch information
4 people authored Dec 9, 2024
1 parent d9a7f04 commit abfec9a
Show file tree
Hide file tree
Showing 17 changed files with 260 additions and 68 deletions.
8 changes: 8 additions & 0 deletions contrib/IBM/sarama.v1/sarama.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,10 @@ func WrapPartitionConsumer(pc sarama.PartitionConsumer, opts ...Option) sarama.P
// kafka supports headers, so try to extract a span context
carrier := NewConsumerMessageCarrier(msg)
if spanctx, err := tracer.Extract(carrier); err == nil {
// If there are span links as a result of context extraction, add them as a StartSpanOption
if linksCtx, ok := spanctx.(ddtrace.SpanContextWithLinks); ok && linksCtx.SpanLinks() != nil {
opts = append(opts, tracer.WithSpanLinks(linksCtx.SpanLinks()))
}
opts = append(opts, tracer.ChildOf(spanctx))
}
next := tracer.StartSpan(cfg.consumerSpanName, opts...)
Expand Down Expand Up @@ -298,6 +302,10 @@ func startProducerSpan(cfg *config, version sarama.KafkaVersion, msg *sarama.Pro
}
// if there's a span context in the headers, use that as the parent
if spanctx, err := tracer.Extract(carrier); err == nil {
// If there are span links as a result of context extraction, add them as a StartSpanOption
if linksCtx, ok := spanctx.(ddtrace.SpanContextWithLinks); ok && linksCtx.SpanLinks() != nil {
opts = append(opts, tracer.WithSpanLinks(linksCtx.SpanLinks()))
}
opts = append(opts, tracer.ChildOf(spanctx))
}
span := tracer.StartSpan(cfg.producerSpanName, opts...)
Expand Down
8 changes: 8 additions & 0 deletions contrib/Shopify/sarama/sarama.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,10 @@ func WrapPartitionConsumer(pc sarama.PartitionConsumer, opts ...Option) sarama.P
// kafka supports headers, so try to extract a span context
carrier := NewConsumerMessageCarrier(msg)
if spanctx, err := tracer.Extract(carrier); err == nil {
// If there are span links as a result of context extraction, add them as a StartSpanOption
if linksCtx, ok := spanctx.(ddtrace.SpanContextWithLinks); ok && linksCtx.SpanLinks() != nil {
opts = append(opts, tracer.WithSpanLinks(linksCtx.SpanLinks()))
}
opts = append(opts, tracer.ChildOf(spanctx))
}
next := tracer.StartSpan(cfg.consumerSpanName, opts...)
Expand Down Expand Up @@ -301,6 +305,10 @@ func startProducerSpan(cfg *config, version sarama.KafkaVersion, msg *sarama.Pro
}
// if there's a span context in the headers, use that as the parent
if spanctx, err := tracer.Extract(carrier); err == nil {
// If there are span links as a result of context extraction, add them as a StartSpanOption
if linksCtx, ok := spanctx.(ddtrace.SpanContextWithLinks); ok && linksCtx.SpanLinks() != nil {
opts = append(opts, tracer.WithSpanLinks(linksCtx.SpanLinks()))
}
opts = append(opts, tracer.ChildOf(spanctx))
}
span := tracer.StartSpan(cfg.producerSpanName, opts...)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,10 @@ func TraceReceiveFunc(s Subscription, opts ...Option) func(ctx context.Context,
if cfg.measured {
opts = append(opts, tracer.Measured())
}
// If there are span links as a result of context extraction, add them as a StartSpanOption
if linksCtx, ok := parentSpanCtx.(ddtrace.SpanContextWithLinks); ok && linksCtx.SpanLinks() != nil {
opts = append(opts, tracer.WithSpanLinks(linksCtx.SpanLinks()))
}
span, ctx := tracer.StartSpanFromContext(ctx, cfg.receiveSpanName, opts...)
if msg.DeliveryAttempt != nil {
span.SetTag("delivery_attempt", *msg.DeliveryAttempt)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,10 @@ func (tr *KafkaTracer) StartConsumeSpan(msg Message) ddtrace.Span {
// kafka supports headers, so try to extract a span context
carrier := MessageCarrier{msg: msg}
if spanctx, err := tracer.Extract(carrier); err == nil {
// If there are span links as a result of context extraction, add them as a StartSpanOption
if linksCtx, ok := spanctx.(ddtrace.SpanContextWithLinks); ok && linksCtx.SpanLinks() != nil {
opts = append(opts, tracer.WithSpanLinks(linksCtx.SpanLinks()))
}
opts = append(opts, tracer.ChildOf(spanctx))
}
span, _ := tracer.StartSpanFromContext(tr.ctx, tr.consumerSpanName, opts...)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,10 @@ func (tr *KafkaTracer) StartProduceSpan(msg Message) ddtrace.Span {
// if there's a span context in the headers, use that as the parent
carrier := NewMessageCarrier(msg)
if spanctx, err := tracer.Extract(carrier); err == nil {
// If there are span links as a result of context extraction, add them as a StartSpanOption
if linksCtx, ok := spanctx.(ddtrace.SpanContextWithLinks); ok && linksCtx.SpanLinks() != nil {
opts = append(opts, tracer.WithSpanLinks(linksCtx.SpanLinks()))
}
opts = append(opts, tracer.ChildOf(spanctx))
}
span, _ := tracer.StartSpanFromContext(tr.ctx, tr.producerSpanName, opts...)
Expand Down
4 changes: 4 additions & 0 deletions contrib/gofiber/fiber.v2/fiber.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,10 @@ func Middleware(opts ...Option) func(c *fiber.Ctx) error {
}
}
if spanctx, err := tracer.Extract(tracer.HTTPHeadersCarrier(h)); err == nil {
// If there are span links as a result of context extraction, add them as a StartSpanOption
if linksCtx, ok := spanctx.(ddtrace.SpanContextWithLinks); ok && linksCtx.SpanLinks() != nil {
opts = append(opts, tracer.WithSpanLinks(linksCtx.SpanLinks()))
}
opts = append(opts, tracer.ChildOf(spanctx))
}
opts = append(opts, cfg.spanOpts...)
Expand Down
4 changes: 4 additions & 0 deletions contrib/google.golang.org/grpc/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,10 @@ func startSpanFromContext(
)
md, _ := metadata.FromIncomingContext(ctx) // nil is ok
if sctx, err := tracer.Extract(grpcutil.MDCarrier(md)); err == nil {
// If there are span links as a result of context extraction, add them as a StartSpanOption
if linksCtx, ok := sctx.(ddtrace.SpanContextWithLinks); ok && linksCtx.SpanLinks() != nil {
opts = append(opts, tracer.WithSpanLinks(linksCtx.SpanLinks()))
}
opts = append(opts, tracer.ChildOf(sctx))
}
return tracer.StartSpanFromContext(ctx, operation, opts...)
Expand Down
6 changes: 5 additions & 1 deletion contrib/internal/httptrace/httptrace.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,11 @@ func StartRequestSpan(r *http.Request, opts ...ddtrace.StartSpanOption) (tracer.
cfg.Tags["http.host"] = r.Host
}
if spanctx, err := tracer.Extract(tracer.HTTPHeadersCarrier(r.Header)); err == nil {
cfg.Parent = spanctx
// If there are span links as a result of context extraction, add them as a StartSpanOption
if linksCtx, ok := spanctx.(ddtrace.SpanContextWithLinks); ok && linksCtx.SpanLinks() != nil {
tracer.WithSpanLinks(linksCtx.SpanLinks())(cfg)
}
tracer.ChildOf(spanctx)(cfg)
}
for k, v := range ipTags {
cfg.Tags[k] = v
Expand Down
4 changes: 4 additions & 0 deletions contrib/segmentio/kafka.go.v0/internal/tracing/tracing.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,10 @@ func (tr *Tracer) StartConsumeSpan(ctx context.Context, msg Message) ddtrace.Spa
// kafka supports headers, so try to extract a span context
carrier := NewMessageCarrier(msg)
if spanctx, err := tracer.Extract(carrier); err == nil {
// If there are span links as a result of context extraction, add them as a StartSpanOption
if linksCtx, ok := spanctx.(ddtrace.SpanContextWithLinks); ok && linksCtx.SpanLinks() != nil {
opts = append(opts, tracer.WithSpanLinks(linksCtx.SpanLinks()))
}
opts = append(opts, tracer.ChildOf(spanctx))
}
span, _ := tracer.StartSpanFromContext(ctx, tr.consumerSpanName, opts...)
Expand Down
9 changes: 9 additions & 0 deletions contrib/twitchtv/twirp/twirp.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"net/http"
"strconv"

"gopkg.in/DataDog/dd-trace-go.v1/ddtrace"
"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/ext"
"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer"
"gopkg.in/DataDog/dd-trace-go.v1/internal/log"
Expand Down Expand Up @@ -90,6 +91,10 @@ func (wc *wrappedClient) Do(req *http.Request) (*http.Response, error) {
opts = append(opts, tracer.Tag(ext.EventSampleRate, wc.cfg.analyticsRate))
}
if spanctx, err := tracer.Extract(tracer.HTTPHeadersCarrier(req.Header)); err == nil {
// If there are span links as a result of context extraction, add them as a StartSpanOption
if linksCtx, ok := spanctx.(ddtrace.SpanContextWithLinks); ok && linksCtx.SpanLinks() != nil {
opts = append(opts, tracer.WithSpanLinks(linksCtx.SpanLinks()))
}
opts = append(opts, tracer.ChildOf(spanctx))
}

Expand Down Expand Up @@ -139,6 +144,10 @@ func WrapServer(h http.Handler, opts ...Option) http.Handler {
spanOpts = append(spanOpts, tracer.Tag(ext.EventSampleRate, cfg.analyticsRate))
}
if spanctx, err := tracer.Extract(tracer.HTTPHeadersCarrier(r.Header)); err == nil {
// If there are span links as a result of context extraction, add them as a StartSpanOption
if linksCtx, ok := spanctx.(ddtrace.SpanContextWithLinks); ok && linksCtx.SpanLinks() != nil {
spanOpts = append(spanOpts, tracer.WithSpanLinks(linksCtx.SpanLinks()))
}
spanOpts = append(spanOpts, tracer.ChildOf(spanctx))
}
span, ctx := tracer.StartSpanFromContext(r.Context(), "twirp.handler", spanOpts...)
Expand Down
4 changes: 4 additions & 0 deletions contrib/valyala/fasthttp.v1/fasthttp.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,10 @@ func WrapHandler(h fasthttp.RequestHandler, opts ...Option) fasthttp.RequestHand
ReqHeader: &fctx.Request.Header,
}
if sctx, err := tracer.Extract(fcc); err == nil {
// If there are span links as a result of context extraction, add them as a StartSpanOption
if linksCtx, ok := sctx.(ddtrace.SpanContextWithLinks); ok && linksCtx.SpanLinks() != nil {
spanOpts = append(spanOpts, tracer.WithSpanLinks(linksCtx.SpanLinks()))
}
spanOpts = append(spanOpts, tracer.ChildOf(sctx))
}
span := fasthttptrace.StartSpanFromContext(fctx, "http.request", spanOpts...)
Expand Down
28 changes: 14 additions & 14 deletions ddtrace/tracer/civisibility_tslv_msgp.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion ddtrace/tracer/span.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ type span struct {
TraceID uint64 `msg:"trace_id"` // lower 64-bits of the root span identifier
ParentID uint64 `msg:"parent_id"` // identifier of the span's direct parent
Error int32 `msg:"error"` // error status of the span; 0 means no errors
SpanLinks []ddtrace.SpanLink `msg:"span_links"` // links to other spans
SpanLinks []ddtrace.SpanLink `msg:"span_links,omitempty"` // links to other spans

goExecTraced bool `msg:"-"`
noDebugStack bool `msg:"-"` // disables debug stack traces
Expand Down
39 changes: 22 additions & 17 deletions ddtrace/tracer/span_msgp.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit abfec9a

Please sign in to comment.