diff --git a/cluster-autoscaler/config/autoscaling_options.go b/cluster-autoscaler/config/autoscaling_options.go index 82d28e557a49..62031776be1e 100644 --- a/cluster-autoscaler/config/autoscaling_options.go +++ b/cluster-autoscaler/config/autoscaling_options.go @@ -151,4 +151,11 @@ type AutoscalingOptions struct { ClusterAPICloudConfigAuthoritative bool // Enable or disable cordon nodes functionality before terminating the node during downscale process CordonNodeBeforeTerminate bool + // MaxNodesPerScaleUp controls how many nodes can be added in a single scale-up. + // Note that this is strictly a performance optimization aimed at limiting binpacking time, not a tool to rate-limit + // scale-up. There is nothing stopping CA from adding MaxNodesPerScaleUp every loop. + MaxNodesPerScaleUp int + // MaxNodeGroupBinpackingDuration is a maximum time that can be spent binpacking a single NodeGroup. If the threshold + // is exceeded binpacking will be cut short and a partial scale-up will be performed. + MaxNodeGroupBinpackingDuration time.Duration } diff --git a/cluster-autoscaler/core/autoscaler.go b/cluster-autoscaler/core/autoscaler.go index 62216018769d..33650fec2892 100644 --- a/cluster-autoscaler/core/autoscaler.go +++ b/cluster-autoscaler/core/autoscaler.go @@ -110,7 +110,7 @@ func initializeDefaultOptions(opts *AutoscalerOptions) error { opts.ExpanderStrategy = expanderStrategy } if opts.EstimatorBuilder == nil { - estimatorBuilder, err := estimator.NewEstimatorBuilder(opts.EstimatorName) + estimatorBuilder, err := estimator.NewEstimatorBuilder(opts.EstimatorName, estimator.NewThresholdBasedEstimationLimiter(opts.MaxNodesPerScaleUp, opts.MaxNodeGroupBinpackingDuration)) if err != nil { return err } diff --git a/cluster-autoscaler/core/scale_test_common.go b/cluster-autoscaler/core/scale_test_common.go index 8779ae670a9d..24337adc351a 100644 --- a/cluster-autoscaler/core/scale_test_common.go +++ b/cluster-autoscaler/core/scale_test_common.go @@ -157,7 +157,7 @@ func NewScaleTestAutoscalingContext( } // Ignoring error here is safe - if a test doesn't specify valid estimatorName, // it either doesn't need one, or should fail when it turns out to be nil. - estimatorBuilder, _ := estimator.NewEstimatorBuilder(options.EstimatorName) + estimatorBuilder, _ := estimator.NewEstimatorBuilder(options.EstimatorName, estimator.NewThresholdBasedEstimationLimiter(0, 0)) predicateChecker, err := simulator.NewTestPredicateChecker() if err != nil { return context.AutoscalingContext{}, err diff --git a/cluster-autoscaler/core/scale_up.go b/cluster-autoscaler/core/scale_up.go index 0209c4cbd58d..b03943d1e4bc 100644 --- a/cluster-autoscaler/core/scale_up.go +++ b/cluster-autoscaler/core/scale_up.go @@ -300,7 +300,7 @@ func computeExpansionOption(context *context.AutoscalingContext, podEquivalenceG if len(option.Pods) > 0 { estimator := context.EstimatorBuilder(context.PredicateChecker, context.ClusterSnapshot) - option.NodeCount = estimator.Estimate(option.Pods, nodeInfo) + option.NodeCount, option.Pods = estimator.Estimate(option.Pods, nodeInfo, option.NodeGroup) } return option, nil diff --git a/cluster-autoscaler/estimator/binpacking_estimator.go b/cluster-autoscaler/estimator/binpacking_estimator.go index 87482f4921f1..5e6134a56d53 100644 --- a/cluster-autoscaler/estimator/binpacking_estimator.go +++ b/cluster-autoscaler/estimator/binpacking_estimator.go @@ -22,6 +22,7 @@ import ( apiv1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" + "k8s.io/autoscaler/cluster-autoscaler/cloudprovider" "k8s.io/autoscaler/cluster-autoscaler/simulator" "k8s.io/autoscaler/cluster-autoscaler/utils/scheduler" klog "k8s.io/klog/v2" @@ -38,15 +39,18 @@ type podInfo struct { type BinpackingNodeEstimator struct { predicateChecker simulator.PredicateChecker clusterSnapshot simulator.ClusterSnapshot + limiter EstimationLimiter } // NewBinpackingNodeEstimator builds a new BinpackingNodeEstimator. func NewBinpackingNodeEstimator( predicateChecker simulator.PredicateChecker, - clusterSnapshot simulator.ClusterSnapshot) *BinpackingNodeEstimator { + clusterSnapshot simulator.ClusterSnapshot, + limiter EstimationLimiter) *BinpackingNodeEstimator { return &BinpackingNodeEstimator{ predicateChecker: predicateChecker, clusterSnapshot: clusterSnapshot, + limiter: limiter, } } @@ -57,69 +61,102 @@ func NewBinpackingNodeEstimator( // still be maintained. // It is assumed that all pods from the given list can fit to nodeTemplate. // Returns the number of nodes needed to accommodate all pods from the list. -func (estimator *BinpackingNodeEstimator) Estimate( +func (e *BinpackingNodeEstimator) Estimate( pods []*apiv1.Pod, - nodeTemplate *schedulerframework.NodeInfo) int { + nodeTemplate *schedulerframework.NodeInfo, + nodeGroup cloudprovider.NodeGroup) (int, []*apiv1.Pod) { + + e.limiter.StartEstimation(pods, nodeGroup) + defer e.limiter.EndEstimation() + podInfos := calculatePodScore(pods, nodeTemplate) sort.Slice(podInfos, func(i, j int) bool { return podInfos[i].score > podInfos[j].score }) newNodeNames := make(map[string]bool) + newNodesWithPods := make(map[string]bool) - if err := estimator.clusterSnapshot.Fork(); err != nil { + if err := e.clusterSnapshot.Fork(); err != nil { klog.Errorf("Error while calling ClusterSnapshot.Fork; %v", err) - return 0 + return 0, nil } defer func() { - if err := estimator.clusterSnapshot.Revert(); err != nil { + if err := e.clusterSnapshot.Revert(); err != nil { klog.Fatalf("Error while calling ClusterSnapshot.Revert; %v", err) } }() newNodeNameIndex := 0 + scheduledPods := []*apiv1.Pod{} + lastNodeName := "" for _, podInfo := range podInfos { found := false - nodeName, err := estimator.predicateChecker.FitsAnyNodeMatching(estimator.clusterSnapshot, podInfo.pod, func(nodeInfo *schedulerframework.NodeInfo) bool { + nodeName, err := e.predicateChecker.FitsAnyNodeMatching(e.clusterSnapshot, podInfo.pod, func(nodeInfo *schedulerframework.NodeInfo) bool { return newNodeNames[nodeInfo.Node().Name] }) if err == nil { found = true - if err := estimator.clusterSnapshot.AddPod(podInfo.pod, nodeName); err != nil { + if err := e.clusterSnapshot.AddPod(podInfo.pod, nodeName); err != nil { klog.Errorf("Error adding pod %v.%v to node %v in ClusterSnapshot; %v", podInfo.pod.Namespace, podInfo.pod.Name, nodeName, err) - return 0 + return 0, nil } + scheduledPods = append(scheduledPods, podInfo.pod) + newNodesWithPods[nodeName] = true } if !found { + // Stop binpacking if we reach the limit of nodes we can add. + // We return the result of the binpacking that we already performed. + if !e.limiter.PermissionToAddNode() { + break + } + + // If the last node we've added is empty and the pod couldn't schedule on it, it wouldn't be able to schedule + // on a new node either. There is no point adding more nodes to snapshot in such case, especially because of + // performance cost each extra node adds to future FitsAnyNodeMatching calls. + if lastNodeName != "" && !newNodesWithPods[lastNodeName] { + continue + } + // Add new node - newNodeName, err := estimator.addNewNodeToSnapshot(nodeTemplate, newNodeNameIndex) + newNodeName, err := e.addNewNodeToSnapshot(nodeTemplate, newNodeNameIndex) if err != nil { klog.Errorf("Error while adding new node for template to ClusterSnapshot; %v", err) - return 0 + return 0, nil } newNodeNameIndex++ - // And schedule pod to it - if err := estimator.clusterSnapshot.AddPod(podInfo.pod, newNodeName); err != nil { + newNodeNames[newNodeName] = true + lastNodeName = newNodeName + + // And try to schedule pod to it. + // Note that this may still fail (ex. if topology spreading with zonal topologyKey is used); + // in this case we can't help the pending pod. We keep the node in clusterSnapshot to avoid + // adding and removing node to snapshot for each such pod. + if err := e.predicateChecker.CheckPredicates(e.clusterSnapshot, podInfo.pod, newNodeName); err != nil { + continue + } + if err := e.clusterSnapshot.AddPod(podInfo.pod, newNodeName); err != nil { klog.Errorf("Error adding pod %v.%v to node %v in ClusterSnapshot; %v", podInfo.pod.Namespace, podInfo.pod.Name, newNodeName, err) - return 0 + return 0, nil } - newNodeNames[newNodeName] = true + newNodesWithPods[newNodeName] = true + scheduledPods = append(scheduledPods, podInfo.pod) } } - return len(newNodeNames) + return len(newNodesWithPods), scheduledPods } -func (estimator *BinpackingNodeEstimator) addNewNodeToSnapshot( +func (e *BinpackingNodeEstimator) addNewNodeToSnapshot( template *schedulerframework.NodeInfo, nameIndex int) (string, error) { - newNodeInfo := scheduler.DeepCopyTemplateNode(template, fmt.Sprintf("estimator-%d", nameIndex)) + newNodeInfo := scheduler.DeepCopyTemplateNode(template, fmt.Sprintf("e-%d", nameIndex)) var pods []*apiv1.Pod for _, podInfo := range newNodeInfo.Pods { pods = append(pods, podInfo.Pod) } - if err := estimator.clusterSnapshot.AddNodeWithPods(newNodeInfo.Node(), pods); err != nil { + if err := e.clusterSnapshot.AddNodeWithPods(newNodeInfo.Node(), pods); err != nil { return "", err } return newNodeInfo.Node().Name, nil diff --git a/cluster-autoscaler/estimator/binpacking_estimator_test.go b/cluster-autoscaler/estimator/binpacking_estimator_test.go index 6aa4ee1b84ec..797d3d28e742 100644 --- a/cluster-autoscaler/estimator/binpacking_estimator_test.go +++ b/cluster-autoscaler/estimator/binpacking_estimator_test.go @@ -22,6 +22,7 @@ import ( apiv1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/autoscaler/cluster-autoscaler/simulator" . "k8s.io/autoscaler/cluster-autoscaler/utils/test" "k8s.io/autoscaler/cluster-autoscaler/utils/units" @@ -30,89 +31,157 @@ import ( "github.com/stretchr/testify/assert" ) -func makePod(cpuPerPod, memoryPerPod int64) *apiv1.Pod { - return &apiv1.Pod{ +func makePods(cpuPerPod int64, memoryPerPod int64, hostport int32, maxSkew int32, topologySpreadingKey string, podCount int) []*apiv1.Pod { + pod := &apiv1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "estimatee", + Namespace: "universe", + Labels: map[string]string{ + "app": "estimatee", + }, + }, Spec: apiv1.PodSpec{ Containers: []apiv1.Container{ { Resources: apiv1.ResourceRequirements{ Requests: apiv1.ResourceList{ apiv1.ResourceCPU: *resource.NewMilliQuantity(cpuPerPod, resource.DecimalSI), - apiv1.ResourceMemory: *resource.NewQuantity(memoryPerPod, resource.DecimalSI), + apiv1.ResourceMemory: *resource.NewQuantity(memoryPerPod*units.MiB, resource.DecimalSI), }, }, }, }, }, } -} - -func TestBinpackingEstimate(t *testing.T) { - estimator := newBinPackingEstimator(t) - - cpuPerPod := int64(350) - memoryPerPod := int64(1000 * units.MiB) - pod := makePod(cpuPerPod, memoryPerPod) - - pods := make([]*apiv1.Pod, 0) - for i := 0; i < 10; i++ { + if hostport > 0 { + pod.Spec.Containers[0].Ports = []apiv1.ContainerPort{ + { + HostPort: hostport, + }, + } + } + if maxSkew > 0 { + pod.Spec.TopologySpreadConstraints = []apiv1.TopologySpreadConstraint{ + { + MaxSkew: maxSkew, + TopologyKey: topologySpreadingKey, + WhenUnsatisfiable: "DoNotSchedule", + LabelSelector: &metav1.LabelSelector{ + MatchLabels: map[string]string{ + "app": "estimatee", + }, + }, + }, + } + } + pods := []*apiv1.Pod{} + for i := 0; i < podCount; i++ { pods = append(pods, pod) } + return pods +} + +func makeNode(cpu int64, mem int64, name string, zone string) *apiv1.Node { node := &apiv1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Labels: map[string]string{ + "kubernetes.io/hostname": name, + "topology.kubernetes.io/zone": zone, + }, + }, Status: apiv1.NodeStatus{ Capacity: apiv1.ResourceList{ - apiv1.ResourceCPU: *resource.NewMilliQuantity(cpuPerPod*3-50, resource.DecimalSI), - apiv1.ResourceMemory: *resource.NewQuantity(2*memoryPerPod, resource.DecimalSI), + apiv1.ResourceCPU: *resource.NewMilliQuantity(cpu, resource.DecimalSI), + apiv1.ResourceMemory: *resource.NewQuantity(mem*units.MiB, resource.DecimalSI), apiv1.ResourcePods: *resource.NewQuantity(10, resource.DecimalSI), }, }, } node.Status.Allocatable = node.Status.Capacity SetNodeReadyState(node, true, time.Time{}) - - nodeInfo := schedulerframework.NewNodeInfo() - nodeInfo.SetNode(node) - estimate := estimator.Estimate(pods, nodeInfo) - assert.Equal(t, 5, estimate) + return node } -func TestBinpackingEstimateWithPorts(t *testing.T) { - estimator := newBinPackingEstimator(t) - - cpuPerPod := int64(200) - memoryPerPod := int64(1000 * units.MiB) - pod := makePod(cpuPerPod, memoryPerPod) - pod.Spec.Containers[0].Ports = []apiv1.ContainerPort{ +func TestBinpackingEstimate(t *testing.T) { + testCases := []struct { + name string + millicores int64 + memory int64 + maxNodes int + pods []*apiv1.Pod + topologySpreadingKey string + expectNodeCount int + expectPodCount int + }{ { - HostPort: 5555, + name: "simple resource-based binpacking", + millicores: 350*3 - 50, + memory: 2 * 1000, + pods: makePods(350, 1000, 0, 0, "", 10), + expectNodeCount: 5, + expectPodCount: 10, }, - } - pods := make([]*apiv1.Pod, 0) - for i := 0; i < 8; i++ { - pods = append(pods, pod) - } - node := &apiv1.Node{ - Status: apiv1.NodeStatus{ - Capacity: apiv1.ResourceList{ - apiv1.ResourceCPU: *resource.NewMilliQuantity(5*cpuPerPod, resource.DecimalSI), - apiv1.ResourceMemory: *resource.NewQuantity(5*memoryPerPod, resource.DecimalSI), - apiv1.ResourcePods: *resource.NewQuantity(10, resource.DecimalSI), - }, + { + name: "pods-per-node bound binpacking", + millicores: 10000, + memory: 20000, + pods: makePods(10, 100, 0, 0, "", 20), + expectNodeCount: 2, + expectPodCount: 20, + }, + { + name: "hostport conflict forces pod-per-node", + millicores: 1000, + memory: 5000, + pods: makePods(200, 1000, 5555, 0, "", 8), + expectNodeCount: 8, + expectPodCount: 8, + }, + { + name: "limiter cuts binpacking", + millicores: 1000, + memory: 5000, + pods: makePods(500, 1000, 0, 0, "", 20), + maxNodes: 5, + expectNodeCount: 5, + expectPodCount: 10, + }, + { + name: "hostname topology spreading with maxSkew=2 forces 2 pods/node", + millicores: 1000, + memory: 5000, + pods: makePods(20, 100, 0, 2, "kubernetes.io/hostname", 8), + expectNodeCount: 4, + expectPodCount: 8, + }, + { + name: "zonal topology spreading with maxSkew=2 only allows 2 pods to schedule", + millicores: 1000, + memory: 5000, + pods: makePods(20, 100, 0, 2, "topology.kubernetes.io/zone", 8), + expectNodeCount: 1, + expectPodCount: 2, }, } - node.Status.Allocatable = node.Status.Capacity - SetNodeReadyState(node, true, time.Time{}) - - nodeInfo := schedulerframework.NewNodeInfo() - nodeInfo.SetNode(node) - estimate := estimator.Estimate(pods, nodeInfo) - assert.Equal(t, 8, estimate) -} - -func newBinPackingEstimator(t *testing.T) *BinpackingNodeEstimator { - predicateChecker, err := simulator.NewTestPredicateChecker() - clusterSnapshot := simulator.NewBasicClusterSnapshot() - assert.NoError(t, err) - estimator := NewBinpackingNodeEstimator(predicateChecker, clusterSnapshot) - return estimator + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + clusterSnapshot := simulator.NewBasicClusterSnapshot() + // Add one node in different zone to trigger topology spread constraints + clusterSnapshot.AddNode(makeNode(100, 100, "oldnode", "zone-jupiter")) + + predicateChecker, err := simulator.NewTestPredicateChecker() + assert.NoError(t, err) + limiter := NewThresholdBasedEstimationLimiter(tc.maxNodes, time.Duration(0)) + estimator := NewBinpackingNodeEstimator(predicateChecker, clusterSnapshot, limiter) + + node := makeNode(tc.millicores, tc.memory, "template", "zone-mars") + nodeInfo := schedulerframework.NewNodeInfo() + nodeInfo.SetNode(node) + + estimatedNodes, estimatedPods := estimator.Estimate(tc.pods, nodeInfo, nil) + assert.Equal(t, tc.expectNodeCount, estimatedNodes) + assert.Equal(t, tc.expectPodCount, len(estimatedPods)) + }) + } } diff --git a/cluster-autoscaler/estimator/estimator.go b/cluster-autoscaler/estimator/estimator.go index 8a86ec9c66b5..7d4f819dcf19 100644 --- a/cluster-autoscaler/estimator/estimator.go +++ b/cluster-autoscaler/estimator/estimator.go @@ -20,6 +20,7 @@ import ( "fmt" apiv1 "k8s.io/api/core/v1" + "k8s.io/autoscaler/cluster-autoscaler/cloudprovider" "k8s.io/autoscaler/cluster-autoscaler/simulator" schedulerframework "k8s.io/kubernetes/pkg/scheduler/framework" ) @@ -33,22 +34,40 @@ const ( var AvailableEstimators = []string{BinpackingEstimatorName} // Estimator calculates the number of nodes of given type needed to schedule pods. +// It returns the number of new nodes needed as well as the list of pods it managed +// to schedule on those nodes. type Estimator interface { - Estimate([]*apiv1.Pod, *schedulerframework.NodeInfo) int + Estimate([]*apiv1.Pod, *schedulerframework.NodeInfo, cloudprovider.NodeGroup) (int, []*apiv1.Pod) } // EstimatorBuilder creates a new estimator object. type EstimatorBuilder func(simulator.PredicateChecker, simulator.ClusterSnapshot) Estimator // NewEstimatorBuilder creates a new estimator object from flag. -func NewEstimatorBuilder(name string) (EstimatorBuilder, error) { +func NewEstimatorBuilder(name string, limiter EstimationLimiter) (EstimatorBuilder, error) { switch name { case BinpackingEstimatorName: return func( predicateChecker simulator.PredicateChecker, clusterSnapshot simulator.ClusterSnapshot) Estimator { - return NewBinpackingNodeEstimator(predicateChecker, clusterSnapshot) + return NewBinpackingNodeEstimator(predicateChecker, clusterSnapshot, limiter) }, nil } return nil, fmt.Errorf("unknown estimator: %s", name) } + +// EstimationLimiter controls how many nodes can be added by Estimator. +// A limiter can be used to prevent costly estimation if an actual ability to +// scale-up is limited by external factors. +type EstimationLimiter interface { + // StartEstimation is called at the start of estimation. + StartEstimation([]*apiv1.Pod, cloudprovider.NodeGroup) + // EndEstimation is called at the end of estimation. + EndEstimation() + // PermissionToAddNode is called by an estimator when it wants to add additional + // nodes to simulation. If permission is not granted the Estimator is expected + // not to add any more nodes in this simulation. + // There is no requirement for the Estimator to stop calculations, it's + // just not expected to add any more nodes. + PermissionToAddNode() bool +} diff --git a/cluster-autoscaler/estimator/threshold_based_limiter.go b/cluster-autoscaler/estimator/threshold_based_limiter.go new file mode 100644 index 000000000000..4381721d0c53 --- /dev/null +++ b/cluster-autoscaler/estimator/threshold_based_limiter.go @@ -0,0 +1,64 @@ +/* +Copyright 2022 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package estimator + +import ( + "time" + + apiv1 "k8s.io/api/core/v1" + "k8s.io/autoscaler/cluster-autoscaler/cloudprovider" + klog "k8s.io/klog/v2" +) + +type thresholdBasedEstimationLimiter struct { + maxDuration time.Duration + maxNodes int + nodes int + start time.Time +} + +func (tbel *thresholdBasedEstimationLimiter) StartEstimation([]*apiv1.Pod, cloudprovider.NodeGroup) { + tbel.start = time.Now() + tbel.nodes = 0 +} + +func (*thresholdBasedEstimationLimiter) EndEstimation() {} + +func (tbel *thresholdBasedEstimationLimiter) PermissionToAddNode() bool { + if tbel.maxNodes > 0 && tbel.nodes >= tbel.maxNodes { + klog.V(4).Infof("Capping binpacking after exceeding threshold of %i nodes", tbel.maxNodes) + return false + } + timeDefined := tbel.maxDuration > 0 && tbel.start != time.Time{} + if timeDefined && time.Now().After(tbel.start.Add(tbel.maxDuration)) { + klog.V(4).Infof("Capping binpacking after exceeding max duration of %v", tbel.maxDuration) + return false + } + tbel.nodes++ + return true +} + +// NewThresholdBasedEstimationLimiter returns an EstimationLimiter that will prevent estimation +// after either a node count- of time-based threshold is reached. This is meant to prevent cases +// where binpacking of hundreds or thousands of nodes takes extremely long time rendering CA +// incredibly slow or even completely crashing it. +func NewThresholdBasedEstimationLimiter(maxNodes int, maxDuration time.Duration) EstimationLimiter { + return &thresholdBasedEstimationLimiter{ + maxNodes: maxNodes, + maxDuration: maxDuration, + } +} diff --git a/cluster-autoscaler/estimator/threshold_based_limiter_test.go b/cluster-autoscaler/estimator/threshold_based_limiter_test.go new file mode 100644 index 000000000000..e80b586f3ebb --- /dev/null +++ b/cluster-autoscaler/estimator/threshold_based_limiter_test.go @@ -0,0 +1,129 @@ +/* +Copyright 2016 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package estimator + +import ( + "testing" + "time" + + apiv1 "k8s.io/api/core/v1" + + "github.com/stretchr/testify/assert" +) + +type limiterOperation func(*testing.T, EstimationLimiter) + +func expectDeny(t *testing.T, l EstimationLimiter) { + assert.Equal(t, false, l.PermissionToAddNode()) +} + +func expectAllow(t *testing.T, l EstimationLimiter) { + assert.Equal(t, true, l.PermissionToAddNode()) +} + +func resetLimiter(t *testing.T, l EstimationLimiter) { + l.EndEstimation() + l.StartEstimation([]*apiv1.Pod{}, nil) +} + +func TestThresholdBasedLimiter(t *testing.T) { + testCases := []struct { + name string + maxNodes int + maxDuration time.Duration + startDelta time.Duration + operations []limiterOperation + expectNodeCount int + }{ + { + name: "no limiting happens", + maxNodes: 20, + operations: []limiterOperation{ + expectAllow, + expectAllow, + expectAllow, + }, + expectNodeCount: 3, + }, + { + name: "time based trigger fires", + maxNodes: 20, + maxDuration: 5 * time.Second, + startDelta: -10 * time.Second, + operations: []limiterOperation{ + expectDeny, + expectDeny, + }, + expectNodeCount: 0, + }, + { + name: "sequence of additions works until the threshold is hit", + maxNodes: 3, + operations: []limiterOperation{ + expectAllow, + expectAllow, + expectAllow, + expectDeny, + }, + expectNodeCount: 3, + }, + { + name: "node counter is reset", + maxNodes: 2, + operations: []limiterOperation{ + expectAllow, + expectAllow, + expectDeny, + resetLimiter, + expectAllow, + }, + expectNodeCount: 1, + }, + { + name: "timer is reset", + maxNodes: 20, + maxDuration: 5 * time.Second, + startDelta: -10 * time.Second, + operations: []limiterOperation{ + expectDeny, + resetLimiter, + expectAllow, + expectAllow, + }, + expectNodeCount: 2, + }, + } + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + limiter := &thresholdBasedEstimationLimiter{ + maxNodes: tc.maxNodes, + maxDuration: tc.maxDuration, + } + limiter.StartEstimation([]*apiv1.Pod{}, nil) + + if tc.startDelta != time.Duration(0) { + limiter.start = limiter.start.Add(tc.startDelta) + } + + for _, op := range tc.operations { + op(t, limiter) + } + assert.Equal(t, tc.expectNodeCount, limiter.nodes) + limiter.EndEstimation() + }) + } +} diff --git a/cluster-autoscaler/main.go b/cluster-autoscaler/main.go index b11e8efffa97..2098c4426308 100644 --- a/cluster-autoscaler/main.go +++ b/cluster-autoscaler/main.go @@ -178,6 +178,8 @@ var ( enableProfiling = flag.Bool("profiling", false, "Is debug/pprof endpoint enabled") clusterAPICloudConfigAuthoritative = flag.Bool("clusterapi-cloud-config-authoritative", false, "Treat the cloud-config flag authoritatively (do not fallback to using kubeconfig flag). ClusterAPI only") cordonNodeBeforeTerminate = flag.Bool("cordon-node-before-terminating", false, "Should CA cordon nodes before terminating during downscale process") + maxNodesPerScaleUp = flag.Int("max-nodes-per-scaleup", 1000, "Max nodes added in a single scale-up. This is intended strictly for optimizing CA algorithm latency and not a tool to rate-limit scale-up throughput.") + maxNodeGroupBinpackingDuration = flag.Duration("max-nodegroup-binpacking-duration", 10*time.Second, "Maximum time that will be spent in binpacking simulation for each NodeGroup.") ) func createAutoscalingOptions() config.AutoscalingOptions { @@ -250,6 +252,8 @@ func createAutoscalingOptions() config.AutoscalingOptions { AWSUseStaticInstanceList: *awsUseStaticInstanceList, ClusterAPICloudConfigAuthoritative: *clusterAPICloudConfigAuthoritative, CordonNodeBeforeTerminate: *cordonNodeBeforeTerminate, + MaxNodesPerScaleUp: *maxNodesPerScaleUp, + MaxNodeGroupBinpackingDuration: *maxNodeGroupBinpackingDuration, } }