Skip to content

Commit

Permalink
Split removeOldUnregisteredNodes method
Browse files Browse the repository at this point in the history
  • Loading branch information
BigDarkClown committed Nov 18, 2024
1 parent a0bf108 commit 1580315
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 30 deletions.
63 changes: 37 additions & 26 deletions cluster-autoscaler/core/static_autoscaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -424,7 +424,7 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) caerrors.AutoscalerErr
unregisteredNodes := a.clusterStateRegistry.GetUnregisteredNodes()
if len(unregisteredNodes) > 0 {
klog.V(1).Infof("%d unregistered nodes present", len(unregisteredNodes))
removedAny, err := a.removeOldUnregisteredNodes(unregisteredNodes, autoscalingContext,
removedAny, err := a.removeOldUnregisteredNodes(unregisteredNodes,
a.clusterStateRegistry, currentTime, autoscalingContext.LogRecorder)
// There was a problem with removing unregistered nodes. Retry in the next loop.
if err != nil {
Expand Down Expand Up @@ -752,36 +752,19 @@ func fixNodeGroupSize(context *context.AutoscalingContext, clusterStateRegistry
return fixed, nil
}

// Removes unregistered nodes if needed. Returns true if anything was removed and error if such occurred.
func (a *StaticAutoscaler) removeOldUnregisteredNodes(allUnregisteredNodes []clusterstate.UnregisteredNode, context *context.AutoscalingContext,
// removeOldUnregisteredNodes removes unregistered nodes if needed. Returns true
// if anything was removed and error if such occurred.
func (a *StaticAutoscaler) removeOldUnregisteredNodes(allUnregisteredNodes []clusterstate.UnregisteredNode,
csr *clusterstate.ClusterStateRegistry, currentTime time.Time, logRecorder *utils.LogEventRecorder) (bool, error) {

nodeGroups := a.nodeGroupsById()
nodesToDeleteByNodeGroupId := make(map[string][]clusterstate.UnregisteredNode)
for _, unregisteredNode := range allUnregisteredNodes {
nodeGroup, err := a.CloudProvider.NodeGroupForNode(unregisteredNode.Node)
if err != nil {
klog.Warningf("Failed to get node group for %s: %v", unregisteredNode.Node.Name, err)
continue
}
if nodeGroup == nil || reflect.ValueOf(nodeGroup).IsNil() {
klog.Warningf("No node group for node %s, skipping", unregisteredNode.Node.Name)
continue
}

maxNodeProvisionTime, err := csr.MaxNodeProvisionTime(nodeGroup)
if err != nil {
return false, fmt.Errorf("failed to retrieve maxNodeProvisionTime for node %s in nodeGroup %s", unregisteredNode.Node.Name, nodeGroup.Id())
}

if unregisteredNode.UnregisteredSince.Add(maxNodeProvisionTime).Before(currentTime) {
klog.V(0).Infof("Marking unregistered node %v for removal", unregisteredNode.Node.Name)
nodesToDeleteByNodeGroupId[nodeGroup.Id()] = append(nodesToDeleteByNodeGroupId[nodeGroup.Id()], unregisteredNode)
}
unregisteredNodesToRemove, err := a.oldUnregisteredNodes(allUnregisteredNodes, csr, currentTime)
if err != nil {
return false, err
}

nodeGroups := a.nodeGroupsById()
removedAny := false
for nodeGroupId, unregisteredNodesToDelete := range nodesToDeleteByNodeGroupId {
for nodeGroupId, unregisteredNodesToDelete := range unregisteredNodesToRemove {
nodeGroup := nodeGroups[nodeGroupId]

klog.V(0).Infof("Removing %v unregistered nodes for node group %v", len(unregisteredNodesToDelete), nodeGroupId)
Expand Down Expand Up @@ -836,6 +819,34 @@ func (a *StaticAutoscaler) removeOldUnregisteredNodes(allUnregisteredNodes []clu
return removedAny, nil
}

// oldUnregisteredNodes returns old unregistered nodes grouped by their node group id.
func (a *StaticAutoscaler) oldUnregisteredNodes(allUnregisteredNodes []clusterstate.UnregisteredNode, csr *clusterstate.ClusterStateRegistry, currentTime time.Time) (map[string][]clusterstate.UnregisteredNode, error) {
nodesByNodeGroupId := make(map[string][]clusterstate.UnregisteredNode)
for _, unregisteredNode := range allUnregisteredNodes {
nodeGroup, err := a.CloudProvider.NodeGroupForNode(unregisteredNode.Node)
if err != nil {
klog.Warningf("Failed to get node group for %s: %v", unregisteredNode.Node.Name, err)
continue
}
if nodeGroup == nil || reflect.ValueOf(nodeGroup).IsNil() {
klog.Warningf("No node group for node %s, skipping", unregisteredNode.Node.Name)
continue
}

maxNodeProvisionTime, err := csr.MaxNodeProvisionTime(nodeGroup)
if err != nil {
return nil, fmt.Errorf("failed to retrieve maxNodeProvisionTime for node %s in nodeGroup %s", unregisteredNode.Node.Name, nodeGroup.Id())
}

if unregisteredNode.UnregisteredSince.Add(maxNodeProvisionTime).Before(currentTime) {
klog.V(0).Infof("Marking unregistered node %v for removal", unregisteredNode.Node.Name)
nodesByNodeGroupId[nodeGroup.Id()] = append(nodesByNodeGroupId[nodeGroup.Id()], unregisteredNode)
}
}

return nodesByNodeGroupId, nil
}

func toNodes(unregisteredNodes []clusterstate.UnregisteredNode) []*apiv1.Node {
nodes := []*apiv1.Node{}
for _, n := range unregisteredNodes {
Expand Down
8 changes: 4 additions & 4 deletions cluster-autoscaler/core/static_autoscaler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2257,12 +2257,12 @@ func TestRemoveOldUnregisteredNodes(t *testing.T) {
}

// Nothing should be removed. The unregistered node is not old enough.
removed, err := autoscaler.removeOldUnregisteredNodes(unregisteredNodes, context, clusterState, now.Add(-50*time.Minute), fakeLogRecorder)
removed, err := autoscaler.removeOldUnregisteredNodes(unregisteredNodes, clusterState, now.Add(-50*time.Minute), fakeLogRecorder)
assert.NoError(t, err)
assert.False(t, removed)

// ng1_2 should be removed.
removed, err = autoscaler.removeOldUnregisteredNodes(unregisteredNodes, context, clusterState, now, fakeLogRecorder)
removed, err = autoscaler.removeOldUnregisteredNodes(unregisteredNodes, clusterState, now, fakeLogRecorder)
assert.NoError(t, err)
assert.True(t, removed)
deletedNode := core_utils.GetStringFromChan(deletedNodes)
Expand Down Expand Up @@ -2317,12 +2317,12 @@ func TestRemoveOldUnregisteredNodesAtomic(t *testing.T) {
}

// Nothing should be removed. The unregistered node is not old enough.
removed, err := autoscaler.removeOldUnregisteredNodes(unregisteredNodes, context, clusterState, now.Add(-50*time.Minute), fakeLogRecorder)
removed, err := autoscaler.removeOldUnregisteredNodes(unregisteredNodes, clusterState, now.Add(-50*time.Minute), fakeLogRecorder)
assert.NoError(t, err)
assert.False(t, removed)

// unregNode is long unregistered, so all of the nodes should be removed due to ZeroOrMaxNodeScaling option
removed, err = autoscaler.removeOldUnregisteredNodes(unregisteredNodes, context, clusterState, now, fakeLogRecorder)
removed, err = autoscaler.removeOldUnregisteredNodes(unregisteredNodes, clusterState, now, fakeLogRecorder)
assert.NoError(t, err)
assert.True(t, removed)
wantNames, deletedNames := []string{}, []string{}
Expand Down

0 comments on commit 1580315

Please sign in to comment.