From 1de4089b4de3c630524e5577d0998700f8414c57 Mon Sep 17 00:00:00 2001 From: albert Date: Tue, 28 Jan 2020 15:03:46 -0500 Subject: [PATCH] Address issues raised in pull request for #2047 1. Setting proper defaults 2. Rename cli option name 3. Change cli option type Signed-off-by: albert chung Signed-off-by: albert --- pkg/kafka/producer/config.go | 4 ++-- plugin/storage/kafka/options.go | 19 ++++++++++--------- plugin/storage/kafka/options_test.go | 9 +++++---- 3 files changed, 17 insertions(+), 15 deletions(-) diff --git a/pkg/kafka/producer/config.go b/pkg/kafka/producer/config.go index ad665fd0979..2a3ab075769 100644 --- a/pkg/kafka/producer/config.go +++ b/pkg/kafka/producer/config.go @@ -34,7 +34,7 @@ type Configuration struct { Compression sarama.CompressionCodec CompressionLevel int ProtocolVersion string - LingerMS int + BatchLinger time.Duration BatchSize int auth.AuthenticationConfig } @@ -47,7 +47,7 @@ func (c *Configuration) NewProducer() (sarama.AsyncProducer, error) { saramaConfig.Producer.CompressionLevel = c.CompressionLevel saramaConfig.Producer.Return.Successes = true saramaConfig.Producer.Flush.Bytes = c.BatchSize - saramaConfig.Producer.Flush.Frequency = time.Duration(c.LingerMS) * time.Millisecond + saramaConfig.Producer.Flush.Frequency = c.BatchLinger if len(c.ProtocolVersion) > 0 { ver, err := sarama.ParseKafkaVersion(c.ProtocolVersion) if err != nil { diff --git a/plugin/storage/kafka/options.go b/plugin/storage/kafka/options.go index f3763c6c23b..06fd2a26630 100644 --- a/plugin/storage/kafka/options.go +++ b/plugin/storage/kafka/options.go @@ -19,6 +19,7 @@ import ( "fmt" "log" "strings" + "time" "github.com/Shopify/sarama" "github.com/spf13/viper" @@ -43,7 +44,7 @@ const ( suffixCompression = ".compression" suffixCompressionLevel = ".compression-level" suffixProtocolVersion = ".protocol-version" - suffixLingerMS = ".linger-ms" + suffixLinger = ".linger" suffixBatchSize = ".batch-size" defaultBroker = "127.0.0.1:9092" @@ -52,8 +53,8 @@ const ( defaultRequiredAcks = "local" defaultCompression = "none" defaultCompressionLevel = 0 - defaultLingerMS = 10 - defaultBatchSize = 1024 + defaultLinger = time.Duration(0 * time.Millisecond) + defaultBatchSize = 16384 ) var ( @@ -145,15 +146,15 @@ func (opt *Options) AddFlags(flagSet *flag.FlagSet) { defaultCompressionLevel, "(experimental) compression level to use on messages. gzip = 1-9 (default = 6), snappy = none, lz4 = 1-17 (default = 9), zstd = -131072 - 22 (default = 3)", ) - flagSet.Int( - configPrefix+suffixLingerMS, - defaultLingerMS, - "(experimental) Number of milliseconds to delay before sending records to Kafka. Higher value reduce request volume to Kafka but increase latency and the possibility of data loss in case of process restart.", + flagSet.Duration( + configPrefix+suffixLinger, + defaultLinger, + "(experimental) Number of milliseconds to delay before sending records to Kafka. Higher value reduce request to Kafka but increase latency and the possibility of data loss in case of process restart. See https://kafka.apache.org/documentation/", ) flagSet.Int( configPrefix+suffixBatchSize, defaultBatchSize, - "(experimental) Number of bytes to batch before sending records to Kafka. Higher value reduce request volume to Kafka but increase latency and the possibility of data loss in case of process restart.", + "(experimental) Number of bytes to batch before sending records to Kafka. Higher value reduce request to Kafka but increase latency and the possibility of data loss in case of process restart. See https://kafka.apache.org/documentation/", ) auth.AddFlags(configPrefix, flagSet) } @@ -186,7 +187,7 @@ func (opt *Options) InitFromViper(v *viper.Viper) { CompressionLevel: compressionLevel, ProtocolVersion: v.GetString(configPrefix + suffixProtocolVersion), AuthenticationConfig: authenticationOptions, - LingerMS: v.GetInt(configPrefix + suffixLingerMS), + BatchLinger: v.GetDuration(configPrefix + suffixLinger), BatchSize: v.GetInt(configPrefix + suffixBatchSize), } opt.topic = v.GetString(configPrefix + suffixTopic) diff --git a/plugin/storage/kafka/options_test.go b/plugin/storage/kafka/options_test.go index 54c6bb0030d..1c02e2ae37d 100644 --- a/plugin/storage/kafka/options_test.go +++ b/plugin/storage/kafka/options_test.go @@ -16,6 +16,7 @@ package kafka import ( "testing" + "time" "github.com/Shopify/sarama" "github.com/stretchr/testify/assert" @@ -34,7 +35,7 @@ func TestOptionsWithFlags(t *testing.T) { "--kafka.producer.required-acks=local", "--kafka.producer.compression=gzip", "--kafka.producer.compression-level=7", - "--kafka.producer.linger-ms=1000", + "--kafka.producer.linger=1s", "--kafka.producer.batch-size=128000", }) opts.InitFromViper(v) @@ -46,7 +47,7 @@ func TestOptionsWithFlags(t *testing.T) { assert.Equal(t, sarama.CompressionGZIP, opts.config.Compression) assert.Equal(t, 7, opts.config.CompressionLevel) assert.Equal(t, 128000, opts.config.BatchSize) - assert.Equal(t, 1000, opts.config.LingerMS) + assert.Equal(t, time.Duration(1*time.Second), opts.config.BatchLinger) } func TestFlagDefaults(t *testing.T) { @@ -61,8 +62,8 @@ func TestFlagDefaults(t *testing.T) { assert.Equal(t, sarama.WaitForLocal, opts.config.RequiredAcks) assert.Equal(t, sarama.CompressionNone, opts.config.Compression) assert.Equal(t, 0, opts.config.CompressionLevel) - assert.Equal(t, 1024, opts.config.BatchSize) - assert.Equal(t, 10, opts.config.LingerMS) + assert.Equal(t, 16384, opts.config.BatchSize) + assert.Equal(t, time.Duration(0*time.Second), opts.config.BatchLinger) } func TestCompressionLevelDefaults(t *testing.T) {