Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

routing: measure CreateFilter latency #3115

Merged
merged 1 commit into from
Jun 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions metrics/all_kind.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,71 +23,93 @@ func (a *All) MeasureSince(key string, start time.Time) {
a.prometheus.MeasureSince(key, start)
a.codaHale.MeasureSince(key, start)
}

func (a *All) IncCounter(key string) {
a.prometheus.IncCounter(key)
a.codaHale.IncCounter(key)
}

func (a *All) IncCounterBy(key string, value int64) {
a.prometheus.IncCounterBy(key, value)
a.codaHale.IncCounterBy(key, value)
}

func (a *All) IncFloatCounterBy(key string, value float64) {
a.prometheus.IncFloatCounterBy(key, value)
a.codaHale.IncFloatCounterBy(key, value)
}

func (a *All) UpdateGauge(key string, v float64) {
a.prometheus.UpdateGauge(key, v)
a.codaHale.UpdateGauge(key, v)
}

func (a *All) MeasureRouteLookup(start time.Time) {
a.prometheus.MeasureRouteLookup(start)
a.codaHale.MeasureRouteLookup(start)
}

func (a *All) MeasureFilterCreate(filterName string, start time.Time) {
a.prometheus.MeasureFilterCreate(filterName, start)
a.codaHale.MeasureFilterCreate(filterName, start)
}

func (a *All) MeasureFilterRequest(filterName string, start time.Time) {
a.prometheus.MeasureFilterRequest(filterName, start)
a.codaHale.MeasureFilterRequest(filterName, start)
}

func (a *All) MeasureAllFiltersRequest(routeId string, start time.Time) {
a.prometheus.MeasureAllFiltersRequest(routeId, start)
a.codaHale.MeasureAllFiltersRequest(routeId, start)
}

func (a *All) MeasureBackend(routeId string, start time.Time) {
a.prometheus.MeasureBackend(routeId, start)
a.codaHale.MeasureBackend(routeId, start)
}

func (a *All) MeasureBackendHost(routeBackendHost string, start time.Time) {
a.prometheus.MeasureBackendHost(routeBackendHost, start)
a.codaHale.MeasureBackendHost(routeBackendHost, start)
}

func (a *All) MeasureFilterResponse(filterName string, start time.Time) {
a.prometheus.MeasureFilterResponse(filterName, start)
a.codaHale.MeasureFilterResponse(filterName, start)
}

func (a *All) MeasureAllFiltersResponse(routeId string, start time.Time) {
a.prometheus.MeasureAllFiltersResponse(routeId, start)
a.codaHale.MeasureAllFiltersResponse(routeId, start)
}

func (a *All) MeasureResponse(code int, method string, routeId string, start time.Time) {
a.prometheus.MeasureResponse(code, method, routeId, start)
a.codaHale.MeasureResponse(code, method, routeId, start)
}

func (a *All) MeasureServe(routeId, host, method string, code int, start time.Time) {
a.prometheus.MeasureServe(routeId, host, method, code, start)
a.codaHale.MeasureServe(routeId, host, method, code, start)
}

func (a *All) IncRoutingFailures() {
a.prometheus.IncRoutingFailures()
a.codaHale.IncRoutingFailures()
}

func (a *All) IncErrorsBackend(routeId string) {
a.prometheus.IncErrorsBackend(routeId)
a.codaHale.IncErrorsBackend(routeId)
}

func (a *All) MeasureBackend5xx(t time.Time) {
a.prometheus.MeasureBackend5xx(t)
a.codaHale.MeasureBackend5xx(t)

}

func (a *All) IncErrorsStreaming(routeId string) {
a.prometheus.IncErrorsStreaming(routeId)
a.codaHale.IncErrorsStreaming(routeId)
Expand Down
5 changes: 5 additions & 0 deletions metrics/codahale.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
const (
KeyRouteLookup = "routelookup"
KeyRouteFailure = "routefailure"
KeyFilterCreate = "filter.%s.create"
KeyFilterRequest = "filter.%s.request"
KeyFiltersRequest = "allfilters.request.%s"
KeyAllFiltersRequestCombined = "allfilters.combined.request"
Expand Down Expand Up @@ -131,6 +132,10 @@ func (c *CodaHale) MeasureRouteLookup(start time.Time) {
c.measureSince(KeyRouteLookup, start)
}

func (c *CodaHale) MeasureFilterCreate(filterName string, start time.Time) {
c.measureSince(fmt.Sprintf(KeyFilterCreate, filterName), start)
}

func (c *CodaHale) MeasureFilterRequest(filterName string, start time.Time) {
c.measureSince(fmt.Sprintf(KeyFilterRequest, filterName), start)
}
Expand Down
1 change: 1 addition & 0 deletions metrics/codahale_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,7 @@ type proxyMetricTest struct {
var proxyMetricsTests = []proxyMetricTest{
// T1 - Measure routing
{KeyRouteLookup, func(m Metrics) { m.MeasureRouteLookup(time.Now()) }},
{fmt.Sprintf(KeyFilterCreate, "afilter"), func(m Metrics) { m.MeasureFilterCreate("afilter", time.Now()) }},
// T2 - Measure filter request
{fmt.Sprintf(KeyFilterRequest, "foo"), func(m Metrics) { m.MeasureFilterRequest("foo", time.Now()) }},
// T3 - Measure all filters request
Expand Down
1 change: 1 addition & 0 deletions metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ type Metrics interface {
IncFloatCounterBy(key string, value float64)
// Additional methods
MeasureRouteLookup(start time.Time)
MeasureFilterCreate(filterName string, start time.Time)
MeasureFilterRequest(filterName string, start time.Time)
MeasureAllFiltersRequest(routeId string, start time.Time)
MeasureBackend(routeId string, start time.Time)
Expand Down
8 changes: 8 additions & 0 deletions metrics/metricstest/metricsmock.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package metricstest

import (
"fmt"
"net/http"
"sync"
"time"
Expand Down Expand Up @@ -112,6 +113,13 @@ func (m *MockMetrics) MeasureRouteLookup(start time.Time) {
// implement me
}

func (m *MockMetrics) MeasureFilterCreate(filterName string, start time.Time) {
key := fmt.Sprintf("%sfilter.%s.create", m.Prefix, filterName)
m.WithMeasures(func(measures map[string][]time.Duration) {
measures[key] = append(m.measures[key], time.Since(start))
})
}

func (m *MockMetrics) MeasureFilterRequest(filterName string, start time.Time) {
// implement me
}
Expand Down
15 changes: 15 additions & 0 deletions metrics/prometheus.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ type Prometheus struct {
routeLookupM *prometheus.HistogramVec
routeErrorsM *prometheus.CounterVec
responseM *prometheus.HistogramVec
filterCreateM *prometheus.HistogramVec
filterRequestM *prometheus.HistogramVec
filterAllRequestM *prometheus.HistogramVec
filterAllCombinedRequestM *prometheus.HistogramVec
Expand Down Expand Up @@ -86,6 +87,14 @@ func NewPrometheus(opts Options) *Prometheus {
Buckets: opts.HistogramBuckets,
}, []string{"code", "method", "route"})

filterCreate := prometheus.NewHistogramVec(prometheus.HistogramOpts{
Namespace: namespace,
Subsystem: promFilterSubsystem,
Name: "create_duration_seconds",
Help: "Duration in seconds of filter creation.",
Buckets: opts.HistogramBuckets,
}, []string{"filter"})

filterRequest := prometheus.NewHistogramVec(prometheus.HistogramOpts{
Namespace: namespace,
Subsystem: promFilterSubsystem,
Expand Down Expand Up @@ -229,6 +238,7 @@ func NewPrometheus(opts Options) *Prometheus {
routeLookupM: routeLookup,
routeErrorsM: routeErrors,
responseM: response,
filterCreateM: filterCreate,
filterRequestM: filterRequest,
filterAllRequestM: filterAllRequest,
filterAllCombinedRequestM: filterAllCombinedRequest,
Expand Down Expand Up @@ -352,6 +362,11 @@ func (p *Prometheus) MeasureRouteLookup(start time.Time) {
p.routeLookupM.WithLabelValues().Observe(t)
}

func (p *Prometheus) MeasureFilterCreate(filterName string, start time.Time) {
t := p.sinceS(start)
p.filterCreateM.WithLabelValues(filterName).Observe(t)
}

// MeasureFilterRequest satisfies Metrics interface.
func (p *Prometheus) MeasureFilterRequest(filterName string, start time.Time) {
t := p.sinceS(start)
Expand Down
25 changes: 16 additions & 9 deletions routing/datasource.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,8 +208,8 @@ func splitBackend(r *eskip.Route) (string, string, error) {

// creates a filter instance based on its definition and its
// specification in the filter registry.
func createFilter(fr filters.Registry, def *eskip.Filter, cpm map[string]PredicateSpec) (filters.Filter, error) {
spec, ok := fr[def.Name]
func createFilter(o *Options, def *eskip.Filter, cpm map[string]PredicateSpec) (filters.Filter, error) {
spec, ok := o.FilterRegistry[def.Name]
if !ok {
if isTreePredicate(def.Name) || def.Name == predicates.HostName || def.Name == predicates.PathRegexpName || def.Name == predicates.MethodName || def.Name == predicates.HeaderName || def.Name == predicates.HeaderRegexpName {
return nil, fmt.Errorf("trying to use %q as filter, but it is only available as predicate", def.Name)
Expand All @@ -222,7 +222,14 @@ func createFilter(fr filters.Registry, def *eskip.Filter, cpm map[string]Predica
return nil, fmt.Errorf("filter %q not found", def.Name)
}

start := time.Now()

f, err := spec.CreateFilter(def.Args)

if o.Metrics != nil { // measure regardless of the error
o.Metrics.MeasureFilterCreate(def.Name, start)
}

if err != nil {
return nil, fmt.Errorf("failed to create filter %q: %w", spec.Name(), err)
}
Expand All @@ -231,10 +238,10 @@ func createFilter(fr filters.Registry, def *eskip.Filter, cpm map[string]Predica

// creates filter instances based on their definition
// and the filter registry.
func createFilters(fr filters.Registry, defs []*eskip.Filter, cpm map[string]PredicateSpec) ([]*RouteFilter, error) {
func createFilters(o *Options, defs []*eskip.Filter, cpm map[string]PredicateSpec) ([]*RouteFilter, error) {
fs := make([]*RouteFilter, 0, len(defs))
for i, def := range defs {
f, err := createFilter(fr, def, cpm)
f, err := createFilter(o, def, cpm)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -456,13 +463,13 @@ func processTreePredicates(r *Route, predicateList []*eskip.Predicate) error {
}

// processes a route definition for the routing table
func processRouteDef(cpm map[string]PredicateSpec, fr filters.Registry, def *eskip.Route) (*Route, error) {
func processRouteDef(o *Options, cpm map[string]PredicateSpec, def *eskip.Route) (*Route, error) {
scheme, host, err := splitBackend(def)
if err != nil {
return nil, err
}

fs, err := createFilters(fr, def.Filters, cpm)
fs, err := createFilters(o, def.Filters, cpm)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -496,10 +503,10 @@ func mapPredicates(cps []PredicateSpec) map[string]PredicateSpec {
}

// processes a set of route definitions for the routing table
func processRouteDefs(o Options, fr filters.Registry, defs []*eskip.Route) (routes []*Route, invalidDefs []*eskip.Route) {
func processRouteDefs(o *Options, defs []*eskip.Route) (routes []*Route, invalidDefs []*eskip.Route) {
cpm := mapPredicates(o.Predicates)
for _, def := range defs {
route, err := processRouteDef(cpm, fr, def)
route, err := processRouteDef(o, cpm, def)
if err == nil {
routes = append(routes, route)
} else {
Expand Down Expand Up @@ -557,7 +564,7 @@ func receiveRouteMatcher(o Options, out chan<- *routeTable, quit <-chan struct{}
defs = o.PreProcessors[i].Do(defs)
}

routes, invalidRoutes := processRouteDefs(o, o.FilterRegistry, defs)
routes, invalidRoutes := processRouteDefs(&o, defs)

for i := range o.PostProcessors {
routes = o.PostProcessors[i].Do(routes)
Expand Down
66 changes: 63 additions & 3 deletions routing/datasource_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,12 @@ import (
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/zalando/skipper/filters"
"github.com/zalando/skipper/filters/builtin"
"github.com/zalando/skipper/logging"
"github.com/zalando/skipper/logging/loggingtest"
"github.com/zalando/skipper/metrics/metricstest"
"github.com/zalando/skipper/predicates/primitive"
"github.com/zalando/skipper/predicates/query"
"github.com/zalando/skipper/routing"
Expand Down Expand Up @@ -56,10 +58,12 @@ func TestNoMultipleTreePredicates(t *testing.T) {
}

erred := false
o := &routing.Options{
FilterRegistry: make(filters.Registry),
}
pr := make(map[string]routing.PredicateSpec)
fr := make(filters.Registry)
for _, d := range defs {
if _, err := routing.ExportProcessRouteDef(pr, fr, d); err != nil {
if _, err := routing.ExportProcessRouteDef(o, pr, d); err != nil {
erred = true
break
}
Expand Down Expand Up @@ -119,8 +123,11 @@ func TestProcessRouteDefErrors(t *testing.T) {
}
fr := make(filters.Registry)
fr.Register(builtin.NewSetPath())
o := &routing.Options{
FilterRegistry: fr,
}
for _, d := range defs {
_, err := routing.ExportProcessRouteDef(pr, fr, d)
_, err := routing.ExportProcessRouteDef(o, pr, d)
if err == nil || err.Error() != ti.err {
t.Errorf("expected error '%s'. Got: '%s'", ti.err, err)
}
Expand Down Expand Up @@ -298,6 +305,41 @@ func TestLogging(t *testing.T) {
})
}

func TestMetrics(t *testing.T) {
t.Run("create filter latency", func(t *testing.T) {
client, err := testdataclient.NewDoc(`
r0: * -> slowCreate("100ms") -> slowCreate("200ms") -> slowCreate("100ms") -> <shunt>;
`)
if err != nil {
t.Fatal(err)
}
defer client.Close()

metrics := &metricstest.MockMetrics{
Now: time.Now(),
}
fr := make(filters.Registry)
fr.Register(slowCreateSpec{})

r := routing.New(routing.Options{
DataClients: []routing.DataClient{client},
FilterRegistry: fr,
Metrics: metrics,
SignalFirstLoad: true,
})
defer r.Close()
<-r.FirstLoad()

metrics.WithMeasures(func(m map[string][]time.Duration) {
assert.InEpsilonSlice(t, []time.Duration{
100 * time.Millisecond,
200 * time.Millisecond,
100 * time.Millisecond,
}, m["filter.slowCreate.create"], 0.1)
})
})
}

type weightedPredicateSpec struct {
name string
weight int
Expand All @@ -319,3 +361,21 @@ func (w weightedPredicateSpec) Create([]interface{}) (routing.Predicate, error)
func (w weightedPredicateSpec) Weight() int {
return w.weight
}

type (
slowCreateSpec struct{}
slowCreateFilter struct{}
)

func (s slowCreateSpec) Name() string { return "slowCreate" }

func (s slowCreateSpec) CreateFilter(args []interface{}) (filters.Filter, error) {
d, _ := time.ParseDuration(args[0].(string))

time.Sleep(d)

return slowCreateFilter{}, nil
}

func (s slowCreateFilter) Request(ctx filters.FilterContext) {}
func (s slowCreateFilter) Response(filters.FilterContext) {}
6 changes: 4 additions & 2 deletions routing/host_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,14 +51,16 @@ func benchmarkPredicateHost(b *testing.B, predicateFmt string) {

ha := host.NewAny()
pr := map[string]routing.PredicateSpec{ha.Name(): ha}
fr := make(filters.Registry)
o := &routing.Options{
FilterRegistry: make(filters.Registry),
}

var routes []*routing.Route
for i := 0; i < R; i++ {
p := strings.ReplaceAll(predicateFmt, "{i}", fmt.Sprintf("%d", i))
def := eskip.MustParse(fmt.Sprintf(`r%d: %s -> <shunt>;`, i, p))

route, err := routing.ExportProcessRouteDef(pr, fr, def[0])
route, err := routing.ExportProcessRouteDef(o, pr, def[0])
if err != nil {
b.Fatal(err)
}
Expand Down
Loading
Loading