From c97a42245313e1f8616d9b500a3663a8a561cf0e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kuba=20Tu=C5=BCnik?= Date: Thu, 14 Nov 2024 15:45:49 +0100 Subject: [PATCH] CA: refactor SchedulerBasedPredicateChecker into SchedulerPluginRunner For DRA, this component will have to call the Reserve phase in addition to just checking predicates/filters. The new version also makes more sense in the context of PredicateSnapshot, which is the only context now. --- .../predicate/plugin_runner.go} | 62 ++--- .../predicate/plugin_runner_test.go} | 237 ++++++++---------- .../predicate/predicate_snapshot.go | 13 +- 3 files changed, 137 insertions(+), 175 deletions(-) rename cluster-autoscaler/simulator/{predicatechecker/schedulerbased.go => clustersnapshot/predicate/plugin_runner.go} (59%) rename cluster-autoscaler/simulator/{predicatechecker/schedulerbased_test.go => clustersnapshot/predicate/plugin_runner_test.go} (50%) diff --git a/cluster-autoscaler/simulator/predicatechecker/schedulerbased.go b/cluster-autoscaler/simulator/clustersnapshot/predicate/plugin_runner.go similarity index 59% rename from cluster-autoscaler/simulator/predicatechecker/schedulerbased.go rename to cluster-autoscaler/simulator/clustersnapshot/predicate/plugin_runner.go index d214ed5c3c4b..eddc453bcd27 100644 --- a/cluster-autoscaler/simulator/predicatechecker/schedulerbased.go +++ b/cluster-autoscaler/simulator/clustersnapshot/predicate/plugin_runner.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package predicatechecker +package predicate import ( "context" @@ -24,49 +24,32 @@ import ( "k8s.io/autoscaler/cluster-autoscaler/simulator/framework" apiv1 "k8s.io/api/core/v1" - v1listers "k8s.io/client-go/listers/core/v1" - klog "k8s.io/klog/v2" schedulerframework "k8s.io/kubernetes/pkg/scheduler/framework" ) -// SchedulerBasedPredicateChecker checks whether all required predicates pass for given Pod and Node. -// The verification is done by calling out to scheduler code. -type SchedulerBasedPredicateChecker struct { - fwHandle *framework.Handle - nodeLister v1listers.NodeLister - podLister v1listers.PodLister - lastIndex int +// SchedulerPluginRunner can be used to run various phases of scheduler plugins through the scheduler framework. +type SchedulerPluginRunner struct { + fwHandle *framework.Handle + snapshotBase clustersnapshot.SnapshotBase + lastIndex int } -// NewSchedulerBasedPredicateChecker builds scheduler based PredicateChecker. -func NewSchedulerBasedPredicateChecker(fwHandle *framework.Handle) *SchedulerBasedPredicateChecker { - return &SchedulerBasedPredicateChecker{fwHandle: fwHandle} +// NewSchedulerPluginRunner builds a SchedulerPluginRunner. +func NewSchedulerPluginRunner(fwHandle *framework.Handle, snapshotBase clustersnapshot.SnapshotBase) *SchedulerPluginRunner { + return &SchedulerPluginRunner{fwHandle: fwHandle, snapshotBase: snapshotBase} } -// FitsAnyNode checks if the given pod can be placed on any of the given nodes. -func (p *SchedulerBasedPredicateChecker) FitsAnyNode(clusterSnapshot clustersnapshot.SnapshotBase, pod *apiv1.Pod) (string, error) { - return p.FitsAnyNodeMatching(clusterSnapshot, pod, func(*framework.NodeInfo) bool { - return true - }) -} - -// FitsAnyNodeMatching checks if the given pod can be placed on any of the given nodes matching the provided function. -func (p *SchedulerBasedPredicateChecker) FitsAnyNodeMatching(clusterSnapshot clustersnapshot.SnapshotBase, pod *apiv1.Pod, nodeMatches func(*framework.NodeInfo) bool) (string, error) { - if clusterSnapshot == nil { - return "", fmt.Errorf("ClusterSnapshot not provided") - } - - nodeInfosList, err := clusterSnapshot.ListNodeInfos() +// RunFiltersUntilPassingNode runs the scheduler framework PreFilter phase once, and then keeps running the Filter phase for all nodes in the cluster that match the provided +// function - until a Node where the filters pass is found. Filters are only run for matching Nodes. If no matching node with passing filters is found, an error is returned. +// +// The node iteration always starts from the next Node from the last Node that was found by this method. TODO: Extract the iteration strategy out of SchedulerPluginRunner. +func (p *SchedulerPluginRunner) RunFiltersUntilPassingNode(pod *apiv1.Pod, nodeMatches func(*framework.NodeInfo) bool) (nodeName string, err error) { + nodeInfosList, err := p.snapshotBase.ListNodeInfos() if err != nil { - // This should never happen. - // - // Scheduler requires interface returning error, but no implementation - // of ClusterSnapshot ever does it. - klog.Errorf("Error obtaining nodeInfos from schedulerLister") return "", fmt.Errorf("error obtaining nodeInfos from schedulerLister") } - p.fwHandle.DelegatingLister.UpdateDelegate(clusterSnapshot) + p.fwHandle.DelegatingLister.UpdateDelegate(p.snapshotBase) defer p.fwHandle.DelegatingLister.ResetDelegate() state := schedulerframework.NewCycleState() @@ -99,18 +82,15 @@ func (p *SchedulerBasedPredicateChecker) FitsAnyNodeMatching(clusterSnapshot clu return "", fmt.Errorf("cannot put pod %s on any node", pod.Name) } -// CheckPredicates checks if the given pod can be placed on the given node. -func (p *SchedulerBasedPredicateChecker) CheckPredicates(clusterSnapshot clustersnapshot.SnapshotBase, pod *apiv1.Pod, nodeName string) *clustersnapshot.PredicateError { - if clusterSnapshot == nil { - return clustersnapshot.NewPredicateError(clustersnapshot.InternalPredicateError, "", "ClusterSnapshot not provided", nil, emptyString) - } - nodeInfo, err := clusterSnapshot.GetNodeInfo(nodeName) +// RunFiltersOnNode runs the scheduler framework PreFilter and Filter phases to check if the given pod can be scheduled on the given node. +func (p *SchedulerPluginRunner) RunFiltersOnNode(pod *apiv1.Pod, nodeName string) *clustersnapshot.PredicateError { + nodeInfo, err := p.snapshotBase.GetNodeInfo(nodeName) if err != nil { errorMessage := fmt.Sprintf("Error obtaining NodeInfo for name %s; %v", nodeName, err) return clustersnapshot.NewPredicateError(clustersnapshot.InternalPredicateError, "", errorMessage, nil, emptyString) } - p.fwHandle.DelegatingLister.UpdateDelegate(clusterSnapshot) + p.fwHandle.DelegatingLister.UpdateDelegate(p.snapshotBase) defer p.fwHandle.DelegatingLister.ResetDelegate() state := schedulerframework.NewCycleState() @@ -149,7 +129,7 @@ func (p *SchedulerBasedPredicateChecker) CheckPredicates(clusterSnapshot cluster return nil } -func (p *SchedulerBasedPredicateChecker) buildDebugInfo(filterName string, nodeInfo *framework.NodeInfo) func() string { +func (p *SchedulerPluginRunner) buildDebugInfo(filterName string, nodeInfo *framework.NodeInfo) func() string { switch filterName { case "TaintToleration": taints := nodeInfo.Node().Spec.Taints diff --git a/cluster-autoscaler/simulator/predicatechecker/schedulerbased_test.go b/cluster-autoscaler/simulator/clustersnapshot/predicate/plugin_runner_test.go similarity index 50% rename from cluster-autoscaler/simulator/predicatechecker/schedulerbased_test.go rename to cluster-autoscaler/simulator/clustersnapshot/predicate/plugin_runner_test.go index cd8cd1d556d6..4afcb0518afa 100644 --- a/cluster-autoscaler/simulator/predicatechecker/schedulerbased_test.go +++ b/cluster-autoscaler/simulator/clustersnapshot/predicate/plugin_runner_test.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package predicatechecker +package predicate import ( "os" @@ -38,7 +38,7 @@ import ( apiv1 "k8s.io/api/core/v1" ) -func TestCheckPredicate(t *testing.T) { +func TestRunFiltersOnNode(t *testing.T) { p450 := BuildTestPod("p450", 450, 500000) p600 := BuildTestPod("p600", 600, 500000) p8000 := BuildTestPod("p8000", 8000, 0) @@ -49,9 +49,6 @@ func TestCheckPredicate(t *testing.T) { n1000Unschedulable := BuildTestNode("n1000", 1000, 2000000) SetNodeReadyState(n1000Unschedulable, true, time.Time{}) - defaultPredicateChecker, err := newTestPredicateChecker() - assert.NoError(t, err) - // temp dir tmpDir, err := os.MkdirTemp("", "scheduler-configs") if err != nil { @@ -65,95 +62,90 @@ func TestCheckPredicate(t *testing.T) { os.FileMode(0600)); err != nil { t.Fatal(err) } - customConfig, err := scheduler.ConfigFromPath(customConfigFile) assert.NoError(t, err) - customPredicateChecker, err := newTestPredicateCheckerWithCustomConfig(customConfig) - assert.NoError(t, err) tests := []struct { - name string - node *apiv1.Node - scheduledPods []*apiv1.Pod - testPod *apiv1.Pod - predicateChecker *SchedulerBasedPredicateChecker - expectError bool + name string + customConfig *config.KubeSchedulerConfiguration + node *apiv1.Node + scheduledPods []*apiv1.Pod + testPod *apiv1.Pod + expectError bool }{ // default predicate checker test cases { - name: "default - other pod - insuficient cpu", - node: n1000, - scheduledPods: []*apiv1.Pod{p450}, - testPod: p600, - expectError: true, - predicateChecker: defaultPredicateChecker, + name: "default - other pod - insuficient cpu", + node: n1000, + scheduledPods: []*apiv1.Pod{p450}, + testPod: p600, + expectError: true, }, { - name: "default - other pod - ok", - node: n1000, - scheduledPods: []*apiv1.Pod{p450}, - testPod: p500, - expectError: false, - predicateChecker: defaultPredicateChecker, + name: "default - other pod - ok", + node: n1000, + scheduledPods: []*apiv1.Pod{p450}, + testPod: p500, + expectError: false, }, { - name: "default - empty - insuficient cpu", - node: n1000, - scheduledPods: []*apiv1.Pod{}, - testPod: p8000, - expectError: true, - predicateChecker: defaultPredicateChecker, + name: "default - empty - insuficient cpu", + node: n1000, + scheduledPods: []*apiv1.Pod{}, + testPod: p8000, + expectError: true, }, { - name: "default - empty - ok", - node: n1000, - scheduledPods: []*apiv1.Pod{}, - testPod: p600, - expectError: false, - predicateChecker: defaultPredicateChecker, + name: "default - empty - ok", + node: n1000, + scheduledPods: []*apiv1.Pod{}, + testPod: p600, + expectError: false, }, // custom predicate checker test cases { - name: "custom - other pod - ok", - node: n1000, - scheduledPods: []*apiv1.Pod{p450}, - testPod: p600, - expectError: false, - predicateChecker: customPredicateChecker, + name: "custom - other pod - ok", + node: n1000, + scheduledPods: []*apiv1.Pod{p450}, + testPod: p600, + expectError: false, + customConfig: customConfig, }, { - name: "custom -other pod - ok", - node: n1000, - scheduledPods: []*apiv1.Pod{p450}, - testPod: p500, - expectError: false, - predicateChecker: customPredicateChecker, + name: "custom -other pod - ok", + node: n1000, + scheduledPods: []*apiv1.Pod{p450}, + testPod: p500, + expectError: false, + customConfig: customConfig, }, { - name: "custom -empty - ok", - node: n1000, - scheduledPods: []*apiv1.Pod{}, - testPod: p8000, - expectError: false, - predicateChecker: customPredicateChecker, + name: "custom -empty - ok", + node: n1000, + scheduledPods: []*apiv1.Pod{}, + testPod: p8000, + expectError: false, + customConfig: customConfig, }, { - name: "custom -empty - ok", - node: n1000, - scheduledPods: []*apiv1.Pod{}, - testPod: p600, - expectError: false, - predicateChecker: customPredicateChecker, + name: "custom -empty - ok", + node: n1000, + scheduledPods: []*apiv1.Pod{}, + testPod: p600, + expectError: false, + customConfig: customConfig, }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - var err error - clusterSnapshot := base.NewBasicSnapshotBase() - err = clusterSnapshot.AddNodeInfo(framework.NewTestNodeInfo(tt.node, tt.scheduledPods...)) + snapshotBase := base.NewBasicSnapshotBase() + err := snapshotBase.AddNodeInfo(framework.NewTestNodeInfo(tt.node, tt.scheduledPods...)) assert.NoError(t, err) - predicateError := tt.predicateChecker.CheckPredicates(clusterSnapshot, tt.testPod, tt.node.Name) + pluginRunner, err := newTestPluginRunner(snapshotBase, tt.customConfig) + assert.NoError(t, err) + + predicateError := pluginRunner.RunFiltersOnNode(tt.testPod, tt.node.Name) if tt.expectError { assert.NotNil(t, predicateError) assert.Equal(t, clustersnapshot.NotSchedulablePredicateError, predicateError.ErrorType()) @@ -166,7 +158,7 @@ func TestCheckPredicate(t *testing.T) { } } -func TestFitsAnyNode(t *testing.T) { +func TestRunFilterUntilPassingNode(t *testing.T) { p900 := BuildTestPod("p900", 900, 1000) p1900 := BuildTestPod("p1900", 1900, 1000) p2100 := BuildTestPod("p2100", 2100, 1000) @@ -174,9 +166,6 @@ func TestFitsAnyNode(t *testing.T) { n1000 := BuildTestNode("n1000", 1000, 2000000) n2000 := BuildTestNode("n2000", 2000, 2000000) - defaultPredicateChecker, err := newTestPredicateChecker() - assert.NoError(t, err) - // temp dir tmpDir, err := os.MkdirTemp("", "scheduler-configs") if err != nil { @@ -190,74 +179,71 @@ func TestFitsAnyNode(t *testing.T) { os.FileMode(0600)); err != nil { t.Fatal(err) } - customConfig, err := scheduler.ConfigFromPath(customConfigFile) assert.NoError(t, err) - customPredicateChecker, err := newTestPredicateCheckerWithCustomConfig(customConfig) - assert.NoError(t, err) testCases := []struct { - name string - predicateChecker *SchedulerBasedPredicateChecker - pod *apiv1.Pod - expectedNodes []string - expectError bool + name string + customConfig *config.KubeSchedulerConfiguration + pod *apiv1.Pod + expectedNodes []string + expectError bool }{ // default predicate checker test cases { - name: "default - small pod - no error", - predicateChecker: defaultPredicateChecker, - pod: p900, - expectedNodes: []string{"n1000", "n2000"}, - expectError: false, + name: "default - small pod - no error", + pod: p900, + expectedNodes: []string{"n1000", "n2000"}, + expectError: false, }, { - name: "default - medium pod - no error", - predicateChecker: defaultPredicateChecker, - pod: p1900, - expectedNodes: []string{"n2000"}, - expectError: false, + name: "default - medium pod - no error", + pod: p1900, + expectedNodes: []string{"n2000"}, + expectError: false, }, { - name: "default - large pod - insufficient cpu", - predicateChecker: defaultPredicateChecker, - pod: p2100, - expectError: true, + name: "default - large pod - insufficient cpu", + pod: p2100, + expectError: true, }, // custom predicate checker test cases { - name: "custom - small pod - no error", - predicateChecker: customPredicateChecker, - pod: p900, - expectedNodes: []string{"n1000", "n2000"}, - expectError: false, + name: "custom - small pod - no error", + customConfig: customConfig, + pod: p900, + expectedNodes: []string{"n1000", "n2000"}, + expectError: false, }, { - name: "custom - medium pod - no error", - predicateChecker: customPredicateChecker, - pod: p1900, - expectedNodes: []string{"n1000", "n2000"}, - expectError: false, + name: "custom - medium pod - no error", + customConfig: customConfig, + pod: p1900, + expectedNodes: []string{"n1000", "n2000"}, + expectError: false, }, { - name: "custom - large pod - insufficient cpu", - predicateChecker: customPredicateChecker, - pod: p2100, - expectedNodes: []string{"n1000", "n2000"}, - expectError: false, + name: "custom - large pod - insufficient cpu", + customConfig: customConfig, + pod: p2100, + expectedNodes: []string{"n1000", "n2000"}, + expectError: false, }, } - clusterSnapshot := base.NewBasicSnapshotBase() - err = clusterSnapshot.AddNodeInfo(framework.NewTestNodeInfo(n1000)) + snapshotBase := base.NewBasicSnapshotBase() + err = snapshotBase.AddNodeInfo(framework.NewTestNodeInfo(n1000)) assert.NoError(t, err) - err = clusterSnapshot.AddNodeInfo(framework.NewTestNodeInfo(n2000)) + err = snapshotBase.AddNodeInfo(framework.NewTestNodeInfo(n2000)) assert.NoError(t, err) for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { - nodeName, err := tc.predicateChecker.FitsAnyNode(clusterSnapshot, tc.pod) + pluginRunner, err := newTestPluginRunner(snapshotBase, tc.customConfig) + assert.NoError(t, err) + + nodeName, err := pluginRunner.RunFiltersUntilPassingNode(tc.pod, func(info *framework.NodeInfo) bool { return true }) if tc.expectError { assert.Error(t, err) } else { @@ -266,7 +252,6 @@ func TestFitsAnyNode(t *testing.T) { } }) } - } func TestDebugInfo(t *testing.T) { @@ -291,9 +276,9 @@ func TestDebugInfo(t *testing.T) { assert.NoError(t, err) // with default predicate checker - defaultPredicateChecker, err := newTestPredicateChecker() + defaultPluginnRunner, err := newTestPluginRunner(clusterSnapshot, nil) assert.NoError(t, err) - predicateErr := defaultPredicateChecker.CheckPredicates(clusterSnapshot, p1, "n1") + predicateErr := defaultPluginnRunner.RunFiltersOnNode(p1, "n1") assert.NotNil(t, predicateErr) assert.Equal(t, "node(s) had untolerated taint {SomeTaint: WhyNot?}", predicateErr.Message()) assert.Contains(t, predicateErr.VerboseMessage(), "RandomTaint") @@ -316,27 +301,25 @@ func TestDebugInfo(t *testing.T) { customConfig, err := scheduler.ConfigFromPath(customConfigFile) assert.NoError(t, err) - customPredicateChecker, err := newTestPredicateCheckerWithCustomConfig(customConfig) + customPluginnRunner, err := newTestPluginRunner(clusterSnapshot, customConfig) assert.NoError(t, err) - predicateErr = customPredicateChecker.CheckPredicates(clusterSnapshot, p1, "n1") + predicateErr = customPluginnRunner.RunFiltersOnNode(p1, "n1") assert.Nil(t, predicateErr) } -// newTestPredicateChecker builds test version of PredicateChecker. -func newTestPredicateChecker() (*SchedulerBasedPredicateChecker, error) { - defaultConfig, err := scheduler_config_latest.Default() - if err != nil { - return nil, err +// newTestPluginRunner builds test version of SchedulerPluginRunner. +func newTestPluginRunner(snapshotBase clustersnapshot.SnapshotBase, schedConfig *config.KubeSchedulerConfiguration) (*SchedulerPluginRunner, error) { + if schedConfig == nil { + defaultConfig, err := scheduler_config_latest.Default() + if err != nil { + return nil, err + } + schedConfig = defaultConfig } - return newTestPredicateCheckerWithCustomConfig(defaultConfig) -} -// newTestPredicateCheckerWithCustomConfig builds test version of PredicateChecker with custom scheduler config. -func newTestPredicateCheckerWithCustomConfig(schedConfig *config.KubeSchedulerConfiguration) (*SchedulerBasedPredicateChecker, error) { - // just call out to NewSchedulerBasedPredicateChecker but use fake kubeClient fwHandle, err := framework.NewHandle(informers.NewSharedInformerFactory(clientsetfake.NewSimpleClientset(), 0), schedConfig) if err != nil { return nil, err } - return NewSchedulerBasedPredicateChecker(fwHandle), nil + return NewSchedulerPluginRunner(fwHandle, snapshotBase), nil } diff --git a/cluster-autoscaler/simulator/clustersnapshot/predicate/predicate_snapshot.go b/cluster-autoscaler/simulator/clustersnapshot/predicate/predicate_snapshot.go index 14242a090d89..e57c5f3e47c4 100644 --- a/cluster-autoscaler/simulator/clustersnapshot/predicate/predicate_snapshot.go +++ b/cluster-autoscaler/simulator/clustersnapshot/predicate/predicate_snapshot.go @@ -22,25 +22,24 @@ import ( apiv1 "k8s.io/api/core/v1" "k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot" "k8s.io/autoscaler/cluster-autoscaler/simulator/framework" - "k8s.io/autoscaler/cluster-autoscaler/simulator/predicatechecker" ) type PredicateSnapshot struct { clustersnapshot.SnapshotBase - predicateChecker *predicatechecker.SchedulerBasedPredicateChecker + pluginRunner *SchedulerPluginRunner } // NewPredicateSnapshot builds a PredicateSnapshot. func NewPredicateSnapshot(snapshotBase clustersnapshot.SnapshotBase, fwHandle *framework.Handle) *PredicateSnapshot { return &PredicateSnapshot{ - SnapshotBase: snapshotBase, - predicateChecker: predicatechecker.NewSchedulerBasedPredicateChecker(fwHandle), + SnapshotBase: snapshotBase, + pluginRunner: NewSchedulerPluginRunner(fwHandle, snapshotBase), } } // SchedulePod adds pod to the snapshot and schedules it to given node. func (s *PredicateSnapshot) SchedulePod(pod *apiv1.Pod, nodeName string) error { - if predErr := s.predicateChecker.CheckPredicates(s, pod, nodeName); predErr != nil { + if predErr := s.pluginRunner.RunFiltersOnNode(pod, nodeName); predErr != nil { return fmt.Errorf("can't schedule pod %s/%s on node %s, predicate error: %v", pod.Namespace, pod.Name, nodeName, predErr) } return s.ForceSchedulePod(pod, nodeName) @@ -48,7 +47,7 @@ func (s *PredicateSnapshot) SchedulePod(pod *apiv1.Pod, nodeName string) error { // SchedulePodOnAnyNodeMatching adds pod to the snapshot and schedules it to any node matching the provided function. func (s *PredicateSnapshot) SchedulePodOnAnyNodeMatching(pod *apiv1.Pod, anyNodeMatching func(*framework.NodeInfo) bool) (matchingNode string, err error) { - nodeName, err := s.predicateChecker.FitsAnyNodeMatching(s, pod, anyNodeMatching) + nodeName, err := s.pluginRunner.RunFiltersUntilPassingNode(pod, anyNodeMatching) if err != nil { return "", fmt.Errorf("can't schedule pod %s/%s on any node: %v", pod.Namespace, pod.Name, err) } @@ -57,5 +56,5 @@ func (s *PredicateSnapshot) SchedulePodOnAnyNodeMatching(pod *apiv1.Pod, anyNode // CheckPredicates checks whether scheduler predicates pass for the given pod on the given node. func (s *PredicateSnapshot) CheckPredicates(pod *apiv1.Pod, nodeName string) *clustersnapshot.PredicateError { - return s.predicateChecker.CheckPredicates(s, pod, nodeName) + return s.pluginRunner.RunFiltersOnNode(pod, nodeName) }