diff --git a/CHANGELOG.md b/CHANGELOG.md index 2421f552..d673e298 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -16,6 +16,8 @@ NOTE: Breaking release in controllers. - Add Logrus helper wrapper. - Refactor to simplify the retrievers. - Add multiretriever to retriever different resource types on the same controller. +- Refactor metrics recorder implementation including the prometheus backend, the + output prometheus metrics have not been changed. ## [0.8.0] - 2019-12-11 diff --git a/controller/generic.go b/controller/generic.go index 140d75b7..79a8d9d3 100644 --- a/controller/generic.go +++ b/controller/generic.go @@ -16,7 +16,6 @@ import ( "github.com/spotahome/kooper/controller/leaderelection" "github.com/spotahome/kooper/log" - "github.com/spotahome/kooper/monitoring/metrics" ) var ( @@ -41,7 +40,7 @@ type Config struct { // leader election will be ignored LeaderElector leaderelection.Runner // MetricsRecorder will record the controller metrics. - MetricRecorder metrics.Recorder + MetricsRecorder MetricsRecorder // Logger will log messages of the controller. Logger log.Logger @@ -77,8 +76,8 @@ func (c *Config) setDefaults() error { "controller-id": c.Name, }) - if c.MetricRecorder == nil { - c.MetricRecorder = metrics.Dummy + if c.MetricsRecorder == nil { + c.MetricsRecorder = DummyMetricsRecorder c.Logger.Warningf("no metrics recorder specified, disabling metrics") } @@ -106,7 +105,7 @@ type generic struct { running bool runningMu sync.Mutex cfg Config - metrics metrics.Recorder + metrics MetricsRecorder leRunner leaderelection.Runner logger log.Logger } @@ -148,37 +147,37 @@ func New(cfg *Config) (Controller, error) { key, err := cache.MetaNamespaceKeyFunc(obj) if err == nil { queue.Add(key) - cfg.MetricRecorder.IncResourceEventQueued(cfg.Name, metrics.AddEvent) + cfg.MetricsRecorder.IncResourceEventQueued(context.TODO(), cfg.Name, AddEvent) } }, UpdateFunc: func(old interface{}, new interface{}) { key, err := cache.MetaNamespaceKeyFunc(new) if err == nil { queue.Add(key) - cfg.MetricRecorder.IncResourceEventQueued(cfg.Name, metrics.AddEvent) + cfg.MetricsRecorder.IncResourceEventQueued(context.TODO(), cfg.Name, AddEvent) } }, DeleteFunc: func(obj interface{}) { key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj) if err == nil { queue.Add(key) - cfg.MetricRecorder.IncResourceEventQueued(cfg.Name, metrics.DeleteEvent) + cfg.MetricsRecorder.IncResourceEventQueued(context.TODO(), cfg.Name, DeleteEvent) } }, }, cfg.ResyncInterval) // Create processing chain processor(+middlewares) -> handler(+middlewares). - handler := newMetricsMeasuredHandler(cfg.Name, cfg.MetricRecorder, cfg.Handler) + handler := newMetricsMeasuredHandler(cfg.Name, cfg.MetricsRecorder, cfg.Handler) processor := newIndexerProcessor(informer.GetIndexer(), handler) if cfg.ProcessingJobRetries > 0 { - processor = newRetryProcessor(cfg.Name, cfg.ProcessingJobRetries, cfg.MetricRecorder, queue, processor) + processor = newRetryProcessor(cfg.Name, cfg.ProcessingJobRetries, cfg.MetricsRecorder, queue, processor) } // Create our generic controller object. return &generic{ queue: queue, informer: informer, - metrics: cfg.MetricRecorder, + metrics: cfg.MetricsRecorder, processor: processor, leRunner: cfg.LeaderElector, cfg: *cfg, diff --git a/controller/handler.go b/controller/handler.go index 9d0d466a..3e9c38eb 100644 --- a/controller/handler.go +++ b/controller/handler.go @@ -6,8 +6,6 @@ import ( "time" "k8s.io/apimachinery/pkg/runtime" - - "github.com/spotahome/kooper/monitoring/metrics" ) // Handler knows how to handle the received resources from a kubernetes cluster. @@ -47,11 +45,11 @@ func (h *HandlerFunc) Delete(ctx context.Context, s string) error { type metricsMeasuredHandler struct { id string - mrec metrics.Recorder + mrec MetricsRecorder next Handler } -func newMetricsMeasuredHandler(id string, mrec metrics.Recorder, next Handler) Handler { +func newMetricsMeasuredHandler(id string, mrec MetricsRecorder, next Handler) Handler { return metricsMeasuredHandler{ id: id, mrec: mrec, @@ -61,28 +59,28 @@ func newMetricsMeasuredHandler(id string, mrec metrics.Recorder, next Handler) H func (m metricsMeasuredHandler) Add(ctx context.Context, obj runtime.Object) (err error) { defer func(start time.Time) { - m.mrec.ObserveDurationResourceEventProcessed(m.id, metrics.AddEvent, start) + m.mrec.ObserveDurationResourceEventProcessed(ctx, m.id, AddEvent, start) if err != nil { - m.mrec.IncResourceEventProcessedError(m.id, metrics.AddEvent) + m.mrec.IncResourceEventProcessedError(ctx, m.id, AddEvent) } }(time.Now()) - m.mrec.IncResourceEventProcessed(m.id, metrics.AddEvent) + m.mrec.IncResourceEventProcessed(ctx, m.id, AddEvent) return m.next.Add(ctx, obj) } func (m metricsMeasuredHandler) Delete(ctx context.Context, objKey string) (err error) { defer func(start time.Time) { - m.mrec.ObserveDurationResourceEventProcessed(m.id, metrics.DeleteEvent, start) + m.mrec.ObserveDurationResourceEventProcessed(ctx, m.id, DeleteEvent, start) if err != nil { - m.mrec.IncResourceEventProcessedError(m.id, metrics.DeleteEvent) + m.mrec.IncResourceEventProcessedError(ctx, m.id, DeleteEvent) } }(time.Now()) - m.mrec.IncResourceEventProcessed(m.id, metrics.DeleteEvent) + m.mrec.IncResourceEventProcessed(ctx, m.id, DeleteEvent) return m.next.Delete(ctx, objKey) } diff --git a/controller/metrics.go b/controller/metrics.go new file mode 100644 index 00000000..54091fe1 --- /dev/null +++ b/controller/metrics.go @@ -0,0 +1,40 @@ +package controller + +import ( + "context" + "time" +) + +// EventType is the event type of a controller enqueued object. +type EventType string + +const ( + //AddEvent is the add event. + AddEvent EventType = "add" + // DeleteEvent is the delete event. + DeleteEvent EventType = "delete" + // RequeueEvent is a requeued event (unknown state when handling again). + RequeueEvent EventType = "requeue" +) + +// MetricsRecorder knows how to record metrics of a controller. +type MetricsRecorder interface { + // IncResourceEvent increments in one the metric records of a queued event. + IncResourceEventQueued(ctx context.Context, controller string, eventType EventType) + // IncResourceEventProcessed increments in one the metric records processed event. + IncResourceEventProcessed(ctx context.Context, controller string, eventType EventType) + // IncResourceEventProcessedError increments in one the metric records of a processed event in error. + IncResourceEventProcessedError(ctx context.Context, controller string, eventType EventType) + // ObserveDurationResourceEventProcessed measures the duration it took to process a event. + ObserveDurationResourceEventProcessed(ctx context.Context, controller string, eventType EventType, start time.Time) +} + +// DummyMetricsRecorder is a dummy metrics recorder that satisifies all . +var DummyMetricsRecorder = dummy(0) + +type dummy int + +func (dummy) IncResourceEventQueued(_ context.Context, _ string, _ EventType) {} +func (dummy) IncResourceEventProcessed(_ context.Context, _ string, _ EventType) {} +func (dummy) IncResourceEventProcessedError(_ context.Context, _ string, _ EventType) {} +func (dummy) ObserveDurationResourceEventProcessed(_ context.Context, _ string, _ EventType, _ time.Time) {} diff --git a/controller/processor.go b/controller/processor.go index 3c6a9233..4cac7283 100644 --- a/controller/processor.go +++ b/controller/processor.go @@ -7,8 +7,6 @@ import ( "k8s.io/apimachinery/pkg/runtime" "k8s.io/client-go/tools/cache" "k8s.io/client-go/util/workqueue" - - "github.com/spotahome/kooper/monitoring/metrics" ) // processor knows how to process object keys. @@ -57,12 +55,12 @@ var errRequeued = fmt.Errorf("requeued after receiving error") type retryProcessor struct { name string maxRetries int - mrec metrics.Recorder + mrec MetricsRecorder queue workqueue.RateLimitingInterface next processor } -func newRetryProcessor(name string, maxRetries int, mrec metrics.Recorder, queue workqueue.RateLimitingInterface, next processor) processor { +func newRetryProcessor(name string, maxRetries int, mrec MetricsRecorder, queue workqueue.RateLimitingInterface, next processor) processor { return retryProcessor{ name: name, maxRetries: maxRetries, @@ -78,7 +76,7 @@ func (r retryProcessor) Process(ctx context.Context, key string) error { // If there was an error and we have retries pending then requeue. if err != nil && r.queue.NumRequeues(key) < r.maxRetries { r.queue.AddRateLimited(key) - r.mrec.IncResourceEventQueued(r.name, metrics.RequeueEvent) + r.mrec.IncResourceEventQueued(ctx, r.name, RequeueEvent) return fmt.Errorf("%w: %s", errRequeued, err) } diff --git a/examples/metrics-controller/main.go b/examples/metrics-controller/main.go index f22808af..256315ad 100644 --- a/examples/metrics-controller/main.go +++ b/examples/metrics-controller/main.go @@ -26,7 +26,7 @@ import ( "github.com/spotahome/kooper/controller" "github.com/spotahome/kooper/log" kooperlogrus "github.com/spotahome/kooper/log/logrus" - "github.com/spotahome/kooper/monitoring/metrics" + kooperprometheus "github.com/spotahome/kooper/metrics/prometheus" ) const ( @@ -62,11 +62,11 @@ func errRandomly() error { } // creates prometheus recorder and starts serving metrics in background. -func createPrometheusRecorder(logger log.Logger) metrics.Recorder { +func createPrometheusRecorder(logger log.Logger) *kooperprometheus.Recorder { // We could use also prometheus global registry (the default one) // prometheus.DefaultRegisterer instead of creating a new one reg := prometheus.NewRegistry() - m := metrics.NewPrometheus(reg) + rec := kooperprometheus.New(kooperprometheus.Config{Registerer: reg}) // Start serving metrics in background. h := promhttp.HandlerFor(reg, promhttp.HandlerOpts{}) @@ -75,17 +75,7 @@ func createPrometheusRecorder(logger log.Logger) metrics.Recorder { http.ListenAndServe(metricsAddr, h) }() - return m -} - -func getMetricRecorder(backend string, logger log.Logger) (metrics.Recorder, error) { - switch backend { - case prometheusBackend: - logger.Infof("using Prometheus metrics recorder") - return createPrometheusRecorder(logger), nil - } - - return nil, fmt.Errorf("wrong metrics backend") + return rec } func run() error { @@ -136,15 +126,11 @@ func run() error { } // Create the controller that will refresh every 30 seconds. - m, err := getMetricRecorder(metricsBackend, logger) - if err != nil { - return fmt.Errorf("errors getting metrics backend: %w", err) - } cfg := &controller.Config{ Name: "metricsControllerTest", Handler: hand, Retriever: retr, - MetricRecorder: m, + MetricsRecorder: createPrometheusRecorder(logger), Logger: logger, ProcessingJobRetries: 3, } diff --git a/metrics/prometheus/prometheus.go b/metrics/prometheus/prometheus.go new file mode 100644 index 00000000..916e1213 --- /dev/null +++ b/metrics/prometheus/prometheus.go @@ -0,0 +1,114 @@ +package prometheus + +import ( + "context" + "time" + + "github.com/prometheus/client_golang/prometheus" + "github.com/spotahome/kooper/controller" +) + +const ( + promNamespace = "kooper" + promControllerSubsystem = "controller" +) + +// Config is the Recorder Config. +type Config struct { + // Registerer is a prometheus registerer, e.g: prometheus.Registry. + // By default will use Prometheus default registry. + Registerer prometheus.Registerer + // Buckets sets custom buckets for the duration/latency metrics. This should be used when + // the default buckets don't work. This could happen when the time to process an event is not on the + // range of 5ms-10s duration. + // Check https://godoc.org/github.com/prometheus/client_golang/prometheus#pkg-variables + Buckets []float64 +} + +func (c *Config) defaults() { + if c.Registerer == nil { + c.Registerer = prometheus.DefaultRegisterer + } + + if c.Buckets == nil || len(c.Buckets) == 0 { + c.Buckets = prometheus.DefBuckets + } +} + +// Recorder implements the metrics recording in a prometheus registry. +type Recorder struct { + // Metrics + queuedEvents *prometheus.CounterVec + processedEvents *prometheus.CounterVec + processedEventErrors *prometheus.CounterVec + processedEventDuration *prometheus.HistogramVec +} + +// New returns a new Prometheus implementaiton for a metrics recorder. +func New(cfg Config) *Recorder { + cfg.defaults() + + r := &Recorder{ + queuedEvents: prometheus.NewCounterVec(prometheus.CounterOpts{ + Namespace: promNamespace, + Subsystem: promControllerSubsystem, + Name: "queued_events_total", + Help: "Total number of events queued.", + }, []string{"controller", "type"}), + + processedEvents: prometheus.NewCounterVec(prometheus.CounterOpts{ + Namespace: promNamespace, + Subsystem: promControllerSubsystem, + Name: "processed_events_total", + Help: "Total number of successfuly processed events.", + }, []string{"controller", "type"}), + + processedEventErrors: prometheus.NewCounterVec(prometheus.CounterOpts{ + Namespace: promNamespace, + Subsystem: promControllerSubsystem, + Name: "processed_event_errors_total", + Help: "Total number of errors processing events.", + }, []string{"controller", "type"}), + + processedEventDuration: prometheus.NewHistogramVec(prometheus.HistogramOpts{ + Namespace: promNamespace, + Subsystem: promControllerSubsystem, + Name: "processed_event_duration_seconds", + Help: "The duration for a successful event to be processed.", + Buckets: cfg.Buckets, + }, []string{"controller", "type"}), + } + + // Register metrics. + cfg.Registerer.MustRegister( + r.queuedEvents, + r.processedEvents, + r.processedEventErrors, + r.processedEventDuration) + + return r +} + +// IncResourceEventQueued satisfies metrics.Recorder interface. +func (r Recorder) IncResourceEventQueued(_ context.Context, controller string, eventType controller.EventType) { + r.queuedEvents.WithLabelValues(controller, string(eventType)).Inc() +} + +// IncResourceEventProcessed satisfies metrics.Recorder interface. +func (r Recorder) IncResourceEventProcessed(_ context.Context, controller string, eventType controller.EventType) { + r.processedEvents.WithLabelValues(controller, string(eventType)).Inc() +} + +// IncResourceEventProcessedError satisfies metrics.Recorder interface. +func (r Recorder) IncResourceEventProcessedError(_ context.Context, controller string, eventType controller.EventType) { + r.processedEventErrors.WithLabelValues(controller, string(eventType)).Inc() +} + +// ObserveDurationResourceEventProcessed satisfies metrics.Recorder interface. +func (r Recorder) ObserveDurationResourceEventProcessed(_ context.Context, controller string, eventType controller.EventType, start time.Time) { + secs := time.Now().Sub(start).Seconds() + r.processedEventDuration.WithLabelValues(controller, string(eventType)).Observe(secs) +} + +// Check we implement all the required metrics recorder interfaces. +var _ controller.MetricsRecorder = &Recorder{} diff --git a/monitoring/metrics/prometheus_test.go b/metrics/prometheus/prometheus_test.go similarity index 58% rename from monitoring/metrics/prometheus_test.go rename to metrics/prometheus/prometheus_test.go index 251bd534..64ec3569 100644 --- a/monitoring/metrics/prometheus_test.go +++ b/metrics/prometheus/prometheus_test.go @@ -1,6 +1,7 @@ -package metrics_test +package prometheus_test import ( + "context" "io/ioutil" "net/http/httptest" "testing" @@ -10,28 +11,29 @@ import ( "github.com/prometheus/client_golang/prometheus/promhttp" "github.com/stretchr/testify/assert" - "github.com/spotahome/kooper/monitoring/metrics" + "github.com/spotahome/kooper/controller" + kooperprometheus "github.com/spotahome/kooper/metrics/prometheus" ) -func TestPrometheusMetrics(t *testing.T) { - controller := "test" +func TestPrometheusRecorder(t *testing.T) { + controllerID := "test" tests := []struct { name string - addMetrics func(*metrics.Prometheus) + addMetrics func(*kooperprometheus.Recorder) expMetrics []string expCode int }{ { name: "Incrementing different kind of queued events should measure the queued events counter", - addMetrics: func(p *metrics.Prometheus) { - p.IncResourceEventQueued(controller, metrics.AddEvent) - p.IncResourceEventQueued(controller, metrics.AddEvent) - p.IncResourceEventQueued(controller, metrics.AddEvent) - p.IncResourceEventQueued(controller, metrics.AddEvent) - p.IncResourceEventQueued(controller, metrics.DeleteEvent) - p.IncResourceEventQueued(controller, metrics.DeleteEvent) - p.IncResourceEventQueued(controller, metrics.DeleteEvent) + addMetrics: func(r *kooperprometheus.Recorder) { + r.IncResourceEventQueued(context.TODO(), controllerID, controller.AddEvent) + r.IncResourceEventQueued(context.TODO(), controllerID, controller.AddEvent) + r.IncResourceEventQueued(context.TODO(), controllerID, controller.AddEvent) + r.IncResourceEventQueued(context.TODO(), controllerID, controller.AddEvent) + r.IncResourceEventQueued(context.TODO(), controllerID, controller.DeleteEvent) + r.IncResourceEventQueued(context.TODO(), controllerID, controller.DeleteEvent) + r.IncResourceEventQueued(context.TODO(), controllerID, controller.DeleteEvent) }, expMetrics: []string{ `kooper_controller_queued_events_total{controller="test",type="add"} 4`, @@ -41,17 +43,17 @@ func TestPrometheusMetrics(t *testing.T) { }, { name: "Incrementing different kind of processed events should measure the processed events counter", - addMetrics: func(p *metrics.Prometheus) { - p.IncResourceEventProcessed(controller, metrics.AddEvent) - p.IncResourceEventProcessedError(controller, metrics.AddEvent) - p.IncResourceEventProcessedError(controller, metrics.AddEvent) - p.IncResourceEventProcessed(controller, metrics.DeleteEvent) - p.IncResourceEventProcessed(controller, metrics.DeleteEvent) - p.IncResourceEventProcessed(controller, metrics.DeleteEvent) - p.IncResourceEventProcessedError(controller, metrics.DeleteEvent) - p.IncResourceEventProcessedError(controller, metrics.DeleteEvent) - p.IncResourceEventProcessedError(controller, metrics.DeleteEvent) - p.IncResourceEventProcessedError(controller, metrics.DeleteEvent) + addMetrics: func(r *kooperprometheus.Recorder) { + r.IncResourceEventProcessed(context.TODO(), controllerID, controller.AddEvent) + r.IncResourceEventProcessedError(context.TODO(), controllerID, controller.AddEvent) + r.IncResourceEventProcessedError(context.TODO(), controllerID, controller.AddEvent) + r.IncResourceEventProcessed(context.TODO(), controllerID, controller.DeleteEvent) + r.IncResourceEventProcessed(context.TODO(), controllerID, controller.DeleteEvent) + r.IncResourceEventProcessed(context.TODO(), controllerID, controller.DeleteEvent) + r.IncResourceEventProcessedError(context.TODO(), controllerID, controller.DeleteEvent) + r.IncResourceEventProcessedError(context.TODO(), controllerID, controller.DeleteEvent) + r.IncResourceEventProcessedError(context.TODO(), controllerID, controller.DeleteEvent) + r.IncResourceEventProcessedError(context.TODO(), controllerID, controller.DeleteEvent) }, expMetrics: []string{ @@ -64,20 +66,20 @@ func TestPrometheusMetrics(t *testing.T) { }, { name: "Measuring the duration of processed events return the correct buckets.", - addMetrics: func(p *metrics.Prometheus) { + addMetrics: func(r *kooperprometheus.Recorder) { now := time.Now() - p.ObserveDurationResourceEventProcessed(controller, metrics.AddEvent, now.Add(-2*time.Millisecond)) - p.ObserveDurationResourceEventProcessed(controller, metrics.AddEvent, now.Add(-3*time.Millisecond)) - p.ObserveDurationResourceEventProcessed(controller, metrics.AddEvent, now.Add(-11*time.Millisecond)) - p.ObserveDurationResourceEventProcessed(controller, metrics.AddEvent, now.Add(-280*time.Millisecond)) - p.ObserveDurationResourceEventProcessed(controller, metrics.AddEvent, now.Add(-1*time.Second)) - p.ObserveDurationResourceEventProcessed(controller, metrics.AddEvent, now.Add(-5*time.Second)) - p.ObserveDurationResourceEventProcessed(controller, metrics.DeleteEvent, now.Add(-110*time.Millisecond)) - p.ObserveDurationResourceEventProcessed(controller, metrics.DeleteEvent, now.Add(-560*time.Millisecond)) - p.ObserveDurationResourceEventProcessed(controller, metrics.DeleteEvent, now.Add(-4*time.Second)) - p.ObserveDurationResourceEventProcessed(controller, metrics.DeleteEvent, now.Add(-7*time.Second)) - p.ObserveDurationResourceEventProcessed(controller, metrics.DeleteEvent, now.Add(-12*time.Second)) - p.ObserveDurationResourceEventProcessed(controller, metrics.DeleteEvent, now.Add(-30*time.Second)) + r.ObserveDurationResourceEventProcessed(context.TODO(), controllerID, controller.AddEvent, now.Add(-2*time.Millisecond)) + r.ObserveDurationResourceEventProcessed(context.TODO(), controllerID, controller.AddEvent, now.Add(-3*time.Millisecond)) + r.ObserveDurationResourceEventProcessed(context.TODO(), controllerID, controller.AddEvent, now.Add(-11*time.Millisecond)) + r.ObserveDurationResourceEventProcessed(context.TODO(), controllerID, controller.AddEvent, now.Add(-280*time.Millisecond)) + r.ObserveDurationResourceEventProcessed(context.TODO(), controllerID, controller.AddEvent, now.Add(-1*time.Second)) + r.ObserveDurationResourceEventProcessed(context.TODO(), controllerID, controller.AddEvent, now.Add(-5*time.Second)) + r.ObserveDurationResourceEventProcessed(context.TODO(), controllerID, controller.DeleteEvent, now.Add(-110*time.Millisecond)) + r.ObserveDurationResourceEventProcessed(context.TODO(), controllerID, controller.DeleteEvent, now.Add(-560*time.Millisecond)) + r.ObserveDurationResourceEventProcessed(context.TODO(), controllerID, controller.DeleteEvent, now.Add(-4*time.Second)) + r.ObserveDurationResourceEventProcessed(context.TODO(), controllerID, controller.DeleteEvent, now.Add(-7*time.Second)) + r.ObserveDurationResourceEventProcessed(context.TODO(), controllerID, controller.DeleteEvent, now.Add(-12*time.Second)) + r.ObserveDurationResourceEventProcessed(context.TODO(), controllerID, controller.DeleteEvent, now.Add(-30*time.Second)) }, expMetrics: []string{ `kooper_controller_processed_event_duration_seconds_bucket{controller="test",type="add",le="0.005"} 2`, @@ -118,7 +120,9 @@ func TestPrometheusMetrics(t *testing.T) { // Create a new prometheus empty registry and a kooper prometheus recorder. reg := prometheus.NewRegistry() - m := metrics.NewPrometheus(reg) + m := kooperprometheus.New(kooperprometheus.Config{ + Registerer: reg, + }) // Add desired metrics test.addMetrics(m) diff --git a/monitoring/metrics/dummy.go b/monitoring/metrics/dummy.go deleted file mode 100644 index 11cdbe39..00000000 --- a/monitoring/metrics/dummy.go +++ /dev/null @@ -1,17 +0,0 @@ -package metrics - -import "time" - -// Dummy is a dummy stats recorder. -var Dummy = &dummy{} - -type dummy struct{} - -func (*dummy) IncResourceEventQueued(_ string, _ EventType) { -} -func (*dummy) IncResourceEventProcessed(_ string, _ EventType) { -} -func (*dummy) IncResourceEventProcessedError(_ string, _ EventType) { -} -func (*dummy) ObserveDurationResourceEventProcessed(_ string, _ EventType, _ time.Time) { -} diff --git a/monitoring/metrics/metrics.go b/monitoring/metrics/metrics.go deleted file mode 100644 index f4726eef..00000000 --- a/monitoring/metrics/metrics.go +++ /dev/null @@ -1,27 +0,0 @@ -package metrics - -import "time" - -// EventType is the event type handled by the controller. -type EventType string - -const ( - //AddEvent is the add event. - AddEvent EventType = "add" - // DeleteEvent is the delete event. - DeleteEvent EventType = "delete" - // RequeueEvent is a requeued event (unknown state when handling again). - RequeueEvent EventType = "requeue" -) - -// Recorder knows how to record metrics all over the application. -type Recorder interface { - // IncResourceEvent increments in one the metric records of a queued event. - IncResourceEventQueued(controller string, eventType EventType) - // IncResourceEventProcessed increments in one the metric records processed event. - IncResourceEventProcessed(controller string, eventType EventType) - // IncResourceEventProcessedError increments in one the metric records of a processed event in error. - IncResourceEventProcessedError(controller string, eventType EventType) - // ObserveDurationResourceEventProcessed measures the duration it took to process a event. - ObserveDurationResourceEventProcessed(controller string, eventType EventType, start time.Time) -} diff --git a/monitoring/metrics/prometheus.go b/monitoring/metrics/prometheus.go deleted file mode 100644 index f241176b..00000000 --- a/monitoring/metrics/prometheus.go +++ /dev/null @@ -1,100 +0,0 @@ -package metrics - -import ( - "time" - - "github.com/prometheus/client_golang/prometheus" -) - -const ( - promNamespace = "kooper" - promControllerSubsystem = "controller" -) - -// Prometheus implements the metrics recording in a prometheus registry. -type Prometheus struct { - // Metrics - queuedEvents *prometheus.CounterVec - processedEvents *prometheus.CounterVec - processedEventErrors *prometheus.CounterVec - processedEventDuration *prometheus.HistogramVec - - reg prometheus.Registerer -} - -// NewPrometheus returns a new Prometheus metrics backend with metrics prefixed by the namespace. -func NewPrometheus(registry prometheus.Registerer) *Prometheus { - return NewPrometheusWithBuckets(prometheus.DefBuckets, registry) -} - -// NewPrometheusWithBuckets returns a new Prometheus metrics backend with metrics prefixed by the -// namespace and with custom buckets for the duration/latency metrics. This kind should be used when -// the default buckets don't work. This could happen when the time to process an event is not on the -// range of 5ms-10s duration. -// Check https://godoc.org/github.com/prometheus/client_golang/prometheus#pkg-variables -func NewPrometheusWithBuckets(buckets []float64, registry prometheus.Registerer) *Prometheus { - p := &Prometheus{ - queuedEvents: prometheus.NewCounterVec(prometheus.CounterOpts{ - Namespace: promNamespace, - Subsystem: promControllerSubsystem, - Name: "queued_events_total", - Help: "Total number of events queued.", - }, []string{"controller", "type"}), - - processedEvents: prometheus.NewCounterVec(prometheus.CounterOpts{ - Namespace: promNamespace, - Subsystem: promControllerSubsystem, - Name: "processed_events_total", - Help: "Total number of successfuly processed events.", - }, []string{"controller", "type"}), - - processedEventErrors: prometheus.NewCounterVec(prometheus.CounterOpts{ - Namespace: promNamespace, - Subsystem: promControllerSubsystem, - Name: "processed_event_errors_total", - Help: "Total number of errors processing events.", - }, []string{"controller", "type"}), - - processedEventDuration: prometheus.NewHistogramVec(prometheus.HistogramOpts{ - Namespace: promNamespace, - Subsystem: promControllerSubsystem, - Name: "processed_event_duration_seconds", - Help: "The duration for a successful event to be processed.", - Buckets: buckets, - }, []string{"controller", "type"}), - reg: registry, - } - - p.registerMetrics() - return p -} - -func (p *Prometheus) registerMetrics() { - p.reg.MustRegister( - p.queuedEvents, - p.processedEvents, - p.processedEventErrors, - p.processedEventDuration) - -} - -// IncResourceEventQueued satisfies metrics.Recorder interface. -func (p *Prometheus) IncResourceEventQueued(controller string, eventType EventType) { - p.queuedEvents.WithLabelValues(controller, string(eventType)).Inc() -} - -// IncResourceEventProcessed satisfies metrics.Recorder interface. -func (p *Prometheus) IncResourceEventProcessed(controller string, eventType EventType) { - p.processedEvents.WithLabelValues(controller, string(eventType)).Inc() -} - -// IncResourceEventProcessedError satisfies metrics.Recorder interface. -func (p *Prometheus) IncResourceEventProcessedError(controller string, eventType EventType) { - p.processedEventErrors.WithLabelValues(controller, string(eventType)).Inc() -} - -// ObserveDurationResourceEventProcessed satisfies metrics.Recorder interface. -func (p *Prometheus) ObserveDurationResourceEventProcessed(controller string, eventType EventType, start time.Time) { - secs := time.Now().Sub(start).Seconds() - p.processedEventDuration.WithLabelValues(controller, string(eventType)).Observe(secs) -}