From 0c64e0932ac50ab6c820fb268624b32e2662d96a Mon Sep 17 00:00:00 2001
From: Jacek Kaniuk
Date: Fri, 11 Jan 2019 13:58:14 +0100
Subject: [PATCH] Tainting unneeded nodes as PreferNoSchedule

---
 .../config/autoscaling_options.go          |   5 +
 cluster-autoscaler/core/scale_down.go      |  62 +++-
 cluster-autoscaler/core/scale_down_test.go | 280 ++++++++++++++++--
 cluster-autoscaler/core/static_autoscaler.go | 11 +-
 .../core/static_autoscaler_test.go         |   3 +-
 cluster-autoscaler/core/utils.go           |   2 +
 cluster-autoscaler/main.go                 |   4 +
 cluster-autoscaler/metrics/metrics.go      |   1 +
 .../utils/deletetaint/delete.go            | 114 +++++--
 .../utils/deletetaint/delete_test.go       | 152 +++++++++-
 10 files changed, 565 insertions(+), 69 deletions(-)

diff --git a/cluster-autoscaler/config/autoscaling_options.go b/cluster-autoscaler/config/autoscaling_options.go
index 61efa16c003..1a31f595b88 100644
--- a/cluster-autoscaler/config/autoscaling_options.go
+++ b/cluster-autoscaler/config/autoscaling_options.go
@@ -121,4 +121,9 @@ type AutoscalingOptions struct {
 	Regional bool
 	// Pods newer than this will not be considered as unschedulable for scale-up.
 	NewPodScaleUpDelay time.Duration
+	// MaxBulkSoftTaintCount sets the maximum number of nodes that can be (un)tainted PreferNoSchedule during a single scale-down run.
+	// A value of 0 turns off such tainting.
+	MaxBulkSoftTaintCount int
+	// MaxBulkSoftTaintTime sets the maximum duration of a single run of PreferNoSchedule tainting.
+	MaxBulkSoftTaintTime time.Duration
 }
diff --git a/cluster-autoscaler/core/scale_down.go b/cluster-autoscaler/core/scale_down.go
index 467cfa31f69..196d2e7fd68 100644
--- a/cluster-autoscaler/core/scale_down.go
+++ b/cluster-autoscaler/core/scale_down.go
@@ -76,6 +76,9 @@ type NodeDeleteStatus struct {
 	nodeDeleteResults map[string]error
 }
 
+// now returns the current time. It is a variable so that unit tests can stub it.
+var now func() time.Time = time.Now
+
 // IsDeleteInProgress returns true if a node is being deleted.
 func (n *NodeDeleteStatus) IsDeleteInProgress() bool {
 	n.Lock()
@@ -572,6 +575,50 @@ func (sd *ScaleDown) mapNodesToStatusScaleDownNodes(nodes []*apiv1.Node, nodeGro
 	return result
 }
 
+// SoftTaintUnneededNodes manages the soft (PreferNoSchedule) taints of unneeded nodes.
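+// The number of (un)taint API calls per invocation is capped by MaxBulkSoftTaintCount and the elapsed
+// time by MaxBulkSoftTaintTime; nodes skipped because of these budgets are reconsidered on the next call.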
+func (sd *ScaleDown) SoftTaintUnneededNodes(allNodes []*apiv1.Node) (errors []error) {
+	defer metrics.UpdateDurationFromStart(metrics.ScaleDownSoftTaintUnneeded, time.Now())
+	apiCallBudget := sd.context.AutoscalingOptions.MaxBulkSoftTaintCount
+	timeBudget := sd.context.AutoscalingOptions.MaxBulkSoftTaintTime
+	skippedNodes := 0
+	startTime := now()
+	for _, node := range allNodes {
+		if deletetaint.HasToBeDeletedTaint(node) {
+			// Do not consider nodes that are scheduled to be deleted
+			continue
+		}
+		alreadyTainted := deletetaint.HasDeletionCandidateTaint(node)
+		_, unneeded := sd.unneededNodes[node.Name]
+
+		// Check if expected taints match existing taints
+		if unneeded != alreadyTainted {
+			if apiCallBudget <= 0 || now().Sub(startTime) >= timeBudget {
+				skippedNodes++
+				continue
+			}
+			apiCallBudget--
+			if unneeded && !alreadyTainted {
+				err := deletetaint.MarkDeletionCandidate(node, sd.context.ClientSet)
+				if err != nil {
+					errors = append(errors, err)
+					klog.Warningf("Error adding soft taint on node %s: %v", node.Name, err)
+				}
+			}
+			if !unneeded && alreadyTainted {
+				_, err := deletetaint.CleanDeletionCandidate(node, sd.context.ClientSet)
+				if err != nil {
+					errors = append(errors, err)
+					klog.Warningf("Error removing soft taint from node %s: %v", node.Name, err)
+				}
+			}
+		}
+	}
+	if skippedNodes > 0 {
+		klog.V(4).Infof("Skipped adding/removing soft taints on %v nodes - API call or time limit exceeded", skippedNodes)
+	}
+	return
+}
+
 // TryToScaleDown tries to scale down the cluster. It returns a result inside a ScaleDownStatus indicating if any node was
 // removed and error if such occurred.
 func (sd *ScaleDown) TryToScaleDown(allNodes []*apiv1.Node, pods []*apiv1.Pod, pdbs []*policyv1.PodDisruptionBudget, currentTime time.Time) (*status.ScaleDownStatus, errors.AutoscalerError) {
@@ -1011,21 +1058,6 @@ func drainNode(node *apiv1.Node, pods []*apiv1.Pod, client kube_client.Interface
 		errors.TransientError, "Failed to drain node %s/%s: pods remaining after timeout", node.Namespace, node.Name)
 }
 
-// cleanToBeDeleted cleans ToBeDeleted taints.
-func cleanToBeDeleted(nodes []*apiv1.Node, client kube_client.Interface, recorder kube_record.EventRecorder) {
-	for _, node := range nodes {
-		cleaned, err := deletetaint.CleanToBeDeleted(node, client)
-		if err != nil {
-			klog.Warningf("Error while releasing taints on node %v: %v", node.Name, err)
-			recorder.Eventf(node, apiv1.EventTypeWarning, "ClusterAutoscalerCleanup",
-				"failed to clean toBeDeletedTaint: %v", err)
-		} else if cleaned {
-			klog.V(1).Infof("Successfully released toBeDeletedTaint on node %v", node.Name)
-			recorder.Eventf(node, apiv1.EventTypeNormal, "ClusterAutoscalerCleanup", "marking the node as schedulable")
-		}
-	}
-}
-
 // Removes the given node from cloud provider. No extra pre-deletion actions are executed on
 // the Kubernetes side.
func deleteNodeFromCloudProvider(node *apiv1.Node, cloudProvider cloudprovider.CloudProvider, diff --git a/cluster-autoscaler/core/scale_down_test.go b/cluster-autoscaler/core/scale_down_test.go index 8aceb99f4d9..da770d29856 100644 --- a/cluster-autoscaler/core/scale_down_test.go +++ b/cluster-autoscaler/core/scale_down_test.go @@ -1182,39 +1182,16 @@ func getStringFromChanImmediately(c chan string) string { } } -func TestCleanToBeDeleted(t *testing.T) { - n1 := BuildTestNode("n1", 1000, 10) - n2 := BuildTestNode("n2", 1000, 10) - n2.Spec.Taints = []apiv1.Taint{{Key: deletetaint.ToBeDeletedTaint, Value: strconv.FormatInt(time.Now().Unix()-301, 10)}} - - fakeClient := &fake.Clientset{} - fakeClient.Fake.AddReactor("get", "nodes", func(action core.Action) (bool, runtime.Object, error) { - getAction := action.(core.GetAction) - switch getAction.GetName() { - case n1.Name: - return true, n1, nil - case n2.Name: - return true, n2, nil - } - return true, nil, fmt.Errorf("Wrong node: %v", getAction.GetName()) - }) - fakeClient.Fake.AddReactor("update", "nodes", func(action core.Action) (bool, runtime.Object, error) { - update := action.(core.UpdateAction) - obj := update.GetObject().(*apiv1.Node) - switch obj.Name { - case n1.Name: - n1 = obj - case n2.Name: - n2 = obj +func getCountOfChan(c chan string) int { + count := 0 + for { + select { + case <-c: + count++ + default: + return count } - return true, obj, nil - }) - fakeRecorder := kube_util.CreateEventRecorder(fakeClient) - - cleanToBeDeleted([]*apiv1.Node{n1, n2}, fakeClient, fakeRecorder) - - assert.Equal(t, 0, len(n1.Spec.Taints)) - assert.Equal(t, 0, len(n2.Spec.Taints)) + } } func TestCalculateCoresAndMemoryTotal(t *testing.T) { @@ -1339,3 +1316,242 @@ func TestCheckScaleDownDeltaWithinLimits(t *testing.T) { } } } + +func TestSoftTaint(t *testing.T) { + updatedNodes := make(chan string, 10) + deletedNodes := make(chan string, 10) + taintedNodes := make(chan string, 10) + fakeClient := &fake.Clientset{} + + job := batchv1.Job{ + ObjectMeta: metav1.ObjectMeta{ + Name: "job", + Namespace: "default", + SelfLink: "/apivs/batch/v1/namespaces/default/jobs/job", + }, + } + n1000 := BuildTestNode("n1000", 1000, 1000) + SetNodeReadyState(n1000, true, time.Time{}) + n2000 := BuildTestNode("n2000", 2000, 1000) + SetNodeReadyState(n2000, true, time.Time{}) + + p500 := BuildTestPod("p500", 500, 0) + p700 := BuildTestPod("p700", 700, 0) + p1200 := BuildTestPod("p1200", 1200, 0) + p500.Spec.NodeName = "n2000" + p700.Spec.NodeName = "n1000" + p1200.Spec.NodeName = "n2000" + + fakeClient.Fake.AddReactor("get", "nodes", func(action core.Action) (bool, runtime.Object, error) { + getAction := action.(core.GetAction) + switch getAction.GetName() { + case n1000.Name: + return true, n1000, nil + case n2000.Name: + return true, n2000, nil + } + return true, nil, fmt.Errorf("Wrong node: %v", getAction.GetName()) + }) + fakeClient.Fake.AddReactor("update", "nodes", func(action core.Action) (bool, runtime.Object, error) { + update := action.(core.UpdateAction) + obj := update.GetObject().(*apiv1.Node) + if deletetaint.HasDeletionCandidateTaint(obj) { + taintedNodes <- obj.Name + } + updatedNodes <- obj.Name + return true, obj, nil + }) + + provider := testprovider.NewTestCloudProvider(nil, func(nodeGroup string, node string) error { + deletedNodes <- node + return nil + }) + provider.AddNodeGroup("ng1", 1, 10, 2) + provider.AddNode("ng1", n1000) + provider.AddNode("ng1", n2000) + assert.NotNil(t, provider) + + options := config.AutoscalingOptions{ + 
ScaleDownUtilizationThreshold: 0.5, + ScaleDownUnneededTime: 10 * time.Minute, + MaxGracefulTerminationSec: 60, + MaxBulkSoftTaintCount: 1, + MaxBulkSoftTaintTime: 3 * time.Second, + } + jobLister, err := kube_util.NewTestJobLister([]*batchv1.Job{&job}) + assert.NoError(t, err) + registry := kube_util.NewListerRegistry(nil, nil, nil, nil, nil, nil, nil, jobLister, nil, nil) + + context := NewScaleTestAutoscalingContext(options, fakeClient, registry, provider) + + clusterStateRegistry := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, context.LogRecorder, newBackoff()) + scaleDown := NewScaleDown(&context, clusterStateRegistry) + + // Test no superfluous nodes + scaleDown.UpdateUnneededNodes([]*apiv1.Node{n1000, n2000}, + []*apiv1.Node{n1000, n2000}, []*apiv1.Pod{p500, p700, p1200}, time.Now().Add(-5*time.Minute), nil) + errors := scaleDown.SoftTaintUnneededNodes([]*apiv1.Node{n1000, n2000}) + assert.Empty(t, errors) + assert.Equal(t, 0, getCountOfChan(deletedNodes)) + assert.Equal(t, 0, getCountOfChan(updatedNodes)) + assert.Equal(t, 0, getCountOfChan(taintedNodes)) + assert.False(t, deletetaint.HasDeletionCandidateTaint(n1000)) + assert.False(t, deletetaint.HasDeletionCandidateTaint(n2000)) + + // Test one unneeded node + scaleDown.UpdateUnneededNodes([]*apiv1.Node{n1000, n2000}, + []*apiv1.Node{n1000, n2000}, []*apiv1.Pod{p500, p1200}, time.Now().Add(-5*time.Minute), nil) + errors = scaleDown.SoftTaintUnneededNodes([]*apiv1.Node{n1000, n2000}) + assert.Empty(t, errors) + assert.Equal(t, 0, getCountOfChan(deletedNodes)) + assert.Equal(t, n1000.Name, getStringFromChanImmediately(updatedNodes)) + assert.Equal(t, n1000.Name, getStringFromChanImmediately(taintedNodes)) + assert.True(t, deletetaint.HasDeletionCandidateTaint(n1000)) + assert.False(t, deletetaint.HasDeletionCandidateTaint(n2000)) + + // Test remove soft taint + scaleDown.UpdateUnneededNodes([]*apiv1.Node{n1000, n2000}, + []*apiv1.Node{n1000, n2000}, []*apiv1.Pod{p500, p700, p1200}, time.Now().Add(-5*time.Minute), nil) + errors = scaleDown.SoftTaintUnneededNodes([]*apiv1.Node{n1000, n2000}) + assert.Empty(t, errors) + assert.Equal(t, n1000.Name, getStringFromChanImmediately(updatedNodes)) + assert.Equal(t, 0, getCountOfChan(taintedNodes)) + assert.False(t, deletetaint.HasDeletionCandidateTaint(n1000)) + assert.False(t, deletetaint.HasDeletionCandidateTaint(n2000)) + + // Test bulk update taint limit + scaleDown.UpdateUnneededNodes([]*apiv1.Node{n1000, n2000}, + []*apiv1.Node{n1000, n2000}, []*apiv1.Pod{}, time.Now().Add(-5*time.Minute), nil) + errors = scaleDown.SoftTaintUnneededNodes([]*apiv1.Node{n1000, n2000}) + assert.Empty(t, errors) + assert.Equal(t, 1, getCountOfChan(updatedNodes)) + assert.Equal(t, 1, getCountOfChan(taintedNodes)) + errors = scaleDown.SoftTaintUnneededNodes([]*apiv1.Node{n1000, n2000}) + assert.Empty(t, errors) + assert.Equal(t, 1, getCountOfChan(updatedNodes)) + assert.Equal(t, 1, getCountOfChan(taintedNodes)) + assert.True(t, deletetaint.HasDeletionCandidateTaint(n1000)) + assert.True(t, deletetaint.HasDeletionCandidateTaint(n2000)) + errors = scaleDown.SoftTaintUnneededNodes([]*apiv1.Node{n1000, n2000}) + assert.Empty(t, errors) + assert.Equal(t, 0, getCountOfChan(updatedNodes)) + assert.Equal(t, 0, getCountOfChan(taintedNodes)) + + // Test bulk update untaint limit + scaleDown.UpdateUnneededNodes([]*apiv1.Node{n1000, n2000}, + []*apiv1.Node{n1000, n2000}, []*apiv1.Pod{p500, p700, p1200}, time.Now().Add(-5*time.Minute), nil) + errors = 
scaleDown.SoftTaintUnneededNodes([]*apiv1.Node{n1000, n2000}) + assert.Empty(t, errors) + assert.Equal(t, 1, getCountOfChan(updatedNodes)) + assert.Equal(t, 0, getCountOfChan(taintedNodes)) + errors = scaleDown.SoftTaintUnneededNodes([]*apiv1.Node{n1000, n2000}) + assert.Empty(t, errors) + assert.Equal(t, 1, getCountOfChan(updatedNodes)) + assert.Equal(t, 0, getCountOfChan(taintedNodes)) + assert.False(t, deletetaint.HasDeletionCandidateTaint(n1000)) + assert.False(t, deletetaint.HasDeletionCandidateTaint(n2000)) + errors = scaleDown.SoftTaintUnneededNodes([]*apiv1.Node{n1000, n2000}) + assert.Empty(t, errors) + assert.Equal(t, 0, getCountOfChan(updatedNodes)) + assert.Equal(t, 0, getCountOfChan(taintedNodes)) +} + +func TestSoftTaintTimeLimit(t *testing.T) { + updatedNodes := make(chan string, 10) + fakeClient := &fake.Clientset{} + + job := batchv1.Job{ + ObjectMeta: metav1.ObjectMeta{ + Name: "job", + Namespace: "default", + SelfLink: "/apivs/batch/v1/namespaces/default/jobs/job", + }, + } + n1 := BuildTestNode("n1", 1000, 1000) + SetNodeReadyState(n1, true, time.Time{}) + n2 := BuildTestNode("n2", 1000, 1000) + SetNodeReadyState(n2, true, time.Time{}) + + p1 := BuildTestPod("p1", 1000, 0) + p2 := BuildTestPod("p2", 1000, 0) + p1.Spec.NodeName = "n1" + p2.Spec.NodeName = "n2" + + currentTime := time.Now() + updateTime := time.Millisecond + maxSoftTaintDuration := 1 * time.Second + now = func() time.Time { + return currentTime + } + + fakeClient.Fake.AddReactor("get", "nodes", func(action core.Action) (bool, runtime.Object, error) { + getAction := action.(core.GetAction) + switch getAction.GetName() { + case n1.Name: + return true, n1, nil + case n2.Name: + return true, n2, nil + } + return true, nil, fmt.Errorf("Wrong node: %v", getAction.GetName()) + }) + fakeClient.Fake.AddReactor("update", "nodes", func(action core.Action) (bool, runtime.Object, error) { + currentTime = currentTime.Add(updateTime) + update := action.(core.UpdateAction) + obj := update.GetObject().(*apiv1.Node) + updatedNodes <- obj.Name + return true, obj, nil + }) + + provider := testprovider.NewTestCloudProvider(nil, func(nodeGroup string, node string) error { + return nil + }) + provider.AddNodeGroup("ng1", 1, 10, 2) + provider.AddNode("ng1", n1) + provider.AddNode("ng1", n2) + assert.NotNil(t, provider) + + options := config.AutoscalingOptions{ + ScaleDownUtilizationThreshold: 0.5, + ScaleDownUnneededTime: 10 * time.Minute, + MaxGracefulTerminationSec: 60, + MaxBulkSoftTaintCount: 10, + MaxBulkSoftTaintTime: maxSoftTaintDuration, + } + jobLister, err := kube_util.NewTestJobLister([]*batchv1.Job{&job}) + assert.NoError(t, err) + registry := kube_util.NewListerRegistry(nil, nil, nil, nil, nil, nil, nil, jobLister, nil, nil) + + context := NewScaleTestAutoscalingContext(options, fakeClient, registry, provider) + + clusterStateRegistry := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, context.LogRecorder, newBackoff()) + scaleDown := NewScaleDown(&context, clusterStateRegistry) + + // Test bulk taint + scaleDown.UpdateUnneededNodes([]*apiv1.Node{n1, n2}, + []*apiv1.Node{n1, n2}, []*apiv1.Pod{}, time.Now().Add(-5*time.Minute), nil) + errors := scaleDown.SoftTaintUnneededNodes([]*apiv1.Node{n1, n2}) + assert.Empty(t, errors) + assert.Equal(t, 2, getCountOfChan(updatedNodes)) + assert.True(t, deletetaint.HasDeletionCandidateTaint(n1)) + assert.True(t, deletetaint.HasDeletionCandidateTaint(n2)) + + // Test bulk untaint + scaleDown.UpdateUnneededNodes([]*apiv1.Node{n1, n2}, + 
[]*apiv1.Node{n1, n2}, []*apiv1.Pod{p1, p2}, time.Now().Add(-5*time.Minute), nil) + errors = scaleDown.SoftTaintUnneededNodes([]*apiv1.Node{n1, n2}) + assert.Empty(t, errors) + assert.Equal(t, 2, getCountOfChan(updatedNodes)) + assert.False(t, deletetaint.HasDeletionCandidateTaint(n1)) + assert.False(t, deletetaint.HasDeletionCandidateTaint(n2)) + + // Test duration limit of bulk taint + updateTime = maxSoftTaintDuration + scaleDown.UpdateUnneededNodes([]*apiv1.Node{n1, n2}, + []*apiv1.Node{n1, n2}, []*apiv1.Pod{}, time.Now().Add(-5*time.Minute), nil) + errors = scaleDown.SoftTaintUnneededNodes([]*apiv1.Node{n1, n2}) + assert.Empty(t, errors) + assert.Equal(t, 1, getCountOfChan(updatedNodes)) + + // Clean up + now = time.Now +} diff --git a/cluster-autoscaler/core/static_autoscaler.go b/cluster-autoscaler/core/static_autoscaler.go index 73a32390ca5..c1e85f45c75 100644 --- a/cluster-autoscaler/core/static_autoscaler.go +++ b/cluster-autoscaler/core/static_autoscaler.go @@ -34,6 +34,7 @@ import ( "k8s.io/autoscaler/cluster-autoscaler/processors/status" "k8s.io/autoscaler/cluster-autoscaler/simulator" "k8s.io/autoscaler/cluster-autoscaler/utils/backoff" + "k8s.io/autoscaler/cluster-autoscaler/utils/deletetaint" "k8s.io/autoscaler/cluster-autoscaler/utils/errors" "k8s.io/autoscaler/cluster-autoscaler/utils/gpu" "k8s.io/autoscaler/cluster-autoscaler/utils/tpu" @@ -112,7 +113,11 @@ func (a *StaticAutoscaler) cleanUpIfRequired() { if readyNodes, err := a.ReadyNodeLister().List(); err != nil { klog.Errorf("Failed to list ready nodes, not cleaning up taints: %v", err) } else { - cleanToBeDeleted(readyNodes, a.AutoscalingContext.ClientSet, a.Recorder) + deletetaint.CleanAllToBeDeleted(readyNodes, a.AutoscalingContext.ClientSet, a.Recorder) + if a.AutoscalingContext.AutoscalingOptions.MaxBulkSoftTaintCount == 0 { + // Clean old taints if soft taints handling is disabled + deletetaint.CleanAllDeletionCandidates(readyNodes, a.AutoscalingContext.ClientSet, a.Recorder) + } } a.initialized = true } @@ -379,6 +384,10 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) errors.AutoscalerError } else { klog.V(4).Infof("Starting scale down") + if a.AutoscalingContext.AutoscalingOptions.MaxBulkSoftTaintCount != 0 { + scaleDown.SoftTaintUnneededNodes(allNodes) + } + // We want to delete unneeded Node Groups only if there was no recent scale up, // and there is no current delete in progress and there was no recent errors. a.processors.NodeGroupManager.RemoveUnneededNodeGroups(autoscalingContext) diff --git a/cluster-autoscaler/core/static_autoscaler_test.go b/cluster-autoscaler/core/static_autoscaler_test.go index 7fdee66a8ae..dbcaaca3fc1 100644 --- a/cluster-autoscaler/core/static_autoscaler_test.go +++ b/cluster-autoscaler/core/static_autoscaler_test.go @@ -17,12 +17,13 @@ limitations under the License. 
package core import ( - "k8s.io/autoscaler/cluster-autoscaler/cloudprovider" "reflect" "strings" "testing" "time" + "k8s.io/autoscaler/cluster-autoscaler/cloudprovider" + mockprovider "k8s.io/autoscaler/cluster-autoscaler/cloudprovider/mocks" testprovider "k8s.io/autoscaler/cluster-autoscaler/cloudprovider/test" "k8s.io/autoscaler/cluster-autoscaler/clusterstate" diff --git a/cluster-autoscaler/core/utils.go b/cluster-autoscaler/core/utils.go index 2bdf19140d5..599ac23e4fa 100644 --- a/cluster-autoscaler/core/utils.go +++ b/cluster-autoscaler/core/utils.go @@ -380,6 +380,8 @@ func sanitizeTemplateNode(node *apiv1.Node, nodeGroup string) (*apiv1.Node, erro klog.V(4).Infof("Removing rescheduler taint when creating template from node %s", node.Name) case deletetaint.ToBeDeletedTaint: klog.V(4).Infof("Removing autoscaler taint when creating template from node %s", node.Name) + case deletetaint.DeletionCandidateTaint: + klog.V(4).Infof("Removing autoscaler soft taint when creating template from node %s", node.Name) default: newTaints = append(newTaints, taint) } diff --git a/cluster-autoscaler/main.go b/cluster-autoscaler/main.go index 7332761fd3a..7f396e0dbfc 100644 --- a/cluster-autoscaler/main.go +++ b/cluster-autoscaler/main.go @@ -118,6 +118,8 @@ var ( gpuTotal = multiStringFlag("gpu-total", "Minimum and maximum number of different GPUs in cluster, in the format ::. Cluster autoscaler will not scale the cluster beyond these numbers. Can be passed multiple times. CURRENTLY THIS FLAG ONLY WORKS ON GKE.") cloudProviderFlag = flag.String("cloud-provider", cloudBuilder.DefaultCloudProvider, "Cloud provider type. Available values: ["+strings.Join(cloudBuilder.AvailableCloudProviders, ",")+"]") + maxBulkSoftTaintCount = flag.Int("max-bulk-soft-taint-count", 10, "Maximum number of nodes that can be tainted/untainted PreferNoSchedule at the same time. Set to 0 to turn off such tainting.") + maxBulkSoftTaintTime = flag.Duration("max-bulk-soft-taint-time", 3*time.Second, "Maximum duration of tainting/untainting nodes as PreferNoSchedule at the same time.") maxEmptyBulkDeleteFlag = flag.Int("max-empty-bulk-delete", 10, "Maximum number of empty nodes that can be deleted at the same time.") maxGracefulTerminationFlag = flag.Int("max-graceful-termination-sec", 10*60, "Maximum number of seconds CA waits for pod termination when trying to scale down a node.") maxTotalUnreadyPercentage = flag.Float64("max-total-unready-percentage", 45, "Maximum percentage of unready nodes in the cluster. 
After this is exceeded, CA halts operations") @@ -186,6 +188,8 @@ func createAutoscalingOptions() config.AutoscalingOptions { ExpanderName: *expanderFlag, IgnoreDaemonSetsUtilization: *ignoreDaemonSetsUtilization, IgnoreMirrorPodsUtilization: *ignoreMirrorPodsUtilization, + MaxBulkSoftTaintCount: *maxBulkSoftTaintCount, + MaxBulkSoftTaintTime: *maxBulkSoftTaintTime, MaxEmptyBulkDelete: *maxEmptyBulkDeleteFlag, MaxGracefulTerminationSec: *maxGracefulTerminationFlag, MaxNodeProvisionTime: *maxNodeProvisionTime, diff --git a/cluster-autoscaler/metrics/metrics.go b/cluster-autoscaler/metrics/metrics.go index 77853f9f527..d9860592883 100644 --- a/cluster-autoscaler/metrics/metrics.go +++ b/cluster-autoscaler/metrics/metrics.go @@ -79,6 +79,7 @@ const ( ScaleDownNodeDeletion FunctionLabel = "scaleDown:nodeDeletion" ScaleDownFindNodesToRemove FunctionLabel = "scaleDown:findNodesToRemove" ScaleDownMiscOperations FunctionLabel = "scaleDown:miscOperations" + ScaleDownSoftTaintUnneeded FunctionLabel = "scaleDown:softTaintUnneeded" ScaleUp FunctionLabel = "scaleUp" FindUnneeded FunctionLabel = "findUnneeded" UpdateState FunctionLabel = "updateClusterState" diff --git a/cluster-autoscaler/utils/deletetaint/delete.go b/cluster-autoscaler/utils/deletetaint/delete.go index 3fc87f11f03..4ef2b4dc632 100644 --- a/cluster-autoscaler/utils/deletetaint/delete.go +++ b/cluster-autoscaler/utils/deletetaint/delete.go @@ -25,6 +25,7 @@ import ( "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" kube_client "k8s.io/client-go/kubernetes" + kube_record "k8s.io/client-go/tools/record" "k8s.io/klog" ) @@ -32,24 +33,47 @@ import ( const ( // ToBeDeletedTaint is a taint used to make the node unschedulable. ToBeDeletedTaint = "ToBeDeletedByClusterAutoscaler" + // DeletionCandidateTaint is a taint used to mark unneeded node as preferably unschedulable. + DeletionCandidateTaint = "DeletionCandidateOfClusterAutoscaler" maxRetryDeadline = 5 * time.Second conflictRetryInterval = 750 * time.Millisecond ) +// getKeyShortName converts taint key to short name for logging +func getKeyShortName(key string) string { + switch key { + case ToBeDeletedTaint: + return "ToBeDeletedTaint" + case DeletionCandidateTaint: + return "DeletionCandidateTaint" + default: + return key + } +} + // MarkToBeDeleted sets a taint that makes the node unschedulable. func MarkToBeDeleted(node *apiv1.Node, client kube_client.Interface) error { + return addTaint(node, client, ToBeDeletedTaint, apiv1.TaintEffectNoSchedule) +} + +// MarkDeletionCandidate sets a soft taint that makes the node preferably unschedulable. +func MarkDeletionCandidate(node *apiv1.Node, client kube_client.Interface) error { + return addTaint(node, client, DeletionCandidateTaint, apiv1.TaintEffectPreferNoSchedule) +} + +func addTaint(node *apiv1.Node, client kube_client.Interface, taintKey string, effect apiv1.TaintEffect) error { retryDeadline := time.Now().Add(maxRetryDeadline) for { // Get the newest version of the node. 
 		freshNode, err := client.CoreV1().Nodes().Get(node.Name, metav1.GetOptions{})
 		if err != nil || freshNode == nil {
+			klog.Warningf("Error while adding %v taint on node %v: %v", getKeyShortName(taintKey), node.Name, err)
 			return fmt.Errorf("failed to get node %v: %v", node.Name, err)
 		}
-		added, err := addToBeDeletedTaint(freshNode)
-		if added == false {
-			return err
+		if !addTaintToSpec(freshNode, taintKey, effect) {
+			return nil
 		}
 		_, err = client.CoreV1().Nodes().Update(freshNode)
 		if err != nil && errors.IsConflict(err) && time.Now().Before(retryDeadline) {
@@ -58,33 +82,42 @@ func MarkToBeDeleted(node *apiv1.Node, client kube_client.Interface) error {
 		}
 
 		if err != nil {
-			klog.Warningf("Error while adding taints on node %v: %v", node.Name, err)
+			klog.Warningf("Error while adding %v taint on node %v: %v", getKeyShortName(taintKey), node.Name, err)
 			return err
 		}
-		klog.V(1).Infof("Successfully added toBeDeletedTaint on node %v", node.Name)
+		klog.V(1).Infof("Successfully added %v on node %v", getKeyShortName(taintKey), node.Name)
 		return nil
 	}
 }
 
-func addToBeDeletedTaint(node *apiv1.Node) (bool, error) {
+func addTaintToSpec(node *apiv1.Node, taintKey string, effect apiv1.TaintEffect) bool {
 	for _, taint := range node.Spec.Taints {
-		if taint.Key == ToBeDeletedTaint {
-			klog.V(2).Infof("ToBeDeletedTaint already present on node %v, taint: %v", node.Name, taint)
-			return false, nil
+		if taint.Key == taintKey {
+			klog.V(2).Infof("%v already present on node %v, taint: %v", taintKey, node.Name, taint)
+			return false
 		}
 	}
 	node.Spec.Taints = append(node.Spec.Taints, apiv1.Taint{
-		Key:    ToBeDeletedTaint,
+		Key:    taintKey,
 		Value:  fmt.Sprint(time.Now().Unix()),
-		Effect: apiv1.TaintEffectNoSchedule,
+		Effect: effect,
 	})
-	return true, nil
+	return true
 }
 
 // HasToBeDeletedTaint returns true if ToBeDeleted taint is applied on the node.
 func HasToBeDeletedTaint(node *apiv1.Node) bool {
+	return hasTaint(node, ToBeDeletedTaint)
+}
+
+// HasDeletionCandidateTaint returns true if DeletionCandidate taint is applied on the node.
+func HasDeletionCandidateTaint(node *apiv1.Node) bool {
+	return hasTaint(node, DeletionCandidateTaint)
+}
+
+func hasTaint(node *apiv1.Node, taintKey string) bool {
 	for _, taint := range node.Spec.Taints {
-		if taint.Key == ToBeDeletedTaint {
+		if taint.Key == taintKey {
 			return true
 		}
 	}
@@ -93,8 +126,17 @@ func HasToBeDeletedTaint(node *apiv1.Node) bool {
 // GetToBeDeletedTime returns the date when the node was marked by CA as for delete.
 func GetToBeDeletedTime(node *apiv1.Node) (*time.Time, error) {
+	return getTaintTime(node, ToBeDeletedTaint)
+}
+
+// GetDeletionCandidateTime returns the time when the node was marked by CA as a deletion candidate.
+func GetDeletionCandidateTime(node *apiv1.Node) (*time.Time, error) {
+	return getTaintTime(node, DeletionCandidateTaint)
+}
+
+func getTaintTime(node *apiv1.Node, taintKey string) (*time.Time, error) {
 	for _, taint := range node.Spec.Taints {
-		if taint.Key == ToBeDeletedTaint {
+		if taint.Key == taintKey {
 			resultTimestamp, err := strconv.ParseInt(taint.Value, 10, 64)
 			if err != nil {
 				return nil, err
@@ -106,17 +148,27 @@ func GetToBeDeletedTime(node *apiv1.Node) (*time.Time, error) {
 	return nil, nil
 }
 
-// CleanToBeDeleted cleans ToBeDeleted taint.
+// CleanToBeDeleted cleans CA's NoSchedule taint from a node.
 func CleanToBeDeleted(node *apiv1.Node, client kube_client.Interface) (bool, error) {
+	return cleanTaint(node, client, ToBeDeletedTaint)
+}
+
+// CleanDeletionCandidate cleans CA's soft (PreferNoSchedule) taint from a node.
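+// It returns true only if the taint was present on the node and was successfully removed.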
+func CleanDeletionCandidate(node *apiv1.Node, client kube_client.Interface) (bool, error) { + return cleanTaint(node, client, DeletionCandidateTaint) +} + +func cleanTaint(node *apiv1.Node, client kube_client.Interface, taintKey string) (bool, error) { retryDeadline := time.Now().Add(maxRetryDeadline) for { freshNode, err := client.CoreV1().Nodes().Get(node.Name, metav1.GetOptions{}) if err != nil || freshNode == nil { + klog.Warningf("Error while releasing %v taint on node %v: %v", getKeyShortName(taintKey), node.Name, err) return false, fmt.Errorf("failed to get node %v: %v", node.Name, err) } newTaints := make([]apiv1.Taint, 0) for _, taint := range freshNode.Spec.Taints { - if taint.Key == ToBeDeletedTaint { + if taint.Key == taintKey { klog.V(1).Infof("Releasing taint %+v on node %v", taint, node.Name) } else { newTaints = append(newTaints, taint) @@ -133,12 +185,38 @@ func CleanToBeDeleted(node *apiv1.Node, client kube_client.Interface) (bool, err } if err != nil { - klog.Warningf("Error while releasing taints on node %v: %v", node.Name, err) + klog.Warningf("Error while releasing %v taint on node %v: %v", getKeyShortName(taintKey), node.Name, err) return false, err } - klog.V(1).Infof("Successfully released toBeDeletedTaint on node %v", node.Name) + klog.V(1).Infof("Successfully released %v on node %v", getKeyShortName(taintKey), node.Name) return true, nil } return false, nil } } + +// CleanAllToBeDeleted cleans ToBeDeleted taints from given nodes. +func CleanAllToBeDeleted(nodes []*apiv1.Node, client kube_client.Interface, recorder kube_record.EventRecorder) { + cleanAllTaints(nodes, client, recorder, ToBeDeletedTaint) +} + +// CleanAllDeletionCandidates cleans DeletionCandidate taints from given nodes. +func CleanAllDeletionCandidates(nodes []*apiv1.Node, client kube_client.Interface, recorder kube_record.EventRecorder) { + cleanAllTaints(nodes, client, recorder, DeletionCandidateTaint) +} + +func cleanAllTaints(nodes []*apiv1.Node, client kube_client.Interface, recorder kube_record.EventRecorder, taintKey string) { + for _, node := range nodes { + if !hasTaint(node, taintKey) { + continue + } + cleaned, err := cleanTaint(node, client, taintKey) + if err != nil { + recorder.Eventf(node, apiv1.EventTypeWarning, "ClusterAutoscalerCleanup", + "failed to clean %v on node %v: %v", getKeyShortName(taintKey), node.Name, err) + } else if cleaned { + recorder.Eventf(node, apiv1.EventTypeNormal, "ClusterAutoscalerCleanup", + "removed %v taint from node %v", getKeyShortName(taintKey), node.Name) + } + } +} diff --git a/cluster-autoscaler/utils/deletetaint/delete_test.go b/cluster-autoscaler/utils/deletetaint/delete_test.go index 8763d71de9b..440e2088e7d 100644 --- a/cluster-autoscaler/utils/deletetaint/delete_test.go +++ b/cluster-autoscaler/utils/deletetaint/delete_test.go @@ -18,6 +18,7 @@ package deletetaint import ( "fmt" + "strconv" "sync/atomic" "testing" "time" @@ -28,6 +29,7 @@ import ( "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" + kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes" "k8s.io/client-go/kubernetes/fake" core "k8s.io/client-go/testing" @@ -43,9 +45,44 @@ func TestMarkNodes(t *testing.T) { updatedNode, err := fakeClient.Core().Nodes().Get("node", metav1.GetOptions{}) assert.NoError(t, err) assert.True(t, HasToBeDeletedTaint(updatedNode)) + assert.False(t, HasDeletionCandidateTaint(updatedNode)) +} + +func TestSoftMarkNodes(t *testing.T) { + node := BuildTestNode("node", 1000, 1000) + 
fakeClient := buildFakeClient(t, node) + err := MarkDeletionCandidate(node, fakeClient) + assert.NoError(t, err) + + updatedNode, err := fakeClient.Core().Nodes().Get("node", metav1.GetOptions{}) + assert.NoError(t, err) + assert.False(t, HasToBeDeletedTaint(updatedNode)) + assert.True(t, HasDeletionCandidateTaint(updatedNode)) } func TestCheckNodes(t *testing.T) { + node := BuildTestNode("node", 1000, 1000) + addTaintToSpec(node, ToBeDeletedTaint, apiv1.TaintEffectNoSchedule) + fakeClient := buildFakeClient(t, node) + + updatedNode, err := fakeClient.Core().Nodes().Get("node", metav1.GetOptions{}) + assert.NoError(t, err) + assert.True(t, HasToBeDeletedTaint(updatedNode)) + assert.False(t, HasDeletionCandidateTaint(updatedNode)) +} + +func TestSoftCheckNodes(t *testing.T) { + node := BuildTestNode("node", 1000, 1000) + addTaintToSpec(node, DeletionCandidateTaint, apiv1.TaintEffectPreferNoSchedule) + fakeClient := buildFakeClient(t, node) + + updatedNode, err := fakeClient.Core().Nodes().Get("node", metav1.GetOptions{}) + assert.NoError(t, err) + assert.False(t, HasToBeDeletedTaint(updatedNode)) + assert.True(t, HasDeletionCandidateTaint(updatedNode)) +} + +func TestQueryNodes(t *testing.T) { node := BuildTestNode("node", 1000, 1000) fakeClient := buildFakeClient(t, node) err := MarkToBeDeleted(node, fakeClient) @@ -60,20 +97,131 @@ func TestCheckNodes(t *testing.T) { assert.True(t, time.Now().Sub(*val) < 10*time.Second) } +func TestSoftQueryNodes(t *testing.T) { + node := BuildTestNode("node", 1000, 1000) + fakeClient := buildFakeClient(t, node) + err := MarkDeletionCandidate(node, fakeClient) + assert.NoError(t, err) + + updatedNode, err := fakeClient.Core().Nodes().Get("node", metav1.GetOptions{}) + assert.NoError(t, err) + assert.True(t, HasDeletionCandidateTaint(updatedNode)) + + val, err := GetDeletionCandidateTime(updatedNode) + assert.NoError(t, err) + assert.True(t, time.Now().Sub(*val) < 10*time.Second) +} + func TestCleanNodes(t *testing.T) { node := BuildTestNode("node", 1000, 1000) - addToBeDeletedTaint(node) + addTaintToSpec(node, ToBeDeletedTaint, apiv1.TaintEffectNoSchedule) fakeClient := buildFakeClient(t, node) + updatedNode, err := fakeClient.Core().Nodes().Get("node", metav1.GetOptions{}) + assert.NoError(t, err) + assert.True(t, HasToBeDeletedTaint(updatedNode)) + cleaned, err := CleanToBeDeleted(node, fakeClient) assert.True(t, cleaned) assert.NoError(t, err) - updatedNode, err := fakeClient.Core().Nodes().Get("node", metav1.GetOptions{}) + updatedNode, err = fakeClient.Core().Nodes().Get("node", metav1.GetOptions{}) assert.NoError(t, err) assert.False(t, HasToBeDeletedTaint(updatedNode)) } +func TestSoftCleanNodes(t *testing.T) { + node := BuildTestNode("node", 1000, 1000) + addTaintToSpec(node, DeletionCandidateTaint, apiv1.TaintEffectPreferNoSchedule) + fakeClient := buildFakeClient(t, node) + + updatedNode, err := fakeClient.Core().Nodes().Get("node", metav1.GetOptions{}) + assert.NoError(t, err) + assert.True(t, HasDeletionCandidateTaint(updatedNode)) + + cleaned, err := CleanDeletionCandidate(node, fakeClient) + assert.True(t, cleaned) + assert.NoError(t, err) + + updatedNode, err = fakeClient.Core().Nodes().Get("node", metav1.GetOptions{}) + assert.NoError(t, err) + assert.False(t, HasDeletionCandidateTaint(updatedNode)) +} + +func TestCleanAllToBeDeleted(t *testing.T) { + n1 := BuildTestNode("n1", 1000, 10) + n2 := BuildTestNode("n2", 1000, 10) + n2.Spec.Taints = []apiv1.Taint{{Key: ToBeDeletedTaint, Value: strconv.FormatInt(time.Now().Unix()-301, 10)}} + + 
fakeClient := &fake.Clientset{} + fakeClient.Fake.AddReactor("get", "nodes", func(action core.Action) (bool, runtime.Object, error) { + getAction := action.(core.GetAction) + switch getAction.GetName() { + case n1.Name: + return true, n1, nil + case n2.Name: + return true, n2, nil + } + return true, nil, fmt.Errorf("Wrong node: %v", getAction.GetName()) + }) + fakeClient.Fake.AddReactor("update", "nodes", func(action core.Action) (bool, runtime.Object, error) { + update := action.(core.UpdateAction) + obj := update.GetObject().(*apiv1.Node) + switch obj.Name { + case n1.Name: + n1 = obj + case n2.Name: + n2 = obj + } + return true, obj, nil + }) + fakeRecorder := kube_util.CreateEventRecorder(fakeClient) + + assert.Equal(t, 1, len(n2.Spec.Taints)) + + CleanAllToBeDeleted([]*apiv1.Node{n1, n2}, fakeClient, fakeRecorder) + + assert.Equal(t, 0, len(n1.Spec.Taints)) + assert.Equal(t, 0, len(n2.Spec.Taints)) +} + +func TestCleanAllDeletionCandidates(t *testing.T) { + n1 := BuildTestNode("n1", 1000, 10) + n2 := BuildTestNode("n2", 1000, 10) + n2.Spec.Taints = []apiv1.Taint{{Key: DeletionCandidateTaint, Value: strconv.FormatInt(time.Now().Unix()-301, 10)}} + + fakeClient := &fake.Clientset{} + fakeClient.Fake.AddReactor("get", "nodes", func(action core.Action) (bool, runtime.Object, error) { + getAction := action.(core.GetAction) + switch getAction.GetName() { + case n1.Name: + return true, n1, nil + case n2.Name: + return true, n2, nil + } + return true, nil, fmt.Errorf("Wrong node: %v", getAction.GetName()) + }) + fakeClient.Fake.AddReactor("update", "nodes", func(action core.Action) (bool, runtime.Object, error) { + update := action.(core.UpdateAction) + obj := update.GetObject().(*apiv1.Node) + switch obj.Name { + case n1.Name: + n1 = obj + case n2.Name: + n2 = obj + } + return true, obj, nil + }) + fakeRecorder := kube_util.CreateEventRecorder(fakeClient) + + assert.Equal(t, 1, len(n2.Spec.Taints)) + + CleanAllDeletionCandidates([]*apiv1.Node{n1, n2}, fakeClient, fakeRecorder) + + assert.Equal(t, 0, len(n1.Spec.Taints)) + assert.Equal(t, 0, len(n2.Spec.Taints)) +} + func buildFakeClient(t *testing.T, node *apiv1.Node) *fake.Clientset { fakeClient := fake.NewSimpleClientset()