From 5f5f39623710d3edc3dd513459183ae988b13daa Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Manuel=20R=C3=BCger?= Date: Wed, 10 Apr 2024 14:25:28 +0200 Subject: [PATCH 01/10] feat: Support zstd encoding MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This allows endpoints to respond with zstd compressed metric data, if the requester supports it. I have imported a content-encoding parser from https://github.com/golang/gddo which is an archived repository to support different content-encoding headers. Signed-off-by: Manuel Rüger --- go.mod | 2 + go.sum | 3 + internal/github.com/golang/gddo/LICENSE | 27 ++++ internal/github.com/golang/gddo/README.md | 1 + .../golang/gddo/httputil/header/header.go | 145 ++++++++++++++++++ .../gddo/httputil/header/header_test.go | 49 ++++++ .../golang/gddo/httputil/negotiate.go | 36 +++++ .../golang/gddo/httputil/negotiate_test.go | 40 +++++ prometheus/promhttp/http.go | 62 +++++--- prometheus/promhttp/http_test.go | 92 +++++++++++ 10 files changed, 436 insertions(+), 21 deletions(-) create mode 100644 internal/github.com/golang/gddo/LICENSE create mode 100644 internal/github.com/golang/gddo/README.md create mode 100644 internal/github.com/golang/gddo/httputil/header/header.go create mode 100644 internal/github.com/golang/gddo/httputil/header/header_test.go create mode 100644 internal/github.com/golang/gddo/httputil/negotiate.go create mode 100644 internal/github.com/golang/gddo/httputil/negotiate_test.go diff --git a/go.mod b/go.mod index 0e3889f03..bbb9531ce 100644 --- a/go.mod +++ b/go.mod @@ -5,7 +5,9 @@ go 1.20 require ( github.com/beorn7/perks v1.0.1 github.com/cespare/xxhash/v2 v2.3.0 + github.com/google/go-cmp v0.6.0 github.com/json-iterator/go v1.1.12 + github.com/klauspost/compress v1.17.8 github.com/prometheus/client_model v0.6.1 github.com/prometheus/common v0.53.0 github.com/prometheus/procfs v0.15.1 diff --git a/go.sum b/go.sum index de5929d66..61a208be3 100644 --- a/go.sum +++ b/go.sum @@ -12,11 +12,14 @@ github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= +github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= github.com/jpillora/backoff v1.0.0 h1:uvFg412JmmHBHw7iwprIxkPMI+sGQ4kzOWsMeHnm2EA= github.com/jpillora/backoff v1.0.0/go.mod h1:J/6gKK9jxlEcS3zixgDgUAsiuZ7yrSoa/FX5e0EB2j4= github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM= github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= +github.com/klauspost/compress v1.17.8 h1:YcnTYrq7MikUT7k0Yb5eceMmALQPYBW/Xltxn0NAMnU= +github.com/klauspost/compress v1.17.8/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw= github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= diff --git a/internal/github.com/golang/gddo/LICENSE b/internal/github.com/golang/gddo/LICENSE new file mode 100644 index 000000000..65d761bc9 --- /dev/null +++ b/internal/github.com/golang/gddo/LICENSE @@ -0,0 +1,27 @@ +Copyright (c) 2013 The Go Authors. All rights reserved. + +Redistribution and use in source and binary forms, with or without +modification, are permitted provided that the following conditions are +met: + + * Redistributions of source code must retain the above copyright +notice, this list of conditions and the following disclaimer. + * Redistributions in binary form must reproduce the above +copyright notice, this list of conditions and the following disclaimer +in the documentation and/or other materials provided with the +distribution. + * Neither the name of Google Inc. nor the names of its +contributors may be used to endorse or promote products derived from +this software without specific prior written permission. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. diff --git a/internal/github.com/golang/gddo/README.md b/internal/github.com/golang/gddo/README.md new file mode 100644 index 000000000..69af39a33 --- /dev/null +++ b/internal/github.com/golang/gddo/README.md @@ -0,0 +1 @@ +This source code is a stripped down version from the archived repository https://github.com/golang/gddo and licensed under BSD. diff --git a/internal/github.com/golang/gddo/httputil/header/header.go b/internal/github.com/golang/gddo/httputil/header/header.go new file mode 100644 index 000000000..8547c8dfd --- /dev/null +++ b/internal/github.com/golang/gddo/httputil/header/header.go @@ -0,0 +1,145 @@ +// Copyright 2013 The Go Authors. All rights reserved. +// +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file or at +// https://developers.google.com/open-source/licenses/bsd. + +// Package header provides functions for parsing HTTP headers. +package header + +import ( + "net/http" + "strings" +) + +// Octet types from RFC 2616. +var octetTypes [256]octetType + +type octetType byte + +const ( + isToken octetType = 1 << iota + isSpace +) + +func init() { + // OCTET = + // CHAR = + // CTL = + // CR = + // LF = + // SP = + // HT = + // <"> = + // CRLF = CR LF + // LWS = [CRLF] 1*( SP | HT ) + // TEXT = + // separators = "(" | ")" | "<" | ">" | "@" | "," | ";" | ":" | "\" | <"> + // | "/" | "[" | "]" | "?" | "=" | "{" | "}" | SP | HT + // token = 1* + // qdtext = > + + for c := 0; c < 256; c++ { + var t octetType + isCtl := c <= 31 || c == 127 + isChar := 0 <= c && c <= 127 + isSeparator := strings.ContainsRune(" \t\"(),/:;<=>?@[]\\{}", rune(c)) + if strings.ContainsRune(" \t\r\n", rune(c)) { + t |= isSpace + } + if isChar && !isCtl && !isSeparator { + t |= isToken + } + octetTypes[c] = t + } +} + +// AcceptSpec describes an Accept* header. +type AcceptSpec struct { + Value string + Q float64 +} + +// ParseAccept parses Accept* headers. +func ParseAccept(header http.Header, key string) (specs []AcceptSpec) { +loop: + for _, s := range header[key] { + for { + var spec AcceptSpec + spec.Value, s = expectTokenSlash(s) + if spec.Value == "" { + continue loop + } + spec.Q = 1.0 + s = skipSpace(s) + if strings.HasPrefix(s, ";") { + s = skipSpace(s[1:]) + if !strings.HasPrefix(s, "q=") { + continue loop + } + spec.Q, s = expectQuality(s[2:]) + if spec.Q < 0.0 { + continue loop + } + } + specs = append(specs, spec) + s = skipSpace(s) + if !strings.HasPrefix(s, ",") { + continue loop + } + s = skipSpace(s[1:]) + } + } + return +} + +func skipSpace(s string) (rest string) { + i := 0 + for ; i < len(s); i++ { + if octetTypes[s[i]]&isSpace == 0 { + break + } + } + return s[i:] +} + +func expectTokenSlash(s string) (token, rest string) { + i := 0 + for ; i < len(s); i++ { + b := s[i] + if (octetTypes[b]&isToken == 0) && b != '/' { + break + } + } + return s[:i], s[i:] +} + +func expectQuality(s string) (q float64, rest string) { + switch { + case len(s) == 0: + return -1, "" + case s[0] == '0': + q = 0 + case s[0] == '1': + q = 1 + default: + return -1, "" + } + s = s[1:] + if !strings.HasPrefix(s, ".") { + return q, s + } + s = s[1:] + i := 0 + n := 0 + d := 1 + for ; i < len(s); i++ { + b := s[i] + if b < '0' || b > '9' { + break + } + n = n*10 + int(b) - '0' + d *= 10 + } + return q + float64(n)/float64(d), s[i:] +} diff --git a/internal/github.com/golang/gddo/httputil/header/header_test.go b/internal/github.com/golang/gddo/httputil/header/header_test.go new file mode 100644 index 000000000..e26eb6c30 --- /dev/null +++ b/internal/github.com/golang/gddo/httputil/header/header_test.go @@ -0,0 +1,49 @@ +// Copyright 2013 The Go Authors. All rights reserved. +// +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file or at +// https://developers.google.com/open-source/licenses/bsd. + +package header + +import ( + "net/http" + "testing" + + "github.com/google/go-cmp/cmp" +) + +var parseAcceptTests = []struct { + s string + expected []AcceptSpec +}{ + {"text/html", []AcceptSpec{{"text/html", 1}}}, + {"text/html; q=0", []AcceptSpec{{"text/html", 0}}}, + {"text/html; q=0.0", []AcceptSpec{{"text/html", 0}}}, + {"text/html; q=1", []AcceptSpec{{"text/html", 1}}}, + {"text/html; q=1.0", []AcceptSpec{{"text/html", 1}}}, + {"text/html; q=0.1", []AcceptSpec{{"text/html", 0.1}}}, + {"text/html;q=0.1", []AcceptSpec{{"text/html", 0.1}}}, + {"text/html, text/plain", []AcceptSpec{{"text/html", 1}, {"text/plain", 1}}}, + {"text/html; q=0.1, text/plain", []AcceptSpec{{"text/html", 0.1}, {"text/plain", 1}}}, + {"iso-8859-5, unicode-1-1;q=0.8,iso-8859-1", []AcceptSpec{{"iso-8859-5", 1}, {"unicode-1-1", 0.8}, {"iso-8859-1", 1}}}, + {"iso-8859-1", []AcceptSpec{{"iso-8859-1", 1}}}, + {"*", []AcceptSpec{{"*", 1}}}, + {"da, en-gb;q=0.8, en;q=0.7", []AcceptSpec{{"da", 1}, {"en-gb", 0.8}, {"en", 0.7}}}, + {"da, q, en-gb;q=0.8", []AcceptSpec{{"da", 1}, {"q", 1}, {"en-gb", 0.8}}}, + {"image/png, image/*;q=0.5", []AcceptSpec{{"image/png", 1}, {"image/*", 0.5}}}, + + // bad cases + {"value1; q=0.1.2", []AcceptSpec{{"value1", 0.1}}}, + {"da, en-gb;q=foo", []AcceptSpec{{"da", 1}}}, +} + +func TestParseAccept(t *testing.T) { + for _, tt := range parseAcceptTests { + header := http.Header{"Accept": {tt.s}} + actual := ParseAccept(header, "Accept") + if !cmp.Equal(actual, tt.expected) { + t.Errorf("ParseAccept(h, %q)=%v, want %v", tt.s, actual, tt.expected) + } + } +} diff --git a/internal/github.com/golang/gddo/httputil/negotiate.go b/internal/github.com/golang/gddo/httputil/negotiate.go new file mode 100644 index 000000000..2e45780b7 --- /dev/null +++ b/internal/github.com/golang/gddo/httputil/negotiate.go @@ -0,0 +1,36 @@ +// Copyright 2013 The Go Authors. All rights reserved. +// +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file or at +// https://developers.google.com/open-source/licenses/bsd. + +package httputil + +import ( + "net/http" + + "github.com/prometheus/client_golang/internal/github.com/golang/gddo/httputil/header" +) + +// NegotiateContentEncoding returns the best offered content encoding for the +// request's Accept-Encoding header. If two offers match with equal weight and +// then the offer earlier in the list is preferred. If no offers are +// acceptable, then "" is returned. +func NegotiateContentEncoding(r *http.Request, offers []string) string { + bestOffer := "identity" + bestQ := -1.0 + specs := header.ParseAccept(r.Header, "Accept-Encoding") + for _, offer := range offers { + for _, spec := range specs { + if spec.Q > bestQ && + (spec.Value == "*" || spec.Value == offer) { + bestQ = spec.Q + bestOffer = offer + } + } + } + if bestQ == 0 { + bestOffer = "" + } + return bestOffer +} diff --git a/internal/github.com/golang/gddo/httputil/negotiate_test.go b/internal/github.com/golang/gddo/httputil/negotiate_test.go new file mode 100644 index 000000000..cdd5807ca --- /dev/null +++ b/internal/github.com/golang/gddo/httputil/negotiate_test.go @@ -0,0 +1,40 @@ +// Copyright 2013 The Go Authors. All rights reserved. +// +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file or at +// https://developers.google.com/open-source/licenses/bsd. + +package httputil + +import ( + "net/http" + "testing" +) + +var negotiateContentEncodingTests = []struct { + s string + offers []string + expect string +}{ + {"", []string{"identity", "gzip"}, "identity"}, + {"*;q=0", []string{"identity", "gzip"}, ""}, + {"gzip", []string{"identity", "gzip"}, "gzip"}, + {"gzip,zstd", []string{"identity", "zstd"}, "zstd"}, + {"zstd,gzip", []string{"gzip", "zstd"}, "gzip"}, + {"gzip,zstd", []string{"gzip", "zstd"}, "gzip"}, + {"gzip,zstd", []string{"zstd", "gzip"}, "zstd"}, + {"gzip;q=0.1,zstd;q=0.5", []string{"gzip", "zstd"}, "zstd"}, + {"gzip;q=1.0, identity; q=0.5, *;q=0", []string{"identity", "gzip"}, "gzip"}, + {"gzip;q=1.0, identity; q=0.5, *;q=0", []string{"identity", "zstd"}, "identity"}, + {"zstd", []string{"identity", "gzip"}, "identity"}, +} + +func TestNegotiateContentEncoding(t *testing.T) { + for _, tt := range negotiateContentEncodingTests { + r := &http.Request{Header: http.Header{"Accept-Encoding": {tt.s}}} + actual := NegotiateContentEncoding(r, tt.offers) + if actual != tt.expect { + t.Errorf("NegotiateContentEncoding(%q, %#v)=%q, want %q", tt.s, tt.offers, actual, tt.expect) + } + } +} diff --git a/prometheus/promhttp/http.go b/prometheus/promhttp/http.go index 09b8d2fbe..7517caa48 100644 --- a/prometheus/promhttp/http.go +++ b/prometheus/promhttp/http.go @@ -38,12 +38,13 @@ import ( "io" "net/http" "strconv" - "strings" "sync" "time" + "github.com/klauspost/compress/zstd" "github.com/prometheus/common/expfmt" + "github.com/prometheus/client_golang/internal/github.com/golang/gddo/httputil" "github.com/prometheus/client_golang/prometheus" ) @@ -54,6 +55,8 @@ const ( processStartTimeHeader = "Process-Start-Time-Unix" ) +var defaultEncodingOffers = []string{"gzip", "zstd"} + var gzipPool = sync.Pool{ New: func() interface{} { return gzip.NewWriter(nil) @@ -169,15 +172,42 @@ func HandlerForTransactional(reg prometheus.TransactionalGatherer, opts HandlerO header.Set(contentTypeHeader, string(contentType)) w := io.Writer(rsp) - if !opts.DisableCompression && gzipAccepted(req.Header) { - header.Set(contentEncodingHeader, "gzip") - gz := gzipPool.Get().(*gzip.Writer) - defer gzipPool.Put(gz) + if !opts.DisableCompression { + offers := defaultEncodingOffers + if len(opts.EncodingOffers) > 0 { + offers = opts.EncodingOffers + } + // TODO(mrueg): Replace internal/github.com/gddo once https://github.com/golang/go/issues/19307 is implemented. + compression := httputil.NegotiateContentEncoding(req, offers) + switch compression { + case "zstd": + header.Set(contentEncodingHeader, "zstd") + // TODO(mrueg): Replace klauspost/compress with stdlib implementation once https://github.com/golang/go/issues/62513 is implemented. + z, err := zstd.NewWriter(rsp, zstd.WithEncoderLevel(zstd.SpeedFastest)) + if err != nil { + return + } + + z.Reset(w) + defer z.Close() + + w = z + case "gzip": + header.Set(contentEncodingHeader, "gzip") + gz := gzipPool.Get().(*gzip.Writer) + defer gzipPool.Put(gz) + + gz.Reset(w) + defer gz.Close() - gz.Reset(w) - defer gz.Close() + w = gz + case "identity": + // This means the content is not encoded. + default: + // The content encoding was not implemented yet. + return + } - w = gz } enc := expfmt.NewEncoder(w, contentType) @@ -346,6 +376,9 @@ type HandlerOpts struct { // If DisableCompression is true, the handler will never compress the // response, even if requested by the client. DisableCompression bool + // If DisableCompression is false, this option will allow to define the + // set of offered encoding algorithms. + EncodingOffers []string // The number of concurrent HTTP requests is limited to // MaxRequestsInFlight. Additional requests are responded to with 503 // Service Unavailable and a suitable message in the body. If @@ -381,19 +414,6 @@ type HandlerOpts struct { ProcessStartTime time.Time } -// gzipAccepted returns whether the client will accept gzip-encoded content. -func gzipAccepted(header http.Header) bool { - a := header.Get(acceptEncodingHeader) - parts := strings.Split(a, ",") - for _, part := range parts { - part = strings.TrimSpace(part) - if part == "gzip" || strings.HasPrefix(part, "gzip;") { - return true - } - } - return false -} - // httpError removes any content-encoding header and then calls http.Error with // the provided error and http.StatusInternalServerError. Error contents is // supposed to be uncompressed plain text. Same as with a plain http.Error, this diff --git a/prometheus/promhttp/http_test.go b/prometheus/promhttp/http_test.go index 8ca192748..3a65e61c5 100644 --- a/prometheus/promhttp/http_test.go +++ b/prometheus/promhttp/http_test.go @@ -331,3 +331,95 @@ func TestHandlerTimeout(t *testing.T) { close(c.Block) // To not leak a goroutine. } + +func BenchmarkEncoding(b *testing.B) { + benchmarks := []struct { + name string + encodingType string + }{ + { + name: "test with gzip encoding", + encodingType: "gzip", + }, + { + name: "test with zstd encoding", + encodingType: "zstd", + }, + { + name: "test with no encoding", + encodingType: "identity", + }, + } + sizes := []struct { + name string + metricCount int + labelCount int + labelLength int + metricLength int + }{ + { + name: "small", + metricCount: 50, + labelCount: 5, + labelLength: 5, + metricLength: 5, + }, + { + name: "medium", + metricCount: 500, + labelCount: 10, + labelLength: 5, + metricLength: 10, + }, + { + name: "large", + metricCount: 5000, + labelCount: 10, + labelLength: 5, + metricLength: 10, + }, + { + name: "extra-large", + metricCount: 50000, + labelCount: 20, + labelLength: 5, + metricLength: 10, + }, + } + + for _, size := range sizes { + reg := prometheus.NewRegistry() + handler := HandlerFor(reg, HandlerOpts{}) + + // Generate Metrics + // Original source: https://github.com/prometheus-community/avalanche/blob/main/metrics/serve.go + labelKeys := make([]string, size.labelCount) + for idx := 0; idx < size.labelCount; idx++ { + labelKeys[idx] = fmt.Sprintf("label_key_%s_%v", strings.Repeat("k", size.labelLength), idx) + } + labelValues := make([]string, size.labelCount) + for idx := 0; idx < size.labelCount; idx++ { + labelValues[idx] = fmt.Sprintf("label_val_%s_%v", strings.Repeat("v", size.labelLength), idx) + } + metrics := make([]*prometheus.GaugeVec, size.metricCount) + for idx := 0; idx < size.metricCount; idx++ { + gauge := prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Name: fmt.Sprintf("avalanche_metric_%s_%v_%v", strings.Repeat("m", size.metricLength), 0, idx), + Help: "A tasty metric morsel", + }, append([]string{"series_id", "cycle_id"}, labelKeys...)) + reg.MustRegister(gauge) + metrics[idx] = gauge + } + + for _, benchmark := range benchmarks { + b.Run(benchmark.name+"_"+size.name, func(b *testing.B) { + for i := 0; i < b.N; i++ { + writer := httptest.NewRecorder() + request, _ := http.NewRequest("GET", "/", nil) + request.Header.Add("Accept-Encoding", benchmark.encodingType) + handler.ServeHTTP(writer, request) + } + }) + } + } +} From 6ad6d341b0667a79be764d0acb37d3a9968d3223 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Manuel=20R=C3=BCger?= Date: Fri, 17 May 2024 10:27:21 +0200 Subject: [PATCH 02/10] Update prometheus/promhttp/http.go MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Bartlomiej Plotka Signed-off-by: Manuel Rüger --- prometheus/promhttp/http.go | 126 ++++++++++++++++++++----------- prometheus/promhttp/http_test.go | 87 ++++++++++++++++++--- 2 files changed, 159 insertions(+), 54 deletions(-) diff --git a/prometheus/promhttp/http.go b/prometheus/promhttp/http.go index 7517caa48..f48d1dd0a 100644 --- a/prometheus/promhttp/http.go +++ b/prometheus/promhttp/http.go @@ -55,7 +55,25 @@ const ( processStartTimeHeader = "Process-Start-Time-Unix" ) -var defaultEncodingOffers = []string{"gzip", "zstd"} +type Compression int + +const ( + Identity Compression = iota + Gzip + Zstd +) + +var compressions = [...]string{ + "identity", + "gzip", + "zstd", +} + +func (c Compression) String() string { + return compressions[c] +} + +var defaultCompressionFormats = []Compression{Identity, Gzip, Zstd} var gzipPool = sync.Pool{ New: func() interface{} { @@ -168,46 +186,13 @@ func HandlerForTransactional(reg prometheus.TransactionalGatherer, opts HandlerO } else { contentType = expfmt.Negotiate(req.Header) } - header := rsp.Header() - header.Set(contentTypeHeader, string(contentType)) - - w := io.Writer(rsp) - if !opts.DisableCompression { - offers := defaultEncodingOffers - if len(opts.EncodingOffers) > 0 { - offers = opts.EncodingOffers - } - // TODO(mrueg): Replace internal/github.com/gddo once https://github.com/golang/go/issues/19307 is implemented. - compression := httputil.NegotiateContentEncoding(req, offers) - switch compression { - case "zstd": - header.Set(contentEncodingHeader, "zstd") - // TODO(mrueg): Replace klauspost/compress with stdlib implementation once https://github.com/golang/go/issues/62513 is implemented. - z, err := zstd.NewWriter(rsp, zstd.WithEncoderLevel(zstd.SpeedFastest)) - if err != nil { - return - } - - z.Reset(w) - defer z.Close() + rsp.Header().Set(contentTypeHeader, string(contentType)) - w = z - case "gzip": - header.Set(contentEncodingHeader, "gzip") - gz := gzipPool.Get().(*gzip.Writer) - defer gzipPool.Put(gz) - - gz.Reset(w) - defer gz.Close() - - w = gz - case "identity": - // This means the content is not encoded. - default: - // The content encoding was not implemented yet. - return + w, err := GetWriter(req, rsp, opts.DisableCompression, opts.OfferedCompressions) + if err != nil { + if opts.ErrorLog != nil { + opts.ErrorLog.Println("error getting writer", err) } - } enc := expfmt.NewEncoder(w, contentType) @@ -373,12 +358,19 @@ type HandlerOpts struct { // no effect on the HTTP status code because ErrorHandling is set to // ContinueOnError. Registry prometheus.Registerer - // If DisableCompression is true, the handler will never compress the - // response, even if requested by the client. + // DisableCompression disables the response encoding (compression) and + // encoding negotiation. If true, the handler will + // never compress the response, even if requested + // by the client and the OfferedCompressions field is ignored. DisableCompression bool - // If DisableCompression is false, this option will allow to define the - // set of offered encoding algorithms. - EncodingOffers []string + // OfferedCompressions is a set of encodings (compressions) handler will + // try to offer when negotiating with the client. This defaults to zstd, + // gzip and identity. + // NOTE: If handler can't agree on the encodings with the client or + // caller using unsupported or empty encodings in OfferedCompressions, + // handler always fallbacks to no compression (identity), for + // compatibility reasons. In such cases ErrorLog will be used if set. + OfferedCompressions []Compression // The number of concurrent HTTP requests is limited to // MaxRequestsInFlight. Additional requests are responded to with 503 // Service Unavailable and a suitable message in the body. If @@ -426,3 +418,49 @@ func httpError(rsp http.ResponseWriter, err error) { http.StatusInternalServerError, ) } + +func GetWriter(r *http.Request, rsp http.ResponseWriter, disableCompression bool, offeredCompressions []Compression) (io.Writer, error) { + w := io.Writer(rsp) + rsp.Header().Set(contentEncodingHeader, "identity") + if !disableCompression { + offers := defaultCompressionFormats + if len(offeredCompressions) > 0 { + offers = offeredCompressions + } + var compressions []string + for _, comp := range offers { + compressions = append(compressions, comp.String()) + } + // TODO(mrueg): Replace internal/github.com/gddo once https://github.com/golang/go/issues/19307 is implemented. + compression := httputil.NegotiateContentEncoding(r, compressions) + switch compression { + case "zstd": + rsp.Header().Set(contentEncodingHeader, "zstd") + // TODO(mrueg): Replace klauspost/compress with stdlib implementation once https://github.com/golang/go/issues/62513 is implemented. + z, err := zstd.NewWriter(rsp, zstd.WithEncoderLevel(zstd.SpeedFastest)) + if err != nil { + return nil, err + } + + z.Reset(w) + defer z.Close() + + w = z + case "gzip": + rsp.Header().Set(contentEncodingHeader, "gzip") + gz := gzipPool.Get().(*gzip.Writer) + defer gzipPool.Put(gz) + + gz.Reset(w) + defer gz.Close() + + w = gz + case "identity": + // This means the content is not compressed. + default: + // The content encoding was not implemented yet. + return w, fmt.Errorf("content compression format not recognized: %s. Valid formats are: %s", compression, defaultCompressionFormats) + } + } + return w, nil +} diff --git a/prometheus/promhttp/http_test.go b/prometheus/promhttp/http_test.go index 3a65e61c5..24ff94778 100644 --- a/prometheus/promhttp/http_test.go +++ b/prometheus/promhttp/http_test.go @@ -332,22 +332,89 @@ func TestHandlerTimeout(t *testing.T) { close(c.Block) // To not leak a goroutine. } -func BenchmarkEncoding(b *testing.B) { +func TestGetWriter(t *testing.T) { + testCases := []struct { + name string + disableCompression bool + offeredCompressions []Compression + acceptEncoding string + expectedCompression string + err error + }{ + { + name: "test without compression enabled", + disableCompression: true, + offeredCompressions: defaultCompressionFormats, + acceptEncoding: "", + expectedCompression: "identity", + err: nil, + }, + { + name: "test with compression enabled with empty accept-encoding header", + disableCompression: false, + offeredCompressions: defaultCompressionFormats, + acceptEncoding: "", + expectedCompression: "identity", + err: nil, + }, + { + name: "test with gzip compression requested", + disableCompression: false, + offeredCompressions: defaultCompressionFormats, + acceptEncoding: "gzip", + expectedCompression: "gzip", + err: nil, + }, + { + name: "test with gzip, zstd compression requested", + disableCompression: false, + offeredCompressions: defaultCompressionFormats, + acceptEncoding: "gzip,zstd", + expectedCompression: "gzip", + err: nil, + }, + { + name: "test with zstd, gzip compression requested", + disableCompression: false, + offeredCompressions: defaultCompressionFormats, + acceptEncoding: "zstd,gzip", + expectedCompression: "gzip", + err: nil, + }, + } + + for _, test := range testCases { + request, _ := http.NewRequest("GET", "/", nil) + request.Header.Add(acceptEncodingHeader, test.acceptEncoding) + rr := httptest.NewRecorder() + _, err := GetWriter(request, rr, test.disableCompression, test.offeredCompressions) + + if !errors.Is(err, test.err) { + t.Errorf("got error: %v, expected: %v", err, test.err) + } + + if rr.Header().Get(contentEncodingHeader) != test.expectedCompression { + t.Errorf("got different compression type: %v, expected: %v", rr.Header().Get(contentEncodingHeader), test.expectedCompression) + } + } +} + +func BenchmarkCompression(b *testing.B) { benchmarks := []struct { - name string - encodingType string + name string + compressionType string }{ { - name: "test with gzip encoding", - encodingType: "gzip", + name: "test with gzip compression", + compressionType: "gzip", }, { - name: "test with zstd encoding", - encodingType: "zstd", + name: "test with zstd compression", + compressionType: "zstd", }, { - name: "test with no encoding", - encodingType: "identity", + name: "test with no compression", + compressionType: "identity", }, } sizes := []struct { @@ -416,7 +483,7 @@ func BenchmarkEncoding(b *testing.B) { for i := 0; i < b.N; i++ { writer := httptest.NewRecorder() request, _ := http.NewRequest("GET", "/", nil) - request.Header.Add("Accept-Encoding", benchmark.encodingType) + request.Header.Add("Accept-Encoding", benchmark.compressionType) handler.ServeHTTP(writer, request) } }) From b0aadb6c096d345721d9f386491c555e580645d6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Manuel=20R=C3=BCger?= Date: Mon, 3 Jun 2024 12:34:28 +0200 Subject: [PATCH 03/10] Update prometheus/promhttp/http.go MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Bartlomiej Plotka Signed-off-by: Manuel Rüger --- prometheus/promhttp/http.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/prometheus/promhttp/http.go b/prometheus/promhttp/http.go index f48d1dd0a..8f3f88332 100644 --- a/prometheus/promhttp/http.go +++ b/prometheus/promhttp/http.go @@ -364,10 +364,10 @@ type HandlerOpts struct { // by the client and the OfferedCompressions field is ignored. DisableCompression bool // OfferedCompressions is a set of encodings (compressions) handler will - // try to offer when negotiating with the client. This defaults to zstd, - // gzip and identity. - // NOTE: If handler can't agree on the encodings with the client or - // caller using unsupported or empty encodings in OfferedCompressions, + // try to offer when negotiating with the client. This defaults to identity, gzip + // and zstd. + // NOTE: If handler can't agree with the client on the encodings or + // unsupported or empty encodings are set in OfferedCompressions, // handler always fallbacks to no compression (identity), for // compatibility reasons. In such cases ErrorLog will be used if set. OfferedCompressions []Compression From 07c2ea2a1b9a1ef5014552dc14eaafb00c443c4c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Manuel=20R=C3=BCger?= Date: Mon, 3 Jun 2024 12:34:38 +0200 Subject: [PATCH 04/10] Update prometheus/promhttp/http.go MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Bartlomiej Plotka Signed-off-by: Manuel Rüger --- prometheus/promhttp/http.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/prometheus/promhttp/http.go b/prometheus/promhttp/http.go index 8f3f88332..fd0c68834 100644 --- a/prometheus/promhttp/http.go +++ b/prometheus/promhttp/http.go @@ -361,7 +361,7 @@ type HandlerOpts struct { // DisableCompression disables the response encoding (compression) and // encoding negotiation. If true, the handler will // never compress the response, even if requested - // by the client and the OfferedCompressions field is ignored. + // by the client and the OfferedCompressions field is set. DisableCompression bool // OfferedCompressions is a set of encodings (compressions) handler will // try to offer when negotiating with the client. This defaults to identity, gzip From bf0f3dc8ef603235b1d7719cd28bdcc43c445ab7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Manuel=20R=C3=BCger?= Date: Mon, 3 Jun 2024 16:37:43 +0200 Subject: [PATCH 05/10] Integrate review comments MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * String typed enum Signed-off-by: Manuel Rüger --- prometheus/promhttp/http.go | 112 ++++++++++++++++--------------- prometheus/promhttp/http_test.go | 26 ++++--- 2 files changed, 75 insertions(+), 63 deletions(-) diff --git a/prometheus/promhttp/http.go b/prometheus/promhttp/http.go index fd0c68834..c253079be 100644 --- a/prometheus/promhttp/http.go +++ b/prometheus/promhttp/http.go @@ -55,24 +55,14 @@ const ( processStartTimeHeader = "Process-Start-Time-Unix" ) -type Compression int +type Compression string const ( - Identity Compression = iota - Gzip - Zstd + Identity Compression = "identity" + Gzip Compression = "gzip" + Zstd Compression = "zstd" ) -var compressions = [...]string{ - "identity", - "gzip", - "zstd", -} - -func (c Compression) String() string { - return compressions[c] -} - var defaultCompressionFormats = []Compression{Identity, Gzip, Zstd} var gzipPool = sync.Pool{ @@ -143,6 +133,18 @@ func HandlerForTransactional(reg prometheus.TransactionalGatherer, opts HandlerO } } + // Select all supported compression formats + var compressions []string + if !opts.DisableCompression { + offers := defaultCompressionFormats + if len(opts.OfferedCompressions) > 0 { + offers = opts.OfferedCompressions + } + for _, comp := range offers { + compressions = append(compressions, string(comp)) + } + } + h := http.HandlerFunc(func(rsp http.ResponseWriter, req *http.Request) { if !opts.ProcessStartTime.IsZero() { rsp.Header().Set(processStartTimeHeader, strconv.FormatInt(opts.ProcessStartTime.Unix(), 10)) @@ -188,12 +190,19 @@ func HandlerForTransactional(reg prometheus.TransactionalGatherer, opts HandlerO } rsp.Header().Set(contentTypeHeader, string(contentType)) - w, err := GetWriter(req, rsp, opts.DisableCompression, opts.OfferedCompressions) + w, encodingHeader, err := NegotiateEncodingWriter(req, rsp, opts.DisableCompression, compressions) if err != nil { if opts.ErrorLog != nil { opts.ErrorLog.Println("error getting writer", err) } + // Since the writer received from NegotiateEncodingWriter will be nil, in case there's an error, we set it here + w = io.Writer(rsp) + } + + if encodingHeader == "" { + encodingHeader = string(Identity) } + rsp.Header().Set(contentEncodingHeader, encodingHeader) enc := expfmt.NewEncoder(w, contentType) @@ -419,48 +428,45 @@ func httpError(rsp http.ResponseWriter, err error) { ) } -func GetWriter(r *http.Request, rsp http.ResponseWriter, disableCompression bool, offeredCompressions []Compression) (io.Writer, error) { - w := io.Writer(rsp) - rsp.Header().Set(contentEncodingHeader, "identity") - if !disableCompression { - offers := defaultCompressionFormats - if len(offeredCompressions) > 0 { - offers = offeredCompressions - } - var compressions []string - for _, comp := range offers { - compressions = append(compressions, comp.String()) +// NegotiateEncodingWriter reads the Accept-Encoding header from a request and +// selects the right compression based on an allow-list of supported +// compressions. It returns a writer implementing the compression and an the +// correct value that the caller can set in the response header. +func NegotiateEncodingWriter(r *http.Request, rw io.Writer, disableCompression bool, compressions []string) (_ io.Writer, encodingHeaderValue string, _ error) { + w := rw + + if disableCompression { + return w, string(Identity), nil + } + + // TODO(mrueg): Replace internal/github.com/gddo once https://github.com/golang/go/issues/19307 is implemented. + compression := httputil.NegotiateContentEncoding(r, compressions) + + switch compression { + case "zstd": + // TODO(mrueg): Replace klauspost/compress with stdlib implementation once https://github.com/golang/go/issues/62513 is implemented. + z, err := zstd.NewWriter(rw, zstd.WithEncoderLevel(zstd.SpeedFastest)) + if err != nil { + return nil, "", err } - // TODO(mrueg): Replace internal/github.com/gddo once https://github.com/golang/go/issues/19307 is implemented. - compression := httputil.NegotiateContentEncoding(r, compressions) - switch compression { - case "zstd": - rsp.Header().Set(contentEncodingHeader, "zstd") - // TODO(mrueg): Replace klauspost/compress with stdlib implementation once https://github.com/golang/go/issues/62513 is implemented. - z, err := zstd.NewWriter(rsp, zstd.WithEncoderLevel(zstd.SpeedFastest)) - if err != nil { - return nil, err - } - z.Reset(w) - defer z.Close() + z.Reset(w) + defer z.Close() - w = z - case "gzip": - rsp.Header().Set(contentEncodingHeader, "gzip") - gz := gzipPool.Get().(*gzip.Writer) - defer gzipPool.Put(gz) + w = z + case "gzip": + gz := gzipPool.Get().(*gzip.Writer) + defer gzipPool.Put(gz) - gz.Reset(w) - defer gz.Close() + gz.Reset(w) + defer gz.Close() - w = gz - case "identity": - // This means the content is not compressed. - default: - // The content encoding was not implemented yet. - return w, fmt.Errorf("content compression format not recognized: %s. Valid formats are: %s", compression, defaultCompressionFormats) - } + w = gz + case "identity": + // This means the content is not compressed. + default: + // The content encoding was not implemented yet. + return nil, "", fmt.Errorf("content compression format not recognized: %s. Valid formats are: %s", compression, defaultCompressionFormats) } - return w, nil + return w, compression, nil } diff --git a/prometheus/promhttp/http_test.go b/prometheus/promhttp/http_test.go index 24ff94778..0646d80b3 100644 --- a/prometheus/promhttp/http_test.go +++ b/prometheus/promhttp/http_test.go @@ -332,11 +332,17 @@ func TestHandlerTimeout(t *testing.T) { close(c.Block) // To not leak a goroutine. } -func TestGetWriter(t *testing.T) { +func TestNegotiateEncodingWriter(t *testing.T) { + var defaultCompressions []string + + for _, comp := range defaultCompressionFormats { + defaultCompressions = append(defaultCompressions, string(comp)) + } + testCases := []struct { name string disableCompression bool - offeredCompressions []Compression + offeredCompressions []string acceptEncoding string expectedCompression string err error @@ -344,7 +350,7 @@ func TestGetWriter(t *testing.T) { { name: "test without compression enabled", disableCompression: true, - offeredCompressions: defaultCompressionFormats, + offeredCompressions: defaultCompressions, acceptEncoding: "", expectedCompression: "identity", err: nil, @@ -352,7 +358,7 @@ func TestGetWriter(t *testing.T) { { name: "test with compression enabled with empty accept-encoding header", disableCompression: false, - offeredCompressions: defaultCompressionFormats, + offeredCompressions: defaultCompressions, acceptEncoding: "", expectedCompression: "identity", err: nil, @@ -360,7 +366,7 @@ func TestGetWriter(t *testing.T) { { name: "test with gzip compression requested", disableCompression: false, - offeredCompressions: defaultCompressionFormats, + offeredCompressions: defaultCompressions, acceptEncoding: "gzip", expectedCompression: "gzip", err: nil, @@ -368,7 +374,7 @@ func TestGetWriter(t *testing.T) { { name: "test with gzip, zstd compression requested", disableCompression: false, - offeredCompressions: defaultCompressionFormats, + offeredCompressions: defaultCompressions, acceptEncoding: "gzip,zstd", expectedCompression: "gzip", err: nil, @@ -376,7 +382,7 @@ func TestGetWriter(t *testing.T) { { name: "test with zstd, gzip compression requested", disableCompression: false, - offeredCompressions: defaultCompressionFormats, + offeredCompressions: defaultCompressions, acceptEncoding: "zstd,gzip", expectedCompression: "gzip", err: nil, @@ -387,14 +393,14 @@ func TestGetWriter(t *testing.T) { request, _ := http.NewRequest("GET", "/", nil) request.Header.Add(acceptEncodingHeader, test.acceptEncoding) rr := httptest.NewRecorder() - _, err := GetWriter(request, rr, test.disableCompression, test.offeredCompressions) + _, encodingHeader, err := NegotiateEncodingWriter(request, rr, test.disableCompression, test.offeredCompressions) if !errors.Is(err, test.err) { t.Errorf("got error: %v, expected: %v", err, test.err) } - if rr.Header().Get(contentEncodingHeader) != test.expectedCompression { - t.Errorf("got different compression type: %v, expected: %v", rr.Header().Get(contentEncodingHeader), test.expectedCompression) + if encodingHeader != test.expectedCompression { + t.Errorf("got different compression type: %v, expected: %v", encodingHeader, test.expectedCompression) } } } From c40f12aca0a9ba356e4b95706d9960431a962ee0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Manuel=20R=C3=BCger?= Date: Mon, 3 Jun 2024 22:04:25 +0200 Subject: [PATCH 06/10] Test with gzip compression MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Manuel Rüger --- prometheus/promhttp/http.go | 29 ++++-- prometheus/promhttp/http_test.go | 157 ++++++++++++++++++++++++++++++- 2 files changed, 173 insertions(+), 13 deletions(-) diff --git a/prometheus/promhttp/http.go b/prometheus/promhttp/http.go index c253079be..15a71c73c 100644 --- a/prometheus/promhttp/http.go +++ b/prometheus/promhttp/http.go @@ -190,7 +190,18 @@ func HandlerForTransactional(reg prometheus.TransactionalGatherer, opts HandlerO } rsp.Header().Set(contentTypeHeader, string(contentType)) - w, encodingHeader, err := NegotiateEncodingWriter(req, rsp, opts.DisableCompression, compressions) + w, encodingHeader, closeWriter, err := NegotiateEncodingWriter(req, rsp, opts.DisableCompression, compressions) + + if closeWriter != nil { + defer func() { + err := closeWriter() + if err != nil { + if opts.ErrorLog != nil { + opts.ErrorLog.Println("error closing writer:", err) + } + } + }() + } if err != nil { if opts.ErrorLog != nil { opts.ErrorLog.Println("error getting writer", err) @@ -432,11 +443,11 @@ func httpError(rsp http.ResponseWriter, err error) { // selects the right compression based on an allow-list of supported // compressions. It returns a writer implementing the compression and an the // correct value that the caller can set in the response header. -func NegotiateEncodingWriter(r *http.Request, rw io.Writer, disableCompression bool, compressions []string) (_ io.Writer, encodingHeaderValue string, _ error) { +func NegotiateEncodingWriter(r *http.Request, rw io.Writer, disableCompression bool, compressions []string) (_ io.Writer, encodingHeaderValue string, closeWriter func() error, _ error) { w := rw if disableCompression { - return w, string(Identity), nil + return w, string(Identity), nil, nil } // TODO(mrueg): Replace internal/github.com/gddo once https://github.com/golang/go/issues/19307 is implemented. @@ -447,26 +458,26 @@ func NegotiateEncodingWriter(r *http.Request, rw io.Writer, disableCompression b // TODO(mrueg): Replace klauspost/compress with stdlib implementation once https://github.com/golang/go/issues/62513 is implemented. z, err := zstd.NewWriter(rw, zstd.WithEncoderLevel(zstd.SpeedFastest)) if err != nil { - return nil, "", err + return nil, "", nil, err } z.Reset(w) - defer z.Close() - w = z + + return w, compression, z.Close, nil case "gzip": gz := gzipPool.Get().(*gzip.Writer) defer gzipPool.Put(gz) gz.Reset(w) - defer gz.Close() w = gz + return w, compression, gz.Close, nil case "identity": // This means the content is not compressed. + return w, compression, nil, nil default: // The content encoding was not implemented yet. - return nil, "", fmt.Errorf("content compression format not recognized: %s. Valid formats are: %s", compression, defaultCompressionFormats) + return nil, "", nil, fmt.Errorf("content compression format not recognized: %s. Valid formats are: %s", compression, defaultCompressionFormats) } - return w, compression, nil } diff --git a/prometheus/promhttp/http_test.go b/prometheus/promhttp/http_test.go index 0646d80b3..528f89952 100644 --- a/prometheus/promhttp/http_test.go +++ b/prometheus/promhttp/http_test.go @@ -15,8 +15,10 @@ package promhttp import ( "bytes" + "compress/gzip" "errors" "fmt" + "io" "log" "net/http" "net/http/httptest" @@ -24,6 +26,7 @@ import ( "testing" "time" + "github.com/klauspost/compress/zstd" dto "github.com/prometheus/client_model/go" "github.com/prometheus/client_golang/prometheus" @@ -31,6 +34,11 @@ import ( type errorCollector struct{} +const ( + acceptHeader = "Accept" + acceptTextPlain = "text/plain" +) + func (e errorCollector) Describe(ch chan<- *prometheus.Desc) { ch <- prometheus.NewDesc("invalid_metric", "not helpful", nil, nil) } @@ -71,6 +79,28 @@ func (g *mockTransactionGatherer) Gather() (_ []*dto.MetricFamily, done func(), return mfs, func() { g.doneInvoked++ }, err } +func readCompressedBody(r io.Reader, comp Compression) (string, error) { + switch comp { + case Gzip: + reader, err := gzip.NewReader(r) + if err != nil { + return "", err + } + defer reader.Close() + got, err := io.ReadAll(reader) + return string(got), err + case Zstd: + reader, err := zstd.NewReader(r) + if err != nil { + return "", err + } + defer reader.Close() + got, err := io.ReadAll(reader) + return string(got), err + } + return "", fmt.Errorf("Unsupported compression") +} + func TestHandlerErrorHandling(t *testing.T) { // Create a registry that collects a MetricFamily with two elements, // another with one, and reports an error. Further down, we'll use the @@ -223,7 +253,7 @@ func TestInstrumentMetricHandler(t *testing.T) { InstrumentMetricHandler(reg, HandlerForTransactional(mReg, HandlerOpts{})) writer := httptest.NewRecorder() request, _ := http.NewRequest("GET", "/", nil) - request.Header.Add("Accept", "test/plain") + request.Header.Add(acceptHeader, acceptTextPlain) handler.ServeHTTP(writer, request) if got := mReg.gatherInvoked; got != 1 { @@ -237,6 +267,10 @@ func TestInstrumentMetricHandler(t *testing.T) { t.Errorf("got HTTP status code %d, want %d", got, want) } + if got, want := writer.Header().Get(contentEncodingHeader), string(Identity); got != want { + t.Errorf("got HTTP content encoding header %s, want %s", got, want) + } + want := "promhttp_metric_handler_requests_in_flight 1\n" if got := writer.Body.String(); !strings.Contains(got, want) { t.Errorf("got body %q, does not contain %q", got, want) @@ -278,7 +312,7 @@ func TestHandlerMaxRequestsInFlight(t *testing.T) { w2 := httptest.NewRecorder() w3 := httptest.NewRecorder() request, _ := http.NewRequest("GET", "/", nil) - request.Header.Add("Accept", "test/plain") + request.Header.Add(acceptHeader, acceptTextPlain) c := blockingCollector{Block: make(chan struct{}), CollectStarted: make(chan struct{}, 1)} reg.MustRegister(c) @@ -332,6 +366,121 @@ func TestHandlerTimeout(t *testing.T) { close(c.Block) // To not leak a goroutine. } +func TestInstrumentMetricHandlerWithCompression(t *testing.T) { + reg := prometheus.NewRegistry() + mReg := &mockTransactionGatherer{g: reg} + handler := InstrumentMetricHandler(reg, HandlerForTransactional(mReg, HandlerOpts{DisableCompression: false})) + compression := Zstd + writer := httptest.NewRecorder() + request, _ := http.NewRequest("GET", "/", nil) + request.Header.Add(acceptHeader, acceptTextPlain) + request.Header.Add(acceptEncodingHeader, string(compression)) + + handler.ServeHTTP(writer, request) + if got := mReg.gatherInvoked; got != 1 { + t.Fatalf("unexpected number of gather invokes, want 1, got %d", got) + } + if got := mReg.doneInvoked; got != 1 { + t.Fatalf("unexpected number of done invokes, want 1, got %d", got) + } + + if got, want := writer.Code, http.StatusOK; got != want { + t.Errorf("got HTTP status code %d, want %d", got, want) + } + + if got, want := writer.Header().Get(contentEncodingHeader), string(compression); got != want { + t.Errorf("got HTTP content encoding header %s, want %s", got, want) + } + + body, err := readCompressedBody(writer.Body, compression) + want := "promhttp_metric_handler_requests_in_flight 1\n" + if got := body; !strings.Contains(got, want) { + t.Errorf("got body %q, does not contain %q, err: %v", got, want, err) + } + + want = "promhttp_metric_handler_requests_total{code=\"200\"} 0\n" + if got := body; !strings.Contains(got, want) { + t.Errorf("got body %q, does not contain %q, err: %v", got, want, err) + } + + for i := 0; i < 100; i++ { + writer.Body.Reset() + handler.ServeHTTP(writer, request) + + if got, want := mReg.gatherInvoked, i+2; got != want { + t.Fatalf("unexpected number of gather invokes, want %d, got %d", want, got) + } + if got, want := mReg.doneInvoked, i+2; got != want { + t.Fatalf("unexpected number of done invokes, want %d, got %d", want, got) + } + if got, want := writer.Code, http.StatusOK; got != want { + t.Errorf("got HTTP status code %d, want %d", got, want) + } + body, err := readCompressedBody(writer.Body, compression) + + want := "promhttp_metric_handler_requests_in_flight 1\n" + if got := body; !strings.Contains(got, want) { + t.Errorf("got body %q, does not contain %q, err: %v", got, want, err) + } + + want = fmt.Sprintf("promhttp_metric_handler_requests_total{code=\"200\"} %d\n", i+1) + if got := body; !strings.Contains(got, want) { + t.Errorf("got body %q, does not contain %q, err: %v", got, want, err) + } + } + + // Test with Zstd + compression = Zstd + request.Header.Set(acceptEncodingHeader, string(compression)) + + handler.ServeHTTP(writer, request) + + if got, want := writer.Code, http.StatusOK; got != want { + t.Errorf("got HTTP status code %d, want %d", got, want) + } + + if got, want := writer.Header().Get(contentEncodingHeader), string(compression); got != want { + t.Errorf("got HTTP content encoding header %s, want %s", got, want) + } + + body, err = readCompressedBody(writer.Body, compression) + want = "promhttp_metric_handler_requests_in_flight 1\n" + if got := body; !strings.Contains(got, want) { + t.Errorf("got body %q, does not contain %q, err: %v", got, want, err) + } + + want = "promhttp_metric_handler_requests_total{code=\"200\"} 101\n" + if got := body; !strings.Contains(got, want) { + t.Errorf("got body %q, does not contain %q, err: %v", got, want, err) + } + + for i := 101; i < 201; i++ { + writer.Body.Reset() + handler.ServeHTTP(writer, request) + + if got, want := mReg.gatherInvoked, i+2; got != want { + t.Fatalf("unexpected number of gather invokes, want %d, got %d", want, got) + } + if got, want := mReg.doneInvoked, i+2; got != want { + t.Fatalf("unexpected number of done invokes, want %d, got %d", want, got) + } + if got, want := writer.Code, http.StatusOK; got != want { + t.Errorf("got HTTP status code %d, want %d", got, want) + } + body, err := readCompressedBody(writer.Body, compression) + + want := "promhttp_metric_handler_requests_in_flight 1\n" + if got := body; !strings.Contains(got, want) { + t.Errorf("got body %q, does not contain %q, err: %v", got, want, err) + } + + want = fmt.Sprintf("promhttp_metric_handler_requests_total{code=\"200\"} %d\n", i+1) + if got := body; !strings.Contains(got, want) { + t.Errorf("got body %q, does not contain %q, err: %v", got, want, err) + } + } +} + func TestNegotiateEncodingWriter(t *testing.T) { var defaultCompressions []string @@ -393,7 +542,7 @@ func TestNegotiateEncodingWriter(t *testing.T) { request, _ := http.NewRequest("GET", "/", nil) request.Header.Add(acceptEncodingHeader, test.acceptEncoding) rr := httptest.NewRecorder() - _, encodingHeader, err := NegotiateEncodingWriter(request, rr, test.disableCompression, test.offeredCompressions) + _, encodingHeader, _, err := NegotiateEncodingWriter(request, rr, test.disableCompression, test.offeredCompressions) if !errors.Is(err, test.err) { t.Errorf("got error: %v, expected: %v", err, test.err) @@ -489,7 +638,7 @@ func BenchmarkCompression(b *testing.B) { for i := 0; i < b.N; i++ { writer := httptest.NewRecorder() request, _ := http.NewRequest("GET", "/", nil) - request.Header.Add("Accept-Encoding", benchmark.compressionType) + request.Header.Add(acceptEncodingHeader, benchmark.compressionType) handler.ServeHTTP(writer, request) } }) From f0c8656486aab2ce7da5f5f503d59aae6b07d756 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Manuel=20R=C3=BCger?= Date: Tue, 4 Jun 2024 09:52:02 +0200 Subject: [PATCH 07/10] Update prometheus/promhttp/http.go MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Bartlomiej Plotka Signed-off-by: Manuel Rüger --- prometheus/promhttp/http.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/prometheus/promhttp/http.go b/prometheus/promhttp/http.go index 15a71c73c..da010a804 100644 --- a/prometheus/promhttp/http.go +++ b/prometheus/promhttp/http.go @@ -133,7 +133,7 @@ func HandlerForTransactional(reg prometheus.TransactionalGatherer, opts HandlerO } } - // Select all supported compression formats + // Select compression formats to offer based on default or user choice. var compressions []string if !opts.DisableCompression { offers := defaultCompressionFormats From 9fea57521f54ba5f86cb8def390115db9cd6755f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Manuel=20R=C3=BCger?= Date: Tue, 4 Jun 2024 09:54:40 +0200 Subject: [PATCH 08/10] Reorder error handling MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Manuel Rüger --- prometheus/promhttp/http.go | 17 +++++++---------- 1 file changed, 7 insertions(+), 10 deletions(-) diff --git a/prometheus/promhttp/http.go b/prometheus/promhttp/http.go index da010a804..400cf406c 100644 --- a/prometheus/promhttp/http.go +++ b/prometheus/promhttp/http.go @@ -191,6 +191,13 @@ func HandlerForTransactional(reg prometheus.TransactionalGatherer, opts HandlerO rsp.Header().Set(contentTypeHeader, string(contentType)) w, encodingHeader, closeWriter, err := NegotiateEncodingWriter(req, rsp, opts.DisableCompression, compressions) + if err != nil { + if opts.ErrorLog != nil { + opts.ErrorLog.Println("error getting writer", err) + } + w = io.Writer(rsp) + encodingHeader = string(Identity) + } if closeWriter != nil { defer func() { @@ -202,17 +209,7 @@ func HandlerForTransactional(reg prometheus.TransactionalGatherer, opts HandlerO } }() } - if err != nil { - if opts.ErrorLog != nil { - opts.ErrorLog.Println("error getting writer", err) - } - // Since the writer received from NegotiateEncodingWriter will be nil, in case there's an error, we set it here - w = io.Writer(rsp) - } - if encodingHeader == "" { - encodingHeader = string(Identity) - } rsp.Header().Set(contentEncodingHeader, encodingHeader) enc := expfmt.NewEncoder(w, contentType) From 140db7ad0954f4be5870ee13355142afd42cc118 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Manuel=20R=C3=BCger?= Date: Tue, 4 Jun 2024 21:38:19 +0200 Subject: [PATCH 09/10] Apply suggestions from code review MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Bartlomiej Plotka Signed-off-by: Manuel Rüger --- prometheus/promhttp/http.go | 43 +++++++++++--------------------- prometheus/promhttp/http_test.go | 10 ++------ 2 files changed, 16 insertions(+), 37 deletions(-) diff --git a/prometheus/promhttp/http.go b/prometheus/promhttp/http.go index 400cf406c..ac44c1cdd 100644 --- a/prometheus/promhttp/http.go +++ b/prometheus/promhttp/http.go @@ -190,7 +190,7 @@ func HandlerForTransactional(reg prometheus.TransactionalGatherer, opts HandlerO } rsp.Header().Set(contentTypeHeader, string(contentType)) - w, encodingHeader, closeWriter, err := NegotiateEncodingWriter(req, rsp, opts.DisableCompression, compressions) + w, encodingHeader, closeWriter, err := negotiateEncodingWriter(req, rsp, compressions) if err != nil { if opts.ErrorLog != nil { opts.ErrorLog.Println("error getting writer", err) @@ -199,16 +199,7 @@ func HandlerForTransactional(reg prometheus.TransactionalGatherer, opts HandlerO encodingHeader = string(Identity) } - if closeWriter != nil { - defer func() { - err := closeWriter() - if err != nil { - if opts.ErrorLog != nil { - opts.ErrorLog.Println("error closing writer:", err) - } - } - }() - } + defer closeWriter() rsp.Header().Set(contentEncodingHeader, encodingHeader) @@ -440,41 +431,35 @@ func httpError(rsp http.ResponseWriter, err error) { // selects the right compression based on an allow-list of supported // compressions. It returns a writer implementing the compression and an the // correct value that the caller can set in the response header. -func NegotiateEncodingWriter(r *http.Request, rw io.Writer, disableCompression bool, compressions []string) (_ io.Writer, encodingHeaderValue string, closeWriter func() error, _ error) { - w := rw - - if disableCompression { - return w, string(Identity), nil, nil +func negotiateEncodingWriter(r *http.Request, rw io.Writer, compressions []string) (_ io.Writer, encodingHeaderValue string, closeWriter func(), _ error) { + if len(compressions) == 0 { + return rw, string(Identity), func() {}, nil } // TODO(mrueg): Replace internal/github.com/gddo once https://github.com/golang/go/issues/19307 is implemented. - compression := httputil.NegotiateContentEncoding(r, compressions) + selected := httputil.NegotiateContentEncoding(r, compressions) - switch compression { + switch selected { case "zstd": // TODO(mrueg): Replace klauspost/compress with stdlib implementation once https://github.com/golang/go/issues/62513 is implemented. z, err := zstd.NewWriter(rw, zstd.WithEncoderLevel(zstd.SpeedFastest)) if err != nil { - return nil, "", nil, err + return nil, "", func() {}, err } - z.Reset(w) - w = z - - return w, compression, z.Close, nil + z.Reset(rw) + return z, selected, func() { _ = z.Close() }, nil case "gzip": gz := gzipPool.Get().(*gzip.Writer) defer gzipPool.Put(gz) - gz.Reset(w) - - w = gz - return w, compression, gz.Close, nil + gz.Reset(rw) + return gz, selected, func() { _ = gz.Close(); gzipPool.Put(gz) }, nil case "identity": // This means the content is not compressed. - return w, compression, nil, nil + return rw, selected, func() {}, nil default: // The content encoding was not implemented yet. - return nil, "", nil, fmt.Errorf("content compression format not recognized: %s. Valid formats are: %s", compression, defaultCompressionFormats) + return nil, "", func() {}, fmt.Errorf("content compression format not recognized: %s. Valid formats are: %s", selected, defaultCompressionFormats) } } diff --git a/prometheus/promhttp/http_test.go b/prometheus/promhttp/http_test.go index 528f89952..0ed8fe341 100644 --- a/prometheus/promhttp/http_test.go +++ b/prometheus/promhttp/http_test.go @@ -490,7 +490,6 @@ func TestNegotiateEncodingWriter(t *testing.T) { testCases := []struct { name string - disableCompression bool offeredCompressions []string acceptEncoding string expectedCompression string @@ -498,15 +497,13 @@ func TestNegotiateEncodingWriter(t *testing.T) { }{ { name: "test without compression enabled", - disableCompression: true, - offeredCompressions: defaultCompressions, + offeredCompressions: []string{}, acceptEncoding: "", expectedCompression: "identity", err: nil, }, { name: "test with compression enabled with empty accept-encoding header", - disableCompression: false, offeredCompressions: defaultCompressions, acceptEncoding: "", expectedCompression: "identity", @@ -514,7 +511,6 @@ func TestNegotiateEncodingWriter(t *testing.T) { }, { name: "test with gzip compression requested", - disableCompression: false, offeredCompressions: defaultCompressions, acceptEncoding: "gzip", expectedCompression: "gzip", @@ -522,7 +518,6 @@ func TestNegotiateEncodingWriter(t *testing.T) { }, { name: "test with gzip, zstd compression requested", - disableCompression: false, offeredCompressions: defaultCompressions, acceptEncoding: "gzip,zstd", expectedCompression: "gzip", @@ -530,7 +525,6 @@ func TestNegotiateEncodingWriter(t *testing.T) { }, { name: "test with zstd, gzip compression requested", - disableCompression: false, offeredCompressions: defaultCompressions, acceptEncoding: "zstd,gzip", expectedCompression: "gzip", @@ -542,7 +536,7 @@ func TestNegotiateEncodingWriter(t *testing.T) { request, _ := http.NewRequest("GET", "/", nil) request.Header.Add(acceptEncodingHeader, test.acceptEncoding) rr := httptest.NewRecorder() - _, encodingHeader, _, err := NegotiateEncodingWriter(request, rr, test.disableCompression, test.offeredCompressions) + _, encodingHeader, _, err := negotiateEncodingWriter(request, rr, test.offeredCompressions) if !errors.Is(err, test.err) { t.Errorf("got error: %v, expected: %v", err, test.err) From 00ce2e8a67b828b3d582fea558d685cfabda9650 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Manuel=20R=C3=BCger?= Date: Fri, 7 Jun 2024 14:06:55 +0200 Subject: [PATCH 10/10] Include review suggestions MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Manuel Rüger --- prometheus/promhttp/http.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/prometheus/promhttp/http.go b/prometheus/promhttp/http.go index ac44c1cdd..2e0b9a864 100644 --- a/prometheus/promhttp/http.go +++ b/prometheus/promhttp/http.go @@ -55,6 +55,8 @@ const ( processStartTimeHeader = "Process-Start-Time-Unix" ) +// Compression represents the content encodings handlers support for the HTTP +// responses. type Compression string const ( @@ -427,7 +429,7 @@ func httpError(rsp http.ResponseWriter, err error) { ) } -// NegotiateEncodingWriter reads the Accept-Encoding header from a request and +// negotiateEncodingWriter reads the Accept-Encoding header from a request and // selects the right compression based on an allow-list of supported // compressions. It returns a writer implementing the compression and an the // correct value that the caller can set in the response header. @@ -451,8 +453,6 @@ func negotiateEncodingWriter(r *http.Request, rw io.Writer, compressions []strin return z, selected, func() { _ = z.Close() }, nil case "gzip": gz := gzipPool.Get().(*gzip.Writer) - defer gzipPool.Put(gz) - gz.Reset(rw) return gz, selected, func() { _ = gz.Close(); gzipPool.Put(gz) }, nil case "identity":