Skip to content

Commit

Permalink
contrib/Shopify/sarama: update tests and add deprecation comment
Browse files Browse the repository at this point in the history
  • Loading branch information
rarguelloF committed Jul 17, 2024
1 parent 5b7af84 commit 4951be3
Show file tree
Hide file tree
Showing 2 changed files with 69 additions and 45 deletions.
3 changes: 3 additions & 0 deletions contrib/Shopify/sarama/sarama.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@
// Copyright 2016 Datadog, Inc.

// Package sarama provides functions to trace the Shopify/sarama package (https://github.com/Shopify/sarama).
//
// Deprecated: github.com/Shopify/sarama is no longer maintained. Please migrate to github.com/IBM/sarama
// and use the corresponding instrumentation.
package sarama // import "gopkg.in/DataDog/dd-trace-go.v1/contrib/Shopify/sarama"

import (
Expand Down
111 changes: 66 additions & 45 deletions contrib/Shopify/sarama/sarama_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,9 +70,7 @@ func genTestSpans(t *testing.T, serviceOverride string) []mocktracer.Span {
require.NoError(t, err)

pc, err := c.ConsumePartition("test-topic", 0, 0)
if err != nil {
t.Fatal(err)
}
require.NoError(t, err)
_ = <-pc.Messages()
err = pc.Close()
require.NoError(t, err)
Expand Down Expand Up @@ -104,32 +102,28 @@ func TestConsumer(t *testing.T) {
})
cfg := sarama.NewConfig()
cfg.Version = sarama.MinVersion

client, err := sarama.NewClient([]string{broker.Addr()}, cfg)
if err != nil {
t.Fatal(err)
}
require.NoError(t, err)
defer client.Close()

consumer, err := sarama.NewConsumerFromClient(client)
if err != nil {
t.Fatal(err)
}
require.NoError(t, err)
defer consumer.Close()

consumer = WrapConsumer(consumer, WithDataStreams())

partitionConsumer, err := consumer.ConsumePartition("test-topic", 0, 0)
if err != nil {
t.Fatal(err)
}
require.NoError(t, err)
msg1 := <-partitionConsumer.Messages()
msg2 := <-partitionConsumer.Messages()
partitionConsumer.Close()
err = partitionConsumer.Close()
require.NoError(t, err)
// wait for the channel to be closed
<-partitionConsumer.Messages()

spans := mt.FinishedSpans()
assert.Len(t, spans, 2)
require.Len(t, spans, 2)
{
s := spans[0]
spanctx, err := tracer.Extract(NewConsumerMessageCarrier(msg1))
Expand All @@ -146,8 +140,9 @@ func TestConsumer(t *testing.T) {
assert.Equal(t, "Shopify/sarama", s.Tag(ext.Component))
assert.Equal(t, ext.SpanKindConsumer, s.Tag(ext.SpanKind))
assert.Equal(t, "kafka", s.Tag(ext.MessagingSystem))

p, ok := datastreams.PathwayFromContext(datastreams.ExtractFromBase64Carrier(context.Background(), NewConsumerMessageCarrier(msg1)))
assert.True(t, ok)
require.True(t, ok, "pathway not found in context")
expectedCtx, _ := tracer.SetDataStreamsCheckpoint(context.Background(), "direction:in", "topic:test-topic", "type:kafka")
expected, _ := datastreams.PathwayFromContext(expectedCtx)
assert.NotEqual(t, expected.GetHash(), 0)
Expand All @@ -169,8 +164,9 @@ func TestConsumer(t *testing.T) {
assert.Equal(t, "Shopify/sarama", s.Tag(ext.Component))
assert.Equal(t, ext.SpanKindConsumer, s.Tag(ext.SpanKind))
assert.Equal(t, "kafka", s.Tag(ext.MessagingSystem))
p, ok := datastreams.PathwayFromContext(datastreams.ExtractFromBase64Carrier(context.Background(), NewConsumerMessageCarrier(msg1)))
assert.True(t, ok)

p, ok := datastreams.PathwayFromContext(datastreams.ExtractFromBase64Carrier(context.Background(), NewConsumerMessageCarrier(msg2)))
require.True(t, ok, "pathway not found in context")
expectedCtx, _ := tracer.SetDataStreamsCheckpoint(context.Background(), "direction:in", "topic:test-topic", "type:kafka")
expected, _ := datastreams.PathwayFromContext(expectedCtx)
assert.NotEqual(t, expected.GetHash(), 0)
Expand Down Expand Up @@ -204,17 +200,16 @@ func TestSyncProducer(t *testing.T) {
cfg.Producer.Return.Successes = true

producer, err := sarama.NewSyncProducer([]string{seedBroker.Addr()}, cfg)
if err != nil {
t.Fatal(err)
}
require.NoError(t, err)
producer = WrapSyncProducer(cfg, producer, WithDataStreams())

msg1 := &sarama.ProducerMessage{
Topic: "my_topic",
Value: sarama.StringEncoder("test 1"),
Metadata: "test",
}
producer.SendMessage(msg1)
_, _, err = producer.SendMessage(msg1)
require.NoError(t, err)

spans := mt.FinishedSpans()
assert.Len(t, spans, 1)
Expand All @@ -229,8 +224,9 @@ func TestSyncProducer(t *testing.T) {
assert.Equal(t, "Shopify/sarama", s.Tag(ext.Component))
assert.Equal(t, ext.SpanKindProducer, s.Tag(ext.SpanKind))
assert.Equal(t, "kafka", s.Tag(ext.MessagingSystem))

p, ok := datastreams.PathwayFromContext(datastreams.ExtractFromBase64Carrier(context.Background(), NewProducerMessageCarrier(msg1)))
assert.True(t, ok)
require.True(t, ok, "pathway not found in context")
expectedCtx, _ := tracer.SetDataStreamsCheckpoint(context.Background(), "direction:out", "topic:my_topic", "type:kafka")
expected, _ := datastreams.PathwayFromContext(expectedCtx)
assert.NotEqual(t, expected.GetHash(), 0)
Expand All @@ -248,24 +244,24 @@ func TestSyncProducerSendMessages(t *testing.T) {
defer leader.Close()

metadataResponse := new(sarama.MetadataResponse)
metadataResponse.Version = 1
metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, nil, sarama.ErrNoError)
seedBroker.Returns(metadataResponse)

prodSuccess := new(sarama.ProduceResponse)
prodSuccess.Version = 2
prodSuccess.AddTopicPartition("my_topic", 0, sarama.ErrNoError)
leader.Returns(prodSuccess)

cfg := sarama.NewConfig()
cfg.Version = sarama.MinVersion
cfg.Version = sarama.V0_11_0_0 // first version that supports headers
cfg.Producer.Return.Successes = true
cfg.Producer.Flush.Messages = 2

producer, err := sarama.NewSyncProducer([]string{seedBroker.Addr()}, cfg)
if err != nil {
t.Fatal(err)
}
producer = WrapSyncProducer(cfg, producer)
require.NoError(t, err)
producer = WrapSyncProducer(cfg, producer, WithDataStreams())

msg1 := &sarama.ProducerMessage{
Topic: "my_topic",
Expand All @@ -277,9 +273,11 @@ func TestSyncProducerSendMessages(t *testing.T) {
Value: sarama.StringEncoder("test 2"),
Metadata: "test",
}
producer.SendMessages([]*sarama.ProducerMessage{msg1, msg2})
err = producer.SendMessages([]*sarama.ProducerMessage{msg1, msg2})
require.NoError(t, err)

spans := mt.FinishedSpans()
assert.Len(t, spans, 2)
require.Len(t, spans, 2)
for _, s := range spans {
assert.Equal(t, "kafka", s.Tag(ext.ServiceName))
assert.Equal(t, "queue", s.Tag(ext.SpanType))
Expand All @@ -290,14 +288,23 @@ func TestSyncProducerSendMessages(t *testing.T) {
assert.Equal(t, ext.SpanKindProducer, s.Tag(ext.SpanKind))
assert.Equal(t, "kafka", s.Tag(ext.MessagingSystem))
}

for _, msg := range []*sarama.ProducerMessage{msg1, msg2} {
p, ok := datastreams.PathwayFromContext(datastreams.ExtractFromBase64Carrier(context.Background(), NewProducerMessageCarrier(msg)))
if !assert.True(t, ok, "pathway not found in context") {
continue
}
expectedCtx, _ := tracer.SetDataStreamsCheckpoint(context.Background(), "direction:out", "topic:my_topic", "type:kafka")
expected, _ := datastreams.PathwayFromContext(expectedCtx)
assert.NotEqual(t, expected.GetHash(), 0)
assert.Equal(t, expected.GetHash(), p.GetHash())
}
}

func TestAsyncProducer(t *testing.T) {
// the default for producers is a fire-and-forget model that doesn't return
// successes
t.Run("Without Successes", func(t *testing.T) {
t.Skip("Skipping test because sarama.MockBroker doesn't work with versions >= sarama.V0_11_0_0 " +
"https://github.com/Shopify/sarama/issues/1665")
mt := mocktracer.Start()
defer mt.Stop()

Expand All @@ -306,10 +313,8 @@ func TestAsyncProducer(t *testing.T) {
cfg := sarama.NewConfig()
cfg.Version = sarama.V0_11_0_0
producer, err := sarama.NewAsyncProducer([]string{broker.Addr()}, cfg)
if err != nil {
t.Fatal(err)
}
producer = WrapAsyncProducer(nil, producer)
require.NoError(t, err)
producer = WrapAsyncProducer(nil, producer, WithDataStreams())

msg1 := &sarama.ProducerMessage{
Topic: "my_topic",
Expand All @@ -320,24 +325,33 @@ func TestAsyncProducer(t *testing.T) {
waitForSpans(mt, 1)

spans := mt.FinishedSpans()
assert.Len(t, spans, 1)
require.Len(t, spans, 1)
{
s := spans[0]
assert.Equal(t, "kafka", s.Tag(ext.ServiceName))
assert.Equal(t, "queue", s.Tag(ext.SpanType))
assert.Equal(t, "Produce Topic my_topic", s.Tag(ext.ResourceName))
assert.Equal(t, "kafka.produce", s.OperationName())
assert.Equal(t, int32(0), s.Tag(ext.MessagingKafkaPartition))
assert.Equal(t, int64(0), s.Tag("offset"))

// these tags are set in the finishProducerSpan function, but in this case it's never used, and instead we
// automatically finish spans after being started because we don't have a way to know when they are finished.
assert.Nil(t, s.Tag(ext.MessagingKafkaPartition))
assert.Nil(t, s.Tag("offset"))

assert.Equal(t, "Shopify/sarama", s.Tag(ext.Component))
assert.Equal(t, ext.SpanKindProducer, s.Tag(ext.SpanKind))
assert.Equal(t, "kafka", s.Tag(ext.MessagingSystem))

p, ok := datastreams.PathwayFromContext(datastreams.ExtractFromBase64Carrier(context.Background(), NewProducerMessageCarrier(msg1)))
require.True(t, ok, "pathway not found in context")
expectedCtx, _ := tracer.SetDataStreamsCheckpoint(context.Background(), "direction:out", "topic:my_topic", "type:kafka")
expected, _ := datastreams.PathwayFromContext(expectedCtx)
assert.NotEqual(t, expected.GetHash(), 0)
assert.Equal(t, expected.GetHash(), p.GetHash())
}
})

t.Run("With Successes", func(t *testing.T) {
t.Skip("Skipping test because sarama.MockBroker doesn't work with versions >= sarama.V0_11_0_0 " +
"https://github.com/Shopify/sarama/issues/1665")
mt := mocktracer.Start()
defer mt.Stop()

Expand All @@ -348,10 +362,8 @@ func TestAsyncProducer(t *testing.T) {
cfg.Producer.Return.Successes = true

producer, err := sarama.NewAsyncProducer([]string{broker.Addr()}, cfg)
if err != nil {
t.Fatal(err)
}
producer = WrapAsyncProducer(cfg, producer)
require.NoError(t, err)
producer = WrapAsyncProducer(cfg, producer, WithDataStreams())

msg1 := &sarama.ProducerMessage{
Topic: "my_topic",
Expand All @@ -361,7 +373,7 @@ func TestAsyncProducer(t *testing.T) {
<-producer.Successes()

spans := mt.FinishedSpans()
assert.Len(t, spans, 1)
require.Len(t, spans, 1)
{
s := spans[0]
assert.Equal(t, "kafka", s.Tag(ext.ServiceName))
Expand All @@ -373,6 +385,13 @@ func TestAsyncProducer(t *testing.T) {
assert.Equal(t, "Shopify/sarama", s.Tag(ext.Component))
assert.Equal(t, ext.SpanKindProducer, s.Tag(ext.SpanKind))
assert.Equal(t, "kafka", s.Tag(ext.MessagingSystem))

p, ok := datastreams.PathwayFromContext(datastreams.ExtractFromBase64Carrier(context.Background(), NewProducerMessageCarrier(msg1)))
require.True(t, ok, "pathway not found in context")
expectedCtx, _ := tracer.SetDataStreamsCheckpoint(context.Background(), "direction:out", "topic:my_topic", "type:kafka")
expected, _ := datastreams.PathwayFromContext(expectedCtx)
assert.NotEqual(t, expected.GetHash(), 0)
assert.Equal(t, expected.GetHash(), p.GetHash())
}
})
}
Expand All @@ -385,11 +404,13 @@ func newMockBroker(t *testing.T) *sarama.MockBroker {
broker := sarama.NewMockBroker(t, 1)

metadataResponse := new(sarama.MetadataResponse)
metadataResponse.Version = 1
metadataResponse.AddBroker(broker.Addr(), broker.BrokerID())
metadataResponse.AddTopicPartition("my_topic", 0, broker.BrokerID(), nil, nil, nil, sarama.ErrNoError)
broker.Returns(metadataResponse)

prodSuccess := new(sarama.ProduceResponse)
prodSuccess.Version = 2
prodSuccess.AddTopicPartition("my_topic", 0, sarama.ErrNoError)
for i := 0; i < 10; i++ {
broker.Returns(prodSuccess)
Expand Down

0 comments on commit 4951be3

Please sign in to comment.