From deeea501958d33901666fd8bdd65f0bed62eb34e Mon Sep 17 00:00:00 2001 From: Alexander Yastrebov Date: Wed, 19 Jun 2024 13:47:47 +0200 Subject: [PATCH] routing: measure CreateFilter latency (#3115) Add `MeasureFilterCreate` method to `Metrics` interface similar to existing `MeasureFilterRequest` and `MeasureFilterResponse`. The alternative would be to use `MeasureSince` but then filter name would have to be embedded into metric key. Signed-off-by: Alexander Yastrebov --- metrics/all_kind.go | 22 ++++++++++ metrics/codahale.go | 5 +++ metrics/codahale_test.go | 1 + metrics/metrics.go | 1 + metrics/metricstest/metricsmock.go | 8 ++++ metrics/prometheus.go | 15 +++++++ routing/datasource.go | 25 +++++++---- routing/datasource_test.go | 66 ++++++++++++++++++++++++++++-- routing/host_test.go | 6 ++- routing/matcher_test.go | 4 +- 10 files changed, 137 insertions(+), 16 deletions(-) diff --git a/metrics/all_kind.go b/metrics/all_kind.go index 2eb2cebeb8..33a33eae95 100644 --- a/metrics/all_kind.go +++ b/metrics/all_kind.go @@ -23,71 +23,93 @@ func (a *All) MeasureSince(key string, start time.Time) { a.prometheus.MeasureSince(key, start) a.codaHale.MeasureSince(key, start) } + func (a *All) IncCounter(key string) { a.prometheus.IncCounter(key) a.codaHale.IncCounter(key) } + func (a *All) IncCounterBy(key string, value int64) { a.prometheus.IncCounterBy(key, value) a.codaHale.IncCounterBy(key, value) } + func (a *All) IncFloatCounterBy(key string, value float64) { a.prometheus.IncFloatCounterBy(key, value) a.codaHale.IncFloatCounterBy(key, value) } + func (a *All) UpdateGauge(key string, v float64) { a.prometheus.UpdateGauge(key, v) a.codaHale.UpdateGauge(key, v) } + func (a *All) MeasureRouteLookup(start time.Time) { a.prometheus.MeasureRouteLookup(start) a.codaHale.MeasureRouteLookup(start) } + +func (a *All) MeasureFilterCreate(filterName string, start time.Time) { + a.prometheus.MeasureFilterCreate(filterName, start) + a.codaHale.MeasureFilterCreate(filterName, start) +} + func (a *All) MeasureFilterRequest(filterName string, start time.Time) { a.prometheus.MeasureFilterRequest(filterName, start) a.codaHale.MeasureFilterRequest(filterName, start) } + func (a *All) MeasureAllFiltersRequest(routeId string, start time.Time) { a.prometheus.MeasureAllFiltersRequest(routeId, start) a.codaHale.MeasureAllFiltersRequest(routeId, start) } + func (a *All) MeasureBackend(routeId string, start time.Time) { a.prometheus.MeasureBackend(routeId, start) a.codaHale.MeasureBackend(routeId, start) } + func (a *All) MeasureBackendHost(routeBackendHost string, start time.Time) { a.prometheus.MeasureBackendHost(routeBackendHost, start) a.codaHale.MeasureBackendHost(routeBackendHost, start) } + func (a *All) MeasureFilterResponse(filterName string, start time.Time) { a.prometheus.MeasureFilterResponse(filterName, start) a.codaHale.MeasureFilterResponse(filterName, start) } + func (a *All) MeasureAllFiltersResponse(routeId string, start time.Time) { a.prometheus.MeasureAllFiltersResponse(routeId, start) a.codaHale.MeasureAllFiltersResponse(routeId, start) } + func (a *All) MeasureResponse(code int, method string, routeId string, start time.Time) { a.prometheus.MeasureResponse(code, method, routeId, start) a.codaHale.MeasureResponse(code, method, routeId, start) } + func (a *All) MeasureServe(routeId, host, method string, code int, start time.Time) { a.prometheus.MeasureServe(routeId, host, method, code, start) a.codaHale.MeasureServe(routeId, host, method, code, start) } + func (a *All) IncRoutingFailures() { a.prometheus.IncRoutingFailures() a.codaHale.IncRoutingFailures() } + func (a *All) IncErrorsBackend(routeId string) { a.prometheus.IncErrorsBackend(routeId) a.codaHale.IncErrorsBackend(routeId) } + func (a *All) MeasureBackend5xx(t time.Time) { a.prometheus.MeasureBackend5xx(t) a.codaHale.MeasureBackend5xx(t) } + func (a *All) IncErrorsStreaming(routeId string) { a.prometheus.IncErrorsStreaming(routeId) a.codaHale.IncErrorsStreaming(routeId) diff --git a/metrics/codahale.go b/metrics/codahale.go index 0b7fa8241d..cfcc897e22 100644 --- a/metrics/codahale.go +++ b/metrics/codahale.go @@ -14,6 +14,7 @@ import ( const ( KeyRouteLookup = "routelookup" KeyRouteFailure = "routefailure" + KeyFilterCreate = "filter.%s.create" KeyFilterRequest = "filter.%s.request" KeyFiltersRequest = "allfilters.request.%s" KeyAllFiltersRequestCombined = "allfilters.combined.request" @@ -131,6 +132,10 @@ func (c *CodaHale) MeasureRouteLookup(start time.Time) { c.measureSince(KeyRouteLookup, start) } +func (c *CodaHale) MeasureFilterCreate(filterName string, start time.Time) { + c.measureSince(fmt.Sprintf(KeyFilterCreate, filterName), start) +} + func (c *CodaHale) MeasureFilterRequest(filterName string, start time.Time) { c.measureSince(fmt.Sprintf(KeyFilterRequest, filterName), start) } diff --git a/metrics/codahale_test.go b/metrics/codahale_test.go index 7f7fd6e8a3..916b709c99 100644 --- a/metrics/codahale_test.go +++ b/metrics/codahale_test.go @@ -151,6 +151,7 @@ type proxyMetricTest struct { var proxyMetricsTests = []proxyMetricTest{ // T1 - Measure routing {KeyRouteLookup, func(m Metrics) { m.MeasureRouteLookup(time.Now()) }}, + {fmt.Sprintf(KeyFilterCreate, "afilter"), func(m Metrics) { m.MeasureFilterCreate("afilter", time.Now()) }}, // T2 - Measure filter request {fmt.Sprintf(KeyFilterRequest, "foo"), func(m Metrics) { m.MeasureFilterRequest("foo", time.Now()) }}, // T3 - Measure all filters request diff --git a/metrics/metrics.go b/metrics/metrics.go index 616706c352..c6b79c4da6 100644 --- a/metrics/metrics.go +++ b/metrics/metrics.go @@ -62,6 +62,7 @@ type Metrics interface { IncFloatCounterBy(key string, value float64) // Additional methods MeasureRouteLookup(start time.Time) + MeasureFilterCreate(filterName string, start time.Time) MeasureFilterRequest(filterName string, start time.Time) MeasureAllFiltersRequest(routeId string, start time.Time) MeasureBackend(routeId string, start time.Time) diff --git a/metrics/metricstest/metricsmock.go b/metrics/metricstest/metricsmock.go index a61b59b161..f08183ebbc 100644 --- a/metrics/metricstest/metricsmock.go +++ b/metrics/metricstest/metricsmock.go @@ -1,6 +1,7 @@ package metricstest import ( + "fmt" "net/http" "sync" "time" @@ -112,6 +113,13 @@ func (m *MockMetrics) MeasureRouteLookup(start time.Time) { // implement me } +func (m *MockMetrics) MeasureFilterCreate(filterName string, start time.Time) { + key := fmt.Sprintf("%sfilter.%s.create", m.Prefix, filterName) + m.WithMeasures(func(measures map[string][]time.Duration) { + measures[key] = append(m.measures[key], time.Since(start)) + }) +} + func (m *MockMetrics) MeasureFilterRequest(filterName string, start time.Time) { // implement me } diff --git a/metrics/prometheus.go b/metrics/prometheus.go index e9af8b3c1e..a013d0a259 100644 --- a/metrics/prometheus.go +++ b/metrics/prometheus.go @@ -30,6 +30,7 @@ type Prometheus struct { routeLookupM *prometheus.HistogramVec routeErrorsM *prometheus.CounterVec responseM *prometheus.HistogramVec + filterCreateM *prometheus.HistogramVec filterRequestM *prometheus.HistogramVec filterAllRequestM *prometheus.HistogramVec filterAllCombinedRequestM *prometheus.HistogramVec @@ -86,6 +87,14 @@ func NewPrometheus(opts Options) *Prometheus { Buckets: opts.HistogramBuckets, }, []string{"code", "method", "route"}) + filterCreate := prometheus.NewHistogramVec(prometheus.HistogramOpts{ + Namespace: namespace, + Subsystem: promFilterSubsystem, + Name: "create_duration_seconds", + Help: "Duration in seconds of filter creation.", + Buckets: opts.HistogramBuckets, + }, []string{"filter"}) + filterRequest := prometheus.NewHistogramVec(prometheus.HistogramOpts{ Namespace: namespace, Subsystem: promFilterSubsystem, @@ -229,6 +238,7 @@ func NewPrometheus(opts Options) *Prometheus { routeLookupM: routeLookup, routeErrorsM: routeErrors, responseM: response, + filterCreateM: filterCreate, filterRequestM: filterRequest, filterAllRequestM: filterAllRequest, filterAllCombinedRequestM: filterAllCombinedRequest, @@ -352,6 +362,11 @@ func (p *Prometheus) MeasureRouteLookup(start time.Time) { p.routeLookupM.WithLabelValues().Observe(t) } +func (p *Prometheus) MeasureFilterCreate(filterName string, start time.Time) { + t := p.sinceS(start) + p.filterCreateM.WithLabelValues(filterName).Observe(t) +} + // MeasureFilterRequest satisfies Metrics interface. func (p *Prometheus) MeasureFilterRequest(filterName string, start time.Time) { t := p.sinceS(start) diff --git a/routing/datasource.go b/routing/datasource.go index 11967d7945..4f87c6edb8 100644 --- a/routing/datasource.go +++ b/routing/datasource.go @@ -208,8 +208,8 @@ func splitBackend(r *eskip.Route) (string, string, error) { // creates a filter instance based on its definition and its // specification in the filter registry. -func createFilter(fr filters.Registry, def *eskip.Filter, cpm map[string]PredicateSpec) (filters.Filter, error) { - spec, ok := fr[def.Name] +func createFilter(o *Options, def *eskip.Filter, cpm map[string]PredicateSpec) (filters.Filter, error) { + spec, ok := o.FilterRegistry[def.Name] if !ok { if isTreePredicate(def.Name) || def.Name == predicates.HostName || def.Name == predicates.PathRegexpName || def.Name == predicates.MethodName || def.Name == predicates.HeaderName || def.Name == predicates.HeaderRegexpName { return nil, fmt.Errorf("trying to use %q as filter, but it is only available as predicate", def.Name) @@ -222,7 +222,14 @@ func createFilter(fr filters.Registry, def *eskip.Filter, cpm map[string]Predica return nil, fmt.Errorf("filter %q not found", def.Name) } + start := time.Now() + f, err := spec.CreateFilter(def.Args) + + if o.Metrics != nil { // measure regardless of the error + o.Metrics.MeasureFilterCreate(def.Name, start) + } + if err != nil { return nil, fmt.Errorf("failed to create filter %q: %w", spec.Name(), err) } @@ -231,10 +238,10 @@ func createFilter(fr filters.Registry, def *eskip.Filter, cpm map[string]Predica // creates filter instances based on their definition // and the filter registry. -func createFilters(fr filters.Registry, defs []*eskip.Filter, cpm map[string]PredicateSpec) ([]*RouteFilter, error) { +func createFilters(o *Options, defs []*eskip.Filter, cpm map[string]PredicateSpec) ([]*RouteFilter, error) { fs := make([]*RouteFilter, 0, len(defs)) for i, def := range defs { - f, err := createFilter(fr, def, cpm) + f, err := createFilter(o, def, cpm) if err != nil { return nil, err } @@ -456,13 +463,13 @@ func processTreePredicates(r *Route, predicateList []*eskip.Predicate) error { } // processes a route definition for the routing table -func processRouteDef(cpm map[string]PredicateSpec, fr filters.Registry, def *eskip.Route) (*Route, error) { +func processRouteDef(o *Options, cpm map[string]PredicateSpec, def *eskip.Route) (*Route, error) { scheme, host, err := splitBackend(def) if err != nil { return nil, err } - fs, err := createFilters(fr, def.Filters, cpm) + fs, err := createFilters(o, def.Filters, cpm) if err != nil { return nil, err } @@ -496,10 +503,10 @@ func mapPredicates(cps []PredicateSpec) map[string]PredicateSpec { } // processes a set of route definitions for the routing table -func processRouteDefs(o Options, fr filters.Registry, defs []*eskip.Route) (routes []*Route, invalidDefs []*eskip.Route) { +func processRouteDefs(o *Options, defs []*eskip.Route) (routes []*Route, invalidDefs []*eskip.Route) { cpm := mapPredicates(o.Predicates) for _, def := range defs { - route, err := processRouteDef(cpm, fr, def) + route, err := processRouteDef(o, cpm, def) if err == nil { routes = append(routes, route) } else { @@ -557,7 +564,7 @@ func receiveRouteMatcher(o Options, out chan<- *routeTable, quit <-chan struct{} defs = o.PreProcessors[i].Do(defs) } - routes, invalidRoutes := processRouteDefs(o, o.FilterRegistry, defs) + routes, invalidRoutes := processRouteDefs(&o, defs) for i := range o.PostProcessors { routes = o.PostProcessors[i].Do(routes) diff --git a/routing/datasource_test.go b/routing/datasource_test.go index 57eaacbe93..9e3f39013a 100644 --- a/routing/datasource_test.go +++ b/routing/datasource_test.go @@ -5,10 +5,12 @@ import ( "testing" "time" + "github.com/stretchr/testify/assert" "github.com/zalando/skipper/filters" "github.com/zalando/skipper/filters/builtin" "github.com/zalando/skipper/logging" "github.com/zalando/skipper/logging/loggingtest" + "github.com/zalando/skipper/metrics/metricstest" "github.com/zalando/skipper/predicates/primitive" "github.com/zalando/skipper/predicates/query" "github.com/zalando/skipper/routing" @@ -56,10 +58,12 @@ func TestNoMultipleTreePredicates(t *testing.T) { } erred := false + o := &routing.Options{ + FilterRegistry: make(filters.Registry), + } pr := make(map[string]routing.PredicateSpec) - fr := make(filters.Registry) for _, d := range defs { - if _, err := routing.ExportProcessRouteDef(pr, fr, d); err != nil { + if _, err := routing.ExportProcessRouteDef(o, pr, d); err != nil { erred = true break } @@ -119,8 +123,11 @@ func TestProcessRouteDefErrors(t *testing.T) { } fr := make(filters.Registry) fr.Register(builtin.NewSetPath()) + o := &routing.Options{ + FilterRegistry: fr, + } for _, d := range defs { - _, err := routing.ExportProcessRouteDef(pr, fr, d) + _, err := routing.ExportProcessRouteDef(o, pr, d) if err == nil || err.Error() != ti.err { t.Errorf("expected error '%s'. Got: '%s'", ti.err, err) } @@ -298,6 +305,41 @@ func TestLogging(t *testing.T) { }) } +func TestMetrics(t *testing.T) { + t.Run("create filter latency", func(t *testing.T) { + client, err := testdataclient.NewDoc(` + r0: * -> slowCreate("100ms") -> slowCreate("200ms") -> slowCreate("100ms") -> ; + `) + if err != nil { + t.Fatal(err) + } + defer client.Close() + + metrics := &metricstest.MockMetrics{ + Now: time.Now(), + } + fr := make(filters.Registry) + fr.Register(slowCreateSpec{}) + + r := routing.New(routing.Options{ + DataClients: []routing.DataClient{client}, + FilterRegistry: fr, + Metrics: metrics, + SignalFirstLoad: true, + }) + defer r.Close() + <-r.FirstLoad() + + metrics.WithMeasures(func(m map[string][]time.Duration) { + assert.InEpsilonSlice(t, []time.Duration{ + 100 * time.Millisecond, + 200 * time.Millisecond, + 100 * time.Millisecond, + }, m["filter.slowCreate.create"], 0.1) + }) + }) +} + type weightedPredicateSpec struct { name string weight int @@ -319,3 +361,21 @@ func (w weightedPredicateSpec) Create([]interface{}) (routing.Predicate, error) func (w weightedPredicateSpec) Weight() int { return w.weight } + +type ( + slowCreateSpec struct{} + slowCreateFilter struct{} +) + +func (s slowCreateSpec) Name() string { return "slowCreate" } + +func (s slowCreateSpec) CreateFilter(args []interface{}) (filters.Filter, error) { + d, _ := time.ParseDuration(args[0].(string)) + + time.Sleep(d) + + return slowCreateFilter{}, nil +} + +func (s slowCreateFilter) Request(ctx filters.FilterContext) {} +func (s slowCreateFilter) Response(filters.FilterContext) {} diff --git a/routing/host_test.go b/routing/host_test.go index 3fc0a835d4..9fc9a5918d 100644 --- a/routing/host_test.go +++ b/routing/host_test.go @@ -51,14 +51,16 @@ func benchmarkPredicateHost(b *testing.B, predicateFmt string) { ha := host.NewAny() pr := map[string]routing.PredicateSpec{ha.Name(): ha} - fr := make(filters.Registry) + o := &routing.Options{ + FilterRegistry: make(filters.Registry), + } var routes []*routing.Route for i := 0; i < R; i++ { p := strings.ReplaceAll(predicateFmt, "{i}", fmt.Sprintf("%d", i)) def := eskip.MustParse(fmt.Sprintf(`r%d: %s -> ;`, i, p)) - route, err := routing.ExportProcessRouteDef(pr, fr, def[0]) + route, err := routing.ExportProcessRouteDef(o, pr, def[0]) if err != nil { b.Fatal(err) } diff --git a/routing/matcher_test.go b/routing/matcher_test.go index 13c0487fc3..c40c33495c 100644 --- a/routing/matcher_test.go +++ b/routing/matcher_test.go @@ -101,7 +101,7 @@ func docToRoutes(doc string) ([]*Route, error) { if err != nil { return nil, err } - routes, _ := processRouteDefs(Options{Predicates: []PredicateSpec{&truePredicate{}}}, nil, defs) + routes, _ := processRouteDefs(&Options{Predicates: []PredicateSpec{&truePredicate{}}}, defs) return routes, nil } @@ -193,7 +193,7 @@ func generateRoutes(paths []string) []*Route { defs[i] = &eskip.Route{Id: fmt.Sprintf("route%d", i), Path: p, Backend: p} } - routes, _ := processRouteDefs(Options{}, nil, defs) + routes, _ := processRouteDefs(&Options{}, defs) return routes }