feat: Introduce activationThreshold/minMetricValue for Kafka Scaler (#…
Jorge Turrado Ferrero authored Aug 4, 2022
1 parent 9b8fe76 commit 4a4ae5f
Showing 4 changed files with 528 additions and 436 deletions.
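The commit adds an optional activationLagThreshold setting to the Kafka scaler's trigger metadata: total consumer lag must exceed this value before the scaler reports itself active, i.e. before KEDA scales the workload up from zero. A minimal sketch of trigger metadata using the new field — broker, group, and topic values are illustrative, not from the commit:

```go
package main

import "fmt"

func main() {
	// Hypothetical ScaledObject trigger metadata, in the map form the scaler receives.
	triggerMetadata := map[string]string{
		"bootstrapServers":       "kafka:9092",
		"consumerGroup":          "my-group",
		"topic":                  "my-topic",
		"lagThreshold":           "10", // existing option: target lag per replica
		"activationLagThreshold": "5",  // new option: stay inactive until total lag exceeds 5
	}
	fmt.Println(triggerMetadata)
}
```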
82 changes: 43 additions & 39 deletions pkg/scalers/kafka_scaler.go
@@ -26,13 +26,14 @@ type kafkaScaler struct {
 }
 
 type kafkaMetadata struct {
-	bootstrapServers   []string
-	group              string
-	topic              string
-	lagThreshold       int64
-	offsetResetPolicy  offsetResetPolicy
-	allowIdleConsumers bool
-	version            sarama.KafkaVersion
+	bootstrapServers       []string
+	group                  string
+	topic                  string
+	lagThreshold           int64
+	activationLagThreshold int64
+	offsetResetPolicy      offsetResetPolicy
+	allowIdleConsumers     bool
+	version                sarama.KafkaVersion
 
 	// If an invalid offset is found, whether to scale to 1 (false - the default) so consumption can
 	// occur or scale to 0 (true). See discussion in https://github.com/kedacore/keda/issues/2612
@@ -71,11 +72,13 @@ const (
 )
 
 const (
-	lagThresholdMetricName   = "lagThreshold"
-	kafkaMetricType          = "External"
-	defaultKafkaLagThreshold = 10
-	defaultOffsetResetPolicy = latest
-	invalidOffset            = -1
+	lagThresholdMetricName             = "lagThreshold"
+	activationLagThresholdMetricName   = "activationLagThreshold"
+	kafkaMetricType                    = "External"
+	defaultKafkaLagThreshold           = 10
+	defaultKafkaActivationLagThreshold = 0
+	defaultOffsetResetPolicy           = latest
+	invalidOffset                      = -1
 )
 
 // NewKafkaScaler creates a new kafkaScaler
@@ -212,6 +215,19 @@ func parseKafkaMetadata(config *ScalerConfig, logger logr.Logger) (kafkaMetadata
 		meta.lagThreshold = t
 	}
 
+	meta.activationLagThreshold = defaultKafkaActivationLagThreshold
+
+	if val, ok := config.TriggerMetadata[activationLagThresholdMetricName]; ok {
+		t, err := strconv.ParseInt(val, 10, 64)
+		if err != nil {
+			return meta, fmt.Errorf("error parsing %q: %s", activationLagThresholdMetricName, err)
+		}
+		if t < 0 {
+			return meta, fmt.Errorf("%q must not be a negative number", activationLagThresholdMetricName)
+		}
+		meta.activationLagThreshold = t
+	}
+
 	if err := parseKafkaAuthParams(config, &meta); err != nil {
 		return meta, err
 	}
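The block above defaults the threshold to 0 and rejects negative values. A self-contained sketch of the same parsing rule, using a hypothetical helper name and a plain map in place of ScalerConfig:

```go
package main

import (
	"fmt"
	"strconv"
)

// parseActivationLagThreshold mirrors the validation added above (helper name assumed).
func parseActivationLagThreshold(triggerMetadata map[string]string) (int64, error) {
	threshold := int64(0) // defaultKafkaActivationLagThreshold
	val, ok := triggerMetadata["activationLagThreshold"]
	if !ok {
		return threshold, nil // key absent: keep the default
	}
	t, err := strconv.ParseInt(val, 10, 64)
	if err != nil {
		return 0, fmt.Errorf("error parsing activationLagThreshold: %s", err)
	}
	if t < 0 {
		return 0, fmt.Errorf("activationLagThreshold must not be a negative number")
	}
	return t, nil
}

func main() {
	fmt.Println(parseActivationLagThreshold(map[string]string{"activationLagThreshold": "5"}))  // 5 <nil>
	fmt.Println(parseActivationLagThreshold(map[string]string{"activationLagThreshold": "AA"})) // 0 parse error
}
```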
@@ -249,31 +265,12 @@ func parseKafkaMetadata(config *ScalerConfig, logger logr.Logger) (kafkaMetadata
 
 // IsActive determines if we need to scale from zero
 func (s *kafkaScaler) IsActive(ctx context.Context) (bool, error) {
-	topicPartitions, err := s.getTopicPartitions()
+	totalLag, err := s.getTotalLag()
 	if err != nil {
 		return false, err
 	}
 
-	consumerOffsets, producerOffsets, err := s.getConsumerAndProducerOffsets(topicPartitions)
-	if err != nil {
-		return false, err
-	}
-
-	for topic, partitionsOffsets := range producerOffsets {
-		for partitionID := range partitionsOffsets {
-			lag, err := s.getLagForPartition(topic, partitionID, consumerOffsets, producerOffsets)
-			if err != nil && lag == invalidOffset {
-				return true, nil
-			}
-			s.logger.V(1).Info(fmt.Sprintf("Group %s has a lag of %d for topic %s and partition %d\n", s.metadata.group, lag, topic, partitionID))
-
-			// Return as soon as a lag was detected for any partitionID
-			if lag > 0 {
-				return true, nil
-			}
-		}
-	}
-	return false, nil
+	return totalLag > s.metadata.activationLagThreshold, nil
 }
 
 func getKafkaClients(metadata kafkaMetadata) (sarama.Client, sarama.ClusterAdmin, error) {
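With the default threshold of 0, the new comparison reproduces the old behavior (any positive lag marks the scaler active); a larger value keeps the workload at zero replicas until total lag clears it. A tiny illustration with assumed values:

```go
package main

import "fmt"

func main() {
	// Assumed values, for illustration only.
	totalLag := int64(7)
	activationLagThreshold := int64(10)

	// The scaler is active only once total lag strictly exceeds the threshold.
	active := totalLag > activationLagThreshold
	fmt.Println(active) // false: 7 <= 10, so the deployment stays at zero replicas
}
```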
@@ -486,14 +483,24 @@ func (s *kafkaScaler) getConsumerAndProducerOffsets(topicPartitions map[string][
 
 // GetMetrics returns value for a supported metric and an error if there is a problem getting the metric
 func (s *kafkaScaler) GetMetrics(ctx context.Context, metricName string, metricSelector labels.Selector) ([]external_metrics.ExternalMetricValue, error) {
-	topicPartitions, err := s.getTopicPartitions()
+	totalLag, err := s.getTotalLag()
 	if err != nil {
 		return []external_metrics.ExternalMetricValue{}, err
 	}
+	metric := GenerateMetricInMili(metricName, float64(totalLag))
+
+	return append([]external_metrics.ExternalMetricValue{}, metric), nil
+}
+
+func (s *kafkaScaler) getTotalLag() (int64, error) {
+	topicPartitions, err := s.getTopicPartitions()
+	if err != nil {
+		return 0, err
+	}
 
 	consumerOffsets, producerOffsets, err := s.getConsumerAndProducerOffsets(topicPartitions)
 	if err != nil {
-		return []external_metrics.ExternalMetricValue{}, err
+		return 0, err
 	}
 
 	totalLag := int64(0)
@@ -514,10 +521,7 @@ func (s *kafkaScaler) GetMetrics(ctx context.Context, metricName string, metricS
 			totalLag = totalTopicPartitions * s.metadata.lagThreshold
 		}
 	}
-
-	metric := GenerateMetricInMili(metricName, float64(totalLag))
-
-	return append([]external_metrics.ExternalMetricValue{}, metric), nil
+	return totalLag, nil
 }
 
 type brokerOffsetResult struct {
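After this refactor, IsActive and GetMetrics share a single lag computation instead of each duplicating the offset-gathering loop. A condensed sketch of the resulting shape, with simplified types standing in for the real scaler and its Kafka clients:

```go
package main

import "fmt"

// scaler is a simplified stand-in: one lag source feeds both code paths.
type scaler struct {
	activationLagThreshold int64
	lag                    int64 // stands in for the per-partition offset math
}

func (s *scaler) getTotalLag() (int64, error) { return s.lag, nil }

// IsActive answers the scale-from-zero question using the shared helper.
func (s *scaler) IsActive() (bool, error) {
	totalLag, err := s.getTotalLag()
	if err != nil {
		return false, err
	}
	return totalLag > s.activationLagThreshold, nil
}

// GetMetricValue reports the same total lag as the external metric.
func (s *scaler) GetMetricValue() (int64, error) {
	return s.getTotalLag()
}

func main() {
	s := &scaler{activationLagThreshold: 5, lag: 3}
	active, _ := s.IsActive()
	value, _ := s.GetMetricValue()
	fmt.Println(active, value) // false 3: the metric is reported even while inactive
}
```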
6 changes: 4 additions & 2 deletions pkg/scalers/kafka_scaler_test.go
@@ -62,6 +62,8 @@ var parseKafkaMetadataTestDataset = []parseKafkaMetadataTestData{
{map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topic", "lagThreshold": "-1"}, true, 1, []string{"foobar:9092"}, "my-group", "my-topic", offsetResetPolicy("latest"), false},
// failure, lagThreshold is 0
{map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topic", "lagThreshold": "0"}, true, 1, []string{"foobar:9092"}, "my-group", "my-topic", offsetResetPolicy("latest"), false},
// failure, activationLagThreshold is not int
{map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topic", "lagThreshold": "10", "activationLagThreshold": "AA"}, true, 1, []string{"foobar:9092"}, "my-group", "my-topic", offsetResetPolicy("latest"), false},
// success
{map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topic"}, false, 1, []string{"foobar:9092"}, "my-group", "my-topic", offsetResetPolicy("latest"), false},
// success, more brokers
Expand Down Expand Up @@ -126,8 +128,8 @@ var parseKafkaAuthParamsTestDataset = []parseKafkaAuthParamsTestData{
 }
 
 var kafkaMetricIdentifiers = []kafkaMetricIdentifier{
-	{&parseKafkaMetadataTestDataset[6], 0, "s0-kafka-my-topic"},
-	{&parseKafkaMetadataTestDataset[6], 1, "s1-kafka-my-topic"},
+	{&parseKafkaMetadataTestDataset[7], 0, "s0-kafka-my-topic"},
+	{&parseKafkaMetadataTestDataset[7], 1, "s1-kafka-my-topic"},
 	{&parseKafkaMetadataTestDataset[2], 1, "s1-kafka-my-group-topics"},
 }
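The identifier entries move from index 6 to 7 because the new failure case is inserted ahead of the first "success" entry in parseKafkaMetadataTestDataset, shifting everything after it by one. A self-contained sketch of an equivalent check for the new case (hypothetical test, would live in a _test.go file):

```go
package scalers_test

import (
	"strconv"
	"testing"
)

// TestActivationLagThresholdNotInt mirrors the new dataset entry:
// a non-integer activationLagThreshold must fail to parse.
func TestActivationLagThresholdNotInt(t *testing.T) {
	metadata := map[string]string{"activationLagThreshold": "AA"}
	if _, err := strconv.ParseInt(metadata["activationLagThreshold"], 10, 64); err == nil {
		t.Errorf("expected a parse error for activationLagThreshold=%q", metadata["activationLagThreshold"])
	}
}
```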
