Skip to content

Commit

Permalink
Merge pull request #3732 from lowang-bh/cherry-pick-allocatableCompare
Browse files Browse the repository at this point in the history
[cherry-pick for release-1.10] enquable and allocatable compare resource with the required dimensions and add testcaes
  • Loading branch information
volcano-sh-bot authored Sep 18, 2024
2 parents e7afe07 + 9dc4c11 commit 8bd71df
Show file tree
Hide file tree
Showing 5 changed files with 45 additions and 24 deletions.
3 changes: 3 additions & 0 deletions pkg/scheduler/api/resource_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -456,6 +456,9 @@ func (r *Resource) LessEqualWithDimension(rr *Resource, req *Resource) bool {
}

for name, quant := range req.ScalarResources {
if IsIgnoredScalarResource(name) {
continue
}
rQuant := r.ScalarResources[name]
rrQuant := rr.ScalarResources[name]
if quant > 0 && rQuant > rrQuant {
Expand Down
20 changes: 7 additions & 13 deletions pkg/scheduler/plugins/capacity/capacity.go
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,8 @@ func (cp *capacityPlugin) OnSessionOpen(ssn *framework.Session) {
task := candidate.(*api.TaskInfo)
attr := cp.queueOpts[queue.UID]

overused := attr.deserved.LessEqualWithDimension(attr.allocated, task.InitResreq)
futureUsed := attr.allocated.Clone().Add(task.Resreq)
overused := !futureUsed.LessEqualWithDimension(attr.deserved, task.Resreq)
metrics.UpdateQueueOverused(attr.name, overused)
if overused {
klog.V(3).Infof("Queue <%v> can not reclaim, deserved <%v>, allocated <%v>, share <%v>",
Expand All @@ -272,8 +273,8 @@ func (cp *capacityPlugin) OnSessionOpen(ssn *framework.Session) {
ssn.AddAllocatableFn(cp.Name(), func(queue *api.QueueInfo, candidate *api.TaskInfo) bool {
attr := cp.queueOpts[queue.UID]

free, _ := attr.realCapability.Diff(attr.allocated, api.Zero)
allocatable := candidate.Resreq.LessEqual(free, api.Zero)
futureUsed := attr.allocated.Clone().Add(candidate.Resreq)
allocatable := futureUsed.LessEqualWithDimension(attr.realCapability, candidate.Resreq)
if !allocatable {
klog.V(3).Infof("Queue <%v>: realCapability <%v>, allocated <%v>; Candidate <%v>: resource request <%v>",
queue.Name, attr.realCapability, attr.allocated, candidate.Name, candidate.Resreq)
Expand Down Expand Up @@ -303,19 +304,12 @@ func (cp *capacityPlugin) OnSessionOpen(ssn *framework.Session) {
klog.V(5).Infof("job %s min resource <%s>, queue %s capability <%s> allocated <%s> inqueue <%s> elastic <%s>",
job.Name, minReq.String(), queue.Name, attr.realCapability.String(), attr.allocated.String(), attr.inqueue.String(), attr.elastic.String())
// The queue resource quota limit has not reached
r := minReq.Add(attr.allocated).Add(attr.inqueue).Sub(attr.elastic)
rr := attr.realCapability.Clone()
r := minReq.Clone().Add(attr.allocated).Add(attr.inqueue).Sub(attr.elastic)

for name := range rr.ScalarResources {
if _, ok := r.ScalarResources[name]; !ok {
delete(rr.ScalarResources, name)
}
}

inqueue := r.LessEqual(rr, api.Infinity)
inqueue := r.LessEqualWithDimension(attr.realCapability, minReq)
klog.V(5).Infof("job %s inqueue %v", job.Name, inqueue)
if inqueue {
attr.inqueue.Add(job.DeductSchGatedResources(job.GetMinResources()))
attr.inqueue.Add(job.DeductSchGatedResources(minReq))
return util.Permit
}
ssn.RecordPodGroupEvent(job.PodGroup, v1.EventTypeNormal, string(scheduling.PodGroupUnschedulableType), "queue resource quota insufficient")
Expand Down
24 changes: 24 additions & 0 deletions pkg/scheduler/plugins/capacity/capacity_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,18 @@ func Test_capacityPlugin_OnSessionOpen(t *testing.T) {
queue8 := util.BuildQueueWithPriorityAndResourcesQuantity("q8", 1, nil, api.BuildResourceList("2", "4Gi"))
queue9 := util.BuildQueueWithPriorityAndResourcesQuantity("q9", 10, nil, api.BuildResourceList("2", "4Gi"))

// case5: p16 + p17 in queue10 will exceed queue's deserved, is not preemptive
p16 := util.BuildPod("ns1", "p16", "n1", corev1.PodRunning, api.BuildResourceList("1", "3Gi"), "pg16", make(map[string]string), nil)
p17 := util.BuildPod("ns1", "p17", "", corev1.PodPending, api.BuildResourceList("1", "1Gi"), "pg17", make(map[string]string), nil)
p18 := util.BuildPod("ns1", "p18", "n1", corev1.PodRunning, api.BuildResourceList("1", "1Gi"), "pg18", make(map[string]string), nil)
// podgroup
pg16 := util.BuildPodGroup("pg16", "ns1", "q10", 1, nil, schedulingv1beta1.PodGroupRunning)
pg17 := util.BuildPodGroup("pg17", "ns1", "q10", 1, nil, schedulingv1beta1.PodGroupInqueue)
pg18 := util.BuildPodGroup("pg18", "ns1", "q11", 1, nil, schedulingv1beta1.PodGroupRunning)
// queue
queue10 := util.BuildQueueWithResourcesQuantity("q10", api.BuildResourceList("2", "2Gi"), api.BuildResourceList("4", "4Gi"))
queue11 := util.BuildQueueWithResourcesQuantity("q11", api.BuildResourceList("0", "0Gi"), api.BuildResourceList("2", "2Gi"))

tests := []uthelper.TestCommonStruct{
{
Name: "case0: Pod allocatable when queue has not exceed capability",
Expand Down Expand Up @@ -182,8 +194,20 @@ func Test_capacityPlugin_OnSessionOpen(t *testing.T) {
ExpectBindMap: map[string]string{
"ns1/p15": "n6",
},

ExpectBindsNum: 1,
},
{
Name: "case5: Can not reclaim from other queues when allocated + req > deserved",
Plugins: plugins,
Pods: []*corev1.Pod{p16, p17, p18},
Nodes: []*corev1.Node{n1},
PodGroups: []*schedulingv1beta1.PodGroup{pg16, pg17, pg18},
Queues: []*schedulingv1beta1.Queue{queue10, queue11},
ExpectPipeLined: map[string][]string{},
ExpectEvicted: []string{},
ExpectEvictNum: 0,
},
}

tiers := []conf.Tier{
Expand Down
15 changes: 4 additions & 11 deletions pkg/scheduler/plugins/proportion/proportion.go
Original file line number Diff line number Diff line change
Expand Up @@ -315,8 +315,8 @@ func (pp *proportionPlugin) OnSessionOpen(ssn *framework.Session) {
ssn.AddAllocatableFn(pp.Name(), func(queue *api.QueueInfo, candidate *api.TaskInfo) bool {
attr := pp.queueOpts[queue.UID]

free, _ := attr.deserved.Diff(attr.allocated, api.Zero)
allocatable := candidate.Resreq.LessEqual(free, api.Zero)
futureUsed := attr.allocated.Clone().Add(candidate.Resreq)
allocatable := futureUsed.LessEqualWithDimension(attr.deserved, candidate.Resreq)
if !allocatable {
klog.V(3).Infof("Queue <%v>: deserved <%v>, allocated <%v>; Candidate <%v>: resource request <%v>",
queue.Name, attr.deserved, attr.allocated, candidate.Name, candidate.Resreq)
Expand Down Expand Up @@ -346,16 +346,9 @@ func (pp *proportionPlugin) OnSessionOpen(ssn *framework.Session) {
klog.V(5).Infof("job %s min resource <%s>, queue %s capability <%s> allocated <%s> inqueue <%s> elastic <%s>",
job.Name, minReq.String(), queue.Name, attr.realCapability.String(), attr.allocated.String(), attr.inqueue.String(), attr.elastic.String())
// The queue resource quota limit has not reached
r := minReq.Add(attr.allocated).Add(attr.inqueue).Sub(attr.elastic)
rr := attr.realCapability.Clone()
r := minReq.Clone().Add(attr.allocated).Add(attr.inqueue).Sub(attr.elastic)

for name := range rr.ScalarResources {
if _, ok := r.ScalarResources[name]; !ok {
delete(rr.ScalarResources, name)
}
}

inqueue := r.LessEqual(rr, api.Infinity)
inqueue := r.LessEqualWithDimension(attr.realCapability, minReq)
klog.V(5).Infof("job %s inqueue %v", job.Name, inqueue)
if inqueue {
// deduct the resources of scheduling gated tasks in a job when calculating inqueued resources
Expand Down
7 changes: 7 additions & 0 deletions pkg/scheduler/uthelper/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,13 @@ func (test *TestCommonStruct) Run(actions []framework.Action) {
if len(actions) == 0 {
panic("no actions provided, please specify a list of actions to execute")
}

// registry actions in conf variables
conf.EnabledActionMap = make(map[string]bool, len(actions))
for _, action := range actions {
conf.EnabledActionMap[action.Name()] = true
}

for _, action := range actions {
action.Initialize()
action.Execute(test.ssn)
Expand Down

0 comments on commit 8bd71df

Please sign in to comment.