Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Request Limiter #25093

Merged
merged 5 commits into from
Jan 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions builtin/logical/pki/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,11 @@ func Backend(conf *logical.BackendConfig) *backend {
unifiedDeltaWALPath,
},

Limited: []string{
"issue",
"issue/*",
},

Binary: []string{
"ocsp", // OCSP POST
"ocsp/*", // OCSP GET
Expand Down
5 changes: 5 additions & 0 deletions changelog/25093.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
```release-note:feature
**Request Limiter**: Add adaptive concurrency limits to write-based HTTP
methods and special-case `pki/issue` requests to prevent overloading the Vault
server.
```
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,7 @@ require (
github.com/patrickmn/go-cache v2.1.0+incompatible
github.com/pires/go-proxyproto v0.6.1
github.com/pkg/errors v0.9.1
github.com/platinummonkey/go-concurrency-limits v0.7.0
github.com/posener/complete v1.2.3
github.com/pquerna/otp v1.2.1-0.20191009055518-468c2dd2b58d
github.com/prometheus/client_golang v1.14.0
Expand Down
5 changes: 5 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1039,6 +1039,7 @@ github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym
github.com/DataDog/datadog-go v2.2.0+incompatible/go.mod h1:LButxg5PwREeZtORoXG3tL4fMGNddJ+vMq1mwgfaqoQ=
github.com/DataDog/datadog-go v3.2.0+incompatible h1:qSG2N4FghB1He/r2mFrWKCaL7dXCilEuNEeAn20fdD4=
github.com/DataDog/datadog-go v3.2.0+incompatible/go.mod h1:LButxg5PwREeZtORoXG3tL4fMGNddJ+vMq1mwgfaqoQ=
github.com/DataDog/datadog-go/v5 v5.0.2/go.mod h1:ZI9JFB4ewXbw1sBnF4sxsR2k1H3xjV+PUAOUsHvKpcU=
github.com/HdrHistogram/hdrhistogram-go v1.1.2/go.mod h1:yDgFjdqOqDEKOvasDdhWNXYg9BVp4O+o5f6V/ehm6Oo=
github.com/Jeffail/gabs v1.1.1 h1:V0uzR08Hj22EX8+8QMhyI9sX2hwRu+/RJhJUmnwda/E=
github.com/Jeffail/gabs v1.1.1/go.mod h1:6xMvQMK4k33lb7GUUpaAPh6nKMmemQeg5d4gn7/bOXc=
Expand All @@ -1063,6 +1064,7 @@ github.com/Microsoft/go-winio v0.4.16/go.mod h1:XB6nPKklQyQ7GC9LdcBEcBl8PF76WugX
github.com/Microsoft/go-winio v0.4.17-0.20210211115548-6eac466e5fa3/go.mod h1:JPGBdM1cNvN/6ISo+n8V5iA4v8pBzdOpzfwIujj1a84=
github.com/Microsoft/go-winio v0.4.17-0.20210324224401-5516f17a5958/go.mod h1:JPGBdM1cNvN/6ISo+n8V5iA4v8pBzdOpzfwIujj1a84=
github.com/Microsoft/go-winio v0.4.17/go.mod h1:JPGBdM1cNvN/6ISo+n8V5iA4v8pBzdOpzfwIujj1a84=
github.com/Microsoft/go-winio v0.5.0/go.mod h1:JPGBdM1cNvN/6ISo+n8V5iA4v8pBzdOpzfwIujj1a84=
github.com/Microsoft/go-winio v0.5.1/go.mod h1:JPGBdM1cNvN/6ISo+n8V5iA4v8pBzdOpzfwIujj1a84=
github.com/Microsoft/go-winio v0.5.2/go.mod h1:WpS1mjBmmwHBEWmogvA2mj8546UReBk4v8QkMxJ6pZY=
github.com/Microsoft/go-winio v0.6.0/go.mod h1:cTAf44im0RAYeL23bpB+fzCyDH2MJiz2BO69KH/soAE=
Expand Down Expand Up @@ -2879,6 +2881,8 @@ github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/sftp v1.10.1/go.mod h1:lYOWFsE0bwd1+KfKJaKeuokY15vzFx25BLbzYYoAxZI=
github.com/pkg/sftp v1.13.1/go.mod h1:3HaPG6Dq1ILlpPZRO0HVMrsydcdLt6HRDccSgb87qRg=
github.com/platinummonkey/go-concurrency-limits v0.7.0 h1:Bl9E74+67BrlRLBeryHOaFy0e1L3zD9g436/3vo6akQ=
github.com/platinummonkey/go-concurrency-limits v0.7.0/go.mod h1:Xxr6BywMVH3QyLyd0PanLnkkkmByTTPET3azMpdfmng=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U=
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
Expand Down Expand Up @@ -2947,6 +2951,7 @@ github.com/prometheus/procfs v0.8.0/go.mod h1:z7EfXMXOkbkqb9IINtpCn86r/to3BnA0ua
github.com/prometheus/tsdb v0.7.1/go.mod h1:qhTCs0VvXwvX/y3TZrWD7rabWM+ijKTux40TwIPHuXU=
github.com/rboyer/safeio v0.2.1 h1:05xhhdRNAdS3apYm7JRjOqngf4xruaW959jmRxGDuSU=
github.com/rboyer/safeio v0.2.1/go.mod h1:Cq/cEPK+YXFn622lsQ0K4KsPZSPtaptHHEldsy7Fmig=
github.com/rcrowley/go-metrics v0.0.0-20180503174638-e2704e165165/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4=
github.com/rcrowley/go-metrics v0.0.0-20200313005456-10cdbea86bc0/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4=
github.com/remyoudompheng/bigfft v0.0.0-20200410134404-eec4a21b6bb0/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo=
github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo=
Expand Down
38 changes: 38 additions & 0 deletions http/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/hashicorp/go-uuid"
"github.com/hashicorp/vault/helper/namespace"
"github.com/hashicorp/vault/internalshared/configutil"
"github.com/hashicorp/vault/limits"
"github.com/hashicorp/vault/sdk/helper/consts"
"github.com/hashicorp/vault/sdk/helper/jsonutil"
"github.com/hashicorp/vault/sdk/helper/pathmanager"
Expand Down Expand Up @@ -908,10 +909,47 @@ func forwardRequest(core *vault.Core, w http.ResponseWriter, r *http.Request) {
w.Write(retBytes)
}

func acquireLimiterListener(core *vault.Core, rawReq *http.Request, r *logical.Request) (*limits.RequestListener, bool) {
lim := &limits.RequestLimiter{}
if r.PathLimited {
lim = core.GetRequestLimiter(limits.SpecialPathLimiter)
} else {
switch rawReq.Method {
case http.MethodGet, http.MethodHead, http.MethodTrace, http.MethodOptions:
// We're only interested in the inverse, so do nothing here.
default:
lim = core.GetRequestLimiter(limits.WriteLimiter)
}
}
return lim.Acquire(rawReq.Context())
}

// request is a helper to perform a request and properly exit in the
// case of an error.
func request(core *vault.Core, w http.ResponseWriter, rawReq *http.Request, r *logical.Request) (*logical.Response, bool, bool) {
lsnr, ok := acquireLimiterListener(core, rawReq, r)
if !ok {
resp := &logical.Response{}
logical.RespondWithStatusCode(resp, r, http.StatusServiceUnavailable)
respondError(w, http.StatusServiceUnavailable, limits.ErrCapacity)
return resp, false, false
}

// To guard against leaking RequestListener slots, we should ignore Limiter
// measurements on panic. OnIgnore will check to see if a RequestListener
// slot has been acquired and not released, which could happen on
// recoverable panics.
defer lsnr.OnIgnore()

resp, err := core.HandleRequest(rawReq.Context(), r)

// Do the limiter measurement
if err != nil {
lsnr.OnDropped()
} else {
lsnr.OnSuccess()
}

if r.LastRemoteWAL() > 0 && !core.EntWaitUntilWALShipped(rawReq.Context(), r.LastRemoteWAL()) {
if resp == nil {
resp = &logical.Response{}
Expand Down
9 changes: 9 additions & 0 deletions http/logical.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"bufio"
"encoding/base64"
"encoding/json"
"errors"
"fmt"
"io"
"mime"
Expand All @@ -18,6 +19,7 @@ import (

"github.com/hashicorp/go-uuid"
"github.com/hashicorp/vault/helper/namespace"
"github.com/hashicorp/vault/limits"
"github.com/hashicorp/vault/sdk/helper/consts"
"github.com/hashicorp/vault/sdk/logical"
"github.com/hashicorp/vault/vault"
Expand Down Expand Up @@ -211,6 +213,10 @@ func buildLogicalRequestNoAuth(perfStandby bool, ra *vault.RouterAccess, w http.
Headers: r.Header,
}

if ra != nil && ra.IsLimitedPath(r.Context(), path) {
req.PathLimited = true
}

if passHTTPReq {
req.HTTPRequest = r
}
Expand Down Expand Up @@ -378,6 +384,9 @@ func handleLogicalInternal(core *vault.Core, injectDataIntoTopLevel bool, noForw
// success.
resp, ok, needsForward := request(core, w, r, req)
switch {
case errors.Is(resp.Error(), limits.ErrCapacity):
respondError(w, http.StatusServiceUnavailable, limits.ErrCapacity)
return
case needsForward && noForward:
respondError(w, http.StatusBadRequest, vault.ErrCannotForwardLocalOnly)
return
Expand Down
191 changes: 191 additions & 0 deletions limits/limiter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,191 @@
// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: BUSL-1.1
package limits

import (
"context"
"errors"
"fmt"
"math"
"sync/atomic"

"github.com/armon/go-metrics"
"github.com/hashicorp/go-hclog"
"github.com/platinummonkey/go-concurrency-limits/core"
"github.com/platinummonkey/go-concurrency-limits/limit"
"github.com/platinummonkey/go-concurrency-limits/limiter"
"github.com/platinummonkey/go-concurrency-limits/strategy"
)

var (
// ErrCapacity is a new error type to indicate that Vault is not accepting new
// requests. This should be handled by callers in request paths to return
// http.StatusServiceUnavailable to the client.
ErrCapacity = errors.New("Vault server temporarily overloaded")

// DefaultDebugLogger opts out of the go-concurrency-limits internal Debug
// logger, since it's rather noisy. We're generating logs of interest in
// Vault.
DefaultDebugLogger limit.Logger = nil

// DefaultMetricsRegistry opts out of the go-concurrency-limits internal
// metrics because we're tracking what we care about in Vault.
DefaultMetricsRegistry core.MetricRegistry = core.EmptyMetricRegistryInstance
)

const (
// Smoothing adjusts how heavily we weight newer high-latency detection.
// Higher values (>1) place more emphasis on recent measurements. We set
// this below 1 to better tolerate short-lived spikes in request rate.
DefaultSmoothing = .1

// DefaultLongWindow is chosen as a minimum of 1000 samples. longWindow
// defines sliding window size used for the Exponential Moving Average.
DefaultLongWindow = 1000
)

// RequestLimiter is a thin wrapper for limiter.DefaultLimiter.
type RequestLimiter struct {
*limiter.DefaultLimiter
}

// Acquire consults the underlying RequestLimiter to see if a new
// RequestListener can be acquired.
//
// The return values are a *RequestListener, which the caller can use to perform
// latency measurements, and a bool to indicate whether or not a RequestListener
// was acquired.
//
// The returned RequestListener is short-lived and eventually garbage-collected;
// however, the RequestLimiter keeps track of in-flight concurrency using a
// token bucket implementation. The caller must release the resulting Limiter
// token by conducting a measurement.
//
// There are three return cases:
//
// 1) If Request Limiting is disabled, we return an empty RequestListener so all
// measurements are no-ops.
//
// 2) If the request limit has been exceeded, we will not acquire a
// RequestListener and instead return nil, false. No measurement is required,
// since we immediately return from callers with ErrCapacity.
//
// 3) If we have not exceeded the request limit, the caller must call one of
// OnSuccess(), OnDropped(), or OnIgnore() to return a measurement and release
// the underlying Limiter token.
func (l *RequestLimiter) Acquire(ctx context.Context) (*RequestListener, bool) {
// Transparently handle the case where the limiter is disabled.
if l == nil || l.DefaultLimiter == nil {
return &RequestListener{}, true
}

lsnr, ok := l.DefaultLimiter.Acquire(ctx)
if !ok {
metrics.IncrCounter(([]string{"limits", "concurrency", "service_unavailable"}), 1)
// If the token acquisition fails, we've reached capacity and we won't
// get a listener, so just return nil.
return nil, false
}

return &RequestListener{
DefaultListener: lsnr.(*limiter.DefaultListener),
released: new(atomic.Bool),
}, true
}

// concurrencyChanger adjusts the current allowed concurrency with an
// exponential backoff as we approach the max limit.
func concurrencyChanger(limit int) int {
change := math.Sqrt(float64(limit))
if change < 1.0 {
change = 1.0
}
return int(change)
}

var (
// DefaultWriteLimiterFlags have a less conservative MinLimit to prevent
// over-optimizing the request latency, which would result in
// under-utilization and client starvation.
DefaultWriteLimiterFlags = LimiterFlags{
Name: WriteLimiter,
MinLimit: 100,
MaxLimit: 5000,
}

// DefaultSpecialPathLimiterFlags have a conservative MinLimit to allow more
// aggressive concurrency throttling for CPU-bound workloads such as
// `pki/issue`.
DefaultSpecialPathLimiterFlags = LimiterFlags{
Name: SpecialPathLimiter,
MinLimit: 5,
MaxLimit: 5000,
}
)

// LimiterFlags establish some initial configuration for a new request limiter.
type LimiterFlags struct {
// Name specifies the limiter Name for registry lookup and logging.
Name string

// MinLimit defines the minimum concurrency floor to prevent over-throttling
// requests during periods of high traffic.
MinLimit int

// MaxLimit defines the maximum concurrency ceiling to prevent skewing to a
// point of no return.
//
// We set this to a high value (5000) with the expectation that systems with
// high-performing specs will tolerate higher limits, while the algorithm
// will find its own steady-state concurrency well below this threshold in
// most cases.
MaxLimit int

// InitialLimit defines the starting concurrency limit prior to any
// measurements.
//
// If we start this value off too high, Vault could become
// overloaded before the algorithm has a chance to adapt. Setting the value
// to the minimum is a safety measure which could result in early request
// rejection; however, the adaptive nature of the algorithm will prevent
// this from being a prolonged state as the allowed concurrency will
// increase during normal operation.
InitialLimit int
}

// NewRequestLimiter is a basic constructor for the RequestLimiter wrapper. It
// is responsible for setting up the Gradient2 Limit and instantiating a new
// wrapped DefaultLimiter.
func NewRequestLimiter(logger hclog.Logger, flags LimiterFlags) (*RequestLimiter, error) {
logger.Info("setting up new request limiter",
"initialLimit", flags.InitialLimit,
"maxLimit", flags.MaxLimit,
"minLimit", flags.MinLimit,
)

// NewGradient2Limit is the algorithm which drives request limiting
// decisions. It gathers latency measurements and calculates an Exponential
// Moving Average to determine whether latency deviation warrants a change
// in the current concurrency limit.
lim, err := limit.NewGradient2Limit(flags.Name,
flags.InitialLimit,
flags.MaxLimit,
flags.MinLimit,
concurrencyChanger,
DefaultSmoothing,
DefaultLongWindow,
DefaultDebugLogger,
DefaultMetricsRegistry,
)
if err != nil {
return nil, fmt.Errorf("failed to create gradient2 limit: %w", err)
}

strategy := strategy.NewSimpleStrategy(flags.InitialLimit)
defLimiter, err := limiter.NewDefaultLimiter(lim, 1e9, 1e9, 10, 100, strategy, nil, DefaultMetricsRegistry)
if err != nil {
return &RequestLimiter{}, err
}

return &RequestLimiter{defLimiter}, nil
}
51 changes: 51 additions & 0 deletions limits/listener.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: BUSL-1.1
package limits

import (
"sync/atomic"

"github.com/armon/go-metrics"
"github.com/platinummonkey/go-concurrency-limits/limiter"
)

// RequestListener is a thin wrapper for limiter.DefaultLimiter to handle the
// case where request limiting is turned off.
type RequestListener struct {
*limiter.DefaultListener
released *atomic.Bool
}

// OnSuccess is called as a notification that the operation succeeded and
// internally measured latency should be used as an RTT sample.
func (l *RequestListener) OnSuccess() {
if l.DefaultListener != nil {
metrics.IncrCounter(([]string{"limits", "concurrency", "success"}), 1)
l.DefaultListener.OnSuccess()
l.released.Store(true)
}
}

// OnDropped is called to indicate the request failed and was dropped due to an
// internal server error. Note that this does not include ErrCapacity.
func (l *RequestListener) OnDropped() {
if l.DefaultListener != nil {
metrics.IncrCounter(([]string{"limits", "concurrency", "dropped"}), 1)
l.DefaultListener.OnDropped()
l.released.Store(true)
}
}

// OnIgnore is called to indicate the operation failed before any meaningful RTT
// measurement could be made and should be ignored to not introduce an
// artificially low RTT. It also provides an extra layer of protection against
// leaks of the underlying StrategyToken during recoverable panics in the
// request handler. We treat these as Ignored, discard the measurement, and mark
// the listener as released.
func (l *RequestListener) OnIgnore() {
if l.DefaultListener != nil && l.released.Load() != true {
metrics.IncrCounter(([]string{"limits", "concurrency", "ignored"}), 1)
l.DefaultListener.OnIgnore()
l.released.Store(true)
}
}
Loading
Loading