From 3bc0a3e2db029d7a7c96c1353f62a7d2958e4a74 Mon Sep 17 00:00:00 2001 From: shaimoria Date: Tue, 16 Nov 2021 09:45:47 +0200 Subject: [PATCH 1/2] Expose rackID option in samara client Support for KIP-392 in Sarama consumer client This will allow ingester consumers to fetch from the closest replica. On branch add-kafka-consumer-client-rack Changes to be committed: modified: cmd/ingester/app/flags.go modified: cmd/ingester/app/flags_test.go modified: pkg/kafka/consumer/config.go Signed-off-by: Shai Moria Signed-off-by: shaimoria --- cmd/ingester/app/flags.go | 7 +++++++ cmd/ingester/app/flags_test.go | 2 ++ pkg/kafka/consumer/config.go | 2 ++ 3 files changed, 11 insertions(+) diff --git a/cmd/ingester/app/flags.go b/cmd/ingester/app/flags.go index 55127374239..6775dd56439 100644 --- a/cmd/ingester/app/flags.go +++ b/cmd/ingester/app/flags.go @@ -37,6 +37,8 @@ const ( SuffixBrokers = ".brokers" // SuffixTopic is a suffix for the topic flag SuffixTopic = ".topic" + // SuffixRackID is a suffix for the consumer rack-id flag + SuffixRackID = ".rack-id" // SuffixGroupID is a suffix for the group-id flag SuffixGroupID = ".group-id" // SuffixClientID is a suffix for the client-id flag @@ -111,6 +113,10 @@ func AddFlags(flagSet *flag.FlagSet) { KafkaConsumerConfigPrefix+SuffixEncoding, DefaultEncoding, fmt.Sprintf(`The encoding of spans ("%s") consumed from kafka`, strings.Join(kafka.AllEncodings, "\", \""))) + flagSet.String( + KafkaConsumerConfigPrefix+SuffixRackID, + "", + "Configure the 'rack' in which the consumer resides to enable - must be supported by kafka server") auth.AddFlags(KafkaConsumerConfigPrefix, flagSet) } @@ -123,6 +129,7 @@ func (o *Options) InitFromViper(v *viper.Viper) { o.ClientID = v.GetString(KafkaConsumerConfigPrefix + SuffixClientID) o.ProtocolVersion = v.GetString(KafkaConsumerConfigPrefix + SuffixProtocolVersion) o.Encoding = v.GetString(KafkaConsumerConfigPrefix + SuffixEncoding) + o.RackID = v.GetString(KafkaConsumerConfigPrefix + SuffixRackID) o.Parallelism = v.GetInt(ConfigPrefix + SuffixParallelism) o.DeadlockInterval = v.GetDuration(ConfigPrefix + SuffixDeadlockInterval) diff --git a/cmd/ingester/app/flags_test.go b/cmd/ingester/app/flags_test.go index 5659dbc4bc4..ee1693944c1 100644 --- a/cmd/ingester/app/flags_test.go +++ b/cmd/ingester/app/flags_test.go @@ -36,6 +36,7 @@ func TestOptionsWithFlags(t *testing.T) { "--kafka.consumer.brokers=127.0.0.1:9092, 0.0.0:1234", "--kafka.consumer.group-id=group1", "--kafka.consumer.client-id=client-id1", + "--kafka.consumer.rack-id=rack1", "--kafka.consumer.encoding=json", "--kafka.consumer.protocol-version=1.0.0", "--ingester.parallelism=5", @@ -46,6 +47,7 @@ func TestOptionsWithFlags(t *testing.T) { assert.Equal(t, "topic1", o.Topic) assert.Equal(t, []string{"127.0.0.1:9092", "0.0.0:1234"}, o.Brokers) assert.Equal(t, "group1", o.GroupID) + assert.Equal(t, "rack1", o.RackID) assert.Equal(t, "client-id1", o.ClientID) assert.Equal(t, "1.0.0", o.ProtocolVersion) assert.Equal(t, 5, o.Parallelism) diff --git a/pkg/kafka/consumer/config.go b/pkg/kafka/consumer/config.go index 129f10842ab..243c120faeb 100644 --- a/pkg/kafka/consumer/config.go +++ b/pkg/kafka/consumer/config.go @@ -47,6 +47,7 @@ type Configuration struct { GroupID string `mapstructure:"group_id"` ClientID string `mapstructure:"client_id"` ProtocolVersion string `mapstructure:"protocol_version"` + RackID string `mapstructure:"rack_id"` } // NewConsumer creates a new kafka consumer @@ -54,6 +55,7 @@ func (c *Configuration) NewConsumer(logger *zap.Logger) (Consumer, error) { saramaConfig := cluster.NewConfig() saramaConfig.Group.Mode = cluster.ConsumerModePartitions saramaConfig.ClientID = c.ClientID + saramaConfig.RackID = c.RackID if len(c.ProtocolVersion) > 0 { ver, err := sarama.ParseKafkaVersion(c.ProtocolVersion) if err != nil { From 824ca28f130773d4871cf0247ea1f9be90cd4d28 Mon Sep 17 00:00:00 2001 From: shaimoria Date: Thu, 18 Nov 2021 14:12:34 +0200 Subject: [PATCH 2/2] Signed-off-by: shaimoria shai.moria@zooz.com set better description for 'rack-id' cli flag Changes to be committed: modified: cmd/ingester/app/flags.go Signed-off-by: shaimoria --- cmd/ingester/app/flags.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cmd/ingester/app/flags.go b/cmd/ingester/app/flags.go index 6775dd56439..97d99be8eec 100644 --- a/cmd/ingester/app/flags.go +++ b/cmd/ingester/app/flags.go @@ -116,7 +116,7 @@ func AddFlags(flagSet *flag.FlagSet) { flagSet.String( KafkaConsumerConfigPrefix+SuffixRackID, "", - "Configure the 'rack' in which the consumer resides to enable - must be supported by kafka server") + "Rack identifier for this client. This can be any string value which indicates where this client is located. It corresponds with the broker config `broker.rack`") auth.AddFlags(KafkaConsumerConfigPrefix, flagSet) }