diff --git a/internal/base/internal.go b/internal/base/internal.go index c3f90caa4c..5b48cb3757 100644 --- a/internal/base/internal.go +++ b/internal/base/internal.go @@ -78,6 +78,10 @@ const ( // necessary because sstable boundaries are inclusive, while the end key of a // range deletion tombstone is exclusive. InternalKeyRangeDeleteSentinel = (InternalKeySeqNumMax << 8) | uint64(InternalKeyKindRangeDelete) + + // InternalKeyTrailerLength is the length in bytes of the internal key + // trailer encoding the key kind and sequence number. + InternalKeyTrailerLength = 8 ) var internalKeyKindNames = []string{ @@ -179,7 +183,7 @@ func ParseInternalKey(s string) InternalKey { // DecodeInternalKey decodes an encoded internal key. See InternalKey.Encode(). func DecodeInternalKey(encodedKey []byte) InternalKey { - n := len(encodedKey) - 8 + n := len(encodedKey) - InternalKeyTrailerLength var trailer uint64 if n >= 0 { trailer = binary.LittleEndian.Uint64(encodedKey[n:]) @@ -220,8 +224,8 @@ func (k InternalKey) Encode(buf []byte) { } // EncodeTrailer returns the trailer encoded to an 8-byte array. -func (k InternalKey) EncodeTrailer() [8]byte { - var buf [8]byte +func (k InternalKey) EncodeTrailer() [InternalKeyTrailerLength]byte { + var buf [InternalKeyTrailerLength]byte binary.LittleEndian.PutUint64(buf[:], k.Trailer) return buf } @@ -265,7 +269,7 @@ func (k InternalKey) Successor(cmp Compare, succ Successor, buf []byte) Internal // Size returns the encoded size of the key. func (k InternalKey) Size() int { - return len(k.UserKey) + 8 + return len(k.UserKey) + InternalKeyTrailerLength } // SetSeqNum sets the sequence number component of the key. diff --git a/sstable/block.go b/sstable/block.go index 8ef57da2d3..9319c22842 100644 --- a/sstable/block.go +++ b/sstable/block.go @@ -22,6 +22,9 @@ func uvarintLen(v uint32) int { } type blockWriter struct { + // split, if set, will prevent sharing any portion of the key + // after the split point. + split Split restartInterval int nEntries int nextRestart int @@ -39,13 +42,19 @@ func (w *blockWriter) store(keySize int, value []byte) { w.nextRestart = w.nEntries + w.restartInterval w.restarts = append(w.restarts, uint32(len(w.buf))) } else { - // TODO(peter): Manually inlined version of base.SharedPrefixLen(). This - // is 3% faster on BenchmarkWriter on go1.16. Remove if future versions - // show this to not be a performance win. For now, functions that use of - // unsafe cannot be inlined. - n := len(w.curKey) - if n > len(w.prevKey) { - n = len(w.prevKey) + // This is adapted from base.SharedPrefixLen. Unlike SharedPrefixLen, + // this version prevents the sharing of any suffix indicated by Split. + var n int + if w.split != nil { + n = w.split(w.curKey) + if p := w.split(w.prevKey); n > p { + n = p + } + } else { + n = len(w.curKey) + if n > len(w.prevKey) { + n = len(w.prevKey) + } } asUint64 := func(b []byte, i int) uint64 { return binary.LittleEndian.Uint64(b[i:]) @@ -225,6 +234,8 @@ type blockIter struct { key []byte // fullKey is a buffer used for key prefix decompression. fullKey []byte + // unsharedKey is the unshared suffix of key. + unsharedKey []byte // val contains the value the iterator is currently pointed at. If non-nil, // this points to a slice of the block data. val []byte @@ -392,14 +403,14 @@ func (i *blockIter) readEntry() { ptr = unsafe.Pointer(uintptr(ptr) + 5) } - unsharedKey := getBytes(ptr, int(unshared)) - i.fullKey = append(i.fullKey[:shared], unsharedKey...) + i.unsharedKey = getBytes(ptr, int(unshared)) + i.fullKey = append(i.fullKey[:shared], i.unsharedKey...) if shared == 0 { // Provide stability for the key across positioning calls if the key // doesn't share a prefix with the previous key. This removes requiring the // key to be copied if the caller knows the block has a restart interval of // 1. An important example of this is range-del blocks. - i.key = unsharedKey + i.key = i.unsharedKey } else { i.key = i.fullKey } diff --git a/sstable/block_test.go b/sstable/block_test.go index 4fd18e6bd0..64c0c74c87 100644 --- a/sstable/block_test.go +++ b/sstable/block_test.go @@ -24,22 +24,50 @@ func TestBlockWriter(t *testing.T) { return InternalKey{UserKey: []byte(s)} } - w := &rawBlockWriter{ - blockWriter: blockWriter{restartInterval: 16}, - } - w.add(ikey("apple"), nil) - w.add(ikey("apricot"), nil) - w.add(ikey("banana"), nil) - block := w.finish() - - expected := []byte( - "\x00\x05\x00apple" + - "\x02\x05\x00ricot" + - "\x00\x06\x00banana" + - "\x00\x00\x00\x00\x01\x00\x00\x00") - if !bytes.Equal(expected, block) { - t.Fatalf("expected\n%q\nfound\n%q", expected, block) - } + t.Run("simple", func(t *testing.T) { + w := &rawBlockWriter{ + blockWriter: blockWriter{restartInterval: 16}, + } + w.add(ikey("apple"), nil) + w.add(ikey("apricot"), nil) + w.add(ikey("banana"), nil) + block := w.finish() + + expected := []byte( + "\x00\x05\x00apple" + + "\x02\x05\x00ricot" + + "\x00\x06\x00banana" + + "\x00\x00\x00\x00\x01\x00\x00\x00") + if !bytes.Equal(expected, block) { + t.Fatalf("expected\n%q\nfound\n%q", expected, block) + } + }) + t.Run("split-prevents-key-sharing", func(t *testing.T) { + w := &rawBlockWriter{ + blockWriter: blockWriter{ + restartInterval: 16, + split: func(k []byte) int { + i := bytes.IndexByte(k, ' ') + if i < 0 { + return len(k) + } + return i + }, + }, + } + w.add(ikey("hello world"), nil) + w.add(ikey("hello worms"), nil) + w.add(ikey("hello worts"), nil) + block := w.finish() + expected := []byte( + "\x00\x0b\x00hello world" + + "\x05\x06\x00 worms" + + "\x05\x06\x00 worts" + + "\x00\x00\x00\x00\x01\x00\x00\x00") + if !bytes.Equal(expected, block) { + t.Fatalf("expected\n%q\nfound\n%q", expected, block) + } + }) } func TestInvalidInternalKeyDecoding(t *testing.T) { diff --git a/sstable/options.go b/sstable/options.go index 05805c678f..4c84565104 100644 --- a/sstable/options.go +++ b/sstable/options.go @@ -188,6 +188,26 @@ type WriterOptions struct { // with the value stored in the sstable when it was written. MergerName string + // SuffixPlaceholder enables key suffix replacement. When + // SuffixPlaceholder is set, Comparer.Split must be set as well. + // Suffix replacement only occurs within the suffix after the index + // returned by Split. Additionally, a suffix placeholder prevents + // key sharing after the index returned by Split. + // + // When a SuffixPlaceholder is set, all added keys that contain a + // prefix must have a suffix exactly equal to the SuffixPlaceholder. + // That is, if Split(k) < len(k), then k[Split(k):] must equal + // SuffixPlaceholder. + // + // When configured with a SuffixPlaceholder, Writer includes a + // SuffixReplacement table property indicating the placeholder and the + // intent to replace. A SSTable created with a non-empty SuffixPlaceholder + // cannot be read by Reader until modified by ReplaceSuffix to embed a + // replacement suffix. + // + // Range tombstones are not subject to suffix replacement. + SuffixPlaceholder []byte + // TableFormat specifies the format version for writing sstables. The default // is TableFormatRocksDBv2 which creates RocksDB compatible sstables. Use // TableFormatLevelDB to create LevelDB compatible sstable which can be used diff --git a/sstable/properties.go b/sstable/properties.go index c8ff10be8e..6505d64714 100644 --- a/sstable/properties.go +++ b/sstable/properties.go @@ -18,6 +18,7 @@ import ( const propertiesBlockRestartInterval = math.MaxInt32 const propGlobalSeqnumName = "rocksdb.external_sst_file.global_seqno" +const propSuffixReplacementName = "pebble.suffix_replacement" var propTagMap = make(map[string]reflect.StructField) var propBoolTrue = []byte{'1'} @@ -43,6 +44,12 @@ func init() { case reflect.Uint32: case reflect.Uint64: case reflect.String: + case reflect.Slice: + switch f.Type.Elem().Kind() { + case reflect.Uint8: + default: + panic(fmt.Sprintf("unsupported property field type: %s %s", f.Name, f.Type)) + } default: panic(fmt.Sprintf("unsupported property field type: %s %s", f.Name, f.Type)) } @@ -128,6 +135,18 @@ type Properties struct { RawKeySize uint64 `prop:"rocksdb.raw.key.size"` // Total raw value size. RawValueSize uint64 `prop:"rocksdb.raw.value.size"` + // SuffixReplacement configures sstable readers to apply a block + // transform replacing a configured placeholder key suffix with a + // new suffix of the same length. The property value must be an even + // length, containing first the placeholder suffix followed by the + // replacement suffix. + // + // For example a property value `aaabbb` instructs readers to transform + // blocks, replacing any instances of the key suffix `aaa` with `bbb`. + // + // See WriterOptions.SuffixPlaceholder and ReplaceSuffix for + // additional documentation. + SuffixReplacement []byte `prop:"pebble.suffix_replacement"` // Size of the top-level index if kTwoLevelIndexSearch is used. TopLevelIndexSize uint64 `prop:"rocksdb.top-level.index.size"` // User collected properties. @@ -159,8 +178,7 @@ func (p *Properties) String() string { } f := v.Field(i) - // TODO(peter): Use f.IsZero() when we can rely on go1.13. - if zero := reflect.Zero(f.Type()); zero.Interface() == f.Interface() { + if f.IsZero() { // Skip printing of zero values which were not loaded from disk. if _, ok := p.Loaded[ft.Offset]; !ok { continue @@ -180,6 +198,13 @@ func (p *Properties) String() string { } else { fmt.Fprintf(&buf, "%d\n", f.Uint()) } + case reflect.Slice: + switch ft.Type.Elem().Kind() { + case reflect.Uint8: + fmt.Fprintf(&buf, "%x\n", f.Bytes()) + default: + panic("not reached") + } case reflect.String: fmt.Fprintf(&buf, "%s\n", f.String()) default: @@ -222,6 +247,16 @@ func (p *Properties) load(b block, blockOffset uint64) error { n, _ = binary.Uvarint(i.Value()) } field.SetUint(n) + case reflect.Slice: + switch f.Type.Elem().Kind() { + case reflect.Uint8: + unsafeValue := i.Value() + b := make([]byte, len(unsafeValue)) + copy(b, unsafeValue) + field.SetBytes(b) + default: + panic("not reached") + } case reflect.String: field.SetString(intern.Bytes(i.Value())) default: @@ -268,6 +303,10 @@ func (p *Properties) saveString(m map[string][]byte, offset uintptr, value strin m[propOffsetTagMap[offset]] = []byte(value) } +func (p *Properties) saveBytes(m map[string][]byte, offset uintptr, value []byte) { + m[propOffsetTagMap[offset]] = value +} + func (p *Properties) save(w *rawBlockWriter) { m := make(map[string][]byte) for k, v := range p.UserProperties { @@ -328,6 +367,9 @@ func (p *Properties) save(w *rawBlockWriter) { } p.saveUvarint(m, unsafe.Offsetof(p.RawKeySize), p.RawKeySize) p.saveUvarint(m, unsafe.Offsetof(p.RawValueSize), p.RawValueSize) + if len(p.SuffixReplacement) > 0 { + p.saveBytes(m, unsafe.Offsetof(p.SuffixReplacement), p.SuffixReplacement) + } p.saveBool(m, unsafe.Offsetof(p.WholeKeyFiltering), p.WholeKeyFiltering) keys := make([]string, 0, len(m)) diff --git a/sstable/raw_block.go b/sstable/raw_block.go index f83a5d7bff..8485e79ccb 100644 --- a/sstable/raw_block.go +++ b/sstable/raw_block.go @@ -85,6 +85,14 @@ func (i *rawBlockIter) readEntry() { i.nextOffset = int32(uintptr(ptr)-uintptr(i.ptr)) + int32(value) } +func (i *rawBlockIter) valueLocation() (offset, length uint32) { + ptr := unsafe.Pointer(uintptr(i.ptr) + uintptr(i.offset)) + _ /*shared*/, ptr = decodeVarint(ptr) + unshared, ptr := decodeVarint(ptr) + value, ptr := decodeVarint(ptr) + return uint32(uintptr(ptr) + uintptr(unshared) - uintptr(i.ptr) - uintptr(i.offset)), value +} + func (i *rawBlockIter) loadEntry() { i.readEntry() i.ikey.UserKey = i.key diff --git a/sstable/reader.go b/sstable/reader.go index fc060ea6f3..07dd4acb68 100644 --- a/sstable/reader.go +++ b/sstable/reader.go @@ -289,7 +289,7 @@ func (i *singleLevelIterator) loadBlock() bool { i.err = errCorruptIndexEntry return false } - block, err := i.reader.readBlock(i.dataBH, nil /* transform */, &i.dataRS) + block, err := i.reader.readBlock(i.dataBH, i.reader.transformMaybeSwapSuffix, &i.dataRS) if err != nil { i.err = err return false @@ -959,7 +959,7 @@ func (i *twoLevelIterator) loadIndex() bool { i.err = base.CorruptionErrorf("pebble/table: corrupt top level index entry") return false } - indexBlock, err := i.reader.readBlock(h, nil /* transform */, nil /* readaheadState */) + indexBlock, err := i.reader.readBlock(h, i.reader.transformMaybeSwapSuffix, nil /* readaheadState */) if err != nil { i.err = err return false @@ -1681,28 +1681,29 @@ func init() { // Reader is a table reader. type Reader struct { - file ReadableFile - fs vfs.FS - filename string - cacheID uint64 - fileNum base.FileNum - rawTombstones bool - err error - indexBH BlockHandle - filterBH BlockHandle - rangeDelBH BlockHandle - rangeDelTransform blockTransform - propertiesBH BlockHandle - metaIndexBH BlockHandle - footerBH BlockHandle - opts ReaderOptions - Compare Compare - FormatKey base.FormatKey - Split Split - mergerOK bool - checksumType ChecksumType - tableFilter *tableFilterReader - Properties Properties + file ReadableFile + fs vfs.FS + filename string + cacheID uint64 + fileNum base.FileNum + rawTombstones bool + err error + indexBH BlockHandle + filterBH BlockHandle + rangeDelBH BlockHandle + rangeDelTransform blockTransform + propertiesBH BlockHandle + metaIndexBH BlockHandle + footerBH BlockHandle + opts ReaderOptions + Compare Compare + FormatKey base.FormatKey + Split Split + mergerOK bool + unreplacedSuffixOK bool + checksumType ChecksumType + tableFilter *tableFilterReader + Properties Properties } // Close implements DB.Close, as documented in the pebble package. @@ -1847,7 +1848,7 @@ func (r *Reader) NewRawRangeDelIter() (base.InternalIterator, error) { } func (r *Reader) readIndex() (cache.Handle, error) { - return r.readBlock(r.indexBH, nil /* transform */, nil /* readaheadState */) + return r.readBlock(r.indexBH, r.transformMaybeSwapSuffix, nil /* readaheadState */) } func (r *Reader) readFilter() (cache.Handle, error) { @@ -1948,24 +1949,76 @@ func (r *Reader) readBlock( } if transform != nil { + pretransformBlock := b // Transforming blocks is rare, so the extra copy of the transformed data // is not problematic. - var err error - b, err = transform(b) + b, err = transform(pretransformBlock) if err != nil { r.opts.Cache.Free(v) return cache.Handle{}, err } - newV := r.opts.Cache.Alloc(len(b)) - copy(newV.Buf(), b) - r.opts.Cache.Free(v) - v = newV + + // If transform returned a new slice, copy it into cache and + // free the old one. + if len(b) == 0 || len(pretransformBlock) == 0 || &b[0] != &pretransformBlock[0] { + newV := r.opts.Cache.Alloc(len(b)) + copy(newV.Buf(), b) + r.opts.Cache.Free(v) + v = newV + } } h := r.opts.Cache.Set(r.cacheID, r.fileNum, bh.Offset, v) return h, nil } +// transformMaybeSwapSuffix is a blockTransform that performs a +// replacement of a key suffix. If the sstable's user properties +// contains the suffix replacement property, transformMaybeSwapSuffix +// adjusts all keys with the specified placeholder suffix with the +// indicated replacement value. +func (r *Reader) transformMaybeSwapSuffix(b []byte) ([]byte, error) { + if len(r.Properties.SuffixReplacement) == 0 { + return b, nil + } + + // The suffix replacement property holds the placeholder value + // concatenated with the replacement value. Values are required to + // be equal length. The SuffixReplacement property was already + // validated in when the reader was constructed. + split := len(r.Properties.SuffixReplacement) >> 1 + from, to := r.Properties.SuffixReplacement[:split], r.Properties.SuffixReplacement[split:] + + iter := &blockIter{} + if err := iter.init(r.Compare, b, r.Properties.GlobalSeqNum); err != nil { + return nil, err + } + for key, _ := iter.First(); key != nil; key, _ = iter.Next() { + split := r.Split(key.UserKey) + switch { + case split == len(key.UserKey): + // This key has no suffix. Skip it. + continue + case !bytes.Equal(key.UserKey[split:], from): + // This key has a suffix, but it doesn't match the placeholder + // value. The sstable Writer should not have allowed such an + // sstable to be produced, because we cannot know whether or not + // rewriting another key with the same prefix but the placeholder + // suffix might violate the key ordering. + panic(errors.New("pebble: suffix replacement sstable contains non-placeholder suffix")) + default: + // The suffix matches the expected placeholder. Overwrite it with + // the replacement suffix. + dest := len(iter.unsharedKey) - len(from) - base.InternalKeyTrailerLength + if dest < 0 { + panic(errors.New("pebble: suffix placeholder unexpectedly shared between keys")) + } + copy(iter.unsharedKey[dest:], to) + } + } + return b, nil +} + func (r *Reader) transformRangeDelV1(b []byte) ([]byte, error) { // Convert v1 (RocksDB format) range-del blocks to v2 blocks on the fly. The // v1 format range-del blocks have unfragmented and unsorted range @@ -2184,7 +2237,7 @@ func (r *Reader) ValidateBlockChecksums() error { } // Read the block, which validates the checksum. - h, err := r.readBlock(bh, nil /* transform */, blockRS) + h, err := r.readBlock(bh, r.transformMaybeSwapSuffix, blockRS) if err != nil { return err } @@ -2241,7 +2294,7 @@ func (r *Reader) EstimateDiskUsage(start, end []byte) (uint64, error) { if n == 0 || n != len(val) { return 0, errCorruptIndexEntry } - startIdxBlock, err := r.readBlock(startIdxBH, nil /* transform */, nil /* readaheadState */) + startIdxBlock, err := r.readBlock(startIdxBH, r.transformMaybeSwapSuffix, nil /* readaheadState */) if err != nil { return 0, err } @@ -2261,7 +2314,7 @@ func (r *Reader) EstimateDiskUsage(start, end []byte) (uint64, error) { if n == 0 || n != len(val) { return 0, errCorruptIndexEntry } - endIdxBlock, err := r.readBlock(endIdxBH, nil /* transform */, nil /* readaheadState */) + endIdxBlock, err := r.readBlock(endIdxBH, r.transformMaybeSwapSuffix, nil /* readaheadState */) if err != nil { return 0, err } @@ -2386,6 +2439,29 @@ func NewReader(f ReadableFile, o ReaderOptions, extraOpts ...ReaderOption) (*Rea errors.Safe(r.fileNum), errors.Safe(r.Properties.MergerName)) } } + + if len(r.Properties.SuffixReplacement) > 0 { + // The suffix replacement property holds the placeholder value + // concatenated with the replacement value. Values are required to + // be equal length. + if len(r.Properties.SuffixReplacement)%2 != 0 { + r.err = errors.New("pebble: invalid suffix replacement property: odd length") + return nil, r.Close() + } + split := len(r.Properties.SuffixReplacement) >> 1 + if !r.unreplacedSuffixOK && bytes.Equal(r.Properties.SuffixReplacement[:split], r.Properties.SuffixReplacement[split:]) { + r.err = errors.Newf("pebble: unreplaced suffix") + return nil, r.Close() + } + // Require that r.Split is set. Suffix replacement only occurs after a + // key's Split prefix. + if r.Split == nil { + r.err = errors.Errorf("pebble/table: %d: suffix replacement requires Split, comparer %s omits", + errors.Safe(r.fileNum), errors.Safe(r.Properties.ComparerName)) + return nil, r.Close() + } + } + if r.err != nil { return nil, r.Close() } @@ -2505,7 +2581,7 @@ func (l *Layout) Describe( continue } - h, err := r.readBlock(b.BlockHandle, nil /* transform */, nil /* readaheadState */) + h, err := r.readBlock(b.BlockHandle, r.transformMaybeSwapSuffix, nil /* readaheadState */) if err != nil { fmt.Fprintf(w, " [err: %s]\n", err) continue diff --git a/sstable/writer.go b/sstable/writer.go index 8deb34e4ab..112dff1779 100644 --- a/sstable/writer.go +++ b/sstable/writer.go @@ -19,6 +19,7 @@ import ( "github.com/cockroachdb/pebble/internal/crc" "github.com/cockroachdb/pebble/internal/private" "github.com/cockroachdb/pebble/internal/rangedel" + "github.com/cockroachdb/pebble/vfs" ) // WriterMetadata holds info about a finished sstable. @@ -105,6 +106,7 @@ type Writer struct { tableFormat TableFormat checksumType ChecksumType cache *cache.Cache + suffixPlaceholder []byte // disableKeyOrderChecks disables the checks that keys are added to an // sstable in order. It is intended for internal use only in the construction // of invalid sstables for testing. See tool/make_test_sstables.go. @@ -232,6 +234,14 @@ func (w *Writer) addPoint(key InternalKey, value []byte) error { } } + if len(w.suffixPlaceholder) > 0 { + i := w.split(key.UserKey) + if i < len(key.UserKey) && !bytes.Equal(w.suffixPlaceholder, key.UserKey[i:]) { + w.err = errors.New("pebble: all suffixed keys must contain the suffix placeholder") + return w.err + } + } + if err := w.maybeFlush(key, value); err != nil { return err } @@ -473,6 +483,26 @@ func (w *Writer) writeTwoLevelIndex() (BlockHandle, error) { return w.writeBlock(w.topLevelIndexBlock.finish(), w.compression) } +func (w *Writer) checksumBlock( + b []byte, blockType []byte, checksumType ChecksumType, +) (uint32, error) { + switch checksumType { + case ChecksumTypeCRC32c: + return crc.New(b).Update(blockType).Value(), nil + case ChecksumTypeXXHash64: + if w.xxHasher == nil { + w.xxHasher = xxhash.New() + } else { + w.xxHasher.Reset() + } + w.xxHasher.Write(b) + w.xxHasher.Write(blockType) + return uint32(w.xxHasher.Sum64()), nil + default: + return 0, errors.Newf("unsupported checksum type: %d", checksumType) + } +} + func (w *Writer) writeBlock(b []byte, compression Compression) (BlockHandle, error) { // Compress the buffer, discarding the result if the improvement isn't at // least 12.5%. @@ -489,21 +519,9 @@ func (w *Writer) writeBlock(b []byte, compression Compression) (BlockHandle, err w.tmp[0] = byte(blockType) // Calculate the checksum. - var checksum uint32 - switch w.checksumType { - case ChecksumTypeCRC32c: - checksum = crc.New(b).Update(w.tmp[:1]).Value() - case ChecksumTypeXXHash64: - if w.xxHasher == nil { - w.xxHasher = xxhash.New() - } else { - w.xxHasher.Reset() - } - w.xxHasher.Write(b) - w.xxHasher.Write(w.tmp[:1]) - checksum = uint32(w.xxHasher.Sum64()) - default: - return BlockHandle{}, errors.Newf("unsupported checksum type: %d", w.checksumType) + checksum, err := w.checksumBlock(b, w.tmp[:1], w.checksumType) + if err != nil { + return BlockHandle{}, err } binary.LittleEndian.PutUint32(w.tmp[1:5], checksum) bh := BlockHandle{w.meta.Size, uint64(len(b))} @@ -734,7 +752,7 @@ func (w *Writer) Metadata() (*WriterMetadata, error) { // WriterOption provide an interface to do work on Writer while it is being // opened. type WriterOption interface { - // writerAPply is called on the writer during opening in order to set + // writerApply is called on the writer during opening in order to set // internal parameters. writerApply(*Writer) } @@ -772,6 +790,7 @@ func NewWriter(f writeCloseSyncer, o WriterOptions, extraOpts ...WriterOption) * tableFormat: o.TableFormat, checksumType: o.Checksum, cache: o.Cache, + suffixPlaceholder: o.SuffixPlaceholder, block: blockWriter{ restartInterval: o.BlockRestartInterval, }, @@ -823,6 +842,23 @@ func NewWriter(f writeCloseSyncer, o WriterOptions, extraOpts ...WriterOption) * w.props.PropertyCollectorNames = "[]" w.props.ExternalFormatVersion = rocksDBExternalFormatVersion + if len(o.SuffixPlaceholder) > 0 { + w.props.SuffixReplacement = make([]byte, 2*len(o.SuffixPlaceholder)) + copy(w.props.SuffixReplacement, o.SuffixPlaceholder) + copy(w.props.SuffixReplacement[len(o.SuffixPlaceholder):], o.SuffixPlaceholder) + + // Configure block writers to prevent sharing of suffixes after + // a key's Split point. If SuffixPlaceholder is set, the caller + // must use a Comparer that provides a Split function. + if w.split == nil { + w.err = errors.New("pebble: SuffixPlaceholder requires a comparer Split function") + return w + } + w.block.split = w.split + w.indexBlock.split = w.split + w.topLevelIndexBlock.split = w.split + } + if len(o.TablePropertyCollectors) > 0 { w.propCollectors = make([]TablePropertyCollector, len(o.TablePropertyCollectors)) var buf bytes.Buffer @@ -855,6 +891,110 @@ func NewWriter(f writeCloseSyncer, o WriterOptions, extraOpts ...WriterOption) * return w } +// AllowUnreplacedSuffix is a ReaderOption that may be passed to +// NewReader to allow reading from an sstable intended for +// suffix-replacement but which does not yet have its suffix written. +// Ordinarily, NewReader will error if there's an attempt to read an +// sstable that supports suffix replacement but does not yet specify a +// replacement suffix. +var AllowUnreplacedSuffix ReaderOption = unreplacedSuffixOK{} + +type unreplacedSuffixOK struct{} + +func (unreplacedSuffixOK) readerApply(r *Reader) { + r.unreplacedSuffixOK = true +} + +// ReplaceSuffix mutates the passed byte slice containing a serialized +// sstable, setting the replacement suffix to the provided value. The +// passed sstable must have been created with a SuffixPlaceholder set on +// the WriterOptions. +// +// ReplaceSuffix errors if the sstable was not created with a +// SuffixPlaceholder, or the existing placeholder does not match +// placeholderSuffix. +// +// Range tombstones are not subject to suffix replacement. +func ReplaceSuffix( + sstableBytes []byte, opts ReaderOptions, placeholderSuffix, replacementSuffix []byte, +) error { + if len(placeholderSuffix) != len(replacementSuffix) { + return errors.New("pebble: placeholder and replacement suffixes must be equal length") + } + + // Read the sstable to find the location of the properties block and + // validate the existing SuffixReplacement property. We pass the + // AllowUnreplacedSuffix reader option to indicate that this reading + // of a pre-replacement sstable is intentional. + offset, length, checksumType, err := func() (uint64, uint64, ChecksumType, error) { + r, err := NewReader(vfs.NewMemFile(sstableBytes), opts, AllowUnreplacedSuffix) + if err != nil { + return 0, 0, ChecksumTypeNone, err + } + defer r.Close() + + v := r.Properties.SuffixReplacement + switch { + case len(v) == 0: + return 0, 0, ChecksumTypeNone, errors.New("pebble: sstable does not support suffix replacement") + case len(v) != len(placeholderSuffix)+len(replacementSuffix) || !bytes.HasPrefix(v, placeholderSuffix): + return 0, 0, ChecksumTypeNone, errors.New("pebble: sstable created with a different placeholder suffix") + default: + return r.propertiesBH.Offset, r.propertiesBH.Length, r.checksumType, nil + } + }() + if err != nil { + return err + } + + // Iterate through the properties block, looking for the suffix + // replacement property. Find the offset of the suffix replacement + // property's value within the block, so that the replacement + // portion of the value may be overwritten with the provided + // replacement suffix. + propsBlock := sstableBytes[offset : offset+length] + i, err := newRawBlockIter(bytes.Compare, propsBlock) + if err != nil { + return err + } + var found bool + var valueOffset uint32 + for valid := i.First(); valid; valid = i.Next() { + if bytes.Equal(i.Key().UserKey, []byte(propSuffixReplacementName)) { + found = true + var valueLength uint32 + valueOffset, valueLength = i.valueLocation() + if int(valueLength) != 2*len(replacementSuffix) { + // Already validated the property value length above. A + // mismatch here indicates something very wrong. + panic("unreachable") + } + break + } + } + if !found { + // We already read the SuffixReplacement property through the + // reader up above, so we should always find the property. + panic("unreachable") + } + + // Replace the second half of the property's value to be the + // given replacement suffix. For example, if configuring a suffix + // replacement from `aaa` to `bbb`, the existing property value will + // be `aaaaaa`. Overwrite the second half so that the resulting + // property value is `aaabbb`. + copy(propsBlock[int(valueOffset)+len(placeholderSuffix):], replacementSuffix) + + // Update checksum after block to reflect the mutated values. + blockType := sstableBytes[offset+length : offset+length+1] + checksum, err := (&Writer{}).checksumBlock(propsBlock, blockType, checksumType) + if err != nil { + return err + } + binary.LittleEndian.PutUint32(sstableBytes[offset+length+1:offset+length+5], checksum) + return nil +} + func init() { private.SSTableWriterDisableKeyOrderChecks = func(i interface{}) { w := i.(*Writer) diff --git a/sstable/writer_test.go b/sstable/writer_test.go index 5837eb11e1..2f7a8d3c22 100644 --- a/sstable/writer_test.go +++ b/sstable/writer_test.go @@ -8,9 +8,11 @@ import ( "bytes" "encoding/binary" "fmt" + "io/ioutil" "testing" "github.com/cockroachdb/pebble/bloom" + "github.com/cockroachdb/pebble/internal/base" "github.com/cockroachdb/pebble/internal/cache" "github.com/cockroachdb/pebble/internal/datadriven" "github.com/cockroachdb/pebble/vfs" @@ -104,6 +106,160 @@ func TestWriter(t *testing.T) { }) } +func spaceSplit(a []byte) int { + v := bytes.IndexByte(a, ' ') + if v < 0 { + return len(a) + } + return v +} + +func TestReplaceSuffix(t *testing.T) { + mem := vfs.NewMem() + + // Suffix replacement requires a Split function. Add one that splits a key + // at the first space. + comparer := *base.DefaultComparer + comparer.Split = spaceSplit + + // Write a new file containing a few keys, including a few with the + // ` world` suffix. + f, err := mem.Create("prereplacement.sst") + require.NoError(t, err) + w := NewWriter(f, WriterOptions{ + Comparer: &comparer, + SuffixPlaceholder: []byte(` world`), + }) + inputKeys := []string{ + `bonjour world`, + `foo`, + `hello world`, + `hi world`, + `world`, + `worms`, + } + for _, k := range inputKeys { + require.NoError(t, w.Add(base.MakeInternalKey([]byte(k), 0, InternalKeyKindSet), nil)) + } + require.NoError(t, w.Close()) + + // The SSTable's SuffixReplacement property should be + // ` world world`, indicating that the suffix placeholder is + // ` world` and that the replacement value is not yet known. + m, err := w.Metadata() + require.NoError(t, err) + require.Equal(t, []byte(` world world`), m.Properties.SuffixReplacement) + + // Trying to read the sstable without a replacement value in the sstable + // property should error. This behavior helps avoid accidental reading of + // unfinished sstables. + readOpts := ReaderOptions{Comparer: &comparer} + f, err = mem.Open("prereplacement.sst") + require.NoError(t, err) + _, err = NewReader(f, readOpts) + require.EqualError(t, err, `pebble: unreplaced suffix`) + + // Perform the replacement. + f, err = mem.Open("prereplacement.sst") + require.NoError(t, err) + sstableBytes, err := ioutil.ReadAll(f) + require.NoError(t, err) + require.NoError(t, f.Close()) + require.NoError(t, ReplaceSuffix(sstableBytes, readOpts, []byte(` world`), []byte(` monde`))) + + // Read the file post-replacement. All the ` world` suffixes should be + // replaced with ` monde`. + f = vfs.NewMemFile(sstableBytes) + r, err := NewReader(f, readOpts) + require.NoError(t, err) + defer r.Close() + iter, err := r.NewIter(nil, nil) + require.NoError(t, err) + defer iter.Close() + var got []string + for k, _ := iter.First(); k != nil; k, _ = iter.Next() { + got = append(got, string(k.UserKey)) + } + want := []string{`bonjour monde`, `foo`, `hello monde`, `hi monde`, `world`, `worms`} + require.Equal(t, want, got) +} + +func TestReplaceSuffixErrors(t *testing.T) { + mem := vfs.NewMem() + writeFile := func(wo WriterOptions, keys ...string) ([]byte, error) { + filename := fmt.Sprintf("%s.sst", t.Name()) + f, err := mem.Create(filename) + require.NoError(t, err) + w := NewWriter(f, wo) + for _, k := range keys { + err = w.Add(base.MakeInternalKey([]byte(k), 0, InternalKeyKindSet), nil) + if err != nil { + return nil, err + } + } + if err = w.Close(); err != nil { + return nil, err + } + f, err = mem.Open(filename) + require.NoError(t, err) + defer f.Close() + b, err := ioutil.ReadAll(f) + require.NoError(t, err) + return b, err + } + + comparer := *base.DefaultComparer + comparer.Split = spaceSplit + + t.Run("need split function", func(t *testing.T) { + _, err := writeFile(WriterOptions{ + Comparer: base.DefaultComparer, + SuffixPlaceholder: []byte{0xde, 0xad, 0xbe, 0xef}, + }, "hello world") + require.EqualError(t, err, `pebble: SuffixPlaceholder requires a comparer Split function`) + }) + t.Run("all suffixes must match", func(t *testing.T) { + _, err := writeFile(WriterOptions{ + Comparer: &comparer, + SuffixPlaceholder: []byte(" world"), + }, "hello world", "hi everyone") + require.EqualError(t, err, `pebble: all suffixed keys must contain the suffix placeholder`) + }) + t.Run("replacement and placeholder must be equal lengths", func(t *testing.T) { + prereplacementFile, err := writeFile(WriterOptions{ + Comparer: &comparer, + SuffixPlaceholder: []byte(" world"), + }, "hello world") + require.NoError(t, err) + err = ReplaceSuffix(prereplacementFile, ReaderOptions{Comparer: &comparer}, + []byte(" world"), + []byte(" universe")) + require.EqualError(t, err, `pebble: placeholder and replacement suffixes must be equal length`) + }) + t.Run("sstable must contain suffix replacement property", func(t *testing.T) { + prereplacementFile, err := writeFile(WriterOptions{ + Comparer: &comparer, + SuffixPlaceholder: nil /* no suffix replacement */, + }, "hello world") + require.NoError(t, err) + err = ReplaceSuffix(prereplacementFile, ReaderOptions{Comparer: &comparer}, + []byte(" world"), + []byte(" globe")) + require.EqualError(t, err, `pebble: sstable does not support suffix replacement`) + }) + t.Run("ReplaceSuffix placeholder must match property's placeholder", func(t *testing.T) { + prereplacementFile, err := writeFile(WriterOptions{ + Comparer: &comparer, + SuffixPlaceholder: []byte(" world"), + }, "hello world") + require.NoError(t, err) + err = ReplaceSuffix(prereplacementFile, ReaderOptions{Comparer: &comparer}, + []byte(" globe"), + []byte(" monde")) + require.EqualError(t, err, `pebble: sstable created with a different placeholder suffix`) + }) +} + func TestWriterClearCache(t *testing.T) { // Verify that Writer clears the cache of blocks that it writes. mem := vfs.NewMem() diff --git a/testdata/event_listener b/testdata/event_listener index 412ba1a19c..a1971f012b 100644 --- a/testdata/event_listener +++ b/testdata/event_listener @@ -179,7 +179,7 @@ compact 1 2.3 K 0 B (size == estimated-debt, in = zmemtbl 0 0 B ztbl 0 0 B bcache 8 1.4 K 5.9% (score == hit-rate) - tcache 1 616 B 0.0% (score == hit-rate) + tcache 1 640 B 0.0% (score == hit-rate) titers 0 filter - - 0.0% (score == utility) diff --git a/testdata/ingest b/testdata/ingest index 2d822112d0..05a768ed83 100644 --- a/testdata/ingest +++ b/testdata/ingest @@ -48,7 +48,7 @@ compact 0 0 B 0 B (size == estimated-debt, in = zmemtbl 0 0 B ztbl 0 0 B bcache 8 1.5 K 46.7% (score == hit-rate) - tcache 1 616 B 50.0% (score == hit-rate) + tcache 1 640 B 50.0% (score == hit-rate) titers 0 filter - - 0.0% (score == utility) diff --git a/testdata/metrics b/testdata/metrics index 46db796e95..b003deb9fd 100644 --- a/testdata/metrics +++ b/testdata/metrics @@ -34,7 +34,7 @@ compact 0 0 B 0 B (size == estimated-debt, in = zmemtbl 1 256 K ztbl 0 0 B bcache 4 698 B 0.0% (score == hit-rate) - tcache 1 616 B 0.0% (score == hit-rate) + tcache 1 640 B 0.0% (score == hit-rate) titers 1 filter - - 0.0% (score == utility) @@ -81,7 +81,7 @@ compact 1 0 B 0 B (size == estimated-debt, in = zmemtbl 2 512 K ztbl 2 1.5 K bcache 8 1.4 K 33.3% (score == hit-rate) - tcache 2 1.2 K 50.0% (score == hit-rate) + tcache 2 1.3 K 50.0% (score == hit-rate) titers 2 filter - - 0.0% (score == utility) @@ -113,7 +113,7 @@ compact 1 0 B 0 B (size == estimated-debt, in = zmemtbl 1 256 K ztbl 2 1.5 K bcache 8 1.4 K 33.3% (score == hit-rate) - tcache 2 1.2 K 50.0% (score == hit-rate) + tcache 2 1.3 K 50.0% (score == hit-rate) titers 2 filter - - 0.0% (score == utility) @@ -142,7 +142,7 @@ compact 1 0 B 0 B (size == estimated-debt, in = zmemtbl 1 256 K ztbl 1 771 B bcache 4 698 B 33.3% (score == hit-rate) - tcache 1 616 B 50.0% (score == hit-rate) + tcache 1 640 B 50.0% (score == hit-rate) titers 1 filter - - 0.0% (score == utility)