Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: enhanced FitError of JobInfo by logging the insufficient resources #3052

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 28 additions & 2 deletions pkg/scheduler/api/job_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -610,7 +610,7 @@ func (ji JobInfo) String() string {

// FitError returns detailed information on why a job's task failed to fit on
// each available node
func (ji *JobInfo) FitError() string {
func (ji *JobInfo) FitError(nodeMap map[string]*NodeInfo) string {
sortReasonsHistogram := func(reasons map[string]int) []string {
reasonStrings := []string{}
for k, v := range reasons {
Expand All @@ -620,6 +620,31 @@ func (ji *JobInfo) FitError() string {
return reasonStrings
}

getUnavailableResources := func(nodeMap map[string]*NodeInfo, tasks tasksMap) string {
if nodeMap == nil {
return ""
}
var unavailableResources []string
seenResources := make(map[string]bool)
for uid := range tasks {
nodeName := tasks[uid].NodeName
if nodeName != "" {
taskResource := tasks[uid].InitResreq
nodeResource := nodeMap[nodeName].Allocatable
for _, rn := range taskResource.ResourceNames() {
if nodeResource.Get(rn) < taskResource.Get(rn) {
msg := fmt.Sprintf("Insufficient resource %s in %s: required %f available %f", rn.String(), nodeName, taskResource.Get(rn), nodeResource.Get(rn))
if !seenResources[msg] {
unavailableResources = append(unavailableResources, msg)
seenResources[msg] = true
}
}
}
}
}
return strings.Join(unavailableResources, ", ")
}

// Stat histogram for all tasks of the job
reasons := make(map[string]int)
for status, taskMap := range ji.TaskStatusIndex {
Expand All @@ -630,12 +655,13 @@ func (ji *JobInfo) FitError() string {

// Stat histogram for pending tasks only
reasons = make(map[string]int)
resourceError := getUnavailableResources(nodeMap, ji.Tasks)
for uid := range ji.TaskStatusIndex[Pending] {
reason, _ := ji.TaskSchedulingReason(uid)
reasons[reason]++
}
if len(reasons) > 0 {
reasonMsg += "; " + fmt.Sprintf("%s: %s", Pending.String(), strings.Join(sortReasonsHistogram(reasons), ", "))
reasonMsg += "; " + fmt.Sprintf("%s: %s", Pending.String(), strings.Join(sortReasonsHistogram(reasons), ", "+resourceError))
}
return reasonMsg
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/scheduler/api/job_info_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,7 @@ func TestTaskSchedulingReason(t *testing.T) {
task.Status = Pending
job.TaskStatusIndex[Pending][task.UID] = task
}
job.JobFitErrors = job.FitError()
job.JobFitErrors = job.FitError(nil)

// assert
for uid, exp := range test.expected {
Expand Down
2 changes: 1 addition & 1 deletion pkg/scheduler/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -1212,7 +1212,7 @@ func (sc *SchedulerCache) RecordJobStatusEvent(job *schedulingapi.JobInfo) {
msg := fmt.Sprintf("%v/%v tasks in gang unschedulable: %v",
len(job.TaskStatusIndex[schedulingapi.Pending]),
len(job.Tasks),
job.FitError())
job.FitError(sc.Nodes))
sc.recordPodGroupEvent(job.PodGroup, v1.EventTypeWarning, string(scheduling.PodGroupUnschedulableType), msg)
} else {
sc.recordPodGroupEvent(job.PodGroup, v1.EventTypeNormal, string(scheduling.PodGroupScheduled), string(scheduling.PodGroupReady))
Expand Down
2 changes: 1 addition & 1 deletion pkg/scheduler/plugins/gang/gang.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ func (gp *gangPlugin) OnSessionClose(ssn *framework.Session) {
}
unreadyTaskCount = job.MinAvailable - schedulableTaskNum()
msg := fmt.Sprintf("%v/%v tasks in gang unschedulable: %v",
unreadyTaskCount, len(job.Tasks), job.FitError())
unreadyTaskCount, len(job.Tasks), job.FitError(ssn.Nodes))
job.JobFitErrors = msg

unScheduleJobCount++
Expand Down