Skip to content

Commit

Permalink
GetPodsForJob filter the pod list owned by job
Browse files Browse the repository at this point in the history
Signed-off-by: yowenter <[email protected]>
  • Loading branch information
yowenter committed Apr 27, 2023
1 parent 8a066f9 commit e6191ff
Show file tree
Hide file tree
Showing 7 changed files with 12 additions and 37 deletions.
26 changes: 5 additions & 21 deletions pkg/common/util/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,33 +38,17 @@ func ConvertServiceList(list []corev1.Service) []*corev1.Service {
return ret
}

// ConvertPodList convert pod list to pod pointer list
func ConvertPodList(list []corev1.Pod) []*corev1.Pod {
// JobControlledPodList filter pod list owned by the job.
func JobControlledPodList(list []corev1.Pod, job metav1.Object) []*corev1.Pod {
if list == nil {
return nil
}
ret := make([]*corev1.Pod, 0, len(list))
for i := range list {
ret = append(ret, &list[i])
}
return ret
}

// ConvertPodListWithFilter converts pod list to pod pointer list with ObjectFilterFunction
func ConvertPodListWithFilter(list []corev1.Pod, pass ObjectFilterFunction) []*corev1.Pod {
if list == nil {
return nil
}
ret := make([]*corev1.Pod, 0, len(list))
for i := range list {
obj := &list[i]
if pass != nil {
if pass(obj) {
ret = append(ret, obj)
}
} else {
ret = append(ret, obj)
if !metav1.IsControlledBy(&list[i], job) {
continue
}
ret = append(ret, &list[i])
}
return ret
}
Expand Down
13 changes: 2 additions & 11 deletions pkg/controller.v1/mpi/mpijob_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -551,11 +551,7 @@ func (jc *MPIJobReconciler) GetPodsForJob(jobObject interface{}) ([]*corev1.Pod,
return nil, err
}

var filter util.ObjectFilterFunction = func(obj metav1.Object) bool {
return metav1.IsControlledBy(obj, job)
}

return util.ConvertPodListWithFilter(podlist.Items, filter), nil
return util.JobControlledPodList(podlist.Items, job), nil
}

func (jc *MPIJobReconciler) DeleteJob(job interface{}) error {
Expand Down Expand Up @@ -1243,12 +1239,7 @@ func (jc *MPIJobReconciler) getRunningWorkerPods(mpiJob *kubeflowv1.MPIJob) ([]*
podList = append(podList, podFullList.Items[idx])
}
}

var filter util.ObjectFilterFunction = func(obj metav1.Object) bool {
return metav1.IsControlledBy(obj, mpiJob)
}

return util.ConvertPodListWithFilter(podList, filter), nil
return util.JobControlledPodList(podList, mpiJob), nil
}

// newConfigMap creates a new ConfigMap containing configurations for an MPIJob
Expand Down
2 changes: 1 addition & 1 deletion pkg/controller.v1/mxnet/mxjob_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -315,7 +315,7 @@ func (r *MXJobReconciler) GetPodsForJob(obj interface{}) ([]*corev1.Pod, error)
if err != nil {
return nil, err
}
return util.ConvertPodList(podlist.Items), nil
return util.JobControlledPodList(podlist.Items, job), nil
}

func (r *MXJobReconciler) GetServicesForJob(job interface{}) ([]*corev1.Service, error) {
Expand Down
2 changes: 1 addition & 1 deletion pkg/controller.v1/paddlepaddle/paddlepaddle_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -311,7 +311,7 @@ func (r *PaddleJobReconciler) GetPodsForJob(obj interface{}) ([]*corev1.Pod, err
return nil, err
}

return util.ConvertPodList(podlist.Items), nil
return util.JobControlledPodList(podlist.Items, job), nil
}

func (r *PaddleJobReconciler) GetServicesForJob(obj interface{}) ([]*corev1.Service, error) {
Expand Down
2 changes: 1 addition & 1 deletion pkg/controller.v1/pytorch/pytorchjob_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -315,7 +315,7 @@ func (r *PyTorchJobReconciler) GetPodsForJob(obj interface{}) ([]*corev1.Pod, er
return nil, err
}

return util.ConvertPodList(podlist.Items), nil
return util.JobControlledPodList(podlist.Items, job), nil
}

func (r *PyTorchJobReconciler) GetServicesForJob(obj interface{}) ([]*corev1.Service, error) {
Expand Down
2 changes: 1 addition & 1 deletion pkg/controller.v1/tensorflow/tfjob_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -335,7 +335,7 @@ func (r *TFJobReconciler) GetPodsForJob(jobObject interface{}) ([]*corev1.Pod, e
return nil, err
}

pods := util.ConvertPodList(podlist.Items)
pods := util.JobControlledPodList(podlist.Items, job)

// If any adoptions are attempted, we should first recheck for deletion
// with an uncached quorum read sometime after listing Pods (see #42639).
Expand Down
2 changes: 1 addition & 1 deletion pkg/controller.v1/xgboost/xgboostjob_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -325,7 +325,7 @@ func (r *XGBoostJobReconciler) GetPodsForJob(obj interface{}) ([]*corev1.Pod, er
return nil, err
}

return util.ConvertPodList(podlist.Items), nil
return util.JobControlledPodList(podlist.Items, job), nil
}

// GetServicesForJob returns the services managed by the job. This can be achieved by selecting services using label key "job-name"
Expand Down

0 comments on commit e6191ff

Please sign in to comment.