Skip to content

Commit

Permalink
Refactor metrics
Browse files Browse the repository at this point in the history
Signed-off-by: Xabier Larrakoetxea <[email protected]>
  • Loading branch information
slok committed Mar 27, 2020
1 parent 53f9470 commit 8afa703
Show file tree
Hide file tree
Showing 11 changed files with 224 additions and 227 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ NOTE: Breaking release in controllers.
- Add Logrus helper wrapper.
- Refactor to simplify the retrievers.
- Add multiretriever to retriever different resource types on the same controller.
- Refactor metrics recorder implementation including the prometheus backend, the
output prometheus metrics have not been changed.

## [0.8.0] - 2019-12-11

Expand Down
21 changes: 10 additions & 11 deletions controller/generic.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import (

"github.com/spotahome/kooper/controller/leaderelection"
"github.com/spotahome/kooper/log"
"github.com/spotahome/kooper/monitoring/metrics"
)

var (
Expand All @@ -41,7 +40,7 @@ type Config struct {
// leader election will be ignored
LeaderElector leaderelection.Runner
// MetricsRecorder will record the controller metrics.
MetricRecorder metrics.Recorder
MetricsRecorder MetricsRecorder
// Logger will log messages of the controller.
Logger log.Logger

Expand Down Expand Up @@ -77,8 +76,8 @@ func (c *Config) setDefaults() error {
"controller-id": c.Name,
})

if c.MetricRecorder == nil {
c.MetricRecorder = metrics.Dummy
if c.MetricsRecorder == nil {
c.MetricsRecorder = DummyMetricsRecorder
c.Logger.Warningf("no metrics recorder specified, disabling metrics")
}

Expand Down Expand Up @@ -106,7 +105,7 @@ type generic struct {
running bool
runningMu sync.Mutex
cfg Config
metrics metrics.Recorder
metrics MetricsRecorder
leRunner leaderelection.Runner
logger log.Logger
}
Expand Down Expand Up @@ -148,37 +147,37 @@ func New(cfg *Config) (Controller, error) {
key, err := cache.MetaNamespaceKeyFunc(obj)
if err == nil {
queue.Add(key)
cfg.MetricRecorder.IncResourceEventQueued(cfg.Name, metrics.AddEvent)
cfg.MetricsRecorder.IncResourceEventQueued(context.TODO(), cfg.Name, AddEvent)
}
},
UpdateFunc: func(old interface{}, new interface{}) {
key, err := cache.MetaNamespaceKeyFunc(new)
if err == nil {
queue.Add(key)
cfg.MetricRecorder.IncResourceEventQueued(cfg.Name, metrics.AddEvent)
cfg.MetricsRecorder.IncResourceEventQueued(context.TODO(), cfg.Name, AddEvent)
}
},
DeleteFunc: func(obj interface{}) {
key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
if err == nil {
queue.Add(key)
cfg.MetricRecorder.IncResourceEventQueued(cfg.Name, metrics.DeleteEvent)
cfg.MetricsRecorder.IncResourceEventQueued(context.TODO(), cfg.Name, DeleteEvent)
}
},
}, cfg.ResyncInterval)

// Create processing chain processor(+middlewares) -> handler(+middlewares).
handler := newMetricsMeasuredHandler(cfg.Name, cfg.MetricRecorder, cfg.Handler)
handler := newMetricsMeasuredHandler(cfg.Name, cfg.MetricsRecorder, cfg.Handler)
processor := newIndexerProcessor(informer.GetIndexer(), handler)
if cfg.ProcessingJobRetries > 0 {
processor = newRetryProcessor(cfg.Name, cfg.ProcessingJobRetries, cfg.MetricRecorder, queue, processor)
processor = newRetryProcessor(cfg.Name, cfg.ProcessingJobRetries, cfg.MetricsRecorder, queue, processor)
}

// Create our generic controller object.
return &generic{
queue: queue,
informer: informer,
metrics: cfg.MetricRecorder,
metrics: cfg.MetricsRecorder,
processor: processor,
leRunner: cfg.LeaderElector,
cfg: *cfg,
Expand Down
18 changes: 8 additions & 10 deletions controller/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,6 @@ import (
"time"

"k8s.io/apimachinery/pkg/runtime"

"github.com/spotahome/kooper/monitoring/metrics"
)

// Handler knows how to handle the received resources from a kubernetes cluster.
Expand Down Expand Up @@ -47,11 +45,11 @@ func (h *HandlerFunc) Delete(ctx context.Context, s string) error {

type metricsMeasuredHandler struct {
id string
mrec metrics.Recorder
mrec MetricsRecorder
next Handler
}

func newMetricsMeasuredHandler(id string, mrec metrics.Recorder, next Handler) Handler {
func newMetricsMeasuredHandler(id string, mrec MetricsRecorder, next Handler) Handler {
return metricsMeasuredHandler{
id: id,
mrec: mrec,
Expand All @@ -61,28 +59,28 @@ func newMetricsMeasuredHandler(id string, mrec metrics.Recorder, next Handler) H

func (m metricsMeasuredHandler) Add(ctx context.Context, obj runtime.Object) (err error) {
defer func(start time.Time) {
m.mrec.ObserveDurationResourceEventProcessed(m.id, metrics.AddEvent, start)
m.mrec.ObserveDurationResourceEventProcessed(ctx, m.id, AddEvent, start)

if err != nil {
m.mrec.IncResourceEventProcessedError(m.id, metrics.AddEvent)
m.mrec.IncResourceEventProcessedError(ctx, m.id, AddEvent)
}
}(time.Now())

m.mrec.IncResourceEventProcessed(m.id, metrics.AddEvent)
m.mrec.IncResourceEventProcessed(ctx, m.id, AddEvent)

return m.next.Add(ctx, obj)
}

func (m metricsMeasuredHandler) Delete(ctx context.Context, objKey string) (err error) {
defer func(start time.Time) {
m.mrec.ObserveDurationResourceEventProcessed(m.id, metrics.DeleteEvent, start)
m.mrec.ObserveDurationResourceEventProcessed(ctx, m.id, DeleteEvent, start)

if err != nil {
m.mrec.IncResourceEventProcessedError(m.id, metrics.DeleteEvent)
m.mrec.IncResourceEventProcessedError(ctx, m.id, DeleteEvent)
}
}(time.Now())

m.mrec.IncResourceEventProcessed(m.id, metrics.DeleteEvent)
m.mrec.IncResourceEventProcessed(ctx, m.id, DeleteEvent)

return m.next.Delete(ctx, objKey)
}
40 changes: 40 additions & 0 deletions controller/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package controller

import (
"context"
"time"
)

// EventType is the event type of a controller enqueued object.
type EventType string

const (
//AddEvent is the add event.
AddEvent EventType = "add"
// DeleteEvent is the delete event.
DeleteEvent EventType = "delete"
// RequeueEvent is a requeued event (unknown state when handling again).
RequeueEvent EventType = "requeue"
)

// MetricsRecorder knows how to record metrics of a controller.
type MetricsRecorder interface {
// IncResourceEvent increments in one the metric records of a queued event.
IncResourceEventQueued(ctx context.Context, controller string, eventType EventType)
// IncResourceEventProcessed increments in one the metric records processed event.
IncResourceEventProcessed(ctx context.Context, controller string, eventType EventType)
// IncResourceEventProcessedError increments in one the metric records of a processed event in error.
IncResourceEventProcessedError(ctx context.Context, controller string, eventType EventType)
// ObserveDurationResourceEventProcessed measures the duration it took to process a event.
ObserveDurationResourceEventProcessed(ctx context.Context, controller string, eventType EventType, start time.Time)
}

// DummyMetricsRecorder is a dummy metrics recorder that satisifies all .
var DummyMetricsRecorder = dummy(0)

type dummy int

func (dummy) IncResourceEventQueued(_ context.Context, _ string, _ EventType) {}
func (dummy) IncResourceEventProcessed(_ context.Context, _ string, _ EventType) {}
func (dummy) IncResourceEventProcessedError(_ context.Context, _ string, _ EventType) {}
func (dummy) ObserveDurationResourceEventProcessed(_ context.Context, _ string, _ EventType, _ time.Time) {}
8 changes: 3 additions & 5 deletions controller/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@ import (
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"

"github.com/spotahome/kooper/monitoring/metrics"
)

// processor knows how to process object keys.
Expand Down Expand Up @@ -57,12 +55,12 @@ var errRequeued = fmt.Errorf("requeued after receiving error")
type retryProcessor struct {
name string
maxRetries int
mrec metrics.Recorder
mrec MetricsRecorder
queue workqueue.RateLimitingInterface
next processor
}

func newRetryProcessor(name string, maxRetries int, mrec metrics.Recorder, queue workqueue.RateLimitingInterface, next processor) processor {
func newRetryProcessor(name string, maxRetries int, mrec MetricsRecorder, queue workqueue.RateLimitingInterface, next processor) processor {
return retryProcessor{
name: name,
maxRetries: maxRetries,
Expand All @@ -78,7 +76,7 @@ func (r retryProcessor) Process(ctx context.Context, key string) error {
// If there was an error and we have retries pending then requeue.
if err != nil && r.queue.NumRequeues(key) < r.maxRetries {
r.queue.AddRateLimited(key)
r.mrec.IncResourceEventQueued(r.name, metrics.RequeueEvent)
r.mrec.IncResourceEventQueued(ctx, r.name, RequeueEvent)
return fmt.Errorf("%w: %s", errRequeued, err)
}

Expand Down
24 changes: 5 additions & 19 deletions examples/metrics-controller/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import (
"github.com/spotahome/kooper/controller"
"github.com/spotahome/kooper/log"
kooperlogrus "github.com/spotahome/kooper/log/logrus"
"github.com/spotahome/kooper/monitoring/metrics"
kooperprometheus "github.com/spotahome/kooper/metrics/prometheus"
)

const (
Expand Down Expand Up @@ -62,11 +62,11 @@ func errRandomly() error {
}

// creates prometheus recorder and starts serving metrics in background.
func createPrometheusRecorder(logger log.Logger) metrics.Recorder {
func createPrometheusRecorder(logger log.Logger) *kooperprometheus.Recorder {
// We could use also prometheus global registry (the default one)
// prometheus.DefaultRegisterer instead of creating a new one
reg := prometheus.NewRegistry()
m := metrics.NewPrometheus(reg)
rec := kooperprometheus.New(kooperprometheus.Config{Registerer: reg})

// Start serving metrics in background.
h := promhttp.HandlerFor(reg, promhttp.HandlerOpts{})
Expand All @@ -75,17 +75,7 @@ func createPrometheusRecorder(logger log.Logger) metrics.Recorder {
http.ListenAndServe(metricsAddr, h)
}()

return m
}

func getMetricRecorder(backend string, logger log.Logger) (metrics.Recorder, error) {
switch backend {
case prometheusBackend:
logger.Infof("using Prometheus metrics recorder")
return createPrometheusRecorder(logger), nil
}

return nil, fmt.Errorf("wrong metrics backend")
return rec
}

func run() error {
Expand Down Expand Up @@ -136,15 +126,11 @@ func run() error {
}

// Create the controller that will refresh every 30 seconds.
m, err := getMetricRecorder(metricsBackend, logger)
if err != nil {
return fmt.Errorf("errors getting metrics backend: %w", err)
}
cfg := &controller.Config{
Name: "metricsControllerTest",
Handler: hand,
Retriever: retr,
MetricRecorder: m,
MetricsRecorder: createPrometheusRecorder(logger),
Logger: logger,
ProcessingJobRetries: 3,
}
Expand Down
114 changes: 114 additions & 0 deletions metrics/prometheus/prometheus.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
package prometheus

import (
"context"
"time"

"github.com/prometheus/client_golang/prometheus"
"github.com/spotahome/kooper/controller"
)

const (
promNamespace = "kooper"
promControllerSubsystem = "controller"
)

// Config is the Recorder Config.
type Config struct {
// Registerer is a prometheus registerer, e.g: prometheus.Registry.
// By default will use Prometheus default registry.
Registerer prometheus.Registerer
// Buckets sets custom buckets for the duration/latency metrics. This should be used when
// the default buckets don't work. This could happen when the time to process an event is not on the
// range of 5ms-10s duration.
// Check https://godoc.org/github.com/prometheus/client_golang/prometheus#pkg-variables
Buckets []float64
}

func (c *Config) defaults() {
if c.Registerer == nil {
c.Registerer = prometheus.DefaultRegisterer
}

if c.Buckets == nil || len(c.Buckets) == 0 {
c.Buckets = prometheus.DefBuckets
}
}

// Recorder implements the metrics recording in a prometheus registry.
type Recorder struct {
// Metrics
queuedEvents *prometheus.CounterVec
processedEvents *prometheus.CounterVec
processedEventErrors *prometheus.CounterVec
processedEventDuration *prometheus.HistogramVec
}

// New returns a new Prometheus implementaiton for a metrics recorder.
func New(cfg Config) *Recorder {
cfg.defaults()

r := &Recorder{
queuedEvents: prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: promNamespace,
Subsystem: promControllerSubsystem,
Name: "queued_events_total",
Help: "Total number of events queued.",
}, []string{"controller", "type"}),

processedEvents: prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: promNamespace,
Subsystem: promControllerSubsystem,
Name: "processed_events_total",
Help: "Total number of successfuly processed events.",
}, []string{"controller", "type"}),

processedEventErrors: prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: promNamespace,
Subsystem: promControllerSubsystem,
Name: "processed_event_errors_total",
Help: "Total number of errors processing events.",
}, []string{"controller", "type"}),

processedEventDuration: prometheus.NewHistogramVec(prometheus.HistogramOpts{
Namespace: promNamespace,
Subsystem: promControllerSubsystem,
Name: "processed_event_duration_seconds",
Help: "The duration for a successful event to be processed.",
Buckets: cfg.Buckets,
}, []string{"controller", "type"}),
}

// Register metrics.
cfg.Registerer.MustRegister(
r.queuedEvents,
r.processedEvents,
r.processedEventErrors,
r.processedEventDuration)

return r
}

// IncResourceEventQueued satisfies metrics.Recorder interface.
func (r Recorder) IncResourceEventQueued(_ context.Context, controller string, eventType controller.EventType) {
r.queuedEvents.WithLabelValues(controller, string(eventType)).Inc()
}

// IncResourceEventProcessed satisfies metrics.Recorder interface.
func (r Recorder) IncResourceEventProcessed(_ context.Context, controller string, eventType controller.EventType) {
r.processedEvents.WithLabelValues(controller, string(eventType)).Inc()
}

// IncResourceEventProcessedError satisfies metrics.Recorder interface.
func (r Recorder) IncResourceEventProcessedError(_ context.Context, controller string, eventType controller.EventType) {
r.processedEventErrors.WithLabelValues(controller, string(eventType)).Inc()
}

// ObserveDurationResourceEventProcessed satisfies metrics.Recorder interface.
func (r Recorder) ObserveDurationResourceEventProcessed(_ context.Context, controller string, eventType controller.EventType, start time.Time) {
secs := time.Now().Sub(start).Seconds()
r.processedEventDuration.WithLabelValues(controller, string(eventType)).Observe(secs)
}

// Check we implement all the required metrics recorder interfaces.
var _ controller.MetricsRecorder = &Recorder{}
Loading

0 comments on commit 8afa703

Please sign in to comment.