Skip to content

Commit

Permalink
storage: Refactor resolve write intent options
Browse files Browse the repository at this point in the history
In this PR, we add MVCCResolveWriteIntentOptions to
MVCCResolveWriteIntent and MVCCResolveWriteIntentRangeOptions to
MVCCResolveWriteIntentRange. Moreover, we additionally return numBytes
and resumeSpan in MVCCResolveWriteIntent and numBytes and resumeReason
in MVCCResolveWriteIntentRange, but these return values are currently
unused and serve as a placeholder in refactoring, but will be used in
the future.

Informs: cockroachdb#77228

Release note: None
  • Loading branch information
KaiSun314 committed Jan 5, 2023
1 parent e7713f1 commit cf4ce53
Show file tree
Hide file tree
Showing 17 changed files with 169 additions and 113 deletions.
4 changes: 2 additions & 2 deletions pkg/kv/kvserver/batch_spanset_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -563,8 +563,8 @@ func TestSpanSetMVCCResolveWriteIntentRange(t *testing.T) {
Txn: enginepb.TxnMeta{}, // unused
Status: roachpb.PENDING,
}
if _, _, err := storage.MVCCResolveWriteIntentRange(
ctx, batch, nil /* ms */, intent, 0,
if _, _, _, _, err := storage.MVCCResolveWriteIntentRange(
ctx, batch, nil /* ms */, intent, storage.MVCCResolveWriteIntentRangeOptions{},
); err != nil {
t.Fatal(err)
}
Expand Down
11 changes: 6 additions & 5 deletions pkg/kv/kvserver/batcheval/cmd_end_transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -554,7 +554,8 @@ func resolveLocalLocks(
//
// Note that the underlying pebbleIterator will still be reused
// since readWriter is a pebbleBatch in the typical case.
ok, err := storage.MVCCResolveWriteIntent(ctx, readWriter, ms, update)
ok, _, _, err := storage.MVCCResolveWriteIntent(ctx, readWriter, ms, update,
storage.MVCCResolveWriteIntentOptions{})
if err != nil {
return err
}
Expand All @@ -579,15 +580,15 @@ func resolveLocalLocks(
externalLocks = append(externalLocks, outSpans...)
if inSpan != nil {
update.Span = *inSpan
num, resumeSpan, err := storage.MVCCResolveWriteIntentRange(
ctx, readWriter, ms, update, resolveAllowance)
numKeys, _, resumeSpan, _, err := storage.MVCCResolveWriteIntentRange(ctx, readWriter, ms, update,
storage.MVCCResolveWriteIntentRangeOptions{MaxKeys: resolveAllowance})
if err != nil {
return err
}
if evalCtx.EvalKnobs().NumKeysEvaluatedForRangeIntentResolution != nil {
atomic.AddInt64(evalCtx.EvalKnobs().NumKeysEvaluatedForRangeIntentResolution, num)
atomic.AddInt64(evalCtx.EvalKnobs().NumKeysEvaluatedForRangeIntentResolution, numKeys)
}
resolveAllowance -= num
resolveAllowance -= numKeys
if resumeSpan != nil {
if resolveAllowance != 0 {
log.Fatalf(ctx, "expected resolve allowance to be exactly 0 resolving %s; got %d", update.Span, resolveAllowance)
Expand Down
4 changes: 2 additions & 2 deletions pkg/kv/kvserver/batcheval/cmd_refresh_range_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ func TestRefreshRangeTimeBoundIterator(t *testing.T) {
// would not have any timestamp bounds and would be selected for every read.
intent := roachpb.MakeLockUpdate(txn, roachpb.Span{Key: k})
intent.Status = roachpb.COMMITTED
if _, err := storage.MVCCResolveWriteIntent(ctx, db, nil, intent); err != nil {
if _, _, _, err := storage.MVCCResolveWriteIntent(ctx, db, nil, intent, storage.MVCCResolveWriteIntentOptions{}); err != nil {
t.Fatal(err)
}
if err := storage.MVCCPut(ctx, db, nil, roachpb.Key("unused2"), ts1, hlc.ClockTimestamp{}, v, nil); err != nil {
Expand Down Expand Up @@ -272,7 +272,7 @@ func TestRefreshRangeError(t *testing.T) {
if resolveIntent {
intent := roachpb.MakeLockUpdate(txn, roachpb.Span{Key: k})
intent.Status = roachpb.COMMITTED
if _, err := storage.MVCCResolveWriteIntent(ctx, db, nil, intent); err != nil {
if _, _, _, err := storage.MVCCResolveWriteIntent(ctx, db, nil, intent, storage.MVCCResolveWriteIntentOptions{}); err != nil {
t.Fatal(err)
}
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/batcheval/cmd_refresh_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func TestRefreshError(t *testing.T) {
if resolveIntent {
intent := roachpb.MakeLockUpdate(txn, roachpb.Span{Key: k})
intent.Status = roachpb.COMMITTED
if _, err := storage.MVCCResolveWriteIntent(ctx, db, nil, intent); err != nil {
if _, _, _, err := storage.MVCCResolveWriteIntent(ctx, db, nil, intent, storage.MVCCResolveWriteIntentOptions{}); err != nil {
t.Fatal(err)
}
}
Expand Down
3 changes: 2 additions & 1 deletion pkg/kv/kvserver/batcheval/cmd_resolve_intent.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,8 @@ func ResolveIntent(
// The observation was from the wrong node. Ignore.
update.ClockWhilePending = roachpb.ObservedTimestamp{}
}
ok, err := storage.MVCCResolveWriteIntent(ctx, readWriter, ms, update)
ok, _, _, err := storage.MVCCResolveWriteIntent(ctx, readWriter, ms, update,
storage.MVCCResolveWriteIntentOptions{})
if err != nil {
return result.Result{}, err
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/kv/kvserver/batcheval/cmd_resolve_intent_range.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,8 @@ func ResolveIntentRange(
// The observation was from the wrong node. Ignore.
update.ClockWhilePending = roachpb.ObservedTimestamp{}
}
numKeys, resumeSpan, err := storage.MVCCResolveWriteIntentRange(
ctx, readWriter, ms, update, h.MaxSpanRequestKeys)
numKeys, _, resumeSpan, _, err := storage.MVCCResolveWriteIntentRange(ctx, readWriter, ms, update,
storage.MVCCResolveWriteIntentRangeOptions{MaxKeys: h.MaxSpanRequestKeys})
if err != nil {
return result.Result{}, err
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/gc/gc_random_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -314,7 +314,7 @@ func TestNewVsInvariants(t *testing.T) {
Txn: i.Txn,
Status: roachpb.ABORTED,
}
_, err := storage.MVCCResolveWriteIntent(ctx, eng, &stats, l)
_, _, _, err := storage.MVCCResolveWriteIntent(ctx, eng, &stats, l, storage.MVCCResolveWriteIntentOptions{})
require.NoError(t, err, "failed to resolve intent")
}
for _, cr := range gcer.clearRanges() {
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/loqrecovery/apply.go
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,7 @@ func applyReplicaUpdate(
Txn: intent.Txn,
Status: roachpb.ABORTED,
}
if _, err := storage.MVCCResolveWriteIntent(ctx, readWriter, &ms, update); err != nil {
if _, _, _, err := storage.MVCCResolveWriteIntent(ctx, readWriter, &ms, update, storage.MVCCResolveWriteIntentOptions{}); err != nil {
return PrepareReplicaReport{}, err
}
report.AbortedTransaction = true
Expand Down
4 changes: 2 additions & 2 deletions pkg/storage/bench_data_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -353,11 +353,11 @@ func (d mvccBenchData) Build(ctx context.Context, b *testing.B, eng Engine) erro
key := keySlice[idx]
txnMeta := txn.TxnMeta
txnMeta.WriteTimestamp = hlc.Timestamp{WallTime: int64(counts[idx]) * 5}
if _, err := MVCCResolveWriteIntent(ctx, batch, nil /* ms */, roachpb.LockUpdate{
if _, _, _, err := MVCCResolveWriteIntent(ctx, batch, nil /* ms */, roachpb.LockUpdate{
Span: roachpb.Span{Key: key},
Status: roachpb.COMMITTED,
Txn: txnMeta,
}); err != nil {
}, MVCCResolveWriteIntentOptions{}); err != nil {
b.Fatal(err)
}
}
Expand Down
9 changes: 5 additions & 4 deletions pkg/storage/bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -334,7 +334,7 @@ func setupKeysWithIntent(
// is not one that should be resolved.
continue
}
found, err := MVCCResolveWriteIntent(context.Background(), batch, nil, lu)
found, _, _, err := MVCCResolveWriteIntent(context.Background(), batch, nil, lu, MVCCResolveWriteIntentOptions{})
require.Equal(b, true, found)
require.NoError(b, err)
}
Expand Down Expand Up @@ -553,7 +553,7 @@ func BenchmarkIntentResolution(b *testing.B) {
b.StartTimer()
}
lockUpdate.Key = keys[i%numIntentKeys]
found, err := MVCCResolveWriteIntent(context.Background(), batch, nil, lockUpdate)
found, _, _, err := MVCCResolveWriteIntent(context.Background(), batch, nil, lockUpdate, MVCCResolveWriteIntentOptions{})
if !found || err != nil {
b.Fatalf("intent not found or err %s", err)
}
Expand Down Expand Up @@ -613,8 +613,9 @@ func BenchmarkIntentRangeResolution(b *testing.B) {
rangeNum := i % numRanges
lockUpdate.Key = keys[rangeNum*numKeysPerRange]
lockUpdate.EndKey = keys[(rangeNum+1)*numKeysPerRange]
resolved, span, err := MVCCResolveWriteIntentRange(
context.Background(), batch, nil, lockUpdate, 1000 /* max */)
resolved, _, span, _, err := MVCCResolveWriteIntentRange(
context.Background(), batch, nil, lockUpdate,
MVCCResolveWriteIntentRangeOptions{MaxKeys: 1000})
if err != nil {
b.Fatal(err)
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/storage/metamorphic/operations.go
Original file line number Diff line number Diff line change
Expand Up @@ -485,7 +485,7 @@ func (t txnCommitOp) run(ctx context.Context) string {
for _, span := range txn.LockSpans {
intent := roachpb.MakeLockUpdate(txn, span)
intent.Status = roachpb.COMMITTED
_, err := storage.MVCCResolveWriteIntent(context.TODO(), t.m.engine, nil, intent)
_, _, _, err := storage.MVCCResolveWriteIntent(context.TODO(), t.m.engine, nil, intent, storage.MVCCResolveWriteIntentOptions{})
if err != nil {
panic(err)
}
Expand All @@ -508,7 +508,7 @@ func (t txnAbortOp) run(ctx context.Context) string {
for _, span := range txn.LockSpans {
intent := roachpb.MakeLockUpdate(txn, span)
intent.Status = roachpb.ABORTED
_, err := storage.MVCCResolveWriteIntent(context.TODO(), t.m.engine, nil, intent)
_, _, _, err := storage.MVCCResolveWriteIntent(context.TODO(), t.m.engine, nil, intent, storage.MVCCResolveWriteIntentOptions{})
if err != nil {
panic(err)
}
Expand Down
71 changes: 51 additions & 20 deletions pkg/storage/mvcc.go
Original file line number Diff line number Diff line change
Expand Up @@ -922,6 +922,21 @@ func (opts *MVCCGetOptions) errOnIntents() bool {
return !opts.Inconsistent && !opts.SkipLocked
}

// MVCCResolveWriteIntentOptions bundles options for the MVCCResolveWriteIntent
// function.
type MVCCResolveWriteIntentOptions struct {
// See the documentation for MVCCResolveWriteIntent for information on these
// parameters.
}

// MVCCResolveWriteIntentRangeOptions bundles options for the
// MVCCResolveWriteIntentRange function.
type MVCCResolveWriteIntentRangeOptions struct {
// See the documentation for MVCCResolveWriteIntentRange for information on
// these parameters.
MaxKeys int64
}

// newMVCCIterator sets up a suitable iterator for high-level MVCC operations
// operating at the given timestamp. If timestamp is empty or if
// `noInterleavedIntents` is set, the iterator is considered to be used for
Expand Down Expand Up @@ -3973,7 +3988,9 @@ func MVCCIterate(
// MVCCResolveWriteIntent either commits, aborts (rolls back), or moves forward
// in time an extant write intent for a given txn according to commit parameter.
// ResolveWriteIntent will skip write intents of other txns. It returns
// whether or not an intent was found to resolve.
// whether or not an intent was found to resolve. Note that the numBytes and
// resumeSpan return values are currently unused and serve as a placeholder in
// refactoring, but will be used in the future.
//
// Transaction epochs deserve a bit of explanation. The epoch for a
// transaction is incremented on transaction retries. A transaction
Expand All @@ -3991,24 +4008,28 @@ func MVCCIterate(
// epoch matching the commit epoch), and which intents get aborted,
// even if the transaction succeeds.
func MVCCResolveWriteIntent(
ctx context.Context, rw ReadWriter, ms *enginepb.MVCCStats, intent roachpb.LockUpdate,
) (bool, error) {
ctx context.Context,
rw ReadWriter,
ms *enginepb.MVCCStats,
intent roachpb.LockUpdate,
opts MVCCResolveWriteIntentOptions,
) (ok bool, numBytes int64, resumeSpan *roachpb.Span, err error) {
if len(intent.Key) == 0 {
return false, emptyKeyError()
return false, 0, nil, emptyKeyError()
}
if len(intent.EndKey) > 0 {
return false, errors.Errorf("can't resolve range intent as point intent")
return false, 0, nil, errors.Errorf("can't resolve range intent as point intent")
}

iterAndBuf := GetBufUsingIter(rw.NewMVCCIterator(MVCCKeyAndIntentsIterKind, IterOptions{
KeyTypes: IterKeyTypePointsAndRanges,
Prefix: true,
}))
iterAndBuf.iter.SeekIntentGE(intent.Key, intent.Txn.ID)
ok, err := mvccResolveWriteIntent(ctx, rw, iterAndBuf.iter, ms, intent, iterAndBuf.buf)
ok, err = mvccResolveWriteIntent(ctx, rw, iterAndBuf.iter, ms, intent, iterAndBuf.buf)
// Using defer would be more convenient, but it is measurably slower.
iterAndBuf.Cleanup()
return ok, err
return ok, 0, nil, err
}

// iterForKeyVersions provides a subset of the functionality of MVCCIterator.
Expand Down Expand Up @@ -4771,13 +4792,24 @@ func (b IterAndBuf) Cleanup() {
// ResolveWriteIntentRange will skip write intents of other txns. A max of zero
// means unbounded. A max of -1 means resolve nothing and returns the entire
// intent span as the resume span. Returns the number of intents resolved and a
// resume span if the max keys limit was exceeded.
// resume span if the max keys limit was exceeded. Note that the numBytes and
// resumeReason return values are currently unused and serve as a placeholder
// in refactoring, but will be used in the future.
func MVCCResolveWriteIntentRange(
ctx context.Context, rw ReadWriter, ms *enginepb.MVCCStats, intent roachpb.LockUpdate, max int64,
) (int64, *roachpb.Span, error) {
if max < 0 {
ctx context.Context,
rw ReadWriter,
ms *enginepb.MVCCStats,
intent roachpb.LockUpdate,
opts MVCCResolveWriteIntentRangeOptions,
) (
numKeys, numBytes int64,
resumeSpan *roachpb.Span,
resumeReason roachpb.ResumeReason,
err error,
) {
if opts.MaxKeys < 0 {
resumeSpan := intent.Span // don't inline or `intent` would escape to heap
return 0, &resumeSpan, nil
return 0, 0, &resumeSpan, 0, nil
}
ltStart, _ := keys.LockTableSingleKey(intent.Key, nil)
ltEnd, _ := keys.LockTableSingleKey(intent.EndKey, nil)
Expand Down Expand Up @@ -4814,25 +4846,24 @@ func MVCCResolveWriteIntentRange(
intent.EndKey = nil

var lastResolvedKey roachpb.Key
num := int64(0)
for {
if valid, err := sepIter.Valid(); err != nil {
return 0, nil, err
return 0, 0, nil, 0, err
} else if !valid {
// No more intents in the given range.
break
}
if max > 0 && num == max {
if opts.MaxKeys > 0 && numKeys == opts.MaxKeys {
// We could also compute a tighter nextKey here if we wanted to.
return num, &roachpb.Span{Key: lastResolvedKey.Next(), EndKey: intentEndKey}, nil
return numKeys, 0, &roachpb.Span{Key: lastResolvedKey.Next(), EndKey: intentEndKey}, 0, nil
}
// Parse the MVCCMetadata to see if it is a relevant intent.
meta := &putBuf.meta
if err := sepIter.ValueProto(meta); err != nil {
return 0, nil, err
return 0, 0, nil, 0, err
}
if meta.Txn == nil {
return 0, nil, errors.Errorf("intent with no txn")
return 0, 0, nil, 0, errors.Errorf("intent with no txn")
}
if intent.Txn.ID != meta.Txn.ID {
// Intent for a different txn, so ignore.
Expand All @@ -4852,11 +4883,11 @@ func MVCCResolveWriteIntentRange(
if err != nil {
log.Warningf(ctx, "failed to resolve intent for key %q: %+v", lastResolvedKey, err)
} else if ok {
num++
numKeys++
}
sepIter.nextEngineKey()
}
return num, nil, nil
return numKeys, 0, nil, 0, nil
}

// MVCCGarbageCollect creates an iterator on the ReadWriter. In parallel
Expand Down
4 changes: 2 additions & 2 deletions pkg/storage/mvcc_history_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -890,7 +890,7 @@ func cmdResolveIntentRange(e *evalCtx) error {
intent.Status = status

return e.withWriter("resolve_intent_range", func(rw storage.ReadWriter) error {
_, _, err := storage.MVCCResolveWriteIntentRange(e.ctx, rw, e.ms, intent, 0)
_, _, _, _, err := storage.MVCCResolveWriteIntentRange(e.ctx, rw, e.ms, intent, storage.MVCCResolveWriteIntentRangeOptions{})
return err
})
}
Expand All @@ -905,7 +905,7 @@ func (e *evalCtx) resolveIntent(
intent := roachpb.MakeLockUpdate(txn, roachpb.Span{Key: key})
intent.Status = resolveStatus
intent.ClockWhilePending = roachpb.ObservedTimestamp{Timestamp: clockWhilePending}
_, err := storage.MVCCResolveWriteIntent(e.ctx, rw, e.ms, intent)
_, _, _, err := storage.MVCCResolveWriteIntent(e.ctx, rw, e.ms, intent, storage.MVCCResolveWriteIntentOptions{})
return err
}

Expand Down
12 changes: 6 additions & 6 deletions pkg/storage/mvcc_incremental_iterator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1006,12 +1006,12 @@ func TestMVCCIncrementalIterator(t *testing.T) {

intent1 := roachpb.MakeLockUpdate(&txn1, roachpb.Span{Key: testKey1})
intent1.Status = roachpb.COMMITTED
if _, err := MVCCResolveWriteIntent(ctx, e, nil, intent1); err != nil {
if _, _, _, err := MVCCResolveWriteIntent(ctx, e, nil, intent1, MVCCResolveWriteIntentOptions{}); err != nil {
t.Fatal(err)
}
intent2 := roachpb.MakeLockUpdate(&txn2, roachpb.Span{Key: testKey2})
intent2.Status = roachpb.ABORTED
if _, err := MVCCResolveWriteIntent(ctx, e, nil, intent2); err != nil {
if _, _, _, err := MVCCResolveWriteIntent(ctx, e, nil, intent2, MVCCResolveWriteIntentOptions{}); err != nil {
t.Fatal(err)
}
t.Run("intents-resolved", assertEqualKVs(e, localMax, keyMax, tsMin, tsMax, latest, kvs(kv1_4_4, kv2_2_2)))
Expand Down Expand Up @@ -1073,12 +1073,12 @@ func TestMVCCIncrementalIterator(t *testing.T) {

intent1 := roachpb.MakeLockUpdate(&txn1, roachpb.Span{Key: testKey1})
intent1.Status = roachpb.COMMITTED
if _, err := MVCCResolveWriteIntent(ctx, e, nil, intent1); err != nil {
if _, _, _, err := MVCCResolveWriteIntent(ctx, e, nil, intent1, MVCCResolveWriteIntentOptions{}); err != nil {
t.Fatal(err)
}
intent2 := roachpb.MakeLockUpdate(&txn2, roachpb.Span{Key: testKey2})
intent2.Status = roachpb.ABORTED
if _, err := MVCCResolveWriteIntent(ctx, e, nil, intent2); err != nil {
if _, _, _, err := MVCCResolveWriteIntent(ctx, e, nil, intent2, MVCCResolveWriteIntentOptions{}); err != nil {
t.Fatal(err)
}
t.Run("intents-resolved", assertEqualKVs(e, localMax, keyMax, tsMin, tsMax, all, kvs(kv1_4_4, kv1Deleted3, kv1_2_2, kv1_1_1, kv2_2_2)))
Expand Down Expand Up @@ -1261,9 +1261,9 @@ func TestMVCCIncrementalIteratorIntentDeletion(t *testing.T) {
require.NoError(t, MVCCPut(ctx, db, nil, kC, txnC1.ReadTimestamp, hlc.ClockTimestamp{}, vC1, txnC1))
require.NoError(t, db.Flush())
require.NoError(t, db.Compact())
_, err := MVCCResolveWriteIntent(ctx, db, nil, intent(txnA1))
_, _, _, err := MVCCResolveWriteIntent(ctx, db, nil, intent(txnA1), MVCCResolveWriteIntentOptions{})
require.NoError(t, err)
_, err = MVCCResolveWriteIntent(ctx, db, nil, intent(txnB1))
_, _, _, err = MVCCResolveWriteIntent(ctx, db, nil, intent(txnB1), MVCCResolveWriteIntentOptions{})
require.NoError(t, err)
require.NoError(t, MVCCPut(ctx, db, nil, kA, ts2, hlc.ClockTimestamp{}, vA2, nil))
require.NoError(t, MVCCPut(ctx, db, nil, kA, txnA3.WriteTimestamp, hlc.ClockTimestamp{}, vA3, txnA3))
Expand Down
Loading

0 comments on commit cf4ce53

Please sign in to comment.