Fix bug with jobs not being marked as completed.
* A bug in getting the replica status was introduced in #344, which
switched to creating pods directly.

* Our presubmits/postsubmits were failing, but this went unnoticed because
the Git status check was incorrectly reported as succeeded.

* The bug is that we try to get the pod status by name, but the name we
construct doesn't include the random salt appended to the actual pod name.

* The code in question is a legacy of when we were using job controllers and
first fetched the status of the job controller. We incorrectly changed that
code to get the pod by name. The correct fix is to list pods by label; the
code below already does that, so we only need to delete the name-based lookup
(see the first sketch after this list).

* Don't create any resources if the DeletionTimestamp is set.
  Creating resources at this point would end up blocking deletion of the object,
  because the controller would be creating resources while we are trying to delete
  them (see the second sketch after this list).

* Use logrus in controller.go, trainer.go, and replicas.go to log
  with fields providing information about the job and replica.
  This makes it easy to filter logs for a particular job.

* Use logrus to log the name of the job in a field.
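
A minimal sketch of the label-based lookup described above, pieced together from the helpers visible in the replicas.go hunks below (LabelsByIndex, ToSelector, replicaStatusFromPodList). It is illustrative only, not the exact code in this commit; in particular the "tensorflow" container name is an assumption.

func (s *TFReplicaSet) GetSingleReplicaStatus(index int32) tfv1alpha1.ReplicaState {
	// Build the label selector for this replica index; the generated pod name
	// carries a random salt, so a name-based Get would never find the pod.
	labels := s.LabelsByIndex(index)
	selector, err := labels.ToSelector()
	if err != nil {
		s.contextLogger.Errorf("labels.ToSelector() error; %v", err)
		return tfv1alpha1.ReplicaStateFailed
	}

	// List pods by label instead of getting a single pod by name.
	l, err := s.ClientSet.CoreV1().Pods(s.Job.job.ObjectMeta.Namespace).List(meta_v1.ListOptions{
		LabelSelector: selector,
	})
	if err != nil {
		return tfv1alpha1.ReplicaStateUnknown
	}

	// "tensorflow" is assumed here as the container whose state we inspect;
	// the real code passes whatever container name replicaStatusFromPodList expects.
	return replicaStatusFromPodList(*l, "tensorflow")
}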
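
And a minimal sketch of the DeletionTimestamp guard described above. The receiver and method name here are hypothetical, since the trainer.go change is not among the hunks shown below; only the TrainingJob/job fields are taken from this diff.

// reconcile is a hypothetical stand-in for the TrainingJob sync entry point.
func (j *TrainingJob) reconcile() error {
	// If the object is being deleted, creating pods/services now would race
	// with the deletion and end up blocking it, so bail out early.
	if j.job.ObjectMeta.DeletionTimestamp != nil {
		log.WithFields(log.Fields{
			"job": j.job.ObjectMeta.Namespace + "/" + j.job.ObjectMeta.Name,
		}).Info("DeletionTimestamp is set; not creating any resources")
		return nil
	}

	// ... normal path: sync pods and services for each replica set ...
	return nil
}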
jlewi committed Mar 25, 2018
1 parent eec56b5 commit 63f4248
Showing 6 changed files with 93 additions and 55 deletions.
17 changes: 15 additions & 2 deletions pkg/controller/controller.go
@@ -190,6 +190,9 @@ func (c *Controller) processNextWorkItem() bool {
forget, err := c.syncHandler(key.(string))
if err == nil {
if forget {
log.WithFields(log.Fields{
"job": key,
}).Infof("WorkQueue forgetting key %v", key)
c.WorkQueue.Forget(key)
}
return true
@@ -209,7 +212,9 @@ func (c *Controller) processNextWorkItem() bool {
func (c *Controller) syncTFJob(key string) (bool, error) {
startTime := time.Now()
defer func() {
log.Debugf("Finished syncing job %q (%v)", key, time.Since(startTime))
log.WithFields(log.Fields{
"job": key,
}).Infof("Finished syncing job %q (%v)", key, time.Since(startTime))
}()

ns, name, err := cache.SplitMetaNamespaceKey(key)
@@ -224,7 +229,9 @@ func (c *Controller) syncTFJob(key string) (bool, error) {

if err != nil {
if apierrors.IsNotFound(err) {
log.Debugf("Job has been deleted: %v", key)
log.WithFields(log.Fields{
"job": key,
}).Infof("Job has been deleted: %v", key)
return true, nil
}
return false, err
@@ -233,9 +240,15 @@ func (c *Controller) syncTFJob(key string) (bool, error) {
// Create a new TrainingJob if there is no TrainingJob stored for it in the jobs map or if the UID's don't match.
// The UID's won't match in the event we deleted the job and then recreated the job with the same name.
if cJob, ok := c.jobs[key]; !ok || cJob.UID() != tfJob.UID {
log.WithFields(log.Fields{
"job": key,
}).Infof("Creating new job %v", key)
nc, err := trainer.NewJob(c.KubeClient, c.TFJobClient, c.recorder, tfJob, &c.config)

if err != nil {
log.WithFields(log.Fields{
"job": key,
}).Errorf("There was a problem creating NewJob %v; Error: %v", key, err)
return false, err
}
c.jobs[key] = nc
66 changes: 37 additions & 29 deletions pkg/trainer/replicas.go
@@ -20,7 +20,7 @@ import (
"fmt"
"strings"

log "github.com/golang/glog"
log "github.com/sirupsen/logrus"
"k8s.io/api/core/v1"
k8s_errors "k8s.io/apimachinery/pkg/api/errors"
meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -38,6 +38,7 @@ import (
const (
SuccessfulCreateReason = "SuccessfulCreate"
FailedCreateReason = "FailedCreate"
indexField = "replica"
)

// TFReplicaSet is a set of TF processes all acting as the same role (e.g. worker
@@ -47,6 +48,9 @@ type TFReplicaSet struct {
// Job is a pointer to the TrainingJob to which this replica belongs.
Job *TrainingJob
Spec tfv1alpha1.TFReplicaSpec

// contextLogger is a logger to use for logging information about this replica.
contextLogger *log.Entry
}

// TFReplicaSetInterface is an interface for managing a set of replicas.
@@ -102,6 +106,14 @@ func NewTFReplicaSet(clientSet kubernetes.Interface, recorder record.EventRecord
recorder: recorder,
Job: job,
Spec: tfReplicaSpec,
contextLogger: log.WithFields(log.Fields{
"job_type": string(tfReplicaSpec.TFReplicaType),
"runtime_id": job.job.Spec.RuntimeId,
"tf_job_name": job.job.ObjectMeta.Name,
// We use job to match the key used in controller.go
// In controller.go we log the key used with the workqueue.
"job": job.job.ObjectMeta.Namespace + "/" + job.job.ObjectMeta.Name,
}),
}, nil
}

@@ -150,7 +162,9 @@ func (s *TFReplicaSet) CreateServiceWithIndex(index int32) (*v1.Service, error)
},
}

log.Infof("Creating service: %v", service.ObjectMeta.Name)
s.contextLogger.WithFields(log.Fields{
indexField: index,
}).Infof("Creating service: %v", service.ObjectMeta.Name)
return s.ClientSet.CoreV1().Services(s.Job.job.ObjectMeta.Namespace).Create(service)
}

@@ -184,7 +198,7 @@ func (s *TFReplicaSet) CreatePodWithIndex(index int32) (*v1.Pod, error) {

tfConfigJson, err := json.Marshal(tfConfig)
if err != nil {
log.Errorf("Job: %v serializing tfConfig: %v return error; %v", s.Job.job.ObjectMeta.Name, util.Pformat(tfConfig), err)
s.contextLogger.Errorf("Job: %v serializing tfConfig: %v return error; %v", s.Job.job.ObjectMeta.Name, util.Pformat(tfConfig), err)
return nil, err
}

@@ -205,7 +219,9 @@ func (s *TFReplicaSet) CreatePodWithIndex(index int32) (*v1.Pod, error) {
})
}

log.Infof("Creating pod: %v", pod.ObjectMeta.Name)
s.contextLogger.WithFields(log.Fields{
indexField: index,
}).Infof("Creating pod: %v", pod.ObjectMeta.Name)
return s.ClientSet.CoreV1().Pods(s.Job.job.ObjectMeta.Namespace).Create(pod)
}

@@ -222,48 +238,50 @@ func (s *TFReplicaSet) Delete() error {
LabelSelector: selector,
}

log.V(1).Infof("Deleting Jobs namespace=%v selector=%v", s.Job.job.ObjectMeta.Namespace, selector)
s.contextLogger.Infof("Deleting Jobs namespace=%v selector=%v", s.Job.job.ObjectMeta.Namespace, selector)
err = s.ClientSet.CoreV1().Pods(s.Job.job.ObjectMeta.Namespace).DeleteCollection(&meta_v1.DeleteOptions{}, options)

if err != nil {
log.Errorf("There was a problem deleting the jobs; %v", err)
s.contextLogger.Errorf("There was a problem deleting the jobs; %v", err)
failures = true
}

// We need to delete the completed pods.
log.Infof("Deleting Pods namespace=%v selector=%v", s.Job.job.ObjectMeta.Namespace, selector)
s.contextLogger.Infof("Deleting Pods namespace=%v selector=%v", s.Job.job.ObjectMeta.Namespace, selector)
err = s.ClientSet.CoreV1().Pods(s.Job.job.ObjectMeta.Namespace).DeleteCollection(&meta_v1.DeleteOptions{}, options)

if err != nil {
log.Errorf("There was a problem deleting the pods; %v", err)
s.contextLogger.Errorf("There was a problem deleting the pods; %v", err)
failures = true
}

// Services doesn't support DeleteCollection so we delete them individually.
// TODO(jlewi): We should check if this has changed with K8s 1.8 or other releases.
for index := int32(0); index < *s.Spec.Replicas; index++ {
log.V(1).Infof("Deleting Service %v:%v", s.Job.job.ObjectMeta.Namespace, s.genName((index)))
s.contextLogger.WithFields(log.Fields{
indexField: index,
}).Infof("Deleting Service %v:%v", s.Job.job.ObjectMeta.Namespace, s.genName((index)))
err = s.ClientSet.CoreV1().Services(s.Job.job.ObjectMeta.Namespace).Delete(s.genName(index), &meta_v1.DeleteOptions{})

if err != nil {
log.Errorf("Error deleting service %v; %v", s.genName(index), err)
s.contextLogger.Errorf("Error deleting service %v; %v", s.genName(index), err)
failures = true
}
}

// If the ConfigMap for the default parameter server exists, we delete it
log.Infof("Get ConfigMaps %v:%v", s.Job.job.ObjectMeta.Namespace, s.defaultPSConfigMapName())
s.contextLogger.Infof("Get ConfigMaps %v:%v", s.Job.job.ObjectMeta.Namespace, s.defaultPSConfigMapName())
_, err = s.ClientSet.CoreV1().ConfigMaps(s.Job.job.ObjectMeta.Namespace).Get(s.defaultPSConfigMapName(), meta_v1.GetOptions{})
if err != nil {
if !k8sutil.IsKubernetesResourceNotFoundError(err) {
log.Errorf("Error deleting ConfigMap %v; %v", s.defaultPSConfigMapName(), err)
s.contextLogger.Errorf("Error deleting ConfigMap %v; %v", s.defaultPSConfigMapName(), err)
failures = true
}
} else {
log.Infof("Delete ConfigMaps %v:%v", s.Job.job.ObjectMeta.Namespace, s.defaultPSConfigMapName())
s.contextLogger.Infof("Delete ConfigMaps %v:%v", s.Job.job.ObjectMeta.Namespace, s.defaultPSConfigMapName())
err = s.ClientSet.CoreV1().ConfigMaps(s.Job.job.ObjectMeta.Namespace).Delete(s.defaultPSConfigMapName(), &meta_v1.DeleteOptions{})
if err != nil {
log.Errorf("There was a problem deleting the ConfigMaps; %v", err)
s.contextLogger.Errorf("There was a problem deleting the ConfigMaps; %v", err)
failures = true
}
}
@@ -331,20 +349,10 @@ func replicaStatusFromPodList(l v1.PodList, name string) tfv1alpha1.ReplicaState

// GetSingleReplicaStatus returns status for a single replica
func (s *TFReplicaSet) GetSingleReplicaStatus(index int32) tfv1alpha1.ReplicaState {
p, err := s.ClientSet.CoreV1().Pods(s.Job.job.ObjectMeta.Namespace).Get(s.genName(index), meta_v1.GetOptions{})

if err != nil {
return tfv1alpha1.ReplicaStateUnknown
}

if v1.PodSucceeded == p.Status.Phase {
return tfv1alpha1.ReplicaStateSucceeded
}

labels := s.LabelsByIndex(index)
selector, err := labels.ToSelector()
if err != nil {
log.Errorf("labels.ToSelector() error; %v", err)
s.contextLogger.Errorf("labels.ToSelector() error; %v", err)
return tfv1alpha1.ReplicaStateFailed
}

@@ -434,14 +442,14 @@ func (s *TFReplicaSet) SyncPods() error {
}

if len(pl.Items) == 0 {
log.Infof("No pod found for job %s, creating a new one.", s.Job.name)
s.contextLogger.Infof("No pod found for job %s, creating a new one.", s.Job.name)
// Create the pod
createdPod, err := s.CreatePodWithIndex(index)

// If the pod already exists do nothing.
if err != nil {
if k8s_errors.IsAlreadyExists(err) {
log.Infof("Pod: %v already exists.", createdPod.ObjectMeta.Name)
s.contextLogger.Infof("Pod: %v already exists.", createdPod.ObjectMeta.Name)
continue
}
s.recorder.Eventf(s.Job.job, v1.EventTypeWarning, FailedCreateReason, "Error creating: %v", err)
@@ -466,14 +474,14 @@ func (s *TFReplicaSet) SyncServices() error {
for index := int32(0); index < *s.Spec.Replicas; index++ {
_, err := s.ClientSet.CoreV1().Services(s.Job.job.ObjectMeta.Namespace).Get(s.genName(index), meta_v1.GetOptions{})
if err != nil && k8s_errors.IsNotFound(err) {
log.Infof("Service: %v not found, create new one.", s.genName(index))
s.contextLogger.Infof("Service: %v not found, create new one.", s.genName(index))
// Create the service
createdService, err := s.CreateServiceWithIndex(index)

// If the service already exists do nothing.
if err != nil {
if k8s_errors.IsAlreadyExists(err) {
log.Infof("Service: %v already exists.", s.genName(index))
s.contextLogger.Infof("Service: %v already exists.", s.genName(index))
continue
}
s.recorder.Eventf(s.Job.job, v1.EventTypeWarning, FailedCreateReason, "Error creating: %v", err)