Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add an HTTP hedging library. #115

Merged
merged 6 commits into from
Jan 19, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
* [ENHANCEMENT] Added option to BasicLifecycler to keep instance in the ring when stopping. #97
* [ENHANCEMENT] Add WaitRingTokensStability function to ring, to be able to wait on ring stability excluding allowed state transitions. #95
* [ENHANCEMENT] Trigger metrics update on ring changes instead of doing it periodically to speed up tests that wait for certain metrics. #107
* [ENHANCEMENT] Add an HTTP hedging library. #115
* [ENHANCEMENT] Ring: Add ring page handler to BasicLifecycler and Lifecycler. #112
* [BUGFIX] spanlogger: Support multiple tenant IDs. #59
* [BUGFIX] Memberlist: fixed corrupted packets when sending compound messages with more than 255 messages or messages bigger than 64KB. #85
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ go 1.16
require (
github.com/armon/go-metrics v0.3.0
github.com/coreos/etcd v3.3.25+incompatible // indirect
github.com/cristalhq/hedgedhttp v0.7.0
github.com/go-kit/log v0.1.0
github.com/gogo/protobuf v1.3.2
github.com/gogo/status v1.1.0
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ github.com/coreos/pkg v0.0.0-20160727233714-3ac0863d7acf h1:CAKfRE2YtTUIjjh1bkBt
github.com/coreos/pkg v0.0.0-20160727233714-3ac0863d7acf/go.mod h1:E3G3o1h8I7cfcXa63jLwjI0eiQQMgzzUDFVpN/nH/eA=
github.com/cpuguy83/go-md2man/v2 v2.0.0-20190314233015-f79a8a8ca69d/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsrgA7czyZG/E6dU=
github.com/creack/pty v1.1.7/go.mod h1:lj5s0c3V2DBrqTV7llrYr5NG6My20zk30Fl46Y7DoTY=
github.com/cristalhq/hedgedhttp v0.7.0 h1:C2XPDC+AQH4QJt6vZI4jB5WNyF86QbSJD4C4fW3H3ro=
github.com/cristalhq/hedgedhttp v0.7.0/go.mod h1:XkqWU6qVMutbhW68NnzjWrGtH8NUx1UfYqGYtHVKIsI=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
Expand Down
120 changes: 120 additions & 0 deletions hedging/hedging.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
package hedging

import (
"errors"
"flag"
"net/http"
"sync"
"time"

"github.com/cristalhq/hedgedhttp"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"golang.org/x/time/rate"
)

// ErrTooManyHedgeRequests is returned for a hedged request that the rate
// limiter refuses to let through.
var ErrTooManyHedgeRequests = errors.New("too many hedge requests")

// Package-level instrumentation shared by all hedged round trippers.
var (
	// once guards the one-time creation of the counters below.
	once sync.Once

	// totalHedgeRequests is incremented for every hedged request that passes
	// the rate limiter.
	totalHedgeRequests prometheus.Counter
	// totalRateLimitedHedgeRequests is incremented for every hedged request
	// rejected by the rate limiter.
	totalRateLimitedHedgeRequests prometheus.Counter
)

// Config holds the settings that control request hedging.
type Config struct {
	// At is the delay after which a second request is issued.
	// A zero value disables hedging.
	At time.Duration `yaml:"at"`

	// UpTo caps the number of requests that will be issued.
	UpTo int `yaml:"up_to"`

	// MaxPerSecond caps the number of hedge requests allowed per second.
	MaxPerSecond int `yaml:"max_per_second"`
}

// RegisterFlags registers the hedging flags on f without a name prefix.
func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
	const noPrefix = ""
	cfg.RegisterFlagsWithPrefix(noPrefix, f)
}

// RegisterFlagsWithPrefix registers the hedging flags on f, prepending prefix
// to every flag name. Parsed values are stored directly into cfg.
func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
	var (
		upToFlag         = prefix + "hedge-requests-up-to"
		atFlag           = prefix + "hedge-requests-at"
		maxPerSecondFlag = prefix + "hedge-max-per-second"
	)

	f.IntVar(&cfg.UpTo, upToFlag, 2, "The maximum number of hedge requests allowed.")
	f.DurationVar(&cfg.At, atFlag, 0, "If set to a non-zero value a second request will be issued at the provided duration. Default is 0 (disabled)")
	f.IntVar(&cfg.MaxPerSecond, maxPerSecondFlag, 5, "The maximum number of hedge requests allowed per second.")
}

// Client wraps client with request hedging, registering instrumentation on
// the default Prometheus registerer.
// The client's transport is replaced by the hedged round tripper.
func Client(cfg Config, client *http.Client) (*http.Client, error) {
	reg := prometheus.DefaultRegisterer
	return ClientWithRegisterer(cfg, client, reg)
}

// ClientWithRegisterer returns a hedged http client with instrumentation
// registered to the provided registerer.
//
// When hedging is disabled (cfg.At == 0) client is returned as-is, including
// when it is nil. Otherwise the transport of the supplied client is replaced
// by the hedged round tripper. A nil client is substituted with a shallow copy
// of http.DefaultClient so that the process-wide default client is never
// modified.
//
// If the hedged round tripper cannot be built, a nil client and the error are
// returned and the supplied client's transport is left untouched.
func ClientWithRegisterer(cfg Config, client *http.Client, reg prometheus.Registerer) (*http.Client, error) {
	if cfg.At == 0 {
		return client, nil
	}
	if client == nil {
		// Copy instead of aliasing http.DefaultClient: assigning to the global's
		// Transport would enable hedging for every user of the default client
		// and would wrap it again on each call.
		defaultCopy := *http.DefaultClient
		client = &defaultCopy
	}
	// Build into a local first so a failure does not clobber client.Transport.
	hedged, err := RoundTripperWithRegisterer(cfg, client.Transport, reg)
	if err != nil {
		return nil, err
	}
	client.Transport = hedged
	return client, nil
}

// RoundTripperWithRegisterer returns a hedged roundtripper with instrumentation registered to the provided registerer.
func RoundTripperWithRegisterer(cfg Config, next http.RoundTripper, reg prometheus.Registerer) (http.RoundTripper, error) {
if cfg.At == 0 {
return next, nil
}
if next == nil {
next = http.DefaultTransport
}
// register metrics only once
once.Do(func() {
totalHedgeRequests = promauto.With(reg).NewCounter(prometheus.CounterOpts{
cyriltovena marked this conversation as resolved.
Show resolved Hide resolved
Name: "hedged_requests_total",
Help: "The total number of hedged requests.",
})
totalRateLimitedHedgeRequests = promauto.With(reg).NewCounter(prometheus.CounterOpts{
Name: "hedged_requests_rate_limited_total",
Help: "The total number of hedged requests rejected via rate limiting.",
})
})
return hedgedhttp.NewRoundTripper(
cfg.At,
cfg.UpTo,
newLimitedHedgingRoundTripper(cfg.MaxPerSecond, next),
)
}

// RoundTripper wraps next with request hedging, registering instrumentation on
// the default Prometheus registerer.
func RoundTripper(cfg Config, next http.RoundTripper) (http.RoundTripper, error) {
	reg := prometheus.DefaultRegisterer
	return RoundTripperWithRegisterer(cfg, next, reg)
}

// limitedHedgingRoundTripper forwards requests to next, applying a rate limit
// to the ones identified as hedged requests.
type limitedHedgingRoundTripper struct {
	// limiter decides whether a hedged request may proceed.
	limiter *rate.Limiter
	// next performs the actual round trip.
	next http.RoundTripper
}

// newLimitedHedgingRoundTripper builds a round tripper around next whose
// limiter uses maxPerSecond as both its rate and its burst size.
func newLimitedHedgingRoundTripper(maxPerSecond int, next http.RoundTripper) *limitedHedgingRoundTripper {
	limiter := rate.NewLimiter(rate.Limit(maxPerSecond), maxPerSecond)
	rt := &limitedHedgingRoundTripper{
		limiter: limiter,
		next:    next,
	}
	return rt
}

// RoundTrip forwards req to the wrapped round tripper. Requests that are not
// hedged pass straight through. A hedged request is forwarded and counted only
// if the limiter allows it; otherwise it is counted as rate limited and fails
// with ErrTooManyHedgeRequests.
func (rt *limitedHedgingRoundTripper) RoundTrip(req *http.Request) (*http.Response, error) {
	// Only hedged requests are subject to limiting and accounting.
	if !hedgedhttp.IsHedgedRequest(req) {
		return rt.next.RoundTrip(req)
	}

	if allowed := rt.limiter.Allow(); !allowed {
		totalRateLimitedHedgeRequests.Inc()
		return nil, ErrTooManyHedgeRequests
	}

	totalHedgeRequests.Inc()
	return rt.next.RoundTrip(req)
}
97 changes: 97 additions & 0 deletions hedging/hedging_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
package hedging

import (
"net/http"
"strings"
"sync"
"testing"
"time"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/testutil"
"github.com/stretchr/testify/require"
"go.uber.org/atomic"
)

// RoundTripperFunc adapts a plain function so it can be used wherever a
// RoundTrip method is required.
type RoundTripperFunc func(*http.Request) (*http.Response, error)

// RoundTrip invokes the underlying function with req and returns its results
// unchanged.
func (f RoundTripperFunc) RoundTrip(req *http.Request) (*http.Response, error) {
	resp, err := f(req)
	return resp, err
}

func resetMetrics() {
once = sync.Once{}
reg := prometheus.NewRegistry()
prometheus.DefaultRegisterer = reg
prometheus.DefaultGatherer = reg
}

// TestHedging checks that, with a generous rate limit, a single slow request
// results in UpTo transport calls and that the hedged ones are counted.
func TestHedging(t *testing.T) {
	resetMetrics()

	calls := atomic.NewInt32(0)
	// Every round trip is recorded and then held for 200ms.
	slowOK := RoundTripperFunc(func(*http.Request) (*http.Response, error) {
		calls.Inc()
		time.Sleep(200 * time.Millisecond)
		return &http.Response{StatusCode: http.StatusOK}, nil
	})

	client, err := Client(
		Config{At: time.Nanosecond, UpTo: 3, MaxPerSecond: 1000},
		&http.Client{Transport: slowOK},
	)
	if err != nil {
		t.Fatal(err)
	}

	// The outcome of the request itself is not asserted; only the call count
	// and the metrics are.
	_, _ = client.Get("http://example.com")

	const expectedMetrics = `
# HELP hedged_requests_rate_limited_total The total number of hedged requests rejected via rate limiting.
# TYPE hedged_requests_rate_limited_total counter
hedged_requests_rate_limited_total 0
# HELP hedged_requests_total The total number of hedged requests.
# TYPE hedged_requests_total counter
hedged_requests_total 2
`

	require.Equal(t, int32(3), calls.Load())
	require.NoError(t, testutil.GatherAndCompare(
		prometheus.DefaultGatherer,
		strings.NewReader(expectedMetrics),
		"hedged_requests_total", "hedged_requests_rate_limited_total",
	))
}

// TestHedgingRateLimit checks that, with a limit of one hedged request per
// second, only one hedged request reaches the transport and the remaining
// ones are counted as rate limited.
func TestHedgingRateLimit(t *testing.T) {
	resetMetrics()

	calls := atomic.NewInt32(0)
	// Every round trip is recorded and then held for 200ms.
	slowOK := RoundTripperFunc(func(*http.Request) (*http.Response, error) {
		calls.Inc()
		time.Sleep(200 * time.Millisecond)
		return &http.Response{StatusCode: http.StatusOK}, nil
	})

	client, err := Client(
		Config{At: time.Nanosecond, UpTo: 20, MaxPerSecond: 1},
		&http.Client{Transport: slowOK},
	)
	if err != nil {
		t.Fatal(err)
	}

	// The outcome of the request itself is not asserted; only the call count
	// and the metrics are.
	_, _ = client.Get("http://example.com")

	const expectedMetrics = `
# HELP hedged_requests_rate_limited_total The total number of hedged requests rejected via rate limiting.
# TYPE hedged_requests_rate_limited_total counter
hedged_requests_rate_limited_total 18
# HELP hedged_requests_total The total number of hedged requests.
# TYPE hedged_requests_total counter
hedged_requests_total 1
`

	require.Equal(t, int32(2), calls.Load())
	require.NoError(t, testutil.GatherAndCompare(
		prometheus.DefaultGatherer,
		strings.NewReader(expectedMetrics),
		"hedged_requests_total", "hedged_requests_rate_limited_total",
	))
}