Skip to content

Commit

Permalink
Merge #71149
Browse files Browse the repository at this point in the history
71149: tenantcostclient: smooth over debt payback r=RaduBerinde a=RaduBerinde

Prior to this commit, the tenant's local token bucket stalls all
traffic whenever it incurs debt. This is a problem because some debt
comes in discrete "packets" (CPU usage is accounted for once a
second), which can lead to large occasional latency spikes.

This change improves the debt repayment mechanism: instead of paying
the debt as soon as possible, we aim to pay the debt within 2 seconds
of when the debt is incurred and earmark part of the rate for debt
repayment; the remaining rate can be used by requests in the mean
time. The comment in `token_bucket.go` goes over more details.

Release note: None

Release justification: Necessary fix for the distributed rate limiting
functionality, which is vital for the upcoming Serverless MVP release.
It allows CRDB to throttle clusters that have run out of free or paid
request units (which measure CPU and I/O usage). This functionality is
only enabled in multi-tenant scenarios and should have no impact on
our dedicated customers.

### Plots for latency data from the debt test

Before:
![image](https://user-images.githubusercontent.com/16544120/136071271-fee831e8-593a-4d82-b7f7-4ac30c0e3e1c.png)


After:

![image](https://user-images.githubusercontent.com/16544120/136071244-a86afbc5-55f8-4c26-b578-bc3e31538ffd.png)


Co-authored-by: Radu Berinde <[email protected]>
  • Loading branch information
craig[bot] and RaduBerinde committed Oct 8, 2021
2 parents c34dac2 + e3417f2 commit 7e4ba61
Show file tree
Hide file tree
Showing 5 changed files with 385 additions and 41 deletions.
21 changes: 9 additions & 12 deletions pkg/ccl/multitenantccl/tenantcostclient/limiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,19 +93,16 @@ func (l *limiter) Wait(ctx context.Context, needed tenantcostmodel.RU) error {
return l.qp.Acquire(ctx, r)
}

// AdjustTokens adds or removes tokens from the bucket. Tokens are added when we
// receive more tokens from the host cluster. Tokens are removed when
// consumption has occurred without Wait(): accounting for CPU usage and the
// number of read bytes.
func (l *limiter) AdjustTokens(now time.Time, delta tenantcostmodel.RU) {
if delta == 0 {
return
}
// RemoveTokens removes tokens from the bucket.
//
// Tokens are removed when consumption has occurred without Wait(), like
// accounting for CPU usage or for the number of read bytes.
func (l *limiter) RemoveTokens(now time.Time, delta tenantcostmodel.RU) {
l.qp.Update(func(res quotapool.Resource) (shouldNotify bool) {
l.tb.AdjustTokens(now, delta)
// We notify the head of the queue if we added RUs, in which case that
// request might be allowed to go through earlier.
return delta > 0
l.tb.RemoveTokens(now, delta)
// Don't notify the head of the queue; this change can only delay the time
// it can go through.
return false
})
}

Expand Down
10 changes: 4 additions & 6 deletions pkg/ccl/multitenantccl/tenantcostclient/tenant_side.go
Original file line number Diff line number Diff line change
Expand Up @@ -311,9 +311,7 @@ func (c *tenantSideCostController) updateRunState(ctx context.Context) {
c.run.externalUsage = newExternalUsage
c.run.consumption = newConsumption

// TODO(radu): figure out how to "smooth out" this debt over a longer period
// (so we don't have periodic stalls).
c.limiter.AdjustTokens(newTime, -tenantcostmodel.RU(ru))
c.limiter.RemoveTokens(newTime, tenantcostmodel.RU(ru))
}

// updateAvgRUPerSec is called exactly once per mainLoopUpdateInterval.
Expand Down Expand Up @@ -426,7 +424,7 @@ func (c *tenantSideCostController) handleTokenBucketResponse(
c.run.initialRequestCompleted = true
// This is the first successful request. Take back the initial RUs that we
// used to pre-fill the bucket.
c.limiter.AdjustTokens(c.run.now, -initialRUs)
c.limiter.RemoveTokens(c.run.now, initialRUs)
}

granted := resp.GrantedRU
Expand Down Expand Up @@ -469,7 +467,7 @@ func (c *tenantSideCostController) handleTokenBucketResponse(
if resp.TrickleDuration == 0 {
// We received a batch of tokens to use as needed. Set up the token bucket
// to notify us when the tokens are running low.
cfg.TokenAdjustment = tenantcostmodel.RU(granted)
cfg.NewTokens = tenantcostmodel.RU(granted)
// TODO(radu): if we don't get more tokens in time, fall back to a "fallback"
// rate.
cfg.NewRate = 0
Expand Down Expand Up @@ -603,7 +601,7 @@ func (c *tenantSideCostController) OnResponse(
return
}
if resp.ReadBytes() > 0 {
c.limiter.AdjustTokens(c.timeSource.Now(), -c.costCfg.ResponseCost(resp))
c.limiter.RemoveTokens(c.timeSource.Now(), c.costCfg.ResponseCost(resp))
}

c.mu.Lock()
Expand Down
55 changes: 55 additions & 0 deletions pkg/ccl/multitenantccl/tenantcostclient/testdata/debt
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
wait-for-event
token-bucket-response
----

# Set up throttling at 1000 RU/s.
configure
throttle: 1000
----

# Fire off a write that needs more than the 10000 initial RUs.
write bytes=6000000 label=w1
----

wait-for-event
token-bucket-response
----

timers
----
00:00:01.600
00:00:09.000

advance
2s
----
00:00:02.000

wait-for-event
tick
----

await label=w1
----

# Do a read which incurs a debt of about 1000 RUs upon completion.
read bytes=10000000
----

# Verify that a small write doesn't have to wait a second for the entire debt
# to be paid.
write bytes=1024 label=w2
----

timers
----
00:00:02.011
00:00:09.000

advance
100ms
----
00:00:02.100

await label=w2
----
179 changes: 161 additions & 18 deletions pkg/ccl/multitenantccl/tenantcostclient/token_bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,62 @@ import (

// tokenBucket implements a token bucket. It is a more specialized form of
// quotapool.TokenBucket. The main differences are:
// - it does not currently support a burst limit
// - it implements a "low tokens" notification mechanism.
// - it does not currently support a burst limit;
// - it implements a "low tokens" notification mechanism;
// - it has special debt handling.
//
// -- Notification mechanism --
//
// Notifications can be configured to fire when the number of available RUs dips
// below a certain value (or when a request blocks). Notifications are delivered
// via a non-blocking send to a given channel.
//
// -- Debt handling --
//
// The token bucket is designed to handle arbitrary removal of tokens to account
// for usage that cannot be throttled (e.g. read/transferred bytes, CPU usage).
// This can bring the token bucket into debt.
//
// The simplest handling of debt is to always pay the debt first, blocking all
// operations that require tokens in the mean time. However, this is undesirable
// because the accounting can happen at arbitrary intervals which can lead to
// the workload periodically experiencing starvation (e.g. CPU usage might be
// accounted for only once per second, which can lead to all requests being
// blocked for the beginning part of each second).
//
// Instead, we aim to pay all outstanding debt D within time T from the time the
// last debt was incurred. We do this by splitting the refill rate into D/T and
// using only what's left for replenishing tokens.
//
// This rate is recalculated every time we incur debt. So in essence, every time
// we incur debt, we put it together with all existing debt and plan to pay it
// within time T (we can think of this as "refinancing" the existing debt).
//
// This behavior is somewhat arbitrary because the rate at which we pay debt
// depends on how frequently we incur additional debt. To see how much it can
// vary, imagine that at time t=0 we incur some debt D(0) and consider the two
// extremes:
//
// A. We start with debt D(0), and we never recalculate the rate (no
// "refinancing"). We pay debt at constant rate D(0) / T and all debt is
// paid at time T.
//
// B. We start with debt D(0), and we recalculate the rate ("refinance")
// continuously (or, more intuitively, every nanosecond). The
// instantaneous rate is:
// D'(t) = - D(t) / T
// The debt formula is:
// D(t) = D(0) * e^(-t/T)
// We pay 63% of the debt in time T; 86% in 2T; and 95% in 3T.
//
// The difference between these two extremes is reasonable - we pay between 63%
// and 100% of the debt in time T, depending on the usage pattern.
//
// Design note: ideally we would always simulate B. However, under this model it
// is hard to compute the time until a request can go through (i.e. time until
// we accumulate a certain amount of tokens): it involves calculating the
// intersection of a line with an exponential, which cannot be solved
// algebraically (it requires slower numerical methods).
type tokenBucket struct {
// -- Static fields --

Expand All @@ -34,12 +88,21 @@ type tokenBucket struct {

// Refill rate, in RU/s.
rate tenantcostmodel.RU
// Currently available RUs. Can be negative (indicating debt).
// Currently available RUs. Can not be negative.
available tenantcostmodel.RU

// Debt incurred that we will try to pay over time. See debtHalfLife.
debt tenantcostmodel.RU

// Rate at which we pay off debt; cannot exceed the refill rate.
debtRate tenantcostmodel.RU

lastUpdated time.Time
}

// We try to repay debt over the next 2 seconds.
const debtRepaymentSecs = 2

func (tb *tokenBucket) Init(
now time.Time, notifyCh chan struct{}, rate, available tenantcostmodel.RU,
) {
Expand All @@ -54,10 +117,28 @@ func (tb *tokenBucket) Init(

// update accounts for the passing of time.
func (tb *tokenBucket) update(now time.Time) {
if since := now.Sub(tb.lastUpdated); since > 0 {
tb.available += tb.rate * tenantcostmodel.RU(since.Seconds())
tb.lastUpdated = now
since := now.Sub(tb.lastUpdated)
if since <= 0 {
return
}
tb.lastUpdated = now
sinceSeconds := since.Seconds()
refilled := tb.rate * tenantcostmodel.RU(sinceSeconds)

if tb.debt == 0 {
// Fast path: no debt.
tb.available += refilled
return
}

debtPaid := tb.debtRate * tenantcostmodel.RU(sinceSeconds)
if tb.debt >= debtPaid {
tb.debt -= debtPaid
} else {
debtPaid = tb.debt
tb.debt = 0
}
tb.available += refilled - debtPaid
}

// notify tries to send a non-blocking notification on notifyCh and disables
Expand All @@ -78,17 +159,31 @@ func (tb *tokenBucket) maybeNotify(now time.Time) {
}
}

// AdjustTokens changes the amount of tokens currently available, either
// increasing or decreasing them. The amount can become negative (indicating
// debt).
func (tb *tokenBucket) AdjustTokens(now time.Time, delta tenantcostmodel.RU) {
func (tb *tokenBucket) calculateDebtRate() {
tb.debtRate = tb.debt / debtRepaymentSecs
if tb.debtRate > tb.rate {
tb.debtRate = tb.rate
}
}

// RemoveTokens decreases the amount of tokens currently available.
//
// If there are not enough tokens, this causes the token bucket to go into debt.
// Debt will be attempted to be repaid over the next few seconds.
func (tb *tokenBucket) RemoveTokens(now time.Time, amount tenantcostmodel.RU) {
tb.update(now)
tb.available += delta
if tb.available >= amount {
tb.available -= amount
} else {
tb.debt += amount - tb.available
tb.available = 0
tb.calculateDebtRate()
}
tb.maybeNotify(now)
}

type tokenBucketReconfigureArgs struct {
TokenAdjustment tenantcostmodel.RU
NewTokens tenantcostmodel.RU

NewRate tenantcostmodel.RU

Expand All @@ -106,9 +201,21 @@ func (tb *tokenBucket) Reconfigure(now time.Time, args tokenBucketReconfigureArg
case <-tb.notifyCh:
default:
}
tb.available += args.TokenAdjustment
tb.rate = args.NewRate
tb.notifyThreshold = args.NotifyThreshold
if args.NewTokens > 0 {
if tb.debt > 0 {
if tb.debt >= args.NewTokens {
tb.debt -= args.NewTokens
} else {
tb.available += args.NewTokens - tb.debt
tb.debt = 0
}
tb.calculateDebtRate()
} else {
tb.available += args.NewTokens
}
}
tb.maybeNotify(now)
}

Expand All @@ -118,6 +225,8 @@ func (tb *tokenBucket) SetupNotification(now time.Time, threshold tenantcostmode
tb.notifyThreshold = threshold
}

const maxTryAgainAfterSeconds = 1000

// TryToFulfill either removes the given amount if is available, or returns a
// time after which the request should be retried.
func (tb *tokenBucket) TryToFulfill(
Expand All @@ -137,12 +246,46 @@ func (tb *tokenBucket) TryToFulfill(
tb.notify()
}

// Compute the time it will take to get to the needed capacity.
timeSeconds := float64((amount - tb.available) / tb.rate)
needed := amount - tb.available

// Compute the time it will take to refill to the needed amount.
var timeSeconds float64

if tb.debt == 0 {
timeSeconds = float64(needed / tb.rate)
} else {
remainingRate := tb.rate - tb.debtRate
// There are two cases:
//
// 1. We accumulate enough tokens from the remainingRate before paying off
// the entire debt.
//
// 2. We pay off all debt before accumulating enough tokens from the
// remainingRate.
//
// The time to accumulate the needed tokens while paying debt is:
// needed / remainingRate
// The time to pay off the debt is:
// debt / debtRate
//
// We are in case 1 if
// needed / remainingRate <= debt / debtRate
// or equivalently:
// needed * debtRate <= debt * remainingRate
if needed*tb.debtRate <= tb.debt*remainingRate {
// Case 1.
timeSeconds = float64(needed / remainingRate)
} else {
// Case 2.
debtPaySeconds := tb.debt / tb.debtRate
timeSeconds = float64(debtPaySeconds + (needed-debtPaySeconds*remainingRate)/tb.rate)
}
}

// Cap the number of seconds to avoid overflow; we want to tolerate even a
// rate of 0 (in which case we are really waiting for a token adjustment).
if timeSeconds > 1000 {
timeSeconds = 1000
if timeSeconds > maxTryAgainAfterSeconds {
return false, maxTryAgainAfterSeconds * time.Second
}

timeDelta := time.Duration(timeSeconds * float64(time.Second))
Expand All @@ -156,5 +299,5 @@ func (tb *tokenBucket) TryToFulfill(
// negative if we accumulated debt.
func (tb *tokenBucket) AvailableTokens(now time.Time) tenantcostmodel.RU {
tb.update(now)
return tb.available
return tb.available - tb.debt
}
Loading

0 comments on commit 7e4ba61

Please sign in to comment.