From c661e1ea386db68bfb7bc1223dc812b0370795dc Mon Sep 17 00:00:00 2001 From: Kai Sun Date: Thu, 23 Feb 2023 18:18:33 -0500 Subject: [PATCH] storage: Push pagination for EndTxn into the MVCC layer In this PR, we push key pagination for EndTxn (currently located in cmd_end_transaction.go) into the MVCC layer (mvcc.go) by integrating MVCCPagination from the previous commit into the EndTxn command. Informs: #77228 Release note: None --- .../kvserver/batcheval/cmd_end_transaction.go | 179 ++++++++++-------- 1 file changed, 98 insertions(+), 81 deletions(-) diff --git a/pkg/kv/kvserver/batcheval/cmd_end_transaction.go b/pkg/kv/kvserver/batcheval/cmd_end_transaction.go index e83b195a8413..0eb04d40b852 100644 --- a/pkg/kv/kvserver/batcheval/cmd_end_transaction.go +++ b/pkg/kv/kvserver/batcheval/cmd_end_transaction.go @@ -13,7 +13,6 @@ package batcheval import ( "bytes" "context" - "math" "sync/atomic" "time" @@ -33,6 +32,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/storage/enginepb" "github.com/cockroachdb/cockroach/pkg/util" "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/cockroach/pkg/util/iterutil" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/errors" "github.com/cockroachdb/redact" @@ -539,6 +539,28 @@ func resolveLocalLocks( args *kvpb.EndTxnRequest, txn *roachpb.Transaction, evalCtx EvalContext, +) (resolvedLocks []roachpb.LockUpdate, externalLocks []roachpb.Span, _ error) { + var resolveAllowance int64 = lockResolutionBatchSize + if args.InternalCommitTrigger != nil { + // If this is a system transaction (such as a split or merge), don't + // enforce the resolve allowance. These transactions rely on having + // their locks resolved synchronously. + resolveAllowance = 0 + } + return resolveLocalLocksWithPagination(ctx, desc, readWriter, ms, args, txn, evalCtx, resolveAllowance) +} + +// resolveLocalLocksWithPagination is resolveLocalLocks but with a max key +// limit. +func resolveLocalLocksWithPagination( + ctx context.Context, + desc *roachpb.RangeDescriptor, + readWriter storage.ReadWriter, + ms *enginepb.MVCCStats, + args *kvpb.EndTxnRequest, + txn *roachpb.Transaction, + evalCtx EvalContext, + maxKeys int64, ) (resolvedLocks []roachpb.LockUpdate, externalLocks []roachpb.Span, _ error) { if mergeTrigger := args.InternalCommitTrigger.GetMergeTrigger(); mergeTrigger != nil { // If this is a merge, then use the post-merge descriptor to determine @@ -547,95 +569,90 @@ func resolveLocalLocks( desc = &mergeTrigger.LeftDesc } - var resolveAllowance int64 = lockResolutionBatchSize - if args.InternalCommitTrigger != nil { - // If this is a system transaction (such as a split or merge), don't enforce the resolve allowance. - // These transactions rely on having their locks resolved synchronously. - resolveAllowance = math.MaxInt64 - } - - for _, span := range args.LockSpans { - if err := func() error { - if resolveAllowance == 0 { + remainingLockSpans := args.LockSpans + var span roachpb.Span + f := func(maxKeys, targetBytes int64) (numKeys int64, numBytes int64, resumeReason kvpb.ResumeReason, err error) { + if len(remainingLockSpans) == 0 { + return 0, 0, 0, iterutil.StopIteration() + } + span = remainingLockSpans[0] + remainingLockSpans = remainingLockSpans[1:] + update := roachpb.MakeLockUpdate(txn, span) + if len(span.EndKey) == 0 { + // For single-key lock updates, do a KeyAddress-aware check of + // whether it's contained in our Range. + if !kvserverbase.ContainsKey(desc, span.Key) { externalLocks = append(externalLocks, span) - return nil + return 0, 0, 0, nil } - update := roachpb.MakeLockUpdate(txn, span) - if len(span.EndKey) == 0 { - // For single-key lock updates, do a KeyAddress-aware check of - // whether it's contained in our Range. - if !kvserverbase.ContainsKey(desc, span.Key) { - externalLocks = append(externalLocks, span) - return nil - } - // It may be tempting to reuse an iterator here, but this call - // can create the iterator with Prefix:true which is much faster - // than seeking -- especially for intents that are missing, e.g. - // due to async intent resolution. See: - // https://github.com/cockroachdb/cockroach/issues/64092 - // - // Note that the underlying pebbleIterator will still be reused - // since readWriter is a pebbleBatch in the typical case. - ok, _, _, err := storage.MVCCResolveWriteIntent(ctx, readWriter, ms, update, - storage.MVCCResolveWriteIntentOptions{}) - if err != nil { - return err - } - if ok { - resolveAllowance-- - } - resolvedLocks = append(resolvedLocks, update) - // If requested, replace point tombstones with range tombstones. - if ok && evalCtx.EvalKnobs().UseRangeTombstonesForPointDeletes { - if err := storage.ReplacePointTombstonesWithRangeTombstones( - ctx, spanset.DisableReadWriterAssertions(readWriter), - ms, update.Key, update.EndKey); err != nil { - return err - } - } - return nil + // It may be tempting to reuse an iterator here, but this call + // can create the iterator with Prefix:true which is much faster + // than seeking -- especially for intents that are missing, e.g. + // due to async intent resolution. See: + // https://github.com/cockroachdb/cockroach/issues/64092 + // + // Note that the underlying pebbleIterator will still be reused + // since readWriter is a pebbleBatch in the typical case. + ok, _, _, err := storage.MVCCResolveWriteIntent(ctx, readWriter, ms, update, + storage.MVCCResolveWriteIntentOptions{}) + if err != nil { + return 0, 0, 0, err } - // For update ranges, cut into parts inside and outside our key - // range. Resolve locally inside, delegate the rest. In particular, - // an update range for range-local data is correctly considered local. - inSpan, outSpans := kvserverbase.IntersectSpan(span, desc) - externalLocks = append(externalLocks, outSpans...) - if inSpan != nil { - update.Span = *inSpan - numKeys, _, resumeSpan, _, err := storage.MVCCResolveWriteIntentRange(ctx, readWriter, ms, update, - storage.MVCCResolveWriteIntentRangeOptions{MaxKeys: resolveAllowance}) - if err != nil { - return err - } - if evalCtx.EvalKnobs().NumKeysEvaluatedForRangeIntentResolution != nil { - atomic.AddInt64(evalCtx.EvalKnobs().NumKeysEvaluatedForRangeIntentResolution, numKeys) - } - resolveAllowance -= numKeys - if resumeSpan != nil { - if resolveAllowance != 0 { - log.Fatalf(ctx, "expected resolve allowance to be exactly 0 resolving %s; got %d", update.Span, resolveAllowance) - } - update.EndKey = resumeSpan.Key - externalLocks = append(externalLocks, *resumeSpan) + if ok { + numKeys = 1 + } + resolvedLocks = append(resolvedLocks, update) + // If requested, replace point tombstones with range tombstones. + if ok && evalCtx.EvalKnobs().UseRangeTombstonesForPointDeletes { + if err := storage.ReplacePointTombstonesWithRangeTombstones( + ctx, spanset.DisableReadWriterAssertions(readWriter), + ms, update.Key, update.EndKey); err != nil { + return 0, 0, 0, err } - resolvedLocks = append(resolvedLocks, update) - // If requested, replace point tombstones with range tombstones. - if evalCtx.EvalKnobs().UseRangeTombstonesForPointDeletes { - if err := storage.ReplacePointTombstonesWithRangeTombstones( - ctx, spanset.DisableReadWriterAssertions(readWriter), - ms, update.Key, update.EndKey); err != nil { - return err - } + } + return numKeys, 0, 0, nil + } + // For update ranges, cut into parts inside and outside our key + // range. Resolve locally inside, delegate the rest. In particular, + // an update range for range-local data is correctly considered local. + inSpan, outSpans := kvserverbase.IntersectSpan(span, desc) + externalLocks = append(externalLocks, outSpans...) + if inSpan != nil { + update.Span = *inSpan + numKeys, _, resumeSpan, resumeReason, err := storage.MVCCResolveWriteIntentRange(ctx, readWriter, ms, update, + storage.MVCCResolveWriteIntentRangeOptions{MaxKeys: maxKeys}) + if err != nil { + return 0, 0, 0, err + } + if evalCtx.EvalKnobs().NumKeysEvaluatedForRangeIntentResolution != nil { + atomic.AddInt64(evalCtx.EvalKnobs().NumKeysEvaluatedForRangeIntentResolution, numKeys) + } + if resumeSpan != nil { + update.EndKey = resumeSpan.Key + externalLocks = append(externalLocks, *resumeSpan) + } + resolvedLocks = append(resolvedLocks, update) + // If requested, replace point tombstones with range tombstones. + if evalCtx.EvalKnobs().UseRangeTombstonesForPointDeletes { + if err := storage.ReplacePointTombstonesWithRangeTombstones( + ctx, spanset.DisableReadWriterAssertions(readWriter), + ms, update.Key, update.EndKey); err != nil { + return 0, 0, 0, err } - return nil } - return nil - }(); err != nil { - return nil, nil, errors.Wrapf(err, "resolving lock at %s on end transaction [%s]", span, txn.Status) + return numKeys, 0, resumeReason, nil } + return 0, 0, 0, nil } - removedAny := resolveAllowance != lockResolutionBatchSize + numKeys, _, _, err := storage.MVCCPaginate(ctx, maxKeys, 0, false, f) + if err != nil { + return nil, nil, errors.Wrapf(err, "resolving lock at %s on end transaction [%s]", span, txn.Status) + } + + externalLocks = append(externalLocks, remainingLockSpans...) + + removedAny := numKeys > 0 if WriteAbortSpanOnResolve(txn.Status, args.Poison, removedAny) { if err := UpdateAbortSpan(ctx, evalCtx, readWriter, ms, txn.TxnMeta, args.Poison); err != nil { return nil, nil, err