Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add eagerScalingStrategy for ScaledJob #5872

Merged
merged 9 commits into from
Jul 30, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ Here is an overview of all new **experimental** features:

### Improvements

- **General**: Added `eagerScalingStrategy` for `ScaledJob` ([#5114](https://github.com/kedacore/keda/issues/5114))
- **GCP Scalers**: Added custom time horizon in GCP scalers ([#5778](https://github.com/kedacore/keda/issues/5778))
- **GitHub Scaler**: Fixed pagination, fetching repository list ([#5738](https://github.com/kedacore/keda/issues/5738))
- **Kafka**: Fix logic to scale to zero on invalid offset even with earliest offsetResetPolicy ([#5689](https://github.com/kedacore/keda/issues/5689))
Expand Down
28 changes: 19 additions & 9 deletions pkg/scaling/executor/scale_jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ func (e *scaleExecutor) getScalingDecision(scaledJob *kedav1alpha1.ScaledJob, ru
scaleTo = scaleToMinReplica
effectiveMaxScale = scaleToMinReplica
} else {
effectiveMaxScale = NewScalingStrategy(logger, scaledJob).GetEffectiveMaxScale(maxScale, runningJobCount-minReplicaCount, pendingJobCount, scaledJob.MaxReplicaCount())
effectiveMaxScale, scaleTo = NewScalingStrategy(logger, scaledJob).GetEffectiveMaxScale(maxScale, runningJobCount-minReplicaCount, pendingJobCount, scaledJob.MaxReplicaCount(), scaleTo)
}
return effectiveMaxScale, scaleTo
}
Expand Down Expand Up @@ -391,6 +391,9 @@ func NewScalingStrategy(logger logr.Logger, scaledJob *kedav1alpha1.ScaledJob) S
case "accurate":
logger.V(1).Info("Selecting Scale Strategy", "specified", scaledJob.Spec.ScalingStrategy.Strategy, "selected", "accurate")
return accurateScalingStrategy{}
case "eager":
logger.V(1).Info("Selecting Scale Strategy", "specified", scaledJob.Spec.ScalingStrategy.Strategy, "selected", "eager")
return eagerScalingStrategy{}
default:
logger.V(1).Info("Selecting Scale Strategy", "specified", scaledJob.Spec.ScalingStrategy.Strategy, "selected", "default")
return defaultScalingStrategy{}
Expand All @@ -399,33 +402,40 @@ func NewScalingStrategy(logger logr.Logger, scaledJob *kedav1alpha1.ScaledJob) S

// ScalingStrategy is an interface for switching scaling algorithm
type ScalingStrategy interface {
GetEffectiveMaxScale(maxScale, runningJobCount, pendingJobCount, maxReplicaCount int64) int64
GetEffectiveMaxScale(maxScale, runningJobCount, pendingJobCount, maxReplicaCount, scaleTo int64) (int64, int64)
}

type defaultScalingStrategy struct {
}

func (s defaultScalingStrategy) GetEffectiveMaxScale(maxScale, runningJobCount, _, _ int64) int64 {
return maxScale - runningJobCount
func (s defaultScalingStrategy) GetEffectiveMaxScale(maxScale, runningJobCount, _, _, scaleTo int64) (int64, int64) {
return maxScale - runningJobCount, scaleTo
}

type customScalingStrategy struct {
CustomScalingQueueLengthDeduction *int32
CustomScalingRunningJobPercentage *float64
}

func (s customScalingStrategy) GetEffectiveMaxScale(maxScale, runningJobCount, _, maxReplicaCount int64) int64 {
return min(maxScale-int64(*s.CustomScalingQueueLengthDeduction)-int64(float64(runningJobCount)*(*s.CustomScalingRunningJobPercentage)), maxReplicaCount)
func (s customScalingStrategy) GetEffectiveMaxScale(maxScale, runningJobCount, _, maxReplicaCount, scaleTo int64) (int64, int64) {
return min(maxScale-int64(*s.CustomScalingQueueLengthDeduction)-int64(float64(runningJobCount)*(*s.CustomScalingRunningJobPercentage)), maxReplicaCount), scaleTo
}

type accurateScalingStrategy struct {
}

func (s accurateScalingStrategy) GetEffectiveMaxScale(maxScale, runningJobCount, pendingJobCount, maxReplicaCount int64) int64 {
func (s accurateScalingStrategy) GetEffectiveMaxScale(maxScale, runningJobCount, pendingJobCount, maxReplicaCount, scaleTo int64) (int64, int64) {
if (maxScale + runningJobCount) > maxReplicaCount {
return maxReplicaCount - runningJobCount
return maxReplicaCount - runningJobCount, scaleTo
}
return maxScale - pendingJobCount
return maxScale - pendingJobCount, scaleTo
}

type eagerScalingStrategy struct {
}

func (s eagerScalingStrategy) GetEffectiveMaxScale(maxScale, runningJobCount, pendingJobCount, maxReplicaCount, _ int64) (int64, int64) {
return min(maxReplicaCount-runningJobCount-pendingJobCount, maxScale), maxReplicaCount
}

func min(x, y int64) int64 {
Expand Down
51 changes: 39 additions & 12 deletions pkg/scaling/executor/scale_jobs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,13 +81,17 @@ func TestNewNewScalingStrategy(t *testing.T) {
assert.Equal(t, "executor.defaultScalingStrategy", fmt.Sprintf("%T", strategy))
}

func maxScaleValue(maxValue, _ int64) int64 {
return maxValue
}

func TestDefaultScalingStrategy(t *testing.T) {
logger := logf.Log.WithName("ScaledJobTest")
strategy := NewScalingStrategy(logger, getMockScaledJobWithDefaultStrategy("default"))
// maxScale doesn't exceed MaxReplicaCount. You can ignore on this sceanrio
// pendingJobCount isn't relevant on this scenario
assert.Equal(t, int64(1), strategy.GetEffectiveMaxScale(3, 2, 0, 5))
assert.Equal(t, int64(2), strategy.GetEffectiveMaxScale(2, 0, 0, 5))
assert.Equal(t, int64(1), maxScaleValue(strategy.GetEffectiveMaxScale(3, 2, 0, 5, 1)))
assert.Equal(t, int64(2), maxScaleValue(strategy.GetEffectiveMaxScale(2, 0, 0, 5, 1)))
}

func TestCustomScalingStrategy(t *testing.T) {
Expand All @@ -97,13 +101,13 @@ func TestCustomScalingStrategy(t *testing.T) {
strategy := NewScalingStrategy(logger, getMockScaledJobWithStrategy("custom", "custom", customScalingQueueLengthDeduction, customScalingRunningJobPercentage))
// maxScale doesn't exceed MaxReplicaCount. You can ignore on this sceanrio
// pendingJobCount isn't relevant on this scenario
assert.Equal(t, int64(1), strategy.GetEffectiveMaxScale(3, 2, 0, 5))
assert.Equal(t, int64(9), strategy.GetEffectiveMaxScale(10, 0, 0, 10))
assert.Equal(t, int64(1), maxScaleValue(strategy.GetEffectiveMaxScale(3, 2, 0, 5, 1)))
assert.Equal(t, int64(9), maxScaleValue(strategy.GetEffectiveMaxScale(10, 0, 0, 10, 1)))
strategy = NewScalingStrategy(logger, getMockScaledJobWithCustomStrategyWithNilParameter("custom", "custom"))

// If you don't set the two parameters is the same behavior as DefaultStrategy
assert.Equal(t, int64(1), strategy.GetEffectiveMaxScale(3, 2, 0, 5))
assert.Equal(t, int64(2), strategy.GetEffectiveMaxScale(2, 0, 0, 5))
assert.Equal(t, int64(1), maxScaleValue(strategy.GetEffectiveMaxScale(3, 2, 0, 5, 1)))
assert.Equal(t, int64(2), maxScaleValue(strategy.GetEffectiveMaxScale(2, 0, 0, 5, 1)))

// Empty String will be DefaultStrategy
customScalingQueueLengthDeduction = int32(1)
Expand All @@ -115,25 +119,48 @@ func TestCustomScalingStrategy(t *testing.T) {
customScalingQueueLengthDeduction = int32(2)
customScalingRunningJobPercentage = "0"
strategy = NewScalingStrategy(logger, getMockScaledJobWithStrategy("custom", "custom", customScalingQueueLengthDeduction, customScalingRunningJobPercentage))
assert.Equal(t, int64(1), strategy.GetEffectiveMaxScale(3, 2, 0, 5))
assert.Equal(t, int64(1), maxScaleValue(strategy.GetEffectiveMaxScale(3, 2, 0, 5, 1)))

// Exceed the MaxReplicaCount
customScalingQueueLengthDeduction = int32(-2)
customScalingRunningJobPercentage = "0"
strategy = NewScalingStrategy(logger, getMockScaledJobWithStrategy("custom", "custom", customScalingQueueLengthDeduction, customScalingRunningJobPercentage))
assert.Equal(t, int64(4), strategy.GetEffectiveMaxScale(3, 2, 0, 4))
assert.Equal(t, int64(4), maxScaleValue(strategy.GetEffectiveMaxScale(3, 2, 0, 4, 1)))
}

func TestAccurateScalingStrategy(t *testing.T) {
logger := logf.Log.WithName("ScaledJobTest")
strategy := NewScalingStrategy(logger, getMockScaledJobWithStrategy("accurate", "accurate", 0, "0"))
// maxScale doesn't exceed MaxReplicaCount. You can ignore on this sceanrio
assert.Equal(t, int64(3), strategy.GetEffectiveMaxScale(3, 2, 0, 5))
assert.Equal(t, int64(3), strategy.GetEffectiveMaxScale(5, 2, 0, 5))
assert.Equal(t, int64(3), maxScaleValue(strategy.GetEffectiveMaxScale(3, 2, 0, 5, 1)))
assert.Equal(t, int64(3), maxScaleValue(strategy.GetEffectiveMaxScale(5, 2, 0, 5, 1)))

// Test with 2 pending jobs
assert.Equal(t, int64(1), strategy.GetEffectiveMaxScale(3, 4, 2, 10))
assert.Equal(t, int64(1), strategy.GetEffectiveMaxScale(5, 4, 2, 5))
assert.Equal(t, int64(1), maxScaleValue(strategy.GetEffectiveMaxScale(3, 4, 2, 10, 1)))
assert.Equal(t, int64(1), maxScaleValue(strategy.GetEffectiveMaxScale(5, 4, 2, 5, 1)))
}

func TestEagerScalingStrategy(t *testing.T) {
logger := logf.Log.WithName("ScaledJobTest")
strategy := NewScalingStrategy(logger, getMockScaledJobWithStrategy("eager", "eager", 0, "0"))

maxScale, scaleTo := strategy.GetEffectiveMaxScale(4, 3, 0, 10, 1)
assert.Equal(t, int64(4), maxScale)
assert.Equal(t, int64(10), scaleTo)
maxScale, scaleTo = strategy.GetEffectiveMaxScale(4, 0, 3, 10, 1)
assert.Equal(t, int64(4), maxScale)
assert.Equal(t, int64(10), scaleTo)

maxScale, scaleTo = strategy.GetEffectiveMaxScale(4, 7, 0, 10, 1)
assert.Equal(t, int64(3), maxScale)
assert.Equal(t, int64(10), scaleTo)
maxScale, scaleTo = strategy.GetEffectiveMaxScale(4, 1, 6, 10, 1)
assert.Equal(t, int64(3), maxScale)
assert.Equal(t, int64(10), scaleTo)

maxScale, scaleTo = strategy.GetEffectiveMaxScale(15, 0, 0, 10, 1)
assert.Equal(t, int64(10), maxScale)
assert.Equal(t, int64(10), scaleTo)
}

func TestCleanUpMixedCaseWithSortByTime(t *testing.T) {
Expand Down
Loading