Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sstable: add suffix replacement block transform #1314

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 8 additions & 4 deletions internal/base/internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,10 @@ const (
// necessary because sstable boundaries are inclusive, while the end key of a
// range deletion tombstone is exclusive.
InternalKeyRangeDeleteSentinel = (InternalKeySeqNumMax << 8) | uint64(InternalKeyKindRangeDelete)

// InternalKeyTrailerLength is the length in bytes of the internal key
// trailer encoding the key kind and sequence number.
InternalKeyTrailerLength = 8
)

var internalKeyKindNames = []string{
Expand Down Expand Up @@ -179,7 +183,7 @@ func ParseInternalKey(s string) InternalKey {

// DecodeInternalKey decodes an encoded internal key. See InternalKey.Encode().
func DecodeInternalKey(encodedKey []byte) InternalKey {
n := len(encodedKey) - 8
n := len(encodedKey) - InternalKeyTrailerLength
var trailer uint64
if n >= 0 {
trailer = binary.LittleEndian.Uint64(encodedKey[n:])
Expand Down Expand Up @@ -220,8 +224,8 @@ func (k InternalKey) Encode(buf []byte) {
}

// EncodeTrailer returns the trailer encoded to an 8-byte array.
func (k InternalKey) EncodeTrailer() [8]byte {
var buf [8]byte
func (k InternalKey) EncodeTrailer() [InternalKeyTrailerLength]byte {
var buf [InternalKeyTrailerLength]byte
binary.LittleEndian.PutUint64(buf[:], k.Trailer)
return buf
}
Expand Down Expand Up @@ -265,7 +269,7 @@ func (k InternalKey) Successor(cmp Compare, succ Successor, buf []byte) Internal

// Size returns the encoded size of the key.
func (k InternalKey) Size() int {
return len(k.UserKey) + 8
return len(k.UserKey) + InternalKeyTrailerLength
}

// SetSeqNum sets the sequence number component of the key.
Expand Down
31 changes: 21 additions & 10 deletions sstable/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ func uvarintLen(v uint32) int {
}

type blockWriter struct {
// split, if set, will prevent sharing any portion of the key
// after the split point.
split Split
restartInterval int
nEntries int
nextRestart int
Expand All @@ -39,13 +42,19 @@ func (w *blockWriter) store(keySize int, value []byte) {
w.nextRestart = w.nEntries + w.restartInterval
w.restarts = append(w.restarts, uint32(len(w.buf)))
} else {
// TODO(peter): Manually inlined version of base.SharedPrefixLen(). This
// is 3% faster on BenchmarkWriter on go1.16. Remove if future versions
// show this to not be a performance win. For now, functions that use of
// unsafe cannot be inlined.
n := len(w.curKey)
if n > len(w.prevKey) {
n = len(w.prevKey)
// This is adapted from base.SharedPrefixLen. Unlike SharedPrefixLen,
// this version prevents the sharing of any suffix indicated by Split.
var n int
if w.split != nil {
n = w.split(w.curKey)
if p := w.split(w.prevKey); n > p {
n = p
}
} else {
n = len(w.curKey)
if n > len(w.prevKey) {
n = len(w.prevKey)
}
}
asUint64 := func(b []byte, i int) uint64 {
return binary.LittleEndian.Uint64(b[i:])
Expand Down Expand Up @@ -225,6 +234,8 @@ type blockIter struct {
key []byte
// fullKey is a buffer used for key prefix decompression.
fullKey []byte
// unsharedKey is the unshared suffix of key.
unsharedKey []byte
// val contains the value the iterator is currently pointed at. If non-nil,
// this points to a slice of the block data.
val []byte
Expand Down Expand Up @@ -392,14 +403,14 @@ func (i *blockIter) readEntry() {
ptr = unsafe.Pointer(uintptr(ptr) + 5)
}

unsharedKey := getBytes(ptr, int(unshared))
i.fullKey = append(i.fullKey[:shared], unsharedKey...)
i.unsharedKey = getBytes(ptr, int(unshared))
i.fullKey = append(i.fullKey[:shared], i.unsharedKey...)
if shared == 0 {
// Provide stability for the key across positioning calls if the key
// doesn't share a prefix with the previous key. This removes requiring the
// key to be copied if the caller knows the block has a restart interval of
// 1. An important example of this is range-del blocks.
i.key = unsharedKey
i.key = i.unsharedKey
} else {
i.key = i.fullKey
}
Expand Down
60 changes: 44 additions & 16 deletions sstable/block_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,22 +24,50 @@ func TestBlockWriter(t *testing.T) {
return InternalKey{UserKey: []byte(s)}
}

w := &rawBlockWriter{
blockWriter: blockWriter{restartInterval: 16},
}
w.add(ikey("apple"), nil)
w.add(ikey("apricot"), nil)
w.add(ikey("banana"), nil)
block := w.finish()

expected := []byte(
"\x00\x05\x00apple" +
"\x02\x05\x00ricot" +
"\x00\x06\x00banana" +
"\x00\x00\x00\x00\x01\x00\x00\x00")
if !bytes.Equal(expected, block) {
t.Fatalf("expected\n%q\nfound\n%q", expected, block)
}
t.Run("simple", func(t *testing.T) {
w := &rawBlockWriter{
blockWriter: blockWriter{restartInterval: 16},
}
w.add(ikey("apple"), nil)
w.add(ikey("apricot"), nil)
w.add(ikey("banana"), nil)
block := w.finish()

expected := []byte(
"\x00\x05\x00apple" +
"\x02\x05\x00ricot" +
"\x00\x06\x00banana" +
"\x00\x00\x00\x00\x01\x00\x00\x00")
if !bytes.Equal(expected, block) {
t.Fatalf("expected\n%q\nfound\n%q", expected, block)
}
})
t.Run("split-prevents-key-sharing", func(t *testing.T) {
w := &rawBlockWriter{
blockWriter: blockWriter{
restartInterval: 16,
split: func(k []byte) int {
i := bytes.IndexByte(k, ' ')
if i < 0 {
return len(k)
}
return i
},
},
}
w.add(ikey("hello world"), nil)
w.add(ikey("hello worms"), nil)
w.add(ikey("hello worts"), nil)
block := w.finish()
expected := []byte(
"\x00\x0b\x00hello world" +
"\x05\x06\x00 worms" +
"\x05\x06\x00 worts" +
"\x00\x00\x00\x00\x01\x00\x00\x00")
if !bytes.Equal(expected, block) {
t.Fatalf("expected\n%q\nfound\n%q", expected, block)
}
})
}

func TestInvalidInternalKeyDecoding(t *testing.T) {
Expand Down
25 changes: 25 additions & 0 deletions sstable/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,31 @@ type WriterOptions struct {
// with the value stored in the sstable when it was written.
MergerName string

// SuffixPlaceholder enables key suffix replacement. When
// SuffixPlaceholder is set, Comparer.Split must be set as well.
// Suffix replacement only occurs within the suffix after the index
// returned by Split. Additionally, a suffix placeholder prevents
// key sharing after the index returned by Split. It also causes the
// Writer to ignore the comparer's Successor and Separator
// implementations, using identity functions instead.
//
// The placeholder suffix must produce a valid key. Additionally the
// suffix must be unused and never used as a replacement suffix
// value.
//
// When a SuffixPlaceholder is set, all added keys must contain a a
// suffix exactly equal to the SuffixPlaceholder. That is,
// k[Split(k)] must equal SuffixPlaceholder.
//
// When configured with a SuffixPlaceholder, Writer includes a
// SuffixReplacement table property indicating the placeholder and the
// intent to replace. A SSTable created with a non-empty SuffixPlaceholder
// cannot be read by Reader until modified by ReplaceSuffix to embed a
// replacement suffix.
//
// Range tombstones are not subject to suffix replacement.
SuffixPlaceholder []byte

// TableFormat specifies the format version for writing sstables. The default
// is TableFormatRocksDBv2 which creates RocksDB compatible sstables. Use
// TableFormatLevelDB to create LevelDB compatible sstable which can be used
Expand Down
46 changes: 44 additions & 2 deletions sstable/properties.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (

const propertiesBlockRestartInterval = math.MaxInt32
const propGlobalSeqnumName = "rocksdb.external_sst_file.global_seqno"
const propSuffixReplacementName = "pebble.suffix_replacement"

var propTagMap = make(map[string]reflect.StructField)
var propBoolTrue = []byte{'1'}
Expand All @@ -43,6 +44,12 @@ func init() {
case reflect.Uint32:
case reflect.Uint64:
case reflect.String:
case reflect.Slice:
switch f.Type.Elem().Kind() {
case reflect.Uint8:
default:
panic(fmt.Sprintf("unsupported property field type: %s %s", f.Name, f.Type))
}
default:
panic(fmt.Sprintf("unsupported property field type: %s %s", f.Name, f.Type))
}
Expand Down Expand Up @@ -128,6 +135,18 @@ type Properties struct {
RawKeySize uint64 `prop:"rocksdb.raw.key.size"`
// Total raw value size.
RawValueSize uint64 `prop:"rocksdb.raw.value.size"`
// SuffixReplacement configures sstable readers to apply a block
// transform replacing a configured placeholder key suffix with a
// new suffix of the same length. The property value must be an even
// length, containing first the placeholder suffix followed by the
// replacement suffix.
//
// For example a property value `aaabbb` instructs readers to transform
// blocks, replacing any instances of the key suffix `aaa` with `bbb`.
//
// See WriterOptions.SuffixPlaceholder and ReplaceSuffix for
// additional documentation.
SuffixReplacement []byte `prop:"pebble.suffix_replacement"`
// Size of the top-level index if kTwoLevelIndexSearch is used.
TopLevelIndexSize uint64 `prop:"rocksdb.top-level.index.size"`
// User collected properties.
Expand Down Expand Up @@ -159,8 +178,7 @@ func (p *Properties) String() string {
}

f := v.Field(i)
// TODO(peter): Use f.IsZero() when we can rely on go1.13.
if zero := reflect.Zero(f.Type()); zero.Interface() == f.Interface() {
if f.IsZero() {
// Skip printing of zero values which were not loaded from disk.
if _, ok := p.Loaded[ft.Offset]; !ok {
continue
Expand All @@ -180,6 +198,13 @@ func (p *Properties) String() string {
} else {
fmt.Fprintf(&buf, "%d\n", f.Uint())
}
case reflect.Slice:
switch ft.Type.Elem().Kind() {
case reflect.Uint8:
fmt.Fprintf(&buf, "%x\n", f.Bytes())
default:
panic("not reached")
}
case reflect.String:
fmt.Fprintf(&buf, "%s\n", f.String())
default:
Expand Down Expand Up @@ -222,6 +247,16 @@ func (p *Properties) load(b block, blockOffset uint64) error {
n, _ = binary.Uvarint(i.Value())
}
field.SetUint(n)
case reflect.Slice:
switch f.Type.Elem().Kind() {
case reflect.Uint8:
unsafeValue := i.Value()
b := make([]byte, len(unsafeValue))
copy(b, unsafeValue)
field.SetBytes(b)
default:
panic("not reached")
}
case reflect.String:
field.SetString(intern.Bytes(i.Value()))
default:
Expand Down Expand Up @@ -268,6 +303,10 @@ func (p *Properties) saveString(m map[string][]byte, offset uintptr, value strin
m[propOffsetTagMap[offset]] = []byte(value)
}

func (p *Properties) saveBytes(m map[string][]byte, offset uintptr, value []byte) {
m[propOffsetTagMap[offset]] = value
}

func (p *Properties) save(w *rawBlockWriter) {
m := make(map[string][]byte)
for k, v := range p.UserProperties {
Expand Down Expand Up @@ -328,6 +367,9 @@ func (p *Properties) save(w *rawBlockWriter) {
}
p.saveUvarint(m, unsafe.Offsetof(p.RawKeySize), p.RawKeySize)
p.saveUvarint(m, unsafe.Offsetof(p.RawValueSize), p.RawValueSize)
if len(p.SuffixReplacement) > 0 {
p.saveBytes(m, unsafe.Offsetof(p.SuffixReplacement), p.SuffixReplacement)
}
p.saveBool(m, unsafe.Offsetof(p.WholeKeyFiltering), p.WholeKeyFiltering)

keys := make([]string, 0, len(m))
Expand Down
8 changes: 8 additions & 0 deletions sstable/raw_block.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,14 @@ func (i *rawBlockIter) readEntry() {
i.nextOffset = int32(uintptr(ptr)-uintptr(i.ptr)) + int32(value)
}

func (i *rawBlockIter) valueLocation() (offset, length uint32) {
ptr := unsafe.Pointer(uintptr(i.ptr) + uintptr(i.offset))
_ /*shared*/, ptr = decodeVarint(ptr)
unshared, ptr := decodeVarint(ptr)
value, ptr := decodeVarint(ptr)
return uint32(uintptr(ptr) + uintptr(unshared) - uintptr(i.ptr) - uintptr(i.offset)), value
}

func (i *rawBlockIter) loadEntry() {
i.readEntry()
i.ikey.UserKey = i.key
Expand Down
Loading