Skip to content

Commit

Permalink
Add non-blocking send to kafka provider
Browse files Browse the repository at this point in the history
  • Loading branch information
n0tl3ss committed Oct 2, 2019
1 parent c29478e commit 7f8938a
Show file tree
Hide file tree
Showing 2 changed files with 78 additions and 9 deletions.
40 changes: 31 additions & 9 deletions reporter/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"encoding/json"
"log"
"os"
"time"

"github.com/Shopify/sarama"
"github.com/openzipkin/zipkin-go/model"
Expand All @@ -35,10 +36,11 @@ const defaultKafkaTopic = "zipkin"
// kafkaReporter implements Reporter by publishing spans to a Kafka
// broker.
type kafkaReporter struct {
producer sarama.AsyncProducer
logger *log.Logger
topic string
serializer reporter.SpanSerializer
producer sarama.AsyncProducer
logger *log.Logger
topic string
serializer reporter.SpanSerializer
nonBlockingTimeout time.Duration
}

// ReporterOption sets a parameter for the kafkaReporter
Expand Down Expand Up @@ -76,13 +78,21 @@ func Serializer(serializer reporter.SpanSerializer) ReporterOption {
}
}

// AsyncSendTimeout enables and sets timeout for non-blocking sending data
func AsyncSendTimeout(duration time.Duration) ReporterOption {
return func(c *kafkaReporter) {
c.nonBlockingTimeout = duration
}
}

// NewReporter returns a new Kafka-backed Reporter. address should be a slice of
// TCP endpoints of the form "host:port".
func NewReporter(address []string, options ...ReporterOption) (reporter.Reporter, error) {
r := &kafkaReporter{
logger: log.New(os.Stderr, "", log.LstdFlags),
topic: defaultKafkaTopic,
serializer: reporter.JSONSerializer{},
logger: log.New(os.Stderr, "", log.LstdFlags),
topic: defaultKafkaTopic,
serializer: reporter.JSONSerializer{},
nonBlockingTimeout: -1,
}

for _, option := range options {
Expand Down Expand Up @@ -115,12 +125,24 @@ func (r *kafkaReporter) Send(s model.SpanModel) {
r.logger.Printf("failed when marshalling the span: %s\n", err.Error())
return
}

r.producer.Input() <- &sarama.ProducerMessage{
msg := &sarama.ProducerMessage{
Topic: r.topic,
Key: nil,
Value: sarama.ByteEncoder(m),
}

// check if non-blocking send is allowed
if r.nonBlockingTimeout >= 0 {
select {
case r.producer.Input() <- msg:
return
case <-time.After(r.nonBlockingTimeout):
r.logger.Printf("failed to send msg beaceuse chan is full, msg %s\n", msg.Value)
return
}
} else {
r.producer.Input() <- msg
}
}

func (r *kafkaReporter) Close() error {
Expand Down
47 changes: 47 additions & 0 deletions reporter/kafka/kafka_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,26 @@ func TestKafkaProduceProto(t *testing.T) {
}
}

func TestKafkaProduceProtoNonBlocking(t *testing.T) {
p := newStubProducer(false)
c, err := kafka.NewReporter(
[]string{"192.0.2.10:9092"},
kafka.Producer(p),
kafka.Serializer(zipkin_proto3.SpanSerializer{}),
kafka.AsyncSendTimeout(0),
)
if err != nil {
t.Fatal(err)
}

for _, want := range spans {
m := sendSpan(t, c, p, *want)
testMetadata(t, m)
have := deserializeSpan(t, m.Value)
testEqual(t, want, have)
}
}

func TestKafkaClose(t *testing.T) {
p := newStubProducer(false)
r, err := kafka.NewReporter(
Expand Down Expand Up @@ -173,6 +193,33 @@ func TestKafkaErrors(t *testing.T) {
}
}

func TestKafkaAsyncSend(t *testing.T) {
p := newStubProducer(false)
errs := make(chan []interface{}, len(spans))

c, err := kafka.NewReporter(
[]string{"192.0.2.10:9092"},
kafka.Producer(p),
kafka.AsyncSendTimeout(0),
kafka.Logger(log.New(&chanWriter{errs}, "", log.LstdFlags)),
)
if err != nil {
t.Fatal(err)
}

for _, want := range spans {
c.Send(*want)
}

for i := 0; i < len(spans); i++ {
select {
case <-errs:
case <-time.After(100 * time.Millisecond):
t.Fatalf("errors not logged. have %d, wanted %d", i, len(spans))
}
}
}

func sendSpan(t *testing.T, r reporter.Reporter, p *stubProducer, s model.SpanModel) *sarama.ProducerMessage {
var m *sarama.ProducerMessage
received := make(chan bool, 1)
Expand Down

0 comments on commit 7f8938a

Please sign in to comment.