From f79fb5cda03dd6dc1c5693ad25fdccd52d688a42 Mon Sep 17 00:00:00 2001 From: Christopher Li Date: Fri, 31 Mar 2023 18:49:32 -0700 Subject: [PATCH 1/2] Add query duration bucket in the metrics --- .../querier/queryrange/instrumentation.go | 60 ++++++++++++++++++- 1 file changed, 57 insertions(+), 3 deletions(-) diff --git a/internal/cortex/querier/queryrange/instrumentation.go b/internal/cortex/querier/queryrange/instrumentation.go index 9cc92c7018..65362d03c0 100644 --- a/internal/cortex/querier/queryrange/instrumentation.go +++ b/internal/cortex/querier/queryrange/instrumentation.go @@ -12,19 +12,24 @@ import ( "github.com/weaveworks/common/instrument" ) +const DAY = 24 * time.Hour +const QUERYDURATIONBUCKET = "query_range_bucket" + // InstrumentMiddleware can be inserted into the middleware chain to expose timing information. func InstrumentMiddleware(name string, metrics *InstrumentMiddlewareMetrics) Middleware { - var durationCol instrument.Collector + var durationCol instrument.Collector // Support the case metrics shouldn't be tracked (ie. unit tests). if metrics != nil { - durationCol = instrument.NewHistogramCollector(metrics.duration) + durationCol = NewDurationHistogramCollector(metrics.duration) } else { durationCol = &NoopCollector{} } return MiddlewareFunc(func(next Handler) Handler { return HandlerFunc(func(ctx context.Context, req Request) (Response, error) { + queryRangeDurationBucket := getRangeBucket(req) + ctx = context.WithValue(ctx, QUERYDURATIONBUCKET, queryRangeDurationBucket) var resp Response err := instrument.CollectedRequest(ctx, name, durationCol, instrument.ErrorCode, func(ctx context.Context) error { var err error @@ -36,6 +41,32 @@ func InstrumentMiddleware(name string, metrics *InstrumentMiddlewareMetrics) Mid }) } +func getRangeBucket(req Request) string { + queryRangeDuration := req.GetEnd() - req.GetStart() + var queryRangeDurationBucket string + switch { + case queryRangeDuration < 0: + return "Invalid" + case queryRangeDuration <= time.Hour.Milliseconds(): + return "1h" + case queryRangeDuration <= 6*time.Hour.Milliseconds(): + return "6h" + case queryRangeDuration <= 12*time.Hour.Milliseconds(): + return "12h" + case queryRangeDuration <= DAY.Milliseconds(): + return "1d" + case queryRangeDuration <= 2*DAY.Milliseconds(): + return "2d" + case queryRangeDuration <= 7*DAY.Milliseconds(): + return "7d" + case queryRangeDuration <= 30*DAY.Milliseconds(): + return "30d" + default: + return "+INF" + } + return queryRangeDurationBucket +} + // InstrumentMiddlewareMetrics holds the metrics tracked by InstrumentMiddleware. type InstrumentMiddlewareMetrics struct { duration *prometheus.HistogramVec @@ -49,7 +80,7 @@ func NewInstrumentMiddlewareMetrics(registerer prometheus.Registerer) *Instrumen Name: "frontend_query_range_duration_seconds", Help: "Total time spent in seconds doing query range requests.", Buckets: prometheus.DefBuckets, - }, []string{"method", "status_code"}), + }, []string{"method", "status_code", QUERYDURATIONBUCKET}), } } @@ -65,3 +96,26 @@ func (c *NoopCollector) Before(ctx context.Context, method string, start time.Ti // After implements instrument.Collector. func (c *NoopCollector) After(ctx context.Context, method, statusCode string, start time.Time) {} + +// DurationHistogramCollector collects the duration of a request +type DurationHistogramCollector struct { + metric *prometheus.HistogramVec +} + +func (c *DurationHistogramCollector) Before(ctx context.Context, method string, start time.Time) { +} + +func (c *DurationHistogramCollector) After(ctx context.Context, method, statusCode string, start time.Time) { + durationBucket, _ := ctx.Value(QUERYDURATIONBUCKET).(string) + + if c.metric != nil { + instrument.ObserveWithExemplar(ctx, c.metric.WithLabelValues(method, statusCode, durationBucket), time.Since(start).Seconds()) + } +} +func (c *DurationHistogramCollector) Register() { + prometheus.MustRegister(c.metric) +} + +func NewDurationHistogramCollector(metric *prometheus.HistogramVec) *DurationHistogramCollector { + return &DurationHistogramCollector{metric} +} From 49366815f53a1f268cb4bed4702883fee937d2b8 Mon Sep 17 00:00:00 2001 From: Christopher Li Date: Mon, 3 Apr 2023 22:55:28 -0700 Subject: [PATCH 2/2] add logger warning and test coverage --- .../querier/queryrange/instrumentation.go | 22 +++++++++----- .../queryrange/instrumentation_test.go | 29 +++++++++++++++++++ .../cortex/querier/queryrange/roundtrip.go | 8 ++--- pkg/queryfrontend/roundtrip.go | 18 ++++++------ 4 files changed, 57 insertions(+), 20 deletions(-) diff --git a/internal/cortex/querier/queryrange/instrumentation.go b/internal/cortex/querier/queryrange/instrumentation.go index bf318cac77..f4d24b0ebb 100644 --- a/internal/cortex/querier/queryrange/instrumentation.go +++ b/internal/cortex/querier/queryrange/instrumentation.go @@ -5,6 +5,8 @@ package queryrange import ( "context" + "github.com/go-kit/log" + "github.com/go-kit/log/level" "time" "github.com/prometheus/client_golang/prometheus" @@ -14,14 +16,15 @@ import ( const DAY = 24 * time.Hour const queryRangeBucket = "query_range_bucket" +const invalidDurationBucket = "Invalid" // InstrumentMiddleware can be inserted into the middleware chain to expose timing information. -func InstrumentMiddleware(name string, metrics *InstrumentMiddlewareMetrics) Middleware { +func InstrumentMiddleware(name string, metrics *InstrumentMiddlewareMetrics, log log.Logger) Middleware { var durationCol instrument.Collector // Support the case metrics shouldn't be tracked (ie. unit tests). if metrics != nil { - durationCol = NewDurationHistogramCollector(metrics.duration) + durationCol = NewDurationHistogramCollector(metrics.duration, log) } else { durationCol = &NoopCollector{} } @@ -45,7 +48,9 @@ func getRangeBucket(req Request) string { queryRangeDuration := req.GetEnd() - req.GetStart() switch { case queryRangeDuration < 0: - return "Invalid" + return invalidDurationBucket + case queryRangeDuration == 0: + return "Instant" case queryRangeDuration <= time.Hour.Milliseconds(): return "1h" case queryRangeDuration <= 6*time.Hour.Milliseconds(): @@ -83,7 +88,7 @@ func NewInstrumentMiddlewareMetrics(registerer prometheus.Registerer) *Instrumen } // NoopCollector is a noop collector that can be used as placeholder when no metric -// should tracked by the instrumentation. +// should be tracked by the instrumentation. type NoopCollector struct{} // Register implements instrument.Collector. @@ -98,6 +103,7 @@ func (c *NoopCollector) After(ctx context.Context, method, statusCode string, st // DurationHistogramCollector collects the duration of a request type DurationHistogramCollector struct { metric *prometheus.HistogramVec + log log.Logger } func (c *DurationHistogramCollector) Register() { @@ -111,13 +117,15 @@ func (c *DurationHistogramCollector) After(ctx context.Context, method, statusCo durationBucket, ok := ctx.Value(queryRangeBucket).(string) if !ok { - durationBucket = "null" + level.Warn(c.log).Log("msg", "failed to get query range bucket for frontend_query_range_duration_seconds metrics", + "method", method, "start_time", start) + durationBucket = invalidDurationBucket } if c.metric != nil { instrument.ObserveWithExemplar(ctx, c.metric.WithLabelValues(method, statusCode, durationBucket), time.Since(start).Seconds()) } } -func NewDurationHistogramCollector(metric *prometheus.HistogramVec) *DurationHistogramCollector { - return &DurationHistogramCollector{metric} +func NewDurationHistogramCollector(metric *prometheus.HistogramVec, log log.Logger) *DurationHistogramCollector { + return &DurationHistogramCollector{metric, log} } diff --git a/internal/cortex/querier/queryrange/instrumentation_test.go b/internal/cortex/querier/queryrange/instrumentation_test.go index 896d7b4ef1..60557cf40d 100644 --- a/internal/cortex/querier/queryrange/instrumentation_test.go +++ b/internal/cortex/querier/queryrange/instrumentation_test.go @@ -1,7 +1,12 @@ package queryrange import ( + "context" + "github.com/go-kit/log" "github.com/opentracing/opentracing-go" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/testutil" + "github.com/stretchr/testify/assert" "testing" ) @@ -83,3 +88,27 @@ func TestGetRangeBucket(t *testing.T) { } } } + +func TestInstrumentMiddleware(t *testing.T) { + registry := prometheus.DefaultRegisterer + + metrics := NewInstrumentMiddlewareMetrics(registry) + + logger := log.NewNopLogger() + + middleware := InstrumentMiddleware("step_align", metrics, logger) + + // Create a new dummy Request object with a duration of 6 hours. + req := mockRequest{1, 6 * 60 * 60 * 1000} + + // Create a dummy Handler object that just returns a Response object. + handler := HandlerFunc(func(ctx context.Context, req Request) (Response, error) { + return Response(nil), nil + }) + + _, err := middleware.Wrap(handler).Do(context.Background(), req) + assert.NoError(t, err) + + _, error := testutil.CollectAndLint(metrics.duration, "cortex_frontend_query_range_duration_seconds") + assert.NoError(t, error) +} diff --git a/internal/cortex/querier/queryrange/roundtrip.go b/internal/cortex/querier/queryrange/roundtrip.go index cbc027f58e..38317af5d5 100644 --- a/internal/cortex/querier/queryrange/roundtrip.go +++ b/internal/cortex/querier/queryrange/roundtrip.go @@ -157,11 +157,11 @@ func NewTripperware( queryRangeMiddleware := []Middleware{NewLimitsMiddleware(limits)} if cfg.AlignQueriesWithStep { - queryRangeMiddleware = append(queryRangeMiddleware, InstrumentMiddleware("step_align", metrics), StepAlignMiddleware) + queryRangeMiddleware = append(queryRangeMiddleware, InstrumentMiddleware("step_align", metrics, log), StepAlignMiddleware) } if cfg.SplitQueriesByInterval != 0 { staticIntervalFn := func(_ Request) time.Duration { return cfg.SplitQueriesByInterval } - queryRangeMiddleware = append(queryRangeMiddleware, InstrumentMiddleware("split_by_interval", metrics), SplitByIntervalMiddleware(staticIntervalFn, limits, codec, registerer)) + queryRangeMiddleware = append(queryRangeMiddleware, InstrumentMiddleware("split_by_interval", metrics, log), SplitByIntervalMiddleware(staticIntervalFn, limits, codec, registerer)) } var c cache.Cache @@ -174,11 +174,11 @@ func NewTripperware( return nil, nil, err } c = cache - queryRangeMiddleware = append(queryRangeMiddleware, InstrumentMiddleware("results_cache", metrics), queryCacheMiddleware) + queryRangeMiddleware = append(queryRangeMiddleware, InstrumentMiddleware("results_cache", metrics, log), queryCacheMiddleware) } if cfg.MaxRetries > 0 { - queryRangeMiddleware = append(queryRangeMiddleware, InstrumentMiddleware("retry", metrics), NewRetryMiddleware(log, cfg.MaxRetries, NewRetryMiddlewareMetrics(registerer))) + queryRangeMiddleware = append(queryRangeMiddleware, InstrumentMiddleware("retry", metrics, log), NewRetryMiddleware(log, cfg.MaxRetries, NewRetryMiddlewareMetrics(registerer))) } // Start cleanup. If cleaner stops or fail, we will simply not clean the metrics for inactive users. diff --git a/pkg/queryfrontend/roundtrip.go b/pkg/queryfrontend/roundtrip.go index ef557e0a35..ff28e68582 100644 --- a/pkg/queryfrontend/roundtrip.go +++ b/pkg/queryfrontend/roundtrip.go @@ -164,7 +164,7 @@ func newQueryRangeTripperware( if config.AlignRangeWithStep { queryRangeMiddleware = append( queryRangeMiddleware, - queryrange.InstrumentMiddleware("step_align", m), + queryrange.InstrumentMiddleware("step_align", m, logger), queryrange.StepAlignMiddleware, ) } @@ -172,7 +172,7 @@ func newQueryRangeTripperware( if config.RequestDownsampled { queryRangeMiddleware = append( queryRangeMiddleware, - queryrange.InstrumentMiddleware("downsampled", m), + queryrange.InstrumentMiddleware("downsampled", m, logger), DownsampledMiddleware(codec, reg), ) } @@ -182,7 +182,7 @@ func newQueryRangeTripperware( queryRangeMiddleware = append( queryRangeMiddleware, - queryrange.InstrumentMiddleware("split_by_interval", m), + queryrange.InstrumentMiddleware("split_by_interval", m, logger), SplitByIntervalMiddleware(queryIntervalFn, limits, codec, reg), ) } @@ -213,7 +213,7 @@ func newQueryRangeTripperware( queryRangeMiddleware = append( queryRangeMiddleware, - queryrange.InstrumentMiddleware("results_cache", m), + queryrange.InstrumentMiddleware("results_cache", m, logger), queryCacheMiddleware, ) } @@ -221,7 +221,7 @@ func newQueryRangeTripperware( if config.MaxRetries > 0 { queryRangeMiddleware = append( queryRangeMiddleware, - queryrange.InstrumentMiddleware("retry", m), + queryrange.InstrumentMiddleware("retry", m, logger), queryrange.NewRetryMiddleware(logger, config.MaxRetries, queryrange.NewRetryMiddlewareMetrics(reg)), ) } @@ -276,7 +276,7 @@ func newLabelsTripperware( if config.SplitQueriesByInterval != 0 { labelsMiddleware = append( labelsMiddleware, - queryrange.InstrumentMiddleware("split_interval", m), + queryrange.InstrumentMiddleware("split_interval", m, logger), SplitByIntervalMiddleware(queryIntervalFn, limits, codec, reg), ) } @@ -300,7 +300,7 @@ func newLabelsTripperware( labelsMiddleware = append( labelsMiddleware, - queryrange.InstrumentMiddleware("results_cache", m), + queryrange.InstrumentMiddleware("results_cache", m, logger), queryCacheMiddleware, ) } @@ -308,7 +308,7 @@ func newLabelsTripperware( if config.MaxRetries > 0 { labelsMiddleware = append( labelsMiddleware, - queryrange.InstrumentMiddleware("retry", m), + queryrange.InstrumentMiddleware("retry", m, logger), queryrange.NewRetryMiddleware(logger, config.MaxRetries, queryrange.NewRetryMiddlewareMetrics(reg)), ) } @@ -333,7 +333,7 @@ func newInstantQueryTripperware( analyzer := querysharding.NewQueryAnalyzer() instantQueryMiddlewares = append( instantQueryMiddlewares, - queryrange.InstrumentMiddleware("sharding", m), + queryrange.InstrumentMiddleware("sharding", m, log.NewNopLogger()), PromQLShardingMiddleware(analyzer, numShards, limits, codec, reg), ) }