Merge pull request kubernetes#7529 from towca/jtuznik/dra-prep
CA: prepare for DRA integration
k8s-ci-robot authored Dec 9, 2024
2 parents f6c990e + 0691512 commit 37b3da4
Showing 29 changed files with 452 additions and 191 deletions.
2 changes: 2 additions & 0 deletions cluster-autoscaler/config/autoscaling_options.go
@@ -309,6 +309,8 @@ type AutoscalingOptions struct {
CheckCapacityProvisioningRequestBatchTimebox time.Duration
// ForceDeleteLongUnregisteredNodes is used to enable/disable ignoring min size constraints during removal of long unregistered nodes
ForceDeleteLongUnregisteredNodes bool
// DynamicResourceAllocationEnabled configures whether logic for handling DRA objects is enabled.
DynamicResourceAllocationEnabled bool
}

// KubeClientOptions specify options for kube client
4 changes: 2 additions & 2 deletions cluster-autoscaler/core/autoscaler.go
@@ -117,14 +117,14 @@ func initializeDefaultOptions(opts *AutoscalerOptions, informerFactory informers
opts.AutoscalingKubeClients = context.NewAutoscalingKubeClients(opts.AutoscalingOptions, opts.KubeClient, opts.InformerFactory)
}
if opts.FrameworkHandle == nil {
fwHandle, err := framework.NewHandle(opts.InformerFactory, opts.SchedulerConfig)
fwHandle, err := framework.NewHandle(opts.InformerFactory, opts.SchedulerConfig, opts.DynamicResourceAllocationEnabled)
if err != nil {
return err
}
opts.FrameworkHandle = fwHandle
}
if opts.ClusterSnapshot == nil {
opts.ClusterSnapshot = predicate.NewPredicateSnapshot(store.NewBasicSnapshotStore(), opts.FrameworkHandle)
opts.ClusterSnapshot = predicate.NewPredicateSnapshot(store.NewBasicSnapshotStore(), opts.FrameworkHandle, opts.DynamicResourceAllocationEnabled)
}
if opts.RemainingPdbTracker == nil {
opts.RemainingPdbTracker = pdb.NewBasicRemainingPdbTracker()
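
Note (not part of this commit): the call sites above imply that both framework.NewHandle and predicate.NewPredicateSnapshot now take the DRA toggle as an extra argument. A minimal sketch of that wiring, with the declaration-side signatures and the helper name assumed from these call sites rather than taken from the package sources:

// Sketch only: signatures inferred from the call sites changed in this commit.
package example

import (
	"k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes/fake"

	"k8s.io/autoscaler/cluster-autoscaler/config"
	"k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot"
	"k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot/predicate"
	"k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot/store"
	"k8s.io/autoscaler/cluster-autoscaler/simulator/framework"
)

func newSnapshotWithDRA(opts config.AutoscalingOptions) (clustersnapshot.ClusterSnapshot, error) {
	informerFactory := informers.NewSharedInformerFactory(fake.NewSimpleClientset(), 0)
	// The DRA toggle is handed to the framework handle, which presumably configures
	// the scheduler framework plugins that understand DRA objects.
	fwHandle, err := framework.NewHandle(informerFactory, opts.SchedulerConfig, opts.DynamicResourceAllocationEnabled)
	if err != nil {
		return nil, err
	}
	// The same toggle reaches the snapshot, which decides whether DRA objects are
	// tracked alongside nodes and pods.
	return predicate.NewPredicateSnapshot(store.NewBasicSnapshotStore(), fwHandle, opts.DynamicResourceAllocationEnabled), nil
}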
@@ -26,6 +26,7 @@ import (
"k8s.io/autoscaler/cluster-autoscaler/config"
"k8s.io/autoscaler/cluster-autoscaler/context"
"k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot/testsnapshot"
drasnapshot "k8s.io/autoscaler/cluster-autoscaler/simulator/dynamicresources/snapshot"
kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes"
"k8s.io/autoscaler/cluster-autoscaler/utils/test"
)
@@ -110,7 +111,7 @@ func TestFilterOutExpendable(t *testing.T) {
t.Run(tc.name, func(t *testing.T) {
processor := NewFilterOutExpendablePodListProcessor()
snapshot := testsnapshot.NewTestSnapshotOrDie(t)
err := snapshot.SetClusterState(tc.nodes, nil)
err := snapshot.SetClusterState(tc.nodes, nil, drasnapshot.Snapshot{})
assert.NoError(t, err)

pods, err := processor.Process(&context.AutoscalingContext{
@@ -27,6 +27,7 @@ import (
"k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot"
"k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot/store"
"k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot/testsnapshot"
drasnapshot "k8s.io/autoscaler/cluster-autoscaler/simulator/dynamicresources/snapshot"
"k8s.io/autoscaler/cluster-autoscaler/simulator/framework"
"k8s.io/autoscaler/cluster-autoscaler/simulator/scheduling"
. "k8s.io/autoscaler/cluster-autoscaler/utils/test"
@@ -280,7 +281,7 @@ func BenchmarkFilterOutSchedulable(b *testing.B) {
}

clusterSnapshot := snapshotFactory()
if err := clusterSnapshot.SetClusterState(nodes, scheduledPods); err != nil {
if err := clusterSnapshot.SetClusterState(nodes, scheduledPods, drasnapshot.Snapshot{}); err != nil {
assert.NoError(b, err)
}

5 changes: 3 additions & 2 deletions cluster-autoscaler/core/scaledown/actuation/actuator.go
@@ -36,6 +36,7 @@ import (
"k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot/predicate"
"k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot/store"
"k8s.io/autoscaler/cluster-autoscaler/simulator/drainability/rules"
drasnapshot "k8s.io/autoscaler/cluster-autoscaler/simulator/dynamicresources/snapshot"
"k8s.io/autoscaler/cluster-autoscaler/simulator/options"
"k8s.io/autoscaler/cluster-autoscaler/simulator/utilization"
"k8s.io/autoscaler/cluster-autoscaler/utils/errors"
@@ -358,7 +359,7 @@ func (a *Actuator) taintNode(node *apiv1.Node) error {
}

func (a *Actuator) createSnapshot(nodes []*apiv1.Node) (clustersnapshot.ClusterSnapshot, error) {
snapshot := predicate.NewPredicateSnapshot(store.NewBasicSnapshotStore(), a.ctx.FrameworkHandle)
snapshot := predicate.NewPredicateSnapshot(store.NewBasicSnapshotStore(), a.ctx.FrameworkHandle, a.ctx.DynamicResourceAllocationEnabled)
pods, err := a.ctx.AllPodLister().List()
if err != nil {
return nil, err
@@ -367,7 +368,7 @@ func (a *Actuator) createSnapshot(nodes []*apiv1.Node) (clustersnapshot.ClusterS
scheduledPods := kube_util.ScheduledPods(pods)
nonExpendableScheduledPods := utils.FilterOutExpendablePods(scheduledPods, a.ctx.ExpendablePodsPriorityCutoff)

err = snapshot.SetClusterState(nodes, nonExpendableScheduledPods)
err = snapshot.SetClusterState(nodes, nonExpendableScheduledPods, drasnapshot.Snapshot{})
if err != nil {
return nil, err
}
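
The SetClusterState call above illustrates the pattern used throughout this commit: callers that have no DRA state to seed pass an empty drasnapshot.Snapshot{} as a new third argument. A minimal sketch of that pattern (the helper name and the exact SetClusterState signature are inferred from the call sites, not taken from the interface definition):

// Sketch only: assumes SetClusterState(nodes, scheduledPods, draSnapshot) error,
// as implied by the call sites in this commit.
package example

import (
	apiv1 "k8s.io/api/core/v1"

	"k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot"
	drasnapshot "k8s.io/autoscaler/cluster-autoscaler/simulator/dynamicresources/snapshot"
)

func seedSnapshot(snapshot clustersnapshot.ClusterSnapshot, nodes []*apiv1.Node, scheduledPods []*apiv1.Pod) error {
	// Nodes and scheduled pods as before, plus a (here empty) snapshot of DRA objects.
	return snapshot.SetClusterState(nodes, scheduledPods, drasnapshot.Snapshot{})
}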
@@ -31,7 +31,7 @@ import (

apiv1 "k8s.io/api/core/v1"
kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes"
klog "k8s.io/klog/v2"
"k8s.io/klog/v2"
)

const (
@@ -46,6 +46,7 @@ import (
"k8s.io/autoscaler/cluster-autoscaler/processors/nodeinfosprovider"
"k8s.io/autoscaler/cluster-autoscaler/processors/status"
processorstest "k8s.io/autoscaler/cluster-autoscaler/processors/test"
drasnapshot "k8s.io/autoscaler/cluster-autoscaler/simulator/dynamicresources/snapshot"
"k8s.io/autoscaler/cluster-autoscaler/simulator/framework"
"k8s.io/autoscaler/cluster-autoscaler/utils/errors"
kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes"
@@ -1044,7 +1045,7 @@ func runSimpleScaleUpTest(t *testing.T, config *ScaleUpTestConfig) *ScaleUpTestR
// build orchestrator
context, err := NewScaleTestAutoscalingContext(options, &fake.Clientset{}, listers, provider, nil, nil)
assert.NoError(t, err)
err = context.ClusterSnapshot.SetClusterState(nodes, kube_util.ScheduledPods(pods))
err = context.ClusterSnapshot.SetClusterState(nodes, kube_util.ScheduledPods(pods), drasnapshot.Snapshot{})
assert.NoError(t, err)
nodeInfos, err := nodeinfosprovider.NewDefaultTemplateNodeInfoProvider(nil, false).
Process(&context, nodes, []*appsv1.DaemonSet{}, taints.TaintConfig{}, now)
@@ -1154,7 +1155,7 @@ func TestScaleUpUnhealthy(t *testing.T) {
}
context, err := NewScaleTestAutoscalingContext(options, &fake.Clientset{}, listers, provider, nil, nil)
assert.NoError(t, err)
err = context.ClusterSnapshot.SetClusterState(nodes, pods)
err = context.ClusterSnapshot.SetClusterState(nodes, pods, drasnapshot.Snapshot{})
assert.NoError(t, err)
nodeInfos, _ := nodeinfosprovider.NewDefaultTemplateNodeInfoProvider(nil, false).Process(&context, nodes, []*appsv1.DaemonSet{}, taints.TaintConfig{}, now)
clusterState := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, context.LogRecorder, NewBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(config.NodeGroupAutoscalingOptions{MaxNodeProvisionTime: 15 * time.Minute}), asyncnodegroups.NewDefaultAsyncNodeGroupStateChecker())
@@ -1197,7 +1198,7 @@ func TestBinpackingLimiter(t *testing.T) {

context, err := NewScaleTestAutoscalingContext(options, &fake.Clientset{}, listers, provider, nil, nil)
assert.NoError(t, err)
err = context.ClusterSnapshot.SetClusterState(nodes, nil)
err = context.ClusterSnapshot.SetClusterState(nodes, nil, drasnapshot.Snapshot{})
assert.NoError(t, err)
nodeInfos, err := nodeinfosprovider.NewDefaultTemplateNodeInfoProvider(nil, false).
Process(&context, nodes, []*appsv1.DaemonSet{}, taints.TaintConfig{}, now)
@@ -1257,7 +1258,7 @@ func TestScaleUpNoHelp(t *testing.T) {
}
context, err := NewScaleTestAutoscalingContext(options, &fake.Clientset{}, listers, provider, nil, nil)
assert.NoError(t, err)
err = context.ClusterSnapshot.SetClusterState(nodes, pods)
err = context.ClusterSnapshot.SetClusterState(nodes, pods, drasnapshot.Snapshot{})
assert.NoError(t, err)
nodeInfos, _ := nodeinfosprovider.NewDefaultTemplateNodeInfoProvider(nil, false).Process(&context, nodes, []*appsv1.DaemonSet{}, taints.TaintConfig{}, now)
clusterState := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, context.LogRecorder, NewBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(config.NodeGroupAutoscalingOptions{MaxNodeProvisionTime: 15 * time.Minute}), asyncnodegroups.NewDefaultAsyncNodeGroupStateChecker())
@@ -1412,7 +1413,7 @@ func TestComputeSimilarNodeGroups(t *testing.T) {
listers := kube_util.NewListerRegistry(nil, nil, kube_util.NewTestPodLister(nil), nil, nil, nil, nil, nil, nil)
ctx, err := NewScaleTestAutoscalingContext(config.AutoscalingOptions{BalanceSimilarNodeGroups: tc.balancingEnabled}, &fake.Clientset{}, listers, provider, nil, nil)
assert.NoError(t, err)
err = ctx.ClusterSnapshot.SetClusterState(nodes, nil)
err = ctx.ClusterSnapshot.SetClusterState(nodes, nil, drasnapshot.Snapshot{})
assert.NoError(t, err)
nodeInfos, _ := nodeinfosprovider.NewDefaultTemplateNodeInfoProvider(nil, false).Process(&ctx, nodes, []*appsv1.DaemonSet{}, taints.TaintConfig{}, now)
clusterState := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, ctx.LogRecorder, NewBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(config.NodeGroupAutoscalingOptions{MaxNodeProvisionTime: 15 * time.Minute}), asyncnodegroups.NewDefaultAsyncNodeGroupStateChecker())
@@ -1477,7 +1478,7 @@ func TestScaleUpBalanceGroups(t *testing.T) {
}
context, err := NewScaleTestAutoscalingContext(options, &fake.Clientset{}, listers, provider, nil, nil)
assert.NoError(t, err)
err = context.ClusterSnapshot.SetClusterState(nodes, podList)
err = context.ClusterSnapshot.SetClusterState(nodes, podList, drasnapshot.Snapshot{})
assert.NoError(t, err)
nodeInfos, _ := nodeinfosprovider.NewDefaultTemplateNodeInfoProvider(nil, false).Process(&context, nodes, []*appsv1.DaemonSet{}, taints.TaintConfig{}, now)
clusterState := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, context.LogRecorder, NewBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(config.NodeGroupAutoscalingOptions{MaxNodeProvisionTime: 15 * time.Minute}), asyncnodegroups.NewDefaultAsyncNodeGroupStateChecker())
@@ -1654,7 +1655,7 @@ func TestScaleUpToMeetNodeGroupMinSize(t *testing.T) {
assert.NoError(t, err)

nodes := []*apiv1.Node{n1, n2}
err = context.ClusterSnapshot.SetClusterState(nodes, nil)
err = context.ClusterSnapshot.SetClusterState(nodes, nil, drasnapshot.Snapshot{})
assert.NoError(t, err)
nodeInfos, _ := nodeinfosprovider.NewDefaultTemplateNodeInfoProvider(nil, false).Process(&context, nodes, []*appsv1.DaemonSet{}, taints.TaintConfig{}, time.Now())
processors := processorstest.NewTestProcessors(&context)
9 changes: 5 additions & 4 deletions cluster-autoscaler/core/scaleup/resource/manager_test.go
@@ -32,6 +32,7 @@ import (
"k8s.io/autoscaler/cluster-autoscaler/core/test"
"k8s.io/autoscaler/cluster-autoscaler/processors/nodeinfosprovider"
processorstest "k8s.io/autoscaler/cluster-autoscaler/processors/test"
drasnapshot "k8s.io/autoscaler/cluster-autoscaler/simulator/dynamicresources/snapshot"
kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes"
"k8s.io/autoscaler/cluster-autoscaler/utils/taints"
utils_test "k8s.io/autoscaler/cluster-autoscaler/utils/test"
@@ -71,7 +72,7 @@ func TestDeltaForNode(t *testing.T) {

ng := testCase.nodeGroupConfig
group, nodes := newNodeGroup(t, cp, ng.Name, ng.Min, ng.Max, ng.Size, ng.CPU, ng.Mem)
err := ctx.ClusterSnapshot.SetClusterState(nodes, nil)
err := ctx.ClusterSnapshot.SetClusterState(nodes, nil, drasnapshot.Snapshot{})
assert.NoError(t, err)
nodeInfos, _ := nodeinfosprovider.NewDefaultTemplateNodeInfoProvider(nil, false).Process(&ctx, nodes, []*appsv1.DaemonSet{}, taints.TaintConfig{}, time.Now())

@@ -114,7 +115,7 @@ func TestResourcesLeft(t *testing.T) {

ng := testCase.nodeGroupConfig
_, nodes := newNodeGroup(t, cp, ng.Name, ng.Min, ng.Max, ng.Size, ng.CPU, ng.Mem)
err := ctx.ClusterSnapshot.SetClusterState(nodes, nil)
err := ctx.ClusterSnapshot.SetClusterState(nodes, nil, drasnapshot.Snapshot{})
assert.NoError(t, err)
nodeInfos, _ := nodeinfosprovider.NewDefaultTemplateNodeInfoProvider(nil, false).Process(&ctx, nodes, []*appsv1.DaemonSet{}, taints.TaintConfig{}, time.Now())

@@ -167,7 +168,7 @@ func TestApplyLimits(t *testing.T) {

ng := testCase.nodeGroupConfig
group, nodes := newNodeGroup(t, cp, ng.Name, ng.Min, ng.Max, ng.Size, ng.CPU, ng.Mem)
err := ctx.ClusterSnapshot.SetClusterState(nodes, nil)
err := ctx.ClusterSnapshot.SetClusterState(nodes, nil, drasnapshot.Snapshot{})
assert.NoError(t, err)
nodeInfos, _ := nodeinfosprovider.NewDefaultTemplateNodeInfoProvider(nil, false).Process(&ctx, nodes, []*appsv1.DaemonSet{}, taints.TaintConfig{}, time.Now())

@@ -234,7 +235,7 @@ func TestResourceManagerWithGpuResource(t *testing.T) {
assert.NoError(t, err)

nodes := []*corev1.Node{n1}
err = context.ClusterSnapshot.SetClusterState(nodes, nil)
err = context.ClusterSnapshot.SetClusterState(nodes, nil, drasnapshot.Snapshot{})
assert.NoError(t, err)
nodeInfos, _ := nodeinfosprovider.NewDefaultTemplateNodeInfoProvider(nil, false).Process(&context, nodes, []*appsv1.DaemonSet{}, taints.TaintConfig{}, time.Now())

3 changes: 2 additions & 1 deletion cluster-autoscaler/core/static_autoscaler.go
@@ -46,6 +46,7 @@ import (
"k8s.io/autoscaler/cluster-autoscaler/simulator"
"k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot"
"k8s.io/autoscaler/cluster-autoscaler/simulator/drainability/rules"
drasnapshot "k8s.io/autoscaler/cluster-autoscaler/simulator/dynamicresources/snapshot"
"k8s.io/autoscaler/cluster-autoscaler/simulator/framework"
"k8s.io/autoscaler/cluster-autoscaler/simulator/options"
"k8s.io/autoscaler/cluster-autoscaler/utils/backoff"
@@ -337,7 +338,7 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) caerrors.AutoscalerErr
}
nonExpendableScheduledPods := core_utils.FilterOutExpendablePods(originalScheduledPods, a.ExpendablePodsPriorityCutoff)
// Initialize cluster state to ClusterSnapshot
if err := a.ClusterSnapshot.SetClusterState(allNodes, nonExpendableScheduledPods); err != nil {
if err := a.ClusterSnapshot.SetClusterState(allNodes, nonExpendableScheduledPods, drasnapshot.Snapshot{}); err != nil {
return caerrors.ToAutoscalerError(caerrors.InternalError, err).AddPrefix("failed to initialize ClusterSnapshot: ")
}
// Initialize Pod Disruption Budget tracking
6 changes: 4 additions & 2 deletions cluster-autoscaler/main.go
@@ -280,6 +280,7 @@ var (
checkCapacityProvisioningRequestMaxBatchSize = flag.Int("check-capacity-provisioning-request-max-batch-size", 10, "Maximum number of provisioning requests to process in a single batch.")
checkCapacityProvisioningRequestBatchTimebox = flag.Duration("check-capacity-provisioning-request-batch-timebox", 10*time.Second, "Maximum time to process a batch of provisioning requests.")
forceDeleteLongUnregisteredNodes = flag.Bool("force-delete-unregistered-nodes", false, "Whether to enable force deletion of long unregistered nodes, regardless of the min size of the node group they belong to.")
enableDynamicResourceAllocation = flag.Bool("enable-dynamic-resource-allocation", false, "Whether logic for handling DRA (Dynamic Resource Allocation) objects is enabled.")
)

func isFlagPassed(name string) bool {
@@ -459,6 +460,7 @@ func createAutoscalingOptions() config.AutoscalingOptions {
CheckCapacityProvisioningRequestMaxBatchSize: *checkCapacityProvisioningRequestMaxBatchSize,
CheckCapacityProvisioningRequestBatchTimebox: *checkCapacityProvisioningRequestBatchTimebox,
ForceDeleteLongUnregisteredNodes: *forceDeleteLongUnregisteredNodes,
DynamicResourceAllocationEnabled: *enableDynamicResourceAllocation,
}
}

@@ -494,7 +496,7 @@ func buildAutoscaler(context ctx.Context, debuggingSnapshotter debuggingsnapshot
}
informerFactory := informers.NewSharedInformerFactoryWithOptions(kubeClient, 0, informers.WithTransform(trim))

fwHandle, err := framework.NewHandle(informerFactory, autoscalingOptions.SchedulerConfig)
fwHandle, err := framework.NewHandle(informerFactory, autoscalingOptions.SchedulerConfig, autoscalingOptions.DynamicResourceAllocationEnabled)
if err != nil {
return nil, nil, err
}
@@ -504,7 +506,7 @@ func buildAutoscaler(context ctx.Context, debuggingSnapshotter debuggingsnapshot
opts := core.AutoscalerOptions{
AutoscalingOptions: autoscalingOptions,
FrameworkHandle: fwHandle,
ClusterSnapshot: predicate.NewPredicateSnapshot(store.NewDeltaSnapshotStore(), fwHandle),
ClusterSnapshot: predicate.NewPredicateSnapshot(store.NewDeltaSnapshotStore(), fwHandle, autoscalingOptions.DynamicResourceAllocationEnabled),
KubeClient: kubeClient,
InformerFactory: informerFactory,
DebuggingSnapshotter: debuggingSnapshotter,
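
For completeness, a standalone sketch (not part of this commit) of the flag-to-option step: the feature stays off unless --enable-dynamic-resource-allocation is set, and in the real binary the parsed value flows through createAutoscalingOptions (above) into the framework handle and the snapshot via AutoscalingOptions:

// Sketch only: condensed, hypothetical illustration of the flag-to-option mapping.
package main

import (
	"flag"
	"fmt"

	"k8s.io/autoscaler/cluster-autoscaler/config"
)

var enableDynamicResourceAllocation = flag.Bool("enable-dynamic-resource-allocation", false,
	"Whether logic for handling DRA (Dynamic Resource Allocation) objects is enabled.")

func main() {
	flag.Parse()
	opts := config.AutoscalingOptions{
		DynamicResourceAllocationEnabled: *enableDynamicResourceAllocation,
	}
	// Off by default; enable with --enable-dynamic-resource-allocation=true.
	fmt.Println("DRA handling enabled:", opts.DynamicResourceAllocationEnabled)
}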
@@ -27,6 +27,7 @@ import (
testprovider "k8s.io/autoscaler/cluster-autoscaler/cloudprovider/test"
"k8s.io/autoscaler/cluster-autoscaler/context"
"k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot/testsnapshot"
drasnapshot "k8s.io/autoscaler/cluster-autoscaler/simulator/dynamicresources/snapshot"
"k8s.io/autoscaler/cluster-autoscaler/simulator/framework"
kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes"
"k8s.io/autoscaler/cluster-autoscaler/utils/taints"
@@ -77,7 +78,7 @@ func TestGetNodeInfosForGroups(t *testing.T) {

nodes := []*apiv1.Node{justReady5, unready4, unready3, ready2, ready1}
snapshot := testsnapshot.NewTestSnapshotOrDie(t)
err := snapshot.SetClusterState(nodes, nil)
err := snapshot.SetClusterState(nodes, nil, drasnapshot.Snapshot{})
assert.NoError(t, err)

ctx := context.AutoscalingContext{
@@ -163,7 +164,7 @@ func TestGetNodeInfosForGroupsCache(t *testing.T) {

nodes := []*apiv1.Node{unready4, unready3, ready2, ready1}
snapshot := testsnapshot.NewTestSnapshotOrDie(t)
err := snapshot.SetClusterState(nodes, nil)
err := snapshot.SetClusterState(nodes, nil, drasnapshot.Snapshot{})
assert.NoError(t, err)

// Fill cache
@@ -254,7 +255,7 @@ func TestGetNodeInfosCacheExpired(t *testing.T) {

nodes := []*apiv1.Node{ready1}
snapshot := testsnapshot.NewTestSnapshotOrDie(t)
err := snapshot.SetClusterState(nodes, nil)
err := snapshot.SetClusterState(nodes, nil, drasnapshot.Snapshot{})
assert.NoError(t, err)

ctx := context.AutoscalingContext{