From cbf34e2f1a7428d8448bf844f4042a1fc802eb94 Mon Sep 17 00:00:00 2001 From: wpeng102 Date: Sat, 25 Jun 2022 09:15:09 +0800 Subject: [PATCH 1/2] change enqueue to optional action Signed-off-by: wpeng102 --- pkg/scheduler/framework/job_updater.go | 6 ++++++ pkg/scheduler/scheduler.go | 7 +++++++ 2 files changed, 13 insertions(+) diff --git a/pkg/scheduler/framework/job_updater.go b/pkg/scheduler/framework/job_updater.go index 23679138c0..c99b1d4a4b 100644 --- a/pkg/scheduler/framework/job_updater.go +++ b/pkg/scheduler/framework/job_updater.go @@ -13,6 +13,8 @@ import ( "volcano.sh/volcano/pkg/scheduler/api" ) +var EnqueueExist bool + const ( jobUpdaterWorker = 16 @@ -98,6 +100,10 @@ func (ju *jobUpdater) updateJob(index int) { job := ju.jobQueue[index] ssn := ju.ssn + if !EnqueueExist && job.PodGroup.Status.Phase == scheduling.PodGroupPending { + job.PodGroup.Status.Phase = scheduling.PodGroupInqueue + } + job.PodGroup.Status = jobStatus(ssn, job) oldStatus, found := ssn.podGroupStatus[job.UID] updatePG := !found || isPodGroupStatusUpdated(job.PodGroup.Status, oldStatus) diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go index 09dddfdf17..584e5d3bc6 100644 --- a/pkg/scheduler/scheduler.go +++ b/pkg/scheduler/scheduler.go @@ -102,6 +102,13 @@ func (pc *Scheduler) runOnce() { configurations := pc.configurations pc.mutex.Unlock() + framework.EnqueueExist = false + for _, action := range actions { + if action.Name() == "enqueue" { + framework.EnqueueExist = true + } + } + ssn := framework.OpenSession(pc.cache, plugins, configurations) defer framework.CloseSession(ssn) From a358c94367e99d764142f3d1369cc58d2e2c72f0 Mon Sep 17 00:00:00 2001 From: wpeng102 Date: Sat, 25 Jun 2022 17:08:52 +0800 Subject: [PATCH 2/2] introduce ActionMap to store which action is configured in scheduler configmap and add E2E Signed-off-by: wpeng102 Signed-off-by: wpeng102 Signed-off-by: wpeng102 Signed-off-by: wpeng102 Signed-off-by: wpeng102 Signed-off-by: wpeng102 Signed-off-by: wpeng102 Signed-off-by: wpeng102 Signed-off-by: wpeng102 Signed-off-by: wpeng102 Signed-off-by: wpeng102 --- pkg/scheduler/conf/scheduler_conf.go | 3 + pkg/scheduler/framework/job_updater.go | 6 +- pkg/scheduler/scheduler.go | 7 +- test/e2e/schedulingaction/enqueue.go | 104 +++++++++++++++ test/e2e/util/configmap.go | 169 +++++++++++++++++++++++++ 5 files changed, 282 insertions(+), 7 deletions(-) create mode 100644 test/e2e/schedulingaction/enqueue.go create mode 100644 test/e2e/util/configmap.go diff --git a/pkg/scheduler/conf/scheduler_conf.go b/pkg/scheduler/conf/scheduler_conf.go index cdaed5c7f8..fd0f8f8414 100644 --- a/pkg/scheduler/conf/scheduler_conf.go +++ b/pkg/scheduler/conf/scheduler_conf.go @@ -16,6 +16,9 @@ limitations under the License. package conf +// EnabledActionMap check if a action exist in scheduler configmap. If not exist the value is false. +var EnabledActionMap map[string]bool + // SchedulerConfiguration defines the configuration of scheduler. type SchedulerConfiguration struct { // Actions defines the actions list of scheduler in order diff --git a/pkg/scheduler/framework/job_updater.go b/pkg/scheduler/framework/job_updater.go index c99b1d4a4b..b44621dc3e 100644 --- a/pkg/scheduler/framework/job_updater.go +++ b/pkg/scheduler/framework/job_updater.go @@ -11,10 +11,9 @@ import ( "volcano.sh/apis/pkg/apis/scheduling" "volcano.sh/volcano/pkg/scheduler/api" + "volcano.sh/volcano/pkg/scheduler/conf" ) -var EnqueueExist bool - const ( jobUpdaterWorker = 16 @@ -100,7 +99,8 @@ func (ju *jobUpdater) updateJob(index int) { job := ju.jobQueue[index] ssn := ju.ssn - if !EnqueueExist && job.PodGroup.Status.Phase == scheduling.PodGroupPending { + // If not config enqueue action, change Pending pg into Inqueue statue to avoid blocking job scheduling. + if !conf.EnabledActionMap["enqueue"] && job.PodGroup.Status.Phase == scheduling.PodGroupPending { job.PodGroup.Status.Phase = scheduling.PodGroupInqueue } diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go index 584e5d3bc6..6408c30352 100644 --- a/pkg/scheduler/scheduler.go +++ b/pkg/scheduler/scheduler.go @@ -102,11 +102,10 @@ func (pc *Scheduler) runOnce() { configurations := pc.configurations pc.mutex.Unlock() - framework.EnqueueExist = false + //Load configmap to check which action is enabled. + conf.EnabledActionMap = make(map[string]bool) for _, action := range actions { - if action.Name() == "enqueue" { - framework.EnqueueExist = true - } + conf.EnabledActionMap[action.Name()] = true } ssn := framework.OpenSession(pc.cache, plugins, configurations) diff --git a/test/e2e/schedulingaction/enqueue.go b/test/e2e/schedulingaction/enqueue.go new file mode 100644 index 0000000000..be4e2046ad --- /dev/null +++ b/test/e2e/schedulingaction/enqueue.go @@ -0,0 +1,104 @@ +/* +Copyright 2021 The Volcano Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package schedulingaction + +import ( + "strings" + + "github.com/onsi/ginkgo/v2" + "github.com/onsi/gomega" + + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" + + "gopkg.in/yaml.v2" + e2eutil "volcano.sh/volcano/test/e2e/util" +) + +var _ = ginkgo.Describe("Enqueue E2E Test", func() { + ginkgo.It("allocate work even not config enqueue action", func() { + ginkgo.By("remove action enqueue from configmap") + cmc := e2eutil.NewConfigMapCase("volcano-system", "integration-scheduler-configmap") + cmc.ChangeBy(func(data map[string]string) (changed bool, changedBefore map[string]string) { + vcScheConfStr, ok := data["volcano-scheduler-ci.conf"] + gomega.Expect(ok).To(gomega.BeTrue()) + + schedulerConf := &e2eutil.SchedulerConfiguration{} + err := yaml.Unmarshal([]byte(vcScheConfStr), schedulerConf) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + actstring := "" + if strings.Contains(schedulerConf.Actions, "enqueue") { + acts := strings.Split(schedulerConf.Actions, ",") + for i, act := range acts { + acts[i] = strings.TrimSpace(act) + if acts[i] != "enqueue" { + actstring += acts[i] + "," + } + } + actstring = strings.TrimRight(actstring, ",") + schedulerConf.Actions = actstring + } + + newVCScheConfBytes, err := yaml.Marshal(schedulerConf) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + + changed = true + changedBefore = make(map[string]string) + changedBefore["volcano-scheduler-ci.conf"] = vcScheConfStr + data["volcano-scheduler-ci.conf"] = string(newVCScheConfBytes) + return + }) + defer cmc.UndoChanged() + + ctx := e2eutil.InitTestContext(e2eutil.Options{ + NodesNumLimit: 2, + NodesResourceLimit: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("2000m"), + corev1.ResourceMemory: resource.MustParse("2048Mi")}, + }) + defer e2eutil.CleanupTestContext(ctx) + + slot1 := corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("500m"), + corev1.ResourceMemory: resource.MustParse("512Mi")} + slot2 := corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("1000m"), + corev1.ResourceMemory: resource.MustParse("1024Mi")} + + job := &e2eutil.JobSpec{ + Tasks: []e2eutil.TaskSpec{ + { + Img: e2eutil.DefaultNginxImage, + Req: slot1, + Min: 1, + Rep: 1, + }, + }, + } + + job.Name = "low" + lowReqJob := e2eutil.CreateJob(ctx, job) + err := e2eutil.WaitJobReady(ctx, lowReqJob) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + + job.Name = "high" + job.Tasks[0].Req = slot2 + highReqJob := e2eutil.CreateJob(ctx, job) + err = e2eutil.WaitJobReady(ctx, highReqJob) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + }) +}) diff --git a/test/e2e/util/configmap.go b/test/e2e/util/configmap.go new file mode 100644 index 0000000000..52743dbddf --- /dev/null +++ b/test/e2e/util/configmap.go @@ -0,0 +1,169 @@ +package util + +import ( + "context" + "strings" + "time" + + "github.com/onsi/gomega" + + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +type ConfigMapCase struct { + NameSpace string + Name string // configmap.name + + startTs time.Time // start timestamp + undoData map[string]string + ocm *v1.ConfigMap +} + +func NewConfigMapCase(ns, name string) *ConfigMapCase { + return &ConfigMapCase{ + NameSpace: ns, + Name: name, + + undoData: make(map[string]string), + } +} + +// ChangeBy call fn and update configmap by changed +func (c *ConfigMapCase) ChangeBy(fn func(data map[string]string) (changed bool, changedBefore map[string]string)) error { + if c.ocm == nil { + cm, err := KubeClient.CoreV1().ConfigMaps(c.NameSpace).Get(context.TODO(), c.Name, metav1.GetOptions{}) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + c.ocm = cm + } + if changed, changedBefore := fn(c.ocm.Data); changed { + time.Sleep(time.Second) // wait last configmap-change done completely + cm, err := KubeClient.CoreV1().ConfigMaps(c.NameSpace).Update(context.TODO(), c.ocm, metav1.UpdateOptions{}) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + c.ocm, c.undoData = cm, changedBefore + + // add pod/volcano-scheduler.annotation to update Mounted-ConfigMaps immediately + schedulerPods, err := KubeClient.CoreV1().Pods("volcano-system").List(context.TODO(), metav1.ListOptions{LabelSelector: "app=volcano-scheduler"}) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + for _, scheduler := range schedulerPods.Items { + if !strings.HasPrefix(scheduler.Name, "volcano-scheduler") { + continue + } + scheduler.Annotations["refreshts"] = time.Now().Format("060102150405.000") + _, err = KubeClient.CoreV1().Pods("volcano-system").Update(context.TODO(), &scheduler, metav1.UpdateOptions{}) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + } + c.startTs = time.Now() + } + return nil +} + +// UndoChanged restore configmap if exist undoData +func (c *ConfigMapCase) UndoChanged() error { + if len(c.undoData) == 0 { + return nil + } + for filename, old := range c.undoData { + c.ocm.Data[filename] = old + } + atLeast := time.Second // at least 1s wait between 2 configmap-change + if dur := time.Now().Sub(c.startTs); dur < atLeast { + time.Sleep(atLeast - dur) + } + cm, err := KubeClient.CoreV1().ConfigMaps(c.NameSpace).Update(context.TODO(), c.ocm, metav1.UpdateOptions{}) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + c.ocm = cm + + // add pod/volcano-scheduler.annotation to update Mounted-ConfigMaps immediately + schedulerPods, err := KubeClient.CoreV1().Pods("volcano-system").List(context.TODO(), metav1.ListOptions{LabelSelector: "app=volcano-scheduler"}) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + for _, scheduler := range schedulerPods.Items { + if !strings.HasPrefix(scheduler.Name, "volcano-scheduler") { + continue + } + scheduler.Annotations["refreshts"] = time.Now().Format("060102150405.000") + _, err = KubeClient.CoreV1().Pods("volcano-system").Update(context.TODO(), &scheduler, metav1.UpdateOptions{}) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + } + return nil +} + +// SchedulerConfiguration defines the configuration of scheduler. +type SchedulerConfiguration struct { + // Actions defines the actions list of scheduler in order + Actions string `yaml:"actions"` + // Tiers defines plugins in different tiers + Tiers []Tier `yaml:"tiers,omitempty"` + // Configurations is configuration for actions + Configurations []Configuration `yaml:"configurations,omitempty"` +} + +// Tier defines plugin tier +type Tier struct { + Plugins []PluginOption `yaml:"plugins,omitempty"` +} + +func (t Tier) ContainsPlugin(name string) bool { + return t.GetPluginIdxOf(name) >= 0 +} + +func (t Tier) GetPluginIdxOf(name string) int { + for idx, p := range t.Plugins { + if p.Name == name { + return idx + } + } + return -1 +} + +// Configuration is configuration of action +type Configuration struct { + // Name is name of action + Name string `yaml:"name"` + // Arguments defines the different arguments that can be given to specified action + Arguments map[string]string `yaml:"arguments,omitempty"` +} + +// PluginOption defines the options of plugin +type PluginOption struct { + // The name of Plugin + Name string `yaml:"name"` + // EnabledJobOrder defines whether jobOrderFn is enabled + EnabledJobOrder *bool `yaml:"enableJobOrder,omitempty"` + // EnabledNamespaceOrder defines whether namespaceOrderFn is enabled + EnabledNamespaceOrder *bool `yaml:"enableNamespaceOrder,omitempty"` + // EnabledHierachy defines whether hierarchical sharing is enabled + EnabledHierarchy *bool `yaml:"enableHierarchy,omitempty"` + // EnabledJobReady defines whether jobReadyFn is enabled + EnabledJobReady *bool `yaml:"enableJobReady,omitempty"` + // EnabledJobPipelined defines whether jobPipelinedFn is enabled + EnabledJobPipelined *bool `yaml:"enableJobPipelined,omitempty"` + // EnabledTaskOrder defines whether taskOrderFn is enabled + EnabledTaskOrder *bool `yaml:"enableTaskOrder,omitempty"` + // EnabledPreemptable defines whether preemptableFn is enabled + EnabledPreemptable *bool `yaml:"enablePreemptable,omitempty"` + // EnabledReclaimable defines whether reclaimableFn is enabled + EnabledReclaimable *bool `yaml:"enableReclaimable,omitempty"` + // EnabledQueueOrder defines whether queueOrderFn is enabled + EnabledQueueOrder *bool `yaml:"enableQueueOrder,omitempty"` + // EnabledPredicate defines whether predicateFn is enabled + EnabledClusterOrder *bool `yaml:"EnabledClusterOrder,omitempty"` + // EnableClusterOrder defines whether clusterOrderFn is enabled + EnabledPredicate *bool `yaml:"enablePredicate,omitempty"` + // EnabledBestNode defines whether bestNodeFn is enabled + EnabledBestNode *bool `yaml:"enableBestNode,omitempty"` + // EnabledNodeOrder defines whether NodeOrderFn is enabled + EnabledNodeOrder *bool `yaml:"enableNodeOrder,omitempty"` + // EnabledTargetJob defines whether targetJobFn is enabled + EnabledTargetJob *bool `yaml:"enableTargetJob,omitempty"` + // EnabledReservedNodes defines whether reservedNodesFn is enabled + EnabledReservedNodes *bool `yaml:"enableReservedNodes,omitempty"` + // EnabledJobEnqueued defines whether jobEnqueuedFn is enabled + EnabledJobEnqueued *bool `yaml:"enableJobEnqueued,omitempty"` + // EnabledVictim defines whether victimsFn is enabled + EnabledVictim *bool `yaml:"enabledVictim,omitempty"` + // EnabledJobStarving defines whether jobStarvingFn is enabled + EnabledJobStarving *bool `yaml:"enableJobStarving,omitempty"` + // Arguments defines the different arguments that can be given to different plugins + Arguments map[string]string `yaml:"arguments,omitempty"` +}