From 7f8938a73da73bbdb0fface8e3016250d977d2ae Mon Sep 17 00:00:00 2001 From: Nemanja Mikic Date: Wed, 2 Oct 2019 17:34:20 +0200 Subject: [PATCH] Add non-blocking send to kafka provider --- reporter/kafka/kafka.go | 40 +++++++++++++++++++++++------- reporter/kafka/kafka_test.go | 47 ++++++++++++++++++++++++++++++++++++ 2 files changed, 78 insertions(+), 9 deletions(-) diff --git a/reporter/kafka/kafka.go b/reporter/kafka/kafka.go index a50986f..382475b 100644 --- a/reporter/kafka/kafka.go +++ b/reporter/kafka/kafka.go @@ -21,6 +21,7 @@ import ( "encoding/json" "log" "os" + "time" "github.com/Shopify/sarama" "github.com/openzipkin/zipkin-go/model" @@ -35,10 +36,11 @@ const defaultKafkaTopic = "zipkin" // kafkaReporter implements Reporter by publishing spans to a Kafka // broker. type kafkaReporter struct { - producer sarama.AsyncProducer - logger *log.Logger - topic string - serializer reporter.SpanSerializer + producer sarama.AsyncProducer + logger *log.Logger + topic string + serializer reporter.SpanSerializer + nonBlockingTimeout time.Duration } // ReporterOption sets a parameter for the kafkaReporter @@ -76,13 +78,21 @@ func Serializer(serializer reporter.SpanSerializer) ReporterOption { } } +// AsyncSendTimeout enables and sets timeout for non-blocking sending data +func AsyncSendTimeout(duration time.Duration) ReporterOption { + return func(c *kafkaReporter) { + c.nonBlockingTimeout = duration + } +} + // NewReporter returns a new Kafka-backed Reporter. address should be a slice of // TCP endpoints of the form "host:port". func NewReporter(address []string, options ...ReporterOption) (reporter.Reporter, error) { r := &kafkaReporter{ - logger: log.New(os.Stderr, "", log.LstdFlags), - topic: defaultKafkaTopic, - serializer: reporter.JSONSerializer{}, + logger: log.New(os.Stderr, "", log.LstdFlags), + topic: defaultKafkaTopic, + serializer: reporter.JSONSerializer{}, + nonBlockingTimeout: -1, } for _, option := range options { @@ -115,12 +125,24 @@ func (r *kafkaReporter) Send(s model.SpanModel) { r.logger.Printf("failed when marshalling the span: %s\n", err.Error()) return } - - r.producer.Input() <- &sarama.ProducerMessage{ + msg := &sarama.ProducerMessage{ Topic: r.topic, Key: nil, Value: sarama.ByteEncoder(m), } + + // check if non-blocking send is allowed + if r.nonBlockingTimeout >= 0 { + select { + case r.producer.Input() <- msg: + return + case <-time.After(r.nonBlockingTimeout): + r.logger.Printf("failed to send msg beaceuse chan is full, msg %s\n", msg.Value) + return + } + } else { + r.producer.Input() <- msg + } } func (r *kafkaReporter) Close() error { diff --git a/reporter/kafka/kafka_test.go b/reporter/kafka/kafka_test.go index fcc5e21..6f63329 100644 --- a/reporter/kafka/kafka_test.go +++ b/reporter/kafka/kafka_test.go @@ -100,6 +100,26 @@ func TestKafkaProduceProto(t *testing.T) { } } +func TestKafkaProduceProtoNonBlocking(t *testing.T) { + p := newStubProducer(false) + c, err := kafka.NewReporter( + []string{"192.0.2.10:9092"}, + kafka.Producer(p), + kafka.Serializer(zipkin_proto3.SpanSerializer{}), + kafka.AsyncSendTimeout(0), + ) + if err != nil { + t.Fatal(err) + } + + for _, want := range spans { + m := sendSpan(t, c, p, *want) + testMetadata(t, m) + have := deserializeSpan(t, m.Value) + testEqual(t, want, have) + } +} + func TestKafkaClose(t *testing.T) { p := newStubProducer(false) r, err := kafka.NewReporter( @@ -173,6 +193,33 @@ func TestKafkaErrors(t *testing.T) { } } +func TestKafkaAsyncSend(t *testing.T) { + p := newStubProducer(false) + errs := make(chan []interface{}, len(spans)) + + c, err := kafka.NewReporter( + []string{"192.0.2.10:9092"}, + kafka.Producer(p), + kafka.AsyncSendTimeout(0), + kafka.Logger(log.New(&chanWriter{errs}, "", log.LstdFlags)), + ) + if err != nil { + t.Fatal(err) + } + + for _, want := range spans { + c.Send(*want) + } + + for i := 0; i < len(spans); i++ { + select { + case <-errs: + case <-time.After(100 * time.Millisecond): + t.Fatalf("errors not logged. have %d, wanted %d", i, len(spans)) + } + } +} + func sendSpan(t *testing.T, r reporter.Reporter, p *stubProducer, s model.SpanModel) *sarama.ProducerMessage { var m *sarama.ProducerMessage received := make(chan bool, 1)