diff --git a/cluster-autoscaler/core/scaleup/orchestrator/wrapper_orchestrator.go b/cluster-autoscaler/core/scaleup/orchestrator/wrapper_orchestrator.go index 01991b5da2e8..36fe980345ad 100644 --- a/cluster-autoscaler/core/scaleup/orchestrator/wrapper_orchestrator.go +++ b/cluster-autoscaler/core/scaleup/orchestrator/wrapper_orchestrator.go @@ -17,8 +17,6 @@ limitations under the License. package orchestrator import ( - "fmt" - appsv1 "k8s.io/api/apps/v1" apiv1 "k8s.io/api/core/v1" "k8s.io/autoscaler/cluster-autoscaler/clusterstate" @@ -28,10 +26,8 @@ import ( ca_processors "k8s.io/autoscaler/cluster-autoscaler/processors" "k8s.io/autoscaler/cluster-autoscaler/processors/provreq" "k8s.io/autoscaler/cluster-autoscaler/processors/status" - "k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/checkcapacity" "k8s.io/autoscaler/cluster-autoscaler/utils/errors" "k8s.io/autoscaler/cluster-autoscaler/utils/taints" - "k8s.io/client-go/rest" schedulerframework "k8s.io/kubernetes/pkg/scheduler/framework" ) @@ -41,20 +37,16 @@ import ( type WrapperOrchestrator struct { // scaleUpRegularPods indicates that ScaleUp for regular pods will be run in the current CA loop, if they are present. scaleUpRegularPods bool - scaleUpOrchestrator scaleup.Orchestrator + podsOrchestrator scaleup.Orchestrator provReqOrchestrator scaleup.Orchestrator } // NewWrapperOrchestrator return WrapperOrchestrator -func NewWrapperOrchestrator(kubeConfig *rest.Config) (scaleup.Orchestrator, error) { - provReqOrchestrator, err := checkcapacity.New(kubeConfig) - if err != nil { - return nil, fmt.Errorf("failed create ScaleUp orchestrator for ProvisioningRequests, error: %v", err) - } +func NewWrapperOrchestrator(provReqOrchestrator scaleup.Orchestrator) *WrapperOrchestrator { return &WrapperOrchestrator{ - scaleUpOrchestrator: New(), + podsOrchestrator: New(), provReqOrchestrator: provReqOrchestrator, - }, nil + } } // Initialize initializes the orchestrator object with required fields. 
@@ -65,7 +57,7 @@ func (o *WrapperOrchestrator) Initialize( estimatorBuilder estimator.EstimatorBuilder, taintConfig taints.TaintConfig, ) { - o.scaleUpOrchestrator.Initialize(autoscalingContext, processors, clusterStateRegistry, estimatorBuilder, taintConfig) + o.podsOrchestrator.Initialize(autoscalingContext, processors, clusterStateRegistry, estimatorBuilder, taintConfig) o.provReqOrchestrator.Initialize(autoscalingContext, processors, clusterStateRegistry, estimatorBuilder, taintConfig) } @@ -86,7 +78,7 @@ func (o *WrapperOrchestrator) ScaleUp( } if o.scaleUpRegularPods { - return o.scaleUpOrchestrator.ScaleUp(regularPods, nodes, daemonSets, nodeInfos) + return o.podsOrchestrator.ScaleUp(regularPods, nodes, daemonSets, nodeInfos) } return o.provReqOrchestrator.ScaleUp(provReqPods, nodes, daemonSets, nodeInfos) } @@ -110,5 +102,5 @@ func (o *WrapperOrchestrator) ScaleUpToNodeGroupMinSize( nodes []*apiv1.Node, nodeInfos map[string]*schedulerframework.NodeInfo, ) (*status.ScaleUpStatus, errors.AutoscalerError) { - return o.scaleUpOrchestrator.ScaleUpToNodeGroupMinSize(nodes, nodeInfos) + return o.podsOrchestrator.ScaleUpToNodeGroupMinSize(nodes, nodeInfos) } diff --git a/cluster-autoscaler/core/scaleup/orchestrator/wrapper_orchestrator_test.go b/cluster-autoscaler/core/scaleup/orchestrator/wrapper_orchestrator_test.go index 521b1f362599..2aba59ebf6db 100644 --- a/cluster-autoscaler/core/scaleup/orchestrator/wrapper_orchestrator_test.go +++ b/cluster-autoscaler/core/scaleup/orchestrator/wrapper_orchestrator_test.go @@ -42,7 +42,7 @@ const ( func TestScaleUp(t *testing.T) { o := WrapperOrchestrator{ provReqOrchestrator: &fakeScaleUp{provisioningRequestErrorMsg}, - scaleUpOrchestrator: &fakeScaleUp{regularPodsErrorMsg}, + podsOrchestrator: &fakeScaleUp{regularPodsErrorMsg}, } regularPods := []*apiv1.Pod{ BuildTestPod("pod-1", 1, 100), diff --git a/cluster-autoscaler/main.go b/cluster-autoscaler/main.go index 92555ac6acdc..e12f9d002245 100644 --- 
a/cluster-autoscaler/main.go +++ b/cluster-autoscaler/main.go @@ -59,6 +59,7 @@ import ( "k8s.io/autoscaler/cluster-autoscaler/processors/scaledowncandidates" "k8s.io/autoscaler/cluster-autoscaler/processors/scaledowncandidates/emptycandidates" "k8s.io/autoscaler/cluster-autoscaler/processors/scaledowncandidates/previouscandidates" + provreqorchestrator "k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/orchestrator" "k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot" "k8s.io/autoscaler/cluster-autoscaler/simulator/drainability/rules" "k8s.io/autoscaler/cluster-autoscaler/simulator/options" @@ -493,10 +494,12 @@ func buildAutoscaler(debuggingSnapshotter debuggingsnapshot.DebuggingSnapshotter podListProcessor.AddProcessor(provreq.NewProvisioningRequestPodsFilter(provreq.NewDefautlEventManager())) restConfig := kube_util.GetKubeConfig(autoscalingOptions.KubeClientOpts) - scaleUpOrchestrator, err := orchestrator.NewWrapperOrchestrator(restConfig) + provreqOrchestrator, err := provreqorchestrator.New(restConfig) if err != nil { return nil, err } + scaleUpOrchestrator := orchestrator.NewWrapperOrchestrator(provreqOrchestrator) + opts.ScaleUpOrchestrator = scaleUpOrchestrator provreqProcesor, err := provreq.NewCombinedProvReqProcessor(restConfig, []provreq.ProvisioningRequestProcessor{checkcapacity.NewCheckCapacityProcessor()}) if err != nil { diff --git a/cluster-autoscaler/processors/provreq/provisioning_request_injector.go b/cluster-autoscaler/processors/provreq/provisioning_request_injector.go index 3af5f177eafa..3bcd06981622 100644 --- a/cluster-autoscaler/processors/provreq/provisioning_request_injector.go +++ b/cluster-autoscaler/processors/provreq/provisioning_request_injector.go @@ -42,7 +42,7 @@ var SupportedProvisioningClasses = []string{v1beta1.ProvisioningClassCheckCapaci // ProvisioningRequestPodsInjector creates in-memory pods from ProvisioningRequest and inject them to unscheduled pods list. 
type ProvisioningRequestPodsInjector struct { - client provisioningRequestClient + client provreqclient.ProvisioningRequestClient clock clock.PassiveClock } diff --git a/cluster-autoscaler/processors/provreq/provisioning_request_processor.go b/cluster-autoscaler/processors/provreq/provisioning_request_processor.go index 0085c026e365..4c8d593e3573 100644 --- a/cluster-autoscaler/processors/provreq/provisioning_request_processor.go +++ b/cluster-autoscaler/processors/provreq/provisioning_request_processor.go @@ -30,15 +30,10 @@ type ProvisioningRequestProcessor interface { CleanUp() } -type provisioningRequestClient interface { - ProvisioningRequests() ([]*provreqwrapper.ProvisioningRequest, error) - ProvisioningRequest(namespace, name string) (*provreqwrapper.ProvisioningRequest, error) -} - // CombinedProvReqProcessor is responsible for processing ProvisioningRequest for each ProvisioningClass // every CA loop and updating conditions for expired ProvisioningRequests. type CombinedProvReqProcessor struct { - client provisioningRequestClient + client provreqclient.ProvisioningRequestClient processors []ProvisioningRequestProcessor } diff --git a/cluster-autoscaler/provisioningrequest/checkcapacity/orchestrator.go b/cluster-autoscaler/provisioningrequest/checkcapacity/orchestrator.go deleted file mode 100644 index 9e23f37a7fdc..000000000000 --- a/cluster-autoscaler/provisioningrequest/checkcapacity/orchestrator.go +++ /dev/null @@ -1,180 +0,0 @@ -/* -Copyright 2024 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
-See the License for the specific language governing permissions and -limitations under the License. -*/ - -package checkcapacity - -import ( - "fmt" - - appsv1 "k8s.io/api/apps/v1" - apiv1 "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/autoscaler/cluster-autoscaler/clusterstate" - "k8s.io/autoscaler/cluster-autoscaler/context" - "k8s.io/autoscaler/cluster-autoscaler/estimator" - "k8s.io/autoscaler/cluster-autoscaler/processors/status" - "k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/apis/autoscaling.x-k8s.io/v1beta1" - "k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/conditions" - provreq_pods "k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/pods" - "k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/provreqclient" - "k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/provreqwrapper" - "k8s.io/autoscaler/cluster-autoscaler/simulator/scheduling" - "k8s.io/autoscaler/cluster-autoscaler/utils/errors" - "k8s.io/autoscaler/cluster-autoscaler/utils/taints" - "k8s.io/client-go/rest" - "k8s.io/klog/v2" - - ca_processors "k8s.io/autoscaler/cluster-autoscaler/processors" - schedulerframework "k8s.io/kubernetes/pkg/scheduler/framework" -) - -type provisioningRequestClient interface { - ProvisioningRequests() ([]*provreqwrapper.ProvisioningRequest, error) - ProvisioningRequest(namespace, name string) (*provreqwrapper.ProvisioningRequest, error) -} - -type provReqOrchestrator struct { - initialized bool - context *context.AutoscalingContext - client provisioningRequestClient - injector *scheduling.HintingSimulator -} - -// New return new orchestrator. -func New(kubeConfig *rest.Config) (*provReqOrchestrator, error) { - client, err := provreqclient.NewProvisioningRequestClient(kubeConfig) - if err != nil { - return nil, err - } - - return &provReqOrchestrator{client: client}, nil -} - -// Initialize initialize orchestrator. 
-func (o *provReqOrchestrator) Initialize( - autoscalingContext *context.AutoscalingContext, - processors *ca_processors.AutoscalingProcessors, - clusterStateRegistry *clusterstate.ClusterStateRegistry, - estimatorBuilder estimator.EstimatorBuilder, - taintConfig taints.TaintConfig, -) { - o.initialized = true - o.context = autoscalingContext - o.injector = scheduling.NewHintingSimulator(autoscalingContext.PredicateChecker) -} - -// ScaleUp return if there is capacity in the cluster for pods from ProvisioningRequest. -func (o *provReqOrchestrator) ScaleUp( - unschedulablePods []*apiv1.Pod, - nodes []*apiv1.Node, - daemonSets []*appsv1.DaemonSet, - nodeInfos map[string]*schedulerframework.NodeInfo, -) (*status.ScaleUpStatus, errors.AutoscalerError) { - if !o.initialized { - return status.UpdateScaleUpError(&status.ScaleUpStatus{}, errors.NewAutoscalerError(errors.InternalError, "ScaleUpOrchestrator is not initialized")) - } - if len(unschedulablePods) == 0 { - return &status.ScaleUpStatus{}, nil - } - if _, err := o.verifyProvisioningRequestClass(unschedulablePods); err != nil { - return status.UpdateScaleUpError(&status.ScaleUpStatus{}, errors.NewAutoscalerError(errors.InternalError, err.Error())) - } - - o.context.ClusterSnapshot.Fork() - defer o.context.ClusterSnapshot.Revert() - if err := o.bookCapacity(); err != nil { - return nil, errors.NewAutoscalerError(errors.InternalError, err.Error()) - } - scaleUpIsSuccessful, err := o.scaleUp(unschedulablePods) - if err != nil { - return status.UpdateScaleUpError(&status.ScaleUpStatus{}, errors.NewAutoscalerError(errors.InternalError, "error during ScaleUp: %s", err.Error())) - } - if scaleUpIsSuccessful { - return &status.ScaleUpStatus{Result: status.ScaleUpSuccessful}, nil - } - return &status.ScaleUpStatus{Result: status.ScaleUpNoOptionsAvailable}, nil -} - -// ScaleUpToNodeGroupMinSize is no-op. 
-func (o *provReqOrchestrator) ScaleUpToNodeGroupMinSize( - nodes []*apiv1.Node, - nodeInfos map[string]*schedulerframework.NodeInfo) (*status.ScaleUpStatus, errors.AutoscalerError) { - return nil, nil -} - -func (o *provReqOrchestrator) bookCapacity() error { - provReqs, err := o.client.ProvisioningRequests() - if err != nil { - return fmt.Errorf("Couldn't fetch ProvisioningRequests in the cluster: %v", err) - } - podsToCreate := []*apiv1.Pod{} - for _, provReq := range provReqs { - if conditions.ShouldCapacityBeBooked(provReq) { - pods, err := provreq_pods.PodsForProvisioningRequest(provReq) - if err != nil { - // ClusterAutoscaler was able to create pods before, so we shouldn't have error here. - // If there is an error, mark PR as invalid, because we won't be able to book capacity - // for it anyway. - conditions.AddOrUpdateCondition(provReq, v1beta1.Failed, metav1.ConditionTrue, conditions.FailedToBookCapacityReason, fmt.Sprintf("Couldn't create pods, err: %v", err), metav1.Now()) - continue - } - podsToCreate = append(podsToCreate, pods...) - } - } - if len(podsToCreate) == 0 { - return nil - } - // scheduling the pods to reserve capacity for provisioning request with BookCapacity condition - if _, _, err = o.injector.TrySchedulePods(o.context.ClusterSnapshot, podsToCreate, scheduling.ScheduleAnywhere, false); err != nil { - klog.Warningf("Error during capacity booking: %v", err) - } - return nil -} - -// Assuming that all unschedulable pods comes from one ProvisioningRequest. 
-func (o *provReqOrchestrator) scaleUp(unschedulablePods []*apiv1.Pod) (bool, error) { - provReq, err := o.client.ProvisioningRequest(unschedulablePods[0].Namespace, unschedulablePods[0].OwnerReferences[0].Name) - if err != nil { - return false, fmt.Errorf("Failed retrive ProvisioningRequest from unscheduled pods, err: %v", err) - } - st, _, err := o.injector.TrySchedulePods(o.context.ClusterSnapshot, unschedulablePods, scheduling.ScheduleAnywhere, true) - if len(st) < len(unschedulablePods) || err != nil { - conditions.AddOrUpdateCondition(provReq, v1beta1.Provisioned, metav1.ConditionFalse, conditions.CapacityIsNotFoundReason, "Capacity is not found, CA will try to find it later.", metav1.Now()) - return false, err - } - conditions.AddOrUpdateCondition(provReq, v1beta1.Provisioned, metav1.ConditionTrue, conditions.CapacityIsFoundReason, conditions.CapacityIsFoundMsg, metav1.Now()) - return true, nil -} - -// verifyPods check that all pods belong to one ProvisioningRequest that belongs to check-capacity ProvisioningRequst class. 
-func (o *provReqOrchestrator) verifyProvisioningRequestClass(unschedulablePods []*apiv1.Pod) (*provreqwrapper.ProvisioningRequest, error) { - provReq, err := o.client.ProvisioningRequest(unschedulablePods[0].Namespace, unschedulablePods[0].OwnerReferences[0].Name) - if err != nil { - return nil, fmt.Errorf("Failed retrive ProvisioningRequest from unscheduled pods, err: %v", err) - } - if provReq.V1Beta1().Spec.ProvisioningClassName != v1beta1.ProvisioningClassCheckCapacity { - return nil, fmt.Errorf("ProvisioningRequestClass is not %s", v1beta1.ProvisioningClassCheckCapacity) - } - for _, pod := range unschedulablePods { - if pod.Namespace != unschedulablePods[0].Namespace { - return nil, fmt.Errorf("Pods %s and %s are from different namespaces", pod.Name, unschedulablePods[0].Name) - } - if pod.OwnerReferences[0].Name != unschedulablePods[0].OwnerReferences[0].Name { - return nil, fmt.Errorf("Pods %s and %s have different OwnerReference", pod.Name, unschedulablePods[0].Name) - } - } - return provReq, nil -} diff --git a/cluster-autoscaler/provisioningrequest/checkcapacity/scaleup.go b/cluster-autoscaler/provisioningrequest/checkcapacity/scaleup.go new file mode 100644 index 000000000000..24368124b186 --- /dev/null +++ b/cluster-autoscaler/provisioningrequest/checkcapacity/scaleup.go @@ -0,0 +1,105 @@ +/* +Copyright 2024 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package checkcapacity + +import ( + "fmt" + + appsv1 "k8s.io/api/apps/v1" + apiv1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/autoscaler/cluster-autoscaler/clusterstate" + "k8s.io/autoscaler/cluster-autoscaler/context" + "k8s.io/autoscaler/cluster-autoscaler/estimator" + "k8s.io/autoscaler/cluster-autoscaler/processors/status" + "k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/apis/autoscaling.x-k8s.io/v1beta1" + "k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/conditions" + "k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/provreqclient" + "k8s.io/autoscaler/cluster-autoscaler/simulator/scheduling" + "k8s.io/autoscaler/cluster-autoscaler/utils/errors" + "k8s.io/autoscaler/cluster-autoscaler/utils/taints" + + ca_processors "k8s.io/autoscaler/cluster-autoscaler/processors" + schedulerframework "k8s.io/kubernetes/pkg/scheduler/framework" +) + +type checkCapacityScaleUpMode struct { + context *context.AutoscalingContext + client provreqclient.ProvisioningRequestClient + injector *scheduling.HintingSimulator +} + +// New creates a check-capacity scale-up mode that reads ProvisioningRequests through the given client. +func New( + client provreqclient.ProvisioningRequestClient, +) *checkCapacityScaleUpMode { + return &checkCapacityScaleUpMode{client: client} +} + +func (o *checkCapacityScaleUpMode) Initialize( + autoscalingContext *context.AutoscalingContext, + processors *ca_processors.AutoscalingProcessors, + clusterStateRegistry *clusterstate.ClusterStateRegistry, + estimatorBuilder estimator.EstimatorBuilder, + taintConfig taints.TaintConfig, + injector *scheduling.HintingSimulator, +) { + o.context = autoscalingContext + o.injector = injector +} + +// ScaleUp reports whether the cluster has capacity for the pods of a check-capacity ProvisioningRequest; it returns ScaleUpNotTried when the pods belong to another class or the list is empty.
+func (o *checkCapacityScaleUpMode) ScaleUp( + unschedulablePods []*apiv1.Pod, + nodes []*apiv1.Node, + daemonSets []*appsv1.DaemonSet, + nodeInfos map[string]*schedulerframework.NodeInfo, +) (*status.ScaleUpStatus, errors.AutoscalerError) { + if pr, err := provreqclient.VerifyProvisioningRequestClass(o.client, unschedulablePods, v1beta1.ProvisioningClassCheckCapacity); err != nil { + + return status.UpdateScaleUpError(&status.ScaleUpStatus{}, errors.NewAutoscalerError(errors.InternalError, err.Error())) + } else if pr == nil { + return &status.ScaleUpStatus{Result: status.ScaleUpNotTried}, nil + } + + o.context.ClusterSnapshot.Fork() + defer o.context.ClusterSnapshot.Revert() + + scaleUpIsSuccessful, err := o.scaleUp(unschedulablePods) + if err != nil { + return status.UpdateScaleUpError(&status.ScaleUpStatus{}, errors.NewAutoscalerError(errors.InternalError, "error during ScaleUp: %s", err.Error())) + } + if scaleUpIsSuccessful { + return &status.ScaleUpStatus{Result: status.ScaleUpSuccessful}, nil + } + return &status.ScaleUpStatus{Result: status.ScaleUpNoOptionsAvailable}, nil +} + +// scaleUp tries to schedule the pods in the cluster snapshot and records the outcome in the Provisioned condition. It assumes that all unschedulable pods come from one ProvisioningRequest.
+func (o *checkCapacityScaleUpMode) scaleUp(unschedulablePods []*apiv1.Pod) (bool, error) { + provReq, err := o.client.ProvisioningRequest(unschedulablePods[0].Namespace, unschedulablePods[0].OwnerReferences[0].Name) + if err != nil { + return false, fmt.Errorf("failed retrive ProvisioningRequest from unscheduled pods, err: %v", err) + } + st, _, err := o.injector.TrySchedulePods(o.context.ClusterSnapshot, unschedulablePods, scheduling.ScheduleAnywhere, true) + if len(st) < len(unschedulablePods) || err != nil { + conditions.AddOrUpdateCondition(provReq, v1beta1.Provisioned, metav1.ConditionFalse, conditions.CapacityIsNotFoundReason, "Capacity is not found, CA will try to find it later.", metav1.Now()) + return false, err + } + conditions.AddOrUpdateCondition(provReq, v1beta1.Provisioned, metav1.ConditionTrue, conditions.CapacityIsFoundReason, conditions.CapacityIsFoundMsg, metav1.Now()) + return true, nil +} diff --git a/cluster-autoscaler/provisioningrequest/orchestrator/orchestrator.go b/cluster-autoscaler/provisioningrequest/orchestrator/orchestrator.go new file mode 100644 index 000000000000..3433e9f550a9 --- /dev/null +++ b/cluster-autoscaler/provisioningrequest/orchestrator/orchestrator.go @@ -0,0 +1,149 @@ +/* +Copyright 2024 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License.
+*/
+
+package orchestrator
+
+import (
+	"errors"
+	"fmt"
+
+	appsv1 "k8s.io/api/apps/v1"
+	apiv1 "k8s.io/api/core/v1"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/autoscaler/cluster-autoscaler/clusterstate"
+	"k8s.io/autoscaler/cluster-autoscaler/context"
+	"k8s.io/autoscaler/cluster-autoscaler/estimator"
+	"k8s.io/autoscaler/cluster-autoscaler/processors/status"
+	"k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/apis/autoscaling.x-k8s.io/v1beta1"
+	"k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/checkcapacity"
+	"k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/conditions"
+	provreq_pods "k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/pods"
+	"k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/provreqclient"
+	"k8s.io/autoscaler/cluster-autoscaler/simulator/scheduling"
+	ca_errors "k8s.io/autoscaler/cluster-autoscaler/utils/errors"
+	"k8s.io/autoscaler/cluster-autoscaler/utils/taints"
+	"k8s.io/client-go/rest"
+	"k8s.io/klog/v2"
+
+	ca_processors "k8s.io/autoscaler/cluster-autoscaler/processors"
+	schedulerframework "k8s.io/kubernetes/pkg/scheduler/framework"
+)
+
+// scaleUpMode is the scale-up logic of a single Provisioning Class that provReqOrchestrator delegates to.
+type scaleUpMode interface {
+	ScaleUp([]*apiv1.Pod, []*apiv1.Node, []*appsv1.DaemonSet,
+		map[string]*schedulerframework.NodeInfo) (*status.ScaleUpStatus, ca_errors.AutoscalerError)
+	Initialize(*context.AutoscalingContext, *ca_processors.AutoscalingProcessors, *clusterstate.ClusterStateRegistry,
+		estimator.EstimatorBuilder, taints.TaintConfig, *scheduling.HintingSimulator)
+}
+
+// provReqOrchestrator is an orchestrator that contains orchestrators for all supported Provisioning Classes.
+type provReqOrchestrator struct {
+	initialized  bool
+	context      *context.AutoscalingContext
+	client       provreqclient.ProvisioningRequestClient
+	injector     *scheduling.HintingSimulator
+	scaleUpModes []scaleUpMode
+}
+
+// New returns an orchestrator whose ProvisioningRequest client is built from kubeConfig and which runs the check-capacity mode.
+func New(kubeConfig *rest.Config) (*provReqOrchestrator, error) {
+	client, err := provreqclient.NewProvisioningRequestClient(kubeConfig)
+	if err != nil {
+		return nil, err
+	}
+	return &provReqOrchestrator{client: client, scaleUpModes: []scaleUpMode{checkcapacity.New(client)}}, nil
+}
+
+// Initialize stores the autoscaling context and passes a shared HintingSimulator to every scale-up mode.
+func (o *provReqOrchestrator) Initialize(
+	autoscalingContext *context.AutoscalingContext,
+	processors *ca_processors.AutoscalingProcessors,
+	clusterStateRegistry *clusterstate.ClusterStateRegistry,
+	estimatorBuilder estimator.EstimatorBuilder,
+	taintConfig taints.TaintConfig,
+) {
+	o.initialized = true
+	o.context = autoscalingContext
+	o.injector = scheduling.NewHintingSimulator(autoscalingContext.PredicateChecker)
+	for _, mode := range o.scaleUpModes {
+		mode.Initialize(autoscalingContext, processors, clusterStateRegistry, estimatorBuilder, taintConfig, o.injector)
+	}
+}
+
+// ScaleUp forks the cluster snapshot, books capacity and runs ScaleUp of every Provisioning Class mode.
+// As of now CA picks one ProvisioningRequest, so only one mode is expected to return a result other than
+// ScaleUpNotTried; that result is returned together with the joined errors of all modes.
+func (o *provReqOrchestrator) ScaleUp(
+	unschedulablePods []*apiv1.Pod,
+	nodes []*apiv1.Node,
+	daemonSets []*appsv1.DaemonSet,
+	nodeInfos map[string]*schedulerframework.NodeInfo,
+) (*status.ScaleUpStatus, ca_errors.AutoscalerError) {
+	if !o.initialized { // o.context is only set by Initialize; using it earlier would dereference nil
+		return status.UpdateScaleUpError(&status.ScaleUpStatus{}, ca_errors.NewAutoscalerError(ca_errors.InternalError, "ScaleUpOrchestrator is not initialized"))
+	}
+	orchestratorStatus := &status.ScaleUpStatus{Result: status.ScaleUpNotTried}
+	var combinedError error
+	o.context.ClusterSnapshot.Fork()
+	defer o.context.ClusterSnapshot.Revert()
+	o.bookCapacity() // NOTE(review): the returned error is dropped here — confirm that this is intended
+	for _, mode := range o.scaleUpModes {
+		st, err := mode.ScaleUp(unschedulablePods, nodes, daemonSets, nodeInfos)
+		combinedError = errors.Join(combinedError, err) // Join returns the merged error; it does not modify its arguments
+		if st != nil && st.Result != status.ScaleUpNotTried {
+			orchestratorStatus = st
+		}
+	}
+	return orchestratorStatus, ca_errors.ToAutoscalerError(ca_errors.InternalError, combinedError)
+}
+
+// ScaleUpToNodeGroupMinSize doesn't have implementation for ProvisioningRequest Orchestrator.
+func (o *provReqOrchestrator) ScaleUpToNodeGroupMinSize(
+	nodes []*apiv1.Node,
+	nodeInfos map[string]*schedulerframework.NodeInfo,
+) (*status.ScaleUpStatus, ca_errors.AutoscalerError) {
+	return nil, nil
+}
+
+// bookCapacity schedules, in the cluster snapshot, the pods of every ProvisioningRequest selected by conditions.ShouldCapacityBeBooked.
+func (o *provReqOrchestrator) bookCapacity() error {
+	provReqs, err := o.client.ProvisioningRequests()
+	if err != nil {
+		return fmt.Errorf("couldn't fetch ProvisioningRequests in the cluster: %w", err)
+	}
+	var podsToCreate []*apiv1.Pod
+	for _, provReq := range provReqs {
+		if conditions.ShouldCapacityBeBooked(provReq) {
+			pods, err := provreq_pods.PodsForProvisioningRequest(provReq)
+			if err != nil {
+				// ClusterAutoscaler was able to create pods before, so we shouldn't have error here. If there
+				// is an error, mark PR as invalid, because we won't be able to book capacity for it anyway.
+				conditions.AddOrUpdateCondition(provReq, v1beta1.Failed, metav1.ConditionTrue, conditions.FailedToBookCapacityReason, fmt.Sprintf("Couldn't create pods, err: %v", err), metav1.Now())
+				continue
+			}
+			podsToCreate = append(podsToCreate, pods...)
+		}
+	}
+	if len(podsToCreate) == 0 {
+		return nil
+	}
+	// scheduling the pods to reserve capacity for provisioning request with BookCapacity condition
+	if _, _, err = o.injector.TrySchedulePods(o.context.ClusterSnapshot, podsToCreate, scheduling.ScheduleAnywhere, false); err != nil {
+		klog.Warningf("Error during capacity booking: %v", err)
+	}
+	return nil
+}
diff --git a/cluster-autoscaler/provisioningrequest/checkcapacity/orchestrator_test.go b/cluster-autoscaler/provisioningrequest/orchestrator/orchestrator_test.go similarity index 89% rename from cluster-autoscaler/provisioningrequest/checkcapacity/orchestrator_test.go rename to cluster-autoscaler/provisioningrequest/orchestrator/orchestrator_test.go index 62ad7a9d017a..44990fffe445 100644 --- a/cluster-autoscaler/provisioningrequest/checkcapacity/orchestrator_test.go +++ b/cluster-autoscaler/provisioningrequest/orchestrator/orchestrator_test.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package checkcapacity +package orchestrator import ( "context" @@ -31,11 +31,12 @@ import ( .
"k8s.io/autoscaler/cluster-autoscaler/core/test" "k8s.io/autoscaler/cluster-autoscaler/processors/status" "k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/apis/autoscaling.x-k8s.io/v1beta1" + "k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/checkcapacity" "k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/pods" "k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/provreqclient" "k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/provreqwrapper" "k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot" - "k8s.io/autoscaler/cluster-autoscaler/simulator/scheduling" + "k8s.io/autoscaler/cluster-autoscaler/utils/taints" . "k8s.io/autoscaler/cluster-autoscaler/utils/test" "k8s.io/client-go/kubernetes/fake" "k8s.io/kubernetes/pkg/scheduler/framework" @@ -68,8 +69,9 @@ func TestScaleUp(t *testing.T) { err bool }{ { - name: "no ProvisioningRequests", - provReqs: []*provreqwrapper.ProvisioningRequest{}, + name: "no ProvisioningRequests", + provReqs: []*provreqwrapper.ProvisioningRequest{}, + scaleUpResult: status.ScaleUpNotTried, }, { name: "one ProvisioningRequest", @@ -87,7 +89,7 @@ func TestScaleUp(t *testing.T) { name: "pods from different ProvisioningRequest class", provReqs: []*provreqwrapper.ProvisioningRequest{newCpuProvReq, bookedCapacityProvReq, differentProvReqClass}, provReqToScaleUp: differentProvReqClass, - err: true, + scaleUpResult: status.ScaleUpNotTried, }, { name: "some capacity is booked, succesfull ScaleUp", @@ -107,12 +109,12 @@ func TestScaleUp(t *testing.T) { clustersnapshot.InitializeClusterSnapshotOrDie(t, autoscalingContext.ClusterSnapshot, allNodes, nil) prPods, err := pods.PodsForProvisioningRequest(tc.provReqToScaleUp) assert.NoError(t, err) + client := provreqclient.NewFakeProvisioningRequestClient(context.Background(), t, tc.provReqs...) 
orchestrator := &provReqOrchestrator{ - initialized: true, - context: &autoscalingContext, - client: provreqclient.NewFakeProvisioningRequestClient(context.Background(), t, tc.provReqs...), - injector: scheduling.NewHintingSimulator(autoscalingContext.PredicateChecker), + client: client, + scaleUpModes: []scaleUpMode{checkcapacity.New(client)}, } + orchestrator.Initialize(&autoscalingContext, nil, nil, nil, taints.TaintConfig{}) st, err := orchestrator.ScaleUp(prPods, []*apiv1.Node{}, []*v1.DaemonSet{}, map[string]*framework.NodeInfo{}) if !tc.err { assert.NoError(t, err) diff --git a/cluster-autoscaler/provisioningrequest/pods/pods.go b/cluster-autoscaler/provisioningrequest/pods/pods.go index 4e76fb8dea02..ebf2712292c0 100644 --- a/cluster-autoscaler/provisioningrequest/pods/pods.go +++ b/cluster-autoscaler/provisioningrequest/pods/pods.go @@ -48,6 +48,7 @@ func PodsForProvisioningRequest(pr *provreqwrapper.ProvisioningRequest) ([]*v1.P for i, podSet := range podSets { for j := 0; j < int(podSet.Count); j++ { pod, err := controller.GetPodFromTemplate(&podSet.PodTemplate, pr.RuntimeObject(), ownerReference(pr)) + pod.OwnerReferences = []metav1.OwnerReference{*ownerReference(pr)} if err != nil { return nil, fmt.Errorf("while creating pod for pr: %s/%s podSet: %d, got error: %w", pr.Namespace(), pr.Name(), i, err) } diff --git a/cluster-autoscaler/provisioningrequest/provreqclient/client.go b/cluster-autoscaler/provisioningrequest/provreqclient/client.go index 74d9d958b0b8..e88588e9de54 100644 --- a/cluster-autoscaler/provisioningrequest/provreqclient/client.go +++ b/cluster-autoscaler/provisioningrequest/provreqclient/client.go @@ -41,15 +41,21 @@ const ( provisioningRequestClientCallTimeout = 4 * time.Second ) -// ProvisioningRequestClient represents client for v1beta1 ProvReq CRD. -type ProvisioningRequestClient struct { +// ProvisioningRequestClient is an interface with methods for v1beta1 ProvReq CRD client. 
+type ProvisioningRequestClient interface { + ProvisioningRequests() ([]*provreqwrapper.ProvisioningRequest, error) + ProvisioningRequest(namespace, name string) (*provreqwrapper.ProvisioningRequest, error) +} + +// provisioningRequestClient represents client for v1beta1 ProvReq CRD. +type provisioningRequestClient struct { client versioned.Interface provReqLister listers.ProvisioningRequestLister podTemplLister v1.PodTemplateLister } // NewProvisioningRequestClient configures and returns a provisioningRequestClient. -func NewProvisioningRequestClient(kubeConfig *rest.Config) (*ProvisioningRequestClient, error) { +func NewProvisioningRequestClient(kubeConfig *rest.Config) (*provisioningRequestClient, error) { prClient, err := newPRClient(kubeConfig) if err != nil { return nil, fmt.Errorf("Failed to create Provisioning Request client: %v", err) @@ -70,7 +76,7 @@ func NewProvisioningRequestClient(kubeConfig *rest.Config) (*ProvisioningRequest return nil, err } - return &ProvisioningRequestClient{ + return &provisioningRequestClient{ client: prClient, provReqLister: provReqLister, podTemplLister: podTemplLister, @@ -78,7 +84,7 @@ func NewProvisioningRequestClient(kubeConfig *rest.Config) (*ProvisioningRequest } // ProvisioningRequest gets a specific ProvisioningRequest CR. -func (c *ProvisioningRequestClient) ProvisioningRequest(namespace, name string) (*provreqwrapper.ProvisioningRequest, error) { +func (c *provisioningRequestClient) ProvisioningRequest(namespace, name string) (*provreqwrapper.ProvisioningRequest, error) { v1Beta1PR, err := c.provReqLister.ProvisioningRequests(namespace).Get(name) if err != nil { return nil, err @@ -91,7 +97,7 @@ func (c *ProvisioningRequestClient) ProvisioningRequest(namespace, name string) } // ProvisioningRequests gets all ProvisioningRequest CRs. 
-func (c *ProvisioningRequestClient) ProvisioningRequests() ([]*provreqwrapper.ProvisioningRequest, error) { +func (c *provisioningRequestClient) ProvisioningRequests() ([]*provreqwrapper.ProvisioningRequest, error) { v1Beta1PRs, err := c.provReqLister.List(labels.Everything()) if err != nil { return nil, fmt.Errorf("error fetching provisioningRequests: %w", err) @@ -108,7 +114,7 @@ func (c *ProvisioningRequestClient) ProvisioningRequests() ([]*provreqwrapper.Pr } // FetchPodTemplates fetches PodTemplates referenced by the Provisioning Request. -func (c *ProvisioningRequestClient) FetchPodTemplates(pr *v1beta1.ProvisioningRequest) ([]*apiv1.PodTemplate, error) { +func (c *provisioningRequestClient) FetchPodTemplates(pr *v1beta1.ProvisioningRequest) ([]*apiv1.PodTemplate, error) { podTemplates := make([]*apiv1.PodTemplate, 0, len(pr.Spec.PodSets)) for _, podSpec := range pr.Spec.PodSets { podTemplate, err := c.podTemplLister.PodTemplates(pr.Namespace).Get(podSpec.PodTemplateRef.Name) @@ -157,3 +163,32 @@ func newPodTemplatesLister(client *kubernetes.Clientset, stopChannel <-chan stru klog.V(2).Info("Successful initial Pod Template sync") return podTemplLister, nil } + +// VerifyProvisioningRequestClass check that all pods belong to one ProvisioningRequest that belongs to check-capacity ProvisioningRequst class. 
+func VerifyProvisioningRequestClass(client ProvisioningRequestClient, unschedulablePods []*apiv1.Pod, className string) (*provreqwrapper.ProvisioningRequest, error) { + if len(unschedulablePods) == 0 { + return nil, nil + } + if unschedulablePods[0].OwnerReferences == nil || len(unschedulablePods[0].OwnerReferences) == 0 { + return nil, fmt.Errorf("pod %s has no OwnerReference", unschedulablePods[0].Name) + } + provReq, err := client.ProvisioningRequest(unschedulablePods[0].Namespace, unschedulablePods[0].OwnerReferences[0].Name) + if err != nil { + return nil, fmt.Errorf("failed retrive ProvisioningRequest from unscheduled pods, err: %v", err) + } + if provReq.V1Beta1().Spec.ProvisioningClassName != className { + return nil, nil + } + for _, pod := range unschedulablePods { + if pod.Namespace != unschedulablePods[0].Namespace { + return nil, fmt.Errorf("pods %s and %s are from different namespaces", pod.Name, unschedulablePods[0].Name) + } + if pod.OwnerReferences == nil || len(pod.OwnerReferences) == 0 { + return nil, fmt.Errorf("pod %s has no OwnerReference", pod.Name) + } + if pod.OwnerReferences[0].Name != unschedulablePods[0].OwnerReferences[0].Name { + return nil, fmt.Errorf("pods %s and %s have different OwnerReference", pod.Name, unschedulablePods[0].Name) + } + } + return provReq, nil +} diff --git a/cluster-autoscaler/provisioningrequest/provreqclient/client_test.go b/cluster-autoscaler/provisioningrequest/provreqclient/client_test.go index 3c6e72a23775..b7844a638b0f 100644 --- a/cluster-autoscaler/provisioningrequest/provreqclient/client_test.go +++ b/cluster-autoscaler/provisioningrequest/provreqclient/client_test.go @@ -19,9 +19,15 @@ package provreqclient import ( "context" "testing" + "time" "github.com/google/go-cmp/cmp" + "github.com/stretchr/testify/assert" + apiv1 "k8s.io/api/core/v1" + "k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/apis/autoscaling.x-k8s.io/v1beta1" + "k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/pods" 
"k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/provreqwrapper" + . "k8s.io/autoscaler/cluster-autoscaler/utils/test" ) func TestFetchPodTemplates(t *testing.T) { @@ -42,3 +48,67 @@ func TestFetchPodTemplates(t *testing.T) { t.Errorf("Template mismatch, diff (-want +got):\n%s", diff) } } + +func TestVerifyProvisioningRequestClass(t *testing.T) { + checkCapacityProvReq := provreqwrapper.BuildTestProvisioningRequest("ns", "check-capacity", "1m", "100", "", int32(100), false, time.Now(), v1beta1.ProvisioningClassCheckCapacity) + customProvReq := provreqwrapper.BuildTestProvisioningRequest("ns", "custom", "1m", "100", "", int32(100), false, time.Now(), "custom") + checkCapacityPods, _ := pods.PodsForProvisioningRequest(checkCapacityProvReq) + customProvReqPods, _ := pods.PodsForProvisioningRequest(customProvReq) + regularPod := BuildTestPod("p1", 600, 100) + client := NewFakeProvisioningRequestClient(context.Background(), t, checkCapacityProvReq, customProvReq) + testCases := []struct { + name string + pods []*apiv1.Pod + className string + err bool + pr *provreqwrapper.ProvisioningRequest + }{ + { + name: "no pods", + pods: []*apiv1.Pod{}, + className: "some-class", + }, + { + name: "pods from one Provisioning Class", + pods: checkCapacityPods, + className: v1beta1.ProvisioningClassCheckCapacity, + pr: checkCapacityProvReq, + }, + { + name: "pods from different Provisioning Classes", + pods: append(checkCapacityPods, customProvReqPods...), + className: v1beta1.ProvisioningClassCheckCapacity, + err: true, + }, + { + name: "regular pod", + pods: []*apiv1.Pod{regularPod}, + className: v1beta1.ProvisioningClassCheckCapacity, + err: true, + }, + { + name: "provreq pods and regular pod", + pods: append(checkCapacityPods, regularPod), + className: v1beta1.ProvisioningClassCheckCapacity, + err: true, + }, + { + name: "wrong Provisioning Class name", + pods: customProvReqPods, + className: v1beta1.ProvisioningClassCheckCapacity, + }, + } + for _, tc := range 
testCases { + tc := tc + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + pr, err := VerifyProvisioningRequestClass(client, tc.pods, tc.className) + assert.Equal(t, pr, tc.pr) + if tc.err { + assert.Error(t, err) + } else { + assert.NoError(t, err) + } + }) + } +} diff --git a/cluster-autoscaler/provisioningrequest/provreqclient/testutils.go b/cluster-autoscaler/provisioningrequest/provreqclient/testutils.go index 7dea1e315447..a410aa9cfa53 100644 --- a/cluster-autoscaler/provisioningrequest/provreqclient/testutils.go +++ b/cluster-autoscaler/provisioningrequest/provreqclient/testutils.go @@ -35,7 +35,7 @@ import ( ) // NewFakeProvisioningRequestClient mock ProvisioningRequestClient for tests. -func NewFakeProvisioningRequestClient(ctx context.Context, t *testing.T, prs ...*provreqwrapper.ProvisioningRequest) *ProvisioningRequestClient { +func NewFakeProvisioningRequestClient(ctx context.Context, t *testing.T, prs ...*provreqwrapper.ProvisioningRequest) *provisioningRequestClient { t.Helper() provReqClient := fake.NewSimpleClientset() podTemplClient := fake_kubernetes.NewSimpleClientset() @@ -60,7 +60,7 @@ func NewFakeProvisioningRequestClient(ctx context.Context, t *testing.T, prs ... if err != nil { t.Fatalf("Failed to create Provisioning Request lister. Error was: %v", err) } - return &ProvisioningRequestClient{ + return &provisioningRequestClient{ client: provReqClient, provReqLister: provReqLister, podTemplLister: podTemplLister, diff --git a/cluster-autoscaler/utils/errors/errors.go b/cluster-autoscaler/utils/errors/errors.go index 2276e53f3ee9..58066e8ed939 100644 --- a/cluster-autoscaler/utils/errors/errors.go +++ b/cluster-autoscaler/utils/errors/errors.go @@ -81,6 +81,9 @@ func ToAutoscalerError(defaultType AutoscalerErrorType, err error) AutoscalerErr if e, ok := err.(AutoscalerError); ok { return e } + if err == nil { + return nil + } return NewAutoscalerError(defaultType, "%v", err) }