Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GetPodsForJob check the pod owner reference job uid #1796

Merged
merged 1 commit into from
May 16, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 5 additions & 21 deletions pkg/common/util/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,33 +38,17 @@ func ConvertServiceList(list []corev1.Service) []*corev1.Service {
return ret
}

// ConvertPodList convert pod list to pod pointer list
func ConvertPodList(list []corev1.Pod) []*corev1.Pod {
// JobControlledPodList filter pod list owned by the job.
func JobControlledPodList(list []corev1.Pod, job metav1.Object) []*corev1.Pod {
if list == nil {
return nil
}
ret := make([]*corev1.Pod, 0, len(list))
for i := range list {
ret = append(ret, &list[i])
}
return ret
}

// ConvertPodListWithFilter converts pod list to pod pointer list with ObjectFilterFunction
func ConvertPodListWithFilter(list []corev1.Pod, pass ObjectFilterFunction) []*corev1.Pod {
if list == nil {
return nil
}
ret := make([]*corev1.Pod, 0, len(list))
for i := range list {
obj := &list[i]
if pass != nil {
if pass(obj) {
ret = append(ret, obj)
}
} else {
ret = append(ret, obj)
if !metav1.IsControlledBy(&list[i], job) {
continue
}
ret = append(ret, &list[i])
}
return ret
}
Expand Down
13 changes: 2 additions & 11 deletions pkg/controller.v1/mpi/mpijob_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -551,11 +551,7 @@ func (jc *MPIJobReconciler) GetPodsForJob(jobObject interface{}) ([]*corev1.Pod,
return nil, err
}

var filter util.ObjectFilterFunction = func(obj metav1.Object) bool {
return metav1.IsControlledBy(obj, job)
}

return util.ConvertPodListWithFilter(podlist.Items, filter), nil
return util.JobControlledPodList(podlist.Items, job), nil
}

func (jc *MPIJobReconciler) DeleteJob(job interface{}) error {
Expand Down Expand Up @@ -1243,12 +1239,7 @@ func (jc *MPIJobReconciler) getRunningWorkerPods(mpiJob *kubeflowv1.MPIJob) ([]*
podList = append(podList, podFullList.Items[idx])
}
}

var filter util.ObjectFilterFunction = func(obj metav1.Object) bool {
return metav1.IsControlledBy(obj, mpiJob)
}

return util.ConvertPodListWithFilter(podList, filter), nil
return util.JobControlledPodList(podList, mpiJob), nil
}

// newConfigMap creates a new ConfigMap containing configurations for an MPIJob
Expand Down
2 changes: 1 addition & 1 deletion pkg/controller.v1/mxnet/mxjob_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -315,7 +315,7 @@ func (r *MXJobReconciler) GetPodsForJob(obj interface{}) ([]*corev1.Pod, error)
if err != nil {
return nil, err
}
return util.ConvertPodList(podlist.Items), nil
return util.JobControlledPodList(podlist.Items, job), nil
}

func (r *MXJobReconciler) GetServicesForJob(job interface{}) ([]*corev1.Service, error) {
Expand Down
2 changes: 1 addition & 1 deletion pkg/controller.v1/paddlepaddle/paddlepaddle_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -311,7 +311,7 @@ func (r *PaddleJobReconciler) GetPodsForJob(obj interface{}) ([]*corev1.Pod, err
return nil, err
}

return util.ConvertPodList(podlist.Items), nil
return util.JobControlledPodList(podlist.Items, job), nil
}

func (r *PaddleJobReconciler) GetServicesForJob(obj interface{}) ([]*corev1.Service, error) {
Expand Down
2 changes: 1 addition & 1 deletion pkg/controller.v1/pytorch/pytorchjob_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -315,7 +315,7 @@ func (r *PyTorchJobReconciler) GetPodsForJob(obj interface{}) ([]*corev1.Pod, er
return nil, err
}

return util.ConvertPodList(podlist.Items), nil
return util.JobControlledPodList(podlist.Items, job), nil
}

func (r *PyTorchJobReconciler) GetServicesForJob(obj interface{}) ([]*corev1.Service, error) {
Expand Down
2 changes: 1 addition & 1 deletion pkg/controller.v1/tensorflow/tfjob_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -335,7 +335,7 @@ func (r *TFJobReconciler) GetPodsForJob(jobObject interface{}) ([]*corev1.Pod, e
return nil, err
}

pods := util.ConvertPodList(podlist.Items)
pods := util.JobControlledPodList(podlist.Items, job)

// If any adoptions are attempted, we should first recheck for deletion
// with an uncached quorum read sometime after listing Pods (see #42639).
Expand Down
2 changes: 1 addition & 1 deletion pkg/controller.v1/xgboost/xgboostjob_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -325,7 +325,7 @@ func (r *XGBoostJobReconciler) GetPodsForJob(obj interface{}) ([]*corev1.Pod, er
return nil, err
}

return util.ConvertPodList(podlist.Items), nil
return util.JobControlledPodList(podlist.Items, job), nil
}

// GetServicesForJob returns the services managed by the job. This can be achieved by selecting services using label key "job-name"
Expand Down