CA: remove PredicateChecker, use the new ClusterSnapshot methods instead
towca committed Nov 19, 2024
1 parent 79622b5 commit 5a35d42
Showing 25 changed files with 118 additions and 239 deletions.
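
The change follows one pattern across all 25 files: call sites stop routing predicate checks through a standalone PredicateChecker and instead call the scheduling methods that ClusterSnapshot now exposes. Below is a minimal before/after sketch assembled only from the method names visible in the hunks that follow; the helper function and its package are illustrative, not part of the commit.

package example // illustrative only, not part of this commit

import (
	apiv1 "k8s.io/api/core/v1"
	"k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot"
	"k8s.io/autoscaler/cluster-autoscaler/simulator/framework"
)

// Before: err := predicateChecker.CheckPredicates(clusterSnapshot, pod, nodeName)
//         name, err := predicateChecker.FitsAnyNodeMatching(clusterSnapshot, pod, nodeFilter)
// After, the snapshot answers the same questions directly:
func sketch(snapshot clustersnapshot.ClusterSnapshot, pod *apiv1.Pod, nodeName string) error {
	// Predicate check against one named node, without placing the pod.
	if err := snapshot.CheckPredicates(pod, nodeName); err != nil {
		return err
	}
	// Check predicates and, on success, add the pod to the snapshot.
	if err := snapshot.SchedulePod(pod, nodeName); err != nil {
		return err
	}
	// Let the snapshot pick any node passing a filter and schedule the pod there.
	if _, err := snapshot.SchedulePodOnAnyNodeMatching(pod, func(ni *framework.NodeInfo) bool { return true }); err != nil {
		return err
	}
	return nil
}
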
6 changes: 0 additions & 6 deletions cluster-autoscaler/context/autoscaling_context.go
@@ -28,7 +28,6 @@ import (
processor_callbacks "k8s.io/autoscaler/cluster-autoscaler/processors/callbacks"
"k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot"
"k8s.io/autoscaler/cluster-autoscaler/simulator/framework"
"k8s.io/autoscaler/cluster-autoscaler/simulator/predicatechecker"
kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes"
"k8s.io/client-go/informers"
kube_client "k8s.io/client-go/kubernetes"
@@ -47,9 +46,6 @@ type AutoscalingContext struct {
CloudProvider cloudprovider.CloudProvider
// FrameworkHandle can be used to interact with the scheduler framework.
FrameworkHandle *framework.Handle
// TODO(kgolab) - move away too as it's not config
// PredicateChecker to check if a pod can fit into a node.
PredicateChecker predicatechecker.PredicateChecker
// ClusterSnapshot denotes cluster snapshot used for predicate checking.
ClusterSnapshot clustersnapshot.ClusterSnapshot
// ExpanderStrategy is the strategy used to choose which node group to expand when scaling up
@@ -104,7 +100,6 @@ func NewResourceLimiterFromAutoscalingOptions(options config.AutoscalingOptions)
func NewAutoscalingContext(
options config.AutoscalingOptions,
fwHandle *framework.Handle,
predicateChecker predicatechecker.PredicateChecker,
clusterSnapshot clustersnapshot.ClusterSnapshot,
autoscalingKubeClients *AutoscalingKubeClients,
cloudProvider cloudprovider.CloudProvider,
@@ -119,7 +114,6 @@ func NewAutoscalingContext(
CloudProvider: cloudProvider,
AutoscalingKubeClients: *autoscalingKubeClients,
FrameworkHandle: fwHandle,
PredicateChecker: predicateChecker,
ClusterSnapshot: clusterSnapshot,
ExpanderStrategy: expanderStrategy,
ProcessorCallbacks: processorCallbacks,
3 changes: 0 additions & 3 deletions cluster-autoscaler/core/autoscaler.go
@@ -38,7 +38,6 @@ import (
"k8s.io/autoscaler/cluster-autoscaler/simulator/drainability/rules"
"k8s.io/autoscaler/cluster-autoscaler/simulator/framework"
"k8s.io/autoscaler/cluster-autoscaler/simulator/options"
"k8s.io/autoscaler/cluster-autoscaler/simulator/predicatechecker"
"k8s.io/autoscaler/cluster-autoscaler/utils/backoff"
"k8s.io/autoscaler/cluster-autoscaler/utils/errors"
"k8s.io/client-go/informers"
@@ -53,7 +52,6 @@ type AutoscalerOptions struct {
AutoscalingKubeClients *context.AutoscalingKubeClients
CloudProvider cloudprovider.CloudProvider
FrameworkHandle *framework.Handle
PredicateChecker predicatechecker.PredicateChecker
ClusterSnapshot clustersnapshot.ClusterSnapshot
ExpanderStrategy expander.Strategy
EstimatorBuilder estimator.EstimatorBuilder
@@ -91,7 +89,6 @@ func NewAutoscaler(opts AutoscalerOptions, informerFactory informers.SharedInfor
return NewStaticAutoscaler(
opts.AutoscalingOptions,
opts.FrameworkHandle,
opts.PredicateChecker,
opts.ClusterSnapshot,
opts.AutoscalingKubeClients,
opts.Processors,
@@ -56,6 +56,7 @@ func (p *filterOutExpendable) Process(context *context.AutoscalingContext, pods
// CA logic from before migration to scheduler framework. So let's keep it for now
func (p *filterOutExpendable) addPreemptingPodsToSnapshot(pods []*apiv1.Pod, ctx *context.AutoscalingContext) error {
for _, p := range pods {
// TODO(DRA): Figure out if/how to use the predicate-checking SchedulePod() here instead - otherwise this doesn't work with DRA pods.
if err := ctx.ClusterSnapshot.ForceAddPod(p, p.Status.NominatedNodeName); err != nil {
klog.Errorf("Failed to update snapshot with pod %s/%s waiting for preemption: %v", p.Namespace, p.Name, err)
return caerrors.ToAutoscalerError(caerrors.InternalError, err)
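
The TODO above hinges on the difference between the two snapshot methods: ForceAddPod appears to pin the pod to its nominated node without running scheduler predicates, while SchedulePod (used elsewhere in this commit) checks predicates first, which the TODO suggests is what DRA pods need. A hypothetical sketch of what that swap might look like; the fallback behaviour is an assumption, not something this commit implements:

// Hypothetical, not part of this commit.
if err := ctx.ClusterSnapshot.SchedulePod(p, p.Status.NominatedNodeName); err != nil {
	// Assumed fallback to today's force-add if predicates reject the preempting pod.
	if forceErr := ctx.ClusterSnapshot.ForceAddPod(p, p.Status.NominatedNodeName); forceErr != nil {
		klog.Errorf("Failed to update snapshot with pod %s/%s waiting for preemption: %v", p.Namespace, p.Name, forceErr)
		return caerrors.ToAutoscalerError(caerrors.InternalError, forceErr)
	}
}
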
@@ -26,7 +26,6 @@ import (
"k8s.io/autoscaler/cluster-autoscaler/metrics"
"k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot"
"k8s.io/autoscaler/cluster-autoscaler/simulator/framework"
"k8s.io/autoscaler/cluster-autoscaler/simulator/predicatechecker"
"k8s.io/autoscaler/cluster-autoscaler/simulator/scheduling"
corev1helpers "k8s.io/component-helpers/scheduling/corev1"
klog "k8s.io/klog/v2"
@@ -38,9 +37,9 @@ type filterOutSchedulablePodListProcessor struct {
}

// NewFilterOutSchedulablePodListProcessor creates a PodListProcessor filtering out schedulable pods
func NewFilterOutSchedulablePodListProcessor(predicateChecker predicatechecker.PredicateChecker, nodeFilter func(*framework.NodeInfo) bool) *filterOutSchedulablePodListProcessor {
func NewFilterOutSchedulablePodListProcessor(nodeFilter func(*framework.NodeInfo) bool) *filterOutSchedulablePodListProcessor {
return &filterOutSchedulablePodListProcessor{
schedulingSimulator: scheduling.NewHintingSimulator(predicateChecker),
schedulingSimulator: scheduling.NewHintingSimulator(),
nodeFilter: nodeFilter,
}
}
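
Since the hinting simulator inside this processor is now built without a PredicateChecker, downstream callers only drop the leading argument — a sketch mirroring the test and benchmark updates below:

// Before (removed in this commit):
//   processor := NewFilterOutSchedulablePodListProcessor(predicateChecker, scheduling.ScheduleAnywhere)
// After:
processor := NewFilterOutSchedulablePodListProcessor(scheduling.ScheduleAnywhere)
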
@@ -28,7 +28,6 @@ import (
"k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot/base"
"k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot/testsnapshot"
"k8s.io/autoscaler/cluster-autoscaler/simulator/framework"
"k8s.io/autoscaler/cluster-autoscaler/simulator/predicatechecker"
"k8s.io/autoscaler/cluster-autoscaler/simulator/scheduling"
. "k8s.io/autoscaler/cluster-autoscaler/utils/test"
)
@@ -176,8 +175,6 @@ func TestFilterOutSchedulable(t *testing.T) {
for tn, tc := range testCases {
t.Run(tn, func(t *testing.T) {
clusterSnapshot := testsnapshot.NewTestSnapshotOrDie(t)
predicateChecker, err := predicatechecker.NewTestPredicateChecker()
assert.NoError(t, err)

var allExpectedScheduledPods []*apiv1.Pod
allExpectedScheduledPods = append(allExpectedScheduledPods, tc.expectedScheduledPods...)
@@ -193,7 +190,7 @@ func TestFilterOutSchedulable(t *testing.T) {

clusterSnapshot.Fork()

processor := NewFilterOutSchedulablePodListProcessor(predicateChecker, tc.nodeFilter)
processor := NewFilterOutSchedulablePodListProcessor(tc.nodeFilter)
unschedulablePods, err := processor.filterOutSchedulableByPacking(tc.unschedulableCandidates, clusterSnapshot)

assert.NoError(t, err)
@@ -282,9 +279,6 @@ func BenchmarkFilterOutSchedulable(b *testing.B) {
}
}

predicateChecker, err := predicatechecker.NewTestPredicateChecker()
assert.NoError(b, err)

clusterSnapshot := snapshotFactory()
if err := clusterSnapshot.SetClusterState(nodes, scheduledPods); err != nil {
assert.NoError(b, err)
@@ -293,7 +287,7 @@
b.ResetTimer()

for i := 0; i < b.N; i++ {
processor := NewFilterOutSchedulablePodListProcessor(predicateChecker, scheduling.ScheduleAnywhere)
processor := NewFilterOutSchedulablePodListProcessor(scheduling.ScheduleAnywhere)
if stillPending, err := processor.filterOutSchedulableByPacking(pendingPods, clusterSnapshot); err != nil {
assert.NoError(b, err)
} else if len(stillPending) < tc.pendingPods {
@@ -19,17 +19,16 @@ package podlistprocessor
import (
"k8s.io/autoscaler/cluster-autoscaler/processors/pods"
"k8s.io/autoscaler/cluster-autoscaler/simulator/framework"
"k8s.io/autoscaler/cluster-autoscaler/simulator/predicatechecker"
)

// NewDefaultPodListProcessor returns a default implementation of the pod list
// processor, which wraps and sequentially runs other sub-processors.
func NewDefaultPodListProcessor(predicateChecker predicatechecker.PredicateChecker, nodeFilter func(*framework.NodeInfo) bool) *pods.CombinedPodListProcessor {
func NewDefaultPodListProcessor(nodeFilter func(*framework.NodeInfo) bool) *pods.CombinedPodListProcessor {
return pods.NewCombinedPodListProcessor([]pods.PodListProcessor{
NewClearTPURequestsPodListProcessor(),
NewFilterOutExpendablePodListProcessor(),
NewCurrentlyDrainedNodesPodListProcessor(),
NewFilterOutSchedulablePodListProcessor(predicateChecker, nodeFilter),
NewFilterOutSchedulablePodListProcessor(nodeFilter),
NewFilterOutDaemonSetPodListProcessor(),
})
}
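
Wiring up the default pod list processor changes the same way; a hedged call-site sketch (the surrounding setup is assumed, and ScheduleAnywhere is the node filter already used by the benchmark above):

// Hypothetical caller; the dropped PredicateChecker argument is the only change of interest.
podListProcessor := podlistprocessor.NewDefaultPodListProcessor(scheduling.ScheduleAnywhere)
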
4 changes: 2 additions & 2 deletions cluster-autoscaler/core/scaledown/planner/planner.go
@@ -89,8 +89,8 @@ func New(context *context.AutoscalingContext, processors *processors.Autoscaling
context: context,
unremovableNodes: unremovable.NewNodes(),
unneededNodes: unneeded.NewNodes(processors.NodeGroupConfigProcessor, resourceLimitsFinder),
rs: simulator.NewRemovalSimulator(context.ListerRegistry, context.ClusterSnapshot, context.PredicateChecker, deleteOptions, drainabilityRules, true),
actuationInjector: scheduling.NewHintingSimulator(context.PredicateChecker),
rs: simulator.NewRemovalSimulator(context.ListerRegistry, context.ClusterSnapshot, deleteOptions, drainabilityRules, true),
actuationInjector: scheduling.NewHintingSimulator(),
eligibilityChecker: eligibility.NewChecker(processors.NodeGroupConfigProcessor),
nodeUtilizationMap: make(map[string]utilization.Info),
resourceLimitsFinder: resourceLimitsFinder,
3 changes: 1 addition & 2 deletions cluster-autoscaler/core/scaleup/orchestrator/orchestrator.go
@@ -465,7 +465,6 @@ func (o *ScaleUpOrchestrator) ComputeExpansionOption(

estimateStart := time.Now()
expansionEstimator := o.estimatorBuilder(
o.autoscalingContext.PredicateChecker,
o.autoscalingContext.ClusterSnapshot,
estimator.NewEstimationContext(o.autoscalingContext.MaxNodesTotal, option.SimilarNodeGroups, currentNodeCount),
)
@@ -577,7 +576,7 @@ func (o *ScaleUpOrchestrator) SchedulablePodGroups(
var schedulablePodGroups []estimator.PodEquivalenceGroup
for _, eg := range podEquivalenceGroups {
samplePod := eg.Pods[0]
if err := o.autoscalingContext.PredicateChecker.CheckPredicates(o.autoscalingContext.ClusterSnapshot, samplePod, nodeInfo.Node().Name); err == nil {
if err := o.autoscalingContext.ClusterSnapshot.CheckPredicates(samplePod, nodeInfo.Node().Name); err == nil {
// Add pods to option.
schedulablePodGroups = append(schedulablePodGroups, estimator.PodEquivalenceGroup{
Pods: eg.Pods,
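
SchedulablePodGroups only needs a yes/no answer for one sample pod per equivalence group against the template node, so it uses the snapshot's CheckPredicates rather than SchedulePod, which (as the estimator changes below suggest) would also place the pod into the snapshot. A condensed sketch of the pattern from the hunk above:

samplePod := eg.Pods[0]
if err := o.autoscalingContext.ClusterSnapshot.CheckPredicates(samplePod, nodeInfo.Node().Name); err == nil {
	// The sample pod fits, so the whole equivalence group is treated as schedulable on this node shape.
	schedulablePodGroups = append(schedulablePodGroups, estimator.PodEquivalenceGroup{Pods: eg.Pods})
}
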
3 changes: 0 additions & 3 deletions cluster-autoscaler/core/static_autoscaler.go
@@ -48,7 +48,6 @@ import (
"k8s.io/autoscaler/cluster-autoscaler/simulator/drainability/rules"
"k8s.io/autoscaler/cluster-autoscaler/simulator/framework"
"k8s.io/autoscaler/cluster-autoscaler/simulator/options"
"k8s.io/autoscaler/cluster-autoscaler/simulator/predicatechecker"
"k8s.io/autoscaler/cluster-autoscaler/utils/backoff"
caerrors "k8s.io/autoscaler/cluster-autoscaler/utils/errors"
kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes"
@@ -132,7 +131,6 @@ func (callbacks *staticAutoscalerProcessorCallbacks) reset() {
func NewStaticAutoscaler(
opts config.AutoscalingOptions,
fwHandle *framework.Handle,
predicateChecker predicatechecker.PredicateChecker,
clusterSnapshot clustersnapshot.ClusterSnapshot,
autoscalingKubeClients *context.AutoscalingKubeClients,
processors *ca_processors.AutoscalingProcessors,
@@ -156,7 +154,6 @@
autoscalingContext := context.NewAutoscalingContext(
opts,
fwHandle,
predicateChecker,
clusterSnapshot,
autoscalingKubeClients,
cloudProvider,
6 changes: 0 additions & 6 deletions cluster-autoscaler/core/test/common.go
@@ -38,7 +38,6 @@ import (
"k8s.io/autoscaler/cluster-autoscaler/processors/status"
"k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot/testsnapshot"
"k8s.io/autoscaler/cluster-autoscaler/simulator/framework"
"k8s.io/autoscaler/cluster-autoscaler/simulator/predicatechecker"
"k8s.io/autoscaler/cluster-autoscaler/utils/backoff"
"k8s.io/autoscaler/cluster-autoscaler/utils/errors"
kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes"
@@ -175,10 +174,6 @@ func NewScaleTestAutoscalingContext(
if err != nil {
return context.AutoscalingContext{}, err
}
predicateChecker, err := predicatechecker.NewTestPredicateChecker()
if err != nil {
return context.AutoscalingContext{}, err
}
remainingPdbTracker := pdb.NewBasicRemainingPdbTracker()
if debuggingSnapshotter == nil {
debuggingSnapshotter = debuggingsnapshot.NewDebuggingSnapshotter(false)
@@ -196,7 +191,6 @@
ListerRegistry: listers,
},
CloudProvider: provider,
PredicateChecker: predicateChecker,
ClusterSnapshot: clusterSnapshot,
ExpanderStrategy: random.NewStrategy(),
ProcessorCallbacks: processorCallbacks,
48 changes: 19 additions & 29 deletions cluster-autoscaler/estimator/binpacking_estimator.go
@@ -24,13 +24,11 @@ import (
core_utils "k8s.io/autoscaler/cluster-autoscaler/simulator"
"k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot"
"k8s.io/autoscaler/cluster-autoscaler/simulator/framework"
"k8s.io/autoscaler/cluster-autoscaler/simulator/predicatechecker"
"k8s.io/klog/v2"
)

// BinpackingNodeEstimator estimates the number of needed nodes to handle the given amount of pods.
type BinpackingNodeEstimator struct {
predicateChecker predicatechecker.PredicateChecker
clusterSnapshot clustersnapshot.ClusterSnapshot
limiter EstimationLimiter
podOrderer EstimationPodOrderer
@@ -48,17 +46,20 @@ type estimationState struct {
newNodesWithPods map[string]bool
}

func (s *estimationState) trackScheduledPod(pod *apiv1.Pod, nodeName string) {
s.newNodesWithPods[nodeName] = true
s.scheduledPods = append(s.scheduledPods, pod)
}

// NewBinpackingNodeEstimator builds a new BinpackingNodeEstimator.
func NewBinpackingNodeEstimator(
predicateChecker predicatechecker.PredicateChecker,
clusterSnapshot clustersnapshot.ClusterSnapshot,
limiter EstimationLimiter,
podOrderer EstimationPodOrderer,
context EstimationContext,
estimationAnalyserFunc EstimationAnalyserFunc,
) *BinpackingNodeEstimator {
return &BinpackingNodeEstimator{
predicateChecker: predicateChecker,
clusterSnapshot: clusterSnapshot,
limiter: limiter,
podOrderer: podOrderer,
@@ -136,16 +137,16 @@ func (e *BinpackingNodeEstimator) tryToScheduleOnExistingNodes(
pod := pods[index]

// Check schedulability on all nodes created during simulation
nodeName, err := e.predicateChecker.FitsAnyNodeMatching(e.clusterSnapshot, pod, func(nodeInfo *framework.NodeInfo) bool {
nodeName, err := e.clusterSnapshot.SchedulePodOnAnyNodeMatching(pod, func(nodeInfo *framework.NodeInfo) bool {
return estimationState.newNodeNames[nodeInfo.Node().Name]
})
if err != nil {
if err != nil && err.Type() == clustersnapshot.NoNodesPassingPredicatesFoundError {
break
}

if err := e.tryToAddNode(estimationState, pod, nodeName); err != nil {
} else if err != nil {
// Unexpected error.
return nil, err
}
estimationState.trackScheduledPod(pod, nodeName)
}
return pods[index:], nil
}
@@ -160,11 +161,12 @@

if estimationState.lastNodeName != "" {
// Check schedulability on only newly created node
if err := e.predicateChecker.CheckPredicates(e.clusterSnapshot, pod, estimationState.lastNodeName); err == nil {
if err := e.clusterSnapshot.SchedulePod(pod, estimationState.lastNodeName); err == nil {
found = true
if err := e.tryToAddNode(estimationState, pod, estimationState.lastNodeName); err != nil {
return err
}
estimationState.trackScheduledPod(pod, estimationState.lastNodeName)
} else if err.Type() != clustersnapshot.FailingPredicateError {
// Unexpected error.
return err
}
}

@@ -195,12 +197,13 @@ func (e *BinpackingNodeEstimator) tryToScheduleOnNewNodes(
// Note that this may still fail (ex. if topology spreading with zonal topologyKey is used);
// in this case we can't help the pending pod. We keep the node in clusterSnapshot to avoid
// adding and removing node to snapshot for each such pod.
if err := e.predicateChecker.CheckPredicates(e.clusterSnapshot, pod, estimationState.lastNodeName); err != nil {
if err := e.clusterSnapshot.SchedulePod(pod, estimationState.lastNodeName); err != nil && err.Type() == clustersnapshot.FailingPredicateError {
break
}
if err := e.tryToAddNode(estimationState, pod, estimationState.lastNodeName); err != nil {
} else if err != nil {
// Unexpected error.
return err
}
estimationState.trackScheduledPod(pod, estimationState.lastNodeName)
}
}
return nil
@@ -219,16 +222,3 @@
estimationState.newNodeNames[estimationState.lastNodeName] = true
return nil
}

func (e *BinpackingNodeEstimator) tryToAddNode(
estimationState *estimationState,
pod *apiv1.Pod,
nodeName string,
) error {
if err := e.clusterSnapshot.ForceAddPod(pod, nodeName); err != nil {
return fmt.Errorf("Error adding pod %v.%v to node %v in ClusterSnapshot; %v", pod.Namespace, pod.Name, nodeName, err)
}
estimationState.newNodesWithPods[nodeName] = true
estimationState.scheduledPods = append(estimationState.scheduledPods, pod)
return nil
}
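
Taken together, the estimator hunks above replace the CheckPredicates-then-ForceAddPod sequence (the removed tryToAddNode helper) with single SchedulePod / SchedulePodOnAnyNodeMatching calls whose failures are classified by error type. A condensed, illustrative sketch of that dispatch, using only names that appear in the hunks (the helper function itself is not part of the commit):

// Illustrative only; condenses the error handling introduced above.
// (Imports mirror the file's: apiv1 "k8s.io/api/core/v1" and .../simulator/clustersnapshot.)
func scheduleOrClassify(snapshot clustersnapshot.ClusterSnapshot, pod *apiv1.Pod, nodeName string) (bool, error) {
	err := snapshot.SchedulePod(pod, nodeName)
	if err == nil {
		return true, nil // predicates passed and the snapshot now contains the pod
	}
	if err.Type() == clustersnapshot.FailingPredicateError {
		return false, nil // expected: the pod simply doesn't fit this node
	}
	return false, err // unexpected error, surface it to the caller
}
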
9 changes: 2 additions & 7 deletions cluster-autoscaler/estimator/binpacking_estimator_test.go
@@ -27,7 +27,6 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot/testsnapshot"
"k8s.io/autoscaler/cluster-autoscaler/simulator/framework"
"k8s.io/autoscaler/cluster-autoscaler/simulator/predicatechecker"
. "k8s.io/autoscaler/cluster-autoscaler/utils/test"
"k8s.io/autoscaler/cluster-autoscaler/utils/units"
)
@@ -214,11 +213,9 @@ func TestBinpackingEstimate(t *testing.T) {
err := clusterSnapshot.AddNodeInfo(framework.NewTestNodeInfo(makeNode(100, 100, 10, "oldnode", "zone-jupiter")))
assert.NoError(t, err)

predicateChecker, err := predicatechecker.NewTestPredicateChecker()
assert.NoError(t, err)
limiter := NewThresholdBasedEstimationLimiter([]Threshold{NewStaticThreshold(tc.maxNodes, time.Duration(0))})
processor := NewDecreasingPodOrderer()
estimator := NewBinpackingNodeEstimator(predicateChecker, clusterSnapshot, limiter, processor, nil /* EstimationContext */, nil /* EstimationAnalyserFunc */)
estimator := NewBinpackingNodeEstimator(clusterSnapshot, limiter, processor, nil /* EstimationContext */, nil /* EstimationAnalyserFunc */)
node := makeNode(tc.millicores, tc.memory, 10, "template", "zone-mars")
nodeInfo := framework.NewTestNodeInfo(node)

@@ -269,11 +266,9 @@ func BenchmarkBinpackingEstimate(b *testing.B) {
err := clusterSnapshot.AddNodeInfo(framework.NewTestNodeInfo(makeNode(100, 100, 10, "oldnode", "zone-jupiter")))
assert.NoError(b, err)

predicateChecker, err := predicatechecker.NewTestPredicateChecker()
assert.NoError(b, err)
limiter := NewThresholdBasedEstimationLimiter([]Threshold{NewStaticThreshold(maxNodes, time.Duration(0))})
processor := NewDecreasingPodOrderer()
estimator := NewBinpackingNodeEstimator(predicateChecker, clusterSnapshot, limiter, processor, nil /* EstimationContext */, nil /* EstimationAnalyserFunc */)
estimator := NewBinpackingNodeEstimator(clusterSnapshot, limiter, processor, nil /* EstimationContext */, nil /* EstimationAnalyserFunc */)
node := makeNode(millicores, memory, podsPerNode, "template", "zone-mars")
nodeInfo := framework.NewTestNodeInfo(node)

(The remaining changed files are not shown in this view.)
