cluster-autoscaler: support modifying node labels
The assumption that all node labels except for the hostname label can be copied
verbatim does not hold for CSI drivers that manage local storage: those drivers
have a topology label whose value also depends on the hostname. The value might
be the same as the Kubernetes hostname, but that cannot be assumed.

To solve this, regular-expression search/replace rules can be defined that
modify those labels. This can then be used to inform the autoscaler about the
storage capacity available on new nodes:

   --replace-labels ';^topology.hostpath.csi/node=aks-workerpool.*;topology.hostpath.csi/node=aks-workerpool-template;'

   kubectl apply -f - <<EOF
apiVersion: storage.k8s.io/v1beta1
kind: CSIStorageCapacity
metadata:
  name: aks-workerpool-fast-storage
  namespace: kube-system
capacity: 100Gi
maximumVolumeSize: 100Gi
nodeTopology:
  matchLabels:
    # This never matches a real node, only the node templates
    # inside cluster-autoscaler.
    topology.hostpath.csi/node: aks-workerpool-template
storageClassName: csi-hostpath-fast
EOF
pohly committed Sep 1, 2021
1 parent e4c4dc7 commit fcd9bb2
Showing 5 changed files with 40 additions and 17 deletions.
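
As a rough illustration of the replacement semantics described above (not part of this commit's code): each label is treated as a single "<key>=<value>" string and run through the regular expression. The node name below is made up, and the real replace package may differ in detail.

package main

import (
	"fmt"
	"regexp"
)

func main() {
	// Hypothetical topology label of a real node in the "aks-workerpool" node group.
	label := "topology.hostpath.csi/node=aks-workerpool-12345678-vmss000000"

	// Regexp and replacement as passed via --replace-labels (separators stripped).
	re := regexp.MustCompile(`^topology.hostpath.csi/node=aks-workerpool.*`)
	rewritten := re.ReplaceAllString(label, "topology.hostpath.csi/node=aks-workerpool-template")

	fmt.Println(rewritten)
	// Prints: topology.hostpath.csi/node=aks-workerpool-template
}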
7 changes: 7 additions & 0 deletions cluster-autoscaler/config/autoscaling_options.go
@@ -18,6 +18,8 @@ package config

import (
"time"

"k8s.io/autoscaler/cluster-autoscaler/utils/replace"
)

// GpuLimits define lower and upper bound on GPU instances of given type in cluster
@@ -145,6 +147,11 @@ type AutoscalingOptions struct {
MaxBulkSoftTaintTime time.Duration
// IgnoredTaints is a list of taints to ignore when considering a node template for scheduling.
IgnoredTaints []string
// LabelReplacements is a list of regular expressions and their replacements that get applied
// to the labels of existing nodes when creating node templates. The string that each regular
// expression is matched against is "<key>=<value>". If the key is empty after the
// transformation, the label gets removed.
LabelReplacements replace.Replacements
// BalancingExtraIgnoredLabels is a list of labels to additionally ignore when comparing if two node groups are similar.
// Labels in BasicIgnoredLabels and the cloud provider-specific ignored labels are always ignored.
BalancingExtraIgnoredLabels []string
7 changes: 4 additions & 3 deletions cluster-autoscaler/core/scale_up.go
@@ -24,6 +24,7 @@ import (
"time"

"k8s.io/autoscaler/cluster-autoscaler/core/utils"
"k8s.io/autoscaler/cluster-autoscaler/utils/replace"
"k8s.io/autoscaler/cluster-autoscaler/utils/taints"

appsv1 "k8s.io/api/apps/v1"
@@ -322,7 +323,7 @@ func computeExpansionOption(context *context.AutoscalingContext, podEquivalenceG
// false if it didn't and error if an error occurred. Assumes that all nodes in the cluster are
// ready and in sync with instance groups.
func ScaleUp(context *context.AutoscalingContext, processors *ca_processors.AutoscalingProcessors, clusterStateRegistry *clusterstate.ClusterStateRegistry, unschedulablePods []*apiv1.Pod,
nodes []*apiv1.Node, daemonSets []*appsv1.DaemonSet, nodeInfos map[string]*schedulerframework.NodeInfo, ignoredTaints taints.TaintKeySet) (*status.ScaleUpStatus, errors.AutoscalerError) {
nodes []*apiv1.Node, daemonSets []*appsv1.DaemonSet, nodeInfos map[string]*schedulerframework.NodeInfo, ignoredTaints taints.TaintKeySet, labelReplacements replace.Replacements) (*status.ScaleUpStatus, errors.AutoscalerError) {
// From now on we only care about unschedulable pods that were marked after the newest
// node became available for the scheduler.
if len(unschedulablePods) == 0 {
@@ -496,7 +497,7 @@ func ScaleUp(context *context.AutoscalingContext, processors *ca_processors.Auto

// If possible replace candidate node-info with node info based on created node group. The latter
// one should be more in line with nodes which will be created by node group.
mainCreatedNodeInfo, err := utils.GetNodeInfoFromTemplate(createNodeGroupResult.MainCreatedNodeGroup, daemonSets, context.PredicateChecker, ignoredTaints)
mainCreatedNodeInfo, err := utils.GetNodeInfoFromTemplate(createNodeGroupResult.MainCreatedNodeGroup, daemonSets, context.PredicateChecker, ignoredTaints, labelReplacements)
if err == nil {
nodeInfos[createNodeGroupResult.MainCreatedNodeGroup.Id()] = mainCreatedNodeInfo
} else {
@@ -510,7 +511,7 @@ func ScaleUp(context *context.AutoscalingContext, processors *ca_processors.Auto
}

for _, nodeGroup := range createNodeGroupResult.ExtraCreatedNodeGroups {
nodeInfo, err := utils.GetNodeInfoFromTemplate(nodeGroup, daemonSets, context.PredicateChecker, ignoredTaints)
nodeInfo, err := utils.GetNodeInfoFromTemplate(nodeGroup, daemonSets, context.PredicateChecker, ignoredTaints, labelReplacements)

if err != nil {
klog.Warningf("Cannot build node info for newly created extra node group %v; balancing similar node groups will not work; err=%v", nodeGroup.Id(), err)
11 changes: 7 additions & 4 deletions cluster-autoscaler/core/static_autoscaler.go
@@ -40,6 +40,7 @@ import (
"k8s.io/autoscaler/cluster-autoscaler/utils/backoff"
"k8s.io/autoscaler/cluster-autoscaler/utils/deletetaint"
"k8s.io/autoscaler/cluster-autoscaler/utils/errors"
"k8s.io/autoscaler/cluster-autoscaler/utils/replace"
scheduler_utils "k8s.io/autoscaler/cluster-autoscaler/utils/scheduler"
"k8s.io/autoscaler/cluster-autoscaler/utils/taints"
"k8s.io/autoscaler/cluster-autoscaler/utils/tpu"
@@ -76,8 +77,9 @@ type StaticAutoscaler struct {
processorCallbacks *staticAutoscalerProcessorCallbacks
initialized bool
// Caches nodeInfo computed for previously seen nodes
nodeInfoCache map[string]*schedulerframework.NodeInfo
ignoredTaints taints.TaintKeySet
nodeInfoCache map[string]*schedulerframework.NodeInfo
ignoredTaints taints.TaintKeySet
labelReplacements replace.Replacements
}

type staticAutoscalerProcessorCallbacks struct {
@@ -163,6 +165,7 @@ func NewStaticAutoscaler(
clusterStateRegistry: clusterStateRegistry,
nodeInfoCache: make(map[string]*schedulerframework.NodeInfo),
ignoredTaints: ignoredTaints,
labelReplacements: opts.LabelReplacements,
}
}

@@ -281,7 +284,7 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) errors.AutoscalerError
}

nodeInfosForGroups, autoscalerError := core_utils.GetNodeInfosForGroups(
readyNodes, a.nodeInfoCache, autoscalingContext.CloudProvider, autoscalingContext.ListerRegistry, daemonsets, autoscalingContext.PredicateChecker, a.ignoredTaints)
readyNodes, a.nodeInfoCache, autoscalingContext.CloudProvider, autoscalingContext.ListerRegistry, daemonsets, autoscalingContext.PredicateChecker, a.ignoredTaints, a.labelReplacements)
if autoscalerError != nil {
klog.Errorf("Failed to get node infos for groups: %v", autoscalerError)
return autoscalerError.AddPrefix("failed to build node infos for node groups: ")
@@ -433,7 +436,7 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) errors.AutoscalerError
scaleUpStart := time.Now()
metrics.UpdateLastTime(metrics.ScaleUp, scaleUpStart)

scaleUpStatus, typedErr = ScaleUp(autoscalingContext, a.processors, a.clusterStateRegistry, unschedulablePodsToHelp, readyNodes, daemonsets, nodeInfosForGroups, a.ignoredTaints)
scaleUpStatus, typedErr = ScaleUp(autoscalingContext, a.processors, a.clusterStateRegistry, unschedulablePodsToHelp, readyNodes, daemonsets, nodeInfosForGroups, a.ignoredTaints, a.labelReplacements)

metrics.UpdateDurationFromStart(metrics.ScaleUp, scaleUpStart)

22 changes: 13 additions & 9 deletions cluster-autoscaler/core/utils/utils.go
@@ -33,6 +33,7 @@ import (
"k8s.io/autoscaler/cluster-autoscaler/utils/gpu"
kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes"
"k8s.io/autoscaler/cluster-autoscaler/utils/labels"
"k8s.io/autoscaler/cluster-autoscaler/utils/replace"
"k8s.io/autoscaler/cluster-autoscaler/utils/taints"
schedulerframework "k8s.io/kubernetes/pkg/scheduler/framework"

@@ -43,7 +44,7 @@ import (
func GetNodeInfosForGroups(nodes []*apiv1.Node, nodeInfoCache map[string]*schedulerframework.NodeInfo, cloudProvider cloudprovider.CloudProvider, listers kube_util.ListerRegistry,
// TODO(mwielgus): This returns map keyed by url, while most code (including scheduler) uses node.Name for a key.
// TODO(mwielgus): Review error policy - sometimes we may continue with partial errors.
daemonsets []*appsv1.DaemonSet, predicateChecker simulator.PredicateChecker, ignoredTaints taints.TaintKeySet) (map[string]*schedulerframework.NodeInfo, errors.AutoscalerError) {
daemonsets []*appsv1.DaemonSet, predicateChecker simulator.PredicateChecker, ignoredTaints taints.TaintKeySet, labelReplacements replace.Replacements) (map[string]*schedulerframework.NodeInfo, errors.AutoscalerError) {
result := make(map[string]*schedulerframework.NodeInfo)
seenGroups := make(map[string]bool)

@@ -68,7 +69,7 @@ func GetNodeInfosForGroups(nodes []*apiv1.Node, nodeInfoCache map[string]*schedu
if err != nil {
return false, "", err
}
sanitizedNodeInfo, err := sanitizeNodeInfo(nodeInfo, id, ignoredTaints)
sanitizedNodeInfo, err := sanitizeNodeInfo(nodeInfo, id, ignoredTaints, labelReplacements)
if err != nil {
return false, "", err
}
@@ -112,7 +113,7 @@ func GetNodeInfosForGroups(nodes []*apiv1.Node, nodeInfoCache map[string]*schedu

// No good template, trying to generate one. This is called only if there are no
// working nodes in the node groups. By default CA tries to use a real-world example.
nodeInfo, err := GetNodeInfoFromTemplate(nodeGroup, daemonsets, predicateChecker, ignoredTaints)
nodeInfo, err := GetNodeInfoFromTemplate(nodeGroup, daemonsets, predicateChecker, ignoredTaints, labelReplacements)
if err != nil {
if err == cloudprovider.ErrNotImplemented {
continue
@@ -166,7 +167,7 @@ func getPodsForNodes(listers kube_util.ListerRegistry) (map[string][]*apiv1.Pod,
}

// GetNodeInfoFromTemplate returns NodeInfo object built based on TemplateNodeInfo returned by NodeGroup.TemplateNodeInfo().
func GetNodeInfoFromTemplate(nodeGroup cloudprovider.NodeGroup, daemonsets []*appsv1.DaemonSet, predicateChecker simulator.PredicateChecker, ignoredTaints taints.TaintKeySet) (*schedulerframework.NodeInfo, errors.AutoscalerError) {
func GetNodeInfoFromTemplate(nodeGroup cloudprovider.NodeGroup, daemonsets []*appsv1.DaemonSet, predicateChecker simulator.PredicateChecker, ignoredTaints taints.TaintKeySet, labelReplacements replace.Replacements) (*schedulerframework.NodeInfo, errors.AutoscalerError) {
id := nodeGroup.Id()
baseNodeInfo, err := nodeGroup.TemplateNodeInfo()
if err != nil {
@@ -184,7 +185,7 @@ func GetNodeInfoFromTemplate(nodeGroup cloudprovider.NodeGroup, daemonsets []*ap
}
fullNodeInfo := schedulerframework.NewNodeInfo(pods...)
fullNodeInfo.SetNode(baseNodeInfo.Node())
sanitizedNodeInfo, typedErr := sanitizeNodeInfo(fullNodeInfo, id, ignoredTaints)
sanitizedNodeInfo, typedErr := sanitizeNodeInfo(fullNodeInfo, id, ignoredTaints, labelReplacements)
if typedErr != nil {
return nil, typedErr
}
@@ -229,9 +230,9 @@ func deepCopyNodeInfo(nodeInfo *schedulerframework.NodeInfo) (*schedulerframewor
return newNodeInfo, nil
}

func sanitizeNodeInfo(nodeInfo *schedulerframework.NodeInfo, nodeGroupName string, ignoredTaints taints.TaintKeySet) (*schedulerframework.NodeInfo, errors.AutoscalerError) {
func sanitizeNodeInfo(nodeInfo *schedulerframework.NodeInfo, nodeGroupName string, ignoredTaints taints.TaintKeySet, labelReplacements replace.Replacements) (*schedulerframework.NodeInfo, errors.AutoscalerError) {
// Sanitize node name.
sanitizedNode, err := sanitizeTemplateNode(nodeInfo.Node(), nodeGroupName, ignoredTaints)
sanitizedNode, err := sanitizeTemplateNode(nodeInfo.Node(), nodeGroupName, ignoredTaints, labelReplacements)
if err != nil {
return nil, err
}
@@ -250,13 +251,16 @@ func sanitizeNodeInfo(nodeInfo *schedulerframework.NodeInfo, nodeGroupName strin
return sanitizedNodeInfo, nil
}

func sanitizeTemplateNode(node *apiv1.Node, nodeGroup string, ignoredTaints taints.TaintKeySet) (*apiv1.Node, errors.AutoscalerError) {
func sanitizeTemplateNode(node *apiv1.Node, nodeGroup string, ignoredTaints taints.TaintKeySet, labelReplacements replace.Replacements) (*apiv1.Node, errors.AutoscalerError) {
newNode := node.DeepCopy()
nodeName := fmt.Sprintf("template-node-for-%s-%d", nodeGroup, rand.Int63())
newNode.Labels = make(map[string]string, len(node.Labels))
for k, v := range node.Labels {
if k != apiv1.LabelHostname {
newNode.Labels[k] = v
k, v = labelReplacements.ApplyToPair(k, v)
if k != "" {
newNode.Labels[k] = v
}
} else {
newNode.Labels[k] = nodeName
}
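
The new loop in sanitizeTemplateNode above rewrites every non-hostname label and drops labels whose key becomes empty. The following standalone sketch mirrors that behaviour with a stand-in applyToPair for a single rule; the real replace.Replacements.ApplyToPair is not shown in this diff and is only assumed to operate on the flattened "<key>=<value>" form.

package main

import (
	"fmt"
	"regexp"
	"strings"
)

// applyToPair mimics the assumed behaviour of replace.Replacements.ApplyToPair for a
// single rule: flatten the label to "<key>=<value>", apply the regexp replacement,
// then split the result back into key and value.
func applyToPair(re *regexp.Regexp, repl, key, value string) (string, string) {
	out := re.ReplaceAllString(key+"="+value, repl)
	parts := strings.SplitN(out, "=", 2)
	if len(parts) == 1 {
		return parts[0], ""
	}
	return parts[0], parts[1]
}

func main() {
	re := regexp.MustCompile(`^topology.hostpath.csi/node=aks-workerpool.*`)
	repl := "topology.hostpath.csi/node=aks-workerpool-template"

	labels := map[string]string{
		"kubernetes.io/arch":         "amd64",
		"topology.hostpath.csi/node": "aks-workerpool-12345678-vmss000000",
	}

	// Same shape as the loop in sanitizeTemplateNode: rewrite each label and
	// drop it when the key ends up empty.
	sanitized := map[string]string{}
	for k, v := range labels {
		k, v = applyToPair(re, repl, k, v)
		if k != "" {
			sanitized[k] = v
		}
	}
	fmt.Println(sanitized)
	// Prints: map[kubernetes.io/arch:amd64 topology.hostpath.csi/node:aks-workerpool-template]
}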
10 changes: 9 additions & 1 deletion cluster-autoscaler/main.go
@@ -46,6 +46,7 @@ import (
"k8s.io/autoscaler/cluster-autoscaler/simulator"
"k8s.io/autoscaler/cluster-autoscaler/utils/errors"
kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes"
"k8s.io/autoscaler/cluster-autoscaler/utils/replace"
"k8s.io/autoscaler/cluster-autoscaler/utils/units"
"k8s.io/autoscaler/cluster-autoscaler/version"
kube_client "k8s.io/client-go/kubernetes"
@@ -170,7 +171,13 @@ var (
regional = flag.Bool("regional", false, "Cluster is regional.")
newPodScaleUpDelay = flag.Duration("new-pod-scale-up-delay", 0*time.Second, "Pods less than this old will not be considered for scale-up.")

ignoreTaintsFlag = multiStringFlag("ignore-taint", "Specifies a taint to ignore in node templates when considering to scale a node group")
ignoreTaintsFlag = multiStringFlag("ignore-taint", "Specifies a taint to ignore in node templates when considering to scale a node group")
labelReplacements = func() *replace.Replacements {
repl := &replace.Replacements{}
flag.Var(repl, "replace-labels", "Specifies one or more regular expression replacements of the form ;<regexp>;<replacement>; (any other character as separator also allowed) which get applied one after the other to labels of a node to form a template node. Labels are represented as a single string with <key>=<value>. If the key is empty after replacement, the label gets removed.")
return repl
}()

balancingIgnoreLabelsFlag = multiStringFlag("balancing-ignore-label", "Specifies a label to ignore in addition to the basic and cloud-provider set of labels when comparing if two node groups are similar")
awsUseStaticInstanceList = flag.Bool("aws-use-static-instance-list", false, "Should CA fetch instance types in runtime or use a static list. AWS only")
concurrentGceRefreshes = flag.Int("gce-concurrent-refreshes", 1, "Maximum number of concurrent refreshes per cloud object type.")
@@ -249,6 +256,7 @@ func createAutoscalingOptions() config.AutoscalingOptions {
Regional: *regional,
NewPodScaleUpDelay: *newPodScaleUpDelay,
IgnoredTaints: *ignoreTaintsFlag,
LabelReplacements: *labelReplacements,
BalancingExtraIgnoredLabels: *balancingIgnoreLabelsFlag,
KubeConfigPath: *kubeConfigFile,
NodeDeletionDelayTimeout: *nodeDeletionDelayTimeout,
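
The new --replace-labels flag is registered with flag.Var, so replace.Replacements has to implement flag.Value. That package is not part of this diff; below is a hypothetical sketch of a Set method that parses arguments of the form <sep><regexp><sep><replacement><sep> as described in the help text.

package main

import (
	"flag"
	"fmt"
	"regexp"
	"strings"
)

// labelReplacement is one parsed --replace-labels rule.
type labelReplacement struct {
	re   *regexp.Regexp
	repl string
}

// replacements implements flag.Value so it can be registered with flag.Var and
// collect repeated --replace-labels arguments.
type replacements []labelReplacement

func (r *replacements) String() string { return fmt.Sprintf("%d replacement(s)", len(*r)) }

func (r *replacements) Set(value string) error {
	if len(value) < 3 {
		return fmt.Errorf("expected <sep><regexp><sep><replacement><sep>, got %q", value)
	}
	sep := value[:1]
	// Splitting ";<regexp>;<replacement>;" on ";" yields "", regexp, replacement, "".
	parts := strings.Split(value, sep)
	if len(parts) != 4 || parts[0] != "" || parts[3] != "" {
		return fmt.Errorf("expected <sep><regexp><sep><replacement><sep>, got %q", value)
	}
	re, err := regexp.Compile(parts[1])
	if err != nil {
		return err
	}
	*r = append(*r, labelReplacement{re: re, repl: parts[2]})
	return nil
}

func main() {
	repl := &replacements{}
	flag.Var(repl, "replace-labels", "e.g. ';^topology.hostpath.csi/node=.*;topology.hostpath.csi/node=template;'")
	flag.Parse()
	fmt.Println(repl)
}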
