Skip to content

Commit

Permalink
CA: refactor SchedulerBasedPredicateChecker into SchedulerPluginRunner
Browse files Browse the repository at this point in the history
For DRA, this component will have to call the Reserve phase in addition
to just checking predicates/filters.

The new version also makes more sense in the context of
PredicateSnapshot, which is the only context now.
  • Loading branch information
towca committed Nov 14, 2024
1 parent 01bb42c commit c97a422
Show file tree
Hide file tree
Showing 3 changed files with 137 additions and 175 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/

package predicatechecker
package predicate

import (
"context"
Expand All @@ -24,49 +24,32 @@ import (
"k8s.io/autoscaler/cluster-autoscaler/simulator/framework"

apiv1 "k8s.io/api/core/v1"
v1listers "k8s.io/client-go/listers/core/v1"
klog "k8s.io/klog/v2"
schedulerframework "k8s.io/kubernetes/pkg/scheduler/framework"
)

// SchedulerBasedPredicateChecker checks whether all required predicates pass for given Pod and Node.
// The verification is done by calling out to scheduler code.
type SchedulerBasedPredicateChecker struct {
fwHandle *framework.Handle
nodeLister v1listers.NodeLister
podLister v1listers.PodLister
lastIndex int
// SchedulerPluginRunner can be used to run various phases of scheduler plugins through the scheduler framework.
type SchedulerPluginRunner struct {
fwHandle *framework.Handle
snapshotBase clustersnapshot.SnapshotBase
lastIndex int
}

// NewSchedulerBasedPredicateChecker builds scheduler based PredicateChecker.
func NewSchedulerBasedPredicateChecker(fwHandle *framework.Handle) *SchedulerBasedPredicateChecker {
return &SchedulerBasedPredicateChecker{fwHandle: fwHandle}
// NewSchedulerPluginRunner builds a SchedulerPluginRunner.
func NewSchedulerPluginRunner(fwHandle *framework.Handle, snapshotBase clustersnapshot.SnapshotBase) *SchedulerPluginRunner {
return &SchedulerPluginRunner{fwHandle: fwHandle, snapshotBase: snapshotBase}
}

// FitsAnyNode checks if the given pod can be placed on any of the given nodes.
func (p *SchedulerBasedPredicateChecker) FitsAnyNode(clusterSnapshot clustersnapshot.SnapshotBase, pod *apiv1.Pod) (string, error) {
return p.FitsAnyNodeMatching(clusterSnapshot, pod, func(*framework.NodeInfo) bool {
return true
})
}

// FitsAnyNodeMatching checks if the given pod can be placed on any of the given nodes matching the provided function.
func (p *SchedulerBasedPredicateChecker) FitsAnyNodeMatching(clusterSnapshot clustersnapshot.SnapshotBase, pod *apiv1.Pod, nodeMatches func(*framework.NodeInfo) bool) (string, error) {
if clusterSnapshot == nil {
return "", fmt.Errorf("ClusterSnapshot not provided")
}

nodeInfosList, err := clusterSnapshot.ListNodeInfos()
// RunFiltersUntilPassingNode runs the scheduler framework PreFilter phase once, and then keeps running the Filter phase for all nodes in the cluster that match the provided
// function - until a Node where the filters pass is found. Filters are only run for matching Nodes. If no matching node with passing filters is found, an error is returned.
//
// The node iteration always starts from the next Node from the last Node that was found by this method. TODO: Extract the iteration strategy out of SchedulerPluginRunner.
func (p *SchedulerPluginRunner) RunFiltersUntilPassingNode(pod *apiv1.Pod, nodeMatches func(*framework.NodeInfo) bool) (nodeName string, err error) {
nodeInfosList, err := p.snapshotBase.ListNodeInfos()
if err != nil {
// This should never happen.
//
// Scheduler requires interface returning error, but no implementation
// of ClusterSnapshot ever does it.
klog.Errorf("Error obtaining nodeInfos from schedulerLister")
return "", fmt.Errorf("error obtaining nodeInfos from schedulerLister")
}

p.fwHandle.DelegatingLister.UpdateDelegate(clusterSnapshot)
p.fwHandle.DelegatingLister.UpdateDelegate(p.snapshotBase)
defer p.fwHandle.DelegatingLister.ResetDelegate()

state := schedulerframework.NewCycleState()
Expand Down Expand Up @@ -99,18 +82,15 @@ func (p *SchedulerBasedPredicateChecker) FitsAnyNodeMatching(clusterSnapshot clu
return "", fmt.Errorf("cannot put pod %s on any node", pod.Name)
}

// CheckPredicates checks if the given pod can be placed on the given node.
func (p *SchedulerBasedPredicateChecker) CheckPredicates(clusterSnapshot clustersnapshot.SnapshotBase, pod *apiv1.Pod, nodeName string) *clustersnapshot.PredicateError {
if clusterSnapshot == nil {
return clustersnapshot.NewPredicateError(clustersnapshot.InternalPredicateError, "", "ClusterSnapshot not provided", nil, emptyString)
}
nodeInfo, err := clusterSnapshot.GetNodeInfo(nodeName)
// RunFiltersOnNode runs the scheduler framework PreFilter and Filter phases to check if the given pod can be scheduled on the given node.
func (p *SchedulerPluginRunner) RunFiltersOnNode(pod *apiv1.Pod, nodeName string) *clustersnapshot.PredicateError {
nodeInfo, err := p.snapshotBase.GetNodeInfo(nodeName)
if err != nil {
errorMessage := fmt.Sprintf("Error obtaining NodeInfo for name %s; %v", nodeName, err)
return clustersnapshot.NewPredicateError(clustersnapshot.InternalPredicateError, "", errorMessage, nil, emptyString)
}

p.fwHandle.DelegatingLister.UpdateDelegate(clusterSnapshot)
p.fwHandle.DelegatingLister.UpdateDelegate(p.snapshotBase)
defer p.fwHandle.DelegatingLister.ResetDelegate()

state := schedulerframework.NewCycleState()
Expand Down Expand Up @@ -149,7 +129,7 @@ func (p *SchedulerBasedPredicateChecker) CheckPredicates(clusterSnapshot cluster
return nil
}

func (p *SchedulerBasedPredicateChecker) buildDebugInfo(filterName string, nodeInfo *framework.NodeInfo) func() string {
func (p *SchedulerPluginRunner) buildDebugInfo(filterName string, nodeInfo *framework.NodeInfo) func() string {
switch filterName {
case "TaintToleration":
taints := nodeInfo.Node().Spec.Taints
Expand Down
Loading

0 comments on commit c97a422

Please sign in to comment.