Skip to content

Commit

Permalink
*: migrate higher-level code to MVCCRangeKeyStack
Browse files Browse the repository at this point in the history
Release note: None
  • Loading branch information
erikgrinaker committed Jul 29, 2022
1 parent 5fdec92 commit 4800f7c
Show file tree
Hide file tree
Showing 8 changed files with 48 additions and 47 deletions.
5 changes: 3 additions & 2 deletions pkg/ccl/backupccl/file_sst_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -363,8 +363,9 @@ func (s *fileSSTSink) copyRangeKeys(dataSST []byte) error {
} else if !ok {
break
}
for _, rkv := range iter.RangeKeys().AsRangeKeyValues() {
if err := s.sst.PutRawMVCCRangeKey(rkv.RangeKey, rkv.Value); err != nil {
rangeKeys := iter.RangeKeys()
for _, v := range rangeKeys.Versions {
if err := s.sst.PutRawMVCCRangeKey(rangeKeys.AsRangeKey(v), v.Value); err != nil {
return err
}
}
Expand Down
29 changes: 16 additions & 13 deletions pkg/kv/kvserver/batcheval/cmd_add_sstable.go
Original file line number Diff line number Diff line change
Expand Up @@ -417,15 +417,16 @@ func EvalAddSSTable(
} else if !ok {
break
}
for _, rkv := range rangeIter.RangeKeys().AsRangeKeyValues() {
if err = readWriter.PutRawMVCCRangeKey(rkv.RangeKey, rkv.Value); err != nil {
rangeKeys := rangeIter.RangeKeys()
for _, v := range rangeKeys.Versions {
if err = readWriter.PutRawMVCCRangeKey(rangeKeys.AsRangeKey(v), v.Value); err != nil {
return result.Result{}, err
}
if sstToReqTS.IsSet() {
readWriter.LogLogicalOp(storage.MVCCDeleteRangeOpType, storage.MVCCLogicalOpDetails{
Key: rkv.RangeKey.StartKey,
EndKey: rkv.RangeKey.EndKey,
Timestamp: rkv.RangeKey.Timestamp,
Key: rangeKeys.Bounds.Key,
EndKey: rangeKeys.Bounds.EndKey,
Timestamp: v.Timestamp,
})
}
}
Expand Down Expand Up @@ -517,26 +518,28 @@ func assertSSTContents(sst []byte, sstTimestamp hlc.Timestamp, stats *enginepb.M
break
}

for _, rkv := range iter.RangeKeys().AsRangeKeyValues() {
if err := rkv.RangeKey.Validate(); err != nil {
rangeKeys := iter.RangeKeys()
for _, v := range rangeKeys.Versions {
rangeKey := rangeKeys.AsRangeKey(v)
if err := rangeKey.Validate(); err != nil {
return errors.NewAssertionErrorWithWrappedErrf(err, "SST contains invalid range key")
}
if sstTimestamp.IsSet() && rkv.RangeKey.Timestamp != sstTimestamp {
if sstTimestamp.IsSet() && v.Timestamp != sstTimestamp {
return errors.AssertionFailedf(
"SST has unexpected timestamp %s (expected %s) for range key %s",
rkv.RangeKey.Timestamp, sstTimestamp, rkv.RangeKey)
v.Timestamp, sstTimestamp, rangeKeys.Bounds)
}
value, err := storage.DecodeMVCCValue(rkv.Value)
value, err := storage.DecodeMVCCValue(v.Value)
if err != nil {
return errors.NewAssertionErrorWithWrappedErrf(err,
"SST contains invalid range key value for range key %s", rkv.RangeKey)
"SST contains invalid range key value for range key %s", rangeKey)
}
if !value.IsTombstone() {
return errors.AssertionFailedf("SST contains non-tombstone range key %s", rkv.RangeKey)
return errors.AssertionFailedf("SST contains non-tombstone range key %s", rangeKey)
}
if value.MVCCValueHeader != (enginepb.MVCCValueHeader{}) {
return errors.AssertionFailedf("SST contains non-empty MVCC value header for range key %s",
rkv.RangeKey)
rangeKey)
}
}
}
Expand Down
8 changes: 4 additions & 4 deletions pkg/kv/kvserver/batcheval/cmd_clear_range.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,9 +218,9 @@ func computeStatsDelta(
if ok, err := iter.Valid(); err != nil {
return err
} else if ok && iter.RangeBounds().Key.Compare(bound) < 0 {
for i, rkv := range iter.RangeKeys().AsRangeKeyValues() {
keyBytes := int64(storage.EncodedMVCCTimestampSuffixLength(rkv.RangeKey.Timestamp))
valBytes := int64(len(rkv.Value))
for i, v := range iter.RangeKeys().Versions {
keyBytes := int64(storage.EncodedMVCCTimestampSuffixLength(v.Timestamp))
valBytes := int64(len(v.Value))
if i == 0 {
delta.RangeKeyCount--
keyBytes += 2 * int64(storage.EncodedMVCCKeyPrefixLength(bound))
Expand All @@ -229,7 +229,7 @@ func computeStatsDelta(
delta.RangeValCount--
delta.RangeValBytes -= valBytes
delta.GCBytesAge -= (keyBytes + valBytes) *
(delta.LastUpdateNanos/1e9 - rkv.RangeKey.Timestamp.WallTime/1e9)
(delta.LastUpdateNanos/1e9 - v.Timestamp.WallTime/1e9)
}
}
return nil
Expand Down
15 changes: 8 additions & 7 deletions pkg/kv/kvserver/batcheval/cmd_delete_range_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -306,8 +306,8 @@ func checkPredicateDeleteRange(t *testing.T, engine storage.Reader, rKeyInfo sto
// PredicateDeleteRange should not have written any delete tombstones;
// therefore, any range key tombstones in the span should have been
// written before the request was issued.
for _, rKey := range iter.RangeKeys().AsRangeKeyValues() {
require.Equal(t, true, rKey.RangeKey.Timestamp.Less(rKeyInfo.Timestamp))
for _, v := range iter.RangeKeys().Versions {
require.True(t, v.Timestamp.Less(rKeyInfo.Timestamp))
}
continue
}
Expand Down Expand Up @@ -337,13 +337,14 @@ func checkDeleteRangeTombstone(
break
}
require.True(t, ok)
for _, rkv := range iter.RangeKeys().AsRangeKeyValues() {
if rkv.RangeKey.Timestamp.Equal(rangeKey.Timestamp) {
rangeKeys := iter.RangeKeys()
for _, v := range rangeKeys.Versions {
if v.Timestamp.Equal(rangeKey.Timestamp) {
if len(seen.RangeKey.StartKey) == 0 {
seen = rkv.Clone()
seen = rangeKeys.AsRangeKeyValue(v).Clone()
} else {
seen.RangeKey.EndKey = rkv.RangeKey.EndKey.Clone()
require.Equal(t, seen.Value, rkv.Value)
seen.RangeKey.EndKey = rangeKeys.Bounds.EndKey.Clone()
require.Equal(t, seen.Value, v.Value)
}
break
}
Expand Down
8 changes: 4 additions & 4 deletions pkg/kv/kvserver/batcheval/cmd_end_transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -1315,17 +1315,17 @@ func computeSplitRangeKeyStatsDelta(
// contribution of the range key fragmentation. The naïve calculation would be
// rhs.EncodedSize() - (keyLen(rhs.EndKey) - keyLen(lhs.EndKey))
// which simplifies to 2 * keyLen(rhs.StartKey) + tsLen(rhs.Timestamp).
for i, rkv := range iter.RangeKeys().AsRangeKeyValues() {
keyBytes := int64(storage.EncodedMVCCTimestampSuffixLength(rkv.RangeKey.Timestamp))
valBytes := int64(len(rkv.Value))
for i, v := range iter.RangeKeys().Versions {
keyBytes := int64(storage.EncodedMVCCTimestampSuffixLength(v.Timestamp))
valBytes := int64(len(v.Value))
if i == 0 {
delta.RangeKeyCount++
keyBytes += 2 * int64(storage.EncodedMVCCKeyPrefixLength(splitKey))
}
delta.RangeKeyBytes += keyBytes
delta.RangeValCount++
delta.RangeValBytes += valBytes
delta.GCBytesAge += (keyBytes + valBytes) * (nowNanos/1e9 - rkv.RangeKey.Timestamp.WallTime/1e9)
delta.GCBytesAge += (keyBytes + valBytes) * (nowNanos/1e9 - v.Timestamp.WallTime/1e9)
}

return delta, nil
Expand Down
6 changes: 1 addition & 5 deletions pkg/kv/kvserver/batcheval/cmd_refresh_range.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,12 +93,8 @@ func refreshRange(
key := iter.UnsafeKey().Clone()

if _, hasRange := iter.HasPointAndRange(); hasRange {
rangeKVs := iter.RangeKeys().AsRangeKeyValues()
if len(rangeKVs) == 0 { // defensive
return errors.Errorf("expected range key at %s not found", key)
}
return roachpb.NewRefreshFailedError(roachpb.RefreshFailedError_REASON_COMMITTED_VALUE,
key.Key, rangeKVs[0].RangeKey.Timestamp)
key.Key, iter.RangeKeys().Versions[0].Timestamp)
}

if !key.IsValue() {
Expand Down
9 changes: 4 additions & 5 deletions pkg/kv/kvserver/rangefeed/catchup_scan.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,15 +154,15 @@ func (i *CatchUpIterator) CatchUpScan(outputFn outputEventFn, withDiff bool) err
rangeKeysStart = append(rangeKeysStart[:0], rangeBounds.Key...)

// Emit events for these MVCC range tombstones, in chronological order.
rangeKeys := i.RangeKeys().AsRangeKeyValues()
for i := len(rangeKeys) - 1; i >= 0; i-- {
versions := i.RangeKeys().Versions
for i := len(versions) - 1; i >= 0; i-- {
var span roachpb.Span
a, span.Key = a.Copy(rangeBounds.Key, 0)
a, span.EndKey = a.Copy(rangeBounds.EndKey, 0)
err := outputFn(&roachpb.RangeFeedEvent{
DeleteRange: &roachpb.RangeFeedDeleteRange{
Span: span,
Timestamp: rangeKeys[i].RangeKey.Timestamp,
Timestamp: versions[i].Timestamp,
},
})
if err != nil {
Expand Down Expand Up @@ -271,8 +271,7 @@ func (i *CatchUpIterator) CatchUpScan(outputFn outputEventFn, withDiff bool) err
// to rangeKeysStart above, because NextIgnoringTime() could reveal
// additional MVCC range tombstones below StartTime that cover this
// point. We need to find a more performant way to handle this.
if !hasRange || !storage.HasRangeKeyBetween(
i.RangeKeys().AsRangeKeyValues(), reorderBuf[l].Val.Value.Timestamp, ts) {
if !hasRange || !i.RangeKeys().HasBetween(ts, reorderBuf[l].Val.Value.Timestamp) {
// TODO(sumeer): find out if it is deliberate that we are not populating
// PrevValue.Timestamp.
reorderBuf[l].Val.PrevValue.RawBytes = val
Expand Down
15 changes: 8 additions & 7 deletions pkg/kv/kvserver/rditer/replica_data_iter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,19 +176,20 @@ func verifyRDReplicatedOnlyMVCCIter(
}
}
if r {
rks := iter.RangeKeys().AsRangeKeyValues()
if !rks[0].RangeKey.StartKey.Equal(rangeStart) {
rangeKeys := iter.RangeKeys().Clone()
if !rangeKeys.Bounds.Key.Equal(rangeStart) {
rangeStart = rangeKeys.Bounds.Key.Clone()
if !reverse {
for _, rk := range rks {
actualRanges = append(actualRanges, rk.RangeKey.Clone())
for _, v := range rangeKeys.Versions {
actualRanges = append(actualRanges, rangeKeys.AsRangeKey(v))
}
} else {
for i := len(rks) - 1; i >= 0; i-- {
actualRanges = append([]storage.MVCCRangeKey{rks[i].RangeKey.Clone()},
for i := rangeKeys.Len() - 1; i >= 0; i-- {
actualRanges = append([]storage.MVCCRangeKey{
rangeKeys.AsRangeKey(rangeKeys.Versions[i])},
actualRanges...)
}
}
rangeStart = rks[0].RangeKey.StartKey.Clone()
}
}
next()
Expand Down

0 comments on commit 4800f7c

Please sign in to comment.