Skip to content

Commit

Permalink
Apache Kafka Scaler: Implementation for Excluding Persistent Lag (#3965)
Browse files Browse the repository at this point in the history
Signed-off-by: JosephABC <[email protected]>
  • Loading branch information
josephangbc authored Dec 8, 2022
1 parent a47abc9 commit 98d40ab
Show file tree
Hide file tree
Showing 5 changed files with 227 additions and 68 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ To learn more about active deprecations, we recommend checking [GitHub Discussio
- **General:** Improve the function used to normalize metric names ([#3789](https://github.com/kedacore/keda/issues/3789)
- **Apache Kafka Scaler:** SASL/OAuthbearer Implementation ([#3681](https://github.com/kedacore/keda/issues/3681))
- **Apache Kafka Scaler:** Limit Kafka Partitions KEDA operates on ([#3830](https://github.com/kedacore/keda/issues/3830))
- **Apache Kafka Scaler:** Implementation for Excluding Persistent Lag ([#3904](https://github.com/kedacore/keda/issues/3904))
- **Azure AD Pod Identity Authentication:** Improve error messages to emphasize problems around the integration with aad-pod-identity itself ([#3610](https://github.com/kedacore/keda/issues/3610))
- **Azure Event Hub Scaler:** Support Azure Active Direcotry Pod & Workload Identity for Storage Blobs ([#3569](https://github.com/kedacore/keda/issues/3569))
- **Azure Event Hub Scaler:** Support using connection strings for Event Hub namespace instead of the Event Hub itself. ([#3922](https://github.com/kedacore/keda/issues/3922))
Expand Down
94 changes: 70 additions & 24 deletions pkg/scalers/kafka_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,12 @@ import (
)

type kafkaScaler struct {
metricType v2.MetricTargetType
metadata kafkaMetadata
client sarama.Client
admin sarama.ClusterAdmin
logger logr.Logger
metricType v2.MetricTargetType
metadata kafkaMetadata
client sarama.Client
admin sarama.ClusterAdmin
logger logr.Logger
previousOffsets map[string]map[int32]int64
}

type kafkaMetadata struct {
Expand All @@ -33,6 +34,7 @@ type kafkaMetadata struct {
activationLagThreshold int64
offsetResetPolicy offsetResetPolicy
allowIdleConsumers bool
excludePersistentLag bool
version sarama.KafkaVersion

// If an invalid offset is found, whether to scale to 1 (false - the default) so consumption can
Expand Down Expand Up @@ -105,12 +107,15 @@ func NewKafkaScaler(config *ScalerConfig) (Scaler, error) {
return nil, err
}

previousOffsets := make(map[string]map[int32]int64)

return &kafkaScaler{
client: client,
admin: admin,
metricType: metricType,
metadata: kafkaMetadata,
logger: logger,
client: client,
admin: admin,
metricType: metricType,
metadata: kafkaMetadata,
logger: logger,
previousOffsets: previousOffsets,
}, nil
}

Expand Down Expand Up @@ -270,6 +275,15 @@ func parseKafkaMetadata(config *ScalerConfig, logger logr.Logger) (kafkaMetadata
meta.allowIdleConsumers = t
}

meta.excludePersistentLag = false
if val, ok := config.TriggerMetadata["excludePersistentLag"]; ok {
t, err := strconv.ParseBool(val)
if err != nil {
return meta, fmt.Errorf("error parsing excludePersistentLag: %s", err)
}
meta.excludePersistentLag = t
}

meta.scaleToZeroOnInvalidOffset = false
if val, ok := config.TriggerMetadata["scaleToZeroOnInvalidOffset"]; ok {
t, err := strconv.ParseBool(val)
Expand All @@ -293,13 +307,14 @@ func parseKafkaMetadata(config *ScalerConfig, logger logr.Logger) (kafkaMetadata
}

// IsActive determines if we need to scale from zero
// When replicas is zero, all lag will be deemed as persistent, hence use totalLagWithPersistent to determine scaling.
func (s *kafkaScaler) IsActive(ctx context.Context) (bool, error) {
totalLag, err := s.getTotalLag()
_, totalLagWithPersistent, err := s.getTotalLag()
if err != nil {
return false, err
}

return totalLag > s.metadata.activationLagThreshold, nil
return totalLagWithPersistent > s.metadata.activationLagThreshold, nil
}

func getKafkaClients(metadata kafkaMetadata) (sarama.Client, sarama.ClusterAdmin, error) {
Expand Down Expand Up @@ -433,12 +448,16 @@ func (s *kafkaScaler) getConsumerOffsets(topicPartitions map[string][]int32) (*s
return offsets, nil
}

func (s *kafkaScaler) getLagForPartition(topic string, partitionID int32, offsets *sarama.OffsetFetchResponse, topicPartitionOffsets map[string]map[int32]int64) (int64, error) {
// getLagForPartition returns (lag, lagWithPersistent, error)
// When excludePersistentLag is set to `false` (default), lag will always be equal to lagWithPersistent
// When excludePersistentLag is set to `true`, if partition is deemed to have persistent lag, lag will be set to 0 and lagWithPersistent will be latestOffset - consumerOffset
// These return values will allow proper scaling from 0 -> 1 replicas by the IsActive func.
func (s *kafkaScaler) getLagForPartition(topic string, partitionID int32, offsets *sarama.OffsetFetchResponse, topicPartitionOffsets map[string]map[int32]int64) (int64, int64, error) {
block := offsets.GetBlock(topic, partitionID)
if block == nil {
errMsg := fmt.Errorf("error finding offset block for topic %s and partition %d", topic, partitionID)
s.logger.Error(errMsg, "")
return 0, errMsg
return 0, 0, errMsg
}
if block.Err > 0 {
errMsg := fmt.Errorf("error finding offset block for topic %s and partition %d: %s", topic, partitionID, offsets.Err.Error())
Expand All @@ -455,17 +474,39 @@ func (s *kafkaScaler) getLagForPartition(topic string, partitionID int32, offset
"invalid offset found for topic %s in group %s and partition %d, probably no offset is committed yet. Returning with lag of %d",
topic, s.metadata.group, partitionID, retVal)
s.logger.V(1).Info(msg)
return retVal, nil
return retVal, retVal, nil
}

if _, found := topicPartitionOffsets[topic]; !found {
return 0, fmt.Errorf("error finding partition offset for topic %s", topic)
return 0, 0, fmt.Errorf("error finding partition offset for topic %s", topic)
}
latestOffset := topicPartitionOffsets[topic][partitionID]
if consumerOffset == invalidOffset && s.metadata.offsetResetPolicy == earliest {
return latestOffset, nil
return latestOffset, latestOffset, nil
}
return latestOffset - consumerOffset, nil

// This code block tries to prevent KEDA Kafka trigger from scaling the scale target based on erroneous events
if s.metadata.excludePersistentLag {
switch previousOffset, found := s.previousOffsets[topic][partitionID]; {
case !found:
// No record of previous offset, so store current consumer offset
// Allow this consumer lag to be considered in scaling
if _, topicFound := s.previousOffsets[topic]; !topicFound {
s.previousOffsets[topic] = map[int32]int64{partitionID: consumerOffset}
} else {
s.previousOffsets[topic][partitionID] = consumerOffset
}
case previousOffset == consumerOffset:
// Indicates consumer is still on the same offset as the previous polling cycle, there may be some issue with consuming this offset.
// return 0, so this consumer lag is not considered for scaling
return 0, latestOffset - consumerOffset, nil
default:
// Successfully Consumed some messages, proceed to change the previous offset
s.previousOffsets[topic][partitionID] = consumerOffset
}
}

return latestOffset - consumerOffset, latestOffset - consumerOffset, nil
}

// Close closes the kafka admin and client
Expand Down Expand Up @@ -535,7 +576,7 @@ func (s *kafkaScaler) getConsumerAndProducerOffsets(topicPartitions map[string][

// GetMetrics returns value for a supported metric and an error if there is a problem getting the metric
func (s *kafkaScaler) GetMetrics(ctx context.Context, metricName string) ([]external_metrics.ExternalMetricValue, error) {
totalLag, err := s.getTotalLag()
totalLag, _, err := s.getTotalLag()
if err != nil {
return []external_metrics.ExternalMetricValue{}, err
}
Expand All @@ -544,24 +585,29 @@ func (s *kafkaScaler) GetMetrics(ctx context.Context, metricName string) ([]exte
return append([]external_metrics.ExternalMetricValue{}, metric), nil
}

func (s *kafkaScaler) getTotalLag() (int64, error) {
// getTotalLag returns totalLag, totalLagWithPersistent, error
// totalLag and totalLagWithPersistent are the summations of lag and lagWithPersistent returned by getLagForPartition function respectively.
// totalLag maybe less than totalLagWithPersistent when excludePersistentLag is set to `true` due to some partitions deemed as having persistent lag
func (s *kafkaScaler) getTotalLag() (int64, int64, error) {
topicPartitions, err := s.getTopicPartitions()
if err != nil {
return 0, err
return 0, 0, err
}

consumerOffsets, producerOffsets, err := s.getConsumerAndProducerOffsets(topicPartitions)
if err != nil {
return 0, err
return 0, 0, err
}

totalLag := int64(0)
totalLagWithPersistent := int64(0)
totalTopicPartitions := int64(0)

for topic, partitionsOffsets := range producerOffsets {
for partition := range partitionsOffsets {
lag, _ := s.getLagForPartition(topic, partition, consumerOffsets, producerOffsets)
lag, lagWithPersistent, _ := s.getLagForPartition(topic, partition, consumerOffsets, producerOffsets)
totalLag += lag
totalLagWithPersistent += lagWithPersistent
}
totalTopicPartitions += (int64)(len(partitionsOffsets))
}
Expand All @@ -573,7 +619,7 @@ func (s *kafkaScaler) getTotalLag() (int64, error) {
totalLag = totalTopicPartitions * s.metadata.lagThreshold
}
}
return totalLag, nil
return totalLag, totalLagWithPersistent, nil
}

type brokerOffsetResult struct {
Expand Down
Loading

0 comments on commit 98d40ab

Please sign in to comment.