Move handling unremovable nodes to dedicated object #4894

Merged · 1 commit · May 24, 2022
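The dedicated helper object itself is not part of this diff. For orientation, below is a minimal, hypothetical sketch of the unremovable.Nodes API as implied by the call sites in the changes (NewNodes, Update, IsRecent, AddTimeout, Add, AddReason, HasReason, AsList). It is a reconstruction under those assumptions, not the merged code in cluster-autoscaler/core/scaledown/unremovable, which may differ in names and signatures.

```go
// Hypothetical sketch of the unremovable.Nodes helper, reconstructed only from
// the call sites visible in this PR. Names, fields, and signatures are
// assumptions, not the merged code.
package unremovable

import (
	"time"

	apiv1 "k8s.io/api/core/v1"

	"k8s.io/autoscaler/cluster-autoscaler/simulator"
	schedulerframework "k8s.io/kubernetes/pkg/scheduler/framework"
)

// nodeInfoGetter is the small slice of the cluster snapshot Update needs;
// context.ClusterSnapshot.NodeInfos() satisfies it in the caller.
type nodeInfoGetter interface {
	Get(name string) (*schedulerframework.NodeInfo, error)
}

// Nodes tracks nodes considered unremovable by scale-down: a re-check
// deadline per node, plus the reason recorded in the current loop.
type Nodes struct {
	ttls    map[string]time.Time
	reasons map[string]*simulator.UnremovableNode
}

// NewNodes returns an empty tracker.
func NewNodes() *Nodes {
	return &Nodes{
		ttls:    map[string]time.Time{},
		reasons: map[string]*simulator.UnremovableNode{},
	}
}

// Update drops nodes that left the cluster or whose re-check deadline has
// passed, and resets the per-loop reasons (in this sketch it also takes over
// what the deleted clearUnremovableNodeReasons did; the real package may
// organize this differently).
func (n *Nodes) Update(nodeInfos nodeInfoGetter, timestamp time.Time) {
	n.reasons = map[string]*simulator.UnremovableNode{}
	kept := make(map[string]time.Time, len(n.ttls))
	for name, ttl := range n.ttls {
		if _, err := nodeInfos.Get(name); err != nil {
			continue // node is most likely gone from the cluster
		}
		if ttl.After(timestamp) {
			kept[name] = ttl
		}
	}
	n.ttls = kept
}

// IsRecent reports whether the node was marked unremovable recently enough to
// be skipped in this loop.
func (n *Nodes) IsRecent(name string) bool {
	_, found := n.ttls[name]
	return found
}

// AddTimeout records an unremovable node together with the time at which it
// should be re-checked.
func (n *Nodes) AddTimeout(node *simulator.UnremovableNode, timeout time.Time) {
	n.ttls[node.Node.Name] = timeout
	n.Add(node)
}

// Add records an unremovable node with its full reason struct.
func (n *Nodes) Add(node *simulator.UnremovableNode) {
	n.reasons[node.Node.Name] = node
}

// AddReason records an unremovable node given just the node and a reason.
func (n *Nodes) AddReason(node *apiv1.Node, reason simulator.UnremovableReason) {
	n.Add(&simulator.UnremovableNode{Node: node, Reason: reason})
}

// HasReason reports whether a reason was already recorded for the node.
func (n *Nodes) HasReason(name string) bool {
	_, found := n.reasons[name]
	return found
}

// AsList returns all nodes with a recorded reason.
func (n *Nodes) AsList() []*simulator.UnremovableNode {
	ns := make([]*simulator.UnremovableNode, 0, len(n.reasons))
	for _, node := range n.reasons {
		ns = append(ns, node)
	}
	return ns
}
```

With a shape like this, the single Update call replaces the hand-rolled TTL pruning and reason clearing that the PR deletes from legacy.go below.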
cluster-autoscaler/core/scaledown/legacy/legacy.go (134 changes: 41 additions & 93 deletions)
@@ -28,6 +28,7 @@ import (
"k8s.io/autoscaler/cluster-autoscaler/clusterstate"
"k8s.io/autoscaler/cluster-autoscaler/context"
"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/deletiontracker"
"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/unremovable"
core_utils "k8s.io/autoscaler/cluster-autoscaler/core/utils"
"k8s.io/autoscaler/cluster-autoscaler/metrics"
"k8s.io/autoscaler/cluster-autoscaler/processors"
@@ -268,37 +269,35 @@ func (limits *scaleDownResourcesLimits) tryDecrementLimitsByDelta(delta scaleDow

// ScaleDown is responsible for maintaining the state needed to perform unneeded node removals.
type ScaleDown struct {
context *context.AutoscalingContext
processors *processors.AutoscalingProcessors
clusterStateRegistry *clusterstate.ClusterStateRegistry
unneededNodes map[string]time.Time
unneededNodesList []*apiv1.Node
unremovableNodes map[string]time.Time
podLocationHints map[string]string
nodeUtilizationMap map[string]utilization.Info
usageTracker *simulator.UsageTracker
nodeDeletionTracker *deletiontracker.NodeDeletionTracker
unremovableNodeReasons map[string]*simulator.UnremovableNode
removalSimulator *simulator.RemovalSimulator
context *context.AutoscalingContext
processors *processors.AutoscalingProcessors
clusterStateRegistry *clusterstate.ClusterStateRegistry
unneededNodes map[string]time.Time
unneededNodesList []*apiv1.Node
unremovableNodes *unremovable.Nodes
podLocationHints map[string]string
nodeUtilizationMap map[string]utilization.Info
usageTracker *simulator.UsageTracker
nodeDeletionTracker *deletiontracker.NodeDeletionTracker
removalSimulator *simulator.RemovalSimulator
}

// NewScaleDown builds new ScaleDown object.
func NewScaleDown(context *context.AutoscalingContext, processors *processors.AutoscalingProcessors, clusterStateRegistry *clusterstate.ClusterStateRegistry) *ScaleDown {
usageTracker := simulator.NewUsageTracker()
removalSimulator := simulator.NewRemovalSimulator(context.ListerRegistry, context.ClusterSnapshot, context.PredicateChecker, usageTracker)
return &ScaleDown{
context: context,
processors: processors,
clusterStateRegistry: clusterStateRegistry,
unneededNodes: make(map[string]time.Time),
unremovableNodes: make(map[string]time.Time),
podLocationHints: make(map[string]string),
nodeUtilizationMap: make(map[string]utilization.Info),
usageTracker: usageTracker,
unneededNodesList: make([]*apiv1.Node, 0),
nodeDeletionTracker: deletiontracker.NewNodeDeletionTracker(0 * time.Second),
unremovableNodeReasons: make(map[string]*simulator.UnremovableNode),
removalSimulator: removalSimulator,
context: context,
processors: processors,
clusterStateRegistry: clusterStateRegistry,
unneededNodes: make(map[string]time.Time),
unremovableNodes: unremovable.NewNodes(),
podLocationHints: make(map[string]string),
nodeUtilizationMap: make(map[string]utilization.Info),
usageTracker: usageTracker,
unneededNodesList: make([]*apiv1.Node, 0),
nodeDeletionTracker: deletiontracker.NewNodeDeletionTracker(0 * time.Second),
removalSimulator: removalSimulator,
}
}

@@ -307,7 +306,6 @@ func (sd *ScaleDown) CleanUp(timestamp time.Time) {
// Use default ScaleDownUnneededTime as in this context the value
// doesn't apply to any specific NodeGroup.
sd.usageTracker.CleanUp(timestamp.Add(-sd.context.NodeGroupDefaults.ScaleDownUnneededTime))
sd.clearUnremovableNodeReasons()
}

// CleanUpUnneededNodes clears the list of unneeded nodes.
@@ -323,7 +321,7 @@ func (sd *ScaleDown) UnneededNodes() []*apiv1.Node {

func (sd *ScaleDown) checkNodeUtilization(timestamp time.Time, node *apiv1.Node, nodeInfo *schedulerframework.NodeInfo) (simulator.UnremovableReason, *utilization.Info) {
// Skip nodes that were recently checked.
if _, found := sd.unremovableNodes[node.Name]; found {
if sd.unremovableNodes.IsRecent(node.Name) {
return simulator.RecentlyUnremovable, nil
}

@@ -394,7 +392,7 @@ func (sd *ScaleDown) UpdateUnneededNodes(
return errors.ToAutoscalerError(errors.InternalError, err)
}

sd.updateUnremovableNodes(timestamp)
sd.unremovableNodes.Update(sd.context.ClusterSnapshot.NodeInfos(), timestamp)

skipped := 0
utilizationMap := make(map[string]utilization.Info)
@@ -406,7 +404,7 @@
nodeInfo, err := sd.context.ClusterSnapshot.NodeInfos().Get(node.Name)
if err != nil {
klog.Errorf("Can't retrieve scale-down candidate %s from snapshot, err: %v", node.Name, err)
sd.addUnremovableNodeReason(node, simulator.UnexpectedError)
sd.unremovableNodes.AddReason(node, simulator.UnexpectedError)
continue
}

@@ -420,7 +418,7 @@
skipped++
}

sd.addUnremovableNodeReason(node, reason)
sd.unremovableNodes.AddReason(node, reason)
continue
}

@@ -520,18 +518,17 @@ func (sd *ScaleDown) UpdateUnneededNodes(
if len(unremovable) > 0 {
unremovableTimeout := timestamp.Add(sd.context.AutoscalingOptions.UnremovableNodeRecheckTimeout)
for _, unremovableNode := range unremovable {
sd.unremovableNodes[unremovableNode.Node.Name] = unremovableTimeout
sd.addUnremovableNode(unremovableNode)
sd.unremovableNodes.AddTimeout(unremovableNode, unremovableTimeout)
}
klog.V(1).Infof("%v nodes found to be unremovable in simulation, will re-check them at %v", len(unremovable), unremovableTimeout)
}

// This method won't always check all nodes, so let's give a generic reason for all nodes that weren't checked.
for _, node := range scaleDownCandidates {
_, unremovableReasonProvided := sd.unremovableNodeReasons[node.Name]
unremovableReasonProvided := sd.unremovableNodes.HasReason(node.Name)
_, unneeded := result[node.Name]
if !unneeded && !unremovableReasonProvided {
sd.addUnremovableNodeReason(node, simulator.NotUnneededOtherReason)
sd.unremovableNodes.AddReason(node, simulator.NotUnneededOtherReason)
}
}

@@ -576,59 +573,10 @@ func (sd *ScaleDown) isNodeBelowUtilizationThreshold(node *apiv1.Node, nodeGroup
return true, nil
}

// updateUnremovableNodes updates unremovableNodes map according to current
// state of the cluster. Removes from the map nodes that are no longer in the
// nodes list.
func (sd *ScaleDown) updateUnremovableNodes(timestamp time.Time) {
if len(sd.unremovableNodes) <= 0 {
return
}
newUnremovableNodes := make(map[string]time.Time, len(sd.unremovableNodes))
for oldUnremovable, ttl := range sd.unremovableNodes {
if _, err := sd.context.ClusterSnapshot.NodeInfos().Get(oldUnremovable); err != nil {
// Not logging on error level as most likely cause is that node is no longer in the cluster.
klog.Infof("Can't retrieve node %s from snapshot, removing from unremovable map, err: %v", oldUnremovable, err)
continue
}
if ttl.After(timestamp) {
// Keep nodes that are still in the cluster and haven't expired yet.
newUnremovableNodes[oldUnremovable] = ttl
}
}
sd.unremovableNodes = newUnremovableNodes
}

func (sd *ScaleDown) clearUnremovableNodeReasons() {
sd.unremovableNodeReasons = make(map[string]*simulator.UnremovableNode)
}

func (sd *ScaleDown) addUnremovableNodeReason(node *apiv1.Node, reason simulator.UnremovableReason) {
sd.unremovableNodeReasons[node.Name] = &simulator.UnremovableNode{Node: node, Reason: reason, BlockingPod: nil}
}

func (sd *ScaleDown) addUnremovableNode(unremovableNode *simulator.UnremovableNode) {
sd.unremovableNodeReasons[unremovableNode.Node.Name] = unremovableNode
}

// UnremovableNodesCount returns a map of unremovable node counts per reason.
func (sd *ScaleDown) UnremovableNodesCount() map[simulator.UnremovableReason]int {
reasons := make(map[simulator.UnremovableReason]int)

for _, node := range sd.unremovableNodeReasons {
reasons[node.Reason]++
}

return reasons
}

// UnremovableNodes returns a list of nodes that cannot be removed according to
// the scale down algorithm.
func (sd *ScaleDown) UnremovableNodes() []*simulator.UnremovableNode {
ns := make([]*simulator.UnremovableNode, 0, len(sd.unremovableNodeReasons))
for _, n := range sd.unremovableNodeReasons {
ns = append(ns, n)
}
return ns
return sd.unremovableNodes.AsList()
}
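
For illustration, here is a short usage sketch of the re-check timeout behaviour that the deleted updateUnremovableNodes and addUnremovableNode helpers used to implement inline. It assumes the hypothetical API sketched near the top of this page, including the minimal lister interface accepted by Update; the real package may instead require the full NodeInfoLister returned by ClusterSnapshot.NodeInfos(), and the fakeLister type here is purely illustrative.

```go
// Usage sketch under the same assumptions as the API sketch above; not code
// from this PR. fakeLister stands in for context.ClusterSnapshot.NodeInfos().
package main

import (
	"fmt"
	"time"

	apiv1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

	"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/unremovable"
	"k8s.io/autoscaler/cluster-autoscaler/simulator"
	schedulerframework "k8s.io/kubernetes/pkg/scheduler/framework"
)

type fakeLister struct{ infos map[string]*schedulerframework.NodeInfo }

func (f fakeLister) Get(name string) (*schedulerframework.NodeInfo, error) {
	if ni, ok := f.infos[name]; ok {
		return ni, nil
	}
	return nil, fmt.Errorf("node %s not found", name)
}

func main() {
	now := time.Now()
	n1 := &apiv1.Node{ObjectMeta: metav1.ObjectMeta{Name: "n1"}}
	ni := schedulerframework.NewNodeInfo()
	ni.SetNode(n1)
	lister := fakeLister{infos: map[string]*schedulerframework.NodeInfo{"n1": ni}}

	nodes := unremovable.NewNodes()
	// Mark n1 unremovable and ask for a re-check in 5 minutes.
	nodes.AddTimeout(&simulator.UnremovableNode{Node: n1, Reason: simulator.RecentlyUnremovable}, now.Add(5*time.Minute))

	nodes.Update(lister, now)                     // before the deadline: entry is kept
	fmt.Println(nodes.IsRecent("n1"))             // true
	nodes.Update(lister, now.Add(10*time.Minute)) // after the deadline: entry dropped
	fmt.Println(nodes.IsRecent("n1"))             // false
}
```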

// markSimulationError indicates a simulation error by clearing relevant scale
@@ -735,7 +683,7 @@ func (sd *ScaleDown) TryToScaleDown(
// Check if node is marked with no scale down annotation.
if hasNoScaleDownAnnotation(node) {
klog.V(4).Infof("Skipping %s - scale down disabled annotation found", node.Name)
sd.addUnremovableNodeReason(node, simulator.ScaleDownDisabledAnnotation)
sd.unremovableNodes.AddReason(node, simulator.ScaleDownDisabledAnnotation)
continue
}

@@ -745,12 +693,12 @@
nodeGroup, err := sd.context.CloudProvider.NodeGroupForNode(node)
if err != nil {
klog.Errorf("Error while checking node group for %s: %v", node.Name, err)
sd.addUnremovableNodeReason(node, simulator.UnexpectedError)
sd.unremovableNodes.AddReason(node, simulator.UnexpectedError)
continue
}
if nodeGroup == nil || reflect.ValueOf(nodeGroup).IsNil() {
klog.V(4).Infof("Skipping %s - no node group config", node.Name)
sd.addUnremovableNodeReason(node, simulator.NotAutoscaled)
sd.unremovableNodes.AddReason(node, simulator.NotAutoscaled)
continue
}

@@ -762,7 +710,7 @@
continue
}
if !unneededSince.Add(unneededTime).Before(currentTime) {
sd.addUnremovableNodeReason(node, simulator.NotUnneededLongEnough)
sd.unremovableNodes.AddReason(node, simulator.NotUnneededLongEnough)
continue
}
} else {
@@ -773,36 +721,36 @@
continue
}
if !unneededSince.Add(unreadyTime).Before(currentTime) {
sd.addUnremovableNodeReason(node, simulator.NotUnreadyLongEnough)
sd.unremovableNodes.AddReason(node, simulator.NotUnreadyLongEnough)
continue
}
}

size, found := nodeGroupSize[nodeGroup.Id()]
if !found {
klog.Errorf("Error while checking node group size %s: group size not found in cache", nodeGroup.Id())
sd.addUnremovableNodeReason(node, simulator.UnexpectedError)
sd.unremovableNodes.AddReason(node, simulator.UnexpectedError)
continue
}

deletionsInProgress := sd.nodeDeletionTracker.DeletionsCount(nodeGroup.Id())
if size-deletionsInProgress <= nodeGroup.MinSize() {
klog.V(1).Infof("Skipping %s - node group min size reached", node.Name)
sd.addUnremovableNodeReason(node, simulator.NodeGroupMinSizeReached)
sd.unremovableNodes.AddReason(node, simulator.NodeGroupMinSizeReached)
continue
}

scaleDownResourcesDelta, err := sd.computeScaleDownResourcesDelta(sd.context.CloudProvider, node, nodeGroup, resourcesWithLimits)
if err != nil {
klog.Errorf("Error getting node resources: %v", err)
sd.addUnremovableNodeReason(node, simulator.UnexpectedError)
sd.unremovableNodes.AddReason(node, simulator.UnexpectedError)
continue
}

checkResult := scaleDownResourcesLeft.checkScaleDownDeltaWithinLimits(scaleDownResourcesDelta)
if checkResult.exceeded {
klog.V(4).Infof("Skipping %s - minimal limit exceeded for %v", node.Name, checkResult.exceededResources)
sd.addUnremovableNodeReason(node, simulator.MinimalResourceLimitExceeded)
sd.unremovableNodes.AddReason(node, simulator.MinimalResourceLimitExceeded)
continue
}

@@ -851,7 +799,7 @@ func (sd *ScaleDown) TryToScaleDown(
findNodesToRemoveDuration = time.Now().Sub(findNodesToRemoveStart)

for _, unremovableNode := range unremovable {
sd.addUnremovableNode(unremovableNode)
sd.unremovableNodes.Add(unremovableNode)
}

if err != nil {
cluster-autoscaler/core/scaledown/legacy/legacy_test.go (11 changes: 5 additions & 6 deletions)
@@ -39,6 +39,7 @@ import (
"k8s.io/autoscaler/cluster-autoscaler/config"
"k8s.io/autoscaler/cluster-autoscaler/context"
"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/deletiontracker"
"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/unremovable"
. "k8s.io/autoscaler/cluster-autoscaler/core/test"
"k8s.io/autoscaler/cluster-autoscaler/core/utils"
"k8s.io/autoscaler/cluster-autoscaler/utils/daemonset"
@@ -169,15 +170,13 @@ func TestFindUnneededNodes(t *testing.T) {
assert.False(t, found, n)
}

sd.unremovableNodes = make(map[string]time.Time)
sd.unremovableNodes = unremovable.NewNodes()
sd.unneededNodes["n1"] = time.Now()
allNodes = []*apiv1.Node{n1, n2, n3, n4}
simulator.InitializeClusterSnapshotOrDie(t, context.ClusterSnapshot, allNodes, []*apiv1.Pod{p1, p2, p3, p4})
autoscalererr = sd.UpdateUnneededNodes(allNodes, allNodes, time.Now(), nil)
assert.NoError(t, autoscalererr)

sd.unremovableNodes = make(map[string]time.Time)

assert.Equal(t, 1, len(sd.unneededNodes))
addTime2, found := sd.unneededNodes["n2"]
assert.True(t, found)
@@ -191,7 +190,7 @@
assert.False(t, found, n)
}

sd.unremovableNodes = make(map[string]time.Time)
sd.unremovableNodes = unremovable.NewNodes()
scaleDownCandidates := []*apiv1.Node{n1, n3, n4}
simulator.InitializeClusterSnapshotOrDie(t, context.ClusterSnapshot, allNodes, []*apiv1.Pod{p1, p2, p3, p4})
autoscalererr = sd.UpdateUnneededNodes(allNodes, scaleDownCandidates, time.Now(), nil)
@@ -207,7 +206,7 @@

assert.Equal(t, 0, len(sd.unneededNodes))
// Verify that no other nodes are in unremovable map.
assert.Equal(t, 1, len(sd.unremovableNodes))
assert.Equal(t, 1, len(sd.unremovableNodes.AsList()))

// But it should be checked after timeout
simulator.InitializeClusterSnapshotOrDie(t, context.ClusterSnapshot, allNodes, []*apiv1.Pod{})
@@ -216,7 +215,7 @@

assert.Equal(t, 1, len(sd.unneededNodes))
// Verify that nodes that are no longer unremovable are removed.
assert.Equal(t, 0, len(sd.unremovableNodes))
assert.Equal(t, 0, len(sd.unremovableNodes.AsList()))
}

func TestFindUnneededGPUNodes(t *testing.T) {