From b97dfc7d7737337ba026b6c2af149ec4cd3c8be0 Mon Sep 17 00:00:00 2001
From: Jeremy Lewi
Date: Tue, 16 Jan 2018 04:51:13 -0800
Subject: [PATCH] Fix a bunch of problems in TfJob CRD that crept in while tests were broken (#308)

* In syncTfJob, when checking whether a work queue item corresponds to a TrainingJob already in the map, we need to check the UID. Otherwise we will not properly handle the case where a training job is deleted and then a new job is recreated with the same name.

* We need to make sure that the Replicas field in TrainingJob is always properly set.

* We were only initializing replicas in setup, which was problematic when the TfJob controller gets restarted: on restart, setup won't be invoked because the job is already past that phase, and as a result the replicas won't be reinitialized.

* test_runner needs to ignore case when checking whether the job succeeded; otherwise we conclude that successful jobs failed.

* The controller should only forget about a job after the job has been cleaned up, not when it is marked as succeeded or failed.

* Add back code to support termination policies that use the worker and not the master as the chief. This was added in #221 and accidentally removed in the refactor in #234.
---
 pkg/apis/tensorflow/v1alpha1/types.go        |   1 -
 pkg/apis/tensorflow/validation/validation.go |  24 ++--
 .../tensorflow/validation/validation_test.go |  99 ++++++++++++++++
 pkg/controller/controller.go                 |  20 ++--
 pkg/trainer/replicas.go                      | 106 ++++++++++--------
 pkg/trainer/training.go                      |  92 ++++++++++-----
 pkg/trainer/training_test.go                 |  24 +++-
 py/test_runner.py                            |   2 +-
 8 files changed, 266 insertions(+), 102 deletions(-)
 create mode 100644 pkg/apis/tensorflow/validation/validation_test.go

diff --git a/pkg/apis/tensorflow/v1alpha1/types.go b/pkg/apis/tensorflow/v1alpha1/types.go
index a873d5cfb3..6138b3f333 100644
--- a/pkg/apis/tensorflow/v1alpha1/types.go
+++ b/pkg/apis/tensorflow/v1alpha1/types.go
@@ -136,7 +136,6 @@ type ReplicaState string
 const (
 	ReplicaStateUnknown   ReplicaState = "Unknown"
-	ReplicaStateStarting  ReplicaState = "Starting"
 	ReplicaStateRunning   ReplicaState = "Running"
 	ReplicaStateFailed    ReplicaState = "Failed"
 	ReplicaStateSucceeded ReplicaState = "Succeeded"
diff --git a/pkg/apis/tensorflow/validation/validation.go b/pkg/apis/tensorflow/validation/validation.go
index da712aef23..bda307e6a4 100644
--- a/pkg/apis/tensorflow/validation/validation.go
+++ b/pkg/apis/tensorflow/validation/validation.go
@@ -10,15 +10,21 @@ import (
 // ValidateTfJobSpec checks that the TfJobSpec is valid.
 func ValidateTfJobSpec(c *tfv1.TfJobSpec) error {
-	// Check that each replica has a TensorFlow container.
+	if c.TerminationPolicy == nil || c.TerminationPolicy.Chief == nil {
+		return fmt.Errorf("invalid termination policy: %v", c.TerminationPolicy)
+	}
+
+	chiefExists := false
+
+	// Check that each replica has a TensorFlow container and a chief.
for _, r := range c.ReplicaSpecs { found := false if r.Template == nil && r.TfReplicaType != tfv1.PS { return fmt.Errorf("Replica is missing Template; %v", util.Pformat(r)) } - if r.TfReplicaType == tfv1.MASTER && *r.Replicas != 1 { - return errors.New("The MASTER must have Replicas = 1") + if r.TfReplicaType == tfv1.TfReplicaType(c.TerminationPolicy.Chief.ReplicaName) { + chiefExists = true } if r.TfPort == nil { @@ -51,14 +57,12 @@ func ValidateTfJobSpec(c *tfv1.TfJobSpec) error { } } - if c.TerminationPolicy != nil { - if c.TerminationPolicy.Chief == nil { - return errors.New("invalid termination policy, Chief cannot be nil") - } - if c.TerminationPolicy.Chief.ReplicaName != "MASTER" || c.TerminationPolicy.Chief.ReplicaIndex != 0 { - return errors.New("invalid termination policy, Chief should have replicaName=MASTER and index=0") - } + if !chiefExists { + return fmt.Errorf("Missing ReplicaSpec for chief: %v", c.TerminationPolicy.Chief.ReplicaName) } + if c.TensorBoard != nil && c.TensorBoard.LogDir == "" { + return fmt.Errorf("tbReplicaSpec.LogDir must be specified") + } return nil } diff --git a/pkg/apis/tensorflow/validation/validation_test.go b/pkg/apis/tensorflow/validation/validation_test.go new file mode 100644 index 0000000000..53e6880901 --- /dev/null +++ b/pkg/apis/tensorflow/validation/validation_test.go @@ -0,0 +1,99 @@ +package validation + +import ( + "testing" + + tfv1 "github.com/tensorflow/k8s/pkg/apis/tensorflow/v1alpha1" + + "github.com/gogo/protobuf/proto" + "k8s.io/api/core/v1" +) + +func TestValidate(t *testing.T) { + type testCase struct { + in *tfv1.TfJobSpec + expectingError bool + } + + testCases := []testCase{ + { + in: &tfv1.TfJobSpec{ + ReplicaSpecs: []*tfv1.TfReplicaSpec{ + { + Template: &v1.PodTemplateSpec{ + Spec: v1.PodSpec{ + Containers: []v1.Container{ + { + Name: "tensorflow", + }, + }, + }, + }, + TfReplicaType: tfv1.MASTER, + Replicas: proto.Int32(1), + }, + }, + TfImage: "tensorflow/tensorflow:1.3.0", + }, + expectingError: false, + }, + { + in: &tfv1.TfJobSpec{ + ReplicaSpecs: []*tfv1.TfReplicaSpec{ + { + Template: &v1.PodTemplateSpec{ + Spec: v1.PodSpec{ + Containers: []v1.Container{ + { + Name: "tensorflow", + }, + }, + }, + }, + TfReplicaType: tfv1.WORKER, + Replicas: proto.Int32(1), + }, + }, + TfImage: "tensorflow/tensorflow:1.3.0", + }, + expectingError: true, + }, + { + in: &tfv1.TfJobSpec{ + ReplicaSpecs: []*tfv1.TfReplicaSpec{ + { + Template: &v1.PodTemplateSpec{ + Spec: v1.PodSpec{ + Containers: []v1.Container{ + { + Name: "tensorflow", + }, + }, + }, + }, + TfReplicaType: tfv1.WORKER, + Replicas: proto.Int32(1), + }, + }, + TfImage: "tensorflow/tensorflow:1.3.0", + TerminationPolicy: &tfv1.TerminationPolicySpec{ + Chief: &tfv1.ChiefSpec{ + ReplicaName: "WORKER", + ReplicaIndex: 0, + }, + }, + }, + expectingError: false, + }, + } + + for _, c := range testCases { + job := &tfv1.TfJob{ + Spec: *c.in, + } + tfv1.SetObjectDefaults_TfJob(job) + if err := ValidateTfJobSpec(&job.Spec); (err != nil) != c.expectingError { + t.Errorf("unexpected validation result: %v", err) + } + } +} diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go index cbe50d2ec6..2c624ba41e 100644 --- a/pkg/controller/controller.go +++ b/pkg/controller/controller.go @@ -89,7 +89,7 @@ func New(kubeClient kubernetes.Interface, apiExtclient apiextensionsclient.Inter } }, Handler: cache.ResourceEventHandlerFuncs{ - AddFunc: controller.enqueueController, + AddFunc: controller.enqueueController, UpdateFunc: func(oldObj, newObj interface{}) { 
 				controller.enqueueController(newObj)
 			},
@@ -125,8 +125,8 @@ func (c *Controller) Run(threadiness int, stopCh <-chan struct{}) error {
 		return fmt.Errorf("failed to wait for caches to sync")
 	}
 
-	glog.Info("Starting workers")
-	// Launch two workers to process Foo resources
+	glog.Infof("Starting %v workers", threadiness)
+	// Launch workers to process TfJob resources
 	for i := 0; i < threadiness; i++ {
 		go wait.Until(c.runWorker, time.Second, stopCh)
 	}
@@ -169,9 +169,11 @@ func (c *Controller) processNextWorkItem() bool {
 	return true
 }
 
-// syncJob will sync the job with the given key if it has had its expectations fulfilled, meaning
-// it did not expect to see any more of its pods created or deleted. This function is not meant to be invoked
+// syncTFJob will sync the job with the given key. This function is not meant to be invoked
 // concurrently with the same key.
+//
+// When a job is completely processed it will return true, indicating that it's OK to forget about this job since
+// no more processing will occur for it.
 func (c *Controller) syncTFJob(key string) (bool, error) {
 	startTime := time.Now()
 	defer func() {
@@ -196,7 +198,9 @@ func (c *Controller) syncTFJob(key string) (bool, error) {
 		return false, err
 	}
 
-	if _, ok := c.jobs[tfJob.ObjectMeta.Namespace+"-"+tfJob.ObjectMeta.Name]; !ok {
+	// Create a new TrainingJob if there is no TrainingJob stored for it in the jobs map or if the UIDs don't match.
+	// The UIDs won't match in the event we deleted the job and then recreated the job with the same name.
+	if cJob, ok := c.jobs[tfJob.ObjectMeta.Namespace+"-"+tfJob.ObjectMeta.Name]; !ok || cJob.UID() != tfJob.UID {
 
 		nc, err := trainer.NewJob(c.KubeClient, c.TfJobClient, tfJob, &c.config)
 		if err != nil {
@@ -217,7 +221,9 @@ func (c *Controller) syncTFJob(key string) (bool, error) {
 		return false, err
 	}
 
-	if tfJob.Status.State == tfv1alpha1.StateSucceeded {
+	// TODO(jlewi): This logic will need to change when/if we get rid of phases and move to conditions. In that
+	// case we should forget about a job when the appropriate condition is reached.
+ if tfJob.Status.Phase == tfv1alpha1.TfJobPhaseCleanUp { return true, nil } else { return false, nil diff --git a/pkg/trainer/replicas.go b/pkg/trainer/replicas.go index 81b7f70a0c..aff1cb350a 100644 --- a/pkg/trainer/replicas.go +++ b/pkg/trainer/replicas.go @@ -129,21 +129,35 @@ func (s *TFReplicaSet) Create(config *tfv1alpha1.ControllerConfig) error { } _, err = s.ClientSet.CoreV1().ConfigMaps(s.Job.job.ObjectMeta.Namespace).Create(cm) if err != nil { - log.Errorf("Error creating PS ConfigMap: %v, %v", cm.ObjectMeta.Name, err) - return err + if k8s_errors.IsAlreadyExists(err) { + log.Infof("%v already exists.", cm.Name) + } else { + log.Errorf("Error creating PS ConfigMap: %v, %v", cm.ObjectMeta.Name, err) + return k8sErrors.NewAggregate([]error{fmt.Errorf("Creating PS ConfigMap %v returned error.", cm.Name), err}) + } } // Update Volumes to include the ConfigMap containing grpc_tensorflow_server.py - s.Spec.Template.Spec.Volumes = append(s.Spec.Template.Spec.Volumes, v1.Volume{ - Name: "ps-config-volume", - VolumeSource: v1.VolumeSource{ - ConfigMap: &v1.ConfigMapVolumeSource{ - LocalObjectReference: v1.LocalObjectReference{ - Name: s.defaultPSConfigMapName(), + name := "ps-config-volume" + hasVolume := false + for _, v := range s.Spec.Template.Spec.Volumes { + if v.Name == name { + hasVolume = true + break + } + } + if !hasVolume { + s.Spec.Template.Spec.Volumes = append(s.Spec.Template.Spec.Volumes, v1.Volume{ + Name: "ps-config-volume", + VolumeSource: v1.VolumeSource{ + ConfigMap: &v1.ConfigMapVolumeSource{ + LocalObjectReference: v1.LocalObjectReference{ + Name: s.defaultPSConfigMapName(), + }, }, }, - }, - }) + }) + } } for index := int32(0); index < *s.Spec.Replicas; index++ { @@ -410,9 +424,42 @@ func replicaStatusFromPodList(l v1.PodList, name tfv1alpha1.ContainerName) tfv1a return tfv1alpha1.ReplicaStateUnknown } +func (s *TFReplicaSet) GetSingleReplicaStatus(index int32) tfv1alpha1.ReplicaState { + j, err := s.ClientSet.BatchV1().Jobs(s.Job.job.ObjectMeta.Namespace).Get(s.jobName(index), meta_v1.GetOptions{}) + + if err != nil { + return tfv1alpha1.ReplicaStateUnknown + } + + if j.Status.Succeeded >= 1 { + return tfv1alpha1.ReplicaStateSucceeded + } + + labels := s.Labels() + labels["task_index"] = fmt.Sprintf("%v", index) + selector, err := labels.ToSelector() + if err != nil { + log.Errorf("labels.ToSelector() error; %v", err) + return tfv1alpha1.ReplicaStateFailed + } + + // TODO(jlewi): Handle errors. We need to get the pod and looking at recent container exits. + l, err := s.ClientSet.CoreV1().Pods(s.Job.job.ObjectMeta.Namespace).List(meta_v1.ListOptions{ + // TODO(jlewi): Why isn't the label selector working? + LabelSelector: selector, + }) + + if err != nil { + // TODO(jlewi): Are there errors that should be treated as retryable errors? + return tfv1alpha1.ReplicaStateFailed + } + + status := replicaStatusFromPodList(*l, tfv1alpha1.TENSORFLOW) + return status +} + // Status returns the status of the replica set. 
func (s *TFReplicaSet) GetStatus() (tfv1alpha1.TfReplicaStatus, error) { - status := tfv1alpha1.TfReplicaStatus{ TfReplicaType: s.Spec.TfReplicaType, State: tfv1alpha1.ReplicaStateUnknown, @@ -429,42 +476,7 @@ func (s *TFReplicaSet) GetStatus() (tfv1alpha1.TfReplicaStatus, error) { } for index := int32(0); index < *s.Spec.Replicas; index++ { - - j, err := s.ClientSet.BatchV1().Jobs(s.Job.job.ObjectMeta.Namespace).Get(s.jobName(index), meta_v1.GetOptions{}) - - if err != nil { - increment(tfv1alpha1.ReplicaStateUnknown) - continue - } - - if j.Status.Succeeded >= 1 { - increment(tfv1alpha1.ReplicaStateSucceeded) - continue - } - - labels := s.Labels() - labels["task_index"] = fmt.Sprintf("%v", index) - selector, err := labels.ToSelector() - if err != nil { - log.Errorf("labels.ToSelector() error; %v", err) - increment(tfv1alpha1.ReplicaStateFailed) - continue - } - - // TODO(jlewi): Handle errors. We need to get the pod and looking at recent container exits. - l, err := s.ClientSet.CoreV1().Pods(s.Job.job.ObjectMeta.Namespace).List(meta_v1.ListOptions{ - // TODO(jlewi): Why isn't the label selector working? - LabelSelector: selector, - }) - - if err != nil { - // TODO(jlewi): Are there errors that should be treated as retryable errors? - increment(tfv1alpha1.ReplicaStateFailed) - continue - } - - status := replicaStatusFromPodList(*l, tfv1alpha1.TENSORFLOW) - increment(status) + increment(s.GetSingleReplicaStatus(index)) } // Determine the overall status for the replica set based on the status of the individual diff --git a/pkg/trainer/training.go b/pkg/trainer/training.go index 86cb2d068d..b54f84fef3 100644 --- a/pkg/trainer/training.go +++ b/pkg/trainer/training.go @@ -11,14 +11,14 @@ import ( "strings" + "github.com/tensorflow/k8s/pkg/apis/tensorflow/helper" tfv1alpha1 "github.com/tensorflow/k8s/pkg/apis/tensorflow/v1alpha1" "github.com/tensorflow/k8s/pkg/apis/tensorflow/validation" tfjobclient "github.com/tensorflow/k8s/pkg/client/clientset/versioned" "github.com/tensorflow/k8s/pkg/client/clientset/versioned/scheme" "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/kubernetes" - - "github.com/tensorflow/k8s/pkg/apis/tensorflow/helper" ) // TODO(jlewi): We should switch a New pattern and make trainingJob private so we can @@ -81,6 +81,10 @@ func NewJob(kubeCli kubernetes.Interface, tfJobClient tfjobclient.Interface, job return j, nil } +func (j *TrainingJob) UID() types.UID { + return j.job.ObjectMeta.UID +} + func (j *TrainingJob) ClusterSpec() ClusterSpec { clusterSpec := make(ClusterSpec) @@ -131,6 +135,9 @@ func (j *TrainingJob) deleteResources() error { } func (j *TrainingJob) GetStatus() (tfv1alpha1.State, []*tfv1alpha1.TfReplicaStatus, error) { + chief := j.job.Spec.TerminationPolicy.Chief + chiefState := tfv1alpha1.ReplicaStateUnknown + state := tfv1alpha1.StateUnknown replicaStatuses := make([]*tfv1alpha1.TfReplicaStatus, 0) @@ -148,23 +155,19 @@ func (j *TrainingJob) GetStatus() (tfv1alpha1.State, []*tfv1alpha1.TfReplicaStat replicaStatuses = append(replicaStatuses, &rStatus) - // If any replicas are failed mark job as failed. 
-		if rStatus.State == tfv1alpha1.ReplicaStateFailed {
-			state = tfv1alpha1.StateFailed
+		if string(r.Spec.TfReplicaType) == chief.ReplicaName {
+			chiefState = r.GetSingleReplicaStatus(int32(chief.ReplicaIndex))
 		}
 	}
 
-	if v, ok := replicaSetStates[tfv1alpha1.MASTER]; ok && v == tfv1alpha1.ReplicaStateSucceeded {
-		state = tfv1alpha1.StateSucceeded
-		return state, replicaStatuses, nil
-	}
-
-	if v, ok := replicaSetStates[tfv1alpha1.MASTER]; ok && v == tfv1alpha1.ReplicaStateFailed {
+	if chiefState == tfv1alpha1.ReplicaStateRunning {
+		state = tfv1alpha1.StateRunning
+	} else if chiefState == tfv1alpha1.ReplicaStateFailed {
 		state = tfv1alpha1.StateFailed
-		return state, replicaStatuses, nil
+	} else if chiefState == tfv1alpha1.ReplicaStateSucceeded {
+		state = tfv1alpha1.StateSucceeded
 	}
 
-	state = tfv1alpha1.StateRunning
 	return state, replicaStatuses, nil
 }
@@ -234,20 +237,6 @@ func (j *TrainingJob) setup(config *tfv1alpha1.ControllerConfig) {
 		return fmt.Errorf("invalid job spec: %v", err)
 	}
 
-	for _, t := range j.job.Spec.ReplicaSpecs {
-		r, err := NewTFReplicaSet(j.KubeCli, *t, j)
-		if err != nil {
-			return err
-		}
-		j.Replicas = append(j.Replicas, r)
-	}
-
-	tb, err := initTensorBoard(j.KubeCli, j)
-	if err != nil {
-		return err
-	}
-	j.TensorBoard = tb
-
 	if err := helper.ConfigureAcceleratorsForTfJobSpec(&j.job.Spec, config.Accelerators); err != nil {
 		return fmt.Errorf("ConfigureAccelerators(...) error; %v", err)
 	}
@@ -268,6 +257,31 @@
 	}
 }
 
+// setupReplicas creates the in-memory data structures corresponding to the replicas.
+func (j *TrainingJob) setupReplicas() error {
+
+	if len(j.Replicas) != len(j.job.Spec.ReplicaSpecs) {
+		j.Replicas = make([]*TFReplicaSet, 0, len(j.job.Spec.ReplicaSpecs))
+		for _, t := range j.job.Spec.ReplicaSpecs {
+			r, err := NewTFReplicaSet(j.KubeCli, *t, j)
+			if err != nil {
+				return err
+			}
+			j.Replicas = append(j.Replicas, r)
+		}
+	}
+
+	if j.TensorBoard == nil {
+		tb, err := initTensorBoard(j.KubeCli, j)
+		if err != nil {
+			return err
+		}
+		j.TensorBoard = tb
+	}
+
+	return nil
+}
+
 func (j *TrainingJob) Delete() {
 	// TODO(jlewi): Delete is what should cause us to delete the Pods.
 	// we shouldn't delete the pods when the jobs finish because leaving the pods
@@ -308,7 +322,7 @@ func (j *TrainingJob) updateTPRStatus() error {
 
 // reconcile tries to get the job into the desired state.
 func (j *TrainingJob) Reconcile(config *tfv1alpha1.ControllerConfig) error {
-	if j.status.Phase == tfv1alpha1.TfJobPhaseNone {
+	if j.job.Status.Phase == tfv1alpha1.TfJobPhaseNone {
 		// The job hasn't been setup.
 		j.setup(config)
@@ -318,12 +332,30 @@
 		}
 	}
 
+	// setupReplicas initializes the data structures inside TrainingJob representing the replicas.
+	// These are Go structures which aren't preserved in the APIServer, so we always need to call setupReplicas,
+	// unlike setup, which only needs to be called once during the lifecycle of the job.
+	if err := j.setupReplicas(); err != nil {
+		log.Errorf("failed to create replicas: %v", err)
+		j.status.Reason = fmt.Sprintf("Could not create in-memory data structures; %v", err)
+		if uErr := j.updateTPRStatus(); uErr != nil {
+			log.Warningf("Job %v; failed to update status error: %v", j.job.ObjectMeta.Name, uErr)
+		}
+		return err
+	}
+
 	// TODO(jlewi): Can we determine from the CRD status whether we should
 	// Create the resources or not?
We need to ensure the resources exist so for // now we always call Create. if j.job.Status.Phase == tfv1alpha1.TfJobPhaseCreating || j.job.Status.Phase == tfv1alpha1.TfJobPhaseRunning { // We call Create to make sure all the resources exist and are running. if cErr := j.createResources(config); cErr != nil { + // TODO(jlewi): Should we eventually give up and mark the job as failed if we can't create the resources? + j.status.Reason = fmt.Sprintf("Could not create job resources; %v", cErr) + if err := j.updateTPRStatus(); err != nil { + log.Warningf("Job %v; failed to update status error: %v", j.job.ObjectMeta.Name, err) + return err + } log.Errorf("trainingJobCreateReplicas() error; %v", cErr) return cErr } @@ -351,7 +383,7 @@ func (j *TrainingJob) Reconcile(config *tfv1alpha1.ControllerConfig) error { // If the phase changed we should update the TPR. if err := j.updateTPRStatus(); err != nil { - log.Warningf("Job %v, failed to update TPR status error: %v", j.job.ObjectMeta.Name, err) + log.Warningf("Job %v, failed to update status error: %v", j.job.ObjectMeta.Name, err) return err } @@ -368,7 +400,7 @@ func (j *TrainingJob) Reconcile(config *tfv1alpha1.ControllerConfig) error { // doesn't match c.Cluster.status. So you can change c.Status in order to propagate // changes to the TPR status. if err := j.updateTPRStatus(); err != nil { - log.Warningf("Job %v; failed to update TPR status error: %v", j.job.ObjectMeta.Name, err) + log.Warningf("Job %v; failed to update status error: %v", j.job.ObjectMeta.Name, err) return err } diff --git a/pkg/trainer/training_test.go b/pkg/trainer/training_test.go index 1df2ba17d2..f2691c3d76 100644 --- a/pkg/trainer/training_test.go +++ b/pkg/trainer/training_test.go @@ -151,7 +151,7 @@ func TestClusterSpec(t *testing.T) { } job.setup(&tfv1alpha1.ControllerConfig{}) - + job.setupReplicas() actual := job.ClusterSpec() for k, v := range c.Expected { @@ -185,7 +185,7 @@ func TestJobSetup(t *testing.T) { Spec: tfv1alpha1.TfJobSpec{ ReplicaSpecs: []*tfv1alpha1.TfReplicaSpec{ { - Replicas: proto.Int32(2), + Replicas: proto.Int32(1), TfPort: proto.Int32(10), Template: &v1.PodTemplateSpec{ Spec: v1.PodSpec{ @@ -196,7 +196,7 @@ func TestJobSetup(t *testing.T) { }, }, }, - TfReplicaType: tfv1alpha1.PS, + TfReplicaType: tfv1alpha1.MASTER, }, }, }, @@ -226,7 +226,13 @@ func TestJobSetup(t *testing.T) { }, }, }, - TfReplicaType: tfv1alpha1.PS, + TfReplicaType: tfv1alpha1.WORKER, + }, + }, + TerminationPolicy: &tfv1alpha1.TerminationPolicySpec{ + Chief: &tfv1alpha1.ChiefSpec{ + ReplicaName: string(tfv1alpha1.WORKER), + ReplicaIndex: 0, }, }, }, @@ -257,16 +263,22 @@ func TestJobSetup(t *testing.T) { }, }, }, - TfReplicaType: tfv1alpha1.PS, + TfReplicaType: tfv1alpha1.WORKER, }, }, TensorBoard: &tfv1alpha1.TensorBoardSpec{}, + TerminationPolicy: &tfv1alpha1.TerminationPolicySpec{ + Chief: &tfv1alpha1.ChiefSpec{ + ReplicaName: string(tfv1alpha1.WORKER), + ReplicaIndex: 0, + }, + }, }, }, expectMounts: 0, expectPhase: tfv1alpha1.TfJobPhaseFailed, expectState: tfv1alpha1.StateFailed, - expectReason: "tbReplicaSpec.LogDir must be specified", + expectReason: "invalid job spec: tbReplicaSpec.LogDir must be specified", }, } diff --git a/py/test_runner.py b/py/test_runner.py index 1fbde783d5..6da9596909 100644 --- a/py/test_runner.py +++ b/py/test_runner.py @@ -54,7 +54,7 @@ def run_test(args): results = tf_job_client.wait_for_job(api_client, namespace, name, status_callback=tf_job_client.log_status) - if results["status"]["state"] != "succeeded": + if 
results["status"]["state"].lower() != "succeeded": t.failure = "Job {0} in namespace {1} in state {2}".format( name, namespace, results["status"]["state"])
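
Note on the test_runner change above: the state strings reported by the TfJob API are capitalized (for example, ReplicaStateSucceeded = "Succeeded" in types.go), while the old test compared against a lowercase literal, so successful jobs were reported as failures. A minimal, hypothetical sketch of the case-insensitive check (illustration only, not part of the patch):

    # Hypothetical response shape; the API reports a capitalized state string.
    results = {"status": {"state": "Succeeded"}}

    # Case-insensitive success check, mirroring the patched test_runner.py.
    if results["status"]["state"].lower() != "succeeded":
        raise AssertionError("job did not succeed")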