Skip to content

Commit

Permalink
Kafka: Add new Consumer and Producer flags (jaegertracing#1360)
Browse files Browse the repository at this point in the history
* Add new kafka.consumer + kafka.producer flags instead of deprecated flags

Signed-off-by: Louis-Etienne Dorval <[email protected]>

* fmt the new Kafka consumer/producer code

Signed-off-by: Louis-Etienne Dorval <[email protected]>
  • Loading branch information
ledor473 authored and Konrad Galuszka committed Feb 26, 2019
1 parent b08df26 commit 2c8e304
Show file tree
Hide file tree
Showing 5 changed files with 198 additions and 22 deletions.
23 changes: 22 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,28 @@ Changes by Version
#### Backend Changes

##### Breaking Changes

- Introduce `kafka.producer` and `kafka.consumer` flags to replace `kafka` flags ([1360](https://github.com/jaegertracing/jaeger/pull/1360), [@ledor473](https://github.com/ledor473))

The following flags have been deprecated in the Collector and the Ingester:
```
--kafka.brokers
--kafka.encoding
--kafka.topic
```
In the Collector, they are replaced by:
```
--kafka.producer.brokers
--kafka.producer.encoding
--kafka.producer.topic
```
In the Ingester, they are replaced by:
```
--kafka.consumer.brokers
--kafka.consumer.encoding
--kafka.consumer.group-id
```
##### New Features
##### Bug fixes, Minor Improvements
Expand Down
69 changes: 59 additions & 10 deletions cmd/ingester/app/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,10 @@ import (
const (
// ConfigPrefix is a prefix for the ingester flags
ConfigPrefix = "ingester"
// KafkaConfigPrefix is a prefix for the Kafka flags
KafkaConfigPrefix = "kafka"
// KafkaConsumerConfigPrefix is a prefix for the Kafka flags
KafkaConsumerConfigPrefix = "kafka.consumer"
// DeprecatedKafkaConfigPrefix is a prefix for the Kafka flags that is replaced by KafkaConfigPrefix
DeprecatedKafkaConfigPrefix = "kafka"
// SuffixBrokers is a suffix for the brokers flag
SuffixBrokers = ".brokers"
// SuffixTopic is a suffix for the topic flag
Expand Down Expand Up @@ -78,19 +80,19 @@ type Options struct {
// AddFlags adds flags for Builder
func AddFlags(flagSet *flag.FlagSet) {
flagSet.String(
KafkaConfigPrefix+SuffixBrokers,
KafkaConsumerConfigPrefix+SuffixBrokers,
DefaultBroker,
"The comma-separated list of kafka brokers. i.e. '127.0.0.1:9092,0.0.0:1234'")
flagSet.String(
KafkaConfigPrefix+SuffixTopic,
KafkaConsumerConfigPrefix+SuffixTopic,
DefaultTopic,
"The name of the kafka topic to consume from")
flagSet.String(
KafkaConfigPrefix+SuffixGroupID,
KafkaConsumerConfigPrefix+SuffixGroupID,
DefaultGroupID,
"The Consumer Group that ingester will be consuming on behalf of")
flagSet.String(
KafkaConfigPrefix+SuffixEncoding,
KafkaConsumerConfigPrefix+SuffixEncoding,
DefaultEncoding,
fmt.Sprintf(`The encoding of spans ("%s") consumed from kafka`, strings.Join(kafka.AllEncodings, "\", \"")))
flagSet.String(
Expand All @@ -106,14 +108,61 @@ func AddFlags(flagSet *flag.FlagSet) {
DefaultDeadlockInterval,
"Interval to check for deadlocks. If no messages gets processed in given time, ingester app will exit. Value of 0 disables deadlock check.")

// TODO: Remove deprecated flags after 1.11
flagSet.String(
DeprecatedKafkaConfigPrefix+SuffixBrokers,
"",
fmt.Sprintf("Deprecated; replaced by %s", KafkaConsumerConfigPrefix+SuffixBrokers))
flagSet.String(
DeprecatedKafkaConfigPrefix+SuffixTopic,
"",
fmt.Sprintf("Deprecated; replaced by %s", KafkaConsumerConfigPrefix+SuffixTopic))
flagSet.String(
DeprecatedKafkaConfigPrefix+SuffixGroupID,
"",
fmt.Sprintf("Deprecated; replaced by %s", KafkaConsumerConfigPrefix+SuffixGroupID))
flagSet.String(
DeprecatedKafkaConfigPrefix+SuffixEncoding,
"",
fmt.Sprintf("Deprecated; replaced by %s", KafkaConsumerConfigPrefix+SuffixEncoding))
}

// InitFromViper initializes Builder with properties from viper
func (o *Options) InitFromViper(v *viper.Viper) {
o.Brokers = strings.Split(v.GetString(KafkaConfigPrefix+SuffixBrokers), ",")
o.Topic = v.GetString(KafkaConfigPrefix + SuffixTopic)
o.GroupID = v.GetString(KafkaConfigPrefix + SuffixGroupID)
o.Encoding = v.GetString(KafkaConfigPrefix + SuffixEncoding)
o.Brokers = strings.Split(v.GetString(KafkaConsumerConfigPrefix+SuffixBrokers), ",")
o.Topic = v.GetString(KafkaConsumerConfigPrefix + SuffixTopic)
o.GroupID = v.GetString(KafkaConsumerConfigPrefix + SuffixGroupID)
o.Encoding = v.GetString(KafkaConsumerConfigPrefix + SuffixEncoding)

if brokers := v.GetString(DeprecatedKafkaConfigPrefix + SuffixBrokers); brokers != "" {
fmt.Printf("WARNING: found deprecated option %s, please use %s instead\n",
DeprecatedKafkaConfigPrefix+SuffixBrokers,
KafkaConsumerConfigPrefix+SuffixBrokers,
)
o.Brokers = strings.Split(brokers, ",")
}
if topic := v.GetString(DeprecatedKafkaConfigPrefix + SuffixTopic); topic != "" {
fmt.Printf("WARNING: found deprecated option %s, please use %s instead\n",
DeprecatedKafkaConfigPrefix+SuffixTopic,
KafkaConsumerConfigPrefix+SuffixTopic,
)
o.Topic = topic
}
if groupID := v.GetString(DeprecatedKafkaConfigPrefix + SuffixGroupID); groupID != "" {
fmt.Printf("WARNING: found deprecated option %s, please use %s instead\n",
DeprecatedKafkaConfigPrefix+SuffixGroupID,
KafkaConsumerConfigPrefix+SuffixGroupID,
)
o.GroupID = groupID
}
if encoding := v.GetString(DeprecatedKafkaConfigPrefix + SuffixEncoding); encoding != "" {
fmt.Printf("WARNING: found deprecated option %s, please use %s instead\n",
DeprecatedKafkaConfigPrefix+SuffixEncoding,
KafkaConsumerConfigPrefix+SuffixEncoding,
)
o.Encoding = encoding
}

o.Parallelism = v.GetInt(ConfigPrefix + SuffixParallelism)
o.IngesterHTTPPort = v.GetInt(ConfigPrefix + SuffixHTTPPort)

Expand Down
44 changes: 40 additions & 4 deletions cmd/ingester/app/flags_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,10 @@ func TestOptionsWithFlags(t *testing.T) {
o := &Options{}
v, command := config.Viperize(AddFlags)
command.ParseFlags([]string{
"--kafka.topic=topic1",
"--kafka.brokers=127.0.0.1:9092,0.0.0:1234",
"--kafka.group-id=group1",
"--kafka.encoding=json",
"--kafka.consumer.topic=topic1",
"--kafka.consumer.brokers=127.0.0.1:9092,0.0.0:1234",
"--kafka.consumer.group-id=group1",
"--kafka.consumer.encoding=json",
"--ingester.parallelism=5",
"--ingester.deadlockInterval=2m",
"--ingester.http-port=2345"})
Expand Down Expand Up @@ -59,3 +59,39 @@ func TestFlagDefaults(t *testing.T) {
assert.Equal(t, DefaultEncoding, o.Encoding)
assert.Equal(t, DefaultDeadlockInterval, o.DeadlockInterval)
}

func TestOptionsWithDeprecatedFlags(t *testing.T) {
o := &Options{}
v, command := config.Viperize(AddFlags)
command.ParseFlags([]string{
"--kafka.topic=topic1",
"--kafka.brokers=127.0.0.1:9092,0.0.0:1234",
"--kafka.group-id=group1",
"--kafka.encoding=json"})
o.InitFromViper(v)

assert.Equal(t, "topic1", o.Topic)
assert.Equal(t, []string{"127.0.0.1:9092", "0.0.0:1234"}, o.Brokers)
assert.Equal(t, "group1", o.GroupID)
assert.Equal(t, kafka.EncodingJSON, o.Encoding)
}

func TestOptionsWithAllFlags(t *testing.T) {
o := &Options{}
v, command := config.Viperize(AddFlags)
command.ParseFlags([]string{
"--kafka.topic=topic1",
"--kafka.brokers=127.0.0.1:9092,0.0.0:1234",
"--kafka.group-id=group1",
"--kafka.encoding=protobuf",
"--kafka.consumer.topic=topic2",
"--kafka.consumer.brokers=10.0.0.1:9092,10.0.0.2:9092",
"--kafka.consumer.group-id=group2",
"--kafka.consumer.encoding=json"})
o.InitFromViper(v)

assert.Equal(t, "topic1", o.Topic)
assert.Equal(t, []string{"127.0.0.1:9092", "0.0.0:1234"}, o.Brokers)
assert.Equal(t, "group1", o.GroupID)
assert.Equal(t, kafka.EncodingProto, o.Encoding)
}
47 changes: 43 additions & 4 deletions plugin/storage/kafka/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,11 @@ const (
// EncodingZipkinThrift is used for spans encoded as Zipkin Thrift.
EncodingZipkinThrift = "zipkin-thrift"

configPrefix = "kafka"
suffixBrokers = ".brokers"
suffixTopic = ".topic"
suffixEncoding = ".encoding"
configPrefix = "kafka.producer"
deprecatedPrefix = "kafka"
suffixBrokers = ".brokers"
suffixTopic = ".topic"
suffixEncoding = ".encoding"

defaultBroker = "127.0.0.1:9092"
defaultTopic = "jaeger-spans"
Expand Down Expand Up @@ -69,6 +70,20 @@ func (opt *Options) AddFlags(flagSet *flag.FlagSet) {
defaultEncoding,
fmt.Sprintf(`(experimental) Encoding of spans ("%s" or "%s") sent to kafka.`, EncodingJSON, EncodingProto),
)

// TODO: Remove deprecated flags after 1.11
flagSet.String(
deprecatedPrefix+suffixBrokers,
"",
fmt.Sprintf("Deprecated; replaced by %s", configPrefix+suffixBrokers))
flagSet.String(
deprecatedPrefix+suffixTopic,
"",
fmt.Sprintf("Deprecated; replaced by %s", configPrefix+suffixTopic))
flagSet.String(
deprecatedPrefix+suffixEncoding,
"",
fmt.Sprintf("Deprecated; replaced by %s", configPrefix+suffixEncoding))
}

// InitFromViper initializes Options with properties from viper
Expand All @@ -78,6 +93,30 @@ func (opt *Options) InitFromViper(v *viper.Viper) {
}
opt.topic = v.GetString(configPrefix + suffixTopic)
opt.encoding = v.GetString(configPrefix + suffixEncoding)

if brokers := v.GetString(deprecatedPrefix + suffixBrokers); brokers != "" {
fmt.Printf("WARNING: found deprecated option %s, please use %s instead\n",
deprecatedPrefix+suffixBrokers,
configPrefix+suffixBrokers,
)
opt.config = producer.Configuration{
Brokers: strings.Split(stripWhiteSpace(brokers), ","),
}
}
if topic := v.GetString(deprecatedPrefix + suffixTopic); topic != "" {
fmt.Printf("WARNING: found deprecated option %s, please use %s instead\n",
deprecatedPrefix+suffixTopic,
configPrefix+suffixTopic,
)
opt.topic = topic
}
if encoding := v.GetString(deprecatedPrefix + suffixEncoding); encoding != "" {
fmt.Printf("WARNING: found deprecated option %s, please use %s instead\n",
deprecatedPrefix+suffixEncoding,
configPrefix+suffixEncoding,
)
opt.encoding = encoding
}
}

// stripWhiteSpace removes all whitespace characters from a string
Expand Down
37 changes: 34 additions & 3 deletions plugin/storage/kafka/options_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,9 @@ func TestOptionsWithFlags(t *testing.T) {
opts := &Options{}
v, command := config.Viperize(opts.AddFlags)
command.ParseFlags([]string{
"--kafka.topic=topic1",
"--kafka.brokers=127.0.0.1:9092, 0.0.0:1234",
"--kafka.encoding=protobuf"})
"--kafka.producer.topic=topic1",
"--kafka.producer.brokers=127.0.0.1:9092, 0.0.0:1234",
"--kafka.producer.encoding=protobuf"})
opts.InitFromViper(v)

assert.Equal(t, "topic1", opts.topic)
Expand All @@ -46,3 +46,34 @@ func TestFlagDefaults(t *testing.T) {
assert.Equal(t, []string{defaultBroker}, opts.config.Brokers)
assert.Equal(t, defaultEncoding, opts.encoding)
}

func TestOptionsWithDeprecatedFlags(t *testing.T) {
opts := &Options{}
v, command := config.Viperize(opts.AddFlags)
command.ParseFlags([]string{
"--kafka.topic=topic1",
"--kafka.brokers=127.0.0.1:9092, 0.0.0:1234",
"--kafka.encoding=protobuf"})
opts.InitFromViper(v)

assert.Equal(t, "topic1", opts.topic)
assert.Equal(t, []string{"127.0.0.1:9092", "0.0.0:1234"}, opts.config.Brokers)
assert.Equal(t, "protobuf", opts.encoding)
}

func TestOptionsWithAllFlags(t *testing.T) {
opts := &Options{}
v, command := config.Viperize(opts.AddFlags)
command.ParseFlags([]string{
"--kafka.topic=topic1",
"--kafka.brokers=127.0.0.1:9092, 0.0.0:1234",
"--kafka.encoding=protobuf",
"--kafka.producer.topic=topic2",
"--kafka.producer.brokers=10.0.0.1:9092, 10.0.0.2:9092",
"--kafka.producer.encoding=json"})
opts.InitFromViper(v)

assert.Equal(t, "topic1", opts.topic)
assert.Equal(t, []string{"127.0.0.1:9092", "0.0.0:1234"}, opts.config.Brokers)
assert.Equal(t, "protobuf", opts.encoding)
}

0 comments on commit 2c8e304

Please sign in to comment.