From d6f9dd4b270da73cb8f8fd0469e85a08ca6875a8 Mon Sep 17 00:00:00 2001 From: David Taylor Date: Thu, 28 Dec 2023 19:59:33 +0000 Subject: [PATCH] sstable: add block level synthetic prefix support This adds the concept of synthetic prefixes to sstable readers and their underlying block iterators, treating the synthetic prefix as an extra shared prefix, shared even by restart keys. When a reader is configured with a synthetic prefix, it will assume that that prefix is implicitly prepended to every key in the underlying sst blocks when interacting with or returning those keys. --- sstable/block.go | 52 ++++++++++++++--- sstable/block_property_test.go | 6 +- sstable/block_test.go | 70 ++++++++++++++++++++--- sstable/layout.go | 29 +++++----- sstable/prefix_replacing_iterator.go | 4 +- sstable/prefix_replacing_iterator_test.go | 13 ++++- sstable/reader.go | 52 +++++++++++++---- sstable/reader_iter_single_lvl.go | 7 ++- sstable/reader_iter_two_lvl.go | 4 +- sstable/reader_test.go | 12 ++-- sstable/reader_virtual.go | 4 +- sstable/suffix_rewriter.go | 2 +- table_cache.go | 20 ++++--- testdata/event_listener | 4 +- testdata/ingest | 2 +- testdata/metrics | 8 +-- 16 files changed, 218 insertions(+), 71 deletions(-) diff --git a/sstable/block.go b/sstable/block.go index 6cfea806acf..657c343156f 100644 --- a/sstable/block.go +++ b/sstable/block.go @@ -5,6 +5,7 @@ package sstable import ( + "bytes" "context" "encoding/binary" "unsafe" @@ -410,14 +411,15 @@ type blockIter struct { hasValuePrefix bool } hideObsoletePoints bool + SyntheticPrefix } // blockIter implements the base.InternalIterator interface. var _ base.InternalIterator = (*blockIter)(nil) -func newBlockIter(cmp Compare, block block) (*blockIter, error) { +func newBlockIter(cmp Compare, block block, syntheticPrefix SyntheticPrefix) (*blockIter, error) { i := &blockIter{} - return i, i.init(cmp, block, 0, false) + return i, i.init(cmp, block, 0, false, syntheticPrefix) } func (i *blockIter) String() string { @@ -425,19 +427,24 @@ func (i *blockIter) String() string { } func (i *blockIter) init( - cmp Compare, block block, globalSeqNum uint64, hideObsoletePoints bool, + cmp Compare, block block, globalSeqNum uint64, hideObsoletePoints bool, syntheticPrefix SyntheticPrefix, ) error { numRestarts := int32(binary.LittleEndian.Uint32(block[len(block)-4:])) if numRestarts == 0 { return base.CorruptionErrorf("pebble/table: invalid table (block has no restart points)") } + i.SyntheticPrefix = syntheticPrefix i.cmp = cmp i.restarts = int32(len(block)) - 4*(1+numRestarts) i.numRestarts = numRestarts i.globalSeqNum = globalSeqNum i.ptr = unsafe.Pointer(&block[0]) i.data = block - i.fullKey = i.fullKey[:0] + if i.SyntheticPrefix != nil { + i.fullKey = append(i.fullKey[:0], i.SyntheticPrefix...) + } else { + i.fullKey = i.fullKey[:0] + } i.val = nil i.hideObsoletePoints = hideObsoletePoints i.clearCache() @@ -457,11 +464,11 @@ func (i *blockIter) init( // ingested. // - Foreign sstable iteration: globalSeqNum is always set. func (i *blockIter) initHandle( - cmp Compare, block bufferHandle, globalSeqNum uint64, hideObsoletePoints bool, + cmp Compare, block bufferHandle, globalSeqNum uint64, hideObsoletePoints bool, syntheticPrefix SyntheticPrefix, ) error { i.handle.Release() i.handle = block - return i.init(cmp, block.Get(), globalSeqNum, hideObsoletePoints) + return i.init(cmp, block.Get(), globalSeqNum, hideObsoletePoints, syntheticPrefix) } func (i *blockIter) invalidate() { @@ -557,6 +564,7 @@ func (i *blockIter) readEntry() { ptr = unsafe.Pointer(uintptr(ptr) + 5) } + shared += uint32(len(i.SyntheticPrefix)) unsharedKey := getBytes(ptr, int(unshared)) // TODO(sumeer): move this into the else block below. i.fullKey = append(i.fullKey[:shared], unsharedKey...) @@ -633,6 +641,9 @@ func (i *blockIter) readFirstKey() error { i.firstUserKey = nil return base.CorruptionErrorf("pebble/table: invalid firstKey in block") } + if i.SyntheticPrefix != nil { + i.firstUserKey = append(i.SyntheticPrefix, i.firstUserKey...) + } return nil } @@ -693,6 +704,17 @@ func (i *blockIter) SeekGE(key []byte, flags base.SeekGEFlags) (*InternalKey, ba panic(errors.AssertionFailedf("invalidated blockIter used")) } + searchKey := key + if i.SyntheticPrefix != nil { + if !bytes.HasPrefix(key, i.SyntheticPrefix) { + if i.cmp(i.SyntheticPrefix, key) >= 0 { + return i.First() + } + return nil, base.LazyValue{} + } + searchKey = key[len(i.SyntheticPrefix):] + } + i.clearCache() // Find the index of the smallest restart point whose key is > the key // sought; index will be numRestarts if there is no such restart point. @@ -756,7 +778,7 @@ func (i *blockIter) SeekGE(key []byte, flags base.SeekGEFlags) (*InternalKey, ba } // Else k is invalid, and left as nil - if i.cmp(key, k) > 0 { + if i.cmp(searchKey, k) > 0 { // The search key is greater than the user key at this restart point. // Search beyond this restart point, since we are trying to find the // first restart point with a user key >= the search key. @@ -833,6 +855,17 @@ func (i *blockIter) SeekLT(key []byte, flags base.SeekLTFlags) (*InternalKey, ba var index int32 { + searchKey := key + if i.SyntheticPrefix != nil { + if !bytes.HasPrefix(key, i.SyntheticPrefix) { + if i.cmp(i.SyntheticPrefix, key) < 0 { + return i.Last() + } + return nil, base.LazyValue{} + } + searchKey = key[len(i.SyntheticPrefix):] + } + // NB: manually inlined sort.Search is ~5% faster. // // Define f(-1) == false and f(n) == true. @@ -889,7 +922,7 @@ func (i *blockIter) SeekLT(key []byte, flags base.SeekLTFlags) (*InternalKey, ba } // Else k is invalid, and left as nil - if i.cmp(key, k) > 0 { + if i.cmp(searchKey, k) > 0 { // The search key is greater than the user key at this restart point. // Search beyond this restart point, since we are trying to find the // first restart point with a user key >= the search key. @@ -1226,6 +1259,9 @@ func (i *blockIter) nextPrefixV3(succKey []byte) (*InternalKey, base.LazyValue) value = uint32(e)<<28 | uint32(d)<<21 | uint32(c)<<14 | uint32(b)<<7 | uint32(a) ptr = unsafe.Pointer(uintptr(ptr) + 5) } + if i.SyntheticPrefix != nil { + shared += uint32(len(i.SyntheticPrefix)) + } // The starting position of the value. valuePtr := unsafe.Pointer(uintptr(ptr) + uintptr(unshared)) i.nextOffset = int32(uintptr(valuePtr)-uintptr(i.ptr)) + int32(value) diff --git a/sstable/block_property_test.go b/sstable/block_property_test.go index 0f302054034..eb873f0eca1 100644 --- a/sstable/block_property_test.go +++ b/sstable/block_property_test.go @@ -938,7 +938,7 @@ func TestBlockProperties(t *testing.T) { var blocks []int var i int - iter, _ := newBlockIter(r.Compare, indexH.Get()) + iter, _ := newBlockIter(r.Compare, indexH.Get(), nil) for key, value := iter.First(); key != nil; key, value = iter.Next() { bh, err := decodeBlockHandleWithProperties(value.InPlaceValue()) if err != nil { @@ -1274,7 +1274,7 @@ func runBlockPropsCmd(r *Reader, td *datadriven.TestData) string { return err.Error() } twoLevelIndex := r.Properties.IndexPartitions > 0 - i, err := newBlockIter(r.Compare, bh.Get()) + i, err := newBlockIter(r.Compare, bh.Get(), nil) if err != nil { return err.Error() } @@ -1322,7 +1322,7 @@ func runBlockPropsCmd(r *Reader, td *datadriven.TestData) string { return err.Error() } if err := subiter.init( - r.Compare, subIndex.Get(), 0 /* globalSeqNum */, false); err != nil { + r.Compare, subIndex.Get(), 0 /* globalSeqNum */, false, r.syntheticPrefix); err != nil { return err.Error() } for key, value := subiter.First(); key != nil; key, value = subiter.Next() { diff --git a/sstable/block_test.go b/sstable/block_test.go index 14e6f7ff8a6..e21d6a78f1a 100644 --- a/sstable/block_test.go +++ b/sstable/block_test.go @@ -245,7 +245,7 @@ func TestBlockIter2(t *testing.T) { return "" case "iter": - iter, err := newBlockIter(bytes.Compare, block) + iter, err := newBlockIter(bytes.Compare, block, nil) if err != nil { return err.Error() } @@ -276,7 +276,7 @@ func TestBlockIterKeyStability(t *testing.T) { } block := w.finish() - i, err := newBlockIter(bytes.Compare, block) + i, err := newBlockIter(bytes.Compare, block, nil) require.NoError(t, err) // Check that the supplied slice resides within the bounds of the block. @@ -336,7 +336,7 @@ func TestBlockIterReverseDirections(t *testing.T) { for targetPos := 0; targetPos < w.restartInterval; targetPos++ { t.Run("", func(t *testing.T) { - i, err := newBlockIter(bytes.Compare, block) + i, err := newBlockIter(bytes.Compare, block, nil) require.NoError(t, err) pos := 3 @@ -357,6 +357,62 @@ func TestBlockIterReverseDirections(t *testing.T) { } } +func TestBlockSyntheticPrefix(t *testing.T) { + for _, prefix := range []string{"", "_", "~", "fruits/"} { + for _, restarts := range []int{1, 2, 3, 4, 10} { + t.Run(fmt.Sprintf("prefix=%s/restarts=%d", prefix, restarts), func(t *testing.T) { + + elidedPrefixWriter, includedPrefixWriter := &blockWriter{restartInterval: restarts}, &blockWriter{restartInterval: restarts} + keys := []string{ + "apple", "apricot", "banana", + "grape", "orange", "peach", + "pear", "persimmon", + } + for _, k := range keys { + elidedPrefixWriter.add(ikey(k), nil) + includedPrefixWriter.add(ikey(prefix+k), nil) + } + + elidedPrefixBlock, includedPrefixBlock := elidedPrefixWriter.finish(), includedPrefixWriter.finish() + + expect, err := newBlockIter(bytes.Compare, includedPrefixBlock, nil) + require.NoError(t, err) + + got, err := newBlockIter(bytes.Compare, elidedPrefixBlock, SyntheticPrefix([]byte(prefix))) + require.NoError(t, err) + + check := func(eKey *base.InternalKey, eVal base.LazyValue) func(gKey *base.InternalKey, gVal base.LazyValue) { + return func(gKey *base.InternalKey, gVal base.LazyValue) { + t.Helper() + if eKey != nil { + t.Logf("[%q] expected %q, got %q", prefix, eKey.UserKey, gKey.UserKey) + require.Equal(t, eKey, gKey) + require.Equal(t, eVal, gVal) + } else { + t.Logf("[%q] expected nil, got %q", prefix, gKey) + require.Nil(t, gKey) + } + } + } + + check(expect.First())(got.First()) + check(expect.Next())(got.Next()) + check(expect.Prev())(got.Prev()) + + check(expect.SeekGE([]byte(prefix+"or"), base.SeekGEFlagsNone))(got.SeekGE([]byte(prefix+"or"), base.SeekGEFlagsNone)) + check(expect.SeekGE([]byte(prefix+"peach"), base.SeekGEFlagsNone))(got.SeekGE([]byte(prefix+"peach"), base.SeekGEFlagsNone)) + check(expect.Next())(got.Next()) + check(expect.Next())(got.Next()) + check(expect.Next())(got.Next()) + + check(expect.SeekLT([]byte(prefix+"banana"), base.SeekLTFlagsNone))(got.SeekLT([]byte(prefix+"banana"), base.SeekLTFlagsNone)) + check(expect.SeekLT([]byte(prefix+"pomegranate"), base.SeekLTFlagsNone))(got.SeekLT([]byte(prefix+"pomegranate"), base.SeekLTFlagsNone)) + check(expect.SeekLT([]byte(prefix+"apple"), base.SeekLTFlagsNone))(got.SeekLT([]byte(prefix+"apple"), base.SeekLTFlagsNone)) + }) + } + } +} + func BenchmarkBlockIterSeekGE(b *testing.B) { const blockSize = 32 << 10 @@ -376,7 +432,7 @@ func BenchmarkBlockIterSeekGE(b *testing.B) { w.add(ikey, nil) } - it, err := newBlockIter(bytes.Compare, w.finish()) + it, err := newBlockIter(bytes.Compare, w.finish(), nil) if err != nil { b.Fatal(err) } @@ -418,7 +474,7 @@ func BenchmarkBlockIterSeekLT(b *testing.B) { w.add(ikey, nil) } - it, err := newBlockIter(bytes.Compare, w.finish()) + it, err := newBlockIter(bytes.Compare, w.finish(), nil) if err != nil { b.Fatal(err) } @@ -464,7 +520,7 @@ func BenchmarkBlockIterNext(b *testing.B) { w.add(ikey, nil) } - it, err := newBlockIter(bytes.Compare, w.finish()) + it, err := newBlockIter(bytes.Compare, w.finish(), nil) if err != nil { b.Fatal(err) } @@ -496,7 +552,7 @@ func BenchmarkBlockIterPrev(b *testing.B) { w.add(ikey, nil) } - it, err := newBlockIter(bytes.Compare, w.finish()) + it, err := newBlockIter(bytes.Compare, w.finish(), nil) if err != nil { b.Fatal(err) } diff --git a/sstable/layout.go b/sstable/layout.go index 8c375741de8..07b8f4f8b15 100644 --- a/sstable/layout.go +++ b/sstable/layout.go @@ -24,18 +24,19 @@ type Layout struct { // ValidateBlockChecksums, which validates a static list of BlockHandles // referenced in this struct. - Data []BlockHandleWithProperties - Index []BlockHandle - TopIndex BlockHandle - Filter BlockHandle - RangeDel BlockHandle - RangeKey BlockHandle - ValueBlock []BlockHandle - ValueIndex BlockHandle - Properties BlockHandle - MetaIndex BlockHandle - Footer BlockHandle - Format TableFormat + Data []BlockHandleWithProperties + Index []BlockHandle + TopIndex BlockHandle + Filter BlockHandle + RangeDel BlockHandle + RangeKey BlockHandle + ValueBlock []BlockHandle + ValueIndex BlockHandle + Properties BlockHandle + MetaIndex BlockHandle + Footer BlockHandle + Format TableFormat + SyntheticPrefix SyntheticPrefix } // Describe returns a description of the layout. If the verbose parameter is @@ -186,7 +187,7 @@ func (l *Layout) Describe( var lastKey InternalKey switch b.name { case "data", "range-del", "range-key": - iter, _ := newBlockIter(r.Compare, h.Get()) + iter, _ := newBlockIter(r.Compare, h.Get(), l.SyntheticPrefix) for key, value := iter.First(); key != nil; key, value = iter.Next() { ptr := unsafe.Pointer(uintptr(iter.ptr) + uintptr(iter.offset)) shared, ptr := decodeVarint(ptr) @@ -238,7 +239,7 @@ func (l *Layout) Describe( formatRestarts(iter.data, iter.restarts, iter.numRestarts) formatTrailer() case "index", "top-index": - iter, _ := newBlockIter(r.Compare, h.Get()) + iter, _ := newBlockIter(r.Compare, h.Get(), l.SyntheticPrefix) for key, value := iter.First(); key != nil; key, value = iter.Next() { bh, err := decodeBlockHandleWithProperties(value.InPlaceValue()) if err != nil { diff --git a/sstable/prefix_replacing_iterator.go b/sstable/prefix_replacing_iterator.go index e2cee14cf68..33f6db6c742 100644 --- a/sstable/prefix_replacing_iterator.go +++ b/sstable/prefix_replacing_iterator.go @@ -113,7 +113,9 @@ func (p *prefixReplacingIterator) SeekLT( p.i.First() return p.rewriteResult(p.i.Prev()) } - return p.rewriteResult(p.i.SeekLT(p.rewriteArg(key), flags)) + key = p.rewriteArg(key) + resKey, resResult := p.i.SeekLT(key, flags) + return p.rewriteResult(resKey, resResult) } // First implements the Iterator interface. diff --git a/sstable/prefix_replacing_iterator_test.go b/sstable/prefix_replacing_iterator_test.go index 1ff6e6b8995..2a47a5aa26b 100644 --- a/sstable/prefix_replacing_iterator_test.go +++ b/sstable/prefix_replacing_iterator_test.go @@ -24,7 +24,11 @@ func TestPrefixReplacingIterator(t *testing.T) { {from: []byte("zzz"), to: []byte("aa")}, } { t.Run(fmt.Sprintf("%s_%s", tc.from, tc.to), func(t *testing.T) { - r := buildTestTable(t, 20, 256, 256, DefaultCompression, tc.from) + var readerOpts []ReaderOption + if len(tc.from) == 0 { + readerOpts = append(readerOpts, WithSyntheticPrefix(tc.to)) + } + r := buildTestTable(t, 20, 256, 256, DefaultCompression, tc.from, readerOpts...) defer r.Close() rawIter, err := r.NewIter(nil, nil) require.NoError(t, err) @@ -32,7 +36,12 @@ func TestPrefixReplacingIterator(t *testing.T) { raw := rawIter.(*singleLevelIterator) - it := newPrefixReplacingIterator(raw, tc.from, tc.to, DefaultComparer.Compare) + var it Iterator + if r.syntheticPrefix != nil { + it = raw + } else { + it = newPrefixReplacingIterator(raw, tc.from, tc.to, DefaultComparer.Compare) + } kMin, kMax, k := []byte{0}, []byte("~"), func(i uint64) []byte { return binary.BigEndian.AppendUint64(tc.to[:len(tc.to):len(tc.to)], i) diff --git a/sstable/reader.go b/sstable/reader.go index ce8f1dac766..8cc43b8520e 100644 --- a/sstable/reader.go +++ b/sstable/reader.go @@ -22,6 +22,7 @@ import ( "github.com/cockroachdb/pebble/internal/crc" "github.com/cockroachdb/pebble/internal/invariants" "github.com/cockroachdb/pebble/internal/keyspan" + "github.com/cockroachdb/pebble/internal/manifest" "github.com/cockroachdb/pebble/internal/private" "github.com/cockroachdb/pebble/objstorage" "github.com/cockroachdb/pebble/objstorage/objstorageprovider/objiotracing" @@ -157,6 +158,34 @@ func (c *cacheOpts) writerApply(w *Writer) { } } +type noopOpt struct{} + +func (noopOpt) preApply() {} + +func (noopOpt) readerApply(_ *Reader) {} + +func WithSyntheticPrefix(prefix []byte) ReaderOption { + if len(prefix) == 0 { + return noopOpt{} + } + return SyntheticPrefix(prefix) +} + +type SyntheticPrefix []byte + +func (p SyntheticPrefix) Implements(r *manifest.PrefixReplacement) bool { + if p == nil { + return r == nil || (len(r.ContentPrefix) == 0 && len(r.SyntheticPrefix) == 0) + } + return len(r.ContentPrefix) == 0 && bytes.Equal(r.SyntheticPrefix, p) +} + +func (SyntheticPrefix) preApply() {} + +func (p SyntheticPrefix) readerApply(r *Reader) { + r.syntheticPrefix = p +} + // rawTombstonesOpt is a Reader open option for specifying that range // tombstones returned by Reader.NewRangeDelIter() should not be // fragmented. Used by debug tools to get a raw view of the tombstones @@ -222,6 +251,7 @@ type Reader struct { Equal Equal FormatKey base.FormatKey Split Split + syntheticPrefix SyntheticPrefix tableFilter *tableFilterReader // Keep types that are not multiples of 8 bytes at the end and with // decreasing size. @@ -429,7 +459,7 @@ func (r *Reader) NewRawRangeDelIter() (keyspan.FragmentIterator, error) { // sstables. This is because rangedels do not apply to points in the same // sstable at the same sequence number anyway, so exposing obsolete rangedels // is harmless. - if err := i.blockIter.initHandle(r.Compare, h, r.Properties.GlobalSeqNum, false); err != nil { + if err := i.blockIter.initHandle(r.Compare, h, r.Properties.GlobalSeqNum, false, r.syntheticPrefix); err != nil { return nil, err } return i, nil @@ -451,7 +481,7 @@ func (r *Reader) newRawRangeKeyIter(vState *virtualState) (keyspan.FragmentItera if vState == nil || !vState.isSharedIngested { globalSeqNum = r.Properties.GlobalSeqNum } - if err := i.blockIter.initHandle(r.Compare, h, globalSeqNum, false /* hideObsoletePoints */); err != nil { + if err := i.blockIter.initHandle(r.Compare, h, globalSeqNum, false /* hideObsoletePoints */, r.syntheticPrefix); err != nil { return nil, err } return i, nil @@ -706,7 +736,7 @@ func (r *Reader) transformRangeDelV1(b []byte) ([]byte, error) { // tombstones. We need properly fragmented and sorted range tombstones in // order to serve from them directly. iter := &blockIter{} - if err := iter.init(r.Compare, b, r.Properties.GlobalSeqNum, false); err != nil { + if err := iter.init(r.Compare, b, r.Properties.GlobalSeqNum, false, r.syntheticPrefix); err != nil { return nil, err } var tombstones []keyspan.Span @@ -886,7 +916,7 @@ func (r *Reader) Layout() (*Layout, error) { if r.Properties.IndexPartitions == 0 { l.Index = append(l.Index, r.indexBH) - iter, _ := newBlockIter(r.Compare, indexH.Get()) + iter, _ := newBlockIter(r.Compare, indexH.Get(), r.syntheticPrefix) for key, value := iter.First(); key != nil; key, value = iter.Next() { dataBH, err := decodeBlockHandleWithProperties(value.InPlaceValue()) if err != nil { @@ -899,8 +929,8 @@ func (r *Reader) Layout() (*Layout, error) { } } else { l.TopIndex = r.indexBH - topIter, _ := newBlockIter(r.Compare, indexH.Get()) - iter := &blockIter{} + topIter, _ := newBlockIter(r.Compare, indexH.Get(), r.syntheticPrefix) + iter := &blockIter{SyntheticPrefix: r.syntheticPrefix} for key, value := topIter.First(); key != nil; key, value = topIter.Next() { indexBH, err := decodeBlockHandleWithProperties(value.InPlaceValue()) if err != nil { @@ -914,7 +944,7 @@ func (r *Reader) Layout() (*Layout, error) { return nil, err } if err := iter.init(r.Compare, subIndex.Get(), 0, /* globalSeqNum */ - false /* hideObsoletePoints */); err != nil { + false /* hideObsoletePoints */, r.syntheticPrefix); err != nil { return nil, err } for key, value := iter.First(); key != nil; key, value = iter.Next() { @@ -1049,14 +1079,14 @@ func (r *Reader) EstimateDiskUsage(start, end []byte) (uint64, error) { // to the same blockIter over the single index in the unpartitioned case. var startIdxIter, endIdxIter *blockIter if r.Properties.IndexPartitions == 0 { - iter, err := newBlockIter(r.Compare, indexH.Get()) + iter, err := newBlockIter(r.Compare, indexH.Get(), r.syntheticPrefix) if err != nil { return 0, err } startIdxIter = iter endIdxIter = iter } else { - topIter, err := newBlockIter(r.Compare, indexH.Get()) + topIter, err := newBlockIter(r.Compare, indexH.Get(), r.syntheticPrefix) if err != nil { return 0, err } @@ -1076,7 +1106,7 @@ func (r *Reader) EstimateDiskUsage(start, end []byte) (uint64, error) { return 0, err } defer startIdxBlock.Release() - startIdxIter, err = newBlockIter(r.Compare, startIdxBlock.Get()) + startIdxIter, err = newBlockIter(r.Compare, startIdxBlock.Get(), r.syntheticPrefix) if err != nil { return 0, err } @@ -1097,7 +1127,7 @@ func (r *Reader) EstimateDiskUsage(start, end []byte) (uint64, error) { return 0, err } defer endIdxBlock.Release() - endIdxIter, err = newBlockIter(r.Compare, endIdxBlock.Get()) + endIdxIter, err = newBlockIter(r.Compare, endIdxBlock.Get(), r.syntheticPrefix) if err != nil { return 0, err } diff --git a/sstable/reader_iter_single_lvl.go b/sstable/reader_iter_single_lvl.go index ed676957d6e..e6f90baedc7 100644 --- a/sstable/reader_iter_single_lvl.go +++ b/sstable/reader_iter_single_lvl.go @@ -213,7 +213,7 @@ func (i *singleLevelIterator) init( i.stats = stats i.hideObsoletePoints = hideObsoletePoints i.bufferPool = bufferPool - err = i.index.initHandle(i.cmp, indexH, r.Properties.GlobalSeqNum, false) + err = i.index.initHandle(i.cmp, indexH, r.Properties.GlobalSeqNum, false, r.syntheticPrefix) if err != nil { // blockIter.Close releases indexH and always returns a nil error _ = i.index.Close() @@ -435,7 +435,7 @@ func (i *singleLevelIterator) loadBlock(dir int8) loadBlockResult { i.err = err return loadBlockFailed } - i.err = i.data.initHandle(i.cmp, block, i.reader.Properties.GlobalSeqNum, i.hideObsoletePoints) + i.err = i.data.initHandle(i.cmp, block, i.reader.Properties.GlobalSeqNum, i.hideObsoletePoints, i.reader.syntheticPrefix) if i.err != nil { // The block is partially loaded, and we don't want it to appear valid. i.data.invalidate() @@ -822,6 +822,9 @@ func (i *singleLevelIterator) seekPrefixGE( i.data.invalidate() return nil, base.LazyValue{} } + if i.reader.syntheticPrefix != nil { + prefix = bytes.TrimPrefix(prefix, i.reader.syntheticPrefix) + } mayContain := i.reader.tableFilter.mayContain(dataH.Get(), prefix) dataH.Release() if !mayContain { diff --git a/sstable/reader_iter_two_lvl.go b/sstable/reader_iter_two_lvl.go index 9a2a3f254cf..273a487ccfb 100644 --- a/sstable/reader_iter_two_lvl.go +++ b/sstable/reader_iter_two_lvl.go @@ -67,7 +67,7 @@ func (i *twoLevelIterator) loadIndex(dir int8) loadBlockResult { i.err = err return loadBlockFailed } - if i.err = i.index.initHandle(i.cmp, indexBlock, i.reader.Properties.GlobalSeqNum, false); i.err == nil { + if i.err = i.index.initHandle(i.cmp, indexBlock, i.reader.Properties.GlobalSeqNum, false, i.reader.syntheticPrefix); i.err == nil { return loadBlockOK } return loadBlockFailed @@ -175,7 +175,7 @@ func (i *twoLevelIterator) init( i.stats = stats i.hideObsoletePoints = hideObsoletePoints i.bufferPool = bufferPool - err = i.topLevelIndex.initHandle(i.cmp, topLevelIndexH, r.Properties.GlobalSeqNum, false) + err = i.topLevelIndex.initHandle(i.cmp, topLevelIndexH, r.Properties.GlobalSeqNum, false, r.syntheticPrefix) if err != nil { // blockIter.Close releases topLevelIndexH and always returns a nil error _ = i.topLevelIndex.Close() diff --git a/sstable/reader_test.go b/sstable/reader_test.go index cc20f8abc3f..e72e9763f95 100644 --- a/sstable/reader_test.go +++ b/sstable/reader_test.go @@ -707,7 +707,7 @@ func indexLayoutString(t *testing.T, r *Reader) string { var buf strings.Builder twoLevelIndex := r.Properties.IndexType == twoLevelIndex buf.WriteString("index entries:\n") - iter, err := newBlockIter(r.Compare, indexH.Get()) + iter, err := newBlockIter(r.Compare, indexH.Get(), r.syntheticPrefix) defer func() { require.NoError(t, iter.Close()) }() @@ -721,7 +721,7 @@ func indexLayoutString(t *testing.T, r *Reader) string { context.Background(), bh.BlockHandle, nil, nil, nil, nil, nil) require.NoError(t, err) defer b.Release() - iter2, err := newBlockIter(r.Compare, b.Get()) + iter2, err := newBlockIter(r.Compare, b.Get(), r.syntheticPrefix) defer func() { require.NoError(t, iter2.Close()) }() @@ -1402,11 +1402,12 @@ func buildTestTable( blockSize, indexBlockSize int, compression Compression, prefix []byte, + readerOpts ...ReaderOption, ) *Reader { provider, err := objstorageprovider.Open(objstorageprovider.DefaultSettings(vfs.NewMem(), "" /* dirName */)) require.NoError(t, err) defer provider.Close() - return buildTestTableWithProvider(t, provider, numEntries, blockSize, indexBlockSize, compression, prefix) + return buildTestTableWithProvider(t, provider, numEntries, blockSize, indexBlockSize, compression, prefix, readerOpts...) } func buildTestTableWithProvider( @@ -1416,6 +1417,7 @@ func buildTestTableWithProvider( blockSize, indexBlockSize int, compression Compression, prefix []byte, + readerOpts ...ReaderOption, ) *Reader { f0, _, err := provider.Create(context.Background(), base.FileTypeTable, base.DiskFileNum(0), objstorage.CreateOptions{}) require.NoError(t, err) @@ -1447,7 +1449,9 @@ func buildTestTableWithProvider( defer c.Unref() r, err := NewReader(f1, ReaderOptions{ Cache: c, - }) + }, + readerOpts..., + ) require.NoError(t, err) return r } diff --git a/sstable/reader_virtual.go b/sstable/reader_virtual.go index 1a06f88e8b4..3de68ffa44f 100644 --- a/sstable/reader_virtual.go +++ b/sstable/reader_virtual.go @@ -90,7 +90,7 @@ func (v *VirtualReader) NewCompactionIter( ) (Iterator, error) { i, err := v.reader.newCompactionIter( bytesIterated, categoryAndQoS, statsCollector, rp, &v.vState, bufferPool) - if err == nil && v.vState.prefixChange != nil { + if err == nil && v.vState.prefixChange != nil && !v.reader.syntheticPrefix.Implements(v.vState.prefixChange) { i = newPrefixReplacingIterator(i, v.vState.prefixChange.ContentPrefix, v.vState.prefixChange.SyntheticPrefix, v.reader.Compare) } return i, err @@ -113,7 +113,7 @@ func (v *VirtualReader) NewIterWithBlockPropertyFiltersAndContextEtc( i, err := v.reader.newIterWithBlockPropertyFiltersAndContext( ctx, lower, upper, filterer, hideObsoletePoints, useFilterBlock, stats, categoryAndQoS, statsCollector, rp, &v.vState) - if err == nil && v.vState.prefixChange != nil { + if err == nil && v.vState.prefixChange != nil && !v.reader.syntheticPrefix.Implements(v.vState.prefixChange) { i = newPrefixReplacingIterator(i, v.vState.prefixChange.ContentPrefix, v.vState.prefixChange.SyntheticPrefix, v.reader.Compare) } return i, err diff --git a/sstable/suffix_rewriter.go b/sstable/suffix_rewriter.go index a226e8eb9cb..fb292df25a7 100644 --- a/sstable/suffix_rewriter.go +++ b/sstable/suffix_rewriter.go @@ -201,7 +201,7 @@ func rewriteBlocks( if err != nil { return err } - if err := iter.init(r.Compare, inputBlock, r.Properties.GlobalSeqNum, false); err != nil { + if err := iter.init(r.Compare, inputBlock, r.Properties.GlobalSeqNum, false, r.syntheticPrefix); err != nil { return err } diff --git a/table_cache.go b/table_cache.go index b730ffec896..c04a94faae2 100644 --- a/table_cache.go +++ b/table_cache.go @@ -880,11 +880,16 @@ func (c *tableCacheShard) findNodeInternal( // Note adding to the cache lists must complete before we begin loading the // table as a failure during load will result in the node being unlinked. pprof.Do(context.Background(), tableCacheLabels, func(context.Context) { + var prefix []byte + if meta.PrefixReplacement != nil && len(meta.PrefixReplacement.ContentPrefix) == 0 { + prefix = meta.PrefixReplacement.SyntheticPrefix + } v.load( loadInfo{ - backingFileNum: meta.FileBacking.DiskFileNum, - smallestSeqNum: meta.SmallestSeqNum, - largestSeqNum: meta.LargestSeqNum, + backingFileNum: meta.FileBacking.DiskFileNum, + smallestSeqNum: meta.SmallestSeqNum, + largestSeqNum: meta.LargestSeqNum, + syntheticPrefix: prefix, }, c, dbOpts) }) return v @@ -1111,9 +1116,10 @@ type tableCacheValue struct { } type loadInfo struct { - backingFileNum base.DiskFileNum - largestSeqNum uint64 - smallestSeqNum uint64 + backingFileNum base.DiskFileNum + largestSeqNum uint64 + smallestSeqNum uint64 + syntheticPrefix []byte } func (v *tableCacheValue) load(loadInfo loadInfo, c *tableCacheShard, dbOpts *tableCacheOpts) { @@ -1125,7 +1131,7 @@ func (v *tableCacheValue) load(loadInfo loadInfo, c *tableCacheShard, dbOpts *ta ) if err == nil { cacheOpts := private.SSTableCacheOpts(dbOpts.cacheID, loadInfo.backingFileNum).(sstable.ReaderOption) - v.reader, err = sstable.NewReader(f, dbOpts.opts, cacheOpts, dbOpts.filterMetrics) + v.reader, err = sstable.NewReader(f, dbOpts.opts, cacheOpts, dbOpts.filterMetrics, sstable.WithSyntheticPrefix(loadInfo.syntheticPrefix)) } if err != nil { v.err = errors.Wrapf( diff --git a/testdata/event_listener b/testdata/event_listener index ad32671e53d..b9bf39d4dd3 100644 --- a/testdata/event_listener +++ b/testdata/event_listener @@ -222,7 +222,7 @@ Zombie tables: 0 (0B) Backing tables: 0 (0B) Virtual tables: 0 (0B) Block cache: 6 entries (1.1KB) hit rate: 11.1% -Table cache: 1 entries (808B) hit rate: 40.0% +Table cache: 1 entries (832B) hit rate: 40.0% Secondary cache: 0 entries (0B) hit rate: 0.0% Snapshots: 0 earliest seq num: 0 Table iters: 0 @@ -321,7 +321,7 @@ Zombie tables: 0 (0B) Backing tables: 0 (0B) Virtual tables: 0 (0B) Block cache: 12 entries (2.3KB) hit rate: 14.3% -Table cache: 1 entries (808B) hit rate: 50.0% +Table cache: 1 entries (832B) hit rate: 50.0% Secondary cache: 0 entries (0B) hit rate: 0.0% Snapshots: 0 earliest seq num: 0 Table iters: 0 diff --git a/testdata/ingest b/testdata/ingest index 22a7f14bd16..d917ee85385 100644 --- a/testdata/ingest +++ b/testdata/ingest @@ -52,7 +52,7 @@ Zombie tables: 0 (0B) Backing tables: 0 (0B) Virtual tables: 0 (0B) Block cache: 6 entries (1.2KB) hit rate: 35.7% -Table cache: 1 entries (808B) hit rate: 50.0% +Table cache: 1 entries (832B) hit rate: 50.0% Secondary cache: 0 entries (0B) hit rate: 0.0% Snapshots: 0 earliest seq num: 0 Table iters: 0 diff --git a/testdata/metrics b/testdata/metrics index 55a1beacc1a..474f85a3e74 100644 --- a/testdata/metrics +++ b/testdata/metrics @@ -68,7 +68,7 @@ Zombie tables: 0 (0B) Backing tables: 0 (0B) Virtual tables: 0 (0B) Block cache: 3 entries (556B) hit rate: 0.0% -Table cache: 1 entries (808B) hit rate: 0.0% +Table cache: 1 entries (832B) hit rate: 0.0% Secondary cache: 0 entries (0B) hit rate: 0.0% Snapshots: 0 earliest seq num: 0 Table iters: 1 @@ -201,7 +201,7 @@ Zombie tables: 1 (661B) Backing tables: 0 (0B) Virtual tables: 0 (0B) Block cache: 3 entries (556B) hit rate: 42.9% -Table cache: 1 entries (808B) hit rate: 66.7% +Table cache: 1 entries (832B) hit rate: 66.7% Secondary cache: 0 entries (0B) hit rate: 0.0% Snapshots: 0 earliest seq num: 0 Table iters: 1 @@ -467,7 +467,7 @@ Zombie tables: 0 (0B) Backing tables: 0 (0B) Virtual tables: 0 (0B) Block cache: 12 entries (2.4KB) hit rate: 24.5% -Table cache: 1 entries (808B) hit rate: 60.0% +Table cache: 1 entries (832B) hit rate: 60.0% Secondary cache: 0 entries (0B) hit rate: 0.0% Snapshots: 0 earliest seq num: 0 Table iters: 0 @@ -528,7 +528,7 @@ Zombie tables: 0 (0B) Backing tables: 0 (0B) Virtual tables: 0 (0B) Block cache: 12 entries (2.4KB) hit rate: 24.5% -Table cache: 1 entries (808B) hit rate: 60.0% +Table cache: 1 entries (832B) hit rate: 60.0% Secondary cache: 0 entries (0B) hit rate: 0.0% Snapshots: 0 earliest seq num: 0 Table iters: 0