support min-max elastic quota scheduling #3702

pkg/scheduler/actions/allocate/allocate.go (24 additions, 4 deletions)
@@ -33,11 +33,13 @@ type Action struct {
session *framework.Session
// configured flag for error cache
enablePredicateErrorCache bool
enableGangCheckOverused bool // indicates whether to consider the job's min request (gang scheduling) when checking whether a queue is overused
}

func New() *Action {
return &Action{
enablePredicateErrorCache: true, // default to enable it
enablePredicateErrorCache: true, // default to enable it
enableGangCheckOverused: false, // default to disable it
}
}

@@ -50,6 +52,7 @@ func (alloc *Action) Initialize() {}
func (alloc *Action) parseArguments(ssn *framework.Session) {
arguments := framework.GetArgOfActionFromConf(ssn.Configurations, alloc.Name())
arguments.GetBool(&alloc.enablePredicateErrorCache, conf.EnablePredicateErrCacheKey)
arguments.GetBool(&alloc.enableGangCheckOverused, conf.EnableGangCheckOverusedKey)
}

func (alloc *Action) Execute(ssn *framework.Session) {
@@ -132,9 +135,11 @@ func (alloc *Action) allocateResources(queues *util.PriorityQueue, jobsMap map[a

queue := queues.Pop().(*api.QueueInfo)

if ssn.Overused(queue) {
klog.V(3).Infof("Queue <%s> is overused, ignore it.", queue.Name)
continue
if !alloc.enableGangCheckOverused {
if ssn.Overused(queue, nil) {
klog.V(3).Infof("Queue <%s> is overused, ignore it.", queue.Name)
continue
}
}

klog.V(3).Infof("Try to allocate resource to Jobs in Queue <%s>", queue.Name)
@@ -146,6 +151,21 @@ func (alloc *Action) allocateResources(queues *util.PriorityQueue, jobsMap map[a
}

job := jobs.Pop().(*api.JobInfo)

if len(job.TaskStatusIndex[api.Pending]) == 0 {
klog.V(4).Infof("Job <%v/%v> has no pending tasks, skip it.", job.Namespace, job.Name)
queues.Push(queue)
continue
}
// check whether the job's queue would become overused if resources were allocated to this job
if alloc.enableGangCheckOverused {
if ssn.Overused(queue, job) {
klog.V(3).Infof("Queue <%s> will be overused if resource is allocated to job <%s/%s>, ignore it.", queue.Name, job.Namespace, job.Name)
queues.Push(queue) // should consider other jobs in this queue, so put it back
continue
}
}

if _, found = pendingTasks[job.UID]; !found {
tasks := util.NewPriorityQueue(ssn.TaskOrderFn)
for _, task := range job.TaskStatusIndex[api.Pending] {
pkg/scheduler/actions/reclaim/reclaim.go (36 additions, 12 deletions)
@@ -21,14 +21,19 @@ import (
"k8s.io/klog/v2"

"volcano.sh/volcano/pkg/scheduler/api"
"volcano.sh/volcano/pkg/scheduler/conf"
"volcano.sh/volcano/pkg/scheduler/framework"
"volcano.sh/volcano/pkg/scheduler/util"
)

type Action struct{}
type Action struct {
enableGangCheckOverused bool
}

func New() *Action {
return &Action{}
return &Action{
enableGangCheckOverused: false,
}
}

func (ra *Action) Name() string {
@@ -37,10 +42,17 @@ func (ra *Action) Name() string {

func (ra *Action) Initialize() {}

func (ra *Action) parseArguments(ssn *framework.Session) {
arguments := framework.GetArgOfActionFromConf(ssn.Configurations, ra.Name())
arguments.GetBool(&ra.enableGangCheckOverused, conf.EnableGangCheckOverusedKey)
}

func (ra *Action) Execute(ssn *framework.Session) {
klog.V(5).Infof("Enter Reclaim ...")
defer klog.V(5).Infof("Leaving Reclaim ...")

ra.parseArguments(ssn)

queues := util.NewPriorityQueue(ssn.QueueOrderFn)
queueMap := map[api.QueueID]*api.QueueInfo{}

@@ -94,9 +106,11 @@ func (ra *Action) Execute(ssn *framework.Session) {
var task *api.TaskInfo

queue := queues.Pop().(*api.QueueInfo)
if ssn.Overused(queue) {
klog.V(3).Infof("Queue <%s> is overused, ignore it.", queue.Name)
continue
if !ra.enableGangCheckOverused {
if ssn.Overused(queue, nil) {
klog.V(3).Infof("Queue <%s> is overused, ignore it.", queue.Name)
continue
}
}

// Found "high" priority job
@@ -123,14 +137,24 @@ func (ra *Action) Execute(ssn *framework.Session) {
continue
}

if !ssn.Allocatable(queue, task) {
klog.V(3).Infof("Queue <%s> is overused when considering task <%s>, ignore it.", queue.Name, task.Name)
continue
}
if ra.enableGangCheckOverused {
if !ssn.Preemptive(queue, job) {
klog.V(3).Infof("Queue <%s> can not reclaim by preempt others when considering job <%s> , ignore it.", queue.Name, job.Name)
queues.Push(queue) // need considering the next job in queue
continue
}
} else {
if !ssn.Allocatable(queue, task) {
klog.V(3).Infof("Queue <%s> is overused when considering task <%s>, ignore it.", queue.Name, task.Name)
queues.Push(queue)
continue
}

if !ssn.Preemptive(queue, task) {
klog.V(3).Infof("Queue <%s> can not reclaim by preempt others when considering task <%s> , ignore it.", queue.Name, task.Name)
continue
if !ssn.Preemptive(queue, task) {
klog.V(3).Infof("Queue <%s> can not reclaim by preempt others when considering task <%s> , ignore it.", queue.Name, task.Name)
queues.Push(queue)
continue
}
}

if err := ssn.PrePredicateFn(task); err != nil {
pkg/scheduler/actions/reclaim/reclaim_test.go (104 additions, 0 deletions)
@@ -26,6 +26,7 @@ import (
"volcano.sh/volcano/pkg/scheduler/api"
"volcano.sh/volcano/pkg/scheduler/conf"
"volcano.sh/volcano/pkg/scheduler/framework"
"volcano.sh/volcano/pkg/scheduler/plugins/capacity"
"volcano.sh/volcano/pkg/scheduler/plugins/conformance"
"volcano.sh/volcano/pkg/scheduler/plugins/gang"
"volcano.sh/volcano/pkg/scheduler/plugins/priority"
@@ -198,3 +199,106 @@ func TestReclaim(t *testing.T) {
})
}
}

func TestEnableGangReclaim(t *testing.T) {
req1 := api.BuildResourceList("1", "1G")
req2 := api.BuildResourceList("2", "2G")
min := api.BuildResourceList("2", "2G")
mid := api.BuildResourceList("3", "3G")
max := api.BuildResourceList("4", "4G") // 2*req2
common := uthelper.TestCommonStruct{
Plugins: map[string]framework.PluginBuilder{
conformance.PluginName: conformance.New,
gang.PluginName: gang.New,
capacity.PluginName: capacity.New,
},
PodGroups: []*schedulingv1beta1.PodGroup{
util.BuildPodGroupWithMinResources("pg0", "c1", "q1", 0, nil, nil, schedulingv1beta1.PodGroupRunning),
util.BuildPodGroupWithMinResources("pg1", "c1", "q2", 1, nil, req1, schedulingv1beta1.PodGroupRunning),
util.BuildPodGroupWithMinResources("pg2", "c1", "q2", 2, nil, max, schedulingv1beta1.PodGroupInqueue),
},
Pods: []*v1.Pod{
util.BuildPod("c1", "preemptee1", "n1", v1.PodRunning, req2, "pg0", map[string]string{schedulingv1beta1.PodPreemptable: "true"}, make(map[string]string)),
util.BuildPod("c1", "preemptee2", "n1", v1.PodRunning, req1, "pg1", map[string]string{schedulingv1beta1.PodPreemptable: "true"}, make(map[string]string)),
util.BuildPod("c1", "preemptor1", "", v1.PodPending, req2, "pg2", nil, nil),
util.BuildPod("c1", "preemptor2", "", v1.PodPending, req2, "pg2", nil, nil),
},
Nodes: []*v1.Node{
util.BuildNode("n1", api.BuildResourceList("4", "4Gi", []api.ScalarResource{{Name: "pods", Value: "10"}}...), make(map[string]string)),
},
Queues: []*schedulingv1beta1.Queue{
util.BuildQueueWithResourcesQuantity("q1", req1, min),
util.BuildQueueWithResourcesQuantity("q2", mid, max),
},
}
tests := []struct {
enableGang bool
uthelper.TestCommonStruct
}{
{
enableGang: false,
TestCommonStruct: uthelper.TestCommonStruct{
Name: "when enableGangCheckOverused=false, can reclaim one pod but can not meet gang",
Plugins: common.Plugins,
PodGroups: common.PodGroups,
Pods: common.Pods,
Nodes: common.Nodes,
Queues: common.Queues,
ExpectEvictNum: 1,
ExpectEvicted: []string{"c1/preemptee1"},
},
},
{
enableGang: true,
TestCommonStruct: uthelper.TestCommonStruct{
Name: "when enableGangCheckOverused=true, can not reclaim",
Plugins: common.Plugins,
PodGroups: common.PodGroups,
Pods: common.Pods,
Nodes: common.Nodes,
Queues: common.Queues,
ExpectEvictNum: 0,
ExpectEvicted: []string{},
},
},
}

reclaim := New()
trueValue := true
tiers := []conf.Tier{
{
Plugins: []conf.PluginOption{
{
Name: conformance.PluginName,
EnabledReclaimable: &trueValue,
},
{
Name: gang.PluginName,
EnabledReclaimable: &trueValue,
},
{
Name: capacity.PluginName,
EnabledReclaimable: &trueValue,
EnabledOverused: &trueValue,
EnabledAllocatable: &trueValue,
EnablePreemptive: &trueValue,
},
{
Name: priority.PluginName,
EnabledJobOrder: &trueValue,
EnabledTaskOrder: &trueValue,
},
},
},
}
for i, test := range tests {
t.Run(test.Name, func(t *testing.T) {
test.RegisterSession(tiers, []conf.Configuration{{Name: reclaim.Name(), Arguments: map[string]interface{}{conf.EnableGangCheckOverusedKey: test.enableGang}}})
defer test.Close()
test.Run([]framework.Action{reclaim})
if err := test.CheckAll(i); err != nil {
t.Fatal(err)
}
})
}
}
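
The arithmetic behind the two expectations above, written out as a small standalone program. This is an illustration only, not part of the PR, and it assumes the api helpers (BuildResourceList, NewResource, Clone, Add, LessEqualWithDimension) exist with the shapes their usage in this diff suggests. With the gang check off, reclaim reasons about a single preemptor task, the check passes, and one preemptee is evicted, but the freed resources still cannot satisfy pg2's gang (minMember 2, min resources 4 CPU / 4G). With the gang check on, the job-level check fails up front and nothing is evicted.

package main

import (
    "fmt"

    "volcano.sh/volcano/pkg/scheduler/api"
)

func main() {
    deserved := api.NewResource(api.BuildResourceList("3", "3G"))  // q2's deserved (mid)
    allocated := api.NewResource(api.BuildResourceList("1", "1G")) // preemptee2 already running in q2
    taskReq := api.NewResource(api.BuildResourceList("2", "2G"))   // one preemptor task (req2)
    jobMin := api.NewResource(api.BuildResourceList("4", "4G"))    // pg2's min resources (2*req2)

    // Gang check off: per-task check, 1+2=3 <= 3, so reclaiming for the first
    // preemptor is allowed even though the whole gang will never fit.
    perTask := allocated.Clone().Add(taskReq)
    fmt.Println(perTask.LessEqualWithDimension(deserved, taskReq)) // true

    // Gang check on: job-level check, 1+4=5 > 3, so the queue is skipped and
    // nothing is evicted.
    perJob := allocated.Clone().Add(jobMin)
    fmt.Println(perJob.LessEqualWithDimension(deserved, jobMin)) // false
}
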
pkg/scheduler/conf/constants.go (2 additions, 0 deletions)
@@ -3,4 +3,6 @@ package conf
const (
// EnablePredicateErrCacheKey is the key that controls whether the predicate error cache is enabled
EnablePredicateErrCacheKey = "predicateErrorCacheEnable"
// EnableGangCheckOverusedKey is the key that controls whether the job's min request is considered when checking whether a queue is overused
EnableGangCheckOverusedKey = "overusedCheckGangEnable"
)
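
To turn the new behavior on, each action reads the flag from the session's per-action configuration (see parseArguments in allocate.go and reclaim.go above). Below is a minimal sketch of the corresponding conf.Configuration entries, mirroring the wiring used in reclaim_test.go; the action names "allocate" and "reclaim" are assumed here, and in a real deployment these entries would come from the scheduler's configuration file rather than be built in code.

package main

import (
    "fmt"

    "volcano.sh/volcano/pkg/scheduler/conf"
)

func main() {
    // Per-action arguments enabling the gang-aware overused check.
    gangCheckArgs := []conf.Configuration{
        {Name: "allocate", Arguments: map[string]interface{}{conf.EnableGangCheckOverusedKey: true}},
        {Name: "reclaim", Arguments: map[string]interface{}{conf.EnableGangCheckOverusedKey: true}},
    }
    fmt.Printf("%+v\n", gangCheckArgs)
}
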
pkg/scheduler/framework/session.go (2 additions, 2 deletions)
@@ -92,7 +92,7 @@ type Session struct {
nodeReduceFns map[string]api.NodeReduceFn
preemptableFns map[string]api.EvictableFn
reclaimableFns map[string]api.EvictableFn
overusedFns map[string]api.ValidateFn
overusedFns map[string]api.ValidateWithCandidateFn
// preemptiveFns indicates whether the current queue can reclaim resources from other queues,
// while reclaimableFns indicates whether the current queue's own resources can be reclaimed.
preemptiveFns map[string]api.ValidateWithCandidateFn
@@ -143,7 +143,7 @@ func openSession(cache cache.Cache) *Session {
nodeReduceFns: map[string]api.NodeReduceFn{},
preemptableFns: map[string]api.EvictableFn{},
reclaimableFns: map[string]api.EvictableFn{},
overusedFns: map[string]api.ValidateFn{},
overusedFns: map[string]api.ValidateWithCandidateFn{},
preemptiveFns: map[string]api.ValidateWithCandidateFn{},
allocatableFns: map[string]api.AllocatableFn{},
jobReadyFns: map[string]api.ValidateFn{},
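
For reference, the two function types this change swaps, as they can be inferred from the call sites in this diff (the canonical definitions live in volcano.sh/volcano/pkg/scheduler/api; this sketch is not copied from that package):

// ValidateFn is the old shape of the overused callback: it only sees the queue.
type ValidateFn func(obj interface{}) bool

// ValidateWithCandidateFn additionally receives a candidate (a *api.JobInfo, a
// *api.TaskInfo, or nil here), so a plugin can take the candidate's request
// into account when deciding whether the queue would become overused.
type ValidateWithCandidateFn func(obj interface{}, candidate interface{}) bool
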
pkg/scheduler/framework/session_plugins.go (4 additions, 4 deletions)
@@ -106,7 +106,7 @@ func (ssn *Session) AddNodeReduceFn(name string, pf api.NodeReduceFn) {
}

// AddOverusedFn adds an overused function
func (ssn *Session) AddOverusedFn(name string, fn api.ValidateFn) {
func (ssn *Session) AddOverusedFn(name string, fn api.ValidateWithCandidateFn) {
ssn.overusedFns[name] = fn
}

@@ -255,7 +255,7 @@ func (ssn *Session) Preemptable(preemptor *api.TaskInfo, preemptees []*api.TaskI
}

// Overused invokes the overused functions of the plugins
func (ssn *Session) Overused(queue *api.QueueInfo) bool {
func (ssn *Session) Overused(queue *api.QueueInfo, req interface{}) bool {
for _, tier := range ssn.Tiers {
for _, plugin := range tier.Plugins {
if !isEnabled(plugin.EnabledOverused) {
@@ -265,7 +265,7 @@ func (ssn *Session) Overused(queue *api.QueueInfo) bool {
if !found {
continue
}
if of(queue) {
if of(queue, req) {
return true
}
}
@@ -275,7 +275,7 @@ func (ssn *Session) Preemptive(queue *api.QueueInfo, candidate *api.TaskInfo) bool {
}

// Preemptive invokes the preemptive functions of the plugins
func (ssn *Session) Preemptive(queue *api.QueueInfo, candidate *api.TaskInfo) bool {
func (ssn *Session) Preemptive(queue *api.QueueInfo, candidate interface{}) bool {
for _, tier := range ssn.Tiers {
for _, plugin := range tier.Plugins {
of, found := ssn.preemptiveFns[plugin.Name]
pkg/scheduler/plugins/capacity/capacity.go (49 additions, 4 deletions)
@@ -142,19 +142,64 @@ func (cp *capacityPlugin) OnSessionOpen(ssn *framework.Session) {
return victims, util.Permit
})

ssn.AddOverusedFn(cp.Name(), func(obj interface{}, req interface{}) bool {
queue := obj.(*api.QueueInfo)
attr := cp.queueOpts[queue.UID]
var overused, preemptable bool
var allocated *api.Resource
var candidateName string

// overused defaults to false, preserving the original behavior: without a candidate, capacity performs no overused check and returns false
if req != nil {
allocated = attr.allocated.Clone()
var resReq *api.Resource
if job, ok := req.(*api.JobInfo); ok {
preemptable = job.Preemptable
resReq = job.GetMinResources()
allocated.Add(resReq) // projected usage once the job's min request is allocated
candidateName = job.Name
} else if task, ok := req.(*api.TaskInfo); ok {
preemptable = task.Preemptable
resReq = task.Resreq
allocated.Add(resReq)
candidateName = task.Name
}
if preemptable { // preemptable jobs/tasks may use resources up to the queue's capability
overused = !allocated.LessEqualWithDimension(attr.capability, resReq)
} else {
overused = !allocated.LessEqualWithDimension(attr.deserved, resReq)
}
}
metrics.UpdateQueueOverused(attr.name, overused)
if overused {
klog.V(3).Infof("Queue <%v>: deserved <%v>, future allocated <%v>, share <%v>, capability <%v>, candidate=%s, preemptable=%v",
queue.Name, attr.deserved, allocated, attr.share, attr.capability, candidateName, preemptable)
}
return overused
})

ssn.AddPreemptiveFn(cp.Name(), func(obj interface{}, candidate interface{}) bool {
if !readyToSchedule {
klog.V(3).Infof("Capacity plugin failed to check queue's hierarchical structure!")
return false
}

queue := obj.(*api.QueueInfo)
task := candidate.(*api.TaskInfo)
attr := cp.queueOpts[queue.UID]

futureUsed := attr.allocated.Clone().Add(task.Resreq)
overused := !futureUsed.LessEqualWithDimension(attr.deserved, task.Resreq)
metrics.UpdateQueueOverused(attr.name, overused)
var overused bool // defaults to false, so preemption is allowed unless a check below proves otherwise
if task, ok1 := candidate.(*api.TaskInfo); ok1 {
futureUsed := attr.allocated.Clone().Add(task.Resreq)
overused = !futureUsed.LessEqualWithDimension(attr.deserved, task.Resreq)
} else if job, ok2 := candidate.(*api.JobInfo); ok2 {
request := job.GetMinResources()
futureUsed := attr.allocated.Clone().Add(request)
// regardless of preemptability, reclaiming is not allowed if usage would exceed deserved
preemptive := futureUsed.LessEqualWithDimension(attr.deserved, request)
overused = !preemptive
}

//metrics.UpdateQueueOverused(attr.name, overused)
if overused {
klog.V(3).Infof("Queue <%v> can not reclaim, deserved <%v>, allocated <%v>, share <%v>",
queue.Name, attr.deserved, attr.allocated, attr.share)
Expand Down