Skip to content

Commit

Permalink
storage: Add MVCCPagination to allow general key and byte pagination
Browse files Browse the repository at this point in the history
Implement MVCCPagination which calls a user-inputted function f() until
it returns an error (note that the iterutil.StopIteration() error means
we have iterated through all elements), or until the number of keys hits
the maxKeys limit or the number of bytes hits the targetBytes limit.

MVCCPagination is a general framework that enables key and byte
pagination.

Informs: cockroachdb#77228

Release note: None
  • Loading branch information
KaiSun314 committed Jan 30, 2023
1 parent 10ef5d9 commit 36d75d5
Show file tree
Hide file tree
Showing 3 changed files with 144 additions and 0 deletions.
1 change: 1 addition & 0 deletions pkg/kv/kvserver/batcheval/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ go_library(
"//pkg/storage/enginepb",
"//pkg/util",
"//pkg/util/hlc",
"//pkg/util/iterutil",
"//pkg/util/limit",
"//pkg/util/log",
"//pkg/util/mon",
Expand Down
47 changes: 47 additions & 0 deletions pkg/storage/mvcc.go
Original file line number Diff line number Diff line change
Expand Up @@ -4086,6 +4086,53 @@ func MVCCIterate(
return intents, nil
}

// MVCCPagination invokes f() until it returns done (i.e. we have iterated
// through all elements) or an error, or until the number of keys hits the
// maxKeys limit or the number of bytes hits the targetBytes limit.
func MVCCPagination(
ctx context.Context,
maxKeys, targetBytes int64,
f func(maxKeys, targetBytes int64) (numKeys, numBytes int64, resumeSpan *roachpb.Span, err error),
) (numKeys, numBytes int64, resumeReason roachpb.ResumeReason, err error) {
for {
if maxKeys < 0 {
return numKeys, numBytes, roachpb.RESUME_KEY_LIMIT, nil
}
if targetBytes < 0 {
return numKeys, numBytes, roachpb.RESUME_BYTE_LIMIT, nil
}
addedKeys, addedBytes, resumeSpan, err := f(maxKeys, targetBytes)
if err != nil {
err = iterutil.Map(err)
return numKeys, numBytes, 0, err
}
numKeys += addedKeys
numBytes += addedBytes
if maxKeys > 0 {
if addedKeys > maxKeys {
log.Fatalf(ctx, "added %d keys, which exceeds the max key limit %d", addedKeys, maxKeys)
} else if addedKeys < maxKeys {
maxKeys -= addedKeys
} else {
maxKeys = -1
}
}
if targetBytes > 0 {
if addedBytes < targetBytes {
targetBytes -= addedBytes
} else {
targetBytes = -1
}
}
if resumeSpan != nil {
if maxKeys >= 0 && targetBytes >= 0 {
log.Fatalf(ctx, "non-nil resume span returned, but both key limit = %d and byte limit = %d have not been hit",
maxKeys, targetBytes)
}
}
}
}

// MVCCResolveWriteIntent either commits, aborts (rolls back), or moves forward
// in time an extant write intent for a given txn according to commit
// parameter. ResolveWriteIntent will skip write intents of other txns.
Expand Down
96 changes: 96 additions & 0 deletions pkg/storage/mvcc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/admission"
"github.com/cockroachdb/cockroach/pkg/util/encoding"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/iterutil"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
Expand Down Expand Up @@ -2673,6 +2674,101 @@ func TestMVCCResolveNewerIntent(t *testing.T) {
}
}

// TestMVCCPagination tests that MVCCPagination respects the MaxKeys and
// TargetBytes limits, and returns the correct numKeys, numBytes, and
// resumeReason.
func TestMVCCPagination(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

testCases := []struct {
maxKeys int64
targetBytes int64
numKeysPerIter int64
numBytesPerIter int64
numIters int
expectedNumKeys int64
expectedNumBytes int64
expectedResumeReason roachpb.ResumeReason
}{
// MaxKeys and TargetBytes limits not reached, so do all 10 iterations.
{
maxKeys: 31,
targetBytes: 51,
numKeysPerIter: 3,
numBytesPerIter: 5,
numIters: 10,
expectedNumKeys: 30,
expectedNumBytes: 50,
expectedResumeReason: 0,
},
// MaxKeys limit reached after 7 iterations.
{
maxKeys: 21,
targetBytes: 51,
numKeysPerIter: 3,
numBytesPerIter: 5,
numIters: 10,
expectedNumKeys: 21,
expectedNumBytes: 35,
expectedResumeReason: roachpb.RESUME_KEY_LIMIT,
},
// MaxKeys limit reached after 10 iterations. Despite the fact we
// finished iterating, we still return a resume reason because we check
// the MaxKeys and TargetBytes limits before we check if we stop
// iteration.
{
maxKeys: 30,
targetBytes: 50,
numKeysPerIter: 3,
numBytesPerIter: 5,
numIters: 10,
expectedNumKeys: 30,
expectedNumBytes: 50,
expectedResumeReason: roachpb.RESUME_KEY_LIMIT,
},
// TargetBytes limit reached after 7 iterations.
{
maxKeys: 31,
targetBytes: 34,
numKeysPerIter: 3,
numBytesPerIter: 5,
numIters: 10,
expectedNumKeys: 21,
expectedNumBytes: 35,
expectedResumeReason: roachpb.RESUME_BYTE_LIMIT,
},
// TargetBytes limit reached after 7 iterations, but with TargetBytes
// limit exactly the number of bytes.
{
maxKeys: 31,
targetBytes: 35,
numKeysPerIter: 3,
numBytesPerIter: 5,
numIters: 10,
expectedNumKeys: 21,
expectedNumBytes: 35,
expectedResumeReason: roachpb.RESUME_BYTE_LIMIT,
},
}

for _, tc := range testCases {
var iter int
numKeys, numBytes, resumeReason, err := MVCCPagination(context.Background(), tc.maxKeys, tc.targetBytes,
func(maxKeys, targetBytes int64) (numKeys int64, numBytes int64, resumeSpan *roachpb.Span, err error) {
if iter == tc.numIters {
return 0, 0, nil, iterutil.StopIteration()
}
iter++
return tc.numKeysPerIter, tc.numBytesPerIter, nil, nil
})
require.NoError(t, err)
require.Equal(t, tc.expectedNumKeys, numKeys)
require.Equal(t, tc.expectedNumBytes, numBytes)
require.Equal(t, tc.expectedResumeReason, resumeReason)
}
}

func TestMVCCResolveIntentTxnTimestampMismatch(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
Expand Down

0 comments on commit 36d75d5

Please sign in to comment.