Skip to content

Commit

Permalink
Merge #94939
Browse files Browse the repository at this point in the history
94939: storage: Push pagination for EndTxn into the MVCC layer r=nvanbenschoten a=KaiSun314

Informs: #77228

This PR consists of two parts: adding `MVCCPagination` and integrating `MVCCPagination` to push pagination of `EndTxn` into the MVCC layer.

Part 1: Add `MVCCPagination`

Implement MVCCPagination (the `MVCCPaginate` function), which repeatedly
calls a caller-supplied function f() until f returns an error (the
iterutil.StopIteration() error signals that all elements have been iterated
through), or until the number of keys hits the maxKeys limit or the number
of bytes hits the targetBytes limit.

MVCCPagination is a general framework that enables key and byte
pagination.

Part 2: Integrate `MVCCPagination` to Push Pagination of `EndTxn` Into The MVCC Layer

In this PR, we push key pagination for EndTxn (currently located in
cmd_end_transaction.go) into the MVCC layer (mvcc.go) by integrating
MVCCPagination from the previous commit into EndTxn.

Release note: None

Co-authored-by: Kai Sun <[email protected]>
  • Loading branch information
craig[bot] and KaiSun314 committed Feb 26, 2023
2 parents 1b05269 + b103e22 commit 4e6df7b
Show file tree
Hide file tree
Showing 4 changed files with 293 additions and 81 deletions.
1 change: 1 addition & 0 deletions pkg/kv/kvserver/batcheval/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ go_library(
"//pkg/storage/enginepb",
"//pkg/util",
"//pkg/util/hlc",
"//pkg/util/iterutil",
"//pkg/util/limit",
"//pkg/util/log",
"//pkg/util/mon",
Expand Down
182 changes: 101 additions & 81 deletions pkg/kv/kvserver/batcheval/cmd_end_transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ package batcheval
import (
"bytes"
"context"
"math"
"sync/atomic"
"time"

Expand All @@ -33,6 +32,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/storage/enginepb"
"github.com/cockroachdb/cockroach/pkg/util"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/iterutil"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/redact"
Expand Down Expand Up @@ -539,6 +539,28 @@ func resolveLocalLocks(
args *kvpb.EndTxnRequest,
txn *roachpb.Transaction,
evalCtx EvalContext,
) (resolvedLocks []roachpb.LockUpdate, externalLocks []roachpb.Span, _ error) {
var resolveAllowance int64 = lockResolutionBatchSize
if args.InternalCommitTrigger != nil {
// If this is a system transaction (such as a split or merge), don't
// enforce the resolve allowance. These transactions rely on having
// their locks resolved synchronously.
resolveAllowance = 0
}
return resolveLocalLocksWithPagination(ctx, desc, readWriter, ms, args, txn, evalCtx, resolveAllowance)
}

// resolveLocalLocksWithPagination is resolveLocalLocks but with a max key
// limit.
func resolveLocalLocksWithPagination(
ctx context.Context,
desc *roachpb.RangeDescriptor,
readWriter storage.ReadWriter,
ms *enginepb.MVCCStats,
args *kvpb.EndTxnRequest,
txn *roachpb.Transaction,
evalCtx EvalContext,
maxKeys int64,
) (resolvedLocks []roachpb.LockUpdate, externalLocks []roachpb.Span, _ error) {
if mergeTrigger := args.InternalCommitTrigger.GetMergeTrigger(); mergeTrigger != nil {
// If this is a merge, then use the post-merge descriptor to determine
Expand All @@ -547,95 +569,93 @@ func resolveLocalLocks(
desc = &mergeTrigger.LeftDesc
}

var resolveAllowance int64 = lockResolutionBatchSize
if args.InternalCommitTrigger != nil {
// If this is a system transaction (such as a split or merge), don't enforce the resolve allowance.
// These transactions rely on having their locks resolved synchronously.
resolveAllowance = math.MaxInt64
}

for _, span := range args.LockSpans {
if err := func() error {
if resolveAllowance == 0 {
remainingLockSpans := args.LockSpans
f := func(maxKeys, targetBytes int64) (numKeys int64, numBytes int64, resumeReason kvpb.ResumeReason, err error) {
if len(remainingLockSpans) == 0 {
return 0, 0, 0, iterutil.StopIteration()
}
span := remainingLockSpans[0]
remainingLockSpans = remainingLockSpans[1:]
update := roachpb.MakeLockUpdate(txn, span)
if len(span.EndKey) == 0 {
// For single-key lock updates, do a KeyAddress-aware check of
// whether it's contained in our Range.
if !kvserverbase.ContainsKey(desc, span.Key) {
externalLocks = append(externalLocks, span)
return nil
return 0, 0, 0, nil
}
update := roachpb.MakeLockUpdate(txn, span)
if len(span.EndKey) == 0 {
// For single-key lock updates, do a KeyAddress-aware check of
// whether it's contained in our Range.
if !kvserverbase.ContainsKey(desc, span.Key) {
externalLocks = append(externalLocks, span)
return nil
}
// It may be tempting to reuse an iterator here, but this call
// can create the iterator with Prefix:true which is much faster
// than seeking -- especially for intents that are missing, e.g.
// due to async intent resolution. See:
// https://github.com/cockroachdb/cockroach/issues/64092
//
// Note that the underlying pebbleIterator will still be reused
// since readWriter is a pebbleBatch in the typical case.
ok, _, _, err := storage.MVCCResolveWriteIntent(ctx, readWriter, ms, update,
storage.MVCCResolveWriteIntentOptions{})
if err != nil {
return err
}
if ok {
resolveAllowance--
}
resolvedLocks = append(resolvedLocks, update)
// If requested, replace point tombstones with range tombstones.
if ok && evalCtx.EvalKnobs().UseRangeTombstonesForPointDeletes {
if err := storage.ReplacePointTombstonesWithRangeTombstones(
ctx, spanset.DisableReadWriterAssertions(readWriter),
ms, update.Key, update.EndKey); err != nil {
return err
}
}
return nil
// It may be tempting to reuse an iterator here, but this call
// can create the iterator with Prefix:true which is much faster
// than seeking -- especially for intents that are missing, e.g.
// due to async intent resolution. See:
// https://github.com/cockroachdb/cockroach/issues/64092
//
// Note that the underlying pebbleIterator will still be reused
// since readWriter is a pebbleBatch in the typical case.
ok, _, _, err := storage.MVCCResolveWriteIntent(ctx, readWriter, ms, update,
storage.MVCCResolveWriteIntentOptions{})
if err != nil {
return 0, 0, 0, errors.Wrapf(err, "resolving write intent at %s on end transaction [%s]", span, txn.Status)
}
// For update ranges, cut into parts inside and outside our key
// range. Resolve locally inside, delegate the rest. In particular,
// an update range for range-local data is correctly considered local.
inSpan, outSpans := kvserverbase.IntersectSpan(span, desc)
externalLocks = append(externalLocks, outSpans...)
if inSpan != nil {
update.Span = *inSpan
numKeys, _, resumeSpan, _, err := storage.MVCCResolveWriteIntentRange(ctx, readWriter, ms, update,
storage.MVCCResolveWriteIntentRangeOptions{MaxKeys: resolveAllowance})
if err != nil {
return err
}
if evalCtx.EvalKnobs().NumKeysEvaluatedForRangeIntentResolution != nil {
atomic.AddInt64(evalCtx.EvalKnobs().NumKeysEvaluatedForRangeIntentResolution, numKeys)
}
resolveAllowance -= numKeys
if resumeSpan != nil {
if resolveAllowance != 0 {
log.Fatalf(ctx, "expected resolve allowance to be exactly 0 resolving %s; got %d", update.Span, resolveAllowance)
}
update.EndKey = resumeSpan.Key
externalLocks = append(externalLocks, *resumeSpan)
if ok {
numKeys = 1
}
resolvedLocks = append(resolvedLocks, update)
// If requested, replace point tombstones with range tombstones.
if ok && evalCtx.EvalKnobs().UseRangeTombstonesForPointDeletes {
if err := storage.ReplacePointTombstonesWithRangeTombstones(
ctx, spanset.DisableReadWriterAssertions(readWriter),
ms, update.Key, update.EndKey); err != nil {
return 0, 0, 0, errors.Wrapf(err,
"replacing point tombstones with range tombstones for write intent at %s on end transaction [%s]",
span, txn.Status)
}
resolvedLocks = append(resolvedLocks, update)
// If requested, replace point tombstones with range tombstones.
if evalCtx.EvalKnobs().UseRangeTombstonesForPointDeletes {
if err := storage.ReplacePointTombstonesWithRangeTombstones(
ctx, spanset.DisableReadWriterAssertions(readWriter),
ms, update.Key, update.EndKey); err != nil {
return err
}
}
return numKeys, 0, 0, nil
}
// For update ranges, cut into parts inside and outside our key
// range. Resolve locally inside, delegate the rest. In particular,
// an update range for range-local data is correctly considered local.
inSpan, outSpans := kvserverbase.IntersectSpan(span, desc)
externalLocks = append(externalLocks, outSpans...)
if inSpan != nil {
update.Span = *inSpan
numKeys, _, resumeSpan, resumeReason, err := storage.MVCCResolveWriteIntentRange(ctx, readWriter, ms, update,
storage.MVCCResolveWriteIntentRangeOptions{MaxKeys: maxKeys})
if err != nil {
return 0, 0, 0, errors.Wrapf(err, "resolving write intent range at %s on end transaction [%s]", span, txn.Status)
}
if evalCtx.EvalKnobs().NumKeysEvaluatedForRangeIntentResolution != nil {
atomic.AddInt64(evalCtx.EvalKnobs().NumKeysEvaluatedForRangeIntentResolution, numKeys)
}
if resumeSpan != nil {
update.EndKey = resumeSpan.Key
externalLocks = append(externalLocks, *resumeSpan)
}
resolvedLocks = append(resolvedLocks, update)
// If requested, replace point tombstones with range tombstones.
if evalCtx.EvalKnobs().UseRangeTombstonesForPointDeletes {
if err := storage.ReplacePointTombstonesWithRangeTombstones(
ctx, spanset.DisableReadWriterAssertions(readWriter),
ms, update.Key, update.EndKey); err != nil {
return 0, 0, 0, errors.Wrapf(err,
"replacing point tombstones with range tombstones for write intent range at %s on end transaction [%s]",
span, txn.Status)
}
return nil
}
return nil
}(); err != nil {
return nil, nil, errors.Wrapf(err, "resolving lock at %s on end transaction [%s]", span, txn.Status)
return numKeys, 0, resumeReason, nil
}
return 0, 0, 0, nil
}

removedAny := resolveAllowance != lockResolutionBatchSize
numKeys, _, _, err := storage.MVCCPaginate(ctx, maxKeys, 0 /* targetBytes */, false /* allowEmpty */, f)
if err != nil {
return nil, nil, err
}

externalLocks = append(externalLocks, remainingLockSpans...)

removedAny := numKeys > 0
if WriteAbortSpanOnResolve(txn.Status, args.Poison, removedAny) {
if err := UpdateAbortSpan(ctx, evalCtx, readWriter, ms, txn.TxnMeta, args.Poison); err != nil {
return nil, nil, err
Expand Down
73 changes: 73 additions & 0 deletions pkg/storage/mvcc.go
Original file line number Diff line number Diff line change
Expand Up @@ -4096,6 +4096,79 @@ func MVCCIterate(
return intents, nil
}

// MVCCPaginate drives a paginated operation by calling f in a loop, handing it
// the key and byte budgets that remain, and accumulating the key and byte
// counts f reports.
//
// The loop ends in one of three ways:
//   - f returns iterutil.StopIteration, meaning every element has been
//     visited; the totals are returned with no resume reason and no error.
//   - f returns any other error; the totals gathered so far are returned
//     together with that error.
//   - the key budget or the byte budget is used up; the totals are returned
//     with RESUME_KEY_LIMIT or RESUME_BYTE_LIMIT respectively and no error.
//     When both budgets are exhausted, RESUME_KEY_LIMIT is reported.
//
// A maxKeys or targetBytes of 0 is never decremented, i.e. that dimension is
// not limited by this function.
//
// When f returns an error, it must return zero for its other results. When f
// returns a non-zero resumeReason, it is checked against the bookkeeping here:
// RESUME_KEY_LIMIT requires the key budget to be exhausted, and
// RESUME_BYTE_LIMIT requires the byte budget to be exhausted unless allowEmpty
// is true. A RESUME_BYTE_LIMIT from f always ends the iteration. Violations of
// these expectations are fatal.
//
// f decides whether numBytes may overshoot targetBytes by up to one entry or
// whether it stops before numBytes would exceed targetBytes (see the
// AllowEmpty option); overshooting the byte budget is tolerated here, whereas
// reporting more keys than the key budget is fatal.
func MVCCPaginate(
	ctx context.Context,
	maxKeys, targetBytes int64,
	allowEmpty bool,
	f func(maxKeys, targetBytes int64) (numKeys, numBytes int64, resumeReason kvpb.ResumeReason, err error),
) (numKeys, numBytes int64, resumeReason kvpb.ResumeReason, err error) {
	// A negative budget is the internal marker for "this limit has been
	// reached"; keep going only while neither budget carries that marker.
	for maxKeys >= 0 && targetBytes >= 0 {
		stepKeys, stepBytes, stepReason, stepErr := f(maxKeys, targetBytes)
		if stepErr != nil {
			if stepKeys != 0 || stepBytes != 0 || stepReason != 0 {
				log.Fatalf(ctx,
					"addedKeys, addedBytes, and resumeReason should all be 0, but got addedKeys=%d, addedBytes=%d, resumeReason=%d",
					stepKeys, stepBytes, stepReason)
			}
			// iterutil.Map translates the StopIteration sentinel so that
			// normal completion is not surfaced as an error.
			return numKeys, numBytes, 0, iterutil.Map(stepErr)
		}

		numKeys += stepKeys
		numBytes += stepBytes

		// Charge this step against the key budget (0 means unlimited).
		if maxKeys > 0 {
			switch {
			case stepKeys > maxKeys:
				log.Fatalf(ctx, "added %d keys, which exceeds the max key limit %d", stepKeys, maxKeys)
			case stepKeys == maxKeys:
				maxKeys = -1
			default:
				maxKeys -= stepKeys
			}
		}

		// Charge this step against the byte budget (0 means unlimited).
		// Meeting or exceeding the budget exhausts it.
		if targetBytes > 0 {
			if stepBytes >= targetBytes {
				targetBytes = -1
			} else {
				targetBytes -= stepBytes
			}
		}

		// Cross-check the resume reason reported by f against the budgets.
		switch stepReason {
		case 0:
			// f did not hit a limit of its own; nothing to verify.
		case kvpb.RESUME_KEY_LIMIT:
			if maxKeys >= 0 {
				log.Fatalf(ctx, "Resume reason RESUME_KEY_LIMIT, but key limit = %d has not been hit", maxKeys)
			}
		case kvpb.RESUME_BYTE_LIMIT:
			if !allowEmpty && targetBytes >= 0 {
				log.Fatalf(ctx, "Resume reason RESUME_BYTE_LIMIT, but byte limit = %d has not been hit", targetBytes)
			}
			// Stop on the next pass even if the byte budget was not
			// numerically consumed.
			targetBytes = -1
		default:
			log.Fatalf(ctx, "Resume reason must be RESUME_KEY_LIMIT, RESUME_BYTE_LIMIT, or 0, got resumeReason = %d", stepReason)
		}
	}

	// The key limit takes precedence when both budgets are exhausted.
	if maxKeys < 0 {
		return numKeys, numBytes, kvpb.RESUME_KEY_LIMIT, nil
	}
	return numKeys, numBytes, kvpb.RESUME_BYTE_LIMIT, nil
}

// MVCCResolveWriteIntent either commits, aborts (rolls back), or moves forward
// in time an extant write intent for a given txn according to commit
// parameter. ResolveWriteIntent will skip write intents of other txns.
Expand Down
Loading

0 comments on commit 4e6df7b

Please sign in to comment.