diff --git a/pkg/ccl/multitenantccl/tenantcostclient/limiter.go b/pkg/ccl/multitenantccl/tenantcostclient/limiter.go
index 516895a0fe3b..e75bcdcb011a 100644
--- a/pkg/ccl/multitenantccl/tenantcostclient/limiter.go
+++ b/pkg/ccl/multitenantccl/tenantcostclient/limiter.go
@@ -93,19 +93,16 @@ func (l *limiter) Wait(ctx context.Context, needed tenantcostmodel.RU) error {
 	return l.qp.Acquire(ctx, r)
 }
 
-// AdjustTokens adds or removes tokens from the bucket. Tokens are added when we
-// receive more tokens from the host cluster. Tokens are removed when
-// consumption has occurred without Wait(): accounting for CPU usage and the
-// number of read bytes.
-func (l *limiter) AdjustTokens(now time.Time, delta tenantcostmodel.RU) {
-	if delta == 0 {
-		return
-	}
+// RemoveTokens removes tokens from the bucket.
+//
+// Tokens are removed when consumption has occurred without Wait(), like
+// accounting for CPU usage or for the number of read bytes.
+func (l *limiter) RemoveTokens(now time.Time, delta tenantcostmodel.RU) {
 	l.qp.Update(func(res quotapool.Resource) (shouldNotify bool) {
-		l.tb.AdjustTokens(now, delta)
-		// We notify the head of the queue if we added RUs, in which case that
-		// request might be allowed to go through earlier.
-		return delta > 0
+		l.tb.RemoveTokens(now, delta)
+		// Don't notify the head of the queue; this change can only delay the time
+		// it can go through.
+		return false
 	})
 }
 
diff --git a/pkg/ccl/multitenantccl/tenantcostclient/tenant_side.go b/pkg/ccl/multitenantccl/tenantcostclient/tenant_side.go
index f600dbcedd4e..d8f797b2a427 100644
--- a/pkg/ccl/multitenantccl/tenantcostclient/tenant_side.go
+++ b/pkg/ccl/multitenantccl/tenantcostclient/tenant_side.go
@@ -311,9 +311,7 @@ func (c *tenantSideCostController) updateRunState(ctx context.Context) {
 	c.run.externalUsage = newExternalUsage
 	c.run.consumption = newConsumption
 
-	// TODO(radu): figure out how to "smooth out" this debt over a longer period
-	// (so we don't have periodic stalls).
-	c.limiter.AdjustTokens(newTime, -tenantcostmodel.RU(ru))
+	c.limiter.RemoveTokens(newTime, tenantcostmodel.RU(ru))
 }
 
 // updateAvgRUPerSec is called exactly once per mainLoopUpdateInterval.
@@ -426,7 +424,7 @@ func (c *tenantSideCostController) handleTokenBucketResponse(
 		c.run.initialRequestCompleted = true
 		// This is the first successful request. Take back the initial RUs that we
 		// used to pre-fill the bucket.
-		c.limiter.AdjustTokens(c.run.now, -initialRUs)
+		c.limiter.RemoveTokens(c.run.now, initialRUs)
 	}
 
 	granted := resp.GrantedRU
@@ -469,7 +467,7 @@ func (c *tenantSideCostController) handleTokenBucketResponse(
 	if resp.TrickleDuration == 0 {
 		// We received a batch of tokens to use as needed. Set up the token bucket
 		// to notify us when the tokens are running low.
-		cfg.TokenAdjustment = tenantcostmodel.RU(granted)
+		cfg.NewTokens = tenantcostmodel.RU(granted)
 		// TODO(radu): if we don't get more tokens in time, fall back to a "fallback"
 		// rate.
 		cfg.NewRate = 0
@@ -603,7 +601,7 @@ func (c *tenantSideCostController) OnResponse(
 		return
 	}
 	if resp.ReadBytes() > 0 {
-		c.limiter.AdjustTokens(c.timeSource.Now(), -c.costCfg.ResponseCost(resp))
+		c.limiter.RemoveTokens(c.timeSource.Now(), c.costCfg.ResponseCost(resp))
 	}
 
 	c.mu.Lock()
diff --git a/pkg/ccl/multitenantccl/tenantcostclient/testdata/debt b/pkg/ccl/multitenantccl/tenantcostclient/testdata/debt
new file mode 100644
index 000000000000..4703894e5402
--- /dev/null
+++ b/pkg/ccl/multitenantccl/tenantcostclient/testdata/debt
@@ -0,0 +1,55 @@
+wait-for-event
+token-bucket-response
+----
+
+# Set up throttling at 1000 RU/s.
+configure
+throttle: 1000
+----
+
+# Fire off a write that needs more than the 10000 initial RUs.
+write bytes=6000000 label=w1
+----
+
+wait-for-event
+token-bucket-response
+----
+
+timers
+----
+00:00:01.600
+00:00:09.000
+
+advance
+2s
+----
+00:00:02.000
+
+wait-for-event
+tick
+----
+
+await label=w1
+----
+
+# Do a read which incurs a debt of about 1000 RUs upon completion.
+read bytes=10000000
+----
+
+# Verify that a small write doesn't have to wait a second for the entire debt
+# to be paid.
+write bytes=1024 label=w2
+----
+
+timers
+----
+00:00:02.011
+00:00:09.000
+
+advance
+100ms
+----
+00:00:02.100
+
+await label=w2
+----
diff --git a/pkg/ccl/multitenantccl/tenantcostclient/token_bucket.go b/pkg/ccl/multitenantccl/tenantcostclient/token_bucket.go
index 98c9b46538a8..e2d766790d97 100644
--- a/pkg/ccl/multitenantccl/tenantcostclient/token_bucket.go
+++ b/pkg/ccl/multitenantccl/tenantcostclient/token_bucket.go
@@ -16,8 +16,62 @@ import (
 
 // tokenBucket implements a token bucket. It is a more specialized form of
 // quotapool.TokenBucket. The main differences are:
-// - it does not currently support a burst limit
-// - it implements a "low tokens" notification mechanism.
+// - it does not currently support a burst limit;
+// - it implements a "low tokens" notification mechanism;
+// - it has special debt handling.
+//
+// -- Notification mechanism --
+//
+// Notifications can be configured to fire when the number of available RUs dips
+// below a certain value (or when a request blocks). Notifications are delivered
+// via a non-blocking send to a given channel.
+//
+// -- Debt handling --
+//
+// The token bucket is designed to handle arbitrary removal of tokens to account
+// for usage that cannot be throttled (e.g. read/transferred bytes, CPU usage).
+// This can bring the token bucket into debt.
+//
+// The simplest handling of debt is to always pay the debt first, blocking all
+// operations that require tokens in the meantime. However, this is undesirable
+// because the accounting can happen at arbitrary intervals which can lead to
+// the workload periodically experiencing starvation (e.g. CPU usage might be
+// accounted for only once per second, which can lead to all requests being
+// blocked for the beginning part of each second).
+//
+// Instead, we aim to pay all outstanding debt D within time T from the time the
+// last debt was incurred. We do this by splitting the refill rate into D/T and
+// using only what's left for replenishing tokens.
+//
+// This rate is recalculated every time we incur debt. So in essence, every time
+// we incur debt, we put it together with all existing debt and plan to pay it
+// within time T (we can think of this as "refinancing" the existing debt).
+//
+// This behavior is somewhat arbitrary because the rate at which we pay debt
+// depends on how frequently we incur additional debt. To see how much it can
+// vary, imagine that at time t=0 we incur some debt D(0) and consider the two
+// extremes:
+//
+//   A. We start with debt D(0), and we never recalculate the rate (no
+//      "refinancing"). We pay debt at constant rate D(0) / T and all debt is
+//      paid at time T.
+//
+//   B. We start with debt D(0), and we recalculate the rate ("refinance")
+//      continuously (or, more intuitively, every nanosecond). The
+//      instantaneous rate is:
+//        D'(t) = - D(t) / T
+//      The debt formula is:
+//        D(t) = D(0) * e^(-t/T)
+//      We pay 63% of the debt in time T; 86% in 2T; and 95% in 3T.
+//
+// The difference between these two extremes is reasonable - we pay between 63%
+// and 100% of the debt in time T, depending on the usage pattern.
+//
+// Design note: ideally we would always simulate B. However, under this model it
+// is hard to compute the time until a request can go through (i.e. time until
+// we accumulate a certain amount of tokens): it involves calculating the
+// intersection of a line with an exponential, which cannot be solved
+// algebraically (it requires slower numerical methods).
 type tokenBucket struct {
 	// -- Static fields --
 
@@ -34,12 +88,21 @@ type tokenBucket struct {
 	// Refill rate, in RU/s.
 	rate tenantcostmodel.RU
 
-	// Currently available RUs. Can be negative (indicating debt).
+	// Currently available RUs. Cannot be negative.
 	available tenantcostmodel.RU
 
+	// Debt incurred that we will try to pay over time. See debtRepaymentSecs.
+	debt tenantcostmodel.RU
+
+	// Rate at which we pay off debt; cannot exceed the refill rate.
+	debtRate tenantcostmodel.RU
+
 	lastUpdated time.Time
 }
 
+// We try to repay debt over the next 2 seconds.
+const debtRepaymentSecs = 2
+
 func (tb *tokenBucket) Init(
 	now time.Time, notifyCh chan struct{}, rate, available tenantcostmodel.RU,
 ) {
@@ -54,10 +117,28 @@ func (tb *tokenBucket) Init(
 
 // update accounts for the passing of time.
 func (tb *tokenBucket) update(now time.Time) {
-	if since := now.Sub(tb.lastUpdated); since > 0 {
-		tb.available += tb.rate * tenantcostmodel.RU(since.Seconds())
-		tb.lastUpdated = now
+	since := now.Sub(tb.lastUpdated)
+	if since <= 0 {
+		return
 	}
+	tb.lastUpdated = now
+	sinceSeconds := since.Seconds()
+	refilled := tb.rate * tenantcostmodel.RU(sinceSeconds)
+
+	if tb.debt == 0 {
+		// Fast path: no debt.
+		tb.available += refilled
+		return
+	}
+
+	debtPaid := tb.debtRate * tenantcostmodel.RU(sinceSeconds)
+	if tb.debt >= debtPaid {
+		tb.debt -= debtPaid
+	} else {
+		debtPaid = tb.debt
+		tb.debt = 0
+	}
+	tb.available += refilled - debtPaid
 }
 
 // notify tries to send a non-blocking notification on notifyCh and disables
@@ -78,17 +159,31 @@ func (tb *tokenBucket) maybeNotify(now time.Time) {
 	}
 }
 
-// AdjustTokens changes the amount of tokens currently available, either
-// increasing or decreasing them. The amount can become negative (indicating
-// debt).
-func (tb *tokenBucket) AdjustTokens(now time.Time, delta tenantcostmodel.RU) {
+func (tb *tokenBucket) calculateDebtRate() {
+	tb.debtRate = tb.debt / debtRepaymentSecs
+	if tb.debtRate > tb.rate {
+		tb.debtRate = tb.rate
+	}
+}
+
+// RemoveTokens decreases the amount of tokens currently available.
+//
+// If there are not enough tokens, this causes the token bucket to go into
+// debt, which will be repaid over the next few seconds (see debtRepaymentSecs).
+func (tb *tokenBucket) RemoveTokens(now time.Time, amount tenantcostmodel.RU) {
 	tb.update(now)
-	tb.available += delta
+	if tb.available >= amount {
+		tb.available -= amount
+	} else {
+		tb.debt += amount - tb.available
+		tb.available = 0
+		tb.calculateDebtRate()
+	}
 	tb.maybeNotify(now)
 }
 
 type tokenBucketReconfigureArgs struct {
-	TokenAdjustment tenantcostmodel.RU
+	NewTokens tenantcostmodel.RU
 
 	NewRate tenantcostmodel.RU
 
@@ -106,9 +201,21 @@ func (tb *tokenBucket) Reconfigure(now time.Time, args tokenBucketReconfigureArg
 	case <-tb.notifyCh:
 	default:
 	}
-	tb.available += args.TokenAdjustment
 	tb.rate = args.NewRate
 	tb.notifyThreshold = args.NotifyThreshold
+	if args.NewTokens > 0 {
+		if tb.debt > 0 {
+			if tb.debt >= args.NewTokens {
+				tb.debt -= args.NewTokens
+			} else {
+				tb.available += args.NewTokens - tb.debt
+				tb.debt = 0
+			}
+			tb.calculateDebtRate()
+		} else {
+			tb.available += args.NewTokens
+		}
+	}
 	tb.maybeNotify(now)
 }
 
@@ -118,6 +225,8 @@ func (tb *tokenBucket) SetupNotification(now time.Time, threshold tenantcostmode
 	tb.notifyThreshold = threshold
 }
 
+const maxTryAgainAfterSeconds = 1000
+
 // TryToFulfill either removes the given amount if is available, or returns a
 // time after which the request should be retried.
 func (tb *tokenBucket) TryToFulfill(
@@ -137,12 +246,46 @@ func (tb *tokenBucket) TryToFulfill(
 		tb.notify()
 	}
 
-	// Compute the time it will take to get to the needed capacity.
-	timeSeconds := float64((amount - tb.available) / tb.rate)
+	needed := amount - tb.available
+
+	// Compute the time it will take to refill to the needed amount.
+	var timeSeconds float64
+
+	if tb.debt == 0 {
+		timeSeconds = float64(needed / tb.rate)
+	} else {
+		remainingRate := tb.rate - tb.debtRate
+		// There are two cases:
+		//
+		//   1. We accumulate enough tokens from the remainingRate before paying off
+		//      the entire debt.
+		//
+		//   2. We pay off all debt before accumulating enough tokens from the
+		//      remainingRate.
+		//
+		// The time to accumulate the needed tokens while paying debt is:
+		//   needed / remainingRate
+		// The time to pay off the debt is:
+		//   debt / debtRate
+		//
+		// We are in case 1 if
+		//   needed / remainingRate <= debt / debtRate
+		// or equivalently:
+		//   needed * debtRate <= debt * remainingRate
+		if needed*tb.debtRate <= tb.debt*remainingRate {
+			// Case 1.
+			timeSeconds = float64(needed / remainingRate)
+		} else {
+			// Case 2.
+			debtPaySeconds := tb.debt / tb.debtRate
+			timeSeconds = float64(debtPaySeconds + (needed-debtPaySeconds*remainingRate)/tb.rate)
+		}
+	}
+
 	// Cap the number of seconds to avoid overflow; we want to tolerate even a
 	// rate of 0 (in which case we are really waiting for a token adjustment).
-	if timeSeconds > 1000 {
-		timeSeconds = 1000
+	if timeSeconds > maxTryAgainAfterSeconds {
+		return false, maxTryAgainAfterSeconds * time.Second
 	}
 
 	timeDelta := time.Duration(timeSeconds * float64(time.Second))
@@ -156,5 +299,5 @@
 // negative if we accumulated debt.
 func (tb *tokenBucket) AvailableTokens(now time.Time) tenantcostmodel.RU {
 	tb.update(now)
-	return tb.available
+	return tb.available - tb.debt
 }
diff --git a/pkg/ccl/multitenantccl/tenantcostclient/token_bucket_test.go b/pkg/ccl/multitenantccl/tenantcostclient/token_bucket_test.go
index 313d01a7dab4..da18ff8f7009 100644
--- a/pkg/ccl/multitenantccl/tenantcostclient/token_bucket_test.go
+++ b/pkg/ccl/multitenantccl/tenantcostclient/token_bucket_test.go
@@ -9,15 +9,26 @@
 package tenantcostclient
 
 import (
+	"flag"
+	"fmt"
+	"io"
 	"math"
+	"math/rand"
+	"os"
 	"testing"
 	"time"
 
 	"github.com/cockroachdb/cockroach/pkg/multitenant/tenantcostmodel"
 	"github.com/cockroachdb/cockroach/pkg/util/leaktest"
+	"github.com/cockroachdb/cockroach/pkg/util/randutil"
 	"github.com/cockroachdb/cockroach/pkg/util/timeutil"
 )
 
+var saveDebtCSV = flag.String(
+	"save-debt-csv", "",
+	"save latency data from TestTokenBucketDebt to a csv file",
+)
+
 func TestTokenBucket(t *testing.T) {
 	defer leaktest.AfterTest(t)()
 
@@ -43,8 +54,8 @@ func TestTokenBucket(t *testing.T) {
 	ts.Advance(1 * time.Second)
 	check(110)
 
-	// Check AdjustTokens.
-	tb.AdjustTokens(ts.Now(), -200)
+	// Check RemoveTokens.
+	tb.RemoveTokens(ts.Now(), 200)
 	check(-90)
 
 	ts.Advance(1 * time.Second)
@@ -111,7 +122,7 @@ func TestTokenBucket(t *testing.T) {
 	// Verify that we get notified when we block, even if the current amount is
 	// above the threshold.
 	args = tokenBucketReconfigureArgs{
-		TokenAdjustment: 10,
+		NewTokens:       10,
 		NewRate:         10,
 		NotifyThreshold: 5,
 	}
@@ -123,8 +134,8 @@ func TestTokenBucket(t *testing.T) {
 	checkNotification()
 
 	args = tokenBucketReconfigureArgs{
-		TokenAdjustment: 80,
-		NewRate:         1,
+		NewTokens: 80,
+		NewRate:   1,
 	}
 	tb.Reconfigure(ts.Now(), args)
 	check(91)
@@ -136,3 +147,143 @@ func TestTokenBucket(t *testing.T) {
 	fulfill(60)
 	checkNotification()
 }
+
+// TestTokenBucketTryToFulfill verifies that the tryAgainAfter time returned by
+// TryToFulfill is consistent if recalculated after some time has passed.
+func TestTokenBucketTryToFulfill(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+
+	r, _ := randutil.NewPseudoRand()
+	randRU := func(min, max float64) tenantcostmodel.RU {
+		return tenantcostmodel.RU(min + r.Float64()*(max-min))
+	}
+	randDuration := func(max time.Duration) time.Duration {
+		return time.Duration(r.Intn(int(max + 1)))
+	}
+
+	start := timeutil.Now()
+	const runs = 50000
+	for run := 0; run < runs; run++ {
+		var tb tokenBucket
+		clock := timeutil.NewManualTime(start)
+		tb.Init(clock.Now(), nil, randRU(1, 100), randRU(0, 500))
+
+		// Advance a random amount of time.
+		clock.Advance(randDuration(100 * time.Millisecond))
+
+		if rand.Intn(5) > 0 {
+			// Add some debt and advance more.
+			tb.RemoveTokens(clock.Now(), randRU(1, 500))
+			clock.Advance(randDuration(100 * time.Millisecond))
+		}
+
+		// Fulfill requests until we can't anymore.
+		var ru tenantcostmodel.RU
+		var tryAgainAfter time.Duration
+		for {
+			ru = randRU(0, 100)
+			var ok bool
+			ok, tryAgainAfter = tb.TryToFulfill(clock.Now(), ru)
+			if !ok {
+				break
+			}
+		}
+		if tryAgainAfter == maxTryAgainAfterSeconds*time.Second {
+			// TryToFulfill has a cap; we cannot crosscheck the value if that cap is
+			// hit.
+			continue
+		}
+		state := tb
+		// Now check that if we advance the time a bit, the tryAgainAfter time
+		// agrees with the previous one.
+		advance := randDuration(tryAgainAfter)
+		clock.Advance(advance)
+		ok, newTryAgainAfter := tb.TryToFulfill(clock.Now(), ru)
+		if ok {
+			newTryAgainAfter = 0
+		}
+		// Check that the two calls agree on when the request can go through.
+		diff := advance + newTryAgainAfter - tryAgainAfter
+		const tolerance = 10 * time.Nanosecond
+		if diff < -tolerance || diff > tolerance {
+			t.Fatalf(
+				"inconsistent tryAgainAfter\nstate: %+v\nru: %f\ntryAgainAfter: %s\ntryAgainAfter after %s: %s",
+				state, ru, tryAgainAfter, advance, newTryAgainAfter,
+			)
+		}
+	}
+}
+
+// TestTokenBucketDebt simulates a closed-loop workload with parallelism 1 in
+// combination with incurring a fixed amount of debt every second. It verifies
+// that queue times are never too high, and optionally emits all queue time
+// information into a csv file.
+func TestTokenBucketDebt(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+
+	start := timeutil.Now()
+	ts := timeutil.NewManualTime(start)
+
+	var tb tokenBucket
+	tb.Init(ts.Now(), nil, 100 /* rate */, 0 /* available */)
+
+	const tickDuration = time.Millisecond
+	const debtPeriod = time.Second
+	const debtTicks = int(debtPeriod / tickDuration)
+	const totalTicks = 10 * debtTicks
+	const debt tenantcostmodel.RU = 50
+
+	const reqRUMean = 1
+	const reqRUStdDev = reqRUMean / 2
+
+	// Use a fixed seed so the result is reproducible.
+	r := rand.New(rand.NewSource(1234))
+	randRu := func() tenantcostmodel.RU {
+		ru := r.NormFloat64()*reqRUStdDev + reqRUMean
+		if ru < 0 {
+			return 0
+		}
+		return tenantcostmodel.RU(ru)
+	}
+
+	ru := randRu()
+	reqTick := 0
+
+	out := io.Discard
+	if *saveDebtCSV != "" {
+		file, err := os.Create(*saveDebtCSV)
+		if err != nil {
+			t.Fatalf("error creating csv file: %v", err)
+		}
+		defer func() {
+			if err := file.Close(); err != nil {
+				t.Errorf("error closing csv file: %v", err)
+			}
+		}()
+		out = file
+	}
+
+	fmt.Fprintf(out, "time (s),queue latency (ms),debt applied (RU)\n")
+	for tick := 0; tick < totalTicks; tick++ {
+		now := ts.Now()
+		var d tenantcostmodel.RU
+		if tick > 0 && tick%debtTicks == 0 {
+			d = debt
+			tb.RemoveTokens(now, debt)
+		}
+		if ok, _ := tb.TryToFulfill(now, ru); ok {
+			latency := tick - reqTick
+			if latency > 100 {
+				// A single request took longer than 100ms; we did a poor job smoothing
+				// out the debt.
+				t.Fatalf("high latency for request: %d ms", latency)
+			}
+			fmt.Fprintf(out, "%f,%d,%f\n", (time.Duration(tick) * tickDuration).Seconds(), latency, d)
+			ru = randRu()
+			reqTick = tick
+		} else {
+			fmt.Fprintf(out, "%f,,%f\n", (time.Duration(tick) * tickDuration).Seconds(), d)
+		}
+		ts.Advance(tickDuration)
+	}
+}
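
To make the debt-smoothing scheme described in the token_bucket.go comment easier to follow on its own, here is a minimal standalone Go sketch of the same accounting; the plain RU type and the simplified bucket struct are stand-ins for tenantcostmodel.RU and tokenBucket and are illustrative only, not part of the patch. A lump of debt is "refinanced" to be repaid over debtRepaymentSecs, and only the leftover refill rate replenishes tokens:

package main

import (
	"fmt"
	"time"
)

// RU is a stand-in for tenantcostmodel.RU in this sketch.
type RU float64

// debtRepaymentSecs mirrors the constant added in token_bucket.go: outstanding
// debt is scheduled to be repaid over the next 2 seconds.
const debtRepaymentSecs = 2

// bucket is a stripped-down copy of the patch's debt accounting: refill at
// rate, but divert debtRate of that refill to paying debt until it is gone.
type bucket struct {
	rate        RU // refill rate, in RU/s
	available   RU // available tokens; never negative
	debt        RU // outstanding debt
	debtRate    RU // portion of the refill rate diverted to debt repayment
	lastUpdated time.Time
}

func (b *bucket) update(now time.Time) {
	since := now.Sub(b.lastUpdated).Seconds()
	if since <= 0 {
		return
	}
	b.lastUpdated = now
	refilled := b.rate * RU(since)
	debtPaid := b.debtRate * RU(since)
	if debtPaid > b.debt {
		debtPaid = b.debt
	}
	b.debt -= debtPaid
	b.available += refilled - debtPaid
}

func (b *bucket) removeTokens(now time.Time, amount RU) {
	b.update(now)
	if b.available >= amount {
		b.available -= amount
		return
	}
	b.debt += amount - b.available
	b.available = 0
	// "Refinance": plan to pay all current debt over the next debtRepaymentSecs.
	b.debtRate = b.debt / debtRepaymentSecs
	if b.debtRate > b.rate {
		b.debtRate = b.rate
	}
}

func main() {
	start := time.Now()
	b := bucket{rate: 100, lastUpdated: start}

	// Incur 100 RU of debt up front: it is repaid at 50 RU/s, leaving 50 RU/s
	// for regular refill instead of stalling all requests for a full second.
	b.removeTokens(start, 100)
	for _, ms := range []int{500, 1000, 2000} {
		now := start.Add(time.Duration(ms) * time.Millisecond)
		b.update(now)
		fmt.Printf("t=%4dms  debt=%5.1f  available=%5.1f\n", ms, b.debt, b.available)
	}
}

Running it shows the 100 RU debt shrinking at 50 RU/s while tokens still accumulate at the remaining 50 RU/s, which is the behavior the new testdata/debt file exercises end to end.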
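
The 63% / 86% / 95% figures quoted for model B in that comment follow directly from D(t) = D(0) * e^(-t/T); a quick arithmetic check (standalone, not part of the patch):

package main

import (
	"fmt"
	"math"
)

func main() {
	// Model B (continuous refinancing): D(t) = D(0) * e^(-t/T), so the fraction
	// of the original debt paid by time k*T is 1 - e^(-k).
	for k := 1; k <= 3; k++ {
		fmt.Printf("debt paid by %dT: %.0f%%\n", k, 100*(1-math.Exp(-float64(k))))
	}
}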
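
The two-case wait-time computation added to TryToFulfill can likewise be checked in isolation. The helper below mirrors that arithmetic with plain float64 values; the waitSeconds name and the example numbers are illustrative, not part of the patch:

package main

import "fmt"

// waitSeconds mirrors the wait-time computation in the patch's TryToFulfill,
// using plain float64 values: a request still needs `needed` tokens, the bucket
// refills at `rate`, and `debtRate` of that refill is diverted to pay `debt`.
func waitSeconds(needed, rate, debt, debtRate float64) float64 {
	if debt == 0 {
		return needed / rate
	}
	remainingRate := rate - debtRate
	// Case 1: the needed tokens accumulate from remainingRate before the debt
	// is fully paid (needed/remainingRate <= debt/debtRate, cross-multiplied).
	if needed*debtRate <= debt*remainingRate {
		return needed / remainingRate
	}
	// Case 2: the debt is paid off first; whatever is still missing afterwards
	// arrives at the full rate.
	debtPaySeconds := debt / debtRate
	return debtPaySeconds + (needed-debtPaySeconds*remainingRate)/rate
}

func main() {
	fmt.Println(waitSeconds(10, 100, 100, 50))  // case 1: 0.2
	fmt.Println(waitSeconds(200, 100, 100, 50)) // case 2: 3
}

With a 100 RU/s refill rate and a 100 RU debt being repaid at 50 RU/s, a request needing 10 RU waits 0.2s (case 1), while one needing 200 RU waits the 2s of debt repayment plus 1s at the full rate (case 2).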