From 45eb7a8c33111aad34b96799dbb0367f0d36dd6c Mon Sep 17 00:00:00 2001 From: lowang-bh Date: Sat, 18 May 2024 15:37:12 +0800 Subject: [PATCH] filter out those nodes are not helpful for preemption Signed-off-by: lowang-bh --- pkg/scheduler/actions/allocate/allocate.go | 2 +- pkg/scheduler/actions/preempt/preempt.go | 8 +- pkg/scheduler/actions/reclaim/reclaim.go | 5 +- pkg/scheduler/api/unschedule_info.go | 17 ++- pkg/scheduler/api/unschedule_info_test.go | 25 +++- pkg/scheduler/framework/session_plugins.go | 26 ++++ .../framework/session_plugins_test.go | 138 ++++++++++++++++++ 7 files changed, 204 insertions(+), 17 deletions(-) create mode 100644 pkg/scheduler/framework/session_plugins_test.go diff --git a/pkg/scheduler/actions/allocate/allocate.go b/pkg/scheduler/actions/allocate/allocate.go index 4675c7cfe5f..e9328f50948 100644 --- a/pkg/scheduler/actions/allocate/allocate.go +++ b/pkg/scheduler/actions/allocate/allocate.go @@ -300,7 +300,7 @@ func (alloc *Action) predicate(task *api.TaskInfo, node *api.NodeInfo) ([]*api.S if statusSets.ContainsUnschedulable() || statusSets.ContainsUnschedulableAndUnresolvable() || statusSets.ContainsErrorSkipOrWait() { - return nil, api.NewFitStatus(task, node, statusSets...) + return nil, api.NewFitErrWithStatus(task, node, statusSets...) } return nil, nil } diff --git a/pkg/scheduler/actions/preempt/preempt.go b/pkg/scheduler/actions/preempt/preempt.go index bbcf0a339ef..71da0efd211 100644 --- a/pkg/scheduler/actions/preempt/preempt.go +++ b/pkg/scheduler/actions/preempt/preempt.go @@ -208,8 +208,7 @@ func preempt( predicateHelper util.PredicateHelper, ) (bool, error) { assigned := false - allNodes := ssn.NodeList - // TODO: we should filter out those nodes that are UnschedulableAndUnresolvable status got in allocate action + if err := ssn.PrePredicateFn(preemptor); err != nil { return false, fmt.Errorf("PrePredicate for task %s/%s failed for: %v", preemptor.Namespace, preemptor.Name, err) } @@ -221,11 +220,12 @@ func preempt( // When filtering candidate nodes, need to consider the node statusSets instead of the err information. // refer to kube-scheduler preemption code: https://github.com/kubernetes/kubernetes/blob/9d87fa215d9e8020abdc17132d1252536cd752d2/pkg/scheduler/framework/preemption/preemption.go#L422 if statusSets.ContainsUnschedulableAndUnresolvable() || statusSets.ContainsErrorSkipOrWait() { - return nil, api.NewFitStatus(task, node, statusSets...) + return nil, api.NewFitErrWithStatus(task, node, statusSets...) } return nil, nil } - + // we should filter out those nodes that are UnschedulableAndUnresolvable status got in allocate action + allNodes := ssn.NodesWherePreemptionMightHelp(preemptor) predicateNodes, _ := predicateHelper.PredicateNodes(preemptor, allNodes, predicateFn, true) nodeScores := util.PrioritizeNodes(preemptor, predicateNodes, ssn.BatchNodeOrderFn, ssn.NodeOrderMapFn, ssn.NodeOrderReduceFn) diff --git a/pkg/scheduler/actions/reclaim/reclaim.go b/pkg/scheduler/actions/reclaim/reclaim.go index 1224f69f750..9b4198e4f8b 100644 --- a/pkg/scheduler/actions/reclaim/reclaim.go +++ b/pkg/scheduler/actions/reclaim/reclaim.go @@ -125,8 +125,9 @@ func (ra *Action) Execute(ssn *framework.Session) { } assigned := false - // TODO: we should filter out those nodes that are UnschedulableAndUnresolvable status got in allocate action - for _, n := range ssn.Nodes { + // we should filter out those nodes that are UnschedulableAndUnresolvable status got in allocate action + totalNodes := ssn.NodesWherePreemptionMightHelp(task) + for _, n := range totalNodes { var statusSets api.StatusSets statusSets, _ = ssn.PredicateFn(task, n) diff --git a/pkg/scheduler/api/unschedule_info.go b/pkg/scheduler/api/unschedule_info.go index f7149222a67..674ab4b7caf 100644 --- a/pkg/scheduler/api/unschedule_info.go +++ b/pkg/scheduler/api/unschedule_info.go @@ -63,6 +63,17 @@ func (f *FitErrors) SetNodeError(nodeName string, err error) { f.nodes[nodeName] = fe } +// NotHelpfulForPreemptNodes returns the set of nodes that has no help from preempting pods from it +func (f *FitErrors) NotHelpfulForPreemptNodes() map[string]struct{} { + ret := make(map[string]struct{}) + for _, node := range f.nodes { + if node.Status.ContainsUnschedulableAndUnresolvable() { + ret[node.NodeName] = struct{}{} + } + } + return ret +} + // Error returns the final error message func (f *FitErrors) Error() string { if f.err == "" { @@ -99,7 +110,7 @@ type FitError struct { Status StatusSets } -// NewFitError return FitError by message +// NewFitError return FitError by message, setting default code to Error func NewFitError(task *TaskInfo, node *NodeInfo, message ...string) *FitError { fe := &FitError{ taskName: task.Name, @@ -114,8 +125,8 @@ func NewFitError(task *TaskInfo, node *NodeInfo, message ...string) *FitError { return fe } -// NewFitStatus returns a fit error with code and reason in it -func NewFitStatus(task *TaskInfo, node *NodeInfo, sts ...*Status) *FitError { +// NewFitErrWithStatus returns a fit error with code and reason in it +func NewFitErrWithStatus(task *TaskInfo, node *NodeInfo, sts ...*Status) *FitError { fe := &FitError{ taskName: task.Name, taskNamespace: task.Namespace, diff --git a/pkg/scheduler/api/unschedule_info_test.go b/pkg/scheduler/api/unschedule_info_test.go index 37ab23a4e32..c3dcb1cb65f 100644 --- a/pkg/scheduler/api/unschedule_info_test.go +++ b/pkg/scheduler/api/unschedule_info_test.go @@ -32,16 +32,19 @@ func TestFitError(t *testing.T) { tests := []struct { task *TaskInfo node *NodeInfo - reason []string status []*Status - want *FitError + // the wanted reason from fitError + reason []string + // the wanted fitError + wantErr *FitError + // string of fitError errStr string }{ { task: &TaskInfo{Name: "pod1", Namespace: "ns1"}, node: &NodeInfo{Name: "node1"}, reason: []string{affinityRulesNotMatch, nodeAffinity}, - want: &FitError{ + wantErr: &FitError{ NodeName: "node1", taskNamespace: "ns1", taskName: "pod1", Status: []*Status{{Reason: affinityRulesNotMatch, Code: Error}, {Reason: nodeAffinity, Code: Error}}, }, @@ -52,7 +55,7 @@ func TestFitError(t *testing.T) { node: &NodeInfo{Name: "node2"}, status: []*Status{{Reason: nodeAffinity, Code: UnschedulableAndUnresolvable}, {Reason: existingAntiAffinityNotMatch, Code: Error}}, reason: []string{nodeAffinity, existingAntiAffinityNotMatch}, - want: &FitError{ + wantErr: &FitError{ NodeName: "node2", taskNamespace: "ns2", taskName: "pod2", Status: []*Status{{Reason: nodeAffinity, Code: UnschedulableAndUnresolvable}, {Reason: existingAntiAffinityNotMatch, Code: Error}}, }, @@ -63,12 +66,12 @@ func TestFitError(t *testing.T) { var got *FitError for _, test := range tests { if len(test.status) != 0 { - got = NewFitStatus(test.task, test.node, test.status...) + got = NewFitErrWithStatus(test.task, test.node, test.status...) } else if len(test.reason) != 0 { got = NewFitError(test.task, test.node, test.reason...) } - assert.Equal(t, test.want, got) + assert.Equal(t, test.wantErr, got) assert.Equal(t, test.reason, got.Reasons()) assert.Equal(t, test.errStr, got.Error()) } @@ -81,15 +84,20 @@ func TestFitErrors(t *testing.T) { err error fiterr *FitError want string // expected error string + // nodes that are not helpful for preempting, which has a code of UnschedulableAndUnresolvable + filterNodes map[string]struct{} }{ { - want: "0/0 nodes are unavailable", // base fit err string is empty, set as the default + want: "0/0 nodes are unavailable", // base fit err string is empty, set as the default + filterNodes: map[string]struct{}{}, }, { node: "node1", fitStr: "fit failed", err: fmt.Errorf(NodePodNumberExceeded), want: "fit failed: 1 node(s) pod number exceeded.", + // no node has UnschedulableAndUnresolvable + filterNodes: map[string]struct{}{}, }, { node: "node1", @@ -100,6 +108,8 @@ func TestFitErrors(t *testing.T) { Status: []*Status{{Reason: nodeAffinity, Code: UnschedulableAndUnresolvable}}, }, want: "NodeResourceFitFailed: 1 node(s) didn't match Pod's node affinity/selector, 1 node(s) pod number exceeded.", + // only node2 has UnschedulableAndUnresolvable + filterNodes: map[string]struct{}{"node2": {}}, }, } for _, test := range tests { @@ -113,5 +123,6 @@ func TestFitErrors(t *testing.T) { } got := fitErrs.Error() assert.Equal(t, test.want, got) + assert.Equal(t, test.filterNodes, fitErrs.NotHelpfulForPreemptNodes()) } } diff --git a/pkg/scheduler/framework/session_plugins.go b/pkg/scheduler/framework/session_plugins.go index a95fc80b280..83a5a735fbf 100644 --- a/pkg/scheduler/framework/session_plugins.go +++ b/pkg/scheduler/framework/session_plugins.go @@ -626,6 +626,32 @@ func (ssn *Session) TaskOrderFn(l, r interface{}) bool { return helpers.CompareTask(lv, rv) } +// NodesWherePreemptionMightHelp filter out those node that has unscheduleandunre +func (ssn *Session) NodesWherePreemptionMightHelp(task *api.TaskInfo) []*api.NodeInfo { + fitErrors, ok1 := ssn.Jobs[task.Job] + if !ok1 { + return ssn.NodeList + } + fitErr, ok2 := fitErrors.NodesFitErrors[task.UID] + if !ok2 { + return ssn.NodeList + } + + skipNodes := fitErr.NotHelpfulForPreemptNodes() + if len(skipNodes) == 0 { + return ssn.NodeList + } + + ret := make([]*api.NodeInfo, 0, len(ssn.Nodes)) + for _, node := range ssn.Nodes { + if _, ok := skipNodes[node.Name]; !ok { + ret = append(ret, node) + } + } + + return ret +} + // PredicateFn invoke predicate function of the plugins func (ssn *Session) PredicateFn(task *api.TaskInfo, node *api.NodeInfo) ([]*api.Status, error) { predicateStatus := make([]*api.Status, 0) diff --git a/pkg/scheduler/framework/session_plugins_test.go b/pkg/scheduler/framework/session_plugins_test.go new file mode 100644 index 00000000000..a84b8f238aa --- /dev/null +++ b/pkg/scheduler/framework/session_plugins_test.go @@ -0,0 +1,138 @@ +/* +Copyright 2024 The Volcano Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package framework + +import ( + "fmt" + "testing" + + "github.com/stretchr/testify/assert" + v1 "k8s.io/api/core/v1" + + schedulingv1 "volcano.sh/apis/pkg/apis/scheduling/v1beta1" + "volcano.sh/volcano/pkg/scheduler/api" + "volcano.sh/volcano/pkg/scheduler/cache" + "volcano.sh/volcano/pkg/scheduler/util" +) + +func newFitErr(taskName, nodeName string, sts ...*api.Status) *api.FitError { + return api.NewFitErrWithStatus(&api.TaskInfo{Name: taskName}, &api.NodeInfo{Name: nodeName}, sts...) +} + +func TestFilterOutPreemptMayNotHelpNodes(t *testing.T) { + tests := []struct { + Name string + PodGroups []*schedulingv1.PodGroup + Pods []*v1.Pod + Nodes []*v1.Node + Queues []*schedulingv1.Queue + status map[api.TaskID]*api.FitError + want map[api.TaskID][]string // task's nodes name list which is helpful for preemption + }{ + { + Name: "all are helpful for preemption", + PodGroups: []*schedulingv1.PodGroup{util.BuildPodGroup("pg1", "c1", "c1", 1, nil, schedulingv1.PodGroupInqueue)}, + Pods: []*v1.Pod{ + util.BuildPod("c1", "p1", "", v1.PodPending, api.BuildResourceList("2", "1G"), "pg1", map[string]string{"volcano.sh/task-spec": "master"}, nil), + util.BuildPod("c1", "p2", "", v1.PodPending, api.BuildResourceList("2", "1G"), "pg1", map[string]string{"volcano.sh/task-spec": "worker"}, nil), + }, + Nodes: []*v1.Node{ + util.BuildNode("n1", api.BuildResourceList("2", "4Gi", []api.ScalarResource{{Name: "pods", Value: "10"}}...), map[string]string{"nodeRole": "worker"}), + util.BuildNode("n2", api.BuildResourceList("2", "4Gi", []api.ScalarResource{{Name: "pods", Value: "10"}}...), map[string]string{"nodeRole": "worker"}), + }, + Queues: []*schedulingv1.Queue{util.BuildQueue("c1", 1, nil)}, + status: map[api.TaskID]*api.FitError{}, + want: map[api.TaskID][]string{"c1-p2": {"n1", "n2"}, "c1-p1": {"n1", "n2"}}, + }, + { + Name: "master predicate failed: node selector does not match", + PodGroups: []*schedulingv1.PodGroup{util.BuildPodGroup("pg1", "c1", "c1", 1, nil, schedulingv1.PodGroupInqueue)}, + Pods: []*v1.Pod{ + util.BuildPod("c1", "p1", "", v1.PodPending, api.BuildResourceList("2", "1G"), "pg1", map[string]string{"volcano.sh/task-spec": "master"}, map[string]string{"nodeRole": "master"}), + util.BuildPod("c1", "p2", "", v1.PodPending, api.BuildResourceList("2", "1G"), "pg1", map[string]string{"volcano.sh/task-spec": "worker"}, map[string]string{"nodeRole": "worker"}), + }, + Nodes: []*v1.Node{util.BuildNode("n1", api.BuildResourceList("2", "4Gi", []api.ScalarResource{{Name: "pods", Value: "10"}}...), map[string]string{"nodeRole": "worker"})}, + Queues: []*schedulingv1.Queue{util.BuildQueue("c1", 1, nil)}, + status: map[api.TaskID]*api.FitError{"c1-p1": newFitErr("c1-p1", "n1", &api.Status{Reason: "node(s) didn't match Pod's node selector", Code: api.UnschedulableAndUnresolvable})}, + want: map[api.TaskID][]string{"c1-p2": {"n1"}, "c1-p1": {}}, + }, + { + Name: "p1,p3 has node fit error", + PodGroups: []*schedulingv1.PodGroup{util.BuildPodGroup("pg1", "c1", "c1", 2, map[string]int32{"master": 1, "worker": 1}, schedulingv1.PodGroupInqueue)}, + Pods: []*v1.Pod{ + util.BuildPod("c1", "p0", "", v1.PodPending, api.BuildResourceList("1", "1G"), "pg1", map[string]string{"volcano.sh/task-spec": "master"}, map[string]string{"nodeRole": "master"}), + util.BuildPod("c1", "p1", "", v1.PodPending, api.BuildResourceList("1", "1G"), "pg1", map[string]string{"volcano.sh/task-spec": "master"}, map[string]string{"nodeRole": "master"}), + util.BuildPod("c1", "p2", "", v1.PodPending, api.BuildResourceList("1", "1G"), "pg1", map[string]string{"volcano.sh/task-spec": "worker"}, map[string]string{"nodeRole": "worker"}), + util.BuildPod("c1", "p3", "", v1.PodPending, api.BuildResourceList("1", "1G"), "pg1", map[string]string{"volcano.sh/task-spec": "worker"}, map[string]string{"nodeRole": "worker"}), + }, + Nodes: []*v1.Node{ + util.BuildNode("n1", api.BuildResourceList("1", "2Gi", []api.ScalarResource{{Name: "pods", Value: "10"}}...), map[string]string{"nodeRole": "master"}), + util.BuildNode("n2", api.BuildResourceList("1", "2Gi", []api.ScalarResource{{Name: "pods", Value: "10"}}...), map[string]string{"nodeRole": "worker"}), + }, + Queues: []*schedulingv1.Queue{util.BuildQueue("c1", 1, nil)}, + status: map[api.TaskID]*api.FitError{ + "c1-p1": newFitErr("c1-p1", "n2", &api.Status{Reason: "node(s) didn't match Pod's node selector", Code: api.UnschedulableAndUnresolvable}), + "c1-p3": newFitErr("c1-p3", "n1", &api.Status{Reason: "node(s) didn't match Pod's node selector", Code: api.UnschedulableAndUnresolvable}), + }, + // notes that are useful for preempting + want: map[api.TaskID][]string{ + "c1-p0": {"n1", "n2"}, + "c1-p1": {"n1"}, + "c1-p2": {"n1", "n2"}, + "c1-p3": {"n2"}, + }, + }, + } + + for i, test := range tests { + t.Run(test.Name, func(t *testing.T) { + scherCache := cache.NewDefaultMockSchedulerCache("test-scheduler") + for _, node := range test.Nodes { + scherCache.AddOrUpdateNode(node) + } + for _, pod := range test.Pods { + scherCache.AddPod(pod) + } + for _, pg := range test.PodGroups { + scherCache.AddPodGroupV1beta1(pg) + } + for _, queue := range test.Queues { + scherCache.AddQueueV1beta1(queue) + } + ssn := OpenSession(scherCache, nil, nil) + defer CloseSession(ssn) + for _, job := range ssn.Jobs { + for _, task := range job.TaskStatusIndex[api.Pending] { + if fitErr, exist := test.status[task.UID]; exist { + fe := api.NewFitErrors() + fe.SetNodeError(fitErr.NodeName, fitErr) + job.NodesFitErrors[task.UID] = fe + } + + // check potential nodes + potentialNodes := ssn.NodesWherePreemptionMightHelp(task) + want := test.want[task.UID] + got := make([]string, 0, len(potentialNodes)) + for _, node := range potentialNodes { + got = append(got, node.Name) + } + assert.Equal(t, want, got, fmt.Sprintf("case %d: task %s", i, task.UID)) + } + } + }) + } +}