Skip to content

Commit

Permalink
feat: support confluent-kafka-go v1 and v2 (#320)
Browse files Browse the repository at this point in the history
Requires: DataDog/dd-trace-go#2907

---------

Co-authored-by: Dario Castañé <[email protected]>
Co-authored-by: Romain Marcadier <[email protected]>
Co-authored-by: Romain Marcadier <[email protected]>
  • Loading branch information
4 people authored Nov 5, 2024
1 parent bfd6900 commit 6652139
Show file tree
Hide file tree
Showing 14 changed files with 1,441 additions and 12 deletions.
5 changes: 3 additions & 2 deletions _integration-tests/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ require (
github.com/aws/aws-sdk-go-v2/config v1.28.1
github.com/aws/aws-sdk-go-v2/credentials v1.17.42
github.com/aws/aws-sdk-go-v2/service/dynamodb v1.36.3
github.com/confluentinc/confluent-kafka-go v1.9.2
github.com/confluentinc/confluent-kafka-go/v2 v2.5.4
github.com/dave/jennifer v1.7.1
github.com/docker/go-connections v0.5.0
github.com/elastic/go-elasticsearch/v6 v6.8.5
Expand Down Expand Up @@ -76,7 +78,6 @@ require (
cloud.google.com/go/monitoring v1.21.2 // indirect
cloud.google.com/go/storage v1.46.0 // indirect
dario.cat/mergo v1.0.1 // indirect
github.com/AdaLogics/go-fuzz-headers v0.0.0-20230811130428-ced1acdcaa24 // indirect
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.16.0 // indirect
github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.8.0 // indirect
github.com/Azure/azure-sdk-for-go/sdk/internal v1.10.0 // indirect
Expand Down Expand Up @@ -158,6 +159,7 @@ require (
github.com/cloudwego/base64x v0.1.4 // indirect
github.com/cloudwego/iasm v0.2.0 // indirect
github.com/cncf/xds/go v0.0.0-20240905190251-b4127c9b8d78 // indirect
github.com/containerd/errdefs v0.3.0 // indirect
github.com/containerd/log v0.1.0 // indirect
github.com/containerd/platforms v0.2.1 // indirect
github.com/cpuguy83/dockercfg v0.3.2 // indirect
Expand Down Expand Up @@ -359,7 +361,6 @@ require (
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.56.0 // indirect
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.56.0 // indirect
go.opentelemetry.io/otel v1.31.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.19.0 // indirect
go.opentelemetry.io/otel/metric v1.31.0 // indirect
go.opentelemetry.io/otel/sdk v1.31.0 // indirect
go.opentelemetry.io/otel/sdk/metric v1.31.0 // indirect
Expand Down
197 changes: 193 additions & 4 deletions _integration-tests/go.sum

Large diffs are not rendered by default.

19 changes: 19 additions & 0 deletions _integration-tests/tests/confluent-kafka-go.v1/gen_test.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

146 changes: 146 additions & 0 deletions _integration-tests/tests/confluent-kafka-go.v1/kafka.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2023-present Datadog, Inc.

//go:build integration && !windows

package kafka

import (
"context"
"strings"
"testing"
"time"

"github.com/confluentinc/confluent-kafka-go/kafka"
"github.com/stretchr/testify/require"
kafkatest "github.com/testcontainers/testcontainers-go/modules/kafka"

"datadoghq.dev/orchestrion/_integration-tests/utils"
"datadoghq.dev/orchestrion/_integration-tests/validator/trace"
)

// topic is the Kafka topic the test produces to and consumes from. It is a
// var (not a const) because the client takes its address via &topic.
var topic = "gotest"

// consumerGroup is the consumer group id the test consumer joins.
var consumerGroup = "gotest"

// partition is the single partition used for both produce and consume.
var partition = int32(0)

// TestCase drives a produce/consume round-trip against a Kafka test
// container to validate the confluent-kafka-go instrumentation.
type TestCase struct {
	// container is the Kafka test container started in Setup and
	// terminated in Teardown.
	container *kafkatest.KafkaContainer
	// addr holds the broker bootstrap address(es) reported by the
	// container; joined into "bootstrap.servers" by kafkaBootstrapServers.
	addr []string
}

// Setup starts a Kafka test container and records its bootstrap address so
// the produce/consume steps can reach the broker.
func (tc *TestCase) Setup(t *testing.T) {
	// Skip early when the container provider is not usable in this
	// environment, rather than failing mid-run.
	utils.SkipIfProviderIsNotHealthy(t)

	ktc, bootstrapAddr := utils.StartKafkaTestContainer(t)
	tc.container = ktc
	tc.addr = []string{bootstrapAddr}
}

// Run exercises the instrumented client end-to-end: first publish one
// message, then consume it back. Order matters — the consume step reads the
// message the produce step just wrote.
func (tc *TestCase) Run(t *testing.T) {
	tc.produceMessage(t)
	tc.consumeMessage(t)
}

// kafkaBootstrapServers renders the broker addresses as the comma-separated
// list expected by the "bootstrap.servers" configuration key.
func (tc *TestCase) kafkaBootstrapServers() string {
	return strings.Join(tc.addr, ",")
}

// produceMessage publishes a single message to the test topic and waits for
// its delivery report, failing the test on any produce or delivery error.
func (tc *TestCase) produceMessage(t *testing.T) {
	t.Helper()

	cfg := &kafka.ConfigMap{
		"bootstrap.servers":   tc.kafkaBootstrapServers(),
		"go.delivery.reports": true,
	}

	producer, err := kafka.NewProducer(cfg)
	require.NoError(t, err, "failed to create producer")
	defer producer.Close()

	delivery := make(chan kafka.Event, 1)
	err = producer.Produce(&kafka.Message{
		TopicPartition: kafka.TopicPartition{
			Topic:     &topic,
			Partition: partition,
		},
		Key:   []byte("key2"),
		Value: []byte("value2"),
	}, delivery)
	require.NoError(t, err, "failed to send message")

	// Wait for the delivery report only after Produce succeeded. (The
	// original deferred this receive unconditionally, which deadlocks when
	// Produce fails: require.NoError runs the defer via Goexit and no event
	// ever arrives on the channel.)
	evt := <-delivery
	msg, ok := evt.(*kafka.Message)
	require.True(t, ok, "unexpected delivery event type")
	// Surface per-message delivery failures, which Produce itself does not
	// report.
	require.NoError(t, msg.TopicPartition.Error, "message delivery failed")
}

// consumeMessage reads back the message published by produceMessage, commits
// its offset, and verifies the payload key.
func (tc *TestCase) consumeMessage(t *testing.T) {
	t.Helper()

	cfg := &kafka.ConfigMap{
		"group.id":                 consumerGroup,
		"bootstrap.servers":        tc.kafkaBootstrapServers(),
		"fetch.wait.max.ms":        500,
		"socket.timeout.ms":        1500,
		"session.timeout.ms":       1500,
		"enable.auto.offset.store": false,
	}
	c, err := kafka.NewConsumer(cfg)
	require.NoError(t, err, "failed to create consumer")
	defer c.Close()

	// Assign the same partition the producer wrote to (shared package var),
	// rather than a hard-coded literal, so the two stay in sync.
	err = c.Assign([]kafka.TopicPartition{
		{Topic: &topic, Partition: partition},
	})
	require.NoError(t, err)

	m, err := c.ReadMessage(3000 * time.Millisecond)
	require.NoError(t, err)

	_, err = c.CommitMessage(m)
	require.NoError(t, err)

	require.Equal(t, "key2", string(m.Key))
}

// Teardown terminates the Kafka test container, allowing up to 15 seconds
// for a clean shutdown.
func (tc *TestCase) Teardown(t *testing.T) {
	const shutdownGrace = 15 * time.Second

	ctx, cancel := context.WithTimeout(context.Background(), shutdownGrace)
	defer cancel()

	err := tc.container.Terminate(ctx)
	require.NoError(t, err)
}

// ExpectedTraces describes the trace shape the instrumentation must emit for
// the produce/consume round-trip: a producer span with the consumer span as
// its child (linked through context propagation in the message headers).
func (*TestCase) ExpectedTraces() trace.Traces {
	return trace.Traces{
		{
			// Root span emitted by the instrumented producer.
			Tags: map[string]any{
				"name":     "kafka.produce",
				"type":     "queue",
				"service":  "kafka",
				"resource": "Produce Topic gotest",
			},
			Meta: map[string]string{
				"span.kind":        "producer",
				"component":        "confluentinc/confluent-kafka-go/kafka",
				"messaging.system": "kafka",
			},
			Children: trace.Traces{
				{
					// Child span emitted by the instrumented consumer.
					Tags: map[string]any{
						"name":     "kafka.consume",
						"type":     "queue",
						"service":  "kafka",
						"resource": "Consume Topic gotest",
					},
					Meta: map[string]string{
						"span.kind":                         "consumer",
						"component":                         "confluentinc/confluent-kafka-go/kafka",
						"messaging.system":                  "kafka",
						"messaging.kafka.bootstrap.servers": "localhost",
					},
				},
			},
		},
	}
}
26 changes: 26 additions & 0 deletions _integration-tests/tests/confluent-kafka-go.v1/skip_windows.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2023-present Datadog, Inc.

//go:build integration && windows

package kafka

import (
"testing"

"datadoghq.dev/orchestrion/_integration-tests/validator/trace"
)

// skip is a stand-in TestCase implementation for Windows, where
// confluent-kafka-go needs extra toolchain setup to build.
type skip struct{}

// Setup immediately skips the test; the remaining lifecycle methods are
// no-ops so the harness can still drive the full TestCase interface.
func (skip) Setup(t *testing.T) {
	// NOTE: the issue link previously pointed at a mangled mirror domain
	// (github.com); fixed to the canonical GitHub URL.
	t.Skip("skipping test since confluent-kafka-go requires extra setup to build on Windows: https://github.com/confluentinc/confluent-kafka-go/issues/889")
}

func (skip) Run(t *testing.T)             {}
func (skip) Teardown(t *testing.T)        {}
func (skip) ExpectedTraces() trace.Traces { return nil }

// TestCase aliases skip so this package exposes the same entry point as the
// non-Windows build of this test.
type TestCase = skip
19 changes: 19 additions & 0 deletions _integration-tests/tests/confluent-kafka-go.v2/gen_test.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

146 changes: 146 additions & 0 deletions _integration-tests/tests/confluent-kafka-go.v2/kafka.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2023-present Datadog, Inc.

//go:build integration && !windows

package kafka

import (
"context"
"strings"
"testing"
"time"

"github.com/confluentinc/confluent-kafka-go/v2/kafka"
"github.com/stretchr/testify/require"
kafkatest "github.com/testcontainers/testcontainers-go/modules/kafka"

"datadoghq.dev/orchestrion/_integration-tests/utils"
"datadoghq.dev/orchestrion/_integration-tests/validator/trace"
)

// topic is the Kafka topic the test produces to and consumes from. It is a
// var (not a const) because the client takes its address via &topic.
var topic = "gotest"

// consumerGroup is the consumer group id the test consumer joins.
var consumerGroup = "gotest"

// partition is the single partition used for both produce and consume.
var partition = int32(0)

// TestCase drives a produce/consume round-trip against a Kafka test
// container to validate the confluent-kafka-go v2 instrumentation.
type TestCase struct {
	// container is the Kafka test container started in Setup and
	// terminated in Teardown.
	container *kafkatest.KafkaContainer
	// addr holds the broker bootstrap address(es) reported by the
	// container; joined into "bootstrap.servers" by kafkaBootstrapServers.
	addr []string
}

// Setup starts a Kafka test container and records its bootstrap address so
// the produce/consume steps can reach the broker.
func (tc *TestCase) Setup(t *testing.T) {
	// Skip early when the container provider is not usable in this
	// environment, rather than failing mid-run.
	utils.SkipIfProviderIsNotHealthy(t)

	ktc, bootstrapAddr := utils.StartKafkaTestContainer(t)
	tc.container = ktc
	tc.addr = []string{bootstrapAddr}
}

// Run exercises the instrumented client end-to-end: first publish one
// message, then consume it back. Order matters — the consume step reads the
// message the produce step just wrote.
func (tc *TestCase) Run(t *testing.T) {
	tc.produceMessage(t)
	tc.consumeMessage(t)
}

// kafkaBootstrapServers renders the broker addresses as the comma-separated
// list expected by the "bootstrap.servers" configuration key.
func (tc *TestCase) kafkaBootstrapServers() string {
	return strings.Join(tc.addr, ",")
}

// produceMessage publishes a single message to the test topic and waits for
// its delivery report, failing the test on any produce or delivery error.
func (tc *TestCase) produceMessage(t *testing.T) {
	t.Helper()

	cfg := &kafka.ConfigMap{
		"bootstrap.servers":   tc.kafkaBootstrapServers(),
		"go.delivery.reports": true,
	}

	producer, err := kafka.NewProducer(cfg)
	require.NoError(t, err, "failed to create producer")
	defer producer.Close()

	delivery := make(chan kafka.Event, 1)
	err = producer.Produce(&kafka.Message{
		TopicPartition: kafka.TopicPartition{
			Topic:     &topic,
			Partition: partition,
		},
		Key:   []byte("key2"),
		Value: []byte("value2"),
	}, delivery)
	require.NoError(t, err, "failed to send message")

	// Wait for the delivery report only after Produce succeeded. (The
	// original deferred this receive unconditionally, which deadlocks when
	// Produce fails: require.NoError runs the defer via Goexit and no event
	// ever arrives on the channel.)
	evt := <-delivery
	msg, ok := evt.(*kafka.Message)
	require.True(t, ok, "unexpected delivery event type")
	// Surface per-message delivery failures, which Produce itself does not
	// report.
	require.NoError(t, msg.TopicPartition.Error, "message delivery failed")
}

// consumeMessage reads back the message published by produceMessage, commits
// its offset, and verifies the payload key.
func (tc *TestCase) consumeMessage(t *testing.T) {
	t.Helper()

	cfg := &kafka.ConfigMap{
		"group.id":                 consumerGroup,
		"bootstrap.servers":        tc.kafkaBootstrapServers(),
		"fetch.wait.max.ms":        500,
		"socket.timeout.ms":        1500,
		"session.timeout.ms":       1500,
		"enable.auto.offset.store": false,
	}
	c, err := kafka.NewConsumer(cfg)
	require.NoError(t, err, "failed to create consumer")
	defer c.Close()

	// Assign the same partition the producer wrote to (shared package var),
	// rather than a hard-coded literal, so the two stay in sync.
	err = c.Assign([]kafka.TopicPartition{
		{Topic: &topic, Partition: partition},
	})
	require.NoError(t, err)

	m, err := c.ReadMessage(3000 * time.Millisecond)
	require.NoError(t, err)

	_, err = c.CommitMessage(m)
	require.NoError(t, err)

	require.Equal(t, "key2", string(m.Key))
}

// Teardown terminates the Kafka test container, allowing up to 15 seconds
// for a clean shutdown.
func (tc *TestCase) Teardown(t *testing.T) {
	const shutdownGrace = 15 * time.Second

	ctx, cancel := context.WithTimeout(context.Background(), shutdownGrace)
	defer cancel()

	err := tc.container.Terminate(ctx)
	require.NoError(t, err)
}

// ExpectedTraces describes the trace shape the instrumentation must emit for
// the produce/consume round-trip: a producer span with the consumer span as
// its child (linked through context propagation in the message headers).
func (*TestCase) ExpectedTraces() trace.Traces {
	return trace.Traces{
		{
			// Root span emitted by the instrumented producer.
			Tags: map[string]any{
				"name":     "kafka.produce",
				"type":     "queue",
				"service":  "kafka",
				"resource": "Produce Topic gotest",
			},
			Meta: map[string]string{
				"span.kind":        "producer",
				"component":        "confluentinc/confluent-kafka-go/kafka.v2",
				"messaging.system": "kafka",
			},
			Children: trace.Traces{
				{
					// Child span emitted by the instrumented consumer.
					Tags: map[string]any{
						"name":     "kafka.consume",
						"type":     "queue",
						"service":  "kafka",
						"resource": "Consume Topic gotest",
					},
					Meta: map[string]string{
						"span.kind":                         "consumer",
						"component":                         "confluentinc/confluent-kafka-go/kafka.v2",
						"messaging.system":                  "kafka",
						"messaging.kafka.bootstrap.servers": "localhost",
					},
				},
			},
		},
	}
}
Loading

0 comments on commit 6652139

Please sign in to comment.