diff --git a/cluster-autoscaler/core/autoscaler.go b/cluster-autoscaler/core/autoscaler.go index c4fbfae8aad4..4e540f452028 100644 --- a/cluster-autoscaler/core/autoscaler.go +++ b/cluster-autoscaler/core/autoscaler.go @@ -115,7 +115,16 @@ func initializeDefaultOptions(opts *AutoscalerOptions) error { opts.ExpanderStrategy = expanderStrategy } if opts.EstimatorBuilder == nil { - estimatorBuilder, err := estimator.NewEstimatorBuilder(opts.EstimatorName, estimator.NewThresholdBasedEstimationLimiter(opts.MaxNodesPerScaleUp, opts.MaxNodeGroupBinpackingDuration), estimator.NewDecreasingPodOrderer()) + thresholds := []estimator.Threshold{ + estimator.NewStaticThreshold(opts.MaxNodesPerScaleUp, opts.MaxNodeGroupBinpackingDuration), + estimator.NewSngCapacityThreshold(), + estimator.NewClusterCapacityThreshold(), + } + estimatorBuilder, err := estimator.NewEstimatorBuilder( + opts.EstimatorName, + estimator.NewThresholdBasedEstimationLimiter(thresholds), + estimator.NewDecreasingPodOrderer(), + ) if err != nil { return err } diff --git a/cluster-autoscaler/core/scaleup/orchestrator/orchestrator.go b/cluster-autoscaler/core/scaleup/orchestrator/orchestrator.go index 4e9ca1a101ea..a9d5ed1e9592 100644 --- a/cluster-autoscaler/core/scaleup/orchestrator/orchestrator.go +++ b/cluster-autoscaler/core/scaleup/orchestrator/orchestrator.go @@ -22,6 +22,7 @@ import ( appsv1 "k8s.io/api/apps/v1" apiv1 "k8s.io/api/core/v1" + "k8s.io/autoscaler/cluster-autoscaler/estimator" "k8s.io/klog/v2" schedulerframework "k8s.io/kubernetes/pkg/scheduler/framework" @@ -149,7 +150,7 @@ func (o *ScaleUpOrchestrator) ScaleUp( } for _, nodeGroup := range validNodeGroups { - option := o.ComputeExpansionOption(nodeGroup, schedulablePods, nodeInfos, upcomingNodes, now) + option := o.ComputeExpansionOption(nodeGroup, schedulablePods, nodeInfos, len(nodes)+len(upcomingNodes), now) o.processors.BinpackingLimiter.MarkProcessed(o.autoscalingContext, nodeGroup.Id()) if len(option.Pods) == 0 || option.NodeCount 
== 0 { @@ -479,7 +480,7 @@ func (o *ScaleUpOrchestrator) ComputeExpansionOption( nodeGroup cloudprovider.NodeGroup, schedulablePods map[string][]*apiv1.Pod, nodeInfos map[string]*schedulerframework.NodeInfo, - upcomingNodes []*schedulerframework.NodeInfo, + currentNodeCount int, now time.Time, ) expander.Option { option := expander.Option{NodeGroup: nodeGroup} @@ -490,11 +491,16 @@ func (o *ScaleUpOrchestrator) ComputeExpansionOption( return option } + option.SimilarNodeGroups = o.ComputeSimilarNodeGroups(nodeGroup, nodeInfos, schedulablePods, now) + estimateStart := time.Now() - estimator := o.autoscalingContext.EstimatorBuilder(o.autoscalingContext.PredicateChecker, o.autoscalingContext.ClusterSnapshot) - option.NodeCount, option.Pods = estimator.Estimate(pods, nodeInfo, nodeGroup) + expansionEstimator := o.autoscalingContext.EstimatorBuilder( + o.autoscalingContext.PredicateChecker, + o.autoscalingContext.ClusterSnapshot, + estimator.NewEstimationContext(o.autoscalingContext.MaxNodesTotal, option.SimilarNodeGroups, currentNodeCount), + ) + option.NodeCount, option.Pods = expansionEstimator.Estimate(pods, nodeInfo, nodeGroup) metrics.UpdateDurationFromStart(metrics.Estimate, estimateStart) - option.SimilarNodeGroups = o.ComputeSimilarNodeGroups(nodeGroup, nodeInfos, schedulablePods, now) autoscalingOptions, err := nodeGroup.GetOptions(o.autoscalingContext.NodeGroupDefaults) if err != nil { diff --git a/cluster-autoscaler/core/test/common.go b/cluster-autoscaler/core/test/common.go index 911c19535257..9f3630edde27 100644 --- a/cluster-autoscaler/core/test/common.go +++ b/cluster-autoscaler/core/test/common.go @@ -208,7 +208,11 @@ func NewScaleTestAutoscalingContext( } // Ignoring error here is safe - if a test doesn't specify valid estimatorName, // it either doesn't need one, or should fail when it turns out to be nil. 
- estimatorBuilder, _ := estimator.NewEstimatorBuilder(options.EstimatorName, estimator.NewThresholdBasedEstimationLimiter(0, 0), estimator.NewDecreasingPodOrderer()) + estimatorBuilder, _ := estimator.NewEstimatorBuilder( + options.EstimatorName, + estimator.NewThresholdBasedEstimationLimiter(nil), + estimator.NewDecreasingPodOrderer(), + ) predicateChecker, err := predicatechecker.NewTestPredicateChecker() if err != nil { return context.AutoscalingContext{}, err diff --git a/cluster-autoscaler/estimator/binpacking_estimator.go b/cluster-autoscaler/estimator/binpacking_estimator.go index f2d61e97be2c..f8f332aaa9e4 100644 --- a/cluster-autoscaler/estimator/binpacking_estimator.go +++ b/cluster-autoscaler/estimator/binpacking_estimator.go @@ -34,6 +34,7 @@ type BinpackingNodeEstimator struct { clusterSnapshot clustersnapshot.ClusterSnapshot limiter EstimationLimiter podOrderer EstimationPodOrderer + context EstimationContext } // NewBinpackingNodeEstimator builds a new BinpackingNodeEstimator. 
@@ -41,12 +42,15 @@ func NewBinpackingNodeEstimator( predicateChecker predicatechecker.PredicateChecker, clusterSnapshot clustersnapshot.ClusterSnapshot, limiter EstimationLimiter, - podOrderer EstimationPodOrderer) *BinpackingNodeEstimator { + podOrderer EstimationPodOrderer, + context EstimationContext, +) *BinpackingNodeEstimator { return &BinpackingNodeEstimator{ predicateChecker: predicateChecker, clusterSnapshot: clusterSnapshot, limiter: limiter, podOrderer: podOrderer, + context: context, } } @@ -65,7 +69,7 @@ func (e *BinpackingNodeEstimator) Estimate( nodeTemplate *schedulerframework.NodeInfo, nodeGroup cloudprovider.NodeGroup) (int, []*apiv1.Pod) { - e.limiter.StartEstimation(pods, nodeGroup) + e.limiter.StartEstimation(pods, nodeGroup, e.context) defer e.limiter.EndEstimation() pods = e.podOrderer.Order(pods, nodeTemplate, nodeGroup) diff --git a/cluster-autoscaler/estimator/binpacking_estimator_test.go b/cluster-autoscaler/estimator/binpacking_estimator_test.go index 48e85471921d..01615e7c85ee 100644 --- a/cluster-autoscaler/estimator/binpacking_estimator_test.go +++ b/cluster-autoscaler/estimator/binpacking_estimator_test.go @@ -185,9 +185,9 @@ func TestBinpackingEstimate(t *testing.T) { predicateChecker, err := predicatechecker.NewTestPredicateChecker() assert.NoError(t, err) - limiter := NewThresholdBasedEstimationLimiter(tc.maxNodes, time.Duration(0)) + limiter := NewThresholdBasedEstimationLimiter([]Threshold{NewStaticThreshold(tc.maxNodes, time.Duration(0))}) processor := NewDecreasingPodOrderer() - estimator := NewBinpackingNodeEstimator(predicateChecker, clusterSnapshot, limiter, processor) + estimator := NewBinpackingNodeEstimator(predicateChecker, clusterSnapshot, limiter, processor, nil) node := makeNode(tc.millicores, tc.memory, "template", "zone-mars") nodeInfo := schedulerframework.NewNodeInfo() diff --git a/cluster-autoscaler/estimator/cluster_capacity_threshold.go b/cluster-autoscaler/estimator/cluster_capacity_threshold.go new file 
mode 100644 index 000000000000..eeced6886f9d --- /dev/null +++ b/cluster-autoscaler/estimator/cluster_capacity_threshold.go @@ -0,0 +1,52 @@ +/* +Copyright 2023 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package estimator + +import ( + "time" + + "k8s.io/autoscaler/cluster-autoscaler/cloudprovider" +) + +type clusterCapacityThreshold struct { +} + +// NodeLimit returns maximum number of new nodes that can be added to the cluster +// based on its capacity. Possible return values are: +// - -1 when cluster has no available capacity +// - 0 when context or cluster-wide node limit is not set. Return value of 0 means that there is no limit. +// - Any positive number representing maximum possible number of new nodes +func (l *clusterCapacityThreshold) NodeLimit(_ cloudprovider.NodeGroup, context EstimationContext) int { + if context == nil || context.ClusterMaxNodeLimit() == 0 { + return 0 + } + if (context.ClusterMaxNodeLimit() < 0) || (context.ClusterMaxNodeLimit() <= context.CurrentNodeCount()) { + return -1 + } + return context.ClusterMaxNodeLimit() - context.CurrentNodeCount() +} + +// DurationLimit always returns 0 for this threshold, meaning that no limit is set. 
+func (l *clusterCapacityThreshold) DurationLimit(cloudprovider.NodeGroup, EstimationContext) time.Duration { + return 0 +} + +// NewClusterCapacityThreshold returns a Threshold that can be used to limit binpacking +// by available cluster capacity +func NewClusterCapacityThreshold() Threshold { + return &clusterCapacityThreshold{} +} diff --git a/cluster-autoscaler/estimator/cluster_capacity_threshold_test.go b/cluster-autoscaler/estimator/cluster_capacity_threshold_test.go new file mode 100644 index 000000000000..9358fb9a7754 --- /dev/null +++ b/cluster-autoscaler/estimator/cluster_capacity_threshold_test.go @@ -0,0 +1,68 @@ +/* +Copyright 2023 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package estimator + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestNewClusterCapacityThreshold(t *testing.T) { + tests := []struct { + name string + wantThreshold int + contextMaxNodes int + contextCurrentNodes int + }{ + { + name: "returns available capacity", + contextMaxNodes: 10, + contextCurrentNodes: 5, + wantThreshold: 5, + }, + { + name: "no threshold is set if cluster capacity is unlimited", + contextMaxNodes: 0, + contextCurrentNodes: 10, + wantThreshold: 0, + }, + { + name: "threshold is negative if cluster has no capacity", + contextMaxNodes: 5, + contextCurrentNodes: 10, + wantThreshold: -1, + }, + { + name: "threshold is negative if cluster node limit is negative", + contextMaxNodes: -5, + contextCurrentNodes: 0, + wantThreshold: -1, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + context := &estimationContext{ + similarNodeGroups: nil, + currentNodeCount: tt.contextCurrentNodes, + clusterMaxNodeLimit: tt.contextMaxNodes, + } + assert.Equal(t, tt.wantThreshold, NewClusterCapacityThreshold().NodeLimit(nil, context)) + assert.True(t, NewClusterCapacityThreshold().DurationLimit(nil, nil) == 0) + }) + } +} diff --git a/cluster-autoscaler/estimator/estimation_context.go b/cluster-autoscaler/estimator/estimation_context.go new file mode 100644 index 000000000000..8187f598aaee --- /dev/null +++ b/cluster-autoscaler/estimator/estimation_context.go @@ -0,0 +1,59 @@ +/* +Copyright 2023 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+See the License for the specific language governing permissions and +limitations under the License. +*/ + +package estimator + +import ( + "k8s.io/autoscaler/cluster-autoscaler/cloudprovider" +) + +// EstimationContext stores static and runtime state of autoscaling, used by Estimator +type EstimationContext interface { + SimilarNodeGroups() []cloudprovider.NodeGroup + ClusterMaxNodeLimit() int + CurrentNodeCount() int +} + +type estimationContext struct { + similarNodeGroups []cloudprovider.NodeGroup + currentNodeCount int + clusterMaxNodeLimit int +} + +// NewEstimationContext returns an EstimationContext holding the given cluster-wide +// node limit, similar node groups and current node count. +func NewEstimationContext(clusterMaxNodeLimit int, similarNodeGroups []cloudprovider.NodeGroup, currentNodeCount int) EstimationContext { + return &estimationContext{ + similarNodeGroups: similarNodeGroups, + currentNodeCount: currentNodeCount, + clusterMaxNodeLimit: clusterMaxNodeLimit, + } +} + +// SimilarNodeGroups returns array of similar node groups +func (c *estimationContext) SimilarNodeGroups() []cloudprovider.NodeGroup { + return c.similarNodeGroups +} + +// ClusterMaxNodeLimit returns maximum node number allowed for the cluster +func (c *estimationContext) ClusterMaxNodeLimit() int { + return c.clusterMaxNodeLimit +} + +// CurrentNodeCount returns current number of nodes in the cluster +func (c *estimationContext) CurrentNodeCount() int { + return c.currentNodeCount +} diff --git a/cluster-autoscaler/estimator/estimator.go b/cluster-autoscaler/estimator/estimator.go index 41db27ddd4eb..2ea63568c4a6 100644 --- a/cluster-autoscaler/estimator/estimator.go +++ b/cluster-autoscaler/estimator/estimator.go @@ -43,7 +43,7 @@ type Estimator interface { } // EstimatorBuilder creates a new estimator object. 
-type EstimatorBuilder func(predicatechecker.PredicateChecker, clustersnapshot.ClusterSnapshot) Estimator +type EstimatorBuilder func(predicatechecker.PredicateChecker, clustersnapshot.ClusterSnapshot, EstimationContext) Estimator // NewEstimatorBuilder creates a new estimator object from flag. func NewEstimatorBuilder(name string, limiter EstimationLimiter, orderer EstimationPodOrderer) (EstimatorBuilder, error) { @@ -51,8 +51,9 @@ func NewEstimatorBuilder(name string, limiter EstimationLimiter, orderer Estimat case BinpackingEstimatorName: return func( predicateChecker predicatechecker.PredicateChecker, - clusterSnapshot clustersnapshot.ClusterSnapshot) Estimator { - return NewBinpackingNodeEstimator(predicateChecker, clusterSnapshot, limiter, orderer) + clusterSnapshot clustersnapshot.ClusterSnapshot, + context EstimationContext) Estimator { + return NewBinpackingNodeEstimator(predicateChecker, clusterSnapshot, limiter, orderer, context) }, nil } return nil, fmt.Errorf("unknown estimator: %s", name) @@ -63,7 +64,7 @@ func NewEstimatorBuilder(name string, limiter EstimationLimiter, orderer Estimat // scale-up is limited by external factors. type EstimationLimiter interface { // StartEstimation is called at the start of estimation. - StartEstimation([]*apiv1.Pod, cloudprovider.NodeGroup) + StartEstimation([]*apiv1.Pod, cloudprovider.NodeGroup, EstimationContext) // EndEstimation is called at the end of estimation. EndEstimation() // PermissionToAddNode is called by an estimator when it wants to add additional diff --git a/cluster-autoscaler/estimator/sng_capacity_threshold.go b/cluster-autoscaler/estimator/sng_capacity_threshold.go new file mode 100644 index 000000000000..0c319449964f --- /dev/null +++ b/cluster-autoscaler/estimator/sng_capacity_threshold.go @@ -0,0 +1,71 @@ +/* +Copyright 2023 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. 
+You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package estimator + +import ( + "time" + + "k8s.io/autoscaler/cluster-autoscaler/cloudprovider" + "k8s.io/klog/v2" +) + +type sngCapacityThreshold struct { +} + +// NodeLimit returns maximum number of new nodes that can be added to the cluster +// based on capacity of current node group and total capacity of similar node groups. Possible return values are: +// - -1 when this node group AND similar node groups have no available capacity +// - 0 when context is not set. Return value of 0 means that there is no limit. +// - Any positive number representing maximum possible number of new nodes +func (t *sngCapacityThreshold) NodeLimit(nodeGroup cloudprovider.NodeGroup, context EstimationContext) int { + if context == nil { + return 0 + } + totalAvailableCapacity := t.computeNodeGroupCapacity(nodeGroup) + for _, sng := range context.SimilarNodeGroups() { + totalAvailableCapacity += t.computeNodeGroupCapacity(sng) + } + if totalAvailableCapacity <= 0 { + return -1 + } + return totalAvailableCapacity +} + +func (t *sngCapacityThreshold) computeNodeGroupCapacity(nodeGroup cloudprovider.NodeGroup) int { + nodeGroupTargetSize, err := nodeGroup.TargetSize() + // Should not ever happen as only valid node groups are passed to estimator + if err != nil { + klog.Errorf("Error while computing available capacity of a node group %v: can't get target size of the group: %v", nodeGroup.Id(), err) + return 0 + } + groupCapacity := nodeGroup.MaxSize() - nodeGroupTargetSize + if groupCapacity > 0 { + return groupCapacity + } + return 0 +} + +// DurationLimit always returns 0 for this 
threshold, meaning that no limit is set. +func (t *sngCapacityThreshold) DurationLimit(cloudprovider.NodeGroup, EstimationContext) time.Duration { + return 0 +} + +// NewSngCapacityThreshold returns a Threshold that can be used to limit binpacking +// by available capacity of similar node groups +func NewSngCapacityThreshold() Threshold { + return &sngCapacityThreshold{} +} diff --git a/cluster-autoscaler/estimator/sng_capacity_threshold_test.go b/cluster-autoscaler/estimator/sng_capacity_threshold_test.go new file mode 100644 index 000000000000..5867152179d5 --- /dev/null +++ b/cluster-autoscaler/estimator/sng_capacity_threshold_test.go @@ -0,0 +1,92 @@ +/* +Copyright 2023 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package estimator + +import ( + "testing" + + "github.com/stretchr/testify/assert" + testprovider "k8s.io/autoscaler/cluster-autoscaler/cloudprovider/test" +) + +func TestSngCapacityThreshold(t *testing.T) { + type nodeGroupConfig struct { + name string + maxNodes int + nodesCount int + } + tests := []struct { + name string + nodeGroupsConfig []nodeGroupConfig + currentNodeGroup nodeGroupConfig + wantThreshold int + }{ + { + name: "returns available capacity", + nodeGroupsConfig: []nodeGroupConfig{ + {name: "ng1", maxNodes: 10, nodesCount: 5}, + {name: "ng2", maxNodes: 100, nodesCount: 50}, + {name: "ng3", maxNodes: 5, nodesCount: 3}, + }, + currentNodeGroup: nodeGroupConfig{name: "main-ng", maxNodes: 20, nodesCount: 10}, + wantThreshold: 67, + }, + { + name: "returns available capacity and skips over-provisioned groups", + nodeGroupsConfig: []nodeGroupConfig{ + {name: "ng1", maxNodes: 10, nodesCount: 5}, + {name: "ng2", maxNodes: 10, nodesCount: 11}, + {name: "ng3", maxNodes: 0, nodesCount: 5}, + }, + currentNodeGroup: nodeGroupConfig{name: "main-ng", maxNodes: 5, nodesCount: 10}, + wantThreshold: 5, + }, + { + name: "threshold is negative if cluster has no capacity", + nodeGroupsConfig: []nodeGroupConfig{ + {name: "ng1", maxNodes: 10, nodesCount: 10}, + {name: "ng2", maxNodes: 100, nodesCount: 100}, + }, + currentNodeGroup: nodeGroupConfig{name: "main-ng", maxNodes: 5, nodesCount: 5}, + wantThreshold: -1, + }, + { + name: "threshold is negative if all groups are over-provisioned", + nodeGroupsConfig: []nodeGroupConfig{ + {name: "ng1", maxNodes: 10, nodesCount: 11}, + {name: "ng2", maxNodes: 100, nodesCount: 111}, + {name: "ng3", maxNodes: 0, nodesCount: 5}, + }, + currentNodeGroup: nodeGroupConfig{name: "main-ng", maxNodes: 5, nodesCount: 10}, + wantThreshold: -1, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + provider := testprovider.NewTestCloudProvider(func(string, int) error { return nil }, nil) + for _, ng := range 
tt.nodeGroupsConfig { + provider.AddNodeGroup(ng.name, 0, ng.maxNodes, ng.nodesCount) + } + // Context must be constructed first to exclude current node group passed from orchestrator + context := estimationContext{similarNodeGroups: provider.NodeGroups()} + provider.AddNodeGroup(tt.currentNodeGroup.name, 0, tt.currentNodeGroup.maxNodes, tt.currentNodeGroup.nodesCount) + currentNodeGroup := provider.GetNodeGroup(tt.currentNodeGroup.name) + assert.Equalf(t, tt.wantThreshold, NewSngCapacityThreshold().NodeLimit(currentNodeGroup, &context), "NewSngCapacityThreshold()") + assert.True(t, NewSngCapacityThreshold().DurationLimit(currentNodeGroup, &context) == 0) + }) + } +} diff --git a/cluster-autoscaler/estimator/static_threshold.go b/cluster-autoscaler/estimator/static_threshold.go new file mode 100644 index 000000000000..8aaf93c4b5d0 --- /dev/null +++ b/cluster-autoscaler/estimator/static_threshold.go @@ -0,0 +1,45 @@ +/* +Copyright 2023 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package estimator + +import ( + "time" + + "k8s.io/autoscaler/cluster-autoscaler/cloudprovider" +) + +type staticThreshold struct { + maxNodes int + maxDuration time.Duration +} + +func (l *staticThreshold) NodeLimit(cloudprovider.NodeGroup, EstimationContext) int { + return l.maxNodes +} + +func (l *staticThreshold) DurationLimit(cloudprovider.NodeGroup, EstimationContext) time.Duration { + return l.maxDuration +} + +// NewStaticThreshold returns a Threshold that should be used to limit +// result and duration of binpacking by given static values +func NewStaticThreshold(maxNodes int, maxDuration time.Duration) Threshold { + return &staticThreshold{ + maxNodes: maxNodes, + maxDuration: maxDuration, + } +} diff --git a/cluster-autoscaler/estimator/threshold.go b/cluster-autoscaler/estimator/threshold.go new file mode 100644 index 000000000000..6246360009b3 --- /dev/null +++ b/cluster-autoscaler/estimator/threshold.go @@ -0,0 +1,30 @@ +/* +Copyright 2023 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package estimator + +import ( + "time" + + "k8s.io/autoscaler/cluster-autoscaler/cloudprovider" +) + +// Threshold provides resources configuration for threshold based estimation limiter. +// Return value of 0 means that no limit is set. 
+type Threshold interface { + NodeLimit(cloudprovider.NodeGroup, EstimationContext) int + DurationLimit(cloudprovider.NodeGroup, EstimationContext) time.Duration +} diff --git a/cluster-autoscaler/estimator/threshold_based_limiter.go b/cluster-autoscaler/estimator/threshold_based_limiter.go index 295ded1c5c4d..57b4e51d5d28 100644 --- a/cluster-autoscaler/estimator/threshold_based_limiter.go +++ b/cluster-autoscaler/estimator/threshold_based_limiter.go @@ -29,22 +29,39 @@ type thresholdBasedEstimationLimiter struct { maxNodes int nodes int start time.Time + thresholds []Threshold } -func (tbel *thresholdBasedEstimationLimiter) StartEstimation([]*apiv1.Pod, cloudprovider.NodeGroup) { +func (tbel *thresholdBasedEstimationLimiter) StartEstimation(_ []*apiv1.Pod, nodeGroup cloudprovider.NodeGroup, context EstimationContext) { tbel.start = time.Now() tbel.nodes = 0 + tbel.maxNodes = 0 + tbel.maxDuration = time.Duration(0) + for _, threshold := range tbel.thresholds { + tbel.maxNodes = getMinLimit(tbel.maxNodes, threshold.NodeLimit(nodeGroup, context)) + tbel.maxDuration = getMinLimit(tbel.maxDuration, threshold.DurationLimit(nodeGroup, context)) + } +} + +func getMinLimit[V int | time.Duration](baseLimit V, targetLimit V) V { + if baseLimit < 0 || targetLimit < 0 { + return -1 + } + if (baseLimit == 0 || baseLimit > targetLimit) && targetLimit > 0 { + return targetLimit + } + return baseLimit } func (*thresholdBasedEstimationLimiter) EndEstimation() {} func (tbel *thresholdBasedEstimationLimiter) PermissionToAddNode() bool { - if tbel.maxNodes > 0 && tbel.nodes >= tbel.maxNodes { + if tbel.maxNodes < 0 || (tbel.maxNodes > 0 && tbel.nodes >= tbel.maxNodes) { klog.V(4).Infof("Capping binpacking after exceeding threshold of %d nodes", tbel.maxNodes) return false } timeDefined := tbel.maxDuration > 0 && tbel.start != time.Time{} - if timeDefined && time.Now().After(tbel.start.Add(tbel.maxDuration)) { + if tbel.maxDuration < 0 || (timeDefined && 
time.Now().After(tbel.start.Add(tbel.maxDuration))) { klog.V(4).Infof("Capping binpacking after exceeding max duration of %v", tbel.maxDuration) return false } @@ -53,12 +70,13 @@ func (tbel *thresholdBasedEstimationLimiter) PermissionToAddNode() bool { } // NewThresholdBasedEstimationLimiter returns an EstimationLimiter that will prevent estimation -// after either a node count- of time-based threshold is reached. This is meant to prevent cases +// after either a node count- or time-based threshold is reached. This is meant to prevent cases // where binpacking of hundreds or thousands of nodes takes extremely long time rendering CA // incredibly slow or even completely crashing it. -func NewThresholdBasedEstimationLimiter(maxNodes int, maxDuration time.Duration) EstimationLimiter { - return &thresholdBasedEstimationLimiter{ - maxNodes: maxNodes, - maxDuration: maxDuration, - } +// Thresholds may return: +// - negative value: no new nodes are allowed to be added if at least one threshold returns negative limit +// - 0: no limit, thresholds with no limits will be ignored in favor of thresholds with positive or negative limits +// - positive value: new nodes can be added and this value represents the limit +func NewThresholdBasedEstimationLimiter(thresholds []Threshold) EstimationLimiter { + return &thresholdBasedEstimationLimiter{thresholds: thresholds} } diff --git a/cluster-autoscaler/estimator/threshold_based_limiter_test.go b/cluster-autoscaler/estimator/threshold_based_limiter_test.go index e80b586f3ebb..ebcfc0c1ddf6 100644 --- a/cluster-autoscaler/estimator/threshold_based_limiter_test.go +++ b/cluster-autoscaler/estimator/threshold_based_limiter_test.go @@ -20,9 +20,9 @@ import ( "testing" "time" - apiv1 "k8s.io/api/core/v1" - "github.com/stretchr/testify/assert" + apiv1 "k8s.io/api/core/v1" + "k8s.io/autoscaler/cluster-autoscaler/cloudprovider" ) type limiterOperation func(*testing.T, EstimationLimiter) @@ -35,9 +35,22 @@ func expectAllow(t *testing.T, l 
EstimationLimiter) { assert.Equal(t, true, l.PermissionToAddNode()) } -func resetLimiter(t *testing.T, l EstimationLimiter) { +func resetLimiter(_ *testing.T, l EstimationLimiter) { l.EndEstimation() - l.StartEstimation([]*apiv1.Pod{}, nil) + l.StartEstimation([]*apiv1.Pod{}, nil, nil) +} + +type dynamicThreshold struct { + nodeLimit int +} + +func (d *dynamicThreshold) DurationLimit(cloudprovider.NodeGroup, EstimationContext) time.Duration { + return 0 +} + +func (d *dynamicThreshold) NodeLimit(cloudprovider.NodeGroup, EstimationContext) int { + d.nodeLimit += 1 + return d.nodeLimit } func TestThresholdBasedLimiter(t *testing.T) { @@ -48,6 +61,7 @@ func TestThresholdBasedLimiter(t *testing.T) { startDelta time.Duration operations []limiterOperation expectNodeCount int + thresholds []Threshold }{ { name: "no limiting happens", @@ -60,19 +74,17 @@ func TestThresholdBasedLimiter(t *testing.T) { expectNodeCount: 3, }, { - name: "time based trigger fires", - maxNodes: 20, - maxDuration: 5 * time.Second, - startDelta: -10 * time.Second, + name: "time based trigger fires", + startDelta: -10 * time.Second, operations: []limiterOperation{ expectDeny, expectDeny, }, expectNodeCount: 0, + thresholds: []Threshold{NewStaticThreshold(20, 5*time.Second)}, }, { - name: "sequence of additions works until the threshold is hit", - maxNodes: 3, + name: "sequence of additions works until the threshold is hit", operations: []limiterOperation{ expectAllow, expectAllow, @@ -80,10 +92,32 @@ func TestThresholdBasedLimiter(t *testing.T) { expectDeny, }, expectNodeCount: 3, + thresholds: []Threshold{NewStaticThreshold(3, 0)}, + }, + { + name: "binpacking is stopped if at least one threshold has negative max nodes limit", + operations: []limiterOperation{ + expectDeny, + }, + expectNodeCount: 0, + thresholds: []Threshold{ + NewStaticThreshold(-1, 0), + NewStaticThreshold(10, 0), + }, + }, + { + name: "binpacking is stopped if at least one threshold has negative max duration limit", + 
operations: []limiterOperation{ + expectDeny, + }, + expectNodeCount: 0, + thresholds: []Threshold{ + NewStaticThreshold(100, -1), + NewStaticThreshold(10, 60*time.Minute), + }, }, { - name: "node counter is reset", - maxNodes: 2, + name: "node counter is reset", operations: []limiterOperation{ expectAllow, expectAllow, @@ -92,12 +126,11 @@ func TestThresholdBasedLimiter(t *testing.T) { expectAllow, }, expectNodeCount: 1, + thresholds: []Threshold{NewStaticThreshold(2, 0)}, }, { - name: "timer is reset", - maxNodes: 20, - maxDuration: 5 * time.Second, - startDelta: -10 * time.Second, + name: "timer is reset", + startDelta: -10 * time.Second, operations: []limiterOperation{ expectDeny, resetLimiter, @@ -105,15 +138,42 @@ func TestThresholdBasedLimiter(t *testing.T) { expectAllow, }, expectNodeCount: 2, + thresholds: []Threshold{NewStaticThreshold(20, 5*time.Second)}, + }, + { + name: "handles dynamic limits", + maxNodes: 0, + operations: []limiterOperation{ + expectAllow, + expectAllow, + expectDeny, + resetLimiter, + expectAllow, + expectAllow, + expectAllow, + expectDeny, + }, + expectNodeCount: 3, + thresholds: []Threshold{&dynamicThreshold{1}}, + }, + { + name: "duration limit is set to runtime limit", + operations: []limiterOperation{ + expectDeny, + expectDeny, + resetLimiter, // resets startDelta + expectAllow, + expectAllow, + }, + expectNodeCount: 2, + startDelta: -120 * time.Second, + thresholds: []Threshold{NewStaticThreshold(2, 60*time.Second)}, }, } for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { - limiter := &thresholdBasedEstimationLimiter{ - maxNodes: tc.maxNodes, - maxDuration: tc.maxDuration, - } - limiter.StartEstimation([]*apiv1.Pod{}, nil) + limiter := NewThresholdBasedEstimationLimiter(tc.thresholds).(*thresholdBasedEstimationLimiter) + limiter.StartEstimation([]*apiv1.Pod{}, nil, nil) if tc.startDelta != time.Duration(0) { limiter.start = limiter.start.Add(tc.startDelta) @@ -122,8 +182,43 @@ func TestThresholdBasedLimiter(t 
*testing.T) { for _, op := range tc.operations { op(t, limiter) } - assert.Equal(t, tc.expectNodeCount, limiter.nodes) + assert.Equalf(t, tc.expectNodeCount, limiter.nodes, "Number of allowed nodes does not match expectation") limiter.EndEstimation() }) } } + +func TestMinLimit(t *testing.T) { + type testCase[V interface{ int | time.Duration }] struct { + name string + baseLimit V + targetLimit V + want V + } + tests := []testCase[int]{ + {name: "At least one negative", baseLimit: -10, targetLimit: 10, want: -1}, + {name: "Negative and not set", baseLimit: -10, targetLimit: 0, want: -1}, + {name: "Both negative", baseLimit: -10, targetLimit: -10, want: -1}, + {name: "Both not set", baseLimit: 0, targetLimit: 0, want: 0}, + {name: "At least one not set", baseLimit: 0, targetLimit: 10, want: 10}, + {name: "Both set", baseLimit: 5, targetLimit: 10, want: 5}, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + assert.Equalf(t, tt.want, getMinLimit(tt.baseLimit, tt.targetLimit), "getMinLimit(%v, %v)", tt.baseLimit, tt.targetLimit) + }) + } + testsTime := []testCase[time.Duration]{ + {name: "At least one negative duration", baseLimit: time.Now().Sub(time.Now().Add(5 * time.Minute)), targetLimit: time.Duration(10), want: -1}, + {name: "Negative and not set durations", baseLimit: time.Now().Sub(time.Now().Add(5 * time.Minute)), targetLimit: time.Duration(0), want: -1}, + {name: "Both negative durations", baseLimit: time.Now().Sub(time.Now().Add(5 * time.Minute)), targetLimit: time.Duration(-10), want: -1}, + {name: "Both not set durations", baseLimit: time.Duration(0), targetLimit: time.Duration(0)}, + {name: "At least one not set duration", baseLimit: time.Duration(0), targetLimit: time.Duration(10), want: 10}, + {name: "Both set durations", baseLimit: time.Duration(5), targetLimit: time.Duration(10), want: 5}, + } + for _, tt := range testsTime { + t.Run(tt.name, func(t *testing.T) { + assert.Equalf(t, tt.want, getMinLimit(tt.baseLimit, 
tt.targetLimit), "getMinLimit(%v, %v)", tt.baseLimit, tt.targetLimit) + }) + } +}