diff --git a/pkg/ccl/multitenantccl/tenantcostclient/tenant_side.go b/pkg/ccl/multitenantccl/tenantcostclient/tenant_side.go index d9626eb155d2..11df0f4e0a68 100644 --- a/pkg/ccl/multitenantccl/tenantcostclient/tenant_side.go +++ b/pkg/ccl/multitenantccl/tenantcostclient/tenant_side.go @@ -148,17 +148,18 @@ func init() { } type tenantSideCostController struct { - timeSource timeutil.TimeSource - testInstr TestInstrumentation - settings *cluster.Settings - costCfg tenantcostmodel.Config - tenantID roachpb.TenantID - provider kvtenant.TokenBucketProvider - limiter limiter - stopper *stop.Stopper - instanceID base.SQLInstanceID - sessionID sqlliveness.SessionID - cpuSecsFn multitenant.CPUSecsFn + timeSource timeutil.TimeSource + testInstr TestInstrumentation + settings *cluster.Settings + costCfg tenantcostmodel.Config + tenantID roachpb.TenantID + provider kvtenant.TokenBucketProvider + limiter limiter + stopper *stop.Stopper + instanceID base.SQLInstanceID + sessionID sqlliveness.SessionID + cpuSecsFn multitenant.CPUSecsFn + nextLiveInstanceIDFn multitenant.NextLiveInstanceIDFn mu struct { syncutil.Mutex @@ -241,6 +242,7 @@ func (c *tenantSideCostController) Start( instanceID base.SQLInstanceID, sessionID sqlliveness.SessionID, cpuSecsFn multitenant.CPUSecsFn, + nextLiveInstanceIDFn multitenant.NextLiveInstanceIDFn, ) error { if instanceID == 0 { return errors.New("invalid SQLInstanceID") @@ -252,6 +254,7 @@ func (c *tenantSideCostController) Start( c.instanceID = instanceID c.sessionID = sessionID c.cpuSecsFn = cpuSecsFn + c.nextLiveInstanceIDFn = nextLiveInstanceIDFn return stopper.RunAsyncTask(ctx, "cost-controller", func(ctx context.Context) { c.mainLoop(ctx) }) @@ -358,7 +361,8 @@ func (c *tenantSideCostController) sendTokenBucketRequest(ctx context.Context) { req := roachpb.TokenBucketRequest{ TenantID: c.tenantID.ToUint64(), InstanceID: uint32(c.instanceID), - InstanceLease: []byte(c.sessionID), + InstanceLease: c.sessionID.UnsafeBytes(), + NextLiveInstanceID: uint32(c.nextLiveInstanceIDFn(ctx)), SeqNum: c.run.requestSeqNum, ConsumptionSinceLastRequest: deltaConsumption, RequestedRU: requested, diff --git a/pkg/ccl/multitenantccl/tenantcostclient/tenant_side_test.go b/pkg/ccl/multitenantccl/tenantcostclient/tenant_side_test.go index edc8efab38cc..0fe83cbf8483 100644 --- a/pkg/ccl/multitenantccl/tenantcostclient/tenant_side_test.go +++ b/pkg/ccl/multitenantccl/tenantcostclient/tenant_side_test.go @@ -143,9 +143,14 @@ func (ts *testState) start(t *testing.T) { usage := time.Duration(atomic.LoadInt64((*int64)(&ts.cpuUsage))) return usage.Seconds() } + nextLiveInstanceIDFn := func(ctx context.Context) base.SQLInstanceID { + return 0 + } instanceID := base.SQLInstanceID(1) sessionID := sqlliveness.SessionID("foo") - if err := ts.controller.Start(ctx, ts.stopper, instanceID, sessionID, cpuUsageFn); err != nil { + if err := ts.controller.Start( + ctx, ts.stopper, instanceID, sessionID, cpuUsageFn, nextLiveInstanceIDFn, + ); err != nil { t.Fatal(err) } } diff --git a/pkg/multitenant/cost_controller.go b/pkg/multitenant/cost_controller.go index 4d4739e26801..7378ae6aacd6 100644 --- a/pkg/multitenant/cost_controller.go +++ b/pkg/multitenant/cost_controller.go @@ -29,6 +29,7 @@ type TenantSideCostController interface { instanceID base.SQLInstanceID, sessionID sqlliveness.SessionID, cpuSecsFn CPUSecsFn, + nextLiveInstanceIDFn NextLiveInstanceIDFn, ) error TenantSideKVInterceptor @@ -38,6 +39,15 @@ type TenantSideCostController interface { // the SQL instance. type CPUSecsFn func(ctx context.Context) float64 +// NextLiveInstanceIDFn is a function used to get the next live instance ID +// for this tenant. The information is used as a cleanup trigger on the server +// side and can be stale without causing correctness issues. +// +// Can return 0 if the value is not available right now. +// +// The function must not block. +type NextLiveInstanceIDFn func(ctx context.Context) base.SQLInstanceID + // TenantSideKVInterceptor intercepts KV requests and responses, accounting // for resource usage and potentially throttling requests. // diff --git a/pkg/server/tenant.go b/pkg/server/tenant.go index e367152b9559..ec4ef5c6efba 100644 --- a/pkg/server/tenant.go +++ b/pkg/server/tenant.go @@ -41,14 +41,17 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/contention" "github.com/cockroachdb/cockroach/pkg/sql/flowinfra" "github.com/cockroachdb/cockroach/pkg/sql/optionalnodeliveness" + "github.com/cockroachdb/cockroach/pkg/sql/sqlinstance" "github.com/cockroachdb/cockroach/pkg/sql/sqlliveness" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/metric" "github.com/cockroachdb/cockroach/pkg/util/netutil" "github.com/cockroachdb/cockroach/pkg/util/stop" + "github.com/cockroachdb/cockroach/pkg/util/syncutil" "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/errors" + "github.com/cockroachdb/logtags" ) // StartTenant starts a stand-alone SQL server against a KV backend. @@ -268,8 +271,16 @@ func StartTenant( log.SetNodeIDs(clusterID, 0 /* nodeID is not known for a SQL-only server. */) log.SetTenantIDs(args.TenantID.String(), int32(s.SQLInstanceID())) + nextLiveInstanceIDFn := makeNextLiveInstanceIDFn( + ctx, + args.stopper, + s.sqlInstanceProvider, + s.SQLInstanceID(), + ) + if err := args.costController.Start( - ctx, args.stopper, s.SQLInstanceID(), s.sqlLivenessSessionID, status.GetUserCPUSeconds, + ctx, args.stopper, s.SQLInstanceID(), s.sqlLivenessSessionID, + status.GetUserCPUSeconds, nextLiveInstanceIDFn, ); err != nil { return nil, "", "", err } @@ -473,6 +484,83 @@ func makeTenantSQLServerArgs( }, nil } +func makeNextLiveInstanceIDFn( + serverCtx context.Context, + stopper *stop.Stopper, + sqlInstanceProvider sqlinstance.Provider, + instanceID base.SQLInstanceID, +) multitenant.NextLiveInstanceIDFn { + retrieveNextLiveInstanceID := func(ctx context.Context) base.SQLInstanceID { + instances, err := sqlInstanceProvider.GetAllInstances(ctx) + if err != nil { + log.Infof(ctx, "GetAllInstances failed: %v", err) + // We will try again. + return 0 + } + if len(instances) == 0 { + return 0 + } + // Find the next ID in circular order. + var minID, nextID base.SQLInstanceID + for i := range instances { + id := instances[i].InstanceID + if minID == 0 || minID > id { + minID = id + } + if id > instanceID && (nextID == 0 || nextID > id) { + nextID = id + } + } + if nextID == 0 { + return minID + } + return nextID + } + + // We retrieve the value from the provider every minute. + // + // We report each retrieved value only once; for all other calls we return 0. + // We prefer to not provide a value rather than providing a stale value which + // might cause a bit of unnecessary work on the server side. + // + // TODO(radu): once the provider caches the information (see #69976), we can + // use it directly each time. + const interval = 1 * time.Minute + var mu syncutil.Mutex + var lastRefresh time.Time + var lastValue base.SQLInstanceID + var refreshInProgress bool + + serverCtx = logtags.AddTag(serverCtx, "get-next-live-instance-id", nil) + + return func(ctx context.Context) base.SQLInstanceID { + mu.Lock() + defer mu.Unlock() + if lastValue != 0 { + v := lastValue + lastValue = 0 + return v + } + + if now := timeutil.Now(); lastRefresh.Before(now.Add(-interval)) && !refreshInProgress { + lastRefresh = now + refreshInProgress = true + + // An error here indicates that the server is shutting down, so we can + // ignore it. + _ = stopper.RunAsyncTask(serverCtx, "get-next-live-instance-id", func(ctx context.Context) { + newValue := retrieveNextLiveInstanceID(ctx) + + mu.Lock() + defer mu.Unlock() + lastValue = newValue + refreshInProgress = false + }) + } + return 0 + } +} + // NewTenantSideCostController is a hook for CCL code which implements the // controller. var NewTenantSideCostController = func( @@ -499,6 +587,7 @@ func (noopTenantSideCostController) Start( instanceID base.SQLInstanceID, sessionID sqlliveness.SessionID, cpuSecsFn multitenant.CPUSecsFn, + nextLiveInstanceIDFn multitenant.NextLiveInstanceIDFn, ) error { return nil }