Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
DONOTSUBMIT
  • Loading branch information
towca committed Nov 25, 2024
1 parent 30e57c9 commit 0ec9524
Show file tree
Hide file tree
Showing 74 changed files with 2,643 additions and 1,951 deletions.
2 changes: 2 additions & 0 deletions cluster-autoscaler/config/autoscaling_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -309,6 +309,8 @@ type AutoscalingOptions struct {
CheckCapacityProvisioningRequestBatchTimebox time.Duration
// ForceDeleteLongUnregisteredNodes is used to enable/disable ignoring min size constraints during removal of long unregistered nodes
ForceDeleteLongUnregisteredNodes bool
// DynamicResourceAllocationEnabled configures whether logic for handling DRA objects is enabled.
DynamicResourceAllocationEnabled bool
}

// KubeClientOptions specify options for kube client
Expand Down
11 changes: 5 additions & 6 deletions cluster-autoscaler/context/autoscaling_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import (
"k8s.io/autoscaler/cluster-autoscaler/expander"
processor_callbacks "k8s.io/autoscaler/cluster-autoscaler/processors/callbacks"
"k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot"
"k8s.io/autoscaler/cluster-autoscaler/simulator/predicatechecker"
"k8s.io/autoscaler/cluster-autoscaler/simulator/framework"
kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes"
"k8s.io/client-go/informers"
kube_client "k8s.io/client-go/kubernetes"
Expand All @@ -44,9 +44,8 @@ type AutoscalingContext struct {
AutoscalingKubeClients
// CloudProvider used in CA.
CloudProvider cloudprovider.CloudProvider
// TODO(kgolab) - move away too as it's not config
// PredicateChecker to check if a pod can fit into a node.
PredicateChecker predicatechecker.PredicateChecker
// FrameworkHandle can be used to interact with the scheduler framework.
FrameworkHandle *framework.Handle
// ClusterSnapshot denotes cluster snapshot used for predicate checking.
ClusterSnapshot clustersnapshot.ClusterSnapshot
// ExpanderStrategy is the strategy used to choose which node group to expand when scaling up
Expand Down Expand Up @@ -100,7 +99,7 @@ func NewResourceLimiterFromAutoscalingOptions(options config.AutoscalingOptions)
// NewAutoscalingContext returns an autoscaling context from all the necessary parameters passed via arguments
func NewAutoscalingContext(
options config.AutoscalingOptions,
predicateChecker predicatechecker.PredicateChecker,
fwHandle *framework.Handle,
clusterSnapshot clustersnapshot.ClusterSnapshot,
autoscalingKubeClients *AutoscalingKubeClients,
cloudProvider cloudprovider.CloudProvider,
Expand All @@ -114,7 +113,7 @@ func NewAutoscalingContext(
AutoscalingOptions: options,
CloudProvider: cloudProvider,
AutoscalingKubeClients: *autoscalingKubeClients,
PredicateChecker: predicateChecker,
FrameworkHandle: fwHandle,
ClusterSnapshot: clusterSnapshot,
ExpanderStrategy: expanderStrategy,
ProcessorCallbacks: processorCallbacks,
Expand Down
17 changes: 13 additions & 4 deletions cluster-autoscaler/core/autoscaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,11 @@ import (
"k8s.io/autoscaler/cluster-autoscaler/observers/loopstart"
ca_processors "k8s.io/autoscaler/cluster-autoscaler/processors"
"k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot"
"k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot/base"
"k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot/predicate"
"k8s.io/autoscaler/cluster-autoscaler/simulator/drainability/rules"
"k8s.io/autoscaler/cluster-autoscaler/simulator/framework"
"k8s.io/autoscaler/cluster-autoscaler/simulator/options"
"k8s.io/autoscaler/cluster-autoscaler/simulator/predicatechecker"
"k8s.io/autoscaler/cluster-autoscaler/utils/backoff"
"k8s.io/autoscaler/cluster-autoscaler/utils/errors"
"k8s.io/client-go/informers"
Expand All @@ -49,7 +51,7 @@ type AutoscalerOptions struct {
InformerFactory informers.SharedInformerFactory
AutoscalingKubeClients *context.AutoscalingKubeClients
CloudProvider cloudprovider.CloudProvider
PredicateChecker predicatechecker.PredicateChecker
FrameworkHandle *framework.Handle
ClusterSnapshot clustersnapshot.ClusterSnapshot
ExpanderStrategy expander.Strategy
EstimatorBuilder estimator.EstimatorBuilder
Expand Down Expand Up @@ -86,7 +88,7 @@ func NewAutoscaler(opts AutoscalerOptions, informerFactory informers.SharedInfor
}
return NewStaticAutoscaler(
opts.AutoscalingOptions,
opts.PredicateChecker,
opts.FrameworkHandle,
opts.ClusterSnapshot,
opts.AutoscalingKubeClients,
opts.Processors,
Expand Down Expand Up @@ -114,8 +116,15 @@ func initializeDefaultOptions(opts *AutoscalerOptions, informerFactory informers
if opts.AutoscalingKubeClients == nil {
opts.AutoscalingKubeClients = context.NewAutoscalingKubeClients(opts.AutoscalingOptions, opts.KubeClient, opts.InformerFactory)
}
if opts.FrameworkHandle == nil {
fwHandle, err := framework.NewHandle(opts.InformerFactory, opts.SchedulerConfig, opts.DynamicResourceAllocationEnabled)
if err != nil {
return err
}
opts.FrameworkHandle = fwHandle
}
if opts.ClusterSnapshot == nil {
opts.ClusterSnapshot = clustersnapshot.NewBasicClusterSnapshot()
opts.ClusterSnapshot = predicate.NewPredicateSnapshot(base.NewBasicSnapshotBase(), opts.FrameworkHandle, opts.DynamicResourceAllocationEnabled)
}
if opts.RemainingPdbTracker == nil {
opts.RemainingPdbTracker = pdb.NewBasicRemainingPdbTracker()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"k8s.io/autoscaler/cluster-autoscaler/core/scaledown"
"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/status"
"k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot"
"k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot/testsnapshot"
"k8s.io/autoscaler/cluster-autoscaler/utils/errors"
. "k8s.io/autoscaler/cluster-autoscaler/utils/test"
)
Expand Down Expand Up @@ -194,7 +195,7 @@ func TestCurrentlyDrainedNodesPodListProcessor(t *testing.T) {
name: "single node, non-recreatable pods filtered out",
drainedNodes: []string{"n"},
nodes: []*apiv1.Node{
BuildTestNode("n", 1000, 10),
BuildTestNode("n", 2000, 10),
},
pods: []*apiv1.Pod{
BuildScheduledTestPod("p1", 100, 1, "n"),
Expand Down Expand Up @@ -229,11 +230,11 @@ func TestCurrentlyDrainedNodesPodListProcessor(t *testing.T) {
name: "everything works together",
drainedNodes: []string{"n1", "n3", "n5"},
nodes: []*apiv1.Node{
BuildTestNode("n1", 1000, 10),
BuildTestNode("n2", 1000, 10),
BuildTestNode("n3", 1000, 10),
BuildTestNode("n4", 1000, 10),
BuildTestNode("n5", 1000, 10),
BuildTestNode("n1", 3000, 10),
BuildTestNode("n2", 3000, 10),
BuildTestNode("n3", 3000, 10),
BuildTestNode("n4", 3000, 10),
BuildTestNode("n5", 3000, 10),
},
pods: []*apiv1.Pod{
BuildScheduledTestPod("p1", 100, 1, "n1"),
Expand Down Expand Up @@ -267,7 +268,7 @@ func TestCurrentlyDrainedNodesPodListProcessor(t *testing.T) {
t.Run(tc.name, func(t *testing.T) {
ctx := context.AutoscalingContext{
ScaleDownActuator: &mockActuator{&mockActuationStatus{tc.drainedNodes}},
ClusterSnapshot: clustersnapshot.NewBasicClusterSnapshot(),
ClusterSnapshot: testsnapshot.NewTestSnapshotOrDie(t),
}
clustersnapshot.InitializeClusterSnapshotOrDie(t, ctx.ClusterSnapshot, tc.nodes, tc.pods)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ func (p *filterOutExpendable) Process(context *context.AutoscalingContext, pods
// CA logic from before migration to scheduler framework. So let's keep it for now
func (p *filterOutExpendable) addPreemptingPodsToSnapshot(pods []*apiv1.Pod, ctx *context.AutoscalingContext) error {
for _, p := range pods {
// TODO(DRA): Figure out if/how to use the predicate-checking SchedulePod() here instead - otherwise this doesn't work with DRA pods.
if err := ctx.ClusterSnapshot.ForceAddPod(p, p.Status.NominatedNodeName); err != nil {
klog.Errorf("Failed to update snapshot with pod %s/%s waiting for preemption: %v", p.Namespace, p.Name, err)
return caerrors.ToAutoscalerError(caerrors.InternalError, err)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@ import (
apiv1 "k8s.io/api/core/v1"
"k8s.io/autoscaler/cluster-autoscaler/config"
"k8s.io/autoscaler/cluster-autoscaler/context"
"k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot"
"k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot/testsnapshot"
drasnapshot "k8s.io/autoscaler/cluster-autoscaler/simulator/dynamicresources/snapshot"
kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes"
"k8s.io/autoscaler/cluster-autoscaler/utils/test"
)
Expand Down Expand Up @@ -109,8 +110,8 @@ func TestFilterOutExpendable(t *testing.T) {
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
processor := NewFilterOutExpendablePodListProcessor()
snapshot := clustersnapshot.NewBasicClusterSnapshot()
err := snapshot.SetClusterState(tc.nodes, nil)
snapshot := testsnapshot.NewTestSnapshotOrDie(t)
err := snapshot.SetClusterState(tc.nodes, nil, drasnapshot.Snapshot{})
assert.NoError(t, err)

pods, err := processor.Process(&context.AutoscalingContext{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import (
"k8s.io/autoscaler/cluster-autoscaler/metrics"
"k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot"
"k8s.io/autoscaler/cluster-autoscaler/simulator/framework"
"k8s.io/autoscaler/cluster-autoscaler/simulator/predicatechecker"
"k8s.io/autoscaler/cluster-autoscaler/simulator/scheduling"
corev1helpers "k8s.io/component-helpers/scheduling/corev1"
klog "k8s.io/klog/v2"
Expand All @@ -38,9 +37,9 @@ type filterOutSchedulablePodListProcessor struct {
}

// NewFilterOutSchedulablePodListProcessor creates a PodListProcessor filtering out schedulable pods
func NewFilterOutSchedulablePodListProcessor(predicateChecker predicatechecker.PredicateChecker, nodeFilter func(*framework.NodeInfo) bool) *filterOutSchedulablePodListProcessor {
func NewFilterOutSchedulablePodListProcessor(nodeFilter func(*framework.NodeInfo) bool) *filterOutSchedulablePodListProcessor {
return &filterOutSchedulablePodListProcessor{
schedulingSimulator: scheduling.NewHintingSimulator(predicateChecker),
schedulingSimulator: scheduling.NewHintingSimulator(),
nodeFilter: nodeFilter,
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,16 +25,15 @@ import (

apiv1 "k8s.io/api/core/v1"
"k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot"
"k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot/base"
"k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot/testsnapshot"
drasnapshot "k8s.io/autoscaler/cluster-autoscaler/simulator/dynamicresources/snapshot"
"k8s.io/autoscaler/cluster-autoscaler/simulator/framework"
"k8s.io/autoscaler/cluster-autoscaler/simulator/predicatechecker"
"k8s.io/autoscaler/cluster-autoscaler/simulator/scheduling"
. "k8s.io/autoscaler/cluster-autoscaler/utils/test"
schedulermetrics "k8s.io/kubernetes/pkg/scheduler/metrics"
)

func TestFilterOutSchedulable(t *testing.T) {
schedulermetrics.Register()

node := buildReadyTestNode("node", 2000, 100)
matchesAllNodes := func(*framework.NodeInfo) bool { return true }
matchesNoNodes := func(*framework.NodeInfo) bool { return false }
Expand Down Expand Up @@ -176,9 +175,7 @@ func TestFilterOutSchedulable(t *testing.T) {

for tn, tc := range testCases {
t.Run(tn, func(t *testing.T) {
clusterSnapshot := clustersnapshot.NewBasicClusterSnapshot()
predicateChecker, err := predicatechecker.NewTestPredicateChecker()
assert.NoError(t, err)
clusterSnapshot := testsnapshot.NewTestSnapshotOrDie(t)

var allExpectedScheduledPods []*apiv1.Pod
allExpectedScheduledPods = append(allExpectedScheduledPods, tc.expectedScheduledPods...)
Expand All @@ -194,7 +191,7 @@ func TestFilterOutSchedulable(t *testing.T) {

clusterSnapshot.Fork()

processor := NewFilterOutSchedulablePodListProcessor(predicateChecker, tc.nodeFilter)
processor := NewFilterOutSchedulablePodListProcessor(tc.nodeFilter)
unschedulablePods, err := processor.filterOutSchedulableByPacking(tc.unschedulableCandidates, clusterSnapshot)

assert.NoError(t, err)
Expand Down Expand Up @@ -253,8 +250,12 @@ func BenchmarkFilterOutSchedulable(b *testing.B) {
},
}
snapshots := map[string]func() clustersnapshot.ClusterSnapshot{
"basic": func() clustersnapshot.ClusterSnapshot { return clustersnapshot.NewBasicClusterSnapshot() },
"delta": func() clustersnapshot.ClusterSnapshot { return clustersnapshot.NewDeltaClusterSnapshot() },
"basic": func() clustersnapshot.ClusterSnapshot {
return testsnapshot.NewCustomTestSnapshotOrDie(b, base.NewBasicSnapshotBase())
},
"delta": func() clustersnapshot.ClusterSnapshot {
return testsnapshot.NewCustomTestSnapshotOrDie(b, base.NewDeltaSnapshotBase())
},
}
for snapshotName, snapshotFactory := range snapshots {
for _, tc := range tests {
Expand All @@ -279,18 +280,15 @@ func BenchmarkFilterOutSchedulable(b *testing.B) {
}
}

predicateChecker, err := predicatechecker.NewTestPredicateChecker()
assert.NoError(b, err)

clusterSnapshot := snapshotFactory()
if err := clusterSnapshot.SetClusterState(nodes, scheduledPods); err != nil {
if err := clusterSnapshot.SetClusterState(nodes, scheduledPods, drasnapshot.Snapshot{}); err != nil {
assert.NoError(b, err)
}

b.ResetTimer()

for i := 0; i < b.N; i++ {
processor := NewFilterOutSchedulablePodListProcessor(predicateChecker, scheduling.ScheduleAnywhere)
processor := NewFilterOutSchedulablePodListProcessor(scheduling.ScheduleAnywhere)
if stillPending, err := processor.filterOutSchedulableByPacking(pendingPods, clusterSnapshot); err != nil {
assert.NoError(b, err)
} else if len(stillPending) < tc.pendingPods {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,16 @@ package podlistprocessor
import (
"k8s.io/autoscaler/cluster-autoscaler/processors/pods"
"k8s.io/autoscaler/cluster-autoscaler/simulator/framework"
"k8s.io/autoscaler/cluster-autoscaler/simulator/predicatechecker"
)

// NewDefaultPodListProcessor returns a default implementation of the pod list
// processor, which wraps and sequentially runs other sub-processors.
func NewDefaultPodListProcessor(predicateChecker predicatechecker.PredicateChecker, nodeFilter func(*framework.NodeInfo) bool) *pods.CombinedPodListProcessor {
func NewDefaultPodListProcessor(nodeFilter func(*framework.NodeInfo) bool) *pods.CombinedPodListProcessor {
return pods.NewCombinedPodListProcessor([]pods.PodListProcessor{
NewClearTPURequestsPodListProcessor(),
NewFilterOutExpendablePodListProcessor(),
NewCurrentlyDrainedNodesPodListProcessor(),
NewFilterOutSchedulablePodListProcessor(predicateChecker, nodeFilter),
NewFilterOutSchedulablePodListProcessor(nodeFilter),
NewFilterOutDaemonSetPodListProcessor(),
})
}
7 changes: 5 additions & 2 deletions cluster-autoscaler/core/scaledown/actuation/actuator.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,10 @@ import (
"k8s.io/autoscaler/cluster-autoscaler/observers/nodegroupchange"
"k8s.io/autoscaler/cluster-autoscaler/simulator"
"k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot"
"k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot/base"
"k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot/predicate"
"k8s.io/autoscaler/cluster-autoscaler/simulator/drainability/rules"
drasnapshot "k8s.io/autoscaler/cluster-autoscaler/simulator/dynamicresources/snapshot"
"k8s.io/autoscaler/cluster-autoscaler/simulator/options"
"k8s.io/autoscaler/cluster-autoscaler/simulator/utilization"
"k8s.io/autoscaler/cluster-autoscaler/utils/errors"
Expand Down Expand Up @@ -356,7 +359,7 @@ func (a *Actuator) taintNode(node *apiv1.Node) error {
}

func (a *Actuator) createSnapshot(nodes []*apiv1.Node) (clustersnapshot.ClusterSnapshot, error) {
snapshot := clustersnapshot.NewBasicClusterSnapshot()
snapshot := predicate.NewPredicateSnapshot(base.NewBasicSnapshotBase(), a.ctx.FrameworkHandle, a.ctx.DynamicResourceAllocationEnabled)
pods, err := a.ctx.AllPodLister().List()
if err != nil {
return nil, err
Expand All @@ -365,7 +368,7 @@ func (a *Actuator) createSnapshot(nodes []*apiv1.Node) (clustersnapshot.ClusterS
scheduledPods := kube_util.ScheduledPods(pods)
nonExpendableScheduledPods := utils.FilterOutExpendablePods(scheduledPods, a.ctx.ExpendablePodsPriorityCutoff)

err = snapshot.SetClusterState(nodes, nonExpendableScheduledPods)
err = snapshot.SetClusterState(nodes, nonExpendableScheduledPods, drasnapshot.Snapshot{})
if err != nil {
return nil, err
}
Expand Down
4 changes: 1 addition & 3 deletions cluster-autoscaler/core/scaledown/actuation/actuator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (

"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"

appsv1 "k8s.io/api/apps/v1"
apiv1 "k8s.io/api/core/v1"
policyv1 "k8s.io/api/policy/v1"
Expand All @@ -50,7 +51,6 @@ import (
. "k8s.io/autoscaler/cluster-autoscaler/utils/test"
"k8s.io/client-go/kubernetes/fake"
core "k8s.io/client-go/testing"
schedulermetrics "k8s.io/kubernetes/pkg/scheduler/metrics"
)

type nodeGroupViewInfo struct {
Expand Down Expand Up @@ -1000,8 +1000,6 @@ func getStartDeletionTestCases(ignoreDaemonSetsUtilization bool, suffix string)
}

func TestStartDeletion(t *testing.T) {
schedulermetrics.Register()

testSets := []map[string]startDeletionTestCase{
// IgnoreDaemonSetsUtilization is false
getStartDeletionTestCases(false, "testNg1"),
Expand Down
4 changes: 3 additions & 1 deletion cluster-autoscaler/core/scaledown/actuation/drain_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"github.com/stretchr/testify/assert"

apiv1 "k8s.io/api/core/v1"
policyv1beta1 "k8s.io/api/policy/v1beta1"
"k8s.io/apimachinery/pkg/api/errors"
Expand All @@ -37,6 +38,7 @@ import (
. "k8s.io/autoscaler/cluster-autoscaler/core/test"
"k8s.io/autoscaler/cluster-autoscaler/core/utils"
"k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot"
"k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot/testsnapshot"
"k8s.io/autoscaler/cluster-autoscaler/simulator/framework"
"k8s.io/autoscaler/cluster-autoscaler/utils/daemonset"
kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes"
Expand Down Expand Up @@ -611,7 +613,7 @@ func TestPodsToEvict(t *testing.T) {
},
} {
t.Run(tn, func(t *testing.T) {
snapshot := clustersnapshot.NewBasicClusterSnapshot()
snapshot := testsnapshot.NewTestSnapshotOrDie(t)
node := BuildTestNode("test-node", 1000, 1000)
err := snapshot.AddNodeInfo(framework.NewTestNodeInfo(node, tc.pods...))
if err != nil {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import (

apiv1 "k8s.io/api/core/v1"
kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes"
klog "k8s.io/klog/v2"
"k8s.io/klog/v2"
)

const (
Expand Down
Loading

0 comments on commit 0ec9524

Please sign in to comment.