From 5b3613a83539560692cb3378eb3e195de42f4d08 Mon Sep 17 00:00:00 2001 From: Cyril Tovena Date: Fri, 14 Jan 2022 15:06:05 +0100 Subject: [PATCH 1/5] Add an HTTP hedging library. Signed-off-by: Cyril Tovena --- hedging/hedging.go | 136 ++++++++++++++++++++++++++++++++++++++++ hedging/hedging_test.go | 96 ++++++++++++++++++++++++++++ 2 files changed, 232 insertions(+) create mode 100644 hedging/hedging.go create mode 100644 hedging/hedging_test.go diff --git a/hedging/hedging.go b/hedging/hedging.go new file mode 100644 index 000000000..705008227 --- /dev/null +++ b/hedging/hedging.go @@ -0,0 +1,136 @@ +package hedging + +import ( + "errors" + "flag" + "net/http" + "sync" + "time" + + "github.com/cristalhq/hedgedhttp" + "github.com/prometheus/client_golang/prometheus" + "golang.org/x/time/rate" +) + +var ( + ErrTooManyHedgeRequests = errors.New("too many hedge requests") + totalHedgeRequests prometheus.Counter + totalRateLimitedHedgeRequests prometheus.Counter + once sync.Once +) + +func init() { + initMetrics() +} + +func initMetrics() { + once = sync.Once{} + totalHedgeRequests = prometheus.NewCounter(prometheus.CounterOpts{ + Name: "hedged_requests_total", + Help: "The total number of hedged requests.", + }) + + totalRateLimitedHedgeRequests = prometheus.NewCounter(prometheus.CounterOpts{ + Name: "hedged_requests_rate_limited_total", + Help: "The total number of hedged requests rejected via rate limiting.", + }) +} + +// Config is the configuration for hedging requests. +type Config struct { + // At is the duration after which a second request will be issued. + At time.Duration `yaml:"at"` + // UpTo is the maximum number of requests that will be issued. + UpTo int `yaml:"up_to"` + // The maximun of hedge requests allowed per second. + MaxPerSecond int `yaml:"max_per_second"` +} + +// RegisterFlags registers flags. +func (cfg *Config) RegisterFlags(f *flag.FlagSet) { + cfg.RegisterFlagsWithPrefix("", f) +} + +// RegisterFlagsWithPrefix registers flags with prefix. +func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { + f.IntVar(&cfg.UpTo, prefix+"hedge-requests-up-to", 2, "The maximun of hedge requests allowed.") + f.DurationVar(&cfg.At, prefix+"hedge-requests-at", 0, "If set to a non-zero value a second request will be issued at the provided duration. Default is 0 (disabled)") + f.IntVar(&cfg.MaxPerSecond, prefix+"hedge-max-per-second", 5, "The maximun of hedge requests allowed per seconds.") +} + +// Client returns a hedged http client. +// The client transport will be mutated to use the hedged roundtripper. +func (cfg *Config) Client(client *http.Client) (*http.Client, error) { + return cfg.ClientWithRegisterer(client, prometheus.DefaultRegisterer) +} + +// ClientWithRegisterer returns a hedged http client with instrumentation registered to the provided registerer. +// The client transport will be mutated to use the hedged roundtripper. +func (cfg *Config) ClientWithRegisterer(client *http.Client, reg prometheus.Registerer) (*http.Client, error) { + if reg == nil { + reg = prometheus.DefaultRegisterer + } + if client == nil { + client = http.DefaultClient + } + if cfg.At == 0 { + return client, nil + } + var err error + client.Transport, err = cfg.RoundTripperWithRegisterer(client.Transport, reg) + if err != nil { + return nil, err + } + return client, nil +} + +// RoundTripperWithRegisterer returns a hedged roundtripper with instrumentation registered to the provided registerer. +func (cfg *Config) RoundTripperWithRegisterer(next http.RoundTripper, reg prometheus.Registerer) (http.RoundTripper, error) { + if reg == nil { + reg = prometheus.DefaultRegisterer + } + if next == nil { + next = http.DefaultTransport + } + if cfg.At == 0 { + return next, nil + } + // register metrics + once.Do(func() { + reg.MustRegister(totalHedgeRequests) + reg.MustRegister(totalRateLimitedHedgeRequests) + }) + return hedgedhttp.NewRoundTripper( + cfg.At, + cfg.UpTo, + newLimitedHedgingRoundTripper(cfg.MaxPerSecond, next), + ) +} + +// RoundTripper returns a hedged roundtripper. +func (cfg *Config) RoundTripper(next http.RoundTripper) (http.RoundTripper, error) { + return cfg.RoundTripperWithRegisterer(next, prometheus.DefaultRegisterer) +} + +type limitedHedgingRoundTripper struct { + next http.RoundTripper + limiter *rate.Limiter +} + +func newLimitedHedgingRoundTripper(max int, next http.RoundTripper) *limitedHedgingRoundTripper { + return &limitedHedgingRoundTripper{ + next: next, + limiter: rate.NewLimiter(rate.Limit(max), max), + } +} + +func (rt *limitedHedgingRoundTripper) RoundTrip(req *http.Request) (*http.Response, error) { + if hedgedhttp.IsHedgedRequest(req) { + if !rt.limiter.Allow() { + totalRateLimitedHedgeRequests.Inc() + return nil, ErrTooManyHedgeRequests + } + totalHedgeRequests.Inc() + } + return rt.next.RoundTrip(req) +} diff --git a/hedging/hedging_test.go b/hedging/hedging_test.go new file mode 100644 index 000000000..4524156e7 --- /dev/null +++ b/hedging/hedging_test.go @@ -0,0 +1,96 @@ +package hedging + +import ( + "net/http" + "strings" + "testing" + "time" + + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/testutil" + "github.com/stretchr/testify/require" + "go.uber.org/atomic" +) + +type RoundTripperFunc func(*http.Request) (*http.Response, error) + +func (fn RoundTripperFunc) RoundTrip(req *http.Request) (*http.Response, error) { + return fn(req) +} + +func resetMetrics() { + reg := prometheus.NewRegistry() + prometheus.DefaultRegisterer = reg + prometheus.DefaultGatherer = reg + initMetrics() +} + +func TestHedging(t *testing.T) { + resetMetrics() + cfg := &Config{ + At: time.Duration(1), + UpTo: 3, + MaxPerSecond: 1000, + } + count := atomic.NewInt32(0) + client, err := cfg.Client(&http.Client{ + Transport: RoundTripperFunc(func(r *http.Request) (*http.Response, error) { + count.Inc() + time.Sleep(200 * time.Millisecond) + return &http.Response{ + StatusCode: http.StatusOK, + }, nil + }), + }) + if err != nil { + t.Fatal(err) + } + _, _ = client.Get("http://example.com") + + require.Equal(t, int32(3), count.Load()) + require.NoError(t, testutil.GatherAndCompare(prometheus.DefaultGatherer, + strings.NewReader(` +# HELP hedged_requests_rate_limited_total The total number of hedged requests rejected via rate limiting. +# TYPE hedged_requests_rate_limited_total counter +hedged_requests_rate_limited_total 0 +# HELP hedged_requests_total The total number of hedged requests. +# TYPE hedged_requests_total counter +hedged_requests_total 2 +`, + ), "hedged_requests_total", "hedged_requests_rate_limited_total")) +} + +func TestHedgingRateLimit(t *testing.T) { + resetMetrics() + cfg := &Config{ + At: time.Duration(1), + UpTo: 20, + MaxPerSecond: 1, + } + count := atomic.NewInt32(0) + client, err := cfg.Client(&http.Client{ + Transport: RoundTripperFunc(func(r *http.Request) (*http.Response, error) { + count.Inc() + time.Sleep(200 * time.Millisecond) + return &http.Response{ + StatusCode: http.StatusOK, + }, nil + }), + }) + if err != nil { + t.Fatal(err) + } + _, _ = client.Get("http://example.com") + + require.Equal(t, int32(2), count.Load()) + require.NoError(t, testutil.GatherAndCompare(prometheus.DefaultGatherer, + strings.NewReader(` +# HELP hedged_requests_rate_limited_total The total number of hedged requests rejected via rate limiting. +# TYPE hedged_requests_rate_limited_total counter +hedged_requests_rate_limited_total 18 +# HELP hedged_requests_total The total number of hedged requests. +# TYPE hedged_requests_total counter +hedged_requests_total 1 +`, + ), "hedged_requests_total", "hedged_requests_rate_limited_total")) +} From 8f5c562ba266efc7b2bb35a6fe4e826c36a5412a Mon Sep 17 00:00:00 2001 From: Cyril Tovena Date: Fri, 14 Jan 2022 15:12:34 +0100 Subject: [PATCH 2/5] Update changelog and gomod. Signed-off-by: Cyril Tovena --- CHANGELOG.md | 1 + go.mod | 1 + go.sum | 2 ++ 3 files changed, 4 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 230ee1d6f..a15a33005 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -31,6 +31,7 @@ * [ENHANCEMENT] Added option to BasicLifecycler to keep instance in the ring when stopping. #97 * [ENHANCEMENT] Add WaitRingTokensStability function to ring, to be able to wait on ring stability excluding allowed state transitions. #95 * [ENHANCEMENT] Trigger metrics update on ring changes instead of doing it periodically to speed up tests that wait for certain metrics. #107 +* [ENHANCEMENT] Add an HTTP hedging library. #115 * [BUGFIX] spanlogger: Support multiple tenant IDs. #59 * [BUGFIX] Memberlist: fixed corrupted packets when sending compound messages with more than 255 messages or messages bigger than 64KB. #85 * [BUGFIX] Ring: `ring_member_ownership_percent` and `ring_tokens_owned` metrics are not updated on scale down. #109 diff --git a/go.mod b/go.mod index 9671a2349..c2e02a8b4 100644 --- a/go.mod +++ b/go.mod @@ -5,6 +5,7 @@ go 1.16 require ( github.com/armon/go-metrics v0.3.0 github.com/coreos/etcd v3.3.25+incompatible // indirect + github.com/cristalhq/hedgedhttp v0.7.0 github.com/go-kit/log v0.1.0 github.com/gogo/protobuf v1.3.2 github.com/gogo/status v1.1.0 diff --git a/go.sum b/go.sum index d43f910c3..d30dc8946 100644 --- a/go.sum +++ b/go.sum @@ -60,6 +60,8 @@ github.com/coreos/pkg v0.0.0-20160727233714-3ac0863d7acf h1:CAKfRE2YtTUIjjh1bkBt github.com/coreos/pkg v0.0.0-20160727233714-3ac0863d7acf/go.mod h1:E3G3o1h8I7cfcXa63jLwjI0eiQQMgzzUDFVpN/nH/eA= github.com/cpuguy83/go-md2man/v2 v2.0.0-20190314233015-f79a8a8ca69d/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsrgA7czyZG/E6dU= github.com/creack/pty v1.1.7/go.mod h1:lj5s0c3V2DBrqTV7llrYr5NG6My20zk30Fl46Y7DoTY= +github.com/cristalhq/hedgedhttp v0.7.0 h1:C2XPDC+AQH4QJt6vZI4jB5WNyF86QbSJD4C4fW3H3ro= +github.com/cristalhq/hedgedhttp v0.7.0/go.mod h1:XkqWU6qVMutbhW68NnzjWrGtH8NUx1UfYqGYtHVKIsI= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= From 466cf866439f5b7e064b218f797985345f2a3ffb Mon Sep 17 00:00:00 2001 From: Cyril Tovena Date: Fri, 14 Jan 2022 15:37:55 +0100 Subject: [PATCH 3/5] Use promauto. Signed-off-by: Cyril Tovena --- hedging/hedging.go | 30 ++++++++++-------------------- hedging/hedging_test.go | 3 ++- 2 files changed, 12 insertions(+), 21 deletions(-) diff --git a/hedging/hedging.go b/hedging/hedging.go index 705008227..5ed6c504a 100644 --- a/hedging/hedging.go +++ b/hedging/hedging.go @@ -9,6 +9,7 @@ import ( "github.com/cristalhq/hedgedhttp" "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" "golang.org/x/time/rate" ) @@ -19,23 +20,6 @@ var ( once sync.Once ) -func init() { - initMetrics() -} - -func initMetrics() { - once = sync.Once{} - totalHedgeRequests = prometheus.NewCounter(prometheus.CounterOpts{ - Name: "hedged_requests_total", - Help: "The total number of hedged requests.", - }) - - totalRateLimitedHedgeRequests = prometheus.NewCounter(prometheus.CounterOpts{ - Name: "hedged_requests_rate_limited_total", - Help: "The total number of hedged requests rejected via rate limiting.", - }) -} - // Config is the configuration for hedging requests. type Config struct { // At is the duration after which a second request will be issued. @@ -95,10 +79,16 @@ func (cfg *Config) RoundTripperWithRegisterer(next http.RoundTripper, reg promet if cfg.At == 0 { return next, nil } - // register metrics + // register metrics only once once.Do(func() { - reg.MustRegister(totalHedgeRequests) - reg.MustRegister(totalRateLimitedHedgeRequests) + totalHedgeRequests = promauto.With(reg).NewCounter(prometheus.CounterOpts{ + Name: "hedged_requests_total", + Help: "The total number of hedged requests.", + }) + totalRateLimitedHedgeRequests = promauto.With(reg).NewCounter(prometheus.CounterOpts{ + Name: "hedged_requests_rate_limited_total", + Help: "The total number of hedged requests rejected via rate limiting.", + }) }) return hedgedhttp.NewRoundTripper( cfg.At, diff --git a/hedging/hedging_test.go b/hedging/hedging_test.go index 4524156e7..578c2e46b 100644 --- a/hedging/hedging_test.go +++ b/hedging/hedging_test.go @@ -3,6 +3,7 @@ package hedging import ( "net/http" "strings" + "sync" "testing" "time" @@ -19,10 +20,10 @@ func (fn RoundTripperFunc) RoundTrip(req *http.Request) (*http.Response, error) } func resetMetrics() { + once = sync.Once{} reg := prometheus.NewRegistry() prometheus.DefaultRegisterer = reg prometheus.DefaultGatherer = reg - initMetrics() } func TestHedging(t *testing.T) { From d38cbf51a1f643441216e1fe095d9ccbc761e487 Mon Sep 17 00:00:00 2001 From: Cyril Tovena Date: Wed, 19 Jan 2022 07:58:46 +0100 Subject: [PATCH 4/5] First round of review feedback Signed-off-by: Cyril Tovena --- hedging/hedging.go | 20 +++++++------------- 1 file changed, 7 insertions(+), 13 deletions(-) diff --git a/hedging/hedging.go b/hedging/hedging.go index 5ed6c504a..31fa25406 100644 --- a/hedging/hedging.go +++ b/hedging/hedging.go @@ -26,7 +26,7 @@ type Config struct { At time.Duration `yaml:"at"` // UpTo is the maximum number of requests that will be issued. UpTo int `yaml:"up_to"` - // The maximun of hedge requests allowed per second. + // The maximum number of hedge requests allowed per second. MaxPerSecond int `yaml:"max_per_second"` } @@ -37,9 +37,9 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { // RegisterFlagsWithPrefix registers flags with prefix. func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { - f.IntVar(&cfg.UpTo, prefix+"hedge-requests-up-to", 2, "The maximun of hedge requests allowed.") + f.IntVar(&cfg.UpTo, prefix+"hedge-requests-up-to", 2, "The maximum number of hedge requests allowed.") f.DurationVar(&cfg.At, prefix+"hedge-requests-at", 0, "If set to a non-zero value a second request will be issued at the provided duration. Default is 0 (disabled)") - f.IntVar(&cfg.MaxPerSecond, prefix+"hedge-max-per-second", 5, "The maximun of hedge requests allowed per seconds.") + f.IntVar(&cfg.MaxPerSecond, prefix+"hedge-max-per-second", 5, "The maximum number of hedge requests allowed per second.") } // Client returns a hedged http client. @@ -51,15 +51,12 @@ func (cfg *Config) Client(client *http.Client) (*http.Client, error) { // ClientWithRegisterer returns a hedged http client with instrumentation registered to the provided registerer. // The client transport will be mutated to use the hedged roundtripper. func (cfg *Config) ClientWithRegisterer(client *http.Client, reg prometheus.Registerer) (*http.Client, error) { - if reg == nil { - reg = prometheus.DefaultRegisterer + if cfg.At == 0 { + return client, nil } if client == nil { client = http.DefaultClient } - if cfg.At == 0 { - return client, nil - } var err error client.Transport, err = cfg.RoundTripperWithRegisterer(client.Transport, reg) if err != nil { @@ -70,15 +67,12 @@ func (cfg *Config) ClientWithRegisterer(client *http.Client, reg prometheus.Regi // RoundTripperWithRegisterer returns a hedged roundtripper with instrumentation registered to the provided registerer. func (cfg *Config) RoundTripperWithRegisterer(next http.RoundTripper, reg prometheus.Registerer) (http.RoundTripper, error) { - if reg == nil { - reg = prometheus.DefaultRegisterer + if cfg.At == 0 { + return next, nil } if next == nil { next = http.DefaultTransport } - if cfg.At == 0 { - return next, nil - } // register metrics only once once.Do(func() { totalHedgeRequests = promauto.With(reg).NewCounter(prometheus.CounterOpts{ From 8d6feed17d4e20c998c4dd13dadf7eca08d02658 Mon Sep 17 00:00:00 2001 From: Cyril Tovena Date: Wed, 19 Jan 2022 08:07:27 +0100 Subject: [PATCH 5/5] Change from struct fn to fn Signed-off-by: Cyril Tovena --- hedging/hedging.go | 14 +++++++------- hedging/hedging_test.go | 8 ++++---- 2 files changed, 11 insertions(+), 11 deletions(-) diff --git a/hedging/hedging.go b/hedging/hedging.go index 31fa25406..5189bc4b3 100644 --- a/hedging/hedging.go +++ b/hedging/hedging.go @@ -44,13 +44,13 @@ func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { // Client returns a hedged http client. // The client transport will be mutated to use the hedged roundtripper. -func (cfg *Config) Client(client *http.Client) (*http.Client, error) { - return cfg.ClientWithRegisterer(client, prometheus.DefaultRegisterer) +func Client(cfg Config, client *http.Client) (*http.Client, error) { + return ClientWithRegisterer(cfg, client, prometheus.DefaultRegisterer) } // ClientWithRegisterer returns a hedged http client with instrumentation registered to the provided registerer. // The client transport will be mutated to use the hedged roundtripper. -func (cfg *Config) ClientWithRegisterer(client *http.Client, reg prometheus.Registerer) (*http.Client, error) { +func ClientWithRegisterer(cfg Config, client *http.Client, reg prometheus.Registerer) (*http.Client, error) { if cfg.At == 0 { return client, nil } @@ -58,7 +58,7 @@ func (cfg *Config) ClientWithRegisterer(client *http.Client, reg prometheus.Regi client = http.DefaultClient } var err error - client.Transport, err = cfg.RoundTripperWithRegisterer(client.Transport, reg) + client.Transport, err = RoundTripperWithRegisterer(cfg, client.Transport, reg) if err != nil { return nil, err } @@ -66,7 +66,7 @@ func (cfg *Config) ClientWithRegisterer(client *http.Client, reg prometheus.Regi } // RoundTripperWithRegisterer returns a hedged roundtripper with instrumentation registered to the provided registerer. -func (cfg *Config) RoundTripperWithRegisterer(next http.RoundTripper, reg prometheus.Registerer) (http.RoundTripper, error) { +func RoundTripperWithRegisterer(cfg Config, next http.RoundTripper, reg prometheus.Registerer) (http.RoundTripper, error) { if cfg.At == 0 { return next, nil } @@ -92,8 +92,8 @@ func (cfg *Config) RoundTripperWithRegisterer(next http.RoundTripper, reg promet } // RoundTripper returns a hedged roundtripper. -func (cfg *Config) RoundTripper(next http.RoundTripper) (http.RoundTripper, error) { - return cfg.RoundTripperWithRegisterer(next, prometheus.DefaultRegisterer) +func RoundTripper(cfg Config, next http.RoundTripper) (http.RoundTripper, error) { + return RoundTripperWithRegisterer(cfg, next, prometheus.DefaultRegisterer) } type limitedHedgingRoundTripper struct { diff --git a/hedging/hedging_test.go b/hedging/hedging_test.go index 578c2e46b..e5113e2f9 100644 --- a/hedging/hedging_test.go +++ b/hedging/hedging_test.go @@ -28,13 +28,13 @@ func resetMetrics() { func TestHedging(t *testing.T) { resetMetrics() - cfg := &Config{ + cfg := Config{ At: time.Duration(1), UpTo: 3, MaxPerSecond: 1000, } count := atomic.NewInt32(0) - client, err := cfg.Client(&http.Client{ + client, err := Client(cfg, &http.Client{ Transport: RoundTripperFunc(func(r *http.Request) (*http.Response, error) { count.Inc() time.Sleep(200 * time.Millisecond) @@ -63,13 +63,13 @@ hedged_requests_total 2 func TestHedgingRateLimit(t *testing.T) { resetMetrics() - cfg := &Config{ + cfg := Config{ At: time.Duration(1), UpTo: 20, MaxPerSecond: 1, } count := atomic.NewInt32(0) - client, err := cfg.Client(&http.Client{ + client, err := Client(cfg, &http.Client{ Transport: RoundTripperFunc(func(r *http.Request) (*http.Response, error) { count.Inc() time.Sleep(200 * time.Millisecond)