Use the same reasons for Condition and Event (#1854)
tenzen-y authored Jul 10, 2023
1 parent fcdf9a3 commit 9e084ff
Showing 15 changed files with 122 additions and 140 deletions.
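
The diff replaces per-framework reason literals (for example mpiJobFailedReason = "MPIJobFailed") with a shared helper, commonutil.NewReason(kind, reason), so that the reason attached to a job Condition and the reason on the emitted Kubernetes Event are always the same string. The helper itself is defined in one of the changed files not shown here; the following is a minimal sketch of what the call sites in this diff imply, assuming NewReason simply prefixes a base reason with the job kind and that the shared base reasons drop the old "Job" prefix:

// Hypothetical sketch of the shared helper and base reasons implied by the
// call sites below; the real definitions live in the common util package and may differ.
package util

const (
	JobCreatedReason          = "Created"
	JobRunningReason          = "Running"
	JobSucceededReason        = "Succeeded"
	JobFailedReason           = "Failed"
	JobRestartingReason       = "Restarting"
	JobFailedValidationReason = "FailedValidation"
)

// NewReason builds a kind-qualified reason string, e.g.
// NewReason("MPIJob", JobFailedReason) == "MPIJobFailed", matching the
// per-framework constants this commit removes.
func NewReason(kind, reason string) string {
	return kind + reason
}

On that reading, a failed MPIJob records the same "MPIJobFailed" string as both the condition reason and the event reason, which is what the commit title means by using the same reasons for Condition and Event.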
5 changes: 3 additions & 2 deletions pkg/controller.v1/common/job.go
@@ -93,6 +93,7 @@ func (jc *JobController) ReconcileJobs(
utilruntime.HandleError(fmt.Errorf("couldn't get key for job object %#v: %v", job, err))
return err
}
jobKind := jc.Controller.GetAPIGroupVersionKind().Kind
// Reset expectations
// 1. Since `ReconcileJobs` is called, we expect that previous expectations are all satisfied,
// and it's safe to reset the expectations
@@ -222,9 +223,9 @@ func (jc *JobController) ReconcileJobs(
}
}

jc.Recorder.Event(runtimeObject, corev1.EventTypeNormal, commonutil.JobFailedReason, failureMessage)
jc.Recorder.Event(runtimeObject, corev1.EventTypeNormal, commonutil.NewReason(jobKind, commonutil.JobFailedReason), failureMessage)

if err := commonutil.UpdateJobConditions(&jobStatus, apiv1.JobFailed, commonutil.JobFailedReason, failureMessage); err != nil {
if err = commonutil.UpdateJobConditions(&jobStatus, apiv1.JobFailed, commonutil.NewReason(jobKind, commonutil.JobFailedReason), failureMessage); err != nil {
log.Infof("Append job condition error: %v", err)
return err
}
6 changes: 4 additions & 2 deletions pkg/controller.v1/common/pod.go
@@ -359,8 +359,10 @@ func (jc *JobController) ReconcilePods(

msg := fmt.Sprintf("job %s is restarting because %s replica(s) failed.",
metaObject.GetName(), rType)
jc.Recorder.Event(runtimeObject, v1.EventTypeWarning, "JobRestarting", msg)
if err := commonutil.UpdateJobConditions(jobStatus, apiv1.JobRestarting, "JobRestarting", msg); err != nil {
jc.Recorder.Event(runtimeObject, v1.EventTypeWarning,
commonutil.NewReason(jc.Controller.GetAPIGroupVersionKind().Kind, commonutil.JobRestartingReason), msg)
if err := commonutil.UpdateJobConditions(jobStatus, apiv1.JobRestarting,
commonutil.NewReason(jc.Controller.GetAPIGroupVersionKind().Kind, commonutil.JobRestartingReason), msg); err != nil {
commonutil.LoggerForJob(metaObject).Infof("Append job condition error: %v", err)
return err
}
10 changes: 0 additions & 10 deletions pkg/controller.v1/mpi/mpijob.go
@@ -72,17 +72,7 @@ const (
// podTemplateSchedulerNameReason is the warning reason when other scheduler name is set
// in pod templates with gang-scheduling enabled
podTemplateSchedulerNameReason = "SettedPodTemplateSchedulerName"
)

const (
// mpiJobCreatedReason is added in a mpijob when it is created.
mpiJobCreatedReason = "MPIJobCreated"
// mpiJobSucceededReason is added in a mpijob when it is succeeded.
mpiJobSucceededReason = "MPIJobSucceeded"
// mpiJobRunningReason is added in a mpijob when it is running.
mpiJobRunningReason = "MPIJobRunning"
// mpiJobFailedReason is added in a mpijob when it is failed.
mpiJobFailedReason = "MPIJobFailed"
// mpiJobEvict
mpiJobEvict = "MPIJobEvicted"
)
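
The four MPIJob-specific reason constants removed above are presumably reproduced by the shared helper; a self-contained sketch of the assumed one-to-one mapping (values inferred from the removed constants, not verified against the util package):

package main

import "fmt"

// newReason mirrors the assumed behaviour of commonutil.NewReason: kind + base reason.
func newReason(kind, reason string) string { return kind + reason }

func main() {
	fmt.Println(newReason("MPIJob", "Created"))   // MPIJobCreated   (was mpiJobCreatedReason)
	fmt.Println(newReason("MPIJob", "Succeeded")) // MPIJobSucceeded (was mpiJobSucceededReason)
	fmt.Println(newReason("MPIJob", "Running"))   // MPIJobRunning   (was mpiJobRunningReason)
	fmt.Println(newReason("MPIJob", "Failed"))    // MPIJobFailed    (was mpiJobFailedReason)
}

mpiJobEvict stays local in the file because "MPIJobEvicted" is not one of the shared base reasons.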
38 changes: 21 additions & 17 deletions pkg/controller.v1/mpi/mpijob_controller.go
@@ -137,7 +137,8 @@ func (jc *MPIJobReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct

if err = kubeflowv1.ValidateV1MpiJobSpec(&mpijob.Spec); err != nil {
logger.Error(err, "MPIJob failed validation")
jc.Recorder.Eventf(mpijob, corev1.EventTypeWarning, commonutil.JobFailedValidationReason, "MPIJob failed validation because %s", err)
jc.Recorder.Eventf(mpijob, corev1.EventTypeWarning, commonutil.NewReason(kubeflowv1.MPIJobKind, commonutil.JobFailedValidationReason),
"MPIJob failed validation because %s", err)
return ctrl.Result{}, err
}

@@ -319,7 +320,8 @@ func (jc *MPIJobReconciler) onOwnerCreateFunc() func(event.CreateEvent) bool {
msg := fmt.Sprintf("MPIJob %s/%s is created.", mpiJob.Namespace, e.Object.GetName())
logrus.Info(msg)
trainingoperatorcommon.CreatedJobsCounterInc(mpiJob.Namespace, jc.GetFrameworkName())
if err := commonutil.UpdateJobConditions(&mpiJob.Status, kubeflowv1.JobCreated, mpiJobCreatedReason, msg); err != nil {
if err := commonutil.UpdateJobConditions(&mpiJob.Status, kubeflowv1.JobCreated,
commonutil.NewReason(kubeflowv1.MPIJobKind, commonutil.JobCreatedReason), msg); err != nil {
log.Log.Error(err, "append job condition error")
return false
}
@@ -395,10 +397,10 @@ func (jc *MPIJobReconciler) ReconcilePods(
if launcher == nil {
launcher, err = jc.KubeClientSet.CoreV1().Pods(mpiJob.Namespace).Create(context.Background(), jc.newLauncher(mpiJob, ctlrconfig.Config.MPIKubectlDeliveryImage, isGPULauncher), metav1.CreateOptions{})
if err != nil {
jc.Recorder.Eventf(mpiJob, corev1.EventTypeWarning, mpiJobFailedReason, "launcher pod created failed: %v", err)
jc.Recorder.Eventf(mpiJob, corev1.EventTypeWarning, commonutil.NewReason(kubeflowv1.MPIJobKind, commonutil.JobFailedReason), "launcher pod created failed: %v", err)
return err
} else {
jc.Recorder.Eventf(mpiJob, corev1.EventTypeNormal, mpiJobRunningReason, "launcher pod created success: %v", launcher.Name)
jc.Recorder.Eventf(mpiJob, corev1.EventTypeNormal, commonutil.NewReason(kubeflowv1.MPIJobKind, commonutil.JobRunningReason), "launcher pod created success: %v", launcher.Name)
}
}
}
@@ -418,12 +420,12 @@ func (jc *MPIJobReconciler) updateMPIJobStatus(mpiJob *kubeflowv1.MPIJob, launch
if isPodSucceeded(launcher) {
mpiJob.Status.ReplicaStatuses[kubeflowv1.MPIJobReplicaTypeLauncher].Succeeded = 1
msg := fmt.Sprintf("MPIJob %s/%s successfully completed.", mpiJob.Namespace, mpiJob.Name)
jc.Recorder.Event(mpiJob, corev1.EventTypeNormal, mpiJobSucceededReason, msg)
jc.Recorder.Event(mpiJob, corev1.EventTypeNormal, commonutil.NewReason(kubeflowv1.MPIJobPlural, commonutil.JobSucceededReason), msg)
if mpiJob.Status.CompletionTime == nil {
now := metav1.Now()
mpiJob.Status.CompletionTime = &now
}
err := updateMPIJobConditions(mpiJob, kubeflowv1.JobSucceeded, mpiJobSucceededReason, msg)
err := updateMPIJobConditions(mpiJob, kubeflowv1.JobSucceeded, commonutil.NewReason(kubeflowv1.MPIJobKind, commonutil.JobSucceededReason), msg)
if err != nil {
return err
}
@@ -432,7 +434,7 @@ func (jc *MPIJobReconciler) updateMPIJobStatus(mpiJob *kubeflowv1.MPIJob, launch
msg := fmt.Sprintf("MPIJob %s/%s has failed", mpiJob.Namespace, mpiJob.Name)
reason := launcher.Status.Reason
if reason == "" {
reason = mpiJobFailedReason
reason = commonutil.NewReason(kubeflowv1.MPIJobKind, commonutil.JobFailedReason)
}
jc.Recorder.Event(mpiJob, corev1.EventTypeWarning, reason, msg)
if reason == "Evicted" {
@@ -482,11 +484,11 @@ func (jc *MPIJobReconciler) updateMPIJobStatus(mpiJob *kubeflowv1.MPIJob, launch

if launcher != nil && launcher.Status.Phase == corev1.PodRunning && running == len(worker) {
msg := fmt.Sprintf("MPIJob %s/%s is running.", mpiJob.Namespace, mpiJob.Name)
err := updateMPIJobConditions(mpiJob, kubeflowv1.JobRunning, mpiJobRunningReason, msg)
err := updateMPIJobConditions(mpiJob, kubeflowv1.JobRunning, commonutil.NewReason(kubeflowv1.MPIJobKind, commonutil.JobRunningReason), msg)
if err != nil {
return err
}
jc.Recorder.Eventf(mpiJob, corev1.EventTypeNormal, "MPIJobRunning", "MPIJob %s/%s is running", mpiJob.Namespace, mpiJob.Name)
jc.Recorder.Eventf(mpiJob, corev1.EventTypeNormal, commonutil.NewReason(kubeflowv1.MPIJobKind, commonutil.JobRunningReason), "MPIJob %s/%s is running", mpiJob.Namespace, mpiJob.Name)
}
return nil
}
@@ -581,7 +583,7 @@ func (jc *MPIJobReconciler) UpdateJobStatus(job interface{}, replicas map[kubefl
if rtype == kubeflowv1.MPIJobReplicaTypeLauncher {
if running > 0 {
msg := fmt.Sprintf("MPIJob %s is running.", mpiJob.Name)
err := commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobRunning, commonutil.JobRunningReason, msg)
err := commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobRunning, commonutil.NewReason(kubeflowv1.MPIJobKind, commonutil.JobRunningReason), msg)
if err != nil {
commonutil.LoggerForJob(mpiJob).Infof("Append job condition error: %v", err)
return err
@@ -591,12 +593,12 @@ func (jc *MPIJobReconciler) UpdateJobStatus(job interface{}, replicas map[kubefl
if expected == 0 {
msg := fmt.Sprintf("MPIJob %s is successfully completed.", mpiJob.Name)
logrus.Info(msg)
jc.Recorder.Event(mpiJob, corev1.EventTypeNormal, commonutil.JobSucceededReason, msg)
jc.Recorder.Event(mpiJob, corev1.EventTypeNormal, commonutil.NewReason(kubeflowv1.MPIJobKind, commonutil.JobSucceededReason), msg)
if jobStatus.CompletionTime == nil {
now := metav1.Now()
jobStatus.CompletionTime = &now
}
err := commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobSucceeded, commonutil.JobSucceededReason, msg)
err := commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobSucceeded, commonutil.NewReason(kubeflowv1.MPIJobKind, commonutil.JobSucceededReason), msg)
if err != nil {
commonutil.LoggerForJob(mpiJob).Infof("Append job condition error: %v", err)
return err
@@ -608,21 +610,22 @@ func (jc *MPIJobReconciler) UpdateJobStatus(job interface{}, replicas map[kubefl
if failed > 0 {
if spec.RestartPolicy == kubeflowv1.RestartPolicyExitCode {
msg := fmt.Sprintf("MPIJob %s is restarting because %d %s replica(s) failed.", mpiJob.Name, failed, rtype)
jc.Recorder.Event(mpiJob, corev1.EventTypeWarning, commonutil.JobRestartingReason, msg)
err := commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobRestarting, commonutil.JobRestartingReason, msg)
jc.Recorder.Event(mpiJob, corev1.EventTypeWarning, commonutil.NewReason(kubeflowv1.MPIJobKind, commonutil.JobRestartingReason), msg)
err := commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobRestarting, commonutil.NewReason(kubeflowv1.MPIJobKind, commonutil.JobRestartingReason), msg)
if err != nil {
commonutil.LoggerForJob(mpiJob).Infof("Append job condition error: %v", err)
return err
}
trainingoperatorcommon.RestartedJobsCounterInc(mpiJob.Namespace, jc.GetFrameworkName())
} else {
msg := fmt.Sprintf("MPIJob %s is failed because %d %s replica(s) failed.", mpiJob.Name, failed, rtype)
jc.Recorder.Event(mpiJob, corev1.EventTypeNormal, commonutil.JobFailedReason, msg)
jc.Recorder.Event(mpiJob, corev1.EventTypeNormal, commonutil.NewReason(kubeflowv1.MPIJobKind, commonutil.JobFailedReason), msg)
if jobStatus.CompletionTime == nil {
now := metav1.Now()
jobStatus.CompletionTime = &now
}
err := commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobFailed, commonutil.JobFailedReason, msg)
err := commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobFailed,
commonutil.NewReason(kubeflowv1.MPIJobKind, commonutil.NewReason(kubeflowv1.MPIJobKind, commonutil.JobFailedReason)), msg)
if err != nil {
commonutil.LoggerForJob(mpiJob).Infof("Append job condition error: %v", err)
return err
@@ -917,7 +920,8 @@ func (jc *MPIJobReconciler) getOrCreateWorker(mpiJob *kubeflowv1.MPIJob) ([]*cor
// can attempt processing again later. This could have been caused by a
// temporary network failure, or any other transient reason.
if err != nil && !errors.IsNotFound(err) {
jc.Recorder.Eventf(mpiJob, corev1.EventTypeWarning, mpiJobFailedReason, "worker pod created failed: %v", err)
jc.Recorder.Eventf(mpiJob, corev1.EventTypeWarning, commonutil.NewReason(kubeflowv1.MPIJobKind, commonutil.JobFailedReason),
"worker pod created failed: %v", err)
return nil, err
}
// If the worker is not controlled by this MPIJob resource, we should log
28 changes: 10 additions & 18 deletions pkg/controller.v1/mxnet/mxjob_controller.go
@@ -56,15 +56,6 @@ import (

const (
controllerName = "mxjob-controller"

// mxJobSucceededReason is added in a mxjob when it is succeeded.
mxJobSucceededReason = "MXJobSucceeded"
// mxJobRunningReason is added in a mxjob when it is running.
mxJobRunningReason = "MXJobRunning"
// mxJobFailedReason is added in a mxjob when it is failed.
mxJobFailedReason = "MXJobFailed"
// mxJobRestarting is added in a mxjob when it is restarting.
mxJobRestartingReason = "MXJobRestarting"
)

// NewReconciler creates a MXJob Reconciler
@@ -133,7 +124,8 @@ func (r *MXJobReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl

if err = kubeflowv1.ValidateV1MXJob(mxjob); err != nil {
logger.Error(err, "MXJob failed validation")
r.Recorder.Eventf(mxjob, corev1.EventTypeWarning, commonutil.JobFailedValidationReason, "MXJob failed validation because %s", err)
r.Recorder.Eventf(mxjob, corev1.EventTypeWarning, commonutil.NewReason(kubeflowv1.MPIJobKind, commonutil.JobFailedValidationReason),
"MXJob failed validation because %s", err)
return ctrl.Result{}, err
}

@@ -379,7 +371,7 @@ func (r *MXJobReconciler) UpdateJobStatus(job interface{}, replicas map[kubeflow
if rtype == kubeflowv1.MXJobReplicaTypeScheduler || singleTraining {
if running > 0 {
msg := fmt.Sprintf("MXJob %s is running.", mxjob.Name)
err := commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobRunning, mxJobRunningReason, msg)
err := commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobRunning, commonutil.NewReason(kubeflowv1.MXJobKind, commonutil.JobRunningReason), msg)
if err != nil {
logrus.Infof("Append mxjob condition error: %v", err)
return err
@@ -388,12 +380,12 @@ func (r *MXJobReconciler) UpdateJobStatus(job interface{}, replicas map[kubeflow
// when scheduler is succeeded, the job is finished.
if expected == 0 {
msg := fmt.Sprintf("MXJob %s is successfully completed.", mxjob.Name)
r.Recorder.Event(mxjob, corev1.EventTypeNormal, mxJobSucceededReason, msg)
r.Recorder.Event(mxjob, corev1.EventTypeNormal, commonutil.NewReason(kubeflowv1.MXJobKind, commonutil.JobSucceededReason), msg)
if jobStatus.CompletionTime == nil {
now := metav1.Now()
jobStatus.CompletionTime = &now
}
err := commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobSucceeded, mxJobSucceededReason, msg)
err := commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobSucceeded, commonutil.NewReason(kubeflowv1.MXJobKind, commonutil.JobSucceededReason), msg)
if err != nil {
logrus.Infof("Append mxjob condition error: %v", err)
return err
@@ -405,21 +397,21 @@ func (r *MXJobReconciler) UpdateJobStatus(job interface{}, replicas map[kubeflow
if failed > 0 {
if spec.RestartPolicy == kubeflowv1.RestartPolicyExitCode {
msg := fmt.Sprintf("mxjob %s is restarting because %d %s replica(s) failed.", mxjob.Name, failed, rtype)
r.Recorder.Event(mxjob, corev1.EventTypeWarning, mxJobRestartingReason, msg)
err := commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobRestarting, mxJobRestartingReason, msg)
r.Recorder.Event(mxjob, corev1.EventTypeWarning, commonutil.NewReason(kubeflowv1.MXJobKind, commonutil.JobRestartingReason), msg)
err := commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobRestarting, commonutil.NewReason(kubeflowv1.MXJobKind, commonutil.JobRestartingReason), msg)
if err != nil {
logrus.Infof("Append job condition error: %v", err)
return err
}
trainingoperatorcommon.RestartedJobsCounterInc(mxjob.Namespace, r.GetFrameworkName())
} else {
msg := fmt.Sprintf("mxjob %s is failed because %d %s replica(s) failed.", mxjob.Name, failed, rtype)
r.Recorder.Event(mxjob, corev1.EventTypeNormal, mxJobFailedReason, msg)
r.Recorder.Event(mxjob, corev1.EventTypeNormal, commonutil.NewReason(kubeflowv1.MXJobKind, commonutil.JobFailedReason), msg)
if jobStatus.CompletionTime == nil {
now := metav1.Now()
jobStatus.CompletionTime = &now
}
err := commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobFailed, mxJobFailedReason, msg)
err := commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobFailed, commonutil.NewReason(kubeflowv1.MXJobKind, commonutil.JobFailedReason), msg)
if err != nil {
logrus.Infof("Append job condition error: %v", err)
return err
@@ -486,7 +478,7 @@ func (r *MXJobReconciler) onOwnerCreateFunc() func(event.CreateEvent) bool {
msg := fmt.Sprintf("MXJob %s is created.", e.Object.GetName())
logrus.Info(msg)
trainingoperatorcommon.CreatedJobsCounterInc(mxJob.Namespace, r.GetFrameworkName())
if err := commonutil.UpdateJobConditions(&mxJob.Status, kubeflowv1.JobCreated, "MXJobCreated", msg); err != nil {
if err := commonutil.UpdateJobConditions(&mxJob.Status, kubeflowv1.JobCreated, commonutil.NewReason(kubeflowv1.MXJobKind, commonutil.JobCreatedReason), msg); err != nil {
logrus.Error(err, "append job condition error")
return false
}
