Tainting unneeded nodes as PreferNoSchedule
jkaniuk committed Jan 21, 2019
1 parent 4a2ddef commit 0c64e09
Showing 10 changed files with 565 additions and 69 deletions.
5 changes: 5 additions & 0 deletions cluster-autoscaler/config/autoscaling_options.go
@@ -121,4 +121,9 @@ type AutoscalingOptions struct {
Regional bool
// Pods newer than this will not be considered as unschedulable for scale-up.
NewPodScaleUpDelay time.Duration
// MaxBulkSoftTaintCount sets the maximum number of nodes that can be (un)tainted PreferNoSchedule during a single scale-down run.
// A value of 0 turns off such tainting.
MaxBulkSoftTaintCount int
// MaxBulkSoftTaintTime sets the maximum duration of a single run of PreferNoSchedule tainting.
MaxBulkSoftTaintTime time.Duration
}
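For illustration, here is a minimal sketch of how the two new fields might be set when building the autoscaler configuration. The field names and the config package path come from this diff; the concrete values and the surrounding main function are assumptions for the example only.

package main

import (
	"fmt"
	"time"

	"k8s.io/autoscaler/cluster-autoscaler/config"
)

func main() {
	// Illustrative values only: allow at most 10 nodes to be (un)tainted per
	// scale-down run, and stop tainting once 3 seconds have elapsed in a run.
	// Setting MaxBulkSoftTaintCount to 0 would disable soft tainting entirely.
	opts := config.AutoscalingOptions{
		MaxBulkSoftTaintCount: 10,
		MaxBulkSoftTaintTime:  3 * time.Second,
	}
	fmt.Printf("soft taint budget: %d nodes, %s per run\n",
		opts.MaxBulkSoftTaintCount, opts.MaxBulkSoftTaintTime)
}
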
62 changes: 47 additions & 15 deletions cluster-autoscaler/core/scale_down.go
@@ -76,6 +76,9 @@ type NodeDeleteStatus struct {
nodeDeleteResults map[string]error
}

// now returns the current time. It is a package-level variable so that unit tests can substitute a fake clock.
var now func() time.Time = time.Now
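A brief aside on why now is a variable rather than a direct call to time.Now: tests can swap in a fake clock to exercise the time budget deterministically, which is exactly what TestSoftTaintTimeLimit below does. A self-contained sketch of that pattern, with an arbitrary fixed date:

package main

import (
	"fmt"
	"time"
)

// now mirrors the package-level variable introduced above; redeclared here so
// the sketch compiles on its own.
var now = time.Now

func main() {
	fixed := time.Date(2019, 1, 21, 12, 0, 0, 0, time.UTC)
	now = func() time.Time { return fixed } // freeze the clock, as the test does
	defer func() { now = time.Now }()       // restore the real clock afterwards
	fmt.Println(now().Format(time.RFC3339)) // prints 2019-01-21T12:00:00Z
}
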

// IsDeleteInProgress returns true if a node is being deleted.
func (n *NodeDeleteStatus) IsDeleteInProgress() bool {
n.Lock()
@@ -572,6 +575,50 @@ func (sd *ScaleDown) mapNodesToStatusScaleDownNodes(nodes []*apiv1.Node, nodeGro
return result
}

// SoftTaintUnneededNodes manages soft taints of unneeded nodes.
func (sd *ScaleDown) SoftTaintUnneededNodes(allNodes []*apiv1.Node) (errors []error) {
defer metrics.UpdateDurationFromStart(metrics.ScaleDownSoftTaintUnneeded, time.Now())
apiCallBudget := sd.context.AutoscalingOptions.MaxBulkSoftTaintCount
timeBudget := sd.context.AutoscalingOptions.MaxBulkSoftTaintTime
skippedNodes := 0
startTime := now()
for _, node := range allNodes {
if deletetaint.HasToBeDeletedTaint(node) {
// Do not consider nodes that are scheduled to be deleted
continue
}
alreadyTainted := deletetaint.HasDeletionCandidateTaint(node)
_, unneeded := sd.unneededNodes[node.Name]

// Check if expected taints match existing taints
if unneeded != alreadyTainted {
if apiCallBudget <= 0 || now().Sub(startTime) >= timeBudget {
skippedNodes++
continue
}
apiCallBudget--
if unneeded && !alreadyTainted {
err := deletetaint.MarkDeletionCandidate(node, sd.context.ClientSet)
if err != nil {
errors = append(errors, err)
klog.Warningf("Soft taint on %s adding error %v", node.Name, err)
}
}
if !unneeded && alreadyTainted {
_, err := deletetaint.CleanDeletionCandidate(node, sd.context.ClientSet)
if err != nil {
errors = append(errors, err)
klog.Warningf("Soft taint on %s removal error %v", node.Name, err)
}
}
}
}
if skippedNodes > 0 {
klog.V(4).Infof("Skipped adding/removing soft taints on %v nodes - API call limit exceeded", skippedNodes)
}
return
}
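The MarkDeletionCandidate and CleanDeletionCandidate helpers come from the deletetaint package; conceptually they add and remove a soft taint with the PreferNoSchedule effect, which asks the scheduler to avoid the node without strictly forbidding new pods. A rough sketch of what such a taint could look like on a node object (the key name and timestamp value are assumptions for illustration, not taken from this diff):

package main

import (
	"fmt"
	"strconv"
	"time"

	apiv1 "k8s.io/api/core/v1"
)

func main() {
	// Assumed key name; the real constant lives in the deletetaint package.
	// The important part is the PreferNoSchedule effect: the scheduler will
	// try to avoid this node, but pods can still land on it if necessary.
	softTaint := apiv1.Taint{
		Key:    "DeletionCandidateOfClusterAutoscaler",
		Value:  strconv.FormatInt(time.Now().Unix(), 10),
		Effect: apiv1.TaintEffectPreferNoSchedule,
	}
	node := &apiv1.Node{}
	node.Spec.Taints = append(node.Spec.Taints, softTaint)
	fmt.Printf("taints on node: %d (effect=%s)\n", len(node.Spec.Taints), node.Spec.Taints[0].Effect)
}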

// TryToScaleDown tries to scale down the cluster. It returns a result inside a ScaleDownStatus indicating if any node was
// removed and error if such occurred.
func (sd *ScaleDown) TryToScaleDown(allNodes []*apiv1.Node, pods []*apiv1.Pod, pdbs []*policyv1.PodDisruptionBudget, currentTime time.Time) (*status.ScaleDownStatus, errors.AutoscalerError) {
@@ -1011,21 +1058,6 @@ func drainNode(node *apiv1.Node, pods []*apiv1.Pod, client kube_client.Interface
errors.TransientError, "Failed to drain node %s/%s: pods remaining after timeout", node.Namespace, node.Name)
}

// cleanToBeDeleted cleans ToBeDeleted taints.
func cleanToBeDeleted(nodes []*apiv1.Node, client kube_client.Interface, recorder kube_record.EventRecorder) {
for _, node := range nodes {
cleaned, err := deletetaint.CleanToBeDeleted(node, client)
if err != nil {
klog.Warningf("Error while releasing taints on node %v: %v", node.Name, err)
recorder.Eventf(node, apiv1.EventTypeWarning, "ClusterAutoscalerCleanup",
"failed to clean toBeDeletedTaint: %v", err)
} else if cleaned {
klog.V(1).Infof("Successfully released toBeDeletedTaint on node %v", node.Name)
recorder.Eventf(node, apiv1.EventTypeNormal, "ClusterAutoscalerCleanup", "marking the node as schedulable")
}
}
}

// Removes the given node from cloud provider. No extra pre-deletion actions are executed on
// the Kubernetes side.
func deleteNodeFromCloudProvider(node *apiv1.Node, cloudProvider cloudprovider.CloudProvider,
280 changes: 248 additions & 32 deletions cluster-autoscaler/core/scale_down_test.go
@@ -1182,39 +1182,16 @@ func getStringFromChanImmediately(c chan string) string {
}
}

func TestCleanToBeDeleted(t *testing.T) {
n1 := BuildTestNode("n1", 1000, 10)
n2 := BuildTestNode("n2", 1000, 10)
n2.Spec.Taints = []apiv1.Taint{{Key: deletetaint.ToBeDeletedTaint, Value: strconv.FormatInt(time.Now().Unix()-301, 10)}}

fakeClient := &fake.Clientset{}
fakeClient.Fake.AddReactor("get", "nodes", func(action core.Action) (bool, runtime.Object, error) {
getAction := action.(core.GetAction)
switch getAction.GetName() {
case n1.Name:
return true, n1, nil
case n2.Name:
return true, n2, nil
}
return true, nil, fmt.Errorf("Wrong node: %v", getAction.GetName())
})
fakeClient.Fake.AddReactor("update", "nodes", func(action core.Action) (bool, runtime.Object, error) {
update := action.(core.UpdateAction)
obj := update.GetObject().(*apiv1.Node)
switch obj.Name {
case n1.Name:
n1 = obj
case n2.Name:
n2 = obj
}
return true, obj, nil
})
fakeRecorder := kube_util.CreateEventRecorder(fakeClient)

cleanToBeDeleted([]*apiv1.Node{n1, n2}, fakeClient, fakeRecorder)

assert.Equal(t, 0, len(n1.Spec.Taints))
assert.Equal(t, 0, len(n2.Spec.Taints))
}

func getCountOfChan(c chan string) int {
count := 0
for {
select {
case <-c:
count++
default:
return count
}
}
}

func TestCalculateCoresAndMemoryTotal(t *testing.T) {
@@ -1339,3 +1316,242 @@ func TestCheckScaleDownDeltaWithinLimits(t *testing.T) {
}
}
}

func TestSoftTaint(t *testing.T) {
updatedNodes := make(chan string, 10)
deletedNodes := make(chan string, 10)
taintedNodes := make(chan string, 10)
fakeClient := &fake.Clientset{}

job := batchv1.Job{
ObjectMeta: metav1.ObjectMeta{
Name: "job",
Namespace: "default",
SelfLink: "/apivs/batch/v1/namespaces/default/jobs/job",
},
}
n1000 := BuildTestNode("n1000", 1000, 1000)
SetNodeReadyState(n1000, true, time.Time{})
n2000 := BuildTestNode("n2000", 2000, 1000)
SetNodeReadyState(n2000, true, time.Time{})

p500 := BuildTestPod("p500", 500, 0)
p700 := BuildTestPod("p700", 700, 0)
p1200 := BuildTestPod("p1200", 1200, 0)
p500.Spec.NodeName = "n2000"
p700.Spec.NodeName = "n1000"
p1200.Spec.NodeName = "n2000"

fakeClient.Fake.AddReactor("get", "nodes", func(action core.Action) (bool, runtime.Object, error) {
getAction := action.(core.GetAction)
switch getAction.GetName() {
case n1000.Name:
return true, n1000, nil
case n2000.Name:
return true, n2000, nil
}
return true, nil, fmt.Errorf("Wrong node: %v", getAction.GetName())
})
fakeClient.Fake.AddReactor("update", "nodes", func(action core.Action) (bool, runtime.Object, error) {
update := action.(core.UpdateAction)
obj := update.GetObject().(*apiv1.Node)
if deletetaint.HasDeletionCandidateTaint(obj) {
taintedNodes <- obj.Name
}
updatedNodes <- obj.Name
return true, obj, nil
})

provider := testprovider.NewTestCloudProvider(nil, func(nodeGroup string, node string) error {
deletedNodes <- node
return nil
})
provider.AddNodeGroup("ng1", 1, 10, 2)
provider.AddNode("ng1", n1000)
provider.AddNode("ng1", n2000)
assert.NotNil(t, provider)

options := config.AutoscalingOptions{
ScaleDownUtilizationThreshold: 0.5,
ScaleDownUnneededTime: 10 * time.Minute,
MaxGracefulTerminationSec: 60,
MaxBulkSoftTaintCount: 1,
MaxBulkSoftTaintTime: 3 * time.Second,
}
jobLister, err := kube_util.NewTestJobLister([]*batchv1.Job{&job})
assert.NoError(t, err)
registry := kube_util.NewListerRegistry(nil, nil, nil, nil, nil, nil, nil, jobLister, nil, nil)

context := NewScaleTestAutoscalingContext(options, fakeClient, registry, provider)

clusterStateRegistry := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, context.LogRecorder, newBackoff())
scaleDown := NewScaleDown(&context, clusterStateRegistry)

// Test no superfluous nodes
scaleDown.UpdateUnneededNodes([]*apiv1.Node{n1000, n2000},
[]*apiv1.Node{n1000, n2000}, []*apiv1.Pod{p500, p700, p1200}, time.Now().Add(-5*time.Minute), nil)
errors := scaleDown.SoftTaintUnneededNodes([]*apiv1.Node{n1000, n2000})
assert.Empty(t, errors)
assert.Equal(t, 0, getCountOfChan(deletedNodes))
assert.Equal(t, 0, getCountOfChan(updatedNodes))
assert.Equal(t, 0, getCountOfChan(taintedNodes))
assert.False(t, deletetaint.HasDeletionCandidateTaint(n1000))
assert.False(t, deletetaint.HasDeletionCandidateTaint(n2000))

// Test one unneeded node
scaleDown.UpdateUnneededNodes([]*apiv1.Node{n1000, n2000},
[]*apiv1.Node{n1000, n2000}, []*apiv1.Pod{p500, p1200}, time.Now().Add(-5*time.Minute), nil)
errors = scaleDown.SoftTaintUnneededNodes([]*apiv1.Node{n1000, n2000})
assert.Empty(t, errors)
assert.Equal(t, 0, getCountOfChan(deletedNodes))
assert.Equal(t, n1000.Name, getStringFromChanImmediately(updatedNodes))
assert.Equal(t, n1000.Name, getStringFromChanImmediately(taintedNodes))
assert.True(t, deletetaint.HasDeletionCandidateTaint(n1000))
assert.False(t, deletetaint.HasDeletionCandidateTaint(n2000))

// Test remove soft taint
scaleDown.UpdateUnneededNodes([]*apiv1.Node{n1000, n2000},
[]*apiv1.Node{n1000, n2000}, []*apiv1.Pod{p500, p700, p1200}, time.Now().Add(-5*time.Minute), nil)
errors = scaleDown.SoftTaintUnneededNodes([]*apiv1.Node{n1000, n2000})
assert.Empty(t, errors)
assert.Equal(t, n1000.Name, getStringFromChanImmediately(updatedNodes))
assert.Equal(t, 0, getCountOfChan(taintedNodes))
assert.False(t, deletetaint.HasDeletionCandidateTaint(n1000))
assert.False(t, deletetaint.HasDeletionCandidateTaint(n2000))

// Test bulk update taint limit
scaleDown.UpdateUnneededNodes([]*apiv1.Node{n1000, n2000},
[]*apiv1.Node{n1000, n2000}, []*apiv1.Pod{}, time.Now().Add(-5*time.Minute), nil)
errors = scaleDown.SoftTaintUnneededNodes([]*apiv1.Node{n1000, n2000})
assert.Empty(t, errors)
assert.Equal(t, 1, getCountOfChan(updatedNodes))
assert.Equal(t, 1, getCountOfChan(taintedNodes))
errors = scaleDown.SoftTaintUnneededNodes([]*apiv1.Node{n1000, n2000})
assert.Empty(t, errors)
assert.Equal(t, 1, getCountOfChan(updatedNodes))
assert.Equal(t, 1, getCountOfChan(taintedNodes))
assert.True(t, deletetaint.HasDeletionCandidateTaint(n1000))
assert.True(t, deletetaint.HasDeletionCandidateTaint(n2000))
errors = scaleDown.SoftTaintUnneededNodes([]*apiv1.Node{n1000, n2000})
assert.Empty(t, errors)
assert.Equal(t, 0, getCountOfChan(updatedNodes))
assert.Equal(t, 0, getCountOfChan(taintedNodes))

// Test bulk update untaint limit
scaleDown.UpdateUnneededNodes([]*apiv1.Node{n1000, n2000},
[]*apiv1.Node{n1000, n2000}, []*apiv1.Pod{p500, p700, p1200}, time.Now().Add(-5*time.Minute), nil)
errors = scaleDown.SoftTaintUnneededNodes([]*apiv1.Node{n1000, n2000})
assert.Empty(t, errors)
assert.Equal(t, 1, getCountOfChan(updatedNodes))
assert.Equal(t, 0, getCountOfChan(taintedNodes))
errors = scaleDown.SoftTaintUnneededNodes([]*apiv1.Node{n1000, n2000})
assert.Empty(t, errors)
assert.Equal(t, 1, getCountOfChan(updatedNodes))
assert.Equal(t, 0, getCountOfChan(taintedNodes))
assert.False(t, deletetaint.HasDeletionCandidateTaint(n1000))
assert.False(t, deletetaint.HasDeletionCandidateTaint(n2000))
errors = scaleDown.SoftTaintUnneededNodes([]*apiv1.Node{n1000, n2000})
assert.Empty(t, errors)
assert.Equal(t, 0, getCountOfChan(updatedNodes))
assert.Equal(t, 0, getCountOfChan(taintedNodes))
}

func TestSoftTaintTimeLimit(t *testing.T) {
updatedNodes := make(chan string, 10)
fakeClient := &fake.Clientset{}

job := batchv1.Job{
ObjectMeta: metav1.ObjectMeta{
Name: "job",
Namespace: "default",
SelfLink: "/apivs/batch/v1/namespaces/default/jobs/job",
},
}
n1 := BuildTestNode("n1", 1000, 1000)
SetNodeReadyState(n1, true, time.Time{})
n2 := BuildTestNode("n2", 1000, 1000)
SetNodeReadyState(n2, true, time.Time{})

p1 := BuildTestPod("p1", 1000, 0)
p2 := BuildTestPod("p2", 1000, 0)
p1.Spec.NodeName = "n1"
p2.Spec.NodeName = "n2"

currentTime := time.Now()
updateTime := time.Millisecond
maxSoftTaintDuration := 1 * time.Second
now = func() time.Time {
return currentTime
}

fakeClient.Fake.AddReactor("get", "nodes", func(action core.Action) (bool, runtime.Object, error) {
getAction := action.(core.GetAction)
switch getAction.GetName() {
case n1.Name:
return true, n1, nil
case n2.Name:
return true, n2, nil
}
return true, nil, fmt.Errorf("Wrong node: %v", getAction.GetName())
})
fakeClient.Fake.AddReactor("update", "nodes", func(action core.Action) (bool, runtime.Object, error) {
currentTime = currentTime.Add(updateTime)
update := action.(core.UpdateAction)
obj := update.GetObject().(*apiv1.Node)
updatedNodes <- obj.Name
return true, obj, nil
})

provider := testprovider.NewTestCloudProvider(nil, func(nodeGroup string, node string) error {
return nil
})
provider.AddNodeGroup("ng1", 1, 10, 2)
provider.AddNode("ng1", n1)
provider.AddNode("ng1", n2)
assert.NotNil(t, provider)

options := config.AutoscalingOptions{
ScaleDownUtilizationThreshold: 0.5,
ScaleDownUnneededTime: 10 * time.Minute,
MaxGracefulTerminationSec: 60,
MaxBulkSoftTaintCount: 10,
MaxBulkSoftTaintTime: maxSoftTaintDuration,
}
jobLister, err := kube_util.NewTestJobLister([]*batchv1.Job{&job})
assert.NoError(t, err)
registry := kube_util.NewListerRegistry(nil, nil, nil, nil, nil, nil, nil, jobLister, nil, nil)

context := NewScaleTestAutoscalingContext(options, fakeClient, registry, provider)

clusterStateRegistry := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, context.LogRecorder, newBackoff())
scaleDown := NewScaleDown(&context, clusterStateRegistry)

// Test bulk taint
scaleDown.UpdateUnneededNodes([]*apiv1.Node{n1, n2},
[]*apiv1.Node{n1, n2}, []*apiv1.Pod{}, time.Now().Add(-5*time.Minute), nil)
errors := scaleDown.SoftTaintUnneededNodes([]*apiv1.Node{n1, n2})
assert.Empty(t, errors)
assert.Equal(t, 2, getCountOfChan(updatedNodes))
assert.True(t, deletetaint.HasDeletionCandidateTaint(n1))
assert.True(t, deletetaint.HasDeletionCandidateTaint(n2))

// Test bulk untaint
scaleDown.UpdateUnneededNodes([]*apiv1.Node{n1, n2},
[]*apiv1.Node{n1, n2}, []*apiv1.Pod{p1, p2}, time.Now().Add(-5*time.Minute), nil)
errors = scaleDown.SoftTaintUnneededNodes([]*apiv1.Node{n1, n2})
assert.Empty(t, errors)
assert.Equal(t, 2, getCountOfChan(updatedNodes))
assert.False(t, deletetaint.HasDeletionCandidateTaint(n1))
assert.False(t, deletetaint.HasDeletionCandidateTaint(n2))

// Test duration limit of bulk taint
updateTime = maxSoftTaintDuration
scaleDown.UpdateUnneededNodes([]*apiv1.Node{n1, n2},
[]*apiv1.Node{n1, n2}, []*apiv1.Pod{}, time.Now().Add(-5*time.Minute), nil)
errors = scaleDown.SoftTaintUnneededNodes([]*apiv1.Node{n1, n2})
assert.Empty(t, errors)
assert.Equal(t, 1, getCountOfChan(updatedNodes))

// Clean up
now = time.Now
}