Merge pull request #146 from petermattis/pmattis/range-del-v1-convert
Convert v1 to v2 range-del blocks on the fly
petermattis authored May 28, 2019
2 parents b604d7f + 13d77b0 commit ce8e5ee
Showing 7 changed files with 265 additions and 75 deletions.
24 changes: 24 additions & 0 deletions internal/rangedel/fragmenter.go
@@ -11,6 +11,19 @@ import (
"github.com/petermattis/pebble/internal/base"
)

type tombstonesByStartKey struct {
cmp base.Compare
buf []Tombstone
}

func (v *tombstonesByStartKey) Len() int { return len(v.buf) }
func (v *tombstonesByStartKey) Less(i, j int) bool {
return base.InternalCompare(v.cmp, v.buf[i].Start, v.buf[j].Start) < 0
}
func (v *tombstonesByStartKey) Swap(i, j int) {
v.buf[i], v.buf[j] = v.buf[j], v.buf[i]
}

type tombstonesByEndKey struct {
cmp base.Compare
buf []Tombstone
@@ -34,6 +47,17 @@ func (v *tombstonesBySeqNum) Swap(i, j int) {
(*v)[i], (*v)[j] = (*v)[j], (*v)[i]
}

// Sort the tombstones by start key. This is the ordering required by the
// Fragmenter. Usually tombstones are naturally sorted by their start key, but
// that isn't true for tombstones in the legacy range-del-v1 block format.
func Sort(cmp base.Compare, tombstones []Tombstone) {
sorter := tombstonesByStartKey{
cmp: cmp,
buf: tombstones,
}
sort.Sort(&sorter)
}

// Fragmenter fragments a set of range tombstones such that overlapping
// tombstones are split at their overlap points. The fragmented tombstones are
// output to the supplied Output function.
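As context for the new Sort helper, here is a minimal sketch (not part of this commit) of sorting out-of-order tombstones and feeding them to the existing Fragmenter, mirroring what the reader-side conversion below does. The tiny tombstones and the printing Emit callback are illustrative assumptions; real tombstones carry sequence numbers in their start keys, and the internal packages are only importable from within the pebble module.

package main

import (
	"bytes"
	"fmt"

	"github.com/petermattis/pebble/internal/base"
	"github.com/petermattis/pebble/internal/rangedel"
)

func main() {
	cmp := base.Compare(bytes.Compare)

	// v1-format blocks may hold tombstones out of start-key order, e.g. [c,e)
	// appearing before [a,d).
	tombstones := []rangedel.Tombstone{
		{Start: base.InternalKey{UserKey: []byte("c")}, End: []byte("e")},
		{Start: base.InternalKey{UserKey: []byte("a")}, End: []byte("d")},
	}

	// Sort restores the start-key ordering the Fragmenter requires.
	rangedel.Sort(cmp, tombstones)

	// Fragment the overlapping tombstones; Emit receives non-overlapping
	// fragments split at the overlap points.
	frag := rangedel.Fragmenter{
		Cmp: cmp,
		Emit: func(fragmented []rangedel.Tombstone) {
			for i := range fragmented {
				t := &fragmented[i]
				fmt.Printf("[%s,%s)\n", t.Start.UserKey, t.End)
			}
		},
	}
	for i := range tombstones {
		frag.Add(tombstones[i].Start, tombstones[i].End)
	}
	frag.Finish()
}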
19 changes: 15 additions & 4 deletions sstable/block.go
@@ -112,6 +112,7 @@ type blockIter struct {
ptr unsafe.Pointer
data []byte
key, val []byte
fullKey []byte
keyBuf [256]byte
ikey InternalKey
cached []blockEntry
@@ -135,10 +136,10 @@ func (i *blockIter) init(cmp Compare, block block, globalSeqNum uint64) error {
i.globalSeqNum = globalSeqNum
i.ptr = unsafe.Pointer(&block[0])
i.data = block
if i.key == nil {
i.key = i.keyBuf[:0]
if i.fullKey == nil {
i.fullKey = i.keyBuf[:0]
} else {
i.key = i.key[:0]
i.fullKey = i.fullKey[:0]
}
i.val = nil
i.clearCache()
@@ -216,7 +217,17 @@ func (i *blockIter) readEntry() {
ptr = unsafe.Pointer(uintptr(ptr) + 5)
}

i.key = append(i.key[:shared], getBytes(ptr, int(unshared))...)
unsharedKey := getBytes(ptr, int(unshared))
i.fullKey = append(i.fullKey[:shared], unsharedKey...)
if shared == 0 {
// Provide stability for the key across positioning calls if the key
// doesn't share a prefix with the previous key. This removes the need to
// copy the key when the caller knows the block has a restart interval of 1.
// An important example of this is range-del blocks.
i.key = unsharedKey
} else {
i.key = i.fullKey
}
ptr = unsafe.Pointer(uintptr(ptr) + uintptr(unshared))
i.val = getBytes(ptr, int(value))
i.nextOffset = int(uintptr(ptr)-uintptr(i.ptr)) + int(value)
29 changes: 29 additions & 0 deletions sstable/block_test.go
@@ -14,6 +14,7 @@ import (

"github.com/petermattis/pebble/internal/base"
"github.com/petermattis/pebble/internal/datadriven"
"github.com/stretchr/testify/require"
"golang.org/x/exp/rand"
)

@@ -204,6 +205,34 @@ func TestBlockIter2(t *testing.T) {
}
}

func TestBlockIterKeyStability(t *testing.T) {
w := &blockWriter{restartInterval: 1}
expected := [][]byte{
[]byte("apple"),
[]byte("apricot"),
[]byte("banana"),
}
for i := range expected {
w.add(InternalKey{UserKey: expected[i]}, nil)
}
block := w.finish()

i, err := newBlockIter(bytes.Compare, block)
if err != nil {
t.Fatal(err)
}
// Loop over the block entries, storing each key slice.
var keys [][]byte
for key, _ := i.First(); key != nil; key, _ = i.Next() {
keys = append(keys, key.UserKey)
}

// Check that the slices match our expected values. Note that this is only
// guaranteed because a restart interval of 1 was used, so no prefix
// compression was performed.
require.EqualValues(t, expected, keys)
}

func BenchmarkBlockIterSeekGE(b *testing.B) {
const blockSize = 32 << 10

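A hedged companion sketch to the test above (not part of this commit, test name hypothetical): when the restart interval is greater than 1, prefix compression means a returned key may alias the iterator's shared fullKey buffer, so a caller that retains keys across positioning calls should copy them. The sketch assumes it lives alongside block_test.go in the sstable package, which already imports bytes, testing, and require.

func TestBlockIterKeyCopy(t *testing.T) {
	// Same setup as TestBlockIterKeyStability, but with a restart interval
	// large enough that "apricot" is prefix-compressed against "apple".
	w := &blockWriter{restartInterval: 16}
	input := [][]byte{[]byte("apple"), []byte("apricot"), []byte("banana")}
	for _, k := range input {
		w.add(InternalKey{UserKey: k}, nil)
	}

	i, err := newBlockIter(bytes.Compare, w.finish())
	if err != nil {
		t.Fatal(err)
	}

	// Because a returned key may share the iterator's internal buffer, copy
	// each key before retaining it.
	var keys [][]byte
	for key, _ := i.First(); key != nil; key, _ = i.Next() {
		keys = append(keys, append([]byte(nil), key.UserKey...))
	}
	require.EqualValues(t, input, keys)
}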
148 changes: 90 additions & 58 deletions sstable/reader.go
@@ -15,6 +15,7 @@ import (
"github.com/petermattis/pebble/cache"
"github.com/petermattis/pebble/internal/base"
"github.com/petermattis/pebble/internal/crc"
"github.com/petermattis/pebble/internal/rangedel"
"github.com/petermattis/pebble/vfs"
)

@@ -137,7 +138,7 @@ func (i *Iterator) loadBlock() bool {
i.err = errors.New("pebble/table: corrupt index entry")
return false
}
block, _, err := i.reader.readBlock(h)
block, _, err := i.reader.readBlock(h, nil /* transform */)
if err != nil {
i.err = err
return false
@@ -166,7 +167,7 @@ func (i *Iterator) seekBlock(key []byte) bool {
i.err = errors.New("pebble/table: corrupt index entry")
return false
}
block, _, err := i.reader.readBlock(h)
block, _, err := i.reader.readBlock(h, nil /* transform */)
if err != nil {
i.err = err
return false
@@ -472,21 +473,23 @@ type weakCachedBlock struct {
handle cache.WeakHandle
}

type blockTransform func([]byte) ([]byte, error)

// Reader is a table reader.
type Reader struct {
file vfs.File
fileNum uint64
err error
index weakCachedBlock
filter weakCachedBlock
rangeDel weakCachedBlock
rangeDelV2 bool
opts *Options
cache *cache.Cache
compare Compare
split Split
tableFilter *tableFilterReader
Properties Properties
file vfs.File
fileNum uint64
err error
index weakCachedBlock
filter weakCachedBlock
rangeDel weakCachedBlock
rangeDelTransform blockTransform
opts *Options
cache *cache.Cache
compare Compare
split Split
tableFilter *tableFilterReader
Properties Properties
}

// Close implements DB.Close, as documented in the pebble package.
@@ -580,46 +583,20 @@ func (r *Reader) NewRangeDelIter() *blockIter {
}

func (r *Reader) readIndex() (block, error) {
return r.readWeakCachedBlock(&r.index)
return r.readWeakCachedBlock(&r.index, nil /* transform */)
}

func (r *Reader) readFilter() (block, error) {
return r.readWeakCachedBlock(&r.filter)
return r.readWeakCachedBlock(&r.filter, nil /* transform */)
}

func (r *Reader) readRangeDel() (block, error) {
// Fast-path for retrieving the block from a weak cache handle.
r.rangeDel.mu.RLock()
var b []byte
if r.rangeDel.handle != nil {
b = r.rangeDel.handle.Get()
}
r.rangeDel.mu.RUnlock()
if b != nil {
return b, nil
}

// Slow-path: read the index block from disk. This checks the cache again,
// but that is ok because somebody else might have inserted it for us.
b, h, err := r.readBlock(r.rangeDel.bh)
if err == nil && h != nil {
if !r.rangeDelV2 {
// TODO(peter): if we have a v1 range-del block, convert it on the fly
// and cache the converted version. We just need to create a
// rangedel.Fragmenter and loop over the v1 block and add all of the
// contents. Note that the contents of the v1 block may not be sorted, so
// we'll have to sort them first. We also need to truncate the v1
// tombstones to the sstable boundaries.
}

r.rangeDel.mu.Lock()
r.rangeDel.handle = h
r.rangeDel.mu.Unlock()
}
return b, err
return r.readWeakCachedBlock(&r.rangeDel, r.rangeDelTransform)
}

func (r *Reader) readWeakCachedBlock(w *weakCachedBlock) (block, error) {
func (r *Reader) readWeakCachedBlock(
w *weakCachedBlock, transform blockTransform,
) (block, error) {
// Fast-path for retrieving the block from a weak cache handle.
w.mu.RLock()
var b []byte
@@ -633,7 +610,7 @@ func (r *Reader) readWeakCachedBlock(w *weakCachedBlock) (block, error) {

// Slow-path: read the index block from disk. This checks the cache again,
// but that is ok because somebody else might have inserted it for us.
b, h, err := r.readBlock(w.bh)
b, h, err := r.readBlock(w.bh, transform)
if err == nil && h != nil {
w.mu.Lock()
w.handle = h
Expand All @@ -643,7 +620,9 @@ func (r *Reader) readWeakCachedBlock(w *weakCachedBlock) (block, error) {
}

// readBlock reads and decompresses a block from disk into memory.
func (r *Reader) readBlock(bh blockHandle) (block, cache.WeakHandle, error) {
func (r *Reader) readBlock(
bh blockHandle, transform blockTransform,
) (block, cache.WeakHandle, error) {
if b := r.cache.Get(r.fileNum, bh.offset); b != nil {
return b, nil, nil
}
@@ -652,29 +631,82 @@ func (r *Reader) readBlock(bh blockHandle) (block, cache.WeakHandle, error) {
if _, err := r.file.ReadAt(b, int64(bh.offset)); err != nil {
return nil, nil, err
}

checksum0 := binary.LittleEndian.Uint32(b[bh.length+1:])
checksum1 := crc.New(b[:bh.length+1]).Value()
if checksum0 != checksum1 {
return nil, nil, errors.New("pebble/table: invalid table (checksum mismatch)")
}

switch b[bh.length] {
case noCompressionBlockType:
b = b[:bh.length]
h := r.cache.Set(r.fileNum, bh.offset, b)
return b, h, nil
case snappyCompressionBlockType:
b, err := snappy.Decode(nil, b[:bh.length])
var err error
b, err = snappy.Decode(nil, b[:bh.length])
if err != nil {
return nil, nil, err
}
h := r.cache.Set(r.fileNum, bh.offset, b)
return b, h, nil
default:
return nil, nil, fmt.Errorf("pebble/table: unknown block compression: %d", b[bh.length])
}
return nil, nil, fmt.Errorf("pebble/table: unknown block compression: %d", b[bh.length])

if transform != nil {
var err error
b, err = transform(b)
if err != nil {
return nil, nil, err
}
}

h := r.cache.Set(r.fileNum, bh.offset, b)
return b, h, nil
}

func (r *Reader) transformRangeDelV1(b []byte) ([]byte, error) {
// Convert v1 (RocksDB format) range-del blocks to v2 blocks on the fly. The
// v1 format range-del blocks have unfragmented and unsorted range
// tombstones. We need properly fragmented and sorted range tombstones in
// order to serve from them directly.
iter := &blockIter{}
if err := iter.init(r.compare, b, r.Properties.GlobalSeqNum); err != nil {
return nil, err
}
var tombstones []rangedel.Tombstone
for key, value := iter.First(); key != nil; key, value = iter.Next() {
t := rangedel.Tombstone{
Start: *key,
End: value,
}
tombstones = append(tombstones, t)
}
rangedel.Sort(r.compare, tombstones)

// Fragment the tombstones, outputting them directly to a block writer.
rangeDelBlock := blockWriter{
restartInterval: 1,
}
frag := rangedel.Fragmenter{
Cmp: r.compare,
Emit: func(fragmented []rangedel.Tombstone) {
for i := range fragmented {
t := &fragmented[i]
rangeDelBlock.add(t.Start, t.End)
}
},
}
for i := range tombstones {
t := &tombstones[i]
frag.Add(t.Start, t.End)
}
frag.Finish()

// Return the contents of the constructed v2 format range-del block.
return rangeDelBlock.finish(), nil
}

func (r *Reader) readMetaindex(metaindexBH blockHandle, o *Options) error {
b, _, err := r.readBlock(metaindexBH)
b, _, err := r.readBlock(metaindexBH, nil /* transform */)
if err != nil {
return err
}
@@ -696,7 +728,7 @@ func (r *Reader) readMetaindex(metaindexBH blockHandle, o *Options) error {
}

if bh, ok := meta[metaPropertiesName]; ok {
b, _, err = r.readBlock(bh)
b, _, err = r.readBlock(bh, nil /* transform */)
if err != nil {
return err
}
@@ -707,9 +739,9 @@ func (r *Reader) readMetaindex(metaindexBH blockHandle, o *Options) error {

if bh, ok := meta[metaRangeDelV2Name]; ok {
r.rangeDel.bh = bh
r.rangeDelV2 = true
} else if bh, ok := meta[metaRangeDelName]; ok {
r.rangeDel.bh = bh
r.rangeDelTransform = r.transformRangeDelV1
}

for level := range r.opts.Levels {
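To illustrate what the transform buys callers (a sketch, not part of this commit): whether the sstable stored a v1 or a v2 range-del block, NewRangeDelIter now surfaces sorted, fragmented tombstones. The sketch assumes it lives in the sstable package with fmt imported, assumes r is an already-opened *Reader (construction elided), and assumes a nil iterator means the table has no range-del block — that last point is not verified from this diff.

func printRangeDels(r *Reader) {
	iter := r.NewRangeDelIter()
	if iter == nil {
		return
	}
	for key, value := iter.First(); key != nil; key, value = iter.Next() {
		// key is the fragmented tombstone's start key; value is the exclusive
		// end user key. v1 blocks are converted transparently before we get here.
		fmt.Printf("tombstone [%s,%s)\n", key.UserKey, value)
	}
}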