From 4bb47cb713cc24d5d6a9dfa05184bd72a104ca40 Mon Sep 17 00:00:00 2001 From: cegao Date: Tue, 22 Dec 2020 19:56:42 +0800 Subject: [PATCH] fix: Fix the log message Signed-off-by: cegao --- pkg/controller.v1/tensorflow/status.go | 35 ++++++++++++------- py/kubeflow/tf_operator/tf_job_client.py | 8 ++--- .../kubeflow/common/pkg/util/logger.go | 5 +-- 3 files changed, 29 insertions(+), 19 deletions(-) diff --git a/pkg/controller.v1/tensorflow/status.go b/pkg/controller.v1/tensorflow/status.go index 4706fdf876..5b6bf716d1 100644 --- a/pkg/controller.v1/tensorflow/status.go +++ b/pkg/controller.v1/tensorflow/status.go @@ -105,29 +105,34 @@ func (tc *TFController) UpdateJobStatus(job interface{}, replicas map[commonv1.R running := status.Active failed := status.Failed - logger.Infof("TFJob=%s, ReplicaType=%s expected=%d, running=%d, failed=%d", - tfJob.Name, rtype, expected, running, failed) + logger.Infof("TFJob=%s/%s, ReplicaType=%s expected=%d, running=%d, failed=%d", + tfJob.Namespace, tfJob.Name, rtype, expected, running, failed) // If the TFJob contains Chief or Master spec, then we will update the status // according to the Chief/Master spec. if ContainChieforMasterSpec(tfJob.Spec.TFReplicaSpecs) { if tfv1.IsChieforMaster(rtype) { if running > 0 { - msg := fmt.Sprintf("TFJob %s is running.", tfJob.Name) - err := commonutil.UpdateJobConditions(jobStatus, commonv1.JobRunning, tfJobRunningReason, msg) + msg := fmt.Sprintf("TFJob %s/%s is running.", + tfJob.Namespace, tfJob.Name) + err := commonutil.UpdateJobConditions(jobStatus, + commonv1.JobRunning, tfJobRunningReason, msg) if err != nil { - commonutil.LoggerForJob(tfJob).Infof("Append tfjob condition error: %v", err) + commonutil.LoggerForJob(tfJob).Infof( + "Append tfjob condition error: %v", err) return err } } if expected == 0 { - msg := fmt.Sprintf("TFJob %s successfully completed.", tfJob.Name) + msg := fmt.Sprintf("TFJob %s/%s successfully completed.", + tfJob.Namespace, tfJob.Name) tc.Recorder.Event(tfJob, corev1.EventTypeNormal, tfJobSucceededReason, msg) if jobStatus.CompletionTime == nil { now := metav1.Now() jobStatus.CompletionTime = &now } - err := commonutil.UpdateJobConditions(jobStatus, commonv1.JobSucceeded, tfJobSucceededReason, msg) + err := commonutil.UpdateJobConditions(jobStatus, + commonv1.JobSucceeded, tfJobSucceededReason, msg) if err != nil { commonutil.LoggerForJob(tfJob).Infof("Append tfjob condition error: %v", err) return err @@ -141,13 +146,15 @@ func (tc *TFController) UpdateJobStatus(job interface{}, replicas map[commonv1.R // 1. If default success policy is used and worker 0 has completed. // 2. If `SuccessPolicyAllWorkers` success policy is used and all workers are succeeded. if expected == 0 || (worker0Completed && *tfJob.Spec.SuccessPolicy != tfv1.SuccessPolicyAllWorkers) { - msg := fmt.Sprintf("TFJob %s successfully completed.", tfJob.Name) + msg := fmt.Sprintf("TFJob %s/%s successfully completed.", + tfJob.Namespace, tfJob.Name) tc.Recorder.Event(tfJob, corev1.EventTypeNormal, tfJobSucceededReason, msg) if jobStatus.CompletionTime == nil { now := metav1.Now() jobStatus.CompletionTime = &now } - err := commonutil.UpdateJobConditions(jobStatus, commonv1.JobSucceeded, tfJobSucceededReason, msg) + err := commonutil.UpdateJobConditions(jobStatus, + commonv1.JobSucceeded, tfJobSucceededReason, msg) if err != nil { commonutil.LoggerForJob(tfJob).Infof("Append tfjob condition error: %v", err) return err @@ -155,7 +162,8 @@ func (tc *TFController) UpdateJobStatus(job interface{}, replicas map[commonv1.R tfJobsSuccessCount.Inc() } else if running > 0 { // Some workers are still running, leave a running condition. - msg := fmt.Sprintf("TFJob %s is running.", tfJob.Name) + msg := fmt.Sprintf("TFJob %s/%s successfully completed.", + tfJob.Namespace, tfJob.Name) err := commonutil.UpdateJobConditions(jobStatus, commonv1.JobRunning, tfJobRunningReason, msg) if err != nil { commonutil.LoggerForJob(tfJob).Infof("Append tfjob condition error: %v", err) @@ -178,14 +186,15 @@ func (tc *TFController) UpdateJobStatus(job interface{}, replicas map[commonv1.R // we know it because we update the status condition when reconciling the replicas tfJobsFailureCount.Inc() } else { - msg := fmt.Sprintf("TFJob %s has failed because %d %s replica(s) failed.", - tfJob.Name, failed, rtype) + msg := fmt.Sprintf("TFJob %s/%s has failed because %d %s replica(s) failed.", + tfJob.Namespace, tfJob.Name, failed, rtype) tc.Recorder.Event(tfJob, corev1.EventTypeNormal, tfJobFailedReason, msg) if tfJob.Status.CompletionTime == nil { now := metav1.Now() tfJob.Status.CompletionTime = &now } - err := commonutil.UpdateJobConditions(jobStatus, commonv1.JobFailed, tfJobFailedReason, msg) + err := commonutil.UpdateJobConditions(jobStatus, + commonv1.JobFailed, tfJobFailedReason, msg) if err != nil { commonutil.LoggerForJob(tfJob).Infof("Append tfjob condition error: %v", err) return err diff --git a/py/kubeflow/tf_operator/tf_job_client.py b/py/kubeflow/tf_operator/tf_job_client.py index 5b21fe9e1b..f37b8af1e2 100644 --- a/py/kubeflow/tf_operator/tf_job_client.py +++ b/py/kubeflow/tf_operator/tf_job_client.py @@ -65,7 +65,7 @@ def delete_tf_job(client, namespace, name, version="v1"): # owned references are deleted. "propagationPolicy": "Foreground", } - logging.info("Deleting job %s.%s", namespace, name) + logging.info("Deleting job %s/%s", namespace, name) thread = crd_api.delete_namespaced_custom_object( TF_JOB_GROUP, version, @@ -75,7 +75,7 @@ def delete_tf_job(client, namespace, name, version="v1"): body, async_req=True) api_response = thread.get(TIMEOUT) - logging.info("Deleting job %s.%s returned: %s", namespace, name, + logging.info("Deleting job %s/%s returned: %s", namespace, name, api_response) return api_response except rest.ApiException as e: @@ -150,8 +150,8 @@ def wait_for_condition(client, except multiprocessing.TimeoutError: logging.error("Timeout trying to get TFJob.") except Exception as e: - logging.error("There was a problem waiting for Job %s.%s; Exception; %s", - name, name, e) + logging.error("There was a problem waiting for Job %s/%s; Exception; %s", + namespace, name, e) raise if results: diff --git a/vendor/github.com/kubeflow/common/pkg/util/logger.go b/vendor/github.com/kubeflow/common/pkg/util/logger.go index d3ce95e6e3..c2293493bf 100644 --- a/vendor/github.com/kubeflow/common/pkg/util/logger.go +++ b/vendor/github.com/kubeflow/common/pkg/util/logger.go @@ -18,7 +18,7 @@ import ( "strings" log "github.com/sirupsen/logrus" - "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1unstructured "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" ) @@ -33,11 +33,12 @@ func LoggerForReplica(job metav1.Object, rtype string) *log.Entry { }) } +// LoggerForJob gets the logger for the given job. func LoggerForJob(job metav1.Object) *log.Entry { return log.WithFields(log.Fields{ // We use job to match the key used in controller.go // Its more common in K8s to use a period to indicate namespace.name. So that's what we use. - "job": job.GetNamespace() + "." + job.GetName(), + "job": job.GetNamespace() + "/" + job.GetName(), "uid": job.GetUID(), }) }