Skip to content

Commit

Permalink
Merge pull request kubernetes#4864 from x13n/scaledown
Browse files Browse the repository at this point in the history
Allow checking specific nodes removal in simulator
  • Loading branch information
k8s-ci-robot authored May 9, 2022
2 parents cff6cde + 07a8d38 commit f92e753
Show file tree
Hide file tree
Showing 3 changed files with 101 additions and 90 deletions.
26 changes: 9 additions & 17 deletions cluster-autoscaler/core/scaledown/legacy/legacy.go
Original file line number Diff line number Diff line change
Expand Up @@ -279,10 +279,13 @@ type ScaleDown struct {
usageTracker *simulator.UsageTracker
nodeDeletionTracker *deletiontracker.NodeDeletionTracker
unremovableNodeReasons map[string]*simulator.UnremovableNode
removalSimulator *simulator.RemovalSimulator
}

// NewScaleDown builds new ScaleDown object.
func NewScaleDown(context *context.AutoscalingContext, processors *processors.AutoscalingProcessors, clusterStateRegistry *clusterstate.ClusterStateRegistry) *ScaleDown {
usageTracker := simulator.NewUsageTracker()
removalSimulator := simulator.NewRemovalSimulator(context.ListerRegistry, context.ClusterSnapshot, context.PredicateChecker, usageTracker)
return &ScaleDown{
context: context,
processors: processors,
Expand All @@ -291,10 +294,11 @@ func NewScaleDown(context *context.AutoscalingContext, processors *processors.Au
unremovableNodes: make(map[string]time.Time),
podLocationHints: make(map[string]string),
nodeUtilizationMap: make(map[string]utilization.Info),
usageTracker: simulator.NewUsageTracker(),
usageTracker: usageTracker,
unneededNodesList: make([]*apiv1.Node, 0),
nodeDeletionTracker: deletiontracker.NewNodeDeletionTracker(0 * time.Second),
unremovableNodeReasons: make(map[string]*simulator.UnremovableNode),
removalSimulator: removalSimulator,
}
}

Expand Down Expand Up @@ -450,14 +454,10 @@ func (sd *ScaleDown) UpdateUnneededNodes(
}

// Look for nodes to remove in the current candidates
nodesToRemove, unremovable, newHints, simulatorErr := simulator.FindNodesToRemove(
nodesToRemove, unremovable, newHints, simulatorErr := sd.removalSimulator.FindNodesToRemove(
currentCandidates,
destinations,
sd.context.ListerRegistry,
sd.context.ClusterSnapshot,
sd.context.PredicateChecker,
sd.podLocationHints,
sd.usageTracker,
timestamp,
pdbs)
if simulatorErr != nil {
Expand All @@ -480,14 +480,10 @@ func (sd *ScaleDown) UpdateUnneededNodes(
// Look for additional nodes to remove among the rest of nodes.
klog.V(3).Infof("Finding additional %v candidates for scale down.", additionalCandidatesCount)
additionalNodesToRemove, additionalUnremovable, additionalNewHints, simulatorErr :=
simulator.FindNodesToRemove(
sd.removalSimulator.FindNodesToRemove(
currentNonCandidates[:additionalCandidatesPoolSize],
destinations,
sd.context.ListerRegistry,
sd.context.ClusterSnapshot,
sd.context.PredicateChecker,
sd.podLocationHints,
sd.usageTracker,
timestamp,
pdbs)
if simulatorErr != nil {
Expand Down Expand Up @@ -846,14 +842,10 @@ func (sd *ScaleDown) TryToScaleDown(
findNodesToRemoveStart := time.Now()

// We look for only 1 node so new hints may be incomplete.
nodesToRemove, unremovable, _, err := simulator.FindNodesToRemove(
nodesToRemove, unremovable, _, err := sd.removalSimulator.FindNodesToRemove(
candidateNames,
nodesWithoutMasterNames,
sd.context.ListerRegistry,
sd.context.ClusterSnapshot,
sd.context.PredicateChecker,
sd.podLocationHints,
sd.usageTracker,
time.Now(),
pdbs)
findNodesToRemoveDuration = time.Now().Sub(findNodesToRemoveStart)
Expand Down Expand Up @@ -935,7 +927,7 @@ func (sd *ScaleDown) getEmptyNodesToRemoveNoResourceLimits(candidates []string,
func (sd *ScaleDown) getEmptyNodesToRemove(candidates []string, resourcesLimits scaleDownResourcesLimits,
timestamp time.Time) []simulator.NodeToBeRemoved {

emptyNodes := simulator.FindEmptyNodesToRemove(sd.context.ClusterSnapshot, candidates, timestamp)
emptyNodes := sd.removalSimulator.FindEmptyNodesToRemove(candidates, timestamp)
availabilityMap := make(map[string]int)
nodesToRemove := make([]simulator.NodeToBeRemoved, 0)
resourcesLimitsCopy := copyScaleDownResourcesLimits(resourcesLimits) // we do not want to modify input parameter
Expand Down
135 changes: 81 additions & 54 deletions cluster-autoscaler/simulator/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,18 +95,32 @@ const (
UnexpectedError
)

// FindNodesToRemove finds nodes that can be removed. Returns also an information about good
// rescheduling location for each of the pods.
func FindNodesToRemove(
// RemovalSimulator is a helper object for simulating node removal scenarios.
type RemovalSimulator struct {
	// listers gives access to cluster listers; passed through to
	// DetailedGetPodsForMove when deciding which pods can be moved off a
	// node. May be nil in tests that never reach that call.
	listers kube_util.ListerRegistry
	// clusterSnapshot is the cluster state the simulation runs against;
	// it is Fork()ed and Revert()ed around each placement simulation.
	clusterSnapshot ClusterSnapshot
	// predicateChecker verifies whether an evicted pod fits on a
	// candidate destination node (CheckPredicates / FitsAnyNodeMatching).
	predicateChecker PredicateChecker
	// usageTracker records, per removed node, which destination nodes
	// received its pods (RegisterUsage).
	usageTracker *UsageTracker
}

// NewRemovalSimulator returns a new RemovalSimulator wired to the given
// listers, cluster snapshot, predicate checker and usage tracker.
func NewRemovalSimulator(listers kube_util.ListerRegistry, clusterSnapshot ClusterSnapshot, predicateChecker PredicateChecker, usageTracker *UsageTracker) *RemovalSimulator {
	var sim RemovalSimulator
	sim.listers = listers
	sim.clusterSnapshot = clusterSnapshot
	sim.predicateChecker = predicateChecker
	sim.usageTracker = usageTracker
	return &sim
}

// FindNodesToRemove finds nodes that can be removed. Returns also an
// information about good rescheduling location for each of the pods.
func (r *RemovalSimulator) FindNodesToRemove(
candidates []string,
destinations []string,
listers kube_util.ListerRegistry,
clusterSnapshot ClusterSnapshot,
predicateChecker PredicateChecker,
oldHints map[string]string,
usageTracker *UsageTracker,
timestamp time.Time,
podDisruptionBudgets []*policyv1.PodDisruptionBudget,
pdbs []*policyv1.PodDisruptionBudget,
) (nodesToRemove []NodeToBeRemoved, unremovableNodes []*UnremovableNode, podReschedulingHints map[string]string, finalError errors.AutoscalerError) {
result := make([]NodeToBeRemoved, 0)
unremovable := make([]*UnremovableNode, 0)
Expand All @@ -118,52 +132,66 @@ func FindNodesToRemove(
}

for _, nodeName := range candidates {
nodeInfo, err := clusterSnapshot.NodeInfos().Get(nodeName)
if err != nil {
klog.Errorf("Can't retrieve node %s from snapshot, err: %v", nodeName, err)
rn, urn := r.CheckNodeRemoval(nodeName, destinationMap, oldHints, newHints, timestamp, pdbs)
if rn != nil {
result = append(result, *rn)
} else if urn != nil {
unremovable = append(unremovable, urn)
}
klog.V(2).Infof("%s for removal", nodeName)
}
return result, unremovable, newHints, nil
}

if _, found := destinationMap[nodeName]; !found {
klog.V(2).Infof("nodeInfo for %s not found", nodeName)
unremovable = append(unremovable, &UnremovableNode{Node: nodeInfo.Node(), Reason: UnexpectedError})
continue
}
// CheckNodeRemoval checks whether a specific node can be removed. Depending on
// the outcome, exactly one of (NodeToBeRemoved, UnremovableNode) will be
// populated in the return value, the other will be nil.
func (r *RemovalSimulator) CheckNodeRemoval(
nodeName string,
destinationMap map[string]bool,
oldHints map[string]string,
newHints map[string]string,
timestamp time.Time,
pdbs []*policyv1.PodDisruptionBudget,
) (*NodeToBeRemoved, *UnremovableNode) {
nodeInfo, err := r.clusterSnapshot.NodeInfos().Get(nodeName)
if err != nil {
klog.Errorf("Can't retrieve node %s from snapshot, err: %v", nodeName, err)
}
klog.V(2).Infof("%s for removal", nodeName)

podsToRemove, daemonSetPods, blockingPod, err := DetailedGetPodsForMove(nodeInfo, *skipNodesWithSystemPods,
*skipNodesWithLocalStorage, listers, int32(*minReplicaCount), podDisruptionBudgets, timestamp)
if err != nil {
klog.V(2).Infof("node %s cannot be removed: %v", nodeName, err)
if blockingPod != nil {
unremovable = append(unremovable, &UnremovableNode{Node: nodeInfo.Node(), Reason: BlockedByPod, BlockingPod: blockingPod})
} else {
unremovable = append(unremovable, &UnremovableNode{Node: nodeInfo.Node(), Reason: UnexpectedError})
}
continue
}
if _, found := destinationMap[nodeName]; !found {
klog.V(2).Infof("nodeInfo for %s not found", nodeName)
return nil, &UnremovableNode{Node: nodeInfo.Node(), Reason: UnexpectedError}
}

findProblems := findPlaceFor(nodeName, podsToRemove, destinationMap, clusterSnapshot,
predicateChecker, oldHints, newHints, usageTracker, timestamp)
if findProblems == nil {
result = append(result, NodeToBeRemoved{
Node: nodeInfo.Node(),
PodsToReschedule: podsToRemove,
DaemonSetPods: daemonSetPods,
})
klog.V(2).Infof("node %s may be removed", nodeName)
} else {
klog.V(2).Infof("node %s is not suitable for removal: %v", nodeName, findProblems)
unremovable = append(unremovable, &UnremovableNode{Node: nodeInfo.Node(), Reason: NoPlaceToMovePods})
podsToRemove, daemonSetPods, blockingPod, err := DetailedGetPodsForMove(nodeInfo, *skipNodesWithSystemPods,
*skipNodesWithLocalStorage, r.listers, int32(*minReplicaCount), pdbs, timestamp)
if err != nil {
klog.V(2).Infof("node %s cannot be removed: %v", nodeName, err)
if blockingPod != nil {
return nil, &UnremovableNode{Node: nodeInfo.Node(), Reason: BlockedByPod, BlockingPod: blockingPod}
}
return nil, &UnremovableNode{Node: nodeInfo.Node(), Reason: UnexpectedError}
}
return result, unremovable, newHints, nil

err = r.findPlaceFor(nodeName, podsToRemove, destinationMap, oldHints, newHints, timestamp)
if err != nil {
klog.V(2).Infof("node %s is not suitable for removal: %v", nodeName, err)
return nil, &UnremovableNode{Node: nodeInfo.Node(), Reason: NoPlaceToMovePods}
}
klog.V(2).Infof("node %s may be removed", nodeName)
return &NodeToBeRemoved{
Node: nodeInfo.Node(),
PodsToReschedule: podsToRemove,
DaemonSetPods: daemonSetPods,
}, nil
}

// FindEmptyNodesToRemove finds empty nodes that can be removed.
func FindEmptyNodesToRemove(snapshot ClusterSnapshot, candidates []string, timestamp time.Time) []string {
func (r *RemovalSimulator) FindEmptyNodesToRemove(candidates []string, timestamp time.Time) []string {
result := make([]string, 0)
for _, node := range candidates {
nodeInfo, err := snapshot.NodeInfos().Get(node)
nodeInfo, err := r.clusterSnapshot.NodeInfos().Get(node)
if err != nil {
klog.Errorf("Can't retrieve node %s from snapshot, err: %v", node, err)
continue
Expand All @@ -177,15 +205,14 @@ func FindEmptyNodesToRemove(snapshot ClusterSnapshot, candidates []string, times
return result
}

func findPlaceFor(removedNode string, pods []*apiv1.Pod, nodes map[string]bool,
clusterSnapshot ClusterSnapshot, predicateChecker PredicateChecker, oldHints map[string]string, newHints map[string]string, usageTracker *UsageTracker,
timestamp time.Time) error {
func (r *RemovalSimulator) findPlaceFor(removedNode string, pods []*apiv1.Pod, nodes map[string]bool,
oldHints map[string]string, newHints map[string]string, timestamp time.Time) error {

if err := clusterSnapshot.Fork(); err != nil {
if err := r.clusterSnapshot.Fork(); err != nil {
return err
}
defer func() {
err := clusterSnapshot.Revert()
err := r.clusterSnapshot.Revert()
if err != nil {
klog.Fatalf("Got error when calling ClusterSnapshot.Revert(); %v", err)
}
Expand All @@ -203,7 +230,7 @@ func findPlaceFor(removedNode string, pods []*apiv1.Pod, nodes map[string]bool,

// remove pods from clusterSnapshot first
for _, pod := range pods {
if err := clusterSnapshot.RemovePod(pod.Namespace, pod.Name, removedNode); err != nil {
if err := r.clusterSnapshot.RemovePod(pod.Namespace, pod.Name, removedNode); err != nil {
// just log error
klog.Errorf("Simulating removal of %s/%s return error; %v", pod.Namespace, pod.Name, err)
}
Expand All @@ -220,9 +247,9 @@ func findPlaceFor(removedNode string, pods []*apiv1.Pod, nodes map[string]bool,
klog.V(5).Infof("Looking for place for %s/%s", pod.Namespace, pod.Name)

if hintedNode, hasHint := oldHints[podKey(pod)]; hasHint && isCandidateNode(hintedNode) {
if err := predicateChecker.CheckPredicates(clusterSnapshot, pod, hintedNode); err == nil {
if err := r.predicateChecker.CheckPredicates(r.clusterSnapshot, pod, hintedNode); err == nil {
klog.V(4).Infof("Pod %s/%s can be moved to %s", pod.Namespace, pod.Name, hintedNode)
if err := clusterSnapshot.AddPod(pod, hintedNode); err != nil {
if err := r.clusterSnapshot.AddPod(pod, hintedNode); err != nil {
return fmt.Errorf("Simulating scheduling of %s/%s to %s return error; %v", pod.Namespace, pod.Name, hintedNode, err)
}
newHints[podKey(pod)] = hintedNode
Expand All @@ -232,12 +259,12 @@ func findPlaceFor(removedNode string, pods []*apiv1.Pod, nodes map[string]bool,
}

if !foundPlace {
newNodeName, err := predicateChecker.FitsAnyNodeMatching(clusterSnapshot, pod, func(nodeInfo *schedulerframework.NodeInfo) bool {
newNodeName, err := r.predicateChecker.FitsAnyNodeMatching(r.clusterSnapshot, pod, func(nodeInfo *schedulerframework.NodeInfo) bool {
return isCandidateNode(nodeInfo.Node().Name)
})
if err == nil {
klog.V(4).Infof("Pod %s/%s can be moved to %s", pod.Namespace, pod.Name, newNodeName)
if err := clusterSnapshot.AddPod(pod, newNodeName); err != nil {
if err := r.clusterSnapshot.AddPod(pod, newNodeName); err != nil {
return fmt.Errorf("Simulating scheduling of %s/%s to %s return error; %v", pod.Namespace, pod.Name, newNodeName, err)
}
newHints[podKey(pod)] = newNodeName
Expand All @@ -247,7 +274,7 @@ func findPlaceFor(removedNode string, pods []*apiv1.Pod, nodes map[string]bool,
}
}

usageTracker.RegisterUsage(removedNode, targetNode, timestamp)
r.usageTracker.RegisterUsage(removedNode, targetNode, timestamp)
}
return nil
}
30 changes: 11 additions & 19 deletions cluster-autoscaler/simulator/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,22 +51,18 @@ func TestFindPlaceAllOk(t *testing.T) {
"n1": true,
"n2": true,
}
tracker := NewUsageTracker()

clusterSnapshot := NewBasicClusterSnapshot()
predicateChecker, err := NewTestPredicateChecker()
assert.NoError(t, err)
InitializeClusterSnapshotOrDie(t, clusterSnapshot,
[]*apiv1.Node{node1, node2},
[]*apiv1.Pod{pod1})

err = findPlaceFor(
err = NewRemovalSimulator(nil, clusterSnapshot, predicateChecker, NewUsageTracker()).findPlaceFor(
"x",
[]*apiv1.Pod{new1, new2},
destinations,
clusterSnapshot,
predicateChecker,
oldHints, newHints, tracker, time.Now())
oldHints, newHints, time.Now())

assert.Len(t, newHints, 2)
assert.Contains(t, newHints, new1.Namespace+"/"+new1.Name)
Expand All @@ -93,21 +89,18 @@ func TestFindPlaceAllBas(t *testing.T) {
"n1": true,
"n2": true,
}
tracker := NewUsageTracker()

clusterSnapshot := NewBasicClusterSnapshot()
predicateChecker, err := NewTestPredicateChecker()
assert.NoError(t, err)
InitializeClusterSnapshotOrDie(t, clusterSnapshot,
[]*apiv1.Node{node1, node2},
[]*apiv1.Pod{pod1})

err = findPlaceFor(
err = NewRemovalSimulator(nil, clusterSnapshot, predicateChecker, NewUsageTracker()).findPlaceFor(
"nbad",
[]*apiv1.Pod{new1, new2, new3},
destinations,
clusterSnapshot, predicateChecker,
oldHints, newHints, tracker, time.Now())
oldHints, newHints, time.Now())

assert.Error(t, err)
assert.True(t, len(newHints) == 2)
Expand Down Expand Up @@ -136,14 +129,12 @@ func TestFindNone(t *testing.T) {
[]*apiv1.Node{node1, node2},
[]*apiv1.Pod{pod1})

err = findPlaceFor(
err = NewRemovalSimulator(nil, clusterSnapshot, predicateChecker, NewUsageTracker()).findPlaceFor(
"x",
[]*apiv1.Pod{},
destinations,
clusterSnapshot, predicateChecker,
make(map[string]string),
make(map[string]string),
NewUsageTracker(),
time.Now())
assert.NoError(t, err)
}
Expand Down Expand Up @@ -171,7 +162,8 @@ func TestFindEmptyNodes(t *testing.T) {
clusterSnapshot := NewBasicClusterSnapshot()
InitializeClusterSnapshotOrDie(t, clusterSnapshot, []*apiv1.Node{nodes[0], nodes[1], nodes[2], nodes[3]}, []*apiv1.Pod{pod1, pod2})
testTime := time.Date(2020, time.December, 18, 17, 0, 0, 0, time.UTC)
emptyNodes := FindEmptyNodesToRemove(clusterSnapshot, nodeNames, testTime)
r := NewRemovalSimulator(nil, clusterSnapshot, nil, nil)
emptyNodes := r.FindEmptyNodesToRemove(nodeNames, testTime)
assert.Equal(t, []string{nodeNames[0], nodeNames[2], nodeNames[3]}, emptyNodes)
}

Expand Down Expand Up @@ -317,10 +309,10 @@ func TestFindNodesToRemove(t *testing.T) {
destinations = append(destinations, node.Name)
}
InitializeClusterSnapshotOrDie(t, clusterSnapshot, test.allNodes, test.pods)
toRemove, unremovable, _, err := FindNodesToRemove(
test.candidates, destinations, registry,
clusterSnapshot, predicateChecker, map[string]string{},
tracker, time.Now(), []*policyv1.PodDisruptionBudget{})
r := NewRemovalSimulator(registry, clusterSnapshot, predicateChecker, tracker)
toRemove, unremovable, _, err := r.FindNodesToRemove(
test.candidates, destinations, map[string]string{},
time.Now(), []*policyv1.PodDisruptionBudget{})
assert.NoError(t, err)
fmt.Printf("Test scenario: %s, found len(toRemove)=%v, expected len(test.toRemove)=%v\n", test.name, len(toRemove), len(test.toRemove))
assert.Equal(t, toRemove, test.toRemove)
Expand Down

0 comments on commit f92e753

Please sign in to comment.