Skip to content

Commit

Permalink
storage: Add MVCCPagination to allow general key and byte pagination
Browse files Browse the repository at this point in the history
Implement MVCCPagination which calls a user-inputted function f() until
it returns an error (note that the iterutil.StopIteration() error means
we have iterated through all elements), or until the number of keys hits
the maxKeys limit or the number of bytes hits the targetBytes limit.

MVCCPagination is a general framework that enables key and byte
pagination.

Informs: #77228

Release note: None
  • Loading branch information
KaiSun314 committed Feb 23, 2023
1 parent f33f404 commit ba83903
Show file tree
Hide file tree
Showing 2 changed files with 188 additions and 0 deletions.
70 changes: 70 additions & 0 deletions pkg/storage/mvcc.go
Original file line number Diff line number Diff line change
Expand Up @@ -4089,6 +4089,76 @@ func MVCCIterate(
return intents, nil
}

// MVCCPaginate iteratively invokes f() with the current maxKeys and
// targetBytes limits. If f returns iterutil.StopIteration (meaning that we
// have iterated through all elements), the iteration stops with no error
// propagated. If f returns any other error, the iteration stops and the error
// is propagated. If the number of keys hits the maxKeys limit or the number of
// bytes hits the targetBytes limit, the iteration stops with no error
// propagated but with the appropriate resume reason returned. f returns a
// resumeReason, which if set, will assert that the number of keys / bytes hit
// the key / byte limit matching the resumeReason. Moreover, if resumeReason is
// RESUME_BYTE_LIMIT and allowEmpty is true, then the iteration stops with no
// error propagated but with the RESUME_BYTE_LIMIT resume reason returned.
//
// We note that it is up to f() whether it wants to allow the numBytes to
// exceed targetBytes by up to one entry or whether it wants to terminate
// iteration before numBytes exceeds targetBytes. See the AllowEmpty option.
func MVCCPaginate(
ctx context.Context,
maxKeys, targetBytes int64,
allowEmpty bool,
f func(maxKeys, targetBytes int64) (numKeys, numBytes int64, resumeReason kvpb.ResumeReason, err error),
) (numKeys, numBytes int64, resumeReason kvpb.ResumeReason, err error) {
for {
if maxKeys < 0 {
return numKeys, numBytes, kvpb.RESUME_KEY_LIMIT, nil
}
if targetBytes < 0 {
return numKeys, numBytes, kvpb.RESUME_BYTE_LIMIT, nil
}
addedKeys, addedBytes, resumeReason, err := f(maxKeys, targetBytes)
if err != nil {
if addedKeys != 0 || addedBytes != 0 || resumeReason != 0 {
log.Fatalf(ctx,
"addedKeys, addedBytes, and resumeReason should all be 0, but got addedKeys=%d, addedBytes=%d, resumeReason=%d",
addedKeys, addedBytes, resumeReason)
}
err = iterutil.Map(err)
return numKeys, numBytes, 0, err
}
numKeys += addedKeys
numBytes += addedBytes
if maxKeys > 0 {
if addedKeys > maxKeys {
log.Fatalf(ctx, "added %d keys, which exceeds the max key limit %d", addedKeys, maxKeys)
} else if addedKeys < maxKeys {
maxKeys -= addedKeys
} else {
maxKeys = -1
}
}
if targetBytes > 0 {
if addedBytes < targetBytes {
targetBytes -= addedBytes
} else {
targetBytes = -1
}
}
if resumeReason == kvpb.RESUME_KEY_LIMIT {
if maxKeys >= 0 {
log.Fatalf(ctx, "Resume reason RESUME_KEY_LIMIT, but key limit = %d has not been hit", maxKeys)
}
}
if resumeReason == kvpb.RESUME_BYTE_LIMIT {
if !allowEmpty && targetBytes >= 0 {
log.Fatalf(ctx, "Resume reason RESUME_BYTE_LIMIT, but byte limit = %d has not been hit", targetBytes)
}
targetBytes = -1
}
}
}

// MVCCResolveWriteIntent either commits, aborts (rolls back), or moves forward
// in time an extant write intent for a given txn according to commit
// parameter. ResolveWriteIntent will skip write intents of other txns.
Expand Down
118 changes: 118 additions & 0 deletions pkg/storage/mvcc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/admission"
"github.com/cockroachdb/cockroach/pkg/util/encoding"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/iterutil"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
Expand Down Expand Up @@ -2674,6 +2675,123 @@ func TestMVCCResolveNewerIntent(t *testing.T) {
}
}

// TestMVCCPaginate tests that MVCCPaginate respects the MaxKeys and
// TargetBytes limits, and returns the correct numKeys, numBytes, and
// resumeReason.
func TestMVCCPaginate(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

testCases := []struct {
maxKeys int64
targetBytes int64
allowEmpty bool
numKeysPerIter int64
numBytesPerIter int64
numIters int
expectedNumKeys int64
expectedNumBytes int64
expectedResumeReason kvpb.ResumeReason
}{
// MaxKeys and TargetBytes limits not reached, so do all 10 iterations.
{
maxKeys: 31,
targetBytes: 51,
allowEmpty: false,
numKeysPerIter: 3,
numBytesPerIter: 5,
numIters: 10,
expectedNumKeys: 30,
expectedNumBytes: 50,
expectedResumeReason: 0,
},
// MaxKeys limit reached after 7 iterations.
{
maxKeys: 21,
targetBytes: 51,
allowEmpty: false,
numKeysPerIter: 3,
numBytesPerIter: 5,
numIters: 10,
expectedNumKeys: 21,
expectedNumBytes: 35,
expectedResumeReason: kvpb.RESUME_KEY_LIMIT,
},
// MaxKeys limit reached after 10 iterations. Despite the fact we
// finished iterating, we still return a resume reason because we check
// the MaxKeys and TargetBytes limits before we check if we stop
// iteration.
{
maxKeys: 30,
targetBytes: 50,
allowEmpty: false,
numKeysPerIter: 3,
numBytesPerIter: 5,
numIters: 10,
expectedNumKeys: 30,
expectedNumBytes: 50,
expectedResumeReason: kvpb.RESUME_KEY_LIMIT,
},
// TargetBytes limit reached after 7 iterations.
{
maxKeys: 31,
targetBytes: 34,
allowEmpty: false,
numKeysPerIter: 3,
numBytesPerIter: 5,
numIters: 10,
expectedNumKeys: 21,
expectedNumBytes: 35,
expectedResumeReason: kvpb.RESUME_BYTE_LIMIT,
},
// TargetBytes limit reached after 7 iterations, but with TargetBytes
// limit exactly the number of bytes.
{
maxKeys: 31,
targetBytes: 35,
allowEmpty: false,
numKeysPerIter: 3,
numBytesPerIter: 5,
numIters: 10,
expectedNumKeys: 21,
expectedNumBytes: 35,
expectedResumeReason: kvpb.RESUME_BYTE_LIMIT,
},
// TargetBytes limit reached after 7 iterations, but with AllowEmpty
// set to true, so only 6 iterations are completed.
{
maxKeys: 31,
targetBytes: 34,
allowEmpty: true,
numKeysPerIter: 3,
numBytesPerIter: 5,
numIters: 10,
expectedNumKeys: 18,
expectedNumBytes: 30,
expectedResumeReason: kvpb.RESUME_BYTE_LIMIT,
},
}

for _, tc := range testCases {
var iter int
numKeys, numBytes, resumeReason, err := MVCCPaginate(context.Background(), tc.maxKeys, tc.targetBytes, tc.allowEmpty,
func(maxKeys, targetBytes int64) (numKeys int64, numBytes int64, resumeReason kvpb.ResumeReason, err error) {
if iter == tc.numIters {
return 0, 0, 0, iterutil.StopIteration()
}
iter++
if tc.allowEmpty && tc.numBytesPerIter > targetBytes {
return 0, 0, kvpb.RESUME_BYTE_LIMIT, nil
}
return tc.numKeysPerIter, tc.numBytesPerIter, 0, nil
})
require.NoError(t, err)
require.Equal(t, tc.expectedNumKeys, numKeys)
require.Equal(t, tc.expectedNumBytes, numBytes)
require.Equal(t, tc.expectedResumeReason, resumeReason)
}
}

func TestMVCCResolveIntentTxnTimestampMismatch(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
Expand Down

0 comments on commit ba83903

Please sign in to comment.