diff --git a/cluster-autoscaler/core/static_autoscaler.go b/cluster-autoscaler/core/static_autoscaler.go index b45733bdbb20..1fcc08c08ad8 100644 --- a/cluster-autoscaler/core/static_autoscaler.go +++ b/cluster-autoscaler/core/static_autoscaler.go @@ -532,7 +532,6 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) caerrors.AutoscalerErr // finally, filter out pods that are too "young" to safely be considered for a scale-up (delay is configurable) unschedulablePodsToHelp = a.filterOutYoungPods(unschedulablePodsToHelp, currentTime) - preScaleUp := func() time.Time { scaleUpStart := time.Now() metrics.UpdateLastTime(metrics.ScaleUp, scaleUpStart) diff --git a/cluster-autoscaler/main.go b/cluster-autoscaler/main.go index 0402d5cd91c5..3717e94cb0e1 100644 --- a/cluster-autoscaler/main.go +++ b/cluster-autoscaler/main.go @@ -508,7 +508,7 @@ func buildAutoscaler(debuggingSnapshotter debuggingsnapshot.DebuggingSnapshotter scaleUpOrchestrator := provreqorchestrator.NewWrapperOrchestrator(provreqOrchestrator) opts.ScaleUpOrchestrator = scaleUpOrchestrator - provreqProcesor := provreq.NewProvReqProcessor(client) + provreqProcesor := provreq.NewProvReqProcessor(client, opts.PredicateChecker) if err != nil { return nil, err } @@ -518,6 +518,7 @@ func buildAutoscaler(debuggingSnapshotter debuggingsnapshot.DebuggingSnapshotter return nil, err } podListProcessor.AddProcessor(injector) + podListProcessor.AddProcessor(provreqProcesor) } opts.Processors.PodListProcessor = podListProcessor scaleDownCandidatesComparers := []scaledowncandidates.CandidatesComparer{} diff --git a/cluster-autoscaler/processors/provreq/provisioning_request_processor.go b/cluster-autoscaler/processors/provreq/processor.go similarity index 59% rename from cluster-autoscaler/processors/provreq/provisioning_request_processor.go rename to cluster-autoscaler/processors/provreq/processor.go index 816fd22b0e29..a22934864b11 100644 --- a/cluster-autoscaler/processors/provreq/provisioning_request_processor.go +++ b/cluster-autoscaler/processors/provreq/processor.go @@ -17,16 +17,24 @@ limitations under the License. package provreq import ( + "fmt" "time" + apiv1 "k8s.io/api/core/v1" apimeta "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/autoscaler/cluster-autoscaler/apis/provisioningrequest/autoscaling.x-k8s.io/v1beta1" + "k8s.io/autoscaler/cluster-autoscaler/context" "k8s.io/autoscaler/cluster-autoscaler/provisioningrequest" "k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/conditions" + provreq_pods "k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/pods" "k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/provreqclient" "k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/provreqwrapper" + "k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot" + "k8s.io/autoscaler/cluster-autoscaler/simulator/predicatechecker" + "k8s.io/autoscaler/cluster-autoscaler/simulator/scheduling" "k8s.io/klog/v2" + "k8s.io/kubernetes/pkg/scheduler/framework" ) const ( @@ -36,15 +44,20 @@ const ( defaultMaxUpdated = 20 ) +type injector interface { + TrySchedulePods(clusterSnapshot clustersnapshot.ClusterSnapshot, pods []*apiv1.Pod, isNodeAcceptable func(*framework.NodeInfo) bool, breakOnFailure bool) ([]scheduling.Status, int, error) +} + type provReqProcessor struct { now func() time.Time maxUpdated int client *provreqclient.ProvisioningRequestClient + injector injector } // NewProvReqProcessor return ProvisioningRequestProcessor. -func NewProvReqProcessor(client *provreqclient.ProvisioningRequestClient) *provReqProcessor { - return &provReqProcessor{now: time.Now, maxUpdated: defaultMaxUpdated, client: client} +func NewProvReqProcessor(client *provreqclient.ProvisioningRequestClient, predicateChecker predicatechecker.PredicateChecker) *provReqProcessor { + return &provReqProcessor{now: time.Now, maxUpdated: defaultMaxUpdated, client: client, injector: scheduling.NewHintingSimulator(predicateChecker)} } // Refresh implements loop.Observer interface and will be run at the start @@ -56,15 +69,14 @@ func (p *provReqProcessor) Refresh() { klog.Errorf("Failed to get ProvisioningRequests list, err: %v", err) return } - - p.Process(provReqs) + p.refresh(provReqs) } -// Process iterates over ProvisioningRequests and apply: +// refresh iterates over ProvisioningRequests and apply: // -BookingExpired condition for Provisioned ProvisioningRequest if capacity reservation time is expired. // -Failed condition for ProvisioningRequest that were not provisioned during defaultExpirationTime. // TODO(yaroslava): fetch reservation and expiration time from ProvisioningRequest -func (p *provReqProcessor) Process(provReqs []*provreqwrapper.ProvisioningRequest) { +func (p *provReqProcessor) refresh(provReqs []*provreqwrapper.ProvisioningRequest) { expiredProvReq := []*provreqwrapper.ProvisioningRequest{} failedProvReq := []*provreqwrapper.ProvisioningRequest{} for _, provReq := range provReqs { @@ -108,5 +120,50 @@ func (p *provReqProcessor) Process(provReqs []*provreqwrapper.ProvisioningReques } } -// Cleanup cleans up internal state. +// CleanUp cleans up internal state func (p *provReqProcessor) CleanUp() {} + +// Process implements PodListProcessor.Process() and inject fake pods to the cluster snapshoot for Provisioned ProvReqs in order to +// reserve capacity from ScaleDown. +func (p *provReqProcessor) Process(context *context.AutoscalingContext, unschedulablePods []*apiv1.Pod) ([]*apiv1.Pod, error) { + err := p.bookCapacity(context) + if err != nil { + klog.Warning("Failed to book capacity for ProvisioningRequests: %s", err) + } + return unschedulablePods, nil +} + +// bookCapacity schedule fake pods for ProvisioningRequest that should have reserved capacity +// in the cluster. +func (p *provReqProcessor) bookCapacity(ctx *context.AutoscalingContext) error { + provReqs, err := p.client.ProvisioningRequests() + if err != nil { + return fmt.Errorf("couldn't fetch ProvisioningRequests in the cluster: %v", err) + } + podsToCreate := []*apiv1.Pod{} + for _, provReq := range provReqs { + if !conditions.ShouldCapacityBeBooked(provReq) { + continue + } + pods, err := provreq_pods.PodsForProvisioningRequest(provReq) + if err != nil { + // ClusterAutoscaler was able to create pods before, so we shouldn't have error here. + // If there is an error, mark PR as invalid, because we won't be able to book capacity + // for it anyway. + conditions.AddOrUpdateCondition(provReq, v1beta1.Failed, metav1.ConditionTrue, conditions.FailedToBookCapacityReason, fmt.Sprintf("Couldn't create pods, err: %v", err), metav1.Now()) + if _, err := p.client.UpdateProvisioningRequest(provReq.ProvisioningRequest); err != nil { + klog.Errorf("failed to add Accepted condition to ProvReq %s/%s, err: %v", provReq.Namespace, provReq.Name, err) + } + continue + } + podsToCreate = append(podsToCreate, pods...) + } + if len(podsToCreate) == 0 { + return nil + } + // Scheduling the pods to reserve capacity for provisioning request. + if _, _, err = p.injector.TrySchedulePods(ctx.ClusterSnapshot, podsToCreate, scheduling.ScheduleAnywhere, false); err != nil { + return err + } + return nil +} diff --git a/cluster-autoscaler/processors/provreq/processor_test.go b/cluster-autoscaler/processors/provreq/processor_test.go index d485e7bb84c5..2a592fd6ee4a 100644 --- a/cluster-autoscaler/processors/provreq/processor_test.go +++ b/cluster-autoscaler/processors/provreq/processor_test.go @@ -17,19 +17,26 @@ limitations under the License. package provreq import ( + "context" "testing" "time" "github.com/stretchr/testify/assert" + apiv1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/kubernetes/pkg/scheduler/framework" "k8s.io/autoscaler/cluster-autoscaler/apis/provisioningrequest/autoscaling.x-k8s.io/v1beta1" + "k8s.io/autoscaler/cluster-autoscaler/config" + . "k8s.io/autoscaler/cluster-autoscaler/core/test" "k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/conditions" "k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/provreqclient" "k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/provreqwrapper" + "k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot" + "k8s.io/autoscaler/cluster-autoscaler/simulator/scheduling" ) -func TestProcess(t *testing.T) { +func TestRefresh(t *testing.T) { now := time.Now() dayAgo := now.Add(-1 * 24 * time.Hour) weekAgo := now.Add(-1 * defaultExpirationTime).Add(-1 * 5 * time.Minute) @@ -146,8 +153,8 @@ func TestProcess(t *testing.T) { additionalPr := provreqclient.ProvisioningRequestWrapperForTesting("namespace", "additional") additionalPr.CreationTimestamp = metav1.NewTime(weekAgo) additionalPr.Spec.ProvisioningClassName = v1beta1.ProvisioningClassCheckCapacity - processor := provReqProcessor{func() time.Time { return now }, 1, provreqclient.NewFakeProvisioningRequestClient(nil, t, pr, additionalPr)} - processor.Process([]*provreqwrapper.ProvisioningRequest{pr, additionalPr}) + processor := provReqProcessor{func() time.Time { return now }, 1, provreqclient.NewFakeProvisioningRequestClient(nil, t, pr, additionalPr), nil} + processor.refresh([]*provreqwrapper.ProvisioningRequest{pr, additionalPr}) assert.ElementsMatch(t, test.wantConditions, pr.Status.Conditions) if len(test.conditions) == len(test.wantConditions) { assert.ElementsMatch(t, []metav1.Condition{ @@ -164,3 +171,78 @@ func TestProcess(t *testing.T) { } } } + +type fakeInjector struct { + pods []*apiv1.Pod +} + +func (f *fakeInjector) TrySchedulePods(clusterSnapshot clustersnapshot.ClusterSnapshot, pods []*apiv1.Pod, isNodeAcceptable func(*framework.NodeInfo) bool, breakOnFailure bool) ([]scheduling.Status, int, error) { + f.pods = pods + return nil, 0, nil +} + +func TestBookCapacity(t *testing.T) { + testCases := []struct { + name string + conditions []string + provReq *provreqwrapper.ProvisioningRequest + capacityIsBooked bool + }{ + { + name: "ProvReq is new, check-capacity class", + provReq: provreqwrapper.BuildTestProvisioningRequest("ns", "pr", "2", "100m", "", 10, false, time.Now(), v1beta1.ProvisioningClassCheckCapacity), + capacityIsBooked: false, + }, + { + name: "ProvReq is Failed, best-effort-atomic class", + conditions: []string{v1beta1.Failed}, + provReq: provreqwrapper.BuildTestProvisioningRequest("ns", "pr", "2", "100m", "", 10, false, time.Now(), v1beta1.ProvisioningClassBestEffortAtomicScaleUp), + capacityIsBooked: false, + }, + { + name: "ProvReq is Provisioned, unknown class", + conditions: []string{v1beta1.Provisioned}, + provReq: provreqwrapper.BuildTestProvisioningRequest("ns", "pr", "2", "100m", "", 10, false, time.Now(), "unknown"), + capacityIsBooked: false, + }, + { + name: "ProvReq is Provisioned, capacity should be booked, check-capacity class", + conditions: []string{v1beta1.Provisioned}, + provReq: provreqwrapper.BuildTestProvisioningRequest("ns", "pr", "2", "100m", "", 10, false, time.Now(), v1beta1.ProvisioningClassCheckCapacity), + capacityIsBooked: true, + }, + { + name: "ProvReq is Provisioned, capacity should be booked, best-effort-atomic class", + conditions: []string{v1beta1.Provisioned}, + provReq: provreqwrapper.BuildTestProvisioningRequest("ns", "pr", "2", "100m", "", 10, false, time.Now(), v1beta1.ProvisioningClassBestEffortAtomicScaleUp), + capacityIsBooked: true, + }, + { + name: "ProvReq has BookingExpired, capacity should not be booked, best-effort-atomic class", + conditions: []string{v1beta1.Provisioned, v1beta1.BookingExpired}, + provReq: provreqwrapper.BuildTestProvisioningRequest("ns", "pr", "2", "100m", "", 10, false, time.Now(), v1beta1.ProvisioningClassBestEffortAtomicScaleUp), + capacityIsBooked: false, + }, + } + for _, test := range testCases { + t.Run(test.name, func(t *testing.T) { + test := test + injector := &fakeInjector{pods: []*apiv1.Pod{}} + for _, condition := range test.conditions { + conditions.AddOrUpdateCondition(test.provReq, condition, metav1.ConditionTrue, "", "", metav1.Now()) + } + + processor := &provReqProcessor{ + now: func() time.Time { return time.Now() }, + client: provreqclient.NewFakeProvisioningRequestClient(context.Background(), t, test.provReq), + maxUpdated: 20, + injector: injector, + } + ctx, _ := NewScaleTestAutoscalingContext(config.AutoscalingOptions{}, nil, nil, nil, nil, nil) + processor.bookCapacity(&ctx) + if (test.capacityIsBooked && len(injector.pods) == 0) || (!test.capacityIsBooked && len(injector.pods) > 0) { + t.Fail() + } + }) + } +} diff --git a/cluster-autoscaler/provisioningrequest/orchestrator/orchestrator.go b/cluster-autoscaler/provisioningrequest/orchestrator/orchestrator.go index a2f7b9116fc3..d59ad2fa9186 100644 --- a/cluster-autoscaler/provisioningrequest/orchestrator/orchestrator.go +++ b/cluster-autoscaler/provisioningrequest/orchestrator/orchestrator.go @@ -21,19 +21,14 @@ import ( appsv1 "k8s.io/api/apps/v1" apiv1 "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/autoscaler/cluster-autoscaler/apis/provisioningrequest/autoscaling.x-k8s.io/v1beta1" "k8s.io/autoscaler/cluster-autoscaler/clusterstate" "k8s.io/autoscaler/cluster-autoscaler/context" "k8s.io/autoscaler/cluster-autoscaler/estimator" "k8s.io/autoscaler/cluster-autoscaler/processors/status" - "k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/conditions" - provreq_pods "k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/pods" "k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/provreqclient" "k8s.io/autoscaler/cluster-autoscaler/simulator/scheduling" ca_errors "k8s.io/autoscaler/cluster-autoscaler/utils/errors" "k8s.io/autoscaler/cluster-autoscaler/utils/taints" - "k8s.io/klog/v2" ca_processors "k8s.io/autoscaler/cluster-autoscaler/processors" schedulerframework "k8s.io/kubernetes/pkg/scheduler/framework" @@ -96,7 +91,6 @@ func (o *provReqOrchestrator) ScaleUp( o.context.ClusterSnapshot.Fork() defer o.context.ClusterSnapshot.Revert() - o.bookCapacity() // unschedulablePods pods should belong to one ProvisioningClass, so only one provClass should try to ScaleUp. for _, provClass := range o.provisioningClasses { @@ -115,35 +109,3 @@ func (o *provReqOrchestrator) ScaleUpToNodeGroupMinSize( ) (*status.ScaleUpStatus, ca_errors.AutoscalerError) { return nil, nil } - -func (o *provReqOrchestrator) bookCapacity() error { - provReqs, err := o.client.ProvisioningRequests() - if err != nil { - return fmt.Errorf("couldn't fetch ProvisioningRequests in the cluster: %v", err) - } - podsToCreate := []*apiv1.Pod{} - for _, provReq := range provReqs { - if conditions.ShouldCapacityBeBooked(provReq) { - pods, err := provreq_pods.PodsForProvisioningRequest(provReq) - if err != nil { - // ClusterAutoscaler was able to create pods before, so we shouldn't have error here. - // If there is an error, mark PR as invalid, because we won't be able to book capacity - // for it anyway. - conditions.AddOrUpdateCondition(provReq, v1beta1.Failed, metav1.ConditionTrue, conditions.FailedToBookCapacityReason, fmt.Sprintf("Couldn't create pods, err: %v", err), metav1.Now()) - if _, err := o.client.UpdateProvisioningRequest(provReq.ProvisioningRequest); err != nil { - klog.Errorf("failed to add Accepted condition to ProvReq %s/%s, err: %v", provReq.Namespace, provReq.Name, err) - } - continue - } - podsToCreate = append(podsToCreate, pods...) - } - } - if len(podsToCreate) == 0 { - return nil - } - // scheduling the pods to reserve capacity for provisioning request with BookCapacity condition - if _, _, err = o.injector.TrySchedulePods(o.context.ClusterSnapshot, podsToCreate, scheduling.ScheduleAnywhere, false); err != nil { - klog.Warningf("Error during capacity booking: %v", err) - } - return nil -} diff --git a/cluster-autoscaler/provisioningrequest/orchestrator/orchestrator_test.go b/cluster-autoscaler/provisioningrequest/orchestrator/orchestrator_test.go index 6e0ead7d6283..658c4f7341c7 100644 --- a/cluster-autoscaler/provisioningrequest/orchestrator/orchestrator_test.go +++ b/cluster-autoscaler/provisioningrequest/orchestrator/orchestrator_test.go @@ -188,10 +188,10 @@ func TestScaleUp(t *testing.T) { scaleUpResult: status.ScaleUpNotNeeded, }, { - name: "capacity in the cluster is booked", - provReqs: []*provreqwrapper.ProvisioningRequest{newCheckCapacityMemProvReq, bookedCapacityProvReq}, + name: "capacity is there, check-capacity class", + provReqs: []*provreqwrapper.ProvisioningRequest{newCheckCapacityMemProvReq}, provReqToScaleUp: newCheckCapacityMemProvReq, - scaleUpResult: status.ScaleUpNoOptionsAvailable, + scaleUpResult: status.ScaleUpSuccessful, }, { name: "unsupported ProvisioningRequest is ignored", @@ -211,12 +211,6 @@ func TestScaleUp(t *testing.T) { provReqToScaleUp: atomicScaleUpProvReq, scaleUpResult: status.ScaleUpNotNeeded, }, - { - name: "some capacity is pre-booked, large atomic scale-up request doesn't fit", - provReqs: []*provreqwrapper.ProvisioningRequest{bookedCapacityProvReq, largeAtomicScaleUpProvReq}, - provReqToScaleUp: largeAtomicScaleUpProvReq, - scaleUpResult: status.ScaleUpNoOptionsAvailable, - }, { name: "capacity is there, large atomic scale-up request doesn't require scale-up", provReqs: []*provreqwrapper.ProvisioningRequest{largeAtomicScaleUpProvReq},