Skip to content

Commit

Permalink
storage: store encoded MVCCValues in SequencedIntents
Browse files Browse the repository at this point in the history
This commit switches from storing encoded `roachpb.Value`s to storing
encoded `storage.MVCCValue`s in `MVCCMetadata`'s `SequencedIntent`s. Doing
so ensures that MVCCValue headers are not lost when an intent is rolled
back. This is important to avoid losing the local timestamp of values in a
key's intent history. Failure to do so could allow for stale reads.
  • Loading branch information
nvanbenschoten committed May 9, 2022
1 parent 1bcf950 commit a3e608d
Show file tree
Hide file tree
Showing 7 changed files with 138 additions and 75 deletions.
1 change: 0 additions & 1 deletion pkg/roachpb/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -2489,7 +2489,6 @@ var _ = (SequencedWriteBySeq{}).Find
func init() {
// Inject the format dependency into the enginepb package.
enginepb.FormatBytesAsKey = func(k []byte) string { return Key(k).String() }
enginepb.FormatBytesAsValue = func(v []byte) string { return Value{RawBytes: v}.PrettyPrint() }
}

// SafeValue implements the redact.SafeValue interface.
Expand Down
2 changes: 1 addition & 1 deletion pkg/storage/enginepb/mvcc.proto
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ message MVCCMetadata {
// the IntentHistory.
optional int32 sequence = 1 [(gogoproto.nullable) = false, (gogoproto.casttype) = "TxnSeq"];
// Value is the value written to the key as part of the transaction at
// the above Sequence. Value uses the roachpb.Value encoding.
// the above Sequence. Value uses the storage.MVCCValue encoding.
optional bytes value = 2;
}

Expand Down
112 changes: 57 additions & 55 deletions pkg/storage/mvcc.go
Original file line number Diff line number Diff line change
Expand Up @@ -1171,24 +1171,27 @@ func replayTransactionalWrite(
txn *roachpb.Transaction,
valueFn func(optionalValue) (roachpb.Value, error),
) error {
var found bool
var writtenValue []byte
var writtenValue optionalValue
var err error
if txn.Sequence == meta.Txn.Sequence {
// This is a special case. This is when the intent hasn't made it
// to the intent history yet. We must now assert the value written
// in the intent to the value we're trying to write.
exVal, _, err := mvccGet(ctx, iter, key, timestamp, MVCCGetOptions{Txn: txn, Tombstones: true})
writtenValue, _, err = mvccGet(ctx, iter, key, timestamp, MVCCGetOptions{Txn: txn, Tombstones: true})
if err != nil {
return err
}
writtenValue = exVal.RawBytes
found = true
} else {
// Get the value from the intent history.
writtenValue, found = meta.GetIntentValue(txn.Sequence)
if intentValRaw, ok := meta.GetIntentValue(txn.Sequence); ok {
intentVal, err := DecodeMVCCValue(intentValRaw)
if err != nil {
return err
}
writtenValue = makeOptionalValue(intentVal.Value)
}
}
if !found {
if !writtenValue.exists {
// NB: This error may be due to a batched `DelRange` operation that, upon being replayed, finds a new key to delete.
// See issue #71236 for more explanation.
err := errors.AssertionFailedf("transaction %s with sequence %d missing an intent with lower sequence %d",
Expand All @@ -1215,9 +1218,11 @@ func replayTransactionalWrite(
// If the previous value was found in the IntentHistory,
// simply apply the value function to the historic value
// to get the would-be value.
prevVal := prevIntent.Value

exVal = makeOptionalValue(roachpb.Value{RawBytes: prevVal})
prevIntentVal, err := DecodeMVCCValue(prevIntent.Value)
if err != nil {
return err
}
exVal = makeOptionalValue(prevIntentVal.Value)
} else {
// If the previous value at the key wasn't written by this
// transaction, or it was hidden by a rolled back seqnum, we look at
Expand All @@ -1239,9 +1244,9 @@ func replayTransactionalWrite(
// To ensure the transaction is idempotent, we must assert that the
// calculated value on this replay is the same as the one we've previously
// written.
if !bytes.Equal(value.RawBytes, writtenValue) {
if !bytes.Equal(value.RawBytes, writtenValue.RawBytes) {
return errors.AssertionFailedf("transaction %s with sequence %d has a different value %+v after recomputing from what was written: %+v",
txn.ID, txn.Sequence, value.RawBytes, writtenValue)
txn.ID, txn.Sequence, value.RawBytes, writtenValue.RawBytes)
}
return nil
}
Expand Down Expand Up @@ -1402,6 +1407,8 @@ func mvccPutInternal(

// We're overwriting the intent that was present at this key, before we do
// that though - we must record the older value in the IntentHistory.
oldVersionKey := metaKey
oldVersionKey.Timestamp = metaTimestamp

// But where to find the older value? There are 4 cases:
// - last write inside txn, same epoch, seqnum of last write is not
Expand All @@ -1422,23 +1429,36 @@ func mvccPutInternal(
// rolled back, either due to transaction retries or transaction savepoint
// rollbacks.)
var exVal optionalValue
// Set to true when the current provisional value is not ignored due to
// a txn restart or a savepoint rollback.
var curProvNotIgnored bool
// Set when the current provisional value is not ignored due to a txn
// restart or a savepoint rollback. Represents an encoded MVCCValue.
var curProvValRaw []byte
if txn.Epoch == meta.Txn.Epoch /* last write inside txn */ {
if !enginepb.TxnSeqIsIgnored(meta.Txn.Sequence, txn.IgnoredSeqNums) {
// Seqnum of last write is not ignored. Retrieve the value
// using a consistent read.
exVal, _, err = mvccGet(ctx, iter, key, readTimestamp, MVCCGetOptions{Txn: txn, Tombstones: true})
// Seqnum of last write is not ignored. Retrieve the value.
iter.SeekGE(oldVersionKey)
if valid, err := iter.Valid(); err != nil {
return err
} else if !valid && !iter.UnsafeKey().Equal(oldVersionKey) {
return errors.Errorf("existing intent value missing: %s", oldVersionKey)
}

// NOTE: we use Value instead of UnsafeValue so that we can move the
// iterator below without invalidating this byte slice.
curProvValRaw = iter.Value()
curIntentVal, err := DecodeMVCCValue(curProvValRaw)
if err != nil {
return err
}
curProvNotIgnored = true
exVal = makeOptionalValue(curIntentVal.Value)
} else {
// Seqnum of last write was ignored. Try retrieving the value from the history.
prevIntent, prevValueWritten := meta.GetPrevIntentSeq(txn.Sequence, txn.IgnoredSeqNums)
if prevValueWritten {
exVal = makeOptionalValue(roachpb.Value{RawBytes: prevIntent.Value})
prevIntent, prevIntentOk := meta.GetPrevIntentSeq(txn.Sequence, txn.IgnoredSeqNums)
if prevIntentOk {
prevIntentVal, err := DecodeMVCCValue(prevIntent.Value)
if err != nil {
return err
}
exVal = makeOptionalValue(prevIntentVal.Value)
}
}
}
Expand Down Expand Up @@ -1472,33 +1492,33 @@ func mvccPutInternal(
// delete the old intent, taking care with MVCC stats.
logicalOp = MVCCUpdateIntentOpType
if metaTimestamp.Less(writeTimestamp) {
versionKey := metaKey
versionKey.Timestamp = metaTimestamp

{
// If the older write intent has a version underneath it, we need to
// read its size because its GCBytesAge contribution may change as we
// move the intent above it. A similar phenomenon occurs in
// MVCCResolveWriteIntent.
_, prevUnsafeVal, haveNextVersion, err := unsafeNextVersion(iter, versionKey)
if err != nil {
prevKey := oldVersionKey.Next()
iter.SeekGE(prevKey)
if valid, err := iter.Valid(); err != nil {
return err
}
if haveNextVersion {
prevVal, ok, err := tryDecodeSimpleMVCCValue(prevUnsafeVal)
if !ok && err == nil {
prevVal, err = decodeExtendedMVCCValue(prevUnsafeVal)
} else if valid && iter.UnsafeKey().Key.Equal(prevKey.Key) {
prevUnsafeKey := iter.UnsafeKey()
if !prevUnsafeKey.IsValue() {
return errors.Errorf("expected an MVCC value key: %s", prevUnsafeKey)
}

prevValRaw := iter.UnsafeValue()
prevVal, err := DecodeMVCCValue(prevValRaw)
if err != nil {
return err
}
prevIsValue = prevVal.Value.IsPresent()
prevValSize = int64(len(prevUnsafeVal))
prevValSize = int64(len(prevValRaw))
}
iter = nil // prevent accidental use below
}

if err := writer.ClearMVCC(versionKey); err != nil {
if err := writer.ClearMVCC(oldVersionKey); err != nil {
return err
}
} else if writeTimestamp.Less(metaTimestamp) {
Expand Down Expand Up @@ -1529,10 +1549,10 @@ func mvccPutInternal(
// history if the current sequence number is not ignored. There's no
// reason to add past committed values or a value already in the intent
// history back into it.
if curProvNotIgnored {
prevIntentValBytes := exVal.RawBytes
if curProvValRaw != nil {
prevIntentValRaw := curProvValRaw
prevIntentSequence := meta.Txn.Sequence
buf.newMeta.AddToIntentHistory(prevIntentSequence, prevIntentValBytes)
buf.newMeta.AddToIntentHistory(prevIntentSequence, prevIntentValRaw)
}
} else {
buf.newMeta.IntentHistory = nil
Expand Down Expand Up @@ -2676,24 +2696,6 @@ func MVCCResolveWriteIntent(
return ok, err
}

// unsafeNextVersion positions the iterator at the successor to latestKey. If this value
// exists and is a version of the same key, returns the UnsafeKey() and UnsafeValue() of that
// key-value pair along with `true`.
func unsafeNextVersion(iter MVCCIterator, latestKey MVCCKey) (MVCCKey, []byte, bool, error) {
// Compute the next possible mvcc value for this key.
nextKey := latestKey.Next()
iter.SeekGE(nextKey)

if ok, err := iter.Valid(); err != nil || !ok || !iter.UnsafeKey().Key.Equal(latestKey.Key) {
return MVCCKey{}, nil, false /* never ok */, err
}
unsafeKey := iter.UnsafeKey()
if !unsafeKey.IsValue() {
return MVCCKey{}, nil, false, errors.Errorf("expected an MVCC value key: %s", unsafeKey)
}
return unsafeKey, iter.UnsafeValue(), true, nil
}

// iterForKeyVersions provides a subset of the functionality of MVCCIterator.
// The expected use-case is when the iter is already positioned at the intent
// (if one exists) for a particular key, or some version, and positioning
Expand Down
12 changes: 12 additions & 0 deletions pkg/storage/mvcc_value.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ package storage

import (
"encoding/binary"
"fmt"

"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/storage/enginepb"
Expand Down Expand Up @@ -243,3 +244,14 @@ func decodeExtendedMVCCValue(buf []byte) (MVCCValue, error) {
v.Value.RawBytes = buf[headerSize:]
return v, nil
}

func init() {
// Inject the format dependency into the enginepb package.
enginepb.FormatBytesAsValue = func(v []byte) string {
val, err := DecodeMVCCValue(v)
if err != nil {
return fmt.Sprintf("err=%v", err)
}
return val.String()
}
}
8 changes: 6 additions & 2 deletions pkg/storage/pebble_mvcc_scanner.go
Original file line number Diff line number Diff line change
Expand Up @@ -739,7 +739,7 @@ func (p *pebbleMVCCScanner) getAndAdvance(ctx context.Context) bool {
// numbers) that we should read. If there exists a value in the intent
// history that has a sequence number equal to or less than the read
// sequence, read that value.
if value, found := p.getFromIntentHistory(); found {
if intentValueRaw, found := p.getFromIntentHistory(); found {
// If we're adding a value due to a previous intent, we want to populate
// the timestamp as of current metaTimestamp. Note that this may be
// controversial as this maybe be neither the write timestamp when this
Expand All @@ -751,7 +751,11 @@ func (p *pebbleMVCCScanner) getAndAdvance(ctx context.Context) bool {
// addAndAdvance to take an MVCCKey explicitly.
p.curUnsafeKey.Timestamp = metaTS
p.keyBuf = EncodeMVCCKeyToBuf(p.keyBuf[:0], p.curUnsafeKey)
return p.addAndAdvance(ctx, p.curUnsafeKey.Key, p.keyBuf, value)
p.curUnsafeValue, p.err = DecodeMVCCValue(intentValueRaw)
if p.err != nil {
return false
}
return p.addAndAdvance(ctx, p.curUnsafeKey.Key, p.keyBuf, p.curUnsafeValue.Value.RawBytes)
}
// 13. If no value in the intent history has a sequence number equal to
// or less than the read, we must ignore the intents laid down by the
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
# Perform some writes at various sequence numbers with local timestamps.

run ok
with t=A
txn_begin ts=50
txn_step seq=15
put k=k v=a localTs=15,0
txn_step seq=20
put k=k v=b localTs=20,0
txn_step seq=25
put k=k v=c localTs=25,0
----
>> at end:
txn: "A" meta={id=00000000 key=/Min pri=0.00000000 epo=0 ts=50.000000000,0 min=0,0 seq=25} lock=true stat=PENDING rts=50.000000000,0 wto=false gul=0,0
meta: "k"/0,0 -> txn={id=00000000 key=/Min pri=0.00000000 epo=0 ts=50.000000000,0 min=0,0 seq=25} ts=50.000000000,0 del=false klen=12 vlen=19 ih={{15 vheader{ localTs=15.000000000,0 } /BYTES/a}{20 vheader{ localTs=20.000000000,0 } /BYTES/b}} mergeTs=<nil> txnDidNotUpdateMeta=false
data: "k"/50.000000000,0 -> vheader{ localTs=25.000000000,0 } /BYTES/c

# Rollback to a previous sequence number. Should be able to read before and
# after resolving the intent.

run ok
with t=A
txn_ignore_seqs seqs=(25-25)
get k=k
resolve_intent k=k status=PENDING
get k=k
----
get: "k" -> /BYTES/b @50.000000000,0
get: "k" -> /BYTES/b @50.000000000,0
>> at end:
txn: "A" meta={id=00000000 key=/Min pri=0.00000000 epo=0 ts=50.000000000,0 min=0,0 seq=25} lock=true stat=PENDING rts=50.000000000,0 wto=false gul=0,0 isn=1
meta: "k"/0,0 -> txn={id=00000000 key=/Min pri=0.00000000 epo=0 ts=50.000000000,0 min=0,0 seq=20} ts=50.000000000,0 del=false klen=12 vlen=19 ih={{15 vheader{ localTs=15.000000000,0 } /BYTES/a}} mergeTs=<nil> txnDidNotUpdateMeta=false
data: "k"/50.000000000,0 -> vheader{ localTs=20.000000000,0 } /BYTES/b

# Rollback and commit at a previous sequence number. Committed value should have
# original local timestamp. This is important to avoid losing the local timestamp
# of values in a key's intent history.

run ok
with t=A
txn_ignore_seqs seqs=(20-20)
resolve_intent k=k status=COMMITTED
----
>> at end:
txn: "A" meta={id=00000000 key=/Min pri=0.00000000 epo=0 ts=50.000000000,0 min=0,0 seq=25} lock=true stat=PENDING rts=50.000000000,0 wto=false gul=0,0 isn=1
data: "k"/50.000000000,0 -> vheader{ localTs=15.000000000,0 } /BYTES/a
Loading

0 comments on commit a3e608d

Please sign in to comment.