fix the reconcile flow #1111

Merged 1 commit on Dec 12, 2019
114 changes: 65 additions & 49 deletions pkg/controller.v1/tensorflow/controller.go
@@ -31,8 +31,8 @@ import (
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/tools/cache"

"github.com/kubeflow/tf-operator/cmd/tf-operator.v1/app/options"
common "github.com/kubeflow/common/job_controller/api/v1"
"github.com/kubeflow/tf-operator/cmd/tf-operator.v1/app/options"
tfv1 "github.com/kubeflow/tf-operator/pkg/apis/tensorflow/v1"
tfjobclientset "github.com/kubeflow/tf-operator/pkg/client/clientset/versioned"
tfjobscheme "github.com/kubeflow/tf-operator/pkg/client/clientset/versioned/scheme"
@@ -354,6 +354,38 @@ func (tc *TFController) reconcileTFJobs(tfjob *tfv1.TFJob) error {
 		return err
 	}
 
+	// If the TFJob is terminated, delete all pods and services.
+	if isSucceeded(tfjob.Status) || isFailed(tfjob.Status) {
+		if err := tc.deletePodsAndServices(tfjob, pods); err != nil {
+			return err
+		}
+
+		if err := tc.cleanupTFJob(tfjob); err != nil {
+			return err
+		}
+
+		if tc.Config.EnableGangScheduling {
+			if err := tc.DeletePodGroup(tfjob); err != nil {
+				return err
+			}
+		}
+
+		// At this point the pods may have been deleted, so if the job succeeded, we need to manually set the replica status.
+		// If any replicas are still Active, set their status to succeeded.
+		if isSucceeded(tfjob.Status) {
+			for rtype := range tfjob.Status.ReplicaStatuses {
+				tfjob.Status.ReplicaStatuses[rtype].Succeeded += tfjob.Status.ReplicaStatuses[rtype].Active
+				tfjob.Status.ReplicaStatuses[rtype].Active = 0
+			}
+		}
+		// no need to update the tfjob if the status hasn't changed since last time even the tfjob is not running.
+
+		if !apiequality.Semantic.DeepEqual(*oldStatus, tfjob.Status) {
+			return tc.updateStatusHandler(tfjob)
+		}
+		return nil
+	}
+
 	// retrieve the previous number of retry
 	previousRetry := tc.WorkQueue.NumRequeues(tfjobKey)
 
Expand Down Expand Up @@ -392,25 +424,13 @@ func (tc *TFController) reconcileTFJobs(tfjob *tfv1.TFJob) error {
 		tfJobExceedsLimit = true
 	}
 
-	// If the TFJob is terminated, delete all pods and services.
-	if isSucceeded(tfjob.Status) || isFailed(tfjob.Status) || tfJobExceedsLimit {
+	if tfJobExceedsLimit {
+		// If the TFJob exceeds backoff limit or is past active deadline
+		// delete all pods and services, then set the status to failed
 		if err := tc.deletePodsAndServices(tfjob, pods); err != nil {
 			return err
 		}
 
-		if tfJobExceedsLimit {
-			tc.Recorder.Event(tfjob, v1.EventTypeNormal, tfJobFailedReason, failureMessage)
-			if tfjob.Status.CompletionTime == nil {
-				now := metav1.Now()
-				tfjob.Status.CompletionTime = &now
-			}
-			err := updateTFJobConditions(tfjob, common.JobFailed, tfJobFailedReason, failureMessage)
-			if err != nil {
-				tflogger.LoggerForJob(tfjob).Infof("Append tfjob condition error: %v", err)
-				return err
-			}
-		}
-
 		if err := tc.cleanupTFJob(tfjob); err != nil {
 			return err
 		}
@@ -421,46 +441,42 @@ func (tc *TFController) reconcileTFJobs(tfjob *tfv1.TFJob) error {
 			}
 		}
 
-		// At this point the pods may have been deleted, so if the job succeeded, we need to manually set the replica status.
-		// If any replicas are still Active, set their status to succeeded.
-		if isSucceeded(tfjob.Status) {
-			for rtype := range tfjob.Status.ReplicaStatuses {
-				tfjob.Status.ReplicaStatuses[rtype].Succeeded += tfjob.Status.ReplicaStatuses[rtype].Active
-				tfjob.Status.ReplicaStatuses[rtype].Active = 0
-			}
-		}
+		tc.Recorder.Event(tfjob, v1.EventTypeNormal, tfJobFailedReason, failureMessage)
+		if tfjob.Status.CompletionTime == nil {
+			now := metav1.Now()
+			tfjob.Status.CompletionTime = &now
+		}
 
-		// no need to update the tfjob if the status hasn't changed since last time even the tfjob is not running.
-		if !apiequality.Semantic.DeepEqual(*oldStatus, tfjob.Status) {
-			return tc.updateStatusHandler(tfjob)
-		}
-		return nil
-	}
-
-	if tc.Config.EnableGangScheduling {
-		minAvailableReplicas := getTotalReplicas(tfjob)
-		_, err := tc.SyncPodGroup(tfjob, minAvailableReplicas)
-		if err != nil {
-			logger.Warnf("Sync PodGroup %v: %v", tfjob.Name, err)
-		}
-	}
-
-	// Save the current state of the replicas
-	replicasStatus := make(map[string]v1.PodPhase)
-
-	// Diff current active pods/services with replicas.
-	for rtype, spec := range tfjob.Spec.TFReplicaSpecs {
-		err = tc.reconcilePods(tfjob, pods, rtype, spec, replicasStatus)
-		if err != nil {
-			logger.Warnf("reconcilePods error %v", err)
-			return err
-		}
-
-		err = tc.reconcileServices(tfjob, services, rtype, spec)
-
-		if err != nil {
-			logger.Warnf("reconcileServices error %v", err)
-			return err
-		}
-	}
+		if err := updateTFJobConditions(
+			tfjob, common.JobFailed, tfJobFailedReason, failureMessage); err != nil {
+			tflogger.LoggerForJob(tfjob).Infof("Append tfjob condition error: %v", err)
+			return err
+		}
+	} else {
+		if tc.Config.EnableGangScheduling {
+			minAvailableReplicas := getTotalReplicas(tfjob)
+			_, err := tc.SyncPodGroup(tfjob, minAvailableReplicas)
+			if err != nil {
+				logger.Warnf("Sync PodGroup %v: %v", tfjob.Name, err)
+			}
+		}
+
+		// Save the current state of the replicas
+		replicasStatus := make(map[string]v1.PodPhase)
+
+		// Diff current active pods/services with replicas.
+		for rtype, spec := range tfjob.Spec.TFReplicaSpecs {
+			err = tc.reconcilePods(tfjob, pods, rtype, spec, replicasStatus)
+			if err != nil {
+				logger.Warnf("reconcilePods error %v", err)
+				return err
+			}
+
+			err = tc.reconcileServices(tfjob, services, rtype, spec)
+
+			if err != nil {
+				logger.Warnf("reconcileServices error %v", err)
+				return err
+			}
+		}
+	}

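For readers skimming the diff, the net effect is a reordering of reconcileTFJobs: terminated jobs now return early before any retry bookkeeping, jobs past their backoff limit or active deadline get their own failure branch, and only jobs that are still running reach the gang-scheduling sync and the per-replica pod/service reconcile loop. The sketch below condenses that control flow; the job type, jobState, and the print placeholders are hypothetical stand-ins for the controller's real helpers (isSucceeded, deletePodsAndServices, reconcilePods, and so on), not the operator's actual API.

	// A minimal sketch of the reconcile ordering after this change, assuming
	// hypothetical stand-ins for the controller's real dependencies.
	package main

	import "fmt"

	type jobState int

	const (
		running jobState = iota
		succeeded
		failed
	)

	type job struct {
		state        jobState
		exceedsLimit bool // backoff limit hit or active deadline passed
	}

	// reconcile mirrors the branch order of the patched reconcileTFJobs.
	func reconcile(j *job) error {
		// 1. Terminated jobs: clean up and return early, before any retry
		//    bookkeeping or pod-group work happens.
		if j.state == succeeded || j.state == failed {
			fmt.Println("terminated: delete pods/services, cleanup, update status if changed")
			return nil
		}

		// 2. Jobs past their limits: same cleanup, then record the failure
		//    event and append the failed condition.
		if j.exceedsLimit {
			fmt.Println("exceeded limit: delete pods/services, cleanup, mark failed")
			return nil
		}

		// 3. Only still-active jobs reach gang-scheduling sync and the
		//    per-replica pod/service reconciliation.
		fmt.Println("active: sync pod group, reconcile pods and services")
		return nil
	}

	func main() {
		_ = reconcile(&job{state: succeeded})
		_ = reconcile(&job{state: running, exceedsLimit: true})
		_ = reconcile(&job{state: running})
	}

The early return is the substance of the fix: previously the terminated check shared one branch with tfJobExceedsLimit and ran only after the retry bookkeeping, so a job that had both finished and exceeded its backoff limit would still be marked failed. In the new flow termination wins, and the exceeds-limit branch performs the full cleanup (including pod-group deletion) before recording the failure.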