CA: stop passing registered upcoming nodes as scale-down candidates
Without this fix, aggressive scale-down settings could remove registered
upcoming nodes before they have a chance to become ready (and how long
that takes should be unrelated to the scale-down settings).
towca committed Feb 10, 2023
1 parent 6978ff8 commit 7e67625
Showing 8 changed files with 276 additions and 62 deletions.
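The failure mode is a race between how fast a new node becomes Ready and how fast scale-down acts. A minimal sketch of the arithmetic in Go, with invented durations (the flag name --scale-down-unneeded-time is real cluster-autoscaler configuration; the values are illustrative, not taken from this commit):

bootTime := 3 * time.Minute      // typical time for a new node to register and become Ready
unneededTime := 10 * time.Second // an aggressive --scale-down-unneeded-time setting
if unneededTime < bootTime {
	// A registered-but-NotReady node can be marked unneeded and deleted
	// while it is still booting - the exact race this commit closes.
}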
19 changes: 13 additions & 6 deletions cluster-autoscaler/clusterstate/clusterstate.go
@@ -916,12 +916,14 @@ func (csr *ClusterStateRegistry) GetIncorrectNodeGroupSize(nodeGroupName string)
}

// GetUpcomingNodes returns how many new nodes will be added shortly to the node groups or should become ready soon.
-// The function may overestimate the number of nodes.
-func (csr *ClusterStateRegistry) GetUpcomingNodes() map[string]int {
+// The function may overestimate the number of nodes. The second return value contains the names of upcoming nodes
+// that are already registered in the cluster.
+func (csr *ClusterStateRegistry) GetUpcomingNodes() (upcomingCounts map[string]int, registeredNodeNames map[string][]string) {
csr.Lock()
defer csr.Unlock()

-result := make(map[string]int)
+upcomingCounts = map[string]int{}
+registeredNodeNames = map[string][]string{}
for _, nodeGroup := range csr.cloudProvider.NodeGroups() {
id := nodeGroup.Id()
readiness := csr.perNodeGroupReadiness[id]
@@ -932,9 +934,14 @@ func (csr *ClusterStateRegistry) GetUpcomingNodes() map[string]int {
// Negative value is unlikely but theoretically possible.
continue
}
-result[id] = newNodes
-}
-return result
+upcomingCounts[id] = newNodes
+// newNodes should include instances that have registered with k8s but aren't ready yet, instances that came up on the cloud provider side
+// but haven't registered with k8s yet, and instances that haven't even come up on the cloud provider side yet (but are reflected in the target
+// size). The first category shows up as NotStarted in readiness; the other two aren't registered with k8s, so they have no names to report.
+registeredNodeNames[id] = readiness.NotStarted
+}
+return upcomingCounts, registeredNodeNames
}

// getCloudProviderNodeInstances returns map keyed on node group id where value is list of node instances
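For illustration, a hypothetical caller of the new two-value API (variable names invented; the real callers appear in the files below):

upcomingCounts, registeredUpcoming := csr.GetUpcomingNodes()
total := 0
for _, count := range upcomingCounts {
	total += count // counts registered-but-not-ready, cloud-only, and not-yet-created instances
}
var protectFromScaleDown []string
for _, names := range registeredUpcoming {
	protectFromScaleDown = append(protectFromScaleDown, names...) // only registered instances have node names
}

scale_up.go below consumes only the counts; static_autoscaler.go also uses the registered names to shield those nodes from scale-down.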
19 changes: 14 additions & 5 deletions cluster-autoscaler/clusterstate/clusterstate_test.go
@@ -361,8 +361,9 @@ func TestUnreadyLongAfterCreation(t *testing.T) {
assert.NoError(t, err)
assert.Equal(t, 1, len(clusterstate.GetClusterReadiness().Unready))
assert.Equal(t, 0, len(clusterstate.GetClusterReadiness().NotStarted))
-upcoming := clusterstate.GetUpcomingNodes()
+upcoming, upcomingRegistered := clusterstate.GetUpcomingNodes()
assert.Equal(t, 0, upcoming["ng1"])
+assert.Empty(t, upcomingRegistered["ng1"])
}

func TestNotStarted(t *testing.T) {
@@ -524,12 +525,17 @@ func TestUpcomingNodes(t *testing.T) {
assert.NoError(t, err)
assert.Empty(t, clusterstate.GetScaleUpFailures())

-upcomingNodes := clusterstate.GetUpcomingNodes()
+upcomingNodes, upcomingRegistered := clusterstate.GetUpcomingNodes()
assert.Equal(t, 6, upcomingNodes["ng1"])
+assert.Empty(t, upcomingRegistered["ng1"]) // Only unregistered.
assert.Equal(t, 1, upcomingNodes["ng2"])
+assert.Empty(t, upcomingRegistered["ng2"]) // Only unregistered.
assert.Equal(t, 2, upcomingNodes["ng3"])
+assert.Equal(t, []string{"ng3-1"}, upcomingRegistered["ng3"]) // 1 registered, 1 unregistered.
assert.NotContains(t, upcomingNodes, "ng4")
+assert.NotContains(t, upcomingRegistered, "ng4")
assert.Equal(t, 0, upcomingNodes["ng5"])
+assert.Empty(t, upcomingRegistered["ng5"])
}

func TestTaintBasedNodeDeletion(t *testing.T) {
@@ -566,8 +572,9 @@ func TestTaintBasedNodeDeletion(t *testing.T) {
assert.NoError(t, err)
assert.Empty(t, clusterstate.GetScaleUpFailures())

-upcomingNodes := clusterstate.GetUpcomingNodes()
+upcomingNodes, upcomingRegistered := clusterstate.GetUpcomingNodes()
assert.Equal(t, 1, upcomingNodes["ng1"])
+assert.Empty(t, upcomingRegistered["ng1"]) // Only unregistered.
}

func TestIncorrectSize(t *testing.T) {
@@ -624,17 +631,19 @@ func TestUnregisteredNodes(t *testing.T) {
assert.NoError(t, err)
assert.Equal(t, 1, len(clusterstate.GetUnregisteredNodes()))
assert.Equal(t, "ng1-2", clusterstate.GetUnregisteredNodes()[0].Node.Name)
-upcomingNodes := clusterstate.GetUpcomingNodes()
+upcomingNodes, upcomingRegistered := clusterstate.GetUpcomingNodes()
assert.Equal(t, 1, upcomingNodes["ng1"])
+assert.Empty(t, upcomingRegistered["ng1"]) // Unregistered only.

// The node didn't come up in MaxNodeProvisionTime, it should no longer be
// counted as upcoming (but it is still an unregistered node)
err = clusterstate.UpdateNodes([]*apiv1.Node{ng1_1}, nil, time.Now().Add(time.Minute))
assert.NoError(t, err)
assert.Equal(t, 1, len(clusterstate.GetUnregisteredNodes()))
assert.Equal(t, "ng1-2", clusterstate.GetUnregisteredNodes()[0].Node.Name)
-upcomingNodes = clusterstate.GetUpcomingNodes()
+upcomingNodes, upcomingRegistered = clusterstate.GetUpcomingNodes()
assert.Equal(t, 0, len(upcomingNodes))
+assert.Empty(t, upcomingRegistered["ng1"])

err = clusterstate.UpdateNodes([]*apiv1.Node{ng1_1, ng1_2}, nil, time.Now().Add(time.Minute))
assert.NoError(t, err)
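The ng3 case in TestUpcomingNodes is the one that exercises the new contract end to end. Condensed for illustration (setup elided; numbers taken from the test above):

counts, registered := clusterstate.GetUpcomingNodes()
assert.Equal(t, 2, counts["ng3"])                     // one registered + one unregistered instance
assert.Equal(t, []string{"ng3-1"}, registered["ng3"]) // only the registered instance is named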
3 changes: 2 additions & 1 deletion cluster-autoscaler/core/scale_up.go
@@ -178,8 +178,9 @@ func ScaleUp(context *context.AutoscalingContext, processors *ca_processors.Auto
klogx.V(1).Over(loggingQuota).Infof("%v other pods are also unschedulable", -loggingQuota.Left())
podEquivalenceGroups := buildPodEquivalenceGroups(unschedulablePods)

+upcomingCounts, _ := clusterStateRegistry.GetUpcomingNodes()
upcomingNodes := make([]*schedulerframework.NodeInfo, 0)
-for nodeGroup, numberOfNodes := range clusterStateRegistry.GetUpcomingNodes() {
+for nodeGroup, numberOfNodes := range upcomingCounts {
nodeTemplate, found := nodeInfos[nodeGroup]
if !found {
return scaleUpError(&status.ScaleUpStatus{}, errors.NewAutoscalerError(
101 changes: 69 additions & 32 deletions cluster-autoscaler/core/static_autoscaler.go
@@ -17,6 +17,7 @@ limitations under the License.
package core

import (
"errors"
"fmt"
"reflect"
"time"
@@ -49,7 +50,7 @@ import (
"k8s.io/autoscaler/cluster-autoscaler/simulator/predicatechecker"
"k8s.io/autoscaler/cluster-autoscaler/utils/backoff"
"k8s.io/autoscaler/cluster-autoscaler/utils/deletetaint"
"k8s.io/autoscaler/cluster-autoscaler/utils/errors"
caerrors "k8s.io/autoscaler/cluster-autoscaler/utils/errors"
scheduler_utils "k8s.io/autoscaler/cluster-autoscaler/utils/scheduler"
"k8s.io/autoscaler/cluster-autoscaler/utils/taints"
"k8s.io/autoscaler/cluster-autoscaler/utils/tpu"
@@ -239,30 +240,30 @@ func (a *StaticAutoscaler) cleanUpIfRequired() {
a.initialized = true
}

-func (a *StaticAutoscaler) initializeClusterSnapshot(nodes []*apiv1.Node, scheduledPods []*apiv1.Pod) errors.AutoscalerError {
+func (a *StaticAutoscaler) initializeClusterSnapshot(nodes []*apiv1.Node, scheduledPods []*apiv1.Pod) caerrors.AutoscalerError {
a.ClusterSnapshot.Clear()

knownNodes := make(map[string]bool)
for _, node := range nodes {
if err := a.ClusterSnapshot.AddNode(node); err != nil {
klog.Errorf("Failed to add node %s to cluster snapshot: %v", node.Name, err)
-return errors.ToAutoscalerError(errors.InternalError, err)
+return caerrors.ToAutoscalerError(caerrors.InternalError, err)
}
knownNodes[node.Name] = true
}
for _, pod := range scheduledPods {
if knownNodes[pod.Spec.NodeName] {
if err := a.ClusterSnapshot.AddPod(pod, pod.Spec.NodeName); err != nil {
klog.Errorf("Failed to add pod %s scheduled to node %s to cluster snapshot: %v", pod.Name, pod.Spec.NodeName, err)
-return errors.ToAutoscalerError(errors.InternalError, err)
+return caerrors.ToAutoscalerError(caerrors.InternalError, err)
}
}
}
return nil
}

// RunOnce iterates over node groups and scales them up/down if necessary
-func (a *StaticAutoscaler) RunOnce(currentTime time.Time) errors.AutoscalerError {
+func (a *StaticAutoscaler) RunOnce(currentTime time.Time) caerrors.AutoscalerError {
a.cleanUpIfRequired()
a.processorCallbacks.reset()
a.clusterStateRegistry.PeriodicCleanup()
@@ -287,7 +288,7 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) errors.AutoscalerError
originalScheduledPods, err := scheduledPodLister.List()
if err != nil {
klog.Errorf("Failed to list scheduled pods: %v", err)
-return errors.ToAutoscalerError(errors.ApiCallError, err)
+return caerrors.ToAutoscalerError(caerrors.ApiCallError, err)
}

if abortLoop, err := a.processors.ActionableClusterProcessor.ShouldAbort(
@@ -303,7 +304,7 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) errors.AutoscalerError
daemonsets, err := a.ListerRegistry.DaemonSetLister().List(labels.Everything())
if err != nil {
klog.Errorf("Failed to get daemonset list: %v", err)
-return errors.ToAutoscalerError(errors.ApiCallError, err)
+return caerrors.ToAutoscalerError(caerrors.ApiCallError, err)
}

// Call CloudProvider.Refresh before any other calls to cloud provider.
@@ -312,7 +313,7 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) errors.AutoscalerError
metrics.UpdateDurationFromStart(metrics.CloudProviderRefresh, refreshStart)
if err != nil {
klog.Errorf("Failed to refresh cloud provider config: %v", err)
-return errors.ToAutoscalerError(errors.CloudProviderError, err)
+return caerrors.ToAutoscalerError(caerrors.CloudProviderError, err)
}

// Update node groups min/max and maximum number of nodes being set for all node groups after cloud provider refresh
@@ -344,7 +345,7 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) errors.AutoscalerError
nodeInfosForGroups, err = a.processors.NodeInfoProcessor.Process(autoscalingContext, nodeInfosForGroups)
if err != nil {
klog.Errorf("Failed to process nodeInfos: %v", err)
-return errors.ToAutoscalerError(errors.InternalError, err)
+return caerrors.ToAutoscalerError(caerrors.InternalError, err)
}

if typedErr := a.updateClusterState(allNodes, nodeInfosForGroups, currentTime); typedErr != nil {
@@ -421,7 +422,7 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) errors.AutoscalerError
fixedSomething, err := fixNodeGroupSize(autoscalingContext, a.clusterStateRegistry, currentTime)
if err != nil {
klog.Errorf("Failed to fix node group sizes: %v", err)
-return errors.ToAutoscalerError(errors.CloudProviderError, err)
+return caerrors.ToAutoscalerError(caerrors.CloudProviderError, err)
}
if fixedSomething {
klog.V(0).Infof("Some node group target size was fixed, skipping the iteration")
@@ -433,7 +434,7 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) errors.AutoscalerError
unschedulablePods, err := unschedulablePodLister.List()
if err != nil {
klog.Errorf("Failed to list unscheduled pods: %v", err)
-return errors.ToAutoscalerError(errors.ApiCallError, err)
+return caerrors.ToAutoscalerError(caerrors.ApiCallError, err)
}
metrics.UpdateUnschedulablePodsCount(len(unschedulablePods))

@@ -450,21 +451,45 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) errors.AutoscalerError
for _, p := range unschedulableWaitingForLowerPriorityPreemption {
if err := a.ClusterSnapshot.AddPod(p, p.Status.NominatedNodeName); err != nil {
klog.Errorf("Failed to update snapshot with pod %s waiting for preemption", err)
-return errors.ToAutoscalerError(errors.InternalError, err)
+return caerrors.ToAutoscalerError(caerrors.InternalError, err)
}
}

-// add upcoming nodes to ClusterSnapshot
-upcomingNodes := getUpcomingNodeInfos(a.clusterStateRegistry, nodeInfosForGroups)
-for _, upcomingNode := range upcomingNodes {
+// Upcoming nodes are recently created nodes that haven't registered in the cluster yet, or haven't become ready yet.
+upcomingCounts, registeredUpcoming := a.clusterStateRegistry.GetUpcomingNodes()
+// For each upcoming node we inject a placeholder node faked to appear ready into the cluster snapshot, so that we can pack unschedulable pods
+// onto them and not trigger another scale-up.
+// The fake nodes are intentionally not added to the all nodes list, so that they are not considered as candidates for scale-down (which
+// wouldn't make sense, as they aren't real).
+for _, upcomingNode := range getUpcomingNodeInfos(upcomingCounts, nodeInfosForGroups) {
var pods []*apiv1.Pod
for _, podInfo := range upcomingNode.Pods {
pods = append(pods, podInfo.Pod)
}
err = a.ClusterSnapshot.AddNodeWithPods(upcomingNode.Node(), pods)
if err != nil {
klog.Errorf("Failed to add upcoming node %s to cluster snapshot: %v", upcomingNode.Node().Name, err)
-return errors.ToAutoscalerError(errors.InternalError, err)
+return caerrors.ToAutoscalerError(caerrors.InternalError, err)
}
}
+// Some upcoming nodes can already be registered in the cluster, but not yet ready - we still inject replacements for them above. The actual registered nodes
+// have to be filtered out of the all nodes list so that scale-down can't consider them as candidates. Otherwise, with aggressive scale-down settings, we
+// could be removing the nodes before they have a chance to first become ready (the duration of which should be unrelated to the scale-down settings).
+var allRegisteredUpcoming []string
+for _, ngRegisteredUpcoming := range registeredUpcoming {
+allRegisteredUpcoming = append(allRegisteredUpcoming, ngRegisteredUpcoming...)
+}
+allNodes = subtractNodesByName(allNodes, allRegisteredUpcoming)
+// Remove the nodes from the snapshot as well so that the state is consistent.
+for _, notStartedNodeName := range allRegisteredUpcoming {
+err := a.ClusterSnapshot.RemoveNode(notStartedNodeName)
+if err != nil {
+klog.Errorf("Failed to remove NotStarted node %s from cluster snapshot: %v", notStartedNodeName, err)
+// ErrNodeNotFound shouldn't happen (which is why it's logged above if it does), but what we care about here is that the
+// node is not in the snapshot - so we don't have to error out in that case.
+if !errors.Is(err, clustersnapshot.ErrNodeNotFound) {
+return caerrors.ToAutoscalerError(caerrors.InternalError, err)
+}
+}
+}
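The errors.Is check above is the standard Go sentinel-error pattern: tolerate exactly one expected failure mode and propagate everything else. A stripped-down sketch, assuming clustersnapshot.ErrNodeNotFound is an exported sentinel error value (as the code above implies):

if err := snapshot.RemoveNode(name); err != nil {
	if !errors.Is(err, clustersnapshot.ErrNodeNotFound) {
		return err // a real failure
	}
	// The node is already absent, which is the state we wanted anyway.
}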

Expand All @@ -486,7 +511,7 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) errors.AutoscalerError
return scaleUpStart
}

-postScaleUp := func(scaleUpStart time.Time) (bool, errors.AutoscalerError) {
+postScaleUp := func(scaleUpStart time.Time) (bool, caerrors.AutoscalerError) {
metrics.UpdateDurationFromStart(metrics.ScaleUp, scaleUpStart)

if a.processors != nil && a.processors.ScaleUpStatusProcessor != nil {
@@ -534,7 +559,7 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) errors.AutoscalerError
if err != nil {
scaleDownStatus.Result = scaledownstatus.ScaleDownError
klog.Errorf("Failed to list pod disruption budgets: %v", err)
-return errors.ToAutoscalerError(errors.ApiCallError, err)
+return caerrors.ToAutoscalerError(caerrors.ApiCallError, err)
}

unneededStart := time.Now()
@@ -554,7 +579,7 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) errors.AutoscalerError
scaleDownCandidates = allNodes
podDestinations = allNodes
} else {
-var err errors.AutoscalerError
+var err caerrors.AutoscalerError
scaleDownCandidates, err = a.processors.ScaleDownNodeProcessor.GetScaleDownCandidates(
autoscalingContext, allNodes)
if err != nil {
@@ -827,16 +852,16 @@ func (a *StaticAutoscaler) ExitCleanUp() {
a.clusterStateRegistry.Stop()
}

-func (a *StaticAutoscaler) obtainNodeLists(cp cloudprovider.CloudProvider) ([]*apiv1.Node, []*apiv1.Node, errors.AutoscalerError) {
+func (a *StaticAutoscaler) obtainNodeLists(cp cloudprovider.CloudProvider) ([]*apiv1.Node, []*apiv1.Node, caerrors.AutoscalerError) {
allNodes, err := a.AllNodeLister().List()
if err != nil {
klog.Errorf("Failed to list all nodes: %v", err)
-return nil, nil, errors.ToAutoscalerError(errors.ApiCallError, err)
+return nil, nil, caerrors.ToAutoscalerError(caerrors.ApiCallError, err)
}
readyNodes, err := a.ReadyNodeLister().List()
if err != nil {
klog.Errorf("Failed to list ready nodes: %v", err)
return nil, nil, errors.ToAutoscalerError(errors.ApiCallError, err)
return nil, nil, caerrors.ToAutoscalerError(caerrors.ApiCallError, err)
}

// Handle GPU case - allocatable GPU may be equal to 0 up to 15 minutes after
@@ -849,12 +874,12 @@ func (a *StaticAutoscaler) obtainNodeLists(cp cloudprovider.CloudProvider) ([]*a
return allNodes, readyNodes, nil
}

-func (a *StaticAutoscaler) updateClusterState(allNodes []*apiv1.Node, nodeInfosForGroups map[string]*schedulerframework.NodeInfo, currentTime time.Time) errors.AutoscalerError {
+func (a *StaticAutoscaler) updateClusterState(allNodes []*apiv1.Node, nodeInfosForGroups map[string]*schedulerframework.NodeInfo, currentTime time.Time) caerrors.AutoscalerError {
err := a.clusterStateRegistry.UpdateNodes(allNodes, nodeInfosForGroups, currentTime)
if err != nil {
klog.Errorf("Failed to update node registry: %v", err)
a.scaleDownPlanner.CleanUpUnneededNodes()
-return errors.ToAutoscalerError(errors.CloudProviderError, err)
+return caerrors.ToAutoscalerError(caerrors.CloudProviderError, err)
}
core_utils.UpdateClusterStateMetrics(a.clusterStateRegistry)

@@ -869,9 +894,9 @@ func allPodsAreNew(pods []*apiv1.Pod, currentTime time.Time) bool {
return found && oldest.Add(unschedulablePodWithGpuTimeBuffer).After(currentTime)
}

-func getUpcomingNodeInfos(registry *clusterstate.ClusterStateRegistry, nodeInfos map[string]*schedulerframework.NodeInfo) []*schedulerframework.NodeInfo {
+func getUpcomingNodeInfos(upcomingCounts map[string]int, nodeInfos map[string]*schedulerframework.NodeInfo) []*schedulerframework.NodeInfo {
upcomingNodes := make([]*schedulerframework.NodeInfo, 0)
-for nodeGroup, numberOfNodes := range registry.GetUpcomingNodes() {
+for nodeGroup, numberOfNodes := range upcomingCounts {
nodeTemplate, found := nodeInfos[nodeGroup]
if !found {
klog.Warningf("Couldn't find template for node group %s", nodeGroup)
@@ -921,17 +946,29 @@ func countsByReason(nodes []*simulator.UnremovableNode) map[simulator.Unremovabl
return counts
}

-func subtractNodes(a []*apiv1.Node, b []*apiv1.Node) []*apiv1.Node {
+func subtractNodesByName(nodes []*apiv1.Node, namesToRemove []string) []*apiv1.Node {
var c []*apiv1.Node
-namesToDrop := make(map[string]bool)
-for _, n := range b {
-namesToDrop[n.Name] = true
+removeSet := make(map[string]bool)
+for _, name := range namesToRemove {
+removeSet[name] = true
}
-for _, n := range a {
-if namesToDrop[n.Name] {
+for _, n := range nodes {
+if removeSet[n.Name] {
continue
}
c = append(c, n)
}
return c
}

+func subtractNodes(a []*apiv1.Node, b []*apiv1.Node) []*apiv1.Node {
+return subtractNodesByName(a, nodeNames(b))
+}
+
+func nodeNames(ns []*apiv1.Node) []string {
+names := make([]string, len(ns))
+for i, node := range ns {
+names[i] = node.Name
+}
+return names
+}
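A hypothetical use of the refactored helper (node objects invented; assumes the usual apiv1/metav1 imports). The removeSet map keeps the filter at O(len(nodes) + len(namesToRemove)) rather than quadratic:

nodes := []*apiv1.Node{
	{ObjectMeta: metav1.ObjectMeta{Name: "ng1-ready"}},
	{ObjectMeta: metav1.ObjectMeta{Name: "ng1-booting"}},
}
kept := subtractNodesByName(nodes, []string{"ng1-booting"})
// kept contains only "ng1-ready"; the input slice is not mutated.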