
Commit

CA: Make CSR's Readiness keep lists of node names instead of just their count

This does make us call len() in a bunch of places within CSR, but allows
for greater flexibility - it's possible to act on the sets of nodes determined
by Readiness.
towca committed Feb 6, 2023
1 parent 0ae555c commit 6978ff8
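To summarize the shape of the change before the full diff: the Readiness fields switch from int counters to []string slices of node names, so call sites read counts via len() and can also act on the names themselves. A minimal, self-contained sketch (abridged struct, invented node names; the real definition and call sites are in the diff below):

```go
// Illustration only (not part of the commit): abridged Readiness with the
// new []string fields, and a caller reading counts via len().
package main

import "fmt"

type Readiness struct {
	Ready            []string // was: Ready int
	Unready          []string // was: Unready int
	LongUnregistered []string // was: LongUnregistered int
	// ...the remaining fields change the same way.
}

func main() {
	r := Readiness{
		Ready:   []string{"node-a", "node-b"}, // invented names
		Unready: []string{"node-c"},
	}
	// Counts are still available through len()...
	fmt.Printf("ready=%d unready=%d\n", len(r.Ready), len(r.Unready))
	// ...and the concrete sets of nodes can now be acted on directly.
	for _, name := range r.Unready {
		fmt.Println("unready node:", name)
	}
}
```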
Showing 3 changed files with 66 additions and 69 deletions.
111 changes: 54 additions & 57 deletions cluster-autoscaler/clusterstate/clusterstate.go
@@ -354,7 +354,7 @@ func (csr *ClusterStateRegistry) IsClusterHealthy() bool {
csr.Lock()
defer csr.Unlock()

totalUnready := csr.totalReadiness.Unready
totalUnready := len(csr.totalReadiness.Unready)

if totalUnready > csr.config.OkTotalUnreadyCount &&
float64(totalUnready) > csr.config.MaxTotalUnreadyPercentage/100.0*float64(len(csr.nodes)) {
@@ -384,14 +384,14 @@ func (csr *ClusterStateRegistry) IsNodeGroupHealthy(nodeGroupName string) bool {

unjustifiedUnready := 0
// Too few nodes, something is missing. Below the expected node count.
if readiness.Ready < acceptable.MinNodes {
unjustifiedUnready += acceptable.MinNodes - readiness.Ready
if len(readiness.Ready) < acceptable.MinNodes {
unjustifiedUnready += acceptable.MinNodes - len(readiness.Ready)
}
// TODO: verify against max nodes as well.

if unjustifiedUnready > csr.config.OkTotalUnreadyCount &&
float64(unjustifiedUnready) > csr.config.MaxTotalUnreadyPercentage/100.0*
float64(readiness.Ready+readiness.Unready+readiness.NotStarted) {
float64(len(readiness.Ready)+len(readiness.Unready)+len(readiness.NotStarted)) {
return false
}

@@ -444,7 +444,7 @@ func (csr *ClusterStateRegistry) getProvisionedAndTargetSizesForNodeGroup(nodeGr
}
return 0, target, true
}
provisioned = readiness.Registered - readiness.NotStarted
provisioned = len(readiness.Registered) - len(readiness.NotStarted)

return provisioned, target, true
}
@@ -496,7 +496,7 @@ func (csr *ClusterStateRegistry) updateAcceptableRanges(targetSize map[string]in
size := targetSize[nodeGroup.Id()]
readiness := csr.perNodeGroupReadiness[nodeGroup.Id()]
result[nodeGroup.Id()] = AcceptableRange{
MinNodes: size - readiness.LongUnregistered,
MinNodes: size - len(readiness.LongUnregistered),
MaxNodes: size,
CurrentTarget: size,
}
@@ -516,46 +516,45 @@ func (csr *ClusterStateRegistry) updateAcceptableRanges(targetSize map[string]in

// Readiness contains readiness information about a group of nodes.
type Readiness struct {
// Number of ready nodes.
Ready int
// Number of unready nodes that broke down after they started.
Unready int
// Number of nodes that are being currently deleted. They exist in K8S but
// Names of ready nodes.
Ready []string
// Names of unready nodes that broke down after they started.
Unready []string
// Names of nodes that are being currently deleted. They exist in K8S but
// are not included in NodeGroup.TargetSize().
Deleted int
// Number of nodes that are not yet fully started.
NotStarted int
// Number of all registered nodes in the group (ready/unready/deleted/etc).
Registered int
// Number of nodes that failed to register within a reasonable limit.
LongUnregistered int
// Number of nodes that haven't yet registered.
Unregistered int
Deleted []string
// Names of nodes that are not yet fully started.
NotStarted []string
// Names of all registered nodes in the group (ready/unready/deleted/etc).
Registered []string
// Names of nodes that failed to register within a reasonable limit.
LongUnregistered []string
// Names of nodes that haven't yet registered.
Unregistered []string
// Time when the readiness was measured.
Time time.Time
// Number of nodes that are Unready due to missing resources.
// Names of nodes that are Unready due to missing resources.
// This field is only used for exposing information externally and
// doesn't influence CA behavior.
ResourceUnready int
ResourceUnready []string
}

func (csr *ClusterStateRegistry) updateReadinessStats(currentTime time.Time) {

perNodeGroup := make(map[string]Readiness)
total := Readiness{Time: currentTime}

update := func(current Readiness, node *apiv1.Node, nr kube_util.NodeReadiness) Readiness {
current.Registered++
current.Registered = append(current.Registered, node.Name)
if _, isDeleted := csr.deletedNodes[node.Name]; isDeleted {
current.Deleted++
current.Deleted = append(current.Deleted, node.Name)
} else if nr.Ready {
current.Ready++
current.Ready = append(current.Ready, node.Name)
} else if node.CreationTimestamp.Time.Add(MaxNodeStartupTime).After(currentTime) {
current.NotStarted++
current.NotStarted = append(current.NotStarted, node.Name)
} else {
current.Unready++
current.Unready = append(current.Unready, node.Name)
if nr.Reason == kube_util.ResourceUnready {
current.ResourceUnready++
current.ResourceUnready = append(current.ResourceUnready, node.Name)
}
}
return current
@@ -579,7 +578,6 @@ func (csr *ClusterStateRegistry) updateReadinessStats(currentTime time.Time) {
total = update(total, node, nr)
}

var longUnregisteredNodeNames []string
for _, unregistered := range csr.unregisteredNodes {
nodeGroup, errNg := csr.cloudProvider.NodeGroupForNode(unregistered.Node)
if errNg != nil {
@@ -592,17 +590,16 @@ func (csr *ClusterStateRegistry) updateReadinessStats(currentTime time.Time) {
}
perNgCopy := perNodeGroup[nodeGroup.Id()]
if unregistered.UnregisteredSince.Add(csr.config.MaxNodeProvisionTime).Before(currentTime) {
longUnregisteredNodeNames = append(longUnregisteredNodeNames, unregistered.Node.Name)
perNgCopy.LongUnregistered++
total.LongUnregistered++
perNgCopy.LongUnregistered = append(perNgCopy.LongUnregistered, unregistered.Node.Name)
total.LongUnregistered = append(total.LongUnregistered, unregistered.Node.Name)
} else {
perNgCopy.Unregistered++
total.Unregistered++
perNgCopy.Unregistered = append(perNgCopy.Unregistered, unregistered.Node.Name)
total.Unregistered = append(total.Unregistered, unregistered.Node.Name)
}
perNodeGroup[nodeGroup.Id()] = perNgCopy
}
if total.LongUnregistered > 0 {
klog.V(3).Infof("Found longUnregistered Nodes %s", longUnregisteredNodeNames)
if len(total.LongUnregistered) > 0 {
klog.V(3).Infof("Found longUnregistered Nodes %s", total.LongUnregistered)
}

for ngId, ngReadiness := range perNodeGroup {
@@ -630,10 +627,10 @@ func (csr *ClusterStateRegistry) updateIncorrectNodeGroupSizes(currentTime time.
}
continue
}
if readiness.Registered > acceptableRange.MaxNodes ||
readiness.Registered < acceptableRange.MinNodes {
if len(readiness.Registered) > acceptableRange.MaxNodes ||
len(readiness.Registered) < acceptableRange.MinNodes {
incorrect := IncorrectNodeGroupSize{
CurrentSize: readiness.Registered,
CurrentSize: len(readiness.Registered),
ExpectedSize: acceptableRange.CurrentTarget,
FirstObserved: currentTime,
}
@@ -752,12 +749,12 @@ func buildHealthStatusNodeGroup(isReady bool, readiness Readiness, acceptable Ac
condition := api.ClusterAutoscalerCondition{
Type: api.ClusterAutoscalerHealth,
Message: fmt.Sprintf("ready=%d unready=%d (resourceUnready=%d) notStarted=%d longNotStarted=0 registered=%d longUnregistered=%d cloudProviderTarget=%d (minSize=%d, maxSize=%d)",
readiness.Ready,
readiness.Unready,
readiness.ResourceUnready,
readiness.NotStarted,
readiness.Registered,
readiness.LongUnregistered,
len(readiness.Ready),
len(readiness.Unready),
len(readiness.ResourceUnready),
len(readiness.NotStarted),
len(readiness.Registered),
len(readiness.LongUnregistered),
acceptable.CurrentTarget,
minSize,
maxSize),
@@ -775,7 +772,7 @@ func buildScaleUpStatusNodeGroup(isScaleUpInProgress bool, isSafeToScaleUp bool,
condition := api.ClusterAutoscalerCondition{
Type: api.ClusterAutoscalerScaleUp,
Message: fmt.Sprintf("ready=%d cloudProviderTarget=%d",
readiness.Ready,
len(readiness.Ready),
acceptable.CurrentTarget),
LastProbeTime: metav1.Time{Time: readiness.Time},
}
@@ -807,12 +804,12 @@ func buildHealthStatusClusterwide(isReady bool, readiness Readiness) api.Cluster
condition := api.ClusterAutoscalerCondition{
Type: api.ClusterAutoscalerHealth,
Message: fmt.Sprintf("ready=%d unready=%d (resourceUnready=%d) notStarted=%d longNotStarted=0 registered=%d longUnregistered=%d",
readiness.Ready,
readiness.Unready,
readiness.ResourceUnready,
readiness.NotStarted,
readiness.Registered,
readiness.LongUnregistered,
len(readiness.Ready),
len(readiness.Unready),
len(readiness.ResourceUnready),
len(readiness.NotStarted),
len(readiness.Registered),
len(readiness.LongUnregistered),
),
LastProbeTime: metav1.Time{Time: readiness.Time},
}
@@ -838,8 +835,8 @@ func buildScaleUpStatusClusterwide(nodeGroupStatuses []api.NodeGroupStatus, read
condition := api.ClusterAutoscalerCondition{
Type: api.ClusterAutoscalerScaleUp,
Message: fmt.Sprintf("ready=%d registered=%d",
readiness.Ready,
readiness.Registered),
len(readiness.Ready),
len(readiness.Registered)),
LastProbeTime: metav1.Time{Time: readiness.Time},
}
if isScaleUpInProgress {
@@ -930,7 +927,7 @@ func (csr *ClusterStateRegistry) GetUpcomingNodes() map[string]int {
readiness := csr.perNodeGroupReadiness[id]
ar := csr.acceptableRanges[id]
// newNodes is the number of nodes that
newNodes := ar.CurrentTarget - (readiness.Ready + readiness.Unready + readiness.LongUnregistered)
newNodes := ar.CurrentTarget - (len(readiness.Ready) + len(readiness.Unready) + len(readiness.LongUnregistered))
if newNodes <= 0 {
// Negative value is unlikely but theoretically possible.
continue
@@ -1003,7 +1000,7 @@ func (csr *ClusterStateRegistry) GetAutoscaledNodesCount() (currentSize, targetS
targetSize += accRange.CurrentTarget
}
for _, readiness := range csr.perNodeGroupReadiness {
currentSize += readiness.Registered - readiness.NotStarted
currentSize += len(readiness.Registered) - len(readiness.NotStarted)
}
return currentSize, targetSize
}
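The flexibility mentioned in the commit message is already visible above: updateReadinessStats now logs total.LongUnregistered directly instead of maintaining a separate longUnregisteredNodeNames slice. As a hypothetical illustration of what downstream code could do with the new sets (not part of this commit), a caller might build a lookup set from one of the readiness buckets:

```go
// Hypothetical example (not in the commit): turning a readiness bucket into a
// set for membership checks against another node list.
package main

import "fmt"

// nodeSet builds a set from a slice of node names.
func nodeSet(names []string) map[string]bool {
	set := make(map[string]bool, len(names))
	for _, n := range names {
		set[n] = true
	}
	return set
}

func main() {
	// Invented data standing in for Readiness.Unready and the full node list.
	unready := []string{"ng1-2", "ng2-1"}
	allNodes := []string{"ng1-1", "ng1-2", "ng2-1"}

	isUnready := nodeSet(unready)
	for _, name := range allNodes {
		if isUnready[name] {
			fmt.Println("unready:", name)
		}
	}
}
```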
22 changes: 11 additions & 11 deletions cluster-autoscaler/clusterstate/clusterstate_test.go
@@ -359,8 +359,8 @@ func TestUnreadyLongAfterCreation(t *testing.T) {
}, fakeLogRecorder, newBackoff())
err := clusterstate.UpdateNodes([]*apiv1.Node{ng1_1, ng2_1}, nil, now)
assert.NoError(t, err)
assert.Equal(t, 1, clusterstate.GetClusterReadiness().Unready)
assert.Equal(t, 0, clusterstate.GetClusterReadiness().NotStarted)
assert.Equal(t, 1, len(clusterstate.GetClusterReadiness().Unready))
assert.Equal(t, 0, len(clusterstate.GetClusterReadiness().NotStarted))
upcoming := clusterstate.GetUpcomingNodes()
assert.Equal(t, 0, upcoming["ng1"])
}
@@ -390,22 +390,22 @@ func TestNotStarted(t *testing.T) {
}, fakeLogRecorder, newBackoff())
err := clusterstate.UpdateNodes([]*apiv1.Node{ng1_1, ng2_1}, nil, now)
assert.NoError(t, err)
assert.Equal(t, 1, clusterstate.GetClusterReadiness().NotStarted)
assert.Equal(t, 1, clusterstate.GetClusterReadiness().Ready)
assert.Equal(t, 1, len(clusterstate.GetClusterReadiness().NotStarted))
assert.Equal(t, 1, len(clusterstate.GetClusterReadiness().Ready))

// node ng2_1 moves condition to ready
SetNodeReadyState(ng2_1, true, now.Add(-4*time.Minute))
err = clusterstate.UpdateNodes([]*apiv1.Node{ng1_1, ng2_1}, nil, now)
assert.NoError(t, err)
assert.Equal(t, 1, clusterstate.GetClusterReadiness().NotStarted)
assert.Equal(t, 1, clusterstate.GetClusterReadiness().Ready)
assert.Equal(t, 1, len(clusterstate.GetClusterReadiness().NotStarted))
assert.Equal(t, 1, len(clusterstate.GetClusterReadiness().Ready))

// node ng2_1 no longer has the taint
RemoveNodeNotReadyTaint(ng2_1)
err = clusterstate.UpdateNodes([]*apiv1.Node{ng1_1, ng2_1}, nil, now)
assert.NoError(t, err)
assert.Equal(t, 0, clusterstate.GetClusterReadiness().NotStarted)
assert.Equal(t, 2, clusterstate.GetClusterReadiness().Ready)
assert.Equal(t, 0, len(clusterstate.GetClusterReadiness().NotStarted))
assert.Equal(t, 2, len(clusterstate.GetClusterReadiness().Ready))
}

func TestExpiredScaleUp(t *testing.T) {
@@ -686,7 +686,7 @@ func TestCloudProviderDeletedNodes(t *testing.T) {
assert.NoError(t, err)
assert.Equal(t, 1, len(GetCloudProviderDeletedNodeNames(clusterstate)))
assert.Equal(t, "ng1-2", GetCloudProviderDeletedNodeNames(clusterstate)[0])
assert.Equal(t, 1, clusterstate.GetClusterReadiness().Deleted)
assert.Equal(t, 1, len(clusterstate.GetClusterReadiness().Deleted))

// The node is removed from Kubernetes
now.Add(time.Minute)
@@ -719,7 +719,7 @@ func TestCloudProviderDeletedNodes(t *testing.T) {
assert.NoError(t, err)
assert.Equal(t, 1, len(GetCloudProviderDeletedNodeNames(clusterstate)))
assert.Equal(t, "ng1-3", GetCloudProviderDeletedNodeNames(clusterstate)[0])
assert.Equal(t, 1, clusterstate.GetClusterReadiness().Deleted)
assert.Equal(t, 1, len(clusterstate.GetClusterReadiness().Deleted))

// Confirm that previously identified deleted Cloud Provider nodes are still included
// until it is removed from Kubernetes
@@ -729,7 +729,7 @@ func TestCloudProviderDeletedNodes(t *testing.T) {
assert.NoError(t, err)
assert.Equal(t, 1, len(GetCloudProviderDeletedNodeNames(clusterstate)))
assert.Equal(t, "ng1-3", GetCloudProviderDeletedNodeNames(clusterstate)[0])
assert.Equal(t, 1, clusterstate.GetClusterReadiness().Deleted)
assert.Equal(t, 1, len(clusterstate.GetClusterReadiness().Deleted))

// The node is removed from Kubernetes
now.Add(time.Minute)
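The test updates above only wrap the existing count assertions in len(). A hypothetical follow-up test (not part of this commit; node names invented) shows the kind of name-based assertion the new fields make possible, using the testify assert package already imported by clusterstate_test.go:

```go
// Hypothetical test sketch (not in the commit): asserting on which nodes
// ended up in a readiness bucket, not just how many.
package clusterstate

import (
	"testing"

	"github.com/stretchr/testify/assert"
)

func TestReadinessNodeNames(t *testing.T) {
	readiness := Readiness{
		Ready:   []string{"ng1-1"},
		Unready: []string{"ng1-2"},
	}
	assert.Equal(t, 1, len(readiness.Unready))
	assert.ElementsMatch(t, []string{"ng1-2"}, readiness.Unready)
}
```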
2 changes: 1 addition & 1 deletion cluster-autoscaler/core/utils/utils.go
@@ -183,7 +183,7 @@ func UpdateClusterStateMetrics(csr *clusterstate.ClusterStateRegistry) {
}
metrics.UpdateClusterSafeToAutoscale(csr.IsClusterHealthy())
readiness := csr.GetClusterReadiness()
metrics.UpdateNodesCount(readiness.Ready, readiness.Unready, readiness.NotStarted, readiness.LongUnregistered, readiness.Unregistered)
metrics.UpdateNodesCount(len(readiness.Ready), len(readiness.Unready), len(readiness.NotStarted), len(readiness.LongUnregistered), len(readiness.Unregistered))
}

// GetOldestCreateTime returns oldest creation time out of the pods in the set
