From 875da0c17f43a846ca435b3c28997c9d5facdf8f Mon Sep 17 00:00:00 2001 From: Wenbo Zhang Date: Mon, 27 May 2024 20:31:39 +0800 Subject: [PATCH] Update NominatedNodeName for pipelined task Signed-off-by: Wenbo Zhang --- pkg/scheduler/api/job_info.go | 16 +++++------ pkg/scheduler/api/job_info_test.go | 2 +- pkg/scheduler/cache/cache.go | 44 ++++++++++++++++++++---------- pkg/scheduler/cache/interface.go | 2 +- pkg/scheduler/util/test_utils.go | 6 ++-- 5 files changed, 42 insertions(+), 28 deletions(-) diff --git a/pkg/scheduler/api/job_info.go b/pkg/scheduler/api/job_info.go index 2ba29e8b2e..116e832564 100644 --- a/pkg/scheduler/api/job_info.go +++ b/pkg/scheduler/api/job_info.go @@ -641,7 +641,7 @@ func (ji *JobInfo) FitError() string { // Stat histogram for pending tasks only reasons = make(map[string]int) for uid := range ji.TaskStatusIndex[Pending] { - reason, _ := ji.TaskSchedulingReason(uid) + reason, _, _ := ji.TaskSchedulingReason(uid) reasons[reason]++ } if len(reasons) > 0 { @@ -652,10 +652,10 @@ func (ji *JobInfo) FitError() string { // TaskSchedulingReason get detailed reason and message of the given task // It returns detailed reason and message for tasks based on last scheduling transaction. 
-func (ji *JobInfo) TaskSchedulingReason(tid TaskID) (reason string, msg string) { +func (ji *JobInfo) TaskSchedulingReason(tid TaskID) (reason, msg, nominatedNodeName string) { taskInfo, exists := ji.Tasks[tid] if !exists { - return "", "" + return "", "", "" } // Get detailed scheduling reason based on LastTransaction @@ -669,19 +669,19 @@ func (ji *JobInfo) TaskSchedulingReason(tid TaskID) (reason string, msg string) case Allocated: // Pod is schedulable msg = fmt.Sprintf("Pod %s/%s can possibly be assigned to %s", taskInfo.Namespace, taskInfo.Name, ctx.NodeName) - return PodReasonSchedulable, msg + return PodReasonSchedulable, msg, "" case Pipelined: msg = fmt.Sprintf("Pod %s/%s can possibly be assigned to %s, once resource is released", taskInfo.Namespace, taskInfo.Name, ctx.NodeName) - return PodReasonUnschedulable, msg + return PodReasonUnschedulable, msg, ctx.NodeName case Pending: if fe := ji.NodesFitErrors[tid]; fe != nil { // Pod is unschedulable - return PodReasonUnschedulable, fe.Error() + return PodReasonUnschedulable, fe.Error(), "" } // Pod is not scheduled yet, keep UNSCHEDULABLE as the reason to support cluster autoscaler - return PodReasonUnschedulable, msg + return PodReasonUnschedulable, msg, "" default: - return status.String(), msg + return status.String(), msg, "" } } diff --git a/pkg/scheduler/api/job_info_test.go b/pkg/scheduler/api/job_info_test.go index 2562062f34..3a83ed508c 100644 --- a/pkg/scheduler/api/job_info_test.go +++ b/pkg/scheduler/api/job_info_test.go @@ -285,7 +285,7 @@ func TestTaskSchedulingReason(t *testing.T) { for uid, exp := range test.expected { msg := job.JobFitErrors if uid != "pg" { - _, msg = job.TaskSchedulingReason(TaskID(uid)) + _, msg, _ = job.TaskSchedulingReason(TaskID(uid)) } t.Logf("case #%d, task %v, result: %s", i, uid, msg) if msg != exp { diff --git a/pkg/scheduler/cache/cache.go b/pkg/scheduler/cache/cache.go index 2c7cf79438..1a8140f17a 100644 --- a/pkg/scheduler/cache/cache.go +++ 
b/pkg/scheduler/cache/cache.go @@ -278,16 +278,16 @@ func podConditionHaveUpdate(status *v1.PodStatus, condition *v1.PodCondition) bo return !isEqual } -// UpdatePodCondition will Update pod with podCondition -func (su *defaultStatusUpdater) UpdatePodCondition(pod *v1.Pod, condition *v1.PodCondition) (*v1.Pod, error) { - klog.V(3).Infof("Updating pod condition for %s/%s to (%s==%s)", pod.Namespace, pod.Name, condition.Type, condition.Status) - if podutil.UpdatePodCondition(&pod.Status, condition) { - return su.kubeclient.CoreV1().Pods(pod.Namespace).UpdateStatus(context.TODO(), pod, metav1.UpdateOptions{}) - } - return pod, nil +func podNominatedNodeNameNeedUpdate(status *v1.PodStatus, nodeName string) bool { + return status.NominatedNodeName != nodeName +} + +// UpdatePodStatus will Update pod status +func (su *defaultStatusUpdater) UpdatePodStatus(pod *v1.Pod) (*v1.Pod, error) { + return su.kubeclient.CoreV1().Pods(pod.Namespace).UpdateStatus(context.TODO(), pod, metav1.UpdateOptions{}) } -// UpdatePodGroup will Update pod with podCondition +// UpdatePodGroup will Update PodGroup func (su *defaultStatusUpdater) UpdatePodGroup(pg *schedulingapi.PodGroup) (*schedulingapi.PodGroup, error) { podgroup := &vcv1beta1.PodGroup{} if err := schedulingscheme.Scheme.Convert(&pg.PodGroup, podgroup, nil); err != nil { @@ -982,7 +982,7 @@ func (sc *SchedulerCache) EventRecorder() record.EventRecorder { } // taskUnschedulable updates pod status of pending task -func (sc *SchedulerCache) taskUnschedulable(task *schedulingapi.TaskInfo, reason, message string) error { +func (sc *SchedulerCache) taskUnschedulable(task *schedulingapi.TaskInfo, reason, message, nominatedNodeName string) error { pod := task.Pod condition := &v1.PodCondition{ @@ -992,14 +992,28 @@ func (sc *SchedulerCache) taskUnschedulable(task *schedulingapi.TaskInfo, reason Message: message, } - if podConditionHaveUpdate(&pod.Status, condition) { + updateCond := podConditionHaveUpdate(&pod.Status, condition) + 
updateNomiNode := podNominatedNodeNameNeedUpdate(&pod.Status, nominatedNodeName) + + if updateCond || updateNomiNode { pod = pod.DeepCopy() + if updateCond && podutil.UpdatePodCondition(&pod.Status, condition) { + klog.V(3).Infof("Updating pod condition for %s/%s to (%s==%s)", pod.Namespace, pod.Name, condition.Type, condition.Status) + } + + // If the nominatedNodeName field changed, write it to the pod status, because the k8s + // autoscaler checks this field and ignores this pod when scaling up. + if updateNomiNode { + klog.V(3).Infof("Updating pod nominatedNodeName for %s/%s from (%s) to (%s)", pod.Namespace, pod.Name, pod.Status.NominatedNodeName, nominatedNodeName) + pod.Status.NominatedNodeName = nominatedNodeName + } + // The reason field in 'Events' should be "FailedScheduling", there is not constants defined for this in // k8s core, so using the same string here. // The reason field in PodCondition can be "Unschedulable" sc.Recorder.Eventf(pod, v1.EventTypeWarning, "FailedScheduling", message) - if _, err := sc.StatusUpdater.UpdatePodCondition(pod, condition); err != nil { + if _, err := sc.StatusUpdater.UpdatePodStatus(pod); err != nil { return err } } else { @@ -1434,13 +1448,13 @@ func (sc *SchedulerCache) RecordJobStatusEvent(job *schedulingapi.JobInfo, updat // Update podCondition for tasks Allocated and Pending before job discarded for _, status := range []schedulingapi.TaskStatus{schedulingapi.Allocated, schedulingapi.Pending, schedulingapi.Pipelined} { for _, taskInfo := range job.TaskStatusIndex[status] { - reason, msg := job.TaskSchedulingReason(taskInfo.UID) + reason, msg, nominatedNodeName := job.TaskSchedulingReason(taskInfo.UID) if len(msg) == 0 { msg = baseErrorMessage } - if err := sc.taskUnschedulable(taskInfo, reason, msg); err != nil { - klog.Errorf("Failed to update unschedulable task status <%s/%s>: %v", - taskInfo.Namespace, taskInfo.Name, err) + if err := sc.taskUnschedulable(taskInfo, reason, msg, nominatedNodeName); err != nil { + 
klog.ErrorS(err, "Failed to update unschedulable task status", "task", klog.KRef(taskInfo.Namespace, taskInfo.Name), + "reason", reason, "message", msg) } } } diff --git a/pkg/scheduler/cache/interface.go b/pkg/scheduler/cache/interface.go index 63c704e4ad..33eee3e9a3 100644 --- a/pkg/scheduler/cache/interface.go +++ b/pkg/scheduler/cache/interface.go @@ -109,7 +109,7 @@ type Evictor interface { // StatusUpdater updates pod with given PodCondition type StatusUpdater interface { - UpdatePodCondition(pod *v1.Pod, podCondition *v1.PodCondition) (*v1.Pod, error) + UpdatePodStatus(pod *v1.Pod) (*v1.Pod, error) UpdatePodGroup(pg *api.PodGroup) (*api.PodGroup, error) UpdateQueueStatus(queue *api.QueueInfo) error } diff --git a/pkg/scheduler/util/test_utils.go b/pkg/scheduler/util/test_utils.go index 696399c19b..0e8b89c2e8 100644 --- a/pkg/scheduler/util/test_utils.go +++ b/pkg/scheduler/util/test_utils.go @@ -388,13 +388,13 @@ func (fe *FakeEvictor) Evict(p *v1.Pod, reason string) error { type FakeStatusUpdater struct { } -// UpdatePodCondition is a empty function -func (ftsu *FakeStatusUpdater) UpdatePodCondition(pod *v1.Pod, podCondition *v1.PodCondition) (*v1.Pod, error) { +// UpdatePodStatus is an empty function +func (ftsu *FakeStatusUpdater) UpdatePodStatus(pod *v1.Pod) (*v1.Pod, error) { // do nothing here return nil, nil } -// UpdatePodGroup is a empty function +// UpdatePodGroup is an empty function func (ftsu *FakeStatusUpdater) UpdatePodGroup(pg *api.PodGroup) (*api.PodGroup, error) { // do nothing here return nil, nil