Introduce NodeDeleterBatcher to ScaleDown actuator
yaroslava-serdiuk committed Sep 21, 2022
1 parent db5e2f2 commit d5fd28c
Showing 10 changed files with 660 additions and 129 deletions.
2 changes: 2 additions & 0 deletions cluster-autoscaler/config/autoscaling_options.go
@@ -192,4 +192,6 @@ type AutoscalingOptions struct {
// MaxNodeGroupBinpackingDuration is a maximum time that can be spent binpacking a single NodeGroup. If the threshold
// is exceeded binpacking will be cut short and a partial scale-up will be performed.
MaxNodeGroupBinpackingDuration time.Duration
// NodeDeletionBatcherInterval is the duration for which the CA ScaleDown actuator gathers nodes before deleting them in a batch.
NodeDeletionBatcherInterval time.Duration
}
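
The new option sets how long the scale-down actuator accumulates nodes before deleting them in one batch. The following is a minimal, self-contained sketch of that interval-based batching pattern, using only the Go standard library; the batcher type, AddNode, flush, and deleteFn are illustrative names, not the actual NodeBatcherDeleter API introduced by this commit.

package main

import (
	"fmt"
	"sync"
	"time"
)

// batcher accumulates node names and hands them off as a single batch once
// the configured interval has elapsed after the first pending addition.
type batcher struct {
	mu       sync.Mutex
	pending  []string
	interval time.Duration
	deleteFn func(nodes []string)
}

func (b *batcher) AddNode(name string) {
	b.mu.Lock()
	defer b.mu.Unlock()
	b.pending = append(b.pending, name)
	if len(b.pending) == 1 {
		// First node of a new batch: schedule one flush after the interval.
		time.AfterFunc(b.interval, b.flush)
	}
}

func (b *batcher) flush() {
	b.mu.Lock()
	nodes := b.pending
	b.pending = nil
	b.mu.Unlock()
	if len(nodes) > 0 {
		b.deleteFn(nodes)
	}
}

func main() {
	b := &batcher{
		interval: 2 * time.Second,
		deleteFn: func(nodes []string) { fmt.Println("deleting batch:", nodes) },
	}
	b.AddNode("node-1")
	b.AddNode("node-2")
	time.Sleep(3 * time.Second) // give the scheduled flush time to fire
}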
114 changes: 64 additions & 50 deletions cluster-autoscaler/core/scaledown/actuation/actuator.go
@@ -24,6 +24,7 @@ import (
apiv1 "k8s.io/api/core/v1"
"k8s.io/klog/v2"

"k8s.io/autoscaler/cluster-autoscaler/cloudprovider"
"k8s.io/autoscaler/cluster-autoscaler/clusterstate"
"k8s.io/autoscaler/cluster-autoscaler/context"
"k8s.io/autoscaler/cluster-autoscaler/core/scaledown"
@@ -34,23 +35,25 @@ import (
"k8s.io/autoscaler/cluster-autoscaler/utils/deletetaint"
"k8s.io/autoscaler/cluster-autoscaler/utils/errors"
"k8s.io/autoscaler/cluster-autoscaler/utils/gpu"
"k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes"
)

// Actuator is responsible for draining and deleting nodes.
type Actuator struct {
ctx *context.AutoscalingContext
clusterState *clusterstate.ClusterStateRegistry
nodeDeletionTracker *deletiontracker.NodeDeletionTracker
nodeDeletionBatcher *NodeBatcherDeleter
evictor Evictor
}

// NewActuator returns a new instance of Actuator.
func NewActuator(ctx *context.AutoscalingContext, csr *clusterstate.ClusterStateRegistry, ndr *deletiontracker.NodeDeletionTracker) *Actuator {
func NewActuator(ctx *context.AutoscalingContext, csr *clusterstate.ClusterStateRegistry, ndr *deletiontracker.NodeDeletionTracker, batchInterval time.Duration) *Actuator {
nbd := NewNodeBatcherDeleter(ctx, csr, ndr, batchInterval)
return &Actuator{
ctx: ctx,
clusterState: csr,
nodeDeletionTracker: ndr,
nodeDeletionBatcher: nbd,
evictor: NewDefaultEvictor(),
}
}
@@ -141,7 +144,13 @@ func (a *Actuator) taintSyncDeleteAsyncEmpty(empty []*apiv1.Node) (scaledDownNod
klog.V(0).Infof("Scale-down: removing empty node %q", emptyNode.Name)
a.ctx.LogRecorder.Eventf(apiv1.EventTypeNormal, "ScaleDownEmpty", "Scale-down: removing empty node %q", emptyNode.Name)

err := a.taintNode(emptyNode)
nodeGroup, err := a.ctx.CloudProvider.NodeGroupForNode(emptyNode)
if err != nil || nodeGroup == nil {
klog.Errorf("Failed to find node group for %s: %v", emptyNode.Name, err)
continue
}

err = a.taintNode(emptyNode)
if err != nil {
a.ctx.Recorder.Eventf(emptyNode, apiv1.EventTypeWarning, "ScaleDownFailed", "failed to mark the node as toBeDeleted/unschedulable: %v", err)
return scaledDownNodes, errors.NewAutoscalerError(errors.ApiCallError, "couldn't taint node %q with ToBeDeleted", emptyNode.Name)
@@ -152,17 +161,8 @@
} else {
klog.Errorf("Scale-down: couldn't report scaled down node, err: %v", err)
}

go func(node *apiv1.Node) {
result := a.deleteNode(node, false)
if result.Err == nil {
a.ctx.LogRecorder.Eventf(apiv1.EventTypeNormal, "ScaleDownEmpty", "Scale-down: empty node %s removed", node.Name)
} else {
klog.Errorf("Scale-down: couldn't delete empty node, err: %v", err)
a.ctx.Recorder.Eventf(node, apiv1.EventTypeWarning, "ScaleDownFailed", "failed to delete empty node: %v", result.Err)
_, _ = deletetaint.CleanToBeDeleted(node, a.ctx.ClientSet, a.ctx.CordonNodeBeforeTerminate)
}
}(emptyNode)
a.nodeDeletionTracker.StartDeletion(nodeGroup.Id(), emptyNode.Name)
go a.scheduleDeletion(emptyNode, nodeGroup.Id(), false)
}
return scaledDownNodes, nil
}
@@ -197,17 +197,13 @@ func (a *Actuator) deleteAsyncDrain(drain []*apiv1.Node) (scaledDownNodes []*sta
} else {
klog.Errorf("Scale-down: couldn't report scaled down node, err: %v", err)
}

go func(node *apiv1.Node) {
result := a.deleteNode(node, true)
if result.Err == nil {
a.ctx.LogRecorder.Eventf(apiv1.EventTypeNormal, "ScaleDown", "Scale-down: node %s removed with drain", node.Name)
} else {
klog.Errorf("Scale-down: couldn't delete node %q with drain, err: %v", node.Name, result.Err)
a.ctx.Recorder.Eventf(node, apiv1.EventTypeWarning, "ScaleDownFailed", "failed to drain and delete node: %v", result.Err)
_, _ = deletetaint.CleanToBeDeleted(node, a.ctx.ClientSet, a.ctx.CordonNodeBeforeTerminate)
}
}(drainNode)
nodeGroup, err := a.ctx.CloudProvider.NodeGroupForNode(drainNode)
if err != nil {
klog.Errorf("Failed to find node group for %s: %v", drainNode.Name, err)
continue
}
a.nodeDeletionTracker.StartDeletionWithDrain(nodeGroup.Id(), drainNode.Name)
go a.scheduleDeletion(drainNode, nodeGroup.Id(), true)
}
return scaledDownNodes
}
@@ -251,41 +247,43 @@ func (a *Actuator) taintNode(node *apiv1.Node) error {
return nil
}

// deleteNode performs the deletion of the provided node. If drain is true, the node is drained before being deleted.
func (a *Actuator) deleteNode(node *apiv1.Node, drain bool) (result status.NodeDeleteResult) {
func (a *Actuator) prepareNodeForDeletion(node *apiv1.Node, drain bool) status.NodeDeleteResult {
nodeGroup, err := a.ctx.CloudProvider.NodeGroupForNode(node)
if err != nil {
return status.NodeDeleteResult{ResultType: status.NodeDeleteErrorInternal, Err: errors.NewAutoscalerError(errors.CloudProviderError, "failed to find node group for %s: %v", node.Name, err)}
return status.NodeDeleteResult{ResultType: status.NodeDeleteErrorInternal, Err: errors.NewAutoscalerError(errors.CloudProviderError, "NodeGroupForNode for %s returned error: %v", node.Name, err)}
}
if nodeGroup == nil || reflect.ValueOf(nodeGroup).IsNil() {
return status.NodeDeleteResult{ResultType: status.NodeDeleteErrorInternal, Err: errors.NewAutoscalerError(errors.InternalError, "picked node that doesn't belong to a node group: %s", node.Name)}
}

defer func() { a.nodeDeletionTracker.EndDeletion(nodeGroup.Id(), node.Name, result) }()
if drain {
a.nodeDeletionTracker.StartDeletionWithDrain(nodeGroup.Id(), node.Name)
if evictionResults, err := a.evictor.DrainNode(a.ctx, node); err != nil {
return status.NodeDeleteResult{ResultType: status.NodeDeleteErrorFailedToEvictPods, Err: err, PodEvictionResults: evictionResults}
}
} else {
a.nodeDeletionTracker.StartDeletion(nodeGroup.Id(), node.Name)
if err := a.evictor.EvictDaemonSetPods(a.ctx, node, time.Now()); err != nil {
// Evicting DS pods is best-effort, so proceed with the deletion even if there are errors.
klog.Warningf("Error while evicting DS pods from an empty node %q: %v", node.Name, err)
}
}

if err := WaitForDelayDeletion(node, a.ctx.ListerRegistry.AllNodeLister(), a.ctx.AutoscalingOptions.NodeDeletionDelayTimeout); err != nil {
return status.NodeDeleteResult{ResultType: status.NodeDeleteErrorFailedToDelete, Err: err}
}
return status.NodeDeleteResult{ResultType: status.NodeDeleteOk}
}

if err := DeleteNodeFromCloudProvider(a.ctx, node, a.clusterState); err != nil {
return status.NodeDeleteResult{ResultType: status.NodeDeleteErrorFailedToDelete, Err: err}
// scheduleDeletion schedules the deletion of the provided node by adding it to the NodeDeletionBatcher. If drain is true, the node is drained before being deleted.
func (a *Actuator) scheduleDeletion(node *apiv1.Node, nodeGroupId string, drain bool) {
nodeDeleteResult := a.prepareNodeForDeletion(node, drain)
if nodeDeleteResult.Err != nil {
CleanUpAndRecordFailedScaleDownEvent(a.ctx, node, nodeGroupId, drain, a.nodeDeletionTracker, "prepareNodeForDeletion failed", nodeDeleteResult)
return
}
err := a.nodeDeletionBatcher.AddNode(node, drain)
if err != nil {
klog.Errorf("Couldn't add node to nodeDeletionBatcher, err: %v", err)
nodeDeleteResult := status.NodeDeleteResult{ResultType: status.NodeDeleteErrorInternal, Err: errors.NewAutoscalerError(errors.InternalError, "nodeDeletionBatcher.AddNode for %s returned error: %v", node.Name, err)}
CleanUpAndRecordFailedScaleDownEvent(a.ctx, node, nodeGroupId, drain, a.nodeDeletionTracker, "failed to add node to the nodeDeletionBatcher", nodeDeleteResult)
}

metrics.RegisterScaleDown(1, gpu.GetGpuTypeForMetrics(a.ctx.CloudProvider.GPULabel(), a.ctx.CloudProvider.GetAvailableGPUTypes(), node, nodeGroup), nodeScaleDownReason(node, drain))

return status.NodeDeleteResult{ResultType: status.NodeDeleteOk}
}

func min(x, y int) int {
@@ -303,18 +301,34 @@ func joinPodNames(pods []*apiv1.Pod) string {
return strings.Join(names, ",")
}

func nodeScaleDownReason(node *apiv1.Node, drain bool) metrics.NodeScaleDownReason {
readiness, err := kubernetes.GetNodeReadiness(node)
if err != nil {
klog.Errorf("Couldn't determine node %q readiness while scaling down - assuming unready: %v", node.Name, err)
return metrics.Unready
}
if !readiness.Ready {
return metrics.Unready
// CleanUpAndRecordFailedScaleDownEvent records a failed scale-down event and logs an error.
func CleanUpAndRecordFailedScaleDownEvent(ctx *context.AutoscalingContext, node *apiv1.Node, nodeGroupId string, drain bool, nodeDeletionTracker *deletiontracker.NodeDeletionTracker, errMsg string, status status.NodeDeleteResult) {
if drain {
klog.Errorf("Scale-down: couldn't delete node %q with drain, %v, status error: %v", node.Name, errMsg, status.Err)
ctx.Recorder.Eventf(node, apiv1.EventTypeWarning, "ScaleDownFailed", "failed to drain and delete node: %v", status.Err)

} else {
klog.Errorf("Scale-down: couldn't delete empty node, %v, status error: %v", errMsg, status.Err)
ctx.Recorder.Eventf(node, apiv1.EventTypeWarning, "ScaleDownFailed", "failed to delete empty node: %v", status.Err)
}
// Node is ready.
deletetaint.CleanToBeDeleted(node, ctx.ClientSet, ctx.CordonNodeBeforeTerminate)
nodeDeletionTracker.EndDeletion(nodeGroupId, node.Name, status)
}

// RegisterAndRecordSuccessfulScaleDownEvent registers the scale-down and records a successful scale-down event.
func RegisterAndRecordSuccessfulScaleDownEvent(ctx *context.AutoscalingContext, csr *clusterstate.ClusterStateRegistry, node *apiv1.Node, nodeGroup cloudprovider.NodeGroup, drain bool, nodeDeletionTracker *deletiontracker.NodeDeletionTracker) {
ctx.Recorder.Eventf(node, apiv1.EventTypeNormal, "ScaleDown", "nodes removed by cluster autoscaler")
csr.RegisterScaleDown(&clusterstate.ScaleDownRequest{
NodeGroup: nodeGroup,
NodeName: node.Name,
Time: time.Now(),
ExpectedDeleteTime: time.Now().Add(MaxCloudProviderNodeDeletionTime),
})
metrics.RegisterScaleDown(1, gpu.GetGpuTypeForMetrics(ctx.CloudProvider.GPULabel(), ctx.CloudProvider.GetAvailableGPUTypes(), node, nodeGroup), nodeScaleDownReason(node, drain))
if drain {
return metrics.Underutilized
ctx.LogRecorder.Eventf(apiv1.EventTypeNormal, "ScaleDown", "Scale-down: node %s removed with drain", node.Name)
} else {
ctx.LogRecorder.Eventf(apiv1.EventTypeNormal, "ScaleDownEmpty", "Scale-down: empty node %s removed", node.Name)
}
return metrics.Empty
nodeDeletionTracker.EndDeletion(nodeGroup.Id(), node.Name, status.NodeDeleteResult{ResultType: status.NodeDeleteOk})
}
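
These two helpers centralize the per-node bookkeeping (events, taint cleanup, scale-down registration, metrics, deletion-tracker updates) so that the outcome of each node can be reported once the batched cloud-provider call returns, rather than inside the per-node goroutine as before. A rough, self-contained sketch of that hand-off is shown below; deleteBatch, processBatch, recordSuccess, and recordFailure are hypothetical stand-ins for the NodeBatcherDeleter internals, which would call RegisterAndRecordSuccessfulScaleDownEvent and CleanUpAndRecordFailedScaleDownEvent respectively.

package main

import "fmt"

// deleteBatch stands in for the cloud-provider call that removes a whole
// batch of nodes belonging to a single node group (hypothetical helper).
func deleteBatch(nodeGroup string, nodes []string) error {
	fmt.Printf("deleting %d nodes from %s\n", len(nodes), nodeGroup)
	return nil
}

// processBatch reports per-node results after the batched deletion finishes:
// every node in the batch shares the batch's outcome.
func processBatch(nodeGroup string, nodes []string,
	recordSuccess func(node string),
	recordFailure func(node string, err error)) {
	if err := deleteBatch(nodeGroup, nodes); err != nil {
		for _, n := range nodes {
			recordFailure(n, err) // e.g. CleanUpAndRecordFailedScaleDownEvent
		}
		return
	}
	for _, n := range nodes {
		recordSuccess(n) // e.g. RegisterAndRecordSuccessfulScaleDownEvent
	}
}

func main() {
	processBatch("ng-1", []string{"node-1", "node-2"},
		func(node string) { fmt.Println("scaled down:", node) },
		func(node string, err error) { fmt.Println("scale-down failed:", node, err) })
}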