diff --git a/cmd/controller-manager/app/options/options.go b/cmd/controller-manager/app/options/options.go index 4c51f8ba330..4ed6754f100 100644 --- a/cmd/controller-manager/app/options/options.go +++ b/cmd/controller-manager/app/options/options.go @@ -22,6 +22,7 @@ import ( "strings" "github.com/spf13/pflag" + "k8s.io/apimachinery/pkg/util/errors" "k8s.io/apimachinery/pkg/util/validation/field" "k8s.io/component-base/config" componentbaseconfigvalidation "k8s.io/component-base/config/validation" @@ -120,14 +121,23 @@ func (s *ServerOption) AddFlags(fs *pflag.FlagSet, knownControllers []string) { "'-' to disable controllers, e.g. \"-job-controller,-queue-controller\" to disable job and queue controllers.", knownControllers)) } -// CheckOptionOrDie checks the option and returns error if it's invalid +// CheckOptionOrDie checks all options and returns an error if any of them are invalid. +// All validation errors are aggregated and returned together. func (s *ServerOption) CheckOptionOrDie() error { - // check controllers option + var allErrors []error + + // Check controllers option if err := s.checkControllers(); err != nil { - return err + allErrors = append(allErrors, err) + } + + // Check leader election flag when LeaderElection is enabled. + leaderElectionErr := componentbaseconfigvalidation.ValidateLeaderElectionConfiguration( + &s.LeaderElection, field.NewPath("leaderElection")).ToAggregate() + if leaderElectionErr != nil { + allErrors = append(allErrors, leaderElectionErr) } - // check leader election flag when LeaderElection is enabled. - return componentbaseconfigvalidation.ValidateLeaderElectionConfiguration(&s.LeaderElection, field.NewPath("leaderElection")).ToAggregate() + return errors.NewAggregate(allErrors) } // checkControllers checks the controllers option and returns error if it's invalid diff --git a/pkg/scheduler/api/resource_info.go b/pkg/scheduler/api/resource_info.go index 22611ac39c5..3f6c9f46818 100644 --- a/pkg/scheduler/api/resource_info.go +++ b/pkg/scheduler/api/resource_info.go @@ -429,6 +429,42 @@ func (r *Resource) LessEqual(rr *Resource, defaultValue DimensionDefaultValue) b return true } +// LessEqualWithDimension only compares the resource dimensions that are set in req +// @param req defines which resource dimensions to compare +// if req is nil, this is equivalent to r.LessEqual(rr, Zero) +func (r *Resource) LessEqualWithDimension(rr *Resource, req *Resource) bool { + if r == nil { + return true + } + if rr == nil { + return false + } + if req == nil { + return r.LessEqual(rr, Zero) + } + + if req.MilliCPU > 0 && r.MilliCPU > rr.MilliCPU { + return false + } + if req.Memory > 0 && r.Memory > rr.Memory { + return false + } + + // if r.ScalarResources is nil, r is less than or equal to rr regardless of rr.ScalarResources + if r.ScalarResources == nil { + return true + } + + for name, quant := range req.ScalarResources { + rQuant := r.ScalarResources[name] + rrQuant := rr.ScalarResources[name] + if quant > 0 && rQuant > rrQuant { + return false + } + } + return true +} + // LessEqualWithResourcesName returns true, []string{} only on condition that all dimensions of resources in r are less than or equal with that of rr, // Otherwise returns false and err string ,which show what resources are insufficient. // @param defaultValue "default value for resource dimension not defined in ScalarResources.
Its value can only be one of 'Zero' and 'Infinity'" diff --git a/pkg/scheduler/api/resource_info_test.go b/pkg/scheduler/api/resource_info_test.go index 49edc13e094..8ca0a3b5c99 100644 --- a/pkg/scheduler/api/resource_info_test.go +++ b/pkg/scheduler/api/resource_info_test.go @@ -753,6 +753,96 @@ func TestLessEqual(t *testing.T) { } } +func TestLessEqualWithDimension(t *testing.T) { + tests := []struct { + resource1 *Resource + resource2 *Resource + req *Resource + expected bool + }{ + { + resource1: &Resource{}, + resource2: &Resource{}, + req: nil, + expected: true, + }, + { + resource1: &Resource{MilliCPU: 5000}, + resource2: &Resource{ + MilliCPU: 4000, + Memory: 2000, + ScalarResources: map[v1.ResourceName]float64{"scalar.test/scalar1": 1000}, + }, + req: &Resource{MilliCPU: 1000}, + expected: false, + }, + { + resource1: &Resource{MilliCPU: 3000, Memory: 3000}, + resource2: &Resource{MilliCPU: 4000, Memory: 2000}, + req: &Resource{Memory: 1000}, + expected: false, + }, + { + resource1: &Resource{ + MilliCPU: 4, + Memory: 4000, + }, + resource2: &Resource{}, + req: &Resource{}, + expected: true, + }, + { + resource1: &Resource{ + MilliCPU: 4, Memory: 4000, + ScalarResources: map[v1.ResourceName]float64{"nvidia.com/gpu": 1}, + }, + resource2: &Resource{MilliCPU: 8, Memory: 8000}, + req: &Resource{ + MilliCPU: 4, Memory: 2000, + ScalarResources: map[v1.ResourceName]float64{"nvidia.com/gpu": 1}, + }, + expected: false, + }, + { + resource1: &Resource{ + MilliCPU: 10, Memory: 4000, + ScalarResources: map[v1.ResourceName]float64{"nvidia.com/gpu": 1}, + }, + resource2: &Resource{ + MilliCPU: 100, Memory: 8000, + ScalarResources: map[v1.ResourceName]float64{"nvidia.com/A100": 1}, + }, + req: &Resource{ + MilliCPU: 10, Memory: 4000, + ScalarResources: map[v1.ResourceName]float64{"nvidia.com/gpu": 0, "nvidia.com/A100": 1, "scalar": 1}, + }, + expected: true, + }, + { + resource1: &Resource{ + MilliCPU: 110, Memory: 4000, + ScalarResources: map[v1.ResourceName]float64{"nvidia.com/gpu": 1, "nvidia.com/A100": 1, "scalar": 1}, + }, + resource2: &Resource{ + MilliCPU: 100, Memory: 8000, + ScalarResources: map[v1.ResourceName]float64{"nvidia.com/A100": 1, "scalar": 1}, + }, + req: &Resource{ + Memory: 4000, + ScalarResources: map[v1.ResourceName]float64{"nvidia.com/gpu": 0, "nvidia.com/A100": 1, "scalar": 1}, + }, + expected: true, + }, + } + + for i, test := range tests { + flag := test.resource1.LessEqualWithDimension(test.resource2, test.req) + if !equality.Semantic.DeepEqual(test.expected, flag) { + t.Errorf("Case %v: expected: %#v, got: %#v", i, test.expected, flag) + } + } +} + func TestLessPartly(t *testing.T) { testsForDefaultZero := []struct { resource1 *Resource diff --git a/pkg/scheduler/cache/cache.go b/pkg/scheduler/cache/cache.go index 6e46d41f133..d03ad6f1623 100644 --- a/pkg/scheduler/cache/cache.go +++ b/pkg/scheduler/cache/cache.go @@ -183,9 +183,8 @@ type DefaultBinder struct { } // Bind will send bind request to api server -func (db *DefaultBinder) Bind(kubeClient kubernetes.Interface, tasks []*schedulingapi.TaskInfo) ([]*schedulingapi.TaskInfo, []error) { - var errTasks []*schedulingapi.TaskInfo - var errs []error +func (db *DefaultBinder) Bind(kubeClient kubernetes.Interface, tasks []*schedulingapi.TaskInfo) map[schedulingapi.TaskID]string { + errMsg := make(map[schedulingapi.TaskID]string) for _, task := range tasks { p := task.Pod if err := db.kubeclient.CoreV1().Pods(p.Namespace).Bind(context.TODO(), @@ -198,19 +197,13 @@ func (db *DefaultBinder) Bind(kubeClient 
kubernetes.Interface, tasks []*scheduli }, metav1.CreateOptions{}); err != nil { klog.Errorf("Failed to bind pod <%v/%v> to node %s : %#v", p.Namespace, p.Name, task.NodeName, err) - errTasks = append(errTasks, task) - errs = append(errs, err) + errMsg[task.UID] = err.Error() } else { - db.recorder.Eventf(task.Pod, v1.EventTypeNormal, "Scheduled", "Successfully assigned %v/%v to %v", task.Namespace, task.Name, task.NodeName) metrics.UpdateTaskScheduleDuration(metrics.Duration(p.CreationTimestamp.Time)) // update metrics as soon as pod is bind } } - if len(errTasks) > 0 { - return errTasks, errs - } - - return nil, nil + return errMsg } // NewDefaultBinder create binder with kube client and event recorder, support fake binder if passed fake client and fake event recorder @@ -892,12 +885,18 @@ func (sc *SchedulerCache) Evict(taskInfo *schedulingapi.TaskInfo, reason string) // Bind binds task to the target host. func (sc *SchedulerCache) Bind(tasks []*schedulingapi.TaskInfo) { tmp := time.Now() - errTasks, errs := sc.Binder.Bind(sc.kubeClient, tasks) - if errs == nil { + errMsg := sc.Binder.Bind(sc.kubeClient, tasks) + if len(errMsg) == 0 { klog.V(3).Infof("bind ok, latency %v", time.Since(tmp)) } else { - for i, task := range errTasks { - unschedulableMsg := fmt.Sprintf("failed to bind to node %s: %s", task.NodeName, errs[i]) + klog.V(3).Infof("There are %d tasks in total and %d binds failed, latency %v", len(tasks), len(errMsg), time.Since(tmp)) + } + + for _, task := range tasks { + if reason, ok := errMsg[task.UID]; !ok { + sc.Recorder.Eventf(task.Pod, v1.EventTypeNormal, "Scheduled", "Successfully assigned %v/%v to %v", task.Namespace, task.Name, task.NodeName) + } else { + unschedulableMsg := fmt.Sprintf("failed to bind to node %s: %s", task.NodeName, reason) if err := sc.taskUnschedulable(task, schedulingapi.PodReasonSchedulerError, unschedulableMsg, ""); err != nil { klog.ErrorS(err, "Failed to update pod status when bind task error", "task", task.Name) } diff --git a/pkg/scheduler/cache/interface.go b/pkg/scheduler/cache/interface.go index b22d25647f9..94ab18f373a 100644 --- a/pkg/scheduler/cache/interface.go +++ b/pkg/scheduler/cache/interface.go @@ -99,7 +99,7 @@ type VolumeBinder interface { // Binder interface for binding task and hostname type Binder interface { - Bind(kubeClient kubernetes.Interface, tasks []*api.TaskInfo) ([]*api.TaskInfo, []error) + Bind(kubeClient kubernetes.Interface, tasks []*api.TaskInfo) map[api.TaskID]string } // Evictor interface for evict pods diff --git a/pkg/scheduler/metrics/metrics.go b/pkg/scheduler/metrics/metrics.go index f4acc7593b1..73e6854717f 100644 --- a/pkg/scheduler/metrics/metrics.go +++ b/pkg/scheduler/metrics/metrics.go @@ -40,7 +40,7 @@ var ( Subsystem: VolcanoNamespace, Name: "e2e_scheduling_latency_milliseconds", Help: "E2e scheduling latency in milliseconds (scheduling algorithm + binding)", - Buckets: prometheus.ExponentialBuckets(5, 2, 10), + Buckets: prometheus.ExponentialBuckets(5, 2, 15), }, ) @@ -83,18 +83,18 @@ var ( pluginSchedulingLatency = promauto.NewHistogramVec( prometheus.HistogramOpts{ Subsystem: VolcanoNamespace, - Name: "plugin_scheduling_latency_microseconds", - Help: "Plugin scheduling latency in microseconds", - Buckets: prometheus.ExponentialBuckets(5, 2, 10), + Name: "plugin_scheduling_latency_milliseconds", + Help: "Plugin scheduling latency in milliseconds", + Buckets: prometheus.ExponentialBuckets(5, 2, 15), }, []string{"plugin", "OnSession"}, ) actionSchedulingLatency = promauto.NewHistogramVec( 
prometheus.HistogramOpts{ Subsystem: VolcanoNamespace, - Name: "action_scheduling_latency_microseconds", - Help: "Action scheduling latency in microseconds", - Buckets: prometheus.ExponentialBuckets(5, 2, 10), + Name: "action_scheduling_latency_milliseconds", + Help: "Action scheduling latency in milliseconds", + Buckets: prometheus.ExponentialBuckets(5, 2, 15), }, []string{"action"}, ) @@ -103,7 +103,7 @@ var ( Subsystem: VolcanoNamespace, Name: "task_scheduling_latency_milliseconds", Help: "Task scheduling latency in milliseconds", - Buckets: prometheus.ExponentialBuckets(5, 2, 10), + Buckets: prometheus.ExponentialBuckets(5, 2, 15), }, ) @@ -150,12 +150,12 @@ var ( // UpdatePluginDuration updates latency for every plugin func UpdatePluginDuration(pluginName, onSessionStatus string, duration time.Duration) { - pluginSchedulingLatency.WithLabelValues(pluginName, onSessionStatus).Observe(DurationInMicroseconds(duration)) + pluginSchedulingLatency.WithLabelValues(pluginName, onSessionStatus).Observe(DurationInMilliseconds(duration)) } // UpdateActionDuration updates latency for every action func UpdateActionDuration(actionName string, duration time.Duration) { - actionSchedulingLatency.WithLabelValues(actionName).Observe(DurationInMicroseconds(duration)) + actionSchedulingLatency.WithLabelValues(actionName).Observe(DurationInMilliseconds(duration)) } // UpdateE2eDuration updates entire end to end scheduling latency diff --git a/pkg/scheduler/plugins/capacity/capacity_test.go b/pkg/scheduler/plugins/capacity/capacity_test.go index 436048f16d7..9a5bb2de3cc 100644 --- a/pkg/scheduler/plugins/capacity/capacity_test.go +++ b/pkg/scheduler/plugins/capacity/capacity_test.go @@ -17,6 +17,7 @@ limitations under the License. package capacity import ( + "os" "testing" corev1 "k8s.io/api/core/v1" @@ -24,6 +25,7 @@ import ( "volcano.sh/volcano/cmd/scheduler/app/options" "volcano.sh/volcano/pkg/scheduler/actions/allocate" + "volcano.sh/volcano/pkg/scheduler/actions/enqueue" "volcano.sh/volcano/pkg/scheduler/actions/reclaim" "volcano.sh/volcano/pkg/scheduler/api" "volcano.sh/volcano/pkg/scheduler/conf" @@ -33,11 +35,15 @@ import ( "volcano.sh/volcano/pkg/scheduler/util" ) +func TestMain(m *testing.M) { + options.Default() + os.Exit(m.Run()) +} + func Test_capacityPlugin_OnSessionOpen(t *testing.T) { plugins := map[string]framework.PluginBuilder{PluginName: New, predicates.PluginName: predicates.New} trueValue := true actions := []framework.Action{allocate.New(), reclaim.New()} - options.Default() // nodes n1 := util.BuildNode("n1", api.BuildResourceList("2", "4Gi", []api.ScalarResource{{Name: "pods", Value: "10"}}...), map[string]string{"selector": "worker"}) @@ -139,3 +145,98 @@ func Test_capacityPlugin_OnSessionOpen(t *testing.T) { }) } } + +func TestEnqueueAndAllocable(t *testing.T) { + // nodes + n1 := util.BuildNode("n1", api.BuildResourceList("3", "3G", []api.ScalarResource{{Name: "pods", Value: "10"}}...), nil) + n2 := util.BuildNode("n2", api.BuildResourceList("3", "3G", []api.ScalarResource{{Name: "pods", Value: "10"}}...), nil) + + // resources + res1c3g := api.BuildResourceList("1", "3G") + res3c1g := api.BuildResourceList("3", "1G") + res1c0g := api.BuildResourceList("1", "0G") + res0c1g := api.BuildResourceList("0", "1G") + res1c1g := api.BuildResourceList("1", "1G") + // pod + p1 := util.BuildPod("ns1", "pod1", "n1", corev1.PodRunning, res1c3g, "pg1", nil, nil) + p2 := util.BuildPod("ns1", "pod2", "n2", corev1.PodRunning, res3c1g, "pg2", nil, nil) + p3 := util.BuildPod("ns1", "pod3", "", 
corev1.PodPending, res1c0g, "pg3", nil, nil) + p4 := util.BuildPod("ns1", "pod4", "", corev1.PodPending, res0c1g, "pg4", nil, nil) + p5 := util.BuildPod("ns1", "pod5", "", corev1.PodPending, res1c1g, "pg5", nil, nil) + + // podgroup + pg1 := util.BuildPodGroup("pg1", "ns1", "q1", 1, nil, schedulingv1beta1.PodGroupRunning) + pg2 := util.BuildPodGroup("pg2", "ns1", "q2", 1, nil, schedulingv1beta1.PodGroupRunning) + pg3 := util.BuildPodGroup("pg3", "ns1", "q1", 1, nil, schedulingv1beta1.PodGroupPending) + pg4 := util.BuildPodGroup("pg4", "ns1", "q2", 1, nil, schedulingv1beta1.PodGroupPending) + pg5 := util.BuildPodGroup("pg5", "ns1", "q1", 1, nil, schedulingv1beta1.PodGroupPending) + pg1.Spec.MinResources = &res1c3g + pg2.Spec.MinResources = &res3c1g + pg3.Spec.MinResources = &res1c0g + pg4.Spec.MinResources = &res0c1g + pg5.Spec.MinResources = &res1c1g + + queue1 := util.BuildQueueWithResourcesQuantity("q1", api.BuildResourceList("2", "2G"), api.BuildResourceList("2", "2G")) + queue2 := util.BuildQueueWithResourcesQuantity("q2", api.BuildResourceList("2", "2G"), api.BuildResourceList("3", "3G")) + + plugins := map[string]framework.PluginBuilder{PluginName: New} + trueValue := true + tiers := []conf.Tier{ + { + Plugins: []conf.PluginOption{ + { + Name: PluginName, + EnabledAllocatable: &trueValue, + EnablePreemptive: &trueValue, + EnabledOverused: &trueValue, + EnabledJobEnqueued: &trueValue, + }, + }, + }, + } + tests := []uthelper.TestCommonStruct{ + { + Name: "case0: memory exceeds deserved, job only requesting cpu can be enqueued and allocated", + Plugins: plugins, + Pods: []*corev1.Pod{p1, p2, p3}, + Nodes: []*corev1.Node{n1, n2}, + PodGroups: []*schedulingv1beta1.PodGroup{pg1, pg2, pg3}, + Queues: []*schedulingv1beta1.Queue{queue1, queue2}, + ExpectBindsNum: 1, + ExpectBindMap: map[string]string{"ns1/pod3": "n1"}, + }, + { + Name: "case1: cpu exceeds deserved, job only requesting memory can be enqueued and allocated", + Plugins: plugins, + Pods: []*corev1.Pod{p1, p2, p4}, + Nodes: []*corev1.Node{n1, n2}, + PodGroups: []*schedulingv1beta1.PodGroup{pg1, pg2, pg4}, + Queues: []*schedulingv1beta1.Queue{queue1, queue2}, + ExpectBindsNum: 1, + ExpectBindMap: map[string]string{"ns1/pod4": "n2"}, + }, + { + Name: "case2: exceeds capacity, cannot enqueue", + Plugins: plugins, + Pods: []*corev1.Pod{p1, p2, p5}, + Nodes: []*corev1.Node{n1, n2}, + PodGroups: []*schedulingv1beta1.PodGroup{pg1, pg2, pg5}, + Queues: []*schedulingv1beta1.Queue{queue1, queue2}, + ExpectBindsNum: 0, + ExpectBindMap: map[string]string{}, + }, + } + actions := []framework.Action{enqueue.New(), allocate.New()} + + for i, test := range tests { + t.Run(test.Name, func(t *testing.T) { + test.RegisterSession(tiers, nil) + defer test.Close() + test.Run(actions) + + if err := test.CheckAll(i); err != nil { + t.Fatal(err) + } + }) + } +} diff --git a/pkg/scheduler/plugins/overcommit/overcommit.go b/pkg/scheduler/plugins/overcommit/overcommit.go index d7345233681..021ed9e15eb 100644 --- a/pkg/scheduler/plugins/overcommit/overcommit.go +++ b/pkg/scheduler/plugins/overcommit/overcommit.go @@ -119,7 +119,7 @@ func (op *overcommitPlugin) OnSessionOpen(ssn *framework.Session) { //TODO: if allow 1 more job to be inqueue beyond overcommit-factor, large job may be inqueue and create pods jobMinReq := api.NewResource(*job.PodGroup.Spec.MinResources) - if inqueue.Add(jobMinReq).LessEqual(idle, api.Zero) { + if inqueue.Add(jobMinReq).LessEqualWithDimension(idle, jobMinReq) { // only compare the resource dimensions the job requests klog.V(4).Infof("Sufficient
resources, permit job <%s/%s> to be inqueue", job.Namespace, job.Name) return util.Permit } diff --git a/pkg/scheduler/plugins/proportion/proportion_test.go b/pkg/scheduler/plugins/proportion/proportion_test.go index 70a4c827190..17fba38d34b 100644 --- a/pkg/scheduler/plugins/proportion/proportion_test.go +++ b/pkg/scheduler/plugins/proportion/proportion_test.go @@ -16,34 +16,38 @@ package proportion import ( "io" "net/http" - "reflect" + "os" "strconv" "strings" "testing" "time" - "github.com/agiledragon/gomonkey/v2" "github.com/prometheus/client_golang/prometheus/promhttp" apiv1 "k8s.io/api/core/v1" schedulingv1 "k8s.io/api/scheduling/v1" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/tools/record" - "k8s.io/client-go/util/workqueue" - schedulingv1beta1 "volcano.sh/apis/pkg/apis/scheduling/v1beta1" "volcano.sh/volcano/cmd/scheduler/app/options" "volcano.sh/volcano/pkg/scheduler/actions/allocate" + "volcano.sh/volcano/pkg/scheduler/actions/enqueue" "volcano.sh/volcano/pkg/scheduler/api" "volcano.sh/volcano/pkg/scheduler/cache" "volcano.sh/volcano/pkg/scheduler/conf" "volcano.sh/volcano/pkg/scheduler/framework" "volcano.sh/volcano/pkg/scheduler/plugins/gang" "volcano.sh/volcano/pkg/scheduler/plugins/priority" + "volcano.sh/volcano/pkg/scheduler/uthelper" "volcano.sh/volcano/pkg/scheduler/util" ) +func TestMain(m *testing.M) { + options.Default() + os.Exit(m.Run()) +} + func getWorkerAffinity() *apiv1.Affinity { return &apiv1.Affinity{ PodAntiAffinity: &apiv1.PodAntiAffinity{ @@ -102,22 +106,8 @@ func getLocalMetrics() int { func TestProportion(t *testing.T) { c := make(chan bool, 1) - var tmp *cache.SchedulerCache - patches := gomonkey.ApplyMethod(reflect.TypeOf(tmp), "AddBindTask", func(scCache *cache.SchedulerCache, task *api.TaskInfo) error { - scCache.Binder.Bind(nil, []*api.TaskInfo{task}) - return nil - }) - defer patches.Reset() - - patchUpdateQueueStatus := gomonkey.ApplyMethod(reflect.TypeOf(tmp), "UpdateQueueStatus", func(scCache *cache.SchedulerCache, queue *api.QueueInfo) error { - return nil - }) - defer patchUpdateQueueStatus.Reset() - - framework.RegisterPluginBuilder(PluginName, New) - framework.RegisterPluginBuilder(gang.PluginName, gang.New) - framework.RegisterPluginBuilder(priority.PluginName, priority.New) - options.ServerOpts = options.NewServerOption() + + uthelper.RegisterPlugins(map[string]framework.PluginBuilder{PluginName: New, gang.PluginName: gang.New, priority.PluginName: priority.New}) defer framework.CleanupPluginBuilders() // Running pods @@ -144,58 +134,16 @@ func TestProportion(t *testing.T) { p1 := &schedulingv1.PriorityClass{ObjectMeta: metav1.ObjectMeta{Name: "p1"}, Value: 1} p2 := &schedulingv1.PriorityClass{ObjectMeta: metav1.ObjectMeta{Name: "p2"}, Value: 2} // podgroup - pg1 := &schedulingv1beta1.PodGroup{ - ObjectMeta: metav1.ObjectMeta{ - Namespace: "ns1", - Name: "pg1", - }, - Spec: schedulingv1beta1.PodGroupSpec{ - Queue: "q1", - MinMember: int32(2), - PriorityClassName: p2.Name, - }, - } - pg2 := &schedulingv1beta1.PodGroup{ - ObjectMeta: metav1.ObjectMeta{ - Namespace: "ns1", - Name: "pg2", - }, - Spec: schedulingv1beta1.PodGroupSpec{ - Queue: "q1", - MinMember: int32(1), - PriorityClassName: p1.Name, - }, - } - pgRes3 := api.BuildResourceList("1", "1k", []api.ScalarResource{{Name: "nvidia.com/gpu", Value: "1"}, {Name: "rdma/hca", Value: "1"}}...) 
- pg3 := &schedulingv1beta1.PodGroup{ - ObjectMeta: metav1.ObjectMeta{ - Namespace: "ns1", - Name: "pg3", - }, - Spec: schedulingv1beta1.PodGroupSpec{ - Queue: "q2", - MinMember: int32(1), - PriorityClassName: p1.Name, - MinResources: &pgRes3, - }, - } + pg1 := util.BuildPodGroupWithPrio("pg1", "ns1", "q1", 2, nil, "", p2.Name) + pg2 := util.BuildPodGroupWithPrio("pg2", "ns1", "q1", 1, nil, "", p1.Name) + pg3 := util.BuildPodGroupWithPrio("pg3", "ns1", "q2", 1, nil, "", p1.Name) - // queue - queue1 := &schedulingv1beta1.Queue{ - ObjectMeta: metav1.ObjectMeta{ - Name: "q1", - }, - } + pgRes3 := api.BuildResourceList("1", "1k", []api.ScalarResource{{Name: "nvidia.com/gpu", Value: "1"}, {Name: "rdma/hca", Value: "1"}}...) + pg3.Spec.MinResources = &pgRes3 // queue - queue2 := &schedulingv1beta1.Queue{ - ObjectMeta: metav1.ObjectMeta{ - Name: "q2", - }, - Spec: schedulingv1beta1.QueueSpec{ - Capability: api.BuildResourceList("2", "2k", []api.ScalarResource{{Name: "pods", Value: "10"}, {Name: "nvidia.com/gpu", Value: "4"}}...), - }, - } + queue1 := util.BuildQueue("q1", 0, nil) + queue2 := util.BuildQueue("q2", 0, api.BuildResourceList("2", "2k", []api.ScalarResource{{Name: "pods", Value: "10"}, {Name: "nvidia.com/gpu", Value: "4"}}...)) // tests tests := []struct { @@ -238,18 +186,7 @@ func TestProportion(t *testing.T) { t.Logf("%s: [Event] %s", test.name, event) } }() - schedulerCache := &cache.SchedulerCache{ - Nodes: make(map[string]*api.NodeInfo), - Jobs: make(map[api.JobID]*api.JobInfo), - PriorityClasses: make(map[string]*schedulingv1.PriorityClass), - Queues: make(map[api.QueueID]*api.QueueInfo), - Binder: binder, - StatusUpdater: &util.FakeStatusUpdater{}, - VolumeBinder: &util.FakeVolumeBinder{}, - Recorder: recorder, - } - // deletedJobs to DeletedJobs - schedulerCache.DeletedJobs = workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()) + schedulerCache := cache.NewCustomMockSchedulerCache("mock-test", binder, nil, &util.FakeStatusUpdater{}, nil, &util.FakeVolumeBinder{}, recorder) for _, node := range test.nodes { schedulerCache.AddOrUpdateNode(node) @@ -350,3 +287,146 @@ func TestProportion(t *testing.T) { } } } + +func TestEnqueueAndAllocable(t *testing.T) { + // nodes + n1 := util.BuildNode("n1", api.BuildResourceList("2", "2G", []api.ScalarResource{{Name: "pods", Value: "10"}}...), nil) + n2 := util.BuildNode("n2", api.BuildResourceList("2", "2G", []api.ScalarResource{{Name: "pods", Value: "10"}}...), nil) + + // resources + res1c2g := api.BuildResourceList("1", "2G") + res2c1g := api.BuildResourceList("2", "1G") + res1c0g := api.BuildResourceList("1", "0G") + res0c1g := api.BuildResourceList("0", "1G") + res1c1g := api.BuildResourceList("1", "1G") + // pod + p1 := util.BuildPod("ns1", "pod1", "n1", apiv1.PodRunning, res1c2g, "pg1", nil, nil) + p2 := util.BuildPod("ns1", "pod2", "n2", apiv1.PodRunning, res2c1g, "pg2", nil, nil) + p3 := util.BuildPod("ns1", "pod3", "", apiv1.PodPending, res1c0g, "pg3", nil, nil) + p4 := util.BuildPod("ns1", "pod4", "", apiv1.PodPending, res0c1g, "pg4", nil, nil) + p5 := util.BuildPod("ns1", "pod5", "", apiv1.PodPending, res1c1g, "pg5", nil, nil) + + // podgroup + pg1 := util.BuildPodGroup("pg1", "ns1", "q1", 1, nil, schedulingv1beta1.PodGroupRunning) + pg2 := util.BuildPodGroup("pg2", "ns1", "q2", 1, nil, schedulingv1beta1.PodGroupRunning) + pg3 := util.BuildPodGroup("pg3", "ns1", "q1", 1, nil, schedulingv1beta1.PodGroupPending) + pg4 := util.BuildPodGroup("pg4", "ns1", "q2", 1, nil, schedulingv1beta1.PodGroupPending) + pg5 := 
util.BuildPodGroup("pg5", "ns1", "q1", 1, nil, schedulingv1beta1.PodGroupPending) + pg1.Spec.MinResources = &res1c2g + pg2.Spec.MinResources = &res2c1g + pg3.Spec.MinResources = &res1c0g + pg4.Spec.MinResources = &res0c1g + pg5.Spec.MinResources = &res1c1g + + queue1 := util.BuildQueue("q1", 1, api.BuildResourceList("2", "2G")) + queue2 := util.BuildQueue("q2", 1, api.BuildResourceList("3", "3G")) + + plugins := map[string]framework.PluginBuilder{PluginName: New} + trueValue, falseValue := true, false + allEnable := []conf.Tier{ + { + Plugins: []conf.PluginOption{ + { + Name: PluginName, + EnabledAllocatable: &trueValue, + EnabledOverused: &trueValue, + EnabledJobEnqueued: &trueValue, + }, + }, + }, + } + enqueueable := []conf.Tier{ + { + Plugins: []conf.PluginOption{ + { + Name: PluginName, + EnabledAllocatable: &falseValue, + EnabledOverused: &falseValue, + EnabledJobEnqueued: &trueValue, + }, + }, + }, + } + allocatable := []conf.Tier{ + { + Plugins: []conf.PluginOption{ + { + Name: PluginName, + EnabledAllocatable: &trueValue, + EnabledOverused: &falseValue, + EnabledJobEnqueued: &falseValue, + }, + }, + }, + } + tests := []struct { + uthelper.TestCommonStruct + tiers []conf.Tier + }{ + { + TestCommonStruct: uthelper.TestCommonStruct{ + Name: "case0: memory exceeds deserved, job only requesting cpu can be enqueued and allocated", + Plugins: plugins, + Pods: []*apiv1.Pod{p1, p2, p3}, + Nodes: []*apiv1.Node{n1, n2}, + PodGroups: []*schedulingv1beta1.PodGroup{pg1, pg2, pg3}, + Queues: []*schedulingv1beta1.Queue{queue1, queue2}, + ExpectBindsNum: 1, + ExpectBindMap: map[string]string{"ns1/pod3": "n1"}, + }, + tiers: allEnable, + }, + { + TestCommonStruct: uthelper.TestCommonStruct{ + Name: "case1: cpu exceeds deserved, job only requesting memory can be enqueued and allocated", + Plugins: plugins, + Pods: []*apiv1.Pod{p1, p2, p4}, + Nodes: []*apiv1.Node{n1, n2}, + PodGroups: []*schedulingv1beta1.PodGroup{pg1, pg2, pg4}, + Queues: []*schedulingv1beta1.Queue{queue1, queue2}, + ExpectBindsNum: 1, + ExpectBindMap: map[string]string{"ns1/pod4": "n2"}, + }, + tiers: allEnable, + }, + { + TestCommonStruct: uthelper.TestCommonStruct{ + Name: "case2: exceeds capacity, cannot enqueue", + Plugins: plugins, + Pods: []*apiv1.Pod{p1, p2, p5}, + Nodes: []*apiv1.Node{n1, n2}, + PodGroups: []*schedulingv1beta1.PodGroup{pg1, pg2, pg5}, + Queues: []*schedulingv1beta1.Queue{queue1, queue2}, + ExpectBindsNum: 0, + ExpectBindMap: map[string]string{}, + }, + tiers: enqueueable, + }, + { + TestCommonStruct: uthelper.TestCommonStruct{ + Name: "case3: exceeds deserved, cannot allocate", + Plugins: plugins, + Pods: []*apiv1.Pod{p1, p2, p5}, + Nodes: []*apiv1.Node{n1, n2}, + PodGroups: []*schedulingv1beta1.PodGroup{pg1, pg2, pg5}, + Queues: []*schedulingv1beta1.Queue{queue1, queue2}, + ExpectBindsNum: 0, + ExpectBindMap: map[string]string{}, + }, + tiers: allocatable, + }, + } + actions := []framework.Action{enqueue.New(), allocate.New()} + + for i, test := range tests { + t.Run(test.Name, func(t *testing.T) { + test.RegisterSession(test.tiers, nil) + defer test.Close() + test.Run(actions) + + if err := test.CheckAll(i); err != nil { + t.Fatal(err) + } + }) + } +} diff --git a/pkg/scheduler/util/test_utils.go b/pkg/scheduler/util/test_utils.go index 0038d87539f..3b6737810fb 100644 --- a/pkg/scheduler/util/test_utils.go +++ b/pkg/scheduler/util/test_utils.go @@ -330,7 +330,7 @@ func (fb *FakeBinder) Length() int { } // Bind used by fake binder struct to bind pods -func (fb *FakeBinder) Bind(kubeClient kubernetes.Interface,
tasks []*api.TaskInfo) ([]*api.TaskInfo, []error) { +func (fb *FakeBinder) Bind(kubeClient kubernetes.Interface, tasks []*api.TaskInfo) map[api.TaskID]string { fb.Lock() defer fb.Unlock() for _, p := range tasks { @@ -339,7 +339,7 @@ func (fb *FakeBinder) Bind(kubeClient kubernetes.Interface, tasks []*api.TaskInf fb.Channel <- key // need to wait binding pod because Bind process is asynchronous } - return nil, nil + return nil } // FakeEvictor is used as fake evictor
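
Illustrative sketch (not part of the patch above; it assumes this repository's pkg/scheduler/api package and uses made-up resource numbers): the overcommit.go change swaps LessEqual for the new LessEqualWithDimension, so a pending job is compared against the idle resources only on the dimensions its MinResources actually request. Roughly:

package main

import (
	"fmt"

	"volcano.sh/volcano/pkg/scheduler/api"
)

func main() {
	// Idle headroom: CPU is plentiful, memory is already tight.
	idle := &api.Resource{MilliCPU: 4000, Memory: 2000}
	// Min resources of jobs already admitted to the queue.
	inqueue := &api.Resource{MilliCPU: 500, Memory: 3000}
	// The pending job only asks for CPU.
	jobMinReq := &api.Resource{MilliCPU: 1000}

	sum := inqueue.Clone().Add(jobMinReq)

	// Old check: every dimension is compared, so the memory already
	// inqueue (3000 > 2000) rejects this CPU-only job.
	fmt.Println(sum.LessEqual(idle, api.Zero)) // false

	// New check: only the dimensions the job requests (CPU here) are
	// compared, so the job is permitted to be inqueue.
	fmt.Println(sum.LessEqualWithDimension(idle, jobMinReq)) // true
}

The Binder interface change follows the same direction: Bind now returns a map keyed by task UID carrying a failure reason per task, instead of parallel slices, and SchedulerCache.Bind emits the "Scheduled" event for every task absent from that map. A custom binder conforming to the new signature could look like the following sketch (noopBinder, its placement in the cache package, and its body are illustrative placeholders, not code from the patch):

package cache

import (
	"k8s.io/client-go/kubernetes"

	"volcano.sh/volcano/pkg/scheduler/api"
)

// noopBinder records a failure reason per task UID; returning nil (or an
// empty map) tells the cache that every bind succeeded.
type noopBinder struct{}

func (b *noopBinder) Bind(kubeClient kubernetes.Interface, tasks []*api.TaskInfo) map[api.TaskID]string {
	errMsg := make(map[api.TaskID]string)
	for _, task := range tasks {
		// A real binder would bind task.Pod to task.NodeName here and,
		// on failure, record the reason:
		//     errMsg[task.UID] = err.Error()
		_ = task
	}
	if len(errMsg) == 0 {
		return nil
	}
	return errMsg
}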