From a63110b280e4bacffe7a7fc7e897d5c97860cf05 Mon Sep 17 00:00:00 2001 From: Michael Butler Date: Mon, 22 Jan 2024 13:32:52 -0600 Subject: [PATCH] sstable: add block level suffix replacement support --- sstable/block.go | 120 ++++++++++++++++++++++++-- sstable/block_property_test.go | 6 +- sstable/block_test.go | 135 ++++++++++++++++++++++++++++-- sstable/layout.go | 4 +- sstable/reader.go | 27 +++--- sstable/reader_iter_single_lvl.go | 4 +- sstable/reader_iter_two_lvl.go | 4 +- sstable/reader_test.go | 4 +- sstable/suffix_rewriter.go | 2 +- 9 files changed, 270 insertions(+), 36 deletions(-) diff --git a/sstable/block.go b/sstable/block.go index e8d4bd39bcf..bfb472179a8 100644 --- a/sstable/block.go +++ b/sstable/block.go @@ -412,14 +412,17 @@ type blockIter struct { } hideObsoletePoints bool SyntheticPrefix + syntheticSuffix *SyntheticSuffix } // blockIter implements the base.InternalIterator interface. var _ base.InternalIterator = (*blockIter)(nil) -func newBlockIter(cmp Compare, block block, syntheticPrefix SyntheticPrefix) (*blockIter, error) { +func newBlockIter( + cmp Compare, block block, syntheticPrefix SyntheticPrefix, syntheticSuffix *SyntheticSuffix, +) (*blockIter, error) { i := &blockIter{} - return i, i.init(cmp, block, 0, false, syntheticPrefix) + return i, i.init(cmp, block, 0, false, syntheticPrefix, syntheticSuffix) } func (i *blockIter) String() string { @@ -427,13 +430,19 @@ func (i *blockIter) String() string { } func (i *blockIter) init( - cmp Compare, block block, globalSeqNum uint64, hideObsoletePoints bool, syntheticPrefix SyntheticPrefix, + cmp Compare, + block block, + globalSeqNum uint64, + hideObsoletePoints bool, + syntheticPrefix SyntheticPrefix, + syntheticSuffix *SyntheticSuffix, ) error { numRestarts := int32(binary.LittleEndian.Uint32(block[len(block)-4:])) if numRestarts == 0 { return base.CorruptionErrorf("pebble/table: invalid table (block has no restart points)") } i.SyntheticPrefix = syntheticPrefix + i.syntheticSuffix = 
syntheticSuffix i.cmp = cmp i.restarts = int32(len(block)) - 4*(1+numRestarts) i.numRestarts = numRestarts @@ -464,11 +473,16 @@ func (i *blockIter) init( // ingested. // - Foreign sstable iteration: globalSeqNum is always set. func (i *blockIter) initHandle( - cmp Compare, block bufferHandle, globalSeqNum uint64, hideObsoletePoints bool, syntheticPrefix SyntheticPrefix, + cmp Compare, + block bufferHandle, + globalSeqNum uint64, + hideObsoletePoints bool, + syntheticPrefix SyntheticPrefix, + syntheticSuffix *SyntheticSuffix, ) error { i.handle.Release() i.handle = block - return i.init(cmp, block.Get(), globalSeqNum, hideObsoletePoints, syntheticPrefix) + return i.init(cmp, block.Get(), globalSeqNum, hideObsoletePoints, syntheticPrefix, syntheticSuffix) } func (i *blockIter) invalidate() { @@ -671,6 +685,21 @@ func (i *blockIter) decodeInternalKey(key []byte) (hiddenPoint bool) { return hiddenPoint } +func (i *blockIter) maybeReplaceSuffix(maybeFromCache bool) { + if i.syntheticSuffix != nil && i.ikey.UserKey != nil { + prefixLen := i.syntheticSuffix.split(i.ikey.UserKey) + if !maybeFromCache { + i.ikey.UserKey = append(i.ikey.UserKey[:prefixLen], i.syntheticSuffix.suffix...) + return + } + // If ikey is cached or may get cached, we must copy UserKey into + // a separate buffer before suffix replacement. + i.syntheticSuffix.buf = append(i.syntheticSuffix.buf[:0], i.ikey.UserKey[:prefixLen]...) + i.syntheticSuffix.buf = append(i.syntheticSuffix.buf, i.syntheticSuffix.suffix...) + i.ikey.UserKey = i.syntheticSuffix.buf + } +} + func (i *blockIter) clearCache() { i.cached = i.cached[:0] i.cachedBuf = i.cachedBuf[:0] @@ -811,7 +840,14 @@ func (i *blockIter) SeekGE(key []byte, flags base.SeekGEFlags) (*InternalKey, ba if !i.valid() { return nil, base.LazyValue{} } + // Before conducting any user key comparison, we must replace the suffix.
+ i.maybeReplaceSuffix(false) + if !hiddenPoint && i.cmp(i.ikey.UserKey, key) >= 0 { + i.correctSeekOffByOneError(key) + if !i.valid() { + return nil, base.LazyValue{} + } // Initialize i.lazyValue if !i.lazyValueHandling.hasValuePrefix || base.TrailerKind(i.ikey.Trailer) != InternalKeyKindSet { @@ -826,12 +862,58 @@ func (i *blockIter) SeekGE(key []byte, flags base.SeekGEFlags) (*InternalKey, ba for i.Next(); i.valid(); i.Next() { if i.cmp(i.ikey.UserKey, key) >= 0 { // i.Next() has already initialized i.lazyValue. + i.correctSeekOffByOneError(key) + if !i.valid() { + return nil, base.LazyValue{} + } return &i.ikey, i.lazyValue } } return nil, base.LazyValue{} } +func (i *blockIter) correctSeekOffByOneError(key []byte) { + if i.syntheticSuffix != nil && i.ikey.UserKey != nil { + // To seek in a block with suffix replacement, we rely on 2 invariants to correct a potential off by 1 error: + // - (1) each prefix has a unique suffix + // - (2) byte.Compare(replacementSuffix,originalSuffix) > 0, for all keys + // + // To illustrate the off by 1 error, consider the following block with a + // restart interval of 2 and replacement suffix of "4": + // - Pre-suffix replacement: blueapple@1, {blue}banana@3, redapple@2, {red}banana@1 + // - Post-suffix replacement: blueapple@4, {blue}banana@4, redapple@4, {red}banana@4 + // + // Suppose the client seeks with blueapple@3. Because timestamps sort in + // reverse chronological order (i.e. blueapple@3>blueapple@4), a seek on the + // block _before_ replacement would return blueapple@1, but _after_ + // replacement we return bluebanana@4. So, calling i.Next() after the + // pre-replacement seek would return the right answer. + // + // More generally, let k be the seeking key, a_pre the original key we seek to + // before suffix replacement, a_post the suffix replaced original key, and b + // the correct key to seek to. The iterator must call i.Next() iff k > a_post + // && k <= a_pre to return b. Call this the off-by-1 condition. 
If the + // off-by-1 condition isn't hit, the iterator always returns b. + // + // Further, if the iter hits the off-by-1 condition, we know the correct key + // is exactly 1 key away from a_pre because of invariant (1): each prefix has + // a unique suffix. Further, the correct key is always returned after i.Next() + // because of invariant (2). + // + // Check if k <= a_pre + seekKeyGeqPre := bytes.Compare(key, i.ikey.UserKey) <= 0 + + // Check if k > a_post; Use the syntheticSuffix buf to compute a_post, and + // only replace user key if off by 1 condition is satisfied. + i.maybeReplaceSuffix(false) + seekKeyLessPost := bytes.Compare(key, i.ikey.UserKey) > 0 + + if seekKeyGeqPre && seekKeyLessPost { + i.Next() + } + } +} + // SeekPrefixGE implements internalIterator.SeekPrefixGE, as documented in the // pebble package. func (i *blockIter) SeekPrefixGE( @@ -979,6 +1061,8 @@ func (i *blockIter) SeekLT(key []byte, flags base.SeekLTFlags) (*InternalKey, ba // of hidden keys we will be able to skip whole blocks (using block // property filters) so we don't bother optimizing.
hiddenPoint := i.decodeInternalKey(i.key) + // This key may get put in the cache, so replace the suffix via a separate buffer rather than in place. + i.maybeReplaceSuffix(true) // NB: we don't use the hiddenPoint return value of decodeInternalKey // since we want to stop as soon as we reach a key >= ikey.UserKey, so @@ -1040,6 +1124,7 @@ func (i *blockIter) First() (*InternalKey, base.LazyValue) { if hiddenPoint { return i.Next() } + i.maybeReplaceSuffix(false) if !i.lazyValueHandling.hasValuePrefix || base.TrailerKind(i.ikey.Trailer) != InternalKeyKindSet { i.lazyValue = base.MakeInPlaceValue(i.val) @@ -1082,6 +1167,7 @@ func (i *blockIter) Last() (*InternalKey, base.LazyValue) { if hiddenPoint { return i.Prev() } + i.maybeReplaceSuffix(true) if !i.lazyValueHandling.hasValuePrefix || base.TrailerKind(i.ikey.Trailer) != InternalKeyKindSet { i.lazyValue = base.MakeInPlaceValue(i.val) @@ -1130,6 +1216,10 @@ start: if hiddenPoint { goto start } + if i.syntheticSuffix != nil { + prefixLen := i.syntheticSuffix.split(i.ikey.UserKey) + i.ikey.UserKey = append(i.ikey.UserKey[:prefixLen], i.syntheticSuffix.suffix...) + } } else { i.ikey.Trailer = uint64(InternalKeyKindInvalid) i.ikey.UserKey = nil @@ -1400,6 +1490,10 @@ func (i *blockIter) nextPrefixV3(succKey []byte) (*InternalKey, base.LazyValue) if i.globalSeqNum != 0 { i.ikey.SetSeqNum(i.globalSeqNum) } + if i.syntheticSuffix != nil { + prefixLen := i.syntheticSuffix.split(i.ikey.UserKey) + i.ikey.UserKey = append(i.ikey.UserKey[:prefixLen], i.syntheticSuffix.suffix...) + } } else { i.ikey.Trailer = uint64(InternalKeyKindInvalid) i.ikey.UserKey = nil @@ -1458,6 +1552,14 @@ start: if i.globalSeqNum != 0 { i.ikey.SetSeqNum(i.globalSeqNum) } + if i.syntheticSuffix != nil { + suffixLen := i.syntheticSuffix.split(i.ikey.UserKey) + // If ikey is cached or may get cached, we must copy UserKey into + // a separate buffer before suffix replacement. + i.syntheticSuffix.buf = append(i.syntheticSuffix.buf[:0], i.ikey.UserKey[:suffixLen]...)
+ i.syntheticSuffix.buf = append(i.syntheticSuffix.buf, i.syntheticSuffix.suffix...) + i.ikey.UserKey = i.syntheticSuffix.buf + } } else { i.ikey.Trailer = uint64(InternalKeyKindInvalid) i.ikey.UserKey = nil @@ -1535,6 +1637,14 @@ start: // Use the cache. goto start } + if i.syntheticSuffix != nil { + suffixLen := i.syntheticSuffix.split(i.ikey.UserKey) + // If ikey is cached or may get cached, we must copy UserKey into + // a separate buffer before suffix replacement. + i.syntheticSuffix.buf = append(i.syntheticSuffix.buf[:0], i.ikey.UserKey[:suffixLen]...) + i.syntheticSuffix.buf = append(i.syntheticSuffix.buf, i.syntheticSuffix.suffix...) + i.ikey.UserKey = i.syntheticSuffix.buf + } if !i.lazyValueHandling.hasValuePrefix || base.TrailerKind(i.ikey.Trailer) != InternalKeyKindSet { i.lazyValue = base.MakeInPlaceValue(i.val) diff --git a/sstable/block_property_test.go b/sstable/block_property_test.go index eb873f0eca1..acea9c318a7 100644 --- a/sstable/block_property_test.go +++ b/sstable/block_property_test.go @@ -938,7 +938,7 @@ func TestBlockProperties(t *testing.T) { var blocks []int var i int - iter, _ := newBlockIter(r.Compare, indexH.Get(), nil) + iter, _ := newBlockIter(r.Compare, indexH.Get(), nil, nil) for key, value := iter.First(); key != nil; key, value = iter.Next() { bh, err := decodeBlockHandleWithProperties(value.InPlaceValue()) if err != nil { @@ -1274,7 +1274,7 @@ func runBlockPropsCmd(r *Reader, td *datadriven.TestData) string { return err.Error() } twoLevelIndex := r.Properties.IndexPartitions > 0 - i, err := newBlockIter(r.Compare, bh.Get(), nil) + i, err := newBlockIter(r.Compare, bh.Get(), nil, nil) if err != nil { return err.Error() } @@ -1322,7 +1322,7 @@ func runBlockPropsCmd(r *Reader, td *datadriven.TestData) string { return err.Error() } if err := subiter.init( - r.Compare, subIndex.Get(), 0 /* globalSeqNum */, false, r.syntheticPrefix); err != nil { + r.Compare, subIndex.Get(), 0 /* globalSeqNum */, false, r.syntheticPrefix, r.syntheticSuffix); err
!= nil { return err.Error() } for key, value := subiter.First(); key != nil; key, value = subiter.Next() { diff --git a/sstable/block_test.go b/sstable/block_test.go index e21d6a78f1a..ade4e545463 100644 --- a/sstable/block_test.go +++ b/sstable/block_test.go @@ -24,6 +24,28 @@ func ikey(s string) InternalKey { return InternalKey{UserKey: []byte(s)} } +func TestMichael(t *testing.T) { + cache := []byte("apple") + prefixLen := 3 + sepBuffer := []byte{} + userKey := []byte("pear") + if n := len(cache); n >= 0 { + // We initially assume the user key is pointed to cache + userKey = cache + + // during, suffix replacement repoint the userkey to a different buffer + userKey = sepBuffer + + // now do the byte for byte copy from the cache + userKey = append(userKey[:0], cache[:prefixLen]...) + + // manipulate user key and assert the cache was never manipulated + userKey[0] = 'b' + fmt.Printf("user key: %s %p\n", userKey, &userKey[0]) + fmt.Printf("cache key: %s %p\n", cache, &cache[0]) + } +} + func TestBlockWriter(t *testing.T) { w := &rawBlockWriter{ blockWriter: blockWriter{restartInterval: 16}, @@ -245,7 +267,7 @@ func TestBlockIter2(t *testing.T) { return "" case "iter": - iter, err := newBlockIter(bytes.Compare, block, nil) + iter, err := newBlockIter(bytes.Compare, block, nil, nil) if err != nil { return err.Error() } @@ -276,7 +298,7 @@ func TestBlockIterKeyStability(t *testing.T) { } block := w.finish() - i, err := newBlockIter(bytes.Compare, block, nil) + i, err := newBlockIter(bytes.Compare, block, nil, nil) require.NoError(t, err) // Check that the supplied slice resides within the bounds of the block. 
@@ -336,7 +358,7 @@ func TestBlockIterReverseDirections(t *testing.T) { for targetPos := 0; targetPos < w.restartInterval; targetPos++ { t.Run("", func(t *testing.T) { - i, err := newBlockIter(bytes.Compare, block, nil) + i, err := newBlockIter(bytes.Compare, block, nil, nil) require.NoError(t, err) pos := 3 @@ -375,10 +397,10 @@ func TestBlockSyntheticPrefix(t *testing.T) { elidedPrefixBlock, includedPrefixBlock := elidedPrefixWriter.finish(), includedPrefixWriter.finish() - expect, err := newBlockIter(bytes.Compare, includedPrefixBlock, nil) + expect, err := newBlockIter(bytes.Compare, includedPrefixBlock, nil, nil) require.NoError(t, err) - got, err := newBlockIter(bytes.Compare, elidedPrefixBlock, SyntheticPrefix([]byte(prefix))) + got, err := newBlockIter(bytes.Compare, elidedPrefixBlock, SyntheticPrefix([]byte(prefix)), nil) require.NoError(t, err) check := func(eKey *base.InternalKey, eVal base.LazyValue) func(gKey *base.InternalKey, gVal base.LazyValue) { @@ -413,6 +435,101 @@ func TestBlockSyntheticPrefix(t *testing.T) { } } +func TestBlockSyntheticSuffix(t *testing.T) { + + // TODO(msbutler): add test where replacement suffix has fewer bytes than previous suffix + expectedSuffix := "@15" + + synthSuffix := SyntheticSuffix{ + suffix: []byte(expectedSuffix), + buf: []byte{}, + split: func(a []byte) int { return len(strings.Split(string(a), "@")[0]) }, + } + + for _, restarts := range []int{1} { //, 2, 3, 4, 10} { + t.Run(fmt.Sprintf("restarts=%d", restarts), func(t *testing.T) { + + suffixWriter, expectedSuffixWriter := &blockWriter{restartInterval: restarts}, &blockWriter{restartInterval: restarts} + keys := []string{ + "apple@2", "apricot@2", "banana@13", + "grape@2", "orange@14", "peach@4", + "pear@1", "persimmon@4", + } + for _, key := range keys { + suffixWriter.add(ikey(key), nil) + replacedKey := strings.Split(key, "@")[0] + expectedSuffix + expectedSuffixWriter.add(ikey(replacedKey), nil) + } + + suffixReplacedBlock, expectedBlock := 
suffixWriter.finish(), expectedSuffixWriter.finish() + + expect, err := newBlockIter(bytes.Compare, expectedBlock, nil, nil) + require.NoError(t, err) + + got, err := newBlockIter(bytes.Compare, suffixReplacedBlock, nil, &synthSuffix) + require.NoError(t, err) + + check := func(eKey *base.InternalKey, eVal base.LazyValue) func(gKey *base.InternalKey, gVal base.LazyValue) { + return func(gKey *base.InternalKey, gVal base.LazyValue) { + t.Helper() + if eKey != nil { + t.Logf("expected %q, got %q", eKey.UserKey, gKey.UserKey) + require.Equal(t, eKey, gKey) + require.Equal(t, eVal, gVal) + } else { + t.Logf("expected nil, got %q", gKey) + require.Nil(t, gKey) + } + } + } + + check(expect.First())(got.First()) + check(expect.Next())(got.Next()) + check(expect.Prev())(got.Prev()) + + // Try searching with a key that matches the target key before replacement + check(expect.SeekGE([]byte("apricot@2"), base.SeekGEFlagsNone))(got.SeekGE([]byte("apricot@2"), base.SeekGEFlagsNone)) + check(expect.Next())(got.Next()) + + // Try searching with a key that matches the target key after replacement + check(expect.SeekGE([]byte("orange@15"), base.SeekGEFlagsNone))(got.SeekGE([]byte("orange@15"), base.SeekGEFlagsNone)) + check(expect.Next())(got.Next()) + check(expect.Next())(got.Next()) + + // Try searching with a key that results in off by one handling + check(expect.First())(got.First()) + check(expect.SeekGE([]byte("grape@10"), base.SeekGEFlagsNone))(got.SeekGE([]byte("grape@10"), base.SeekGEFlagsNone)) + check(expect.Next())(got.Next()) + check(expect.Next())(got.Next()) + + // Try searching with a key with suffix greater than the replacement + check(expect.SeekGE([]byte("orange@16"), base.SeekGEFlagsNone))(got.SeekGE([]byte("orange@16"), base.SeekGEFlagsNone)) + check(expect.Next())(got.Next()) + + // Exhaust the iterator via searching + check(expect.SeekGE([]byte("persimmon@17"), base.SeekGEFlagsNone))(got.SeekGE([]byte("persimmon@17"), base.SeekGEFlagsNone)) + + // Try reverse 
search with a key that matches the target key before replacement + check(expect.SeekLT([]byte("banana@13"), base.SeekLTFlagsNone))(got.SeekLT([]byte("banana@13"), base.SeekLTFlagsNone)) + check(expect.Prev())(got.Prev()) + check(expect.Next())(got.Next()) + + // Try reverse searching with a key that matches the target key after replacement + check(expect.Last())(got.Last()) + check(expect.SeekLT([]byte("apricot@15"), base.SeekLTFlagsNone))(got.SeekLT([]byte("apricot@15"), base.SeekLTFlagsNone)) + + // Try reverse searching with a key with suffix in between original and target replacement + check(expect.Last())(got.Last()) + check(expect.SeekLT([]byte("peach@10"), base.SeekLTFlagsNone))(got.SeekLT([]byte("peach@10"), base.SeekLTFlagsNone)) + check(expect.Prev())(got.Prev()) + check(expect.Next())(got.Next()) + + // Exhaust the iterator via reverse searching + check(expect.SeekGE([]byte("apple@17"), base.SeekGEFlagsNone))(got.SeekGE([]byte("apple@17"), base.SeekGEFlagsNone)) + }) + } +} + func BenchmarkBlockIterSeekGE(b *testing.B) { const blockSize = 32 << 10 @@ -432,7 +549,7 @@ func BenchmarkBlockIterSeekGE(b *testing.B) { w.add(ikey, nil) } - it, err := newBlockIter(bytes.Compare, w.finish(), nil) + it, err := newBlockIter(bytes.Compare, w.finish(), nil, nil) if err != nil { b.Fatal(err) } @@ -474,7 +591,7 @@ func BenchmarkBlockIterSeekLT(b *testing.B) { w.add(ikey, nil) } - it, err := newBlockIter(bytes.Compare, w.finish(), nil) + it, err := newBlockIter(bytes.Compare, w.finish(), nil, nil) if err != nil { b.Fatal(err) } @@ -520,7 +637,7 @@ func BenchmarkBlockIterNext(b *testing.B) { w.add(ikey, nil) } - it, err := newBlockIter(bytes.Compare, w.finish(), nil) + it, err := newBlockIter(bytes.Compare, w.finish(), nil, nil) if err != nil { b.Fatal(err) } @@ -552,7 +669,7 @@ func BenchmarkBlockIterPrev(b *testing.B) { w.add(ikey, nil) } - it, err := newBlockIter(bytes.Compare, w.finish(), nil) + it, err := newBlockIter(bytes.Compare, w.finish(), nil, nil) if err != 
nil { b.Fatal(err) } diff --git a/sstable/layout.go b/sstable/layout.go index 07b8f4f8b15..352b2f70ec6 100644 --- a/sstable/layout.go +++ b/sstable/layout.go @@ -187,7 +187,7 @@ func (l *Layout) Describe( var lastKey InternalKey switch b.name { case "data", "range-del", "range-key": - iter, _ := newBlockIter(r.Compare, h.Get(), l.SyntheticPrefix) + iter, _ := newBlockIter(r.Compare, h.Get(), l.SyntheticPrefix, nil) for key, value := iter.First(); key != nil; key, value = iter.Next() { ptr := unsafe.Pointer(uintptr(iter.ptr) + uintptr(iter.offset)) shared, ptr := decodeVarint(ptr) @@ -239,7 +239,7 @@ func (l *Layout) Describe( formatRestarts(iter.data, iter.restarts, iter.numRestarts) formatTrailer() case "index", "top-index": - iter, _ := newBlockIter(r.Compare, h.Get(), l.SyntheticPrefix) + iter, _ := newBlockIter(r.Compare, h.Get(), l.SyntheticPrefix, nil) for key, value := iter.First(); key != nil; key, value = iter.Next() { bh, err := decodeBlockHandleWithProperties(value.InPlaceValue()) if err != nil { diff --git a/sstable/reader.go b/sstable/reader.go index e3939b41791..60efae705bc 100644 --- a/sstable/reader.go +++ b/sstable/reader.go @@ -186,6 +186,12 @@ func (p SyntheticPrefix) readerApply(r *Reader) { r.syntheticPrefix = p[:len(p):len(p)] } +type SyntheticSuffix struct { + suffix []byte + buf []byte + split base.Split +} + // rawTombstonesOpt is a Reader open option for specifying that range // tombstones returned by Reader.NewRangeDelIter() should not be // fragmented. Used by debug tools to get a raw view of the tombstones @@ -252,6 +258,7 @@ type Reader struct { FormatKey base.FormatKey Split Split syntheticPrefix SyntheticPrefix + syntheticSuffix *SyntheticSuffix tableFilter *tableFilterReader // Keep types that are not multiples of 8 bytes at the end and with // decreasing size. @@ -459,7 +466,7 @@ func (r *Reader) NewRawRangeDelIter() (keyspan.FragmentIterator, error) { // sstables. 
This is because rangedels do not apply to points in the same // sstable at the same sequence number anyway, so exposing obsolete rangedels // is harmless. - if err := i.blockIter.initHandle(r.Compare, h, r.Properties.GlobalSeqNum, false, r.syntheticPrefix); err != nil { + if err := i.blockIter.initHandle(r.Compare, h, r.Properties.GlobalSeqNum, false, r.syntheticPrefix, nil); err != nil { return nil, err } return i, nil @@ -481,7 +488,7 @@ func (r *Reader) newRawRangeKeyIter(vState *virtualState) (keyspan.FragmentItera if vState == nil || !vState.isSharedIngested { globalSeqNum = r.Properties.GlobalSeqNum } - if err := i.blockIter.initHandle(r.Compare, h, globalSeqNum, false /* hideObsoletePoints */, r.syntheticPrefix); err != nil { + if err := i.blockIter.initHandle(r.Compare, h, globalSeqNum, false, r.syntheticPrefix, nil); err != nil { return nil, err } return i, nil @@ -736,7 +743,7 @@ func (r *Reader) transformRangeDelV1(b []byte) ([]byte, error) { // tombstones. We need properly fragmented and sorted range tombstones in // order to serve from them directly. 
iter := &blockIter{} - if err := iter.init(r.Compare, b, r.Properties.GlobalSeqNum, false, r.syntheticPrefix); err != nil { + if err := iter.init(r.Compare, b, r.Properties.GlobalSeqNum, false, r.syntheticPrefix, r.syntheticSuffix); err != nil { return nil, err } var tombstones []keyspan.Span @@ -916,7 +923,7 @@ func (r *Reader) Layout() (*Layout, error) { if r.Properties.IndexPartitions == 0 { l.Index = append(l.Index, r.indexBH) - iter, _ := newBlockIter(r.Compare, indexH.Get(), r.syntheticPrefix) + iter, _ := newBlockIter(r.Compare, indexH.Get(), r.syntheticPrefix, r.syntheticSuffix) for key, value := iter.First(); key != nil; key, value = iter.Next() { dataBH, err := decodeBlockHandleWithProperties(value.InPlaceValue()) if err != nil { @@ -929,7 +936,7 @@ func (r *Reader) Layout() (*Layout, error) { } } else { l.TopIndex = r.indexBH - topIter, _ := newBlockIter(r.Compare, indexH.Get(), r.syntheticPrefix) + topIter, _ := newBlockIter(r.Compare, indexH.Get(), r.syntheticPrefix, r.syntheticSuffix) iter := &blockIter{SyntheticPrefix: r.syntheticPrefix} for key, value := topIter.First(); key != nil; key, value = topIter.Next() { indexBH, err := decodeBlockHandleWithProperties(value.InPlaceValue()) @@ -944,7 +951,7 @@ func (r *Reader) Layout() (*Layout, error) { return nil, err } if err := iter.init(r.Compare, subIndex.Get(), 0, /* globalSeqNum */ - false /* hideObsoletePoints */, r.syntheticPrefix); err != nil { + false /* hideObsoletePoints */, r.syntheticPrefix, r.syntheticSuffix); err != nil { return nil, err } for key, value := iter.First(); key != nil; key, value = iter.Next() { @@ -1079,14 +1086,14 @@ func (r *Reader) EstimateDiskUsage(start, end []byte) (uint64, error) { // to the same blockIter over the single index in the unpartitioned case. 
var startIdxIter, endIdxIter *blockIter if r.Properties.IndexPartitions == 0 { - iter, err := newBlockIter(r.Compare, indexH.Get(), r.syntheticPrefix) + iter, err := newBlockIter(r.Compare, indexH.Get(), r.syntheticPrefix, r.syntheticSuffix) if err != nil { return 0, err } startIdxIter = iter endIdxIter = iter } else { - topIter, err := newBlockIter(r.Compare, indexH.Get(), r.syntheticPrefix) + topIter, err := newBlockIter(r.Compare, indexH.Get(), r.syntheticPrefix, r.syntheticSuffix) if err != nil { return 0, err } @@ -1106,7 +1113,7 @@ func (r *Reader) EstimateDiskUsage(start, end []byte) (uint64, error) { return 0, err } defer startIdxBlock.Release() - startIdxIter, err = newBlockIter(r.Compare, startIdxBlock.Get(), r.syntheticPrefix) + startIdxIter, err = newBlockIter(r.Compare, startIdxBlock.Get(), r.syntheticPrefix, r.syntheticSuffix) if err != nil { return 0, err } @@ -1127,7 +1134,7 @@ func (r *Reader) EstimateDiskUsage(start, end []byte) (uint64, error) { return 0, err } defer endIdxBlock.Release() - endIdxIter, err = newBlockIter(r.Compare, endIdxBlock.Get(), r.syntheticPrefix) + endIdxIter, err = newBlockIter(r.Compare, endIdxBlock.Get(), r.syntheticPrefix, r.syntheticSuffix) if err != nil { return 0, err } diff --git a/sstable/reader_iter_single_lvl.go b/sstable/reader_iter_single_lvl.go index e6f90baedc7..22d92523c0b 100644 --- a/sstable/reader_iter_single_lvl.go +++ b/sstable/reader_iter_single_lvl.go @@ -213,7 +213,7 @@ func (i *singleLevelIterator) init( i.stats = stats i.hideObsoletePoints = hideObsoletePoints i.bufferPool = bufferPool - err = i.index.initHandle(i.cmp, indexH, r.Properties.GlobalSeqNum, false, r.syntheticPrefix) + err = i.index.initHandle(i.cmp, indexH, r.Properties.GlobalSeqNum, false, r.syntheticPrefix, nil) if err != nil { // blockIter.Close releases indexH and always returns a nil error _ = i.index.Close() @@ -435,7 +435,7 @@ func (i *singleLevelIterator) loadBlock(dir int8) loadBlockResult { i.err = err return loadBlockFailed 
} - i.err = i.data.initHandle(i.cmp, block, i.reader.Properties.GlobalSeqNum, i.hideObsoletePoints, i.reader.syntheticPrefix) + i.err = i.data.initHandle(i.cmp, block, i.reader.Properties.GlobalSeqNum, i.hideObsoletePoints, i.reader.syntheticPrefix, nil) if i.err != nil { // The block is partially loaded, and we don't want it to appear valid. i.data.invalidate() diff --git a/sstable/reader_iter_two_lvl.go b/sstable/reader_iter_two_lvl.go index 273a487ccfb..d18c2c52879 100644 --- a/sstable/reader_iter_two_lvl.go +++ b/sstable/reader_iter_two_lvl.go @@ -67,7 +67,7 @@ func (i *twoLevelIterator) loadIndex(dir int8) loadBlockResult { i.err = err return loadBlockFailed } - if i.err = i.index.initHandle(i.cmp, indexBlock, i.reader.Properties.GlobalSeqNum, false, i.reader.syntheticPrefix); i.err == nil { + if i.err = i.index.initHandle(i.cmp, indexBlock, i.reader.Properties.GlobalSeqNum, false, i.reader.syntheticPrefix, nil); i.err == nil { return loadBlockOK } return loadBlockFailed @@ -175,7 +175,7 @@ func (i *twoLevelIterator) init( i.stats = stats i.hideObsoletePoints = hideObsoletePoints i.bufferPool = bufferPool - err = i.topLevelIndex.initHandle(i.cmp, topLevelIndexH, r.Properties.GlobalSeqNum, false, r.syntheticPrefix) + err = i.topLevelIndex.initHandle(i.cmp, topLevelIndexH, r.Properties.GlobalSeqNum, false, r.syntheticPrefix, nil) if err != nil { // blockIter.Close releases topLevelIndexH and always returns a nil error _ = i.topLevelIndex.Close() diff --git a/sstable/reader_test.go b/sstable/reader_test.go index 940ba7bdb45..5a274283a14 100644 --- a/sstable/reader_test.go +++ b/sstable/reader_test.go @@ -715,7 +715,7 @@ func indexLayoutString(t *testing.T, r *Reader) string { var buf strings.Builder twoLevelIndex := r.Properties.IndexType == twoLevelIndex buf.WriteString("index entries:\n") - iter, err := newBlockIter(r.Compare, indexH.Get(), r.syntheticPrefix) + iter, err := newBlockIter(r.Compare, indexH.Get(), r.syntheticPrefix, r.syntheticSuffix) defer func() 
{ require.NoError(t, iter.Close()) }() @@ -729,7 +729,7 @@ func indexLayoutString(t *testing.T, r *Reader) string { context.Background(), bh.BlockHandle, nil, nil, nil, nil, nil) require.NoError(t, err) defer b.Release() - iter2, err := newBlockIter(r.Compare, b.Get(), r.syntheticPrefix) + iter2, err := newBlockIter(r.Compare, b.Get(), r.syntheticPrefix, r.syntheticSuffix) defer func() { require.NoError(t, iter2.Close()) }() diff --git a/sstable/suffix_rewriter.go b/sstable/suffix_rewriter.go index 8d6be3b9a5f..6ac2f4f04a3 100644 --- a/sstable/suffix_rewriter.go +++ b/sstable/suffix_rewriter.go @@ -201,7 +201,7 @@ func rewriteBlocks( if err != nil { return err } - if err := iter.init(r.Compare, inputBlock, r.Properties.GlobalSeqNum, false, r.syntheticPrefix); err != nil { + if err := iter.init(r.Compare, inputBlock, r.Properties.GlobalSeqNum, false, r.syntheticPrefix, r.syntheticSuffix); err != nil { return err }