Skip to content

Commit

Permalink
Add example for Sarama consumer and producer
Browse files Browse the repository at this point in the history
  • Loading branch information
XSAM committed Jul 23, 2020
1 parent d18d1ca commit 39e3052
Show file tree
Hide file tree
Showing 7 changed files with 413 additions and 0 deletions.
31 changes: 31 additions & 0 deletions instrumentation/github.com/Shopify/sarama/example/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
# Kafka Sarama instrumentation example

A Kafka producer and consumer using Sarama with instrumentation.

These instructions expect you have
[docker-compose](https://docs.docker.com/compose/) installed.

Bring up the `Kafka` and `ZooKeeper` services to run the
example:

```sh
docker-compose up -d zoo kafka
```

Then up the `kafka-producer` service to produce a message into Kafka:

```sh
docker-compose up kafka-producer
```

At last, up the `kafka-consumer` service to consume messages from Kafka:

```sh
docker-compose up kafka-consumer
```

Shut down the services when you are finished with the example:

```sh
docker-compose down
```
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
# Copyright The OpenTelemetry Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
FROM golang:alpine AS base
COPY . /src/
WORKDIR /src/instrumentation/github.com/Shopify/sarama

FROM base AS kafka-consumer
RUN go install ./example/consumer/consumer.go
CMD ["/go/bin/consumer"]

119 changes: 119 additions & 0 deletions instrumentation/github.com/Shopify/sarama/example/consumer/consumer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package main

import (
"context"
"flag"
"log"
"os"
"strings"
"time"

"github.com/Shopify/sarama"
"go.opentelemetry.io/otel/api/global"
"go.opentelemetry.io/otel/api/propagation"
"go.opentelemetry.io/otel/api/standard"
"go.opentelemetry.io/otel/api/trace"

saramatrace "go.opentelemetry.io/contrib/instrumentation/github.com/Shopify/sarama"
"go.opentelemetry.io/contrib/instrumentation/github.com/Shopify/sarama/example"
)

var (
brokers = flag.String("brokers", os.Getenv("KAFKA_PEERS"), "The Kafka brokers to connect to, as a comma separated list")
)

func main() {
example.InitTracer()
flag.Parse()

if *brokers == "" {
flag.PrintDefaults()
os.Exit(1)
}

brokerList := strings.Split(*brokers, ",")
log.Printf("Kafka brokers: %s", strings.Join(brokerList, ", "))

startConsumerGroup(brokerList)

select {}
}

func startConsumerGroup(brokerList []string) {
consumerGroupHandler := Consumer{}
// Wrap instrumentation
handler := saramatrace.WrapConsumerGroupHandler("example-consumer", &consumerGroupHandler)

config := sarama.NewConfig()
config.Version = sarama.V2_5_0_0
config.Consumer.Offsets.Initial = sarama.OffsetOldest

// Create consumer group
consumerGroup, err := sarama.NewConsumerGroup(brokerList, "example", config)
if err != nil {
log.Fatalln("Failed to start sarama consumer group:", err)
}

err = consumerGroup.Consume(context.Background(), []string{example.KafkaTopic}, handler)
if err != nil {
log.Fatalln("Failed to consume via handler:", err)
}
}

func printMessage(msg *sarama.ConsumerMessage) {
// Extract tracing info from message
ctx := propagation.ExtractHTTP(context.Background(), global.Propagators(), saramatrace.NewConsumerMessageCarrier(msg))

tr := global.Tracer("consumer")
ctx, span := tr.Start(ctx, "consume message", trace.WithAttributes(
standard.MessagingOperationProcess,
))
defer span.End()

// Emulate Work loads
time.Sleep(1 * time.Second)

log.Println("Successful to read message: ", string(msg.Value))
}

// Consumer represents a Sarama consumer group consumer
type Consumer struct {
}

// Setup is run at the beginning of a new session, before ConsumeClaim
func (consumer *Consumer) Setup(sarama.ConsumerGroupSession) error {
return nil
}

// Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited
func (consumer *Consumer) Cleanup(sarama.ConsumerGroupSession) error {
return nil
}

// ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages().
func (consumer *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
// NOTE:
// Do not move the code below to a goroutine.
// The `ConsumeClaim` itself is called within a goroutine, see:
// https://github.com/Shopify/sarama/blob/master/consumer_group.go#L27-L29
for message := range claim.Messages() {
printMessage(message)
session.MarkMessage(message, "")
}

return nil
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
# Copyright The OpenTelemetry Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
version: "3.7"
services:
zoo:
image: zookeeper:3.4.9
hostname: zoo
ports:
- "2181:2181"
environment:
ZOO_MY_ID: 1
ZOO_PORT: 2181
ZOO_SERVERS: server.1=zoo:2888:3888
networks:
- example
kafka:
# Kafka version 2.5.0
image: confluentinc/cp-kafka:5.5.0
hostname: kafka
ports:
- "9092:9092"
environment:
KAFKA_ADVERTISED_LISTENERS: LISTENER_DOCKER_INTERNAL://kafka:19092,LISTENER_DOCKER_EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_DOCKER_INTERNAL:PLAINTEXT,LISTENER_DOCKER_EXTERNAL:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_DOCKER_INTERNAL
KAFKA_ZOOKEEPER_CONNECT: "zoo:2181"
KAFKA_BROKER_ID: 1
KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO"
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
depends_on:
- zoo
networks:
- example

kafka-producer:
build:
dockerfile: $PWD/producer/Dockerfile
context: ../../../../..
command:
- "/bin/sh"
- "-c"
- "/go/bin/producer"
environment:
KAFKA_PEERS: kafka:19092
depends_on:
- kafka
networks:
- example
kafka-consumer:
build:
dockerfile: $PWD/consumer/Dockerfile
context: ../../../../..
command:
- "/bin/sh"
- "-c"
- "/go/bin/consumer"
environment:
KAFKA_PEERS: kafka:19092
depends_on:
- kafka
networks:
- example
networks:
example:
45 changes: 45 additions & 0 deletions instrumentation/github.com/Shopify/sarama/example/init.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package example

import (
"log"

otelglobal "go.opentelemetry.io/otel/api/global"
oteltracestdout "go.opentelemetry.io/otel/exporters/trace/stdout"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
)

const (
KafkaTopic = "sarama-instrumentation-example"
)

func InitTracer() {
exporter, err := oteltracestdout.NewExporter(oteltracestdout.Options{PrettyPrint: true})
if err != nil {
log.Fatal(err)
}
cfg := sdktrace.Config{
DefaultSampler: sdktrace.AlwaysSample(),
}
tp, err := sdktrace.NewProvider(
sdktrace.WithConfig(cfg),
sdktrace.WithSyncer(exporter),
)
if err != nil {
log.Fatal(err)
}
otelglobal.SetTraceProvider(tp)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
# Copyright The OpenTelemetry Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
FROM golang:alpine AS base
COPY . /src/
WORKDIR /src/instrumentation/github.com/Shopify/sarama

FROM base AS kafka-producer
RUN go install ./example/producer/producer.go
CMD ["/go/bin/producer"]
Loading

0 comments on commit 39e3052

Please sign in to comment.