Skip to content

Commit

Permalink
Add CLI flags for Kafka batching params (#2047)
Browse files Browse the repository at this point in the history
Signed-off-by: Albert Chung <[email protected]>
  • Loading branch information
apm-opentt authored Jan 31, 2020
1 parent ae86232 commit 243b56f
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 1 deletion.
8 changes: 8 additions & 0 deletions pkg/kafka/producer/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
package producer

import (
"time"

"github.com/Shopify/sarama"

"github.com/jaegertracing/jaeger/pkg/kafka/auth"
Expand All @@ -32,6 +34,9 @@ type Configuration struct {
Compression sarama.CompressionCodec
CompressionLevel int
ProtocolVersion string
BatchLinger time.Duration
BatchSize int
BatchMaxMessages int
auth.AuthenticationConfig
}

Expand All @@ -42,6 +47,9 @@ func (c *Configuration) NewProducer() (sarama.AsyncProducer, error) {
saramaConfig.Producer.Compression = c.Compression
saramaConfig.Producer.CompressionLevel = c.CompressionLevel
saramaConfig.Producer.Return.Successes = true
saramaConfig.Producer.Flush.Bytes = c.BatchSize
saramaConfig.Producer.Flush.Frequency = c.BatchLinger
saramaConfig.Producer.Flush.MaxMessages = c.BatchMaxMessages
if len(c.ProtocolVersion) > 0 {
ver, err := sarama.ParseKafkaVersion(c.ProtocolVersion)
if err != nil {
Expand Down
24 changes: 24 additions & 0 deletions plugin/storage/kafka/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,13 +43,19 @@ const (
suffixCompression = ".compression"
suffixCompressionLevel = ".compression-level"
suffixProtocolVersion = ".protocol-version"
suffixBatchLinger = ".batch-linger"
suffixBatchSize = ".batch-size"
suffixBatchMaxMessages = ".batch-max-messages"

defaultBroker = "127.0.0.1:9092"
defaultTopic = "jaeger-spans"
defaultEncoding = EncodingProto
defaultRequiredAcks = "local"
defaultCompression = "none"
defaultCompressionLevel = 0
defaultBatchLinger = 0
defaultBatchSize = 0
defaultBatchMaxMessages = 0
)

var (
Expand Down Expand Up @@ -141,6 +147,21 @@ func (opt *Options) AddFlags(flagSet *flag.FlagSet) {
defaultCompressionLevel,
"(experimental) compression level to use on messages. gzip = 1-9 (default = 6), snappy = none, lz4 = 1-17 (default = 9), zstd = -131072 - 22 (default = 3)",
)
flagSet.Duration(
configPrefix+suffixBatchLinger,
defaultBatchLinger,
"(experimental) Time interval to wait before sending records to Kafka. Higher value reduce request to Kafka but increase latency and the possibility of data loss in case of process restart. See https://kafka.apache.org/documentation/",
)
flagSet.Int(
configPrefix+suffixBatchSize,
defaultBatchSize,
"(experimental) Number of bytes to batch before sending records to Kafka. Higher value reduce request to Kafka but increase latency and the possibility of data loss in case of process restart. See https://kafka.apache.org/documentation/",
)
flagSet.Int(
configPrefix+suffixBatchMaxMessages,
defaultBatchMaxMessages,
"(experimental) Number of message to batch before sending records to Kafka. Higher value reduce request to Kafka but increase latency and the possibility of data loss in case of process restart. See https://kafka.apache.org/documentation/",
)
auth.AddFlags(configPrefix, flagSet)
}

Expand Down Expand Up @@ -172,6 +193,9 @@ func (opt *Options) InitFromViper(v *viper.Viper) {
CompressionLevel: compressionLevel,
ProtocolVersion: v.GetString(configPrefix + suffixProtocolVersion),
AuthenticationConfig: authenticationOptions,
BatchLinger: v.GetDuration(configPrefix + suffixBatchLinger),
BatchSize: v.GetInt(configPrefix + suffixBatchSize),
BatchMaxMessages: v.GetInt(configPrefix + suffixBatchMaxMessages),
}
opt.topic = v.GetString(configPrefix + suffixTopic)
opt.encoding = v.GetString(configPrefix + suffixEncoding)
Expand Down
13 changes: 12 additions & 1 deletion plugin/storage/kafka/options_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package kafka

import (
"testing"
"time"

"github.com/Shopify/sarama"
"github.com/stretchr/testify/assert"
Expand All @@ -33,7 +34,11 @@ func TestOptionsWithFlags(t *testing.T) {
"--kafka.producer.encoding=protobuf",
"--kafka.producer.required-acks=local",
"--kafka.producer.compression=gzip",
"--kafka.producer.compression-level=7"})
"--kafka.producer.compression-level=7",
"--kafka.producer.batch-linger=1s",
"--kafka.producer.batch-size=128000",
"--kafka.producer.batch-max-messages=100",
})
opts.InitFromViper(v)

assert.Equal(t, "topic1", opts.topic)
Expand All @@ -42,6 +47,9 @@ func TestOptionsWithFlags(t *testing.T) {
assert.Equal(t, sarama.WaitForLocal, opts.config.RequiredAcks)
assert.Equal(t, sarama.CompressionGZIP, opts.config.Compression)
assert.Equal(t, 7, opts.config.CompressionLevel)
assert.Equal(t, 128000, opts.config.BatchSize)
assert.Equal(t, time.Duration(1*time.Second), opts.config.BatchLinger)
assert.Equal(t, 100, opts.config.BatchMaxMessages)
}

func TestFlagDefaults(t *testing.T) {
Expand All @@ -56,6 +64,9 @@ func TestFlagDefaults(t *testing.T) {
assert.Equal(t, sarama.WaitForLocal, opts.config.RequiredAcks)
assert.Equal(t, sarama.CompressionNone, opts.config.Compression)
assert.Equal(t, 0, opts.config.CompressionLevel)
assert.Equal(t, 0, opts.config.BatchSize)
assert.Equal(t, time.Duration(0*time.Second), opts.config.BatchLinger)
assert.Equal(t, 0, opts.config.BatchMaxMessages)
}

func TestCompressionLevelDefaults(t *testing.T) {
Expand Down

0 comments on commit 243b56f

Please sign in to comment.