From 861beb942cb53079172c36430bd356ee51830b4c Mon Sep 17 00:00:00 2001 From: Wenbo Zhang Date: Mon, 22 Apr 2024 17:25:55 +0800 Subject: [PATCH 1/2] fix errTask channel memory leak Signed-off-by: Wenbo Zhang --- pkg/scheduler/api/job_info.go | 4 +- pkg/scheduler/cache/cache.go | 61 ++++++++++++++++++++++++--- pkg/scheduler/cache/event_handlers.go | 2 +- 3 files changed, 59 insertions(+), 8 deletions(-) diff --git a/pkg/scheduler/api/job_info.go b/pkg/scheduler/api/job_info.go index be12040780..e8aa0805f5 100644 --- a/pkg/scheduler/api/job_info.go +++ b/pkg/scheduler/api/job_info.go @@ -560,8 +560,8 @@ func (ji *JobInfo) DeleteTaskInfo(ti *TaskInfo) error { return nil } - return fmt.Errorf("failed to find task <%v/%v> in job <%v/%v>", - ti.Namespace, ti.Name, ji.Namespace, ji.Name) + klog.Warningf("failed to find task <%v/%v> in job <%v/%v>", ti.Namespace, ti.Name, ji.Namespace, ji.Name) + return nil } // Clone is used to clone a jobInfo object diff --git a/pkg/scheduler/cache/cache.go b/pkg/scheduler/cache/cache.go index ec6d1de8eb..6a5d9b2205 100644 --- a/pkg/scheduler/cache/cache.go +++ b/pkg/scheduler/cache/cache.go @@ -25,6 +25,7 @@ import ( "sync" "time" + "golang.org/x/time/rate" v1 "k8s.io/api/core/v1" schedulingv1 "k8s.io/api/scheduling/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" @@ -526,12 +527,17 @@ func newSchedulerCache(config *rest.Config, schedulerNames []string, defaultQueu newDefaultQueue(vcClient, defaultQueue) klog.Infof("Create init queue named default") + errTaskRateLimiter := workqueue.NewMaxOfRateLimiter( + workqueue.NewItemExponentialFailureRateLimiter(5*time.Millisecond, 1000*time.Second), + &workqueue.BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(100), 1000)}, + ) + sc := &SchedulerCache{ Jobs: make(map[schedulingapi.JobID]*schedulingapi.JobInfo), Nodes: make(map[string]*schedulingapi.NodeInfo), Queues: make(map[schedulingapi.QueueID]*schedulingapi.QueueInfo), PriorityClasses: make(map[string]*schedulingv1.PriorityClass), - errTasks: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()), + errTasks: workqueue.NewRateLimitingQueue(errTaskRateLimiter), nodeQueue: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()), DeletedJobs: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()), kubeClient: kubeClient, @@ -1049,7 +1055,40 @@ func (sc *SchedulerCache) processCleanupJob() { } func (sc *SchedulerCache) resyncTask(task *schedulingapi.TaskInfo) { - sc.errTasks.AddRateLimited(task) + key := sc.generateErrTaskKey(task) + sc.errTasks.AddRateLimited(key) +} + +func (sc *SchedulerCache) generateErrTaskKey(task *schedulingapi.TaskInfo) string { + // Job UID is namespace + / +name, for example: theNs/theJob + // Task UID is derived from the Pod UID, for example: d336abea-4f14-42c7-8a6b-092959a31407 + // In the example above, the key ultimately becomes: theNs/theJob/d336abea-4f14-42c7-8a6b-092959a31407 + return fmt.Sprintf("%s/%s", task.Job, task.UID) +} + +func (sc *SchedulerCache) parseErrTaskKey(key string) (*schedulingapi.TaskInfo, error) { + i := strings.LastIndex(key, "/") + if i == -1 { + return nil, fmt.Errorf("failed to split task key %s", key) + } + + jobUID := key[:i] + taskUID := key[i+1:] + + sc.Mutex.Lock() + defer sc.Mutex.Unlock() + + job, found := sc.Jobs[schedulingapi.JobID(jobUID)] + if !found { + return nil, fmt.Errorf("failed to find job %s", jobUID) + } + + task, found := job.Tasks[schedulingapi.TaskID(taskUID)] + if !found { + return nil, fmt.Errorf("failed to find task %s", taskUID) + } + + return task, nil } func (sc *SchedulerCache) processResyncTask() { @@ -1058,19 +1097,31 @@ func (sc *SchedulerCache) processResyncTask() { return } + klog.V(5).Infof("the length of errTasks is %d", sc.errTasks.Len()) + defer sc.errTasks.Done(obj) - task, ok := obj.(*schedulingapi.TaskInfo) + taskKey, ok := obj.(string) if !ok { - klog.Errorf("failed to convert %v to *schedulingapi.TaskInfo", obj) + klog.Errorf("Failed to convert %v to string.", obj) + return + } + + task, err := sc.parseErrTaskKey(taskKey) + if err != nil { + klog.ErrorS(err, "Failed to get task for sync task", "taskKey", taskKey) + sc.errTasks.Forget(obj) return } reSynced := false if err := sc.syncTask(task); err != nil { - klog.Errorf("Failed to sync pod <%v/%v>, retry it.", task.Namespace, task.Name) + klog.ErrorS(err, "Failed to sync task, retry it", "namespace", task.Namespace, "name", task.Name) sc.resyncTask(task) reSynced = true + } else { + klog.V(4).Infof("sync task <%s/%s> success", task.Namespace, task.Name) + sc.errTasks.Forget(obj) } // execute custom bind err handler call back func if exists. diff --git a/pkg/scheduler/cache/event_handlers.go b/pkg/scheduler/cache/event_handlers.go index 151071b293..b1739f7637 100644 --- a/pkg/scheduler/cache/event_handlers.go +++ b/pkg/scheduler/cache/event_handlers.go @@ -326,7 +326,7 @@ func (sc *SchedulerCache) deleteTask(ti *schedulingapi.TaskInfo) error { if job, found := sc.Jobs[ti.Job]; found { jobErr = job.DeleteTaskInfo(ti) } else { - jobErr = fmt.Errorf("failed to find Job <%v> for Task %v/%v", ti.Job, ti.Namespace, ti.Name) + klog.Warningf("Failed to find Job <%v> for Task <%v/%v>", ti.Job, ti.Namespace, ti.Name) } } else { // should not run into here; record error so that easy to debug jobErr = fmt.Errorf("task %s/%s has null jobID", ti.Namespace, ti.Name) From c04ddac61ee96d59056ed0dae1aafabf9eff7309 Mon Sep 17 00:00:00 2001 From: Wenbo Zhang Date: Tue, 23 Apr 2024 20:30:46 +0800 Subject: [PATCH 2/2] fix comments Signed-off-by: Wenbo Zhang --- pkg/scheduler/cache/cache.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pkg/scheduler/cache/cache.go b/pkg/scheduler/cache/cache.go index 6a5d9b2205..094e17cfca 100644 --- a/pkg/scheduler/cache/cache.go +++ b/pkg/scheduler/cache/cache.go @@ -1104,6 +1104,7 @@ func (sc *SchedulerCache) processResyncTask() { taskKey, ok := obj.(string) if !ok { klog.Errorf("Failed to convert %v to string.", obj) + sc.errTasks.Forget(obj) return } @@ -1120,7 +1121,7 @@ func (sc *SchedulerCache) processResyncTask() { sc.resyncTask(task) reSynced = true } else { - klog.V(4).Infof("sync task <%s/%s> success", task.Namespace, task.Name) + klog.V(4).Infof("Successfully synced task <%s/%s>", task.Namespace, task.Name) sc.errTasks.Forget(obj) }