Skip to content

Commit

Permalink
contrib/confluentinc/confluent-kafka-go: split tracing code (#2907)
Browse files Browse the repository at this point in the history
Co-authored-by: Dario Castañé <[email protected]>
  • Loading branch information
rarguelloF and darccio committed Oct 23, 2024
1 parent a16404e commit 8736720
Show file tree
Hide file tree
Showing 20 changed files with 1,094 additions and 933 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2016 Datadog, Inc.

package tracing

import (
"math"

"gopkg.in/DataDog/dd-trace-go.v1/ddtrace"
"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/ext"
"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer"
)

func WrapConsumeEventsChannel[E any, TE Event](tr *KafkaTracer, in chan E, consumer Consumer, translateFn func(E) TE) chan E {
// in will be nil when consuming via the events channel is not enabled
if in == nil {
return nil
}

out := make(chan E, 1)
go func() {
defer close(out)
for evt := range in {
tEvt := translateFn(evt)
var next ddtrace.Span

// only trace messages
if msg, ok := tEvt.KafkaMessage(); ok {
next = tr.StartConsumeSpan(msg)
tr.SetConsumeCheckpoint(msg)
} else if offset, ok := tEvt.KafkaOffsetsCommitted(); ok {
tr.TrackCommitOffsets(offset.GetOffsets(), offset.GetError())
tr.TrackHighWatermarkOffset(offset.GetOffsets(), consumer)
}

out <- evt

if tr.PrevSpan != nil {
tr.PrevSpan.Finish()
}
tr.PrevSpan = next
}
// finish any remaining span
if tr.PrevSpan != nil {
tr.PrevSpan.Finish()
tr.PrevSpan = nil
}
}()
return out
}

func (tr *KafkaTracer) StartConsumeSpan(msg Message) ddtrace.Span {
opts := []tracer.StartSpanOption{
tracer.ServiceName(tr.consumerServiceName),
tracer.ResourceName("Consume Topic " + msg.GetTopicPartition().GetTopic()),
tracer.SpanType(ext.SpanTypeMessageConsumer),
tracer.Tag(ext.MessagingKafkaPartition, msg.GetTopicPartition().GetPartition()),
tracer.Tag("offset", msg.GetTopicPartition().GetOffset()),
tracer.Tag(ext.Component, ComponentName(tr.ckgoVersion)),
tracer.Tag(ext.SpanKind, ext.SpanKindConsumer),
tracer.Tag(ext.MessagingSystem, ext.MessagingSystemKafka),
tracer.Measured(),
}
if tr.bootstrapServers != "" {
opts = append(opts, tracer.Tag(ext.KafkaBootstrapServers, tr.bootstrapServers))
}
if tr.tagFns != nil {
for key, tagFn := range tr.tagFns {
opts = append(opts, tracer.Tag(key, tagFn(msg)))
}
}
if !math.IsNaN(tr.analyticsRate) {
opts = append(opts, tracer.Tag(ext.EventSampleRate, tr.analyticsRate))
}
// kafka supports headers, so try to extract a span context
carrier := MessageCarrier{msg: msg}
if spanctx, err := tracer.Extract(carrier); err == nil {
opts = append(opts, tracer.ChildOf(spanctx))
}
span, _ := tracer.StartSpanFromContext(tr.ctx, tr.consumerSpanName, opts...)
// reinject the span context so consumers can pick it up
tracer.Inject(span.Context(), carrier)
return span
}
88 changes: 88 additions & 0 deletions contrib/confluentinc/confluent-kafka-go/internal/tracing/dsm.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2016 Datadog, Inc.

package tracing

import (
"context"

"gopkg.in/DataDog/dd-trace-go.v1/datastreams"
"gopkg.in/DataDog/dd-trace-go.v1/datastreams/options"
"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer"
)

func (tr *KafkaTracer) TrackCommitOffsets(offsets []TopicPartition, err error) {
if err != nil || tr.groupID == "" || !tr.dsmEnabled {
return
}
for _, tp := range offsets {
tracer.TrackKafkaCommitOffset(tr.groupID, tp.GetTopic(), tp.GetPartition(), tp.GetOffset())
}
}

func (tr *KafkaTracer) TrackHighWatermarkOffset(offsets []TopicPartition, consumer Consumer) {
if !tr.dsmEnabled {
return
}
for _, tp := range offsets {
if _, high, err := consumer.GetWatermarkOffsets(tp.GetTopic(), tp.GetPartition()); err == nil {
tracer.TrackKafkaHighWatermarkOffset("", tp.GetTopic(), tp.GetPartition(), high)
}
}
}

func (tr *KafkaTracer) TrackProduceOffsets(msg Message) {
err := msg.GetTopicPartition().GetError()
if err != nil || !tr.dsmEnabled || msg.GetTopicPartition().GetTopic() == "" {
return
}
tp := msg.GetTopicPartition()
tracer.TrackKafkaProduceOffset(tp.GetTopic(), tp.GetPartition(), tp.GetOffset())
}

func (tr *KafkaTracer) SetConsumeCheckpoint(msg Message) {
if !tr.dsmEnabled || msg == nil {
return
}
edges := []string{"direction:in", "topic:" + msg.GetTopicPartition().GetTopic(), "type:kafka"}
if tr.groupID != "" {
edges = append(edges, "group:"+tr.groupID)
}
carrier := NewMessageCarrier(msg)
ctx, ok := tracer.SetDataStreamsCheckpointWithParams(
datastreams.ExtractFromBase64Carrier(context.Background(), carrier),
options.CheckpointParams{PayloadSize: getMsgSize(msg)},
edges...,
)
if !ok {
return
}
datastreams.InjectToBase64Carrier(ctx, carrier)
}

func (tr *KafkaTracer) SetProduceCheckpoint(msg Message) {
if !tr.dsmEnabled || msg == nil {
return
}
edges := []string{"direction:out", "topic:" + msg.GetTopicPartition().GetTopic(), "type:kafka"}
carrier := NewMessageCarrier(msg)
ctx, ok := tracer.SetDataStreamsCheckpointWithParams(
datastreams.ExtractFromBase64Carrier(context.Background(), carrier),
options.CheckpointParams{PayloadSize: getMsgSize(msg)},
edges...,
)
if !ok || tr.librdKafkaVersion < 0x000b0400 {
// headers not supported before librdkafka >=0.11.4
return
}
datastreams.InjectToBase64Carrier(ctx, carrier)
}

func getMsgSize(msg Message) (size int64) {
for _, header := range msg.GetHeaders() {
size += int64(len(header.GetKey()) + len(header.GetValue()))
}
return size + int64(len(msg.GetValue())+len(msg.GetKey()))
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2016 Datadog, Inc.

package tracing

import (
"context"
"math"
"net"
"strings"

"gopkg.in/DataDog/dd-trace-go.v1/ddtrace"
"gopkg.in/DataDog/dd-trace-go.v1/internal"
"gopkg.in/DataDog/dd-trace-go.v1/internal/namingschema"
)

const defaultServiceName = "kafka"

type KafkaTracer struct {
PrevSpan ddtrace.Span
ctx context.Context
consumerServiceName string
producerServiceName string
consumerSpanName string
producerSpanName string
analyticsRate float64
bootstrapServers string
groupID string
tagFns map[string]func(msg Message) interface{}
dsmEnabled bool
ckgoVersion CKGoVersion
librdKafkaVersion int
}

func (tr *KafkaTracer) DSMEnabled() bool {
return tr.dsmEnabled
}

// An Option customizes the KafkaTracer.
type Option func(tr *KafkaTracer)

func NewKafkaTracer(ckgoVersion CKGoVersion, librdKafkaVersion int, opts ...Option) *KafkaTracer {
tr := &KafkaTracer{
ctx: context.Background(),
// analyticsRate: globalconfig.AnalyticsRate(),
analyticsRate: math.NaN(),
ckgoVersion: ckgoVersion,
librdKafkaVersion: librdKafkaVersion,
}
tr.dsmEnabled = internal.BoolEnv("DD_DATA_STREAMS_ENABLED", false)
if internal.BoolEnv("DD_TRACE_KAFKA_ANALYTICS_ENABLED", false) {
tr.analyticsRate = 1.0
}

tr.consumerServiceName = namingschema.ServiceName(defaultServiceName)
tr.producerServiceName = namingschema.ServiceNameOverrideV0(defaultServiceName, defaultServiceName)
tr.consumerSpanName = namingschema.OpName(namingschema.KafkaInbound)
tr.producerSpanName = namingschema.OpName(namingschema.KafkaOutbound)

for _, opt := range opts {
opt(tr)
}
return tr
}

// WithContext sets the config context to ctx.
// Deprecated: This is deprecated in favor of passing the context
// via the message headers
func WithContext(ctx context.Context) Option {
return func(tr *KafkaTracer) {
tr.ctx = ctx
}
}

// WithServiceName sets the config service name to serviceName.
func WithServiceName(serviceName string) Option {
return func(tr *KafkaTracer) {
tr.consumerServiceName = serviceName
tr.producerServiceName = serviceName
}
}

// WithAnalytics enables Trace Analytics for all started spans.
func WithAnalytics(on bool) Option {
return func(tr *KafkaTracer) {
if on {
tr.analyticsRate = 1.0
} else {
tr.analyticsRate = math.NaN()
}
}
}

// WithAnalyticsRate sets the sampling rate for Trace Analytics events
// correlated to started spans.
func WithAnalyticsRate(rate float64) Option {
return func(tr *KafkaTracer) {
if rate >= 0.0 && rate <= 1.0 {
tr.analyticsRate = rate
} else {
tr.analyticsRate = math.NaN()
}
}
}

// WithCustomTag will cause the given tagFn to be evaluated after executing
// a query and attach the result to the span tagged by the key.
func WithCustomTag(tag string, tagFn func(msg Message) interface{}) Option {
return func(tr *KafkaTracer) {
if tr.tagFns == nil {
tr.tagFns = make(map[string]func(msg Message) interface{})
}
tr.tagFns[tag] = tagFn
}
}

// WithConfig extracts the config information for the client to be tagged
func WithConfig(cg ConfigMap) Option {
return func(tr *KafkaTracer) {
if groupID, err := cg.Get("group.id", ""); err == nil {
tr.groupID = groupID.(string)
}
if bs, err := cg.Get("bootstrap.servers", ""); err == nil && bs != "" {
for _, addr := range strings.Split(bs.(string), ",") {
host, _, err := net.SplitHostPort(addr)
if err == nil {
tr.bootstrapServers = host
return
}
}
}
}
}

// WithDataStreams enables the Data Streams monitoring product features: https://www.datadoghq.com/product/data-streams-monitoring/
func WithDataStreams() Option {
return func(tr *KafkaTracer) {
tr.dsmEnabled = true
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2016 Datadog, Inc.

package kafka
package tracing

import (
"math"
Expand All @@ -16,29 +16,29 @@ import (

func TestDataStreamsActivation(t *testing.T) {
t.Run("default", func(t *testing.T) {
cfg := newConfig()
assert.False(t, cfg.dataStreamsEnabled)
tr := NewKafkaTracer(0, 0)
assert.False(t, tr.DSMEnabled())
})
t.Run("withOption", func(t *testing.T) {
cfg := newConfig(WithDataStreams())
assert.True(t, cfg.dataStreamsEnabled)
tr := NewKafkaTracer(0, 0, WithDataStreams())
assert.True(t, tr.DSMEnabled())
})
t.Run("withEnv", func(t *testing.T) {
t.Setenv("DD_DATA_STREAMS_ENABLED", "true")
cfg := newConfig()
assert.True(t, cfg.dataStreamsEnabled)
tr := NewKafkaTracer(0, 0)
assert.True(t, tr.DSMEnabled())
})
t.Run("optionOverridesEnv", func(t *testing.T) {
t.Setenv("DD_DATA_STREAMS_ENABLED", "false")
cfg := newConfig(WithDataStreams())
assert.True(t, cfg.dataStreamsEnabled)
tr := NewKafkaTracer(0, 0, WithDataStreams())
assert.True(t, tr.DSMEnabled())
})
}

func TestAnalyticsSettings(t *testing.T) {
t.Run("defaults", func(t *testing.T) {
cfg := newConfig()
assert.True(t, math.IsNaN(cfg.analyticsRate))
tr := NewKafkaTracer(0, 0)
assert.True(t, math.IsNaN(tr.analyticsRate))
})

t.Run("global", func(t *testing.T) {
Expand All @@ -47,21 +47,21 @@ func TestAnalyticsSettings(t *testing.T) {
defer globalconfig.SetAnalyticsRate(rate)
globalconfig.SetAnalyticsRate(0.4)

cfg := newConfig()
assert.Equal(t, 0.4, cfg.analyticsRate)
tr := NewKafkaTracer(0, 0)
assert.Equal(t, 0.4, tr.analyticsRate)
})

t.Run("enabled", func(t *testing.T) {
cfg := newConfig(WithAnalytics(true))
assert.Equal(t, 1.0, cfg.analyticsRate)
tr := NewKafkaTracer(0, 0, WithAnalytics(true))
assert.Equal(t, 1.0, tr.analyticsRate)
})

t.Run("override", func(t *testing.T) {
rate := globalconfig.AnalyticsRate()
defer globalconfig.SetAnalyticsRate(rate)
globalconfig.SetAnalyticsRate(0.4)

cfg := newConfig(WithAnalyticsRate(0.2))
assert.Equal(t, 0.2, cfg.analyticsRate)
tr := NewKafkaTracer(0, 0, WithAnalyticsRate(0.2))
assert.Equal(t, 0.2, tr.analyticsRate)
})
}
Loading

0 comments on commit 8736720

Please sign in to comment.