Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update NominatedNodeName for pipelined task #3498

Merged
merged 1 commit into from
May 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 8 additions & 8 deletions pkg/scheduler/api/job_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -641,7 +641,7 @@ func (ji *JobInfo) FitError() string {
// Stat histogram for pending tasks only
reasons = make(map[string]int)
for uid := range ji.TaskStatusIndex[Pending] {
reason, _ := ji.TaskSchedulingReason(uid)
reason, _, _ := ji.TaskSchedulingReason(uid)
reasons[reason]++
}
if len(reasons) > 0 {
Expand All @@ -652,10 +652,10 @@ func (ji *JobInfo) FitError() string {

// TaskSchedulingReason get detailed reason and message of the given task
// It returns detailed reason and message for tasks based on last scheduling transaction.
func (ji *JobInfo) TaskSchedulingReason(tid TaskID) (reason string, msg string) {
func (ji *JobInfo) TaskSchedulingReason(tid TaskID) (reason, msg, nominatedNodeName string) {
taskInfo, exists := ji.Tasks[tid]
if !exists {
return "", ""
return "", "", ""
}

// Get detailed scheduling reason based on LastTransaction
Expand All @@ -669,19 +669,19 @@ func (ji *JobInfo) TaskSchedulingReason(tid TaskID) (reason string, msg string)
case Allocated:
// Pod is schedulable
msg = fmt.Sprintf("Pod %s/%s can possibly be assigned to %s", taskInfo.Namespace, taskInfo.Name, ctx.NodeName)
return PodReasonSchedulable, msg
return PodReasonSchedulable, msg, ""
Copy link
Member

@Monokaix Monokaix May 29, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Allocated is also an unschedulable case; we should also check whether nominatedNode is needed there. Maybe we can refer to kube-scheduler for a deeper dive.

Copy link
Author

@bibibox bibibox May 29, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the autoscaler checks both the condition status and the reason to decide whether a pod is unschedulable or not:
image
so Allocated will not be treated as an unschedulable case by the autoscaler

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok

case Pipelined:
msg = fmt.Sprintf("Pod %s/%s can possibly be assigned to %s, once resource is released", taskInfo.Namespace, taskInfo.Name, ctx.NodeName)
return PodReasonUnschedulable, msg
return PodReasonUnschedulable, msg, ctx.NodeName
case Pending:
if fe := ji.NodesFitErrors[tid]; fe != nil {
// Pod is unschedulable
return PodReasonUnschedulable, fe.Error()
return PodReasonUnschedulable, fe.Error(), ""
}
// Pod is not scheduled yet, keep UNSCHEDULABLE as the reason to support cluster autoscaler
return PodReasonUnschedulable, msg
return PodReasonUnschedulable, msg, ""
default:
return status.String(), msg
return status.String(), msg, ""
}
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/scheduler/api/job_info_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,7 @@ func TestTaskSchedulingReason(t *testing.T) {
for uid, exp := range test.expected {
msg := job.JobFitErrors
if uid != "pg" {
_, msg = job.TaskSchedulingReason(TaskID(uid))
_, msg, _ = job.TaskSchedulingReason(TaskID(uid))
}
t.Logf("case #%d, task %v, result: %s", i, uid, msg)
if msg != exp {
Expand Down
44 changes: 29 additions & 15 deletions pkg/scheduler/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -278,16 +278,16 @@ func podConditionHaveUpdate(status *v1.PodStatus, condition *v1.PodCondition) bo
return !isEqual
}

// UpdatePodCondition will Update pod with podCondition
func (su *defaultStatusUpdater) UpdatePodCondition(pod *v1.Pod, condition *v1.PodCondition) (*v1.Pod, error) {
klog.V(3).Infof("Updating pod condition for %s/%s to (%s==%s)", pod.Namespace, pod.Name, condition.Type, condition.Status)
if podutil.UpdatePodCondition(&pod.Status, condition) {
return su.kubeclient.CoreV1().Pods(pod.Namespace).UpdateStatus(context.TODO(), pod, metav1.UpdateOptions{})
}
return pod, nil
// podNominatedNodeNameNeedUpdate reports whether the pod's
// status.nominatedNodeName differs from the desired nodeName and
// therefore needs to be written back to the API server.
func podNominatedNodeNameNeedUpdate(status *v1.PodStatus, nodeName string) bool {
return status.NominatedNodeName != nodeName
}

// UpdatePodStatus persists the pod's Status subresource (conditions and
// nominatedNodeName) to the API server via UpdateStatus, returning the
// updated pod as stored by the server.
func (su *defaultStatusUpdater) UpdatePodStatus(pod *v1.Pod) (*v1.Pod, error) {
return su.kubeclient.CoreV1().Pods(pod.Namespace).UpdateStatus(context.TODO(), pod, metav1.UpdateOptions{})
}

// UpdatePodGroup will Update pod with podCondition
// UpdatePodGroup will Update PodGroup
func (su *defaultStatusUpdater) UpdatePodGroup(pg *schedulingapi.PodGroup) (*schedulingapi.PodGroup, error) {
podgroup := &vcv1beta1.PodGroup{}
if err := schedulingscheme.Scheme.Convert(&pg.PodGroup, podgroup, nil); err != nil {
Expand Down Expand Up @@ -982,7 +982,7 @@ func (sc *SchedulerCache) EventRecorder() record.EventRecorder {
}

// taskUnschedulable updates pod status of pending task
func (sc *SchedulerCache) taskUnschedulable(task *schedulingapi.TaskInfo, reason, message string) error {
func (sc *SchedulerCache) taskUnschedulable(task *schedulingapi.TaskInfo, reason, message, nominatedNodeName string) error {
pod := task.Pod

condition := &v1.PodCondition{
Expand All @@ -992,14 +992,28 @@ func (sc *SchedulerCache) taskUnschedulable(task *schedulingapi.TaskInfo, reason
Message: message,
}

if podConditionHaveUpdate(&pod.Status, condition) {
updateCond := podConditionHaveUpdate(&pod.Status, condition)
updateNomiNode := podNominatedNodeNameNeedUpdate(&pod.Status, nominatedNodeName)

if updateCond || updateNomiNode {
pod = pod.DeepCopy()

if updateCond && podutil.UpdatePodCondition(&pod.Status, condition) {
klog.V(3).Infof("Updating pod condition for %s/%s to (%s==%s)", pod.Namespace, pod.Name, condition.Type, condition.Status)
}

// if nominatedNode field changed, we should update it to the pod status, for k8s
// autoscaler will check this field and ignore this pod when scale up.
if updateNomiNode {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Better add a comment here to explain why this is added?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

klog.V(3).Infof("Updating pod nominatedNodeName for %s/%s from (%s) to (%s)", pod.Namespace, pod.Name, pod.Status.NominatedNodeName, nominatedNodeName)
pod.Status.NominatedNodeName = nominatedNodeName
}

// The reason field in 'Events' should be "FailedScheduling", there is not constants defined for this in
// k8s core, so using the same string here.
// The reason field in PodCondition can be "Unschedulable"
sc.Recorder.Eventf(pod, v1.EventTypeWarning, "FailedScheduling", message)
if _, err := sc.StatusUpdater.UpdatePodCondition(pod, condition); err != nil {
if _, err := sc.StatusUpdater.UpdatePodStatus(pod); err != nil {
return err
}
} else {
Expand Down Expand Up @@ -1434,13 +1448,13 @@ func (sc *SchedulerCache) RecordJobStatusEvent(job *schedulingapi.JobInfo, updat
// Update podCondition for tasks Allocated and Pending before job discarded
for _, status := range []schedulingapi.TaskStatus{schedulingapi.Allocated, schedulingapi.Pending, schedulingapi.Pipelined} {
for _, taskInfo := range job.TaskStatusIndex[status] {
reason, msg := job.TaskSchedulingReason(taskInfo.UID)
reason, msg, nominatedNodeName := job.TaskSchedulingReason(taskInfo.UID)
if len(msg) == 0 {
msg = baseErrorMessage
}
if err := sc.taskUnschedulable(taskInfo, reason, msg); err != nil {
klog.Errorf("Failed to update unschedulable task status <%s/%s>: %v",
taskInfo.Namespace, taskInfo.Name, err)
if err := sc.taskUnschedulable(taskInfo, reason, msg, nominatedNodeName); err != nil {
klog.ErrorS(err, "Failed to update unschedulable task status", "task", klog.KRef(taskInfo.Namespace, taskInfo.Name),
"reason", reason, "message", msg)
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/scheduler/cache/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ type Evictor interface {

// StatusUpdater updates pod with given PodCondition
type StatusUpdater interface {
UpdatePodCondition(pod *v1.Pod, podCondition *v1.PodCondition) (*v1.Pod, error)
UpdatePodStatus(pod *v1.Pod) (*v1.Pod, error)
UpdatePodGroup(pg *api.PodGroup) (*api.PodGroup, error)
UpdateQueueStatus(queue *api.QueueInfo) error
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/scheduler/util/test_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -388,13 +388,13 @@ func (fe *FakeEvictor) Evict(p *v1.Pod, reason string) error {
type FakeStatusUpdater struct {
}

// UpdatePodCondition is a empty function
func (ftsu *FakeStatusUpdater) UpdatePodCondition(pod *v1.Pod, podCondition *v1.PodCondition) (*v1.Pod, error) {
// UpdatePodStatus is a no-op test double: it satisfies the StatusUpdater
// interface without contacting the API server and always returns (nil, nil).
func (ftsu *FakeStatusUpdater) UpdatePodStatus(pod *v1.Pod) (*v1.Pod, error) {
// do nothing here
return nil, nil
}

// UpdatePodGroup is a empty function
// UpdatePodGroup is an empty function
func (ftsu *FakeStatusUpdater) UpdatePodGroup(pg *api.PodGroup) (*api.PodGroup, error) {
// do nothing here
return nil, nil
Expand Down
Loading