Skip to content

Commit

Permalink
kvserver: avoid use-after-release for tenant metrics
Browse files Browse the repository at this point in the history
Give tenants an explicit token that they hold on to in order to account
for their tenant metrics reference. This gives us a convenient place to
assert that references are not double-freed or used after free, which in
fact they were in multiple locations (all fixed in this commit).

Release note: None
  • Loading branch information
tbg committed Nov 17, 2021
1 parent 30e117d commit 84e3fdf
Show file tree
Hide file tree
Showing 12 changed files with 89 additions and 49 deletions.
56 changes: 40 additions & 16 deletions pkg/kv/kvserver/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ package kvserver

import (
"context"
"runtime/debug"
"sync/atomic"
"time"
"unsafe"

Expand Down Expand Up @@ -1439,6 +1441,18 @@ type StoreMetrics struct {
ClosedTimestampMaxBehindNanos *metric.Gauge
}

// tenantMetricsRef is the token handed out by
// TenantsStorageMetrics.acquireTenant and handed back to releaseTenant. It
// stands for a single reference on a tenant's metrics and lets the metrics
// code detect a reference that is released twice or used after release.
type tenantMetricsRef struct {
	// _tenantID is the tenant that was passed to acquireTenant when this
	// reference was created; it is used to look up the tenant's metrics.
	_tenantID roachpb.TenantID
	// _state is read and written only through sync/atomic. releaseTenant
	// swaps it to 1; assert fatals when it is non-zero.
	_state int32 // atomic; 0=usable 1=poisoned
	// stack is the debug.Stack() output captured by releaseTenant and is
	// printed by assert to show where the reference was finalized. It is
	// assigned after _state has been swapped, as a plain (non-atomic) write.
	stack []byte
}

// assert verifies that ref is still usable. If the reference has already been
// poisoned (its state is non-zero) it logs fatally, including the stack that
// was recorded on the reference, so the earlier finalization can be located.
func (ref *tenantMetricsRef) assert(ctx context.Context) {
	if state := atomic.LoadInt32(&ref._state); state == 0 {
		// Reference is live; nothing to report.
		return
	}
	log.Fatalf(ctx, "tenantMetricsRef already finalized in:\n%s", ref.stack)
}

// TenantsStorageMetrics are metrics which are aggregated over all tenants
// present on the server. The struct maintains child metrics used by each
// tenant to track their individual values. The struct expects that children
Expand Down Expand Up @@ -1473,7 +1487,7 @@ func (sm *TenantsStorageMetrics) MetricStruct() {}
// method are reference counted with decrements occurring in the corresponding
// releaseTenant call. This method must be called prior to adding or subtracting
// MVCC stats.
func (sm *TenantsStorageMetrics) acquireTenant(tenantID roachpb.TenantID) {
func (sm *TenantsStorageMetrics) acquireTenant(tenantID roachpb.TenantID) *tenantMetricsRef {
// incRef increments the reference count if it is not already zero indicating
// that the struct has already been destroyed.
incRef := func(m *tenantStorageMetrics) (alreadyDestroyed bool) {
Expand All @@ -1490,7 +1504,9 @@ func (sm *TenantsStorageMetrics) acquireTenant(tenantID roachpb.TenantID) {
if mPtr, ok := sm.tenants.Load(key); ok {
m := (*tenantStorageMetrics)(mPtr)
if alreadyDestroyed := incRef(m); !alreadyDestroyed {
return
return &tenantMetricsRef{
_tenantID: tenantID,
}
}
// Somebody else concurrently took the reference count to zero, go back
// around. Because of the locking in releaseTenant, we know that we'll
Expand Down Expand Up @@ -1522,21 +1538,27 @@ func (sm *TenantsStorageMetrics) acquireTenant(tenantID roachpb.TenantID) {
m.SysCount = sm.SysCount.AddChild(tenantIDStr)
m.AbortSpanBytes = sm.AbortSpanBytes.AddChild(tenantIDStr)
m.mu.Unlock()
return
return &tenantMetricsRef{
_tenantID: tenantID,
}
}
}
}

// releaseTenant releases the reference to the metrics for this tenant which was
// acquired with acquireTenant. It will fatally log if no entry exists for this
// tenant.
func (sm *TenantsStorageMetrics) releaseTenant(ctx context.Context, tenantID roachpb.TenantID) {
m := sm.getTenant(ctx, tenantID)
func (sm *TenantsStorageMetrics) releaseTenant(ctx context.Context, ref *tenantMetricsRef) {
m := sm.getTenant(ctx, ref) // NB: asserts against use-after-release
if atomic.SwapInt32(&ref._state, 1) != 0 {
log.Fatalf(ctx, "metrics ref released twice")
}
ref.stack = debug.Stack()
m.mu.Lock()
defer m.mu.Unlock()
m.mu.refCount--
if m.mu.refCount < 0 {
log.Fatalf(ctx, "invalid refCount on metrics for tenant %v: %d", tenantID, m.mu.refCount)
log.Fatalf(ctx, "invalid refCount on metrics for tenant %v: %d", ref._tenantID, m.mu.refCount)
} else if m.mu.refCount > 0 {
return
}
Expand All @@ -1558,18 +1580,19 @@ func (sm *TenantsStorageMetrics) releaseTenant(ctx context.Context, tenantID roa
m.SysBytes.Destroy()
m.SysCount.Destroy()
m.AbortSpanBytes.Destroy()
sm.tenants.Delete(int64(tenantID.ToUint64()))
sm.tenants.Delete(int64(ref._tenantID.ToUint64()))
}

// getTenant is a helper method used to retrieve the metrics for a tenant. The
// call will log fatally if no such tenant has been previously acquired.
func (sm *TenantsStorageMetrics) getTenant(
ctx context.Context, tenantID roachpb.TenantID,
ctx context.Context, ref *tenantMetricsRef,
) *tenantStorageMetrics {
key := int64(tenantID.ToUint64())
ref.assert(ctx)
key := int64(ref._tenantID.ToUint64())
mPtr, ok := sm.tenants.Load(key)
if !ok {
log.Fatalf(ctx, "no metrics exist for tenant %v", tenantID)
log.Fatalf(ctx, "no metrics exist for tenant %v", ref._tenantID)
}
return (*tenantStorageMetrics)(mPtr)
}
Expand Down Expand Up @@ -1843,9 +1866,10 @@ func newStoreMetrics(histogramWindow time.Duration) *StoreMetrics {
// single snapshot of these gauges in the registry might mix the values of two
// subsequent updates.
func (sm *TenantsStorageMetrics) incMVCCGauges(
ctx context.Context, tenantID roachpb.TenantID, delta enginepb.MVCCStats,
ctx context.Context, ref *tenantMetricsRef, delta enginepb.MVCCStats,
) {
tm := sm.getTenant(ctx, tenantID)
ref.assert(ctx)
tm := sm.getTenant(ctx, ref)
tm.LiveBytes.Inc(delta.LiveBytes)
tm.KeyBytes.Inc(delta.KeyBytes)
tm.ValBytes.Inc(delta.ValBytes)
Expand All @@ -1863,17 +1887,17 @@ func (sm *TenantsStorageMetrics) incMVCCGauges(
}

func (sm *TenantsStorageMetrics) addMVCCStats(
ctx context.Context, tenantID roachpb.TenantID, delta enginepb.MVCCStats,
ctx context.Context, ref *tenantMetricsRef, delta enginepb.MVCCStats,
) {
sm.incMVCCGauges(ctx, tenantID, delta)
sm.incMVCCGauges(ctx, ref, delta)
}

func (sm *TenantsStorageMetrics) subtractMVCCStats(
ctx context.Context, tenantID roachpb.TenantID, delta enginepb.MVCCStats,
ctx context.Context, ref *tenantMetricsRef, delta enginepb.MVCCStats,
) {
var neg enginepb.MVCCStats
neg.Subtract(delta)
sm.incMVCCGauges(ctx, tenantID, neg)
sm.incMVCCGauges(ctx, ref, neg)
}

func (sm *StoreMetrics) updateEngineMetrics(m storage.Metrics) {
Expand Down
6 changes: 3 additions & 3 deletions pkg/kv/kvserver/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,13 +54,13 @@ func TestTenantsStorageMetricsConcurrency(t *testing.T) {
tid := tenantIDs[rand.Intn(tenants)]

time.Sleep(randDuration())
sm.acquireTenant(tid)
ref := sm.acquireTenant(tid)

time.Sleep(randDuration())
sm.incMVCCGauges(ctx, tid, enginepb.MVCCStats{})
sm.incMVCCGauges(ctx, ref, enginepb.MVCCStats{})

time.Sleep(randDuration())
sm.releaseTenant(ctx, tid)
sm.releaseTenant(ctx, ref)
}
}
var wg sync.WaitGroup
Expand Down
11 changes: 10 additions & 1 deletion pkg/kv/kvserver/replica.go
Original file line number Diff line number Diff line change
Expand Up @@ -267,9 +267,18 @@ type Replica struct {
concMgr concurrency.Manager

// tenantLimiter rate limits requests on a per-tenant basis and accumulates
// metrics about it.
// metrics about it. This is determined by the start key of the Replica,
// once initialized.
tenantLimiter tenantrate.Limiter

// tenantMetricsRef is a metrics reference indicating the tenant under
// which to track the range's contributions. This is determined by the
// start key of the Replica, once initialized.
// Its purpose is to help track down missing/extraneous release operations
// that would not be apparent or easy to resolve when refcounting at the store
// level only.
tenantMetricsRef *tenantMetricsRef

// sideTransportClosedTimestamp encapsulates state related to the closed
// timestamp's information about the range. Note that the
// sideTransportClosedTimestamp does not incorporate the closed timestamp
Expand Down
13 changes: 7 additions & 6 deletions pkg/kv/kvserver/replica_application_result.go
Original file line number Diff line number Diff line change
Expand Up @@ -349,19 +349,20 @@ func (r *Replica) handleChangeReplicasResult(
log.Infof(ctx, "removing replica due to ChangeReplicasTrigger: %v", chng)
}

if err := r.store.removeInitializedReplicaRaftMuLocked(ctx, r, chng.NextReplicaID(), RemoveOptions{
// We destroyed the data when the batch committed so don't destroy it again.
DestroyData: false,
}); err != nil {
log.Fatalf(ctx, "failed to remove replica: %v", err)
}

// NB: postDestroyRaftMuLocked requires that the batch which removed the data
// be durably synced to disk, which we have.
// See replicaAppBatch.ApplyToStateMachine().
if err := r.postDestroyRaftMuLocked(ctx, r.GetMVCCStats()); err != nil {
log.Fatalf(ctx, "failed to run Replica postDestroy: %v", err)
}

if err := r.store.removeInitializedReplicaRaftMuLocked(ctx, r, chng.NextReplicaID(), RemoveOptions{
// We destroyed the data when the batch committed so don't destroy it again.
DestroyData: false,
}); err != nil {
log.Fatalf(ctx, "failed to remove replica: %v", err)
}
return true
}

Expand Down
3 changes: 1 addition & 2 deletions pkg/kv/kvserver/replica_application_state_machine.go
Original file line number Diff line number Diff line change
Expand Up @@ -886,7 +886,6 @@ func (b *replicaAppBatch) ApplyToStateMachine(ctx context.Context) error {
needsSplitBySize := r.needsSplitBySizeRLocked()
needsMergeBySize := r.needsMergeBySizeRLocked()
needsTruncationByLogSize := r.needsRaftLogTruncationLocked()
tenantID := r.mu.tenantID
r.mu.Unlock()
if closedTimestampUpdated {
r.handleClosedTimestampUpdateRaftMuLocked(ctx, b.state.RaftClosedTimestamp)
Expand All @@ -895,7 +894,7 @@ func (b *replicaAppBatch) ApplyToStateMachine(ctx context.Context) error {
// Record the stats delta in the StoreMetrics.
deltaStats := *b.state.Stats
deltaStats.Subtract(prevStats)
r.store.metrics.addMVCCStats(ctx, tenantID, deltaStats)
r.store.metrics.addMVCCStats(ctx, r.tenantMetricsRef, deltaStats)

// Record the write activity, passing a 0 nodeID because replica.writeStats
// intentionally doesn't track the origin of the writes.
Expand Down
9 changes: 7 additions & 2 deletions pkg/kv/kvserver/replica_destroy.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,11 @@ func (r *Replica) postDestroyRaftMuLocked(ctx context.Context, ms enginepb.MVCCS
// directories belonging to replicas which aren't present. A crash before a
// call to postDestroyRaftMuLocked will currently leave the files around
// forever.
//
// TODO(tbg): coming back in 2021, the above should be outdated. The ReplicaID
// is set on creation and never changes over the lifetime of a Replica. Also,
// the replica is always contained in its descriptor. So this code below should
// be removable.
if r.raftMu.sideloaded != nil {
if err := r.raftMu.sideloaded.Clear(ctx); err != nil {
return err
Expand All @@ -120,8 +125,8 @@ func (r *Replica) postDestroyRaftMuLocked(ctx context.Context, ms enginepb.MVCCS

// Release the reference to this tenant in metrics, we know the tenant ID is
// valid if the replica is initialized.
if tenantID, ok := r.TenantID(); ok {
r.store.metrics.releaseTenant(ctx, tenantID)
if r.tenantMetricsRef != nil {
r.store.metrics.releaseTenant(ctx, r.tenantMetricsRef)
}

// Unhook the tenant rate limiter if we have one.
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/replica_init.go
Original file line number Diff line number Diff line change
Expand Up @@ -339,7 +339,7 @@ func (r *Replica) setDescLockedRaftMuLocked(ctx context.Context, desc *roachpb.R
"replica %v: %v", r, err)
}
r.mu.tenantID = tenantID
r.store.metrics.acquireTenant(tenantID)
r.tenantMetricsRef = r.store.metrics.acquireTenant(tenantID)
if tenantID != roachpb.SystemTenantID {
r.tenantLimiter = r.store.tenantRateLimiters.GetTenant(ctx, tenantID, r.store.stopper.ShouldQuiesce())
}
Expand Down
14 changes: 7 additions & 7 deletions pkg/kv/kvserver/replica_raftstorage.go
Original file line number Diff line number Diff line change
Expand Up @@ -949,8 +949,8 @@ func (r *Replica) applySnapshot(
r.mu.lastTerm = invalidLastTerm
r.mu.raftLogSize = 0
// Update the store stats for the data in the snapshot.
r.store.metrics.subtractMVCCStats(ctx, r.mu.tenantID, *r.mu.state.Stats)
r.store.metrics.addMVCCStats(ctx, r.mu.tenantID, *state.Stats)
r.store.metrics.subtractMVCCStats(ctx, r.tenantMetricsRef, *r.mu.state.Stats)
r.store.metrics.addMVCCStats(ctx, r.tenantMetricsRef, *state.Stats)
lastKnownLease := r.mu.state.Lease
// Update the rest of the Raft state. Changes to r.mu.state.Desc must be
// managed by r.setDescRaftMuLocked and changes to r.mu.state.Lease must be handled
Expand Down Expand Up @@ -1135,11 +1135,6 @@ func (r *Replica) clearSubsumedReplicaInMemoryData(
ctx context.Context, subsumedRepls []*Replica, subsumedNextReplicaID roachpb.ReplicaID,
) error {
for _, sr := range subsumedRepls {
// We removed sr's data when we committed the batch. Finish subsumption by
// updating the in-memory bookkeeping.
if err := sr.postDestroyRaftMuLocked(ctx, sr.GetMVCCStats()); err != nil {
return err
}
// We already hold sr's raftMu, so we must call removeReplicaImpl directly.
// Note that it's safe to update the store's metadata for sr's removal
// separately from updating the store's metadata for r's new descriptor
Expand All @@ -1153,6 +1148,11 @@ func (r *Replica) clearSubsumedReplicaInMemoryData(
}); err != nil {
return err
}
// We removed sr's data when we committed the batch. Finish subsumption by
// updating the in-memory bookkeeping.
if err := sr.postDestroyRaftMuLocked(ctx, sr.GetMVCCStats()); err != nil {
return err
}
}
return nil
}
Expand Down
6 changes: 4 additions & 2 deletions pkg/kv/kvserver/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -1562,8 +1562,10 @@ func (s *Store) Start(ctx context.Context, stopper *stop.Stopper) error {

// Add this range and its stats to our counter.
s.metrics.ReplicaCount.Inc(1)
if tenantID, ok := rep.TenantID(); ok {
s.metrics.addMVCCStats(ctx, tenantID, rep.GetMVCCStats())
if _, ok := rep.TenantID(); ok {
// TODO(tbg): why the check? We're definitely an initialized range so
// we have a tenantID.
s.metrics.addMVCCStats(ctx, rep.tenantMetricsRef, rep.GetMVCCStats())
} else {
return errors.AssertionFailedf("found newly constructed replica"+
" for range %d at generation %d with an invalid tenant ID in store %d",
Expand Down
8 changes: 4 additions & 4 deletions pkg/kv/kvserver/store_merge.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,6 @@ func (s *Store) MergeRange(
leftRepl.raftMu.AssertHeld()
rightRepl.raftMu.AssertHeld()

if err := rightRepl.postDestroyRaftMuLocked(ctx, rightRepl.GetMVCCStats()); err != nil {
return err
}

// Note that we were called (indirectly) from raft processing so we must
// call removeInitializedReplicaRaftMuLocked directly to avoid deadlocking
// on the right-hand replica's raftMu.
Expand All @@ -61,6 +57,10 @@ func (s *Store) MergeRange(
return errors.Errorf("cannot remove range: %s", err)
}

if err := rightRepl.postDestroyRaftMuLocked(ctx, rightRepl.GetMVCCStats()); err != nil {
return err
}

if leftRepl.leaseholderStats != nil {
leftRepl.leaseholderStats.resetRequestCounts()
}
Expand Down
4 changes: 1 addition & 3 deletions pkg/kv/kvserver/store_remove_replica.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,6 @@ func (s *Store) removeInitializedReplicaRaftMuLocked(
// destroy status.
var desc *roachpb.RangeDescriptor
var replicaID roachpb.ReplicaID
var tenantID roachpb.TenantID
{
rep.readOnlyCmdMu.Lock()
rep.mu.Lock()
Expand Down Expand Up @@ -112,7 +111,6 @@ func (s *Store) removeInitializedReplicaRaftMuLocked(
rep.mu.destroyStatus.Set(roachpb.NewRangeNotFoundError(rep.RangeID, rep.StoreID()),
destroyReasonRemoved)
replicaID = rep.mu.replicaID
tenantID = rep.mu.tenantID
rep.mu.Unlock()
rep.readOnlyCmdMu.Unlock()
}
Expand Down Expand Up @@ -144,7 +142,7 @@ func (s *Store) removeInitializedReplicaRaftMuLocked(
// Destroy, but this configuration helps avoid races in stat verification
// tests.

s.metrics.subtractMVCCStats(ctx, tenantID, rep.GetMVCCStats())
s.metrics.subtractMVCCStats(ctx, rep.tenantMetricsRef, rep.GetMVCCStats())
s.metrics.ReplicaCount.Dec(1)
s.mu.Unlock()

Expand Down
6 changes: 4 additions & 2 deletions pkg/kv/kvserver/store_split.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,8 +153,10 @@ func splitPostApply(

// Update store stats with difference in stats before and after split.
if rightReplOrNil != nil {
if tenantID, ok := rightReplOrNil.TenantID(); ok {
rightReplOrNil.store.metrics.addMVCCStats(ctx, tenantID, deltaMS)
if _, ok := rightReplOrNil.TenantID(); ok {
// TODO(tbg): why this check to get here? Is this really checking if the RHS
// is already initialized? But isn't it always, at this point?
rightReplOrNil.store.metrics.addMVCCStats(ctx, rightReplOrNil.tenantMetricsRef, deltaMS)
} else {
log.Fatalf(ctx, "%s: found replica which is RHS of a split "+
"without a valid tenant ID", rightReplOrNil)
Expand Down

0 comments on commit 84e3fdf

Please sign in to comment.