diff --git a/CHANGELOG.md b/CHANGELOG.md index fa18b8f12b24..d6f04b3ae76a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,7 +7,28 @@ Changes by Version #### Backend Changes ##### Breaking Changes - +- Introduce `kafka.producer` and `kafka.consumer` flags to replace `kafka` flags ([1360](https://github.com/jaegertracing/jaeger/pull/1360), [@ledor473](https://github.com/ledor473)) + + The following flags have been deprecated in the Collector and the Ingester: + ``` + --kafka.brokers + --kafka.encoding + --kafka.topic + ``` + + In the Collector, they are replaced by: + ``` + --kafka.producer.brokers + --kafka.producer.encoding + --kafka.producer.topic + ``` + + In the Ingester, they are replaced by: + ``` + --kafka.consumer.brokers + --kafka.consumer.encoding + --kafka.consumer.group-id + ``` ##### New Features ##### Bug fixes, Minor Improvements diff --git a/cmd/ingester/app/flags.go b/cmd/ingester/app/flags.go index 097793decdfc..4288553d9a53 100644 --- a/cmd/ingester/app/flags.go +++ b/cmd/ingester/app/flags.go @@ -30,8 +30,10 @@ import ( const ( // ConfigPrefix is a prefix for the ingester flags ConfigPrefix = "ingester" - // KafkaConfigPrefix is a prefix for the Kafka flags - KafkaConfigPrefix = "kafka" + // KafkaConsumerConfigPrefix is a prefix for the Kafka flags + KafkaConsumerConfigPrefix = "kafka.consumer" + // DeprecatedKafkaConfigPrefix is a prefix for the Kafka flags that is replaced by KafkaConfigPrefix + DeprecatedKafkaConfigPrefix = "kafka" // SuffixBrokers is a suffix for the brokers flag SuffixBrokers = ".brokers" // SuffixTopic is a suffix for the topic flag @@ -78,19 +80,19 @@ type Options struct { // AddFlags adds flags for Builder func AddFlags(flagSet *flag.FlagSet) { flagSet.String( - KafkaConfigPrefix+SuffixBrokers, + KafkaConsumerConfigPrefix+SuffixBrokers, DefaultBroker, "The comma-separated list of kafka brokers. i.e. '127.0.0.1:9092,0.0.0:1234'") flagSet.String( - KafkaConfigPrefix+SuffixTopic, + KafkaConsumerConfigPrefix+SuffixTopic, DefaultTopic, "The name of the kafka topic to consume from") flagSet.String( - KafkaConfigPrefix+SuffixGroupID, + KafkaConsumerConfigPrefix+SuffixGroupID, DefaultGroupID, "The Consumer Group that ingester will be consuming on behalf of") flagSet.String( - KafkaConfigPrefix+SuffixEncoding, + KafkaConsumerConfigPrefix+SuffixEncoding, DefaultEncoding, fmt.Sprintf(`The encoding of spans ("%s") consumed from kafka`, strings.Join(kafka.AllEncodings, "\", \""))) flagSet.String( @@ -106,14 +108,61 @@ func AddFlags(flagSet *flag.FlagSet) { DefaultDeadlockInterval, "Interval to check for deadlocks. If no messages gets processed in given time, ingester app will exit. Value of 0 disables deadlock check.") + // TODO: Remove deprecated flags after 1.11 + flagSet.String( + DeprecatedKafkaConfigPrefix+SuffixBrokers, + "", + fmt.Sprintf("Deprecated; replaced by %s", KafkaConsumerConfigPrefix+SuffixBrokers)) + flagSet.String( + DeprecatedKafkaConfigPrefix+SuffixTopic, + "", + fmt.Sprintf("Deprecated; replaced by %s", KafkaConsumerConfigPrefix+SuffixTopic)) + flagSet.String( + DeprecatedKafkaConfigPrefix+SuffixGroupID, + "", + fmt.Sprintf("Deprecated; replaced by %s", KafkaConsumerConfigPrefix+SuffixGroupID)) + flagSet.String( + DeprecatedKafkaConfigPrefix+SuffixEncoding, + "", + fmt.Sprintf("Deprecated; replaced by %s", KafkaConsumerConfigPrefix+SuffixEncoding)) } // InitFromViper initializes Builder with properties from viper func (o *Options) InitFromViper(v *viper.Viper) { - o.Brokers = strings.Split(v.GetString(KafkaConfigPrefix+SuffixBrokers), ",") - o.Topic = v.GetString(KafkaConfigPrefix + SuffixTopic) - o.GroupID = v.GetString(KafkaConfigPrefix + SuffixGroupID) - o.Encoding = v.GetString(KafkaConfigPrefix + SuffixEncoding) + o.Brokers = strings.Split(v.GetString(KafkaConsumerConfigPrefix+SuffixBrokers), ",") + o.Topic = v.GetString(KafkaConsumerConfigPrefix + SuffixTopic) + o.GroupID = v.GetString(KafkaConsumerConfigPrefix + SuffixGroupID) + o.Encoding = v.GetString(KafkaConsumerConfigPrefix + SuffixEncoding) + + if brokers := v.GetString(DeprecatedKafkaConfigPrefix + SuffixBrokers); brokers != "" { + fmt.Printf("WARNING: found deprecated option %s, please use %s instead\n", + DeprecatedKafkaConfigPrefix+SuffixBrokers, + KafkaConsumerConfigPrefix+SuffixBrokers, + ) + o.Brokers = strings.Split(brokers, ",") + } + if topic := v.GetString(DeprecatedKafkaConfigPrefix + SuffixTopic); topic != "" { + fmt.Printf("WARNING: found deprecated option %s, please use %s instead\n", + DeprecatedKafkaConfigPrefix+SuffixTopic, + KafkaConsumerConfigPrefix+SuffixTopic, + ) + o.Topic = topic + } + if groupID := v.GetString(DeprecatedKafkaConfigPrefix + SuffixGroupID); groupID != "" { + fmt.Printf("WARNING: found deprecated option %s, please use %s instead\n", + DeprecatedKafkaConfigPrefix+SuffixGroupID, + KafkaConsumerConfigPrefix+SuffixGroupID, + ) + o.GroupID = groupID + } + if encoding := v.GetString(DeprecatedKafkaConfigPrefix + SuffixEncoding); encoding != "" { + fmt.Printf("WARNING: found deprecated option %s, please use %s instead\n", + DeprecatedKafkaConfigPrefix+SuffixEncoding, + KafkaConsumerConfigPrefix+SuffixEncoding, + ) + o.Encoding = encoding + } + o.Parallelism = v.GetInt(ConfigPrefix + SuffixParallelism) o.IngesterHTTPPort = v.GetInt(ConfigPrefix + SuffixHTTPPort) diff --git a/cmd/ingester/app/flags_test.go b/cmd/ingester/app/flags_test.go index d0f3b39b8041..3fbd6539db76 100644 --- a/cmd/ingester/app/flags_test.go +++ b/cmd/ingester/app/flags_test.go @@ -28,10 +28,10 @@ func TestOptionsWithFlags(t *testing.T) { o := &Options{} v, command := config.Viperize(AddFlags) command.ParseFlags([]string{ - "--kafka.topic=topic1", - "--kafka.brokers=127.0.0.1:9092,0.0.0:1234", - "--kafka.group-id=group1", - "--kafka.encoding=json", + "--kafka.consumer.topic=topic1", + "--kafka.consumer.brokers=127.0.0.1:9092,0.0.0:1234", + "--kafka.consumer.group-id=group1", + "--kafka.consumer.encoding=json", "--ingester.parallelism=5", "--ingester.deadlockInterval=2m", "--ingester.http-port=2345"}) @@ -59,3 +59,39 @@ func TestFlagDefaults(t *testing.T) { assert.Equal(t, DefaultEncoding, o.Encoding) assert.Equal(t, DefaultDeadlockInterval, o.DeadlockInterval) } + +func TestOptionsWithDeprecatedFlags(t *testing.T) { + o := &Options{} + v, command := config.Viperize(AddFlags) + command.ParseFlags([]string{ + "--kafka.topic=topic1", + "--kafka.brokers=127.0.0.1:9092,0.0.0:1234", + "--kafka.group-id=group1", + "--kafka.encoding=json"}) + o.InitFromViper(v) + + assert.Equal(t, "topic1", o.Topic) + assert.Equal(t, []string{"127.0.0.1:9092", "0.0.0:1234"}, o.Brokers) + assert.Equal(t, "group1", o.GroupID) + assert.Equal(t, kafka.EncodingJSON, o.Encoding) +} + +func TestOptionsWithAllFlags(t *testing.T) { + o := &Options{} + v, command := config.Viperize(AddFlags) + command.ParseFlags([]string{ + "--kafka.topic=topic1", + "--kafka.brokers=127.0.0.1:9092,0.0.0:1234", + "--kafka.group-id=group1", + "--kafka.encoding=protobuf", + "--kafka.consumer.topic=topic2", + "--kafka.consumer.brokers=10.0.0.1:9092,10.0.0.2:9092", + "--kafka.consumer.group-id=group2", + "--kafka.consumer.encoding=json"}) + o.InitFromViper(v) + + assert.Equal(t, "topic1", o.Topic) + assert.Equal(t, []string{"127.0.0.1:9092", "0.0.0:1234"}, o.Brokers) + assert.Equal(t, "group1", o.GroupID) + assert.Equal(t, kafka.EncodingProto, o.Encoding) +} diff --git a/plugin/storage/kafka/options.go b/plugin/storage/kafka/options.go index d86262bf70c6..3c0edbf952fd 100644 --- a/plugin/storage/kafka/options.go +++ b/plugin/storage/kafka/options.go @@ -32,10 +32,11 @@ const ( // EncodingZipkinThrift is used for spans encoded as Zipkin Thrift. EncodingZipkinThrift = "zipkin-thrift" - configPrefix = "kafka" - suffixBrokers = ".brokers" - suffixTopic = ".topic" - suffixEncoding = ".encoding" + configPrefix = "kafka.producer" + deprecatedPrefix = "kafka" + suffixBrokers = ".brokers" + suffixTopic = ".topic" + suffixEncoding = ".encoding" defaultBroker = "127.0.0.1:9092" defaultTopic = "jaeger-spans" @@ -69,6 +70,20 @@ func (opt *Options) AddFlags(flagSet *flag.FlagSet) { defaultEncoding, fmt.Sprintf(`(experimental) Encoding of spans ("%s" or "%s") sent to kafka.`, EncodingJSON, EncodingProto), ) + + // TODO: Remove deprecated flags after 1.11 + flagSet.String( + deprecatedPrefix+suffixBrokers, + "", + fmt.Sprintf("Deprecated; replaced by %s", configPrefix+suffixBrokers)) + flagSet.String( + deprecatedPrefix+suffixTopic, + "", + fmt.Sprintf("Deprecated; replaced by %s", configPrefix+suffixTopic)) + flagSet.String( + deprecatedPrefix+suffixEncoding, + "", + fmt.Sprintf("Deprecated; replaced by %s", configPrefix+suffixEncoding)) } // InitFromViper initializes Options with properties from viper @@ -78,6 +93,30 @@ func (opt *Options) InitFromViper(v *viper.Viper) { } opt.topic = v.GetString(configPrefix + suffixTopic) opt.encoding = v.GetString(configPrefix + suffixEncoding) + + if brokers := v.GetString(deprecatedPrefix + suffixBrokers); brokers != "" { + fmt.Printf("WARNING: found deprecated option %s, please use %s instead\n", + deprecatedPrefix+suffixBrokers, + configPrefix+suffixBrokers, + ) + opt.config = producer.Configuration{ + Brokers: strings.Split(stripWhiteSpace(brokers), ","), + } + } + if topic := v.GetString(deprecatedPrefix + suffixTopic); topic != "" { + fmt.Printf("WARNING: found deprecated option %s, please use %s instead\n", + deprecatedPrefix+suffixTopic, + configPrefix+suffixTopic, + ) + opt.topic = topic + } + if encoding := v.GetString(deprecatedPrefix + suffixEncoding); encoding != "" { + fmt.Printf("WARNING: found deprecated option %s, please use %s instead\n", + deprecatedPrefix+suffixEncoding, + configPrefix+suffixEncoding, + ) + opt.encoding = encoding + } } // stripWhiteSpace removes all whitespace characters from a string diff --git a/plugin/storage/kafka/options_test.go b/plugin/storage/kafka/options_test.go index 095ead19d5db..4370f15c497b 100644 --- a/plugin/storage/kafka/options_test.go +++ b/plugin/storage/kafka/options_test.go @@ -26,9 +26,9 @@ func TestOptionsWithFlags(t *testing.T) { opts := &Options{} v, command := config.Viperize(opts.AddFlags) command.ParseFlags([]string{ - "--kafka.topic=topic1", - "--kafka.brokers=127.0.0.1:9092, 0.0.0:1234", - "--kafka.encoding=protobuf"}) + "--kafka.producer.topic=topic1", + "--kafka.producer.brokers=127.0.0.1:9092, 0.0.0:1234", + "--kafka.producer.encoding=protobuf"}) opts.InitFromViper(v) assert.Equal(t, "topic1", opts.topic) @@ -46,3 +46,34 @@ func TestFlagDefaults(t *testing.T) { assert.Equal(t, []string{defaultBroker}, opts.config.Brokers) assert.Equal(t, defaultEncoding, opts.encoding) } + +func TestOptionsWithDeprecatedFlags(t *testing.T) { + opts := &Options{} + v, command := config.Viperize(opts.AddFlags) + command.ParseFlags([]string{ + "--kafka.topic=topic1", + "--kafka.brokers=127.0.0.1:9092, 0.0.0:1234", + "--kafka.encoding=protobuf"}) + opts.InitFromViper(v) + + assert.Equal(t, "topic1", opts.topic) + assert.Equal(t, []string{"127.0.0.1:9092", "0.0.0:1234"}, opts.config.Brokers) + assert.Equal(t, "protobuf", opts.encoding) +} + +func TestOptionsWithAllFlags(t *testing.T) { + opts := &Options{} + v, command := config.Viperize(opts.AddFlags) + command.ParseFlags([]string{ + "--kafka.topic=topic1", + "--kafka.brokers=127.0.0.1:9092, 0.0.0:1234", + "--kafka.encoding=protobuf", + "--kafka.producer.topic=topic2", + "--kafka.producer.brokers=10.0.0.1:9092, 10.0.0.2:9092", + "--kafka.producer.encoding=json"}) + opts.InitFromViper(v) + + assert.Equal(t, "topic1", opts.topic) + assert.Equal(t, []string{"127.0.0.1:9092", "0.0.0:1234"}, opts.config.Brokers) + assert.Equal(t, "protobuf", opts.encoding) +}