Skip to content

Commit

Permalink
status: Fail the TFJob if PS is failed (kubeflow#690)
Browse files (browse the repository at this point in the history)
* status: Fail the TFJob if PS is failed

Signed-off-by: Ce Gao <[email protected]>

* status: Fix

Signed-off-by: Ce Gao <[email protected]>
  • Loading branch information
gaocegege authored and k8s-ci-robot committed Jun 21, 2018
1 parent 5e3bd2b commit f9b9551
Show file tree
Hide file tree
Showing 2 changed files with 59 additions and 42 deletions.
55 changes: 17 additions & 38 deletions pkg/controller.v2/controller_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ func updateStatusSingle(tfjob *tfv1alpha2.TFJob, rtype tfv1alpha2.TFReplicaType,
failed := int(tfjob.Status.TFReplicaStatuses[rtype].Failed)

// All workers are running, set StartTime.
if running == replicas {
if running == replicas && tfjob.Status.StartTime == nil {
now := metav1.Now()
tfjob.Status.StartTime = &now
}
Expand All @@ -102,25 +102,6 @@ func updateStatusSingle(tfjob *tfv1alpha2.TFJob, rtype tfv1alpha2.TFReplicaType,
return err
}
}

if failed > 0 {
if restart {
msg := fmt.Sprintf("TFJob %s is restarting.", tfjob.Name)
err := updateTFJobConditions(tfjob, tfv1alpha2.TFJobRestarting, tfJobRestartingReason, msg)
if err != nil {
loggerForTFJob(tfjob).Infof("Append tfjob condition error: %v", err)
return err
}
} else {
msg := fmt.Sprintf("TFJob %s is failed.", tfjob.Name)
err := updateTFJobConditions(tfjob, tfv1alpha2.TFJobFailed, tfJobFailedReason, msg)
if err != nil {
loggerForTFJob(tfjob).Infof("Append tfjob condition error: %v", err)
return err
}
}
}
return nil
}
} else {
if rtype == tfv1alpha2.TFReplicaTypeWorker {
Expand All @@ -145,26 +126,24 @@ func updateStatusSingle(tfjob *tfv1alpha2.TFJob, rtype tfv1alpha2.TFReplicaType,
return err
}
}
}
}

// Some workers or PSs have failed; leave a failed condition.
if failed > 0 {
if restart {
msg := fmt.Sprintf("TFJob %s is restarting.", tfjob.Name)
err := updateTFJobConditions(tfjob, tfv1alpha2.TFJobRestarting, tfJobRestartingReason, msg)
if err != nil {
loggerForTFJob(tfjob).Infof("Append tfjob condition error: %v", err)
return err
}
} else {
msg := fmt.Sprintf("TFJob %s is failed.", tfjob.Name)
err := updateTFJobConditions(tfjob, tfv1alpha2.TFJobFailed, tfJobFailedReason, msg)
if err != nil {
loggerForTFJob(tfjob).Infof("Append tfjob condition error: %v", err)
return err
}
}
if failed > 0 {
if restart {
msg := fmt.Sprintf("TFJob %s is restarting.", tfjob.Name)
err := updateTFJobConditions(tfjob, tfv1alpha2.TFJobRestarting, tfJobRestartingReason, msg)
if err != nil {
loggerForTFJob(tfjob).Infof("Append tfjob condition error: %v", err)
return err
}
} else {
msg := fmt.Sprintf("TFJob %s is failed.", tfjob.Name)
err := updateTFJobConditions(tfjob, tfv1alpha2.TFJobFailed, tfJobFailedReason, msg)
if err != nil {
loggerForTFJob(tfjob).Infof("Append tfjob condition error: %v", err)
return err
}
return nil
}
}
return nil
Expand Down
46 changes: 42 additions & 4 deletions pkg/controller.v2/controller_status_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,21 @@ func TestStatus(t *testing.T) {
restart: false,
expectedType: tfv1alpha2.TFJobRunning,
},
testCase{
description: "Chief is running, a PS is failed",
tfJob: testutil.NewTFJobWithChief(4, 2),
expectedFailedPS: 1,
expectedSucceededPS: 0,
expectedActivePS: 1,
expectedFailedWorker: 0,
expectedSucceededWorker: 4,
expectedActiveWorker: 0,
expectedFailedChief: 0,
expectedSucceededChief: 0,
expectedActiveChief: 1,
restart: false,
expectedType: tfv1alpha2.TFJobFailed,
},
testCase{
description: "Chief is failed, workers are succeeded",
tfJob: testutil.NewTFJobWithChief(4, 2),
Expand Down Expand Up @@ -298,11 +313,34 @@ func TestStatus(t *testing.T) {
if err != nil {
t.Errorf("%s: Expected error %v to be nil", c.description, err)
}
if c.tfJob.Spec.TFReplicaSpecs[tfv1alpha2.TFReplicaTypeWorker] != nil {
replicas := c.tfJob.Spec.TFReplicaSpecs[tfv1alpha2.TFReplicaTypeWorker].Replicas
err := updateStatusSingle(c.tfJob, tfv1alpha2.TFReplicaTypeWorker, int(*replicas), c.restart)
if err != nil {
t.Errorf("%s: Expected error %v to be nil", c.description, err)
}
}
if c.tfJob.Spec.TFReplicaSpecs[tfv1alpha2.TFReplicaTypePS] != nil {
replicas := c.tfJob.Spec.TFReplicaSpecs[tfv1alpha2.TFReplicaTypePS].Replicas
err := updateStatusSingle(c.tfJob, tfv1alpha2.TFReplicaTypePS, int(*replicas), c.restart)
if err != nil {
t.Errorf("%s: Expected error %v to be nil", c.description, err)
}
}
} else {
replicas := c.tfJob.Spec.TFReplicaSpecs[tfv1alpha2.TFReplicaTypeWorker].Replicas
err := updateStatusSingle(c.tfJob, tfv1alpha2.TFReplicaTypeWorker, int(*replicas), c.restart)
if err != nil {
t.Errorf("%s: Expected error %v to be nil", c.description, err)
if c.tfJob.Spec.TFReplicaSpecs[tfv1alpha2.TFReplicaTypeWorker] != nil {
replicas := c.tfJob.Spec.TFReplicaSpecs[tfv1alpha2.TFReplicaTypeWorker].Replicas
err := updateStatusSingle(c.tfJob, tfv1alpha2.TFReplicaTypeWorker, int(*replicas), c.restart)
if err != nil {
t.Errorf("%s: Expected error %v to be nil", c.description, err)
}
}
if c.tfJob.Spec.TFReplicaSpecs[tfv1alpha2.TFReplicaTypePS] != nil {
replicas := c.tfJob.Spec.TFReplicaSpecs[tfv1alpha2.TFReplicaTypePS].Replicas
err := updateStatusSingle(c.tfJob, tfv1alpha2.TFReplicaTypePS, int(*replicas), c.restart)
if err != nil {
t.Errorf("%s: Expected error %v to be nil", c.description, err)
}
}
}
found := false
Expand Down

0 comments on commit f9b9551

Please sign in to comment.