diff --git a/pkg/controller.v1/tensorflow/controller.go b/pkg/controller.v1/tensorflow/controller.go index a63a41ed67..6b7a963e02 100644 --- a/pkg/controller.v1/tensorflow/controller.go +++ b/pkg/controller.v1/tensorflow/controller.go @@ -31,8 +31,8 @@ import ( "k8s.io/client-go/kubernetes/scheme" "k8s.io/client-go/tools/cache" - "github.com/kubeflow/tf-operator/cmd/tf-operator.v1/app/options" common "github.com/kubeflow/common/job_controller/api/v1" + "github.com/kubeflow/tf-operator/cmd/tf-operator.v1/app/options" tfv1 "github.com/kubeflow/tf-operator/pkg/apis/tensorflow/v1" tfjobclientset "github.com/kubeflow/tf-operator/pkg/client/clientset/versioned" tfjobscheme "github.com/kubeflow/tf-operator/pkg/client/clientset/versioned/scheme" @@ -354,6 +354,38 @@ func (tc *TFController) reconcileTFJobs(tfjob *tfv1.TFJob) error { return err } + // If the TFJob is terminated, delete all pods and services. + if isSucceeded(tfjob.Status) || isFailed(tfjob.Status) { + if err := tc.deletePodsAndServices(tfjob, pods); err != nil { + return err + } + + if err := tc.cleanupTFJob(tfjob); err != nil { + return err + } + + if tc.Config.EnableGangScheduling { + if err := tc.DeletePodGroup(tfjob); err != nil { + return err + } + } + + // At this point the pods may have been deleted, so if the job succeeded, we need to manually set the replica status. + // If any replicas are still Active, set their status to succeeded. + if isSucceeded(tfjob.Status) { + for rtype := range tfjob.Status.ReplicaStatuses { + tfjob.Status.ReplicaStatuses[rtype].Succeeded += tfjob.Status.ReplicaStatuses[rtype].Active + tfjob.Status.ReplicaStatuses[rtype].Active = 0 + } + } + // no need to update the tfjob if the status hasn't changed since last time even the tfjob is not running. + + if !apiequality.Semantic.DeepEqual(*oldStatus, tfjob.Status) { + return tc.updateStatusHandler(tfjob) + } + return nil + } + // retrieve the previous number of retry previousRetry := tc.WorkQueue.NumRequeues(tfjobKey) @@ -392,25 +424,13 @@ func (tc *TFController) reconcileTFJobs(tfjob *tfv1.TFJob) error { tfJobExceedsLimit = true } - // If the TFJob is terminated, delete all pods and services. - if isSucceeded(tfjob.Status) || isFailed(tfjob.Status) || tfJobExceedsLimit { + if tfJobExceedsLimit { + // If the TFJob exceeds backoff limit or is past active deadline + // delete all pods and services, then set the status to failed if err := tc.deletePodsAndServices(tfjob, pods); err != nil { return err } - if tfJobExceedsLimit { - tc.Recorder.Event(tfjob, v1.EventTypeNormal, tfJobFailedReason, failureMessage) - if tfjob.Status.CompletionTime == nil { - now := metav1.Now() - tfjob.Status.CompletionTime = &now - } - err := updateTFJobConditions(tfjob, common.JobFailed, tfJobFailedReason, failureMessage) - if err != nil { - tflogger.LoggerForJob(tfjob).Infof("Append tfjob condition error: %v", err) - return err - } - } - if err := tc.cleanupTFJob(tfjob); err != nil { return err } @@ -421,46 +441,42 @@ func (tc *TFController) reconcileTFJobs(tfjob *tfv1.TFJob) error { } } - // At this point the pods may have been deleted, so if the job succeeded, we need to manually set the replica status. - // If any replicas are still Active, set their status to succeeded. - if isSucceeded(tfjob.Status) { - for rtype := range tfjob.Status.ReplicaStatuses { - tfjob.Status.ReplicaStatuses[rtype].Succeeded += tfjob.Status.ReplicaStatuses[rtype].Active - tfjob.Status.ReplicaStatuses[rtype].Active = 0 - } + tc.Recorder.Event(tfjob, v1.EventTypeNormal, tfJobFailedReason, failureMessage) + if tfjob.Status.CompletionTime == nil { + now := metav1.Now() + tfjob.Status.CompletionTime = &now } - // no need to update the tfjob if the status hasn't changed since last time even the tfjob is not running. - - if !apiequality.Semantic.DeepEqual(*oldStatus, tfjob.Status) { - return tc.updateStatusHandler(tfjob) + if err := updateTFJobConditions( + tfjob, common.JobFailed, tfJobFailedReason, failureMessage); err != nil { + tflogger.LoggerForJob(tfjob).Infof("Append tfjob condition error: %v", err) + return err } - return nil - } - - if tc.Config.EnableGangScheduling { - minAvailableReplicas := getTotalReplicas(tfjob) - _, err := tc.SyncPodGroup(tfjob, minAvailableReplicas) - if err != nil { - logger.Warnf("Sync PodGroup %v: %v", tfjob.Name, err) + } else { + if tc.Config.EnableGangScheduling { + minAvailableReplicas := getTotalReplicas(tfjob) + _, err := tc.SyncPodGroup(tfjob, minAvailableReplicas) + if err != nil { + logger.Warnf("Sync PodGroup %v: %v", tfjob.Name, err) + } } - } - // Save the current state of the replicas - replicasStatus := make(map[string]v1.PodPhase) + // Save the current state of the replicas + replicasStatus := make(map[string]v1.PodPhase) - // Diff current active pods/services with replicas. - for rtype, spec := range tfjob.Spec.TFReplicaSpecs { - err = tc.reconcilePods(tfjob, pods, rtype, spec, replicasStatus) - if err != nil { - logger.Warnf("reconcilePods error %v", err) - return err - } + // Diff current active pods/services with replicas. + for rtype, spec := range tfjob.Spec.TFReplicaSpecs { + err = tc.reconcilePods(tfjob, pods, rtype, spec, replicasStatus) + if err != nil { + logger.Warnf("reconcilePods error %v", err) + return err + } - err = tc.reconcileServices(tfjob, services, rtype, spec) + err = tc.reconcileServices(tfjob, services, rtype, spec) - if err != nil { - logger.Warnf("reconcileServices error %v", err) - return err + if err != nil { + logger.Warnf("reconcileServices error %v", err) + return err + } } }