Allow specification of multiple expanders #4233

Merged · 1 commit · Sep 24, 2021
6 changes: 6 additions & 0 deletions cluster-autoscaler/FAQ.md
@@ -661,6 +661,12 @@ would match the cluster size. This expander is described in more details

* `priority` - selects the node group that has the highest priority assigned by the user. Its configuration is described in more detail [here](expander/priority/readme.md)


Multiple expanders may be passed, e.g.
`cluster-autoscaler --expander=priority,least-waste`

This will cause the `least-waste` expander to be used as a fallback in the event that the `priority` expander selects multiple node groups. In general, a list of expanders can be used: the output of one is passed to the next, with the final decision made by randomly selecting one of the remaining node groups. An expander must not appear in the list more than once.

### Does CA respect node affinity when selecting node groups to scale up?

CA respects `nodeSelector` and `requiredDuringSchedulingIgnoredDuringExecution` in nodeAffinity given that you have labelled your node groups accordingly. If there is a pod that cannot be scheduled with either `nodeSelector` or `requiredDuringSchedulingIgnoredDuringExecution` specified, CA will only consider node groups that satisfy those requirements for expansion.
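The chained-expander semantics the FAQ paragraph above describes can be sketched as a standalone program. The pool names, priorities, and waste figures below are invented for illustration; the real expanders operate on `expander.Option` values and the final fallback choice is random, not first-wins.

```go
package main

import (
	"fmt"
	"math"
)

// Illustrative data only: per-group priorities (as a priority expander
// would rank them) and a per-group "wasted resources" score.
var (
	prio  = map[string]int{"pool-a": 10, "pool-b": 10, "pool-c": 5}
	waste = map[string]float64{"pool-a": 0.4, "pool-b": 0.1, "pool-c": 0.2}
)

type filter func(groups []string) []string

// highestPriority keeps every group tied for the highest priority.
func highestPriority(groups []string) []string {
	best := math.MinInt
	var out []string
	for _, g := range groups {
		switch {
		case prio[g] > best:
			best, out = prio[g], []string{g}
		case prio[g] == best:
			out = append(out, g)
		}
	}
	return out
}

// leastWaste keeps every group tied for the lowest waste score.
func leastWaste(groups []string) []string {
	best := math.MaxFloat64
	var out []string
	for _, g := range groups {
		switch {
		case waste[g] < best:
			best, out = waste[g], []string{g}
		case waste[g] == best:
			out = append(out, g)
		}
	}
	return out
}

// chain mirrors the described behavior: feed each filter's output to the
// next, short-circuit once a single group remains, otherwise pick from
// the leftovers (randomly in the real code; first here, for determinism).
func chain(groups []string, filters ...filter) string {
	remaining := groups
	for _, f := range filters {
		remaining = f(remaining)
		if len(remaining) == 1 {
			return remaining[0]
		}
	}
	return remaining[0]
}

func main() {
	// --expander=priority,least-waste: pool-a and pool-b tie on
	// priority, so least-waste breaks the tie.
	fmt.Println(chain([]string{"pool-a", "pool-b", "pool-c"}, highestPriority, leastWaste))
}
```

Here `priority` narrows three pools to two, and `least-waste` settles the tie, which is exactly the fallback behavior the flag combination is meant to provide.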
4 changes: 2 additions & 2 deletions cluster-autoscaler/config/autoscaling_options.go
@@ -69,8 +69,8 @@ type AutoscalingOptions struct {
NodeGroupAutoDiscovery []string
// EstimatorName is the estimator used to estimate the number of needed nodes in scale up.
EstimatorName string
// ExpanderName sets the type of node group expander to be used in scale up
ExpanderName string
// ExpanderNames sets the chain of node group expanders to be used in scale up
ExpanderNames string
// IgnoreDaemonSetsUtilization is whether CA will ignore DaemonSet pods when calculating resource utilization for scaling down
IgnoreDaemonSetsUtilization bool
// IgnoreMirrorPodsUtilization is whether CA will ignore Mirror pods when calculating resource utilization for scaling down
Expand Down
3 changes: 2 additions & 1 deletion cluster-autoscaler/core/autoscaler.go
@@ -17,6 +17,7 @@ limitations under the License.
package core

import (
"strings"
"time"

"k8s.io/autoscaler/cluster-autoscaler/cloudprovider"
@@ -101,7 +102,7 @@ func initializeDefaultOptions(opts *AutoscalerOptions) error {
opts.CloudProvider = cloudBuilder.NewCloudProvider(opts.AutoscalingOptions)
}
if opts.ExpanderStrategy == nil {
expanderStrategy, err := factory.ExpanderStrategyFromString(opts.ExpanderName,
expanderStrategy, err := factory.ExpanderStrategyFromStrings(strings.Split(opts.ExpanderNames, ","),
opts.CloudProvider, opts.AutoscalingKubeClients, opts.KubeClient, opts.ConfigNamespace)
if err != nil {
return err
5 changes: 5 additions & 0 deletions cluster-autoscaler/expander/expander.go
@@ -50,3 +50,8 @@ type Option struct {
type Strategy interface {
BestOption(options []Option, nodeInfo map[string]*schedulerframework.NodeInfo) *Option
}

// Filter describes an interface for filtering down to equally good options according to some criteria
type Filter interface {
BestOptions(options []Option, nodeInfo map[string]*schedulerframework.NodeInfo) []Option
}
46 changes: 46 additions & 0 deletions cluster-autoscaler/expander/factory/chain.go
@@ -0,0 +1,46 @@
/*
Copyright 2021 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package factory

import (
"k8s.io/autoscaler/cluster-autoscaler/expander"

schedulerframework "k8s.io/kubernetes/pkg/scheduler/framework"
)

type chainStrategy struct {
filters []expander.Filter
fallback expander.Strategy
}

func newChainStrategy(filters []expander.Filter, fallback expander.Strategy) expander.Strategy {
return &chainStrategy{
filters: filters,
fallback: fallback,
}
}

func (c *chainStrategy) BestOption(options []expander.Option, nodeInfo map[string]*schedulerframework.NodeInfo) *expander.Option {
	filteredOptions := options
	for _, filter := range c.filters {
		filteredOptions = filter.BestOptions(filteredOptions, nodeInfo)
		if len(filteredOptions) == 1 {
			return &filteredOptions[0]
		}
	}
	return c.fallback.BestOption(filteredOptions, nodeInfo)
}

Review thread (on the BestOptions call):

Member: There is sometimes non-trivial logic behind a BestOptions call, so maybe worth returning earlier if there's just one option left?

Contributor (author): +1, will do

Contributor (author): done
133 changes: 133 additions & 0 deletions cluster-autoscaler/expander/factory/chain_test.go
@@ -0,0 +1,133 @@
/*
Copyright 2021 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package factory

import (
"k8s.io/autoscaler/cluster-autoscaler/expander"
"strings"
"testing"

"github.com/stretchr/testify/assert"
schedulerframework "k8s.io/kubernetes/pkg/scheduler/framework"
)

type substringTestFilterStrategy struct {
substring string
}

func newSubstringTestFilterStrategy(substring string) *substringTestFilterStrategy {
return &substringTestFilterStrategy{
substring: substring,
}
}

func (s *substringTestFilterStrategy) BestOptions(expansionOptions []expander.Option, nodeInfo map[string]*schedulerframework.NodeInfo) []expander.Option {
var ret []expander.Option
for _, option := range expansionOptions {
if strings.Contains(option.Debug, s.substring) {
ret = append(ret, option)
}
}
	return ret
}

func (s *substringTestFilterStrategy) BestOption(expansionOptions []expander.Option, nodeInfo map[string]*schedulerframework.NodeInfo) *expander.Option {
ret := s.BestOptions(expansionOptions, nodeInfo)
if len(ret) == 0 {
return nil
}
return &ret[0]
}

func TestChainStrategy_BestOption(t *testing.T) {
for name, tc := range map[string]struct {
filters []expander.Filter
fallback expander.Strategy
options []expander.Option
expected *expander.Option
}{
"selects with no filters": {
filters: []expander.Filter{},
fallback: newSubstringTestFilterStrategy("a"),
options: []expander.Option{
*newOption("b"),
*newOption("a"),
},
expected: newOption("a"),
},
"filters with one filter": {
filters: []expander.Filter{
newSubstringTestFilterStrategy("a"),
},
fallback: newSubstringTestFilterStrategy("b"),
options: []expander.Option{
*newOption("ab"),
*newOption("b"),
},
expected: newOption("ab"),
},
"filters with multiple filters": {
filters: []expander.Filter{
newSubstringTestFilterStrategy("a"),
newSubstringTestFilterStrategy("b"),
},
fallback: newSubstringTestFilterStrategy("x"),
options: []expander.Option{
*newOption("xab"),
*newOption("xa"),
*newOption("x"),
},
expected: newOption("xab"),
},
"selects from multiple after filters": {
filters: []expander.Filter{
newSubstringTestFilterStrategy("x"),
},
fallback: newSubstringTestFilterStrategy("a"),
options: []expander.Option{
*newOption("xc"),
*newOption("xaa"),
*newOption("xab"),
},
expected: newOption("xaa"),
},
"short circuits": {
filters: []expander.Filter{
newSubstringTestFilterStrategy("a"),
newSubstringTestFilterStrategy("b"),
},
fallback: newSubstringTestFilterStrategy("x"),
options: []expander.Option{
*newOption("a"),
},
expected: newOption("a"),
},
} {
t.Run(name, func(t *testing.T) {
subject := newChainStrategy(tc.filters, tc.fallback)
actual := subject.BestOption(tc.options, nil)
assert.Equal(t, tc.expected, actual)
})
}
}

func newOption(debug string) *expander.Option {
return &expander.Option{
Debug: debug,
}
}
62 changes: 40 additions & 22 deletions cluster-autoscaler/expander/factory/expander_factory.go
@@ -31,30 +31,48 @@ import (
kube_client "k8s.io/client-go/kubernetes"
)

// ExpanderStrategyFromString creates an expander.Strategy according to its name
func ExpanderStrategyFromString(expanderFlag string, cloudProvider cloudprovider.CloudProvider,
// ExpanderStrategyFromStrings creates an expander.Strategy according to the names of the expanders passed in
func ExpanderStrategyFromStrings(expanderFlags []string, cloudProvider cloudprovider.CloudProvider,
autoscalingKubeClients *context.AutoscalingKubeClients, kubeClient kube_client.Interface,
configNamespace string) (expander.Strategy, errors.AutoscalerError) {
switch expanderFlag {
case expander.RandomExpanderName:
return random.NewStrategy(), nil
case expander.MostPodsExpanderName:
return mostpods.NewStrategy(), nil
case expander.LeastWasteExpanderName:
return waste.NewStrategy(), nil
case expander.PriceBasedExpanderName:
if _, err := cloudProvider.Pricing(); err != nil {
return nil, err
var filters []expander.Filter
seenExpanders := map[string]struct{}{}
strategySeen := false
for i, expanderFlag := range expanderFlags {
if _, ok := seenExpanders[expanderFlag]; ok {
return nil, errors.NewAutoscalerError(errors.InternalError, "Expander %s was specified multiple times, each expander must not be specified more than once", expanderFlag)
}
if strategySeen {
return nil, errors.NewAutoscalerError(errors.InternalError, "Expander %s came after an expander %s that will always return only one result, this is not allowed since %s will never be used", expanderFlag, expanderFlags[i-1], expanderFlag)
}
seenExpanders[expanderFlag] = struct{}{}

switch expanderFlag {
case expander.RandomExpanderName:
filters = append(filters, random.NewFilter())
Review thread (on this line):

Member: Shouldn't it be an error to specify an expander that's guaranteed to provide a single option with a fallback? E.g. random,least-waste is not really a reasonable config.

Contributor (author): True that the configuration is not a reasonable one, but I felt that catching this case wasn't worth the effort, as it would require disambiguating between expanders that

  1. always return one result, and
  2. those that can return more than one.

My thinking is that since it won't create problems, we can keep the simplicity of allowing this. Happy to be overruled on this, though.

Member: Yup, something like this. I'm not insisting on this one, but since you're generating an error for one class of invalid configs (duplicated entries), I thought it may make sense to go a step further and disallow bad ordering as well.

Contributor (author): Ok, addressed in this PR now!

Member: Neat, thanks! Maybe worth adding a comment about this behavior? It may be surprising for anyone adding a new expander in the future. Please also squash commits before merging.
case expander.MostPodsExpanderName:
filters = append(filters, mostpods.NewFilter())
case expander.LeastWasteExpanderName:
filters = append(filters, waste.NewFilter())
case expander.PriceBasedExpanderName:
if _, err := cloudProvider.Pricing(); err != nil {
return nil, err
}
filters = append(filters, price.NewFilter(cloudProvider,
price.NewSimplePreferredNodeProvider(autoscalingKubeClients.AllNodeLister()),
price.SimpleNodeUnfitness))
case expander.PriorityBasedExpanderName:
// It seems other listers do the same here - they never receive the termination msg on the ch.
// This should be currently OK.
stopChannel := make(chan struct{})
lister := kubernetes.NewConfigMapListerForNamespace(kubeClient, stopChannel, configNamespace)
filters = append(filters, priority.NewFilter(lister.ConfigMaps(configNamespace), autoscalingKubeClients.Recorder))
default:
return nil, errors.NewAutoscalerError(errors.InternalError, "Expander %s not supported", expanderFlag)
}
if _, ok := filters[len(filters)-1].(expander.Strategy); ok {
strategySeen = true
}
return price.NewStrategy(cloudProvider,
price.NewSimplePreferredNodeProvider(autoscalingKubeClients.AllNodeLister()),
price.SimpleNodeUnfitness), nil
case expander.PriorityBasedExpanderName:
// It seems other listers do the same here - they never receive the termination msg on the ch.
// This should be currently OK.
stopChannel := make(chan struct{})
lister := kubernetes.NewConfigMapListerForNamespace(kubeClient, stopChannel, configNamespace)
return priority.NewStrategy(lister.ConfigMaps(configNamespace), autoscalingKubeClients.Recorder)
}
return nil, errors.NewAutoscalerError(errors.InternalError, "Expander %s not supported", expanderFlag)
return newChainStrategy(filters, random.NewStrategy()), nil
}
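The two configuration errors introduced above (a duplicated expander, and an expander placed after one that always yields a single result) can be sketched in isolation. `alwaysOne` below is a hypothetical stand-in for the `expander.Strategy` type assertion the real code uses, and its contents are illustrative:

```go
package main

import "fmt"

// alwaysOne marks expanders assumed to only ever return a single result
// (illustrative data; the real code detects this with a type assertion
// against expander.Strategy on the filter just appended).
var alwaysOne = map[string]bool{"random": true}

// validateExpanders mirrors the two checks in ExpanderStrategyFromStrings.
func validateExpanders(names []string) error {
	seen := map[string]bool{}
	for i, n := range names {
		if seen[n] {
			return fmt.Errorf("expander %s was specified multiple times", n)
		}
		if i > 0 && alwaysOne[names[i-1]] {
			return fmt.Errorf("expander %s came after %s, which always returns one result", n, names[i-1])
		}
		seen[n] = true
	}
	return nil
}

func main() {
	fmt.Println(validateExpanders([]string{"priority", "least-waste"})) // ok
	fmt.Println(validateExpanders([]string{"priority", "priority"}))    // duplicate
	fmt.Println(validateExpanders([]string{"random", "least-waste"}))   // bad ordering
}
```

The ordering check only needs to look one element back: once a single-result expander appears, nothing after it could ever run.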
12 changes: 5 additions & 7 deletions cluster-autoscaler/expander/mostpods/mostpods.go
@@ -18,21 +18,19 @@ package mostpods

import (
"k8s.io/autoscaler/cluster-autoscaler/expander"
"k8s.io/autoscaler/cluster-autoscaler/expander/random"
schedulerframework "k8s.io/kubernetes/pkg/scheduler/framework"
)

type mostpods struct {
fallbackStrategy expander.Strategy
}

// NewStrategy returns a scale up strategy (expander) that picks the node group that can schedule the most pods
func NewStrategy() expander.Strategy {
return &mostpods{random.NewStrategy()}
// NewFilter returns a scale up filter that picks the node group that can schedule the most pods
func NewFilter() expander.Filter {
return &mostpods{}
}

// BestOption Selects the expansion option that schedules the most pods
func (m *mostpods) BestOption(expansionOptions []expander.Option, nodeInfo map[string]*schedulerframework.NodeInfo) *expander.Option {
func (m *mostpods) BestOptions(expansionOptions []expander.Option, nodeInfo map[string]*schedulerframework.NodeInfo) []expander.Option {
var maxPods int
var maxOptions []expander.Option

@@ -51,5 +49,5 @@ func (m *mostpods) BestOption(expansionOptions []expander.Option, nodeInfo map[s
return nil
}

return m.fallbackStrategy.BestOption(maxOptions, nodeInfo)
return maxOptions
}
30 changes: 17 additions & 13 deletions cluster-autoscaler/expander/price/price.go
@@ -74,11 +74,11 @@
gpuUnfitnessOverride = 1000.0
)

// NewStrategy returns an expansion strategy that picks nodes based on price and preferred node type.
func NewStrategy(cloudProvider cloudprovider.CloudProvider,
// NewFilter returns an expansion filter that picks nodes based on price and preferred node type.
func NewFilter(cloudProvider cloudprovider.CloudProvider,
preferredNodeProvider PreferredNodeProvider,
nodeUnfitness NodeUnfitness,
) expander.Strategy {
) expander.Filter {
return &priceBased{
cloudProvider: cloudProvider,
preferredNodeProvider: preferredNodeProvider,
@@ -87,8 +87,8 @@ func NewStrategy(cloudProvider cloudprovider.CloudProvider,
}

// BestOption selects option based on cost and preferred node type.
func (p *priceBased) BestOption(expansionOptions []expander.Option, nodeInfos map[string]*schedulerframework.NodeInfo) *expander.Option {
var bestOption *expander.Option
func (p *priceBased) BestOptions(expansionOptions []expander.Option, nodeInfos map[string]*schedulerframework.NodeInfo) []expander.Option {
var bestOptions []expander.Option
bestOptionScore := 0.0
now := time.Now()
then := now.Add(time.Hour)
@@ -169,17 +169,21 @@ nextoption:

klog.V(5).Infof("Price expander for %s: %s", option.NodeGroup.Id(), debug)

if bestOption == nil || bestOptionScore > optionScore {
bestOption = &expander.Option{
NodeGroup: option.NodeGroup,
NodeCount: option.NodeCount,
Debug: fmt.Sprintf("%s | price-expander: %s", option.Debug, debug),
Pods: option.Pods,
}
maybeBestOption := expander.Option{
NodeGroup: option.NodeGroup,
NodeCount: option.NodeCount,
Debug: fmt.Sprintf("%s | price-expander: %s", option.Debug, debug),
Pods: option.Pods,
}
if len(bestOptions) == 0 || bestOptionScore == optionScore {
bestOptions = append(bestOptions, maybeBestOption)
bestOptionScore = optionScore
} else if bestOptionScore > optionScore {
bestOptions = []expander.Option{maybeBestOption}
bestOptionScore = optionScore
}
}
return bestOption
return bestOptions
}

// buildPod creates a pod with specified resources.