Skip to content

Commit

Permalink
sstable: add suffix replacement block transform
Browse files Browse the repository at this point in the history
In CockroachDB, there exist processes that build and ingest sstables.
These sstables have timestamp-suffixed MVCC keys. Today, these keys'
timestamps are dated in the past and rewrite history. This rewriting
violates invariants in parts of the system. We would like to support
ingesting these sstables with recent, invariant-maintaing MVCC
timestamps. However, ingestion is used during bulk operations, and
rewriting large sstables' keys with a recent MVCC timestamp is
infeasibly expensive.

This change introduces a facility for constructing an sstable with a
placeholder suffix. When using this facility, a caller specifies a
SuffixPlaceholder write option. The caller is also required to configure
a Comparer that contains a non-nil Split function. When configured with
a suffix placeholder, the sstable writer requires that all keys'
suffixes (as determined by Split) exactly match the provided
SuffixPlaceholder. An sstable constructed in this fashion is still
incomplete and unable to be read unless explicitly permitted through the
AllowUnreplacedSuffix option.

When a caller would like to complete an sstable constructed with a
suffix placeholder, they may call ReplaceSuffix providing the
original placeholder value and the replacement value. The placeholder
and replacement values are required to be equal lengths. ReplaceSuffix
performs an O(1) write to record the replacement value.

After a suffix replacement the resulting sstable is complete, and
sstable readers may read the sstable. Readers detect the sstable
property and apply a block transform to replace suffix placeholders with
the replacement value on the fly as blocks are loaded.

Informs cockroachdb/cockroach#70422.
  • Loading branch information
jbowens committed Oct 13, 2021
1 parent cdbcfe9 commit 52acaa1
Show file tree
Hide file tree
Showing 12 changed files with 573 additions and 89 deletions.
12 changes: 8 additions & 4 deletions internal/base/internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,10 @@ const (
// necessary because sstable boundaries are inclusive, while the end key of a
// range deletion tombstone is exclusive.
InternalKeyRangeDeleteSentinel = (InternalKeySeqNumMax << 8) | uint64(InternalKeyKindRangeDelete)

// InternalKeyTrailerLength is the length in bytes of the internal key
// trailer encoding the key kind and sequence number.
InternalKeyTrailerLength = 8
)

var internalKeyKindNames = []string{
Expand Down Expand Up @@ -179,7 +183,7 @@ func ParseInternalKey(s string) InternalKey {

// DecodeInternalKey decodes an encoded internal key. See InternalKey.Encode().
func DecodeInternalKey(encodedKey []byte) InternalKey {
n := len(encodedKey) - 8
n := len(encodedKey) - InternalKeyTrailerLength
var trailer uint64
if n >= 0 {
trailer = binary.LittleEndian.Uint64(encodedKey[n:])
Expand Down Expand Up @@ -220,8 +224,8 @@ func (k InternalKey) Encode(buf []byte) {
}

// EncodeTrailer returns the trailer encoded to an 8-byte array.
func (k InternalKey) EncodeTrailer() [8]byte {
var buf [8]byte
func (k InternalKey) EncodeTrailer() [InternalKeyTrailerLength]byte {
var buf [InternalKeyTrailerLength]byte
binary.LittleEndian.PutUint64(buf[:], k.Trailer)
return buf
}
Expand Down Expand Up @@ -265,7 +269,7 @@ func (k InternalKey) Successor(cmp Compare, succ Successor, buf []byte) Internal

// Size returns the encoded size of the key.
func (k InternalKey) Size() int {
return len(k.UserKey) + 8
return len(k.UserKey) + InternalKeyTrailerLength
}

// SetSeqNum sets the sequence number component of the key.
Expand Down
31 changes: 21 additions & 10 deletions sstable/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ func uvarintLen(v uint32) int {
}

type blockWriter struct {
// split, if set, will prevent sharing any portion of the key
// after the split point.
split Split
restartInterval int
nEntries int
nextRestart int
Expand All @@ -39,13 +42,19 @@ func (w *blockWriter) store(keySize int, value []byte) {
w.nextRestart = w.nEntries + w.restartInterval
w.restarts = append(w.restarts, uint32(len(w.buf)))
} else {
// TODO(peter): Manually inlined version of base.SharedPrefixLen(). This
// is 3% faster on BenchmarkWriter on go1.16. Remove if future versions
// show this to not be a performance win. For now, functions that use of
// unsafe cannot be inlined.
n := len(w.curKey)
if n > len(w.prevKey) {
n = len(w.prevKey)
// This is adapted from base.SharedPrefixLen. Unlike SharedPrefixLen,
// this version prevents the sharing of any suffix indicated by Split.
var n int
if w.split != nil {
n = w.split(w.curKey)
if p := w.split(w.prevKey); n > p {
n = p
}
} else {
n = len(w.curKey)
if n > len(w.prevKey) {
n = len(w.prevKey)
}
}
asUint64 := func(b []byte, i int) uint64 {
return binary.LittleEndian.Uint64(b[i:])
Expand Down Expand Up @@ -225,6 +234,8 @@ type blockIter struct {
key []byte
// fullKey is a buffer used for key prefix decompression.
fullKey []byte
// unsharedKey is the unshared suffix of key.
unsharedKey []byte
// val contains the value the iterator is currently pointed at. If non-nil,
// this points to a slice of the block data.
val []byte
Expand Down Expand Up @@ -392,14 +403,14 @@ func (i *blockIter) readEntry() {
ptr = unsafe.Pointer(uintptr(ptr) + 5)
}

unsharedKey := getBytes(ptr, int(unshared))
i.fullKey = append(i.fullKey[:shared], unsharedKey...)
i.unsharedKey = getBytes(ptr, int(unshared))
i.fullKey = append(i.fullKey[:shared], i.unsharedKey...)
if shared == 0 {
// Provide stability for the key across positioning calls if the key
// doesn't share a prefix with the previous key. This removes requiring the
// key to be copied if the caller knows the block has a restart interval of
// 1. An important example of this is range-del blocks.
i.key = unsharedKey
i.key = i.unsharedKey
} else {
i.key = i.fullKey
}
Expand Down
60 changes: 44 additions & 16 deletions sstable/block_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,22 +24,50 @@ func TestBlockWriter(t *testing.T) {
return InternalKey{UserKey: []byte(s)}
}

w := &rawBlockWriter{
blockWriter: blockWriter{restartInterval: 16},
}
w.add(ikey("apple"), nil)
w.add(ikey("apricot"), nil)
w.add(ikey("banana"), nil)
block := w.finish()

expected := []byte(
"\x00\x05\x00apple" +
"\x02\x05\x00ricot" +
"\x00\x06\x00banana" +
"\x00\x00\x00\x00\x01\x00\x00\x00")
if !bytes.Equal(expected, block) {
t.Fatalf("expected\n%q\nfound\n%q", expected, block)
}
t.Run("simple", func(t *testing.T) {
w := &rawBlockWriter{
blockWriter: blockWriter{restartInterval: 16},
}
w.add(ikey("apple"), nil)
w.add(ikey("apricot"), nil)
w.add(ikey("banana"), nil)
block := w.finish()

expected := []byte(
"\x00\x05\x00apple" +
"\x02\x05\x00ricot" +
"\x00\x06\x00banana" +
"\x00\x00\x00\x00\x01\x00\x00\x00")
if !bytes.Equal(expected, block) {
t.Fatalf("expected\n%q\nfound\n%q", expected, block)
}
})
t.Run("split-prevents-key-sharing", func(t *testing.T) {
w := &rawBlockWriter{
blockWriter: blockWriter{
restartInterval: 16,
split: func(k []byte) int {
i := bytes.IndexByte(k, ' ')
if i < 0 {
return len(k)
}
return i
},
},
}
w.add(ikey("hello world"), nil)
w.add(ikey("hello worms"), nil)
w.add(ikey("hello worts"), nil)
block := w.finish()
expected := []byte(
"\x00\x0b\x00hello world" +
"\x05\x06\x00 worms" +
"\x05\x06\x00 worts" +
"\x00\x00\x00\x00\x01\x00\x00\x00")
if !bytes.Equal(expected, block) {
t.Fatalf("expected\n%q\nfound\n%q", expected, block)
}
})
}

func TestInvalidInternalKeyDecoding(t *testing.T) {
Expand Down
20 changes: 20 additions & 0 deletions sstable/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,26 @@ type WriterOptions struct {
// with the value stored in the sstable when it was written.
MergerName string

// SuffixPlaceholder enables key suffix replacement. When
// SuffixPlaceholder is set, Comparer.Split must be set as well.
// Suffix replacement only occurs within the suffix after the index
// returned by Split. Additionally, a suffix placeholder prevents
// key sharing after the index returned by Split.
//
// When a SuffixPlaceholder is set, all added keys that contain a
// prefix must have a suffix exactly equal to the SuffixPlaceholder.
// That is, if Split(k) < len(k), then k[Split(k):] must equal
// SuffixPlaceholder.
//
// When configured with a SuffixPlaceholder, Writer includes a
// SuffixReplacement table property indicating the placeholder and the
// intent to replace. A SSTable created with a non-empty SuffixPlaceholder
// cannot be read by Reader until modified by ReplaceSuffix to embed a
// replacement suffix.
//
// Range tombstones are not subject to suffix replacement.
SuffixPlaceholder []byte

// TableFormat specifies the format version for writing sstables. The default
// is TableFormatRocksDBv2 which creates RocksDB compatible sstables. Use
// TableFormatLevelDB to create LevelDB compatible sstable which can be used
Expand Down
46 changes: 44 additions & 2 deletions sstable/properties.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (

const propertiesBlockRestartInterval = math.MaxInt32
const propGlobalSeqnumName = "rocksdb.external_sst_file.global_seqno"
const propSuffixReplacementName = "pebble.suffix_replacement"

var propTagMap = make(map[string]reflect.StructField)
var propBoolTrue = []byte{'1'}
Expand All @@ -43,6 +44,12 @@ func init() {
case reflect.Uint32:
case reflect.Uint64:
case reflect.String:
case reflect.Slice:
switch f.Type.Elem().Kind() {
case reflect.Uint8:
default:
panic(fmt.Sprintf("unsupported property field type: %s %s", f.Name, f.Type))
}
default:
panic(fmt.Sprintf("unsupported property field type: %s %s", f.Name, f.Type))
}
Expand Down Expand Up @@ -128,6 +135,18 @@ type Properties struct {
RawKeySize uint64 `prop:"rocksdb.raw.key.size"`
// Total raw value size.
RawValueSize uint64 `prop:"rocksdb.raw.value.size"`
// SuffixReplacement configures sstable readers to apply a block
// transform replacing a configured placeholder key suffix with a
// new suffix of the same length. The property value must be an even
// length, containing first the placeholder suffix followed by the
// replacement suffix.
//
// For example a property value `aaabbb` instructs readers to transform
// blocks, replacing any instances of the key suffix `aaa` with `bbb`.
//
// See WriterOptions.SuffixPlaceholder and ReplaceSuffix for
// additional documentation.
SuffixReplacement []byte `prop:"pebble.suffix_replacement"`
// Size of the top-level index if kTwoLevelIndexSearch is used.
TopLevelIndexSize uint64 `prop:"rocksdb.top-level.index.size"`
// User collected properties.
Expand Down Expand Up @@ -159,8 +178,7 @@ func (p *Properties) String() string {
}

f := v.Field(i)
// TODO(peter): Use f.IsZero() when we can rely on go1.13.
if zero := reflect.Zero(f.Type()); zero.Interface() == f.Interface() {
if f.IsZero() {
// Skip printing of zero values which were not loaded from disk.
if _, ok := p.Loaded[ft.Offset]; !ok {
continue
Expand All @@ -180,6 +198,13 @@ func (p *Properties) String() string {
} else {
fmt.Fprintf(&buf, "%d\n", f.Uint())
}
case reflect.Slice:
switch ft.Type.Elem().Kind() {
case reflect.Uint8:
fmt.Fprintf(&buf, "%x\n", f.Bytes())
default:
panic("not reached")
}
case reflect.String:
fmt.Fprintf(&buf, "%s\n", f.String())
default:
Expand Down Expand Up @@ -222,6 +247,16 @@ func (p *Properties) load(b block, blockOffset uint64) error {
n, _ = binary.Uvarint(i.Value())
}
field.SetUint(n)
case reflect.Slice:
switch f.Type.Elem().Kind() {
case reflect.Uint8:
unsafeValue := i.Value()
b := make([]byte, len(unsafeValue))
copy(b, unsafeValue)
field.SetBytes(b)
default:
panic("not reached")
}
case reflect.String:
field.SetString(intern.Bytes(i.Value()))
default:
Expand Down Expand Up @@ -268,6 +303,10 @@ func (p *Properties) saveString(m map[string][]byte, offset uintptr, value strin
m[propOffsetTagMap[offset]] = []byte(value)
}

func (p *Properties) saveBytes(m map[string][]byte, offset uintptr, value []byte) {
m[propOffsetTagMap[offset]] = value
}

func (p *Properties) save(w *rawBlockWriter) {
m := make(map[string][]byte)
for k, v := range p.UserProperties {
Expand Down Expand Up @@ -328,6 +367,9 @@ func (p *Properties) save(w *rawBlockWriter) {
}
p.saveUvarint(m, unsafe.Offsetof(p.RawKeySize), p.RawKeySize)
p.saveUvarint(m, unsafe.Offsetof(p.RawValueSize), p.RawValueSize)
if len(p.SuffixReplacement) > 0 {
p.saveBytes(m, unsafe.Offsetof(p.SuffixReplacement), p.SuffixReplacement)
}
p.saveBool(m, unsafe.Offsetof(p.WholeKeyFiltering), p.WholeKeyFiltering)

keys := make([]string, 0, len(m))
Expand Down
8 changes: 8 additions & 0 deletions sstable/raw_block.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,14 @@ func (i *rawBlockIter) readEntry() {
i.nextOffset = int32(uintptr(ptr)-uintptr(i.ptr)) + int32(value)
}

func (i *rawBlockIter) valueLocation() (offset, length uint32) {
ptr := unsafe.Pointer(uintptr(i.ptr) + uintptr(i.offset))
_ /*shared*/, ptr = decodeVarint(ptr)
unshared, ptr := decodeVarint(ptr)
value, ptr := decodeVarint(ptr)
return uint32(uintptr(ptr) + uintptr(unshared) - uintptr(i.ptr) - uintptr(i.offset)), value
}

func (i *rawBlockIter) loadEntry() {
i.readEntry()
i.ikey.UserKey = i.key
Expand Down
Loading

0 comments on commit 52acaa1

Please sign in to comment.