Skip to content

Commit

Permalink
Merge pull request kubernetes#5917 from ystryuchkov/feature/runtime-l…
Browse files Browse the repository at this point in the history
…imits

Implement configurable thresholds for threshold based estimator
  • Loading branch information
k8s-ci-robot authored Jul 5, 2023
2 parents adb16c8 + b4213d8 commit 2779677
Show file tree
Hide file tree
Showing 15 changed files with 600 additions and 46 deletions.
11 changes: 10 additions & 1 deletion cluster-autoscaler/core/autoscaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,16 @@ func initializeDefaultOptions(opts *AutoscalerOptions) error {
opts.ExpanderStrategy = expanderStrategy
}
if opts.EstimatorBuilder == nil {
estimatorBuilder, err := estimator.NewEstimatorBuilder(opts.EstimatorName, estimator.NewThresholdBasedEstimationLimiter(opts.MaxNodesPerScaleUp, opts.MaxNodeGroupBinpackingDuration), estimator.NewDecreasingPodOrderer())
thresholds := []estimator.Threshold{
estimator.NewStaticThreshold(opts.MaxNodesPerScaleUp, opts.MaxNodeGroupBinpackingDuration),
estimator.NewSngCapacityThreshold(),
estimator.NewClusterCapacityThreshold(),
}
estimatorBuilder, err := estimator.NewEstimatorBuilder(
opts.EstimatorName,
estimator.NewThresholdBasedEstimationLimiter(thresholds),
estimator.NewDecreasingPodOrderer(),
)
if err != nil {
return err
}
Expand Down
16 changes: 11 additions & 5 deletions cluster-autoscaler/core/scaleup/orchestrator/orchestrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (

appsv1 "k8s.io/api/apps/v1"
apiv1 "k8s.io/api/core/v1"
"k8s.io/autoscaler/cluster-autoscaler/estimator"
"k8s.io/klog/v2"
schedulerframework "k8s.io/kubernetes/pkg/scheduler/framework"

Expand Down Expand Up @@ -149,7 +150,7 @@ func (o *ScaleUpOrchestrator) ScaleUp(
}

for _, nodeGroup := range validNodeGroups {
option := o.ComputeExpansionOption(nodeGroup, schedulablePods, nodeInfos, upcomingNodes, now)
option := o.ComputeExpansionOption(nodeGroup, schedulablePods, nodeInfos, len(nodes)+len(upcomingNodes), now)
o.processors.BinpackingLimiter.MarkProcessed(o.autoscalingContext, nodeGroup.Id())

if len(option.Pods) == 0 || option.NodeCount == 0 {
Expand Down Expand Up @@ -479,7 +480,7 @@ func (o *ScaleUpOrchestrator) ComputeExpansionOption(
nodeGroup cloudprovider.NodeGroup,
schedulablePods map[string][]*apiv1.Pod,
nodeInfos map[string]*schedulerframework.NodeInfo,
upcomingNodes []*schedulerframework.NodeInfo,
currentNodeCount int,
now time.Time,
) expander.Option {
option := expander.Option{NodeGroup: nodeGroup}
Expand All @@ -490,11 +491,16 @@ func (o *ScaleUpOrchestrator) ComputeExpansionOption(
return option
}

option.SimilarNodeGroups = o.ComputeSimilarNodeGroups(nodeGroup, nodeInfos, schedulablePods, now)

estimateStart := time.Now()
estimator := o.autoscalingContext.EstimatorBuilder(o.autoscalingContext.PredicateChecker, o.autoscalingContext.ClusterSnapshot)
option.NodeCount, option.Pods = estimator.Estimate(pods, nodeInfo, nodeGroup)
expansionEstimator := o.autoscalingContext.EstimatorBuilder(
o.autoscalingContext.PredicateChecker,
o.autoscalingContext.ClusterSnapshot,
estimator.NewEstimationContext(o.autoscalingContext.MaxNodesTotal, option.SimilarNodeGroups, currentNodeCount),
)
option.NodeCount, option.Pods = expansionEstimator.Estimate(pods, nodeInfo, nodeGroup)
metrics.UpdateDurationFromStart(metrics.Estimate, estimateStart)
option.SimilarNodeGroups = o.ComputeSimilarNodeGroups(nodeGroup, nodeInfos, schedulablePods, now)

autoscalingOptions, err := nodeGroup.GetOptions(o.autoscalingContext.NodeGroupDefaults)
if err != nil {
Expand Down
6 changes: 5 additions & 1 deletion cluster-autoscaler/core/test/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,11 @@ func NewScaleTestAutoscalingContext(
}
// Ignoring error here is safe - if a test doesn't specify valid estimatorName,
// it either doesn't need one, or should fail when it turns out to be nil.
estimatorBuilder, _ := estimator.NewEstimatorBuilder(options.EstimatorName, estimator.NewThresholdBasedEstimationLimiter(0, 0), estimator.NewDecreasingPodOrderer())
estimatorBuilder, _ := estimator.NewEstimatorBuilder(
options.EstimatorName,
estimator.NewThresholdBasedEstimationLimiter(nil),
estimator.NewDecreasingPodOrderer(),
)
predicateChecker, err := predicatechecker.NewTestPredicateChecker()
if err != nil {
return context.AutoscalingContext{}, err
Expand Down
8 changes: 6 additions & 2 deletions cluster-autoscaler/estimator/binpacking_estimator.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,19 +34,23 @@ type BinpackingNodeEstimator struct {
clusterSnapshot clustersnapshot.ClusterSnapshot
limiter EstimationLimiter
podOrderer EstimationPodOrderer
context EstimationContext
}

// NewBinpackingNodeEstimator builds a new BinpackingNodeEstimator.
func NewBinpackingNodeEstimator(
predicateChecker predicatechecker.PredicateChecker,
clusterSnapshot clustersnapshot.ClusterSnapshot,
limiter EstimationLimiter,
podOrderer EstimationPodOrderer) *BinpackingNodeEstimator {
podOrderer EstimationPodOrderer,
context EstimationContext,
) *BinpackingNodeEstimator {
return &BinpackingNodeEstimator{
predicateChecker: predicateChecker,
clusterSnapshot: clusterSnapshot,
limiter: limiter,
podOrderer: podOrderer,
context: context,
}
}

Expand All @@ -65,7 +69,7 @@ func (e *BinpackingNodeEstimator) Estimate(
nodeTemplate *schedulerframework.NodeInfo,
nodeGroup cloudprovider.NodeGroup) (int, []*apiv1.Pod) {

e.limiter.StartEstimation(pods, nodeGroup)
e.limiter.StartEstimation(pods, nodeGroup, e.context)
defer e.limiter.EndEstimation()

pods = e.podOrderer.Order(pods, nodeTemplate, nodeGroup)
Expand Down
4 changes: 2 additions & 2 deletions cluster-autoscaler/estimator/binpacking_estimator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,9 +185,9 @@ func TestBinpackingEstimate(t *testing.T) {

predicateChecker, err := predicatechecker.NewTestPredicateChecker()
assert.NoError(t, err)
limiter := NewThresholdBasedEstimationLimiter(tc.maxNodes, time.Duration(0))
limiter := NewThresholdBasedEstimationLimiter([]Threshold{NewStaticThreshold(tc.maxNodes, time.Duration(0))})
processor := NewDecreasingPodOrderer()
estimator := NewBinpackingNodeEstimator(predicateChecker, clusterSnapshot, limiter, processor)
estimator := NewBinpackingNodeEstimator(predicateChecker, clusterSnapshot, limiter, processor, nil)

node := makeNode(tc.millicores, tc.memory, "template", "zone-mars")
nodeInfo := schedulerframework.NewNodeInfo()
Expand Down
52 changes: 52 additions & 0 deletions cluster-autoscaler/estimator/cluster_capacity_threshold.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
Copyright 2023 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package estimator

import (
"time"

"k8s.io/autoscaler/cluster-autoscaler/cloudprovider"
)

// clusterCapacityThreshold is the Threshold implementation returned by
// NewClusterCapacityThreshold. It holds no state of its own: its node limit is
// computed from the cluster-wide node limit and the current node count exposed
// by the EstimationContext passed to NodeLimit.
type clusterCapacityThreshold struct {
}

// NodeLimit reports how many more nodes the cluster can take before it reaches
// its cluster-wide node limit. The node group argument is ignored.
//
// The result is one of:
//   - 0 if no context is supplied or the cluster-wide node limit is 0; a result
//     of 0 means "no limit".
//   - -1 if the cluster-wide node limit is negative, or the current node count
//     has already reached or exceeded it (no capacity left).
//   - otherwise the positive difference between the cluster-wide node limit and
//     the current node count.
func (l *clusterCapacityThreshold) NodeLimit(_ cloudprovider.NodeGroup, context EstimationContext) int {
	if context == nil {
		return 0
	}

	maxNodes := context.ClusterMaxNodeLimit()
	switch {
	case maxNodes == 0:
		// A zero cluster-wide limit is treated as "unlimited".
		return 0
	case maxNodes < 0:
		return -1
	}

	currentNodes := context.CurrentNodeCount()
	if maxNodes <= currentNodes {
		// Cluster is already at (or over) its node limit.
		return -1
	}
	return maxNodes - currentNodes
}

// DurationLimit always returns 0 for this threshold, meaning that no limit is set.
// Both the node group and the estimation context arguments are ignored.
func (l *clusterCapacityThreshold) DurationLimit(cloudprovider.NodeGroup, EstimationContext) time.Duration {
return 0
}

// NewClusterCapacityThreshold returns a Threshold that can be used to limit binpacking
// by available cluster capacity. The returned value is a pointer to a newly
// allocated clusterCapacityThreshold, which has no fields to configure.
func NewClusterCapacityThreshold() Threshold {
return &clusterCapacityThreshold{}
}
68 changes: 68 additions & 0 deletions cluster-autoscaler/estimator/cluster_capacity_threshold_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/*
Copyright 2023 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package estimator

import (
"testing"

"github.com/stretchr/testify/assert"
)

// TestNewClusterCapacityThreshold checks the node limit reported by the cluster
// capacity threshold for several combinations of cluster-wide node limit and
// current node count, including the exact-capacity boundary and a missing
// context, and checks that the threshold never sets a duration limit.
func TestNewClusterCapacityThreshold(t *testing.T) {
	tests := []struct {
		name                string
		wantThreshold       int
		contextMaxNodes     int
		contextCurrentNodes int
	}{
		{
			name:                "returns available capacity",
			contextMaxNodes:     10,
			contextCurrentNodes: 5,
			wantThreshold:       5,
		},
		{
			name:                "no threshold is set if cluster capacity is unlimited",
			contextMaxNodes:     0,
			contextCurrentNodes: 10,
			wantThreshold:       0,
		},
		{
			name:                "threshold is negative if cluster has no capacity",
			contextMaxNodes:     5,
			contextCurrentNodes: 10,
			wantThreshold:       -1,
		},
		{
			// Boundary of the `<=` comparison: a full cluster has no room left.
			name:                "threshold is negative if cluster is exactly at capacity",
			contextMaxNodes:     10,
			contextCurrentNodes: 10,
			wantThreshold:       -1,
		},
		{
			name:                "threshold is negative if cluster node limit is negative",
			contextMaxNodes:     -5,
			contextCurrentNodes: 0,
			wantThreshold:       -1,
		},
	}
	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			context := &estimationContext{
				similarNodeGroups:   nil,
				currentNodeCount:    tt.contextCurrentNodes,
				clusterMaxNodeLimit: tt.contextMaxNodes,
			}
			threshold := NewClusterCapacityThreshold()
			assert.Equal(t, tt.wantThreshold, threshold.NodeLimit(nil, context))
			// assert.Zero reports the actual value on failure, unlike
			// assert.True on a comparison.
			assert.Zero(t, threshold.DurationLimit(nil, nil))
		})
	}

	// A missing context is documented to mean "no limit".
	t.Run("no threshold is set if context is nil", func(t *testing.T) {
		assert.Equal(t, 0, NewClusterCapacityThreshold().NodeLimit(nil, nil))
	})
}
59 changes: 59 additions & 0 deletions cluster-autoscaler/estimator/estimation_context.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
Copyright 2023 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package estimator

import (
"k8s.io/autoscaler/cluster-autoscaler/cloudprovider"
)

// EstimationContext stores static and runtime state of autoscaling, used by Estimator
type EstimationContext interface {
// SimilarNodeGroups returns the similar node groups held by the context.
SimilarNodeGroups() []cloudprovider.NodeGroup
// ClusterMaxNodeLimit returns the maximum number of nodes allowed for the cluster.
ClusterMaxNodeLimit() int
// CurrentNodeCount returns the current number of nodes in the cluster.
CurrentNodeCount() int
}

// estimationContext is the EstimationContext implementation returned by
// NewEstimationContext. It stores the values it was constructed with and
// exposes each of them through the accessor of the same name.
type estimationContext struct {
// similarNodeGroups is returned by SimilarNodeGroups.
similarNodeGroups []cloudprovider.NodeGroup
// currentNodeCount is returned by CurrentNodeCount.
currentNodeCount int
// clusterMaxNodeLimit is returned by ClusterMaxNodeLimit.
clusterMaxNodeLimit int
}

// NewEstimationContext builds an EstimationContext from the given runtime
// properties: the cluster-wide maximum node limit, the similar node groups and
// the current node count. The values are stored as passed (the slice is not
// copied) and are returned unchanged by the corresponding accessors.
func NewEstimationContext(clusterMaxNodeLimit int, similarNodeGroups []cloudprovider.NodeGroup, currentNodeCount int) EstimationContext {
	ctx := new(estimationContext)
	ctx.clusterMaxNodeLimit = clusterMaxNodeLimit
	ctx.similarNodeGroups = similarNodeGroups
	ctx.currentNodeCount = currentNodeCount
	return ctx
}

// SimilarNodeGroups returns the slice of similar node groups stored in the
// context. The stored slice itself is returned, not a copy.
func (c *estimationContext) SimilarNodeGroups() []cloudprovider.NodeGroup {
return c.similarNodeGroups
}

// ClusterMaxNodeLimit returns the maximum number of nodes allowed for the
// cluster, exactly as it was supplied when the context was created.
func (c *estimationContext) ClusterMaxNodeLimit() int {
return c.clusterMaxNodeLimit
}

// CurrentNodeCount returns the current number of nodes in the cluster, exactly
// as it was supplied when the context was created.
func (c *estimationContext) CurrentNodeCount() int {
return c.currentNodeCount
}
9 changes: 5 additions & 4 deletions cluster-autoscaler/estimator/estimator.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,16 +43,17 @@ type Estimator interface {
}

// EstimatorBuilder creates a new estimator object.
type EstimatorBuilder func(predicatechecker.PredicateChecker, clustersnapshot.ClusterSnapshot) Estimator
type EstimatorBuilder func(predicatechecker.PredicateChecker, clustersnapshot.ClusterSnapshot, EstimationContext) Estimator

// NewEstimatorBuilder creates a new estimator object from flag.
func NewEstimatorBuilder(name string, limiter EstimationLimiter, orderer EstimationPodOrderer) (EstimatorBuilder, error) {
switch name {
case BinpackingEstimatorName:
return func(
predicateChecker predicatechecker.PredicateChecker,
clusterSnapshot clustersnapshot.ClusterSnapshot) Estimator {
return NewBinpackingNodeEstimator(predicateChecker, clusterSnapshot, limiter, orderer)
clusterSnapshot clustersnapshot.ClusterSnapshot,
context EstimationContext) Estimator {
return NewBinpackingNodeEstimator(predicateChecker, clusterSnapshot, limiter, orderer, context)
}, nil
}
return nil, fmt.Errorf("unknown estimator: %s", name)
Expand All @@ -63,7 +64,7 @@ func NewEstimatorBuilder(name string, limiter EstimationLimiter, orderer Estimat
// scale-up is limited by external factors.
type EstimationLimiter interface {
// StartEstimation is called at the start of estimation.
StartEstimation([]*apiv1.Pod, cloudprovider.NodeGroup)
StartEstimation([]*apiv1.Pod, cloudprovider.NodeGroup, EstimationContext)
// EndEstimation is called at the end of estimation.
EndEstimation()
// PermissionToAddNode is called by an estimator when it wants to add additional
Expand Down
Loading

0 comments on commit 2779677

Please sign in to comment.