From 67a6c2547f6ac9d2e600b76d11ddc37f42d8b365 Mon Sep 17 00:00:00 2001 From: Aayush Shah Date: Thu, 4 Aug 2022 18:08:50 -0400 Subject: [PATCH] kvserver: allow certain read-only requests to drop latches before evaluation This commit introduces a change to the way certain types of read-only requests are evaluated. Traditionally, read-only requests have held their latches throughout their execution. This commit allows certain qualifying reads to release their latches earlier. At a high level, reads may attempt to resolve all conflicts upfront by performing a sort of "validation" phase before they perform their MVCC scan. This validation phase performs a scan of the lock table keyspace in order to find any conflicting intents that may need to be resolved before the actual evaluation of the request over the MVCC keyspace. If no conflicting intents are found, then (since https://github.com/cockroachdb/cockroach/pull/76312), the request is guaranteed to be fully isolated against all other concurrent requests and can be allowed to release its latches at this point. This allows the actual evaluation of the read (over the MVCC part of the keyspace) to proceed without latches being held, which is the main motivation of this work. This validation phase could be thought of as an extension of the validation that the concurrency manager already performs when requests are sequenced through it, which detects conflicting intents that have already been pulled into the in-memory lock table. Additionally, for certain types of requests that can drop their latches early and do not need to access the `IntentHistory` for any of their parent txn's intents, this commit attempts to make their MVCC scan cheaper by eliminating the need for an `intentInterleavingIterator`. This is enabled by the observation that once the validation phase is complete, the only remaining intents in the read's declared span must be intents belonging to the reader's transaction. 
So if the reader doesn't need to read an intent that isn't the latest intent on a key, then it doesn't need access to the key's `IntentHistory` (which lives in the lock-table keyspace), and doesn't need to use an `intentInterleavingIterator`. Release note (performance improvement): certain types of reads will now have a far smaller contention footprint with conflicting concurrent writers Resolves https://github.com/cockroachdb/cockroach/issues/66485 Release justification: high benefit change to existing functionality, part of 22.2 roadmap --- pkg/kv/kvserver/batcheval/cmd_get.go | 15 +- pkg/kv/kvserver/batcheval/cmd_reverse_scan.go | 25 ++- pkg/kv/kvserver/batcheval/cmd_scan.go | 27 +-- pkg/kv/kvserver/batcheval/declare.go | 7 +- pkg/kv/kvserver/client_replica_test.go | 46 ++-- pkg/kv/kvserver/replica_evaluate.go | 183 +++++++++++++++- pkg/kv/kvserver/replica_evaluate_test.go | 10 +- pkg/kv/kvserver/replica_gossip.go | 5 +- pkg/kv/kvserver/replica_read.go | 206 +++++++++++++++--- pkg/kv/kvserver/replica_send.go | 56 ++--- pkg/kv/kvserver/replica_test.go | 91 ++++++-- pkg/kv/kvserver/replica_tscache.go | 17 +- pkg/kv/kvserver/replica_write.go | 2 +- pkg/kv/kvserver/store_test.go | 139 +++++++++++- pkg/kv/kvserver/testing_knobs.go | 8 + pkg/storage/mvcc.go | 163 +++++++++----- 16 files changed, 777 insertions(+), 223 deletions(-) diff --git a/pkg/kv/kvserver/batcheval/cmd_get.go b/pkg/kv/kvserver/batcheval/cmd_get.go index 78b270643e55..8a6506ce11aa 100644 --- a/pkg/kv/kvserver/batcheval/cmd_get.go +++ b/pkg/kv/kvserver/batcheval/cmd_get.go @@ -53,13 +53,14 @@ func Get( var intent *roachpb.Intent var err error val, intent, err = storage.MVCCGet(ctx, reader, args.Key, h.Timestamp, storage.MVCCGetOptions{ - Inconsistent: h.ReadConsistency != roachpb.CONSISTENT, - SkipLocked: h.WaitPolicy == lock.WaitPolicy_SkipLocked, - Txn: h.Txn, - FailOnMoreRecent: args.KeyLocking != lock.None, - Uncertainty: cArgs.Uncertainty, - MemoryAccount: 
cArgs.EvalCtx.GetResponseMemoryAccount(), - LockTable: cArgs.Concurrency, + Inconsistent: h.ReadConsistency != roachpb.CONSISTENT, + SkipLocked: h.WaitPolicy == lock.WaitPolicy_SkipLocked, + Txn: h.Txn, + FailOnMoreRecent: args.KeyLocking != lock.None, + Uncertainty: cArgs.Uncertainty, + MemoryAccount: cArgs.EvalCtx.GetResponseMemoryAccount(), + LockTable: cArgs.Concurrency, + DontInterleaveIntents: cArgs.DontInterleaveIntents, }) if err != nil { return result.Result{}, err diff --git a/pkg/kv/kvserver/batcheval/cmd_reverse_scan.go b/pkg/kv/kvserver/batcheval/cmd_reverse_scan.go index cb83a622c4f5..8ae129f7976e 100644 --- a/pkg/kv/kvserver/batcheval/cmd_reverse_scan.go +++ b/pkg/kv/kvserver/batcheval/cmd_reverse_scan.go @@ -40,18 +40,19 @@ func ReverseScan( var err error opts := storage.MVCCScanOptions{ - Inconsistent: h.ReadConsistency != roachpb.CONSISTENT, - SkipLocked: h.WaitPolicy == lock.WaitPolicy_SkipLocked, - Txn: h.Txn, - MaxKeys: h.MaxSpanRequestKeys, - MaxIntents: storage.MaxIntentsPerWriteIntentError.Get(&cArgs.EvalCtx.ClusterSettings().SV), - TargetBytes: h.TargetBytes, - AllowEmpty: h.AllowEmpty, - WholeRowsOfSize: h.WholeRowsOfSize, - FailOnMoreRecent: args.KeyLocking != lock.None, - Reverse: true, - MemoryAccount: cArgs.EvalCtx.GetResponseMemoryAccount(), - LockTable: cArgs.Concurrency, + Inconsistent: h.ReadConsistency != roachpb.CONSISTENT, + SkipLocked: h.WaitPolicy == lock.WaitPolicy_SkipLocked, + Txn: h.Txn, + MaxKeys: h.MaxSpanRequestKeys, + MaxIntents: storage.MaxIntentsPerWriteIntentError.Get(&cArgs.EvalCtx.ClusterSettings().SV), + TargetBytes: h.TargetBytes, + AllowEmpty: h.AllowEmpty, + WholeRowsOfSize: h.WholeRowsOfSize, + FailOnMoreRecent: args.KeyLocking != lock.None, + Reverse: true, + MemoryAccount: cArgs.EvalCtx.GetResponseMemoryAccount(), + LockTable: cArgs.Concurrency, + DontInterleaveIntents: cArgs.DontInterleaveIntents, } switch args.ScanFormat { diff --git a/pkg/kv/kvserver/batcheval/cmd_scan.go 
b/pkg/kv/kvserver/batcheval/cmd_scan.go index ffac770c5232..19f2f6aa48c4 100644 --- a/pkg/kv/kvserver/batcheval/cmd_scan.go +++ b/pkg/kv/kvserver/batcheval/cmd_scan.go @@ -40,19 +40,20 @@ func Scan( var err error opts := storage.MVCCScanOptions{ - Inconsistent: h.ReadConsistency != roachpb.CONSISTENT, - SkipLocked: h.WaitPolicy == lock.WaitPolicy_SkipLocked, - Txn: h.Txn, - Uncertainty: cArgs.Uncertainty, - MaxKeys: h.MaxSpanRequestKeys, - MaxIntents: storage.MaxIntentsPerWriteIntentError.Get(&cArgs.EvalCtx.ClusterSettings().SV), - TargetBytes: h.TargetBytes, - AllowEmpty: h.AllowEmpty, - WholeRowsOfSize: h.WholeRowsOfSize, - FailOnMoreRecent: args.KeyLocking != lock.None, - Reverse: false, - MemoryAccount: cArgs.EvalCtx.GetResponseMemoryAccount(), - LockTable: cArgs.Concurrency, + Inconsistent: h.ReadConsistency != roachpb.CONSISTENT, + SkipLocked: h.WaitPolicy == lock.WaitPolicy_SkipLocked, + Txn: h.Txn, + Uncertainty: cArgs.Uncertainty, + MaxKeys: h.MaxSpanRequestKeys, + MaxIntents: storage.MaxIntentsPerWriteIntentError.Get(&cArgs.EvalCtx.ClusterSettings().SV), + TargetBytes: h.TargetBytes, + AllowEmpty: h.AllowEmpty, + WholeRowsOfSize: h.WholeRowsOfSize, + FailOnMoreRecent: args.KeyLocking != lock.None, + Reverse: false, + MemoryAccount: cArgs.EvalCtx.GetResponseMemoryAccount(), + LockTable: cArgs.Concurrency, + DontInterleaveIntents: cArgs.DontInterleaveIntents, } switch args.ScanFormat { diff --git a/pkg/kv/kvserver/batcheval/declare.go b/pkg/kv/kvserver/batcheval/declare.go index 52979d9d808f..2e230d3b20a5 100644 --- a/pkg/kv/kvserver/batcheval/declare.go +++ b/pkg/kv/kvserver/batcheval/declare.go @@ -121,7 +121,8 @@ type CommandArgs struct { Args roachpb.Request Now hlc.ClockTimestamp // *Stats should be mutated to reflect any writes made by the command. 
- Stats *enginepb.MVCCStats - Concurrency *concurrency.Guard - Uncertainty uncertainty.Interval + Stats *enginepb.MVCCStats + Concurrency *concurrency.Guard + Uncertainty uncertainty.Interval + DontInterleaveIntents bool } diff --git a/pkg/kv/kvserver/client_replica_test.go b/pkg/kv/kvserver/client_replica_test.go index e17bd407638f..b2a56e8f0dd9 100644 --- a/pkg/kv/kvserver/client_replica_test.go +++ b/pkg/kv/kvserver/client_replica_test.go @@ -252,17 +252,13 @@ func TestTxnPutOutOfOrder(t *testing.T) { restartKey = "restart" ) // Set up a filter to so that the get operation at Step 3 will return an error. - var numGets int32 + var shouldFailGet atomic.Value testingEvalFilter := func(filterArgs kvserverbase.FilterArgs) *roachpb.Error { if _, ok := filterArgs.Req.(*roachpb.GetRequest); ok && filterArgs.Req.Header().Key.Equal(roachpb.Key(key)) && filterArgs.Hdr.Txn == nil { - // The Reader executes two get operations, each of which triggers two get requests - // (the first request fails and triggers txn push, and then the second request - // succeeds). Returns an error for the fourth get request to avoid timestamp cache - // update after the third get operation pushes the txn timestamp. 
- if atomic.AddInt32(&numGets, 1) == 4 { + if shouldFail := shouldFailGet.Load(); shouldFail != nil && shouldFail.(bool) { return roachpb.NewErrorWithTxn(errors.Errorf("Test"), filterArgs.Hdr.Txn) } } @@ -401,6 +397,7 @@ func TestTxnPutOutOfOrder(t *testing.T) { manual.Increment(100) h.Timestamp = s.Clock().Now() + shouldFailGet.Store(true) if _, err := kv.SendWrappedWith( context.Background(), store.TestSender(), h, &roachpb.GetRequest{RequestHeader: requestHeader}, ); err == nil { @@ -4493,20 +4490,6 @@ func TestDiscoverIntentAcrossLeaseTransferAwayAndBack(t *testing.T) { var txn2ID atomic.Value var txn2BBlockOnce sync.Once txn2BlockedC := make(chan chan struct{}) - postEvalFilter := func(args kvserverbase.FilterArgs) *roachpb.Error { - if txn := args.Hdr.Txn; txn != nil && txn.ID == txn2ID.Load() { - txn2BBlockOnce.Do(func() { - if !errors.HasType(args.Err, (*roachpb.WriteIntentError)(nil)) { - t.Errorf("expected WriteIntentError; got %v", args.Err) - } - - unblockCh := make(chan struct{}) - txn2BlockedC <- unblockCh - <-unblockCh - }) - } - return nil - } // Detect when txn4 discovers txn3's intent and begins to push. 
var txn4ID atomic.Value @@ -4527,10 +4510,20 @@ func TestDiscoverIntentAcrossLeaseTransferAwayAndBack(t *testing.T) { tc := testcluster.StartTestCluster(t, 3, base.TestClusterArgs{ ServerArgs: base.TestServerArgs{Knobs: base.TestingKnobs{ Store: &kvserver.StoreTestingKnobs{ - EvalKnobs: kvserverbase.BatchEvalTestingKnobs{ - TestingPostEvalFilter: postEvalFilter, - }, TestingRequestFilter: requestFilter, + TestingConcurrencyRetryFilter: func(ctx context.Context, ba roachpb.BatchRequest, pErr *roachpb.Error) { + if txn := ba.Txn; txn != nil && txn.ID == txn2ID.Load() { + txn2BBlockOnce.Do(func() { + if !errors.HasType(pErr.GoError(), (*roachpb.WriteIntentError)(nil)) { + t.Errorf("expected WriteIntentError; got %v", pErr) + } + + unblockCh := make(chan struct{}) + txn2BlockedC <- unblockCh + <-unblockCh + }) + } + }, // Required by TestCluster.MoveRangeLeaseNonCooperatively. AllowLeaseRequestProposalsWhenNotLeader: true, }, @@ -4563,7 +4556,12 @@ func TestDiscoverIntentAcrossLeaseTransferAwayAndBack(t *testing.T) { _, err := txn2.Get(ctx, key) err2C <- err }() - txn2UnblockC := <-txn2BlockedC + var txn2UnblockC chan struct{} + select { + case txn2UnblockC = <-txn2BlockedC: + case <-time.After(10 * time.Second): + t.Fatal("timed out waiting for txn2 to block") + } // Transfer the lease to Server 1. 
Do so non-cooperatively instead of using // a lease transfer, because the cooperative lease transfer would get stuck diff --git a/pkg/kv/kvserver/replica_evaluate.go b/pkg/kv/kvserver/replica_evaluate.go index 68507e3dd765..2d51d7c094f3 100644 --- a/pkg/kv/kvserver/replica_evaluate.go +++ b/pkg/kv/kvserver/replica_evaluate.go @@ -13,18 +13,23 @@ package kvserver import ( "bytes" "context" + "fmt" + "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval/result" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/lock" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/spanset" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/uncertainty" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/storage" "github.com/cockroachdb/cockroach/pkg/storage/enginepb" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/protoutil" "github.com/cockroachdb/errors" "github.com/cockroachdb/redact" "github.com/kr/pretty" @@ -152,7 +157,7 @@ func evaluateBatch( g *concurrency.Guard, st *kvserverpb.LeaseStatus, ui uncertainty.Interval, - readOnly bool, + evalPath batchEvalPath, ) (_ *roachpb.BatchResponse, _ result.Result, retErr *roachpb.Error) { defer func() { // Ensure that errors don't carry the WriteTooOld flag set. The client @@ -175,7 +180,7 @@ func evaluateBatch( br := ba.CreateReply() // Optimize any contiguous sequences of put and conditional put ops. 
- if len(baReqs) >= optimizePutThreshold && !readOnly { + if len(baReqs) >= optimizePutThreshold && evalPath == readWrite { baReqs = optimizePuts(readWriter, baReqs, baHeader.DistinctSpans) } @@ -270,7 +275,8 @@ func evaluateBatch( // may carry a response transaction and in the case of WriteTooOldError // (which is sometimes deferred) it is fully populated. curResult, err := evaluateCommand( - ctx, readWriter, rec, ms, baHeader, args, reply, g, st, ui) + ctx, readWriter, rec, ms, baHeader, args, reply, g, st, ui, evalPath, + ) if filter := rec.EvalKnobs().TestingPostEvalFilter; filter != nil { filterArgs := kvserverbase.FilterArgs{ @@ -480,6 +486,7 @@ func evaluateCommand( g *concurrency.Guard, st *kvserverpb.LeaseStatus, ui uncertainty.Interval, + evalPath batchEvalPath, ) (result.Result, error) { var err error var pd result.Result @@ -490,13 +497,14 @@ func evaluateCommand( now = st.Now } cArgs := batcheval.CommandArgs{ - EvalCtx: rec, - Header: h, - Args: args, - Now: now, - Stats: ms, - Concurrency: g, - Uncertainty: ui, + EvalCtx: rec, + Header: h, + Args: args, + Now: now, + Stats: ms, + Concurrency: g, + Uncertainty: ui, + DontInterleaveIntents: evalPath == readOnlyWithoutInterleavedIntents, } if cmd.EvalRW != nil { @@ -607,3 +615,158 @@ func canDoServersideRetry( } return tryBumpBatchTimestamp(ctx, ba, g, newTimestamp) } + +// canReadOnlyRequestDropLatchesBeforeEval determines whether the batch request +// can potentially resolve its conflicts upfront (by scanning just the lock +// table first), bump the ts cache, release latches and then proceed with +// evaluation. Only non-locking read requests that aren't being evaluated under +// the `OptimisticEval` path are eligible for this optimization. 
+func canReadOnlyRequestDropLatchesBeforeEval(ba *roachpb.BatchRequest, g *concurrency.Guard) bool { + switch ba.Header.ReadConsistency { + case roachpb.CONSISTENT: + + // TODO(aayush): INCONSISTENT and READ_UNCOMMITTED reads do not care about + // resolving lock conflicts at all. Yet, they can still drop latches early and + // evaluate once they've pinned their pebble engine state. We should consider + // supporting this by letting these kinds of requests drop latches early while + // also skipping the initial validation step of scanning the lock table. + case roachpb.INCONSISTENT: + return false + case roachpb.READ_UNCOMMITTED: + return false + default: + panic(fmt.Sprintf("unexpected ReadConsistency: %s", ba.Header.ReadConsistency)) + } + if g == nil { + // NB: A nil guard indicates that the caller is not holding latches. + return false + } + if len(g.Req.LockSpans.GetSpans(spanset.SpanReadOnly, spanset.SpanLocal)) != 0 { + // If the request declared any local read spans, it must hold latches + // throughout its execution. + return false + } + switch g.EvalKind { + case concurrency.PessimisticEval, concurrency.PessimisticAfterFailedOptimisticEval: + case concurrency.OptimisticEval: + // Requests going through the optimistic path are not allowed to drop their + // latches before evaluation since we do not know upfront the extent to + // which they will end up reading, and thus we cannot determine how much of + // the timestamp cache to update. + return false + default: + panic(fmt.Sprintf("unexpected EvalKind: %v", g.EvalKind)) + } + // Only non-locking reads are eligible. This is because requests that need to + // lock the keys that they end up reading need to be isolated against other + // conflicting requests during their execution. Thus, they cannot release + // their latches before evaluation. 
+ if ba.IsLocking() { + return false + } + switch ba.WaitPolicy { + case lock.WaitPolicy_Block, lock.WaitPolicy_Error: + case lock.WaitPolicy_SkipLocked: + // SkipLocked should only bump the timestamp cache over the keys that they + // actually ended up reading, and not the keys they ended up skipping over. + // Thus, they are not allowed to drop their latches before evaluation. + return false + default: + panic(fmt.Sprintf("unexpected WaitPolicy: %s", ba.WaitPolicy)) + } + // We allow all non-locking, pessimistically evaluating read requests to try + // and resolve their conflicts upfront. + for _, req := range ba.Requests { + inner := req.GetInner() + switch inner.(type) { + case *roachpb.ExportRequest, *roachpb.GetRequest, *roachpb.ScanRequest, *roachpb.ReverseScanRequest: + default: + return false + } + } + return true +} + +// scanConflictingIntents scans intents using only the separated intents lock +// table. The result set is added to the given `intents` slice. It ignores +// intents that do not conflict with `txn`. If it encounters intents that were +// written by `txn` that are either at a higher sequence number than txn's or at +// a lower sequence number but at a higher timestamp, `needIntentHistory` is set +// to true. This flag is used to signal to the caller that a subsequent scan +// over the MVCC key space (for the batch in question) will need to be performed +// using an intent interleaving iterator in order to be able to read the correct +// provisional value. +func scanConflictingIntents( + ctx context.Context, + reader storage.Reader, + txn *roachpb.Transaction, + ts hlc.Timestamp, + start, end roachpb.Key, + intents *[]roachpb.Intent, + maxIntents, curIntentBytes, targetBytes int64, +) (intentBytes int64, needIntentHistory bool, err error) { + upperBoundUnset := bytes.Equal(end, roachpb.KeyMin) // NB: Get requests do not set the end key. 
+ if !upperBoundUnset && bytes.Compare(start, end) >= 0 { + return curIntentBytes, true /* needIntentHistory */, errors.AssertionFailedf("start key must be less than end key") + } + + ltStart, _ := keys.LockTableSingleKey(start, nil) + ltEnd, _ := keys.LockTableSingleKey(end, nil) + opts := storage.IterOptions{LowerBound: ltStart} + if upperBoundUnset { + opts.Prefix = true + } else { + opts.UpperBound = ltEnd + } + iter := reader.NewEngineIterator(opts) + defer iter.Close() + + var meta enginepb.MVCCMetadata + var ok bool + for ok, err = iter.SeekEngineKeyGE(storage.EngineKey{Key: ltStart}); ok; ok, err = iter.NextEngineKey() { + if err := ctx.Err(); err != nil { + return curIntentBytes, false, err + } + if maxIntents != 0 && int64(len(*intents)) >= maxIntents { + break + } + if targetBytes != 0 && curIntentBytes >= targetBytes { + break + } + key, err := iter.EngineKey() + if err != nil { + return curIntentBytes, false, err + } + lockedKey, err := keys.DecodeLockTableSingleKey(key.Key) + if err != nil { + return curIntentBytes, false, err + } + if err = protoutil.Unmarshal(iter.UnsafeValue(), &meta); err != nil { + return curIntentBytes, false, err + } + if meta.Txn == nil { + return curIntentBytes, false, errors.Errorf("intent without transaction") + } + ownIntent := txn != nil && txn.ID == meta.Txn.ID + if ownIntent { + // If we ran into one of our own intents, check whether the intent has a + // higher (or equal) sequence number or a higher (or equal) timestamp. If + // either of these conditions is true, a corresponding scan over the MVCC + // key space will need access to the key's intent history in order to read + // the correct provisional value. So we set `needIntentHistory` to true. 
+ if txn.Sequence <= meta.Txn.Sequence || ts.LessEq(meta.Timestamp.ToTimestamp()) { + needIntentHistory = true + } + continue + } + if conflictingIntent := meta.Timestamp.ToTimestamp().LessEq(ts); !conflictingIntent { + continue + } + *intents = append(*intents, roachpb.MakeIntent(meta.Txn, lockedKey)) + curIntentBytes += int64(len(lockedKey)) + int64(len(iter.Value())) + } + if err != nil { + return curIntentBytes, false, err + } + return curIntentBytes, needIntentHistory, nil /* err */ +} diff --git a/pkg/kv/kvserver/replica_evaluate_test.go b/pkg/kv/kvserver/replica_evaluate_test.go index 312bde10a818..31f0241e2eba 100644 --- a/pkg/kv/kvserver/replica_evaluate_test.go +++ b/pkg/kv/kvserver/replica_evaluate_test.go @@ -674,6 +674,10 @@ func TestEvaluateBatch(t *testing.T) { var r resp r.d = d + evalPath := readWrite + if d.readOnly { + evalPath = readOnlyDefault + } r.br, r.res, r.pErr = evaluateBatch( ctx, d.idKey, @@ -681,10 +685,10 @@ func TestEvaluateBatch(t *testing.T) { d.MockEvalCtx.EvalContext(), &d.ms, &d.ba, - nil, /* g */ - nil, /* st */ + nil, + nil, uncertainty.Interval{}, - d.readOnly, + evalPath, ) tc.check(t, r) diff --git a/pkg/kv/kvserver/replica_gossip.go b/pkg/kv/kvserver/replica_gossip.go index 96c40a425b30..d2687dd149e5 100644 --- a/pkg/kv/kvserver/replica_gossip.go +++ b/pkg/kv/kvserver/replica_gossip.go @@ -97,7 +97,10 @@ func (r *Replica) MaybeGossipNodeLivenessRaftMuLocked( defer rw.Close() br, result, pErr := - evaluateBatch(ctx, kvserverbase.CmdIDKey(""), rw, rec, nil, &ba, nil /* g */, nil /* st */, uncertainty.Interval{}, true /* readOnly */) + evaluateBatch( + ctx, kvserverbase.CmdIDKey(""), rw, rec, nil /* ms */, &ba, + nil /* g */, nil /* st */, uncertainty.Interval{}, readOnlyDefault, + ) if pErr != nil { return errors.Wrapf(pErr.GoError(), "couldn't scan node liveness records in span %s", span) } diff --git a/pkg/kv/kvserver/replica_read.go b/pkg/kv/kvserver/replica_read.go index d0bc0f20e1b9..2c76a32bc18f 100644 --- 
a/pkg/kv/kvserver/replica_read.go +++ b/pkg/kv/kvserver/replica_read.go @@ -23,12 +23,14 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver/spanset" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/uncertainty" "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/settings" "github.com/cockroachdb/cockroach/pkg/storage" "github.com/cockroachdb/cockroach/pkg/util" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/mon" "github.com/cockroachdb/cockroach/pkg/util/timeutil" + "github.com/cockroachdb/errors" "github.com/kr/pretty" ) @@ -79,27 +81,37 @@ func (r *Replica) executeReadOnlyBatch( if err := r.checkExecutionCanProceedAfterStorageSnapshot(ba, st); err != nil { return nil, g, nil, roachpb.NewError(err) } - // TODO(nvanbenschoten): once all replicated intents are pulled into the - // concurrency manager's lock-table, we can be sure that if we reached this - // point, we will not conflict with any of them during evaluation. This in - // turn means that we can bump the timestamp cache *before* evaluation - // without risk of starving writes. Once we start doing that, we're free to - // release latches immediately after we acquire an engine iterator as long - // as we're performing a non-locking read. Note that this also requires that - // the request is not being optimistically evaluated (optimistic evaluation - // does not wait for latches or check locks). It would also be nice, but not - // required for correctness, that the read-only engine eagerly create an - // iterator (that is later cloned) while the latches are held, so that this - // request does not "see" the effect of any later requests that happen after - // the latches are released. 
+ ok, stillNeedsInterleavedIntents, pErr := r.canDropLatchesBeforeEval(ctx, rw, ba, g, st) + if pErr != nil { + return nil, g, nil, pErr + } + evalPath := readOnlyDefault + if ok { + // Since the concurrency manager has sequenced this request all the intents + // that are in the concurrency manager's lock table, and we've scanned the + // replicated lock-table keyspace above in `canDropLatchesBeforeEval`, we + // can be sure that if we reached this point, we will not conflict with any + // of them during evaluation. This in turn means that we can bump the + // timestamp cache *before* evaluation without risk of starving writes. + // Consequently, we're free to release latches here since we've acquired a + // pebble iterator as long as we're performing a non-locking read (also + // checked in `canDropLatchesBeforeEval`). Note that this also requires that + // the request is not being optimistically evaluated (optimistic evaluation + // does not wait for latches or check locks). + log.VEventf(ctx, 3, "lock table scan complete without conflicts; dropping latches early") + if !stillNeedsInterleavedIntents { + evalPath = readOnlyWithoutInterleavedIntents + } + g = r.updateTimestampCacheAndDropLatches(ctx, g, ba, nil /* br */, nil /* pErr */, st) + } var result result.Result - br, result, pErr = r.executeReadOnlyBatchWithServersideRefreshes(ctx, rw, rec, ba, g, &st, ui) + br, result, pErr = r.executeReadOnlyBatchWithServersideRefreshes(ctx, rw, rec, ba, g, &st, ui, evalPath) // If the request hit a server-side concurrency retry error, immediately // propagate the error. Don't assume ownership of the concurrency guard. if isConcurrencyRetryError(pErr) { - if g.EvalKind == concurrency.OptimisticEval { + if g != nil && g.EvalKind == concurrency.OptimisticEval { // Since this request was not holding latches, it could have raced with // intent resolution. So we can't trust it to add discovered locks, if // there is a latch conflict. 
This means that a discovered lock plus a @@ -120,7 +132,7 @@ func (r *Replica) executeReadOnlyBatch( return nil, g, nil, pErr } - if g.EvalKind == concurrency.OptimisticEval { + if g != nil && g.EvalKind == concurrency.OptimisticEval { if pErr == nil { // Gather the spans that were read -- we distinguish the spans in the // request from the spans that were actually read, using resume spans in @@ -150,17 +162,10 @@ func (r *Replica) executeReadOnlyBatch( if pErr == nil { pErr = r.handleReadOnlyLocalEvalResult(ctx, ba, result.Local) } - - // Otherwise, update the timestamp cache and release the concurrency guard. - // Note: - // - The update to the timestamp cache is not gated on pErr == nil, - // since certain semantic errors (e.g. ConditionFailedError on CPut) - // require updating the timestamp cache (see updatesTSCacheOnErr). - // - For optimistic evaluation, used for limited scans, the update to the - // timestamp cache limits itself to the spans that were read, by using - // the ResumeSpans. - ec, g := endCmds{repl: r, g: g, st: st}, nil - ec.done(ctx, ba, br, pErr) + if g != nil { + // If we didn't already drop latches earlier, do so now. + _ = r.updateTimestampCacheAndDropLatches(ctx, g, ba, br, pErr, st) + } // Semi-synchronously process any intents that need resolving here in // order to apply back pressure on the client which generated them. The @@ -180,7 +185,11 @@ func (r *Replica) executeReadOnlyBatch( // prohibits any concurrent requests for the same range. See #17760. 
allowSyncProcessing := ba.ReadConsistency == roachpb.CONSISTENT && ba.WaitPolicy != lock.WaitPolicy_SkipLocked - if err := r.store.intentResolver.CleanupIntentsAsync(ctx, intents, allowSyncProcessing); err != nil { + if err := r.store.intentResolver.CleanupIntentsAsync( + ctx, + intents, + allowSyncProcessing, + ); err != nil { log.Warningf(ctx, "%v", err) } } @@ -196,6 +205,118 @@ func (r *Replica) executeReadOnlyBatch( return br, nil, nil, pErr } +// updateTimestampCacheAndDropLatches updates the timestamp cache and releases +// the concurrency guard. +// Note: +// - If `br` is nil, then this method assumes that latches are being released +// before evaluation of the request, and the timestamp cache is updated based +// only on the spans declared in the request. +// - The update to the timestamp cache is not gated on pErr == nil, since +// certain semantic errors (e.g. ConditionFailedError on CPut) require updating +// the timestamp cache (see updatesTSCacheOnErr). +// - For optimistic evaluation, used for limited scans, the update to the +// timestamp cache limits itself to the spans that were read, by using the +// ResumeSpans. +func (r *Replica) updateTimestampCacheAndDropLatches( + ctx context.Context, + g *concurrency.Guard, + ba *roachpb.BatchRequest, + br *roachpb.BatchResponse, + pErr *roachpb.Error, + st kvserverpb.LeaseStatus, +) *concurrency.Guard { + ec := endCmds{repl: r, g: g, st: st} + ec.done(ctx, ba, br, pErr) + return nil +} + +var allowDroppingLatchesBeforeEval = settings.RegisterBoolSetting( + settings.SystemOnly, + "kv.transaction.dropping_latches_before_eval.enabled", + "if enabled, allows certain read-only KV requests to drop latches before they evaluate", + true, +) + +// canDropLatchesBeforeEval determines whether a given batch request can proceed +// with evaluation without continuing to hold onto its latches[1] and if so, +// whether the evaluation of the requests in the batch needs an intent +// interleaving iterator[2]. 
+// +// [1] whether the request can safely release latches at this point in the +// execution. +// For certain qualifying types of requests (certain types of read-only +// requests: see `canReadOnlyRequestDropLatchesBeforeEval`), this method +// performs a scan of the lock table keyspace corresponding to the latch spans +// declared by the BatchRequest. +// If no conflicting intents are found, then it is deemed safe for this request +// to release its latches at this point. This is because read-only requests +// evaluate over a stable pebble snapshot (see the call to +// `PinEngineStateForIterators` in `executeReadOnlyBatch`), so if there are no +// lock conflicts, the rest of the execution is guaranteed to be isolated from +// the effects of other requests. +// If any conflicting intents are found, then it returns a WriteIntentError +// which needs to be handled by the caller before proceeding. +// +// [2] if the request can drop its latches early, whether it needs an intent +// interleaving iterator to perform its evaluation. +// If the aforementioned lock table scan determines that any of the requests in +// the batch may need access to the intent history of a key, then an intent +// interleaving iterator is needed to perform the evaluation. +func (r *Replica) canDropLatchesBeforeEval( + ctx context.Context, + rw storage.ReadWriter, + ba *roachpb.BatchRequest, + g *concurrency.Guard, + st kvserverpb.LeaseStatus, +) (ok, stillNeedsIntentInterleaving bool, pErr *roachpb.Error) { + disabled := r.store.cfg.TestingKnobs.DisableDroppingLatchesBeforeEval || + !allowDroppingLatchesBeforeEval.Get(&r.store.cfg.Settings.SV) + if disabled || !canReadOnlyRequestDropLatchesBeforeEval(ba, g) { + // If the request does not qualify, we neither drop latches nor use a + // non-interleaving iterator. 
+ return false /* ok */, true /* stillNeedsIntentInterleaving */, nil + } + + log.VEventf( + ctx, 3, "can drop latches early for batch (%v); scanning lock table first to detect conflicts", ba, + ) + + maxIntents := storage.MaxIntentsPerWriteIntentError.Get(&r.store.cfg.Settings.SV) + var start, end roachpb.Key + var intents []roachpb.Intent + var err error + var cumIntentBytes int64 + // Check if any of the requests within the batch need to resolve any intents + // or if any of them need to use an intent interleaving iterator. + for _, req := range ba.Requests { + start, end = req.GetInner().Header().Key, req.GetInner().Header().EndKey + var needsIntentInterleavingForThisRequest bool + cumIntentBytes, needsIntentInterleavingForThisRequest, err = scanConflictingIntents( + ctx, rw, ba.Txn, ba.Header.Timestamp, start, end, &intents, maxIntents, cumIntentBytes, ba.TargetBytes, + ) + if err != nil { + return false /* ok */, true /* stillNeedsIntentInterleaving */, roachpb.NewError( + errors.Wrap(err, "scanning intents"), + ) + } + stillNeedsIntentInterleaving = stillNeedsIntentInterleaving || needsIntentInterleavingForThisRequest + if maxIntents != 0 && int64(len(intents)) >= maxIntents { + break + } + if ba.TargetBytes != 0 && cumIntentBytes >= ba.TargetBytes { + break + } + } + if len(intents) > 0 { + return false /* ok */, false /* stillNeedsIntentInterleaving */, maybeAttachLease( + roachpb.NewError(&roachpb.WriteIntentError{Intents: intents}), &st.Lease, + ) + } + // If there were no conflicts, then the request can drop its latches and + // proceed with evaluation. + return true /* ok */, stillNeedsIntentInterleaving, nil +} + // evalContextWithAccount wraps an EvalContext to provide a non-nil // mon.BoundAccount. 
This wrapping is conditional on various factors, and // specific to a request (see executeReadOnlyBatchWithServersideRefreshes), @@ -244,6 +365,19 @@ func (e evalContextWithAccount) GetResponseMemoryAccount() *mon.BoundAccount { return e.memAccount } +// batchEvalPath enumerates the different evaluation paths that can be taken by +// a batch. +type batchEvalPath int + +const ( + // readOnlyDefault is the default evaluation path taken by read only requests. + readOnlyDefault batchEvalPath = iota + // readOnlyWithoutInterleavedIntents indicates that the request does not need + // an intent interleaving iterator during its evaluation. + readOnlyWithoutInterleavedIntents + readWrite +) + // executeReadOnlyBatchWithServersideRefreshes invokes evaluateBatch and retries // at a higher timestamp in the event of some retriable errors if allowed by the // batch/txn. @@ -255,6 +389,7 @@ func (r *Replica) executeReadOnlyBatchWithServersideRefreshes( g *concurrency.Guard, st *kvserverpb.LeaseStatus, ui uncertainty.Interval, + evalPath batchEvalPath, ) (br *roachpb.BatchResponse, res result.Result, pErr *roachpb.Error) { log.Event(ctx, "executing read-only batch") @@ -305,14 +440,25 @@ func (r *Replica) executeReadOnlyBatchWithServersideRefreshes( log.VEventf(ctx, 2, "server-side retry of batch") } now := timeutil.Now() - br, res, pErr = evaluateBatch(ctx, kvserverbase.CmdIDKey(""), rw, rec, nil, ba, g, st, ui, true /* readOnly */) + br, res, pErr = evaluateBatch( + ctx, kvserverbase.CmdIDKey(""), rw, rec, nil /* ms */, ba, g, st, ui, evalPath, + ) r.store.metrics.ReplicaReadBatchEvaluationLatency.RecordValue(timeutil.Since(now).Nanoseconds()) // Allow only one retry. if pErr == nil || retries > 0 { break } // If we can retry, set a higher batch timestamp and continue. 
- if !canDoServersideRetry(ctx, pErr, ba, br, g, hlc.Timestamp{} /* deadline */) { + // + // Note that if the batch request has already released its latches (as + // indicated by the latch guard being nil) before this point, then it cannot + // retry at a higher timestamp because it is not isolated at higher + // timestamps. + latchesHeld := g != nil + if !latchesHeld || !canDoServersideRetry(ctx, pErr, ba, br, g, hlc.Timestamp{}) { + // TODO(aayush,arul): These metrics are incorrect at the moment since + // hitting this branch does not mean that we won't serverside retry, it + // just means that we will have to reacquire latches. r.store.Metrics().ReadEvaluationServerSideRetryFailure.Inc(1) break } else { diff --git a/pkg/kv/kvserver/replica_send.go b/pkg/kv/kvserver/replica_send.go index 4fb51a8ccb66..d07b6c6334ec 100644 --- a/pkg/kv/kvserver/replica_send.go +++ b/pkg/kv/kvserver/replica_send.go @@ -477,10 +477,19 @@ func (r *Replica) executeBatchWithConcurrencyRetries( return nil, nil, pErr } - // The batch execution func returned a server-side concurrency retry - // error. It must have also handed back ownership of the concurrency - // guard without having already released the guard's latches. - g.AssertLatches() + // The batch execution func returned a server-side concurrency retry error. + // It may have either handed back ownership of the concurrency guard without + // having already released the guard's latches, or in case of certain types + // of read-only requests (see `canReadOnlyRequestDropLatchesBeforeEval`), it + // may have released the guard's latches. + dropLatchesAndLockWaitQueues := func() { + if g != nil { + latchSpans, lockSpans = g.TakeSpanSets() + r.concMgr.FinishReq(g) + g = nil + } + } + if filter := r.store.cfg.TestingKnobs.TestingConcurrencyRetryFilter; filter != nil { filter(ctx, *ba, pErr) } @@ -490,24 +499,25 @@ func (r *Replica) executeBatchWithConcurrencyRetries( // when checking for conflicts, which is handled below. 
Note that an // optimistic eval failure for any other reason will also retry as // PessimisticEval. - requestEvalKind = concurrency.PessimisticEval + if !r.store.cfg.TestingKnobs.RetainEvalKindOnConcurrencyRetry { + requestEvalKind = concurrency.PessimisticEval + } switch t := pErr.GetDetail().(type) { case *roachpb.WriteIntentError: // Drop latches, but retain lock wait-queues. + g.AssertLatches() if g, pErr = r.handleWriteIntentError(ctx, ba, g, pErr, t); pErr != nil { return nil, nil, pErr } case *roachpb.TransactionPushError: // Drop latches, but retain lock wait-queues. + g.AssertLatches() if g, pErr = r.handleTransactionPushError(ctx, ba, g, pErr, t); pErr != nil { return nil, nil, pErr } case *roachpb.IndeterminateCommitError: - // Drop latches and lock wait-queues. - latchSpans, lockSpans = g.TakeSpanSets() - r.concMgr.FinishReq(g) - g = nil + dropLatchesAndLockWaitQueues() // Then launch a task to handle the indeterminate commit error. No error // is returned if the transaction is recovered successfully to either a // COMMITTED or ABORTED state. @@ -515,9 +525,7 @@ func (r *Replica) executeBatchWithConcurrencyRetries( return nil, nil, pErr } case *roachpb.ReadWithinUncertaintyIntervalError: - // Drop latches and lock wait-queues. - r.concMgr.FinishReq(g) - g = nil + dropLatchesAndLockWaitQueues() // If the batch is able to perform a server-side retry in order to avoid // the uncertainty error, it will have a new timestamp. Force a refresh of // the latch and lock spans. @@ -534,10 +542,7 @@ func (r *Replica) executeBatchWithConcurrencyRetries( return nil, nil, pErr } case *roachpb.InvalidLeaseError: - // Drop latches and lock wait-queues. - latchSpans, lockSpans = g.TakeSpanSets() - r.concMgr.FinishReq(g) - g = nil + dropLatchesAndLockWaitQueues() // Then attempt to acquire the lease if not currently held by any // replica or redirect to the current leaseholder if currently held // by a different replica. 
@@ -545,10 +550,7 @@ func (r *Replica) executeBatchWithConcurrencyRetries( return nil, nil, pErr } case *roachpb.MergeInProgressError: - // Drop latches and lock wait-queues. - latchSpans, lockSpans = g.TakeSpanSets() - r.concMgr.FinishReq(g) - g = nil + dropLatchesAndLockWaitQueues() // Then listen for the merge to complete. if pErr = r.handleMergeInProgressError(ctx, ba, pErr, t); pErr != nil { return nil, nil, pErr @@ -774,7 +776,7 @@ func (r *Replica) handleReadWithinUncertaintyIntervalError( // Attempt a server-side retry of the request. Note that we pass nil for // latchSpans, because we have already released our latches and plan to // re-acquire them if the retry is allowed. - if !canDoServersideRetry(ctx, pErr, ba, nil /* br */, nil /* g */, hlc.Timestamp{} /* deadline */) { + if !canDoServersideRetry(ctx, pErr, ba, nil, nil, hlc.Timestamp{}) { r.store.Metrics().ReadWithinUncertaintyIntervalErrorServerSideRetryFailure.Inc(1) return nil, pErr } @@ -1246,7 +1248,9 @@ func (ec *endCmds) poison() { } // done releases the latches acquired by the command and updates the timestamp -// cache using the final timestamp of each command. +// cache using the final timestamp of each command. If `br` is nil, it is +// assumed that `done` is being called by a request that's dropping its latches +// before evaluation. // // No-op if the receiver has been zeroed out by a call to move. Idempotent and // is safe to call more than once. @@ -1259,10 +1263,10 @@ func (ec *endCmds) done( } defer ec.move() // clear - // Update the timestamp cache. Each request within the batch is considered - // in turn; only those marked as affecting the cache are processed. However, - // only do so if the request is consistent and was operating on the - // leaseholder under a valid range lease. + // Update the timestamp cache. Each request within the batch is considered in + // turn; only those marked as affecting the cache are processed. 
However, only + // do so if the request is consistent and was operating on the leaseholder + // under a valid range lease. if ba.ReadConsistency == roachpb.CONSISTENT && ec.st.State == kvserverpb.LeaseState_VALID { ec.repl.updateTimestampCache(ctx, &ec.st, ba, br, pErr) } diff --git a/pkg/kv/kvserver/replica_test.go b/pkg/kv/kvserver/replica_test.go index 89418b8e753c..590388b1c442 100644 --- a/pkg/kv/kvserver/replica_test.go +++ b/pkg/kv/kvserver/replica_test.go @@ -2547,7 +2547,7 @@ func TestReplicaLatchingTimestampNonInterference(t *testing.T) { blockReader.Store(false) blockWriter.Store(false) blockCh := make(chan struct{}, 1) - blockedCh := make(chan struct{}, 1) + waitForRequestBlocked := make(chan struct{}, 1) tc := testContext{} tsc := TestStoreConfig(nil) @@ -2558,10 +2558,10 @@ func TestReplicaLatchingTimestampNonInterference(t *testing.T) { return nil } if filterArgs.Req.Method() == roachpb.Get && blockReader.Load().(bool) { - blockedCh <- struct{}{} + waitForRequestBlocked <- struct{}{} <-blockCh } else if filterArgs.Req.Method() == roachpb.Put && blockWriter.Load().(bool) { - blockedCh <- struct{}{} + waitForRequestBlocked <- struct{}{} <-blockCh } return nil @@ -2579,17 +2579,75 @@ func TestReplicaLatchingTimestampNonInterference(t *testing.T) { interferes bool }{ // Reader & writer have same timestamps. - {makeTS(1, 0), makeTS(1, 0), roachpb.Key("a"), true, true}, - {makeTS(1, 0), makeTS(1, 0), roachpb.Key("b"), false, true}, - // Reader has earlier timestamp. - {makeTS(1, 0), makeTS(1, 1), roachpb.Key("c"), true, false}, - {makeTS(1, 0), makeTS(1, 1), roachpb.Key("d"), false, false}, - // Writer has earlier timestamp. - {makeTS(1, 1), makeTS(1, 0), roachpb.Key("e"), true, true}, - {makeTS(1, 1), makeTS(1, 0), roachpb.Key("f"), false, true}, + // + // Reader goes first, but the reader does not need to hold latches during + // evaluation, so we expect no interference. 
+ { + readerTS: makeTS(1, 0), + writerTS: makeTS(1, 0), + key: roachpb.Key("a"), + readerFirst: true, + interferes: false, + }, + // Writer goes first, but the writer does need to hold latches during + // evaluation, so it should block the reader. + { + readerTS: makeTS(1, 0), + writerTS: makeTS(1, 0), + key: roachpb.Key("b"), + readerFirst: false, + interferes: true, + }, + // Reader has earlier timestamp, so it doesn't interfere with the write + // that's in its future. + { + readerTS: makeTS(1, 0), + writerTS: makeTS(1, 1), + key: roachpb.Key("c"), + readerFirst: true, + interferes: false, + }, + { + readerTS: makeTS(1, 0), + writerTS: makeTS(1, 1), + key: roachpb.Key("d"), + readerFirst: false, + interferes: false, + }, + // Writer has an earlier timestamp. We expect no interference for the writer + // as the reader will be evaluating over a pebble snapshot. We'd expect the + // writer to be able to continue without interference but to get bumped by + // the timestamp cache. + { + readerTS: makeTS(1, 1), + writerTS: makeTS(1, 0), + key: roachpb.Key("e"), + readerFirst: true, + interferes: false, + }, + // We expect the reader to block for the writer that's writing in the + // reader's past. + { + readerTS: makeTS(1, 1), + writerTS: makeTS(1, 0), + key: roachpb.Key("f"), + readerFirst: false, + interferes: true, + }, // Local keys always interfere. 
- {makeTS(1, 0), makeTS(1, 1), keys.RangeDescriptorKey(roachpb.RKey("a")), true, true}, - {makeTS(1, 0), makeTS(1, 1), keys.RangeDescriptorKey(roachpb.RKey("b")), false, true}, + { + readerTS: makeTS(1, 0), + writerTS: makeTS(1, 1), + key: keys.RangeDescriptorKey(roachpb.RKey("a")), + readerFirst: true, + interferes: true, + }, + { + readerTS: makeTS(1, 0), + writerTS: makeTS(1, 1), + key: keys.RangeDescriptorKey(roachpb.RKey("b")), + interferes: true, + }, } for _, test := range testCases { t.Run(fmt.Sprintf("%+v", test), func(t *testing.T) { @@ -2613,7 +2671,8 @@ func TestReplicaLatchingTimestampNonInterference(t *testing.T) { _, pErr := tc.Sender().Send(context.Background(), baR) errCh <- pErr }() - <-blockedCh + // Wait for the above read to get blocked on blockCh. + <-waitForRequestBlocked go func() { _, pErr := tc.Sender().Send(context.Background(), baW) errCh <- pErr @@ -2624,7 +2683,9 @@ func TestReplicaLatchingTimestampNonInterference(t *testing.T) { _, pErr := tc.Sender().Send(context.Background(), baW) errCh <- pErr }() - <-blockedCh + // Wait for the above write to get blocked on blockCh while it's holding + // latches. + <-waitForRequestBlocked go func() { _, pErr := tc.Sender().Send(context.Background(), baR) errCh <- pErr diff --git a/pkg/kv/kvserver/replica_tscache.go b/pkg/kv/kvserver/replica_tscache.go index 1f18b87a2996..b587fa0d37d2 100644 --- a/pkg/kv/kvserver/replica_tscache.go +++ b/pkg/kv/kvserver/replica_tscache.go @@ -51,9 +51,11 @@ func (r *Replica) addToTSCacheChecked( r.store.tsCache.Add(start, end, ts, txnID) } -// updateTimestampCache updates the timestamp cache in order to set a low water -// mark for the timestamp at which mutations to keys overlapping the provided -// request can write, such that they don't re-write history. 
+// updateTimestampCache updates the timestamp cache in order to set a low +// watermark for the timestamp at which mutations to keys overlapping the +// provided request can write, such that they don't re-write history. It can be +// called before or after a batch is done evaluating. A nil `br` indicates that +// this method is being called before the batch is done evaluating. func (r *Replica) updateTimestampCache( ctx context.Context, st *kvserverpb.LeaseStatus, @@ -75,6 +77,7 @@ func (r *Replica) updateTimestampCache( if ba.Txn != nil { txnID = ba.Txn.ID } + beforeEval := br == nil && pErr == nil for i, union := range ba.Requests { req := union.GetInner() if !roachpb.UpdatesTimestampCache(req) { @@ -220,13 +223,14 @@ func (r *Replica) updateTimestampCache( addToTSCache(start, end, ts, txnID) } case *roachpb.GetRequest: - if resume := resp.(*roachpb.GetResponse).ResumeSpan; resume != nil { + if !beforeEval && resp.(*roachpb.GetResponse).ResumeSpan != nil { // The request did not evaluate. Ignore it. continue } addToTSCache(start, end, ts, txnID) case *roachpb.ScanRequest: - if resume := resp.(*roachpb.ScanResponse).ResumeSpan; resume != nil { + if !beforeEval && resp.(*roachpb.ScanResponse).ResumeSpan != nil { + resume := resp.(*roachpb.ScanResponse).ResumeSpan if start.Equal(resume.Key) { // The request did not evaluate. Ignore it. continue @@ -238,7 +242,8 @@ func (r *Replica) updateTimestampCache( } addToTSCache(start, end, ts, txnID) case *roachpb.ReverseScanRequest: - if resume := resp.(*roachpb.ReverseScanResponse).ResumeSpan; resume != nil { + if !beforeEval && resp.(*roachpb.ReverseScanResponse).ResumeSpan != nil { + resume := resp.(*roachpb.ReverseScanResponse).ResumeSpan if end.Equal(resume.EndKey) { // The request did not evaluate. Ignore it. 
continue diff --git a/pkg/kv/kvserver/replica_write.go b/pkg/kv/kvserver/replica_write.go index 87a58723720c..3ce3b961df66 100644 --- a/pkg/kv/kvserver/replica_write.go +++ b/pkg/kv/kvserver/replica_write.go @@ -659,7 +659,7 @@ func (r *Replica) evaluateWriteBatchWrapper( ) (storage.Batch, *roachpb.BatchResponse, result.Result, *roachpb.Error) { batch, opLogger := r.newBatchedEngine(ba, g) now := timeutil.Now() - br, res, pErr := evaluateBatch(ctx, idKey, batch, rec, ms, ba, g, st, ui, false /* readOnly */) + br, res, pErr := evaluateBatch(ctx, idKey, batch, rec, ms, ba, g, st, ui, readWrite) r.store.metrics.ReplicaWriteBatchEvaluationLatency.RecordValue(timeutil.Since(now).Nanoseconds()) if pErr == nil { if opLogger != nil { diff --git a/pkg/kv/kvserver/store_test.go b/pkg/kv/kvserver/store_test.go index b868d803c672..e0f6ac083367 100644 --- a/pkg/kv/kvserver/store_test.go +++ b/pkg/kv/kvserver/store_test.go @@ -48,6 +48,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/storage" "github.com/cockroachdb/cockroach/pkg/storage/enginepb" "github.com/cockroachdb/cockroach/pkg/testutils" + "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" "github.com/cockroachdb/cockroach/pkg/testutils/skip" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/leaktest" @@ -1890,6 +1891,11 @@ func TestStoreReadInconsistent(t *testing.T) { // TestStoreScanResumeTSCache verifies that the timestamp cache is properly // updated when scans, reverse scan, and get requests return partial results // and a resume span. +// +// Requests that evaluate via the optimistic evaluation path will usually bump +// the tscache over just the keys that they ended up reading, whereas requests +// that evaluate via the pessimistic path will usually bump the tscache over the +// entire span of the request. 
func TestStoreScanResumeTSCache(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) @@ -1897,8 +1903,16 @@ func TestStoreScanResumeTSCache(t *testing.T) { ctx := context.Background() stopper := stop.NewStopper() defer stopper.Stop(ctx) - store, manualClock := createTestStore(ctx, t, testStoreOpts{createSystemRanges: true}, stopper) + manualClock := timeutil.NewManualTime(timeutil.Unix(0, 123)) + cfg := TestStoreConfig(hlc.NewClock(manualClock, time.Nanosecond) /* maxOffset */) + // This test expects different updates to the timestamp cache based on whether + // a given request was evaluated under `PessimisticEval` or `OptimisticEval`. + // If these requests hit any concurrency retry errors, they are typically + // retried under the pessimistic path. We disable this behavior here via this + // testing knob to avoid flakiness. + cfg.TestingKnobs.RetainEvalKindOnConcurrencyRetry = true + store := createTestStoreWithConfig(ctx, t, stopper, testStoreOpts{createSystemRanges: true}, &cfg) // Write three keys at time t0. t0 := timeutil.Unix(1, 0) manualClock.MustAdvanceTo(t0) @@ -1969,22 +1983,23 @@ func TestStoreScanResumeTSCache(t *testing.T) { t.Errorf("expected timestamp cache for \"a\" set less than %s; got %s", lt, a) } - // Scan the span using 3 Get requests at t3 with max keys and verify the - // expected resume spans. The two Get requests that are evaluated should be - // accounted for in the timestamp cache, but the third request which is not + // Scan the span using 2 Get requests at t3 with max keys and verify the + // expected resume spans. The one Get request that is evaluated should be + // accounted for in the timestamp cache, but the second request which is not // evaluated due to the key limit should not be accounted for. 
t3 := timeutil.Unix(4, 0) manualClock.MustAdvanceTo(t3) h.Timestamp = makeTS(t3.UnixNano(), 0) ba := roachpb.BatchRequest{} + h.MaxSpanRequestKeys = 1 ba.Header = h - ba.Add(getArgsString("a"), getArgsString("b"), getArgsString("c")) + ba.Add(getArgsString("a"), getArgsString("b")) br, pErr := store.TestSender().Send(ctx, ba) require.Nil(t, pErr) - require.Len(t, br.Responses, 3) + require.Len(t, br.Responses, 2) for i, ru := range br.Responses { resp := ru.GetGet() - if i < 2 { + if i == 0 { require.NotNil(t, resp.Value) require.Nil(t, resp.ResumeSpan) } else { @@ -1993,12 +2008,10 @@ func TestStoreScanResumeTSCache(t *testing.T) { } } - // Verify the timestamp cache has been set for "a" and "b", but not for "c". + // Verify the timestamp cache has been set for "a" but not for "b". rTS, _ = store.tsCache.GetMax(roachpb.Key("a"), nil) require.Equal(t, makeTS(t3.UnixNano(), 0), rTS) rTS, _ = store.tsCache.GetMax(roachpb.Key("b"), nil) - require.Equal(t, makeTS(t3.UnixNano(), 0), rTS) - rTS, _ = store.tsCache.GetMax(roachpb.Key("c"), nil) require.Equal(t, makeTS(t2.UnixNano(), 0), rTS) } @@ -2108,8 +2121,8 @@ func TestStoreScanIntents(t *testing.T) { expFinish bool // do we expect the scan to finish? expCount int32 // how many times do we expect to scan? }{ - // Consistent which can push will make two loops. - {true, true, true, 2}, + // Consistent which can push will detect conflicts and resolve them. + {true, true, true, 1}, // Consistent but can't push will backoff and retry and not finish. {true, false, false, -1}, // Inconsistent and can push will make one loop, with async resolves. @@ -2197,6 +2210,108 @@ func TestStoreScanIntents(t *testing.T) { } } +// TestStoreScanIntentsRespectsLimit verifies that when reads are allowed to +// resolve their conflicts before eval (i.e. when they are allowed to drop their +// latches early), the scan for conflicting intents respects the max intent +// limits. 
+// +// The test proceeds as follows: a writer lays down more than +// `MaxIntentsPerWriteIntentErrorDefault` intents, and a reader is expected to +// encounter these intents and raise a `WriteIntentError` with exactly +// `MaxIntentsPerWriteIntentErrorDefault` intents in the error. +func TestStoreScanIntentsRespectsLimit(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + ctx := context.Background() + + var interceptWriteIntentErrors atomic.Value + // `commitCh` is used to block the writer from committing until the reader has + // encountered the intents laid down by the writer. + commitCh := make(chan struct{}) + // intentsLaidDownCh is signalled when the writer is done laying down intents. + intentsLaidDownCh := make(chan struct{}) + tc := serverutils.StartNewTestCluster(t, 1, base.TestClusterArgs{ + ServerArgs: base.TestServerArgs{ + Knobs: base.TestingKnobs{ + Store: &StoreTestingKnobs{ + TestingConcurrencyRetryFilter: func( + ctx context.Context, ba roachpb.BatchRequest, pErr *roachpb.Error, + ) { + if errors.HasType(pErr.GoError(), (*roachpb.WriteIntentError)(nil)) { + // Assert that the WriteIntentError carries exactly MaxIntentsPerWriteIntentErrorDefault intents. + if trap := interceptWriteIntentErrors.Load(); trap != nil && trap.(bool) { + require.Equal( + t, storage.MaxIntentsPerWriteIntentErrorDefault, + len(pErr.GetDetail().(*roachpb.WriteIntentError).Intents), + ) + interceptWriteIntentErrors.Store(false) + // Allow the writer to commit. + t.Logf("allowing writer to commit") + close(commitCh) + } + } + }, + }, + }, + }, + }) + defer tc.Stopper().Stop(ctx) + + store, err := tc.Server(0).GetStores().(*Stores).GetStore(tc.Server(0).GetFirstStoreID()) + require.NoError(t, err) + var intentKeys []roachpb.Key + var wg sync.WaitGroup + wg.Add(2) + + // Lay down more than `MaxIntentsPerWriteIntentErrorDefault` intents.
+ go func() { + defer wg.Done() + txn := newTransaction( + "test", roachpb.Key("test-key"), roachpb.NormalUserPriority, tc.Server(0).Clock(), + ) + for j := 0; j < storage.MaxIntentsPerWriteIntentErrorDefault+10; j++ { + var key roachpb.Key + key = append(key, keys.ScratchRangeMin...) + key = append(key, []byte(fmt.Sprintf("%d", j))...) + intentKeys = append(intentKeys, key) + args := putArgs(key, []byte(fmt.Sprintf("value%07d", j))) + _, pErr := kv.SendWrappedWith(ctx, store.TestSender(), roachpb.Header{Txn: txn}, &args) + require.Nil(t, pErr) + } + intentsLaidDownCh <- struct{}{} + <-commitCh // Wait for the test to tell us to commit the txn. + args, header := endTxnArgs(txn, true /* commit */) + _, pErr := kv.SendWrappedWith(ctx, store.TestSender(), header, &args) + require.Nil(t, pErr) + }() + + select { + case <-intentsLaidDownCh: + case <-time.After(testutils.DefaultSucceedsSoonDuration): + t.Fatal("timed out waiting for intents to be laid down") + } + + // Now, expect a conflicting reader to encounter the intents and raise a + // WriteIntentError with exactly `MaxIntentsPerWriteIntentErrorDefault` + // intents. See the TestingConcurrencyRetryFilter above. 
+ var ba kv.Batch + for i := 0; i < storage.MaxIntentsPerWriteIntentErrorDefault+10; i += 10 { + for _, key := range intentKeys[i : i+10] { + args := getArgs(key) + ba.AddRawRequest(&args) + } + } + t.Logf("issuing gets while intercepting WriteIntentErrors") + interceptWriteIntentErrors.Store(true) + go func() { + defer wg.Done() + err := store.DB().Run(ctx, &ba) + require.NoError(t, err) + }() + + wg.Wait() +} + // TestStoreScanInconsistentResolvesIntents lays down 10 intents, // commits the txn without resolving intents, then does repeated // inconsistent reads until the data shows up, showing that the diff --git a/pkg/kv/kvserver/testing_knobs.go b/pkg/kv/kvserver/testing_knobs.go index 2ab292632729..af9d24e8387a 100644 --- a/pkg/kv/kvserver/testing_knobs.go +++ b/pkg/kv/kvserver/testing_knobs.go @@ -89,6 +89,14 @@ type StoreTestingKnobs struct { // error returned to the client, or to simulate network failures. TestingResponseFilter kvserverbase.ReplicaResponseFilter + // RetainEvalKindOnConcurrencyRetry disables the behavior to mark requests as + // `PessimisticEval` when they hit concurrency retry errors. + RetainEvalKindOnConcurrencyRetry bool + + // DisableDroppingLatchesBeforeEval makes it such that no read requests will + // attempt to resolve their conflicts upfront and drop latches before eval. + DisableDroppingLatchesBeforeEval bool + // SlowReplicationThresholdOverride is an interceptor that allows setting a // per-Batch SlowReplicationThreshold. SlowReplicationThresholdOverride func(ba *roachpb.BatchRequest) time.Duration diff --git a/pkg/storage/mvcc.go b/pkg/storage/mvcc.go index 66fc8c4e1108..1c8e551d15ec 100644 --- a/pkg/storage/mvcc.go +++ b/pkg/storage/mvcc.go @@ -54,12 +54,12 @@ const ( // minimum total for a single store node must be under 2048 for Windows // compatibility.
MinimumMaxOpenFiles = 1700 - // Default value for maximum number of intents reported by ExportToSST - // and Scan operations in WriteIntentError is set to half of the maximum - // lock table size. - // This value is subject to tuning in real environment as we have more - // data available. - maxIntentsPerWriteIntentErrorDefault = 5000 + // MaxIntentsPerWriteIntentErrorDefault is the default value for maximum + // number of intents reported by ExportToSST and Scan operations in + // WriteIntentError. It is set to half of the maximum lock table size. + // This value is subject to tuning in real environment as we have more data + // available. + MaxIntentsPerWriteIntentErrorDefault = 5000 ) var minWALSyncInterval = settings.RegisterDurationSetting( @@ -76,7 +76,8 @@ var MaxIntentsPerWriteIntentError = settings.RegisterIntSetting( settings.TenantWritable, "storage.mvcc.max_intents_per_error", "maximum number of intents returned in error during export of scan requests", - maxIntentsPerWriteIntentErrorDefault) + MaxIntentsPerWriteIntentErrorDefault, +) var rocksdbConcurrency = envutil.EnvOrDefaultInt( "COCKROACH_ROCKSDB_CONCURRENCY", func() int { @@ -866,6 +867,9 @@ type MVCCGetOptions struct { // LockTable is used to determine whether keys are locked in the in-memory // lock table when scanning with the SkipLocked option. LockTable LockTableView + // DontInterleaveIntents, when set, makes it such that intent metadata is not + // interleaved with the results of the scan.
+ DontInterleaveIntents bool } func (opts *MVCCGetOptions) validate() error { @@ -878,6 +882,9 @@ func (opts *MVCCGetOptions) validate() error { if opts.Inconsistent && opts.FailOnMoreRecent { return errors.Errorf("cannot allow inconsistent reads with fail on more recent option") } + if opts.DontInterleaveIntents && opts.SkipLocked { + return errors.Errorf("cannot disable interleaved intents with skip locked option") + } return nil } @@ -886,12 +893,16 @@ func (opts *MVCCGetOptions) errOnIntents() bool { } // newMVCCIterator sets up a suitable iterator for high-level MVCC operations -// operating at the given timestamp. If timestamp is empty, the iterator is -// considered to be used for inline values, disabling intents and range keys. -// If rangeKeyMasking is true, IterOptions.RangeKeyMaskingBelow is set to the -// given timestamp. +// operating at the given timestamp. If timestamp is empty or if +// `noInterleavedIntents` is set, the iterator is considered to be used for +// inline values, disabling intents and range keys. If rangeKeyMasking is true, +// IterOptions.RangeKeyMaskingBelow is set to the given timestamp. func newMVCCIterator( - reader Reader, timestamp hlc.Timestamp, rangeKeyMasking bool, opts IterOptions, + reader Reader, + timestamp hlc.Timestamp, + rangeKeyMasking bool, + noInterleavedIntents bool, + opts IterOptions, ) MVCCIterator { // If reading inline then just return a plain MVCCIterator without intents. 
// However, we allow the caller to enable range keys, since they may be needed @@ -904,7 +915,11 @@ func newMVCCIterator( opts.RangeKeyMaskingBelow.IsEmpty() { opts.RangeKeyMaskingBelow = timestamp } - return reader.NewMVCCIterator(MVCCKeyAndIntentsIterKind, opts) + iterKind := MVCCKeyAndIntentsIterKind + if noInterleavedIntents { + iterKind = MVCCKeyIterKind + } + return reader.NewMVCCIterator(iterKind, opts) } // MVCCGet returns the most recent value for the specified key whose timestamp @@ -948,10 +963,12 @@ func newMVCCIterator( func MVCCGet( ctx context.Context, reader Reader, key roachpb.Key, timestamp hlc.Timestamp, opts MVCCGetOptions, ) (*roachpb.Value, *roachpb.Intent, error) { - iter := newMVCCIterator(reader, timestamp, false /* rangeKeyMasking */, IterOptions{ - KeyTypes: IterKeyTypePointsAndRanges, - Prefix: true, - }) + iter := newMVCCIterator( + reader, timestamp, false /* rangeKeyMasking */, opts.DontInterleaveIntents, IterOptions{ + KeyTypes: IterKeyTypePointsAndRanges, + Prefix: true, + }, + ) defer iter.Close() value, intent, err := mvccGet(ctx, iter, key, timestamp, opts) return value.ToPointer(), intent, err @@ -1291,10 +1308,12 @@ func MVCCPut( var iter MVCCIterator blind := ms == nil && timestamp.IsEmpty() if !blind { - iter = newMVCCIterator(rw, timestamp, false /* rangeKeyMasking */, IterOptions{ - KeyTypes: IterKeyTypePointsAndRanges, - Prefix: true, - }) + iter = newMVCCIterator( + rw, timestamp, false /* rangeKeyMasking */, false /* noInterleavedIntents */, IterOptions{ + KeyTypes: IterKeyTypePointsAndRanges, + Prefix: true, + }, + ) defer iter.Close() } return mvccPutUsingIter(ctx, rw, iter, ms, key, timestamp, localTimestamp, value, txn, nil) @@ -1341,10 +1360,12 @@ func MVCCDelete( localTimestamp hlc.ClockTimestamp, txn *roachpb.Transaction, ) (foundKey bool, err error) { - iter := newMVCCIterator(rw, timestamp, false /* rangeKeyMasking */, IterOptions{ - KeyTypes: IterKeyTypePointsAndRanges, - Prefix: true, - }) + iter := 
newMVCCIterator( + rw, timestamp, false /* rangeKeyMasking */, false /* noInterleavedIntents */, IterOptions{ + KeyTypes: IterKeyTypePointsAndRanges, + Prefix: true, + }, + ) defer iter.Close() // TODO(yuzefovich): can we avoid the actual put if foundKey is false? @@ -2051,10 +2072,12 @@ func MVCCIncrement( txn *roachpb.Transaction, inc int64, ) (int64, error) { - iter := newMVCCIterator(rw, timestamp, false /* rangeKeyMasking */, IterOptions{ - KeyTypes: IterKeyTypePointsAndRanges, - Prefix: true, - }) + iter := newMVCCIterator( + rw, timestamp, false /* rangeKeyMasking */, false /* noInterleavedIntents */, IterOptions{ + KeyTypes: IterKeyTypePointsAndRanges, + Prefix: true, + }, + ) defer iter.Close() var int64Val int64 @@ -2128,10 +2151,12 @@ func MVCCConditionalPut( allowIfDoesNotExist CPutMissingBehavior, txn *roachpb.Transaction, ) error { - iter := newMVCCIterator(rw, timestamp, false /* rangeKeyMasking */, IterOptions{ - KeyTypes: IterKeyTypePointsAndRanges, - Prefix: true, - }) + iter := newMVCCIterator( + rw, timestamp, false /* rangeKeyMasking */, false /* noInterleavedIntents */, IterOptions{ + KeyTypes: IterKeyTypePointsAndRanges, + Prefix: true, + }, + ) defer iter.Close() return mvccConditionalPutUsingIter( @@ -2213,10 +2238,12 @@ func MVCCInitPut( failOnTombstones bool, txn *roachpb.Transaction, ) error { - iter := newMVCCIterator(rw, timestamp, false /* rangeKeyMasking */, IterOptions{ - KeyTypes: IterKeyTypePointsAndRanges, - Prefix: true, - }) + iter := newMVCCIterator( + rw, timestamp, false /* rangeKeyMasking */, false /* noInterleavedIntents */, IterOptions{ + KeyTypes: IterKeyTypePointsAndRanges, + Prefix: true, + }, + ) defer iter.Close() return mvccInitPutUsingIter(ctx, rw, iter, ms, key, timestamp, localTimestamp, value, failOnTombstones, txn) } @@ -2811,10 +2838,12 @@ func MVCCDeleteRange( buf := newPutBuffer() defer buf.release() - iter := newMVCCIterator(rw, timestamp, false /* rangeKeyMasking */, IterOptions{ - KeyTypes: 
IterKeyTypePointsAndRanges, - Prefix: true, - }) + iter := newMVCCIterator( + rw, timestamp, false /* rangeKeyMasking */, false /* noInterleavedIntents */, IterOptions{ + KeyTypes: IterKeyTypePointsAndRanges, + Prefix: true, + }, + ) defer iter.Close() var keys []roachpb.Key @@ -2971,10 +3000,12 @@ func MVCCPredicateDeleteRange( // Create some reusable machinery for flushing a run with point tombstones // that is typically used in a single MVCCPut call. - pointTombstoneIter := newMVCCIterator(rw, endTime, false /* rangeKeyMasking */, IterOptions{ - KeyTypes: IterKeyTypePointsAndRanges, - Prefix: true, - }) + pointTombstoneIter := newMVCCIterator( + rw, endTime, false /* rangeKeyMasking */, false /* noInterleavedIntents */, IterOptions{ + KeyTypes: IterKeyTypePointsAndRanges, + Prefix: true, + }, + ) defer pointTombstoneIter.Close() pointTombstoneBuf := newPutBuffer() defer pointTombstoneBuf.release() @@ -3617,6 +3648,9 @@ type MVCCScanOptions struct { // LockTable is used to determine whether keys are locked in the in-memory // lock table when scanning with the SkipLocked option. LockTable LockTableView + // DontInterleaveIntents, when set, will make it such that intent metadata is + // not interleaved with the results of the scan. 
+ DontInterleaveIntents bool } func (opts *MVCCScanOptions) validate() error { @@ -3629,6 +3663,9 @@ func (opts *MVCCScanOptions) validate() error { if opts.Inconsistent && opts.FailOnMoreRecent { return errors.Errorf("cannot allow inconsistent reads with fail on more recent option") } + if opts.DontInterleaveIntents && opts.SkipLocked { + return errors.Errorf("cannot disable interleaved intents with skip locked option") + } return nil } @@ -3715,11 +3752,13 @@ func MVCCScan( timestamp hlc.Timestamp, opts MVCCScanOptions, ) (MVCCScanResult, error) { - iter := newMVCCIterator(reader, timestamp, !opts.Tombstones, IterOptions{ - KeyTypes: IterKeyTypePointsAndRanges, - LowerBound: key, - UpperBound: endKey, - }) + iter := newMVCCIterator( + reader, timestamp, !opts.Tombstones, opts.DontInterleaveIntents, IterOptions{ + KeyTypes: IterKeyTypePointsAndRanges, + LowerBound: key, + UpperBound: endKey, + }, + ) defer iter.Close() return mvccScanToKvs(ctx, iter, key, endKey, timestamp, opts) } @@ -3732,11 +3771,13 @@ func MVCCScanToBytes( timestamp hlc.Timestamp, opts MVCCScanOptions, ) (MVCCScanResult, error) { - iter := newMVCCIterator(reader, timestamp, !opts.Tombstones, IterOptions{ - KeyTypes: IterKeyTypePointsAndRanges, - LowerBound: key, - UpperBound: endKey, - }) + iter := newMVCCIterator( + reader, timestamp, !opts.Tombstones, opts.DontInterleaveIntents, IterOptions{ + KeyTypes: IterKeyTypePointsAndRanges, + LowerBound: key, + UpperBound: endKey, + }, + ) defer iter.Close() return mvccScanToBytes(ctx, iter, key, endKey, timestamp, opts) } @@ -3781,11 +3822,13 @@ func MVCCIterate( opts MVCCScanOptions, f func(roachpb.KeyValue) error, ) ([]roachpb.Intent, error) { - iter := newMVCCIterator(reader, timestamp, !opts.Tombstones, IterOptions{ - KeyTypes: IterKeyTypePointsAndRanges, - LowerBound: key, - UpperBound: endKey, - }) + iter := newMVCCIterator( + reader, timestamp, !opts.Tombstones, opts.DontInterleaveIntents, IterOptions{ + KeyTypes: IterKeyTypePointsAndRanges, + 
LowerBound: key, + UpperBound: endKey, + }, + ) defer iter.Close() var intents []roachpb.Intent