From 88601126817aec8493d56964ba1b8f0db151cbae Mon Sep 17 00:00:00 2001 From: Michael Butler Date: Mon, 22 Jan 2024 13:32:52 -0600 Subject: [PATCH] sstable: add block level suffix replacement support MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This patch teaches the block iterator to replace the suffix of all surfaced keys with a replacement suffix passed during block initiation. In other words, the iterator will read the block as if all underlying keys actually had the replacement prefix. For correctness, this patch relies on two invariants: for all keys in the block: 1. no two keys in the sst share the same prefix 2. byte.Compare(replacementSuffix,originalSuffix) > 0 During the normal readpath, i.e. for blocks without suffix rewriting, this patch does slow Seeks down: ``` ❯ benchstat master suffix name old time/op new time/op delta BlockIterSeekGE/restart=16-24 345ns ± 0% 367ns ± 0% +6.53% (p=0.002 n=6+6) BlockIterSeekLT/restart=16-24 398ns ± 0% 416ns ± 0% +4.55% (p=0.008 n=5+5) BlockIterNext/restart=16-24 15.4ns ± 0% 15.6ns ± 0% +1.09% (p=0.002 n=6+6) BlockIterPrev/restart=16-24 30.6ns ± 0% 30.6ns ± 0% -0.23% (p=0.006 n=6+6) ``` --- sstable/block.go | 113 ++++++++++++++++++++-- sstable/block_property_test.go | 6 +- sstable/block_test.go | 152 ++++++++++++++++++++++++++++-- sstable/layout.go | 4 +- sstable/reader.go | 27 ++++-- sstable/reader_iter_single_lvl.go | 4 +- sstable/reader_iter_two_lvl.go | 4 +- sstable/reader_test.go | 4 +- sstable/suffix_rewriter.go | 2 +- 9 files changed, 279 insertions(+), 37 deletions(-) diff --git a/sstable/block.go b/sstable/block.go index e8d4bd39bc..3dd12efefb 100644 --- a/sstable/block.go +++ b/sstable/block.go @@ -412,14 +412,17 @@ type blockIter struct { } hideObsoletePoints bool SyntheticPrefix + syntheticSuffix *SyntheticSuffix } // blockIter implements the base.InternalIterator interface. var _ base.InternalIterator = (*blockIter)(nil) -func newBlockIter(cmp Compare, block block, syntheticPrefix SyntheticPrefix) (*blockIter, error) { +func newBlockIter( + cmp Compare, block block, syntheticPrefix SyntheticPrefix, syntheticSuffix *SyntheticSuffix, +) (*blockIter, error) { i := &blockIter{} - return i, i.init(cmp, block, 0, false, syntheticPrefix) + return i, i.init(cmp, block, 0, false, syntheticPrefix, syntheticSuffix) } func (i *blockIter) String() string { @@ -427,13 +430,19 @@ func (i *blockIter) String() string { } func (i *blockIter) init( - cmp Compare, block block, globalSeqNum uint64, hideObsoletePoints bool, syntheticPrefix SyntheticPrefix, + cmp Compare, + block block, + globalSeqNum uint64, + hideObsoletePoints bool, + syntheticPrefix SyntheticPrefix, + syntheticSuffix *SyntheticSuffix, ) error { numRestarts := int32(binary.LittleEndian.Uint32(block[len(block)-4:])) if numRestarts == 0 { return base.CorruptionErrorf("pebble/table: invalid table (block has no restart points)") } i.SyntheticPrefix = syntheticPrefix + i.syntheticSuffix = syntheticSuffix i.cmp = cmp i.restarts = int32(len(block)) - 4*(1+numRestarts) i.numRestarts = numRestarts @@ -464,11 +473,16 @@ func (i *blockIter) init( // ingested. // - Foreign sstable iteration: globalSeqNum is always set. func (i *blockIter) initHandle( - cmp Compare, block bufferHandle, globalSeqNum uint64, hideObsoletePoints bool, syntheticPrefix SyntheticPrefix, + cmp Compare, + block bufferHandle, + globalSeqNum uint64, + hideObsoletePoints bool, + syntheticPrefix SyntheticPrefix, + syntheticSuffix *SyntheticSuffix, ) error { i.handle.Release() i.handle = block - return i.init(cmp, block.Get(), globalSeqNum, hideObsoletePoints, syntheticPrefix) + return i.init(cmp, block.Get(), globalSeqNum, hideObsoletePoints, syntheticPrefix, syntheticSuffix) } func (i *blockIter) invalidate() { @@ -671,6 +685,21 @@ func (i *blockIter) decodeInternalKey(key []byte) (hiddenPoint bool) { return hiddenPoint } +func (i *blockIter) maybeReplaceSuffix(maybeFromCache bool) { + if i.syntheticSuffix != nil && i.ikey.UserKey != nil { + prefixLen := i.syntheticSuffix.split(i.ikey.UserKey) + if !maybeFromCache { + i.ikey.UserKey = append(i.ikey.UserKey[:prefixLen], i.syntheticSuffix.suffix...) + return + } + // If ikey is cached or may get cached, we must de-reference + // UserKey before prefix replacement. + i.syntheticSuffix.buf = append(i.syntheticSuffix.buf[:0], i.ikey.UserKey[:prefixLen]...) + i.syntheticSuffix.buf = append(i.syntheticSuffix.buf, i.syntheticSuffix.suffix...) + i.ikey.UserKey = i.syntheticSuffix.buf + } +} + func (i *blockIter) clearCache() { i.cached = i.cached[:0] i.cachedBuf = i.cachedBuf[:0] @@ -811,6 +840,9 @@ func (i *blockIter) SeekGE(key []byte, flags base.SeekGEFlags) (*InternalKey, ba if !i.valid() { return nil, base.LazyValue{} } + // Before conducting any user key comparison, we must replace the suffix. + i.maybeReplaceSuffix(false) + if !hiddenPoint && i.cmp(i.ikey.UserKey, key) >= 0 { // Initialize i.lazyValue if !i.lazyValueHandling.hasValuePrefix || @@ -953,6 +985,16 @@ func (i *blockIter) SeekLT(key []byte, flags base.SeekLTFlags) (*InternalKey, ba targetOffset = decodeRestart(i.data[i.restarts+4*(index):]) } } else if index == 0 { + if i.syntheticSuffix != nil { + // The binary search was conducted on keys without suffix replacement, + // implying the first key in the block may not be geq the search key. To + // double check, get the first key in the block with suffix replacement + // and compare to the search key. + ikey, lazyVal := i.First() + if i.cmp(ikey.UserKey, key) < 0 { + return ikey, lazyVal + } + } // If index == 0 then all keys in this block are larger than the key // sought. i.offset = -1 @@ -979,6 +1021,7 @@ func (i *blockIter) SeekLT(key []byte, flags base.SeekLTFlags) (*InternalKey, ba // of hidden keys we will be able to skip whole blocks (using block // property filters) so we don't bother optimizing. hiddenPoint := i.decodeInternalKey(i.key) + i.maybeReplaceSuffix(true) // NB: we don't use the hiddenPoint return value of decodeInternalKey // since we want to stop as soon as we reach a key >= ikey.UserKey, so @@ -1003,9 +1046,41 @@ func (i *blockIter) SeekLT(key []byte, flags base.SeekLTFlags) (*InternalKey, ba if hiddenPoint { return i.Prev() } + if i.syntheticSuffix != nil { + // The binary search was conducted on keys without suffix replacement, + // implying the returned restart point may not be geq the search key. + // + // For example: consider this block with a replacement ts of 4, and restart interval of 1: + // - pre replacement: a3,b2,c3 + // - post replacement: a4,b4,c4 + // + // Suppose the client calls SeekLT(b3), SeekLT must return b4. + // + // If the client calls SeekLT(b3), the binary search would return b2, + // the lowest key geq to b3, pre-suffix replacement. Then, SeekLT will + // begin forward iteration from a3, the previous restart point, to + // b{suffix}. The iteration stops when it encounters a key geq to the + // search key or if it reaches the upper bound. Without suffix replacement, we can assume + // that the upper bound of this forward iteration, b{suffix}, is greater + // than the search key, as implied by the binary search. + // + // If we naively hold this assumption with suffix replacement, the + // iteration would terminate at the upper bound, b4, call i.Prev, and + // incorrectly return a4. Instead, we must continue forward iteration + // past the upper bound, until we find a key geq the search key. With + // this correction, SeekLT would correctly return b4 in this example. + for i.cmp(i.ikey.UserKey, key) < 0 { + i.Next() + if !i.valid() { + break + } + } + // The current key is greater than or equal to our search key. Back up to + // the previous key which was less than our search key. + return i.Prev() + } break } - i.cacheEntry() } @@ -1040,6 +1115,7 @@ func (i *blockIter) First() (*InternalKey, base.LazyValue) { if hiddenPoint { return i.Next() } + i.maybeReplaceSuffix(false) if !i.lazyValueHandling.hasValuePrefix || base.TrailerKind(i.ikey.Trailer) != InternalKeyKindSet { i.lazyValue = base.MakeInPlaceValue(i.val) @@ -1082,6 +1158,7 @@ func (i *blockIter) Last() (*InternalKey, base.LazyValue) { if hiddenPoint { return i.Prev() } + i.maybeReplaceSuffix(true) if !i.lazyValueHandling.hasValuePrefix || base.TrailerKind(i.ikey.Trailer) != InternalKeyKindSet { i.lazyValue = base.MakeInPlaceValue(i.val) @@ -1130,6 +1207,10 @@ start: if hiddenPoint { goto start } + if i.syntheticSuffix != nil { + prefixLen := i.syntheticSuffix.split(i.ikey.UserKey) + i.ikey.UserKey = append(i.ikey.UserKey[:prefixLen], i.syntheticSuffix.suffix...) + } } else { i.ikey.Trailer = uint64(InternalKeyKindInvalid) i.ikey.UserKey = nil @@ -1400,6 +1481,10 @@ func (i *blockIter) nextPrefixV3(succKey []byte) (*InternalKey, base.LazyValue) if i.globalSeqNum != 0 { i.ikey.SetSeqNum(i.globalSeqNum) } + if i.syntheticSuffix != nil { + prefixLen := i.syntheticSuffix.split(i.ikey.UserKey) + i.ikey.UserKey = append(i.ikey.UserKey[:prefixLen], i.syntheticSuffix.suffix...) + } } else { i.ikey.Trailer = uint64(InternalKeyKindInvalid) i.ikey.UserKey = nil @@ -1458,6 +1543,14 @@ start: if i.globalSeqNum != 0 { i.ikey.SetSeqNum(i.globalSeqNum) } + if i.syntheticSuffix != nil { + suffixLen := i.syntheticSuffix.split(i.ikey.UserKey) + // If ikey is cached or may get cached, we must de-reference + // UserKey before prefix replacement. + i.syntheticSuffix.buf = append(i.syntheticSuffix.buf[:0], i.ikey.UserKey[:suffixLen]...) + i.syntheticSuffix.buf = append(i.syntheticSuffix.buf, i.syntheticSuffix.suffix...) + i.ikey.UserKey = i.syntheticSuffix.buf + } } else { i.ikey.Trailer = uint64(InternalKeyKindInvalid) i.ikey.UserKey = nil @@ -1535,6 +1628,14 @@ start: // Use the cache. goto start } + if i.syntheticSuffix != nil { + suffixLen := i.syntheticSuffix.split(i.ikey.UserKey) + // If ikey is cached or may get cached, we must de-reference + // UserKey before prefix replacement. + i.syntheticSuffix.buf = append(i.syntheticSuffix.buf[:0], i.ikey.UserKey[:suffixLen]...) + i.syntheticSuffix.buf = append(i.syntheticSuffix.buf, i.syntheticSuffix.suffix...) + i.ikey.UserKey = i.syntheticSuffix.buf + } if !i.lazyValueHandling.hasValuePrefix || base.TrailerKind(i.ikey.Trailer) != InternalKeyKindSet { i.lazyValue = base.MakeInPlaceValue(i.val) diff --git a/sstable/block_property_test.go b/sstable/block_property_test.go index eb873f0eca..acea9c318a 100644 --- a/sstable/block_property_test.go +++ b/sstable/block_property_test.go @@ -938,7 +938,7 @@ func TestBlockProperties(t *testing.T) { var blocks []int var i int - iter, _ := newBlockIter(r.Compare, indexH.Get(), nil) + iter, _ := newBlockIter(r.Compare, indexH.Get(), nil, nil) for key, value := iter.First(); key != nil; key, value = iter.Next() { bh, err := decodeBlockHandleWithProperties(value.InPlaceValue()) if err != nil { @@ -1274,7 +1274,7 @@ func runBlockPropsCmd(r *Reader, td *datadriven.TestData) string { return err.Error() } twoLevelIndex := r.Properties.IndexPartitions > 0 - i, err := newBlockIter(r.Compare, bh.Get(), nil) + i, err := newBlockIter(r.Compare, bh.Get(), nil, nil) if err != nil { return err.Error() } @@ -1322,7 +1322,7 @@ func runBlockPropsCmd(r *Reader, td *datadriven.TestData) string { return err.Error() } if err := subiter.init( - r.Compare, subIndex.Get(), 0 /* globalSeqNum */, false, r.syntheticPrefix); err != nil { + r.Compare, subIndex.Get(), 0 /* globalSeqNum */, false, r.syntheticPrefix, r.syntheticSuffix); err != nil { return err.Error() } for key, value := subiter.First(); key != nil; key, value = subiter.Next() { diff --git a/sstable/block_test.go b/sstable/block_test.go index e21d6a78f1..146519fd44 100644 --- a/sstable/block_test.go +++ b/sstable/block_test.go @@ -16,6 +16,7 @@ import ( "github.com/cockroachdb/datadriven" "github.com/cockroachdb/pebble/internal/base" "github.com/cockroachdb/pebble/internal/itertest" + "github.com/cockroachdb/pebble/internal/testkeys" "github.com/stretchr/testify/require" "golang.org/x/exp/rand" ) @@ -24,6 +25,32 @@ func ikey(s string) InternalKey { return InternalKey{UserKey: []byte(s)} } +// delete this when pr is no longer a draft. +func TestMichael(t *testing.T) { + cache := []byte("apple") + prefixLen := 3 + sepBuffer := []byte{} + userKey := []byte("pear") + + res := bytes.Compare([]byte("a"), []byte("b")) + fmt.Printf("%d\n", res) + if n := len(cache); n >= 0 { + // We initially assume the user key is pointed to cache + userKey = cache + + // during, suffix replacement repoint the userkey to a different buffer + userKey = sepBuffer + + // now do the byte for byte copy from the cache + userKey = append(userKey[:0], cache[:prefixLen]...) + + // manipulate user key and assert the cache was never manipulated + userKey[0] = 'b' + fmt.Printf("user key: %s %p\n", userKey, &userKey[0]) + fmt.Printf("cache key: %s %p\n", cache, &cache[0]) + } +} + func TestBlockWriter(t *testing.T) { w := &rawBlockWriter{ blockWriter: blockWriter{restartInterval: 16}, @@ -245,7 +272,7 @@ func TestBlockIter2(t *testing.T) { return "" case "iter": - iter, err := newBlockIter(bytes.Compare, block, nil) + iter, err := newBlockIter(bytes.Compare, block, nil, nil) if err != nil { return err.Error() } @@ -276,7 +303,7 @@ func TestBlockIterKeyStability(t *testing.T) { } block := w.finish() - i, err := newBlockIter(bytes.Compare, block, nil) + i, err := newBlockIter(bytes.Compare, block, nil, nil) require.NoError(t, err) // Check that the supplied slice resides within the bounds of the block. @@ -336,7 +363,7 @@ func TestBlockIterReverseDirections(t *testing.T) { for targetPos := 0; targetPos < w.restartInterval; targetPos++ { t.Run("", func(t *testing.T) { - i, err := newBlockIter(bytes.Compare, block, nil) + i, err := newBlockIter(bytes.Compare, block, nil, nil) require.NoError(t, err) pos := 3 @@ -375,10 +402,10 @@ func TestBlockSyntheticPrefix(t *testing.T) { elidedPrefixBlock, includedPrefixBlock := elidedPrefixWriter.finish(), includedPrefixWriter.finish() - expect, err := newBlockIter(bytes.Compare, includedPrefixBlock, nil) + expect, err := newBlockIter(bytes.Compare, includedPrefixBlock, nil, nil) require.NoError(t, err) - got, err := newBlockIter(bytes.Compare, elidedPrefixBlock, SyntheticPrefix([]byte(prefix))) + got, err := newBlockIter(bytes.Compare, elidedPrefixBlock, SyntheticPrefix([]byte(prefix)), nil) require.NoError(t, err) check := func(eKey *base.InternalKey, eVal base.LazyValue) func(gKey *base.InternalKey, gVal base.LazyValue) { @@ -413,6 +440,113 @@ func TestBlockSyntheticPrefix(t *testing.T) { } } +func TestBlockSyntheticSuffix(t *testing.T) { + + // TODO(msbutler): add test where replacement suffix has fewer bytes than previous suffix + expectedSuffix := "@15" + + synthSuffix := SyntheticSuffix{ + suffix: []byte(expectedSuffix), + buf: []byte{}, + split: testkeys.Comparer.Split, + } + + // Use testkeys.Comparer.Compare which approximates EngineCompare by ordering + // multiple keys with same prefix in descending suffix order. + cmp := testkeys.Comparer.Compare + + for _, restarts := range []int{1} { //, 2, 3, 4, 10} { + t.Run(fmt.Sprintf("restarts=%d", restarts), func(t *testing.T) { + + suffixWriter, expectedSuffixWriter := &blockWriter{restartInterval: restarts}, &blockWriter{restartInterval: restarts} + keys := []string{ + "apple@2", "apricot@2", "banana@13", + "grape@2", "orange@14", "peach@4", + "pear@1", "persimmon@4", + } + for _, key := range keys { + suffixWriter.add(ikey(key), nil) + replacedKey := strings.Split(key, "@")[0] + expectedSuffix + expectedSuffixWriter.add(ikey(replacedKey), nil) + } + + suffixReplacedBlock, expectedBlock := suffixWriter.finish(), expectedSuffixWriter.finish() + + expect, err := newBlockIter(cmp, expectedBlock, nil, nil) + require.NoError(t, err) + + got, err := newBlockIter(cmp, suffixReplacedBlock, nil, &synthSuffix) + require.NoError(t, err) + + check := func(eKey *base.InternalKey, eVal base.LazyValue) func(gKey *base.InternalKey, gVal base.LazyValue) { + return func(gKey *base.InternalKey, gVal base.LazyValue) { + t.Helper() + if eKey != nil { + require.NotNil(t, gKey, "expected %q", eKey.UserKey) + t.Logf("expected %q, got %q", eKey.UserKey, gKey.UserKey) + require.Equal(t, eKey, gKey) + require.Equal(t, eVal, gVal) + } else { + t.Logf("expected nil, got %q", gKey) + require.Nil(t, gKey) + } + } + } + + check(expect.First())(got.First()) + check(expect.Next())(got.Next()) + check(expect.Prev())(got.Prev()) + + // Try searching with a key that matches the target key before replacement + check(expect.SeekGE([]byte("apricot@2"), base.SeekGEFlagsNone))(got.SeekGE([]byte("apricot@2"), base.SeekGEFlagsNone)) + check(expect.Next())(got.Next()) + + // Try searching with a key that matches the target key after replacement + check(expect.SeekGE([]byte("orange@15"), base.SeekGEFlagsNone))(got.SeekGE([]byte("orange@15"), base.SeekGEFlagsNone)) + check(expect.Next())(got.Next()) + check(expect.Next())(got.Next()) + + // Try searching with a key that results in off by one handling + check(expect.First())(got.First()) + check(expect.SeekGE([]byte("grape@10"), base.SeekGEFlagsNone))(got.SeekGE([]byte("grape@10"), base.SeekGEFlagsNone)) + check(expect.Next())(got.Next()) + check(expect.Next())(got.Next()) + + // Try searching with a key with suffix greater than the replacement + check(expect.SeekGE([]byte("orange@16"), base.SeekGEFlagsNone))(got.SeekGE([]byte("orange@16"), base.SeekGEFlagsNone)) + check(expect.Next())(got.Next()) + + // Exhaust the iterator via searching + check(expect.SeekGE([]byte("persimmon@17"), base.SeekGEFlagsNone))(got.SeekGE([]byte("persimmon@17"), base.SeekGEFlagsNone)) + + // Ensure off by one handling works at end of iterator + check(expect.SeekGE([]byte("persimmon@10"), base.SeekGEFlagsNone))(got.SeekGE([]byte("persimmon@10"), base.SeekGEFlagsNone)) + + // Try reverse search with a key that matches the target key before replacement + check(expect.SeekLT([]byte("banana@13"), base.SeekLTFlagsNone))(got.SeekLT([]byte("banana@13"), base.SeekLTFlagsNone)) + check(expect.Prev())(got.Prev()) + check(expect.Next())(got.Next()) + + // Try reverse searching with a key that matches the target key after replacement + check(expect.Last())(got.Last()) + check(expect.SeekLT([]byte("apricot@15"), base.SeekLTFlagsNone))(got.SeekLT([]byte("apricot@15"), base.SeekLTFlagsNone)) + + // Try reverse searching with a key with suffix in between original and target replacement + check(expect.Last())(got.Last()) + check(expect.SeekLT([]byte("peach@10"), base.SeekLTFlagsNone))(got.SeekLT([]byte("peach@10"), base.SeekLTFlagsNone)) + check(expect.Prev())(got.Prev()) + check(expect.Next())(got.Next()) + + // Exhaust the iterator via reverse searching + check(expect.SeekLT([]byte("apple@17"), base.SeekLTFlagsNone))(got.SeekLT([]byte("apple@17"), base.SeekLTFlagsNone)) + + // Ensure off by one handling works at end of iterator + check(expect.Last())(got.Last()) + check(expect.SeekLT([]byte("apple@10"), base.SeekLTFlagsNone))(got.SeekLT([]byte("apple@10"), base.SeekLTFlagsNone)) + }) + } +} + func BenchmarkBlockIterSeekGE(b *testing.B) { const blockSize = 32 << 10 @@ -432,7 +566,7 @@ func BenchmarkBlockIterSeekGE(b *testing.B) { w.add(ikey, nil) } - it, err := newBlockIter(bytes.Compare, w.finish(), nil) + it, err := newBlockIter(bytes.Compare, w.finish(), nil, nil) if err != nil { b.Fatal(err) } @@ -474,7 +608,7 @@ func BenchmarkBlockIterSeekLT(b *testing.B) { w.add(ikey, nil) } - it, err := newBlockIter(bytes.Compare, w.finish(), nil) + it, err := newBlockIter(bytes.Compare, w.finish(), nil, nil) if err != nil { b.Fatal(err) } @@ -520,7 +654,7 @@ func BenchmarkBlockIterNext(b *testing.B) { w.add(ikey, nil) } - it, err := newBlockIter(bytes.Compare, w.finish(), nil) + it, err := newBlockIter(bytes.Compare, w.finish(), nil, nil) if err != nil { b.Fatal(err) } @@ -552,7 +686,7 @@ func BenchmarkBlockIterPrev(b *testing.B) { w.add(ikey, nil) } - it, err := newBlockIter(bytes.Compare, w.finish(), nil) + it, err := newBlockIter(bytes.Compare, w.finish(), nil, nil) if err != nil { b.Fatal(err) } diff --git a/sstable/layout.go b/sstable/layout.go index 07b8f4f8b1..352b2f70ec 100644 --- a/sstable/layout.go +++ b/sstable/layout.go @@ -187,7 +187,7 @@ func (l *Layout) Describe( var lastKey InternalKey switch b.name { case "data", "range-del", "range-key": - iter, _ := newBlockIter(r.Compare, h.Get(), l.SyntheticPrefix) + iter, _ := newBlockIter(r.Compare, h.Get(), l.SyntheticPrefix, nil) for key, value := iter.First(); key != nil; key, value = iter.Next() { ptr := unsafe.Pointer(uintptr(iter.ptr) + uintptr(iter.offset)) shared, ptr := decodeVarint(ptr) @@ -239,7 +239,7 @@ func (l *Layout) Describe( formatRestarts(iter.data, iter.restarts, iter.numRestarts) formatTrailer() case "index", "top-index": - iter, _ := newBlockIter(r.Compare, h.Get(), l.SyntheticPrefix) + iter, _ := newBlockIter(r.Compare, h.Get(), l.SyntheticPrefix, nil) for key, value := iter.First(); key != nil; key, value = iter.Next() { bh, err := decodeBlockHandleWithProperties(value.InPlaceValue()) if err != nil { diff --git a/sstable/reader.go b/sstable/reader.go index e3939b4179..60efae705b 100644 --- a/sstable/reader.go +++ b/sstable/reader.go @@ -186,6 +186,12 @@ func (p SyntheticPrefix) readerApply(r *Reader) { r.syntheticPrefix = p[:len(p):len(p)] } +type SyntheticSuffix struct { + suffix []byte + buf []byte + split base.Split +} + // rawTombstonesOpt is a Reader open option for specifying that range // tombstones returned by Reader.NewRangeDelIter() should not be // fragmented. Used by debug tools to get a raw view of the tombstones @@ -252,6 +258,7 @@ type Reader struct { FormatKey base.FormatKey Split Split syntheticPrefix SyntheticPrefix + syntheticSuffix *SyntheticSuffix tableFilter *tableFilterReader // Keep types that are not multiples of 8 bytes at the end and with // decreasing size. @@ -459,7 +466,7 @@ func (r *Reader) NewRawRangeDelIter() (keyspan.FragmentIterator, error) { // sstables. This is because rangedels do not apply to points in the same // sstable at the same sequence number anyway, so exposing obsolete rangedels // is harmless. - if err := i.blockIter.initHandle(r.Compare, h, r.Properties.GlobalSeqNum, false, r.syntheticPrefix); err != nil { + if err := i.blockIter.initHandle(r.Compare, h, r.Properties.GlobalSeqNum, false, r.syntheticPrefix, nil); err != nil { return nil, err } return i, nil @@ -481,7 +488,7 @@ func (r *Reader) newRawRangeKeyIter(vState *virtualState) (keyspan.FragmentItera if vState == nil || !vState.isSharedIngested { globalSeqNum = r.Properties.GlobalSeqNum } - if err := i.blockIter.initHandle(r.Compare, h, globalSeqNum, false /* hideObsoletePoints */, r.syntheticPrefix); err != nil { + if err := i.blockIter.initHandle(r.Compare, h, globalSeqNum, false, r.syntheticPrefix, nil); err != nil { return nil, err } return i, nil @@ -736,7 +743,7 @@ func (r *Reader) transformRangeDelV1(b []byte) ([]byte, error) { // tombstones. We need properly fragmented and sorted range tombstones in // order to serve from them directly. iter := &blockIter{} - if err := iter.init(r.Compare, b, r.Properties.GlobalSeqNum, false, r.syntheticPrefix); err != nil { + if err := iter.init(r.Compare, b, r.Properties.GlobalSeqNum, false, r.syntheticPrefix, r.syntheticSuffix); err != nil { return nil, err } var tombstones []keyspan.Span @@ -916,7 +923,7 @@ func (r *Reader) Layout() (*Layout, error) { if r.Properties.IndexPartitions == 0 { l.Index = append(l.Index, r.indexBH) - iter, _ := newBlockIter(r.Compare, indexH.Get(), r.syntheticPrefix) + iter, _ := newBlockIter(r.Compare, indexH.Get(), r.syntheticPrefix, r.syntheticSuffix) for key, value := iter.First(); key != nil; key, value = iter.Next() { dataBH, err := decodeBlockHandleWithProperties(value.InPlaceValue()) if err != nil { @@ -929,7 +936,7 @@ func (r *Reader) Layout() (*Layout, error) { } } else { l.TopIndex = r.indexBH - topIter, _ := newBlockIter(r.Compare, indexH.Get(), r.syntheticPrefix) + topIter, _ := newBlockIter(r.Compare, indexH.Get(), r.syntheticPrefix, r.syntheticSuffix) iter := &blockIter{SyntheticPrefix: r.syntheticPrefix} for key, value := topIter.First(); key != nil; key, value = topIter.Next() { indexBH, err := decodeBlockHandleWithProperties(value.InPlaceValue()) @@ -944,7 +951,7 @@ func (r *Reader) Layout() (*Layout, error) { return nil, err } if err := iter.init(r.Compare, subIndex.Get(), 0, /* globalSeqNum */ - false /* hideObsoletePoints */, r.syntheticPrefix); err != nil { + false /* hideObsoletePoints */, r.syntheticPrefix, r.syntheticSuffix); err != nil { return nil, err } for key, value := iter.First(); key != nil; key, value = iter.Next() { @@ -1079,14 +1086,14 @@ func (r *Reader) EstimateDiskUsage(start, end []byte) (uint64, error) { // to the same blockIter over the single index in the unpartitioned case. var startIdxIter, endIdxIter *blockIter if r.Properties.IndexPartitions == 0 { - iter, err := newBlockIter(r.Compare, indexH.Get(), r.syntheticPrefix) + iter, err := newBlockIter(r.Compare, indexH.Get(), r.syntheticPrefix, r.syntheticSuffix) if err != nil { return 0, err } startIdxIter = iter endIdxIter = iter } else { - topIter, err := newBlockIter(r.Compare, indexH.Get(), r.syntheticPrefix) + topIter, err := newBlockIter(r.Compare, indexH.Get(), r.syntheticPrefix, r.syntheticSuffix) if err != nil { return 0, err } @@ -1106,7 +1113,7 @@ func (r *Reader) EstimateDiskUsage(start, end []byte) (uint64, error) { return 0, err } defer startIdxBlock.Release() - startIdxIter, err = newBlockIter(r.Compare, startIdxBlock.Get(), r.syntheticPrefix) + startIdxIter, err = newBlockIter(r.Compare, startIdxBlock.Get(), r.syntheticPrefix, r.syntheticSuffix) if err != nil { return 0, err } @@ -1127,7 +1134,7 @@ func (r *Reader) EstimateDiskUsage(start, end []byte) (uint64, error) { return 0, err } defer endIdxBlock.Release() - endIdxIter, err = newBlockIter(r.Compare, endIdxBlock.Get(), r.syntheticPrefix) + endIdxIter, err = newBlockIter(r.Compare, endIdxBlock.Get(), r.syntheticPrefix, r.syntheticSuffix) if err != nil { return 0, err } diff --git a/sstable/reader_iter_single_lvl.go b/sstable/reader_iter_single_lvl.go index e6f90baedc..22d92523c0 100644 --- a/sstable/reader_iter_single_lvl.go +++ b/sstable/reader_iter_single_lvl.go @@ -213,7 +213,7 @@ func (i *singleLevelIterator) init( i.stats = stats i.hideObsoletePoints = hideObsoletePoints i.bufferPool = bufferPool - err = i.index.initHandle(i.cmp, indexH, r.Properties.GlobalSeqNum, false, r.syntheticPrefix) + err = i.index.initHandle(i.cmp, indexH, r.Properties.GlobalSeqNum, false, r.syntheticPrefix, nil) if err != nil { // blockIter.Close releases indexH and always returns a nil error _ = i.index.Close() @@ -435,7 +435,7 @@ func (i *singleLevelIterator) loadBlock(dir int8) loadBlockResult { i.err = err return loadBlockFailed } - i.err = i.data.initHandle(i.cmp, block, i.reader.Properties.GlobalSeqNum, i.hideObsoletePoints, i.reader.syntheticPrefix) + i.err = i.data.initHandle(i.cmp, block, i.reader.Properties.GlobalSeqNum, i.hideObsoletePoints, i.reader.syntheticPrefix, nil) if i.err != nil { // The block is partially loaded, and we don't want it to appear valid. i.data.invalidate() diff --git a/sstable/reader_iter_two_lvl.go b/sstable/reader_iter_two_lvl.go index 273a487ccf..d18c2c5287 100644 --- a/sstable/reader_iter_two_lvl.go +++ b/sstable/reader_iter_two_lvl.go @@ -67,7 +67,7 @@ func (i *twoLevelIterator) loadIndex(dir int8) loadBlockResult { i.err = err return loadBlockFailed } - if i.err = i.index.initHandle(i.cmp, indexBlock, i.reader.Properties.GlobalSeqNum, false, i.reader.syntheticPrefix); i.err == nil { + if i.err = i.index.initHandle(i.cmp, indexBlock, i.reader.Properties.GlobalSeqNum, false, i.reader.syntheticPrefix, nil); i.err == nil { return loadBlockOK } return loadBlockFailed @@ -175,7 +175,7 @@ func (i *twoLevelIterator) init( i.stats = stats i.hideObsoletePoints = hideObsoletePoints i.bufferPool = bufferPool - err = i.topLevelIndex.initHandle(i.cmp, topLevelIndexH, r.Properties.GlobalSeqNum, false, r.syntheticPrefix) + err = i.topLevelIndex.initHandle(i.cmp, topLevelIndexH, r.Properties.GlobalSeqNum, false, r.syntheticPrefix, nil) if err != nil { // blockIter.Close releases topLevelIndexH and always returns a nil error _ = i.topLevelIndex.Close() diff --git a/sstable/reader_test.go b/sstable/reader_test.go index 940ba7bdb4..5a274283a1 100644 --- a/sstable/reader_test.go +++ b/sstable/reader_test.go @@ -715,7 +715,7 @@ func indexLayoutString(t *testing.T, r *Reader) string { var buf strings.Builder twoLevelIndex := r.Properties.IndexType == twoLevelIndex buf.WriteString("index entries:\n") - iter, err := newBlockIter(r.Compare, indexH.Get(), r.syntheticPrefix) + iter, err := newBlockIter(r.Compare, indexH.Get(), r.syntheticPrefix, r.syntheticSuffix) defer func() { require.NoError(t, iter.Close()) }() @@ -729,7 +729,7 @@ func indexLayoutString(t *testing.T, r *Reader) string { context.Background(), bh.BlockHandle, nil, nil, nil, nil, nil) require.NoError(t, err) defer b.Release() - iter2, err := newBlockIter(r.Compare, b.Get(), r.syntheticPrefix) + iter2, err := newBlockIter(r.Compare, b.Get(), r.syntheticPrefix, r.syntheticSuffix) defer func() { require.NoError(t, iter2.Close()) }() diff --git a/sstable/suffix_rewriter.go b/sstable/suffix_rewriter.go index 8d6be3b9a5..6ac2f4f04a 100644 --- a/sstable/suffix_rewriter.go +++ b/sstable/suffix_rewriter.go @@ -201,7 +201,7 @@ func rewriteBlocks( if err != nil { return err } - if err := iter.init(r.Compare, inputBlock, r.Properties.GlobalSeqNum, false, r.syntheticPrefix); err != nil { + if err := iter.init(r.Compare, inputBlock, r.Properties.GlobalSeqNum, false, r.syntheticPrefix, r.syntheticSuffix); err != nil { return err }