diff --git a/cluster-autoscaler/.gitignore b/cluster-autoscaler/.gitignore index 7db21091f768..7a3690aa6185 100644 --- a/cluster-autoscaler/.gitignore +++ b/cluster-autoscaler/.gitignore @@ -8,3 +8,4 @@ cluster_autoscaler *.un~ Session.vim .netrwhist +.vscode diff --git a/cluster-autoscaler/FAQ.md b/cluster-autoscaler/FAQ.md index 395d6443f00f..719117ca2f59 100644 --- a/cluster-autoscaler/FAQ.md +++ b/cluster-autoscaler/FAQ.md @@ -590,7 +590,7 @@ new nodes will be added. Expanders can be selected by passing the name to the `--expander` flag, i.e. `./cluster-autoscaler --expander=random`. -Currently Cluster Autoscaler has 4 expanders: +Currently Cluster Autoscaler has 5 expanders: * `random` - this is the default expander, and should be used when you don't have a particular need for the node groups to scale differently. @@ -607,12 +607,15 @@ after scale-up. This is useful when you have different classes of nodes, for exa would match the cluster size. This expander is described in more details [HERE](https://github.com/kubernetes/autoscaler/blob/master/cluster-autoscaler/proposals/pricing.md). Currently it works only for GCE and GKE (patches welcome.) +* `priority` - selects the node group that has the highest priority assigned by the user. It's configuration is described in more details [here](expander/priority/readme.md) + ### Does CA respect node affinity when selecting node groups to scale up? + CA respects `nodeSelector` and `requiredDuringSchedulingIgnoredDuringExecution` in nodeAffinity given that you have labelled your node groups accordingly. If there is a pod that cannot be scheduled with either `nodeSelector` or `requiredDuringSchedulingIgnoredDuringExecution` specified, CA will only consider node groups that satisfy those requirements for expansion. However, CA does not consider "soft" constraints like `preferredDuringSchedulingIgnoredDuringExecution` when selecting node groups. That means that if CA has two or more node groups available for expansion, it will not use soft constraints to pick one node group over another. -************ +**************** ### What are the parameters to CA? diff --git a/cluster-autoscaler/cloudprovider/alicloud/examples/cluster-autoscaler-standard.yaml b/cluster-autoscaler/cloudprovider/alicloud/examples/cluster-autoscaler-standard.yaml index b37c672b373e..013a06bc9b77 100644 --- a/cluster-autoscaler/cloudprovider/alicloud/examples/cluster-autoscaler-standard.yaml +++ b/cluster-autoscaler/cloudprovider/alicloud/examples/cluster-autoscaler-standard.yaml @@ -63,8 +63,8 @@ rules: verbs: ["create"] - apiGroups: [""] resources: ["configmaps"] - resourceNames: ["cluster-autoscaler-status"] - verbs: ["delete","get","update"] + resourceNames: ["cluster-autoscaler-status", "cluster-autoscaler-priority-expander"] + verbs: ["delete","get","update","watch"] --- apiVersion: rbac.authorization.k8s.io/v1beta1 diff --git a/cluster-autoscaler/cloudprovider/aws/examples/cluster-autoscaler-autodiscover.yaml b/cluster-autoscaler/cloudprovider/aws/examples/cluster-autoscaler-autodiscover.yaml index 194d7cb19c8f..87d07b22eee1 100644 --- a/cluster-autoscaler/cloudprovider/aws/examples/cluster-autoscaler-autodiscover.yaml +++ b/cluster-autoscaler/cloudprovider/aws/examples/cluster-autoscaler-autodiscover.yaml @@ -63,8 +63,8 @@ rules: verbs: ["create"] - apiGroups: [""] resources: ["configmaps"] - resourceNames: ["cluster-autoscaler-status"] - verbs: ["delete","get","update"] + resourceNames: ["cluster-autoscaler-status", "cluster-autoscaler-priority-expander"] + verbs: ["delete","get","update","watch"] --- apiVersion: rbac.authorization.k8s.io/v1 diff --git a/cluster-autoscaler/cloudprovider/aws/examples/cluster-autoscaler-multi-asg.yaml b/cluster-autoscaler/cloudprovider/aws/examples/cluster-autoscaler-multi-asg.yaml index 338607da019f..63b8d7df1cf7 100644 --- a/cluster-autoscaler/cloudprovider/aws/examples/cluster-autoscaler-multi-asg.yaml +++ b/cluster-autoscaler/cloudprovider/aws/examples/cluster-autoscaler-multi-asg.yaml @@ -63,8 +63,8 @@ rules: verbs: ["create"] - apiGroups: [""] resources: ["configmaps"] - resourceNames: ["cluster-autoscaler-status"] - verbs: ["delete","get","update"] + resourceNames: ["cluster-autoscaler-status", "cluster-autoscaler-priority-expander"] + verbs: ["delete","get","update","watch"] --- apiVersion: rbac.authorization.k8s.io/v1 diff --git a/cluster-autoscaler/cloudprovider/aws/examples/cluster-autoscaler-one-asg.yaml b/cluster-autoscaler/cloudprovider/aws/examples/cluster-autoscaler-one-asg.yaml index 27be4704f62f..f842055f8672 100644 --- a/cluster-autoscaler/cloudprovider/aws/examples/cluster-autoscaler-one-asg.yaml +++ b/cluster-autoscaler/cloudprovider/aws/examples/cluster-autoscaler-one-asg.yaml @@ -63,8 +63,8 @@ rules: verbs: ["create"] - apiGroups: [""] resources: ["configmaps"] - resourceNames: ["cluster-autoscaler-status"] - verbs: ["delete","get","update"] + resourceNames: ["cluster-autoscaler-status", "cluster-autoscaler-priority-expander"] + verbs: ["delete","get","update","watch"] --- apiVersion: rbac.authorization.k8s.io/v1 diff --git a/cluster-autoscaler/cloudprovider/aws/examples/cluster-autoscaler-run-on-master.yaml b/cluster-autoscaler/cloudprovider/aws/examples/cluster-autoscaler-run-on-master.yaml index ed02bf2c3735..e50dbbf73dc1 100644 --- a/cluster-autoscaler/cloudprovider/aws/examples/cluster-autoscaler-run-on-master.yaml +++ b/cluster-autoscaler/cloudprovider/aws/examples/cluster-autoscaler-run-on-master.yaml @@ -63,8 +63,8 @@ rules: verbs: ["create"] - apiGroups: [""] resources: ["configmaps"] - resourceNames: ["cluster-autoscaler-status"] - verbs: ["delete","get","update"] + resourceNames: ["cluster-autoscaler-status", "cluster-autoscaler-priority-expander"] + verbs: ["delete","get","update","watch"] --- apiVersion: rbac.authorization.k8s.io/v1 diff --git a/cluster-autoscaler/cloudprovider/azure/examples/cluster-autoscaler-containerservice.yaml b/cluster-autoscaler/cloudprovider/azure/examples/cluster-autoscaler-containerservice.yaml index 6e0b14deaf3b..edf6f25f2f46 100644 --- a/cluster-autoscaler/cloudprovider/azure/examples/cluster-autoscaler-containerservice.yaml +++ b/cluster-autoscaler/cloudprovider/azure/examples/cluster-autoscaler-containerservice.yaml @@ -66,8 +66,8 @@ rules: verbs: ["create"] - apiGroups: [""] resources: ["configmaps"] - resourceNames: ["cluster-autoscaler-status"] - verbs: ["delete","get","update"] + resourceNames: ["cluster-autoscaler-status", "cluster-autoscaler-priority-expander"] + verbs: ["delete","get","update","watch"] --- apiVersion: rbac.authorization.k8s.io/v1 diff --git a/cluster-autoscaler/cloudprovider/azure/examples/cluster-autoscaler-standard-master.yaml b/cluster-autoscaler/cloudprovider/azure/examples/cluster-autoscaler-standard-master.yaml index 44d97e718e38..78891610b68b 100644 --- a/cluster-autoscaler/cloudprovider/azure/examples/cluster-autoscaler-standard-master.yaml +++ b/cluster-autoscaler/cloudprovider/azure/examples/cluster-autoscaler-standard-master.yaml @@ -66,8 +66,8 @@ rules: verbs: ["create"] - apiGroups: [""] resources: ["configmaps"] - resourceNames: ["cluster-autoscaler-status"] - verbs: ["delete","get","update"] + resourceNames: ["cluster-autoscaler-status", "cluster-autoscaler-priority-expander"] + verbs: ["delete","get","update","watch"] --- apiVersion: rbac.authorization.k8s.io/v1 diff --git a/cluster-autoscaler/cloudprovider/azure/examples/cluster-autoscaler-standard-msi.yaml b/cluster-autoscaler/cloudprovider/azure/examples/cluster-autoscaler-standard-msi.yaml index b3cf1b3ac2e5..13758dc4bdd7 100644 --- a/cluster-autoscaler/cloudprovider/azure/examples/cluster-autoscaler-standard-msi.yaml +++ b/cluster-autoscaler/cloudprovider/azure/examples/cluster-autoscaler-standard-msi.yaml @@ -66,8 +66,8 @@ rules: verbs: ["create"] - apiGroups: [""] resources: ["configmaps"] - resourceNames: ["cluster-autoscaler-status"] - verbs: ["delete","get","update"] + resourceNames: ["cluster-autoscaler-status", "cluster-autoscaler-priority-expander"] + verbs: ["delete","get","update","watch"] --- apiVersion: rbac.authorization.k8s.io/v1 diff --git a/cluster-autoscaler/cloudprovider/azure/examples/cluster-autoscaler-standard.yaml b/cluster-autoscaler/cloudprovider/azure/examples/cluster-autoscaler-standard.yaml index 5a515a403295..855531f8346c 100644 --- a/cluster-autoscaler/cloudprovider/azure/examples/cluster-autoscaler-standard.yaml +++ b/cluster-autoscaler/cloudprovider/azure/examples/cluster-autoscaler-standard.yaml @@ -66,8 +66,8 @@ rules: verbs: ["create"] - apiGroups: [""] resources: ["configmaps"] - resourceNames: ["cluster-autoscaler-status"] - verbs: ["delete","get","update"] + resourceNames: ["cluster-autoscaler-status", "cluster-autoscaler-priority-expander"] + verbs: ["delete","get","update","watch"] --- apiVersion: rbac.authorization.k8s.io/v1 diff --git a/cluster-autoscaler/cloudprovider/azure/examples/cluster-autoscaler-vmss-master.yaml b/cluster-autoscaler/cloudprovider/azure/examples/cluster-autoscaler-vmss-master.yaml index db00b9554d44..b6eadbe5ad26 100644 --- a/cluster-autoscaler/cloudprovider/azure/examples/cluster-autoscaler-vmss-master.yaml +++ b/cluster-autoscaler/cloudprovider/azure/examples/cluster-autoscaler-vmss-master.yaml @@ -66,8 +66,8 @@ rules: verbs: ["create"] - apiGroups: [""] resources: ["configmaps"] - resourceNames: ["cluster-autoscaler-status"] - verbs: ["delete","get","update"] + resourceNames: ["cluster-autoscaler-status", "cluster-autoscaler-priority-expander"] + verbs: ["delete","get","update","watch"] --- apiVersion: rbac.authorization.k8s.io/v1 diff --git a/cluster-autoscaler/cloudprovider/azure/examples/cluster-autoscaler-vmss-msi.yaml b/cluster-autoscaler/cloudprovider/azure/examples/cluster-autoscaler-vmss-msi.yaml index b0ec7df1240a..7a79ff6f9bc5 100644 --- a/cluster-autoscaler/cloudprovider/azure/examples/cluster-autoscaler-vmss-msi.yaml +++ b/cluster-autoscaler/cloudprovider/azure/examples/cluster-autoscaler-vmss-msi.yaml @@ -66,8 +66,8 @@ rules: verbs: ["create"] - apiGroups: [""] resources: ["configmaps"] - resourceNames: ["cluster-autoscaler-status"] - verbs: ["delete","get","update"] + resourceNames: ["cluster-autoscaler-status", "cluster-autoscaler-priority-expander"] + verbs: ["delete","get","update","watch"] --- apiVersion: rbac.authorization.k8s.io/v1 diff --git a/cluster-autoscaler/cloudprovider/azure/examples/cluster-autoscaler-vmss.yaml b/cluster-autoscaler/cloudprovider/azure/examples/cluster-autoscaler-vmss.yaml index 0b9db585fc21..efaa12c64990 100644 --- a/cluster-autoscaler/cloudprovider/azure/examples/cluster-autoscaler-vmss.yaml +++ b/cluster-autoscaler/cloudprovider/azure/examples/cluster-autoscaler-vmss.yaml @@ -66,8 +66,8 @@ rules: verbs: ["create"] - apiGroups: [""] resources: ["configmaps"] - resourceNames: ["cluster-autoscaler-status"] - verbs: ["delete","get","update"] + resourceNames: ["cluster-autoscaler-status", "cluster-autoscaler-priority-expander"] + verbs: ["delete","get","update","watch"] --- apiVersion: rbac.authorization.k8s.io/v1 diff --git a/cluster-autoscaler/cloudprovider/baiducloud/examples/cluster-autoscaler-one-asg.yaml b/cluster-autoscaler/cloudprovider/baiducloud/examples/cluster-autoscaler-one-asg.yaml index 2b43efcb1834..c89e5429ad5c 100644 --- a/cluster-autoscaler/cloudprovider/baiducloud/examples/cluster-autoscaler-one-asg.yaml +++ b/cluster-autoscaler/cloudprovider/baiducloud/examples/cluster-autoscaler-one-asg.yaml @@ -63,8 +63,8 @@ rules: verbs: ["create"] - apiGroups: [""] resources: ["configmaps"] - resourceNames: ["cluster-autoscaler-status"] - verbs: ["delete","get","update"] + resourceNames: ["cluster-autoscaler-status", "cluster-autoscaler-priority-expander"] + verbs: ["delete","get","update","watch"] --- apiVersion: rbac.authorization.k8s.io/v1 diff --git a/cluster-autoscaler/core/autoscaler.go b/cluster-autoscaler/core/autoscaler.go index 6140d673d8d8..3b5aff233e73 100644 --- a/cluster-autoscaler/core/autoscaler.go +++ b/cluster-autoscaler/core/autoscaler.go @@ -94,7 +94,7 @@ func initializeDefaultOptions(opts *AutoscalerOptions) error { } if opts.ExpanderStrategy == nil { expanderStrategy, err := factory.ExpanderStrategyFromString(opts.ExpanderName, - opts.CloudProvider, opts.AutoscalingKubeClients.AllNodeLister()) + opts.CloudProvider, opts.AutoscalingKubeClients, opts.KubeClient, opts.ConfigNamespace) if err != nil { return err } diff --git a/cluster-autoscaler/expander/expander.go b/cluster-autoscaler/expander/expander.go index af9b83a1035f..7f8e3c35def4 100644 --- a/cluster-autoscaler/expander/expander.go +++ b/cluster-autoscaler/expander/expander.go @@ -24,7 +24,7 @@ import ( var ( // AvailableExpanders is a list of available expander options - AvailableExpanders = []string{RandomExpanderName, MostPodsExpanderName, LeastWasteExpanderName, PriceBasedExpanderName} + AvailableExpanders = []string{RandomExpanderName, MostPodsExpanderName, LeastWasteExpanderName, PriceBasedExpanderName, PriorityBasedExpanderName} // RandomExpanderName selects a node group at random RandomExpanderName = "random" // MostPodsExpanderName selects a node group that fits the most pods @@ -34,6 +34,8 @@ var ( // PriceBasedExpanderName selects a node group that is the most cost-effective and consistent with // the preferred node size for the cluster PriceBasedExpanderName = "price" + // PriorityBasedExpanderName selects a node group based on a user-configured priorities assigned to group names + PriorityBasedExpanderName = "priority" ) // Option describes an option to expand the cluster. diff --git a/cluster-autoscaler/expander/factory/expander_factory.go b/cluster-autoscaler/expander/factory/expander_factory.go index 628b8cb8fb81..d6329531b2cd 100644 --- a/cluster-autoscaler/expander/factory/expander_factory.go +++ b/cluster-autoscaler/expander/factory/expander_factory.go @@ -18,19 +18,21 @@ package factory import ( "k8s.io/autoscaler/cluster-autoscaler/cloudprovider" + "k8s.io/autoscaler/cluster-autoscaler/context" "k8s.io/autoscaler/cluster-autoscaler/expander" "k8s.io/autoscaler/cluster-autoscaler/expander/mostpods" "k8s.io/autoscaler/cluster-autoscaler/expander/price" + "k8s.io/autoscaler/cluster-autoscaler/expander/priority" "k8s.io/autoscaler/cluster-autoscaler/expander/random" "k8s.io/autoscaler/cluster-autoscaler/expander/waste" "k8s.io/autoscaler/cluster-autoscaler/utils/errors" - - kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes" + kube_client "k8s.io/client-go/kubernetes" ) // ExpanderStrategyFromString creates an expander.Strategy according to its name func ExpanderStrategyFromString(expanderFlag string, cloudProvider cloudprovider.CloudProvider, - nodeLister kube_util.NodeLister) (expander.Strategy, errors.AutoscalerError) { + autoscalingKubeClients *context.AutoscalingKubeClients, kubeClient kube_client.Interface, + configNamespace string) (expander.Strategy, errors.AutoscalerError) { switch expanderFlag { case expander.RandomExpanderName: return random.NewStrategy(), nil @@ -44,8 +46,15 @@ func ExpanderStrategyFromString(expanderFlag string, cloudProvider cloudprovider return nil, err } return price.NewStrategy(pricing, - price.NewSimplePreferredNodeProvider(nodeLister), + price.NewSimplePreferredNodeProvider(autoscalingKubeClients.AllNodeLister()), price.SimpleNodeUnfitness), nil + case expander.PriorityBasedExpanderName: + maps := kubeClient.CoreV1().ConfigMaps(configNamespace) + initialPriorities, priorityChangesChan, err := priority.InitPriorityConfigMap(maps, configNamespace) + if err != nil { + return nil, errors.ToAutoscalerError(errors.InternalError, err) + } + return priority.NewStrategy(initialPriorities, priorityChangesChan, autoscalingKubeClients.LogRecorder) } return nil, errors.NewAutoscalerError(errors.InternalError, "Expander %s not supported", expanderFlag) } diff --git a/cluster-autoscaler/expander/priority/configmap_handler.go b/cluster-autoscaler/expander/priority/configmap_handler.go new file mode 100644 index 000000000000..70c34767a91e --- /dev/null +++ b/cluster-autoscaler/expander/priority/configmap_handler.go @@ -0,0 +1,73 @@ +/* +Copyright 2016 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package priority + +import ( + "errors" + "fmt" + + kube_errors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/fields" + "k8s.io/apimachinery/pkg/watch" + v1 "k8s.io/client-go/kubernetes/typed/core/v1" + "k8s.io/klog" + api "k8s.io/kubernetes/pkg/apis/core" +) + +const ( + // PriorityConfigMapName defines a name of the ConfigMap used to store priority expander configuration + PriorityConfigMapName = "cluster-autoscaler-priority-expander" + // ConfigMapKey defines the key used in the ConfigMap to configure priorities + ConfigMapKey = "priorities" +) + +// InitPriorityConfigMap initializes ConfigMap with priority expander configurations. It checks if the map exists +// and has the correct top level key. If it doesn't, it returns error or Exits. If the value is found, +// the current value is fetched and a Watcher is started to watch for changes. It returns the current value of +// the config map, the channel with value updates and an error. +func InitPriorityConfigMap(maps v1.ConfigMapInterface, namespace string) (string, <-chan watch.Event, error) { + errMsg := "" + priorities := "" + + configMap, getStatusError := maps.Get(PriorityConfigMapName, metav1.GetOptions{}) + if getStatusError == nil { + priorities = configMap.Data[ConfigMapKey] + } else if kube_errors.IsNotFound(getStatusError) { + errMsg = fmt.Sprintf("Priority expander config map %s/%s not found. You have to create it before starting cluster-autoscaler "+ + "with priority expander.", namespace, PriorityConfigMapName) + } else { + errMsg = fmt.Sprintf("Failed to retrieve priority expander configmap %s/%s: %v", namespace, PriorityConfigMapName, + getStatusError) + } + if errMsg != "" { + return "", nil, errors.New(errMsg) + } + + watcher, err := maps.Watch(metav1.ListOptions{ + FieldSelector: fields.OneTermEqualSelector(api.ObjectNameField, PriorityConfigMapName).String(), + Watch: true, + }) + if err != nil { + errMsg = fmt.Sprintf("Error when starting a watcher for changes of the priority expander configmap %s/%s: %v", + namespace, PriorityConfigMapName, err) + klog.Errorf(errMsg) + return "", nil, errors.New(errMsg) + } + + return priorities, watcher.ResultChan(), nil +} diff --git a/cluster-autoscaler/expander/priority/priority-expander-configmap.yaml b/cluster-autoscaler/expander/priority/priority-expander-configmap.yaml new file mode 100644 index 000000000000..cd4e0d3496a5 --- /dev/null +++ b/cluster-autoscaler/expander/priority/priority-expander-configmap.yaml @@ -0,0 +1,11 @@ +apiVersion: v1 +kind: ConfigMap +metadata: + name: cluster-autoscaler-priority-expander +data: + priorities: |- + 10: + - .*t2\.large.* + - .*t3\.large.* + 50: + - .*m4\.4xlarge.* \ No newline at end of file diff --git a/cluster-autoscaler/expander/priority/priority.go b/cluster-autoscaler/expander/priority/priority.go new file mode 100644 index 000000000000..1ed05ee59d8a --- /dev/null +++ b/cluster-autoscaler/expander/priority/priority.go @@ -0,0 +1,189 @@ +/* +Copyright 2016 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package priority + +import ( + "fmt" + "regexp" + "sync" + + "gopkg.in/yaml.v2" + + "k8s.io/autoscaler/cluster-autoscaler/expander" + "k8s.io/autoscaler/cluster-autoscaler/expander/random" + "k8s.io/autoscaler/cluster-autoscaler/utils/errors" + + apiv1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/watch" + "k8s.io/klog" + schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo" +) + +type priority struct { + fallbackStrategy expander.Strategy + changesChan <-chan watch.Event + priorities map[int][]*regexp.Regexp + padlock sync.RWMutex + okConfigUpdates int + badConfigUpdates int + logRecorder EventRecorder +} + +// NewStrategy returns an expansion strategy that picks node groups based on user-defined priorities +func NewStrategy(initialPriorities string, priorityChangesChan <-chan watch.Event, + logRecorder EventRecorder) (expander.Strategy, errors.AutoscalerError) { + res := &priority{ + fallbackStrategy: random.NewStrategy(), + changesChan: priorityChangesChan, + logRecorder: logRecorder, + } + if err := res.parsePrioritiesYAMLString(initialPriorities); err != nil { + return nil, errors.ToAutoscalerError(errors.InternalError, err) + } + go func() { + // TODO: how to terminate on process shutdown? + for event := range priorityChangesChan { + cm, ok := event.Object.(*apiv1.ConfigMap) + if !ok { + klog.Exit("Unexpected object type received on the configmap update channel in priority expander") + } + + if event.Type == watch.Deleted { + msg := "Configmap for priority expander was deleted, no updates will be processed until recreated." + res.logConfigWarning("PriorityConfigMapDeleted", msg) + continue + } + + prioString, found := cm.Data[ConfigMapKey] + if !found { + msg := fmt.Sprintf("Wrong configmap for priority expander, doesn't contain %s key. Ignoring update.", + ConfigMapKey) + res.logConfigWarning("PriorityConfigMapInvalid", msg) + continue + } + if err := res.parsePrioritiesYAMLString(prioString); err != nil { + msg := fmt.Sprintf("Wrong configuration for priority expander: %v. Ignoring update.", err) + res.logConfigWarning("PriorityConfigMapInvalid", msg) + continue + } + } + }() + return res, nil +} + +func (p *priority) logConfigWarning(reason, msg string) { + p.logRecorder.Event(apiv1.EventTypeWarning, reason, msg) + klog.Warning(msg) + p.badConfigUpdates++ +} + +func (p *priority) parsePrioritiesYAMLString(prioritiesYAML string) error { + if prioritiesYAML == "" { + p.badConfigUpdates++ + return fmt.Errorf("priority configuration in %s configmap is empty; please provide valid configuration", PriorityConfigMapName) + } + var config map[int][]string + if err := yaml.Unmarshal([]byte(prioritiesYAML), &config); err != nil { + p.badConfigUpdates++ + return fmt.Errorf("Can't parse YAML with priorities in the configmap: %v", err) + } + + newPriorities := make(map[int][]*regexp.Regexp) + for prio, reList := range config { + for _, re := range reList { + regexp, err := regexp.Compile(re) + if err != nil { + p.badConfigUpdates++ + return fmt.Errorf("Can't compile regexp rule for priority %d and rule %s: %v", prio, re, err) + } + newPriorities[prio] = append(newPriorities[prio], regexp) + } + } + + p.padlock.Lock() + p.priorities = newPriorities + p.okConfigUpdates++ + p.padlock.Unlock() + + msg := "Successfully reloaded priority configuration from configmap." + klog.V(4).Info(msg) + p.logRecorder.Event(apiv1.EventTypeNormal, "PriorityConfigMapReloaded", msg) + + return nil +} + +func (p *priority) BestOption(expansionOptions []expander.Option, nodeInfo map[string]*schedulernodeinfo.NodeInfo) *expander.Option { + if len(expansionOptions) <= 0 { + return nil + } + + maxPrio := -1 + best := []expander.Option{} + p.padlock.RLock() + for _, option := range expansionOptions { + id := option.NodeGroup.Id() + found := false + for prio, nameRegexpList := range p.priorities { + if prio < maxPrio { + continue + } + if !p.groupIDMatchesList(id, nameRegexpList) { + continue + } + if prio > maxPrio { + maxPrio = prio + best = nil + } + best = append(best, option) + found = true + break + } + if !found { + msg := fmt.Sprintf("Priority expander: node group %s not found in priority expander configuration. "+ + "The group won't be used.", id) + p.logConfigWarning("PriorityConfigMapNotMatchedGroup", msg) + } + } + p.padlock.RUnlock() + + if len(best) == 0 { + msg := "Priority expander: no priorities info found for any of the expansion options. Falling back to random choice." + p.logConfigWarning("PriorityConfigMapNoGroupMatched", msg) + return p.fallbackStrategy.BestOption(expansionOptions, nodeInfo) + } + + return p.fallbackStrategy.BestOption(best, nodeInfo) +} + +func (p *priority) groupIDMatchesList(id string, nameRegexpList []*regexp.Regexp) bool { + for _, re := range nameRegexpList { + if re.FindStringIndex(id) != nil { + return true + } + } + return false +} + +// EventRecorder is an interface to abstract kubernetes event recording. +type EventRecorder interface { + // Event records a new event of given type, reason and description given with message. + Event(eventtype, reason, message string) + + // Events records a new event of given type, reason and description given with message, + // which can be formatted using args. + Eventf(eventtype, reason, message string, args ...interface{}) +} diff --git a/cluster-autoscaler/expander/priority/priority_test.go b/cluster-autoscaler/expander/priority/priority_test.go new file mode 100644 index 000000000000..bc3020eb01f4 --- /dev/null +++ b/cluster-autoscaler/expander/priority/priority_test.go @@ -0,0 +1,282 @@ +/* +Copyright 2016 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package priority + +import ( + "fmt" + "testing" + "time" + + "github.com/stretchr/testify/assert" + + apiv1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/watch" + "k8s.io/client-go/tools/record" + schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo" + + "k8s.io/autoscaler/cluster-autoscaler/cloudprovider" + "k8s.io/autoscaler/cluster-autoscaler/clusterstate/utils" + "k8s.io/autoscaler/cluster-autoscaler/expander" +) + +const ( + configOKMessage = "Normal PriorityConfigMapReloaded Successfully reloaded priority " + + "configuration from configmap." + configWarnGroupNotFoundMessage = "Warning PriorityConfigMapNotMatchedGroup Priority expander: node group " + + "%s not found in priority expander configuration. The group won't be used." + configWarnConfigMapDeleted = "Warning PriorityConfigMapDeleted Configmap for priority expander was deleted, " + + "no updates will be processed until recreated." +) + +var ( + config = ` +5: + - ".*t2\\.micro.*" +10: + - ".*t2\\.large.*" + - ".*t3\\.large.*" +50: + - ".*m4\\.4xlarge.*" +` + oneEntryConfig = ` +10: + - ".*t2\\.large.*" +` + notMatchingConfig = ` +5: + - ".*t\\.micro.*" +10: + - ".*t\\.large.*" +` + eoT2Micro = expander.Option{ + Debug: "t2.micro", + NodeGroup: &testNodeGroup{ + id: "my-asg.t2.micro", + }, + } + eoT2Large = expander.Option{ + Debug: "t2.large", + NodeGroup: &testNodeGroup{ + id: "my-asg.t2.large", + }, + } + eoT3Large = expander.Option{ + Debug: "t3.large", + NodeGroup: &testNodeGroup{ + id: "my-asg.t3.large", + }, + } + eoM44XLarge = expander.Option{ + Debug: "m4.4xlarge", + NodeGroup: &testNodeGroup{ + id: "my-asg.m4.4xlarge", + }, + } +) + +func getStrategyInstance(t *testing.T, config string) (expander.Strategy, chan watch.Event, *testEventRecorder, error) { + c := make(chan watch.Event) + + r := newTestRecorder() + s, err := NewStrategy(config, c, r) + assert.Nil(t, err) + return s, c, r, err +} + +func TestPriorityExpanderCorrecltySelectsSingleMatchingOptionOutOfOne(t *testing.T) { + s, _, _, _ := getStrategyInstance(t, config) + ret := s.BestOption([]expander.Option{eoT2Large}, nil) + assert.Equal(t, *ret, eoT2Large) +} + +func TestPriorityExpanderCorrecltySelectsSingleMatchingOptionOutOfMany(t *testing.T) { + s, _, _, _ := getStrategyInstance(t, config) + ret := s.BestOption([]expander.Option{eoT2Large, eoM44XLarge}, nil) + assert.Equal(t, *ret, eoM44XLarge) +} + +func TestPriorityExpanderCorrecltySelectsOneOfTwoMatchingOptionsOutOfMany(t *testing.T) { + s, _, _, _ := getStrategyInstance(t, config) + for i := 0; i < 10; i++ { + ret := s.BestOption([]expander.Option{eoT2Large, eoT3Large, eoT2Micro}, nil) + assert.True(t, ret.NodeGroup.Id() == eoT2Large.NodeGroup.Id() || ret.NodeGroup.Id() == eoT3Large.NodeGroup.Id()) + } +} + +func TestPriorityExpanderCorrecltyFallsBackToRandomWhenNoMatches(t *testing.T) { + s, _, _, _ := getStrategyInstance(t, config) + for i := 0; i < 10; i++ { + ret := s.BestOption([]expander.Option{eoT2Large, eoT3Large}, nil) + assert.True(t, ret.NodeGroup.Id() == eoT2Large.NodeGroup.Id() || ret.NodeGroup.Id() == eoT3Large.NodeGroup.Id()) + } +} + +func TestPriorityExpanderCorrecltyHandlesConfigUpdate(t *testing.T) { + s, c, r, _ := getStrategyInstance(t, oneEntryConfig) + ret := s.BestOption([]expander.Option{eoT2Large, eoT3Large, eoM44XLarge}, nil) + assert.Equal(t, *ret, eoT2Large) + + event := <-r.recorder.Events + assert.EqualValues(t, configOKMessage, event) + for _, group := range []string{eoT3Large.NodeGroup.Id(), eoM44XLarge.NodeGroup.Id()} { + event = <-r.recorder.Events + assert.EqualValues(t, fmt.Sprintf(configWarnGroupNotFoundMessage, group), event) + } + + c <- watch.Event{ + Type: watch.Modified, + Object: &apiv1.ConfigMap{ + Data: map[string]string{ + ConfigMapKey: config, + }, + }, + } + priority := s.(*priority) + for { + if priority.okConfigUpdates == 2 { + break + } + time.Sleep(time.Millisecond) + } + ret = s.BestOption([]expander.Option{eoT2Large, eoT3Large, eoM44XLarge}, nil) + assert.Equal(t, *ret, eoM44XLarge) + event = <-r.recorder.Events + assert.EqualValues(t, configOKMessage, event) +} + +func TestPriorityExpanderCorrecltySkipsBadChangeConfig(t *testing.T) { + s, c, r, _ := getStrategyInstance(t, oneEntryConfig) + + event := <-r.recorder.Events + assert.EqualValues(t, configOKMessage, event) + + c <- watch.Event{ + Type: watch.Deleted, + Object: &apiv1.ConfigMap{}, + } + priority := s.(*priority) + for { + if priority.badConfigUpdates == 1 { + break + } + time.Sleep(time.Millisecond) + } + + assert.Equal(t, 1, priority.okConfigUpdates) + event = <-r.recorder.Events + assert.EqualValues(t, configWarnConfigMapDeleted, event) + + ret := s.BestOption([]expander.Option{eoT2Large, eoT3Large, eoM44XLarge}, nil) + + assert.Equal(t, *ret, eoT2Large) + for _, group := range []string{eoT3Large.NodeGroup.Id(), eoM44XLarge.NodeGroup.Id()} { + event = <-r.recorder.Events + assert.EqualValues(t, fmt.Sprintf(configWarnGroupNotFoundMessage, group), event) + } +} + +func TestPriorityExpanderFailsToStartWithEmptyConfig(t *testing.T) { + _, err := NewStrategy("", nil, &utils.LogEventRecorder{}) + assert.NotNil(t, err) +} + +func TestPriorityExpanderFailsToStartWithBadConfig(t *testing.T) { + _, err := NewStrategy("not_really_yaml: 34 : 43", nil, &utils.LogEventRecorder{}) + assert.NotNil(t, err) +} + +type testNodeGroup struct { + id string + debug string +} + +func (t *testNodeGroup) MaxSize() int { + return 10 +} + +func (t *testNodeGroup) MinSize() int { + return 0 +} + +func (t *testNodeGroup) TargetSize() (int, error) { + return 5, nil +} + +func (t *testNodeGroup) IncreaseSize(delta int) error { + return nil +} + +func (t *testNodeGroup) DeleteNodes([]*apiv1.Node) error { + return nil +} + +func (t *testNodeGroup) DecreaseTargetSize(delta int) error { + return nil +} + +func (t *testNodeGroup) Id() string { + return t.id +} + +func (t *testNodeGroup) Debug() string { + return t.debug +} + +func (t *testNodeGroup) Nodes() ([]cloudprovider.Instance, error) { + return nil, nil +} + +func (t *testNodeGroup) TemplateNodeInfo() (*schedulernodeinfo.NodeInfo, error) { + return nil, nil +} + +func (t *testNodeGroup) Exist() bool { + return true +} + +func (t *testNodeGroup) Create() (cloudprovider.NodeGroup, error) { + return nil, nil +} + +func (t *testNodeGroup) Delete() error { + return nil +} + +func (t *testNodeGroup) Autoprovisioned() bool { + return false +} + +type testEventRecorder struct { + recorder *record.FakeRecorder + statusObject runtime.Object +} + +func newTestRecorder() *testEventRecorder { + return &testEventRecorder{ + recorder: record.NewFakeRecorder(100), + statusObject: &apiv1.ConfigMap{}, + } +} + +func (ler *testEventRecorder) Event(eventtype, reason, message string) { + ler.recorder.Event(ler.statusObject, eventtype, reason, message) +} + +func (ler *testEventRecorder) Eventf(eventtype, reason, message string, args ...interface{}) { + ler.recorder.Eventf(ler.statusObject, eventtype, reason, message, args...) +} diff --git a/cluster-autoscaler/expander/priority/readme.md b/cluster-autoscaler/expander/priority/readme.md new file mode 100644 index 000000000000..f6413f56ce1a --- /dev/null +++ b/cluster-autoscaler/expander/priority/readme.md @@ -0,0 +1,34 @@ +# Priority based expander for cluster-autoscaler + +## Introduction + +Priority based expander selects an expansion option based on priorities assigned by a user to scaling groups. The assignment is based on matching of the scaling group's name to regular expressions. The correct and meaningful naming of scaling groups is left up to the user. + +## Motivation + +This expander gives the user a lot of control over which scaling group specifically will be used by cluster-autoscaler. It makes a lot of sense when configured manually by the user to match the specific needs. It also makes a lot of sense for environments where user's preferences change frequently based on properties outside of cluster scope. A good example here is the constant change of pricing and termination probability on AWS Spot Market for EC2 instances. +The expander is configured using a single ConfigMap, which is watched by the expander for any changes. The priority expander can be easily integrated with external optimization engines, that can just change the value of the ConfigMap configuration object. That way it's possible to dynamically change the decision of cluster-autoscaler using ConfigMap updates only. + +## Configuration + +Configuration is based on the values stored in a ConfigMap. This ConfigMap has to be created before cluster autoscaler with priority expander can be started. The ConfigMap must be named `cluster-autoscaler-priority-expander` and it must be placed in the same namespace as cluster autoscaler pod. The ConfigMap is watched by the cluster autoscaler and any changes made to it are loaded on the fly, without restarting cluster autoscaler. + +The format of the ConfigMap ([example](priority-expander-configmap.yaml)) is as follows: + +```yaml +apiVersion: v1 +kind: ConfigMap +metadata: + name: cluster-autoscaler-priority-expander +data: + priorities: |- + 10: + - .*t2\.large.* + - .*t3\.large.* + 50: + - .*m4\.4xlarge.* +``` + +The priority should be a positive value. The highest value wins. For each priority value, a list of regular expressions should be given. If there are multiple node groups matching any of the regular expressions with the highest priority, one group to expand the cluster is selected each time at random. Priority values cannot be duplicated - in that case, only one of the lists will be used. + +In the example above, the user gives the highest priority to any expansion option, where the scaling group ID matches the regular expression `.*m4\.4xlarge.*`. Assuming all of the used scaling groups are based on AWS Spot instances, the user might now want to give up on all the scaling groups based on the `m4.4xlarge` instance family. To do that, it's enough to either reconfigure the priority to a value `<10` or remove the entry with priority `50` altogether.