Skip to content

Commit

Permalink
Fix fallback logic in formula-based evaluation (#5684)
Browse files Browse the repository at this point in the history
* Fix fallback logic

Signed-off-by: SpiritZhou <[email protected]>

* Update

Signed-off-by: SpiritZhou <[email protected]>

* Fix changelog

Signed-off-by: SpiritZhou <[email protected]>

* Update

Signed-off-by: SpiritZhou <[email protected]>

* Update

Signed-off-by: SpiritZhou <[email protected]>

* Update

Signed-off-by: SpiritZhou <[email protected]>

* Update

Signed-off-by: SpiritZhou <[email protected]>

---------

Signed-off-by: SpiritZhou <[email protected]>
  • Loading branch information
SpiritZhou authored Apr 24, 2024
1 parent f85c510 commit e41294d
Show file tree
Hide file tree
Showing 5 changed files with 44 additions and 7 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ Here is an overview of all new **experimental** features:
### Fixes

- **General**: Fix CVE-2024-28180 in github.com/go-jose/go-jose/v3 ([#5617](https://github.com/kedacore/keda/pull/5617))
- **General**: Fix fallback logic in formula-based evaluation ([#5666](https://github.com/kedacore/keda/issues/5666))
- **General**: Fix wrong scaler active value and paused value that are pushed to OpenTelemetry ([#5705](https://github.com/kedacore/keda/issues/5705))
- **General**: Log field `ScaledJob` no longer have conflicting types ([#5592](https://github.com/kedacore/keda/pull/5592))
- **General**: Prometheus metrics shows errors correctly ([#5597](https://github.com/kedacore/keda/issues/5597)|[#5663](https://github.com/kedacore/keda/issues/5663))
Expand Down
22 changes: 19 additions & 3 deletions pkg/fallback/fallback.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package fallback

import (
"context"
"strconv"

v2 "k8s.io/api/autoscaling/v2"
"k8s.io/apimachinery/pkg/api/resource"
Expand Down Expand Up @@ -57,6 +58,7 @@ func GetMetricsWithFallback(ctx context.Context, client runtimeclient.Client, me
status.Health[metricName] = *healthStatus

updateStatus(ctx, client, scaledObject, status, metricSpec)

return metrics, false, nil
}

Expand Down Expand Up @@ -94,16 +96,30 @@ func fallbackExistsInScaledObject(scaledObject *kedav1alpha1.ScaledObject, metri
}

func validateFallback(scaledObject *kedav1alpha1.ScaledObject) bool {
modifierChecking := true
if scaledObject.IsUsingModifiers() {
value, err := strconv.ParseInt(scaledObject.Spec.Advanced.ScalingModifiers.Target, 10, 64)
modifierChecking = err == nil && value > 0
}
return scaledObject.Spec.Fallback.FailureThreshold >= 0 &&
scaledObject.Spec.Fallback.Replicas >= 0
scaledObject.Spec.Fallback.Replicas >= 0 &&
modifierChecking
}

func doFallback(scaledObject *kedav1alpha1.ScaledObject, metricSpec v2.MetricSpec, metricName string, suppressedError error) []external_metrics.ExternalMetricValue {
replicas := int64(scaledObject.Spec.Fallback.Replicas)
normalisationValue := metricSpec.External.Target.AverageValue.AsApproximateFloat64()
var normalisationValue int64
if !scaledObject.IsUsingModifiers() {
normalisationValue = int64(metricSpec.External.Target.AverageValue.AsApproximateFloat64())
} else {
value, _ := strconv.ParseInt(scaledObject.Spec.Advanced.ScalingModifiers.Target, 10, 64)
normalisationValue = value
metricName = kedav1alpha1.CompositeMetricName
}

metric := external_metrics.ExternalMetricValue{
MetricName: metricName,
Value: *resource.NewMilliQuantity(int64(normalisationValue*1000)*replicas, resource.DecimalSI),
Value: *resource.NewMilliQuantity(normalisationValue*1000*replicas, resource.DecimalSI),
Timestamp: metav1.Now(),
}
fallbackMetrics := []external_metrics.ExternalMetricValue{metric}
Expand Down
14 changes: 12 additions & 2 deletions pkg/scaling/modifiers/formula.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,13 @@ import (
// HandleScalingModifiers is the parent function for scalingModifiers structure.
// If the structure is defined and conditions are met, apply the formula to
// manipulate the metrics and return them
func HandleScalingModifiers(so *kedav1alpha1.ScaledObject, metrics []external_metrics.ExternalMetricValue, metricTriggerList map[string]string, fallbackActive bool, cacheObj *cache.ScalersCache, log logr.Logger) []external_metrics.ExternalMetricValue {
func HandleScalingModifiers(so *kedav1alpha1.ScaledObject, metrics []external_metrics.ExternalMetricValue, metricTriggerList map[string]string, fallbackActive bool, fallbackMetrics []external_metrics.ExternalMetricValue, cacheObj *cache.ScalersCache, log logr.Logger) []external_metrics.ExternalMetricValue {
var err error
if so == nil || !so.IsUsingModifiers() {
return metrics
}
// dont manipulate with metrics if fallback is currently active or structure isnt defined
if !fallbackActive && so != nil && so.IsUsingModifiers() {
if !fallbackActive {
sm := so.Spec.Advanced.ScalingModifiers

// apply formula if defined
Expand All @@ -53,6 +56,13 @@ func HandleScalingModifiers(so *kedav1alpha1.ScaledObject, metrics []external_me
log.Error(err, "error applying custom scalingModifiers.Formula")
}
log.V(1).Info("returned metrics after formula is applied", "metrics", metrics)
} else if len(fallbackMetrics) > 0 {
metrics = []external_metrics.ExternalMetricValue{
{
MetricName: kedav1alpha1.CompositeMetricName,
Value: fallbackMetrics[0].Value,
Timestamp: fallbackMetrics[0].Timestamp,
}}
}
return metrics
}
Expand Down
11 changes: 9 additions & 2 deletions pkg/scaling/scale_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -425,6 +425,7 @@ func (h *scaleHandler) ClearScalersCache(ctx context.Context, scalableObject int
func (h *scaleHandler) GetScaledObjectMetrics(ctx context.Context, scaledObjectName, scaledObjectNamespace, metricsName string) (*external_metrics.ExternalMetricValueList, error) {
logger := log.WithValues("scaledObject.Namespace", scaledObjectNamespace, "scaledObject.Name", scaledObjectName)
var matchingMetrics []external_metrics.ExternalMetricValue
var fallbackMetrics []external_metrics.ExternalMetricValue

cache, err := h.getScalersCacheForScaledObject(ctx, scaledObjectName, scaledObjectNamespace)
metricscollector.RecordScaledObjectError(scaledObjectNamespace, scaledObjectName, err)
Expand Down Expand Up @@ -561,6 +562,7 @@ func (h *scaleHandler) GetScaledObjectMetrics(ctx context.Context, scaledObjectN
}
if fallbackActive {
isFallbackActive = true
fallbackMetrics = append(fallbackMetrics, metrics...)
}
metricscollector.RecordScalerError(scaledObjectNamespace, scaledObjectName, result.triggerName, result.triggerIndex, result.metricName, true, err)
matchingMetrics = append(matchingMetrics, metrics...)
Expand All @@ -575,12 +577,17 @@ func (h *scaleHandler) GetScaledObjectMetrics(ctx context.Context, scaledObjectN
logger.V(1).Info("scaler error encountered, clearing scaler cache")
}

// This case happens in failed times under failureThreshold. Report error to HPA directly.
if !isFallbackActive && isScalerError {
return nil, fmt.Errorf("metric:%s encountered error", metricsName)
}

if len(matchingMetrics) == 0 {
return nil, fmt.Errorf("no matching metrics found for " + metricsName)
}

// handle scalingModifiers here and simply return the matchingMetrics
matchingMetrics = modifiers.HandleScalingModifiers(scaledObject, matchingMetrics, metricTriggerPairList, isFallbackActive, cache, logger)
matchingMetrics = modifiers.HandleScalingModifiers(scaledObject, matchingMetrics, metricTriggerPairList, isFallbackActive, fallbackMetrics, cache, logger)
return &external_metrics.ExternalMetricValueList{
Items: matchingMetrics,
}, nil
Expand Down Expand Up @@ -662,7 +669,7 @@ func (h *scaleHandler) getScaledObjectState(ctx context.Context, scaledObject *k
}

// apply scaling modifiers
matchingMetrics = modifiers.HandleScalingModifiers(scaledObject, matchingMetrics, metricTriggerPairList, false, cache, logger)
matchingMetrics = modifiers.HandleScalingModifiers(scaledObject, matchingMetrics, metricTriggerPairList, false, nil, cache, logger)

// when we are using formula, we need to reevaluate if it's active here
if scaledObject.IsUsingModifiers() {
Expand Down
3 changes: 3 additions & 0 deletions tests/internals/scaling_modifiers/scaling_modifiers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package scaling_modifiers_test
import (
"fmt"
"testing"
"time"

"github.com/joho/godotenv"
"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -312,6 +313,8 @@ func testFormula(t *testing.T, kc *kubernetes.Clientset, data templateData) {

assert.True(t, WaitForDeploymentReplicaReadyCount(t, kc, deploymentName, namespace, 5, 12, 10),
"replica count should be %d after 2 minutes", 5)
time.Sleep(45 * time.Second) // waiting for passing failureThreshold
AssertReplicaCountNotChangeDuringTimePeriod(t, kc, deploymentName, namespace, 5, 60)

// ensure state returns to normal after error resolved and triggers are healthy
_, err = ExecuteCommand(fmt.Sprintf("kubectl scale deployment/%s --replicas=1 -n %s", metricsServerDeploymentName, namespace))
Expand Down

0 comments on commit e41294d

Please sign in to comment.