Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

storage: add MVCCRangeKeyStack for range keys #84975

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 28 additions & 9 deletions docs/tech-notes/mvcc-range-tombstones.md
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,8 @@ type MVCCRangeKey struct {
}
```

A range key stores an encoded `MVCCValue`, similarly to `MVCCKey`. They are
often paired as an `MVCCRangeKeyValue`:
A range key stores an encoded `MVCCValue`, similarly to `MVCCKey`. They can be
paired as an `MVCCRangeKeyValue`:

```go
type MVCCRangeKeyValue struct {
Expand Down Expand Up @@ -122,12 +122,31 @@ exist between the bounds `[a-d)`, and `c` is within those bounds. The same is
true for `a@5`, even though it is above both MVCC range tombstones. It is up to
the iterator caller to interpret the range keys as appropriate relative to the
point key. It follows that all range keys overlapping a key will be pulled into
memory at once, but we assume that overlapping range keys will be few. More on
MVCC iteration later.
memory at once, but we assume that overlapping range keys will be few.

In the KV API, however, this distinction doesn't really matter: `Get(c)` at
timestamp >= 5 would return nothing, while `Get(b)` would return `b5`. Again,
more on this later.
This is represented as a specialized compact data structure,
`MVCCRangeKeyStack`, where all range keys have the same bounds due to
fragmentation (described below):

```go
type MVCCRangeKeyStack struct {
Bounds roachpb.Span
Versions MVCCRangeKeyVersions
}

type MVCCRangeKeyVersions []MVCCRangeKeyVersion

type MVCCRangeKeyVersion struct {
Timestamp hlc.Timestamp
Value []byte // encoded MVCCValue
}
```

In the KV API, however, the relationship between point keys and range keys
doesn't really matter: `Get(c)` at timestamp >= 5 would simply return nothing,
while `Get(b)` would return `b5`. More on this later.

### Fragmentation

Range keys do not have a stable, discrete identity, and should be considered a
continuum: they may be partially removed or replaced, merged or fragmented by
Expand Down Expand Up @@ -161,7 +180,7 @@ Pebble: `[a-b)@1`, `[b-c)@2`, `[b-c)@1`, and `[c-d)@2`. Similarly, clearing
`[b-d)@2` would merge the remaining keys back into `[a-c)@1`.

This implies that all range keys exposed for a specific key position all have
the same key bounds.
the same key bounds, as shown in `MVCCRangeKeyStack`.

Fragmentation is beneficial because it makes all range key properties local,
which avoids incurring unnecessary access costs across SSTs and CRDB ranges when
Expand Down Expand Up @@ -268,7 +287,7 @@ The properties of point and range keys are accessed via:
* `RangeBounds()`: start and end bounds of range keys overlapping the current
position, if any.
* `RangeKeys()`: all range keys at the current key position (i.e. at all
timestamps), as `[]MVCCRangeKeyValue`.
timestamps), as `MVCCRangeKeyStack`.

During iteration with `IterKeyTypePointsAndRanges`, range keys are emitted at
their start key and at every overlapping point key. Consider a modified
Expand Down
5 changes: 3 additions & 2 deletions pkg/ccl/backupccl/file_sst_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -363,8 +363,9 @@ func (s *fileSSTSink) copyRangeKeys(dataSST []byte) error {
} else if !ok {
break
}
for _, rkv := range iter.RangeKeys() {
if err := s.sst.PutRawMVCCRangeKey(rkv.RangeKey, rkv.Value); err != nil {
rangeKeys := iter.RangeKeys()
for _, v := range rangeKeys.Versions {
if err := s.sst.PutRawMVCCRangeKey(rangeKeys.AsRangeKey(v), v.Value); err != nil {
return err
}
}
Expand Down
29 changes: 16 additions & 13 deletions pkg/kv/kvserver/batcheval/cmd_add_sstable.go
Original file line number Diff line number Diff line change
Expand Up @@ -417,15 +417,16 @@ func EvalAddSSTable(
} else if !ok {
break
}
for _, rkv := range rangeIter.RangeKeys() {
if err = readWriter.PutRawMVCCRangeKey(rkv.RangeKey, rkv.Value); err != nil {
rangeKeys := rangeIter.RangeKeys()
for _, v := range rangeKeys.Versions {
if err = readWriter.PutRawMVCCRangeKey(rangeKeys.AsRangeKey(v), v.Value); err != nil {
return result.Result{}, err
}
if sstToReqTS.IsSet() {
readWriter.LogLogicalOp(storage.MVCCDeleteRangeOpType, storage.MVCCLogicalOpDetails{
Key: rkv.RangeKey.StartKey,
EndKey: rkv.RangeKey.EndKey,
Timestamp: rkv.RangeKey.Timestamp,
Key: rangeKeys.Bounds.Key,
EndKey: rangeKeys.Bounds.EndKey,
Timestamp: v.Timestamp,
})
}
}
Expand Down Expand Up @@ -517,26 +518,28 @@ func assertSSTContents(sst []byte, sstTimestamp hlc.Timestamp, stats *enginepb.M
break
}

for _, rkv := range iter.RangeKeys() {
if err := rkv.RangeKey.Validate(); err != nil {
rangeKeys := iter.RangeKeys()
for _, v := range rangeKeys.Versions {
rangeKey := rangeKeys.AsRangeKey(v)
if err := rangeKey.Validate(); err != nil {
return errors.NewAssertionErrorWithWrappedErrf(err, "SST contains invalid range key")
}
if sstTimestamp.IsSet() && rkv.RangeKey.Timestamp != sstTimestamp {
if sstTimestamp.IsSet() && v.Timestamp != sstTimestamp {
return errors.AssertionFailedf(
"SST has unexpected timestamp %s (expected %s) for range key %s",
rkv.RangeKey.Timestamp, sstTimestamp, rkv.RangeKey)
v.Timestamp, sstTimestamp, rangeKeys.Bounds)
}
value, err := storage.DecodeMVCCValue(rkv.Value)
value, err := storage.DecodeMVCCValue(v.Value)
if err != nil {
return errors.NewAssertionErrorWithWrappedErrf(err,
"SST contains invalid range key value for range key %s", rkv.RangeKey)
"SST contains invalid range key value for range key %s", rangeKey)
}
if !value.IsTombstone() {
return errors.AssertionFailedf("SST contains non-tombstone range key %s", rkv.RangeKey)
return errors.AssertionFailedf("SST contains non-tombstone range key %s", rangeKey)
}
if value.MVCCValueHeader != (enginepb.MVCCValueHeader{}) {
return errors.AssertionFailedf("SST contains non-empty MVCC value header for range key %s",
rkv.RangeKey)
rangeKey)
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/batcheval/cmd_add_sstable_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ func TestEvalAddSSTable(t *testing.T) {
toReqTS: 1,
sst: kvs{pointKV("a", 1, "a1"), rangeKV("c", "d", 2, "")},
expectErr: []string{
`unexpected timestamp 0.000000002,0 (expected 0.000000001,0) for range key {c-d}/0.000000002,0`,
`unexpected timestamp 0.000000002,0 (expected 0.000000001,0) for range key {c-d}`,
`key has suffix "\x00\x00\x00\x00\x00\x00\x00\x02\t", expected "\x00\x00\x00\x00\x00\x00\x00\x01\t"`,
},
},
Expand Down
8 changes: 4 additions & 4 deletions pkg/kv/kvserver/batcheval/cmd_clear_range.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,9 +218,9 @@ func computeStatsDelta(
if ok, err := iter.Valid(); err != nil {
return err
} else if ok && iter.RangeBounds().Key.Compare(bound) < 0 {
for i, rkv := range iter.RangeKeys() {
keyBytes := int64(storage.EncodedMVCCTimestampSuffixLength(rkv.RangeKey.Timestamp))
valBytes := int64(len(rkv.Value))
for i, v := range iter.RangeKeys().Versions {
keyBytes := int64(storage.EncodedMVCCTimestampSuffixLength(v.Timestamp))
valBytes := int64(len(v.Value))
if i == 0 {
delta.RangeKeyCount--
keyBytes += 2 * int64(storage.EncodedMVCCKeyPrefixLength(bound))
Expand All @@ -229,7 +229,7 @@ func computeStatsDelta(
delta.RangeValCount--
delta.RangeValBytes -= valBytes
delta.GCBytesAge -= (keyBytes + valBytes) *
(delta.LastUpdateNanos/1e9 - rkv.RangeKey.Timestamp.WallTime/1e9)
(delta.LastUpdateNanos/1e9 - v.Timestamp.WallTime/1e9)
}
}
return nil
Expand Down
15 changes: 8 additions & 7 deletions pkg/kv/kvserver/batcheval/cmd_delete_range_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -306,8 +306,8 @@ func checkPredicateDeleteRange(t *testing.T, engine storage.Reader, rKeyInfo sto
// PredicateDeleteRange should not have written any delete tombstones;
// therefore, any range key tombstones in the span should have been
// written before the request was issued.
for _, rKey := range iter.RangeKeys() {
require.Equal(t, true, rKey.RangeKey.Timestamp.Less(rKeyInfo.Timestamp))
for _, v := range iter.RangeKeys().Versions {
require.True(t, v.Timestamp.Less(rKeyInfo.Timestamp))
}
continue
}
Expand Down Expand Up @@ -337,13 +337,14 @@ func checkDeleteRangeTombstone(
break
}
require.True(t, ok)
for _, rkv := range iter.RangeKeys() {
if rkv.RangeKey.Timestamp.Equal(rangeKey.Timestamp) {
rangeKeys := iter.RangeKeys()
for _, v := range rangeKeys.Versions {
if v.Timestamp.Equal(rangeKey.Timestamp) {
if len(seen.RangeKey.StartKey) == 0 {
seen = rkv.Clone()
seen = rangeKeys.AsRangeKeyValue(v).Clone()
} else {
seen.RangeKey.EndKey = rkv.RangeKey.EndKey.Clone()
require.Equal(t, seen.Value, rkv.Value)
seen.RangeKey.EndKey = rangeKeys.Bounds.EndKey.Clone()
require.Equal(t, seen.Value, v.Value)
}
break
}
Expand Down
8 changes: 4 additions & 4 deletions pkg/kv/kvserver/batcheval/cmd_end_transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -1315,17 +1315,17 @@ func computeSplitRangeKeyStatsDelta(
// contribution of the range key fragmentation. The naïve calculation would be
// rhs.EncodedSize() - (keyLen(rhs.EndKey) - keyLen(lhs.EndKey))
// which simplifies to 2 * keyLen(rhs.StartKey) + tsLen(rhs.Timestamp).
for i, rkv := range iter.RangeKeys() {
keyBytes := int64(storage.EncodedMVCCTimestampSuffixLength(rkv.RangeKey.Timestamp))
valBytes := int64(len(rkv.Value))
for i, v := range iter.RangeKeys().Versions {
keyBytes := int64(storage.EncodedMVCCTimestampSuffixLength(v.Timestamp))
valBytes := int64(len(v.Value))
if i == 0 {
delta.RangeKeyCount++
keyBytes += 2 * int64(storage.EncodedMVCCKeyPrefixLength(splitKey))
}
delta.RangeKeyBytes += keyBytes
delta.RangeValCount++
delta.RangeValBytes += valBytes
delta.GCBytesAge += (keyBytes + valBytes) * (nowNanos/1e9 - rkv.RangeKey.Timestamp.WallTime/1e9)
delta.GCBytesAge += (keyBytes + valBytes) * (nowNanos/1e9 - v.Timestamp.WallTime/1e9)
}

return delta, nil
Expand Down
6 changes: 1 addition & 5 deletions pkg/kv/kvserver/batcheval/cmd_refresh_range.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,12 +93,8 @@ func refreshRange(
key := iter.UnsafeKey().Clone()

if _, hasRange := iter.HasPointAndRange(); hasRange {
rangeKVs := iter.RangeKeys()
if len(rangeKVs) == 0 { // defensive
return errors.Errorf("expected range key at %s not found", key)
}
return roachpb.NewRefreshFailedError(roachpb.RefreshFailedError_REASON_COMMITTED_VALUE,
key.Key, rangeKVs[0].RangeKey.Timestamp)
key.Key, iter.RangeKeys().Versions[0].Timestamp)
}

if !key.IsValue() {
Expand Down
3 changes: 2 additions & 1 deletion pkg/kv/kvserver/gc/gc.go
Original file line number Diff line number Diff line change
Expand Up @@ -855,7 +855,8 @@ func processReplicatedRangeTombstones(
if !ok {
break
}
rangeKeys := iter.RangeKeys()
// TODO(erikgrinaker): Rewrite to use MVCCRangeKeyStack.
rangeKeys := iter.RangeKeys().AsRangeKeyValues()

if idx := sort.Search(len(rangeKeys), func(i int) bool {
return rangeKeys[i].RangeKey.Timestamp.LessEq(gcThreshold)
Expand Down
13 changes: 5 additions & 8 deletions pkg/kv/kvserver/gc/gc_iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
package gc

import (
"sort"
"strings"

"github.com/cockroachdb/cockroach/pkg/kv/kvserver/rditer"
Expand Down Expand Up @@ -193,15 +192,13 @@ func (it *gcIterator) currentRangeTS() hlc.Timestamp {
if rangeTombstoneStartKey.Equal(it.cachedRangeTombstoneKey) {
return it.cachedRangeTombstoneTS
}
it.cachedRangeTombstoneKey = append(it.cachedRangeTombstoneKey[:0], rangeTombstoneStartKey...)

it.cachedRangeTombstoneTS = hlc.Timestamp{}
rangeKeys := it.it.RangeKeys()
if idx := sort.Search(len(rangeKeys), func(i int) bool {
return rangeKeys[i].RangeKey.Timestamp.LessEq(it.threshold)
}); idx < len(rangeKeys) {
it.cachedRangeTombstoneTS = rangeKeys[idx].RangeKey.Timestamp
if v, ok := it.it.RangeKeys().FirstBelow(it.threshold); ok {
it.cachedRangeTombstoneTS = v.Timestamp
} else {
it.cachedRangeTombstoneTS = hlc.Timestamp{}
}
it.cachedRangeTombstoneKey = append(it.cachedRangeTombstoneKey[:0], rangeTombstoneStartKey...)
return it.cachedRangeTombstoneTS
}

Expand Down
17 changes: 6 additions & 11 deletions pkg/kv/kvserver/gc/gc_random_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -466,12 +466,12 @@ func getExpectationsGenerator(
// so we will add them to history of current key for analysis.
// Bare range tombstones are ignored.
if r {
for _, r := range it.RangeKeys() {
for _, r := range it.RangeKeys().AsRangeKeys() {
history = append(history, historyItem{
MVCCKeyValue: storage.MVCCKeyValue{
Key: storage.MVCCKey{
Key: r.RangeKey.StartKey,
Timestamp: r.RangeKey.Timestamp,
Key: r.StartKey,
Timestamp: r.Timestamp,
},
Value: nil,
},
Expand Down Expand Up @@ -603,7 +603,7 @@ func getKeyHistory(t *testing.T, r storage.Reader, key roachpb.Key) string {
break
}
if r && len(result) == 0 {
for _, rk := range it.RangeKeys() {
for _, rk := range it.RangeKeys().AsRangeKeyValues() {
result = append(result, fmt.Sprintf("R:%s", rk.RangeKey.String()))
}
}
Expand All @@ -622,13 +622,8 @@ func rangeFragmentsFromIt(t *testing.T, it storage.MVCCIterator) [][]storage.MVC
if !ok {
break
}
_, r := it.HasPointAndRange()
if r {
fragments := make([]storage.MVCCRangeKeyValue, len(it.RangeKeys()))
for i, r := range it.RangeKeys() {
fragments[i] = r.Clone()
}
result = append(result, fragments)
if _, hasRange := it.HasPointAndRange(); hasRange {
result = append(result, it.RangeKeys().Clone().AsRangeKeyValues())
}
it.NextKey()
}
Expand Down
12 changes: 6 additions & 6 deletions pkg/kv/kvserver/gc/gc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1084,20 +1084,20 @@ func engineData(t *testing.T, r storage.Reader, desc roachpb.RangeDescriptor) []
_, r := rangeIt.HasPointAndRange()
if r {
span := rangeIt.RangeBounds()
newKeys := rangeIt.RangeKeys()
newKeys := rangeIt.RangeKeys().AsRangeKeys()
if lastEnd.Equal(span.Key) {
// Try merging keys by timestamp.
var newPartial []storage.MVCCRangeKey
i, j := 0, 0
for i < len(newKeys) && j < len(partialRangeKeys) {
switch newKeys[i].RangeKey.Timestamp.Compare(partialRangeKeys[j].Timestamp) {
switch newKeys[i].Timestamp.Compare(partialRangeKeys[j].Timestamp) {
case 1:
newPartial = append(newPartial, newKeys[i].RangeKey.Clone())
newPartial = append(newPartial, newKeys[i].Clone())
i++
case 0:
newPartial = append(newPartial, storage.MVCCRangeKey{
StartKey: partialRangeKeys[j].StartKey,
EndKey: newKeys[i].RangeKey.EndKey.Clone(),
EndKey: newKeys[i].EndKey.Clone(),
Timestamp: partialRangeKeys[j].Timestamp,
})
i++
Expand All @@ -1108,7 +1108,7 @@ func engineData(t *testing.T, r storage.Reader, desc roachpb.RangeDescriptor) []
}
}
for ; i < len(newKeys); i++ {
newPartial = append(newPartial, newKeys[i].RangeKey.Clone())
newPartial = append(newPartial, newKeys[i].Clone())
}
for ; j < len(partialRangeKeys); j++ {
newPartial = append(newPartial, partialRangeKeys[j].Clone())
Expand All @@ -1118,7 +1118,7 @@ func engineData(t *testing.T, r storage.Reader, desc roachpb.RangeDescriptor) []
result = append(result, makeRangeCells(partialRangeKeys)...)
partialRangeKeys = make([]storage.MVCCRangeKey, len(newKeys))
for i, rk := range newKeys {
partialRangeKeys[i] = rk.RangeKey.Clone()
partialRangeKeys[i] = rk.Clone()
}
}
lastEnd = span.EndKey.Clone()
Expand Down
9 changes: 4 additions & 5 deletions pkg/kv/kvserver/rangefeed/catchup_scan.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,15 +154,15 @@ func (i *CatchUpIterator) CatchUpScan(outputFn outputEventFn, withDiff bool) err
rangeKeysStart = append(rangeKeysStart[:0], rangeBounds.Key...)

// Emit events for these MVCC range tombstones, in chronological order.
rangeKeys := i.RangeKeys()
for i := len(rangeKeys) - 1; i >= 0; i-- {
versions := i.RangeKeys().Versions
for i := len(versions) - 1; i >= 0; i-- {
var span roachpb.Span
a, span.Key = a.Copy(rangeBounds.Key, 0)
a, span.EndKey = a.Copy(rangeBounds.EndKey, 0)
err := outputFn(&roachpb.RangeFeedEvent{
DeleteRange: &roachpb.RangeFeedDeleteRange{
Span: span,
Timestamp: rangeKeys[i].RangeKey.Timestamp,
Timestamp: versions[i].Timestamp,
},
})
if err != nil {
Expand Down Expand Up @@ -271,8 +271,7 @@ func (i *CatchUpIterator) CatchUpScan(outputFn outputEventFn, withDiff bool) err
// to rangeKeysStart above, because NextIgnoringTime() could reveal
// additional MVCC range tombstones below StartTime that cover this
// point. We need to find a more performant way to handle this.
if !hasRange || !storage.HasRangeKeyBetween(
i.RangeKeys(), reorderBuf[l].Val.Value.Timestamp, ts) {
if !hasRange || !i.RangeKeys().HasBetween(ts, reorderBuf[l].Val.Value.Timestamp) {
// TODO(sumeer): find out if it is deliberate that we are not populating
// PrevValue.Timestamp.
reorderBuf[l].Val.PrevValue.RawBytes = val
Expand Down
6 changes: 3 additions & 3 deletions pkg/kv/kvserver/rangefeed/task_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,9 +204,9 @@ func (s *testIterator) RangeBounds() roachpb.Span {
return roachpb.Span{}
}

// RangeTombstones implements SimpleMVCCIterator.
func (s *testIterator) RangeKeys() []storage.MVCCRangeKeyValue {
return []storage.MVCCRangeKeyValue{}
// RangeKeys implements SimpleMVCCIterator.
func (s *testIterator) RangeKeys() storage.MVCCRangeKeyStack {
return storage.MVCCRangeKeyStack{}
}

func TestInitResolvedTSScan(t *testing.T) {
Expand Down
Loading