Skip to content

Commit

Permalink
routing: measure CreateFilter latency (zalando#3115)
Browse files Browse the repository at this point in the history
Add `MeasureFilterCreate` method to `Metrics` interface similar to
existing `MeasureFilterRequest` and `MeasureFilterResponse`.

The alternative would be to use `MeasureSince` but then filter name
would have to be embedded into metric key.

Signed-off-by: Alexander Yastrebov <[email protected]>
  • Loading branch information
AlexanderYastrebov authored and Janardhan Sharma committed Jul 19, 2024
1 parent 63e499e commit deeea50
Show file tree
Hide file tree
Showing 10 changed files with 137 additions and 16 deletions.
22 changes: 22 additions & 0 deletions metrics/all_kind.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,71 +23,93 @@ func (a *All) MeasureSince(key string, start time.Time) {
a.prometheus.MeasureSince(key, start)
a.codaHale.MeasureSince(key, start)
}

func (a *All) IncCounter(key string) {
a.prometheus.IncCounter(key)
a.codaHale.IncCounter(key)
}

func (a *All) IncCounterBy(key string, value int64) {
a.prometheus.IncCounterBy(key, value)
a.codaHale.IncCounterBy(key, value)
}

func (a *All) IncFloatCounterBy(key string, value float64) {
a.prometheus.IncFloatCounterBy(key, value)
a.codaHale.IncFloatCounterBy(key, value)
}

func (a *All) UpdateGauge(key string, v float64) {
a.prometheus.UpdateGauge(key, v)
a.codaHale.UpdateGauge(key, v)
}

func (a *All) MeasureRouteLookup(start time.Time) {
a.prometheus.MeasureRouteLookup(start)
a.codaHale.MeasureRouteLookup(start)
}

func (a *All) MeasureFilterCreate(filterName string, start time.Time) {
a.prometheus.MeasureFilterCreate(filterName, start)
a.codaHale.MeasureFilterCreate(filterName, start)
}

func (a *All) MeasureFilterRequest(filterName string, start time.Time) {
a.prometheus.MeasureFilterRequest(filterName, start)
a.codaHale.MeasureFilterRequest(filterName, start)
}

func (a *All) MeasureAllFiltersRequest(routeId string, start time.Time) {
a.prometheus.MeasureAllFiltersRequest(routeId, start)
a.codaHale.MeasureAllFiltersRequest(routeId, start)
}

func (a *All) MeasureBackend(routeId string, start time.Time) {
a.prometheus.MeasureBackend(routeId, start)
a.codaHale.MeasureBackend(routeId, start)
}

func (a *All) MeasureBackendHost(routeBackendHost string, start time.Time) {
a.prometheus.MeasureBackendHost(routeBackendHost, start)
a.codaHale.MeasureBackendHost(routeBackendHost, start)
}

func (a *All) MeasureFilterResponse(filterName string, start time.Time) {
a.prometheus.MeasureFilterResponse(filterName, start)
a.codaHale.MeasureFilterResponse(filterName, start)
}

func (a *All) MeasureAllFiltersResponse(routeId string, start time.Time) {
a.prometheus.MeasureAllFiltersResponse(routeId, start)
a.codaHale.MeasureAllFiltersResponse(routeId, start)
}

func (a *All) MeasureResponse(code int, method string, routeId string, start time.Time) {
a.prometheus.MeasureResponse(code, method, routeId, start)
a.codaHale.MeasureResponse(code, method, routeId, start)
}

func (a *All) MeasureServe(routeId, host, method string, code int, start time.Time) {
a.prometheus.MeasureServe(routeId, host, method, code, start)
a.codaHale.MeasureServe(routeId, host, method, code, start)
}

func (a *All) IncRoutingFailures() {
a.prometheus.IncRoutingFailures()
a.codaHale.IncRoutingFailures()
}

func (a *All) IncErrorsBackend(routeId string) {
a.prometheus.IncErrorsBackend(routeId)
a.codaHale.IncErrorsBackend(routeId)
}

func (a *All) MeasureBackend5xx(t time.Time) {
a.prometheus.MeasureBackend5xx(t)
a.codaHale.MeasureBackend5xx(t)

}

func (a *All) IncErrorsStreaming(routeId string) {
a.prometheus.IncErrorsStreaming(routeId)
a.codaHale.IncErrorsStreaming(routeId)
Expand Down
5 changes: 5 additions & 0 deletions metrics/codahale.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
const (
KeyRouteLookup = "routelookup"
KeyRouteFailure = "routefailure"
KeyFilterCreate = "filter.%s.create"
KeyFilterRequest = "filter.%s.request"
KeyFiltersRequest = "allfilters.request.%s"
KeyAllFiltersRequestCombined = "allfilters.combined.request"
Expand Down Expand Up @@ -131,6 +132,10 @@ func (c *CodaHale) MeasureRouteLookup(start time.Time) {
c.measureSince(KeyRouteLookup, start)
}

func (c *CodaHale) MeasureFilterCreate(filterName string, start time.Time) {
c.measureSince(fmt.Sprintf(KeyFilterCreate, filterName), start)
}

func (c *CodaHale) MeasureFilterRequest(filterName string, start time.Time) {
c.measureSince(fmt.Sprintf(KeyFilterRequest, filterName), start)
}
Expand Down
1 change: 1 addition & 0 deletions metrics/codahale_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,7 @@ type proxyMetricTest struct {
var proxyMetricsTests = []proxyMetricTest{
// T1 - Measure routing
{KeyRouteLookup, func(m Metrics) { m.MeasureRouteLookup(time.Now()) }},
{fmt.Sprintf(KeyFilterCreate, "afilter"), func(m Metrics) { m.MeasureFilterCreate("afilter", time.Now()) }},
// T2 - Measure filter request
{fmt.Sprintf(KeyFilterRequest, "foo"), func(m Metrics) { m.MeasureFilterRequest("foo", time.Now()) }},
// T3 - Measure all filters request
Expand Down
1 change: 1 addition & 0 deletions metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ type Metrics interface {
IncFloatCounterBy(key string, value float64)
// Additional methods
MeasureRouteLookup(start time.Time)
MeasureFilterCreate(filterName string, start time.Time)
MeasureFilterRequest(filterName string, start time.Time)
MeasureAllFiltersRequest(routeId string, start time.Time)
MeasureBackend(routeId string, start time.Time)
Expand Down
8 changes: 8 additions & 0 deletions metrics/metricstest/metricsmock.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package metricstest

import (
"fmt"
"net/http"
"sync"
"time"
Expand Down Expand Up @@ -112,6 +113,13 @@ func (m *MockMetrics) MeasureRouteLookup(start time.Time) {
// implement me
}

func (m *MockMetrics) MeasureFilterCreate(filterName string, start time.Time) {
key := fmt.Sprintf("%sfilter.%s.create", m.Prefix, filterName)
m.WithMeasures(func(measures map[string][]time.Duration) {
measures[key] = append(m.measures[key], time.Since(start))
})
}

func (m *MockMetrics) MeasureFilterRequest(filterName string, start time.Time) {
// implement me
}
Expand Down
15 changes: 15 additions & 0 deletions metrics/prometheus.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ type Prometheus struct {
routeLookupM *prometheus.HistogramVec
routeErrorsM *prometheus.CounterVec
responseM *prometheus.HistogramVec
filterCreateM *prometheus.HistogramVec
filterRequestM *prometheus.HistogramVec
filterAllRequestM *prometheus.HistogramVec
filterAllCombinedRequestM *prometheus.HistogramVec
Expand Down Expand Up @@ -86,6 +87,14 @@ func NewPrometheus(opts Options) *Prometheus {
Buckets: opts.HistogramBuckets,
}, []string{"code", "method", "route"})

filterCreate := prometheus.NewHistogramVec(prometheus.HistogramOpts{
Namespace: namespace,
Subsystem: promFilterSubsystem,
Name: "create_duration_seconds",
Help: "Duration in seconds of filter creation.",
Buckets: opts.HistogramBuckets,
}, []string{"filter"})

filterRequest := prometheus.NewHistogramVec(prometheus.HistogramOpts{
Namespace: namespace,
Subsystem: promFilterSubsystem,
Expand Down Expand Up @@ -229,6 +238,7 @@ func NewPrometheus(opts Options) *Prometheus {
routeLookupM: routeLookup,
routeErrorsM: routeErrors,
responseM: response,
filterCreateM: filterCreate,
filterRequestM: filterRequest,
filterAllRequestM: filterAllRequest,
filterAllCombinedRequestM: filterAllCombinedRequest,
Expand Down Expand Up @@ -352,6 +362,11 @@ func (p *Prometheus) MeasureRouteLookup(start time.Time) {
p.routeLookupM.WithLabelValues().Observe(t)
}

func (p *Prometheus) MeasureFilterCreate(filterName string, start time.Time) {
t := p.sinceS(start)
p.filterCreateM.WithLabelValues(filterName).Observe(t)
}

// MeasureFilterRequest satisfies Metrics interface.
func (p *Prometheus) MeasureFilterRequest(filterName string, start time.Time) {
t := p.sinceS(start)
Expand Down
25 changes: 16 additions & 9 deletions routing/datasource.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,8 +208,8 @@ func splitBackend(r *eskip.Route) (string, string, error) {

// creates a filter instance based on its definition and its
// specification in the filter registry.
func createFilter(fr filters.Registry, def *eskip.Filter, cpm map[string]PredicateSpec) (filters.Filter, error) {
spec, ok := fr[def.Name]
func createFilter(o *Options, def *eskip.Filter, cpm map[string]PredicateSpec) (filters.Filter, error) {
spec, ok := o.FilterRegistry[def.Name]
if !ok {
if isTreePredicate(def.Name) || def.Name == predicates.HostName || def.Name == predicates.PathRegexpName || def.Name == predicates.MethodName || def.Name == predicates.HeaderName || def.Name == predicates.HeaderRegexpName {
return nil, fmt.Errorf("trying to use %q as filter, but it is only available as predicate", def.Name)
Expand All @@ -222,7 +222,14 @@ func createFilter(fr filters.Registry, def *eskip.Filter, cpm map[string]Predica
return nil, fmt.Errorf("filter %q not found", def.Name)
}

start := time.Now()

f, err := spec.CreateFilter(def.Args)

if o.Metrics != nil { // measure regardless of the error
o.Metrics.MeasureFilterCreate(def.Name, start)
}

if err != nil {
return nil, fmt.Errorf("failed to create filter %q: %w", spec.Name(), err)
}
Expand All @@ -231,10 +238,10 @@ func createFilter(fr filters.Registry, def *eskip.Filter, cpm map[string]Predica

// creates filter instances based on their definition
// and the filter registry.
func createFilters(fr filters.Registry, defs []*eskip.Filter, cpm map[string]PredicateSpec) ([]*RouteFilter, error) {
func createFilters(o *Options, defs []*eskip.Filter, cpm map[string]PredicateSpec) ([]*RouteFilter, error) {
fs := make([]*RouteFilter, 0, len(defs))
for i, def := range defs {
f, err := createFilter(fr, def, cpm)
f, err := createFilter(o, def, cpm)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -456,13 +463,13 @@ func processTreePredicates(r *Route, predicateList []*eskip.Predicate) error {
}

// processes a route definition for the routing table
func processRouteDef(cpm map[string]PredicateSpec, fr filters.Registry, def *eskip.Route) (*Route, error) {
func processRouteDef(o *Options, cpm map[string]PredicateSpec, def *eskip.Route) (*Route, error) {
scheme, host, err := splitBackend(def)
if err != nil {
return nil, err
}

fs, err := createFilters(fr, def.Filters, cpm)
fs, err := createFilters(o, def.Filters, cpm)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -496,10 +503,10 @@ func mapPredicates(cps []PredicateSpec) map[string]PredicateSpec {
}

// processes a set of route definitions for the routing table
func processRouteDefs(o Options, fr filters.Registry, defs []*eskip.Route) (routes []*Route, invalidDefs []*eskip.Route) {
func processRouteDefs(o *Options, defs []*eskip.Route) (routes []*Route, invalidDefs []*eskip.Route) {
cpm := mapPredicates(o.Predicates)
for _, def := range defs {
route, err := processRouteDef(cpm, fr, def)
route, err := processRouteDef(o, cpm, def)
if err == nil {
routes = append(routes, route)
} else {
Expand Down Expand Up @@ -557,7 +564,7 @@ func receiveRouteMatcher(o Options, out chan<- *routeTable, quit <-chan struct{}
defs = o.PreProcessors[i].Do(defs)
}

routes, invalidRoutes := processRouteDefs(o, o.FilterRegistry, defs)
routes, invalidRoutes := processRouteDefs(&o, defs)

for i := range o.PostProcessors {
routes = o.PostProcessors[i].Do(routes)
Expand Down
66 changes: 63 additions & 3 deletions routing/datasource_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,12 @@ import (
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/zalando/skipper/filters"
"github.com/zalando/skipper/filters/builtin"
"github.com/zalando/skipper/logging"
"github.com/zalando/skipper/logging/loggingtest"
"github.com/zalando/skipper/metrics/metricstest"
"github.com/zalando/skipper/predicates/primitive"
"github.com/zalando/skipper/predicates/query"
"github.com/zalando/skipper/routing"
Expand Down Expand Up @@ -56,10 +58,12 @@ func TestNoMultipleTreePredicates(t *testing.T) {
}

erred := false
o := &routing.Options{
FilterRegistry: make(filters.Registry),
}
pr := make(map[string]routing.PredicateSpec)
fr := make(filters.Registry)
for _, d := range defs {
if _, err := routing.ExportProcessRouteDef(pr, fr, d); err != nil {
if _, err := routing.ExportProcessRouteDef(o, pr, d); err != nil {
erred = true
break
}
Expand Down Expand Up @@ -119,8 +123,11 @@ func TestProcessRouteDefErrors(t *testing.T) {
}
fr := make(filters.Registry)
fr.Register(builtin.NewSetPath())
o := &routing.Options{
FilterRegistry: fr,
}
for _, d := range defs {
_, err := routing.ExportProcessRouteDef(pr, fr, d)
_, err := routing.ExportProcessRouteDef(o, pr, d)
if err == nil || err.Error() != ti.err {
t.Errorf("expected error '%s'. Got: '%s'", ti.err, err)
}
Expand Down Expand Up @@ -298,6 +305,41 @@ func TestLogging(t *testing.T) {
})
}

func TestMetrics(t *testing.T) {
t.Run("create filter latency", func(t *testing.T) {
client, err := testdataclient.NewDoc(`
r0: * -> slowCreate("100ms") -> slowCreate("200ms") -> slowCreate("100ms") -> <shunt>;
`)
if err != nil {
t.Fatal(err)
}
defer client.Close()

metrics := &metricstest.MockMetrics{
Now: time.Now(),
}
fr := make(filters.Registry)
fr.Register(slowCreateSpec{})

r := routing.New(routing.Options{
DataClients: []routing.DataClient{client},
FilterRegistry: fr,
Metrics: metrics,
SignalFirstLoad: true,
})
defer r.Close()
<-r.FirstLoad()

metrics.WithMeasures(func(m map[string][]time.Duration) {
assert.InEpsilonSlice(t, []time.Duration{
100 * time.Millisecond,
200 * time.Millisecond,
100 * time.Millisecond,
}, m["filter.slowCreate.create"], 0.1)
})
})
}

type weightedPredicateSpec struct {
name string
weight int
Expand All @@ -319,3 +361,21 @@ func (w weightedPredicateSpec) Create([]interface{}) (routing.Predicate, error)
func (w weightedPredicateSpec) Weight() int {
return w.weight
}

type (
slowCreateSpec struct{}
slowCreateFilter struct{}
)

func (s slowCreateSpec) Name() string { return "slowCreate" }

func (s slowCreateSpec) CreateFilter(args []interface{}) (filters.Filter, error) {
d, _ := time.ParseDuration(args[0].(string))

time.Sleep(d)

return slowCreateFilter{}, nil
}

func (s slowCreateFilter) Request(ctx filters.FilterContext) {}
func (s slowCreateFilter) Response(filters.FilterContext) {}
6 changes: 4 additions & 2 deletions routing/host_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,14 +51,16 @@ func benchmarkPredicateHost(b *testing.B, predicateFmt string) {

ha := host.NewAny()
pr := map[string]routing.PredicateSpec{ha.Name(): ha}
fr := make(filters.Registry)
o := &routing.Options{
FilterRegistry: make(filters.Registry),
}

var routes []*routing.Route
for i := 0; i < R; i++ {
p := strings.ReplaceAll(predicateFmt, "{i}", fmt.Sprintf("%d", i))
def := eskip.MustParse(fmt.Sprintf(`r%d: %s -> <shunt>;`, i, p))

route, err := routing.ExportProcessRouteDef(pr, fr, def[0])
route, err := routing.ExportProcessRouteDef(o, pr, def[0])
if err != nil {
b.Fatal(err)
}
Expand Down
Loading

0 comments on commit deeea50

Please sign in to comment.