Skip to content

Commit

Permalink
feat: support segmentio/kafka.go.v0 (#293)
Browse files Browse the repository at this point in the history
Adds support for instrumenting `segmentio/kafka.go.v0`

Requires DataDog/dd-trace-go#2885

---------

Signed-off-by: Eliott Bouhana <[email protected]>
Co-authored-by: Romain Marcadier <[email protected]>
Co-authored-by: Eliott Bouhana <[email protected]>
  • Loading branch information
3 people authored Nov 5, 2024
1 parent e8e08e1 commit bfd6900
Show file tree
Hide file tree
Showing 7 changed files with 580 additions and 1 deletion.
1 change: 1 addition & 0 deletions _integration-tests/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ require (
github.com/labstack/echo/v4 v4.12.0
github.com/mattn/go-sqlite3 v1.14.22
github.com/redis/go-redis/v9 v9.7.0
github.com/segmentio/kafka-go v0.4.42
github.com/stretchr/testify v1.9.0
github.com/testcontainers/testcontainers-go v0.34.0
github.com/testcontainers/testcontainers-go/modules/cassandra v0.34.0
Expand Down
2 changes: 2 additions & 0 deletions _integration-tests/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1556,6 +1556,8 @@ github.com/sanity-io/litter v1.5.5 h1:iE+sBxPBzoK6uaEP5Lt3fHNgpKcHXc/A2HGETy0uJQ
github.com/sanity-io/litter v1.5.5/go.mod h1:9gzJgR2i4ZpjZHsKvUXIRQVk7P+yM3e+jAF7bU2UI5U=
github.com/secure-systems-lab/go-securesystemslib v0.8.0 h1:mr5An6X45Kb2nddcFlbmfHkLguCE9laoZCUzEEpIZXA=
github.com/secure-systems-lab/go-securesystemslib v0.8.0/go.mod h1:UH2VZVuJfCYR8WgMlCU1uFsOUU+KeyrTWcSS73NBOzU=
github.com/segmentio/kafka-go v0.4.42 h1:qffhBZCz4WcWyNuHEclHjIMLs2slp6mZO8px+5W5tfU=
github.com/segmentio/kafka-go v0.4.42/go.mod h1:d0g15xPMqoUookug0OU75DhGZxXwCFxSLeJ4uphwJzg=
github.com/sergi/go-diff v1.1.0/go.mod h1:STckp+ISIX8hZLjrqAeVduY0gWCT9IjLuqbuNXdaHfM=
github.com/sergi/go-diff v1.3.1 h1:xkr+Oxo4BOQKmkn/B9eMK0g5Kg/983T9DqqPHwYqD+8=
github.com/sergi/go-diff v1.3.1/go.mod h1:aMJSSKb2lpPvRNec0+w3fl7LP9IOFzdc9Pa4NFbPK1I=
Expand Down
19 changes: 19 additions & 0 deletions _integration-tests/tests/segmentio_kafka.v0/gen_test.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

245 changes: 245 additions & 0 deletions _integration-tests/tests/segmentio_kafka.v0/segmentio_kafka.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,245 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2023-present Datadog, Inc.

package segmentio_kafka_v0

import (
"context"
"errors"
"net"
"strconv"
"testing"
"time"

"github.com/segmentio/kafka-go"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
kafkatest "github.com/testcontainers/testcontainers-go/modules/kafka"

"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer"

"datadoghq.dev/orchestrion/_integration-tests/utils"
"datadoghq.dev/orchestrion/_integration-tests/validator/trace"
)

// Topic and consumer-group names used throughout this integration test.
// Two topics are created so the test exercises both a single-message
// produce (topic-A) and a multi-message produce (topic-B).
const (
	topicA        = "topic-A"
	topicB        = "topic-B"
	consumerGroup = "group-A"
)

// TestCase carries the state shared across the Setup/Run/Teardown phases of
// the segmentio/kafka-go instrumentation test.
type TestCase struct {
	kafka  *kafkatest.KafkaContainer // Kafka test container; terminated in Teardown
	addr   string                    // broker address returned by utils.StartKafkaTestContainer
	writer *kafka.Writer             // producer used by produce(); closed once writing is done
}

// Setup starts the Kafka test container, prepares the producer used by the
// test, and pre-creates the topics it will write to. The test is skipped
// when the container provider is not healthy.
func (tc *TestCase) Setup(t *testing.T) {
	utils.SkipIfProviderIsNotHealthy(t)

	tc.kafka, tc.addr = utils.StartKafkaTestContainer(t)

	writer := &kafka.Writer{
		Addr:     kafka.TCP(tc.addr),
		Balancer: &kafka.LeastBytes{},
	}
	tc.writer = writer

	tc.createTopic(t)
}

// newReader builds a consumer-group reader bound to the given topic on the
// test container's broker.
func (tc *TestCase) newReader(topic string) *kafka.Reader {
	cfg := kafka.ReaderConfig{
		Brokers:  []string{tc.addr},
		GroupID:  consumerGroup,
		Topic:    topic,
		MaxWait:  10 * time.Millisecond,
		MaxBytes: 10e6, // 10MB
	}
	return kafka.NewReader(cfg)
}

// createTopic dials the cluster controller and creates topicA and topicB,
// each with a single partition and replication factor 1.
func (tc *TestCase) createTopic(t *testing.T) {
	conn, err := kafka.Dial("tcp", tc.addr)
	require.NoError(t, err)
	defer conn.Close()

	// Topic creation must go through the controller broker, which may not be
	// the one we initially dialed.
	controller, err := conn.Controller()
	require.NoError(t, err)

	controllerAddr := net.JoinHostPort(controller.Host, strconv.Itoa(controller.Port))
	controllerConn, err := kafka.Dial("tcp", controllerAddr)
	require.NoError(t, err)
	defer controllerConn.Close()

	var topicConfigs []kafka.TopicConfig
	for _, topic := range []string{topicA, topicB} {
		topicConfigs = append(topicConfigs, kafka.TopicConfig{
			Topic:             topic,
			NumPartitions:     1,
			ReplicationFactor: 1,
		})
	}
	require.NoError(t, controllerConn.CreateTopics(topicConfigs...))
}

// Run executes the scenario under test: produce three messages across two
// topics, then consume them back.
func (tc *TestCase) Run(t *testing.T) {
	tc.produce(t)
	tc.consume(t)
}

// produce writes three messages (one to topic-A, two to topic-B) under a
// "test.root" span so the instrumentation's producer spans are parented to
// it. Writes are retried for a short while because brand-new topics can
// briefly reject writes.
func (tc *TestCase) produce(t *testing.T) {
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()

	span, ctx := tracer.StartSpanFromContext(ctx, "test.root")
	defer span.Finish()

	messages := []kafka.Message{
		{
			Topic: topicA,
			Key:   []byte("Key-A"),
			Value: []byte("Hello World!"),
		},
		{
			Topic: topicB,
			Key:   []byte("Key-A"),
			Value: []byte("Second message"),
		},
		{
			Topic: topicB,
			Key:   []byte("Key-A"),
			Value: []byte("Third message"),
		},
	}

	const (
		maxRetries = 10
		retryDelay = 100 * time.Millisecond
	)
	var (
		retryCount int
		err        error
	)
	for retryCount < maxRetries {
		err = tc.writer.WriteMessages(ctx, messages...)
		if err == nil {
			break
		}
		// This error happens sometimes with brand-new topics, as there is a delay between when the topic is created
		// on the broker, and when the topic can actually be written to.
		if !errors.Is(err, kafka.UnknownTopicOrPartition) {
			// BUG FIX: any other error (e.g. context.DeadlineExceeded once ctx
			// expires) previously left retryCount unchanged, so the loop spun
			// forever. Non-retryable errors now abort immediately and are
			// reported by the require.NoError below.
			break
		}
		retryCount++
		t.Logf("failed to produce kafka messages, will retry in %s (retryCount: %d)", retryDelay, retryCount)
		time.Sleep(retryDelay)
	}
	require.NoError(t, err)
	require.NoError(t, tc.writer.Close())
}

// consume reads the produced messages back and verifies their payloads.
// topic-A is read via ReadMessage (auto-commit); topic-B via FetchMessage
// followed by an explicit CommitMessages, exercising both consumption paths.
func (tc *TestCase) consume(t *testing.T) {
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()

	readerA := tc.newReader(topicA)
	msg, err := readerA.ReadMessage(ctx)
	require.NoError(t, err)
	assert.Equal(t, "Hello World!", string(msg.Value))
	assert.Equal(t, "Key-A", string(msg.Key))
	require.NoError(t, readerA.Close())

	readerB := tc.newReader(topicB)
	msg, err = readerB.FetchMessage(ctx)
	require.NoError(t, err)
	assert.Equal(t, "Second message", string(msg.Value))
	assert.Equal(t, "Key-A", string(msg.Key))
	require.NoError(t, readerB.CommitMessages(ctx, msg))
	require.NoError(t, readerB.Close())
}

// Teardown terminates the Kafka test container, allowing up to 15 seconds
// for a clean shutdown.
func (tc *TestCase) Teardown(t *testing.T) {
	ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
	defer cancel()

	require.NoError(t, tc.kafka.Terminate(ctx))
}

// ExpectedTraces describes the trace shape the orchestrion instrumentation
// must emit for Run: a "test.root" span with three kafka.produce children
// (one per message written in produce). The first two producer spans each
// parent a kafka.consume span, matching the two messages read in consume;
// the third message ("Third message" on topic-B) is never consumed, so its
// producer span has no children.
func (*TestCase) ExpectedTraces() trace.Traces {
	return trace.Traces{
		{
			Tags: map[string]any{
				"name": "test.root",
			},
			Children: trace.Traces{
				{
					// Message 1: "Hello World!" -> topic-A, consumed via ReadMessage.
					Tags: map[string]any{
						"name":     "kafka.produce",
						"type":     "queue",
						"service":  "kafka",
						"resource": "Produce Topic topic-A",
					},
					Meta: map[string]string{
						"span.kind": "producer",
						"component": "segmentio/kafka.go.v0",
					},
					Children: trace.Traces{
						{
							Tags: map[string]any{
								"name":     "kafka.consume",
								"type":     "queue",
								"service":  "kafka",
								"resource": "Consume Topic topic-A",
							},
							Meta: map[string]string{
								"span.kind": "consumer",
								"component": "segmentio/kafka.go.v0",
							},
						},
					},
				},
				{
					// Message 2: "Second message" -> topic-B, consumed via
					// FetchMessage + CommitMessages.
					Tags: map[string]any{
						"name":     "kafka.produce",
						"type":     "queue",
						"service":  "kafka",
						"resource": "Produce Topic topic-B",
					},
					Meta: map[string]string{
						"span.kind": "producer",
						"component": "segmentio/kafka.go.v0",
					},
					Children: trace.Traces{
						{
							Tags: map[string]any{
								"name":     "kafka.consume",
								"type":     "queue",
								"service":  "kafka",
								"resource": "Consume Topic topic-B",
							},
							Meta: map[string]string{
								"span.kind": "consumer",
								"component": "segmentio/kafka.go.v0",
							},
						},
					},
				},
				{
					// Message 3: "Third message" -> topic-B, produced but never
					// consumed, hence no child consumer span.
					Tags: map[string]any{
						"name":     "kafka.produce",
						"type":     "queue",
						"service":  "kafka",
						"resource": "Produce Topic topic-B",
					},
					Meta: map[string]string{
						"span.kind": "producer",
						"component": "segmentio/kafka.go.v0",
					},
					Children: nil,
				},
			},
		},
	}
}
79 changes: 78 additions & 1 deletion internal/injector/builtin/generated.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions internal/injector/builtin/generated_deps.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit bfd6900

Please sign in to comment.