From 4864b1b98c4272846df3e03bee214e6bcf234d8d Mon Sep 17 00:00:00 2001 From: wangyang Date: Wed, 10 Jul 2024 16:17:00 +0800 Subject: [PATCH] Optimize bind event update logic Signed-off-by: wangyang --- pkg/scheduler/cache/cache.go | 29 ++++++++++++++--------------- pkg/scheduler/cache/interface.go | 2 +- pkg/scheduler/util/test_utils.go | 4 ++-- 3 files changed, 17 insertions(+), 18 deletions(-) diff --git a/pkg/scheduler/cache/cache.go b/pkg/scheduler/cache/cache.go index 6e46d41f13..d03ad6f162 100644 --- a/pkg/scheduler/cache/cache.go +++ b/pkg/scheduler/cache/cache.go @@ -183,9 +183,8 @@ type DefaultBinder struct { } // Bind will send bind request to api server -func (db *DefaultBinder) Bind(kubeClient kubernetes.Interface, tasks []*schedulingapi.TaskInfo) ([]*schedulingapi.TaskInfo, []error) { - var errTasks []*schedulingapi.TaskInfo - var errs []error +func (db *DefaultBinder) Bind(kubeClient kubernetes.Interface, tasks []*schedulingapi.TaskInfo) map[schedulingapi.TaskID]string { + errMsg := make(map[schedulingapi.TaskID]string) for _, task := range tasks { p := task.Pod if err := db.kubeclient.CoreV1().Pods(p.Namespace).Bind(context.TODO(), @@ -198,19 +197,13 @@ func (db *DefaultBinder) Bind(kubeClient kubernetes.Interface, tasks []*scheduli }, metav1.CreateOptions{}); err != nil { klog.Errorf("Failed to bind pod <%v/%v> to node %s : %#v", p.Namespace, p.Name, task.NodeName, err) - errTasks = append(errTasks, task) - errs = append(errs, err) + errMsg[task.UID] = err.Error() } else { - db.recorder.Eventf(task.Pod, v1.EventTypeNormal, "Scheduled", "Successfully assigned %v/%v to %v", task.Namespace, task.Name, task.NodeName) metrics.UpdateTaskScheduleDuration(metrics.Duration(p.CreationTimestamp.Time)) // update metrics as soon as pod is bind } } - if len(errTasks) > 0 { - return errTasks, errs - } - - return nil, nil + return errMsg } // NewDefaultBinder create binder with kube client and event recorder, support fake binder if passed fake client and fake event recorder @@ -892,12 +885,18 @@ func (sc *SchedulerCache) Evict(taskInfo *schedulingapi.TaskInfo, reason string) // Bind binds task to the target host. func (sc *SchedulerCache) Bind(tasks []*schedulingapi.TaskInfo) { tmp := time.Now() - errTasks, errs := sc.Binder.Bind(sc.kubeClient, tasks) - if errs == nil { + errMsg := sc.Binder.Bind(sc.kubeClient, tasks) + if len(errMsg) == 0 { klog.V(3).Infof("bind ok, latency %v", time.Since(tmp)) } else { - for i, task := range errTasks { - unschedulableMsg := fmt.Sprintf("failed to bind to node %s: %s", task.NodeName, errs[i]) + klog.V(3).Infof("There are %d tasks in total and %d binds failed, latency %v", len(tasks), len(errMsg), time.Since(tmp)) + } + + for _, task := range tasks { + if reason, ok := errMsg[task.UID]; !ok { + sc.Recorder.Eventf(task.Pod, v1.EventTypeNormal, "Scheduled", "Successfully assigned %v/%v to %v", task.Namespace, task.Name, task.NodeName) + } else { + unschedulableMsg := fmt.Sprintf("failed to bind to node %s: %s", task.NodeName, reason) if err := sc.taskUnschedulable(task, schedulingapi.PodReasonSchedulerError, unschedulableMsg, ""); err != nil { klog.ErrorS(err, "Failed to update pod status when bind task error", "task", task.Name) } diff --git a/pkg/scheduler/cache/interface.go b/pkg/scheduler/cache/interface.go index b22d25647f..94ab18f373 100644 --- a/pkg/scheduler/cache/interface.go +++ b/pkg/scheduler/cache/interface.go @@ -99,7 +99,7 @@ type VolumeBinder interface { // Binder interface for binding task and hostname type Binder interface { - Bind(kubeClient kubernetes.Interface, tasks []*api.TaskInfo) ([]*api.TaskInfo, []error) + Bind(kubeClient kubernetes.Interface, tasks []*api.TaskInfo) map[api.TaskID]string } // Evictor interface for evict pods diff --git a/pkg/scheduler/util/test_utils.go b/pkg/scheduler/util/test_utils.go index 0038d87539..3b6737810f 100644 --- a/pkg/scheduler/util/test_utils.go +++ b/pkg/scheduler/util/test_utils.go @@ -330,7 +330,7 @@ func (fb *FakeBinder) Length() int { } // Bind used by fake binder struct to bind pods -func (fb *FakeBinder) Bind(kubeClient kubernetes.Interface, tasks []*api.TaskInfo) ([]*api.TaskInfo, []error) { +func (fb *FakeBinder) Bind(kubeClient kubernetes.Interface, tasks []*api.TaskInfo) map[api.TaskID]string { fb.Lock() defer fb.Unlock() for _, p := range tasks { @@ -339,7 +339,7 @@ func (fb *FakeBinder) Bind(kubeClient kubernetes.Interface, tasks []*api.TaskInf fb.Channel <- key // need to wait binding pod because Bind process is asynchronous } - return nil, nil + return nil } // FakeEvictor is used as fake evictor