Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

release-23.1: storage: fix a series of intent resolution bugs with ignored seq nums #117644

Merged
merged 3 commits into from
Jan 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
107 changes: 56 additions & 51 deletions pkg/storage/mvcc.go
Original file line number Diff line number Diff line change
Expand Up @@ -4244,30 +4244,30 @@ func MVCCResolveWriteIntent(
ctx context.Context,
rw ReadWriter,
ms *enginepb.MVCCStats,
intent roachpb.LockUpdate,
update roachpb.LockUpdate,
opts MVCCResolveWriteIntentOptions,
) (ok bool, numBytes int64, resumeSpan *roachpb.Span, err error) {
if len(intent.Key) == 0 {
if len(update.Key) == 0 {
return false, 0, nil, emptyKeyError()
}
if len(intent.EndKey) > 0 {
if len(update.EndKey) > 0 {
return false, 0, nil, errors.Errorf("can't resolve range intent as point intent")
}
if opts.TargetBytes < 0 {
return false, 0, &roachpb.Span{Key: intent.Key}, nil
return false, 0, &roachpb.Span{Key: update.Key}, nil
}

iterAndBuf := GetBufUsingIter(rw.NewMVCCIterator(MVCCKeyAndIntentsIterKind, IterOptions{
KeyTypes: IterKeyTypePointsAndRanges,
Prefix: true,
}))
iterAndBuf.iter.SeekIntentGE(intent.Key, intent.Txn.ID)
iterAndBuf.iter.SeekIntentGE(update.Key, update.Txn.ID)
// Production code will use a buffered writer, which makes the numBytes
// calculation accurate. Note that an inaccurate numBytes (e.g. 0 in the
// case of an unbuffered writer) does not affect any safety properties of
// the database.
beforeBytes := rw.BufferedSize()
ok, err = mvccResolveWriteIntent(ctx, rw, iterAndBuf.iter, ms, intent, iterAndBuf.buf)
ok, err = mvccResolveWriteIntent(ctx, rw, iterAndBuf.iter, ms, update, iterAndBuf.buf)
numBytes = int64(rw.BufferedSize() - beforeBytes)
// Using defer would be more convenient, but it is measurably slower.
iterAndBuf.Cleanup()
Expand Down Expand Up @@ -4552,60 +4552,60 @@ func (h singleDelOptimizationHelper) onAbortIntent() bool {
}

// mvccResolveWriteIntent is the core logic for resolving an intent.
// REQUIRES: iter is already seeked to intent.Key.
// REQUIRES: iter is already seeked to update.Key.
// REQUIRES: iter surfaces range keys via IterKeyTypePointsAndRanges.
// Returns whether an intent was found and resolved, false otherwise.
func mvccResolveWriteIntent(
ctx context.Context,
rw ReadWriter,
iter iterForKeyVersions,
ms *enginepb.MVCCStats,
intent roachpb.LockUpdate,
update roachpb.LockUpdate,
buf *putBuffer,
) (bool, error) {
metaKey := MakeMVCCMetadataKey(intent.Key)
metaKey := MakeMVCCMetadataKey(update.Key)
meta := &buf.meta
ok, origMetaKeySize, origMetaValSize, err :=
mvccGetIntent(iter, metaKey, meta)
if err != nil {
return false, err
}
if !ok || meta.Txn == nil || intent.Txn.ID != meta.Txn.ID {
if !ok || meta.Txn == nil || update.Txn.ID != meta.Txn.ID {
return false, nil
}
metaTimestamp := meta.Timestamp.ToTimestamp()
canSingleDelHelper := singleDelOptimizationHelper{
_didNotUpdateMeta: meta.TxnDidNotUpdateMeta,
_hasIgnoredSeqs: len(intent.IgnoredSeqNums) > 0,
_hasIgnoredSeqs: len(update.IgnoredSeqNums) > 0,
// NB: the value is only used if epochs match, so it doesn't
// matter if we use the one from meta or incoming request here.
_epoch: intent.Txn.Epoch,
_epoch: update.Txn.Epoch,
}

// A commit with a newer epoch than the intent effectively means that we
// An update with a newer epoch than the intent effectively means that we
// wrote this intent before an earlier retry, but didn't write it again
// after. We treat such intents as uncommitted.
//
// A commit with a newer timestamp than the intent means that our timestamp
// An update with a newer timestamp than the intent means that our timestamp
// was pushed during the course of an epoch. We treat such intents as
// committed after moving their timestamp forward. This is possible if a
// transaction writes an intent and then successfully refreshes its
// timestamp to avoid a restart.
//
// A commit with an older epoch than the intent should never happen because
// An update with an older epoch than the intent should never happen because
// epoch increments require client action. This means that they can't be
// caused by replays.
//
// A commit with an older timestamp than the intent should not happen under
// An update with an older timestamp than the intent should not happen under
// normal circumstances because a client should never bump its timestamp
// after issuing an EndTxn request. Replays of intent writes that are pushed
// forward due to WriteTooOld errors without client action combined with
// replays of intent resolution make this configuration a possibility. We
// treat such intents as uncommitted.
epochsMatch := meta.Txn.Epoch == intent.Txn.Epoch
timestampsValid := metaTimestamp.LessEq(intent.Txn.WriteTimestamp)
timestampChanged := metaTimestamp.Less(intent.Txn.WriteTimestamp)
commit := intent.Status == roachpb.COMMITTED && epochsMatch && timestampsValid
epochsMatch := meta.Txn.Epoch == update.Txn.Epoch
timestampsValid := metaTimestamp.LessEq(update.Txn.WriteTimestamp)
timestampChanged := metaTimestamp.Less(update.Txn.WriteTimestamp)
commit := update.Status == roachpb.COMMITTED && epochsMatch && timestampsValid

// Note the small difference to commit epoch handling here: We allow
// a push from a previous epoch to move a newer intent. That's not
Expand All @@ -4632,9 +4632,9 @@ func mvccResolveWriteIntent(
// used for resolving), but that costs latency.
// TODO(tschottdorf): various epoch-related scenarios here deserve more
// testing.
inProgress := !intent.Status.IsFinalized() && meta.Txn.Epoch >= intent.Txn.Epoch
inProgress := !update.Status.IsFinalized() && meta.Txn.Epoch >= update.Txn.Epoch
pushed := inProgress && timestampChanged
latestKey := MVCCKey{Key: intent.Key, Timestamp: metaTimestamp}
latestKey := MVCCKey{Key: update.Key, Timestamp: metaTimestamp}

// Handle partial txn rollbacks. If the current txn sequence
// is part of a rolled back (ignored) seqnum range, we're going
Expand All @@ -4646,7 +4646,13 @@ func mvccResolveWriteIntent(
var rolledBackVal *MVCCValue
buf.newMeta = *meta
newMeta := &buf.newMeta
if len(intent.IgnoredSeqNums) > 0 {
// Update the MVCC history only if:
// 1. There are IgnoredSeqNums present.
// 2. The update is not going to abort the intent; otherwise, the entire
// history will be removed anyway.
// 3. The epochs of the intent and the update match; otherwise the epochs may
// have different seq nums (and ignored seq nums).
if len(update.IgnoredSeqNums) > 0 && (commit || inProgress) && epochsMatch {
// NOTE: mvccMaybeRewriteIntentHistory mutates its meta argument.
// TODO(nvanbenschoten): this is an awkward interface. We shouldn't
// be mutating meta and we shouldn't be restoring the previous value
Expand All @@ -4657,7 +4663,7 @@ func mvccResolveWriteIntent(
// intact and corresponding to the stats in ms to ensure that later on (in
// updateStatsOnResolve) the stats will be updated correctly based on the
// old meta (meta) and the new meta (newMeta).
removeIntent, rolledBackVal, err = mvccMaybeRewriteIntentHistory(ctx, rw, intent.IgnoredSeqNums, newMeta, latestKey)
removeIntent, rolledBackVal, err = mvccMaybeRewriteIntentHistory(ctx, rw, update.IgnoredSeqNums, newMeta, latestKey)
if err != nil {
return false, err
}
Expand All @@ -4677,7 +4683,7 @@ func mvccResolveWriteIntent(
// If we need to update the intent to roll back part of its intent
// history, make sure that we don't regress its timestamp, even if the
// caller provided an outdated timestamp.
intent.Txn.WriteTimestamp.Forward(metaTimestamp)
update.Txn.WriteTimestamp.Forward(metaTimestamp)
}
}

Expand All @@ -4701,7 +4707,7 @@ func mvccResolveWriteIntent(
if commit || pushed || rolledBackVal != nil {
// The intent might be committing at a higher timestamp, or it might be
// getting pushed.
newTimestamp := intent.Txn.WriteTimestamp
newTimestamp := update.Txn.WriteTimestamp

// Assert that the intent timestamp never regresses. The logic above should
// not allow this, regardless of the input to this function.
Expand Down Expand Up @@ -4761,7 +4767,7 @@ func mvccResolveWriteIntent(
// to the observed timestamp.
newValue := oldValue
newValue.LocalTimestamp = oldValue.GetLocalTimestamp(oldKey.Timestamp)
newValue.LocalTimestamp.Forward(intent.ClockWhilePending.Timestamp)
newValue.LocalTimestamp.Forward(update.ClockWhilePending.Timestamp)
if !newValue.LocalTimestampNeeded(newKey.Timestamp) || !rw.ShouldWriteLocalTimestamps(ctx) {
newValue.LocalTimestamp = hlc.ClockTimestamp{}
}
Expand Down Expand Up @@ -4810,6 +4816,7 @@ func mvccResolveWriteIntent(

// Update or remove the metadata key.
var metaKeySize, metaValSize int64
var logicalOp MVCCLogicalOpType
if !commit {
// Keep existing intent if we're updating it. We update the existing
// metadata's timestamp instead of using the supplied intent meta to avoid
Expand All @@ -4818,29 +4825,27 @@ func mvccResolveWriteIntent(
// even if it can.
metaKeySize, metaValSize, err = buf.putIntentMeta(
ctx, rw, metaKey, newMeta, true /* alreadyExists */)
logicalOp = MVCCUpdateIntentOpType
} else {
metaKeySize = int64(metaKey.EncodedSize())
err = rw.ClearIntent(metaKey.Key, canSingleDelHelper.onCommitIntent(), meta.Txn.ID)
logicalOp = MVCCCommitIntentOpType
}
if err != nil {
return false, err
}

// Update stat counters related to resolving the intent.
if ms != nil {
ms.Add(updateStatsOnResolve(intent.Key, prevIsValue, prevValSize, origMetaKeySize, origMetaValSize,
ms.Add(updateStatsOnResolve(update.Key, prevIsValue, prevValSize, origMetaKeySize, origMetaValSize,
metaKeySize, metaValSize, meta, newMeta, commit))
}

// Log the logical MVCC operation.
logicalOp := MVCCCommitIntentOpType
if pushed {
logicalOp = MVCCUpdateIntentOpType
}
rw.LogLogicalOp(logicalOp, MVCCLogicalOpDetails{
Txn: intent.Txn,
Key: intent.Key,
Timestamp: intent.Txn.WriteTimestamp,
Txn: update.Txn,
Key: update.Key,
Timestamp: update.Txn.WriteTimestamp,
})

return true, nil
Expand All @@ -4850,7 +4855,7 @@ func mvccResolveWriteIntent(
// MVCCMetadata.
//
// Note that we have to support a somewhat unintuitive case - an ABORT with
// intent.Txn.Epoch < meta.Txn.Epoch:
// update.Txn.Epoch < meta.Txn.Epoch:
// - writer1 writes key0 at epoch 0
// - writer2 with higher priority encounters intent at key0 (epoch 0)
// - writer1 restarts, now at epoch one (txn record not updated)
Expand All @@ -4865,8 +4870,8 @@ func mvccResolveWriteIntent(

// Log the logical MVCC operation.
rw.LogLogicalOp(MVCCAbortIntentOpType, MVCCLogicalOpDetails{
Txn: intent.Txn,
Key: intent.Key,
Txn: update.Txn,
Key: update.Key,
})

ok = false
Expand Down Expand Up @@ -4931,7 +4936,7 @@ func mvccResolveWriteIntent(
// Clear stat counters attributable to the intent we're aborting.
if ms != nil {
ms.Add(updateStatsOnClear(
intent.Key, origMetaKeySize, origMetaValSize, 0, 0, meta, nil, 0))
update.Key, origMetaKeySize, origMetaValSize, 0, 0, meta, nil, 0))
}
return true, nil
}
Expand All @@ -4950,7 +4955,7 @@ func mvccResolveWriteIntent(

// Update stat counters with older version.
if ms != nil {
ms.Add(updateStatsOnClear(intent.Key, origMetaKeySize, origMetaValSize, metaKeySize,
ms.Add(updateStatsOnClear(update.Key, origMetaKeySize, origMetaValSize, metaKeySize,
metaValSize, meta, &buf.newMeta, unsafeNextKey.Timestamp.WallTime))
}

Expand Down Expand Up @@ -5052,28 +5057,28 @@ func MVCCResolveWriteIntentRange(
ctx context.Context,
rw ReadWriter,
ms *enginepb.MVCCStats,
intent roachpb.LockUpdate,
update roachpb.LockUpdate,
opts MVCCResolveWriteIntentRangeOptions,
) (numKeys, numBytes int64, resumeSpan *roachpb.Span, resumeReason kvpb.ResumeReason, err error) {
keysExceeded := opts.MaxKeys < 0
bytesExceeded := opts.TargetBytes < 0
if keysExceeded || bytesExceeded {
resumeSpan := intent.Span // don't inline or `intent` would escape to heap
resumeSpan := update.Span // don't inline or `update` would escape to heap
if keysExceeded {
resumeReason = kvpb.RESUME_KEY_LIMIT
} else if bytesExceeded {
resumeReason = kvpb.RESUME_BYTE_LIMIT
}
return 0, 0, &resumeSpan, resumeReason, nil
}
ltStart, _ := keys.LockTableSingleKey(intent.Key, nil)
ltEnd, _ := keys.LockTableSingleKey(intent.EndKey, nil)
ltStart, _ := keys.LockTableSingleKey(update.Key, nil)
ltEnd, _ := keys.LockTableSingleKey(update.EndKey, nil)
engineIter := rw.NewEngineIterator(IterOptions{LowerBound: ltStart, UpperBound: ltEnd})
var mvccIter MVCCIterator
iterOpts := IterOptions{
KeyTypes: IterKeyTypePointsAndRanges,
LowerBound: intent.Key,
UpperBound: intent.EndKey,
LowerBound: update.Key,
UpperBound: update.EndKey,
}
if rw.ConsistentIterators() {
// Production code should always have consistent iterators.
Expand All @@ -5096,8 +5101,8 @@ func MVCCResolveWriteIntentRange(
// only step the iterator and not seek.
sepIter.seekEngineKeyGE(EngineKey{Key: ltStart})

intentEndKey := intent.EndKey
intent.EndKey = nil
intentEndKey := update.EndKey
update.EndKey = nil

var lastResolvedKey roachpb.Key
for {
Expand Down Expand Up @@ -5126,7 +5131,7 @@ func MVCCResolveWriteIntentRange(
if meta.Txn == nil {
return 0, 0, nil, 0, errors.Errorf("intent with no txn")
}
if intent.Txn.ID != meta.Txn.ID {
if update.Txn.ID != meta.Txn.ID {
// Intent for a different txn, so ignore.
sepIter.nextEngineKey()
continue
Expand All @@ -5139,9 +5144,9 @@ func MVCCResolveWriteIntentRange(
// stability of the key passed to mvccResolveWriteIntent, and for the
// subsequent iteration to construct a resume span.
lastResolvedKey = append(lastResolvedKey[:0], sepIter.UnsafeKey().Key...)
intent.Key = lastResolvedKey
update.Key = lastResolvedKey
beforeBytes := rw.BufferedSize()
ok, err := mvccResolveWriteIntent(ctx, rw, sepIter, ms, intent, putBuf)
ok, err := mvccResolveWriteIntent(ctx, rw, sepIter, ms, update, putBuf)
if err != nil {
log.Warningf(ctx, "failed to resolve intent for key %q: %+v", lastResolvedKey, err)
} else if ok {
Expand Down
5 changes: 5 additions & 0 deletions pkg/storage/mvcc_history_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -820,6 +820,11 @@ func cmdTxnRestart(e *evalCtx) error {
up := roachpb.NormalUserPriority
tp := enginepb.MinTxnPriority
txn.Restart(up, tp, ts)
if e.hasArg("epoch") {
var epoch int
e.scanArg("epoch", &epoch)
txn.Epoch = enginepb.TxnEpoch(epoch)
}
e.results.txn = txn
return nil
}
Expand Down
Loading