diff --git a/CHANGELOG.md b/CHANGELOG.md
index d90ec93d1..e9ade6b5b 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,5 +1,6 @@
 ## Changelog
 
+* [CHANGE] Add new metric `slow_request_throughput_<unit>` to track the throughput of slow requests. #619
 * [CHANGE] Log middleware updated to honor `logRequestHeaders` in all logging scenarios. #615
 * [CHANGE] Roll back the gRPC dependency to v1.65.0 to allow downstream projects to avoid a performance regression and maybe a bug in v1.66.0. #581
 * [CHANGE] Update the gRPC dependency to v1.66.0 and deprecate the `grpc_server_recv_buffer_pools_enabled` option that is no longer supported by it. #580
diff --git a/middleware/instrument.go b/middleware/instrument.go
index 9813077ce..8497c4b74 100644
--- a/middleware/instrument.go
+++ b/middleware/instrument.go
@@ -6,10 +6,12 @@ package middleware
 
 import (
 	"context"
+	"fmt"
 	"io"
 	"net/http"
 	"strconv"
 	"strings"
+	"time"
 
 	"github.com/felixge/httpsnoop"
 	"github.com/gorilla/mux"
@@ -50,6 +52,9 @@ type Instrument struct {
 	RequestBodySize   *prometheus.HistogramVec
 	ResponseBodySize  *prometheus.HistogramVec
 	InflightRequests  *prometheus.GaugeVec
+	RequestCutoff     time.Duration
+	ThroughputUnit    string
+	RequestThroughput *prometheus.HistogramVec
 }
 
 // IsWSHandshakeRequest returns true if the given request is a websocket handshake request.
@@ -105,9 +110,31 @@ func (i Instrument) Wrap(next http.Handler) http.Handler {
 			labelValues = append(labelValues, tenantID)
 			instrument.ObserveWithExemplar(r.Context(), i.PerTenantDuration.WithLabelValues(labelValues...), respMetrics.Duration.Seconds())
 		}
+		if i.RequestCutoff > 0 && respMetrics.Duration > i.RequestCutoff {
+			volume, err := extractValueFromMultiValueHeader(w.Header().Get("Server-Timing"), i.ThroughputUnit)
+			if err == nil {
+				instrument.ObserveWithExemplar(r.Context(), i.RequestThroughput.WithLabelValues(r.Method, route), float64(volume)/respMetrics.Duration.Seconds())
+			}
+		}
 	})
 }
 
+// extractValueFromMultiValueHeader extracts a single value from a multi-value header, e.g. "throughput=500, duration=1000".
+func extractValueFromMultiValueHeader(h, key string) (int64, error) {
+	if h == "" {
+		return 0, fmt.Errorf("not a multi-value header")
+	}
+	parts := strings.Split(h, ", ")
+	value := int64(0)
+	for _, part := range parts {
+		if strings.HasPrefix(part, key+"=") {
+			_, err := fmt.Sscanf(part, key+"=%d", &value)
+			return value, err
+		}
+	}
+	return 0, fmt.Errorf("desired key not found in header")
+}
+
 // Return a name identifier for ths request. There are three options:
 // 1. The request matches a gorilla mux route, with a name. Use that.
 // 2. The request matches an unamed gorilla mux router. Munge the path
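For reviewers who want to see the new middleware logic in isolation: the throughput observation divides the volume reported in the `Server-Timing` header by the request duration. Below is a self-contained sketch of that parsing and arithmetic; `parseHeaderValue` is an illustrative copy of `extractValueFromMultiValueHeader`, and the header values are invented.

```go
package main

import (
	"fmt"
	"strings"
	"time"
)

// parseHeaderValue mirrors extractValueFromMultiValueHeader from the diff:
// it scans a comma-separated "key=value" header, such as Server-Timing,
// for a single key and returns its integer value.
func parseHeaderValue(h, key string) (int64, error) {
	for _, part := range strings.Split(h, ", ") {
		if strings.HasPrefix(part, key+"=") {
			var value int64
			_, err := fmt.Sscanf(part, key+"=%d", &value)
			return value, err
		}
	}
	return 0, fmt.Errorf("key %q not found in header", key)
}

func main() {
	// A header as a backend might emit it; values are made up.
	header := "total_samples=5000, other=1"
	duration := 2 * time.Second // simulated request duration, above the cutoff

	volume, err := parseHeaderValue(header, "total_samples")
	if err != nil {
		panic(err)
	}
	// Throughput as observed by the middleware: volume / duration-in-seconds.
	fmt.Printf("throughput: %.0f samples/s\n", float64(volume)/duration.Seconds()) // 2500
}
```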
diff --git a/middleware/instrument_test.go b/middleware/instrument_test.go
new file mode 100644
index 000000000..7581524db
--- /dev/null
+++ b/middleware/instrument_test.go
@@ -0,0 +1,148 @@
+package middleware
+
+import (
+	"net/http"
+	"net/http/httptest"
+	"strconv"
+	"strings"
+	"testing"
+	"time"
+
+	"github.com/prometheus/client_golang/prometheus"
+	"github.com/prometheus/client_golang/prometheus/promauto"
+	"github.com/prometheus/client_golang/prometheus/testutil"
+	"github.com/stretchr/testify/require"
+
+	"github.com/grafana/dskit/instrument"
+)
+
+func TestThroughputMetricHistogram(t *testing.T) {
+	tests := []struct {
+		name     string
+		sleep    bool
+		header   string
+		observed bool
+	}{
+		{"WithSleepAndHeader", true, "unit=0, other_unit=2", true},
+		{"WithoutSleepWithHeader", false, "unit=0, other_unit=2", false},
+		{"WithSleepNoHeader", true, "", false},
+		{"WithoutSleepNoHeader", false, "", false},
+	}
+
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			reg := prometheus.NewPedanticRegistry()
+			i := NewInstrument(reg)
+
+			wrap := i.Wrap(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
+				if tt.sleep {
+					time.Sleep(i.RequestCutoff)
+				}
+				w.Header().Set("Server-Timing", tt.header)
+			}))
+
+			req := httptest.NewRequest("GET", "/", nil)
+			res := httptest.NewRecorder()
+
+			wrap.ServeHTTP(res, req)
+
+			output := ``
+			if tt.observed {
+				output = `
+# HELP request_throughput_unit Server throughput of running requests.
+# TYPE request_throughput_unit histogram
+request_throughput_unit_bucket{cutoff_ms="100",method="GET",route="other",le="1"} 1
+request_throughput_unit_bucket{cutoff_ms="100",method="GET",route="other",le="5"} 1
+request_throughput_unit_bucket{cutoff_ms="100",method="GET",route="other",le="10"} 1
+request_throughput_unit_bucket{cutoff_ms="100",method="GET",route="other",le="+Inf"} 1
+request_throughput_unit_sum{cutoff_ms="100",method="GET",route="other"} 0
+request_throughput_unit_count{cutoff_ms="100",method="GET",route="other"} 1
+`
+			}
+
+			require.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(output), "request_throughput_"+i.ThroughputUnit))
+		})
+	}
+}
+
+func NewInstrument(registry *prometheus.Registry) Instrument {
+	reg := promauto.With(registry)
+
+	const metricsNativeHistogramFactor = 1.1
+	const throughputUnit = "unit"
+	const SlowRequestCutoff = 100 * time.Millisecond
+
+	return Instrument{
+		Duration: reg.NewHistogramVec(prometheus.HistogramOpts{
+			Name:                            "request_duration_seconds",
+			Help:                            "Time (in seconds) spent serving HTTP requests.",
+			Buckets:                         instrument.DefBuckets,
+			NativeHistogramBucketFactor:     metricsNativeHistogramFactor,
+			NativeHistogramMaxBucketNumber:  100,
+			NativeHistogramMinResetDuration: time.Hour,
+		}, []string{"method", "route", "status_code", "ws"}),
+		PerTenantDuration: reg.NewHistogramVec(prometheus.HistogramOpts{
+			Name:                            "per_tenant_request_duration_seconds",
+			Help:                            "Time (in seconds) spent serving HTTP requests for a particular tenant.",
+			Buckets:                         instrument.DefBuckets,
+			NativeHistogramBucketFactor:     metricsNativeHistogramFactor,
+			NativeHistogramMaxBucketNumber:  100,
+			NativeHistogramMinResetDuration: time.Hour,
+		}, []string{"method", "route", "status_code", "ws", "tenant"}),
+		RequestBodySize: reg.NewHistogramVec(prometheus.HistogramOpts{
+			Name:    "request_message_bytes",
+			Help:    "Size (in bytes) of messages received in the request.",
+			Buckets: BodySizeBuckets,
+		}, []string{"method", "route"}),
+		ResponseBodySize: reg.NewHistogramVec(prometheus.HistogramOpts{
+			Name:    "response_message_bytes",
+			Help:    "Size (in bytes) of messages sent in response.",
+			Buckets: BodySizeBuckets,
+		}, []string{"method", "route"}),
+		InflightRequests: reg.NewGaugeVec(prometheus.GaugeOpts{
+			Name: "inflight_requests",
+			Help: "Current number of inflight requests.",
+		}, []string{"method", "route"}),
+		RequestCutoff:  SlowRequestCutoff,
+		ThroughputUnit: throughputUnit,
+		RequestThroughput: reg.NewHistogramVec(prometheus.HistogramOpts{
+			Name:                            "request_throughput_" + throughputUnit,
+			Help:                            "Server throughput of running requests.",
+			ConstLabels:                     prometheus.Labels{"cutoff_ms": strconv.FormatInt(SlowRequestCutoff.Milliseconds(), 10)},
+			Buckets:                         []float64{1, 5, 10},
+			NativeHistogramBucketFactor:     metricsNativeHistogramFactor,
+			NativeHistogramMaxBucketNumber:  100,
+			NativeHistogramMinResetDuration: time.Hour,
+		}, []string{"method", "route"}),
+	}
+}
+
+func TestExtractValueFromMultiValueHeader(t *testing.T) {
+	tests := []struct {
+		header   string
+		key      string
+		expected int64
+		err      bool
+	}{
+		{"key0=0, key1=1", "key0", 0, false},
+		{"key0=0, key1=1", "key1", 1, false},
+		{"key0=0, key1=1", "key2", 0, true},
+		{"key0=1.0, key1=1", "key0", 1, false},
+		{"foo", "foo", 0, true},
+		{"foo=bar", "foo", 0, true},
+		{"", "foo", 0, true},
+	}
+
+	for _, tt := range tests {
+		t.Run(tt.key, func(t *testing.T) {
+			value, err := extractValueFromMultiValueHeader(tt.header, tt.key)
+			if (err != nil) != tt.err {
+				t.Errorf("expected error: %v, got: %v", tt.err, err)
+			}
+			if value != tt.expected {
+				t.Errorf("expected value: %d, got: %d", tt.expected, value)
+			}
+		})
+	}
+}
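The tests above stub the handler with a fixed header; in real use, a handler has to publish its work volume under the configured unit key for the middleware to pick it up. A hypothetical cooperating handler might look like this (`slowQueryHandler`, `doExpensiveQuery`, and all values are invented for illustration):

```go
package main

import (
	"fmt"
	"net/http"
	"time"
)

// slowQueryHandler is a hypothetical endpoint cooperating with the middleware:
// it reports how much work it did under the "unit" key of Server-Timing,
// matching the ThroughputUnit configured in the test helper above.
func slowQueryHandler(w http.ResponseWriter, _ *http.Request) {
	samples := doExpensiveQuery()

	// The header must be set before the body is written; the wrapping
	// middleware reads it back once the handler returns.
	w.Header().Set("Server-Timing", fmt.Sprintf("unit=%d", samples))
	fmt.Fprintf(w, "processed %d samples\n", samples)
}

// doExpensiveQuery stands in for real work and simply sleeps past the cutoff.
func doExpensiveQuery() int {
	time.Sleep(150 * time.Millisecond)
	return 5000
}

func main() {
	http.HandleFunc("/query", slowQueryHandler)
	_ = http.ListenAndServe(":8080", nil)
}
```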
Help: "Size (in bytes) of messages sent in response.", + Buckets: BodySizeBuckets, + }, []string{"method", "route"}), + InflightRequests: reg.NewGaugeVec(prometheus.GaugeOpts{ + Name: "inflight_requests", + Help: "Current number of inflight requests.", + }, []string{"method", "route"}), + RequestCutoff: SlowRequestCutoff, + ThroughputUnit: throughputUnit, + RequestThroughput: reg.NewHistogramVec(prometheus.HistogramOpts{ + Name: "request_throughput_" + throughputUnit, + Help: "Server throughput running requests.", + ConstLabels: prometheus.Labels{"cutoff_ms": strconv.FormatInt(SlowRequestCutoff.Milliseconds(), 10)}, + Buckets: []float64{1, 5, 10}, + NativeHistogramBucketFactor: metricsNativeHistogramFactor, + NativeHistogramMaxBucketNumber: 100, + NativeHistogramMinResetDuration: time.Hour, + }, []string{"method", "route"}), + } +} + +func TestExtractValueFromMultiValueHeader(t *testing.T) { + tests := []struct { + header string + key string + expected int64 + err bool + }{ + {"key0=0, key1=1", "key0", 0, false}, + {"key0=0, key1=1", "key1", 1, false}, + {"key0=0, key1=1", "key2", 0, true}, + {"key0=1.0, key1=1", "key0", 1, false}, + {"foo", "foo", 0, true}, + {"foo=bar", "foo", 0, true}, + {"", "foo", 0, true}, + } + + for _, tt := range tests { + t.Run(tt.key, func(t *testing.T) { + value, err := extractValueFromMultiValueHeader(tt.header, tt.key) + if (err != nil) != tt.err { + t.Errorf("expected error: %v, got: %v", tt.err, err) + } + if value != tt.expected { + t.Errorf("expected value: %d, got: %d", tt.expected, value) + } + }) + } +} diff --git a/server/metrics.go b/server/metrics.go index d6011525d..0907cb6db 100644 --- a/server/metrics.go +++ b/server/metrics.go @@ -5,6 +5,7 @@ package server import ( + "strconv" "time" "github.com/prometheus/client_golang/prometheus" @@ -22,6 +23,7 @@ type Metrics struct { ReceivedMessageSize *prometheus.HistogramVec SentMessageSize *prometheus.HistogramVec InflightRequests *prometheus.GaugeVec + RequestThroughput *prometheus.HistogramVec } func NewServerMetrics(cfg Config) *Metrics { @@ -73,5 +75,15 @@ func NewServerMetrics(cfg Config) *Metrics { Name: "inflight_requests", Help: "Current number of inflight requests.", }, []string{"method", "route"}), + RequestThroughput: reg.NewHistogramVec(prometheus.HistogramOpts{ + Namespace: cfg.MetricsNamespace, + Name: "slow_request_throughput_" + cfg.Throughput.Unit, + Help: "Server throughput of long running requests.", + ConstLabels: prometheus.Labels{"cutoff_ms": strconv.FormatInt(cfg.Throughput.RequestCutoff.Milliseconds(), 10)}, + Buckets: instrument.DefBuckets, + NativeHistogramBucketFactor: cfg.MetricsNativeHistogramFactor, + NativeHistogramMaxBucketNumber: 100, + NativeHistogramMinResetDuration: time.Hour, + }, []string{"method", "route"}), } } diff --git a/server/server.go b/server/server.go index 7b8e7593d..ff412ce7e 100644 --- a/server/server.go +++ b/server/server.go @@ -153,6 +153,13 @@ type Config struct { // This limiter is called for every started and finished gRPC request. GrpcMethodLimiter GrpcInflightMethodLimiter `yaml:"-"` + + Throughput Throughput `yaml:"throughput"` +} + +type Throughput struct { + RequestCutoff time.Duration `yaml:"request_cutoff"` + Unit string `yaml:"unit"` } var infinty = time.Duration(math.MaxInt64) @@ -209,6 +216,8 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { f.StringVar(&cfg.LogRequestExcludeHeadersList, "server.log-request-headers-exclude-list", "", "Comma separated list of headers to exclude from loggin. 
diff --git a/server/server.go b/server/server.go
index 7b8e7593d..ff412ce7e 100644
--- a/server/server.go
+++ b/server/server.go
@@ -153,6 +153,13 @@ type Config struct {
 	// This limiter is called for every started and finished gRPC request.
 	GrpcMethodLimiter GrpcInflightMethodLimiter `yaml:"-"`
+
+	Throughput Throughput `yaml:"throughput"`
+}
+
+type Throughput struct {
+	RequestCutoff time.Duration `yaml:"request_cutoff"`
+	Unit          string        `yaml:"unit"`
 }
 
 var infinty = time.Duration(math.MaxInt64)
@@ -209,6 +216,8 @@
 	f.StringVar(&cfg.LogRequestExcludeHeadersList, "server.log-request-headers-exclude-list", "", "Comma separated list of headers to exclude from loggin. Only used if server.log-request-headers is true.")
 	f.BoolVar(&cfg.LogRequestAtInfoLevel, "server.log-request-at-info-level-enabled", false, "Optionally log requests at info level instead of debug level. Applies to request headers as well if server.log-request-headers is enabled.")
 	f.BoolVar(&cfg.ProxyProtocolEnabled, "server.proxy-protocol-enabled", false, "Enables PROXY protocol.")
+	f.DurationVar(&cfg.Throughput.RequestCutoff, "server.throughput.request-cutoff", 0, "Requests taking longer than this duration to finish are considered slow, and their throughput is calculated. If set to 0, throughput is not calculated.")
+	f.StringVar(&cfg.Throughput.Unit, "server.throughput.unit", "total_samples", "Unit of the server throughput metric, for example 'processed_bytes' or 'total_samples'. Observed values are gathered from the Server-Timing header under this key. The unit is appended to the slow_request_throughput metric name.")
 }
 
 func (cfg *Config) registererOrDefault() prometheus.Registerer {
@@ -527,6 +536,9 @@ func BuildHTTPMiddleware(cfg Config, router *mux.Router, metrics *Metrics, logge
 			RequestBodySize:  metrics.ReceivedMessageSize,
 			ResponseBodySize: metrics.SentMessageSize,
 			InflightRequests: metrics.InflightRequests,
+			RequestCutoff:     cfg.Throughput.RequestCutoff,
+			ThroughputUnit:    cfg.Throughput.Unit,
+			RequestThroughput: metrics.RequestThroughput,
 		},
 	}
 	var httpMiddleware []middleware.Interface
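Finally, a minimal sketch of wiring the new options through dskit's `server` package, assuming the fields added in this diff; the port, cutoff, and unit are placeholders, and the remaining defaults are populated by registering flags first.

```go
package main

import (
	"flag"
	"log"
	"time"

	"github.com/grafana/dskit/server"
)

func main() {
	var cfg server.Config
	// Registering flags fills in the package defaults.
	cfg.RegisterFlags(flag.NewFlagSet("example", flag.ExitOnError))

	cfg.HTTPListenPort = 8080 // placeholder port
	// New in this change: compute throughput for requests slower than 1s,
	// reading the "processed_bytes" key from the Server-Timing header.
	cfg.Throughput.RequestCutoff = time.Second
	cfg.Throughput.Unit = "processed_bytes"

	srv, err := server.New(cfg)
	if err != nil {
		log.Fatal(err)
	}
	defer srv.Shutdown()

	if err := srv.Run(); err != nil {
		log.Fatal(err)
	}
}
```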