Skip to content

Commit

Permalink
Merge pull request #3622 from wangyang0616/bugfix_binder_event
Browse files Browse the repository at this point in the history
Optimize bind event update logic
  • Loading branch information
volcano-sh-bot authored Aug 6, 2024
2 parents 4607119 + 4864b1b commit f3dd654
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 18 deletions.
29 changes: 14 additions & 15 deletions pkg/scheduler/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,9 +183,8 @@ type DefaultBinder struct {
}

// Bind will send bind request to api server
func (db *DefaultBinder) Bind(kubeClient kubernetes.Interface, tasks []*schedulingapi.TaskInfo) ([]*schedulingapi.TaskInfo, []error) {
var errTasks []*schedulingapi.TaskInfo
var errs []error
func (db *DefaultBinder) Bind(kubeClient kubernetes.Interface, tasks []*schedulingapi.TaskInfo) map[schedulingapi.TaskID]string {
errMsg := make(map[schedulingapi.TaskID]string)
for _, task := range tasks {
p := task.Pod
if err := db.kubeclient.CoreV1().Pods(p.Namespace).Bind(context.TODO(),
Expand All @@ -198,19 +197,13 @@ func (db *DefaultBinder) Bind(kubeClient kubernetes.Interface, tasks []*scheduli
},
metav1.CreateOptions{}); err != nil {
klog.Errorf("Failed to bind pod <%v/%v> to node %s : %#v", p.Namespace, p.Name, task.NodeName, err)
errTasks = append(errTasks, task)
errs = append(errs, err)
errMsg[task.UID] = err.Error()
} else {
db.recorder.Eventf(task.Pod, v1.EventTypeNormal, "Scheduled", "Successfully assigned %v/%v to %v", task.Namespace, task.Name, task.NodeName)
metrics.UpdateTaskScheduleDuration(metrics.Duration(p.CreationTimestamp.Time)) // update metrics as soon as pod is bind
}
}

if len(errTasks) > 0 {
return errTasks, errs
}

return nil, nil
return errMsg
}

// NewDefaultBinder create binder with kube client and event recorder, support fake binder if passed fake client and fake event recorder
Expand Down Expand Up @@ -892,12 +885,18 @@ func (sc *SchedulerCache) Evict(taskInfo *schedulingapi.TaskInfo, reason string)
// Bind binds task to the target host.
func (sc *SchedulerCache) Bind(tasks []*schedulingapi.TaskInfo) {
tmp := time.Now()
errTasks, errs := sc.Binder.Bind(sc.kubeClient, tasks)
if errs == nil {
errMsg := sc.Binder.Bind(sc.kubeClient, tasks)
if len(errMsg) == 0 {
klog.V(3).Infof("bind ok, latency %v", time.Since(tmp))
} else {
for i, task := range errTasks {
unschedulableMsg := fmt.Sprintf("failed to bind to node %s: %s", task.NodeName, errs[i])
klog.V(3).Infof("There are %d tasks in total and %d binds failed, latency %v", len(tasks), len(errMsg), time.Since(tmp))
}

for _, task := range tasks {
if reason, ok := errMsg[task.UID]; !ok {
sc.Recorder.Eventf(task.Pod, v1.EventTypeNormal, "Scheduled", "Successfully assigned %v/%v to %v", task.Namespace, task.Name, task.NodeName)
} else {
unschedulableMsg := fmt.Sprintf("failed to bind to node %s: %s", task.NodeName, reason)
if err := sc.taskUnschedulable(task, schedulingapi.PodReasonSchedulerError, unschedulableMsg, ""); err != nil {
klog.ErrorS(err, "Failed to update pod status when bind task error", "task", task.Name)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/scheduler/cache/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ type VolumeBinder interface {

// Binder interface for binding task and hostname
type Binder interface {
Bind(kubeClient kubernetes.Interface, tasks []*api.TaskInfo) ([]*api.TaskInfo, []error)
Bind(kubeClient kubernetes.Interface, tasks []*api.TaskInfo) map[api.TaskID]string
}

// Evictor interface for evict pods
Expand Down
4 changes: 2 additions & 2 deletions pkg/scheduler/util/test_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -330,7 +330,7 @@ func (fb *FakeBinder) Length() int {
}

// Bind used by fake binder struct to bind pods
func (fb *FakeBinder) Bind(kubeClient kubernetes.Interface, tasks []*api.TaskInfo) ([]*api.TaskInfo, []error) {
func (fb *FakeBinder) Bind(kubeClient kubernetes.Interface, tasks []*api.TaskInfo) map[api.TaskID]string {
fb.Lock()
defer fb.Unlock()
for _, p := range tasks {
Expand All @@ -339,7 +339,7 @@ func (fb *FakeBinder) Bind(kubeClient kubernetes.Interface, tasks []*api.TaskInf
fb.Channel <- key // need to wait binding pod because Bind process is asynchronous
}

return nil, nil
return nil
}

// FakeEvictor is used as fake evictor
Expand Down

0 comments on commit f3dd654

Please sign in to comment.