tenantcostserver: cleanup stale instances
This commit implements the server-side logic for cleaning up stale
instances from the tenant_usage table, according to the following
scheme:

 - each tenant instance sends the ID of the next live instance in
   circular order. The live instance set is maintained on the tenant
   side by a separate subsystem.

 - the server uses this information as a "hint" that some instances
   might be stale. When the reported next ID does not match the
   expected value, a cleanup for a specific instance ID range is
   triggered. The cleanup verifies that each instance's last update is
   older than a staleness cutoff, so that stale information from the
   tenant side doesn't cause incorrect removals.

Instances are cleaned up one at a time, with a limit of 10 instance
removals per request. This approach avoids queries that scan ranges of
the table that may contain many tombstones.
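
To make the hint handling concrete, below is a minimal Go sketch of the
range computation implied by the scheme above. The function name, its
(selfID, storedNext, reportedNext) parameters, and the end = -1
convention for an unbounded range are illustrative assumptions rather
than the actual server code; the real implementation also verifies each
candidate's last_update against the staleness cutoff (see
maybeCleanupStaleInstances in system_table.go below) before deleting
anything.

    package main

    import "fmt"

    // staleCandidateRange is a simplified, hypothetical model of the hint
    // check: instance selfID reports reportedNext as the next live instance
    // in circular ID order, while storedNext is the next-instance ID the
    // server currently has recorded. If they disagree, the IDs strictly
    // between selfID and reportedNext are candidates for cleanup (the
    // staleness check on each candidate is omitted here).
    func staleCandidateRange(selfID, storedNext, reportedNext int) (start, end int, mismatch bool) {
        if storedNext == reportedNext {
            // The hint matches the stored state; no cleanup is triggered.
            return 0, 0, false
        }
        start = selfID + 1
        if reportedNext > selfID {
            // Candidate IDs are in [selfID+1, reportedNext).
            end = reportedNext
        } else {
            // reportedNext wraps around to a lower ID, so every ID above
            // selfID is a candidate; -1 stands for "no upper bound".
            end = -1
        }
        return start, end, true
    }

    func main() {
        // Instance 3 reports that the next live instance is 7, but the
        // stored next-instance ID is 4: instances 4, 5 and 6 are checked
        // and removed only if their last update is older than the cutoff.
        fmt.Println(staleCandidateRange(3, 4, 7)) // 4 7 true
    }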

Release note: None

Release justification: Necessary fix for the distributed rate limiting
functionality, which is vital for the upcoming Serverless MVP release.
It allows CRDB to throttle clusters that have run out of free or paid
request units (which measure CPU and I/O usage). This functionality is
only enabled in multi-tenant scenarios and should have no impact on
our dedicated customers.
RaduBerinde committed Sep 24, 2021
1 parent d0c73d1 commit 77af028
Showing 10 changed files with 821 additions and 350 deletions.
4 changes: 4 additions & 0 deletions pkg/ccl/multitenantccl/tenantcostserver/BUILD.bazel
@@ -18,6 +18,8 @@ go_library(
"//pkg/multitenant",
"//pkg/roachpb:with-mocks",
"//pkg/server",
"//pkg/settings",
"//pkg/settings/cluster",
"//pkg/sql",
"//pkg/sql/pgwire/pgcode",
"//pkg/sql/pgwire/pgerror",
@@ -50,10 +52,12 @@ go_test(
"//pkg/security/securitytest",
"//pkg/server",
"//pkg/sql",
"//pkg/testutils",
"//pkg/testutils/metrictestutils",
"//pkg/testutils/serverutils",
"//pkg/testutils/sqlutils",
"//pkg/testutils/testcluster",
"//pkg/util",
"//pkg/util/leaktest",
"//pkg/util/log",
"//pkg/util/metric",
26 changes: 23 additions & 3 deletions pkg/ccl/multitenantccl/tenantcostserver/server.go
@@ -9,9 +9,13 @@
package tenantcostserver

import (
"time"

"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/multitenant"
"github.com/cockroachdb/cockroach/pkg/server"
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/util/metric"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
@@ -22,15 +26,29 @@ type instance struct {
executor *sql.InternalExecutor
metrics Metrics
timeSource timeutil.TimeSource
settings *cluster.Settings
}

// Note: the "four" in the description comes from
// tenantcostclient.extendedReportingPeriodFactor.
var instanceInactivity = settings.RegisterDurationSetting(
"tenant_usage_instance_inactivity",
"instances that have not reported consumption for longer than this value are cleaned up; "+
"should be at least four times higher than the tenant_cost_control_period of any tenant",
1*time.Minute, settings.PositiveDuration,
)

func newInstance(
db *kv.DB, executor *sql.InternalExecutor, timeSource timeutil.TimeSource,
settings *cluster.Settings,
db *kv.DB,
executor *sql.InternalExecutor,
timeSource timeutil.TimeSource,
) *instance {
res := &instance{
db: db,
executor: executor,
timeSource: timeSource,
settings: settings,
}
res.metrics.init()
return res
@@ -45,8 +63,10 @@ var _ multitenant.TenantUsageServer = (*instance)(nil)

func init() {
server.NewTenantUsageServer = func(
db *kv.DB, executor *sql.InternalExecutor,
settings *cluster.Settings,
db *kv.DB,
executor *sql.InternalExecutor,
) multitenant.TenantUsageServer {
return newInstance(db, executor, timeutil.DefaultTimeSource{})
return newInstance(settings, db, executor, timeutil.DefaultTimeSource{})
}
}
116 changes: 107 additions & 9 deletions pkg/ccl/multitenantccl/tenantcostserver/server_test.go
@@ -12,6 +12,7 @@ import (
"context"
gosql "database/sql"
"fmt"
"math/rand"
"regexp"
"strconv"
"testing"
@@ -23,9 +24,11 @@ import (
"github.com/cockroachdb/cockroach/pkg/multitenant"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/metrictestutils"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
"github.com/cockroachdb/cockroach/pkg/util"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/metric"
@@ -78,7 +81,9 @@ func (ts *testState) start(t *testing.T) {

ts.clock = timeutil.NewManualTime(t0)
ts.tenantUsage = tenantcostserver.NewInstance(
ts.kvDB, ts.s.InternalExecutor().(*sql.InternalExecutor), ts.clock,
ts.s.ClusterSettings(),
ts.kvDB,
ts.s.InternalExecutor().(*sql.InternalExecutor), ts.clock,
)
ts.metricsReg = metric.NewRegistry()
ts.metricsReg.AddMetricStruct(ts.tenantUsage.Metrics())
@@ -98,6 +103,7 @@ var testStateCommands = map[string]func(*testState, *testing.T, *datadriven.Test
"metrics": (*testState).metrics,
"configure": (*testState).configure,
"inspect": (*testState).inspect,
"wait-inspect": (*testState).waitInspect,
"advance": (*testState).advance,
}

@@ -128,10 +134,11 @@ func (ts *testState) createTenant(t *testing.T, d *datadriven.TestData) string {
func (ts *testState) tokenBucketRequest(t *testing.T, d *datadriven.TestData) string {
tenantID := ts.tenantID(t, d)
var args struct {
InstanceID uint32 `yaml:"instance_id"`
InstanceLease string `yaml:"instance_lease"`
SeqNum int64 `yaml:"seq_num"`
Consumption struct {
InstanceID uint32 `yaml:"instance_id"`
InstanceLease string `yaml:"instance_lease"`
NextLiveInstanceID uint32 `yaml:"next_live_instance_id"`
SeqNum int64 `yaml:"seq_num"`
Consumption struct {
RU float64 `yaml:"ru"`
ReadReq uint64 `yaml:"read_req"`
ReadBytes uint64 `yaml:"read_bytes"`
@@ -158,10 +165,11 @@ func (ts *testState) tokenBucketRequest(t *testing.T, d *datadriven.TestData) st
d.Fatalf(t, "failed to parse duration: %v", args.Period)
}
req := roachpb.TokenBucketRequest{
TenantID: tenantID,
InstanceID: args.InstanceID,
InstanceLease: []byte(args.InstanceLease),
SeqNum: args.SeqNum,
TenantID: tenantID,
InstanceID: args.InstanceID,
InstanceLease: []byte(args.InstanceLease),
NextLiveInstanceID: args.NextLiveInstanceID,
SeqNum: args.SeqNum,
ConsumptionSinceLastRequest: roachpb.TenantConsumption{
RU: args.Consumption.RU,
ReadRequests: args.Consumption.ReadReq,
@@ -251,6 +259,20 @@ func (ts *testState) inspect(t *testing.T, d *datadriven.TestData) string {
return res
}

// wait-inspect is like inspect but retries (after a short wait) until the
// output matches the expected output; used for cases where the expected
// state is reached asynchronously.
func (ts *testState) waitInspect(t *testing.T, d *datadriven.TestData) string {
time.Sleep(1 * time.Millisecond)
testutils.SucceedsSoon(t, func() error {
res := ts.inspect(t, d)
if res == d.Expected {
return nil
}
return errors.Errorf("-- expected:\n%s\n-- got:\n%s", d.Expected, res)
})
return d.Expected
}

// advance advances the clock by the provided duration and returns the new
// current time.
func (ts *testState) advance(t *testing.T, d *datadriven.TestData) string {
@@ -264,3 +286,79 @@ func (ts *testState) advance(t *testing.T, d *datadriven.TestData) string {
*ts.clock = *timeutil.NewManualTime(ts.clock.Now().Add(dur))
return ts.formatTime(ts.clock.Now())
}

// TestInstanceCleanup is a randomized test that verifies that the server keeps
// up with a changing live set.
func TestInstanceCleanup(t *testing.T) {
defer leaktest.AfterTest(t)()

var ts testState
ts.start(t)
defer ts.stop()

ts.r.Exec(t, fmt.Sprintf("SELECT crdb_internal.create_tenant(%d)", 5))

// Note: this number needs to be at most maxInstancesCleanup.
const maxInstances = 10
var liveset, prev util.FastIntSet

for steps := 0; steps < 100; steps++ {
// Keep the previous set for debugging.
prev = liveset.Copy()
// Make a few random changes to the set.
for n := rand.Intn(4); n >= 0; n-- {
x := 1 + rand.Intn(maxInstances)
if rand.Intn(2) == 0 {
liveset.Add(x)
} else {
liveset.Remove(x)
}
}
// Advance the time so all existing instances look stale.
ts.clock.Advance(5 * time.Minute)
if liveset.Empty() {
// An empty live set can't trigger cleanup.
continue
}
// Send one token bucket update from each instance, in random order.
instances := liveset.Ordered()
for _, i := range rand.Perm(len(instances)) {
req := roachpb.TokenBucketRequest{
TenantID: 5,
InstanceID: uint32(instances[i]),
}
if i+1 < len(instances) {
req.NextLiveInstanceID = uint32(instances[i+1])
} else {
req.NextLiveInstanceID = uint32(instances[0])
}
res := ts.tenantUsage.TokenBucketRequest(
context.Background(), roachpb.MakeTenantID(5), &req,
)
if res.Error != (errors.EncodedError{}) {
t.Fatal(errors.DecodeError(context.Background(), res.Error))
}
}
// Verify that the server reached the correct liveset.
rows := ts.r.Query(t,
"SELECT instance_id FROM system.tenant_usage WHERE tenant_id = 5 AND instance_id > 0",
)
var serverSet util.FastIntSet
for rows.Next() {
var id int
if err := rows.Scan(&id); err != nil {
t.Fatal(err)
}
serverSet.Add(id)
}
if err := rows.Err(); err != nil {
t.Fatal(err)
}
if !liveset.Equals(serverSet) {
t.Fatalf(
"previous live set: %s current live set: %s server live set: %s",
prev, liveset, serverSet,
)
}
}
}
73 changes: 69 additions & 4 deletions pkg/ccl/multitenantccl/tenantcostserver/system_table.go
@@ -57,6 +57,10 @@ const defaultRefillRate = 100
// throttled.
const defaultInitialRUs = 10 * 1000 * 1000

// maxInstancesCleanup restricts the number of stale instances that are removed
// in a single transaction.
const maxInstancesCleanup = 10

// update accounts for the passing of time since LastUpdate.
// If the tenantState is not initialized (Present=false), it is initialized now.
func (ts *tenantState) update(now time.Time) {
@@ -391,18 +395,79 @@ func (h *sysTableHelper) accomodateNewInstance(tenant *tenantState, instance *in
return err
}

// maybeCleanupStaleInstance checks the last update time of the given instance;
// if it is older than the cutoff time, the instance is removed and the next
// instance ID is returned (this ID is 0 if this is the highest instance ID).
func (h *sysTableHelper) maybeCleanupStaleInstance(
cutoff time.Time, instanceID base.SQLInstanceID,
) (deleted bool, nextInstance base.SQLInstanceID, _ error) {
ts := tree.MustMakeDTimestamp(cutoff, time.Microsecond)
row, err := h.ex.QueryRowEx(
h.ctx, "tenant-usage-delete", h.txn,
sessiondata.NodeUserSessionDataOverride,
`DELETE FROM system.tenant_usage
WHERE tenant_id = $1 AND instance_id = $2 AND last_update < $3
RETURNING next_instance_id`,
h.tenantID.ToUint64(),
int32(instanceID),
ts,
)
if err != nil {
return false, -1, err
}
if row == nil {
log.VEventf(h.ctx, 1, "tenant %s instance %d not stale", h.tenantID, instanceID)
return false, -1, nil
}
nextInstance = base.SQLInstanceID(tree.MustBeDInt(row[0]))
log.VEventf(h.ctx, 1, "cleaned up tenant %s instance %d", h.tenantID, instanceID)
return true, nextInstance, nil
}

// maybeCleanupStaleInstances removes up to maxInstancesCleanup stale instances
// (where the last update time is before the cutoff) with IDs in the range
// [startID, endID).
// If endID is -1, then the range is unrestricted [startID, ∞).
//
// Returns the ID of the instance following the deleted instances. This is
// the same as startID if nothing was cleaned up, and it is 0 if we cleaned up
// the last (highest ID) instance.
func (h *sysTableHelper) maybeCleanupStaleInstances(
cutoff time.Time, startID, endID base.SQLInstanceID,
) (nextInstance base.SQLInstanceID, _ error) {
log.VEventf(
h.ctx, 1, "checking stale instances (tenant=%s startID=%d endID=%d)",
h.tenantID, startID, endID,
)
id := startID
for n := 0; n < maxInstancesCleanup; n++ {
deleted, nextInstance, err := h.maybeCleanupStaleInstance(cutoff, id)
if err != nil {
return -1, err
}
if !deleted {
break
}
id = nextInstance
if id == 0 || (endID != -1 && id >= endID) {
break
}
}
return id, nil
}

// maybeCheckInvariants checks the invariants for the system table with a random
// probability and only if this is a test build.
func (h *sysTableHelper) maybeCheckInvariants(ctx context.Context) error {
func (h *sysTableHelper) maybeCheckInvariants() error {
if util.CrdbTestBuild && rand.Intn(10) == 0 {
return h.checkInvariants(ctx)
return h.checkInvariants()
}
return nil
}

// checkInvariants reads all rows in the system table for the given tenant and
// checks that the state is consistent.
func (h *sysTableHelper) checkInvariants(ctx context.Context) error {
func (h *sysTableHelper) checkInvariants() error {
// Read the two rows for the per-tenant state (instance_id = 0) and the
// per-instance state.
rows, err := h.ex.QueryBufferedEx(
@@ -432,7 +497,7 @@ func (h *sysTableHelper) checkInvariants(ctx context.Context) error {
)
if err != nil {
if h.ctx.Err() == nil {
log.Warningf(ctx, "checkInvariants query failed: %v", err)
log.Warningf(h.ctx, "checkInvariants query failed: %v", err)
}
// We don't want to cause a panic for a query error (which is expected
// during shutdown).