Use the same reasons for Condition and Event #1854

Merged (1 commit) on Jul 10, 2023
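
This PR replaces the per-framework reason constants (e.g. `mpiJobFailedReason`, `mxJobRestartingReason`) with a single helper, `commonutil.NewReason(kind, reason)`, so a job's Conditions and its Events carry the same reason strings. The helper's definition is not part of the hunks shown on this page; a minimal sketch consistent with the call sites below, assuming the generic constants hold bare state names such as "Failed", would be:

```go
package util

import "fmt"

// Generic, kind-agnostic reason constants. The values are assumptions
// inferred from the deleted per-framework constants (e.g. "MPIJobFailed"
// minus the "MPIJob" prefix); the real definitions live in util package
// changes outside the hunks shown here.
const (
	JobCreatedReason    = "Created"
	JobRunningReason    = "Running"
	JobRestartingReason = "Restarting"
	JobSucceededReason  = "Succeeded"
	JobFailedReason     = "Failed"
)

// NewReason prefixes a generic reason with the job kind, so the same
// constant yields "MPIJobFailed" for an MPIJob and "MXJobFailed" for an
// MXJob, and the identical string feeds both Conditions and Events.
func NewReason(kind, reason string) string {
	return fmt.Sprintf("%s%s", kind, reason)
}
```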
5 changes: 3 additions & 2 deletions pkg/controller.v1/common/job.go
@@ -93,6 +93,7 @@ func (jc *JobController) ReconcileJobs(
utilruntime.HandleError(fmt.Errorf("couldn't get key for job object %#v: %v", job, err))
return err
}
+ jobKind := jc.Controller.GetAPIGroupVersionKind().Kind
// Reset expectations
// 1. Since `ReconcileJobs` is called, we expect that previous expectations are all satisfied,
// and it's safe to reset the expectations
@@ -222,9 +223,9 @@ func (jc *JobController) ReconcileJobs(
}
}

- jc.Recorder.Event(runtimeObject, corev1.EventTypeNormal, commonutil.JobFailedReason, failureMessage)
+ jc.Recorder.Event(runtimeObject, corev1.EventTypeNormal, commonutil.NewReason(jobKind, commonutil.JobFailedReason), failureMessage)

- if err := commonutil.UpdateJobConditions(&jobStatus, apiv1.JobFailed, commonutil.JobFailedReason, failureMessage); err != nil {
+ if err = commonutil.UpdateJobConditions(&jobStatus, apiv1.JobFailed, commonutil.NewReason(jobKind, commonutil.JobFailedReason), failureMessage); err != nil {
log.Infof("Append job condition error: %v", err)
return err
}
6 changes: 4 additions & 2 deletions pkg/controller.v1/common/pod.go
@@ -359,8 +359,10 @@ func (jc *JobController) ReconcilePods(

msg := fmt.Sprintf("job %s is restarting because %s replica(s) failed.",
metaObject.GetName(), rType)
jc.Recorder.Event(runtimeObject, v1.EventTypeWarning, "JobRestarting", msg)
if err := commonutil.UpdateJobConditions(jobStatus, apiv1.JobRestarting, "JobRestarting", msg); err != nil {
jc.Recorder.Event(runtimeObject, v1.EventTypeWarning,
commonutil.NewReason(jc.Controller.GetAPIGroupVersionKind().Kind, commonutil.JobRestartingReason), msg)
if err := commonutil.UpdateJobConditions(jobStatus, apiv1.JobRestarting,
commonutil.NewReason(jc.Controller.GetAPIGroupVersionKind().Kind, commonutil.JobRestartingReason), msg); err != nil {
commonutil.LoggerForJob(metaObject).Infof("Append job condition error: %v", err)
return err
}
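The generic controllers above do not hard-code a kind; they take it from the framework-specific reconciler via `GetAPIGroupVersionKind()`. A hedged sketch of how an implementation might supply it (the exact wiring is not shown in this diff):

```go
import (
	"k8s.io/apimachinery/pkg/runtime/schema"

	kubeflowv1 "github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v1"
)

// Sketch: a framework reconciler reporting its GroupVersionKind to the
// generic layer, which then reads .Kind when building reason strings.
func (jc *MPIJobReconciler) GetAPIGroupVersionKind() schema.GroupVersionKind {
	return kubeflowv1.GroupVersion.WithKind(kubeflowv1.MPIJobKind)
}
```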
10 changes: 0 additions & 10 deletions pkg/controller.v1/mpi/mpijob.go
@@ -72,17 +72,7 @@ const (
// podTemplateSchedulerNameReason is the warning reason when other scheduler name is set
// in pod templates with gang-scheduling enabled
podTemplateSchedulerNameReason = "SettedPodTemplateSchedulerName"
- )
-
- const (
- // mpiJobCreatedReason is added in a mpijob when it is created.
- mpiJobCreatedReason = "MPIJobCreated"
- // mpiJobSucceededReason is added in a mpijob when it is succeeded.
- mpiJobSucceededReason = "MPIJobSucceeded"
- // mpiJobRunningReason is added in a mpijob when it is running.
- mpiJobRunningReason = "MPIJobRunning"
- // mpiJobFailedReason is added in a mpijob when it is failed.
- mpiJobFailedReason = "MPIJobFailed"
// mpiJobEvict
mpiJobEvict = "MPIJobEvicted"
)
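With the per-kind constants deleted, the same strings are now built at the call sites. Assuming the concatenating `NewReason` sketched above, the replacements map one-to-one:

```go
// Hypothetical mapping from the deleted constants to the new expressions:
var (
	created   = commonutil.NewReason(kubeflowv1.MPIJobKind, commonutil.JobCreatedReason)   // "MPIJobCreated"
	succeeded = commonutil.NewReason(kubeflowv1.MPIJobKind, commonutil.JobSucceededReason) // "MPIJobSucceeded"
	running   = commonutil.NewReason(kubeflowv1.MPIJobKind, commonutil.JobRunningReason)   // "MPIJobRunning"
	failed    = commonutil.NewReason(kubeflowv1.MPIJobKind, commonutil.JobFailedReason)    // "MPIJobFailed"
)
```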
38 changes: 21 additions & 17 deletions pkg/controller.v1/mpi/mpijob_controller.go
@@ -137,7 +137,8 @@ func (jc *MPIJobReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {

if err = kubeflowv1.ValidateV1MpiJobSpec(&mpijob.Spec); err != nil {
logger.Error(err, "MPIJob failed validation")
- jc.Recorder.Eventf(mpijob, corev1.EventTypeWarning, commonutil.JobFailedValidationReason, "MPIJob failed validation because %s", err)
+ jc.Recorder.Eventf(mpijob, corev1.EventTypeWarning, commonutil.NewReason(kubeflowv1.MPIJobKind, commonutil.JobFailedValidationReason),
+ 	"MPIJob failed validation because %s", err)
return ctrl.Result{}, err
}

@@ -319,7 +320,8 @@ func (jc *MPIJobReconciler) onOwnerCreateFunc() func(event.CreateEvent) bool {
msg := fmt.Sprintf("MPIJob %s/%s is created.", mpiJob.Namespace, e.Object.GetName())
logrus.Info(msg)
trainingoperatorcommon.CreatedJobsCounterInc(mpiJob.Namespace, jc.GetFrameworkName())
- if err := commonutil.UpdateJobConditions(&mpiJob.Status, kubeflowv1.JobCreated, mpiJobCreatedReason, msg); err != nil {
+ if err := commonutil.UpdateJobConditions(&mpiJob.Status, kubeflowv1.JobCreated,
+ 	commonutil.NewReason(kubeflowv1.MPIJobKind, commonutil.JobCreatedReason), msg); err != nil {
log.Log.Error(err, "append job condition error")
return false
}
@@ -395,10 +397,10 @@ func (jc *MPIJobReconciler) ReconcilePods(
if launcher == nil {
launcher, err = jc.KubeClientSet.CoreV1().Pods(mpiJob.Namespace).Create(context.Background(), jc.newLauncher(mpiJob, ctlrconfig.Config.MPIKubectlDeliveryImage, isGPULauncher), metav1.CreateOptions{})
if err != nil {
- jc.Recorder.Eventf(mpiJob, corev1.EventTypeWarning, mpiJobFailedReason, "launcher pod created failed: %v", err)
+ jc.Recorder.Eventf(mpiJob, corev1.EventTypeWarning, commonutil.NewReason(kubeflowv1.MPIJobKind, commonutil.JobFailedReason), "launcher pod created failed: %v", err)
return err
} else {
- jc.Recorder.Eventf(mpiJob, corev1.EventTypeNormal, mpiJobRunningReason, "launcher pod created success: %v", launcher.Name)
+ jc.Recorder.Eventf(mpiJob, corev1.EventTypeNormal, commonutil.NewReason(kubeflowv1.MPIJobKind, commonutil.JobRunningReason), "launcher pod created success: %v", launcher.Name)
}
}
}
@@ -418,12 +420,12 @@ func (jc *MPIJobReconciler) updateMPIJobStatus(mpiJob *kubeflowv1.MPIJob, launcher *corev1.Pod, worker []*corev1.Pod) error {
if isPodSucceeded(launcher) {
mpiJob.Status.ReplicaStatuses[kubeflowv1.MPIJobReplicaTypeLauncher].Succeeded = 1
msg := fmt.Sprintf("MPIJob %s/%s successfully completed.", mpiJob.Namespace, mpiJob.Name)
- jc.Recorder.Event(mpiJob, corev1.EventTypeNormal, mpiJobSucceededReason, msg)
+ jc.Recorder.Event(mpiJob, corev1.EventTypeNormal, commonutil.NewReason(kubeflowv1.MPIJobPlural, commonutil.JobSucceededReason), msg)
if mpiJob.Status.CompletionTime == nil {
now := metav1.Now()
mpiJob.Status.CompletionTime = &now
}
- err := updateMPIJobConditions(mpiJob, kubeflowv1.JobSucceeded, mpiJobSucceededReason, msg)
+ err := updateMPIJobConditions(mpiJob, kubeflowv1.JobSucceeded, commonutil.NewReason(kubeflowv1.MPIJobKind, commonutil.JobSucceededReason), msg)
if err != nil {
return err
}
Expand All @@ -432,7 +434,7 @@ func (jc *MPIJobReconciler) updateMPIJobStatus(mpiJob *kubeflowv1.MPIJob, launch
msg := fmt.Sprintf("MPIJob %s/%s has failed", mpiJob.Namespace, mpiJob.Name)
reason := launcher.Status.Reason
if reason == "" {
- reason = mpiJobFailedReason
+ reason = commonutil.NewReason(kubeflowv1.MPIJobKind, commonutil.JobFailedReason)
}
jc.Recorder.Event(mpiJob, corev1.EventTypeWarning, reason, msg)
if reason == "Evicted" {
@@ -482,11 +484,11 @@ func (jc *MPIJobReconciler) updateMPIJobStatus(mpiJob *kubeflowv1.MPIJob, launcher *corev1.Pod, worker []*corev1.Pod) error {

if launcher != nil && launcher.Status.Phase == corev1.PodRunning && running == len(worker) {
msg := fmt.Sprintf("MPIJob %s/%s is running.", mpiJob.Namespace, mpiJob.Name)
- err := updateMPIJobConditions(mpiJob, kubeflowv1.JobRunning, mpiJobRunningReason, msg)
+ err := updateMPIJobConditions(mpiJob, kubeflowv1.JobRunning, commonutil.NewReason(kubeflowv1.MPIJobKind, commonutil.JobRunningReason), msg)
if err != nil {
return err
}
jc.Recorder.Eventf(mpiJob, corev1.EventTypeNormal, "MPIJobRunning", "MPIJob %s/%s is running", mpiJob.Namespace, mpiJob.Name)
jc.Recorder.Eventf(mpiJob, corev1.EventTypeNormal, commonutil.NewReason(kubeflowv1.MPIJobKind, commonutil.JobRunningReason), "MPIJob %s/%s is running", mpiJob.Namespace, mpiJob.Name)
}
return nil
}
@@ -581,7 +583,7 @@ func (jc *MPIJobReconciler) UpdateJobStatus(job interface{}, replicas map[kubeflowv1.ReplicaType]*kubeflowv1.ReplicaSpec, jobStatus *kubeflowv1.JobStatus) error {
if rtype == kubeflowv1.MPIJobReplicaTypeLauncher {
if running > 0 {
msg := fmt.Sprintf("MPIJob %s is running.", mpiJob.Name)
- err := commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobRunning, commonutil.JobRunningReason, msg)
+ err := commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobRunning, commonutil.NewReason(kubeflowv1.MPIJobKind, commonutil.JobRunningReason), msg)
if err != nil {
commonutil.LoggerForJob(mpiJob).Infof("Append job condition error: %v", err)
return err
@@ -591,12 +593,12 @@ func (jc *MPIJobReconciler) UpdateJobStatus(job interface{}, replicas map[kubeflowv1.ReplicaType]*kubeflowv1.ReplicaSpec, jobStatus *kubeflowv1.JobStatus) error {
if expected == 0 {
msg := fmt.Sprintf("MPIJob %s is successfully completed.", mpiJob.Name)
logrus.Info(msg)
- jc.Recorder.Event(mpiJob, corev1.EventTypeNormal, commonutil.JobSucceededReason, msg)
+ jc.Recorder.Event(mpiJob, corev1.EventTypeNormal, commonutil.NewReason(kubeflowv1.MPIJobKind, commonutil.JobSucceededReason), msg)
if jobStatus.CompletionTime == nil {
now := metav1.Now()
jobStatus.CompletionTime = &now
}
- err := commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobSucceeded, commonutil.JobSucceededReason, msg)
+ err := commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobSucceeded, commonutil.NewReason(kubeflowv1.MPIJobKind, commonutil.JobSucceededReason), msg)
if err != nil {
commonutil.LoggerForJob(mpiJob).Infof("Append job condition error: %v", err)
return err
@@ -608,21 +610,22 @@ func (jc *MPIJobReconciler) UpdateJobStatus(job interface{}, replicas map[kubeflowv1.ReplicaType]*kubeflowv1.ReplicaSpec, jobStatus *kubeflowv1.JobStatus) error {
if failed > 0 {
if spec.RestartPolicy == kubeflowv1.RestartPolicyExitCode {
msg := fmt.Sprintf("MPIJob %s is restarting because %d %s replica(s) failed.", mpiJob.Name, failed, rtype)
- jc.Recorder.Event(mpiJob, corev1.EventTypeWarning, commonutil.JobRestartingReason, msg)
- err := commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobRestarting, commonutil.JobRestartingReason, msg)
+ jc.Recorder.Event(mpiJob, corev1.EventTypeWarning, commonutil.NewReason(kubeflowv1.MPIJobKind, commonutil.JobRestartingReason), msg)
+ err := commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobRestarting, commonutil.NewReason(kubeflowv1.MPIJobKind, commonutil.JobRestartingReason), msg)
if err != nil {
commonutil.LoggerForJob(mpiJob).Infof("Append job condition error: %v", err)
return err
}
trainingoperatorcommon.RestartedJobsCounterInc(mpiJob.Namespace, jc.GetFrameworkName())
} else {
msg := fmt.Sprintf("MPIJob %s is failed because %d %s replica(s) failed.", mpiJob.Name, failed, rtype)
- jc.Recorder.Event(mpiJob, corev1.EventTypeNormal, commonutil.JobFailedReason, msg)
+ jc.Recorder.Event(mpiJob, corev1.EventTypeNormal, commonutil.NewReason(kubeflowv1.MPIJobKind, commonutil.JobFailedReason), msg)
if jobStatus.CompletionTime == nil {
now := metav1.Now()
jobStatus.CompletionTime = &now
}
- err := commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobFailed, commonutil.JobFailedReason, msg)
+ err := commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobFailed,
+ 	commonutil.NewReason(kubeflowv1.MPIJobKind, commonutil.NewReason(kubeflowv1.MPIJobKind, commonutil.JobFailedReason)), msg)
if err != nil {
commonutil.LoggerForJob(mpiJob).Infof("Append job condition error: %v", err)
return err
@@ -917,7 +920,8 @@ func (jc *MPIJobReconciler) getOrCreateWorker(mpiJob *kubeflowv1.MPIJob) ([]*corev1.Pod, error) {
// can attempt processing again later. This could have been caused by a
// temporary network failure, or any other transient reason.
if err != nil && !errors.IsNotFound(err) {
- jc.Recorder.Eventf(mpiJob, corev1.EventTypeWarning, mpiJobFailedReason, "worker pod created failed: %v", err)
+ jc.Recorder.Eventf(mpiJob, corev1.EventTypeWarning, commonutil.NewReason(kubeflowv1.MPIJobKind, commonutil.JobFailedReason),
+ 	"worker pod created failed: %v", err)
return nil, err
}
// If the worker is not controlled by this MPIJob resource, we should log
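Taken together, the MPIJob changes mean each state transition now derives one reason string and uses it for both the Event and the Condition, rather than maintaining two hand-written strings that can drift apart. A sketch of the resulting pattern, assuming the helper above (not a verbatim excerpt):

```go
// One shared reason string feeding both the Event and the Condition.
reason := commonutil.NewReason(kubeflowv1.MPIJobKind, commonutil.JobFailedReason) // "MPIJobFailed" per the sketch above
jc.Recorder.Event(mpiJob, corev1.EventTypeWarning, reason, msg)
if err := commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobFailed, reason, msg); err != nil {
	return err
}
```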
28 changes: 10 additions & 18 deletions pkg/controller.v1/mxnet/mxjob_controller.go
@@ -56,15 +56,6 @@ import (

const (
controllerName = "mxjob-controller"
-
- // mxJobSucceededReason is added in a mxjob when it is succeeded.
- mxJobSucceededReason = "MXJobSucceeded"
- // mxJobRunningReason is added in a mxjob when it is running.
- mxJobRunningReason = "MXJobRunning"
- // mxJobFailedReason is added in a mxjob when it is failed.
- mxJobFailedReason = "MXJobFailed"
- // mxJobRestarting is added in a mxjob when it is restarting.
- mxJobRestartingReason = "MXJobRestarting"
)

// NewReconciler creates a MXJob Reconciler
@@ -133,7 +124,8 @@ func (r *MXJobReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {

if err = kubeflowv1.ValidateV1MXJob(mxjob); err != nil {
logger.Error(err, "MXJob failed validation")
- r.Recorder.Eventf(mxjob, corev1.EventTypeWarning, commonutil.JobFailedValidationReason, "MXJob failed validation because %s", err)
+ r.Recorder.Eventf(mxjob, corev1.EventTypeWarning, commonutil.NewReason(kubeflowv1.MPIJobKind, commonutil.JobFailedValidationReason),
+ 	"MXJob failed validation because %s", err)
return ctrl.Result{}, err
}

@@ -379,7 +371,7 @@ func (r *MXJobReconciler) UpdateJobStatus(job interface{}, replicas map[kubeflowv1.ReplicaType]*kubeflowv1.ReplicaSpec, jobStatus *kubeflowv1.JobStatus) error {
if rtype == kubeflowv1.MXJobReplicaTypeScheduler || singleTraining {
if running > 0 {
msg := fmt.Sprintf("MXJob %s is running.", mxjob.Name)
- err := commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobRunning, mxJobRunningReason, msg)
+ err := commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobRunning, commonutil.NewReason(kubeflowv1.MXJobKind, commonutil.JobRunningReason), msg)
if err != nil {
logrus.Infof("Append mxjob condition error: %v", err)
return err
@@ -388,12 +380,12 @@ func (r *MXJobReconciler) UpdateJobStatus(job interface{}, replicas map[kubeflowv1.ReplicaType]*kubeflowv1.ReplicaSpec, jobStatus *kubeflowv1.JobStatus) error {
// when scheduler is succeeded, the job is finished.
if expected == 0 {
msg := fmt.Sprintf("MXJob %s is successfully completed.", mxjob.Name)
- r.Recorder.Event(mxjob, corev1.EventTypeNormal, mxJobSucceededReason, msg)
+ r.Recorder.Event(mxjob, corev1.EventTypeNormal, commonutil.NewReason(kubeflowv1.MXJobKind, commonutil.JobSucceededReason), msg)
if jobStatus.CompletionTime == nil {
now := metav1.Now()
jobStatus.CompletionTime = &now
}
- err := commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobSucceeded, mxJobSucceededReason, msg)
+ err := commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobSucceeded, commonutil.NewReason(kubeflowv1.MXJobKind, commonutil.JobSucceededReason), msg)
if err != nil {
logrus.Infof("Append mxjob condition error: %v", err)
return err
@@ -405,21 +397,21 @@ func (r *MXJobReconciler) UpdateJobStatus(job interface{}, replicas map[kubeflowv1.ReplicaType]*kubeflowv1.ReplicaSpec, jobStatus *kubeflowv1.JobStatus) error {
if failed > 0 {
if spec.RestartPolicy == kubeflowv1.RestartPolicyExitCode {
msg := fmt.Sprintf("mxjob %s is restarting because %d %s replica(s) failed.", mxjob.Name, failed, rtype)
- r.Recorder.Event(mxjob, corev1.EventTypeWarning, mxJobRestartingReason, msg)
- err := commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobRestarting, mxJobRestartingReason, msg)
+ r.Recorder.Event(mxjob, corev1.EventTypeWarning, commonutil.NewReason(kubeflowv1.MXJobKind, commonutil.JobRestartingReason), msg)
+ err := commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobRestarting, commonutil.NewReason(kubeflowv1.MXJobKind, commonutil.JobRestartingReason), msg)
if err != nil {
logrus.Infof("Append job condition error: %v", err)
return err
}
trainingoperatorcommon.RestartedJobsCounterInc(mxjob.Namespace, r.GetFrameworkName())
} else {
msg := fmt.Sprintf("mxjob %s is failed because %d %s replica(s) failed.", mxjob.Name, failed, rtype)
- r.Recorder.Event(mxjob, corev1.EventTypeNormal, mxJobFailedReason, msg)
+ r.Recorder.Event(mxjob, corev1.EventTypeNormal, commonutil.NewReason(kubeflowv1.MXJobKind, commonutil.JobFailedReason), msg)
if jobStatus.CompletionTime == nil {
now := metav1.Now()
jobStatus.CompletionTime = &now
}
- err := commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobFailed, mxJobFailedReason, msg)
+ err := commonutil.UpdateJobConditions(jobStatus, kubeflowv1.JobFailed, commonutil.NewReason(kubeflowv1.MXJobKind, commonutil.JobFailedReason), msg)
if err != nil {
logrus.Infof("Append job condition error: %v", err)
return err
@@ -486,7 +478,7 @@ func (r *MXJobReconciler) onOwnerCreateFunc() func(event.CreateEvent) bool {
msg := fmt.Sprintf("MXJob %s is created.", e.Object.GetName())
logrus.Info(msg)
trainingoperatorcommon.CreatedJobsCounterInc(mxJob.Namespace, r.GetFrameworkName())
- if err := commonutil.UpdateJobConditions(&mxJob.Status, kubeflowv1.JobCreated, "MXJobCreated", msg); err != nil {
+ if err := commonutil.UpdateJobConditions(&mxJob.Status, kubeflowv1.JobCreated, commonutil.NewReason(kubeflowv1.MXJobKind, commonutil.JobCreatedReason), msg); err != nil {
logrus.Error(err, "append job condition error")
return false
}