Merge pull request #5147 from x13n/scaledown4
Extract criteria for removing unneeded nodes to a separate package
k8s-ci-robot authored Oct 17, 2022
2 parents dc73ea9 + 3a3ec38 commit f445a6a
Showing 6 changed files with 408 additions and 212 deletions.
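The refactor replaces ScaleDown's unneededNodes map and unneededNodesList fields with a single handle on the new unneeded package. Below is a minimal sketch of that package's surface, reconstructed purely from the call sites visible in this diff; the stub bodies, the resource.Limits parameter name, and the exact signatures (notably the []*apiv1.Node result type, inferred from the caller's use of n.Name, and the deletion-tracker parameter, inferred from the argument passed in) are assumptions, not the package's actual contents.

package unneeded

import (
	"time"

	apiv1 "k8s.io/api/core/v1"

	"k8s.io/autoscaler/cluster-autoscaler/context"
	"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/deletiontracker"
	"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/resource"
	"k8s.io/autoscaler/cluster-autoscaler/processors/nodegroupconfig"
	"k8s.io/autoscaler/cluster-autoscaler/simulator"
)

// Nodes tracks the set of nodes currently considered unneeded.
type Nodes struct{ /* unexported state */ }

// NewNodes takes the config processor (per-group unneeded/unready timeouts)
// and the limits finder (cluster-wide resource limits), matching the
// constructor call in NewScaleDown.
func NewNodes(c nodegroupconfig.NodeGroupConfigProcessor, f *resource.LimitsFinder) *Nodes {
	return &Nodes{}
}

// Update replaces the tracked set with the latest simulation result;
// ts stamps nodes seen for the first time.
func (n *Nodes) Update(nodes []simulator.NodeToBeRemoved, ts time.Time) {}

func (n *Nodes) Clear()                    {}
func (n *Nodes) AsList() []*apiv1.Node     { return nil }
func (n *Nodes) Contains(name string) bool { return false }
func (n *Nodes) Drop(name string)          {}

// RemovableAt re-checks every tracked node against the scale-down criteria
// and splits the survivors into empty and non-empty candidates, reporting
// the rest with reasons.
func (n *Nodes) RemovableAt(ctx *context.AutoscalingContext, ts time.Time,
	left resource.Limits, resourcesWithLimits []string,
	ndt *deletiontracker.NodeDeletionTracker) (empty, nonEmpty []*apiv1.Node, unremovable []*simulator.UnremovableNode) {
	return nil, nil, nil
}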
181 changes: 39 additions & 142 deletions cluster-autoscaler/core/scaledown/legacy/legacy.go
@@ -27,15 +27,14 @@ import (
"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/deletiontracker"
"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/eligibility"
"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/resource"
"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/unneeded"
"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/unremovable"
"k8s.io/autoscaler/cluster-autoscaler/metrics"
"k8s.io/autoscaler/cluster-autoscaler/processors"
"k8s.io/autoscaler/cluster-autoscaler/processors/status"
"k8s.io/autoscaler/cluster-autoscaler/simulator"
"k8s.io/autoscaler/cluster-autoscaler/simulator/utilization"
"k8s.io/autoscaler/cluster-autoscaler/utils"
"k8s.io/autoscaler/cluster-autoscaler/utils/errors"
kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes"

apiv1 "k8s.io/api/core/v1"
policyv1 "k8s.io/api/policy/v1"
@@ -48,9 +47,8 @@ type ScaleDown struct {
context *context.AutoscalingContext
processors *processors.AutoscalingProcessors
clusterStateRegistry *clusterstate.ClusterStateRegistry
unneededNodes map[string]time.Time
unneededNodesList []*apiv1.Node
unremovableNodes *unremovable.Nodes
unneededNodes *unneeded.Nodes
podLocationHints map[string]string
nodeUtilizationMap map[string]utilization.Info
usageTracker *simulator.UsageTracker
@@ -65,20 +63,20 @@ func NewScaleDown(context *context.AutoscalingContext, processors *processors.Au
usageTracker := simulator.NewUsageTracker()
removalSimulator := simulator.NewRemovalSimulator(context.ListerRegistry, context.ClusterSnapshot, context.PredicateChecker, usageTracker, deleteOptions, false)
unremovableNodes := unremovable.NewNodes()
resourceLimitsFinder := resource.NewLimitsFinder(processors.CustomResourcesProcessor)
return &ScaleDown{
context: context,
processors: processors,
clusterStateRegistry: clusterStateRegistry,
unneededNodes: make(map[string]time.Time),
unremovableNodes: unremovableNodes,
unneededNodes: unneeded.NewNodes(processors.NodeGroupConfigProcessor, resourceLimitsFinder),
podLocationHints: make(map[string]string),
nodeUtilizationMap: make(map[string]utilization.Info),
usageTracker: usageTracker,
unneededNodesList: make([]*apiv1.Node, 0),
nodeDeletionTracker: ndt,
removalSimulator: removalSimulator,
eligibilityChecker: eligibility.NewChecker(processors.NodeGroupConfigProcessor),
resourceLimitsFinder: resource.NewLimitsFinder(processors.CustomResourcesProcessor),
resourceLimitsFinder: resourceLimitsFinder,
}
}

@@ -91,17 +89,16 @@ func (sd *ScaleDown) CleanUp(timestamp time.Time) {

// CleanUpUnneededNodes clears the list of unneeded nodes.
func (sd *ScaleDown) CleanUpUnneededNodes() {
sd.unneededNodesList = make([]*apiv1.Node, 0)
sd.unneededNodes = make(map[string]time.Time)
sd.unneededNodes.Clear()
}

// UnneededNodes returns a list of nodes that can potentially be scaled down.
func (sd *ScaleDown) UnneededNodes() []*apiv1.Node {
return sd.unneededNodesList
return sd.unneededNodes.AsList()
}

// UpdateUnneededNodes calculates which nodes are not needed, i.e. all pods can be scheduled somewhere else,
// and updates unneededNodes map accordingly. It also computes information where pods can be rescheduled and
// and updates unneededNodes accordingly. It also computes information where pods can be rescheduled and
// node utilization level. The computations are made only for the nodes managed by CA.
// * destinationNodes are the nodes that can potentially take in any pods that are evicted because of a scale down.
// * scaleDownCandidates are the nodes that are being considered for scale down.
@@ -199,17 +196,7 @@ func (sd *ScaleDown) UpdateUnneededNodes(
}

// Update the timestamp map.
result := make(map[string]time.Time)
unneededNodesList := make([]*apiv1.Node, 0, len(nodesToRemove))
for _, node := range nodesToRemove {
name := node.Node.Name
unneededNodesList = append(unneededNodesList, node.Node)
if val, found := sd.unneededNodes[name]; !found {
result[name] = timestamp
} else {
result[name] = val
}
}
sd.unneededNodes.Update(nodesToRemove, timestamp)

// Add nodes to unremovable map
if len(unremovable) > 0 {
@@ -223,24 +210,18 @@
// This method won't always check all nodes, so let's give a generic reason for all nodes that weren't checked.
for _, node := range scaleDownCandidates {
unremovableReasonProvided := sd.unremovableNodes.HasReason(node.Name)
_, unneeded := result[node.Name]
unneeded := sd.unneededNodes.Contains(node.Name)
if !unneeded && !unremovableReasonProvided {
sd.unremovableNodes.AddReason(node, simulator.NotUnneededOtherReason)
}
}

// Update state and metrics
sd.unneededNodesList = unneededNodesList
sd.unneededNodes = result
sd.podLocationHints = newHints
sd.nodeUtilizationMap = utilizationMap
sd.clusterStateRegistry.UpdateScaleDownCandidates(sd.unneededNodesList, timestamp)
metrics.UpdateUnneededNodesCount(len(sd.unneededNodesList))
if klog.V(4).Enabled() {
for key, val := range sd.unneededNodes {
klog.Infof("%s is unneeded since %s duration %s", key, val.String(), timestamp.Sub(val).String())
}
}
unneededNodesList := sd.unneededNodes.AsList()
sd.clusterStateRegistry.UpdateScaleDownCandidates(unneededNodesList, timestamp)
metrics.UpdateUnneededNodesCount(len(unneededNodesList))
return nil
}
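The map-merging loop deleted above kept the original timestamp for nodes that were already marked unneeded, so the "unneeded since" clock never resets while a node stays on the list; unneeded.Nodes.Update presumably preserves that behavior. A minimal sketch under that assumption, with illustrative internal names that are not the actual package contents:

package unneeded

import (
	"time"

	"k8s.io/autoscaler/cluster-autoscaler/simulator"
)

// Illustrative internal layout; field and type names are assumptions.
type node struct {
	ntbr  simulator.NodeToBeRemoved
	since time.Time // when the node was first reported unneeded
}

type Nodes struct {
	byName map[string]*node
}

// Update rebuilds the tracked set from the latest simulation result while
// preserving the first-seen timestamp of nodes that were already tracked,
// mirroring the deleted map-merging loop.
func (n *Nodes) Update(nodesToRemove []simulator.NodeToBeRemoved, ts time.Time) {
	updated := make(map[string]*node, len(nodesToRemove))
	for _, ntbr := range nodesToRemove {
		name := ntbr.Node.Name
		since := ts
		if prev, found := n.byName[name]; found {
			since = prev.since // keep the original "unneeded since" time
		}
		updated[name] = &node{ntbr: ntbr, since: since}
	}
	n.byName = updated
}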

@@ -260,10 +241,9 @@ func (sd *ScaleDown) UnremovableNodes() []*simulator.UnremovableNode {
func (sd *ScaleDown) markSimulationError(simulatorErr errors.AutoscalerError,
timestamp time.Time) errors.AutoscalerError {
klog.Errorf("Error while simulating node drains: %v", simulatorErr)
sd.unneededNodesList = make([]*apiv1.Node, 0)
sd.unneededNodes = make(map[string]time.Time)
sd.unneededNodes.Clear()
sd.nodeUtilizationMap = make(map[string]utilization.Info)
sd.clusterStateRegistry.UpdateScaleDownCandidates(sd.unneededNodesList, timestamp)
sd.clusterStateRegistry.UpdateScaleDownCandidates(nil, timestamp)
return simulatorErr.AddPrefix("error while simulating node drains: ")
}

@@ -277,7 +257,7 @@ func (sd *ScaleDown) chooseCandidates(nodes []string) (candidates []string, nonC
return nodes, nil
}
for _, node := range nodes {
if _, found := sd.unneededNodes[node]; found {
if sd.unneededNodes.Contains(node) {
candidates = append(candidates, node)
} else {
nonCandidates = append(nonCandidates, node)
@@ -322,116 +302,22 @@ func (sd *ScaleDown) NodesToDelete(currentTime time.Time, pdbs []*policyv1.PodDi
allNodeNames = append(allNodeNames, ni.Node().Name)
}

candidateNames := make([]string, 0)
readinessMap := make(map[string]bool)
candidateNodeGroups := make(map[string]cloudprovider.NodeGroup)

resourceLimiter, errCP := sd.context.CloudProvider.GetResourceLimiter()
if errCP != nil {
return nil, nil, status.ScaleDownError, errors.ToAutoscalerError(errors.CloudProviderError, errCP)
}

scaleDownResourcesLeft := sd.resourceLimitsFinder.LimitsLeft(sd.context, allNodes, resourceLimiter, currentTime)

nodeGroupSize := utils.GetNodeGroupSizeMap(sd.context.CloudProvider)
resourcesWithLimits := resourceLimiter.GetResources()
for nodeName, unneededSince := range sd.unneededNodes {
klog.V(2).Infof("%s was unneeded for %s", nodeName, currentTime.Sub(unneededSince).String())

nodeInfo, err := sd.context.ClusterSnapshot.NodeInfos().Get(nodeName)
if err != nil {
klog.Errorf("Can't retrieve unneeded node %s from snapshot, err: %v", nodeName, err)
continue
}

node := nodeInfo.Node()

// Check if node is marked with no scale down annotation.
if eligibility.HasNoScaleDownAnnotation(node) {
klog.V(4).Infof("Skipping %s - scale down disabled annotation found", node.Name)
sd.unremovableNodes.AddReason(node, simulator.ScaleDownDisabledAnnotation)
continue
}

ready, _, _ := kube_util.GetReadinessState(node)
readinessMap[node.Name] = ready

nodeGroup, err := sd.context.CloudProvider.NodeGroupForNode(node)
if err != nil {
klog.Errorf("Error while checking node group for %s: %v", node.Name, err)
sd.unremovableNodes.AddReason(node, simulator.UnexpectedError)
continue
}
if nodeGroup == nil || reflect.ValueOf(nodeGroup).IsNil() {
klog.V(4).Infof("Skipping %s - no node group config", node.Name)
sd.unremovableNodes.AddReason(node, simulator.NotAutoscaled)
continue
}

if ready {
// Check how long a ready node was underutilized.
unneededTime, err := sd.processors.NodeGroupConfigProcessor.GetScaleDownUnneededTime(sd.context, nodeGroup)
if err != nil {
klog.Errorf("Error trying to get ScaleDownUnneededTime for node %s (in group: %s)", node.Name, nodeGroup.Id())
continue
}
if !unneededSince.Add(unneededTime).Before(currentTime) {
sd.unremovableNodes.AddReason(node, simulator.NotUnneededLongEnough)
continue
}
} else {
// Unready nodes may be deleted after a different time than underutilized nodes.
unreadyTime, err := sd.processors.NodeGroupConfigProcessor.GetScaleDownUnreadyTime(sd.context, nodeGroup)
if err != nil {
klog.Errorf("Error trying to get ScaleDownUnreadyTime for node %s (in group: %s)", node.Name, nodeGroup.Id())
continue
}
if !unneededSince.Add(unreadyTime).Before(currentTime) {
sd.unremovableNodes.AddReason(node, simulator.NotUnreadyLongEnough)
continue
}
}

size, found := nodeGroupSize[nodeGroup.Id()]
if !found {
klog.Errorf("Error while checking node group size %s: group size not found in cache", nodeGroup.Id())
sd.unremovableNodes.AddReason(node, simulator.UnexpectedError)
continue
}

deletionsInProgress := sd.nodeDeletionTracker.DeletionsCount(nodeGroup.Id())
if size-deletionsInProgress <= nodeGroup.MinSize() {
klog.V(1).Infof("Skipping %s - node group min size reached", node.Name)
sd.unremovableNodes.AddReason(node, simulator.NodeGroupMinSizeReached)
continue
}

scaleDownResourcesDelta, err := sd.resourceLimitsFinder.DeltaForNode(sd.context, node, nodeGroup, resourcesWithLimits)
if err != nil {
klog.Errorf("Error getting node resources: %v", err)
sd.unremovableNodes.AddReason(node, simulator.UnexpectedError)
continue
}

checkResult := scaleDownResourcesLeft.CheckDeltaWithinLimits(scaleDownResourcesDelta)
if checkResult.Exceeded() {
klog.V(4).Infof("Skipping %s - minimal limit exceeded for %v", node.Name, checkResult.ExceededResources)
sd.unremovableNodes.AddReason(node, simulator.MinimalResourceLimitExceeded)
for _, resource := range checkResult.ExceededResources {
switch resource {
case cloudprovider.ResourceNameCores:
metrics.RegisterSkippedScaleDownCPU()
case cloudprovider.ResourceNameMemory:
metrics.RegisterSkippedScaleDownMemory()
default:
continue
}
}
continue
}

candidateNames = append(candidateNames, node.Name)
candidateNodeGroups[node.Name] = nodeGroup
empty, nonEmpty, unremovable := sd.unneededNodes.RemovableAt(sd.context, currentTime, scaleDownResourcesLeft, resourceLimiter.GetResources(), sd.nodeDeletionTracker)
for _, u := range unremovable {
sd.unremovableNodes.Add(u)
}
candidateNames := make([]string, 0, len(empty)+len(nonEmpty))
for _, n := range empty {
candidateNames = append(candidateNames, n.Name)
}
for _, n := range nonEmpty {
candidateNames = append(candidateNames, n.Name)
}

if len(candidateNames) == 0 {
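
All the per-node criteria deleted from the loop above — the scale-down-disabled annotation, the separate waiting periods for ready (unneeded) and unready nodes, the node-group minimum size net of in-flight deletions, and the resource-limit delta — now live behind unneeded.Nodes.RemovableAt. A condensed reconstruction of that filter chain follows, with pre-computed parameters and hypothetical names; only the order and meaning of the checks come from the deleted code (error paths and the NotAutoscaled case are omitted for brevity):

package scaledowncriteria

import "time"

// UnremovableReason mirrors the simulator reasons used in the deleted loop.
type UnremovableReason string

const (
	ScaleDownDisabledAnnotation  UnremovableReason = "ScaleDownDisabledAnnotation"
	NotUnneededLongEnough        UnremovableReason = "NotUnneededLongEnough"
	NotUnreadyLongEnough         UnremovableReason = "NotUnreadyLongEnough"
	NodeGroupMinSizeReached      UnremovableReason = "NodeGroupMinSizeReached"
	MinimalResourceLimitExceeded UnremovableReason = "MinimalResourceLimitExceeded"
	Removable                    UnremovableReason = ""
)

// removableAt condenses the per-node criteria that moved into
// unneeded.Nodes.RemovableAt. All inputs are pre-computed here for brevity;
// the real method derives them from the autoscaling context and processors.
func removableAt(noScaleDownAnnotation, ready bool, unneededSince, now time.Time,
	unneededTime, unreadyTime time.Duration,
	groupSize, deletionsInProgress, groupMinSize int,
	resourceLimitExceeded bool) UnremovableReason {

	if noScaleDownAnnotation {
		return ScaleDownDisabledAnnotation
	}
	// Ready and unready nodes honor different waiting periods.
	if ready && !unneededSince.Add(unneededTime).Before(now) {
		return NotUnneededLongEnough
	}
	if !ready && !unneededSince.Add(unreadyTime).Before(now) {
		return NotUnreadyLongEnough
	}
	// Never shrink a group below its minimum, counting in-flight deletions.
	if groupSize-deletionsInProgress <= groupMinSize {
		return NodeGroupMinSizeReached
	}
	// Respect cluster-wide resource limits (cores, memory, custom resources).
	if resourceLimitExceeded {
		return MinimalResourceLimitExceeded
	}
	return Removable
}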
@@ -448,7 +334,7 @@ func (sd *ScaleDown) NodesToDelete(currentTime time.Time, pdbs []*policyv1.PodDi
var nodes []*apiv1.Node
for _, node := range emptyNodesToRemove {
// Nothing super-bad should happen if the node is removed from tracker prematurely.
simulator.RemoveNodeFromTracker(sd.usageTracker, node.Node.Name, sd.unneededNodes)
sd.removeNodeFromTracker(node.Node.Name)
nodes = append(nodes, node.Node)
}
return nodes, nil, status.ScaleDownNodeDeleteStarted, nil
@@ -478,10 +364,21 @@ func (sd *ScaleDown) NodesToDelete(currentTime time.Time, pdbs []*policyv1.PodDi
}
toRemove := nodesToRemove[0]
// Nothing super-bad should happen if the node is removed from tracker prematurely.
simulator.RemoveNodeFromTracker(sd.usageTracker, toRemove.Node.Name, sd.unneededNodes)
sd.removeNodeFromTracker(toRemove.Node.Name)
return nil, []*apiv1.Node{toRemove.Node}, status.ScaleDownNodeDeleteStarted, nil
}

func (sd *ScaleDown) removeNodeFromTracker(node string) {
unneeded := make([]string, 0, len(sd.unneededNodes.AsList()))
for _, n := range sd.unneededNodes.AsList() {
unneeded = append(unneeded, n.Name)
}
toRemove := simulator.RemoveNodeFromTracker(sd.usageTracker, node, unneeded)
for _, n := range toRemove {
sd.unneededNodes.Drop(n)
}
}
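
The new removeNodeFromTracker bridges an old helper to the new type: simulator.RemoveNodeFromTracker still takes and returns plain name lists (it previously received the unneededNodes map directly), so the shim flattens AsList into names and forwards every name the helper reports back to Drop. Under the internal layout assumed in the Update sketch earlier, Drop could be as small as:

// Hypothetical Drop, continuing the illustrative Nodes layout from the
// Update sketch above; the real method may do more bookkeeping.
func (n *Nodes) Drop(name string) {
	delete(n.byName, name)
}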

// updateScaleDownMetrics registers duration of different parts of scale down.
// Separates time spent on finding nodes to remove, deleting nodes and other operations.
func updateScaleDownMetrics(scaleDownStart time.Time, findNodesToRemoveDuration *time.Duration) {
