Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix 1340 prometheus counters #1375

Merged
2 changes: 2 additions & 0 deletions pkg/apis/mxnet/v1/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,4 +32,6 @@ const (
Plural = "mxjobs"
// Singular is the singular for mxJob.
Singular = "mxjob"
// FrameworkName is the name of the ML Framework
FrameworkName = "mxnet"
)
2 changes: 2 additions & 0 deletions pkg/apis/pytorch/v1/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,4 +34,6 @@ const (
Plural = "pytorchjobs"
// Singular is the singular for pytorchJob.
Singular = "pytorchjob"
// FrameworkName is the name of the ML Framework
FrameworkName = "pytorch"
)
2 changes: 2 additions & 0 deletions pkg/apis/tensorflow/v1/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,4 +34,6 @@ const (
Plural = "tfjobs"
// Singular is the singular for TFJob.
Singular = "tfjob"
// FrameworkName is the name of the ML Framework
FrameworkName = "tensorflow"
)
2 changes: 2 additions & 0 deletions pkg/apis/xgboost/v1/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,4 +31,6 @@ const (
Plural = "xgboostjobs"
// Singular is the singular for XGBoostJob.
Singular = "xgboostjob"
// FrameworkName is the name of the ML Framework
FrameworkName = "xgboost"
)
89 changes: 89 additions & 0 deletions pkg/common/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
// Copyright 2021 The Kubeflow Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License

package common

import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"sigs.k8s.io/controller-runtime/pkg/metrics"
)

// Define all the prometheus counters for all jobs
var (
jobsCreatedCount = promauto.NewCounterVec(
prometheus.CounterOpts{
Name: "training_operator_jobs_created_total",
Help: "Counts number of jobs created",
},
[]string{"job_namespace", "framework"},
)
jobsDeletedCount = promauto.NewCounterVec(
prometheus.CounterOpts{
Name: "training_operator_jobs_deleted_total",
Help: "Counts number of jobs deleted",
},
[]string{"job_namespace", "framework"},
)
jobsSuccessfulCount = promauto.NewCounterVec(
prometheus.CounterOpts{
Name: "training_operator_jobs_successful_total",
Help: "Counts number of jobs successful",
},
[]string{"job_namespace", "framework"},
)
jobsFailedCount = promauto.NewCounterVec(
prometheus.CounterOpts{
Name: "training_operator_jobs_failed_total",
Help: "Counts number of jobs failed",
},
[]string{"job_namespace", "framework"},
)
jobsRestartedCount = promauto.NewCounterVec(
prometheus.CounterOpts{
Name: "training_operator_jobs_restarted_total",
Help: "Counts number of jobs restarted",
},
[]string{"job_namespace", "framework"},
)
)

func init() {
// Register custom metrics with the global prometheus registry
metrics.Registry.MustRegister(jobsCreatedCount,
jobsDeletedCount,
jobsSuccessfulCount,
jobsFailedCount,
jobsRestartedCount)
}

func CreatedJobsCounterInc(job_namespace, framework string) {
jobsCreatedCount.WithLabelValues(job_namespace, framework).Inc()
}

func DeletedJobsCounterInc(job_namespace, framework string) {
jobsDeletedCount.WithLabelValues(job_namespace, framework).Inc()
}

func SuccessfulJobsCounterInc(job_namespace, framework string) {
jobsSuccessfulCount.WithLabelValues(job_namespace, framework).Inc()
}

func FailedJobsCounterInc(job_namespace, framework string) {
jobsFailedCount.WithLabelValues(job_namespace, framework).Inc()
}

func RestartedJobsCounterInc(job_namespace, framework string) {
jobsRestartedCount.WithLabelValues(job_namespace, framework).Inc()
}
17 changes: 11 additions & 6 deletions pkg/controller.v1/mxnet/mxjob_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"github.com/kubeflow/common/pkg/controller.v1/expectation"
commonutil "github.com/kubeflow/common/pkg/util"
mxjobv1 "github.com/kubeflow/tf-operator/pkg/apis/mxnet/v1"
trainingoperatorcommon "github.com/kubeflow/tf-operator/pkg/common"
"github.com/kubeflow/tf-operator/pkg/common/util"
"github.com/sirupsen/logrus"
corev1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -81,7 +82,7 @@ var (
DefaultCleanPodPolicy = commonv1.CleanPodPolicyNone
)

// NewReconciler creates a PyTorchJob Reconciler
// NewReconciler creates a MXJob Reconciler
func NewReconciler(mgr manager.Manager) *MXJobReconciler {
r := &MXJobReconciler{
Client: mgr.GetClient(),
Expand Down Expand Up @@ -158,13 +159,13 @@ func (r *MXJobReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl
// Set default priorities to mxnet job
r.Scheme.Default(mxjob)

// Convert PyTorch.Spec.PyTorchReplicasSpecs to map[commonv1.ReplicaType]*commonv1.ReplicaSpec
// Convert MX.Spec.MXReplicasSpecs to map[commonv1.ReplicaType]*commonv1.ReplicaSpec
replicas := map[commonv1.ReplicaType]*commonv1.ReplicaSpec{}
for k, v := range mxjob.Spec.MXReplicaSpecs {
replicas[commonv1.ReplicaType(k)] = v
}

// Construct RunPolicy based on PyTorchJob.Spec
// Construct RunPolicy based on MXJob.Spec
runPolicy := &commonv1.RunPolicy{
CleanPodPolicy: mxjob.Spec.RunPolicy.CleanPodPolicy,
TTLSecondsAfterFinished: mxjob.Spec.RunPolicy.TTLSecondsAfterFinished,
Expand All @@ -176,7 +177,7 @@ func (r *MXJobReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl
// Use common to reconcile the job related pod and service
err = r.ReconcileJobs(mxjob, replicas, mxjob.Status, runPolicy)
if err != nil {
logrus.Warnf("Reconcile PyTorch Job error %v", err)
logrus.Warnf("Reconcile MX Job error %v", err)
return ctrl.Result{}, err
}

Expand Down Expand Up @@ -322,6 +323,7 @@ func (r *MXJobReconciler) DeleteJob(job interface{}) error {
}
r.Recorder.Eventf(mxjob, corev1.EventTypeNormal, control.SuccessfulDeletePodReason, "Deleted job: %v", mxjob.Name)
logrus.Info("job deleted", "namespace", mxjob.Namespace, "name", mxjob.Name)
trainingoperatorcommon.DeletedJobsCounterInc(mxjob.Namespace, mxjobv1.FrameworkName)
return nil
}

Expand Down Expand Up @@ -379,6 +381,7 @@ func (r *MXJobReconciler) UpdateJobStatus(job interface{}, replicas map[commonv1
logrus.Infof("Append mxjob condition error: %v", err)
return err
}
trainingoperatorcommon.SuccessfulJobsCounterInc(mxjob.Namespace, mxjobv1.FrameworkName)
}

if failed > 0 {
Expand All @@ -390,6 +393,7 @@ func (r *MXJobReconciler) UpdateJobStatus(job interface{}, replicas map[commonv1
logrus.Infof("Append job condition error: %v", err)
return err
}
trainingoperatorcommon.RestartedJobsCounterInc(mxjob.Namespace, mxjobv1.FrameworkName)
} else {
msg := fmt.Sprintf("mxjob %s is failed because %d %s replica(s) failed.", mxjob.Name, failed, rtype)
r.Recorder.Event(mxjob, corev1.EventTypeNormal, mxJobFailedReason, msg)
Expand All @@ -402,6 +406,7 @@ func (r *MXJobReconciler) UpdateJobStatus(job interface{}, replicas map[commonv1
logrus.Infof("Append job condition error: %v", err)
return err
}
trainingoperatorcommon.FailedJobsCounterInc(mxjob.Namespace, mxjobv1.FrameworkName)
}
}
}
Expand Down Expand Up @@ -456,9 +461,9 @@ func (r *MXJobReconciler) onOwnerCreateFunc() func(event.CreateEvent) bool {

// Use defaulters registered in scheme.
r.Scheme.Default(mxjob)
msg := fmt.Sprintf("xgboostJob %s is created.", e.Object.GetName())
msg := fmt.Sprintf("MXJob %s is created.", e.Object.GetName())
logrus.Info(msg)

trainingoperatorcommon.CreatedJobsCounterInc(mxjob.Namespace, mxjobv1.FrameworkName)
if err := commonutil.UpdateJobConditions(&mxjob.Status, commonv1.JobCreated, "MXJobCreated", msg); err != nil {
logrus.Error(err, "append job condition error")
return false
Expand Down
6 changes: 6 additions & 0 deletions pkg/controller.v1/pytorch/pytorchjob_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/source"

commonutil "github.com/kubeflow/common/pkg/util"
trainingoperatorcommon "github.com/kubeflow/tf-operator/pkg/common"
"github.com/kubeflow/tf-operator/pkg/common/util"
"k8s.io/apimachinery/pkg/api/meta"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
Expand Down Expand Up @@ -308,6 +309,7 @@ func (r *PyTorchJobReconciler) DeleteJob(job interface{}) error {
}
r.recorder.Eventf(pytorchjob, corev1.EventTypeNormal, control.SuccessfulDeletePodReason, "Deleted job: %v", pytorchjob.Name)
logrus.Info("job deleted", "namespace", pytorchjob.Namespace, "name", pytorchjob.Name)
trainingoperatorcommon.DeletedJobsCounterInc(pytorchjob.Namespace, pytorchv1.FrameworkName)
return nil
}

Expand Down Expand Up @@ -352,6 +354,7 @@ func (r *PyTorchJobReconciler) UpdateJobStatus(job interface{}, replicas map[com
commonutil.LoggerForJob(pytorchjob).Infof("Append job condition error: %v", err)
return err
}
trainingoperatorcommon.SuccessfulJobsCounterInc(pytorchjob.Namespace, pytorchv1.FrameworkName)
return nil
}
}
Expand All @@ -364,6 +367,7 @@ func (r *PyTorchJobReconciler) UpdateJobStatus(job interface{}, replicas map[com
commonutil.LoggerForJob(pytorchjob).Infof("Append job condition error: %v", err)
return err
}
trainingoperatorcommon.RestartedJobsCounterInc(pytorchjob.Namespace, pytorchv1.FrameworkName)
} else {
msg := fmt.Sprintf("PyTorchJob %s is failed because %d %s replica(s) failed.", pytorchjob.Name, failed, rtype)
r.Recorder.Event(pytorchjob, corev1.EventTypeNormal, commonutil.JobFailedReason, msg)
Expand All @@ -376,6 +380,7 @@ func (r *PyTorchJobReconciler) UpdateJobStatus(job interface{}, replicas map[com
commonutil.LoggerForJob(pytorchjob).Infof("Append job condition error: %v", err)
return err
}
trainingoperatorcommon.FailedJobsCounterInc(pytorchjob.Namespace, pytorchv1.FrameworkName)
}
}
}
Expand Down Expand Up @@ -446,6 +451,7 @@ func (r *PyTorchJobReconciler) onOwnerCreateFunc() func(event.CreateEvent) bool
r.Scheme.Default(pytorchjob)
msg := fmt.Sprintf("PyTorchJob %s is created.", e.Object.GetName())
logrus.Info(msg)
trainingoperatorcommon.CreatedJobsCounterInc(pytorchjob.Namespace, pytorchv1.FrameworkName)
if err := commonutil.UpdateJobConditions(&pytorchjob.Status, commonv1.JobCreated, "PyTorchJobCreated", msg); err != nil {
logrus.Error(err, "append job condition error")
return false
Expand Down
4 changes: 2 additions & 2 deletions pkg/controller.v1/tensorflow/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import (
"time"

"github.com/kubeflow/tf-operator/pkg/common/util"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"

log "github.com/sirupsen/logrus"
v1 "k8s.io/api/core/v1"
Expand All @@ -42,8 +44,6 @@ import (
tfjobinformers "github.com/kubeflow/tf-operator/pkg/client/informers/externalversions"
tfjobinformersv1 "github.com/kubeflow/tf-operator/pkg/client/informers/externalversions/tensorflow/v1"
tfjoblisters "github.com/kubeflow/tf-operator/pkg/client/listers/tensorflow/v1"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"k8s.io/apimachinery/pkg/runtime/schema"
volcanoclient "volcano.sh/apis/pkg/client/clientset/versioned"
)
Expand Down
4 changes: 2 additions & 2 deletions pkg/controller.v1/tensorflow/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ import (
"fmt"
"time"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
log "github.com/sirupsen/logrus"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand All @@ -29,8 +31,6 @@ import (
commonutil "github.com/kubeflow/common/pkg/util"
"github.com/kubeflow/common/pkg/util/k8sutil"
tfv1 "github.com/kubeflow/tf-operator/pkg/apis/tensorflow/v1"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"k8s.io/apimachinery/pkg/runtime"
)

Expand Down
4 changes: 2 additions & 2 deletions pkg/controller.v1/tensorflow/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import (
"strings"

"github.com/kubeflow/tf-operator/pkg/common/util"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"

corev1 "k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
Expand All @@ -33,8 +35,6 @@ import (
commonutil "github.com/kubeflow/common/pkg/util"
train_util "github.com/kubeflow/common/pkg/util/train"
tfv1 "github.com/kubeflow/tf-operator/pkg/apis/tensorflow/v1"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
)

const (
Expand Down
14 changes: 8 additions & 6 deletions pkg/controller.v1/tensorflow/tfjob_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ import (

tensorflowv1 "github.com/kubeflow/tf-operator/pkg/apis/tensorflow/v1"
tfv1 "github.com/kubeflow/tf-operator/pkg/apis/tensorflow/v1"
trainingoperatorcommon "github.com/kubeflow/tf-operator/pkg/common"
"github.com/kubeflow/tf-operator/pkg/common/util"
)

Expand Down Expand Up @@ -345,6 +346,7 @@ func (r *TFJobReconciler) DeleteJob(job interface{}) error {

r.recorder.Eventf(tfJob, v1.EventTypeNormal, SuccessfulDeleteJobReason, "Deleted job: %v", tfJob.Name)
log.Infof("job %s/%s has been deleted", tfJob.Namespace, tfJob.Name)
trainingoperatorcommon.DeletedJobsCounterInc(tfJob.Namespace, tensorflowv1.FrameworkName)
return nil
}

Expand Down Expand Up @@ -432,7 +434,7 @@ func (r *TFJobReconciler) UpdateJobStatus(job interface{}, replicas map[commonv1
commonutil.LoggerForJob(tfJob).Infof("Append tfjob condition error: %v", err)
return err
}
tfJobsSuccessCount.WithLabelValues(tfJob.Namespace).Inc()
trainingoperatorcommon.SuccessfulJobsCounterInc(tfJob.Namespace, tensorflowv1.FrameworkName)
}
}
} else {
Expand All @@ -454,7 +456,7 @@ func (r *TFJobReconciler) UpdateJobStatus(job interface{}, replicas map[commonv1
commonutil.LoggerForJob(tfJob).Infof("Append tfjob condition error: %v", err)
return err
}
tfJobsSuccessCount.WithLabelValues(tfJob.Namespace).Inc()
trainingoperatorcommon.SuccessfulJobsCounterInc(tfJob.Namespace, tensorflowv1.FrameworkName)
} else if running > 0 {
// Some workers are still running, leave a running condition.
msg := fmt.Sprintf("TFJob %s/%s is running.",
Expand All @@ -479,7 +481,7 @@ func (r *TFJobReconciler) UpdateJobStatus(job interface{}, replicas map[commonv1
if restart {
// job is restarting, no need to set it failed
// we know it because we update the status condition when reconciling the replicas
tfJobsFailureCount.WithLabelValues(tfJob.Namespace).Inc()
trainingoperatorcommon.RestartedJobsCounterInc(tfJob.Namespace, tensorflowv1.FrameworkName)
} else {
msg := fmt.Sprintf("TFJob %s/%s has failed because %d %s replica(s) failed.",
tfJob.Namespace, tfJob.Name, failed, rtype)
Expand All @@ -494,7 +496,7 @@ func (r *TFJobReconciler) UpdateJobStatus(job interface{}, replicas map[commonv1
commonutil.LoggerForJob(tfJob).Infof("Append tfjob condition error: %v", err)
return err
}
tfJobsFailureCount.WithLabelValues(tfJob.Namespace).Inc()
trainingoperatorcommon.FailedJobsCounterInc(tfJob.Namespace, tensorflowv1.FrameworkName)
}
}
}
Expand Down Expand Up @@ -729,7 +731,7 @@ func (r *TFJobReconciler) ReconcilePods(
commonutil.LoggerForJob(tfJob).Infof("Append tfjob condition error: %v", err)
return err
}
tfJobsRestartCount.WithLabelValues(tfJob.Namespace).Inc()
trainingoperatorcommon.RestartedJobsCounterInc(tfJob.Namespace, tensorflowv1.FrameworkName)
}
}

Expand Down Expand Up @@ -844,7 +846,7 @@ func (r *TFJobReconciler) onOwnerCreateFunc() func(event.CreateEvent) bool {
r.Scheme.Default(tfJob)
msg := fmt.Sprintf("TFJob %s is created.", e.Object.GetName())
logrus.Info(msg)

trainingoperatorcommon.CreatedJobsCounterInc(tfJob.Namespace, tensorflowv1.FrameworkName)
if err := commonutil.UpdateJobConditions(&tfJob.Status, commonv1.JobCreated, "TFJobCreated", msg); err != nil {
log.Log.Error(err, "append job condition error")
return false
Expand Down
Loading