Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

move prefilter out of predicates to improve performance #2580

Merged
merged 1 commit into from
Nov 29, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions pkg/scheduler/actions/allocate/allocate.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,16 @@ func (alloc *Action) Execute(ssn *framework.Session) {

klog.V(3).Infof("There are <%d> nodes for Job <%v/%v>", len(ssn.Nodes), job.Namespace, job.Name)

if err := ssn.PrePredicateFn(task); err != nil {
klog.V(3).Infof("PrePredicate for task %s/%s failed for: %v", task.Namespace, task.Name, err)
fitErrors := api.NewFitErrors()
for _, ni := range allNodes {
fitErrors.SetNodeError(ni.Name, err)
}
job.NodesFitErrors[task.UID] = fitErrors
break
}

predicateNodes, fitErrors := ph.PredicateNodes(task, allNodes, predicateFn)
if len(predicateNodes) == 0 {
job.NodesFitErrors[task.UID] = fitErrors
Expand Down
9 changes: 9 additions & 0 deletions pkg/scheduler/actions/backfill/backfill.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,15 @@ func (backfill *Action) Execute(ssn *framework.Session) {
allocated := false
fe := api.NewFitErrors()

if err := ssn.PrePredicateFn(task); err != nil {
klog.V(3).Infof("PrePredicate for task %s/%s failed for: %v", task.Namespace, task.Name, err)
for _, ni := range ssn.Nodes {
fe.SetNodeError(ni.Name, err)
}
job.NodesFitErrors[task.UID] = fe
break
}

// As task did not request resources, so it only need to meet predicates.
// TODO (k82cn): need to prioritize nodes to avoid pod hole.
for _, node := range ssn.Nodes {
Expand Down
3 changes: 3 additions & 0 deletions pkg/scheduler/actions/preempt/preempt.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,9 @@ func preempt(

allNodes := ssn.NodeList

if err := ssn.PrePredicateFn(preemptor); err != nil {
return false, fmt.Errorf("PrePredicate for task %s/%s failed for: %v", preemptor.Namespace, preemptor.Name, err)
}
predicateNodes, _ := predicateHelper.PredicateNodes(preemptor, allNodes, ssn.PredicateFn)

nodeScores := util.PrioritizeNodes(preemptor, predicateNodes, ssn.BatchNodeOrderFn, ssn.NodeOrderMapFn, ssn.NodeOrderReduceFn)
Expand Down
5 changes: 5 additions & 0 deletions pkg/scheduler/actions/reclaim/reclaim.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,11 @@ func (ra *Action) Execute(ssn *framework.Session) {
continue
}

if err := ssn.PrePredicateFn(task); err != nil {
klog.V(3).Infof("PrePredicate for task %s/%s failed for: %v", task.Namespace, task.Name, err)
continue
}

assigned := false
for _, n := range ssn.Nodes {
// If predicates failed, next node.
Expand Down
3 changes: 3 additions & 0 deletions pkg/scheduler/api/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,9 @@ type JobEnqueuedFn func(interface{})
// PredicateFn is the func declaration used to predicate node for task.
type PredicateFn func(*TaskInfo, *NodeInfo) error

// PrePredicateFn is the func declaration used to pre-predicate node for task.
type PrePredicateFn func(*TaskInfo) error

// BestNodeFn is the func declaration used to return the nodeScores to plugins.
type BestNodeFn func(*TaskInfo, map[float64][]*NodeInfo) *NodeInfo

Expand Down
2 changes: 2 additions & 0 deletions pkg/scheduler/framework/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ type Session struct {
namespaceOrderFns map[string]api.CompareFn
clusterOrderFns map[string]api.CompareFn
predicateFns map[string]api.PredicateFn
prePredicateFns map[string]api.PrePredicateFn
bestNodeFns map[string]api.BestNodeFn
nodeOrderFns map[string]api.NodeOrderFn
batchNodeOrderFns map[string]api.BatchNodeOrderFn
Expand Down Expand Up @@ -118,6 +119,7 @@ func openSession(cache cache.Cache) *Session {
namespaceOrderFns: map[string]api.CompareFn{},
clusterOrderFns: map[string]api.CompareFn{},
predicateFns: map[string]api.PredicateFn{},
prePredicateFns: map[string]api.PrePredicateFn{},
bestNodeFns: map[string]api.BestNodeFn{},
nodeOrderFns: map[string]api.NodeOrderFn{},
batchNodeOrderFns: map[string]api.BatchNodeOrderFn{},
Expand Down
26 changes: 26 additions & 0 deletions pkg/scheduler/framework/session_plugins.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,11 @@ func (ssn *Session) AddPredicateFn(name string, pf api.PredicateFn) {
ssn.predicateFns[name] = pf
}

// AddPredicateFn add Predicate function
func (ssn *Session) AddPrePredicateFn(name string, pf api.PrePredicateFn) {
ssn.prePredicateFns[name] = pf
}

// AddBestNodeFn add BestNode function
func (ssn *Session) AddBestNodeFn(name string, pf api.BestNodeFn) {
ssn.bestNodeFns[name] = pf
Expand Down Expand Up @@ -639,6 +644,27 @@ func (ssn *Session) PredicateFn(task *api.TaskInfo, node *api.NodeInfo) error {
return nil
}

// PrePredicateFn invoke predicate function of the plugins
func (ssn *Session) PrePredicateFn(task *api.TaskInfo) error {
for _, tier := range ssn.Tiers {
for _, plugin := range tier.Plugins {
// we use same option as predicates for they are
if !isEnabled(plugin.EnabledPredicate) {
continue
}
pfn, found := ssn.prePredicateFns[plugin.Name]
if !found {
continue
}
err := pfn(task)
if err != nil {
return err
}
}
}
return nil
}

// BestNodeFn invoke bestNode function of the plugins
func (ssn *Session) BestNodeFn(task *api.TaskInfo, nodeScores map[float64][]*api.NodeInfo) *api.NodeInfo {
for _, tier := range ssn.Tiers {
Expand Down
72 changes: 39 additions & 33 deletions pkg/scheduler/plugins/predicates/predicates.go
Original file line number Diff line number Diff line change
Expand Up @@ -346,6 +346,44 @@ func (pp *predicatesPlugin) OnSessionOpen(ssn *framework.Session) {
plugin, _ = podtopologyspread.New(ptsArgs, handle, features)
podTopologySpreadFilter := plugin.(*podtopologyspread.PodTopologySpread)

state := k8sframework.NewCycleState()

ssn.AddPrePredicateFn(pp.Name(), func(task *api.TaskInfo) error {
// Check NodePorts
nodePortFilter.PreFilter(context.TODO(), state, task.Pod)

// InterPodAffinity Predicate
// TODO: Update the node information to be processed by the filer based on the node list returned by the prefilter.
// In K8S V1.25, the return value result is added to the Prefile interface,
// indicating the list of nodes that meet filtering conditions.
// If the value of result is nil, all nodes meet the conditions.
// If the specified node information exists, only the node information in result meets the conditions.
// The value of Prefile in the current InterPodAffinity package always returns nil.
// The outer layer does not need to be processed temporarily.
// If the filtering logic is added to the Prefile node in the Volumebinding package in the future,
// the processing logic needs to be added to the return value result.
_, status := podAffinityFilter.PreFilter(context.TODO(), state, task.Pod)
if !status.IsSuccess() {
return fmt.Errorf("plugin %s pre-predicates failed %s", interpodaffinity.Name, status.Message())
}

// Check PodTopologySpread
// TODO: Update the node information to be processed by the filer based on the node list returned by the prefilter.
// In K8S V1.25, the return value result is added to the Prefile interface,
// indicating the list of nodes that meet filtering conditions.
// If the value of result is nil, all nodes meet the conditions.
// If the specified node information exists, only the node information in result meets the conditions.
// The value of Prefile in the current PodTopologySpread package always returns nil.
// The outer layer does not need to be processed temporarily.
// If the filtering logic is added to the Prefile node in the Volumebinding package in the future,
// the processing logic needs to be added to the return value result.
_, status = podTopologySpreadFilter.PreFilter(context.TODO(), state, task.Pod)
if !status.IsSuccess() {
return fmt.Errorf("plugin %s pre-predicates failed %s", podTopologySpreadFilter.Name(), status.Message())
}
return nil
})

ssn.AddPredicateFn(pp.Name(), func(task *api.TaskInfo, node *api.NodeInfo) error {
nodeInfo, found := nodeMap[node.Name]
if !found {
Expand All @@ -358,7 +396,6 @@ func (pp *predicatesPlugin) OnSessionOpen(ssn *framework.Session) {
return api.NewFitError(task, node, api.NodePodNumberExceeded)
}

state := k8sframework.NewCycleState()
predicateByStablefilter := func(pod *v1.Pod, nodeInfo *k8sframework.NodeInfo) (bool, error) {
// CheckNodeUnschedulable
status := nodeUnscheduleFilter.Filter(context.TODO(), state, task.Pod, nodeInfo)
Expand Down Expand Up @@ -402,26 +439,9 @@ func (pp *predicatesPlugin) OnSessionOpen(ssn *framework.Session) {
return err
}

// Check NodePorts
nodePortFilter.PreFilter(context.TODO(), state, task.Pod)
status := nodePortFilter.Filter(context.TODO(), state, nil, nodeInfo)
if !status.IsSuccess() {
return fmt.Errorf("plugin %s predicates failed %s", nodeaffinity.Name, status.Message())
}

// InterPodAffinity Predicate
// TODO: Update the node information to be processed by the filer based on the node list returned by the prefilter.
// In K8S V1.25, the return value result is added to the Prefile interface,
// indicating the list of nodes that meet filtering conditions.
// If the value of result is nil, all nodes meet the conditions.
// If the specified node information exists, only the node information in result meets the conditions.
// The value of Prefile in the current InterPodAffinity package always returns nil.
// The outer layer does not need to be processed temporarily.
// If the filtering logic is added to the Prefile node in the Volumebinding package in the future,
// the processing logic needs to be added to the return value result.
_, status = podAffinityFilter.PreFilter(context.TODO(), state, task.Pod)
if !status.IsSuccess() {
return fmt.Errorf("plugin %s pre-predicates failed %s", interpodaffinity.Name, status.Message())
return fmt.Errorf("plugin %s predicates failed %s", nodeports.Name, status.Message())
}

status = podAffinityFilter.Filter(context.TODO(), state, task.Pod, nodeInfo)
Expand All @@ -441,20 +461,6 @@ func (pp *predicatesPlugin) OnSessionOpen(ssn *framework.Session) {
return fmt.Errorf("plugin %s predicates failed %s", volumeZoneFilter.Name(), status.Message())
}

// Check PodTopologySpread
// TODO: Update the node information to be processed by the filer based on the node list returned by the prefilter.
// In K8S V1.25, the return value result is added to the Prefile interface,
// indicating the list of nodes that meet filtering conditions.
// If the value of result is nil, all nodes meet the conditions.
// If the specified node information exists, only the node information in result meets the conditions.
// The value of Prefile in the current PodTopologySpread package always returns nil.
// The outer layer does not need to be processed temporarily.
// If the filtering logic is added to the Prefile node in the Volumebinding package in the future,
// the processing logic needs to be added to the return value result.
_, status = podTopologySpreadFilter.PreFilter(context.TODO(), state, task.Pod)
if !status.IsSuccess() {
return fmt.Errorf("plugin %s pre-predicates failed %s", podTopologySpreadFilter.Name(), status.Message())
}
status = podTopologySpreadFilter.Filter(context.TODO(), state, task.Pod, nodeInfo)
if !status.IsSuccess() {
return fmt.Errorf("plugin %s predicates failed %s", podTopologySpreadFilter.Name(), status.Message())
Expand Down