From accf58f36c06b49f838cf2015cfcd5be6bfa81ce Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Daniel=20K=C5=82obuszewski?= Date: Sat, 10 Sep 2022 17:02:49 +0200 Subject: [PATCH] Base parallel scale down implementation --- .../core/scaledown/eligibility/eligibility.go | 13 +- .../scaledown/eligibility/eligibility_test.go | 2 +- .../core/scaledown/legacy/legacy.go | 6 +- .../core/scaledown/planner/planner.go | 294 +++++++++++++ .../core/scaledown/planner/planner_test.go | 400 ++++++++++++++++++ cluster-autoscaler/simulator/cluster.go | 7 +- cluster-autoscaler/utils/errors/errors.go | 4 + 7 files changed, 714 insertions(+), 12 deletions(-) create mode 100644 cluster-autoscaler/core/scaledown/planner/planner.go create mode 100644 cluster-autoscaler/core/scaledown/planner/planner_test.go diff --git a/cluster-autoscaler/core/scaledown/eligibility/eligibility.go b/cluster-autoscaler/core/scaledown/eligibility/eligibility.go index ba7ddced8311..6717ae9a0b4b 100644 --- a/cluster-autoscaler/core/scaledown/eligibility/eligibility.go +++ b/cluster-autoscaler/core/scaledown/eligibility/eligibility.go @@ -63,9 +63,8 @@ func NewChecker(thresholdGetter utilizationThresholdGetter) *Checker { // utilization info. // TODO(x13n): Node utilization could actually be calculated independently for // all nodes and just used here. Next refactor... -func (c *Checker) FilterOutUnremovable(context *context.AutoscalingContext, scaleDownCandidates []*apiv1.Node, timestamp time.Time, unremovableNodes *unremovable.Nodes) ([]string, map[string]utilization.Info) { - unremovableNodes.Update(context.ClusterSnapshot.NodeInfos(), timestamp) - +func (c *Checker) FilterOutUnremovable(context *context.AutoscalingContext, scaleDownCandidates []*apiv1.Node, timestamp time.Time, unremovableNodes *unremovable.Nodes) ([]string, map[string]utilization.Info, []*simulator.UnremovableNode) { + ineligible := []*simulator.UnremovableNode{} skipped := 0 utilizationMap := make(map[string]utilization.Info) currentlyUnneededNodeNames := make([]string, 0, len(scaleDownCandidates)) @@ -75,13 +74,13 @@ func (c *Checker) FilterOutUnremovable(context *context.AutoscalingContext, scal nodeInfo, err := context.ClusterSnapshot.NodeInfos().Get(node.Name) if err != nil { klog.Errorf("Can't retrieve scale-down candidate %s from snapshot, err: %v", node.Name, err) - unremovableNodes.AddReason(node, simulator.UnexpectedError) + ineligible = append(ineligible, &simulator.UnremovableNode{Node: node, Reason: simulator.UnexpectedError}) continue } // Skip nodes that were recently checked. 
if unremovableNodes.IsRecent(node.Name) { - unremovableNodes.AddReason(node, simulator.RecentlyUnremovable) + ineligible = append(ineligible, &simulator.UnremovableNode{Node: node, Reason: simulator.RecentlyUnremovable}) skipped++ continue } @@ -91,7 +90,7 @@ func (c *Checker) FilterOutUnremovable(context *context.AutoscalingContext, scal utilizationMap[node.Name] = *utilInfo } if reason != simulator.NoReason { - unremovableNodes.AddReason(node, reason) + ineligible = append(ineligible, &simulator.UnremovableNode{Node: node, Reason: reason}) continue } @@ -102,7 +101,7 @@ func (c *Checker) FilterOutUnremovable(context *context.AutoscalingContext, scal if skipped > 0 { klog.V(1).Infof("Scale-down calculation: ignoring %v nodes unremovable in the last %v", skipped, context.AutoscalingOptions.UnremovableNodeRecheckTimeout) } - return currentlyUnneededNodeNames, utilizationMap + return currentlyUnneededNodeNames, utilizationMap, ineligible } func (c *Checker) unremovableReasonAndNodeUtilization(context *context.AutoscalingContext, timestamp time.Time, nodeInfo *schedulerframework.NodeInfo, utilLogsQuota *klogx.Quota) (simulator.UnremovableReason, *utilization.Info) { diff --git a/cluster-autoscaler/core/scaledown/eligibility/eligibility_test.go b/cluster-autoscaler/core/scaledown/eligibility/eligibility_test.go index 977e792fa83e..2a93623c01e9 100644 --- a/cluster-autoscaler/core/scaledown/eligibility/eligibility_test.go +++ b/cluster-autoscaler/core/scaledown/eligibility/eligibility_test.go @@ -109,7 +109,7 @@ func TestFilterOutUnremovable(t *testing.T) { t.Fatalf("Could not create autoscaling context: %v", err) } unremovableNodes := unremovable.NewNodes() - got, _ := c.FilterOutUnremovable(&context, tc.nodes, now, unremovableNodes) + got, _, _ := c.FilterOutUnremovable(&context, tc.nodes, now, unremovableNodes) assert.Equal(t, tc.want, got) }) } diff --git a/cluster-autoscaler/core/scaledown/legacy/legacy.go b/cluster-autoscaler/core/scaledown/legacy/legacy.go index b65fdd1552f4..00030f847cf5 100644 --- a/cluster-autoscaler/core/scaledown/legacy/legacy.go +++ b/cluster-autoscaler/core/scaledown/legacy/legacy.go @@ -116,7 +116,11 @@ func (sd *ScaleDown) UpdateUnneededNodes( // Phase1 - look at the nodes utilization. Calculate the utilization // only for the managed nodes. - currentlyUnneededNodeNames, utilizationMap := sd.eligibilityChecker.FilterOutUnremovable(sd.context, scaleDownCandidates, timestamp, sd.unremovableNodes) + sd.unremovableNodes.Update(sd.context.ClusterSnapshot.NodeInfos(), timestamp) + currentlyUnneededNodeNames, utilizationMap, ineligible := sd.eligibilityChecker.FilterOutUnremovable(sd.context, scaleDownCandidates, timestamp, sd.unremovableNodes) + for _, n := range ineligible { + sd.unremovableNodes.Add(n) + } emptyNodesToRemove := sd.getEmptyNodesToRemoveNoResourceLimits(currentlyUnneededNodeNames, timestamp) diff --git a/cluster-autoscaler/core/scaledown/planner/planner.go b/cluster-autoscaler/core/scaledown/planner/planner.go new file mode 100644 index 000000000000..c84a844300a6 --- /dev/null +++ b/cluster-autoscaler/core/scaledown/planner/planner.go @@ -0,0 +1,294 @@ +/* +Copyright 2022 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. 
+You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package planner + +import ( + "fmt" + "time" + + "k8s.io/autoscaler/cluster-autoscaler/context" + "k8s.io/autoscaler/cluster-autoscaler/core/scaledown" + "k8s.io/autoscaler/cluster-autoscaler/core/scaledown/eligibility" + "k8s.io/autoscaler/cluster-autoscaler/core/scaledown/resource" + "k8s.io/autoscaler/cluster-autoscaler/core/scaledown/unneeded" + "k8s.io/autoscaler/cluster-autoscaler/core/scaledown/unremovable" + "k8s.io/autoscaler/cluster-autoscaler/processors" + "k8s.io/autoscaler/cluster-autoscaler/simulator" + "k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot" + "k8s.io/autoscaler/cluster-autoscaler/simulator/scheduling" + "k8s.io/autoscaler/cluster-autoscaler/simulator/utilization" + "k8s.io/autoscaler/cluster-autoscaler/utils/errors" + pod_util "k8s.io/autoscaler/cluster-autoscaler/utils/pod" + + apiv1 "k8s.io/api/core/v1" + policyv1 "k8s.io/api/policy/v1" + klog "k8s.io/klog/v2" + "k8s.io/kubernetes/pkg/scheduler/framework" +) + +type eligibilityChecker interface { + FilterOutUnremovable(context *context.AutoscalingContext, scaleDownCandidates []*apiv1.Node, timestamp time.Time, unremovableNodes *unremovable.Nodes) ([]string, map[string]utilization.Info, []*simulator.UnremovableNode) +} + +type removalSimulator interface { + DropOldHints() + SimulateNodeRemoval(node string, podDestinations map[string]bool, timestamp time.Time, pdbs []*policyv1.PodDisruptionBudget) (*simulator.NodeToBeRemoved, *simulator.UnremovableNode) +} + +// Planner is responsible for deciding which nodes should be deleted during scale down. +type Planner struct { + context *context.AutoscalingContext + unremovableNodes *unremovable.Nodes + unneededNodes *unneeded.Nodes + rs removalSimulator + actuationInjector *scheduling.HintingSimulator + latestUpdate time.Time + eligibilityChecker eligibilityChecker + nodeUtilizationMap map[string]utilization.Info + actuationStatus scaledown.ActuationStatus + resourceLimitsFinder *resource.LimitsFinder +} + +// New creates a new Planner object. +func New(context *context.AutoscalingContext, processors *processors.AutoscalingProcessors, deleteOptions simulator.NodeDeleteOptions) *Planner { + resourceLimitsFinder := resource.NewLimitsFinder(processors.CustomResourcesProcessor) + return &Planner{ + context: context, + unremovableNodes: unremovable.NewNodes(), + unneededNodes: unneeded.NewNodes(processors.NodeGroupConfigProcessor, resourceLimitsFinder), + rs: simulator.NewRemovalSimulator(context.ListerRegistry, context.ClusterSnapshot, context.PredicateChecker, simulator.NewUsageTracker(), deleteOptions, true), + actuationInjector: scheduling.NewHintingSimulator(context.PredicateChecker), + eligibilityChecker: eligibility.NewChecker(processors.NodeGroupConfigProcessor), + nodeUtilizationMap: make(map[string]utilization.Info), + resourceLimitsFinder: resourceLimitsFinder, + } +} + +// UpdateClusterState needs to be periodically invoked to provide Planner with +// up-to-date information about the cluster. +// Planner will evaluate scaleDownCandidates in the order provided here. 
+func (p *Planner) UpdateClusterState(podDestinations, scaleDownCandidates []*apiv1.Node, as scaledown.ActuationStatus, pdb []*policyv1.PodDisruptionBudget, currentTime time.Time) errors.AutoscalerError { + p.latestUpdate = currentTime + p.actuationStatus = as + // TODO: clone cluster snapshot to avoid persisting changes done by the + // simulation. Or - better yet - allow the snapshot to be forked twice + // and just fork it here. + err := p.injectOngoingActuation() + if err != nil { + p.CleanUpUnneededNodes() + return errors.ToAutoscalerError(errors.UnexpectedScaleDownStateError, err) + } + deletions := asMap(merged(as.DeletionsInProgress())) + podDestinations = filterOutOngoingDeletions(podDestinations, deletions) + scaleDownCandidates = filterOutOngoingDeletions(scaleDownCandidates, deletions) + p.categorizeNodes(asMap(nodeNames(podDestinations)), scaleDownCandidates, pdb) + p.rs.DropOldHints() + p.actuationInjector.DropOldHints() + return nil +} + +// CleanUpUnneededNodes forces Planner to forget about all nodes considered +// unneeded so far. +func (p *Planner) CleanUpUnneededNodes() { + p.unneededNodes.Clear() +} + +// NodesToDelete returns all Nodes that could be removed right now, according +// to the Planner. +func (p *Planner) NodesToDelete() (empty, needDrain []*apiv1.Node) { + nodes, err := allNodes(p.context.ClusterSnapshot) + if err != nil { + klog.Errorf("Nothing will scale down, failed to list nodes from ClusterSnapshot: %v", err) + return nil, nil + } + resourceLimiter, err := p.context.CloudProvider.GetResourceLimiter() + if err != nil { + klog.Errorf("Nothing will scale down, failed to create resource limiter: %v", err) + return nil, nil + } + limitsLeft := p.resourceLimitsFinder.LimitsLeft(p.context, nodes, resourceLimiter, p.latestUpdate) + empty, needDrain, unremovable := p.unneededNodes.RemovableAt(p.context, p.latestUpdate, limitsLeft, resourceLimiter.GetResources(), p.actuationStatus) + for _, u := range unremovable { + p.unremovableNodes.Add(u) + } + // TODO: filter results with ScaleDownSetProcessor.GetNodesToRemove + return empty, needDrain +} + +func allNodes(s clustersnapshot.ClusterSnapshot) ([]*apiv1.Node, error) { + nodeInfos, err := s.NodeInfos().List() + if err != nil { + // This should never happen, List() returns err only because scheduler interface requires it. + return nil, err + } + nodes := make([]*apiv1.Node, len(nodeInfos)) + for i, ni := range nodeInfos { + nodes[i] = ni.Node() + } + return nodes, nil +} + +// UnneededNodes returns a list of nodes currently considered as unneeded. +func (p *Planner) UnneededNodes() []*apiv1.Node { + return p.unneededNodes.AsList() +} + +// UnremovableNodes returns a list of nodes currently considered as unremovable. +func (p *Planner) UnremovableNodes() []*simulator.UnremovableNode { + return p.unremovableNodes.AsList() +} + +// NodeUtilizationMap returns a map with utilization of nodes. +func (p *Planner) NodeUtilizationMap() map[string]utilization.Info { + return p.nodeUtilizationMap +} + +// injectOngoingActuation injects pods into ClusterSnapshot, to allow +// subsequent simulation to anticipate which pods will end up getting replaced +// due to being evicted by previous scale down(s). There are two sets of such +// pods: +// - existing pods from currently drained nodes +// - pods which were recently evicted (it is up to ActuationStatus to decide +// what "recently" means in this case). 
+// +// It is entirely possible for some external controller to have already created +// a replacement pod for such recent evictions, in which case the subsequent +// simulation will count them twice. This is ok: it is much safer to disrupt +// the scale down because of double-counting some pods than it is to scale down +// too aggressively. +func (p *Planner) injectOngoingActuation() error { + err := p.injectPods(currentlyDrainedPods(p.context.ClusterSnapshot.NodeInfos(), p.actuationStatus)) + if err != nil { + return err + } + // TODO(x13n): Check owner references to avoid double-counting already + // recreated pods. + return p.injectPods(p.actuationStatus.RecentEvictions()) +} + +func currentlyDrainedPods(niLister framework.NodeInfoLister, as scaledown.ActuationStatus) []*apiv1.Pod { + var pods []*apiv1.Pod + _, ds := as.DeletionsInProgress() + for _, d := range ds { + ni, err := niLister.Get(d) + if err != nil { + klog.Warningf("Couldn't get node %v info, assuming the node got deleted already: %v", d, err) + continue + } + for _, pi := range ni.Pods { + pods = append(pods, pi.Pod) + } + } + return pods +} + +func filterRecreatable(pods []*apiv1.Pod) []*apiv1.Pod { + filtered := make([]*apiv1.Pod, 0, len(pods)) + for _, p := range pods { + if pod_util.IsStaticPod(p) || pod_util.IsMirrorPod(p) || pod_util.IsDaemonSetPod(p) { + continue + } + filtered = append(filtered, p) + } + return filtered +} + +func (p *Planner) injectPods(pods []*apiv1.Pod) error { + pods = filterRecreatable(pods) + pods = clearNodeName(pods) + // Note: We're using ScheduleAnywhere, but the pods won't schedule back + // on the drained nodes due to taints. + _, err := p.actuationInjector.TrySchedulePods(p.context.ClusterSnapshot, pods, scheduling.ScheduleAnywhere) + if err != nil { + return fmt.Errorf("cannot scale down, no place to reschedule pods from ongoing deletions: %v", err) + } + return nil +} + +// categorizeNodes determines, for each node, whether it can be eventually +// removed or if there are reasons preventing that. +// TODO: Track remaining PDB budget. +func (p *Planner) categorizeNodes(podDestinations map[string]bool, scaleDownCandidates []*apiv1.Node, pdbs []*policyv1.PodDisruptionBudget) { + unremovableTimeout := p.latestUpdate.Add(p.context.AutoscalingOptions.UnremovableNodeRecheckTimeout) + unremovableCount := 0 + var removableList []simulator.NodeToBeRemoved + p.unremovableNodes.Update(p.context.ClusterSnapshot.NodeInfos(), p.latestUpdate) + currentlyUnneededNodeNames, utilizationMap, ineligible := p.eligibilityChecker.FilterOutUnremovable(p.context, scaleDownCandidates, p.latestUpdate, p.unremovableNodes) + for _, n := range ineligible { + p.unremovableNodes.Add(n) + } + p.nodeUtilizationMap = utilizationMap + for _, node := range currentlyUnneededNodeNames { + // TODO(x13n): break on timeout. Figure out how to handle nodes + // identified as unneeded in previous iteration, but now + // skipped due to timeout. 
+ removable, unremovable := p.rs.SimulateNodeRemoval(node, podDestinations, p.latestUpdate, pdbs) + if unremovable != nil { + unremovableCount += 1 + p.unremovableNodes.AddTimeout(unremovable, unremovableTimeout) + } + if removable != nil { + delete(podDestinations, removable.Node.Name) + removableList = append(removableList, *removable) + } + } + p.unneededNodes.Update(removableList, p.latestUpdate) + if unremovableCount > 0 { + klog.V(1).Infof("%v nodes found to be unremovable in simulation, will re-check them at %v", unremovableCount, unremovableTimeout) + } +} + +func merged(a, b []string) []string { + return append(append(make([]string, 0, len(a)+len(b)), a...), b...) +} + +func asMap(strs []string) map[string]bool { + m := make(map[string]bool, len(strs)) + for _, s := range strs { + m[s] = true + } + return m +} + +func nodeNames(nodes []*apiv1.Node) []string { + names := make([]string, len(nodes)) + for i, node := range nodes { + names[i] = node.Name + } + return names +} + +func filterOutOngoingDeletions(ns []*apiv1.Node, deleted map[string]bool) []*apiv1.Node { + rv := make([]*apiv1.Node, 0, len(ns)) + for _, n := range ns { + if deleted[n.Name] { + continue + } + rv = append(rv, n) + } + return rv +} + +func clearNodeName(pods []*apiv1.Pod) []*apiv1.Pod { + newpods := make([]*apiv1.Pod, 0, len(pods)) + for _, podptr := range pods { + newpod := *podptr + newpod.Spec.NodeName = "" + newpods = append(newpods, &newpod) + } + return newpods +} diff --git a/cluster-autoscaler/core/scaledown/planner/planner_test.go b/cluster-autoscaler/core/scaledown/planner/planner_test.go new file mode 100644 index 000000000000..538faa5c70a7 --- /dev/null +++ b/cluster-autoscaler/core/scaledown/planner/planner_test.go @@ -0,0 +1,400 @@ +/* +Copyright 2022 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package planner + +import ( + "testing" + "time" + + testprovider "k8s.io/autoscaler/cluster-autoscaler/cloudprovider/test" + "k8s.io/autoscaler/cluster-autoscaler/config" + "k8s.io/autoscaler/cluster-autoscaler/context" + "k8s.io/autoscaler/cluster-autoscaler/core/scaledown/unremovable" + . "k8s.io/autoscaler/cluster-autoscaler/core/test" + "k8s.io/autoscaler/cluster-autoscaler/processors/status" + "k8s.io/autoscaler/cluster-autoscaler/simulator" + "k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot" + "k8s.io/autoscaler/cluster-autoscaler/simulator/utilization" + "k8s.io/autoscaler/cluster-autoscaler/utils/deletetaint" + kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes" + . 
"k8s.io/autoscaler/cluster-autoscaler/utils/test" + + "github.com/stretchr/testify/assert" + appsv1 "k8s.io/api/apps/v1" + apiv1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/kubernetes/fake" +) + +func TestUpdateClusterState(t *testing.T) { + testCases := []struct { + name string + nodes []*apiv1.Node + pods []*apiv1.Pod + actuationStatus *fakeActuationStatus + eligible []string + wantUnneeded []string + wantErr bool + }{ + { + name: "all eligible", + nodes: []*apiv1.Node{ + BuildTestNode("n1", 1000, 10), + BuildTestNode("n2", 1000, 10), + BuildTestNode("n3", 1000, 10), + BuildTestNode("n4", 1000, 10), + }, + eligible: []string{"n1", "n2", "n3", "n4"}, + actuationStatus: &fakeActuationStatus{}, + wantUnneeded: []string{"n1", "n2", "n3", "n4"}, + }, + { + name: "none eligible", + nodes: []*apiv1.Node{ + BuildTestNode("n1", 1000, 10), + BuildTestNode("n2", 1000, 10), + BuildTestNode("n3", 1000, 10), + BuildTestNode("n4", 1000, 10), + }, + eligible: []string{}, + actuationStatus: &fakeActuationStatus{}, + wantUnneeded: []string{}, + }, + { + name: "some eligible", + nodes: []*apiv1.Node{ + BuildTestNode("n1", 1000, 10), + BuildTestNode("n2", 1000, 10), + BuildTestNode("n3", 1000, 10), + BuildTestNode("n4", 1000, 10), + }, + eligible: []string{"n1", "n3"}, + actuationStatus: &fakeActuationStatus{}, + wantUnneeded: []string{"n1", "n3"}, + }, + { + name: "pods from already drained node can schedule elsewhere", + nodes: []*apiv1.Node{ + BuildTestNode("n1", 1000, 10), + nodeUndergoingDeletion("n2", 2000, 10), + }, + pods: []*apiv1.Pod{ + scheduledPod("p1", 500, 1, "n2"), + scheduledPod("p2", 500, 1, "n2"), + }, + eligible: []string{"n1"}, + actuationStatus: &fakeActuationStatus{ + currentlyDrained: []string{"n2"}, + }, + wantUnneeded: []string{}, + }, + { + name: "pods from already drained node can't schedule elsewhere", + nodes: []*apiv1.Node{ + BuildTestNode("n1", 1000, 10), + nodeUndergoingDeletion("n2", 2000, 10), + }, + pods: []*apiv1.Pod{ + scheduledPod("p1", 500, 1, "n2"), + scheduledPod("p2", 500, 1, "n2"), + scheduledPod("p3", 500, 1, "n2"), + }, + eligible: []string{"n1"}, + actuationStatus: &fakeActuationStatus{ + currentlyDrained: []string{"n2"}, + }, + wantUnneeded: []string{}, + wantErr: true, + }, + { + name: "pods from multiple drained nodes can schedule elsewhere", + nodes: []*apiv1.Node{ + BuildTestNode("n1", 1000, 10), + nodeUndergoingDeletion("n2", 2000, 10), + BuildTestNode("n3", 1000, 10), + nodeUndergoingDeletion("n4", 2000, 10), + }, + pods: []*apiv1.Pod{ + scheduledPod("p1", 500, 1, "n2"), + scheduledPod("p2", 500, 1, "n2"), + scheduledPod("p4", 500, 1, "n4"), + scheduledPod("p5", 500, 1, "n4"), + }, + eligible: []string{"n1", "n3"}, + actuationStatus: &fakeActuationStatus{ + currentlyDrained: []string{"n2", "n4"}, + }, + wantUnneeded: []string{}, + }, + { + name: "pods from multiple drained nodes can't schedule elsewhere", + nodes: []*apiv1.Node{ + BuildTestNode("n1", 1000, 10), + nodeUndergoingDeletion("n2", 2000, 10), + BuildTestNode("n3", 1000, 10), + nodeUndergoingDeletion("n4", 2000, 10), + }, + pods: []*apiv1.Pod{ + scheduledPod("p1", 500, 1, "n2"), + scheduledPod("p2", 500, 1, "n2"), + scheduledPod("p3", 500, 1, "n2"), + scheduledPod("p4", 500, 1, "n4"), + scheduledPod("p5", 500, 1, "n4"), + }, + eligible: []string{"n1", "n3"}, + actuationStatus: &fakeActuationStatus{ + currentlyDrained: []string{"n2", "n4"}, + }, + wantUnneeded: []string{}, + wantErr: true, + }, + { + name: "multiple drained nodes but new candidates 
found", + nodes: []*apiv1.Node{ + BuildTestNode("n1", 2000, 10), + nodeUndergoingDeletion("n2", 2000, 10), + BuildTestNode("n3", 2000, 10), + nodeUndergoingDeletion("n4", 2000, 10), + BuildTestNode("n5", 2000, 10), + }, + pods: []*apiv1.Pod{ + scheduledPod("p1", 400, 1, "n1"), + scheduledPod("p2", 400, 1, "n2"), + scheduledPod("p3", 400, 1, "n3"), + scheduledPod("p4", 400, 1, "n4"), + scheduledPod("p5", 400, 1, "n5"), + }, + eligible: []string{"n1", "n3", "n5"}, + actuationStatus: &fakeActuationStatus{ + currentlyDrained: []string{"n2", "n4"}, + }, + wantUnneeded: []string{"n1", "n3"}, + }, + { + name: "recently evicted pods can schedule elsewhere, node uneeded", + nodes: []*apiv1.Node{ + BuildTestNode("n1", 1000, 10), + BuildTestNode("n2", 1000, 10), + BuildTestNode("n3", 1000, 10), + }, + pods: []*apiv1.Pod{ + scheduledPod("p1", 500, 1, "n2"), + scheduledPod("p2", 500, 1, "n2"), + }, + eligible: []string{"n1", "n2"}, + actuationStatus: &fakeActuationStatus{ + recentEvictions: []*apiv1.Pod{ + scheduledPod("p3", 500, 1, "n4"), + }, + }, + wantUnneeded: []string{"n1"}, + }, + { + name: "recently evicted pods can schedule elsewhere, no unneeded", + nodes: []*apiv1.Node{ + BuildTestNode("n1", 1000, 10), + BuildTestNode("n2", 1000, 10), + BuildTestNode("n3", 1000, 10), + }, + pods: []*apiv1.Pod{ + scheduledPod("p1", 500, 1, "n2"), + scheduledPod("p2", 500, 1, "n2"), + }, + eligible: []string{"n1", "n2"}, + actuationStatus: &fakeActuationStatus{ + recentEvictions: []*apiv1.Pod{ + scheduledPod("p3", 500, 1, "n4"), + scheduledPod("p4", 500, 1, "n4"), + scheduledPod("p5", 500, 1, "n4"), + }, + }, + wantUnneeded: []string{}, + }, + { + name: "recently evicted pods can't schedule elsewhere", + nodes: []*apiv1.Node{ + BuildTestNode("n1", 1000, 10), + BuildTestNode("n2", 1000, 10), + }, + pods: []*apiv1.Pod{ + scheduledPod("p1", 500, 1, "n1"), + scheduledPod("p2", 500, 1, "n1"), + }, + eligible: []string{"n1", "n2"}, + actuationStatus: &fakeActuationStatus{ + recentEvictions: []*apiv1.Pod{ + scheduledPod("p3", 500, 1, "n3"), + scheduledPod("p4", 500, 1, "n3"), + scheduledPod("p5", 500, 1, "n3"), + }, + }, + wantUnneeded: []string{}, + wantErr: true, + }, + { + name: "multiple drained nodes and recent evictions, no unneeded", + nodes: []*apiv1.Node{ + BuildTestNode("n1", 1000, 10), + nodeUndergoingDeletion("n2", 1000, 10), + BuildTestNode("n3", 1000, 10), + nodeUndergoingDeletion("n4", 1000, 10), + BuildTestNode("n5", 1000, 10), + }, + pods: []*apiv1.Pod{ + scheduledPod("p1", 200, 1, "n1"), + scheduledPod("p2", 200, 1, "n2"), + scheduledPod("p3", 200, 1, "n3"), + scheduledPod("p4", 200, 1, "n4"), + scheduledPod("p5", 200, 1, "n5"), + }, + eligible: []string{"n1", "n3", "n5"}, + actuationStatus: &fakeActuationStatus{ + currentlyDrained: []string{"n2", "n4"}, + recentEvictions: []*apiv1.Pod{ + scheduledPod("p6", 600, 1, "n6"), + scheduledPod("p7", 600, 1, "n6"), + }, + }, + wantUnneeded: []string{}, + }, + { + name: "multiple drained nodes and recent evictions, one unneeded", + nodes: []*apiv1.Node{ + BuildTestNode("n1", 1000, 10), + nodeUndergoingDeletion("n2", 1000, 10), + BuildTestNode("n3", 1000, 10), + nodeUndergoingDeletion("n4", 1000, 10), + BuildTestNode("n5", 1000, 10), + }, + pods: []*apiv1.Pod{ + scheduledPod("p1", 200, 1, "n1"), + scheduledPod("p2", 200, 1, "n2"), + scheduledPod("p3", 200, 1, "n3"), + scheduledPod("p4", 200, 1, "n4"), + scheduledPod("p5", 200, 1, "n5"), + }, + eligible: []string{"n1", "n3", "n5"}, + actuationStatus: &fakeActuationStatus{ + currentlyDrained: []string{"n2", 
"n4"}, + recentEvictions: []*apiv1.Pod{ + scheduledPod("p6", 600, 1, "n6"), + }, + }, + wantUnneeded: []string{"n1"}, + }, + } + for _, tc := range testCases { + tc := tc + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + rsLister, err := kube_util.NewTestReplicaSetLister(generateReplicaSets()) + assert.NoError(t, err) + registry := kube_util.NewListerRegistry(nil, nil, nil, nil, nil, nil, nil, nil, rsLister, nil) + provider := testprovider.NewTestCloudProvider(nil, nil) + context, err := NewScaleTestAutoscalingContext(config.AutoscalingOptions{}, &fake.Clientset{}, registry, provider, nil, nil) + assert.NoError(t, err) + clustersnapshot.InitializeClusterSnapshotOrDie(t, context.ClusterSnapshot, tc.nodes, tc.pods) + deleteOptions := simulator.NodeDeleteOptions{} + p := New(&context, NewTestProcessors(), deleteOptions) + p.eligibilityChecker = &fakeEligibilityChecker{eligible: asMap(tc.eligible)} + // TODO(x13n): test subsets of nodes passed as podDestinations/scaleDownCandidates. + aErr := p.UpdateClusterState(tc.nodes, tc.nodes, tc.actuationStatus, nil, time.Now()) + if tc.wantErr { + assert.Error(t, aErr) + } else { + assert.NoError(t, aErr) + } + wantUnneeded := asMap(tc.wantUnneeded) + for _, n := range tc.nodes { + if wantUnneeded[n.Name] { + assert.True(t, p.unneededNodes.Contains(n.Name), n.Name) + } else { + assert.False(t, p.unneededNodes.Contains(n.Name), n.Name) + } + } + }) + } +} + +func generateReplicaSets() []*appsv1.ReplicaSet { + replicas := int32(5) + return []*appsv1.ReplicaSet{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "rs", + Namespace: "default", + SelfLink: "api/v1/namespaces/default/replicasets/rs", + }, + Spec: appsv1.ReplicaSetSpec{ + Replicas: &replicas, + }, + }, + } +} + +func scheduledPod(name string, cpu, memory int64, nodeName string) *apiv1.Pod { + p := BuildTestPod(name, cpu, memory) + p.OwnerReferences = GenerateOwnerReferences("rs", "ReplicaSet", "extensions/v1beta1", "") + p.Spec.NodeName = nodeName + return p +} + +func nodeUndergoingDeletion(name string, cpu, memory int64) *apiv1.Node { + n := BuildTestNode(name, cpu, memory) + toBeDeletedTaint := apiv1.Taint{Key: deletetaint.ToBeDeletedTaint, Effect: apiv1.TaintEffectNoSchedule} + n.Spec.Taints = append(n.Spec.Taints, toBeDeletedTaint) + return n +} + +type fakeActuationStatus struct { + recentEvictions []*apiv1.Pod + currentlyDrained []string +} + +func (f *fakeActuationStatus) RecentEvictions() []*apiv1.Pod { + return f.recentEvictions +} + +func (f *fakeActuationStatus) DeletionsInProgress() ([]string, []string) { + return nil, f.currentlyDrained +} + +func (f *fakeActuationStatus) DeletionResults() (map[string]status.NodeDeleteResult, time.Time) { + return nil, time.Time{} +} + +func (f *fakeActuationStatus) DeletionsCount(nodeGroup string) int { + return 0 +} + +type fakeEligibilityChecker struct { + eligible map[string]bool +} + +func (f *fakeEligibilityChecker) FilterOutUnremovable(context *context.AutoscalingContext, scaleDownCandidates []*apiv1.Node, timestamp time.Time, unremovableNodes *unremovable.Nodes) ([]string, map[string]utilization.Info, []*simulator.UnremovableNode) { + eligible := []string{} + utilMap := make(map[string]utilization.Info) + for _, n := range scaleDownCandidates { + if f.eligible[n.Name] { + eligible = append(eligible, n.Name) + utilMap[n.Name] = utilization.Info{} + } else { + unremovableNodes.AddReason(n, simulator.UnexpectedError) + } + } + return eligible, utilMap, nil +} diff --git a/cluster-autoscaler/simulator/cluster.go 
b/cluster-autoscaler/simulator/cluster.go index 8ec92d8414ca..5cf2439c84f6 100644 --- a/cluster-autoscaler/simulator/cluster.go +++ b/cluster-autoscaler/simulator/cluster.go @@ -123,7 +123,7 @@ func (r *RemovalSimulator) FindNodesToRemove( } for _, nodeName := range candidates { - rn, urn := r.CheckNodeRemoval(nodeName, destinationMap, timestamp, pdbs) + rn, urn := r.SimulateNodeRemoval(nodeName, destinationMap, timestamp, pdbs) if rn != nil { result = append(result, *rn) } else if urn != nil { @@ -133,10 +133,11 @@ func (r *RemovalSimulator) FindNodesToRemove( return result, unremovable, nil } -// CheckNodeRemoval checks whether a specific node can be removed. Depending on +// SimulateNodeRemoval simulates removing a node from the cluster to check +// whether it is possible to move its pods. Depending on // the outcome, exactly one of (NodeToBeRemoved, UnremovableNode) will be // populated in the return value, the other will be nil. -func (r *RemovalSimulator) CheckNodeRemoval( +func (r *RemovalSimulator) SimulateNodeRemoval( nodeName string, destinationMap map[string]bool, timestamp time.Time, diff --git a/cluster-autoscaler/utils/errors/errors.go b/cluster-autoscaler/utils/errors/errors.go index 413699e4ac75..2276e53f3ee9 100644 --- a/cluster-autoscaler/utils/errors/errors.go +++ b/cluster-autoscaler/utils/errors/errors.go @@ -61,6 +61,10 @@ const ( // NodeGroupDoesNotExistError signifies that a NodeGroup // does not exist. NodeGroupDoesNotExistError AutoscalerErrorType = "nodeGroupDoesNotExistError" + // UnexpectedScaleDownStateError means Cluster Autoscaler thinks ongoing + // scale down is already removing too much and so further node removals + // shouldn't be attempted. + UnexpectedScaleDownStateError AutoscalerErrorType = "unexpectedScaleDownStateError" ) // NewAutoscalerError returns new autoscaler error with a message constructed from format string
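
The eligibility change at the top of this patch makes FilterOutUnremovable free of side effects on the unremovable set: instead of calling unremovableNodes.AddReason internally, it returns the ineligible nodes as a third value and leaves the bookkeeping to the caller, which is what lets the legacy ScaleDown and the new Planner share one checker. A minimal caller-side sketch, mirroring the legacy.go hunk above (checker, ctx, candidates and timestamp are assumed to be in scope):

    unremovableNodes.Update(ctx.ClusterSnapshot.NodeInfos(), timestamp)
    unneededNames, utilizationMap, ineligible := checker.FilterOutUnremovable(ctx, candidates, timestamp, unremovableNodes)
    for _, n := range ineligible {
        // Recording unremovability reasons is now the caller's responsibility.
        unremovableNodes.Add(n)
    }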
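
The new Planner is driven in two steps per iteration: UpdateClusterState refreshes its view of the cluster (injecting pods from ongoing drains and recent evictions into the snapshot before categorizing nodes), and NodesToDelete then returns what can be removed right now. A sketch of that wiring, assuming an AutoscalingContext, processors, actuation status and PDB list are already available; all variable names here are placeholders:

    p := planner.New(autoscalingCtx, autoscalingProcessors, deleteOptions)
    if err := p.UpdateClusterState(podDestinations, scaleDownCandidates, actuationStatus, pdbs, time.Now()); err != nil {
        // err.Type() is errors.UnexpectedScaleDownStateError: the ongoing
        // scale down may already be removing too much, so no further node
        // removals should be attempted this iteration.
        klog.Errorf("Skipping scale down: %v", err)
        return
    }
    empty, needDrain := p.NodesToDelete()
    // Nodes in empty can be deleted outright; nodes in needDrain have to be
    // drained first. Both lists already account for resource limits.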
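
injectOngoingActuation deliberately errs on the side of over-counting: pods from currently drained nodes and recently evicted pods are re-injected into the snapshot even though an external controller may have recreated some of them already, trading scale-down throughput for safety. The injection pipeline in condensed form (fields as defined in planner.go above):

    pods = filterRecreatable(pods) // static, mirror and DaemonSet pods won't be recreated; skip them
    pods = clearNodeName(pods)     // schedule copies; snapshot-owned pods stay bound to their nodes
    // ScheduleAnywhere is safe here: the ToBeDeleted taint keeps these pods
    // off the nodes that are being drained.
    _, err := p.actuationInjector.TrySchedulePods(p.context.ClusterSnapshot, pods, scheduling.ScheduleAnywhere)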
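
Finally, UpdateClusterState evaluates scaleDownCandidates in the order in which they are passed, so a caller can bias which nodes get simulated first (and, once the TODO about breaking on timeout is resolved, which ones risk being skipped). A hypothetical example of such biasing; the sort key is illustrative only:

    // Illustrative preference: simulate older nodes first.
    sort.Slice(scaleDownCandidates, func(i, j int) bool {
        return scaleDownCandidates[i].CreationTimestamp.Before(&scaleDownCandidates[j].CreationTimestamp)
    })
    aErr := p.UpdateClusterState(podDestinations, scaleDownCandidates, actuationStatus, pdbs, now)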