Skip to content

Commit

Permalink
Introduce new scaling logic with fix orphan pod issue
Browse files Browse the repository at this point in the history
Signed-off-by: Tsuyoshi Ushio <[email protected]>
  • Loading branch information
TsuyoshiUshio committed Oct 3, 2020
1 parent e9a9f94 commit 5b71519
Show file tree
Hide file tree
Showing 7 changed files with 33 additions and 10 deletions.
9 changes: 9 additions & 0 deletions api/v1alpha1/scaledjob_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,3 +60,12 @@ type ScaledJobList struct {
func init() {
SchemeBuilder.Register(&ScaledJob{}, &ScaledJobList{})
}

// MaxReplicaCount returns MaxReplicaCount
func (s ScaledJob) MaxReplicaCount() int64 {
if s.Spec.MaxReplicaCount != nil {
return int64(*s.Spec.MaxReplicaCount)
} else {
return 100
}
}
2 changes: 2 additions & 0 deletions config/crd/bases/keda.sh_scaledjobs.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6218,7 +6218,9 @@ spec:
description: ScaledJobStatus defines the observed state of ScaledJob
properties:
conditions:
description: Conditions an array representation to store multiple Conditions
items:
description: Condition to store the condition state
properties:
message:
description: A human readable message indicating details about
Expand Down
5 changes: 5 additions & 0 deletions config/crd/bases/keda.sh_scaledobjects.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,11 @@ spec:
description: ScaledObjectSpec is the spec for a ScaledObject resource
properties:
advanced:
description: AdvancedConfig specifies advance scaling options
properties:
horizontalPodAutoscalerConfig:
description: HorizontalPodAutoscalerConfig specifies horizontal
scale config
properties:
behavior:
description: HorizontalPodAutoscalerBehavior configures the
Expand Down Expand Up @@ -300,7 +303,9 @@ spec:
description: ScaledObjectStatus is the status for a ScaledObject resource
properties:
conditions:
description: Conditions an array representation to store multiple Conditions
items:
description: Condition to store the condition state
properties:
message:
description: A human readable message indicating details about
Expand Down
1 change: 1 addition & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1658,6 +1658,7 @@ k8s.io/apimachinery v0.18.6 h1:RtFHnfGNfd1N0LeSrKCUznz5xtUP1elRGvHJbL3Ntag=
k8s.io/apimachinery v0.18.6/go.mod h1:OaXp26zu/5J7p0f92ASynJa1pZo06YlV9fG7BoWbCko=
k8s.io/apimachinery v0.18.8 h1:jimPrycCqgx2QPearX3to1JePz7wSbVLq+7PdBTTwQ0=
k8s.io/apimachinery v0.18.8/go.mod h1:6sQd+iHEqmOtALqOFjSWp2KZ9F0wlU/nWm0ZgsYWMig=
k8s.io/apimachinery v0.19.2 h1:5Gy9vQpAGTKHPVOh5c4plE274X8D/6cuEiTO2zve7tc=
k8s.io/apiserver v0.18.8 h1:Au4kMn8sb1zFdyKqc8iMHLsYLxRI6Y+iAhRNKKQtlBY=
k8s.io/apiserver v0.18.8/go.mod h1:12u5FuGql8Cc497ORNj79rhPdiXQC4bf53X/skR/1YM=
k8s.io/cli-runtime v0.17.2/go.mod h1:aa8t9ziyQdbkuizkNLAw3qe3srSyWh9zlSB7zTqRNPI=
Expand Down
13 changes: 11 additions & 2 deletions pkg/scaling/executor/scale_jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,12 @@ func (e *scaleExecutor) RequestJobScale(ctx context.Context, scaledJob *kedav1al
logger.Info("Scaling Jobs", "Number of running Jobs", runningJobCount)

var effectiveMaxScale int64
effectiveMaxScale = maxScale - runningJobCount
if (maxScale + runningJobCount) > scaledJob.MaxReplicaCount() {
effectiveMaxScale = scaledJob.MaxReplicaCount() - runningJobCount
} else {
effectiveMaxScale = maxScale
}

if effectiveMaxScale < 0 {
effectiveMaxScale = 0
}
Expand Down Expand Up @@ -198,7 +203,11 @@ func (e *scaleExecutor) deleteJobsWithHistoryLimit(logger logr.Logger, jobs []ba

deleteJobLength := len(jobs) - int(historyLimit)
for _, j := range (jobs)[0:deleteJobLength] {
err := e.client.Delete(context.TODO(), j.DeepCopyObject())
deletePolicy := metav1.DeletePropagationBackground
deleteOptions := &client.DeleteOptions{
PropagationPolicy: &deletePolicy,
}
err := e.client.Delete(context.TODO(), j.DeepCopyObject(), deleteOptions)
if err != nil {
return err
}
Expand Down
5 changes: 4 additions & 1 deletion pkg/scaling/executor/scale_jobs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,11 +165,14 @@ func getMockClient(t *testing.T, ctrl *gomock.Controller, jobs *[]mockJobParamet
Return(nil)

client.EXPECT().
Delete(gomock.Any(), gomock.Any()).Do(func(_ context.Context, obj runtime.Object) {
Delete(gomock.Any(), gomock.Any(), gomock.Any()).Do(func(_ context.Context, obj runtime.Object, opt *runtimeclient.DeleteOptions) {
j, ok := obj.(*batchv1.Job)
if !ok {
t.Error("Cast failed on batchv1.Job at mocking client.Delete()")
}
if *opt.PropagationPolicy != metav1.DeletePropagationBackground {
t.Error("Job Delete PropagationPolicy is not DeletePropagationForeground")
}
(*deletedJobName)[j.GetName()] = j.GetName()
}).
Return(nil).AnyTimes()
Expand Down
8 changes: 1 addition & 7 deletions pkg/scaling/scale_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -264,13 +264,7 @@ func (h *scaleHandler) checkScaledJobScalers(ctx context.Context, scalers []scal
scalerLogger.Info("Scaler is active")
}
}
var maxReplicaCount int64
if scaledJob.Spec.MaxReplicaCount != nil {
maxReplicaCount = int64(*scaledJob.Spec.MaxReplicaCount)
} else {
maxReplicaCount = 100
}
maxValue = min(maxReplicaCount, devideWithCeil(queueLength, targetAverageValue))
maxValue = min(scaledJob.MaxReplicaCount(), devideWithCeil(queueLength, targetAverageValue))
h.logger.Info("Scaler maxValue", "maxValue", maxValue)
return isActive, queueLength, maxValue
}
Expand Down

0 comments on commit 5b71519

Please sign in to comment.