Skip to content

Commit

Permalink
improve: parse and store task spec in taskInfo
Browse files Browse the repository at this point in the history
  • Loading branch information
lowang-bh committed May 13, 2024
1 parent e76351e commit bfe1bb4
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 18 deletions.
34 changes: 17 additions & 17 deletions pkg/scheduler/api/job_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ type TaskInfo struct {

Name string
Namespace string
TaskSpec string // value of "volcano.sh/task-spec"

// Resreq is the resource that used when task running.
Resreq *Resource
Expand Down Expand Up @@ -149,6 +150,9 @@ func getJobID(pod *v1.Pod) JobID {
}

func getTaskSpec(pod *v1.Pod) string {
if pod == nil {
return ""
}
if ts, found := pod.Annotations[batch.TaskSpecKey]; found && len(ts) != 0 {
return ts
}
Expand All @@ -170,6 +174,7 @@ func NewTaskInfo(pod *v1.Pod) *TaskInfo {
preemptable := GetPodPreemptable(pod)
revocableZone := GetPodRevocableZone(pod)
topologyInfo := GetPodTopologyInfo(pod)
spec := getTaskSpec(pod)

jobID := getJobID(pod)

Expand All @@ -178,6 +183,7 @@ func NewTaskInfo(pod *v1.Pod) *TaskInfo {
Job: jobID,
Name: pod.Name,
Namespace: pod.Namespace,
TaskSpec: spec,
Priority: 1,
Pod: pod,
Resreq: resReq,
Expand Down Expand Up @@ -251,6 +257,7 @@ func (ti *TaskInfo) Clone() *TaskInfo {
Job: ti.Job,
Name: ti.Name,
Namespace: ti.Namespace,
TaskSpec: ti.TaskSpec,
Priority: ti.Priority,
PodVolumes: ti.PodVolumes,
Pod: ti.Pod,
Expand All @@ -269,18 +276,11 @@ func (ti *TaskInfo) Clone() *TaskInfo {
}
}

func (ti *TaskInfo) GetTaskSpecKey() string {
if ti.Pod == nil {
return ""
}
return getTaskSpec(ti.Pod)
}

// String returns the taskInfo details in a string
func (ti TaskInfo) String() string {
res := fmt.Sprintf("Task (%v:%v/%v): job %v, status %v, pri %v, "+
res := fmt.Sprintf("Task (%v:%v/%v): taskSpec %s, job %v, status %v, pri %v, "+
"resreq %v, preemptable %v, revocableZone %v",
ti.UID, ti.Namespace, ti.Name, ti.Job, ti.Status, ti.Priority,
ti.UID, ti.Namespace, ti.Name, ti.TaskSpec, ti.Job, ti.Status, ti.Priority,
ti.Resreq, ti.Preemptable, ti.RevocableZone)

if ti.NumaInfo != nil {
Expand Down Expand Up @@ -723,12 +723,12 @@ func (ji *JobInfo) CheckJobNeedContinueAllocating() bool {
failedRoles := map[string]struct{}{}
for tid := range ji.NodesFitErrors {
task := ji.Tasks[tid]
failedRoles[getTaskSpec(task.Pod)] = struct{}{}
failedRoles[task.TaskSpec] = struct{}{}
}

pending := map[string]int32{}
for _, task := range ji.TaskStatusIndex[Pending] {
pending[getTaskSpec(task.Pod)]++
pending[task.TaskSpec]++
}
// 1. don't consider each role's min, just consider total allocable number vs job's MinAvailable
if ji.MinAvailable < ji.TaskMinAvailableTotal {
Expand Down Expand Up @@ -765,15 +765,15 @@ func (ji *JobInfo) getJobAllocatedRoles() map[string]int32 {
if AllocatedStatus(status) ||
status == Succeeded {
for _, task := range tasks {
occupiedMap[getTaskSpec(task.Pod)]++
occupiedMap[task.TaskSpec]++
}
continue
}

if status == Pending {
for _, task := range tasks {
if task.InitResreq.IsEmpty() {
occupiedMap[getTaskSpec(task.Pod)]++
occupiedMap[task.TaskSpec]++
}
}
}
Expand All @@ -795,7 +795,7 @@ func (ji *JobInfo) CheckTaskValid() bool {
status == Pipelined ||
status == Pending {
for _, task := range tasks {
actual[getTaskSpec(task.Pod)]++
actual[task.TaskSpec]++
}
}
}
Expand Down Expand Up @@ -839,15 +839,15 @@ func (ji *JobInfo) CheckTaskPipelined() bool {
status == Succeeded ||
status == Pipelined {
for _, task := range tasks {
occupiedMap[getTaskSpec(task.Pod)]++
occupiedMap[task.TaskSpec]++
}
continue
}

if status == Pending {
for _, task := range tasks {
if task.InitResreq.IsEmpty() {
occupiedMap[getTaskSpec(task.Pod)]++
occupiedMap[task.TaskSpec]++
}
}
}
Expand All @@ -872,7 +872,7 @@ func (ji *JobInfo) CheckTaskStarving() bool {
status == Succeeded ||
status == Pipelined {
for _, task := range tasks {
occupiedMap[getTaskSpec(task.Pod)]++
occupiedMap[task.TaskSpec]++
}
continue
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/scheduler/util/predicate_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ func (ph *predicateHelper) PredicateNodes(task *api.TaskInfo, nodes []*api.NodeI
}

func taskGroupID(task *api.TaskInfo) string {
return fmt.Sprintf("%s/%s", task.Job, task.GetTaskSpecKey())
return fmt.Sprintf("%s/%s", task.Job, task.TaskSpec)
}

func NewPredicateHelper() PredicateHelper {
Expand Down

0 comments on commit bfe1bb4

Please sign in to comment.