Skip to content

Commit

Permalink
fix: log when outputting preempt error
Browse files Browse the repository at this point in the history
Signed-off-by: googs1025 <[email protected]>
  • Loading branch information
googs1025 committed Apr 28, 2024
1 parent 3c21d67 commit 2f13c94
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 14 deletions.
26 changes: 16 additions & 10 deletions pkg/scheduler/actions/preempt/preempt.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,8 @@ func (pmpt *Action) Execute(ssn *framework.Session) {
preemptorJob := preemptors.Pop().(*api.JobInfo)

stmt := framework.NewStatement(ssn)
assigned := false
var assigned bool
var err error
for {
// If job is not request more resource, then stop preempting.
if !ssn.JobStarving(preemptorJob) {
Expand All @@ -112,7 +113,7 @@ func (pmpt *Action) Execute(ssn *framework.Session) {

preemptor := preemptorTasks[preemptorJob.UID].Pop().(*api.TaskInfo)

if preempted, _ := preempt(ssn, stmt, preemptor, func(task *api.TaskInfo) bool {
assigned, err = preempt(ssn, stmt, preemptor, func(task *api.TaskInfo) bool {
// Ignore non running task.
if !api.PreemptableStatus(task.Status) {
return false
Expand All @@ -130,8 +131,9 @@ func (pmpt *Action) Execute(ssn *framework.Session) {
}
// Preempt other jobs within queue
return job.Queue == preemptorJob.Queue && preemptor.Job != task.Job
}, ph); preempted {
assigned = true
}, ph)
if err != nil {
klog.V(3).Infof("Failed to preempt Task , err: %s", err)
}
}

Expand Down Expand Up @@ -167,7 +169,7 @@ func (pmpt *Action) Execute(ssn *framework.Session) {
preemptor := preemptorTasks[job.UID].Pop().(*api.TaskInfo)

stmt := framework.NewStatement(ssn)
assigned, _ := preempt(ssn, stmt, preemptor, func(task *api.TaskInfo) bool {
assigned, err := preempt(ssn, stmt, preemptor, func(task *api.TaskInfo) bool {
// Ignore non running task.
if !api.PreemptableStatus(task.Status) {
return false
Expand All @@ -179,6 +181,9 @@ func (pmpt *Action) Execute(ssn *framework.Session) {
// Preempt tasks within job.
return preemptor.Job == task.Job
}, ph)
if err != nil {
klog.V(4).Infof("Failed to preempt Task , err: %s", err)
}
stmt.Commit()

// If no preemption, next job.
Expand Down Expand Up @@ -230,7 +235,7 @@ func preempt(

job, found := ssn.Jobs[preemptor.Job]
if !found {
return false, fmt.Errorf("Job %s not found in SSN", preemptor.Job)
return false, fmt.Errorf("not found Job %s in Session", preemptor.Job)
}

currentQueue := ssn.Queues[job.Queue]
Expand All @@ -254,19 +259,20 @@ func preempt(
klog.V(3).Infof("No validated victims on Node <%s>: %v", node.Name, err)
continue
}

victimsQueue := util.NewPriorityQueue(func(l, r interface{}) bool {
// lower priority task are evicted first
lessFn := func(l, r interface{}) bool {
lv := l.(*api.TaskInfo)
rv := r.(*api.TaskInfo)
if lv.Job != rv.Job {
return !ssn.JobOrderFn(ssn.Jobs[lv.Job], ssn.Jobs[rv.Job])
}
return !ssn.TaskOrderFn(l, r)
})
}
victimsQueue := util.NewPriorityQueue(lessFn)
for _, victim := range victims {
victimsQueue.Push(victim)
}
// Preempt victims for tasks, pick lowest priority task first.
// Preempt victims for tasks, pick the lowest priority task first.
preempted := api.EmptyResource()

for !victimsQueue.Empty() {
Expand Down
6 changes: 2 additions & 4 deletions pkg/scheduler/actions/reclaim/reclaim.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,7 @@ func (ra *Action) Execute(ssn *framework.Session) {
}

if queue, found := ssn.Queues[job.Queue]; !found {
klog.Errorf("Failed to find Queue <%s> for Job <%s/%s>",
job.Queue, job.Namespace, job.Name)
klog.Errorf("Failed to find Queue <%s> for Job <%s/%s>", job.Queue, job.Namespace, job.Name)
continue
} else if _, existed := queueMap[queue.UID]; !existed {
klog.V(4).Infof("Added Queue <%s> for Job <%s/%s>", queue.Name, job.Namespace, job.Name)
Expand Down Expand Up @@ -137,8 +136,7 @@ func (ra *Action) Execute(ssn *framework.Session) {
task.Namespace, task.Name, n.Name, statusSets.Message())
continue
}
klog.V(3).Infof("Considering Task <%s/%s> on Node <%s>.",
task.Namespace, task.Name, n.Name)
klog.V(3).Infof("Considering Task <%s/%s> on Node <%s>.", task.Namespace, task.Name, n.Name)

var reclaimees []*api.TaskInfo
for _, task := range n.Tasks {
Expand Down

0 comments on commit 2f13c94

Please sign in to comment.