From 09ba963d282221e398f079b1a94955adbbc165e6 Mon Sep 17 00:00:00 2001 From: Yaroslava Serdiuk Date: Wed, 29 May 2024 14:17:58 +0000 Subject: [PATCH] BookCapacity for ProvisioningRequest pods --- cluster-autoscaler/core/static_autoscaler.go | 5 +- cluster-autoscaler/core/test/common.go | 2 + cluster-autoscaler/main.go | 3 +- .../capacityreservation.go | 39 +++++++++ cluster-autoscaler/processors/processors.go | 5 +- ...ning_request_processor.go => processor.go} | 54 +++++++++++- .../processors/provreq/processor_test.go | 84 ++++++++++++++++++- .../orchestrator/orchestrator.go | 38 --------- .../orchestrator/orchestrator_test.go | 12 +-- 9 files changed, 188 insertions(+), 54 deletions(-) create mode 100644 cluster-autoscaler/processors/capacityreservation/capacityreservation.go rename cluster-autoscaler/processors/provreq/{provisioning_request_processor.go => processor.go} (65%) diff --git a/cluster-autoscaler/core/static_autoscaler.go b/cluster-autoscaler/core/static_autoscaler.go index b45733bdbb20..efaf2f11a4b7 100644 --- a/cluster-autoscaler/core/static_autoscaler.go +++ b/cluster-autoscaler/core/static_autoscaler.go @@ -532,7 +532,10 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) caerrors.AutoscalerErr // finally, filter out pods that are too "young" to safely be considered for a scale-up (delay is configurable) unschedulablePodsToHelp = a.filterOutYoungPods(unschedulablePodsToHelp, currentTime) - + err = a.processors.CapacityReservation.BookCapacity(a.AutoscalingContext) + if err != nil { + klog.Warningf("Failed to reserve capacity: %v", err) + } preScaleUp := func() time.Time { scaleUpStart := time.Now() metrics.UpdateLastTime(metrics.ScaleUp, scaleUpStart) diff --git a/cluster-autoscaler/core/test/common.go b/cluster-autoscaler/core/test/common.go index 0bbb6bdc8815..d4f3b9cf9c16 100644 --- a/cluster-autoscaler/core/test/common.go +++ b/cluster-autoscaler/core/test/common.go @@ -39,6 +39,7 @@ import ( "k8s.io/autoscaler/cluster-autoscaler/processors/actionablecluster" "k8s.io/autoscaler/cluster-autoscaler/processors/binpacking" processor_callbacks "k8s.io/autoscaler/cluster-autoscaler/processors/callbacks" + "k8s.io/autoscaler/cluster-autoscaler/processors/capacityreservation" "k8s.io/autoscaler/cluster-autoscaler/processors/customresources" "k8s.io/autoscaler/cluster-autoscaler/processors/nodegroupconfig" "k8s.io/autoscaler/cluster-autoscaler/processors/nodegroups" @@ -195,6 +196,7 @@ func NewTestProcessors(context *context.AutoscalingContext) *processors.Autoscal ActionableClusterProcessor: actionablecluster.NewDefaultActionableClusterProcessor(), ScaleDownCandidatesNotifier: scaledowncandidates.NewObserversList(), ScaleStateNotifier: nodegroupchange.NewNodeGroupChangeObserversList(), + CapacityReservation: capacityreservation.DefaultCapacityReservationProcessor(), } } diff --git a/cluster-autoscaler/main.go b/cluster-autoscaler/main.go index 0402d5cd91c5..0df2df4c31ce 100644 --- a/cluster-autoscaler/main.go +++ b/cluster-autoscaler/main.go @@ -508,11 +508,12 @@ func buildAutoscaler(debuggingSnapshotter debuggingsnapshot.DebuggingSnapshotter scaleUpOrchestrator := provreqorchestrator.NewWrapperOrchestrator(provreqOrchestrator) opts.ScaleUpOrchestrator = scaleUpOrchestrator - provreqProcesor := provreq.NewProvReqProcessor(client) + provreqProcesor := provreq.NewProvReqProcessor(client, opts.PredicateChecker) if err != nil { return nil, err } opts.LoopStartNotifier = loopstart.NewObserversList([]loopstart.Observer{provreqProcesor}) + 
opts.Processors.CapacityReservation = provreqProcesor injector, err := provreq.NewProvisioningRequestPodsInjector(restConfig) if err != nil { return nil, err diff --git a/cluster-autoscaler/processors/capacityreservation/capacityreservation.go b/cluster-autoscaler/processors/capacityreservation/capacityreservation.go new file mode 100644 index 000000000000..446fe53c9b42 --- /dev/null +++ b/cluster-autoscaler/processors/capacityreservation/capacityreservation.go @@ -0,0 +1,39 @@ +/* +Copyright 2024 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package capacityreservation + +import ( + "k8s.io/autoscaler/cluster-autoscaler/context" +) + +// CapacityReservation is interface to reserve capacity in the cluster. +type CapacityReservation interface { + BookCapacity(ctx *context.AutoscalingContext) error +} + +// NoOpCapacityReservation is no-op implementation of CapacityReservation interface. +type NoOpCapacityReservation struct{} + +// DefaultCapacityReservationProcessor returns NoOpCapacityReservation. +func DefaultCapacityReservationProcessor() CapacityReservation { + return &NoOpCapacityReservation{} +} + +// BookCapacity does nothing in NoOpCapacityReservation. +func (c *NoOpCapacityReservation) BookCapacity(ctx *context.AutoscalingContext) error { + return nil +} diff --git a/cluster-autoscaler/processors/processors.go b/cluster-autoscaler/processors/processors.go index 40ad84f4a474..0de7865b6aed 100644 --- a/cluster-autoscaler/processors/processors.go +++ b/cluster-autoscaler/processors/processors.go @@ -21,6 +21,7 @@ import ( "k8s.io/autoscaler/cluster-autoscaler/observers/nodegroupchange" "k8s.io/autoscaler/cluster-autoscaler/processors/actionablecluster" "k8s.io/autoscaler/cluster-autoscaler/processors/binpacking" + "k8s.io/autoscaler/cluster-autoscaler/processors/capacityreservation" "k8s.io/autoscaler/cluster-autoscaler/processors/customresources" "k8s.io/autoscaler/cluster-autoscaler/processors/nodegroupconfig" "k8s.io/autoscaler/cluster-autoscaler/processors/nodegroups" @@ -70,7 +71,8 @@ type AutoscalingProcessors struct { // * scale-downs per nodegroup // * scale-up failures per nodegroup // * scale-down failures per nodegroup - ScaleStateNotifier *nodegroupchange.NodeGroupChangeObserversList + ScaleStateNotifier *nodegroupchange.NodeGroupChangeObserversList + CapacityReservation capacityreservation.CapacityReservation } // DefaultProcessors returns default set of processors. 
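// EDITOR'S NOTE (illustration only, not part of the patch): the hunks above add a
// CapacityReservation extension point to AutoscalingProcessors, defaulting to the
// no-op implementation, and StaticAutoscaler.RunOnce now invokes it right before the
// scale-up phase, while main.go plugs in the ProvisioningRequest processor. Any type
// with a BookCapacity(*context.AutoscalingContext) error method satisfies the new
// interface, so the hook can be swapped without touching core code. A minimal sketch,
// assuming an out-of-tree build that assembles its own processor set; the
// loggingReservation type below is hypothetical and merely logs instead of scheduling
// fake pods into the cluster snapshot:
//
//	import (
//		"k8s.io/autoscaler/cluster-autoscaler/context"
//		"k8s.io/klog/v2"
//	)
//
//	type loggingReservation struct{}
//
//	// BookCapacity implements capacityreservation.CapacityReservation.
//	func (l *loggingReservation) BookCapacity(ctx *context.AutoscalingContext) error {
//		klog.V(4).Info("capacity booking hook invoked; nothing reserved")
//		return nil
//	}
//
//	// Wiring mirrors what main.go does with the ProvisioningRequest processor:
//	// opts.Processors.CapacityReservation = &loggingReservation{}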
@@ -101,6 +103,7 @@ func DefaultProcessors(options config.AutoscalingOptions) *AutoscalingProcessors TemplateNodeInfoProvider: nodeinfosprovider.NewDefaultTemplateNodeInfoProvider(nil, false), ScaleDownCandidatesNotifier: scaledowncandidates.NewObserversList(), ScaleStateNotifier: nodegroupchange.NewNodeGroupChangeObserversList(), + CapacityReservation: capacityreservation.DefaultCapacityReservationProcessor(), } } diff --git a/cluster-autoscaler/processors/provreq/provisioning_request_processor.go b/cluster-autoscaler/processors/provreq/processor.go similarity index 65% rename from cluster-autoscaler/processors/provreq/provisioning_request_processor.go rename to cluster-autoscaler/processors/provreq/processor.go index 816fd22b0e29..4379867a0455 100644 --- a/cluster-autoscaler/processors/provreq/provisioning_request_processor.go +++ b/cluster-autoscaler/processors/provreq/processor.go @@ -17,16 +17,24 @@ limitations under the License. package provreq import ( + "fmt" "time" + apiv1 "k8s.io/api/core/v1" apimeta "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/autoscaler/cluster-autoscaler/apis/provisioningrequest/autoscaling.x-k8s.io/v1beta1" + "k8s.io/autoscaler/cluster-autoscaler/context" "k8s.io/autoscaler/cluster-autoscaler/provisioningrequest" "k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/conditions" + provreq_pods "k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/pods" "k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/provreqclient" "k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/provreqwrapper" + "k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot" + "k8s.io/autoscaler/cluster-autoscaler/simulator/predicatechecker" + "k8s.io/autoscaler/cluster-autoscaler/simulator/scheduling" "k8s.io/klog/v2" + "k8s.io/kubernetes/pkg/scheduler/framework" ) const ( @@ -36,15 +44,20 @@ const ( defaultMaxUpdated = 20 ) +type injector interface { + TrySchedulePods(clusterSnapshot clustersnapshot.ClusterSnapshot, pods []*apiv1.Pod, isNodeAcceptable func(*framework.NodeInfo) bool, breakOnFailure bool) ([]scheduling.Status, int, error) +} + type provReqProcessor struct { now func() time.Time maxUpdated int client *provreqclient.ProvisioningRequestClient + injector injector } // NewProvReqProcessor return ProvisioningRequestProcessor. -func NewProvReqProcessor(client *provreqclient.ProvisioningRequestClient) *provReqProcessor { - return &provReqProcessor{now: time.Now, maxUpdated: defaultMaxUpdated, client: client} +func NewProvReqProcessor(client *provreqclient.ProvisioningRequestClient, predicateChecker predicatechecker.PredicateChecker) *provReqProcessor { + return &provReqProcessor{now: time.Now, maxUpdated: defaultMaxUpdated, client: client, injector: scheduling.NewHintingSimulator(predicateChecker)} } // Refresh implements loop.Observer interface and will be run at the start @@ -108,5 +121,40 @@ func (p *provReqProcessor) Process(provReqs []*provreqwrapper.ProvisioningReques } } -// Cleanup cleans up internal state. +// CleanUp cleans up internal state func (p *provReqProcessor) CleanUp() {} + +// BookCapacity schedule fake pods for ProvisioningRequest that should have reserved capacity +// in the cluster. 
+func (p *provReqProcessor) BookCapacity(ctx *context.AutoscalingContext) error { + provReqs, err := p.client.ProvisioningRequests() + if err != nil { + return fmt.Errorf("couldn't fetch ProvisioningRequests in the cluster: %v", err) + } + podsToCreate := []*apiv1.Pod{} + for _, provReq := range provReqs { + if !conditions.ShouldCapacityBeBooked(provReq) { + continue + } + pods, err := provreq_pods.PodsForProvisioningRequest(provReq) + if err != nil { + // ClusterAutoscaler was able to create pods before, so we shouldn't have error here. + // If there is an error, mark PR as invalid, because we won't be able to book capacity + // for it anyway. + conditions.AddOrUpdateCondition(provReq, v1beta1.Failed, metav1.ConditionTrue, conditions.FailedToBookCapacityReason, fmt.Sprintf("Couldn't create pods, err: %v", err), metav1.Now()) + if _, err := p.client.UpdateProvisioningRequest(provReq.ProvisioningRequest); err != nil { + klog.Errorf("failed to add Accepted condition to ProvReq %s/%s, err: %v", provReq.Namespace, provReq.Name, err) + } + continue + } + podsToCreate = append(podsToCreate, pods...) + } + if len(podsToCreate) == 0 { + return nil + } + // Scheduling the pods to reserve capacity for provisioning request. + if _, _, err = p.injector.TrySchedulePods(ctx.ClusterSnapshot, podsToCreate, scheduling.ScheduleAnywhere, false); err != nil { + return err + } + return nil +} diff --git a/cluster-autoscaler/processors/provreq/processor_test.go b/cluster-autoscaler/processors/provreq/processor_test.go index d485e7bb84c5..4efaeb953d54 100644 --- a/cluster-autoscaler/processors/provreq/processor_test.go +++ b/cluster-autoscaler/processors/provreq/processor_test.go @@ -17,16 +17,23 @@ limitations under the License. package provreq import ( + "context" "testing" "time" "github.com/stretchr/testify/assert" + apiv1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/kubernetes/pkg/scheduler/framework" "k8s.io/autoscaler/cluster-autoscaler/apis/provisioningrequest/autoscaling.x-k8s.io/v1beta1" + "k8s.io/autoscaler/cluster-autoscaler/config" + . 
"k8s.io/autoscaler/cluster-autoscaler/core/test" "k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/conditions" "k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/provreqclient" "k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/provreqwrapper" + "k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot" + "k8s.io/autoscaler/cluster-autoscaler/simulator/scheduling" ) func TestProcess(t *testing.T) { @@ -146,7 +153,7 @@ func TestProcess(t *testing.T) { additionalPr := provreqclient.ProvisioningRequestWrapperForTesting("namespace", "additional") additionalPr.CreationTimestamp = metav1.NewTime(weekAgo) additionalPr.Spec.ProvisioningClassName = v1beta1.ProvisioningClassCheckCapacity - processor := provReqProcessor{func() time.Time { return now }, 1, provreqclient.NewFakeProvisioningRequestClient(nil, t, pr, additionalPr)} + processor := provReqProcessor{func() time.Time { return now }, 1, provreqclient.NewFakeProvisioningRequestClient(nil, t, pr, additionalPr), nil} processor.Process([]*provreqwrapper.ProvisioningRequest{pr, additionalPr}) assert.ElementsMatch(t, test.wantConditions, pr.Status.Conditions) if len(test.conditions) == len(test.wantConditions) { @@ -164,3 +171,78 @@ func TestProcess(t *testing.T) { } } } + +type fakeInjector struct { + pods []*apiv1.Pod +} + +func (f *fakeInjector) TrySchedulePods(clusterSnapshot clustersnapshot.ClusterSnapshot, pods []*apiv1.Pod, isNodeAcceptable func(*framework.NodeInfo) bool, breakOnFailure bool) ([]scheduling.Status, int, error) { + f.pods = pods + return nil, 0, nil +} + +func TestBookCapacity(t *testing.T) { + testCases := []struct { + name string + conditions []string + provReq *provreqwrapper.ProvisioningRequest + capacityIsBooked bool + }{ + { + name: "ProvReq is new, check-capacity class", + provReq: provreqwrapper.BuildTestProvisioningRequest("ns", "pr", "2", "100m", "", 10, false, time.Now(), v1beta1.ProvisioningClassCheckCapacity), + capacityIsBooked: false, + }, + { + name: "ProvReq is Failed, best-effort-atomic class", + conditions: []string{v1beta1.Failed}, + provReq: provreqwrapper.BuildTestProvisioningRequest("ns", "pr", "2", "100m", "", 10, false, time.Now(), v1beta1.ProvisioningClassBestEffortAtomicScaleUp), + capacityIsBooked: false, + }, + { + name: "ProvReq is Provisioned, unknown class", + conditions: []string{v1beta1.Provisioned}, + provReq: provreqwrapper.BuildTestProvisioningRequest("ns", "pr", "2", "100m", "", 10, false, time.Now(), "unknown"), + capacityIsBooked: false, + }, + { + name: "ProvReq is Provisioned, capacity should be booked, check-capacity class", + conditions: []string{v1beta1.Provisioned}, + provReq: provreqwrapper.BuildTestProvisioningRequest("ns", "pr", "2", "100m", "", 10, false, time.Now(), v1beta1.ProvisioningClassCheckCapacity), + capacityIsBooked: true, + }, + { + name: "ProvReq is Provisioned, capacity should be booked, best-effort-atomic class", + conditions: []string{v1beta1.Provisioned}, + provReq: provreqwrapper.BuildTestProvisioningRequest("ns", "pr", "2", "100m", "", 10, false, time.Now(), v1beta1.ProvisioningClassBestEffortAtomicScaleUp), + capacityIsBooked: true, + }, + { + name: "ProvReq has BookingExpired, capacity should not be booked, best-effort-atomic class", + conditions: []string{v1beta1.Provisioned, v1beta1.BookingExpired}, + provReq: provreqwrapper.BuildTestProvisioningRequest("ns", "pr", "2", "100m", "", 10, false, time.Now(), v1beta1.ProvisioningClassBestEffortAtomicScaleUp), + capacityIsBooked: false, + }, + } + for _, test := range testCases { + 
t.Run(test.name, func(t *testing.T) { + test := test + injector := &fakeInjector{pods: []*apiv1.Pod{}} + for _, condition := range test.conditions { + conditions.AddOrUpdateCondition(test.provReq, condition, metav1.ConditionTrue, "", "", metav1.Now()) + } + + processor := &provReqProcessor{ + now: func() time.Time { return time.Now() }, + client: provreqclient.NewFakeProvisioningRequestClient(context.Background(), t, test.provReq), + maxUpdated: 20, + injector: injector, + } + ctx, _ := NewScaleTestAutoscalingContext(config.AutoscalingOptions{}, nil, nil, nil, nil, nil) + processor.BookCapacity(&ctx) + if (test.capacityIsBooked && len(injector.pods) == 0) || (!test.capacityIsBooked && len(injector.pods) > 0) { + t.Fail() + } + }) + } +} diff --git a/cluster-autoscaler/provisioningrequest/orchestrator/orchestrator.go b/cluster-autoscaler/provisioningrequest/orchestrator/orchestrator.go index a2f7b9116fc3..d59ad2fa9186 100644 --- a/cluster-autoscaler/provisioningrequest/orchestrator/orchestrator.go +++ b/cluster-autoscaler/provisioningrequest/orchestrator/orchestrator.go @@ -21,19 +21,14 @@ import ( appsv1 "k8s.io/api/apps/v1" apiv1 "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/autoscaler/cluster-autoscaler/apis/provisioningrequest/autoscaling.x-k8s.io/v1beta1" "k8s.io/autoscaler/cluster-autoscaler/clusterstate" "k8s.io/autoscaler/cluster-autoscaler/context" "k8s.io/autoscaler/cluster-autoscaler/estimator" "k8s.io/autoscaler/cluster-autoscaler/processors/status" - "k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/conditions" - provreq_pods "k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/pods" "k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/provreqclient" "k8s.io/autoscaler/cluster-autoscaler/simulator/scheduling" ca_errors "k8s.io/autoscaler/cluster-autoscaler/utils/errors" "k8s.io/autoscaler/cluster-autoscaler/utils/taints" - "k8s.io/klog/v2" ca_processors "k8s.io/autoscaler/cluster-autoscaler/processors" schedulerframework "k8s.io/kubernetes/pkg/scheduler/framework" @@ -96,7 +91,6 @@ func (o *provReqOrchestrator) ScaleUp( o.context.ClusterSnapshot.Fork() defer o.context.ClusterSnapshot.Revert() - o.bookCapacity() // unschedulablePods pods should belong to one ProvisioningClass, so only one provClass should try to ScaleUp. for _, provClass := range o.provisioningClasses { @@ -115,35 +109,3 @@ func (o *provReqOrchestrator) ScaleUpToNodeGroupMinSize( ) (*status.ScaleUpStatus, ca_errors.AutoscalerError) { return nil, nil } - -func (o *provReqOrchestrator) bookCapacity() error { - provReqs, err := o.client.ProvisioningRequests() - if err != nil { - return fmt.Errorf("couldn't fetch ProvisioningRequests in the cluster: %v", err) - } - podsToCreate := []*apiv1.Pod{} - for _, provReq := range provReqs { - if conditions.ShouldCapacityBeBooked(provReq) { - pods, err := provreq_pods.PodsForProvisioningRequest(provReq) - if err != nil { - // ClusterAutoscaler was able to create pods before, so we shouldn't have error here. - // If there is an error, mark PR as invalid, because we won't be able to book capacity - // for it anyway. 
- conditions.AddOrUpdateCondition(provReq, v1beta1.Failed, metav1.ConditionTrue, conditions.FailedToBookCapacityReason, fmt.Sprintf("Couldn't create pods, err: %v", err), metav1.Now()) - if _, err := o.client.UpdateProvisioningRequest(provReq.ProvisioningRequest); err != nil { - klog.Errorf("failed to add Accepted condition to ProvReq %s/%s, err: %v", provReq.Namespace, provReq.Name, err) - } - continue - } - podsToCreate = append(podsToCreate, pods...) - } - } - if len(podsToCreate) == 0 { - return nil - } - // scheduling the pods to reserve capacity for provisioning request with BookCapacity condition - if _, _, err = o.injector.TrySchedulePods(o.context.ClusterSnapshot, podsToCreate, scheduling.ScheduleAnywhere, false); err != nil { - klog.Warningf("Error during capacity booking: %v", err) - } - return nil -} diff --git a/cluster-autoscaler/provisioningrequest/orchestrator/orchestrator_test.go b/cluster-autoscaler/provisioningrequest/orchestrator/orchestrator_test.go index 6e0ead7d6283..658c4f7341c7 100644 --- a/cluster-autoscaler/provisioningrequest/orchestrator/orchestrator_test.go +++ b/cluster-autoscaler/provisioningrequest/orchestrator/orchestrator_test.go @@ -188,10 +188,10 @@ func TestScaleUp(t *testing.T) { scaleUpResult: status.ScaleUpNotNeeded, }, { - name: "capacity in the cluster is booked", - provReqs: []*provreqwrapper.ProvisioningRequest{newCheckCapacityMemProvReq, bookedCapacityProvReq}, + name: "capacity is there, check-capacity class", + provReqs: []*provreqwrapper.ProvisioningRequest{newCheckCapacityMemProvReq}, provReqToScaleUp: newCheckCapacityMemProvReq, - scaleUpResult: status.ScaleUpNoOptionsAvailable, + scaleUpResult: status.ScaleUpSuccessful, }, { name: "unsupported ProvisioningRequest is ignored", @@ -211,12 +211,6 @@ func TestScaleUp(t *testing.T) { provReqToScaleUp: atomicScaleUpProvReq, scaleUpResult: status.ScaleUpNotNeeded, }, - { - name: "some capacity is pre-booked, large atomic scale-up request doesn't fit", - provReqs: []*provreqwrapper.ProvisioningRequest{bookedCapacityProvReq, largeAtomicScaleUpProvReq}, - provReqToScaleUp: largeAtomicScaleUpProvReq, - scaleUpResult: status.ScaleUpNoOptionsAvailable, - }, { name: "capacity is there, large atomic scale-up request doesn't require scale-up", provReqs: []*provreqwrapper.ProvisioningRequest{largeAtomicScaleUpProvReq},