Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

contrib/confluentinc/confluent-kafka-go: split tracing code #2907

Merged
merged 11 commits into from
Oct 21, 2024
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2016 Datadog, Inc.

package tracing

import (
"math"

"gopkg.in/DataDog/dd-trace-go.v1/ddtrace"
"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/ext"
"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer"
)

// WrapConsumeEventsChannel wraps the consumer's events channel so that
// message events flowing through it are traced. It returns a new channel
// that mirrors in: every event is forwarded unchanged, after tracing has
// been applied.
//
// translateFn converts the library-specific event type E into the internal
// Event abstraction TE, so this package does not depend on a particular
// confluent-kafka-go major version.
//
// Returns nil when in is nil (events-channel consumption not enabled).
func WrapConsumeEventsChannel[E any, TE Event](tr *KafkaTracer, in chan E, consumer Consumer, translateFn func(E) TE) chan E {
	// in will be nil when consuming via the events channel is not enabled
	if in == nil {
		return nil
	}

	out := make(chan E, 1)
	go func() {
		defer close(out)
		for evt := range in {
			tEvt := translateFn(evt)
			var next ddtrace.Span

			// only trace messages
			if msg, ok := tEvt.KafkaMessage(); ok {
				next = tr.StartConsumeSpan(msg)
				tr.SetConsumeCheckpoint(msg)
			} else if offset, ok := tEvt.KafkaOffsetsCommitted(); ok {
				tr.TrackCommitOffsets(offset.GetOffsets(), offset.GetError())
				tr.TrackHighWatermarkOffset(offset.GetOffsets(), consumer)
			}

			// Forward the event before finishing the previous span: a
			// message's span is kept open (in tr.PrevSpan) until the next
			// event arrives, so it covers the time spent processing it.
			out <- evt

			if tr.PrevSpan != nil {
				tr.PrevSpan.Finish()
			}
			tr.PrevSpan = next
		}
		// finish any remaining span once the input channel is closed
		if tr.PrevSpan != nil {
			tr.PrevSpan.Finish()
			tr.PrevSpan = nil
		}
	}()
	return out
}

// StartConsumeSpan starts and returns a consumer span for msg. When the
// message headers carry a span context, the new span is created as its
// child, and the resulting context is re-injected into the headers so
// downstream consumers can continue the trace.
func (tr *KafkaTracer) StartConsumeSpan(msg Message) ddtrace.Span {
	tp := msg.GetTopicPartition()
	opts := []tracer.StartSpanOption{
		tracer.ServiceName(tr.consumerServiceName),
		tracer.ResourceName("Consume Topic " + tp.GetTopic()),
		tracer.SpanType(ext.SpanTypeMessageConsumer),
		tracer.Tag(ext.MessagingKafkaPartition, tp.GetPartition()),
		tracer.Tag("offset", tp.GetOffset()),
		tracer.Tag(ext.Component, ComponentName(tr.ckgoVersion)),
		tracer.Tag(ext.SpanKind, ext.SpanKindConsumer),
		tracer.Tag(ext.MessagingSystem, ext.MessagingSystemKafka),
		tracer.Measured(),
	}
	if tr.bootstrapServers != "" {
		opts = append(opts, tracer.Tag(ext.KafkaBootstrapServers, tr.bootstrapServers))
	}
	// evaluate any user-supplied tag functions against this message
	// (ranging over a nil map is a no-op, so no explicit nil check is needed)
	for key, tagFn := range tr.tagFns {
		opts = append(opts, tracer.Tag(key, tagFn(msg)))
	}
	if !math.IsNaN(tr.analyticsRate) {
		opts = append(opts, tracer.Tag(ext.EventSampleRate, tr.analyticsRate))
	}
	// kafka supports headers, so try to extract a span context
	carrier := MessageCarrier{msg: msg}
	if spanctx, err := tracer.Extract(carrier); err == nil {
		opts = append(opts, tracer.ChildOf(spanctx))
	}
	span, _ := tracer.StartSpanFromContext(tr.ctx, tr.consumerSpanName, opts...)
	// reinject the span context so consumers can pick it up
	tracer.Inject(span.Context(), carrier)
	return span
}
88 changes: 88 additions & 0 deletions contrib/confluentinc/confluent-kafka-go/internal/tracing/dsm.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2016 Datadog, Inc.

package tracing

import (
"context"

"gopkg.in/DataDog/dd-trace-go.v1/datastreams"
"gopkg.in/DataDog/dd-trace-go.v1/datastreams/options"
"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer"
)

// TrackCommitOffsets reports committed offsets to Data Streams Monitoring.
// It is a no-op when DSM is disabled, no consumer group is configured, or
// the commit itself failed.
func (tr *KafkaTracer) TrackCommitOffsets(offsets []TopicPartition, err error) {
	if !tr.dsmEnabled || tr.groupID == "" || err != nil {
		return
	}
	for i := range offsets {
		tp := offsets[i]
		tracer.TrackKafkaCommitOffset(tr.groupID, tp.GetTopic(), tp.GetPartition(), tp.GetOffset())
	}
}

// TrackHighWatermarkOffset reports the high watermark offset of each given
// topic/partition to Data Streams Monitoring. Partitions whose watermarks
// cannot be fetched are skipped silently. No-op when DSM is disabled.
func (tr *KafkaTracer) TrackHighWatermarkOffset(offsets []TopicPartition, consumer Consumer) {
	if !tr.dsmEnabled {
		return
	}
	for i := range offsets {
		topic, partition := offsets[i].GetTopic(), offsets[i].GetPartition()
		_, high, err := consumer.GetWatermarkOffsets(topic, partition)
		if err != nil {
			continue
		}
		// NOTE(review): the consumer group is deliberately empty here, unlike
		// TrackCommitOffsets — confirm this matches the DSM API's contract.
		tracer.TrackKafkaHighWatermarkOffset("", topic, partition, high)
	}
}

// TrackProduceOffsets reports the offset of a produced message to Data
// Streams Monitoring. It is a no-op when the delivery failed, DSM is
// disabled, or the message carries no topic.
func (tr *KafkaTracer) TrackProduceOffsets(msg Message) {
	tp := msg.GetTopicPartition()
	if tp.GetError() != nil || !tr.dsmEnabled || tp.GetTopic() == "" {
		return
	}
	tracer.TrackKafkaProduceOffset(tp.GetTopic(), tp.GetPartition(), tp.GetOffset())
}

// SetConsumeCheckpoint sets a Data Streams Monitoring consume checkpoint on
// msg and injects the updated pathway context back into the message headers.
// No-op when DSM is disabled or msg is nil.
func (tr *KafkaTracer) SetConsumeCheckpoint(msg Message) {
	if msg == nil || !tr.dsmEnabled {
		return
	}
	edges := make([]string, 0, 4)
	edges = append(edges, "direction:in", "topic:"+msg.GetTopicPartition().GetTopic(), "type:kafka")
	if tr.groupID != "" {
		edges = append(edges, "group:"+tr.groupID)
	}
	carrier := NewMessageCarrier(msg)
	ctx, ok := tracer.SetDataStreamsCheckpointWithParams(
		datastreams.ExtractFromBase64Carrier(context.Background(), carrier),
		options.CheckpointParams{PayloadSize: getMsgSize(msg)},
		edges...,
	)
	if ok {
		datastreams.InjectToBase64Carrier(ctx, carrier)
	}
}

// SetProduceCheckpoint sets a Data Streams Monitoring produce checkpoint on
// msg. The updated pathway context is injected into the message headers only
// when the linked librdkafka supports headers (>= 0.11.4). No-op when DSM is
// disabled or msg is nil.
func (tr *KafkaTracer) SetProduceCheckpoint(msg Message) {
	if msg == nil || !tr.dsmEnabled {
		return
	}
	edges := []string{"direction:out", "topic:" + msg.GetTopicPartition().GetTopic(), "type:kafka"}
	carrier := NewMessageCarrier(msg)
	ctx, ok := tracer.SetDataStreamsCheckpointWithParams(
		datastreams.ExtractFromBase64Carrier(context.Background(), carrier),
		options.CheckpointParams{PayloadSize: getMsgSize(msg)},
		edges...,
	)
	if !ok {
		return
	}
	// message headers are supported only in librdkafka >= 0.11.4
	if tr.librdKafkaVersion < 0x000b0400 {
		return
	}
	datastreams.InjectToBase64Carrier(ctx, carrier)
}

// getMsgSize returns the message size reported to DSM: the byte lengths of
// the key and value plus the lengths of every header key and value.
func getMsgSize(msg Message) int64 {
	var size int64
	size += int64(len(msg.GetKey()))
	size += int64(len(msg.GetValue()))
	for _, h := range msg.GetHeaders() {
		size += int64(len(h.GetKey()))
		size += int64(len(h.GetValue()))
	}
	return size
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2016 Datadog, Inc.

package tracing

import (
"context"
"math"
"net"
"strings"

"gopkg.in/DataDog/dd-trace-go.v1/ddtrace"
"gopkg.in/DataDog/dd-trace-go.v1/internal"
"gopkg.in/DataDog/dd-trace-go.v1/internal/namingschema"
)

// defaultServiceName is the base service name handed to the naming schema
// helpers when no override is configured.
const defaultServiceName = "kafka"

// KafkaTracer holds the configuration and state shared by the producer- and
// consumer-side tracing helpers of this package.
type KafkaTracer struct {
	// PrevSpan is the last consume span started via the events channel; it
	// is finished when the next event arrives (or the channel closes).
	PrevSpan            ddtrace.Span
	ctx                 context.Context // parent context for started spans
	consumerServiceName string          // service name for consumer spans
	producerServiceName string          // service name for producer spans
	consumerSpanName    string          // operation name for consumer spans
	producerSpanName    string          // operation name for producer spans
	analyticsRate       float64         // Trace Analytics sample rate; NaN means disabled
	bootstrapServers    string          // host extracted from "bootstrap.servers" config, if any
	groupID             string          // consumer group id from "group.id" config
	tagFns              map[string]func(msg Message) interface{} // custom tag funcs evaluated per message
	dsmEnabled          bool            // Data Streams Monitoring enabled
	ckgoVersion         CKGoVersion     // confluent-kafka-go major version in use
	librdKafkaVersion   int             // linked librdkafka version (hex-encoded, e.g. 0x000b0400)
}

// DSMEnabled reports whether Data Streams Monitoring is enabled for this
// tracer, either via WithDataStreams or the DD_DATA_STREAMS_ENABLED env var.
func (tr *KafkaTracer) DSMEnabled() bool {
	return tr.dsmEnabled
}

// An Option customizes the KafkaTracer; pass Options to NewKafkaTracer.
type Option func(tr *KafkaTracer)

// NewKafkaTracer builds a KafkaTracer for the given confluent-kafka-go major
// version and linked librdkafka version. Environment-driven defaults
// (DD_DATA_STREAMS_ENABLED, DD_TRACE_KAFKA_ANALYTICS_ENABLED) are applied
// before opts, so explicit options override the environment.
func NewKafkaTracer(ckgoVersion CKGoVersion, librdKafkaVersion int, opts ...Option) *KafkaTracer {
	tr := &KafkaTracer{
		ctx: context.Background(),
		// NOTE(review): the global analytics rate is intentionally not read
		// here (line left commented out); analytics default to disabled (NaN)
		// unless enabled explicitly below or via an option.
		// analyticsRate: globalconfig.AnalyticsRate(),
		analyticsRate:     math.NaN(),
		ckgoVersion:       ckgoVersion,
		librdKafkaVersion: librdKafkaVersion,
	}
	tr.dsmEnabled = internal.BoolEnv("DD_DATA_STREAMS_ENABLED", false)
	if internal.BoolEnv("DD_TRACE_KAFKA_ANALYTICS_ENABLED", false) {
		tr.analyticsRate = 1.0
	}

	// span and service names are resolved through the naming schema helpers
	tr.consumerServiceName = namingschema.ServiceName(defaultServiceName)
	tr.producerServiceName = namingschema.ServiceNameOverrideV0(defaultServiceName, defaultServiceName)
	tr.consumerSpanName = namingschema.OpName(namingschema.KafkaInbound)
	tr.producerSpanName = namingschema.OpName(namingschema.KafkaOutbound)

	for _, opt := range opts {
		opt(tr)
	}
	return tr
}

// WithContext sets the config context to ctx, used as the parent context of
// spans started by the tracer.
//
// Deprecated: This is deprecated in favor of passing the context
// via the message headers
func WithContext(ctx context.Context) Option {
	return func(tr *KafkaTracer) {
		tr.ctx = ctx
	}
}

// WithServiceName sets the config service name to serviceName, applied to
// both consumer and producer spans.
func WithServiceName(serviceName string) Option {
	return func(tr *KafkaTracer) {
		tr.consumerServiceName = serviceName
		tr.producerServiceName = serviceName
	}
}

// WithAnalytics enables Trace Analytics for all started spans. Passing
// false resets the rate to NaN, which disables the analytics tag.
func WithAnalytics(on bool) Option {
	return func(tr *KafkaTracer) {
		tr.analyticsRate = math.NaN() // disabled unless explicitly turned on
		if on {
			tr.analyticsRate = 1.0
		}
	}
}

// WithAnalyticsRate sets the sampling rate for Trace Analytics events
// correlated to started spans. Rates outside [0, 1] — including NaN —
// disable analytics.
func WithAnalyticsRate(rate float64) Option {
	return func(tr *KafkaTracer) {
		if 0.0 <= rate && rate <= 1.0 {
			tr.analyticsRate = rate
			return
		}
		tr.analyticsRate = math.NaN()
	}
}

// WithCustomTag causes the given tagFn to be evaluated for each traced
// message and its result attached to the span under the given tag key.
func WithCustomTag(tag string, tagFn func(msg Message) interface{}) Option {
	return func(tr *KafkaTracer) {
		if tr.tagFns == nil {
			tr.tagFns = map[string]func(msg Message) interface{}{}
		}
		tr.tagFns[tag] = tagFn
	}
}

// WithConfig extracts the config information for the client to be tagged:
// the consumer group id ("group.id") and the host of the first
// "bootstrap.servers" entry that parses as host:port.
func WithConfig(cg ConfigMap) Option {
	return func(tr *KafkaTracer) {
		if groupID, err := cg.Get("group.id", ""); err == nil {
			tr.groupID = groupID.(string)
		}
		if bs, err := cg.Get("bootstrap.servers", ""); err == nil && bs != "" {
			for _, addr := range strings.Split(bs.(string), ",") {
				host, _, err := net.SplitHostPort(addr)
				if err == nil {
					// only the first parseable host is recorded; entries
					// without a port are skipped by SplitHostPort's error
					tr.bootstrapServers = host
					return
				}
			}
		}
	}
}

// WithDataStreams enables the Data Streams monitoring product features: https://www.datadoghq.com/product/data-streams-monitoring/
// It takes precedence over the DD_DATA_STREAMS_ENABLED environment default.
func WithDataStreams() Option {
	return func(tr *KafkaTracer) {
		tr.dsmEnabled = true
	}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2016 Datadog, Inc.

package kafka
package tracing

import (
"math"
Expand All @@ -16,29 +16,29 @@ import (

func TestDataStreamsActivation(t *testing.T) {
t.Run("default", func(t *testing.T) {
cfg := newConfig()
assert.False(t, cfg.dataStreamsEnabled)
tr := NewKafkaTracer(0, 0)
assert.False(t, tr.DSMEnabled())
})
t.Run("withOption", func(t *testing.T) {
cfg := newConfig(WithDataStreams())
assert.True(t, cfg.dataStreamsEnabled)
tr := NewKafkaTracer(0, 0, WithDataStreams())
assert.True(t, tr.DSMEnabled())
})
t.Run("withEnv", func(t *testing.T) {
t.Setenv("DD_DATA_STREAMS_ENABLED", "true")
cfg := newConfig()
assert.True(t, cfg.dataStreamsEnabled)
tr := NewKafkaTracer(0, 0)
assert.True(t, tr.DSMEnabled())
})
t.Run("optionOverridesEnv", func(t *testing.T) {
t.Setenv("DD_DATA_STREAMS_ENABLED", "false")
cfg := newConfig(WithDataStreams())
assert.True(t, cfg.dataStreamsEnabled)
tr := NewKafkaTracer(0, 0, WithDataStreams())
assert.True(t, tr.DSMEnabled())
})
}

func TestAnalyticsSettings(t *testing.T) {
t.Run("defaults", func(t *testing.T) {
cfg := newConfig()
assert.True(t, math.IsNaN(cfg.analyticsRate))
tr := NewKafkaTracer(0, 0)
assert.True(t, math.IsNaN(tr.analyticsRate))
})

t.Run("global", func(t *testing.T) {
Expand All @@ -47,21 +47,21 @@ func TestAnalyticsSettings(t *testing.T) {
defer globalconfig.SetAnalyticsRate(rate)
globalconfig.SetAnalyticsRate(0.4)

cfg := newConfig()
assert.Equal(t, 0.4, cfg.analyticsRate)
tr := NewKafkaTracer(0, 0)
assert.Equal(t, 0.4, tr.analyticsRate)
})

t.Run("enabled", func(t *testing.T) {
cfg := newConfig(WithAnalytics(true))
assert.Equal(t, 1.0, cfg.analyticsRate)
tr := NewKafkaTracer(0, 0, WithAnalytics(true))
assert.Equal(t, 1.0, tr.analyticsRate)
})

t.Run("override", func(t *testing.T) {
rate := globalconfig.AnalyticsRate()
defer globalconfig.SetAnalyticsRate(rate)
globalconfig.SetAnalyticsRate(0.4)

cfg := newConfig(WithAnalyticsRate(0.2))
assert.Equal(t, 0.2, cfg.analyticsRate)
tr := NewKafkaTracer(0, 0, WithAnalyticsRate(0.2))
assert.Equal(t, 0.2, tr.analyticsRate)
})
}
Loading
Loading