Kafka: provide new option to enable scaling past partition count (#1684)
* provide new option to enable scaling past partition count

Signed-off-by: Lionel Villard <[email protected]>

* add additional test

Signed-off-by: Lionel Villard <[email protected]>
lionelvillard authored Mar 26, 2021
1 parent dfdcc5e commit 6833a81
Showing 3 changed files with 48 additions and 26 deletions.
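By default the Kafka scaler never reports enough lag for the HPA to request more consumers than the topic has partitions; the new optional `allowIdleConsumers` trigger parameter lifts that cap. For illustration only (the keys mirror the metadata parsed in pkg/scalers/kafka_scaler.go below, and the surrounding program is just scaffolding), the option is supplied as a string value in the trigger metadata:

```go
package main

import "fmt"

func main() {
    // Illustrative trigger metadata as the Kafka scaler receives it.
    // "allowIdleConsumers" is the new optional key; when omitted it
    // defaults to false and the existing partition-count cap applies.
    triggerMetadata := map[string]string{
        "bootstrapServers":   "broker1:9092,broker2:9092",
        "consumerGroup":      "my-group",
        "topic":              "my-topic",
        "allowIdleConsumers": "true",
    }
    fmt.Println(triggerMetadata)
}
```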
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -59,6 +59,7 @@
- Fix memory leak of `keda-metrics-apiserver` by setting a controller-runtime logger properly ([#1654](https://github.com/kedacore/keda/pull/1654))
- AWS SQS Scaler: Add Visible + NotVisible messages for scaling considerations ([#1664](https://github.com/kedacore/keda/pull/1664))
- Fixing behavior on ScaledJob with incorrect External Scaler ([#1672](https://github.com/kedacore/keda/pull/1672))
- Apache Kafka Scaler: Add `allowIdleConsumers` to the list of trigger parameters ([#1684](https://github.com/kedacore/keda/pull/1684))

### Breaking Changes

28 changes: 20 additions & 8 deletions pkg/scalers/kafka_scaler.go
@@ -25,11 +25,12 @@ type kafkaScaler struct {
}

type kafkaMetadata struct {
bootstrapServers []string
group string
topic string
lagThreshold int64
offsetResetPolicy offsetResetPolicy
bootstrapServers []string
group string
topic string
lagThreshold int64
offsetResetPolicy offsetResetPolicy
allowIdleConsumers bool

// SASL
saslType kafkaSaslType
@@ -181,6 +182,15 @@ func parseKafkaMetadata(config *ScalerConfig) (kafkaMetadata, error) {
}
}

meta.allowIdleConsumers = false
if val, ok := config.TriggerMetadata["allowIdleConsumers"]; ok {
t, err := strconv.ParseBool(val)
if err != nil {
return meta, fmt.Errorf("error parsing allowIdleConsumers: %s", err)
}
meta.allowIdleConsumers = t
}

return meta, nil
}
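The flag is parsed with `strconv.ParseBool`, which accepts only 1, t, T, TRUE, true, True, 0, f, F, FALSE, false and False; any other value (such as "notvalid" in the test data below) is rejected and surfaces as the "error parsing allowIdleConsumers" error. A minimal standalone sketch of that parsing step, with illustrative names rather than the scaler's actual API:

```go
package main

import (
    "fmt"
    "strconv"
)

// parseAllowIdleConsumers mirrors the logic added to parseKafkaMetadata:
// a missing key yields false, a malformed value yields an error, and
// otherwise the parsed boolean is returned.
func parseAllowIdleConsumers(triggerMetadata map[string]string) (bool, error) {
    val, ok := triggerMetadata["allowIdleConsumers"]
    if !ok {
        return false, nil
    }
    b, err := strconv.ParseBool(val)
    if err != nil {
        return false, fmt.Errorf("error parsing allowIdleConsumers: %s", err)
    }
    return b, nil
}

func main() {
    for _, v := range []string{"true", "TRUE", "0", "notvalid"} {
        b, err := parseAllowIdleConsumers(map[string]string{"allowIdleConsumers": v})
        fmt.Printf("%-8s -> %v, err=%v\n", v, b, err)
    }
}
```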

@@ -360,9 +370,11 @@ func (s *kafkaScaler) GetMetrics(ctx context.Context, metricName string, metricS

kafkaLog.V(1).Info(fmt.Sprintf("Kafka scaler: Providing metrics based on totalLag %v, partitions %v, threshold %v", totalLag, len(partitions), s.metadata.lagThreshold))

// don't scale out beyond the number of partitions
if (totalLag / s.metadata.lagThreshold) > int64(len(partitions)) {
totalLag = int64(len(partitions)) * s.metadata.lagThreshold
if !s.metadata.allowIdleConsumers {
// don't scale out beyond the number of partitions
if (totalLag / s.metadata.lagThreshold) > int64(len(partitions)) {
totalLag = int64(len(partitions)) * s.metadata.lagThreshold
}
}

metric := external_metrics.ExternalMetricValue{
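The behavioral change is confined to the capping step above: unless `allowIdleConsumers` is set, the reported lag is clamped to partitions × lagThreshold, so the HPA never targets more replicas than there are partitions. A small self-contained sketch of that arithmetic (illustrative names, not the scaler's API):

```go
package main

import "fmt"

// cappedLag reproduces the capping rule from GetMetrics: unless idle
// consumers are allowed, total lag is limited to partitions * lagThreshold.
func cappedLag(totalLag, lagThreshold int64, partitions int, allowIdleConsumers bool) int64 {
    if !allowIdleConsumers && (totalLag/lagThreshold) > int64(partitions) {
        return int64(partitions) * lagThreshold
    }
    return totalLag
}

func main() {
    var (
        totalLag     int64 = 500
        lagThreshold int64 = 10
        partitions         = 20
    )
    // Default: 500 is clamped to 20*10 = 200, i.e. at most 20 consumers
    // (one per partition), matching the old "don't scale out beyond the
    // number of partitions" behavior.
    fmt.Println(cappedLag(totalLag, lagThreshold, partitions, false)) // 200
    // With allowIdleConsumers=true the full lag is reported, so the HPA
    // may target 500/10 = 50 consumers, more than the 20 partitions.
    fmt.Println(cappedLag(totalLag, lagThreshold, partitions, true)) // 500
}
```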
45 changes: 27 additions & 18 deletions pkg/scalers/kafka_scaler_test.go
@@ -6,13 +6,14 @@ import (
)

type parseKafkaMetadataTestData struct {
metadata map[string]string
isError bool
numBrokers int
brokers []string
group string
topic string
offsetResetPolicy offsetResetPolicy
metadata map[string]string
isError bool
numBrokers int
brokers []string
group string
topic string
offsetResetPolicy offsetResetPolicy
allowIdleConsumers bool
}

type parseKafkaAuthParamsTestData struct {
@@ -28,9 +29,10 @@ type kafkaMetricIdentifier struct {

// A complete valid metadata example for reference
var validKafkaMetadata = map[string]string{
"bootstrapServers": "broker1:9092,broker2:9092",
"consumerGroup": "my-group",
"topic": "my-topic",
"bootstrapServers": "broker1:9092,broker2:9092",
"consumerGroup": "my-group",
"topic": "my-topic",
"allowIdleConsumers": "false",
}

// A complete valid authParams example for sasl, with username and passwd
@@ -45,21 +47,25 @@ var validWithoutAuthParams = map[string]string{}

var parseKafkaMetadataTestDataset = []parseKafkaMetadataTestData{
// failure, no bootstrapServers
{map[string]string{}, true, 0, nil, "", "", ""},
{map[string]string{}, true, 0, nil, "", "", "", false},
// failure, no consumer group
{map[string]string{"bootstrapServers": "foobar:9092"}, true, 1, []string{"foobar:9092"}, "", "", "latest"},
{map[string]string{"bootstrapServers": "foobar:9092"}, true, 1, []string{"foobar:9092"}, "", "", "latest", false},
// failure, no topic
{map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group"}, true, 1, []string{"foobar:9092"}, "my-group", "", offsetResetPolicy("latest")},
{map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group"}, true, 1, []string{"foobar:9092"}, "my-group", "", offsetResetPolicy("latest"), false},
// success
{map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topic"}, false, 1, []string{"foobar:9092"}, "my-group", "my-topic", offsetResetPolicy("latest")},
{map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topic"}, false, 1, []string{"foobar:9092"}, "my-group", "my-topic", offsetResetPolicy("latest"), false},
// success, more brokers
{map[string]string{"bootstrapServers": "foo:9092,bar:9092", "consumerGroup": "my-group", "topic": "my-topic"}, false, 2, []string{"foo:9092", "bar:9092"}, "my-group", "my-topic", offsetResetPolicy("latest")},
{map[string]string{"bootstrapServers": "foo:9092,bar:9092", "consumerGroup": "my-group", "topic": "my-topic"}, false, 2, []string{"foo:9092", "bar:9092"}, "my-group", "my-topic", offsetResetPolicy("latest"), false},
// success, offsetResetPolicy policy latest
{map[string]string{"bootstrapServers": "foo:9092,bar:9092", "consumerGroup": "my-group", "topic": "my-topic", "offsetResetPolicy": "latest"}, false, 2, []string{"foo:9092", "bar:9092"}, "my-group", "my-topic", offsetResetPolicy("latest")},
{map[string]string{"bootstrapServers": "foo:9092,bar:9092", "consumerGroup": "my-group", "topic": "my-topic", "offsetResetPolicy": "latest"}, false, 2, []string{"foo:9092", "bar:9092"}, "my-group", "my-topic", offsetResetPolicy("latest"), false},
// failure, offsetResetPolicy policy wrong
{map[string]string{"bootstrapServers": "foo:9092,bar:9092", "consumerGroup": "my-group", "topic": "my-topic", "offsetResetPolicy": "foo"}, true, 2, []string{"foo:9092", "bar:9092"}, "my-group", "my-topic", ""},
{map[string]string{"bootstrapServers": "foo:9092,bar:9092", "consumerGroup": "my-group", "topic": "my-topic", "offsetResetPolicy": "foo"}, true, 2, []string{"foo:9092", "bar:9092"}, "my-group", "my-topic", "", false},
// success, offsetResetPolicy policy earliest
{map[string]string{"bootstrapServers": "foo:9092,bar:9092", "consumerGroup": "my-group", "topic": "my-topic", "offsetResetPolicy": "earliest"}, false, 2, []string{"foo:9092", "bar:9092"}, "my-group", "my-topic", offsetResetPolicy("earliest")},
{map[string]string{"bootstrapServers": "foo:9092,bar:9092", "consumerGroup": "my-group", "topic": "my-topic", "offsetResetPolicy": "earliest"}, false, 2, []string{"foo:9092", "bar:9092"}, "my-group", "my-topic", offsetResetPolicy("earliest"), false},
// failure, allowIdleConsumers malformed
{map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topic", "allowIdleConsumers": "notvalid"}, true, 1, []string{"foobar:9092"}, "my-group", "my-topic", offsetResetPolicy("latest"), false},
// success, allowIdleConsumers is true
{map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topic", "allowIdleConsumers": "true"}, false, 1, []string{"foobar:9092"}, "my-group", "my-topic", offsetResetPolicy("latest"), true},
}

var parseKafkaAuthParamsTestDataset = []parseKafkaAuthParamsTestData{
@@ -156,6 +162,9 @@ func TestGetBrokers(t *testing.T) {
if err == nil && meta.offsetResetPolicy != testData.offsetResetPolicy {
t.Errorf("Expected offsetResetPolicy %s but got %s\n", testData.offsetResetPolicy, meta.offsetResetPolicy)
}
if err == nil && meta.allowIdleConsumers != testData.allowIdleConsumers {
t.Errorf("Expected allowIdleConsumers %t but got %t\n", testData.allowIdleConsumers, meta.allowIdleConsumers)
}
}
}
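Since the dataset rows above are positional struct literals, the new final success case is easier to read with its fields named. The sketch below copies the test struct locally (plus a stand-in for the scaler's offsetResetPolicy string type) only so it compiles on its own; in the repository these types live under pkg/scalers:

```go
package main

import "fmt"

// Local stand-ins, for illustration only; the real definitions are in
// pkg/scalers/kafka_scaler.go and kafka_scaler_test.go.
type offsetResetPolicy string

type parseKafkaMetadataTestData struct {
    metadata           map[string]string
    isError            bool
    numBrokers         int
    brokers            []string
    group              string
    topic              string
    offsetResetPolicy  offsetResetPolicy
    allowIdleConsumers bool
}

func main() {
    // The last success case of parseKafkaMetadataTestDataset, written with
    // field names instead of positional values to show where the new
    // allowIdleConsumers expectation sits.
    tc := parseKafkaMetadataTestData{
        metadata: map[string]string{
            "bootstrapServers":   "foobar:9092",
            "consumerGroup":      "my-group",
            "topic":              "my-topic",
            "allowIdleConsumers": "true",
        },
        isError:            false,
        numBrokers:         1,
        brokers:            []string{"foobar:9092"},
        group:              "my-group",
        topic:              "my-topic",
        offsetResetPolicy:  offsetResetPolicy("latest"),
        allowIdleConsumers: true,
    }
    fmt.Printf("%+v\n", tc)
}
```

Running `go test ./pkg/scalers/...` from the repository root should exercise both new cases through TestGetBrokers.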

