Skip to content

Commit

Permalink
storage: Push pagination for EndTxn into the MVCC layer
Browse files Browse the repository at this point in the history
In this PR, we push key pagination for EndTxn (currently located in
cmd_end_transaction.go) into the MVCC layer (mvcc.go) by integrating
MVCCPagination from the previous commit into the EndTxn command.

Informs: cockroachdb#77228

Release note: None
  • Loading branch information
KaiSun314 committed Jan 30, 2023
1 parent 36d75d5 commit 4b0d350
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 25 deletions.
65 changes: 43 additions & 22 deletions pkg/kv/kvserver/batcheval/cmd_end_transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ package batcheval
import (
"bytes"
"context"
"math"
"sync/atomic"
"time"

Expand All @@ -32,6 +31,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/storage/enginepb"
"github.com/cockroachdb/cockroach/pkg/util"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/iterutil"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/redact"
Expand Down Expand Up @@ -517,6 +517,28 @@ func resolveLocalLocks(
args *roachpb.EndTxnRequest,
txn *roachpb.Transaction,
evalCtx EvalContext,
) (resolvedLocks []roachpb.LockUpdate, externalLocks []roachpb.Span, _ error) {
var resolveAllowance int64 = lockResolutionBatchSize
if args.InternalCommitTrigger != nil {
// If this is a system transaction (such as a split or merge), don't
// enforce the resolve allowance. These transactions rely on having
// their locks resolved synchronously.
resolveAllowance = 0
}
return resolveLocalLocksWithPagination(ctx, desc, readWriter, ms, args, txn, evalCtx, resolveAllowance)
}

// resolveLocalLocksWithPagination is resolveLocalLocks but with a max key
// limit.
func resolveLocalLocksWithPagination(
ctx context.Context,
desc *roachpb.RangeDescriptor,
readWriter storage.ReadWriter,
ms *enginepb.MVCCStats,
args *roachpb.EndTxnRequest,
txn *roachpb.Transaction,
evalCtx EvalContext,
maxKeys int64,
) (resolvedLocks []roachpb.LockUpdate, externalLocks []roachpb.Span, _ error) {
if mergeTrigger := args.InternalCommitTrigger.GetMergeTrigger(); mergeTrigger != nil {
// If this is a merge, then use the post-merge descriptor to determine
Expand All @@ -525,19 +547,14 @@ func resolveLocalLocks(
desc = &mergeTrigger.LeftDesc
}

var resolveAllowance int64 = lockResolutionBatchSize
if args.InternalCommitTrigger != nil {
// If this is a system transaction (such as a split or merge), don't enforce the resolve allowance.
// These transactions rely on having their locks resolved synchronously.
resolveAllowance = math.MaxInt64
}

for _, span := range args.LockSpans {
i := 0
f := func(maxKeys, targetBytes int64) (numKeys int64, numBytes int64, resumeSpan *roachpb.Span, err error) {
if i >= len(args.LockSpans) {
return 0, 0, nil, iterutil.StopIteration()
}
span := args.LockSpans[i]
i++
if err := func() error {
if resolveAllowance == 0 {
externalLocks = append(externalLocks, span)
return nil
}
update := roachpb.MakeLockUpdate(txn, span)
if len(span.EndKey) == 0 {
// For single-key lock updates, do a KeyAddress-aware check of
Expand All @@ -560,7 +577,7 @@ func resolveLocalLocks(
return err
}
if ok {
resolveAllowance--
numKeys = 1
}
resolvedLocks = append(resolvedLocks, update)
// If requested, replace point tombstones with range tombstones.
Expand All @@ -580,19 +597,15 @@ func resolveLocalLocks(
externalLocks = append(externalLocks, outSpans...)
if inSpan != nil {
update.Span = *inSpan
numKeys, _, resumeSpan, _, err := storage.MVCCResolveWriteIntentRange(ctx, readWriter, ms, update,
storage.MVCCResolveWriteIntentRangeOptions{MaxKeys: resolveAllowance})
numKeys, _, resumeSpan, _, err = storage.MVCCResolveWriteIntentRange(ctx, readWriter, ms, update,
storage.MVCCResolveWriteIntentRangeOptions{MaxKeys: maxKeys})
if err != nil {
return err
}
if evalCtx.EvalKnobs().NumKeysEvaluatedForRangeIntentResolution != nil {
atomic.AddInt64(evalCtx.EvalKnobs().NumKeysEvaluatedForRangeIntentResolution, numKeys)
}
resolveAllowance -= numKeys
if resumeSpan != nil {
if resolveAllowance != 0 {
log.Fatalf(ctx, "expected resolve allowance to be exactly 0 resolving %s; got %d", update.Span, resolveAllowance)
}
update.EndKey = resumeSpan.Key
externalLocks = append(externalLocks, *resumeSpan)
}
Expand All @@ -609,11 +622,19 @@ func resolveLocalLocks(
}
return nil
}(); err != nil {
return nil, nil, errors.Wrapf(err, "resolving lock at %s on end transaction [%s]", span, txn.Status)
return 0, 0, nil, errors.Wrapf(err, "resolving lock at %s on end transaction [%s]", span, txn.Status)
}
return numKeys, numBytes, resumeSpan, nil
}

numKeys, _, _, err := storage.MVCCPagination(ctx, maxKeys, 0, f)
if err != nil {
return nil, nil, err
}

removedAny := resolveAllowance != lockResolutionBatchSize
externalLocks = append(externalLocks, args.LockSpans[i:]...)

removedAny := numKeys > 0
if WriteAbortSpanOnResolve(txn.Status, args.Poison, removedAny) {
if err := UpdateAbortSpan(ctx, evalCtx, readWriter, ms, txn.TxnMeta, args.Poison); err != nil {
return nil, nil, err
Expand Down
7 changes: 4 additions & 3 deletions pkg/storage/mvcc.go
Original file line number Diff line number Diff line change
Expand Up @@ -4086,9 +4086,10 @@ func MVCCIterate(
return intents, nil
}

// MVCCPagination invokes f() until it returns done (i.e. we have iterated
// through all elements) or an error, or until the number of keys hits the
// maxKeys limit or the number of bytes hits the targetBytes limit.
// MVCCPagination invokes f() until it returns an error (note that the
// iterutil.StopIteration() error means we have iterated through all elements),
// or until the number of keys hits the maxKeys limit or the number of bytes
// hits the targetBytes limit.
func MVCCPagination(
ctx context.Context,
maxKeys, targetBytes int64,
Expand Down

0 comments on commit 4b0d350

Please sign in to comment.