diff --git a/cluster-autoscaler/core/scale_down_test.go b/cluster-autoscaler/core/scale_down_test.go index da770d29856..4496fe28391 100644 --- a/cluster-autoscaler/core/scale_down_test.go +++ b/cluster-autoscaler/core/scale_down_test.go @@ -36,6 +36,7 @@ import ( scheduler_util "k8s.io/autoscaler/cluster-autoscaler/utils/scheduler" . "k8s.io/autoscaler/cluster-autoscaler/utils/test" "k8s.io/autoscaler/cluster-autoscaler/utils/units" + kube_client "k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes/fake" core "k8s.io/client-go/testing" @@ -497,8 +498,9 @@ func TestDeleteNode(t *testing.T) { // set up fake client fakeClient := &fake.Clientset{} + fakeNode := n1.DeepCopy() fakeClient.Fake.AddReactor("get", "nodes", func(action core.Action) (bool, runtime.Object, error) { - return true, n1, nil + return true, fakeNode.DeepCopy(), nil }) fakeClient.Fake.AddReactor("update", "nodes", func(action core.Action) (bool, runtime.Object, error) { @@ -509,6 +511,7 @@ func TestDeleteNode(t *testing.T) { taints = append(taints, taint.Key) } updatedNodes <- fmt.Sprintf("%s-%s", obj.Name, taints) + fakeNode = obj.DeepCopy() return true, obj, nil }) fakeClient.Fake.AddReactor("create", "pods", @@ -541,7 +544,7 @@ func TestDeleteNode(t *testing.T) { // verify if scenario.expectedDeletion { assert.NoError(t, err) - assert.Equal(t, n1.Name, getStringFromChan(deletedNodes)) + assert.Equal(t, n1.Name, getStringFromChanImmediately(deletedNodes)) } else { assert.NotNil(t, err) } @@ -551,7 +554,7 @@ func TestDeleteNode(t *testing.T) { assert.Equal(t, taintedUpdate, getStringFromChan(updatedNodes)) if !scenario.expectedDeletion { untaintedUpdate := fmt.Sprintf("%s-%s", n1.Name, []string{}) - assert.Equal(t, untaintedUpdate, getStringFromChan(updatedNodes)) + assert.Equal(t, untaintedUpdate, getStringFromChanImmediately(updatedNodes)) } assert.Equal(t, nothingReturned, getStringFromChanImmediately(updatedNodes)) }) @@ -1317,12 +1320,78 @@ func TestCheckScaleDownDeltaWithinLimits(t *testing.T) { } } -func TestSoftTaint(t *testing.T) { - updatedNodes := make(chan string, 10) - deletedNodes := make(chan string, 10) - taintedNodes := make(chan string, 10) +// newFakeInMemoryNodeClient creates fake client that keeps state of cluster nodes in memory +func newFakeInMemoryNodeClient(nodes []*apiv1.Node) *fake.Clientset { fakeClient := &fake.Clientset{} + clusterState := struct { + nodes map[string]*apiv1.Node + }{ + make(map[string]*apiv1.Node), + } + for _, node := range nodes { + clusterState.nodes[node.Name] = node.DeepCopy() + } + + fakeClient.Fake.AddReactor("list", "nodes", func(action core.Action) (bool, runtime.Object, error) { + nodes := make([]apiv1.Node, 0, len(clusterState.nodes)) + for _, node := range clusterState.nodes { + nodes = append(nodes, *node.DeepCopy()) + } + return true, &apiv1.NodeList{Items: nodes}, nil + }) + fakeClient.Fake.AddReactor("get", "nodes", func(action core.Action) (bool, runtime.Object, error) { + getAction := action.(core.GetAction) + node, ok := clusterState.nodes[getAction.GetName()] + if !ok { + return true, nil, fmt.Errorf("Wrong node: %v", getAction.GetName()) + } + return true, node.DeepCopy(), nil + }) + fakeClient.Fake.AddReactor("update", "nodes", func(action core.Action) (bool, runtime.Object, error) { + update := action.(core.UpdateAction) + node := update.GetObject().(*apiv1.Node) + clusterState.nodes[node.Name] = node.DeepCopy() + return true, node.DeepCopy(), nil + }) + fakeClient.Fake.AddReactor("delete", "nodes", func(action core.Action) (bool, runtime.Object, error) { + deleteAction := action.(core.DeleteAction) + delete(clusterState.nodes, deleteAction.GetName()) + return true, nil, nil + }) + + return fakeClient +} + +// Helper functions +func hasDeletionCandidateTaint(t *testing.T, client kube_client.Interface, name string) bool { + node, err := client.CoreV1().Nodes().Get(name, metav1.GetOptions{}) + if err != nil { + t.Fatalf("Failed to retrieve node %v: %v", name, err) + } + return deletetaint.HasDeletionCandidateTaint(node) +} +func getAllNodes(t *testing.T, client kube_client.Interface) []*apiv1.Node { + nodeList, err := client.CoreV1().Nodes().List(metav1.ListOptions{}) + if err != nil { + t.Fatalf("Failed to retrieve list of nodes: %v", err) + } + result := make([]*apiv1.Node, 0, nodeList.Size()) + for _, node := range nodeList.Items { + result = append(result, node.DeepCopy()) + } + return result +} +func countDeletionCandidateTaints(t *testing.T, client kube_client.Interface) (total int) { + for _, node := range getAllNodes(t, client) { + if deletetaint.HasDeletionCandidateTaint(node) { + total++ + } + } + return total +} + +func TestSoftTaint(t *testing.T) { job := batchv1.Job{ ObjectMeta: metav1.ObjectMeta{ Name: "job", @@ -1342,28 +1411,10 @@ func TestSoftTaint(t *testing.T) { p700.Spec.NodeName = "n1000" p1200.Spec.NodeName = "n2000" - fakeClient.Fake.AddReactor("get", "nodes", func(action core.Action) (bool, runtime.Object, error) { - getAction := action.(core.GetAction) - switch getAction.GetName() { - case n1000.Name: - return true, n1000, nil - case n2000.Name: - return true, n2000, nil - } - return true, nil, fmt.Errorf("Wrong node: %v", getAction.GetName()) - }) - fakeClient.Fake.AddReactor("update", "nodes", func(action core.Action) (bool, runtime.Object, error) { - update := action.(core.UpdateAction) - obj := update.GetObject().(*apiv1.Node) - if deletetaint.HasDeletionCandidateTaint(obj) { - taintedNodes <- obj.Name - } - updatedNodes <- obj.Name - return true, obj, nil - }) + fakeClient := newFakeInMemoryNodeClient([]*apiv1.Node{n1000, n2000}) provider := testprovider.NewTestCloudProvider(nil, func(nodeGroup string, node string) error { - deletedNodes <- node + t.Fatalf("Unexpected deletion of %s", node) return nil }) provider.AddNodeGroup("ng1", 1, 10, 2) @@ -1390,76 +1441,49 @@ func TestSoftTaint(t *testing.T) { // Test no superfluous nodes scaleDown.UpdateUnneededNodes([]*apiv1.Node{n1000, n2000}, []*apiv1.Node{n1000, n2000}, []*apiv1.Pod{p500, p700, p1200}, time.Now().Add(-5*time.Minute), nil) - errors := scaleDown.SoftTaintUnneededNodes([]*apiv1.Node{n1000, n2000}) - assert.Empty(t, errors) - assert.Equal(t, 0, getCountOfChan(deletedNodes)) - assert.Equal(t, 0, getCountOfChan(updatedNodes)) - assert.Equal(t, 0, getCountOfChan(taintedNodes)) - assert.False(t, deletetaint.HasDeletionCandidateTaint(n1000)) - assert.False(t, deletetaint.HasDeletionCandidateTaint(n2000)) + errs := scaleDown.SoftTaintUnneededNodes(getAllNodes(t, fakeClient)) + assert.Empty(t, errs) + assert.False(t, hasDeletionCandidateTaint(t, fakeClient, n1000.Name)) + assert.False(t, hasDeletionCandidateTaint(t, fakeClient, n2000.Name)) // Test one unneeded node scaleDown.UpdateUnneededNodes([]*apiv1.Node{n1000, n2000}, []*apiv1.Node{n1000, n2000}, []*apiv1.Pod{p500, p1200}, time.Now().Add(-5*time.Minute), nil) - errors = scaleDown.SoftTaintUnneededNodes([]*apiv1.Node{n1000, n2000}) - assert.Empty(t, errors) - assert.Equal(t, 0, getCountOfChan(deletedNodes)) - assert.Equal(t, n1000.Name, getStringFromChanImmediately(updatedNodes)) - assert.Equal(t, n1000.Name, getStringFromChanImmediately(taintedNodes)) - assert.True(t, deletetaint.HasDeletionCandidateTaint(n1000)) - assert.False(t, deletetaint.HasDeletionCandidateTaint(n2000)) + errs = scaleDown.SoftTaintUnneededNodes(getAllNodes(t, fakeClient)) + assert.Empty(t, errs) + assert.True(t, hasDeletionCandidateTaint(t, fakeClient, n1000.Name)) + assert.False(t, hasDeletionCandidateTaint(t, fakeClient, n2000.Name)) // Test remove soft taint scaleDown.UpdateUnneededNodes([]*apiv1.Node{n1000, n2000}, []*apiv1.Node{n1000, n2000}, []*apiv1.Pod{p500, p700, p1200}, time.Now().Add(-5*time.Minute), nil) - errors = scaleDown.SoftTaintUnneededNodes([]*apiv1.Node{n1000, n2000}) - assert.Empty(t, errors) - assert.Equal(t, n1000.Name, getStringFromChanImmediately(updatedNodes)) - assert.Equal(t, 0, getCountOfChan(taintedNodes)) - assert.False(t, deletetaint.HasDeletionCandidateTaint(n1000)) - assert.False(t, deletetaint.HasDeletionCandidateTaint(n2000)) + errs = scaleDown.SoftTaintUnneededNodes(getAllNodes(t, fakeClient)) + assert.Empty(t, errs) + assert.False(t, hasDeletionCandidateTaint(t, fakeClient, n1000.Name)) + assert.False(t, hasDeletionCandidateTaint(t, fakeClient, n2000.Name)) // Test bulk update taint limit scaleDown.UpdateUnneededNodes([]*apiv1.Node{n1000, n2000}, []*apiv1.Node{n1000, n2000}, []*apiv1.Pod{}, time.Now().Add(-5*time.Minute), nil) - errors = scaleDown.SoftTaintUnneededNodes([]*apiv1.Node{n1000, n2000}) - assert.Empty(t, errors) - assert.Equal(t, 1, getCountOfChan(updatedNodes)) - assert.Equal(t, 1, getCountOfChan(taintedNodes)) - errors = scaleDown.SoftTaintUnneededNodes([]*apiv1.Node{n1000, n2000}) - assert.Empty(t, errors) - assert.Equal(t, 1, getCountOfChan(updatedNodes)) - assert.Equal(t, 1, getCountOfChan(taintedNodes)) - assert.True(t, deletetaint.HasDeletionCandidateTaint(n1000)) - assert.True(t, deletetaint.HasDeletionCandidateTaint(n2000)) - errors = scaleDown.SoftTaintUnneededNodes([]*apiv1.Node{n1000, n2000}) - assert.Empty(t, errors) - assert.Equal(t, 0, getCountOfChan(updatedNodes)) - assert.Equal(t, 0, getCountOfChan(taintedNodes)) + errs = scaleDown.SoftTaintUnneededNodes(getAllNodes(t, fakeClient)) + assert.Empty(t, errs) + assert.Equal(t, 1, countDeletionCandidateTaints(t, fakeClient)) + errs = scaleDown.SoftTaintUnneededNodes(getAllNodes(t, fakeClient)) + assert.Empty(t, errs) + assert.Equal(t, 2, countDeletionCandidateTaints(t, fakeClient)) // Test bulk update untaint limit scaleDown.UpdateUnneededNodes([]*apiv1.Node{n1000, n2000}, []*apiv1.Node{n1000, n2000}, []*apiv1.Pod{p500, p700, p1200}, time.Now().Add(-5*time.Minute), nil) - errors = scaleDown.SoftTaintUnneededNodes([]*apiv1.Node{n1000, n2000}) - assert.Empty(t, errors) - assert.Equal(t, 1, getCountOfChan(updatedNodes)) - assert.Equal(t, 0, getCountOfChan(taintedNodes)) - errors = scaleDown.SoftTaintUnneededNodes([]*apiv1.Node{n1000, n2000}) - assert.Empty(t, errors) - assert.Equal(t, 1, getCountOfChan(updatedNodes)) - assert.Equal(t, 0, getCountOfChan(taintedNodes)) - assert.False(t, deletetaint.HasDeletionCandidateTaint(n1000)) - assert.False(t, deletetaint.HasDeletionCandidateTaint(n2000)) - errors = scaleDown.SoftTaintUnneededNodes([]*apiv1.Node{n1000, n2000}) - assert.Empty(t, errors) - assert.Equal(t, 0, getCountOfChan(updatedNodes)) - assert.Equal(t, 0, getCountOfChan(taintedNodes)) + errs = scaleDown.SoftTaintUnneededNodes(getAllNodes(t, fakeClient)) + assert.Empty(t, errs) + assert.Equal(t, 1, countDeletionCandidateTaints(t, fakeClient)) + errs = scaleDown.SoftTaintUnneededNodes(getAllNodes(t, fakeClient)) + assert.Empty(t, errs) + assert.Equal(t, 0, countDeletionCandidateTaints(t, fakeClient)) } func TestSoftTaintTimeLimit(t *testing.T) { - updatedNodes := make(chan string, 10) - fakeClient := &fake.Clientset{} - job := batchv1.Job{ ObjectMeta: metav1.ObjectMeta{ Name: "job", @@ -1484,27 +1508,14 @@ func TestSoftTaintTimeLimit(t *testing.T) { return currentTime } - fakeClient.Fake.AddReactor("get", "nodes", func(action core.Action) (bool, runtime.Object, error) { - getAction := action.(core.GetAction) - switch getAction.GetName() { - case n1.Name: - return true, n1, nil - case n2.Name: - return true, n2, nil - } - return true, nil, fmt.Errorf("Wrong node: %v", getAction.GetName()) - }) - fakeClient.Fake.AddReactor("update", "nodes", func(action core.Action) (bool, runtime.Object, error) { + fakeClient := newFakeInMemoryNodeClient([]*apiv1.Node{n1, n2}) + // Move time forward when updating + fakeClient.Fake.PrependReactor("update", "nodes", func(action core.Action) (bool, runtime.Object, error) { currentTime = currentTime.Add(updateTime) - update := action.(core.UpdateAction) - obj := update.GetObject().(*apiv1.Node) - updatedNodes <- obj.Name - return true, obj, nil + return false, nil, nil }) - provider := testprovider.NewTestCloudProvider(nil, func(nodeGroup string, node string) error { - return nil - }) + provider := testprovider.NewTestCloudProvider(nil, nil) provider.AddNodeGroup("ng1", 1, 10, 2) provider.AddNode("ng1", n1) provider.AddNode("ng1", n2) @@ -1529,28 +1540,42 @@ func TestSoftTaintTimeLimit(t *testing.T) { // Test bulk taint scaleDown.UpdateUnneededNodes([]*apiv1.Node{n1, n2}, []*apiv1.Node{n1, n2}, []*apiv1.Pod{}, time.Now().Add(-5*time.Minute), nil) - errors := scaleDown.SoftTaintUnneededNodes([]*apiv1.Node{n1, n2}) - assert.Empty(t, errors) - assert.Equal(t, 2, getCountOfChan(updatedNodes)) - assert.True(t, deletetaint.HasDeletionCandidateTaint(n1)) - assert.True(t, deletetaint.HasDeletionCandidateTaint(n2)) + errs := scaleDown.SoftTaintUnneededNodes(getAllNodes(t, fakeClient)) + assert.Empty(t, errs) + assert.Equal(t, 2, countDeletionCandidateTaints(t, fakeClient)) + assert.True(t, hasDeletionCandidateTaint(t, fakeClient, n1.Name)) + assert.True(t, hasDeletionCandidateTaint(t, fakeClient, n2.Name)) // Test bulk untaint scaleDown.UpdateUnneededNodes([]*apiv1.Node{n1, n2}, []*apiv1.Node{n1, n2}, []*apiv1.Pod{p1, p2}, time.Now().Add(-5*time.Minute), nil) - errors = scaleDown.SoftTaintUnneededNodes([]*apiv1.Node{n1, n2}) - assert.Empty(t, errors) - assert.Equal(t, 2, getCountOfChan(updatedNodes)) - assert.False(t, deletetaint.HasDeletionCandidateTaint(n1)) - assert.False(t, deletetaint.HasDeletionCandidateTaint(n2)) + errs = scaleDown.SoftTaintUnneededNodes(getAllNodes(t, fakeClient)) + assert.Empty(t, errs) + assert.Equal(t, 0, countDeletionCandidateTaints(t, fakeClient)) + assert.False(t, hasDeletionCandidateTaint(t, fakeClient, n1.Name)) + assert.False(t, hasDeletionCandidateTaint(t, fakeClient, n2.Name)) // Test duration limit of bulk taint updateTime = maxSoftTaintDuration scaleDown.UpdateUnneededNodes([]*apiv1.Node{n1, n2}, []*apiv1.Node{n1, n2}, []*apiv1.Pod{}, time.Now().Add(-5*time.Minute), nil) - errors = scaleDown.SoftTaintUnneededNodes([]*apiv1.Node{n1, n2}) - assert.Empty(t, errors) - assert.Equal(t, 1, getCountOfChan(updatedNodes)) + errs = scaleDown.SoftTaintUnneededNodes(getAllNodes(t, fakeClient)) + assert.Empty(t, errs) + assert.Equal(t, 1, countDeletionCandidateTaints(t, fakeClient)) + errs = scaleDown.SoftTaintUnneededNodes(getAllNodes(t, fakeClient)) + assert.Empty(t, errs) + assert.Equal(t, 2, countDeletionCandidateTaints(t, fakeClient)) + + // Test duration limit of bulk untaint + updateTime = maxSoftTaintDuration + scaleDown.UpdateUnneededNodes([]*apiv1.Node{n1, n2}, + []*apiv1.Node{n1, n2}, []*apiv1.Pod{p1, p2}, time.Now().Add(-5*time.Minute), nil) + errs = scaleDown.SoftTaintUnneededNodes(getAllNodes(t, fakeClient)) + assert.Empty(t, errs) + assert.Equal(t, 1, countDeletionCandidateTaints(t, fakeClient)) + errs = scaleDown.SoftTaintUnneededNodes(getAllNodes(t, fakeClient)) + assert.Empty(t, errs) + assert.Equal(t, 0, countDeletionCandidateTaints(t, fakeClient)) // Clean up now = time.Now diff --git a/cluster-autoscaler/utils/deletetaint/delete.go b/cluster-autoscaler/utils/deletetaint/delete.go index 4ef2b4dc632..46cf9be6509 100644 --- a/cluster-autoscaler/utils/deletetaint/delete.go +++ b/cluster-autoscaler/utils/deletetaint/delete.go @@ -64,19 +64,30 @@ func MarkDeletionCandidate(node *apiv1.Node, client kube_client.Interface) error func addTaint(node *apiv1.Node, client kube_client.Interface, taintKey string, effect apiv1.TaintEffect) error { retryDeadline := time.Now().Add(maxRetryDeadline) + freshNode := node.DeepCopy() + var err error + refresh := false for { - // Get the newest version of the node. - freshNode, err := client.CoreV1().Nodes().Get(node.Name, metav1.GetOptions{}) - if err != nil || freshNode == nil { - klog.Warningf("Error while adding %v taint on node %v: %v", getKeyShortName(taintKey), node.Name, err) - return fmt.Errorf("failed to get node %v: %v", node.Name, err) + if refresh { + // Get the newest version of the node. + freshNode, err = client.CoreV1().Nodes().Get(node.Name, metav1.GetOptions{}) + if err != nil || freshNode == nil { + klog.Warningf("Error while adding %v taint on node %v: %v", getKeyShortName(taintKey), node.Name, err) + return fmt.Errorf("failed to get node %v: %v", node.Name, err) + } } if !addTaintToSpec(freshNode, taintKey, effect) { + if !refresh { + // Make sure we have the latest version before skipping update. + refresh = true + continue + } return nil } _, err = client.CoreV1().Nodes().Update(freshNode) if err != nil && errors.IsConflict(err) && time.Now().Before(retryDeadline) { + refresh = true time.Sleep(conflictRetryInterval) continue } @@ -160,11 +171,17 @@ func CleanDeletionCandidate(node *apiv1.Node, client kube_client.Interface) (boo func cleanTaint(node *apiv1.Node, client kube_client.Interface, taintKey string) (bool, error) { retryDeadline := time.Now().Add(maxRetryDeadline) + freshNode := node.DeepCopy() + var err error + refresh := false for { - freshNode, err := client.CoreV1().Nodes().Get(node.Name, metav1.GetOptions{}) - if err != nil || freshNode == nil { - klog.Warningf("Error while releasing %v taint on node %v: %v", getKeyShortName(taintKey), node.Name, err) - return false, fmt.Errorf("failed to get node %v: %v", node.Name, err) + if refresh { + // Get the newest version of the node. + freshNode, err = client.CoreV1().Nodes().Get(node.Name, metav1.GetOptions{}) + if err != nil || freshNode == nil { + klog.Warningf("Error while adding %v taint on node %v: %v", getKeyShortName(taintKey), node.Name, err) + return false, fmt.Errorf("failed to get node %v: %v", node.Name, err) + } } newTaints := make([]apiv1.Taint, 0) for _, taint := range freshNode.Spec.Taints { @@ -174,24 +191,30 @@ func cleanTaint(node *apiv1.Node, client kube_client.Interface, taintKey string) newTaints = append(newTaints, taint) } } - - if len(newTaints) != len(freshNode.Spec.Taints) { - freshNode.Spec.Taints = newTaints - _, err := client.CoreV1().Nodes().Update(freshNode) - - if err != nil && errors.IsConflict(err) && time.Now().Before(retryDeadline) { - time.Sleep(conflictRetryInterval) + if len(newTaints) == len(freshNode.Spec.Taints) { + if !refresh { + // Make sure we have the latest version before skipping update. + refresh = true continue } + return false, nil + } - if err != nil { - klog.Warningf("Error while releasing %v taint on node %v: %v", getKeyShortName(taintKey), node.Name, err) - return false, err - } - klog.V(1).Infof("Successfully released %v on node %v", getKeyShortName(taintKey), node.Name) - return true, nil + freshNode.Spec.Taints = newTaints + _, err = client.CoreV1().Nodes().Update(freshNode) + + if err != nil && errors.IsConflict(err) && time.Now().Before(retryDeadline) { + refresh = true + time.Sleep(conflictRetryInterval) + continue + } + + if err != nil { + klog.Warningf("Error while releasing %v taint on node %v: %v", getKeyShortName(taintKey), node.Name, err) + return false, err } - return false, nil + klog.V(1).Infof("Successfully released %v on node %v", getKeyShortName(taintKey), node.Name) + return true, nil } } diff --git a/cluster-autoscaler/utils/deletetaint/delete_test.go b/cluster-autoscaler/utils/deletetaint/delete_test.go index 440e2088e7d..5a42d0cc93a 100644 --- a/cluster-autoscaler/utils/deletetaint/delete_test.go +++ b/cluster-autoscaler/utils/deletetaint/delete_test.go @@ -94,6 +94,7 @@ func TestQueryNodes(t *testing.T) { val, err := GetToBeDeletedTime(updatedNode) assert.NoError(t, err) + assert.NotNil(t, val) assert.True(t, time.Now().Sub(*val) < 10*time.Second) } @@ -109,6 +110,7 @@ func TestSoftQueryNodes(t *testing.T) { val, err := GetDeletionCandidateTime(updatedNode) assert.NoError(t, err) + assert.NotNil(t, val) assert.True(t, time.Now().Sub(*val) < 10*time.Second) }