Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sstable: add block level synthetic prefix support #3237

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 52 additions & 14 deletions sstable/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package sstable

import (
"bytes"
"context"
"encoding/binary"
"unsafe"
Expand Down Expand Up @@ -404,40 +405,47 @@ type blockIter struct {
cachedBuf []byte
handle bufferHandle
// for block iteration for already loaded blocks.
firstUserKey []byte
lazyValueHandling struct {
firstUserKey []byte
firstUserKeyWithPrefix []byte
lazyValueHandling struct {
vbr *valueBlockReader
hasValuePrefix bool
}
hideObsoletePoints bool
prefix SyntheticPrefix
}

// blockIter implements the base.InternalIterator interface.
var _ base.InternalIterator = (*blockIter)(nil)

// newBlockIter allocates a blockIter and initializes it over block by calling
// init with a globalSeqNum of 0 and hideObsoletePoints set to false. The
// allocated iterator is returned alongside whatever error init reports, so
// callers must check the error before using the iterator.
func newBlockIter(cmp Compare, block block) (*blockIter, error) {
func newBlockIter(cmp Compare, block block, syntheticPrefix SyntheticPrefix) (*blockIter, error) {
i := &blockIter{}
return i, i.init(cmp, block, 0, false)
return i, i.init(cmp, block, 0, false, syntheticPrefix)
}

// String returns the fixed name used to identify this iterator kind.
func (i *blockIter) String() string {
	const name = "block"
	return name
}

func (i *blockIter) init(
cmp Compare, block block, globalSeqNum uint64, hideObsoletePoints bool,
cmp Compare, block block, globalSeqNum uint64, hideObsoletePoints bool, syntheticPrefix SyntheticPrefix,
) error {
numRestarts := int32(binary.LittleEndian.Uint32(block[len(block)-4:]))
if numRestarts == 0 {
return base.CorruptionErrorf("pebble/table: invalid table (block has no restart points)")
}
i.prefix = syntheticPrefix
i.cmp = cmp
i.restarts = int32(len(block)) - 4*(1+numRestarts)
i.numRestarts = numRestarts
i.globalSeqNum = globalSeqNum
i.ptr = unsafe.Pointer(&block[0])
i.data = block
i.fullKey = i.fullKey[:0]
if i.prefix != nil {
i.fullKey = append(i.fullKey[:0], i.prefix...)
} else {
i.fullKey = i.fullKey[:0]
}
i.val = nil
i.hideObsoletePoints = hideObsoletePoints
i.clearCache()
Expand All @@ -457,11 +465,11 @@ func (i *blockIter) init(
// ingested.
// - Foreign sstable iteration: globalSeqNum is always set.
func (i *blockIter) initHandle(
cmp Compare, block bufferHandle, globalSeqNum uint64, hideObsoletePoints bool,
cmp Compare, block bufferHandle, globalSeqNum uint64, hideObsoletePoints bool, syntheticPrefix SyntheticPrefix,
) error {
// Release whatever handle the iterator currently holds before storing the
// new one, then initialize the iterator from the bytes that block.Get()
// returns. The error, if any, comes from init.
i.handle.Release()
i.handle = block
return i.init(cmp, block.Get(), globalSeqNum, hideObsoletePoints)
return i.init(cmp, block.Get(), globalSeqNum, hideObsoletePoints, syntheticPrefix)
}

func (i *blockIter) invalidate() {
Expand All @@ -482,10 +490,11 @@ func (i *blockIter) isDataInvalidated() bool {

// resetForReuse returns a new blockIter value that keeps the backing storage
// of the iterator's reusable buffers, each truncated to length zero, and has
// a nil data slice. Every field not listed in the literal takes its zero
// value. The receiver itself is not modified.
func (i *blockIter) resetForReuse() blockIter {
return blockIter{
fullKey: i.fullKey[:0],
cached: i.cached[:0],
cachedBuf: i.cachedBuf[:0],
data: nil,
fullKey: i.fullKey[:0],
cached: i.cached[:0],
cachedBuf: i.cachedBuf[:0],
firstUserKeyWithPrefix: i.firstUserKeyWithPrefix[:0],
data: nil,
}
}

Expand Down Expand Up @@ -557,6 +566,7 @@ func (i *blockIter) readEntry() {
ptr = unsafe.Pointer(uintptr(ptr) + 5)
}

shared += uint32(len(i.prefix))
unsharedKey := getBytes(ptr, int(unshared))
// TODO(sumeer): move this into the else block below.
i.fullKey = append(i.fullKey[:shared], unsharedKey...)
Expand Down Expand Up @@ -633,6 +643,9 @@ func (i *blockIter) readFirstKey() error {
i.firstUserKey = nil
return base.CorruptionErrorf("pebble/table: invalid firstKey in block")
}
if i.prefix != nil {
i.firstUserKey = append(append(i.firstUserKeyWithPrefix[:0], i.prefix...), i.firstUserKey...)
}
return nil
}

Expand Down Expand Up @@ -693,6 +706,17 @@ func (i *blockIter) SeekGE(key []byte, flags base.SeekGEFlags) (*InternalKey, ba
panic(errors.AssertionFailedf("invalidated blockIter used"))
}

searchKey := key
msbutler marked this conversation as resolved.
Show resolved Hide resolved
if i.prefix != nil {
if !bytes.HasPrefix(key, i.prefix) {
if i.cmp(i.prefix, key) >= 0 {
return i.First()
}
return nil, base.LazyValue{}
}
searchKey = key[len(i.prefix):]
}

i.clearCache()
// Find the index of the smallest restart point whose key is > the key
// sought; index will be numRestarts if there is no such restart point.
Expand Down Expand Up @@ -756,7 +780,7 @@ func (i *blockIter) SeekGE(key []byte, flags base.SeekGEFlags) (*InternalKey, ba
}
// Else k is invalid, and left as nil

if i.cmp(key, k) > 0 {
if i.cmp(searchKey, k) > 0 {
// The search key is greater than the user key at this restart point.
// Search beyond this restart point, since we are trying to find the
// first restart point with a user key >= the search key.
Expand Down Expand Up @@ -833,6 +857,17 @@ func (i *blockIter) SeekLT(key []byte, flags base.SeekLTFlags) (*InternalKey, ba
var index int32

{
searchKey := key
if i.prefix != nil {
if !bytes.HasPrefix(key, i.prefix) {
if i.cmp(i.prefix, key) < 0 {
return i.Last()
}
return nil, base.LazyValue{}
}
searchKey = key[len(i.prefix):]
}

// NB: manually inlined sort.Search is ~5% faster.
//
// Define f(-1) == false and f(n) == true.
Expand Down Expand Up @@ -889,7 +924,7 @@ func (i *blockIter) SeekLT(key []byte, flags base.SeekLTFlags) (*InternalKey, ba
}
// Else k is invalid, and left as nil

if i.cmp(key, k) > 0 {
if i.cmp(searchKey, k) > 0 {
// The search key is greater than the user key at this restart point.
// Search beyond this restart point, since we are trying to find the
// first restart point with a user key >= the search key.
Expand Down Expand Up @@ -1226,6 +1261,9 @@ func (i *blockIter) nextPrefixV3(succKey []byte) (*InternalKey, base.LazyValue)
value = uint32(e)<<28 | uint32(d)<<21 | uint32(c)<<14 | uint32(b)<<7 | uint32(a)
ptr = unsafe.Pointer(uintptr(ptr) + 5)
}
if i.prefix != nil {
shared += uint32(len(i.prefix))
}
// The starting position of the value.
valuePtr := unsafe.Pointer(uintptr(ptr) + uintptr(unshared))
i.nextOffset = int32(uintptr(valuePtr)-uintptr(i.ptr)) + int32(value)
Expand Down
6 changes: 3 additions & 3 deletions sstable/block_property_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -938,7 +938,7 @@ func TestBlockProperties(t *testing.T) {

var blocks []int
var i int
iter, _ := newBlockIter(r.Compare, indexH.Get())
iter, _ := newBlockIter(r.Compare, indexH.Get(), nil)
for key, value := iter.First(); key != nil; key, value = iter.Next() {
bh, err := decodeBlockHandleWithProperties(value.InPlaceValue())
if err != nil {
Expand Down Expand Up @@ -1274,7 +1274,7 @@ func runBlockPropsCmd(r *Reader, td *datadriven.TestData) string {
return err.Error()
}
twoLevelIndex := r.Properties.IndexPartitions > 0
i, err := newBlockIter(r.Compare, bh.Get())
i, err := newBlockIter(r.Compare, bh.Get(), nil)
if err != nil {
return err.Error()
}
Expand Down Expand Up @@ -1322,7 +1322,7 @@ func runBlockPropsCmd(r *Reader, td *datadriven.TestData) string {
return err.Error()
}
if err := subiter.init(
r.Compare, subIndex.Get(), 0 /* globalSeqNum */, false); err != nil {
r.Compare, subIndex.Get(), 0 /* globalSeqNum */, false, r.syntheticPrefix); err != nil {
return err.Error()
}
for key, value := subiter.First(); key != nil; key, value = subiter.Next() {
Expand Down
70 changes: 63 additions & 7 deletions sstable/block_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,7 @@ func TestBlockIter2(t *testing.T) {
return ""

case "iter":
iter, err := newBlockIter(bytes.Compare, block)
iter, err := newBlockIter(bytes.Compare, block, nil)
if err != nil {
return err.Error()
}
Expand Down Expand Up @@ -276,7 +276,7 @@ func TestBlockIterKeyStability(t *testing.T) {
}
block := w.finish()

i, err := newBlockIter(bytes.Compare, block)
i, err := newBlockIter(bytes.Compare, block, nil)
require.NoError(t, err)

// Check that the supplied slice resides within the bounds of the block.
Expand Down Expand Up @@ -336,7 +336,7 @@ func TestBlockIterReverseDirections(t *testing.T) {

for targetPos := 0; targetPos < w.restartInterval; targetPos++ {
t.Run("", func(t *testing.T) {
i, err := newBlockIter(bytes.Compare, block)
i, err := newBlockIter(bytes.Compare, block, nil)
require.NoError(t, err)

pos := 3
Expand All @@ -357,6 +357,62 @@ func TestBlockIterReverseDirections(t *testing.T) {
}
}

// TestBlockSyntheticPrefix verifies that a blockIter configured with a
// synthetic prefix, reading a block whose keys were written without that
// prefix, yields the same keys and values as a plain blockIter reading a block
// whose keys were written with the prefix included. The comparison is repeated
// for several prefixes and restart intervals and covers First, Next, Prev,
// SeekGE and SeekLT.
func TestBlockSyntheticPrefix(t *testing.T) {
	keys := []string{
		"apple", "apricot", "banana",
		"grape", "orange", "peach",
		"pear", "persimmon",
	}

	for _, prefix := range []string{"", "_", "~", "fruits/"} {
		for _, restarts := range []int{1, 2, 3, 4, 10} {
			t.Run(fmt.Sprintf("prefix=%s/restarts=%d", prefix, restarts), func(t *testing.T) {
				// Build two blocks with the same logical contents: one stores
				// the bare keys, the other stores the keys with the prefix
				// physically prepended.
				elidedPrefixWriter := &blockWriter{restartInterval: restarts}
				includedPrefixWriter := &blockWriter{restartInterval: restarts}
				for _, k := range keys {
					elidedPrefixWriter.add(ikey(k), nil)
					includedPrefixWriter.add(ikey(prefix+k), nil)
				}
				elidedPrefixBlock := elidedPrefixWriter.finish()
				includedPrefixBlock := includedPrefixWriter.finish()

				// expect is the reference iterator; got is the iterator under
				// test, which must synthesize the prefix on the fly.
				expect, err := newBlockIter(bytes.Compare, includedPrefixBlock, nil)
				require.NoError(t, err)

				got, err := newBlockIter(bytes.Compare, elidedPrefixBlock, SyntheticPrefix([]byte(prefix)))
				require.NoError(t, err)

				// check captures the reference result and returns a function
				// that compares the result under test against it, which allows
				// the check(expect.Op())(got.Op()) call shape used below.
				check := func(eKey *base.InternalKey, eVal base.LazyValue) func(gKey *base.InternalKey, gVal base.LazyValue) {
					return func(gKey *base.InternalKey, gVal base.LazyValue) {
						t.Helper()
						if eKey == nil {
							t.Logf("[%q] expected nil, got %q", prefix, gKey)
							require.Nil(t, gKey)
							return
						}
						// Assert non-nil before touching gKey.UserKey so that a
						// missing key is reported as a test failure rather than
						// a nil-pointer panic inside the log statement.
						require.NotNil(t, gKey, "[%q] expected %q, got nil", prefix, eKey.UserKey)
						t.Logf("[%q] expected %q, got %q", prefix, eKey.UserKey, gKey.UserKey)
						require.Equal(t, eKey, gKey)
						require.Equal(t, eVal, gVal)
					}
				}

				check(expect.First())(got.First())
				check(expect.Next())(got.Next())
				check(expect.Prev())(got.Prev())

				check(expect.SeekGE([]byte(prefix+"or"), base.SeekGEFlagsNone))(got.SeekGE([]byte(prefix+"or"), base.SeekGEFlagsNone))
				check(expect.SeekGE([]byte(prefix+"peach"), base.SeekGEFlagsNone))(got.SeekGE([]byte(prefix+"peach"), base.SeekGEFlagsNone))
				check(expect.Next())(got.Next())
				check(expect.Next())(got.Next())
				check(expect.Next())(got.Next())

				check(expect.SeekLT([]byte(prefix+"banana"), base.SeekLTFlagsNone))(got.SeekLT([]byte(prefix+"banana"), base.SeekLTFlagsNone))
				check(expect.SeekLT([]byte(prefix+"pomegranate"), base.SeekLTFlagsNone))(got.SeekLT([]byte(prefix+"pomegranate"), base.SeekLTFlagsNone))
				check(expect.SeekLT([]byte(prefix+"apple"), base.SeekLTFlagsNone))(got.SeekLT([]byte(prefix+"apple"), base.SeekLTFlagsNone))
			})
		}
	}
}

func BenchmarkBlockIterSeekGE(b *testing.B) {
const blockSize = 32 << 10

Expand All @@ -376,7 +432,7 @@ func BenchmarkBlockIterSeekGE(b *testing.B) {
w.add(ikey, nil)
}

it, err := newBlockIter(bytes.Compare, w.finish())
it, err := newBlockIter(bytes.Compare, w.finish(), nil)
if err != nil {
b.Fatal(err)
}
Expand Down Expand Up @@ -418,7 +474,7 @@ func BenchmarkBlockIterSeekLT(b *testing.B) {
w.add(ikey, nil)
}

it, err := newBlockIter(bytes.Compare, w.finish())
it, err := newBlockIter(bytes.Compare, w.finish(), nil)
if err != nil {
b.Fatal(err)
}
Expand Down Expand Up @@ -464,7 +520,7 @@ func BenchmarkBlockIterNext(b *testing.B) {
w.add(ikey, nil)
}

it, err := newBlockIter(bytes.Compare, w.finish())
it, err := newBlockIter(bytes.Compare, w.finish(), nil)
if err != nil {
b.Fatal(err)
}
Expand Down Expand Up @@ -496,7 +552,7 @@ func BenchmarkBlockIterPrev(b *testing.B) {
w.add(ikey, nil)
}

it, err := newBlockIter(bytes.Compare, w.finish())
it, err := newBlockIter(bytes.Compare, w.finish(), nil)
if err != nil {
b.Fatal(err)
}
Expand Down
29 changes: 15 additions & 14 deletions sstable/layout.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,18 +24,19 @@ type Layout struct {
// ValidateBlockChecksums, which validates a static list of BlockHandles
// referenced in this struct.

Data []BlockHandleWithProperties
Index []BlockHandle
TopIndex BlockHandle
Filter BlockHandle
RangeDel BlockHandle
RangeKey BlockHandle
ValueBlock []BlockHandle
ValueIndex BlockHandle
Properties BlockHandle
MetaIndex BlockHandle
Footer BlockHandle
Format TableFormat
Data []BlockHandleWithProperties
Index []BlockHandle
TopIndex BlockHandle
Filter BlockHandle
RangeDel BlockHandle
RangeKey BlockHandle
ValueBlock []BlockHandle
ValueIndex BlockHandle
Properties BlockHandle
MetaIndex BlockHandle
Footer BlockHandle
Format TableFormat
SyntheticPrefix SyntheticPrefix
}

// Describe returns a description of the layout. If the verbose parameter is
Expand Down Expand Up @@ -186,7 +187,7 @@ func (l *Layout) Describe(
var lastKey InternalKey
switch b.name {
case "data", "range-del", "range-key":
iter, _ := newBlockIter(r.Compare, h.Get())
iter, _ := newBlockIter(r.Compare, h.Get(), l.SyntheticPrefix)
for key, value := iter.First(); key != nil; key, value = iter.Next() {
ptr := unsafe.Pointer(uintptr(iter.ptr) + uintptr(iter.offset))
shared, ptr := decodeVarint(ptr)
Expand Down Expand Up @@ -238,7 +239,7 @@ func (l *Layout) Describe(
formatRestarts(iter.data, iter.restarts, iter.numRestarts)
formatTrailer()
case "index", "top-index":
iter, _ := newBlockIter(r.Compare, h.Get())
iter, _ := newBlockIter(r.Compare, h.Get(), l.SyntheticPrefix)
for key, value := iter.First(); key != nil; key, value = iter.Next() {
bh, err := decodeBlockHandleWithProperties(value.InPlaceValue())
if err != nil {
Expand Down
4 changes: 3 additions & 1 deletion sstable/prefix_replacing_iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,9 @@ func (p *prefixReplacingIterator) SeekLT(
p.i.First()
return p.rewriteResult(p.i.Prev())
}
return p.rewriteResult(p.i.SeekLT(p.rewriteArg(key), flags))
key = p.rewriteArg(key)
resKey, resResult := p.i.SeekLT(key, flags)
return p.rewriteResult(resKey, resResult)
}

// First implements the Iterator interface.
Expand Down
Loading
Loading