Skip to content

Commit

Permalink
storage: add conflict handling for MVCC range tombstones
Browse files Browse the repository at this point in the history
This patch takes MVCC range tombstones into account for all MVCC write
operations, i.e. for conflict checks and conditional writes.

MVCC stats updates are not covered here, but will be addressed in a
subsequent PR. This also adds more exhaustive tests, especially for the
intent handling in `mvccPutInternal()`.

Release note: None
  • Loading branch information
erikgrinaker committed Jun 15, 2022
1 parent c030b8b commit b6c023e
Show file tree
Hide file tree
Showing 4 changed files with 498 additions and 45 deletions.
172 changes: 133 additions & 39 deletions pkg/storage/mvcc.go
Original file line number Diff line number Diff line change
Expand Up @@ -719,9 +719,9 @@ func newMVCCIterator(
reader Reader, timestamp hlc.Timestamp, rangeKeyMasking bool, opts IterOptions,
) MVCCIterator {
// If reading inline then just return a plain MVCCIterator without intents.
// We also disable range keys, since they're not allowed across inline values.
// However, we allow the caller to enable range keys, since they may be needed
// for conflict checks when writing inline values.
if timestamp.IsEmpty() {
opts.KeyTypes = IterKeyTypePointsOnly
return reader.NewMVCCIterator(MVCCKeyIterKind, opts)
}
// Enable range key masking if requested.
Expand Down Expand Up @@ -840,6 +840,20 @@ func mvccGet(
return value, intent, nil
}

// TODO(erikgrinaker): This is temporary until mvccGet always uses point
// synthesis (which requires read-path optimizations).
func mvccGetWithPointSynthesis(
ctx context.Context,
iter MVCCIterator,
key roachpb.Key,
timestamp hlc.Timestamp,
opts MVCCGetOptions,
) (value optionalValue, intent *roachpb.Intent, err error) {
pointIter := newPointSynthesizingIter(iter, true /* emitOnSeekGE */)
defer pointIter.release() // NB: not Close(), since we don't own iter
return mvccGet(ctx, pointIter, key, timestamp, opts)
}

// MVCCGetAsTxn constructs a temporary transaction from the given transaction
// metadata and calls MVCCGet as that transaction. This method is required
// only for reading intents of a transaction when only its metadata is known
Expand Down Expand Up @@ -874,57 +888,91 @@ func MVCCGetAsTxn(
// that is the usual contribution of the meta key). The value size returned
// will be zero, as there is no stored MVCCMetadata.
// ok is set to true.
// The passed in MVCCMetadata must not be nil.
// The passed in MVCCMetadata must not be nil. Any MVCC range tombstones will be
// treated like point tombstones.
//
// If the supplied iterator is nil, no seek operation is performed. This is
// used by the Blind{Put,ConditionalPut} operations to avoid seeking when the
// metadata is known not to exist. If iterAlreadyPositioned is true, the
// iterator has already been seeked to metaKey, so a wasteful seek can be
// avoided.
// metadata is known not to exist.
func mvccGetMetadata(
iter MVCCIterator, metaKey MVCCKey, iterAlreadyPositioned bool, meta *enginepb.MVCCMetadata,
iter MVCCIterator, metaKey MVCCKey, meta *enginepb.MVCCMetadata,
) (ok bool, keyBytes, valBytes int64, err error) {
if iter == nil {
return false, 0, 0, nil
}
if !iterAlreadyPositioned {
iter.SeekGE(metaKey)
}
iter.SeekGE(metaKey)
if ok, err = iter.Valid(); !ok {
return false, 0, 0, err
}

unsafeKey := iter.UnsafeKey()
if !unsafeKey.Key.Equal(metaKey.Key) {
return false, 0, 0, nil
}

if !unsafeKey.IsValue() {
hasPoint, hasRange := iter.HasPointAndRange()

// Check for existing intent metadata. Intents will be emitted colocated with
// a covering range key when seeking to it, so we don't need to handle range
// keys here.
if hasPoint && !unsafeKey.IsValue() {
if err := iter.ValueProto(meta); err != nil {
return false, 0, 0, err
}
return true, int64(unsafeKey.EncodedSize()),
int64(len(iter.UnsafeValue())), nil
return true, int64(unsafeKey.EncodedSize()), int64(len(iter.UnsafeValue())), nil
}

// Synthesize point key metadata. For values, the size of keys is always
// accounted for as MVCCVersionTimestampSize. The size of the metadata key is
// accounted for separately.
meta.Reset()
meta.KeyBytes = MVCCVersionTimestampSize

// If we land on a (bare) range key, step to look for a colocated point key.
if hasRange && !hasPoint {
rkTimestamp := iter.RangeKeys()[0].RangeKey.Timestamp

iter.Next()
if ok, err = iter.Valid(); err != nil {
return false, 0, 0, err
} else if ok {
// NB: For !ok, hasPoint is already false.
hasPoint, hasRange = iter.HasPointAndRange()
unsafeKey = iter.UnsafeKey()
}
// If only a bare range tombstone was found at the seek key, synthesize
// point tombstone metadata for it.
if !hasPoint || !unsafeKey.Key.Equal(metaKey.Key) {
meta.Deleted = true
meta.Timestamp = rkTimestamp.ToLegacyTimestamp()
return true, int64(encodedMVCCKeyPrefixLength(metaKey.Key)), 0, nil
}
}

// We're now on a point key. Check if it's covered by an MVCC range tombstone,
// and synthesize point tombstone metadata for it in that case.
if hasRange {
if rkTS := iter.RangeKeys()[0].RangeKey.Timestamp; unsafeKey.Timestamp.LessEq(rkTS) {
meta.Deleted = true
meta.Timestamp = rkTS.ToLegacyTimestamp()
return true, int64(encodedMVCCKeyPrefixLength(metaKey.Key)), 0, nil
}
}

// Synthesize metadata for a regular point key.
var unsafeVal MVCCValue
unsafeValRaw := iter.UnsafeValue()
unsafeVal, ok, err := tryDecodeSimpleMVCCValue(unsafeValRaw)
if !ok && err == nil {
if unsafeVal, ok, err = tryDecodeSimpleMVCCValue(unsafeValRaw); !ok && err == nil {
unsafeVal, err = decodeExtendedMVCCValue(unsafeValRaw)
}
if err != nil {
return false, 0, 0, err
}

meta.Reset()
// For values, the size of keys is always accounted for as
// MVCCVersionTimestampSize. The size of the metadata key is
// accounted for separately.
meta.KeyBytes = MVCCVersionTimestampSize
meta.ValBytes = int64(len(unsafeValRaw))
meta.Deleted = unsafeVal.IsTombstone()
meta.Timestamp = unsafeKey.Timestamp.ToLegacyTimestamp()
return true, int64(unsafeKey.EncodedSize()) - meta.KeyBytes, 0, nil

return true, int64(encodedMVCCKeyPrefixLength(metaKey.Key)), 0, nil
}

// putBuffer holds pointer data needed by mvccPutInternal. Bundling
Expand Down Expand Up @@ -1040,7 +1088,10 @@ func MVCCPut(
var iter MVCCIterator
blind := ms == nil && timestamp.IsEmpty()
if !blind {
iter = newMVCCIterator(rw, timestamp, false /* rangeKeyMasking */, IterOptions{Prefix: true})
iter = newMVCCIterator(rw, timestamp, false /* rangeKeyMasking */, IterOptions{
KeyTypes: IterKeyTypePointsAndRanges,
Prefix: true,
})
defer iter.Close()
}
return mvccPutUsingIter(ctx, rw, iter, ms, key, timestamp, localTimestamp, value, txn, nil)
Expand Down Expand Up @@ -1084,7 +1135,10 @@ func MVCCDelete(
localTimestamp hlc.ClockTimestamp,
txn *roachpb.Transaction,
) error {
iter := newMVCCIterator(rw, timestamp, false /* rangeKeyMasking */, IterOptions{Prefix: true})
iter := newMVCCIterator(rw, timestamp, false /* rangeKeyMasking */, IterOptions{
KeyTypes: IterKeyTypePointsAndRanges,
Prefix: true,
})
defer iter.Close()

return mvccPutUsingIter(ctx, rw, iter, ms, key, timestamp, localTimestamp, noValue, txn, nil)
Expand Down Expand Up @@ -1140,7 +1194,9 @@ func maybeGetValue(
var exVal optionalValue
if exists {
var err error
exVal, _, err = mvccGet(ctx, iter, key, readTimestamp, MVCCGetOptions{Tombstones: true})
exVal, _, err = mvccGetWithPointSynthesis(ctx, iter, key, readTimestamp, MVCCGetOptions{
Tombstones: true,
})
if err != nil {
return roachpb.Value{}, err
}
Expand Down Expand Up @@ -1208,7 +1264,10 @@ func replayTransactionalWrite(
// This is a special case. This is when the intent hasn't made it
// to the intent history yet. We must now assert the value written
// in the intent to the value we're trying to write.
writtenValue, _, err = mvccGet(ctx, iter, key, timestamp, MVCCGetOptions{Txn: txn, Tombstones: true})
writtenValue, _, err = mvccGetWithPointSynthesis(ctx, iter, key, timestamp, MVCCGetOptions{
Txn: txn,
Tombstones: true,
})
if err != nil {
return err
}
Expand Down Expand Up @@ -1260,7 +1319,10 @@ func replayTransactionalWrite(
// last committed value on the key. Since we want the last committed
// value on the key, we must make an inconsistent read so we ignore
// our previous intents here.
exVal, _, err = mvccGet(ctx, iter, key, timestamp, MVCCGetOptions{Inconsistent: true, Tombstones: true})
exVal, _, err = mvccGetWithPointSynthesis(ctx, iter, key, timestamp, MVCCGetOptions{
Inconsistent: true,
Tombstones: true,
})
if err != nil {
return err
}
Expand Down Expand Up @@ -1339,8 +1401,7 @@ func mvccPutInternal(
}

metaKey := MakeMVCCMetadataKey(key)
ok, origMetaKeySize, origMetaValSize, err :=
mvccGetMetadata(iter, metaKey, false /* iterAlreadyPositioned */, &buf.meta)
ok, origMetaKeySize, origMetaValSize, err := mvccGetMetadata(iter, metaKey, &buf.meta)
if err != nil {
return err
}
Expand Down Expand Up @@ -1461,9 +1522,13 @@ func mvccPutInternal(
if !enginepb.TxnSeqIsIgnored(meta.Txn.Sequence, txn.IgnoredSeqNums) {
// Seqnum of last write is not ignored. Retrieve the value.
iter.SeekGE(oldVersionKey)
var hasPoint bool
if valid, err := iter.Valid(); err != nil {
return err
} else if !valid || !iter.UnsafeKey().Equal(oldVersionKey) {
} else if valid {
hasPoint, _ = iter.HasPointAndRange()
}
if !hasPoint || !iter.UnsafeKey().Equal(oldVersionKey) {
return errors.Errorf("existing intent value missing: %s", oldVersionKey)
}

Expand Down Expand Up @@ -1495,7 +1560,10 @@ func mvccPutInternal(
//
// Since we want the last committed value on the key, we must make
// an inconsistent read so we ignore our previous intents here.
exVal, _, err = mvccGet(ctx, iter, key, readTimestamp, MVCCGetOptions{Inconsistent: true, Tombstones: true})
exVal, _, err = mvccGetWithPointSynthesis(ctx, iter, key, readTimestamp, MVCCGetOptions{
Inconsistent: true,
Tombstones: true,
})
if err != nil {
return err
}
Expand Down Expand Up @@ -1524,9 +1592,21 @@ func mvccPutInternal(
// MVCCResolveWriteIntent.
prevKey := oldVersionKey.Next()
iter.SeekGE(prevKey)
if valid, err := iter.Valid(); err != nil {
valid, err := iter.Valid()
if err != nil {
return err
} else if valid && iter.UnsafeKey().Key.Equal(prevKey.Key) {
} else if valid {
// TODO(erikgrinaker): We don't handle MVCC range tombstones in MVCC
// stats yet, so if we land on a bare range key just step onto the
// next point key (if any).
if hasPoint, hasRange := iter.HasPointAndRange(); hasRange && !hasPoint {
iter.Next()
if valid, err = iter.Valid(); err != nil {
return err
}
}
}
if valid && iter.UnsafeKey().Key.Equal(prevKey.Key) {
prevUnsafeKey := iter.UnsafeKey()
if !prevUnsafeKey.IsValue() {
return errors.Errorf("expected an MVCC value key: %s", prevUnsafeKey)
Expand Down Expand Up @@ -1742,7 +1822,10 @@ func MVCCIncrement(
txn *roachpb.Transaction,
inc int64,
) (int64, error) {
iter := newMVCCIterator(rw, timestamp, false /* rangeKeyMasking */, IterOptions{Prefix: true})
iter := newMVCCIterator(rw, timestamp, false /* rangeKeyMasking */, IterOptions{
KeyTypes: IterKeyTypePointsAndRanges,
Prefix: true,
})
defer iter.Close()

var int64Val int64
Expand Down Expand Up @@ -1816,7 +1899,10 @@ func MVCCConditionalPut(
allowIfDoesNotExist CPutMissingBehavior,
txn *roachpb.Transaction,
) error {
iter := newMVCCIterator(rw, timestamp, false /* rangeKeyMasking */, IterOptions{Prefix: true})
iter := newMVCCIterator(rw, timestamp, false /* rangeKeyMasking */, IterOptions{
KeyTypes: IterKeyTypePointsAndRanges,
Prefix: true,
})
defer iter.Close()

return mvccConditionalPutUsingIter(
Expand Down Expand Up @@ -1898,7 +1984,10 @@ func MVCCInitPut(
failOnTombstones bool,
txn *roachpb.Transaction,
) error {
iter := newMVCCIterator(rw, timestamp, false /* rangeKeyMasking */, IterOptions{Prefix: true})
iter := newMVCCIterator(rw, timestamp, false /* rangeKeyMasking */, IterOptions{
KeyTypes: IterKeyTypePointsAndRanges,
Prefix: true,
})
defer iter.Close()
return mvccInitPutUsingIter(ctx, rw, iter, ms, key, timestamp, localTimestamp, value, failOnTombstones, txn)
}
Expand Down Expand Up @@ -1977,6 +2066,9 @@ func (m mvccKeyFormatter) Format(f fmt.State, c rune) {
// concatenates undifferentiated byte slice values, and efficiently
// combines time series observations if the roachpb.Value tag value
// indicates the value byte slice is of type TIMESERIES.
//
// Merges are not really MVCC operations: they operate on inline values with no
// version, and do not check for conflicts with other MVCC versions.
func MVCCMerge(
_ context.Context,
rw ReadWriter,
Expand Down Expand Up @@ -2277,7 +2369,10 @@ func MVCCDeleteRange(

buf := newPutBuffer()
defer buf.release()
iter := newMVCCIterator(rw, timestamp, false /* rangeKeyMasking */, IterOptions{Prefix: true})
iter := newMVCCIterator(rw, timestamp, false /* rangeKeyMasking */, IterOptions{
KeyTypes: IterKeyTypePointsAndRanges,
Prefix: true,
})
defer iter.Close()

var keys []roachpb.Key
Expand Down Expand Up @@ -3636,8 +3731,7 @@ func MVCCGarbageCollect(
meta := &enginepb.MVCCMetadata{}
for _, gcKey := range keys {
encKey := MakeMVCCMetadataKey(gcKey.Key)
ok, metaKeySize, metaValSize, err :=
mvccGetMetadata(iter, encKey, false /* iterAlreadyPositioned */, meta)
ok, metaKeySize, metaValSize, err := mvccGetMetadata(iter, encKey, meta)
if err != nil {
return err
}
Expand Down
25 changes: 25 additions & 0 deletions pkg/storage/mvcc_history_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ import (
// txn_step t=<name> [n=<int>]
// txn_advance t=<name> ts=<int>[,<int>]
// txn_status t=<name> status=<txnstatus>
// txn_ignore_seqs t=<name> seqs=[<int>-<int>[,<int>-<int>...]]
//
// resolve_intent t=<name> k=<key> [status=<txnstatus>] [clockWhilePending=<int>[,<int>]]
// check_intent k=<key> [none]
Expand All @@ -65,6 +66,8 @@ import (
// del_range [t=<name>] [ts=<int>[,<int>]] [localTs=<int>[,<int>]] [resolve [status=<txnstatus>]] k=<key> [end=<key>] [max=<max>] [returnKeys]
// del_range_ts [ts=<int>[,<int>]] [localTs=<int>[,<int>]] k=<key> end=<key>
// increment [t=<name>] [ts=<int>[,<int>]] [localTs=<int>[,<int>]] [resolve [status=<txnstatus>]] k=<key> [inc=<val>]
// initput [t=<name>] [ts=<int>[,<int>]] [resolve [status=<txnstatus>]] k=<key> v=<string> [raw] [failOnTombstones]
// merge [t=<name>] [ts=<int>[,<int>]] [resolve [status=<txnstatus>]] k=<key> v=<string> [raw]
// put [t=<name>] [ts=<int>[,<int>]] [localTs=<int>[,<int>]] [resolve [status=<txnstatus>]] k=<key> v=<string> [raw]
// put_rangekey ts=<int>[,<int>] [localTS=<int>[,<int>]] k=<key> end=<key>
// get [t=<name>] [ts=<int>[,<int>]] [resolve [status=<txnstatus>]] k=<key> [inconsistent] [tombstones] [failOnMoreRecent] [localUncertaintyLimit=<int>[,<int>]] [globalUncertaintyLimit=<int>[,<int>]]
Expand Down Expand Up @@ -488,6 +491,7 @@ var commands = map[string]cmd{
"del_range_ts": {typDataUpdate, cmdDeleteRangeTombstone},
"get": {typReadOnly, cmdGet},
"increment": {typDataUpdate, cmdIncrement},
"initput": {typDataUpdate, cmdInitPut},
"merge": {typDataUpdate, cmdMerge},
"put": {typDataUpdate, cmdPut},
"put_rangekey": {typDataUpdate, cmdPutRangeKey},
Expand Down Expand Up @@ -734,6 +738,27 @@ func cmdCPut(e *evalCtx) error {
})
}

func cmdInitPut(e *evalCtx) error {
txn := e.getTxn(optional)
ts := e.getTs(txn)
localTs := hlc.ClockTimestamp(e.getTsWithName("localTs"))

key := e.getKey()
val := e.getVal()
failOnTombstones := e.hasArg("failOnTombstones")
resolve, resolveStatus := e.getResolve()

return e.withWriter("initput", func(rw ReadWriter) error {
if err := MVCCInitPut(e.ctx, rw, e.ms, key, ts, localTs, val, failOnTombstones, txn); err != nil {
return err
}
if resolve {
return e.resolveIntent(rw, key, txn, resolveStatus, hlc.ClockTimestamp{})
}
return nil
})
}

func cmdDelete(e *evalCtx) error {
txn := e.getTxn(optional)
key := e.getKey()
Expand Down
Loading

0 comments on commit b6c023e

Please sign in to comment.