Skip to content

Commit

Permalink
improve: parse and store task spec in taskInfo; pre-check if task has…
Browse files Browse the repository at this point in the history
… fit errors, so as to save predicate evaluation time

address review comments: use TaskRole to store the value of 'volcano.sh/task-spec' in TaskInfo

Signed-off-by: lowang-bh <[email protected]>

Signed-off-by: lowang-bh <[email protected]>

Signed-off-by: lowang-bh <[email protected]>
  • Loading branch information
lowang-bh committed Jul 13, 2024
1 parent 0d511ce commit 23c5d4c
Show file tree
Hide file tree
Showing 3 changed files with 66 additions and 28 deletions.
13 changes: 10 additions & 3 deletions pkg/scheduler/actions/allocate/allocate.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,12 @@ func (alloc *Action) allocateResourcesForTasks(tasks *util.PriorityQueue, job *a
continue
}

// check if the task with its spec has already predicates failed
if job.TaskHasFitErrors(task) {
klog.V(5).Infof("Task %s with role spec %s has already predicated failed, skip", task.Name, task.TaskRole)
continue
}

klog.V(3).Infof("There are <%d> nodes for Job <%v/%v>", len(ssn.Nodes), job.Namespace, job.Name)

if err := ssn.PrePredicateFn(task); err != nil {
Expand All @@ -193,9 +199,10 @@ func (alloc *Action) allocateResourcesForTasks(tasks *util.PriorityQueue, job *a
predicateNodes, fitErrors := ph.PredicateNodes(task, allNodes, alloc.predicate, true)
if len(predicateNodes) == 0 {
job.NodesFitErrors[task.UID] = fitErrors
// Assume that all left tasks is allocable, can not meet gang-scheduling min member, we should break from continuously allocating
// otherwise, should continue to find other allocable task
if job.CheckJobNeedContinueAllocating() {
// Assume that all left tasks are allocatable, but can not meet gang-scheduling min member,
// so we should break from continuously allocating.
// otherwise, should continue to find other allocatable task
if job.NeedContinueAllocating() {
continue
} else {
break
Expand Down
79 changes: 55 additions & 24 deletions pkg/scheduler/api/job_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ type TaskInfo struct {

Name string
Namespace string
TaskRole string // value of "volcano.sh/task-spec"

// Resreq is the resource that used when task running.
Resreq *Resource
Expand Down Expand Up @@ -148,7 +149,10 @@ func getJobID(pod *v1.Pod) JobID {
return ""
}

func getTaskSpec(pod *v1.Pod) string {
func getTaskRole(pod *v1.Pod) string {
if pod == nil {
return ""
}
if ts, found := pod.Annotations[batch.TaskSpecKey]; found && len(ts) != 0 {
return ts
}
Expand All @@ -170,6 +174,7 @@ func NewTaskInfo(pod *v1.Pod) *TaskInfo {
preemptable := GetPodPreemptable(pod)
revocableZone := GetPodRevocableZone(pod)
topologyInfo := GetPodTopologyInfo(pod)
role := getTaskRole(pod)

jobID := getJobID(pod)

Expand All @@ -178,6 +183,7 @@ func NewTaskInfo(pod *v1.Pod) *TaskInfo {
Job: jobID,
Name: pod.Name,
Namespace: pod.Namespace,
TaskRole: role,
Priority: 1,
Pod: pod,
Resreq: resReq,
Expand Down Expand Up @@ -251,6 +257,7 @@ func (ti *TaskInfo) Clone() *TaskInfo {
Job: ti.Job,
Name: ti.Name,
Namespace: ti.Namespace,
TaskRole: ti.TaskRole,
Priority: ti.Priority,
PodVolumes: ti.PodVolumes,
Pod: ti.Pod,
Expand All @@ -269,18 +276,11 @@ func (ti *TaskInfo) Clone() *TaskInfo {
}
}

func (ti *TaskInfo) GetTaskSpecKey() string {
if ti.Pod == nil {
return ""
}
return getTaskSpec(ti.Pod)
}

// String returns the taskInfo details in a string
func (ti TaskInfo) String() string {
res := fmt.Sprintf("Task (%v:%v/%v): job %v, status %v, pri %v, "+
res := fmt.Sprintf("Task (%v:%v/%v): taskSpec %s, job %v, status %v, pri %v, "+
"resreq %v, preemptable %v, revocableZone %v",
ti.UID, ti.Namespace, ti.Name, ti.Job, ti.Status, ti.Priority,
ti.UID, ti.Namespace, ti.Name, ti.TaskRole, ti.Job, ti.Status, ti.Priority,
ti.Resreq, ti.Preemptable, ti.RevocableZone)

if ti.NumaInfo != nil {
Expand Down Expand Up @@ -723,22 +723,53 @@ func (ji *JobInfo) PendingBestEffortTaskNum() int32 {
return int32(count)
}

// CheckJobNeedContinueAllocating checks whether it can continue to allocate for current job
// there are two cases to continue:
// 1. job's total allocable number meet its minAvailable
// 2. each task's allocable number meet its independent minAvailable
func (ji *JobInfo) CheckJobNeedContinueAllocating() bool {
// FitFailedRoles returns the set of task roles (values of the
// "volcano.sh/task-spec" annotation, stored in TaskInfo.TaskRole) that have
// recorded node fit errors in ji.NodesFitErrors.
func (ji *JobInfo) FitFailedRoles() map[string]struct{} {
	failedRoles := make(map[string]struct{}, len(ji.NodesFitErrors))
	for tid := range ji.NodesFitErrors {
		// Guard the lookup: a missing task would yield a nil *TaskInfo and
		// dereferencing task.TaskRole would panic.
		if task, found := ji.Tasks[tid]; found {
			failedRoles[task.TaskRole] = struct{}{}
		}
	}
	return failedRoles
}

// TaskHasFitErrors reports whether another task with the same role as the
// given task has already failed predicates in this scheduling cycle, so the
// caller can skip re-running predicates for it.
func (ji *JobInfo) TaskHasFitErrors(task *TaskInfo) bool {
	// If the task didn't set the task-spec annotation, roles cannot be
	// distinguished, so the cached fit errors must not be reused.
	if len(task.TaskRole) == 0 {
		return false
	}

	// Probe NodesFitErrors directly instead of materializing the full
	// FitFailedRoles map: this avoids a per-call map allocation and, via the
	// comma-ok lookup, a nil dereference if a recorded task is missing.
	for tid := range ji.NodesFitErrors {
		if t, found := ji.Tasks[tid]; found && t.TaskRole == task.TaskRole {
			return true
		}
	}
	return false
}

// NeedContinueAllocating checks whether it can continue on allocating for current job
// when its one pod predicated failed, there are two cases to continue:
// 1. job's total allocatable number meet its minAvailable(each task role has no independent minMember setting):
// because there are cases that some of the pods are not allocatable, but other pods are allocatable and
// the number of this kind pods can meet the gang-scheduling
// 2. each task's allocable number meet its independent minAvailable
// this is for the case that each task role has its own independent minMember.
// eg, current role's pod has a failed predicating result but its allocated number has meet its minMember,
// the other roles' pods which have no failed predicating results can continue on
//
// performance analysis:
//
// As the failed predicating role has been pre-checked when it was popped from queue,
// this function will only be called at most as the number of roles in this job.
func (ji *JobInfo) NeedContinueAllocating() bool {
// Ensures all tasks must be running; if any pod allocation fails, further execution stops
if int(ji.MinAvailable) == len(ji.Tasks) {
return false
}
failedRoles := ji.FitFailedRoles()

pending := map[string]int32{}
for _, task := range ji.TaskStatusIndex[Pending] {
pending[getTaskSpec(task.Pod)]++
pending[task.TaskRole]++
}
// 1. don't consider each role's min, just consider total allocable number vs job's MinAvailable
// 1. don't consider each role's min, just consider total allocatable number vs job's MinAvailable
if ji.MinAvailable < ji.TaskMinAvailableTotal {
left := int32(0)
for role, cnt := range pending {
Expand Down Expand Up @@ -773,15 +804,15 @@ func (ji *JobInfo) getJobAllocatedRoles() map[string]int32 {
if AllocatedStatus(status) ||
status == Succeeded {
for _, task := range tasks {
occupiedMap[getTaskSpec(task.Pod)]++
occupiedMap[task.TaskRole]++
}
continue
}

if status == Pending {
for _, task := range tasks {
if task.InitResreq.IsEmpty() {
occupiedMap[getTaskSpec(task.Pod)]++
occupiedMap[task.TaskRole]++
}
}
}
Expand All @@ -803,7 +834,7 @@ func (ji *JobInfo) CheckTaskValid() bool {
status == Pipelined ||
status == Pending {
for _, task := range tasks {
actual[getTaskSpec(task.Pod)]++
actual[task.TaskRole]++
}
}
}
Expand Down Expand Up @@ -847,15 +878,15 @@ func (ji *JobInfo) CheckTaskPipelined() bool {
status == Succeeded ||
status == Pipelined {
for _, task := range tasks {
occupiedMap[getTaskSpec(task.Pod)]++
occupiedMap[task.TaskRole]++
}
continue
}

if status == Pending {
for _, task := range tasks {
if task.InitResreq.IsEmpty() {
occupiedMap[getTaskSpec(task.Pod)]++
occupiedMap[task.TaskRole]++
}
}
}
Expand All @@ -880,7 +911,7 @@ func (ji *JobInfo) CheckTaskStarving() bool {
status == Succeeded ||
status == Pipelined {
for _, task := range tasks {
occupiedMap[getTaskSpec(task.Pod)]++
occupiedMap[task.TaskRole]++
}
continue
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/scheduler/util/predicate_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ func (ph *predicateHelper) PredicateNodes(task *api.TaskInfo, nodes []*api.NodeI
}

func taskGroupID(task *api.TaskInfo) string {
return fmt.Sprintf("%s/%s", task.Job, task.GetTaskSpecKey())
return fmt.Sprintf("%s/%s", task.Job, task.TaskRole)
}

func NewPredicateHelper() PredicateHelper {
Expand Down

0 comments on commit 23c5d4c

Please sign in to comment.