Skip to content

Commit

Permalink
multitenant: implement a fallback rate
Browse files Browse the repository at this point in the history
This change implements a fallback throttling rate that a SQL pod can
use if it stops being able to complete token bucket
requests.

The goal is to keep tenants without burst RUs throttled and tenants with
lots of RUs unthrottled (or throttled at a high rate). To achieve
this, we calculate a rate at which the tenant would burn through all
their available RUs within 1 hour. The premise here is that if we have
some kind of infrastructure problem, 1 hour is a reasonable time frame
to address it. Beyond 1 hour, the tenant will continue at the same
rate, consuming more RUs than they had available.

Informs #68479.

Release note: None

Release justification: Necessary fix for the distributed rate limiting
functionality, which is vital for the upcoming Serverless MVP release.
It allows CRDB to throttle clusters that have run out of free or paid
request units (which measure CPU and I/O usage). This functionality is
only enabled in multi-tenant scenarios and should have no impact on
our dedicated customers.
  • Loading branch information
RaduBerinde committed Sep 14, 2021
1 parent 271630c commit 56f94dd
Show file tree
Hide file tree
Showing 15 changed files with 882 additions and 627 deletions.
1 change: 1 addition & 0 deletions pkg/ccl/multitenantccl/tenantcostclient/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -62,5 +62,6 @@ go_test(
"//pkg/util/timeutil",
"@com_github_cockroachdb_datadriven//:datadriven",
"@com_github_cockroachdb_errors//:errors",
"@in_gopkg_yaml_v2//:yaml_v2",
],
)
47 changes: 32 additions & 15 deletions pkg/ccl/multitenantccl/tenantcostclient/tenant_side.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,10 +180,6 @@ type tenantSideCostController struct {
// time.
requestNeedsRetry bool

// notificationReceivedDuringRequest is set if we received a "low RU"
// notification while a request was in progress.
notificationReceivedDuringRequest bool

lastRequestTime time.Time
lastReportedConsumption roachpb.TenantConsumption

Expand All @@ -197,6 +193,13 @@ type tenantSideCostController struct {
setupNotificationCh <-chan time.Time
setupNotificationThreshold tenantcostmodel.RU

// fallbackRate is the refill rate we fall back to if the token bucket
// requests don't complete or take a long time.
fallbackRate float64
// fallbackRateStart is the time when we can switch to the fallback rate;
// set only when we get a low RU notification.
fallbackRateStart time.Time

// avgRUPerSec is an exponentially-weighted moving average of the RU
// consumption per second; used to estimate the RU requirements for the next
// request.
Expand Down Expand Up @@ -332,7 +335,7 @@ func (c *tenantSideCostController) sendTokenBucketRequest(ctx context.Context) {

ctx, _ = c.stopper.WithCancelOnQuiesce(ctx)
err := c.stopper.RunAsyncTask(ctx, "token-bucket-request", func(ctx context.Context) {
if log.V(1) {
if log.ExpensiveLogEnabled(ctx, 1) {
log.Infof(ctx, "issuing TokenBucket: %s\n", req.String())
}
resp, err := c.provider.TokenBucket(ctx, &req)
Expand Down Expand Up @@ -360,9 +363,13 @@ func (c *tenantSideCostController) sendTokenBucketRequest(ctx context.Context) {
func (c *tenantSideCostController) handleTokenBucketResponse(
ctx context.Context, resp *roachpb.TokenBucketResponse,
) {
if log.V(1) {
log.Infof(ctx, "TokenBucket response: %g RUs over %s", resp.GrantedRU, resp.TrickleDuration)
if log.ExpensiveLogEnabled(ctx, 1) {
log.Infof(
ctx, "TokenBucket response: %g RUs over %s (fallback rate %g)",
resp.GrantedRU, resp.TrickleDuration, resp.FallbackRate,
)
}
c.run.fallbackRate = resp.FallbackRate

if !c.run.initialRequestCompleted {
c.run.initialRequestCompleted = true
Expand All @@ -373,19 +380,20 @@ func (c *tenantSideCostController) handleTokenBucketResponse(

granted := resp.GrantedRU
if granted == 0 {
// We must have not requested any more RUs; nothing to do.
// We must have not requested any more RUs. The token bucket state doesn't
// need updating.
//
// It is possible that we got a low RU notification while the request was in
// flight. If that is the case, we must send another request.
if c.run.notificationReceivedDuringRequest {
c.run.notificationReceivedDuringRequest = false
// flight. If that is the case, fallbackRateStart will be set and we send
// another request.
if !c.run.fallbackRateStart.IsZero() {
c.sendTokenBucketRequest(ctx)
}
return
}
// It doesn't matter if we received a notification; we are going to
// reconfigure the bucket and set up a new notification as needed.
c.run.notificationReceivedDuringRequest = false
c.run.fallbackRateStart = time.Time{}

if !c.run.lastDeadline.IsZero() {
// If last request came with a trickle duration, we may have RUs that were
Expand All @@ -411,7 +419,7 @@ func (c *tenantSideCostController) handleTokenBucketResponse(
// We received a batch of tokens to use as needed. Set up the token bucket
// to notify us when the tokens are running low.
cfg.TokenAdjustment = tenantcostmodel.RU(granted)
// TODO(radu): if we don't get more tokens in time, fall back to a "backup"
// TODO(radu): if we don't get more tokens in time, fall back to a "fallback"
// rate.
cfg.NewRate = 0
cfg.NotifyThreshold = notifyThreshold
Expand Down Expand Up @@ -463,6 +471,15 @@ func (c *tenantSideCostController) mainLoop(ctx context.Context) {
case <-tickerCh:
c.updateRunState(ctx)
c.updateAvgRUPerSec()

// Switch to the fallback rate, if necessary.
if !c.run.fallbackRateStart.IsZero() && !c.run.now.Before(c.run.fallbackRateStart) {
log.Infof(ctx, "switching to fallback rate %.10g", c.run.fallbackRate)
c.limiter.Reconfigure(c.run.now, tokenBucketReconfigureArgs{
NewRate: tenantcostmodel.RU(c.run.fallbackRate),
})
c.run.fallbackRateStart = time.Time{}
}
if c.run.requestNeedsRetry || c.shouldReportConsumption() {
c.run.requestNeedsRetry = false
c.sendTokenBucketRequest(ctx)
Expand Down Expand Up @@ -493,10 +510,10 @@ func (c *tenantSideCostController) mainLoop(ctx context.Context) {

case <-c.lowRUNotifyChan:
c.updateRunState(ctx)
c.run.fallbackRateStart = c.run.now.Add(anticipation)
// If we have a request in flight, the token bucket will get reconfigured.
if !c.run.requestInProgress {
c.sendTokenBucketRequest(ctx)
} else {
c.run.notificationReceivedDuringRequest = true
}
if c.testInstr != nil {
c.testInstr.Event(c.run.now, LowRUNotification)
Expand Down
48 changes: 32 additions & 16 deletions pkg/ccl/multitenantccl/tenantcostclient/tenant_side_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/datadriven"
"github.com/cockroachdb/errors"
"gopkg.in/yaml.v2"
)

// TestDataDriven tests the tenant-side cost controller in an isolated setting.
Expand Down Expand Up @@ -191,7 +192,7 @@ var testStateCommands = map[string]func(
"timers": (*testState).timers,
"cpu": (*testState).cpu,
"usage": (*testState).usage,
"throttle": (*testState).throttle,
"configure": (*testState).configure,
}

func (ts *testState) fireRequest(
Expand Down Expand Up @@ -381,18 +382,13 @@ func timesToStrings(times []time.Time) []string {
return strs
}

func (ts *testState) throttle(t *testing.T, d *datadriven.TestData, args cmdArgs) string {
var rate float64
if d.Input != "disable" {
var err error
rate, err = strconv.ParseFloat(d.Input, 64)
if err != nil {
d.Fatalf(t, "expected float rate or 'disable'")
}
// configure the test provider.
func (ts *testState) configure(t *testing.T, d *datadriven.TestData, args cmdArgs) string {
var cfg testProviderConfig
if err := yaml.UnmarshalStrict([]byte(d.Input), &cfg); err != nil {
d.Fatalf(t, "failed to parse request yaml: %v", err)
}
ts.provider.mu.Lock()
defer ts.provider.mu.Unlock()
ts.provider.mu.throttlingRate = rate
ts.provider.configure(cfg)
return ""
}

Expand Down Expand Up @@ -445,13 +441,22 @@ type testProvider struct {
syncutil.Mutex
consumption roachpb.TenantConsumption

// If zero, the provider always grants RUs immediately. If non-zero, the
// provider grants RUs at this rate.
throttlingRate float64
cfg testProviderConfig
}
recvOnRequest chan struct{}
}

// testProviderConfig holds the settings that control how the test provider
// answers TokenBucket requests. It is populated from the YAML input of the
// "configure" test command.
type testProviderConfig struct {
// If zero, the provider always grants RUs immediately. If non-zero, the
// provider grants RUs at this rate.
Throttle float64 `yaml:"throttle"`

// If set, the provider always errors out.
Error bool `yaml:"error"`

// FallbackRate is copied as-is into the FallbackRate field of every
// successful TokenBucket response.
FallbackRate float64 `yaml:"fallback_rate"`
}

var _ kvtenant.TokenBucketProvider = (*testProvider)(nil)

func newTestProvider() *testProvider {
Expand All @@ -460,6 +465,12 @@ func newTestProvider() *testProvider {
}
}

// configure replaces the provider's current configuration with cfg. The
// provider mutex is held while the configuration is swapped.
func (tp *testProvider) configure(cfg testProviderConfig) {
tp.mu.Lock()
defer tp.mu.Unlock()
tp.mu.cfg = cfg
}

// waitForRequest waits until the next TokenBucket request.
func (tp *testProvider) waitForRequest(t *testing.T) {
t.Helper()
Expand Down Expand Up @@ -495,17 +506,22 @@ func (tp *testProvider) TokenBucket(
case <-tp.recvOnRequest:
default:
}

if tp.mu.cfg.Error {
return nil, errors.New("injected error")
}
tp.mu.consumption.Add(&in.ConsumptionSinceLastRequest)
res := &roachpb.TokenBucketResponse{}

res.GrantedRU = in.RequestedRU
if rate := tp.mu.throttlingRate; rate > 0 {
if rate := tp.mu.cfg.Throttle; rate > 0 {
res.TrickleDuration = time.Duration(in.RequestedRU / rate * float64(time.Second))
if res.TrickleDuration > in.TargetRequestPeriod {
res.GrantedRU *= in.TargetRequestPeriod.Seconds() / res.TrickleDuration.Seconds()
res.TrickleDuration = in.TargetRequestPeriod
}
}
res.FallbackRate = tp.mu.cfg.FallbackRate

return res, nil
}
Expand Down
55 changes: 55 additions & 0 deletions pkg/ccl/multitenantccl/tenantcostclient/testdata/fallback
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
# Tests in this file verify the enacting of the fallback rate when token bucket
# requests are erroring out.

configure
fallback_rate: 1000
----

# Issue a large request, forcing fetching of more RUs.
write bytes=50000000
----

configure
error: true
----

advance
1s
----
00:00:01.000

# Issue a large request that requires more RUs.
write bytes=100000000 label=w1
----

# Wait until we process the "low RU" notification (where we set the fallback rate
# start time).
wait-for-event
low-ru
----

advance
5s
----
00:00:06.000

# The fallback rate should kick in when we process a tick at the current time.
wait-for-event
tick
----

advance
10s
----
00:00:16.000

not-completed label=w1
----

advance
30s
----
00:00:46.000

await label=w1
----
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
# Tests in this file verify the enacting of the fallback rate when token bucket
# requests start erroring out while we are being throttled.

wait-for-event
token-bucket-response
----

configure
throttle: 100
fallback_rate: 1000
----

# Large request that requires more than the initial RUs.
write bytes=100000000 label=w2
----

wait-for-event
low-ru
----

wait-for-event
token-bucket-response
----

# We were granted 10s worth of tokens at 100 RU/s.
timers
----
00:00:09.000
00:04:41.479


configure
error: true
----

advance
10s
----
00:00:10.000

# Wait until we process the "low RU" notification (where we set the fallback rate
# start time).
wait-for-event
low-ru
----

advance
2s
----
00:00:12.000

# We would set up the fallback rate now, which should allow the request to go
# through much sooner than 4 minutes.
wait-for-event
tick
----

not-completed label=w2
----

advance
30s
----
00:00:42.000

await label=w2
----
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ wait-for-event
token-bucket-response
----

throttle
1000
configure
throttle: 1000
----

# Fire off a write that needs significantly more than the 10000 initial RUs.
Expand Down
4 changes: 2 additions & 2 deletions pkg/ccl/multitenantccl/tenantcostclient/testdata/throttling
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ write bytes=1024
----

# Set up throttling at 1000 RU/s.
throttle
1000
configure
throttle: 1000
----

# Fire off some writes that need significantly more than the 10000 initial RUs.
Expand Down
10 changes: 7 additions & 3 deletions pkg/ccl/multitenantccl/tenantcostserver/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,10 +173,14 @@ func (ts *testState) tokenBucketRequest(t *testing.T, d *datadriven.TestData) st
}
return ""
}
if res.TrickleDuration == 0 {
return fmt.Sprintf("%.10g RUs granted immediately.\n", res.GrantedRU)
trickleStr := "immediately"
if res.TrickleDuration != 0 {
trickleStr = fmt.Sprintf("over %s", res.TrickleDuration)
}
return fmt.Sprintf("%.10g RUs granted over %s.\n", res.GrantedRU, res.TrickleDuration)
return fmt.Sprintf(
"%.10g RUs granted %s. Fallback rate: %.10g RU/s\n",
res.GrantedRU, trickleStr, res.FallbackRate,
)
}

// metrics outputs all metrics that match the regex in the input.
Expand Down
Loading

0 comments on commit 56f94dd

Please sign in to comment.