diff --git a/cluster-autoscaler/core/scaleup/orchestrator/orchestrator.go b/cluster-autoscaler/core/scaleup/orchestrator/orchestrator.go index 417d4062fb27..4453769ad410 100644 --- a/cluster-autoscaler/core/scaleup/orchestrator/orchestrator.go +++ b/cluster-autoscaler/core/scaleup/orchestrator/orchestrator.go @@ -88,7 +88,7 @@ func (o *ScaleUpOrchestrator) ScaleUp( nodeInfos map[string]*schedulerframework.NodeInfo, ) (*status.ScaleUpStatus, errors.AutoscalerError) { if !o.initialized { - return scaleUpError(&status.ScaleUpStatus{}, errors.NewAutoscalerError(errors.InternalError, "ScaleUpOrchestrator is not initialized")) + return status.UpdateScaleUpError(&status.ScaleUpStatus{}, errors.NewAutoscalerError(errors.InternalError, "ScaleUpOrchestrator is not initialized")) } loggingQuota := klogx.PodsLoggingQuota() @@ -103,7 +103,7 @@ func (o *ScaleUpOrchestrator) ScaleUp( upcomingNodes, aErr := o.UpcomingNodes(nodeInfos) if aErr != nil { - return scaleUpError(&status.ScaleUpStatus{}, aErr.AddPrefix("could not get upcoming nodes: ")) + return status.UpdateScaleUpError(&status.ScaleUpStatus{}, aErr.AddPrefix("could not get upcoming nodes: ")) } klog.V(4).Infof("Upcoming %d nodes", len(upcomingNodes)) @@ -112,7 +112,7 @@ func (o *ScaleUpOrchestrator) ScaleUp( var err error nodeGroups, nodeInfos, err = o.processors.NodeGroupListProcessor.Process(o.autoscalingContext, nodeGroups, nodeInfos, unschedulablePods) if err != nil { - return scaleUpError(&status.ScaleUpStatus{}, errors.ToAutoscalerError(errors.InternalError, err)) + return status.UpdateScaleUpError(&status.ScaleUpStatus{}, errors.ToAutoscalerError(errors.InternalError, err)) } } @@ -121,7 +121,7 @@ func (o *ScaleUpOrchestrator) ScaleUp( resourcesLeft, aErr := o.resourceManager.ResourcesLeft(o.autoscalingContext, nodeInfos, nodes) if aErr != nil { - return scaleUpError(&status.ScaleUpStatus{}, aErr.AddPrefix("could not compute total resources: ")) + return status.UpdateScaleUpError(&status.ScaleUpStatus{}, aErr.AddPrefix("could not compute total resources: ")) } now := time.Now() @@ -186,7 +186,7 @@ func (o *ScaleUpOrchestrator) ScaleUp( newNodes, aErr := o.GetCappedNewNodeCount(bestOption.NodeCount, len(nodes)+len(upcomingNodes)) if aErr != nil { - return scaleUpError(&status.ScaleUpStatus{PodsTriggeredScaleUp: bestOption.Pods}, aErr) + return status.UpdateScaleUpError(&status.ScaleUpStatus{PodsTriggeredScaleUp: bestOption.Pods}, aErr) } createNodeGroupResults := make([]nodegroups.CreateNodeGroupResult, 0) @@ -194,7 +194,7 @@ func (o *ScaleUpOrchestrator) ScaleUp( oldId := bestOption.NodeGroup.Id() createNodeGroupResult, aErr := o.processors.NodeGroupManager.CreateNodeGroup(o.autoscalingContext, bestOption.NodeGroup) if aErr != nil { - return scaleUpError( + return status.UpdateScaleUpError( &status.ScaleUpStatus{FailedCreationNodeGroups: []cloudprovider.NodeGroup{bestOption.NodeGroup}, PodsTriggeredScaleUp: bestOption.Pods}, aErr) } @@ -253,7 +253,7 @@ func (o *ScaleUpOrchestrator) ScaleUp( if !found { // This should never happen, as we already should have retrieved nodeInfo for any considered nodegroup. klog.Errorf("No node info for: %s", bestOption.NodeGroup.Id()) - return scaleUpError( + return status.UpdateScaleUpError( &status.ScaleUpStatus{CreateNodeGroupResults: createNodeGroupResults, PodsTriggeredScaleUp: bestOption.Pods}, errors.NewAutoscalerError( errors.CloudProviderError, @@ -263,7 +263,7 @@ func (o *ScaleUpOrchestrator) ScaleUp( // Apply upper limits for CPU and memory. newNodes, aErr = o.resourceManager.ApplyLimits(o.autoscalingContext, newNodes, resourcesLeft, nodeInfo, bestOption.NodeGroup) if aErr != nil { - return scaleUpError( + return status.UpdateScaleUpError( &status.ScaleUpStatus{CreateNodeGroupResults: createNodeGroupResults, PodsTriggeredScaleUp: bestOption.Pods}, aErr) } @@ -283,7 +283,7 @@ func (o *ScaleUpOrchestrator) ScaleUp( scaleUpInfos, aErr := o.processors.NodeGroupSetProcessor.BalanceScaleUpBetweenGroups(o.autoscalingContext, targetNodeGroups, newNodes) if aErr != nil { - return scaleUpError( + return status.UpdateScaleUpError( &status.ScaleUpStatus{CreateNodeGroupResults: createNodeGroupResults, PodsTriggeredScaleUp: bestOption.Pods}, aErr) } @@ -291,7 +291,7 @@ func (o *ScaleUpOrchestrator) ScaleUp( klog.V(1).Infof("Final scale-up plan: %v", scaleUpInfos) aErr, failedNodeGroups := o.scaleUpExecutor.ExecuteScaleUps(scaleUpInfos, nodeInfos, now) if aErr != nil { - return scaleUpError( + return status.UpdateScaleUpError( &status.ScaleUpStatus{ CreateNodeGroupResults: createNodeGroupResults, FailedResizeNodeGroups: failedNodeGroups, @@ -322,7 +322,7 @@ func (o *ScaleUpOrchestrator) ScaleUpToNodeGroupMinSize( nodeInfos map[string]*schedulerframework.NodeInfo, ) (*status.ScaleUpStatus, errors.AutoscalerError) { if !o.initialized { - return scaleUpError(&status.ScaleUpStatus{}, errors.NewAutoscalerError(errors.InternalError, "ScaleUpOrchestrator is not initialized")) + return status.UpdateScaleUpError(&status.ScaleUpStatus{}, errors.NewAutoscalerError(errors.InternalError, "ScaleUpOrchestrator is not initialized")) } now := time.Now() @@ -331,7 +331,7 @@ func (o *ScaleUpOrchestrator) ScaleUpToNodeGroupMinSize( resourcesLeft, aErr := o.resourceManager.ResourcesLeft(o.autoscalingContext, nodeInfos, nodes) if aErr != nil { - return scaleUpError(&status.ScaleUpStatus{}, aErr.AddPrefix("could not compute total resources: ")) + return status.UpdateScaleUpError(&status.ScaleUpStatus{}, aErr.AddPrefix("could not compute total resources: ")) } for _, ng := range nodeGroups { @@ -397,7 +397,7 @@ func (o *ScaleUpOrchestrator) ScaleUpToNodeGroupMinSize( klog.V(1).Infof("ScaleUpToNodeGroupMinSize: final scale-up plan: %v", scaleUpInfos) aErr, failedNodeGroups := o.scaleUpExecutor.ExecuteScaleUps(scaleUpInfos, nodeInfos, now) if aErr != nil { - return scaleUpError( + return status.UpdateScaleUpError( &status.ScaleUpStatus{ FailedResizeNodeGroups: failedNodeGroups, }, @@ -717,9 +717,3 @@ func GetPodsAwaitingEvaluation(egs []*equivalence.PodGroup, bestOption string) [ } return awaitsEvaluation } - -func scaleUpError(s *status.ScaleUpStatus, err errors.AutoscalerError) (*status.ScaleUpStatus, errors.AutoscalerError) { - s.ScaleUpError = &err - s.Result = status.ScaleUpError - return s, err -} diff --git a/cluster-autoscaler/core/scaleup/orchestrator/wrapper_orchestrator.go b/cluster-autoscaler/core/scaleup/orchestrator/wrapper_orchestrator.go new file mode 100644 index 000000000000..afd539d9ac6b --- /dev/null +++ b/cluster-autoscaler/core/scaleup/orchestrator/wrapper_orchestrator.go @@ -0,0 +1,112 @@ +/* +Copyright 2024 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package orchestrator + +import ( + "fmt" + + appsv1 "k8s.io/api/apps/v1" + apiv1 "k8s.io/api/core/v1" + "k8s.io/autoscaler/cluster-autoscaler/clusterstate" + "k8s.io/autoscaler/cluster-autoscaler/context" + "k8s.io/autoscaler/cluster-autoscaler/core/scaleup" + ca_processors "k8s.io/autoscaler/cluster-autoscaler/processors" + "k8s.io/autoscaler/cluster-autoscaler/processors/provreq" + "k8s.io/autoscaler/cluster-autoscaler/processors/status" + "k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/checkcapacity" + "k8s.io/autoscaler/cluster-autoscaler/utils/errors" + "k8s.io/autoscaler/cluster-autoscaler/utils/taints" + "k8s.io/client-go/rest" + schedulerframework "k8s.io/kubernetes/pkg/scheduler/framework" +) + +// WrapperOrchestrator is an orchestrator which wraps Scale Up for ProvisioningRequests and regular pods. +// Each loop WrapperOrchestrator split out regular and pods from ProvisioningRequest, pick one group that +// wasn't picked in the last loop and run ScaleUp for it. +type WrapperOrchestrator struct { + // scaleUpRegularPods indicates that ScaleUp for regular pods will be run in the current CA loop, if they are present. + scaleUpRegularPods bool + scaleUpOrchestrator scaleup.Orchestrator + provReqOrchestrator scaleup.Orchestrator +} + +// NewWrapperOrchestrator return WrapperOrchestrator +func NewWrapperOrchestrator(kubeConfig *rest.Config) (scaleup.Orchestrator, error) { + provReqOrchestrator, err := checkcapacity.New(kubeConfig) + if err != nil { + return nil, fmt.Errorf("failed create ScaleUp orchestrator for ProvisioningRequests, error: %v", err) + } + return &WrapperOrchestrator{ + scaleUpOrchestrator: New(), + provReqOrchestrator: provReqOrchestrator, + }, nil +} + +// Initialize initializes the orchestrator object with required fields. +func (o *WrapperOrchestrator) Initialize( + autoscalingContext *context.AutoscalingContext, + processors *ca_processors.AutoscalingProcessors, + clusterStateRegistry *clusterstate.ClusterStateRegistry, + taintConfig taints.TaintConfig, +) { + o.scaleUpOrchestrator.Initialize(autoscalingContext, processors, clusterStateRegistry, taintConfig) + o.provReqOrchestrator.Initialize(autoscalingContext, processors, clusterStateRegistry, taintConfig) +} + +// ScaleUp run scaleUp function for regular pods of pods from ProvisioningRequest. +func (o *WrapperOrchestrator) ScaleUp( + unschedulablePods []*apiv1.Pod, + nodes []*apiv1.Node, + daemonSets []*appsv1.DaemonSet, + nodeInfos map[string]*schedulerframework.NodeInfo, +) (*status.ScaleUpStatus, errors.AutoscalerError) { + defer func() { o.scaleUpRegularPods = !o.scaleUpRegularPods }() + + provReqPods, regularPods := splitOut(unschedulablePods) + if len(provReqPods) == 0 { + o.scaleUpRegularPods = true + } else if len(regularPods) == 0 { + o.scaleUpRegularPods = false + } + + if o.scaleUpRegularPods { + return o.scaleUpOrchestrator.ScaleUp(regularPods, nodes, daemonSets, nodeInfos) + } + return o.provReqOrchestrator.ScaleUp(provReqPods, nodes, daemonSets, nodeInfos) +} + +func splitOut(unschedulablePods []*apiv1.Pod) (provReqPods, regularPods []*apiv1.Pod) { + for _, pod := range unschedulablePods { + if _, ok := pod.Annotations[provreq.ProvisioningRequestPodAnnotationKey]; ok { + provReqPods = append(provReqPods, pod) + } else { + regularPods = append(regularPods, pod) + } + } + return +} + +// ScaleUpToNodeGroupMinSize tries to scale up node groups that have less nodes +// than the configured min size. The source of truth for the current node group +// size is the TargetSize queried directly from cloud providers. Returns +// appropriate status or error if an unexpected error occurred. +func (o *WrapperOrchestrator) ScaleUpToNodeGroupMinSize( + nodes []*apiv1.Node, + nodeInfos map[string]*schedulerframework.NodeInfo, +) (*status.ScaleUpStatus, errors.AutoscalerError) { + return o.scaleUpOrchestrator.ScaleUpToNodeGroupMinSize(nodes, nodeInfos) +} diff --git a/cluster-autoscaler/core/scaleup/orchestrator/wrapper_orchestrator_test.go b/cluster-autoscaler/core/scaleup/orchestrator/wrapper_orchestrator_test.go new file mode 100644 index 000000000000..be61d8511db7 --- /dev/null +++ b/cluster-autoscaler/core/scaleup/orchestrator/wrapper_orchestrator_test.go @@ -0,0 +1,90 @@ +/* +Copyright 2024 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package orchestrator + +import ( + "testing" + + "github.com/stretchr/testify/assert" + appsv1 "k8s.io/api/apps/v1" + apiv1 "k8s.io/api/core/v1" + "k8s.io/autoscaler/cluster-autoscaler/clusterstate" + "k8s.io/autoscaler/cluster-autoscaler/context" + ca_processors "k8s.io/autoscaler/cluster-autoscaler/processors" + "k8s.io/autoscaler/cluster-autoscaler/processors/provreq" + "k8s.io/autoscaler/cluster-autoscaler/processors/status" + "k8s.io/autoscaler/cluster-autoscaler/utils/errors" + "k8s.io/autoscaler/cluster-autoscaler/utils/taints" + . "k8s.io/autoscaler/cluster-autoscaler/utils/test" + schedulerframework "k8s.io/kubernetes/pkg/scheduler/framework" +) + +const ( + provisioningRequestErrorMsg = "provisioningRequestError" + regularPodsErrorMsg = "regularPodsError" +) + +func TestScaleUp(t *testing.T) { + o := WrapperOrchestrator{ + provReqOrchestrator: &fakeScaleUp{provisioningRequestErrorMsg}, + scaleUpOrchestrator: &fakeScaleUp{regularPodsErrorMsg}, + } + regularPods := []*apiv1.Pod{ + BuildTestPod("pod-1", 1, 100), + BuildTestPod("pod-2", 1, 100), + } + provReqPods := []*apiv1.Pod{ + BuildTestPod("pr-pod-1", 1, 100), + BuildTestPod("pr-pod-2", 1, 100), + } + for _, pod := range provReqPods { + pod.Annotations[provreq.ProvisioningRequestPodAnnotationKey] = "true" + } + unschedulablePods := append(regularPods, provReqPods...) + _, err := o.ScaleUp(unschedulablePods, nil, nil, nil) + assert.Equal(t, err.Error(), provisioningRequestErrorMsg) + _, err = o.ScaleUp(unschedulablePods, nil, nil, nil) + assert.Equal(t, err.Error(), regularPodsErrorMsg) +} + +type fakeScaleUp struct { + errorMsg string +} + +func (f *fakeScaleUp) ScaleUp( + unschedulablePods []*apiv1.Pod, + nodes []*apiv1.Node, + daemonSets []*appsv1.DaemonSet, + nodeInfos map[string]*schedulerframework.NodeInfo, +) (*status.ScaleUpStatus, errors.AutoscalerError) { + return nil, errors.NewAutoscalerError(errors.InternalError, f.errorMsg) +} + +func (f *fakeScaleUp) Initialize( + autoscalingContext *context.AutoscalingContext, + processors *ca_processors.AutoscalingProcessors, + clusterStateRegistry *clusterstate.ClusterStateRegistry, + taintConfig taints.TaintConfig, +) { +} + +func (f *fakeScaleUp) ScaleUpToNodeGroupMinSize( + nodes []*apiv1.Node, + nodeInfos map[string]*schedulerframework.NodeInfo, +) (*status.ScaleUpStatus, errors.AutoscalerError) { + return nil, nil +} diff --git a/cluster-autoscaler/main.go b/cluster-autoscaler/main.go index 0c4fcd569c57..d0480eb1f99a 100644 --- a/cluster-autoscaler/main.go +++ b/cluster-autoscaler/main.go @@ -29,6 +29,7 @@ import ( "time" "k8s.io/autoscaler/cluster-autoscaler/core/scaledown/actuation" + "k8s.io/autoscaler/cluster-autoscaler/core/scaleup/orchestrator" "k8s.io/autoscaler/cluster-autoscaler/debuggingsnapshot" "k8s.io/autoscaler/cluster-autoscaler/simulator/predicatechecker" kubelet_config "k8s.io/kubernetes/pkg/kubelet/apis/config" @@ -468,6 +469,15 @@ func buildAutoscaler(debuggingSnapshotter debuggingsnapshot.DebuggingSnapshotter deleteOptions := options.NewNodeDeleteOptions(autoscalingOptions) drainabilityRules := rules.Default(deleteOptions) + scaleUpOrchestrator := orchestrator.New() + if *provisioningRequestsEnabled { + kubeClient := kube_util.GetKubeConfig(autoscalingOptions.KubeClientOpts) + scaleUpOrchestrator, err = orchestrator.NewWrapperOrchestrator(kubeClient) + if err != nil { + return nil, err + } + } + opts := core.AutoscalerOptions{ AutoscalingOptions: autoscalingOptions, ClusterSnapshot: clustersnapshot.NewDeltaClusterSnapshot(), @@ -477,6 +487,7 @@ func buildAutoscaler(debuggingSnapshotter debuggingsnapshot.DebuggingSnapshotter PredicateChecker: predicateChecker, DeleteOptions: deleteOptions, DrainabilityRules: drainabilityRules, + ScaleUpOrchestrator: scaleUpOrchestrator, } opts.Processors = ca_processors.DefaultProcessors(autoscalingOptions) diff --git a/cluster-autoscaler/processors/provreq/provisioning_request_processors.go b/cluster-autoscaler/processors/provreq/provisioning_request_processors.go index c48d489c0746..a75f62dfe67d 100644 --- a/cluster-autoscaler/processors/provreq/provisioning_request_processors.go +++ b/cluster-autoscaler/processors/provreq/provisioning_request_processors.go @@ -28,7 +28,8 @@ import ( ) const ( - provisioningRequestPodAnnotationKey = "cluster-autoscaler.kubernetes.io/consume-provisioning-request" + // ProvisioningRequestPodAnnotationKey is an annotation on pod that indicate that pod was created by ProvisioningRequest. + ProvisioningRequestPodAnnotationKey = "cluster-autoscaler.kubernetes.io/consume-provisioning-request" maxProvReqEvent = 50 ) @@ -101,6 +102,6 @@ func provisioningRequestName(pod *v1.Pod) (string, bool) { if pod == nil || pod.Annotations == nil { return "", false } - provReqName, found := pod.Annotations[provisioningRequestPodAnnotationKey] + provReqName, found := pod.Annotations[ProvisioningRequestPodAnnotationKey] return provReqName, found } diff --git a/cluster-autoscaler/processors/provreq/provisioning_request_processors_test.go b/cluster-autoscaler/processors/provreq/provisioning_request_processors_test.go index a245c701cf34..07c224dadac3 100644 --- a/cluster-autoscaler/processors/provreq/provisioning_request_processors_test.go +++ b/cluster-autoscaler/processors/provreq/provisioning_request_processors_test.go @@ -31,10 +31,10 @@ import ( func TestProvisioningRequestPodsFilter(t *testing.T) { prPod1 := BuildTestPod("pr-pod-1", 500, 10) - prPod1.Annotations[provisioningRequestPodAnnotationKey] = "pr-class" + prPod1.Annotations[ProvisioningRequestPodAnnotationKey] = "pr-class" prPod2 := BuildTestPod("pr-pod-2", 500, 10) - prPod2.Annotations[provisioningRequestPodAnnotationKey] = "pr-class-2" + prPod2.Annotations[ProvisioningRequestPodAnnotationKey] = "pr-class-2" pod1 := BuildTestPod("pod-1", 500, 10) pod2 := BuildTestPod("pod-2", 500, 10) @@ -91,7 +91,7 @@ func TestEventManager(t *testing.T) { for i := 0; i < 10; i++ { prPod := BuildTestPod(fmt.Sprintf("pr-pod-%d", i), 10, 10) - prPod.Annotations[provisioningRequestPodAnnotationKey] = "pr-class" + prPod.Annotations[ProvisioningRequestPodAnnotationKey] = "pr-class" unscheduledPods = append(unscheduledPods, prPod) } got, err := prFilter.Process(ctx, unscheduledPods) diff --git a/cluster-autoscaler/processors/status/scale_up_status_processor.go b/cluster-autoscaler/processors/status/scale_up_status_processor.go index 7386a5fa01d4..5eb780843d7a 100644 --- a/cluster-autoscaler/processors/status/scale_up_status_processor.go +++ b/cluster-autoscaler/processors/status/scale_up_status_processor.go @@ -99,3 +99,10 @@ func (p *NoOpScaleUpStatusProcessor) Process(context *context.AutoscalingContext // CleanUp cleans up the processor's internal structures. func (p *NoOpScaleUpStatusProcessor) CleanUp() { } + +// UpdateScaleUpError updates ScaleUpStatus. +func UpdateScaleUpError(s *ScaleUpStatus, err errors.AutoscalerError) (*ScaleUpStatus, errors.AutoscalerError) { + s.ScaleUpError = &err + s.Result = ScaleUpError + return s, err +} diff --git a/cluster-autoscaler/provisioningrequest/apis/autoscaling.x-k8s.io/v1beta1/types.go b/cluster-autoscaler/provisioningrequest/apis/autoscaling.x-k8s.io/v1beta1/types.go index 4309091c6d6a..881b9ca29b4b 100644 --- a/cluster-autoscaler/provisioningrequest/apis/autoscaling.x-k8s.io/v1beta1/types.go +++ b/cluster-autoscaler/provisioningrequest/apis/autoscaling.x-k8s.io/v1beta1/types.go @@ -175,9 +175,12 @@ type Detail string // The following constants list all currently available Conditions Type values. // See: https://pkg.go.dev/k8s.io/apimachinery/pkg/apis/meta/v1#Condition const ( - // CapacityAvailable indicates that all of the requested resources were - // already available in the cluster. - CapacityAvailable string = "CapacityAvailable" + // CapacityFound indicates that all of the requested resources were + // fount in the cluster. + CapacityFound string = "CapacityFound" + // Expired indicates that the ProvisioningRequest had CapacityFound condition before + // and the reservation time is expired. + BookingExpired string = "BookingExpired" // Provisioned indicates that all of the requested resources were created // and are available in the cluster. CA will set this condition when the // VM creation finishes successfully. diff --git a/cluster-autoscaler/provisioningrequest/checkcapacity/condition.go b/cluster-autoscaler/provisioningrequest/checkcapacity/condition.go new file mode 100644 index 000000000000..896428371095 --- /dev/null +++ b/cluster-autoscaler/provisioningrequest/checkcapacity/condition.go @@ -0,0 +1,94 @@ +/* +Copyright 2024 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package checkcapacity + +import ( + "time" + + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/apis/autoscaling.x-k8s.io/v1beta1" + "k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/provreqwrapper" + "k8s.io/klog/v2" +) + +const ( + defaultReservationTime = 10 * time.Minute + defaultExpirationTime = 7 * 24 * time.Hour // 7 days +) + +const ( + //CapacityIsNotFoundReason is added when capacity was not found in the cluster. + CapacityIsNotFoundReason = "CapacityIsNotFound" + //CapacityIsFoundReason is added when capacity was found in the cluster. + CapacityIsFoundReason = "CapacityIsFound" + //FailedToBookCapacityReason is added when Cluster Autoscaler failed to book capacity in the cluster. + FailedToBookCapacityReason = "FailedToBookCapacity" +) + +func shouldCapacityBeBooked(pr *provreqwrapper.ProvisioningRequest) bool { + if pr.V1Beta1().Spec.ProvisioningClassName != v1beta1.ProvisioningClassCheckCapacity { + return false + } + if pr.Conditions() == nil || len(pr.Conditions()) == 0 { + return false + } + book := false + for _, condition := range pr.Conditions() { + if checkConditionType(condition, v1beta1.BookingExpired) || checkConditionType(condition, v1beta1.Failed) { + return false + } else if checkConditionType(condition, v1beta1.CapacityFound) { + book = true + } + } + return book +} + +func setCondition(pr *provreqwrapper.ProvisioningRequest, conditionType string, conditionStatus v1.ConditionStatus, reason, message string) { + var newConditions []v1.Condition + newCondition := v1.Condition{ + Type: conditionType, + Status: conditionStatus, + ObservedGeneration: pr.V1Beta1().GetObjectMeta().GetGeneration(), + LastTransitionTime: v1.Now(), + Reason: reason, + Message: message, + } + prevConditions := pr.Conditions() + switch conditionType { + case v1beta1.CapacityFound, v1beta1.BookingExpired, v1beta1.Failed: + conditionFound := false + for _, condition := range prevConditions { + if condition.Type == conditionType { + conditionFound = true + newConditions = append(newConditions, newCondition) + } else { + newConditions = append(newConditions, condition) + } + } + if !conditionFound { + newConditions = append(prevConditions, newCondition) + } + default: + klog.Errorf("Unknown (conditionType; conditionStatus) pair: (%s; %s) ", conditionType, conditionStatus) + newConditions = prevConditions + } + pr.SetConditions(newConditions) +} + +func checkConditionType(condition v1.Condition, conditionType string) bool { + return condition.Type == conditionType && condition.Status == v1.ConditionTrue +} diff --git a/cluster-autoscaler/provisioningrequest/checkcapacity/condition_test.go b/cluster-autoscaler/provisioningrequest/checkcapacity/condition_test.go new file mode 100644 index 000000000000..5a8efe78a770 --- /dev/null +++ b/cluster-autoscaler/provisioningrequest/checkcapacity/condition_test.go @@ -0,0 +1,274 @@ +/* +Copyright 2024 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package checkcapacity + +import ( + "testing" + + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/apis/autoscaling.x-k8s.io/v1beta1" + "k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/provreqwrapper" +) + +func TestBookCapacity(t *testing.T) { + tests := []struct { + name string + prConditions []v1.Condition + want bool + }{ + { + name: "BookingExpired", + prConditions: []v1.Condition{ + { + Type: v1beta1.CapacityFound, + Status: v1.ConditionTrue, + }, + { + Type: v1beta1.BookingExpired, + Status: v1.ConditionTrue, + }, + }, + want: false, + }, + { + name: "Failed", + prConditions: []v1.Condition{ + { + Type: v1beta1.CapacityFound, + Status: v1.ConditionTrue, + }, + { + Type: v1beta1.Failed, + Status: v1.ConditionTrue, + }, + }, + want: false, + }, + { + name: "empty conditions", + want: false, + }, + { + name: "Capacity found and provisioned", + prConditions: []v1.Condition{ + { + Type: v1beta1.CapacityFound, + Status: v1.ConditionTrue, + }, + { + Type: v1beta1.Provisioned, + Status: v1.ConditionTrue, + }, + }, + want: true, + }, + { + name: "Capacity is not found", + prConditions: []v1.Condition{ + { + Type: v1beta1.CapacityFound, + Status: v1.ConditionFalse, + }, + }, + want: false, + }, + } + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + pr := provreqwrapper.NewV1Beta1ProvisioningRequest( + &v1beta1.ProvisioningRequest{ + Spec: v1beta1.ProvisioningRequestSpec{ + ProvisioningClassName: v1beta1.ProvisioningClassCheckCapacity, + }, + Status: v1beta1.ProvisioningRequestStatus{ + Conditions: test.prConditions, + }, + }, nil) + got := shouldCapacityBeBooked(pr) + if got != test.want { + t.Errorf("Want: %v, got: %v", test.want, got) + } + }) + } +} + +func TestSetCondition(t *testing.T) { + tests := []struct { + name string + oldConditions []v1.Condition + newType string + newStatus v1.ConditionStatus + want []v1.Condition + }{ + { + name: "CapacityFound added, empty conditions before", + newType: v1beta1.CapacityFound, + newStatus: v1.ConditionTrue, + want: []v1.Condition{ + { + Type: v1beta1.CapacityFound, + Status: v1.ConditionTrue, + }, + }, + }, + { + name: "CapacityFound updated", + oldConditions: []v1.Condition{ + { + Type: v1beta1.CapacityFound, + Status: v1.ConditionFalse, + }, + }, + newType: v1beta1.CapacityFound, + newStatus: v1.ConditionTrue, + want: []v1.Condition{ + { + Type: v1beta1.CapacityFound, + Status: v1.ConditionTrue, + }, + }, + }, + { + name: "Failed added, non-empty conditions before", + oldConditions: []v1.Condition{ + { + Type: v1beta1.CapacityFound, + Status: v1.ConditionTrue, + }, + }, + newType: v1beta1.Failed, + newStatus: v1.ConditionTrue, + want: []v1.Condition{ + { + Type: v1beta1.CapacityFound, + Status: v1.ConditionTrue, + }, + { + Type: v1beta1.Failed, + Status: v1.ConditionTrue, + }, + }, + }, + { + name: "Provisioned condition type, conditions are not updated", + oldConditions: []v1.Condition{ + { + Type: v1beta1.CapacityFound, + Status: v1.ConditionTrue, + }, + }, + newType: v1beta1.Provisioned, + newStatus: v1.ConditionFalse, + want: []v1.Condition{ + { + Type: v1beta1.CapacityFound, + Status: v1.ConditionTrue, + }, + }, + }, + { + name: "Unknown condition status, conditions are updated", + oldConditions: []v1.Condition{ + { + Type: v1beta1.CapacityFound, + Status: v1.ConditionTrue, + }, + }, + newType: v1beta1.Failed, + newStatus: v1.ConditionUnknown, + want: []v1.Condition{ + { + Type: v1beta1.CapacityFound, + Status: v1.ConditionTrue, + }, + { + Type: v1beta1.Failed, + Status: v1.ConditionUnknown, + }, + }, + }, + { + name: "Unknown condition type, conditions are not updated", + oldConditions: []v1.Condition{ + { + Type: v1beta1.CapacityFound, + Status: v1.ConditionTrue, + }, + }, + newType: "Unknown", + newStatus: v1.ConditionTrue, + want: []v1.Condition{ + { + Type: v1beta1.CapacityFound, + Status: v1.ConditionTrue, + }, + }, + }, + { + name: "BookingExpired, empty conditions before", + newType: v1beta1.BookingExpired, + newStatus: v1.ConditionFalse, + want: []v1.Condition{ + { + Type: v1beta1.BookingExpired, + Status: v1.ConditionFalse, + }, + }, + }, + { + name: "Capacity found with unknown condition before", + oldConditions: []v1.Condition{ + { + Type: v1beta1.Provisioned, + Status: v1.ConditionTrue, + }, + }, + newType: v1beta1.CapacityFound, + newStatus: v1.ConditionTrue, + want: []v1.Condition{ + { + Type: v1beta1.Provisioned, + Status: v1.ConditionTrue, + }, + { + Type: v1beta1.CapacityFound, + Status: v1.ConditionTrue, + }, + }, + }, + } + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + pr := provreqwrapper.NewV1Beta1ProvisioningRequest( + &v1beta1.ProvisioningRequest{ + Status: v1beta1.ProvisioningRequestStatus{ + Conditions: test.oldConditions, + }, + }, nil) + setCondition(pr, test.newType, test.newStatus, "", "") + got := pr.Conditions() + if len(got) > 2 || len(got) != len(test.want) || got[0].Type != test.want[0].Type || got[0].Status != test.want[0].Status { + t.Errorf("want %v, got: %v", test.want, got) + } + if len(got) == 2 { + if got[1].Type != test.want[1].Type || got[1].Status != test.want[1].Status { + t.Errorf("want %v, got: %v", test.want, got) + } + } + }) + } +} diff --git a/cluster-autoscaler/provisioningrequest/checkcapacity/orchestrator.go b/cluster-autoscaler/provisioningrequest/checkcapacity/orchestrator.go new file mode 100644 index 000000000000..f8e26eabd7d0 --- /dev/null +++ b/cluster-autoscaler/provisioningrequest/checkcapacity/orchestrator.go @@ -0,0 +1,177 @@ +/* +Copyright 2024 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package checkcapacity + +import ( + "fmt" + + appsv1 "k8s.io/api/apps/v1" + apiv1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/autoscaler/cluster-autoscaler/clusterstate" + "k8s.io/autoscaler/cluster-autoscaler/context" + "k8s.io/autoscaler/cluster-autoscaler/processors/status" + "k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/apis/autoscaling.x-k8s.io/v1beta1" + provreq_pods "k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/pods" + "k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/provreqclient" + "k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/provreqwrapper" + "k8s.io/autoscaler/cluster-autoscaler/simulator/scheduling" + "k8s.io/autoscaler/cluster-autoscaler/utils/errors" + "k8s.io/autoscaler/cluster-autoscaler/utils/taints" + "k8s.io/client-go/rest" + "k8s.io/klog/v2" + + ca_processors "k8s.io/autoscaler/cluster-autoscaler/processors" + schedulerframework "k8s.io/kubernetes/pkg/scheduler/framework" +) + +type provisioningRequestClient interface { + ProvisioningRequests() ([]*provreqwrapper.ProvisioningRequest, error) + ProvisioningRequest(namespace, name string) (*provreqwrapper.ProvisioningRequest, error) +} + +type provReqOrchestrator struct { + initialized bool + context *context.AutoscalingContext + client provisioningRequestClient + injector *scheduling.HintingSimulator +} + +// New return new orchestrator. +func New(kubeConfig *rest.Config) (*provReqOrchestrator, error) { + client, err := provreqclient.NewProvisioningRequestClient(kubeConfig) + if err != nil { + return nil, err + } + + return &provReqOrchestrator{client: client}, nil +} + +// Initialize initialize orchestrator. +func (o *provReqOrchestrator) Initialize( + autoscalingContext *context.AutoscalingContext, + processors *ca_processors.AutoscalingProcessors, + clusterStateRegistry *clusterstate.ClusterStateRegistry, + taintConfig taints.TaintConfig, +) { + o.initialized = true + o.context = autoscalingContext + o.injector = scheduling.NewHintingSimulator(autoscalingContext.PredicateChecker) +} + +// ScaleUp return if there is capacity in the cluster for pods from ProvisioningRequest. +func (o *provReqOrchestrator) ScaleUp( + unschedulablePods []*apiv1.Pod, + nodes []*apiv1.Node, + daemonSets []*appsv1.DaemonSet, + nodeInfos map[string]*schedulerframework.NodeInfo, +) (*status.ScaleUpStatus, errors.AutoscalerError) { + if !o.initialized { + return status.UpdateScaleUpError(&status.ScaleUpStatus{}, errors.NewAutoscalerError(errors.InternalError, "ScaleUpOrchestrator is not initialized")) + } + if len(unschedulablePods) == 0 { + return &status.ScaleUpStatus{}, nil + } + if _, err := o.verifyProvisioningRequestClass(unschedulablePods); err != nil { + return status.UpdateScaleUpError(&status.ScaleUpStatus{}, errors.NewAutoscalerError(errors.InternalError, err.Error())) + } + + o.context.ClusterSnapshot.Fork() + defer o.context.ClusterSnapshot.Revert() + if err := o.bookCapacity(); err != nil { + return nil, errors.NewAutoscalerError(errors.InternalError, err.Error()) + } + scaleUpIsSuccessful, err := o.scaleUp(unschedulablePods) + if err != nil { + return status.UpdateScaleUpError(&status.ScaleUpStatus{}, errors.NewAutoscalerError(errors.InternalError, "error during ScaleUp: %s", err.Error())) + } + if scaleUpIsSuccessful { + return &status.ScaleUpStatus{Result: status.ScaleUpSuccessful}, nil + } + return &status.ScaleUpStatus{Result: status.ScaleUpNoOptionsAvailable}, nil +} + +// ScaleUpToNodeGroupMinSize is no-op. +func (o *provReqOrchestrator) ScaleUpToNodeGroupMinSize( + nodes []*apiv1.Node, + nodeInfos map[string]*schedulerframework.NodeInfo) (*status.ScaleUpStatus, errors.AutoscalerError) { + return nil, nil +} + +func (o *provReqOrchestrator) bookCapacity() error { + provReqs, err := o.client.ProvisioningRequests() + if err != nil { + return fmt.Errorf("Couldn't fetch ProvisioningRequests in the cluster: %v", err) + } + podsToCreate := []*apiv1.Pod{} + for _, provReq := range provReqs { + if shouldCapacityBeBooked(provReq) { + pods, err := provreq_pods.PodsForProvisioningRequest(provReq) + if err != nil { + // ClusterAutoscaler was able to create pods before, so we shouldn't have error here. + // If there is an error, mark PR as invalid, because we won't be able to book capacity + // for it anyway. + setCondition(provReq, v1beta1.Failed, metav1.ConditionTrue, FailedToBookCapacityReason, fmt.Sprintf("Couldn't create pods, err: %v", err)) + continue + } + podsToCreate = append(podsToCreate, pods...) + } + } + if len(podsToCreate) == 0 { + return nil + } + // scheduling the pods to reserve capacity for provisioning request with BookCapacity condition + if _, _, err = o.injector.TrySchedulePods(o.context.ClusterSnapshot, podsToCreate, scheduling.ScheduleAnywhere, false); err != nil { + klog.Warningf("Error during capacity booking: %v", err) + } + return nil +} + +// Assuming that all unschedulable pods comes from one ProvisioningRequest. +func (o *provReqOrchestrator) scaleUp(unschedulablePods []*apiv1.Pod) (bool, error) { + provReq, err := o.client.ProvisioningRequest(unschedulablePods[0].Namespace, unschedulablePods[0].OwnerReferences[0].Name) + if err != nil { + return false, fmt.Errorf("Failed retrive ProvisioningRequest from unscheduled pods, err: %v", err) + } + st, _, err := o.injector.TrySchedulePods(o.context.ClusterSnapshot, unschedulablePods, scheduling.ScheduleAnywhere, true) + if len(st) < len(unschedulablePods) || err != nil { + setCondition(provReq, v1beta1.CapacityFound, metav1.ConditionFalse, CapacityIsFoundReason, "Capacity is not found, CA will try to find it later.") + return false, err + } + setCondition(provReq, v1beta1.CapacityFound, metav1.ConditionTrue, CapacityIsFoundReason, "Capacity is found in the cluster.") + return true, nil +} + +// verifyPods check that all pods belong to one ProvisioningRequest that belongs to check-capacity ProvisioningRequst class. +func (o *provReqOrchestrator) verifyProvisioningRequestClass(unschedulablePods []*apiv1.Pod) (*provreqwrapper.ProvisioningRequest, error) { + provReq, err := o.client.ProvisioningRequest(unschedulablePods[0].Namespace, unschedulablePods[0].OwnerReferences[0].Name) + if err != nil { + return nil, fmt.Errorf("Failed retrive ProvisioningRequest from unscheduled pods, err: %v", err) + } + if provReq.V1Beta1().Spec.ProvisioningClassName != v1beta1.ProvisioningClassCheckCapacity { + return nil, fmt.Errorf("ProvisioningRequestClass is not %s", v1beta1.ProvisioningClassCheckCapacity) + } + for _, pod := range unschedulablePods { + if pod.Namespace != unschedulablePods[0].Namespace { + return nil, fmt.Errorf("Pods %s and %s are from different namespaces", pod.Name, unschedulablePods[0].Name) + } + if pod.OwnerReferences[0].Name != unschedulablePods[0].OwnerReferences[0].Name { + return nil, fmt.Errorf("Pods %s and %s have different OwnerReference", pod.Name, unschedulablePods[0].Name) + } + } + return provReq, nil +} diff --git a/cluster-autoscaler/provisioningrequest/checkcapacity/orchestrator_test.go b/cluster-autoscaler/provisioningrequest/checkcapacity/orchestrator_test.go new file mode 100644 index 000000000000..4fe874b96c2e --- /dev/null +++ b/cluster-autoscaler/provisioningrequest/checkcapacity/orchestrator_test.go @@ -0,0 +1,125 @@ +/* +Copyright 2024 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package checkcapacity + +import ( + "context" + "fmt" + "testing" + "time" + + "github.com/stretchr/testify/assert" + v1 "k8s.io/api/apps/v1" + apiv1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + testprovider "k8s.io/autoscaler/cluster-autoscaler/cloudprovider/test" + "k8s.io/autoscaler/cluster-autoscaler/config" + . "k8s.io/autoscaler/cluster-autoscaler/core/test" + "k8s.io/autoscaler/cluster-autoscaler/processors/status" + "k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/apis/autoscaling.x-k8s.io/v1beta1" + "k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/pods" + "k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/provreqclient" + "k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/provreqwrapper" + "k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot" + "k8s.io/autoscaler/cluster-autoscaler/simulator/scheduling" + . "k8s.io/autoscaler/cluster-autoscaler/utils/test" + "k8s.io/client-go/kubernetes/fake" + "k8s.io/kubernetes/pkg/scheduler/framework" +) + +func TestScaleUp(t *testing.T) { + allNodes := []*apiv1.Node{} + for i := 0; i < 100; i++ { + name := fmt.Sprintf("test-cpu-node-%d", i) + node := BuildTestNode(name, 100, 10) + allNodes = append(allNodes, node) + } + for i := 0; i < 100; i++ { + name := fmt.Sprintf("test-mem-node-%d", i) + node := BuildTestNode(name, 1, 1000) + allNodes = append(allNodes, node) + } + newCpuProvReq := provreqwrapper.BuildTestProvisioningRequest("ns", "newCpuProvReq", "5m", "5", "", int32(100), false, time.Now(), v1beta1.ProvisioningClassCheckCapacity) + newMemProvReq := provreqwrapper.BuildTestProvisioningRequest("ns", "newMemProvReq", "1m", "100", "", int32(100), false, time.Now(), v1beta1.ProvisioningClassCheckCapacity) + bookedCapacityProvReq := provreqwrapper.BuildTestProvisioningRequest("ns", "bookedCapacity", "1m", "200", "", int32(100), false, time.Now(), v1beta1.ProvisioningClassCheckCapacity) + bookedCapacityProvReq.SetConditions([]metav1.Condition{{Type: v1beta1.CapacityFound, Status: metav1.ConditionTrue, LastTransitionTime: metav1.Now()}}) + expiredProvReq := provreqwrapper.BuildTestProvisioningRequest("ns", "bookedCapacity", "1m", "200", "", int32(100), false, time.Now(), v1beta1.ProvisioningClassCheckCapacity) + expiredProvReq.SetConditions([]metav1.Condition{{Type: v1beta1.BookingExpired, Status: metav1.ConditionTrue, LastTransitionTime: metav1.Now()}}) + differentProvReqClass := provreqwrapper.BuildTestProvisioningRequest("ns", "differentProvReqClass", "1", "1", "", int32(5), false, time.Now(), v1beta1.ProvisioningClassAtomicScaleUp) + testCases := []struct { + name string + provReqs []*provreqwrapper.ProvisioningRequest + provReqToScaleUp *provreqwrapper.ProvisioningRequest + scaleUpResult status.ScaleUpResult + err bool + }{ + { + name: "no ProvisioningRequests", + provReqs: []*provreqwrapper.ProvisioningRequest{}, + }, + { + name: "one ProvisioningRequest", + provReqs: []*provreqwrapper.ProvisioningRequest{newCpuProvReq}, + provReqToScaleUp: newCpuProvReq, + scaleUpResult: status.ScaleUpSuccessful, + }, + { + name: "capacity in the cluster is booked", + provReqs: []*provreqwrapper.ProvisioningRequest{newMemProvReq, bookedCapacityProvReq}, + provReqToScaleUp: newMemProvReq, + scaleUpResult: status.ScaleUpNoOptionsAvailable, + }, + { + name: "pods from different ProvisioningRequest class", + provReqs: []*provreqwrapper.ProvisioningRequest{newCpuProvReq, bookedCapacityProvReq, differentProvReqClass}, + provReqToScaleUp: differentProvReqClass, + err: true, + }, + { + name: "some capacity is booked, succesfull ScaleUp", + provReqs: []*provreqwrapper.ProvisioningRequest{newCpuProvReq, bookedCapacityProvReq, differentProvReqClass}, + provReqToScaleUp: newCpuProvReq, + scaleUpResult: status.ScaleUpSuccessful, + }, + } + for _, tc := range testCases { + tc := tc + allNodes := allNodes + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + provider := testprovider.NewTestCloudProvider(nil, nil) + autoscalingContext, err := NewScaleTestAutoscalingContext(config.AutoscalingOptions{}, &fake.Clientset{}, nil, provider, nil, nil) + assert.NoError(t, err) + clustersnapshot.InitializeClusterSnapshotOrDie(t, autoscalingContext.ClusterSnapshot, allNodes, nil) + prPods, err := pods.PodsForProvisioningRequest(tc.provReqToScaleUp) + assert.NoError(t, err) + orchestrator := &provReqOrchestrator{ + initialized: true, + context: &autoscalingContext, + client: provreqclient.NewFakeProvisioningRequestClient(context.Background(), t, tc.provReqs...), + injector: scheduling.NewHintingSimulator(autoscalingContext.PredicateChecker), + } + st, err := orchestrator.ScaleUp(prPods, []*apiv1.Node{}, []*v1.DaemonSet{}, map[string]*framework.NodeInfo{}) + if !tc.err { + assert.NoError(t, err) + assert.Equal(t, tc.scaleUpResult, st.Result) + } else { + assert.Error(t, err) + } + }) + } +} diff --git a/cluster-autoscaler/provisioningrequest/pods/pods.go b/cluster-autoscaler/provisioningrequest/pods/pods.go new file mode 100644 index 000000000000..4e76fb8dea02 --- /dev/null +++ b/cluster-autoscaler/provisioningrequest/pods/pods.go @@ -0,0 +1,84 @@ +/* +Copyright 2024 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package pods + +import ( + "fmt" + + "google.golang.org/protobuf/proto" + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + "k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/provreqwrapper" + "k8s.io/kubernetes/pkg/controller" +) + +const ( + // ProvisioningRequestPodAnnotationKey is a key used to annotate pods consuming provisioning request. + ProvisioningRequestPodAnnotationKey = "cluster-autoscaler.kubernetes.io/consume-provisioning-request" + // ProvisioningClassPodAnnotationKey is a key used to add annotation about Provisioning Class + ProvisioningClassPodAnnotationKey = "cluster-autoscaler.kubernetes.io/provisioning-class-name" +) + +// PodsForProvisioningRequest returns a list of pods for which Provisioning +// Request needs to provision resources. +func PodsForProvisioningRequest(pr *provreqwrapper.ProvisioningRequest) ([]*v1.Pod, error) { + if pr == nil { + return nil, nil + } + podSets, err := pr.PodSets() + if err != nil { + return nil, err + } + pods := make([]*v1.Pod, 0) + for i, podSet := range podSets { + for j := 0; j < int(podSet.Count); j++ { + pod, err := controller.GetPodFromTemplate(&podSet.PodTemplate, pr.RuntimeObject(), ownerReference(pr)) + if err != nil { + return nil, fmt.Errorf("while creating pod for pr: %s/%s podSet: %d, got error: %w", pr.Namespace(), pr.Name(), i, err) + } + populatePodFields(pr, pod, i, j) + pods = append(pods, pod) + } + } + return pods, nil +} + +// ownerReference injects owner reference that points to the ProvReq object. +// This allows CA to group the pods as coming from one controller and simplifies +// the scale-up simulation logic and number of logs lines emitted. +func ownerReference(pr *provreqwrapper.ProvisioningRequest) *metav1.OwnerReference { + return &metav1.OwnerReference{ + APIVersion: pr.APIVersion(), + Kind: pr.Kind(), + Name: pr.Name(), + UID: pr.UID(), + Controller: proto.Bool(true), + } +} + +func populatePodFields(pr *provreqwrapper.ProvisioningRequest, pod *v1.Pod, i, j int) { + pod.Name = fmt.Sprintf("%s%d-%d", pod.GenerateName, i, j) + pod.Namespace = pr.Namespace() + if pod.Annotations == nil { + pod.Annotations = make(map[string]string) + } + pod.Annotations[ProvisioningRequestPodAnnotationKey] = pr.Name() + pod.Annotations[ProvisioningClassPodAnnotationKey] = pr.V1Beta1().Spec.ProvisioningClassName + pod.UID = types.UID(fmt.Sprintf("%s/%s", pod.Namespace, pod.Name)) + pod.CreationTimestamp = pr.CreationTimestamp() +} diff --git a/cluster-autoscaler/provisioningrequest/pods/pods_test.go b/cluster-autoscaler/provisioningrequest/pods/pods_test.go new file mode 100644 index 000000000000..75ad8cd64682 --- /dev/null +++ b/cluster-autoscaler/provisioningrequest/pods/pods_test.go @@ -0,0 +1,269 @@ +/* +Copyright 2024 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package pods + +import ( + "fmt" + "testing" + + "github.com/google/go-cmp/cmp" + "google.golang.org/protobuf/proto" + + // "google.golang.org/protobuf/testing/protocmp" + apiv1 "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + "k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/apis/autoscaling.x-k8s.io/v1beta1" + "k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/provreqwrapper" +) + +const testProvisioningClassName = "TestProvisioningClass" + +func TestPodsForProvisioningRequest(t *testing.T) { + testPod := func(name, genName, containerName, containerImage, prName string) *v1.Pod { + return &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + GenerateName: genName, + Namespace: "test-namespace", + UID: types.UID(fmt.Sprintf("test-namespace/%s", name)), + Annotations: map[string]string{ + ProvisioningRequestPodAnnotationKey: prName, + ProvisioningClassPodAnnotationKey: testProvisioningClassName, + }, + Labels: map[string]string{}, + Finalizers: []string{}, + OwnerReferences: []metav1.OwnerReference{ + { + Controller: proto.Bool(true), + Name: prName, + }, + }, + }, + Spec: v1.PodSpec{ + Containers: []v1.Container{ + { + Name: containerName, + Image: containerImage, + }, + }, + }, + } + } + + tests := []struct { + desc string + pr *v1beta1.ProvisioningRequest + podTemplates []*apiv1.PodTemplate + want []*v1.Pod + wantErr bool + }{ + { + desc: "simple ProvReq", + pr: &v1beta1.ProvisioningRequest{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-pr-name", + Namespace: "test-namespace", + }, + Spec: v1beta1.ProvisioningRequestSpec{ + PodSets: []v1beta1.PodSet{ + { + Count: 1, + PodTemplateRef: v1beta1.Reference{Name: "template-1"}, + }, + }, + ProvisioningClassName: testProvisioningClassName, + }, + }, + podTemplates: []*apiv1.PodTemplate{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "template-1", + Namespace: "test-namespace", + }, + Template: v1.PodTemplateSpec{ + Spec: v1.PodSpec{ + Containers: []v1.Container{ + { + Name: "test-container", + Image: "test-image", + }, + }, + }, + }, + }, + }, + want: []*v1.Pod{ + testPod("test-pr-name-0-0", "test-pr-name-", "test-container", "test-image", "test-pr-name"), + }, + }, + { + desc: "ProvReq already having taint", + pr: &v1beta1.ProvisioningRequest{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-pr-name", + Namespace: "test-namespace", + }, + Spec: v1beta1.ProvisioningRequestSpec{ + PodSets: []v1beta1.PodSet{ + { + Count: 1, + PodTemplateRef: v1beta1.Reference{Name: "template-1"}, + }, + }, + ProvisioningClassName: testProvisioningClassName, + }, + }, + podTemplates: []*apiv1.PodTemplate{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "template-1", + Namespace: "test-namespace", + }, + Template: v1.PodTemplateSpec{ + Spec: v1.PodSpec{ + Containers: []v1.Container{ + { + Name: "test-container", + Image: "test-image", + }, + }, + }, + }, + }, + }, + want: []*v1.Pod{ + testPod("test-pr-name-0-0", "test-pr-name-", "test-container", "test-image", "test-pr-name"), + }, + }, + { + desc: "ProvReq already having nodeSelector", + pr: &v1beta1.ProvisioningRequest{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-pr-name", + Namespace: "test-namespace", + }, + Spec: v1beta1.ProvisioningRequestSpec{ + PodSets: []v1beta1.PodSet{ + { + Count: 1, + PodTemplateRef: v1beta1.Reference{Name: "template-1"}, + }, + }, + ProvisioningClassName: testProvisioningClassName, + }, + }, + podTemplates: []*v1.PodTemplate{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "template-1", + Namespace: "test-namespace", + }, + Template: v1.PodTemplateSpec{ + Spec: v1.PodSpec{ + Containers: []v1.Container{ + { + Name: "test-container", + Image: "test-image", + }, + }, + }, + }, + }, + }, + want: []*v1.Pod{ + testPod("test-pr-name-0-0", "test-pr-name-", "test-container", "test-image", "test-pr-name"), + }, + }, + { + desc: "ProvReq with multiple pod sets", + pr: &v1beta1.ProvisioningRequest{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-pr-name", + Namespace: "test-namespace", + }, + Spec: v1beta1.ProvisioningRequestSpec{ + PodSets: []v1beta1.PodSet{ + { + Count: 2, + PodTemplateRef: v1beta1.Reference{Name: "template-1"}, + }, + { + Count: 3, + PodTemplateRef: v1beta1.Reference{Name: "template-2"}, + }, + }, + ProvisioningClassName: testProvisioningClassName, + }, + }, + podTemplates: []*v1.PodTemplate{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "template-1", + Namespace: "test-namespace", + }, + Template: v1.PodTemplateSpec{ + Spec: v1.PodSpec{ + Containers: []v1.Container{ + { + Name: "test-container", + Image: "test-image", + }, + }, + }, + }, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Name: "template-2", + Namespace: "test-namespace", + }, + Template: v1.PodTemplateSpec{ + Spec: v1.PodSpec{ + Containers: []v1.Container{ + { + Name: "test-container-2", + Image: "test-image-2", + }, + }, + }, + }, + }, + }, + want: []*v1.Pod{ + testPod("test-pr-name-0-0", "test-pr-name-", "test-container", "test-image", "test-pr-name"), + testPod("test-pr-name-0-1", "test-pr-name-", "test-container", "test-image", "test-pr-name"), + testPod("test-pr-name-1-0", "test-pr-name-", "test-container-2", "test-image-2", "test-pr-name"), + testPod("test-pr-name-1-1", "test-pr-name-", "test-container-2", "test-image-2", "test-pr-name"), + testPod("test-pr-name-1-2", "test-pr-name-", "test-container-2", "test-image-2", "test-pr-name"), + }, + }, + } + for _, tc := range tests { + t.Run(tc.desc, func(t *testing.T) { + got, err := PodsForProvisioningRequest(provreqwrapper.NewV1Beta1ProvisioningRequest(tc.pr, tc.podTemplates)) + if (err != nil) != tc.wantErr { + t.Errorf("PodsForProvisioningRequest() error = %v, wantErr %v", err, tc.wantErr) + return + } + if diff := cmp.Diff(got, tc.want); diff != "" { + t.Errorf("unexpected response from PodsForProvisioningRequest(), diff (-want +got): %v", diff) + } + }) + } +} diff --git a/cluster-autoscaler/provisioningrequest/provreqclient/client_test.go b/cluster-autoscaler/provisioningrequest/provreqclient/client_test.go index c05bb490f2f2..333673ec51bb 100644 --- a/cluster-autoscaler/provisioningrequest/provreqclient/client_test.go +++ b/cluster-autoscaler/provisioningrequest/provreqclient/client_test.go @@ -30,7 +30,7 @@ func TestFetchPodTemplates(t *testing.T) { mockProvisioningRequests := []*provreqwrapper.ProvisioningRequest{pr1, pr2} ctx := context.Background() - c, _ := NewFakeProvisioningRequestClient(ctx, t, mockProvisioningRequests...) + c := NewFakeProvisioningRequestClient(ctx, t, mockProvisioningRequests...) got, err := c.FetchPodTemplates(pr1.V1Beta1()) if err != nil { t.Errorf("provisioningRequestClient.ProvisioningRequests() error: %v", err) diff --git a/cluster-autoscaler/provisioningrequest/provreqclient/testutils.go b/cluster-autoscaler/provisioningrequest/provreqclient/testutils.go index 578c790b7359..b2c4a78a8380 100644 --- a/cluster-autoscaler/provisioningrequest/provreqclient/testutils.go +++ b/cluster-autoscaler/provisioningrequest/provreqclient/testutils.go @@ -35,7 +35,7 @@ import ( ) // NewFakeProvisioningRequestClient mock ProvisioningRequestClient for tests. -func NewFakeProvisioningRequestClient(ctx context.Context, t *testing.T, prs ...*provreqwrapper.ProvisioningRequest) (*ProvisioningRequestClientV1beta1, *FakeProvisioningRequestForceClient) { +func NewFakeProvisioningRequestClient(ctx context.Context, t *testing.T, prs ...*provreqwrapper.ProvisioningRequest) *ProvisioningRequestClientV1beta1 { t.Helper() provReqClient := fake.NewSimpleClientset() podTemplClient := fake_kubernetes.NewSimpleClientset() @@ -61,24 +61,10 @@ func NewFakeProvisioningRequestClient(ctx context.Context, t *testing.T, prs ... t.Fatalf("Failed to create Provisioning Request lister. Error was: %v", err) } return &ProvisioningRequestClientV1beta1{ - client: provReqClient, - provReqLister: provReqLister, - podTemplLister: podTemplLister, - }, &FakeProvisioningRequestForceClient{ - client: provReqClient, - } -} - -// FakeProvisioningRequestForceClient that allows to skip cache. -type FakeProvisioningRequestForceClient struct { - client *fake.Clientset -} - -// ProvisioningRequest gets a specific ProvisioningRequest CR, skipping cache. -func (c *FakeProvisioningRequestForceClient) ProvisioningRequest(namespace, name string) (*v1beta1.ProvisioningRequest, error) { - ctx, cancel := context.WithTimeout(context.Background(), provisioningRequestClientCallTimeout) - defer cancel() - return c.client.AutoscalingV1beta1().ProvisioningRequests(namespace).Get(ctx, name, metav1.GetOptions{}) + client: provReqClient, + provReqLister: provReqLister, + podTemplLister: podTemplLister, + } } // newFakePodTemplatesLister creates a fake lister for the Pod Templates in the cluster. diff --git a/cluster-autoscaler/provisioningrequest/provreqwrapper/testutils.go b/cluster-autoscaler/provisioningrequest/provreqwrapper/testutils.go new file mode 100644 index 000000000000..b8303ceefa52 --- /dev/null +++ b/cluster-autoscaler/provisioningrequest/provreqwrapper/testutils.go @@ -0,0 +1,118 @@ +/* +Copyright 2024 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package provreqwrapper + +import ( + "fmt" + "time" + + apiv1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/apis/autoscaling.x-k8s.io/v1beta1" +) + +// BuildTestProvisioningRequest builds ProvisioningRequest wrapper. +func BuildTestProvisioningRequest(namespace, name, cpu, memory, gpu string, podCount int32, + antiAffinity bool, creationTimestamp time.Time, provisioningRequestClass string) *ProvisioningRequest { + gpuResource := resource.Quantity{} + tolerations := []apiv1.Toleration{} + if len(gpu) > 0 { + gpuResource = resource.MustParse(gpu) + tolerations = append(tolerations, apiv1.Toleration{Key: "nvidia.com/gpu", Operator: apiv1.TolerationOpExists}) + } + + affinity := &apiv1.Affinity{} + if antiAffinity { + affinity.PodAntiAffinity = &apiv1.PodAntiAffinity{ + RequiredDuringSchedulingIgnoredDuringExecution: []apiv1.PodAffinityTerm{ + { + LabelSelector: &metav1.LabelSelector{ + MatchExpressions: []metav1.LabelSelectorRequirement{ + { + Key: "app", + Operator: metav1.LabelSelectorOpIn, + Values: []string{"test-app"}, + }, + }, + }, + TopologyKey: "failure-domain.beta.kubernetes.io/zone", + }, + }, + } + } + return NewV1Beta1ProvisioningRequest( + &v1beta1.ProvisioningRequest{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: namespace, + CreationTimestamp: v1.NewTime(creationTimestamp), + }, + Spec: v1beta1.ProvisioningRequestSpec{ + ProvisioningClassName: provisioningRequestClass, + PodSets: []v1beta1.PodSet{ + { + PodTemplateRef: v1beta1.Reference{Name: fmt.Sprintf("%s-template-name", name)}, + Count: podCount, + }, + }, + }, + Status: v1beta1.ProvisioningRequestStatus{ + Conditions: []metav1.Condition{}, + }, + }, + []*apiv1.PodTemplate{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("%s-template-name", name), + Namespace: namespace, + CreationTimestamp: v1.NewTime(creationTimestamp), + }, + Template: apiv1.PodTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{ + "app": "test-app", + }, + }, + Spec: apiv1.PodSpec{ + Tolerations: tolerations, + Affinity: affinity, + Containers: []apiv1.Container{ + { + Name: "pi", + Image: "perl", + Command: []string{"/bin/sh"}, + Resources: apiv1.ResourceRequirements{ + Limits: apiv1.ResourceList{ + apiv1.ResourceCPU: resource.MustParse(cpu), + apiv1.ResourceMemory: resource.MustParse(memory), + "nvidia.com/gpu": gpuResource, + }, + Requests: apiv1.ResourceList{ + apiv1.ResourceCPU: resource.MustParse(cpu), + apiv1.ResourceMemory: resource.MustParse(memory), + "nvidia.com/gpu": gpuResource, + }, + }, + }, + }, + }, + }, + }, + }) +} diff --git a/cluster-autoscaler/utils/kubernetes/client.go b/cluster-autoscaler/utils/kubernetes/client.go index a86d452e86e1..1dbe01938bf9 100644 --- a/cluster-autoscaler/utils/kubernetes/client.go +++ b/cluster-autoscaler/utils/kubernetes/client.go @@ -35,11 +35,11 @@ const ( // CreateKubeClient creates kube client based on AutoscalingOptions.KubeClientOptions func CreateKubeClient(opts config.KubeClientOptions) kube_client.Interface { - return kube_client.NewForConfigOrDie(getKubeConfig(opts)) + return kube_client.NewForConfigOrDie(GetKubeConfig(opts)) } -// getKubeConfig returns the rest config from AutoscalingOptions.KubeClientOptions. -func getKubeConfig(opts config.KubeClientOptions) *rest.Config { +// GetKubeConfig returns the rest config from AutoscalingOptions.KubeClientOptions. +func GetKubeConfig(opts config.KubeClientOptions) *rest.Config { var kubeConfig *rest.Config var err error