From 5401286b8995c9ce2f1383806d1ea083629994a5 Mon Sep 17 00:00:00 2001
From: Kai Sun
Date: Mon, 9 Jan 2023 11:40:39 -0500
Subject: [PATCH 1/2] storage: Add MVCCPaginate to allow general key and byte pagination

Implement MVCCPaginate, which calls a caller-supplied function f() until
it returns an error (where the iterutil.StopIteration() error means we
have iterated through all elements), or until the number of keys hits the
maxKeys limit or the number of bytes hits the targetBytes limit.
MVCCPaginate provides a general framework for key and byte pagination.

Informs: #77228

Release note: None
---
 pkg/storage/mvcc.go      |  73 ++++++++++++++++++++++++
 pkg/storage/mvcc_test.go | 118 +++++++++++++++++++++++++++++++++++++++
 2 files changed, 191 insertions(+)

diff --git a/pkg/storage/mvcc.go b/pkg/storage/mvcc.go
index 89c4a283d6e1..07ac1db2c773 100644
--- a/pkg/storage/mvcc.go
+++ b/pkg/storage/mvcc.go
@@ -4089,6 +4089,79 @@ func MVCCIterate(
 	return intents, nil
 }
 
+// MVCCPaginate iteratively invokes f() with the current maxKeys and
+// targetBytes limits. If f returns iterutil.StopIteration (meaning that we
+// have iterated through all elements), the iteration stops with no error
+// propagated. If f returns any other error, the iteration stops and the
+// error is propagated. If the number of keys hits the maxKeys limit or the
+// number of bytes hits the targetBytes limit, the iteration stops with no
+// error propagated but with the appropriate resume reason returned. f may
+// also return a resumeReason; if it does, MVCCPaginate asserts that the key
+// or byte limit matching that resumeReason has in fact been hit. The one
+// exception: if resumeReason is RESUME_BYTE_LIMIT and allowEmpty is true,
+// the iteration stops with no error propagated and RESUME_BYTE_LIMIT
+// returned, without asserting that the byte limit was hit.
+//
+// Note that it is up to f() whether to allow numBytes to exceed targetBytes
+// by up to one entry, or to terminate iteration before numBytes exceeds
+// targetBytes. See the AllowEmpty option.
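+//
+// As an illustrative usage sketch (scanPage is a hypothetical callback, not
+// part of this change), a caller might drive a paginated scan like so:
+//
+//	numKeys, numBytes, resumeReason, err := MVCCPaginate(ctx,
+//		100 /* maxKeys */, 1<<20 /* targetBytes */, false /* allowEmpty */,
+//		func(maxKeys, targetBytes int64) (int64, int64, kvpb.ResumeReason, error) {
+//			// Consume at most maxKeys keys / targetBytes bytes, report how
+//			// much was consumed, and return iterutil.StopIteration() once the
+//			// underlying data is exhausted.
+//			return scanPage(maxKeys, targetBytes)
+//		})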
+func MVCCPaginate( + ctx context.Context, + maxKeys, targetBytes int64, + allowEmpty bool, + f func(maxKeys, targetBytes int64) (numKeys, numBytes int64, resumeReason kvpb.ResumeReason, err error), +) (numKeys, numBytes int64, resumeReason kvpb.ResumeReason, err error) { + for { + if maxKeys < 0 { + return numKeys, numBytes, kvpb.RESUME_KEY_LIMIT, nil + } + if targetBytes < 0 { + return numKeys, numBytes, kvpb.RESUME_BYTE_LIMIT, nil + } + addedKeys, addedBytes, resumeReason, err := f(maxKeys, targetBytes) + if err != nil { + if addedKeys != 0 || addedBytes != 0 || resumeReason != 0 { + log.Fatalf(ctx, + "addedKeys, addedBytes, and resumeReason should all be 0, but got addedKeys=%d, addedBytes=%d, resumeReason=%d", + addedKeys, addedBytes, resumeReason) + } + err = iterutil.Map(err) + return numKeys, numBytes, 0, err + } + numKeys += addedKeys + numBytes += addedBytes + if maxKeys > 0 { + if addedKeys > maxKeys { + log.Fatalf(ctx, "added %d keys, which exceeds the max key limit %d", addedKeys, maxKeys) + } else if addedKeys < maxKeys { + maxKeys -= addedKeys + } else { + maxKeys = -1 + } + } + if targetBytes > 0 { + if addedBytes < targetBytes { + targetBytes -= addedBytes + } else { + targetBytes = -1 + } + } + switch resumeReason { + case kvpb.RESUME_KEY_LIMIT: + if maxKeys >= 0 { + log.Fatalf(ctx, "Resume reason RESUME_KEY_LIMIT, but key limit = %d has not been hit", maxKeys) + } + case kvpb.RESUME_BYTE_LIMIT: + if !allowEmpty && targetBytes >= 0 { + log.Fatalf(ctx, "Resume reason RESUME_BYTE_LIMIT, but byte limit = %d has not been hit", targetBytes) + } + targetBytes = -1 + case 0: + default: + log.Fatalf(ctx, "Resume reason must be RESUME_KEY_LIMIT, RESUME_BYTE_LIMIT, or 0, got resumeReason = %d", resumeReason) + } + } +} + // MVCCResolveWriteIntent either commits, aborts (rolls back), or moves forward // in time an extant write intent for a given txn according to commit // parameter. ResolveWriteIntent will skip write intents of other txns. diff --git a/pkg/storage/mvcc_test.go b/pkg/storage/mvcc_test.go index 2f202c82936a..a0d29be6be61 100644 --- a/pkg/storage/mvcc_test.go +++ b/pkg/storage/mvcc_test.go @@ -37,6 +37,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/admission" "github.com/cockroachdb/cockroach/pkg/util/encoding" "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/cockroach/pkg/util/iterutil" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/protoutil" @@ -2674,6 +2675,123 @@ func TestMVCCResolveNewerIntent(t *testing.T) { } } +// TestMVCCPaginate tests that MVCCPaginate respects the MaxKeys and +// TargetBytes limits, and returns the correct numKeys, numBytes, and +// resumeReason. +func TestMVCCPaginate(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + testCases := []struct { + maxKeys int64 + targetBytes int64 + allowEmpty bool + numKeysPerIter int64 + numBytesPerIter int64 + numIters int + expectedNumKeys int64 + expectedNumBytes int64 + expectedResumeReason kvpb.ResumeReason + }{ + // MaxKeys and TargetBytes limits not reached, so do all 10 iterations. + { + maxKeys: 31, + targetBytes: 51, + allowEmpty: false, + numKeysPerIter: 3, + numBytesPerIter: 5, + numIters: 10, + expectedNumKeys: 30, + expectedNumBytes: 50, + expectedResumeReason: 0, + }, + // MaxKeys limit reached after 7 iterations. 
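+		// (3 keys/iter * 7 iters = 21 keys hits the MaxKeys limit exactly,
+		// while 5 bytes/iter * 7 iters = 35 bytes stays below the 51-byte
+		// TargetBytes limit.)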
+		{
+			maxKeys:              21,
+			targetBytes:          51,
+			allowEmpty:           false,
+			numKeysPerIter:       3,
+			numBytesPerIter:      5,
+			numIters:             10,
+			expectedNumKeys:      21,
+			expectedNumBytes:     35,
+			expectedResumeReason: kvpb.RESUME_KEY_LIMIT,
+		},
+		// MaxKeys limit reached after 10 iterations. Despite the fact that we
+		// finished iterating, we still return a resume reason, because the
+		// MaxKeys and TargetBytes limits are checked before we check whether
+		// iteration has stopped. (Both limits are hit exactly here; the key
+		// limit wins because it is checked first.)
+		{
+			maxKeys:              30,
+			targetBytes:          50,
+			allowEmpty:           false,
+			numKeysPerIter:       3,
+			numBytesPerIter:      5,
+			numIters:             10,
+			expectedNumKeys:      30,
+			expectedNumBytes:     50,
+			expectedResumeReason: kvpb.RESUME_KEY_LIMIT,
+		},
+		// TargetBytes limit reached after 7 iterations.
+		{
+			maxKeys:              31,
+			targetBytes:          34,
+			allowEmpty:           false,
+			numKeysPerIter:       3,
+			numBytesPerIter:      5,
+			numIters:             10,
+			expectedNumKeys:      21,
+			expectedNumBytes:     35,
+			expectedResumeReason: kvpb.RESUME_BYTE_LIMIT,
+		},
+		// TargetBytes limit reached after 7 iterations, with the TargetBytes
+		// limit exactly equal to the total number of bytes.
+		{
+			maxKeys:              31,
+			targetBytes:          35,
+			allowEmpty:           false,
+			numKeysPerIter:       3,
+			numBytesPerIter:      5,
+			numIters:             10,
+			expectedNumKeys:      21,
+			expectedNumBytes:     35,
+			expectedResumeReason: kvpb.RESUME_BYTE_LIMIT,
+		},
+		// TargetBytes limit reached after 7 iterations, but with AllowEmpty
+		// set to true, so only 6 iterations are completed.
+		{
+			maxKeys:              31,
+			targetBytes:          34,
+			allowEmpty:           true,
+			numKeysPerIter:       3,
+			numBytesPerIter:      5,
+			numIters:             10,
+			expectedNumKeys:      18,
+			expectedNumBytes:     30,
+			expectedResumeReason: kvpb.RESUME_BYTE_LIMIT,
+		},
+	}
+
+	for _, tc := range testCases {
+		var iter int
+		numKeys, numBytes, resumeReason, err := MVCCPaginate(context.Background(), tc.maxKeys, tc.targetBytes, tc.allowEmpty,
+			func(maxKeys, targetBytes int64) (numKeys int64, numBytes int64, resumeReason kvpb.ResumeReason, err error) {
+				if iter == tc.numIters {
+					return 0, 0, 0, iterutil.StopIteration()
+				}
+				iter++
+				if tc.allowEmpty && tc.numBytesPerIter > targetBytes {
+					return 0, 0, kvpb.RESUME_BYTE_LIMIT, nil
+				}
+				return tc.numKeysPerIter, tc.numBytesPerIter, 0, nil
+			})
+		require.NoError(t, err)
+		require.Equal(t, tc.expectedNumKeys, numKeys)
+		require.Equal(t, tc.expectedNumBytes, numBytes)
+		require.Equal(t, tc.expectedResumeReason, resumeReason)
+	}
+}
+
 func TestMVCCResolveIntentTxnTimestampMismatch(t *testing.T) {
 	defer leaktest.AfterTest(t)()
 	defer log.Scope(t).Close(t)

From b103e22adb51819c70a94f6534e31438b3c0cf6c Mon Sep 17 00:00:00 2001
From: Kai Sun
Date: Fri, 24 Feb 2023 19:44:21 -0500
Subject: [PATCH 2/2] storage: Push pagination for EndTxn into the MVCC layer

In this commit, we push key pagination for EndTxn (currently implemented
directly in cmd_end_transaction.go) down into the MVCC layer (mvcc.go) by
integrating MVCCPaginate from the previous commit into the EndTxn command.
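The integration has roughly the following shape (an illustrative sketch,
not the literal diff; resolveSpan is a hypothetical stand-in for the point
and ranged intent resolution below, returning the same four values as f):

    remaining := args.LockSpans
    f := func(maxKeys, targetBytes int64) (int64, int64, kvpb.ResumeReason, error) {
        if len(remaining) == 0 {
            return 0, 0, 0, iterutil.StopIteration()
        }
        span := remaining[0]
        remaining = remaining[1:]
        // Resolve intents in span within the maxKeys budget and report
        // how many keys were resolved, plus any resume reason.
        return resolveSpan(span, maxKeys)
    }
    numKeys, _, _, err := storage.MVCCPaginate(ctx, maxKeys, 0 /* targetBytes */, false /* allowEmpty */, f)

Since MVCCPaginate only enforces limits that are strictly positive, system
transactions opt out of pagination by passing a maxKeys of 0.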
Informs: #77228 Release note: None --- pkg/kv/kvserver/batcheval/BUILD.bazel | 1 + .../kvserver/batcheval/cmd_end_transaction.go | 182 ++++++++++-------- 2 files changed, 102 insertions(+), 81 deletions(-) diff --git a/pkg/kv/kvserver/batcheval/BUILD.bazel b/pkg/kv/kvserver/batcheval/BUILD.bazel index c10f21d60ca8..083cce9554ea 100644 --- a/pkg/kv/kvserver/batcheval/BUILD.bazel +++ b/pkg/kv/kvserver/batcheval/BUILD.bazel @@ -80,6 +80,7 @@ go_library( "//pkg/storage/enginepb", "//pkg/util", "//pkg/util/hlc", + "//pkg/util/iterutil", "//pkg/util/limit", "//pkg/util/log", "//pkg/util/mon", diff --git a/pkg/kv/kvserver/batcheval/cmd_end_transaction.go b/pkg/kv/kvserver/batcheval/cmd_end_transaction.go index e83b195a8413..4aefb916d095 100644 --- a/pkg/kv/kvserver/batcheval/cmd_end_transaction.go +++ b/pkg/kv/kvserver/batcheval/cmd_end_transaction.go @@ -13,7 +13,6 @@ package batcheval import ( "bytes" "context" - "math" "sync/atomic" "time" @@ -33,6 +32,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/storage/enginepb" "github.com/cockroachdb/cockroach/pkg/util" "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/cockroach/pkg/util/iterutil" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/errors" "github.com/cockroachdb/redact" @@ -539,6 +539,28 @@ func resolveLocalLocks( args *kvpb.EndTxnRequest, txn *roachpb.Transaction, evalCtx EvalContext, +) (resolvedLocks []roachpb.LockUpdate, externalLocks []roachpb.Span, _ error) { + var resolveAllowance int64 = lockResolutionBatchSize + if args.InternalCommitTrigger != nil { + // If this is a system transaction (such as a split or merge), don't + // enforce the resolve allowance. These transactions rely on having + // their locks resolved synchronously. + resolveAllowance = 0 + } + return resolveLocalLocksWithPagination(ctx, desc, readWriter, ms, args, txn, evalCtx, resolveAllowance) +} + +// resolveLocalLocksWithPagination is resolveLocalLocks but with a max key +// limit. +func resolveLocalLocksWithPagination( + ctx context.Context, + desc *roachpb.RangeDescriptor, + readWriter storage.ReadWriter, + ms *enginepb.MVCCStats, + args *kvpb.EndTxnRequest, + txn *roachpb.Transaction, + evalCtx EvalContext, + maxKeys int64, ) (resolvedLocks []roachpb.LockUpdate, externalLocks []roachpb.Span, _ error) { if mergeTrigger := args.InternalCommitTrigger.GetMergeTrigger(); mergeTrigger != nil { // If this is a merge, then use the post-merge descriptor to determine @@ -547,95 +569,93 @@ func resolveLocalLocks( desc = &mergeTrigger.LeftDesc } - var resolveAllowance int64 = lockResolutionBatchSize - if args.InternalCommitTrigger != nil { - // If this is a system transaction (such as a split or merge), don't enforce the resolve allowance. - // These transactions rely on having their locks resolved synchronously. - resolveAllowance = math.MaxInt64 - } - - for _, span := range args.LockSpans { - if err := func() error { - if resolveAllowance == 0 { + remainingLockSpans := args.LockSpans + f := func(maxKeys, targetBytes int64) (numKeys int64, numBytes int64, resumeReason kvpb.ResumeReason, err error) { + if len(remainingLockSpans) == 0 { + return 0, 0, 0, iterutil.StopIteration() + } + span := remainingLockSpans[0] + remainingLockSpans = remainingLockSpans[1:] + update := roachpb.MakeLockUpdate(txn, span) + if len(span.EndKey) == 0 { + // For single-key lock updates, do a KeyAddress-aware check of + // whether it's contained in our Range. 
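+			// A key that is not contained in the range is not resolved here;
+			// it is handed back to the caller via externalLocks.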
+ if !kvserverbase.ContainsKey(desc, span.Key) { externalLocks = append(externalLocks, span) - return nil + return 0, 0, 0, nil } - update := roachpb.MakeLockUpdate(txn, span) - if len(span.EndKey) == 0 { - // For single-key lock updates, do a KeyAddress-aware check of - // whether it's contained in our Range. - if !kvserverbase.ContainsKey(desc, span.Key) { - externalLocks = append(externalLocks, span) - return nil - } - // It may be tempting to reuse an iterator here, but this call - // can create the iterator with Prefix:true which is much faster - // than seeking -- especially for intents that are missing, e.g. - // due to async intent resolution. See: - // https://github.com/cockroachdb/cockroach/issues/64092 - // - // Note that the underlying pebbleIterator will still be reused - // since readWriter is a pebbleBatch in the typical case. - ok, _, _, err := storage.MVCCResolveWriteIntent(ctx, readWriter, ms, update, - storage.MVCCResolveWriteIntentOptions{}) - if err != nil { - return err - } - if ok { - resolveAllowance-- - } - resolvedLocks = append(resolvedLocks, update) - // If requested, replace point tombstones with range tombstones. - if ok && evalCtx.EvalKnobs().UseRangeTombstonesForPointDeletes { - if err := storage.ReplacePointTombstonesWithRangeTombstones( - ctx, spanset.DisableReadWriterAssertions(readWriter), - ms, update.Key, update.EndKey); err != nil { - return err - } - } - return nil + // It may be tempting to reuse an iterator here, but this call + // can create the iterator with Prefix:true which is much faster + // than seeking -- especially for intents that are missing, e.g. + // due to async intent resolution. See: + // https://github.com/cockroachdb/cockroach/issues/64092 + // + // Note that the underlying pebbleIterator will still be reused + // since readWriter is a pebbleBatch in the typical case. + ok, _, _, err := storage.MVCCResolveWriteIntent(ctx, readWriter, ms, update, + storage.MVCCResolveWriteIntentOptions{}) + if err != nil { + return 0, 0, 0, errors.Wrapf(err, "resolving write intent at %s on end transaction [%s]", span, txn.Status) } - // For update ranges, cut into parts inside and outside our key - // range. Resolve locally inside, delegate the rest. In particular, - // an update range for range-local data is correctly considered local. - inSpan, outSpans := kvserverbase.IntersectSpan(span, desc) - externalLocks = append(externalLocks, outSpans...) - if inSpan != nil { - update.Span = *inSpan - numKeys, _, resumeSpan, _, err := storage.MVCCResolveWriteIntentRange(ctx, readWriter, ms, update, - storage.MVCCResolveWriteIntentRangeOptions{MaxKeys: resolveAllowance}) - if err != nil { - return err - } - if evalCtx.EvalKnobs().NumKeysEvaluatedForRangeIntentResolution != nil { - atomic.AddInt64(evalCtx.EvalKnobs().NumKeysEvaluatedForRangeIntentResolution, numKeys) - } - resolveAllowance -= numKeys - if resumeSpan != nil { - if resolveAllowance != 0 { - log.Fatalf(ctx, "expected resolve allowance to be exactly 0 resolving %s; got %d", update.Span, resolveAllowance) - } - update.EndKey = resumeSpan.Key - externalLocks = append(externalLocks, *resumeSpan) + if ok { + numKeys = 1 + } + resolvedLocks = append(resolvedLocks, update) + // If requested, replace point tombstones with range tombstones. 
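+			// (This replacement only runs when the intent was actually
+			// resolved, i.e. ok is true, and the
+			// UseRangeTombstonesForPointDeletes eval knob is set.)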
+ if ok && evalCtx.EvalKnobs().UseRangeTombstonesForPointDeletes { + if err := storage.ReplacePointTombstonesWithRangeTombstones( + ctx, spanset.DisableReadWriterAssertions(readWriter), + ms, update.Key, update.EndKey); err != nil { + return 0, 0, 0, errors.Wrapf(err, + "replacing point tombstones with range tombstones for write intent at %s on end transaction [%s]", + span, txn.Status) } - resolvedLocks = append(resolvedLocks, update) - // If requested, replace point tombstones with range tombstones. - if evalCtx.EvalKnobs().UseRangeTombstonesForPointDeletes { - if err := storage.ReplacePointTombstonesWithRangeTombstones( - ctx, spanset.DisableReadWriterAssertions(readWriter), - ms, update.Key, update.EndKey); err != nil { - return err - } + } + return numKeys, 0, 0, nil + } + // For update ranges, cut into parts inside and outside our key + // range. Resolve locally inside, delegate the rest. In particular, + // an update range for range-local data is correctly considered local. + inSpan, outSpans := kvserverbase.IntersectSpan(span, desc) + externalLocks = append(externalLocks, outSpans...) + if inSpan != nil { + update.Span = *inSpan + numKeys, _, resumeSpan, resumeReason, err := storage.MVCCResolveWriteIntentRange(ctx, readWriter, ms, update, + storage.MVCCResolveWriteIntentRangeOptions{MaxKeys: maxKeys}) + if err != nil { + return 0, 0, 0, errors.Wrapf(err, "resolving write intent range at %s on end transaction [%s]", span, txn.Status) + } + if evalCtx.EvalKnobs().NumKeysEvaluatedForRangeIntentResolution != nil { + atomic.AddInt64(evalCtx.EvalKnobs().NumKeysEvaluatedForRangeIntentResolution, numKeys) + } + if resumeSpan != nil { + update.EndKey = resumeSpan.Key + externalLocks = append(externalLocks, *resumeSpan) + } + resolvedLocks = append(resolvedLocks, update) + // If requested, replace point tombstones with range tombstones. + if evalCtx.EvalKnobs().UseRangeTombstonesForPointDeletes { + if err := storage.ReplacePointTombstonesWithRangeTombstones( + ctx, spanset.DisableReadWriterAssertions(readWriter), + ms, update.Key, update.EndKey); err != nil { + return 0, 0, 0, errors.Wrapf(err, + "replacing point tombstones with range tombstones for write intent range at %s on end transaction [%s]", + span, txn.Status) } - return nil } - return nil - }(); err != nil { - return nil, nil, errors.Wrapf(err, "resolving lock at %s on end transaction [%s]", span, txn.Status) + return numKeys, 0, resumeReason, nil } + return 0, 0, 0, nil } - removedAny := resolveAllowance != lockResolutionBatchSize + numKeys, _, _, err := storage.MVCCPaginate(ctx, maxKeys, 0 /* targetBytes */, false /* allowEmpty */, f) + if err != nil { + return nil, nil, err + } + + externalLocks = append(externalLocks, remainingLockSpans...) + + removedAny := numKeys > 0 if WriteAbortSpanOnResolve(txn.Status, args.Poison, removedAny) { if err := UpdateAbortSpan(ctx, evalCtx, readWriter, ms, txn.TxnMeta, args.Poison); err != nil { return nil, nil, err