Skip to content

Commit

Permalink
Merge pull request kubernetes#4259 from matthias50/cherry-pick-3649-1.20
Browse files Browse the repository at this point in the history
cherry pick kubernetes#3649 - Adding functionality to cordon the node before destroying it.
  • Loading branch information
k8s-ci-robot authored Aug 16, 2021
2 parents 6244a09 + 5edabf4 commit c7978e2
Show file tree
Hide file tree
Showing 6 changed files with 83 additions and 28 deletions.
2 changes: 2 additions & 0 deletions cluster-autoscaler/config/autoscaling_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,4 +145,6 @@ type AutoscalingOptions struct {
// ClusterAPICloudConfigAuthoritative tells the Cluster API provider to treat the CloudConfig option as authoritative and
// not use KubeConfigPath as a fallback when it is not provided.
ClusterAPICloudConfigAuthoritative bool
// Enable or disable cordon nodes functionality before terminating the node during downscale process
CordonNodeBeforeTerminate bool
}
8 changes: 4 additions & 4 deletions cluster-autoscaler/core/scale_down.go
Original file line number Diff line number Diff line change
Expand Up @@ -1059,7 +1059,7 @@ func (sd *ScaleDown) scheduleDeleteEmptyNodes(emptyNodes []*apiv1.Node, client k
return deletedNodes, errors.NewAutoscalerError(
errors.CloudProviderError, "failed to find node group for %s", node.Name)
}
taintErr := deletetaint.MarkToBeDeleted(node, client)
taintErr := deletetaint.MarkToBeDeleted(node, client, sd.context.CordonNodeBeforeTerminate)
if taintErr != nil {
recorder.Eventf(node, apiv1.EventTypeWarning, "ScaleDownFailed", "failed to mark the node as toBeDeleted/unschedulable: %v", taintErr)
return deletedNodes, errors.ToAutoscalerError(errors.ApiCallError, taintErr)
Expand All @@ -1075,7 +1075,7 @@ func (sd *ScaleDown) scheduleDeleteEmptyNodes(emptyNodes []*apiv1.Node, client k
// If we fail to delete the node we want to remove delete taint
defer func() {
if deleteErr != nil {
deletetaint.CleanToBeDeleted(nodeToDelete, client)
deletetaint.CleanToBeDeleted(nodeToDelete, client, sd.context.CordonNodeBeforeTerminate)
recorder.Eventf(nodeToDelete, apiv1.EventTypeWarning, "ScaleDownFailed", "failed to delete empty node: %v", deleteErr)
} else {
sd.context.LogRecorder.Eventf(apiv1.EventTypeNormal, "ScaleDownEmpty", "Scale-down: empty node %s removed", nodeToDelete.Name)
Expand Down Expand Up @@ -1111,7 +1111,7 @@ func (sd *ScaleDown) deleteNode(node *apiv1.Node, pods []*apiv1.Pod, daemonSetPo
deleteSuccessful := false
drainSuccessful := false

if err := deletetaint.MarkToBeDeleted(node, sd.context.ClientSet); err != nil {
if err := deletetaint.MarkToBeDeleted(node, sd.context.ClientSet, sd.context.CordonNodeBeforeTerminate); err != nil {
sd.context.Recorder.Eventf(node, apiv1.EventTypeWarning, "ScaleDownFailed", "failed to mark the node as toBeDeleted/unschedulable: %v", err)
return status.NodeDeleteResult{ResultType: status.NodeDeleteErrorFailedToMarkToBeDeleted, Err: errors.ToAutoscalerError(errors.ApiCallError, err)}
}
Expand All @@ -1122,7 +1122,7 @@ func (sd *ScaleDown) deleteNode(node *apiv1.Node, pods []*apiv1.Pod, daemonSetPo
// If we fail to evict all the pods from the node we want to remove delete taint
defer func() {
if !deleteSuccessful {
deletetaint.CleanToBeDeleted(node, sd.context.ClientSet)
deletetaint.CleanToBeDeleted(node, sd.context.ClientSet, sd.context.CordonNodeBeforeTerminate)
if !drainSuccessful {
sd.context.Recorder.Eventf(node, apiv1.EventTypeWarning, "ScaleDownFailed", "failed to drain the node, aborting ScaleDown")
} else {
Expand Down
2 changes: 1 addition & 1 deletion cluster-autoscaler/core/static_autoscaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ func (a *StaticAutoscaler) cleanUpIfRequired() {
klog.Errorf("Failed to list ready nodes, not cleaning up taints: %v", err)
} else {
deletetaint.CleanAllToBeDeleted(readyNodes,
a.AutoscalingContext.ClientSet, a.Recorder)
a.AutoscalingContext.ClientSet, a.Recorder, a.CordonNodeBeforeTerminate)
if a.AutoscalingContext.AutoscalingOptions.MaxBulkSoftTaintCount == 0 {
// Clean old taints if soft taints handling is disabled
deletetaint.CleanAllDeletionCandidates(readyNodes,
Expand Down
2 changes: 2 additions & 0 deletions cluster-autoscaler/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,7 @@ var (
awsUseStaticInstanceList = flag.Bool("aws-use-static-instance-list", false, "Should CA fetch instance types in runtime or use a static list. AWS only")
enableProfiling = flag.Bool("profiling", false, "Is debug/pprof endpoint enabled")
clusterAPICloudConfigAuthoritative = flag.Bool("clusterapi-cloud-config-authoritative", false, "Treat the cloud-config flag authoritatively (do not fallback to using kubeconfig flag). ClusterAPI only")
cordonNodeBeforeTerminate = flag.Bool("cordon-node-before-terminating", false, "Should CA cordon nodes before terminating during downscale process")
)

func createAutoscalingOptions() config.AutoscalingOptions {
Expand Down Expand Up @@ -243,6 +244,7 @@ func createAutoscalingOptions() config.AutoscalingOptions {
NodeDeletionDelayTimeout: *nodeDeletionDelayTimeout,
AWSUseStaticInstanceList: *awsUseStaticInstanceList,
ClusterAPICloudConfigAuthoritative: *clusterAPICloudConfigAuthoritative,
CordonNodeBeforeTerminate: *cordonNodeBeforeTerminate,
}
}

Expand Down
39 changes: 24 additions & 15 deletions cluster-autoscaler/utils/deletetaint/delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,16 +57,16 @@ func getKeyShortName(key string) string {
}

// MarkToBeDeleted sets a taint that makes the node unschedulable.
func MarkToBeDeleted(node *apiv1.Node, client kube_client.Interface) error {
return addTaint(node, client, ToBeDeletedTaint, apiv1.TaintEffectNoSchedule)
func MarkToBeDeleted(node *apiv1.Node, client kube_client.Interface, cordonNode bool) error {
return addTaint(node, client, ToBeDeletedTaint, apiv1.TaintEffectNoSchedule, cordonNode)
}

// MarkDeletionCandidate sets a soft taint that makes the node preferably unschedulable.
func MarkDeletionCandidate(node *apiv1.Node, client kube_client.Interface) error {
return addTaint(node, client, DeletionCandidateTaint, apiv1.TaintEffectPreferNoSchedule)
return addTaint(node, client, DeletionCandidateTaint, apiv1.TaintEffectPreferNoSchedule, false)
}

func addTaint(node *apiv1.Node, client kube_client.Interface, taintKey string, effect apiv1.TaintEffect) error {
func addTaint(node *apiv1.Node, client kube_client.Interface, taintKey string, effect apiv1.TaintEffect, cordonNode bool) error {
retryDeadline := time.Now().Add(maxRetryDeadline)
freshNode := node.DeepCopy()
var err error
Expand All @@ -81,7 +81,7 @@ func addTaint(node *apiv1.Node, client kube_client.Interface, taintKey string, e
}
}

if !addTaintToSpec(freshNode, taintKey, effect) {
if !addTaintToSpec(freshNode, taintKey, effect, cordonNode) {
if !refresh {
// Make sure we have the latest version before skipping update.
refresh = true
Expand All @@ -105,7 +105,7 @@ func addTaint(node *apiv1.Node, client kube_client.Interface, taintKey string, e
}
}

func addTaintToSpec(node *apiv1.Node, taintKey string, effect apiv1.TaintEffect) bool {
func addTaintToSpec(node *apiv1.Node, taintKey string, effect apiv1.TaintEffect, cordonNode bool) bool {
for _, taint := range node.Spec.Taints {
if taint.Key == taintKey {
klog.V(2).Infof("%v already present on node %v, taint: %v", taintKey, node.Name, taint)
Expand All @@ -117,6 +117,10 @@ func addTaintToSpec(node *apiv1.Node, taintKey string, effect apiv1.TaintEffect)
Value: fmt.Sprint(time.Now().Unix()),
Effect: effect,
})
if cordonNode {
klog.V(1).Infof("Marking node %v to be cordoned by Cluster Autoscaler", node.Name)
node.Spec.Unschedulable = true
}
return true
}

Expand Down Expand Up @@ -164,16 +168,16 @@ func getTaintTime(node *apiv1.Node, taintKey string) (*time.Time, error) {
}

// CleanToBeDeleted cleans CA's NoSchedule taint from a node.
func CleanToBeDeleted(node *apiv1.Node, client kube_client.Interface) (bool, error) {
return cleanTaint(node, client, ToBeDeletedTaint)
func CleanToBeDeleted(node *apiv1.Node, client kube_client.Interface, cordonNode bool) (bool, error) {
return cleanTaint(node, client, ToBeDeletedTaint, cordonNode)
}

// CleanDeletionCandidate cleans CA's soft NoSchedule taint from a node.
func CleanDeletionCandidate(node *apiv1.Node, client kube_client.Interface) (bool, error) {
return cleanTaint(node, client, DeletionCandidateTaint)
return cleanTaint(node, client, DeletionCandidateTaint, false)
}

func cleanTaint(node *apiv1.Node, client kube_client.Interface, taintKey string) (bool, error) {
func cleanTaint(node *apiv1.Node, client kube_client.Interface, taintKey string, cordonNode bool) (bool, error) {
retryDeadline := time.Now().Add(maxRetryDeadline)
freshNode := node.DeepCopy()
var err error
Expand Down Expand Up @@ -205,6 +209,10 @@ func cleanTaint(node *apiv1.Node, client kube_client.Interface, taintKey string)
}

freshNode.Spec.Taints = newTaints
if cordonNode {
klog.V(1).Infof("Marking node %v to be uncordoned by Cluster Autoscaler", freshNode.Name)
freshNode.Spec.Unschedulable = false
}
_, err = client.CoreV1().Nodes().Update(context.TODO(), freshNode, metav1.UpdateOptions{})

if err != nil && errors.IsConflict(err) && time.Now().Before(retryDeadline) {
Expand All @@ -223,21 +231,22 @@ func cleanTaint(node *apiv1.Node, client kube_client.Interface, taintKey string)
}

// CleanAllToBeDeleted cleans ToBeDeleted taints from given nodes.
func CleanAllToBeDeleted(nodes []*apiv1.Node, client kube_client.Interface, recorder kube_record.EventRecorder) {
cleanAllTaints(nodes, client, recorder, ToBeDeletedTaint)
func CleanAllToBeDeleted(nodes []*apiv1.Node, client kube_client.Interface, recorder kube_record.EventRecorder, cordonNode bool) {
cleanAllTaints(nodes, client, recorder, ToBeDeletedTaint, cordonNode)
}

// CleanAllDeletionCandidates cleans DeletionCandidate taints from given nodes.
func CleanAllDeletionCandidates(nodes []*apiv1.Node, client kube_client.Interface, recorder kube_record.EventRecorder) {
cleanAllTaints(nodes, client, recorder, DeletionCandidateTaint)
cleanAllTaints(nodes, client, recorder, DeletionCandidateTaint, false)
}

func cleanAllTaints(nodes []*apiv1.Node, client kube_client.Interface, recorder kube_record.EventRecorder, taintKey string) {
func cleanAllTaints(nodes []*apiv1.Node, client kube_client.Interface, recorder kube_record.EventRecorder,
taintKey string, cordonNode bool) {
for _, node := range nodes {
if !hasTaint(node, taintKey) {
continue
}
cleaned, err := cleanTaint(node, client, taintKey)
cleaned, err := cleanTaint(node, client, taintKey, cordonNode)
if err != nil {
recorder.Eventf(node, apiv1.EventTypeWarning, "ClusterAutoscalerCleanup",
"failed to clean %v on node %v: %v", getKeyShortName(taintKey), node.Name, err)
Expand Down
58 changes: 50 additions & 8 deletions cluster-autoscaler/utils/deletetaint/delete_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func TestMarkNodes(t *testing.T) {
defer setConflictRetryInterval(setConflictRetryInterval(time.Millisecond))
node := BuildTestNode("node", 1000, 1000)
fakeClient := buildFakeClientWithConflicts(t, node)
err := MarkToBeDeleted(node, fakeClient)
err := MarkToBeDeleted(node, fakeClient, false)
assert.NoError(t, err)

updatedNode := getNode(t, fakeClient, "node")
Expand All @@ -65,7 +65,7 @@ func TestSoftMarkNodes(t *testing.T) {
func TestCheckNodes(t *testing.T) {
defer setConflictRetryInterval(setConflictRetryInterval(time.Millisecond))
node := BuildTestNode("node", 1000, 1000)
addTaintToSpec(node, ToBeDeletedTaint, apiv1.TaintEffectNoSchedule)
addTaintToSpec(node, ToBeDeletedTaint, apiv1.TaintEffectNoSchedule, false)
fakeClient := buildFakeClientWithConflicts(t, node)

updatedNode := getNode(t, fakeClient, "node")
Expand All @@ -76,7 +76,7 @@ func TestCheckNodes(t *testing.T) {
func TestSoftCheckNodes(t *testing.T) {
defer setConflictRetryInterval(setConflictRetryInterval(time.Millisecond))
node := BuildTestNode("node", 1000, 1000)
addTaintToSpec(node, DeletionCandidateTaint, apiv1.TaintEffectPreferNoSchedule)
addTaintToSpec(node, DeletionCandidateTaint, apiv1.TaintEffectPreferNoSchedule, false)
fakeClient := buildFakeClientWithConflicts(t, node)

updatedNode := getNode(t, fakeClient, "node")
Expand All @@ -88,7 +88,7 @@ func TestQueryNodes(t *testing.T) {
defer setConflictRetryInterval(setConflictRetryInterval(time.Millisecond))
node := BuildTestNode("node", 1000, 1000)
fakeClient := buildFakeClientWithConflicts(t, node)
err := MarkToBeDeleted(node, fakeClient)
err := MarkToBeDeleted(node, fakeClient, false)
assert.NoError(t, err)

updatedNode := getNode(t, fakeClient, "node")
Expand Down Expand Up @@ -119,25 +119,67 @@ func TestSoftQueryNodes(t *testing.T) {
func TestCleanNodes(t *testing.T) {
defer setConflictRetryInterval(setConflictRetryInterval(time.Millisecond))
node := BuildTestNode("node", 1000, 1000)
addTaintToSpec(node, ToBeDeletedTaint, apiv1.TaintEffectNoSchedule)
addTaintToSpec(node, ToBeDeletedTaint, apiv1.TaintEffectNoSchedule, false)
fakeClient := buildFakeClientWithConflicts(t, node)

updatedNode := getNode(t, fakeClient, "node")
assert.True(t, HasToBeDeletedTaint(updatedNode))
assert.False(t, updatedNode.Spec.Unschedulable)

cleaned, err := CleanToBeDeleted(node, fakeClient)
cleaned, err := CleanToBeDeleted(node, fakeClient, false)
assert.True(t, cleaned)
assert.NoError(t, err)

updatedNode = getNode(t, fakeClient, "node")
assert.NoError(t, err)
assert.False(t, HasToBeDeletedTaint(updatedNode))
assert.False(t, updatedNode.Spec.Unschedulable)
}

func TestCleanNodesWithCordon(t *testing.T) {
defer setConflictRetryInterval(setConflictRetryInterval(time.Millisecond))
node := BuildTestNode("node", 1000, 1000)
addTaintToSpec(node, ToBeDeletedTaint, apiv1.TaintEffectNoSchedule, true)
fakeClient := buildFakeClientWithConflicts(t, node)

updatedNode := getNode(t, fakeClient, "node")
assert.True(t, HasToBeDeletedTaint(updatedNode))
assert.True(t, updatedNode.Spec.Unschedulable)

cleaned, err := CleanToBeDeleted(node, fakeClient, true)
assert.True(t, cleaned)
assert.NoError(t, err)

updatedNode = getNode(t, fakeClient, "node")
assert.NoError(t, err)
assert.False(t, HasToBeDeletedTaint(updatedNode))
assert.False(t, updatedNode.Spec.Unschedulable)
}

func TestCleanNodesWithCordonOnOff(t *testing.T) {
defer setConflictRetryInterval(setConflictRetryInterval(time.Millisecond))
node := BuildTestNode("node", 1000, 1000)
addTaintToSpec(node, ToBeDeletedTaint, apiv1.TaintEffectNoSchedule, true)
fakeClient := buildFakeClientWithConflicts(t, node)

updatedNode := getNode(t, fakeClient, "node")
assert.True(t, HasToBeDeletedTaint(updatedNode))
assert.True(t, updatedNode.Spec.Unschedulable)

cleaned, err := CleanToBeDeleted(node, fakeClient, false)
assert.True(t, cleaned)
assert.NoError(t, err)

updatedNode = getNode(t, fakeClient, "node")
assert.NoError(t, err)
assert.False(t, HasToBeDeletedTaint(updatedNode))
assert.True(t, updatedNode.Spec.Unschedulable)
}

func TestSoftCleanNodes(t *testing.T) {
defer setConflictRetryInterval(setConflictRetryInterval(time.Millisecond))
node := BuildTestNode("node", 1000, 1000)
addTaintToSpec(node, DeletionCandidateTaint, apiv1.TaintEffectPreferNoSchedule)
addTaintToSpec(node, DeletionCandidateTaint, apiv1.TaintEffectPreferNoSchedule, false)
fakeClient := buildFakeClientWithConflicts(t, node)

updatedNode := getNode(t, fakeClient, "node")
Expand All @@ -162,7 +204,7 @@ func TestCleanAllToBeDeleted(t *testing.T) {

assert.Equal(t, 1, len(getNode(t, fakeClient, "n2").Spec.Taints))

CleanAllToBeDeleted([]*apiv1.Node{n1, n2}, fakeClient, fakeRecorder)
CleanAllToBeDeleted([]*apiv1.Node{n1, n2}, fakeClient, fakeRecorder, false)

assert.Equal(t, 0, len(getNode(t, fakeClient, "n1").Spec.Taints))
assert.Equal(t, 0, len(getNode(t, fakeClient, "n2").Spec.Taints))
Expand Down

0 comments on commit c7978e2

Please sign in to comment.