Set state to failed if there is a problem initializing job #219

Merged 5 commits on Dec 15, 2017

Changes from 4 commits
318 changes: 107 additions & 211 deletions pkg/trainer/training.go
@@ -10,14 +10,11 @@ import (
"github.com/tensorflow/k8s/pkg/spec"
"github.com/tensorflow/k8s/pkg/util"
"github.com/tensorflow/k8s/pkg/util/k8sutil"
"github.com/tensorflow/k8s/pkg/util/retryutil"

"math"
"strings"
"sync"
"time"

apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/pkg/api/v1"
)
@@ -108,17 +105,6 @@ func NewJob(kubeCli kubernetes.Interface, tfJobClient k8sutil.TfJobClient, job *
go func() {
defer wg.Done()

if err := j.setup(config); err != nil {
log.Errorf("TfJob failed to setup: %v", err)
if j.status.Phase != spec.TfJobPhaseFailed {
j.status.SetReason(err.Error())
j.status.SetPhase(spec.TfJobPhaseFailed)
if err := j.updateTPRStatus(); err != nil {
log.Errorf("failed to update cluster phase (%v): %v", spec.TfJobPhaseFailed, err)
}
}
return
}
j.run(config, stopC)
}()

@@ -174,25 +160,6 @@ func (j *TrainingJob) deleteResources() error {
return nil
}

// TODO(jlewi): We can probably delete this.

//func replicaSetStatusToProto(r *TFReplicaSet, status *TFReplicaSetStatus) *tpb.TFReplicaSetStatus {
//
// p := &tpb.TFReplicaSetStatus{
// State: status.State.Enum(),
// // Type: r.Spec.TfReplicaTypeProcess.Type,
// ReplicaStates: make([]*tpb.TFReplicaSetStatus_ReplicaStates, 0),
// }
//
// for state, count := range status.ReplicasStates {
// p.ReplicaStates = append(p.ReplicaStates, &tpb.TFReplicaSetStatus_ReplicaStates{
// State: state.Enum(),
// NumReplicas: proto.Int(count),
// })
// }
// return p
//}

func (j *TrainingJob) GetStatus() (spec.State, []*spec.TfReplicaStatus, error) {
state := spec.StateUnknown
replicaStatuses := make([]*spec.TfReplicaStatus, 0)
@@ -275,77 +242,62 @@ func (j *TrainingJob) masterName() string {
}

// setup the training job.
func (j *TrainingJob) setup(config *spec.ControllerConfig) error {
func (j *TrainingJob) setup(config *spec.ControllerConfig) {
if j.job == nil {
return fmt.Errorf("job.Spec can't be nil")
}

err := j.job.Spec.SetDefaults()
if err != nil {
return fmt.Errorf("there was a problem setting defaults for job spec: %v", err)
j.status.SetReason("Internal error; tried to setup a job with no spec.")
Review comment (Member): Maybe "tried to" --> "failed to" would be better.

Reply (Contributor, Author): Done.


j.status.SetPhase(spec.TfJobPhaseFailed)
j.status.SetState(spec.StateFailed)
}

err = j.job.Spec.Validate()
if err != nil {
return fmt.Errorf("invalid job spec: %v", err)
}
err := func() error {
// If the job has already started we shouldn't set it up again.
if j.status.Phase != spec.TfJobPhaseNone {
log.Warningf("Job %v has already been setup.", j.name())
return nil
}

for _, t := range j.job.Spec.ReplicaSpecs {
r, err := NewTFReplicaSet(j.KubeCli, *t, j)
err := j.job.Spec.SetDefaults()
if err != nil {
return err
return fmt.Errorf("there was a problem setting defaults for job spec: %v", err)
}
j.Replicas = append(j.Replicas, r)
}

tb, err := initTensorBoard(j.KubeCli, j)
if err != nil {
return err
}
j.TensorBoard = tb

if err := j.job.Spec.ConfigureAccelerators(config.Accelerators); err != nil {
return fmt.Errorf("ConfigureAccelerators(...) error; %v", err)
}
err = j.job.Spec.Validate()
if err != nil {
return fmt.Errorf("invalid job spec: %v", err)
}

if j.job.Spec.RuntimeId == "" {
j.job.Spec.RuntimeId = util.RandString(4)
}
for _, t := range j.job.Spec.ReplicaSpecs {
r, err := NewTFReplicaSet(j.KubeCli, *t, j)
if err != nil {
return err
}
j.Replicas = append(j.Replicas, r)
}

var shouldCreateCluster bool
switch j.status.Phase {
case spec.TfJobPhaseNone:
shouldCreateCluster = true
//case spec.TfJobPhaseCreating:
// return errCreatedCluster
case spec.TfJobPhaseRunning:
shouldCreateCluster = false
case spec.TfJobPhaseFailed:
shouldCreateCluster = false
default:
return fmt.Errorf("unexpected TfJob phase: %s", j.status.Phase)
}
tb, err := initTensorBoard(j.KubeCli, j)
if err != nil {
return err
}
j.TensorBoard = tb

if shouldCreateCluster {
return j.triggerCreatePhase()
}
return nil
}
if err := j.job.Spec.ConfigureAccelerators(config.Accelerators); err != nil {
return fmt.Errorf("ConfigureAccelerators(...) error; %v", err)
}

// triggerCreatePhase sets the phase to TfJobPhaseCreating additional resource creation happens in TrainingJob.run
// TODO(jlewi): Need to reconcile this function copied from the etcd core operator OS code with the pattern
// for the TF job. What exactly do we want to do during the Create job phase? Right now the create method
// is called on each invocation of reconcile in run to ensure all the required resources exist. Maybe there's
// a better way?
func (j *TrainingJob) triggerCreatePhase() error {
j.status.SetPhase(spec.TfJobPhaseCreating)
if j.job.Spec.RuntimeId == "" {
j.job.Spec.RuntimeId = util.RandString(4)
}
return nil
}()

if err := j.updateTPRStatus(); err != nil {
return fmt.Errorf("cluster create: failed to update TfJob phase (%v): %v", spec.TfJobPhaseCreating, err)
if err != nil {
j.status.SetReason(err.Error())
j.status.SetPhase(spec.TfJobPhaseFailed)
j.status.SetState(spec.StateFailed)
} else {
j.status.SetPhase(spec.TfJobPhaseCreating)
j.status.SetState(spec.StateRunning)
}
log.Infof("Creating job: %v with Spec (%#v), Status (%#v)", j.job.Metadata.Name, j.job.Spec, j.job.Status)

return nil
}

func (j *TrainingJob) Delete() {
@@ -394,26 +346,76 @@ func (j *TrainingJob) updateTPRStatus() error {
return nil
}

func (j *TrainingJob) run(config *spec.ControllerConfig, stopC <-chan struct{}) {
// TODO(jlewi): What does the run function do?
clusterFailed := false
// reconcile tries to get the job into the desired state.
func (j* TrainingJob) reconcile(config *spec.ControllerConfig) {
if j.status.Phase == spec.TfJobPhaseNone {
// The job hasn't been setup.
j.setup(config)

defer func() {
if clusterFailed {
j.reportFailedStatus()
if err := j.updateTPRStatus(); err != nil {
log.Warningf("failed to update TPR status: %v", err)
}
}

close(j.stopCh)
}()
// TODO(jlewi): Can we determine from the CRD status whether we should
// Create the resources or not? We need to ensure the resources exist so for
// now we always call Create.
if j.job.Status.Phase == spec.TfJobPhaseCreating || j.job.Status.Phase == spec.TfJobPhaseRunning {
// We call Create to make sure all the resources exist and are running.
if cErr := j.createResources(config); cErr != nil {
log.Errorf("trainingJobCreateReplicas() error; %v", cErr)
}

state, replicaStatuses, err := j.GetStatus()

j.status.ReplicaStatuses = replicaStatuses
if err != nil {
log.Errorf("GetStatus() for job %v returned error: %v", j.job.Metadata.Name, err)
}
// TODO(jlewi): We should update the Phase if we detect the job is done.
if state == spec.StateFailed {
log.Errorf("Master failed Job: %v.", j.job.Metadata.Name)
j.status.SetPhase(spec.TfJobPhaseDone)
j.status.SetState(spec.StateFailed)
} else if state == spec.StateSucceeded {
log.Infof("Master succeeded Job: %v.", j.job.Metadata.Name)
j.status.SetPhase(spec.TfJobPhaseDone)
j.status.SetState(spec.StateSucceeded)
} else {
log.V(1).Infof("Job %v status=%v", j.job.Metadata.Name, util.Pformat(j.status))
}
}

// If the phase changed we should update the TPR.
if err := j.updateTPRStatus(); err != nil {
log.Warningf("Job %v, failed to update TPR status error: %v", j.job.Metadata.Name, err)
}

if j.job.Status.Phase == spec.TfJobPhaseCleanUp {
if cErr := j.deleteResources(); cErr != nil {
log.Errorf("Job %v trainingJob.Delete() error; %v", j.job.Metadata.Name, cErr)
}
// j.status.SetPhase(spec.TfJobPhaseDone)
// Return from run because we want to stop reconciling the object.
return
}

// Update the phase to running.
j.status.SetPhase(spec.TfJobPhaseRunning)
// updateTPRStatus will update the status of the TPR with c.Status if c.Status
// doesn't match c.Cluster.status. So you can change c.Status in order to propogate
// changes to the TPR status.
if err := j.updateTPRStatus(); err != nil {
log.Warningf("failed to update TPR status: %v", err)
log.Warningf("Job %v; failed to update TPR status error: %v", j.job.Metadata.Name, err)
}
log.Infof("start running...")
}

// run is the main processing loop for TfJob resources.
func (j *TrainingJob) run(config *spec.ControllerConfig, stopC <-chan struct{}) {
defer func() {
close(j.stopCh)
}()

j.reconcile(config)

var rerr error
for {
select {
case <-stopC:
@@ -447,118 +449,12 @@ func (j *TrainingJob) run(config *spec.ControllerConfig, stopC <-chan struct{})
// Return from run because we want to stop reconciling the object.
return
}

case <-time.After(reconcileInterval):
// TODO(jlewi): Can we determine from the TPR status whether we should
// Create the resources or not? We need to ensure the resources exist so for
// now we always call Create.
if j.job.Status.Phase == spec.TfJobPhaseRunning {
// We call Create to make sure all the resources exist and are running.
if cErr := j.createResources(config); cErr != nil {
log.Errorf("trainingJobCreateReplicas() error; %v", cErr)
}

state, replicaStatuses, err := j.GetStatus()

j.status.ReplicaStatuses = replicaStatuses
if err != nil {
log.Errorf("GetStatus() for job %v returned error: %v", j.job.Metadata.Name, err)
}
// TODO(jlewi): We should update the Phase if we detect the job is done.
if state == spec.StateFailed {
log.Errorf("Master failed Job: %v.", j.job.Metadata.Name)
j.status.SetPhase(spec.TfJobPhaseDone)
j.status.SetState(spec.StateFailed)
} else if state == spec.StateSucceeded {
log.Infof("Master succeeded Job: %v.", j.job.Metadata.Name)
j.status.SetPhase(spec.TfJobPhaseDone)
j.status.SetState(spec.StateSucceeded)
} else {
log.V(1).Infof("Job %v status=%v", j.job.Metadata.Name, util.Pformat(j.status))
}
}

// If the phase changed we should update the TPR.
if err := j.updateTPRStatus(); err != nil {
log.Warningf("Job %v, failed to update TPR status error: %v", j.job.Metadata.Name, err)
}

if j.job.Status.Phase == spec.TfJobPhaseCleanUp {
if cErr := j.deleteResources(); cErr != nil {
log.Errorf("Job %v trainingJob.Delete() error; %v", j.job.Metadata.Name, cErr)
}
// j.status.SetPhase(spec.TfJobPhaseDone)
// Return from run because we want to stop reconciling the object.
return
}

if rerr != nil {
log.Errorf("failed to reconcile job %v, error: %v", j.job.Metadata.Name, rerr)
break
}

// updateTPRStatus will update the status of the TPR with c.Status if c.Status
// doesn't match c.Cluster.status. So you can change c.Status in order to propogate
// changes to the TPR status.
if err := j.updateTPRStatus(); err != nil {
log.Warningf("Job %v; failed to update TPR status error: %v", j.job.Metadata.Name, err)
}
j.reconcile(config)
Review comment (Member): It seems we will do this on every 8-minute reconcile interval; if I understand this code, maybe we only need to do setup when the job is created and the phase is None.

Reply (Contributor, Author): The reconcile function is the code that periodically checks the job and figures out what needs to be done. The reconcile function takes advantage of state (e.g. phase) preserved in the CRD to avoid repeating unnecessary steps like setup.
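For context on the exchange above, here is a minimal standalone sketch of the phase-guarded reconcile pattern the author describes (hypothetical types and names, not the operator's actual code): setup runs only while the recorded phase is still "None", while the rest of reconcile repeats on every timer tick.

```go
package main

import (
	"log"
	"time"
)

// Phase is a stand-in for the phase recorded on the custom resource.
type Phase string

const (
	PhaseNone     Phase = "None"
	PhaseCreating Phase = "Creating"
)

type job struct {
	phase Phase
}

// reconcile brings the job toward the desired state. One-time setup is
// guarded by the phase, so it does not repeat on later passes.
func (j *job) reconcile() {
	if j.phase == PhaseNone {
		log.Println("running one-time setup")
		j.phase = PhaseCreating
	}
	// Work that must happen on every pass (ensure resources exist,
	// refresh status, etc.) goes here.
	log.Printf("reconciled; phase=%v", j.phase)
}

func main() {
	j := &job{phase: PhaseNone}
	ticker := time.NewTicker(2 * time.Second) // stand-in for reconcileInterval
	defer ticker.Stop()
	stop := time.After(7 * time.Second) // stand-in for the stop channel
	for {
		select {
		case <-ticker.C:
			j.reconcile()
		case <-stop:
			return
		}
	}
}
```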

}

//if isFatalError(rerr) {
// clusterFailed = true
// j.status.SetReason(rerr.Error())
//
// log.Errorf("cluster failed: %v", rerr)
// return
//}
}
}

//func isSpecEqual(s1, s2 spec.TfJobSpec) bool {
// // TODO(jlewi): Need to implement this function.
// return false
// //if s1.Size != s2.Size || s1.Paused != s2.Paused || s1.Version != s2.Version {
// // return false
// //}
// //return isBackupPolicyEqual(s1.Backup, s2.Backup)
//}

// TODO(jlewi): We probably need to update this function.
func (j *TrainingJob) reportFailedStatus() {
retryInterval := 5 * time.Second

f := func() (bool, error) {
j.status.SetPhase(spec.TfJobPhaseFailed)
err := j.updateTPRStatus()
if err == nil || k8sutil.IsKubernetesResourceNotFoundError(err) {
return true, nil
}

if !apierrors.IsConflict(err) {
log.Warningf("retry report status in %v: fail to update: %v", retryInterval, err)
return false, nil
}

cl, err := j.tfJobClient.Get(j.job.Metadata.Namespace, j.job.Metadata.Name)
if err != nil {
// Update (PUT) will return conflict even if object is deleted since we have UID set in object.
// Because it will check UID first and return something like:
// "Precondition failed: UID in precondition: 0xc42712c0f0, UID in object meta: ".
if k8sutil.IsKubernetesResourceNotFoundError(err) {
return true, nil
}
log.Warningf("retry report status in %v: fail to get latest version: %v", retryInterval, err)
return false, nil
}
j.job = cl
return false, nil

}

retryutil.Retry(retryInterval, math.MaxInt64, f)
}

func (j *TrainingJob) name() string {
return j.job.Metadata.GetName()
}