Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

change enqueue to optional action #2309

Merged
merged 2 commits into from
Jul 4, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions pkg/scheduler/conf/scheduler_conf.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@ limitations under the License.

package conf

// EnabledActionMap check if a action exist in scheduler configmap. If not exist the value is false.
var EnabledActionMap map[string]bool

// SchedulerConfiguration defines the configuration of scheduler.
type SchedulerConfiguration struct {
// Actions defines the actions list of scheduler in order
Expand Down
6 changes: 6 additions & 0 deletions pkg/scheduler/framework/job_updater.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (

"volcano.sh/apis/pkg/apis/scheduling"
"volcano.sh/volcano/pkg/scheduler/api"
"volcano.sh/volcano/pkg/scheduler/conf"
)

const (
Expand Down Expand Up @@ -98,6 +99,11 @@ func (ju *jobUpdater) updateJob(index int) {
job := ju.jobQueue[index]
ssn := ju.ssn

// If not config enqueue action, change Pending pg into Inqueue statue to avoid blocking job scheduling.
if !conf.EnabledActionMap["enqueue"] && job.PodGroup.Status.Phase == scheduling.PodGroupPending {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please add comments here or extract a small func for this special treatment

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add comments for this block

job.PodGroup.Status.Phase = scheduling.PodGroupInqueue
}

job.PodGroup.Status = jobStatus(ssn, job)
oldStatus, found := ssn.podGroupStatus[job.UID]
updatePG := !found || isPodGroupStatusUpdated(job.PodGroup.Status, oldStatus)
Expand Down
6 changes: 6 additions & 0 deletions pkg/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,12 @@ func (pc *Scheduler) runOnce() {
configurations := pc.configurations
pc.mutex.Unlock()

//Load configmap to check which action is enabled.
conf.EnabledActionMap = make(map[string]bool)
for _, action := range actions {
conf.EnabledActionMap[action.Name()] = true
}

ssn := framework.OpenSession(pc.cache, plugins, configurations)
defer framework.CloseSession(ssn)

Expand Down
104 changes: 104 additions & 0 deletions test/e2e/schedulingaction/enqueue.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
/*
Copyright 2021 The Volcano Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package schedulingaction

import (
"strings"

"github.com/onsi/ginkgo/v2"
"github.com/onsi/gomega"

corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"

"gopkg.in/yaml.v2"
e2eutil "volcano.sh/volcano/test/e2e/util"
)

var _ = ginkgo.Describe("Enqueue E2E Test", func() {
ginkgo.It("allocate work even not config enqueue action", func() {
ginkgo.By("remove action enqueue from configmap")
cmc := e2eutil.NewConfigMapCase("volcano-system", "integration-scheduler-configmap")
cmc.ChangeBy(func(data map[string]string) (changed bool, changedBefore map[string]string) {
vcScheConfStr, ok := data["volcano-scheduler-ci.conf"]
gomega.Expect(ok).To(gomega.BeTrue())

schedulerConf := &e2eutil.SchedulerConfiguration{}
err := yaml.Unmarshal([]byte(vcScheConfStr), schedulerConf)
gomega.Expect(err).NotTo(gomega.HaveOccurred())
actstring := ""
if strings.Contains(schedulerConf.Actions, "enqueue") {
acts := strings.Split(schedulerConf.Actions, ",")
for i, act := range acts {
acts[i] = strings.TrimSpace(act)
if acts[i] != "enqueue" {
actstring += acts[i] + ","
}
}
actstring = strings.TrimRight(actstring, ",")
schedulerConf.Actions = actstring
}

newVCScheConfBytes, err := yaml.Marshal(schedulerConf)
gomega.Expect(err).NotTo(gomega.HaveOccurred())

changed = true
changedBefore = make(map[string]string)
changedBefore["volcano-scheduler-ci.conf"] = vcScheConfStr
data["volcano-scheduler-ci.conf"] = string(newVCScheConfBytes)
return
})
defer cmc.UndoChanged()

ctx := e2eutil.InitTestContext(e2eutil.Options{
NodesNumLimit: 2,
NodesResourceLimit: corev1.ResourceList{
corev1.ResourceCPU: resource.MustParse("2000m"),
corev1.ResourceMemory: resource.MustParse("2048Mi")},
})
defer e2eutil.CleanupTestContext(ctx)

slot1 := corev1.ResourceList{
corev1.ResourceCPU: resource.MustParse("500m"),
corev1.ResourceMemory: resource.MustParse("512Mi")}
slot2 := corev1.ResourceList{
corev1.ResourceCPU: resource.MustParse("1000m"),
corev1.ResourceMemory: resource.MustParse("1024Mi")}

job := &e2eutil.JobSpec{
Tasks: []e2eutil.TaskSpec{
{
Img: e2eutil.DefaultNginxImage,
Req: slot1,
Min: 1,
Rep: 1,
},
},
}

job.Name = "low"
lowReqJob := e2eutil.CreateJob(ctx, job)
err := e2eutil.WaitJobReady(ctx, lowReqJob)
gomega.Expect(err).NotTo(gomega.HaveOccurred())

job.Name = "high"
job.Tasks[0].Req = slot2
highReqJob := e2eutil.CreateJob(ctx, job)
err = e2eutil.WaitJobReady(ctx, highReqJob)
gomega.Expect(err).NotTo(gomega.HaveOccurred())
})
})
169 changes: 169 additions & 0 deletions test/e2e/util/configmap.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,169 @@
package util

import (
"context"
"strings"
"time"

"github.com/onsi/gomega"

v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

type ConfigMapCase struct {
NameSpace string
Name string // configmap.name

startTs time.Time // start timestamp
undoData map[string]string
ocm *v1.ConfigMap
}

func NewConfigMapCase(ns, name string) *ConfigMapCase {
return &ConfigMapCase{
NameSpace: ns,
Name: name,

undoData: make(map[string]string),
}
}

// ChangeBy call fn and update configmap by changed
func (c *ConfigMapCase) ChangeBy(fn func(data map[string]string) (changed bool, changedBefore map[string]string)) error {
if c.ocm == nil {
cm, err := KubeClient.CoreV1().ConfigMaps(c.NameSpace).Get(context.TODO(), c.Name, metav1.GetOptions{})
gomega.Expect(err).NotTo(gomega.HaveOccurred())
c.ocm = cm
}
if changed, changedBefore := fn(c.ocm.Data); changed {
time.Sleep(time.Second) // wait last configmap-change done completely
cm, err := KubeClient.CoreV1().ConfigMaps(c.NameSpace).Update(context.TODO(), c.ocm, metav1.UpdateOptions{})
gomega.Expect(err).NotTo(gomega.HaveOccurred())
c.ocm, c.undoData = cm, changedBefore

// add pod/volcano-scheduler.annotation to update Mounted-ConfigMaps immediately
schedulerPods, err := KubeClient.CoreV1().Pods("volcano-system").List(context.TODO(), metav1.ListOptions{LabelSelector: "app=volcano-scheduler"})
gomega.Expect(err).NotTo(gomega.HaveOccurred())
for _, scheduler := range schedulerPods.Items {
if !strings.HasPrefix(scheduler.Name, "volcano-scheduler") {
continue
}
scheduler.Annotations["refreshts"] = time.Now().Format("060102150405.000")
_, err = KubeClient.CoreV1().Pods("volcano-system").Update(context.TODO(), &scheduler, metav1.UpdateOptions{})
gomega.Expect(err).NotTo(gomega.HaveOccurred())
}
c.startTs = time.Now()
}
return nil
}

// UndoChanged restore configmap if exist undoData
func (c *ConfigMapCase) UndoChanged() error {
if len(c.undoData) == 0 {
return nil
}
for filename, old := range c.undoData {
c.ocm.Data[filename] = old
}
atLeast := time.Second // at least 1s wait between 2 configmap-change
if dur := time.Now().Sub(c.startTs); dur < atLeast {
time.Sleep(atLeast - dur)
}
cm, err := KubeClient.CoreV1().ConfigMaps(c.NameSpace).Update(context.TODO(), c.ocm, metav1.UpdateOptions{})
gomega.Expect(err).NotTo(gomega.HaveOccurred())
c.ocm = cm

// add pod/volcano-scheduler.annotation to update Mounted-ConfigMaps immediately
schedulerPods, err := KubeClient.CoreV1().Pods("volcano-system").List(context.TODO(), metav1.ListOptions{LabelSelector: "app=volcano-scheduler"})
gomega.Expect(err).NotTo(gomega.HaveOccurred())
for _, scheduler := range schedulerPods.Items {
if !strings.HasPrefix(scheduler.Name, "volcano-scheduler") {
continue
}
scheduler.Annotations["refreshts"] = time.Now().Format("060102150405.000")
_, err = KubeClient.CoreV1().Pods("volcano-system").Update(context.TODO(), &scheduler, metav1.UpdateOptions{})
gomega.Expect(err).NotTo(gomega.HaveOccurred())
}
return nil
}

// SchedulerConfiguration defines the configuration of scheduler.
type SchedulerConfiguration struct {
// Actions defines the actions list of scheduler in order
Actions string `yaml:"actions"`
// Tiers defines plugins in different tiers
Tiers []Tier `yaml:"tiers,omitempty"`
// Configurations is configuration for actions
Configurations []Configuration `yaml:"configurations,omitempty"`
}

// Tier defines plugin tier
type Tier struct {
Plugins []PluginOption `yaml:"plugins,omitempty"`
}

func (t Tier) ContainsPlugin(name string) bool {
return t.GetPluginIdxOf(name) >= 0
}

func (t Tier) GetPluginIdxOf(name string) int {
for idx, p := range t.Plugins {
if p.Name == name {
return idx
}
}
return -1
}

// Configuration is configuration of action
type Configuration struct {
// Name is name of action
Name string `yaml:"name"`
// Arguments defines the different arguments that can be given to specified action
Arguments map[string]string `yaml:"arguments,omitempty"`
}

// PluginOption defines the options of plugin
type PluginOption struct {
// The name of Plugin
Name string `yaml:"name"`
// EnabledJobOrder defines whether jobOrderFn is enabled
EnabledJobOrder *bool `yaml:"enableJobOrder,omitempty"`
// EnabledNamespaceOrder defines whether namespaceOrderFn is enabled
EnabledNamespaceOrder *bool `yaml:"enableNamespaceOrder,omitempty"`
// EnabledHierachy defines whether hierarchical sharing is enabled
EnabledHierarchy *bool `yaml:"enableHierarchy,omitempty"`
// EnabledJobReady defines whether jobReadyFn is enabled
EnabledJobReady *bool `yaml:"enableJobReady,omitempty"`
// EnabledJobPipelined defines whether jobPipelinedFn is enabled
EnabledJobPipelined *bool `yaml:"enableJobPipelined,omitempty"`
// EnabledTaskOrder defines whether taskOrderFn is enabled
EnabledTaskOrder *bool `yaml:"enableTaskOrder,omitempty"`
// EnabledPreemptable defines whether preemptableFn is enabled
EnabledPreemptable *bool `yaml:"enablePreemptable,omitempty"`
// EnabledReclaimable defines whether reclaimableFn is enabled
EnabledReclaimable *bool `yaml:"enableReclaimable,omitempty"`
// EnabledQueueOrder defines whether queueOrderFn is enabled
EnabledQueueOrder *bool `yaml:"enableQueueOrder,omitempty"`
// EnabledPredicate defines whether predicateFn is enabled
EnabledClusterOrder *bool `yaml:"EnabledClusterOrder,omitempty"`
// EnableClusterOrder defines whether clusterOrderFn is enabled
EnabledPredicate *bool `yaml:"enablePredicate,omitempty"`
// EnabledBestNode defines whether bestNodeFn is enabled
EnabledBestNode *bool `yaml:"enableBestNode,omitempty"`
// EnabledNodeOrder defines whether NodeOrderFn is enabled
EnabledNodeOrder *bool `yaml:"enableNodeOrder,omitempty"`
// EnabledTargetJob defines whether targetJobFn is enabled
EnabledTargetJob *bool `yaml:"enableTargetJob,omitempty"`
// EnabledReservedNodes defines whether reservedNodesFn is enabled
EnabledReservedNodes *bool `yaml:"enableReservedNodes,omitempty"`
// EnabledJobEnqueued defines whether jobEnqueuedFn is enabled
EnabledJobEnqueued *bool `yaml:"enableJobEnqueued,omitempty"`
// EnabledVictim defines whether victimsFn is enabled
EnabledVictim *bool `yaml:"enabledVictim,omitempty"`
// EnabledJobStarving defines whether jobStarvingFn is enabled
EnabledJobStarving *bool `yaml:"enableJobStarving,omitempty"`
// Arguments defines the different arguments that can be given to different plugins
Arguments map[string]string `yaml:"arguments,omitempty"`
}