From 9a90ad74ead0e013d9ac8f6edbb2add6329872bb Mon Sep 17 00:00:00 2001 From: Chodor Marek Date: Tue, 9 Jul 2019 09:48:10 +0200 Subject: [PATCH] Configurable kafka protocol/server version in producer Signed-off-by: Chodor Marek --- pkg/kafka/producer/config.go | 10 +++++++++- plugin/storage/kafka/options.go | 26 ++++++++++++++++---------- 2 files changed, 25 insertions(+), 11 deletions(-) diff --git a/pkg/kafka/producer/config.go b/pkg/kafka/producer/config.go index c866c4d957f0..4e254e12b1a5 100644 --- a/pkg/kafka/producer/config.go +++ b/pkg/kafka/producer/config.go @@ -27,7 +27,8 @@ type Builder interface { // Configuration describes the configuration properties needed to create a Kafka producer type Configuration struct { - Brokers []string + Brokers []string + ProtocolVersion string auth.AuthenticationConfig } @@ -36,5 +37,12 @@ func (c *Configuration) NewProducer() (sarama.AsyncProducer, error) { saramaConfig := sarama.NewConfig() saramaConfig.Producer.Return.Successes = true c.AuthenticationConfig.SetConfiguration(saramaConfig) + if len(c.ProtocolVersion) > 0 { + ver, err := sarama.ParseKafkaVersion(c.ProtocolVersion) + if err != nil { + return nil, err + } + saramaConfig.Version = ver + } return sarama.NewAsyncProducer(c.Brokers, saramaConfig) } diff --git a/plugin/storage/kafka/options.go b/plugin/storage/kafka/options.go index 136449d6e14b..0b8dcc87d6be 100644 --- a/plugin/storage/kafka/options.go +++ b/plugin/storage/kafka/options.go @@ -33,13 +33,14 @@ const ( // EncodingZipkinThrift is used for spans encoded as Zipkin Thrift. EncodingZipkinThrift = "zipkin-thrift" - configPrefix = "kafka.producer" - suffixBrokers = ".brokers" - suffixTopic = ".topic" - suffixEncoding = ".encoding" - defaultBroker = "127.0.0.1:9092" - defaultTopic = "jaeger-spans" - defaultEncoding = EncodingProto + configPrefix = "kafka.producer" + suffixBrokers = ".brokers" + suffixTopic = ".topic" + suffixProtocolVersion = ".protocol-version" + suffixEncoding = ".encoding" + defaultBroker = "127.0.0.1:9092" + defaultTopic = "jaeger-spans" + defaultEncoding = EncodingProto ) var ( @@ -49,9 +50,9 @@ var ( // Options stores the configuration options for Kafka type Options struct { - config producer.Configuration - topic string - encoding string + config producer.Configuration + topic string + encoding string } // AddFlags adds flags for Options @@ -64,6 +65,10 @@ func (opt *Options) AddFlags(flagSet *flag.FlagSet) { configPrefix+suffixTopic, defaultTopic, "(experimental) The name of the kafka topic") + flagSet.String( + configPrefix+suffixProtocolVersion, + "", + "(experimental) Kafka protocol version - must be supported by kafka server") flagSet.String( configPrefix+suffixEncoding, defaultEncoding, @@ -78,6 +83,7 @@ func (opt *Options) InitFromViper(v *viper.Viper) { authenticationOptions.InitFromViper(configPrefix, v) opt.config = producer.Configuration{ Brokers: strings.Split(stripWhiteSpace(v.GetString(configPrefix+suffixBrokers)), ","), + ProtocolVersion: v.GetString(configPrefix + suffixProtocolVersion), AuthenticationConfig: authenticationOptions, } opt.topic = v.GetString(configPrefix + suffixTopic)