From 5b715199077dfe652f50aecdb32c0c9e345a6e0c Mon Sep 17 00:00:00 2001
From: Tsuyoshi Ushio
Date: Fri, 2 Oct 2020 16:47:40 -0700
Subject: [PATCH] Introduce new scaling logic and fix orphaned pod issue

Signed-off-by: Tsuyoshi Ushio
---
 api/v1alpha1/scaledjob_types.go             |  9 +++++++++
 config/crd/bases/keda.sh_scaledjobs.yaml    |  2 ++
 config/crd/bases/keda.sh_scaledobjects.yaml |  5 +++++
 go.sum                                      |  1 +
 pkg/scaling/executor/scale_jobs.go          | 13 +++++++++++--
 pkg/scaling/executor/scale_jobs_test.go     |  5 ++++-
 pkg/scaling/scale_handler.go                |  8 +-------
 7 files changed, 33 insertions(+), 10 deletions(-)

diff --git a/api/v1alpha1/scaledjob_types.go b/api/v1alpha1/scaledjob_types.go
index cf3a68c545c..41f4484a523 100644
--- a/api/v1alpha1/scaledjob_types.go
+++ b/api/v1alpha1/scaledjob_types.go
@@ -60,3 +60,12 @@ type ScaledJobList struct {
 func init() {
 	SchemeBuilder.Register(&ScaledJob{}, &ScaledJobList{})
 }
+
+// MaxReplicaCount returns the maximum replica count, defaulting to 100 when unset
+func (s ScaledJob) MaxReplicaCount() int64 {
+	if s.Spec.MaxReplicaCount != nil {
+		return int64(*s.Spec.MaxReplicaCount)
+	} else {
+		return 100
+	}
+}
diff --git a/config/crd/bases/keda.sh_scaledjobs.yaml b/config/crd/bases/keda.sh_scaledjobs.yaml
index 3cbe3700a8a..9c0cff943e0 100644
--- a/config/crd/bases/keda.sh_scaledjobs.yaml
+++ b/config/crd/bases/keda.sh_scaledjobs.yaml
@@ -6218,7 +6218,9 @@ spec:
           description: ScaledJobStatus defines the observed state of ScaledJob
           properties:
             conditions:
+              description: Conditions is an array representation to store multiple Conditions
               items:
+                description: Condition to store the condition state
                 properties:
                   message:
                     description: A human readable message indicating details about
diff --git a/config/crd/bases/keda.sh_scaledobjects.yaml b/config/crd/bases/keda.sh_scaledobjects.yaml
index 98bea64ac5f..11a617fc81f 100644
--- a/config/crd/bases/keda.sh_scaledobjects.yaml
+++ b/config/crd/bases/keda.sh_scaledobjects.yaml
@@ -61,8 +61,11 @@ spec:
           description: ScaledObjectSpec is the spec for a ScaledObject resource
           properties:
             advanced:
+              description: AdvancedConfig specifies advanced scaling options
               properties:
                 horizontalPodAutoscalerConfig:
+                  description: HorizontalPodAutoscalerConfig specifies horizontal
+                    scale config
                   properties:
                     behavior:
                       description: HorizontalPodAutoscalerBehavior configures the
@@ -300,7 +303,9 @@ spec:
           description: ScaledObjectStatus is the status for a ScaledObject resource
           properties:
             conditions:
+              description: Conditions is an array representation to store multiple Conditions
               items:
+                description: Condition to store the condition state
                 properties:
                   message:
                     description: A human readable message indicating details about
diff --git a/go.sum b/go.sum
index cf66acf745a..d5c9b690c4d 100644
--- a/go.sum
+++ b/go.sum
@@ -1658,6 +1658,7 @@ k8s.io/apimachinery v0.18.6 h1:RtFHnfGNfd1N0LeSrKCUznz5xtUP1elRGvHJbL3Ntag=
 k8s.io/apimachinery v0.18.6/go.mod h1:OaXp26zu/5J7p0f92ASynJa1pZo06YlV9fG7BoWbCko=
 k8s.io/apimachinery v0.18.8 h1:jimPrycCqgx2QPearX3to1JePz7wSbVLq+7PdBTTwQ0=
 k8s.io/apimachinery v0.18.8/go.mod h1:6sQd+iHEqmOtALqOFjSWp2KZ9F0wlU/nWm0ZgsYWMig=
+k8s.io/apimachinery v0.19.2 h1:5Gy9vQpAGTKHPVOh5c4plE274X8D/6cuEiTO2zve7tc=
 k8s.io/apiserver v0.18.8 h1:Au4kMn8sb1zFdyKqc8iMHLsYLxRI6Y+iAhRNKKQtlBY=
 k8s.io/apiserver v0.18.8/go.mod h1:12u5FuGql8Cc497ORNj79rhPdiXQC4bf53X/skR/1YM=
 k8s.io/cli-runtime v0.17.2/go.mod h1:aa8t9ziyQdbkuizkNLAw3qe3srSyWh9zlSB7zTqRNPI=
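Note: the following is a minimal standalone sketch (not part of the patch) of
the capping rule that the scale_jobs.go hunk below introduces: the executor
only creates as many new Jobs as still fit under ScaledJob.MaxReplicaCount(),
clamping at zero. The constant maxReplicaCount and the sample values in main
are hypothetical stand-ins for the real accessor, which defaults to 100.

package main

import "fmt"

// maxReplicaCount stands in for scaledJob.MaxReplicaCount().
const maxReplicaCount int64 = 10

// effectiveMaxScale caps the number of Jobs to create so that running plus
// newly created Jobs never exceed maxReplicaCount, and never goes negative.
func effectiveMaxScale(maxScale, runningJobCount int64) int64 {
	e := maxScale
	if maxScale+runningJobCount > maxReplicaCount {
		e = maxReplicaCount - runningJobCount
	}
	if e < 0 {
		e = 0
	}
	return e
}

func main() {
	fmt.Println(effectiveMaxScale(5, 8))  // 2: only 2 more Jobs fit under the cap of 10
	fmt.Println(effectiveMaxScale(5, 12)) // 0: already over the cap, clamp to zero
}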
diff --git a/pkg/scaling/executor/scale_jobs.go b/pkg/scaling/executor/scale_jobs.go
index 05634be953f..3f8e362f448 100644
--- a/pkg/scaling/executor/scale_jobs.go
+++ b/pkg/scaling/executor/scale_jobs.go
@@ -28,7 +28,12 @@ func (e *scaleExecutor) RequestJobScale(ctx context.Context, scaledJob *kedav1al
 	logger.Info("Scaling Jobs", "Number of running Jobs", runningJobCount)
 
 	var effectiveMaxScale int64
-	effectiveMaxScale = maxScale - runningJobCount
+	if (maxScale + runningJobCount) > scaledJob.MaxReplicaCount() {
+		effectiveMaxScale = scaledJob.MaxReplicaCount() - runningJobCount
+	} else {
+		effectiveMaxScale = maxScale
+	}
+
 	if effectiveMaxScale < 0 {
 		effectiveMaxScale = 0
 	}
@@ -198,7 +203,11 @@ func (e *scaleExecutor) deleteJobsWithHistoryLimit(logger logr.Logger, jobs []ba
 	deleteJobLength := len(jobs) - int(historyLimit)
 	for _, j := range (jobs)[0:deleteJobLength] {
 
-		err := e.client.Delete(context.TODO(), j.DeepCopyObject())
+		deletePolicy := metav1.DeletePropagationBackground
+		deleteOptions := &client.DeleteOptions{
+			PropagationPolicy: &deletePolicy,
+		}
+		err := e.client.Delete(context.TODO(), j.DeepCopyObject(), deleteOptions)
 		if err != nil {
 			return err
 		}
diff --git a/pkg/scaling/executor/scale_jobs_test.go b/pkg/scaling/executor/scale_jobs_test.go
index a8fdaa942c2..11f71ffc570 100644
--- a/pkg/scaling/executor/scale_jobs_test.go
+++ b/pkg/scaling/executor/scale_jobs_test.go
@@ -165,11 +165,14 @@ func getMockClient(t *testing.T, ctrl *gomock.Controller, jobs *[]mockJobParamet
 		Return(nil)
 
 	client.EXPECT().
-		Delete(gomock.Any(), gomock.Any()).Do(func(_ context.Context, obj runtime.Object) {
+		Delete(gomock.Any(), gomock.Any(), gomock.Any()).Do(func(_ context.Context, obj runtime.Object, opt *runtimeclient.DeleteOptions) {
 			j, ok := obj.(*batchv1.Job)
 			if !ok {
 				t.Error("Cast failed on batchv1.Job at mocking client.Delete()")
 			}
+			if *opt.PropagationPolicy != metav1.DeletePropagationBackground {
+				t.Error("Job Delete PropagationPolicy is not DeletePropagationBackground")
+			}
 			(*deletedJobName)[j.GetName()] = j.GetName()
 		}).
 		Return(nil).AnyTimes()
diff --git a/pkg/scaling/scale_handler.go b/pkg/scaling/scale_handler.go
index 4615379088a..2384f329c1f 100644
--- a/pkg/scaling/scale_handler.go
+++ b/pkg/scaling/scale_handler.go
@@ -264,13 +264,7 @@ func (h *scaleHandler) checkScaledJobScalers(ctx context.Context, scalers []scal
 			scalerLogger.Info("Scaler is active")
 		}
 	}
-	var maxReplicaCount int64
-	if scaledJob.Spec.MaxReplicaCount != nil {
-		maxReplicaCount = int64(*scaledJob.Spec.MaxReplicaCount)
-	} else {
-		maxReplicaCount = 100
-	}
-	maxValue = min(maxReplicaCount, devideWithCeil(queueLength, targetAverageValue))
+	maxValue = min(scaledJob.MaxReplicaCount(), devideWithCeil(queueLength, targetAverageValue))
 	h.logger.Info("Scaler maxValue", "maxValue", maxValue)
 	return isActive, queueLength, maxValue
 }
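Note: a second minimal sketch (not part of the patch) showing why the new
DeleteOptions fix the orphaned pod issue: deleting a batch/v1 Job without an
explicit PropagationPolicy can leave the Pods it owns behind, while
DeletePropagationBackground tells the garbage collector to remove the
dependent Pods as well. The package and helper name are hypothetical; the
client calls mirror the controller-runtime API used in the patch.

package jobs

import (
	"context"

	batchv1 "k8s.io/api/batch/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"sigs.k8s.io/controller-runtime/pkg/client"
)

// deleteJobWithPods deletes a Job together with the Pods it owns by asking
// the API server for background cascading deletion.
func deleteJobWithPods(ctx context.Context, c client.Client, job *batchv1.Job) error {
	deletePolicy := metav1.DeletePropagationBackground
	return c.Delete(ctx, job, &client.DeleteOptions{
		PropagationPolicy: &deletePolicy,
	})
}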