Fix bug with jobs not being marked as completed. (kubeflow#501)
* Fix bug with jobs not being marked as completed.

* A bug in getting the replica status was introduced by kubeflow#344, which
  switched to creating pods directly.

* Our presubmits/postsubmits were failing but this went unnoticed because
the git status check was improperly reported as succeeded.

* The bug is that we try to get the pod status by name, but the name we look
  up doesn't include the random salt that is part of the actual pod name.

* The code in question is a legacy of when we were using job controllers and
  first fetched the status of the job controller. We incorrectly changed that
  code to get the pod by name. The correct thing is to just list pods by label;
  the code below already does that, so we just need to delete the broken lookup.
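  A sketch of the surviving lookup (not verbatim from the diff; `clientSet`,
  `namespace`, `selector`, and `containerName` are assumed to be in scope, and
  `replicaStatusFromPodList` already exists in replicas.go):

```go
// List pods by label selector instead of Get-by-name; generated pod names
// carry a random salt, so a name-based Get can never find them.
options := meta_v1.ListOptions{LabelSelector: selector}
pl, err := clientSet.CoreV1().Pods(namespace).List(options)
if err != nil {
	return tfv1alpha1.ReplicaStateUnknown
}
// Derive the replica state from whichever pods actually match the labels.
return replicaStatusFromPodList(*pl, containerName)
```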

* Don't create any resources if the DeletionTimestamp is set.
  Creating resources at this point would end up blocking deletion of the object
  because the controller would create resources while we are trying to delete
  them.
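  A minimal sketch of the guard (the receiver and field names here are
  assumptions):

```go
// If the object is being deleted, don't create pods/services for it; doing so
// would race with the deletion and keep the object alive.
if j.job.ObjectMeta.DeletionTimestamp != nil {
	j.contextLogger.Info("DeletionTimestamp is set; not creating resources")
	return nil
}
```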

* Use logrus in controller.go, trainer.go, and replicas.go to log
  with fields providing information about the job and replica.
  This makes it easy to filter logs for a particular job.

* Use logrus to log the name of the job in a field.
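  The pattern, as a small self-contained example (the values are illustrative):

```go
package main

import (
	log "github.com/sirupsen/logrus"
)

func main() {
	// In the operator these values come from the TFJob object.
	namespace, name, runtimeID := "default", "my-tfjob", "abcd"

	// Build a logger that carries job-identifying fields so every message for
	// one job can be filtered on, e.g. job="default/my-tfjob".
	contextLogger := log.WithFields(log.Fields{
		"job":        namespace + "/" + name,
		"runtime_id": runtimeID,
	})
	contextLogger.Info("Creating pod: my-tfjob-worker-0")
}
```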

* Checking the deletion timestamp doesn't appear to be sufficient.

Use the Phase to determine whether we should create resources.
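  Sketch of the phase-based gate (the phase constant names are assumptions; the
  point is to stop creating resources once the job is winding down):

```go
// Only create pods/services while the job still needs them; once the job has
// reached a cleanup/terminal phase, creating resources would fight deletion.
phase := j.job.Status.Phase
if phase == tfv1alpha1.TFJobPhaseCleanUp || phase == tfv1alpha1.TFJobPhaseDone {
	return nil
}
```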

* Run gofmt.

* Reset the rate limiter after every successful sync.
  Otherwise the rate limiter ends up delaying the processing of subsequent
  events, which isn't what we want.
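  This is the shape of the worker loop after the change (see the controller.go
  diff below):

```go
// On success, Forget resets the per-item rate limiter so the next event for
// this key is processed without added delay; on error, AddRateLimited
// requeues the key with exponential backoff.
_, err := c.syncHandler(key.(string))
if err == nil {
	c.WorkQueue.Forget(key)
	return true
}
utilruntime.HandleError(fmt.Errorf("Error syncing job: %v", err))
c.WorkQueue.AddRateLimited(key)
return true
```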

* Run goimports to fix lint issues.

* Reconcile needs to update the TFJob stored in TrainingJob. This ensures
  TrainingJob has an up-to-date representation of the job.

* Otherwise changes made to the spec won't be available to TrainingJob. For
  example, if the job is deleted by the user, the deletion timestamp will
  be set. But if we don't update the TFJob stored in TrainingJob, this
  change won't be propagated.
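  A minimal sketch of what TrainingJob.Update does (the field names are
  assumptions, consistent with the rest of the package):

```go
// Swap in the latest TFJob from the API server so later reconcile steps see
// the current spec/status (e.g. a DeletionTimestamp set when the user deletes
// the job). Log the dereferenced value, not the pointer.
func (j *TrainingJob) Update(newJob *tfv1alpha1.TFJob) {
	j.contextLogger.Infof("Updating job to: %v", util.Pformat(*newJob))
	j.job = newJob
}
```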

* TrainingJob.Update should log the value of the job, not the pointer.

* Add more comments to the code.
jlewi authored and k8s-ci-robot committed Mar 26, 2018
1 parent aad2178 commit e31018a
Showing 6 changed files with 154 additions and 69 deletions.
61 changes: 54 additions & 7 deletions pkg/controller/controller.go
@@ -34,6 +34,7 @@ import (
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/workqueue"

"github.com/juju/ratelimit"
tfv1alpha1 "github.com/kubeflow/tf-operator/pkg/apis/tensorflow/v1alpha1"
tfjobclient "github.com/kubeflow/tf-operator/pkg/client/clientset/versioned"
kubeflowscheme "github.com/kubeflow/tf-operator/pkg/client/clientset/versioned/scheme"
@@ -76,6 +77,21 @@ type Controller struct {
// means we can ensure we only process a fixed amount of resources at a
// time, and makes it easy to ensure we are never processing the same item
// simultaneously in two different workers.
//
// Items in the work queue correspond to the name of the job.
// In response to various events (e.g. Add, Update, Delete), the informer
// is configured to add events to the queue. Since the item in the queue
// represents a job and not a particular event, we end up aggregating events for
// a job and ensure that a particular job isn't being processed by multiple
// workers simultaneously.
//
// We rely on the informer to periodically generate Update events. This ensures
// we regularly check on each TFJob and take any action needed.
//
// If there is a problem processing a job, processNextWorkItem just requeues
// the work item. This ensures that we end up retrying it. In this case
// we rely on the rateLimiter in the worker queue to retry with exponential
// backoff.
WorkQueue workqueue.RateLimitingInterface

// recorder is an event recorder for recording Event resources to the
@@ -100,10 +116,19 @@ func New(kubeClient kubernetes.Interface, tfJobClient tfjobclient.Interface,
eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: kubeClient.CoreV1().Events("")})
recorder := eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: controllerName})

// Use a rate limiter with both overall and per-item rate limiting.
// The overall limiter is a token bucket; the per-item limiter uses
// exponential backoff.
rateLimiter := workqueue.NewMaxOfRateLimiter(
workqueue.NewItemExponentialFailureRateLimiter(5*time.Millisecond, 1000*time.Second),
// 10 qps, 100 bucket size. This only affects retry speed and it's only the overall factor (not per item).
&workqueue.BucketRateLimiter{Bucket: ratelimit.NewBucketWithRate(float64(10), int64(100))},
)

controller := &Controller{
KubeClient: kubeClient,
TFJobClient: tfJobClient,
WorkQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "TFjobs"),
WorkQueue: workqueue.NewNamedRateLimitingQueue(rateLimiter, "TFjobs"),
recorder: recorder,
// TODO(jlewi): What to do about cluster.Cluster?
jobs: make(map[string]*trainer.TrainingJob),
@@ -185,16 +210,23 @@ func (c *Controller) processNextWorkItem() bool {
if quit {
return false
}

defer c.WorkQueue.Done(key)

forget, err := c.syncHandler(key.(string))
_, err := c.syncHandler(key.(string))
if err == nil {
if forget {
c.WorkQueue.Forget(key)
}
// Calling Forget resets the rate limiter for this item.
// Since the sync was processed successfully we want to reset the rate limiter
// so that future events for this key can be processed immediately.
log.WithFields(log.Fields{
"job": key,
}).Infof("WorkQueue forgetting key %v", key)
c.WorkQueue.Forget(key)
return true
}

// There was an error processing the key so to retry we requeue it.
// The WorkQueue uses a rate limiter to control when the key gets retried.
utilruntime.HandleError(fmt.Errorf("Error syncing job: %v", err))
c.WorkQueue.AddRateLimited(key)

@@ -209,7 +241,9 @@ func (c *Controller) processNextWorkItem() bool {
func (c *Controller) syncTFJob(key string) (bool, error) {
startTime := time.Now()
defer func() {
log.Debugf("Finished syncing job %q (%v)", key, time.Since(startTime))
log.WithFields(log.Fields{
"job": key,
}).Infof("Finished syncing job %q (%v)", key, time.Since(startTime))
}()

ns, name, err := cache.SplitMetaNamespaceKey(key)
@@ -224,7 +258,9 @@ func (c *Controller) syncTFJob(key string) (bool, error) {

if err != nil {
if apierrors.IsNotFound(err) {
log.Debugf("Job has been deleted: %v", key)
log.WithFields(log.Fields{
"job": key,
}).Infof("Job has been deleted: %v", key)
return true, nil
}
return false, err
@@ -233,12 +269,22 @@ func (c *Controller) syncTFJob(key string) (bool, error) {
// Create a new TrainingJob if there is no TrainingJob stored for it in the jobs map or if the UID's don't match.
// The UID's won't match in the event we deleted the job and then recreated the job with the same name.
if cJob, ok := c.jobs[key]; !ok || cJob.UID() != tfJob.UID {
log.WithFields(log.Fields{
"job": key,
}).Infof("Creating new job %v", key)
nc, err := trainer.NewJob(c.KubeClient, c.TFJobClient, c.recorder, tfJob, &c.config)

if err != nil {
log.WithFields(log.Fields{
"job": key,
}).Errorf("There was a problem creating NewJob %v; Error: %v", key, err)
return false, err
}
c.jobs[key] = nc
} else {
// Replace the TFJob stored inside TrainingJob with the latest job.
// We need to do this to pull in the latest changes to the spec/status.
c.jobs[key].Update(tfJob)
}

nc := c.jobs[key]
@@ -247,6 +293,7 @@ func (c *Controller) syncTFJob(key string) (bool, error) {
return false, err
}

// TODO(jlewi): Why do we issue a get request again here?
tfJob, err = c.TFJobClient.KubeflowV1alpha1().TFJobs(tfJob.ObjectMeta.Namespace).Get(tfJob.ObjectMeta.Name, metav1.GetOptions{})

if err != nil {
66 changes: 37 additions & 29 deletions pkg/trainer/replicas.go
@@ -20,7 +20,7 @@ import (
"fmt"
"strings"

log "github.com/golang/glog"
log "github.com/sirupsen/logrus"
"k8s.io/api/core/v1"
k8s_errors "k8s.io/apimachinery/pkg/api/errors"
meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -38,6 +38,7 @@ import (
const (
SuccessfulCreateReason = "SuccessfulCreate"
FailedCreateReason = "FailedCreate"
indexField = "replica"
)

// TFReplicaSet is a set of TF processes all acting as the same role (e.g. worker
Expand All @@ -47,6 +48,9 @@ type TFReplicaSet struct {
// Job is a pointer to the TrainingJob to which this replica belongs.
Job *TrainingJob
Spec tfv1alpha1.TFReplicaSpec

// contextLogger is a logger to use for logging information about this replica.
contextLogger *log.Entry
}

// TFReplicaSetInterface is an interface for managing a set of replicas.
@@ -102,6 +106,14 @@ func NewTFReplicaSet(clientSet kubernetes.Interface, recorder record.EventRecord
recorder: recorder,
Job: job,
Spec: tfReplicaSpec,
contextLogger: log.WithFields(log.Fields{
"job_type": string(tfReplicaSpec.TFReplicaType),
"runtime_id": job.job.Spec.RuntimeId,
"tf_job_name": job.job.ObjectMeta.Name,
// We use job to match the key used in controller.go
// In controller.go we log the key used with the workqueue.
"job": job.job.ObjectMeta.Namespace + "/" + job.job.ObjectMeta.Name,
}),
}, nil
}

@@ -150,7 +162,9 @@ func (s *TFReplicaSet) CreateServiceWithIndex(index int32) (*v1.Service, error)
},
}

log.Infof("Creating service: %v", service.ObjectMeta.Name)
s.contextLogger.WithFields(log.Fields{
indexField: index,
}).Infof("Creating service: %v", service.ObjectMeta.Name)
return s.ClientSet.CoreV1().Services(s.Job.job.ObjectMeta.Namespace).Create(service)
}

@@ -184,7 +198,7 @@ func (s *TFReplicaSet) CreatePodWithIndex(index int32) (*v1.Pod, error) {

tfConfigJson, err := json.Marshal(tfConfig)
if err != nil {
log.Errorf("Job: %v serializing tfConfig: %v return error; %v", s.Job.job.ObjectMeta.Name, util.Pformat(tfConfig), err)
s.contextLogger.Errorf("Job: %v serializing tfConfig: %v return error; %v", s.Job.job.ObjectMeta.Name, util.Pformat(tfConfig), err)
return nil, err
}

@@ -205,7 +219,9 @@ func (s *TFReplicaSet) CreatePodWithIndex(index int32) (*v1.Pod, error) {
})
}

log.Infof("Creating pod: %v", pod.ObjectMeta.Name)
s.contextLogger.WithFields(log.Fields{
indexField: index,
}).Infof("Creating pod: %v", pod.ObjectMeta.Name)
return s.ClientSet.CoreV1().Pods(s.Job.job.ObjectMeta.Namespace).Create(pod)
}

@@ -222,48 +238,50 @@ func (s *TFReplicaSet) Delete() error {
LabelSelector: selector,
}

log.V(1).Infof("Deleting Jobs namespace=%v selector=%v", s.Job.job.ObjectMeta.Namespace, selector)
s.contextLogger.Infof("Deleting Jobs namespace=%v selector=%v", s.Job.job.ObjectMeta.Namespace, selector)
err = s.ClientSet.CoreV1().Pods(s.Job.job.ObjectMeta.Namespace).DeleteCollection(&meta_v1.DeleteOptions{}, options)

if err != nil {
log.Errorf("There was a problem deleting the jobs; %v", err)
s.contextLogger.Errorf("There was a problem deleting the jobs; %v", err)
failures = true
}

// We need to delete the completed pods.
log.Infof("Deleting Pods namespace=%v selector=%v", s.Job.job.ObjectMeta.Namespace, selector)
s.contextLogger.Infof("Deleting Pods namespace=%v selector=%v", s.Job.job.ObjectMeta.Namespace, selector)
err = s.ClientSet.CoreV1().Pods(s.Job.job.ObjectMeta.Namespace).DeleteCollection(&meta_v1.DeleteOptions{}, options)

if err != nil {
log.Errorf("There was a problem deleting the pods; %v", err)
s.contextLogger.Errorf("There was a problem deleting the pods; %v", err)
failures = true
}

// Services doesn't support DeleteCollection so we delete them individually.
// TODO(jlewi): We should check if this has changed with K8s 1.8 or other releases.
for index := int32(0); index < *s.Spec.Replicas; index++ {
log.V(1).Infof("Deleting Service %v:%v", s.Job.job.ObjectMeta.Namespace, s.genName((index)))
s.contextLogger.WithFields(log.Fields{
indexField: index,
}).Infof("Deleting Service %v:%v", s.Job.job.ObjectMeta.Namespace, s.genName((index)))
err = s.ClientSet.CoreV1().Services(s.Job.job.ObjectMeta.Namespace).Delete(s.genName(index), &meta_v1.DeleteOptions{})

if err != nil {
log.Errorf("Error deleting service %v; %v", s.genName(index), err)
s.contextLogger.Errorf("Error deleting service %v; %v", s.genName(index), err)
failures = true
}
}

// If the ConfigMap for the default parameter server exists, we delete it
log.Infof("Get ConfigMaps %v:%v", s.Job.job.ObjectMeta.Namespace, s.defaultPSConfigMapName())
s.contextLogger.Infof("Get ConfigMaps %v:%v", s.Job.job.ObjectMeta.Namespace, s.defaultPSConfigMapName())
_, err = s.ClientSet.CoreV1().ConfigMaps(s.Job.job.ObjectMeta.Namespace).Get(s.defaultPSConfigMapName(), meta_v1.GetOptions{})
if err != nil {
if !k8sutil.IsKubernetesResourceNotFoundError(err) {
log.Errorf("Error deleting ConfigMap %v; %v", s.defaultPSConfigMapName(), err)
s.contextLogger.Errorf("Error deleting ConfigMap %v; %v", s.defaultPSConfigMapName(), err)
failures = true
}
} else {
log.Infof("Delete ConfigMaps %v:%v", s.Job.job.ObjectMeta.Namespace, s.defaultPSConfigMapName())
s.contextLogger.Infof("Delete ConfigMaps %v:%v", s.Job.job.ObjectMeta.Namespace, s.defaultPSConfigMapName())
err = s.ClientSet.CoreV1().ConfigMaps(s.Job.job.ObjectMeta.Namespace).Delete(s.defaultPSConfigMapName(), &meta_v1.DeleteOptions{})
if err != nil {
log.Errorf("There was a problem deleting the ConfigMaps; %v", err)
s.contextLogger.Errorf("There was a problem deleting the ConfigMaps; %v", err)
failures = true
}
}
@@ -331,20 +349,10 @@ func replicaStatusFromPodList(l v1.PodList, name string) tfv1alpha1.ReplicaState

// GetSingleReplicaStatus returns status for a single replica
func (s *TFReplicaSet) GetSingleReplicaStatus(index int32) tfv1alpha1.ReplicaState {
p, err := s.ClientSet.CoreV1().Pods(s.Job.job.ObjectMeta.Namespace).Get(s.genName(index), meta_v1.GetOptions{})

if err != nil {
return tfv1alpha1.ReplicaStateUnknown
}

if v1.PodSucceeded == p.Status.Phase {
return tfv1alpha1.ReplicaStateSucceeded
}

labels := s.LabelsByIndex(index)
selector, err := labels.ToSelector()
if err != nil {
log.Errorf("labels.ToSelector() error; %v", err)
s.contextLogger.Errorf("labels.ToSelector() error; %v", err)
return tfv1alpha1.ReplicaStateFailed
}

@@ -434,14 +442,14 @@ func (s *TFReplicaSet) SyncPods() error {
}

if len(pl.Items) == 0 {
log.Infof("No pod found for job %s, creating a new one.", s.Job.name)
s.contextLogger.Infof("Job %s missing pod for replica %s index %s, creating a new one.", s.Job.name(), string(s.Spec.TFReplicaType), index)
// Create the pod
createdPod, err := s.CreatePodWithIndex(index)

// If the pod already exists do nothing.
if err != nil {
if k8s_errors.IsAlreadyExists(err) {
log.Infof("Pod: %v already exists.", createdPod.ObjectMeta.Name)
s.contextLogger.Infof("Pod: %v already exists.", createdPod.ObjectMeta.Name)
continue
}
s.recorder.Eventf(s.Job.job, v1.EventTypeWarning, FailedCreateReason, "Error creating: %v", err)
@@ -466,14 +474,14 @@ func (s *TFReplicaSet) SyncServices() error {
for index := int32(0); index < *s.Spec.Replicas; index++ {
_, err := s.ClientSet.CoreV1().Services(s.Job.job.ObjectMeta.Namespace).Get(s.genName(index), meta_v1.GetOptions{})
if err != nil && k8s_errors.IsNotFound(err) {
log.Infof("Service: %v not found, create new one.", s.genName(index))
s.contextLogger.Infof("Service: %v not found, create new one.", s.genName(index))
// Create the service
createdService, err := s.CreateServiceWithIndex(index)

// If the service already exists do nothing.
if err != nil {
if k8s_errors.IsAlreadyExists(err) {
log.Infof("Service: %v already exists.", s.genName(index))
s.contextLogger.Infof("Service: %v already exists.", s.genName(index))
continue
}
s.recorder.Eventf(s.Job.job, v1.EventTypeWarning, FailedCreateReason, "Error creating: %v", err)