diff --git a/pkg/scheduler/actions/preempt/preempt.go b/pkg/scheduler/actions/preempt/preempt.go index 0ff3032b16a..0f74f6d3018 100644 --- a/pkg/scheduler/actions/preempt/preempt.go +++ b/pkg/scheduler/actions/preempt/preempt.go @@ -96,7 +96,8 @@ func (pmpt *Action) Execute(ssn *framework.Session) { preemptorJob := preemptors.Pop().(*api.JobInfo) stmt := framework.NewStatement(ssn) - assigned := false + var assigned bool + var err error for { // If job is not request more resource, then stop preempting. if !ssn.JobStarving(preemptorJob) { @@ -112,7 +113,7 @@ func (pmpt *Action) Execute(ssn *framework.Session) { preemptor := preemptorTasks[preemptorJob.UID].Pop().(*api.TaskInfo) - if preempted, _ := preempt(ssn, stmt, preemptor, func(task *api.TaskInfo) bool { + assigned, err = preempt(ssn, stmt, preemptor, func(task *api.TaskInfo) bool { // Ignore non running task. if !api.PreemptableStatus(task.Status) { return false @@ -130,8 +131,9 @@ func (pmpt *Action) Execute(ssn *framework.Session) { } // Preempt other jobs within queue return job.Queue == preemptorJob.Queue && preemptor.Job != task.Job - }, ph); preempted { - assigned = true + }, ph) + if err != nil { + klog.V(3).Infof("Failed to preempt Task , err: %s", err) } } @@ -167,7 +169,7 @@ func (pmpt *Action) Execute(ssn *framework.Session) { preemptor := preemptorTasks[job.UID].Pop().(*api.TaskInfo) stmt := framework.NewStatement(ssn) - assigned, _ := preempt(ssn, stmt, preemptor, func(task *api.TaskInfo) bool { + assigned, err := preempt(ssn, stmt, preemptor, func(task *api.TaskInfo) bool { // Ignore non running task. if !api.PreemptableStatus(task.Status) { return false @@ -179,6 +181,9 @@ func (pmpt *Action) Execute(ssn *framework.Session) { // Preempt tasks within job. return preemptor.Job == task.Job }, ph) + if err != nil { + klog.V(4).Infof("Failed to preempt Task , err: %s", err) + } stmt.Commit() // If no preemption, next job. @@ -230,7 +235,7 @@ func preempt( job, found := ssn.Jobs[preemptor.Job] if !found { - return false, fmt.Errorf("Job %s not found in SSN", preemptor.Job) + return false, fmt.Errorf("not found Job %s in Session", preemptor.Job) } currentQueue := ssn.Queues[job.Queue] @@ -254,19 +259,20 @@ func preempt( klog.V(3).Infof("No validated victims on Node <%s>: %v", node.Name, err) continue } - - victimsQueue := util.NewPriorityQueue(func(l, r interface{}) bool { + // lower priority task are evicted first + lessFn := func(l, r interface{}) bool { lv := l.(*api.TaskInfo) rv := r.(*api.TaskInfo) if lv.Job != rv.Job { return !ssn.JobOrderFn(ssn.Jobs[lv.Job], ssn.Jobs[rv.Job]) } return !ssn.TaskOrderFn(l, r) - }) + } + victimsQueue := util.NewPriorityQueue(lessFn) for _, victim := range victims { victimsQueue.Push(victim) } - // Preempt victims for tasks, pick lowest priority task first. + // Preempt victims for tasks, pick the lowest priority task first. preempted := api.EmptyResource() for !victimsQueue.Empty() { diff --git a/pkg/scheduler/actions/reclaim/reclaim.go b/pkg/scheduler/actions/reclaim/reclaim.go index 430ec99a988..6cac5749667 100644 --- a/pkg/scheduler/actions/reclaim/reclaim.go +++ b/pkg/scheduler/actions/reclaim/reclaim.go @@ -60,8 +60,7 @@ func (ra *Action) Execute(ssn *framework.Session) { } if queue, found := ssn.Queues[job.Queue]; !found { - klog.Errorf("Failed to find Queue <%s> for Job <%s/%s>", - job.Queue, job.Namespace, job.Name) + klog.Errorf("Failed to find Queue <%s> for Job <%s/%s>", job.Queue, job.Namespace, job.Name) continue } else if _, existed := queueMap[queue.UID]; !existed { klog.V(4).Infof("Added Queue <%s> for Job <%s/%s>", queue.Name, job.Namespace, job.Name) @@ -137,8 +136,7 @@ func (ra *Action) Execute(ssn *framework.Session) { task.Namespace, task.Name, n.Name, statusSets.Message()) continue } - klog.V(3).Infof("Considering Task <%s/%s> on Node <%s>.", - task.Namespace, task.Name, n.Name) + klog.V(3).Infof("Considering Task <%s/%s> on Node <%s>.", task.Namespace, task.Name, n.Name) var reclaimees []*api.TaskInfo for _, task := range n.Tasks {