diff --git a/pkg/storage/mvcc.go b/pkg/storage/mvcc.go index 89c4a283d6e1..d125807211e8 100644 --- a/pkg/storage/mvcc.go +++ b/pkg/storage/mvcc.go @@ -4089,6 +4089,76 @@ func MVCCIterate( return intents, nil } +// MVCCPaginate iteratively invokes f() with the current maxKeys and +// targetBytes limits. If f returns iterutil.StopIteration (meaning that we +// have iterated through all elements), the iteration stops with no error +// propagated. If f returns any other error, the iteration stops and the error +// is propagated. If the number of keys hits the maxKeys limit or the number of +// bytes hits the targetBytes limit, the iteration stops with no error +// propagated but with the appropriate resume reason returned. f returns a +// resumeReason, which if set, will assert that the number of keys / bytes hit +// the key / byte limit matching the resumeReason. Moreover, if resumeReason is +// RESUME_BYTE_LIMIT and allowEmpty is true, then the iteration stops with no +// error propagated but with the RESUME_BYTE_LIMIT resume reason returned. +// +// We note that it is up to f() whether it wants to allow the numBytes to +// exceed targetBytes by up to one entry or whether it wants to terminate +// iteration before numBytes exceeds targetBytes. See the AllowEmpty option. +func MVCCPaginate( + ctx context.Context, + maxKeys, targetBytes int64, + allowEmpty bool, + f func(maxKeys, targetBytes int64) (numKeys, numBytes int64, resumeReason kvpb.ResumeReason, err error), +) (numKeys, numBytes int64, resumeReason kvpb.ResumeReason, err error) { + for { + if maxKeys < 0 { + return numKeys, numBytes, kvpb.RESUME_KEY_LIMIT, nil + } + if targetBytes < 0 { + return numKeys, numBytes, kvpb.RESUME_BYTE_LIMIT, nil + } + addedKeys, addedBytes, resumeReason, err := f(maxKeys, targetBytes) + if err != nil { + if addedKeys != 0 || addedBytes != 0 || resumeReason != 0 { + log.Fatalf(ctx, + "addedKeys, addedBytes, and resumeReason should all be 0, but got addedKeys=%d, addedBytes=%d, resumeReason=%d", + addedKeys, addedBytes, resumeReason) + } + err = iterutil.Map(err) + return numKeys, numBytes, 0, err + } + numKeys += addedKeys + numBytes += addedBytes + if maxKeys > 0 { + if addedKeys > maxKeys { + log.Fatalf(ctx, "added %d keys, which exceeds the max key limit %d", addedKeys, maxKeys) + } else if addedKeys < maxKeys { + maxKeys -= addedKeys + } else { + maxKeys = -1 + } + } + if targetBytes > 0 { + if addedBytes < targetBytes { + targetBytes -= addedBytes + } else { + targetBytes = -1 + } + } + if resumeReason == kvpb.RESUME_KEY_LIMIT { + if maxKeys >= 0 { + log.Fatalf(ctx, "Resume reason RESUME_KEY_LIMIT, but key limit = %d has not been hit", maxKeys) + } + } + if resumeReason == kvpb.RESUME_BYTE_LIMIT { + if !allowEmpty && targetBytes >= 0 { + log.Fatalf(ctx, "Resume reason RESUME_BYTE_LIMIT, but byte limit = %d has not been hit", targetBytes) + } + targetBytes = -1 + } + } +} + // MVCCResolveWriteIntent either commits, aborts (rolls back), or moves forward // in time an extant write intent for a given txn according to commit // parameter. ResolveWriteIntent will skip write intents of other txns. diff --git a/pkg/storage/mvcc_test.go b/pkg/storage/mvcc_test.go index 2f202c82936a..a0d29be6be61 100644 --- a/pkg/storage/mvcc_test.go +++ b/pkg/storage/mvcc_test.go @@ -37,6 +37,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/admission" "github.com/cockroachdb/cockroach/pkg/util/encoding" "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/cockroach/pkg/util/iterutil" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/protoutil" @@ -2674,6 +2675,123 @@ func TestMVCCResolveNewerIntent(t *testing.T) { } } +// TestMVCCPaginate tests that MVCCPaginate respects the MaxKeys and +// TargetBytes limits, and returns the correct numKeys, numBytes, and +// resumeReason. +func TestMVCCPaginate(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + testCases := []struct { + maxKeys int64 + targetBytes int64 + allowEmpty bool + numKeysPerIter int64 + numBytesPerIter int64 + numIters int + expectedNumKeys int64 + expectedNumBytes int64 + expectedResumeReason kvpb.ResumeReason + }{ + // MaxKeys and TargetBytes limits not reached, so do all 10 iterations. + { + maxKeys: 31, + targetBytes: 51, + allowEmpty: false, + numKeysPerIter: 3, + numBytesPerIter: 5, + numIters: 10, + expectedNumKeys: 30, + expectedNumBytes: 50, + expectedResumeReason: 0, + }, + // MaxKeys limit reached after 7 iterations. + { + maxKeys: 21, + targetBytes: 51, + allowEmpty: false, + numKeysPerIter: 3, + numBytesPerIter: 5, + numIters: 10, + expectedNumKeys: 21, + expectedNumBytes: 35, + expectedResumeReason: kvpb.RESUME_KEY_LIMIT, + }, + // MaxKeys limit reached after 10 iterations. Despite the fact we + // finished iterating, we still return a resume reason because we check + // the MaxKeys and TargetBytes limits before we check if we stop + // iteration. + { + maxKeys: 30, + targetBytes: 50, + allowEmpty: false, + numKeysPerIter: 3, + numBytesPerIter: 5, + numIters: 10, + expectedNumKeys: 30, + expectedNumBytes: 50, + expectedResumeReason: kvpb.RESUME_KEY_LIMIT, + }, + // TargetBytes limit reached after 7 iterations. + { + maxKeys: 31, + targetBytes: 34, + allowEmpty: false, + numKeysPerIter: 3, + numBytesPerIter: 5, + numIters: 10, + expectedNumKeys: 21, + expectedNumBytes: 35, + expectedResumeReason: kvpb.RESUME_BYTE_LIMIT, + }, + // TargetBytes limit reached after 7 iterations, but with TargetBytes + // limit exactly the number of bytes. + { + maxKeys: 31, + targetBytes: 35, + allowEmpty: false, + numKeysPerIter: 3, + numBytesPerIter: 5, + numIters: 10, + expectedNumKeys: 21, + expectedNumBytes: 35, + expectedResumeReason: kvpb.RESUME_BYTE_LIMIT, + }, + // TargetBytes limit reached after 7 iterations, but with AllowEmpty + // set to true, so only 6 iterations are completed. + { + maxKeys: 31, + targetBytes: 34, + allowEmpty: true, + numKeysPerIter: 3, + numBytesPerIter: 5, + numIters: 10, + expectedNumKeys: 18, + expectedNumBytes: 30, + expectedResumeReason: kvpb.RESUME_BYTE_LIMIT, + }, + } + + for _, tc := range testCases { + var iter int + numKeys, numBytes, resumeReason, err := MVCCPaginate(context.Background(), tc.maxKeys, tc.targetBytes, tc.allowEmpty, + func(maxKeys, targetBytes int64) (numKeys int64, numBytes int64, resumeReason kvpb.ResumeReason, err error) { + if iter == tc.numIters { + return 0, 0, 0, iterutil.StopIteration() + } + iter++ + if tc.allowEmpty && tc.numBytesPerIter > targetBytes { + return 0, 0, kvpb.RESUME_BYTE_LIMIT, nil + } + return tc.numKeysPerIter, tc.numBytesPerIter, 0, nil + }) + require.NoError(t, err) + require.Equal(t, tc.expectedNumKeys, numKeys) + require.Equal(t, tc.expectedNumBytes, numBytes) + require.Equal(t, tc.expectedResumeReason, resumeReason) + } +} + func TestMVCCResolveIntentTxnTimestampMismatch(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t)