Skip to content

Commit

Permalink
Merge pull request #3498 from bibibox/update_nominatedNodename
Browse files Browse the repository at this point in the history
Update NominatedNodeName for pipelined task
  • Loading branch information
volcano-sh-bot authored May 29, 2024
2 parents 70a483b + 875da0c commit bc356e9
Show file tree
Hide file tree
Showing 5 changed files with 42 additions and 28 deletions.
16 changes: 8 additions & 8 deletions pkg/scheduler/api/job_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -641,7 +641,7 @@ func (ji *JobInfo) FitError() string {
// Stat histogram for pending tasks only
reasons = make(map[string]int)
for uid := range ji.TaskStatusIndex[Pending] {
reason, _ := ji.TaskSchedulingReason(uid)
reason, _, _ := ji.TaskSchedulingReason(uid)
reasons[reason]++
}
if len(reasons) > 0 {
Expand All @@ -652,10 +652,10 @@ func (ji *JobInfo) FitError() string {

// TaskSchedulingReason get detailed reason and message of the given task
// It returns detailed reason and message for tasks based on last scheduling transaction.
func (ji *JobInfo) TaskSchedulingReason(tid TaskID) (reason string, msg string) {
func (ji *JobInfo) TaskSchedulingReason(tid TaskID) (reason, msg, nominatedNodeName string) {
taskInfo, exists := ji.Tasks[tid]
if !exists {
return "", ""
return "", "", ""
}

// Get detailed scheduling reason based on LastTransaction
Expand All @@ -669,19 +669,19 @@ func (ji *JobInfo) TaskSchedulingReason(tid TaskID) (reason string, msg string)
case Allocated:
// Pod is schedulable
msg = fmt.Sprintf("Pod %s/%s can possibly be assigned to %s", taskInfo.Namespace, taskInfo.Name, ctx.NodeName)
return PodReasonSchedulable, msg
return PodReasonSchedulable, msg, ""
case Pipelined:
msg = fmt.Sprintf("Pod %s/%s can possibly be assigned to %s, once resource is released", taskInfo.Namespace, taskInfo.Name, ctx.NodeName)
return PodReasonUnschedulable, msg
return PodReasonUnschedulable, msg, ctx.NodeName
case Pending:
if fe := ji.NodesFitErrors[tid]; fe != nil {
// Pod is unschedulable
return PodReasonUnschedulable, fe.Error()
return PodReasonUnschedulable, fe.Error(), ""
}
// Pod is not scheduled yet, keep UNSCHEDULABLE as the reason to support cluster autoscaler
return PodReasonUnschedulable, msg
return PodReasonUnschedulable, msg, ""
default:
return status.String(), msg
return status.String(), msg, ""
}
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/scheduler/api/job_info_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,7 @@ func TestTaskSchedulingReason(t *testing.T) {
for uid, exp := range test.expected {
msg := job.JobFitErrors
if uid != "pg" {
_, msg = job.TaskSchedulingReason(TaskID(uid))
_, msg, _ = job.TaskSchedulingReason(TaskID(uid))
}
t.Logf("case #%d, task %v, result: %s", i, uid, msg)
if msg != exp {
Expand Down
44 changes: 29 additions & 15 deletions pkg/scheduler/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -278,16 +278,16 @@ func podConditionHaveUpdate(status *v1.PodStatus, condition *v1.PodCondition) bo
return !isEqual
}

// UpdatePodCondition will Update pod with podCondition
func (su *defaultStatusUpdater) UpdatePodCondition(pod *v1.Pod, condition *v1.PodCondition) (*v1.Pod, error) {
klog.V(3).Infof("Updating pod condition for %s/%s to (%s==%s)", pod.Namespace, pod.Name, condition.Type, condition.Status)
if podutil.UpdatePodCondition(&pod.Status, condition) {
return su.kubeclient.CoreV1().Pods(pod.Namespace).UpdateStatus(context.TODO(), pod, metav1.UpdateOptions{})
}
return pod, nil
func podNominatedNodeNameNeedUpdate(status *v1.PodStatus, nodeName string) bool {
return status.NominatedNodeName != nodeName
}

// UpdatePodStatus will Update pod status
func (su *defaultStatusUpdater) UpdatePodStatus(pod *v1.Pod) (*v1.Pod, error) {
return su.kubeclient.CoreV1().Pods(pod.Namespace).UpdateStatus(context.TODO(), pod, metav1.UpdateOptions{})
}

// UpdatePodGroup will Update pod with podCondition
// UpdatePodGroup will Update PodGroup
func (su *defaultStatusUpdater) UpdatePodGroup(pg *schedulingapi.PodGroup) (*schedulingapi.PodGroup, error) {
podgroup := &vcv1beta1.PodGroup{}
if err := schedulingscheme.Scheme.Convert(&pg.PodGroup, podgroup, nil); err != nil {
Expand Down Expand Up @@ -982,7 +982,7 @@ func (sc *SchedulerCache) EventRecorder() record.EventRecorder {
}

// taskUnschedulable updates pod status of pending task
func (sc *SchedulerCache) taskUnschedulable(task *schedulingapi.TaskInfo, reason, message string) error {
func (sc *SchedulerCache) taskUnschedulable(task *schedulingapi.TaskInfo, reason, message, nominatedNodeName string) error {
pod := task.Pod

condition := &v1.PodCondition{
Expand All @@ -992,14 +992,28 @@ func (sc *SchedulerCache) taskUnschedulable(task *schedulingapi.TaskInfo, reason
Message: message,
}

if podConditionHaveUpdate(&pod.Status, condition) {
updateCond := podConditionHaveUpdate(&pod.Status, condition)
updateNomiNode := podNominatedNodeNameNeedUpdate(&pod.Status, nominatedNodeName)

if updateCond || updateNomiNode {
pod = pod.DeepCopy()

if updateCond && podutil.UpdatePodCondition(&pod.Status, condition) {
klog.V(3).Infof("Updating pod condition for %s/%s to (%s==%s)", pod.Namespace, pod.Name, condition.Type, condition.Status)
}

// if nominatedNode field changed, we should update it to the pod status, for k8s
// autoscaler will check this field and ignore this pod when scale up.
if updateNomiNode {
klog.V(3).Infof("Updating pod nominatedNodeName for %s/%s from (%s) to (%s)", pod.Namespace, pod.Name, pod.Status.NominatedNodeName, nominatedNodeName)
pod.Status.NominatedNodeName = nominatedNodeName
}

// The reason field in 'Events' should be "FailedScheduling", there is not constants defined for this in
// k8s core, so using the same string here.
// The reason field in PodCondition can be "Unschedulable"
sc.Recorder.Eventf(pod, v1.EventTypeWarning, "FailedScheduling", message)
if _, err := sc.StatusUpdater.UpdatePodCondition(pod, condition); err != nil {
if _, err := sc.StatusUpdater.UpdatePodStatus(pod); err != nil {
return err
}
} else {
Expand Down Expand Up @@ -1434,13 +1448,13 @@ func (sc *SchedulerCache) RecordJobStatusEvent(job *schedulingapi.JobInfo, updat
// Update podCondition for tasks Allocated and Pending before job discarded
for _, status := range []schedulingapi.TaskStatus{schedulingapi.Allocated, schedulingapi.Pending, schedulingapi.Pipelined} {
for _, taskInfo := range job.TaskStatusIndex[status] {
reason, msg := job.TaskSchedulingReason(taskInfo.UID)
reason, msg, nominatedNodeName := job.TaskSchedulingReason(taskInfo.UID)
if len(msg) == 0 {
msg = baseErrorMessage
}
if err := sc.taskUnschedulable(taskInfo, reason, msg); err != nil {
klog.Errorf("Failed to update unschedulable task status <%s/%s>: %v",
taskInfo.Namespace, taskInfo.Name, err)
if err := sc.taskUnschedulable(taskInfo, reason, msg, nominatedNodeName); err != nil {
klog.ErrorS(err, "Failed to update unschedulable task status", "task", klog.KRef(taskInfo.Namespace, taskInfo.Name),
"reason", reason, "message", msg)
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/scheduler/cache/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ type Evictor interface {

// StatusUpdater updates pod with given PodCondition
type StatusUpdater interface {
UpdatePodCondition(pod *v1.Pod, podCondition *v1.PodCondition) (*v1.Pod, error)
UpdatePodStatus(pod *v1.Pod) (*v1.Pod, error)
UpdatePodGroup(pg *api.PodGroup) (*api.PodGroup, error)
UpdateQueueStatus(queue *api.QueueInfo) error
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/scheduler/util/test_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -388,13 +388,13 @@ func (fe *FakeEvictor) Evict(p *v1.Pod, reason string) error {
type FakeStatusUpdater struct {
}

// UpdatePodCondition is a empty function
func (ftsu *FakeStatusUpdater) UpdatePodCondition(pod *v1.Pod, podCondition *v1.PodCondition) (*v1.Pod, error) {
// UpdatePodStatus is an empty function
func (ftsu *FakeStatusUpdater) UpdatePodStatus(pod *v1.Pod) (*v1.Pod, error) {
// do nothing here
return nil, nil
}

// UpdatePodGroup is a empty function
// UpdatePodGroup is an empty function
func (ftsu *FakeStatusUpdater) UpdatePodGroup(pg *api.PodGroup) (*api.PodGroup, error) {
// do nothing here
return nil, nil
Expand Down

0 comments on commit bc356e9

Please sign in to comment.