From eebc8380ca1d6b8746e230c93898dbbd99f9ecb4 Mon Sep 17 00:00:00 2001 From: Ce Gao Date: Wed, 13 Jun 2018 13:24:29 +0800 Subject: [PATCH] status: Support chief (#637) * pkg: Support chief Signed-off-by: Ce Gao * status: Avoid conflicts Avoid the situation that failed and succeeded are all in conditions Signed-off-by: Ce Gao --- pkg/controller.v2/controller_helper.go | 7 ++ pkg/controller.v2/controller_status.go | 97 ++++++++++++++++++-------- 2 files changed, 73 insertions(+), 31 deletions(-) diff --git a/pkg/controller.v2/controller_helper.go b/pkg/controller.v2/controller_helper.go index deada3fd4f..2ff3bedd46 100644 --- a/pkg/controller.v2/controller_helper.go +++ b/pkg/controller.v2/controller_helper.go @@ -95,3 +95,10 @@ func getPortFromTFJob(tfJob *tfv1alpha2.TFJob, rtype tfv1alpha2.TFReplicaType) ( } return -1, errPortNotFound } + +func containChiefSpec(tfJob *tfv1alpha2.TFJob) bool { + if _, ok := tfJob.Spec.TFReplicaSpecs[tfv1alpha2.TFReplicaTypeChief]; ok { + return true + } + return false +} diff --git a/pkg/controller.v2/controller_status.go b/pkg/controller.v2/controller_status.go index 8378146ceb..a477a6fdb2 100644 --- a/pkg/controller.v2/controller_status.go +++ b/pkg/controller.v2/controller_status.go @@ -42,43 +42,78 @@ func (tc *TFJobController) updateStatus(tfjob *tfv1alpha2.TFJob, rtype tfv1alpha running := int(tfjob.Status.TFReplicaStatuses[rtype].Active) failed := int(tfjob.Status.TFReplicaStatuses[rtype].Failed) - if rtype == tfv1alpha2.TFReplicaTypeWorker { - // All workers are running, set StartTime. - if running == replicas { - now := metav1.Now() - tfjob.Status.StartTime = &now - } + // All workers are running, set StartTime. + if running == replicas { + now := metav1.Now() + tfjob.Status.StartTime = &now + } - // Some workers are still running, leave a running condition. - if running > 0 { - msg := fmt.Sprintf("TFJob %s is running.", tfjob.Name) - err := tc.updateTFJobConditions(tfjob, tfv1alpha2.TFJobRunning, tfJobRunningReason, msg) - if err != nil { - loggerForTFJob(tfjob).Infof("Append tfjob condition error: %v", err) - return err + if containChiefSpec(tfjob) { + if rtype == tfv1alpha2.TFReplicaTypeChief { + if running > 0 { + msg := fmt.Sprintf("TFJob %s is running.", tfjob.Name) + err := tc.updateTFJobConditions(tfjob, tfv1alpha2.TFJobRunning, tfJobRunningReason, msg) + if err != nil { + loggerForTFJob(tfjob).Infof("Append tfjob condition error: %v", err) + return err + } + } + if expected == 0 { + msg := fmt.Sprintf("TFJob %s is successfully completed.", tfjob.Name) + now := metav1.Now() + tfjob.Status.CompletionTime = &now + err := tc.updateTFJobConditions(tfjob, tfv1alpha2.TFJobSucceeded, tfJobSucceededReason, msg) + if err != nil { + loggerForTFJob(tfjob).Infof("Append tfjob condition error: %v", err) + return err + } } - } - // All workers are succeeded, leave a succeeded condition. - if expected == 0 { - msg := fmt.Sprintf("TFJob %s is successfully completed.", tfjob.Name) - now := metav1.Now() - tfjob.Status.CompletionTime = &now - err := tc.updateTFJobConditions(tfjob, tfv1alpha2.TFJobSucceeded, tfJobSucceededReason, msg) - if err != nil { - loggerForTFJob(tfjob).Infof("Append tfjob condition error: %v", err) - return err + // Some workers or pss are failed , leave a failed condition. + if failed > 0 { + msg := fmt.Sprintf("TFJob %s is failed.", tfjob.Name) + err := tc.updateTFJobConditions(tfjob, tfv1alpha2.TFJobFailed, tfJobFailedReason, msg) + if err != nil { + loggerForTFJob(tfjob).Infof("Append tfjob condition error: %v", err) + return err + } } + return nil } - } + } else { + if rtype == tfv1alpha2.TFReplicaTypeWorker { + // Some workers are still running, leave a running condition. + if running > 0 { + msg := fmt.Sprintf("TFJob %s is running.", tfjob.Name) + err := tc.updateTFJobConditions(tfjob, tfv1alpha2.TFJobRunning, tfJobRunningReason, msg) + if err != nil { + loggerForTFJob(tfjob).Infof("Append tfjob condition error: %v", err) + return err + } + } + + // All workers are succeeded, leave a succeeded condition. + if expected == 0 { + msg := fmt.Sprintf("TFJob %s is successfully completed.", tfjob.Name) + now := metav1.Now() + tfjob.Status.CompletionTime = &now + err := tc.updateTFJobConditions(tfjob, tfv1alpha2.TFJobSucceeded, tfJobSucceededReason, msg) + if err != nil { + loggerForTFJob(tfjob).Infof("Append tfjob condition error: %v", err) + return err + } + } - // Some workers or pss are failed , leave a failed condition. - if failed > 0 { - msg := fmt.Sprintf("TFJob %s is failed.", tfjob.Name) - err := tc.updateTFJobConditions(tfjob, tfv1alpha2.TFJobFailed, tfJobFailedReason, msg) - if err != nil { - loggerForTFJob(tfjob).Infof("Append tfjob condition error: %v", err) - return err + // Some workers or pss are failed , leave a failed condition. + if failed > 0 { + msg := fmt.Sprintf("TFJob %s is failed.", tfjob.Name) + err := tc.updateTFJobConditions(tfjob, tfv1alpha2.TFJobFailed, tfJobFailedReason, msg) + if err != nil { + loggerForTFJob(tfjob).Infof("Append tfjob condition error: %v", err) + return err + } + } + return nil } } return nil