Skip to content

Commit

Permalink
ScaledJob: introduce MultipleScalersCalculation (kedacore#2016)
Browse files Browse the repository at this point in the history
* Fix bug of ScaledJob multiple triggers

Signed-off-by: Tsuyoshi Ushio <[email protected]>

* fix pre-commit issue

Signed-off-by: Tsuyoshi Ushio <[email protected]>

* Update logs on scale handler and add crds

Signed-off-by: Tsuyoshi Ushio <[email protected]>

* update change log

Signed-off-by: Tsuyoshi Ushio <[email protected]>

* Add test case for exceeding MaxReplicaCount in sum

Signed-off-by: Tsuyoshi Ushio <[email protected]>

* fix pre-commit

Signed-off-by: Tsuyoshi Ushio <[email protected]>

* Update pkg/scaling/scale_handler.go

Co-authored-by: Zbynek Roubalik <[email protected]>
Signed-off-by: Tsuyoshi Ushio <[email protected]>

* Update pkg/scaling/scale_handler.go

Co-authored-by: Zbynek Roubalik <[email protected]>
Signed-off-by: Tsuyoshi Ushio <[email protected]>

* revert version and refactor scaling logic

Signed-off-by: Tsuyoshi Ushio <[email protected]>

* Change option name from MultipleScalersOption to MultipleScalersCalculation

Signed-off-by: Tsuyoshi Ushio <[email protected]>

* update changelog for multiple triggers section

Signed-off-by: Tsuyoshi Ushio <[email protected]>

* Error Handling

Signed-off-by: Tsuyoshi Ushio <[email protected]>

* Refactor logging

Signed-off-by: Tsuyoshi Ushio <[email protected]>

* remove scaledjob prometheus

Signed-off-by: Tsuyoshi Ushio <[email protected]>

* Update pkg/scaling/scaledjob/scale_metrics.go

Co-authored-by: Zbynek Roubalik <[email protected]>
Signed-off-by: Tsuyoshi Ushio <[email protected]>

* Update pkg/scaling/scaledjob/scale_metrics.go

Co-authored-by: Zbynek Roubalik <[email protected]>
Signed-off-by: Tsuyoshi Ushio <[email protected]>

* Update pkg/scaling/scaledjob/scale_metrics.go

Co-authored-by: Zbynek Roubalik <[email protected]>
Signed-off-by: Tsuyoshi Ushio <[email protected]>

* Update pkg/scaling/scaledjob/scale_metrics_test.go

Co-authored-by: Zbynek Roubalik <[email protected]>
Signed-off-by: Tsuyoshi Ushio <[email protected]>

* Update pkg/scaling/scaledjob/scale_metrics.go

Co-authored-by: Zbynek Roubalik <[email protected]>
Signed-off-by: Tsuyoshi Ushio <[email protected]>

* Update CHANGELOG.md

Co-authored-by: Zbynek Roubalik <[email protected]>
Signed-off-by: Tsuyoshi Ushio <[email protected]>

* Move section to new section

Signed-off-by: Tsuyoshi Ushio <[email protected]>

* fix pre-commit white space issue

Signed-off-by: Tsuyoshi Ushio <[email protected]>

Co-authored-by: Zbynek Roubalik <[email protected]>
Signed-off-by: nilayasiktoprak <[email protected]>
  • Loading branch information
2 people authored and nilayasiktoprak committed Oct 23, 2021
1 parent 80d139e commit 588b950
Show file tree
Hide file tree
Showing 7 changed files with 397 additions and 125 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
### New

- TODO ([#XXX](https://github.com/kedacore/keda/pull/XXX))
- ScaledJob: introduce MultipleScalersCalculation ([#2016](https://github.com/kedacore/keda/pull/2016))

### Improvements

Expand Down
2 changes: 2 additions & 0 deletions apis/keda/v1alpha1/scaledjob_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,8 @@ type ScalingStrategy struct {
CustomScalingRunningJobPercentage string `json:"customScalingRunningJobPercentage,omitempty"`
// +optional
PendingPodConditions []string `json:"pendingPodConditions,omitempty"`
// +optional
MultipleScalersCalculation string `json:"multipleScalersCalculation,omitempty"`
}

func init() {
Expand Down
2 changes: 2 additions & 0 deletions config/crd/bases/keda.sh_scaledjobs.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7362,6 +7362,8 @@ spec:
type: integer
customScalingRunningJobPercentage:
type: string
multipleScalersCalculation:
type: string
pendingPodConditions:
items:
type: string
Expand Down
96 changes: 2 additions & 94 deletions pkg/scaling/scale_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"time"

"github.com/go-logr/logr"
"k8s.io/api/autoscaling/v2beta2"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
Expand All @@ -37,6 +36,7 @@ import (
"github.com/kedacore/keda/v2/pkg/scalers"
"github.com/kedacore/keda/v2/pkg/scaling/executor"
"github.com/kedacore/keda/v2/pkg/scaling/resolver"
"github.com/kedacore/keda/v2/pkg/scaling/scaledjob"
)

// ScaleHandler encapsulates the logic of calling the right scalers for
Expand Down Expand Up @@ -264,99 +264,7 @@ func (h *scaleHandler) isScaledObjectActive(ctx context.Context, scalers []scale
}

func (h *scaleHandler) isScaledJobActive(ctx context.Context, scalers []scalers.Scaler, scaledJob *kedav1alpha1.ScaledJob) (bool, int64, int64) {
var queueLength int64
var targetAverageValue int64
var maxValue int64
isActive := false

// TODO refactor this, do chores, reduce the verbosity ie: V(1) and frequency of logs
// move relevant funcs getTargetAverageValue(), min() and divideWithCeil() out of scaler_handler.go
for _, scaler := range scalers {
scalerLogger := h.logger.WithValues("Scaler", scaler)

metricSpecs := scaler.GetMetricSpecForScaling()

// skip scaler that doesn't return any metric specs (usually External scaler with incorrect metadata)
// or skip cpu/memory resource scaler
if len(metricSpecs) < 1 || metricSpecs[0].External == nil {
continue
}

isTriggerActive, err := scaler.IsActive(ctx)

scalerLogger.Info("Active trigger", "isTriggerActive", isTriggerActive)

targetAverageValue = getTargetAverageValue(metricSpecs)

scalerLogger.Info("Scaler targetAverageValue", "targetAverageValue", targetAverageValue)

metrics, _ := scaler.GetMetrics(ctx, "queueLength", nil)

var metricValue int64

for _, m := range metrics {
if m.MetricName == "queueLength" {
metricValue, _ = m.Value.AsInt64()
queueLength += metricValue
}
}
scalerLogger.Info("QueueLength Metric value", "queueLength", queueLength)

scaler.Close()
if err != nil {
scalerLogger.V(1).Info("Error getting scale decision, but continue", "Error", err)
h.recorder.Event(scaledJob, corev1.EventTypeWarning, eventreason.KEDAScalerFailed, err.Error())
continue
} else if isTriggerActive {
isActive = true
scalerLogger.Info("Scaler is active")
}
}
if targetAverageValue != 0 {
maxValue = min(scaledJob.MaxReplicaCount(), divideWithCeil(queueLength, targetAverageValue))
}
h.logger.Info("Scaler maxValue", "maxValue", maxValue)
return isActive, queueLength, maxValue
}

func getTargetAverageValue(metricSpecs []v2beta2.MetricSpec) int64 {
var targetAverageValue int64
var metricValue int64
var flag bool
for _, metric := range metricSpecs {
if metric.External.Target.AverageValue == nil {
metricValue = 0
} else {
metricValue, flag = metric.External.Target.AverageValue.AsInt64()
if !flag {
metricValue = 0
}
}

targetAverageValue += metricValue
}
count := int64(len(metricSpecs))
if count != 0 {
return targetAverageValue / count
}
return 0
}

func divideWithCeil(x, y int64) int64 {
ans := x / y
reminder := x % y
if reminder != 0 {
return ans + 1
}
return ans
}

// Min function for int64
func min(x, y int64) int64 {
if x > y {
return y
}
return x
return scaledjob.GetScaleMetrics(ctx, scalers, scaledJob, h.recorder)
}

// buildScalers returns list of Scalers for the specified triggers
Expand Down
31 changes: 0 additions & 31 deletions pkg/scaling/scale_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,37 +37,6 @@ import (
"github.com/kedacore/keda/v2/pkg/scaling/executor"
)

func TestTargetAverageValue(t *testing.T) {
// count = 0
specs := []v2beta2.MetricSpec{}
targetAverageValue := getTargetAverageValue(specs)
assert.Equal(t, int64(0), targetAverageValue)
// 1 1
specs = []v2beta2.MetricSpec{
createMetricSpec(1),
createMetricSpec(1),
}
targetAverageValue = getTargetAverageValue(specs)
assert.Equal(t, int64(1), targetAverageValue)
// 5 5 3
specs = []v2beta2.MetricSpec{
createMetricSpec(5),
createMetricSpec(5),
createMetricSpec(3),
}
targetAverageValue = getTargetAverageValue(specs)
assert.Equal(t, int64(4), targetAverageValue)

// 5 5 4
specs = []v2beta2.MetricSpec{
createMetricSpec(5),
createMetricSpec(5),
createMetricSpec(3),
}
targetAverageValue = getTargetAverageValue(specs)
assert.Equal(t, int64(4), targetAverageValue)
}

func TestCheckScaledObjectScalersWithError(t *testing.T) {
ctrl := gomock.NewController(t)
client := mock_client.NewMockClient(ctrl)
Expand Down
184 changes: 184 additions & 0 deletions pkg/scaling/scaledjob/scale_metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,184 @@
package scaledjob

import (
"context"
"fmt"

"github.com/go-logr/logr"
"k8s.io/api/autoscaling/v2beta2"
corev1 "k8s.io/api/core/v1"
"k8s.io/client-go/tools/record"
logf "sigs.k8s.io/controller-runtime/pkg/log"

kedav1alpha1 "github.com/kedacore/keda/v2/apis/keda/v1alpha1"
"github.com/kedacore/keda/v2/pkg/eventreason"
"github.com/kedacore/keda/v2/pkg/scalers"
)

type scalerMetrics struct {
queueLength int64
maxValue int64
isActive bool
}

// GetScaleMetrics gets the metrics for decision making of scaling.
func GetScaleMetrics(ctx context.Context, scalers []scalers.Scaler, scaledJob *kedav1alpha1.ScaledJob, recorder record.EventRecorder) (bool, int64, int64) {
var queueLength int64
var maxValue int64
isActive := false

logger := logf.Log.WithName("scalemetrics")
scalersMetrics := getScalersMetrics(ctx, scalers, scaledJob, logger, recorder)
switch scaledJob.Spec.ScalingStrategy.MultipleScalersCalculation {
case "min":
for _, metrics := range scalersMetrics {
if (queueLength == 0 || metrics.queueLength < queueLength) && metrics.isActive {
queueLength = metrics.queueLength
maxValue = metrics.maxValue
isActive = metrics.isActive
}
}
case "avg":
queueLengthSum := int64(0)
maxValueSum := int64(0)
length := 0
for _, metrics := range scalersMetrics {
if metrics.isActive {
queueLengthSum += metrics.queueLength
maxValueSum += metrics.maxValue
isActive = metrics.isActive
length++
}
}
if length != 0 {
queueLength = divideWithCeil(queueLengthSum, int64(length))
maxValue = divideWithCeil(maxValueSum, int64(length))
}
case "sum":
for _, metrics := range scalersMetrics {
if metrics.isActive {
queueLength += metrics.queueLength
maxValue += metrics.maxValue
isActive = metrics.isActive
}
}
default: // max
for _, metrics := range scalersMetrics {
if metrics.queueLength > queueLength && metrics.isActive {
queueLength = metrics.queueLength
maxValue = metrics.maxValue
isActive = metrics.isActive
}
}
}
maxValue = min(scaledJob.MaxReplicaCount(), maxValue)
logger.V(1).WithValues("ScaledJob", scaledJob.Name).Info("Checking if ScaleJob scalers are active", "isActive", isActive, "maxValue", maxValue, "MultipleScalersCalculation", scaledJob.Spec.ScalingStrategy.MultipleScalersCalculation)

return isActive, queueLength, maxValue
}

func getScalersMetrics(ctx context.Context, scalers []scalers.Scaler, scaledJob *kedav1alpha1.ScaledJob, logger logr.Logger, recorder record.EventRecorder) []scalerMetrics {
scalersMetrics := []scalerMetrics{}

for _, scaler := range scalers {
var queueLength int64
var targetAverageValue int64
isActive := false
maxValue := int64(0)
scalerType := fmt.Sprintf("%T:", scaler)

scalerLogger := logger.WithValues("ScaledJob", scaledJob.Name, "Scaler", scalerType)

metricSpecs := scaler.GetMetricSpecForScaling()

// skip scaler that doesn't return any metric specs (usually External scaler with incorrect metadata)
// or skip cpu/memory resource scaler
if len(metricSpecs) < 1 || metricSpecs[0].External == nil {
continue
}

isTriggerActive, err := scaler.IsActive(ctx)
if err != nil {
scalerLogger.V(1).Info("Error getting scaler.IsActive, but continue", "Error", err)
recorder.Event(scaledJob, corev1.EventTypeWarning, eventreason.KEDAScalerFailed, err.Error())
scaler.Close()
continue
}

targetAverageValue = getTargetAverageValue(metricSpecs)

metrics, err := scaler.GetMetrics(ctx, "queueLength", nil)
if err != nil {
scalerLogger.V(1).Info("Error getting scaler metrics, but continue", "Error", err)
recorder.Event(scaledJob, corev1.EventTypeWarning, eventreason.KEDAScalerFailed, err.Error())
scaler.Close()
continue
}

var metricValue int64

for _, m := range metrics {
if m.MetricName == "queueLength" {
metricValue, _ = m.Value.AsInt64()
queueLength += metricValue
}
}
scalerLogger.V(1).Info("Scaler Metric value", "isTriggerActive", isTriggerActive, "queueLength", queueLength, "targetAverageValue", targetAverageValue)

scaler.Close()

if isTriggerActive {
isActive = true
}

if targetAverageValue != 0 {
maxValue = min(scaledJob.MaxReplicaCount(), divideWithCeil(queueLength, targetAverageValue))
}
scalersMetrics = append(scalersMetrics, scalerMetrics{
queueLength: queueLength,
maxValue: maxValue,
isActive: isActive,
})
}
return scalersMetrics
}

func getTargetAverageValue(metricSpecs []v2beta2.MetricSpec) int64 {
var targetAverageValue int64
var metricValue int64
var flag bool
for _, metric := range metricSpecs {
if metric.External.Target.AverageValue == nil {
metricValue = 0
} else {
metricValue, flag = metric.External.Target.AverageValue.AsInt64()
if !flag {
metricValue = 0
}
}

targetAverageValue += metricValue
}
count := int64(len(metricSpecs))
if count != 0 {
return targetAverageValue / count
}
return 0
}

func divideWithCeil(x, y int64) int64 {
ans := x / y
reminder := x % y
if reminder != 0 {
return ans + 1
}
return ans
}

// Min function for int64
func min(x, y int64) int64 {
if x > y {
return y
}
return x
}
Loading

0 comments on commit 588b950

Please sign in to comment.