Skip to content

Commit

Permalink
optimize code (#890)
Browse files Browse the repository at this point in the history
  • Loading branch information
Jiajin Zheng authored and k8s-ci-robot committed Dec 17, 2018
1 parent 4754c01 commit 31e7169
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 25 deletions.
38 changes: 20 additions & 18 deletions pkg/controller.v2/tensorflow/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,16 +177,10 @@ func (tc *TFController) Run(threadiness int, stopCh <-chan struct{}) error {

// Wait for the caches to be synced before starting workers.
log.Info("Waiting for informer caches to sync")
if ok := cache.WaitForCacheSync(stopCh, tc.tfJobInformerSynced); !ok {
return fmt.Errorf("failed to wait for tfjob caches to sync")
}

if ok := cache.WaitForCacheSync(stopCh, tc.PodInformerSynced); !ok {
return fmt.Errorf("failed to wait for pod caches to sync")
}

if ok := cache.WaitForCacheSync(stopCh, tc.ServiceInformerSynced); !ok {
return fmt.Errorf("failed to wait for service caches to sync")
if ok := cache.WaitForCacheSync(stopCh, tc.tfJobInformerSynced,
tc.PodInformerSynced,
tc.ServiceInformerSynced); !ok {
return fmt.Errorf("failed to wait for caches to sync")
}

log.Infof("Starting %v workers", threadiness)
Expand All @@ -213,15 +207,26 @@ func (tc *TFController) runWorker() {
// processNextWorkItem will read a single work item off the workqueue and
// attempt to process it, by calling the syncHandler.
func (tc *TFController) processNextWorkItem() bool {
key, quit := tc.WorkQueue.Get()
obj, quit := tc.WorkQueue.Get()
if quit {
return false
}
defer tc.WorkQueue.Done(key)
defer tc.WorkQueue.Done(obj)

var key string
var ok bool
if key, ok = obj.(string); !ok {
// As the item in the workqueue is actually invalid, we call
// Forget here else we'd go into a loop of attempting to
// process a work item that is invalid.
tc.WorkQueue.Forget(obj)
utilruntime.HandleError(fmt.Errorf("expected string in workqueue but got %#v", obj))
return true
}

logger := tflogger.LoggerForKey(key.(string))
logger := tflogger.LoggerForKey(key)

tfJob, err := tc.getTFJobFromKey(key.(string))
tfJob, err := tc.getTFJobFromKey(key)
if err != nil {
if err == errNotExists {
logger.Infof("TFJob has been deleted: %v", key)
Expand All @@ -240,7 +245,7 @@ func (tc *TFController) processNextWorkItem() bool {
}

// Sync TFJob to match the actual state to this desired state.
forget, err := tc.syncHandler(key.(string))
forget, err := tc.syncHandler(key)
if err == nil {
if forget {
tc.WorkQueue.Forget(key)
Expand Down Expand Up @@ -279,9 +284,6 @@ func (tc *TFController) syncTFJob(key string) (bool, error) {
if err != nil {
return false, err
}
if len(namespace) == 0 || len(name) == 0 {
return false, fmt.Errorf("invalid tfjob key %q: either namespace or name is missing", key)
}

sharedTFJob, err := tc.getTFJobFromName(namespace, name)
if err != nil {
Expand Down
8 changes: 2 additions & 6 deletions pkg/controller.v2/tensorflow/informer.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,9 +65,9 @@ func (tc *TFController) getTFJobFromName(namespace, name string) (*tfv1alpha2.TF
}

func (tc *TFController) getTFJobFromKey(key string) (*tfv1alpha2.TFJob, error) {
logger := tflogger.LoggerForKey(key)
// Check if the key exists.
obj, exists, err := tc.tfJobInformer.GetIndexer().GetByKey(key)
logger := tflogger.LoggerForKey(key)
if err != nil {
logger.Errorf("Failed to get TFJob '%s' from informer index: %+v", key, err)
return nil, errGetFromKey
Expand All @@ -77,11 +77,7 @@ func (tc *TFController) getTFJobFromKey(key string) (*tfv1alpha2.TFJob, error) {
return nil, errNotExists
}

tfjob, err := tfJobFromUnstructured(obj)
if err != nil {
return nil, err
}
return tfjob, nil
return tfJobFromUnstructured(obj)
}

func tfJobFromUnstructured(obj interface{}) (*tfv1alpha2.TFJob, error) {
Expand Down
2 changes: 1 addition & 1 deletion pkg/controller.v2/tensorflow/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import (
)

const (
failedMarshalTFJobReason = "FailedInvalidTFJobSpec"
failedMarshalTFJobReason = "InvalidTFJobSpec"
)

// When a pod is added, set the defaults and enqueue the current tfjob.
Expand Down

0 comments on commit 31e7169

Please sign in to comment.