Skip to content

Commit

Permalink
improve: parse and store task spec in taskInfo
Browse files Browse the repository at this point in the history
Signed-off-by: lowang-bh <[email protected]>
  • Loading branch information
lowang-bh committed May 23, 2024
1 parent 8b788b9 commit 1ece51e
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 24 deletions.
4 changes: 2 additions & 2 deletions pkg/scheduler/actions/allocate/allocate.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,8 +193,8 @@ func (alloc *Action) allocateResourcesForTasks(tasks *util.PriorityQueue, job *a
predicateNodes, fitErrors := ph.PredicateNodes(task, allNodes, alloc.predicate, true)
if len(predicateNodes) == 0 {
job.NodesFitErrors[task.UID] = fitErrors
// Assume that all left tasks is allocable, can not meet gang-scheduling min member, we should break from continuously allocating
// otherwise, should continue to find other allocable task
// Assume that all left tasks is allocatable, can not meet gang-scheduling min member, we should break from continuously allocating
// otherwise, should continue to find other allocatable task
if job.CheckJobNeedContinueAllocating() {
continue
} else {
Expand Down
47 changes: 26 additions & 21 deletions pkg/scheduler/api/job_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ type TaskInfo struct {

Name string
Namespace string
TaskSpec string // value of "volcano.sh/task-spec"

// Resreq is the resource that used when task running.
Resreq *Resource
Expand Down Expand Up @@ -149,6 +150,9 @@ func getJobID(pod *v1.Pod) JobID {
}

func getTaskSpec(pod *v1.Pod) string {
if pod == nil {
return ""
}
if ts, found := pod.Annotations[batch.TaskSpecKey]; found && len(ts) != 0 {
return ts
}
Expand All @@ -170,6 +174,7 @@ func NewTaskInfo(pod *v1.Pod) *TaskInfo {
preemptable := GetPodPreemptable(pod)
revocableZone := GetPodRevocableZone(pod)
topologyInfo := GetPodTopologyInfo(pod)
spec := getTaskSpec(pod)

jobID := getJobID(pod)

Expand All @@ -178,6 +183,7 @@ func NewTaskInfo(pod *v1.Pod) *TaskInfo {
Job: jobID,
Name: pod.Name,
Namespace: pod.Namespace,
TaskSpec: spec,
Priority: 1,
Pod: pod,
Resreq: resReq,
Expand Down Expand Up @@ -251,6 +257,7 @@ func (ti *TaskInfo) Clone() *TaskInfo {
Job: ti.Job,
Name: ti.Name,
Namespace: ti.Namespace,
TaskSpec: ti.TaskSpec,
Priority: ti.Priority,
PodVolumes: ti.PodVolumes,
Pod: ti.Pod,
Expand All @@ -269,18 +276,11 @@ func (ti *TaskInfo) Clone() *TaskInfo {
}
}

func (ti *TaskInfo) GetTaskSpecKey() string {
if ti.Pod == nil {
return ""
}
return getTaskSpec(ti.Pod)
}

// String returns the taskInfo details in a string
func (ti TaskInfo) String() string {
res := fmt.Sprintf("Task (%v:%v/%v): job %v, status %v, pri %v, "+
res := fmt.Sprintf("Task (%v:%v/%v): taskSpec %s, job %v, status %v, pri %v, "+
"resreq %v, preemptable %v, revocableZone %v",
ti.UID, ti.Namespace, ti.Name, ti.Job, ti.Status, ti.Priority,
ti.UID, ti.Namespace, ti.Name, ti.TaskSpec, ti.Job, ti.Status, ti.Priority,
ti.Resreq, ti.Preemptable, ti.RevocableZone)

if ti.NumaInfo != nil {
Expand Down Expand Up @@ -715,22 +715,27 @@ func (ji *JobInfo) PendingBestEffortTaskNum() int32 {
return int32(count)
}

// CheckJobNeedContinueAllocating checks whether it can continue to allocate for current job
// there are two cases to continue:
// 1. job's total allocable number meet its minAvailable
// CheckJobNeedContinueAllocating checks whether it can continue on allocating for current job
// when its one pod predicated failed, there are two cases to continue:
// 1. job's total allocatable number meet its minAvailable(each task role has no independent minMember setting):
// because there are cases that some of the pods are not allocatable, but other pods are allocatable and
// the number of this kind pods can meet the gang-scheduling
// 2. each task's allocable number meet its independent minAvailable
// this is for the case that each task role has its own independent minMember.
// eg, current role's pod has a failed predicating result but its allocated number has meet its minMember,
// the other roles' pods which have no failed predicating results can continue on
func (ji *JobInfo) CheckJobNeedContinueAllocating() bool {
failedRoles := map[string]struct{}{}
for tid := range ji.NodesFitErrors {
task := ji.Tasks[tid]
failedRoles[getTaskSpec(task.Pod)] = struct{}{}
failedRoles[task.TaskSpec] = struct{}{}
}

pending := map[string]int32{}
for _, task := range ji.TaskStatusIndex[Pending] {
pending[getTaskSpec(task.Pod)]++
pending[task.TaskSpec]++
}
// 1. don't consider each role's min, just consider total allocable number vs job's MinAvailable
// 1. don't consider each role's min, just consider total allocatable number vs job's MinAvailable
if ji.MinAvailable < ji.TaskMinAvailableTotal {
left := int32(0)
for role, cnt := range pending {
Expand Down Expand Up @@ -765,15 +770,15 @@ func (ji *JobInfo) getJobAllocatedRoles() map[string]int32 {
if AllocatedStatus(status) ||
status == Succeeded {
for _, task := range tasks {
occupiedMap[getTaskSpec(task.Pod)]++
occupiedMap[task.TaskSpec]++
}
continue
}

if status == Pending {
for _, task := range tasks {
if task.InitResreq.IsEmpty() {
occupiedMap[getTaskSpec(task.Pod)]++
occupiedMap[task.TaskSpec]++
}
}
}
Expand All @@ -795,7 +800,7 @@ func (ji *JobInfo) CheckTaskValid() bool {
status == Pipelined ||
status == Pending {
for _, task := range tasks {
actual[getTaskSpec(task.Pod)]++
actual[task.TaskSpec]++
}
}
}
Expand Down Expand Up @@ -839,15 +844,15 @@ func (ji *JobInfo) CheckTaskPipelined() bool {
status == Succeeded ||
status == Pipelined {
for _, task := range tasks {
occupiedMap[getTaskSpec(task.Pod)]++
occupiedMap[task.TaskSpec]++
}
continue
}

if status == Pending {
for _, task := range tasks {
if task.InitResreq.IsEmpty() {
occupiedMap[getTaskSpec(task.Pod)]++
occupiedMap[task.TaskSpec]++
}
}
}
Expand All @@ -872,7 +877,7 @@ func (ji *JobInfo) CheckTaskStarving() bool {
status == Succeeded ||
status == Pipelined {
for _, task := range tasks {
occupiedMap[getTaskSpec(task.Pod)]++
occupiedMap[task.TaskSpec]++
}
continue
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/scheduler/util/predicate_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ func (ph *predicateHelper) PredicateNodes(task *api.TaskInfo, nodes []*api.NodeI
}

func taskGroupID(task *api.TaskInfo) string {
return fmt.Sprintf("%s/%s", task.Job, task.GetTaskSpecKey())
return fmt.Sprintf("%s/%s", task.Job, task.TaskSpec)
}

func NewPredicateHelper() PredicateHelper {
Expand Down

0 comments on commit 1ece51e

Please sign in to comment.