Skip to content

Commit

Permalink
Merge #70163
Browse files Browse the repository at this point in the history
70163: multitenant: implement a fallback rate r=RaduBerinde a=RaduBerinde

#### tenantcostclient: maintain a "buffer" of RUs

This change adjusts the tenant cost controller logic to try to
maintain a "buffer" of 5000 RUs. This is useful to prevent waiting for
more RUs if an otherwise lightly loaded pod suddenly gets a spike of
traffic.

Release note: None

#### multitenant: implement a "backup" rate

This change implements a "backup" (fallback) throttling rate that a
SQL pod can use if it stops being able to complete token bucket
requests.

The goal is keep tenants without burst RUs throttled and tenants with
lots of RUs unthrottled (or throttled at a high rate). To achieve
this, we calculate a rate at which the tenant would burn through all
their available RUs within 1 hour. The premise here is that if we have
some kind of infrastructure problem, 1 hour is a reasonable time frame
to address it. Beyond 1 hour, the tenant will continue at the same
rate, consuming more RUs than they had available.

Informs #68479.

Release note: None

Release justification: Necessary fix for the distributed rate limiting
functionality, which is vital for the upcoming Serverless MVP release.
It allows CRDB to throttle clusters that have run out of free or paid
request units (which measure CPU and I/O usage). This functionality is
only enabled in multi-tenant scenarios and should have no impact on
our dedicated customers.

Co-authored-by: Radu Berinde <[email protected]>
  • Loading branch information
craig[bot] and RaduBerinde committed Sep 14, 2021
2 parents 1f98510 + 56f94dd commit a59ea3e
Show file tree
Hide file tree
Showing 17 changed files with 926 additions and 635 deletions.
2 changes: 2 additions & 0 deletions pkg/ccl/multitenantccl/tenantcostclient/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -55,11 +55,13 @@ go_test(
"//pkg/testutils/sqlutils",
"//pkg/testutils/testcluster",
"//pkg/util/leaktest",
"//pkg/util/log",
"//pkg/util/randutil",
"//pkg/util/stop",
"//pkg/util/syncutil",
"//pkg/util/timeutil",
"@com_github_cockroachdb_datadriven//:datadriven",
"@com_github_cockroachdb_errors//:errors",
"@in_gopkg_yaml_v2//:yaml_v2",
],
)
9 changes: 8 additions & 1 deletion pkg/ccl/multitenantccl/tenantcostclient/limiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
// are used to adjust/reconfigure/replenish the local token bucket.
type limiter struct {
timeSource timeutil.TimeSource
testInstr TestInstrumentation
tb tokenBucket
qp *quotapool.AbstractPool

Expand All @@ -43,9 +44,12 @@ type limiter struct {
const initialRUs = 10000
const initialRate = 100

func (l *limiter) Init(timeSource timeutil.TimeSource, notifyChan chan struct{}) {
func (l *limiter) Init(
timeSource timeutil.TimeSource, testInstr TestInstrumentation, notifyChan chan struct{},
) {
*l = limiter{
timeSource: timeSource,
testInstr: testInstr,
}

l.tb.Init(timeSource.Now(), notifyChan, initialRate, initialRUs)
Expand All @@ -58,6 +62,9 @@ func (l *limiter) Init(timeSource timeutil.TimeSource, notifyChan chan struct{})
if !req.waitingRUAccounted {
req.waitingRUAccounted = true
atomic.AddInt64(&l.waitingRU, req.neededCeil())
if l.testInstr != nil {
l.testInstr.Event(l.timeSource.Now(), WaitingRUAccountedInCallback)
}
}
}
// We use OnWaitStartLocked because otherwise we have a race between the token
Expand Down
61 changes: 43 additions & 18 deletions pkg/ccl/multitenantccl/tenantcostclient/tenant_side.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,10 @@ const consumptionReportingThreshold = 100
// The extended reporting period is this factor times the normal period.
const extendedReportingPeriodFactor = 4

// We try to maintain this many RUs in our local bucket, regardless of estimated
// usage. This is intended to support usage spikes without blocking.
const bufferRUs = 5000

func newTenantSideCostController(
st *cluster.Settings,
tenantID roachpb.TenantID,
Expand All @@ -104,7 +108,7 @@ func newTenantSideCostController(
responseChan: make(chan *roachpb.TokenBucketResponse, 1),
lowRUNotifyChan: make(chan struct{}, 1),
}
c.limiter.Init(c.timeSource, c.lowRUNotifyChan)
c.limiter.Init(timeSource, testInstr, c.lowRUNotifyChan)

// TODO(radu): these settings can currently be changed by the tenant (see
// #47918), which would made it very easy to evade cost control. For now, use
Expand Down Expand Up @@ -191,10 +195,6 @@ type tenantSideCostController struct {
// time.
requestNeedsRetry bool

// notificationReceivedDuringRequest is set if we received a "low RU"
// notification while a request was in progress.
notificationReceivedDuringRequest bool

lastRequestTime time.Time
lastReportedConsumption roachpb.TenantConsumption

Expand All @@ -208,6 +208,13 @@ type tenantSideCostController struct {
setupNotificationCh <-chan time.Time
setupNotificationThreshold tenantcostmodel.RU

// fallbackRate is the refill rate we fall back to if the token bucket
// requests don't complete or take a long time.
fallbackRate float64
// fallbackRateStart is the time when we can switch to the fallback rate;
// set only when we get a low RU notification.
fallbackRateStart time.Time

// avgRUPerSec is an exponentially-weighted moving average of the RU
// consumption per second; used to estimate the RU requirements for the next
// request.
Expand Down Expand Up @@ -313,8 +320,9 @@ func (c *tenantSideCostController) sendTokenBucketRequest(ctx context.Context) {
if !c.run.initialRequestCompleted {
requested = initialRUs
} else {
// Request what we expect to need over the next target period.
requested = c.run.avgRUPerSec * c.run.targetPeriod.Seconds()
// Request what we expect to need over the next target period plus the
// buffer amount.
requested = c.run.avgRUPerSec*c.run.targetPeriod.Seconds() + bufferRUs

// Adjust by the currently available amount. If we are in debt, we request
// more to cover the debt.
Expand Down Expand Up @@ -342,7 +350,7 @@ func (c *tenantSideCostController) sendTokenBucketRequest(ctx context.Context) {

ctx, _ = c.stopper.WithCancelOnQuiesce(ctx)
err := c.stopper.RunAsyncTask(ctx, "token-bucket-request", func(ctx context.Context) {
if log.V(1) {
if log.ExpensiveLogEnabled(ctx, 1) {
log.Infof(ctx, "issuing TokenBucket: %s\n", req.String())
}
resp, err := c.provider.TokenBucket(ctx, &req)
Expand Down Expand Up @@ -370,9 +378,13 @@ func (c *tenantSideCostController) sendTokenBucketRequest(ctx context.Context) {
func (c *tenantSideCostController) handleTokenBucketResponse(
ctx context.Context, resp *roachpb.TokenBucketResponse,
) {
if log.V(1) {
log.Infof(ctx, "TokenBucket response: %g RUs over %s", resp.GrantedRU, resp.TrickleDuration)
if log.ExpensiveLogEnabled(ctx, 1) {
log.Infof(
ctx, "TokenBucket response: %g RUs over %s (fallback rate %g)",
resp.GrantedRU, resp.TrickleDuration, resp.FallbackRate,
)
}
c.run.fallbackRate = resp.FallbackRate

if !c.run.initialRequestCompleted {
c.run.initialRequestCompleted = true
Expand All @@ -383,19 +395,20 @@ func (c *tenantSideCostController) handleTokenBucketResponse(

granted := resp.GrantedRU
if granted == 0 {
// We must have not requested any more RUs; nothing to do.
// We must have not requested any more RUs. The token bucket state doesn't
// need updating.
//
// It is possible that we got a low RU notification while the request was in
// flight. If that is the case, we must send another request.
if c.run.notificationReceivedDuringRequest {
c.run.notificationReceivedDuringRequest = false
// flight. If that is the case, fallbackRateStart will be set and we send
// another request.
if !c.run.fallbackRateStart.IsZero() {
c.sendTokenBucketRequest(ctx)
}
return
}
// It doesn't matter if we received a notification; we are going to
// reconfigure the bucket and set up a new notification as needed.
c.run.notificationReceivedDuringRequest = false
c.run.fallbackRateStart = time.Time{}

if !c.run.lastDeadline.IsZero() {
// If last request came with a trickle duration, we may have RUs that were
Expand All @@ -413,12 +426,15 @@ func (c *tenantSideCostController) handleTokenBucketResponse(
}

notifyThreshold := tenantcostmodel.RU(granted * notifyFraction)
if notifyThreshold < bufferRUs {
notifyThreshold = bufferRUs
}
var cfg tokenBucketReconfigureArgs
if resp.TrickleDuration == 0 {
// We received a batch of tokens to use as needed. Set up the token bucket
// to notify us when the tokens are running low.
cfg.TokenAdjustment = tenantcostmodel.RU(granted)
// TODO(radu): if we don't get more tokens in time, fall back to a "backup"
// TODO(radu): if we don't get more tokens in time, fall back to a "fallback"
// rate.
cfg.NewRate = 0
cfg.NotifyThreshold = notifyThreshold
Expand Down Expand Up @@ -470,6 +486,15 @@ func (c *tenantSideCostController) mainLoop(ctx context.Context) {
case <-tickerCh:
c.updateRunState(ctx)
c.updateAvgRUPerSec()

// Switch to the fallback rate, if necessary.
if !c.run.fallbackRateStart.IsZero() && !c.run.now.Before(c.run.fallbackRateStart) {
log.Infof(ctx, "switching to fallback rate %.10g", c.run.fallbackRate)
c.limiter.Reconfigure(c.run.now, tokenBucketReconfigureArgs{
NewRate: tenantcostmodel.RU(c.run.fallbackRate),
})
c.run.fallbackRateStart = time.Time{}
}
if c.run.requestNeedsRetry || c.shouldReportConsumption() {
c.run.requestNeedsRetry = false
c.sendTokenBucketRequest(ctx)
Expand Down Expand Up @@ -500,10 +525,10 @@ func (c *tenantSideCostController) mainLoop(ctx context.Context) {

case <-c.lowRUNotifyChan:
c.updateRunState(ctx)
c.run.fallbackRateStart = c.run.now.Add(anticipation)
// If we have a request in flight, the token bucket will get reconfigured.
if !c.run.requestInProgress {
c.sendTokenBucketRequest(ctx)
} else {
c.run.notificationReceivedDuringRequest = true
}
if c.testInstr != nil {
c.testInstr.Event(c.run.now, LowRUNotification)
Expand Down
66 changes: 46 additions & 20 deletions pkg/ccl/multitenantccl/tenantcostclient/tenant_side_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,13 @@ import (
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/datadriven"
"github.com/cockroachdb/errors"
"gopkg.in/yaml.v2"
)

// TestDataDriven tests the tenant-side cost controller in an isolated setting.
Expand Down Expand Up @@ -81,6 +83,13 @@ type event struct {

var _ tenantcostclient.TestInstrumentation = (*testState)(nil)

var eventTypeStr = map[tenantcostclient.TestEventType]string{
tenantcostclient.TickProcessed: "tick",
tenantcostclient.LowRUNotification: "low-ru",
tenantcostclient.TokenBucketResponseProcessed: "token-bucket-response",
tenantcostclient.WaitingRUAccountedInCallback: "waiting-ru-accounted",
}

// Event is part of tenantcostclient.TestInstrumentation.
func (ts *testState) Event(now time.Time, typ tenantcostclient.TestEventType) {
ev := event{
Expand All @@ -89,6 +98,9 @@ func (ts *testState) Event(now time.Time, typ tenantcostclient.TestEventType) {
}
select {
case ts.eventsCh <- ev:
if testing.Verbose() {
log.Infof(context.Background(), "event %s at %s\n", eventTypeStr[typ], now.Format(timeFormat))
}
default:
panic("events channel full")
}
Expand Down Expand Up @@ -180,7 +192,7 @@ var testStateCommands = map[string]func(
"timers": (*testState).timers,
"cpu": (*testState).cpu,
"usage": (*testState).usage,
"throttle": (*testState).throttle,
"configure": (*testState).configure,
}

func (ts *testState) fireRequest(
Expand Down Expand Up @@ -310,10 +322,9 @@ func (ts *testState) advance(t *testing.T, d *datadriven.TestData, args cmdArgs)
// waitForEvent waits until the tenant controller reports the given event type,
// at the current time.
func (ts *testState) waitForEvent(t *testing.T, d *datadriven.TestData, args cmdArgs) string {
typs := map[string]tenantcostclient.TestEventType{
"tick": tenantcostclient.TickProcessed,
"low-ru": tenantcostclient.LowRUNotification,
"token-bucket-response": tenantcostclient.TokenBucketResponseProcessed,
typs := make(map[string]tenantcostclient.TestEventType)
for ev, evStr := range eventTypeStr {
typs[evStr] = ev
}
typ, ok := typs[d.Input]
if !ok {
Expand Down Expand Up @@ -371,18 +382,13 @@ func timesToStrings(times []time.Time) []string {
return strs
}

func (ts *testState) throttle(t *testing.T, d *datadriven.TestData, args cmdArgs) string {
var rate float64
if d.Input != "disable" {
var err error
rate, err = strconv.ParseFloat(d.Input, 64)
if err != nil {
d.Fatalf(t, "expected float rate or 'disable'")
}
// configure the test provider.
func (ts *testState) configure(t *testing.T, d *datadriven.TestData, args cmdArgs) string {
var cfg testProviderConfig
if err := yaml.UnmarshalStrict([]byte(d.Input), &cfg); err != nil {
d.Fatalf(t, "failed to parse request yaml: %v", err)
}
ts.provider.mu.Lock()
defer ts.provider.mu.Unlock()
ts.provider.mu.throttlingRate = rate
ts.provider.configure(cfg)
return ""
}

Expand Down Expand Up @@ -435,13 +441,22 @@ type testProvider struct {
syncutil.Mutex
consumption roachpb.TenantConsumption

// If zero, the provider always grants RUs immediately. If non-zero, the
// provider grants RUs at this rate.
throttlingRate float64
cfg testProviderConfig
}
recvOnRequest chan struct{}
}

type testProviderConfig struct {
// If zero, the provider always grants RUs immediately. If non-zero, the
// provider grants RUs at this rate.
Throttle float64 `yaml:"throttle"`

// If set, the provider always errors out.
Error bool `yaml:"error"`

FallbackRate float64 `yaml:"fallback_rate"`
}

var _ kvtenant.TokenBucketProvider = (*testProvider)(nil)

func newTestProvider() *testProvider {
Expand All @@ -450,6 +465,12 @@ func newTestProvider() *testProvider {
}
}

func (tp *testProvider) configure(cfg testProviderConfig) {
tp.mu.Lock()
defer tp.mu.Unlock()
tp.mu.cfg = cfg
}

// waitForRequest waits until the next TokenBucket request.
func (tp *testProvider) waitForRequest(t *testing.T) {
t.Helper()
Expand Down Expand Up @@ -485,17 +506,22 @@ func (tp *testProvider) TokenBucket(
case <-tp.recvOnRequest:
default:
}

if tp.mu.cfg.Error {
return nil, errors.New("injected error")
}
tp.mu.consumption.Add(&in.ConsumptionSinceLastRequest)
res := &roachpb.TokenBucketResponse{}

res.GrantedRU = in.RequestedRU
if rate := tp.mu.throttlingRate; rate > 0 {
if rate := tp.mu.cfg.Throttle; rate > 0 {
res.TrickleDuration = time.Duration(in.RequestedRU / rate * float64(time.Second))
if res.TrickleDuration > in.TargetRequestPeriod {
res.GrantedRU *= in.TargetRequestPeriod.Seconds() / res.TrickleDuration.Seconds()
res.TrickleDuration = in.TargetRequestPeriod
}
}
res.FallbackRate = tp.mu.cfg.FallbackRate

return res, nil
}
Expand Down
4 changes: 4 additions & 0 deletions pkg/ccl/multitenantccl/tenantcostclient/test_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,4 +31,8 @@ const (
// TokenBucketResponseProcessed indicates that we have processed a
// (successful) request to the global token bucket.
TokenBucketResponseProcessed

// WaitingRUAccountedInCallback indicates that we have accounted for a new
// request's waiting RUs inside an OnWaitStart callback.
WaitingRUAccountedInCallback
)
Loading

0 comments on commit a59ea3e

Please sign in to comment.