Merge pull request #5488 from towca/jtuznik/upcoming-unready-fix
Stop scale-down from considering registered upcoming nodes as candidates
k8s-ci-robot authored Feb 10, 2023
2 parents a83450e + 7e67625 commit 9f04d4f
Showing 9 changed files with 341 additions and 130 deletions.
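The core of the change is a second return value on ClusterStateRegistry.GetUpcomingNodes: in addition to the per-node-group counts, it now reports the names of upcoming nodes that are already registered with Kubernetes, so scale-down can stop treating those nodes as removal candidates. A minimal sketch of the new calling convention, assuming a *ClusterStateRegistry value named csr (the scale-down wiring itself is not among the hunks shown below):

    upcomingCounts, registeredUpcoming := csr.GetUpcomingNodes()
    // Collect the names of upcoming-but-already-registered nodes so a
    // scale-down pass can skip them when picking removal candidates.
    skip := make(map[string]bool)
    for _, names := range registeredUpcoming {
        for _, name := range names {
            skip[name] = true
        }
    }
    _ = upcomingCounts // the counts remain in use on the scale-up path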
128 changes: 66 additions & 62 deletions cluster-autoscaler/clusterstate/clusterstate.go
@@ -354,7 +354,7 @@ func (csr *ClusterStateRegistry) IsClusterHealthy() bool {
csr.Lock()
defer csr.Unlock()

totalUnready := csr.totalReadiness.Unready
totalUnready := len(csr.totalReadiness.Unready)

if totalUnready > csr.config.OkTotalUnreadyCount &&
float64(totalUnready) > csr.config.MaxTotalUnreadyPercentage/100.0*float64(len(csr.nodes)) {
@@ -384,14 +384,14 @@ func (csr *ClusterStateRegistry) IsNodeGroupHealthy(nodeGroupName string) bool {

unjustifiedUnready := 0
// Too few nodes, something is missing. Below the expected node count.
if readiness.Ready < acceptable.MinNodes {
unjustifiedUnready += acceptable.MinNodes - readiness.Ready
if len(readiness.Ready) < acceptable.MinNodes {
unjustifiedUnready += acceptable.MinNodes - len(readiness.Ready)
}
// TODO: verify against max nodes as well.

if unjustifiedUnready > csr.config.OkTotalUnreadyCount &&
float64(unjustifiedUnready) > csr.config.MaxTotalUnreadyPercentage/100.0*
float64(readiness.Ready+readiness.Unready+readiness.NotStarted) {
float64(len(readiness.Ready)+len(readiness.Unready)+len(readiness.NotStarted)) {
return false
}
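In both health checks the unready count has to exceed two limits at once before anything is flagged: with illustrative values OkTotalUnreadyCount=3 and MaxTotalUnreadyPercentage=45, a node group whose readiness lists cover 10 nodes needs at least 5 unjustified unready nodes to be reported unhealthy (5 > 3 and 5 > 0.45 * 10 = 4.5). Switching from integer counters to len() over the new name slices leaves this arithmetic unchanged.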

@@ -444,7 +444,7 @@ func (csr *ClusterStateRegistry) getProvisionedAndTargetSizesForNodeGroup(nodeGr
}
return 0, target, true
}
provisioned = readiness.Registered - readiness.NotStarted
provisioned = len(readiness.Registered) - len(readiness.NotStarted)

return provisioned, target, true
}
@@ -496,7 +496,7 @@ func (csr *ClusterStateRegistry) updateAcceptableRanges(targetSize map[string]in
size := targetSize[nodeGroup.Id()]
readiness := csr.perNodeGroupReadiness[nodeGroup.Id()]
result[nodeGroup.Id()] = AcceptableRange{
MinNodes: size - readiness.LongUnregistered,
MinNodes: size - len(readiness.LongUnregistered),
MaxNodes: size,
CurrentTarget: size,
}
@@ -516,46 +516,45 @@ func (csr *ClusterStateRegistry) updateAcceptableRanges(targetSize map[string]in

// Readiness contains readiness information about a group of nodes.
type Readiness struct {
// Number of ready nodes.
Ready int
// Number of unready nodes that broke down after they started.
Unready int
// Number of nodes that are being currently deleted. They exist in K8S but
// Names of ready nodes.
Ready []string
// Names of unready nodes that broke down after they started.
Unready []string
// Names of nodes that are being currently deleted. They exist in K8S but
// are not included in NodeGroup.TargetSize().
Deleted int
// Number of nodes that are not yet fully started.
NotStarted int
// Number of all registered nodes in the group (ready/unready/deleted/etc).
Registered int
// Number of nodes that failed to register within a reasonable limit.
LongUnregistered int
// Number of nodes that haven't yet registered.
Unregistered int
Deleted []string
// Names of nodes that are not yet fully started.
NotStarted []string
// Names of all registered nodes in the group (ready/unready/deleted/etc).
Registered []string
// Names of nodes that failed to register within a reasonable limit.
LongUnregistered []string
// Names of nodes that haven't yet registered.
Unregistered []string
// Time when the readiness was measured.
Time time.Time
// Number of nodes that are Unready due to missing resources.
// Names of nodes that are Unready due to missing resources.
// This field is only used for exposing information externally and
// doesn't influence CA behavior.
ResourceUnready int
ResourceUnready []string
}

func (csr *ClusterStateRegistry) updateReadinessStats(currentTime time.Time) {

perNodeGroup := make(map[string]Readiness)
total := Readiness{Time: currentTime}

update := func(current Readiness, node *apiv1.Node, nr kube_util.NodeReadiness) Readiness {
current.Registered++
current.Registered = append(current.Registered, node.Name)
if _, isDeleted := csr.deletedNodes[node.Name]; isDeleted {
current.Deleted++
current.Deleted = append(current.Deleted, node.Name)
} else if nr.Ready {
current.Ready++
current.Ready = append(current.Ready, node.Name)
} else if node.CreationTimestamp.Time.Add(MaxNodeStartupTime).After(currentTime) {
current.NotStarted++
current.NotStarted = append(current.NotStarted, node.Name)
} else {
current.Unready++
current.Unready = append(current.Unready, node.Name)
if nr.Reason == kube_util.ResourceUnready {
current.ResourceUnready++
current.ResourceUnready = append(current.ResourceUnready, node.Name)
}
}
return current
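Because Readiness now records per-category node names rather than counters, call sites throughout the file recover counts with len(), as the surrounding hunks show. A small hypothetical helper (not part of the PR) that mirrors the sum used by IsNodeGroupHealthy:

    // totalObserved mirrors len(Ready)+len(Unready)+len(NotStarted), the
    // denominator of the unready-percentage check in IsNodeGroupHealthy.
    func totalObserved(r Readiness) int {
        return len(r.Ready) + len(r.Unready) + len(r.NotStarted)
    }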
@@ -579,7 +578,6 @@ func (csr *ClusterStateRegistry) updateReadinessStats(currentTime time.Time) {
total = update(total, node, nr)
}

var longUnregisteredNodeNames []string
for _, unregistered := range csr.unregisteredNodes {
nodeGroup, errNg := csr.cloudProvider.NodeGroupForNode(unregistered.Node)
if errNg != nil {
@@ -592,17 +590,16 @@ func (csr *ClusterStateRegistry) updateReadinessStats(currentTime time.Time) {
}
perNgCopy := perNodeGroup[nodeGroup.Id()]
if unregistered.UnregisteredSince.Add(csr.config.MaxNodeProvisionTime).Before(currentTime) {
longUnregisteredNodeNames = append(longUnregisteredNodeNames, unregistered.Node.Name)
perNgCopy.LongUnregistered++
total.LongUnregistered++
perNgCopy.LongUnregistered = append(perNgCopy.LongUnregistered, unregistered.Node.Name)
total.LongUnregistered = append(total.LongUnregistered, unregistered.Node.Name)
} else {
perNgCopy.Unregistered++
total.Unregistered++
perNgCopy.Unregistered = append(perNgCopy.Unregistered, unregistered.Node.Name)
total.Unregistered = append(total.Unregistered, unregistered.Node.Name)
}
perNodeGroup[nodeGroup.Id()] = perNgCopy
}
if total.LongUnregistered > 0 {
klog.V(3).Infof("Found longUnregistered Nodes %s", longUnregisteredNodeNames)
if len(total.LongUnregistered) > 0 {
klog.V(3).Infof("Found longUnregistered Nodes %s", total.LongUnregistered)
}

for ngId, ngReadiness := range perNodeGroup {
@@ -630,10 +627,10 @@ func (csr *ClusterStateRegistry) updateIncorrectNodeGroupSizes(currentTime time.
}
continue
}
if readiness.Registered > acceptableRange.MaxNodes ||
readiness.Registered < acceptableRange.MinNodes {
if len(readiness.Registered) > acceptableRange.MaxNodes ||
len(readiness.Registered) < acceptableRange.MinNodes {
incorrect := IncorrectNodeGroupSize{
CurrentSize: readiness.Registered,
CurrentSize: len(readiness.Registered),
ExpectedSize: acceptableRange.CurrentTarget,
FirstObserved: currentTime,
}
@@ -752,12 +749,12 @@ func buildHealthStatusNodeGroup(isReady bool, readiness Readiness, acceptable Ac
condition := api.ClusterAutoscalerCondition{
Type: api.ClusterAutoscalerHealth,
Message: fmt.Sprintf("ready=%d unready=%d (resourceUnready=%d) notStarted=%d longNotStarted=0 registered=%d longUnregistered=%d cloudProviderTarget=%d (minSize=%d, maxSize=%d)",
readiness.Ready,
readiness.Unready,
readiness.ResourceUnready,
readiness.NotStarted,
readiness.Registered,
readiness.LongUnregistered,
len(readiness.Ready),
len(readiness.Unready),
len(readiness.ResourceUnready),
len(readiness.NotStarted),
len(readiness.Registered),
len(readiness.LongUnregistered),
acceptable.CurrentTarget,
minSize,
maxSize),
@@ -775,7 +772,7 @@ func buildScaleUpStatusNodeGroup(isScaleUpInProgress bool, isSafeToScaleUp bool,
condition := api.ClusterAutoscalerCondition{
Type: api.ClusterAutoscalerScaleUp,
Message: fmt.Sprintf("ready=%d cloudProviderTarget=%d",
readiness.Ready,
len(readiness.Ready),
acceptable.CurrentTarget),
LastProbeTime: metav1.Time{Time: readiness.Time},
}
@@ -807,12 +804,12 @@ func buildHealthStatusClusterwide(isReady bool, readiness Readiness) api.Cluster
condition := api.ClusterAutoscalerCondition{
Type: api.ClusterAutoscalerHealth,
Message: fmt.Sprintf("ready=%d unready=%d (resourceUnready=%d) notStarted=%d longNotStarted=0 registered=%d longUnregistered=%d",
readiness.Ready,
readiness.Unready,
readiness.ResourceUnready,
readiness.NotStarted,
readiness.Registered,
readiness.LongUnregistered,
len(readiness.Ready),
len(readiness.Unready),
len(readiness.ResourceUnready),
len(readiness.NotStarted),
len(readiness.Registered),
len(readiness.LongUnregistered),
),
LastProbeTime: metav1.Time{Time: readiness.Time},
}
@@ -838,8 +835,8 @@ func buildScaleUpStatusClusterwide(nodeGroupStatuses []api.NodeGroupStatus, read
condition := api.ClusterAutoscalerCondition{
Type: api.ClusterAutoscalerScaleUp,
Message: fmt.Sprintf("ready=%d registered=%d",
readiness.Ready,
readiness.Registered),
len(readiness.Ready),
len(readiness.Registered)),
LastProbeTime: metav1.Time{Time: readiness.Time},
}
if isScaleUpInProgress {
@@ -919,25 +916,32 @@ func (csr *ClusterStateRegistry) GetIncorrectNodeGroupSize(nodeGroupName string)
}

// GetUpcomingNodes returns how many new nodes will be added shortly to the node groups or should become ready soon.
// The function may overestimate the number of nodes.
func (csr *ClusterStateRegistry) GetUpcomingNodes() map[string]int {
// The function may overestimate the number of nodes. The second return value contains the names of upcoming nodes
// that are already registered in the cluster.
func (csr *ClusterStateRegistry) GetUpcomingNodes() (upcomingCounts map[string]int, registeredNodeNames map[string][]string) {
csr.Lock()
defer csr.Unlock()

result := make(map[string]int)
upcomingCounts = map[string]int{}
registeredNodeNames = map[string][]string{}
for _, nodeGroup := range csr.cloudProvider.NodeGroups() {
id := nodeGroup.Id()
readiness := csr.perNodeGroupReadiness[id]
ar := csr.acceptableRanges[id]
// newNodes is the number of nodes that
newNodes := ar.CurrentTarget - (readiness.Ready + readiness.Unready + readiness.LongUnregistered)
newNodes := ar.CurrentTarget - (len(readiness.Ready) + len(readiness.Unready) + len(readiness.LongUnregistered))
if newNodes <= 0 {
// Negative value is unlikely but theoretically possible.
continue
}
result[id] = newNodes
upcomingCounts[id] = newNodes
// newNodes should include instances that have registered with k8s but aren't ready yet, instances that came up on the cloud provider side
// but haven't registered with k8s yet, and instances that haven't even come up on the cloud provider side yet (but are reflected in the target
// size). The first category is categorized as NotStarted in readiness, the other two aren't registered with k8s, so they shouldn't be
// included.
registeredNodeNames[id] = readiness.NotStarted
}
return result
return upcomingCounts, registeredNodeNames
}
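As the comment above spells out, the upcoming count covers three kinds of instances: registered with Kubernetes but NotStarted, created on the cloud provider side but not yet registered, and not yet created at all. Only the first kind has Kubernetes node names, which is why registeredNodeNames is filled from readiness.NotStarted. A hedged usage sketch of the two return values (the log line is illustrative, not taken from the PR):

    upcomingCounts, registeredUpcoming := csr.GetUpcomingNodes()
    for id, count := range upcomingCounts {
        klog.V(4).Infof("node group %s: %d upcoming node(s), %d already registered: %v",
            id, count, len(registeredUpcoming[id]), registeredUpcoming[id])
    }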

// getCloudProviderNodeInstances returns map keyed on node group id where value is list of node instances
@@ -1003,7 +1007,7 @@ func (csr *ClusterStateRegistry) GetAutoscaledNodesCount() (currentSize, targetS
targetSize += accRange.CurrentTarget
}
for _, readiness := range csr.perNodeGroupReadiness {
currentSize += readiness.Registered - readiness.NotStarted
currentSize += len(readiness.Registered) - len(readiness.NotStarted)
}
return currentSize, targetSize
}
41 changes: 25 additions & 16 deletions cluster-autoscaler/clusterstate/clusterstate_test.go
@@ -359,10 +359,11 @@ func TestUnreadyLongAfterCreation(t *testing.T) {
}, fakeLogRecorder, newBackoff())
err := clusterstate.UpdateNodes([]*apiv1.Node{ng1_1, ng2_1}, nil, now)
assert.NoError(t, err)
assert.Equal(t, 1, clusterstate.GetClusterReadiness().Unready)
assert.Equal(t, 0, clusterstate.GetClusterReadiness().NotStarted)
upcoming := clusterstate.GetUpcomingNodes()
assert.Equal(t, 1, len(clusterstate.GetClusterReadiness().Unready))
assert.Equal(t, 0, len(clusterstate.GetClusterReadiness().NotStarted))
upcoming, upcomingRegistered := clusterstate.GetUpcomingNodes()
assert.Equal(t, 0, upcoming["ng1"])
assert.Empty(t, upcomingRegistered["ng1"])
}

func TestNotStarted(t *testing.T) {
@@ -390,22 +391,22 @@ func TestNotStarted(t *testing.T) {
}, fakeLogRecorder, newBackoff())
err := clusterstate.UpdateNodes([]*apiv1.Node{ng1_1, ng2_1}, nil, now)
assert.NoError(t, err)
assert.Equal(t, 1, clusterstate.GetClusterReadiness().NotStarted)
assert.Equal(t, 1, clusterstate.GetClusterReadiness().Ready)
assert.Equal(t, 1, len(clusterstate.GetClusterReadiness().NotStarted))
assert.Equal(t, 1, len(clusterstate.GetClusterReadiness().Ready))

// node ng2_1 moves condition to ready
SetNodeReadyState(ng2_1, true, now.Add(-4*time.Minute))
err = clusterstate.UpdateNodes([]*apiv1.Node{ng1_1, ng2_1}, nil, now)
assert.NoError(t, err)
assert.Equal(t, 1, clusterstate.GetClusterReadiness().NotStarted)
assert.Equal(t, 1, clusterstate.GetClusterReadiness().Ready)
assert.Equal(t, 1, len(clusterstate.GetClusterReadiness().NotStarted))
assert.Equal(t, 1, len(clusterstate.GetClusterReadiness().Ready))

// node ng2_1 no longer has the taint
RemoveNodeNotReadyTaint(ng2_1)
err = clusterstate.UpdateNodes([]*apiv1.Node{ng1_1, ng2_1}, nil, now)
assert.NoError(t, err)
assert.Equal(t, 0, clusterstate.GetClusterReadiness().NotStarted)
assert.Equal(t, 2, clusterstate.GetClusterReadiness().Ready)
assert.Equal(t, 0, len(clusterstate.GetClusterReadiness().NotStarted))
assert.Equal(t, 2, len(clusterstate.GetClusterReadiness().Ready))
}

func TestExpiredScaleUp(t *testing.T) {
Expand Down Expand Up @@ -524,12 +525,17 @@ func TestUpcomingNodes(t *testing.T) {
assert.NoError(t, err)
assert.Empty(t, clusterstate.GetScaleUpFailures())

upcomingNodes := clusterstate.GetUpcomingNodes()
upcomingNodes, upcomingRegistered := clusterstate.GetUpcomingNodes()
assert.Equal(t, 6, upcomingNodes["ng1"])
assert.Empty(t, upcomingRegistered["ng1"]) // Only unregistered.
assert.Equal(t, 1, upcomingNodes["ng2"])
assert.Empty(t, upcomingRegistered["ng2"]) // Only unregistered.
assert.Equal(t, 2, upcomingNodes["ng3"])
assert.Equal(t, []string{"ng3-1"}, upcomingRegistered["ng3"]) // 1 registered, 1 unregistered.
assert.NotContains(t, upcomingNodes, "ng4")
assert.NotContains(t, upcomingRegistered, "ng4")
assert.Equal(t, 0, upcomingNodes["ng5"])
assert.Empty(t, upcomingRegistered["ng5"])
}

func TestTaintBasedNodeDeletion(t *testing.T) {
Expand Down Expand Up @@ -566,8 +572,9 @@ func TestTaintBasedNodeDeletion(t *testing.T) {
assert.NoError(t, err)
assert.Empty(t, clusterstate.GetScaleUpFailures())

upcomingNodes := clusterstate.GetUpcomingNodes()
upcomingNodes, upcomingRegistered := clusterstate.GetUpcomingNodes()
assert.Equal(t, 1, upcomingNodes["ng1"])
assert.Empty(t, upcomingRegistered["ng1"]) // Only unregistered.
}

func TestIncorrectSize(t *testing.T) {
Expand Down Expand Up @@ -624,17 +631,19 @@ func TestUnregisteredNodes(t *testing.T) {
assert.NoError(t, err)
assert.Equal(t, 1, len(clusterstate.GetUnregisteredNodes()))
assert.Equal(t, "ng1-2", clusterstate.GetUnregisteredNodes()[0].Node.Name)
upcomingNodes := clusterstate.GetUpcomingNodes()
upcomingNodes, upcomingRegistered := clusterstate.GetUpcomingNodes()
assert.Equal(t, 1, upcomingNodes["ng1"])
assert.Empty(t, upcomingRegistered["ng1"]) // Unregistered only.

// The node didn't come up in MaxNodeProvisionTime, it should no longer be
// counted as upcoming (but it is still an unregistered node)
err = clusterstate.UpdateNodes([]*apiv1.Node{ng1_1}, nil, time.Now().Add(time.Minute))
assert.NoError(t, err)
assert.Equal(t, 1, len(clusterstate.GetUnregisteredNodes()))
assert.Equal(t, "ng1-2", clusterstate.GetUnregisteredNodes()[0].Node.Name)
upcomingNodes = clusterstate.GetUpcomingNodes()
upcomingNodes, upcomingRegistered = clusterstate.GetUpcomingNodes()
assert.Equal(t, 0, len(upcomingNodes))
assert.Empty(t, upcomingRegistered["ng1"])

err = clusterstate.UpdateNodes([]*apiv1.Node{ng1_1, ng1_2}, nil, time.Now().Add(time.Minute))
assert.NoError(t, err)
@@ -686,7 +695,7 @@ func TestCloudProviderDeletedNodes(t *testing.T) {
assert.NoError(t, err)
assert.Equal(t, 1, len(GetCloudProviderDeletedNodeNames(clusterstate)))
assert.Equal(t, "ng1-2", GetCloudProviderDeletedNodeNames(clusterstate)[0])
assert.Equal(t, 1, clusterstate.GetClusterReadiness().Deleted)
assert.Equal(t, 1, len(clusterstate.GetClusterReadiness().Deleted))

// The node is removed from Kubernetes
now.Add(time.Minute)
@@ -719,7 +728,7 @@ func TestCloudProviderDeletedNodes(t *testing.T) {
assert.NoError(t, err)
assert.Equal(t, 1, len(GetCloudProviderDeletedNodeNames(clusterstate)))
assert.Equal(t, "ng1-3", GetCloudProviderDeletedNodeNames(clusterstate)[0])
assert.Equal(t, 1, clusterstate.GetClusterReadiness().Deleted)
assert.Equal(t, 1, len(clusterstate.GetClusterReadiness().Deleted))

// Confirm that previously identified deleted Cloud Provider nodes are still included
// until it is removed from Kubernetes
@@ -729,7 +738,7 @@ func TestCloudProviderDeletedNodes(t *testing.T) {
assert.NoError(t, err)
assert.Equal(t, 1, len(GetCloudProviderDeletedNodeNames(clusterstate)))
assert.Equal(t, "ng1-3", GetCloudProviderDeletedNodeNames(clusterstate)[0])
assert.Equal(t, 1, clusterstate.GetClusterReadiness().Deleted)
assert.Equal(t, 1, len(clusterstate.GetClusterReadiness().Deleted))

// The node is removed from Kubernetes
now.Add(time.Minute)
3 changes: 2 additions & 1 deletion cluster-autoscaler/core/scale_up.go
@@ -178,8 +178,9 @@ func ScaleUp(context *context.AutoscalingContext, processors *ca_processors.Auto
klogx.V(1).Over(loggingQuota).Infof("%v other pods are also unschedulable", -loggingQuota.Left())
podEquivalenceGroups := buildPodEquivalenceGroups(unschedulablePods)

upcomingCounts, _ := clusterStateRegistry.GetUpcomingNodes()
upcomingNodes := make([]*schedulerframework.NodeInfo, 0)
for nodeGroup, numberOfNodes := range clusterStateRegistry.GetUpcomingNodes() {
for nodeGroup, numberOfNodes := range upcomingCounts {
nodeTemplate, found := nodeInfos[nodeGroup]
if !found {
return scaleUpError(&status.ScaleUpStatus{}, errors.NewAutoscalerError(
