Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor metrics #88

Merged
merged 1 commit into from
Mar 28, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ NOTE: Breaking release in controllers.
- Add Logrus helper wrapper.
- Refactor to simplify the retrievers.
- Add multiretriever to retriever different resource types on the same controller.
- Refactor metrics recorder implementation including the prometheus backend, the
output prometheus metrics have not been changed.

## [0.8.0] - 2019-12-11

Expand Down
21 changes: 10 additions & 11 deletions controller/generic.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import (

"github.com/spotahome/kooper/controller/leaderelection"
"github.com/spotahome/kooper/log"
"github.com/spotahome/kooper/monitoring/metrics"
)

var (
Expand All @@ -41,7 +40,7 @@ type Config struct {
// leader election will be ignored
LeaderElector leaderelection.Runner
// MetricsRecorder will record the controller metrics.
MetricRecorder metrics.Recorder
MetricsRecorder MetricsRecorder
// Logger will log messages of the controller.
Logger log.Logger

Expand Down Expand Up @@ -77,8 +76,8 @@ func (c *Config) setDefaults() error {
"controller-id": c.Name,
})

if c.MetricRecorder == nil {
c.MetricRecorder = metrics.Dummy
if c.MetricsRecorder == nil {
c.MetricsRecorder = DummyMetricsRecorder
c.Logger.Warningf("no metrics recorder specified, disabling metrics")
}

Expand Down Expand Up @@ -106,7 +105,7 @@ type generic struct {
running bool
runningMu sync.Mutex
cfg Config
metrics metrics.Recorder
metrics MetricsRecorder
leRunner leaderelection.Runner
logger log.Logger
}
Expand Down Expand Up @@ -148,37 +147,37 @@ func New(cfg *Config) (Controller, error) {
key, err := cache.MetaNamespaceKeyFunc(obj)
if err == nil {
queue.Add(key)
cfg.MetricRecorder.IncResourceEventQueued(cfg.Name, metrics.AddEvent)
cfg.MetricsRecorder.IncResourceEventQueued(context.TODO(), cfg.Name, AddEvent)
}
},
UpdateFunc: func(old interface{}, new interface{}) {
key, err := cache.MetaNamespaceKeyFunc(new)
if err == nil {
queue.Add(key)
cfg.MetricRecorder.IncResourceEventQueued(cfg.Name, metrics.AddEvent)
cfg.MetricsRecorder.IncResourceEventQueued(context.TODO(), cfg.Name, AddEvent)
}
},
DeleteFunc: func(obj interface{}) {
key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
if err == nil {
queue.Add(key)
cfg.MetricRecorder.IncResourceEventQueued(cfg.Name, metrics.DeleteEvent)
cfg.MetricsRecorder.IncResourceEventQueued(context.TODO(), cfg.Name, DeleteEvent)
}
},
}, cfg.ResyncInterval)

// Create processing chain processor(+middlewares) -> handler(+middlewares).
handler := newMetricsMeasuredHandler(cfg.Name, cfg.MetricRecorder, cfg.Handler)
handler := newMetricsMeasuredHandler(cfg.Name, cfg.MetricsRecorder, cfg.Handler)
processor := newIndexerProcessor(informer.GetIndexer(), handler)
if cfg.ProcessingJobRetries > 0 {
processor = newRetryProcessor(cfg.Name, cfg.ProcessingJobRetries, cfg.MetricRecorder, queue, processor)
processor = newRetryProcessor(cfg.Name, cfg.ProcessingJobRetries, cfg.MetricsRecorder, queue, processor)
}

// Create our generic controller object.
return &generic{
queue: queue,
informer: informer,
metrics: cfg.MetricRecorder,
metrics: cfg.MetricsRecorder,
processor: processor,
leRunner: cfg.LeaderElector,
cfg: *cfg,
Expand Down
18 changes: 8 additions & 10 deletions controller/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,6 @@ import (
"time"

"k8s.io/apimachinery/pkg/runtime"

"github.com/spotahome/kooper/monitoring/metrics"
)

// Handler knows how to handle the received resources from a kubernetes cluster.
Expand Down Expand Up @@ -47,11 +45,11 @@ func (h *HandlerFunc) Delete(ctx context.Context, s string) error {

type metricsMeasuredHandler struct {
id string
mrec metrics.Recorder
mrec MetricsRecorder
next Handler
}

func newMetricsMeasuredHandler(id string, mrec metrics.Recorder, next Handler) Handler {
func newMetricsMeasuredHandler(id string, mrec MetricsRecorder, next Handler) Handler {
return metricsMeasuredHandler{
id: id,
mrec: mrec,
Expand All @@ -61,28 +59,28 @@ func newMetricsMeasuredHandler(id string, mrec metrics.Recorder, next Handler) H

func (m metricsMeasuredHandler) Add(ctx context.Context, obj runtime.Object) (err error) {
defer func(start time.Time) {
m.mrec.ObserveDurationResourceEventProcessed(m.id, metrics.AddEvent, start)
m.mrec.ObserveDurationResourceEventProcessed(ctx, m.id, AddEvent, start)

if err != nil {
m.mrec.IncResourceEventProcessedError(m.id, metrics.AddEvent)
m.mrec.IncResourceEventProcessedError(ctx, m.id, AddEvent)
}
}(time.Now())

m.mrec.IncResourceEventProcessed(m.id, metrics.AddEvent)
m.mrec.IncResourceEventProcessed(ctx, m.id, AddEvent)

return m.next.Add(ctx, obj)
}

func (m metricsMeasuredHandler) Delete(ctx context.Context, objKey string) (err error) {
defer func(start time.Time) {
m.mrec.ObserveDurationResourceEventProcessed(m.id, metrics.DeleteEvent, start)
m.mrec.ObserveDurationResourceEventProcessed(ctx, m.id, DeleteEvent, start)

if err != nil {
m.mrec.IncResourceEventProcessedError(m.id, metrics.DeleteEvent)
m.mrec.IncResourceEventProcessedError(ctx, m.id, DeleteEvent)
}
}(time.Now())

m.mrec.IncResourceEventProcessed(m.id, metrics.DeleteEvent)
m.mrec.IncResourceEventProcessed(ctx, m.id, DeleteEvent)

return m.next.Delete(ctx, objKey)
}
40 changes: 40 additions & 0 deletions controller/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package controller

import (
"context"
"time"
)

// EventType is the event type of a controller enqueued object.
type EventType string

const (
//AddEvent is the add event.
AddEvent EventType = "add"
// DeleteEvent is the delete event.
DeleteEvent EventType = "delete"
// RequeueEvent is a requeued event (unknown state when handling again).
RequeueEvent EventType = "requeue"
)

// MetricsRecorder knows how to record metrics of a controller.
type MetricsRecorder interface {
// IncResourceEvent increments in one the metric records of a queued event.
IncResourceEventQueued(ctx context.Context, controller string, eventType EventType)
// IncResourceEventProcessed increments in one the metric records processed event.
IncResourceEventProcessed(ctx context.Context, controller string, eventType EventType)
// IncResourceEventProcessedError increments in one the metric records of a processed event in error.
IncResourceEventProcessedError(ctx context.Context, controller string, eventType EventType)
// ObserveDurationResourceEventProcessed measures the duration it took to process a event.
ObserveDurationResourceEventProcessed(ctx context.Context, controller string, eventType EventType, start time.Time)
}

// DummyMetricsRecorder is a dummy metrics recorder.
var DummyMetricsRecorder = dummy(0)

type dummy int

func (dummy) IncResourceEventQueued(_ context.Context, _ string, _ EventType) {}
func (dummy) IncResourceEventProcessed(_ context.Context, _ string, _ EventType) {}
func (dummy) IncResourceEventProcessedError(_ context.Context, _ string, _ EventType) {}
func (dummy) ObserveDurationResourceEventProcessed(_ context.Context, _ string, _ EventType, _ time.Time) {}
8 changes: 3 additions & 5 deletions controller/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@ import (
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"

"github.com/spotahome/kooper/monitoring/metrics"
)

// processor knows how to process object keys.
Expand Down Expand Up @@ -57,12 +55,12 @@ var errRequeued = fmt.Errorf("requeued after receiving error")
type retryProcessor struct {
name string
maxRetries int
mrec metrics.Recorder
mrec MetricsRecorder
queue workqueue.RateLimitingInterface
next processor
}

func newRetryProcessor(name string, maxRetries int, mrec metrics.Recorder, queue workqueue.RateLimitingInterface, next processor) processor {
func newRetryProcessor(name string, maxRetries int, mrec MetricsRecorder, queue workqueue.RateLimitingInterface, next processor) processor {
return retryProcessor{
name: name,
maxRetries: maxRetries,
Expand All @@ -78,7 +76,7 @@ func (r retryProcessor) Process(ctx context.Context, key string) error {
// If there was an error and we have retries pending then requeue.
if err != nil && r.queue.NumRequeues(key) < r.maxRetries {
r.queue.AddRateLimited(key)
r.mrec.IncResourceEventQueued(r.name, metrics.RequeueEvent)
r.mrec.IncResourceEventQueued(ctx, r.name, RequeueEvent)
return fmt.Errorf("%w: %s", errRequeued, err)
}

Expand Down
24 changes: 5 additions & 19 deletions examples/metrics-controller/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import (
"github.com/spotahome/kooper/controller"
"github.com/spotahome/kooper/log"
kooperlogrus "github.com/spotahome/kooper/log/logrus"
"github.com/spotahome/kooper/monitoring/metrics"
kooperprometheus "github.com/spotahome/kooper/metrics/prometheus"
)

const (
Expand Down Expand Up @@ -62,11 +62,11 @@ func errRandomly() error {
}

// creates prometheus recorder and starts serving metrics in background.
func createPrometheusRecorder(logger log.Logger) metrics.Recorder {
func createPrometheusRecorder(logger log.Logger) *kooperprometheus.Recorder {
// We could use also prometheus global registry (the default one)
// prometheus.DefaultRegisterer instead of creating a new one
reg := prometheus.NewRegistry()
m := metrics.NewPrometheus(reg)
rec := kooperprometheus.New(kooperprometheus.Config{Registerer: reg})

// Start serving metrics in background.
h := promhttp.HandlerFor(reg, promhttp.HandlerOpts{})
Expand All @@ -75,17 +75,7 @@ func createPrometheusRecorder(logger log.Logger) metrics.Recorder {
http.ListenAndServe(metricsAddr, h)
}()

return m
}

func getMetricRecorder(backend string, logger log.Logger) (metrics.Recorder, error) {
switch backend {
case prometheusBackend:
logger.Infof("using Prometheus metrics recorder")
return createPrometheusRecorder(logger), nil
}

return nil, fmt.Errorf("wrong metrics backend")
return rec
}

func run() error {
Expand Down Expand Up @@ -136,15 +126,11 @@ func run() error {
}

// Create the controller that will refresh every 30 seconds.
m, err := getMetricRecorder(metricsBackend, logger)
if err != nil {
return fmt.Errorf("errors getting metrics backend: %w", err)
}
cfg := &controller.Config{
Name: "metricsControllerTest",
Handler: hand,
Retriever: retr,
MetricRecorder: m,
MetricsRecorder: createPrometheusRecorder(logger),
Logger: logger,
ProcessingJobRetries: 3,
}
Expand Down
114 changes: 114 additions & 0 deletions metrics/prometheus/prometheus.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
package prometheus

import (
"context"
"time"

"github.com/prometheus/client_golang/prometheus"
"github.com/spotahome/kooper/controller"
)

const (
promNamespace = "kooper"
promControllerSubsystem = "controller"
)

// Config is the Recorder Config.
type Config struct {
// Registerer is a prometheus registerer, e.g: prometheus.Registry.
// By default will use Prometheus default registry.
Registerer prometheus.Registerer
// Buckets sets custom buckets for the duration/latency metrics. This should be used when
// the default buckets don't work. This could happen when the time to process an event is not on the
// range of 5ms-10s duration.
// Check https://godoc.org/github.com/prometheus/client_golang/prometheus#pkg-variables
Buckets []float64
}

func (c *Config) defaults() {
if c.Registerer == nil {
c.Registerer = prometheus.DefaultRegisterer
}

if c.Buckets == nil || len(c.Buckets) == 0 {
c.Buckets = prometheus.DefBuckets
}
}

// Recorder implements the metrics recording in a prometheus registry.
type Recorder struct {
// Metrics
queuedEvents *prometheus.CounterVec
processedEvents *prometheus.CounterVec
processedEventErrors *prometheus.CounterVec
processedEventDuration *prometheus.HistogramVec
}

// New returns a new Prometheus implementaiton for a metrics recorder.
func New(cfg Config) *Recorder {
cfg.defaults()

r := &Recorder{
queuedEvents: prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: promNamespace,
Subsystem: promControllerSubsystem,
Name: "queued_events_total",
Help: "Total number of events queued.",
}, []string{"controller", "type"}),

processedEvents: prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: promNamespace,
Subsystem: promControllerSubsystem,
Name: "processed_events_total",
Help: "Total number of successfuly processed events.",
}, []string{"controller", "type"}),

processedEventErrors: prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: promNamespace,
Subsystem: promControllerSubsystem,
Name: "processed_event_errors_total",
Help: "Total number of errors processing events.",
}, []string{"controller", "type"}),

processedEventDuration: prometheus.NewHistogramVec(prometheus.HistogramOpts{
Namespace: promNamespace,
Subsystem: promControllerSubsystem,
Name: "processed_event_duration_seconds",
Help: "The duration for a successful event to be processed.",
Buckets: cfg.Buckets,
}, []string{"controller", "type"}),
}

// Register metrics.
cfg.Registerer.MustRegister(
r.queuedEvents,
r.processedEvents,
r.processedEventErrors,
r.processedEventDuration)

return r
}

// IncResourceEventQueued satisfies metrics.Recorder interface.
func (r Recorder) IncResourceEventQueued(_ context.Context, controller string, eventType controller.EventType) {
r.queuedEvents.WithLabelValues(controller, string(eventType)).Inc()
}

// IncResourceEventProcessed satisfies metrics.Recorder interface.
func (r Recorder) IncResourceEventProcessed(_ context.Context, controller string, eventType controller.EventType) {
r.processedEvents.WithLabelValues(controller, string(eventType)).Inc()
}

// IncResourceEventProcessedError satisfies metrics.Recorder interface.
func (r Recorder) IncResourceEventProcessedError(_ context.Context, controller string, eventType controller.EventType) {
r.processedEventErrors.WithLabelValues(controller, string(eventType)).Inc()
}

// ObserveDurationResourceEventProcessed satisfies metrics.Recorder interface.
func (r Recorder) ObserveDurationResourceEventProcessed(_ context.Context, controller string, eventType controller.EventType, start time.Time) {
secs := time.Now().Sub(start).Seconds()
r.processedEventDuration.WithLabelValues(controller, string(eventType)).Observe(secs)
}

// Check we implement all the required metrics recorder interfaces.
var _ controller.MetricsRecorder = &Recorder{}
Loading