Skip to content

Commit

Permalink
sstable: add block level suffix replacement support
Browse files Browse the repository at this point in the history
This patch teaches the block iterator to replace the suffix of all surfaced
keys with a replacement suffix passed during block initiation. In other
words, the iterator will read the block as if all underlying keys actually had
the replacement prefix.

For correctness, this patch relies on two invariants: for all keys in the
block:
  1. no two keys in the sst share the same prefix
  2. byte.Compare(replacementSuffix,originalSuffix) > 0

During the normal readpath, i.e. for blocks without suffix rewriting, this
patch does slow Seeks down:
```
❯ benchstat master suffix
name                           old time/op  new time/op  delta
BlockIterSeekGE/restart=16-24   345ns ± 0%   367ns ± 0%  +6.53%  (p=0.002 n=6+6)
BlockIterSeekLT/restart=16-24   398ns ± 0%   416ns ± 0%  +4.55%  (p=0.008 n=5+5)
BlockIterNext/restart=16-24    15.4ns ± 0%  15.6ns ± 0%  +1.09%  (p=0.002 n=6+6)
BlockIterPrev/restart=16-24    30.6ns ± 0%  30.6ns ± 0%  -0.23%  (p=0.006 n=6+6)
```
  • Loading branch information
msbutler committed Jan 24, 2024
1 parent cd23876 commit 8860112
Show file tree
Hide file tree
Showing 9 changed files with 279 additions and 37 deletions.
113 changes: 107 additions & 6 deletions sstable/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -412,28 +412,37 @@ type blockIter struct {
}
hideObsoletePoints bool
SyntheticPrefix
syntheticSuffix *SyntheticSuffix
}

// blockIter implements the base.InternalIterator interface.
var _ base.InternalIterator = (*blockIter)(nil)

func newBlockIter(cmp Compare, block block, syntheticPrefix SyntheticPrefix) (*blockIter, error) {
func newBlockIter(
cmp Compare, block block, syntheticPrefix SyntheticPrefix, syntheticSuffix *SyntheticSuffix,
) (*blockIter, error) {
i := &blockIter{}
return i, i.init(cmp, block, 0, false, syntheticPrefix)
return i, i.init(cmp, block, 0, false, syntheticPrefix, syntheticSuffix)
}

func (i *blockIter) String() string {
return "block"
}

func (i *blockIter) init(
cmp Compare, block block, globalSeqNum uint64, hideObsoletePoints bool, syntheticPrefix SyntheticPrefix,
cmp Compare,
block block,
globalSeqNum uint64,
hideObsoletePoints bool,
syntheticPrefix SyntheticPrefix,
syntheticSuffix *SyntheticSuffix,
) error {
numRestarts := int32(binary.LittleEndian.Uint32(block[len(block)-4:]))
if numRestarts == 0 {
return base.CorruptionErrorf("pebble/table: invalid table (block has no restart points)")
}
i.SyntheticPrefix = syntheticPrefix
i.syntheticSuffix = syntheticSuffix
i.cmp = cmp
i.restarts = int32(len(block)) - 4*(1+numRestarts)
i.numRestarts = numRestarts
Expand Down Expand Up @@ -464,11 +473,16 @@ func (i *blockIter) init(
// ingested.
// - Foreign sstable iteration: globalSeqNum is always set.
func (i *blockIter) initHandle(
cmp Compare, block bufferHandle, globalSeqNum uint64, hideObsoletePoints bool, syntheticPrefix SyntheticPrefix,
cmp Compare,
block bufferHandle,
globalSeqNum uint64,
hideObsoletePoints bool,
syntheticPrefix SyntheticPrefix,
syntheticSuffix *SyntheticSuffix,
) error {
i.handle.Release()
i.handle = block
return i.init(cmp, block.Get(), globalSeqNum, hideObsoletePoints, syntheticPrefix)
return i.init(cmp, block.Get(), globalSeqNum, hideObsoletePoints, syntheticPrefix, syntheticSuffix)
}

func (i *blockIter) invalidate() {
Expand Down Expand Up @@ -671,6 +685,21 @@ func (i *blockIter) decodeInternalKey(key []byte) (hiddenPoint bool) {
return hiddenPoint
}

func (i *blockIter) maybeReplaceSuffix(maybeFromCache bool) {
if i.syntheticSuffix != nil && i.ikey.UserKey != nil {
prefixLen := i.syntheticSuffix.split(i.ikey.UserKey)
if !maybeFromCache {
i.ikey.UserKey = append(i.ikey.UserKey[:prefixLen], i.syntheticSuffix.suffix...)
return
}
// If ikey is cached or may get cached, we must de-reference
// UserKey before prefix replacement.
i.syntheticSuffix.buf = append(i.syntheticSuffix.buf[:0], i.ikey.UserKey[:prefixLen]...)
i.syntheticSuffix.buf = append(i.syntheticSuffix.buf, i.syntheticSuffix.suffix...)
i.ikey.UserKey = i.syntheticSuffix.buf
}
}

func (i *blockIter) clearCache() {
i.cached = i.cached[:0]
i.cachedBuf = i.cachedBuf[:0]
Expand Down Expand Up @@ -811,6 +840,9 @@ func (i *blockIter) SeekGE(key []byte, flags base.SeekGEFlags) (*InternalKey, ba
if !i.valid() {
return nil, base.LazyValue{}
}
// Before conducting any user key comparison, we must replace the suffix.
i.maybeReplaceSuffix(false)

if !hiddenPoint && i.cmp(i.ikey.UserKey, key) >= 0 {
// Initialize i.lazyValue
if !i.lazyValueHandling.hasValuePrefix ||
Expand Down Expand Up @@ -953,6 +985,16 @@ func (i *blockIter) SeekLT(key []byte, flags base.SeekLTFlags) (*InternalKey, ba
targetOffset = decodeRestart(i.data[i.restarts+4*(index):])
}
} else if index == 0 {
if i.syntheticSuffix != nil {
// The binary search was conducted on keys without suffix replacement,
// implying the first key in the block may not be geq the search key. To
// double check, get the first key in the block with suffix replacement
// and compare to the search key.
ikey, lazyVal := i.First()
if i.cmp(ikey.UserKey, key) < 0 {
return ikey, lazyVal
}
}
// If index == 0 then all keys in this block are larger than the key
// sought.
i.offset = -1
Expand All @@ -979,6 +1021,7 @@ func (i *blockIter) SeekLT(key []byte, flags base.SeekLTFlags) (*InternalKey, ba
// of hidden keys we will be able to skip whole blocks (using block
// property filters) so we don't bother optimizing.
hiddenPoint := i.decodeInternalKey(i.key)
i.maybeReplaceSuffix(true)

// NB: we don't use the hiddenPoint return value of decodeInternalKey
// since we want to stop as soon as we reach a key >= ikey.UserKey, so
Expand All @@ -1003,9 +1046,41 @@ func (i *blockIter) SeekLT(key []byte, flags base.SeekLTFlags) (*InternalKey, ba
if hiddenPoint {
return i.Prev()
}
if i.syntheticSuffix != nil {
// The binary search was conducted on keys without suffix replacement,
// implying the returned restart point may not be geq the search key.
//
// For example: consider this block with a replacement ts of 4, and restart interval of 1:
// - pre replacement: a3,b2,c3
// - post replacement: a4,b4,c4
//
// Suppose the client calls SeekLT(b3), SeekLT must return b4.
//
// If the client calls SeekLT(b3), the binary search would return b2,
// the lowest key geq to b3, pre-suffix replacement. Then, SeekLT will
// begin forward iteration from a3, the previous restart point, to
// b{suffix}. The iteration stops when it encounters a key geq to the
// search key or if it reaches the upper bound. Without suffix replacement, we can assume
// that the upper bound of this forward iteration, b{suffix}, is greater
// than the search key, as implied by the binary search.
//
// If we naively hold this assumption with suffix replacement, the
// iteration would terminate at the upper bound, b4, call i.Prev, and
// incorrectly return a4. Instead, we must continue forward iteration
// past the upper bound, until we find a key geq the search key. With
// this correction, SeekLT would correctly return b4 in this example.
for i.cmp(i.ikey.UserKey, key) < 0 {
i.Next()
if !i.valid() {
break
}
}
// The current key is greater than or equal to our search key. Back up to
// the previous key which was less than our search key.
return i.Prev()
}
break
}

i.cacheEntry()
}

Expand Down Expand Up @@ -1040,6 +1115,7 @@ func (i *blockIter) First() (*InternalKey, base.LazyValue) {
if hiddenPoint {
return i.Next()
}
i.maybeReplaceSuffix(false)
if !i.lazyValueHandling.hasValuePrefix ||
base.TrailerKind(i.ikey.Trailer) != InternalKeyKindSet {
i.lazyValue = base.MakeInPlaceValue(i.val)
Expand Down Expand Up @@ -1082,6 +1158,7 @@ func (i *blockIter) Last() (*InternalKey, base.LazyValue) {
if hiddenPoint {
return i.Prev()
}
i.maybeReplaceSuffix(true)
if !i.lazyValueHandling.hasValuePrefix ||
base.TrailerKind(i.ikey.Trailer) != InternalKeyKindSet {
i.lazyValue = base.MakeInPlaceValue(i.val)
Expand Down Expand Up @@ -1130,6 +1207,10 @@ start:
if hiddenPoint {
goto start
}
if i.syntheticSuffix != nil {
prefixLen := i.syntheticSuffix.split(i.ikey.UserKey)
i.ikey.UserKey = append(i.ikey.UserKey[:prefixLen], i.syntheticSuffix.suffix...)
}
} else {
i.ikey.Trailer = uint64(InternalKeyKindInvalid)
i.ikey.UserKey = nil
Expand Down Expand Up @@ -1400,6 +1481,10 @@ func (i *blockIter) nextPrefixV3(succKey []byte) (*InternalKey, base.LazyValue)
if i.globalSeqNum != 0 {
i.ikey.SetSeqNum(i.globalSeqNum)
}
if i.syntheticSuffix != nil {
prefixLen := i.syntheticSuffix.split(i.ikey.UserKey)
i.ikey.UserKey = append(i.ikey.UserKey[:prefixLen], i.syntheticSuffix.suffix...)
}
} else {
i.ikey.Trailer = uint64(InternalKeyKindInvalid)
i.ikey.UserKey = nil
Expand Down Expand Up @@ -1458,6 +1543,14 @@ start:
if i.globalSeqNum != 0 {
i.ikey.SetSeqNum(i.globalSeqNum)
}
if i.syntheticSuffix != nil {
suffixLen := i.syntheticSuffix.split(i.ikey.UserKey)
// If ikey is cached or may get cached, we must de-reference
// UserKey before prefix replacement.
i.syntheticSuffix.buf = append(i.syntheticSuffix.buf[:0], i.ikey.UserKey[:suffixLen]...)
i.syntheticSuffix.buf = append(i.syntheticSuffix.buf, i.syntheticSuffix.suffix...)
i.ikey.UserKey = i.syntheticSuffix.buf
}
} else {
i.ikey.Trailer = uint64(InternalKeyKindInvalid)
i.ikey.UserKey = nil
Expand Down Expand Up @@ -1535,6 +1628,14 @@ start:
// Use the cache.
goto start
}
if i.syntheticSuffix != nil {
suffixLen := i.syntheticSuffix.split(i.ikey.UserKey)
// If ikey is cached or may get cached, we must de-reference
// UserKey before prefix replacement.
i.syntheticSuffix.buf = append(i.syntheticSuffix.buf[:0], i.ikey.UserKey[:suffixLen]...)
i.syntheticSuffix.buf = append(i.syntheticSuffix.buf, i.syntheticSuffix.suffix...)
i.ikey.UserKey = i.syntheticSuffix.buf
}
if !i.lazyValueHandling.hasValuePrefix ||
base.TrailerKind(i.ikey.Trailer) != InternalKeyKindSet {
i.lazyValue = base.MakeInPlaceValue(i.val)
Expand Down
6 changes: 3 additions & 3 deletions sstable/block_property_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -938,7 +938,7 @@ func TestBlockProperties(t *testing.T) {

var blocks []int
var i int
iter, _ := newBlockIter(r.Compare, indexH.Get(), nil)
iter, _ := newBlockIter(r.Compare, indexH.Get(), nil, nil)
for key, value := iter.First(); key != nil; key, value = iter.Next() {
bh, err := decodeBlockHandleWithProperties(value.InPlaceValue())
if err != nil {
Expand Down Expand Up @@ -1274,7 +1274,7 @@ func runBlockPropsCmd(r *Reader, td *datadriven.TestData) string {
return err.Error()
}
twoLevelIndex := r.Properties.IndexPartitions > 0
i, err := newBlockIter(r.Compare, bh.Get(), nil)
i, err := newBlockIter(r.Compare, bh.Get(), nil, nil)
if err != nil {
return err.Error()
}
Expand Down Expand Up @@ -1322,7 +1322,7 @@ func runBlockPropsCmd(r *Reader, td *datadriven.TestData) string {
return err.Error()
}
if err := subiter.init(
r.Compare, subIndex.Get(), 0 /* globalSeqNum */, false, r.syntheticPrefix); err != nil {
r.Compare, subIndex.Get(), 0 /* globalSeqNum */, false, r.syntheticPrefix, r.syntheticSuffix); err != nil {
return err.Error()
}
for key, value := subiter.First(); key != nil; key, value = subiter.Next() {
Expand Down
Loading

0 comments on commit 8860112

Please sign in to comment.