From 6978ff8829b682dc0a2a26b686ef99b9252fb8d8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kuba=20Tu=C5=BCnik?= Date: Fri, 3 Feb 2023 17:40:09 +0100 Subject: [PATCH 1/2] CA: Make CSR's Readiness keep lists of node names instead of just their count This does make us call len() in a bunch of places within CSR, but allows for greater flexibility - it's possible to act on the sets of nodes determined by Readiness. --- .../clusterstate/clusterstate.go | 111 +++++++++--------- .../clusterstate/clusterstate_test.go | 22 ++-- cluster-autoscaler/core/utils/utils.go | 2 +- 3 files changed, 66 insertions(+), 69 deletions(-) diff --git a/cluster-autoscaler/clusterstate/clusterstate.go b/cluster-autoscaler/clusterstate/clusterstate.go index 2141188eb03a..d3ec72f8536a 100644 --- a/cluster-autoscaler/clusterstate/clusterstate.go +++ b/cluster-autoscaler/clusterstate/clusterstate.go @@ -354,7 +354,7 @@ func (csr *ClusterStateRegistry) IsClusterHealthy() bool { csr.Lock() defer csr.Unlock() - totalUnready := csr.totalReadiness.Unready + totalUnready := len(csr.totalReadiness.Unready) if totalUnready > csr.config.OkTotalUnreadyCount && float64(totalUnready) > csr.config.MaxTotalUnreadyPercentage/100.0*float64(len(csr.nodes)) { @@ -384,14 +384,14 @@ func (csr *ClusterStateRegistry) IsNodeGroupHealthy(nodeGroupName string) bool { unjustifiedUnready := 0 // Too few nodes, something is missing. Below the expected node count. - if readiness.Ready < acceptable.MinNodes { - unjustifiedUnready += acceptable.MinNodes - readiness.Ready + if len(readiness.Ready) < acceptable.MinNodes { + unjustifiedUnready += acceptable.MinNodes - len(readiness.Ready) } // TODO: verify against max nodes as well. if unjustifiedUnready > csr.config.OkTotalUnreadyCount && float64(unjustifiedUnready) > csr.config.MaxTotalUnreadyPercentage/100.0* - float64(readiness.Ready+readiness.Unready+readiness.NotStarted) { + float64(len(readiness.Ready)+len(readiness.Unready)+len(readiness.NotStarted)) { return false } @@ -444,7 +444,7 @@ func (csr *ClusterStateRegistry) getProvisionedAndTargetSizesForNodeGroup(nodeGr } return 0, target, true } - provisioned = readiness.Registered - readiness.NotStarted + provisioned = len(readiness.Registered) - len(readiness.NotStarted) return provisioned, target, true } @@ -496,7 +496,7 @@ func (csr *ClusterStateRegistry) updateAcceptableRanges(targetSize map[string]in size := targetSize[nodeGroup.Id()] readiness := csr.perNodeGroupReadiness[nodeGroup.Id()] result[nodeGroup.Id()] = AcceptableRange{ - MinNodes: size - readiness.LongUnregistered, + MinNodes: size - len(readiness.LongUnregistered), MaxNodes: size, CurrentTarget: size, } @@ -516,46 +516,45 @@ func (csr *ClusterStateRegistry) updateAcceptableRanges(targetSize map[string]in // Readiness contains readiness information about a group of nodes. type Readiness struct { - // Number of ready nodes. - Ready int - // Number of unready nodes that broke down after they started. - Unready int - // Number of nodes that are being currently deleted. They exist in K8S but + // Names of ready nodes. + Ready []string + // Names of unready nodes that broke down after they started. + Unready []string + // Names of nodes that are being currently deleted. They exist in K8S but // are not included in NodeGroup.TargetSize(). - Deleted int - // Number of nodes that are not yet fully started. - NotStarted int - // Number of all registered nodes in the group (ready/unready/deleted/etc). - Registered int - // Number of nodes that failed to register within a reasonable limit. 
- LongUnregistered int - // Number of nodes that haven't yet registered. - Unregistered int + Deleted []string + // Names of nodes that are not yet fully started. + NotStarted []string + // Names of all registered nodes in the group (ready/unready/deleted/etc). + Registered []string + // Names of nodes that failed to register within a reasonable limit. + LongUnregistered []string + // Names of nodes that haven't yet registered. + Unregistered []string // Time when the readiness was measured. Time time.Time - // Number of nodes that are Unready due to missing resources. + // Names of nodes that are Unready due to missing resources. // This field is only used for exposing information externally and // doesn't influence CA behavior. - ResourceUnready int + ResourceUnready []string } func (csr *ClusterStateRegistry) updateReadinessStats(currentTime time.Time) { - perNodeGroup := make(map[string]Readiness) total := Readiness{Time: currentTime} update := func(current Readiness, node *apiv1.Node, nr kube_util.NodeReadiness) Readiness { - current.Registered++ + current.Registered = append(current.Registered, node.Name) if _, isDeleted := csr.deletedNodes[node.Name]; isDeleted { - current.Deleted++ + current.Deleted = append(current.Deleted, node.Name) } else if nr.Ready { - current.Ready++ + current.Ready = append(current.Ready, node.Name) } else if node.CreationTimestamp.Time.Add(MaxNodeStartupTime).After(currentTime) { - current.NotStarted++ + current.NotStarted = append(current.NotStarted, node.Name) } else { - current.Unready++ + current.Unready = append(current.Unready, node.Name) if nr.Reason == kube_util.ResourceUnready { - current.ResourceUnready++ + current.ResourceUnready = append(current.ResourceUnready, node.Name) } } return current @@ -579,7 +578,6 @@ func (csr *ClusterStateRegistry) updateReadinessStats(currentTime time.Time) { total = update(total, node, nr) } - var longUnregisteredNodeNames []string for _, unregistered := range csr.unregisteredNodes { nodeGroup, errNg := csr.cloudProvider.NodeGroupForNode(unregistered.Node) if errNg != nil { @@ -592,17 +590,16 @@ func (csr *ClusterStateRegistry) updateReadinessStats(currentTime time.Time) { } perNgCopy := perNodeGroup[nodeGroup.Id()] if unregistered.UnregisteredSince.Add(csr.config.MaxNodeProvisionTime).Before(currentTime) { - longUnregisteredNodeNames = append(longUnregisteredNodeNames, unregistered.Node.Name) - perNgCopy.LongUnregistered++ - total.LongUnregistered++ + perNgCopy.LongUnregistered = append(perNgCopy.LongUnregistered, unregistered.Node.Name) + total.LongUnregistered = append(total.LongUnregistered, unregistered.Node.Name) } else { - perNgCopy.Unregistered++ - total.Unregistered++ + perNgCopy.Unregistered = append(perNgCopy.Unregistered, unregistered.Node.Name) + total.Unregistered = append(total.Unregistered, unregistered.Node.Name) } perNodeGroup[nodeGroup.Id()] = perNgCopy } - if total.LongUnregistered > 0 { - klog.V(3).Infof("Found longUnregistered Nodes %s", longUnregisteredNodeNames) + if len(total.LongUnregistered) > 0 { + klog.V(3).Infof("Found longUnregistered Nodes %s", total.LongUnregistered) } for ngId, ngReadiness := range perNodeGroup { @@ -630,10 +627,10 @@ func (csr *ClusterStateRegistry) updateIncorrectNodeGroupSizes(currentTime time. 
} continue } - if readiness.Registered > acceptableRange.MaxNodes || - readiness.Registered < acceptableRange.MinNodes { + if len(readiness.Registered) > acceptableRange.MaxNodes || + len(readiness.Registered) < acceptableRange.MinNodes { incorrect := IncorrectNodeGroupSize{ - CurrentSize: readiness.Registered, + CurrentSize: len(readiness.Registered), ExpectedSize: acceptableRange.CurrentTarget, FirstObserved: currentTime, } @@ -752,12 +749,12 @@ func buildHealthStatusNodeGroup(isReady bool, readiness Readiness, acceptable Ac condition := api.ClusterAutoscalerCondition{ Type: api.ClusterAutoscalerHealth, Message: fmt.Sprintf("ready=%d unready=%d (resourceUnready=%d) notStarted=%d longNotStarted=0 registered=%d longUnregistered=%d cloudProviderTarget=%d (minSize=%d, maxSize=%d)", - readiness.Ready, - readiness.Unready, - readiness.ResourceUnready, - readiness.NotStarted, - readiness.Registered, - readiness.LongUnregistered, + len(readiness.Ready), + len(readiness.Unready), + len(readiness.ResourceUnready), + len(readiness.NotStarted), + len(readiness.Registered), + len(readiness.LongUnregistered), acceptable.CurrentTarget, minSize, maxSize), @@ -775,7 +772,7 @@ func buildScaleUpStatusNodeGroup(isScaleUpInProgress bool, isSafeToScaleUp bool, condition := api.ClusterAutoscalerCondition{ Type: api.ClusterAutoscalerScaleUp, Message: fmt.Sprintf("ready=%d cloudProviderTarget=%d", - readiness.Ready, + len(readiness.Ready), acceptable.CurrentTarget), LastProbeTime: metav1.Time{Time: readiness.Time}, } @@ -807,12 +804,12 @@ func buildHealthStatusClusterwide(isReady bool, readiness Readiness) api.Cluster condition := api.ClusterAutoscalerCondition{ Type: api.ClusterAutoscalerHealth, Message: fmt.Sprintf("ready=%d unready=%d (resourceUnready=%d) notStarted=%d longNotStarted=0 registered=%d longUnregistered=%d", - readiness.Ready, - readiness.Unready, - readiness.ResourceUnready, - readiness.NotStarted, - readiness.Registered, - readiness.LongUnregistered, + len(readiness.Ready), + len(readiness.Unready), + len(readiness.ResourceUnready), + len(readiness.NotStarted), + len(readiness.Registered), + len(readiness.LongUnregistered), ), LastProbeTime: metav1.Time{Time: readiness.Time}, } @@ -838,8 +835,8 @@ func buildScaleUpStatusClusterwide(nodeGroupStatuses []api.NodeGroupStatus, read condition := api.ClusterAutoscalerCondition{ Type: api.ClusterAutoscalerScaleUp, Message: fmt.Sprintf("ready=%d registered=%d", - readiness.Ready, - readiness.Registered), + len(readiness.Ready), + len(readiness.Registered)), LastProbeTime: metav1.Time{Time: readiness.Time}, } if isScaleUpInProgress { @@ -930,7 +927,7 @@ func (csr *ClusterStateRegistry) GetUpcomingNodes() map[string]int { readiness := csr.perNodeGroupReadiness[id] ar := csr.acceptableRanges[id] // newNodes is the number of nodes that - newNodes := ar.CurrentTarget - (readiness.Ready + readiness.Unready + readiness.LongUnregistered) + newNodes := ar.CurrentTarget - (len(readiness.Ready) + len(readiness.Unready) + len(readiness.LongUnregistered)) if newNodes <= 0 { // Negative value is unlikely but theoretically possible. 
continue @@ -1003,7 +1000,7 @@ func (csr *ClusterStateRegistry) GetAutoscaledNodesCount() (currentSize, targetS targetSize += accRange.CurrentTarget } for _, readiness := range csr.perNodeGroupReadiness { - currentSize += readiness.Registered - readiness.NotStarted + currentSize += len(readiness.Registered) - len(readiness.NotStarted) } return currentSize, targetSize } diff --git a/cluster-autoscaler/clusterstate/clusterstate_test.go b/cluster-autoscaler/clusterstate/clusterstate_test.go index 8587c32b6dc7..c8abc094b1c8 100644 --- a/cluster-autoscaler/clusterstate/clusterstate_test.go +++ b/cluster-autoscaler/clusterstate/clusterstate_test.go @@ -359,8 +359,8 @@ func TestUnreadyLongAfterCreation(t *testing.T) { }, fakeLogRecorder, newBackoff()) err := clusterstate.UpdateNodes([]*apiv1.Node{ng1_1, ng2_1}, nil, now) assert.NoError(t, err) - assert.Equal(t, 1, clusterstate.GetClusterReadiness().Unready) - assert.Equal(t, 0, clusterstate.GetClusterReadiness().NotStarted) + assert.Equal(t, 1, len(clusterstate.GetClusterReadiness().Unready)) + assert.Equal(t, 0, len(clusterstate.GetClusterReadiness().NotStarted)) upcoming := clusterstate.GetUpcomingNodes() assert.Equal(t, 0, upcoming["ng1"]) } @@ -390,22 +390,22 @@ func TestNotStarted(t *testing.T) { }, fakeLogRecorder, newBackoff()) err := clusterstate.UpdateNodes([]*apiv1.Node{ng1_1, ng2_1}, nil, now) assert.NoError(t, err) - assert.Equal(t, 1, clusterstate.GetClusterReadiness().NotStarted) - assert.Equal(t, 1, clusterstate.GetClusterReadiness().Ready) + assert.Equal(t, 1, len(clusterstate.GetClusterReadiness().NotStarted)) + assert.Equal(t, 1, len(clusterstate.GetClusterReadiness().Ready)) // node ng2_1 moves condition to ready SetNodeReadyState(ng2_1, true, now.Add(-4*time.Minute)) err = clusterstate.UpdateNodes([]*apiv1.Node{ng1_1, ng2_1}, nil, now) assert.NoError(t, err) - assert.Equal(t, 1, clusterstate.GetClusterReadiness().NotStarted) - assert.Equal(t, 1, clusterstate.GetClusterReadiness().Ready) + assert.Equal(t, 1, len(clusterstate.GetClusterReadiness().NotStarted)) + assert.Equal(t, 1, len(clusterstate.GetClusterReadiness().Ready)) // node ng2_1 no longer has the taint RemoveNodeNotReadyTaint(ng2_1) err = clusterstate.UpdateNodes([]*apiv1.Node{ng1_1, ng2_1}, nil, now) assert.NoError(t, err) - assert.Equal(t, 0, clusterstate.GetClusterReadiness().NotStarted) - assert.Equal(t, 2, clusterstate.GetClusterReadiness().Ready) + assert.Equal(t, 0, len(clusterstate.GetClusterReadiness().NotStarted)) + assert.Equal(t, 2, len(clusterstate.GetClusterReadiness().Ready)) } func TestExpiredScaleUp(t *testing.T) { @@ -686,7 +686,7 @@ func TestCloudProviderDeletedNodes(t *testing.T) { assert.NoError(t, err) assert.Equal(t, 1, len(GetCloudProviderDeletedNodeNames(clusterstate))) assert.Equal(t, "ng1-2", GetCloudProviderDeletedNodeNames(clusterstate)[0]) - assert.Equal(t, 1, clusterstate.GetClusterReadiness().Deleted) + assert.Equal(t, 1, len(clusterstate.GetClusterReadiness().Deleted)) // The node is removed from Kubernetes now.Add(time.Minute) @@ -719,7 +719,7 @@ func TestCloudProviderDeletedNodes(t *testing.T) { assert.NoError(t, err) assert.Equal(t, 1, len(GetCloudProviderDeletedNodeNames(clusterstate))) assert.Equal(t, "ng1-3", GetCloudProviderDeletedNodeNames(clusterstate)[0]) - assert.Equal(t, 1, clusterstate.GetClusterReadiness().Deleted) + assert.Equal(t, 1, len(clusterstate.GetClusterReadiness().Deleted)) // Confirm that previously identified deleted Cloud Provider nodes are still included // until it is removed from Kubernetes @@ -729,7 
+729,7 @@ func TestCloudProviderDeletedNodes(t *testing.T) { assert.NoError(t, err) assert.Equal(t, 1, len(GetCloudProviderDeletedNodeNames(clusterstate))) assert.Equal(t, "ng1-3", GetCloudProviderDeletedNodeNames(clusterstate)[0]) - assert.Equal(t, 1, clusterstate.GetClusterReadiness().Deleted) + assert.Equal(t, 1, len(clusterstate.GetClusterReadiness().Deleted)) // The node is removed from Kubernetes now.Add(time.Minute) diff --git a/cluster-autoscaler/core/utils/utils.go b/cluster-autoscaler/core/utils/utils.go index d4c3b93d87e9..5f35692acf1e 100644 --- a/cluster-autoscaler/core/utils/utils.go +++ b/cluster-autoscaler/core/utils/utils.go @@ -183,7 +183,7 @@ func UpdateClusterStateMetrics(csr *clusterstate.ClusterStateRegistry) { } metrics.UpdateClusterSafeToAutoscale(csr.IsClusterHealthy()) readiness := csr.GetClusterReadiness() - metrics.UpdateNodesCount(readiness.Ready, readiness.Unready, readiness.NotStarted, readiness.LongUnregistered, readiness.Unregistered) + metrics.UpdateNodesCount(len(readiness.Ready), len(readiness.Unready), len(readiness.NotStarted), len(readiness.LongUnregistered), len(readiness.Unregistered)) } // GetOldestCreateTime returns oldest creation time out of the pods in the set From 7e6762535b0e46f24c5a4a3c5495dda772604920 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kuba=20Tu=C5=BCnik?= Date: Wed, 8 Feb 2023 12:04:07 +0100 Subject: [PATCH 2/2] CA: stop passing registered upcoming nodes as scale-down candidates Without this, with aggressive settings, scale-down could be removing registered upcoming nodes before they have a chance to become ready (the duration of which should be unrelated to the scale-down settings). --- .../clusterstate/clusterstate.go | 19 +- .../clusterstate/clusterstate_test.go | 19 +- cluster-autoscaler/core/scale_up.go | 3 +- cluster-autoscaler/core/static_autoscaler.go | 101 ++++++---- .../core/static_autoscaler_test.go | 175 +++++++++++++++++- .../simulator/clustersnapshot/basic.go | 8 +- .../clustersnapshot/clustersnapshot.go | 3 +- .../simulator/clustersnapshot/delta.go | 10 +- 8 files changed, 276 insertions(+), 62 deletions(-) diff --git a/cluster-autoscaler/clusterstate/clusterstate.go b/cluster-autoscaler/clusterstate/clusterstate.go index d3ec72f8536a..56d14c4d64a8 100644 --- a/cluster-autoscaler/clusterstate/clusterstate.go +++ b/cluster-autoscaler/clusterstate/clusterstate.go @@ -916,12 +916,14 @@ func (csr *ClusterStateRegistry) GetIncorrectNodeGroupSize(nodeGroupName string) } // GetUpcomingNodes returns how many new nodes will be added shortly to the node groups or should become ready soon. -// The function may overestimate the number of nodes. -func (csr *ClusterStateRegistry) GetUpcomingNodes() map[string]int { +// The function may overestimate the number of nodes. The second return value contains the names of upcoming nodes +// that are already registered in the cluster. +func (csr *ClusterStateRegistry) GetUpcomingNodes() (upcomingCounts map[string]int, registeredNodeNames map[string][]string) { csr.Lock() defer csr.Unlock() - result := make(map[string]int) + upcomingCounts = map[string]int{} + registeredNodeNames = map[string][]string{} for _, nodeGroup := range csr.cloudProvider.NodeGroups() { id := nodeGroup.Id() readiness := csr.perNodeGroupReadiness[id] @@ -932,9 +934,14 @@ func (csr *ClusterStateRegistry) GetUpcomingNodes() map[string]int { // Negative value is unlikely but theoretically possible. 
continue } - result[id] = newNodes - } - return result + upcomingCounts[id] = newNodes + // newNodes should include instances that have registered with k8s but aren't ready yet, instances that came up on the cloud provider side + // but haven't registered with k8s yet, and instances that haven't even come up on the cloud provider side yet (but are reflected in the target + // size). The first category is categorized as NotStarted in readiness, the other two aren't registered with k8s, so they shouldn't be + // included. + registeredNodeNames[id] = readiness.NotStarted + } + return upcomingCounts, registeredNodeNames } // getCloudProviderNodeInstances returns map keyed on node group id where value is list of node instances diff --git a/cluster-autoscaler/clusterstate/clusterstate_test.go b/cluster-autoscaler/clusterstate/clusterstate_test.go index c8abc094b1c8..28aa89bd334c 100644 --- a/cluster-autoscaler/clusterstate/clusterstate_test.go +++ b/cluster-autoscaler/clusterstate/clusterstate_test.go @@ -361,8 +361,9 @@ func TestUnreadyLongAfterCreation(t *testing.T) { assert.NoError(t, err) assert.Equal(t, 1, len(clusterstate.GetClusterReadiness().Unready)) assert.Equal(t, 0, len(clusterstate.GetClusterReadiness().NotStarted)) - upcoming := clusterstate.GetUpcomingNodes() + upcoming, upcomingRegistered := clusterstate.GetUpcomingNodes() assert.Equal(t, 0, upcoming["ng1"]) + assert.Empty(t, upcomingRegistered["ng1"]) } func TestNotStarted(t *testing.T) { @@ -524,12 +525,17 @@ func TestUpcomingNodes(t *testing.T) { assert.NoError(t, err) assert.Empty(t, clusterstate.GetScaleUpFailures()) - upcomingNodes := clusterstate.GetUpcomingNodes() + upcomingNodes, upcomingRegistered := clusterstate.GetUpcomingNodes() assert.Equal(t, 6, upcomingNodes["ng1"]) + assert.Empty(t, upcomingRegistered["ng1"]) // Only unregistered. assert.Equal(t, 1, upcomingNodes["ng2"]) + assert.Empty(t, upcomingRegistered["ng2"]) // Only unregistered. assert.Equal(t, 2, upcomingNodes["ng3"]) + assert.Equal(t, []string{"ng3-1"}, upcomingRegistered["ng3"]) // 1 registered, 1 unregistered. assert.NotContains(t, upcomingNodes, "ng4") + assert.NotContains(t, upcomingRegistered, "ng4") assert.Equal(t, 0, upcomingNodes["ng5"]) + assert.Empty(t, upcomingRegistered["ng5"]) } func TestTaintBasedNodeDeletion(t *testing.T) { @@ -566,8 +572,9 @@ func TestTaintBasedNodeDeletion(t *testing.T) { assert.NoError(t, err) assert.Empty(t, clusterstate.GetScaleUpFailures()) - upcomingNodes := clusterstate.GetUpcomingNodes() + upcomingNodes, upcomingRegistered := clusterstate.GetUpcomingNodes() assert.Equal(t, 1, upcomingNodes["ng1"]) + assert.Empty(t, upcomingRegistered["ng1"]) // Only unregistered. } func TestIncorrectSize(t *testing.T) { @@ -624,8 +631,9 @@ func TestUnregisteredNodes(t *testing.T) { assert.NoError(t, err) assert.Equal(t, 1, len(clusterstate.GetUnregisteredNodes())) assert.Equal(t, "ng1-2", clusterstate.GetUnregisteredNodes()[0].Node.Name) - upcomingNodes := clusterstate.GetUpcomingNodes() + upcomingNodes, upcomingRegistered := clusterstate.GetUpcomingNodes() assert.Equal(t, 1, upcomingNodes["ng1"]) + assert.Empty(t, upcomingRegistered["ng1"]) // Unregistered only. 
// The node didn't come up in MaxNodeProvisionTime, it should no longer be // counted as upcoming (but it is still an unregistered node) @@ -633,8 +641,9 @@ func TestUnregisteredNodes(t *testing.T) { assert.NoError(t, err) assert.Equal(t, 1, len(clusterstate.GetUnregisteredNodes())) assert.Equal(t, "ng1-2", clusterstate.GetUnregisteredNodes()[0].Node.Name) - upcomingNodes = clusterstate.GetUpcomingNodes() + upcomingNodes, upcomingRegistered = clusterstate.GetUpcomingNodes() assert.Equal(t, 0, len(upcomingNodes)) + assert.Empty(t, upcomingRegistered["ng1"]) err = clusterstate.UpdateNodes([]*apiv1.Node{ng1_1, ng1_2}, nil, time.Now().Add(time.Minute)) assert.NoError(t, err) diff --git a/cluster-autoscaler/core/scale_up.go b/cluster-autoscaler/core/scale_up.go index 173d4048db0f..a68c35e8d37a 100644 --- a/cluster-autoscaler/core/scale_up.go +++ b/cluster-autoscaler/core/scale_up.go @@ -178,8 +178,9 @@ func ScaleUp(context *context.AutoscalingContext, processors *ca_processors.Auto klogx.V(1).Over(loggingQuota).Infof("%v other pods are also unschedulable", -loggingQuota.Left()) podEquivalenceGroups := buildPodEquivalenceGroups(unschedulablePods) + upcomingCounts, _ := clusterStateRegistry.GetUpcomingNodes() upcomingNodes := make([]*schedulerframework.NodeInfo, 0) - for nodeGroup, numberOfNodes := range clusterStateRegistry.GetUpcomingNodes() { + for nodeGroup, numberOfNodes := range upcomingCounts { nodeTemplate, found := nodeInfos[nodeGroup] if !found { return scaleUpError(&status.ScaleUpStatus{}, errors.NewAutoscalerError( diff --git a/cluster-autoscaler/core/static_autoscaler.go b/cluster-autoscaler/core/static_autoscaler.go index 08870a1d8573..0bc2f1d501a7 100644 --- a/cluster-autoscaler/core/static_autoscaler.go +++ b/cluster-autoscaler/core/static_autoscaler.go @@ -17,6 +17,7 @@ limitations under the License. 
package core import ( + "errors" "fmt" "reflect" "time" @@ -49,7 +50,7 @@ import ( "k8s.io/autoscaler/cluster-autoscaler/simulator/predicatechecker" "k8s.io/autoscaler/cluster-autoscaler/utils/backoff" "k8s.io/autoscaler/cluster-autoscaler/utils/deletetaint" - "k8s.io/autoscaler/cluster-autoscaler/utils/errors" + caerrors "k8s.io/autoscaler/cluster-autoscaler/utils/errors" scheduler_utils "k8s.io/autoscaler/cluster-autoscaler/utils/scheduler" "k8s.io/autoscaler/cluster-autoscaler/utils/taints" "k8s.io/autoscaler/cluster-autoscaler/utils/tpu" @@ -239,14 +240,14 @@ func (a *StaticAutoscaler) cleanUpIfRequired() { a.initialized = true } -func (a *StaticAutoscaler) initializeClusterSnapshot(nodes []*apiv1.Node, scheduledPods []*apiv1.Pod) errors.AutoscalerError { +func (a *StaticAutoscaler) initializeClusterSnapshot(nodes []*apiv1.Node, scheduledPods []*apiv1.Pod) caerrors.AutoscalerError { a.ClusterSnapshot.Clear() knownNodes := make(map[string]bool) for _, node := range nodes { if err := a.ClusterSnapshot.AddNode(node); err != nil { klog.Errorf("Failed to add node %s to cluster snapshot: %v", node.Name, err) - return errors.ToAutoscalerError(errors.InternalError, err) + return caerrors.ToAutoscalerError(caerrors.InternalError, err) } knownNodes[node.Name] = true } @@ -254,7 +255,7 @@ func (a *StaticAutoscaler) initializeClusterSnapshot(nodes []*apiv1.Node, schedu if knownNodes[pod.Spec.NodeName] { if err := a.ClusterSnapshot.AddPod(pod, pod.Spec.NodeName); err != nil { klog.Errorf("Failed to add pod %s scheduled to node %s to cluster snapshot: %v", pod.Name, pod.Spec.NodeName, err) - return errors.ToAutoscalerError(errors.InternalError, err) + return caerrors.ToAutoscalerError(caerrors.InternalError, err) } } } @@ -262,7 +263,7 @@ func (a *StaticAutoscaler) initializeClusterSnapshot(nodes []*apiv1.Node, schedu } // RunOnce iterates over node groups and scales them up/down if necessary -func (a *StaticAutoscaler) RunOnce(currentTime time.Time) errors.AutoscalerError { +func (a *StaticAutoscaler) RunOnce(currentTime time.Time) caerrors.AutoscalerError { a.cleanUpIfRequired() a.processorCallbacks.reset() a.clusterStateRegistry.PeriodicCleanup() @@ -287,7 +288,7 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) errors.AutoscalerError originalScheduledPods, err := scheduledPodLister.List() if err != nil { klog.Errorf("Failed to list scheduled pods: %v", err) - return errors.ToAutoscalerError(errors.ApiCallError, err) + return caerrors.ToAutoscalerError(caerrors.ApiCallError, err) } if abortLoop, err := a.processors.ActionableClusterProcessor.ShouldAbort( @@ -303,7 +304,7 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) errors.AutoscalerError daemonsets, err := a.ListerRegistry.DaemonSetLister().List(labels.Everything()) if err != nil { klog.Errorf("Failed to get daemonset list: %v", err) - return errors.ToAutoscalerError(errors.ApiCallError, err) + return caerrors.ToAutoscalerError(caerrors.ApiCallError, err) } // Call CloudProvider.Refresh before any other calls to cloud provider. 
@@ -312,7 +313,7 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) errors.AutoscalerError metrics.UpdateDurationFromStart(metrics.CloudProviderRefresh, refreshStart) if err != nil { klog.Errorf("Failed to refresh cloud provider config: %v", err) - return errors.ToAutoscalerError(errors.CloudProviderError, err) + return caerrors.ToAutoscalerError(caerrors.CloudProviderError, err) } // Update node groups min/max and maximum number of nodes being set for all node groups after cloud provider refresh @@ -344,7 +345,7 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) errors.AutoscalerError nodeInfosForGroups, err = a.processors.NodeInfoProcessor.Process(autoscalingContext, nodeInfosForGroups) if err != nil { klog.Errorf("Failed to process nodeInfos: %v", err) - return errors.ToAutoscalerError(errors.InternalError, err) + return caerrors.ToAutoscalerError(caerrors.InternalError, err) } if typedErr := a.updateClusterState(allNodes, nodeInfosForGroups, currentTime); typedErr != nil { @@ -421,7 +422,7 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) errors.AutoscalerError fixedSomething, err := fixNodeGroupSize(autoscalingContext, a.clusterStateRegistry, currentTime) if err != nil { klog.Errorf("Failed to fix node group sizes: %v", err) - return errors.ToAutoscalerError(errors.CloudProviderError, err) + return caerrors.ToAutoscalerError(caerrors.CloudProviderError, err) } if fixedSomething { klog.V(0).Infof("Some node group target size was fixed, skipping the iteration") @@ -433,7 +434,7 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) errors.AutoscalerError unschedulablePods, err := unschedulablePodLister.List() if err != nil { klog.Errorf("Failed to list unscheduled pods: %v", err) - return errors.ToAutoscalerError(errors.ApiCallError, err) + return caerrors.ToAutoscalerError(caerrors.ApiCallError, err) } metrics.UpdateUnschedulablePodsCount(len(unschedulablePods)) @@ -450,13 +451,17 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) errors.AutoscalerError for _, p := range unschedulableWaitingForLowerPriorityPreemption { if err := a.ClusterSnapshot.AddPod(p, p.Status.NominatedNodeName); err != nil { klog.Errorf("Failed to update snapshot with pod %s waiting for preemption", err) - return errors.ToAutoscalerError(errors.InternalError, err) + return caerrors.ToAutoscalerError(caerrors.InternalError, err) } } - // add upcoming nodes to ClusterSnapshot - upcomingNodes := getUpcomingNodeInfos(a.clusterStateRegistry, nodeInfosForGroups) - for _, upcomingNode := range upcomingNodes { + // Upcoming nodes are recently created nodes that haven't registered in the cluster yet, or haven't become ready yet. + upcomingCounts, registeredUpcoming := a.clusterStateRegistry.GetUpcomingNodes() + // For each upcoming node we inject a placeholder node faked to appear ready into the cluster snapshot, so that we can pack unschedulable pods on + // them and not trigger another scale-up. + // The fake nodes are intentionally not added to the all nodes list, so that they are not considered as candidates for scale-down (which + // doesn't make sense as they're not real). 
+ for _, upcomingNode := range getUpcomingNodeInfos(upcomingCounts, nodeInfosForGroups) { var pods []*apiv1.Pod for _, podInfo := range upcomingNode.Pods { pods = append(pods, podInfo.Pod) @@ -464,7 +469,27 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) errors.AutoscalerError err = a.ClusterSnapshot.AddNodeWithPods(upcomingNode.Node(), pods) if err != nil { klog.Errorf("Failed to add upcoming node %s to cluster snapshot: %v", upcomingNode.Node().Name, err) - return errors.ToAutoscalerError(errors.InternalError, err) + return caerrors.ToAutoscalerError(caerrors.InternalError, err) + } + } + // Some upcoming nodes can already be registered in the cluster, but not yet ready - we still inject replacements for them above. The actual registered nodes + // have to be filtered out of the all nodes list so that scale-down can't consider them as candidates. Otherwise, with aggressive scale-down settings, we + // could be removing the nodes before they have a chance to first become ready (the duration of which should be unrelated to the scale-down settings). + var allRegisteredUpcoming []string + for _, ngRegisteredUpcoming := range registeredUpcoming { + allRegisteredUpcoming = append(allRegisteredUpcoming, ngRegisteredUpcoming...) + } + allNodes = subtractNodesByName(allNodes, allRegisteredUpcoming) + // Remove the nodes from the snapshot as well so that the state is consistent. + for _, notStartedNodeName := range allRegisteredUpcoming { + err := a.ClusterSnapshot.RemoveNode(notStartedNodeName) + if err != nil { + klog.Errorf("Failed to remove NotStarted node %s from cluster snapshot: %v", notStartedNodeName, err) + // ErrNodeNotFound shouldn't happen (so it needs to be logged above if it does), but what we care about here is that the + // node is not in the snapshot - so we don't have to error out in that case. 
+ if !errors.Is(err, clustersnapshot.ErrNodeNotFound) { + return caerrors.ToAutoscalerError(caerrors.InternalError, err) + } } } @@ -486,7 +511,7 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) errors.AutoscalerError return scaleUpStart } - postScaleUp := func(scaleUpStart time.Time) (bool, errors.AutoscalerError) { + postScaleUp := func(scaleUpStart time.Time) (bool, caerrors.AutoscalerError) { metrics.UpdateDurationFromStart(metrics.ScaleUp, scaleUpStart) if a.processors != nil && a.processors.ScaleUpStatusProcessor != nil { @@ -534,7 +559,7 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) errors.AutoscalerError if err != nil { scaleDownStatus.Result = scaledownstatus.ScaleDownError klog.Errorf("Failed to list pod disruption budgets: %v", err) - return errors.ToAutoscalerError(errors.ApiCallError, err) + return caerrors.ToAutoscalerError(caerrors.ApiCallError, err) } unneededStart := time.Now() @@ -554,7 +579,7 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) errors.AutoscalerError scaleDownCandidates = allNodes podDestinations = allNodes } else { - var err errors.AutoscalerError + var err caerrors.AutoscalerError scaleDownCandidates, err = a.processors.ScaleDownNodeProcessor.GetScaleDownCandidates( autoscalingContext, allNodes) if err != nil { @@ -827,16 +852,16 @@ func (a *StaticAutoscaler) ExitCleanUp() { a.clusterStateRegistry.Stop() } -func (a *StaticAutoscaler) obtainNodeLists(cp cloudprovider.CloudProvider) ([]*apiv1.Node, []*apiv1.Node, errors.AutoscalerError) { +func (a *StaticAutoscaler) obtainNodeLists(cp cloudprovider.CloudProvider) ([]*apiv1.Node, []*apiv1.Node, caerrors.AutoscalerError) { allNodes, err := a.AllNodeLister().List() if err != nil { klog.Errorf("Failed to list all nodes: %v", err) - return nil, nil, errors.ToAutoscalerError(errors.ApiCallError, err) + return nil, nil, caerrors.ToAutoscalerError(caerrors.ApiCallError, err) } readyNodes, err := a.ReadyNodeLister().List() if err != nil { klog.Errorf("Failed to list ready nodes: %v", err) - return nil, nil, errors.ToAutoscalerError(errors.ApiCallError, err) + return nil, nil, caerrors.ToAutoscalerError(caerrors.ApiCallError, err) } // Handle GPU case - allocatable GPU may be equal to 0 up to 15 minutes after @@ -849,12 +874,12 @@ func (a *StaticAutoscaler) obtainNodeLists(cp cloudprovider.CloudProvider) ([]*a return allNodes, readyNodes, nil } -func (a *StaticAutoscaler) updateClusterState(allNodes []*apiv1.Node, nodeInfosForGroups map[string]*schedulerframework.NodeInfo, currentTime time.Time) errors.AutoscalerError { +func (a *StaticAutoscaler) updateClusterState(allNodes []*apiv1.Node, nodeInfosForGroups map[string]*schedulerframework.NodeInfo, currentTime time.Time) caerrors.AutoscalerError { err := a.clusterStateRegistry.UpdateNodes(allNodes, nodeInfosForGroups, currentTime) if err != nil { klog.Errorf("Failed to update node registry: %v", err) a.scaleDownPlanner.CleanUpUnneededNodes() - return errors.ToAutoscalerError(errors.CloudProviderError, err) + return caerrors.ToAutoscalerError(caerrors.CloudProviderError, err) } core_utils.UpdateClusterStateMetrics(a.clusterStateRegistry) @@ -869,9 +894,9 @@ func allPodsAreNew(pods []*apiv1.Pod, currentTime time.Time) bool { return found && oldest.Add(unschedulablePodWithGpuTimeBuffer).After(currentTime) } -func getUpcomingNodeInfos(registry *clusterstate.ClusterStateRegistry, nodeInfos map[string]*schedulerframework.NodeInfo) []*schedulerframework.NodeInfo { +func getUpcomingNodeInfos(upcomingCounts map[string]int, nodeInfos 
map[string]*schedulerframework.NodeInfo) []*schedulerframework.NodeInfo { upcomingNodes := make([]*schedulerframework.NodeInfo, 0) - for nodeGroup, numberOfNodes := range registry.GetUpcomingNodes() { + for nodeGroup, numberOfNodes := range upcomingCounts { nodeTemplate, found := nodeInfos[nodeGroup] if !found { klog.Warningf("Couldn't find template for node group %s", nodeGroup) @@ -921,17 +946,29 @@ func countsByReason(nodes []*simulator.UnremovableNode) map[simulator.Unremovabl return counts } -func subtractNodes(a []*apiv1.Node, b []*apiv1.Node) []*apiv1.Node { +func subtractNodesByName(nodes []*apiv1.Node, namesToRemove []string) []*apiv1.Node { var c []*apiv1.Node - namesToDrop := make(map[string]bool) - for _, n := range b { - namesToDrop[n.Name] = true + removeSet := make(map[string]bool) + for _, name := range namesToRemove { + removeSet[name] = true } - for _, n := range a { - if namesToDrop[n.Name] { + for _, n := range nodes { + if removeSet[n.Name] { continue } c = append(c, n) } return c } + +func subtractNodes(a []*apiv1.Node, b []*apiv1.Node) []*apiv1.Node { + return subtractNodesByName(a, nodeNames(b)) +} + +func nodeNames(ns []*apiv1.Node) []string { + names := make([]string, len(ns)) + for i, node := range ns { + names[i] = node.Name + } + return names +} diff --git a/cluster-autoscaler/core/static_autoscaler_test.go b/cluster-autoscaler/core/static_autoscaler_test.go index 9b0c3a55ce1d..06b2d011468f 100644 --- a/cluster-autoscaler/core/static_autoscaler_test.go +++ b/cluster-autoscaler/core/static_autoscaler_test.go @@ -42,6 +42,9 @@ import ( "k8s.io/autoscaler/cluster-autoscaler/estimator" ca_processors "k8s.io/autoscaler/cluster-autoscaler/processors" "k8s.io/autoscaler/cluster-autoscaler/simulator" + "k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot" + "k8s.io/autoscaler/cluster-autoscaler/simulator/utilization" + "k8s.io/autoscaler/cluster-autoscaler/utils/errors" "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes" kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes" . 
"k8s.io/autoscaler/cluster-autoscaler/utils/test" @@ -1273,6 +1276,167 @@ func TestStaticAutoscalerInstanceCreationErrors(t *testing.T) { nodeGroupC.AssertNumberOfCalls(t, "DeleteNodes", 0) } +type candidateTrackingFakePlanner struct { + lastCandidateNodes map[string]bool +} + +func (f *candidateTrackingFakePlanner) UpdateClusterState(podDestinations, scaleDownCandidates []*apiv1.Node, as scaledown.ActuationStatus, pdb []*policyv1.PodDisruptionBudget, currentTime time.Time) errors.AutoscalerError { + f.lastCandidateNodes = map[string]bool{} + for _, node := range scaleDownCandidates { + f.lastCandidateNodes[node.Name] = true + } + return nil +} + +func (f *candidateTrackingFakePlanner) CleanUpUnneededNodes() { +} + +func (f *candidateTrackingFakePlanner) NodesToDelete(currentTime time.Time) (empty, needDrain []*apiv1.Node) { + return nil, nil +} + +func (f *candidateTrackingFakePlanner) UnneededNodes() []*apiv1.Node { + return nil +} + +func (f *candidateTrackingFakePlanner) UnremovableNodes() []*simulator.UnremovableNode { + return nil +} + +func (f *candidateTrackingFakePlanner) NodeUtilizationMap() map[string]utilization.Info { + return nil +} + +func assertSnapshotNodeCount(t *testing.T, snapshot clustersnapshot.ClusterSnapshot, wantCount int) { + nodeInfos, err := snapshot.NodeInfos().List() + assert.NoError(t, err) + assert.Len(t, nodeInfos, wantCount) +} + +func assertNodesNotInSnapshot(t *testing.T, snapshot clustersnapshot.ClusterSnapshot, nodeNames map[string]bool) { + nodeInfos, err := snapshot.NodeInfos().List() + assert.NoError(t, err) + for _, nodeInfo := range nodeInfos { + assert.NotContains(t, nodeNames, nodeInfo.Node().Name) + } +} + +func assertNodesInSnapshot(t *testing.T, snapshot clustersnapshot.ClusterSnapshot, nodeNames map[string]bool) { + nodeInfos, err := snapshot.NodeInfos().List() + assert.NoError(t, err) + snapshotNodeNames := map[string]bool{} + for _, nodeInfo := range nodeInfos { + snapshotNodeNames[nodeInfo.Node().Name] = true + } + for nodeName := range nodeNames { + assert.Contains(t, snapshotNodeNames, nodeName) + } +} + +func TestStaticAutoscalerUpcomingScaleDownCandidates(t *testing.T) { + startTime := time.Time{} + + // Generate a number of ready and unready nodes created at startTime, spread across multiple node groups. 
+ provider := testprovider.NewTestCloudProvider(nil, nil) + allNodeNames := map[string]bool{} + readyNodeNames := map[string]bool{} + notReadyNodeNames := map[string]bool{} + var allNodes []*apiv1.Node + var readyNodes []*apiv1.Node + + readyNodesCount := 4 + unreadyNodesCount := 2 + nodeGroupCount := 2 + for ngNum := 0; ngNum < nodeGroupCount; ngNum++ { + ngName := fmt.Sprintf("ng-%d", ngNum) + provider.AddNodeGroup(ngName, 0, 1000, readyNodesCount+unreadyNodesCount) + + for i := 0; i < readyNodesCount; i++ { + node := BuildTestNode(fmt.Sprintf("%s-ready-node-%d", ngName, i), 2000, 1000) + node.CreationTimestamp = metav1.NewTime(startTime) + SetNodeReadyState(node, true, startTime) + provider.AddNode(ngName, node) + + allNodes = append(allNodes, node) + allNodeNames[node.Name] = true + + readyNodes = append(readyNodes, node) + readyNodeNames[node.Name] = true + } + for i := 0; i < unreadyNodesCount; i++ { + node := BuildTestNode(fmt.Sprintf("%s-unready-node-%d", ngName, i), 2000, 1000) + node.CreationTimestamp = metav1.NewTime(startTime) + SetNodeReadyState(node, false, startTime) + provider.AddNode(ngName, node) + + allNodes = append(allNodes, node) + allNodeNames[node.Name] = true + + notReadyNodeNames[node.Name] = true + } + } + + // Create fake listers for the generated nodes, nothing returned by the rest (but the ones used in the tested path have to be defined). + allNodeLister := kubernetes.NewTestNodeLister(allNodes) + readyNodeLister := kubernetes.NewTestNodeLister(readyNodes) + daemonSetLister, err := kubernetes.NewTestDaemonSetLister(nil) + assert.NoError(t, err) + listerRegistry := kube_util.NewListerRegistry(allNodeLister, readyNodeLister, kubernetes.NewTestPodLister(nil), kubernetes.NewTestPodLister(nil), kubernetes.NewTestPodDisruptionBudgetLister(nil), daemonSetLister, nil, nil, nil, nil) + + // Create context with minimal options that guarantee we reach the tested logic. + // We're only testing the input to UpdateClusterState which should be called whenever scale-down is enabled, other options shouldn't matter. + options := config.AutoscalingOptions{ScaleDownEnabled: true} + processorCallbacks := newStaticAutoscalerProcessorCallbacks() + ctx, err := NewScaleTestAutoscalingContext(options, &fake.Clientset{}, listerRegistry, provider, processorCallbacks, nil) + assert.NoError(t, err) + + // Create CSR with unhealthy cluster protection effectively disabled, to guarantee we reach the tested logic. + csrConfig := clusterstate.ClusterStateRegistryConfig{OkTotalUnreadyCount: nodeGroupCount * unreadyNodesCount} + csr := clusterstate.NewClusterStateRegistry(provider, csrConfig, ctx.LogRecorder, NewBackoff()) + + // Setting the Actuator is necessary for testing any scale-down logic, it shouldn't have anything to do in this test. + actuator := actuation.NewActuator(&ctx, csr, deletiontracker.NewNodeDeletionTracker(0*time.Second), simulator.NodeDeleteOptions{}) + ctx.ScaleDownActuator = actuator + + // Fake planner that keeps track of the scale-down candidates passed to UpdateClusterState. + planner := &candidateTrackingFakePlanner{} + + autoscaler := &StaticAutoscaler{ + AutoscalingContext: &ctx, + clusterStateRegistry: csr, + scaleDownActuator: actuator, + scaleDownPlanner: planner, + processors: NewTestProcessors(&ctx), + processorCallbacks: processorCallbacks, + } + + // RunOnce run right when the nodes are created. 
Ready nodes should be passed as scale-down candidates, unready nodes should be classified as + // NotStarted and not passed as scale-down candidates (or inserted into the cluster snapshot). The fake upcoming nodes also shouldn't be passed, + // but they should be inserted into the snapshot. + err = autoscaler.RunOnce(startTime) + assert.NoError(t, err) + assert.Equal(t, readyNodeNames, planner.lastCandidateNodes) + assertNodesInSnapshot(t, autoscaler.ClusterSnapshot, readyNodeNames) + assertNodesNotInSnapshot(t, autoscaler.ClusterSnapshot, notReadyNodeNames) + assertSnapshotNodeCount(t, autoscaler.ClusterSnapshot, len(allNodeNames)) // Ready nodes + fake upcoming copies for unready nodes. + + // RunOnce run in the last moment when unready nodes are still classified as NotStarted - assertions the same as above. + err = autoscaler.RunOnce(startTime.Add(clusterstate.MaxNodeStartupTime).Add(-time.Second)) + assert.NoError(t, err) + assert.Equal(t, readyNodeNames, planner.lastCandidateNodes) + assertNodesInSnapshot(t, autoscaler.ClusterSnapshot, readyNodeNames) + assertNodesNotInSnapshot(t, autoscaler.ClusterSnapshot, notReadyNodeNames) + assertSnapshotNodeCount(t, autoscaler.ClusterSnapshot, len(allNodeNames)) // Ready nodes + fake upcoming copies for unready nodes. + + // RunOnce run in the first moment when unready nodes exceed the startup threshold, stop being classified as NotStarted, and start being classified + // Unready instead. The unready nodes should be passed as scale-down candidates at this point, and inserted into the snapshot. Fake upcoming + // nodes should no longer be inserted. + err = autoscaler.RunOnce(startTime.Add(clusterstate.MaxNodeStartupTime).Add(time.Second)) + assert.Equal(t, allNodeNames, planner.lastCandidateNodes) + assertNodesInSnapshot(t, autoscaler.ClusterSnapshot, allNodeNames) + assertSnapshotNodeCount(t, autoscaler.ClusterSnapshot, len(allNodeNames)) // Ready nodes + actual unready nodes. 
+} + func TestStaticAutoscalerProcessorCallbacks(t *testing.T) { processorCallbacks := newStaticAutoscalerProcessorCallbacks() assert.Equal(t, false, processorCallbacks.disableScaleDownForLoop) @@ -1426,6 +1590,9 @@ func TestSubtractNodes(t *testing.T) { for _, tc := range testCases { got := subtractNodes(tc.a, tc.b) assert.Equal(t, nodeNames(got), nodeNames(tc.c)) + + got = subtractNodesByName(tc.a, nodeNames(tc.b)) + assert.Equal(t, nodeNames(got), nodeNames(tc.c)) } } @@ -1526,14 +1693,6 @@ func TestFilterOutYoungPods(t *testing.T) { } } -func nodeNames(ns []*apiv1.Node) []string { - names := make([]string, len(ns)) - for i, node := range ns { - names[i] = node.Name - } - return names -} - func waitForDeleteToFinish(t *testing.T, deleteFinished <-chan bool) { select { case <-deleteFinished: diff --git a/cluster-autoscaler/simulator/clustersnapshot/basic.go b/cluster-autoscaler/simulator/clustersnapshot/basic.go index 1e6d1749e93a..7f6f93a51c51 100644 --- a/cluster-autoscaler/simulator/clustersnapshot/basic.go +++ b/cluster-autoscaler/simulator/clustersnapshot/basic.go @@ -68,7 +68,7 @@ func (data *internalBasicSnapshotData) getNodeInfo(nodeName string) (*schedulerf if v, ok := data.nodeInfoMap[nodeName]; ok { return v, nil } - return nil, errNodeNotFound + return nil, ErrNodeNotFound } func (data *internalBasicSnapshotData) isPVCUsedByPods(key string) bool { @@ -162,7 +162,7 @@ func (data *internalBasicSnapshotData) addNodes(nodes []*apiv1.Node) error { func (data *internalBasicSnapshotData) removeNode(nodeName string) error { if _, found := data.nodeInfoMap[nodeName]; !found { - return errNodeNotFound + return ErrNodeNotFound } for _, pod := range data.nodeInfoMap[nodeName].Pods { data.removePvcUsedByPod(pod.Pod) @@ -173,7 +173,7 @@ func (data *internalBasicSnapshotData) removeNode(nodeName string) error { func (data *internalBasicSnapshotData) addPod(pod *apiv1.Pod, nodeName string) error { if _, found := data.nodeInfoMap[nodeName]; !found { - return errNodeNotFound + return ErrNodeNotFound } data.nodeInfoMap[nodeName].AddPod(pod) data.addPvcUsedByPod(pod) @@ -183,7 +183,7 @@ func (data *internalBasicSnapshotData) addPod(pod *apiv1.Pod, nodeName string) e func (data *internalBasicSnapshotData) removePod(namespace, podName, nodeName string) error { nodeInfo, found := data.nodeInfoMap[nodeName] if !found { - return errNodeNotFound + return ErrNodeNotFound } for _, podInfo := range nodeInfo.Pods { if podInfo.Pod.Namespace == namespace && podInfo.Pod.Name == podName { diff --git a/cluster-autoscaler/simulator/clustersnapshot/clustersnapshot.go b/cluster-autoscaler/simulator/clustersnapshot/clustersnapshot.go index f5aee0565971..275dc0d8da63 100644 --- a/cluster-autoscaler/simulator/clustersnapshot/clustersnapshot.go +++ b/cluster-autoscaler/simulator/clustersnapshot/clustersnapshot.go @@ -54,7 +54,8 @@ type ClusterSnapshot interface { Clear() } -var errNodeNotFound = errors.New("node not found") +// ErrNodeNotFound means that a node wasn't found in the snapshot. +var ErrNodeNotFound = errors.New("node not found") // WithForkedSnapshot is a helper function for snapshot that makes sure all Fork() calls are closed with Commit() or Revert() calls. // The function return (error, error) pair. The first error comes from the passed function, the second error indicate the success of the function itself. 
diff --git a/cluster-autoscaler/simulator/clustersnapshot/delta.go b/cluster-autoscaler/simulator/clustersnapshot/delta.go index 84240a8bfa62..0aa53efe6da7 100644 --- a/cluster-autoscaler/simulator/clustersnapshot/delta.go +++ b/cluster-autoscaler/simulator/clustersnapshot/delta.go @@ -199,7 +199,7 @@ func (data *internalDeltaSnapshotData) removeNode(nodeName string) error { if _, deleted := data.deletedNodeInfos[nodeName]; deleted { // If node was deleted within this delta, fail with error. - return errNodeNotFound + return ErrNodeNotFound } _, foundInBase := data.baseData.getNodeInfo(nodeName) @@ -210,7 +210,7 @@ func (data *internalDeltaSnapshotData) removeNode(nodeName string) error { if !foundInBase && !foundInDelta { // Node not found in the chain. - return errNodeNotFound + return ErrNodeNotFound } // Maybe consider deleting from the lists instead. Maybe not. @@ -238,7 +238,7 @@ func (data *internalDeltaSnapshotData) nodeInfoToModify(nodeName string) (*sched func (data *internalDeltaSnapshotData) addPod(pod *apiv1.Pod, nodeName string) error { ni, found := data.nodeInfoToModify(nodeName) if !found { - return errNodeNotFound + return ErrNodeNotFound } ni.AddPod(pod) @@ -254,7 +254,7 @@ func (data *internalDeltaSnapshotData) removePod(namespace, name, nodeName strin // probably means things are very bad anyway. ni, found := data.nodeInfoToModify(nodeName) if !found { - return errNodeNotFound + return ErrNodeNotFound } podFound := false @@ -378,7 +378,7 @@ func (snapshot *DeltaClusterSnapshot) getNodeInfo(nodeName string) (*schedulerfr data := snapshot.data node, found := data.getNodeInfo(nodeName) if !found { - return nil, errNodeNotFound + return nil, ErrNodeNotFound } return node, nil }
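
Note (not part of the patch): the following is a minimal, self-contained Go sketch of how the two changes fit together. The Readiness struct and subtractNodesByName helper below are simplified stand-ins for clusterstate.Readiness and the helper added to static_autoscaler.go, and the node-group data is made up for illustration. The idea it shows: once Readiness keeps node-name lists instead of counts, callers can still derive the old counts with len(), and the registered-but-NotStarted names can be subtracted from the scale-down candidate list so those nodes are not removed before they have a chance to become ready.

package main

import "fmt"

// Readiness mirrors the shape introduced by the first patch: node names are
// kept as lists, and counts are derived with len() where needed.
// Illustrative stand-in, not the actual clusterstate.Readiness type.
type Readiness struct {
	Ready      []string
	Unready    []string
	NotStarted []string
}

// subtractNodesByName drops the named nodes from a candidate list. It follows
// the helper added in the second patch, but works on plain strings here to
// stay self-contained.
func subtractNodesByName(nodes []string, namesToRemove []string) []string {
	remove := make(map[string]bool, len(namesToRemove))
	for _, name := range namesToRemove {
		remove[name] = true
	}
	var kept []string
	for _, n := range nodes {
		if !remove[n] {
			kept = append(kept, n)
		}
	}
	return kept
}

func main() {
	// Per-node-group readiness, roughly as updateReadinessStats would compute it.
	readiness := map[string]Readiness{
		"ng1": {Ready: []string{"ng1-a", "ng1-b"}, NotStarted: []string{"ng1-c"}},
		"ng2": {Ready: []string{"ng2-a"}, Unready: []string{"ng2-b"}},
	}

	// All registered nodes that would normally be handed to the scale-down planner.
	allNodes := []string{"ng1-a", "ng1-b", "ng1-c", "ng2-a", "ng2-b"}

	// Second patch's idea: registered-but-NotStarted nodes are "upcoming", so they
	// are excluded from scale-down candidates (a fake ready copy is injected into
	// the cluster snapshot instead).
	var registeredUpcoming []string
	for _, r := range readiness {
		registeredUpcoming = append(registeredUpcoming, r.NotStarted...)
	}
	candidates := subtractNodesByName(allNodes, registeredUpcoming)

	fmt.Println("upcoming (registered, NotStarted):", registeredUpcoming)
	fmt.Println("scale-down candidates:", candidates)

	// Counts remain available wherever the old integer fields were used,
	// e.g. when formatting health conditions or metrics.
	fmt.Println("ng2 ready count:", len(readiness["ng2"].Ready))
}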