diff --git a/pkg/kv/kvserver/batcheval/BUILD.bazel b/pkg/kv/kvserver/batcheval/BUILD.bazel index f821c54e7a27..a66b5e087097 100644 --- a/pkg/kv/kvserver/batcheval/BUILD.bazel +++ b/pkg/kv/kvserver/batcheval/BUILD.bazel @@ -81,6 +81,7 @@ go_library( "//pkg/storage/enginepb", "//pkg/util", "//pkg/util/hlc", + "//pkg/util/iterutil", "//pkg/util/limit", "//pkg/util/log", "//pkg/util/mon", diff --git a/pkg/kv/kvserver/batcheval/cmd_end_transaction.go b/pkg/kv/kvserver/batcheval/cmd_end_transaction.go index e83b195a8413..4aefb916d095 100644 --- a/pkg/kv/kvserver/batcheval/cmd_end_transaction.go +++ b/pkg/kv/kvserver/batcheval/cmd_end_transaction.go @@ -13,7 +13,6 @@ package batcheval import ( "bytes" "context" - "math" "sync/atomic" "time" @@ -33,6 +32,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/storage/enginepb" "github.com/cockroachdb/cockroach/pkg/util" "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/cockroach/pkg/util/iterutil" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/errors" "github.com/cockroachdb/redact" @@ -539,6 +539,28 @@ func resolveLocalLocks( args *kvpb.EndTxnRequest, txn *roachpb.Transaction, evalCtx EvalContext, +) (resolvedLocks []roachpb.LockUpdate, externalLocks []roachpb.Span, _ error) { + var resolveAllowance int64 = lockResolutionBatchSize + if args.InternalCommitTrigger != nil { + // If this is a system transaction (such as a split or merge), don't + // enforce the resolve allowance. These transactions rely on having + // their locks resolved synchronously. + resolveAllowance = 0 + } + return resolveLocalLocksWithPagination(ctx, desc, readWriter, ms, args, txn, evalCtx, resolveAllowance) +} + +// resolveLocalLocksWithPagination is resolveLocalLocks but with a max key +// limit. +func resolveLocalLocksWithPagination( + ctx context.Context, + desc *roachpb.RangeDescriptor, + readWriter storage.ReadWriter, + ms *enginepb.MVCCStats, + args *kvpb.EndTxnRequest, + txn *roachpb.Transaction, + evalCtx EvalContext, + maxKeys int64, ) (resolvedLocks []roachpb.LockUpdate, externalLocks []roachpb.Span, _ error) { if mergeTrigger := args.InternalCommitTrigger.GetMergeTrigger(); mergeTrigger != nil { // If this is a merge, then use the post-merge descriptor to determine @@ -547,95 +569,93 @@ func resolveLocalLocks( desc = &mergeTrigger.LeftDesc } - var resolveAllowance int64 = lockResolutionBatchSize - if args.InternalCommitTrigger != nil { - // If this is a system transaction (such as a split or merge), don't enforce the resolve allowance. - // These transactions rely on having their locks resolved synchronously. - resolveAllowance = math.MaxInt64 - } - - for _, span := range args.LockSpans { - if err := func() error { - if resolveAllowance == 0 { + remainingLockSpans := args.LockSpans + f := func(maxKeys, targetBytes int64) (numKeys int64, numBytes int64, resumeReason kvpb.ResumeReason, err error) { + if len(remainingLockSpans) == 0 { + return 0, 0, 0, iterutil.StopIteration() + } + span := remainingLockSpans[0] + remainingLockSpans = remainingLockSpans[1:] + update := roachpb.MakeLockUpdate(txn, span) + if len(span.EndKey) == 0 { + // For single-key lock updates, do a KeyAddress-aware check of + // whether it's contained in our Range. + if !kvserverbase.ContainsKey(desc, span.Key) { externalLocks = append(externalLocks, span) - return nil + return 0, 0, 0, nil } - update := roachpb.MakeLockUpdate(txn, span) - if len(span.EndKey) == 0 { - // For single-key lock updates, do a KeyAddress-aware check of - // whether it's contained in our Range. - if !kvserverbase.ContainsKey(desc, span.Key) { - externalLocks = append(externalLocks, span) - return nil - } - // It may be tempting to reuse an iterator here, but this call - // can create the iterator with Prefix:true which is much faster - // than seeking -- especially for intents that are missing, e.g. - // due to async intent resolution. See: - // https://github.com/cockroachdb/cockroach/issues/64092 - // - // Note that the underlying pebbleIterator will still be reused - // since readWriter is a pebbleBatch in the typical case. - ok, _, _, err := storage.MVCCResolveWriteIntent(ctx, readWriter, ms, update, - storage.MVCCResolveWriteIntentOptions{}) - if err != nil { - return err - } - if ok { - resolveAllowance-- - } - resolvedLocks = append(resolvedLocks, update) - // If requested, replace point tombstones with range tombstones. - if ok && evalCtx.EvalKnobs().UseRangeTombstonesForPointDeletes { - if err := storage.ReplacePointTombstonesWithRangeTombstones( - ctx, spanset.DisableReadWriterAssertions(readWriter), - ms, update.Key, update.EndKey); err != nil { - return err - } - } - return nil + // It may be tempting to reuse an iterator here, but this call + // can create the iterator with Prefix:true which is much faster + // than seeking -- especially for intents that are missing, e.g. + // due to async intent resolution. See: + // https://github.com/cockroachdb/cockroach/issues/64092 + // + // Note that the underlying pebbleIterator will still be reused + // since readWriter is a pebbleBatch in the typical case. + ok, _, _, err := storage.MVCCResolveWriteIntent(ctx, readWriter, ms, update, + storage.MVCCResolveWriteIntentOptions{}) + if err != nil { + return 0, 0, 0, errors.Wrapf(err, "resolving write intent at %s on end transaction [%s]", span, txn.Status) } - // For update ranges, cut into parts inside and outside our key - // range. Resolve locally inside, delegate the rest. In particular, - // an update range for range-local data is correctly considered local. - inSpan, outSpans := kvserverbase.IntersectSpan(span, desc) - externalLocks = append(externalLocks, outSpans...) - if inSpan != nil { - update.Span = *inSpan - numKeys, _, resumeSpan, _, err := storage.MVCCResolveWriteIntentRange(ctx, readWriter, ms, update, - storage.MVCCResolveWriteIntentRangeOptions{MaxKeys: resolveAllowance}) - if err != nil { - return err - } - if evalCtx.EvalKnobs().NumKeysEvaluatedForRangeIntentResolution != nil { - atomic.AddInt64(evalCtx.EvalKnobs().NumKeysEvaluatedForRangeIntentResolution, numKeys) - } - resolveAllowance -= numKeys - if resumeSpan != nil { - if resolveAllowance != 0 { - log.Fatalf(ctx, "expected resolve allowance to be exactly 0 resolving %s; got %d", update.Span, resolveAllowance) - } - update.EndKey = resumeSpan.Key - externalLocks = append(externalLocks, *resumeSpan) + if ok { + numKeys = 1 + } + resolvedLocks = append(resolvedLocks, update) + // If requested, replace point tombstones with range tombstones. + if ok && evalCtx.EvalKnobs().UseRangeTombstonesForPointDeletes { + if err := storage.ReplacePointTombstonesWithRangeTombstones( + ctx, spanset.DisableReadWriterAssertions(readWriter), + ms, update.Key, update.EndKey); err != nil { + return 0, 0, 0, errors.Wrapf(err, + "replacing point tombstones with range tombstones for write intent at %s on end transaction [%s]", + span, txn.Status) } - resolvedLocks = append(resolvedLocks, update) - // If requested, replace point tombstones with range tombstones. - if evalCtx.EvalKnobs().UseRangeTombstonesForPointDeletes { - if err := storage.ReplacePointTombstonesWithRangeTombstones( - ctx, spanset.DisableReadWriterAssertions(readWriter), - ms, update.Key, update.EndKey); err != nil { - return err - } + } + return numKeys, 0, 0, nil + } + // For update ranges, cut into parts inside and outside our key + // range. Resolve locally inside, delegate the rest. In particular, + // an update range for range-local data is correctly considered local. + inSpan, outSpans := kvserverbase.IntersectSpan(span, desc) + externalLocks = append(externalLocks, outSpans...) + if inSpan != nil { + update.Span = *inSpan + numKeys, _, resumeSpan, resumeReason, err := storage.MVCCResolveWriteIntentRange(ctx, readWriter, ms, update, + storage.MVCCResolveWriteIntentRangeOptions{MaxKeys: maxKeys}) + if err != nil { + return 0, 0, 0, errors.Wrapf(err, "resolving write intent range at %s on end transaction [%s]", span, txn.Status) + } + if evalCtx.EvalKnobs().NumKeysEvaluatedForRangeIntentResolution != nil { + atomic.AddInt64(evalCtx.EvalKnobs().NumKeysEvaluatedForRangeIntentResolution, numKeys) + } + if resumeSpan != nil { + update.EndKey = resumeSpan.Key + externalLocks = append(externalLocks, *resumeSpan) + } + resolvedLocks = append(resolvedLocks, update) + // If requested, replace point tombstones with range tombstones. + if evalCtx.EvalKnobs().UseRangeTombstonesForPointDeletes { + if err := storage.ReplacePointTombstonesWithRangeTombstones( + ctx, spanset.DisableReadWriterAssertions(readWriter), + ms, update.Key, update.EndKey); err != nil { + return 0, 0, 0, errors.Wrapf(err, + "replacing point tombstones with range tombstones for write intent range at %s on end transaction [%s]", + span, txn.Status) } - return nil } - return nil - }(); err != nil { - return nil, nil, errors.Wrapf(err, "resolving lock at %s on end transaction [%s]", span, txn.Status) + return numKeys, 0, resumeReason, nil } + return 0, 0, 0, nil } - removedAny := resolveAllowance != lockResolutionBatchSize + numKeys, _, _, err := storage.MVCCPaginate(ctx, maxKeys, 0 /* targetBytes */, false /* allowEmpty */, f) + if err != nil { + return nil, nil, err + } + + externalLocks = append(externalLocks, remainingLockSpans...) + + removedAny := numKeys > 0 if WriteAbortSpanOnResolve(txn.Status, args.Poison, removedAny) { if err := UpdateAbortSpan(ctx, evalCtx, readWriter, ms, txn.TxnMeta, args.Poison); err != nil { return nil, nil, err diff --git a/pkg/storage/mvcc.go b/pkg/storage/mvcc.go index d504934f53c1..76df4a2b279e 100644 --- a/pkg/storage/mvcc.go +++ b/pkg/storage/mvcc.go @@ -4096,6 +4096,79 @@ func MVCCIterate( return intents, nil } +// MVCCPaginate iteratively invokes f() with the current maxKeys and +// targetBytes limits. If f returns iterutil.StopIteration (meaning that we +// have iterated through all elements), the iteration stops with no error +// propagated. If f returns any other error, the iteration stops and the error +// is propagated. If the number of keys hits the maxKeys limit or the number of +// bytes hits the targetBytes limit, the iteration stops with no error +// propagated but with the appropriate resume reason returned. f returns a +// resumeReason, which if set, will assert that the number of keys / bytes hit +// the key / byte limit matching the resumeReason. Moreover, if resumeReason is +// RESUME_BYTE_LIMIT and allowEmpty is true, then the iteration stops with no +// error propagated but with the RESUME_BYTE_LIMIT resume reason returned. +// +// We note that it is up to f() whether it wants to allow the numBytes to +// exceed targetBytes by up to one entry or whether it wants to terminate +// iteration before numBytes exceeds targetBytes. See the AllowEmpty option. +func MVCCPaginate( + ctx context.Context, + maxKeys, targetBytes int64, + allowEmpty bool, + f func(maxKeys, targetBytes int64) (numKeys, numBytes int64, resumeReason kvpb.ResumeReason, err error), +) (numKeys, numBytes int64, resumeReason kvpb.ResumeReason, err error) { + for { + if maxKeys < 0 { + return numKeys, numBytes, kvpb.RESUME_KEY_LIMIT, nil + } + if targetBytes < 0 { + return numKeys, numBytes, kvpb.RESUME_BYTE_LIMIT, nil + } + addedKeys, addedBytes, resumeReason, err := f(maxKeys, targetBytes) + if err != nil { + if addedKeys != 0 || addedBytes != 0 || resumeReason != 0 { + log.Fatalf(ctx, + "addedKeys, addedBytes, and resumeReason should all be 0, but got addedKeys=%d, addedBytes=%d, resumeReason=%d", + addedKeys, addedBytes, resumeReason) + } + err = iterutil.Map(err) + return numKeys, numBytes, 0, err + } + numKeys += addedKeys + numBytes += addedBytes + if maxKeys > 0 { + if addedKeys > maxKeys { + log.Fatalf(ctx, "added %d keys, which exceeds the max key limit %d", addedKeys, maxKeys) + } else if addedKeys < maxKeys { + maxKeys -= addedKeys + } else { + maxKeys = -1 + } + } + if targetBytes > 0 { + if addedBytes < targetBytes { + targetBytes -= addedBytes + } else { + targetBytes = -1 + } + } + switch resumeReason { + case kvpb.RESUME_KEY_LIMIT: + if maxKeys >= 0 { + log.Fatalf(ctx, "Resume reason RESUME_KEY_LIMIT, but key limit = %d has not been hit", maxKeys) + } + case kvpb.RESUME_BYTE_LIMIT: + if !allowEmpty && targetBytes >= 0 { + log.Fatalf(ctx, "Resume reason RESUME_BYTE_LIMIT, but byte limit = %d has not been hit", targetBytes) + } + targetBytes = -1 + case 0: + default: + log.Fatalf(ctx, "Resume reason must be RESUME_KEY_LIMIT, RESUME_BYTE_LIMIT, or 0, got resumeReason = %d", resumeReason) + } + } +} + // MVCCResolveWriteIntent either commits, aborts (rolls back), or moves forward // in time an extant write intent for a given txn according to commit // parameter. ResolveWriteIntent will skip write intents of other txns. diff --git a/pkg/storage/mvcc_test.go b/pkg/storage/mvcc_test.go index 2e01d64c0eda..2b9eac261c9d 100644 --- a/pkg/storage/mvcc_test.go +++ b/pkg/storage/mvcc_test.go @@ -37,6 +37,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/admission" "github.com/cockroachdb/cockroach/pkg/util/encoding" "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/cockroach/pkg/util/iterutil" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/protoutil" @@ -2674,6 +2675,123 @@ func TestMVCCResolveNewerIntent(t *testing.T) { } } +// TestMVCCPaginate tests that MVCCPaginate respects the MaxKeys and +// TargetBytes limits, and returns the correct numKeys, numBytes, and +// resumeReason. +func TestMVCCPaginate(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + testCases := []struct { + maxKeys int64 + targetBytes int64 + allowEmpty bool + numKeysPerIter int64 + numBytesPerIter int64 + numIters int + expectedNumKeys int64 + expectedNumBytes int64 + expectedResumeReason kvpb.ResumeReason + }{ + // MaxKeys and TargetBytes limits not reached, so do all 10 iterations. + { + maxKeys: 31, + targetBytes: 51, + allowEmpty: false, + numKeysPerIter: 3, + numBytesPerIter: 5, + numIters: 10, + expectedNumKeys: 30, + expectedNumBytes: 50, + expectedResumeReason: 0, + }, + // MaxKeys limit reached after 7 iterations. + { + maxKeys: 21, + targetBytes: 51, + allowEmpty: false, + numKeysPerIter: 3, + numBytesPerIter: 5, + numIters: 10, + expectedNumKeys: 21, + expectedNumBytes: 35, + expectedResumeReason: kvpb.RESUME_KEY_LIMIT, + }, + // MaxKeys limit reached after 10 iterations. Despite the fact we + // finished iterating, we still return a resume reason because we check + // the MaxKeys and TargetBytes limits before we check if we stop + // iteration. + { + maxKeys: 30, + targetBytes: 50, + allowEmpty: false, + numKeysPerIter: 3, + numBytesPerIter: 5, + numIters: 10, + expectedNumKeys: 30, + expectedNumBytes: 50, + expectedResumeReason: kvpb.RESUME_KEY_LIMIT, + }, + // TargetBytes limit reached after 7 iterations. + { + maxKeys: 31, + targetBytes: 34, + allowEmpty: false, + numKeysPerIter: 3, + numBytesPerIter: 5, + numIters: 10, + expectedNumKeys: 21, + expectedNumBytes: 35, + expectedResumeReason: kvpb.RESUME_BYTE_LIMIT, + }, + // TargetBytes limit reached after 7 iterations, but with TargetBytes + // limit exactly the number of bytes. + { + maxKeys: 31, + targetBytes: 35, + allowEmpty: false, + numKeysPerIter: 3, + numBytesPerIter: 5, + numIters: 10, + expectedNumKeys: 21, + expectedNumBytes: 35, + expectedResumeReason: kvpb.RESUME_BYTE_LIMIT, + }, + // TargetBytes limit reached after 7 iterations, but with AllowEmpty + // set to true, so only 6 iterations are completed. + { + maxKeys: 31, + targetBytes: 34, + allowEmpty: true, + numKeysPerIter: 3, + numBytesPerIter: 5, + numIters: 10, + expectedNumKeys: 18, + expectedNumBytes: 30, + expectedResumeReason: kvpb.RESUME_BYTE_LIMIT, + }, + } + + for _, tc := range testCases { + var iter int + numKeys, numBytes, resumeReason, err := MVCCPaginate(context.Background(), tc.maxKeys, tc.targetBytes, tc.allowEmpty, + func(maxKeys, targetBytes int64) (numKeys int64, numBytes int64, resumeReason kvpb.ResumeReason, err error) { + if iter == tc.numIters { + return 0, 0, 0, iterutil.StopIteration() + } + iter++ + if tc.allowEmpty && tc.numBytesPerIter > targetBytes { + return 0, 0, kvpb.RESUME_BYTE_LIMIT, nil + } + return tc.numKeysPerIter, tc.numBytesPerIter, 0, nil + }) + require.NoError(t, err) + require.Equal(t, tc.expectedNumKeys, numKeys) + require.Equal(t, tc.expectedNumBytes, numBytes) + require.Equal(t, tc.expectedResumeReason, resumeReason) + } +} + func TestMVCCResolveIntentTxnTimestampMismatch(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t)