Skip to content

Commit

Permalink
storage: Push pagination for EndTxn into the MVCC layer
Browse files Browse the repository at this point in the history
This commit moves key pagination for EndTxn out of the command itself
(cmd_end_transaction.go) and into the MVCC layer (mvcc.go) by integrating
MVCCPaginate, introduced in the previous commit, into the EndTxn command.

Informs: cockroachdb#77228

Release note: None
  • Loading branch information
KaiSun314 committed Feb 25, 2023
1 parent 5401286 commit b103e22
Show file tree
Hide file tree
Showing 2 changed files with 102 additions and 81 deletions.
1 change: 1 addition & 0 deletions pkg/kv/kvserver/batcheval/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ go_library(
"//pkg/storage/enginepb",
"//pkg/util",
"//pkg/util/hlc",
"//pkg/util/iterutil",
"//pkg/util/limit",
"//pkg/util/log",
"//pkg/util/mon",
Expand Down
182 changes: 101 additions & 81 deletions pkg/kv/kvserver/batcheval/cmd_end_transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ package batcheval
import (
"bytes"
"context"
"math"
"sync/atomic"
"time"

Expand All @@ -33,6 +32,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/storage/enginepb"
"github.com/cockroachdb/cockroach/pkg/util"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/iterutil"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/redact"
Expand Down Expand Up @@ -539,6 +539,28 @@ func resolveLocalLocks(
args *kvpb.EndTxnRequest,
txn *roachpb.Transaction,
evalCtx EvalContext,
) (resolvedLocks []roachpb.LockUpdate, externalLocks []roachpb.Span, _ error) {
var resolveAllowance int64 = lockResolutionBatchSize
if args.InternalCommitTrigger != nil {
// If this is a system transaction (such as a split or merge), don't
// enforce the resolve allowance. These transactions rely on having
// their locks resolved synchronously.
resolveAllowance = 0
}
return resolveLocalLocksWithPagination(ctx, desc, readWriter, ms, args, txn, evalCtx, resolveAllowance)
}

// resolveLocalLocksWithPagination is like resolveLocalLocks, but takes the
// key limit as an explicit maxKeys argument. A maxKeys of 0 means no limit.
func resolveLocalLocksWithPagination(
ctx context.Context,
desc *roachpb.RangeDescriptor,
readWriter storage.ReadWriter,
ms *enginepb.MVCCStats,
args *kvpb.EndTxnRequest,
txn *roachpb.Transaction,
evalCtx EvalContext,
maxKeys int64,
) (resolvedLocks []roachpb.LockUpdate, externalLocks []roachpb.Span, _ error) {
if mergeTrigger := args.InternalCommitTrigger.GetMergeTrigger(); mergeTrigger != nil {
// If this is a merge, then use the post-merge descriptor to determine
Expand All @@ -547,95 +569,93 @@ func resolveLocalLocks(
desc = &mergeTrigger.LeftDesc
}

var resolveAllowance int64 = lockResolutionBatchSize
if args.InternalCommitTrigger != nil {
// If this is a system transaction (such as a split or merge), don't enforce the resolve allowance.
// These transactions rely on having their locks resolved synchronously.
resolveAllowance = math.MaxInt64
}

for _, span := range args.LockSpans {
if err := func() error {
if resolveAllowance == 0 {
remainingLockSpans := args.LockSpans
f := func(maxKeys, targetBytes int64) (numKeys int64, numBytes int64, resumeReason kvpb.ResumeReason, err error) {
if len(remainingLockSpans) == 0 {
return 0, 0, 0, iterutil.StopIteration()
}
span := remainingLockSpans[0]
remainingLockSpans = remainingLockSpans[1:]
update := roachpb.MakeLockUpdate(txn, span)
if len(span.EndKey) == 0 {
// For single-key lock updates, do a KeyAddress-aware check of
// whether it's contained in our Range.
if !kvserverbase.ContainsKey(desc, span.Key) {
externalLocks = append(externalLocks, span)
return nil
return 0, 0, 0, nil
}
update := roachpb.MakeLockUpdate(txn, span)
if len(span.EndKey) == 0 {
// For single-key lock updates, do a KeyAddress-aware check of
// whether it's contained in our Range.
if !kvserverbase.ContainsKey(desc, span.Key) {
externalLocks = append(externalLocks, span)
return nil
}
// It may be tempting to reuse an iterator here, but this call
// can create the iterator with Prefix:true which is much faster
// than seeking -- especially for intents that are missing, e.g.
// due to async intent resolution. See:
// https://github.com/cockroachdb/cockroach/issues/64092
//
// Note that the underlying pebbleIterator will still be reused
// since readWriter is a pebbleBatch in the typical case.
ok, _, _, err := storage.MVCCResolveWriteIntent(ctx, readWriter, ms, update,
storage.MVCCResolveWriteIntentOptions{})
if err != nil {
return err
}
if ok {
resolveAllowance--
}
resolvedLocks = append(resolvedLocks, update)
// If requested, replace point tombstones with range tombstones.
if ok && evalCtx.EvalKnobs().UseRangeTombstonesForPointDeletes {
if err := storage.ReplacePointTombstonesWithRangeTombstones(
ctx, spanset.DisableReadWriterAssertions(readWriter),
ms, update.Key, update.EndKey); err != nil {
return err
}
}
return nil
// It may be tempting to reuse an iterator here, but this call
// can create the iterator with Prefix:true which is much faster
// than seeking -- especially for intents that are missing, e.g.
// due to async intent resolution. See:
// https://github.com/cockroachdb/cockroach/issues/64092
//
// Note that the underlying pebbleIterator will still be reused
// since readWriter is a pebbleBatch in the typical case.
ok, _, _, err := storage.MVCCResolveWriteIntent(ctx, readWriter, ms, update,
storage.MVCCResolveWriteIntentOptions{})
if err != nil {
return 0, 0, 0, errors.Wrapf(err, "resolving write intent at %s on end transaction [%s]", span, txn.Status)
}
// For update ranges, cut into parts inside and outside our key
// range. Resolve locally inside, delegate the rest. In particular,
// an update range for range-local data is correctly considered local.
inSpan, outSpans := kvserverbase.IntersectSpan(span, desc)
externalLocks = append(externalLocks, outSpans...)
if inSpan != nil {
update.Span = *inSpan
numKeys, _, resumeSpan, _, err := storage.MVCCResolveWriteIntentRange(ctx, readWriter, ms, update,
storage.MVCCResolveWriteIntentRangeOptions{MaxKeys: resolveAllowance})
if err != nil {
return err
}
if evalCtx.EvalKnobs().NumKeysEvaluatedForRangeIntentResolution != nil {
atomic.AddInt64(evalCtx.EvalKnobs().NumKeysEvaluatedForRangeIntentResolution, numKeys)
}
resolveAllowance -= numKeys
if resumeSpan != nil {
if resolveAllowance != 0 {
log.Fatalf(ctx, "expected resolve allowance to be exactly 0 resolving %s; got %d", update.Span, resolveAllowance)
}
update.EndKey = resumeSpan.Key
externalLocks = append(externalLocks, *resumeSpan)
if ok {
numKeys = 1
}
resolvedLocks = append(resolvedLocks, update)
// If requested, replace point tombstones with range tombstones.
if ok && evalCtx.EvalKnobs().UseRangeTombstonesForPointDeletes {
if err := storage.ReplacePointTombstonesWithRangeTombstones(
ctx, spanset.DisableReadWriterAssertions(readWriter),
ms, update.Key, update.EndKey); err != nil {
return 0, 0, 0, errors.Wrapf(err,
"replacing point tombstones with range tombstones for write intent at %s on end transaction [%s]",
span, txn.Status)
}
resolvedLocks = append(resolvedLocks, update)
// If requested, replace point tombstones with range tombstones.
if evalCtx.EvalKnobs().UseRangeTombstonesForPointDeletes {
if err := storage.ReplacePointTombstonesWithRangeTombstones(
ctx, spanset.DisableReadWriterAssertions(readWriter),
ms, update.Key, update.EndKey); err != nil {
return err
}
}
return numKeys, 0, 0, nil
}
// For update ranges, cut into parts inside and outside our key
// range. Resolve locally inside, delegate the rest. In particular,
// an update range for range-local data is correctly considered local.
inSpan, outSpans := kvserverbase.IntersectSpan(span, desc)
externalLocks = append(externalLocks, outSpans...)
if inSpan != nil {
update.Span = *inSpan
numKeys, _, resumeSpan, resumeReason, err := storage.MVCCResolveWriteIntentRange(ctx, readWriter, ms, update,
storage.MVCCResolveWriteIntentRangeOptions{MaxKeys: maxKeys})
if err != nil {
return 0, 0, 0, errors.Wrapf(err, "resolving write intent range at %s on end transaction [%s]", span, txn.Status)
}
if evalCtx.EvalKnobs().NumKeysEvaluatedForRangeIntentResolution != nil {
atomic.AddInt64(evalCtx.EvalKnobs().NumKeysEvaluatedForRangeIntentResolution, numKeys)
}
if resumeSpan != nil {
update.EndKey = resumeSpan.Key
externalLocks = append(externalLocks, *resumeSpan)
}
resolvedLocks = append(resolvedLocks, update)
// If requested, replace point tombstones with range tombstones.
if evalCtx.EvalKnobs().UseRangeTombstonesForPointDeletes {
if err := storage.ReplacePointTombstonesWithRangeTombstones(
ctx, spanset.DisableReadWriterAssertions(readWriter),
ms, update.Key, update.EndKey); err != nil {
return 0, 0, 0, errors.Wrapf(err,
"replacing point tombstones with range tombstones for write intent range at %s on end transaction [%s]",
span, txn.Status)
}
return nil
}
return nil
}(); err != nil {
return nil, nil, errors.Wrapf(err, "resolving lock at %s on end transaction [%s]", span, txn.Status)
return numKeys, 0, resumeReason, nil
}
return 0, 0, 0, nil
}

removedAny := resolveAllowance != lockResolutionBatchSize
numKeys, _, _, err := storage.MVCCPaginate(ctx, maxKeys, 0 /* targetBytes */, false /* allowEmpty */, f)
if err != nil {
return nil, nil, err
}

externalLocks = append(externalLocks, remainingLockSpans...)

removedAny := numKeys > 0
if WriteAbortSpanOnResolve(txn.Status, args.Poison, removedAny) {
if err := UpdateAbortSpan(ctx, evalCtx, readWriter, ms, txn.TxnMeta, args.Poison); err != nil {
return nil, nil, err
Expand Down

0 comments on commit b103e22

Please sign in to comment.