Skip to content

Commit

Permalink
fix: Fix the log message (kubeflow#1203)
Browse files Browse the repository at this point in the history
Signed-off-by: cegao <[email protected]>
  • Loading branch information
gaocegege authored Dec 22, 2020
1 parent 6df2d50 commit fb7b161
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 19 deletions.
35 changes: 22 additions & 13 deletions pkg/controller.v1/tensorflow/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,29 +105,34 @@ func (tc *TFController) UpdateJobStatus(job interface{}, replicas map[commonv1.R
running := status.Active
failed := status.Failed

logger.Infof("TFJob=%s, ReplicaType=%s expected=%d, running=%d, failed=%d",
tfJob.Name, rtype, expected, running, failed)
logger.Infof("TFJob=%s/%s, ReplicaType=%s expected=%d, running=%d, failed=%d",
tfJob.Namespace, tfJob.Name, rtype, expected, running, failed)

// If the TFJob contains Chief or Master spec, then we will update the status
// according to the Chief/Master spec.
if ContainChieforMasterSpec(tfJob.Spec.TFReplicaSpecs) {
if tfv1.IsChieforMaster(rtype) {
if running > 0 {
msg := fmt.Sprintf("TFJob %s is running.", tfJob.Name)
err := commonutil.UpdateJobConditions(jobStatus, commonv1.JobRunning, tfJobRunningReason, msg)
msg := fmt.Sprintf("TFJob %s/%s is running.",
tfJob.Namespace, tfJob.Name)
err := commonutil.UpdateJobConditions(jobStatus,
commonv1.JobRunning, tfJobRunningReason, msg)
if err != nil {
commonutil.LoggerForJob(tfJob).Infof("Append tfjob condition error: %v", err)
commonutil.LoggerForJob(tfJob).Infof(
"Append tfjob condition error: %v", err)
return err
}
}
if expected == 0 {
msg := fmt.Sprintf("TFJob %s successfully completed.", tfJob.Name)
msg := fmt.Sprintf("TFJob %s/%s successfully completed.",
tfJob.Namespace, tfJob.Name)
tc.Recorder.Event(tfJob, corev1.EventTypeNormal, tfJobSucceededReason, msg)
if jobStatus.CompletionTime == nil {
now := metav1.Now()
jobStatus.CompletionTime = &now
}
err := commonutil.UpdateJobConditions(jobStatus, commonv1.JobSucceeded, tfJobSucceededReason, msg)
err := commonutil.UpdateJobConditions(jobStatus,
commonv1.JobSucceeded, tfJobSucceededReason, msg)
if err != nil {
commonutil.LoggerForJob(tfJob).Infof("Append tfjob condition error: %v", err)
return err
Expand All @@ -141,21 +146,24 @@ func (tc *TFController) UpdateJobStatus(job interface{}, replicas map[commonv1.R
// 1. If default success policy is used and worker 0 has completed.
// 2. If `SuccessPolicyAllWorkers` success policy is used and all workers are succeeded.
if expected == 0 || (worker0Completed && *tfJob.Spec.SuccessPolicy != tfv1.SuccessPolicyAllWorkers) {
msg := fmt.Sprintf("TFJob %s successfully completed.", tfJob.Name)
msg := fmt.Sprintf("TFJob %s/%s successfully completed.",
tfJob.Namespace, tfJob.Name)
tc.Recorder.Event(tfJob, corev1.EventTypeNormal, tfJobSucceededReason, msg)
if jobStatus.CompletionTime == nil {
now := metav1.Now()
jobStatus.CompletionTime = &now
}
err := commonutil.UpdateJobConditions(jobStatus, commonv1.JobSucceeded, tfJobSucceededReason, msg)
err := commonutil.UpdateJobConditions(jobStatus,
commonv1.JobSucceeded, tfJobSucceededReason, msg)
if err != nil {
commonutil.LoggerForJob(tfJob).Infof("Append tfjob condition error: %v", err)
return err
}
tfJobsSuccessCount.Inc()
} else if running > 0 {
// Some workers are still running, leave a running condition.
msg := fmt.Sprintf("TFJob %s is running.", tfJob.Name)
msg := fmt.Sprintf("TFJob %s/%s successfully completed.",
tfJob.Namespace, tfJob.Name)
err := commonutil.UpdateJobConditions(jobStatus, commonv1.JobRunning, tfJobRunningReason, msg)
if err != nil {
commonutil.LoggerForJob(tfJob).Infof("Append tfjob condition error: %v", err)
Expand All @@ -178,14 +186,15 @@ func (tc *TFController) UpdateJobStatus(job interface{}, replicas map[commonv1.R
// we know it because we update the status condition when reconciling the replicas
tfJobsFailureCount.Inc()
} else {
msg := fmt.Sprintf("TFJob %s has failed because %d %s replica(s) failed.",
tfJob.Name, failed, rtype)
msg := fmt.Sprintf("TFJob %s/%s has failed because %d %s replica(s) failed.",
tfJob.Namespace, tfJob.Name, failed, rtype)
tc.Recorder.Event(tfJob, corev1.EventTypeNormal, tfJobFailedReason, msg)
if tfJob.Status.CompletionTime == nil {
now := metav1.Now()
tfJob.Status.CompletionTime = &now
}
err := commonutil.UpdateJobConditions(jobStatus, commonv1.JobFailed, tfJobFailedReason, msg)
err := commonutil.UpdateJobConditions(jobStatus,
commonv1.JobFailed, tfJobFailedReason, msg)
if err != nil {
commonutil.LoggerForJob(tfJob).Infof("Append tfjob condition error: %v", err)
return err
Expand Down
8 changes: 4 additions & 4 deletions py/kubeflow/tf_operator/tf_job_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ def delete_tf_job(client, namespace, name, version="v1"):
# owned references are deleted.
"propagationPolicy": "Foreground",
}
logging.info("Deleting job %s.%s", namespace, name)
logging.info("Deleting job %s/%s", namespace, name)
thread = crd_api.delete_namespaced_custom_object(
TF_JOB_GROUP,
version,
Expand All @@ -75,7 +75,7 @@ def delete_tf_job(client, namespace, name, version="v1"):
body,
async_req=True)
api_response = thread.get(TIMEOUT)
logging.info("Deleting job %s.%s returned: %s", namespace, name,
logging.info("Deleting job %s/%s returned: %s", namespace, name,
api_response)
return api_response
except rest.ApiException as e:
Expand Down Expand Up @@ -150,8 +150,8 @@ def wait_for_condition(client,
except multiprocessing.TimeoutError:
logging.error("Timeout trying to get TFJob.")
except Exception as e:
logging.error("There was a problem waiting for Job %s.%s; Exception; %s",
name, name, e)
logging.error("There was a problem waiting for Job %s/%s; Exception; %s",
namespace, name, e)
raise

if results:
Expand Down
5 changes: 3 additions & 2 deletions vendor/github.com/kubeflow/common/pkg/util/logger.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit fb7b161

Please sign in to comment.