Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix fallback logic in formula-based evaluation #5684

Merged
merged 10 commits into from
Apr 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ Here is an overview of all new **experimental** features:
### Fixes

- **General**: Fix CVE-2024-28180 in github.com/go-jose/go-jose/v3 ([#5617](https://github.com/kedacore/keda/pull/5617))
- **General**: Fix fallback logic in formula-based evaluation ([#5666](https://github.com/kedacore/keda/issues/5666))
- **General**: Fix wrong scaler active value and paused value that are pushed to OpenTelemetry ([#5705](https://github.com/kedacore/keda/issues/5705))
- **General**: Log field `ScaledJob` no longer have conflicting types ([#5592](https://github.com/kedacore/keda/pull/5592))
- **General**: Prometheus metrics shows errors correctly ([#5597](https://github.com/kedacore/keda/issues/5597)|[#5663](https://github.com/kedacore/keda/issues/5663))
Expand Down
22 changes: 19 additions & 3 deletions pkg/fallback/fallback.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package fallback

import (
"context"
"strconv"

v2 "k8s.io/api/autoscaling/v2"
"k8s.io/apimachinery/pkg/api/resource"
Expand Down Expand Up @@ -57,6 +58,7 @@ func GetMetricsWithFallback(ctx context.Context, client runtimeclient.Client, me
status.Health[metricName] = *healthStatus

updateStatus(ctx, client, scaledObject, status, metricSpec)

return metrics, false, nil
}

Expand Down Expand Up @@ -94,16 +96,30 @@ func fallbackExistsInScaledObject(scaledObject *kedav1alpha1.ScaledObject, metri
}

func validateFallback(scaledObject *kedav1alpha1.ScaledObject) bool {
modifierChecking := true
if scaledObject.IsUsingModifiers() {
value, err := strconv.ParseInt(scaledObject.Spec.Advanced.ScalingModifiers.Target, 10, 64)
modifierChecking = err == nil && value > 0
}
return scaledObject.Spec.Fallback.FailureThreshold >= 0 &&
scaledObject.Spec.Fallback.Replicas >= 0
scaledObject.Spec.Fallback.Replicas >= 0 &&
modifierChecking
}

func doFallback(scaledObject *kedav1alpha1.ScaledObject, metricSpec v2.MetricSpec, metricName string, suppressedError error) []external_metrics.ExternalMetricValue {
replicas := int64(scaledObject.Spec.Fallback.Replicas)
normalisationValue := metricSpec.External.Target.AverageValue.AsApproximateFloat64()
var normalisationValue int64
if !scaledObject.IsUsingModifiers() {
normalisationValue = int64(metricSpec.External.Target.AverageValue.AsApproximateFloat64())
} else {
value, _ := strconv.ParseInt(scaledObject.Spec.Advanced.ScalingModifiers.Target, 10, 64)
normalisationValue = value
metricName = kedav1alpha1.CompositeMetricName
}

metric := external_metrics.ExternalMetricValue{
MetricName: metricName,
Value: *resource.NewMilliQuantity(int64(normalisationValue*1000)*replicas, resource.DecimalSI),
Value: *resource.NewMilliQuantity(normalisationValue*1000*replicas, resource.DecimalSI),
Timestamp: metav1.Now(),
}
fallbackMetrics := []external_metrics.ExternalMetricValue{metric}
Expand Down
14 changes: 12 additions & 2 deletions pkg/scaling/modifiers/formula.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,13 @@ import (
// HandleScalingModifiers is the parent function for scalingModifiers structure.
// If the structure is defined and conditions are met, apply the formula to
// manipulate the metrics and return them
func HandleScalingModifiers(so *kedav1alpha1.ScaledObject, metrics []external_metrics.ExternalMetricValue, metricTriggerList map[string]string, fallbackActive bool, cacheObj *cache.ScalersCache, log logr.Logger) []external_metrics.ExternalMetricValue {
func HandleScalingModifiers(so *kedav1alpha1.ScaledObject, metrics []external_metrics.ExternalMetricValue, metricTriggerList map[string]string, fallbackActive bool, fallbackMetrics []external_metrics.ExternalMetricValue, cacheObj *cache.ScalersCache, log logr.Logger) []external_metrics.ExternalMetricValue {
var err error
if so == nil || !so.IsUsingModifiers() {
return metrics
}
// dont manipulate with metrics if fallback is currently active or structure isnt defined
if !fallbackActive && so != nil && so.IsUsingModifiers() {
if !fallbackActive {
sm := so.Spec.Advanced.ScalingModifiers

// apply formula if defined
Expand All @@ -53,6 +56,13 @@ func HandleScalingModifiers(so *kedav1alpha1.ScaledObject, metrics []external_me
log.Error(err, "error applying custom scalingModifiers.Formula")
}
log.V(1).Info("returned metrics after formula is applied", "metrics", metrics)
} else if len(fallbackMetrics) > 0 {
metrics = []external_metrics.ExternalMetricValue{
{
MetricName: kedav1alpha1.CompositeMetricName,
Value: fallbackMetrics[0].Value,
Timestamp: fallbackMetrics[0].Timestamp,
}}
}
return metrics
}
Expand Down
11 changes: 9 additions & 2 deletions pkg/scaling/scale_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -425,6 +425,7 @@ func (h *scaleHandler) ClearScalersCache(ctx context.Context, scalableObject int
func (h *scaleHandler) GetScaledObjectMetrics(ctx context.Context, scaledObjectName, scaledObjectNamespace, metricsName string) (*external_metrics.ExternalMetricValueList, error) {
logger := log.WithValues("scaledObject.Namespace", scaledObjectNamespace, "scaledObject.Name", scaledObjectName)
var matchingMetrics []external_metrics.ExternalMetricValue
var fallbackMetrics []external_metrics.ExternalMetricValue

cache, err := h.getScalersCacheForScaledObject(ctx, scaledObjectName, scaledObjectNamespace)
metricscollector.RecordScaledObjectError(scaledObjectNamespace, scaledObjectName, err)
Expand Down Expand Up @@ -561,6 +562,7 @@ func (h *scaleHandler) GetScaledObjectMetrics(ctx context.Context, scaledObjectN
}
if fallbackActive {
isFallbackActive = true
fallbackMetrics = append(fallbackMetrics, metrics...)
}
metricscollector.RecordScalerError(scaledObjectNamespace, scaledObjectName, result.triggerName, result.triggerIndex, result.metricName, true, err)
matchingMetrics = append(matchingMetrics, metrics...)
Expand All @@ -575,12 +577,17 @@ func (h *scaleHandler) GetScaledObjectMetrics(ctx context.Context, scaledObjectN
logger.V(1).Info("scaler error encountered, clearing scaler cache")
}

// This case happens in failed times under failureThreshold. Report error to HPA directly.
if !isFallbackActive && isScalerError {
return nil, fmt.Errorf("metric:%s encountered error", metricsName)
}

if len(matchingMetrics) == 0 {
return nil, fmt.Errorf("no matching metrics found for " + metricsName)
}

// handle scalingModifiers here and simply return the matchingMetrics
matchingMetrics = modifiers.HandleScalingModifiers(scaledObject, matchingMetrics, metricTriggerPairList, isFallbackActive, cache, logger)
matchingMetrics = modifiers.HandleScalingModifiers(scaledObject, matchingMetrics, metricTriggerPairList, isFallbackActive, fallbackMetrics, cache, logger)
return &external_metrics.ExternalMetricValueList{
Items: matchingMetrics,
}, nil
Expand Down Expand Up @@ -662,7 +669,7 @@ func (h *scaleHandler) getScaledObjectState(ctx context.Context, scaledObject *k
}

// apply scaling modifiers
matchingMetrics = modifiers.HandleScalingModifiers(scaledObject, matchingMetrics, metricTriggerPairList, false, cache, logger)
matchingMetrics = modifiers.HandleScalingModifiers(scaledObject, matchingMetrics, metricTriggerPairList, false, nil, cache, logger)
JorTurFer marked this conversation as resolved.
Show resolved Hide resolved

// when we are using formula, we need to reevaluate if it's active here
if scaledObject.IsUsingModifiers() {
Expand Down
3 changes: 3 additions & 0 deletions tests/internals/scaling_modifiers/scaling_modifiers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package scaling_modifiers_test
import (
"fmt"
"testing"
"time"

"github.com/joho/godotenv"
"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -312,6 +313,8 @@ func testFormula(t *testing.T, kc *kubernetes.Clientset, data templateData) {

assert.True(t, WaitForDeploymentReplicaReadyCount(t, kc, deploymentName, namespace, 5, 12, 10),
"replica count should be %d after 2 minutes", 5)
time.Sleep(45 * time.Second) // waiting for passing failureThreshold
AssertReplicaCountNotChangeDuringTimePeriod(t, kc, deploymentName, namespace, 5, 60)

// ensure state returns to normal after error resolved and triggers are healthy
_, err = ExecuteCommand(fmt.Sprintf("kubectl scale deployment/%s --replicas=1 -n %s", metricsServerDeploymentName, namespace))
Expand Down
Loading