Skip to content

Commit

Permalink
tenantcostclient: pass next live instance ID
Browse files Browse the repository at this point in the history
This change adds reporting of the next live instance ID to the tenant
cost controller.

For now we query the sqlinstance.Provider at most once a minute. This
is temporary until the Provider is changed to a range-feed-driven
cache (#69976).

Release note: None

Release justification: Necessary fix for the distributed rate limiting
functionality, which is vital for the upcoming Serverless MVP release.
It allows CRDB to throttle clusters that have run out of free or paid
request units (which measure CPU and I/O usage). This functionality is
only enabled in multi-tenant scenarios and should have no impact on
our dedicated customers.
  • Loading branch information
RaduBerinde committed Sep 24, 2021
1 parent 77af028 commit 8d86233
Show file tree
Hide file tree
Showing 4 changed files with 122 additions and 14 deletions.
28 changes: 16 additions & 12 deletions pkg/ccl/multitenantccl/tenantcostclient/tenant_side.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,17 +148,18 @@ func init() {
}

type tenantSideCostController struct {
timeSource timeutil.TimeSource
testInstr TestInstrumentation
settings *cluster.Settings
costCfg tenantcostmodel.Config
tenantID roachpb.TenantID
provider kvtenant.TokenBucketProvider
limiter limiter
stopper *stop.Stopper
instanceID base.SQLInstanceID
sessionID sqlliveness.SessionID
cpuSecsFn multitenant.CPUSecsFn
timeSource timeutil.TimeSource
testInstr TestInstrumentation
settings *cluster.Settings
costCfg tenantcostmodel.Config
tenantID roachpb.TenantID
provider kvtenant.TokenBucketProvider
limiter limiter
stopper *stop.Stopper
instanceID base.SQLInstanceID
sessionID sqlliveness.SessionID
cpuSecsFn multitenant.CPUSecsFn
nextLiveInstanceIDFn multitenant.NextLiveInstanceIDFn

mu struct {
syncutil.Mutex
Expand Down Expand Up @@ -241,6 +242,7 @@ func (c *tenantSideCostController) Start(
instanceID base.SQLInstanceID,
sessionID sqlliveness.SessionID,
cpuSecsFn multitenant.CPUSecsFn,
nextLiveInstanceIDFn multitenant.NextLiveInstanceIDFn,
) error {
if instanceID == 0 {
return errors.New("invalid SQLInstanceID")
Expand All @@ -252,6 +254,7 @@ func (c *tenantSideCostController) Start(
c.instanceID = instanceID
c.sessionID = sessionID
c.cpuSecsFn = cpuSecsFn
c.nextLiveInstanceIDFn = nextLiveInstanceIDFn
return stopper.RunAsyncTask(ctx, "cost-controller", func(ctx context.Context) {
c.mainLoop(ctx)
})
Expand Down Expand Up @@ -358,7 +361,8 @@ func (c *tenantSideCostController) sendTokenBucketRequest(ctx context.Context) {
req := roachpb.TokenBucketRequest{
TenantID: c.tenantID.ToUint64(),
InstanceID: uint32(c.instanceID),
InstanceLease: []byte(c.sessionID),
InstanceLease: c.sessionID.UnsafeBytes(),
NextLiveInstanceID: uint32(c.nextLiveInstanceIDFn(ctx)),
SeqNum: c.run.requestSeqNum,
ConsumptionSinceLastRequest: deltaConsumption,
RequestedRU: requested,
Expand Down
7 changes: 6 additions & 1 deletion pkg/ccl/multitenantccl/tenantcostclient/tenant_side_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,9 +143,14 @@ func (ts *testState) start(t *testing.T) {
usage := time.Duration(atomic.LoadInt64((*int64)(&ts.cpuUsage)))
return usage.Seconds()
}
nextLiveInstanceIDFn := func(ctx context.Context) base.SQLInstanceID {
return 0
}
instanceID := base.SQLInstanceID(1)
sessionID := sqlliveness.SessionID("foo")
if err := ts.controller.Start(ctx, ts.stopper, instanceID, sessionID, cpuUsageFn); err != nil {
if err := ts.controller.Start(
ctx, ts.stopper, instanceID, sessionID, cpuUsageFn, nextLiveInstanceIDFn,
); err != nil {
t.Fatal(err)
}
}
Expand Down
10 changes: 10 additions & 0 deletions pkg/multitenant/cost_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ type TenantSideCostController interface {
instanceID base.SQLInstanceID,
sessionID sqlliveness.SessionID,
cpuSecsFn CPUSecsFn,
nextLiveInstanceIDFn NextLiveInstanceIDFn,
) error

TenantSideKVInterceptor
Expand All @@ -38,6 +39,15 @@ type TenantSideCostController interface {
// the SQL instance.
type CPUSecsFn func(ctx context.Context) float64

// NextLiveInstanceIDFn is a function used to get the next live instance ID
// for this tenant. The information is used as a cleanup trigger on the server
// side and can be stale without causing correctness issues.
//
// A return value of 0 means that no value is available right now; 0 is never
// a valid instance ID (the cost controller's Start rejects it as invalid).
//
// The function must not block: the tenant-side cost controller calls it
// inline while assembling a TokenBucketRequest.
type NextLiveInstanceIDFn func(ctx context.Context) base.SQLInstanceID

// TenantSideKVInterceptor intercepts KV requests and responses, accounting
// for resource usage and potentially throttling requests.
//
Expand Down
91 changes: 90 additions & 1 deletion pkg/server/tenant.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,14 +41,17 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/contention"
"github.com/cockroachdb/cockroach/pkg/sql/flowinfra"
"github.com/cockroachdb/cockroach/pkg/sql/optionalnodeliveness"
"github.com/cockroachdb/cockroach/pkg/sql/sqlinstance"
"github.com/cockroachdb/cockroach/pkg/sql/sqlliveness"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/metric"
"github.com/cockroachdb/cockroach/pkg/util/netutil"
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/logtags"
)

// StartTenant starts a stand-alone SQL server against a KV backend.
Expand Down Expand Up @@ -268,8 +271,16 @@ func StartTenant(
log.SetNodeIDs(clusterID, 0 /* nodeID is not known for a SQL-only server. */)
log.SetTenantIDs(args.TenantID.String(), int32(s.SQLInstanceID()))

nextLiveInstanceIDFn := makeNextLiveInstanceIDFn(
ctx,
args.stopper,
s.sqlInstanceProvider,
s.SQLInstanceID(),
)

if err := args.costController.Start(
ctx, args.stopper, s.SQLInstanceID(), s.sqlLivenessSessionID, status.GetUserCPUSeconds,
ctx, args.stopper, s.SQLInstanceID(), s.sqlLivenessSessionID,
status.GetUserCPUSeconds, nextLiveInstanceIDFn,
); err != nil {
return nil, "", "", err
}
Expand Down Expand Up @@ -473,6 +484,83 @@ func makeTenantSQLServerArgs(
}, nil
}

// makeNextLiveInstanceIDFn builds the multitenant.NextLiveInstanceIDFn for the
// SQL instance identified by instanceID.
//
// The returned function never queries sqlInstanceProvider itself. Instead, at
// most once per refresh interval, it starts an async task on the stopper
// (running under serverCtx) which queries the provider and stashes the result.
// A non-zero stashed result is handed out by a single subsequent call; every
// other call returns 0.
func makeNextLiveInstanceIDFn(
	serverCtx context.Context,
	stopper *stop.Stopper,
	sqlInstanceProvider sqlinstance.Provider,
	instanceID base.SQLInstanceID,
) multitenant.NextLiveInstanceIDFn {
	// lookupSuccessor asks the provider for all instances and picks the one
	// that follows instanceID in circular ID order: the smallest ID greater
	// than instanceID or, when there is none, the smallest ID overall. It
	// returns 0 if the provider fails or reports no instances.
	lookupSuccessor := func(ctx context.Context) base.SQLInstanceID {
		all, err := sqlInstanceProvider.GetAllInstances(ctx)
		if err != nil {
			// Not fatal: a later refresh will try again.
			log.Infof(ctx, "GetAllInstances failed: %v", err)
			return 0
		}
		if len(all) == 0 {
			return 0
		}

		var smallest, successor base.SQLInstanceID
		for i := range all {
			candidate := all[i].InstanceID
			if smallest == 0 || candidate < smallest {
				smallest = candidate
			}
			if candidate <= instanceID {
				continue
			}
			if successor == 0 || candidate < successor {
				successor = candidate
			}
		}
		if successor != 0 {
			return successor
		}
		// Nothing above instanceID: wrap around to the smallest ID.
		return smallest
	}

	// The provider is consulted once a minute.
	//
	// Every retrieved value is reported a single time; all other calls yield 0.
	// Not providing a value is preferable to providing a stale one, which might
	// cause a bit of unnecessary work on the server side.
	//
	// TODO(radu): once the provider caches the information (see #69976), we can
	// use it directly each time.
	const refreshInterval = 1 * time.Minute
	var (
		mu          syncutil.Mutex
		refreshedAt time.Time
		pending     base.SQLInstanceID
		refreshing  bool
	)

	serverCtx = logtags.AddTag(serverCtx, "get-next-live-instance-id", nil)

	return func(ctx context.Context) base.SQLInstanceID {
		mu.Lock()
		defer mu.Unlock()

		// Hand out a freshly retrieved value exactly once.
		if v := pending; v != 0 {
			pending = 0
			return v
		}

		now := timeutil.Now()
		if refreshing || !refreshedAt.Before(now.Add(-refreshInterval)) {
			// Either a lookup is already running or the last one is recent enough.
			return 0
		}
		refreshedAt = now
		refreshing = true

		// RunAsyncTask only fails when the server is shutting down, in which
		// case there is nothing left to do; the error is deliberately dropped.
		_ = stopper.RunAsyncTask(serverCtx, "get-next-live-instance-id", func(ctx context.Context) {
			fetched := lookupSuccessor(ctx)

			mu.Lock()
			defer mu.Unlock()
			pending = fetched
			refreshing = false
		})
		return 0
	}
}

// NewTenantSideCostController is a hook for CCL code which implements the
// controller.
var NewTenantSideCostController = func(
Expand All @@ -499,6 +587,7 @@ func (noopTenantSideCostController) Start(
instanceID base.SQLInstanceID,
sessionID sqlliveness.SessionID,
cpuSecsFn multitenant.CPUSecsFn,
nextLiveInstanceIDFn multitenant.NextLiveInstanceIDFn,
) error {
return nil
}
Expand Down

0 comments on commit 8d86233

Please sign in to comment.