From 36d75d53f24efaf5d2af9ee82a02327ba87c20ae Mon Sep 17 00:00:00 2001 From: Kai Sun Date: Mon, 9 Jan 2023 11:40:39 -0500 Subject: [PATCH] storage: Add MVCCPagination to allow general key and byte pagination Implement MVCCPagination which calls a user-inputted function f() until it returns an error (note that the iterutil.StopIteration() error means we have iterated through all elements), or until the number of keys hits the maxKeys limit or the number of bytes hits the targetBytes limit. MVCCPagination is a general framework that enables key and byte pagination. Informs: #77228 Release note: None --- pkg/kv/kvserver/batcheval/BUILD.bazel | 1 + pkg/storage/mvcc.go | 47 +++++++++++++ pkg/storage/mvcc_test.go | 96 +++++++++++++++++++++++++++ 3 files changed, 144 insertions(+) diff --git a/pkg/kv/kvserver/batcheval/BUILD.bazel b/pkg/kv/kvserver/batcheval/BUILD.bazel index 9ad5de919cb2..1e6d32747dd9 100644 --- a/pkg/kv/kvserver/batcheval/BUILD.bazel +++ b/pkg/kv/kvserver/batcheval/BUILD.bazel @@ -79,6 +79,7 @@ go_library( "//pkg/storage/enginepb", "//pkg/util", "//pkg/util/hlc", + "//pkg/util/iterutil", "//pkg/util/limit", "//pkg/util/log", "//pkg/util/mon", diff --git a/pkg/storage/mvcc.go b/pkg/storage/mvcc.go index 53803ed89e81..41fafd5e5346 100644 --- a/pkg/storage/mvcc.go +++ b/pkg/storage/mvcc.go @@ -4086,6 +4086,53 @@ func MVCCIterate( return intents, nil } +// MVCCPagination invokes f() until it returns done (i.e. we have iterated +// through all elements) or an error, or until the number of keys hits the +// maxKeys limit or the number of bytes hits the targetBytes limit. +func MVCCPagination( + ctx context.Context, + maxKeys, targetBytes int64, + f func(maxKeys, targetBytes int64) (numKeys, numBytes int64, resumeSpan *roachpb.Span, err error), +) (numKeys, numBytes int64, resumeReason roachpb.ResumeReason, err error) { + for { + if maxKeys < 0 { + return numKeys, numBytes, roachpb.RESUME_KEY_LIMIT, nil + } + if targetBytes < 0 { + return numKeys, numBytes, roachpb.RESUME_BYTE_LIMIT, nil + } + addedKeys, addedBytes, resumeSpan, err := f(maxKeys, targetBytes) + if err != nil { + err = iterutil.Map(err) + return numKeys, numBytes, 0, err + } + numKeys += addedKeys + numBytes += addedBytes + if maxKeys > 0 { + if addedKeys > maxKeys { + log.Fatalf(ctx, "added %d keys, which exceeds the max key limit %d", addedKeys, maxKeys) + } else if addedKeys < maxKeys { + maxKeys -= addedKeys + } else { + maxKeys = -1 + } + } + if targetBytes > 0 { + if addedBytes < targetBytes { + targetBytes -= addedBytes + } else { + targetBytes = -1 + } + } + if resumeSpan != nil { + if maxKeys >= 0 && targetBytes >= 0 { + log.Fatalf(ctx, "non-nil resume span returned, but both key limit = %d and byte limit = %d have not been hit", + maxKeys, targetBytes) + } + } + } +} + // MVCCResolveWriteIntent either commits, aborts (rolls back), or moves forward // in time an extant write intent for a given txn according to commit // parameter. ResolveWriteIntent will skip write intents of other txns. diff --git a/pkg/storage/mvcc_test.go b/pkg/storage/mvcc_test.go index 749849930bf0..0ad996b23545 100644 --- a/pkg/storage/mvcc_test.go +++ b/pkg/storage/mvcc_test.go @@ -36,6 +36,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/admission" "github.com/cockroachdb/cockroach/pkg/util/encoding" "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/cockroach/pkg/util/iterutil" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/protoutil" @@ -2673,6 +2674,101 @@ func TestMVCCResolveNewerIntent(t *testing.T) { } } +// TestMVCCPagination tests that MVCCPagination respects the MaxKeys and +// TargetBytes limits, and returns the correct numKeys, numBytes, and +// resumeReason. +func TestMVCCPagination(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + testCases := []struct { + maxKeys int64 + targetBytes int64 + numKeysPerIter int64 + numBytesPerIter int64 + numIters int + expectedNumKeys int64 + expectedNumBytes int64 + expectedResumeReason roachpb.ResumeReason + }{ + // MaxKeys and TargetBytes limits not reached, so do all 10 iterations. + { + maxKeys: 31, + targetBytes: 51, + numKeysPerIter: 3, + numBytesPerIter: 5, + numIters: 10, + expectedNumKeys: 30, + expectedNumBytes: 50, + expectedResumeReason: 0, + }, + // MaxKeys limit reached after 7 iterations. + { + maxKeys: 21, + targetBytes: 51, + numKeysPerIter: 3, + numBytesPerIter: 5, + numIters: 10, + expectedNumKeys: 21, + expectedNumBytes: 35, + expectedResumeReason: roachpb.RESUME_KEY_LIMIT, + }, + // MaxKeys limit reached after 10 iterations. Despite the fact we + // finished iterating, we still return a resume reason because we check + // the MaxKeys and TargetBytes limits before we check if we stop + // iteration. + { + maxKeys: 30, + targetBytes: 50, + numKeysPerIter: 3, + numBytesPerIter: 5, + numIters: 10, + expectedNumKeys: 30, + expectedNumBytes: 50, + expectedResumeReason: roachpb.RESUME_KEY_LIMIT, + }, + // TargetBytes limit reached after 7 iterations. + { + maxKeys: 31, + targetBytes: 34, + numKeysPerIter: 3, + numBytesPerIter: 5, + numIters: 10, + expectedNumKeys: 21, + expectedNumBytes: 35, + expectedResumeReason: roachpb.RESUME_BYTE_LIMIT, + }, + // TargetBytes limit reached after 7 iterations, but with TargetBytes + // limit exactly the number of bytes. + { + maxKeys: 31, + targetBytes: 35, + numKeysPerIter: 3, + numBytesPerIter: 5, + numIters: 10, + expectedNumKeys: 21, + expectedNumBytes: 35, + expectedResumeReason: roachpb.RESUME_BYTE_LIMIT, + }, + } + + for _, tc := range testCases { + var iter int + numKeys, numBytes, resumeReason, err := MVCCPagination(context.Background(), tc.maxKeys, tc.targetBytes, + func(maxKeys, targetBytes int64) (numKeys int64, numBytes int64, resumeSpan *roachpb.Span, err error) { + if iter == tc.numIters { + return 0, 0, nil, iterutil.StopIteration() + } + iter++ + return tc.numKeysPerIter, tc.numBytesPerIter, nil, nil + }) + require.NoError(t, err) + require.Equal(t, tc.expectedNumKeys, numKeys) + require.Equal(t, tc.expectedNumBytes, numBytes) + require.Equal(t, tc.expectedResumeReason, resumeReason) + } +} + func TestMVCCResolveIntentTxnTimestampMismatch(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t)