From 31c38fa62a3a96320310b788029e63649ae00f9d Mon Sep 17 00:00:00 2001 From: Dorn- Date: Mon, 14 Jun 2021 10:40:00 +0200 Subject: [PATCH] feat: adding version metadata to be able to specify kafka broker version (#1866) Signed-off-by: Flavien Chantelot Signed-off-by: nilayasiktoprak --- pkg/scalers/kafka_scaler.go | 13 ++++++++++++- pkg/scalers/kafka_scaler_test.go | 4 ++++ 2 files changed, 16 insertions(+), 1 deletion(-) diff --git a/pkg/scalers/kafka_scaler.go b/pkg/scalers/kafka_scaler.go index 5772341bb2f..14ea20f6dfd 100644 --- a/pkg/scalers/kafka_scaler.go +++ b/pkg/scalers/kafka_scaler.go @@ -31,6 +31,7 @@ type kafkaMetadata struct { lagThreshold int64 offsetResetPolicy offsetResetPolicy allowIdleConsumers bool + version sarama.KafkaVersion // SASL saslType kafkaSaslType @@ -191,6 +192,16 @@ func parseKafkaMetadata(config *ScalerConfig) (kafkaMetadata, error) { meta.allowIdleConsumers = t } + meta.version = sarama.V1_0_0_0 + if val, ok := config.TriggerMetadata["version"]; ok { + val = strings.TrimSpace(val) + version, err := sarama.ParseKafkaVersion(val) + if err != nil { + return meta, fmt.Errorf("error parsing kafka version: %s", err) + } + meta.version = version + } + return meta, nil } @@ -224,7 +235,7 @@ func (s *kafkaScaler) IsActive(ctx context.Context) (bool, error) { func getKafkaClients(metadata kafkaMetadata) (sarama.Client, sarama.ClusterAdmin, error) { config := sarama.NewConfig() - config.Version = sarama.V1_0_0_0 + config.Version = metadata.version if metadata.saslType != KafkaSASLTypeNone { config.Net.SASL.Enable = true diff --git a/pkg/scalers/kafka_scaler_test.go b/pkg/scalers/kafka_scaler_test.go index 71eeb0047cd..54cb7dc167a 100644 --- a/pkg/scalers/kafka_scaler_test.go +++ b/pkg/scalers/kafka_scaler_test.go @@ -52,6 +52,8 @@ var parseKafkaMetadataTestDataset = []parseKafkaMetadataTestData{ {map[string]string{"bootstrapServers": "foobar:9092"}, true, 1, []string{"foobar:9092"}, "", "", "latest", false}, // failure, no topic {map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group"}, true, 1, []string{"foobar:9092"}, "my-group", "", offsetResetPolicy("latest"), false}, + // failure, version not supported + {map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topic", "version": "1.2.3.4"}, true, 1, []string{"foobar:9092"}, "my-group", "my-topic", offsetResetPolicy("latest"), false}, // success {map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topic"}, false, 1, []string{"foobar:9092"}, "my-group", "my-topic", offsetResetPolicy("latest"), false}, // success, more brokers @@ -66,6 +68,8 @@ var parseKafkaMetadataTestDataset = []parseKafkaMetadataTestData{ {map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topic", "allowIdleConsumers": "notvalid"}, true, 1, []string{"foobar:9092"}, "my-group", "my-topic", offsetResetPolicy("latest"), false}, // success, allowIdleConsumers is true {map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topic", "allowIdleConsumers": "true"}, false, 1, []string{"foobar:9092"}, "my-group", "my-topic", offsetResetPolicy("latest"), true}, + // success, version supported + {map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topic", "allowIdleConsumers": "true", "version": "1.0.0"}, false, 1, []string{"foobar:9092"}, "my-group", "my-topic", offsetResetPolicy("latest"), true}, } var parseKafkaAuthParamsTestDataset = []parseKafkaAuthParamsTestData{