From 243b56f52973bec79bb641ce2e670c727b34d2c6 Mon Sep 17 00:00:00 2001 From: apm-opentt <49722990+apm-opentt@users.noreply.github.com> Date: Fri, 31 Jan 2020 18:50:14 -0500 Subject: [PATCH] Add CLI flags for Kafka batching params (#2047) Signed-off-by: Albert Chung --- pkg/kafka/producer/config.go | 8 ++++++++ plugin/storage/kafka/options.go | 24 ++++++++++++++++++++++++ plugin/storage/kafka/options_test.go | 13 ++++++++++++- 3 files changed, 44 insertions(+), 1 deletion(-) diff --git a/pkg/kafka/producer/config.go b/pkg/kafka/producer/config.go index 1560bc3c122..05ed761735c 100644 --- a/pkg/kafka/producer/config.go +++ b/pkg/kafka/producer/config.go @@ -15,6 +15,8 @@ package producer import ( + "time" + "github.com/Shopify/sarama" "github.com/jaegertracing/jaeger/pkg/kafka/auth" @@ -32,6 +34,9 @@ type Configuration struct { Compression sarama.CompressionCodec CompressionLevel int ProtocolVersion string + BatchLinger time.Duration + BatchSize int + BatchMaxMessages int auth.AuthenticationConfig } @@ -42,6 +47,9 @@ func (c *Configuration) NewProducer() (sarama.AsyncProducer, error) { saramaConfig.Producer.Compression = c.Compression saramaConfig.Producer.CompressionLevel = c.CompressionLevel saramaConfig.Producer.Return.Successes = true + saramaConfig.Producer.Flush.Bytes = c.BatchSize + saramaConfig.Producer.Flush.Frequency = c.BatchLinger + saramaConfig.Producer.Flush.MaxMessages = c.BatchMaxMessages if len(c.ProtocolVersion) > 0 { ver, err := sarama.ParseKafkaVersion(c.ProtocolVersion) if err != nil { diff --git a/plugin/storage/kafka/options.go b/plugin/storage/kafka/options.go index 0fa1867e3e4..2d3b5f8a60c 100644 --- a/plugin/storage/kafka/options.go +++ b/plugin/storage/kafka/options.go @@ -43,6 +43,9 @@ const ( suffixCompression = ".compression" suffixCompressionLevel = ".compression-level" suffixProtocolVersion = ".protocol-version" + suffixBatchLinger = ".batch-linger" + suffixBatchSize = ".batch-size" + suffixBatchMaxMessages = ".batch-max-messages" defaultBroker = "127.0.0.1:9092" defaultTopic = "jaeger-spans" @@ -50,6 +53,9 @@ const ( defaultRequiredAcks = "local" defaultCompression = "none" defaultCompressionLevel = 0 + defaultBatchLinger = 0 + defaultBatchSize = 0 + defaultBatchMaxMessages = 0 ) var ( @@ -141,6 +147,21 @@ func (opt *Options) AddFlags(flagSet *flag.FlagSet) { defaultCompressionLevel, "(experimental) compression level to use on messages. gzip = 1-9 (default = 6), snappy = none, lz4 = 1-17 (default = 9), zstd = -131072 - 22 (default = 3)", ) + flagSet.Duration( + configPrefix+suffixBatchLinger, + defaultBatchLinger, + "(experimental) Time interval to wait before sending records to Kafka. Higher value reduce request to Kafka but increase latency and the possibility of data loss in case of process restart. See https://kafka.apache.org/documentation/", + ) + flagSet.Int( + configPrefix+suffixBatchSize, + defaultBatchSize, + "(experimental) Number of bytes to batch before sending records to Kafka. Higher value reduce request to Kafka but increase latency and the possibility of data loss in case of process restart. See https://kafka.apache.org/documentation/", + ) + flagSet.Int( + configPrefix+suffixBatchMaxMessages, + defaultBatchMaxMessages, + "(experimental) Number of message to batch before sending records to Kafka. Higher value reduce request to Kafka but increase latency and the possibility of data loss in case of process restart. See https://kafka.apache.org/documentation/", + ) auth.AddFlags(configPrefix, flagSet) } @@ -172,6 +193,9 @@ func (opt *Options) InitFromViper(v *viper.Viper) { CompressionLevel: compressionLevel, ProtocolVersion: v.GetString(configPrefix + suffixProtocolVersion), AuthenticationConfig: authenticationOptions, + BatchLinger: v.GetDuration(configPrefix + suffixBatchLinger), + BatchSize: v.GetInt(configPrefix + suffixBatchSize), + BatchMaxMessages: v.GetInt(configPrefix + suffixBatchMaxMessages), } opt.topic = v.GetString(configPrefix + suffixTopic) opt.encoding = v.GetString(configPrefix + suffixEncoding) diff --git a/plugin/storage/kafka/options_test.go b/plugin/storage/kafka/options_test.go index 78e402a0184..e27ee91f55f 100644 --- a/plugin/storage/kafka/options_test.go +++ b/plugin/storage/kafka/options_test.go @@ -16,6 +16,7 @@ package kafka import ( "testing" + "time" "github.com/Shopify/sarama" "github.com/stretchr/testify/assert" @@ -33,7 +34,11 @@ func TestOptionsWithFlags(t *testing.T) { "--kafka.producer.encoding=protobuf", "--kafka.producer.required-acks=local", "--kafka.producer.compression=gzip", - "--kafka.producer.compression-level=7"}) + "--kafka.producer.compression-level=7", + "--kafka.producer.batch-linger=1s", + "--kafka.producer.batch-size=128000", + "--kafka.producer.batch-max-messages=100", + }) opts.InitFromViper(v) assert.Equal(t, "topic1", opts.topic) @@ -42,6 +47,9 @@ func TestOptionsWithFlags(t *testing.T) { assert.Equal(t, sarama.WaitForLocal, opts.config.RequiredAcks) assert.Equal(t, sarama.CompressionGZIP, opts.config.Compression) assert.Equal(t, 7, opts.config.CompressionLevel) + assert.Equal(t, 128000, opts.config.BatchSize) + assert.Equal(t, time.Duration(1*time.Second), opts.config.BatchLinger) + assert.Equal(t, 100, opts.config.BatchMaxMessages) } func TestFlagDefaults(t *testing.T) { @@ -56,6 +64,9 @@ func TestFlagDefaults(t *testing.T) { assert.Equal(t, sarama.WaitForLocal, opts.config.RequiredAcks) assert.Equal(t, sarama.CompressionNone, opts.config.Compression) assert.Equal(t, 0, opts.config.CompressionLevel) + assert.Equal(t, 0, opts.config.BatchSize) + assert.Equal(t, time.Duration(0*time.Second), opts.config.BatchLinger) + assert.Equal(t, 0, opts.config.BatchMaxMessages) } func TestCompressionLevelDefaults(t *testing.T) {