Skip to content

Commit

Permalink
sstable: add block level synthetic prefix support
Browse files Browse the repository at this point in the history
This adds the concept of synthetic prefixes to sstable readers and their
underlying block iterators, treating the synthetic prefix as an extra
shared prefix, shared even by restart keys.

When a reader is configured with a synthetic prefix, it will assume that
that prefix is implicitly prepended to every key in the underlying sst
blocks when interacting with or returning those keys.
  • Loading branch information
dt committed Jan 19, 2024
1 parent 3b7293f commit d6f9dd4
Show file tree
Hide file tree
Showing 16 changed files with 218 additions and 71 deletions.
52 changes: 44 additions & 8 deletions sstable/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package sstable

import (
"bytes"
"context"
"encoding/binary"
"unsafe"
Expand Down Expand Up @@ -410,34 +411,40 @@ type blockIter struct {
hasValuePrefix bool
}
hideObsoletePoints bool
SyntheticPrefix
}

// blockIter implements the base.InternalIterator interface.
var _ base.InternalIterator = (*blockIter)(nil)

func newBlockIter(cmp Compare, block block) (*blockIter, error) {
func newBlockIter(cmp Compare, block block, syntheticPrefix SyntheticPrefix) (*blockIter, error) {
i := &blockIter{}
return i, i.init(cmp, block, 0, false)
return i, i.init(cmp, block, 0, false, syntheticPrefix)
}

func (i *blockIter) String() string {
return "block"
}

func (i *blockIter) init(
cmp Compare, block block, globalSeqNum uint64, hideObsoletePoints bool,
cmp Compare, block block, globalSeqNum uint64, hideObsoletePoints bool, syntheticPrefix SyntheticPrefix,
) error {
numRestarts := int32(binary.LittleEndian.Uint32(block[len(block)-4:]))
if numRestarts == 0 {
return base.CorruptionErrorf("pebble/table: invalid table (block has no restart points)")
}
i.SyntheticPrefix = syntheticPrefix
i.cmp = cmp
i.restarts = int32(len(block)) - 4*(1+numRestarts)
i.numRestarts = numRestarts
i.globalSeqNum = globalSeqNum
i.ptr = unsafe.Pointer(&block[0])
i.data = block
i.fullKey = i.fullKey[:0]
if i.SyntheticPrefix != nil {
i.fullKey = append(i.fullKey[:0], i.SyntheticPrefix...)
} else {
i.fullKey = i.fullKey[:0]
}
i.val = nil
i.hideObsoletePoints = hideObsoletePoints
i.clearCache()
Expand All @@ -457,11 +464,11 @@ func (i *blockIter) init(
// ingested.
// - Foreign sstable iteration: globalSeqNum is always set.
func (i *blockIter) initHandle(
cmp Compare, block bufferHandle, globalSeqNum uint64, hideObsoletePoints bool,
cmp Compare, block bufferHandle, globalSeqNum uint64, hideObsoletePoints bool, syntheticPrefix SyntheticPrefix,
) error {
i.handle.Release()
i.handle = block
return i.init(cmp, block.Get(), globalSeqNum, hideObsoletePoints)
return i.init(cmp, block.Get(), globalSeqNum, hideObsoletePoints, syntheticPrefix)
}

func (i *blockIter) invalidate() {
Expand Down Expand Up @@ -557,6 +564,7 @@ func (i *blockIter) readEntry() {
ptr = unsafe.Pointer(uintptr(ptr) + 5)
}

shared += uint32(len(i.SyntheticPrefix))
unsharedKey := getBytes(ptr, int(unshared))
// TODO(sumeer): move this into the else block below.
i.fullKey = append(i.fullKey[:shared], unsharedKey...)
Expand Down Expand Up @@ -633,6 +641,9 @@ func (i *blockIter) readFirstKey() error {
i.firstUserKey = nil
return base.CorruptionErrorf("pebble/table: invalid firstKey in block")
}
if i.SyntheticPrefix != nil {
i.firstUserKey = append(i.SyntheticPrefix, i.firstUserKey...)
}
return nil
}

Expand Down Expand Up @@ -693,6 +704,17 @@ func (i *blockIter) SeekGE(key []byte, flags base.SeekGEFlags) (*InternalKey, ba
panic(errors.AssertionFailedf("invalidated blockIter used"))
}

searchKey := key
if i.SyntheticPrefix != nil {
if !bytes.HasPrefix(key, i.SyntheticPrefix) {
if i.cmp(i.SyntheticPrefix, key) >= 0 {
return i.First()
}
return nil, base.LazyValue{}
}
searchKey = key[len(i.SyntheticPrefix):]
}

i.clearCache()
// Find the index of the smallest restart point whose key is > the key
// sought; index will be numRestarts if there is no such restart point.
Expand Down Expand Up @@ -756,7 +778,7 @@ func (i *blockIter) SeekGE(key []byte, flags base.SeekGEFlags) (*InternalKey, ba
}
// Else k is invalid, and left as nil

if i.cmp(key, k) > 0 {
if i.cmp(searchKey, k) > 0 {
// The search key is greater than the user key at this restart point.
// Search beyond this restart point, since we are trying to find the
// first restart point with a user key >= the search key.
Expand Down Expand Up @@ -833,6 +855,17 @@ func (i *blockIter) SeekLT(key []byte, flags base.SeekLTFlags) (*InternalKey, ba
var index int32

{
searchKey := key
if i.SyntheticPrefix != nil {
if !bytes.HasPrefix(key, i.SyntheticPrefix) {
if i.cmp(i.SyntheticPrefix, key) < 0 {
return i.Last()
}
return nil, base.LazyValue{}
}
searchKey = key[len(i.SyntheticPrefix):]
}

// NB: manually inlined sort.Search is ~5% faster.
//
// Define f(-1) == false and f(n) == true.
Expand Down Expand Up @@ -889,7 +922,7 @@ func (i *blockIter) SeekLT(key []byte, flags base.SeekLTFlags) (*InternalKey, ba
}
// Else k is invalid, and left as nil

if i.cmp(key, k) > 0 {
if i.cmp(searchKey, k) > 0 {
// The search key is greater than the user key at this restart point.
// Search beyond this restart point, since we are trying to find the
// first restart point with a user key >= the search key.
Expand Down Expand Up @@ -1226,6 +1259,9 @@ func (i *blockIter) nextPrefixV3(succKey []byte) (*InternalKey, base.LazyValue)
value = uint32(e)<<28 | uint32(d)<<21 | uint32(c)<<14 | uint32(b)<<7 | uint32(a)
ptr = unsafe.Pointer(uintptr(ptr) + 5)
}
if i.SyntheticPrefix != nil {
shared += uint32(len(i.SyntheticPrefix))
}
// The starting position of the value.
valuePtr := unsafe.Pointer(uintptr(ptr) + uintptr(unshared))
i.nextOffset = int32(uintptr(valuePtr)-uintptr(i.ptr)) + int32(value)
Expand Down
6 changes: 3 additions & 3 deletions sstable/block_property_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -938,7 +938,7 @@ func TestBlockProperties(t *testing.T) {

var blocks []int
var i int
iter, _ := newBlockIter(r.Compare, indexH.Get())
iter, _ := newBlockIter(r.Compare, indexH.Get(), nil)
for key, value := iter.First(); key != nil; key, value = iter.Next() {
bh, err := decodeBlockHandleWithProperties(value.InPlaceValue())
if err != nil {
Expand Down Expand Up @@ -1274,7 +1274,7 @@ func runBlockPropsCmd(r *Reader, td *datadriven.TestData) string {
return err.Error()
}
twoLevelIndex := r.Properties.IndexPartitions > 0
i, err := newBlockIter(r.Compare, bh.Get())
i, err := newBlockIter(r.Compare, bh.Get(), nil)
if err != nil {
return err.Error()
}
Expand Down Expand Up @@ -1322,7 +1322,7 @@ func runBlockPropsCmd(r *Reader, td *datadriven.TestData) string {
return err.Error()
}
if err := subiter.init(
r.Compare, subIndex.Get(), 0 /* globalSeqNum */, false); err != nil {
r.Compare, subIndex.Get(), 0 /* globalSeqNum */, false, r.syntheticPrefix); err != nil {
return err.Error()
}
for key, value := subiter.First(); key != nil; key, value = subiter.Next() {
Expand Down
70 changes: 63 additions & 7 deletions sstable/block_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,7 @@ func TestBlockIter2(t *testing.T) {
return ""

case "iter":
iter, err := newBlockIter(bytes.Compare, block)
iter, err := newBlockIter(bytes.Compare, block, nil)
if err != nil {
return err.Error()
}
Expand Down Expand Up @@ -276,7 +276,7 @@ func TestBlockIterKeyStability(t *testing.T) {
}
block := w.finish()

i, err := newBlockIter(bytes.Compare, block)
i, err := newBlockIter(bytes.Compare, block, nil)
require.NoError(t, err)

// Check that the supplied slice resides within the bounds of the block.
Expand Down Expand Up @@ -336,7 +336,7 @@ func TestBlockIterReverseDirections(t *testing.T) {

for targetPos := 0; targetPos < w.restartInterval; targetPos++ {
t.Run("", func(t *testing.T) {
i, err := newBlockIter(bytes.Compare, block)
i, err := newBlockIter(bytes.Compare, block, nil)
require.NoError(t, err)

pos := 3
Expand All @@ -357,6 +357,62 @@ func TestBlockIterReverseDirections(t *testing.T) {
}
}

func TestBlockSyntheticPrefix(t *testing.T) {
for _, prefix := range []string{"", "_", "~", "fruits/"} {
for _, restarts := range []int{1, 2, 3, 4, 10} {
t.Run(fmt.Sprintf("prefix=%s/restarts=%d", prefix, restarts), func(t *testing.T) {

elidedPrefixWriter, includedPrefixWriter := &blockWriter{restartInterval: restarts}, &blockWriter{restartInterval: restarts}
keys := []string{
"apple", "apricot", "banana",
"grape", "orange", "peach",
"pear", "persimmon",
}
for _, k := range keys {
elidedPrefixWriter.add(ikey(k), nil)
includedPrefixWriter.add(ikey(prefix+k), nil)
}

elidedPrefixBlock, includedPrefixBlock := elidedPrefixWriter.finish(), includedPrefixWriter.finish()

expect, err := newBlockIter(bytes.Compare, includedPrefixBlock, nil)
require.NoError(t, err)

got, err := newBlockIter(bytes.Compare, elidedPrefixBlock, SyntheticPrefix([]byte(prefix)))
require.NoError(t, err)

check := func(eKey *base.InternalKey, eVal base.LazyValue) func(gKey *base.InternalKey, gVal base.LazyValue) {
return func(gKey *base.InternalKey, gVal base.LazyValue) {
t.Helper()
if eKey != nil {
t.Logf("[%q] expected %q, got %q", prefix, eKey.UserKey, gKey.UserKey)
require.Equal(t, eKey, gKey)
require.Equal(t, eVal, gVal)
} else {
t.Logf("[%q] expected nil, got %q", prefix, gKey)
require.Nil(t, gKey)
}
}
}

check(expect.First())(got.First())
check(expect.Next())(got.Next())
check(expect.Prev())(got.Prev())

check(expect.SeekGE([]byte(prefix+"or"), base.SeekGEFlagsNone))(got.SeekGE([]byte(prefix+"or"), base.SeekGEFlagsNone))
check(expect.SeekGE([]byte(prefix+"peach"), base.SeekGEFlagsNone))(got.SeekGE([]byte(prefix+"peach"), base.SeekGEFlagsNone))
check(expect.Next())(got.Next())
check(expect.Next())(got.Next())
check(expect.Next())(got.Next())

check(expect.SeekLT([]byte(prefix+"banana"), base.SeekLTFlagsNone))(got.SeekLT([]byte(prefix+"banana"), base.SeekLTFlagsNone))
check(expect.SeekLT([]byte(prefix+"pomegranate"), base.SeekLTFlagsNone))(got.SeekLT([]byte(prefix+"pomegranate"), base.SeekLTFlagsNone))
check(expect.SeekLT([]byte(prefix+"apple"), base.SeekLTFlagsNone))(got.SeekLT([]byte(prefix+"apple"), base.SeekLTFlagsNone))
})
}
}
}

func BenchmarkBlockIterSeekGE(b *testing.B) {
const blockSize = 32 << 10

Expand All @@ -376,7 +432,7 @@ func BenchmarkBlockIterSeekGE(b *testing.B) {
w.add(ikey, nil)
}

it, err := newBlockIter(bytes.Compare, w.finish())
it, err := newBlockIter(bytes.Compare, w.finish(), nil)
if err != nil {
b.Fatal(err)
}
Expand Down Expand Up @@ -418,7 +474,7 @@ func BenchmarkBlockIterSeekLT(b *testing.B) {
w.add(ikey, nil)
}

it, err := newBlockIter(bytes.Compare, w.finish())
it, err := newBlockIter(bytes.Compare, w.finish(), nil)
if err != nil {
b.Fatal(err)
}
Expand Down Expand Up @@ -464,7 +520,7 @@ func BenchmarkBlockIterNext(b *testing.B) {
w.add(ikey, nil)
}

it, err := newBlockIter(bytes.Compare, w.finish())
it, err := newBlockIter(bytes.Compare, w.finish(), nil)
if err != nil {
b.Fatal(err)
}
Expand Down Expand Up @@ -496,7 +552,7 @@ func BenchmarkBlockIterPrev(b *testing.B) {
w.add(ikey, nil)
}

it, err := newBlockIter(bytes.Compare, w.finish())
it, err := newBlockIter(bytes.Compare, w.finish(), nil)
if err != nil {
b.Fatal(err)
}
Expand Down
29 changes: 15 additions & 14 deletions sstable/layout.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,18 +24,19 @@ type Layout struct {
// ValidateBlockChecksums, which validates a static list of BlockHandles
// referenced in this struct.

Data []BlockHandleWithProperties
Index []BlockHandle
TopIndex BlockHandle
Filter BlockHandle
RangeDel BlockHandle
RangeKey BlockHandle
ValueBlock []BlockHandle
ValueIndex BlockHandle
Properties BlockHandle
MetaIndex BlockHandle
Footer BlockHandle
Format TableFormat
Data []BlockHandleWithProperties
Index []BlockHandle
TopIndex BlockHandle
Filter BlockHandle
RangeDel BlockHandle
RangeKey BlockHandle
ValueBlock []BlockHandle
ValueIndex BlockHandle
Properties BlockHandle
MetaIndex BlockHandle
Footer BlockHandle
Format TableFormat
SyntheticPrefix SyntheticPrefix
}

// Describe returns a description of the layout. If the verbose parameter is
Expand Down Expand Up @@ -186,7 +187,7 @@ func (l *Layout) Describe(
var lastKey InternalKey
switch b.name {
case "data", "range-del", "range-key":
iter, _ := newBlockIter(r.Compare, h.Get())
iter, _ := newBlockIter(r.Compare, h.Get(), l.SyntheticPrefix)
for key, value := iter.First(); key != nil; key, value = iter.Next() {
ptr := unsafe.Pointer(uintptr(iter.ptr) + uintptr(iter.offset))
shared, ptr := decodeVarint(ptr)
Expand Down Expand Up @@ -238,7 +239,7 @@ func (l *Layout) Describe(
formatRestarts(iter.data, iter.restarts, iter.numRestarts)
formatTrailer()
case "index", "top-index":
iter, _ := newBlockIter(r.Compare, h.Get())
iter, _ := newBlockIter(r.Compare, h.Get(), l.SyntheticPrefix)
for key, value := iter.First(); key != nil; key, value = iter.Next() {
bh, err := decodeBlockHandleWithProperties(value.InPlaceValue())
if err != nil {
Expand Down
4 changes: 3 additions & 1 deletion sstable/prefix_replacing_iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,9 @@ func (p *prefixReplacingIterator) SeekLT(
p.i.First()
return p.rewriteResult(p.i.Prev())
}
return p.rewriteResult(p.i.SeekLT(p.rewriteArg(key), flags))
key = p.rewriteArg(key)
resKey, resResult := p.i.SeekLT(key, flags)
return p.rewriteResult(resKey, resResult)
}

// First implements the Iterator interface.
Expand Down
13 changes: 11 additions & 2 deletions sstable/prefix_replacing_iterator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,24 @@ func TestPrefixReplacingIterator(t *testing.T) {
{from: []byte("zzz"), to: []byte("aa")},
} {
t.Run(fmt.Sprintf("%s_%s", tc.from, tc.to), func(t *testing.T) {
r := buildTestTable(t, 20, 256, 256, DefaultCompression, tc.from)
var readerOpts []ReaderOption
if len(tc.from) == 0 {
readerOpts = append(readerOpts, WithSyntheticPrefix(tc.to))
}
r := buildTestTable(t, 20, 256, 256, DefaultCompression, tc.from, readerOpts...)
defer r.Close()
rawIter, err := r.NewIter(nil, nil)
require.NoError(t, err)
defer rawIter.Close()

raw := rawIter.(*singleLevelIterator)

it := newPrefixReplacingIterator(raw, tc.from, tc.to, DefaultComparer.Compare)
var it Iterator
if r.syntheticPrefix != nil {
it = raw
} else {
it = newPrefixReplacingIterator(raw, tc.from, tc.to, DefaultComparer.Compare)
}

kMin, kMax, k := []byte{0}, []byte("~"), func(i uint64) []byte {
return binary.BigEndian.AppendUint64(tc.to[:len(tc.to):len(tc.to)], i)
Expand Down
Loading

0 comments on commit d6f9dd4

Please sign in to comment.