Skip to content

Commit

Permalink
cluster-autoscaler: support modifying node labels
Browse files Browse the repository at this point in the history
The assumption that all node labels except for the hostname label can be copied
verbatim does not hold for CSI drivers which manage local storage: those
drivers have a topology label where the value also depends on the hostname. It
might be the same as the Kubernetes hostname, but that cannot be assumed.

To solve this, search/replace with regular expressions can be defined to modify
those labels. This then can be used to inform the autoscaler about available
capacity on new nodes:

   --replace-labels ';^topology.hostpath.csi/node=aks-workerpool.*;topology.hostpath.csi/node=aks-workerpool-template;'

   kubectl apply -f - <<EOF
apiVersion: storage.k8s.io/v1beta1
kind: CSIStorageCapacity
metadata:
  name: aks-workerpool-fast-storage
  namespace: kube-system
capacity: 100Gi
maximumVolumeSize: 100Gi
nodeTopology:
  matchLabels:
    # This never matches a real node, only the node templates
    # inside cluster-autoscaler.
    topology.hostpath.csi/node: aks-workerpool-template
storageClassName: csi-hostpath-fast
EOF
  • Loading branch information
pohly committed Sep 21, 2021
1 parent 9b533c3 commit a92adf0
Show file tree
Hide file tree
Showing 10 changed files with 249 additions and 31 deletions.
94 changes: 94 additions & 0 deletions cluster-autoscaler/FAQ.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ this document:
* [How can I prevent Cluster Autoscaler from scaling down a particular node?](#how-can-i-prevent-cluster-autoscaler-from-scaling-down-a-particular-node)
* [How can I configure overprovisioning with Cluster Autoscaler?](#how-can-i-configure-overprovisioning-with-cluster-autoscaler)
* [How can I enable/disable eviction for a specific DaemonSet](#how-can-i-enabledisable-eviction-for-a-specific-daemonset)
* [How can I enable autoscaling for Pods with volumes?](#how-can-i-enable-autoscaling-for-pods-with-volumes)
* [Internals](#internals)
* [Are all of the mentioned heuristics and timings final?](#are-all-of-the-mentioned-heuristics-and-timings-final)
* [How does scale-up work?](#how-does-scale-up-work)
Expand Down Expand Up @@ -461,6 +462,99 @@ This annotation has no effect on pods that are not a part of any DaemonSet.
****************
### How can I enable autoscaling for Pods with volumes?
For network-attached storage, autoscaling works as long as the storage system
does not run out of space for new volumes. Cluster Autoscaler has no support
for automatically increasing storage pools when that happens.
For storage that is local to nodes the situation is different. Solutions
depend on the specific scenario.
#### Dynamic provisioning with immediate binding
When volumes are provisioned dynamically through a storage class with immediate
[binding](https://kubernetes.io/docs/concepts/storage/storage-classes/#volume-binding-mode),
then scheduling and thus the code in Cluster Autoscaler just waits for the
volumes to be created. If that depends on creating new nodes, then scale up is
not triggered. It's better to use the `WaitForFirstConsumer` binding mode.
#### Dynamic provisioning with delayed binding
When the binding mode is `WaitForFirstConsumer`, volume provisioning starts
when the first Pod tries to use a PersistentVolumeClaim. The scheduler is
involved in choosing a node candidate and then the provisioner tries to create
the volume on that node.
When the Cluster Autoscaler considers whether a Pod waiting for such a volume
could run on a new node, the outcome depends on whether storage capacity
tracking, a beta feature for CSI drivers since Kubernetes 1.21, is enabled or
disabled. When disabled, the volume binder will assume that all nodes are
suitable. This may cause the Cluster Autoscaler to create new nodes from a pool
that doesn't actually have local storage.
If it is enabled, then additional configuration of the Cluster Autoscaler is
needed to inform it how much storage new nodes of a pool will have. This must
be done for each CSI driver and each storage class of that driver. Suppose
there is a `csi-lvm-fast` storage class for a fictional LVM CSI driver and a
node pool where node names are `aks-workerpool-<unique id>`. The following
command will create a CSIStorageCapacity object that states that new nodes
will have a certain amount of free storage:
```
kubectl apply -f - <<EOF
apiVersion: storage.k8s.io/v1beta1
kind: CSIStorageCapacity
metadata:
# The name does not matter. It just has to be unique
# inside the namespace.
name: aks-workerpool-csi-lvm-fast-storage
# Other namespaces also work. As this object is owned by the
# cluster administrator, kube-system is a good choice.
namespace: kube-system
# Capacity and maximumVolumeSize must be the same as what the CSI driver
# will report for new nodes with unused storage.
capacity: 100Gi
maximumVolumeSize: 100Gi
nodeTopology:
matchLabels:
# This never matches a real node, only the node templates
# inside Cluster Autoscaler.
topology.hostpath.csi/node: aks-workerpool-template
storageClassName: csi-lvm-fast
EOF
```
Now Cluster Autoscaler must be configured to change the node template labels
such that the volume capacity check looks at that object instead of the one for
the node from which the template was created. This is done with a command line
flag that enables regular expression matching and replacement for labels,
similar to `sed -e s/foo-.*/foo-template/`:
```
--replace-labels ';^topology.lvm.csi/node=aks-workerpool.*;topology.lvm.csi/node=aks-workerpool-template;'
```
`topology.lvm.csi/node` in this example is the label that gets added when the
LVM CSI driver is registered on a node by kubelet. For local storage, the value
of that label is usually the host name, which is what must be modified.
For scaling up from zero, the `topology.lvm.csi/node=aks-workerpool-template`
label must be added to the configuration for the node pool. How to do this
depends on the cloud provider.
TODO: describe how to avoid over-provisioning
#### Static provisioning
When using something like the [Local Persistence Volume Static
Provisioner](https://github.com/kubernetes-sigs/sig-storage-local-static-provisioner),
new PersistentVolumes are created when new nodes are added. Those
PersistentVolumes then may be used to satisfy unbound volume claims that
prevented Pods from running earlier.
Cluster Autoscaler currently has no support for this scenario.
# Internals
### Are all of the mentioned heuristics and timings final?
Expand Down
7 changes: 7 additions & 0 deletions cluster-autoscaler/config/autoscaling_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ package config

import (
"time"

"k8s.io/autoscaler/cluster-autoscaler/utils/replace"
)

// GpuLimits define lower and upper bound on GPU instances of given type in cluster
Expand Down Expand Up @@ -145,6 +147,11 @@ type AutoscalingOptions struct {
MaxBulkSoftTaintTime time.Duration
// IgnoredTaints is a list of taints to ignore when considering a node template for scheduling.
IgnoredTaints []string
// LabelReplacements is a list of regular expressions and their replacement that get applied
// to labels of existing nodes when creating node templates. The string that the regular
// expression is matched against is "<key>=<value>". If the value part is empty or missing
// after the transformation, the tag gets removed.
LabelReplacements replace.Replacements
// BalancingExtraIgnoredLabels is a list of labels to additionally ignore when comparing if two node groups are similar.
// Labels in BasicIgnoredLabels and the cloud provider-specific ignored labels are always ignored.
BalancingExtraIgnoredLabels []string
Expand Down
7 changes: 3 additions & 4 deletions cluster-autoscaler/core/scale_up.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (
"time"

"k8s.io/autoscaler/cluster-autoscaler/core/utils"
"k8s.io/autoscaler/cluster-autoscaler/utils/taints"

appsv1 "k8s.io/api/apps/v1"
apiv1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -322,7 +321,7 @@ func computeExpansionOption(context *context.AutoscalingContext, podEquivalenceG
// false if it didn't and error if an error occurred. Assumes that all nodes in the cluster are
// ready and in sync with instance groups.
func ScaleUp(context *context.AutoscalingContext, processors *ca_processors.AutoscalingProcessors, clusterStateRegistry *clusterstate.ClusterStateRegistry, unschedulablePods []*apiv1.Pod,
nodes []*apiv1.Node, daemonSets []*appsv1.DaemonSet, nodeInfos map[string]*schedulerframework.NodeInfo, ignoredTaints taints.TaintKeySet) (*status.ScaleUpStatus, errors.AutoscalerError) {
nodes []*apiv1.Node, daemonSets []*appsv1.DaemonSet, nodeInfos map[string]*schedulerframework.NodeInfo, nodeTransformation *utils.NodeTransformation) (*status.ScaleUpStatus, errors.AutoscalerError) {
// From now on we only care about unschedulable pods that were marked after the newest
// node became available for the scheduler.
if len(unschedulablePods) == 0 {
Expand Down Expand Up @@ -496,7 +495,7 @@ func ScaleUp(context *context.AutoscalingContext, processors *ca_processors.Auto

// If possible replace candidate node-info with node info based on crated node group. The latter
// one should be more in line with nodes which will be created by node group.
mainCreatedNodeInfo, err := utils.GetNodeInfoFromTemplate(createNodeGroupResult.MainCreatedNodeGroup, daemonSets, context.PredicateChecker, ignoredTaints)
mainCreatedNodeInfo, err := utils.GetNodeInfoFromTemplate(createNodeGroupResult.MainCreatedNodeGroup, daemonSets, context.PredicateChecker, nodeTransformation)
if err == nil {
nodeInfos[createNodeGroupResult.MainCreatedNodeGroup.Id()] = mainCreatedNodeInfo
} else {
Expand All @@ -510,7 +509,7 @@ func ScaleUp(context *context.AutoscalingContext, processors *ca_processors.Auto
}

for _, nodeGroup := range createNodeGroupResult.ExtraCreatedNodeGroups {
nodeInfo, err := utils.GetNodeInfoFromTemplate(nodeGroup, daemonSets, context.PredicateChecker, ignoredTaints)
nodeInfo, err := utils.GetNodeInfoFromTemplate(nodeGroup, daemonSets, context.PredicateChecker, nodeTransformation)

if err != nil {
klog.Warningf("Cannot build node info for newly created extra node group %v; balancing similar node groups will not work; err=%v", nodeGroup.Id(), err)
Expand Down
14 changes: 7 additions & 7 deletions cluster-autoscaler/core/scale_up_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -552,7 +552,7 @@ func runSimpleScaleUpTest(t *testing.T, config *scaleTestConfig) *scaleTestResul
}
context.ExpanderStrategy = expander

nodeInfos, _ := nodeinfosprovider.NewDefaultTemplateNodeInfoProvider().Process(&context, nodes, []*appsv1.DaemonSet{}, nil)
nodeInfos, _ := nodeinfosprovider.NewDefaultTemplateNodeInfoProvider().Process(&context, nodes, []*appsv1.DaemonSet{}, nil, nil)
clusterState := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, context.LogRecorder, newBackoff())
clusterState.UpdateNodes(nodes, nodeInfos, time.Now())

Expand All @@ -564,7 +564,7 @@ func runSimpleScaleUpTest(t *testing.T, config *scaleTestConfig) *scaleTestResul

processors := NewTestProcessors()

scaleUpStatus, err := ScaleUp(&context, processors, clusterState, extraPods, nodes, []*appsv1.DaemonSet{}, nodeInfos, nil)
scaleUpStatus, err := ScaleUp(&context, processors, clusterState, extraPods, nodes, []*appsv1.DaemonSet{}, nodeInfos, nil, nil)
processors.ScaleUpStatusProcessor.Process(&context, scaleUpStatus)

assert.NoError(t, err)
Expand Down Expand Up @@ -788,7 +788,7 @@ func TestScaleUpUnhealthy(t *testing.T) {
p3 := BuildTestPod("p-new", 550, 0)

processors := NewTestProcessors()
scaleUpStatus, err := ScaleUp(&context, processors, clusterState, []*apiv1.Pod{p3}, nodes, []*appsv1.DaemonSet{}, nodeInfos, nil)
scaleUpStatus, err := ScaleUp(&context, processors, clusterState, []*apiv1.Pod{p3}, nodes, []*appsv1.DaemonSet{}, nodeInfos, nil, nil)

assert.NoError(t, err)
// Node group is unhealthy.
Expand Down Expand Up @@ -828,7 +828,7 @@ func TestScaleUpNoHelp(t *testing.T) {
p3 := BuildTestPod("p-new", 500, 0)

processors := NewTestProcessors()
scaleUpStatus, err := ScaleUp(&context, processors, clusterState, []*apiv1.Pod{p3}, nodes, []*appsv1.DaemonSet{}, nodeInfos, nil)
scaleUpStatus, err := ScaleUp(&context, processors, clusterState, []*apiv1.Pod{p3}, nodes, []*appsv1.DaemonSet{}, nodeInfos, nil, nil)
processors.ScaleUpStatusProcessor.Process(&context, scaleUpStatus)

assert.NoError(t, err)
Expand Down Expand Up @@ -897,7 +897,7 @@ func TestScaleUpBalanceGroups(t *testing.T) {
}

processors := NewTestProcessors()
scaleUpStatus, typedErr := ScaleUp(&context, processors, clusterState, pods, nodes, []*appsv1.DaemonSet{}, nodeInfos, nil)
scaleUpStatus, typedErr := ScaleUp(&context, processors, clusterState, pods, nodes, []*appsv1.DaemonSet{}, nodeInfos, nil, nil)

assert.NoError(t, typedErr)
assert.True(t, scaleUpStatus.WasSuccessful())
Expand Down Expand Up @@ -957,7 +957,7 @@ func TestScaleUpAutoprovisionedNodeGroup(t *testing.T) {
nodes := []*apiv1.Node{}
nodeInfos, _ := nodeinfosprovider.NewDefaultTemplateNodeInfoProvider().Process(&context, nodes, []*appsv1.DaemonSet{}, nil)

scaleUpStatus, err := ScaleUp(&context, processors, clusterState, []*apiv1.Pod{p1}, nodes, []*appsv1.DaemonSet{}, nodeInfos, nil)
scaleUpStatus, err := ScaleUp(&context, processors, clusterState, []*apiv1.Pod{p1}, nodes, []*appsv1.DaemonSet{}, nodeInfos, nil, nil)
assert.NoError(t, err)
assert.True(t, scaleUpStatus.WasSuccessful())
assert.Equal(t, "autoprovisioned-T1", utils.GetStringFromChan(createdGroups))
Expand Down Expand Up @@ -1010,7 +1010,7 @@ func TestScaleUpBalanceAutoprovisionedNodeGroups(t *testing.T) {
nodes := []*apiv1.Node{}
nodeInfos, _ := nodeinfosprovider.NewDefaultTemplateNodeInfoProvider().Process(&context, nodes, []*appsv1.DaemonSet{}, nil)

scaleUpStatus, err := ScaleUp(&context, processors, clusterState, []*apiv1.Pod{p1, p2, p3}, nodes, []*appsv1.DaemonSet{}, nodeInfos, nil)
scaleUpStatus, err := ScaleUp(&context, processors, clusterState, []*apiv1.Pod{p1, p2, p3}, nodes, []*appsv1.DaemonSet{}, nodeInfos, nil, nil)
assert.NoError(t, err)
assert.True(t, scaleUpStatus.WasSuccessful())
assert.Equal(t, "autoprovisioned-T1", utils.GetStringFromChan(createdGroups))
Expand Down
14 changes: 9 additions & 5 deletions cluster-autoscaler/core/static_autoscaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"k8s.io/autoscaler/cluster-autoscaler/config"
"k8s.io/autoscaler/cluster-autoscaler/context"
core_utils "k8s.io/autoscaler/cluster-autoscaler/core/utils"
coreutils "k8s.io/autoscaler/cluster-autoscaler/core/utils"
"k8s.io/autoscaler/cluster-autoscaler/estimator"
"k8s.io/autoscaler/cluster-autoscaler/expander"
"k8s.io/autoscaler/cluster-autoscaler/metrics"
Expand Down Expand Up @@ -75,7 +76,7 @@ type StaticAutoscaler struct {
processors *ca_processors.AutoscalingProcessors
processorCallbacks *staticAutoscalerProcessorCallbacks
initialized bool
ignoredTaints taints.TaintKeySet
nodeTransformation *coreutils.NodeTransformation
}

type staticAutoscalerProcessorCallbacks struct {
Expand Down Expand Up @@ -159,7 +160,10 @@ func NewStaticAutoscaler(
processors: processors,
processorCallbacks: processorCallbacks,
clusterStateRegistry: clusterStateRegistry,
ignoredTaints: ignoredTaints,
nodeTransformation: &coreutils.NodeTransformation{
IgnoredTaints: ignoredTaints,
LabelReplacements: opts.LabelReplacements,
},
}
}

Expand Down Expand Up @@ -277,7 +281,7 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) errors.AutoscalerError
return typedErr.AddPrefix("Initialize ClusterSnapshot")
}

nodeInfosForGroups, autoscalerError := a.processors.TemplateNodeInfoProvider.Process(autoscalingContext, readyNodes, daemonsets, a.ignoredTaints)
nodeInfosForGroups, autoscalerError := a.processors.TemplateNodeInfoProvider.Process(autoscalingContext, readyNodes, daemonsets, a.nodeTransformation)
if autoscalerError != nil {
klog.Errorf("Failed to get node infos for groups: %v", autoscalerError)
return autoscalerError.AddPrefix("failed to build node infos for node groups: ")
Expand Down Expand Up @@ -429,7 +433,7 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) errors.AutoscalerError
scaleUpStart := time.Now()
metrics.UpdateLastTime(metrics.ScaleUp, scaleUpStart)

scaleUpStatus, typedErr = ScaleUp(autoscalingContext, a.processors, a.clusterStateRegistry, unschedulablePodsToHelp, readyNodes, daemonsets, nodeInfosForGroups, a.ignoredTaints)
scaleUpStatus, typedErr = ScaleUp(autoscalingContext, a.processors, a.clusterStateRegistry, unschedulablePodsToHelp, readyNodes, daemonsets, nodeInfosForGroups, a.nodeTransformation)

metrics.UpdateDurationFromStart(metrics.ScaleUp, scaleUpStart)

Expand Down Expand Up @@ -739,7 +743,7 @@ func (a *StaticAutoscaler) obtainNodeLists(cp cloudprovider.CloudProvider) ([]*a
// our normal handling for booting up nodes deal with this.
// TODO: Remove this call when we handle dynamically provisioned resources.
allNodes, readyNodes = a.processors.CustomResourcesProcessor.FilterOutNodesWithUnreadyResources(a.AutoscalingContext, allNodes, readyNodes)
allNodes, readyNodes = taints.FilterOutNodesWithIgnoredTaints(a.ignoredTaints, allNodes, readyNodes)
allNodes, readyNodes = taints.FilterOutNodesWithIgnoredTaints(a.nodeTransformation.IgnoredTaints, allNodes, readyNodes)
return allNodes, readyNodes, nil
}

Expand Down
30 changes: 22 additions & 8 deletions cluster-autoscaler/core/utils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,19 @@ import (
"k8s.io/autoscaler/cluster-autoscaler/utils/errors"
"k8s.io/autoscaler/cluster-autoscaler/utils/gpu"
"k8s.io/autoscaler/cluster-autoscaler/utils/labels"
"k8s.io/autoscaler/cluster-autoscaler/utils/replace"
"k8s.io/autoscaler/cluster-autoscaler/utils/taints"
schedulerframework "k8s.io/kubernetes/pkg/scheduler/framework"
)

// NodeTransformation contains settings for creating node templates.
type NodeTransformation struct {
IgnoredTaints taints.TaintKeySet
LabelReplacements replace.Replacements
}

// GetNodeInfoFromTemplate returns NodeInfo object built base on TemplateNodeInfo returned by NodeGroup.TemplateNodeInfo().
func GetNodeInfoFromTemplate(nodeGroup cloudprovider.NodeGroup, daemonsets []*appsv1.DaemonSet, predicateChecker simulator.PredicateChecker, ignoredTaints taints.TaintKeySet) (*schedulerframework.NodeInfo, errors.AutoscalerError) {
func GetNodeInfoFromTemplate(nodeGroup cloudprovider.NodeGroup, daemonsets []*appsv1.DaemonSet, predicateChecker simulator.PredicateChecker, nodeTransformation *NodeTransformation) (*schedulerframework.NodeInfo, errors.AutoscalerError) {
id := nodeGroup.Id()
baseNodeInfo, err := nodeGroup.TemplateNodeInfo()
if err != nil {
Expand All @@ -55,7 +62,7 @@ func GetNodeInfoFromTemplate(nodeGroup cloudprovider.NodeGroup, daemonsets []*ap
}
fullNodeInfo := schedulerframework.NewNodeInfo(pods...)
fullNodeInfo.SetNode(baseNodeInfo.Node())
sanitizedNodeInfo, typedErr := SanitizeNodeInfo(fullNodeInfo, id, ignoredTaints)
sanitizedNodeInfo, typedErr := SanitizeNodeInfo(fullNodeInfo, id, nodeTransformation)
if typedErr != nil {
return nil, typedErr
}
Expand Down Expand Up @@ -102,9 +109,9 @@ func DeepCopyNodeInfo(nodeInfo *schedulerframework.NodeInfo) (*schedulerframewor
}

// SanitizeNodeInfo modify nodeInfos generated from templates to avoid using duplicated host names
func SanitizeNodeInfo(nodeInfo *schedulerframework.NodeInfo, nodeGroupName string, ignoredTaints taints.TaintKeySet) (*schedulerframework.NodeInfo, errors.AutoscalerError) {
func SanitizeNodeInfo(nodeInfo *schedulerframework.NodeInfo, nodeGroupName string, nodeTransformation *NodeTransformation) (*schedulerframework.NodeInfo, errors.AutoscalerError) {
// Sanitize node name.
sanitizedNode, err := sanitizeTemplateNode(nodeInfo.Node(), nodeGroupName, ignoredTaints)
sanitizedNode, err := sanitizeTemplateNode(nodeInfo.Node(), nodeGroupName, nodeTransformation)
if err != nil {
return nil, err
}
Expand All @@ -123,19 +130,26 @@ func SanitizeNodeInfo(nodeInfo *schedulerframework.NodeInfo, nodeGroupName strin
return sanitizedNodeInfo, nil
}

func sanitizeTemplateNode(node *apiv1.Node, nodeGroup string, ignoredTaints taints.TaintKeySet) (*apiv1.Node, errors.AutoscalerError) {
func sanitizeTemplateNode(node *apiv1.Node, nodeGroup string, nodeTransformation *NodeTransformation) (*apiv1.Node, errors.AutoscalerError) {
newNode := node.DeepCopy()
nodeName := fmt.Sprintf("template-node-for-%s-%d", nodeGroup, rand.Int63())
newNode.Name = nodeName
newNode.Labels = make(map[string]string, len(node.Labels))
for k, v := range node.Labels {
if k != apiv1.LabelHostname {
newNode.Labels[k] = v
if nodeTransformation != nil {
k, v = nodeTransformation.LabelReplacements.ApplyToPair(k, v)
if k != "" {
newNode.Labels[k] = v
}
}
} else {
newNode.Labels[k] = nodeName
}
}
newNode.Name = nodeName
newNode.Spec.Taints = taints.SanitizeTaints(newNode.Spec.Taints, ignoredTaints)
if nodeTransformation != nil {
newNode.Spec.Taints = taints.SanitizeTaints(newNode.Spec.Taints, nodeTransformation.IgnoredTaints)
}
return newNode, nil
}

Expand Down
Loading

0 comments on commit a92adf0

Please sign in to comment.