Merge pull request #70727 from RaduBerinde/backport21.2-70163-70520
release-21.2: multitenant: implement fallback rate and pass next live instance ID
RaduBerinde authored Sep 28, 2021
2 parents 2fef2fb + 8d86233 commit 218fbf5
Showing 36 changed files with 1,846 additions and 715 deletions.
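The heart of this backport is the fallback-rate mechanism added to tenant_side.go below: when the local token bucket runs low and the token bucket server does not respond in time, the controller switches the limiter to the server-provided fallback refill rate. The following is a minimal, self-contained sketch of that idea, not the actual CockroachDB code; the limiter shape is simplified and the 2-second value for `anticipation` is an assumed placeholder (the real constant is defined elsewhere in the package).

```go
// Sketch of the fallback-rate idea from this PR: on a low-RU notification,
// remember a deadline; if no token bucket response reconfigures the limiter
// before that deadline, fall back to the server-provided refill rate.
package main

import (
	"fmt"
	"time"
)

type limiter struct{ rate float64 }

func (l *limiter) Reconfigure(rate float64) { l.rate = rate }

type controller struct {
	lim               limiter
	fallbackRate      float64   // last fallback rate reported by the server
	fallbackRateStart time.Time // zero unless a low-RU notification is pending
}

const anticipation = 2 * time.Second // assumed delay before giving up on a response

// onLowRU is called when the local bucket runs low.
func (c *controller) onLowRU(now time.Time) {
	c.fallbackRateStart = now.Add(anticipation)
}

// onResponse is called when a token bucket response arrives; it records the
// new fallback rate and clears any pending fallback switch.
func (c *controller) onResponse(fallbackRate float64) {
	c.fallbackRate = fallbackRate
	c.fallbackRateStart = time.Time{}
}

// onTick runs on the controller's timer and switches to the fallback rate if
// the deadline passed without a response.
func (c *controller) onTick(now time.Time) {
	if !c.fallbackRateStart.IsZero() && !now.Before(c.fallbackRateStart) {
		c.lim.Reconfigure(c.fallbackRate)
		c.fallbackRateStart = time.Time{}
	}
}

func main() {
	c := controller{fallbackRate: 100}
	now := time.Now()
	c.onLowRU(now)
	c.onTick(now.Add(3 * time.Second)) // past the deadline: switch to fallback
	fmt.Printf("limiter rate after fallback: %g RU/s\n", c.lim.rate)
}
```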
5 changes: 5 additions & 0 deletions pkg/ccl/multitenantccl/tenantcostclient/BUILD.bazel
@@ -11,13 +11,15 @@ go_library(
importpath = "github.com/cockroachdb/cockroach/pkg/ccl/multitenantccl/tenantcostclient",
visibility = ["//visibility:public"],
deps = [
"//pkg/base",
"//pkg/kv/kvclient/kvtenant",
"//pkg/multitenant",
"//pkg/multitenant/tenantcostmodel",
"//pkg/roachpb:with-mocks",
"//pkg/server",
"//pkg/settings",
"//pkg/settings/cluster",
"//pkg/sql/sqlliveness",
"//pkg/util/log",
"//pkg/util/quotapool",
"//pkg/util/stop",
@@ -50,16 +52,19 @@ go_test(
"//pkg/server",
"//pkg/settings/cluster",
"//pkg/sql",
"//pkg/sql/sqlliveness",
"//pkg/testutils",
"//pkg/testutils/serverutils",
"//pkg/testutils/sqlutils",
"//pkg/testutils/testcluster",
"//pkg/util/leaktest",
"//pkg/util/log",
"//pkg/util/randutil",
"//pkg/util/stop",
"//pkg/util/syncutil",
"//pkg/util/timeutil",
"@com_github_cockroachdb_datadriven//:datadriven",
"@com_github_cockroachdb_errors//:errors",
"@in_gopkg_yaml_v2//:yaml_v2",
],
)
9 changes: 8 additions & 1 deletion pkg/ccl/multitenantccl/tenantcostclient/limiter.go
@@ -26,6 +26,7 @@ import (
// are used to adjust/reconfigure/replenish the local token bucket.
type limiter struct {
timeSource timeutil.TimeSource
testInstr TestInstrumentation
tb tokenBucket
qp *quotapool.AbstractPool

@@ -43,9 +44,12 @@ type limiter struct {
const initialRUs = 10000
const initialRate = 100

func (l *limiter) Init(timeSource timeutil.TimeSource, notifyChan chan struct{}) {
func (l *limiter) Init(
timeSource timeutil.TimeSource, testInstr TestInstrumentation, notifyChan chan struct{},
) {
*l = limiter{
timeSource: timeSource,
testInstr: testInstr,
}

l.tb.Init(timeSource.Now(), notifyChan, initialRate, initialRUs)
@@ -58,6 +62,9 @@ func (l *limiter) Init(timeSource timeutil.TimeSource, notifyChan chan struct{})
if !req.waitingRUAccounted {
req.waitingRUAccounted = true
atomic.AddInt64(&l.waitingRU, req.neededCeil())
if l.testInstr != nil {
l.testInstr.Event(l.timeSource.Now(), WaitingRUAccountedInCallback)
}
}
}
// We use OnWaitStartLocked because otherwise we have a race between the token
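The limiter.go changes above thread an optional TestInstrumentation handle into the limiter so tests can observe timestamped events such as WaitingRUAccountedInCallback. Below is a minimal sketch of that pattern, assuming a channel-backed recorder on the test side; the real test harness and event types are defined elsewhere in the package and are not shown in this diff.

```go
// Sketch of the test-instrumentation pattern: production code takes an
// optional TestInstrumentation and, when it is non-nil, reports timestamped
// events that a test can wait on. The recorder here is illustrative only.
package main

import (
	"fmt"
	"time"
)

type TestEvent int

const (
	LowRUNotification TestEvent = iota
	WaitingRUAccountedInCallback
)

// TestInstrumentation is implemented by tests; production code passes nil.
type TestInstrumentation interface {
	Event(now time.Time, ev TestEvent)
}

type eventRecorder struct{ ch chan TestEvent }

func (r *eventRecorder) Event(now time.Time, ev TestEvent) { r.ch <- ev }

type limiter struct {
	testInstr TestInstrumentation
}

func (l *limiter) accountWaitingRU(now time.Time) {
	// ... real accounting would happen here ...
	if l.testInstr != nil { // no-op in production
		l.testInstr.Event(now, WaitingRUAccountedInCallback)
	}
}

func main() {
	rec := &eventRecorder{ch: make(chan TestEvent, 1)}
	l := limiter{testInstr: rec}
	l.accountWaitingRU(time.Now())
	fmt.Println("observed expected event:", <-rec.ch == WaitingRUAccountedInCallback)
}
```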
116 changes: 84 additions & 32 deletions pkg/ccl/multitenantccl/tenantcostclient/tenant_side.go
@@ -12,13 +12,15 @@ import (
"context"
"time"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvtenant"
"github.com/cockroachdb/cockroach/pkg/multitenant"
"github.com/cockroachdb/cockroach/pkg/multitenant/tenantcostmodel"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/server"
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/sql/sqlliveness"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
@@ -85,6 +87,10 @@ const consumptionReportingThreshold = 100
// The extended reporting period is this factor times the normal period.
const extendedReportingPeriodFactor = 4

// We try to maintain this many RUs in our local bucket, regardless of estimated
// usage. This is intended to support usage spikes without blocking.
const bufferRUs = 5000

func newTenantSideCostController(
st *cluster.Settings,
tenantID roachpb.TenantID,
@@ -104,7 +110,7 @@ func newTenantSideCostController(
responseChan: make(chan *roachpb.TokenBucketResponse, 1),
lowRUNotifyChan: make(chan struct{}, 1),
}
c.limiter.Init(c.timeSource, c.lowRUNotifyChan)
c.limiter.Init(timeSource, testInstr, c.lowRUNotifyChan)

// TODO(radu): these settings can currently be changed by the tenant (see
// #47918), which would make it very easy to evade cost control. For now, use
@@ -142,15 +148,18 @@ func init() {
}

type tenantSideCostController struct {
timeSource timeutil.TimeSource
testInstr TestInstrumentation
settings *cluster.Settings
costCfg tenantcostmodel.Config
tenantID roachpb.TenantID
provider kvtenant.TokenBucketProvider
limiter limiter
stopper *stop.Stopper
cpuSecsFn multitenant.CPUSecsFn
timeSource timeutil.TimeSource
testInstr TestInstrumentation
settings *cluster.Settings
costCfg tenantcostmodel.Config
tenantID roachpb.TenantID
provider kvtenant.TokenBucketProvider
limiter limiter
stopper *stop.Stopper
instanceID base.SQLInstanceID
sessionID sqlliveness.SessionID
cpuSecsFn multitenant.CPUSecsFn
nextLiveInstanceIDFn multitenant.NextLiveInstanceIDFn

mu struct {
syncutil.Mutex
@@ -171,8 +180,12 @@ type tenantSideCostController struct {
now time.Time
cpuSecs float64
consumption roachpb.TenantConsumption
// requestSeqNum is an increasing sequence number that is included in token
// bucket requests.
requestSeqNum int64

// TargetPeriodSetting value at the last update.
// targetPeriod stores the value of the TargetPeriodSetting setting at the
// last update.
targetPeriod time.Duration

// initialRequestCompleted is set to true when the first token bucket
@@ -191,10 +204,6 @@ type tenantSideCostController struct {
// time.
requestNeedsRetry bool

// notificationReceivedDuringRequest is set if we received a "low RU"
// notification while a request was in progress.
notificationReceivedDuringRequest bool

lastRequestTime time.Time
lastReportedConsumption roachpb.TenantConsumption

@@ -208,6 +217,13 @@ type tenantSideCostController struct {
setupNotificationCh <-chan time.Time
setupNotificationThreshold tenantcostmodel.RU

// fallbackRate is the refill rate we fall back to if the token bucket
// requests don't complete or take a long time.
fallbackRate float64
// fallbackRateStart is the time when we can switch to the fallback rate;
// set only when we get a low RU notification.
fallbackRateStart time.Time

// avgRUPerSec is an exponentially-weighted moving average of the RU
// consumption per second; used to estimate the RU requirements for the next
// request.
Expand All @@ -221,10 +237,24 @@ var _ multitenant.TenantSideCostController = (*tenantSideCostController)(nil)

// Start is part of multitenant.TenantSideCostController.
func (c *tenantSideCostController) Start(
ctx context.Context, stopper *stop.Stopper, cpuSecsFn multitenant.CPUSecsFn,
ctx context.Context,
stopper *stop.Stopper,
instanceID base.SQLInstanceID,
sessionID sqlliveness.SessionID,
cpuSecsFn multitenant.CPUSecsFn,
nextLiveInstanceIDFn multitenant.NextLiveInstanceIDFn,
) error {
if instanceID == 0 {
return errors.New("invalid SQLInstanceID")
}
if sessionID == "" {
return errors.New("invalid sqlliveness.SessionID")
}
c.stopper = stopper
c.instanceID = instanceID
c.sessionID = sessionID
c.cpuSecsFn = cpuSecsFn
c.nextLiveInstanceIDFn = nextLiveInstanceIDFn
return stopper.RunAsyncTask(ctx, "cost-controller", func(ctx context.Context) {
c.mainLoop(ctx)
})
@@ -238,6 +268,7 @@ func (c *tenantSideCostController) initRunState(ctx context.Context) {
c.run.cpuSecs = c.cpuSecsFn(ctx)
c.run.lastRequestTime = now
c.run.avgRUPerSec = initialRUs / c.run.targetPeriod.Seconds()
c.run.requestSeqNum = 1
}

// updateRunState is called whenever the main loop awakens and accounts for the
@@ -313,8 +344,9 @@ func (c *tenantSideCostController) sendTokenBucketRequest(ctx context.Context) {
if !c.run.initialRequestCompleted {
requested = initialRUs
} else {
// Request what we expect to need over the next target period.
requested = c.run.avgRUPerSec * c.run.targetPeriod.Seconds()
// Request what we expect to need over the next target period plus the
// buffer amount.
requested = c.run.avgRUPerSec*c.run.targetPeriod.Seconds() + bufferRUs

// Adjust by the currently available amount. If we are in debt, we request
// more to cover the debt.
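As a worked example of the request sizing above: the controller asks for its expected consumption over the target period plus the fixed bufferRUs cushion, then adjusts by the RUs it already has locally. The exact adjustment is collapsed in this view; the sketch below assumes the available amount is subtracted (a negative amount, i.e. debt, increases the ask) and the result is floored at zero.

```go
// Hypothetical sizing helper mirroring the logic above; numbers in main are
// made up for illustration.
package main

import "fmt"

const bufferRUs = 5000

func requestedRUs(avgRUPerSec, targetPeriodSecs, available float64) float64 {
	requested := avgRUPerSec*targetPeriodSecs + bufferRUs
	requested -= available // assumption: debt (negative available) increases the request
	if requested < 0 {
		requested = 0 // enough RUs on hand; nothing extra to request
	}
	return requested
}

func main() {
	// E.g. averaging 200 RU/s over a 10s target period with 1500 RUs on hand:
	fmt.Println(requestedRUs(200, 10, 1500)) // 200*10 + 5000 - 1500 = 5500
}
```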
@@ -327,13 +359,16 @@
}

req := roachpb.TokenBucketRequest{
TenantID: c.tenantID.ToUint64(),
// TODO(radu): populate instance ID.
InstanceID: 1,
TenantID: c.tenantID.ToUint64(),
InstanceID: uint32(c.instanceID),
InstanceLease: c.sessionID.UnsafeBytes(),
NextLiveInstanceID: uint32(c.nextLiveInstanceIDFn(ctx)),
SeqNum: c.run.requestSeqNum,
ConsumptionSinceLastRequest: deltaConsumption,
RequestedRU: requested,
TargetRequestPeriod: c.run.targetPeriod,
}
c.run.requestSeqNum++

c.run.lastRequestTime = c.run.now
// TODO(radu): in case of an error, we undercount some consumption.
@@ -342,7 +377,7 @@ func (c *tenantSideCostController) sendTokenBucketRequest(ctx context.Context) {

ctx, _ = c.stopper.WithCancelOnQuiesce(ctx)
err := c.stopper.RunAsyncTask(ctx, "token-bucket-request", func(ctx context.Context) {
if log.V(1) {
if log.ExpensiveLogEnabled(ctx, 1) {
log.Infof(ctx, "issuing TokenBucket: %s\n", req.String())
}
resp, err := c.provider.TokenBucket(ctx, &req)
@@ -370,9 +405,13 @@ func (c *tenantSideCostController) sendTokenBucketRequest(ctx context.Context) {
func (c *tenantSideCostController) handleTokenBucketResponse(
ctx context.Context, resp *roachpb.TokenBucketResponse,
) {
if log.V(1) {
log.Infof(ctx, "TokenBucket response: %g RUs over %s", resp.GrantedRU, resp.TrickleDuration)
if log.ExpensiveLogEnabled(ctx, 1) {
log.Infof(
ctx, "TokenBucket response: %g RUs over %s (fallback rate %g)",
resp.GrantedRU, resp.TrickleDuration, resp.FallbackRate,
)
}
c.run.fallbackRate = resp.FallbackRate

if !c.run.initialRequestCompleted {
c.run.initialRequestCompleted = true
@@ -383,19 +422,20 @@ func (c *tenantSideCostController) handleTokenBucketResponse(

granted := resp.GrantedRU
if granted == 0 {
// We must have not requested any more RUs; nothing to do.
// We must have not requested any more RUs. The token bucket state doesn't
// need updating.
//
// It is possible that we got a low RU notification while the request was in
// flight. If that is the case, we must send another request.
if c.run.notificationReceivedDuringRequest {
c.run.notificationReceivedDuringRequest = false
// flight. If that is the case, fallbackRateStart will be set and we send
// another request.
if !c.run.fallbackRateStart.IsZero() {
c.sendTokenBucketRequest(ctx)
}
return
}
// It doesn't matter if we received a notification; we are going to
// reconfigure the bucket and set up a new notification as needed.
c.run.notificationReceivedDuringRequest = false
c.run.fallbackRateStart = time.Time{}

if !c.run.lastDeadline.IsZero() {
// If last request came with a trickle duration, we may have RUs that were
@@ -413,12 +453,15 @@ func (c *tenantSideCostController) handleTokenBucketResponse(
}

notifyThreshold := tenantcostmodel.RU(granted * notifyFraction)
if notifyThreshold < bufferRUs {
notifyThreshold = bufferRUs
}
var cfg tokenBucketReconfigureArgs
if resp.TrickleDuration == 0 {
// We received a batch of tokens to use as needed. Set up the token bucket
// to notify us when the tokens are running low.
cfg.TokenAdjustment = tenantcostmodel.RU(granted)
// TODO(radu): if we don't get more tokens in time, fall back to a "backup"
// TODO(radu): if we don't get more tokens in time, fall back to a "fallback"
// rate.
cfg.NewRate = 0
cfg.NotifyThreshold = notifyThreshold
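To illustrate the reconfiguration choice made in handleTokenBucketResponse above: the low-RU notification threshold is floored at bufferRUs, and a one-shot grant (TrickleDuration == 0) is added to the bucket immediately with a notification threshold set. The trickle branch is collapsed in this view; the sketch below assumes it spreads the grant over the trickle duration as a refill rate, and the value of notifyFraction is likewise an assumption.

```go
// Sketch only; the real tokenBucketReconfigureArgs and deadline handling are
// richer than this simplified shape.
package main

import (
	"fmt"
	"time"
)

const bufferRUs = 5000
const notifyFraction = 0.1 // assumed value; the real constant lives elsewhere in the file

type reconfigureArgs struct {
	TokenAdjustment float64
	NewRate         float64
	NotifyThreshold float64
}

func configFromGrant(granted float64, trickle time.Duration) reconfigureArgs {
	threshold := granted * notifyFraction
	if threshold < bufferRUs {
		threshold = bufferRUs // never let the notification point fall below the buffer
	}
	var cfg reconfigureArgs
	if trickle == 0 {
		// One-shot grant: add the tokens now and ask to be notified when low.
		cfg.TokenAdjustment = granted
		cfg.NewRate = 0
		cfg.NotifyThreshold = threshold
	} else {
		// Assumed trickle handling: spread the grant over the duration as a rate.
		cfg.NewRate = granted / trickle.Seconds()
	}
	return cfg
}

func main() {
	fmt.Printf("%+v\n", configFromGrant(20000, 0))
	fmt.Printf("%+v\n", configFromGrant(20000, 10*time.Second))
}
```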
@@ -470,6 +513,15 @@ func (c *tenantSideCostController) mainLoop(ctx context.Context) {
case <-tickerCh:
c.updateRunState(ctx)
c.updateAvgRUPerSec()

// Switch to the fallback rate, if necessary.
if !c.run.fallbackRateStart.IsZero() && !c.run.now.Before(c.run.fallbackRateStart) {
log.Infof(ctx, "switching to fallback rate %.10g", c.run.fallbackRate)
c.limiter.Reconfigure(c.run.now, tokenBucketReconfigureArgs{
NewRate: tenantcostmodel.RU(c.run.fallbackRate),
})
c.run.fallbackRateStart = time.Time{}
}
if c.run.requestNeedsRetry || c.shouldReportConsumption() {
c.run.requestNeedsRetry = false
c.sendTokenBucketRequest(ctx)
@@ -500,10 +552,10 @@ func (c *tenantSideCostController) mainLoop(ctx context.Context) {

case <-c.lowRUNotifyChan:
c.updateRunState(ctx)
c.run.fallbackRateStart = c.run.now.Add(anticipation)
// If we have a request in flight, the token bucket will get reconfigured.
if !c.run.requestInProgress {
c.sendTokenBucketRequest(ctx)
} else {
c.run.notificationReceivedDuringRequest = true
}
if c.testInstr != nil {
c.testInstr.Event(c.run.now, LowRUNotification)
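The commit also passes a NextLiveInstanceID in every token bucket request via the new NextLiveInstanceIDFn. The diff does not show how that function is implemented; as an assumption of what "next live instance" means, here is a hypothetical lookup that returns the smallest live SQL instance ID greater than ours, wrapping around to the smallest overall, which would let the server walk the ring of live instances.

```go
// Hypothetical sketch only: the real NextLiveInstanceIDFn is defined elsewhere
// in the multitenant package and may differ from this.
package main

import (
	"fmt"
	"sort"
)

type SQLInstanceID int32

func nextLiveInstanceID(liveIDs []SQLInstanceID, self SQLInstanceID) SQLInstanceID {
	if len(liveIDs) == 0 {
		return 0
	}
	sorted := append([]SQLInstanceID(nil), liveIDs...)
	sort.Slice(sorted, func(i, j int) bool { return sorted[i] < sorted[j] })
	for _, id := range sorted {
		if id > self {
			return id
		}
	}
	// No larger live ID: wrap around to the smallest one.
	return sorted[0]
}

func main() {
	live := []SQLInstanceID{7, 2, 5}
	fmt.Println(nextLiveInstanceID(live, 5)) // 7
	fmt.Println(nextLiveInstanceID(live, 7)) // wraps around to 2
}
```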