From 747e8a3f0ae384e455880334680e0454abe29ed1 Mon Sep 17 00:00:00 2001 From: Yaroslava Serdiuk Date: Mon, 25 Jul 2022 14:12:13 +0000 Subject: [PATCH] Introduce NodeDeleterBatcher to ScaleDown actuator --- .../config/autoscaling_options.go | 2 + .../core/scaledown/actuation/actuator.go | 98 ++++--- .../core/scaledown/actuation/actuator_test.go | 262 ++++++++++++++++-- .../core/scaledown/actuation/delete.go | 66 ----- .../scaledown/actuation/delete_in_batch.go | 187 +++++++++++++ .../actuation/delete_in_batch_test.go | 169 +++++++++++ .../deletionbatcher/node_deletion_batcher.go | 27 ++ .../core/scaledown/legacy/legacy_test.go | 3 +- cluster-autoscaler/core/static_autoscaler.go | 3 +- .../core/static_autoscaler_test.go | 4 +- cluster-autoscaler/main.go | 16 +- 11 files changed, 680 insertions(+), 157 deletions(-) delete mode 100644 cluster-autoscaler/core/scaledown/actuation/delete.go create mode 100644 cluster-autoscaler/core/scaledown/actuation/delete_in_batch.go create mode 100644 cluster-autoscaler/core/scaledown/actuation/delete_in_batch_test.go create mode 100644 cluster-autoscaler/core/scaledown/deletionbatcher/node_deletion_batcher.go diff --git a/cluster-autoscaler/config/autoscaling_options.go b/cluster-autoscaler/config/autoscaling_options.go index 34408a2ddf41..f2ee682158a5 100644 --- a/cluster-autoscaler/config/autoscaling_options.go +++ b/cluster-autoscaler/config/autoscaling_options.go @@ -192,4 +192,6 @@ type AutoscalingOptions struct { // MaxNodeGroupBinpackingDuration is a maximum time that can be spent binpacking a single NodeGroup. If the threshold // is exceeded binpacking will be cut short and a partial scale-up will be performed. MaxNodeGroupBinpackingDuration time.Duration + // NodeBatchDeletionInterval is a time for how long CA ScaleDown gather nodes to delete them in batch. 
+ NodeBatchDeletionInterval time.Duration } diff --git a/cluster-autoscaler/core/scaledown/actuation/actuator.go b/cluster-autoscaler/core/scaledown/actuation/actuator.go index 7bc82f23c057..6fb176fe386a 100644 --- a/cluster-autoscaler/core/scaledown/actuation/actuator.go +++ b/cluster-autoscaler/core/scaledown/actuation/actuator.go @@ -27,14 +27,14 @@ import ( "k8s.io/autoscaler/cluster-autoscaler/clusterstate" "k8s.io/autoscaler/cluster-autoscaler/context" "k8s.io/autoscaler/cluster-autoscaler/core/scaledown" + "k8s.io/autoscaler/cluster-autoscaler/core/scaledown/deletionbatcher" "k8s.io/autoscaler/cluster-autoscaler/core/scaledown/deletiontracker" "k8s.io/autoscaler/cluster-autoscaler/metrics" "k8s.io/autoscaler/cluster-autoscaler/processors/status" "k8s.io/autoscaler/cluster-autoscaler/simulator/utilization" "k8s.io/autoscaler/cluster-autoscaler/utils/deletetaint" "k8s.io/autoscaler/cluster-autoscaler/utils/errors" - "k8s.io/autoscaler/cluster-autoscaler/utils/gpu" - "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes" + kube_record "k8s.io/client-go/tools/record" ) // Actuator is responsible for draining and deleting nodes. @@ -42,15 +42,17 @@ type Actuator struct { ctx *context.AutoscalingContext clusterState *clusterstate.ClusterStateRegistry nodeDeletionTracker *deletiontracker.NodeDeletionTracker + nodeDeletionBatcher deletionbatcher.NodeBatcherDeleter evictor Evictor } // NewActuator returns a new instance of Actuator. 
-func NewActuator(ctx *context.AutoscalingContext, csr *clusterstate.ClusterStateRegistry, ndr *deletiontracker.NodeDeletionTracker) *Actuator { +func NewActuator(ctx *context.AutoscalingContext, csr *clusterstate.ClusterStateRegistry, ndr *deletiontracker.NodeDeletionTracker, nbd deletionbatcher.NodeBatcherDeleter) *Actuator { return &Actuator{ ctx: ctx, clusterState: csr, nodeDeletionTracker: ndr, + nodeDeletionBatcher: nbd, evictor: NewDefaultEvictor(), } } @@ -152,17 +154,13 @@ func (a *Actuator) taintSyncDeleteAsyncEmpty(empty []*apiv1.Node) (scaledDownNod } else { klog.Errorf("Scale-down: couldn't report scaled down node, err: %v", err) } - - go func(node *apiv1.Node) { - result := a.deleteNode(node, false) - if result.Err == nil { - a.ctx.LogRecorder.Eventf(apiv1.EventTypeNormal, "ScaleDownEmpty", "Scale-down: empty node %s removed", node.Name) - } else { - klog.Errorf("Scale-down: couldn't delete empty node, err: %v", err) - a.ctx.Recorder.Eventf(node, apiv1.EventTypeWarning, "ScaleDownFailed", "failed to delete empty node: %v", result.Err) - _, _ = deletetaint.CleanToBeDeleted(node, a.ctx.ClientSet, a.ctx.CordonNodeBeforeTerminate) - } - }(emptyNode) + nodeGroup, err := a.ctx.CloudProvider.NodeGroupForNode(emptyNode) + if err != nil || nodeGroup == nil { + klog.Errorf("Failed to find node group for %s: %v", emptyNode.Name, err) + continue + } + a.nodeDeletionTracker.StartDeletion(nodeGroup.Id(), emptyNode.Name) + go a.scheduleDeletion(emptyNode, false) } return scaledDownNodes, nil } @@ -197,17 +195,13 @@ func (a *Actuator) deleteAsyncDrain(drain []*apiv1.Node) (scaledDownNodes []*sta } else { klog.Errorf("Scale-down: couldn't report scaled down node, err: %v", err) } - - go func(node *apiv1.Node) { - result := a.deleteNode(node, true) - if result.Err == nil { - a.ctx.LogRecorder.Eventf(apiv1.EventTypeNormal, "ScaleDown", "Scale-down: node %s removed with drain", node.Name) - } else { - klog.Errorf("Scale-down: couldn't delete node %q with drain, err: %v", 
node.Name, result.Err) - a.ctx.Recorder.Eventf(node, apiv1.EventTypeWarning, "ScaleDownFailed", "failed to drain and delete node: %v", result.Err) - _, _ = deletetaint.CleanToBeDeleted(node, a.ctx.ClientSet, a.ctx.CordonNodeBeforeTerminate) - } - }(drainNode) + nodeGroup, err := a.ctx.CloudProvider.NodeGroupForNode(drainNode) + if err != nil { + klog.Errorf("Failed to find node group for %s: %v", drainNode.Name, err) + continue + } + a.nodeDeletionTracker.StartDeletionWithDrain(nodeGroup.Id(), drainNode.Name) + go a.scheduleDeletion(drainNode, true) } return scaledDownNodes } @@ -251,41 +245,47 @@ func (a *Actuator) taintNode(node *apiv1.Node) error { return nil } -// deleteNode performs the deletion of the provided node. If drain is true, the node is drained before being deleted. -func (a *Actuator) deleteNode(node *apiv1.Node, drain bool) (result status.NodeDeleteResult) { +func (a *Actuator) prepareNodeForDeletion(node *apiv1.Node, drain bool) status.NodeDeleteResult { nodeGroup, err := a.ctx.CloudProvider.NodeGroupForNode(node) if err != nil { - return status.NodeDeleteResult{ResultType: status.NodeDeleteErrorInternal, Err: errors.NewAutoscalerError(errors.CloudProviderError, "failed to find node group for %s: %v", node.Name, err)} + return status.NodeDeleteResult{ResultType: status.NodeDeleteErrorInternal, Err: errors.NewAutoscalerError(errors.InternalError, "NodeGroupForNode returned error: %v %s", node.Name, err)} } if nodeGroup == nil || reflect.ValueOf(nodeGroup).IsNil() { return status.NodeDeleteResult{ResultType: status.NodeDeleteErrorInternal, Err: errors.NewAutoscalerError(errors.InternalError, "picked node that doesn't belong to a node group: %s", node.Name)} } - - defer func() { a.nodeDeletionTracker.EndDeletion(nodeGroup.Id(), node.Name, result) }() if drain { - a.nodeDeletionTracker.StartDeletionWithDrain(nodeGroup.Id(), node.Name) if evictionResults, err := a.evictor.DrainNode(a.ctx, node); err != nil { return status.NodeDeleteResult{ResultType: 
status.NodeDeleteErrorFailedToEvictPods, Err: err, PodEvictionResults: evictionResults} } } else { - a.nodeDeletionTracker.StartDeletion(nodeGroup.Id(), node.Name) if err := a.evictor.EvictDaemonSetPods(a.ctx, node, time.Now()); err != nil { // Evicting DS pods is best-effort, so proceed with the deletion even if there are errors. klog.Warningf("Error while evicting DS pods from an empty node %q: %v", node.Name, err) } } - if err := WaitForDelayDeletion(node, a.ctx.ListerRegistry.AllNodeLister(), a.ctx.AutoscalingOptions.NodeDeletionDelayTimeout); err != nil { return status.NodeDeleteResult{ResultType: status.NodeDeleteErrorFailedToDelete, Err: err} } + return status.NodeDeleteResult{ResultType: status.NodeDeleteOk} +} - if err := DeleteNodeFromCloudProvider(a.ctx, node, a.clusterState); err != nil { - return status.NodeDeleteResult{ResultType: status.NodeDeleteErrorFailedToDelete, Err: err} +// scheduleDeletion schedule the deletion on of the provided node by adding a node to NodeDeletionBatcher. If drain is true, the node is drained before being deleted. 
+func (a *Actuator) scheduleDeletion(node *apiv1.Node, drain bool) { + status := a.prepareNodeForDeletion(node, drain) + if status.Err != nil { + RecordFailedScaleDownEvent(node, drain, a.ctx.Recorder, "prepareNodeForDelition failed", status.Err) + _, _ = deletetaint.CleanToBeDeleted(node, a.ctx.ClientSet, a.ctx.CordonNodeBeforeTerminate) + nodeGroup, err := a.ctx.CloudProvider.NodeGroupForNode(node) + if err != nil { + return + } + a.nodeDeletionTracker.EndDeletion(nodeGroup.Id(), node.Name, status) + return + } + err := a.nodeDeletionBatcher.AddNode(node, drain) + if err != nil { + klog.Errorf("Couldn't add node to nodeDeletionBatcher, err: %v", err) } - - metrics.RegisterScaleDown(1, gpu.GetGpuTypeForMetrics(a.ctx.CloudProvider.GPULabel(), a.ctx.CloudProvider.GetAvailableGPUTypes(), node, nodeGroup), nodeScaleDownReason(node, drain)) - - return status.NodeDeleteResult{ResultType: status.NodeDeleteOk} } func min(x, y int) int { @@ -303,18 +303,14 @@ func joinPodNames(pods []*apiv1.Pod) string { return strings.Join(names, ",") } -func nodeScaleDownReason(node *apiv1.Node, drain bool) metrics.NodeScaleDownReason { - readiness, err := kubernetes.GetNodeReadiness(node) - if err != nil { - klog.Errorf("Couldn't determine node %q readiness while scaling down - assuming unready: %v", node.Name, err) - return metrics.Unready - } - if !readiness.Ready { - return metrics.Unready - } - // Node is ready. +// RecordFailedScaleDownEvent record failed scale down event and log an error. 
+func RecordFailedScaleDownEvent(node *apiv1.Node, drain bool, recorder kube_record.EventRecorder, errMsg string, statusErr error) { if drain { - return metrics.Underutilized + klog.Errorf("Scale-down: couldn't delete node %q with drain, %v, status error: %v", node.Name, errMsg, statusErr) + recorder.Eventf(node, apiv1.EventTypeWarning, "ScaleDownFailed", "failed to drain and delete node: %v", statusErr) + + } else { + klog.Errorf("Scale-down: couldn't delete empty node, %v, status error: %v", errMsg, statusErr) + recorder.Eventf(node, apiv1.EventTypeWarning, "ScaleDownFailed", "failed to delete empty node: %v", statusErr) } - return metrics.Empty } diff --git a/cluster-autoscaler/core/scaledown/actuation/actuator_test.go b/cluster-autoscaler/core/scaledown/actuation/actuator_test.go index 1c7b7bc0f50f..1135a54e203a 100644 --- a/cluster-autoscaler/core/scaledown/actuation/actuator_test.go +++ b/cluster-autoscaler/core/scaledown/actuation/actuator_test.go @@ -230,7 +230,8 @@ func TestCropNodesToBudgets(t *testing.T) { for i := 0; i < tc.drainDeletionsInProgress; i++ { ndr.StartDeletionWithDrain("ng2", fmt.Sprintf("drain-node-%d", i)) } - actuator := NewActuator(ctx, nil, ndr) + nbd := NewNodeBatcherDeleter(ctx, nil, ndr, 0*time.Second) + actuator := NewActuator(ctx, nil, ndr, nbd) gotEmpty, gotDrain := actuator.cropNodesToBudgets(tc.emptyNodes, tc.drainNodes) if diff := cmp.Diff(tc.wantEmpty, gotEmpty, cmpopts.EquateEmpty()); diff != "" { t.Errorf("cropNodesToBudgets empty nodes diff (-want +got):\n%s", diff) @@ -243,8 +244,10 @@ func TestCropNodesToBudgets(t *testing.T) { } func TestStartDeletion(t *testing.T) { - testNg := testprovider.NewTestNodeGroup("test-ng", 0, 100, 3, true, false, "n1-standard-2", nil, nil) toBeDeletedTaint := apiv1.Taint{Key: deletetaint.ToBeDeletedTaint, Effect: apiv1.TaintEffectNoSchedule} + testNg := testprovider.NewTestNodeGroup("test-ng", 0, 100, 3, true, false, "n1-standard-2", nil, nil) + emptyNodeNodeGroup := 
generateNodesAndNodeGroupMap(4, "empty") + drainNodeNodeGroup := generateNodesAndNodeGroupMap(4, "drain") for tn, tc := range map[string]struct { emptyNodes []*apiv1.Node @@ -274,13 +277,13 @@ func TestStartDeletion(t *testing.T) { ScaledDownNodes: []*status.ScaleDownNode{ { Node: generateNode("empty-node-0"), - NodeGroup: testNg, + NodeGroup: emptyNodeNodeGroup["empty-node-0"], EvictedPods: nil, UtilInfo: generateUtilInfo(0, 0), }, { Node: generateNode("empty-node-1"), - NodeGroup: testNg, + NodeGroup: emptyNodeNodeGroup["empty-node-1"], EvictedPods: nil, UtilInfo: generateUtilInfo(0, 0), }, @@ -311,13 +314,13 @@ func TestStartDeletion(t *testing.T) { ScaledDownNodes: []*status.ScaleDownNode{ { Node: generateNode("drain-node-0"), - NodeGroup: testNg, + NodeGroup: drainNodeNodeGroup["drain-node-0"], EvictedPods: generatePods(2, "drain-node-0"), UtilInfo: generateUtilInfo(2./8., 2./8.), }, { Node: generateNode("drain-node-1"), - NodeGroup: testNg, + NodeGroup: drainNodeNodeGroup["drain-node-1"], EvictedPods: generatePods(2, "drain-node-1"), UtilInfo: generateUtilInfo(2./8., 2./8.), }, @@ -350,25 +353,25 @@ func TestStartDeletion(t *testing.T) { ScaledDownNodes: []*status.ScaleDownNode{ { Node: generateNode("empty-node-0"), - NodeGroup: testNg, + NodeGroup: emptyNodeNodeGroup["empty-node-0"], EvictedPods: nil, UtilInfo: generateUtilInfo(0, 0), }, { Node: generateNode("empty-node-1"), - NodeGroup: testNg, + NodeGroup: emptyNodeNodeGroup["empty-node-1"], EvictedPods: nil, UtilInfo: generateUtilInfo(0, 0), }, { Node: generateNode("drain-node-0"), - NodeGroup: testNg, + NodeGroup: drainNodeNodeGroup["drain-node-0"], EvictedPods: generatePods(2, "drain-node-0"), UtilInfo: generateUtilInfo(2./8., 2./8.), }, { Node: generateNode("drain-node-1"), - NodeGroup: testNg, + NodeGroup: drainNodeNodeGroup["drain-node-1"], EvictedPods: generatePods(2, "drain-node-1"), UtilInfo: generateUtilInfo(2./8., 2./8.), }, @@ -409,13 +412,13 @@ func TestStartDeletion(t *testing.T) { 
ScaledDownNodes: []*status.ScaleDownNode{ { Node: generateNode("empty-node-0"), - NodeGroup: testNg, + NodeGroup: emptyNodeNodeGroup["empty-node-0"], EvictedPods: nil, UtilInfo: generateUtilInfo(0, 0), }, { Node: generateNode("empty-node-1"), - NodeGroup: testNg, + NodeGroup: emptyNodeNodeGroup["empty-node-1"], EvictedPods: nil, UtilInfo: generateUtilInfo(0, 0), }, @@ -451,13 +454,13 @@ func TestStartDeletion(t *testing.T) { ScaledDownNodes: []*status.ScaleDownNode{ { Node: generateNode("empty-node-0"), - NodeGroup: testNg, + NodeGroup: emptyNodeNodeGroup["empty-node-0"], EvictedPods: nil, UtilInfo: generateUtilInfo(0, 0), }, { Node: generateNode("empty-node-1"), - NodeGroup: testNg, + NodeGroup: emptyNodeNodeGroup["empty-node-1"], EvictedPods: nil, UtilInfo: generateUtilInfo(0, 0), }, @@ -504,25 +507,25 @@ func TestStartDeletion(t *testing.T) { ScaledDownNodes: []*status.ScaleDownNode{ { Node: generateNode("drain-node-0"), - NodeGroup: testNg, + NodeGroup: drainNodeNodeGroup["drain-node-0"], EvictedPods: generatePods(3, "drain-node-0"), UtilInfo: generateUtilInfo(3./8., 3./8.), }, { Node: generateNode("drain-node-1"), - NodeGroup: testNg, + NodeGroup: drainNodeNodeGroup["drain-node-1"], EvictedPods: generatePods(3, "drain-node-1"), UtilInfo: generateUtilInfo(3./8., 3./8.), }, { Node: generateNode("drain-node-2"), - NodeGroup: testNg, + NodeGroup: drainNodeNodeGroup["drain-node-2"], EvictedPods: generatePods(3, "drain-node-2"), UtilInfo: generateUtilInfo(3./8., 3./8.), }, { Node: generateNode("drain-node-3"), - NodeGroup: testNg, + NodeGroup: drainNodeNodeGroup["drain-node-3"], EvictedPods: generatePods(3, "drain-node-3"), UtilInfo: generateUtilInfo(3./8., 3./8.), }, @@ -590,25 +593,25 @@ func TestStartDeletion(t *testing.T) { ScaledDownNodes: []*status.ScaleDownNode{ { Node: generateNode("empty-node-0"), - NodeGroup: testNg, + NodeGroup: emptyNodeNodeGroup["empty-node-0"], EvictedPods: nil, UtilInfo: generateUtilInfo(0, 0), }, { Node: generateNode("empty-node-1"), 
- NodeGroup: testNg, + NodeGroup: emptyNodeNodeGroup["empty-node-1"], EvictedPods: nil, UtilInfo: generateUtilInfo(0, 0), }, { Node: generateNode("drain-node-0"), - NodeGroup: testNg, + NodeGroup: drainNodeNodeGroup["drain-node-0"], EvictedPods: generatePods(2, "drain-node-0"), UtilInfo: generateUtilInfo(2./8., 2./8.), }, { Node: generateNode("drain-node-1"), - NodeGroup: testNg, + NodeGroup: drainNodeNodeGroup["drain-node-1"], EvictedPods: generatePods(2, "drain-node-1"), UtilInfo: generateUtilInfo(2./8., 2./8.), }, @@ -654,13 +657,13 @@ func TestStartDeletion(t *testing.T) { ScaledDownNodes: []*status.ScaleDownNode{ { Node: generateNode("empty-node-0"), - NodeGroup: testNg, + NodeGroup: emptyNodeNodeGroup["empty-node-0"], EvictedPods: nil, UtilInfo: generateUtilInfo(2./8., 2./8.), }, { Node: generateNode("empty-node-1"), - NodeGroup: testNg, + NodeGroup: emptyNodeNodeGroup["empty-node-1"], EvictedPods: nil, UtilInfo: generateUtilInfo(2./8., 2./8.), }, @@ -726,12 +729,21 @@ func TestStartDeletion(t *testing.T) { tc := tc // Insert all nodes into a map to support live node updates and GETs. nodesByName := make(map[string]*apiv1.Node) + nodeGroups := make(map[string]*testprovider.TestNodeGroup) nodesLock := sync.Mutex{} for _, node := range tc.emptyNodes { nodesByName[node.Name] = node + ng, ok := emptyNodeNodeGroup[node.Name] + if ok { + nodeGroups[node.Name] = ng + } } for _, node := range tc.drainNodes { nodesByName[node.Name] = node + ng, ok := drainNodeNodeGroup[node.Name] + if ok { + nodeGroups[node.Name] = ng + } } // Set up a fake k8s client to hook and verify certain actions. 
@@ -806,10 +818,20 @@ func TestStartDeletion(t *testing.T) { deletedNodes <- node return nil }) - testNg.SetCloudProvider(provider) provider.InsertNodeGroup(testNg) + for _, ng := range emptyNodeNodeGroup { + provider.InsertNodeGroup(ng) + } + for _, ng := range drainNodeNodeGroup { + provider.InsertNodeGroup(ng) + } for _, node := range nodesByName { - provider.AddNode("test-ng", node) + nodeGroup, ok := nodeGroups[node.Name] + if !ok { + nodeGroup = testNg + } + nodeGroup.SetCloudProvider(provider) + provider.AddNode(nodeGroup.Id(), node) } // Set up other needed structures and options. @@ -843,9 +865,11 @@ func TestStartDeletion(t *testing.T) { } // Create Actuator, run StartDeletion, and verify the error. + ndt := deletiontracker.NewNodeDeletionTracker(0) actuator := Actuator{ - ctx: &ctx, clusterState: csr, nodeDeletionTracker: deletiontracker.NewNodeDeletionTracker(0), - evictor: Evictor{EvictionRetryTime: 0, DsEvictionRetryTime: 0, DsEvictionEmptyNodeTimeout: 0, PodEvictionHeadroom: DefaultPodEvictionHeadroom}, + ctx: &ctx, clusterState: csr, nodeDeletionTracker: ndt, + nodeDeletionBatcher: NewNodeBatcherDeleter(&ctx, csr, ndt, 0*time.Second), + evictor: Evictor{EvictionRetryTime: 0, DsEvictionRetryTime: 0, DsEvictionEmptyNodeTimeout: 0, PodEvictionHeadroom: DefaultPodEvictionHeadroom}, } gotStatus, gotErr := actuator.StartDeletion(tc.emptyNodes, tc.drainNodes, time.Now()) if diff := cmp.Diff(tc.wantErr, gotErr, cmpopts.EquateErrors()); diff != "" { @@ -868,7 +892,7 @@ func TestStartDeletion(t *testing.T) { select { case deletedNode := <-deletedNodes: gotDeletedNodes = append(gotDeletedNodes, deletedNode) - case <-time.After(3 * time.Second): + case <-time.After(4 * time.Second): t.Errorf("Timeout while waiting for deleted nodes.") break nodesLoop } @@ -919,7 +943,7 @@ func TestStartDeletion(t *testing.T) { // in cloud provider we sync to above and so this will usually not wait at all. 
However, it can still happen // that there is a delay between cloud provider deletion and reporting, in which case the results are not there yet // and we need to wait for them before asserting. - err = waitForDeletionResultsCount(actuator.nodeDeletionTracker, len(tc.wantNodeDeleteResults), 3*time.Second, 200*time.Millisecond) + err = waitForDeletionResultsCount(actuator.nodeDeletionTracker, len(tc.wantNodeDeleteResults), 4*time.Second, 200*time.Millisecond) if err != nil { t.Errorf("Timeout while waiting for node deletion results") } @@ -937,6 +961,170 @@ func TestStartDeletion(t *testing.T) { } } +func TestStartDeletionInBatchBasic(t *testing.T) { + testNg1 := testprovider.NewTestNodeGroup("test-ng-1", 0, 100, 3, true, false, "n1-standard-2", nil, nil) + testNg2 := testprovider.NewTestNodeGroup("test-ng-2", 0, 100, 3, true, false, "n1-standard-2", nil, nil) + testNg3 := testprovider.NewTestNodeGroup("test-ng-3", 0, 100, 3, true, false, "n1-standard-2", nil, nil) + deleteInterval := 1 * time.Second + + for _, test := range []struct { + name string + numNodesToDelete map[*testprovider.TestNodeGroup][]int //per node group + failedRequests map[string]bool //per node group + wantSuccessfulDeletion map[string]int //per node group + }{ + { + name: "Succesfull deletion for all node group", + numNodesToDelete: map[*testprovider.TestNodeGroup][]int{ + testNg1: {4}, + testNg2: {5}, + testNg3: {1}, + }, + wantSuccessfulDeletion: map[string]int{ + "test-ng-1": 4, + "test-ng-2": 5, + "test-ng-3": 1, + }, + }, + { + name: "Node deletion failed for one group", + numNodesToDelete: map[*testprovider.TestNodeGroup][]int{ + testNg1: {4}, + testNg2: {5}, + testNg3: {1}, + }, + failedRequests: map[string]bool{ + "test-ng-1": true, + }, + wantSuccessfulDeletion: map[string]int{ + "test-ng-1": 0, + "test-ng-2": 5, + "test-ng-3": 1, + }, + }, + { + name: "Node deletion failed for one group two times", + numNodesToDelete: map[*testprovider.TestNodeGroup][]int{ + testNg1: {4, 3}, + 
testNg2: {5}, + testNg3: {1}, + }, + failedRequests: map[string]bool{ + "test-ng-1": true, + }, + wantSuccessfulDeletion: map[string]int{ + "test-ng-1": 0, + "test-ng-2": 5, + "test-ng-3": 1, + }, + }, + { + name: "Node deletion failed for all groups", + numNodesToDelete: map[*testprovider.TestNodeGroup][]int{ + testNg1: {4, 3}, + testNg2: {5}, + testNg3: {1}, + }, + failedRequests: map[string]bool{ + "test-ng-1": true, + "test-ng-2": true, + "test-ng-3": true, + }, + wantSuccessfulDeletion: map[string]int{ + "test-ng-1": 0, + "test-ng-2": 0, + "test-ng-3": 0, + }, + }, + } { + t.Run(test.name, func(t *testing.T) { + failedRequest := struct { + sync.Mutex + requests map[string]bool + }{requests: test.failedRequests} + gotFailedRequest := func(nodeGroupId string) bool { + failedRequest.Lock() + defer failedRequest.Unlock() + val, _ := failedRequest.requests[nodeGroupId] + return val + } + deletedResult := make(chan string) + fakeClient := &fake.Clientset{} + provider := testprovider.NewTestCloudProvider(nil, func(nodeGroupId string, node string) error { + if gotFailedRequest(nodeGroupId) { + return fmt.Errorf("SIMULATED ERROR: won't remove node") + } + deletedResult <- nodeGroupId + return nil + }) + // 2d array represent the waves of pushing nodes to delete. + deleteNodes := [][]*apiv1.Node{} + for ng, numNodes := range test.numNodesToDelete { + provider.InsertNodeGroup(ng) + ng.SetCloudProvider(provider) + for i, num := range numNodes { + nodes := generateNodes(num, ng.Id()) + if len(deleteNodes) <= i { + deleteNodes = append(deleteNodes, nodes) + } else { + deleteNodes[i] = append(deleteNodes[i], nodes...) 
+ } + for _, node := range nodes { + provider.AddNode(ng.Id(), node) + } + } + } + opts := config.AutoscalingOptions{ + MaxScaleDownParallelism: 10, + MaxDrainParallelism: 5, + MaxPodEvictionTime: 0, + DaemonSetEvictionForEmptyNodes: true, + } + registry := kube_util.NewListerRegistry(nil, nil, nil, nil, nil, nil, nil, nil, nil, nil) + ctx, err := NewScaleTestAutoscalingContext(opts, fakeClient, registry, provider, nil, nil) + if err != nil { + t.Fatalf("Couldn't set up autoscaling context: %v", err) + } + csr := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, ctx.LogRecorder, NewBackoff()) + ndt := deletiontracker.NewNodeDeletionTracker(0) + actuator := Actuator{ + ctx: &ctx, clusterState: csr, nodeDeletionTracker: ndt, + nodeDeletionBatcher: NewNodeBatcherDeleter(&ctx, csr, ndt, deleteInterval), + evictor: Evictor{EvictionRetryTime: 0, DsEvictionRetryTime: 0, DsEvictionEmptyNodeTimeout: 0, PodEvictionHeadroom: DefaultPodEvictionHeadroom}, + } + for _, nodes := range deleteNodes { + actuator.StartDeletion(nodes, []*apiv1.Node{}, time.Now()) + time.Sleep(deleteInterval) + } + wantDeletedNodes := 0 + for _, num := range test.wantSuccessfulDeletion { + wantDeletedNodes += num + } + gotDeletedNodes := map[string]int{ + "test-ng-1": 0, + "test-ng-2": 0, + "test-ng-3": 0, + } + for i := 0; i < wantDeletedNodes; i++ { + select { + case ngId := <-deletedResult: + if value, ok := gotDeletedNodes[ngId]; ok { + gotDeletedNodes[ngId] = value + 1 + } else { + gotDeletedNodes[ngId] = 1 + } + case <-time.After(1 * time.Second): + t.Errorf("Timeout while waiting for deleted nodes.") + break + } + } + if diff := cmp.Diff(test.wantSuccessfulDeletion, gotDeletedNodes); diff != "" { + t.Errorf("Successful deleteions per node group diff (-want +got):\n%s", diff) + } + }) + } +} + func generateNodes(count int, prefix string) []*apiv1.Node { var result []*apiv1.Node for i := 0; i < count; i++ { @@ -949,6 +1137,20 @@ func generateNodes(count int, 
prefix string) []*apiv1.Node { return result } +func generateNodesAndNodeGroupMap(count int, prefix string) map[string]*testprovider.TestNodeGroup { + result := make(map[string]*testprovider.TestNodeGroup) + for i := 0; i < count; i++ { + name := fmt.Sprintf("node-%d", i) + ngName := fmt.Sprintf("test-ng-%v", i) + if prefix != "" { + name = prefix + "-" + name + ngName = prefix + "-" + ngName + } + result[name] = testprovider.NewTestNodeGroup(ngName, 0, 100, 3, true, false, "n1-standard-2", nil, nil) + } + return result +} + func generateNode(name string) *apiv1.Node { return &apiv1.Node{ ObjectMeta: metav1.ObjectMeta{Name: name}, diff --git a/cluster-autoscaler/core/scaledown/actuation/delete.go b/cluster-autoscaler/core/scaledown/actuation/delete.go deleted file mode 100644 index bf419b3aba41..000000000000 --- a/cluster-autoscaler/core/scaledown/actuation/delete.go +++ /dev/null @@ -1,66 +0,0 @@ -/* -Copyright 2022 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package actuation - -import ( - "reflect" - "time" - - "k8s.io/autoscaler/cluster-autoscaler/utils/deletetaint" - - apiv1 "k8s.io/api/core/v1" - - "k8s.io/autoscaler/cluster-autoscaler/clusterstate" - "k8s.io/autoscaler/cluster-autoscaler/context" - "k8s.io/autoscaler/cluster-autoscaler/utils/errors" -) - -const ( - // MaxKubernetesEmptyNodeDeletionTime is the maximum time needed by Kubernetes to delete an empty node. 
- MaxKubernetesEmptyNodeDeletionTime = 3 * time.Minute - // MaxCloudProviderNodeDeletionTime is the maximum time needed by cloud provider to delete a node. - MaxCloudProviderNodeDeletionTime = 5 * time.Minute -) - -// DeleteNodeFromCloudProvider removes the given node from cloud provider. No extra pre-deletion actions are executed on -// the Kubernetes side. If successful, the deletion is recorded in CSR, and an event is emitted on the node. -func DeleteNodeFromCloudProvider(ctx *context.AutoscalingContext, node *apiv1.Node, registry *clusterstate.ClusterStateRegistry) errors.AutoscalerError { - nodeGroup, err := ctx.CloudProvider.NodeGroupForNode(node) - if err != nil { - return errors.NewAutoscalerError(errors.CloudProviderError, "failed to find node group for %s: %v", node.Name, err) - } - if nodeGroup == nil || reflect.ValueOf(nodeGroup).IsNil() { - return errors.NewAutoscalerError(errors.InternalError, "picked node that doesn't belong to a node group: %s", node.Name) - } - if err = nodeGroup.DeleteNodes([]*apiv1.Node{node}); err != nil { - return errors.NewAutoscalerError(errors.CloudProviderError, "failed to delete %s: %v", node.Name, err) - } - ctx.Recorder.Eventf(node, apiv1.EventTypeNormal, "ScaleDown", "node removed by cluster autoscaler") - registry.RegisterScaleDown(&clusterstate.ScaleDownRequest{ - NodeGroup: nodeGroup, - NodeName: node.Name, - Time: time.Now(), - ExpectedDeleteTime: time.Now().Add(MaxCloudProviderNodeDeletionTime), - }) - return nil -} - -// IsNodeBeingDeleted returns true iff a given node is being deleted. 
-func IsNodeBeingDeleted(node *apiv1.Node, timestamp time.Time) bool { - deleteTime, _ := deletetaint.GetToBeDeletedTime(node) - return deleteTime != nil && (timestamp.Sub(*deleteTime) < MaxCloudProviderNodeDeletionTime || timestamp.Sub(*deleteTime) < MaxKubernetesEmptyNodeDeletionTime) -} diff --git a/cluster-autoscaler/core/scaledown/actuation/delete_in_batch.go b/cluster-autoscaler/core/scaledown/actuation/delete_in_batch.go new file mode 100644 index 000000000000..94495adf0675 --- /dev/null +++ b/cluster-autoscaler/core/scaledown/actuation/delete_in_batch.go @@ -0,0 +1,187 @@ +/* +Copyright 2022 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package actuation + +import ( + "reflect" + "sync" + "time" + + "k8s.io/autoscaler/cluster-autoscaler/cloudprovider" + "k8s.io/autoscaler/cluster-autoscaler/core/scaledown/deletionbatcher" + "k8s.io/autoscaler/cluster-autoscaler/core/scaledown/deletiontracker" + "k8s.io/autoscaler/cluster-autoscaler/metrics" + "k8s.io/autoscaler/cluster-autoscaler/processors/status" + "k8s.io/autoscaler/cluster-autoscaler/utils/deletetaint" + "k8s.io/autoscaler/cluster-autoscaler/utils/gpu" + "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes" + "k8s.io/klog/v2" + + apiv1 "k8s.io/api/core/v1" + + "k8s.io/autoscaler/cluster-autoscaler/clusterstate" + "k8s.io/autoscaler/cluster-autoscaler/context" + "k8s.io/autoscaler/cluster-autoscaler/utils/errors" +) + +const ( + // MaxKubernetesEmptyNodeDeletionTime is the maximum time needed by Kubernetes to delete an empty node. + MaxKubernetesEmptyNodeDeletionTime = 3 * time.Minute + // MaxCloudProviderNodeDeletionTime is the maximum time needed by cloud provider to delete a node. + MaxCloudProviderNodeDeletionTime = 5 * time.Minute +) + +// NodeBatcherDeleterImpl implements NodeBatcherDeleter interface. 
+type NodeBatcherDeleterImpl struct { + sync.Mutex + ctx *context.AutoscalingContext + clusterState *clusterstate.ClusterStateRegistry + nodeDeletionTracker *deletiontracker.NodeDeletionTracker + deletionsPerNodeGroup map[string][]*apiv1.Node + deleteInterval time.Duration + drainedNodeDeletions map[string]bool +} + +// NewNodeBatcherDeleter return new NodeBatchDeleter +func NewNodeBatcherDeleter(ctx *context.AutoscalingContext, csr *clusterstate.ClusterStateRegistry, nodeDeletionTracker *deletiontracker.NodeDeletionTracker, deleteInterval time.Duration) deletionbatcher.NodeBatcherDeleter { + return &NodeBatcherDeleterImpl{ + ctx: ctx, + clusterState: csr, + nodeDeletionTracker: nodeDeletionTracker, + deletionsPerNodeGroup: make(map[string][]*apiv1.Node), + deleteInterval: deleteInterval, + drainedNodeDeletions: make(map[string]bool), + } +} + +// AddNode adds node to delete candidates and schedule deletion. +func (d *NodeBatcherDeleterImpl) AddNode(node *apiv1.Node, drain bool) error { + nodeGroupId, first, err := d.addNodeToBucket(node, drain) + if err != nil { + return nil + } + if first { + go func(nodeGroupId string) { + time.Sleep(d.deleteInterval) + d.remove(nodeGroupId) + }(nodeGroupId) + } + return nil +} + +// AddToBucket adds node to delete candidates and return if it's a first node in the group. 
+func (d *NodeBatcherDeleterImpl) addNodeToBucket(node *apiv1.Node, drain bool) (string, bool, error) {
+	d.Lock()
+	defer d.Unlock()
+	nodeGroup, err := d.ctx.CloudProvider.NodeGroupForNode(node)
+	if err != nil {
+		return "", false, err
+	}
+	// Same nil check as deleteNodesFromCloudProvider: without it a node that
+	// has no node group would panic on nodeGroup.Id() below.
+	if nodeGroup == nil || reflect.ValueOf(nodeGroup).IsNil() {
+		return "", false, errors.NewAutoscalerError(errors.InternalError, "picked node that doesn't belong to a node group: %s", node.Name)
+	}
+	nodeGroupId := nodeGroup.Id()
+	d.drainedNodeDeletions[node.Name] = drain
+	// A missing or empty bucket means this node opens a new batch.
+	first := len(d.deletionsPerNodeGroup[nodeGroupId]) == 0
+	d.deletionsPerNodeGroup[nodeGroupId] = append(d.deletionsPerNodeGroup[nodeGroupId], node)
+	return nodeGroupId, first, nil
+}
+
+// remove deletes the queued nodes of a given nodeGroup. The bucket is detached
+// under the lock and the cloud provider call runs in a separate goroutine, so
+// the returned error is always nil. For every node the goroutine either records
+// the scale-down in CSR, metrics and events (success) or emits a failure event
+// and cleans the ToBeDeleted taint (failure), then ends the deletion in the
+// node deletion tracker.
+func (d *NodeBatcherDeleterImpl) remove(nodeGroupId string) error {
+	d.Lock()
+	defer d.Unlock()
+	nodes, ok := d.deletionsPerNodeGroup[nodeGroupId]
+	if !ok {
+		// Nothing queued for this node group.
+		return nil
+	}
+	delete(d.deletionsPerNodeGroup, nodeGroupId)
+
+	go func(nodes []*apiv1.Node, nodeGroupId string) {
+		var result status.NodeDeleteResult
+		nodeGroup, err := deleteNodesFromCloudProvider(d.ctx, nodes)
+		if err != nil {
+			result = status.NodeDeleteResult{ResultType: status.NodeDeleteErrorFailedToDelete, Err: err}
+		} else {
+			result = status.NodeDeleteResult{ResultType: status.NodeDeleteOk}
+		}
+		// The whole batch shares one result: every node is reported as deleted
+		// or as failed together.
+		for _, node := range nodes {
+			// NOTE(review): the mutex is held while the taint clean-up and the
+			// event recording below run - confirm this is intended.
+			d.Lock()
+			drain := d.drainedNodeDeletions[node.Name]
+			if result.Err != nil {
+				RecordFailedScaleDownEvent(node, drain, d.ctx.Recorder, "", result.Err)
+				_, _ = deletetaint.CleanToBeDeleted(node, d.ctx.ClientSet, d.ctx.CordonNodeBeforeTerminate)
+			} else {
+				d.ctx.Recorder.Eventf(node, apiv1.EventTypeNormal, "ScaleDown", "nodes removed by cluster autoscaler")
+				d.clusterState.RegisterScaleDown(&clusterstate.ScaleDownRequest{
+					NodeGroup:          nodeGroup,
+					NodeName:           node.Name,
+					Time:               time.Now(),
+					ExpectedDeleteTime: time.Now().Add(MaxCloudProviderNodeDeletionTime),
+				})
+				metrics.RegisterScaleDown(1, gpu.GetGpuTypeForMetrics(d.ctx.CloudProvider.GPULabel(), d.ctx.CloudProvider.GetAvailableGPUTypes(), node, nodeGroup), nodeScaleDownReason(node, drain))
+				if drain {
+					d.ctx.LogRecorder.Eventf(apiv1.EventTypeNormal, "ScaleDown", "Scale-down: node %s removed with drain", node.Name)
+				} else {
+					d.ctx.LogRecorder.Eventf(apiv1.EventTypeNormal, "ScaleDownEmpty", "Scale-down: empty node %s removed", node.Name)
+				}
+			}
+			delete(d.drainedNodeDeletions, node.Name)
+			d.Unlock()
+			d.nodeDeletionTracker.EndDeletion(nodeGroupId, node.Name, result)
+		}
+	}(nodes, nodeGroupId)
+	return nil
+}
+
+// deleteNodesFromCloudProvider removes the given nodes from cloud provider. No extra pre-deletion actions are executed on
+// the Kubernetes side. The node group is looked up from the first node only and all nodes are passed to that group's
+// DeleteNodes in a single call. It returns the node group on success.
+func deleteNodesFromCloudProvider(ctx *context.AutoscalingContext, nodes []*apiv1.Node) (cloudprovider.NodeGroup, error) {
+	// Guard the nodes[0] accesses below.
+	if len(nodes) == 0 {
+		return nil, errors.NewAutoscalerError(errors.InternalError, "no nodes to delete")
+	}
+	nodeGroup, err := ctx.CloudProvider.NodeGroupForNode(nodes[0])
+	if err != nil {
+		return nil, errors.NewAutoscalerError(errors.CloudProviderError, "failed to find node group for %s: %v", nodes[0].Name, err)
+	}
+	// The interface may wrap a typed nil pointer, hence the reflect check.
+	if nodeGroup == nil || reflect.ValueOf(nodeGroup).IsNil() {
+		return nil, errors.NewAutoscalerError(errors.InternalError, "picked node that doesn't belong to a node group: %s", nodes[0].Name)
+	}
+	if err = nodeGroup.DeleteNodes(nodes); err != nil {
+		return nil, errors.NewAutoscalerError(errors.CloudProviderError, "failed to delete %s: %v", nodes[0].Name, err)
+	}
+	return nodeGroup, nil
+}
+
+// IsNodeBeingDeleted returns true iff a given node is being deleted.
+func IsNodeBeingDeleted(node *apiv1.Node, timestamp time.Time) bool {
+	deleteTime, _ := deletetaint.GetToBeDeletedTime(node)
+	if deleteTime == nil {
+		// No deletion timestamp could be read from the node.
+		return false
+	}
+	// The node counts as being deleted while either deletion time limit has
+	// not yet elapsed since that timestamp.
+	elapsed := timestamp.Sub(*deleteTime)
+	return elapsed < MaxCloudProviderNodeDeletionTime || elapsed < MaxKubernetesEmptyNodeDeletionTime
+}
+
+// nodeScaleDownReason picks the metrics reason for a scale-down: Unready when
+// readiness cannot be determined or the node is not ready, otherwise
+// Underutilized for drained nodes and Empty for the rest.
+func nodeScaleDownReason(node *apiv1.Node, drain bool) metrics.NodeScaleDownReason {
+	readiness, err := kubernetes.GetNodeReadiness(node)
+	switch {
+	case err != nil:
+		klog.Errorf("Couldn't determine node %q readiness while scaling down - assuming unready: %v", node.Name, err)
+		return metrics.Unready
+	case !readiness.Ready:
+		return metrics.Unready
+	case drain:
+		// Ready node that had to be drained.
+		return metrics.Underutilized
+	default:
+		// Ready node that was removed without drain.
+		return metrics.Empty
+	}
+}
diff --git a/cluster-autoscaler/core/scaledown/actuation/delete_in_batch_test.go b/cluster-autoscaler/core/scaledown/actuation/delete_in_batch_test.go
new file mode 100644
index 000000000000..0dda206b5d76
--- /dev/null
+++ b/cluster-autoscaler/core/scaledown/actuation/delete_in_batch_test.go
@@ -0,0 +1,169 @@
+/*
+Copyright 2022 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package actuation
+
+import (
+	"testing"
+	"time"
+
+	apiv1 "k8s.io/api/core/v1"
+	testprovider "k8s.io/autoscaler/cluster-autoscaler/cloudprovider/test"
+	"k8s.io/autoscaler/cluster-autoscaler/clusterstate"
+	clusterstate_utils "k8s.io/autoscaler/cluster-autoscaler/clusterstate/utils"
+	"k8s.io/autoscaler/cluster-autoscaler/config"
+	"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/deletiontracker"
+	. "k8s.io/autoscaler/cluster-autoscaler/core/test"
+	"k8s.io/client-go/kubernetes/fake"
+	kube_record "k8s.io/client-go/tools/record"
+)
+
+// TestAddNodeToBucket checks that addNodeToBucket reports a "first" node
+// exactly once per node group, i.e. that the number of opened batches equals
+// the number of distinct node groups among the added nodes.
+func TestAddNodeToBucket(t *testing.T) {
+	provider := testprovider.NewTestCloudProvider(nil, nil)
+	ctx, err := NewScaleTestAutoscalingContext(config.AutoscalingOptions{}, nil, nil, provider, nil, nil)
+	if err != nil {
+		t.Fatalf("Couldn't set up autoscaling context: %v", err)
+	}
+	// Two node groups with five nodes each, registered in the test provider.
+	nodeGroup1 := "ng-1"
+	nodeGroup2 := "ng-2"
+	nodes1 := generateNodes(5, "ng-1")
+	nodes2 := generateNodes(5, "ng-2")
+	provider.AddNodeGroup(nodeGroup1, 1, 10, 5)
+	provider.AddNodeGroup(nodeGroup2, 1, 10, 5)
+	for _, node := range nodes1 {
+		provider.AddNode(nodeGroup1, node)
+	}
+	for _, node := range nodes2 {
+		provider.AddNode(nodeGroup2, node)
+	}
+	testcases := []struct {
+		name        string
+		nodes       []*apiv1.Node
+		wantBatches int
+		drained     bool
+	}{
+		{
+			name:        "Add 1 node",
+			nodes:       []*apiv1.Node{nodes1[0]},
+			wantBatches: 1,
+		},
+		{
+			name:        "Add nodes that belong to one nodeGroup",
+			nodes:       nodes1,
+			wantBatches: 1,
+		},
+		{
+			name:        "Add 3 nodes that belong to 2 nodeGroups",
+			nodes:       []*apiv1.Node{nodes1[0], nodes2[0], nodes2[1]},
+			wantBatches: 2,
+		},
+		{
+			name:        "Add 3 nodes that belong to 2 nodeGroups, all nodes are drained",
+			nodes:       []*apiv1.Node{nodes1[0], nodes2[0], nodes2[1]},
+			wantBatches: 2,
+			drained:     true,
+		},
+	}
+	for _, test := range testcases {
+		// A fresh batcher per test case so buckets do not leak between cases.
+		d := NodeBatcherDeleterImpl{
+			ctx:                   &ctx,
+			clusterState:          nil,
+			nodeDeletionTracker:   nil,
+			deletionsPerNodeGroup: make(map[string][]*apiv1.Node),
+			drainedNodeDeletions:  make(map[string]bool),
+		}
+		// batchNames counts how many times a node was reported as first in its group.
+		batchNames := 0
+		for _, node := range test.nodes {
+			_, first, err := d.addNodeToBucket(node, test.drained)
+			if err != nil {
+				t.Errorf("addNodeToBucket return error %q when addidng node %v", err, node)
+			}
+			if first {
+				batchNames += 1
+			}
+		}
+		if batchNames != test.wantBatches {
+			t.Errorf("Want %d batches, got %d batches", test.wantBatches, batchNames)
+		}
+
+	}
+}
+
+// TestCleanUp checks that remove is a no-op for a node group without a bucket
+// and that it drops exactly one bucket for a node group that has one.
+func TestCleanUp(t *testing.T) {
+	provider := testprovider.NewTestCloudProvider(nil, func(id, nodeName string) error {
+		return nil
+	})
+	fakeClient := &fake.Clientset{}
+	fakeLogRecorder, _ := clusterstate_utils.NewStatusMapRecorder(fakeClient, "kube-system", kube_record.NewFakeRecorder(5), false, "my-cool-configmap")
+	ctx, err := NewScaleTestAutoscalingContext(config.AutoscalingOptions{}, nil, nil, provider, nil, nil)
+
+	clusterStateRegistry := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, fakeLogRecorder, NewBackoff())
+	// err here still refers to the NewScaleTestAutoscalingContext call above.
+	if err != nil {
+		t.Fatalf("Couldn't set up autoscaling context: %v", err)
+	}
+	nodeGroup1 := "ng-1"
+	nodeGroup2 := "ng-2"
+	nodesDrained := generateNodes(5, "ng-1")
+	nodesEmpty := generateNodes(5, "ng-2")
+	provider.AddNodeGroup(nodeGroup1, 1, 10, 5)
+	provider.AddNodeGroup(nodeGroup2, 1, 10, 5)
+	for _, node := range nodesDrained {
+		provider.AddNode(nodeGroup1, node)
+	}
+	for _, node := range nodesEmpty {
+		provider.AddNode(nodeGroup2, node)
+	}
+	d := NodeBatcherDeleterImpl{
+		ctx:                   &ctx,
+		clusterState:          clusterStateRegistry,
+		nodeDeletionTracker:   deletiontracker.NewNodeDeletionTracker(1 * time.Hour),
+		deletionsPerNodeGroup: make(map[string][]*apiv1.Node),
+		drainedNodeDeletions:  make(map[string]bool),
+	}
+	// Queue ng-1 nodes as drained and ng-2 nodes as empty, one bucket per group.
+	for _, node := range nodesDrained {
+		_, _, err := d.addNodeToBucket(node, true)
+		if err != nil {
+			t.Errorf("addNodeToBucket return error %q when addidng node %v", err, node)
+		}
+	}
+	for _, node := range nodesEmpty {
+		_, _, err := d.addNodeToBucket(node, false)
+		if err != nil {
+			t.Errorf("addNodeToBucket return error %q when addidng node %v", err, node)
+		}
+	}
+	batches := len(d.deletionsPerNodeGroup)
+
+	// Remove a node group that has no bucket: the number of buckets must not change.
+	err = d.remove("Unknown")
+	if err != nil {
+		t.Errorf("CleanUp return error when removing node group that not present in backet")
+	}
+	time.Sleep(1 * time.Second)
+	if batches != len(d.deletionsPerNodeGroup) {
+		t.Error("Number of batches has changed after removing node group that not present in backet")
+	}
+
+	// Regular removal: the bucket of nodeGroup1 must be gone afterwards.
+	err = d.remove(nodeGroup1)
+	if err != nil {
+		t.Errorf("CleanUp return error when removing node group %s", nodeGroup1)
+	}
+	// remove starts a background goroutine; presumably the sleep gives it time to finish.
+	time.Sleep(1 * time.Second)
+	if batches-len(d.deletionsPerNodeGroup) != 1 {
+		t.Errorf("Number of bathces hasn't decrease after CleanUp, want: %v, got: %v", batches-1, len(d.deletionsPerNodeGroup))
+	}
+}
diff --git a/cluster-autoscaler/core/scaledown/deletionbatcher/node_deletion_batcher.go b/cluster-autoscaler/core/scaledown/deletionbatcher/node_deletion_batcher.go
new file mode 100644
index 000000000000..c39c30bd6dae
--- /dev/null
+++ b/cluster-autoscaler/core/scaledown/deletionbatcher/node_deletion_batcher.go
@@ -0,0 +1,27 @@
+/*
+Copyright 2016 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package deletionbatcher
+
+import (
+	apiv1 "k8s.io/api/core/v1"
+)
+
+// NodeBatcherDeleter gathers nodes and deletes them in batches.
+type NodeBatcherDeleter interface {
+	// AddNode adds node to delete candidates and schedules its deletion.
+	// drain tells whether the node is deleted with drain (true) or as an
+	// empty node (false). A non-nil error reports that adding the node failed.
+	AddNode(node *apiv1.Node, drain bool) error
+}
diff --git a/cluster-autoscaler/core/scaledown/legacy/legacy_test.go b/cluster-autoscaler/core/scaledown/legacy/legacy_test.go index ad12a19005c8..d03b090130ca 100644 --- a/cluster-autoscaler/core/scaledown/legacy/legacy_test.go +++ b/cluster-autoscaler/core/scaledown/legacy/legacy_test.go @@ -1404,6 +1404,7 @@ func newWrapperForTesting(ctx *context.AutoscalingContext, clusterStateRegistry ndt = deletiontracker.NewNodeDeletionTracker(0 * time.Second) } sd := NewScaleDown(ctx, NewTestProcessors(), clusterStateRegistry, ndt) - actuator := actuation.NewActuator(ctx, clusterStateRegistry, ndt) + nbd := actuation.NewNodeBatcherDeleter(ctx, clusterStateRegistry, ndt, 0*time.Second) + actuator := actuation.NewActuator(ctx, clusterStateRegistry, ndt, nbd) return NewScaleDownWrapper(sd, actuator) } diff --git a/cluster-autoscaler/core/static_autoscaler.go b/cluster-autoscaler/core/static_autoscaler.go index 84b09b65739f..396a2a974407 100644 --- a/cluster-autoscaler/core/static_autoscaler.go +++ b/cluster-autoscaler/core/static_autoscaler.go @@ -156,7 +156,8 @@ func NewStaticAutoscaler( ndt := deletiontracker.NewNodeDeletionTracker(0 * time.Second) scaleDown := legacy.NewScaleDown(autoscalingContext, processors, clusterStateRegistry, ndt) - actuator := actuation.NewActuator(autoscalingContext, clusterStateRegistry, ndt) + nbd := actuation.NewNodeBatcherDeleter(autoscalingContext, clusterStateRegistry, ndt, opts.NodeBatchDeletionInterval) + actuator := actuation.NewActuator(autoscalingContext, clusterStateRegistry, ndt, nbd) scaleDownWrapper := legacy.NewScaleDownWrapper(scaleDown, actuator) processorCallbacks.scaleDownPlanner = scaleDownWrapper diff --git a/cluster-autoscaler/core/static_autoscaler_test.go b/cluster-autoscaler/core/static_autoscaler_test.go index ad0011e1063c..0bd400b4e73a 100644 ---
a/cluster-autoscaler/core/static_autoscaler_test.go +++ b/cluster-autoscaler/core/static_autoscaler_test.go @@ -680,6 +680,7 @@ func TestStaticAutoscalerRunOncePodsWithPriorities(t *testing.T) { MaxCoresTotal: 10, MaxMemoryTotal: 100000, ExpendablePodsPriorityCutoff: 10, + NodeBatchDeletionInterval: 0 * time.Second, } processorCallbacks := newStaticAutoscalerProcessorCallbacks() @@ -1399,7 +1400,8 @@ func newScaleDownPlannerAndActuator(t *testing.T, ctx *context.AutoscalingContex ctx.MaxDrainParallelism = 1 ndt := deletiontracker.NewNodeDeletionTracker(0 * time.Second) sd := legacy.NewScaleDown(ctx, p, cs, ndt) - actuator := actuation.NewActuator(ctx, cs, ndt) + nbd := actuation.NewNodeBatcherDeleter(ctx, cs, ndt, 0*time.Second) + actuator := actuation.NewActuator(ctx, cs, ndt, nbd) wrapper := legacy.NewScaleDownWrapper(sd, actuator) return wrapper, wrapper } diff --git a/cluster-autoscaler/main.go b/cluster-autoscaler/main.go index a0489b55b862..89db703f6d73 100644 --- a/cluster-autoscaler/main.go +++ b/cluster-autoscaler/main.go @@ -124,13 +124,14 @@ var ( "for scale down when some candidates from previous iteration are no longer valid."+ "When calculating the pool size for additional candidates we take"+ "max(#nodes * scale-down-candidates-pool-ratio, scale-down-candidates-pool-min-count).") - nodeDeletionDelayTimeout = flag.Duration("node-deletion-delay-timeout", 2*time.Minute, "Maximum time CA waits for removing delay-deletion.cluster-autoscaler.kubernetes.io/ annotations before deleting the node.") - scanInterval = flag.Duration("scan-interval", 10*time.Second, "How often cluster is reevaluated for scale up or down") - maxNodesTotal = flag.Int("max-nodes-total", 0, "Maximum number of nodes in all node groups. Cluster autoscaler will not grow the cluster beyond this number.") - coresTotal = flag.String("cores-total", minMaxFlagString(0, config.DefaultMaxClusterCores), "Minimum and maximum number of cores in cluster, in the format :. 
Cluster autoscaler will not scale the cluster beyond these numbers.") - memoryTotal = flag.String("memory-total", minMaxFlagString(0, config.DefaultMaxClusterMemory), "Minimum and maximum number of gigabytes of memory in cluster, in the format :. Cluster autoscaler will not scale the cluster beyond these numbers.") - gpuTotal = multiStringFlag("gpu-total", "Minimum and maximum number of different GPUs in cluster, in the format ::. Cluster autoscaler will not scale the cluster beyond these numbers. Can be passed multiple times. CURRENTLY THIS FLAG ONLY WORKS ON GKE.") - cloudProviderFlag = flag.String("cloud-provider", cloudBuilder.DefaultCloudProvider, + nodeDeletionDelayTimeout = flag.Duration("node-deletion-delay-timeout", 2*time.Minute, "Maximum time CA waits for removing delay-deletion.cluster-autoscaler.kubernetes.io/ annotations before deleting the node.") + nodeBatchDeletionInterval = flag.Duration("node-deletion-in-batch-interval", 0*time.Second, "How long CA ScaleDown gather nodes to delete them in batch.") + scanInterval = flag.Duration("scan-interval", 10*time.Second, "How often cluster is reevaluated for scale up or down") + maxNodesTotal = flag.Int("max-nodes-total", 0, "Maximum number of nodes in all node groups. Cluster autoscaler will not grow the cluster beyond this number.") + coresTotal = flag.String("cores-total", minMaxFlagString(0, config.DefaultMaxClusterCores), "Minimum and maximum number of cores in cluster, in the format :. Cluster autoscaler will not scale the cluster beyond these numbers.") + memoryTotal = flag.String("memory-total", minMaxFlagString(0, config.DefaultMaxClusterMemory), "Minimum and maximum number of gigabytes of memory in cluster, in the format :. Cluster autoscaler will not scale the cluster beyond these numbers.") + gpuTotal = multiStringFlag("gpu-total", "Minimum and maximum number of different GPUs in cluster, in the format ::. Cluster autoscaler will not scale the cluster beyond these numbers. 
Can be passed multiple times. CURRENTLY THIS FLAG ONLY WORKS ON GKE.") + cloudProviderFlag = flag.String("cloud-provider", cloudBuilder.DefaultCloudProvider, "Cloud provider type. Available values: ["+strings.Join(cloudBuilder.AvailableCloudProviders, ",")+"]") maxBulkSoftTaintCount = flag.Int("max-bulk-soft-taint-count", 10, "Maximum number of nodes that can be tainted/untainted PreferNoSchedule at the same time. Set to 0 to turn off such tainting.") maxBulkSoftTaintTime = flag.Duration("max-bulk-soft-taint-time", 3*time.Second, "Maximum duration of tainting/untainting nodes as PreferNoSchedule at the same time.") @@ -293,6 +294,7 @@ func createAutoscalingOptions() config.AutoscalingOptions { RecordDuplicatedEvents: *recordDuplicatedEvents, MaxNodesPerScaleUp: *maxNodesPerScaleUp, MaxNodeGroupBinpackingDuration: *maxNodeGroupBinpackingDuration, + NodeBatchDeletionInterval: *nodeBatchDeletionInterval, } }