filter out those nodes that are not helpful for preemption
Signed-off-by: lowang-bh <[email protected]>
lowang-bh committed May 23, 2024
1 parent ed9c93a commit 4c267ee
Showing 7 changed files with 204 additions and 17 deletions.
2 changes: 1 addition & 1 deletion pkg/scheduler/actions/allocate/allocate.go
@@ -301,7 +301,7 @@ func (alloc *Action) predicate(task *api.TaskInfo, node *api.NodeInfo) ([]*api.S

if statusSets.ContainsUnschedulable() || statusSets.ContainsUnschedulableAndUnresolvable() ||
statusSets.ContainsErrorSkipOrWait() {
-return nil, api.NewFitStatus(task, node, statusSets...)
+return nil, api.NewFitErrWithStatus(task, node, statusSets...)
}
return nil, nil
}
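Read as ordinary Go, the gate above amounts to the following sketch. It is a paraphrase for illustration only, assuming nothing beyond the api helpers visible in this hunk; rejectForAllocate is a made-up name, not part of the change:

package sketch

import "volcano.sh/volcano/pkg/scheduler/api"

// rejectForAllocate mirrors the allocate-action gate: any Unschedulable,
// UnschedulableAndUnresolvable, Error, Skip, or Wait status on the node turns
// the predicate result into a FitError that carries the full status list.
func rejectForAllocate(task *api.TaskInfo, node *api.NodeInfo, statusSets api.StatusSets) error {
	if statusSets.ContainsUnschedulable() ||
		statusSets.ContainsUnschedulableAndUnresolvable() ||
		statusSets.ContainsErrorSkipOrWait() {
		return api.NewFitErrWithStatus(task, node, statusSets...)
	}
	return nil
}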
8 changes: 4 additions & 4 deletions pkg/scheduler/actions/preempt/preempt.go
@@ -203,8 +203,7 @@ func preempt(
predicateHelper util.PredicateHelper,
) (bool, error) {
assigned := false
-allNodes := ssn.NodeList
-// TODO: we should filter out those nodes that are UnschedulableAndUnresolvable status got in allocate action

if err := ssn.PrePredicateFn(preemptor); err != nil {
return false, fmt.Errorf("PrePredicate for task %s/%s failed for: %v", preemptor.Namespace, preemptor.Name, err)
}
@@ -216,11 +215,12 @@
// When filtering candidate nodes, need to consider the node statusSets instead of the err information.
// refer to kube-scheduler preemption code: https://github.com/kubernetes/kubernetes/blob/9d87fa215d9e8020abdc17132d1252536cd752d2/pkg/scheduler/framework/preemption/preemption.go#L422
if statusSets.ContainsUnschedulableAndUnresolvable() || statusSets.ContainsErrorSkipOrWait() {
-return nil, api.NewFitStatus(task, node, statusSets...)
+return nil, api.NewFitErrWithStatus(task, node, statusSets...)
}
return nil, nil
}

+// filter out the nodes that got an UnschedulableAndUnresolvable status in the allocate action
+allNodes := ssn.NodesWherePreemptionMightHelp(preemptor)
predicateNodes, _ := predicateHelper.PredicateNodes(preemptor, allNodes, predicateFn, true)

nodeScores := util.PrioritizeNodes(preemptor, predicateNodes, ssn.BatchNodeOrderFn, ssn.NodeOrderMapFn, ssn.NodeOrderReduceFn)
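Note the asymmetry with allocate: preemption keeps plain Unschedulable nodes as candidates, because evicting victims may make them schedulable again, and only drops nodes whose failure eviction cannot resolve; the reclaim hunk below applies the same rule. A minimal sketch of that gate, under the same assumptions as the sketch above:

package sketch

import "volcano.sh/volcano/pkg/scheduler/api"

// rejectForPreempt mirrors the preempt/reclaim gate: plain Unschedulable is
// tolerated, while UnschedulableAndUnresolvable and Error/Skip/Wait statuses
// rule the node out even for preemption.
func rejectForPreempt(statusSets api.StatusSets) bool {
	return statusSets.ContainsUnschedulableAndUnresolvable() ||
		statusSets.ContainsErrorSkipOrWait()
}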
5 changes: 3 additions & 2 deletions pkg/scheduler/actions/reclaim/reclaim.go
@@ -126,8 +126,9 @@ func (ra *Action) Execute(ssn *framework.Session) {
}

assigned := false
-// TODO: we should filter out those nodes that are UnschedulableAndUnresolvable status got in allocate action
-for _, n := range ssn.Nodes {
+// filter out the nodes that got an UnschedulableAndUnresolvable status in the allocate action
+totalNodes := ssn.NodesWherePreemptionMightHelp(task)
+for _, n := range totalNodes {
var statusSets api.StatusSets
statusSets, _ = ssn.PredicateFn(task, n)

17 changes: 14 additions & 3 deletions pkg/scheduler/api/unschedule_info.go
@@ -63,6 +63,17 @@ func (f *FitErrors) SetNodeError(nodeName string, err error) {
f.nodes[nodeName] = fe
}

+// NotHelpfulForPreemptNodes returns the set of nodes where preemption cannot
+// help the task, i.e. those whose fit status contains UnschedulableAndUnresolvable.
+func (f *FitErrors) NotHelpfulForPreemptNodes() map[string]struct{} {
+ret := make(map[string]struct{})
+for _, node := range f.nodes {
+if node.Status.ContainsUnschedulableAndUnresolvable() {
+ret[node.NodeName] = struct{}{}
+}
+}
+return ret
+}

// Error returns the final error message
func (f *FitErrors) Error() string {
if f.err == "" {
@@ -99,7 +110,7 @@ type FitError struct {
Status StatusSets
}

-// NewFitError return FitError by message
+// NewFitError returns a FitError built from plain messages, setting the default status code to Error
func NewFitError(task *TaskInfo, node *NodeInfo, message ...string) *FitError {
fe := &FitError{
taskName: task.Name,
@@ -114,8 +125,8 @@ func NewFitError(task *TaskInfo, node *NodeInfo, message ...string) *FitError {
return fe
}

-// NewFitStatus returns a fit error with code and reason in it
-func NewFitStatus(task *TaskInfo, node *NodeInfo, sts ...*Status) *FitError {
+// NewFitErrWithStatus returns a fit error carrying the status codes and reasons
+func NewFitErrWithStatus(task *TaskInfo, node *NodeInfo, sts ...*Status) *FitError {
fe := &FitError{
taskName: task.Name,
taskNamespace: task.Namespace,
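A hypothetical walk-through of the new helper, using only the FitErrors API from this file; the node names and reasons are illustrative, and api.Unschedulable is assumed to be the plain-unschedulable status code:

package sketch

import "volcano.sh/volcano/pkg/scheduler/api"

// notHelpful records one resolvable and one unresolvable failure, then asks
// which nodes preemption cannot help; only n2's name should come back.
func notHelpful(task *api.TaskInfo, n1, n2 *api.NodeInfo) map[string]struct{} {
	fitErrs := api.NewFitErrors()
	// n1 failed with a plain Unschedulable code: preemption might still help.
	fitErrs.SetNodeError(n1.Name, api.NewFitErrWithStatus(task, n1,
		&api.Status{Reason: "insufficient cpu", Code: api.Unschedulable}))
	// n2 failed with UnschedulableAndUnresolvable: preemption cannot help.
	fitErrs.SetNodeError(n2.Name, api.NewFitErrWithStatus(task, n2,
		&api.Status{Reason: "node(s) didn't match Pod's node selector", Code: api.UnschedulableAndUnresolvable}))
	return fitErrs.NotHelpfulForPreemptNodes() // => {n2.Name: {}}
}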
25 changes: 18 additions & 7 deletions pkg/scheduler/api/unschedule_info_test.go
@@ -32,16 +32,19 @@ func TestFitError(t *testing.T) {
tests := []struct {
task *TaskInfo
node *NodeInfo
-reason []string
status []*Status
-want *FitError
+// the wanted reasons extracted from the fitError
+reason []string
+// the wanted fitError
+wantErr *FitError
+// the expected string form of the fitError
+errStr string
}{
{
task: &TaskInfo{Name: "pod1", Namespace: "ns1"},
node: &NodeInfo{Name: "node1"},
reason: []string{affinityRulesNotMatch, nodeAffinity},
-want: &FitError{
+wantErr: &FitError{
NodeName: "node1", taskNamespace: "ns1", taskName: "pod1",
Status: []*Status{{Reason: affinityRulesNotMatch, Code: Error}, {Reason: nodeAffinity, Code: Error}},
},
@@ -52,7 +55,7 @@
node: &NodeInfo{Name: "node2"},
status: []*Status{{Reason: nodeAffinity, Code: UnschedulableAndUnresolvable}, {Reason: existingAntiAffinityNotMatch, Code: Error}},
reason: []string{nodeAffinity, existingAntiAffinityNotMatch},
-want: &FitError{
+wantErr: &FitError{
NodeName: "node2", taskNamespace: "ns2", taskName: "pod2",
Status: []*Status{{Reason: nodeAffinity, Code: UnschedulableAndUnresolvable}, {Reason: existingAntiAffinityNotMatch, Code: Error}},
},
@@ -63,12 +66,12 @@
var got *FitError
for _, test := range tests {
if len(test.status) != 0 {
-got = NewFitStatus(test.task, test.node, test.status...)
+got = NewFitErrWithStatus(test.task, test.node, test.status...)
} else if len(test.reason) != 0 {
got = NewFitError(test.task, test.node, test.reason...)
}

-assert.Equal(t, test.want, got)
+assert.Equal(t, test.wantErr, got)
assert.Equal(t, test.reason, got.Reasons())
assert.Equal(t, test.errStr, got.Error())
}
@@ -81,15 +84,20 @@ func TestFitErrors(t *testing.T) {
err error
fiterr *FitError
want string // expected error string
+// nodes that preemption cannot help, i.e. whose status code is UnschedulableAndUnresolvable
+filterNodes map[string]struct{}
}{
{
want: "0/0 nodes are unavailable", // base fit err string is empty, set as the default
want: "0/0 nodes are unavailable", // base fit err string is empty, set as the default
filterNodes: map[string]struct{}{},
},
{
node: "node1",
fitStr: "fit failed",
err: fmt.Errorf(NodePodNumberExceeded),
want: "fit failed: 1 node(s) pod number exceeded.",
+// no node has UnschedulableAndUnresolvable
+filterNodes: map[string]struct{}{},
},
{
node: "node1",
@@ -100,6 +108,8 @@
Status: []*Status{{Reason: nodeAffinity, Code: UnschedulableAndUnresolvable}},
},
want: "NodeResourceFitFailed: 1 node(s) didn't match Pod's node affinity/selector, 1 node(s) pod number exceeded.",
+// only node2 has UnschedulableAndUnresolvable
+filterNodes: map[string]struct{}{"node2": {}},
},
}
for _, test := range tests {
@@ -113,5 +123,6 @@
}
got := fitErrs.Error()
assert.Equal(t, test.want, got)
+assert.Equal(t, test.filterNodes, fitErrs.NotHelpfulForPreemptNodes())
}
}
26 changes: 26 additions & 0 deletions pkg/scheduler/framework/session_plugins.go
@@ -626,6 +626,32 @@ func (ssn *Session) TaskOrderFn(l, r interface{}) bool {
return helpers.CompareTask(lv, rv)
}

+// NodesWherePreemptionMightHelp filters out the nodes whose recorded fit
+// status is UnschedulableAndUnresolvable, since preempting pods there cannot
+// help the task; every other node is kept as a preemption candidate.
+func (ssn *Session) NodesWherePreemptionMightHelp(task *api.TaskInfo) []*api.NodeInfo {
+fitErrors, ok1 := ssn.Jobs[task.Job]
+if !ok1 {
+return ssn.NodeList
+}
+fitErr, ok2 := fitErrors.NodesFitErrors[task.UID]
+if !ok2 {
+return ssn.NodeList
+}
+
+skipNodes := fitErr.NotHelpfulForPreemptNodes()
+if len(skipNodes) == 0 {
+return ssn.NodeList
+}
+
+ret := make([]*api.NodeInfo, 0, len(ssn.Nodes))
+for _, node := range ssn.Nodes {
+if _, ok := skipNodes[node.Name]; !ok {
+ret = append(ret, node)
+}
+}
+
+return ret
+}

// PredicateFn invoke predicate function of the plugins
func (ssn *Session) PredicateFn(task *api.TaskInfo, node *api.NodeInfo) ([]*api.Status, error) {
predicateStatus := make([]*api.Status, 0)
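From the calling side, the preempt action's candidate list behaves as in this hedged sketch; candidateNodeNames is a made-up helper for illustration. If the job recorded no per-node FitErrors for the task during allocate, the full ssn.NodeList comes back unchanged, so existing behaviour is preserved:

package sketch

import (
	"volcano.sh/volcano/pkg/scheduler/api"
	"volcano.sh/volcano/pkg/scheduler/framework"
)

// candidateNodeNames lists the nodes the preempt action still considers for
// the task after dropping those marked UnschedulableAndUnresolvable.
func candidateNodeNames(ssn *framework.Session, task *api.TaskInfo) []string {
	names := make([]string, 0)
	for _, node := range ssn.NodesWherePreemptionMightHelp(task) {
		names = append(names, node.Name)
	}
	return names
}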
138 changes: 138 additions & 0 deletions pkg/scheduler/framework/session_plugins_test.go
@@ -0,0 +1,138 @@
/*
Copyright 2024 The Volcano Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package framework

import (
"fmt"
"testing"

"github.com/stretchr/testify/assert"
v1 "k8s.io/api/core/v1"

schedulingv1 "volcano.sh/apis/pkg/apis/scheduling/v1beta1"
"volcano.sh/volcano/pkg/scheduler/api"
"volcano.sh/volcano/pkg/scheduler/cache"
"volcano.sh/volcano/pkg/scheduler/util"
)

func newFitErr(taskName, nodeName string, sts ...*api.Status) *api.FitError {
return api.NewFitErrWithStatus(&api.TaskInfo{Name: taskName}, &api.NodeInfo{Name: nodeName}, sts...)
}

func TestFilterOutPreemptMayNotHelpNodes(t *testing.T) {
tests := []struct {
Name string
PodGroups []*schedulingv1.PodGroup
Pods []*v1.Pod
Nodes []*v1.Node
Queues []*schedulingv1.Queue
status map[api.TaskID]*api.FitError
want map[api.TaskID][]string // per-task list of node names where preemption might help
}{
{
Name: "all are helpful for preemption",
PodGroups: []*schedulingv1.PodGroup{util.BuildPodGroup("pg1", "c1", "c1", 1, nil, schedulingv1.PodGroupInqueue)},
Pods: []*v1.Pod{
util.BuildPod("c1", "p1", "", v1.PodPending, api.BuildResourceList("2", "1G"), "pg1", map[string]string{"volcano.sh/task-spec": "master"}, nil),
util.BuildPod("c1", "p2", "", v1.PodPending, api.BuildResourceList("2", "1G"), "pg1", map[string]string{"volcano.sh/task-spec": "worker"}, nil),
},
Nodes: []*v1.Node{
util.BuildNode("n1", api.BuildResourceList("2", "4Gi", []api.ScalarResource{{Name: "pods", Value: "10"}}...), map[string]string{"nodeRole": "worker"}),
util.BuildNode("n2", api.BuildResourceList("2", "4Gi", []api.ScalarResource{{Name: "pods", Value: "10"}}...), map[string]string{"nodeRole": "worker"}),
},
Queues: []*schedulingv1.Queue{util.BuildQueue("c1", 1, nil)},
status: map[api.TaskID]*api.FitError{},
want: map[api.TaskID][]string{"c1-p2": {"n1", "n2"}, "c1-p1": {"n1", "n2"}},
},
{
Name: "master predicate failed: node selector does not match",
PodGroups: []*schedulingv1.PodGroup{util.BuildPodGroup("pg1", "c1", "c1", 1, nil, schedulingv1.PodGroupInqueue)},
Pods: []*v1.Pod{
util.BuildPod("c1", "p1", "", v1.PodPending, api.BuildResourceList("2", "1G"), "pg1", map[string]string{"volcano.sh/task-spec": "master"}, map[string]string{"nodeRole": "master"}),
util.BuildPod("c1", "p2", "", v1.PodPending, api.BuildResourceList("2", "1G"), "pg1", map[string]string{"volcano.sh/task-spec": "worker"}, map[string]string{"nodeRole": "worker"}),
},
Nodes: []*v1.Node{util.BuildNode("n1", api.BuildResourceList("2", "4Gi", []api.ScalarResource{{Name: "pods", Value: "10"}}...), map[string]string{"nodeRole": "worker"})},
Queues: []*schedulingv1.Queue{util.BuildQueue("c1", 1, nil)},
status: map[api.TaskID]*api.FitError{"c1-p1": newFitErr("c1-p1", "n1", &api.Status{Reason: "node(s) didn't match Pod's node selector", Code: api.UnschedulableAndUnresolvable})},
want: map[api.TaskID][]string{"c1-p2": {"n1"}, "c1-p1": {}},
},
{
Name: "p1,p3 has node fit error",
PodGroups: []*schedulingv1.PodGroup{util.BuildPodGroup("pg1", "c1", "c1", 2, map[string]int32{"master": 1, "worker": 1}, schedulingv1.PodGroupInqueue)},
Pods: []*v1.Pod{
util.BuildPod("c1", "p0", "", v1.PodPending, api.BuildResourceList("1", "1G"), "pg1", map[string]string{"volcano.sh/task-spec": "master"}, map[string]string{"nodeRole": "master"}),
util.BuildPod("c1", "p1", "", v1.PodPending, api.BuildResourceList("1", "1G"), "pg1", map[string]string{"volcano.sh/task-spec": "master"}, map[string]string{"nodeRole": "master"}),
util.BuildPod("c1", "p2", "", v1.PodPending, api.BuildResourceList("1", "1G"), "pg1", map[string]string{"volcano.sh/task-spec": "worker"}, map[string]string{"nodeRole": "worker"}),
util.BuildPod("c1", "p3", "", v1.PodPending, api.BuildResourceList("1", "1G"), "pg1", map[string]string{"volcano.sh/task-spec": "worker"}, map[string]string{"nodeRole": "worker"}),
},
Nodes: []*v1.Node{
util.BuildNode("n1", api.BuildResourceList("1", "2Gi", []api.ScalarResource{{Name: "pods", Value: "10"}}...), map[string]string{"nodeRole": "master"}),
util.BuildNode("n2", api.BuildResourceList("1", "2Gi", []api.ScalarResource{{Name: "pods", Value: "10"}}...), map[string]string{"nodeRole": "worker"}),
},
Queues: []*schedulingv1.Queue{util.BuildQueue("c1", 1, nil)},
status: map[api.TaskID]*api.FitError{
"c1-p1": newFitErr("c1-p1", "n2", &api.Status{Reason: "node(s) didn't match Pod's node selector", Code: api.UnschedulableAndUnresolvable}),
"c1-p3": newFitErr("c1-p3", "n1", &api.Status{Reason: "node(s) didn't match Pod's node selector", Code: api.UnschedulableAndUnresolvable}),
},
// nodes that are helpful for preemption
want: map[api.TaskID][]string{
"c1-p0": {"n1", "n2"},
"c1-p1": {"n1"},
"c1-p2": {"n1", "n2"},
"c1-p3": {"n2"},
},
},
}

for i, test := range tests {
t.Run(test.Name, func(t *testing.T) {
scherCache := cache.NewDefaultMockSchedulerCache("test-scheduler")
for _, node := range test.Nodes {
scherCache.AddOrUpdateNode(node)
}
for _, pod := range test.Pods {
scherCache.AddPod(pod)
}
for _, pg := range test.PodGroups {
scherCache.AddPodGroupV1beta1(pg)
}
for _, queue := range test.Queues {
scherCache.AddQueueV1beta1(queue)
}
ssn := OpenSession(scherCache, nil, nil)
defer CloseSession(ssn)
for _, job := range ssn.Jobs {
for _, task := range job.TaskStatusIndex[api.Pending] {
if fitErr, exist := test.status[task.UID]; exist {
fe := api.NewFitErrors()
fe.SetNodeError(fitErr.NodeName, fitErr)
job.NodesFitErrors[task.UID] = fe
}

// check potential nodes
potentialNodes := ssn.NodesWherePreemptionMightHelp(task)
want := test.want[task.UID]
got := make([]string, 0, len(potentialNodes))
for _, node := range potentialNodes {
got = append(got, node.Name)
}
assert.Equal(t, want, got, fmt.Sprintf("case %d: task %s", i, task.UID))
}
}
})
}
}
