diff --git a/cluster-autoscaler/simulator/predicatechecker/schedulerbased.go b/cluster-autoscaler/simulator/clustersnapshot/predicate/plugin_runner.go similarity index 58% rename from cluster-autoscaler/simulator/predicatechecker/schedulerbased.go rename to cluster-autoscaler/simulator/clustersnapshot/predicate/plugin_runner.go index 5aa1e071cffe..82b49b54670d 100644 --- a/cluster-autoscaler/simulator/predicatechecker/schedulerbased.go +++ b/cluster-autoscaler/simulator/clustersnapshot/predicate/plugin_runner.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package predicatechecker +package predicate import ( "context" @@ -25,49 +25,32 @@ import ( "k8s.io/autoscaler/cluster-autoscaler/simulator/framework" apiv1 "k8s.io/api/core/v1" - v1listers "k8s.io/client-go/listers/core/v1" - "k8s.io/klog/v2" schedulerframework "k8s.io/kubernetes/pkg/scheduler/framework" ) -// SchedulerBasedPredicateChecker checks whether all required predicates pass for given Pod and Node. -// The verification is done by calling out to scheduler code. -type SchedulerBasedPredicateChecker struct { - fwHandle *framework.Handle - nodeLister v1listers.NodeLister - podLister v1listers.PodLister - lastIndex int +// SchedulerPluginRunner can be used to run various phases of scheduler plugins through the scheduler framework. +type SchedulerPluginRunner struct { + fwHandle *framework.Handle + snapshotBase clustersnapshot.SnapshotBase + lastIndex int } -// NewSchedulerBasedPredicateChecker builds scheduler based PredicateChecker. -func NewSchedulerBasedPredicateChecker(fwHandle *framework.Handle) *SchedulerBasedPredicateChecker { - return &SchedulerBasedPredicateChecker{fwHandle: fwHandle} +// NewSchedulerPluginRunner builds a SchedulerPluginRunner. +func NewSchedulerPluginRunner(fwHandle *framework.Handle, snapshotBase clustersnapshot.SnapshotBase) *SchedulerPluginRunner { + return &SchedulerPluginRunner{fwHandle: fwHandle, snapshotBase: snapshotBase} } -// FitsAnyNode checks if the given pod can be placed on any of the given nodes. -func (p *SchedulerBasedPredicateChecker) FitsAnyNode(clusterSnapshot clustersnapshot.SnapshotBase, pod *apiv1.Pod) (string, clustersnapshot.SchedulingError) { - return p.FitsAnyNodeMatching(clusterSnapshot, pod, func(*framework.NodeInfo) bool { - return true - }) -} - -// FitsAnyNodeMatching checks if the given pod can be placed on any of the given nodes matching the provided function. -func (p *SchedulerBasedPredicateChecker) FitsAnyNodeMatching(clusterSnapshot clustersnapshot.SnapshotBase, pod *apiv1.Pod, nodeMatches func(*framework.NodeInfo) bool) (string, clustersnapshot.SchedulingError) { - if clusterSnapshot == nil { - return "", clustersnapshot.NewSchedulingInternalError(pod, "ClusterSnapshot not provided") - } - - nodeInfosList, err := clusterSnapshot.ListNodeInfos() +// RunFiltersUntilPassingNode runs the scheduler framework PreFilter phase once, and then keeps running the Filter phase for all nodes in the cluster that match the provided +// function - until a Node where the filters pass is found. Filters are only run for matching Nodes. If no matching node with passing filters is found, an error is returned. +// +// The node iteration always starts from the next Node from the last Node that was found by this method. TODO: Extract the iteration strategy out of SchedulerPluginRunner. +func (p *SchedulerPluginRunner) RunFiltersUntilPassingNode(pod *apiv1.Pod, nodeMatches func(*framework.NodeInfo) bool) (string, clustersnapshot.SchedulingError) { + nodeInfosList, err := p.snapshotBase.ListNodeInfos() if err != nil { - // This should never happen. - // - // Scheduler requires interface returning error, but no implementation - // of ClusterSnapshot ever does it. - klog.Errorf("Error obtaining nodeInfos from schedulerLister") - return "", clustersnapshot.NewSchedulingInternalError(pod, "error obtaining nodeInfos from schedulerLister") + return "", clustersnapshot.NewSchedulingInternalError(pod, "ClusterSnapshot not provided") } - p.fwHandle.DelegatingLister.UpdateDelegate(clusterSnapshot) + p.fwHandle.DelegatingLister.UpdateDelegate(p.snapshotBase) defer p.fwHandle.DelegatingLister.ResetDelegate() state := schedulerframework.NewCycleState() @@ -100,17 +83,14 @@ func (p *SchedulerBasedPredicateChecker) FitsAnyNodeMatching(clusterSnapshot clu return "", clustersnapshot.NewNoNodesPassingPredicatesFoundError(pod) } -// CheckPredicates checks if the given pod can be placed on the given node. -func (p *SchedulerBasedPredicateChecker) CheckPredicates(clusterSnapshot clustersnapshot.SnapshotBase, pod *apiv1.Pod, nodeName string) clustersnapshot.SchedulingError { - if clusterSnapshot == nil { - return clustersnapshot.NewSchedulingInternalError(pod, "ClusterSnapshot not provided") - } - nodeInfo, err := clusterSnapshot.GetNodeInfo(nodeName) +// RunFiltersOnNode runs the scheduler framework PreFilter and Filter phases to check if the given pod can be scheduled on the given node. +func (p *SchedulerPluginRunner) RunFiltersOnNode(pod *apiv1.Pod, nodeName string) clustersnapshot.SchedulingError { + nodeInfo, err := p.snapshotBase.GetNodeInfo(nodeName) if err != nil { return clustersnapshot.NewSchedulingInternalError(pod, fmt.Sprintf("error obtaining NodeInfo for name %q: %v", nodeName, err)) } - p.fwHandle.DelegatingLister.UpdateDelegate(clusterSnapshot) + p.fwHandle.DelegatingLister.UpdateDelegate(p.snapshotBase) defer p.fwHandle.DelegatingLister.ResetDelegate() state := schedulerframework.NewCycleState() @@ -134,7 +114,7 @@ func (p *SchedulerBasedPredicateChecker) CheckPredicates(clusterSnapshot cluster return nil } -func (p *SchedulerBasedPredicateChecker) failingFilterDebugInfo(filterName string, nodeInfo *framework.NodeInfo) string { +func (p *SchedulerPluginRunner) failingFilterDebugInfo(filterName string, nodeInfo *framework.NodeInfo) string { infoParts := []string{fmt.Sprintf("nodeName: %q", nodeInfo.Node().Name)} switch filterName { diff --git a/cluster-autoscaler/simulator/predicatechecker/schedulerbased_test.go b/cluster-autoscaler/simulator/clustersnapshot/predicate/plugin_runner_test.go similarity index 51% rename from cluster-autoscaler/simulator/predicatechecker/schedulerbased_test.go rename to cluster-autoscaler/simulator/clustersnapshot/predicate/plugin_runner_test.go index 0e84f34d60c9..8f2a770e9ce4 100644 --- a/cluster-autoscaler/simulator/predicatechecker/schedulerbased_test.go +++ b/cluster-autoscaler/simulator/clustersnapshot/predicate/plugin_runner_test.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package predicatechecker +package predicate import ( "os" @@ -38,7 +38,7 @@ import ( apiv1 "k8s.io/api/core/v1" ) -func TestCheckPredicate(t *testing.T) { +func TestRunFiltersOnNode(t *testing.T) { p450 := BuildTestPod("p450", 450, 500000) p600 := BuildTestPod("p600", 600, 500000) p8000 := BuildTestPod("p8000", 8000, 0) @@ -49,9 +49,6 @@ func TestCheckPredicate(t *testing.T) { n1000Unschedulable := BuildTestNode("n1000", 1000, 2000000) SetNodeReadyState(n1000Unschedulable, true, time.Time{}) - defaultPredicateChecker, err := newTestPredicateChecker() - assert.NoError(t, err) - // temp dir tmpDir, err := os.MkdirTemp("", "scheduler-configs") if err != nil { @@ -65,95 +62,90 @@ func TestCheckPredicate(t *testing.T) { os.FileMode(0600)); err != nil { t.Fatal(err) } - customConfig, err := scheduler.ConfigFromPath(customConfigFile) assert.NoError(t, err) - customPredicateChecker, err := newTestPredicateCheckerWithCustomConfig(customConfig) - assert.NoError(t, err) tests := []struct { - name string - node *apiv1.Node - scheduledPods []*apiv1.Pod - testPod *apiv1.Pod - predicateChecker *SchedulerBasedPredicateChecker - expectError bool + name string + customConfig *config.KubeSchedulerConfiguration + node *apiv1.Node + scheduledPods []*apiv1.Pod + testPod *apiv1.Pod + expectError bool }{ // default predicate checker test cases { - name: "default - other pod - insuficient cpu", - node: n1000, - scheduledPods: []*apiv1.Pod{p450}, - testPod: p600, - expectError: true, - predicateChecker: defaultPredicateChecker, + name: "default - other pod - insuficient cpu", + node: n1000, + scheduledPods: []*apiv1.Pod{p450}, + testPod: p600, + expectError: true, }, { - name: "default - other pod - ok", - node: n1000, - scheduledPods: []*apiv1.Pod{p450}, - testPod: p500, - expectError: false, - predicateChecker: defaultPredicateChecker, + name: "default - other pod - ok", + node: n1000, + scheduledPods: []*apiv1.Pod{p450}, + testPod: p500, + expectError: false, }, { - name: "default - empty - insuficient cpu", - node: n1000, - scheduledPods: []*apiv1.Pod{}, - testPod: p8000, - expectError: true, - predicateChecker: defaultPredicateChecker, + name: "default - empty - insuficient cpu", + node: n1000, + scheduledPods: []*apiv1.Pod{}, + testPod: p8000, + expectError: true, }, { - name: "default - empty - ok", - node: n1000, - scheduledPods: []*apiv1.Pod{}, - testPod: p600, - expectError: false, - predicateChecker: defaultPredicateChecker, + name: "default - empty - ok", + node: n1000, + scheduledPods: []*apiv1.Pod{}, + testPod: p600, + expectError: false, }, // custom predicate checker test cases { - name: "custom - other pod - ok", - node: n1000, - scheduledPods: []*apiv1.Pod{p450}, - testPod: p600, - expectError: false, - predicateChecker: customPredicateChecker, + name: "custom - other pod - ok", + node: n1000, + scheduledPods: []*apiv1.Pod{p450}, + testPod: p600, + expectError: false, + customConfig: customConfig, }, { - name: "custom -other pod - ok", - node: n1000, - scheduledPods: []*apiv1.Pod{p450}, - testPod: p500, - expectError: false, - predicateChecker: customPredicateChecker, + name: "custom -other pod - ok", + node: n1000, + scheduledPods: []*apiv1.Pod{p450}, + testPod: p500, + expectError: false, + customConfig: customConfig, }, { - name: "custom -empty - ok", - node: n1000, - scheduledPods: []*apiv1.Pod{}, - testPod: p8000, - expectError: false, - predicateChecker: customPredicateChecker, + name: "custom -empty - ok", + node: n1000, + scheduledPods: []*apiv1.Pod{}, + testPod: p8000, + expectError: false, + customConfig: customConfig, }, { - name: "custom -empty - ok", - node: n1000, - scheduledPods: []*apiv1.Pod{}, - testPod: p600, - expectError: false, - predicateChecker: customPredicateChecker, + name: "custom -empty - ok", + node: n1000, + scheduledPods: []*apiv1.Pod{}, + testPod: p600, + expectError: false, + customConfig: customConfig, }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - var err error - clusterSnapshot := base.NewBasicSnapshotBase() - err = clusterSnapshot.AddNodeInfo(framework.NewTestNodeInfo(tt.node, tt.scheduledPods...)) + snapshotBase := base.NewBasicSnapshotBase() + err := snapshotBase.AddNodeInfo(framework.NewTestNodeInfo(tt.node, tt.scheduledPods...)) assert.NoError(t, err) - predicateError := tt.predicateChecker.CheckPredicates(clusterSnapshot, tt.testPod, tt.node.Name) + pluginRunner, err := newTestPluginRunner(snapshotBase, tt.customConfig) + assert.NoError(t, err) + + predicateError := pluginRunner.RunFiltersOnNode(tt.testPod, tt.node.Name) if tt.expectError { assert.NotNil(t, predicateError) assert.Equal(t, clustersnapshot.FailingPredicateError, predicateError.Type()) @@ -168,7 +160,7 @@ func TestCheckPredicate(t *testing.T) { } } -func TestFitsAnyNode(t *testing.T) { +func TestRunFilterUntilPassingNode(t *testing.T) { p900 := BuildTestPod("p900", 900, 1000) p1900 := BuildTestPod("p1900", 1900, 1000) p2100 := BuildTestPod("p2100", 2100, 1000) @@ -176,9 +168,6 @@ func TestFitsAnyNode(t *testing.T) { n1000 := BuildTestNode("n1000", 1000, 2000000) n2000 := BuildTestNode("n2000", 2000, 2000000) - defaultPredicateChecker, err := newTestPredicateChecker() - assert.NoError(t, err) - // temp dir tmpDir, err := os.MkdirTemp("", "scheduler-configs") if err != nil { @@ -192,74 +181,71 @@ func TestFitsAnyNode(t *testing.T) { os.FileMode(0600)); err != nil { t.Fatal(err) } - customConfig, err := scheduler.ConfigFromPath(customConfigFile) assert.NoError(t, err) - customPredicateChecker, err := newTestPredicateCheckerWithCustomConfig(customConfig) - assert.NoError(t, err) testCases := []struct { - name string - predicateChecker *SchedulerBasedPredicateChecker - pod *apiv1.Pod - expectedNodes []string - expectError bool + name string + customConfig *config.KubeSchedulerConfiguration + pod *apiv1.Pod + expectedNodes []string + expectError bool }{ // default predicate checker test cases { - name: "default - small pod - no error", - predicateChecker: defaultPredicateChecker, - pod: p900, - expectedNodes: []string{"n1000", "n2000"}, - expectError: false, + name: "default - small pod - no error", + pod: p900, + expectedNodes: []string{"n1000", "n2000"}, + expectError: false, }, { - name: "default - medium pod - no error", - predicateChecker: defaultPredicateChecker, - pod: p1900, - expectedNodes: []string{"n2000"}, - expectError: false, + name: "default - medium pod - no error", + pod: p1900, + expectedNodes: []string{"n2000"}, + expectError: false, }, { - name: "default - large pod - insufficient cpu", - predicateChecker: defaultPredicateChecker, - pod: p2100, - expectError: true, + name: "default - large pod - insufficient cpu", + pod: p2100, + expectError: true, }, // custom predicate checker test cases { - name: "custom - small pod - no error", - predicateChecker: customPredicateChecker, - pod: p900, - expectedNodes: []string{"n1000", "n2000"}, - expectError: false, + name: "custom - small pod - no error", + customConfig: customConfig, + pod: p900, + expectedNodes: []string{"n1000", "n2000"}, + expectError: false, }, { - name: "custom - medium pod - no error", - predicateChecker: customPredicateChecker, - pod: p1900, - expectedNodes: []string{"n1000", "n2000"}, - expectError: false, + name: "custom - medium pod - no error", + customConfig: customConfig, + pod: p1900, + expectedNodes: []string{"n1000", "n2000"}, + expectError: false, }, { - name: "custom - large pod - insufficient cpu", - predicateChecker: customPredicateChecker, - pod: p2100, - expectedNodes: []string{"n1000", "n2000"}, - expectError: false, + name: "custom - large pod - insufficient cpu", + customConfig: customConfig, + pod: p2100, + expectedNodes: []string{"n1000", "n2000"}, + expectError: false, }, } - clusterSnapshot := base.NewBasicSnapshotBase() - err = clusterSnapshot.AddNodeInfo(framework.NewTestNodeInfo(n1000)) + snapshotBase := base.NewBasicSnapshotBase() + err = snapshotBase.AddNodeInfo(framework.NewTestNodeInfo(n1000)) assert.NoError(t, err) - err = clusterSnapshot.AddNodeInfo(framework.NewTestNodeInfo(n2000)) + err = snapshotBase.AddNodeInfo(framework.NewTestNodeInfo(n2000)) assert.NoError(t, err) for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { - nodeName, err := tc.predicateChecker.FitsAnyNode(clusterSnapshot, tc.pod) + pluginRunner, err := newTestPluginRunner(snapshotBase, tc.customConfig) + assert.NoError(t, err) + + nodeName, err := pluginRunner.RunFiltersUntilPassingNode(tc.pod, func(info *framework.NodeInfo) bool { return true }) if tc.expectError { assert.Error(t, err) } else { @@ -268,7 +254,6 @@ func TestFitsAnyNode(t *testing.T) { } }) } - } func TestDebugInfo(t *testing.T) { @@ -293,9 +278,9 @@ func TestDebugInfo(t *testing.T) { assert.NoError(t, err) // with default predicate checker - defaultPredicateChecker, err := newTestPredicateChecker() + defaultPluginnRunner, err := newTestPluginRunner(clusterSnapshot, nil) assert.NoError(t, err) - predicateErr := defaultPredicateChecker.CheckPredicates(clusterSnapshot, p1, "n1") + predicateErr := defaultPluginnRunner.RunFiltersOnNode(p1, "n1") assert.NotNil(t, predicateErr) assert.Contains(t, predicateErr.FailingPredicateReasons(), "node(s) had untolerated taint {SomeTaint: WhyNot?}") assert.Contains(t, predicateErr.Error(), "node(s) had untolerated taint {SomeTaint: WhyNot?}") @@ -319,27 +304,25 @@ func TestDebugInfo(t *testing.T) { customConfig, err := scheduler.ConfigFromPath(customConfigFile) assert.NoError(t, err) - customPredicateChecker, err := newTestPredicateCheckerWithCustomConfig(customConfig) + customPluginnRunner, err := newTestPluginRunner(clusterSnapshot, customConfig) assert.NoError(t, err) - predicateErr = customPredicateChecker.CheckPredicates(clusterSnapshot, p1, "n1") + predicateErr = customPluginnRunner.RunFiltersOnNode(p1, "n1") assert.Nil(t, predicateErr) } -// newTestPredicateChecker builds test version of PredicateChecker. -func newTestPredicateChecker() (*SchedulerBasedPredicateChecker, error) { - defaultConfig, err := scheduler_config_latest.Default() - if err != nil { - return nil, err +// newTestPluginRunner builds test version of SchedulerPluginRunner. +func newTestPluginRunner(snapshotBase clustersnapshot.SnapshotBase, schedConfig *config.KubeSchedulerConfiguration) (*SchedulerPluginRunner, error) { + if schedConfig == nil { + defaultConfig, err := scheduler_config_latest.Default() + if err != nil { + return nil, err + } + schedConfig = defaultConfig } - return newTestPredicateCheckerWithCustomConfig(defaultConfig) -} -// newTestPredicateCheckerWithCustomConfig builds test version of PredicateChecker with custom scheduler config. -func newTestPredicateCheckerWithCustomConfig(schedConfig *config.KubeSchedulerConfiguration) (*SchedulerBasedPredicateChecker, error) { - // just call out to NewSchedulerBasedPredicateChecker but use fake kubeClient fwHandle, err := framework.NewHandle(informers.NewSharedInformerFactory(clientsetfake.NewSimpleClientset(), 0), schedConfig) if err != nil { return nil, err } - return NewSchedulerBasedPredicateChecker(fwHandle), nil + return NewSchedulerPluginRunner(fwHandle, snapshotBase), nil } diff --git a/cluster-autoscaler/simulator/clustersnapshot/predicate/predicate_snapshot.go b/cluster-autoscaler/simulator/clustersnapshot/predicate/predicate_snapshot.go index 8283c39d03c0..3e1156b58dae 100644 --- a/cluster-autoscaler/simulator/clustersnapshot/predicate/predicate_snapshot.go +++ b/cluster-autoscaler/simulator/clustersnapshot/predicate/predicate_snapshot.go @@ -20,27 +20,26 @@ import ( apiv1 "k8s.io/api/core/v1" "k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot" "k8s.io/autoscaler/cluster-autoscaler/simulator/framework" - "k8s.io/autoscaler/cluster-autoscaler/simulator/predicatechecker" ) // PredicateSnapshot implements ClusterSnapshot on top of a SnapshotBase by using // SchedulerBasedPredicateChecker to check scheduler predicates. type PredicateSnapshot struct { clustersnapshot.SnapshotBase - predicateChecker *predicatechecker.SchedulerBasedPredicateChecker + pluginRunner *SchedulerPluginRunner } // NewPredicateSnapshot builds a PredicateSnapshot. func NewPredicateSnapshot(snapshotBase clustersnapshot.SnapshotBase, fwHandle *framework.Handle) *PredicateSnapshot { return &PredicateSnapshot{ - SnapshotBase: snapshotBase, - predicateChecker: predicatechecker.NewSchedulerBasedPredicateChecker(fwHandle), + SnapshotBase: snapshotBase, + pluginRunner: NewSchedulerPluginRunner(fwHandle, snapshotBase), } } // SchedulePod adds pod to the snapshot and schedules it to given node. func (s *PredicateSnapshot) SchedulePod(pod *apiv1.Pod, nodeName string) clustersnapshot.SchedulingError { - if schedErr := s.predicateChecker.CheckPredicates(s, pod, nodeName); schedErr != nil { + if schedErr := s.pluginRunner.RunFiltersOnNode(pod, nodeName); schedErr != nil { return schedErr } if err := s.ForceAddPod(pod, nodeName); err != nil { @@ -51,7 +50,7 @@ func (s *PredicateSnapshot) SchedulePod(pod *apiv1.Pod, nodeName string) cluster // SchedulePodOnAnyNodeMatching adds pod to the snapshot and schedules it to any node matching the provided function. func (s *PredicateSnapshot) SchedulePodOnAnyNodeMatching(pod *apiv1.Pod, anyNodeMatching func(*framework.NodeInfo) bool) (string, clustersnapshot.SchedulingError) { - nodeName, schedErr := s.predicateChecker.FitsAnyNodeMatching(s, pod, anyNodeMatching) + nodeName, schedErr := s.pluginRunner.RunFiltersUntilPassingNode(pod, anyNodeMatching) if schedErr != nil { return "", schedErr } @@ -68,5 +67,5 @@ func (s *PredicateSnapshot) UnschedulePod(namespace string, podName string, node // CheckPredicates checks whether scheduler predicates pass for the given pod on the given node. func (s *PredicateSnapshot) CheckPredicates(pod *apiv1.Pod, nodeName string) clustersnapshot.SchedulingError { - return s.predicateChecker.CheckPredicates(s, pod, nodeName) + return s.pluginRunner.RunFiltersOnNode(pod, nodeName) }