Skip to content

Commit

Permalink
Allow checking specific nodes removal in simulator
Browse files Browse the repository at this point in the history
Wraps simulator functions into an object, so that there's no need to
pass around parameters that do not change between CA loop iterations.
  • Loading branch information
x13n committed May 6, 2022
1 parent 9bca83a commit 07a8d38
Show file tree
Hide file tree
Showing 3 changed files with 101 additions and 90 deletions.
26 changes: 9 additions & 17 deletions cluster-autoscaler/core/scaledown/legacy/legacy.go
Original file line number Diff line number Diff line change
Expand Up @@ -279,10 +279,13 @@ type ScaleDown struct {
usageTracker *simulator.UsageTracker
nodeDeletionTracker *deletiontracker.NodeDeletionTracker
unremovableNodeReasons map[string]*simulator.UnremovableNode
removalSimulator *simulator.RemovalSimulator
}

// NewScaleDown builds new ScaleDown object.
func NewScaleDown(context *context.AutoscalingContext, processors *processors.AutoscalingProcessors, clusterStateRegistry *clusterstate.ClusterStateRegistry) *ScaleDown {
usageTracker := simulator.NewUsageTracker()
removalSimulator := simulator.NewRemovalSimulator(context.ListerRegistry, context.ClusterSnapshot, context.PredicateChecker, usageTracker)
return &ScaleDown{
context: context,
processors: processors,
Expand All @@ -291,10 +294,11 @@ func NewScaleDown(context *context.AutoscalingContext, processors *processors.Au
unremovableNodes: make(map[string]time.Time),
podLocationHints: make(map[string]string),
nodeUtilizationMap: make(map[string]utilization.Info),
usageTracker: simulator.NewUsageTracker(),
usageTracker: usageTracker,
unneededNodesList: make([]*apiv1.Node, 0),
nodeDeletionTracker: deletiontracker.NewNodeDeletionTracker(0 * time.Second),
unremovableNodeReasons: make(map[string]*simulator.UnremovableNode),
removalSimulator: removalSimulator,
}
}

Expand Down Expand Up @@ -450,14 +454,10 @@ func (sd *ScaleDown) UpdateUnneededNodes(
}

// Look for nodes to remove in the current candidates
nodesToRemove, unremovable, newHints, simulatorErr := simulator.FindNodesToRemove(
nodesToRemove, unremovable, newHints, simulatorErr := sd.removalSimulator.FindNodesToRemove(
currentCandidates,
destinations,
sd.context.ListerRegistry,
sd.context.ClusterSnapshot,
sd.context.PredicateChecker,
sd.podLocationHints,
sd.usageTracker,
timestamp,
pdbs)
if simulatorErr != nil {
Expand All @@ -480,14 +480,10 @@ func (sd *ScaleDown) UpdateUnneededNodes(
// Look for additional nodes to remove among the rest of nodes.
klog.V(3).Infof("Finding additional %v candidates for scale down.", additionalCandidatesCount)
additionalNodesToRemove, additionalUnremovable, additionalNewHints, simulatorErr :=
simulator.FindNodesToRemove(
sd.removalSimulator.FindNodesToRemove(
currentNonCandidates[:additionalCandidatesPoolSize],
destinations,
sd.context.ListerRegistry,
sd.context.ClusterSnapshot,
sd.context.PredicateChecker,
sd.podLocationHints,
sd.usageTracker,
timestamp,
pdbs)
if simulatorErr != nil {
Expand Down Expand Up @@ -846,14 +842,10 @@ func (sd *ScaleDown) TryToScaleDown(
findNodesToRemoveStart := time.Now()

// We look for only 1 node so new hints may be incomplete.
nodesToRemove, unremovable, _, err := simulator.FindNodesToRemove(
nodesToRemove, unremovable, _, err := sd.removalSimulator.FindNodesToRemove(
candidateNames,
nodesWithoutMasterNames,
sd.context.ListerRegistry,
sd.context.ClusterSnapshot,
sd.context.PredicateChecker,
sd.podLocationHints,
sd.usageTracker,
time.Now(),
pdbs)
findNodesToRemoveDuration = time.Now().Sub(findNodesToRemoveStart)
Expand Down Expand Up @@ -935,7 +927,7 @@ func (sd *ScaleDown) getEmptyNodesToRemoveNoResourceLimits(candidates []string,
func (sd *ScaleDown) getEmptyNodesToRemove(candidates []string, resourcesLimits scaleDownResourcesLimits,
timestamp time.Time) []simulator.NodeToBeRemoved {

emptyNodes := simulator.FindEmptyNodesToRemove(sd.context.ClusterSnapshot, candidates, timestamp)
emptyNodes := sd.removalSimulator.FindEmptyNodesToRemove(candidates, timestamp)
availabilityMap := make(map[string]int)
nodesToRemove := make([]simulator.NodeToBeRemoved, 0)
resourcesLimitsCopy := copyScaleDownResourcesLimits(resourcesLimits) // we do not want to modify input parameter
Expand Down
135 changes: 81 additions & 54 deletions cluster-autoscaler/simulator/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,18 +95,32 @@ const (
UnexpectedError
)

// FindNodesToRemove finds nodes that can be removed. Returns also an information about good
// rescheduling location for each of the pods.
func FindNodesToRemove(
// RemovalSimulator is a helper object for simulating node removal scenarios.
// It bundles the dependencies that stay constant between CA loop iterations,
// so individual simulation calls don't need them passed as parameters.
type RemovalSimulator struct {
// listers gives access to kubernetes listers; used when collecting the
// pods that would have to be moved off a node (DetailedGetPodsForMove).
listers kube_util.ListerRegistry
// clusterSnapshot is the view of cluster state that simulations run
// against; it is forked and reverted around each placement simulation.
clusterSnapshot ClusterSnapshot
// predicateChecker verifies whether a pod can be scheduled onto a
// candidate destination node.
predicateChecker PredicateChecker
// usageTracker records, for each simulated removal, which destination
// nodes received the removed node's pods.
usageTracker *UsageTracker
}

// NewRemovalSimulator returns a new RemovalSimulator wired with the given
// cluster-level dependencies.
func NewRemovalSimulator(listers kube_util.ListerRegistry, clusterSnapshot ClusterSnapshot, predicateChecker PredicateChecker, usageTracker *UsageTracker) *RemovalSimulator {
	sim := RemovalSimulator{}
	sim.listers = listers
	sim.clusterSnapshot = clusterSnapshot
	sim.predicateChecker = predicateChecker
	sim.usageTracker = usageTracker
	return &sim
}

// FindNodesToRemove finds nodes that can be removed. It also returns
// information about a good rescheduling location for each of the pods.
func (r *RemovalSimulator) FindNodesToRemove(
candidates []string,
destinations []string,
listers kube_util.ListerRegistry,
clusterSnapshot ClusterSnapshot,
predicateChecker PredicateChecker,
oldHints map[string]string,
usageTracker *UsageTracker,
timestamp time.Time,
podDisruptionBudgets []*policyv1.PodDisruptionBudget,
pdbs []*policyv1.PodDisruptionBudget,
) (nodesToRemove []NodeToBeRemoved, unremovableNodes []*UnremovableNode, podReschedulingHints map[string]string, finalError errors.AutoscalerError) {
result := make([]NodeToBeRemoved, 0)
unremovable := make([]*UnremovableNode, 0)
Expand All @@ -118,52 +132,66 @@ func FindNodesToRemove(
}

for _, nodeName := range candidates {
nodeInfo, err := clusterSnapshot.NodeInfos().Get(nodeName)
if err != nil {
klog.Errorf("Can't retrieve node %s from snapshot, err: %v", nodeName, err)
rn, urn := r.CheckNodeRemoval(nodeName, destinationMap, oldHints, newHints, timestamp, pdbs)
if rn != nil {
result = append(result, *rn)
} else if urn != nil {
unremovable = append(unremovable, urn)
}
klog.V(2).Infof("%s for removal", nodeName)
}
return result, unremovable, newHints, nil
}

if _, found := destinationMap[nodeName]; !found {
klog.V(2).Infof("nodeInfo for %s not found", nodeName)
unremovable = append(unremovable, &UnremovableNode{Node: nodeInfo.Node(), Reason: UnexpectedError})
continue
}
// CheckNodeRemoval checks whether a specific node can be removed. Depending on
// the outcome, exactly one of (NodeToBeRemoved, UnremovableNode) will be
// populated in the return value, the other will be nil.
func (r *RemovalSimulator) CheckNodeRemoval(
nodeName string,
destinationMap map[string]bool,
oldHints map[string]string,
newHints map[string]string,
timestamp time.Time,
pdbs []*policyv1.PodDisruptionBudget,
) (*NodeToBeRemoved, *UnremovableNode) {
nodeInfo, err := r.clusterSnapshot.NodeInfos().Get(nodeName)
if err != nil {
klog.Errorf("Can't retrieve node %s from snapshot, err: %v", nodeName, err)
}
klog.V(2).Infof("%s for removal", nodeName)

podsToRemove, daemonSetPods, blockingPod, err := DetailedGetPodsForMove(nodeInfo, *skipNodesWithSystemPods,
*skipNodesWithLocalStorage, listers, int32(*minReplicaCount), podDisruptionBudgets, timestamp)
if err != nil {
klog.V(2).Infof("node %s cannot be removed: %v", nodeName, err)
if blockingPod != nil {
unremovable = append(unremovable, &UnremovableNode{Node: nodeInfo.Node(), Reason: BlockedByPod, BlockingPod: blockingPod})
} else {
unremovable = append(unremovable, &UnremovableNode{Node: nodeInfo.Node(), Reason: UnexpectedError})
}
continue
}
if _, found := destinationMap[nodeName]; !found {
klog.V(2).Infof("nodeInfo for %s not found", nodeName)
return nil, &UnremovableNode{Node: nodeInfo.Node(), Reason: UnexpectedError}
}

findProblems := findPlaceFor(nodeName, podsToRemove, destinationMap, clusterSnapshot,
predicateChecker, oldHints, newHints, usageTracker, timestamp)
if findProblems == nil {
result = append(result, NodeToBeRemoved{
Node: nodeInfo.Node(),
PodsToReschedule: podsToRemove,
DaemonSetPods: daemonSetPods,
})
klog.V(2).Infof("node %s may be removed", nodeName)
} else {
klog.V(2).Infof("node %s is not suitable for removal: %v", nodeName, findProblems)
unremovable = append(unremovable, &UnremovableNode{Node: nodeInfo.Node(), Reason: NoPlaceToMovePods})
podsToRemove, daemonSetPods, blockingPod, err := DetailedGetPodsForMove(nodeInfo, *skipNodesWithSystemPods,
*skipNodesWithLocalStorage, r.listers, int32(*minReplicaCount), pdbs, timestamp)
if err != nil {
klog.V(2).Infof("node %s cannot be removed: %v", nodeName, err)
if blockingPod != nil {
return nil, &UnremovableNode{Node: nodeInfo.Node(), Reason: BlockedByPod, BlockingPod: blockingPod}
}
return nil, &UnremovableNode{Node: nodeInfo.Node(), Reason: UnexpectedError}
}
return result, unremovable, newHints, nil

err = r.findPlaceFor(nodeName, podsToRemove, destinationMap, oldHints, newHints, timestamp)
if err != nil {
klog.V(2).Infof("node %s is not suitable for removal: %v", nodeName, err)
return nil, &UnremovableNode{Node: nodeInfo.Node(), Reason: NoPlaceToMovePods}
}
klog.V(2).Infof("node %s may be removed", nodeName)
return &NodeToBeRemoved{
Node: nodeInfo.Node(),
PodsToReschedule: podsToRemove,
DaemonSetPods: daemonSetPods,
}, nil
}

// FindEmptyNodesToRemove finds empty nodes that can be removed.
func FindEmptyNodesToRemove(snapshot ClusterSnapshot, candidates []string, timestamp time.Time) []string {
func (r *RemovalSimulator) FindEmptyNodesToRemove(candidates []string, timestamp time.Time) []string {
result := make([]string, 0)
for _, node := range candidates {
nodeInfo, err := snapshot.NodeInfos().Get(node)
nodeInfo, err := r.clusterSnapshot.NodeInfos().Get(node)
if err != nil {
klog.Errorf("Can't retrieve node %s from snapshot, err: %v", node, err)
continue
Expand All @@ -177,15 +205,14 @@ func FindEmptyNodesToRemove(snapshot ClusterSnapshot, candidates []string, times
return result
}

func findPlaceFor(removedNode string, pods []*apiv1.Pod, nodes map[string]bool,
clusterSnapshot ClusterSnapshot, predicateChecker PredicateChecker, oldHints map[string]string, newHints map[string]string, usageTracker *UsageTracker,
timestamp time.Time) error {
func (r *RemovalSimulator) findPlaceFor(removedNode string, pods []*apiv1.Pod, nodes map[string]bool,
oldHints map[string]string, newHints map[string]string, timestamp time.Time) error {

if err := clusterSnapshot.Fork(); err != nil {
if err := r.clusterSnapshot.Fork(); err != nil {
return err
}
defer func() {
err := clusterSnapshot.Revert()
err := r.clusterSnapshot.Revert()
if err != nil {
klog.Fatalf("Got error when calling ClusterSnapshot.Revert(); %v", err)
}
Expand All @@ -203,7 +230,7 @@ func findPlaceFor(removedNode string, pods []*apiv1.Pod, nodes map[string]bool,

// remove pods from clusterSnapshot first
for _, pod := range pods {
if err := clusterSnapshot.RemovePod(pod.Namespace, pod.Name, removedNode); err != nil {
if err := r.clusterSnapshot.RemovePod(pod.Namespace, pod.Name, removedNode); err != nil {
// just log error
klog.Errorf("Simulating removal of %s/%s return error; %v", pod.Namespace, pod.Name, err)
}
Expand All @@ -220,9 +247,9 @@ func findPlaceFor(removedNode string, pods []*apiv1.Pod, nodes map[string]bool,
klog.V(5).Infof("Looking for place for %s/%s", pod.Namespace, pod.Name)

if hintedNode, hasHint := oldHints[podKey(pod)]; hasHint && isCandidateNode(hintedNode) {
if err := predicateChecker.CheckPredicates(clusterSnapshot, pod, hintedNode); err == nil {
if err := r.predicateChecker.CheckPredicates(r.clusterSnapshot, pod, hintedNode); err == nil {
klog.V(4).Infof("Pod %s/%s can be moved to %s", pod.Namespace, pod.Name, hintedNode)
if err := clusterSnapshot.AddPod(pod, hintedNode); err != nil {
if err := r.clusterSnapshot.AddPod(pod, hintedNode); err != nil {
return fmt.Errorf("Simulating scheduling of %s/%s to %s return error; %v", pod.Namespace, pod.Name, hintedNode, err)
}
newHints[podKey(pod)] = hintedNode
Expand All @@ -232,12 +259,12 @@ func findPlaceFor(removedNode string, pods []*apiv1.Pod, nodes map[string]bool,
}

if !foundPlace {
newNodeName, err := predicateChecker.FitsAnyNodeMatching(clusterSnapshot, pod, func(nodeInfo *schedulerframework.NodeInfo) bool {
newNodeName, err := r.predicateChecker.FitsAnyNodeMatching(r.clusterSnapshot, pod, func(nodeInfo *schedulerframework.NodeInfo) bool {
return isCandidateNode(nodeInfo.Node().Name)
})
if err == nil {
klog.V(4).Infof("Pod %s/%s can be moved to %s", pod.Namespace, pod.Name, newNodeName)
if err := clusterSnapshot.AddPod(pod, newNodeName); err != nil {
if err := r.clusterSnapshot.AddPod(pod, newNodeName); err != nil {
return fmt.Errorf("Simulating scheduling of %s/%s to %s return error; %v", pod.Namespace, pod.Name, newNodeName, err)
}
newHints[podKey(pod)] = newNodeName
Expand All @@ -247,7 +274,7 @@ func findPlaceFor(removedNode string, pods []*apiv1.Pod, nodes map[string]bool,
}
}

usageTracker.RegisterUsage(removedNode, targetNode, timestamp)
r.usageTracker.RegisterUsage(removedNode, targetNode, timestamp)
}
return nil
}
30 changes: 11 additions & 19 deletions cluster-autoscaler/simulator/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,22 +51,18 @@ func TestFindPlaceAllOk(t *testing.T) {
"n1": true,
"n2": true,
}
tracker := NewUsageTracker()

clusterSnapshot := NewBasicClusterSnapshot()
predicateChecker, err := NewTestPredicateChecker()
assert.NoError(t, err)
InitializeClusterSnapshotOrDie(t, clusterSnapshot,
[]*apiv1.Node{node1, node2},
[]*apiv1.Pod{pod1})

err = findPlaceFor(
err = NewRemovalSimulator(nil, clusterSnapshot, predicateChecker, NewUsageTracker()).findPlaceFor(
"x",
[]*apiv1.Pod{new1, new2},
destinations,
clusterSnapshot,
predicateChecker,
oldHints, newHints, tracker, time.Now())
oldHints, newHints, time.Now())

assert.Len(t, newHints, 2)
assert.Contains(t, newHints, new1.Namespace+"/"+new1.Name)
Expand All @@ -93,21 +89,18 @@ func TestFindPlaceAllBas(t *testing.T) {
"n1": true,
"n2": true,
}
tracker := NewUsageTracker()

clusterSnapshot := NewBasicClusterSnapshot()
predicateChecker, err := NewTestPredicateChecker()
assert.NoError(t, err)
InitializeClusterSnapshotOrDie(t, clusterSnapshot,
[]*apiv1.Node{node1, node2},
[]*apiv1.Pod{pod1})

err = findPlaceFor(
err = NewRemovalSimulator(nil, clusterSnapshot, predicateChecker, NewUsageTracker()).findPlaceFor(
"nbad",
[]*apiv1.Pod{new1, new2, new3},
destinations,
clusterSnapshot, predicateChecker,
oldHints, newHints, tracker, time.Now())
oldHints, newHints, time.Now())

assert.Error(t, err)
assert.True(t, len(newHints) == 2)
Expand Down Expand Up @@ -136,14 +129,12 @@ func TestFindNone(t *testing.T) {
[]*apiv1.Node{node1, node2},
[]*apiv1.Pod{pod1})

err = findPlaceFor(
err = NewRemovalSimulator(nil, clusterSnapshot, predicateChecker, NewUsageTracker()).findPlaceFor(
"x",
[]*apiv1.Pod{},
destinations,
clusterSnapshot, predicateChecker,
make(map[string]string),
make(map[string]string),
NewUsageTracker(),
time.Now())
assert.NoError(t, err)
}
Expand Down Expand Up @@ -171,7 +162,8 @@ func TestFindEmptyNodes(t *testing.T) {
clusterSnapshot := NewBasicClusterSnapshot()
InitializeClusterSnapshotOrDie(t, clusterSnapshot, []*apiv1.Node{nodes[0], nodes[1], nodes[2], nodes[3]}, []*apiv1.Pod{pod1, pod2})
testTime := time.Date(2020, time.December, 18, 17, 0, 0, 0, time.UTC)
emptyNodes := FindEmptyNodesToRemove(clusterSnapshot, nodeNames, testTime)
r := NewRemovalSimulator(nil, clusterSnapshot, nil, nil)
emptyNodes := r.FindEmptyNodesToRemove(nodeNames, testTime)
assert.Equal(t, []string{nodeNames[0], nodeNames[2], nodeNames[3]}, emptyNodes)
}

Expand Down Expand Up @@ -317,10 +309,10 @@ func TestFindNodesToRemove(t *testing.T) {
destinations = append(destinations, node.Name)
}
InitializeClusterSnapshotOrDie(t, clusterSnapshot, test.allNodes, test.pods)
toRemove, unremovable, _, err := FindNodesToRemove(
test.candidates, destinations, registry,
clusterSnapshot, predicateChecker, map[string]string{},
tracker, time.Now(), []*policyv1.PodDisruptionBudget{})
r := NewRemovalSimulator(registry, clusterSnapshot, predicateChecker, tracker)
toRemove, unremovable, _, err := r.FindNodesToRemove(
test.candidates, destinations, map[string]string{},
time.Now(), []*policyv1.PodDisruptionBudget{})
assert.NoError(t, err)
fmt.Printf("Test scenario: %s, found len(toRemove)=%v, expected len(test.toRemove)=%v\n", test.name, len(toRemove), len(test.toRemove))
assert.Equal(t, toRemove, test.toRemove)
Expand Down

0 comments on commit 07a8d38

Please sign in to comment.