Skip to content

Commit

Permalink
kvserver: test refcounting of tenants through split and merge
Browse files Browse the repository at this point in the history
Release note: None
  • Loading branch information
tbg committed Nov 17, 2021
1 parent 7e70d0a commit c0a8a63
Show file tree
Hide file tree
Showing 2 changed files with 123 additions and 0 deletions.
10 changes: 10 additions & 0 deletions pkg/kv/kvserver/replica_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -794,6 +794,11 @@ func waitForApplication(
replicas []roachpb.ReplicaDescriptor,
leaseIndex uint64,
) error {
if dialer == nil && len(replicas) == 1 {
// This early return supports unit tests (testContext{}) that also
// want to perform merges.
return nil
}
return contextutil.RunWithTimeout(ctx, "wait for application", 5*time.Second, func(ctx context.Context) error {
g := ctxgroup.WithContext(ctx)
for _, repl := range replicas {
Expand Down Expand Up @@ -825,6 +830,11 @@ func waitForReplicasInit(
rangeID roachpb.RangeID,
replicas []roachpb.ReplicaDescriptor,
) error {
if dialer == nil && len(replicas) == 1 {
// This early return supports unit tests (testContext{}) that also
// want to perform merges.
return nil
}
return contextutil.RunWithTimeout(ctx, "wait for replicas init", 5*time.Second, func(ctx context.Context) error {
g := ctxgroup.WithContext(ctx)
for _, repl := range replicas {
Expand Down
113 changes: 113 additions & 0 deletions pkg/kv/kvserver/replica_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"sync/atomic"
"testing"
"time"
"unsafe"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/cli/exit"
Expand Down Expand Up @@ -13057,3 +13058,115 @@ func TestRangeInfoReturned(t *testing.T) {
})
}
}

func tenantsWithMetrics(m *StoreMetrics) map[roachpb.TenantID]struct{} {
metricsTenants := map[roachpb.TenantID]struct{}{}
m.tenants.Range(func(tenID int64, _ unsafe.Pointer) bool {
metricsTenants[roachpb.MakeTenantID(uint64(tenID))] = struct{}{}
return true // more
})
return metricsTenants
}

func isSystemTenantRepl(t *testing.T, repl *Replica) {
t.Log(repl)
// Even system tenant has a metrics ref.
require.NotNil(t, repl.tenantMetricsRef)
// System tenant has no rate limiter.
require.Nil(t, repl.tenantLimiter)
tenID, ok := repl.TenantID()
require.True(t, ok) // repl is initialized
require.Equal(t, roachpb.SystemTenantID, tenID)
// Even though the system tenant is not populated with a rate limiter and
// there is no refcounting for the system tenant, there is a system rate
// limiter (which would exist and be used for some requests even if the store
// had no replica for the system tenant).
require.NotNil(t, repl.store.tenantRateLimiters.GetTenant(context.Background(), tenID, nil /* closer */))
}

// TestStoreTenantMetricsAndRateLimiterRefcount verifies that the refcounting
// for replicas owned by tenants works: a tenant metrics or tenant rate limiter
// reference is only retained as long as a replica owned by that tenant exists
// on the store. This does not exhaustively test all of the ways in which a
// Replica can be destroyed (but uses a Merge). There are always-on assertions
// (see calls to tenantMetricsRef.assert) that fire on use-after-release, but
// if we leaked references on the non-merge code paths, this would not be
// obviously caught in testing.
func TestStoreTenantMetricsAndRateLimiterRefcount(t *testing.T) {
defer leaktest.AfterTest(t)()

stopper := stop.NewStopper()
defer stopper.Stop(context.Background())
tc := testContext{}
tc.Start(t, stopper)

// Initially all replicas are system tenant replicas.
tc.store.VisitReplicas(func(repl *Replica) (wantMore bool) {
isSystemTenantRepl(t, repl)
return true // wantMore
})

// The metrics only know the system tenant.
require.Equal(t,
map[roachpb.TenantID]struct{}{roachpb.SystemTenantID: {}},
tenantsWithMetrics(tc.store.metrics),
)

// A range for tenant 123 appears via a split.
ten123 := roachpb.MakeTenantID(123)
splitKey := keys.MustAddr(keys.MakeSQLCodec(ten123).TenantPrefix())
leftRepl := tc.store.LookupReplica(splitKey)
require.NotNil(t, leftRepl)
splitTestRange(tc.store, splitKey, t)
tenRepl := tc.store.LookupReplica(splitKey)
require.NotNil(t, tenRepl)
require.NotNil(t, tenRepl.tenantMetricsRef)
require.NotNil(t, tenRepl.tenantLimiter)

// The store metrics correspondingly track the system tenant and tenant 123
// and the rate limiter registry has an entry for it as well.
require.Equal(t,
map[roachpb.TenantID]struct{}{
roachpb.SystemTenantID: {},
ten123: {},
},
tenantsWithMetrics(tc.store.metrics),
)
tenLimiter := tenRepl.tenantLimiter
secondLimiter := tenRepl.store.tenantRateLimiters.GetTenant(context.Background(), ten123, nil /* closer */)
tenRepl.store.tenantRateLimiters.Release(secondLimiter)
require.Equal(t,
tenLimiter,
secondLimiter,
)

// The sole range owned by tenant 123 gets merged away again.
_, pErr := leftRepl.AdminMerge(context.Background(), roachpb.AdminMergeRequest{
RequestHeader: roachpb.RequestHeader{
Key: leftRepl.Desc().StartKey.AsRawKey(),
},
}, "testing")
require.Nil(t, pErr)

// The store metrics no longer track tenant 123.
require.Equal(t,
map[roachpb.TenantID]struct{}{
roachpb.SystemTenantID: {},
},
tenantsWithMetrics(tc.store.metrics),
)
// The rate limiter is similarly gone. We can't test this directly
// but we can check that the limiter we had has been released, which
// we can tell from a panic with an assertion failure if we release
// again.
func() {
defer func() {
r := recover()
err, ok := r.(error)
if !ok || !errors.HasAssertionFailure(err) {
t.Errorf("unxpected recover() after double-Release: %+v", r)
}
}()
tc.store.tenantRateLimiters.Release(tenLimiter)
}()
}

0 comments on commit c0a8a63

Please sign in to comment.