Skip to content

Commit

Permalink
feat: expose rackID option in ingester samara client (jaegertracing#3395
Browse files Browse the repository at this point in the history
)
  • Loading branch information
shyimo authored and rbroggi committed Nov 22, 2021
1 parent 11708c7 commit bc6b060
Show file tree
Hide file tree
Showing 3 changed files with 11 additions and 0 deletions.
7 changes: 7 additions & 0 deletions cmd/ingester/app/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ const (
SuffixBrokers = ".brokers"
// SuffixTopic is a suffix for the topic flag
SuffixTopic = ".topic"
// SuffixRackID is a suffix for the consumer rack-id flag
SuffixRackID = ".rack-id"
// SuffixGroupID is a suffix for the group-id flag
SuffixGroupID = ".group-id"
// SuffixClientID is a suffix for the client-id flag
Expand Down Expand Up @@ -111,6 +113,10 @@ func AddFlags(flagSet *flag.FlagSet) {
KafkaConsumerConfigPrefix+SuffixEncoding,
DefaultEncoding,
fmt.Sprintf(`The encoding of spans ("%s") consumed from kafka`, strings.Join(kafka.AllEncodings, "\", \"")))
flagSet.String(
KafkaConsumerConfigPrefix+SuffixRackID,
"",
"Rack identifier for this client. This can be any string value which indicates where this client is located. It corresponds with the broker config `broker.rack`")

auth.AddFlags(KafkaConsumerConfigPrefix, flagSet)
}
Expand All @@ -123,6 +129,7 @@ func (o *Options) InitFromViper(v *viper.Viper) {
o.ClientID = v.GetString(KafkaConsumerConfigPrefix + SuffixClientID)
o.ProtocolVersion = v.GetString(KafkaConsumerConfigPrefix + SuffixProtocolVersion)
o.Encoding = v.GetString(KafkaConsumerConfigPrefix + SuffixEncoding)
o.RackID = v.GetString(KafkaConsumerConfigPrefix + SuffixRackID)

o.Parallelism = v.GetInt(ConfigPrefix + SuffixParallelism)
o.DeadlockInterval = v.GetDuration(ConfigPrefix + SuffixDeadlockInterval)
Expand Down
2 changes: 2 additions & 0 deletions cmd/ingester/app/flags_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ func TestOptionsWithFlags(t *testing.T) {
"--kafka.consumer.brokers=127.0.0.1:9092, 0.0.0:1234",
"--kafka.consumer.group-id=group1",
"--kafka.consumer.client-id=client-id1",
"--kafka.consumer.rack-id=rack1",
"--kafka.consumer.encoding=json",
"--kafka.consumer.protocol-version=1.0.0",
"--ingester.parallelism=5",
Expand All @@ -46,6 +47,7 @@ func TestOptionsWithFlags(t *testing.T) {
assert.Equal(t, "topic1", o.Topic)
assert.Equal(t, []string{"127.0.0.1:9092", "0.0.0:1234"}, o.Brokers)
assert.Equal(t, "group1", o.GroupID)
assert.Equal(t, "rack1", o.RackID)
assert.Equal(t, "client-id1", o.ClientID)
assert.Equal(t, "1.0.0", o.ProtocolVersion)
assert.Equal(t, 5, o.Parallelism)
Expand Down
2 changes: 2 additions & 0 deletions pkg/kafka/consumer/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,13 +47,15 @@ type Configuration struct {
GroupID string `mapstructure:"group_id"`
ClientID string `mapstructure:"client_id"`
ProtocolVersion string `mapstructure:"protocol_version"`
RackID string `mapstructure:"rack_id"`
}

// NewConsumer creates a new kafka consumer
func (c *Configuration) NewConsumer(logger *zap.Logger) (Consumer, error) {
saramaConfig := cluster.NewConfig()
saramaConfig.Group.Mode = cluster.ConsumerModePartitions
saramaConfig.ClientID = c.ClientID
saramaConfig.RackID = c.RackID
if len(c.ProtocolVersion) > 0 {
ver, err := sarama.ParseKafkaVersion(c.ProtocolVersion)
if err != nil {
Expand Down

0 comments on commit bc6b060

Please sign in to comment.