Skip to content

Commit

Permalink
Merge #72471 #72836
Browse files Browse the repository at this point in the history
72471: kvserver: fix bugs in & fortify tenant refcounting r=ajwerner a=tbg

This PR fixes a sandwich of two bugs around refcounting the tenant rate limiters and metrics that I found while prototyping around #72374.

We had an accidental early return in `postDestroyRaftMuLocked` that meant that tenant metrics and rate limiters were essentially never released.

We were also continuing to use at least the tenant metrics object after the call to `postDestroyRaftMuLocked` had returned (but note that the above bug meant that we hadn't actually released the ref).

This PR fixes both and adds precautions against regressions of such bugs.

Despite having fixed bugs, I would be cautious about backporting this to 21.2 and 21.1; the bugs here never seem to have caused any problems, and since our day-to-day testing isn't heavy on tenants, it's unclear how reliably we'd be shaking out problems that were previously masked by the bug.

72836: server,sql,kv: various context improvements r=miretskiy,tbg a=knz

Informs #58938.

Connects more async goroutines to the tracer.

Also fixes various defects I introduced in #72638 and #72605.


Co-authored-by: Tobias Grieger <[email protected]>
Co-authored-by: Tobias Grieger <[email protected]>
Co-authored-by: Raphael 'kena' Poss <[email protected]>
  • Loading branch information
4 people committed Nov 18, 2021
3 parents 7bcec1f + d1e3fc1 + bc269bd commit e1c7d6a
Show file tree
Hide file tree
Showing 27 changed files with 290 additions and 72 deletions.
6 changes: 3 additions & 3 deletions DEPS.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -1198,10 +1198,10 @@ def go_deps():
name = "com_github_cockroachdb_logtags",
build_file_proto_mode = "disable_global",
importpath = "github.com/cockroachdb/logtags",
sha256 = "e0ff78268deed42414d58c55115e2a7db8d6b76f4165c02d8ba40d6cd32495a1",
strip_prefix = "github.com/cockroachdb/logtags@v0.0.0-20190617123548-eb05cc24525f",
sha256 = "1972c3f171f118add3fd9e64bcea6cbb9959a3b7fa0ada308e8a7310813fea74",
strip_prefix = "github.com/cockroachdb/logtags@v0.0.0-20211118104740-dabe8e521a4f",
urls = [
"https://storage.googleapis.com/cockroach-godeps/gomod/github.com/cockroachdb/logtags/com_github_cockroachdb_logtags-v0.0.0-20190617123548-eb05cc24525f.zip",
"https://storage.googleapis.com/cockroach-godeps/gomod/github.com/cockroachdb/logtags/com_github_cockroachdb_logtags-v0.0.0-20211118104740-dabe8e521a4f.zip",
],
)
go_repository(
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ require (
github.com/cockroachdb/errors v1.8.5
github.com/cockroachdb/go-test-teamcity v0.0.0-20191211140407-cff980ad0a55
github.com/cockroachdb/gostdlib v1.13.0
github.com/cockroachdb/logtags v0.0.0-20190617123548-eb05cc24525f
github.com/cockroachdb/logtags v0.0.0-20211118104740-dabe8e521a4f
github.com/cockroachdb/pebble v0.0.0-20211019184201-7fec828fc1af
github.com/cockroachdb/redact v1.1.3
github.com/cockroachdb/returncheck v0.0.0-20200612231554-92cdbca611dd
Expand Down
3 changes: 2 additions & 1 deletion go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -384,8 +384,9 @@ github.com/cockroachdb/go-test-teamcity v0.0.0-20191211140407-cff980ad0a55 h1:Yq
github.com/cockroachdb/go-test-teamcity v0.0.0-20191211140407-cff980ad0a55/go.mod h1:QqVqNIiRhLqJXif5C9wbM4JydBhrAF2WDMxkv5xkyxQ=
github.com/cockroachdb/gostdlib v1.13.0 h1:TzSEPYgkKDNei3gbLc0rrHu4iHyBp7/+NxPOFmcXGaw=
github.com/cockroachdb/gostdlib v1.13.0/go.mod h1:eXX95p9QDrYwJfJ6AgeN9QnRa/lqqid9LAzWz/l5OgA=
github.com/cockroachdb/logtags v0.0.0-20190617123548-eb05cc24525f h1:o/kfcElHqOiXqcou5a3rIlMc7oJbMQkeLk0VQJ7zgqY=
github.com/cockroachdb/logtags v0.0.0-20190617123548-eb05cc24525f/go.mod h1:i/u985jwjWRlyHXQbwatDASoW0RMlZ/3i9yJHE2xLkI=
github.com/cockroachdb/logtags v0.0.0-20211118104740-dabe8e521a4f h1:6jduT9Hfc0njg5jJ1DdKCFPdMBrp/mdZfCpa5h+WM74=
github.com/cockroachdb/logtags v0.0.0-20211118104740-dabe8e521a4f/go.mod h1:Vz9DsVWQQhf3vs21MhPMZpMGSht7O/2vFW2xusFUVOs=
github.com/cockroachdb/panicparse/v2 v2.0.0-20211103220158-604c82a44f1e h1:FrERdkPlRj+v7fc+PGpey3GUiDGuTR5CsmLCA54YJ8I=
github.com/cockroachdb/panicparse/v2 v2.0.0-20211103220158-604c82a44f1e/go.mod h1:pMxsKyCewnV3xPaFvvT9NfwvDTcIx2Xqg0qL5Gq0SjM=
github.com/cockroachdb/pebble v0.0.0-20211019184201-7fec828fc1af h1:NY+UDVTyU+Y2wKr0ocnBSbXGYTHEGwnQ9ukP+qg7xfY=
Expand Down
7 changes: 6 additions & 1 deletion pkg/jobs/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,12 @@ func (r *Registry) ID() base.SQLInstanceID {
// cancel func.
func (r *Registry) makeCtx() (context.Context, func()) {
ctx := r.ac.AnnotateCtx(context.Background())
ctx = logtags.WithTags(ctx, logtags.FromContext(r.serverCtx))
// AddTags and not WithTags, so that we combine the tags with those
// filled by AnnotateCtx.
// TODO(knz): This may not be necessary if the AmbientContext had
// all the tags already.
// See: https://github.com/cockroachdb/cockroach/issues/72815
ctx = logtags.AddTags(ctx, logtags.FromContext(r.serverCtx))
return context.WithCancel(ctx)
}

Expand Down
77 changes: 60 additions & 17 deletions pkg/kv/kvserver/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ package kvserver

import (
"context"
"runtime/debug"
"sync/atomic"
"time"
"unsafe"

Expand Down Expand Up @@ -1439,6 +1441,29 @@ type StoreMetrics struct {
ClosedTimestampMaxBehindNanos *metric.Gauge
}

// tenantMetricsRef is a handle to one acquired reference on a tenant's
// storage metrics. acquireTenant returns a fresh handle for every reference
// it takes, and releaseTenant consumes it: the handle is marked as poisoned
// and the stack of the releasing goroutine is recorded, so that any later use
// of the handle (detected by assert) fatals and reports where the release
// happened.
type tenantMetricsRef struct {
// All fields are internal. Don't access them.

// _tenantID identifies the tenant whose metrics this reference pins; it is
// the value from which the lookup key into the tenants map is derived.
_tenantID roachpb.TenantID
// _state must only be read and written through sync/atomic. It is swapped
// from 0 to 1 in releaseTenant; a second release is a fatal error.
_state int32 // atomic; 0=usable 1=poisoned

// _stack helps diagnose use-after-release when it occurs.
// This field is populated in releaseTenant and printed
// in assertions on failure.
//
// The mutex is embedded, so Lock/Unlock are called directly on _stack, and
// the captured stack trace lives in the embedded (unnamed) string field,
// accessed as _stack.string.
_stack struct {
syncutil.Mutex
string
}
}

// assert verifies that ref has not been released yet. A usable reference
// (state zero) makes this a no-op. Otherwise the call fatals, printing the
// stack trace that was stored on the reference when it was released so the
// offending release site can be found.
func (ref *tenantMetricsRef) assert(ctx context.Context) {
	if atomic.LoadInt32(&ref._state) == 0 {
		// Still usable; nothing to report.
		return
	}
	// The recorded stack is guarded by the embedded mutex because it is
	// written under that same mutex when the reference is released.
	ref._stack.Lock()
	defer ref._stack.Unlock()
	// NOTE(review): the depth argument of 1 presumably attributes the fatal
	// log entry to assert's caller rather than to assert itself — confirm
	// against the log package.
	log.FatalfDepth(ctx, 1, "tenantMetricsRef already finalized in:\n%s", ref._stack.string)
}

// TenantsStorageMetrics are metrics which are aggregated over all tenants
// present on the server. The struct maintains child metrics used by each
// tenant to track their individual values. The struct expects that children
Expand All @@ -1461,7 +1486,12 @@ type TenantsStorageMetrics struct {
AbortSpanBytes *aggmetric.AggGauge

// This struct is invisible to the metric package.
tenants syncutil.IntMap // map[roachpb.TenantID]*tenantStorageMetrics
//
// NB: the uint64->int64 conversion used for the keys of this map is
// lossless, so everything works even for tenant IDs in excess of
// math.MaxInt64. The only caveat is that when inspecting this map through
// a debugger, the int64->uint64 conversion has to be done manually.
tenants syncutil.IntMap // map[int64(roachpb.TenantID)]*tenantStorageMetrics
}

var _ metric.Struct = (*TenantsStorageMetrics)(nil)
Expand All @@ -1473,7 +1503,7 @@ func (sm *TenantsStorageMetrics) MetricStruct() {}
// method are reference counted with decrements occurring in the corresponding
// releaseTenant call. This method must be called prior to adding or subtracting
// MVCC stats.
func (sm *TenantsStorageMetrics) acquireTenant(tenantID roachpb.TenantID) {
func (sm *TenantsStorageMetrics) acquireTenant(tenantID roachpb.TenantID) *tenantMetricsRef {
// incRef increments the reference count unless it is already zero; a zero
// count indicates that the struct has already been destroyed.
incRef := func(m *tenantStorageMetrics) (alreadyDestroyed bool) {
Expand All @@ -1490,7 +1520,9 @@ func (sm *TenantsStorageMetrics) acquireTenant(tenantID roachpb.TenantID) {
if mPtr, ok := sm.tenants.Load(key); ok {
m := (*tenantStorageMetrics)(mPtr)
if alreadyDestroyed := incRef(m); !alreadyDestroyed {
return
return &tenantMetricsRef{
_tenantID: tenantID,
}
}
// Somebody else concurrently took the reference count to zero, go back
// around. Because of the locking in releaseTenant, we know that we'll
Expand Down Expand Up @@ -1522,21 +1554,30 @@ func (sm *TenantsStorageMetrics) acquireTenant(tenantID roachpb.TenantID) {
m.SysCount = sm.SysCount.AddChild(tenantIDStr)
m.AbortSpanBytes = sm.AbortSpanBytes.AddChild(tenantIDStr)
m.mu.Unlock()
return
return &tenantMetricsRef{
_tenantID: tenantID,
}
}
}
}

// releaseTenant releases the reference to the metrics for this tenant which was
// acquired with acquireTenant. It will fatally log if no entry exists for this
// tenant.
func (sm *TenantsStorageMetrics) releaseTenant(ctx context.Context, tenantID roachpb.TenantID) {
m := sm.getTenant(ctx, tenantID)
func (sm *TenantsStorageMetrics) releaseTenant(ctx context.Context, ref *tenantMetricsRef) {
m := sm.getTenant(ctx, ref) // NB: asserts against use-after-release
if atomic.SwapInt32(&ref._state, 1) != 0 {
ref.assert(ctx) // this will fatal
return // unreachable
}
ref._stack.Lock()
ref._stack.string = string(debug.Stack())
ref._stack.Unlock()
m.mu.Lock()
defer m.mu.Unlock()
m.mu.refCount--
if m.mu.refCount < 0 {
log.Fatalf(ctx, "invalid refCount on metrics for tenant %v: %d", tenantID, m.mu.refCount)
log.Fatalf(ctx, "invalid refCount on metrics for tenant %v: %d", ref._tenantID, m.mu.refCount)
} else if m.mu.refCount > 0 {
return
}
Expand All @@ -1558,18 +1599,19 @@ func (sm *TenantsStorageMetrics) releaseTenant(ctx context.Context, tenantID roa
m.SysBytes.Destroy()
m.SysCount.Destroy()
m.AbortSpanBytes.Destroy()
sm.tenants.Delete(int64(tenantID.ToUint64()))
sm.tenants.Delete(int64(ref._tenantID.ToUint64()))
}

// getTenant is a helper method used to retrieve the metrics for a tenant. The
// call will log fatally if no such tenant has been previously acquired.
func (sm *TenantsStorageMetrics) getTenant(
ctx context.Context, tenantID roachpb.TenantID,
ctx context.Context, ref *tenantMetricsRef,
) *tenantStorageMetrics {
key := int64(tenantID.ToUint64())
ref.assert(ctx)
key := int64(ref._tenantID.ToUint64())
mPtr, ok := sm.tenants.Load(key)
if !ok {
log.Fatalf(ctx, "no metrics exist for tenant %v", tenantID)
log.Fatalf(ctx, "no metrics exist for tenant %v", ref._tenantID)
}
return (*tenantStorageMetrics)(mPtr)
}
Expand Down Expand Up @@ -1843,9 +1885,10 @@ func newStoreMetrics(histogramWindow time.Duration) *StoreMetrics {
// single snapshot of these gauges in the registry might mix the values of two
// subsequent updates.
func (sm *TenantsStorageMetrics) incMVCCGauges(
ctx context.Context, tenantID roachpb.TenantID, delta enginepb.MVCCStats,
ctx context.Context, ref *tenantMetricsRef, delta enginepb.MVCCStats,
) {
tm := sm.getTenant(ctx, tenantID)
ref.assert(ctx)
tm := sm.getTenant(ctx, ref)
tm.LiveBytes.Inc(delta.LiveBytes)
tm.KeyBytes.Inc(delta.KeyBytes)
tm.ValBytes.Inc(delta.ValBytes)
Expand All @@ -1863,17 +1906,17 @@ func (sm *TenantsStorageMetrics) incMVCCGauges(
}

func (sm *TenantsStorageMetrics) addMVCCStats(
ctx context.Context, tenantID roachpb.TenantID, delta enginepb.MVCCStats,
ctx context.Context, ref *tenantMetricsRef, delta enginepb.MVCCStats,
) {
sm.incMVCCGauges(ctx, tenantID, delta)
sm.incMVCCGauges(ctx, ref, delta)
}

func (sm *TenantsStorageMetrics) subtractMVCCStats(
ctx context.Context, tenantID roachpb.TenantID, delta enginepb.MVCCStats,
ctx context.Context, ref *tenantMetricsRef, delta enginepb.MVCCStats,
) {
var neg enginepb.MVCCStats
neg.Subtract(delta)
sm.incMVCCGauges(ctx, tenantID, neg)
sm.incMVCCGauges(ctx, ref, neg)
}

func (sm *StoreMetrics) updateEngineMetrics(m storage.Metrics) {
Expand Down
6 changes: 3 additions & 3 deletions pkg/kv/kvserver/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,13 +54,13 @@ func TestTenantsStorageMetricsConcurrency(t *testing.T) {
tid := tenantIDs[rand.Intn(tenants)]

time.Sleep(randDuration())
sm.acquireTenant(tid)
ref := sm.acquireTenant(tid)

time.Sleep(randDuration())
sm.incMVCCGauges(ctx, tid, enginepb.MVCCStats{})
sm.incMVCCGauges(ctx, ref, enginepb.MVCCStats{})

time.Sleep(randDuration())
sm.releaseTenant(ctx, tid)
sm.releaseTenant(ctx, ref)
}
}
var wg sync.WaitGroup
Expand Down
11 changes: 10 additions & 1 deletion pkg/kv/kvserver/replica.go
Original file line number Diff line number Diff line change
Expand Up @@ -267,9 +267,18 @@ type Replica struct {
concMgr concurrency.Manager

// tenantLimiter rate limits requests on a per-tenant basis and accumulates
// metrics about it.
// metrics about it. This is determined by the start key of the Replica,
// once initialized.
tenantLimiter tenantrate.Limiter

// tenantMetricsRef is a metrics reference indicating the tenant under
// which to track the range's contributions. This is determined by the
// start key of the Replica, once initialized.
// Its purpose is to help track down missing/extraneous release operations
// that would not be apparent or easy to resolve when refcounting at the store
// level only.
tenantMetricsRef *tenantMetricsRef

// sideTransportClosedTimestamp encapsulates state related to the closed
// timestamp's information about the range. Note that the
// sideTransportClosedTimestamp does not incorporate the closed timestamp
Expand Down
13 changes: 7 additions & 6 deletions pkg/kv/kvserver/replica_application_result.go
Original file line number Diff line number Diff line change
Expand Up @@ -349,19 +349,20 @@ func (r *Replica) handleChangeReplicasResult(
log.Infof(ctx, "removing replica due to ChangeReplicasTrigger: %v", chng)
}

if err := r.store.removeInitializedReplicaRaftMuLocked(ctx, r, chng.NextReplicaID(), RemoveOptions{
// We destroyed the data when the batch committed so don't destroy it again.
DestroyData: false,
}); err != nil {
log.Fatalf(ctx, "failed to remove replica: %v", err)
}

// NB: postDestroyRaftMuLocked requires that the batch which removed the data
// be durably synced to disk, which we have.
// See replicaAppBatch.ApplyToStateMachine().
if err := r.postDestroyRaftMuLocked(ctx, r.GetMVCCStats()); err != nil {
log.Fatalf(ctx, "failed to run Replica postDestroy: %v", err)
}

if err := r.store.removeInitializedReplicaRaftMuLocked(ctx, r, chng.NextReplicaID(), RemoveOptions{
// We destroyed the data when the batch committed so don't destroy it again.
DestroyData: false,
}); err != nil {
log.Fatalf(ctx, "failed to remove replica: %v", err)
}
return true
}

Expand Down
3 changes: 1 addition & 2 deletions pkg/kv/kvserver/replica_application_state_machine.go
Original file line number Diff line number Diff line change
Expand Up @@ -890,7 +890,6 @@ func (b *replicaAppBatch) ApplyToStateMachine(ctx context.Context) error {
needsSplitBySize := r.needsSplitBySizeRLocked()
needsMergeBySize := r.needsMergeBySizeRLocked()
needsTruncationByLogSize := r.needsRaftLogTruncationLocked()
tenantID := r.mu.tenantID
r.mu.Unlock()
if closedTimestampUpdated {
r.handleClosedTimestampUpdateRaftMuLocked(ctx, b.state.RaftClosedTimestamp)
Expand All @@ -899,7 +898,7 @@ func (b *replicaAppBatch) ApplyToStateMachine(ctx context.Context) error {
// Record the stats delta in the StoreMetrics.
deltaStats := *b.state.Stats
deltaStats.Subtract(prevStats)
r.store.metrics.addMVCCStats(ctx, tenantID, deltaStats)
r.store.metrics.addMVCCStats(ctx, r.tenantMetricsRef, deltaStats)

// Record the write activity, passing a 0 nodeID because replica.writeStats
// intentionally doesn't track the origin of the writes.
Expand Down
15 changes: 14 additions & 1 deletion pkg/kv/kvserver/replica_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -794,6 +794,11 @@ func waitForApplication(
replicas []roachpb.ReplicaDescriptor,
leaseIndex uint64,
) error {
if dialer == nil && len(replicas) == 1 {
// This early return supports unit tests (testContext{}) that also
// want to perform merges.
return nil
}
return contextutil.RunWithTimeout(ctx, "wait for application", 5*time.Second, func(ctx context.Context) error {
g := ctxgroup.WithContext(ctx)
for _, repl := range replicas {
Expand Down Expand Up @@ -825,6 +830,11 @@ func waitForReplicasInit(
rangeID roachpb.RangeID,
replicas []roachpb.ReplicaDescriptor,
) error {
if dialer == nil && len(replicas) == 1 {
// This early return supports unit tests (testContext{}) that also
// want to perform merges.
return nil
}
return contextutil.RunWithTimeout(ctx, "wait for replicas init", 5*time.Second, func(ctx context.Context) error {
g := ctxgroup.WithContext(ctx)
for _, repl := range replicas {
Expand Down Expand Up @@ -1798,7 +1808,10 @@ func (r *Replica) tryRollbackRaftLearner(
})
return err
}
rollbackCtx := logtags.WithTags(context.Background(), logtags.FromContext(ctx))
rollbackCtx := r.AnnotateCtx(context.Background())
// AddTags and not WithTags, so that we combine the tags with those
// filled by AnnotateCtx.
rollbackCtx = logtags.AddTags(rollbackCtx, logtags.FromContext(ctx))
if err := contextutil.RunWithTimeout(
rollbackCtx, "learner rollback", rollbackTimeout, rollbackFn,
); err != nil {
Expand Down
13 changes: 10 additions & 3 deletions pkg/kv/kvserver/replica_destroy.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,14 +112,21 @@ func (r *Replica) postDestroyRaftMuLocked(ctx context.Context, ms enginepb.MVCCS
// directories belonging to replicas which aren't present. A crash before a
// call to postDestroyRaftMuLocked will currently leave the files around
// forever.
//
// TODO(tbg): coming back in 2021, the above should be outdated. The ReplicaID
// is set on creation and never changes over the lifetime of a Replica. Also,
// the replica is always contained in its descriptor. So this code below should
// be removable.
if r.raftMu.sideloaded != nil {
return r.raftMu.sideloaded.Clear(ctx)
if err := r.raftMu.sideloaded.Clear(ctx); err != nil {
return err
}
}

// Release the reference to this tenant in metrics, we know the tenant ID is
// valid if the replica is initialized.
if tenantID, ok := r.TenantID(); ok {
r.store.metrics.releaseTenant(ctx, tenantID)
if r.tenantMetricsRef != nil {
r.store.metrics.releaseTenant(ctx, r.tenantMetricsRef)
}

// Unhook the tenant rate limiter if we have one.
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/replica_init.go
Original file line number Diff line number Diff line change
Expand Up @@ -339,7 +339,7 @@ func (r *Replica) setDescLockedRaftMuLocked(ctx context.Context, desc *roachpb.R
"replica %v: %v", r, err)
}
r.mu.tenantID = tenantID
r.store.metrics.acquireTenant(tenantID)
r.tenantMetricsRef = r.store.metrics.acquireTenant(tenantID)
if tenantID != roachpb.SystemTenantID {
r.tenantLimiter = r.store.tenantRateLimiters.GetTenant(ctx, tenantID, r.store.stopper.ShouldQuiesce())
}
Expand Down
Loading

0 comments on commit e1c7d6a

Please sign in to comment.