kvserver: fix bugs in & fortify tenant refcounting #72471

Merged 5 commits on Nov 18, 2021.
77 changes: 60 additions & 17 deletions pkg/kv/kvserver/metrics.go
@@ -12,6 +12,8 @@ package kvserver

import (
"context"
"runtime/debug"
"sync/atomic"
"time"
"unsafe"

@@ -1439,6 +1441,29 @@ type StoreMetrics struct {
ClosedTimestampMaxBehindNanos *metric.Gauge
}

type tenantMetricsRef struct {
// All fields are internal. Don't access them.

_tenantID roachpb.TenantID
_state int32 // atomic; 0=usable 1=poisoned

// _stack helps diagnose use-after-release when it occurs.
// This field is populated in releaseTenant and printed
// in assertions on failure.
_stack struct {
syncutil.Mutex
string
}
}

func (ref *tenantMetricsRef) assert(ctx context.Context) {
if atomic.LoadInt32(&ref._state) != 0 {
ref._stack.Lock()
defer ref._stack.Unlock()
log.FatalfDepth(ctx, 1, "tenantMetricsRef already finalized in:\n%s", ref._stack.string)
}
}
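
To make the poisoning scheme concrete, here is a minimal, self-contained sketch of the same pattern outside this diff. The names are illustrative only, and the PR uses `log.FatalfDepth` and `syncutil.Mutex` where this sketch uses `panic` and `sync.Mutex`:

```go
package main

import (
	"fmt"
	"runtime/debug"
	"sync"
	"sync/atomic"
)

type ref struct {
	state int32 // atomic; 0=usable, 1=released
	mu    sync.Mutex
	stack string // stack of the goroutine that released the ref
}

func (r *ref) use() {
	if atomic.LoadInt32(&r.state) != 0 {
		r.mu.Lock()
		defer r.mu.Unlock()
		// The PR fatals via the log package; panic keeps the sketch standalone.
		panic(fmt.Sprintf("ref already released in:\n%s", r.stack))
	}
}

func (r *ref) release() {
	if atomic.SwapInt32(&r.state, 1) != 0 {
		r.use() // double release: report using the first release's stack
	}
	r.mu.Lock()
	r.stack = string(debug.Stack())
	r.mu.Unlock()
}

func main() {
	r := &ref{}
	r.use()     // fine: the ref is live
	r.release() // poisons the ref and records where it happened
	r.use()     // panics and prints the release stack
}
```

The `SwapInt32` guarantees exactly one caller wins the transition to the poisoned state, so a double release is reported with the stack of the first release rather than racing on the string.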

// TenantsStorageMetrics are metrics which are aggregated over all tenants
// present on the server. The struct maintains child metrics used by each
// tenant to track their individual values. The struct expects that children
@@ -1461,7 +1486,12 @@ type TenantsStorageMetrics struct {
AbortSpanBytes *aggmetric.AggGauge

// This struct is invisible to the metric package.
tenants syncutil.IntMap // map[roachpb.TenantID]*tenantStorageMetrics
//
// NB: the int64 conversion used for this map's keys is lossless, so
// everything works for tenant IDs in excess of math.MaxInt64; the one
// caveat is that anyone inspecting this map through a debugger has to
// undo the int64->uint64 conversion manually.
tenants syncutil.IntMap // map[int64(roachpb.TenantID)]*tenantStorageMetrics
}
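
The losslessness claim in the comment above is easy to check. A tiny standalone sketch (not part of the PR) showing that a tenant ID above `math.MaxInt64` survives the round trip through an int64 key:

```go
package main

import (
	"fmt"
	"math"
)

func main() {
	id := uint64(math.MaxInt64) + 42 // a tenant ID too large for int64 arithmetic
	key := int64(id)                 // the conversion reinterprets bits; nothing is lost
	fmt.Println(key)                 // -9223372036854775767 (the debugger's view)
	fmt.Println(uint64(key) == id)   // true: the original ID round-trips
}
```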

var _ metric.Struct = (*TenantsStorageMetrics)(nil)
@@ -1473,7 +1503,7 @@ func (sm *TenantsStorageMetrics) MetricStruct() {}
// method are reference counted with decrements occurring in the corresponding
// releaseTenant call. This method must be called prior to adding or subtracting
// MVCC stats.
func (sm *TenantsStorageMetrics) acquireTenant(tenantID roachpb.TenantID) {
func (sm *TenantsStorageMetrics) acquireTenant(tenantID roachpb.TenantID) *tenantMetricsRef {
// incRef increments the reference count unless it is already zero, a zero
// count indicating that the struct has already been destroyed.
incRef := func(m *tenantStorageMetrics) (alreadyDestroyed bool) {
@@ -1490,7 +1520,9 @@ func (sm *TenantsStorageMetrics) acquireTenant(tenantID roachpb.TenantID) {
if mPtr, ok := sm.tenants.Load(key); ok {
m := (*tenantStorageMetrics)(mPtr)
if alreadyDestroyed := incRef(m); !alreadyDestroyed {
return
return &tenantMetricsRef{
_tenantID: tenantID,
}
}
// Somebody else concurrently took the reference count to zero, go back
// around. Because of the locking in releaseTenant, we know that we'll
@@ -1522,21 +1554,30 @@
m.SysCount = sm.SysCount.AddChild(tenantIDStr)
m.AbortSpanBytes = sm.AbortSpanBytes.AddChild(tenantIDStr)
m.mu.Unlock()
return
return &tenantMetricsRef{
_tenantID: tenantID,
}
}
}
}
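
For readers skimming the diff, the shape of the acquire loop is: look up the child, try to take a reference, and if a concurrent release drove the count to zero, retry and create a fresh entry. A simplified sketch under assumed types — `sync.Map` standing in for `syncutil.IntMap`, a bare struct for the metrics child:

```go
package main

import "sync"

// entry is a stand-in for tenantStorageMetrics: a refcounted, lazily
// created per-tenant value. refCount == 0 means a concurrent release
// is (or was) destroying the entry.
type entry struct {
	mu       sync.Mutex
	refCount int
}

type registry struct {
	tenants sync.Map // map[int64]*entry, like sm.tenants
}

func (r *registry) acquire(key int64) *entry {
	for {
		if v, ok := r.tenants.Load(key); ok {
			e := v.(*entry)
			e.mu.Lock()
			alive := e.refCount > 0
			if alive {
				e.refCount++
			}
			e.mu.Unlock()
			if alive {
				return e
			}
			// A concurrent release drove the count to zero and will delete
			// the map entry; go back around and create a fresh one.
			continue
		}
		e := &entry{refCount: 1}
		if _, loaded := r.tenants.LoadOrStore(key, e); !loaded {
			return e
		}
		// Lost a creation race; loop and take a reference on the winner.
	}
}

func (r *registry) release(key int64, e *entry) {
	e.mu.Lock()
	defer e.mu.Unlock()
	if e.refCount--; e.refCount == 0 {
		r.tenants.Delete(key)
	}
}

func main() {
	r := &registry{}
	e := r.acquire(1) // creates the entry
	_ = r.acquire(1)  // takes a second reference
	r.release(1, e)   // count 2 -> 1: entry survives
	r.release(1, e)   // count 1 -> 0: entry is deleted from the map
}
```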

// releaseTenant releases the reference to the metrics for this tenant which was
// acquired with acquireTenant. It will fatally log if no entry exists for this
// tenant.
func (sm *TenantsStorageMetrics) releaseTenant(ctx context.Context, tenantID roachpb.TenantID) {
m := sm.getTenant(ctx, tenantID)
func (sm *TenantsStorageMetrics) releaseTenant(ctx context.Context, ref *tenantMetricsRef) {
m := sm.getTenant(ctx, ref) // NB: asserts against use-after-release
if atomic.SwapInt32(&ref._state, 1) != 0 {
ref.assert(ctx) // this will fatal
return // unreachable
}
ref._stack.Lock()
ref._stack.string = string(debug.Stack())
ref._stack.Unlock()
m.mu.Lock()
defer m.mu.Unlock()
m.mu.refCount--
if m.mu.refCount < 0 {
log.Fatalf(ctx, "invalid refCount on metrics for tenant %v: %d", tenantID, m.mu.refCount)
log.Fatalf(ctx, "invalid refCount on metrics for tenant %v: %d", ref._tenantID, m.mu.refCount)
} else if m.mu.refCount > 0 {
return
}
@@ -1558,18 +1599,19 @@ func (sm *TenantsStorageMetrics) releaseTenant(ctx context.Context, tenantID roa
m.SysBytes.Destroy()
m.SysCount.Destroy()
m.AbortSpanBytes.Destroy()
sm.tenants.Delete(int64(tenantID.ToUint64()))
sm.tenants.Delete(int64(ref._tenantID.ToUint64()))
}

// getTenant is a helper method used to retrieve the metrics for a tenant. The
// call will log fatally if no such tenant has been previously acquired.
func (sm *TenantsStorageMetrics) getTenant(
ctx context.Context, tenantID roachpb.TenantID,
ctx context.Context, ref *tenantMetricsRef,
) *tenantStorageMetrics {
key := int64(tenantID.ToUint64())
ref.assert(ctx)
key := int64(ref._tenantID.ToUint64())
mPtr, ok := sm.tenants.Load(key)
if !ok {
log.Fatalf(ctx, "no metrics exist for tenant %v", tenantID)
log.Fatalf(ctx, "no metrics exist for tenant %v", ref._tenantID)
}
return (*tenantStorageMetrics)(mPtr)
}
@@ -1843,9 +1885,10 @@ func newStoreMetrics(histogramWindow time.Duration) *StoreMetrics {
// single snapshot of these gauges in the registry might mix the values of two
// subsequent updates.
func (sm *TenantsStorageMetrics) incMVCCGauges(
ctx context.Context, tenantID roachpb.TenantID, delta enginepb.MVCCStats,
ctx context.Context, ref *tenantMetricsRef, delta enginepb.MVCCStats,
) {
tm := sm.getTenant(ctx, tenantID)
ref.assert(ctx)
tm := sm.getTenant(ctx, ref)
tm.LiveBytes.Inc(delta.LiveBytes)
tm.KeyBytes.Inc(delta.KeyBytes)
tm.ValBytes.Inc(delta.ValBytes)
@@ -1863,17 +1906,17 @@ func (sm *TenantsStorageMetrics) incMVCCGauges(
}

func (sm *TenantsStorageMetrics) addMVCCStats(
ctx context.Context, tenantID roachpb.TenantID, delta enginepb.MVCCStats,
ctx context.Context, ref *tenantMetricsRef, delta enginepb.MVCCStats,
) {
sm.incMVCCGauges(ctx, tenantID, delta)
sm.incMVCCGauges(ctx, ref, delta)
}

func (sm *TenantsStorageMetrics) subtractMVCCStats(
ctx context.Context, tenantID roachpb.TenantID, delta enginepb.MVCCStats,
ctx context.Context, ref *tenantMetricsRef, delta enginepb.MVCCStats,
) {
var neg enginepb.MVCCStats
neg.Subtract(delta)
sm.incMVCCGauges(ctx, tenantID, neg)
sm.incMVCCGauges(ctx, ref, neg)
}
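
subtractMVCCStats funnels through the same gauge-update path by adding the negation of the delta, so a single code path touches the gauges. A minimal sketch of the negate-and-add trick, assuming a stats type with a `Subtract` method shaped like `enginepb.MVCCStats.Subtract`:

```go
package main

import "fmt"

// mvccStats stands in for enginepb.MVCCStats (two fields shown).
type mvccStats struct{ LiveBytes, KeyBytes int64 }

func (s *mvccStats) Subtract(o mvccStats) {
	s.LiveBytes -= o.LiveBytes
	s.KeyBytes -= o.KeyBytes
}

func main() {
	delta := mvccStats{LiveBytes: 100, KeyBytes: 20}
	var neg mvccStats   // zero value
	neg.Subtract(delta) // 0 - delta == -delta
	fmt.Printf("%+v\n", neg) // {LiveBytes:-100 KeyBytes:-20}
}
```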

func (sm *StoreMetrics) updateEngineMetrics(m storage.Metrics) {
6 changes: 3 additions & 3 deletions pkg/kv/kvserver/metrics_test.go
@@ -54,13 +54,13 @@ func TestTenantsStorageMetricsConcurrency(t *testing.T) {
tid := tenantIDs[rand.Intn(tenants)]

time.Sleep(randDuration())
sm.acquireTenant(tid)
ref := sm.acquireTenant(tid)

time.Sleep(randDuration())
sm.incMVCCGauges(ctx, tid, enginepb.MVCCStats{})
sm.incMVCCGauges(ctx, ref, enginepb.MVCCStats{})

time.Sleep(randDuration())
sm.releaseTenant(ctx, tid)
sm.releaseTenant(ctx, ref)
}
}
var wg sync.WaitGroup
11 changes: 10 additions & 1 deletion pkg/kv/kvserver/replica.go
@@ -267,9 +267,18 @@ type Replica struct {
concMgr concurrency.Manager

// tenantLimiter rate limits requests on a per-tenant basis and accumulates
// metrics about it.
// metrics about it. This is determined by the start key of the Replica,
// once initialized.
tenantLimiter tenantrate.Limiter

// tenantMetricsRef is a metrics reference indicating the tenant under
// which to track the range's contributions. This is determined by the
// start key of the Replica, once initialized.
// Its purpose is to help track down missing/extraneous release operations
// that would not be apparent or easy to resolve when refcounting at the store
// level only.
tenantMetricsRef *tenantMetricsRef

// sideTransportClosedTimestamp encapsulates state related to the closed
// timestamp's information about the range. Note that the
// sideTransportClosedTimestamp does not incorporate the closed timestamp
13 changes: 7 additions & 6 deletions pkg/kv/kvserver/replica_application_result.go
@@ -349,19 +349,20 @@ func (r *Replica) handleChangeReplicasResult(
log.Infof(ctx, "removing replica due to ChangeReplicasTrigger: %v", chng)
}

if err := r.store.removeInitializedReplicaRaftMuLocked(ctx, r, chng.NextReplicaID(), RemoveOptions{
// We destroyed the data when the batch committed so don't destroy it again.
DestroyData: false,
}); err != nil {
log.Fatalf(ctx, "failed to remove replica: %v", err)
}

// NB: postDestroyRaftMuLocked requires that the batch which removed the data
// be durably synced to disk, which we have.
// See replicaAppBatch.ApplyToStateMachine().
if err := r.postDestroyRaftMuLocked(ctx, r.GetMVCCStats()); err != nil {
log.Fatalf(ctx, "failed to run Replica postDestroy: %v", err)
}

if err := r.store.removeInitializedReplicaRaftMuLocked(ctx, r, chng.NextReplicaID(), RemoveOptions{
// We destroyed the data when the batch committed so don't destroy it again.
DestroyData: false,
}); err != nil {
log.Fatalf(ctx, "failed to remove replica: %v", err)
}
return true
}

3 changes: 1 addition & 2 deletions pkg/kv/kvserver/replica_application_state_machine.go
@@ -886,7 +886,6 @@ func (b *replicaAppBatch) ApplyToStateMachine(ctx context.Context) error {
needsSplitBySize := r.needsSplitBySizeRLocked()
needsMergeBySize := r.needsMergeBySizeRLocked()
needsTruncationByLogSize := r.needsRaftLogTruncationLocked()
tenantID := r.mu.tenantID
r.mu.Unlock()
if closedTimestampUpdated {
r.handleClosedTimestampUpdateRaftMuLocked(ctx, b.state.RaftClosedTimestamp)
@@ -895,7 +894,7 @@ func (b *replicaAppBatch) ApplyToStateMachine(ctx context.Context) error {
// Record the stats delta in the StoreMetrics.
deltaStats := *b.state.Stats
deltaStats.Subtract(prevStats)
r.store.metrics.addMVCCStats(ctx, tenantID, deltaStats)
r.store.metrics.addMVCCStats(ctx, r.tenantMetricsRef, deltaStats)

// Record the write activity, passing a 0 nodeID because replica.writeStats
// intentionally doesn't track the origin of the writes.
10 changes: 10 additions & 0 deletions pkg/kv/kvserver/replica_command.go
@@ -794,6 +794,11 @@ func waitForApplication(
replicas []roachpb.ReplicaDescriptor,
leaseIndex uint64,
) error {
if dialer == nil && len(replicas) == 1 {
// This early return supports unit tests (testContext{}) that also
// want to perform merges.
return nil
}
return contextutil.RunWithTimeout(ctx, "wait for application", 5*time.Second, func(ctx context.Context) error {
g := ctxgroup.WithContext(ctx)
for _, repl := range replicas {
@@ -825,6 +830,11 @@ func waitForReplicasInit(
rangeID roachpb.RangeID,
replicas []roachpb.ReplicaDescriptor,
) error {
if dialer == nil && len(replicas) == 1 {
// This early return supports unit tests (testContext{}) that also
// want to perform merges.
return nil
}
return contextutil.RunWithTimeout(ctx, "wait for replicas init", 5*time.Second, func(ctx context.Context) error {
g := ctxgroup.WithContext(ctx)
for _, repl := range replicas {
13 changes: 10 additions & 3 deletions pkg/kv/kvserver/replica_destroy.go
@@ -112,14 +112,21 @@ func (r *Replica) postDestroyRaftMuLocked(ctx context.Context, ms enginepb.MVCCS
// directories belonging to replicas which aren't present. A crash before a
// call to postDestroyRaftMuLocked will currently leave the files around
// forever.
//
// TODO(tbg): coming back in 2021, the above should be outdated. The ReplicaID
// is set on creation and never changes over the lifetime of a Replica. Also,
// the replica is always contained in its descriptor. So this code below should
// be removable.
if r.raftMu.sideloaded != nil {
return r.raftMu.sideloaded.Clear(ctx)
if err := r.raftMu.sideloaded.Clear(ctx); err != nil {
return err
}
}

// Release the reference to this tenant in metrics; we know the tenant ID
// is valid if the replica is initialized.
if tenantID, ok := r.TenantID(); ok {
r.store.metrics.releaseTenant(ctx, tenantID)
if r.tenantMetricsRef != nil {
r.store.metrics.releaseTenant(ctx, r.tenantMetricsRef)
}

// Unhook the tenant rate limiter if we have one.
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/replica_init.go
@@ -339,7 +339,7 @@ func (r *Replica) setDescLockedRaftMuLocked(ctx context.Context, desc *roachpb.R
"replica %v: %v", r, err)
}
r.mu.tenantID = tenantID
r.store.metrics.acquireTenant(tenantID)
r.tenantMetricsRef = r.store.metrics.acquireTenant(tenantID)
if tenantID != roachpb.SystemTenantID {
r.tenantLimiter = r.store.tenantRateLimiters.GetTenant(ctx, tenantID, r.store.stopper.ShouldQuiesce())
}
14 changes: 7 additions & 7 deletions pkg/kv/kvserver/replica_raftstorage.go
@@ -949,8 +949,8 @@ func (r *Replica) applySnapshot(
r.mu.lastTerm = invalidLastTerm
r.mu.raftLogSize = 0
// Update the store stats for the data in the snapshot.
r.store.metrics.subtractMVCCStats(ctx, r.mu.tenantID, *r.mu.state.Stats)
r.store.metrics.addMVCCStats(ctx, r.mu.tenantID, *state.Stats)
r.store.metrics.subtractMVCCStats(ctx, r.tenantMetricsRef, *r.mu.state.Stats)
r.store.metrics.addMVCCStats(ctx, r.tenantMetricsRef, *state.Stats)
lastKnownLease := r.mu.state.Lease
// Update the rest of the Raft state. Changes to r.mu.state.Desc must be
// managed by r.setDescRaftMuLocked and changes to r.mu.state.Lease must be handled
@@ -1135,11 +1135,6 @@ func (r *Replica) clearSubsumedReplicaInMemoryData(
ctx context.Context, subsumedRepls []*Replica, subsumedNextReplicaID roachpb.ReplicaID,
) error {
for _, sr := range subsumedRepls {
// We removed sr's data when we committed the batch. Finish subsumption by
// updating the in-memory bookkeeping.
if err := sr.postDestroyRaftMuLocked(ctx, sr.GetMVCCStats()); err != nil {
return err
}
// We already hold sr's raftMu, so we must call removeReplicaImpl directly.
// Note that it's safe to update the store's metadata for sr's removal
// separately from updating the store's metadata for r's new descriptor
Expand All @@ -1153,6 +1148,11 @@ func (r *Replica) clearSubsumedReplicaInMemoryData(
}); err != nil {
return err
}
// We removed sr's data when we committed the batch. Finish subsumption by
// updating the in-memory bookkeeping.
if err := sr.postDestroyRaftMuLocked(ctx, sr.GetMVCCStats()); err != nil {
return err
}
}
return nil
}