From a0bf1082b5ab5a440910109741c911665f5ffc77 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bart=C5=82omiej=20Wr=C3=B3blewski?= Date: Wed, 13 Nov 2024 16:22:34 +0000 Subject: [PATCH] Add flag to force remove long unregistered nodes --- .../config/autoscaling_options.go | 2 + cluster-autoscaler/core/static_autoscaler.go | 41 +-- .../core/static_autoscaler_test.go | 234 +++++++++--------- cluster-autoscaler/main.go | 2 + 4 files changed, 151 insertions(+), 128 deletions(-) diff --git a/cluster-autoscaler/config/autoscaling_options.go b/cluster-autoscaler/config/autoscaling_options.go index d3d6fdbfa3f0..aa902058c184 100644 --- a/cluster-autoscaler/config/autoscaling_options.go +++ b/cluster-autoscaler/config/autoscaling_options.go @@ -307,6 +307,8 @@ type AutoscalingOptions struct { CheckCapacityProvisioningRequestMaxBatchSize int // CheckCapacityProvisioningRequestBatchTimebox is the maximum time to spend processing a batch of provisioning requests CheckCapacityProvisioningRequestBatchTimebox time.Duration + // ForceDeleteLongUnregisteredNodes is used to enable/disable ignoring min size constraints during removal of long unregistered nodes + ForceDeleteLongUnregisteredNodes bool } // KubeClientOptions specify options for kube client diff --git a/cluster-autoscaler/core/static_autoscaler.go b/cluster-autoscaler/core/static_autoscaler.go index 216a684ee304..b8417c85d63c 100644 --- a/cluster-autoscaler/core/static_autoscaler.go +++ b/cluster-autoscaler/core/static_autoscaler.go @@ -785,29 +785,38 @@ func (a *StaticAutoscaler) removeOldUnregisteredNodes(allUnregisteredNodes []clu nodeGroup := nodeGroups[nodeGroupId] klog.V(0).Infof("Removing %v unregistered nodes for node group %v", len(unregisteredNodesToDelete), nodeGroupId) - size, err := nodeGroup.TargetSize() - if err != nil { - klog.Warningf("Failed to get node group size; nodeGroup=%v; err=%v", nodeGroup.Id(), err) - continue - } - possibleToDelete := size - nodeGroup.MinSize() - if possibleToDelete <= 0 { - klog.Warningf("Node group %s min size reached, skipping removal of %v unregistered nodes", nodeGroupId, len(unregisteredNodesToDelete)) - continue - } - if len(unregisteredNodesToDelete) > possibleToDelete { - klog.Warningf("Capping node group %s unregistered node removal to %d nodes, removing all %d would exceed min size constaint", nodeGroupId, possibleToDelete, len(unregisteredNodesToDelete)) - unregisteredNodesToDelete = unregisteredNodesToDelete[:possibleToDelete] + if !a.ForceDeleteLongUnregisteredNodes { + size, err := nodeGroup.TargetSize() + if err != nil { + klog.Warningf("Failed to get node group size; nodeGroup=%v; err=%v", nodeGroup.Id(), err) + continue + } + possibleToDelete := size - nodeGroup.MinSize() + if possibleToDelete <= 0 { + klog.Warningf("Node group %s min size reached, skipping removal of %v unregistered nodes", nodeGroupId, len(unregisteredNodesToDelete)) + continue + } + if len(unregisteredNodesToDelete) > possibleToDelete { + klog.Warningf("Capping node group %s unregistered node removal to %d nodes, removing all %d would exceed min size constaint", nodeGroupId, possibleToDelete, len(unregisteredNodesToDelete)) + unregisteredNodesToDelete = unregisteredNodesToDelete[:possibleToDelete] + } } - nodesToDelete := toNodes(unregisteredNodesToDelete) - nodesToDelete, err = overrideNodesToDeleteForZeroOrMax(a.NodeGroupDefaults, nodeGroup, nodesToDelete) + nodesToDelete := toNodes(unregisteredNodesToDelete) + nodesToDelete, err := overrideNodesToDeleteForZeroOrMax(a.NodeGroupDefaults, nodeGroup, nodesToDelete) if err != nil { klog.Warningf("Failed to remove unregistered nodes from node group %s: %v", nodeGroupId, err) continue } - err = nodeGroup.DeleteNodes(nodesToDelete) + if a.ForceDeleteLongUnregisteredNodes { + err = nodeGroup.ForceDeleteNodes(nodesToDelete) + if err == cloudprovider.ErrNotImplemented { + err = nodeGroup.DeleteNodes(nodesToDelete) + } + } else { + err = nodeGroup.DeleteNodes(nodesToDelete) + } csr.InvalidateNodeInstancesCacheEntry(nodeGroup) if err != nil { klog.Warningf("Failed to remove %v unregistered nodes from node group %s: %v", len(nodesToDelete), nodeGroupId, err) diff --git a/cluster-autoscaler/core/static_autoscaler_test.go b/cluster-autoscaler/core/static_autoscaler_test.go index fcff16331eef..45162841033e 100644 --- a/cluster-autoscaler/core/static_autoscaler_test.go +++ b/cluster-autoscaler/core/static_autoscaler_test.go @@ -876,134 +876,144 @@ func TestStaticAutoscalerRunOnceWithAutoprovisionedEnabled(t *testing.T) { } func TestStaticAutoscalerRunOnceWithALongUnregisteredNode(t *testing.T) { - readyNodeLister := kubernetes.NewTestNodeLister(nil) - allNodeLister := kubernetes.NewTestNodeLister(nil) - allPodListerMock := &podListerMock{} - podDisruptionBudgetListerMock := &podDisruptionBudgetListerMock{} - daemonSetListerMock := &daemonSetListerMock{} - onScaleUpMock := &onScaleUpMock{} - onScaleDownMock := &onScaleDownMock{} - deleteFinished := make(chan bool, 1) - - now := time.Now() - later := now.Add(1 * time.Minute) + for _, forceDeleteLongUnregisteredNodes := range []bool{false, true} { + t.Run(fmt.Sprintf("forceDeleteLongUnregisteredNodes=%v", forceDeleteLongUnregisteredNodes), func(t *testing.T) { + readyNodeLister := kubernetes.NewTestNodeLister(nil) + allNodeLister := kubernetes.NewTestNodeLister(nil) + allPodListerMock := &podListerMock{} + podDisruptionBudgetListerMock := &podDisruptionBudgetListerMock{} + daemonSetListerMock := &daemonSetListerMock{} + onScaleUpMock := &onScaleUpMock{} + onScaleDownMock := &onScaleDownMock{} + deleteFinished := make(chan bool, 1) + + now := time.Now() + later := now.Add(1 * time.Minute) + + n1 := BuildTestNode("n1", 1000, 1000) + SetNodeReadyState(n1, true, time.Now()) + n2 := BuildTestNode("n2", 1000, 1000) + SetNodeReadyState(n2, true, time.Now()) + + p1 := BuildTestPod("p1", 600, 100) + p1.Spec.NodeName = "n1" + p2 := BuildTestPod("p2", 600, 100, MarkUnschedulable()) + + provider := testprovider.NewTestCloudProvider( + func(id string, delta int) error { + return onScaleUpMock.ScaleUp(id, delta) + }, func(id string, name string) error { + ret := onScaleDownMock.ScaleDown(id, name) + deleteFinished <- true + return ret + }) + provider.AddNodeGroup("ng1", 2, 10, 2) + provider.AddNode("ng1", n1) - n1 := BuildTestNode("n1", 1000, 1000) - SetNodeReadyState(n1, true, time.Now()) - n2 := BuildTestNode("n2", 1000, 1000) - SetNodeReadyState(n2, true, time.Now()) + // broken node, that will be just hanging out there during + // the test (it can't be removed since that would validate group min size) + brokenNode := BuildTestNode("broken", 1000, 1000) + provider.AddNode("ng1", brokenNode) - p1 := BuildTestPod("p1", 600, 100) - p1.Spec.NodeName = "n1" - p2 := BuildTestPod("p2", 600, 100, MarkUnschedulable()) + ng1 := reflect.ValueOf(provider.GetNodeGroup("ng1")).Interface().(*testprovider.TestNodeGroup) + assert.NotNil(t, ng1) + assert.NotNil(t, provider) - provider := testprovider.NewTestCloudProvider( - func(id string, delta int) error { - return onScaleUpMock.ScaleUp(id, delta) - }, func(id string, name string) error { - ret := onScaleDownMock.ScaleDown(id, name) - deleteFinished <- true - return ret - }) - provider.AddNodeGroup("ng1", 2, 10, 2) - provider.AddNode("ng1", n1) + // Create context with mocked lister registry. + options := config.AutoscalingOptions{ + NodeGroupDefaults: config.NodeGroupAutoscalingOptions{ + ScaleDownUnneededTime: time.Minute, + ScaleDownUnreadyTime: time.Minute, + ScaleDownUtilizationThreshold: 0.5, + MaxNodeProvisionTime: 10 * time.Second, + }, + EstimatorName: estimator.BinpackingEstimatorName, + ScaleDownEnabled: true, + MaxNodesTotal: 10, + MaxCoresTotal: 10, + MaxMemoryTotal: 100000, + ForceDeleteLongUnregisteredNodes: forceDeleteLongUnregisteredNodes, + } + processorCallbacks := newStaticAutoscalerProcessorCallbacks() - // broken node, that will be just hanging out there during - // the test (it can't be removed since that would validate group min size) - brokenNode := BuildTestNode("broken", 1000, 1000) - provider.AddNode("ng1", brokenNode) + context, err := NewScaleTestAutoscalingContext(options, &fake.Clientset{}, nil, provider, processorCallbacks, nil) + assert.NoError(t, err) - ng1 := reflect.ValueOf(provider.GetNodeGroup("ng1")).Interface().(*testprovider.TestNodeGroup) - assert.NotNil(t, ng1) - assert.NotNil(t, provider) + setUpScaleDownActuator(&context, options) - // Create context with mocked lister registry. - options := config.AutoscalingOptions{ - NodeGroupDefaults: config.NodeGroupAutoscalingOptions{ - ScaleDownUnneededTime: time.Minute, - ScaleDownUnreadyTime: time.Minute, - ScaleDownUtilizationThreshold: 0.5, - MaxNodeProvisionTime: 10 * time.Second, - }, - EstimatorName: estimator.BinpackingEstimatorName, - ScaleDownEnabled: true, - MaxNodesTotal: 10, - MaxCoresTotal: 10, - MaxMemoryTotal: 100000, - } - processorCallbacks := newStaticAutoscalerProcessorCallbacks() + listerRegistry := kube_util.NewListerRegistry(allNodeLister, readyNodeLister, allPodListerMock, + podDisruptionBudgetListerMock, daemonSetListerMock, + nil, nil, nil, nil) + context.ListerRegistry = listerRegistry - context, err := NewScaleTestAutoscalingContext(options, &fake.Clientset{}, nil, provider, processorCallbacks, nil) - assert.NoError(t, err) + clusterStateConfig := clusterstate.ClusterStateRegistryConfig{ + OkTotalUnreadyCount: 1, + } + processors := processorstest.NewTestProcessors(&context) - setUpScaleDownActuator(&context, options) + clusterState := clusterstate.NewClusterStateRegistry(provider, clusterStateConfig, context.LogRecorder, NewBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(options.NodeGroupDefaults), processors.AsyncNodeGroupStateChecker) + // broken node detected as unregistered - listerRegistry := kube_util.NewListerRegistry(allNodeLister, readyNodeLister, allPodListerMock, - podDisruptionBudgetListerMock, daemonSetListerMock, - nil, nil, nil, nil) - context.ListerRegistry = listerRegistry + nodes := []*apiv1.Node{n1} + // nodeInfos, _ := getNodeInfosForGroups(nodes, provider, listerRegistry, []*appsv1.DaemonSet{}, context.PredicateChecker) + clusterState.UpdateNodes(nodes, nil, now) - clusterStateConfig := clusterstate.ClusterStateRegistryConfig{ - OkTotalUnreadyCount: 1, - } - processors := processorstest.NewTestProcessors(&context) + // broken node failed to register in time + clusterState.UpdateNodes(nodes, nil, later) - clusterState := clusterstate.NewClusterStateRegistry(provider, clusterStateConfig, context.LogRecorder, NewBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(options.NodeGroupDefaults), processors.AsyncNodeGroupStateChecker) - // broken node detected as unregistered + sdPlanner, sdActuator := newScaleDownPlannerAndActuator(&context, processors, clusterState, nil) + suOrchestrator := orchestrator.New() + suOrchestrator.Initialize(&context, processors, clusterState, newEstimatorBuilder(), taints.TaintConfig{}) - nodes := []*apiv1.Node{n1} - // nodeInfos, _ := getNodeInfosForGroups(nodes, provider, listerRegistry, []*appsv1.DaemonSet{}, context.PredicateChecker) - clusterState.UpdateNodes(nodes, nil, now) + autoscaler := &StaticAutoscaler{ + AutoscalingContext: &context, + clusterStateRegistry: clusterState, + lastScaleUpTime: time.Now(), + lastScaleDownFailTime: time.Now(), + scaleDownPlanner: sdPlanner, + scaleDownActuator: sdActuator, + scaleUpOrchestrator: suOrchestrator, + processors: processors, + loopStartNotifier: loopstart.NewObserversList(nil), + processorCallbacks: processorCallbacks, + } - // broken node failed to register in time - clusterState.UpdateNodes(nodes, nil, later) + // If deletion of unregistered nodes is not forced, we need to simulate + // additional scale-up to respect min size constraints. + if !forceDeleteLongUnregisteredNodes { + // Scale up. + readyNodeLister.SetNodes(nodes) + allNodeLister.SetNodes(nodes) + allPodListerMock.On("List").Return([]*apiv1.Pod{p1, p2}, nil).Twice() + daemonSetListerMock.On("List", labels.Everything()).Return([]*appsv1.DaemonSet{}, nil).Once() + podDisruptionBudgetListerMock.On("List").Return([]*policyv1.PodDisruptionBudget{}, nil).Once() + onScaleUpMock.On("ScaleUp", "ng1", 1).Return(nil).Once() + + err = autoscaler.RunOnce(later.Add(time.Hour)) + assert.NoError(t, err) + mock.AssertExpectationsForObjects(t, allPodListerMock, + podDisruptionBudgetListerMock, daemonSetListerMock, onScaleUpMock, onScaleDownMock) + + nodes = append(nodes, n2) + provider.AddNode("ng1", n2) + ng1.SetTargetSize(3) + } - sdPlanner, sdActuator := newScaleDownPlannerAndActuator(&context, processors, clusterState, nil) - suOrchestrator := orchestrator.New() - suOrchestrator.Initialize(&context, processors, clusterState, newEstimatorBuilder(), taints.TaintConfig{}) + // Remove broken node + readyNodeLister.SetNodes(nodes) + allNodeLister.SetNodes(nodes) + allPodListerMock.On("List").Return([]*apiv1.Pod{}, nil).Twice() + onScaleDownMock.On("ScaleDown", "ng1", "broken").Return(nil).Once() + daemonSetListerMock.On("List", labels.Everything()).Return([]*appsv1.DaemonSet{}, nil).Once() + podDisruptionBudgetListerMock.On("List").Return([]*policyv1.PodDisruptionBudget{}, nil).Once() - autoscaler := &StaticAutoscaler{ - AutoscalingContext: &context, - clusterStateRegistry: clusterState, - lastScaleUpTime: time.Now(), - lastScaleDownFailTime: time.Now(), - scaleDownPlanner: sdPlanner, - scaleDownActuator: sdActuator, - scaleUpOrchestrator: suOrchestrator, - processors: processors, - loopStartNotifier: loopstart.NewObserversList(nil), - processorCallbacks: processorCallbacks, + err = autoscaler.RunOnce(later.Add(2 * time.Hour)) + waitForDeleteToFinish(t, deleteFinished) + assert.NoError(t, err) + mock.AssertExpectationsForObjects(t, allPodListerMock, + podDisruptionBudgetListerMock, daemonSetListerMock, onScaleUpMock, onScaleDownMock) + }) } - - // Scale up. - readyNodeLister.SetNodes([]*apiv1.Node{n1}) - allNodeLister.SetNodes([]*apiv1.Node{n1}) - allPodListerMock.On("List").Return([]*apiv1.Pod{p1, p2}, nil).Twice() - daemonSetListerMock.On("List", labels.Everything()).Return([]*appsv1.DaemonSet{}, nil).Once() - podDisruptionBudgetListerMock.On("List").Return([]*policyv1.PodDisruptionBudget{}, nil).Once() - onScaleUpMock.On("ScaleUp", "ng1", 1).Return(nil).Once() - - err = autoscaler.RunOnce(later.Add(time.Hour)) - assert.NoError(t, err) - mock.AssertExpectationsForObjects(t, allPodListerMock, - podDisruptionBudgetListerMock, daemonSetListerMock, onScaleUpMock, onScaleDownMock) - - // Remove broken node after going over min size - provider.AddNode("ng1", n2) - ng1.SetTargetSize(3) - - readyNodeLister.SetNodes([]*apiv1.Node{n1, n2}) - allNodeLister.SetNodes([]*apiv1.Node{n1, n2}) - allPodListerMock.On("List").Return([]*apiv1.Pod{p1, p2}, nil).Twice() - onScaleDownMock.On("ScaleDown", "ng1", "broken").Return(nil).Once() - daemonSetListerMock.On("List", labels.Everything()).Return([]*appsv1.DaemonSet{}, nil).Once() - podDisruptionBudgetListerMock.On("List").Return([]*policyv1.PodDisruptionBudget{}, nil).Once() - - err = autoscaler.RunOnce(later.Add(2 * time.Hour)) - waitForDeleteToFinish(t, deleteFinished) - assert.NoError(t, err) - mock.AssertExpectationsForObjects(t, allPodListerMock, - podDisruptionBudgetListerMock, daemonSetListerMock, onScaleUpMock, onScaleDownMock) } func TestStaticAutoscalerRunOncePodsWithPriorities(t *testing.T) { diff --git a/cluster-autoscaler/main.go b/cluster-autoscaler/main.go index d50cc0cebde4..cf3ca31ccbe5 100644 --- a/cluster-autoscaler/main.go +++ b/cluster-autoscaler/main.go @@ -279,6 +279,7 @@ var ( checkCapacityBatchProcessing = flag.Bool("check-capacity-batch-processing", false, "Whether to enable batch processing for check capacity requests.") checkCapacityProvisioningRequestMaxBatchSize = flag.Int("check-capacity-provisioning-request-max-batch-size", 10, "Maximum number of provisioning requests to process in a single batch.") checkCapacityProvisioningRequestBatchTimebox = flag.Duration("check-capacity-provisioning-request-batch-timebox", 10*time.Second, "Maximum time to process a batch of provisioning requests.") + forceDeleteLongUnregisteredNodes = flag.Bool("force-delete-unregistered-nodes", false, "Whether to enable force deletion of long unregistered nodes, regardless of the min size of the node group the belong to.") ) func isFlagPassed(name string) bool { @@ -457,6 +458,7 @@ func createAutoscalingOptions() config.AutoscalingOptions { CheckCapacityBatchProcessing: *checkCapacityBatchProcessing, CheckCapacityProvisioningRequestMaxBatchSize: *checkCapacityProvisioningRequestMaxBatchSize, CheckCapacityProvisioningRequestBatchTimebox: *checkCapacityProvisioningRequestBatchTimebox, + ForceDeleteLongUnregisteredNodes: *forceDeleteLongUnregisteredNodes, } }