Skip to content

Commit

Permalink
sstable: clean up TODO in fragmentBlockIter
Browse files Browse the repository at this point in the history
The way we use `Decode` to append to an existing span is pretty
subtle. This change adds a separate `DecodeIntoSpan` which is used to
add keys to an existing span. Checks for consistency of the start/end
keys are moved here.
  • Loading branch information
RaduBerinde committed Jun 11, 2024
1 parent 31bb155 commit 1b67ad5
Show file tree
Hide file tree
Showing 6 changed files with 133 additions and 99 deletions.
20 changes: 20 additions & 0 deletions internal/rangedel/rangedel.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package rangedel

import (
"github.com/cockroachdb/pebble/internal/base"
"github.com/cockroachdb/pebble/internal/invariants"
"github.com/cockroachdb/pebble/internal/keyspan"
)

Expand Down Expand Up @@ -41,3 +42,22 @@ func Decode(ik base.InternalKey, v []byte, keysDst []keyspan.Key) keyspan.Span {
}),
}
}

// DecodeIntoSpan decodes an internal key pair encoding a range deletion and
// appends a key to the given span. The start and end keys must match those in
// the span.
func DecodeIntoSpan(cmp base.Compare, ik base.InternalKey, v []byte, s *keyspan.Span) error {
// This function should only be called when ik.UserKey matches the Start of
// the span we already have. If this is not the case, it is a bug in the
// calling code.
if invariants.Enabled && cmp(s.Start, ik.UserKey) != 0 {
return base.AssertionFailedf("DecodeIntoSpan called with different start key")
}
// The value can come from disk or from the user, so we want to check the end
// key in all builds.
if cmp(s.End, v) != 0 {
return base.CorruptionErrorf("pebble: corrupt range key fragmentation")
}
s.Keys = append(s.Keys, keyspan.Key{Trailer: ik.Trailer})
return nil
}
78 changes: 55 additions & 23 deletions internal/rangekey/rangekey.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,8 @@ package rangekey
import (
"encoding/binary"

"github.com/cockroachdb/errors"
"github.com/cockroachdb/pebble/internal/base"
"github.com/cockroachdb/pebble/internal/invariants"
"github.com/cockroachdb/pebble/internal/keyspan"
)

Expand Down Expand Up @@ -161,30 +161,59 @@ func (e *Encoder) flush(s *keyspan.Span, seqNum uint64, del bool) error {
}

// Decode takes an internal key pair encoding range key(s) and returns a decoded
// keyspan containing the keys. If keysDst is provided, keys will be appended to
// keysDst.
func Decode(ik base.InternalKey, v []byte, keysDst []keyspan.Key) (keyspan.Span, error) {
// keyspan containing the keys. If keysBuf is provided, keys will be appended to
// it.
func Decode(ik base.InternalKey, v []byte, keysBuf []keyspan.Key) (keyspan.Span, error) {
var s keyspan.Span
s.Start = ik.UserKey
var err error
s.End, v, err = DecodeEndKey(ik.Kind(), v)
if err != nil {
return keyspan.Span{}, err
}
s.Keys, err = appendKeys(keysBuf, ik, v)
if err != nil {
return keyspan.Span{}, err
}
return s, nil
}

// DecodeIntoSpan decodes an internal key pair encoding range key(s) and appends
// them to the given span. The start and end keys must match those in the span.
func DecodeIntoSpan(cmp base.Compare, ik base.InternalKey, v []byte, s *keyspan.Span) error {
// Hydrate the user key bounds.
s.Start = ik.UserKey
var ok bool
s.End, v, ok = DecodeEndKey(ik.Kind(), v)
if !ok {
return keyspan.Span{}, base.CorruptionErrorf("pebble: unable to decode range key end from %s", ik.Kind())
startKey := ik.UserKey
endKey, v, err := DecodeEndKey(ik.Kind(), v)
if err != nil {
return err
}
s.Keys = keysDst
// This function should only be called when ik.UserKey matches the Start of
// the span we already have. If this is not the case, it is a bug in the
// calling code.
if invariants.Enabled && cmp(s.Start, startKey) != 0 {
return base.AssertionFailedf("DecodeIntoSpan called with different start key")
}
// The value can come from disk or from the user, so we want to check the end
// key in all builds.
if cmp(s.End, endKey) != 0 {
return base.CorruptionErrorf("pebble: corrupt range key fragmentation")
}
s.Keys, err = appendKeys(s.Keys, ik, v)
return err
}

func appendKeys(buf []keyspan.Key, ik base.InternalKey, v []byte) ([]keyspan.Key, error) {
// Hydrate the contents of the range key(s).
switch ik.Kind() {
case base.InternalKeyKindRangeKeySet:
for len(v) > 0 {
var sv SuffixValue
var ok bool
sv, v, ok = decodeSuffixValue(v)
if !ok {
return keyspan.Span{}, base.CorruptionErrorf("pebble: unable to decode range key suffix-value tuple")
return nil, base.CorruptionErrorf("pebble: unable to decode range key suffix-value tuple")
}
s.Keys = append(s.Keys, keyspan.Key{
buf = append(buf, keyspan.Key{
Trailer: ik.Trailer,
Suffix: sv.Suffix,
Value: sv.Value,
Expand All @@ -193,24 +222,25 @@ func Decode(ik base.InternalKey, v []byte, keysDst []keyspan.Key) (keyspan.Span,
case base.InternalKeyKindRangeKeyUnset:
for len(v) > 0 {
var suffix []byte
var ok bool
suffix, v, ok = decodeSuffix(v)
if !ok {
return keyspan.Span{}, base.CorruptionErrorf("pebble: unable to decode range key unset suffix")
return nil, base.CorruptionErrorf("pebble: unable to decode range key unset suffix")
}
s.Keys = append(s.Keys, keyspan.Key{
buf = append(buf, keyspan.Key{
Trailer: ik.Trailer,
Suffix: suffix,
})
}
case base.InternalKeyKindRangeKeyDelete:
if len(v) > 0 {
return keyspan.Span{}, base.CorruptionErrorf("pebble: RANGEKEYDELs must not contain additional data")
return nil, base.CorruptionErrorf("pebble: RANGEKEYDELs must not contain additional data")
}
s.Keys = append(s.Keys, keyspan.Key{Trailer: ik.Trailer})
buf = append(buf, keyspan.Key{Trailer: ik.Trailer})
default:
return keyspan.Span{}, base.CorruptionErrorf("pebble: %s is not a range key", ik.Kind())
return nil, base.CorruptionErrorf("pebble: %s is not a range key", ik.Kind())
}
return s, nil
return buf, nil
}

// SuffixValue represents a tuple of a suffix and a corresponding value. A
Expand Down Expand Up @@ -284,21 +314,23 @@ func EncodeSetValue(dst []byte, endKey []byte, suffixValues []SuffixValue) int {
// DecodeEndKey reads the end key from the beginning of a range key (RANGEKEYSET,
// RANGEKEYUNSET or RANGEKEYDEL)'s physical encoded value. Both sets and unsets
// encode the range key, plus additional data in the value.
func DecodeEndKey(kind base.InternalKeyKind, data []byte) (endKey, value []byte, ok bool) {
func DecodeEndKey(kind base.InternalKeyKind, data []byte) (endKey, value []byte, _ error) {
switch kind {
case base.InternalKeyKindRangeKeyDelete:
// No splitting is necessary for range key deletes. The value is the end
// key, and there is no additional associated value.
return data, nil, true
return data, nil, nil

case base.InternalKeyKindRangeKeySet, base.InternalKeyKindRangeKeyUnset:
v, n := binary.Uvarint(data)
if n <= 0 || uint64(n)+v >= uint64(len(data)) {
return nil, nil, false
return nil, nil, base.CorruptionErrorf("pebble: unable to decode range key end from %s", kind)
}
endKey, value = data[n:n+int(v)], data[n+int(v):]
return endKey, value, true
return endKey, value, nil

default:
panic(errors.Newf("key kind %s is not a range key kind", kind))
return nil, nil, base.AssertionFailedf("key kind %s is not a range key kind", kind)
}
}

Expand Down
12 changes: 4 additions & 8 deletions internal/rangekey/rangekey_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,10 +90,8 @@ func TestSetValue_Roundtrip(t *testing.T) {
n := EncodeSetValue(b, tc.endKey, tc.suffixValues)
require.Equal(t, l, n)

var endKey, rest []byte
var ok bool
endKey, rest, ok = DecodeEndKey(base.InternalKeyKindRangeKeySet, b[:n])
require.True(t, ok)
endKey, rest, err := DecodeEndKey(base.InternalKeyKindRangeKeySet, b[:n])
require.NoError(t, err)

var suffixValues []SuffixValue
for len(rest) > 0 {
Expand Down Expand Up @@ -184,10 +182,8 @@ func TestUnsetValue_Roundtrip(t *testing.T) {
n := EncodeUnsetValue(b, tc.endKey, tc.suffixes)
require.Equal(t, l, n)

var ok bool
var endKey, rest []byte
endKey, rest, ok = DecodeEndKey(base.InternalKeyKindRangeKeyUnset, b[:n])
require.True(t, ok)
endKey, rest, err := DecodeEndKey(base.InternalKeyKindRangeKeyUnset, b[:n])
require.NoError(t, err)
var suffixes [][]byte
for len(rest) > 0 {
var ok bool
Expand Down
100 changes: 44 additions & 56 deletions sstable/block_fragment_iter.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,29 +43,42 @@ type fragmentBlockIter struct {
elideSameSeqnum bool
}

func (i *fragmentBlockIter) resetForReuse() fragmentBlockIter {
return fragmentBlockIter{blockIter: i.blockIter.resetForReuse()}
func (i *fragmentBlockIter) Init(elideSameSeqnum bool) {
// Use the i.keyBuf array to back the Keys slice to prevent an allocation
// when the spans contain few keys.
i.span.Keys = i.keyBuf[:0]
i.elideSameSeqnum = elideSameSeqnum
}

func (i *fragmentBlockIter) ResetForReuse() {
*i = fragmentBlockIter{blockIter: i.blockIter.resetForReuse()}
}

func (i *fragmentBlockIter) decodeSpanKeys(kv *base.InternalKV, internalValue []byte) error {
// TODO(jackson): The use of i.span.Keys to accumulate keys across multiple
// calls to Decode is too confusing and subtle. Refactor to make it
// explicit.
// initSpan initializes the span with a single fragment.
// Note that the span start and end keys and range key contents are aliased to
// the key or value. This is ok because the range del/key block doesn't use
// prefix compression (and we don't perform any transforms), so the key/value
// will be pointing directly into the buffer data.
func (i *fragmentBlockIter) initSpan(ik base.InternalKey, internalValue []byte) error {
var err error
if ik.Kind() == base.InternalKeyKindRangeDelete {
i.span = rangedel.Decode(ik, internalValue, i.span.Keys[:0])
} else {
i.span, err = rangekey.Decode(ik, internalValue, i.span.Keys[:0])
}
return err
}

// decode the contents of the fragment's value. This always includes at
// least the end key: RANGEDELs store the end key directly as the value,
// whereas the various range key kinds store are more complicated. The
// details of the range key internal value format are documented within the
// internal/rangekey package.
// addToSpan adds a fragment to the existing span. The fragment must be for the
// same start/end keys.
func (i *fragmentBlockIter) addToSpan(
cmp base.Compare, ik base.InternalKey, internalValue []byte,
) error {
var err error
switch kv.Kind() {
case base.InternalKeyKindRangeDelete:
i.span = rangedel.Decode(kv.K, internalValue, i.span.Keys)
case base.InternalKeyKindRangeKeySet, base.InternalKeyKindRangeKeyUnset, base.InternalKeyKindRangeKeyDelete:
i.span, err = rangekey.Decode(kv.K, internalValue, i.span.Keys)
default:
i.span = keyspan.Span{}
err = base.CorruptionErrorf("pebble: corrupt keyspan fragment of kind %d", kv.Kind())
if ik.Kind() == base.InternalKeyKindRangeDelete {
err = rangedel.DecodeIntoSpan(cmp, ik, internalValue, &i.span)
} else {
err = rangekey.DecodeIntoSpan(cmp, ik, internalValue, &i.span)
}
return err
}
Expand Down Expand Up @@ -106,31 +119,20 @@ func (i *fragmentBlockIter) gatherForward(kv *base.InternalKV) (*keyspan.Span, e
i.span.Keys = i.keyBuf[:0]

// Decode the span's end key and individual keys from the value.
internalValue := kv.V.InPlaceValue()
if err := i.decodeSpanKeys(kv, internalValue); err != nil {
if err := i.initSpan(kv.K, kv.InPlaceValue()); err != nil {
return nil, err
}
prevEnd := i.span.End

// There might exist additional internal keys with identical bounds encoded
// within the block. Iterate forward, accumulating all the keys with
// identical bounds to s.
kv = i.blockIter.Next()
for kv != nil && i.blockIter.cmp(kv.K.UserKey, i.span.Start) == 0 {
internalValue = kv.InPlaceValue()
if err := i.decodeSpanKeys(kv, internalValue); err != nil {
return nil, err
}

// Since k indicates an equal start key, the encoded end key must
// exactly equal the original end key from the first internal key.
// Overlapping fragments are required to have exactly equal start and
// end bounds.
if i.blockIter.cmp(prevEnd, i.span.End) != 0 {
i.span = keyspan.Span{}
return nil, base.CorruptionErrorf("pebble: corrupt keyspan fragmentation")
// Overlapping fragments are required to have exactly equal start and
// end bounds.
for kv = i.blockIter.Next(); kv != nil && i.blockIter.cmp(kv.K.UserKey, i.span.Start) == 0; kv = i.blockIter.Next() {
if err := i.addToSpan(i.blockIter.cmp, kv.K, kv.InPlaceValue()); err != nil {
return nil, err
}
kv = i.blockIter.Next()
}
if i.elideSameSeqnum && len(i.span.Keys) > 0 {
i.elideKeysOfSameSeqNum()
Expand All @@ -152,36 +154,22 @@ func (i *fragmentBlockIter) gatherBackward(kv *base.InternalKV) (*keyspan.Span,
if kv == nil || !i.blockIter.valid() {
return nil, nil
}
// Use the i.keyBuf array to back the Keys slice to prevent an allocation
// when a span contains few keys.
i.span.Keys = i.keyBuf[:0]

// Decode the span's end key and individual keys from the value.
internalValue := kv.V.InPlaceValue()
if err := i.decodeSpanKeys(kv, internalValue); err != nil {
if err := i.initSpan(kv.K, kv.InPlaceValue()); err != nil {
return nil, err
}
prevEnd := i.span.End

// There might exist additional internal keys with identical bounds encoded
// within the block. Iterate backward, accumulating all the keys with
// identical bounds to s.
kv = i.blockIter.Prev()
for kv != nil && i.blockIter.cmp(kv.K.UserKey, i.span.Start) == 0 {
internalValue = kv.V.InPlaceValue()
if err := i.decodeSpanKeys(kv, internalValue); err != nil {
//
// Overlapping fragments are required to have exactly equal start and
// end bounds.
for kv = i.blockIter.Prev(); kv != nil && i.blockIter.cmp(kv.K.UserKey, i.span.Start) == 0; kv = i.blockIter.Prev() {
if err := i.addToSpan(i.blockIter.cmp, kv.K, kv.InPlaceValue()); err != nil {
return nil, err
}

// Since k indicates an equal start key, the encoded end key must
// exactly equal the original end key from the first internal key.
// Overlapping fragments are required to have exactly equal start and
// end bounds.
if i.blockIter.cmp(prevEnd, i.span.End) != 0 {
i.span = keyspan.Span{}
return nil, base.CorruptionErrorf("pebble: corrupt keyspan fragmentation")
}
kv = i.blockIter.Prev()
}
// i.blockIter is positioned over the last internal key for the previous
// span.
Expand Down
2 changes: 1 addition & 1 deletion sstable/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -498,7 +498,7 @@ type rangeKeyFragmentBlockIter struct {

func (i *rangeKeyFragmentBlockIter) Close() error {
err := i.fragmentBlockIter.Close()
i.fragmentBlockIter = i.fragmentBlockIter.resetForReuse()
i.fragmentBlockIter.ResetForReuse()
rangeKeyFragmentBlockIterPool.Put(i)
return err
}
Expand Down
20 changes: 9 additions & 11 deletions sstable/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -1263,19 +1263,17 @@ func (w *Writer) encodeRangeKeySpan(span keyspan.Span) {
func (w *Writer) addRangeKey(key InternalKey, value []byte) error {
if !w.disableKeyOrderChecks && w.rangeKeyBlock.nEntries > 0 {
prevStartKey := w.rangeKeyBlock.getCurKey()
prevEndKey, _, ok := rangekey.DecodeEndKey(prevStartKey.Kind(), w.rangeKeyBlock.curValue)
if !ok {
prevEndKey, _, err := rangekey.DecodeEndKey(prevStartKey.Kind(), w.rangeKeyBlock.curValue)
if err != nil {
// We panic here as we should have previously decoded and validated this
// key and value when it was first added to the range key block.
panic(errors.Errorf("pebble: invalid end key for span: %s",
prevStartKey.Pretty(w.formatKey)))
panic(err)
}

curStartKey := key
curEndKey, _, ok := rangekey.DecodeEndKey(curStartKey.Kind(), value)
if !ok {
w.err = errors.Errorf("pebble: invalid end key for span: %s",
curStartKey.Pretty(w.formatKey))
curEndKey, _, err := rangekey.DecodeEndKey(curStartKey.Kind(), value)
if err != nil {
w.err = err
return w.err
}

Expand Down Expand Up @@ -2103,9 +2101,9 @@ func (w *Writer) Close() (err error) {
if w.props.NumRangeKeys() > 0 {
key := w.rangeKeyBlock.getCurKey()
kind := key.Kind()
endKey, _, ok := rangekey.DecodeEndKey(kind, w.rangeKeyBlock.curValue)
if !ok {
return errors.Newf("invalid end key: %s", w.rangeKeyBlock.curValue)
endKey, _, err := rangekey.DecodeEndKey(kind, w.rangeKeyBlock.curValue)
if err != nil {
return err
}
k := base.MakeExclusiveSentinelKey(kind, endKey).Clone()
w.meta.SetLargestRangeKey(k)
Expand Down

0 comments on commit 1b67ad5

Please sign in to comment.