Skip to content
This repository has been archived by the owner on Sep 12, 2023. It is now read-only.

simplify reconciler #166

Merged
merged 1 commit into from
Dec 19, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions pkg/reconciler.v1/common/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,19 +6,19 @@ To use the reconciler, following methods must be overridden according to the API

```go
// GetJob returns the job that matches the request
func (r *KubeflowJobReconciler) GetJob(ctx context.Context, req ctrl.Request) (client.Object, error)
func (r *JobReconciler) GetJob(ctx context.Context, req ctrl.Request) (client.Object, error)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What about something like Training or TrainingJob since we are always dealing with model trainings here. Job seems very generic.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TrainingJob sounds good to me. As the common library is designed for all kinds of controllers, I would suggest renaming the GenericJob to TrainingJob in the pull request I mentioned above for tf-operator (kubeflow/training-operator#1398). What do you think?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SGTM


// ExtractReplicasSpec extracts the ReplicasSpec map from this job
func (r *KubeflowJobReconciler) ExtractReplicasSpec(job client.Object) (map[commonv1.ReplicaType]*commonv1.ReplicaSpec, error)
func (r *JobReconciler) ExtractReplicasSpec(job client.Object) (map[commonv1.ReplicaType]*commonv1.ReplicaSpec, error)

// ExtractRunPolicy extracts the RunPolicy from this job
func (r *KubeflowJobReconciler) ExtractRunPolicy(job client.Object) (*commonv1.RunPolicy, error)
func (r *JobReconciler) ExtractRunPolicy(job client.Object) (*commonv1.RunPolicy, error)

// ExtractJobStatus extracts the JobStatus from this job
func (r *KubeflowJobReconciler) ExtractJobStatus(job client.Object) (*commonv1.JobStatus, error)
func (r *JobReconciler) ExtractJobStatus(job client.Object) (*commonv1.JobStatus, error)

// IsMasterRole checks if Pod is the master Pod
func (r *KubeflowJobReconciler) IsMasterRole(replicas map[commonv1.ReplicaType]*commonv1.ReplicaSpec, rtype commonv1.ReplicaType, index int) bool
func (r *JobReconciler) IsMasterRole(replicas map[commonv1.ReplicaType]*commonv1.ReplicaSpec, rtype commonv1.ReplicaType, index int) bool
```

A simple example can be found at `test_job/reconciler.v1/test_job/test_job_reconciler.go`.
A simple example can be found at `test_job/reconciler.v1/test_job/test_job_reconciler.go`.
10 changes: 10 additions & 0 deletions pkg/reconciler.v1/common/gang_volcano.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,16 @@ func (r *VolcanoReconciler) GetGangSchedulerName() string {
return "volcano"
}

// GangSchedulingEnabled returns if gang-scheduling is enabled for all jobs
func (r *VolcanoReconciler) GangSchedulingEnabled() bool {
return r.BaseGangReconciler.GangSchedulingEnabled()
}

// GetPodGroupName returns the name of PodGroup for this job
func (r *VolcanoReconciler) GetPodGroupName(job client.Object) string {
return r.BaseGangReconciler.GetPodGroupName(job)
}

// GetPodGroupForJob returns the PodGroup associated with this job
func (r *VolcanoReconciler) GetPodGroupForJob(ctx context.Context, job client.Object) (client.Object, error) {
var pg *volcano.PodGroup = nil
Expand Down
22 changes: 2 additions & 20 deletions pkg/reconciler.v1/common/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ import (
// recorder or logger
type ReconcilerUtilInterface interface {
// GetReconcilerName SHOULD be overridden if a new Reconciler is defined. The default implementation returns
// "Kubeflow Reconciler"
// "common-reconciler"
GetReconcilerName() string

// GetRecorder CAN be overridden to customize EventRecorder
Expand All @@ -48,7 +48,7 @@ type ReconcilerUtilInterface interface {
// GangSchedulingInterface defines the abstract interface for gang-scheduling related actions, such like get, create or
// delete PodGroup
type GangSchedulingInterface interface {
// OverrideForGangSchedulingInterface MUST NOT be overridden as it reset ReconcilerUtilInterface
// OverrideForGangSchedulingInterface MUST NOT be overridden as it resets ReconcilerUtilInterface
OverrideForGangSchedulingInterface(ui ReconcilerUtilInterface)

// GangSchedulingEnabled CAN be overridden if definition of gang-scheduling enabling changes.
Expand Down Expand Up @@ -239,21 +239,3 @@ type JobInterface interface {
// PastActiveDeadline CAN be overridden to customize how to determine if this job has past activate deadline.
PastActiveDeadline(runPolicy *commonv1.RunPolicy, jobStatus *commonv1.JobStatus) bool
}

// KubeflowReconcilerInterface defines the abstract interface for a base reconciler for kubeflow jobs.
type KubeflowReconcilerInterface interface {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's the motivation to move KubeflowReconcilerInterface to training-operator?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When writing the test code, test_reconciler, which could be considered as one example of how the common.reconciler will be used, I found that:

  1. Such interface will not be used for any abstraction when implementing a real reconciler
  2. Embedding modularized interfaces in KubeflowReconcilerInterface conflicts embedding them in the real reconciler struct.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it make sense to me. current testing integration between common and training-operator is a headache.

JobInterface
PodInterface
ServiceInterface
GangSchedulingInterface
ReconcilerUtilInterface

// OverrideForKubeflowReconcilerInterface MUST NOT be overridden as it reset ReconcilerUtilInterface, PodInterface, ServiceInterface, JobInterface, GangSchedulingInterface
OverrideForKubeflowReconcilerInterface(ji JobInterface, pi PodInterface, si ServiceInterface, gi GangSchedulingInterface, ui ReconcilerUtilInterface)

// Reconcile CAN be overridden to customize how to handle a request.
Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error)

// SetupWithManager CAN be overridden to customize how to set up the reconciler with the manager.
SetupWithManager(mgr ctrl.Manager, obj client.Object) error
}
86 changes: 43 additions & 43 deletions pkg/reconciler.v1/common/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,12 +60,12 @@ const (
ErrReconcileGangTemplate = "ReconcilePodGroups error %v"
ErrGetReplicasStatusFromStatusFailedTemplate = "failed to get ReplicasStatus for %s from status"

WarnDefaultImplementationTemplate = "Warning: executing default implementation for KubeflowReconciler.%s"
WarnDefaultImplementationTemplate = "Warning: executing default implementation for JobReconciler.%s"
WarnNotCountedInBackoffLimit = "The restart policy of replica %v of the job %v is not OnFailure or Always. Not counted in backoff limit."
)

// KubeflowJobReconciler defines a Reconciler dealing with KubeflowJob
type KubeflowJobReconciler struct {
// JobReconciler defines a Reconciler dealing with generic training job
type JobReconciler struct {
client.Client
ReconcilerUtilInterface
PodInterface
Expand All @@ -74,16 +74,16 @@ type KubeflowJobReconciler struct {
counter *commonutil.Counter
}

// BareKubeflowJobReconciler returns the pointer of a KubeflowJobReconciler with minimal implementation
func BareKubeflowJobReconciler(client client.Client) *KubeflowJobReconciler {
return &KubeflowJobReconciler{
// BareJobReconciler returns the pointer of a JobReconciler with minimal implementation
func BareJobReconciler(client client.Client) *JobReconciler {
return &JobReconciler{
Client: client,
counter: commonutil.NewCounter(),
}
}

// OverrideForJobInterface resets ReconcilerUtilInterface, PodInterface, ServiceInterface, GangSchedulingInterface used in KubeflowJobReconciler
func (r *KubeflowJobReconciler) OverrideForJobInterface(ui ReconcilerUtilInterface, pi PodInterface, si ServiceInterface, gi GangSchedulingInterface) {
// OverrideForJobInterface resets ReconcilerUtilInterface, PodInterface, ServiceInterface, GangSchedulingInterface used in JobReconciler
func (r *JobReconciler) OverrideForJobInterface(ui ReconcilerUtilInterface, pi PodInterface, si ServiceInterface, gi GangSchedulingInterface) {
if ui != nil {
r.ReconcilerUtilInterface = ui
}
Expand All @@ -98,8 +98,8 @@ func (r *KubeflowJobReconciler) OverrideForJobInterface(ui ReconcilerUtilInterfa
}
}

// GenLabels returns labels used for this job (based on the name of this KubeflowJob)
func (r *KubeflowJobReconciler) GenLabels(jobName string) map[string]string {
// GenLabels returns labels used for this job (based on the name of this generic training job)
func (r *JobReconciler) GenLabels(jobName string) map[string]string {
jobName = strings.Replace(jobName, "/", "-", -1)
return map[string]string{
// TODO(#149): Remove deprecated labels.
Expand All @@ -110,13 +110,13 @@ func (r *KubeflowJobReconciler) GenLabels(jobName string) map[string]string {
}
}

// GetGroupNameLabelValue returns the Group Name for the KubeflowJob, which is "kubeflow.org"
func (r *KubeflowJobReconciler) GetGroupNameLabelValue() string {
// GetGroupNameLabelValue returns the Group Name for the generic training job, which is "kubeflow.org"
func (r *JobReconciler) GetGroupNameLabelValue() string {
return GroupName
}

// ReconcileJob reconciles KubeflowJob
func (r *KubeflowJobReconciler) ReconcileJob(
// ReconcileJob reconciles generic training job
func (r *JobReconciler) ReconcileJob(
ctx context.Context,
job client.Object,
replicas map[commonv1.ReplicaType]*commonv1.ReplicaSpec,
Expand Down Expand Up @@ -262,26 +262,26 @@ func (r *KubeflowJobReconciler) ReconcileJob(
return nil
}

// DeleteJob deletes this KubeflowJob
func (r *KubeflowJobReconciler) DeleteJob(job client.Object) error {
// DeleteJob deletes this generic training job
func (r *JobReconciler) DeleteJob(job client.Object) error {
return r.Delete(context.Background(), job)
}

// RecordAbnormalPods records abnormal pods during the reconciliation of jobs
func (r *KubeflowJobReconciler) RecordAbnormalPods(activePods []*corev1.Pod, object client.Object) {
func (r *JobReconciler) RecordAbnormalPods(activePods []*corev1.Pod, object client.Object) {
core.RecordAbnormalPods(activePods, object, r.GetRecorder())
}

// SetStatusForSuccessJob sets the status for job that succeed
func (r *KubeflowJobReconciler) SetStatusForSuccessJob(status *commonv1.JobStatus) {
func (r *JobReconciler) SetStatusForSuccessJob(status *commonv1.JobStatus) {
for rytpe := range status.ReplicaStatuses {
status.ReplicaStatuses[rytpe].Succeeded += status.ReplicaStatuses[rytpe].Active
status.ReplicaStatuses[rytpe].Active = 0
}
}

// UpdateJobStatus updates the status of this KubeflowJob WITHOUT pushing the updated status to the APIServer
func (r *KubeflowJobReconciler) UpdateJobStatus(
// UpdateJobStatus updates the status of this generic training job WITHOUT pushing the updated status to the APIServer
func (r *JobReconciler) UpdateJobStatus(
job client.Object,
replicas map[commonv1.ReplicaType]*commonv1.ReplicaSpec,
jobStatus *commonv1.JobStatus) error {
Expand Down Expand Up @@ -371,13 +371,13 @@ func (r *KubeflowJobReconciler) UpdateJobStatus(
return nil
}

// UpdateJobStatusInAPIServer updates the status of this KubeflowJob in APIServer
func (r *KubeflowJobReconciler) UpdateJobStatusInAPIServer(ctx context.Context, job client.Object) error {
// UpdateJobStatusInAPIServer updates the status of this generic training job in APIServer
func (r *JobReconciler) UpdateJobStatusInAPIServer(ctx context.Context, job client.Object) error {
return r.Status().Update(ctx, job)
}

// CleanupResources cleans up all resources associated with this KubeflowJob
func (r *KubeflowJobReconciler) CleanupResources(runPolicy *commonv1.RunPolicy, status commonv1.JobStatus, job client.Object) error {
// CleanupResources cleans up all resources associated with this generic training job
func (r *JobReconciler) CleanupResources(runPolicy *commonv1.RunPolicy, status commonv1.JobStatus, job client.Object) error {
if *runPolicy.CleanPodPolicy == commonv1.CleanPodPolicyNone {
return nil
}
Expand Down Expand Up @@ -418,8 +418,8 @@ func (r *KubeflowJobReconciler) CleanupResources(runPolicy *commonv1.RunPolicy,
return nil
}

// CleanupJob cleans up all resources associated with this KubeflowJob as well as the job itself
func (r *KubeflowJobReconciler) CleanupJob(runPolicy *commonv1.RunPolicy, status commonv1.JobStatus, job client.Object) error {
// CleanupJob cleans up all resources associated with this generic training job as well as the job itself
func (r *JobReconciler) CleanupJob(runPolicy *commonv1.RunPolicy, status commonv1.JobStatus, job client.Object) error {
currentTime := time.Now()

ttl := runPolicy.TTLSecondsAfterFinished
Expand Down Expand Up @@ -447,54 +447,54 @@ func (r *KubeflowJobReconciler) CleanupJob(runPolicy *commonv1.RunPolicy, status
return nil
}

// IsFlagReplicaTypeForJobStatus checks if this replicaType is the flag replicaType for the status of KubeflowJob
func (r *KubeflowJobReconciler) IsFlagReplicaTypeForJobStatus(rtype string) bool {
// IsFlagReplicaTypeForJobStatus checks if this replicaType is the flag replicaType for the status of generic training job
func (r *JobReconciler) IsFlagReplicaTypeForJobStatus(rtype string) bool {
logrus.Warnf(WarnDefaultImplementationTemplate, "IsFlagReplicaTypeForJobStatus")
return true
}

// IsJobSucceeded checks if this KubeflowJob succeeded
func (r *KubeflowJobReconciler) IsJobSucceeded(status commonv1.JobStatus) bool {
// IsJobSucceeded checks if this generic training job succeeded
func (r *JobReconciler) IsJobSucceeded(status commonv1.JobStatus) bool {
return commonutil.IsSucceeded(status)
}

// IsJobFailed checks if this KubeflowJob failed
func (r *KubeflowJobReconciler) IsJobFailed(status commonv1.JobStatus) bool {
// IsJobFailed checks if this generic training job failed
func (r *JobReconciler) IsJobFailed(status commonv1.JobStatus) bool {
return commonutil.IsFailed(status)
}

// ShouldCleanUp checks if resources associated with this KubeflowJob should be cleaned up
func (r *KubeflowJobReconciler) ShouldCleanUp(status commonv1.JobStatus) bool {
// ShouldCleanUp checks if resources associated with this generic training job should be cleaned up
func (r *JobReconciler) ShouldCleanUp(status commonv1.JobStatus) bool {
return r.IsJobSucceeded(status) || r.IsJobFailed(status)
}

// PastBackoffLimit checks if this KubeflowJob has past backoff limit
func (r *KubeflowJobReconciler) PastBackoffLimit(jobName string, runPolicy *commonv1.RunPolicy,
// PastBackoffLimit checks if this generic training job has past backoff limit
func (r *JobReconciler) PastBackoffLimit(jobName string, runPolicy *commonv1.RunPolicy,
replicas map[commonv1.ReplicaType]*commonv1.ReplicaSpec, pods []*corev1.Pod) (bool, error) {
return core.PastBackoffLimit(jobName, runPolicy, replicas, pods, r.FilterPodsForReplicaType)
}

// PastActiveDeadline checks if this KubeflowJob has ActiveDeadlineSeconds field set and if it is exceeded.
func (r *KubeflowJobReconciler) PastActiveDeadline(runPolicy *commonv1.RunPolicy, jobStatus *commonv1.JobStatus) bool {
// PastActiveDeadline checks if this generic training job has ActiveDeadlineSeconds field set and if it is exceeded.
func (r *JobReconciler) PastActiveDeadline(runPolicy *commonv1.RunPolicy, jobStatus *commonv1.JobStatus) bool {
return core.PastActiveDeadline(runPolicy, *jobStatus)
}

func (r *KubeflowJobReconciler) GetJob(ctx context.Context, req ctrl.Request) (client.Object, error) {
func (r *JobReconciler) GetJob(ctx context.Context, req ctrl.Request) (client.Object, error) {
panic("implement me")
}

func (r *KubeflowJobReconciler) ExtractReplicasSpec(job client.Object) (map[commonv1.ReplicaType]*commonv1.ReplicaSpec, error) {
func (r *JobReconciler) ExtractReplicasSpec(job client.Object) (map[commonv1.ReplicaType]*commonv1.ReplicaSpec, error) {
panic("implement me")
}

func (r *KubeflowJobReconciler) ExtractRunPolicy(job client.Object) (*commonv1.RunPolicy, error) {
func (r *JobReconciler) ExtractRunPolicy(job client.Object) (*commonv1.RunPolicy, error) {
panic("implement me")
}

func (r *KubeflowJobReconciler) ExtractJobStatus(job client.Object) (*commonv1.JobStatus, error) {
func (r *JobReconciler) ExtractJobStatus(job client.Object) (*commonv1.JobStatus, error) {
panic("implement me")
}

func (r *KubeflowJobReconciler) IsMasterRole(replicas map[commonv1.ReplicaType]*commonv1.ReplicaSpec, rtype commonv1.ReplicaType, index int) bool {
func (r *JobReconciler) IsMasterRole(replicas map[commonv1.ReplicaType]*commonv1.ReplicaSpec, rtype commonv1.ReplicaType, index int) bool {
panic("implement me")
}
38 changes: 19 additions & 19 deletions pkg/reconciler.v1/common/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,16 +52,16 @@ var (
})
)

// KubeflowPodReconciler defines a Pod Reconciler for KubeflowJob
type KubeflowPodReconciler struct {
// PodReconciler defines a Pod Reconciler for generic training job
type PodReconciler struct {
client.Client
ReconcilerUtilInterface
GangSchedulingInterface
JobInterface
}

// OverrideForPodInterface resets ReconcilerUtilInterface, GangSchedulingInterface, JobInterface for KubeflowPodReconciler
func (r *KubeflowPodReconciler) OverrideForPodInterface(ui ReconcilerUtilInterface, gi GangSchedulingInterface, ji JobInterface) {
// OverrideForPodInterface resets ReconcilerUtilInterface, GangSchedulingInterface, JobInterface for PodReconciler
func (r *PodReconciler) OverrideForPodInterface(ui ReconcilerUtilInterface, gi GangSchedulingInterface, ji JobInterface) {
if ui != nil {
r.ReconcilerUtilInterface = ui
}
Expand All @@ -73,50 +73,50 @@ func (r *KubeflowPodReconciler) OverrideForPodInterface(ui ReconcilerUtilInterfa
}
}

// BareKubeflowPodReconciler returns a pointer of BareKubeflowPodReconciler with minimal implementation
func BareKubeflowPodReconciler(client client.Client) *KubeflowPodReconciler {
return &KubeflowPodReconciler{Client: client}
// BarePodReconciler returns a pointer of BarePodReconciler with minimal implementation
func BarePodReconciler(client client.Client) *PodReconciler {
return &PodReconciler{Client: client}
}

// GenPodName returns the name of the Pod based on jobName, replicaType and its index
func (r *KubeflowPodReconciler) GenPodName(jobName string, rtype string, index string) string {
func (r *PodReconciler) GenPodName(jobName string, rtype string, index string) string {
return core.GenGeneralName(jobName, rtype, index)
}

// GetDefaultContainerName returns the default name of the container
func (r *KubeflowPodReconciler) GetDefaultContainerName() string {
func (r *PodReconciler) GetDefaultContainerName() string {
return DefaultContainerName
}

// GetPodsForJob returns all Pods associated with this job
func (r *KubeflowPodReconciler) GetPodsForJob(ctx context.Context, job client.Object) ([]*corev1.Pod, error) {
func (r *PodReconciler) GetPodsForJob(ctx context.Context, job client.Object) ([]*corev1.Pod, error) {
podList := &corev1.PodList{}
err := r.List(ctx, podList, client.MatchingLabels(r.GenLabels(job.GetName())))
if err != nil {
return nil, err
}

var pods []*corev1.Pod = nil
for _, pod := range podList.Items {
pods = append(pods, &pod)
var pods []*corev1.Pod
for idx := range podList.Items {
pods = append(pods, &podList.Items[idx])
}

return pods, nil
// TODO: (zw0610) adding Claiming Pods
}

// GetPodSlices generates podSlice from all Pods listed for this job
func (r *KubeflowPodReconciler) GetPodSlices(pods []*corev1.Pod, replicas int, logger *log.Entry) [][]*corev1.Pod {
func (r *PodReconciler) GetPodSlices(pods []*corev1.Pod, replicas int, logger *log.Entry) [][]*corev1.Pod {
return core.GetPodSlices(pods, replicas, logger)
}

// FilterPodsForReplicaType filters out Pods for this replicaType
func (r *KubeflowPodReconciler) FilterPodsForReplicaType(pods []*corev1.Pod, replicaType string) ([]*corev1.Pod, error) {
func (r *PodReconciler) FilterPodsForReplicaType(pods []*corev1.Pod, replicaType string) ([]*corev1.Pod, error) {
return core.FilterPodsForReplicaType(pods, replicaType)
}

// ReconcilePods reconciles Pods for this job
func (r *KubeflowPodReconciler) ReconcilePods(
func (r *PodReconciler) ReconcilePods(
ctx context.Context,
job client.Object,
jobStatus *commonv1.JobStatus,
Expand Down Expand Up @@ -198,7 +198,7 @@ func (r *KubeflowPodReconciler) ReconcilePods(
}

// CreateNewPod generate Pods for this job and submits creation request to APIServer
func (r *KubeflowPodReconciler) CreateNewPod(job client.Object, rt string, index string,
func (r *PodReconciler) CreateNewPod(job client.Object, rt string, index string,
spec *commonv1.ReplicaSpec, masterRole bool, replicas map[commonv1.ReplicaType]*commonv1.ReplicaSpec) error {

logger := commonutil.LoggerForReplica(job, rt)
Expand Down Expand Up @@ -260,7 +260,7 @@ func (r *KubeflowPodReconciler) CreateNewPod(job client.Object, rt string, index
}

// DeletePod delete a Pod specified by name and namespace
func (r *KubeflowPodReconciler) DeletePod(ctx context.Context, ns string, name string) error {
func (r *PodReconciler) DeletePod(ctx context.Context, ns string, name string) error {
pod := &corev1.Pod{}
pod.Name = name
pod.Namespace = ns
Expand All @@ -272,7 +272,7 @@ func (r *KubeflowPodReconciler) DeletePod(ctx context.Context, ns string, name s
}

// DecoratePod decorates podTemplate before a Pod is submitted to the APIServer
func (r *KubeflowPodReconciler) DecoratePod(rtype string, podTemplate *corev1.PodTemplateSpec, job client.Object) {
func (r *PodReconciler) DecoratePod(rtype string, podTemplate *corev1.PodTemplateSpec, job client.Object) {
// Default implementation applies nothing to podTemplate
return
}
Loading