From d5fd28c920c0d395e3e52d8a899b61b6a936774a Mon Sep 17 00:00:00 2001
From: Yaroslava Serdiuk
Date: Mon, 25 Jul 2022 14:12:13 +0000
Subject: [PATCH] Introduce NodeDeleterBatcher to ScaleDown actuator

---
 .../config/autoscaling_options.go             |   2 +
 .../core/scaledown/actuation/actuator.go      | 114 +++++----
 .../core/scaledown/actuation/actuator_test.go | 186 ++++++++++++++-
 .../core/scaledown/actuation/delete.go        |  66 ------
 .../scaledown/actuation/delete_in_batch.go    | 182 +++++++++++++++
 .../actuation/delete_in_batch_test.go         | 216 ++++++++++++++++++
 .../core/scaledown/legacy/legacy_test.go      |   2 +-
 cluster-autoscaler/core/static_autoscaler.go  |   2 +-
 .../core/static_autoscaler_test.go            |   3 +-
 cluster-autoscaler/main.go                    |  16 +-
 10 files changed, 660 insertions(+), 129 deletions(-)
 delete mode 100644 cluster-autoscaler/core/scaledown/actuation/delete.go
 create mode 100644 cluster-autoscaler/core/scaledown/actuation/delete_in_batch.go
 create mode 100644 cluster-autoscaler/core/scaledown/actuation/delete_in_batch_test.go

diff --git a/cluster-autoscaler/config/autoscaling_options.go b/cluster-autoscaler/config/autoscaling_options.go
index 34408a2ddf41..3bc6b4e394c1 100644
--- a/cluster-autoscaler/config/autoscaling_options.go
+++ b/cluster-autoscaler/config/autoscaling_options.go
@@ -192,4 +192,6 @@ type AutoscalingOptions struct {
 	// MaxNodeGroupBinpackingDuration is a maximum time that can be spent binpacking a single NodeGroup. If the threshold
 	// is exceeded binpacking will be cut short and a partial scale-up will be performed.
 	MaxNodeGroupBinpackingDuration time.Duration
+	// NodeDeletionBatcherInterval is the time for which CA ScaleDown gathers nodes to delete them in batch.
+	NodeDeletionBatcherInterval time.Duration
 }
diff --git a/cluster-autoscaler/core/scaledown/actuation/actuator.go b/cluster-autoscaler/core/scaledown/actuation/actuator.go
index 7bc82f23c057..136722618afb 100644
--- a/cluster-autoscaler/core/scaledown/actuation/actuator.go
+++ b/cluster-autoscaler/core/scaledown/actuation/actuator.go
@@ -24,6 +24,7 @@ import (
 	apiv1 "k8s.io/api/core/v1"
 	"k8s.io/klog/v2"
 
+	"k8s.io/autoscaler/cluster-autoscaler/cloudprovider"
 	"k8s.io/autoscaler/cluster-autoscaler/clusterstate"
 	"k8s.io/autoscaler/cluster-autoscaler/context"
 	"k8s.io/autoscaler/cluster-autoscaler/core/scaledown"
@@ -34,7 +35,6 @@ import (
 	"k8s.io/autoscaler/cluster-autoscaler/utils/deletetaint"
 	"k8s.io/autoscaler/cluster-autoscaler/utils/errors"
 	"k8s.io/autoscaler/cluster-autoscaler/utils/gpu"
-	"k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes"
 )
 
 // Actuator is responsible for draining and deleting nodes.
@@ -42,15 +42,18 @@ type Actuator struct {
 	ctx                 *context.AutoscalingContext
 	clusterState        *clusterstate.ClusterStateRegistry
 	nodeDeletionTracker *deletiontracker.NodeDeletionTracker
+	nodeDeletionBatcher *NodeBatcherDeleter
 	evictor             Evictor
 }
 
 // NewActuator returns a new instance of Actuator.
-func NewActuator(ctx *context.AutoscalingContext, csr *clusterstate.ClusterStateRegistry, ndr *deletiontracker.NodeDeletionTracker) *Actuator { +func NewActuator(ctx *context.AutoscalingContext, csr *clusterstate.ClusterStateRegistry, ndr *deletiontracker.NodeDeletionTracker, batchInterval time.Duration) *Actuator { + nbd := NewNodeBatcherDeleter(ctx, csr, ndr, batchInterval) return &Actuator{ ctx: ctx, clusterState: csr, nodeDeletionTracker: ndr, + nodeDeletionBatcher: nbd, evictor: NewDefaultEvictor(), } } @@ -141,7 +144,13 @@ func (a *Actuator) taintSyncDeleteAsyncEmpty(empty []*apiv1.Node) (scaledDownNod klog.V(0).Infof("Scale-down: removing empty node %q", emptyNode.Name) a.ctx.LogRecorder.Eventf(apiv1.EventTypeNormal, "ScaleDownEmpty", "Scale-down: removing empty node %q", emptyNode.Name) - err := a.taintNode(emptyNode) + nodeGroup, err := a.ctx.CloudProvider.NodeGroupForNode(emptyNode) + if err != nil || nodeGroup == nil { + klog.Errorf("Failed to find node group for %s: %v", emptyNode.Name, err) + continue + } + + err = a.taintNode(emptyNode) if err != nil { a.ctx.Recorder.Eventf(emptyNode, apiv1.EventTypeWarning, "ScaleDownFailed", "failed to mark the node as toBeDeleted/unschedulable: %v", err) return scaledDownNodes, errors.NewAutoscalerError(errors.ApiCallError, "couldn't taint node %q with ToBeDeleted", emptyNode.Name) @@ -152,17 +161,8 @@ func (a *Actuator) taintSyncDeleteAsyncEmpty(empty []*apiv1.Node) (scaledDownNod } else { klog.Errorf("Scale-down: couldn't report scaled down node, err: %v", err) } - - go func(node *apiv1.Node) { - result := a.deleteNode(node, false) - if result.Err == nil { - a.ctx.LogRecorder.Eventf(apiv1.EventTypeNormal, "ScaleDownEmpty", "Scale-down: empty node %s removed", node.Name) - } else { - klog.Errorf("Scale-down: couldn't delete empty node, err: %v", err) - a.ctx.Recorder.Eventf(node, apiv1.EventTypeWarning, "ScaleDownFailed", "failed to delete empty node: %v", result.Err) - _, _ = deletetaint.CleanToBeDeleted(node, a.ctx.ClientSet, a.ctx.CordonNodeBeforeTerminate) - } - }(emptyNode) + a.nodeDeletionTracker.StartDeletion(nodeGroup.Id(), emptyNode.Name) + go a.scheduleDeletion(emptyNode, nodeGroup.Id(), false) } return scaledDownNodes, nil } @@ -197,17 +197,13 @@ func (a *Actuator) deleteAsyncDrain(drain []*apiv1.Node) (scaledDownNodes []*sta } else { klog.Errorf("Scale-down: couldn't report scaled down node, err: %v", err) } - - go func(node *apiv1.Node) { - result := a.deleteNode(node, true) - if result.Err == nil { - a.ctx.LogRecorder.Eventf(apiv1.EventTypeNormal, "ScaleDown", "Scale-down: node %s removed with drain", node.Name) - } else { - klog.Errorf("Scale-down: couldn't delete node %q with drain, err: %v", node.Name, result.Err) - a.ctx.Recorder.Eventf(node, apiv1.EventTypeWarning, "ScaleDownFailed", "failed to drain and delete node: %v", result.Err) - _, _ = deletetaint.CleanToBeDeleted(node, a.ctx.ClientSet, a.ctx.CordonNodeBeforeTerminate) - } - }(drainNode) + nodeGroup, err := a.ctx.CloudProvider.NodeGroupForNode(drainNode) + if err != nil { + klog.Errorf("Failed to find node group for %s: %v", drainNode.Name, err) + continue + } + a.nodeDeletionTracker.StartDeletionWithDrain(nodeGroup.Id(), drainNode.Name) + go a.scheduleDeletion(drainNode, nodeGroup.Id(), true) } return scaledDownNodes } @@ -251,41 +247,43 @@ func (a *Actuator) taintNode(node *apiv1.Node) error { return nil } -// deleteNode performs the deletion of the provided node. If drain is true, the node is drained before being deleted. 
-func (a *Actuator) deleteNode(node *apiv1.Node, drain bool) (result status.NodeDeleteResult) {
+func (a *Actuator) prepareNodeForDeletion(node *apiv1.Node, drain bool) status.NodeDeleteResult {
 	nodeGroup, err := a.ctx.CloudProvider.NodeGroupForNode(node)
 	if err != nil {
-		return status.NodeDeleteResult{ResultType: status.NodeDeleteErrorInternal, Err: errors.NewAutoscalerError(errors.CloudProviderError, "failed to find node group for %s: %v", node.Name, err)}
+		return status.NodeDeleteResult{ResultType: status.NodeDeleteErrorInternal, Err: errors.NewAutoscalerError(errors.CloudProviderError, "NodeGroupForNode for %s returned error: %v", node.Name, err)}
 	}
 	if nodeGroup == nil || reflect.ValueOf(nodeGroup).IsNil() {
 		return status.NodeDeleteResult{ResultType: status.NodeDeleteErrorInternal, Err: errors.NewAutoscalerError(errors.InternalError, "picked node that doesn't belong to a node group: %s", node.Name)}
 	}
-
-	defer func() { a.nodeDeletionTracker.EndDeletion(nodeGroup.Id(), node.Name, result) }()
 	if drain {
-		a.nodeDeletionTracker.StartDeletionWithDrain(nodeGroup.Id(), node.Name)
 		if evictionResults, err := a.evictor.DrainNode(a.ctx, node); err != nil {
 			return status.NodeDeleteResult{ResultType: status.NodeDeleteErrorFailedToEvictPods, Err: err, PodEvictionResults: evictionResults}
 		}
 	} else {
-		a.nodeDeletionTracker.StartDeletion(nodeGroup.Id(), node.Name)
 		if err := a.evictor.EvictDaemonSetPods(a.ctx, node, time.Now()); err != nil {
 			// Evicting DS pods is best-effort, so proceed with the deletion even if there are errors.
 			klog.Warningf("Error while evicting DS pods from an empty node %q: %v", node.Name, err)
 		}
 	}
-
 	if err := WaitForDelayDeletion(node, a.ctx.ListerRegistry.AllNodeLister(), a.ctx.AutoscalingOptions.NodeDeletionDelayTimeout); err != nil {
 		return status.NodeDeleteResult{ResultType: status.NodeDeleteErrorFailedToDelete, Err: err}
 	}
+	return status.NodeDeleteResult{ResultType: status.NodeDeleteOk}
+}
 
-	if err := DeleteNodeFromCloudProvider(a.ctx, node, a.clusterState); err != nil {
-		return status.NodeDeleteResult{ResultType: status.NodeDeleteErrorFailedToDelete, Err: err}
+// scheduleDeletion schedules the deletion of the provided node by adding it to the NodeDeletionBatcher. If drain is true, the node is drained before being deleted.
+func (a *Actuator) scheduleDeletion(node *apiv1.Node, nodeGroupId string, drain bool) {
+	nodeDeleteResult := a.prepareNodeForDeletion(node, drain)
+	if nodeDeleteResult.Err != nil {
+		CleanUpAndRecordFailedScaleDownEvent(a.ctx, node, nodeGroupId, drain, a.nodeDeletionTracker, "prepareNodeForDeletion failed", nodeDeleteResult)
+		return
+	}
+	err := a.nodeDeletionBatcher.AddNode(node, drain)
+	if err != nil {
+		klog.Errorf("Couldn't add node to nodeDeletionBatcher, err: %v", err)
+		nodeDeleteResult := status.NodeDeleteResult{ResultType: status.NodeDeleteErrorInternal, Err: errors.NewAutoscalerError(errors.InternalError, "nodeDeletionBatcher.AddNode for %s returned error: %v", node.Name, err)}
+		CleanUpAndRecordFailedScaleDownEvent(a.ctx, node, nodeGroupId, drain, a.nodeDeletionTracker, "failed to add node to the nodeDeletionBatcher", nodeDeleteResult)
 	}
-
-	metrics.RegisterScaleDown(1, gpu.GetGpuTypeForMetrics(a.ctx.CloudProvider.GPULabel(), a.ctx.CloudProvider.GetAvailableGPUTypes(), node, nodeGroup), nodeScaleDownReason(node, drain))
-
-	return status.NodeDeleteResult{ResultType: status.NodeDeleteOk}
 }
 
 func min(x, y int) int {
@@ -303,18 +301,34 @@ func joinPodNames(pods []*apiv1.Pod) string {
 	return strings.Join(names, ",")
 }
 
-func nodeScaleDownReason(node *apiv1.Node, drain bool) metrics.NodeScaleDownReason {
-	readiness, err := kubernetes.GetNodeReadiness(node)
-	if err != nil {
-		klog.Errorf("Couldn't determine node %q readiness while scaling down - assuming unready: %v", node.Name, err)
-		return metrics.Unready
-	}
-	if !readiness.Ready {
-		return metrics.Unready
+// CleanUpAndRecordFailedScaleDownEvent records a failed scale down event and logs an error.
+func CleanUpAndRecordFailedScaleDownEvent(ctx *context.AutoscalingContext, node *apiv1.Node, nodeGroupId string, drain bool, nodeDeletionTracker *deletiontracker.NodeDeletionTracker, errMsg string, status status.NodeDeleteResult) {
+	if drain {
+		klog.Errorf("Scale-down: couldn't delete node %q with drain, %v, status error: %v", node.Name, errMsg, status.Err)
+		ctx.Recorder.Eventf(node, apiv1.EventTypeWarning, "ScaleDownFailed", "failed to drain and delete node: %v", status.Err)
+
+	} else {
+		klog.Errorf("Scale-down: couldn't delete empty node, %v, status error: %v", errMsg, status.Err)
+		ctx.Recorder.Eventf(node, apiv1.EventTypeWarning, "ScaleDownFailed", "failed to delete empty node: %v", status.Err)
 	}
-	// Node is ready.
+	deletetaint.CleanToBeDeleted(node, ctx.ClientSet, ctx.CordonNodeBeforeTerminate)
+	nodeDeletionTracker.EndDeletion(nodeGroupId, node.Name, status)
+}
+
+// RegisterAndRecordSuccessfulScaleDownEvent registers the scale down and records a successful scale down event.
+func RegisterAndRecordSuccessfulScaleDownEvent(ctx *context.AutoscalingContext, csr *clusterstate.ClusterStateRegistry, node *apiv1.Node, nodeGroup cloudprovider.NodeGroup, drain bool, nodeDeletionTracker *deletiontracker.NodeDeletionTracker) { + ctx.Recorder.Eventf(node, apiv1.EventTypeNormal, "ScaleDown", "nodes removed by cluster autoscaler") + csr.RegisterScaleDown(&clusterstate.ScaleDownRequest{ + NodeGroup: nodeGroup, + NodeName: node.Name, + Time: time.Now(), + ExpectedDeleteTime: time.Now().Add(MaxCloudProviderNodeDeletionTime), + }) + metrics.RegisterScaleDown(1, gpu.GetGpuTypeForMetrics(ctx.CloudProvider.GPULabel(), ctx.CloudProvider.GetAvailableGPUTypes(), node, nodeGroup), nodeScaleDownReason(node, drain)) if drain { - return metrics.Underutilized + ctx.LogRecorder.Eventf(apiv1.EventTypeNormal, "ScaleDown", "Scale-down: node %s removed with drain", node.Name) + } else { + ctx.LogRecorder.Eventf(apiv1.EventTypeNormal, "ScaleDownEmpty", "Scale-down: empty node %s removed", node.Name) } - return metrics.Empty + nodeDeletionTracker.EndDeletion(nodeGroup.Id(), node.Name, status.NodeDeleteResult{ResultType: status.NodeDeleteOk}) } diff --git a/cluster-autoscaler/core/scaledown/actuation/actuator_test.go b/cluster-autoscaler/core/scaledown/actuation/actuator_test.go index 1c7b7bc0f50f..c61deffed162 100644 --- a/cluster-autoscaler/core/scaledown/actuation/actuator_test.go +++ b/cluster-autoscaler/core/scaledown/actuation/actuator_test.go @@ -230,7 +230,7 @@ func TestCropNodesToBudgets(t *testing.T) { for i := 0; i < tc.drainDeletionsInProgress; i++ { ndr.StartDeletionWithDrain("ng2", fmt.Sprintf("drain-node-%d", i)) } - actuator := NewActuator(ctx, nil, ndr) + actuator := NewActuator(ctx, nil, ndr, 0*time.Second) gotEmpty, gotDrain := actuator.cropNodesToBudgets(tc.emptyNodes, tc.drainNodes) if diff := cmp.Diff(tc.wantEmpty, gotEmpty, cmpopts.EquateEmpty()); diff != "" { t.Errorf("cropNodesToBudgets empty nodes diff (-want +got):\n%s", diff) @@ -843,9 +843,11 @@ func TestStartDeletion(t *testing.T) { } // Create Actuator, run StartDeletion, and verify the error. 
+			ndt := deletiontracker.NewNodeDeletionTracker(0)
 			actuator := Actuator{
-				ctx: &ctx, clusterState: csr, nodeDeletionTracker: deletiontracker.NewNodeDeletionTracker(0),
-				evictor: Evictor{EvictionRetryTime: 0, DsEvictionRetryTime: 0, DsEvictionEmptyNodeTimeout: 0, PodEvictionHeadroom: DefaultPodEvictionHeadroom},
+				ctx: &ctx, clusterState: csr, nodeDeletionTracker: ndt,
+				nodeDeletionBatcher: NewNodeBatcherDeleter(&ctx, csr, ndt, 0*time.Second),
+				evictor: Evictor{EvictionRetryTime: 0, DsEvictionRetryTime: 0, DsEvictionEmptyNodeTimeout: 0, PodEvictionHeadroom: DefaultPodEvictionHeadroom},
 			}
 			gotStatus, gotErr := actuator.StartDeletion(tc.emptyNodes, tc.drainNodes, time.Now())
 			if diff := cmp.Diff(tc.wantErr, gotErr, cmpopts.EquateErrors()); diff != "" {
@@ -937,6 +939,170 @@
 	}
 }
 
+func TestStartDeletionInBatchBasic(t *testing.T) {
+	testNg1 := testprovider.NewTestNodeGroup("test-ng-1", 0, 100, 3, true, false, "n1-standard-2", nil, nil)
+	testNg2 := testprovider.NewTestNodeGroup("test-ng-2", 0, 100, 3, true, false, "n1-standard-2", nil, nil)
+	testNg3 := testprovider.NewTestNodeGroup("test-ng-3", 0, 100, 3, true, false, "n1-standard-2", nil, nil)
+	deleteInterval := 1 * time.Second
+
+	for _, test := range []struct {
+		name                   string
+		deleteCalls            int
+		numNodesToDelete       map[*testprovider.TestNodeGroup][]int // per node group and per call
+		failedRequests         map[string]bool                       // per node group
+		wantSuccessfulDeletion map[string]int                        // per node group
+	}{
+		{
+			name:        "Successful deletion for all node groups",
+			deleteCalls: 1,
+			numNodesToDelete: map[*testprovider.TestNodeGroup][]int{
+				testNg1: {4},
+				testNg2: {5},
+				testNg3: {1},
+			},
+			wantSuccessfulDeletion: map[string]int{
+				"test-ng-1": 4,
+				"test-ng-2": 5,
+				"test-ng-3": 1,
+			},
+		},
+		{
+			name:        "Node deletion failed for one group",
+			deleteCalls: 1,
+			numNodesToDelete: map[*testprovider.TestNodeGroup][]int{
+				testNg1: {4},
+				testNg2: {5},
+				testNg3: {1},
+			},
+			failedRequests: map[string]bool{
+				"test-ng-1": true,
+			},
+			wantSuccessfulDeletion: map[string]int{
+				"test-ng-1": 0,
+				"test-ng-2": 5,
+				"test-ng-3": 1,
+			},
+		},
+		{
+			name:        "Node deletion failed for one group two times",
+			deleteCalls: 2,
+			numNodesToDelete: map[*testprovider.TestNodeGroup][]int{
+				testNg1: {4, 3},
+				testNg2: {5},
+				testNg3: {1},
+			},
+			failedRequests: map[string]bool{
+				"test-ng-1": true,
+			},
+			wantSuccessfulDeletion: map[string]int{
+				"test-ng-1": 0,
+				"test-ng-2": 5,
+				"test-ng-3": 1,
+			},
+		},
+		{
+			name:        "Node deletion failed for all groups",
+			deleteCalls: 2,
+			numNodesToDelete: map[*testprovider.TestNodeGroup][]int{
+				testNg1: {4, 3},
+				testNg2: {5},
+				testNg3: {1},
+			},
+			failedRequests: map[string]bool{
+				"test-ng-1": true,
+				"test-ng-2": true,
+				"test-ng-3": true,
+			},
+			wantSuccessfulDeletion: map[string]int{
+				"test-ng-1": 0,
+				"test-ng-2": 0,
+				"test-ng-3": 0,
+			},
+		},
+	} {
+		t.Run(test.name, func(t *testing.T) {
+			test := test
+			gotFailedRequest := func(nodeGroupId string) bool {
+				val, _ := test.failedRequests[nodeGroupId]
+				return val
+			}
+			deletedResult := make(chan string)
+			fakeClient := &fake.Clientset{}
+			provider := testprovider.NewTestCloudProvider(nil, func(nodeGroupId string, node string) error {
+				if gotFailedRequest(nodeGroupId) {
+					return fmt.Errorf("SIMULATED ERROR: won't remove node")
+				}
+				deletedResult <- nodeGroupId
+				return nil
+			})
+			// 2D array representing the waves of pushing nodes to delete.
+			deleteNodes := [][]*apiv1.Node{}
+
+			for i := 0; i < test.deleteCalls; i++ {
+				deleteNodes = append(deleteNodes, []*apiv1.Node{})
+			}
+			for ng, numNodes := range test.numNodesToDelete {
+				provider.InsertNodeGroup(ng)
+				ng.SetCloudProvider(provider)
+				for i, num := range numNodes {
+					nodes := generateNodes(num, ng.Id())
+					deleteNodes[i] = append(deleteNodes[i], nodes...)
+					for _, node := range nodes {
+						provider.AddNode(ng.Id(), node)
+					}
+				}
+			}
+			opts := config.AutoscalingOptions{
+				MaxScaleDownParallelism:        10,
+				MaxDrainParallelism:            5,
+				MaxPodEvictionTime:             0,
+				DaemonSetEvictionForEmptyNodes: true,
+			}
+			registry := kube_util.NewListerRegistry(nil, nil, nil, nil, nil, nil, nil, nil, nil, nil)
+			ctx, err := NewScaleTestAutoscalingContext(opts, fakeClient, registry, provider, nil, nil)
+			if err != nil {
+				t.Fatalf("Couldn't set up autoscaling context: %v", err)
+			}
+			csr := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, ctx.LogRecorder, NewBackoff())
+			ndt := deletiontracker.NewNodeDeletionTracker(0)
+			actuator := Actuator{
+				ctx: &ctx, clusterState: csr, nodeDeletionTracker: ndt,
+				nodeDeletionBatcher: NewNodeBatcherDeleter(&ctx, csr, ndt, deleteInterval),
+				evictor: Evictor{EvictionRetryTime: 0, DsEvictionRetryTime: 0, DsEvictionEmptyNodeTimeout: 0, PodEvictionHeadroom: DefaultPodEvictionHeadroom},
+			}
+			for _, nodes := range deleteNodes {
+				actuator.StartDeletion(nodes, []*apiv1.Node{}, time.Now())
+				time.Sleep(deleteInterval)
+			}
+			wantDeletedNodes := 0
+			for _, num := range test.wantSuccessfulDeletion {
+				wantDeletedNodes += num
+			}
+			gotDeletedNodes := map[string]int{
+				"test-ng-1": 0,
+				"test-ng-2": 0,
+				"test-ng-3": 0,
+			}
+			for i := 0; i < wantDeletedNodes; i++ {
+				select {
+				case ngId := <-deletedResult:
+					if value, ok := gotDeletedNodes[ngId]; ok {
+						gotDeletedNodes[ngId] = value + 1
+					} else {
+						gotDeletedNodes[ngId] = 1
+					}
+				case <-time.After(1 * time.Second):
+					t.Errorf("Timeout while waiting for deleted nodes.")
+					break
+				}
+			}
+			if diff := cmp.Diff(test.wantSuccessfulDeletion, gotDeletedNodes); diff != "" {
+				t.Errorf("Successful deletions per node group diff (-want +got):\n%s", diff)
+			}
+		})
+	}
+}
+
 func generateNodes(count int, prefix string) []*apiv1.Node {
 	var result []*apiv1.Node
 	for i := 0; i < count; i++ {
@@ -949,6 +1115,20 @@ func generateNodes(count int, prefix string) []*apiv1.Node {
 	return result
 }
 
+func generateNodesAndNodeGroupMap(count int, prefix string) map[string]*testprovider.TestNodeGroup {
+	result := make(map[string]*testprovider.TestNodeGroup)
+	for i := 0; i < count; i++ {
+		name := fmt.Sprintf("node-%d", i)
+		ngName := fmt.Sprintf("test-ng-%v", i)
+		if prefix != "" {
+			name = prefix + "-" + name
+			ngName = prefix + "-" + ngName
+		}
+		result[name] = testprovider.NewTestNodeGroup(ngName, 0, 100, 3, true, false, "n1-standard-2", nil, nil)
+	}
+	return result
+}
+
 func generateNode(name string) *apiv1.Node {
 	return &apiv1.Node{
 		ObjectMeta: metav1.ObjectMeta{Name: name},
diff --git a/cluster-autoscaler/core/scaledown/actuation/delete.go b/cluster-autoscaler/core/scaledown/actuation/delete.go
deleted file mode 100644
index bf419b3aba41..000000000000
--- a/cluster-autoscaler/core/scaledown/actuation/delete.go
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
-Copyright 2022 The Kubernetes Authors.
-
-Licensed under the Apache License, Version 2.0 (the "License");
-you may not use this file except in compliance with the License.
-You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package actuation - -import ( - "reflect" - "time" - - "k8s.io/autoscaler/cluster-autoscaler/utils/deletetaint" - - apiv1 "k8s.io/api/core/v1" - - "k8s.io/autoscaler/cluster-autoscaler/clusterstate" - "k8s.io/autoscaler/cluster-autoscaler/context" - "k8s.io/autoscaler/cluster-autoscaler/utils/errors" -) - -const ( - // MaxKubernetesEmptyNodeDeletionTime is the maximum time needed by Kubernetes to delete an empty node. - MaxKubernetesEmptyNodeDeletionTime = 3 * time.Minute - // MaxCloudProviderNodeDeletionTime is the maximum time needed by cloud provider to delete a node. - MaxCloudProviderNodeDeletionTime = 5 * time.Minute -) - -// DeleteNodeFromCloudProvider removes the given node from cloud provider. No extra pre-deletion actions are executed on -// the Kubernetes side. If successful, the deletion is recorded in CSR, and an event is emitted on the node. -func DeleteNodeFromCloudProvider(ctx *context.AutoscalingContext, node *apiv1.Node, registry *clusterstate.ClusterStateRegistry) errors.AutoscalerError { - nodeGroup, err := ctx.CloudProvider.NodeGroupForNode(node) - if err != nil { - return errors.NewAutoscalerError(errors.CloudProviderError, "failed to find node group for %s: %v", node.Name, err) - } - if nodeGroup == nil || reflect.ValueOf(nodeGroup).IsNil() { - return errors.NewAutoscalerError(errors.InternalError, "picked node that doesn't belong to a node group: %s", node.Name) - } - if err = nodeGroup.DeleteNodes([]*apiv1.Node{node}); err != nil { - return errors.NewAutoscalerError(errors.CloudProviderError, "failed to delete %s: %v", node.Name, err) - } - ctx.Recorder.Eventf(node, apiv1.EventTypeNormal, "ScaleDown", "node removed by cluster autoscaler") - registry.RegisterScaleDown(&clusterstate.ScaleDownRequest{ - NodeGroup: nodeGroup, - NodeName: node.Name, - Time: time.Now(), - ExpectedDeleteTime: time.Now().Add(MaxCloudProviderNodeDeletionTime), - }) - return nil -} - -// IsNodeBeingDeleted returns true iff a given node is being deleted. -func IsNodeBeingDeleted(node *apiv1.Node, timestamp time.Time) bool { - deleteTime, _ := deletetaint.GetToBeDeletedTime(node) - return deleteTime != nil && (timestamp.Sub(*deleteTime) < MaxCloudProviderNodeDeletionTime || timestamp.Sub(*deleteTime) < MaxKubernetesEmptyNodeDeletionTime) -} diff --git a/cluster-autoscaler/core/scaledown/actuation/delete_in_batch.go b/cluster-autoscaler/core/scaledown/actuation/delete_in_batch.go new file mode 100644 index 000000000000..e784866f7d71 --- /dev/null +++ b/cluster-autoscaler/core/scaledown/actuation/delete_in_batch.go @@ -0,0 +1,182 @@ +/* +Copyright 2022 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package actuation
+
+import (
+	"fmt"
+	"reflect"
+	"sync"
+	"time"
+
+	"k8s.io/autoscaler/cluster-autoscaler/cloudprovider"
+	"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/deletiontracker"
+	"k8s.io/autoscaler/cluster-autoscaler/metrics"
+	"k8s.io/autoscaler/cluster-autoscaler/processors/status"
+	"k8s.io/autoscaler/cluster-autoscaler/utils/deletetaint"
+	"k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes"
+	"k8s.io/klog/v2"
+
+	apiv1 "k8s.io/api/core/v1"
+
+	"k8s.io/autoscaler/cluster-autoscaler/clusterstate"
+	"k8s.io/autoscaler/cluster-autoscaler/context"
+	"k8s.io/autoscaler/cluster-autoscaler/utils/errors"
+)
+
+const (
+	// MaxKubernetesEmptyNodeDeletionTime is the maximum time needed by Kubernetes to delete an empty node.
+	MaxKubernetesEmptyNodeDeletionTime = 3 * time.Minute
+	// MaxCloudProviderNodeDeletionTime is the maximum time needed by cloud provider to delete a node.
+	MaxCloudProviderNodeDeletionTime = 5 * time.Minute
+)
+
+// NodeBatcherDeleter gathers nodes scheduled for deletion and deletes them in batches, one batch per node group.
+type NodeBatcherDeleter struct {
+	sync.Mutex
+	ctx                   *context.AutoscalingContext
+	clusterState          *clusterstate.ClusterStateRegistry
+	nodeDeletionTracker   *deletiontracker.NodeDeletionTracker
+	deletionsPerNodeGroup map[string][]*apiv1.Node
+	deleteInterval        time.Duration
+	drainedNodeDeletions  map[string]bool
+}
+
+// NewNodeBatcherDeleter returns a new NodeBatcherDeleter.
+func NewNodeBatcherDeleter(ctx *context.AutoscalingContext, csr *clusterstate.ClusterStateRegistry, nodeDeletionTracker *deletiontracker.NodeDeletionTracker, deleteInterval time.Duration) *NodeBatcherDeleter {
+	return &NodeBatcherDeleter{
+		ctx:                   ctx,
+		clusterState:          csr,
+		nodeDeletionTracker:   nodeDeletionTracker,
+		deletionsPerNodeGroup: make(map[string][]*apiv1.Node),
+		deleteInterval:        deleteInterval,
+		drainedNodeDeletions:  make(map[string]bool),
+	}
+}
+
+// AddNode adds a node to the delete candidates and schedules its deletion.
+func (d *NodeBatcherDeleter) AddNode(node *apiv1.Node, drain bool) error {
+	// If the delete interval is 0, start the node deletion instantly.
+	if d.deleteInterval == 0 {
+		nodeGroup, err := deleteNodesFromCloudProvider(d.ctx, []*apiv1.Node{node})
+		if err != nil {
+			result := status.NodeDeleteResult{ResultType: status.NodeDeleteErrorFailedToDelete, Err: err}
+			CleanUpAndRecordFailedScaleDownEvent(d.ctx, node, nodeGroup.Id(), drain, d.nodeDeletionTracker, "", result)
+		} else {
+			RegisterAndRecordSuccessfulScaleDownEvent(d.ctx, d.clusterState, node, nodeGroup, drain, d.nodeDeletionTracker)
+		}
+		return nil
+	}
+	nodeGroupId, first, err := d.addNodeToBucket(node, drain)
+	if err != nil {
+		return err
+	}
+	if first {
+		go func(nodeGroupId string) {
+			time.Sleep(d.deleteInterval)
+			d.remove(nodeGroupId)
+		}(nodeGroupId)
+	}
+	return nil
+}
+
+// addNodeToBucket adds a node to the delete candidates and returns whether it is the first node in its group.
+func (d *NodeBatcherDeleter) addNodeToBucket(node *apiv1.Node, drain bool) (string, bool, error) {
+	d.Lock()
+	defer d.Unlock()
+	nodeGroup, err := d.ctx.CloudProvider.NodeGroupForNode(node)
+	if err != nil {
+		return "", false, err
+	}
+	d.drainedNodeDeletions[node.Name] = drain
+	val, ok := d.deletionsPerNodeGroup[nodeGroup.Id()]
+	if !ok || len(val) == 0 {
+		d.deletionsPerNodeGroup[nodeGroup.Id()] = []*apiv1.Node{node}
+		return nodeGroup.Id(), true, nil
+	}
+	d.deletionsPerNodeGroup[nodeGroup.Id()] = append(d.deletionsPerNodeGroup[nodeGroup.Id()], node)
+	return nodeGroup.Id(), false, nil
+}
+
+// remove deletes the nodes gathered for a given nodeGroup; if successful, the deletion is recorded in CSR, and an event is emitted on the node.
+func (d *NodeBatcherDeleter) remove(nodeGroupId string) error {
+	d.Lock()
+	defer d.Unlock()
+	nodes, ok := d.deletionsPerNodeGroup[nodeGroupId]
+	if !ok {
+		return fmt.Errorf("Node Group %s is not present in the batch deleter", nodeGroupId)
+	}
+	delete(d.deletionsPerNodeGroup, nodeGroupId)
+	drainedNodeDeletions := make(map[string]bool)
+	for _, node := range nodes {
+		drainedNodeDeletions[node.Name] = d.drainedNodeDeletions[node.Name]
+		delete(d.drainedNodeDeletions, node.Name)
+	}
+
+	go func(nodes []*apiv1.Node, drainedNodeDeletions map[string]bool) {
+		var result status.NodeDeleteResult
+		nodeGroup, err := deleteNodesFromCloudProvider(d.ctx, nodes)
+		for _, node := range nodes {
+			drain := drainedNodeDeletions[node.Name]
+			if err != nil {
+				result = status.NodeDeleteResult{ResultType: status.NodeDeleteErrorFailedToDelete, Err: err}
+				CleanUpAndRecordFailedScaleDownEvent(d.ctx, node, nodeGroup.Id(), drain, d.nodeDeletionTracker, "", result)
+			} else {
+				RegisterAndRecordSuccessfulScaleDownEvent(d.ctx, d.clusterState, node, nodeGroup, drain, d.nodeDeletionTracker)
+			}
+
+		}
+	}(nodes, drainedNodeDeletions)
+	return nil
+}
+
+// deleteNodesFromCloudProvider removes the given nodes from the cloud provider. No extra pre-deletion actions are executed on
+// the Kubernetes side.
+func deleteNodesFromCloudProvider(ctx *context.AutoscalingContext, nodes []*apiv1.Node) (cloudprovider.NodeGroup, error) {
+	nodeGroup, err := ctx.CloudProvider.NodeGroupForNode(nodes[0])
+	if err != nil {
+		return nodeGroup, errors.NewAutoscalerError(errors.CloudProviderError, "failed to find node group for %s: %v", nodes[0].Name, err)
+	}
+	if nodeGroup == nil || reflect.ValueOf(nodeGroup).IsNil() {
+		return nodeGroup, errors.NewAutoscalerError(errors.InternalError, "picked node that doesn't belong to a node group: %s", nodes[0].Name)
+	}
+	if err = nodeGroup.DeleteNodes(nodes); err != nil {
+		return nodeGroup, errors.NewAutoscalerError(errors.CloudProviderError, "failed to delete %s: %v", nodes[0].Name, err)
+	}
+	return nodeGroup, nil
+}
+
+func nodeScaleDownReason(node *apiv1.Node, drain bool) metrics.NodeScaleDownReason {
+	readiness, err := kubernetes.GetNodeReadiness(node)
+	if err != nil {
+		klog.Errorf("Couldn't determine node %q readiness while scaling down - assuming unready: %v", node.Name, err)
+		return metrics.Unready
+	}
+	if !readiness.Ready {
+		return metrics.Unready
+	}
+	// Node is ready.
+	if drain {
+		return metrics.Underutilized
+	}
+	return metrics.Empty
+}
+
+// IsNodeBeingDeleted returns true iff a given node is being deleted.
+func IsNodeBeingDeleted(node *apiv1.Node, timestamp time.Time) bool {
+	deleteTime, _ := deletetaint.GetToBeDeletedTime(node)
+	return deleteTime != nil && (timestamp.Sub(*deleteTime) < MaxCloudProviderNodeDeletionTime || timestamp.Sub(*deleteTime) < MaxKubernetesEmptyNodeDeletionTime)
+}
diff --git a/cluster-autoscaler/core/scaledown/actuation/delete_in_batch_test.go b/cluster-autoscaler/core/scaledown/actuation/delete_in_batch_test.go
new file mode 100644
index 000000000000..03e886df2680
--- /dev/null
+++ b/cluster-autoscaler/core/scaledown/actuation/delete_in_batch_test.go
@@ -0,0 +1,216 @@
+/*
+Copyright 2022 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package actuation
+
+import (
+	"fmt"
+	"testing"
+	"time"
+
+	apiv1 "k8s.io/api/core/v1"
+	testprovider "k8s.io/autoscaler/cluster-autoscaler/cloudprovider/test"
+	"k8s.io/autoscaler/cluster-autoscaler/clusterstate"
+	clusterstate_utils "k8s.io/autoscaler/cluster-autoscaler/clusterstate/utils"
+	"k8s.io/autoscaler/cluster-autoscaler/config"
+	"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/deletiontracker"
+	. "k8s.io/autoscaler/cluster-autoscaler/core/test"
+	"k8s.io/autoscaler/cluster-autoscaler/utils/deletetaint"
+	"k8s.io/client-go/kubernetes/fake"
+	kube_record "k8s.io/client-go/tools/record"
+)
+
+func TestAddNodeToBucket(t *testing.T) {
+	provider := testprovider.NewTestCloudProvider(nil, nil)
+	ctx, err := NewScaleTestAutoscalingContext(config.AutoscalingOptions{}, nil, nil, provider, nil, nil)
+	if err != nil {
+		t.Fatalf("Couldn't set up autoscaling context: %v", err)
+	}
+	nodeGroup1 := "ng-1"
+	nodeGroup2 := "ng-2"
+	nodes1 := generateNodes(5, "ng-1")
+	nodes2 := generateNodes(5, "ng-2")
+	provider.AddNodeGroup(nodeGroup1, 1, 10, 5)
+	provider.AddNodeGroup(nodeGroup2, 1, 10, 5)
+	for _, node := range nodes1 {
+		provider.AddNode(nodeGroup1, node)
+	}
+	for _, node := range nodes2 {
+		provider.AddNode(nodeGroup2, node)
+	}
+	testcases := []struct {
+		name        string
+		nodes       []*apiv1.Node
+		wantBatches int
+		drained     bool
+	}{
+		{
+			name:        "Add 1 node",
+			nodes:       []*apiv1.Node{nodes1[0]},
+			wantBatches: 1,
+		},
+		{
+			name:        "Add nodes that belong to one nodeGroup",
+			nodes:       nodes1,
+			wantBatches: 1,
+		},
+		{
+			name:        "Add 3 nodes that belong to 2 nodeGroups",
+			nodes:       []*apiv1.Node{nodes1[0], nodes2[0], nodes2[1]},
+			wantBatches: 2,
+		},
+		{
+			name:        "Add 3 nodes that belong to 2 nodeGroups, all nodes are drained",
+			nodes:       []*apiv1.Node{nodes1[0], nodes2[0], nodes2[1]},
+			wantBatches: 2,
+			drained:     true,
+		},
+	}
+	for _, test := range testcases {
+		d := NodeBatcherDeleter{
+			ctx:                   &ctx,
+			clusterState:          nil,
+			nodeDeletionTracker:   nil,
+			deletionsPerNodeGroup: make(map[string][]*apiv1.Node),
+			drainedNodeDeletions:  make(map[string]bool),
+		}
+		batchCount := 0
+		for _, node := range test.nodes {
+			_, first, err := d.addNodeToBucket(node, test.drained)
+			if err != nil {
+				t.Errorf("addNodeToBucket returned error %q when adding node %v", err, node)
+			}
+			if first {
+				batchCount += 1
+			}
+		}
+		if batchCount != test.wantBatches {
+			t.Errorf("Want %d batches, got %d batches", test.wantBatches, batchCount)
+		}
+
+	}
+}
+
+func TestRemove(t *testing.T) {
+	testCases := []struct {
+		name           string
+		err            bool
+		numNodes       int
+		failedDeletion int
+		addNgToBucket  bool
+	}{
+		{
+			name:          "Remove NodeGroup that is not present in bucket",
+			err:           true,
+			addNgToBucket: false,
+		},
+		{
+			name:           "Regular successful remove",
+			err:            false,
+			numNodes:       5,
+			failedDeletion: 0,
+			addNgToBucket:  true,
+		},
+		{
+			name:           "Unsuccessful remove",
+			err:            true,
+			numNodes:       5,
+			failedDeletion: 1,
+			addNgToBucket:  true,
+		},
+	}
+	for _, test := range testCases {
+
+		fakeClient := &fake.Clientset{}
+		fakeLogRecorder, _ := clusterstate_utils.NewStatusMapRecorder(fakeClient, "kube-system", kube_record.NewFakeRecorder(5), false, "my-cool-configmap")
+
+		failedNodeDeletion := make(map[string]bool)
+		deletedNodes := make(chan string, 10)
+		notDeletedNodes := make(chan string, 10)
+		// Hook node deletion at the level of cloud provider, to gather which nodes were deleted, and to fail the deletion for
+		// certain nodes to simulate errors.
+		provider := testprovider.NewTestCloudProvider(nil, func(nodeGroup string, node string) error {
+			if failedNodeDeletion[node] {
+				notDeletedNodes <- node
+				return fmt.Errorf("SIMULATED ERROR: won't remove node")
+			}
+			deletedNodes <- node
+			return nil
+		})
+
+		ctx, err := NewScaleTestAutoscalingContext(config.AutoscalingOptions{}, nil, nil, provider, nil, nil)
+		clusterStateRegistry := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, fakeLogRecorder, NewBackoff())
+		if err != nil {
+			t.Fatalf("Couldn't set up autoscaling context: %v", err)
+		}
+
+		ng := "ng"
+		provider.AddNodeGroup(ng, 1, 10, test.numNodes)
+
+		d := NodeBatcherDeleter{
+			ctx:                   &ctx,
+			clusterState:          clusterStateRegistry,
+			nodeDeletionTracker:   deletiontracker.NewNodeDeletionTracker(1 * time.Minute),
+			deletionsPerNodeGroup: make(map[string][]*apiv1.Node),
+			drainedNodeDeletions:  make(map[string]bool),
+		}
+		nodes := generateNodes(test.numNodes, ng)
+		failedDeletion := test.failedDeletion
+		for _, node := range nodes {
+			if failedDeletion > 0 {
+				failedNodeDeletion[node.Name] = true
+				failedDeletion -= 1
+			}
+			provider.AddNode(ng, node)
+		}
+		if test.addNgToBucket {
+			for _, node := range nodes {
+				node.Spec.Taints = append(node.Spec.Taints, apiv1.Taint{
+					Key:    deletetaint.ToBeDeletedTaint,
+					Effect: apiv1.TaintEffectNoSchedule,
+				})
+				_, _, err := d.addNodeToBucket(node, true)
+				if err != nil {
+					t.Errorf("%s: addNodeToBucket returned error %q when adding node %v", test.name, err, node)
+				}
+			}
+		}
+
+		err = d.remove(ng)
+		if test.err {
+			if err == nil {
+				t.Errorf("%s: remove() should return an error, but returned nil", test.name)
+			}
+			return
+		}
+		if err != nil {
+			t.Errorf("%s: remove() returned an error, but it shouldn't", test.name)
+		}
+		gotDeletedNodes := []string{}
+		for i := 0; i < test.numNodes; i++ {
+			select {
+			case deletedNode := <-deletedNodes:
+				gotDeletedNodes = append(gotDeletedNodes, deletedNode)
+			case <-time.After(4 * time.Second):
+				t.Errorf("%s: Timeout while waiting for deleted nodes.", test.name)
+				return
+			}
+		}
+		if len(d.deletionsPerNodeGroup) > 0 {
+			t.Errorf("%s: Number of batches hasn't reached 0 after remove(), got: %v", test.name, len(d.deletionsPerNodeGroup))
+		}
+	}
+}
diff --git a/cluster-autoscaler/core/scaledown/legacy/legacy_test.go b/cluster-autoscaler/core/scaledown/legacy/legacy_test.go
index ad12a19005c8..3356f4cc111f 100644
--- a/cluster-autoscaler/core/scaledown/legacy/legacy_test.go
+++ b/cluster-autoscaler/core/scaledown/legacy/legacy_test.go
@@ -1404,6 +1404,6 @@ func newWrapperForTesting(ctx *context.AutoscalingContext, clusterStateRegistry
 		ndt = deletiontracker.NewNodeDeletionTracker(0 * time.Second)
 	}
 	sd := NewScaleDown(ctx, NewTestProcessors(), clusterStateRegistry, ndt)
-	actuator := actuation.NewActuator(ctx, clusterStateRegistry, ndt)
+	actuator := actuation.NewActuator(ctx, clusterStateRegistry, ndt, 0*time.Second)
 	return NewScaleDownWrapper(sd, actuator)
 }
diff --git a/cluster-autoscaler/core/static_autoscaler.go b/cluster-autoscaler/core/static_autoscaler.go
index 84b09b65739f..9c787d0d5e5c 100644
--- a/cluster-autoscaler/core/static_autoscaler.go
+++ b/cluster-autoscaler/core/static_autoscaler.go
@@ -156,7 +156,7 @@ func NewStaticAutoscaler(
 	ndt := deletiontracker.NewNodeDeletionTracker(0 * time.Second)
 	scaleDown := legacy.NewScaleDown(autoscalingContext, processors, clusterStateRegistry, ndt)
-	actuator := actuation.NewActuator(autoscalingContext, clusterStateRegistry, ndt)
+	actuator := actuation.NewActuator(autoscalingContext, clusterStateRegistry, ndt, opts.NodeDeletionBatcherInterval)
 	scaleDownWrapper := legacy.NewScaleDownWrapper(scaleDown, actuator)
 	processorCallbacks.scaleDownPlanner = scaleDownWrapper
diff --git a/cluster-autoscaler/core/static_autoscaler_test.go b/cluster-autoscaler/core/static_autoscaler_test.go
index ad0011e1063c..1774691759e6 100644
--- a/cluster-autoscaler/core/static_autoscaler_test.go
+++ b/cluster-autoscaler/core/static_autoscaler_test.go
@@ -680,6 +680,7 @@ func TestStaticAutoscalerRunOncePodsWithPriorities(t *testing.T) {
 		MaxCoresTotal:                10,
 		MaxMemoryTotal:               100000,
 		ExpendablePodsPriorityCutoff: 10,
+		NodeDeletionBatcherInterval:  0 * time.Second,
 	}
 	processorCallbacks := newStaticAutoscalerProcessorCallbacks()
@@ -1399,7 +1400,7 @@ func newScaleDownPlannerAndActuator(t *testing.T, ctx *context.AutoscalingContex
 	ctx.MaxDrainParallelism = 1
 	ndt := deletiontracker.NewNodeDeletionTracker(0 * time.Second)
 	sd := legacy.NewScaleDown(ctx, p, cs, ndt)
-	actuator := actuation.NewActuator(ctx, cs, ndt)
+	actuator := actuation.NewActuator(ctx, cs, ndt, 0*time.Second)
 	wrapper := legacy.NewScaleDownWrapper(sd, actuator)
 	return wrapper, wrapper
 }
diff --git a/cluster-autoscaler/main.go b/cluster-autoscaler/main.go
index a0489b55b862..9b508bbe0646 100644
--- a/cluster-autoscaler/main.go
+++ b/cluster-autoscaler/main.go
@@ -124,13 +124,14 @@ var (
 		"for scale down when some candidates from previous iteration are no longer valid."+
 		"When calculating the pool size for additional candidates we take"+
 		"max(#nodes * scale-down-candidates-pool-ratio, scale-down-candidates-pool-min-count).")
-	nodeDeletionDelayTimeout = flag.Duration("node-deletion-delay-timeout", 2*time.Minute, "Maximum time CA waits for removing delay-deletion.cluster-autoscaler.kubernetes.io/ annotations before deleting the node.")
-	scanInterval             = flag.Duration("scan-interval", 10*time.Second, "How often cluster is reevaluated for scale up or down")
-	maxNodesTotal            = flag.Int("max-nodes-total", 0, "Maximum number of nodes in all node groups. Cluster autoscaler will not grow the cluster beyond this number.")
-	coresTotal               = flag.String("cores-total", minMaxFlagString(0, config.DefaultMaxClusterCores), "Minimum and maximum number of cores in cluster, in the format <min>:<max>. Cluster autoscaler will not scale the cluster beyond these numbers.")
-	memoryTotal              = flag.String("memory-total", minMaxFlagString(0, config.DefaultMaxClusterMemory), "Minimum and maximum number of gigabytes of memory in cluster, in the format <min>:<max>. Cluster autoscaler will not scale the cluster beyond these numbers.")
-	gpuTotal                 = multiStringFlag("gpu-total", "Minimum and maximum number of different GPUs in cluster, in the format <gpu_type>:<min>:<max>. Cluster autoscaler will not scale the cluster beyond these numbers. Can be passed multiple times. CURRENTLY THIS FLAG ONLY WORKS ON GKE.")
-	cloudProviderFlag        = flag.String("cloud-provider", cloudBuilder.DefaultCloudProvider,
+	nodeDeletionDelayTimeout    = flag.Duration("node-deletion-delay-timeout", 2*time.Minute, "Maximum time CA waits for removing delay-deletion.cluster-autoscaler.kubernetes.io/ annotations before deleting the node.")
+	nodeDeletionBatcherInterval = flag.Duration("node-deletion-batcher-interval", 0*time.Second, "How long CA ScaleDown gathers nodes to delete them in batch.")
+	scanInterval                = flag.Duration("scan-interval", 10*time.Second, "How often cluster is reevaluated for scale up or down")
+	maxNodesTotal               = flag.Int("max-nodes-total", 0, "Maximum number of nodes in all node groups. Cluster autoscaler will not grow the cluster beyond this number.")
+	coresTotal                  = flag.String("cores-total", minMaxFlagString(0, config.DefaultMaxClusterCores), "Minimum and maximum number of cores in cluster, in the format <min>:<max>. Cluster autoscaler will not scale the cluster beyond these numbers.")
+	memoryTotal                 = flag.String("memory-total", minMaxFlagString(0, config.DefaultMaxClusterMemory), "Minimum and maximum number of gigabytes of memory in cluster, in the format <min>:<max>. Cluster autoscaler will not scale the cluster beyond these numbers.")
+	gpuTotal                    = multiStringFlag("gpu-total", "Minimum and maximum number of different GPUs in cluster, in the format <gpu_type>:<min>:<max>. Cluster autoscaler will not scale the cluster beyond these numbers. Can be passed multiple times. CURRENTLY THIS FLAG ONLY WORKS ON GKE.")
+	cloudProviderFlag           = flag.String("cloud-provider", cloudBuilder.DefaultCloudProvider,
 		"Cloud provider type. Available values: ["+strings.Join(cloudBuilder.AvailableCloudProviders, ",")+"]")
 	maxBulkSoftTaintCount = flag.Int("max-bulk-soft-taint-count", 10, "Maximum number of nodes that can be tainted/untainted PreferNoSchedule at the same time. Set to 0 to turn off such tainting.")
 	maxBulkSoftTaintTime  = flag.Duration("max-bulk-soft-taint-time", 3*time.Second, "Maximum duration of tainting/untainting nodes as PreferNoSchedule at the same time.")
@@ -293,6 +294,7 @@ func createAutoscalingOptions() config.AutoscalingOptions {
 		RecordDuplicatedEvents:         *recordDuplicatedEvents,
 		MaxNodesPerScaleUp:             *maxNodesPerScaleUp,
 		MaxNodeGroupBinpackingDuration: *maxNodeGroupBinpackingDuration,
+		NodeDeletionBatcherInterval:    *nodeDeletionBatcherInterval,
 	}
 }
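
Editor's note (illustration only, not part of the patch): the heart of the change is the per-node-group batching done by NodeBatcherDeleter.AddNode, addNodeToBucket and remove. The self-contained Go sketch below reproduces that pattern in simplified form so the control flow is easier to follow. The names (batcher, AddNode, flush), the string-based node-group keys and the deleteFn callback are illustrative assumptions, not the autoscaler's API. The first node queued for a node group arms a timer; when the interval (the new --node-deletion-batcher-interval) elapses, every node collected for that group is handed to the delete callback, which stands in for nodeGroup.DeleteNodes, in a single call. An interval of 0 keeps the previous one-delete-call-per-node behaviour.

package main

import (
	"fmt"
	"sync"
	"time"
)

// deleteFunc stands in for nodeGroup.DeleteNodes in this sketch.
type deleteFunc func(nodeGroup string, nodes []string) error

type batcher struct {
	sync.Mutex
	interval time.Duration
	buckets  map[string][]string // queued node names per node group
	deleteFn deleteFunc
}

func newBatcher(interval time.Duration, del deleteFunc) *batcher {
	return &batcher{interval: interval, buckets: map[string][]string{}, deleteFn: del}
}

// AddNode queues a node for deletion. With a zero interval the node is
// deleted immediately, mirroring the pre-batching behaviour.
func (b *batcher) AddNode(nodeGroup, node string) {
	if b.interval == 0 {
		_ = b.deleteFn(nodeGroup, []string{node})
		return
	}
	if b.addToBucket(nodeGroup, node) {
		// First node queued for this group: schedule a flush.
		go func() {
			time.Sleep(b.interval)
			b.flush(nodeGroup)
		}()
	}
}

// addToBucket reports whether the node was the first entry for its group.
func (b *batcher) addToBucket(nodeGroup, node string) bool {
	b.Lock()
	defer b.Unlock()
	first := len(b.buckets[nodeGroup]) == 0
	b.buckets[nodeGroup] = append(b.buckets[nodeGroup], node)
	return first
}

// flush deletes every node queued for the group in a single call.
func (b *batcher) flush(nodeGroup string) {
	b.Lock()
	nodes := b.buckets[nodeGroup]
	delete(b.buckets, nodeGroup)
	b.Unlock()
	if err := b.deleteFn(nodeGroup, nodes); err != nil {
		fmt.Printf("deletion of %v in %s failed: %v\n", nodes, nodeGroup, err)
	}
}

func main() {
	b := newBatcher(2*time.Second, func(ng string, nodes []string) error {
		fmt.Printf("deleting %d nodes from %s in one call: %v\n", len(nodes), ng, nodes)
		return nil
	})
	for i := 0; i < 5; i++ {
		b.AddNode("ng-1", fmt.Sprintf("node-%d", i))
	}
	time.Sleep(3 * time.Second) // give the flush goroutine time to run
}

Running the sketch prints a single "deleting 5 nodes from ng-1 in one call" line; with a zero interval the same five AddNode calls would instead produce five separate delete calls, which is the behaviour the patch replaces.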