improve: consider each task's min member of a job
Signed-off-by: lowang-bh <[email protected]>

rebase from master

Signed-off-by: lowang-bh <[email protected]>
lowang-bh committed Jun 15, 2024
1 parent 7557262 commit 0d511ce
Showing 3 changed files with 104 additions and 26 deletions.
7 changes: 3 additions & 4 deletions pkg/scheduler/actions/allocate/allocate.go
@@ -195,11 +195,10 @@ func (alloc *Action) allocateResourcesForTasks(tasks *util.PriorityQueue, job *a
 			job.NodesFitErrors[task.UID] = fitErrors
 			// If, even assuming all remaining tasks are allocable, the job cannot meet its gang-scheduling min member, we should break out of the allocation loop;
 			// otherwise, we should continue looking for other allocable tasks
-			// TODO: especially, should check each task spec's left pod can meet their min member
-			if job.ReadyTaskNum()+int32(tasks.Len()) < job.MinAvailable {
-				break
-			} else {
+			if job.CheckJobNeedContinueAllocating() {
 				continue
+			} else {
+				break
 			}
 		}

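The new guard above replaces a count-only check. For intuition, here is a minimal runnable sketch of the old guard's failure mode (the numbers mirror the test case added below; these names are plain Go stand-ins, not the Volcano API):

```go
package main

import "fmt"

func main() {
	// Job: minAvailable = 2; task specs: master (minMember 2), worker (minMember 0).
	// Both master pods have just failed predicates; two worker pods are still pending.
	minAvailable := int32(2)
	ready := int32(0)        // tasks already allocated
	pendingTotal := int32(4) // 2 masters + 2 workers still queued

	// The old guard compared totals only, so it reported "continue allocating"
	// even though the master role could never reach its minMember of 2.
	fmt.Println(ready+pendingTotal >= minAvailable) // true -> keep allocating (wasted work)
}
```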
38 changes: 34 additions & 4 deletions pkg/scheduler/actions/allocate/allocate_test.go
@@ -53,6 +53,7 @@ func TestAllocate(t *testing.T) {
 		proportion.PluginName: proportion.New,
 		predicates.PluginName: predicates.New,
 		nodeorder.PluginName:  nodeorder.New,
+		gang.PluginName:       gang.New,
 	}
 	options.Default()
 	tests := []uthelper.TestCommonStruct{
@@ -72,10 +73,10 @@ func TestAllocate(t *testing.T) {
 			Queues: []*schedulingv1.Queue{
 				util.BuildQueue("c1", 1, nil),
 			},
-			Bind: map[string]string{
+			ExpectBindMap: map[string]string{
 				"c1/p2": "n1",
 			},
-			BindsNum: 1,
+			ExpectBindsNum: 1,
 		},
 		{
 			Name: "prepredicate failed and tasks are not used up, continue on until min member is met",
@@ -96,11 +97,33 @@ func TestAllocate(t *testing.T) {
 			Queues: []*schedulingv1.Queue{
 				util.BuildQueue("c1", 1, nil),
 			},
-			Bind: map[string]string{
+			ExpectBindMap: map[string]string{
 				"c1/p0": "n1",
 				"c1/p2": "n2",
 			},
-			BindsNum: 2,
+			ExpectBindsNum: 2,
 		},
+		{
+			Name: "master's min member cannot be allocated; break from allocating",
+			PodGroups: []*schedulingv1.PodGroup{
+				util.BuildPodGroup("pg1", "c1", "c1", 2, map[string]int32{"master": 2, "worker": 0}, schedulingv1.PodGroupInqueue),
+			},
+			Pods: []*v1.Pod{
+				// pods must use different roles, because the allocate action enables per-role predicate caching by default
+				util.BuildPod("c1", "p0", "", v1.PodPending, api.BuildResourceList("1", "1G"), "pg1", map[string]string{"volcano.sh/task-spec": "master"}, map[string]string{"nodeRole": "master"}),
+				util.BuildPod("c1", "p1", "", v1.PodPending, api.BuildResourceList("1", "1G"), "pg1", map[string]string{"volcano.sh/task-spec": "master"}, map[string]string{"nodeRole": "master"}),
+				util.BuildPod("c1", "p2", "", v1.PodPending, api.BuildResourceList("1", "1G"), "pg1", map[string]string{"volcano.sh/task-spec": "worker"}, map[string]string{"nodeRole": "worker"}),
+				util.BuildPod("c1", "p3", "", v1.PodPending, api.BuildResourceList("1", "1G"), "pg1", map[string]string{"volcano.sh/task-spec": "worker"}, map[string]string{"nodeRole": "worker"}),
+			},
+			Nodes: []*v1.Node{
+				util.BuildNode("n1", api.BuildResourceList("1", "2Gi", []api.ScalarResource{{Name: "pods", Value: "10"}}...), map[string]string{"nodeRole": "master"}),
+				util.BuildNode("n2", api.BuildResourceList("2", "2Gi", []api.ScalarResource{{Name: "pods", Value: "10"}}...), map[string]string{"nodeRole": "worker"}),
+			},
+			Queues: []*schedulingv1.Queue{
+				util.BuildQueue("c1", 1, nil),
+			},
+			ExpectBindMap:  map[string]string{},
+			ExpectBindsNum: 0,
+		},
 		{
 			Name: "one Job with two Pods on one node",
@@ -186,6 +209,13 @@ func TestAllocate(t *testing.T) {
 	tiers := []conf.Tier{
 		{
 			Plugins: []conf.PluginOption{
+				{
+					Name:                gang.PluginName,
+					EnabledJobOrder:     &trueValue,
+					EnabledJobReady:     &trueValue,
+					EnabledJobPipelined: &trueValue,
+					EnabledJobStarving:  &trueValue,
+				},
 				{
 					Name:               drf.PluginName,
 					EnabledPreemptable: &trueValue,
85 changes: 67 additions & 18 deletions pkg/scheduler/api/job_info.go
@@ -723,6 +723,72 @@ func (ji *JobInfo) PendingBestEffortTaskNum() int32 {
 	return int32(count)
 }
 
+// CheckJobNeedContinueAllocating checks whether the scheduler should keep allocating tasks for the current job.
+// Allocation should continue in either of two cases:
+// 1. the job's total allocable task count can still meet its minAvailable
+// 2. every task spec with its own independent minAvailable can still meet that minimum
+func (ji *JobInfo) CheckJobNeedContinueAllocating() bool {
+	failedRoles := map[string]struct{}{}
+	for tid := range ji.NodesFitErrors {
+		task := ji.Tasks[tid]
+		failedRoles[getTaskSpec(task.Pod)] = struct{}{}
+	}
+
+	pending := map[string]int32{}
+	for _, task := range ji.TaskStatusIndex[Pending] {
+		pending[getTaskSpec(task.Pod)]++
+	}
+	// 1. ignore each role's own minimum; just compare the total allocable count with the job's MinAvailable
+	if ji.MinAvailable < ji.TaskMinAvailableTotal {
+		left := int32(0)
+		for role, cnt := range pending {
+			if _, ok := failedRoles[role]; !ok {
+				left += cnt
+			}
+		}
+		return ji.ReadyTaskNum()+left >= ji.MinAvailable
+	}
+
+	// 2. otherwise task roles have their own independent minMember; check each failed role
+	allocated := ji.getJobAllocatedRoles()
+	for role := range failedRoles {
+		min := ji.TaskMinAvailable[role]
+		if min == 0 {
+			continue
+		}
+		// this role failed its predicates, so its remaining tasks cannot be allocated;
+		// if the allocated count is below the role's minAvailable, the job can never become ready
+		if allocated[role] < min {
+			return false
+		}
+	}
+
+	return true
+}
+
+// getJobAllocatedRoles returns a map recording each role's allocated task count
+func (ji *JobInfo) getJobAllocatedRoles() map[string]int32 {
+	occupiedMap := map[string]int32{}
+	for status, tasks := range ji.TaskStatusIndex {
+		if AllocatedStatus(status) ||
+			status == Succeeded {
+			for _, task := range tasks {
+				occupiedMap[getTaskSpec(task.Pod)]++
+			}
+			continue
+		}
+
+		if status == Pending {
+			for _, task := range tasks {
+				if task.InitResreq.IsEmpty() {
+					occupiedMap[getTaskSpec(task.Pod)]++
+				}
+			}
+		}
+	}
+	return occupiedMap
+}
+
 // CheckTaskValid returns whether each task of job is valid.
 func (ji *JobInfo) CheckTaskValid() bool {
 	// if the job's minAvailable is less than the sum of the tasks' minAvailable values, skip this check
@@ -760,24 +826,7 @@ func (ji *JobInfo) CheckTaskReady() bool {
 	if ji.MinAvailable < ji.TaskMinAvailableTotal {
 		return true
 	}
-	occupiedMap := map[string]int32{}
-	for status, tasks := range ji.TaskStatusIndex {
-		if AllocatedStatus(status) ||
-			status == Succeeded {
-			for _, task := range tasks {
-				occupiedMap[getTaskSpec(task.Pod)]++
-			}
-			continue
-		}
-
-		if status == Pending {
-			for _, task := range tasks {
-				if task.InitResreq.IsEmpty() {
-					occupiedMap[getTaskSpec(task.Pod)]++
-				}
-			}
-		}
-	}
+	occupiedMap := ji.getJobAllocatedRoles()
 	for taskSpec, minNum := range ji.TaskMinAvailable {
 		if occupiedMap[taskSpec] < minNum {
 			klog.V(4).Infof("Job %s/%s Task %s occupied %v less than task min available", ji.Namespace, ji.Name, taskSpec, occupiedMap[taskSpec])
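To make the two cases in CheckJobNeedContinueAllocating above easier to follow, here is a self-contained model over plain maps (the parameters stand in for the JobInfo fields; a sketch under those assumptions, not the actual implementation):

```go
package main

import "fmt"

// needContinue mirrors the two-case logic of CheckJobNeedContinueAllocating
// over plain maps: pending/allocated/roleMin are per-role counts, failed is
// the set of roles whose tasks failed predicates.
func needContinue(ready, minAvailable, minTotal int32,
	pending, allocated, roleMin map[string]int32, failed map[string]bool) bool {
	// Case 1: the job minimum is below the sum of per-role minimums, so only
	// the total matters; count pending tasks whose role has not failed.
	if minAvailable < minTotal {
		var left int32
		for role, cnt := range pending {
			if !failed[role] {
				left += cnt
			}
		}
		return ready+left >= minAvailable
	}
	// Case 2: a failed role with its own minimum must already have enough
	// allocated tasks, since its remaining tasks cannot be placed.
	for role := range failed {
		if m := roleMin[role]; m > 0 && allocated[role] < m {
			return false
		}
	}
	return true
}

func main() {
	roleMin := map[string]int32{"master": 2, "worker": 0}
	// Master failed predicates with nothing allocated and minMember 2 -> stop.
	fmt.Println(needContinue(0, 2, 2,
		map[string]int32{"worker": 2}, map[string]int32{}, roleMin,
		map[string]bool{"master": true})) // false
	// Worker failed instead: worker has no per-role minimum, so keep allocating.
	fmt.Println(needContinue(0, 2, 2,
		map[string]int32{"master": 2}, map[string]int32{}, roleMin,
		map[string]bool{"worker": true})) // true
}
```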
