Request Limiter (#25093)
This commit introduces two new adaptive concurrency limiters in Vault,
which should protect the server from overload during periods of
untenable request rates. Each limiter adjusts the number of allowable
in-flight requests based on latency measurements taken across the
request duration. This approach allows us to reject requests outright,
before doing any work, and prevents clients from exceeding server
capacity.
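The shape of that acquire/measure contract can be sketched in a few lines (a minimal, hypothetical illustration with a fixed limit; the actual go-concurrency-limits implementation adjusts the limit from latency samples):

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// limiter is a toy adaptive limiter: it tracks in-flight requests against a
// concurrency limit that measurements would raise or lower over time.
type limiter struct {
	limit    atomic.Int64 // current allowed concurrency
	inFlight atomic.Int64 // requests currently being handled
}

// acquire reserves a slot, or reports failure when the server is at capacity.
func (l *limiter) acquire() bool {
	if l.inFlight.Add(1) > l.limit.Load() {
		l.inFlight.Add(-1) // over capacity: reject before doing any work
		return false
	}
	return true
}

// onSuccess releases the slot; a real limiter would also feed the request's
// measured latency into a moving average and adjust l.limit accordingly.
func (l *limiter) onSuccess() { l.inFlight.Add(-1) }

func main() {
	l := &limiter{}
	l.limit.Store(2)
	fmt.Println(l.acquire()) // true
	fmt.Println(l.acquire()) // true
	fmt.Println(l.acquire()) // false: third request rejected up front
	l.onSuccess()
	fmt.Println(l.acquire()) // true: slot freed by the measurement
}
```

The key property is that rejection happens at acquisition time, so an overloaded server spends no work on requests it cannot serve.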

The limiters intentionally target two separate vectors that have been
proven to lead to server over-utilization.

- Back pressure from the storage backend, resulting in bufferbloat in
  the WAL system. (enterprise)
- Back pressure from CPU over-utilization via PKI issue requests
  (specifically for RSA keys), resulting in failed heartbeats.

Storage constraints can be accounted for by limiting logical requests
according to their http.Method. We only limit requests with write-based
methods, since these will result in storage Puts and exhibit the
aforementioned bufferbloat.
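The method-based gating reduces to a simple predicate (a sketch; the real check lives in the `http/handler.go` hunk in this diff):

```go
package main

import (
	"fmt"
	"net/http"
)

// isWriteMethod reports whether a request method can result in storage Puts
// and should therefore pass through the write limiter. Read-only and
// diagnostic methods are exempt.
func isWriteMethod(method string) bool {
	switch method {
	case http.MethodGet, http.MethodHead, http.MethodTrace, http.MethodOptions:
		return false
	default:
		// PUT, POST, DELETE, PATCH, LIST, etc. may write to storage.
		return true
	}
}

func main() {
	for _, m := range []string{"GET", "PUT", "LIST"} {
		fmt.Printf("%s limited: %v\n", m, isWriteMethod(m))
	}
}
```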

CPU constraints are accounted for using the same underlying library and
technique; however, they require special treatment. The maximum number
of concurrent pki/issue requests found in testing (again, specifically
for RSA keys) is far lower than the minimum tolerable limit for general
write requests. Without separate limiting, we would artificially cap
tolerable request rates for non-PKI requests. To target PKI issue
requests specifically, we add a new PathsSpecial field, called Limited,
allowing backends to specify a list of paths which should receive
special-case request limiting.

For the sake of code cleanliness and future extensibility, we introduce
the concept of a LimiterRegistry. The registry proposed in this PR has
two entries, corresponding to the two vectors above. Each Limiter entry
has its own maximum and minimum concurrency, allowing the limiters to
react to latency deviation independently and to handle high volumes of
requests at the targeted bottlenecks (CPU and storage).
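A registry with independently configured entries might look like the following (hypothetical sketch; the field names and bounds mirror the `LimiterFlags` values added in `limits/limiter.go` below, but the types are simplified):

```go
package main

import "fmt"

// flags carry per-limiter concurrency bounds, so each entry can react to
// latency deviation independently.
type flags struct {
	name               string
	minLimit, maxLimit int
}

// registry maps a limiter name to its configuration; Vault's registry holds
// live RequestLimiter instances rather than bare flags.
type registry map[string]flags

func main() {
	reg := registry{
		"write":        {name: "write", minLimit: 100, maxLimit: 5000},
		"special-path": {name: "special-path", minLimit: 5, maxLimit: 5000},
	}
	for _, key := range []string{"write", "special-path"} {
		f := reg[key]
		fmt.Printf("%s: min=%d max=%d\n", f.name, f.minLimit, f.maxLimit)
	}
}
```

Note the much lower floor on the special-path entry: CPU-bound paths such as `pki/issue` tolerate far less concurrency than general writes.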

In both cases, utilization is effectively throttled before Vault
reaches a degraded state. The resulting 503 Service Unavailable is a
retryable HTTP status code, so clients can back off and eventually
succeed. Clients should handle this by retrying with jitter and
exponential backoff, as Vault's API client already does via the
go-retryablehttp library.

Limiter testing was performed via benchmarks of mixed workloads and
across a deployment of agent pods with great success.
mpalmi authored Jan 26, 2024
1 parent 3ba802d commit 43be9fc
Showing 21 changed files with 1,165 additions and 585 deletions.
5 changes: 5 additions & 0 deletions builtin/logical/pki/backend.go
@@ -147,6 +147,11 @@ func Backend(conf *logical.BackendConfig) *backend {
			unifiedDeltaWALPath,
		},

		Limited: []string{
			"issue",
			"issue/*",
		},

		Binary: []string{
			"ocsp",   // OCSP POST
			"ocsp/*", // OCSP GET
5 changes: 5 additions & 0 deletions changelog/25093.txt
@@ -0,0 +1,5 @@
```release-note:feature
**Request Limiter**: Add adaptive concurrency limits to write-based HTTP
methods and special-case `pki/issue` requests to prevent overloading the Vault
server.
```
1 change: 1 addition & 0 deletions go.mod
@@ -191,6 +191,7 @@ require (
	github.com/patrickmn/go-cache v2.1.0+incompatible
	github.com/pires/go-proxyproto v0.6.1
	github.com/pkg/errors v0.9.1
	github.com/platinummonkey/go-concurrency-limits v0.7.0
	github.com/posener/complete v1.2.3
	github.com/pquerna/otp v1.2.1-0.20191009055518-468c2dd2b58d
	github.com/prometheus/client_golang v1.14.0
5 changes: 5 additions & 0 deletions go.sum
@@ -1039,6 +1039,7 @@ github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym
github.com/DataDog/datadog-go v2.2.0+incompatible/go.mod h1:LButxg5PwREeZtORoXG3tL4fMGNddJ+vMq1mwgfaqoQ=
github.com/DataDog/datadog-go v3.2.0+incompatible h1:qSG2N4FghB1He/r2mFrWKCaL7dXCilEuNEeAn20fdD4=
github.com/DataDog/datadog-go v3.2.0+incompatible/go.mod h1:LButxg5PwREeZtORoXG3tL4fMGNddJ+vMq1mwgfaqoQ=
github.com/DataDog/datadog-go/v5 v5.0.2/go.mod h1:ZI9JFB4ewXbw1sBnF4sxsR2k1H3xjV+PUAOUsHvKpcU=
github.com/HdrHistogram/hdrhistogram-go v1.1.2/go.mod h1:yDgFjdqOqDEKOvasDdhWNXYg9BVp4O+o5f6V/ehm6Oo=
github.com/Jeffail/gabs v1.1.1 h1:V0uzR08Hj22EX8+8QMhyI9sX2hwRu+/RJhJUmnwda/E=
github.com/Jeffail/gabs v1.1.1/go.mod h1:6xMvQMK4k33lb7GUUpaAPh6nKMmemQeg5d4gn7/bOXc=
@@ -1063,6 +1064,7 @@ github.com/Microsoft/go-winio v0.4.16/go.mod h1:XB6nPKklQyQ7GC9LdcBEcBl8PF76WugX
github.com/Microsoft/go-winio v0.4.17-0.20210211115548-6eac466e5fa3/go.mod h1:JPGBdM1cNvN/6ISo+n8V5iA4v8pBzdOpzfwIujj1a84=
github.com/Microsoft/go-winio v0.4.17-0.20210324224401-5516f17a5958/go.mod h1:JPGBdM1cNvN/6ISo+n8V5iA4v8pBzdOpzfwIujj1a84=
github.com/Microsoft/go-winio v0.4.17/go.mod h1:JPGBdM1cNvN/6ISo+n8V5iA4v8pBzdOpzfwIujj1a84=
github.com/Microsoft/go-winio v0.5.0/go.mod h1:JPGBdM1cNvN/6ISo+n8V5iA4v8pBzdOpzfwIujj1a84=
github.com/Microsoft/go-winio v0.5.1/go.mod h1:JPGBdM1cNvN/6ISo+n8V5iA4v8pBzdOpzfwIujj1a84=
github.com/Microsoft/go-winio v0.5.2/go.mod h1:WpS1mjBmmwHBEWmogvA2mj8546UReBk4v8QkMxJ6pZY=
github.com/Microsoft/go-winio v0.6.0/go.mod h1:cTAf44im0RAYeL23bpB+fzCyDH2MJiz2BO69KH/soAE=
@@ -2879,6 +2881,8 @@ github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/sftp v1.10.1/go.mod h1:lYOWFsE0bwd1+KfKJaKeuokY15vzFx25BLbzYYoAxZI=
github.com/pkg/sftp v1.13.1/go.mod h1:3HaPG6Dq1ILlpPZRO0HVMrsydcdLt6HRDccSgb87qRg=
github.com/platinummonkey/go-concurrency-limits v0.7.0 h1:Bl9E74+67BrlRLBeryHOaFy0e1L3zD9g436/3vo6akQ=
github.com/platinummonkey/go-concurrency-limits v0.7.0/go.mod h1:Xxr6BywMVH3QyLyd0PanLnkkkmByTTPET3azMpdfmng=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U=
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
@@ -2947,6 +2951,7 @@ github.com/prometheus/procfs v0.8.0/go.mod h1:z7EfXMXOkbkqb9IINtpCn86r/to3BnA0ua
github.com/prometheus/tsdb v0.7.1/go.mod h1:qhTCs0VvXwvX/y3TZrWD7rabWM+ijKTux40TwIPHuXU=
github.com/rboyer/safeio v0.2.1 h1:05xhhdRNAdS3apYm7JRjOqngf4xruaW959jmRxGDuSU=
github.com/rboyer/safeio v0.2.1/go.mod h1:Cq/cEPK+YXFn622lsQ0K4KsPZSPtaptHHEldsy7Fmig=
github.com/rcrowley/go-metrics v0.0.0-20180503174638-e2704e165165/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4=
github.com/rcrowley/go-metrics v0.0.0-20200313005456-10cdbea86bc0/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4=
github.com/remyoudompheng/bigfft v0.0.0-20200410134404-eec4a21b6bb0/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo=
github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo=
38 changes: 38 additions & 0 deletions http/handler.go
@@ -30,6 +30,7 @@ import (
	"github.com/hashicorp/go-uuid"
	"github.com/hashicorp/vault/helper/namespace"
	"github.com/hashicorp/vault/internalshared/configutil"
	"github.com/hashicorp/vault/limits"
	"github.com/hashicorp/vault/sdk/helper/consts"
	"github.com/hashicorp/vault/sdk/helper/jsonutil"
	"github.com/hashicorp/vault/sdk/helper/pathmanager"
@@ -908,10 +909,47 @@ func forwardRequest(core *vault.Core, w http.ResponseWriter, r *http.Request) {
	w.Write(retBytes)
}

func acquireLimiterListener(core *vault.Core, rawReq *http.Request, r *logical.Request) (*limits.RequestListener, bool) {
	lim := &limits.RequestLimiter{}
	if r.PathLimited {
		lim = core.GetRequestLimiter(limits.SpecialPathLimiter)
	} else {
		switch rawReq.Method {
		case http.MethodGet, http.MethodHead, http.MethodTrace, http.MethodOptions:
			// We're only interested in the inverse, so do nothing here.
		default:
			lim = core.GetRequestLimiter(limits.WriteLimiter)
		}
	}
	return lim.Acquire(rawReq.Context())
}

// request is a helper to perform a request and properly exit in the
// case of an error.
func request(core *vault.Core, w http.ResponseWriter, rawReq *http.Request, r *logical.Request) (*logical.Response, bool, bool) {
	lsnr, ok := acquireLimiterListener(core, rawReq, r)
	if !ok {
		resp := &logical.Response{}
		logical.RespondWithStatusCode(resp, r, http.StatusServiceUnavailable)
		respondError(w, http.StatusServiceUnavailable, limits.ErrCapacity)
		return resp, false, false
	}

	// To guard against leaking RequestListener slots, we should ignore Limiter
	// measurements on panic. OnIgnore will check to see if a RequestListener
	// slot has been acquired and not released, which could happen on
	// recoverable panics.
	defer lsnr.OnIgnore()

	resp, err := core.HandleRequest(rawReq.Context(), r)

	// Do the limiter measurement
	if err != nil {
		lsnr.OnDropped()
	} else {
		lsnr.OnSuccess()
	}

	if r.LastRemoteWAL() > 0 && !core.EntWaitUntilWALShipped(rawReq.Context(), r.LastRemoteWAL()) {
		if resp == nil {
			resp = &logical.Response{}
9 changes: 9 additions & 0 deletions http/logical.go
@@ -7,6 +7,7 @@ import (
	"bufio"
	"encoding/base64"
	"encoding/json"
	"errors"
	"fmt"
	"io"
	"mime"
@@ -18,6 +19,7 @@

	"github.com/hashicorp/go-uuid"
	"github.com/hashicorp/vault/helper/namespace"
	"github.com/hashicorp/vault/limits"
	"github.com/hashicorp/vault/sdk/helper/consts"
	"github.com/hashicorp/vault/sdk/logical"
	"github.com/hashicorp/vault/vault"
@@ -211,6 +213,10 @@ func buildLogicalRequestNoAuth(perfStandby bool, ra *vault.RouterAccess, w http.
		Headers: r.Header,
	}

	if ra != nil && ra.IsLimitedPath(r.Context(), path) {
		req.PathLimited = true
	}

	if passHTTPReq {
		req.HTTPRequest = r
	}
@@ -378,6 +384,9 @@ func handleLogicalInternal(core *vault.Core, injectDataIntoTopLevel bool, noForw
	// success.
	resp, ok, needsForward := request(core, w, r, req)
	switch {
	case errors.Is(resp.Error(), limits.ErrCapacity):
		respondError(w, http.StatusServiceUnavailable, limits.ErrCapacity)
		return
	case needsForward && noForward:
		respondError(w, http.StatusBadRequest, vault.ErrCannotForwardLocalOnly)
		return
191 changes: 191 additions & 0 deletions limits/limiter.go
@@ -0,0 +1,191 @@
// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: BUSL-1.1

package limits

import (
	"context"
	"errors"
	"fmt"
	"math"
	"sync/atomic"

	"github.com/armon/go-metrics"
	"github.com/hashicorp/go-hclog"
	"github.com/platinummonkey/go-concurrency-limits/core"
	"github.com/platinummonkey/go-concurrency-limits/limit"
	"github.com/platinummonkey/go-concurrency-limits/limiter"
	"github.com/platinummonkey/go-concurrency-limits/strategy"
)

var (
	// ErrCapacity is a new error type to indicate that Vault is not accepting new
	// requests. This should be handled by callers in request paths to return
	// http.StatusServiceUnavailable to the client.
	ErrCapacity = errors.New("Vault server temporarily overloaded")

	// DefaultDebugLogger opts out of the go-concurrency-limits internal Debug
	// logger, since it's rather noisy. We're generating logs of interest in
	// Vault.
	DefaultDebugLogger limit.Logger = nil

	// DefaultMetricsRegistry opts out of the go-concurrency-limits internal
	// metrics because we're tracking what we care about in Vault.
	DefaultMetricsRegistry core.MetricRegistry = core.EmptyMetricRegistryInstance
)

const (
	// DefaultSmoothing adjusts how heavily we weight newer high-latency
	// detection. Higher values (>1) place more emphasis on recent measurements.
	// We set this below 1 to better tolerate short-lived spikes in request rate.
	DefaultSmoothing = .1

	// DefaultLongWindow is chosen as a minimum of 1000 samples. The long window
	// defines the sliding window size used for the Exponential Moving Average.
	DefaultLongWindow = 1000
)

// RequestLimiter is a thin wrapper for limiter.DefaultLimiter.
type RequestLimiter struct {
	*limiter.DefaultLimiter
}

// Acquire consults the underlying RequestLimiter to see if a new
// RequestListener can be acquired.
//
// The return values are a *RequestListener, which the caller can use to perform
// latency measurements, and a bool to indicate whether or not a RequestListener
// was acquired.
//
// The returned RequestListener is short-lived and eventually garbage-collected;
// however, the RequestLimiter keeps track of in-flight concurrency using a
// token bucket implementation. The caller must release the resulting Limiter
// token by conducting a measurement.
//
// There are three return cases:
//
// 1) If Request Limiting is disabled, we return an empty RequestListener so all
// measurements are no-ops.
//
// 2) If the request limit has been exceeded, we will not acquire a
// RequestListener and instead return nil, false. No measurement is required,
// since we immediately return from callers with ErrCapacity.
//
// 3) If we have not exceeded the request limit, the caller must call one of
// OnSuccess(), OnDropped(), or OnIgnore() to return a measurement and release
// the underlying Limiter token.
func (l *RequestLimiter) Acquire(ctx context.Context) (*RequestListener, bool) {
	// Transparently handle the case where the limiter is disabled.
	if l == nil || l.DefaultLimiter == nil {
		return &RequestListener{}, true
	}

	lsnr, ok := l.DefaultLimiter.Acquire(ctx)
	if !ok {
		metrics.IncrCounter([]string{"limits", "concurrency", "service_unavailable"}, 1)
		// If the token acquisition fails, we've reached capacity and we won't
		// get a listener, so just return nil.
		return nil, false
	}

	return &RequestListener{
		DefaultListener: lsnr.(*limiter.DefaultListener),
		released:        new(atomic.Bool),
	}, true
}

// concurrencyChanger adjusts the current allowed concurrency with an
// exponential backoff as we approach the max limit.
func concurrencyChanger(limit int) int {
	change := math.Sqrt(float64(limit))
	if change < 1.0 {
		change = 1.0
	}
	return int(change)
}

var (
	// DefaultWriteLimiterFlags have a less conservative MinLimit to prevent
	// over-optimizing the request latency, which would result in
	// under-utilization and client starvation.
	DefaultWriteLimiterFlags = LimiterFlags{
		Name:     WriteLimiter,
		MinLimit: 100,
		MaxLimit: 5000,
	}

	// DefaultSpecialPathLimiterFlags have a conservative MinLimit to allow more
	// aggressive concurrency throttling for CPU-bound workloads such as
	// `pki/issue`.
	DefaultSpecialPathLimiterFlags = LimiterFlags{
		Name:     SpecialPathLimiter,
		MinLimit: 5,
		MaxLimit: 5000,
	}
)

// LimiterFlags establish some initial configuration for a new request limiter.
type LimiterFlags struct {
	// Name specifies the limiter Name for registry lookup and logging.
	Name string

	// MinLimit defines the minimum concurrency floor to prevent over-throttling
	// requests during periods of high traffic.
	MinLimit int

	// MaxLimit defines the maximum concurrency ceiling to prevent skewing to a
	// point of no return.
	//
	// We set this to a high value (5000) with the expectation that systems with
	// high-performing specs will tolerate higher limits, while the algorithm
	// will find its own steady-state concurrency well below this threshold in
	// most cases.
	MaxLimit int

	// InitialLimit defines the starting concurrency limit prior to any
	// measurements.
	//
	// If we start this value off too high, Vault could become overloaded before
	// the algorithm has a chance to adapt. Setting the value to the minimum is
	// a safety measure which could result in early request rejection; however,
	// the adaptive nature of the algorithm will prevent this from being a
	// prolonged state, as the allowed concurrency will increase during normal
	// operation.
	InitialLimit int
}

// NewRequestLimiter is a basic constructor for the RequestLimiter wrapper. It
// is responsible for setting up the Gradient2 Limit and instantiating a new
// wrapped DefaultLimiter.
func NewRequestLimiter(logger hclog.Logger, flags LimiterFlags) (*RequestLimiter, error) {
	logger.Info("setting up new request limiter",
		"initialLimit", flags.InitialLimit,
		"maxLimit", flags.MaxLimit,
		"minLimit", flags.MinLimit,
	)

	// NewGradient2Limit is the algorithm which drives request limiting
	// decisions. It gathers latency measurements and calculates an Exponential
	// Moving Average to determine whether latency deviation warrants a change
	// in the current concurrency limit.
	lim, err := limit.NewGradient2Limit(flags.Name,
		flags.InitialLimit,
		flags.MaxLimit,
		flags.MinLimit,
		concurrencyChanger,
		DefaultSmoothing,
		DefaultLongWindow,
		DefaultDebugLogger,
		DefaultMetricsRegistry,
	)
	if err != nil {
		return nil, fmt.Errorf("failed to create gradient2 limit: %w", err)
	}

	strategy := strategy.NewSimpleStrategy(flags.InitialLimit)
	defLimiter, err := limiter.NewDefaultLimiter(lim, 1e9, 1e9, 10, 100, strategy, nil, DefaultMetricsRegistry)
	if err != nil {
		return &RequestLimiter{}, err
	}

	return &RequestLimiter{defLimiter}, nil
}
51 changes: 51 additions & 0 deletions limits/listener.go
@@ -0,0 +1,51 @@
// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: BUSL-1.1

package limits

import (
	"sync/atomic"

	"github.com/armon/go-metrics"
	"github.com/platinummonkey/go-concurrency-limits/limiter"
)

// RequestListener is a thin wrapper for limiter.DefaultListener to handle the
// case where request limiting is turned off.
type RequestListener struct {
	*limiter.DefaultListener
	released *atomic.Bool
}

// OnSuccess is called as a notification that the operation succeeded and the
// internally measured latency should be used as an RTT sample.
func (l *RequestListener) OnSuccess() {
	if l.DefaultListener != nil {
		metrics.IncrCounter([]string{"limits", "concurrency", "success"}, 1)
		l.DefaultListener.OnSuccess()
		l.released.Store(true)
	}
}

// OnDropped is called to indicate the request failed and was dropped due to an
// internal server error. Note that this does not include ErrCapacity.
func (l *RequestListener) OnDropped() {
	if l.DefaultListener != nil {
		metrics.IncrCounter([]string{"limits", "concurrency", "dropped"}, 1)
		l.DefaultListener.OnDropped()
		l.released.Store(true)
	}
}

// OnIgnore is called to indicate the operation failed before any meaningful RTT
// measurement could be made and should be ignored to not introduce an
// artificially low RTT. It also provides an extra layer of protection against
// leaks of the underlying StrategyToken during recoverable panics in the
// request handler. We treat these as Ignored, discard the measurement, and mark
// the listener as released.
func (l *RequestListener) OnIgnore() {
	if l.DefaultListener != nil && !l.released.Load() {
		metrics.IncrCounter([]string{"limits", "concurrency", "ignored"}, 1)
		l.DefaultListener.OnIgnore()
		l.released.Store(true)
	}
}
