From 77cf843ed8ff8450fa6e1761a0d37a9b3826f7ee Mon Sep 17 00:00:00 2001 From: Jackson Owens Date: Wed, 29 Sep 2021 13:11:38 -0400 Subject: [PATCH] sstable: add suffix replacement block transform In CockroachDB, there exist processes that build and ingest sstables. These sstables have timestamp-suffixed MVCC keys. Today, these keys' timestamps are dated in the past and rewrite history. This rewriting violates invariants in parts of the system. We would like to support ingesting these sstables with recent, invariant-maintaing MVCC timestamps. However, ingestion is used during bulk operations, and rewriting large sstables' keys with a recent MVCC timestamp is infeasibly expensive. This change introduces a facility for constructing an sstable with a placeholder suffix. When using this facility, a caller specifies a SuffixPlaceholder write option. The caller is also required to configure a Comparer that contains a non-nil Split function. When configured with a suffix placeholder, the sstable writer requires that all keys' suffixes (as determined by Split) exactly match the provided SuffixPlaceholder. An sstable constructed in this fashion is still incomplete and unable to be read unless explicitly permitted through the AllowUnreplacedSuffix option. When a caller would like to complete an sstable constructed with a suffix placeholder, they may call ReplaceSuffix providing the original placeholder value and the replacement value. The placeholder and replacement values are required to be equal lengths. ReplaceSuffix performs an O(1) write to record the replacement value. After a suffix replacement the resulting sstable is complete, and sstable readers may read the sstable. Readers detect the sstable property and apply a block transform to replace suffix placeholders with the replacement value on the fly as blocks are loaded. Informs cockroachdb/cockroach#70422. --- internal/base/internal.go | 12 ++- sstable/block.go | 31 ++++--- sstable/block_test.go | 60 +++++++++---- sstable/options.go | 20 +++++ sstable/properties.go | 46 +++++++++- sstable/raw_block.go | 8 ++ sstable/reader.go | 145 ++++++++++++++++++++++++-------- sstable/writer.go | 172 ++++++++++++++++++++++++++++++++++---- sstable/writer_test.go | 78 +++++++++++++++++ testdata/event_listener | 2 +- testdata/ingest | 2 +- testdata/metrics | 8 +- 12 files changed, 495 insertions(+), 89 deletions(-) diff --git a/internal/base/internal.go b/internal/base/internal.go index c3f90caa4c..5b48cb3757 100644 --- a/internal/base/internal.go +++ b/internal/base/internal.go @@ -78,6 +78,10 @@ const ( // necessary because sstable boundaries are inclusive, while the end key of a // range deletion tombstone is exclusive. InternalKeyRangeDeleteSentinel = (InternalKeySeqNumMax << 8) | uint64(InternalKeyKindRangeDelete) + + // InternalKeyTrailerLength is the length in bytes of the internal key + // trailer encoding the key kind and sequence number. + InternalKeyTrailerLength = 8 ) var internalKeyKindNames = []string{ @@ -179,7 +183,7 @@ func ParseInternalKey(s string) InternalKey { // DecodeInternalKey decodes an encoded internal key. See InternalKey.Encode(). func DecodeInternalKey(encodedKey []byte) InternalKey { - n := len(encodedKey) - 8 + n := len(encodedKey) - InternalKeyTrailerLength var trailer uint64 if n >= 0 { trailer = binary.LittleEndian.Uint64(encodedKey[n:]) @@ -220,8 +224,8 @@ func (k InternalKey) Encode(buf []byte) { } // EncodeTrailer returns the trailer encoded to an 8-byte array. -func (k InternalKey) EncodeTrailer() [8]byte { - var buf [8]byte +func (k InternalKey) EncodeTrailer() [InternalKeyTrailerLength]byte { + var buf [InternalKeyTrailerLength]byte binary.LittleEndian.PutUint64(buf[:], k.Trailer) return buf } @@ -265,7 +269,7 @@ func (k InternalKey) Successor(cmp Compare, succ Successor, buf []byte) Internal // Size returns the encoded size of the key. func (k InternalKey) Size() int { - return len(k.UserKey) + 8 + return len(k.UserKey) + InternalKeyTrailerLength } // SetSeqNum sets the sequence number component of the key. diff --git a/sstable/block.go b/sstable/block.go index 8ef57da2d3..9319c22842 100644 --- a/sstable/block.go +++ b/sstable/block.go @@ -22,6 +22,9 @@ func uvarintLen(v uint32) int { } type blockWriter struct { + // split, if set, will prevent sharing any portion of the key + // after the split point. + split Split restartInterval int nEntries int nextRestart int @@ -39,13 +42,19 @@ func (w *blockWriter) store(keySize int, value []byte) { w.nextRestart = w.nEntries + w.restartInterval w.restarts = append(w.restarts, uint32(len(w.buf))) } else { - // TODO(peter): Manually inlined version of base.SharedPrefixLen(). This - // is 3% faster on BenchmarkWriter on go1.16. Remove if future versions - // show this to not be a performance win. For now, functions that use of - // unsafe cannot be inlined. - n := len(w.curKey) - if n > len(w.prevKey) { - n = len(w.prevKey) + // This is adapted from base.SharedPrefixLen. Unlike SharedPrefixLen, + // this version prevents the sharing of any suffix indicated by Split. + var n int + if w.split != nil { + n = w.split(w.curKey) + if p := w.split(w.prevKey); n > p { + n = p + } + } else { + n = len(w.curKey) + if n > len(w.prevKey) { + n = len(w.prevKey) + } } asUint64 := func(b []byte, i int) uint64 { return binary.LittleEndian.Uint64(b[i:]) @@ -225,6 +234,8 @@ type blockIter struct { key []byte // fullKey is a buffer used for key prefix decompression. fullKey []byte + // unsharedKey is the unshared suffix of key. + unsharedKey []byte // val contains the value the iterator is currently pointed at. If non-nil, // this points to a slice of the block data. val []byte @@ -392,14 +403,14 @@ func (i *blockIter) readEntry() { ptr = unsafe.Pointer(uintptr(ptr) + 5) } - unsharedKey := getBytes(ptr, int(unshared)) - i.fullKey = append(i.fullKey[:shared], unsharedKey...) + i.unsharedKey = getBytes(ptr, int(unshared)) + i.fullKey = append(i.fullKey[:shared], i.unsharedKey...) if shared == 0 { // Provide stability for the key across positioning calls if the key // doesn't share a prefix with the previous key. This removes requiring the // key to be copied if the caller knows the block has a restart interval of // 1. An important example of this is range-del blocks. - i.key = unsharedKey + i.key = i.unsharedKey } else { i.key = i.fullKey } diff --git a/sstable/block_test.go b/sstable/block_test.go index 4fd18e6bd0..64c0c74c87 100644 --- a/sstable/block_test.go +++ b/sstable/block_test.go @@ -24,22 +24,50 @@ func TestBlockWriter(t *testing.T) { return InternalKey{UserKey: []byte(s)} } - w := &rawBlockWriter{ - blockWriter: blockWriter{restartInterval: 16}, - } - w.add(ikey("apple"), nil) - w.add(ikey("apricot"), nil) - w.add(ikey("banana"), nil) - block := w.finish() - - expected := []byte( - "\x00\x05\x00apple" + - "\x02\x05\x00ricot" + - "\x00\x06\x00banana" + - "\x00\x00\x00\x00\x01\x00\x00\x00") - if !bytes.Equal(expected, block) { - t.Fatalf("expected\n%q\nfound\n%q", expected, block) - } + t.Run("simple", func(t *testing.T) { + w := &rawBlockWriter{ + blockWriter: blockWriter{restartInterval: 16}, + } + w.add(ikey("apple"), nil) + w.add(ikey("apricot"), nil) + w.add(ikey("banana"), nil) + block := w.finish() + + expected := []byte( + "\x00\x05\x00apple" + + "\x02\x05\x00ricot" + + "\x00\x06\x00banana" + + "\x00\x00\x00\x00\x01\x00\x00\x00") + if !bytes.Equal(expected, block) { + t.Fatalf("expected\n%q\nfound\n%q", expected, block) + } + }) + t.Run("split-prevents-key-sharing", func(t *testing.T) { + w := &rawBlockWriter{ + blockWriter: blockWriter{ + restartInterval: 16, + split: func(k []byte) int { + i := bytes.IndexByte(k, ' ') + if i < 0 { + return len(k) + } + return i + }, + }, + } + w.add(ikey("hello world"), nil) + w.add(ikey("hello worms"), nil) + w.add(ikey("hello worts"), nil) + block := w.finish() + expected := []byte( + "\x00\x0b\x00hello world" + + "\x05\x06\x00 worms" + + "\x05\x06\x00 worts" + + "\x00\x00\x00\x00\x01\x00\x00\x00") + if !bytes.Equal(expected, block) { + t.Fatalf("expected\n%q\nfound\n%q", expected, block) + } + }) } func TestInvalidInternalKeyDecoding(t *testing.T) { diff --git a/sstable/options.go b/sstable/options.go index 05805c678f..4c84565104 100644 --- a/sstable/options.go +++ b/sstable/options.go @@ -188,6 +188,26 @@ type WriterOptions struct { // with the value stored in the sstable when it was written. MergerName string + // SuffixPlaceholder enables key suffix replacement. When + // SuffixPlaceholder is set, Comparer.Split must be set as well. + // Suffix replacement only occurs within the suffix after the index + // returned by Split. Additionally, a suffix placeholder prevents + // key sharing after the index returned by Split. + // + // When a SuffixPlaceholder is set, all added keys that contain a + // prefix must have a suffix exactly equal to the SuffixPlaceholder. + // That is, if Split(k) < len(k), then k[Split(k):] must equal + // SuffixPlaceholder. + // + // When configured with a SuffixPlaceholder, Writer includes a + // SuffixReplacement table property indicating the placeholder and the + // intent to replace. A SSTable created with a non-empty SuffixPlaceholder + // cannot be read by Reader until modified by ReplaceSuffix to embed a + // replacement suffix. + // + // Range tombstones are not subject to suffix replacement. + SuffixPlaceholder []byte + // TableFormat specifies the format version for writing sstables. The default // is TableFormatRocksDBv2 which creates RocksDB compatible sstables. Use // TableFormatLevelDB to create LevelDB compatible sstable which can be used diff --git a/sstable/properties.go b/sstable/properties.go index c8ff10be8e..6505d64714 100644 --- a/sstable/properties.go +++ b/sstable/properties.go @@ -18,6 +18,7 @@ import ( const propertiesBlockRestartInterval = math.MaxInt32 const propGlobalSeqnumName = "rocksdb.external_sst_file.global_seqno" +const propSuffixReplacementName = "pebble.suffix_replacement" var propTagMap = make(map[string]reflect.StructField) var propBoolTrue = []byte{'1'} @@ -43,6 +44,12 @@ func init() { case reflect.Uint32: case reflect.Uint64: case reflect.String: + case reflect.Slice: + switch f.Type.Elem().Kind() { + case reflect.Uint8: + default: + panic(fmt.Sprintf("unsupported property field type: %s %s", f.Name, f.Type)) + } default: panic(fmt.Sprintf("unsupported property field type: %s %s", f.Name, f.Type)) } @@ -128,6 +135,18 @@ type Properties struct { RawKeySize uint64 `prop:"rocksdb.raw.key.size"` // Total raw value size. RawValueSize uint64 `prop:"rocksdb.raw.value.size"` + // SuffixReplacement configures sstable readers to apply a block + // transform replacing a configured placeholder key suffix with a + // new suffix of the same length. The property value must be an even + // length, containing first the placeholder suffix followed by the + // replacement suffix. + // + // For example a property value `aaabbb` instructs readers to transform + // blocks, replacing any instances of the key suffix `aaa` with `bbb`. + // + // See WriterOptions.SuffixPlaceholder and ReplaceSuffix for + // additional documentation. + SuffixReplacement []byte `prop:"pebble.suffix_replacement"` // Size of the top-level index if kTwoLevelIndexSearch is used. TopLevelIndexSize uint64 `prop:"rocksdb.top-level.index.size"` // User collected properties. @@ -159,8 +178,7 @@ func (p *Properties) String() string { } f := v.Field(i) - // TODO(peter): Use f.IsZero() when we can rely on go1.13. - if zero := reflect.Zero(f.Type()); zero.Interface() == f.Interface() { + if f.IsZero() { // Skip printing of zero values which were not loaded from disk. if _, ok := p.Loaded[ft.Offset]; !ok { continue @@ -180,6 +198,13 @@ func (p *Properties) String() string { } else { fmt.Fprintf(&buf, "%d\n", f.Uint()) } + case reflect.Slice: + switch ft.Type.Elem().Kind() { + case reflect.Uint8: + fmt.Fprintf(&buf, "%x\n", f.Bytes()) + default: + panic("not reached") + } case reflect.String: fmt.Fprintf(&buf, "%s\n", f.String()) default: @@ -222,6 +247,16 @@ func (p *Properties) load(b block, blockOffset uint64) error { n, _ = binary.Uvarint(i.Value()) } field.SetUint(n) + case reflect.Slice: + switch f.Type.Elem().Kind() { + case reflect.Uint8: + unsafeValue := i.Value() + b := make([]byte, len(unsafeValue)) + copy(b, unsafeValue) + field.SetBytes(b) + default: + panic("not reached") + } case reflect.String: field.SetString(intern.Bytes(i.Value())) default: @@ -268,6 +303,10 @@ func (p *Properties) saveString(m map[string][]byte, offset uintptr, value strin m[propOffsetTagMap[offset]] = []byte(value) } +func (p *Properties) saveBytes(m map[string][]byte, offset uintptr, value []byte) { + m[propOffsetTagMap[offset]] = value +} + func (p *Properties) save(w *rawBlockWriter) { m := make(map[string][]byte) for k, v := range p.UserProperties { @@ -328,6 +367,9 @@ func (p *Properties) save(w *rawBlockWriter) { } p.saveUvarint(m, unsafe.Offsetof(p.RawKeySize), p.RawKeySize) p.saveUvarint(m, unsafe.Offsetof(p.RawValueSize), p.RawValueSize) + if len(p.SuffixReplacement) > 0 { + p.saveBytes(m, unsafe.Offsetof(p.SuffixReplacement), p.SuffixReplacement) + } p.saveBool(m, unsafe.Offsetof(p.WholeKeyFiltering), p.WholeKeyFiltering) keys := make([]string, 0, len(m)) diff --git a/sstable/raw_block.go b/sstable/raw_block.go index f83a5d7bff..8485e79ccb 100644 --- a/sstable/raw_block.go +++ b/sstable/raw_block.go @@ -85,6 +85,14 @@ func (i *rawBlockIter) readEntry() { i.nextOffset = int32(uintptr(ptr)-uintptr(i.ptr)) + int32(value) } +func (i *rawBlockIter) valueLocation() (offset, length uint32) { + ptr := unsafe.Pointer(uintptr(i.ptr) + uintptr(i.offset)) + _ /*shared*/, ptr = decodeVarint(ptr) + unshared, ptr := decodeVarint(ptr) + value, ptr := decodeVarint(ptr) + return uint32(uintptr(ptr) + uintptr(unshared) - uintptr(i.ptr) - uintptr(i.offset)), value +} + func (i *rawBlockIter) loadEntry() { i.readEntry() i.ikey.UserKey = i.key diff --git a/sstable/reader.go b/sstable/reader.go index fc060ea6f3..ae75b0c39c 100644 --- a/sstable/reader.go +++ b/sstable/reader.go @@ -289,7 +289,7 @@ func (i *singleLevelIterator) loadBlock() bool { i.err = errCorruptIndexEntry return false } - block, err := i.reader.readBlock(i.dataBH, nil /* transform */, &i.dataRS) + block, err := i.reader.readBlock(i.dataBH, i.reader.transformMaybeSwapSuffix, &i.dataRS) if err != nil { i.err = err return false @@ -959,7 +959,7 @@ func (i *twoLevelIterator) loadIndex() bool { i.err = base.CorruptionErrorf("pebble/table: corrupt top level index entry") return false } - indexBlock, err := i.reader.readBlock(h, nil /* transform */, nil /* readaheadState */) + indexBlock, err := i.reader.readBlock(h, i.reader.transformMaybeSwapSuffix, nil /* readaheadState */) if err != nil { i.err = err return false @@ -1681,28 +1681,29 @@ func init() { // Reader is a table reader. type Reader struct { - file ReadableFile - fs vfs.FS - filename string - cacheID uint64 - fileNum base.FileNum - rawTombstones bool - err error - indexBH BlockHandle - filterBH BlockHandle - rangeDelBH BlockHandle - rangeDelTransform blockTransform - propertiesBH BlockHandle - metaIndexBH BlockHandle - footerBH BlockHandle - opts ReaderOptions - Compare Compare - FormatKey base.FormatKey - Split Split - mergerOK bool - checksumType ChecksumType - tableFilter *tableFilterReader - Properties Properties + file ReadableFile + fs vfs.FS + filename string + cacheID uint64 + fileNum base.FileNum + rawTombstones bool + err error + indexBH BlockHandle + filterBH BlockHandle + rangeDelBH BlockHandle + rangeDelTransform blockTransform + propertiesBH BlockHandle + metaIndexBH BlockHandle + footerBH BlockHandle + opts ReaderOptions + Compare Compare + FormatKey base.FormatKey + Split Split + mergerOK bool + unreplacedSuffixOK bool + checksumType ChecksumType + tableFilter *tableFilterReader + Properties Properties } // Close implements DB.Close, as documented in the pebble package. @@ -1847,7 +1848,7 @@ func (r *Reader) NewRawRangeDelIter() (base.InternalIterator, error) { } func (r *Reader) readIndex() (cache.Handle, error) { - return r.readBlock(r.indexBH, nil /* transform */, nil /* readaheadState */) + return r.readBlock(r.indexBH, r.transformMaybeSwapSuffix, nil /* readaheadState */) } func (r *Reader) readFilter() (cache.Handle, error) { @@ -1950,22 +1951,73 @@ func (r *Reader) readBlock( if transform != nil { // Transforming blocks is rare, so the extra copy of the transformed data // is not problematic. - var err error - b, err = transform(b) + transformedBlock, err := transform(b) if err != nil { r.opts.Cache.Free(v) return cache.Handle{}, err } - newV := r.opts.Cache.Alloc(len(b)) - copy(newV.Buf(), b) - r.opts.Cache.Free(v) - v = newV + // If transform returned a new slice, copy it into cache and + // free the old one. + if len(b) == 0 || len(transformedBlock) == 0 || &b[0] != &transformedBlock[0] { + newV := r.opts.Cache.Alloc(len(transformedBlock)) + copy(newV.Buf(), transformedBlock) + r.opts.Cache.Free(v) + v = newV + } + b = transformedBlock } h := r.opts.Cache.Set(r.cacheID, r.fileNum, bh.Offset, v) return h, nil } +// transformMaybeSwapSuffix is a blockTransform that performs a +// replacement of a key suffix. If the sstable's user properties +// contains the suffix replacement property, transformMaybeSwapSuffix +// adjusts all keys with the specified placeholder suffix with the +// indicated replacement value. +func (r *Reader) transformMaybeSwapSuffix(b []byte) ([]byte, error) { + if len(r.Properties.SuffixReplacement) == 0 { + return b, nil + } + + // The suffix replacement property holds the placeholder value + // concatenated with the replacement value. Values are required to + // be equal length. The SuffixReplacement property was already + // validated in when the reader was constructed. + split := len(r.Properties.SuffixReplacement) >> 1 + from, to := r.Properties.SuffixReplacement[:split], r.Properties.SuffixReplacement[split:] + + iter := &blockIter{} + if err := iter.init(r.Compare, b, r.Properties.GlobalSeqNum); err != nil { + return nil, err + } + for key, _ := iter.First(); key != nil; key, _ = iter.Next() { + split := r.Split(key.UserKey) + switch { + case split == len(key.UserKey): + // This key has no suffix. Skip it. + continue + case !bytes.Equal(key.UserKey[split:], from): + // This key has a suffix, but it doesn't match the placeholder + // value. The sstable Writer should not have allowed such an + // sstable to be produced, because we cannot know whether or not + // rewriting another key with the same prefix but the placeholder + // suffix might violate the key ordering. + panic(errors.New("pebble: suffix replacement sstable contains non-placeholder suffix")) + default: + // The suffix matches the expected placeholder. Overwrite it with + // the replacement suffix. + dest := len(iter.unsharedKey) - len(from) - base.InternalKeyTrailerLength + if dest < 0 { + panic(errors.New("pebble: suffix placeholder unexpectedly shared between keys")) + } + copy(iter.unsharedKey[dest:], to) + } + } + return b, nil +} + func (r *Reader) transformRangeDelV1(b []byte) ([]byte, error) { // Convert v1 (RocksDB format) range-del blocks to v2 blocks on the fly. The // v1 format range-del blocks have unfragmented and unsorted range @@ -2184,7 +2236,7 @@ func (r *Reader) ValidateBlockChecksums() error { } // Read the block, which validates the checksum. - h, err := r.readBlock(bh, nil /* transform */, blockRS) + h, err := r.readBlock(bh, r.transformMaybeSwapSuffix, blockRS) if err != nil { return err } @@ -2241,7 +2293,7 @@ func (r *Reader) EstimateDiskUsage(start, end []byte) (uint64, error) { if n == 0 || n != len(val) { return 0, errCorruptIndexEntry } - startIdxBlock, err := r.readBlock(startIdxBH, nil /* transform */, nil /* readaheadState */) + startIdxBlock, err := r.readBlock(startIdxBH, r.transformMaybeSwapSuffix, nil /* readaheadState */) if err != nil { return 0, err } @@ -2261,7 +2313,7 @@ func (r *Reader) EstimateDiskUsage(start, end []byte) (uint64, error) { if n == 0 || n != len(val) { return 0, errCorruptIndexEntry } - endIdxBlock, err := r.readBlock(endIdxBH, nil /* transform */, nil /* readaheadState */) + endIdxBlock, err := r.readBlock(endIdxBH, r.transformMaybeSwapSuffix, nil /* readaheadState */) if err != nil { return 0, err } @@ -2386,6 +2438,29 @@ func NewReader(f ReadableFile, o ReaderOptions, extraOpts ...ReaderOption) (*Rea errors.Safe(r.fileNum), errors.Safe(r.Properties.MergerName)) } } + + if len(r.Properties.SuffixReplacement) > 0 { + // The suffix replacement property holds the placeholder value + // concatenated with the replacement value. Values are required to + // be equal length. + if len(r.Properties.SuffixReplacement)%2 != 0 { + r.err = errors.New("pebble: invalid suffix replacement property: odd length") + return nil, r.Close() + } + split := len(r.Properties.SuffixReplacement) >> 1 + if !r.unreplacedSuffixOK && bytes.Equal(r.Properties.SuffixReplacement[:split], r.Properties.SuffixReplacement[split:]) { + r.err = errors.Newf("pebble: unreplaced suffix") + return nil, r.Close() + } + // Require that r.Split is set. Suffix replacement only occurs after a + // key's Split prefix. + if r.Split == nil { + r.err = errors.Errorf("pebble/table: %d: suffix replacement requires Split, comparer %s omits", + errors.Safe(r.fileNum), errors.Safe(r.Properties.ComparerName)) + return nil, r.Close() + } + } + if r.err != nil { return nil, r.Close() } @@ -2505,7 +2580,7 @@ func (l *Layout) Describe( continue } - h, err := r.readBlock(b.BlockHandle, nil /* transform */, nil /* readaheadState */) + h, err := r.readBlock(b.BlockHandle, r.transformMaybeSwapSuffix, nil /* readaheadState */) if err != nil { fmt.Fprintf(w, " [err: %s]\n", err) continue diff --git a/sstable/writer.go b/sstable/writer.go index 8deb34e4ab..c9cca1ebef 100644 --- a/sstable/writer.go +++ b/sstable/writer.go @@ -19,6 +19,7 @@ import ( "github.com/cockroachdb/pebble/internal/crc" "github.com/cockroachdb/pebble/internal/private" "github.com/cockroachdb/pebble/internal/rangedel" + "github.com/cockroachdb/pebble/vfs" ) // WriterMetadata holds info about a finished sstable. @@ -105,6 +106,7 @@ type Writer struct { tableFormat TableFormat checksumType ChecksumType cache *cache.Cache + suffixPlaceholder []byte // disableKeyOrderChecks disables the checks that keys are added to an // sstable in order. It is intended for internal use only in the construction // of invalid sstables for testing. See tool/make_test_sstables.go. @@ -232,6 +234,14 @@ func (w *Writer) addPoint(key InternalKey, value []byte) error { } } + if len(w.suffixPlaceholder) > 0 { + i := w.split(key.UserKey) + if i < len(key.UserKey) && !bytes.Equal(w.suffixPlaceholder, key.UserKey[i:]) { + w.err = errors.New("pebble: all suffixed keys must contain the suffix placeholder") + return w.err + } + } + if err := w.maybeFlush(key, value); err != nil { return err } @@ -473,6 +483,26 @@ func (w *Writer) writeTwoLevelIndex() (BlockHandle, error) { return w.writeBlock(w.topLevelIndexBlock.finish(), w.compression) } +func (w *Writer) checksumBlock( + b []byte, blockType []byte, checksumType ChecksumType, +) (uint32, error) { + switch checksumType { + case ChecksumTypeCRC32c: + return crc.New(b).Update(blockType).Value(), nil + case ChecksumTypeXXHash64: + if w.xxHasher == nil { + w.xxHasher = xxhash.New() + } else { + w.xxHasher.Reset() + } + w.xxHasher.Write(b) + w.xxHasher.Write(blockType) + return uint32(w.xxHasher.Sum64()), nil + default: + return 0, errors.Newf("unsupported checksum type: %d", checksumType) + } +} + func (w *Writer) writeBlock(b []byte, compression Compression) (BlockHandle, error) { // Compress the buffer, discarding the result if the improvement isn't at // least 12.5%. @@ -489,21 +519,9 @@ func (w *Writer) writeBlock(b []byte, compression Compression) (BlockHandle, err w.tmp[0] = byte(blockType) // Calculate the checksum. - var checksum uint32 - switch w.checksumType { - case ChecksumTypeCRC32c: - checksum = crc.New(b).Update(w.tmp[:1]).Value() - case ChecksumTypeXXHash64: - if w.xxHasher == nil { - w.xxHasher = xxhash.New() - } else { - w.xxHasher.Reset() - } - w.xxHasher.Write(b) - w.xxHasher.Write(w.tmp[:1]) - checksum = uint32(w.xxHasher.Sum64()) - default: - return BlockHandle{}, errors.Newf("unsupported checksum type: %d", w.checksumType) + checksum, err := w.checksumBlock(b, w.tmp[:1], w.checksumType) + if err != nil { + return BlockHandle{}, err } binary.LittleEndian.PutUint32(w.tmp[1:5], checksum) bh := BlockHandle{w.meta.Size, uint64(len(b))} @@ -734,7 +752,7 @@ func (w *Writer) Metadata() (*WriterMetadata, error) { // WriterOption provide an interface to do work on Writer while it is being // opened. type WriterOption interface { - // writerAPply is called on the writer during opening in order to set + // writerApply is called on the writer during opening in order to set // internal parameters. writerApply(*Writer) } @@ -772,6 +790,7 @@ func NewWriter(f writeCloseSyncer, o WriterOptions, extraOpts ...WriterOption) * tableFormat: o.TableFormat, checksumType: o.Checksum, cache: o.Cache, + suffixPlaceholder: o.SuffixPlaceholder, block: blockWriter{ restartInterval: o.BlockRestartInterval, }, @@ -823,6 +842,23 @@ func NewWriter(f writeCloseSyncer, o WriterOptions, extraOpts ...WriterOption) * w.props.PropertyCollectorNames = "[]" w.props.ExternalFormatVersion = rocksDBExternalFormatVersion + if len(o.SuffixPlaceholder) > 0 { + w.props.SuffixReplacement = make([]byte, 2*len(o.SuffixPlaceholder)) + copy(w.props.SuffixReplacement, o.SuffixPlaceholder) + copy(w.props.SuffixReplacement[len(o.SuffixPlaceholder):], o.SuffixPlaceholder) + + // Configure block writers to prevent sharing of suffixes after + // a key's Split point. If SuffixPlaceholder is set, the caller + // must use a Comparer that provides a Split function. + if w.split == nil { + w.err = errors.New("pebble: SuffixPlaceholder requires a comparer Split function") + return w + } + w.block.split = w.split + w.indexBlock.split = w.split + w.topLevelIndexBlock.split = w.split + } + if len(o.TablePropertyCollectors) > 0 { w.propCollectors = make([]TablePropertyCollector, len(o.TablePropertyCollectors)) var buf bytes.Buffer @@ -855,6 +891,110 @@ func NewWriter(f writeCloseSyncer, o WriterOptions, extraOpts ...WriterOption) * return w } +// AllowUnreplacedSuffix is a ReaderOption that may be passed to +// NewReader to allow reading from an sstable intended for +// suffix-replacement but which does not yet have its suffix written. +// Ordinarily, NewReader will error if there's an attempt to read an +// sstable that supports suffix replacement but does not yet specify a +// replacement suffix. +var AllowUnreplacedSuffix ReaderOption = unreplacedSuffixOK{} + +type unreplacedSuffixOK struct{} + +func (unreplacedSuffixOK) readerApply(r *Reader) { + r.unreplacedSuffixOK = true +} + +// ReplaceSuffix mutates the passed byte slice containing a serialized +// sstable, setting the replacement suffix to the provided value. The +// passed sstable must have been created with a SuffixPlaceholder set on +// the WriteOptions. +// +// ReplaceSuffix errors if the sstable was not created with a +// SuffixPlaceholder, or the existing placeholder does not match +// placeholderSuffix. +// +// Range tombstones are not subject to suffix replacement. +func ReplaceSuffix( + sstableBytes []byte, opts ReaderOptions, placeholderSuffix, replacementSuffix []byte, +) error { + if len(placeholderSuffix) != len(replacementSuffix) { + return errors.New("pebble: placeholder and replacement suffixes must be equal length") + } + + // Read the sstable to find the location of the properties block and + // validate the existing SuffixReplacement property. We pass the + // AllowUnreplacedSuffix reader option to indicate that this reading + // of a pre-replacement sstable is intentional. + offset, length, checksumType, err := func() (uint64, uint64, ChecksumType, error) { + r, err := NewReader(vfs.NewMemFile(sstableBytes), opts, AllowUnreplacedSuffix) + if err != nil { + return 0, 0, ChecksumTypeNone, err + } + defer r.Close() + + v := r.Properties.SuffixReplacement + switch { + case len(v) == 0: + return 0, 0, ChecksumTypeNone, errors.New("pebble: sstable does not support suffix replacement") + case len(v) != len(placeholderSuffix)+len(replacementSuffix) || !bytes.HasPrefix(v, placeholderSuffix): + return 0, 0, ChecksumTypeNone, errors.New("pebble: sstable created with a different placeholder suffix") + default: + return r.propertiesBH.Offset, r.propertiesBH.Length, r.checksumType, nil + } + }() + if err != nil { + return err + } + + // Iterate through the properties block, looking for the suffix + // replacement property. Find the offset of the suffix replacement + // property's value within the block, so that the replacement + // portion of the value may be overwritten with the provided + // replacement suffix. + propsBlock := sstableBytes[offset : offset+length] + i, err := newRawBlockIter(bytes.Compare, propsBlock) + if err != nil { + return err + } + var found bool + var valueOffset uint32 + for valid := i.First(); valid; valid = i.Next() { + if bytes.Equal(i.Key().UserKey, []byte(propSuffixReplacementName)) { + found = true + var valueLength uint32 + valueOffset, valueLength = i.valueLocation() + if int(valueLength) != 2*len(replacementSuffix) { + // Already validated the property value length above. A + // mismatch here indicates something very wrong. + panic("unreachable") + } + break + } + } + if !found { + // We already read the SuffixReplacement property through the + // reader up above, so we should always find the property. + panic("unreachable") + } + + // Replace the second half of the property's value to be the + // given replacement suffix. For example, if configuring a suffix + // replacement from `aaa` to `bbb`, the existing property value will + // be `aaaaaa`. Overwrite the second half so that the resulting + // property value is `aaabbb`. + copy(propsBlock[int(valueOffset)+len(placeholderSuffix):], replacementSuffix) + + // Update checksum after block to reflect the mutated values. + blockType := sstableBytes[offset+length : offset+length+1] + checksum, err := (&Writer{}).checksumBlock(propsBlock, blockType, checksumType) + if err != nil { + return err + } + binary.LittleEndian.PutUint32(sstableBytes[offset+length+1:offset+length+5], checksum) + return nil +} + func init() { private.SSTableWriterDisableKeyOrderChecks = func(i interface{}) { w := i.(*Writer) diff --git a/sstable/writer_test.go b/sstable/writer_test.go index 5837eb11e1..564447e55f 100644 --- a/sstable/writer_test.go +++ b/sstable/writer_test.go @@ -8,9 +8,11 @@ import ( "bytes" "encoding/binary" "fmt" + "io/ioutil" "testing" "github.com/cockroachdb/pebble/bloom" + "github.com/cockroachdb/pebble/internal/base" "github.com/cockroachdb/pebble/internal/cache" "github.com/cockroachdb/pebble/internal/datadriven" "github.com/cockroachdb/pebble/vfs" @@ -104,6 +106,82 @@ func TestWriter(t *testing.T) { }) } +func TestReplaceSuffix(t *testing.T) { + mem := vfs.NewMem() + + // Suffix replacement requires a Split function. Add one that splits a key + // at the first space. + comparer := *base.DefaultComparer + comparer.Split = func(a []byte) int { + v := bytes.IndexByte(a, ' ') + if v < 0 { + return len(a) + } + return v + } + + // Write a new file containing a few keys, including a few with the + // ` world` suffix. + f, err := mem.Create("prereplacement.sst") + require.NoError(t, err) + w := NewWriter(f, WriterOptions{ + Comparer: &comparer, + SuffixPlaceholder: []byte(` world`), + }) + inputKeys := []string{ + `bonjour world`, + `foo`, + `hello world`, + `hi world`, + `world`, + `worms`, + } + for _, k := range inputKeys { + require.NoError(t, w.Add(base.MakeInternalKey([]byte(k), 0, InternalKeyKindSet), nil)) + } + require.NoError(t, w.Close()) + + // The SSTable's SuffixReplacement property should be + // ` world world`, indicating that the suffix placeholder is + // ` world` and that the replacement value is not yet known. + m, err := w.Metadata() + require.NoError(t, err) + require.Equal(t, []byte(` world world`), m.Properties.SuffixReplacement) + + // Trying to read the sstable without a replacement value in the sstable + // property should error. This behavior helps avoid accidental reading of + // unfinished sstables. + readOpts := ReaderOptions{Comparer: &comparer} + f, err = mem.Open("prereplacement.sst") + require.NoError(t, err) + _, err = NewReader(f, readOpts) + require.Error(t, err, `pebble: unreplaced suffix`) + + // Perform the replacement. + f, err = mem.Open("prereplacement.sst") + require.NoError(t, err) + sstableBytes, err := ioutil.ReadAll(f) + require.NoError(t, err) + require.NoError(t, f.Close()) + require.NoError(t, ReplaceSuffix(sstableBytes, readOpts, []byte(` world`), []byte(` monde`))) + + // Read the file post-replacement. All the ` world` suffixes should be + // replaced with ` monde`. + f = vfs.NewMemFile(sstableBytes) + r, err := NewReader(f, readOpts) + require.NoError(t, err) + defer r.Close() + iter, err := r.NewIter(nil, nil) + require.NoError(t, err) + defer iter.Close() + var got []string + for k, _ := iter.First(); k != nil; k, _ = iter.Next() { + got = append(got, string(k.UserKey)) + } + want := []string{`bonjour monde`, `foo`, `hello monde`, `hi monde`, `world`, `worms`} + require.Equal(t, want, got) +} + func TestWriterClearCache(t *testing.T) { // Verify that Writer clears the cache of blocks that it writes. mem := vfs.NewMem() diff --git a/testdata/event_listener b/testdata/event_listener index 412ba1a19c..a1971f012b 100644 --- a/testdata/event_listener +++ b/testdata/event_listener @@ -179,7 +179,7 @@ compact 1 2.3 K 0 B (size == estimated-debt, in = zmemtbl 0 0 B ztbl 0 0 B bcache 8 1.4 K 5.9% (score == hit-rate) - tcache 1 616 B 0.0% (score == hit-rate) + tcache 1 640 B 0.0% (score == hit-rate) titers 0 filter - - 0.0% (score == utility) diff --git a/testdata/ingest b/testdata/ingest index 2d822112d0..05a768ed83 100644 --- a/testdata/ingest +++ b/testdata/ingest @@ -48,7 +48,7 @@ compact 0 0 B 0 B (size == estimated-debt, in = zmemtbl 0 0 B ztbl 0 0 B bcache 8 1.5 K 46.7% (score == hit-rate) - tcache 1 616 B 50.0% (score == hit-rate) + tcache 1 640 B 50.0% (score == hit-rate) titers 0 filter - - 0.0% (score == utility) diff --git a/testdata/metrics b/testdata/metrics index 46db796e95..b003deb9fd 100644 --- a/testdata/metrics +++ b/testdata/metrics @@ -34,7 +34,7 @@ compact 0 0 B 0 B (size == estimated-debt, in = zmemtbl 1 256 K ztbl 0 0 B bcache 4 698 B 0.0% (score == hit-rate) - tcache 1 616 B 0.0% (score == hit-rate) + tcache 1 640 B 0.0% (score == hit-rate) titers 1 filter - - 0.0% (score == utility) @@ -81,7 +81,7 @@ compact 1 0 B 0 B (size == estimated-debt, in = zmemtbl 2 512 K ztbl 2 1.5 K bcache 8 1.4 K 33.3% (score == hit-rate) - tcache 2 1.2 K 50.0% (score == hit-rate) + tcache 2 1.3 K 50.0% (score == hit-rate) titers 2 filter - - 0.0% (score == utility) @@ -113,7 +113,7 @@ compact 1 0 B 0 B (size == estimated-debt, in = zmemtbl 1 256 K ztbl 2 1.5 K bcache 8 1.4 K 33.3% (score == hit-rate) - tcache 2 1.2 K 50.0% (score == hit-rate) + tcache 2 1.3 K 50.0% (score == hit-rate) titers 2 filter - - 0.0% (score == utility) @@ -142,7 +142,7 @@ compact 1 0 B 0 B (size == estimated-debt, in = zmemtbl 1 256 K ztbl 1 771 B bcache 4 698 B 33.3% (score == hit-rate) - tcache 1 616 B 50.0% (score == hit-rate) + tcache 1 640 B 50.0% (score == hit-rate) titers 1 filter - - 0.0% (score == utility)