CA: plumb the DRA provider to SetClusterState callsites, grab and pass DRA snapshot

The new logic is flag-guarded; it should be a no-op if DRA is disabled.
towca committed Nov 29, 2024
1 parent 6243f97 commit 78d4415
Showing 4 changed files with 41 additions and 10 deletions.
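The same flag-guarded pattern is applied at both SetClusterState callsites in the diffs below: a DRA snapshot is taken only when the DynamicResourceAllocation flag is enabled and a provider was plumbed through; otherwise the zero-value snapshot is passed. A minimal, self-contained Go sketch of that pattern, using simplified stand-in types rather than the real cluster-autoscaler API:

package main

import "fmt"

// Snapshot and Provider are simplified stand-ins for drasnapshot.Snapshot and
// draprovider.Provider from the diffs below; this sketches the pattern, not
// the real cluster-autoscaler API.
type Snapshot struct{ populated bool }

type Provider struct{}

// Snapshot pretends to list the cluster's DRA objects.
func (p *Provider) Snapshot() (Snapshot, error) { return Snapshot{populated: true}, nil }

// buildDRASnapshot mirrors the guard added at both SetClusterState callsites:
// query the provider only when the feature flag is on and a provider was
// plumbed through; otherwise pass the zero-value Snapshot on, which keeps the
// change a no-op with DRA disabled.
func buildDRASnapshot(draEnabled bool, provider *Provider) (Snapshot, error) {
	var draSnapshot Snapshot
	if draEnabled && provider != nil {
		s, err := provider.Snapshot()
		if err != nil {
			return Snapshot{}, err
		}
		draSnapshot = s
	}
	return draSnapshot, nil
}

func main() {
	enabled, _ := buildDRASnapshot(true, &Provider{})
	disabled, _ := buildDRASnapshot(false, nil)
	fmt.Println(enabled.populated, disabled.populated) // true false
}

In the real code, a failed provider snapshot currently aborts snapshot creation or the RunOnce loop; see the TODO(DRA) comments in the hunks below.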
6 changes: 6 additions & 0 deletions cluster-autoscaler/core/autoscaler.go
@@ -36,6 +36,7 @@ import (
"k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot/base"
"k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot/predicate"
"k8s.io/autoscaler/cluster-autoscaler/simulator/drainability/rules"
draprovider "k8s.io/autoscaler/cluster-autoscaler/simulator/dynamicresources/provider"
"k8s.io/autoscaler/cluster-autoscaler/simulator/framework"
"k8s.io/autoscaler/cluster-autoscaler/simulator/options"
"k8s.io/autoscaler/cluster-autoscaler/utils/backoff"
@@ -63,6 +64,7 @@ type AutoscalerOptions struct {
ScaleUpOrchestrator scaleup.Orchestrator
DeleteOptions options.NodeDeleteOptions
DrainabilityRules rules.Rules
+ DraProvider *draprovider.Provider
}

// Autoscaler is the main component of CA which scales up/down node groups according to its configuration
@@ -102,6 +104,7 @@ func NewAutoscaler(opts AutoscalerOptions, informerFactory informers.SharedInfor
opts.ScaleUpOrchestrator,
opts.DeleteOptions,
opts.DrainabilityRules,
+ opts.DraProvider,
), nil
}

@@ -165,6 +168,9 @@ func initializeDefaultOptions(opts *AutoscalerOptions, informerFactory informers
if opts.DrainabilityRules == nil {
opts.DrainabilityRules = rules.Default(opts.DeleteOptions)
}
+ if opts.DraProvider == nil && opts.DynamicResourceAllocationEnabled {
+ opts.DraProvider = draprovider.NewProviderFromInformers(informerFactory)
+ }

return nil
}
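Callers of the core package therefore don't need to construct the provider themselves: leaving DraProvider nil lets initializeDefaultOptions build it from the shared informer factory whenever the flag is on. A hypothetical wiring sketch (the helper name is made up, and it assumes AutoscalerOptions embeds config.AutoscalingOptions as the surrounding code suggests; all other required options are omitted):

package example

import (
	"k8s.io/autoscaler/cluster-autoscaler/config"
	"k8s.io/autoscaler/cluster-autoscaler/core"
)

// NewDRAEnabledOptions is a hypothetical helper, not part of this commit. It
// shows only the two fields relevant to DRA; every other required option
// (KubeClient, cloud provider settings, ...) is assumed to be filled in by
// the caller before handing the struct to core.NewAutoscaler together with
// an informer factory.
func NewDRAEnabledOptions() core.AutoscalerOptions {
	return core.AutoscalerOptions{
		AutoscalingOptions: config.AutoscalingOptions{
			DynamicResourceAllocationEnabled: true, // turns the new logic on
		},
		// DraProvider is intentionally left nil: initializeDefaultOptions
		// defaults it via draprovider.NewProviderFromInformers(informerFactory).
	}
}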
16 changes: 14 additions & 2 deletions cluster-autoscaler/core/scaledown/actuation/actuator.go
@@ -36,6 +36,7 @@ import (
"k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot/base"
"k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot/predicate"
"k8s.io/autoscaler/cluster-autoscaler/simulator/drainability/rules"
draprovider "k8s.io/autoscaler/cluster-autoscaler/simulator/dynamicresources/provider"
drasnapshot "k8s.io/autoscaler/cluster-autoscaler/simulator/dynamicresources/snapshot"
"k8s.io/autoscaler/cluster-autoscaler/simulator/options"
"k8s.io/autoscaler/cluster-autoscaler/simulator/utilization"
@@ -64,6 +65,7 @@ type Actuator struct {
configGetter actuatorNodeGroupConfigGetter
nodeDeleteDelayAfterTaint time.Duration
pastLatencies *expiring.List
+ draProvider *draprovider.Provider
}

// actuatorNodeGroupConfigGetter is an interface to limit the functions that can be used
@@ -74,7 +76,7 @@ type actuatorNodeGroupConfigGetter interface {
}

// NewActuator returns a new instance of Actuator.
- func NewActuator(ctx *context.AutoscalingContext, scaleStateNotifier nodegroupchange.NodeGroupChangeObserver, ndt *deletiontracker.NodeDeletionTracker, deleteOptions options.NodeDeleteOptions, drainabilityRules rules.Rules, configGetter actuatorNodeGroupConfigGetter) *Actuator {
+ func NewActuator(ctx *context.AutoscalingContext, scaleStateNotifier nodegroupchange.NodeGroupChangeObserver, ndt *deletiontracker.NodeDeletionTracker, deleteOptions options.NodeDeleteOptions, drainabilityRules rules.Rules, configGetter actuatorNodeGroupConfigGetter, draProvider *draprovider.Provider) *Actuator {
ndb := NewNodeDeletionBatcher(ctx, scaleStateNotifier, ndt, ctx.NodeDeletionBatcherInterval)
legacyFlagDrainConfig := SingleRuleDrainConfig(ctx.MaxGracefulTerminationSec)
var evictor Evictor
@@ -93,6 +95,7 @@ func NewActuator(ctx *context.AutoscalingContext, scaleStateNotifier nodegroupch
configGetter: configGetter,
nodeDeleteDelayAfterTaint: ctx.NodeDeleteDelayAfterTaint,
pastLatencies: expiring.NewList(),
+ draProvider: draProvider,
}
}

@@ -368,7 +371,16 @@ func (a *Actuator) createSnapshot(nodes []*apiv1.Node) (clustersnapshot.ClusterS
scheduledPods := kube_util.ScheduledPods(pods)
nonExpendableScheduledPods := utils.FilterOutExpendablePods(scheduledPods, a.ctx.ExpendablePodsPriorityCutoff)

- err = snapshot.SetClusterState(nodes, nonExpendableScheduledPods, drasnapshot.Snapshot{})
+ var draSnapshot drasnapshot.Snapshot
+ if a.ctx.DynamicResourceAllocationEnabled && a.draProvider != nil {
+ draSnapshot, err = a.draProvider.Snapshot()
+ if err != nil {
+ // TODO(DRA): Maybe proceed?
+ return nil, err
+ }
+ }
+
+ err = snapshot.SetClusterState(nodes, nonExpendableScheduledPods, draSnapshot)
if err != nil {
return nil, err
}
21 changes: 17 additions & 4 deletions cluster-autoscaler/core/static_autoscaler.go
@@ -46,6 +46,7 @@ import (
"k8s.io/autoscaler/cluster-autoscaler/simulator"
"k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot"
"k8s.io/autoscaler/cluster-autoscaler/simulator/drainability/rules"
draprovider "k8s.io/autoscaler/cluster-autoscaler/simulator/dynamicresources/provider"
drasnapshot "k8s.io/autoscaler/cluster-autoscaler/simulator/dynamicresources/snapshot"
"k8s.io/autoscaler/cluster-autoscaler/simulator/framework"
"k8s.io/autoscaler/cluster-autoscaler/simulator/options"
@@ -92,6 +93,7 @@ type StaticAutoscaler struct {
processorCallbacks *staticAutoscalerProcessorCallbacks
initialized bool
taintConfig taints.TaintConfig
+ draProvider *draprovider.Provider
}

type staticAutoscalerProcessorCallbacks struct {
@@ -144,7 +146,8 @@ func NewStaticAutoscaler(
remainingPdbTracker pdb.RemainingPdbTracker,
scaleUpOrchestrator scaleup.Orchestrator,
deleteOptions options.NodeDeleteOptions,
- drainabilityRules rules.Rules) *StaticAutoscaler {
+ drainabilityRules rules.Rules,
+ draProvider *draprovider.Provider) *StaticAutoscaler {

clusterStateConfig := clusterstate.ClusterStateRegistryConfig{
MaxTotalUnreadyPercentage: opts.MaxTotalUnreadyPercentage,
@@ -174,7 +177,7 @@ func NewStaticAutoscaler(
processorCallbacks.scaleDownPlanner = scaleDownPlanner

ndt := deletiontracker.NewNodeDeletionTracker(0 * time.Second)
- scaleDownActuator := actuation.NewActuator(autoscalingContext, processors.ScaleStateNotifier, ndt, deleteOptions, drainabilityRules, processors.NodeGroupConfigProcessor)
+ scaleDownActuator := actuation.NewActuator(autoscalingContext, processors.ScaleStateNotifier, ndt, deleteOptions, drainabilityRules, processors.NodeGroupConfigProcessor, draProvider)
autoscalingContext.ScaleDownActuator = scaleDownActuator

if scaleUpOrchestrator == nil {
@@ -198,6 +201,7 @@ func NewStaticAutoscaler(
processorCallbacks: processorCallbacks,
clusterStateRegistry: clusterStateRegistry,
taintConfig: taintConfig,
+ draProvider: draProvider,
}
}

@@ -337,8 +341,17 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) caerrors.AutoscalerErr
metrics.UpdateMaxNodesCount(maxNodesCount)
}
nonExpendableScheduledPods := core_utils.FilterOutExpendablePods(originalScheduledPods, a.ExpendablePodsPriorityCutoff)
- // Initialize cluster state to ClusterSnapshot
- if err := a.ClusterSnapshot.SetClusterState(allNodes, nonExpendableScheduledPods, drasnapshot.Snapshot{}); err != nil {
+
+ var draSnapshot drasnapshot.Snapshot
+ if a.AutoscalingContext.DynamicResourceAllocationEnabled && a.draProvider != nil {
+ draSnapshot, err = a.draProvider.Snapshot()
+ if err != nil {
+ // TODO(DRA): Maybe proceed?
+ return caerrors.ToAutoscalerError(caerrors.ApiCallError, err)
+ }
+ }
+
+ if err := a.ClusterSnapshot.SetClusterState(allNodes, nonExpendableScheduledPods, draSnapshot); err != nil {
return caerrors.ToAutoscalerError(caerrors.InternalError, err).AddPrefix("failed to initialize ClusterSnapshot: ")
}
// Initialize Pod Disruption Budget tracking
8 changes: 4 additions & 4 deletions cluster-autoscaler/core/static_autoscaler_test.go
@@ -163,7 +163,7 @@ func (m *onNodeGroupDeleteMock) Delete(id string) error {

func setUpScaleDownActuator(ctx *context.AutoscalingContext, autoscalingOptions config.AutoscalingOptions) {
deleteOptions := options.NewNodeDeleteOptions(autoscalingOptions)
- ctx.ScaleDownActuator = actuation.NewActuator(ctx, nil, deletiontracker.NewNodeDeletionTracker(0*time.Second), deleteOptions, rules.Default(deleteOptions), processorstest.NewTestProcessors(ctx).NodeGroupConfigProcessor)
+ ctx.ScaleDownActuator = actuation.NewActuator(ctx, nil, deletiontracker.NewNodeDeletionTracker(0*time.Second), deleteOptions, rules.Default(deleteOptions), processorstest.NewTestProcessors(ctx).NodeGroupConfigProcessor, nil)
}

type nodeGroup struct {
@@ -1450,7 +1450,7 @@ func TestStaticAutoscalerRunOnceWithUnselectedNodeGroups(t *testing.T) {
clusterState := clusterstate.NewClusterStateRegistry(provider, clusterStateConfig, context.LogRecorder, NewBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(autoscalingOptions.NodeGroupDefaults), processors.AsyncNodeGroupStateChecker)

// Setting the Actuator is necessary for testing any scale-down logic, it shouldn't have anything to do in this test.
- sdActuator := actuation.NewActuator(&context, clusterState, deletiontracker.NewNodeDeletionTracker(0*time.Second), options.NodeDeleteOptions{}, nil, processors.NodeGroupConfigProcessor)
+ sdActuator := actuation.NewActuator(&context, clusterState, deletiontracker.NewNodeDeletionTracker(0*time.Second), options.NodeDeleteOptions{}, nil, processors.NodeGroupConfigProcessor, nil)
context.ScaleDownActuator = sdActuator

// Fake planner that keeps track of the scale-down candidates passed to UpdateClusterState.
@@ -2097,7 +2097,7 @@ func TestStaticAutoscalerUpcomingScaleDownCandidates(t *testing.T) {
csr := clusterstate.NewClusterStateRegistry(provider, csrConfig, ctx.LogRecorder, NewBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(config.NodeGroupAutoscalingOptions{MaxNodeProvisionTime: 15 * time.Minute}), processors.AsyncNodeGroupStateChecker)

// Setting the Actuator is necessary for testing any scale-down logic, it shouldn't have anything to do in this test.
- actuator := actuation.NewActuator(&ctx, csr, deletiontracker.NewNodeDeletionTracker(0*time.Second), options.NodeDeleteOptions{}, nil, processorstest.NewTestProcessors(&ctx).NodeGroupConfigProcessor)
+ actuator := actuation.NewActuator(&ctx, csr, deletiontracker.NewNodeDeletionTracker(0*time.Second), options.NodeDeleteOptions{}, nil, processorstest.NewTestProcessors(&ctx).NodeGroupConfigProcessor, nil)
ctx.ScaleDownActuator = actuator

// Fake planner that keeps track of the scale-down candidates passed to UpdateClusterState.
@@ -2669,7 +2669,7 @@ func newScaleDownPlannerAndActuator(ctx *context.AutoscalingContext, p *ca_proce
nodeDeletionTracker = deletiontracker.NewNodeDeletionTracker(0 * time.Second)
}
planner := planner.New(ctx, p, deleteOptions, nil)
- actuator := actuation.NewActuator(ctx, cs, nodeDeletionTracker, deleteOptions, nil, p.NodeGroupConfigProcessor)
+ actuator := actuation.NewActuator(ctx, cs, nodeDeletionTracker, deleteOptions, nil, p.NodeGroupConfigProcessor, nil)
return planner, actuator
}
