From e31018ac83a74e31ca0a0559c4eca9bcecb600a2 Mon Sep 17 00:00:00 2001 From: Jeremy Lewi Date: Sun, 25 Mar 2018 18:11:00 -0700 Subject: [PATCH] Fix bug with jobs not being marked as completed. (#501) * Fix bug with jobs not being marked as completed. * A bug was introduced with getting the replica status in #344 which switched to creating pods directly. * Our presubmits/postsubmits were failing but this went unnoticed because the git status check was improperly reported as succeeded. * The bug is because we try to get the pod status by name but the name doesn't include the random salt in the pod name. * The code in question is a legacy of when we were using job controllers and we first got the status of the job controller. We incorrectly changed that code to get the pod. The correct thing is to just list pods by label; we already do that in the code below so we just need to delete some code. * Don't create any resources if the DeletionTimestamp is set. Creating resources at this point would end blocking deletion of the object because the controller would create resources while we are trying to delete them. * Use logrus in controller.go, trainer.go, and replicas.go to log with fields providing information about the job and repliac. This makes it easy to filter logs for a particular job. * Use logrus to log the name of the job in a field. * Checking the deletiontime stamp doesn't appear to be sufficient. Use the Phase to determine whether we should create resources. * Run gofmt. * * Reset the rate limiter after every successful sync. * Otherwise the ratelimiter will end up delaying processing subsequent events which isn't what we want. * Run goimports to fix lint issues. * * Reconcile needs to update the TFJob stored in TrainingJob. This ensures TrainingJob has an up to date representation of the job. * Otherwise changes made to the spec won't be available to TrainingJob. For example, if the job is deleted by the user, the deletion timestamp will be set. But if we don't update the TFJob stored in TrainingJob this change won't be propogated. * * TrainingJob.update should log the value of the job not the pointer. * Add more comments to the code. --- pkg/controller/controller.go | 61 +++++++++++-- pkg/trainer/replicas.go | 66 +++++++------- pkg/trainer/training.go | 86 +++++++++++++------ py/test_runner.py | 3 +- py/tf_job_client.py | 4 +- test/workflows/components/workflows.libsonnet | 3 +- 6 files changed, 154 insertions(+), 69 deletions(-) diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go index b575f7620d..a68fc406f6 100644 --- a/pkg/controller/controller.go +++ b/pkg/controller/controller.go @@ -34,6 +34,7 @@ import ( "k8s.io/client-go/tools/record" "k8s.io/client-go/util/workqueue" + "github.com/juju/ratelimit" tfv1alpha1 "github.com/kubeflow/tf-operator/pkg/apis/tensorflow/v1alpha1" tfjobclient "github.com/kubeflow/tf-operator/pkg/client/clientset/versioned" kubeflowscheme "github.com/kubeflow/tf-operator/pkg/client/clientset/versioned/scheme" @@ -76,6 +77,21 @@ type Controller struct { // means we can ensure we only process a fixed amount of resources at a // time, and makes it easy to ensure we are never processing the same item // simultaneously in two different workers. + // + // Items in the work queue correspond to the name of the job. + // In response to various events (e.g. Add, Update, Delete), the informer + // is configured to add events to the queue. Since the item in the queue + // represents a job and not a particular event, we end up aggregating events for + // a job and ensure that a particular job isn't being processed by multiple + // workers simultaneously. + // + // We rely on the informer to periodically generate Update events. This ensures + // we regularly check on each TFJob and take any action needed. + // + // If there is a problem processing a job, processNextWorkItem just requeues + // the work item. This ensures that we end up retrying it. In this case + // we rely on the rateLimiter in the worker queue to retry with exponential + // backoff. WorkQueue workqueue.RateLimitingInterface // recorder is an event recorder for recording Event resources to the @@ -100,10 +116,19 @@ func New(kubeClient kubernetes.Interface, tfJobClient tfjobclient.Interface, eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: kubeClient.CoreV1().Events("")}) recorder := eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: controllerName}) + // Use a ratelimiter with overall and per-item rate limitting. + // The overall is a token bucket and the per-item is exponential + // For the per item + rateLimiter := workqueue.NewMaxOfRateLimiter( + workqueue.NewItemExponentialFailureRateLimiter(5*time.Millisecond, 1000*time.Second), + // 10 qps, 100 bucket size. This is only for retry speed and its only the overall factor (not per item) + &workqueue.BucketRateLimiter{Bucket: ratelimit.NewBucketWithRate(float64(10), int64(100))}, + ) + controller := &Controller{ KubeClient: kubeClient, TFJobClient: tfJobClient, - WorkQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "TFjobs"), + WorkQueue: workqueue.NewNamedRateLimitingQueue(rateLimiter, "TFjobs"), recorder: recorder, // TODO(jlewi)): What to do about cluster.Cluster? jobs: make(map[string]*trainer.TrainingJob), @@ -185,16 +210,23 @@ func (c *Controller) processNextWorkItem() bool { if quit { return false } + defer c.WorkQueue.Done(key) - forget, err := c.syncHandler(key.(string)) + _, err := c.syncHandler(key.(string)) if err == nil { - if forget { - c.WorkQueue.Forget(key) - } + // Calling forget resets the rate limiter for this item. + // Since the sync was processed successfully we want to reset the ratelimiter + // so that future events can be processed immediately. + log.WithFields(log.Fields{ + "job": key, + }).Infof("WorkQueue forgetting key %v", key) + c.WorkQueue.Forget(key) return true } + // There was an error processing the key so to retry we requeue it. + // The WorkQueue uses a rate limiter to control when the key gets retried. utilruntime.HandleError(fmt.Errorf("Error syncing job: %v", err)) c.WorkQueue.AddRateLimited(key) @@ -209,7 +241,9 @@ func (c *Controller) processNextWorkItem() bool { func (c *Controller) syncTFJob(key string) (bool, error) { startTime := time.Now() defer func() { - log.Debugf("Finished syncing job %q (%v)", key, time.Since(startTime)) + log.WithFields(log.Fields{ + "job": key, + }).Infof("Finished syncing job %q (%v)", key, time.Since(startTime)) }() ns, name, err := cache.SplitMetaNamespaceKey(key) @@ -224,7 +258,9 @@ func (c *Controller) syncTFJob(key string) (bool, error) { if err != nil { if apierrors.IsNotFound(err) { - log.Debugf("Job has been deleted: %v", key) + log.WithFields(log.Fields{ + "job": key, + }).Infof("Job has been deleted: %v", key) return true, nil } return false, err @@ -233,12 +269,22 @@ func (c *Controller) syncTFJob(key string) (bool, error) { // Create a new TrainingJob if there is no TrainingJob stored for it in the jobs map or if the UID's don't match. // The UID's won't match in the event we deleted the job and then recreated the job with the same name. if cJob, ok := c.jobs[key]; !ok || cJob.UID() != tfJob.UID { + log.WithFields(log.Fields{ + "job": key, + }).Infof("Creating new job %v", key) nc, err := trainer.NewJob(c.KubeClient, c.TFJobClient, c.recorder, tfJob, &c.config) if err != nil { + log.WithFields(log.Fields{ + "job": key, + }).Errorf("There was a problem creating NewJob %v; Error: %v", key, err) return false, err } c.jobs[key] = nc + } else { + // Replace the TFJob stored inside TrainingJob with the latest job. + // We need to do this to pull in the latest changes to the spec/status. + c.jobs[key].Update(tfJob) } nc := c.jobs[key] @@ -247,6 +293,7 @@ func (c *Controller) syncTFJob(key string) (bool, error) { return false, err } + // TODO(jlewi): Why do we issue a get request again here? tfJob, err = c.TFJobClient.KubeflowV1alpha1().TFJobs(tfJob.ObjectMeta.Namespace).Get(tfJob.ObjectMeta.Name, metav1.GetOptions{}) if err != nil { diff --git a/pkg/trainer/replicas.go b/pkg/trainer/replicas.go index e2cacda8b0..9a671e38e2 100644 --- a/pkg/trainer/replicas.go +++ b/pkg/trainer/replicas.go @@ -20,7 +20,7 @@ import ( "fmt" "strings" - log "github.com/golang/glog" + log "github.com/sirupsen/logrus" "k8s.io/api/core/v1" k8s_errors "k8s.io/apimachinery/pkg/api/errors" meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -38,6 +38,7 @@ import ( const ( SuccessfulCreateReason = "SuccessfulCreate" FailedCreateReason = "FailedCreate" + indexField = "replica" ) // TFReplicaSet is a set of TF processes all acting as the same role (e.g. worker @@ -47,6 +48,9 @@ type TFReplicaSet struct { // Job is a pointer to the TrainingJob to which this replica belongs. Job *TrainingJob Spec tfv1alpha1.TFReplicaSpec + + // contextLogger is a logger to use for logging information about this replica. + contextLogger *log.Entry } // TFReplicaSetInterface is an interface for managing a set of replicas. @@ -102,6 +106,14 @@ func NewTFReplicaSet(clientSet kubernetes.Interface, recorder record.EventRecord recorder: recorder, Job: job, Spec: tfReplicaSpec, + contextLogger: log.WithFields(log.Fields{ + "job_type": string(tfReplicaSpec.TFReplicaType), + "runtime_id": job.job.Spec.RuntimeId, + "tf_job_name": job.job.ObjectMeta.Name, + // We use job to match the key used in controller.go + // In controller.go we log the key used with the workqueue. + "job": job.job.ObjectMeta.Namespace + "/" + job.job.ObjectMeta.Name, + }), }, nil } @@ -150,7 +162,9 @@ func (s *TFReplicaSet) CreateServiceWithIndex(index int32) (*v1.Service, error) }, } - log.Infof("Creating service: %v", service.ObjectMeta.Name) + s.contextLogger.WithFields(log.Fields{ + indexField: index, + }).Infof("Creating service: %v", service.ObjectMeta.Name) return s.ClientSet.CoreV1().Services(s.Job.job.ObjectMeta.Namespace).Create(service) } @@ -184,7 +198,7 @@ func (s *TFReplicaSet) CreatePodWithIndex(index int32) (*v1.Pod, error) { tfConfigJson, err := json.Marshal(tfConfig) if err != nil { - log.Errorf("Job: %v serializing tfConfig: %v return error; %v", s.Job.job.ObjectMeta.Name, util.Pformat(tfConfig), err) + s.contextLogger.Errorf("Job: %v serializing tfConfig: %v return error; %v", s.Job.job.ObjectMeta.Name, util.Pformat(tfConfig), err) return nil, err } @@ -205,7 +219,9 @@ func (s *TFReplicaSet) CreatePodWithIndex(index int32) (*v1.Pod, error) { }) } - log.Infof("Creating pod: %v", pod.ObjectMeta.Name) + s.contextLogger.WithFields(log.Fields{ + indexField: index, + }).Infof("Creating pod: %v", pod.ObjectMeta.Name) return s.ClientSet.CoreV1().Pods(s.Job.job.ObjectMeta.Namespace).Create(pod) } @@ -222,48 +238,50 @@ func (s *TFReplicaSet) Delete() error { LabelSelector: selector, } - log.V(1).Infof("Deleting Jobs namespace=%v selector=%v", s.Job.job.ObjectMeta.Namespace, selector) + s.contextLogger.Infof("Deleting Jobs namespace=%v selector=%v", s.Job.job.ObjectMeta.Namespace, selector) err = s.ClientSet.CoreV1().Pods(s.Job.job.ObjectMeta.Namespace).DeleteCollection(&meta_v1.DeleteOptions{}, options) if err != nil { - log.Errorf("There was a problem deleting the jobs; %v", err) + s.contextLogger.Errorf("There was a problem deleting the jobs; %v", err) failures = true } // We need to delete the completed pods. - log.Infof("Deleting Pods namespace=%v selector=%v", s.Job.job.ObjectMeta.Namespace, selector) + s.contextLogger.Infof("Deleting Pods namespace=%v selector=%v", s.Job.job.ObjectMeta.Namespace, selector) err = s.ClientSet.CoreV1().Pods(s.Job.job.ObjectMeta.Namespace).DeleteCollection(&meta_v1.DeleteOptions{}, options) if err != nil { - log.Errorf("There was a problem deleting the pods; %v", err) + s.contextLogger.Errorf("There was a problem deleting the pods; %v", err) failures = true } // Services doesn't support DeleteCollection so we delete them individually. // TODO(jlewi): We should check if this has changed with K8s 1.8 or other releases. for index := int32(0); index < *s.Spec.Replicas; index++ { - log.V(1).Infof("Deleting Service %v:%v", s.Job.job.ObjectMeta.Namespace, s.genName((index))) + s.contextLogger.WithFields(log.Fields{ + indexField: index, + }).Infof("Deleting Service %v:%v", s.Job.job.ObjectMeta.Namespace, s.genName((index))) err = s.ClientSet.CoreV1().Services(s.Job.job.ObjectMeta.Namespace).Delete(s.genName(index), &meta_v1.DeleteOptions{}) if err != nil { - log.Errorf("Error deleting service %v; %v", s.genName(index), err) + s.contextLogger.Errorf("Error deleting service %v; %v", s.genName(index), err) failures = true } } // If the ConfigMap for the default parameter server exists, we delete it - log.Infof("Get ConfigMaps %v:%v", s.Job.job.ObjectMeta.Namespace, s.defaultPSConfigMapName()) + s.contextLogger.Infof("Get ConfigMaps %v:%v", s.Job.job.ObjectMeta.Namespace, s.defaultPSConfigMapName()) _, err = s.ClientSet.CoreV1().ConfigMaps(s.Job.job.ObjectMeta.Namespace).Get(s.defaultPSConfigMapName(), meta_v1.GetOptions{}) if err != nil { if !k8sutil.IsKubernetesResourceNotFoundError(err) { - log.Errorf("Error deleting ConfigMap %v; %v", s.defaultPSConfigMapName(), err) + s.contextLogger.Errorf("Error deleting ConfigMap %v; %v", s.defaultPSConfigMapName(), err) failures = true } } else { - log.Infof("Delete ConfigMaps %v:%v", s.Job.job.ObjectMeta.Namespace, s.defaultPSConfigMapName()) + s.contextLogger.Infof("Delete ConfigMaps %v:%v", s.Job.job.ObjectMeta.Namespace, s.defaultPSConfigMapName()) err = s.ClientSet.CoreV1().ConfigMaps(s.Job.job.ObjectMeta.Namespace).Delete(s.defaultPSConfigMapName(), &meta_v1.DeleteOptions{}) if err != nil { - log.Errorf("There was a problem deleting the ConfigMaps; %v", err) + s.contextLogger.Errorf("There was a problem deleting the ConfigMaps; %v", err) failures = true } } @@ -331,20 +349,10 @@ func replicaStatusFromPodList(l v1.PodList, name string) tfv1alpha1.ReplicaState // GetSingleReplicaStatus returns status for a single replica func (s *TFReplicaSet) GetSingleReplicaStatus(index int32) tfv1alpha1.ReplicaState { - p, err := s.ClientSet.CoreV1().Pods(s.Job.job.ObjectMeta.Namespace).Get(s.genName(index), meta_v1.GetOptions{}) - - if err != nil { - return tfv1alpha1.ReplicaStateUnknown - } - - if v1.PodSucceeded == p.Status.Phase { - return tfv1alpha1.ReplicaStateSucceeded - } - labels := s.LabelsByIndex(index) selector, err := labels.ToSelector() if err != nil { - log.Errorf("labels.ToSelector() error; %v", err) + s.contextLogger.Errorf("labels.ToSelector() error; %v", err) return tfv1alpha1.ReplicaStateFailed } @@ -434,14 +442,14 @@ func (s *TFReplicaSet) SyncPods() error { } if len(pl.Items) == 0 { - log.Infof("No pod found for job %s, creating a new one.", s.Job.name) + s.contextLogger.Infof("Job %s missing pod for replica %s index %s, creating a new one.", s.Job.name(), string(s.Spec.TFReplicaType), index) // Create the pod createdPod, err := s.CreatePodWithIndex(index) // If the pod already exists do nothing. if err != nil { if k8s_errors.IsAlreadyExists(err) { - log.Infof("Pod: %v already exists.", createdPod.ObjectMeta.Name) + s.contextLogger.Infof("Pod: %v already exists.", createdPod.ObjectMeta.Name) continue } s.recorder.Eventf(s.Job.job, v1.EventTypeWarning, FailedCreateReason, "Error creating: %v", err) @@ -466,14 +474,14 @@ func (s *TFReplicaSet) SyncServices() error { for index := int32(0); index < *s.Spec.Replicas; index++ { _, err := s.ClientSet.CoreV1().Services(s.Job.job.ObjectMeta.Namespace).Get(s.genName(index), meta_v1.GetOptions{}) if err != nil && k8s_errors.IsNotFound(err) { - log.Infof("Service: %v not found, create new one.", s.genName(index)) + s.contextLogger.Infof("Service: %v not found, create new one.", s.genName(index)) // Create the service createdService, err := s.CreateServiceWithIndex(index) // If the service already exists do nothing. if err != nil { if k8s_errors.IsAlreadyExists(err) { - log.Infof("Service: %v already exists.", s.genName(index)) + s.contextLogger.Infof("Service: %v already exists.", s.genName(index)) continue } s.recorder.Eventf(s.Job.job, v1.EventTypeWarning, FailedCreateReason, "Error creating: %v", err) diff --git a/pkg/trainer/training.go b/pkg/trainer/training.go index 86579e698e..f92726d859 100644 --- a/pkg/trainer/training.go +++ b/pkg/trainer/training.go @@ -60,6 +60,9 @@ type TrainingJob struct { memberCounter int pdb *v1beta1.PodDisruptionBudget + + // contextLogger is a logger to use for logging information about this replica. + contextLogger *log.Entry } // ClusterSpec represents a cluster TensorFlow specification. @@ -82,6 +85,13 @@ func initJob(kubeCli kubernetes.Interface, tfJobClient tfjobclient.Interface, re Replicas: make([]*TFReplicaSet, 0), job: job, status: *job.Status.DeepCopy(), + + contextLogger: log.WithFields(log.Fields{ + // We use job to match the key used in controller.go + // In controller.go we log the key used with the workqueue. + "job": job.ObjectMeta.Namespace + "/" + job.ObjectMeta.Name, + "uid": job.ObjectMeta.UID, + }), } return j, nil @@ -97,6 +107,15 @@ func NewJob(kubeCli kubernetes.Interface, tfJobClient tfjobclient.Interface, rec return j, nil } +// Update replaces the TFJob corresponding to TrainingJob with the provided job. +// This function is used when the Spec/Status of the job is modified outside the controller. +// For example, if the user issues a delete request. This will update the metadata on the object +// so we need to replace the spec. +func (j *TrainingJob) Update(newJob *tfv1alpha1.TFJob) { + j.contextLogger.Info("Updating job to %+v", *newJob) + j.job = newJob +} + // UID returns the user ID of the requesting user func (j *TrainingJob) UID() types.UID { return j.job.ObjectMeta.UID @@ -271,7 +290,7 @@ func (j *TrainingJob) Delete() { // we shouldn't delete the pods when the jobs finish because leaving the pods // allows us to get the logs from the pods after the job finishes. // - log.Infof("TFJob %v deleted by the user", j.fullname()) + j.contextLogger.Infof("TFJob %v deleted by the user", j.fullname()) // TODO(jlewi): This logic is probably insufficient. if j.job.Status.Phase != tfv1alpha1.TFJobPhaseCleanUp { j.status.Phase = tfv1alpha1.TFJobPhaseCleanUp @@ -281,14 +300,14 @@ func (j *TrainingJob) Delete() { // we just rely on K8s garbage collection to delete the resources before // deleting TFJob? if cErr := j.deleteResources(); cErr != nil { - log.Errorf("trainingJob.deleteResources() error; %v", cErr) + j.contextLogger.Errorf("trainingJob.deleteResources() error; %v", cErr) } if j.pdb != nil { // if the job has PDB for gang scheduling, delete it err := j.KubeCli.PolicyV1beta1().PodDisruptionBudgets(j.job.ObjectMeta.Namespace).Delete(j.pdb.ObjectMeta.Name, &meta_v1.DeleteOptions{}) if err != nil { - log.Errorf("Error deleting PDB %v; %v", j.pdb.ObjectMeta.Name, err) + j.contextLogger.Errorf("Error deleting PDB %v; %v", j.pdb.ObjectMeta.Name, err) } } } @@ -314,12 +333,20 @@ func (j *TrainingJob) updateCRDStatus() error { // Reconcile tries to get the job into the desired state. func (j *TrainingJob) Reconcile(config *tfv1alpha1.ControllerConfig, enableGangScheduling bool) error { + // TODO(jlewi): This doesn't seem to be a reliable way to detect deletion. + if j.job.ObjectMeta.DeletionTimestamp != nil { + j.contextLogger.Info("Deletion timestamp set; skipping reconcile") + // Job is in the process of being deleted so do nothing. + // We especially don't want to create new resources as that could block deletion. + return nil + } + if j.job.Status.Phase == tfv1alpha1.TFJobPhaseNone { // The job hasn't been setup. j.setup(config) if err := j.updateCRDStatus(); err != nil { - log.Warningf("failed to update CRD status: %v", err) + j.contextLogger.Warningf("failed to update CRD status: %v", err) return err } } @@ -328,10 +355,10 @@ func (j *TrainingJob) Reconcile(config *tfv1alpha1.ControllerConfig, enableGangS // These are go-lang structures which aren't preserved in the APIServer. So we always need to call setupReplicas // unlike setup which only needs to be called once during the lifecycle of the job. if err := j.setupReplicas(); err != nil { - log.Errorf("failed to create replicas: %v", err) + j.contextLogger.Errorf("failed to create replicas: %v", err) j.status.Reason = fmt.Sprintf("Could not create in memory datastructures; %v", err) if uErr := j.updateCRDStatus(); err != nil { - log.Warningf("Job %v; failed to update status error: %v", j.job.ObjectMeta.Name, uErr) + j.contextLogger.Warningf("Job %v; failed to update status error: %v", j.job.ObjectMeta.Name, uErr) } return err } @@ -341,28 +368,31 @@ func (j *TrainingJob) Reconcile(config *tfv1alpha1.ControllerConfig, enableGangS if enableGangScheduling { err := j.syncPdb() if err != nil { - log.Errorf("SyncPdb error: %v", err) + j.contextLogger.Errorf("SyncPdb error: %v", err) } } - // sync pods - for _, rc := range j.Replicas { - err := rc.SyncPods() - if err != nil { - log.Errorf("SyncPods error: %v", err) + // Only sync pods and services if we are running. + if j.status.Phase == tfv1alpha1.TFJobPhaseCreating || j.status.Phase == tfv1alpha1.TFJobPhaseRunning { + // sync pods + for _, rc := range j.Replicas { + err := rc.SyncPods() + if err != nil { + j.contextLogger.Errorf("SyncPods error: %v", err) + } } - } - // sync services - for _, rc := range j.Replicas { - err := rc.SyncServices() - if err != nil { - log.Errorf("SyncServices error: %v", err) + // sync services + for _, rc := range j.Replicas { + err := rc.SyncServices() + if err != nil { + j.contextLogger.Errorf("SyncServices error: %v", err) + } } } if err := j.updateCRDStatus(); err != nil { - log.Warningf("Job %v; failed to update status error: %v", j.job.ObjectMeta.Name, err) + j.contextLogger.Warningf("Job %v; failed to update status error: %v", j.job.ObjectMeta.Name, err) return err } @@ -371,36 +401,36 @@ func (j *TrainingJob) Reconcile(config *tfv1alpha1.ControllerConfig, enableGangS j.status.ReplicaStatuses = replicaStatuses if err != nil { - log.Errorf("GetStatus() for job %v returned error: %v", j.job.ObjectMeta.Name, err) + j.contextLogger.Errorf("GetStatus() for job %v returned error: %v", j.job.ObjectMeta.Name, err) return err } // TODO(jlewi): We should update the Phase if we detect the job is done. if state == tfv1alpha1.StateFailed { - log.Errorf("Master failed Job: %v.", j.job.ObjectMeta.Name) + j.contextLogger.Errorf("Master failed Job: %v.", j.job.ObjectMeta.Name) j.status.Phase = tfv1alpha1.TFJobPhaseDone j.status.State = tfv1alpha1.StateFailed } else if state == tfv1alpha1.StateSucceeded { - log.Infof("Master succeeded Job: %v.", j.job.ObjectMeta.Name) + j.contextLogger.Infof("Master succeeded Job: %v.", j.job.ObjectMeta.Name) j.status.Phase = tfv1alpha1.TFJobPhaseDone j.status.State = tfv1alpha1.StateSucceeded } else if state == tfv1alpha1.StateRunning { - log.Infof("Master running Job: %v.", j.job.ObjectMeta.Name) + j.contextLogger.Infof("Master running Job: %v.", j.job.ObjectMeta.Name) j.status.Phase = tfv1alpha1.TFJobPhaseRunning j.status.State = tfv1alpha1.StateRunning } else { - log.Infof("Job %v status=%v", j.job.ObjectMeta.Name, util.Pformat(j.status)) + j.contextLogger.Infof("Job %v status=%v", j.job.ObjectMeta.Name, util.Pformat(j.status)) } // If the phase changed we should update the CRD. if err := j.updateCRDStatus(); err != nil { - log.Warningf("Job %v, failed to update CRD status error: %v", j.job.ObjectMeta.Name, err) + j.contextLogger.Warningf("Job %v, failed to update CRD status error: %v", j.job.ObjectMeta.Name, err) return err } if j.job.Status.Phase == tfv1alpha1.TFJobPhaseCleanUp { if cErr := j.deleteResources(); cErr != nil { - log.Errorf("Job %v trainingJob.Delete() error; %v", j.job.ObjectMeta.Name, cErr) + j.contextLogger.Errorf("Job %v trainingJob.Delete() error; %v", j.job.ObjectMeta.Name, cErr) } // j.status.SetPhase(spec.TFJobPhaseDone) // Return from run because we want to stop reconciling the object. @@ -411,7 +441,7 @@ func (j *TrainingJob) Reconcile(config *tfv1alpha1.ControllerConfig, enableGangS // doesn't match c.Cluster.status. So you can change c.Status in order to propagate // changes to the CRD status. if err := j.updateCRDStatus(); err != nil { - log.Warningf("Job %v; failed to update CRD status error: %v", j.job.ObjectMeta.Name, err) + j.contextLogger.Warningf("Job %v; failed to update CRD status error: %v", j.job.ObjectMeta.Name, err) return err } @@ -464,7 +494,7 @@ func (j *TrainingJob) syncPdb() error { createdPdb, err := j.KubeCli.PolicyV1beta1().PodDisruptionBudgets(j.job.ObjectMeta.Namespace).Create(pdb) if err != nil { if k8s_errors.IsAlreadyExists(err) { - log.Infof("PDB: %v already exists.", j.job.ObjectMeta.Name) + j.contextLogger.Infof("PDB: %v already exists.", j.job.ObjectMeta.Name) return nil } diff --git a/py/test_runner.py b/py/test_runner.py index 531a61085a..ff52a3a5c1 100644 --- a/py/test_runner.py +++ b/py/test_runner.py @@ -52,7 +52,7 @@ def wait_for_delete(client, if datetime.datetime.now() + polling_interval > end_time: raise util.TimeoutError( - "Timeout waiting for job {0} in namespace {1} to finish.".format( + "Timeout waiting for job {0} in namespace {1} to be deleted.".format( name, namespace)) time.sleep(polling_interval.seconds) @@ -195,6 +195,7 @@ def run_test(args): # pylint: disable=too-many-branches,too-many-statements tf_job_client.delete_tf_job(api_client, namespace, name) + logging.info("Waiting for job %s in namespaces %s to be deleted.", name, namespace) wait_for_delete( api_client, namespace, name, status_callback=tf_job_client.log_status) diff --git a/py/tf_job_client.py b/py/tf_job_client.py index ca2c992642..1446fa343b 100644 --- a/py/tf_job_client.py +++ b/py/tf_job_client.py @@ -63,7 +63,7 @@ def delete_tf_job(client, namespace, name): logging.info("Deleting job %s.%s", namespace, name) api_response = crd_api.delete_namespaced_custom_object( TF_JOB_GROUP, TF_JOB_VERSION, namespace, TF_JOB_PLURAL, name, body) - logging.info("Deleted job %s.%s", namespace, name) + logging.info("Deleting job %s.%s returned: %s", namespace, name, api_response) return api_response except ApiException as e: message = "" @@ -99,7 +99,7 @@ def log_status(tf_job): def wait_for_job(client, namespace, name, - timeout=datetime.timedelta(minutes=5), + timeout=datetime.timedelta(minutes=10), polling_interval=datetime.timedelta(seconds=30), status_callback=None): """Wait for the specified job to finish. diff --git a/test/workflows/components/workflows.libsonnet b/test/workflows/components/workflows.libsonnet index 8714e1172b..ac55de93d6 100644 --- a/test/workflows/components/workflows.libsonnet +++ b/test/workflows/components/workflows.libsonnet @@ -168,7 +168,7 @@ }, }, ], // volumes - // onExit specifies the template that should always run when the workflow completes. + // onExit specifies the template that should always run when the workflow completes. onExit: "exit-handler", templates: [ { @@ -221,7 +221,6 @@ [{ name: "teardown-cluster", template: "teardown-cluster", - }], [{ name: "copy-artifacts",