diff --git a/contrib/confluentinc/confluent-kafka-go/internal/tracing/consumer.go b/contrib/confluentinc/confluent-kafka-go/internal/tracing/consumer.go new file mode 100644 index 0000000000..90678c4ed2 --- /dev/null +++ b/contrib/confluentinc/confluent-kafka-go/internal/tracing/consumer.go @@ -0,0 +1,86 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2016 Datadog, Inc. + +package tracing + +import ( + "math" + + "gopkg.in/DataDog/dd-trace-go.v1/ddtrace" + "gopkg.in/DataDog/dd-trace-go.v1/ddtrace/ext" + "gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer" +) + +func WrapConsumeEventsChannel[E any, TE Event](tr *KafkaTracer, in chan E, consumer Consumer, translateFn func(E) TE) chan E { + // in will be nil when consuming via the events channel is not enabled + if in == nil { + return nil + } + + out := make(chan E, 1) + go func() { + defer close(out) + for evt := range in { + tEvt := translateFn(evt) + var next ddtrace.Span + + // only trace messages + if msg, ok := tEvt.KafkaMessage(); ok { + next = tr.StartConsumeSpan(msg) + tr.SetConsumeCheckpoint(msg) + } else if offset, ok := tEvt.KafkaOffsetsCommitted(); ok { + tr.TrackCommitOffsets(offset.GetOffsets(), offset.GetError()) + tr.TrackHighWatermarkOffset(offset.GetOffsets(), consumer) + } + + out <- evt + + if tr.PrevSpan != nil { + tr.PrevSpan.Finish() + } + tr.PrevSpan = next + } + // finish any remaining span + if tr.PrevSpan != nil { + tr.PrevSpan.Finish() + tr.PrevSpan = nil + } + }() + return out +} + +func (tr *KafkaTracer) StartConsumeSpan(msg Message) ddtrace.Span { + opts := []tracer.StartSpanOption{ + tracer.ServiceName(tr.consumerServiceName), + tracer.ResourceName("Consume Topic " + msg.GetTopicPartition().GetTopic()), + tracer.SpanType(ext.SpanTypeMessageConsumer), + tracer.Tag(ext.MessagingKafkaPartition, msg.GetTopicPartition().GetPartition()), + tracer.Tag("offset", msg.GetTopicPartition().GetOffset()), + tracer.Tag(ext.Component, ComponentName(tr.ckgoVersion)), + tracer.Tag(ext.SpanKind, ext.SpanKindConsumer), + tracer.Tag(ext.MessagingSystem, ext.MessagingSystemKafka), + tracer.Measured(), + } + if tr.bootstrapServers != "" { + opts = append(opts, tracer.Tag(ext.KafkaBootstrapServers, tr.bootstrapServers)) + } + if tr.tagFns != nil { + for key, tagFn := range tr.tagFns { + opts = append(opts, tracer.Tag(key, tagFn(msg))) + } + } + if !math.IsNaN(tr.analyticsRate) { + opts = append(opts, tracer.Tag(ext.EventSampleRate, tr.analyticsRate)) + } + // kafka supports headers, so try to extract a span context + carrier := MessageCarrier{msg: msg} + if spanctx, err := tracer.Extract(carrier); err == nil { + opts = append(opts, tracer.ChildOf(spanctx)) + } + span, _ := tracer.StartSpanFromContext(tr.ctx, tr.consumerSpanName, opts...) + // reinject the span context so consumers can pick it up + tracer.Inject(span.Context(), carrier) + return span +} diff --git a/contrib/confluentinc/confluent-kafka-go/internal/tracing/dsm.go b/contrib/confluentinc/confluent-kafka-go/internal/tracing/dsm.go new file mode 100644 index 0000000000..d27dc18ab8 --- /dev/null +++ b/contrib/confluentinc/confluent-kafka-go/internal/tracing/dsm.go @@ -0,0 +1,88 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). 
+// Copyright 2016 Datadog, Inc. + +package tracing + +import ( + "context" + + "gopkg.in/DataDog/dd-trace-go.v1/datastreams" + "gopkg.in/DataDog/dd-trace-go.v1/datastreams/options" + "gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer" +) + +func (tr *KafkaTracer) TrackCommitOffsets(offsets []TopicPartition, err error) { + if err != nil || tr.groupID == "" || !tr.dsmEnabled { + return + } + for _, tp := range offsets { + tracer.TrackKafkaCommitOffset(tr.groupID, tp.GetTopic(), tp.GetPartition(), tp.GetOffset()) + } +} + +func (tr *KafkaTracer) TrackHighWatermarkOffset(offsets []TopicPartition, consumer Consumer) { + if !tr.dsmEnabled { + return + } + for _, tp := range offsets { + if _, high, err := consumer.GetWatermarkOffsets(tp.GetTopic(), tp.GetPartition()); err == nil { + tracer.TrackKafkaHighWatermarkOffset("", tp.GetTopic(), tp.GetPartition(), high) + } + } +} + +func (tr *KafkaTracer) TrackProduceOffsets(msg Message) { + err := msg.GetTopicPartition().GetError() + if err != nil || !tr.dsmEnabled || msg.GetTopicPartition().GetTopic() == "" { + return + } + tp := msg.GetTopicPartition() + tracer.TrackKafkaProduceOffset(tp.GetTopic(), tp.GetPartition(), tp.GetOffset()) +} + +func (tr *KafkaTracer) SetConsumeCheckpoint(msg Message) { + if !tr.dsmEnabled || msg == nil { + return + } + edges := []string{"direction:in", "topic:" + msg.GetTopicPartition().GetTopic(), "type:kafka"} + if tr.groupID != "" { + edges = append(edges, "group:"+tr.groupID) + } + carrier := NewMessageCarrier(msg) + ctx, ok := tracer.SetDataStreamsCheckpointWithParams( + datastreams.ExtractFromBase64Carrier(context.Background(), carrier), + options.CheckpointParams{PayloadSize: getMsgSize(msg)}, + edges..., + ) + if !ok { + return + } + datastreams.InjectToBase64Carrier(ctx, carrier) +} + +func (tr *KafkaTracer) SetProduceCheckpoint(msg Message) { + if !tr.dsmEnabled || msg == nil { + return + } + edges := []string{"direction:out", "topic:" + msg.GetTopicPartition().GetTopic(), "type:kafka"} + carrier := NewMessageCarrier(msg) + ctx, ok := tracer.SetDataStreamsCheckpointWithParams( + datastreams.ExtractFromBase64Carrier(context.Background(), carrier), + options.CheckpointParams{PayloadSize: getMsgSize(msg)}, + edges..., + ) + if !ok || tr.librdKafkaVersion < 0x000b0400 { + // headers not supported before librdkafka >=0.11.4 + return + } + datastreams.InjectToBase64Carrier(ctx, carrier) +} + +func getMsgSize(msg Message) (size int64) { + for _, header := range msg.GetHeaders() { + size += int64(len(header.GetKey()) + len(header.GetValue())) + } + return size + int64(len(msg.GetValue())+len(msg.GetKey())) +} diff --git a/contrib/confluentinc/confluent-kafka-go/internal/tracing/kafka_tracer.go b/contrib/confluentinc/confluent-kafka-go/internal/tracing/kafka_tracer.go new file mode 100644 index 0000000000..b293c698ea --- /dev/null +++ b/contrib/confluentinc/confluent-kafka-go/internal/tracing/kafka_tracer.go @@ -0,0 +1,142 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2016 Datadog, Inc. 
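As a usage sketch of how the checkpointing above is switched on: SetConsumeCheckpoint and SetProduceCheckpoint return immediately unless Data Streams is enabled through WithDataStreams() or DD_DATA_STREAMS_ENABLED=true. Broker address, group id, and tracer setup below are hypothetical.

```go
package main

import (
	"github.com/confluentinc/confluent-kafka-go/v2/kafka"

	ddkafka "gopkg.in/DataDog/dd-trace-go.v1/contrib/confluentinc/confluent-kafka-go/kafka.v2"
	"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer"
)

func main() {
	tracer.Start()
	defer tracer.Stop()

	// WithDataStreams flips dsmEnabled, so consumed and produced messages get
	// pathway context extracted from / injected into their headers.
	c, err := ddkafka.NewConsumer(&kafka.ConfigMap{
		"bootstrap.servers": "localhost:9092", // hypothetical broker
		"group.id":          "my-group",       // tags the DSM "group:" edge and commit offsets
	}, ddkafka.WithDataStreams())
	if err != nil {
		panic(err)
	}
	defer c.Close()
}
```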
+ +package tracing + +import ( + "context" + "math" + "net" + "strings" + + "gopkg.in/DataDog/dd-trace-go.v1/ddtrace" + "gopkg.in/DataDog/dd-trace-go.v1/internal" + "gopkg.in/DataDog/dd-trace-go.v1/internal/namingschema" +) + +const defaultServiceName = "kafka" + +type KafkaTracer struct { + PrevSpan ddtrace.Span + ctx context.Context + consumerServiceName string + producerServiceName string + consumerSpanName string + producerSpanName string + analyticsRate float64 + bootstrapServers string + groupID string + tagFns map[string]func(msg Message) interface{} + dsmEnabled bool + ckgoVersion CKGoVersion + librdKafkaVersion int +} + +func (tr *KafkaTracer) DSMEnabled() bool { + return tr.dsmEnabled +} + +// An Option customizes the KafkaTracer. +type Option func(tr *KafkaTracer) + +func NewKafkaTracer(ckgoVersion CKGoVersion, librdKafkaVersion int, opts ...Option) *KafkaTracer { + tr := &KafkaTracer{ + ctx: context.Background(), + // analyticsRate: globalconfig.AnalyticsRate(), + analyticsRate: math.NaN(), + ckgoVersion: ckgoVersion, + librdKafkaVersion: librdKafkaVersion, + } + tr.dsmEnabled = internal.BoolEnv("DD_DATA_STREAMS_ENABLED", false) + if internal.BoolEnv("DD_TRACE_KAFKA_ANALYTICS_ENABLED", false) { + tr.analyticsRate = 1.0 + } + + tr.consumerServiceName = namingschema.ServiceName(defaultServiceName) + tr.producerServiceName = namingschema.ServiceNameOverrideV0(defaultServiceName, defaultServiceName) + tr.consumerSpanName = namingschema.OpName(namingschema.KafkaInbound) + tr.producerSpanName = namingschema.OpName(namingschema.KafkaOutbound) + + for _, opt := range opts { + opt(tr) + } + return tr +} + +// WithContext sets the config context to ctx. +// Deprecated: This is deprecated in favor of passing the context +// via the message headers +func WithContext(ctx context.Context) Option { + return func(tr *KafkaTracer) { + tr.ctx = ctx + } +} + +// WithServiceName sets the config service name to serviceName. +func WithServiceName(serviceName string) Option { + return func(tr *KafkaTracer) { + tr.consumerServiceName = serviceName + tr.producerServiceName = serviceName + } +} + +// WithAnalytics enables Trace Analytics for all started spans. +func WithAnalytics(on bool) Option { + return func(tr *KafkaTracer) { + if on { + tr.analyticsRate = 1.0 + } else { + tr.analyticsRate = math.NaN() + } + } +} + +// WithAnalyticsRate sets the sampling rate for Trace Analytics events +// correlated to started spans. +func WithAnalyticsRate(rate float64) Option { + return func(tr *KafkaTracer) { + if rate >= 0.0 && rate <= 1.0 { + tr.analyticsRate = rate + } else { + tr.analyticsRate = math.NaN() + } + } +} + +// WithCustomTag will cause the given tagFn to be evaluated after executing +// a query and attach the result to the span tagged by the key. 
+func WithCustomTag(tag string, tagFn func(msg Message) interface{}) Option { + return func(tr *KafkaTracer) { + if tr.tagFns == nil { + tr.tagFns = make(map[string]func(msg Message) interface{}) + } + tr.tagFns[tag] = tagFn + } +} + +// WithConfig extracts the config information for the client to be tagged +func WithConfig(cg ConfigMap) Option { + return func(tr *KafkaTracer) { + if groupID, err := cg.Get("group.id", ""); err == nil { + tr.groupID = groupID.(string) + } + if bs, err := cg.Get("bootstrap.servers", ""); err == nil && bs != "" { + for _, addr := range strings.Split(bs.(string), ",") { + host, _, err := net.SplitHostPort(addr) + if err == nil { + tr.bootstrapServers = host + return + } + } + } + } +} + +// WithDataStreams enables the Data Streams monitoring product features: https://www.datadoghq.com/product/data-streams-monitoring/ +func WithDataStreams() Option { + return func(tr *KafkaTracer) { + tr.dsmEnabled = true + } +} diff --git a/contrib/confluentinc/confluent-kafka-go/kafka.v2/option_test.go b/contrib/confluentinc/confluent-kafka-go/internal/tracing/kafka_tracer_test.go similarity index 65% rename from contrib/confluentinc/confluent-kafka-go/kafka.v2/option_test.go rename to contrib/confluentinc/confluent-kafka-go/internal/tracing/kafka_tracer_test.go index d990870fc5..f426458dbb 100644 --- a/contrib/confluentinc/confluent-kafka-go/kafka.v2/option_test.go +++ b/contrib/confluentinc/confluent-kafka-go/internal/tracing/kafka_tracer_test.go @@ -3,7 +3,7 @@ // This product includes software developed at Datadog (https://www.datadoghq.com/). // Copyright 2016 Datadog, Inc. -package kafka +package tracing import ( "math" @@ -16,29 +16,29 @@ import ( func TestDataStreamsActivation(t *testing.T) { t.Run("default", func(t *testing.T) { - cfg := newConfig() - assert.False(t, cfg.dataStreamsEnabled) + tr := NewKafkaTracer(0, 0) + assert.False(t, tr.DSMEnabled()) }) t.Run("withOption", func(t *testing.T) { - cfg := newConfig(WithDataStreams()) - assert.True(t, cfg.dataStreamsEnabled) + tr := NewKafkaTracer(0, 0, WithDataStreams()) + assert.True(t, tr.DSMEnabled()) }) t.Run("withEnv", func(t *testing.T) { t.Setenv("DD_DATA_STREAMS_ENABLED", "true") - cfg := newConfig() - assert.True(t, cfg.dataStreamsEnabled) + tr := NewKafkaTracer(0, 0) + assert.True(t, tr.DSMEnabled()) }) t.Run("optionOverridesEnv", func(t *testing.T) { t.Setenv("DD_DATA_STREAMS_ENABLED", "false") - cfg := newConfig(WithDataStreams()) - assert.True(t, cfg.dataStreamsEnabled) + tr := NewKafkaTracer(0, 0, WithDataStreams()) + assert.True(t, tr.DSMEnabled()) }) } func TestAnalyticsSettings(t *testing.T) { t.Run("defaults", func(t *testing.T) { - cfg := newConfig() - assert.True(t, math.IsNaN(cfg.analyticsRate)) + tr := NewKafkaTracer(0, 0) + assert.True(t, math.IsNaN(tr.analyticsRate)) }) t.Run("global", func(t *testing.T) { @@ -47,13 +47,13 @@ func TestAnalyticsSettings(t *testing.T) { defer globalconfig.SetAnalyticsRate(rate) globalconfig.SetAnalyticsRate(0.4) - cfg := newConfig() - assert.Equal(t, 0.4, cfg.analyticsRate) + tr := NewKafkaTracer(0, 0) + assert.Equal(t, 0.4, tr.analyticsRate) }) t.Run("enabled", func(t *testing.T) { - cfg := newConfig(WithAnalytics(true)) - assert.Equal(t, 1.0, cfg.analyticsRate) + tr := NewKafkaTracer(0, 0, WithAnalytics(true)) + assert.Equal(t, 1.0, tr.analyticsRate) }) t.Run("override", func(t *testing.T) { @@ -61,7 +61,7 @@ func TestAnalyticsSettings(t *testing.T) { defer globalconfig.SetAnalyticsRate(rate) globalconfig.SetAnalyticsRate(0.4) - cfg := 
newConfig(WithAnalyticsRate(0.2))
-		assert.Equal(t, 0.2, cfg.analyticsRate)
+		tr := NewKafkaTracer(0, 0, WithAnalyticsRate(0.2))
+		assert.Equal(t, 0.2, tr.analyticsRate)
 	})
 }
diff --git a/contrib/confluentinc/confluent-kafka-go/internal/tracing/message_carrier.go b/contrib/confluentinc/confluent-kafka-go/internal/tracing/message_carrier.go
new file mode 100644
index 0000000000..5fbeecf9e0
--- /dev/null
+++ b/contrib/confluentinc/confluent-kafka-go/internal/tracing/message_carrier.go
@@ -0,0 +1,50 @@
+// Unless explicitly stated otherwise all files in this repository are licensed
+// under the Apache License Version 2.0.
+// This product includes software developed at Datadog (https://www.datadoghq.com/).
+// Copyright 2016 Datadog, Inc.
+
+package tracing
+
+import "gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer"
+
+// A MessageCarrier implements TextMapReader/TextMapWriter for extracting/injecting traces on a kafka.Message.
+type MessageCarrier struct {
+	msg Message
+}
+
+var _ interface {
+	tracer.TextMapReader
+	tracer.TextMapWriter
+} = (*MessageCarrier)(nil)
+
+// ForeachKey conforms to the TextMapReader interface.
+func (c MessageCarrier) ForeachKey(handler func(key, val string) error) error {
+	for _, h := range c.msg.GetHeaders() {
+		err := handler(h.GetKey(), string(h.GetValue()))
+		if err != nil {
+			return err
+		}
+	}
+	return nil
+}
+
+// Set implements TextMapWriter
+func (c MessageCarrier) Set(key, val string) {
+	headers := c.msg.GetHeaders()
+	// ensure uniqueness of keys
+	for i := 0; i < len(headers); i++ {
+		if headers[i].GetKey() == key {
+			headers = append(headers[:i], headers[i+1:]...)
+			i--
+		}
+	}
+	headers = append(headers, KafkaHeader{
+		Key:   key,
+		Value: []byte(val),
+	})
+	c.msg.SetHeaders(headers)
+}
+
+func NewMessageCarrier(msg Message) MessageCarrier {
+	return MessageCarrier{msg: msg}
+}
diff --git a/contrib/confluentinc/confluent-kafka-go/internal/tracing/producer.go b/contrib/confluentinc/confluent-kafka-go/internal/tracing/producer.go
new file mode 100644
index 0000000000..25b043017f
--- /dev/null
+++ b/contrib/confluentinc/confluent-kafka-go/internal/tracing/producer.go
@@ -0,0 +1,103 @@
+// Unless explicitly stated otherwise all files in this repository are licensed
+// under the Apache License Version 2.0.
+// This product includes software developed at Datadog (https://www.datadoghq.com/).
+// Copyright 2016 Datadog, Inc.
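The carrier above is what tracer.Inject and tracer.Extract drive. A small in-package sketch of its key de-duplication, using a hypothetical stubMessage test double (this would live in a _test.go file):

```go
package tracing

import "fmt"

// stubMessage is a hypothetical test double; only the header accessors matter
// to MessageCarrier.
type stubMessage struct{ headers []Header }

func (s *stubMessage) GetValue() []byte                  { return nil }
func (s *stubMessage) GetKey() []byte                    { return nil }
func (s *stubMessage) GetHeaders() []Header              { return s.headers }
func (s *stubMessage) SetHeaders(h []Header)             { s.headers = h }
func (s *stubMessage) GetTopicPartition() TopicPartition { return nil }
func (s *stubMessage) Unwrap() any                       { return nil }

func ExampleMessageCarrier() {
	c := NewMessageCarrier(&stubMessage{})
	c.Set("x-datadog-trace-id", "1")
	c.Set("x-datadog-trace-id", "2") // same key: the first value is replaced, not duplicated
	_ = c.ForeachKey(func(k, v string) error {
		fmt.Println(k, v)
		return nil
	})
	// Output: x-datadog-trace-id 2
}
```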
+ +package tracing + +import ( + "math" + + "gopkg.in/DataDog/dd-trace-go.v1/ddtrace" + "gopkg.in/DataDog/dd-trace-go.v1/ddtrace/ext" + "gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer" +) + +func WrapProduceChannel[M any, TM Message](tr *KafkaTracer, out chan M, translateFn func(M) TM) chan M { + if out == nil { + return out + } + in := make(chan M, 1) + go func() { + for msg := range in { + tMsg := translateFn(msg) + span := tr.StartProduceSpan(tMsg) + tr.SetProduceCheckpoint(tMsg) + out <- msg + span.Finish() + } + }() + return in +} + +func WrapProduceEventsChannel[E any, TE Event](tr *KafkaTracer, in chan E, translateFn func(E) TE) chan E { + if in == nil { + return nil + } + out := make(chan E, 1) + go func() { + defer close(out) + for evt := range in { + tEvt := translateFn(evt) + if msg, ok := tEvt.KafkaMessage(); ok { + tr.TrackProduceOffsets(msg) + } + out <- evt + } + }() + return out +} + +func (tr *KafkaTracer) StartProduceSpan(msg Message) ddtrace.Span { + opts := []tracer.StartSpanOption{ + tracer.ServiceName(tr.producerServiceName), + tracer.ResourceName("Produce Topic " + msg.GetTopicPartition().GetTopic()), + tracer.SpanType(ext.SpanTypeMessageProducer), + tracer.Tag(ext.Component, ComponentName(tr.ckgoVersion)), + tracer.Tag(ext.SpanKind, ext.SpanKindProducer), + tracer.Tag(ext.MessagingSystem, ext.MessagingSystemKafka), + tracer.Tag(ext.MessagingKafkaPartition, msg.GetTopicPartition().GetPartition()), + } + if tr.bootstrapServers != "" { + opts = append(opts, tracer.Tag(ext.KafkaBootstrapServers, tr.bootstrapServers)) + } + if !math.IsNaN(tr.analyticsRate) { + opts = append(opts, tracer.Tag(ext.EventSampleRate, tr.analyticsRate)) + } + // if there's a span context in the headers, use that as the parent + carrier := NewMessageCarrier(msg) + if spanctx, err := tracer.Extract(carrier); err == nil { + opts = append(opts, tracer.ChildOf(spanctx)) + } + span, _ := tracer.StartSpanFromContext(tr.ctx, tr.producerSpanName, opts...) + // inject the span context so consumers can pick it up + tracer.Inject(span.Context(), carrier) + return span +} + +func WrapDeliveryChannel[E any, TE Event](tr *KafkaTracer, deliveryChan chan E, span ddtrace.Span, translateFn func(E) TE) (chan E, chan error) { + // if the user has selected a delivery channel, we will wrap it and + // wait for the delivery event to finish the span + if deliveryChan == nil { + return nil, nil + } + wrapped := make(chan E) + errChan := make(chan error, 1) + go func() { + var err error + select { + case evt := <-wrapped: + tEvt := translateFn(evt) + if msg, ok := tEvt.KafkaMessage(); ok { + // delivery errors are returned via TopicPartition.Error + err = msg.GetTopicPartition().GetError() + tr.TrackProduceOffsets(msg) + } + deliveryChan <- evt + case e := <-errChan: + err = e + } + span.Finish(tracer.WithError(err)) + }() + return wrapped, errChan +} diff --git a/contrib/confluentinc/confluent-kafka-go/internal/tracing/tracing.go b/contrib/confluentinc/confluent-kafka-go/internal/tracing/tracing.go new file mode 100644 index 0000000000..4e5f78a7d6 --- /dev/null +++ b/contrib/confluentinc/confluent-kafka-go/internal/tracing/tracing.go @@ -0,0 +1,35 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2016 Datadog, Inc. 
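WrapDeliveryChannel above defers span completion until the broker acks or the enqueue fails; errChan exists because a failed Produce call never sends anything on the delivery channel. A hypothetical caller-side sketch against the public v2 wrapper:

```go
package kafkademo

import (
	"github.com/confluentinc/confluent-kafka-go/v2/kafka"

	ddkafka "gopkg.in/DataDog/dd-trace-go.v1/contrib/confluentinc/confluent-kafka-go/kafka.v2"
)

// produceOne is a hypothetical helper. The span opened by Produce is finished
// by the wrapping goroutine, either on the delivery event or on enqueue error.
func produceOne(p *ddkafka.Producer, topic string, value []byte) error {
	delivery := make(chan kafka.Event, 1)
	err := p.Produce(&kafka.Message{
		TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
		Value:          value,
	}, delivery)
	if err != nil {
		return err // span already finished with this error via errChan
	}
	ev := <-delivery // forwarded by the wrapper after recording TopicPartition.Error
	if m, ok := ev.(*kafka.Message); ok {
		return m.TopicPartition.Error
	}
	return nil
}
```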
+ +package tracing + +type CKGoVersion int32 + +const ( + CKGoVersion1 CKGoVersion = 1 + CKGoVersion2 CKGoVersion = 2 +) + +func ComponentName(v CKGoVersion) string { + switch v { + case CKGoVersion1: + return "confluentinc/confluent-kafka-go/kafka" + case CKGoVersion2: + return "confluentinc/confluent-kafka-go/kafka.v2" + default: + return "" + } +} + +func IntegrationName(v CKGoVersion) string { + switch v { + case CKGoVersion1: + return "github.com/confluentinc/confluent-kafka-go" + case CKGoVersion2: + return "github.com/confluentinc/confluent-kafka-go/v2" + default: + return "" + } +} diff --git a/contrib/confluentinc/confluent-kafka-go/internal/tracing/types.go b/contrib/confluentinc/confluent-kafka-go/internal/tracing/types.go new file mode 100644 index 0000000000..537c111341 --- /dev/null +++ b/contrib/confluentinc/confluent-kafka-go/internal/tracing/types.go @@ -0,0 +1,66 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2016 Datadog, Inc. + +package tracing + +import ( + "gopkg.in/DataDog/dd-trace-go.v1/ddtrace" +) + +type Message interface { + GetValue() []byte + GetKey() []byte + GetHeaders() []Header + SetHeaders([]Header) + GetTopicPartition() TopicPartition + Unwrap() any +} + +type Header interface { + GetKey() string + GetValue() []byte +} + +type KafkaHeader struct { + Key string + Value []byte +} + +func (h KafkaHeader) GetKey() string { + return h.Key +} + +func (h KafkaHeader) GetValue() []byte { + return h.Value +} + +type OffsetsCommitted interface { + GetError() error + GetOffsets() []TopicPartition +} + +type TopicPartition interface { + GetTopic() string + GetPartition() int32 + GetOffset() int64 + GetError() error +} + +type Event interface { + KafkaMessage() (Message, bool) + KafkaOffsetsCommitted() (OffsetsCommitted, bool) +} + +type Consumer interface { + GetWatermarkOffsets(topic string, partition int32) (low int64, high int64, err error) +} + +type ConfigMap interface { + Get(key string, defval any) (any, error) +} + +type SpanStore struct { + Prev ddtrace.Span +} diff --git a/contrib/confluentinc/confluent-kafka-go/kafka.v2/headers.go b/contrib/confluentinc/confluent-kafka-go/kafka.v2/headers.go index d940513ff8..c4061543ed 100644 --- a/contrib/confluentinc/confluent-kafka-go/kafka.v2/headers.go +++ b/contrib/confluentinc/confluent-kafka-go/kafka.v2/headers.go @@ -6,48 +6,15 @@ package kafka import ( - "gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer" - "github.com/confluentinc/confluent-kafka-go/v2/kafka" -) - -// A MessageCarrier injects and extracts traces from a sarama.ProducerMessage. -type MessageCarrier struct { - msg *kafka.Message -} -var _ interface { - tracer.TextMapReader - tracer.TextMapWriter -} = (*MessageCarrier)(nil) - -// ForeachKey iterates over every header. -func (c MessageCarrier) ForeachKey(handler func(key, val string) error) error { - for _, h := range c.msg.Headers { - err := handler(string(h.Key), string(h.Value)) - if err != nil { - return err - } - } - return nil -} + "gopkg.in/DataDog/dd-trace-go.v1/contrib/confluentinc/confluent-kafka-go/internal/tracing" +) -// Set sets a header. -func (c MessageCarrier) Set(key, val string) { - // ensure uniqueness of keys - for i := 0; i < len(c.msg.Headers); i++ { - if string(c.msg.Headers[i].Key) == key { - c.msg.Headers = append(c.msg.Headers[:i], c.msg.Headers[i+1:]...) 
- i-- - } - } - c.msg.Headers = append(c.msg.Headers, kafka.Header{ - Key: key, - Value: []byte(val), - }) -} +// A MessageCarrier injects and extracts traces from a kafka.Message. +type MessageCarrier = tracing.MessageCarrier // NewMessageCarrier creates a new MessageCarrier. func NewMessageCarrier(msg *kafka.Message) MessageCarrier { - return MessageCarrier{msg} + return tracing.NewMessageCarrier(wrapMessage(msg)) } diff --git a/contrib/confluentinc/confluent-kafka-go/kafka.v2/kafka.go b/contrib/confluentinc/confluent-kafka-go/kafka.v2/kafka.go index f450c03f94..24bbe5a3d8 100644 --- a/contrib/confluentinc/confluent-kafka-go/kafka.v2/kafka.go +++ b/contrib/confluentinc/confluent-kafka-go/kafka.v2/kafka.go @@ -4,34 +4,32 @@ // Copyright 2016 Datadog, Inc. // Package kafka provides functions to trace the confluentinc/confluent-kafka-go package (https://github.com/confluentinc/confluent-kafka-go). -package kafka // import "gopkg.in/DataDog/dd-trace-go.v1/contrib/confluentinc/confluent-kafka-go/kafka" +package kafka // import "gopkg.in/DataDog/dd-trace-go.v1/contrib/confluentinc/confluent-kafka-go/kafka.v2" import ( - "context" - "math" "time" - "gopkg.in/DataDog/dd-trace-go.v1/datastreams" - "gopkg.in/DataDog/dd-trace-go.v1/datastreams/options" - "gopkg.in/DataDog/dd-trace-go.v1/ddtrace" - "gopkg.in/DataDog/dd-trace-go.v1/ddtrace/ext" + "github.com/confluentinc/confluent-kafka-go/v2/kafka" + + "gopkg.in/DataDog/dd-trace-go.v1/contrib/confluentinc/confluent-kafka-go/internal/tracing" "gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer" "gopkg.in/DataDog/dd-trace-go.v1/internal/log" "gopkg.in/DataDog/dd-trace-go.v1/internal/telemetry" - - "github.com/confluentinc/confluent-kafka-go/v2/kafka" ) const ( - // make sure these 3 are updated to V2 for the V2 version. - componentName = "confluentinc/confluent-kafka-go/kafka.v2" - packageName = "contrib/confluentinc/confluent-kafka-go/kafka.v2" - integrationName = "github.com/confluentinc/confluent-kafka-go/v2" + ckgoVersion = tracing.CKGoVersion2 + logPrefix = "contrib/confluentinc/confluent-kafka-go/kafka.v2" ) func init() { - telemetry.LoadIntegration(componentName) - tracer.MarkIntegrationImported(integrationName) + telemetry.LoadIntegration(tracing.ComponentName(ckgoVersion)) + tracer.MarkIntegrationImported(tracing.IntegrationName(ckgoVersion)) +} + +func newKafkaTracer(opts ...Option) *tracing.KafkaTracer { + v, _ := kafka.LibraryVersion() + return tracing.NewKafkaTracer(tracing.CKGoVersion2, v, opts...) } // NewConsumer calls kafka.NewConsumer and wraps the resulting Consumer. @@ -57,93 +55,21 @@ func NewProducer(conf *kafka.ConfigMap, opts ...Option) (*Producer, error) { // A Consumer wraps a kafka.Consumer. type Consumer struct { *kafka.Consumer - cfg *config + tracer *tracing.KafkaTracer events chan kafka.Event - prev ddtrace.Span } // WrapConsumer wraps a kafka.Consumer so that any consumed events are traced. 
 func WrapConsumer(c *kafka.Consumer, opts ...Option) *Consumer {
 	wrapped := &Consumer{
 		Consumer: c,
-		cfg:      newConfig(opts...),
+		tracer:   newKafkaTracer(opts...),
 	}
-	log.Debug("%s: Wrapping Consumer: %#v", packageName, wrapped.cfg)
-	wrapped.events = wrapped.traceEventsChannel(c.Events())
+	log.Debug("%s: Wrapping Consumer: %#v", logPrefix, wrapped.tracer)
+	wrapped.events = tracing.WrapConsumeEventsChannel(wrapped.tracer, c.Events(), c, wrapEvent)
 	return wrapped
 }
 
-func (c *Consumer) traceEventsChannel(in chan kafka.Event) chan kafka.Event {
-	// in will be nil when consuming via the events channel is not enabled
-	if in == nil {
-		return nil
-	}
-
-	out := make(chan kafka.Event, 1)
-	go func() {
-		defer close(out)
-		for evt := range in {
-			var next ddtrace.Span
-
-			// only trace messages
-			if msg, ok := evt.(*kafka.Message); ok {
-				next = c.startSpan(msg)
-				setConsumeCheckpoint(c.cfg.dataStreamsEnabled, c.cfg.groupID, msg)
-			} else if offset, ok := evt.(kafka.OffsetsCommitted); ok {
-				commitOffsets(c.cfg.dataStreamsEnabled, c.cfg.groupID, offset.Offsets, offset.Error)
-				c.trackHighWatermark(c.cfg.dataStreamsEnabled, offset.Offsets)
-			}
-
-			out <- evt
-
-			if c.prev != nil {
-				c.prev.Finish()
-			}
-			c.prev = next
-		}
-		// finish any remaining span
-		if c.prev != nil {
-			c.prev.Finish()
-			c.prev = nil
-		}
-	}()
-	return out
-}
-
-func (c *Consumer) startSpan(msg *kafka.Message) ddtrace.Span {
-	opts := []tracer.StartSpanOption{
-		tracer.ServiceName(c.cfg.consumerServiceName),
-		tracer.ResourceName("Consume Topic " + *msg.TopicPartition.Topic),
-		tracer.SpanType(ext.SpanTypeMessageConsumer),
-		tracer.Tag(ext.MessagingKafkaPartition, msg.TopicPartition.Partition),
-		tracer.Tag("offset", msg.TopicPartition.Offset),
-		tracer.Tag(ext.Component, componentName),
-		tracer.Tag(ext.SpanKind, ext.SpanKindConsumer),
-		tracer.Tag(ext.MessagingSystem, ext.MessagingSystemKafka),
-		tracer.Measured(),
-	}
-	if c.cfg.bootstrapServers != "" {
-		opts = append(opts, tracer.Tag(ext.KafkaBootstrapServers, c.cfg.bootstrapServers))
-	}
-	if c.cfg.tagFns != nil {
-		for key, tagFn := range c.cfg.tagFns {
-			opts = append(opts, tracer.Tag(key, tagFn(msg)))
-		}
-	}
-	if !math.IsNaN(c.cfg.analyticsRate) {
-		opts = append(opts, tracer.Tag(ext.EventSampleRate, c.cfg.analyticsRate))
-	}
-	// kafka supports headers, so try to extract a span context
-	carrier := NewMessageCarrier(msg)
-	if spanctx, err := tracer.Extract(carrier); err == nil {
-		opts = append(opts, tracer.ChildOf(spanctx))
-	}
-	span, _ := tracer.StartSpanFromContext(c.cfg.ctx, c.cfg.consumerSpanName, opts...)
-	// reinject the span context so consumers can pick it up
-	tracer.Inject(span.Context(), carrier)
-	return span
-}
-
 // Close calls the underlying Consumer.Close and if polling is enabled, finishes
 // any remaining span.
 func (c *Consumer) Close() error {
@@ -151,178 +77,111 @@ func (c *Consumer) Close() error {
 	// we only close the previous span if consuming via the events channel is
 	// not enabled, because otherwise there would be a data race from the
 	// consuming goroutine.
-	if c.events == nil && c.prev != nil {
-		c.prev.Finish()
-		c.prev = nil
+	if c.events == nil && c.tracer.PrevSpan != nil {
+		c.tracer.PrevSpan.Finish()
+		c.tracer.PrevSpan = nil
 	}
 	return err
 }
 
 // Events returns the kafka Events channel (if enabled). Message events will be
 // traced.
 func (c *Consumer) Events() chan kafka.Event {
 	return c.events
 }
 
 // Poll polls the consumer for messages or events. Message will be
 // traced.
 func (c *Consumer) Poll(timeoutMS int) (event kafka.Event) {
-	if c.prev != nil {
-		c.prev.Finish()
-		c.prev = nil
+	if c.tracer.PrevSpan != nil {
+		c.tracer.PrevSpan.Finish()
+		c.tracer.PrevSpan = nil
 	}
 	evt := c.Consumer.Poll(timeoutMS)
 	if msg, ok := evt.(*kafka.Message); ok {
-		setConsumeCheckpoint(c.cfg.dataStreamsEnabled, c.cfg.groupID, msg)
-		c.prev = c.startSpan(msg)
+		tMsg := wrapMessage(msg)
+		c.tracer.SetConsumeCheckpoint(tMsg)
+		c.tracer.PrevSpan = c.tracer.StartConsumeSpan(tMsg)
 	} else if offset, ok := evt.(kafka.OffsetsCommitted); ok {
-		commitOffsets(c.cfg.dataStreamsEnabled, c.cfg.groupID, offset.Offsets, offset.Error)
-		c.trackHighWatermark(c.cfg.dataStreamsEnabled, offset.Offsets)
+		tOffsets := wrapTopicPartitions(offset.Offsets)
+		c.tracer.TrackCommitOffsets(tOffsets, offset.Error)
+		c.tracer.TrackHighWatermarkOffset(tOffsets, c.Consumer)
 	}
 	return evt
 }
 
-func (c *Consumer) trackHighWatermark(dataStreamsEnabled bool, offsets []kafka.TopicPartition) {
-	if !dataStreamsEnabled {
-		return
-	}
-	for _, tp := range offsets {
-		if _, high, err := c.Consumer.GetWatermarkOffsets(*tp.Topic, tp.Partition); err == nil {
-			tracer.TrackKafkaHighWatermarkOffset("", *tp.Topic, tp.Partition, high)
-		}
-	}
-}
-
 // ReadMessage polls the consumer for a message. Message will be traced.
 func (c *Consumer) ReadMessage(timeout time.Duration) (*kafka.Message, error) {
-	if c.prev != nil {
-		c.prev.Finish()
-		c.prev = nil
+	if c.tracer.PrevSpan != nil {
+		c.tracer.PrevSpan.Finish()
+		c.tracer.PrevSpan = nil
 	}
 	msg, err := c.Consumer.ReadMessage(timeout)
 	if err != nil {
 		return nil, err
 	}
-	setConsumeCheckpoint(c.cfg.dataStreamsEnabled, c.cfg.groupID, msg)
-	c.prev = c.startSpan(msg)
+	tMsg := wrapMessage(msg)
+	c.tracer.SetConsumeCheckpoint(tMsg)
+	c.tracer.PrevSpan = c.tracer.StartConsumeSpan(tMsg)
 	return msg, nil
 }
 
 // Commit commits current offsets and tracks the commit offsets if data streams is enabled.
 func (c *Consumer) Commit() ([]kafka.TopicPartition, error) {
 	tps, err := c.Consumer.Commit()
-	commitOffsets(c.cfg.dataStreamsEnabled, c.cfg.groupID, tps, err)
-	c.trackHighWatermark(c.cfg.dataStreamsEnabled, tps)
+	tOffsets := wrapTopicPartitions(tps)
+	c.tracer.TrackCommitOffsets(tOffsets, err)
+	c.tracer.TrackHighWatermarkOffset(tOffsets, c.Consumer)
 	return tps, err
 }
 
 // CommitMessage commits a message and tracks the commit offsets if data streams is enabled.
 func (c *Consumer) CommitMessage(msg *kafka.Message) ([]kafka.TopicPartition, error) {
 	tps, err := c.Consumer.CommitMessage(msg)
-	commitOffsets(c.cfg.dataStreamsEnabled, c.cfg.groupID, tps, err)
-	c.trackHighWatermark(c.cfg.dataStreamsEnabled, tps)
+	tOffsets := wrapTopicPartitions(tps)
+	c.tracer.TrackCommitOffsets(tOffsets, err)
+	c.tracer.TrackHighWatermarkOffset(tOffsets, c.Consumer)
 	return tps, err
 }
 
 // CommitOffsets commits provided offsets and tracks the commit offsets if data streams is enabled.
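A short sketch of the polling path above. The span Poll opens for a message stays open until the next Poll, ReadMessage, or Close call, so it brackets the application's processing time; the handler and configuration are hypothetical:

```go
package kafkademo

import (
	"log"

	"github.com/confluentinc/confluent-kafka-go/v2/kafka"

	ddkafka "gopkg.in/DataDog/dd-trace-go.v1/contrib/confluentinc/confluent-kafka-go/kafka.v2"
)

// pollLoop assumes enable.auto.commit=false so commits are explicit.
func pollLoop(c *ddkafka.Consumer, handle func(*kafka.Message)) {
	for {
		switch e := c.Poll(100).(type) {
		case *kafka.Message:
			handle(e) // the consumer span is open here
			if _, err := c.CommitMessage(e); err != nil { // also feeds DSM offset tracking
				log.Printf("commit failed: %v", err)
			}
		case kafka.Error:
			log.Printf("consumer error: %v", e)
		}
	}
}
```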
func (c *Consumer) CommitOffsets(offsets []kafka.TopicPartition) ([]kafka.TopicPartition, error) { tps, err := c.Consumer.CommitOffsets(offsets) - commitOffsets(c.cfg.dataStreamsEnabled, c.cfg.groupID, tps, err) - c.trackHighWatermark(c.cfg.dataStreamsEnabled, tps) + tOffsets := wrapTopicPartitions(tps) + c.tracer.TrackCommitOffsets(tOffsets, err) + c.tracer.TrackHighWatermarkOffset(tOffsets, c.Consumer) return tps, err } -func commitOffsets(dataStreamsEnabled bool, groupID string, tps []kafka.TopicPartition, err error) { - if err != nil || groupID == "" || !dataStreamsEnabled { - return - } - for _, tp := range tps { - tracer.TrackKafkaCommitOffset(groupID, *tp.Topic, tp.Partition, int64(tp.Offset)) - } -} - -func trackProduceOffsets(dataStreamsEnabled bool, msg *kafka.Message, err error) { - if err != nil || !dataStreamsEnabled || msg.TopicPartition.Topic == nil { - return - } - tracer.TrackKafkaProduceOffset(*msg.TopicPartition.Topic, msg.TopicPartition.Partition, int64(msg.TopicPartition.Offset)) -} - // A Producer wraps a kafka.Producer. type Producer struct { *kafka.Producer - cfg *config + tracer *tracing.KafkaTracer produceChannel chan *kafka.Message events chan kafka.Event - libraryVersion int } // WrapProducer wraps a kafka.Producer so requests are traced. func WrapProducer(p *kafka.Producer, opts ...Option) *Producer { - version, _ := kafka.LibraryVersion() wrapped := &Producer{ - Producer: p, - cfg: newConfig(opts...), - events: p.Events(), - libraryVersion: version, + Producer: p, + tracer: newKafkaTracer(opts...), + events: p.Events(), } - log.Debug("%s: Wrapping Producer: %#v", packageName, wrapped.cfg) - wrapped.produceChannel = wrapped.traceProduceChannel(p.ProduceChannel()) - if wrapped.cfg.dataStreamsEnabled { - wrapped.events = wrapped.traceEventsChannel(p.Events()) + log.Debug("%s: Wrapping Producer: %#v", logPrefix, wrapped.tracer) + wrapped.produceChannel = tracing.WrapProduceChannel(wrapped.tracer, p.ProduceChannel(), wrapMessage) + if wrapped.tracer.DSMEnabled() { + wrapped.events = tracing.WrapProduceEventsChannel(wrapped.tracer, p.Events(), wrapEvent) } return wrapped } -// Events returns the kafka Events channel (if enabled). Message events will be monitored +// Events returns the kafka Events channel (if enabled). 
Message events will be monitored
 // with data streams monitoring (if enabled)
 func (p *Producer) Events() chan kafka.Event {
 	return p.events
 }
 
-func (p *Producer) traceProduceChannel(out chan *kafka.Message) chan *kafka.Message {
-	if out == nil {
-		return out
-	}
-	in := make(chan *kafka.Message, 1)
-	go func() {
-		for msg := range in {
-			span := p.startSpan(msg)
-			setProduceCheckpoint(p.cfg.dataStreamsEnabled, p.libraryVersion, msg)
-			out <- msg
-			span.Finish()
-		}
-	}()
-	return in
-}
-
-func (p *Producer) startSpan(msg *kafka.Message) ddtrace.Span {
-	opts := []tracer.StartSpanOption{
-		tracer.ServiceName(p.cfg.producerServiceName),
-		tracer.ResourceName("Produce Topic " + *msg.TopicPartition.Topic),
-		tracer.SpanType(ext.SpanTypeMessageProducer),
-		tracer.Tag(ext.Component, componentName),
-		tracer.Tag(ext.SpanKind, ext.SpanKindProducer),
-		tracer.Tag(ext.MessagingSystem, ext.MessagingSystemKafka),
-		tracer.Tag(ext.MessagingKafkaPartition, msg.TopicPartition.Partition),
-	}
-	if p.cfg.bootstrapServers != "" {
-		opts = append(opts, tracer.Tag(ext.KafkaBootstrapServers, p.cfg.bootstrapServers))
-	}
-	if !math.IsNaN(p.cfg.analyticsRate) {
-		opts = append(opts, tracer.Tag(ext.EventSampleRate, p.cfg.analyticsRate))
-	}
-	//if there's a span context in the headers, use that as the parent
-	carrier := NewMessageCarrier(msg)
-	if spanctx, err := tracer.Extract(carrier); err == nil {
-		opts = append(opts, tracer.ChildOf(spanctx))
-	}
-	span, _ := tracer.StartSpanFromContext(p.cfg.ctx, p.cfg.producerSpanName, opts...)
-	// inject the span context so consumers can pick it up
-	tracer.Inject(span.Context(), carrier)
-	return span
-}
-
 // Close calls the underlying Producer.Close and also closes the internal
 // wrapping producer channel.
 func (p *Producer) Close() {
@@ -332,42 +191,20 @@ func (p *Producer) Close() {
 // Produce calls the underlying Producer.Produce and traces the request.
 func (p *Producer) Produce(msg *kafka.Message, deliveryChan chan kafka.Event) error {
-	span := p.startSpan(msg)
+	tMsg := wrapMessage(msg)
+	span := p.tracer.StartProduceSpan(tMsg)
 	var errChan chan error
+	deliveryChan, errChan = tracing.WrapDeliveryChannel(p.tracer, deliveryChan, span, wrapEvent)
 
-	// if the user has selected a delivery channel, we will wrap it and
-	// wait for the delivery event to finish the span.
-	// in case the Produce call returns an error, we won't receive anything
-	// in the deliveryChan, so we use errChan to make sure this goroutine exits.
- if deliveryChan != nil { - errChan = make(chan error, 1) - oldDeliveryChan := deliveryChan - deliveryChan = make(chan kafka.Event) - go func() { - var err error - select { - case evt := <-deliveryChan: - if msg, ok := evt.(*kafka.Message); ok { - // delivery errors are returned via TopicPartition.Error - err = msg.TopicPartition.Error - trackProduceOffsets(p.cfg.dataStreamsEnabled, msg, err) - } - oldDeliveryChan <- evt - - case e := <-errChan: - err = e - } - span.Finish(tracer.WithError(err)) - }() - } + p.tracer.SetProduceCheckpoint(tMsg) - setProduceCheckpoint(p.cfg.dataStreamsEnabled, p.libraryVersion, msg) err := p.Producer.Produce(msg, deliveryChan) if err != nil { - if deliveryChan != nil { + if errChan != nil { errChan <- err } else { + // with no delivery channel or enqueue error, finish immediately span.Finish(tracer.WithError(err)) } } @@ -379,57 +216,3 @@ func (p *Producer) Produce(msg *kafka.Message, deliveryChan chan kafka.Event) er func (p *Producer) ProduceChannel() chan *kafka.Message { return p.produceChannel } - -func (p *Producer) traceEventsChannel(in chan kafka.Event) chan kafka.Event { - if in == nil { - return nil - } - out := make(chan kafka.Event, 1) - go func() { - defer close(out) - for evt := range in { - if msg, ok := evt.(*kafka.Message); ok { - trackProduceOffsets(p.cfg.dataStreamsEnabled, msg, msg.TopicPartition.Error) - } - out <- evt - } - }() - return out -} - -func setConsumeCheckpoint(dataStreamsEnabled bool, groupID string, msg *kafka.Message) { - if !dataStreamsEnabled || msg == nil { - return - } - edges := []string{"direction:in", "topic:" + *msg.TopicPartition.Topic, "type:kafka"} - if groupID != "" { - edges = append(edges, "group:"+groupID) - } - carrier := NewMessageCarrier(msg) - ctx, ok := tracer.SetDataStreamsCheckpointWithParams(datastreams.ExtractFromBase64Carrier(context.Background(), carrier), options.CheckpointParams{PayloadSize: getMsgSize(msg)}, edges...) - if !ok { - return - } - datastreams.InjectToBase64Carrier(ctx, carrier) -} - -func setProduceCheckpoint(dataStreamsEnabled bool, version int, msg *kafka.Message) { - if !dataStreamsEnabled || msg == nil { - return - } - edges := []string{"direction:out", "topic:" + *msg.TopicPartition.Topic, "type:kafka"} - carrier := NewMessageCarrier(msg) - ctx, ok := tracer.SetDataStreamsCheckpointWithParams(datastreams.ExtractFromBase64Carrier(context.Background(), carrier), options.CheckpointParams{PayloadSize: getMsgSize(msg)}, edges...) 
- if !ok || version < 0x000b0400 { - // headers not supported before librdkafka >=0.11.4 - return - } - datastreams.InjectToBase64Carrier(ctx, carrier) -} - -func getMsgSize(msg *kafka.Message) (size int64) { - for _, header := range msg.Headers { - size += int64(len(header.Key) + len(header.Value)) - } - return size + int64(len(msg.Value)+len(msg.Key)) -} diff --git a/contrib/confluentinc/confluent-kafka-go/kafka.v2/kafka_test.go b/contrib/confluentinc/confluent-kafka-go/kafka.v2/kafka_test.go index 6f3112a69a..e57288598b 100644 --- a/contrib/confluentinc/confluent-kafka-go/kafka.v2/kafka_test.go +++ b/contrib/confluentinc/confluent-kafka-go/kafka.v2/kafka_test.go @@ -89,8 +89,8 @@ func TestConsumerChannel(t *testing.T) { assert.Equal(t, "queue", s.Tag(ext.SpanType)) assert.Equal(t, int32(1), s.Tag(ext.MessagingKafkaPartition)) assert.Equal(t, 0.3, s.Tag(ext.EventSampleRate)) - assert.Equal(t, kafka.Offset(i+1), s.Tag("offset")) - assert.Equal(t, componentName, s.Tag(ext.Component)) + assert.EqualValues(t, kafka.Offset(i+1), s.Tag("offset")) + assert.Equal(t, "confluentinc/confluent-kafka-go/kafka.v2", s.Tag(ext.Component)) assert.Equal(t, ext.SpanKindConsumer, s.Tag(ext.SpanKind)) assert.Equal(t, "kafka", s.Tag(ext.MessagingSystem)) } @@ -137,7 +137,7 @@ func TestConsumerFunctional(t *testing.T) { assert.Equal(t, 0.1, s0.Tag(ext.EventSampleRate)) assert.Equal(t, "queue", s0.Tag(ext.SpanType)) assert.Equal(t, int32(0), s0.Tag(ext.MessagingKafkaPartition)) - assert.Equal(t, componentName, s0.Tag(ext.Component)) + assert.Equal(t, "confluentinc/confluent-kafka-go/kafka.v2", s0.Tag(ext.Component)) assert.Equal(t, ext.SpanKindProducer, s0.Tag(ext.SpanKind)) assert.Equal(t, "kafka", s0.Tag(ext.MessagingSystem)) assert.Equal(t, "127.0.0.1", s0.Tag(ext.KafkaBootstrapServers)) @@ -149,7 +149,7 @@ func TestConsumerFunctional(t *testing.T) { assert.Equal(t, nil, s1.Tag(ext.EventSampleRate)) assert.Equal(t, "queue", s1.Tag(ext.SpanType)) assert.Equal(t, int32(0), s1.Tag(ext.MessagingKafkaPartition)) - assert.Equal(t, componentName, s1.Tag(ext.Component)) + assert.Equal(t, "confluentinc/confluent-kafka-go/kafka.v2", s1.Tag(ext.Component)) assert.Equal(t, ext.SpanKindConsumer, s1.Tag(ext.SpanKind)) assert.Equal(t, "kafka", s1.Tag(ext.MessagingSystem)) assert.Equal(t, "127.0.0.1", s1.Tag(ext.KafkaBootstrapServers)) @@ -302,7 +302,9 @@ func TestProduceError(t *testing.T) { err := goleak.Find() if err != nil { // if a goroutine is leaking, ensure it is not coming from this package - assert.NotContains(t, err.Error(), "contrib/confluentinc/confluent-kafka-go") + if strings.Contains(err.Error(), "contrib/confluentinc/confluent-kafka-go") { + assert.NoError(t, err, "found leaked goroutine(s) from this package") + } } }() @@ -399,7 +401,7 @@ func produceThenConsume(t *testing.T, consumerAction consumerActionFn, producerO // they should be linked via headers assert.Equal(t, spans[0].TraceID(), spans[1].TraceID()) - if c.cfg.dataStreamsEnabled { + if c.tracer.DSMEnabled() { backlogs := mt.SentDSMBacklogs() toMap := func(_ []internaldsm.Backlog) map[string]struct{} { m := make(map[string]struct{}) diff --git a/contrib/confluentinc/confluent-kafka-go/kafka.v2/option.go b/contrib/confluentinc/confluent-kafka-go/kafka.v2/option.go index 8c7b8fed05..4061e2b6fa 100644 --- a/contrib/confluentinc/confluent-kafka-go/kafka.v2/option.go +++ b/contrib/confluentinc/confluent-kafka-go/kafka.v2/option.go @@ -6,129 +6,45 @@ package kafka import ( - "context" - "math" - "net" - "strings" - - 
"gopkg.in/DataDog/dd-trace-go.v1/internal" - "gopkg.in/DataDog/dd-trace-go.v1/internal/namingschema" - "github.com/confluentinc/confluent-kafka-go/v2/kafka" -) - -const defaultServiceName = "kafka" -type config struct { - ctx context.Context - consumerServiceName string - producerServiceName string - consumerSpanName string - producerSpanName string - analyticsRate float64 - bootstrapServers string - groupID string - tagFns map[string]func(msg *kafka.Message) interface{} - dataStreamsEnabled bool -} + "gopkg.in/DataDog/dd-trace-go.v1/contrib/confluentinc/confluent-kafka-go/internal/tracing" +) // An Option customizes the config. -type Option func(cfg *config) - -func newConfig(opts ...Option) *config { - cfg := &config{ - ctx: context.Background(), - // analyticsRate: globalconfig.AnalyticsRate(), - analyticsRate: math.NaN(), - } - cfg.dataStreamsEnabled = internal.BoolEnv("DD_DATA_STREAMS_ENABLED", false) - if internal.BoolEnv("DD_TRACE_KAFKA_ANALYTICS_ENABLED", false) { - cfg.analyticsRate = 1.0 - } - - cfg.consumerServiceName = namingschema.ServiceName(defaultServiceName) - cfg.producerServiceName = namingschema.ServiceNameOverrideV0(defaultServiceName, defaultServiceName) - cfg.consumerSpanName = namingschema.OpName(namingschema.KafkaInbound) - cfg.producerSpanName = namingschema.OpName(namingschema.KafkaOutbound) - - for _, opt := range opts { - opt(cfg) - } - return cfg -} +type Option = tracing.Option // WithContext sets the config context to ctx. // Deprecated: This is deprecated in favor of passing the context // via the message headers -func WithContext(ctx context.Context) Option { - return func(cfg *config) { - cfg.ctx = ctx - } -} +var WithContext = tracing.WithContext // WithServiceName sets the config service name to serviceName. -func WithServiceName(serviceName string) Option { - return func(cfg *config) { - cfg.consumerServiceName = serviceName - cfg.producerServiceName = serviceName - } -} +var WithServiceName = tracing.WithServiceName // WithAnalytics enables Trace Analytics for all started spans. -func WithAnalytics(on bool) Option { - return func(cfg *config) { - if on { - cfg.analyticsRate = 1.0 - } else { - cfg.analyticsRate = math.NaN() - } - } -} +var WithAnalytics = tracing.WithAnalytics // WithAnalyticsRate sets the sampling rate for Trace Analytics events // correlated to started spans. -func WithAnalyticsRate(rate float64) Option { - return func(cfg *config) { - if rate >= 0.0 && rate <= 1.0 { - cfg.analyticsRate = rate - } else { - cfg.analyticsRate = math.NaN() - } - } -} +var WithAnalyticsRate = tracing.WithAnalyticsRate // WithCustomTag will cause the given tagFn to be evaluated after executing // a query and attach the result to the span tagged by the key. 
func WithCustomTag(tag string, tagFn func(msg *kafka.Message) interface{}) Option { - return func(cfg *config) { - if cfg.tagFns == nil { - cfg.tagFns = make(map[string]func(msg *kafka.Message) interface{}) + wrapped := func(msg tracing.Message) interface{} { + if m, ok := msg.Unwrap().(*kafka.Message); ok { + return tagFn(m) } - cfg.tagFns[tag] = tagFn + return nil } + return tracing.WithCustomTag(tag, wrapped) } // WithConfig extracts the config information for the client to be tagged -func WithConfig(cg *kafka.ConfigMap) Option { - return func(cfg *config) { - if groupID, err := cg.Get("group.id", ""); err == nil { - cfg.groupID = groupID.(string) - } - if bs, err := cg.Get("bootstrap.servers", ""); err == nil && bs != "" { - for _, addr := range strings.Split(bs.(string), ",") { - host, _, err := net.SplitHostPort(addr) - if err == nil { - cfg.bootstrapServers = host - return - } - } - } - } +func WithConfig(cm *kafka.ConfigMap) Option { + return tracing.WithConfig(wrapConfigMap(cm)) } // WithDataStreams enables the Data Streams monitoring product features: https://www.datadoghq.com/product/data-streams-monitoring/ -func WithDataStreams() Option { - return func(cfg *config) { - cfg.dataStreamsEnabled = true - } -} +var WithDataStreams = tracing.WithDataStreams diff --git a/contrib/confluentinc/confluent-kafka-go/kafka.v2/tracing.go b/contrib/confluentinc/confluent-kafka-go/kafka.v2/tracing.go new file mode 100644 index 0000000000..02f96217ec --- /dev/null +++ b/contrib/confluentinc/confluent-kafka-go/kafka.v2/tracing.go @@ -0,0 +1,163 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2016 Datadog, Inc. 
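The adapter above lets existing tag functions keep their concrete *kafka.Message signature; the value is recovered via Unwrap. A usage sketch with a hypothetical tag name:

```go
package kafkademo

import (
	"github.com/confluentinc/confluent-kafka-go/v2/kafka"

	ddkafka "gopkg.in/DataDog/dd-trace-go.v1/contrib/confluentinc/confluent-kafka-go/kafka.v2"
)

func newTaggedConsumer(conf *kafka.ConfigMap) (*ddkafka.Consumer, error) {
	// "kafka.message_key" is a hypothetical tag; the callback runs once per
	// consumed message and its result is attached to the consumer span.
	return ddkafka.NewConsumer(conf, ddkafka.WithCustomTag("kafka.message_key",
		func(msg *kafka.Message) interface{} {
			return string(msg.Key) // assumes printable keys
		}))
}
```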
+ +package kafka + +import ( + "github.com/confluentinc/confluent-kafka-go/v2/kafka" + + "gopkg.in/DataDog/dd-trace-go.v1/contrib/confluentinc/confluent-kafka-go/internal/tracing" +) + +type wMessage struct { + *kafka.Message +} + +func wrapMessage(msg *kafka.Message) tracing.Message { + if msg == nil { + return nil + } + return &wMessage{msg} +} + +func (w *wMessage) Unwrap() any { + return w.Message +} + +func (w *wMessage) GetValue() []byte { + return w.Message.Value +} + +func (w *wMessage) GetKey() []byte { + return w.Message.Key +} + +func (w *wMessage) GetHeaders() []tracing.Header { + hs := make([]tracing.Header, 0, len(w.Headers)) + for _, h := range w.Headers { + hs = append(hs, wrapHeader(h)) + } + return hs +} + +func (w *wMessage) SetHeaders(headers []tracing.Header) { + hs := make([]kafka.Header, 0, len(headers)) + for _, h := range headers { + hs = append(hs, kafka.Header{ + Key: h.GetKey(), + Value: h.GetValue(), + }) + } + w.Message.Headers = hs +} + +func (w *wMessage) GetTopicPartition() tracing.TopicPartition { + return wrapTopicPartition(w.Message.TopicPartition) +} + +type wHeader struct { + kafka.Header +} + +func wrapHeader(h kafka.Header) tracing.Header { + return &wHeader{h} +} + +func (w wHeader) GetKey() string { + return w.Header.Key +} + +func (w wHeader) GetValue() []byte { + return w.Header.Value +} + +type wTopicPartition struct { + kafka.TopicPartition +} + +func wrapTopicPartition(tp kafka.TopicPartition) tracing.TopicPartition { + return wTopicPartition{tp} +} + +func wrapTopicPartitions(tps []kafka.TopicPartition) []tracing.TopicPartition { + wtps := make([]tracing.TopicPartition, 0, len(tps)) + for _, tp := range tps { + wtps = append(wtps, wTopicPartition{tp}) + } + return wtps +} + +func (w wTopicPartition) GetTopic() string { + if w.Topic == nil { + return "" + } + return *w.Topic +} + +func (w wTopicPartition) GetPartition() int32 { + return w.Partition +} + +func (w wTopicPartition) GetOffset() int64 { + return int64(w.Offset) +} + +func (w wTopicPartition) GetError() error { + return w.Error +} + +type wEvent struct { + kafka.Event +} + +func wrapEvent(event kafka.Event) tracing.Event { + return wEvent{event} +} + +func (w wEvent) KafkaMessage() (tracing.Message, bool) { + if m, ok := w.Event.(*kafka.Message); ok { + return wrapMessage(m), true + } + return nil, false +} + +func (w wEvent) KafkaOffsetsCommitted() (tracing.OffsetsCommitted, bool) { + if oc, ok := w.Event.(kafka.OffsetsCommitted); ok { + return wrapOffsetsCommitted(oc), true + } + return nil, false +} + +type wOffsetsCommitted struct { + kafka.OffsetsCommitted +} + +func wrapOffsetsCommitted(oc kafka.OffsetsCommitted) tracing.OffsetsCommitted { + return wOffsetsCommitted{oc} +} + +func (w wOffsetsCommitted) GetError() error { + return w.Error +} + +func (w wOffsetsCommitted) GetOffsets() []tracing.TopicPartition { + ttps := make([]tracing.TopicPartition, 0, len(w.Offsets)) + for _, tp := range w.Offsets { + ttps = append(ttps, wrapTopicPartition(tp)) + } + return ttps +} + +type wConfigMap struct { + cfg *kafka.ConfigMap +} + +func wrapConfigMap(cm *kafka.ConfigMap) tracing.ConfigMap { + return &wConfigMap{cm} +} + +func (w *wConfigMap) Get(key string, defVal any) (any, error) { + return w.cfg.Get(key, defVal) +} diff --git a/contrib/confluentinc/confluent-kafka-go/kafka/headers.go b/contrib/confluentinc/confluent-kafka-go/kafka/headers.go index 3f66e98be5..e29f88f30c 100644 --- a/contrib/confluentinc/confluent-kafka-go/kafka/headers.go +++ 
b/contrib/confluentinc/confluent-kafka-go/kafka/headers.go @@ -6,48 +6,15 @@ package kafka import ( - "gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer" - "github.com/confluentinc/confluent-kafka-go/kafka" -) - -// A MessageCarrier injects and extracts traces from a sarama.ProducerMessage. -type MessageCarrier struct { - msg *kafka.Message -} -var _ interface { - tracer.TextMapReader - tracer.TextMapWriter -} = (*MessageCarrier)(nil) - -// ForeachKey iterates over every header. -func (c MessageCarrier) ForeachKey(handler func(key, val string) error) error { - for _, h := range c.msg.Headers { - err := handler(string(h.Key), string(h.Value)) - if err != nil { - return err - } - } - return nil -} + "gopkg.in/DataDog/dd-trace-go.v1/contrib/confluentinc/confluent-kafka-go/internal/tracing" +) -// Set sets a header. -func (c MessageCarrier) Set(key, val string) { - // ensure uniqueness of keys - for i := 0; i < len(c.msg.Headers); i++ { - if string(c.msg.Headers[i].Key) == key { - c.msg.Headers = append(c.msg.Headers[:i], c.msg.Headers[i+1:]...) - i-- - } - } - c.msg.Headers = append(c.msg.Headers, kafka.Header{ - Key: key, - Value: []byte(val), - }) -} +// A MessageCarrier injects and extracts traces from a kafka.Message. +type MessageCarrier = tracing.MessageCarrier // NewMessageCarrier creates a new MessageCarrier. func NewMessageCarrier(msg *kafka.Message) MessageCarrier { - return MessageCarrier{msg} + return tracing.NewMessageCarrier(wrapMessage(msg)) } diff --git a/contrib/confluentinc/confluent-kafka-go/kafka/kafka.go b/contrib/confluentinc/confluent-kafka-go/kafka/kafka.go index b61388d3bb..b4f5485c37 100644 --- a/contrib/confluentinc/confluent-kafka-go/kafka/kafka.go +++ b/contrib/confluentinc/confluent-kafka-go/kafka/kafka.go @@ -7,31 +7,29 @@ package kafka // import "gopkg.in/DataDog/dd-trace-go.v1/contrib/confluentinc/confluent-kafka-go/kafka" import ( - "context" - "math" "time" - "gopkg.in/DataDog/dd-trace-go.v1/datastreams" - "gopkg.in/DataDog/dd-trace-go.v1/datastreams/options" - "gopkg.in/DataDog/dd-trace-go.v1/ddtrace" - "gopkg.in/DataDog/dd-trace-go.v1/ddtrace/ext" + "github.com/confluentinc/confluent-kafka-go/kafka" + + "gopkg.in/DataDog/dd-trace-go.v1/contrib/confluentinc/confluent-kafka-go/internal/tracing" "gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer" "gopkg.in/DataDog/dd-trace-go.v1/internal/log" "gopkg.in/DataDog/dd-trace-go.v1/internal/telemetry" - - "github.com/confluentinc/confluent-kafka-go/kafka" ) const ( - // make sure these 3 are updated to V2 for the V2 version. - componentName = "confluentinc/confluent-kafka-go/kafka" - packageName = "contrib/confluentinc/confluent-kafka-go/kafka" - integrationName = "github.com/confluentinc/confluent-kafka-go" + ckgoVersion = tracing.CKGoVersion1 + logPrefix = "contrib/confluentinc/confluent-kafka-go/kafka" ) func init() { - telemetry.LoadIntegration(componentName) - tracer.MarkIntegrationImported(integrationName) + telemetry.LoadIntegration(tracing.ComponentName(ckgoVersion)) + tracer.MarkIntegrationImported(tracing.IntegrationName(ckgoVersion)) +} + +func newKafkaTracer(opts ...Option) *tracing.KafkaTracer { + v, _ := kafka.LibraryVersion() + return tracing.NewKafkaTracer(tracing.CKGoVersion1, v, opts...) } // NewConsumer calls kafka.NewConsumer and wraps the resulting Consumer. @@ -57,93 +55,21 @@ func NewProducer(conf *kafka.ConfigMap, opts ...Option) (*Producer, error) { // A Consumer wraps a kafka.Consumer. 
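The v1 package below mirrors kafka.v2 line for line on top of the same internal/tracing core; only the client import changes. A sketch of the v1 import pair:

```go
import (
	"github.com/confluentinc/confluent-kafka-go/kafka" // v1 client

	// same wrapper API as kafka.v2, backed by the shared internal/tracing package
	ddkafka "gopkg.in/DataDog/dd-trace-go.v1/contrib/confluentinc/confluent-kafka-go/kafka"
)
```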
type Consumer struct { *kafka.Consumer - cfg *config + tracer *tracing.KafkaTracer events chan kafka.Event - prev ddtrace.Span } // WrapConsumer wraps a kafka.Consumer so that any consumed events are traced. func WrapConsumer(c *kafka.Consumer, opts ...Option) *Consumer { wrapped := &Consumer{ Consumer: c, - cfg: newConfig(opts...), + tracer: newKafkaTracer(opts...), } - log.Debug("%s: Wrapping Consumer: %#v", packageName, wrapped.cfg) - wrapped.events = wrapped.traceEventsChannel(c.Events()) + log.Debug("%s: Wrapping Consumer: %#v", logPrefix, wrapped.tracer) + wrapped.events = tracing.WrapConsumeEventsChannel(wrapped.tracer, c.Events(), c, wrapEvent) return wrapped } -func (c *Consumer) traceEventsChannel(in chan kafka.Event) chan kafka.Event { - // in will be nil when consuming via the events channel is not enabled - if in == nil { - return nil - } - - out := make(chan kafka.Event, 1) - go func() { - defer close(out) - for evt := range in { - var next ddtrace.Span - - // only trace messages - if msg, ok := evt.(*kafka.Message); ok { - next = c.startSpan(msg) - setConsumeCheckpoint(c.cfg.dataStreamsEnabled, c.cfg.groupID, msg) - } else if offset, ok := evt.(kafka.OffsetsCommitted); ok { - commitOffsets(c.cfg.dataStreamsEnabled, c.cfg.groupID, offset.Offsets, offset.Error) - c.trackHighWatermark(c.cfg.dataStreamsEnabled, offset.Offsets) - } - - out <- evt - - if c.prev != nil { - c.prev.Finish() - } - c.prev = next - } - // finish any remaining span - if c.prev != nil { - c.prev.Finish() - c.prev = nil - } - }() - return out -} - -func (c *Consumer) startSpan(msg *kafka.Message) ddtrace.Span { - opts := []tracer.StartSpanOption{ - tracer.ServiceName(c.cfg.consumerServiceName), - tracer.ResourceName("Consume Topic " + *msg.TopicPartition.Topic), - tracer.SpanType(ext.SpanTypeMessageConsumer), - tracer.Tag(ext.MessagingKafkaPartition, msg.TopicPartition.Partition), - tracer.Tag("offset", msg.TopicPartition.Offset), - tracer.Tag(ext.Component, componentName), - tracer.Tag(ext.SpanKind, ext.SpanKindConsumer), - tracer.Tag(ext.MessagingSystem, ext.MessagingSystemKafka), - tracer.Measured(), - } - if c.cfg.bootstrapServers != "" { - opts = append(opts, tracer.Tag(ext.KafkaBootstrapServers, c.cfg.bootstrapServers)) - } - if c.cfg.tagFns != nil { - for key, tagFn := range c.cfg.tagFns { - opts = append(opts, tracer.Tag(key, tagFn(msg))) - } - } - if !math.IsNaN(c.cfg.analyticsRate) { - opts = append(opts, tracer.Tag(ext.EventSampleRate, c.cfg.analyticsRate)) - } - // kafka supports headers, so try to extract a span context - carrier := NewMessageCarrier(msg) - if spanctx, err := tracer.Extract(carrier); err == nil { - opts = append(opts, tracer.ChildOf(spanctx)) - } - span, _ := tracer.StartSpanFromContext(c.cfg.ctx, c.cfg.consumerSpanName, opts...) - // reinject the span context so consumers can pick it up - tracer.Inject(span.Context(), carrier) - return span -} - // Close calls the underlying Consumer.Close and if polling is enabled, finishes // any remaining span. func (c *Consumer) Close() error { @@ -151,178 +77,111 @@ func (c *Consumer) Close() error { // we only close the previous span if consuming via the events channel is // not enabled, because otherwise there would be a data race from the // consuming goroutine. - if c.events == nil && c.prev != nil { - c.prev.Finish() - c.prev = nil + if c.events == nil && c.tracer.PrevSpan != nil { + c.tracer.PrevSpan.Finish() + c.tracer.PrevSpan = nil } return err } -// Events returns the kafka Events channel (if enabled). 
 
 // Events returns the kafka Events channel (if enabled). Message events will be
 // traced.
 func (c *Consumer) Events() chan kafka.Event {
     return c.events
 }
 
 // Poll polls the consumer for messages or events. Message will be
 // traced.
 func (c *Consumer) Poll(timeoutMS int) (event kafka.Event) {
-    if c.prev != nil {
-        c.prev.Finish()
-        c.prev = nil
+    if c.tracer.PrevSpan != nil {
+        c.tracer.PrevSpan.Finish()
+        c.tracer.PrevSpan = nil
     }
     evt := c.Consumer.Poll(timeoutMS)
     if msg, ok := evt.(*kafka.Message); ok {
-        setConsumeCheckpoint(c.cfg.dataStreamsEnabled, c.cfg.groupID, msg)
-        c.prev = c.startSpan(msg)
+        tMsg := wrapMessage(msg)
+        c.tracer.SetConsumeCheckpoint(tMsg)
+        c.tracer.PrevSpan = c.tracer.StartConsumeSpan(tMsg)
     } else if offset, ok := evt.(kafka.OffsetsCommitted); ok {
-        commitOffsets(c.cfg.dataStreamsEnabled, c.cfg.groupID, offset.Offsets, offset.Error)
-        c.trackHighWatermark(c.cfg.dataStreamsEnabled, offset.Offsets)
+        tOffsets := wrapTopicPartitions(offset.Offsets)
+        c.tracer.TrackCommitOffsets(tOffsets, offset.Error)
+        c.tracer.TrackHighWatermarkOffset(tOffsets, c.Consumer)
     }
     return evt
 }
 
-func (c *Consumer) trackHighWatermark(dataStreamsEnabled bool, offsets []kafka.TopicPartition) {
-    if !dataStreamsEnabled {
-        return
-    }
-    for _, tp := range offsets {
-        if _, high, err := c.Consumer.GetWatermarkOffsets(*tp.Topic, tp.Partition); err == nil {
-            tracer.TrackKafkaHighWatermarkOffset("", *tp.Topic, tp.Partition, high)
-        }
-    }
-}
-
 // ReadMessage polls the consumer for a message. Message will be traced.
 func (c *Consumer) ReadMessage(timeout time.Duration) (*kafka.Message, error) {
-    if c.prev != nil {
-        c.prev.Finish()
-        c.prev = nil
+    if c.tracer.PrevSpan != nil {
+        c.tracer.PrevSpan.Finish()
+        c.tracer.PrevSpan = nil
     }
     msg, err := c.Consumer.ReadMessage(timeout)
     if err != nil {
         return nil, err
     }
-    setConsumeCheckpoint(c.cfg.dataStreamsEnabled, c.cfg.groupID, msg)
-    c.prev = c.startSpan(msg)
+    tMsg := wrapMessage(msg)
+    c.tracer.SetConsumeCheckpoint(tMsg)
+    c.tracer.PrevSpan = c.tracer.StartConsumeSpan(tMsg)
     return msg, nil
 }
 
 // Commit commits current offsets and tracks the commit offsets if data streams is enabled.
 func (c *Consumer) Commit() ([]kafka.TopicPartition, error) {
     tps, err := c.Consumer.Commit()
-    commitOffsets(c.cfg.dataStreamsEnabled, c.cfg.groupID, tps, err)
-    c.trackHighWatermark(c.cfg.dataStreamsEnabled, tps)
+    tOffsets := wrapTopicPartitions(tps)
+    c.tracer.TrackCommitOffsets(tOffsets, err)
+    c.tracer.TrackHighWatermarkOffset(tOffsets, c.Consumer)
     return tps, err
 }
 
 // CommitMessage commits a message and tracks the commit offsets if data streams is enabled.
 func (c *Consumer) CommitMessage(msg *kafka.Message) ([]kafka.TopicPartition, error) {
     tps, err := c.Consumer.CommitMessage(msg)
-    commitOffsets(c.cfg.dataStreamsEnabled, c.cfg.groupID, tps, err)
-    c.trackHighWatermark(c.cfg.dataStreamsEnabled, tps)
+    tOffsets := wrapTopicPartitions(tps)
+    c.tracer.TrackCommitOffsets(tOffsets, err)
+    c.tracer.TrackHighWatermarkOffset(tOffsets, c.Consumer)
     return tps, err
 }
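A minimal sketch of the wrapped consumer in use, assuming a reachable broker at 127.0.0.1:9092 and an existing topic (both illustrative). Each message returned by ReadMessage gets a span that stays open until the next call, matching the PrevSpan handling above.

package example

import (
	"time"

	"github.com/confluentinc/confluent-kafka-go/kafka"

	kafkatrace "gopkg.in/DataDog/dd-trace-go.v1/contrib/confluentinc/confluent-kafka-go/kafka"
)

func consumeExample() error {
	cfg := &kafka.ConfigMap{
		"bootstrap.servers": "127.0.0.1:9092", // illustrative
		"group.id":          "example-group",  // illustrative
	}
	c, err := kafka.NewConsumer(cfg)
	if err != nil {
		return err
	}
	// WithConfig lets the tracer pick up group.id and bootstrap.servers tags.
	consumer := kafkatrace.WrapConsumer(c, kafkatrace.WithConfig(cfg))
	defer consumer.Close()

	if err := consumer.Subscribe("example-topic", nil); err != nil {
		return err
	}
	for {
		msg, err := consumer.ReadMessage(10 * time.Second)
		if err != nil {
			return err // e.g. a timeout; the previous span is already finished
		}
		_ = msg // process the message; its span finishes on the next call
	}
}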
 
 // CommitOffsets commits provided offsets and tracks the commit offsets if data streams is enabled.
 func (c *Consumer) CommitOffsets(offsets []kafka.TopicPartition) ([]kafka.TopicPartition, error) {
     tps, err := c.Consumer.CommitOffsets(offsets)
-    commitOffsets(c.cfg.dataStreamsEnabled, c.cfg.groupID, tps, err)
-    c.trackHighWatermark(c.cfg.dataStreamsEnabled, tps)
+    tOffsets := wrapTopicPartitions(tps)
+    c.tracer.TrackCommitOffsets(tOffsets, err)
+    c.tracer.TrackHighWatermarkOffset(tOffsets, c.Consumer)
     return tps, err
 }
 
-func commitOffsets(dataStreamsEnabled bool, groupID string, tps []kafka.TopicPartition, err error) {
-    if err != nil || groupID == "" || !dataStreamsEnabled {
-        return
-    }
-    for _, tp := range tps {
-        tracer.TrackKafkaCommitOffset(groupID, *tp.Topic, tp.Partition, int64(tp.Offset))
-    }
-}
-
-func trackProduceOffsets(dataStreamsEnabled bool, msg *kafka.Message, err error) {
-    if err != nil || !dataStreamsEnabled || msg.TopicPartition.Topic == nil {
-        return
-    }
-    tracer.TrackKafkaProduceOffset(*msg.TopicPartition.Topic, msg.TopicPartition.Partition, int64(msg.TopicPartition.Offset))
-}
-
 // A Producer wraps a kafka.Producer.
 type Producer struct {
     *kafka.Producer
-    cfg            *config
+    tracer         *tracing.KafkaTracer
     produceChannel chan *kafka.Message
     events         chan kafka.Event
-    libraryVersion int
 }
 
 // WrapProducer wraps a kafka.Producer so requests are traced.
 func WrapProducer(p *kafka.Producer, opts ...Option) *Producer {
-    version, _ := kafka.LibraryVersion()
     wrapped := &Producer{
-        Producer:       p,
-        cfg:            newConfig(opts...),
-        events:         p.Events(),
-        libraryVersion: version,
+        Producer: p,
+        tracer:   newKafkaTracer(opts...),
+        events:   p.Events(),
     }
-    log.Debug("%s: Wrapping Producer: %#v", packageName, wrapped.cfg)
-    wrapped.produceChannel = wrapped.traceProduceChannel(p.ProduceChannel())
-    if wrapped.cfg.dataStreamsEnabled {
-        wrapped.events = wrapped.traceEventsChannel(p.Events())
+    log.Debug("%s: Wrapping Producer: %#v", logPrefix, wrapped.tracer)
+    wrapped.produceChannel = tracing.WrapProduceChannel(wrapped.tracer, p.ProduceChannel(), wrapMessage)
+    if wrapped.tracer.DSMEnabled() {
+        wrapped.events = tracing.WrapProduceEventsChannel(wrapped.tracer, p.Events(), wrapEvent)
     }
     return wrapped
 }
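A sketch of wrapping a producer with Data Streams enabled; the broker address is illustrative. As WrapProducer above shows, the events channel is only re-wrapped when DSM is on, and delivery reports still need to be drained by the caller.

package example

import (
	"log"

	"github.com/confluentinc/confluent-kafka-go/kafka"

	kafkatrace "gopkg.in/DataDog/dd-trace-go.v1/contrib/confluentinc/confluent-kafka-go/kafka"
)

func wrapProducerExample() (*kafkatrace.Producer, error) {
	p, err := kafka.NewProducer(&kafka.ConfigMap{
		"bootstrap.servers": "127.0.0.1:9092", // illustrative
	})
	if err != nil {
		return nil, err
	}
	producer := kafkatrace.WrapProducer(p, kafkatrace.WithDataStreams())

	// Drain delivery reports; with Data Streams enabled they arrive via the
	// re-wrapped channel, which also tracks produce offsets along the way.
	go func() {
		for evt := range producer.Events() {
			if m, ok := evt.(*kafka.Message); ok && m.TopicPartition.Error != nil {
				log.Printf("delivery failed: %v", m.TopicPartition.Error)
			}
		}
	}()
	return producer, nil
}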
 
 // Events returns the kafka Events channel (if enabled). Message events will be monitored
 // with data streams monitoring (if enabled)
 func (p *Producer) Events() chan kafka.Event {
     return p.events
 }
 
-func (p *Producer) traceProduceChannel(out chan *kafka.Message) chan *kafka.Message {
-    if out == nil {
-        return out
-    }
-    in := make(chan *kafka.Message, 1)
-    go func() {
-        for msg := range in {
-            span := p.startSpan(msg)
-            setProduceCheckpoint(p.cfg.dataStreamsEnabled, p.libraryVersion, msg)
-            out <- msg
-            span.Finish()
-        }
-    }()
-    return in
-}
-
-func (p *Producer) startSpan(msg *kafka.Message) ddtrace.Span {
-    opts := []tracer.StartSpanOption{
-        tracer.ServiceName(p.cfg.producerServiceName),
-        tracer.ResourceName("Produce Topic " + *msg.TopicPartition.Topic),
-        tracer.SpanType(ext.SpanTypeMessageProducer),
-        tracer.Tag(ext.Component, componentName),
-        tracer.Tag(ext.SpanKind, ext.SpanKindProducer),
-        tracer.Tag(ext.MessagingSystem, ext.MessagingSystemKafka),
-        tracer.Tag(ext.MessagingKafkaPartition, msg.TopicPartition.Partition),
-    }
-    if p.cfg.bootstrapServers != "" {
-        opts = append(opts, tracer.Tag(ext.KafkaBootstrapServers, p.cfg.bootstrapServers))
-    }
-    if !math.IsNaN(p.cfg.analyticsRate) {
-        opts = append(opts, tracer.Tag(ext.EventSampleRate, p.cfg.analyticsRate))
-    }
-    // if there's a span context in the headers, use that as the parent
-    carrier := NewMessageCarrier(msg)
-    if spanctx, err := tracer.Extract(carrier); err == nil {
-        opts = append(opts, tracer.ChildOf(spanctx))
-    }
-    span, _ := tracer.StartSpanFromContext(p.cfg.ctx, p.cfg.producerSpanName, opts...)
-    // inject the span context so consumers can pick it up
-    tracer.Inject(span.Context(), carrier)
-    return span
-}
-
 // Close calls the underlying Producer.Close and also closes the internal
 // wrapping producer channel.
 func (p *Producer) Close() {
@@ -332,42 +191,20 @@ func (p *Producer) Close() {
 
 // Produce calls the underlying Producer.Produce and traces the request.
 func (p *Producer) Produce(msg *kafka.Message, deliveryChan chan kafka.Event) error {
-    span := p.startSpan(msg)
+    tMsg := wrapMessage(msg)
+    span := p.tracer.StartProduceSpan(tMsg)
 
     var errChan chan error
+    deliveryChan, errChan = tracing.WrapDeliveryChannel(p.tracer, deliveryChan, span, wrapEvent)
 
-    // if the user has selected a delivery channel, we will wrap it and
-    // wait for the delivery event to finish the span.
-    // in case the Produce call returns an error, we won't receive anything
-    // in the deliveryChan, so we use errChan to make sure this goroutine exits.
-    if deliveryChan != nil {
-        errChan = make(chan error, 1)
-        oldDeliveryChan := deliveryChan
-        deliveryChan = make(chan kafka.Event)
-        go func() {
-            var err error
-            select {
-            case evt := <-deliveryChan:
-                if msg, ok := evt.(*kafka.Message); ok {
-                    // delivery errors are returned via TopicPartition.Error
-                    err = msg.TopicPartition.Error
-                    trackProduceOffsets(p.cfg.dataStreamsEnabled, msg, err)
-                }
-                oldDeliveryChan <- evt
-
-            case e := <-errChan:
-                err = e
-            }
-            span.Finish(tracer.WithError(err))
-        }()
-    }
+    p.tracer.SetProduceCheckpoint(tMsg)
 
-    setProduceCheckpoint(p.cfg.dataStreamsEnabled, p.libraryVersion, msg)
     err := p.Producer.Produce(msg, deliveryChan)
     if err != nil {
-        if deliveryChan != nil {
+        if errChan != nil {
             errChan <- err
         } else {
+            // no delivery channel to report the enqueue error on, so finish
+            // the span immediately with that error
             span.Finish(tracer.WithError(err))
         }
     }
@@ -379,57 +216,3 @@ func (p *Producer) Produce(msg *kafka.Message, deliveryChan chan kafka.Event) error {
 func (p *Producer) ProduceChannel() chan *kafka.Message {
     return p.produceChannel
 }
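A sketch of Produce with a caller-owned delivery channel. Per the wrapping above, the span finishes when the delivery report (or the enqueue error) arrives, and the caller's channel still receives the report. The topic and payload are illustrative.

package example

import (
	"log"

	"github.com/confluentinc/confluent-kafka-go/kafka"

	kafkatrace "gopkg.in/DataDog/dd-trace-go.v1/contrib/confluentinc/confluent-kafka-go/kafka"
)

func produceExample(producer *kafkatrace.Producer) error {
	topic := "example-topic" // illustrative
	deliveryChan := make(chan kafka.Event, 1)
	msg := &kafka.Message{
		TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
		Value:          []byte("hello"),
	}
	if err := producer.Produce(msg, deliveryChan); err != nil {
		// enqueue failed: the wrapper has already finished the span with err
		return err
	}
	evt := <-deliveryChan // the caller's channel still gets the report
	if m, ok := evt.(*kafka.Message); ok && m.TopicPartition.Error != nil {
		log.Printf("delivery failed: %v", m.TopicPartition.Error)
	}
	return nil
}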
-
-func (p *Producer) traceEventsChannel(in chan kafka.Event) chan kafka.Event {
-    if in == nil {
-        return nil
-    }
-    out := make(chan kafka.Event, 1)
-    go func() {
-        defer close(out)
-        for evt := range in {
-            if msg, ok := evt.(*kafka.Message); ok {
-                trackProduceOffsets(p.cfg.dataStreamsEnabled, msg, msg.TopicPartition.Error)
-            }
-            out <- evt
-        }
-    }()
-    return out
-}
-
-func setConsumeCheckpoint(dataStreamsEnabled bool, groupID string, msg *kafka.Message) {
-    if !dataStreamsEnabled || msg == nil {
-        return
-    }
-    edges := []string{"direction:in", "topic:" + *msg.TopicPartition.Topic, "type:kafka"}
-    if groupID != "" {
-        edges = append(edges, "group:"+groupID)
-    }
-    carrier := NewMessageCarrier(msg)
-    ctx, ok := tracer.SetDataStreamsCheckpointWithParams(datastreams.ExtractFromBase64Carrier(context.Background(), carrier), options.CheckpointParams{PayloadSize: getMsgSize(msg)}, edges...)
-    if !ok {
-        return
-    }
-    datastreams.InjectToBase64Carrier(ctx, carrier)
-}
-
-func setProduceCheckpoint(dataStreamsEnabled bool, version int, msg *kafka.Message) {
-    if !dataStreamsEnabled || msg == nil {
-        return
-    }
-    edges := []string{"direction:out", "topic:" + *msg.TopicPartition.Topic, "type:kafka"}
-    carrier := NewMessageCarrier(msg)
-    ctx, ok := tracer.SetDataStreamsCheckpointWithParams(datastreams.ExtractFromBase64Carrier(context.Background(), carrier), options.CheckpointParams{PayloadSize: getMsgSize(msg)}, edges...)
-    if !ok || version < 0x000b0400 {
-        // headers not supported before librdkafka >=0.11.4
-        return
-    }
-    datastreams.InjectToBase64Carrier(ctx, carrier)
-}
-
-func getMsgSize(msg *kafka.Message) (size int64) {
-    for _, header := range msg.Headers {
-        size += int64(len(header.Key) + len(header.Value))
-    }
-    return size + int64(len(msg.Value)+len(msg.Key))
-}
diff --git a/contrib/confluentinc/confluent-kafka-go/kafka/kafka_test.go b/contrib/confluentinc/confluent-kafka-go/kafka/kafka_test.go
index 343b4c531b..6f4b70dd78 100644
--- a/contrib/confluentinc/confluent-kafka-go/kafka/kafka_test.go
+++ b/contrib/confluentinc/confluent-kafka-go/kafka/kafka_test.go
@@ -89,8 +89,8 @@ func TestConsumerChannel(t *testing.T) {
         assert.Equal(t, "queue", s.Tag(ext.SpanType))
         assert.Equal(t, int32(1), s.Tag(ext.MessagingKafkaPartition))
         assert.Equal(t, 0.3, s.Tag(ext.EventSampleRate))
-        assert.Equal(t, kafka.Offset(i+1), s.Tag("offset"))
-        assert.Equal(t, componentName, s.Tag(ext.Component))
+        assert.EqualValues(t, kafka.Offset(i+1), s.Tag("offset"))
+        assert.Equal(t, "confluentinc/confluent-kafka-go/kafka", s.Tag(ext.Component))
         assert.Equal(t, ext.SpanKindConsumer, s.Tag(ext.SpanKind))
         assert.Equal(t, "kafka", s.Tag(ext.MessagingSystem))
     }
@@ -137,7 +137,7 @@ func TestConsumerFunctional(t *testing.T) {
     assert.Equal(t, 0.1, s0.Tag(ext.EventSampleRate))
     assert.Equal(t, "queue", s0.Tag(ext.SpanType))
     assert.Equal(t, int32(0), s0.Tag(ext.MessagingKafkaPartition))
-    assert.Equal(t, componentName, s0.Tag(ext.Component))
+    assert.Equal(t, "confluentinc/confluent-kafka-go/kafka", s0.Tag(ext.Component))
     assert.Equal(t, ext.SpanKindProducer, s0.Tag(ext.SpanKind))
     assert.Equal(t, "kafka", s0.Tag(ext.MessagingSystem))
     assert.Equal(t, "127.0.0.1", s0.Tag(ext.KafkaBootstrapServers))
@@ -149,7 +149,7 @@ func TestConsumerFunctional(t *testing.T) {
     assert.Equal(t, nil, s1.Tag(ext.EventSampleRate))
     assert.Equal(t, "queue", s1.Tag(ext.SpanType))
     assert.Equal(t, int32(0), s1.Tag(ext.MessagingKafkaPartition))
-    assert.Equal(t, componentName, s1.Tag(ext.Component))
+    assert.Equal(t, "confluentinc/confluent-kafka-go/kafka", s1.Tag(ext.Component))
     assert.Equal(t, ext.SpanKindConsumer, s1.Tag(ext.SpanKind))
     assert.Equal(t, "kafka", s1.Tag(ext.MessagingSystem))
     assert.Equal(t, "127.0.0.1", s1.Tag(ext.KafkaBootstrapServers))
@@ -302,7 +302,9 @@ func TestProduceError(t *testing.T) {
         err := goleak.Find()
         if err != nil {
             // if a goroutine is leaking, ensure it is not coming from this package
-            assert.NotContains(t, err.Error(), "contrib/confluentinc/confluent-kafka-go")
+            if strings.Contains(err.Error(), "contrib/confluentinc/confluent-kafka-go") {
+                assert.NoError(t, err, "found leaked goroutine(s) from this package")
+            }
         }
     }()
@@ -398,7 +400,7 @@ func produceThenConsume(t *testing.T, consumerAction consumerActionFn, producerO
 
     // they should be linked via headers
     assert.Equal(t, spans[0].TraceID(), spans[1].TraceID())
 
-    if c.cfg.dataStreamsEnabled {
+    if c.tracer.DSMEnabled() {
         backlogs := mt.SentDSMBacklogs()
         toMap := func(_ []internaldsm.Backlog) map[string]struct{} {
             m := make(map[string]struct{})
diff --git a/contrib/confluentinc/confluent-kafka-go/kafka/option.go b/contrib/confluentinc/confluent-kafka-go/kafka/option.go
index 7898d4607c..707bdd1214 100644
--- a/contrib/confluentinc/confluent-kafka-go/kafka/option.go
+++ b/contrib/confluentinc/confluent-kafka-go/kafka/option.go
@@ -6,129 +6,45 @@
 package kafka
 
 import (
-    "context"
-    "math"
-    "net"
-    "strings"
-
-    "gopkg.in/DataDog/dd-trace-go.v1/internal"
"gopkg.in/DataDog/dd-trace-go.v1/internal/namingschema" - "github.com/confluentinc/confluent-kafka-go/kafka" -) - -const defaultServiceName = "kafka" -type config struct { - ctx context.Context - consumerServiceName string - producerServiceName string - consumerSpanName string - producerSpanName string - analyticsRate float64 - bootstrapServers string - groupID string - tagFns map[string]func(msg *kafka.Message) interface{} - dataStreamsEnabled bool -} + "gopkg.in/DataDog/dd-trace-go.v1/contrib/confluentinc/confluent-kafka-go/internal/tracing" +) // An Option customizes the config. -type Option func(cfg *config) - -func newConfig(opts ...Option) *config { - cfg := &config{ - ctx: context.Background(), - // analyticsRate: globalconfig.AnalyticsRate(), - analyticsRate: math.NaN(), - } - cfg.dataStreamsEnabled = internal.BoolEnv("DD_DATA_STREAMS_ENABLED", false) - if internal.BoolEnv("DD_TRACE_KAFKA_ANALYTICS_ENABLED", false) { - cfg.analyticsRate = 1.0 - } - - cfg.consumerServiceName = namingschema.ServiceName(defaultServiceName) - cfg.producerServiceName = namingschema.ServiceNameOverrideV0(defaultServiceName, defaultServiceName) - cfg.consumerSpanName = namingschema.OpName(namingschema.KafkaInbound) - cfg.producerSpanName = namingschema.OpName(namingschema.KafkaOutbound) - - for _, opt := range opts { - opt(cfg) - } - return cfg -} +type Option = tracing.Option // WithContext sets the config context to ctx. // Deprecated: This is deprecated in favor of passing the context // via the message headers -func WithContext(ctx context.Context) Option { - return func(cfg *config) { - cfg.ctx = ctx - } -} +var WithContext = tracing.WithContext // WithServiceName sets the config service name to serviceName. -func WithServiceName(serviceName string) Option { - return func(cfg *config) { - cfg.consumerServiceName = serviceName - cfg.producerServiceName = serviceName - } -} +var WithServiceName = tracing.WithServiceName // WithAnalytics enables Trace Analytics for all started spans. -func WithAnalytics(on bool) Option { - return func(cfg *config) { - if on { - cfg.analyticsRate = 1.0 - } else { - cfg.analyticsRate = math.NaN() - } - } -} +var WithAnalytics = tracing.WithAnalytics // WithAnalyticsRate sets the sampling rate for Trace Analytics events // correlated to started spans. -func WithAnalyticsRate(rate float64) Option { - return func(cfg *config) { - if rate >= 0.0 && rate <= 1.0 { - cfg.analyticsRate = rate - } else { - cfg.analyticsRate = math.NaN() - } - } -} +var WithAnalyticsRate = tracing.WithAnalyticsRate // WithCustomTag will cause the given tagFn to be evaluated after executing // a query and attach the result to the span tagged by the key. 
 
 // WithCustomTag will cause the given tagFn to be evaluated after executing
 // a query and attach the result to the span tagged by the key.
 func WithCustomTag(tag string, tagFn func(msg *kafka.Message) interface{}) Option {
-    return func(cfg *config) {
-        if cfg.tagFns == nil {
-            cfg.tagFns = make(map[string]func(msg *kafka.Message) interface{})
+    wrapped := func(msg tracing.Message) interface{} {
+        if m, ok := msg.Unwrap().(*kafka.Message); ok {
+            return tagFn(m)
         }
-        cfg.tagFns[tag] = tagFn
+        return nil
     }
+    return tracing.WithCustomTag(tag, wrapped)
 }
 
 // WithConfig extracts the config information for the client to be tagged
-func WithConfig(cg *kafka.ConfigMap) Option {
-    return func(cfg *config) {
-        if groupID, err := cg.Get("group.id", ""); err == nil {
-            cfg.groupID = groupID.(string)
-        }
-        if bs, err := cg.Get("bootstrap.servers", ""); err == nil && bs != "" {
-            for _, addr := range strings.Split(bs.(string), ",") {
-                host, _, err := net.SplitHostPort(addr)
-                if err == nil {
-                    cfg.bootstrapServers = host
-                    return
-                }
-            }
-        }
-    }
+func WithConfig(cm *kafka.ConfigMap) Option {
+    return tracing.WithConfig(wrapConfigMap(cm))
 }
 
 // WithDataStreams enables the Data Streams monitoring product features: https://www.datadoghq.com/product/data-streams-monitoring/
-func WithDataStreams() Option {
-    return func(cfg *config) {
-        cfg.dataStreamsEnabled = true
-    }
-}
+var WithDataStreams = tracing.WithDataStreams
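A sketch of these options in combination, matching the signatures above; the broker address, group id, and tag name are illustrative, not conventions this package defines.

package example

import (
	"github.com/confluentinc/confluent-kafka-go/kafka"

	kafkatrace "gopkg.in/DataDog/dd-trace-go.v1/contrib/confluentinc/confluent-kafka-go/kafka"
)

func optionsExample() (*kafkatrace.Consumer, error) {
	cfg := &kafka.ConfigMap{
		"bootstrap.servers": "127.0.0.1:9092", // illustrative
		"group.id":          "example-group",  // illustrative
	}
	c, err := kafka.NewConsumer(cfg)
	if err != nil {
		return nil, err
	}
	return kafkatrace.WrapConsumer(c,
		kafkatrace.WithConfig(cfg),   // picks up group.id and bootstrap.servers tags
		kafkatrace.WithDataStreams(), // opts in to Data Streams Monitoring
		// "messaging.kafka.key" is an illustrative tag name.
		kafkatrace.WithCustomTag("messaging.kafka.key", func(msg *kafka.Message) interface{} {
			return string(msg.Key)
		}),
	), nil
}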
diff --git a/contrib/confluentinc/confluent-kafka-go/kafka/option_test.go b/contrib/confluentinc/confluent-kafka-go/kafka/option_test.go
deleted file mode 100644
index d990870fc5..0000000000
--- a/contrib/confluentinc/confluent-kafka-go/kafka/option_test.go
+++ /dev/null
@@ -1,67 +0,0 @@
-// Unless explicitly stated otherwise all files in this repository are licensed
-// under the Apache License Version 2.0.
-// This product includes software developed at Datadog (https://www.datadoghq.com/).
-// Copyright 2016 Datadog, Inc.
-
-package kafka
-
-import (
-    "math"
-    "testing"
-
-    "gopkg.in/DataDog/dd-trace-go.v1/internal/globalconfig"
-
-    "github.com/stretchr/testify/assert"
-)
-
-func TestDataStreamsActivation(t *testing.T) {
-    t.Run("default", func(t *testing.T) {
-        cfg := newConfig()
-        assert.False(t, cfg.dataStreamsEnabled)
-    })
-    t.Run("withOption", func(t *testing.T) {
-        cfg := newConfig(WithDataStreams())
-        assert.True(t, cfg.dataStreamsEnabled)
-    })
-    t.Run("withEnv", func(t *testing.T) {
-        t.Setenv("DD_DATA_STREAMS_ENABLED", "true")
-        cfg := newConfig()
-        assert.True(t, cfg.dataStreamsEnabled)
-    })
-    t.Run("optionOverridesEnv", func(t *testing.T) {
-        t.Setenv("DD_DATA_STREAMS_ENABLED", "false")
-        cfg := newConfig(WithDataStreams())
-        assert.True(t, cfg.dataStreamsEnabled)
-    })
-}
-
-func TestAnalyticsSettings(t *testing.T) {
-    t.Run("defaults", func(t *testing.T) {
-        cfg := newConfig()
-        assert.True(t, math.IsNaN(cfg.analyticsRate))
-    })
-
-    t.Run("global", func(t *testing.T) {
-        t.Skip("global flag disabled")
-        rate := globalconfig.AnalyticsRate()
-        defer globalconfig.SetAnalyticsRate(rate)
-        globalconfig.SetAnalyticsRate(0.4)
-
-        cfg := newConfig()
-        assert.Equal(t, 0.4, cfg.analyticsRate)
-    })
-
-    t.Run("enabled", func(t *testing.T) {
-        cfg := newConfig(WithAnalytics(true))
-        assert.Equal(t, 1.0, cfg.analyticsRate)
-    })
-
-    t.Run("override", func(t *testing.T) {
-        rate := globalconfig.AnalyticsRate()
-        defer globalconfig.SetAnalyticsRate(rate)
-        globalconfig.SetAnalyticsRate(0.4)
-
-        cfg := newConfig(WithAnalyticsRate(0.2))
-        assert.Equal(t, 0.2, cfg.analyticsRate)
-    })
-}
diff --git a/contrib/confluentinc/confluent-kafka-go/kafka/tracing.go b/contrib/confluentinc/confluent-kafka-go/kafka/tracing.go
new file mode 100644
index 0000000000..9e4c379ff9
--- /dev/null
+++ b/contrib/confluentinc/confluent-kafka-go/kafka/tracing.go
@@ -0,0 +1,163 @@
+// Unless explicitly stated otherwise all files in this repository are licensed
+// under the Apache License Version 2.0.
+// This product includes software developed at Datadog (https://www.datadoghq.com/).
+// Copyright 2016 Datadog, Inc.
+
+package kafka
+
+import (
+    "github.com/confluentinc/confluent-kafka-go/kafka"
+
+    "gopkg.in/DataDog/dd-trace-go.v1/contrib/confluentinc/confluent-kafka-go/internal/tracing"
+)
+
+type wMessage struct {
+    *kafka.Message
+}
+
+func wrapMessage(msg *kafka.Message) tracing.Message {
+    if msg == nil {
+        return nil
+    }
+    return &wMessage{msg}
+}
+
+func (w *wMessage) Unwrap() any {
+    return w.Message
+}
+
+func (w *wMessage) GetValue() []byte {
+    return w.Message.Value
+}
+
+func (w *wMessage) GetKey() []byte {
+    return w.Message.Key
+}
+
+func (w *wMessage) GetHeaders() []tracing.Header {
+    hs := make([]tracing.Header, 0, len(w.Headers))
+    for _, h := range w.Headers {
+        hs = append(hs, wrapHeader(h))
+    }
+    return hs
+}
+
+func (w *wMessage) SetHeaders(headers []tracing.Header) {
+    hs := make([]kafka.Header, 0, len(headers))
+    for _, h := range headers {
+        hs = append(hs, kafka.Header{
+            Key:   h.GetKey(),
+            Value: h.GetValue(),
+        })
+    }
+    w.Message.Headers = hs
+}
+
+func (w *wMessage) GetTopicPartition() tracing.TopicPartition {
+    return wrapTopicPartition(w.Message.TopicPartition)
+}
+
+type wHeader struct {
+    kafka.Header
+}
+
+func wrapHeader(h kafka.Header) tracing.Header {
+    return &wHeader{h}
+}
+
+func (w wHeader) GetKey() string {
+    return w.Header.Key
+}
+
+func (w wHeader) GetValue() []byte {
+    return w.Header.Value
+}
+
+type wTopicPartition struct {
+    kafka.TopicPartition
+}
+
+func wrapTopicPartition(tp kafka.TopicPartition) tracing.TopicPartition {
+    return wTopicPartition{tp}
+}
+
+func wrapTopicPartitions(tps []kafka.TopicPartition) []tracing.TopicPartition {
+    wtps := make([]tracing.TopicPartition, 0, len(tps))
+    for _, tp := range tps {
+        wtps = append(wtps, wTopicPartition{tp})
+    }
+    return wtps
+}
+
+func (w wTopicPartition) GetTopic() string {
+    if w.Topic == nil {
+        return ""
+    }
+    return *w.Topic
+}
+
+func (w wTopicPartition) GetPartition() int32 {
+    return w.Partition
+}
+
+func (w wTopicPartition) GetOffset() int64 {
+    return int64(w.Offset)
+}
+
+func (w wTopicPartition) GetError() error {
+    return w.Error
+}
+
+type wEvent struct {
+    kafka.Event
+}
+
+func wrapEvent(event kafka.Event) tracing.Event {
+    return wEvent{event}
+}
+
+func (w wEvent) KafkaMessage() (tracing.Message, bool) {
+    if m, ok := w.Event.(*kafka.Message); ok {
+        return wrapMessage(m), true
+    }
+    return nil, false
+}
+
+func (w wEvent) KafkaOffsetsCommitted() (tracing.OffsetsCommitted, bool) {
+    if oc, ok := w.Event.(kafka.OffsetsCommitted); ok {
+        return wrapOffsetsCommitted(oc), true
+    }
+    return nil, false
+}
+
+type wOffsetsCommitted struct {
+    kafka.OffsetsCommitted
+}
+
+func wrapOffsetsCommitted(oc kafka.OffsetsCommitted) tracing.OffsetsCommitted {
+    return wOffsetsCommitted{oc}
+}
+
+func (w wOffsetsCommitted) GetError() error {
+    return w.Error
+}
+
+func (w wOffsetsCommitted) GetOffsets() []tracing.TopicPartition {
+    ttps := make([]tracing.TopicPartition, 0, len(w.Offsets))
+    for _, tp := range w.Offsets {
+        ttps = append(ttps, wrapTopicPartition(tp))
+    }
+    return ttps
+}
+
+type wConfigMap struct {
+    cfg *kafka.ConfigMap
+}
+
+func wrapConfigMap(cm *kafka.ConfigMap) tracing.ConfigMap {
+    return &wConfigMap{cm}
+}
+
+func (w *wConfigMap) Get(key string, defVal any) (any, error) {
+    return w.cfg.Get(key, defVal)
+}
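One way to keep these adapters honest, not part of the diff itself, is a set of compile-time assertions in the same package; this assumes the interface names exported by the internal tracing package are exactly the ones referenced above.

// Hypothetical additions to tracing.go: compile-time checks that the
// wrapper types satisfy the internal tracing interfaces.
var (
	_ tracing.Message          = (*wMessage)(nil)
	_ tracing.Header           = wHeader{}
	_ tracing.TopicPartition   = wTopicPartition{}
	_ tracing.Event            = wEvent{}
	_ tracing.OffsetsCommitted = wOffsetsCommitted{}
	_ tracing.ConfigMap        = (*wConfigMap)(nil)
)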
diff --git a/ddtrace/tracer/option_test.go b/ddtrace/tracer/option_test.go
index 514046e1ed..34ebc0a846 100644
--- a/ddtrace/tracer/option_test.go
+++ b/ddtrace/tracer/option_test.go
@@ -318,9 +318,7 @@ type contribPkg struct {
 
 func TestIntegrationEnabled(t *testing.T) {
     body, err := exec.Command("go", "list", "-json", "../../contrib/...").Output()
-    if err != nil {
-        t.Fatalf(err.Error())
-    }
+    require.NoError(t, err, "go list command failed")
     var packages []contribPkg
     stream := json.NewDecoder(strings.NewReader(string(body)))
     for stream.More() {
@@ -337,9 +335,7 @@ func TestIntegrationEnabled(t *testing.T) {
         }
         p := strings.Replace(pkg.Dir, pkg.Root, "../..", 1)
         body, err := exec.Command("grep", "-rl", "MarkIntegrationImported", p).Output()
-        if err != nil {
-            t.Fatalf(err.Error())
-        }
+        require.NoError(t, err, "grep command failed")
         assert.NotEqual(t, len(body), 0, "expected %s to call MarkIntegrationImported", pkg.Name)
     }
 }