Skip to content

Commit

Permalink
Adding functionality to cordon the node before destroying it. This he…
Browse files Browse the repository at this point in the history
…lps load balancer to remove the node from healthy hosts (ALB does have this support).

This won't fix the issue of 502 completely as there is some time node has to live even after cordoning as to serve In-Flight request but load balancer can be configured to remove Cordon nodes from healthy host list.
This feature is enabled by cordon-node-before-terminating flag with default value as false to retain existing behavior.
  • Loading branch information
atulaggarwal authored and matthias50 committed Aug 12, 2021
1 parent 6244a09 commit 5edabf4
Show file tree
Hide file tree
Showing 6 changed files with 83 additions and 28 deletions.
2 changes: 2 additions & 0 deletions cluster-autoscaler/config/autoscaling_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,4 +145,6 @@ type AutoscalingOptions struct {
// ClusterAPICloudConfigAuthoritative tells the Cluster API provider to treat the CloudConfig option as authoritative and
// not use KubeConfigPath as a fallback when it is not provided.
ClusterAPICloudConfigAuthoritative bool
// Enable or disable cordon nodes functionality before terminating the node during downscale process
CordonNodeBeforeTerminate bool
}
8 changes: 4 additions & 4 deletions cluster-autoscaler/core/scale_down.go
Original file line number Diff line number Diff line change
Expand Up @@ -1059,7 +1059,7 @@ func (sd *ScaleDown) scheduleDeleteEmptyNodes(emptyNodes []*apiv1.Node, client k
return deletedNodes, errors.NewAutoscalerError(
errors.CloudProviderError, "failed to find node group for %s", node.Name)
}
taintErr := deletetaint.MarkToBeDeleted(node, client)
taintErr := deletetaint.MarkToBeDeleted(node, client, sd.context.CordonNodeBeforeTerminate)
if taintErr != nil {
recorder.Eventf(node, apiv1.EventTypeWarning, "ScaleDownFailed", "failed to mark the node as toBeDeleted/unschedulable: %v", taintErr)
return deletedNodes, errors.ToAutoscalerError(errors.ApiCallError, taintErr)
Expand All @@ -1075,7 +1075,7 @@ func (sd *ScaleDown) scheduleDeleteEmptyNodes(emptyNodes []*apiv1.Node, client k
// If we fail to delete the node we want to remove delete taint
defer func() {
if deleteErr != nil {
deletetaint.CleanToBeDeleted(nodeToDelete, client)
deletetaint.CleanToBeDeleted(nodeToDelete, client, sd.context.CordonNodeBeforeTerminate)
recorder.Eventf(nodeToDelete, apiv1.EventTypeWarning, "ScaleDownFailed", "failed to delete empty node: %v", deleteErr)
} else {
sd.context.LogRecorder.Eventf(apiv1.EventTypeNormal, "ScaleDownEmpty", "Scale-down: empty node %s removed", nodeToDelete.Name)
Expand Down Expand Up @@ -1111,7 +1111,7 @@ func (sd *ScaleDown) deleteNode(node *apiv1.Node, pods []*apiv1.Pod, daemonSetPo
deleteSuccessful := false
drainSuccessful := false

if err := deletetaint.MarkToBeDeleted(node, sd.context.ClientSet); err != nil {
if err := deletetaint.MarkToBeDeleted(node, sd.context.ClientSet, sd.context.CordonNodeBeforeTerminate); err != nil {
sd.context.Recorder.Eventf(node, apiv1.EventTypeWarning, "ScaleDownFailed", "failed to mark the node as toBeDeleted/unschedulable: %v", err)
return status.NodeDeleteResult{ResultType: status.NodeDeleteErrorFailedToMarkToBeDeleted, Err: errors.ToAutoscalerError(errors.ApiCallError, err)}
}
Expand All @@ -1122,7 +1122,7 @@ func (sd *ScaleDown) deleteNode(node *apiv1.Node, pods []*apiv1.Pod, daemonSetPo
// If we fail to evict all the pods from the node we want to remove delete taint
defer func() {
if !deleteSuccessful {
deletetaint.CleanToBeDeleted(node, sd.context.ClientSet)
deletetaint.CleanToBeDeleted(node, sd.context.ClientSet, sd.context.CordonNodeBeforeTerminate)
if !drainSuccessful {
sd.context.Recorder.Eventf(node, apiv1.EventTypeWarning, "ScaleDownFailed", "failed to drain the node, aborting ScaleDown")
} else {
Expand Down
2 changes: 1 addition & 1 deletion cluster-autoscaler/core/static_autoscaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ func (a *StaticAutoscaler) cleanUpIfRequired() {
klog.Errorf("Failed to list ready nodes, not cleaning up taints: %v", err)
} else {
deletetaint.CleanAllToBeDeleted(readyNodes,
a.AutoscalingContext.ClientSet, a.Recorder)
a.AutoscalingContext.ClientSet, a.Recorder, a.CordonNodeBeforeTerminate)
if a.AutoscalingContext.AutoscalingOptions.MaxBulkSoftTaintCount == 0 {
// Clean old taints if soft taints handling is disabled
deletetaint.CleanAllDeletionCandidates(readyNodes,
Expand Down
2 changes: 2 additions & 0 deletions cluster-autoscaler/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,7 @@ var (
awsUseStaticInstanceList = flag.Bool("aws-use-static-instance-list", false, "Should CA fetch instance types in runtime or use a static list. AWS only")
enableProfiling = flag.Bool("profiling", false, "Is debug/pprof endpoint enabled")
clusterAPICloudConfigAuthoritative = flag.Bool("clusterapi-cloud-config-authoritative", false, "Treat the cloud-config flag authoritatively (do not fallback to using kubeconfig flag). ClusterAPI only")
cordonNodeBeforeTerminate = flag.Bool("cordon-node-before-terminating", false, "Should CA cordon nodes before terminating during downscale process")
)

func createAutoscalingOptions() config.AutoscalingOptions {
Expand Down Expand Up @@ -243,6 +244,7 @@ func createAutoscalingOptions() config.AutoscalingOptions {
NodeDeletionDelayTimeout: *nodeDeletionDelayTimeout,
AWSUseStaticInstanceList: *awsUseStaticInstanceList,
ClusterAPICloudConfigAuthoritative: *clusterAPICloudConfigAuthoritative,
CordonNodeBeforeTerminate: *cordonNodeBeforeTerminate,
}
}

Expand Down
39 changes: 24 additions & 15 deletions cluster-autoscaler/utils/deletetaint/delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,16 +57,16 @@ func getKeyShortName(key string) string {
}

// MarkToBeDeleted sets a taint that makes the node unschedulable.
func MarkToBeDeleted(node *apiv1.Node, client kube_client.Interface) error {
return addTaint(node, client, ToBeDeletedTaint, apiv1.TaintEffectNoSchedule)
func MarkToBeDeleted(node *apiv1.Node, client kube_client.Interface, cordonNode bool) error {
return addTaint(node, client, ToBeDeletedTaint, apiv1.TaintEffectNoSchedule, cordonNode)
}

// MarkDeletionCandidate sets a soft taint that makes the node preferably unschedulable.
func MarkDeletionCandidate(node *apiv1.Node, client kube_client.Interface) error {
return addTaint(node, client, DeletionCandidateTaint, apiv1.TaintEffectPreferNoSchedule)
return addTaint(node, client, DeletionCandidateTaint, apiv1.TaintEffectPreferNoSchedule, false)
}

func addTaint(node *apiv1.Node, client kube_client.Interface, taintKey string, effect apiv1.TaintEffect) error {
func addTaint(node *apiv1.Node, client kube_client.Interface, taintKey string, effect apiv1.TaintEffect, cordonNode bool) error {
retryDeadline := time.Now().Add(maxRetryDeadline)
freshNode := node.DeepCopy()
var err error
Expand All @@ -81,7 +81,7 @@ func addTaint(node *apiv1.Node, client kube_client.Interface, taintKey string, e
}
}

if !addTaintToSpec(freshNode, taintKey, effect) {
if !addTaintToSpec(freshNode, taintKey, effect, cordonNode) {
if !refresh {
// Make sure we have the latest version before skipping update.
refresh = true
Expand All @@ -105,7 +105,7 @@ func addTaint(node *apiv1.Node, client kube_client.Interface, taintKey string, e
}
}

func addTaintToSpec(node *apiv1.Node, taintKey string, effect apiv1.TaintEffect) bool {
func addTaintToSpec(node *apiv1.Node, taintKey string, effect apiv1.TaintEffect, cordonNode bool) bool {
for _, taint := range node.Spec.Taints {
if taint.Key == taintKey {
klog.V(2).Infof("%v already present on node %v, taint: %v", taintKey, node.Name, taint)
Expand All @@ -117,6 +117,10 @@ func addTaintToSpec(node *apiv1.Node, taintKey string, effect apiv1.TaintEffect)
Value: fmt.Sprint(time.Now().Unix()),
Effect: effect,
})
if cordonNode {
klog.V(1).Infof("Marking node %v to be cordoned by Cluster Autoscaler", node.Name)
node.Spec.Unschedulable = true
}
return true
}

Expand Down Expand Up @@ -164,16 +168,16 @@ func getTaintTime(node *apiv1.Node, taintKey string) (*time.Time, error) {
}

// CleanToBeDeleted cleans CA's NoSchedule taint from a node.
func CleanToBeDeleted(node *apiv1.Node, client kube_client.Interface) (bool, error) {
return cleanTaint(node, client, ToBeDeletedTaint)
func CleanToBeDeleted(node *apiv1.Node, client kube_client.Interface, cordonNode bool) (bool, error) {
return cleanTaint(node, client, ToBeDeletedTaint, cordonNode)
}

// CleanDeletionCandidate cleans CA's soft NoSchedule taint from a node.
func CleanDeletionCandidate(node *apiv1.Node, client kube_client.Interface) (bool, error) {
return cleanTaint(node, client, DeletionCandidateTaint)
return cleanTaint(node, client, DeletionCandidateTaint, false)
}

func cleanTaint(node *apiv1.Node, client kube_client.Interface, taintKey string) (bool, error) {
func cleanTaint(node *apiv1.Node, client kube_client.Interface, taintKey string, cordonNode bool) (bool, error) {
retryDeadline := time.Now().Add(maxRetryDeadline)
freshNode := node.DeepCopy()
var err error
Expand Down Expand Up @@ -205,6 +209,10 @@ func cleanTaint(node *apiv1.Node, client kube_client.Interface, taintKey string)
}

freshNode.Spec.Taints = newTaints
if cordonNode {
klog.V(1).Infof("Marking node %v to be uncordoned by Cluster Autoscaler", freshNode.Name)
freshNode.Spec.Unschedulable = false
}
_, err = client.CoreV1().Nodes().Update(context.TODO(), freshNode, metav1.UpdateOptions{})

if err != nil && errors.IsConflict(err) && time.Now().Before(retryDeadline) {
Expand All @@ -223,21 +231,22 @@ func cleanTaint(node *apiv1.Node, client kube_client.Interface, taintKey string)
}

// CleanAllToBeDeleted cleans ToBeDeleted taints from given nodes.
func CleanAllToBeDeleted(nodes []*apiv1.Node, client kube_client.Interface, recorder kube_record.EventRecorder) {
cleanAllTaints(nodes, client, recorder, ToBeDeletedTaint)
func CleanAllToBeDeleted(nodes []*apiv1.Node, client kube_client.Interface, recorder kube_record.EventRecorder, cordonNode bool) {
cleanAllTaints(nodes, client, recorder, ToBeDeletedTaint, cordonNode)
}

// CleanAllDeletionCandidates cleans DeletionCandidate taints from given nodes.
func CleanAllDeletionCandidates(nodes []*apiv1.Node, client kube_client.Interface, recorder kube_record.EventRecorder) {
cleanAllTaints(nodes, client, recorder, DeletionCandidateTaint)
cleanAllTaints(nodes, client, recorder, DeletionCandidateTaint, false)
}

func cleanAllTaints(nodes []*apiv1.Node, client kube_client.Interface, recorder kube_record.EventRecorder, taintKey string) {
func cleanAllTaints(nodes []*apiv1.Node, client kube_client.Interface, recorder kube_record.EventRecorder,
taintKey string, cordonNode bool) {
for _, node := range nodes {
if !hasTaint(node, taintKey) {
continue
}
cleaned, err := cleanTaint(node, client, taintKey)
cleaned, err := cleanTaint(node, client, taintKey, cordonNode)
if err != nil {
recorder.Eventf(node, apiv1.EventTypeWarning, "ClusterAutoscalerCleanup",
"failed to clean %v on node %v: %v", getKeyShortName(taintKey), node.Name, err)
Expand Down
58 changes: 50 additions & 8 deletions cluster-autoscaler/utils/deletetaint/delete_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func TestMarkNodes(t *testing.T) {
defer setConflictRetryInterval(setConflictRetryInterval(time.Millisecond))
node := BuildTestNode("node", 1000, 1000)
fakeClient := buildFakeClientWithConflicts(t, node)
err := MarkToBeDeleted(node, fakeClient)
err := MarkToBeDeleted(node, fakeClient, false)
assert.NoError(t, err)

updatedNode := getNode(t, fakeClient, "node")
Expand All @@ -65,7 +65,7 @@ func TestSoftMarkNodes(t *testing.T) {
func TestCheckNodes(t *testing.T) {
defer setConflictRetryInterval(setConflictRetryInterval(time.Millisecond))
node := BuildTestNode("node", 1000, 1000)
addTaintToSpec(node, ToBeDeletedTaint, apiv1.TaintEffectNoSchedule)
addTaintToSpec(node, ToBeDeletedTaint, apiv1.TaintEffectNoSchedule, false)
fakeClient := buildFakeClientWithConflicts(t, node)

updatedNode := getNode(t, fakeClient, "node")
Expand All @@ -76,7 +76,7 @@ func TestCheckNodes(t *testing.T) {
func TestSoftCheckNodes(t *testing.T) {
defer setConflictRetryInterval(setConflictRetryInterval(time.Millisecond))
node := BuildTestNode("node", 1000, 1000)
addTaintToSpec(node, DeletionCandidateTaint, apiv1.TaintEffectPreferNoSchedule)
addTaintToSpec(node, DeletionCandidateTaint, apiv1.TaintEffectPreferNoSchedule, false)
fakeClient := buildFakeClientWithConflicts(t, node)

updatedNode := getNode(t, fakeClient, "node")
Expand All @@ -88,7 +88,7 @@ func TestQueryNodes(t *testing.T) {
defer setConflictRetryInterval(setConflictRetryInterval(time.Millisecond))
node := BuildTestNode("node", 1000, 1000)
fakeClient := buildFakeClientWithConflicts(t, node)
err := MarkToBeDeleted(node, fakeClient)
err := MarkToBeDeleted(node, fakeClient, false)
assert.NoError(t, err)

updatedNode := getNode(t, fakeClient, "node")
Expand Down Expand Up @@ -119,25 +119,67 @@ func TestSoftQueryNodes(t *testing.T) {
func TestCleanNodes(t *testing.T) {
defer setConflictRetryInterval(setConflictRetryInterval(time.Millisecond))
node := BuildTestNode("node", 1000, 1000)
addTaintToSpec(node, ToBeDeletedTaint, apiv1.TaintEffectNoSchedule)
addTaintToSpec(node, ToBeDeletedTaint, apiv1.TaintEffectNoSchedule, false)
fakeClient := buildFakeClientWithConflicts(t, node)

updatedNode := getNode(t, fakeClient, "node")
assert.True(t, HasToBeDeletedTaint(updatedNode))
assert.False(t, updatedNode.Spec.Unschedulable)

cleaned, err := CleanToBeDeleted(node, fakeClient)
cleaned, err := CleanToBeDeleted(node, fakeClient, false)
assert.True(t, cleaned)
assert.NoError(t, err)

updatedNode = getNode(t, fakeClient, "node")
assert.NoError(t, err)
assert.False(t, HasToBeDeletedTaint(updatedNode))
assert.False(t, updatedNode.Spec.Unschedulable)
}

func TestCleanNodesWithCordon(t *testing.T) {
defer setConflictRetryInterval(setConflictRetryInterval(time.Millisecond))
node := BuildTestNode("node", 1000, 1000)
addTaintToSpec(node, ToBeDeletedTaint, apiv1.TaintEffectNoSchedule, true)
fakeClient := buildFakeClientWithConflicts(t, node)

updatedNode := getNode(t, fakeClient, "node")
assert.True(t, HasToBeDeletedTaint(updatedNode))
assert.True(t, updatedNode.Spec.Unschedulable)

cleaned, err := CleanToBeDeleted(node, fakeClient, true)
assert.True(t, cleaned)
assert.NoError(t, err)

updatedNode = getNode(t, fakeClient, "node")
assert.NoError(t, err)
assert.False(t, HasToBeDeletedTaint(updatedNode))
assert.False(t, updatedNode.Spec.Unschedulable)
}

func TestCleanNodesWithCordonOnOff(t *testing.T) {
defer setConflictRetryInterval(setConflictRetryInterval(time.Millisecond))
node := BuildTestNode("node", 1000, 1000)
addTaintToSpec(node, ToBeDeletedTaint, apiv1.TaintEffectNoSchedule, true)
fakeClient := buildFakeClientWithConflicts(t, node)

updatedNode := getNode(t, fakeClient, "node")
assert.True(t, HasToBeDeletedTaint(updatedNode))
assert.True(t, updatedNode.Spec.Unschedulable)

cleaned, err := CleanToBeDeleted(node, fakeClient, false)
assert.True(t, cleaned)
assert.NoError(t, err)

updatedNode = getNode(t, fakeClient, "node")
assert.NoError(t, err)
assert.False(t, HasToBeDeletedTaint(updatedNode))
assert.True(t, updatedNode.Spec.Unschedulable)
}

func TestSoftCleanNodes(t *testing.T) {
defer setConflictRetryInterval(setConflictRetryInterval(time.Millisecond))
node := BuildTestNode("node", 1000, 1000)
addTaintToSpec(node, DeletionCandidateTaint, apiv1.TaintEffectPreferNoSchedule)
addTaintToSpec(node, DeletionCandidateTaint, apiv1.TaintEffectPreferNoSchedule, false)
fakeClient := buildFakeClientWithConflicts(t, node)

updatedNode := getNode(t, fakeClient, "node")
Expand All @@ -162,7 +204,7 @@ func TestCleanAllToBeDeleted(t *testing.T) {

assert.Equal(t, 1, len(getNode(t, fakeClient, "n2").Spec.Taints))

CleanAllToBeDeleted([]*apiv1.Node{n1, n2}, fakeClient, fakeRecorder)
CleanAllToBeDeleted([]*apiv1.Node{n1, n2}, fakeClient, fakeRecorder, false)

assert.Equal(t, 0, len(getNode(t, fakeClient, "n1").Spec.Taints))
assert.Equal(t, 0, len(getNode(t, fakeClient, "n2").Spec.Taints))
Expand Down

0 comments on commit 5edabf4

Please sign in to comment.