From 78d4415e5d832388be8570ecb9bc3ec506d26358 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Kuba=20Tu=C5=BCnik?=
Date: Thu, 21 Nov 2024 18:40:28 +0100
Subject: [PATCH] CA: plumb the DRA provider to SetClusterState callsites, grab and pass DRA snapshot

The new logic is flag-guarded; it should be a no-op if DRA is disabled.
---
 cluster-autoscaler/core/autoscaler.go         |  6 ++++++
 .../core/scaledown/actuation/actuator.go      | 16 ++++++++++++--
 cluster-autoscaler/core/static_autoscaler.go  | 21 +++++++++++++++----
 .../core/static_autoscaler_test.go            |  8 +++----
 4 files changed, 41 insertions(+), 10 deletions(-)

diff --git a/cluster-autoscaler/core/autoscaler.go b/cluster-autoscaler/core/autoscaler.go
index 42e98897733b..64fa88401d1a 100644
--- a/cluster-autoscaler/core/autoscaler.go
+++ b/cluster-autoscaler/core/autoscaler.go
@@ -36,6 +36,7 @@ import (
 	"k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot/base"
 	"k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot/predicate"
 	"k8s.io/autoscaler/cluster-autoscaler/simulator/drainability/rules"
+	draprovider "k8s.io/autoscaler/cluster-autoscaler/simulator/dynamicresources/provider"
 	"k8s.io/autoscaler/cluster-autoscaler/simulator/framework"
 	"k8s.io/autoscaler/cluster-autoscaler/simulator/options"
 	"k8s.io/autoscaler/cluster-autoscaler/utils/backoff"
@@ -63,6 +64,7 @@ type AutoscalerOptions struct {
 	ScaleUpOrchestrator scaleup.Orchestrator
 	DeleteOptions options.NodeDeleteOptions
 	DrainabilityRules rules.Rules
+	DraProvider *draprovider.Provider
 }
 
 // Autoscaler is the main component of CA which scales up/down node groups according to its configuration
@@ -102,6 +104,7 @@ func NewAutoscaler(opts AutoscalerOptions, informerFactory informers.SharedInfor
 		opts.ScaleUpOrchestrator,
 		opts.DeleteOptions,
 		opts.DrainabilityRules,
+		opts.DraProvider,
 	), nil
 }
 
@@ -165,6 +168,9 @@ func initializeDefaultOptions(opts *AutoscalerOptions, informerFactory informers
 	if opts.DrainabilityRules == nil {
 		opts.DrainabilityRules = rules.Default(opts.DeleteOptions)
 	}
+	if opts.DraProvider == nil && opts.DynamicResourceAllocationEnabled {
+		opts.DraProvider = draprovider.NewProviderFromInformers(informerFactory)
+	}
 
 	return nil
 }
diff --git a/cluster-autoscaler/core/scaledown/actuation/actuator.go b/cluster-autoscaler/core/scaledown/actuation/actuator.go
index 3c0fed951fc6..8ddb30163c02 100644
--- a/cluster-autoscaler/core/scaledown/actuation/actuator.go
+++ b/cluster-autoscaler/core/scaledown/actuation/actuator.go
@@ -36,6 +36,7 @@ import (
 	"k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot/base"
 	"k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot/predicate"
 	"k8s.io/autoscaler/cluster-autoscaler/simulator/drainability/rules"
+	draprovider "k8s.io/autoscaler/cluster-autoscaler/simulator/dynamicresources/provider"
 	drasnapshot "k8s.io/autoscaler/cluster-autoscaler/simulator/dynamicresources/snapshot"
 	"k8s.io/autoscaler/cluster-autoscaler/simulator/options"
 	"k8s.io/autoscaler/cluster-autoscaler/simulator/utilization"
@@ -64,6 +65,7 @@ type Actuator struct {
 	configGetter actuatorNodeGroupConfigGetter
 	nodeDeleteDelayAfterTaint time.Duration
 	pastLatencies *expiring.List
+	draProvider *draprovider.Provider
 }
 
 // actuatorNodeGroupConfigGetter is an interface to limit the functions that can be used
@@ -74,7 +76,7 @@ type actuatorNodeGroupConfigGetter interface {
 }
 
 // NewActuator returns a new instance of Actuator.
-func NewActuator(ctx *context.AutoscalingContext, scaleStateNotifier nodegroupchange.NodeGroupChangeObserver, ndt *deletiontracker.NodeDeletionTracker, deleteOptions options.NodeDeleteOptions, drainabilityRules rules.Rules, configGetter actuatorNodeGroupConfigGetter) *Actuator {
+func NewActuator(ctx *context.AutoscalingContext, scaleStateNotifier nodegroupchange.NodeGroupChangeObserver, ndt *deletiontracker.NodeDeletionTracker, deleteOptions options.NodeDeleteOptions, drainabilityRules rules.Rules, configGetter actuatorNodeGroupConfigGetter, draProvider *draprovider.Provider) *Actuator {
 	ndb := NewNodeDeletionBatcher(ctx, scaleStateNotifier, ndt, ctx.NodeDeletionBatcherInterval)
 	legacyFlagDrainConfig := SingleRuleDrainConfig(ctx.MaxGracefulTerminationSec)
 	var evictor Evictor
@@ -93,6 +95,7 @@ func NewActuator(ctx *context.AutoscalingContext, scaleStateNotifier nodegroupch
 		configGetter: configGetter,
 		nodeDeleteDelayAfterTaint: ctx.NodeDeleteDelayAfterTaint,
 		pastLatencies: expiring.NewList(),
+		draProvider: draProvider,
 	}
 }
 
@@ -368,7 +371,16 @@ func (a *Actuator) createSnapshot(nodes []*apiv1.Node) (clustersnapshot.ClusterS
 	scheduledPods := kube_util.ScheduledPods(pods)
 	nonExpendableScheduledPods := utils.FilterOutExpendablePods(scheduledPods, a.ctx.ExpendablePodsPriorityCutoff)
 
-	err = snapshot.SetClusterState(nodes, nonExpendableScheduledPods, drasnapshot.Snapshot{})
+	var draSnapshot drasnapshot.Snapshot
+	if a.ctx.DynamicResourceAllocationEnabled && a.draProvider != nil {
+		draSnapshot, err = a.draProvider.Snapshot()
+		if err != nil {
+			// TODO(DRA): Maybe proceed?
+			return nil, err
+		}
+	}
+
+	err = snapshot.SetClusterState(nodes, nonExpendableScheduledPods, draSnapshot)
 	if err != nil {
 		return nil, err
 	}
diff --git a/cluster-autoscaler/core/static_autoscaler.go b/cluster-autoscaler/core/static_autoscaler.go
index 589403d1abb5..8936d954b44f 100644
--- a/cluster-autoscaler/core/static_autoscaler.go
+++ b/cluster-autoscaler/core/static_autoscaler.go
@@ -46,6 +46,7 @@ import (
 	"k8s.io/autoscaler/cluster-autoscaler/simulator"
 	"k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot"
 	"k8s.io/autoscaler/cluster-autoscaler/simulator/drainability/rules"
+	draprovider "k8s.io/autoscaler/cluster-autoscaler/simulator/dynamicresources/provider"
 	drasnapshot "k8s.io/autoscaler/cluster-autoscaler/simulator/dynamicresources/snapshot"
 	"k8s.io/autoscaler/cluster-autoscaler/simulator/framework"
 	"k8s.io/autoscaler/cluster-autoscaler/simulator/options"
@@ -92,6 +93,7 @@ type StaticAutoscaler struct {
 	processorCallbacks *staticAutoscalerProcessorCallbacks
 	initialized bool
 	taintConfig taints.TaintConfig
+	draProvider *draprovider.Provider
 }
 
 type staticAutoscalerProcessorCallbacks struct {
@@ -144,7 +146,8 @@ func NewStaticAutoscaler(
 	remainingPdbTracker pdb.RemainingPdbTracker,
 	scaleUpOrchestrator scaleup.Orchestrator,
 	deleteOptions options.NodeDeleteOptions,
-	drainabilityRules rules.Rules) *StaticAutoscaler {
+	drainabilityRules rules.Rules,
+	draProvider *draprovider.Provider) *StaticAutoscaler {
 
 	clusterStateConfig := clusterstate.ClusterStateRegistryConfig{
 		MaxTotalUnreadyPercentage: opts.MaxTotalUnreadyPercentage,
@@ -174,7 +177,7 @@ func NewStaticAutoscaler(
 	processorCallbacks.scaleDownPlanner = scaleDownPlanner
 
 	ndt := deletiontracker.NewNodeDeletionTracker(0 * time.Second)
-	scaleDownActuator := actuation.NewActuator(autoscalingContext, processors.ScaleStateNotifier, ndt, deleteOptions, drainabilityRules, processors.NodeGroupConfigProcessor)
+	scaleDownActuator := actuation.NewActuator(autoscalingContext, processors.ScaleStateNotifier, ndt, deleteOptions, drainabilityRules, processors.NodeGroupConfigProcessor, draProvider)
 	autoscalingContext.ScaleDownActuator = scaleDownActuator
 
 	if scaleUpOrchestrator == nil {
@@ -198,6 +201,7 @@ func NewStaticAutoscaler(
 		processorCallbacks: processorCallbacks,
 		clusterStateRegistry: clusterStateRegistry,
 		taintConfig: taintConfig,
+		draProvider: draProvider,
 	}
 }
 
@@ -337,8 +341,17 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) caerrors.AutoscalerErr
 		metrics.UpdateMaxNodesCount(maxNodesCount)
 	}
 	nonExpendableScheduledPods := core_utils.FilterOutExpendablePods(originalScheduledPods, a.ExpendablePodsPriorityCutoff)
-	// Initialize cluster state to ClusterSnapshot
-	if err := a.ClusterSnapshot.SetClusterState(allNodes, nonExpendableScheduledPods, drasnapshot.Snapshot{}); err != nil {
+
+	var draSnapshot drasnapshot.Snapshot
+	if a.AutoscalingContext.DynamicResourceAllocationEnabled && a.draProvider != nil {
+		draSnapshot, err = a.draProvider.Snapshot()
+		if err != nil {
+			// TODO(DRA): Maybe proceed?
+			return caerrors.ToAutoscalerError(caerrors.ApiCallError, err)
+		}
+	}
+
+	if err := a.ClusterSnapshot.SetClusterState(allNodes, nonExpendableScheduledPods, draSnapshot); err != nil {
 		return caerrors.ToAutoscalerError(caerrors.InternalError, err).AddPrefix("failed to initialize ClusterSnapshot: ")
 	}
 	// Initialize Pod Disruption Budget tracking
diff --git a/cluster-autoscaler/core/static_autoscaler_test.go b/cluster-autoscaler/core/static_autoscaler_test.go
index ec0b084a752b..acb0df84e679 100644
--- a/cluster-autoscaler/core/static_autoscaler_test.go
+++ b/cluster-autoscaler/core/static_autoscaler_test.go
@@ -163,7 +163,7 @@ func (m *onNodeGroupDeleteMock) Delete(id string) error {
 
 func setUpScaleDownActuator(ctx *context.AutoscalingContext, autoscalingOptions config.AutoscalingOptions) {
 	deleteOptions := options.NewNodeDeleteOptions(autoscalingOptions)
-	ctx.ScaleDownActuator = actuation.NewActuator(ctx, nil, deletiontracker.NewNodeDeletionTracker(0*time.Second), deleteOptions, rules.Default(deleteOptions), processorstest.NewTestProcessors(ctx).NodeGroupConfigProcessor)
+	ctx.ScaleDownActuator = actuation.NewActuator(ctx, nil, deletiontracker.NewNodeDeletionTracker(0*time.Second), deleteOptions, rules.Default(deleteOptions), processorstest.NewTestProcessors(ctx).NodeGroupConfigProcessor, nil)
 }
 
 type nodeGroup struct {
@@ -1450,7 +1450,7 @@ func TestStaticAutoscalerRunOnceWithUnselectedNodeGroups(t *testing.T) {
 	clusterState := clusterstate.NewClusterStateRegistry(provider, clusterStateConfig, context.LogRecorder, NewBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(autoscalingOptions.NodeGroupDefaults), processors.AsyncNodeGroupStateChecker)
 
 	// Setting the Actuator is necessary for testing any scale-down logic, it shouldn't have anything to do in this test.
-	sdActuator := actuation.NewActuator(&context, clusterState, deletiontracker.NewNodeDeletionTracker(0*time.Second), options.NodeDeleteOptions{}, nil, processors.NodeGroupConfigProcessor)
+	sdActuator := actuation.NewActuator(&context, clusterState, deletiontracker.NewNodeDeletionTracker(0*time.Second), options.NodeDeleteOptions{}, nil, processors.NodeGroupConfigProcessor, nil)
 	context.ScaleDownActuator = sdActuator
 
 	// Fake planner that keeps track of the scale-down candidates passed to UpdateClusterState.
@@ -2097,7 +2097,7 @@ func TestStaticAutoscalerUpcomingScaleDownCandidates(t *testing.T) {
 	csr := clusterstate.NewClusterStateRegistry(provider, csrConfig, ctx.LogRecorder, NewBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(config.NodeGroupAutoscalingOptions{MaxNodeProvisionTime: 15 * time.Minute}), processors.AsyncNodeGroupStateChecker)
 
 	// Setting the Actuator is necessary for testing any scale-down logic, it shouldn't have anything to do in this test.
-	actuator := actuation.NewActuator(&ctx, csr, deletiontracker.NewNodeDeletionTracker(0*time.Second), options.NodeDeleteOptions{}, nil, processorstest.NewTestProcessors(&ctx).NodeGroupConfigProcessor)
+	actuator := actuation.NewActuator(&ctx, csr, deletiontracker.NewNodeDeletionTracker(0*time.Second), options.NodeDeleteOptions{}, nil, processorstest.NewTestProcessors(&ctx).NodeGroupConfigProcessor, nil)
 	ctx.ScaleDownActuator = actuator
 
 	// Fake planner that keeps track of the scale-down candidates passed to UpdateClusterState.
@@ -2669,7 +2669,7 @@ func newScaleDownPlannerAndActuator(ctx *context.AutoscalingContext, p *ca_proce
 		nodeDeletionTracker = deletiontracker.NewNodeDeletionTracker(0 * time.Second)
 	}
 	planner := planner.New(ctx, p, deleteOptions, nil)
-	actuator := actuation.NewActuator(ctx, cs, nodeDeletionTracker, deleteOptions, nil, p.NodeGroupConfigProcessor)
+	actuator := actuation.NewActuator(ctx, cs, nodeDeletionTracker, deleteOptions, nil, p.NodeGroupConfigProcessor, nil)
 	return planner, actuator
 }
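Both SetClusterState callsites above follow the same flag-guarded pattern; the sketch below restates it outside the diff for reference. It is illustrative Go only, not code from the patch: the helper name initClusterState and the explicit draEnabled parameter are hypothetical, standing in for the DynamicResourceAllocationEnabled flag on the autoscaling context and the draProvider struct field.

package example

import (
	apiv1 "k8s.io/api/core/v1"

	"k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot"
	draprovider "k8s.io/autoscaler/cluster-autoscaler/simulator/dynamicresources/provider"
	drasnapshot "k8s.io/autoscaler/cluster-autoscaler/simulator/dynamicresources/snapshot"
)

// initClusterState is a hypothetical helper (not part of this patch) mirroring the
// flag-guarded logic added to Actuator.createSnapshot and StaticAutoscaler.RunOnce.
func initClusterState(snapshot clustersnapshot.ClusterSnapshot, draProvider *draprovider.Provider, draEnabled bool, nodes []*apiv1.Node, pods []*apiv1.Pod) error {
	// When DRA is disabled or no provider was plumbed in, draSnapshot stays the
	// zero value, matching the previous drasnapshot.Snapshot{} argument, so the
	// change is a no-op in that configuration.
	var draSnapshot drasnapshot.Snapshot
	if draEnabled && draProvider != nil {
		var err error
		draSnapshot, err = draProvider.Snapshot()
		if err != nil {
			// The patch aborts the current iteration on snapshot errors for now
			// (see the TODO(DRA) comments above); proceeding without DRA data
			// is left as an open question.
			return err
		}
	}
	return snapshot.SetClusterState(nodes, pods, draSnapshot)
}

Passing the zero-value drasnapshot.Snapshot when the feature is off keeps the SetClusterState signature uniform across callers while preserving pre-patch behavior, which is what makes the change a no-op with DRA disabled.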