diff --git a/example/extender/extender.go b/example/extender/extender.go index 77f5c11495..b313a5d9b8 100644 --- a/example/extender/extender.go +++ b/example/extender/extender.go @@ -72,10 +72,8 @@ func predicate(w http.ResponseWriter, r *http.Request) { resp := &extender.PredicateResponse{} if req.Task.BestEffort && len(req.Node.Tasks) > 10 { - sts := api.Status{} - sts.Code = api.Unschedulable - sts.Reason = "Too many tasks on the node" - resp.Status = append(resp.Status, &sts) + resp.ErrorMessage = "Too many tasks on the node" + resp.Code = api.Unschedulable } response, err := json.Marshal(resp) if err != nil { diff --git a/pkg/scheduler/actions/allocate/allocate.go b/pkg/scheduler/actions/allocate/allocate.go index e606b17349..ec21a1b570 100644 --- a/pkg/scheduler/actions/allocate/allocate.go +++ b/pkg/scheduler/actions/allocate/allocate.go @@ -303,23 +303,14 @@ func (alloc *Action) allocateResourcesForTasks(tasks *util.PriorityQueue, job *a } } -func (alloc *Action) predicate(task *api.TaskInfo, node *api.NodeInfo) ([]*api.Status, error) { +func (alloc *Action) predicate(task *api.TaskInfo, node *api.NodeInfo) error { // Check for Resource Predicate var statusSets api.StatusSets if ok, resources := task.InitResreq.LessEqualWithResourcesName(node.FutureIdle(), api.Zero); !ok { statusSets = append(statusSets, &api.Status{Code: api.Unschedulable, Reason: api.WrapInsufficientResourceReason(resources)}) - return nil, api.NewFitErrWithStatus(task, node, statusSets...) + return api.NewFitErrWithStatus(task, node, statusSets...) } - statusSets, err := alloc.session.PredicateFn(task, node) - if err != nil { - return nil, api.NewFitError(task, node, err.Error()) - } - - if statusSets.ContainsUnschedulable() || statusSets.ContainsUnschedulableAndUnresolvable() || - statusSets.ContainsErrorSkipOrWait() { - return nil, api.NewFitErrWithStatus(task, node, statusSets...) - } - return nil, nil + return alloc.session.PredicateForAllocateAction(task, node) } func (alloc *Action) UnInitialize() {} diff --git a/pkg/scheduler/actions/backfill/backfill.go b/pkg/scheduler/actions/backfill/backfill.go index 8daa720031..576cc674ad 100644 --- a/pkg/scheduler/actions/backfill/backfill.go +++ b/pkg/scheduler/actions/backfill/backfill.go @@ -17,7 +17,6 @@ limitations under the License. package backfill import ( - "fmt" "time" "k8s.io/klog/v2" @@ -44,20 +43,7 @@ func (backfill *Action) Execute(ssn *framework.Session) { klog.V(5).Infof("Enter Backfill ...") defer klog.V(5).Infof("Leaving Backfill ...") - predicateFunc := func(task *api.TaskInfo, node *api.NodeInfo) ([]*api.Status, error) { - var statusSets api.StatusSets - statusSets, err := ssn.PredicateFn(task, node) - if err != nil { - return nil, err - } - - // predicateHelper.PredicateNodes will print the log if predicate failed, so don't print log anymore here - if statusSets.ContainsUnschedulable() || statusSets.ContainsUnschedulableAndUnresolvable() || statusSets.ContainsErrorSkipOrWait() { - err := fmt.Errorf(statusSets.Message()) // should not include variables in api node errors - return nil, err - } - return nil, nil - } + predicateFunc := ssn.PredicateForAllocateAction // TODO (k82cn): When backfill, it's also need to balance between Queues. 
pendingTasks := backfill.pickUpPendingTasks(ssn) diff --git a/pkg/scheduler/actions/preempt/preempt.go b/pkg/scheduler/actions/preempt/preempt.go index 0b51adfa8b..308efef6d4 100644 --- a/pkg/scheduler/actions/preempt/preempt.go +++ b/pkg/scheduler/actions/preempt/preempt.go @@ -213,17 +213,7 @@ func preempt( return false, fmt.Errorf("PrePredicate for task %s/%s failed for: %v", preemptor.Namespace, preemptor.Name, err) } - predicateFn := func(task *api.TaskInfo, node *api.NodeInfo) ([]*api.Status, error) { - var statusSets api.StatusSets - statusSets, _ = ssn.PredicateFn(task, node) - - // When filtering candidate nodes, need to consider the node statusSets instead of the err information. - // refer to kube-scheduler preemption code: https://github.com/kubernetes/kubernetes/blob/9d87fa215d9e8020abdc17132d1252536cd752d2/pkg/scheduler/framework/preemption/preemption.go#L422 - if statusSets.ContainsUnschedulableAndUnresolvable() || statusSets.ContainsErrorSkipOrWait() { - return nil, api.NewFitErrWithStatus(task, node, statusSets...) - } - return nil, nil - } + predicateFn := ssn.PredicateForPreemptAction // we should filter out those nodes that are UnschedulableAndUnresolvable status got in allocate action allNodes := ssn.GetUnschedulableAndUnresolvableNodesForTask(preemptor) predicateNodes, _ := predicateHelper.PredicateNodes(preemptor, allNodes, predicateFn, true) diff --git a/pkg/scheduler/actions/reclaim/reclaim.go b/pkg/scheduler/actions/reclaim/reclaim.go index a028f5726f..a856e907f1 100644 --- a/pkg/scheduler/actions/reclaim/reclaim.go +++ b/pkg/scheduler/actions/reclaim/reclaim.go @@ -128,16 +128,13 @@ func (ra *Action) Execute(ssn *framework.Session) { // we should filter out those nodes that are UnschedulableAndUnresolvable status got in allocate action totalNodes := ssn.GetUnschedulableAndUnresolvableNodesForTask(task) for _, n := range totalNodes { - var statusSets api.StatusSets - statusSets, _ = ssn.PredicateFn(task, n) - // When filtering candidate nodes, need to consider the node statusSets instead of the err information. // refer to kube-scheduler preemption code: https://github.com/kubernetes/kubernetes/blob/9d87fa215d9e8020abdc17132d1252536cd752d2/pkg/scheduler/framework/preemption/preemption.go#L422 - if statusSets.ContainsUnschedulableAndUnresolvable() || statusSets.ContainsErrorSkipOrWait() { - klog.V(5).Infof("predicates failed in reclaim for task <%s/%s> on node <%s>, reason is %s.", - task.Namespace, task.Name, n.Name, statusSets.Message()) + if err := ssn.PredicateForPreemptAction(task, n); err != nil { + klog.V(4).Infof("Reclaim predicate for task %s/%s on node %s return error %v ", task.Namespace, task.Name, n.Name, err) continue } + klog.V(3).Infof("Considering Task <%s/%s> on Node <%s>.", task.Namespace, task.Name, n.Name) var reclaimees []*api.TaskInfo diff --git a/pkg/scheduler/api/types.go b/pkg/scheduler/api/types.go index 7c52481ce5..a4c6eef719 100644 --- a/pkg/scheduler/api/types.go +++ b/pkg/scheduler/api/types.go @@ -147,11 +147,15 @@ const ( Wait // Skip is used when a Bind plugin chooses to skip binding. Skip + // There is a Pending status in k8s. + // Pending means that the scheduling process is finished successfully, + // but the plugin wants to stop the scheduling cycle/binding cycle here. 
) type Status struct { Code int Reason string + Plugin string } // String represents status string @@ -227,6 +231,33 @@ func (s StatusSets) Reasons() []string { return all } +// ConvertPredicateStatus return predicate status from k8sframework status +func ConvertPredicateStatus(status *k8sframework.Status) *Status { + internalStatus := &Status{} + if status != nil { + internalStatus.Plugin = status.Plugin() // function didn't check whether Status is nil + } + switch status.Code() { + case k8sframework.Error: + internalStatus.Code = Error + case k8sframework.Unschedulable: + internalStatus.Code = Unschedulable + case k8sframework.UnschedulableAndUnresolvable: + internalStatus.Code = UnschedulableAndUnresolvable + case k8sframework.Wait: + internalStatus.Code = Wait + case k8sframework.Skip: + internalStatus.Code = Skip + default: + internalStatus.Code = Success + } + // in case that pod's scheduling message is not identifiable with message: 'all nodes are unavailable' + if internalStatus.Code != Success { + internalStatus.Reason = status.Message() + } + return internalStatus +} + // ValidateExFn is the func declaration used to validate the result. type ValidateExFn func(interface{}) *ValidateResult @@ -237,7 +268,7 @@ type VoteFn func(interface{}) int type JobEnqueuedFn func(interface{}) // PredicateFn is the func declaration used to predicate node for task. -type PredicateFn func(*TaskInfo, *NodeInfo) ([]*Status, error) +type PredicateFn func(*TaskInfo, *NodeInfo) error // PrePredicateFn is the func declaration used to pre-predicate node for task. type PrePredicateFn func(*TaskInfo) error diff --git a/pkg/scheduler/framework/session.go b/pkg/scheduler/framework/session.go index 34ada7c541..0d87588ab1 100644 --- a/pkg/scheduler/framework/session.go +++ b/pkg/scheduler/framework/session.go @@ -313,6 +313,52 @@ func (ssn *Session) GetUnschedulableAndUnresolvableNodesForTask(task *api.TaskIn return ret } +// PredicateForAllocateAction checks if the predicate error contains +// - Unschedulable +// - UnschedulableAndUnresolvable +// - ErrorSkipOrWait +func (ssn *Session) PredicateForAllocateAction(task *api.TaskInfo, node *api.NodeInfo) error { + err := ssn.PredicateFn(task, node) + if err == nil { + return nil + } + + fitError, ok := err.(*api.FitError) + if !ok { + return api.NewFitError(task, node, err.Error()) + } + + statusSets := fitError.Status + if statusSets.ContainsUnschedulable() || statusSets.ContainsUnschedulableAndUnresolvable() || + statusSets.ContainsErrorSkipOrWait() { + return fitError + } + return nil +} + +// PredicateForPreemptAction checks if the predicate error contains: +// - UnschedulableAndUnresolvable +// - ErrorSkipOrWait +func (ssn *Session) PredicateForPreemptAction(task *api.TaskInfo, node *api.NodeInfo) error { + err := ssn.PredicateFn(task, node) + if err == nil { + return nil + } + + fitError, ok := err.(*api.FitError) + if !ok { + return api.NewFitError(task, node, err.Error()) + } + + // When filtering candidate nodes, need to consider the node statusSets instead of the err information. 
+ // refer to kube-scheduler preemption code: https://github.com/kubernetes/kubernetes/blob/9d87fa215d9e8020abdc17132d1252536cd752d2/pkg/scheduler/framework/preemption/preemption.go#L422 + statusSets := fitError.Status + if statusSets.ContainsUnschedulableAndUnresolvable() || statusSets.ContainsErrorSkipOrWait() { + return fitError + } + return nil +} + // Statement returns new statement object func (ssn *Session) Statement() *Statement { return &Statement{ diff --git a/pkg/scheduler/framework/session_plugins.go b/pkg/scheduler/framework/session_plugins.go index a95fc80b28..e2d9a91911 100644 --- a/pkg/scheduler/framework/session_plugins.go +++ b/pkg/scheduler/framework/session_plugins.go @@ -627,8 +627,7 @@ func (ssn *Session) TaskOrderFn(l, r interface{}) bool { } // PredicateFn invoke predicate function of the plugins -func (ssn *Session) PredicateFn(task *api.TaskInfo, node *api.NodeInfo) ([]*api.Status, error) { - predicateStatus := make([]*api.Status, 0) +func (ssn *Session) PredicateFn(task *api.TaskInfo, node *api.NodeInfo) error { for _, tier := range ssn.Tiers { for _, plugin := range tier.Plugins { if !isEnabled(plugin.EnabledPredicate) { @@ -638,14 +637,13 @@ func (ssn *Session) PredicateFn(task *api.TaskInfo, node *api.NodeInfo) ([]*api. if !found { continue } - status, err := pfn(task, node) - predicateStatus = append(predicateStatus, status...) + err := pfn(task, node) if err != nil { - return predicateStatus, err + return err } } } - return predicateStatus, nil + return nil } // PrePredicateFn invoke predicate function of the plugins diff --git a/pkg/scheduler/framework/util.go b/pkg/scheduler/framework/util.go index 9f802b107d..7c93d61cc7 100644 --- a/pkg/scheduler/framework/util.go +++ b/pkg/scheduler/framework/util.go @@ -262,24 +262,3 @@ func (nl *NodeLister) List() ([]*v1.Node, error) { } return nodes, nil } - -// ConvertPredicateStatus return predicate status from k8sframework status -func ConvertPredicateStatus(status *k8sframework.Status) *api.Status { - internalStatus := &api.Status{} - if status.Code() == k8sframework.Success { - internalStatus.Code = api.Success - return internalStatus - } else if status.Code() == k8sframework.Unschedulable { - internalStatus.Code = api.Unschedulable - internalStatus.Reason = status.Message() - return internalStatus - } else if status.Code() == k8sframework.UnschedulableAndUnresolvable { - internalStatus.Code = api.UnschedulableAndUnresolvable - internalStatus.Reason = status.Message() - return internalStatus - } else { - internalStatus.Code = api.Error - internalStatus.Reason = status.Message() - return internalStatus - } -} diff --git a/pkg/scheduler/plugins/deviceshare/deviceshare.go b/pkg/scheduler/plugins/deviceshare/deviceshare.go index 62b083cba7..90a0b8b0cb 100644 --- a/pkg/scheduler/plugins/deviceshare/deviceshare.go +++ b/pkg/scheduler/plugins/deviceshare/deviceshare.go @@ -18,7 +18,6 @@ package deviceshare import ( "context" - "fmt" "math" "reflect" @@ -109,7 +108,7 @@ func getDeviceScore(ctx context.Context, pod *v1.Pod, node *api.NodeInfo, schedu func (dp *deviceSharePlugin) OnSessionOpen(ssn *framework.Session) { // Register event handlers to update task info in PodLister & nodeMap - ssn.AddPredicateFn(dp.Name(), func(task *api.TaskInfo, node *api.NodeInfo) ([]*api.Status, error) { + ssn.AddPredicateFn(dp.Name(), func(task *api.TaskInfo, node *api.NodeInfo) error { predicateStatus := make([]*api.Status, 0) // Check PredicateWithCache for _, val := range api.RegisteredDevices { @@ -120,8 +119,9 @@ func (dp 
*deviceSharePlugin) OnSessionOpen(ssn *framework.Session) { predicateStatus = append(predicateStatus, &api.Status{ Code: devices.Unschedulable, Reason: "node not initialized with device" + val, + Plugin: PluginName, }) - return predicateStatus, fmt.Errorf("node not initialized with device %s", val) + return api.NewFitErrWithStatus(task, node, predicateStatus...) } klog.V(4).Infof("pod %s/%s did not request device %s on %s, skipping it", task.Pod.Namespace, task.Pod.Name, val, node.Name) continue @@ -129,12 +129,12 @@ func (dp *deviceSharePlugin) OnSessionOpen(ssn *framework.Session) { code, msg, err := dev.FilterNode(task.Pod, dp.schedulePolicy) if err != nil { predicateStatus = append(predicateStatus, createStatus(code, msg)) - return predicateStatus, err + return api.NewFitErrWithStatus(task, node, predicateStatus...) } filterNodeStatus := createStatus(code, msg) if filterNodeStatus.Code != api.Success { predicateStatus = append(predicateStatus, filterNodeStatus) - return predicateStatus, fmt.Errorf("plugin device filternode predicates failed %s", msg) + return api.NewFitErrWithStatus(task, node, predicateStatus...) } } else { klog.Warningf("Devices %s assertion conversion failed, skip", val) @@ -144,7 +144,7 @@ func (dp *deviceSharePlugin) OnSessionOpen(ssn *framework.Session) { klog.V(4).Infof("checkDevices predicates Task <%s/%s> on Node <%s>: fit ", task.Namespace, task.Name, node.Name) - return predicateStatus, nil + return nil }) ssn.AddNodeOrderFn(dp.Name(), func(task *api.TaskInfo, node *api.NodeInfo) (float64, error) { diff --git a/pkg/scheduler/plugins/extender/argument.go b/pkg/scheduler/plugins/extender/argument.go index cdd130322e..994ced2cc3 100644 --- a/pkg/scheduler/plugins/extender/argument.go +++ b/pkg/scheduler/plugins/extender/argument.go @@ -22,7 +22,8 @@ type PredicateRequest struct { } type PredicateResponse struct { - Status []*api.Status `json:"status"` + ErrorMessage string `json:"status"` + Code int `json:"code"` } type PrioritizeRequest struct { diff --git a/pkg/scheduler/plugins/extender/extender.go b/pkg/scheduler/plugins/extender/extender.go index fea755f71c..6cb993cc1d 100644 --- a/pkg/scheduler/plugins/extender/extender.go +++ b/pkg/scheduler/plugins/extender/extender.go @@ -161,20 +161,27 @@ func (ep *extenderPlugin) OnSessionOpen(ssn *framework.Session) { } if ep.config.predicateVerb != "" { - ssn.AddPredicateFn(ep.Name(), func(task *api.TaskInfo, node *api.NodeInfo) ([]*api.Status, error) { + ssn.AddPredicateFn(ep.Name(), func(task *api.TaskInfo, node *api.NodeInfo) error { resp := &PredicateResponse{} err := ep.send(ep.config.predicateVerb, &PredicateRequest{Task: task, Node: node}, resp) if err != nil { klog.Warningf("Predicate failed with error %v", err) if ep.config.ignorable { - return nil, nil + return nil } - return nil, err + return api.NewFitError(task, node, err.Error()) } - predicateStatus := resp.Status - return predicateStatus, nil + if len(resp.ErrorMessage) == 0 { + return nil + } + // keep compatibility with old behavior: error messages length is not zero, + // but didn't return a code, and code will be 0 for default. 
Change code to Error for corresponding + if resp.Code == api.Success { + resp.Code = api.Error + } + return api.NewFitErrWithStatus(task, node, &api.Status{Code: resp.Code, Reason: resp.ErrorMessage, Plugin: PluginName}) }) } diff --git a/pkg/scheduler/plugins/nodegroup/nodegroup.go b/pkg/scheduler/plugins/nodegroup/nodegroup.go index 117459c7ba..c0bff94a01 100644 --- a/pkg/scheduler/plugins/nodegroup/nodegroup.go +++ b/pkg/scheduler/plugins/nodegroup/nodegroup.go @@ -18,7 +18,6 @@ package nodegroup import ( "errors" - "fmt" "k8s.io/apimachinery/pkg/util/sets" @@ -226,7 +225,7 @@ func (np *nodeGroupPlugin) OnSessionOpen(ssn *framework.Session) { } ssn.AddNodeOrderFn(np.Name(), nodeOrderFn) - predicateFn := func(task *api.TaskInfo, node *api.NodeInfo) ([]*api.Status, error) { + predicateFn := func(task *api.TaskInfo, node *api.NodeInfo) error { predicateStatus := make([]*api.Status, 0) group := node.Node.Labels[NodeGroupNameKey] @@ -237,15 +236,16 @@ func (np *nodeGroupPlugin) OnSessionOpen(ssn *framework.Session) { Reason: "node not satisfy", } predicateStatus = append(predicateStatus, nodeStatus) - return predicateStatus, fmt.Errorf("<%s> predicates Task <%s/%s> on Node <%s> of nodegroup <%v> failed <%v>", np.Name(), task.Namespace, task.Name, node.Name, group, err) + return api.NewFitErrWithStatus(task, node, predicateStatus...) } klog.V(4).Infof("task <%s>/<%s> queue %s on node %s of nodegroup %v", task.Namespace, task.Name, queue, node.Name, group) nodeStatus := &api.Status{ Code: api.Success, Reason: "node satisfy task", + Plugin: PluginName, } predicateStatus = append(predicateStatus, nodeStatus) - return predicateStatus, nil + return api.NewFitErrWithStatus(task, node, predicateStatus...) } ssn.AddPredicateFn(np.Name(), predicateFn) diff --git a/pkg/scheduler/plugins/nodegroup/nodegroup_test.go b/pkg/scheduler/plugins/nodegroup/nodegroup_test.go index 793c2b3185..9027a0db35 100644 --- a/pkg/scheduler/plugins/nodegroup/nodegroup_test.go +++ b/pkg/scheduler/plugins/nodegroup/nodegroup_test.go @@ -1,33 +1,39 @@ +/* +Copyright 2024 The Volcano Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + package nodegroup import ( "fmt" - "reflect" "testing" - "github.com/agiledragon/gomonkey/v2" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/client-go/tools/record" batch "volcano.sh/apis/pkg/apis/batch/v1alpha1" schedulingv1 "volcano.sh/apis/pkg/apis/scheduling/v1beta1" "volcano.sh/volcano/pkg/scheduler/api" - "volcano.sh/volcano/pkg/scheduler/cache" "volcano.sh/volcano/pkg/scheduler/conf" "volcano.sh/volcano/pkg/scheduler/framework" + "volcano.sh/volcano/pkg/scheduler/uthelper" "volcano.sh/volcano/pkg/scheduler/util" ) func TestNodeGroup(t *testing.T) { - var tmp *cache.SchedulerCache - patchUpdateQueueStatus := gomonkey.ApplyMethod(reflect.TypeOf(tmp), "UpdateQueueStatus", func(scCache *cache.SchedulerCache, queue *api.QueueInfo) error { - return nil - }) - defer patchUpdateQueueStatus.Reset() - - framework.RegisterPluginBuilder(PluginName, New) - defer framework.CleanupPluginBuilders() + plugins := map[string]framework.PluginBuilder{PluginName: New} p1 := util.BuildPod("c1", "p1", "", v1.PodPending, api.BuildResourceList("2", "4Gi"), "pg1", map[string]string{ batch.QueueNameKey: "q1", @@ -51,25 +57,8 @@ func TestNodeGroup(t *testing.T) { }) n5 := util.BuildNode("n5", api.BuildResourceList("4", "16Gi"), make(map[string]string)) - pg1 := &schedulingv1.PodGroup{ - ObjectMeta: metav1.ObjectMeta{ - Name: "pg1", - Namespace: "c1", - }, - Spec: schedulingv1.PodGroupSpec{ - Queue: "q1", - }, - } - - pg2 := &schedulingv1.PodGroup{ - ObjectMeta: metav1.ObjectMeta{ - Name: "pg2", - Namespace: "c1", - }, - Spec: schedulingv1.PodGroupSpec{ - Queue: "q2", - }, - } + pg1 := util.BuildPodGroup("pg1", "c1", "q1", 0, nil, "") + pg2 := util.BuildPodGroup("pg2", "c1", "q2", 0, nil, "") queue1 := &schedulingv1.Queue{ ObjectMeta: metav1.ObjectMeta{ @@ -110,28 +99,19 @@ func TestNodeGroup(t *testing.T) { } tests := []struct { - name string - podGroups []*schedulingv1.PodGroup - pods []*v1.Pod - nodes []*v1.Node - queues []*schedulingv1.Queue + uthelper.TestCommonStruct arguments framework.Arguments expected map[string]map[string]float64 expectedStatus map[string]map[string]int }{ { - name: "case: soft constraints is subset of hard constraints", - podGroups: []*schedulingv1.PodGroup{ - pg1, - }, - queues: []*schedulingv1.Queue{ - queue1, - }, - pods: []*v1.Pod{ - p1, - }, - nodes: []*v1.Node{ - n1, n2, n3, n4, n5, + TestCommonStruct: uthelper.TestCommonStruct{ + Name: "case: soft constraints is subset of hard constraints", + PodGroups: []*schedulingv1.PodGroup{pg1}, + Queues: []*schedulingv1.Queue{queue1}, + Pods: []*v1.Pod{p1}, + Nodes: []*v1.Node{n1, n2, n3, n4, n5}, + Plugins: plugins, }, arguments: framework.Arguments{}, expected: map[string]map[string]float64{ @@ -155,18 +135,13 @@ func TestNodeGroup(t *testing.T) { }, { // test unnormal case - name: "case: soft constraints is not subset of hard constraints", - podGroups: []*schedulingv1.PodGroup{ - pg2, - }, - queues: []*schedulingv1.Queue{ - queue2, - }, - pods: []*v1.Pod{ - p2, - }, - nodes: []*v1.Node{ - n1, n2, n3, n4, n5, + TestCommonStruct: uthelper.TestCommonStruct{ + Name: "case: soft constraints is not subset of hard constraints", + PodGroups: []*schedulingv1.PodGroup{pg2}, + Queues: []*schedulingv1.Queue{queue2}, + Pods: []*v1.Pod{p2}, + Nodes: []*v1.Node{n1, n2, n3, n4, n5}, + Plugins: plugins, }, arguments: framework.Arguments{}, expected: map[string]map[string]float64{ @@ -191,34 +166,9 @@ func TestNodeGroup(t *testing.T) { } for i, test := range tests { - t.Run(fmt.Sprintf("case %v %v", i, test.name), 
func(t *testing.T) { - binder := util.NewFakeBinder(0) - schedulerCache := &cache.SchedulerCache{ - Nodes: make(map[string]*api.NodeInfo), - Jobs: make(map[api.JobID]*api.JobInfo), - Queues: make(map[api.QueueID]*api.QueueInfo), - Binder: binder, - StatusUpdater: &util.FakeStatusUpdater{}, - VolumeBinder: &util.FakeVolumeBinder{}, - - Recorder: record.NewFakeRecorder(100), - } - - for _, node := range test.nodes { - schedulerCache.AddOrUpdateNode(node) - } - for _, pod := range test.pods { - schedulerCache.AddPod(pod) - } - for _, ss := range test.podGroups { - schedulerCache.AddPodGroupV1beta1(ss) - } - for _, q := range test.queues { - schedulerCache.AddQueueV1beta1(q) - } - + t.Run(fmt.Sprintf("case %v %v", i, test.Name), func(t *testing.T) { trueValue := true - ssn := framework.OpenSession(schedulerCache, []conf.Tier{ + tiers := []conf.Tier{ { Plugins: []conf.PluginOption{ { @@ -229,8 +179,9 @@ func TestNodeGroup(t *testing.T) { }, }, }, - }, nil) - defer framework.CloseSession(ssn) + } + ssn := test.RegisterSession(tiers, nil) + defer test.Close() for _, job := range ssn.Jobs { for _, task := range job.Tasks { @@ -246,9 +197,15 @@ func TestNodeGroup(t *testing.T) { t.Errorf("case%d: task %s on node %s expect have score %v, but get %v", i, taskID, node.Name, expectScore, score) } - status, _ := ssn.PredicateFn(task, node) - if expectStatus := test.expectedStatus[taskID][node.Name]; expectStatus != status[0].Code { - t.Errorf("case%d: task %s on node %s expect have status code %v, but get %v", i, taskID, node.Name, expectStatus, status[0].Code) + var code int + err = ssn.PredicateFn(task, node) + if err == nil { + code = api.Success + } else { + code = err.(*api.FitError).Status[0].Code + } + if expectStatus := test.expectedStatus[taskID][node.Name]; expectStatus != code { + t.Errorf("case%d: task %s on node %s expect have status code %v, but get %v", i, taskID, node.Name, expectStatus, code) } } diff --git a/pkg/scheduler/plugins/numaaware/numaaware.go b/pkg/scheduler/plugins/numaaware/numaaware.go index feaf305316..f1ec94cd1b 100644 --- a/pkg/scheduler/plugins/numaaware/numaaware.go +++ b/pkg/scheduler/plugins/numaaware/numaaware.go @@ -113,16 +113,19 @@ func (pp *numaPlugin) OnSessionOpen(ssn *framework.Session) { }, }) - predicateFn := func(task *api.TaskInfo, node *api.NodeInfo) ([]*api.Status, error) { + predicateFn := func(task *api.TaskInfo, node *api.NodeInfo) error { predicateStatus := make([]*api.Status, 0) numaStatus := &api.Status{} if v1qos.GetPodQOS(task.Pod) != v1.PodQOSGuaranteed { klog.V(3).Infof("task %s isn't Guaranteed pod", task.Name) - return predicateStatus, nil + return nil } if fit, err := filterNodeByPolicy(task, node, pp.nodeResSets); !fit { - return predicateStatus, err + if err != nil { + return api.NewFitError(task, node, err.Error()) + } + return nil } resNumaSets := pp.nodeResSets[node.Name].Clone() @@ -134,11 +137,10 @@ func (pp *numaPlugin) OnSessionOpen(ssn *framework.Session) { hit, admit := taskPolicy.Predicate(providersHints) if !admit { numaStatus.Code = api.UnschedulableAndUnresolvable - numaStatus.Reason = fmt.Sprintf("plugin %s predicates failed for task %s container %s on node %s", - pp.Name(), task.Name, container.Name, node.Name) + numaStatus.Reason = fmt.Sprintf("container %s cannot be assigned by numa", container.Name) + numaStatus.Plugin = PluginName predicateStatus = append(predicateStatus, numaStatus) - return predicateStatus, fmt.Errorf("plugin %s predicates failed for task %s container %s on node %s", - pp.Name(), task.Name, 
container.Name, node.Name) + return api.NewFitErrWithStatus(task, node, predicateStatus...) } klog.V(4).Infof("[numaaware] hits for task %s container '%v': %v on node %s, besthit: %v", @@ -161,9 +163,7 @@ func (pp *numaPlugin) OnSessionOpen(ssn *framework.Session) { klog.V(4).Infof(" task %s's on node<%s> resAssignMap: %v", task.Name, node.Name, pp.assignRes[task.UID][node.Name]) - numaStatus.Code = api.Success - predicateStatus = append(predicateStatus, numaStatus) - return predicateStatus, nil + return nil } ssn.AddPredicateFn(pp.Name(), predicateFn) diff --git a/pkg/scheduler/plugins/predicates/predicates.go b/pkg/scheduler/plugins/predicates/predicates.go index b83bc540dc..cc2173208c 100644 --- a/pkg/scheduler/plugins/predicates/predicates.go +++ b/pkg/scheduler/plugins/predicates/predicates.go @@ -375,11 +375,11 @@ func (pp *predicatesPlugin) OnSessionOpen(ssn *framework.Session) { return nil }) - ssn.AddPredicateFn(pp.Name(), func(task *api.TaskInfo, node *api.NodeInfo) ([]*api.Status, error) { + ssn.AddPredicateFn(pp.Name(), func(task *api.TaskInfo, node *api.NodeInfo) error { predicateStatus := make([]*api.Status, 0) nodeInfo, found := nodeMap[node.Name] if !found { - return predicateStatus, fmt.Errorf("failed to predicates, node info for %s not found", node.Name) + return api.NewFitError(task, node, "node info not found") } if node.Allocatable.MaxTaskNum <= len(nodeInfo.Pods) { @@ -396,7 +396,7 @@ func (pp *predicatesPlugin) OnSessionOpen(ssn *framework.Session) { // CheckNodeUnschedulable predicateStatus := make([]*api.Status, 0) status := nodeUnscheduleFilter.Filter(context.TODO(), state, task.Pod, nodeInfo) - nodeUnscheduleStatus := framework.ConvertPredicateStatus(status) + nodeUnscheduleStatus := api.ConvertPredicateStatus(status) if nodeUnscheduleStatus.Code != api.Success { predicateStatus = append(predicateStatus, nodeUnscheduleStatus) return predicateStatus, false, fmt.Errorf("plugin %s predicates failed %s", nodeUnscheduleFilter.Name(), status.Message()) @@ -405,7 +405,7 @@ func (pp *predicatesPlugin) OnSessionOpen(ssn *framework.Session) { // Check NodeAffinity if predicate.nodeAffinityEnable { status := nodeAffinityFilter.Filter(context.TODO(), state, task.Pod, nodeInfo) - nodeAffinityStatus := framework.ConvertPredicateStatus(status) + nodeAffinityStatus := api.ConvertPredicateStatus(status) if nodeAffinityStatus.Code != api.Success { predicateStatus = append(predicateStatus, nodeAffinityStatus) return predicateStatus, false, fmt.Errorf("plugin %s predicates failed %s", nodeAffinityFilter.Name(), status.Message()) @@ -415,7 +415,7 @@ func (pp *predicatesPlugin) OnSessionOpen(ssn *framework.Session) { // PodToleratesNodeTaints: TaintToleration if predicate.taintTolerationEnable { status := tolerationFilter.Filter(context.TODO(), state, task.Pod, nodeInfo) - tolerationStatus := framework.ConvertPredicateStatus(status) + tolerationStatus := api.ConvertPredicateStatus(status) if tolerationStatus.Code != api.Success { predicateStatus = append(predicateStatus, tolerationStatus) return predicateStatus, false, fmt.Errorf("plugin %s predicates failed %s", tolerationFilter.Name(), status.Message()) @@ -432,20 +432,23 @@ func (pp *predicatesPlugin) OnSessionOpen(ssn *framework.Session) { if predicate.cacheEnable { fit, err = pCache.PredicateWithCache(node.Name, task.Pod) if err != nil { - predicateCacheStatus, fit, err = predicateByStablefilter(task.Pod, nodeInfo) + predicateCacheStatus, fit, _ = predicateByStablefilter(task.Pod, nodeInfo) pCache.UpdateCache(node.Name, task.Pod, 
fit) } else { if !fit { err = fmt.Errorf("plugin equivalence cache predicates failed") + predicateCacheStatus = append(predicateCacheStatus, &api.Status{ + Code: api.Error, Reason: err.Error(), Plugin: CachePredicate, + }) } } } else { - predicateCacheStatus, fit, err = predicateByStablefilter(task.Pod, nodeInfo) + predicateCacheStatus, fit, _ = predicateByStablefilter(task.Pod, nodeInfo) } predicateStatus = append(predicateStatus, predicateCacheStatus...) if !fit { - return predicateStatus, err + return api.NewFitErrWithStatus(task, node, predicateStatus...) } // Check NodePort @@ -453,10 +456,10 @@ func (pp *predicatesPlugin) OnSessionOpen(ssn *framework.Session) { isSkipNodePorts := handleSkipPredicatePlugin(task, skipPlugins, nodePortFilter.Name(), node) if !isSkipNodePorts { status := nodePortFilter.Filter(context.TODO(), state, nil, nodeInfo) - nodePortStatus := framework.ConvertPredicateStatus(status) + nodePortStatus := api.ConvertPredicateStatus(status) if nodePortStatus.Code != api.Success { predicateStatus = append(predicateStatus, nodePortStatus) - return predicateStatus, fmt.Errorf("plugin %s predicates failed %s", nodePortFilter.Name(), status.Message()) + return api.NewFitErrWithStatus(task, node, predicateStatus...) } } } @@ -466,10 +469,10 @@ func (pp *predicatesPlugin) OnSessionOpen(ssn *framework.Session) { isSkipInterPodAffinity := handleSkipPredicatePlugin(task, skipPlugins, podAffinityFilter.Name(), node) if !isSkipInterPodAffinity { status := podAffinityFilter.Filter(context.TODO(), state, task.Pod, nodeInfo) - podAffinityStatus := framework.ConvertPredicateStatus(status) + podAffinityStatus := api.ConvertPredicateStatus(status) if podAffinityStatus.Code != api.Success { predicateStatus = append(predicateStatus, podAffinityStatus) - return predicateStatus, fmt.Errorf("plugin %s predicates failed %s", podAffinityFilter.Name(), status.Message()) + return api.NewFitErrWithStatus(task, node, predicateStatus...) } } } @@ -477,17 +480,18 @@ func (pp *predicatesPlugin) OnSessionOpen(ssn *framework.Session) { // Check NodeVolumeLimits if predicate.nodeVolumeLimitsEnable { status := nodeVolumeLimitsCSIFilter.Filter(context.TODO(), state, task.Pod, nodeInfo) - nodeVolumeStatus := framework.ConvertPredicateStatus(status) + nodeVolumeStatus := api.ConvertPredicateStatus(status) predicateStatus = append(predicateStatus, nodeVolumeStatus) + return api.NewFitErrWithStatus(task, node, predicateStatus...) } // Check VolumeZone if predicate.volumeZoneEnable { status := volumeZoneFilter.Filter(context.TODO(), state, task.Pod, nodeInfo) - volumeZoneStatus := framework.ConvertPredicateStatus(status) + volumeZoneStatus := api.ConvertPredicateStatus(status) if volumeZoneStatus.Code != api.Success { predicateStatus = append(predicateStatus, volumeZoneStatus) - return predicateStatus, fmt.Errorf("plugin %s predicates failed %s", volumeZoneFilter.Name(), status.Message()) + return api.NewFitErrWithStatus(task, node, predicateStatus...) 
} } @@ -496,25 +500,25 @@ func (pp *predicatesPlugin) OnSessionOpen(ssn *framework.Session) { isSkipPodTopologySpreadFilter := handleSkipPredicatePlugin(task, skipPlugins, podTopologySpreadFilter.Name(), node) if !isSkipPodTopologySpreadFilter { status := podTopologySpreadFilter.Filter(context.TODO(), state, task.Pod, nodeInfo) - podTopologyStatus := framework.ConvertPredicateStatus(status) + podTopologyStatus := api.ConvertPredicateStatus(status) if podTopologyStatus.Code != api.Success { predicateStatus = append(predicateStatus, podTopologyStatus) - return predicateStatus, fmt.Errorf("plugin %s predicates failed %s", podTopologySpreadFilter.Name(), status.Message()) + return api.NewFitErrWithStatus(task, node, predicateStatus...) } } } if predicate.proportionalEnable { // Check ProportionalPredicate - proportionalStatus, err := checkNodeResourceIsProportional(task, node, predicate.proportional) + proportionalStatus, _ := checkNodeResourceIsProportional(task, node, predicate.proportional) if proportionalStatus.Code != api.Success { predicateStatus = append(predicateStatus, proportionalStatus) - return predicateStatus, err + return api.NewFitErrWithStatus(task, node, predicateStatus...) } klog.V(4).Infof("checkNodeResourceIsProportional predicates Task <%s/%s> on Node <%s>: fit %v", task.Namespace, task.Name, node.Name, fit) } - return predicateStatus, nil + return nil }) } diff --git a/pkg/scheduler/plugins/predicates/proportional.go b/pkg/scheduler/plugins/predicates/proportional.go index 65322274e5..1bf17e5f09 100644 --- a/pkg/scheduler/plugins/predicates/proportional.go +++ b/pkg/scheduler/plugins/predicates/proportional.go @@ -27,7 +27,8 @@ import ( // checkNodeResourceIsProportional checks if a gpu:cpu:memory is Proportional func checkNodeResourceIsProportional(task *api.TaskInfo, node *api.NodeInfo, proportional map[v1.ResourceName]baseResource) (*api.Status, error) { status := &api.Status{ - Code: api.Success, + Code: api.Success, + Plugin: ProportionalPredicate, } for resourceName := range proportional { if value, found := task.Resreq.ScalarResources[resourceName]; found && value > 0 { diff --git a/pkg/scheduler/plugins/tdm/tdm.go b/pkg/scheduler/plugins/tdm/tdm.go index e0d19af2e6..cf91b8219e 100644 --- a/pkg/scheduler/plugins/tdm/tdm.go +++ b/pkg/scheduler/plugins/tdm/tdm.go @@ -143,30 +143,31 @@ func (tp *tdmPlugin) OnSessionOpen(ssn *framework.Session) { }() // tdm plugin just handle revocable node - predicateFn := func(task *api.TaskInfo, node *api.NodeInfo) ([]*api.Status, error) { - predicateStatus := make([]*api.Status, 0) - tdmStatus := &api.Status{} + predicateFn := func(task *api.TaskInfo, node *api.NodeInfo) error { + tdmStatus := &api.Status{ + Plugin: PluginName, + } + predicateStatus := []*api.Status{tdmStatus} if node.RevocableZone == "" { - return predicateStatus, nil + return nil } if err := tp.availableRevocableZone(node.RevocableZone); err != nil { tdmStatus.Code = api.UnschedulableAndUnresolvable - tdmStatus.Reason = fmt.Sprintf("plugin %s predicates %v", tp.Name(), err) - return predicateStatus, fmt.Errorf("plugin %s predicates %v", tp.Name(), err) + tdmStatus.Reason = err.Error() + return api.NewFitErrWithStatus(task, node, predicateStatus...) 
} klog.V(4).Infof("TDM node %v revocable zone %v:%v is active", node.Name, node.RevocableZone, tp.revocableZone[node.RevocableZone]) if len(task.RevocableZone) == 0 { - msg := fmt.Sprintf("task %s/%s is not allow to dispatch to revocable node %s", task.Namespace, task.Name, node.Name) - return predicateStatus, fmt.Errorf("plugin %s predicates %s", tp.Name(), msg) + tdmStatus.Code = api.UnschedulableAndUnresolvable + tdmStatus.Reason = "not allow to dispatch to revocable node" + return api.NewFitErrWithStatus(task, node, predicateStatus...) } - tdmStatus.Code = api.Success - predicateStatus = append(predicateStatus, tdmStatus) klog.V(4).Infof("TDM filter for Task %s/%s on node %s pass.", task.Namespace, task.Name, node.Name) - return predicateStatus, nil + return nil } // tdm plugin just handle revocable node diff --git a/pkg/scheduler/plugins/tdm/tdm_test.go b/pkg/scheduler/plugins/tdm/tdm_test.go index 12281cc357..fbc6e0ff10 100644 --- a/pkg/scheduler/plugins/tdm/tdm_test.go +++ b/pkg/scheduler/plugins/tdm/tdm_test.go @@ -237,20 +237,10 @@ func Test_TDM(t *testing.T) { predicatedNode := make([]*api.NodeInfo, 0) for _, node := range ssn.Nodes { - predicateStatus, err := ssn.PredicateFn(task, node) + err := ssn.PredicateFn(task, node) if err != nil { continue } - predicateIsSuccess := true - for _, status := range predicateStatus { - if status != nil && status.Code != api.Success { - predicateIsSuccess = false - break - } - } - if predicateIsSuccess == false { - continue - } predicatedNode = append(predicatedNode, node) } diff --git a/pkg/scheduler/plugins/usage/usage.go b/pkg/scheduler/plugins/usage/usage.go index 3c89ad51a8..7140eebc45 100644 --- a/pkg/scheduler/plugins/usage/usage.go +++ b/pkg/scheduler/plugins/usage/usage.go @@ -17,7 +17,6 @@ limitations under the License. package usage import ( - "fmt" "time" "volcano.sh/volcano/pkg/scheduler/metrics/source" @@ -124,18 +123,16 @@ func (up *usagePlugin) OnSessionOpen(ssn *framework.Session) { } } - predicateFn := func(task *api.TaskInfo, node *api.NodeInfo) ([]*api.Status, error) { + predicateFn := func(task *api.TaskInfo, node *api.NodeInfo) error { predicateStatus := make([]*api.Status, 0) - usageStatus := &api.Status{} + usageStatus := &api.Status{Plugin: PluginName} now := time.Now() if up.period == "" || now.Sub(node.ResourceUsage.MetricsTime) > MetricsActiveTime { klog.V(4).Infof("The period(%s) is empty or the usage metrics data is not updated for more than %v minutes, "+ "Usage plugin filter for task %s/%s on node %s pass, metrics time is %v. ", up.period, MetricsActiveTime, task.Namespace, task.Name, node.Name, node.ResourceUsage.MetricsTime) - usageStatus.Code = api.Success - predicateStatus = append(predicateStatus, usageStatus) - return predicateStatus, nil + return nil } klog.V(4).Infof("predicateFn cpuUsageAvg:%v,predicateFn memUsageAvg:%v", up.cpuThresholds, up.memThresholds) @@ -144,18 +141,18 @@ func (up *usagePlugin) OnSessionOpen(ssn *framework.Session) { usageStatus.Code = api.UnschedulableAndUnresolvable usageStatus.Reason = NodeUsageCPUExtend predicateStatus = append(predicateStatus, usageStatus) - return predicateStatus, fmt.Errorf("plugin %s predicates failed, because of %s", up.Name(), NodeUsageCPUExtend) + return api.NewFitErrWithStatus(task, node, predicateStatus...) 
} if node.ResourceUsage.MEMUsageAvg[up.period] > up.memThresholds { klog.V(3).Infof("Node %s mem usage %f exceeds the threshold %f", node.Name, node.ResourceUsage.MEMUsageAvg[up.period], up.memThresholds) usageStatus.Code = api.UnschedulableAndUnresolvable usageStatus.Reason = NodeUsageMemoryExtend predicateStatus = append(predicateStatus, usageStatus) - return predicateStatus, fmt.Errorf("plugin %s predicates failed, because of %s", up.Name(), NodeUsageMemoryExtend) + return api.NewFitErrWithStatus(task, node, predicateStatus...) } klog.V(4).Infof("Usage plugin filter for task %s/%s on node %s pass.", task.Namespace, task.Name, node.Name) - return predicateStatus, nil + return nil } nodeOrderFn := func(task *api.TaskInfo, node *api.NodeInfo) (float64, error) { diff --git a/pkg/scheduler/plugins/usage/usage_test.go b/pkg/scheduler/plugins/usage/usage_test.go index a679d4b22d..56bc3d4fc2 100644 --- a/pkg/scheduler/plugins/usage/usage_test.go +++ b/pkg/scheduler/plugins/usage/usage_test.go @@ -19,21 +19,18 @@ package usage import ( "fmt" "math" - "reflect" + "strings" "testing" "time" - "github.com/agiledragon/gomonkey/v2" v1 "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/client-go/tools/record" schedulingv1 "volcano.sh/apis/pkg/apis/scheduling/v1beta1" "volcano.sh/volcano/pkg/scheduler/api" - "volcano.sh/volcano/pkg/scheduler/cache" "volcano.sh/volcano/pkg/scheduler/conf" "volcano.sh/volcano/pkg/scheduler/framework" "volcano.sh/volcano/pkg/scheduler/metrics/source" + "volcano.sh/volcano/pkg/scheduler/uthelper" "volcano.sh/volcano/pkg/scheduler/util" ) @@ -63,14 +60,7 @@ func updateNodeUsage(nodesInfo map[string]*api.NodeInfo, nodesUsage map[string]* } func TestUsage_predicateFn(t *testing.T) { - var tmp *cache.SchedulerCache - patchUpdateQueueStatus := gomonkey.ApplyMethod(reflect.TypeOf(tmp), "UpdateQueueStatus", func(scCache *cache.SchedulerCache, queue *api.QueueInfo) error { - return nil - }) - defer patchUpdateQueueStatus.Reset() - - framework.RegisterPluginBuilder(PluginName, New) - defer framework.CleanupPluginBuilders() + plugins := map[string]framework.PluginBuilder{PluginName: New} p1 := util.BuildPod("c1", "p1", "", v1.PodPending, api.BuildResourceList("1", "1Gi"), "pg1", make(map[string]string), make(map[string]string)) p2 := util.BuildPod("c1", "p2", "", v1.PodPending, api.BuildResourceList("1", "1Gi"), "pg1", make(map[string]string), make(map[string]string)) @@ -101,47 +91,23 @@ func TestUsage_predicateFn(t *testing.T) { // The node can schedule pods. 
nodesUsage[n5.Name] = buildNodeUsage(map[string]float64{source.NODE_METRICS_PERIOD: 90}, map[string]float64{source.NODE_METRICS_PERIOD: 81}, time.Time{}) - pg1 := &schedulingv1.PodGroup{ - ObjectMeta: metav1.ObjectMeta{ - Name: "pg1", - Namespace: "c1", - }, - Spec: schedulingv1.PodGroupSpec{ - Queue: "q1", - }, - } - queue1 := &schedulingv1.Queue{ - ObjectMeta: metav1.ObjectMeta{ - Name: "q1", - }, - Spec: schedulingv1.QueueSpec{ - Weight: 1, - }, - } + pg1 := util.BuildPodGroup("pg1", "c1", "q1", 0, nil, "") + + queue1 := util.BuildQueue("q1", 1, nil) tests := []struct { - name string - podGroups []*schedulingv1.PodGroup - queues []*schedulingv1.Queue - pods []*v1.Pod - nodes []*v1.Node + uthelper.TestCommonStruct nodesUsageMap map[string]*api.NodeUsage arguments framework.Arguments expected predicateResult }{ { - name: "The node cannot be scheduled, because of the CPU load of the node exceeds the upper limit.", - podGroups: []*schedulingv1.PodGroup{ - pg1, - }, - queues: []*schedulingv1.Queue{ - queue1, - }, - pods: []*v1.Pod{ - p1, p2, - }, - nodes: []*v1.Node{ - n1, + TestCommonStruct: uthelper.TestCommonStruct{ + Name: "The node cannot be scheduled, because of the CPU load of the node exceeds the upper limit.", + PodGroups: []*schedulingv1.PodGroup{pg1}, + Queues: []*schedulingv1.Queue{queue1}, + Pods: []*v1.Pod{p1, p2}, + Nodes: []*v1.Node{n1}, }, nodesUsageMap: nodesUsage, arguments: framework.Arguments{ @@ -164,18 +130,12 @@ func TestUsage_predicateFn(t *testing.T) { }, }, { - name: "The node cannot be scheduled, because of the memory load of the node exceeds the upper limit.", - podGroups: []*schedulingv1.PodGroup{ - pg1, - }, - queues: []*schedulingv1.Queue{ - queue1, - }, - pods: []*v1.Pod{ - p1, p2, - }, - nodes: []*v1.Node{ - n2, + TestCommonStruct: uthelper.TestCommonStruct{ + Name: "The node cannot be scheduled, because of the memory load of the node exceeds the upper limit.", + PodGroups: []*schedulingv1.PodGroup{pg1}, + Queues: []*schedulingv1.Queue{queue1}, + Pods: []*v1.Pod{p1, p2}, + Nodes: []*v1.Node{n2}, }, nodesUsageMap: nodesUsage, arguments: framework.Arguments{ @@ -198,18 +158,12 @@ func TestUsage_predicateFn(t *testing.T) { }, }, { - name: "The node can be scheduled, because of the CPU usage and memory usage do not exceed the upper limit.", - podGroups: []*schedulingv1.PodGroup{ - pg1, - }, - queues: []*schedulingv1.Queue{ - queue1, - }, - pods: []*v1.Pod{ - p1, p2, - }, - nodes: []*v1.Node{ - n3, + TestCommonStruct: uthelper.TestCommonStruct{ + Name: "The node can be scheduled, because of the CPU usage and memory usage do not exceed the upper limit.", + PodGroups: []*schedulingv1.PodGroup{pg1}, + Queues: []*schedulingv1.Queue{queue1}, + Pods: []*v1.Pod{p1, p2}, + Nodes: []*v1.Node{n3}, }, nodesUsageMap: nodesUsage, arguments: framework.Arguments{ @@ -232,18 +186,12 @@ func TestUsage_predicateFn(t *testing.T) { }, }, { - name: "The node can be scheduled, because of the metrics are not updated in the latest 5 minutes, and the usage function is invalid.", - podGroups: []*schedulingv1.PodGroup{ - pg1, - }, - queues: []*schedulingv1.Queue{ - queue1, - }, - pods: []*v1.Pod{ - p1, p2, - }, - nodes: []*v1.Node{ - n4, + TestCommonStruct: uthelper.TestCommonStruct{ + Name: "The node can be scheduled, because of the metrics are not updated in the latest 5 minutes, and the usage function is invalid.", + PodGroups: []*schedulingv1.PodGroup{pg1}, + Queues: []*schedulingv1.Queue{queue1}, + Pods: []*v1.Pod{p1, p2}, + Nodes: []*v1.Node{n4}, }, nodesUsageMap: nodesUsage, 
arguments: framework.Arguments{ @@ -266,18 +214,12 @@ func TestUsage_predicateFn(t *testing.T) { }, }, { - name: "The node can be scheduled, because of the metric time is in the initial state, and the usage function is invalid.", - podGroups: []*schedulingv1.PodGroup{ - pg1, - }, - queues: []*schedulingv1.Queue{ - queue1, - }, - pods: []*v1.Pod{ - p1, p2, - }, - nodes: []*v1.Node{ - n5, + TestCommonStruct: uthelper.TestCommonStruct{ + Name: "The node can be scheduled, because of the metric time is in the initial state, and the usage function is invalid.", + PodGroups: []*schedulingv1.PodGroup{pg1}, + Queues: []*schedulingv1.Queue{queue1}, + Pods: []*v1.Pod{p1, p2}, + Nodes: []*v1.Node{n5}, }, nodesUsageMap: nodesUsage, arguments: framework.Arguments{ @@ -302,33 +244,9 @@ func TestUsage_predicateFn(t *testing.T) { } for i, test := range tests { - t.Run(test.name, func(t *testing.T) { - schedulerCache := &cache.SchedulerCache{ - Nodes: make(map[string]*api.NodeInfo), - Jobs: make(map[api.JobID]*api.JobInfo), - Queues: make(map[api.QueueID]*api.QueueInfo), - StatusUpdater: &util.FakeStatusUpdater{}, - VolumeBinder: &util.FakeVolumeBinder{}, - - Recorder: record.NewFakeRecorder(100), - } - - for _, node := range test.nodes { - schedulerCache.AddOrUpdateNode(node) - } - for _, pod := range test.pods { - schedulerCache.AddPod(pod) - } - for _, ss := range test.podGroups { - schedulerCache.AddPodGroupV1beta1(ss) - } - for _, q := range test.queues { - schedulerCache.AddQueueV1beta1(q) - } - updateNodeUsage(schedulerCache.Nodes, nodesUsage) - + t.Run(test.Name, func(t *testing.T) { trueValue := true - ssn := framework.OpenSession(schedulerCache, []conf.Tier{ + tiers := []conf.Tier{ { Plugins: []conf.PluginOption{ { @@ -338,29 +256,48 @@ func TestUsage_predicateFn(t *testing.T) { }, }, }, - }, nil) - defer framework.CloseSession(ssn) + } + test.Plugins = plugins + ssn := test.RegisterSession(tiers, nil) + defer test.Close() + + updateNodeUsage(ssn.Nodes, nodesUsage) for _, job := range ssn.Jobs { for _, task := range job.Tasks { taskID := fmt.Sprintf("%s/%s", task.Namespace, task.Name) for _, node := range ssn.Nodes { - predicateStatus, err := ssn.PredicateFn(task, node) + err := ssn.PredicateFn(task, node) if (test.expected.err == nil || err == nil) && test.expected.err != err { t.Errorf("case%d: task %s on node %s has error, expect: %v, actual: %v", i, taskID, node.Name, test.expected.err, err) continue } - if test.expected.err != nil && test.expected.err.Error() != err.Error() { - t.Errorf("case%d: task %s on node %s has error, expect: %v, actual: %v", - i, taskID, node.Name, test.expected.err, err) + if err == nil { + continue + } + + fitErr := err.(*api.FitError) + predicateStatus := fitErr.Status + + errString := func(fe *api.FitError) string { + if len(fe.Status) > 0 { + return fmt.Sprintf("plugin %s predicates failed, because of %s", fe.Status[0].Plugin, strings.Join(fe.Reasons(), ", ")) + } + return strings.Join(fe.Reasons(), ", ") + } + + errStr := errString(fitErr) + if test.expected.err != nil && test.expected.err.Error() != errStr { + t.Errorf("case%d: task %s on node %s has error:\nexpect: %v\nactual: %v", + i, taskID, node.Name, test.expected.err, errStr) continue } for index := range predicateStatus { if predicateStatus[index].Code != test.expected.predicateStatus[index].Code || predicateStatus[index].Reason != test.expected.predicateStatus[index].Reason { - t.Errorf("case%d: task %s on node %s has error, expect: %v, actual: %v", + t.Errorf("case%d: task %s on node %s has 
error:\nexpect: %v\nactual: %v", i, taskID, node.Name, test.expected.predicateStatus[index], predicateStatus[index]) continue } @@ -373,14 +310,7 @@ func TestUsage_predicateFn(t *testing.T) { } func TestUsage_nodeOrderFn(t *testing.T) { - var tmp *cache.SchedulerCache - patchUpdateQueueStatus := gomonkey.ApplyMethod(reflect.TypeOf(tmp), "UpdateQueueStatus", func(scCache *cache.SchedulerCache, queue *api.QueueInfo) error { - return nil - }) - defer patchUpdateQueueStatus.Reset() - - framework.RegisterPluginBuilder(PluginName, New) - defer framework.CleanupPluginBuilders() + plugins := map[string]framework.PluginBuilder{PluginName: New} p1 := util.BuildPod("c1", "p1", "", v1.PodPending, api.BuildResourceList("1", "1Gi"), "pg1", make(map[string]string), make(map[string]string)) @@ -402,47 +332,31 @@ func TestUsage_nodeOrderFn(t *testing.T) { // The node score is 0. nodesUsage[n5.Name] = buildNodeUsage(map[string]float64{source.NODE_METRICS_PERIOD: 0}, map[string]float64{source.NODE_METRICS_PERIOD: 0}, time.Time{}) - pg1 := &schedulingv1.PodGroup{ - ObjectMeta: metav1.ObjectMeta{ - Name: "pg1", - Namespace: "c1", - }, - Spec: schedulingv1.PodGroupSpec{ - Queue: "q1", - }, - } - queue1 := &schedulingv1.Queue{ - ObjectMeta: metav1.ObjectMeta{ - Name: "q1", - }, - Spec: schedulingv1.QueueSpec{ - Weight: 1, - }, - } + pg1 := util.BuildPodGroup("pg1", "c1", "q1", 0, nil, "") + + queue1 := util.BuildQueue("q1", 1, nil) tests := []struct { - name string - podGroups []*schedulingv1.PodGroup - queues []*schedulingv1.Queue - pods []*v1.Pod - nodes []*v1.Node + uthelper.TestCommonStruct nodesUsageMap map[string]*api.NodeUsage arguments framework.Arguments expected map[string]map[string]float64 }{ { - name: "Node scoring in the default weight configuration scenario.", - podGroups: []*schedulingv1.PodGroup{ - pg1, - }, - queues: []*schedulingv1.Queue{ - queue1, - }, - pods: []*v1.Pod{ - p1, - }, - nodes: []*v1.Node{ - n1, n2, n3, n4, n5, + TestCommonStruct: uthelper.TestCommonStruct{ + Name: "Node scoring in the default weight configuration scenario.", + PodGroups: []*schedulingv1.PodGroup{ + pg1, + }, + Queues: []*schedulingv1.Queue{ + queue1, + }, + Pods: []*v1.Pod{ + p1, + }, + Nodes: []*v1.Node{ + n1, n2, n3, n4, n5, + }, }, nodesUsageMap: nodesUsage, arguments: framework.Arguments{ @@ -465,18 +379,20 @@ func TestUsage_nodeOrderFn(t *testing.T) { }, }, { - name: "Node scoring gives priority to memory resources", - podGroups: []*schedulingv1.PodGroup{ - pg1, - }, - queues: []*schedulingv1.Queue{ - queue1, - }, - pods: []*v1.Pod{ - p1, - }, - nodes: []*v1.Node{ - n1, n2, n3, n4, n5, + TestCommonStruct: uthelper.TestCommonStruct{ + Name: "Node scoring gives priority to memory resources", + PodGroups: []*schedulingv1.PodGroup{ + pg1, + }, + Queues: []*schedulingv1.Queue{ + queue1, + }, + Pods: []*v1.Pod{ + p1, + }, + Nodes: []*v1.Node{ + n1, n2, n3, n4, n5, + }, }, nodesUsageMap: nodesUsage, arguments: framework.Arguments{ @@ -501,33 +417,9 @@ func TestUsage_nodeOrderFn(t *testing.T) { } for i, test := range tests { - t.Run(test.name, func(t *testing.T) { - schedulerCache := &cache.SchedulerCache{ - Nodes: make(map[string]*api.NodeInfo), - Jobs: make(map[api.JobID]*api.JobInfo), - Queues: make(map[api.QueueID]*api.QueueInfo), - StatusUpdater: &util.FakeStatusUpdater{}, - VolumeBinder: &util.FakeVolumeBinder{}, - - Recorder: record.NewFakeRecorder(100), - } - - for _, node := range test.nodes { - schedulerCache.AddOrUpdateNode(node) - } - for _, pod := range test.pods { - schedulerCache.AddPod(pod) - } - 
for _, ss := range test.podGroups { - schedulerCache.AddPodGroupV1beta1(ss) - } - for _, q := range test.queues { - schedulerCache.AddQueueV1beta1(q) - } - updateNodeUsage(schedulerCache.Nodes, nodesUsage) - + t.Run(test.Name, func(t *testing.T) { trueValue := true - ssn := framework.OpenSession(schedulerCache, []conf.Tier{ + tiers := []conf.Tier{ { Plugins: []conf.PluginOption{ { @@ -537,8 +429,12 @@ func TestUsage_nodeOrderFn(t *testing.T) { }, }, }, - }, nil) - defer framework.CloseSession(ssn) + } + test.Plugins = plugins + ssn := test.RegisterSession(tiers, nil) + defer test.Close() + + updateNodeUsage(ssn.Nodes, nodesUsage) for _, job := range ssn.Jobs { for _, task := range job.Tasks { diff --git a/pkg/scheduler/util/predicate_helper.go b/pkg/scheduler/util/predicate_helper.go index b45f27097f..acd6065bd4 100644 --- a/pkg/scheduler/util/predicate_helper.go +++ b/pkg/scheduler/util/predicate_helper.go @@ -70,7 +70,7 @@ func (ph *predicateHelper) PredicateNodes(task *api.TaskInfo, nodes []*api.NodeI } // TODO (k82cn): Enable eCache for performance improvement. - if _, err := fn(task, node); err != nil { + if err := fn(task, node); err != nil { klog.V(3).Infof("Predicates failed: %v", err) errorLock.Lock() nodeErrorCache[node.Name] = err
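
Note on the new predicate contract shown throughout this diff: a plugin predicate now returns a single `error`. A failing node is reported as a `*api.FitError` built with `api.NewFitErrWithStatus`, carrying one `*api.Status` (code, reason, plugin) per failed check, and a node that passes simply returns `nil`. Actions no longer inspect status slices themselves: allocate/backfill go through `ssn.PredicateForAllocateAction`, which treats Unschedulable, UnschedulableAndUnresolvable and Error/Skip/Wait codes as failures, while preempt/reclaim go through `ssn.PredicateForPreemptAction`, which only gives up on UnschedulableAndUnresolvable and Error/Skip/Wait, mirroring kube-scheduler's preemption filtering. The sketch below illustrates the plugin side under that contract; the plugin name, the `nodeIsCordoned` helper and the cordon check itself are illustrative assumptions, not part of this change.

```go
package toy

import (
	"volcano.sh/volcano/pkg/scheduler/api"
	"volcano.sh/volcano/pkg/scheduler/framework"
)

// PluginName is the name of this illustrative plugin (not part of the PR).
const PluginName = "toy"

type toyPlugin struct{}

// New is the plugin builder that would be registered with the framework.
func New(_ framework.Arguments) framework.Plugin { return &toyPlugin{} }

func (tp *toyPlugin) Name() string { return PluginName }

func (tp *toyPlugin) OnSessionOpen(ssn *framework.Session) {
	ssn.AddPredicateFn(tp.Name(), func(task *api.TaskInfo, node *api.NodeInfo) error {
		// A failing check returns a FitError recording which plugin failed, why,
		// and with which status code; the session-level helpers
		// (PredicateForAllocateAction / PredicateForPreemptAction) later decide
		// whether that code disqualifies the node for the current action.
		if nodeIsCordoned(node) {
			return api.NewFitErrWithStatus(task, node, &api.Status{
				Code:   api.UnschedulableAndUnresolvable,
				Reason: "node is cordoned",
				Plugin: PluginName,
			})
		}
		// A node that passes every check returns nil; no Success status needs
		// to be appended under the new contract.
		return nil
	})
}

func (tp *toyPlugin) OnSessionClose(_ *framework.Session) {}

// nodeIsCordoned is a hypothetical helper used only for this sketch.
func nodeIsCordoned(node *api.NodeInfo) bool {
	return node.Node != nil && node.Node.Spec.Unschedulable
}
```

Because the FitError carries the per-plugin statuses, callers such as the nodegroup and usage tests above can recover the individual codes via `err.(*api.FitError).Status` instead of the old `[]*api.Status` return value.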