Skip to content

Commit

Permalink
Merge pull request #117644 from miraradeva/backport23.1-117541
Browse files (browse the repository at this point in the history)
release-23.1: storage: fix a series of intent resolution bugs with ignored seq nums
  • Loading branch information
miraradeva authored Jan 11, 2024
2 parents 6f28a60 + 0a3085d commit cd1de07
Show file tree
Hide file tree
Showing 5 changed files with 242 additions and 51 deletions.
107 changes: 56 additions & 51 deletions pkg/storage/mvcc.go
Original file line number Diff line number Diff line change
Expand Up @@ -4244,30 +4244,30 @@ func MVCCResolveWriteIntent(
ctx context.Context,
rw ReadWriter,
ms *enginepb.MVCCStats,
intent roachpb.LockUpdate,
update roachpb.LockUpdate,
opts MVCCResolveWriteIntentOptions,
) (ok bool, numBytes int64, resumeSpan *roachpb.Span, err error) {
if len(intent.Key) == 0 {
if len(update.Key) == 0 {
return false, 0, nil, emptyKeyError()
}
if len(intent.EndKey) > 0 {
if len(update.EndKey) > 0 {
return false, 0, nil, errors.Errorf("can't resolve range intent as point intent")
}
if opts.TargetBytes < 0 {
return false, 0, &roachpb.Span{Key: intent.Key}, nil
return false, 0, &roachpb.Span{Key: update.Key}, nil
}

iterAndBuf := GetBufUsingIter(rw.NewMVCCIterator(MVCCKeyAndIntentsIterKind, IterOptions{
KeyTypes: IterKeyTypePointsAndRanges,
Prefix: true,
}))
iterAndBuf.iter.SeekIntentGE(intent.Key, intent.Txn.ID)
iterAndBuf.iter.SeekIntentGE(update.Key, update.Txn.ID)
// Production code will use a buffered writer, which makes the numBytes
// calculation accurate. Note that an inaccurate numBytes (e.g. 0 in the
// case of an unbuffered writer) does not affect any safety properties of
// the database.
beforeBytes := rw.BufferedSize()
ok, err = mvccResolveWriteIntent(ctx, rw, iterAndBuf.iter, ms, intent, iterAndBuf.buf)
ok, err = mvccResolveWriteIntent(ctx, rw, iterAndBuf.iter, ms, update, iterAndBuf.buf)
numBytes = int64(rw.BufferedSize() - beforeBytes)
// Using defer would be more convenient, but it is measurably slower.
iterAndBuf.Cleanup()
Expand Down Expand Up @@ -4552,60 +4552,60 @@ func (h singleDelOptimizationHelper) onAbortIntent() bool {
}

// mvccResolveWriteIntent is the core logic for resolving an intent.
// REQUIRES: iter is already seeked to intent.Key.
// REQUIRES: iter is already seeked to update.Key.
// REQUIRES: iter surfaces range keys via IterKeyTypePointsAndRanges.
// Returns whether an intent was found and resolved, false otherwise.
func mvccResolveWriteIntent(
ctx context.Context,
rw ReadWriter,
iter iterForKeyVersions,
ms *enginepb.MVCCStats,
intent roachpb.LockUpdate,
update roachpb.LockUpdate,
buf *putBuffer,
) (bool, error) {
metaKey := MakeMVCCMetadataKey(intent.Key)
metaKey := MakeMVCCMetadataKey(update.Key)
meta := &buf.meta
ok, origMetaKeySize, origMetaValSize, err :=
mvccGetIntent(iter, metaKey, meta)
if err != nil {
return false, err
}
if !ok || meta.Txn == nil || intent.Txn.ID != meta.Txn.ID {
if !ok || meta.Txn == nil || update.Txn.ID != meta.Txn.ID {
return false, nil
}
metaTimestamp := meta.Timestamp.ToTimestamp()
canSingleDelHelper := singleDelOptimizationHelper{
_didNotUpdateMeta: meta.TxnDidNotUpdateMeta,
_hasIgnoredSeqs: len(intent.IgnoredSeqNums) > 0,
_hasIgnoredSeqs: len(update.IgnoredSeqNums) > 0,
// NB: the value is only used if epochs match, so it doesn't
// matter if we use the one from meta or incoming request here.
_epoch: intent.Txn.Epoch,
_epoch: update.Txn.Epoch,
}

// A commit with a newer epoch than the intent effectively means that we
// An update with a newer epoch than the intent effectively means that we
// wrote this intent before an earlier retry, but didn't write it again
// after. We treat such intents as uncommitted.
//
// A commit with a newer timestamp than the intent means that our timestamp
// An update with a newer timestamp than the intent means that our timestamp
// was pushed during the course of an epoch. We treat such intents as
// committed after moving their timestamp forward. This is possible if a
// transaction writes an intent and then successfully refreshes its
// timestamp to avoid a restart.
//
// A commit with an older epoch than the intent should never happen because
// An update with an older epoch than the intent should never happen because
// epoch increments require client action. This means that they can't be
// caused by replays.
//
// A commit with an older timestamp than the intent should not happen under
// An update with an older timestamp than the intent should not happen under
// normal circumstances because a client should never bump its timestamp
// after issuing an EndTxn request. Replays of intent writes that are pushed
// forward due to WriteTooOld errors without client action combined with
// replays of intent resolution make this configuration a possibility. We
// treat such intents as uncommitted.
epochsMatch := meta.Txn.Epoch == intent.Txn.Epoch
timestampsValid := metaTimestamp.LessEq(intent.Txn.WriteTimestamp)
timestampChanged := metaTimestamp.Less(intent.Txn.WriteTimestamp)
commit := intent.Status == roachpb.COMMITTED && epochsMatch && timestampsValid
epochsMatch := meta.Txn.Epoch == update.Txn.Epoch
timestampsValid := metaTimestamp.LessEq(update.Txn.WriteTimestamp)
timestampChanged := metaTimestamp.Less(update.Txn.WriteTimestamp)
commit := update.Status == roachpb.COMMITTED && epochsMatch && timestampsValid

// Note the small difference to commit epoch handling here: We allow
// a push from a previous epoch to move a newer intent. That's not
Expand All @@ -4632,9 +4632,9 @@ func mvccResolveWriteIntent(
// used for resolving), but that costs latency.
// TODO(tschottdorf): various epoch-related scenarios here deserve more
// testing.
inProgress := !intent.Status.IsFinalized() && meta.Txn.Epoch >= intent.Txn.Epoch
inProgress := !update.Status.IsFinalized() && meta.Txn.Epoch >= update.Txn.Epoch
pushed := inProgress && timestampChanged
latestKey := MVCCKey{Key: intent.Key, Timestamp: metaTimestamp}
latestKey := MVCCKey{Key: update.Key, Timestamp: metaTimestamp}

// Handle partial txn rollbacks. If the current txn sequence
// is part of a rolled back (ignored) seqnum range, we're going
Expand All @@ -4646,7 +4646,13 @@ func mvccResolveWriteIntent(
var rolledBackVal *MVCCValue
buf.newMeta = *meta
newMeta := &buf.newMeta
if len(intent.IgnoredSeqNums) > 0 {
// Update the MVCC history only if:
// 1. There are IgnoredSeqNums present.
// 2. The update is not going to abort the intent; otherwise, the entire
// history will be removed anyway.
// 3. The epochs of the intent and the update match; otherwise the epochs may
// have different seq nums (and ignored seq nums).
if len(update.IgnoredSeqNums) > 0 && (commit || inProgress) && epochsMatch {
// NOTE: mvccMaybeRewriteIntentHistory mutates its meta argument.
// TODO(nvanbenschoten): this is an awkward interface. We shouldn't
// be mutating meta and we shouldn't be restoring the previous value
Expand All @@ -4657,7 +4663,7 @@ func mvccResolveWriteIntent(
// intact and corresponding to the stats in ms to ensure that later on (in
// updateStatsOnResolve) the stats will be updated correctly based on the
// old meta (meta) and the new meta (newMeta).
removeIntent, rolledBackVal, err = mvccMaybeRewriteIntentHistory(ctx, rw, intent.IgnoredSeqNums, newMeta, latestKey)
removeIntent, rolledBackVal, err = mvccMaybeRewriteIntentHistory(ctx, rw, update.IgnoredSeqNums, newMeta, latestKey)
if err != nil {
return false, err
}
Expand All @@ -4677,7 +4683,7 @@ func mvccResolveWriteIntent(
// If we need to update the intent to roll back part of its intent
// history, make sure that we don't regress its timestamp, even if the
// caller provided an outdated timestamp.
intent.Txn.WriteTimestamp.Forward(metaTimestamp)
update.Txn.WriteTimestamp.Forward(metaTimestamp)
}
}

Expand All @@ -4701,7 +4707,7 @@ func mvccResolveWriteIntent(
if commit || pushed || rolledBackVal != nil {
// The intent might be committing at a higher timestamp, or it might be
// getting pushed.
newTimestamp := intent.Txn.WriteTimestamp
newTimestamp := update.Txn.WriteTimestamp

// Assert that the intent timestamp never regresses. The logic above should
// not allow this, regardless of the input to this function.
Expand Down Expand Up @@ -4761,7 +4767,7 @@ func mvccResolveWriteIntent(
// to the observed timestamp.
newValue := oldValue
newValue.LocalTimestamp = oldValue.GetLocalTimestamp(oldKey.Timestamp)
newValue.LocalTimestamp.Forward(intent.ClockWhilePending.Timestamp)
newValue.LocalTimestamp.Forward(update.ClockWhilePending.Timestamp)
if !newValue.LocalTimestampNeeded(newKey.Timestamp) || !rw.ShouldWriteLocalTimestamps(ctx) {
newValue.LocalTimestamp = hlc.ClockTimestamp{}
}
Expand Down Expand Up @@ -4810,6 +4816,7 @@ func mvccResolveWriteIntent(

// Update or remove the metadata key.
var metaKeySize, metaValSize int64
var logicalOp MVCCLogicalOpType
if !commit {
// Keep existing intent if we're updating it. We update the existing
// metadata's timestamp instead of using the supplied intent meta to avoid
Expand All @@ -4818,29 +4825,27 @@ func mvccResolveWriteIntent(
// even if it can.
metaKeySize, metaValSize, err = buf.putIntentMeta(
ctx, rw, metaKey, newMeta, true /* alreadyExists */)
logicalOp = MVCCUpdateIntentOpType
} else {
metaKeySize = int64(metaKey.EncodedSize())
err = rw.ClearIntent(metaKey.Key, canSingleDelHelper.onCommitIntent(), meta.Txn.ID)
logicalOp = MVCCCommitIntentOpType
}
if err != nil {
return false, err
}

// Update stat counters related to resolving the intent.
if ms != nil {
ms.Add(updateStatsOnResolve(intent.Key, prevIsValue, prevValSize, origMetaKeySize, origMetaValSize,
ms.Add(updateStatsOnResolve(update.Key, prevIsValue, prevValSize, origMetaKeySize, origMetaValSize,
metaKeySize, metaValSize, meta, newMeta, commit))
}

// Log the logical MVCC operation.
logicalOp := MVCCCommitIntentOpType
if pushed {
logicalOp = MVCCUpdateIntentOpType
}
rw.LogLogicalOp(logicalOp, MVCCLogicalOpDetails{
Txn: intent.Txn,
Key: intent.Key,
Timestamp: intent.Txn.WriteTimestamp,
Txn: update.Txn,
Key: update.Key,
Timestamp: update.Txn.WriteTimestamp,
})

return true, nil
Expand All @@ -4850,7 +4855,7 @@ func mvccResolveWriteIntent(
// MVCCMetadata.
//
// Note that we have to support a somewhat unintuitive case - an ABORT with
// intent.Txn.Epoch < meta.Txn.Epoch:
// update.Txn.Epoch < meta.Txn.Epoch:
// - writer1 writes key0 at epoch 0
// - writer2 with higher priority encounters intent at key0 (epoch 0)
// - writer1 restarts, now at epoch one (txn record not updated)
Expand All @@ -4865,8 +4870,8 @@ func mvccResolveWriteIntent(

// Log the logical MVCC operation.
rw.LogLogicalOp(MVCCAbortIntentOpType, MVCCLogicalOpDetails{
Txn: intent.Txn,
Key: intent.Key,
Txn: update.Txn,
Key: update.Key,
})

ok = false
Expand Down Expand Up @@ -4931,7 +4936,7 @@ func mvccResolveWriteIntent(
// Clear stat counters attributable to the intent we're aborting.
if ms != nil {
ms.Add(updateStatsOnClear(
intent.Key, origMetaKeySize, origMetaValSize, 0, 0, meta, nil, 0))
update.Key, origMetaKeySize, origMetaValSize, 0, 0, meta, nil, 0))
}
return true, nil
}
Expand All @@ -4950,7 +4955,7 @@ func mvccResolveWriteIntent(

// Update stat counters with older version.
if ms != nil {
ms.Add(updateStatsOnClear(intent.Key, origMetaKeySize, origMetaValSize, metaKeySize,
ms.Add(updateStatsOnClear(update.Key, origMetaKeySize, origMetaValSize, metaKeySize,
metaValSize, meta, &buf.newMeta, unsafeNextKey.Timestamp.WallTime))
}

Expand Down Expand Up @@ -5052,28 +5057,28 @@ func MVCCResolveWriteIntentRange(
ctx context.Context,
rw ReadWriter,
ms *enginepb.MVCCStats,
intent roachpb.LockUpdate,
update roachpb.LockUpdate,
opts MVCCResolveWriteIntentRangeOptions,
) (numKeys, numBytes int64, resumeSpan *roachpb.Span, resumeReason kvpb.ResumeReason, err error) {
keysExceeded := opts.MaxKeys < 0
bytesExceeded := opts.TargetBytes < 0
if keysExceeded || bytesExceeded {
resumeSpan := intent.Span // don't inline or `intent` would escape to heap
resumeSpan := update.Span // don't inline or `update` would escape to heap
if keysExceeded {
resumeReason = kvpb.RESUME_KEY_LIMIT
} else if bytesExceeded {
resumeReason = kvpb.RESUME_BYTE_LIMIT
}
return 0, 0, &resumeSpan, resumeReason, nil
}
ltStart, _ := keys.LockTableSingleKey(intent.Key, nil)
ltEnd, _ := keys.LockTableSingleKey(intent.EndKey, nil)
ltStart, _ := keys.LockTableSingleKey(update.Key, nil)
ltEnd, _ := keys.LockTableSingleKey(update.EndKey, nil)
engineIter := rw.NewEngineIterator(IterOptions{LowerBound: ltStart, UpperBound: ltEnd})
var mvccIter MVCCIterator
iterOpts := IterOptions{
KeyTypes: IterKeyTypePointsAndRanges,
LowerBound: intent.Key,
UpperBound: intent.EndKey,
LowerBound: update.Key,
UpperBound: update.EndKey,
}
if rw.ConsistentIterators() {
// Production code should always have consistent iterators.
Expand All @@ -5096,8 +5101,8 @@ func MVCCResolveWriteIntentRange(
// only step the iterator and not seek.
sepIter.seekEngineKeyGE(EngineKey{Key: ltStart})

intentEndKey := intent.EndKey
intent.EndKey = nil
intentEndKey := update.EndKey
update.EndKey = nil

var lastResolvedKey roachpb.Key
for {
Expand Down Expand Up @@ -5126,7 +5131,7 @@ func MVCCResolveWriteIntentRange(
if meta.Txn == nil {
return 0, 0, nil, 0, errors.Errorf("intent with no txn")
}
if intent.Txn.ID != meta.Txn.ID {
if update.Txn.ID != meta.Txn.ID {
// Intent for a different txn, so ignore.
sepIter.nextEngineKey()
continue
Expand All @@ -5139,9 +5144,9 @@ func MVCCResolveWriteIntentRange(
// stability of the key passed to mvccResolveWriteIntent, and for the
// subsequent iteration to construct a resume span.
lastResolvedKey = append(lastResolvedKey[:0], sepIter.UnsafeKey().Key...)
intent.Key = lastResolvedKey
update.Key = lastResolvedKey
beforeBytes := rw.BufferedSize()
ok, err := mvccResolveWriteIntent(ctx, rw, sepIter, ms, intent, putBuf)
ok, err := mvccResolveWriteIntent(ctx, rw, sepIter, ms, update, putBuf)
if err != nil {
log.Warningf(ctx, "failed to resolve intent for key %q: %+v", lastResolvedKey, err)
} else if ok {
Expand Down
5 changes: 5 additions & 0 deletions pkg/storage/mvcc_history_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -820,6 +820,11 @@ func cmdTxnRestart(e *evalCtx) error {
up := roachpb.NormalUserPriority
tp := enginepb.MinTxnPriority
txn.Restart(up, tp, ts)
if e.hasArg("epoch") {
var epoch int
e.scanArg("epoch", &epoch)
txn.Epoch = enginepb.TxnEpoch(epoch)
}
e.results.txn = txn
return nil
}
Expand Down
Loading

0 comments on commit cd1de07

Please sign in to comment.