Skip to content

Commit

Permalink
storage: release replicated locks during intent resolution
Browse files Browse the repository at this point in the history
Fixes #109648.
Informs #100193.

This commit adds support for releasing replicated locks during point and ranged
intent resolution, if any are found. Replicated locks are released if the
resolving transaction is finalized or if resolving transaction is pending and
the lock has been rolled back through an epoch bump or savepoint rollback.

Release note: None
  • Loading branch information
nvanbenschoten committed Sep 23, 2023
1 parent 3da0b48 commit a3b9f67
Show file tree
Hide file tree
Showing 5 changed files with 445 additions and 59 deletions.
154 changes: 111 additions & 43 deletions pkg/storage/mvcc.go
Original file line number Diff line number Diff line change
Expand Up @@ -4574,6 +4574,7 @@ func MVCCResolveWriteIntent(
if err := ltIter.ValueProto(&buf.meta); err != nil {
return false, 0, nil, errors.Wrap(err, "unmarshaling lock table value")
}
var strOk bool
if str == lock.Intent {
// Intent resolution requires an MVCC iterator to look up the MVCC
// version associated with the intent. Create one.
Expand All @@ -4588,21 +4589,21 @@ func MVCCResolveWriteIntent(
}
defer iter.Close()
}
ok, err = mvccResolveWriteIntent(ctx, rw, iter, ms, intent, &buf.meta, buf)
strOk, err = mvccResolveWriteIntent(ctx, rw, iter, ms, intent, &buf.meta, buf)
} else {
// TODO(nvanbenschoten): implement.
_ = str
strOk, err = mvccReleaseLockInternal(ctx, rw, ms, intent, str, &buf.meta, buf)
}
if err != nil {
return false, 0, nil, err
}
ok = ok || strOk
}
numBytes = int64(rw.BufferedSize() - beforeBytes)
return ok, numBytes, nil, nil
}

// With the separated lock table, we are employing a performance optimization:
// when an intent metadata is removed, we preferably want to do so using a
// when a lock's metadata value is removed, we preferably want to do so using a
// SingleDel (as opposed to a Del). This is only safe if the previous operations
// on the metadata key allow it. Due to practical limitations, at the time of
// writing the condition we need is that the pebble history of the key consists
Expand All @@ -4613,7 +4614,7 @@ func MVCCResolveWriteIntent(
// It is difficult to track the history of engine writes to a key precisely, in
// particular when values are ever aborted. So we apply the optimization only to
// the main case in which it is useful, namely that of a transaction committing
// its intent that it never re-wrote in the initial epoch (i.e. no chance of it
// its lock that it never re-wrote in the initial epoch (i.e. no chance of it
// ever being removed before as part of being pushed). Note that when a txn
// refreshes, it stays in the original epoch, and the intents are moved, which
// does *not* cause a write to the MVCC metadata key (for which the history has
Expand All @@ -4630,7 +4631,7 @@ func MVCCResolveWriteIntent(
// INSERT INTO kv VALUES(1, 2);
// COMMIT;
//
// This would first remove the intent (1,1) during the ROLLBACK using a Del (the
// This would first remove the lock (1,1) during the ROLLBACK using a Del (the
// anomaly below would occur the same if a SingleDel were used here), and thus
// without an additional condition the INSERT (1,2) would be eligible for
// committing via a SingleDel. This has to be avoided as well, since the
Expand All @@ -4651,7 +4652,7 @@ func MVCCResolveWriteIntent(
// ↓
// - Set
//
// which means that a previously deleted intent metadata would erroneously
// which means that a previously deleted lock metadata would erroneously
// become visible again. So on top of restricting SingleDel to the COMMIT case,
// we also restrict it to the case of having no ignored sequence number ranges
// (i.e. no nested txn was rolled back before the commit).
Expand All @@ -4676,22 +4677,22 @@ func (h singleDelOptimizationHelper) v() bool {
return *h._didNotUpdateMeta
}

// onCommitIntent returns true if the SingleDel optimization is available
// for committing an intent.
func (h singleDelOptimizationHelper) onCommitIntent() bool {
// We're committing the intent at epoch zero, the meta tracking says we didn't
// rewrite the intent, and we also didn't previously remove the metadata for
// onCommitLock returns true if the SingleDel optimization is available
// for committing a lock/intent.
func (h singleDelOptimizationHelper) onCommitLock() bool {
// We're committing the lock at epoch zero, the meta tracking says we didn't
// rewrite the lock, and we also didn't previously remove the metadata for
// this key as part of a voluntary rollback of a nested txn. So we are safe to
// use a SingleDel here.
return h.v() && !h._hasIgnoredSeqs && h._epoch == 0
}

// onAbortIntent returns true if the SingleDel optimization is available
// for removing an intent. It is always false.
// Note that "removing an intent" can occur if we know that the epoch
// onAbortLock returns true if the SingleDel optimization is available
// for removing a lock/intent. It is always false.
// Note that "removing a lock" can occur if we know that the epoch
// changed, or when a savepoint is rolled back. It does not imply that
// the transaction aborted.
func (h singleDelOptimizationHelper) onAbortIntent() bool {
func (h singleDelOptimizationHelper) onAbortLock() bool {
return false
}

Expand All @@ -4708,7 +4709,7 @@ func mvccResolveWriteIntent(
writer Writer,
iter MVCCIterator,
ms *enginepb.MVCCStats,
// TODO(nvanbenschoten): rename this field to "resolution".
// TODO(nvanbenschoten): rename this field to "update".
intent roachpb.LockUpdate,
meta *enginepb.MVCCMetadata,
buf *putBuffer,
Expand Down Expand Up @@ -4967,7 +4968,7 @@ func mvccResolveWriteIntent(
writer, metaKey, lock.Intent, newMeta, true /* alreadyExists */)
} else {
metaKeySize, metaValSize, err = buf.clearLockMeta(
writer, metaKey, lock.Intent, canSingleDelHelper.onCommitIntent(), meta.Txn.ID, ClearOptions{
writer, metaKey, lock.Intent, canSingleDelHelper.onCommitLock(), meta.Txn.ID, ClearOptions{
ValueSizeKnown: true,
ValueSize: uint32(origMetaValSize),
})
Expand Down Expand Up @@ -5079,7 +5080,7 @@ func mvccResolveWriteIntent(
if !ok {
// If there is no other version, we should just clean up the key entirely.
_, _, err := buf.clearLockMeta(
writer, metaKey, lock.Intent, canSingleDelHelper.onAbortIntent(), meta.Txn.ID, ClearOptions{
writer, metaKey, lock.Intent, canSingleDelHelper.onAbortLock(), meta.Txn.ID, ClearOptions{
ValueSizeKnown: true,
ValueSize: uint32(origMetaValSize),
})
Expand All @@ -5101,7 +5102,7 @@ func mvccResolveWriteIntent(
ValBytes: int64(nextValueLen),
}
metaKeySize, metaValSize, err := buf.clearLockMeta(
writer, metaKey, lock.Intent, canSingleDelHelper.onAbortIntent(), meta.Txn.ID, ClearOptions{
writer, metaKey, lock.Intent, canSingleDelHelper.onAbortLock(), meta.Txn.ID, ClearOptions{
ValueSizeKnown: true,
ValueSize: uint32(origMetaValSize),
})
Expand Down Expand Up @@ -5240,28 +5241,15 @@ func MVCCResolveWriteIntentRange(
intent.EndKey = nil

var lastResolvedKey roachpb.Key
var lastResolvedKeyOk bool
for valid, err := ltIter.SeekEngineKeyGE(EngineKey{Key: ltStart}); ; valid, err = ltIter.NextEngineKey() {
if err != nil {
return 0, 0, nil, 0, errors.Wrap(err, "seeking lock table")
} else if !valid {
// No more intents in the given range.
break
}
keysExceeded = opts.MaxKeys > 0 && numKeys == opts.MaxKeys
bytesExceeded = opts.TargetBytes > 0 && numBytes >= opts.TargetBytes
if keysExceeded || bytesExceeded {
if keysExceeded {
resumeReason = kvpb.RESUME_KEY_LIMIT
} else if bytesExceeded {
resumeReason = kvpb.RESUME_BYTE_LIMIT
}
// We could also compute a tighter nextKey here if we wanted to.
// TODO(nvanbenschoten): this resumeSpan won't be correct if there
// are multiple locks on the same key. What if only of the locks for
// a key are removed? Fix this by resolving zero or all locks on a
// given key.
return numKeys, numBytes, &roachpb.Span{Key: lastResolvedKey.Next(), EndKey: intentEndKey}, resumeReason, nil
}

ltEngineKey, err := ltIter.EngineKey()
if err != nil {
return 0, 0, nil, 0, errors.Wrap(err, "retrieving lock table key")
Expand All @@ -5270,29 +5258,58 @@ func MVCCResolveWriteIntentRange(
if err != nil {
return 0, 0, nil, 0, errors.Wrap(err, "decoding lock table key")
}
sameLockedKey := lastResolvedKey.Equal(ltKey.Key)
if !sameLockedKey {
// If this is not the same locked key as the last iteration, check
// whether we've exceeded the max keys or bytes limit. We don't check in
// between locks with different strengths on the same key because we
// can't encode a resume span that would be correct in that case. A
// transaction can only hold up to 3 locks on any given key, so this
// will never lead to us significantly overshooting the TargetBytes
// limit. We also only count each unique locked key once towards the
// MaxKeys limit, so this will never lead to us overshooting the MaxKeys
// limit at all.
keysExceeded = opts.MaxKeys > 0 && numKeys == opts.MaxKeys
bytesExceeded = opts.TargetBytes > 0 && numBytes >= opts.TargetBytes
if keysExceeded || bytesExceeded {
if keysExceeded {
resumeReason = kvpb.RESUME_KEY_LIMIT
} else if bytesExceeded {
resumeReason = kvpb.RESUME_BYTE_LIMIT
}
// We could also compute a tighter nextKey here if we wanted to.
resumeSpan := &roachpb.Span{Key: lastResolvedKey.Next(), EndKey: intentEndKey}
return numKeys, numBytes, resumeSpan, resumeReason, nil
}

// Copy the underlying bytes of the unsafe key. This is needed for
// stability of the key to check for sameLockedKey and to construct
// a resume span on subsequent iteration.
lastResolvedKey = append(lastResolvedKey[:0], ltKey.Key...)
lastResolvedKeyOk = false
}
if ltKey.TxnUUID != intent.Txn.ID {
return 0, 0, nil, 0, errors.AssertionFailedf(
"unexpected txnID %v != %v while scanning lock table", ltKey.TxnUUID, intent.Txn.ID)
}
intent.Key = ltKey.Key
if err := ltIter.ValueProto(&buf.meta); err != nil {
return 0, 0, nil, 0, errors.Wrap(err, "unmarshaling lock table value")
}
// Copy the underlying bytes of the unsafe key. This is needed for
// stability of the key to construct a resume span on subsequent
// iteration.
intent.Key = ltKey.Key
lastResolvedKey = append(lastResolvedKey[:0], intent.Key...)
beforeBytes := rw.BufferedSize()
var ok bool
if ltKey.Strength == lock.Intent {
ok, err = mvccResolveWriteIntent(ctx, rw, mvccIter, ms, intent, &buf.meta, buf)
} else {
// TODO(nvanbenschoten): implement.
_ = ltKey.Strength
ok, err = mvccReleaseLockInternal(ctx, rw, ms, intent, ltKey.Strength, &buf.meta, buf)
}
if err != nil {
log.Warningf(ctx, "failed to resolve intent for key %q: %+v", lastResolvedKey, err)
} else if ok {
}
if ok && !lastResolvedKeyOk {
// We only count the first successfully resolved lock/intent on a
// given key towards the returned key count and key limit.
lastResolvedKeyOk = true
numKeys++
}
numBytes += int64(rw.BufferedSize() - beforeBytes)
Expand Down Expand Up @@ -5458,6 +5475,57 @@ func validateLockAcquisition(txn *roachpb.Transaction, str lock.Strength) error
return nil
}

// mvccReleaseLockInternal releases a lock at the specified key and strength and
// by the specified transaction. The function accepts the instructions for how
// to release the lock (encoded in the LockUpdate), and the current value of the
// lock (meta).
func mvccReleaseLockInternal(
ctx context.Context,
writer Writer,
ms *enginepb.MVCCStats,
update roachpb.LockUpdate,
str lock.Strength,
meta *enginepb.MVCCMetadata,
buf *putBuffer,
) (bool, error) {
finalized := update.Status.IsFinalized()
rolledBack := meta.Txn.Epoch < update.Txn.Epoch ||
(meta.Txn.Epoch == update.Txn.Epoch && enginepb.TxnSeqIsIgnored(meta.Txn.Sequence, update.IgnoredSeqNums))
release := finalized || rolledBack
if !release {
return false, nil
}

canSingleDelHelper := singleDelOptimizationHelper{
_didNotUpdateMeta: meta.TxnDidNotUpdateMeta,
_hasIgnoredSeqs: len(update.IgnoredSeqNums) > 0,
_epoch: update.Txn.Epoch,
}
var txnDidNotUpdateMeta bool
if update.Status == roachpb.COMMITTED && !rolledBack {
txnDidNotUpdateMeta = canSingleDelHelper.onCommitLock()
} else {
txnDidNotUpdateMeta = canSingleDelHelper.onAbortLock()
}

metaKey := MakeMVCCMetadataKey(update.Key)
origMetaKeySize := int64(metaKey.EncodedSize())
origMetaValSize := int64(meta.Size())
keyBytes, valBytes, err := buf.clearLockMeta(writer, metaKey, str, txnDidNotUpdateMeta, meta.Txn.ID, ClearOptions{
ValueSizeKnown: true,
ValueSize: uint32(meta.Size()),
})
if err != nil {
return false, err
}

// TODO(nvanbenschoten): handle MVCCStats update after addressing #109645.
_, _, _, _, _ = ms, origMetaKeySize, origMetaValSize, keyBytes, valBytes

return true, nil

}

// MVCCGarbageCollect creates an iterator on the ReadWriter. In parallel
// it iterates through the keys listed for garbage collection by the
// keys slice. The iterator is seeked in turn to each listed
Expand Down
22 changes: 13 additions & 9 deletions pkg/storage/mvcc_history_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -379,6 +379,10 @@ func TestMVCCHistories(t *testing.T) {
if err != nil {
return errors.Wrapf(err, "decoding LockTable key: %v", eKey)
}
if ltKey.Strength == lock.Intent {
// Ignore intents, which are reported by reportDataEntries.
continue
}
// Unmarshal.
v, err := iter.UnsafeValue()
if err != nil {
Expand Down Expand Up @@ -601,11 +605,11 @@ func TestMVCCHistories(t *testing.T) {
}

cmd := e.getCmd()
txnChange = txnChange || cmd.typ == typTxnUpdate
dataChange = dataChange || cmd.typ == typDataUpdate
locksChange = locksChange || cmd.typ == typLocksUpdate
txnChange = txnChange || cmd.typ&typTxnUpdate != 0
dataChange = dataChange || cmd.typ&typDataUpdate != 0
locksChange = locksChange || cmd.typ&typLocksUpdate != 0

if trace || (stats && cmd.typ == typDataUpdate) {
if trace || (stats && cmd.typ&typDataUpdate != 0) {
// If tracing is also requested by the datadriven input,
// we'll trace the statement in the actual results too.
buf.Printf(">> %s", d.Cmd)
Expand Down Expand Up @@ -638,10 +642,10 @@ func TestMVCCHistories(t *testing.T) {
// If tracing is enabled, we report the intermediate results
// after each individual step in the script.
// This may modify foundErr too.
reportResults(cmd.typ == typTxnUpdate, cmd.typ == typDataUpdate, cmd.typ == typLocksUpdate)
reportResults(cmd.typ&typTxnUpdate != 0, cmd.typ&typDataUpdate != 0, cmd.typ&typLocksUpdate != 0)
}

if stats && cmd.typ == typDataUpdate {
if stats && cmd.typ&typDataUpdate != 0 {
// If stats are enabled, emit evaluated stats returned by the
// command, and compare them with the real computed stats diff.
var msEngineDiff enginepb.MVCCStats
Expand Down Expand Up @@ -753,7 +757,7 @@ type cmd struct {
type cmdType int

const (
typReadOnly cmdType = iota
typReadOnly cmdType = 1 << iota
typTxnUpdate
typDataUpdate
typLocksUpdate
Expand All @@ -770,8 +774,8 @@ var commands = map[string]cmd{
"txn_step": {typTxnUpdate, cmdTxnStep},
"txn_update": {typTxnUpdate, cmdTxnUpdate},

"resolve_intent": {typDataUpdate, cmdResolveIntent},
"resolve_intent_range": {typDataUpdate, cmdResolveIntentRange},
"resolve_intent": {typDataUpdate | typLocksUpdate, cmdResolveIntent},
"resolve_intent_range": {typDataUpdate | typLocksUpdate, cmdResolveIntentRange},
"check_intent": {typReadOnly, cmdCheckIntent},
"add_unreplicated_lock": {typLocksUpdate, cmdAddUnreplicatedLock},
"check_for_acquire_lock": {typReadOnly, cmdCheckForAcquireLock},
Expand Down
Loading

0 comments on commit a3b9f67

Please sign in to comment.