Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ScaledJob: introduce MultipleScalersCalculation #2016

Merged
merged 23 commits into from
Sep 13, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
c0c4814
Fix bug of ScaledJob multiple triggers
TsuyoshiUshio Aug 5, 2021
16671c8
fix pre-commit issue
TsuyoshiUshio Aug 6, 2021
12dc683
Update logs on scale handler and add crds
TsuyoshiUshio Aug 9, 2021
2416bfa
update change log
TsuyoshiUshio Aug 9, 2021
96b9907
Add test case for exceeding MaxReplicaCount in sum
TsuyoshiUshio Aug 9, 2021
d273f13
fix pre-commit
TsuyoshiUshio Aug 9, 2021
12a06ae
Update pkg/scaling/scale_handler.go
TsuyoshiUshio Sep 4, 2021
a9d9476
Update pkg/scaling/scale_handler.go
TsuyoshiUshio Sep 4, 2021
c5e4cd0
revert version and refactor scaling logic
TsuyoshiUshio Sep 4, 2021
e604a68
Merge branch 'main' into tsushi/fixscaledjobmultitrigger
TsuyoshiUshio Sep 4, 2021
784bc7c
Change option name from MultipleScalersOption to MultipleScalersCalcu…
TsuyoshiUshio Sep 4, 2021
15e39d9
update changelog for multiple triggers section
TsuyoshiUshio Sep 8, 2021
72147d2
Error Handling
TsuyoshiUshio Sep 8, 2021
93568a9
Refactor logging
TsuyoshiUshio Sep 8, 2021
96116f8
remove scaledjob prometheus
TsuyoshiUshio Sep 8, 2021
d5b519c
Update pkg/scaling/scaledjob/scale_metrics.go
TsuyoshiUshio Sep 11, 2021
49c6eb4
Update pkg/scaling/scaledjob/scale_metrics.go
TsuyoshiUshio Sep 11, 2021
3821105
Update pkg/scaling/scaledjob/scale_metrics.go
TsuyoshiUshio Sep 11, 2021
faa30df
Update pkg/scaling/scaledjob/scale_metrics_test.go
TsuyoshiUshio Sep 11, 2021
4c3bf05
Update pkg/scaling/scaledjob/scale_metrics.go
TsuyoshiUshio Sep 11, 2021
87740cf
Update CHANGELOG.md
TsuyoshiUshio Sep 11, 2021
6b7e6da
Move section to new section
TsuyoshiUshio Sep 11, 2021
6dda3c9
fix pre-commit white space issue
TsuyoshiUshio Sep 11, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
### New

- TODO ([#XXX](https://github.com/kedacore/keda/pull/XXX))
- ScaledJob: introduce MultipleScalersCalculation ([#2016](https://github.com/kedacore/keda/pull/2016))

### Improvements

Expand Down
2 changes: 2 additions & 0 deletions apis/keda/v1alpha1/scaledjob_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,8 @@ type ScalingStrategy struct {
CustomScalingRunningJobPercentage string `json:"customScalingRunningJobPercentage,omitempty"`
// +optional
PendingPodConditions []string `json:"pendingPodConditions,omitempty"`
// +optional
MultipleScalersCalculation string `json:"multipleScalersCalculation,omitempty"`
}

func init() {
Expand Down
2 changes: 2 additions & 0 deletions config/crd/bases/keda.sh_scaledjobs.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7362,6 +7362,8 @@ spec:
type: integer
customScalingRunningJobPercentage:
type: string
multipleScalersCalculation:
type: string
pendingPodConditions:
items:
type: string
Expand Down
96 changes: 2 additions & 94 deletions pkg/scaling/scale_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"time"

"github.com/go-logr/logr"
"k8s.io/api/autoscaling/v2beta2"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
Expand All @@ -37,6 +36,7 @@ import (
"github.com/kedacore/keda/v2/pkg/scalers"
"github.com/kedacore/keda/v2/pkg/scaling/executor"
"github.com/kedacore/keda/v2/pkg/scaling/resolver"
"github.com/kedacore/keda/v2/pkg/scaling/scaledjob"
)

// ScaleHandler encapsulates the logic of calling the right scalers for
Expand Down Expand Up @@ -264,99 +264,7 @@ func (h *scaleHandler) isScaledObjectActive(ctx context.Context, scalers []scale
}

func (h *scaleHandler) isScaledJobActive(ctx context.Context, scalers []scalers.Scaler, scaledJob *kedav1alpha1.ScaledJob) (bool, int64, int64) {
var queueLength int64
var targetAverageValue int64
var maxValue int64
isActive := false

// TODO refactor this, do chores, reduce the verbosity ie: V(1) and frequency of logs
// move relevant funcs getTargetAverageValue(), min() and divideWithCeil() out of scaler_handler.go
for _, scaler := range scalers {
scalerLogger := h.logger.WithValues("Scaler", scaler)

metricSpecs := scaler.GetMetricSpecForScaling()

// skip scaler that doesn't return any metric specs (usually External scaler with incorrect metadata)
// or skip cpu/memory resource scaler
if len(metricSpecs) < 1 || metricSpecs[0].External == nil {
continue
}

isTriggerActive, err := scaler.IsActive(ctx)

scalerLogger.Info("Active trigger", "isTriggerActive", isTriggerActive)

targetAverageValue = getTargetAverageValue(metricSpecs)

scalerLogger.Info("Scaler targetAverageValue", "targetAverageValue", targetAverageValue)

metrics, _ := scaler.GetMetrics(ctx, "queueLength", nil)

var metricValue int64

for _, m := range metrics {
if m.MetricName == "queueLength" {
metricValue, _ = m.Value.AsInt64()
queueLength += metricValue
}
}
scalerLogger.Info("QueueLength Metric value", "queueLength", queueLength)

scaler.Close()
if err != nil {
scalerLogger.V(1).Info("Error getting scale decision, but continue", "Error", err)
h.recorder.Event(scaledJob, corev1.EventTypeWarning, eventreason.KEDAScalerFailed, err.Error())
continue
} else if isTriggerActive {
isActive = true
scalerLogger.Info("Scaler is active")
}
}
if targetAverageValue != 0 {
maxValue = min(scaledJob.MaxReplicaCount(), divideWithCeil(queueLength, targetAverageValue))
}
h.logger.Info("Scaler maxValue", "maxValue", maxValue)
return isActive, queueLength, maxValue
}

func getTargetAverageValue(metricSpecs []v2beta2.MetricSpec) int64 {
var targetAverageValue int64
var metricValue int64
var flag bool
for _, metric := range metricSpecs {
if metric.External.Target.AverageValue == nil {
metricValue = 0
} else {
metricValue, flag = metric.External.Target.AverageValue.AsInt64()
if !flag {
metricValue = 0
}
}

targetAverageValue += metricValue
}
count := int64(len(metricSpecs))
if count != 0 {
return targetAverageValue / count
}
return 0
}

func divideWithCeil(x, y int64) int64 {
ans := x / y
reminder := x % y
if reminder != 0 {
return ans + 1
}
return ans
}

// Min function for int64
func min(x, y int64) int64 {
if x > y {
return y
}
return x
return scaledjob.GetScaleMetrics(ctx, scalers, scaledJob, h.recorder)
}

// buildScalers returns list of Scalers for the specified triggers
Expand Down
31 changes: 0 additions & 31 deletions pkg/scaling/scale_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,37 +37,6 @@ import (
"github.com/kedacore/keda/v2/pkg/scaling/executor"
)

func TestTargetAverageValue(t *testing.T) {
// count = 0
specs := []v2beta2.MetricSpec{}
targetAverageValue := getTargetAverageValue(specs)
assert.Equal(t, int64(0), targetAverageValue)
// 1 1
specs = []v2beta2.MetricSpec{
createMetricSpec(1),
createMetricSpec(1),
}
targetAverageValue = getTargetAverageValue(specs)
assert.Equal(t, int64(1), targetAverageValue)
// 5 5 3
specs = []v2beta2.MetricSpec{
createMetricSpec(5),
createMetricSpec(5),
createMetricSpec(3),
}
targetAverageValue = getTargetAverageValue(specs)
assert.Equal(t, int64(4), targetAverageValue)

// 5 5 4
specs = []v2beta2.MetricSpec{
createMetricSpec(5),
createMetricSpec(5),
createMetricSpec(3),
}
targetAverageValue = getTargetAverageValue(specs)
assert.Equal(t, int64(4), targetAverageValue)
}

func TestCheckScaledObjectScalersWithError(t *testing.T) {
ctrl := gomock.NewController(t)
client := mock_client.NewMockClient(ctrl)
Expand Down
184 changes: 184 additions & 0 deletions pkg/scaling/scaledjob/scale_metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,184 @@
package scaledjob

import (
"context"
"fmt"

"github.com/go-logr/logr"
"k8s.io/api/autoscaling/v2beta2"
corev1 "k8s.io/api/core/v1"
"k8s.io/client-go/tools/record"
logf "sigs.k8s.io/controller-runtime/pkg/log"

kedav1alpha1 "github.com/kedacore/keda/v2/apis/keda/v1alpha1"
"github.com/kedacore/keda/v2/pkg/eventreason"
"github.com/kedacore/keda/v2/pkg/scalers"
)

type scalerMetrics struct {
queueLength int64
maxValue int64
isActive bool
}

// GetScaleMetrics gets the metrics for decision making of scaling.
func GetScaleMetrics(ctx context.Context, scalers []scalers.Scaler, scaledJob *kedav1alpha1.ScaledJob, recorder record.EventRecorder) (bool, int64, int64) {
var queueLength int64
var maxValue int64
isActive := false

logger := logf.Log.WithName("scalemetrics")
scalersMetrics := getScalersMetrics(ctx, scalers, scaledJob, logger, recorder)
switch scaledJob.Spec.ScalingStrategy.MultipleScalersCalculation {
case "min":
for _, metrics := range scalersMetrics {
if (queueLength == 0 || metrics.queueLength < queueLength) && metrics.isActive {
queueLength = metrics.queueLength
maxValue = metrics.maxValue
isActive = metrics.isActive
}
}
case "avg":
queueLengthSum := int64(0)
maxValueSum := int64(0)
length := 0
for _, metrics := range scalersMetrics {
if metrics.isActive {
queueLengthSum += metrics.queueLength
maxValueSum += metrics.maxValue
isActive = metrics.isActive
length++
}
}
if length != 0 {
queueLength = divideWithCeil(queueLengthSum, int64(length))
maxValue = divideWithCeil(maxValueSum, int64(length))
}
case "sum":
for _, metrics := range scalersMetrics {
if metrics.isActive {
queueLength += metrics.queueLength
maxValue += metrics.maxValue
isActive = metrics.isActive
}
}
default: // max
for _, metrics := range scalersMetrics {
if metrics.queueLength > queueLength && metrics.isActive {
queueLength = metrics.queueLength
maxValue = metrics.maxValue
isActive = metrics.isActive
}
}
}
maxValue = min(scaledJob.MaxReplicaCount(), maxValue)
logger.V(1).WithValues("ScaledJob", scaledJob.Name).Info("Checking if ScaleJob scalers are active", "isActive", isActive, "maxValue", maxValue, "MultipleScalersCalculation", scaledJob.Spec.ScalingStrategy.MultipleScalersCalculation)

return isActive, queueLength, maxValue
}

func getScalersMetrics(ctx context.Context, scalers []scalers.Scaler, scaledJob *kedav1alpha1.ScaledJob, logger logr.Logger, recorder record.EventRecorder) []scalerMetrics {
scalersMetrics := []scalerMetrics{}

for _, scaler := range scalers {
var queueLength int64
var targetAverageValue int64
isActive := false
maxValue := int64(0)
scalerType := fmt.Sprintf("%T:", scaler)

scalerLogger := logger.WithValues("ScaledJob", scaledJob.Name, "Scaler", scalerType)

metricSpecs := scaler.GetMetricSpecForScaling()

// skip scaler that doesn't return any metric specs (usually External scaler with incorrect metadata)
// or skip cpu/memory resource scaler
if len(metricSpecs) < 1 || metricSpecs[0].External == nil {
continue
}

isTriggerActive, err := scaler.IsActive(ctx)
zroubalik marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
scalerLogger.V(1).Info("Error getting scaler.IsActive, but continue", "Error", err)
recorder.Event(scaledJob, corev1.EventTypeWarning, eventreason.KEDAScalerFailed, err.Error())
scaler.Close()
continue
}

targetAverageValue = getTargetAverageValue(metricSpecs)

metrics, err := scaler.GetMetrics(ctx, "queueLength", nil)
if err != nil {
scalerLogger.V(1).Info("Error getting scaler metrics, but continue", "Error", err)
recorder.Event(scaledJob, corev1.EventTypeWarning, eventreason.KEDAScalerFailed, err.Error())
scaler.Close()
continue
}

var metricValue int64

for _, m := range metrics {
if m.MetricName == "queueLength" {
metricValue, _ = m.Value.AsInt64()
queueLength += metricValue
}
}
scalerLogger.V(1).Info("Scaler Metric value", "isTriggerActive", isTriggerActive, "queueLength", queueLength, "targetAverageValue", targetAverageValue)

scaler.Close()

if isTriggerActive {
isActive = true
}

if targetAverageValue != 0 {
maxValue = min(scaledJob.MaxReplicaCount(), divideWithCeil(queueLength, targetAverageValue))
}
scalersMetrics = append(scalersMetrics, scalerMetrics{
queueLength: queueLength,
maxValue: maxValue,
isActive: isActive,
})
}
return scalersMetrics
}

func getTargetAverageValue(metricSpecs []v2beta2.MetricSpec) int64 {
var targetAverageValue int64
var metricValue int64
var flag bool
for _, metric := range metricSpecs {
if metric.External.Target.AverageValue == nil {
metricValue = 0
} else {
metricValue, flag = metric.External.Target.AverageValue.AsInt64()
if !flag {
metricValue = 0
}
}

targetAverageValue += metricValue
}
count := int64(len(metricSpecs))
if count != 0 {
return targetAverageValue / count
}
return 0
}

func divideWithCeil(x, y int64) int64 {
ans := x / y
reminder := x % y
if reminder != 0 {
return ans + 1
}
return ans
}

// Min function for int64
func min(x, y int64) int64 {
if x > y {
return y
}
return x
}
Loading