Introduce NodeDeleterBatcher to ScaleDown actuator
yaroslava-serdiuk committed Aug 29, 2022
1 parent db5e2f2 commit ebca4f9
Showing 12 changed files with 687 additions and 157 deletions.
2 changes: 2 additions & 0 deletions cluster-autoscaler/config/autoscaling_options.go
@@ -192,4 +192,6 @@ type AutoscalingOptions struct {
// MaxNodeGroupBinpackingDuration is a maximum time that can be spent binpacking a single NodeGroup. If the threshold
// is exceeded binpacking will be cut short and a partial scale-up will be performed.
MaxNodeGroupBinpackingDuration time.Duration
// NodeBatchDeletionInterval is how long the CA ScaleDown actuator gathers nodes before deleting them in a batch.
NodeBatchDeletionInterval time.Duration
}
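For context on how NodeBatchDeletionInterval is meant to be used, here is a minimal, self-contained sketch (not the code added by this commit) of a batcher that gathers nodes per node group and deletes each group's batch in a single call once the interval elapses. Node, deleteNodesFn, batcher, and the explicit nodeGroup argument are stand-ins; the real NodeDeletionBatcher lives in cluster-autoscaler/core/scaledown/deletionbatcher, takes only the node in AddNode, and resolves the node group via the cloud provider.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// Node stands in for *apiv1.Node; the real batcher works on Kubernetes node objects.
type Node struct {
	Name string
}

// deleteNodesFn stands in for the cloud-provider call that removes a whole batch of
// nodes from a single node group.
type deleteNodesFn func(nodeGroup string, nodes []*Node) error

// batcher gathers nodes per node group and deletes each group's batch once the
// configured interval has elapsed since the first node was queued for that group.
type batcher struct {
	mu          sync.Mutex
	interval    time.Duration
	pending     map[string][]*Node
	deleteBatch deleteNodesFn
}

func newBatcher(interval time.Duration, del deleteNodesFn) *batcher {
	return &batcher{interval: interval, pending: map[string][]*Node{}, deleteBatch: del}
}

// AddNode queues a node for deletion. The first node queued for a node group arms a
// timer that flushes that group's batch after the batching interval.
func (b *batcher) AddNode(nodeGroup string, node *Node) error {
	b.mu.Lock()
	defer b.mu.Unlock()
	first := len(b.pending[nodeGroup]) == 0
	b.pending[nodeGroup] = append(b.pending[nodeGroup], node)
	if first {
		time.AfterFunc(b.interval, func() { b.flush(nodeGroup) })
	}
	return nil
}

// flush takes the group's pending batch and issues a single delete call for it.
func (b *batcher) flush(nodeGroup string) {
	b.mu.Lock()
	nodes := b.pending[nodeGroup]
	delete(b.pending, nodeGroup)
	b.mu.Unlock()
	if err := b.deleteBatch(nodeGroup, nodes); err != nil {
		fmt.Printf("batch deletion for %s failed: %v\n", nodeGroup, err)
	}
}

func main() {
	b := newBatcher(2*time.Second, func(group string, nodes []*Node) error {
		fmt.Printf("deleting %d nodes from %s in one call\n", len(nodes), group)
		return nil
	})
	_ = b.AddNode("ng-1", &Node{Name: "node-a"})
	_ = b.AddNode("ng-1", &Node{Name: "node-b"})
	time.Sleep(3 * time.Second) // give the flush timer time to fire
}
```

Batching per node group matters because many cloud providers can delete several instances of the same group in one API call, so gathering nodes for NodeBatchDeletionInterval trades a small delay for fewer provider calls.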
98 changes: 47 additions & 51 deletions cluster-autoscaler/core/scaledown/actuation/actuator.go
@@ -27,30 +27,32 @@ import (
"k8s.io/autoscaler/cluster-autoscaler/clusterstate"
"k8s.io/autoscaler/cluster-autoscaler/context"
"k8s.io/autoscaler/cluster-autoscaler/core/scaledown"
"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/deletionbatcher"
"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/deletiontracker"
"k8s.io/autoscaler/cluster-autoscaler/metrics"
"k8s.io/autoscaler/cluster-autoscaler/processors/status"
"k8s.io/autoscaler/cluster-autoscaler/simulator/utilization"
"k8s.io/autoscaler/cluster-autoscaler/utils/deletetaint"
"k8s.io/autoscaler/cluster-autoscaler/utils/errors"
"k8s.io/autoscaler/cluster-autoscaler/utils/gpu"
"k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes"
kube_record "k8s.io/client-go/tools/record"
)

// Actuator is responsible for draining and deleting nodes.
type Actuator struct {
ctx *context.AutoscalingContext
clusterState *clusterstate.ClusterStateRegistry
nodeDeletionTracker *deletiontracker.NodeDeletionTracker
nodeDeletionBatcher deletionbatcher.NodeBatcherDeleter
evictor Evictor
}

// NewActuator returns a new instance of Actuator.
func NewActuator(ctx *context.AutoscalingContext, csr *clusterstate.ClusterStateRegistry, ndr *deletiontracker.NodeDeletionTracker) *Actuator {
func NewActuator(ctx *context.AutoscalingContext, csr *clusterstate.ClusterStateRegistry, ndr *deletiontracker.NodeDeletionTracker, nbd deletionbatcher.NodeBatcherDeleter) *Actuator {
return &Actuator{
ctx: ctx,
clusterState: csr,
nodeDeletionTracker: ndr,
nodeDeletionBatcher: nbd,
evictor: NewDefaultEvictor(),
}
}
@@ -152,17 +154,13 @@ func (a *Actuator) taintSyncDeleteAsyncEmpty(empty []*apiv1.Node) (scaledDownNod
} else {
klog.Errorf("Scale-down: couldn't report scaled down node, err: %v", err)
}

go func(node *apiv1.Node) {
result := a.deleteNode(node, false)
if result.Err == nil {
a.ctx.LogRecorder.Eventf(apiv1.EventTypeNormal, "ScaleDownEmpty", "Scale-down: empty node %s removed", node.Name)
} else {
klog.Errorf("Scale-down: couldn't delete empty node, err: %v", err)
a.ctx.Recorder.Eventf(node, apiv1.EventTypeWarning, "ScaleDownFailed", "failed to delete empty node: %v", result.Err)
_, _ = deletetaint.CleanToBeDeleted(node, a.ctx.ClientSet, a.ctx.CordonNodeBeforeTerminate)
}
}(emptyNode)
nodeGroup, err := a.ctx.CloudProvider.NodeGroupForNode(emptyNode)
if err != nil || nodeGroup == nil {
klog.Errorf("Failed to find node group for %s: %v", emptyNode.Name, err)
continue
}
a.nodeDeletionTracker.StartDeletion(nodeGroup.Id(), emptyNode.Name)
go a.scheduleDeletion(emptyNode, false)
}
return scaledDownNodes, nil
}
@@ -197,17 +195,13 @@ func (a *Actuator) deleteAsyncDrain(drain []*apiv1.Node) (scaledDownNodes []*sta
} else {
klog.Errorf("Scale-down: couldn't report scaled down node, err: %v", err)
}

go func(node *apiv1.Node) {
result := a.deleteNode(node, true)
if result.Err == nil {
a.ctx.LogRecorder.Eventf(apiv1.EventTypeNormal, "ScaleDown", "Scale-down: node %s removed with drain", node.Name)
} else {
klog.Errorf("Scale-down: couldn't delete node %q with drain, err: %v", node.Name, result.Err)
a.ctx.Recorder.Eventf(node, apiv1.EventTypeWarning, "ScaleDownFailed", "failed to drain and delete node: %v", result.Err)
_, _ = deletetaint.CleanToBeDeleted(node, a.ctx.ClientSet, a.ctx.CordonNodeBeforeTerminate)
}
}(drainNode)
nodeGroup, err := a.ctx.CloudProvider.NodeGroupForNode(drainNode)
if err != nil {
klog.Errorf("Failed to find node group for %s: %v", drainNode.Name, err)
continue
}
a.nodeDeletionTracker.StartDeletionWithDrain(nodeGroup.Id(), drainNode.Name)
go a.scheduleDeletion(drainNode, true)
}
return scaledDownNodes
}
@@ -251,41 +245,47 @@ func (a *Actuator) taintNode(node *apiv1.Node) error {
return nil
}

// deleteNode performs the deletion of the provided node. If drain is true, the node is drained before being deleted.
func (a *Actuator) deleteNode(node *apiv1.Node, drain bool) (result status.NodeDeleteResult) {
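// prepareNodeForDeletion drains the node (or, for an empty node, best-effort evicts its DaemonSet pods) and waits for the configured deletion delay; it does not delete the node from the cloud provider.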
func (a *Actuator) prepareNodeForDeletion(node *apiv1.Node, drain bool) status.NodeDeleteResult {
nodeGroup, err := a.ctx.CloudProvider.NodeGroupForNode(node)
if err != nil {
return status.NodeDeleteResult{ResultType: status.NodeDeleteErrorInternal, Err: errors.NewAutoscalerError(errors.CloudProviderError, "failed to find node group for %s: %v", node.Name, err)}
return status.NodeDeleteResult{ResultType: status.NodeDeleteErrorInternal, Err: errors.NewAutoscalerError(errors.InternalError, "NodeGroupForNode returned error: %v %s", node.Name, err)}
}
if nodeGroup == nil || reflect.ValueOf(nodeGroup).IsNil() {
return status.NodeDeleteResult{ResultType: status.NodeDeleteErrorInternal, Err: errors.NewAutoscalerError(errors.InternalError, "picked node that doesn't belong to a node group: %s", node.Name)}
}

defer func() { a.nodeDeletionTracker.EndDeletion(nodeGroup.Id(), node.Name, result) }()
if drain {
a.nodeDeletionTracker.StartDeletionWithDrain(nodeGroup.Id(), node.Name)
if evictionResults, err := a.evictor.DrainNode(a.ctx, node); err != nil {
return status.NodeDeleteResult{ResultType: status.NodeDeleteErrorFailedToEvictPods, Err: err, PodEvictionResults: evictionResults}
}
} else {
a.nodeDeletionTracker.StartDeletion(nodeGroup.Id(), node.Name)
if err := a.evictor.EvictDaemonSetPods(a.ctx, node, time.Now()); err != nil {
// Evicting DS pods is best-effort, so proceed with the deletion even if there are errors.
klog.Warningf("Error while evicting DS pods from an empty node %q: %v", node.Name, err)
}
}

if err := WaitForDelayDeletion(node, a.ctx.ListerRegistry.AllNodeLister(), a.ctx.AutoscalingOptions.NodeDeletionDelayTimeout); err != nil {
return status.NodeDeleteResult{ResultType: status.NodeDeleteErrorFailedToDelete, Err: err}
}
return status.NodeDeleteResult{ResultType: status.NodeDeleteOk}
}

if err := DeleteNodeFromCloudProvider(a.ctx, node, a.clusterState); err != nil {
return status.NodeDeleteResult{ResultType: status.NodeDeleteErrorFailedToDelete, Err: err}
// scheduleDeletion schedules the deletion of the provided node by adding it to the NodeDeletionBatcher. If drain is true, the node is drained before being deleted.
func (a *Actuator) scheduleDeletion(node *apiv1.Node, drain bool) {
status := a.prepareNodeForDeletion(node, drain)
if status.Err != nil {
RecordFailedScaleDownEvent(node, drain, a.ctx.Recorder, "prepareNodeForDelition failed", status.Err)
_, _ = deletetaint.CleanToBeDeleted(node, a.ctx.ClientSet, a.ctx.CordonNodeBeforeTerminate)
nodeGroup, err := a.ctx.CloudProvider.NodeGroupForNode(node)
if err != nil {
return
}
a.nodeDeletionTracker.EndDeletion(nodeGroup.Id(), node.Name, status)
return
}
err := a.nodeDeletionBatcher.AddNode(node)
if err != nil {
klog.Errorf("Couldn't add node to nodeDeletionBatcher, err: %v", err)
}

metrics.RegisterScaleDown(1, gpu.GetGpuTypeForMetrics(a.ctx.CloudProvider.GPULabel(), a.ctx.CloudProvider.GetAvailableGPUTypes(), node, nodeGroup), nodeScaleDownReason(node, drain))

return status.NodeDeleteResult{ResultType: status.NodeDeleteOk}
}

func min(x, y int) int {
Expand All @@ -303,18 +303,14 @@ func joinPodNames(pods []*apiv1.Pod) string {
return strings.Join(names, ",")
}

func nodeScaleDownReason(node *apiv1.Node, drain bool) metrics.NodeScaleDownReason {
readiness, err := kubernetes.GetNodeReadiness(node)
if err != nil {
klog.Errorf("Couldn't determine node %q readiness while scaling down - assuming unready: %v", node.Name, err)
return metrics.Unready
}
if !readiness.Ready {
return metrics.Unready
}
// Node is ready.
// RecordFailedScaleDownEvent records a failed scale-down event and logs an error.
func RecordFailedScaleDownEvent(node *apiv1.Node, drain bool, recorder kube_record.EventRecorder, errMsg string, statusErr error) {
if drain {
return metrics.Underutilized
klog.Errorf("Scale-down: couldn't delete node %q with drain, %v, status error: %v", node.Name, errMsg, statusErr)
recorder.Eventf(node, apiv1.EventTypeWarning, "ScaleDownFailed", "failed to drain and delete node: %v", statusErr)

} else {
klog.Errorf("Scale-down: couldn't delete empty node, %v, status error: %v", errMsg, statusErr)
recorder.Eventf(node, apiv1.EventTypeWarning, "ScaleDownFailed", "failed to delete empty node: %v", statusErr)
}
return metrics.Empty
}
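The actuator.go changes above split responsibilities: the actuator taints the node, records StartDeletion/StartDeletionWithDrain, prepares the node (drain or best-effort DaemonSet eviction plus the deletion delay), and then hands it to the batcher; the actual cloud-provider delete no longer happens here. The sketch below, using stand-in types rather than the commit's real ones, illustrates the resulting NodeDeletionTracker handoff as visible from this file: the actuator closes out the tracked deletion only on the preparation-failure path, while on success that is left to the batcher, whose implementation is not part of this excerpt.

```go
package main

import (
	"errors"
	"fmt"
)

// nodeDeleteResult and deletionTracker are stand-ins for status.NodeDeleteResult and
// deletiontracker.NodeDeletionTracker; only the calls that appear in this diff are modeled.
type nodeDeleteResult struct{ Err error }

type deletionTracker struct{}

func (deletionTracker) StartDeletion(nodeGroup, node string) {
	fmt.Printf("start deletion of %s/%s\n", nodeGroup, node)
}

func (deletionTracker) EndDeletion(nodeGroup, node string, r nodeDeleteResult) {
	fmt.Printf("end deletion of %s/%s, err=%v\n", nodeGroup, node, r.Err)
}

// prepare stands in for prepareNodeForDeletion: drain or evict DaemonSet pods and wait
// for the deletion delay, without touching the cloud provider.
func prepare(node string, drain bool) nodeDeleteResult {
	if node == "bad-node" {
		return nodeDeleteResult{Err: errors.New("drain failed")}
	}
	return nodeDeleteResult{}
}

// addToBatcher stands in for nodeDeletionBatcher.AddNode; the real batcher performs the
// cloud-provider delete later and is then the one expected to end the tracked deletion.
func addToBatcher(tracker deletionTracker, nodeGroup, node string) {
	// ... batch flush happens after NodeBatchDeletionInterval ...
	tracker.EndDeletion(nodeGroup, node, nodeDeleteResult{})
}

func scheduleDeletion(tracker deletionTracker, nodeGroup, node string, drain bool) {
	if r := prepare(node, drain); r.Err != nil {
		// Failure before the batcher is involved: the actuator reports the event,
		// cleans the taint (omitted here), and closes out the tracker entry itself.
		tracker.EndDeletion(nodeGroup, node, r)
		return
	}
	addToBatcher(tracker, nodeGroup, node)
}

func main() {
	t := deletionTracker{}
	for _, n := range []string{"node-a", "bad-node"} {
		t.StartDeletion("ng-1", n) // done by the actuator before scheduling, as in the diff
		scheduleDeletion(t, "ng-1", n, false)
	}
}
```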