diff --git a/pkg/kv/kvserver/metrics.go b/pkg/kv/kvserver/metrics.go index c0c19df7c772..8aafc4d2c478 100644 --- a/pkg/kv/kvserver/metrics.go +++ b/pkg/kv/kvserver/metrics.go @@ -12,6 +12,8 @@ package kvserver import ( "context" + "runtime/debug" + "sync/atomic" "time" "unsafe" @@ -1439,6 +1441,18 @@ type StoreMetrics struct { ClosedTimestampMaxBehindNanos *metric.Gauge } +type tenantMetricsRef struct { + _tenantID roachpb.TenantID + _state int32 // atomic; 0=usable 1=poisoned + stack []byte +} + +func (ref *tenantMetricsRef) assert(ctx context.Context) { + if atomic.LoadInt32(&ref._state) != 0 { + log.Fatalf(ctx, "tenantMetricsRef already finalized in:\n%s", ref.stack) + } +} + // TenantsStorageMetrics are metrics which are aggregated over all tenants // present on the server. The struct maintains child metrics used by each // tenant to track their individual values. The struct expects that children @@ -1473,7 +1487,7 @@ func (sm *TenantsStorageMetrics) MetricStruct() {} // method are reference counted with decrements occurring in the corresponding // releaseTenant call. This method must be called prior to adding or subtracting // MVCC stats. -func (sm *TenantsStorageMetrics) acquireTenant(tenantID roachpb.TenantID) { +func (sm *TenantsStorageMetrics) acquireTenant(tenantID roachpb.TenantID) *tenantMetricsRef { // incRef increments the reference count if it is not already zero indicating // that the struct has already been destroyed. incRef := func(m *tenantStorageMetrics) (alreadyDestroyed bool) { @@ -1490,7 +1504,9 @@ func (sm *TenantsStorageMetrics) acquireTenant(tenantID roachpb.TenantID) { if mPtr, ok := sm.tenants.Load(key); ok { m := (*tenantStorageMetrics)(mPtr) if alreadyDestroyed := incRef(m); !alreadyDestroyed { - return + return &tenantMetricsRef{ + _tenantID: tenantID, + } } // Somebody else concurrently took the reference count to zero, go back // around. Because of the locking in releaseTenant, we know that we'll @@ -1522,7 +1538,9 @@ func (sm *TenantsStorageMetrics) acquireTenant(tenantID roachpb.TenantID) { m.SysCount = sm.SysCount.AddChild(tenantIDStr) m.AbortSpanBytes = sm.AbortSpanBytes.AddChild(tenantIDStr) m.mu.Unlock() - return + return &tenantMetricsRef{ + _tenantID: tenantID, + } } } } @@ -1530,13 +1548,17 @@ func (sm *TenantsStorageMetrics) acquireTenant(tenantID roachpb.TenantID) { // releaseTenant releases the reference to the metrics for this tenant which was // acquired with acquireTenant. It will fatally log if no entry exists for this // tenant. -func (sm *TenantsStorageMetrics) releaseTenant(ctx context.Context, tenantID roachpb.TenantID) { - m := sm.getTenant(ctx, tenantID) +func (sm *TenantsStorageMetrics) releaseTenant(ctx context.Context, ref *tenantMetricsRef) { + m := sm.getTenant(ctx, ref) // NB: asserts against use-after-release + if atomic.SwapInt32(&ref._state, 1) != 0 { + log.Fatalf(ctx, "metrics ref released twice") + } + ref.stack = debug.Stack() m.mu.Lock() defer m.mu.Unlock() m.mu.refCount-- if m.mu.refCount < 0 { - log.Fatalf(ctx, "invalid refCount on metrics for tenant %v: %d", tenantID, m.mu.refCount) + log.Fatalf(ctx, "invalid refCount on metrics for tenant %v: %d", ref._tenantID, m.mu.refCount) } else if m.mu.refCount > 0 { return } @@ -1558,18 +1580,19 @@ func (sm *TenantsStorageMetrics) releaseTenant(ctx context.Context, tenantID roa m.SysBytes.Destroy() m.SysCount.Destroy() m.AbortSpanBytes.Destroy() - sm.tenants.Delete(int64(tenantID.ToUint64())) + sm.tenants.Delete(int64(ref._tenantID.ToUint64())) } // getTenant is a helper method used to retrieve the metrics for a tenant. The // call will log fatally if no such tenant has been previously acquired. func (sm *TenantsStorageMetrics) getTenant( - ctx context.Context, tenantID roachpb.TenantID, + ctx context.Context, ref *tenantMetricsRef, ) *tenantStorageMetrics { - key := int64(tenantID.ToUint64()) + ref.assert(ctx) + key := int64(ref._tenantID.ToUint64()) mPtr, ok := sm.tenants.Load(key) if !ok { - log.Fatalf(ctx, "no metrics exist for tenant %v", tenantID) + log.Fatalf(ctx, "no metrics exist for tenant %v", ref._tenantID) } return (*tenantStorageMetrics)(mPtr) } @@ -1843,9 +1866,10 @@ func newStoreMetrics(histogramWindow time.Duration) *StoreMetrics { // single snapshot of these gauges in the registry might mix the values of two // subsequent updates. func (sm *TenantsStorageMetrics) incMVCCGauges( - ctx context.Context, tenantID roachpb.TenantID, delta enginepb.MVCCStats, + ctx context.Context, ref *tenantMetricsRef, delta enginepb.MVCCStats, ) { - tm := sm.getTenant(ctx, tenantID) + ref.assert(ctx) + tm := sm.getTenant(ctx, ref) tm.LiveBytes.Inc(delta.LiveBytes) tm.KeyBytes.Inc(delta.KeyBytes) tm.ValBytes.Inc(delta.ValBytes) @@ -1863,17 +1887,17 @@ func (sm *TenantsStorageMetrics) incMVCCGauges( } func (sm *TenantsStorageMetrics) addMVCCStats( - ctx context.Context, tenantID roachpb.TenantID, delta enginepb.MVCCStats, + ctx context.Context, ref *tenantMetricsRef, delta enginepb.MVCCStats, ) { - sm.incMVCCGauges(ctx, tenantID, delta) + sm.incMVCCGauges(ctx, ref, delta) } func (sm *TenantsStorageMetrics) subtractMVCCStats( - ctx context.Context, tenantID roachpb.TenantID, delta enginepb.MVCCStats, + ctx context.Context, ref *tenantMetricsRef, delta enginepb.MVCCStats, ) { var neg enginepb.MVCCStats neg.Subtract(delta) - sm.incMVCCGauges(ctx, tenantID, neg) + sm.incMVCCGauges(ctx, ref, neg) } func (sm *StoreMetrics) updateEngineMetrics(m storage.Metrics) { diff --git a/pkg/kv/kvserver/metrics_test.go b/pkg/kv/kvserver/metrics_test.go index c6f76c91cc58..c6b58b07993a 100644 --- a/pkg/kv/kvserver/metrics_test.go +++ b/pkg/kv/kvserver/metrics_test.go @@ -54,13 +54,13 @@ func TestTenantsStorageMetricsConcurrency(t *testing.T) { tid := tenantIDs[rand.Intn(tenants)] time.Sleep(randDuration()) - sm.acquireTenant(tid) + ref := sm.acquireTenant(tid) time.Sleep(randDuration()) - sm.incMVCCGauges(ctx, tid, enginepb.MVCCStats{}) + sm.incMVCCGauges(ctx, ref, enginepb.MVCCStats{}) time.Sleep(randDuration()) - sm.releaseTenant(ctx, tid) + sm.releaseTenant(ctx, ref) } } var wg sync.WaitGroup diff --git a/pkg/kv/kvserver/replica.go b/pkg/kv/kvserver/replica.go index 34df3b3fd18a..3bc7c2c086c2 100644 --- a/pkg/kv/kvserver/replica.go +++ b/pkg/kv/kvserver/replica.go @@ -267,9 +267,18 @@ type Replica struct { concMgr concurrency.Manager // tenantLimiter rate limits requests on a per-tenant basis and accumulates - // metrics about it. + // metrics about it. This is determined by the start key of the Replica, + // once initialized. tenantLimiter tenantrate.Limiter + // tenantMetricsRef is a metrics reference indicating the tenant under + // which to track the range's contributions. This is determined by the + // start key of the Replica, once initialized. + // Its purpose is to help track down missing/extraneous release operations + // that would not be apparent or easy to resolve when refcounting at the store + // level only. + tenantMetricsRef *tenantMetricsRef + // sideTransportClosedTimestamp encapsulates state related to the closed // timestamp's information about the range. Note that the // sideTransportClosedTimestamp does not incorporate the closed timestamp diff --git a/pkg/kv/kvserver/replica_application_result.go b/pkg/kv/kvserver/replica_application_result.go index 33e1a3e33789..5512c3116af0 100644 --- a/pkg/kv/kvserver/replica_application_result.go +++ b/pkg/kv/kvserver/replica_application_result.go @@ -349,6 +349,13 @@ func (r *Replica) handleChangeReplicasResult( log.Infof(ctx, "removing replica due to ChangeReplicasTrigger: %v", chng) } + if err := r.store.removeInitializedReplicaRaftMuLocked(ctx, r, chng.NextReplicaID(), RemoveOptions{ + // We destroyed the data when the batch committed so don't destroy it again. + DestroyData: false, + }); err != nil { + log.Fatalf(ctx, "failed to remove replica: %v", err) + } + // NB: postDestroyRaftMuLocked requires that the batch which removed the data // be durably synced to disk, which we have. // See replicaAppBatch.ApplyToStateMachine(). @@ -356,12 +363,6 @@ func (r *Replica) handleChangeReplicasResult( log.Fatalf(ctx, "failed to run Replica postDestroy: %v", err) } - if err := r.store.removeInitializedReplicaRaftMuLocked(ctx, r, chng.NextReplicaID(), RemoveOptions{ - // We destroyed the data when the batch committed so don't destroy it again. - DestroyData: false, - }); err != nil { - log.Fatalf(ctx, "failed to remove replica: %v", err) - } return true } diff --git a/pkg/kv/kvserver/replica_application_state_machine.go b/pkg/kv/kvserver/replica_application_state_machine.go index 225425da3945..96968eb5acd4 100644 --- a/pkg/kv/kvserver/replica_application_state_machine.go +++ b/pkg/kv/kvserver/replica_application_state_machine.go @@ -886,7 +886,6 @@ func (b *replicaAppBatch) ApplyToStateMachine(ctx context.Context) error { needsSplitBySize := r.needsSplitBySizeRLocked() needsMergeBySize := r.needsMergeBySizeRLocked() needsTruncationByLogSize := r.needsRaftLogTruncationLocked() - tenantID := r.mu.tenantID r.mu.Unlock() if closedTimestampUpdated { r.handleClosedTimestampUpdateRaftMuLocked(ctx, b.state.RaftClosedTimestamp) @@ -895,7 +894,7 @@ func (b *replicaAppBatch) ApplyToStateMachine(ctx context.Context) error { // Record the stats delta in the StoreMetrics. deltaStats := *b.state.Stats deltaStats.Subtract(prevStats) - r.store.metrics.addMVCCStats(ctx, tenantID, deltaStats) + r.store.metrics.addMVCCStats(ctx, r.tenantMetricsRef, deltaStats) // Record the write activity, passing a 0 nodeID because replica.writeStats // intentionally doesn't track the origin of the writes. diff --git a/pkg/kv/kvserver/replica_destroy.go b/pkg/kv/kvserver/replica_destroy.go index 05b9d5d917a1..cdc32adb3e20 100644 --- a/pkg/kv/kvserver/replica_destroy.go +++ b/pkg/kv/kvserver/replica_destroy.go @@ -112,6 +112,11 @@ func (r *Replica) postDestroyRaftMuLocked(ctx context.Context, ms enginepb.MVCCS // directories belonging to replicas which aren't present. A crash before a // call to postDestroyRaftMuLocked will currently leave the files around // forever. + // + // TODO(tbg): coming back in 2021, the above should be outdated. The ReplicaID + // is set on creation and never changes over the lifetime of a Replica. Also, + // the replica is always contained in its descriptor. So this code below should + // be removable. if r.raftMu.sideloaded != nil { if err := r.raftMu.sideloaded.Clear(ctx); err != nil { return err @@ -120,8 +125,8 @@ func (r *Replica) postDestroyRaftMuLocked(ctx context.Context, ms enginepb.MVCCS // Release the reference to this tenant in metrics, we know the tenant ID is // valid if the replica is initialized. - if tenantID, ok := r.TenantID(); ok { - r.store.metrics.releaseTenant(ctx, tenantID) + if r.tenantMetricsRef != nil { + r.store.metrics.releaseTenant(ctx, r.tenantMetricsRef) } // Unhook the tenant rate limiter if we have one. diff --git a/pkg/kv/kvserver/replica_init.go b/pkg/kv/kvserver/replica_init.go index 09e2ef64a73b..7b007cf52644 100644 --- a/pkg/kv/kvserver/replica_init.go +++ b/pkg/kv/kvserver/replica_init.go @@ -339,7 +339,7 @@ func (r *Replica) setDescLockedRaftMuLocked(ctx context.Context, desc *roachpb.R "replica %v: %v", r, err) } r.mu.tenantID = tenantID - r.store.metrics.acquireTenant(tenantID) + r.tenantMetricsRef = r.store.metrics.acquireTenant(tenantID) if tenantID != roachpb.SystemTenantID { r.tenantLimiter = r.store.tenantRateLimiters.GetTenant(ctx, tenantID, r.store.stopper.ShouldQuiesce()) } diff --git a/pkg/kv/kvserver/replica_raftstorage.go b/pkg/kv/kvserver/replica_raftstorage.go index ff436a769149..eafaab968b54 100644 --- a/pkg/kv/kvserver/replica_raftstorage.go +++ b/pkg/kv/kvserver/replica_raftstorage.go @@ -949,8 +949,8 @@ func (r *Replica) applySnapshot( r.mu.lastTerm = invalidLastTerm r.mu.raftLogSize = 0 // Update the store stats for the data in the snapshot. - r.store.metrics.subtractMVCCStats(ctx, r.mu.tenantID, *r.mu.state.Stats) - r.store.metrics.addMVCCStats(ctx, r.mu.tenantID, *state.Stats) + r.store.metrics.subtractMVCCStats(ctx, r.tenantMetricsRef, *r.mu.state.Stats) + r.store.metrics.addMVCCStats(ctx, r.tenantMetricsRef, *state.Stats) lastKnownLease := r.mu.state.Lease // Update the rest of the Raft state. Changes to r.mu.state.Desc must be // managed by r.setDescRaftMuLocked and changes to r.mu.state.Lease must be handled @@ -1135,11 +1135,6 @@ func (r *Replica) clearSubsumedReplicaInMemoryData( ctx context.Context, subsumedRepls []*Replica, subsumedNextReplicaID roachpb.ReplicaID, ) error { for _, sr := range subsumedRepls { - // We removed sr's data when we committed the batch. Finish subsumption by - // updating the in-memory bookkeping. - if err := sr.postDestroyRaftMuLocked(ctx, sr.GetMVCCStats()); err != nil { - return err - } // We already hold sr's raftMu, so we must call removeReplicaImpl directly. // Note that it's safe to update the store's metadata for sr's removal // separately from updating the store's metadata for r's new descriptor @@ -1153,6 +1148,11 @@ func (r *Replica) clearSubsumedReplicaInMemoryData( }); err != nil { return err } + // We removed sr's data when we committed the batch. Finish subsumption by + // updating the in-memory bookkeping. + if err := sr.postDestroyRaftMuLocked(ctx, sr.GetMVCCStats()); err != nil { + return err + } } return nil } diff --git a/pkg/kv/kvserver/store.go b/pkg/kv/kvserver/store.go index 5d90512eb330..707e5e436870 100644 --- a/pkg/kv/kvserver/store.go +++ b/pkg/kv/kvserver/store.go @@ -1562,8 +1562,10 @@ func (s *Store) Start(ctx context.Context, stopper *stop.Stopper) error { // Add this range and its stats to our counter. s.metrics.ReplicaCount.Inc(1) - if tenantID, ok := rep.TenantID(); ok { - s.metrics.addMVCCStats(ctx, tenantID, rep.GetMVCCStats()) + if _, ok := rep.TenantID(); ok { + // TODO(tbg): why the check? We're definitely an initialized range so + // we have a tenantID. + s.metrics.addMVCCStats(ctx, rep.tenantMetricsRef, rep.GetMVCCStats()) } else { return errors.AssertionFailedf("found newly constructed replica"+ " for range %d at generation %d with an invalid tenant ID in store %d", diff --git a/pkg/kv/kvserver/store_merge.go b/pkg/kv/kvserver/store_merge.go index 22ad0ae8cef1..e96737888fb6 100644 --- a/pkg/kv/kvserver/store_merge.go +++ b/pkg/kv/kvserver/store_merge.go @@ -46,10 +46,6 @@ func (s *Store) MergeRange( leftRepl.raftMu.AssertHeld() rightRepl.raftMu.AssertHeld() - if err := rightRepl.postDestroyRaftMuLocked(ctx, rightRepl.GetMVCCStats()); err != nil { - return err - } - // Note that we were called (indirectly) from raft processing so we must // call removeInitializedReplicaRaftMuLocked directly to avoid deadlocking // on the right-hand replica's raftMu. @@ -61,6 +57,10 @@ func (s *Store) MergeRange( return errors.Errorf("cannot remove range: %s", err) } + if err := rightRepl.postDestroyRaftMuLocked(ctx, rightRepl.GetMVCCStats()); err != nil { + return err + } + if leftRepl.leaseholderStats != nil { leftRepl.leaseholderStats.resetRequestCounts() } diff --git a/pkg/kv/kvserver/store_remove_replica.go b/pkg/kv/kvserver/store_remove_replica.go index 8f47c805c840..f5d47ffa8a0a 100644 --- a/pkg/kv/kvserver/store_remove_replica.go +++ b/pkg/kv/kvserver/store_remove_replica.go @@ -69,7 +69,6 @@ func (s *Store) removeInitializedReplicaRaftMuLocked( // destroy status. var desc *roachpb.RangeDescriptor var replicaID roachpb.ReplicaID - var tenantID roachpb.TenantID { rep.readOnlyCmdMu.Lock() rep.mu.Lock() @@ -112,7 +111,6 @@ func (s *Store) removeInitializedReplicaRaftMuLocked( rep.mu.destroyStatus.Set(roachpb.NewRangeNotFoundError(rep.RangeID, rep.StoreID()), destroyReasonRemoved) replicaID = rep.mu.replicaID - tenantID = rep.mu.tenantID rep.mu.Unlock() rep.readOnlyCmdMu.Unlock() } @@ -144,7 +142,7 @@ func (s *Store) removeInitializedReplicaRaftMuLocked( // Destroy, but this configuration helps avoid races in stat verification // tests. - s.metrics.subtractMVCCStats(ctx, tenantID, rep.GetMVCCStats()) + s.metrics.subtractMVCCStats(ctx, rep.tenantMetricsRef, rep.GetMVCCStats()) s.metrics.ReplicaCount.Dec(1) s.mu.Unlock() diff --git a/pkg/kv/kvserver/store_split.go b/pkg/kv/kvserver/store_split.go index 810965fae7a4..2294ca8ecd90 100644 --- a/pkg/kv/kvserver/store_split.go +++ b/pkg/kv/kvserver/store_split.go @@ -153,8 +153,10 @@ func splitPostApply( // Update store stats with difference in stats before and after split. if rightReplOrNil != nil { - if tenantID, ok := rightReplOrNil.TenantID(); ok { - rightReplOrNil.store.metrics.addMVCCStats(ctx, tenantID, deltaMS) + if _, ok := rightReplOrNil.TenantID(); ok { + // TODO(tbg): why this check to get here? Is this really checking if the RHS + // is already initialized? But isn't it always, at this point? + rightReplOrNil.store.metrics.addMVCCStats(ctx, rightReplOrNil.tenantMetricsRef, deltaMS) } else { log.Fatalf(ctx, "%s: found replica which is RHS of a split "+ "without a valid tenant ID", rightReplOrNil)