diff --git a/pkg/kv/kvserver/batcheval/cmd_add_sstable.go b/pkg/kv/kvserver/batcheval/cmd_add_sstable.go index e3d78d5a5a5b..2aa4f756aa50 100644 --- a/pkg/kv/kvserver/batcheval/cmd_add_sstable.go +++ b/pkg/kv/kvserver/batcheval/cmd_add_sstable.go @@ -60,6 +60,7 @@ func declareKeysAddSSTable( l, r := rangeTombstonePeekBounds(args.Key, args.EndKey, rs.GetStartKey().AsRawKey(), nil) latchSpans.AddMVCC(spanset.SpanReadOnly, roachpb.Span{Key: l, EndKey: r}, header.Timestamp) } + latchSpans.DisableUndeclaredAccessAssertions() } // AddSSTableRewriteConcurrency sets the concurrency of a single SST rewrite. @@ -349,7 +350,7 @@ func EvalAddSSTable( // addition, and instead just use this key-only iterator. If a caller actually // needs to know what data is there, it must issue its own real Scan. if args.ReturnFollowingLikelyNonEmptySpanStart { - existingIter := spanset.DisableReaderAssertions(readWriter).NewMVCCIterator( + existingIter := readWriter.NewMVCCIterator( storage.MVCCKeyIterKind, // don't care if it is committed or not, just that it isn't empty. storage.IterOptions{ KeyTypes: storage.IterKeyTypePointsAndRanges, diff --git a/pkg/kv/kvserver/batcheval/cmd_gc.go b/pkg/kv/kvserver/batcheval/cmd_gc.go index 2f9290bf2015..95491d3ee48a 100644 --- a/pkg/kv/kvserver/batcheval/cmd_gc.go +++ b/pkg/kv/kvserver/batcheval/cmd_gc.go @@ -21,6 +21,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver/spanset" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/storage" + "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/errors" ) @@ -35,35 +36,51 @@ func declareKeysGC( latchSpans, _ *spanset.SpanSet, _ time.Duration, ) { - // Intentionally don't call DefaultDeclareKeys: the key range in the header - // is usually the whole range (pending resolution of #7880). gcr := req.(*roachpb.GCRequest) - for _, key := range gcr.Keys { - if keys.IsLocal(key.Key) { - latchSpans.AddNonMVCC(spanset.SpanReadWrite, roachpb.Span{Key: key.Key}) - } else { - latchSpans.AddMVCC(spanset.SpanReadWrite, roachpb.Span{Key: key.Key}, header.Timestamp) - } - } - // Extend the range key latches by feather to ensure MVCC stats - // computations correctly account for adjacent range keys tombstones if they - // need to be split. - // TODO(oleg): These latches are very broad and will be disruptive to read and - // write operations despite only accessing "stale" data. We should think of - // better integrating it with latchless GC approach. - for _, span := range mergeAdjacentSpans(makeLookupBoundariesForGCRanges(rs.GetStartKey().AsRawKey(), - nil, gcr.RangeKeys)) { - latchSpans.AddMVCC(spanset.SpanReadWrite, span, - header.Timestamp) - } - // Be smart here about blocking on the threshold keys. The MVCC GC queue can - // send an empty request first to bump the thresholds, and then another one - // that actually does work but can avoid declaring these keys below. - if !gcr.Threshold.IsEmpty() { - latchSpans.AddNonMVCC(spanset.SpanReadWrite, roachpb.Span{Key: keys.RangeGCThresholdKey(rs.GetRangeID())}) + // When GC-ing MVCC range key tombstones or individual range keys, we need to + // serialize with all writes that overlap the MVCC range tombstone, as well as + // the immediate left/right neighboring keys. This is because a range key + // write has non-local effects, i.e. it can fragment or merge other range keys + // at other timestamps and at its boundaries, and this has a non-commutative + // effect on MVCC stats -- if someone writes a new range key while we're GCing + // one below, the stats would come out wrong. + // Note that we only need to serialize with writers (including other GC + // processes) and not with readers (that are guaranteed to be above the GC + // threshold). To achieve this, we declare read-write access at + // hlc.MaxTimestamp which will not block any readers. + for _, span := range mergeAdjacentSpans(makeLookupBoundariesForGCRanges( + rs.GetStartKey().AsRawKey(), nil, gcr.RangeKeys, + )) { + latchSpans.AddMVCC(spanset.SpanReadWrite, span, hlc.MaxTimestamp) } + // The RangeGCThresholdKey is only written to if the + // req.(*GCRequest).Threshold is set. However, we always declare an exclusive + // access over this key in order to serialize with other GC requests. + // + // Correctness: + // It is correct for a GC request to not declare exclusive access over the + // keys being GCed because of the following: + // 1. We define "correctness" to be the property that a reader reading at / + // around the GC threshold will either see the correct results or receive an + // error. + // 2. Readers perform their command evaluation over a stable snapshot of the + // storage engine. This means that the reader will not see the effects of a + // subsequent GC run as long as it created a Pebble iterator before the GC + // request. + // 3. A reader checks the in-memory GC threshold of a Replica after it has + // created this snapshot (i.e. after a Pebble iterator has been created). + // 4. If the in-memory GC threshold is above the timestamp of the read, the + // reader receives an error. Otherwise, the reader is guaranteed to see a + // state of the storage engine that hasn't been affected by the GC request [5]. + // 5. GC requests bump the in-memory GC threshold of a Replica as a pre-apply + // side effect. This means that if a reader checks the in-memory GC threshold + // after it has created a Pebble iterator, it is impossible for the iterator + // to point to a storage engine state that has been affected by the GC + // request. + latchSpans.AddNonMVCC(spanset.SpanReadWrite, roachpb.Span{Key: keys.RangeGCThresholdKey(rs.GetRangeID())}) // Needed for Range bounds checks in calls to EvalContext.ContainsKey. latchSpans.AddNonMVCC(spanset.SpanReadOnly, roachpb.Span{Key: keys.RangeDescriptorKey(rs.GetStartKey())}) + latchSpans.DisableUndeclaredAccessAssertions() } // Create latches and merge adjacent. @@ -171,9 +188,7 @@ func GC( newThreshold := oldThreshold updated := newThreshold.Forward(args.Threshold) - // Don't write the GC threshold key unless we have to. We also don't - // declare the key unless we have to (to allow the MVCC GC queue to - // batch requests more efficiently), and we must honor what we declare. + // Don't write the GC threshold key unless we have to. if updated { if err := MakeStateLoader(cArgs.EvalCtx).SetGCThreshold( ctx, readWriter, cArgs.Stats, &newThreshold, diff --git a/pkg/kv/kvserver/batcheval/cmd_recompute_stats.go b/pkg/kv/kvserver/batcheval/cmd_recompute_stats.go index 9a457decb2a6..5146476b67ab 100644 --- a/pkg/kv/kvserver/batcheval/cmd_recompute_stats.go +++ b/pkg/kv/kvserver/batcheval/cmd_recompute_stats.go @@ -53,6 +53,8 @@ func declareKeysRecomputeStats( rdKey := keys.RangeDescriptorKey(rs.GetStartKey()) latchSpans.AddNonMVCC(spanset.SpanReadOnly, roachpb.Span{Key: rdKey}) latchSpans.AddNonMVCC(spanset.SpanReadWrite, roachpb.Span{Key: keys.TransactionKey(rdKey, uuid.Nil)}) + // Disable the assertions which check that all reads were previously declared. + latchSpans.DisableUndeclaredAccessAssertions() } // RecomputeStats recomputes the MVCCStats stored for this range and adjust them accordingly, @@ -69,10 +71,6 @@ func RecomputeStats( args = nil // avoid accidental use below - // Disable the assertions which check that all reads were previously declared. - // See the comment in `declareKeysRecomputeStats` for details on this. - reader = spanset.DisableReaderAssertions(reader) - actualMS, err := rditer.ComputeStatsForRange(desc, reader, cArgs.Header.Timestamp.WallTime) if err != nil { return result.Result{}, err diff --git a/pkg/kv/kvserver/replica_test.go b/pkg/kv/kvserver/replica_test.go index 4e725ce19c97..ade7e9b1c7a5 100644 --- a/pkg/kv/kvserver/replica_test.go +++ b/pkg/kv/kvserver/replica_test.go @@ -1721,6 +1721,14 @@ func gcKey(key roachpb.Key, timestamp hlc.Timestamp) roachpb.GCRequest_GCKey { } } +func recomputeStatsArgs(key roachpb.Key) roachpb.RecomputeStatsRequest { + return roachpb.RecomputeStatsRequest{ + RequestHeader: roachpb.RequestHeader{ + Key: key, + }, + } +} + func gcArgs(startKey []byte, endKey []byte, keys ...roachpb.GCRequest_GCKey) roachpb.GCRequest { return roachpb.GCRequest{ RequestHeader: roachpb.RequestHeader{ @@ -8392,56 +8400,6 @@ func TestReplicaReproposalWithNewLeaseIndexError(t *testing.T) { } } -// TestGCWithoutThreshold validates that GCRequest only declares the threshold -// key if it is subject to change, and that it does not access this key if it -// does not declare them. -func TestGCWithoutThreshold(t *testing.T) { - defer leaktest.AfterTest(t)() - defer log.Scope(t).Close(t) - - ctx := context.Background() - tc := &testContext{} - stopper := stop.NewStopper() - defer stopper.Stop(ctx) - tc.Start(ctx, t, stopper) - - for _, keyThresh := range []hlc.Timestamp{{}, {Logical: 1}} { - t.Run(fmt.Sprintf("thresh=%s", keyThresh), func(t *testing.T) { - var gc roachpb.GCRequest - var spans spanset.SpanSet - - gc.Threshold = keyThresh - cmd, _ := batcheval.LookupCommand(roachpb.GC) - cmd.DeclareKeys(tc.repl.Desc(), &roachpb.Header{RangeID: tc.repl.RangeID}, &gc, &spans, nil, 0) - - expSpans := 1 - if !keyThresh.IsEmpty() { - expSpans++ - } - if numSpans := spans.Len(); numSpans != expSpans { - t.Fatalf("expected %d declared keys, found %d", expSpans, numSpans) - } - - eng := storage.NewDefaultInMemForTesting() - defer eng.Close() - - batch := eng.NewBatch() - defer batch.Close() - rw := spanset.NewBatch(batch, &spans) - - var resp roachpb.GCResponse - if _, err := batcheval.GC(ctx, rw, batcheval.CommandArgs{ - Args: &gc, - EvalCtx: NewReplicaEvalContext( - ctx, tc.repl, &spans, false, /* requiresClosedTSOlderThanStorageSnap */ - ), - }, &resp); err != nil { - t.Fatal(err) - } - }) - } -} - // Test that, if the Raft command resulting from EndTxn request fails to be // processed/apply, then the LocalResult associated with that command is // cleared. @@ -8512,6 +8470,78 @@ func TestFailureToProcessCommandClearsLocalResult(t *testing.T) { } } +// TestMVCCStatsGCCommutesWithWrites tests that the MVCCStats updates +// corresponding to writes and GCs are commutative. +// +// This test does so by: +// 1. Initially writing N versions of a key. +// 2. Concurrently GC-ing the N-1 versions written in step 1 while writing N-1 +// new versions of the key. +// 3. Concurrently recomputing MVCC stats (via RecomputeStatsRequests) in the +// background and ensuring that the stats are always consistent at all times. +func TestMVCCStatsGCCommutesWithWrites(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + ctx := context.Background() + tc := serverutils.StartNewTestCluster(t, 1, base.TestClusterArgs{}) + defer tc.Stopper().Stop(ctx) + key := tc.ScratchRange(t) + store, err := tc.Server(0).GetStores().(*Stores).GetStore(tc.Server(0).GetFirstStoreID()) + require.NoError(t, err) + + write := func() hlc.Timestamp { + var ba roachpb.BatchRequest + put := putArgs(key, []byte("0")) + ba.Add(&put) + resp, pErr := store.TestSender().Send(ctx, ba) + require.Nil(t, pErr) + return resp.Timestamp + } + + // Write `numIterations` versions for a key. + const numIterations = 100 + writeTimestamps := make([]hlc.Timestamp, 0, numIterations) + for i := 0; i < numIterations; i++ { + writeTimestamps = append(writeTimestamps, write()) + } + + // Now, we GC the first `numIterations-1` versions we wrote above while + // concurrently writing `numIterations-1` new versions. + var wg sync.WaitGroup + wg.Add(3) + go func() { + defer wg.Done() + for _, ts := range writeTimestamps[:numIterations-1] { + gcReq := gcArgs(key, key.Next(), gcKey(key, ts)) + _, pErr := kv.SendWrapped(ctx, store.TestSender(), &gcReq) + require.Nil(t, pErr) + } + }() + go func() { + defer wg.Done() + for i := 0; i < numIterations-1; i++ { + write() + } + }() + // Also concurrently recompute stats and ensure that they're consistent at all + // times. + go func() { + defer wg.Done() + expDelta := enginepb.MVCCStats{} + for i := 0; i < numIterations; i++ { + recomputeReq := recomputeStatsArgs(key) + resp, pErr := kv.SendWrapped(ctx, store.TestSender(), &recomputeReq) + require.Nil(t, pErr) + delta := enginepb.MVCCStats(resp.(*roachpb.RecomputeStatsResponse).AddedDelta) + delta.AgeTo(expDelta.LastUpdateNanos) + require.Equal(t, expDelta, delta) + } + }() + + wg.Wait() +} + // TestBatchTimestampBelowGCThreshold verifies that commands below the replica // GC threshold fail. func TestBatchTimestampBelowGCThreshold(t *testing.T) { @@ -8778,6 +8808,18 @@ func TestGCThresholdRacesWithRead(t *testing.T) { require.Nil(t, err) require.Equal(t, va, b) + // Since the GC request does not acquire latches on the keys being GC'ed, + // they're not guaranteed to wait for these above Puts to get applied on + // the leaseholder. See AckCommittedEntriesBeforeApplication() the comment + // above it for more details. So we separately ensure both these Puts have + // been applied by just trying to read the latest value @ ts2. These Get + // requests do indeed declare latches on the keys being read, so by the + // time they return, subsequent GC requests are guaranteed to see the + // latest keys. + gArgs = getArgs(key) + _, pErr = kv.SendWrappedWith(ctx, reader, h2, &gArgs) + require.Nil(t, pErr) + // Perform two actions concurrently: // 1. GC up to ts2. This should remove the k@ts1 version. // 2. Read @ ts1. diff --git a/pkg/kv/kvserver/spanset/batch.go b/pkg/kv/kvserver/spanset/batch.go index 77de83c2b3d3..3ef23e3366da 100644 --- a/pkg/kv/kvserver/spanset/batch.go +++ b/pkg/kv/kvserver/spanset/batch.go @@ -814,19 +814,6 @@ func NewBatchAt(b storage.Batch, spans *SpanSet, ts hlc.Timestamp) storage.Batch } } -// DisableReaderAssertions unwraps any storage.Reader implementations that may -// assert access against a given SpanSet. -func DisableReaderAssertions(reader storage.Reader) storage.Reader { - switch v := reader.(type) { - case ReadWriter: - return DisableReaderAssertions(v.r) - case *spanSetBatch: - return DisableReaderAssertions(v.r) - default: - return reader - } -} - // addLockTableSpans adds corresponding lock table spans for the declared // spans. This is to implicitly allow raw access to separated intents in the // lock table for any declared keys. Explicitly declaring lock table spans is diff --git a/pkg/kv/kvserver/spanset/spanset.go b/pkg/kv/kvserver/spanset/spanset.go index fdefe74434e8..ff98215d4bc3 100644 --- a/pkg/kv/kvserver/spanset/spanset.go +++ b/pkg/kv/kvserver/spanset/spanset.go @@ -82,7 +82,8 @@ type Span struct { // The Span slice for a particular access and scope contains non-overlapping // spans in increasing key order after calls to SortAndDedup. type SpanSet struct { - spans [NumSpanAccess][NumSpanScope][]Span + spans [NumSpanAccess][NumSpanScope][]Span + allowUndeclared bool } var spanSetPool = sync.Pool{ @@ -111,6 +112,7 @@ func (s *SpanSet) Release() { s.spans[sa][ss] = recycle } } + s.allowUndeclared = false spanSetPool.Put(s) } @@ -152,6 +154,7 @@ func (s *SpanSet) Copy() *SpanSet { n.spans[sa][ss] = append(n.spans[sa][ss], s.spans[sa][ss]...) } } + n.allowUndeclared = s.allowUndeclared return n } @@ -204,6 +207,7 @@ func (s *SpanSet) Merge(s2 *SpanSet) { s.spans[sa][ss] = append(s.spans[sa][ss], s2.spans[sa][ss]...) } } + s.allowUndeclared = s2.allowUndeclared s.SortAndDedup() } @@ -335,6 +339,12 @@ func (s *SpanSet) CheckAllowedAt( func (s *SpanSet) checkAllowed( access SpanAccess, span roachpb.Span, check func(SpanAccess, Span) bool, ) error { + if s.allowUndeclared { + // If the request has specified that undeclared spans are allowed, do + // nothing. + return nil + } + scope := SpanGlobal if (span.Key != nil && keys.IsLocal(span.Key)) || (span.EndKey != nil && keys.IsLocal(span.EndKey)) { @@ -387,3 +397,10 @@ func (s *SpanSet) Validate() error { return nil } + +// DisableUndeclaredAccessAssertions disables the assertions that prevent +// undeclared access to spans. This is generally set by requests that rely on +// other forms of synchronization for correctness (e.g. GCRequest). +func (s *SpanSet) DisableUndeclaredAccessAssertions() { + s.allowUndeclared = true +}