Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

storage: Refactor resolve write intent options #94764

Merged
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions pkg/kv/kvserver/batch_spanset_test.go
Original file line number Diff line number Diff line change
@@ -563,8 +563,8 @@ func TestSpanSetMVCCResolveWriteIntentRange(t *testing.T) {
Txn: enginepb.TxnMeta{}, // unused
Status: roachpb.PENDING,
}
if _, _, err := storage.MVCCResolveWriteIntentRange(
ctx, batch, nil /* ms */, intent, 0,
if _, _, _, _, err := storage.MVCCResolveWriteIntentRange(
ctx, batch, nil /* ms */, intent, storage.MVCCResolveWriteIntentRangeOptions{},
); err != nil {
t.Fatal(err)
}
11 changes: 6 additions & 5 deletions pkg/kv/kvserver/batcheval/cmd_end_transaction.go
Original file line number Diff line number Diff line change
@@ -554,7 +554,8 @@ func resolveLocalLocks(
//
// Note that the underlying pebbleIterator will still be reused
// since readWriter is a pebbleBatch in the typical case.
ok, err := storage.MVCCResolveWriteIntent(ctx, readWriter, ms, update)
ok, _, _, err := storage.MVCCResolveWriteIntent(ctx, readWriter, ms, update,
storage.MVCCResolveWriteIntentOptions{})
if err != nil {
return err
}
@@ -579,15 +580,15 @@ func resolveLocalLocks(
externalLocks = append(externalLocks, outSpans...)
if inSpan != nil {
update.Span = *inSpan
num, resumeSpan, err := storage.MVCCResolveWriteIntentRange(
ctx, readWriter, ms, update, resolveAllowance)
numKeys, _, resumeSpan, _, err := storage.MVCCResolveWriteIntentRange(ctx, readWriter, ms, update,
storage.MVCCResolveWriteIntentRangeOptions{MaxKeys: resolveAllowance})
if err != nil {
return err
}
if evalCtx.EvalKnobs().NumKeysEvaluatedForRangeIntentResolution != nil {
atomic.AddInt64(evalCtx.EvalKnobs().NumKeysEvaluatedForRangeIntentResolution, num)
atomic.AddInt64(evalCtx.EvalKnobs().NumKeysEvaluatedForRangeIntentResolution, numKeys)
}
resolveAllowance -= num
resolveAllowance -= numKeys
if resumeSpan != nil {
if resolveAllowance != 0 {
log.Fatalf(ctx, "expected resolve allowance to be exactly 0 resolving %s; got %d", update.Span, resolveAllowance)
4 changes: 2 additions & 2 deletions pkg/kv/kvserver/batcheval/cmd_refresh_range_test.go
Original file line number Diff line number Diff line change
@@ -173,7 +173,7 @@ func TestRefreshRangeTimeBoundIterator(t *testing.T) {
// would not have any timestamp bounds and would be selected for every read.
intent := roachpb.MakeLockUpdate(txn, roachpb.Span{Key: k})
intent.Status = roachpb.COMMITTED
if _, err := storage.MVCCResolveWriteIntent(ctx, db, nil, intent); err != nil {
if _, _, _, err := storage.MVCCResolveWriteIntent(ctx, db, nil, intent, storage.MVCCResolveWriteIntentOptions{}); err != nil {
t.Fatal(err)
}
if err := storage.MVCCPut(ctx, db, nil, roachpb.Key("unused2"), ts1, hlc.ClockTimestamp{}, v, nil); err != nil {
@@ -272,7 +272,7 @@ func TestRefreshRangeError(t *testing.T) {
if resolveIntent {
intent := roachpb.MakeLockUpdate(txn, roachpb.Span{Key: k})
intent.Status = roachpb.COMMITTED
if _, err := storage.MVCCResolveWriteIntent(ctx, db, nil, intent); err != nil {
if _, _, _, err := storage.MVCCResolveWriteIntent(ctx, db, nil, intent, storage.MVCCResolveWriteIntentOptions{}); err != nil {
t.Fatal(err)
}
}
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/batcheval/cmd_refresh_test.go
Original file line number Diff line number Diff line change
@@ -66,7 +66,7 @@ func TestRefreshError(t *testing.T) {
if resolveIntent {
intent := roachpb.MakeLockUpdate(txn, roachpb.Span{Key: k})
intent.Status = roachpb.COMMITTED
if _, err := storage.MVCCResolveWriteIntent(ctx, db, nil, intent); err != nil {
if _, _, _, err := storage.MVCCResolveWriteIntent(ctx, db, nil, intent, storage.MVCCResolveWriteIntentOptions{}); err != nil {
t.Fatal(err)
}
}
3 changes: 2 additions & 1 deletion pkg/kv/kvserver/batcheval/cmd_resolve_intent.go
Original file line number Diff line number Diff line change
@@ -93,7 +93,8 @@ func ResolveIntent(
// The observation was from the wrong node. Ignore.
update.ClockWhilePending = roachpb.ObservedTimestamp{}
}
ok, err := storage.MVCCResolveWriteIntent(ctx, readWriter, ms, update)
ok, _, _, err := storage.MVCCResolveWriteIntent(ctx, readWriter, ms, update,
storage.MVCCResolveWriteIntentOptions{})
if err != nil {
return result.Result{}, err
}
4 changes: 2 additions & 2 deletions pkg/kv/kvserver/batcheval/cmd_resolve_intent_range.go
Original file line number Diff line number Diff line change
@@ -52,8 +52,8 @@ func ResolveIntentRange(
// The observation was from the wrong node. Ignore.
update.ClockWhilePending = roachpb.ObservedTimestamp{}
}
numKeys, resumeSpan, err := storage.MVCCResolveWriteIntentRange(
ctx, readWriter, ms, update, h.MaxSpanRequestKeys)
numKeys, _, resumeSpan, _, err := storage.MVCCResolveWriteIntentRange(ctx, readWriter, ms, update,
storage.MVCCResolveWriteIntentRangeOptions{MaxKeys: h.MaxSpanRequestKeys})
if err != nil {
return result.Result{}, err
}
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/gc/gc_random_test.go
Original file line number Diff line number Diff line change
@@ -314,7 +314,7 @@ func TestNewVsInvariants(t *testing.T) {
Txn: i.Txn,
Status: roachpb.ABORTED,
}
_, err := storage.MVCCResolveWriteIntent(ctx, eng, &stats, l)
_, _, _, err := storage.MVCCResolveWriteIntent(ctx, eng, &stats, l, storage.MVCCResolveWriteIntentOptions{})
require.NoError(t, err, "failed to resolve intent")
}
for _, cr := range gcer.clearRanges() {
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/loqrecovery/apply.go
Original file line number Diff line number Diff line change
@@ -264,7 +264,7 @@ func applyReplicaUpdate(
Txn: intent.Txn,
Status: roachpb.ABORTED,
}
if _, err := storage.MVCCResolveWriteIntent(ctx, readWriter, &ms, update); err != nil {
if _, _, _, err := storage.MVCCResolveWriteIntent(ctx, readWriter, &ms, update, storage.MVCCResolveWriteIntentOptions{}); err != nil {
return PrepareReplicaReport{}, err
}
report.AbortedTransaction = true
4 changes: 2 additions & 2 deletions pkg/storage/bench_data_test.go
Original file line number Diff line number Diff line change
@@ -353,11 +353,11 @@ func (d mvccBenchData) Build(ctx context.Context, b *testing.B, eng Engine) erro
key := keySlice[idx]
txnMeta := txn.TxnMeta
txnMeta.WriteTimestamp = hlc.Timestamp{WallTime: int64(counts[idx]) * 5}
if _, err := MVCCResolveWriteIntent(ctx, batch, nil /* ms */, roachpb.LockUpdate{
if _, _, _, err := MVCCResolveWriteIntent(ctx, batch, nil /* ms */, roachpb.LockUpdate{
Span: roachpb.Span{Key: key},
Status: roachpb.COMMITTED,
Txn: txnMeta,
}); err != nil {
}, MVCCResolveWriteIntentOptions{}); err != nil {
b.Fatal(err)
}
}
9 changes: 5 additions & 4 deletions pkg/storage/bench_test.go
Original file line number Diff line number Diff line change
@@ -334,7 +334,7 @@ func setupKeysWithIntent(
// is not one that should be resolved.
continue
}
found, err := MVCCResolveWriteIntent(context.Background(), batch, nil, lu)
found, _, _, err := MVCCResolveWriteIntent(context.Background(), batch, nil, lu, MVCCResolveWriteIntentOptions{})
require.Equal(b, true, found)
require.NoError(b, err)
}
@@ -553,7 +553,7 @@ func BenchmarkIntentResolution(b *testing.B) {
b.StartTimer()
}
lockUpdate.Key = keys[i%numIntentKeys]
found, err := MVCCResolveWriteIntent(context.Background(), batch, nil, lockUpdate)
found, _, _, err := MVCCResolveWriteIntent(context.Background(), batch, nil, lockUpdate, MVCCResolveWriteIntentOptions{})
if !found || err != nil {
b.Fatalf("intent not found or err %s", err)
}
@@ -613,8 +613,9 @@ func BenchmarkIntentRangeResolution(b *testing.B) {
rangeNum := i % numRanges
lockUpdate.Key = keys[rangeNum*numKeysPerRange]
lockUpdate.EndKey = keys[(rangeNum+1)*numKeysPerRange]
resolved, span, err := MVCCResolveWriteIntentRange(
context.Background(), batch, nil, lockUpdate, 1000 /* max */)
resolved, _, span, _, err := MVCCResolveWriteIntentRange(
context.Background(), batch, nil, lockUpdate,
MVCCResolveWriteIntentRangeOptions{MaxKeys: 1000})
if err != nil {
b.Fatal(err)
}
4 changes: 2 additions & 2 deletions pkg/storage/metamorphic/operations.go
Original file line number Diff line number Diff line change
@@ -485,7 +485,7 @@ func (t txnCommitOp) run(ctx context.Context) string {
for _, span := range txn.LockSpans {
intent := roachpb.MakeLockUpdate(txn, span)
intent.Status = roachpb.COMMITTED
_, err := storage.MVCCResolveWriteIntent(context.TODO(), t.m.engine, nil, intent)
_, _, _, err := storage.MVCCResolveWriteIntent(context.TODO(), t.m.engine, nil, intent, storage.MVCCResolveWriteIntentOptions{})
if err != nil {
panic(err)
}
@@ -508,7 +508,7 @@ func (t txnAbortOp) run(ctx context.Context) string {
for _, span := range txn.LockSpans {
intent := roachpb.MakeLockUpdate(txn, span)
intent.Status = roachpb.ABORTED
_, err := storage.MVCCResolveWriteIntent(context.TODO(), t.m.engine, nil, intent)
_, _, _, err := storage.MVCCResolveWriteIntent(context.TODO(), t.m.engine, nil, intent, storage.MVCCResolveWriteIntentOptions{})
if err != nil {
panic(err)
}
71 changes: 51 additions & 20 deletions pkg/storage/mvcc.go
Original file line number Diff line number Diff line change
@@ -922,6 +922,21 @@ func (opts *MVCCGetOptions) errOnIntents() bool {
return !opts.Inconsistent && !opts.SkipLocked
}

// MVCCResolveWriteIntentOptions bundles options for the MVCCResolveWriteIntent
// function.
type MVCCResolveWriteIntentOptions struct {
// See the documentation for MVCCResolveWriteIntent for information on these
// parameters.
}

// MVCCResolveWriteIntentRangeOptions bundles options for the
// MVCCResolveWriteIntentRange function.
type MVCCResolveWriteIntentRangeOptions struct {
// See the documentation for MVCCResolveWriteIntentRange for information on
// these parameters.
MaxKeys int64
}

// newMVCCIterator sets up a suitable iterator for high-level MVCC operations
// operating at the given timestamp. If timestamp is empty or if
// `noInterleavedIntents` is set, the iterator is considered to be used for
@@ -3974,7 +3989,9 @@ func MVCCIterate(
// MVCCResolveWriteIntent either commits, aborts (rolls back), or moves forward
// in time an extant write intent for a given txn according to commit parameter.
// ResolveWriteIntent will skip write intents of other txns. It returns
// whether or not an intent was found to resolve.
// whether or not an intent was found to resolve. Note that the numBytes and
// resumeSpan return values are currently unused and serve as a placeholder in
// refactoring, but will be used in the future.
//
// Transaction epochs deserve a bit of explanation. The epoch for a
// transaction is incremented on transaction retries. A transaction
@@ -3992,24 +4009,28 @@ func MVCCIterate(
// epoch matching the commit epoch), and which intents get aborted,
// even if the transaction succeeds.
func MVCCResolveWriteIntent(
ctx context.Context, rw ReadWriter, ms *enginepb.MVCCStats, intent roachpb.LockUpdate,
) (bool, error) {
ctx context.Context,
rw ReadWriter,
ms *enginepb.MVCCStats,
intent roachpb.LockUpdate,
opts MVCCResolveWriteIntentOptions,
) (ok bool, numBytes int64, resumeSpan *roachpb.Span, err error) {
if len(intent.Key) == 0 {
return false, emptyKeyError()
return false, 0, nil, emptyKeyError()
}
if len(intent.EndKey) > 0 {
return false, errors.Errorf("can't resolve range intent as point intent")
return false, 0, nil, errors.Errorf("can't resolve range intent as point intent")
}

iterAndBuf := GetBufUsingIter(rw.NewMVCCIterator(MVCCKeyAndIntentsIterKind, IterOptions{
KeyTypes: IterKeyTypePointsAndRanges,
Prefix: true,
}))
iterAndBuf.iter.SeekIntentGE(intent.Key, intent.Txn.ID)
ok, err := mvccResolveWriteIntent(ctx, rw, iterAndBuf.iter, ms, intent, iterAndBuf.buf)
ok, err = mvccResolveWriteIntent(ctx, rw, iterAndBuf.iter, ms, intent, iterAndBuf.buf)
// Using defer would be more convenient, but it is measurably slower.
iterAndBuf.Cleanup()
return ok, err
return ok, 0, nil, err
}

// iterForKeyVersions provides a subset of the functionality of MVCCIterator.
@@ -4772,13 +4793,24 @@ func (b IterAndBuf) Cleanup() {
// ResolveWriteIntentRange will skip write intents of other txns. A max of zero
// means unbounded. A max of -1 means resolve nothing and returns the entire
// intent span as the resume span. Returns the number of intents resolved and a
// resume span if the max keys limit was exceeded.
// resume span if the max keys limit was exceeded. Note that the numBytes and
// resumeReason return values are currently unused and serve as a placeholder
// in refactoring, but will be used in the future.
func MVCCResolveWriteIntentRange(
ctx context.Context, rw ReadWriter, ms *enginepb.MVCCStats, intent roachpb.LockUpdate, max int64,
) (int64, *roachpb.Span, error) {
if max < 0 {
ctx context.Context,
rw ReadWriter,
ms *enginepb.MVCCStats,
intent roachpb.LockUpdate,
opts MVCCResolveWriteIntentRangeOptions,
) (
numKeys, numBytes int64,
resumeSpan *roachpb.Span,
resumeReason roachpb.ResumeReason,
err error,
) {
if opts.MaxKeys < 0 {
resumeSpan := intent.Span // don't inline or `intent` would escape to heap
return 0, &resumeSpan, nil
return 0, 0, &resumeSpan, roachpb.RESUME_KEY_LIMIT, nil
}
ltStart, _ := keys.LockTableSingleKey(intent.Key, nil)
ltEnd, _ := keys.LockTableSingleKey(intent.EndKey, nil)
@@ -4815,25 +4847,24 @@ func MVCCResolveWriteIntentRange(
intent.EndKey = nil

var lastResolvedKey roachpb.Key
num := int64(0)
for {
if valid, err := sepIter.Valid(); err != nil {
return 0, nil, err
return 0, 0, nil, 0, err
} else if !valid {
// No more intents in the given range.
break
}
if max > 0 && num == max {
if opts.MaxKeys > 0 && numKeys == opts.MaxKeys {
// We could also compute a tighter nextKey here if we wanted to.
return num, &roachpb.Span{Key: lastResolvedKey.Next(), EndKey: intentEndKey}, nil
return numKeys, 0, &roachpb.Span{Key: lastResolvedKey.Next(), EndKey: intentEndKey}, roachpb.RESUME_KEY_LIMIT, nil
}
// Parse the MVCCMetadata to see if it is a relevant intent.
meta := &putBuf.meta
if err := sepIter.ValueProto(meta); err != nil {
return 0, nil, err
return 0, 0, nil, 0, err
}
if meta.Txn == nil {
return 0, nil, errors.Errorf("intent with no txn")
return 0, 0, nil, 0, errors.Errorf("intent with no txn")
}
if intent.Txn.ID != meta.Txn.ID {
// Intent for a different txn, so ignore.
@@ -4853,11 +4884,11 @@ func MVCCResolveWriteIntentRange(
if err != nil {
log.Warningf(ctx, "failed to resolve intent for key %q: %+v", lastResolvedKey, err)
} else if ok {
num++
numKeys++
}
sepIter.nextEngineKey()
}
return num, nil, nil
return numKeys, 0, nil, 0, nil
}

// MVCCGarbageCollect creates an iterator on the ReadWriter. In parallel
4 changes: 2 additions & 2 deletions pkg/storage/mvcc_history_test.go
Original file line number Diff line number Diff line change
@@ -890,7 +890,7 @@ func cmdResolveIntentRange(e *evalCtx) error {
intent.Status = status

return e.withWriter("resolve_intent_range", func(rw storage.ReadWriter) error {
_, _, err := storage.MVCCResolveWriteIntentRange(e.ctx, rw, e.ms, intent, 0)
_, _, _, _, err := storage.MVCCResolveWriteIntentRange(e.ctx, rw, e.ms, intent, storage.MVCCResolveWriteIntentRangeOptions{})
return err
})
}
@@ -905,7 +905,7 @@ func (e *evalCtx) resolveIntent(
intent := roachpb.MakeLockUpdate(txn, roachpb.Span{Key: key})
intent.Status = resolveStatus
intent.ClockWhilePending = roachpb.ObservedTimestamp{Timestamp: clockWhilePending}
_, err := storage.MVCCResolveWriteIntent(e.ctx, rw, e.ms, intent)
_, _, _, err := storage.MVCCResolveWriteIntent(e.ctx, rw, e.ms, intent, storage.MVCCResolveWriteIntentOptions{})
return err
}

12 changes: 6 additions & 6 deletions pkg/storage/mvcc_incremental_iterator_test.go
Original file line number Diff line number Diff line change
@@ -1006,12 +1006,12 @@ func TestMVCCIncrementalIterator(t *testing.T) {

intent1 := roachpb.MakeLockUpdate(&txn1, roachpb.Span{Key: testKey1})
intent1.Status = roachpb.COMMITTED
if _, err := MVCCResolveWriteIntent(ctx, e, nil, intent1); err != nil {
if _, _, _, err := MVCCResolveWriteIntent(ctx, e, nil, intent1, MVCCResolveWriteIntentOptions{}); err != nil {
t.Fatal(err)
}
intent2 := roachpb.MakeLockUpdate(&txn2, roachpb.Span{Key: testKey2})
intent2.Status = roachpb.ABORTED
if _, err := MVCCResolveWriteIntent(ctx, e, nil, intent2); err != nil {
if _, _, _, err := MVCCResolveWriteIntent(ctx, e, nil, intent2, MVCCResolveWriteIntentOptions{}); err != nil {
t.Fatal(err)
}
t.Run("intents-resolved", assertEqualKVs(e, localMax, keyMax, tsMin, tsMax, latest, kvs(kv1_4_4, kv2_2_2)))
@@ -1073,12 +1073,12 @@ func TestMVCCIncrementalIterator(t *testing.T) {

intent1 := roachpb.MakeLockUpdate(&txn1, roachpb.Span{Key: testKey1})
intent1.Status = roachpb.COMMITTED
if _, err := MVCCResolveWriteIntent(ctx, e, nil, intent1); err != nil {
if _, _, _, err := MVCCResolveWriteIntent(ctx, e, nil, intent1, MVCCResolveWriteIntentOptions{}); err != nil {
t.Fatal(err)
}
intent2 := roachpb.MakeLockUpdate(&txn2, roachpb.Span{Key: testKey2})
intent2.Status = roachpb.ABORTED
if _, err := MVCCResolveWriteIntent(ctx, e, nil, intent2); err != nil {
if _, _, _, err := MVCCResolveWriteIntent(ctx, e, nil, intent2, MVCCResolveWriteIntentOptions{}); err != nil {
t.Fatal(err)
}
t.Run("intents-resolved", assertEqualKVs(e, localMax, keyMax, tsMin, tsMax, all, kvs(kv1_4_4, kv1Deleted3, kv1_2_2, kv1_1_1, kv2_2_2)))
@@ -1261,9 +1261,9 @@ func TestMVCCIncrementalIteratorIntentDeletion(t *testing.T) {
require.NoError(t, MVCCPut(ctx, db, nil, kC, txnC1.ReadTimestamp, hlc.ClockTimestamp{}, vC1, txnC1))
require.NoError(t, db.Flush())
require.NoError(t, db.Compact())
_, err := MVCCResolveWriteIntent(ctx, db, nil, intent(txnA1))
_, _, _, err := MVCCResolveWriteIntent(ctx, db, nil, intent(txnA1), MVCCResolveWriteIntentOptions{})
require.NoError(t, err)
_, err = MVCCResolveWriteIntent(ctx, db, nil, intent(txnB1))
_, _, _, err = MVCCResolveWriteIntent(ctx, db, nil, intent(txnB1), MVCCResolveWriteIntentOptions{})
require.NoError(t, err)
require.NoError(t, MVCCPut(ctx, db, nil, kA, ts2, hlc.ClockTimestamp{}, vA2, nil))
require.NoError(t, MVCCPut(ctx, db, nil, kA, txnA3.WriteTimestamp, hlc.ClockTimestamp{}, vA3, txnA3))
Loading