Skip to content

Commit

Permalink
sync up worker status all the time (kubeflow#299)
Browse files Browse the repository at this point in the history
  • Loading branch information
hougangliu authored and k8s-ci-robot committed Dec 20, 2018
1 parent bca0b58 commit b11b81d
Showing 1 changed file with 16 additions and 33 deletions.
49 changes: 16 additions & 33 deletions pkg/controller/studyjob/studyjob_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -348,57 +348,47 @@ func (r *ReconcileStudyJobController) updateWorker(c katibapi.ManagerClient, ins
if ctime != nil && cjob.Status.LastScheduleTime != nil {
if ctime.Before(cjob.Status.LastScheduleTime) && len(cjob.Status.Active) == 0 {
saveModel(c, instance.Status.StudyID, instance.Status.Trials[i].TrialID, wid)
instance.Status.Trials[i].WorkerList[j].Condition = katibv1alpha1.ConditionCompleted
instance.Status.Trials[i].WorkerList[j].CompletionTime = metav1.Now()
update = true
_, err := c.UpdateWorkerState(
context.Background(),
&katibapi.UpdateWorkerStateRequest{
WorkerId: instance.Status.Trials[i].WorkerList[j].WorkerID,
Status: katibapi.State_COMPLETED,
})
if err != nil {
log.Printf("Fail to update worker info. ID %s", instance.Status.Trials[i].WorkerList[j].WorkerID)
return false, err
}
susp := true
cjob.Spec.Suspend = &susp
if err := r.Update(context.TODO(), cjob); err != nil {
return false, err
}

cwids = append(cwids, wid)
}
}
} else {
// for some reason, metricsCollector for this worker cannot be found (deleted by anyone accidentally or even failed to be created)
update = true
instance.Status.Condition = katibv1alpha1.ConditionFailed
}
if update {
instance.Status.Trials[i].WorkerList[j].Condition = katibv1alpha1.ConditionCompleted
instance.Status.Trials[i].WorkerList[j].CompletionTime = metav1.Now()
cwids = append(cwids, wid)
}
case katibapi.State_RUNNING:
if instance.Status.Trials[i].WorkerList[j].Condition != katibv1alpha1.ConditionRunning {
instance.Status.Trials[i].WorkerList[j].Condition = katibv1alpha1.ConditionRunning
update = true
}
if errors.IsNotFound(cjoberr) {
r.spawnMetricsCollector(instance, c, instance.Status.StudyID, instance.Status.Trials[i].TrialID, wid, ns, instance.Spec.MetricsCollectorSpec)
}
_, err := c.UpdateWorkerState(
context.Background(),
&katibapi.UpdateWorkerStateRequest{
WorkerId: instance.Status.Trials[i].WorkerList[j].WorkerID,
Status: katibapi.State_RUNNING,
})
if err != nil {
log.Printf("Fail to update worker info. ID %s", instance.Status.Trials[i].WorkerList[j].WorkerID)
return false, err
spawnErr := r.spawnMetricsCollector(instance, c, instance.Status.StudyID, instance.Status.Trials[i].TrialID, wid, ns, instance.Spec.MetricsCollectorSpec)
if spawnErr != nil {
instance.Status.Condition = katibv1alpha1.ConditionFailed
}
}
case katibapi.State_ERROR:
if instance.Status.Trials[i].WorkerList[j].Condition != katibv1alpha1.ConditionFailed {
instance.Status.Trials[i].WorkerList[j].Condition = katibv1alpha1.ConditionFailed
update = true
}
}
if update {
_, err := c.UpdateWorkerState(
context.Background(),
&katibapi.UpdateWorkerStateRequest{
WorkerId: instance.Status.Trials[i].WorkerList[j].WorkerID,
Status: katibapi.State_ERROR,
Status: status.WorkerState,
})
if err != nil {
log.Printf("Fail to update worker info. ID %s", instance.Status.Trials[i].WorkerList[j].WorkerID)
Expand Down Expand Up @@ -613,17 +603,14 @@ func (r *ReconcileStudyJobController) spawnWorker(instance *katibv1alpha1.StudyJ
BUFSIZE := 1024
job := createWorkerJobObj(wkind)
if err := k8syaml.NewYAMLOrJSONDecoder(wm, BUFSIZE).Decode(job); err != nil {
instance.Status.Condition = katibv1alpha1.ConditionFailed
log.Printf("Yaml decode error %v", err)
return "", err
}
if err := controllerutil.SetControllerReference(instance, job.(metav1.Object), r.scheme); err != nil {
instance.Status.Condition = katibv1alpha1.ConditionFailed
log.Printf("SetControllerReference error %v", err)
return "", err
}
if err := r.Create(context.TODO(), job); err != nil {
instance.Status.Condition = katibv1alpha1.ConditionFailed
log.Printf("Job Create error %v", err)
return "", err
}
Expand All @@ -637,7 +624,6 @@ func (r *ReconcileStudyJobController) spawnMetricsCollector(instance *katibv1alp
wkind, err := getWorkerKind(instance.Spec.WorkerSpec)
if err != nil {
log.Printf("getWorkerKind error %v", err)
instance.Status.Condition = katibv1alpha1.ConditionFailed
return err
}
mcm, err := getMetricsCollectorManifest(studyID, trialID, workerID, wkind, namespace, mcs)
Expand All @@ -647,19 +633,16 @@ func (r *ReconcileStudyJobController) spawnMetricsCollector(instance *katibv1alp
}

if err := k8syaml.NewYAMLOrJSONDecoder(mcm, BUFSIZE).Decode(&mcjob); err != nil {
instance.Status.Condition = katibv1alpha1.ConditionFailed
log.Printf("MetricsCollector Yaml decode error %v", err)
return err
}

if err := controllerutil.SetControllerReference(instance, &mcjob, r.scheme); err != nil {
instance.Status.Condition = katibv1alpha1.ConditionFailed
log.Printf("MetricsCollector SetControllerReference error %v", err)
return err
}

if err := r.Create(context.TODO(), &mcjob); err != nil {
instance.Status.Condition = katibv1alpha1.ConditionFailed
log.Printf("MetricsCollector Job Create error %v", err)
return err
}
Expand Down

0 comments on commit b11b81d

Please sign in to comment.