Add instrumentation for Kafka (#134)
XSAM authored Jul 23, 2020
1 parent bb438f8 commit 70957fc
Showing 16 changed files with 1,629 additions and 24 deletions.
4 changes: 4 additions & 0 deletions .github/dependabot.yml
@@ -49,3 +49,7 @@ updates:
directory: "/instrumentation/runtime" # Location of package manifests
schedule:
interval: "daily"
- package-ecosystem: "gomod" # See documentation for possible values
directory: "/instrumentation/github.com/Shopify/sarama" # Location of package manifests
schedule:
interval: "daily"
5 changes: 5 additions & 0 deletions CHANGELOG.md
@@ -8,6 +8,11 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm

## [Unreleased]

### Added

- Add instrumentation for Kafka (github.com/Shopify/sarama). (#134)
- Add links and status message for mock span. (#134)

## [0.9.0] - 2020-07-20

This release upgrades its [go.opentelemetry.io/otel](https://github.com/open-telemetry/opentelemetry-go/releases/tag/v0.9.0) dependency to v0.9.0.
111 changes: 111 additions & 0 deletions instrumentation/github.com/Shopify/sarama/consumer.go
@@ -0,0 +1,111 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package sarama

import (
	"context"
	"strconv"

	"github.com/Shopify/sarama"

	"go.opentelemetry.io/otel/api/kv"
	"go.opentelemetry.io/otel/api/propagation"
	"go.opentelemetry.io/otel/api/standard"
	"go.opentelemetry.io/otel/api/trace"
)

type partitionConsumer struct {
	sarama.PartitionConsumer
	messages chan *sarama.ConsumerMessage
}

// Messages returns the read channel for the messages that are returned by
// the broker.
func (pc *partitionConsumer) Messages() <-chan *sarama.ConsumerMessage {
	return pc.messages
}

// WrapPartitionConsumer wraps a sarama.PartitionConsumer causing each received
// message to be traced.
func WrapPartitionConsumer(serviceName string, pc sarama.PartitionConsumer, opts ...Option) sarama.PartitionConsumer {
	cfg := newConfig(serviceName, opts...)

	wrapped := &partitionConsumer{
		PartitionConsumer: pc,
		messages:          make(chan *sarama.ConsumerMessage),
	}
	go func() {
		msgs := pc.Messages()

		for msg := range msgs {
			// Extract a span context from message to link.
			carrier := NewConsumerMessageCarrier(msg)
			parentSpanContext := propagation.ExtractHTTP(context.Background(), cfg.Propagators, carrier)

			// Create a span.
			attrs := []kv.KeyValue{
				standard.ServiceNameKey.String(cfg.ServiceName),
				standard.MessagingSystemKey.String("kafka"),
				standard.MessagingDestinationKindKeyTopic,
				standard.MessagingDestinationKey.String(msg.Topic),
				standard.MessagingOperationReceive,
				standard.MessagingMessageIDKey.String(strconv.FormatInt(msg.Offset, 10)),
				kafkaPartitionKey.Int32(msg.Partition),
			}
			opts := []trace.StartOption{
				trace.WithAttributes(attrs...),
				trace.WithSpanKind(trace.SpanKindConsumer),
			}
			newCtx, span := cfg.Tracer.Start(parentSpanContext, "kafka.consume", opts...)

			// Inject current span context, so consumers can use it to propagate span.
			propagation.InjectHTTP(newCtx, cfg.Propagators, carrier)

			// Send messages back to user.
			wrapped.messages <- msg

			span.End()
		}
		close(wrapped.messages)
	}()
	return wrapped
}

type consumer struct {
	sarama.Consumer

	serviceName string
	opts        []Option
}

// ConsumePartition invokes Consumer.ConsumePartition and wraps the resulting
// PartitionConsumer.
func (c *consumer) ConsumePartition(topic string, partition int32, offset int64) (sarama.PartitionConsumer, error) {
	pc, err := c.Consumer.ConsumePartition(topic, partition, offset)
	if err != nil {
		return nil, err
	}
	return WrapPartitionConsumer(c.serviceName, pc, c.opts...), nil
}

// WrapConsumer wraps a sarama.Consumer wrapping any PartitionConsumer created
// via Consumer.ConsumePartition.
func WrapConsumer(serviceName string, c sarama.Consumer, opts ...Option) sarama.Consumer {
	return &consumer{
		Consumer:    c,
		serviceName: serviceName,
		opts:        opts,
	}
}
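
The wrapper is used by decorating a regular sarama consumer. A minimal consumer-side sketch follows; it is not part of this commit, and the broker address, topic, service name, and saramatrace import alias are illustrative assumptions.

package main

import (
	"log"

	"github.com/Shopify/sarama"

	saramatrace "go.opentelemetry.io/contrib/instrumentation/github.com/Shopify/sarama"
)

func main() {
	config := sarama.NewConfig()
	// Record headers (and therefore context propagation) require Kafka >= 0.11.0.0.
	config.Version = sarama.V0_11_0_0

	c, err := sarama.NewConsumer([]string{"localhost:9092"}, config)
	if err != nil {
		log.Fatal(err)
	}

	// Wrap the consumer so every PartitionConsumer it creates is traced.
	c = saramatrace.WrapConsumer("example-service", c)
	defer c.Close()

	pc, err := c.ConsumePartition("example-topic", 0, sarama.OffsetNewest)
	if err != nil {
		log.Fatal(err)
	}
	defer pc.Close()

	for msg := range pc.Messages() {
		// Each message received here has already been recorded in a
		// "kafka.consume" span, and that span's context has been injected
		// back into the message headers.
		log.Printf("consumed message at offset %d", msg.Offset)
	}
}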
211 changes: 211 additions & 0 deletions instrumentation/github.com/Shopify/sarama/consumer_test.go
@@ -0,0 +1,211 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package sarama

import (
	"context"
	"fmt"
	"testing"

	"github.com/Shopify/sarama"
	"github.com/Shopify/sarama/mocks"
	"github.com/stretchr/testify/assert"
	"github.com/stretchr/testify/require"

	"go.opentelemetry.io/otel/api/global"
	"go.opentelemetry.io/otel/api/kv"
	"go.opentelemetry.io/otel/api/propagation"
	"go.opentelemetry.io/otel/api/standard"
	"go.opentelemetry.io/otel/api/trace"

	mocktracer "go.opentelemetry.io/contrib/internal/trace"
)

const (
	serviceName = "test-service-name"
	topic       = "test-topic"
)

var (
	propagators = global.Propagators()
)

func TestWrapPartitionConsumer(t *testing.T) {
	// Mock tracer
	mt := mocktracer.NewTracer("kafka")

	// Mock partition consumer controller
	consumer := mocks.NewConsumer(t, sarama.NewConfig())
	mockPartitionConsumer := consumer.ExpectConsumePartition(topic, 0, 0)

	// Create partition consumer
	partitionConsumer, err := consumer.ConsumePartition(topic, 0, 0)
	require.NoError(t, err)

	partitionConsumer = WrapPartitionConsumer(serviceName, partitionConsumer, WithTracer(mt))

	consumeAndCheck(t, mt, mockPartitionConsumer, partitionConsumer)
}

func TestWrapConsumer(t *testing.T) {
	// Mock tracer
	mt := mocktracer.NewTracer("kafka")

	// Mock partition consumer controller
	mockConsumer := mocks.NewConsumer(t, sarama.NewConfig())
	mockPartitionConsumer := mockConsumer.ExpectConsumePartition(topic, 0, 0)

	// Wrap consumer
	consumer := WrapConsumer(serviceName, mockConsumer, WithTracer(mt))

	// Create partition consumer
	partitionConsumer, err := consumer.ConsumePartition(topic, 0, 0)
	require.NoError(t, err)

	consumeAndCheck(t, mt, mockPartitionConsumer, partitionConsumer)
}

func consumeAndCheck(t *testing.T, mt *mocktracer.Tracer, mockPartitionConsumer *mocks.PartitionConsumer, partitionConsumer sarama.PartitionConsumer) {
	// Create message with span context
	ctx, _ := mt.Start(context.Background(), "")
	message := sarama.ConsumerMessage{Key: []byte("foo")}
	propagation.InjectHTTP(ctx, propagators, NewConsumerMessageCarrier(&message))

	// Produce message
	mockPartitionConsumer.YieldMessage(&message)
	mockPartitionConsumer.YieldMessage(&sarama.ConsumerMessage{Key: []byte("foo2")})

	// Consume messages
	msgList := make([]*sarama.ConsumerMessage, 2)
	msgList[0] = <-partitionConsumer.Messages()
	msgList[1] = <-partitionConsumer.Messages()
	require.NoError(t, partitionConsumer.Close())
	// Wait for the channel to be closed
	<-partitionConsumer.Messages()

	// Check spans length
	spans := mt.EndedSpans()
	assert.Len(t, spans, 2)

	expectedList := []struct {
		kvList       []kv.KeyValue
		parentSpanID trace.SpanID
		kind         trace.SpanKind
		msgKey       []byte
	}{
		{
			kvList: []kv.KeyValue{
				standard.ServiceNameKey.String(serviceName),
				standard.MessagingSystemKey.String("kafka"),
				standard.MessagingDestinationKindKeyTopic,
				standard.MessagingDestinationKey.String("test-topic"),
				standard.MessagingOperationReceive,
				standard.MessagingMessageIDKey.String("1"),
				kafkaPartitionKey.Int32(0),
			},
			parentSpanID: trace.SpanFromContext(ctx).SpanContext().SpanID,
			kind:         trace.SpanKindConsumer,
			msgKey:       []byte("foo"),
		},
		{
			kvList: []kv.KeyValue{
				standard.ServiceNameKey.String(serviceName),
				standard.MessagingSystemKey.String("kafka"),
				standard.MessagingDestinationKindKeyTopic,
				standard.MessagingDestinationKey.String("test-topic"),
				standard.MessagingOperationReceive,
				standard.MessagingMessageIDKey.String("2"),
				kafkaPartitionKey.Int32(0),
			},
			kind:   trace.SpanKindConsumer,
			msgKey: []byte("foo2"),
		},
	}

	for i, expected := range expectedList {
		t.Run(fmt.Sprint("index", i), func(t *testing.T) {
			span := spans[i]

			assert.Equal(t, expected.parentSpanID, span.ParentSpanID)

			remoteSpanFromMessage := trace.RemoteSpanContextFromContext(propagation.ExtractHTTP(context.Background(), propagators, NewConsumerMessageCarrier(msgList[i])))
			assert.Equal(t, span.SpanContext(), remoteSpanFromMessage,
				"span context should be injected into the consumer message headers")

			assert.Equal(t, "kafka.consume", span.Name)
			assert.Equal(t, expected.kind, span.Kind)
			assert.Equal(t, expected.msgKey, msgList[i].Key)
			for _, k := range expected.kvList {
				assert.Equal(t, k.Value, span.Attributes[k.Key], k.Key)
			}
		})
	}
}

func TestConsumerConsumePartitionWithError(t *testing.T) {
	// Mock partition consumer controller
	mockConsumer := mocks.NewConsumer(t, sarama.NewConfig())
	mockConsumer.ExpectConsumePartition(topic, 0, 0)

	consumer := WrapConsumer(serviceName, mockConsumer)
	_, err := consumer.ConsumePartition(topic, 0, 0)
	assert.NoError(t, err)
	// Consume twice
	_, err = consumer.ConsumePartition(topic, 0, 0)
	assert.Error(t, err)
}

func BenchmarkWrapPartitionConsumer(b *testing.B) {
	// Mock tracer
	mt := mocktracer.NewTracer("kafka")

	mockPartitionConsumer, partitionConsumer := createMockPartitionConsumer(b)

	partitionConsumer = WrapPartitionConsumer(serviceName, partitionConsumer, WithTracer(mt))
	message := sarama.ConsumerMessage{Key: []byte("foo")}

	b.ReportAllocs()
	b.ResetTimer()

	for i := 0; i < b.N; i++ {
		mockPartitionConsumer.YieldMessage(&message)
		<-partitionConsumer.Messages()
	}
}

func BenchmarkMockPartitionConsumer(b *testing.B) {
	mockPartitionConsumer, partitionConsumer := createMockPartitionConsumer(b)

	message := sarama.ConsumerMessage{Key: []byte("foo")}

	b.ReportAllocs()
	b.ResetTimer()

	for i := 0; i < b.N; i++ {
		mockPartitionConsumer.YieldMessage(&message)
		<-partitionConsumer.Messages()
	}
}

func createMockPartitionConsumer(b *testing.B) (*mocks.PartitionConsumer, sarama.PartitionConsumer) {
	// Mock partition consumer controller
	consumer := mocks.NewConsumer(b, sarama.NewConfig())
	mockPartitionConsumer := consumer.ExpectConsumePartition(topic, 0, 0)

	// Create partition consumer
	partitionConsumer, err := consumer.ConsumePartition(topic, 0, 0)
	require.NoError(b, err)
	return mockPartitionConsumer, partitionConsumer
}
23 changes: 23 additions & 0 deletions instrumentation/github.com/Shopify/sarama/doc.go
@@ -0,0 +1,23 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Package sarama provides functions to trace the Shopify/sarama package (https://github.com/Shopify/sarama).
//
// The consumer's span will be created as a child of the producer's span.
//
// Context propagation only works on Kafka versions 0.11.0.0 and higher, which support record headers
// (https://archive.apache.org/dist/kafka/0.11.0.0/RELEASE_NOTES.html).
//
// Based on: https://github.com/DataDog/dd-trace-go/tree/v1/contrib/Shopify/sarama
package sarama // import "go.opentelemetry.io/contrib/instrumentation/github.com/Shopify/sarama"
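
To illustrate the context propagation described above, here is a sketch of a downstream handler that continues the trace carried in a consumed message's headers. It is not part of this commit; the package name, import alias, tracer name, and handler function are assumptions.

package example

import (
	"context"

	"github.com/Shopify/sarama"

	saramatrace "go.opentelemetry.io/contrib/instrumentation/github.com/Shopify/sarama"
	"go.opentelemetry.io/otel/api/global"
	"go.opentelemetry.io/otel/api/propagation"
)

// processMessage continues the trace injected by the wrapped partition
// consumer, so the work done here shows up as a child of the "kafka.consume" span.
func processMessage(msg *sarama.ConsumerMessage) {
	// Extract the remote span context from the message headers.
	ctx := propagation.ExtractHTTP(context.Background(), global.Propagators(),
		saramatrace.NewConsumerMessageCarrier(msg))

	// Start a child span for the application-side processing.
	_, span := global.Tracer("example/message-handler").Start(ctx, "process-kafka-message")
	defer span.End()

	// ... application logic ...
}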
13 changes: 13 additions & 0 deletions instrumentation/github.com/Shopify/sarama/go.mod
@@ -0,0 +1,13 @@
module go.opentelemetry.io/contrib/instrumentation/github.com/Shopify/sarama

go 1.14

replace go.opentelemetry.io/contrib => ../../../..

require (
	github.com/Shopify/sarama v1.26.4
	github.com/stretchr/testify v1.6.1
	go.opentelemetry.io/contrib v0.9.0
	go.opentelemetry.io/otel v0.9.0
	google.golang.org/grpc v1.30.0
)