From abfec9a9699d7fffda50c1dd4bd30646c8061ba2 Mon Sep 17 00:00:00 2001 From: mhlidd Date: Mon, 9 Dec 2024 11:46:44 -0500 Subject: [PATCH] Adding Span Link support for compound header tag extractions with conflicting trace IDs (#2948) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Mikayla Toffler <46911781+mtoffl01@users.noreply.github.com> Co-authored-by: Mikayla Toffler Co-authored-by: Dario Castañé --- contrib/IBM/sarama.v1/sarama.go | 8 ++ contrib/Shopify/sarama/sarama.go | 8 ++ .../go/pubsub.v1/internal/tracing/tracing.go | 4 + .../internal/tracing/consumer.go | 4 + .../internal/tracing/producer.go | 4 + contrib/gofiber/fiber.v2/fiber.go | 4 + contrib/google.golang.org/grpc/grpc.go | 4 + contrib/internal/httptrace/httptrace.go | 6 +- .../kafka.go.v0/internal/tracing/tracing.go | 4 + contrib/twitchtv/twirp/twirp.go | 9 ++ contrib/valyala/fasthttp.v1/fasthttp.go | 4 + ddtrace/tracer/civisibility_tslv_msgp.go | 28 ++--- ddtrace/tracer/span.go | 2 +- ddtrace/tracer/span_msgp.go | 39 ++++--- ddtrace/tracer/textmap.go | 105 ++++++++++++------ ddtrace/tracer/textmap_test.go | 93 +++++++++++++++- ddtrace/tracer/writer_test.go | 2 +- 17 files changed, 260 insertions(+), 68 deletions(-) diff --git a/contrib/IBM/sarama.v1/sarama.go b/contrib/IBM/sarama.v1/sarama.go index 7d9f86701e..3166977abc 100644 --- a/contrib/IBM/sarama.v1/sarama.go +++ b/contrib/IBM/sarama.v1/sarama.go @@ -74,6 +74,10 @@ func WrapPartitionConsumer(pc sarama.PartitionConsumer, opts ...Option) sarama.P // kafka supports headers, so try to extract a span context carrier := NewConsumerMessageCarrier(msg) if spanctx, err := tracer.Extract(carrier); err == nil { + // If there are span links as a result of context extraction, add them as a StartSpanOption + if linksCtx, ok := spanctx.(ddtrace.SpanContextWithLinks); ok && linksCtx.SpanLinks() != nil { + opts = append(opts, tracer.WithSpanLinks(linksCtx.SpanLinks())) + } opts = append(opts, tracer.ChildOf(spanctx)) } next := tracer.StartSpan(cfg.consumerSpanName, opts...) @@ -298,6 +302,10 @@ func startProducerSpan(cfg *config, version sarama.KafkaVersion, msg *sarama.Pro } // if there's a span context in the headers, use that as the parent if spanctx, err := tracer.Extract(carrier); err == nil { + // If there are span links as a result of context extraction, add them as a StartSpanOption + if linksCtx, ok := spanctx.(ddtrace.SpanContextWithLinks); ok && linksCtx.SpanLinks() != nil { + opts = append(opts, tracer.WithSpanLinks(linksCtx.SpanLinks())) + } opts = append(opts, tracer.ChildOf(spanctx)) } span := tracer.StartSpan(cfg.producerSpanName, opts...) diff --git a/contrib/Shopify/sarama/sarama.go b/contrib/Shopify/sarama/sarama.go index 5b96be10a7..a6bceae0e3 100644 --- a/contrib/Shopify/sarama/sarama.go +++ b/contrib/Shopify/sarama/sarama.go @@ -77,6 +77,10 @@ func WrapPartitionConsumer(pc sarama.PartitionConsumer, opts ...Option) sarama.P // kafka supports headers, so try to extract a span context carrier := NewConsumerMessageCarrier(msg) if spanctx, err := tracer.Extract(carrier); err == nil { + // If there are span links as a result of context extraction, add them as a StartSpanOption + if linksCtx, ok := spanctx.(ddtrace.SpanContextWithLinks); ok && linksCtx.SpanLinks() != nil { + opts = append(opts, tracer.WithSpanLinks(linksCtx.SpanLinks())) + } opts = append(opts, tracer.ChildOf(spanctx)) } next := tracer.StartSpan(cfg.consumerSpanName, opts...) @@ -301,6 +305,10 @@ func startProducerSpan(cfg *config, version sarama.KafkaVersion, msg *sarama.Pro } // if there's a span context in the headers, use that as the parent if spanctx, err := tracer.Extract(carrier); err == nil { + // If there are span links as a result of context extraction, add them as a StartSpanOption + if linksCtx, ok := spanctx.(ddtrace.SpanContextWithLinks); ok && linksCtx.SpanLinks() != nil { + opts = append(opts, tracer.WithSpanLinks(linksCtx.SpanLinks())) + } opts = append(opts, tracer.ChildOf(spanctx)) } span := tracer.StartSpan(cfg.producerSpanName, opts...) diff --git a/contrib/cloud.google.com/go/pubsub.v1/internal/tracing/tracing.go b/contrib/cloud.google.com/go/pubsub.v1/internal/tracing/tracing.go index 43633fbf48..d7259864bb 100644 --- a/contrib/cloud.google.com/go/pubsub.v1/internal/tracing/tracing.go +++ b/contrib/cloud.google.com/go/pubsub.v1/internal/tracing/tracing.go @@ -118,6 +118,10 @@ func TraceReceiveFunc(s Subscription, opts ...Option) func(ctx context.Context, if cfg.measured { opts = append(opts, tracer.Measured()) } + // If there are span links as a result of context extraction, add them as a StartSpanOption + if linksCtx, ok := parentSpanCtx.(ddtrace.SpanContextWithLinks); ok && linksCtx.SpanLinks() != nil { + opts = append(opts, tracer.WithSpanLinks(linksCtx.SpanLinks())) + } span, ctx := tracer.StartSpanFromContext(ctx, cfg.receiveSpanName, opts...) if msg.DeliveryAttempt != nil { span.SetTag("delivery_attempt", *msg.DeliveryAttempt) diff --git a/contrib/confluentinc/confluent-kafka-go/internal/tracing/consumer.go b/contrib/confluentinc/confluent-kafka-go/internal/tracing/consumer.go index 90678c4ed2..aca6125aa7 100644 --- a/contrib/confluentinc/confluent-kafka-go/internal/tracing/consumer.go +++ b/contrib/confluentinc/confluent-kafka-go/internal/tracing/consumer.go @@ -77,6 +77,10 @@ func (tr *KafkaTracer) StartConsumeSpan(msg Message) ddtrace.Span { // kafka supports headers, so try to extract a span context carrier := MessageCarrier{msg: msg} if spanctx, err := tracer.Extract(carrier); err == nil { + // If there are span links as a result of context extraction, add them as a StartSpanOption + if linksCtx, ok := spanctx.(ddtrace.SpanContextWithLinks); ok && linksCtx.SpanLinks() != nil { + opts = append(opts, tracer.WithSpanLinks(linksCtx.SpanLinks())) + } opts = append(opts, tracer.ChildOf(spanctx)) } span, _ := tracer.StartSpanFromContext(tr.ctx, tr.consumerSpanName, opts...) diff --git a/contrib/confluentinc/confluent-kafka-go/internal/tracing/producer.go b/contrib/confluentinc/confluent-kafka-go/internal/tracing/producer.go index 25b043017f..e7e78db042 100644 --- a/contrib/confluentinc/confluent-kafka-go/internal/tracing/producer.go +++ b/contrib/confluentinc/confluent-kafka-go/internal/tracing/producer.go @@ -67,6 +67,10 @@ func (tr *KafkaTracer) StartProduceSpan(msg Message) ddtrace.Span { // if there's a span context in the headers, use that as the parent carrier := NewMessageCarrier(msg) if spanctx, err := tracer.Extract(carrier); err == nil { + // If there are span links as a result of context extraction, add them as a StartSpanOption + if linksCtx, ok := spanctx.(ddtrace.SpanContextWithLinks); ok && linksCtx.SpanLinks() != nil { + opts = append(opts, tracer.WithSpanLinks(linksCtx.SpanLinks())) + } opts = append(opts, tracer.ChildOf(spanctx)) } span, _ := tracer.StartSpanFromContext(tr.ctx, tr.producerSpanName, opts...) diff --git a/contrib/gofiber/fiber.v2/fiber.go b/contrib/gofiber/fiber.v2/fiber.go index 7d9ef223fe..93a454417d 100644 --- a/contrib/gofiber/fiber.v2/fiber.go +++ b/contrib/gofiber/fiber.v2/fiber.go @@ -62,6 +62,10 @@ func Middleware(opts ...Option) func(c *fiber.Ctx) error { } } if spanctx, err := tracer.Extract(tracer.HTTPHeadersCarrier(h)); err == nil { + // If there are span links as a result of context extraction, add them as a StartSpanOption + if linksCtx, ok := spanctx.(ddtrace.SpanContextWithLinks); ok && linksCtx.SpanLinks() != nil { + opts = append(opts, tracer.WithSpanLinks(linksCtx.SpanLinks())) + } opts = append(opts, tracer.ChildOf(spanctx)) } opts = append(opts, cfg.spanOpts...) diff --git a/contrib/google.golang.org/grpc/grpc.go b/contrib/google.golang.org/grpc/grpc.go index 323edbb542..24ca5b7d61 100644 --- a/contrib/google.golang.org/grpc/grpc.go +++ b/contrib/google.golang.org/grpc/grpc.go @@ -71,6 +71,10 @@ func startSpanFromContext( ) md, _ := metadata.FromIncomingContext(ctx) // nil is ok if sctx, err := tracer.Extract(grpcutil.MDCarrier(md)); err == nil { + // If there are span links as a result of context extraction, add them as a StartSpanOption + if linksCtx, ok := sctx.(ddtrace.SpanContextWithLinks); ok && linksCtx.SpanLinks() != nil { + opts = append(opts, tracer.WithSpanLinks(linksCtx.SpanLinks())) + } opts = append(opts, tracer.ChildOf(sctx)) } return tracer.StartSpanFromContext(ctx, operation, opts...) diff --git a/contrib/internal/httptrace/httptrace.go b/contrib/internal/httptrace/httptrace.go index 92069e03d8..4aad5aba38 100644 --- a/contrib/internal/httptrace/httptrace.go +++ b/contrib/internal/httptrace/httptrace.go @@ -51,7 +51,11 @@ func StartRequestSpan(r *http.Request, opts ...ddtrace.StartSpanOption) (tracer. cfg.Tags["http.host"] = r.Host } if spanctx, err := tracer.Extract(tracer.HTTPHeadersCarrier(r.Header)); err == nil { - cfg.Parent = spanctx + // If there are span links as a result of context extraction, add them as a StartSpanOption + if linksCtx, ok := spanctx.(ddtrace.SpanContextWithLinks); ok && linksCtx.SpanLinks() != nil { + tracer.WithSpanLinks(linksCtx.SpanLinks())(cfg) + } + tracer.ChildOf(spanctx)(cfg) } for k, v := range ipTags { cfg.Tags[k] = v diff --git a/contrib/segmentio/kafka.go.v0/internal/tracing/tracing.go b/contrib/segmentio/kafka.go.v0/internal/tracing/tracing.go index 9b5f7bbb9b..34c6af01bd 100644 --- a/contrib/segmentio/kafka.go.v0/internal/tracing/tracing.go +++ b/contrib/segmentio/kafka.go.v0/internal/tracing/tracing.go @@ -49,6 +49,10 @@ func (tr *Tracer) StartConsumeSpan(ctx context.Context, msg Message) ddtrace.Spa // kafka supports headers, so try to extract a span context carrier := NewMessageCarrier(msg) if spanctx, err := tracer.Extract(carrier); err == nil { + // If there are span links as a result of context extraction, add them as a StartSpanOption + if linksCtx, ok := spanctx.(ddtrace.SpanContextWithLinks); ok && linksCtx.SpanLinks() != nil { + opts = append(opts, tracer.WithSpanLinks(linksCtx.SpanLinks())) + } opts = append(opts, tracer.ChildOf(spanctx)) } span, _ := tracer.StartSpanFromContext(ctx, tr.consumerSpanName, opts...) diff --git a/contrib/twitchtv/twirp/twirp.go b/contrib/twitchtv/twirp/twirp.go index 19515575f5..6178d2ff43 100644 --- a/contrib/twitchtv/twirp/twirp.go +++ b/contrib/twitchtv/twirp/twirp.go @@ -14,6 +14,7 @@ import ( "net/http" "strconv" + "gopkg.in/DataDog/dd-trace-go.v1/ddtrace" "gopkg.in/DataDog/dd-trace-go.v1/ddtrace/ext" "gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer" "gopkg.in/DataDog/dd-trace-go.v1/internal/log" @@ -90,6 +91,10 @@ func (wc *wrappedClient) Do(req *http.Request) (*http.Response, error) { opts = append(opts, tracer.Tag(ext.EventSampleRate, wc.cfg.analyticsRate)) } if spanctx, err := tracer.Extract(tracer.HTTPHeadersCarrier(req.Header)); err == nil { + // If there are span links as a result of context extraction, add them as a StartSpanOption + if linksCtx, ok := spanctx.(ddtrace.SpanContextWithLinks); ok && linksCtx.SpanLinks() != nil { + opts = append(opts, tracer.WithSpanLinks(linksCtx.SpanLinks())) + } opts = append(opts, tracer.ChildOf(spanctx)) } @@ -139,6 +144,10 @@ func WrapServer(h http.Handler, opts ...Option) http.Handler { spanOpts = append(spanOpts, tracer.Tag(ext.EventSampleRate, cfg.analyticsRate)) } if spanctx, err := tracer.Extract(tracer.HTTPHeadersCarrier(r.Header)); err == nil { + // If there are span links as a result of context extraction, add them as a StartSpanOption + if linksCtx, ok := spanctx.(ddtrace.SpanContextWithLinks); ok && linksCtx.SpanLinks() != nil { + spanOpts = append(spanOpts, tracer.WithSpanLinks(linksCtx.SpanLinks())) + } spanOpts = append(spanOpts, tracer.ChildOf(spanctx)) } span, ctx := tracer.StartSpanFromContext(r.Context(), "twirp.handler", spanOpts...) diff --git a/contrib/valyala/fasthttp.v1/fasthttp.go b/contrib/valyala/fasthttp.v1/fasthttp.go index 7d803b8652..50dafb7cab 100644 --- a/contrib/valyala/fasthttp.v1/fasthttp.go +++ b/contrib/valyala/fasthttp.v1/fasthttp.go @@ -46,6 +46,10 @@ func WrapHandler(h fasthttp.RequestHandler, opts ...Option) fasthttp.RequestHand ReqHeader: &fctx.Request.Header, } if sctx, err := tracer.Extract(fcc); err == nil { + // If there are span links as a result of context extraction, add them as a StartSpanOption + if linksCtx, ok := sctx.(ddtrace.SpanContextWithLinks); ok && linksCtx.SpanLinks() != nil { + spanOpts = append(spanOpts, tracer.WithSpanLinks(linksCtx.SpanLinks())) + } spanOpts = append(spanOpts, tracer.ChildOf(sctx)) } span := fasthttptrace.StartSpanFromContext(fctx, "http.request", spanOpts...) diff --git a/ddtrace/tracer/civisibility_tslv_msgp.go b/ddtrace/tracer/civisibility_tslv_msgp.go index 63fa4b8499..37c4f06a52 100644 --- a/ddtrace/tracer/civisibility_tslv_msgp.go +++ b/ddtrace/tracer/civisibility_tslv_msgp.go @@ -497,25 +497,25 @@ func (z *tslvSpan) DecodeMsg(dc *msgp.Reader) (err error) { case "test_session_id": z.SessionID, err = dc.ReadUint64() if err != nil { - err = msgp.WrapError(err, "SessionId") + err = msgp.WrapError(err, "SessionID") return } case "test_module_id": z.ModuleID, err = dc.ReadUint64() if err != nil { - err = msgp.WrapError(err, "ModuleId") + err = msgp.WrapError(err, "ModuleID") return } case "test_suite_id": z.SuiteID, err = dc.ReadUint64() if err != nil { - err = msgp.WrapError(err, "SuiteId") + err = msgp.WrapError(err, "SuiteID") return } case "itr_correlation_id": z.CorrelationID, err = dc.ReadString() if err != nil { - err = msgp.WrapError(err, "CorrelationId") + err = msgp.WrapError(err, "CorrelationID") return } case "name": @@ -651,7 +651,7 @@ func (z *tslvSpan) DecodeMsg(dc *msgp.Reader) (err error) { // EncodeMsg implements msgp.Encodable func (z *tslvSpan) EncodeMsg(en *msgp.Writer) (err error) { - // omitempty: check for empty values + // check for omitted fields zb0001Len := uint32(16) var zb0001Mask uint16 /* 16 bits */ _ = zb0001Mask @@ -699,7 +699,7 @@ func (z *tslvSpan) EncodeMsg(en *msgp.Writer) (err error) { if zb0001Len == 0 { return } - if (zb0001Mask & 0x1) == 0 { // if not empty + if (zb0001Mask & 0x1) == 0 { // if not omitted // write "test_session_id" err = en.Append(0xaf, 0x74, 0x65, 0x73, 0x74, 0x5f, 0x73, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x5f, 0x69, 0x64) if err != nil { @@ -711,7 +711,7 @@ func (z *tslvSpan) EncodeMsg(en *msgp.Writer) (err error) { return } } - if (zb0001Mask & 0x2) == 0 { // if not empty + if (zb0001Mask & 0x2) == 0 { // if not omitted // write "test_module_id" err = en.Append(0xae, 0x74, 0x65, 0x73, 0x74, 0x5f, 0x6d, 0x6f, 0x64, 0x75, 0x6c, 0x65, 0x5f, 0x69, 0x64) if err != nil { @@ -723,7 +723,7 @@ func (z *tslvSpan) EncodeMsg(en *msgp.Writer) (err error) { return } } - if (zb0001Mask & 0x4) == 0 { // if not empty + if (zb0001Mask & 0x4) == 0 { // if not omitted // write "test_suite_id" err = en.Append(0xad, 0x74, 0x65, 0x73, 0x74, 0x5f, 0x73, 0x75, 0x69, 0x74, 0x65, 0x5f, 0x69, 0x64) if err != nil { @@ -735,7 +735,7 @@ func (z *tslvSpan) EncodeMsg(en *msgp.Writer) (err error) { return } } - if (zb0001Mask & 0x8) == 0 { // if not empty + if (zb0001Mask & 0x8) == 0 { // if not omitted // write "itr_correlation_id" err = en.Append(0xb2, 0x69, 0x74, 0x72, 0x5f, 0x63, 0x6f, 0x72, 0x72, 0x65, 0x6c, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x5f, 0x69, 0x64) if err != nil { @@ -807,7 +807,7 @@ func (z *tslvSpan) EncodeMsg(en *msgp.Writer) (err error) { err = msgp.WrapError(err, "Duration") return } - if (zb0001Mask & 0x400) == 0 { // if not empty + if (zb0001Mask & 0x400) == 0 { // if not omitted // write "span_id" err = en.Append(0xa7, 0x73, 0x70, 0x61, 0x6e, 0x5f, 0x69, 0x64) if err != nil { @@ -819,7 +819,7 @@ func (z *tslvSpan) EncodeMsg(en *msgp.Writer) (err error) { return } } - if (zb0001Mask & 0x800) == 0 { // if not empty + if (zb0001Mask & 0x800) == 0 { // if not omitted // write "trace_id" err = en.Append(0xa8, 0x74, 0x72, 0x61, 0x63, 0x65, 0x5f, 0x69, 0x64) if err != nil { @@ -831,7 +831,7 @@ func (z *tslvSpan) EncodeMsg(en *msgp.Writer) (err error) { return } } - if (zb0001Mask & 0x1000) == 0 { // if not empty + if (zb0001Mask & 0x1000) == 0 { // if not omitted // write "parent_id" err = en.Append(0xa9, 0x70, 0x61, 0x72, 0x65, 0x6e, 0x74, 0x5f, 0x69, 0x64) if err != nil { @@ -853,7 +853,7 @@ func (z *tslvSpan) EncodeMsg(en *msgp.Writer) (err error) { err = msgp.WrapError(err, "Error") return } - if (zb0001Mask & 0x4000) == 0 { // if not empty + if (zb0001Mask & 0x4000) == 0 { // if not omitted // write "meta" err = en.Append(0xa4, 0x6d, 0x65, 0x74, 0x61) if err != nil { @@ -877,7 +877,7 @@ func (z *tslvSpan) EncodeMsg(en *msgp.Writer) (err error) { } } } - if (zb0001Mask & 0x8000) == 0 { // if not empty + if (zb0001Mask & 0x8000) == 0 { // if not omitted // write "metrics" err = en.Append(0xa7, 0x6d, 0x65, 0x74, 0x72, 0x69, 0x63, 0x73) if err != nil { diff --git a/ddtrace/tracer/span.go b/ddtrace/tracer/span.go index ebfa681fb3..27437ff5c4 100644 --- a/ddtrace/tracer/span.go +++ b/ddtrace/tracer/span.go @@ -78,7 +78,7 @@ type span struct { TraceID uint64 `msg:"trace_id"` // lower 64-bits of the root span identifier ParentID uint64 `msg:"parent_id"` // identifier of the span's direct parent Error int32 `msg:"error"` // error status of the span; 0 means no errors - SpanLinks []ddtrace.SpanLink `msg:"span_links"` // links to other spans + SpanLinks []ddtrace.SpanLink `msg:"span_links,omitempty"` // links to other spans goExecTraced bool `msg:"-"` noDebugStack bool `msg:"-"` // disables debug stack traces diff --git a/ddtrace/tracer/span_msgp.go b/ddtrace/tracer/span_msgp.go index c6cb8274a7..c4a65ef723 100644 --- a/ddtrace/tracer/span_msgp.go +++ b/ddtrace/tracer/span_msgp.go @@ -4,7 +4,6 @@ package tracer import ( "github.com/tinylib/msgp/msgp" - "gopkg.in/DataDog/dd-trace-go.v1/ddtrace" ) @@ -273,7 +272,7 @@ func (z *span) DecodeMsg(dc *msgp.Reader) (err error) { // EncodeMsg implements msgp.Encodable func (z *span) EncodeMsg(en *msgp.Writer) (err error) { - // omitempty: check for empty values + // check for omitted fields zb0001Len := uint32(14) var zb0001Mask uint16 /* 14 bits */ _ = zb0001Mask @@ -285,6 +284,10 @@ func (z *span) EncodeMsg(en *msgp.Writer) (err error) { zb0001Len-- zb0001Mask |= 0x100 } + if z.SpanLinks == nil { + zb0001Len-- + zb0001Mask |= 0x2000 + } // variable map header, size zb0001Len err = en.Append(0x80 | uint8(zb0001Len)) if err != nil { @@ -353,7 +356,7 @@ func (z *span) EncodeMsg(en *msgp.Writer) (err error) { err = msgp.WrapError(err, "Duration") return } - if (zb0001Mask & 0x40) == 0 { // if not empty + if (zb0001Mask & 0x40) == 0 { // if not omitted // write "meta" err = en.Append(0xa4, 0x6d, 0x65, 0x74, 0x61) if err != nil { @@ -387,7 +390,7 @@ func (z *span) EncodeMsg(en *msgp.Writer) (err error) { err = msgp.WrapError(err, "MetaStruct") return } - if (zb0001Mask & 0x100) == 0 { // if not empty + if (zb0001Mask & 0x100) == 0 { // if not omitted // write "metrics" err = en.Append(0xa7, 0x6d, 0x65, 0x74, 0x72, 0x69, 0x63, 0x73) if err != nil { @@ -451,22 +454,24 @@ func (z *span) EncodeMsg(en *msgp.Writer) (err error) { err = msgp.WrapError(err, "Error") return } - // write "span_links" - err = en.Append(0xaa, 0x73, 0x70, 0x61, 0x6e, 0x5f, 0x6c, 0x69, 0x6e, 0x6b, 0x73) - if err != nil { - return - } - err = en.WriteArrayHeader(uint32(len(z.SpanLinks))) - if err != nil { - err = msgp.WrapError(err, "SpanLinks") - return - } - for za0005 := range z.SpanLinks { - err = z.SpanLinks[za0005].EncodeMsg(en) + if (zb0001Mask & 0x2000) == 0 { // if not omitted + // write "span_links" + err = en.Append(0xaa, 0x73, 0x70, 0x61, 0x6e, 0x5f, 0x6c, 0x69, 0x6e, 0x6b, 0x73) + if err != nil { + return + } + err = en.WriteArrayHeader(uint32(len(z.SpanLinks))) if err != nil { - err = msgp.WrapError(err, "SpanLinks", za0005) + err = msgp.WrapError(err, "SpanLinks") return } + for za0005 := range z.SpanLinks { + err = z.SpanLinks[za0005].EncodeMsg(en) + if err != nil { + err = msgp.WrapError(err, "SpanLinks", za0005) + return + } + } } return } diff --git a/ddtrace/tracer/textmap.go b/ddtrace/tracer/textmap.go index 1fee0ba689..f7367adb83 100644 --- a/ddtrace/tracer/textmap.go +++ b/ddtrace/tracer/textmap.go @@ -262,55 +262,90 @@ func (p *chainedPropagator) Inject(spanCtx ddtrace.SpanContext, carrier interfac return nil } -// Extract implements Propagator. This method will attempt to extract the context +// Extract implements Propagator. This method will attempt to extract a span context // based on the precedence order of the propagators. Generally, the first valid -// trace context that could be extracted will be returned, and other extractors will -// be ignored. However, the W3C tracestate header value will always be extracted and -// stored in the local trace context even if a previous propagator has already succeeded -// so long as the trace-ids match. +// trace context that could be extracted will be returned. However, the W3C tracestate +// header value will always be extracted and stored in the local trace context even if +// a previous propagator has succeeded so long as the trace-ids match. +// Furthermore, if we have already successfully extracted a trace context and a +// subsequent trace context has conflicting trace information, such information will +// be relayed in the returned SpanContext with a SpanLink. func (p *chainedPropagator) Extract(carrier interface{}) (ddtrace.SpanContext, error) { var ctx ddtrace.SpanContext + var links []ddtrace.SpanLink for _, v := range p.extractors { - if ctx != nil { - // A local trace context has already been extracted. - pw3c, isW3C := v.(*propagatorW3c) - if !isW3C { - continue // Ignore other propagators. + firstExtract := (ctx == nil) // ctx stores the most recently extracted ctx across iterations; if it's nil, no extractor has run yet + extractedCtx, err := v.Extract(carrier) + if firstExtract { + if err != nil && err != ErrSpanContextNotFound { // We only care if the first extraction returns an error because this breaks distributed tracing + return nil, err } - w3cCtx, err := pw3c.Extract(carrier) - if err == nil && w3cCtx.(*spanContext).TraceID128() == ctx.(*spanContext).TraceID128() { - pw3c.propagateTracestate(ctx.(*spanContext), w3cCtx.(*spanContext)) - if w3cCtx.SpanID() != ctx.SpanID() { - var ddCtx *spanContext - if ddp := getDatadogPropagator(p); ddp != nil { - if ddSpanCtx, err := ddp.Extract(carrier); err == nil { - ddCtx, _ = ddSpanCtx.(*spanContext) + if p.onlyExtractFirst { // Return early if only performing one extraction + return extractedCtx.(*spanContext), nil + } + ctx = extractedCtx + } else { // A local trace context has already been extracted + extractedCtx2, ok1 := extractedCtx.(*spanContext) + ctx2, ok2 := ctx.(*spanContext) + // If we can't cast to spanContext, we can't propgate tracestate or create span links + if !ok1 || !ok2 { + continue + } + if extractedCtx2.TraceID128() == ctx2.TraceID128() { + if pW3C, ok := v.(*propagatorW3c); ok { + pW3C.propagateTracestate(ctx2, extractedCtx2) + // If trace IDs match but span IDs do not, use spanID from `*propagatorW3c` extractedCtx for parenting + if extractedCtx2.SpanID() != ctx2.SpanID() { + var ddCtx *spanContext + // Grab the datadog-propagated spancontext again + if ddp := getDatadogPropagator(p); ddp != nil { + if ddSpanCtx, err := ddp.Extract(carrier); err == nil { + ddCtx, _ = ddSpanCtx.(*spanContext) + } } + overrideDatadogParentID(ctx2, extractedCtx2, ddCtx) } - overrideDatadogParentID(ctx.(*spanContext), w3cCtx.(*spanContext), ddCtx) } + } else { // Trace IDs do not match - create span links + link := ddtrace.SpanLink{TraceID: extractedCtx2.TraceID(), SpanID: extractedCtx2.SpanID(), TraceIDHigh: extractedCtx2.TraceIDUpper(), Attributes: map[string]string{"reason": "terminated_context", "context_headers": getPropagatorName(v)}} + if trace := extractedCtx2.trace; trace != nil { + if flags := uint32(*trace.priority); flags > 0 { // Set the flags based on the sampling priority + link.Flags = 1 + } else { + link.Flags = 0 + } + link.Tracestate = extractedCtx2.trace.propagatingTag(tracestateHeader) + } + links = append(links, link) } - break - } - var err error - ctx, err = v.Extract(carrier) - if ctx != nil { - if p.onlyExtractFirst { - // Return early if the customer configured that only the first successful - // extraction should occur. - return ctx, nil - } - } else if err != ErrSpanContextNotFound { - return nil, err } } + // 0 successful extractions if ctx == nil { return nil, ErrSpanContextNotFound } + if spCtx, ok := ctx.(*spanContext); ok && len(links) > 0 { + spCtx.spanLinks = links + } log.Debug("Extracted span context: %#v", ctx) return ctx, nil } +func getPropagatorName(p Propagator) string { + switch p.(type) { + case *propagator: + return "datadog" + case *propagatorB3: + return "b3multi" + case *propagatorB3SingleHeader: + return "b3" + case *propagatorW3c: + return "tracecontext" + default: + return "" + } +} + // propagateTracestate will add the tracestate propagating tag to the given // *spanContext. The W3C trace context will be extracted from the provided // carrier. The trace id of this W3C trace context must match the trace id @@ -503,9 +538,13 @@ func getDatadogPropagator(cp *chainedPropagator) *propagator { return nil } -// overrideDatadogParentID overrides the span ID of a context with the ID extracted from tracecontext headers -// if the reparenting ID is not set on the context, the span ID from datadog headers is used. +// overrideDatadogParentID overrides the span ID of a context with the ID extracted from tracecontext headers. +// If the reparenting ID is not set on the context, the span ID from datadog headers is used. +// spanContexts are passed by reference to avoid copying lock value in spanContext type func overrideDatadogParentID(ctx, w3cCtx, ddCtx *spanContext) { + if ctx == nil || w3cCtx == nil || ddCtx == nil { + return + } ctx.spanID = w3cCtx.spanID if w3cCtx.reparentID != "" { ctx.reparentID = w3cCtx.reparentID diff --git a/ddtrace/tracer/textmap_test.go b/ddtrace/tracer/textmap_test.go index bbbd3b52c9..35a84523aa 100644 --- a/ddtrace/tracer/textmap_test.go +++ b/ddtrace/tracer/textmap_test.go @@ -209,7 +209,9 @@ func TestTextMapExtractTracestatePropagation(t *testing.T) { ctx, err := tracer.Extract(headers) assert.Nil(err) sctx, ok := ctx.(*spanContext) - assert.True(ok) + if !ok { + t.Fail() + } assert.Equal("00000000000000000000000000000004", sctx.traceID.HexEncoded()) if tc.conflictingParentID == true { // tracecontext span id should be used @@ -1957,6 +1959,95 @@ func TestTraceContextPrecedence(t *testing.T) { assert.Equal(2, p) } +// Assert that span links are generated only when trace headers contain divergent trace IDs +func TestSpanLinks(t *testing.T) { + s, c := httpmem.ServerAndClient(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(404) + })) + defer s.Close() + t.Run("Links on divergent trace IDs", func(t *testing.T) { + carrier := TextMapCarrier{ + DefaultTraceIDHeader: "1", + DefaultParentIDHeader: "1", + DefaultPriorityHeader: "3", + traceparentHeader: "00-00000000000000000000000000000002-0000000000000002-01", + tracestateHeader: "dd=s:1;o:rum;t.usr.id:baz64~~", + b3TraceIDHeader: "3", + b3SpanIDHeader: "3", + } + w3cLink := ddtrace.SpanLink{TraceID: 2, TraceIDHigh: 0, SpanID: 2, Tracestate: "dd=s:1;o:rum;t.usr.id:baz64~~", Flags: 1, Attributes: map[string]string{"reason": "terminated_context", "context_headers": "tracecontext"}} + ddLink := ddtrace.SpanLink{TraceID: 1, TraceIDHigh: 0, SpanID: 1, Flags: 1, Attributes: map[string]string{"reason": "terminated_context", "context_headers": "datadog"}} + b3Link := ddtrace.SpanLink{TraceID: 3, TraceIDHigh: 0, SpanID: 3, Tracestate: "", Flags: 0, Attributes: map[string]string{"reason": "terminated_context", "context_headers": "b3multi"}} + tests := []struct { + name string + envVal string + out []ddtrace.SpanLink + tid traceID + }{ + { + name: "datadog first", + envVal: "datadog,tracecontext,b3", + out: []ddtrace.SpanLink{w3cLink, b3Link}, + tid: traceIDFrom64Bits(1), + }, + { + name: "tracecontext first", + envVal: "tracecontext,datadog,b3", + out: []ddtrace.SpanLink{ddLink, b3Link}, + tid: traceIDFrom64Bits(2), + }, + { + name: "b3 first", + envVal: "b3,tracecontext,datadog", + out: []ddtrace.SpanLink{w3cLink, ddLink}, + tid: traceIDFrom64Bits(3), + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if tt.envVal != "" { + t.Setenv(headerPropagationStyleExtract, tt.envVal) + } + tracer := newTracer(WithHTTPClient(c)) + defer tracer.Stop() + assert := assert.New(t) + ctx, err := tracer.Extract(carrier) + if err != nil { + t.Fatal(err) + } + sctx, ok := ctx.(*spanContext) + assert.True(ok) + + assert.Equal(tt.tid, sctx.traceID) + assert.Len(sctx.spanLinks, 2) + assert.Contains(sctx.spanLinks, tt.out[0]) + assert.Contains(sctx.spanLinks, tt.out[1]) + }) + } + }) + t.Run("No links on equal trace IDs", func(t *testing.T) { + carrier := TextMapCarrier{ + DefaultTraceIDHeader: "1", + DefaultParentIDHeader: "1", + DefaultPriorityHeader: "3", + traceparentHeader: "00-00000000000000000000000000000001-0000000000000002-01", + tracestateHeader: "dd=s:1;o:rum;t.usr.id:baz64~~", + } + tracer := newTracer(WithHTTPClient(c)) + defer tracer.Stop() + assert := assert.New(t) + ctx, err := tracer.Extract(carrier) + if err != nil { + t.Fatal(err) + } + sctx, ok := ctx.(*spanContext) + assert.True(ok) + + assert.Equal(traceIDFrom64Bits(1), sctx.traceID) + assert.Len(sctx.spanLinks, 0) + }) +} + func TestW3CExtractsBaggage(t *testing.T) { tracer := newTracer() defer tracer.Stop() diff --git a/ddtrace/tracer/writer_test.go b/ddtrace/tracer/writer_test.go index 38af02b2d5..bcdf529a58 100644 --- a/ddtrace/tracer/writer_test.go +++ b/ddtrace/tracer/writer_test.go @@ -387,7 +387,7 @@ func TestTraceWriterFlushRetries(t *testing.T) { sentCounts := map[string]int64{ "datadog.tracer.decode_error": 1, - "datadog.tracer.flush_bytes": 197, + "datadog.tracer.flush_bytes": 185, "datadog.tracer.flush_traces": 1, } droppedCounts := map[string]int64{