From 054d5d2e7c7cf28ca83d2eb872977624f7b4fcd4 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Kuba=20Tu=C5=BCnik?=
Date: Thu, 14 Nov 2024 15:45:49 +0100
Subject: [PATCH] CA: refactor SchedulerBasedPredicateChecker into
 SchedulerPluginRunner

For DRA, this component will have to call the Reserve phase in addition
to just checking predicates/filters. The new version also makes more
sense in the context of PredicateSnapshot, which is now the only
context it's used in.

While refactoring, I noticed that CheckPredicates for some reason
doesn't check the provided Node against the eligible Nodes returned
from PreFilter (while FitsAnyNodeMatching does). This seems like a bug,
so the check is added.

The checks in FitsAnyNodeMatching are also reordered so that the
cheapest ones run first.
---
 .../predicate/plugin_runner.go                | 147 +++++++++++
 .../predicate/plugin_runner_test.go}          | 240 ++++++++----------
 .../predicate/predicate_snapshot.go           |  11 +-
 .../predicatechecker/schedulerbased.go        | 146 -----------
 4 files changed, 264 insertions(+), 280 deletions(-)
 create mode 100644 cluster-autoscaler/simulator/clustersnapshot/predicate/plugin_runner.go
 rename cluster-autoscaler/simulator/{predicatechecker/schedulerbased_test.go => clustersnapshot/predicate/plugin_runner_test.go} (51%)
 delete mode 100644 cluster-autoscaler/simulator/predicatechecker/schedulerbased.go

diff --git a/cluster-autoscaler/simulator/clustersnapshot/predicate/plugin_runner.go b/cluster-autoscaler/simulator/clustersnapshot/predicate/plugin_runner.go
new file mode 100644
index 000000000000..150b245fdd71
--- /dev/null
+++ b/cluster-autoscaler/simulator/clustersnapshot/predicate/plugin_runner.go
@@ -0,0 +1,147 @@
+/*
+Copyright 2024 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package predicate
+
+import (
+	"context"
+	"fmt"
+	"strings"
+
+	"k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot"
+	"k8s.io/autoscaler/cluster-autoscaler/simulator/framework"
+
+	apiv1 "k8s.io/api/core/v1"
+	schedulerframework "k8s.io/kubernetes/pkg/scheduler/framework"
+)
+
+// SchedulerPluginRunner can be used to run various phases of scheduler plugins through the scheduler framework.
+type SchedulerPluginRunner struct {
+	fwHandle      *framework.Handle
+	snapshotStore clustersnapshot.ClusterSnapshotStore
+	lastIndex     int
+}
+
+// NewSchedulerPluginRunner builds a SchedulerPluginRunner.
+func NewSchedulerPluginRunner(fwHandle *framework.Handle, snapshotStore clustersnapshot.ClusterSnapshotStore) *SchedulerPluginRunner {
+	return &SchedulerPluginRunner{fwHandle: fwHandle, snapshotStore: snapshotStore}
+}
+
+// RunFiltersUntilPassingNode runs the scheduler framework PreFilter phase once, and then keeps running the Filter phase for all nodes in the cluster that match the provided
+// function - until a Node where the Filters pass is found. Filters are only run for matching Nodes. If no matching Node with passing Filters is found, an error is returned.
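+//
+// Before the expensive Filter phase is run for a Node, the cheaper checks are done first: the PreFilter
+// eligibility result, the Node's Unschedulable bit, and the provided matching function.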
+//
+// The node iteration always starts from the Node right after the last Node found by this method. TODO: Extract the iteration strategy out of SchedulerPluginRunner.
+func (p *SchedulerPluginRunner) RunFiltersUntilPassingNode(pod *apiv1.Pod, nodeMatches func(*framework.NodeInfo) bool) (string, clustersnapshot.SchedulingError) {
+	nodeInfosList, err := p.snapshotStore.ListNodeInfos()
+	if err != nil {
+		return "", clustersnapshot.NewSchedulingInternalError(pod, fmt.Sprintf("error listing NodeInfos: %v", err))
+	}
+
+	p.fwHandle.DelegatingLister.UpdateDelegate(p.snapshotStore)
+	defer p.fwHandle.DelegatingLister.ResetDelegate()
+
+	state := schedulerframework.NewCycleState()
+	// Run the PreFilter phase of the framework for the Pod. This allows plugins to precompute some things (for all Nodes in the cluster at once) and
+	// save them in the CycleState. During the Filter phase, plugins can retrieve the precomputes from the CycleState and use them for answering the Filter
+	// for a given Node.
+	preFilterResult, preFilterStatus, _ := p.fwHandle.Framework.RunPreFilterPlugins(context.TODO(), state, pod)
+	if !preFilterStatus.IsSuccess() {
+		// If any of the plugin PreFilter methods isn't successful, the corresponding Filter method can't be run, so the whole scheduling cycle is aborted.
+		// Match that behavior here.
+		return "", clustersnapshot.NewFailingPredicateError(pod, preFilterStatus.Plugin(), preFilterStatus.Reasons(), "PreFilter failed", "")
+	}
+
+	for i := range nodeInfosList {
+		// Determine which NodeInfo to check next.
+		nodeInfo := nodeInfosList[(p.lastIndex+i)%len(nodeInfosList)]
+
+		// Plugins can filter some Nodes out during the PreFilter phase, if they're sure the Nodes won't work for the Pod at that stage.
+		// Filters are only run for Nodes that haven't been filtered out during the PreFilter phase. Match that behavior here - skip such Nodes.
+		if !preFilterResult.AllNodes() && !preFilterResult.NodeNames.Has(nodeInfo.Node().Name) {
+			continue
+		}
+
+		// Nodes with the Unschedulable bit set will be rejected by one of the plugins during the Filter phase below. We can check that quickly here
+		// and short-circuit to avoid running the expensive Filter phase at all in this case.
+		if nodeInfo.Node().Spec.Unschedulable {
+			continue
+		}
+
+		// Check if the NodeInfo matches the provided filtering condition. This should be less expensive than running the Filter phase below, so
+		// check this first.
+		if !nodeMatches(nodeInfo) {
+			continue
+		}
+
+		// Run the Filter phase of the framework. Plugins retrieve the state they saved during PreFilter from CycleState, and answer whether the
+		// given Pod can be scheduled on the given Node.
+		filterStatus := p.fwHandle.Framework.RunFilterPlugins(context.TODO(), state, pod, nodeInfo.ToScheduler())
+		if filterStatus.IsSuccess() {
+			// Filter passed for all plugins, so this pod can be scheduled on this Node.
+			p.lastIndex = (p.lastIndex + i + 1) % len(nodeInfosList)
+			return nodeInfo.Node().Name, nil
+		}
+		// Filter didn't pass for some plugin, so this Node won't work - move on to the next one.
+	}
+	return "", clustersnapshot.NewNoNodesPassingPredicatesFoundError(pod)
+}
+
+// RunFiltersOnNode runs the scheduler framework PreFilter and Filter phases to check if the given pod can be scheduled on the given node.
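+// If the Pod doesn't fit, the returned SchedulingError names the plugin that rejected it and the rejection
+// reasons, plus extra debug info for selected plugins (e.g. the Node's taints for TaintToleration).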
+func (p *SchedulerPluginRunner) RunFiltersOnNode(pod *apiv1.Pod, nodeName string) clustersnapshot.SchedulingError {
+	nodeInfo, err := p.snapshotStore.GetNodeInfo(nodeName)
+	if err != nil {
+		return clustersnapshot.NewSchedulingInternalError(pod, fmt.Sprintf("error obtaining NodeInfo for name %q: %v", nodeName, err))
+	}
+
+	p.fwHandle.DelegatingLister.UpdateDelegate(p.snapshotStore)
+	defer p.fwHandle.DelegatingLister.ResetDelegate()
+
+	state := schedulerframework.NewCycleState()
+	// Run the PreFilter phase of the framework for the Pod and check the results. See the corresponding comments in RunFiltersUntilPassingNode() for more info.
+	preFilterResult, preFilterStatus, _ := p.fwHandle.Framework.RunPreFilterPlugins(context.TODO(), state, pod)
+	if !preFilterStatus.IsSuccess() {
+		return clustersnapshot.NewFailingPredicateError(pod, preFilterStatus.Plugin(), preFilterStatus.Reasons(), "PreFilter failed", "")
+	}
+	if !preFilterResult.AllNodes() && !preFilterResult.NodeNames.Has(nodeInfo.Node().Name) {
+		return clustersnapshot.NewFailingPredicateError(pod, preFilterStatus.Plugin(), preFilterStatus.Reasons(), "PreFilter filtered the Node out", "")
+	}
+
+	// Run the Filter phase of the framework for the Pod and the Node and check the results. See the corresponding comments in RunFiltersUntilPassingNode() for more info.
+	filterStatus := p.fwHandle.Framework.RunFilterPlugins(context.TODO(), state, pod, nodeInfo.ToScheduler())
+	if !filterStatus.IsSuccess() {
+		filterName := filterStatus.Plugin()
+		filterReasons := filterStatus.Reasons()
+		unexpectedErrMsg := ""
+		if !filterStatus.IsRejected() {
+			unexpectedErrMsg = fmt.Sprintf("unexpected filter status %q", filterStatus.Code().String())
+		}
+		return clustersnapshot.NewFailingPredicateError(pod, filterName, filterReasons, unexpectedErrMsg, p.failingFilterDebugInfo(filterName, nodeInfo))
+	}
+
+	// PreFilter and Filter phases checked, this Pod can be scheduled on this Node.
+	return nil
+}
+
+func (p *SchedulerPluginRunner) failingFilterDebugInfo(filterName string, nodeInfo *framework.NodeInfo) string {
+	infoParts := []string{fmt.Sprintf("nodeName: %q", nodeInfo.Node().Name)}
+
+	switch filterName {
+	case "TaintToleration":
+		infoParts = append(infoParts, fmt.Sprintf("nodeTaints: %#v", nodeInfo.Node().Spec.Taints))
+	}
+
+	return strings.Join(infoParts, ", ")
+}
diff --git a/cluster-autoscaler/simulator/predicatechecker/schedulerbased_test.go b/cluster-autoscaler/simulator/clustersnapshot/predicate/plugin_runner_test.go
similarity index 51%
rename from cluster-autoscaler/simulator/predicatechecker/schedulerbased_test.go
rename to cluster-autoscaler/simulator/clustersnapshot/predicate/plugin_runner_test.go
index d023b5846a26..3e5323d0a4d5 100644
--- a/cluster-autoscaler/simulator/predicatechecker/schedulerbased_test.go
+++ b/cluster-autoscaler/simulator/clustersnapshot/predicate/plugin_runner_test.go
@@ -1,5 +1,5 @@
 /*
-Copyright 2020 The Kubernetes Authors.
+Copyright 2024 The Kubernetes Authors.
 
 Licensed under the Apache License, Version 2.0 (the "License");
 you may not use this file except in compliance with the License.
@@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
 limitations under the License.
 */
 
-package predicatechecker
+package predicate
 
 import (
 	"os"
@@ -23,6 +23,7 @@ import (
 	"time"
 
 	"github.com/stretchr/testify/assert"
+	"k8s.io/client-go/informers"
 	clientsetfake "k8s.io/client-go/kubernetes/fake"
 
 	"k8s.io/kubernetes/pkg/scheduler/apis/config"
@@ -38,7 +39,7 @@ import (
 	apiv1 "k8s.io/api/core/v1"
 )
 
-func TestCheckPredicate(t *testing.T) {
+func TestRunFiltersOnNode(t *testing.T) {
 	p450 := BuildTestPod("p450", 450, 500000)
 	p600 := BuildTestPod("p600", 600, 500000)
 	p8000 := BuildTestPod("p8000", 8000, 0)
 	p500 := BuildTestPod("p500", 500, 500000)
@@ -49,9 +50,6 @@ func TestCheckPredicate(t *testing.T) {
 	n1000 := BuildTestNode("n1000", 1000, 2000000)
 	SetNodeReadyState(n1000, true, time.Time{})
 	n1000Unschedulable := BuildTestNode("n1000", 1000, 2000000)
 	SetNodeReadyState(n1000Unschedulable, true, time.Time{})
 
-	defaultPredicateChecker, err := newTestPredicateChecker()
-	assert.NoError(t, err)
-
 	// temp dir
 	tmpDir, err := os.MkdirTemp("", "scheduler-configs")
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -65,95 +63,90 @@ func TestCheckPredicate(t *testing.T) {
 		os.FileMode(0600)); err != nil {
 		t.Fatal(err)
 	}
-
 	customConfig, err := scheduler.ConfigFromPath(customConfigFile)
 	assert.NoError(t, err)
-	customPredicateChecker, err := newTestPredicateCheckerWithCustomConfig(customConfig)
-	assert.NoError(t, err)
 
 	tests := []struct {
-		name             string
-		node             *apiv1.Node
-		scheduledPods    []*apiv1.Pod
-		testPod          *apiv1.Pod
-		predicateChecker *SchedulerBasedPredicateChecker
-		expectError      bool
+		name          string
+		customConfig  *config.KubeSchedulerConfiguration
+		node          *apiv1.Node
+		scheduledPods []*apiv1.Pod
+		testPod       *apiv1.Pod
+		expectError   bool
 	}{
 		// default predicate checker test cases
 		{
-			name:             "default - other pod - insuficient cpu",
-			node:             n1000,
-			scheduledPods:    []*apiv1.Pod{p450},
-			testPod:          p600,
-			expectError:      true,
-			predicateChecker: defaultPredicateChecker,
+			name:          "default - other pod - insufficient cpu",
+			node:          n1000,
+			scheduledPods: []*apiv1.Pod{p450},
+			testPod:       p600,
+			expectError:   true,
 		},
 		{
-			name:             "default - other pod - ok",
-			node:             n1000,
-			scheduledPods:    []*apiv1.Pod{p450},
-			testPod:          p500,
-			expectError:      false,
-			predicateChecker: defaultPredicateChecker,
+			name:          "default - other pod - ok",
+			node:          n1000,
+			scheduledPods: []*apiv1.Pod{p450},
+			testPod:       p500,
+			expectError:   false,
 		},
 		{
-			name:             "default - empty - insuficient cpu",
-			node:             n1000,
-			scheduledPods:    []*apiv1.Pod{},
-			testPod:          p8000,
-			expectError:      true,
-			predicateChecker: defaultPredicateChecker,
+			name:          "default - empty - insufficient cpu",
+			node:          n1000,
+			scheduledPods: []*apiv1.Pod{},
+			testPod:       p8000,
+			expectError:   true,
 		},
 		{
-			name:             "default - empty - ok",
-			node:             n1000,
-			scheduledPods:    []*apiv1.Pod{},
-			testPod:          p600,
-			expectError:      false,
-			predicateChecker: defaultPredicateChecker,
+			name:          "default - empty - ok",
+			node:          n1000,
+			scheduledPods: []*apiv1.Pod{},
+			testPod:       p600,
+			expectError:   false,
 		},
 		// custom predicate checker test cases
 		{
-			name:             "custom - other pod - ok",
-			node:             n1000,
-			scheduledPods:    []*apiv1.Pod{p450},
-			testPod:          p600,
-			expectError:      false,
-			predicateChecker: customPredicateChecker,
+			name:          "custom - other pod - ok",
+			node:          n1000,
+			scheduledPods: []*apiv1.Pod{p450},
+			testPod:       p600,
+			expectError:   false,
+			customConfig:  customConfig,
 		},
 		{
-			name:             "custom -other pod - ok",
-			node:             n1000,
-			scheduledPods:    []*apiv1.Pod{p450},
-			testPod:          p500,
-			expectError:      false,
-			predicateChecker: customPredicateChecker,
+			name:          "custom - other pod - ok",
+			node:          n1000,
+			scheduledPods: []*apiv1.Pod{p450},
+			testPod:       p500,
+			expectError:   false,
+			customConfig:  customConfig,
 		},
 		{
-			name:             "custom -empty - ok",
-			node:             n1000,
-			scheduledPods:    []*apiv1.Pod{},
-			testPod:          p8000,
-			expectError:      false,
-			predicateChecker: customPredicateChecker,
+			name:          "custom - empty - ok",
+			node:          n1000,
+			scheduledPods: []*apiv1.Pod{},
+			testPod:       p8000,
+			expectError:   false,
+			customConfig:  customConfig,
 		},
 		{
-			name:             "custom -empty - ok",
-			node:             n1000,
-			scheduledPods:    []*apiv1.Pod{},
-			testPod:          p600,
-			expectError:      false,
-			predicateChecker: customPredicateChecker,
+			name:          "custom - empty - ok",
+			node:          n1000,
+			scheduledPods: []*apiv1.Pod{},
+			testPod:       p600,
+			expectError:   false,
+			customConfig:  customConfig,
 		},
 	}
 
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
-			var err error
-			clusterSnapshot := store.NewBasicSnapshotStore()
-			err = clusterSnapshot.AddNodeInfo(framework.NewTestNodeInfo(tt.node, tt.scheduledPods...))
+			snapshotStore := store.NewBasicSnapshotStore()
+			err := snapshotStore.AddNodeInfo(framework.NewTestNodeInfo(tt.node, tt.scheduledPods...))
 			assert.NoError(t, err)
 
-			predicateError := tt.predicateChecker.CheckPredicates(clusterSnapshot, tt.testPod, tt.node.Name)
+			pluginRunner, err := newTestPluginRunner(snapshotStore, tt.customConfig)
+			assert.NoError(t, err)
+
+			predicateError := pluginRunner.RunFiltersOnNode(tt.testPod, tt.node.Name)
 			if tt.expectError {
 				assert.NotNil(t, predicateError)
 				assert.Equal(t, clustersnapshot.FailingPredicateError, predicateError.Type())
@@ -168,7 +161,7 @@ func TestCheckPredicate(t *testing.T) {
 	}
 }
 
-func TestFitsAnyNode(t *testing.T) {
+func TestRunFiltersUntilPassingNode(t *testing.T) {
 	p900 := BuildTestPod("p900", 900, 1000)
 	p1900 := BuildTestPod("p1900", 1900, 1000)
 	p2100 := BuildTestPod("p2100", 2100, 1000)
@@ -176,9 +169,6 @@ func TestFitsAnyNode(t *testing.T) {
 	n1000 := BuildTestNode("n1000", 1000, 2000000)
 	n2000 := BuildTestNode("n2000", 2000, 2000000)
 
-	defaultPredicateChecker, err := newTestPredicateChecker()
-	assert.NoError(t, err)
-
 	// temp dir
 	tmpDir, err := os.MkdirTemp("", "scheduler-configs")
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -192,74 +182,71 @@ func TestFitsAnyNode(t *testing.T) {
 		os.FileMode(0600)); err != nil {
 		t.Fatal(err)
 	}
-
 	customConfig, err := scheduler.ConfigFromPath(customConfigFile)
 	assert.NoError(t, err)
-	customPredicateChecker, err := newTestPredicateCheckerWithCustomConfig(customConfig)
-	assert.NoError(t, err)
 
 	testCases := []struct {
-		name             string
-		predicateChecker *SchedulerBasedPredicateChecker
-		pod              *apiv1.Pod
-		expectedNodes    []string
-		expectError      bool
+		name          string
+		customConfig  *config.KubeSchedulerConfiguration
+		pod           *apiv1.Pod
+		expectedNodes []string
+		expectError   bool
 	}{
 		// default predicate checker test cases
 		{
-			name:             "default - small pod - no error",
-			predicateChecker: defaultPredicateChecker,
-			pod:              p900,
-			expectedNodes:    []string{"n1000", "n2000"},
-			expectError:      false,
+			name:          "default - small pod - no error",
+			pod:           p900,
+			expectedNodes: []string{"n1000", "n2000"},
+			expectError:   false,
 		},
 		{
-			name:             "default - medium pod - no error",
-			predicateChecker: defaultPredicateChecker,
-			pod:              p1900,
-			expectedNodes:    []string{"n2000"},
-			expectError:      false,
+			name:          "default - medium pod - no error",
+			pod:           p1900,
+			expectedNodes: []string{"n2000"},
+			expectError:   false,
 		},
 		{
-			name:             "default - large pod - insufficient cpu",
-			predicateChecker: defaultPredicateChecker,
-			pod:              p2100,
-			expectError:      true,
+			name:        "default - large pod - insufficient cpu",
+			pod:         p2100,
+			expectError: true,
 		},
 		// custom predicate checker test cases
 		{
-			name:             "custom - small pod - no error",
-			predicateChecker: customPredicateChecker,
-			pod:              p900,
-			expectedNodes:    []string{"n1000", "n2000"},
-			expectError:      false,
+			name:          "custom - small pod - no error",
+			customConfig:  customConfig,
+			pod:           p900,
+			expectedNodes: []string{"n1000", "n2000"},
+			expectError:   false,
 		},
 		{
-			name:             "custom - medium pod - no error",
-			predicateChecker: customPredicateChecker,
-			pod:              p1900,
-			expectedNodes:    []string{"n1000", "n2000"},
-			expectError:      false,
+			name:          "custom - medium pod - no error",
+			customConfig:  customConfig,
+			pod:           p1900,
+			expectedNodes: []string{"n1000", "n2000"},
+			expectError:   false,
 		},
 		{
-			name:             "custom - large pod - insufficient cpu",
-			predicateChecker: customPredicateChecker,
-			pod:              p2100,
-			expectedNodes:    []string{"n1000", "n2000"},
-			expectError:      false,
+			name:          "custom - large pod - insufficient cpu",
+			customConfig:  customConfig,
+			pod:           p2100,
+			expectedNodes: []string{"n1000", "n2000"},
+			expectError:   false,
 		},
 	}
 
-	clusterSnapshot := store.NewBasicSnapshotStore()
-	err = clusterSnapshot.AddNodeInfo(framework.NewTestNodeInfo(n1000))
+	snapshotStore := store.NewBasicSnapshotStore()
+	err = snapshotStore.AddNodeInfo(framework.NewTestNodeInfo(n1000))
 	assert.NoError(t, err)
-	err = clusterSnapshot.AddNodeInfo(framework.NewTestNodeInfo(n2000))
+	err = snapshotStore.AddNodeInfo(framework.NewTestNodeInfo(n2000))
 	assert.NoError(t, err)
 
 	for _, tc := range testCases {
 		t.Run(tc.name, func(t *testing.T) {
-			nodeName, err := tc.predicateChecker.FitsAnyNode(clusterSnapshot, tc.pod)
+			pluginRunner, err := newTestPluginRunner(snapshotStore, tc.customConfig)
+			assert.NoError(t, err)
+
+			nodeName, err := pluginRunner.RunFiltersUntilPassingNode(tc.pod, func(info *framework.NodeInfo) bool { return true })
 			if tc.expectError {
 				assert.Error(t, err)
 			} else {
@@ -268,7 +255,6 @@ func TestFitsAnyNode(t *testing.T) {
 			}
 		})
 	}
-
 }
 
 func TestDebugInfo(t *testing.T) {
@@ -293,9 +279,9 @@ func TestDebugInfo(t *testing.T) {
 	assert.NoError(t, err)
 
 	// with default predicate checker
-	defaultPredicateChecker, err := newTestPredicateChecker()
+	defaultPluginRunner, err := newTestPluginRunner(clusterSnapshot, nil)
 	assert.NoError(t, err)
-	predicateErr := defaultPredicateChecker.CheckPredicates(clusterSnapshot, p1, "n1")
+	predicateErr := defaultPluginRunner.RunFiltersOnNode(p1, "n1")
 	assert.NotNil(t, predicateErr)
 	assert.Contains(t, predicateErr.FailingPredicateReasons(), "node(s) had untolerated taint {SomeTaint: WhyNot?}")
 	assert.Contains(t, predicateErr.Error(), "node(s) had untolerated taint {SomeTaint: WhyNot?}")
@@ -319,27 +305,25 @@ func TestDebugInfo(t *testing.T) {
 	customConfig, err := scheduler.ConfigFromPath(customConfigFile)
 	assert.NoError(t, err)
-	customPredicateChecker, err := newTestPredicateCheckerWithCustomConfig(customConfig)
+	customPluginRunner, err := newTestPluginRunner(clusterSnapshot, customConfig)
 	assert.NoError(t, err)
-	predicateErr = customPredicateChecker.CheckPredicates(clusterSnapshot, p1, "n1")
+	predicateErr = customPluginRunner.RunFiltersOnNode(p1, "n1")
 	assert.Nil(t, predicateErr)
 }
 
-// newTestPredicateChecker builds test version of PredicateChecker.
-func newTestPredicateChecker() (*SchedulerBasedPredicateChecker, error) {
-	defaultConfig, err := scheduler_config_latest.Default()
-	if err != nil {
-		return nil, err
+// newTestPluginRunner builds a test version of SchedulerPluginRunner.
+func newTestPluginRunner(snapshotStore clustersnapshot.ClusterSnapshotStore, schedConfig *config.KubeSchedulerConfiguration) (*SchedulerPluginRunner, error) {
+	if schedConfig == nil {
+		defaultConfig, err := scheduler_config_latest.Default()
+		if err != nil {
+			return nil, err
+		}
+		schedConfig = defaultConfig
 	}
-	return newTestPredicateCheckerWithCustomConfig(defaultConfig)
-}
 
-// newTestPredicateCheckerWithCustomConfig builds test version of PredicateChecker with custom scheduler config.
-func newTestPredicateCheckerWithCustomConfig(schedConfig *config.KubeSchedulerConfiguration) (*SchedulerBasedPredicateChecker, error) {
-	// just call out to NewSchedulerBasedPredicateChecker but use fake kubeClient
 	fwHandle, err := framework.NewHandle(informers.NewSharedInformerFactory(clientsetfake.NewSimpleClientset(), 0), schedConfig)
 	if err != nil {
 		return nil, err
 	}
-	return NewSchedulerBasedPredicateChecker(fwHandle), nil
+	return NewSchedulerPluginRunner(fwHandle, snapshotStore), nil
 }
diff --git a/cluster-autoscaler/simulator/clustersnapshot/predicate/predicate_snapshot.go b/cluster-autoscaler/simulator/clustersnapshot/predicate/predicate_snapshot.go
index 9542b7a06535..7d6e62f7dea2 100644
--- a/cluster-autoscaler/simulator/clustersnapshot/predicate/predicate_snapshot.go
+++ b/cluster-autoscaler/simulator/clustersnapshot/predicate/predicate_snapshot.go
@@ -20,27 +20,26 @@ import (
 	apiv1 "k8s.io/api/core/v1"
 	"k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot"
 	"k8s.io/autoscaler/cluster-autoscaler/simulator/framework"
-	"k8s.io/autoscaler/cluster-autoscaler/simulator/predicatechecker"
 )
 
 // PredicateSnapshot implements ClusterSnapshot on top of a ClusterSnapshotStore by using
-// SchedulerBasedPredicateChecker to check scheduler predicates.
+// SchedulerPluginRunner to check scheduler predicates.
 type PredicateSnapshot struct {
 	clustersnapshot.ClusterSnapshotStore
-	predicateChecker *predicatechecker.SchedulerBasedPredicateChecker
+	pluginRunner *SchedulerPluginRunner
 }
 
 // NewPredicateSnapshot builds a PredicateSnapshot.
 func NewPredicateSnapshot(snapshotStore clustersnapshot.ClusterSnapshotStore, fwHandle *framework.Handle) *PredicateSnapshot {
 	return &PredicateSnapshot{
 		ClusterSnapshotStore: snapshotStore,
-		predicateChecker:     predicatechecker.NewSchedulerBasedPredicateChecker(fwHandle),
+		pluginRunner:         NewSchedulerPluginRunner(fwHandle, snapshotStore),
 	}
 }
 
 // SchedulePod adds pod to the snapshot and schedules it to given node.
 func (s *PredicateSnapshot) SchedulePod(pod *apiv1.Pod, nodeName string) clustersnapshot.SchedulingError {
-	if schedErr := s.predicateChecker.CheckPredicates(s, pod, nodeName); schedErr != nil {
+	if schedErr := s.pluginRunner.RunFiltersOnNode(pod, nodeName); schedErr != nil {
 		return schedErr
 	}
 	if err := s.ClusterSnapshotStore.ForceAddPod(pod, nodeName); err != nil {
@@ -51,7 +50,7 @@ func (s *PredicateSnapshot) SchedulePod(pod *apiv1.Pod, nodeName string) cluster
 }
 
 // SchedulePodOnAnyNodeMatching adds pod to the snapshot and schedules it to any node matching the provided function.
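+// If no matching Node with passing Filters is found, an error is returned and the snapshot is left unchanged.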
 func (s *PredicateSnapshot) SchedulePodOnAnyNodeMatching(pod *apiv1.Pod, anyNodeMatching func(*framework.NodeInfo) bool) (string, clustersnapshot.SchedulingError) {
-	nodeName, schedErr := s.predicateChecker.FitsAnyNodeMatching(s, pod, anyNodeMatching)
+	nodeName, schedErr := s.pluginRunner.RunFiltersUntilPassingNode(pod, anyNodeMatching)
 	if schedErr != nil {
 		return "", schedErr
 	}
@@ -68,5 +67,5 @@ func (s *PredicateSnapshot) UnschedulePod(namespace string, podName string, node
 
 // CheckPredicates checks whether scheduler predicates pass for the given pod on the given node.
 func (s *PredicateSnapshot) CheckPredicates(pod *apiv1.Pod, nodeName string) clustersnapshot.SchedulingError {
-	return s.predicateChecker.CheckPredicates(s, pod, nodeName)
+	return s.pluginRunner.RunFiltersOnNode(pod, nodeName)
 }
diff --git a/cluster-autoscaler/simulator/predicatechecker/schedulerbased.go b/cluster-autoscaler/simulator/predicatechecker/schedulerbased.go
deleted file mode 100644
index 6672d0c9f6e4..000000000000
--- a/cluster-autoscaler/simulator/predicatechecker/schedulerbased.go
+++ /dev/null
@@ -1,146 +0,0 @@
-/*
-Copyright 2016 The Kubernetes Authors.
-
-Licensed under the Apache License, Version 2.0 (the "License");
-you may not use this file except in compliance with the License.
-You may obtain a copy of the License at
-
-    http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-*/
-
-package predicatechecker
-
-import (
-	"context"
-	"fmt"
-	"strings"
-
-	"k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot"
-	"k8s.io/autoscaler/cluster-autoscaler/simulator/framework"
-
-	apiv1 "k8s.io/api/core/v1"
-	v1listers "k8s.io/client-go/listers/core/v1"
-	"k8s.io/klog/v2"
-	schedulerframework "k8s.io/kubernetes/pkg/scheduler/framework"
-)
-
-// SchedulerBasedPredicateChecker checks whether all required predicates pass for given Pod and Node.
-// The verification is done by calling out to scheduler code.
-type SchedulerBasedPredicateChecker struct {
-	fwHandle   *framework.Handle
-	nodeLister v1listers.NodeLister
-	podLister  v1listers.PodLister
-	lastIndex  int
-}
-
-// NewSchedulerBasedPredicateChecker builds scheduler based PredicateChecker.
-func NewSchedulerBasedPredicateChecker(fwHandle *framework.Handle) *SchedulerBasedPredicateChecker {
-	return &SchedulerBasedPredicateChecker{fwHandle: fwHandle}
-}
-
-// FitsAnyNode checks if the given pod can be placed on any of the given nodes.
-func (p *SchedulerBasedPredicateChecker) FitsAnyNode(clusterSnapshot clustersnapshot.ClusterSnapshotStore, pod *apiv1.Pod) (string, clustersnapshot.SchedulingError) {
-	return p.FitsAnyNodeMatching(clusterSnapshot, pod, func(*framework.NodeInfo) bool {
-		return true
-	})
-}
-
-// FitsAnyNodeMatching checks if the given pod can be placed on any of the given nodes matching the provided function.
-func (p *SchedulerBasedPredicateChecker) FitsAnyNodeMatching(clusterSnapshot clustersnapshot.ClusterSnapshotStore, pod *apiv1.Pod, nodeMatches func(*framework.NodeInfo) bool) (string, clustersnapshot.SchedulingError) {
-	if clusterSnapshot == nil {
-		return "", clustersnapshot.NewSchedulingInternalError(pod, "ClusterSnapshot not provided")
-	}
-
-	nodeInfosList, err := clusterSnapshot.ListNodeInfos()
-	if err != nil {
-		// This should never happen.
-		//
-		// Scheduler requires interface returning error, but no implementation
-		// of ClusterSnapshot ever does it.
-		klog.Errorf("Error obtaining nodeInfos from schedulerLister")
-		return "", clustersnapshot.NewSchedulingInternalError(pod, "error obtaining nodeInfos from schedulerLister")
-	}
-
-	p.fwHandle.DelegatingLister.UpdateDelegate(clusterSnapshot)
-	defer p.fwHandle.DelegatingLister.ResetDelegate()
-
-	state := schedulerframework.NewCycleState()
-	preFilterResult, preFilterStatus, _ := p.fwHandle.Framework.RunPreFilterPlugins(context.TODO(), state, pod)
-	if !preFilterStatus.IsSuccess() {
-		return "", clustersnapshot.NewFailingPredicateError(pod, preFilterStatus.Plugin(), preFilterStatus.Reasons(), "PreFilter failed", "")
-	}
-
-	for i := range nodeInfosList {
-		nodeInfo := nodeInfosList[(p.lastIndex+i)%len(nodeInfosList)]
-		if !nodeMatches(nodeInfo) {
-			continue
-		}
-
-		if !preFilterResult.AllNodes() && !preFilterResult.NodeNames.Has(nodeInfo.Node().Name) {
-			continue
-		}
-
-		// Be sure that the node is schedulable.
-		if nodeInfo.Node().Spec.Unschedulable {
-			continue
-		}
-
-		filterStatus := p.fwHandle.Framework.RunFilterPlugins(context.TODO(), state, pod, nodeInfo.ToScheduler())
-		if filterStatus.IsSuccess() {
-			p.lastIndex = (p.lastIndex + i + 1) % len(nodeInfosList)
-			return nodeInfo.Node().Name, nil
-		}
-	}
-	return "", clustersnapshot.NewNoNodesPassingPredicatesFoundError(pod)
-}
-
-// CheckPredicates checks if the given pod can be placed on the given node.
-func (p *SchedulerBasedPredicateChecker) CheckPredicates(clusterSnapshot clustersnapshot.ClusterSnapshotStore, pod *apiv1.Pod, nodeName string) clustersnapshot.SchedulingError {
-	if clusterSnapshot == nil {
-		return clustersnapshot.NewSchedulingInternalError(pod, "ClusterSnapshot not provided")
-	}
-	nodeInfo, err := clusterSnapshot.GetNodeInfo(nodeName)
-	if err != nil {
-		return clustersnapshot.NewSchedulingInternalError(pod, fmt.Sprintf("error obtaining NodeInfo for name %q: %v", nodeName, err))
-	}
-
-	p.fwHandle.DelegatingLister.UpdateDelegate(clusterSnapshot)
-	defer p.fwHandle.DelegatingLister.ResetDelegate()
-
-	state := schedulerframework.NewCycleState()
-	_, preFilterStatus, _ := p.fwHandle.Framework.RunPreFilterPlugins(context.TODO(), state, pod)
-	if !preFilterStatus.IsSuccess() {
-		return clustersnapshot.NewFailingPredicateError(pod, preFilterStatus.Plugin(), preFilterStatus.Reasons(), "PreFilter failed", "")
-	}
-
-	filterStatus := p.fwHandle.Framework.RunFilterPlugins(context.TODO(), state, pod, nodeInfo.ToScheduler())
-
-	if !filterStatus.IsSuccess() {
-		filterName := filterStatus.Plugin()
-		filterReasons := filterStatus.Reasons()
-		unexpectedErrMsg := ""
-		if !filterStatus.IsRejected() {
-			unexpectedErrMsg = fmt.Sprintf("unexpected filter status %q", filterStatus.Code().String())
-		}
-		return clustersnapshot.NewFailingPredicateError(pod, filterName, filterReasons, unexpectedErrMsg, p.failingFilterDebugInfo(filterName, nodeInfo))
-	}
-
-	return nil
-}
-
-func (p *SchedulerBasedPredicateChecker) failingFilterDebugInfo(filterName string, nodeInfo *framework.NodeInfo) string {
-	infoParts := []string{fmt.Sprintf("nodeName: %q", nodeInfo.Node().Name)}
-
-	switch filterName {
-	case "TaintToleration":
-		infoParts = append(infoParts, fmt.Sprintf("nodeTaints: %#v", nodeInfo.Node().Spec.Taints))
-	}
-
-	return strings.Join(infoParts, ", ")
-}