Skip to content

Commit

Permalink
filter out nodes that are not helpful for preemption
Browse files Browse the repository at this point in the history
Signed-off-by: lowang-bh <[email protected]>
  • Loading branch information
lowang-bh committed May 18, 2024
1 parent ed9c93a commit 241d974
Show file tree
Hide file tree
Showing 6 changed files with 201 additions and 11 deletions.
6 changes: 3 additions & 3 deletions pkg/scheduler/actions/preempt/preempt.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,8 +203,7 @@ func preempt(
predicateHelper util.PredicateHelper,
) (bool, error) {
assigned := false
allNodes := ssn.NodeList
// TODO: we should filter out those nodes that are UnschedulableAndUnresolvable status got in allocate action

if err := ssn.PrePredicateFn(preemptor); err != nil {
return false, fmt.Errorf("PrePredicate for task %s/%s failed for: %v", preemptor.Namespace, preemptor.Name, err)
}
Expand All @@ -220,7 +219,8 @@ func preempt(
}
return nil, nil
}

// Skip nodes that were marked UnschedulableAndUnresolvable for this task in the allocate action; preempting on them cannot help.
allNodes := ssn.NodesWherePreemptionMightHelp(preemptor)
predicateNodes, _ := predicateHelper.PredicateNodes(preemptor, allNodes, predicateFn, true)

nodeScores := util.PrioritizeNodes(preemptor, predicateNodes, ssn.BatchNodeOrderFn, ssn.NodeOrderMapFn, ssn.NodeOrderReduceFn)
Expand Down
5 changes: 3 additions & 2 deletions pkg/scheduler/actions/reclaim/reclaim.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,8 +126,9 @@ func (ra *Action) Execute(ssn *framework.Session) {
}

assigned := false
// TODO: we should filter out those nodes that are UnschedulableAndUnresolvable status got in allocate action
for _, n := range ssn.Nodes {
// Skip nodes that were marked UnschedulableAndUnresolvable for this task in the allocate action; reclaiming on them cannot help.
totalNodes := ssn.NodesWherePreemptionMightHelp(task)
for _, n := range totalNodes {
var statusSets api.StatusSets
statusSets, _ = ssn.PredicateFn(task, n)

Expand Down
11 changes: 11 additions & 0 deletions pkg/scheduler/api/unschedule_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,17 @@ func (f *FitErrors) SetNodeError(nodeName string, err error) {
f.nodes[nodeName] = fe
}

// NotHelpfulForPreemptNodes collects the names of the nodes whose recorded fit
// error contains an UnschedulableAndUnresolvable status. Preempting pods from
// such nodes does not help, so the result can be used as a skip set.
// The returned map is never nil; it is empty when no node qualifies.
func (f *FitErrors) NotHelpfulForPreemptNodes() map[string]struct{} {
	unresolvable := map[string]struct{}{}
	for _, fitErr := range f.nodes {
		// Nodes without an unresolvable status may still benefit from preemption.
		if !fitErr.Status.ContainsUnschedulableAndUnresolvable() {
			continue
		}
		unresolvable[fitErr.NodeName] = struct{}{}
	}
	return unresolvable
}

// Error returns the final error message
func (f *FitErrors) Error() string {
if f.err == "" {
Expand Down
23 changes: 17 additions & 6 deletions pkg/scheduler/api/unschedule_info_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,16 +32,19 @@ func TestFitError(t *testing.T) {
tests := []struct {
task *TaskInfo
node *NodeInfo
reason []string
status []*Status
want *FitError
// the wanted reason from fitError
reason []string
// the wanted fitError
wantErr *FitError
// string of fitError
errStr string
}{
{
task: &TaskInfo{Name: "pod1", Namespace: "ns1"},
node: &NodeInfo{Name: "node1"},
reason: []string{affinityRulesNotMatch, nodeAffinity},
want: &FitError{
wantErr: &FitError{
NodeName: "node1", taskNamespace: "ns1", taskName: "pod1",
Status: []*Status{{Reason: affinityRulesNotMatch, Code: Error}, {Reason: nodeAffinity, Code: Error}},
},
Expand All @@ -52,7 +55,7 @@ func TestFitError(t *testing.T) {
node: &NodeInfo{Name: "node2"},
status: []*Status{{Reason: nodeAffinity, Code: UnschedulableAndUnresolvable}, {Reason: existingAntiAffinityNotMatch, Code: Error}},
reason: []string{nodeAffinity, existingAntiAffinityNotMatch},
want: &FitError{
wantErr: &FitError{
NodeName: "node2", taskNamespace: "ns2", taskName: "pod2",
Status: []*Status{{Reason: nodeAffinity, Code: UnschedulableAndUnresolvable}, {Reason: existingAntiAffinityNotMatch, Code: Error}},
},
Expand All @@ -68,7 +71,7 @@ func TestFitError(t *testing.T) {
got = NewFitError(test.task, test.node, test.reason...)
}

assert.Equal(t, test.want, got)
assert.Equal(t, test.wantErr, got)
assert.Equal(t, test.reason, got.Reasons())
assert.Equal(t, test.errStr, got.Error())
}
Expand All @@ -81,15 +84,20 @@ func TestFitErrors(t *testing.T) {
err error
fiterr *FitError
want string // expected error string
// nodes that are not helpful for preempting, which has a code of UnschedulableAndUnresolvable
filterNodes map[string]struct{}
}{
{
want: "0/0 nodes are unavailable", // base fit err string is empty, set as the default
want: "0/0 nodes are unavailable", // base fit err string is empty, set as the default
filterNodes: map[string]struct{}{},
},
{
node: "node1",
fitStr: "fit failed",
err: fmt.Errorf(NodePodNumberExceeded),
want: "fit failed: 1 node(s) pod number exceeded.",
// no node has UnschedulableAndUnresolvable
filterNodes: map[string]struct{}{},
},
{
node: "node1",
Expand All @@ -100,6 +108,8 @@ func TestFitErrors(t *testing.T) {
Status: []*Status{{Reason: nodeAffinity, Code: UnschedulableAndUnresolvable}},
},
want: "NodeResourceFitFailed: 1 node(s) didn't match Pod's node affinity/selector, 1 node(s) pod number exceeded.",
// only node2 has UnschedulableAndUnresolvable
filterNodes: map[string]struct{}{"node2": {}},
},
}
for _, test := range tests {
Expand All @@ -113,5 +123,6 @@ func TestFitErrors(t *testing.T) {
}
got := fitErrs.Error()
assert.Equal(t, test.want, got)
assert.Equal(t, test.filterNodes, fitErrs.NotHelpfulForPreemptNodes())
}
}
26 changes: 26 additions & 0 deletions pkg/scheduler/framework/session_plugins.go
Original file line number Diff line number Diff line change
Expand Up @@ -626,6 +626,32 @@ func (ssn *Session) TaskOrderFn(l, r interface{}) bool {
return helpers.CompareTask(lv, rv)
}

// NodesWherePreemptionMightHelp returns the nodes on which preemption could
// still help the given task, i.e. every node of the session except those that
// were recorded for this task with an UnschedulableAndUnresolvable status.
//
// ssn.NodeList is returned unchanged when the task's job is not part of the
// session, when no fit errors were recorded for the task, or when none of the
// recorded errors is unresolvable. Otherwise a new slice is built by filtering
// ssn.NodeList, so the result keeps the order of the unfiltered list instead of
// depending on map iteration order.
func (ssn *Session) NodesWherePreemptionMightHelp(task *api.TaskInfo) []*api.NodeInfo {
	job, found := ssn.Jobs[task.Job]
	if !found || job == nil {
		return ssn.NodeList
	}
	fitErrs, found := job.NodesFitErrors[task.UID]
	if !found || fitErrs == nil {
		return ssn.NodeList
	}

	skipNodes := fitErrs.NotHelpfulForPreemptNodes()
	if len(skipNodes) == 0 {
		return ssn.NodeList
	}

	// Filter the same ordered list that the unfiltered paths return, so that
	// callers see a deterministic and consistent node order.
	ret := make([]*api.NodeInfo, 0, len(ssn.NodeList))
	for _, node := range ssn.NodeList {
		if _, skip := skipNodes[node.Name]; !skip {
			ret = append(ret, node)
		}
	}

	return ret
}

// PredicateFn invoke predicate function of the plugins
func (ssn *Session) PredicateFn(task *api.TaskInfo, node *api.NodeInfo) ([]*api.Status, error) {
predicateStatus := make([]*api.Status, 0)
Expand Down
141 changes: 141 additions & 0 deletions pkg/scheduler/framework/session_plugins_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
/*
Copyright 2024 The Volcano Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package framework

import (
"fmt"
"testing"

"github.com/stretchr/testify/assert"
v1 "k8s.io/api/core/v1"

schedulingv1 "volcano.sh/apis/pkg/apis/scheduling/v1beta1"
"volcano.sh/volcano/pkg/scheduler/api"
"volcano.sh/volcano/pkg/scheduler/cache"
"volcano.sh/volcano/pkg/scheduler/util"
)

// newFitErr is a test helper that builds an api.FitError for a task and a node
// identified only by their names, carrying the given statuses.
func newFitErr(taskName, nodeName string, sts ...*api.Status) *api.FitError {
	task := &api.TaskInfo{Name: taskName}
	node := &api.NodeInfo{Name: nodeName}
	return api.NewFitStatus(task, node, sts...)
}

// TestFilterOutPreemptMayNotHelpNodes checks Session.NodesWherePreemptionMightHelp.
// Each case loads nodes, pods, pod groups and queues into a mock scheduler cache,
// opens a session on it, injects the configured fit error for a pending task and
// then compares the names of the returned nodes with the expected list.
func TestFilterOutPreemptMayNotHelpNodes(t *testing.T) {
tests := []struct {
// Name is used as the subtest name.
Name string
// PodGroups, Pods, Nodes and Queues are the objects added to the mock cache.
PodGroups []*schedulingv1.PodGroup
Pods []*v1.Pod
Nodes []*v1.Node
Queues []*schedulingv1.Queue
// status is the fit error to record for a task (keyed by task UID) before querying the session.
status map[api.TaskID]*api.FitError
want map[api.TaskID][]string // task's nodes name list which is helpful for preemption
}{
{
Name: "all are helpful for preemption",
PodGroups: []*schedulingv1.PodGroup{util.BuildPodGroup("pg1", "c1", "c1", 1, nil, schedulingv1.PodGroupInqueue)},
Pods: []*v1.Pod{
util.BuildPod("c1", "p1", "", v1.PodPending, api.BuildResourceList("2", "1G"), "pg1", map[string]string{"volcano.sh/task-spec": "master"}, nil),
util.BuildPod("c1", "p2", "", v1.PodPending, api.BuildResourceList("2", "1G"), "pg1", map[string]string{"volcano.sh/task-spec": "worker"}, nil),
},
Nodes: []*v1.Node{
util.BuildNode("n1", api.BuildResourceList("2", "4Gi", []api.ScalarResource{{Name: "pods", Value: "10"}}...), map[string]string{"nodeRole": "worker"}),
util.BuildNode("n2", api.BuildResourceList("2", "4Gi", []api.ScalarResource{{Name: "pods", Value: "10"}}...), map[string]string{"nodeRole": "worker"}),
},
Queues: []*schedulingv1.Queue{util.BuildQueue("c1", 1, nil)},
status: map[api.TaskID]*api.FitError{},
want: map[api.TaskID][]string{"c1-p2": {"n1", "n2"}, "c1-p1": {"n1", "n2"}},
},
{
Name: "master predicate failed: node selector does not match",
PodGroups: []*schedulingv1.PodGroup{util.BuildPodGroup("pg1", "c1", "c1", 1, nil, schedulingv1.PodGroupInqueue)},
Pods: []*v1.Pod{
util.BuildPod("c1", "p1", "", v1.PodPending, api.BuildResourceList("2", "1G"), "pg1", map[string]string{"volcano.sh/task-spec": "master"}, map[string]string{"nodeRole": "master"}),
util.BuildPod("c1", "p2", "", v1.PodPending, api.BuildResourceList("2", "1G"), "pg1", map[string]string{"volcano.sh/task-spec": "worker"}, map[string]string{"nodeRole": "worker"}),
},
Nodes: []*v1.Node{util.BuildNode("n1", api.BuildResourceList("2", "4Gi", []api.ScalarResource{{Name: "pods", Value: "10"}}...), map[string]string{"nodeRole": "worker"})},
Queues: []*schedulingv1.Queue{util.BuildQueue("c1", 1, nil)},
status: map[api.TaskID]*api.FitError{"c1-p1": newFitErr("c1-p1", "n1", &api.Status{Reason: "node(s) didn't match Pod's node selector", Code: api.UnschedulableAndUnresolvable})},
want: map[api.TaskID][]string{"c1-p2": {"n1"}, "c1-p1": {}},
},
{
Name: "p1,p3 has node fit error",
PodGroups: []*schedulingv1.PodGroup{util.BuildPodGroup("pg1", "c1", "c1", 2, map[string]int32{"master": 1, "worker": 1}, schedulingv1.PodGroupInqueue)},
Pods: []*v1.Pod{
util.BuildPod("c1", "p0", "", v1.PodPending, api.BuildResourceList("1", "1G"), "pg1", map[string]string{"volcano.sh/task-spec": "master"}, map[string]string{"nodeRole": "master"}),
util.BuildPod("c1", "p1", "", v1.PodPending, api.BuildResourceList("1", "1G"), "pg1", map[string]string{"volcano.sh/task-spec": "master"}, map[string]string{"nodeRole": "master"}),
util.BuildPod("c1", "p2", "", v1.PodPending, api.BuildResourceList("1", "1G"), "pg1", map[string]string{"volcano.sh/task-spec": "worker"}, map[string]string{"nodeRole": "worker"}),
util.BuildPod("c1", "p3", "", v1.PodPending, api.BuildResourceList("1", "1G"), "pg1", map[string]string{"volcano.sh/task-spec": "worker"}, map[string]string{"nodeRole": "worker"}),
},
Nodes: []*v1.Node{
util.BuildNode("n1", api.BuildResourceList("1", "2Gi", []api.ScalarResource{{Name: "pods", Value: "10"}}...), map[string]string{"nodeRole": "master"}),
util.BuildNode("n2", api.BuildResourceList("1", "2Gi", []api.ScalarResource{{Name: "pods", Value: "10"}}...), map[string]string{"nodeRole": "worker"}),
},
Queues: []*schedulingv1.Queue{util.BuildQueue("c1", 1, nil)},
status: map[api.TaskID]*api.FitError{
"c1-p1": newFitErr("c1-p1", "n2", &api.Status{Reason: "node(s) didn't match Pod's node selector", Code: api.UnschedulableAndUnresolvable}),
"c1-p3": newFitErr("c1-p3", "n1", &api.Status{Reason: "node(s) didn't match Pod's node selector", Code: api.UnschedulableAndUnresolvable}),
},
// nodes on which preemption might still help, per task
want: map[api.TaskID][]string{
"c1-p0": {"n1", "n2"},
"c1-p1": {"n1"},
"c1-p2": {"n1", "n2"},
"c1-p3": {"n2"},
},
},
}

for i, test := range tests {
t.Run(test.Name, func(t *testing.T) {
// Populate a mock scheduler cache with the fixtures of this case.
scherCache := cache.NewDefaultMockSchedulerCache("test-scheduler")
for _, node := range test.Nodes {
scherCache.AddOrUpdateNode(node)
}
for _, pod := range test.Pods {
scherCache.AddPod(pod)
}
for _, pg := range test.PodGroups {
scherCache.AddPodGroupV1beta1(pg)
}
for _, queue := range test.Queues {
scherCache.AddQueueV1beta1(queue)
}
ssn := OpenSession(scherCache, nil, nil)
defer CloseSession(ssn)
for _, job := range ssn.Jobs {
for _, task := range job.TaskStatusIndex[api.Pending] {
// Record the configured fit error (if any) for this task before querying the session.
if fitErr, exist := test.status[task.UID]; exist {
fe := api.NewFitErrors()
fe.SetNodeError(fitErr.NodeName, fitErr)
job.NodesFitErrors[task.UID] = fe
}

// check potential nodes
potentialNodes := ssn.NodesWherePreemptionMightHelp(task)
want := test.want[task.UID]
// A length mismatch aborts the subtest before the names are compared.
if len(potentialNodes) != len(want) {
t.Fatalf("case %d(%s): task %s want potential nodes length: %d, but got %d", i, test.Name, task.UID, len(want), len(potentialNodes))
}
// Compare by node name; the comparison is order-sensitive.
got := make([]string, 0, len(potentialNodes))
for _, node := range potentialNodes {
got = append(got, node.Name)
}
assert.Equal(t, want, got, fmt.Sprintf("case %d: task %s", i, task.UID))
}
}
})
}
}

0 comments on commit 241d974

Please sign in to comment.