From 0b8fdabc9a9b662a41e5f8a1b7556a33da263b7f Mon Sep 17 00:00:00 2001
From: Yaroslava Serdiuk
Date: Wed, 29 May 2024 14:17:58 +0000
Subject: [PATCH] WIP: BookCapacity for ProvisioningRequest pods

---
 cluster-autoscaler/core/static_autoscaler.go  |  7 ++++-
 cluster-autoscaler/main.go                    | 17 ++++-----
 .../capacityreservation.go                    | 26 +++++++++
 cluster-autoscaler/processors/processors.go   |  4 ++
 .../provreq/provisioning_request_processor.go | 49 ++++++++++++++-
 .../orchestrator/orchestrator.go              | 62 +++-----------
 .../orchestrator/orchestrator_test.go         |  2 +-
 7 files changed, 98 insertions(+), 69 deletions(-)
 create mode 100644 cluster-autoscaler/processors/capacityreservation/capacityreservation.go

diff --git a/cluster-autoscaler/core/static_autoscaler.go b/cluster-autoscaler/core/static_autoscaler.go
index b45733bdbb20..efaf2f11a4b7 100644
--- a/cluster-autoscaler/core/static_autoscaler.go
+++ b/cluster-autoscaler/core/static_autoscaler.go
@@ -532,7 +532,12 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) caerrors.AutoscalerErr
 
 	// finally, filter out pods that are too "young" to safely be considered for a scale-up (delay is configurable)
 	unschedulablePodsToHelp = a.filterOutYoungPods(unschedulablePodsToHelp, currentTime)
-
+	// Book capacity for ProvisioningRequest pods, if a CapacityReservation processor is registered.
+	if a.processors.CapacityReservation != nil {
+		if err := a.processors.CapacityReservation.BookCapacity(a.AutoscalingContext); err != nil {
+			klog.Warningf("Failed to reserve capacity: %v", err)
+		}
+	}
 	preScaleUp := func() time.Time {
 		scaleUpStart := time.Now()
 		metrics.UpdateLastTime(metrics.ScaleUp, scaleUpStart)
diff --git a/cluster-autoscaler/main.go b/cluster-autoscaler/main.go
index c5ff21aaa4e7..0563cbd9e671 100644
--- a/cluster-autoscaler/main.go
+++ b/cluster-autoscaler/main.go
@@ -32,6 +32,7 @@ import (
 	"k8s.io/autoscaler/cluster-autoscaler/core/scaleup/orchestrator"
 	"k8s.io/autoscaler/cluster-autoscaler/debuggingsnapshot"
 	"k8s.io/autoscaler/cluster-autoscaler/loop"
+	"k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/besteffortatomic"
 	"k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/checkcapacity"
 	"k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/provreqclient"
 	"k8s.io/autoscaler/cluster-autoscaler/simulator/predicatechecker"
@@ -496,22 +497,20 @@ func buildAutoscaler(debuggingSnapshotter debuggingsnapshot.DebuggingSnapshotter
 	podListProcessor.AddProcessor(provreq.NewProvisioningRequestPodsFilter(provreq.NewDefautlEventManager()))
 
 	restConfig := kube_util.GetKubeConfig(autoscalingOptions.KubeClientOpts)
-	provreqOrchestrator, err := provreqorchestrator.New(restConfig)
+	client, err := provreqclient.NewProvisioningRequestClient(restConfig)
 	if err != nil {
 		return nil, err
 	}
+	provreqOrchestrator := provreqorchestrator.New(client, []provreqorchestrator.ProvisioningClass{
+		checkcapacity.New(client),
+		besteffortatomic.New(client),
+	})
 
 	scaleUpOrchestrator := provreqorchestrator.NewWrapperOrchestrator(provreqOrchestrator)
 	opts.ScaleUpOrchestrator = scaleUpOrchestrator
-	client, err := provreqclient.NewProvisioningRequestClient(restConfig)
-	if err != nil {
-		return nil, err
-	}
-	provreqProcesor := provreq.NewCombinedProvReqProcessor(client, []provreq.ProvisioningRequestProcessor{checkcapacity.NewCheckCapacityProcessor(client)})
-	if err != nil {
-		return nil, err
-	}
+	provreqProcesor := provreq.NewCombinedProvReqProcessor(client, []provreq.ProvisioningRequestProcessor{checkcapacity.NewCheckCapacityProcessor(client)}, opts.PredicateChecker)
 	opts.LoopStartNotifier = loopstart.NewObserversList([]loopstart.Observer{provreqProcesor})
+	opts.Processors.CapacityReservation = provreqProcesor
 	injector, err := provreq.NewProvisioningRequestPodsInjector(restConfig)
 	if err != nil {
 		return nil, err
diff --git a/cluster-autoscaler/processors/capacityreservation/capacityreservation.go b/cluster-autoscaler/processors/capacityreservation/capacityreservation.go
new file mode 100644
index 000000000000..f861112b3d8c
--- /dev/null
+++ b/cluster-autoscaler/processors/capacityreservation/capacityreservation.go
@@ -0,0 +1,26 @@
+/*
+Copyright 2024 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package capacityreservation
+
+import (
+	"k8s.io/autoscaler/cluster-autoscaler/context"
+)
+
+// CapacityReservation is an interface for reserving capacity in the cluster.
+type CapacityReservation interface {
+	BookCapacity(ctx *context.AutoscalingContext) error
+}
diff --git a/cluster-autoscaler/processors/processors.go b/cluster-autoscaler/processors/processors.go
index 40ad84f4a474..0979015a6c7b 100644
--- a/cluster-autoscaler/processors/processors.go
+++ b/cluster-autoscaler/processors/processors.go
@@ -21,6 +21,7 @@ import (
 	"k8s.io/autoscaler/cluster-autoscaler/observers/nodegroupchange"
 	"k8s.io/autoscaler/cluster-autoscaler/processors/actionablecluster"
 	"k8s.io/autoscaler/cluster-autoscaler/processors/binpacking"
+	"k8s.io/autoscaler/cluster-autoscaler/processors/capacityreservation"
 	"k8s.io/autoscaler/cluster-autoscaler/processors/customresources"
 	"k8s.io/autoscaler/cluster-autoscaler/processors/nodegroupconfig"
 	"k8s.io/autoscaler/cluster-autoscaler/processors/nodegroups"
@@ -70,7 +71,10 @@ type AutoscalingProcessors struct {
 	// * scale-downs per nodegroup
 	// * scale-up failures per nodegroup
 	// * scale-down failures per nodegroup
 	ScaleStateNotifier *nodegroupchange.NodeGroupChangeObserversList
+
+	// CapacityReservation books capacity for ProvisioningRequest pods in the cluster snapshot.
+	CapacityReservation capacityreservation.CapacityReservation
 }
 
 // DefaultProcessors returns default set of processors.
diff --git a/cluster-autoscaler/processors/provreq/provisioning_request_processor.go b/cluster-autoscaler/processors/provreq/provisioning_request_processor.go
index 74dc64e5529b..9dbceddd0827 100644
--- a/cluster-autoscaler/processors/provreq/provisioning_request_processor.go
+++ b/cluster-autoscaler/processors/provreq/provisioning_request_processor.go
@@ -17,9 +17,18 @@ limitations under the License.
 package provreq
 
 import (
-	"k8s.io/autoscaler/cluster-autoscaler/observers/loopstart"
+	"fmt"
+
+	apiv1 "k8s.io/api/core/v1"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/autoscaler/cluster-autoscaler/apis/provisioningrequest/autoscaling.x-k8s.io/v1beta1"
+	"k8s.io/autoscaler/cluster-autoscaler/context"
+	"k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/conditions"
+	provreq_pods "k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/pods"
 	"k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/provreqclient"
 	"k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/provreqwrapper"
+	"k8s.io/autoscaler/cluster-autoscaler/simulator/predicatechecker"
+	"k8s.io/autoscaler/cluster-autoscaler/simulator/scheduling"
 	"k8s.io/klog/v2"
 )
 
@@ -34,11 +43,12 @@
 type CombinedProvReqProcessor struct {
 	client     *provreqclient.ProvisioningRequestClient
 	processors []ProvisioningRequestProcessor
+	injector   *scheduling.HintingSimulator
 }
 
 // NewCombinedProvReqProcessor return new CombinedProvReqProcessor.
-func NewCombinedProvReqProcessor(client *provreqclient.ProvisioningRequestClient, processors []ProvisioningRequestProcessor) loopstart.Observer {
-	return &CombinedProvReqProcessor{client: client, processors: processors}
+func NewCombinedProvReqProcessor(client *provreqclient.ProvisioningRequestClient, processors []ProvisioningRequestProcessor, predicateChecker predicatechecker.PredicateChecker) *CombinedProvReqProcessor {
+	return &CombinedProvReqProcessor{client: client, processors: processors, injector: scheduling.NewHintingSimulator(predicateChecker)}
 }
 
 // Refresh iterates over ProvisioningRequests and updates its conditions/state.
@@ -55,3 +65,36 @@
 
 // CleanUp cleans up internal state
 func (cp *CombinedProvReqProcessor) CleanUp() {}
+
+// BookCapacity reserves capacity in the cluster snapshot for ProvisioningRequests whose capacity should stay booked.
+func (cp *CombinedProvReqProcessor) BookCapacity(ctx *context.AutoscalingContext) error {
+	provReqs, err := cp.client.ProvisioningRequests()
+	if err != nil {
+		return fmt.Errorf("couldn't fetch ProvisioningRequests in the cluster: %v", err)
+	}
+	podsToCreate := []*apiv1.Pod{}
+	for _, provReq := range provReqs {
+		if conditions.ShouldCapacityBeBooked(provReq) {
+			pods, err := provreq_pods.PodsForProvisioningRequest(provReq)
+			if err != nil {
+				// Cluster Autoscaler was able to create the pods before, so we shouldn't get an error here.
+				// If there is an error, mark the ProvisioningRequest as Failed, because we won't be able
+				// to book capacity for it anyway.
+				conditions.AddOrUpdateCondition(provReq, v1beta1.Failed, metav1.ConditionTrue, conditions.FailedToBookCapacityReason, fmt.Sprintf("Couldn't create pods, err: %v", err), metav1.Now())
+				if _, err := cp.client.UpdateProvisioningRequest(provReq.ProvisioningRequest); err != nil {
+					klog.Errorf("failed to add Failed condition to ProvReq %s/%s, err: %v", provReq.Namespace, provReq.Name, err)
+				}
+				continue
+			}
+			podsToCreate = append(podsToCreate, pods...)
+		}
+	}
+	if len(podsToCreate) == 0 {
+		return nil
+	}
+	// Scheduling the pods in the snapshot reserves capacity for ProvisioningRequests with booked capacity.
+	if _, _, err = cp.injector.TrySchedulePods(ctx.ClusterSnapshot, podsToCreate, scheduling.ScheduleAnywhere, false); err != nil {
+		klog.Warningf("Error during capacity booking: %v", err)
+	}
+	return nil
+}
diff --git a/cluster-autoscaler/provisioningrequest/orchestrator/orchestrator.go b/cluster-autoscaler/provisioningrequest/orchestrator/orchestrator.go
index ab86f7193cf7..d59ad2fa9186 100644
--- a/cluster-autoscaler/provisioningrequest/orchestrator/orchestrator.go
+++ b/cluster-autoscaler/provisioningrequest/orchestrator/orchestrator.go
@@ -21,28 +21,21 @@ import (
 
 	appsv1 "k8s.io/api/apps/v1"
 	apiv1 "k8s.io/api/core/v1"
-	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
-	"k8s.io/autoscaler/cluster-autoscaler/apis/provisioningrequest/autoscaling.x-k8s.io/v1beta1"
 	"k8s.io/autoscaler/cluster-autoscaler/clusterstate"
 	"k8s.io/autoscaler/cluster-autoscaler/context"
 	"k8s.io/autoscaler/cluster-autoscaler/estimator"
 	"k8s.io/autoscaler/cluster-autoscaler/processors/status"
-	"k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/besteffortatomic"
-	"k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/checkcapacity"
-	"k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/conditions"
-	provreq_pods "k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/pods"
 	"k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/provreqclient"
 	"k8s.io/autoscaler/cluster-autoscaler/simulator/scheduling"
 	ca_errors "k8s.io/autoscaler/cluster-autoscaler/utils/errors"
 	"k8s.io/autoscaler/cluster-autoscaler/utils/taints"
-	"k8s.io/client-go/rest"
-	"k8s.io/klog/v2"
 
 	ca_processors "k8s.io/autoscaler/cluster-autoscaler/processors"
 	schedulerframework "k8s.io/kubernetes/pkg/scheduler/framework"
 )
 
-type provisioningClass interface {
+// ProvisioningClass is an interface for provisioning mechanisms that handle ProvisioningRequests of a single class.
+type ProvisioningClass interface {
 	Provision([]*apiv1.Pod, []*apiv1.Node, []*appsv1.DaemonSet,
 		map[string]*schedulerframework.NodeInfo) (*status.ScaleUpStatus, ca_errors.AutoscalerError)
 	Initialize(*context.AutoscalingContext, *ca_processors.AutoscalingProcessors, *clusterstate.ClusterStateRegistry,
@@ -55,23 +48,15 @@ type provReqOrchestrator struct {
 	context             *context.AutoscalingContext
 	client              *provreqclient.ProvisioningRequestClient
 	injector            *scheduling.HintingSimulator
-	provisioningClasses []provisioningClass
+	provisioningClasses []ProvisioningClass
 }
 
 // New return new orchestrator.
-func New(kubeConfig *rest.Config) (*provReqOrchestrator, error) {
-	client, err := provreqclient.NewProvisioningRequestClient(kubeConfig)
-	if err != nil {
-		return nil, err
-	}
-
+func New(client *provreqclient.ProvisioningRequestClient, classes []ProvisioningClass) *provReqOrchestrator {
 	return &provReqOrchestrator{
-		client: client,
-		provisioningClasses: []provisioningClass{
-			checkcapacity.New(client),
-			besteffortatomic.New(client),
-		},
-	}, nil
+		client:              client,
+		provisioningClasses: classes,
+	}
 }
 
 // Initialize initialize orchestrator.
@@ -106,7 +91,6 @@ func (o *provReqOrchestrator) ScaleUp(
 
 	o.context.ClusterSnapshot.Fork()
 	defer o.context.ClusterSnapshot.Revert()
-	o.bookCapacity()
 
 	// unschedulablePods pods should belong to one ProvisioningClass, so only one provClass should try to ScaleUp.
 	for _, provClass := range o.provisioningClasses {
@@ -125,35 +109,3 @@ func (o *provReqOrchestrator) ScaleUpToNodeGroupMinSize(
 ) (*status.ScaleUpStatus, ca_errors.AutoscalerError) {
 	return nil, nil
 }
-
-func (o *provReqOrchestrator) bookCapacity() error {
-	provReqs, err := o.client.ProvisioningRequests()
-	if err != nil {
-		return fmt.Errorf("couldn't fetch ProvisioningRequests in the cluster: %v", err)
-	}
-	podsToCreate := []*apiv1.Pod{}
-	for _, provReq := range provReqs {
-		if conditions.ShouldCapacityBeBooked(provReq) {
-			pods, err := provreq_pods.PodsForProvisioningRequest(provReq)
-			if err != nil {
-				// ClusterAutoscaler was able to create pods before, so we shouldn't have error here.
-				// If there is an error, mark PR as invalid, because we won't be able to book capacity
-				// for it anyway.
-				conditions.AddOrUpdateCondition(provReq, v1beta1.Failed, metav1.ConditionTrue, conditions.FailedToBookCapacityReason, fmt.Sprintf("Couldn't create pods, err: %v", err), metav1.Now())
-				if _, err := o.client.UpdateProvisioningRequest(provReq.ProvisioningRequest); err != nil {
-					klog.Errorf("failed to add Accepted condition to ProvReq %s/%s, err: %v", provReq.Namespace, provReq.Name, err)
-				}
-				continue
-			}
-			podsToCreate = append(podsToCreate, pods...)
-		}
-	}
-	if len(podsToCreate) == 0 {
-		return nil
-	}
-	// scheduling the pods to reserve capacity for provisioning request with BookCapacity condition
-	if _, _, err = o.injector.TrySchedulePods(o.context.ClusterSnapshot, podsToCreate, scheduling.ScheduleAnywhere, false); err != nil {
-		klog.Warningf("Error during capacity booking: %v", err)
-	}
-	return nil
-}
diff --git a/cluster-autoscaler/provisioningrequest/orchestrator/orchestrator_test.go b/cluster-autoscaler/provisioningrequest/orchestrator/orchestrator_test.go
index fa2fd55576c3..6e0ead7d6283 100644
--- a/cluster-autoscaler/provisioningrequest/orchestrator/orchestrator_test.go
+++ b/cluster-autoscaler/provisioningrequest/orchestrator/orchestrator_test.go
@@ -334,7 +334,7 @@ func setupTest(t *testing.T, nodes []*apiv1.Node, prs []*provreqwrapper.Provisio
 
 	orchestrator := &provReqOrchestrator{
 		client:              client,
-		provisioningClasses: []provisioningClass{checkcapacity.New(client), besteffortatomic.New(client)},
+		provisioningClasses: []ProvisioningClass{checkcapacity.New(client), besteffortatomic.New(client)},
 	}
 	orchestrator.Initialize(&autoscalingContext, processors, clusterState, estimatorBuilder, taints.TaintConfig{})
 	return orchestrator, nodeInfos