From bfd6900488c13a622a0d760bb0ba9e63b45d2f8a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rodrigo=20Arg=C3=BCello?= Date: Tue, 5 Nov 2024 13:34:35 +0100 Subject: [PATCH] feat: support segmentio/kafka.go.v0 (#293) Adds support for instrumenting `segmentio/kafka.go.v0` Requires https://github.com/DataDog/dd-trace-go/pull/2885 --------- Signed-off-by: Eliott Bouhana Co-authored-by: Romain Marcadier Co-authored-by: Eliott Bouhana --- _integration-tests/go.mod | 1 + _integration-tests/go.sum | 2 + .../tests/segmentio_kafka.v0/gen_test.go | 19 ++ .../segmentio_kafka.v0/segmentio_kafka.go | 245 ++++++++++++++++++ internal/injector/builtin/generated.go | 79 +++++- internal/injector/builtin/generated_deps.go | 2 + .../yaml/datastreams/segmentio_kafka_v0.yml | 233 +++++++++++++++++ 7 files changed, 580 insertions(+), 1 deletion(-) create mode 100644 _integration-tests/tests/segmentio_kafka.v0/gen_test.go create mode 100644 _integration-tests/tests/segmentio_kafka.v0/segmentio_kafka.go create mode 100644 internal/injector/builtin/yaml/datastreams/segmentio_kafka_v0.yml diff --git a/_integration-tests/go.mod b/_integration-tests/go.mod index 77df5a7f..f7fe70b6 100644 --- a/_integration-tests/go.mod +++ b/_integration-tests/go.mod @@ -39,6 +39,7 @@ require ( github.com/labstack/echo/v4 v4.12.0 github.com/mattn/go-sqlite3 v1.14.22 github.com/redis/go-redis/v9 v9.7.0 + github.com/segmentio/kafka-go v0.4.42 github.com/stretchr/testify v1.9.0 github.com/testcontainers/testcontainers-go v0.34.0 github.com/testcontainers/testcontainers-go/modules/cassandra v0.34.0 diff --git a/_integration-tests/go.sum b/_integration-tests/go.sum index d01664e7..8c60c01f 100644 --- a/_integration-tests/go.sum +++ b/_integration-tests/go.sum @@ -1556,6 +1556,8 @@ github.com/sanity-io/litter v1.5.5 h1:iE+sBxPBzoK6uaEP5Lt3fHNgpKcHXc/A2HGETy0uJQ github.com/sanity-io/litter v1.5.5/go.mod h1:9gzJgR2i4ZpjZHsKvUXIRQVk7P+yM3e+jAF7bU2UI5U= github.com/secure-systems-lab/go-securesystemslib v0.8.0 h1:mr5An6X45Kb2nddcFlbmfHkLguCE9laoZCUzEEpIZXA= github.com/secure-systems-lab/go-securesystemslib v0.8.0/go.mod h1:UH2VZVuJfCYR8WgMlCU1uFsOUU+KeyrTWcSS73NBOzU= +github.com/segmentio/kafka-go v0.4.42 h1:qffhBZCz4WcWyNuHEclHjIMLs2slp6mZO8px+5W5tfU= +github.com/segmentio/kafka-go v0.4.42/go.mod h1:d0g15xPMqoUookug0OU75DhGZxXwCFxSLeJ4uphwJzg= github.com/sergi/go-diff v1.1.0/go.mod h1:STckp+ISIX8hZLjrqAeVduY0gWCT9IjLuqbuNXdaHfM= github.com/sergi/go-diff v1.3.1 h1:xkr+Oxo4BOQKmkn/B9eMK0g5Kg/983T9DqqPHwYqD+8= github.com/sergi/go-diff v1.3.1/go.mod h1:aMJSSKb2lpPvRNec0+w3fl7LP9IOFzdc9Pa4NFbPK1I= diff --git a/_integration-tests/tests/segmentio_kafka.v0/gen_test.go b/_integration-tests/tests/segmentio_kafka.v0/gen_test.go new file mode 100644 index 00000000..426be34d --- /dev/null +++ b/_integration-tests/tests/segmentio_kafka.v0/gen_test.go @@ -0,0 +1,19 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2023-present Datadog, Inc. +// +// Code generated by 'go generate'; DO NOT EDIT. 
+ +//go:build integration + +package segmentio_kafka_v0 + +import ( + "datadoghq.dev/orchestrion/_integration-tests/utils" + "testing" +) + +func TestIntegration_segmentio_kafka_v0(t *testing.T) { + utils.RunTest(t, new(TestCase)) +} diff --git a/_integration-tests/tests/segmentio_kafka.v0/segmentio_kafka.go b/_integration-tests/tests/segmentio_kafka.v0/segmentio_kafka.go new file mode 100644 index 00000000..a4f2e722 --- /dev/null +++ b/_integration-tests/tests/segmentio_kafka.v0/segmentio_kafka.go @@ -0,0 +1,245 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2023-present Datadog, Inc. + +package segmentio_kafka_v0 + +import ( + "context" + "errors" + "net" + "strconv" + "testing" + "time" + + "github.com/segmentio/kafka-go" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + kafkatest "github.com/testcontainers/testcontainers-go/modules/kafka" + + "gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer" + + "datadoghq.dev/orchestrion/_integration-tests/utils" + "datadoghq.dev/orchestrion/_integration-tests/validator/trace" +) + +const ( + topicA = "topic-A" + topicB = "topic-B" + consumerGroup = "group-A" +) + +type TestCase struct { + kafka *kafkatest.KafkaContainer + addr string + writer *kafka.Writer +} + +func (tc *TestCase) Setup(t *testing.T) { + utils.SkipIfProviderIsNotHealthy(t) + + tc.kafka, tc.addr = utils.StartKafkaTestContainer(t) + + tc.writer = &kafka.Writer{ + Addr: kafka.TCP(tc.addr), + Balancer: &kafka.LeastBytes{}, + } + tc.createTopic(t) +} + +func (tc *TestCase) newReader(topic string) *kafka.Reader { + return kafka.NewReader(kafka.ReaderConfig{ + Brokers: []string{tc.addr}, + GroupID: consumerGroup, + Topic: topic, + MaxWait: 10 * time.Millisecond, + MaxBytes: 10e6, // 10MB + }) +} + +func (tc *TestCase) createTopic(t *testing.T) { + conn, err := kafka.Dial("tcp", tc.addr) + require.NoError(t, err) + defer conn.Close() + + controller, err := conn.Controller() + require.NoError(t, err) + + controllerConn, err := kafka.Dial("tcp", net.JoinHostPort(controller.Host, strconv.Itoa(controller.Port))) + require.NoError(t, err) + defer controllerConn.Close() + + topicConfigs := []kafka.TopicConfig{ + { + Topic: topicA, + NumPartitions: 1, + ReplicationFactor: 1, + }, + { + Topic: topicB, + NumPartitions: 1, + ReplicationFactor: 1, + }, + } + err = controllerConn.CreateTopics(topicConfigs...) + require.NoError(t, err) +} + +func (tc *TestCase) Run(t *testing.T) { + tc.produce(t) + tc.consume(t) +} + +func (tc *TestCase) produce(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + span, ctx := tracer.StartSpanFromContext(ctx, "test.root") + defer span.Finish() + + messages := []kafka.Message{ + { + Topic: topicA, + Key: []byte("Key-A"), + Value: []byte("Hello World!"), + }, + { + Topic: topicB, + Key: []byte("Key-A"), + Value: []byte("Second message"), + }, + { + Topic: topicB, + Key: []byte("Key-A"), + Value: []byte("Third message"), + }, + } + const ( + maxRetries = 10 + retryDelay = 100 * time.Millisecond + ) + var ( + retryCount int + err error + ) + for retryCount < maxRetries { + err = tc.writer.WriteMessages(ctx, messages...) 
+		if err == nil {
+			break
+		}
+		// This error happens sometimes with brand-new topics, as there is a delay between when the topic is created
+		// on the broker and when it can actually be written to.
+		if !errors.Is(err, kafka.UnknownTopicOrPartition) {
+			// Any other error is not retryable; bail out and let the assertion below fail the test
+			// instead of spinning in this loop without ever incrementing retryCount.
+			break
+		}
+		retryCount++
+		t.Logf("failed to produce kafka messages, will retry in %s (retryCount: %d)", retryDelay, retryCount)
+		time.Sleep(retryDelay)
+	}
+	require.NoError(t, err)
+	require.NoError(t, tc.writer.Close())
+}
+
+func (tc *TestCase) consume(t *testing.T) {
+	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
+	defer cancel()
+
+	readerA := tc.newReader(topicA)
+	m, err := readerA.ReadMessage(ctx)
+	require.NoError(t, err)
+	assert.Equal(t, "Hello World!", string(m.Value))
+	assert.Equal(t, "Key-A", string(m.Key))
+	require.NoError(t, readerA.Close())
+
+	readerB := tc.newReader(topicB)
+	m, err = readerB.FetchMessage(ctx)
+	require.NoError(t, err)
+	assert.Equal(t, "Second message", string(m.Value))
+	assert.Equal(t, "Key-A", string(m.Key))
+	err = readerB.CommitMessages(ctx, m)
+	require.NoError(t, err)
+	require.NoError(t, readerB.Close())
+}
+
+func (tc *TestCase) Teardown(t *testing.T) {
+	ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
+	defer cancel()
+	require.NoError(t, tc.kafka.Terminate(ctx))
+}
+
+func (*TestCase) ExpectedTraces() trace.Traces {
+	return trace.Traces{
+		{
+			Tags: map[string]any{
+				"name": "test.root",
+			},
+			Children: trace.Traces{
+				{
+					Tags: map[string]any{
+						"name":     "kafka.produce",
+						"type":     "queue",
+						"service":  "kafka",
+						"resource": "Produce Topic topic-A",
+					},
+					Meta: map[string]string{
+						"span.kind": "producer",
+						"component": "segmentio/kafka.go.v0",
+					},
+					Children: trace.Traces{
+						{
+							Tags: map[string]any{
+								"name":     "kafka.consume",
+								"type":     "queue",
+								"service":  "kafka",
+								"resource": "Consume Topic topic-A",
+							},
+							Meta: map[string]string{
+								"span.kind": "consumer",
+								"component": "segmentio/kafka.go.v0",
+							},
+						},
+					},
+				},
+				{
+					Tags: map[string]any{
+						"name":     "kafka.produce",
+						"type":     "queue",
+						"service":  "kafka",
+						"resource": "Produce Topic topic-B",
+					},
+					Meta: map[string]string{
+						"span.kind": "producer",
+						"component": "segmentio/kafka.go.v0",
+					},
+					Children: trace.Traces{
+						{
+							Tags: map[string]any{
+								"name":     "kafka.consume",
+								"type":     "queue",
+								"service":  "kafka",
+								"resource": "Consume Topic topic-B",
+							},
+							Meta: map[string]string{
+								"span.kind": "consumer",
+								"component": "segmentio/kafka.go.v0",
+							},
+						},
+					},
+				},
+				{
+					Tags: map[string]any{
+						"name":     "kafka.produce",
+						"type":     "queue",
+						"service":  "kafka",
+						"resource": "Produce Topic topic-B",
+					},
+					Meta: map[string]string{
+						"span.kind": "producer",
+						"component": "segmentio/kafka.go.v0",
+					},
+					Children: nil,
+				},
+			},
+		},
+	}
+}
diff --git a/internal/injector/builtin/generated.go b/internal/injector/builtin/generated.go
index 044efebc..a941e91b 100644
--- a/internal/injector/builtin/generated.go
+++ b/internal/injector/builtin/generated.go
@@ -708,6 +708,81 @@ var Aspects = [...]aspect.Aspect{
 			)),
 		},
 	},
+	// From datastreams/segmentio_kafka_v0.yml
+	{
+		JoinPoint: join.StructDefinition(join.MustTypeName("github.com/segmentio/kafka-go.Reader")),
+		Advice: []advice.Advice{
+			advice.InjectDeclarations(code.MustTemplate(
+				"type __dd_wMessage struct {\n *Message\n}\n \nfunc __dd_wrapMessage(msg *Message) tracing.Message {\n if msg == nil {\n return nil\n }\n return &__dd_wMessage{msg}\n}\n \nfunc (w *__dd_wMessage) GetValue() []byte {\n return w.Value\n}\n 
\nfunc (w *__dd_wMessage) GetKey() []byte {\n return w.Key\n}\n \nfunc (w *__dd_wMessage) GetHeaders() []tracing.Header {\n hs := make([]tracing.Header, 0, len(w.Headers))\n for _, h := range w.Headers {\n hs = append(hs, __dd_wrapHeader(h))\n }\n return hs\n}\n \nfunc (w *__dd_wMessage) SetHeaders(headers []tracing.Header) {\n hs := make([]Header, 0, len(headers))\n for _, h := range headers {\n hs = append(hs, Header{\n Key: h.GetKey(),\n Value: h.GetValue(),\n })\n }\n w.Message.Headers = hs\n}\n \nfunc (w *__dd_wMessage) GetTopic() string {\n return w.Topic\n}\n \nfunc (w *__dd_wMessage) GetPartition() int {\n return w.Partition\n}\n \nfunc (w *__dd_wMessage) GetOffset() int64 {\n return w.Offset\n}\n \ntype __dd_wHeader struct {\n Header\n}\n \nfunc __dd_wrapHeader(h Header) tracing.Header {\n return &__dd_wHeader{h}\n}\n \nfunc (w __dd_wHeader) GetKey() string {\n return w.Key\n}\n \nfunc (w __dd_wHeader) GetValue() []byte {\n return w.Value\n}\n \ntype __dd_wWriter struct {\n *Writer\n}\n \nfunc (w *__dd_wWriter) GetTopic() string {\n return w.Topic\n}\n \nfunc __dd_wrapTracingWriter(w *Writer) tracing.Writer {\n return &__dd_wWriter{w}\n}\n\nfunc __dd_initReader(r *Reader) {\n if r.__dd_tracer != nil {\n return\n }\n kafkaCfg := tracing.KafkaConfig{}\n if r.Config().Brokers != nil {\n kafkaCfg.BootstrapServers = strings.Join(r.Config().Brokers, \",\")\n }\n if r.Config().GroupID != \"\" {\n kafkaCfg.ConsumerGroupID = r.Config().GroupID\n }\n r.__dd_tracer = tracing.NewTracer(kafkaCfg)\n}\n\ntype __dd_span = ddtrace.Span", + map[string]string{ + "ddtrace": "gopkg.in/DataDog/dd-trace-go.v1/ddtrace", + "strings": "strings", + "tracing": "gopkg.in/DataDog/dd-trace-go.v1/contrib/segmentio/kafka.go.v0/internal/tracing", + }, + context.GoLangVersion{}, + ), []string{}), + advice.AddStructField("__dd_tracer", join.MustTypeName("*gopkg.in/DataDog/dd-trace-go.v1/contrib/segmentio/kafka.go.v0/internal/tracing.Tracer")), + advice.AddStructField("__dd_prevSpan", join.MustTypeName("__dd_span")), + }, + }, + { + JoinPoint: join.FunctionBody(join.Function( + join.Receiver(join.MustTypeName("*github.com/segmentio/kafka-go.Reader")), + join.Name("FetchMessage"), + )), + Advice: []advice.Advice{ + advice.PrependStmts(code.MustTemplate( + "{{- $r := .Function.Receiver -}}\n{{- $ctx := .Function.Argument 0 -}}\n{{- $msg := .Function.Result 0 -}}\n{{- $err := .Function.Result 1 -}}\n__dd_initReader(r)\nif {{ $r }}.__dd_prevSpan != nil {\n {{ $r }}.__dd_prevSpan.Finish()\n {{ $r }}.__dd_prevSpan = nil\n}\ndefer func() {\n if {{ $err }} != nil {\n return\n }\n tMsg := __dd_wrapMessage(&{{ $msg }})\n {{ $r }}.__dd_prevSpan = {{ $r }}.__dd_tracer.StartConsumeSpan({{ $ctx }}, tMsg)\n {{ $r }}.__dd_tracer.SetConsumeDSMCheckpoint(tMsg)\n}()", + map[string]string{ + "tracing": "gopkg.in/DataDog/dd-trace-go.v1/contrib/segmentio/kafka.go.v0/internal/tracing", + }, + context.GoLangVersion{}, + )), + }, + }, + { + JoinPoint: join.FunctionBody(join.Function( + join.Receiver(join.MustTypeName("*github.com/segmentio/kafka-go.Reader")), + join.Name("Close"), + )), + Advice: []advice.Advice{ + advice.PrependStmts(code.MustTemplate( + "{{- $r := .Function.Receiver -}}\nif {{ $r }}.__dd_prevSpan != nil {\n {{ $r }}.__dd_prevSpan.Finish()\n {{ $r }}.__dd_prevSpan = nil\n}", + map[string]string{}, + context.GoLangVersion{}, + )), + }, + }, + { + JoinPoint: join.StructDefinition(join.MustTypeName("github.com/segmentio/kafka-go.Writer")), + Advice: []advice.Advice{ + advice.InjectDeclarations(code.MustTemplate( + "func 
__dd_initWriter(w *Writer) {\n if w.__dd_tracer != nil {\n return\n }\n kafkaCfg := tracing.KafkaConfig{\n BootstrapServers: w.Addr.String(),\n }\n w.__dd_tracer = tracing.NewTracer(kafkaCfg)\n}", + map[string]string{ + "tracing": "gopkg.in/DataDog/dd-trace-go.v1/contrib/segmentio/kafka.go.v0/internal/tracing", + }, + context.GoLangVersion{}, + ), []string{}), + advice.AddStructField("__dd_tracer", join.MustTypeName("*gopkg.in/DataDog/dd-trace-go.v1/contrib/segmentio/kafka.go.v0/internal/tracing.Tracer")), + }, + }, + { + JoinPoint: join.FunctionBody(join.Function( + join.Receiver(join.MustTypeName("*github.com/segmentio/kafka-go.Writer")), + join.Name("WriteMessages"), + )), + Advice: []advice.Advice{ + advice.PrependStmts(code.MustTemplate( + "{{- $w := .Function.Receiver -}}\n{{- $ctx := .Function.Argument 0 -}}\n{{- $msgs := .Function.Argument 1 -}}\n{{- $err := .Function.Result 0 -}}\nspans := make([]ddtrace.Span, len({{ $msgs }}))\n__dd_initWriter(w)\n\nvar spanOpts []tracer.StartSpanOption\nprevSpan, ok := tracer.SpanFromContext({{ $ctx }})\nif ok {\n spanOpts = append(spanOpts, tracer.ChildOf(prevSpan.Context()))\n}\n\nfor i := range {{ $msgs }} {\n tMsg := __dd_wrapMessage(&{{ $msgs }}[i])\n tWriter := __dd_wrapTracingWriter({{ $w }})\n spans[i] = {{ $w }}.__dd_tracer.StartProduceSpan(nil, tWriter, tMsg, spanOpts...)\n {{ $w }}.__dd_tracer.SetProduceDSMCheckpoint(tMsg, tWriter)\n}\n\ndefer func() {\n for i, span := range spans {\n {{ $w }}.__dd_tracer.FinishProduceSpan(span, {{ $msgs }}[i].Partition, {{ $msgs }}[i].Offset, {{ $err }})\n }\n}()", + map[string]string{ + "ddtrace": "gopkg.in/DataDog/dd-trace-go.v1/ddtrace", + "tracer": "gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer", + "tracing": "gopkg.in/DataDog/dd-trace-go.v1/contrib/segmentio/kafka.go.v0/internal/tracing", + }, + context.GoLangVersion{}, + )), + }, + }, // From datastreams/shopify_sarama.yml { JoinPoint: join.OneOf( @@ -1390,6 +1465,7 @@ var InjectedPaths = [...]string{ "gopkg.in/DataDog/dd-trace-go.v1/contrib/log/slog", "gopkg.in/DataDog/dd-trace-go.v1/contrib/net/http", "gopkg.in/DataDog/dd-trace-go.v1/contrib/redis/go-redis.v9", + "gopkg.in/DataDog/dd-trace-go.v1/contrib/segmentio/kafka.go.v0/internal/tracing", "gopkg.in/DataDog/dd-trace-go.v1/contrib/twitchtv/twirp", "gopkg.in/DataDog/dd-trace-go.v1/ddtrace", "gopkg.in/DataDog/dd-trace-go.v1/ddtrace/ext", @@ -1411,8 +1487,9 @@ var InjectedPaths = [...]string{ "net/http", "os", "strconv", + "strings", "testing", } // Checksum is a checksum of the built-in configuration which can be used to invalidate caches. 
-const Checksum = "sha512:WJ6cUkZYX1c1gLi4hqYmuW7WV6w9sQPxW+OdiS1liKFzjvzj8wXQ+1qmdmbxI+//S++ReqWp9ayuu3LLIECoyg==" +const Checksum = "sha512:PWroVJ3UroS2uea+T7JL0bxViL8t/b02MjCMX9wOSbZa4+Yl42h2KUb+GqyOfUhxYNXzMbR+6shPVksDNKWiWw==" diff --git a/internal/injector/builtin/generated_deps.go b/internal/injector/builtin/generated_deps.go index 47a8fc7d..e675f473 100644 --- a/internal/injector/builtin/generated_deps.go +++ b/internal/injector/builtin/generated_deps.go @@ -48,6 +48,7 @@ import ( _ "gopkg.in/DataDog/dd-trace-go.v1/contrib/log/slog" _ "gopkg.in/DataDog/dd-trace-go.v1/contrib/net/http" _ "gopkg.in/DataDog/dd-trace-go.v1/contrib/redis/go-redis.v9" + _ "gopkg.in/DataDog/dd-trace-go.v1/contrib/segmentio/kafka.go.v0/internal/tracing" _ "gopkg.in/DataDog/dd-trace-go.v1/contrib/twitchtv/twirp" _ "gopkg.in/DataDog/dd-trace-go.v1/ddtrace" _ "gopkg.in/DataDog/dd-trace-go.v1/ddtrace/ext" @@ -69,5 +70,6 @@ import ( _ "net/http" _ "os" _ "strconv" + _ "strings" _ "testing" ) diff --git a/internal/injector/builtin/yaml/datastreams/segmentio_kafka_v0.yml b/internal/injector/builtin/yaml/datastreams/segmentio_kafka_v0.yml new file mode 100644 index 00000000..0e2865fb --- /dev/null +++ b/internal/injector/builtin/yaml/datastreams/segmentio_kafka_v0.yml @@ -0,0 +1,233 @@ +# Unless explicitly stated otherwise all files in this repository are licensed +# under the Apache License Version 2.0. +# This product includes software developed at Datadog (https://www.datadoghq.com/). +# Copyright 2023-present Datadog, Inc. +--- +# yaml-language-server: $schema=../../../../../docs/static/schema.json +meta: + name: github.com/segmentio/kafka-go + description: Kafka library in Go + icon: fast-forward + +aspects: + + ## Trace Consume ## + + - id: Add struct fields to kafka.Reader + join-point: + struct-definition: github.com/segmentio/kafka-go.Reader + advice: + - inject-declarations: + imports: + tracing: gopkg.in/DataDog/dd-trace-go.v1/contrib/segmentio/kafka.go.v0/internal/tracing + ddtrace: gopkg.in/DataDog/dd-trace-go.v1/ddtrace + strings: strings + template: |- + type __dd_wMessage struct { + *Message + } + + func __dd_wrapMessage(msg *Message) tracing.Message { + if msg == nil { + return nil + } + return &__dd_wMessage{msg} + } + + func (w *__dd_wMessage) GetValue() []byte { + return w.Value + } + + func (w *__dd_wMessage) GetKey() []byte { + return w.Key + } + + func (w *__dd_wMessage) GetHeaders() []tracing.Header { + hs := make([]tracing.Header, 0, len(w.Headers)) + for _, h := range w.Headers { + hs = append(hs, __dd_wrapHeader(h)) + } + return hs + } + + func (w *__dd_wMessage) SetHeaders(headers []tracing.Header) { + hs := make([]Header, 0, len(headers)) + for _, h := range headers { + hs = append(hs, Header{ + Key: h.GetKey(), + Value: h.GetValue(), + }) + } + w.Message.Headers = hs + } + + func (w *__dd_wMessage) GetTopic() string { + return w.Topic + } + + func (w *__dd_wMessage) GetPartition() int { + return w.Partition + } + + func (w *__dd_wMessage) GetOffset() int64 { + return w.Offset + } + + type __dd_wHeader struct { + Header + } + + func __dd_wrapHeader(h Header) tracing.Header { + return &__dd_wHeader{h} + } + + func (w __dd_wHeader) GetKey() string { + return w.Key + } + + func (w __dd_wHeader) GetValue() []byte { + return w.Value + } + + type __dd_wWriter struct { + *Writer + } + + func (w *__dd_wWriter) GetTopic() string { + return w.Topic + } + + func __dd_wrapTracingWriter(w *Writer) tracing.Writer { + return &__dd_wWriter{w} + } + + func __dd_initReader(r *Reader) { + if 
r.__dd_tracer != nil {
+                return
+              }
+              kafkaCfg := tracing.KafkaConfig{}
+              if r.Config().Brokers != nil {
+                kafkaCfg.BootstrapServers = strings.Join(r.Config().Brokers, ",")
+              }
+              if r.Config().GroupID != "" {
+                kafkaCfg.ConsumerGroupID = r.Config().GroupID
+              }
+              r.__dd_tracer = tracing.NewTracer(kafkaCfg)
+            }
+
+            type __dd_span = ddtrace.Span
+      - add-struct-field:
+          name: __dd_tracer
+          type: "*gopkg.in/DataDog/dd-trace-go.v1/contrib/segmentio/kafka.go.v0/internal/tracing.Tracer"
+      - add-struct-field:
+          name: __dd_prevSpan
+          type: "__dd_span"
+
+  - id: Trace kafka.Reader#FetchMessage
+    join-point:
+      function-body:
+        function:
+          - receiver: '*github.com/segmentio/kafka-go.Reader'
+          - name: FetchMessage # ReadMessage calls FetchMessage internally, so tracing this should be enough.
+    advice:
+      - prepend-statements:
+          imports:
+            tracing: gopkg.in/DataDog/dd-trace-go.v1/contrib/segmentio/kafka.go.v0/internal/tracing
+          template: |-
+            {{- $r := .Function.Receiver -}}
+            {{- $ctx := .Function.Argument 0 -}}
+            {{- $msg := .Function.Result 0 -}}
+            {{- $err := .Function.Result 1 -}}
+            __dd_initReader(r)
+            if {{ $r }}.__dd_prevSpan != nil {
+              {{ $r }}.__dd_prevSpan.Finish()
+              {{ $r }}.__dd_prevSpan = nil
+            }
+            defer func() {
+              if {{ $err }} != nil {
+                return
+              }
+              tMsg := __dd_wrapMessage(&{{ $msg }})
+              {{ $r }}.__dd_prevSpan = {{ $r }}.__dd_tracer.StartConsumeSpan({{ $ctx }}, tMsg)
+              {{ $r }}.__dd_tracer.SetConsumeDSMCheckpoint(tMsg)
+            }()
+
+  - id: Trace kafka.Reader#Close
+    join-point:
+      function-body:
+        function:
+          - receiver: '*github.com/segmentio/kafka-go.Reader'
+          - name: Close
+    advice:
+      - prepend-statements:
+          template: |-
+            {{- $r := .Function.Receiver -}}
+            if {{ $r }}.__dd_prevSpan != nil {
+              {{ $r }}.__dd_prevSpan.Finish()
+              {{ $r }}.__dd_prevSpan = nil
+            }
+
+  ## Trace Produce ##
+
+  - id: Add struct fields to kafka.Writer
+    join-point:
+      struct-definition: github.com/segmentio/kafka-go.Writer
+    advice:
+      - inject-declarations:
+          imports:
+            tracing: gopkg.in/DataDog/dd-trace-go.v1/contrib/segmentio/kafka.go.v0/internal/tracing
+          template: |-
+            func __dd_initWriter(w *Writer) {
+              if w.__dd_tracer != nil {
+                return
+              }
+              kafkaCfg := tracing.KafkaConfig{
+                BootstrapServers: w.Addr.String(),
+              }
+              w.__dd_tracer = tracing.NewTracer(kafkaCfg)
+            }
+      - add-struct-field:
+          name: __dd_tracer
+          type: "*gopkg.in/DataDog/dd-trace-go.v1/contrib/segmentio/kafka.go.v0/internal/tracing.Tracer"
+
+  - id: Trace kafka.Writer#WriteMessages
+    join-point:
+      function-body:
+        function:
+          - receiver: '*github.com/segmentio/kafka-go.Writer'
+          - name: WriteMessages
+    advice:
+      - prepend-statements:
+          imports:
+            ddtrace: gopkg.in/DataDog/dd-trace-go.v1/ddtrace
+            tracing: gopkg.in/DataDog/dd-trace-go.v1/contrib/segmentio/kafka.go.v0/internal/tracing
+            tracer: gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer
+          # Here we pass a nil context to tracing.StartProduceSpan: the GLS modifies the context, which would make each
+          # span started in the for loop a child of the previous one instead of a sibling (sibling spans are the desired
+          # behavior). Until GLS supports starting sibling spans, we set the parent span manually as a workaround.
+ template: |- + {{- $w := .Function.Receiver -}} + {{- $ctx := .Function.Argument 0 -}} + {{- $msgs := .Function.Argument 1 -}} + {{- $err := .Function.Result 0 -}} + spans := make([]ddtrace.Span, len({{ $msgs }})) + __dd_initWriter(w) + + var spanOpts []tracer.StartSpanOption + prevSpan, ok := tracer.SpanFromContext({{ $ctx }}) + if ok { + spanOpts = append(spanOpts, tracer.ChildOf(prevSpan.Context())) + } + + for i := range {{ $msgs }} { + tMsg := __dd_wrapMessage(&{{ $msgs }}[i]) + tWriter := __dd_wrapTracingWriter({{ $w }}) + spans[i] = {{ $w }}.__dd_tracer.StartProduceSpan(nil, tWriter, tMsg, spanOpts...) + {{ $w }}.__dd_tracer.SetProduceDSMCheckpoint(tMsg, tWriter) + } + + defer func() { + for i, span := range spans { + {{ $w }}.__dd_tracer.FinishProduceSpan(span, {{ $msgs }}[i].Partition, {{ $msgs }}[i].Offset, {{ $err }}) + } + }()
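
For context, the aspects in this patch apply to ordinary `segmentio/kafka-go` code at build time (when the application is compiled with `orchestrion go build`); the application itself does not change. The following minimal sketch shows the kind of code the aspects target — the broker address, topic, and consumer group are placeholder assumptions, not values from this patch:

	package main

	import (
		"context"
		"log"

		"github.com/segmentio/kafka-go"
	)

	func main() {
		ctx := context.Background()

		// Producer: under Orchestrion, the WriteMessages advice starts a kafka.produce
		// span and sets a DSM checkpoint for each message written here.
		w := &kafka.Writer{
			Addr:     kafka.TCP("localhost:9092"), // placeholder broker
			Topic:    "example-topic",             // placeholder topic
			Balancer: &kafka.LeastBytes{},
		}
		if err := w.WriteMessages(ctx, kafka.Message{Key: []byte("k"), Value: []byte("v")}); err != nil {
			log.Fatalf("produce: %v", err)
		}
		_ = w.Close()

		// Consumer: the FetchMessage advice starts a kafka.consume span per message;
		// the span is finished on the next FetchMessage call or when the reader closes.
		r := kafka.NewReader(kafka.ReaderConfig{
			Brokers: []string{"localhost:9092"}, // placeholder broker
			GroupID: "example-group",            // placeholder consumer group
			Topic:   "example-topic",
		})
		msg, err := r.FetchMessage(ctx)
		if err != nil {
			log.Fatalf("consume: %v", err)
		}
		log.Printf("consumed: %s", string(msg.Value))
		_ = r.CommitMessages(ctx, msg)
		_ = r.Close()
	}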