Skip to content

Commit

Permalink
storage: return if {shared, exclusive} replicated locks were resolved
Browse files Browse the repository at this point in the history
This patch returns whether shared or exclusive replicated locks were
resolved by calls to `MVCCResolveWriteIntent{,Range}`. We also print
this information in the MVCC histories test.

In an upcoming patch, we'll use this information to selectively bump
the timestamp cache in response to replicated {shared, exclusive} lock
resolution.

Epic: none

Release note: None
  • Loading branch information
arulajmani authored and nvanbenschoten committed Oct 4, 2023
1 parent 1b83c04 commit 0fbb976
Show file tree
Hide file tree
Showing 19 changed files with 128 additions and 89 deletions.
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/batch_spanset_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -585,7 +585,7 @@ func TestSpanSetMVCCResolveWriteIntentRange(t *testing.T) {
Txn: enginepb.TxnMeta{ID: uuid.MakeV4()}, // unused
Status: roachpb.PENDING,
}
if _, _, _, _, err := storage.MVCCResolveWriteIntentRange(
if _, _, _, _, _, err := storage.MVCCResolveWriteIntentRange(
ctx, batch, nil /* ms */, intent, storage.MVCCResolveWriteIntentRangeOptions{},
); err != nil {
t.Fatal(err)
Expand Down
8 changes: 5 additions & 3 deletions pkg/kv/kvserver/batcheval/cmd_end_transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -595,7 +595,7 @@ func resolveLocalLocksWithPagination(
//
// Note that the underlying pebbleIterator will still be reused
// since readWriter is a pebbleBatch in the typical case.
ok, numBytes, resumeSpan, err := storage.MVCCResolveWriteIntent(ctx, readWriter, ms, update,
ok, numBytes, resumeSpan, _, err := storage.MVCCResolveWriteIntent(ctx, readWriter, ms, update,
storage.MVCCResolveWriteIntentOptions{TargetBytes: targetBytes})
if err != nil {
return 0, 0, 0, errors.Wrapf(err, "resolving write intent at %s on end transaction [%s]", span, txn.Status)
Expand Down Expand Up @@ -630,8 +630,10 @@ func resolveLocalLocksWithPagination(
externalLocks = append(externalLocks, outSpans...)
if inSpan != nil {
update.Span = *inSpan
numKeys, numBytes, resumeSpan, resumeReason, err := storage.MVCCResolveWriteIntentRange(ctx, readWriter, ms, update,
storage.MVCCResolveWriteIntentRangeOptions{MaxKeys: maxKeys, TargetBytes: targetBytes})
numKeys, numBytes, resumeSpan, resumeReason, _, err :=
storage.MVCCResolveWriteIntentRange(ctx, readWriter, ms, update,
storage.MVCCResolveWriteIntentRangeOptions{MaxKeys: maxKeys, TargetBytes: targetBytes},
)
if err != nil {
return 0, 0, 0, errors.Wrapf(err, "resolving write intent range at %s on end transaction [%s]", span, txn.Status)
}
Expand Down
6 changes: 4 additions & 2 deletions pkg/kv/kvserver/batcheval/cmd_refresh_range_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,9 @@ func TestRefreshRangeTimeBoundIterator(t *testing.T) {
// would not have any timestamp bounds and would be selected for every read.
intent := roachpb.MakeLockUpdate(txn, roachpb.Span{Key: k})
intent.Status = roachpb.COMMITTED
if _, _, _, err := storage.MVCCResolveWriteIntent(ctx, db, nil, intent, storage.MVCCResolveWriteIntentOptions{}); err != nil {
if _, _, _, _, err := storage.MVCCResolveWriteIntent(
ctx, db, nil, intent, storage.MVCCResolveWriteIntentOptions{},
); err != nil {
t.Fatal(err)
}
if err := storage.MVCCPut(ctx, db, roachpb.Key("unused2"), ts1, v, storage.MVCCWriteOptions{}); err != nil {
Expand Down Expand Up @@ -278,7 +280,7 @@ func TestRefreshRangeError(t *testing.T) {
if resolveIntent {
intent := roachpb.MakeLockUpdate(txn, roachpb.Span{Key: k})
intent.Status = roachpb.COMMITTED
if _, _, _, err := storage.MVCCResolveWriteIntent(ctx, db, nil, intent, storage.MVCCResolveWriteIntentOptions{}); err != nil {
if _, _, _, _, err := storage.MVCCResolveWriteIntent(ctx, db, nil, intent, storage.MVCCResolveWriteIntentOptions{}); err != nil {
t.Fatal(err)
}
}
Expand Down
4 changes: 3 additions & 1 deletion pkg/kv/kvserver/batcheval/cmd_refresh_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,9 @@ func TestRefreshError(t *testing.T) {
if resolveIntent {
intent := roachpb.MakeLockUpdate(txn, roachpb.Span{Key: k})
intent.Status = roachpb.COMMITTED
if _, _, _, err := storage.MVCCResolveWriteIntent(ctx, db, nil, intent, storage.MVCCResolveWriteIntentOptions{}); err != nil {
if _, _, _, _, err := storage.MVCCResolveWriteIntent(
ctx, db, nil, intent, storage.MVCCResolveWriteIntentOptions{},
); err != nil {
t.Fatal(err)
}
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/batcheval/cmd_resolve_intent.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ func ResolveIntent(
// The observation was from the wrong node. Ignore.
update.ClockWhilePending = roachpb.ObservedTimestamp{}
}
ok, numBytes, resumeSpan, err := storage.MVCCResolveWriteIntent(ctx, readWriter, ms, update,
ok, numBytes, resumeSpan, _, err := storage.MVCCResolveWriteIntent(ctx, readWriter, ms, update,
storage.MVCCResolveWriteIntentOptions{TargetBytes: h.TargetBytes})
if err != nil {
return result.Result{}, err
Expand Down
6 changes: 4 additions & 2 deletions pkg/kv/kvserver/batcheval/cmd_resolve_intent_range.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,10 @@ func ResolveIntentRange(
// The observation was from the wrong node. Ignore.
update.ClockWhilePending = roachpb.ObservedTimestamp{}
}
numKeys, numBytes, resumeSpan, resumeReason, err := storage.MVCCResolveWriteIntentRange(ctx, readWriter, ms, update,
storage.MVCCResolveWriteIntentRangeOptions{MaxKeys: h.MaxSpanRequestKeys, TargetBytes: h.TargetBytes})
numKeys, numBytes, resumeSpan, resumeReason, _, err :=
storage.MVCCResolveWriteIntentRange(ctx, readWriter, ms, update,
storage.MVCCResolveWriteIntentRangeOptions{MaxKeys: h.MaxSpanRequestKeys, TargetBytes: h.TargetBytes},
)
if err != nil {
return result.Result{}, err
}
Expand Down
4 changes: 3 additions & 1 deletion pkg/kv/kvserver/gc/gc_random_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -366,7 +366,9 @@ func TestNewVsInvariants(t *testing.T) {
Txn: i.Txn,
Status: roachpb.ABORTED,
}
_, _, _, err := storage.MVCCResolveWriteIntent(ctx, eng, &stats, l, storage.MVCCResolveWriteIntentOptions{})
_, _, _, _, err := storage.MVCCResolveWriteIntent(
ctx, eng, &stats, l, storage.MVCCResolveWriteIntentOptions{},
)
require.NoError(t, err, "failed to resolve intent")
}
for _, cr := range gcer.clearRanges() {
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/loqrecovery/apply.go
Original file line number Diff line number Diff line change
Expand Up @@ -274,7 +274,7 @@ func applyReplicaUpdate(
Txn: res.Intent.Txn,
Status: roachpb.ABORTED,
}
if _, _, _, err := storage.MVCCResolveWriteIntent(ctx, readWriter, &ms, update, storage.MVCCResolveWriteIntentOptions{}); err != nil {
if _, _, _, _, err := storage.MVCCResolveWriteIntent(ctx, readWriter, &ms, update, storage.MVCCResolveWriteIntentOptions{}); err != nil {
return PrepareReplicaReport{}, err
}
report.AbortedTransaction = true
Expand Down
2 changes: 1 addition & 1 deletion pkg/storage/bench_data_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -333,7 +333,7 @@ func (d mvccBenchData) Build(ctx context.Context, b *testing.B, eng Engine) erro
key := keySlice[idx]
txnMeta := txn.TxnMeta
txnMeta.WriteTimestamp = hlc.Timestamp{WallTime: int64(counts[idx]) * 5}
if _, _, _, err := MVCCResolveWriteIntent(ctx, batch, nil /* ms */, roachpb.LockUpdate{
if _, _, _, _, err := MVCCResolveWriteIntent(ctx, batch, nil /* ms */, roachpb.LockUpdate{
Span: roachpb.Span{Key: key},
Status: roachpb.COMMITTED,
Txn: txnMeta,
Expand Down
8 changes: 4 additions & 4 deletions pkg/storage/bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -339,7 +339,7 @@ func setupKeysWithIntent(
// is not one that should be resolved.
continue
}
found, _, _, err := MVCCResolveWriteIntent(context.Background(), batch, nil, lu, MVCCResolveWriteIntentOptions{})
found, _, _, _, err := MVCCResolveWriteIntent(context.Background(), batch, nil, lu, MVCCResolveWriteIntentOptions{})
require.Equal(b, true, found)
require.NoError(b, err)
}
Expand Down Expand Up @@ -567,7 +567,7 @@ func BenchmarkIntentResolution(b *testing.B) {
b.StartTimer()
}
lockUpdate.Key = keys[i%numIntentKeys]
found, _, _, err := MVCCResolveWriteIntent(context.Background(), batch, nil, lockUpdate, MVCCResolveWriteIntentOptions{})
found, _, _, _, err := MVCCResolveWriteIntent(context.Background(), batch, nil, lockUpdate, MVCCResolveWriteIntentOptions{})
if !found || err != nil {
b.Fatalf("intent not found or err %s", err)
}
Expand Down Expand Up @@ -627,7 +627,7 @@ func BenchmarkIntentRangeResolution(b *testing.B) {
rangeNum := i % numRanges
lockUpdate.Key = keys[rangeNum*numKeysPerRange]
lockUpdate.EndKey = keys[(rangeNum+1)*numKeysPerRange]
resolved, _, span, _, err := MVCCResolveWriteIntentRange(
resolved, _, span, _, _, err := MVCCResolveWriteIntentRange(
context.Background(), batch, nil, lockUpdate,
MVCCResolveWriteIntentRangeOptions{MaxKeys: 1000})
if err != nil {
Expand Down Expand Up @@ -1901,7 +1901,7 @@ func BenchmarkMVCCScannerWithIntentsAndVersions(b *testing.B) {
key := makeKey(nil, j)
lu := lockUpdate
lu.Key = key
found, _, _, err := MVCCResolveWriteIntent(
found, _, _, _, err := MVCCResolveWriteIntent(
ctx, batch, nil, lu, MVCCResolveWriteIntentOptions{})
require.Equal(b, true, found)
require.NoError(b, err)
Expand Down
4 changes: 2 additions & 2 deletions pkg/storage/metamorphic/operations.go
Original file line number Diff line number Diff line change
Expand Up @@ -487,7 +487,7 @@ func (t txnCommitOp) run(ctx context.Context) string {
for _, span := range txn.LockSpans {
intent := roachpb.MakeLockUpdate(txn, span)
intent.Status = roachpb.COMMITTED
_, _, _, err := storage.MVCCResolveWriteIntent(context.TODO(), t.m.engine, nil, intent, storage.MVCCResolveWriteIntentOptions{})
_, _, _, _, err := storage.MVCCResolveWriteIntent(context.TODO(), t.m.engine, nil, intent, storage.MVCCResolveWriteIntentOptions{})
if err != nil {
panic(err)
}
Expand All @@ -510,7 +510,7 @@ func (t txnAbortOp) run(ctx context.Context) string {
for _, span := range txn.LockSpans {
intent := roachpb.MakeLockUpdate(txn, span)
intent.Status = roachpb.ABORTED
_, _, _, err := storage.MVCCResolveWriteIntent(context.TODO(), t.m.engine, nil, intent, storage.MVCCResolveWriteIntentOptions{})
_, _, _, _, err := storage.MVCCResolveWriteIntent(context.TODO(), t.m.engine, nil, intent, storage.MVCCResolveWriteIntentOptions{})
if err != nil {
panic(err)
}
Expand Down
62 changes: 37 additions & 25 deletions pkg/storage/mvcc.go
Original file line number Diff line number Diff line change
Expand Up @@ -4731,7 +4731,9 @@ func MVCCPaginate(
//
// Returns whether or not an intent was found to resolve, number of bytes added
// to the write batch by intent resolution, and the resume span if the max
// bytes limit was exceeded.
// bytes limit was exceeded. Additionally, if any replicated locks with strength
// lock.Shared or lock.Exclusive are released, a boolean indicating as such is
// also returned.
//
// Transaction epochs deserve a bit of explanation. The epoch for a
// transaction is incremented on transaction retries. A transaction
Expand All @@ -4754,15 +4756,15 @@ func MVCCResolveWriteIntent(
ms *enginepb.MVCCStats,
intent roachpb.LockUpdate,
opts MVCCResolveWriteIntentOptions,
) (ok bool, numBytes int64, resumeSpan *roachpb.Span, err error) {
) (ok bool, numBytes int64, resumeSpan *roachpb.Span, replLocksReleased bool, err error) {
if len(intent.Key) == 0 {
return false, 0, nil, emptyKeyError()
return false, 0, nil, false, emptyKeyError()
}
if len(intent.EndKey) > 0 {
return false, 0, nil, errors.Errorf("can't resolve range intent as point intent")
return false, 0, nil, false, errors.Errorf("can't resolve range intent as point intent")
}
if opts.TargetBytes < 0 {
return false, 0, &roachpb.Span{Key: intent.Key}, nil
return false, 0, &roachpb.Span{Key: intent.Key}, false, nil
}

// Production code will use a buffered writer, which makes the numBytes
Expand All @@ -4777,7 +4779,7 @@ func MVCCResolveWriteIntent(
MatchTxnID: intent.Txn.ID,
})
if err != nil {
return false, 0, nil, err
return false, 0, nil, false, err
}
defer ltIter.Close()
buf := newPutBuffer()
Expand Down Expand Up @@ -4811,20 +4813,20 @@ func MVCCResolveWriteIntent(

for valid, err := ltIter.SeekEngineKeyGE(ltSeekKey); ; valid, err = ltIter.NextEngineKey() {
if err != nil {
return false, 0, nil, errors.Wrap(err, "seeking lock table")
return false, 0, nil, false, errors.Wrap(err, "seeking lock table")
} else if !valid {
break
}
str, txnID, err := ltIter.LockTableKeyVersion()
if err != nil {
return false, 0, nil, errors.Wrap(err, "decoding lock table key version")
return false, 0, nil, false, errors.Wrap(err, "decoding lock table key version")
}
if txnID != intent.Txn.ID {
return false, 0, nil, errors.AssertionFailedf(
return false, 0, nil, false, errors.AssertionFailedf(
"unexpected txnID %v != %v while scanning lock table", txnID, intent.Txn.ID)
}
if err := ltIter.ValueProto(&buf.meta); err != nil {
return false, 0, nil, errors.Wrap(err, "unmarshaling lock table value")
return false, 0, nil, false, errors.Wrap(err, "unmarshaling lock table value")
}
var strOk bool
if str == lock.Intent {
Expand All @@ -4837,21 +4839,22 @@ func MVCCResolveWriteIntent(
KeyTypes: IterKeyTypePointsAndRanges,
})
if err != nil {
return false, 0, nil, err
return false, 0, nil, false, err
}
defer iter.Close()
}
strOk, err = mvccResolveWriteIntent(ctx, rw, iter, ms, intent, &buf.meta, buf)
} else {
strOk, err = mvccReleaseLockInternal(ctx, rw, ms, intent, str, &buf.meta, buf)
replLocksReleased = replLocksReleased || strOk
}
if err != nil {
return false, 0, nil, err
return false, 0, nil, false, err
}
ok = ok || strOk
}
numBytes = int64(rw.BufferedSize() - beforeBytes)
return ok, numBytes, nil, nil
return ok, numBytes, nil, replLocksReleased, nil
}

// With the separated lock table, we are employing a performance optimization:
Expand Down Expand Up @@ -5438,14 +5441,22 @@ func mvccMaybeRewriteIntentHistory(
//
// Returns the number of intents resolved, number of bytes added to the write
// batch by intent resolution, the resume span if the max keys or bytes limit
// was exceeded, and the resume reason.
// was exceeded, and the resume reason. Additionally, if any replicated locks
// with strength lock.Shared or lock.Exclusive are released, a boolean
// indicating as such is also returned.
func MVCCResolveWriteIntentRange(
ctx context.Context,
rw ReadWriter,
ms *enginepb.MVCCStats,
intent roachpb.LockUpdate,
opts MVCCResolveWriteIntentRangeOptions,
) (numKeys, numBytes int64, resumeSpan *roachpb.Span, resumeReason kvpb.ResumeReason, err error) {
) (
numKeys, numBytes int64,
resumeSpan *roachpb.Span,
resumeReason kvpb.ResumeReason,
replLocksReleased bool,
err error,
) {
keysExceeded := opts.MaxKeys < 0
bytesExceeded := opts.TargetBytes < 0
if keysExceeded || bytesExceeded {
Expand All @@ -5455,7 +5466,7 @@ func MVCCResolveWriteIntentRange(
} else if bytesExceeded {
resumeReason = kvpb.RESUME_BYTE_LIMIT
}
return 0, 0, &resumeSpan, resumeReason, nil
return 0, 0, &resumeSpan, resumeReason, false, nil
}

ltStart, _ := keys.LockTableSingleKey(intent.Key, nil)
Expand All @@ -5466,7 +5477,7 @@ func MVCCResolveWriteIntentRange(
MatchTxnID: intent.Txn.ID,
})
if err != nil {
return 0, 0, nil, 0, err
return 0, 0, nil, 0, false, err
}
defer ltIter.Close()
var mvccIter MVCCIterator
Expand All @@ -5479,7 +5490,7 @@ func MVCCResolveWriteIntentRange(
// Production code should always have consistent iterators.
mvccIter, err = rw.NewMVCCIterator(MVCCKeyIterKind, iterOpts)
if err != nil {
return 0, 0, nil, 0, err
return 0, 0, nil, 0, false, err
}
} else {
// For correctness, we need mvccIter to be consistent with engineIter.
Expand All @@ -5496,19 +5507,19 @@ func MVCCResolveWriteIntentRange(
var lastResolvedKeyOk bool
for valid, err := ltIter.SeekEngineKeyGE(EngineKey{Key: ltStart}); ; valid, err = ltIter.NextEngineKey() {
if err != nil {
return 0, 0, nil, 0, errors.Wrap(err, "seeking lock table")
return 0, 0, nil, 0, false, errors.Wrap(err, "seeking lock table")
} else if !valid {
// No more intents in the given range.
break
}

ltEngineKey, err := ltIter.EngineKey()
if err != nil {
return 0, 0, nil, 0, errors.Wrap(err, "retrieving lock table key")
return 0, 0, nil, 0, false, errors.Wrap(err, "retrieving lock table key")
}
ltKey, err := ltEngineKey.ToLockTableKey()
if err != nil {
return 0, 0, nil, 0, errors.Wrap(err, "decoding lock table key")
return 0, 0, nil, 0, false, errors.Wrap(err, "decoding lock table key")
}
sameLockedKey := lastResolvedKey.Equal(ltKey.Key)
if !sameLockedKey {
Expand All @@ -5531,7 +5542,7 @@ func MVCCResolveWriteIntentRange(
}
// We could also compute a tighter nextKey here if we wanted to.
resumeSpan := &roachpb.Span{Key: lastResolvedKey.Next(), EndKey: intentEndKey}
return numKeys, numBytes, resumeSpan, resumeReason, nil
return numKeys, numBytes, resumeSpan, resumeReason, replLocksReleased, nil
}

// Copy the underlying bytes of the unsafe key. This is needed for
Expand All @@ -5541,19 +5552,20 @@ func MVCCResolveWriteIntentRange(
lastResolvedKeyOk = false
}
if ltKey.TxnUUID != intent.Txn.ID {
return 0, 0, nil, 0, errors.AssertionFailedf(
return 0, 0, nil, 0, false, errors.AssertionFailedf(
"unexpected txnID %v != %v while scanning lock table", ltKey.TxnUUID, intent.Txn.ID)
}
intent.Key = ltKey.Key
if err := ltIter.ValueProto(&buf.meta); err != nil {
return 0, 0, nil, 0, errors.Wrap(err, "unmarshaling lock table value")
return 0, 0, nil, 0, false, errors.Wrap(err, "unmarshaling lock table value")
}
beforeBytes := rw.BufferedSize()
var ok bool
if ltKey.Strength == lock.Intent {
ok, err = mvccResolveWriteIntent(ctx, rw, mvccIter, ms, intent, &buf.meta, buf)
} else {
ok, err = mvccReleaseLockInternal(ctx, rw, ms, intent, ltKey.Strength, &buf.meta, buf)
replLocksReleased = replLocksReleased || ok
}
if err != nil {
log.Warningf(ctx, "failed to resolve intent for key %q: %+v", lastResolvedKey, err)
Expand All @@ -5566,7 +5578,7 @@ func MVCCResolveWriteIntentRange(
}
numBytes += int64(rw.BufferedSize() - beforeBytes)
}
return numKeys, numBytes, nil, 0, nil
return numKeys, numBytes, nil, 0, replLocksReleased, nil
}

// MVCCCheckForAcquireLock scans the replicated lock table to determine whether
Expand Down
Loading

0 comments on commit 0fbb976

Please sign in to comment.