Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add non-blocking send to kafka provider #149

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 31 additions & 9 deletions reporter/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"encoding/json"
"log"
"os"
"time"

"github.com/Shopify/sarama"
"github.com/openzipkin/zipkin-go/model"
Expand All @@ -35,10 +36,11 @@ const defaultKafkaTopic = "zipkin"
// kafkaReporter implements Reporter by publishing spans to a Kafka
// broker.
type kafkaReporter struct {
producer sarama.AsyncProducer
logger *log.Logger
topic string
serializer reporter.SpanSerializer
producer sarama.AsyncProducer
logger *log.Logger
topic string
serializer reporter.SpanSerializer
nonBlockingTimeout time.Duration
}

// ReporterOption sets a parameter for the kafkaReporter
Expand Down Expand Up @@ -76,13 +78,21 @@ func Serializer(serializer reporter.SpanSerializer) ReporterOption {
}
}

// AsyncSendTimeout enables non-blocking span submission: if the
// producer's input channel cannot accept a message within d, the
// message is dropped and an error is logged instead of blocking the
// caller. A negative duration (the default) keeps blocking sends.
func AsyncSendTimeout(d time.Duration) ReporterOption {
	return func(r *kafkaReporter) {
		r.nonBlockingTimeout = d
	}
}

// NewReporter returns a new Kafka-backed Reporter. address should be a slice of
// TCP endpoints of the form "host:port".
func NewReporter(address []string, options ...ReporterOption) (reporter.Reporter, error) {
r := &kafkaReporter{
logger: log.New(os.Stderr, "", log.LstdFlags),
topic: defaultKafkaTopic,
serializer: reporter.JSONSerializer{},
logger: log.New(os.Stderr, "", log.LstdFlags),
topic: defaultKafkaTopic,
serializer: reporter.JSONSerializer{},
nonBlockingTimeout: -1,
}

for _, option := range options {
Expand Down Expand Up @@ -115,12 +125,24 @@ func (r *kafkaReporter) Send(s model.SpanModel) {
r.logger.Printf("failed when marshalling the span: %s\n", err.Error())
return
}

r.producer.Input() <- &sarama.ProducerMessage{
msg := &sarama.ProducerMessage{
Topic: r.topic,
Key: nil,
Value: sarama.ByteEncoder(m),
}

// check if non-blocking send is allowed
if r.nonBlockingTimeout >= 0 {
select {
case r.producer.Input() <- msg:
return
case <-time.After(r.nonBlockingTimeout):
r.logger.Printf("failed to send msg beaceuse chan is full, msg %s\n", msg.Value)
return
}
} else {
r.producer.Input() <- msg
}
}

func (r *kafkaReporter) Close() error {
Expand Down
47 changes: 47 additions & 0 deletions reporter/kafka/kafka_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,26 @@ func TestKafkaProduceProto(t *testing.T) {
}
}

// TestKafkaProduceProtoNonBlocking verifies that spans serialized with the
// protobuf serializer still round-trip correctly when the reporter is
// configured with a non-blocking send timeout.
func TestKafkaProduceProtoNonBlocking(t *testing.T) {
	producer := newStubProducer(false)
	rep, err := kafka.NewReporter(
		[]string{"192.0.2.10:9092"},
		kafka.Producer(producer),
		kafka.Serializer(zipkin_proto3.SpanSerializer{}),
		kafka.AsyncSendTimeout(100*time.Millisecond),
	)
	if err != nil {
		t.Fatal(err)
	}

	for _, span := range spans {
		msg := sendSpan(t, rep, producer, *span)
		testMetadata(t, msg)
		got := deserializeSpan(t, msg.Value)
		testEqual(t, span, got)
	}
}

func TestKafkaClose(t *testing.T) {
p := newStubProducer(false)
r, err := kafka.NewReporter(
Expand Down Expand Up @@ -173,6 +193,33 @@ func TestKafkaErrors(t *testing.T) {
}
}

// TestKafkaAsyncSend verifies that with a zero non-blocking timeout the
// reporter drops each span immediately and logs one error per dropped span.
func TestKafkaAsyncSend(t *testing.T) {
	producer := newStubProducer(false)
	logged := make(chan []interface{}, len(spans))

	rep, err := kafka.NewReporter(
		[]string{"192.0.2.10:9092"},
		kafka.Producer(producer),
		kafka.AsyncSendTimeout(0),
		kafka.Logger(log.New(&chanWriter{logged}, "", log.LstdFlags)),
	)
	if err != nil {
		t.Fatal(err)
	}

	for _, span := range spans {
		rep.Send(*span)
	}

	// Expect exactly one logged error per span; each wait gets its own
	// 100ms grace period, matching the per-iteration timer of the original.
	for i := 0; i < len(spans); i++ {
		select {
		case <-logged:
		case <-time.After(100 * time.Millisecond):
			t.Fatalf("errors not logged. have %d, wanted %d", i, len(spans))
		}
	}
}

func sendSpan(t *testing.T, r reporter.Reporter, p *stubProducer, s model.SpanModel) *sarama.ProducerMessage {
var m *sarama.ProducerMessage
received := make(chan bool, 1)
Expand Down