diff --git a/sstable/block.go b/sstable/block.go index 6cfea806ac..9e4fcd09f4 100644 --- a/sstable/block.go +++ b/sstable/block.go @@ -5,6 +5,7 @@ package sstable import ( + "bytes" "context" "encoding/binary" "unsafe" @@ -404,20 +405,22 @@ type blockIter struct { cachedBuf []byte handle bufferHandle // for block iteration for already loaded blocks. - firstUserKey []byte - lazyValueHandling struct { + firstUserKey []byte + firstUserKeyWithPrefix []byte + lazyValueHandling struct { vbr *valueBlockReader hasValuePrefix bool } hideObsoletePoints bool + prefix SyntheticPrefix } // blockIter implements the base.InternalIterator interface. var _ base.InternalIterator = (*blockIter)(nil) -func newBlockIter(cmp Compare, block block) (*blockIter, error) { +func newBlockIter(cmp Compare, block block, syntheticPrefix SyntheticPrefix) (*blockIter, error) { i := &blockIter{} - return i, i.init(cmp, block, 0, false) + return i, i.init(cmp, block, 0, false, syntheticPrefix) } func (i *blockIter) String() string { @@ -425,19 +428,24 @@ func (i *blockIter) String() string { } func (i *blockIter) init( - cmp Compare, block block, globalSeqNum uint64, hideObsoletePoints bool, + cmp Compare, block block, globalSeqNum uint64, hideObsoletePoints bool, syntheticPrefix SyntheticPrefix, ) error { numRestarts := int32(binary.LittleEndian.Uint32(block[len(block)-4:])) if numRestarts == 0 { return base.CorruptionErrorf("pebble/table: invalid table (block has no restart points)") } + i.prefix = syntheticPrefix i.cmp = cmp i.restarts = int32(len(block)) - 4*(1+numRestarts) i.numRestarts = numRestarts i.globalSeqNum = globalSeqNum i.ptr = unsafe.Pointer(&block[0]) i.data = block - i.fullKey = i.fullKey[:0] + if i.prefix != nil { + i.fullKey = append(i.fullKey[:0], i.prefix...) + } else { + i.fullKey = i.fullKey[:0] + } i.val = nil i.hideObsoletePoints = hideObsoletePoints i.clearCache() @@ -457,11 +465,11 @@ func (i *blockIter) init( // ingested. 
// - Foreign sstable iteration: globalSeqNum is always set. func (i *blockIter) initHandle( - cmp Compare, block bufferHandle, globalSeqNum uint64, hideObsoletePoints bool, + cmp Compare, block bufferHandle, globalSeqNum uint64, hideObsoletePoints bool, syntheticPrefix SyntheticPrefix, ) error { i.handle.Release() i.handle = block - return i.init(cmp, block.Get(), globalSeqNum, hideObsoletePoints) + return i.init(cmp, block.Get(), globalSeqNum, hideObsoletePoints, syntheticPrefix) } func (i *blockIter) invalidate() { @@ -482,10 +490,11 @@ func (i *blockIter) isDataInvalidated() bool { func (i *blockIter) resetForReuse() blockIter { return blockIter{ - fullKey: i.fullKey[:0], - cached: i.cached[:0], - cachedBuf: i.cachedBuf[:0], - data: nil, + fullKey: i.fullKey[:0], + cached: i.cached[:0], + cachedBuf: i.cachedBuf[:0], + firstUserKeyWithPrefix: i.firstUserKeyWithPrefix[:0], + data: nil, } } @@ -557,6 +566,7 @@ func (i *blockIter) readEntry() { ptr = unsafe.Pointer(uintptr(ptr) + 5) } + shared += uint32(len(i.prefix)) unsharedKey := getBytes(ptr, int(unshared)) // TODO(sumeer): move this into the else block below. i.fullKey = append(i.fullKey[:shared], unsharedKey...) @@ -633,6 +643,9 @@ func (i *blockIter) readFirstKey() error { i.firstUserKey = nil return base.CorruptionErrorf("pebble/table: invalid firstKey in block") } + if i.prefix != nil { + i.firstUserKey = append(append(i.firstUserKeyWithPrefix[:0], i.prefix...), i.firstUserKey...) 
+ } return nil } @@ -693,6 +706,17 @@ func (i *blockIter) SeekGE(key []byte, flags base.SeekGEFlags) (*InternalKey, ba panic(errors.AssertionFailedf("invalidated blockIter used")) } + searchKey := key + if i.prefix != nil { + if !bytes.HasPrefix(key, i.prefix) { + if i.cmp(i.prefix, key) >= 0 { + return i.First() + } + return nil, base.LazyValue{} + } + searchKey = key[len(i.prefix):] + } + i.clearCache() // Find the index of the smallest restart point whose key is > the key // sought; index will be numRestarts if there is no such restart point. @@ -756,7 +780,7 @@ func (i *blockIter) SeekGE(key []byte, flags base.SeekGEFlags) (*InternalKey, ba } // Else k is invalid, and left as nil - if i.cmp(key, k) > 0 { + if i.cmp(searchKey, k) > 0 { // The search key is greater than the user key at this restart point. // Search beyond this restart point, since we are trying to find the // first restart point with a user key >= the search key. @@ -833,6 +857,17 @@ func (i *blockIter) SeekLT(key []byte, flags base.SeekLTFlags) (*InternalKey, ba var index int32 { + searchKey := key + if i.prefix != nil { + if !bytes.HasPrefix(key, i.prefix) { + if i.cmp(i.prefix, key) < 0 { + return i.Last() + } + return nil, base.LazyValue{} + } + searchKey = key[len(i.prefix):] + } + // NB: manually inlined sort.Search is ~5% faster. // // Define f(-1) == false and f(n) == true. @@ -889,7 +924,7 @@ func (i *blockIter) SeekLT(key []byte, flags base.SeekLTFlags) (*InternalKey, ba } // Else k is invalid, and left as nil - if i.cmp(key, k) > 0 { + if i.cmp(searchKey, k) > 0 { // The search key is greater than the user key at this restart point. // Search beyond this restart point, since we are trying to find the // first restart point with a user key >= the search key. 
@@ -1226,6 +1261,9 @@ func (i *blockIter) nextPrefixV3(succKey []byte) (*InternalKey, base.LazyValue) value = uint32(e)<<28 | uint32(d)<<21 | uint32(c)<<14 | uint32(b)<<7 | uint32(a) ptr = unsafe.Pointer(uintptr(ptr) + 5) } + if i.prefix != nil { + shared += uint32(len(i.prefix)) + } // The starting position of the value. valuePtr := unsafe.Pointer(uintptr(ptr) + uintptr(unshared)) i.nextOffset = int32(uintptr(valuePtr)-uintptr(i.ptr)) + int32(value) diff --git a/sstable/block_property_test.go b/sstable/block_property_test.go index 0f30205403..eb873f0eca 100644 --- a/sstable/block_property_test.go +++ b/sstable/block_property_test.go @@ -938,7 +938,7 @@ func TestBlockProperties(t *testing.T) { var blocks []int var i int - iter, _ := newBlockIter(r.Compare, indexH.Get()) + iter, _ := newBlockIter(r.Compare, indexH.Get(), nil) for key, value := iter.First(); key != nil; key, value = iter.Next() { bh, err := decodeBlockHandleWithProperties(value.InPlaceValue()) if err != nil { @@ -1274,7 +1274,7 @@ func runBlockPropsCmd(r *Reader, td *datadriven.TestData) string { return err.Error() } twoLevelIndex := r.Properties.IndexPartitions > 0 - i, err := newBlockIter(r.Compare, bh.Get()) + i, err := newBlockIter(r.Compare, bh.Get(), nil) if err != nil { return err.Error() } @@ -1322,7 +1322,7 @@ func runBlockPropsCmd(r *Reader, td *datadriven.TestData) string { return err.Error() } if err := subiter.init( - r.Compare, subIndex.Get(), 0 /* globalSeqNum */, false); err != nil { + r.Compare, subIndex.Get(), 0 /* globalSeqNum */, false, r.syntheticPrefix); err != nil { return err.Error() } for key, value := subiter.First(); key != nil; key, value = subiter.Next() { diff --git a/sstable/block_test.go b/sstable/block_test.go index 14e6f7ff8a..e21d6a78f1 100644 --- a/sstable/block_test.go +++ b/sstable/block_test.go @@ -245,7 +245,7 @@ func TestBlockIter2(t *testing.T) { return "" case "iter": - iter, err := newBlockIter(bytes.Compare, block) + iter, err := 
newBlockIter(bytes.Compare, block, nil) if err != nil { return err.Error() } @@ -276,7 +276,7 @@ func TestBlockIterKeyStability(t *testing.T) { } block := w.finish() - i, err := newBlockIter(bytes.Compare, block) + i, err := newBlockIter(bytes.Compare, block, nil) require.NoError(t, err) // Check that the supplied slice resides within the bounds of the block. @@ -336,7 +336,7 @@ func TestBlockIterReverseDirections(t *testing.T) { for targetPos := 0; targetPos < w.restartInterval; targetPos++ { t.Run("", func(t *testing.T) { - i, err := newBlockIter(bytes.Compare, block) + i, err := newBlockIter(bytes.Compare, block, nil) require.NoError(t, err) pos := 3 @@ -357,6 +357,62 @@ func TestBlockIterReverseDirections(t *testing.T) { } } +func TestBlockSyntheticPrefix(t *testing.T) { + for _, prefix := range []string{"", "_", "~", "fruits/"} { + for _, restarts := range []int{1, 2, 3, 4, 10} { + t.Run(fmt.Sprintf("prefix=%s/restarts=%d", prefix, restarts), func(t *testing.T) { + + elidedPrefixWriter, includedPrefixWriter := &blockWriter{restartInterval: restarts}, &blockWriter{restartInterval: restarts} + keys := []string{ + "apple", "apricot", "banana", + "grape", "orange", "peach", + "pear", "persimmon", + } + for _, k := range keys { + elidedPrefixWriter.add(ikey(k), nil) + includedPrefixWriter.add(ikey(prefix+k), nil) + } + + elidedPrefixBlock, includedPrefixBlock := elidedPrefixWriter.finish(), includedPrefixWriter.finish() + + expect, err := newBlockIter(bytes.Compare, includedPrefixBlock, nil) + require.NoError(t, err) + + got, err := newBlockIter(bytes.Compare, elidedPrefixBlock, SyntheticPrefix([]byte(prefix))) + require.NoError(t, err) + + check := func(eKey *base.InternalKey, eVal base.LazyValue) func(gKey *base.InternalKey, gVal base.LazyValue) { + return func(gKey *base.InternalKey, gVal base.LazyValue) { + t.Helper() + if eKey != nil { + t.Logf("[%q] expected %q, got %q", prefix, eKey.UserKey, gKey.UserKey) + require.Equal(t, eKey, gKey) + require.Equal(t, 
eVal, gVal) + } else { + t.Logf("[%q] expected nil, got %q", prefix, gKey) + require.Nil(t, gKey) + } + } + } + + check(expect.First())(got.First()) + check(expect.Next())(got.Next()) + check(expect.Prev())(got.Prev()) + + check(expect.SeekGE([]byte(prefix+"or"), base.SeekGEFlagsNone))(got.SeekGE([]byte(prefix+"or"), base.SeekGEFlagsNone)) + check(expect.SeekGE([]byte(prefix+"peach"), base.SeekGEFlagsNone))(got.SeekGE([]byte(prefix+"peach"), base.SeekGEFlagsNone)) + check(expect.Next())(got.Next()) + check(expect.Next())(got.Next()) + check(expect.Next())(got.Next()) + + check(expect.SeekLT([]byte(prefix+"banana"), base.SeekLTFlagsNone))(got.SeekLT([]byte(prefix+"banana"), base.SeekLTFlagsNone)) + check(expect.SeekLT([]byte(prefix+"pomegranate"), base.SeekLTFlagsNone))(got.SeekLT([]byte(prefix+"pomegranate"), base.SeekLTFlagsNone)) + check(expect.SeekLT([]byte(prefix+"apple"), base.SeekLTFlagsNone))(got.SeekLT([]byte(prefix+"apple"), base.SeekLTFlagsNone)) + }) + } + } +} + func BenchmarkBlockIterSeekGE(b *testing.B) { const blockSize = 32 << 10 @@ -376,7 +432,7 @@ func BenchmarkBlockIterSeekGE(b *testing.B) { w.add(ikey, nil) } - it, err := newBlockIter(bytes.Compare, w.finish()) + it, err := newBlockIter(bytes.Compare, w.finish(), nil) if err != nil { b.Fatal(err) } @@ -418,7 +474,7 @@ func BenchmarkBlockIterSeekLT(b *testing.B) { w.add(ikey, nil) } - it, err := newBlockIter(bytes.Compare, w.finish()) + it, err := newBlockIter(bytes.Compare, w.finish(), nil) if err != nil { b.Fatal(err) } @@ -464,7 +520,7 @@ func BenchmarkBlockIterNext(b *testing.B) { w.add(ikey, nil) } - it, err := newBlockIter(bytes.Compare, w.finish()) + it, err := newBlockIter(bytes.Compare, w.finish(), nil) if err != nil { b.Fatal(err) } @@ -496,7 +552,7 @@ func BenchmarkBlockIterPrev(b *testing.B) { w.add(ikey, nil) } - it, err := newBlockIter(bytes.Compare, w.finish()) + it, err := newBlockIter(bytes.Compare, w.finish(), nil) if err != nil { b.Fatal(err) } diff --git a/sstable/layout.go 
b/sstable/layout.go index 8c375741de..07b8f4f8b1 100644 --- a/sstable/layout.go +++ b/sstable/layout.go @@ -24,18 +24,19 @@ type Layout struct { // ValidateBlockChecksums, which validates a static list of BlockHandles // referenced in this struct. - Data []BlockHandleWithProperties - Index []BlockHandle - TopIndex BlockHandle - Filter BlockHandle - RangeDel BlockHandle - RangeKey BlockHandle - ValueBlock []BlockHandle - ValueIndex BlockHandle - Properties BlockHandle - MetaIndex BlockHandle - Footer BlockHandle - Format TableFormat + Data []BlockHandleWithProperties + Index []BlockHandle + TopIndex BlockHandle + Filter BlockHandle + RangeDel BlockHandle + RangeKey BlockHandle + ValueBlock []BlockHandle + ValueIndex BlockHandle + Properties BlockHandle + MetaIndex BlockHandle + Footer BlockHandle + Format TableFormat + SyntheticPrefix SyntheticPrefix } // Describe returns a description of the layout. If the verbose parameter is @@ -186,7 +187,7 @@ func (l *Layout) Describe( var lastKey InternalKey switch b.name { case "data", "range-del", "range-key": - iter, _ := newBlockIter(r.Compare, h.Get()) + iter, _ := newBlockIter(r.Compare, h.Get(), l.SyntheticPrefix) for key, value := iter.First(); key != nil; key, value = iter.Next() { ptr := unsafe.Pointer(uintptr(iter.ptr) + uintptr(iter.offset)) shared, ptr := decodeVarint(ptr) @@ -238,7 +239,7 @@ func (l *Layout) Describe( formatRestarts(iter.data, iter.restarts, iter.numRestarts) formatTrailer() case "index", "top-index": - iter, _ := newBlockIter(r.Compare, h.Get()) + iter, _ := newBlockIter(r.Compare, h.Get(), l.SyntheticPrefix) for key, value := iter.First(); key != nil; key, value = iter.Next() { bh, err := decodeBlockHandleWithProperties(value.InPlaceValue()) if err != nil { diff --git a/sstable/prefix_replacing_iterator.go b/sstable/prefix_replacing_iterator.go index e2cee14cf6..33f6db6c74 100644 --- a/sstable/prefix_replacing_iterator.go +++ b/sstable/prefix_replacing_iterator.go @@ -113,7 +113,9 @@ func (p 
*prefixReplacingIterator) SeekLT( p.i.First() return p.rewriteResult(p.i.Prev()) } - return p.rewriteResult(p.i.SeekLT(p.rewriteArg(key), flags)) + key = p.rewriteArg(key) + resKey, resResult := p.i.SeekLT(key, flags) + return p.rewriteResult(resKey, resResult) } // First implements the Iterator interface. diff --git a/sstable/prefix_replacing_iterator_test.go b/sstable/prefix_replacing_iterator_test.go index 1ff6e6b899..2a47a5aa26 100644 --- a/sstable/prefix_replacing_iterator_test.go +++ b/sstable/prefix_replacing_iterator_test.go @@ -24,7 +24,11 @@ func TestPrefixReplacingIterator(t *testing.T) { {from: []byte("zzz"), to: []byte("aa")}, } { t.Run(fmt.Sprintf("%s_%s", tc.from, tc.to), func(t *testing.T) { - r := buildTestTable(t, 20, 256, 256, DefaultCompression, tc.from) + var readerOpts []ReaderOption + if len(tc.from) == 0 { + readerOpts = append(readerOpts, WithSyntheticPrefix(tc.to)) + } + r := buildTestTable(t, 20, 256, 256, DefaultCompression, tc.from, readerOpts...) defer r.Close() rawIter, err := r.NewIter(nil, nil) require.NoError(t, err) @@ -32,7 +36,12 @@ func TestPrefixReplacingIterator(t *testing.T) { raw := rawIter.(*singleLevelIterator) - it := newPrefixReplacingIterator(raw, tc.from, tc.to, DefaultComparer.Compare) + var it Iterator + if r.syntheticPrefix != nil { + it = raw + } else { + it = newPrefixReplacingIterator(raw, tc.from, tc.to, DefaultComparer.Compare) + } kMin, kMax, k := []byte{0}, []byte("~"), func(i uint64) []byte { return binary.BigEndian.AppendUint64(tc.to[:len(tc.to):len(tc.to)], i) diff --git a/sstable/reader.go b/sstable/reader.go index ce8f1dac76..11d3312123 100644 --- a/sstable/reader.go +++ b/sstable/reader.go @@ -22,6 +22,7 @@ import ( "github.com/cockroachdb/pebble/internal/crc" "github.com/cockroachdb/pebble/internal/invariants" "github.com/cockroachdb/pebble/internal/keyspan" + "github.com/cockroachdb/pebble/internal/manifest" "github.com/cockroachdb/pebble/internal/private" "github.com/cockroachdb/pebble/objstorage" 
"github.com/cockroachdb/pebble/objstorage/objstorageprovider/objiotracing" @@ -157,6 +158,40 @@ func (c *cacheOpts) writerApply(w *Writer) { } } +type noopOpt struct{} + +func (noopOpt) preApply() {} + +func (noopOpt) readerApply(_ *Reader) {} + +func WithSyntheticPrefix(prefix []byte) ReaderOption { + if len(prefix) == 0 { + return noopOpt{} + } + return SyntheticPrefix(prefix) +} + +// SyntheticPrefix represents a byte slice that is implicitly prepended to every +// key in a file being read or accessed by a reader. Specifically, the file is +// implied to contain keys that do not explicitly include the prefix, and the +// file's bloom filters similarly are constructed on keys that do not include it, +// but interactions with the file, including seeks and reads, will all behave as +// if the file had been constructed from keys that did include the prefix. +type SyntheticPrefix []byte + +func (p SyntheticPrefix) Implements(r *manifest.PrefixReplacement) bool { + if p == nil { + return r == nil || (len(r.ContentPrefix) == 0 && len(r.SyntheticPrefix) == 0) + } + return r != nil && len(r.ContentPrefix) == 0 && bytes.Equal(r.SyntheticPrefix, p) +} + +func (SyntheticPrefix) preApply() {} + +func (p SyntheticPrefix) readerApply(r *Reader) { + r.syntheticPrefix = p[:len(p):len(p)] +} + // rawTombstonesOpt is a Reader open option for specifying that range // tombstones returned by Reader.NewRangeDelIter() should not be // fragmented. Used by debug tools to get a raw view of the tombstones @@ -222,6 +257,7 @@ type Reader struct { Equal Equal FormatKey base.FormatKey Split Split + syntheticPrefix SyntheticPrefix tableFilter *tableFilterReader // Keep types that are not multiples of 8 bytes at the end and with // decreasing size. @@ -429,7 +465,7 @@ func (r *Reader) NewRawRangeDelIter() (keyspan.FragmentIterator, error) { // sstables. This is because rangedels do not apply to points in the same // sstable at the same sequence number anyway, so exposing obsolete rangedels // is harmless. 
- if err := i.blockIter.initHandle(r.Compare, h, r.Properties.GlobalSeqNum, false); err != nil { + if err := i.blockIter.initHandle(r.Compare, h, r.Properties.GlobalSeqNum, false, r.syntheticPrefix); err != nil { return nil, err } return i, nil @@ -451,7 +487,7 @@ func (r *Reader) newRawRangeKeyIter(vState *virtualState) (keyspan.FragmentItera if vState == nil || !vState.isSharedIngested { globalSeqNum = r.Properties.GlobalSeqNum } - if err := i.blockIter.initHandle(r.Compare, h, globalSeqNum, false /* hideObsoletePoints */); err != nil { + if err := i.blockIter.initHandle(r.Compare, h, globalSeqNum, false /* hideObsoletePoints */, r.syntheticPrefix); err != nil { return nil, err } return i, nil @@ -706,7 +742,7 @@ func (r *Reader) transformRangeDelV1(b []byte) ([]byte, error) { // tombstones. We need properly fragmented and sorted range tombstones in // order to serve from them directly. iter := &blockIter{} - if err := iter.init(r.Compare, b, r.Properties.GlobalSeqNum, false); err != nil { + if err := iter.init(r.Compare, b, r.Properties.GlobalSeqNum, false, r.syntheticPrefix); err != nil { return nil, err } var tombstones []keyspan.Span @@ -886,7 +922,7 @@ func (r *Reader) Layout() (*Layout, error) { if r.Properties.IndexPartitions == 0 { l.Index = append(l.Index, r.indexBH) - iter, _ := newBlockIter(r.Compare, indexH.Get()) + iter, _ := newBlockIter(r.Compare, indexH.Get(), r.syntheticPrefix) for key, value := iter.First(); key != nil; key, value = iter.Next() { dataBH, err := decodeBlockHandleWithProperties(value.InPlaceValue()) if err != nil { @@ -899,8 +935,8 @@ func (r *Reader) Layout() (*Layout, error) { } } else { l.TopIndex = r.indexBH - topIter, _ := newBlockIter(r.Compare, indexH.Get()) - iter := &blockIter{} + topIter, _ := newBlockIter(r.Compare, indexH.Get(), r.syntheticPrefix) + iter := &blockIter{prefix: r.syntheticPrefix} for key, value := topIter.First(); key != nil; key, value = topIter.Next() { indexBH, err := 
decodeBlockHandleWithProperties(value.InPlaceValue()) if err != nil { @@ -914,7 +950,7 @@ func (r *Reader) Layout() (*Layout, error) { return nil, err } if err := iter.init(r.Compare, subIndex.Get(), 0, /* globalSeqNum */ - false /* hideObsoletePoints */); err != nil { + false /* hideObsoletePoints */, r.syntheticPrefix); err != nil { return nil, err } for key, value := iter.First(); key != nil; key, value = iter.Next() { @@ -1049,14 +1085,14 @@ func (r *Reader) EstimateDiskUsage(start, end []byte) (uint64, error) { // to the same blockIter over the single index in the unpartitioned case. var startIdxIter, endIdxIter *blockIter if r.Properties.IndexPartitions == 0 { - iter, err := newBlockIter(r.Compare, indexH.Get()) + iter, err := newBlockIter(r.Compare, indexH.Get(), r.syntheticPrefix) if err != nil { return 0, err } startIdxIter = iter endIdxIter = iter } else { - topIter, err := newBlockIter(r.Compare, indexH.Get()) + topIter, err := newBlockIter(r.Compare, indexH.Get(), r.syntheticPrefix) if err != nil { return 0, err } @@ -1076,7 +1112,7 @@ func (r *Reader) EstimateDiskUsage(start, end []byte) (uint64, error) { return 0, err } defer startIdxBlock.Release() - startIdxIter, err = newBlockIter(r.Compare, startIdxBlock.Get()) + startIdxIter, err = newBlockIter(r.Compare, startIdxBlock.Get(), r.syntheticPrefix) if err != nil { return 0, err } @@ -1097,7 +1133,7 @@ func (r *Reader) EstimateDiskUsage(start, end []byte) (uint64, error) { return 0, err } defer endIdxBlock.Release() - endIdxIter, err = newBlockIter(r.Compare, endIdxBlock.Get()) + endIdxIter, err = newBlockIter(r.Compare, endIdxBlock.Get(), r.syntheticPrefix) if err != nil { return 0, err } diff --git a/sstable/reader_iter_single_lvl.go b/sstable/reader_iter_single_lvl.go index ed676957d6..e6f90baedc 100644 --- a/sstable/reader_iter_single_lvl.go +++ b/sstable/reader_iter_single_lvl.go @@ -213,7 +213,7 @@ func (i *singleLevelIterator) init( i.stats = stats i.hideObsoletePoints = hideObsoletePoints 
i.bufferPool = bufferPool - err = i.index.initHandle(i.cmp, indexH, r.Properties.GlobalSeqNum, false) + err = i.index.initHandle(i.cmp, indexH, r.Properties.GlobalSeqNum, false, r.syntheticPrefix) if err != nil { // blockIter.Close releases indexH and always returns a nil error _ = i.index.Close() @@ -435,7 +435,7 @@ func (i *singleLevelIterator) loadBlock(dir int8) loadBlockResult { i.err = err return loadBlockFailed } - i.err = i.data.initHandle(i.cmp, block, i.reader.Properties.GlobalSeqNum, i.hideObsoletePoints) + i.err = i.data.initHandle(i.cmp, block, i.reader.Properties.GlobalSeqNum, i.hideObsoletePoints, i.reader.syntheticPrefix) if i.err != nil { // The block is partially loaded, and we don't want it to appear valid. i.data.invalidate() @@ -822,6 +822,9 @@ func (i *singleLevelIterator) seekPrefixGE( i.data.invalidate() return nil, base.LazyValue{} } + if i.reader.syntheticPrefix != nil { + prefix = bytes.TrimPrefix(prefix, i.reader.syntheticPrefix) + } mayContain := i.reader.tableFilter.mayContain(dataH.Get(), prefix) dataH.Release() if !mayContain { diff --git a/sstable/reader_iter_two_lvl.go b/sstable/reader_iter_two_lvl.go index 9a2a3f254c..273a487ccf 100644 --- a/sstable/reader_iter_two_lvl.go +++ b/sstable/reader_iter_two_lvl.go @@ -67,7 +67,7 @@ func (i *twoLevelIterator) loadIndex(dir int8) loadBlockResult { i.err = err return loadBlockFailed } - if i.err = i.index.initHandle(i.cmp, indexBlock, i.reader.Properties.GlobalSeqNum, false); i.err == nil { + if i.err = i.index.initHandle(i.cmp, indexBlock, i.reader.Properties.GlobalSeqNum, false, i.reader.syntheticPrefix); i.err == nil { return loadBlockOK } return loadBlockFailed @@ -175,7 +175,7 @@ func (i *twoLevelIterator) init( i.stats = stats i.hideObsoletePoints = hideObsoletePoints i.bufferPool = bufferPool - err = i.topLevelIndex.initHandle(i.cmp, topLevelIndexH, r.Properties.GlobalSeqNum, false) + err = i.topLevelIndex.initHandle(i.cmp, topLevelIndexH, r.Properties.GlobalSeqNum, false, 
r.syntheticPrefix) if err != nil { // blockIter.Close releases topLevelIndexH and always returns a nil error _ = i.topLevelIndex.Close() diff --git a/sstable/reader_test.go b/sstable/reader_test.go index cc20f8abc3..e72e9763f9 100644 --- a/sstable/reader_test.go +++ b/sstable/reader_test.go @@ -707,7 +707,7 @@ func indexLayoutString(t *testing.T, r *Reader) string { var buf strings.Builder twoLevelIndex := r.Properties.IndexType == twoLevelIndex buf.WriteString("index entries:\n") - iter, err := newBlockIter(r.Compare, indexH.Get()) + iter, err := newBlockIter(r.Compare, indexH.Get(), r.syntheticPrefix) defer func() { require.NoError(t, iter.Close()) }() @@ -721,7 +721,7 @@ func indexLayoutString(t *testing.T, r *Reader) string { context.Background(), bh.BlockHandle, nil, nil, nil, nil, nil) require.NoError(t, err) defer b.Release() - iter2, err := newBlockIter(r.Compare, b.Get()) + iter2, err := newBlockIter(r.Compare, b.Get(), r.syntheticPrefix) defer func() { require.NoError(t, iter2.Close()) }() @@ -1402,11 +1402,12 @@ func buildTestTable( blockSize, indexBlockSize int, compression Compression, prefix []byte, + readerOpts ...ReaderOption, ) *Reader { provider, err := objstorageprovider.Open(objstorageprovider.DefaultSettings(vfs.NewMem(), "" /* dirName */)) require.NoError(t, err) defer provider.Close() - return buildTestTableWithProvider(t, provider, numEntries, blockSize, indexBlockSize, compression, prefix) + return buildTestTableWithProvider(t, provider, numEntries, blockSize, indexBlockSize, compression, prefix, readerOpts...) 
} func buildTestTableWithProvider( @@ -1416,6 +1417,7 @@ func buildTestTableWithProvider( blockSize, indexBlockSize int, compression Compression, prefix []byte, + readerOpts ...ReaderOption, ) *Reader { f0, _, err := provider.Create(context.Background(), base.FileTypeTable, base.DiskFileNum(0), objstorage.CreateOptions{}) require.NoError(t, err) @@ -1447,7 +1449,9 @@ func buildTestTableWithProvider( defer c.Unref() r, err := NewReader(f1, ReaderOptions{ Cache: c, - }) + }, + readerOpts..., + ) require.NoError(t, err) return r } diff --git a/sstable/reader_virtual.go b/sstable/reader_virtual.go index 1a06f88e8b..3de68ffa44 100644 --- a/sstable/reader_virtual.go +++ b/sstable/reader_virtual.go @@ -90,7 +90,7 @@ func (v *VirtualReader) NewCompactionIter( ) (Iterator, error) { i, err := v.reader.newCompactionIter( bytesIterated, categoryAndQoS, statsCollector, rp, &v.vState, bufferPool) - if err == nil && v.vState.prefixChange != nil { + if err == nil && v.vState.prefixChange != nil && !v.reader.syntheticPrefix.Implements(v.vState.prefixChange) { i = newPrefixReplacingIterator(i, v.vState.prefixChange.ContentPrefix, v.vState.prefixChange.SyntheticPrefix, v.reader.Compare) } return i, err @@ -113,7 +113,7 @@ func (v *VirtualReader) NewIterWithBlockPropertyFiltersAndContextEtc( i, err := v.reader.newIterWithBlockPropertyFiltersAndContext( ctx, lower, upper, filterer, hideObsoletePoints, useFilterBlock, stats, categoryAndQoS, statsCollector, rp, &v.vState) - if err == nil && v.vState.prefixChange != nil { + if err == nil && v.vState.prefixChange != nil && !v.reader.syntheticPrefix.Implements(v.vState.prefixChange) { i = newPrefixReplacingIterator(i, v.vState.prefixChange.ContentPrefix, v.vState.prefixChange.SyntheticPrefix, v.reader.Compare) } return i, err diff --git a/sstable/suffix_rewriter.go b/sstable/suffix_rewriter.go index a226e8eb9c..fb292df25a 100644 --- a/sstable/suffix_rewriter.go +++ b/sstable/suffix_rewriter.go @@ -201,7 +201,7 @@ func rewriteBlocks( if err 
!= nil { return err } - if err := iter.init(r.Compare, inputBlock, r.Properties.GlobalSeqNum, false); err != nil { + if err := iter.init(r.Compare, inputBlock, r.Properties.GlobalSeqNum, false, r.syntheticPrefix); err != nil { return err } diff --git a/table_cache.go b/table_cache.go index b730ffec89..c04a94faae 100644 --- a/table_cache.go +++ b/table_cache.go @@ -880,11 +880,16 @@ func (c *tableCacheShard) findNodeInternal( // Note adding to the cache lists must complete before we begin loading the // table as a failure during load will result in the node being unlinked. pprof.Do(context.Background(), tableCacheLabels, func(context.Context) { + var prefix []byte + if meta.PrefixReplacement != nil && len(meta.PrefixReplacement.ContentPrefix) == 0 { + prefix = meta.PrefixReplacement.SyntheticPrefix + } v.load( loadInfo{ - backingFileNum: meta.FileBacking.DiskFileNum, - smallestSeqNum: meta.SmallestSeqNum, - largestSeqNum: meta.LargestSeqNum, + backingFileNum: meta.FileBacking.DiskFileNum, + smallestSeqNum: meta.SmallestSeqNum, + largestSeqNum: meta.LargestSeqNum, + syntheticPrefix: prefix, }, c, dbOpts) }) return v @@ -1111,9 +1116,10 @@ type tableCacheValue struct { } type loadInfo struct { - backingFileNum base.DiskFileNum - largestSeqNum uint64 - smallestSeqNum uint64 + backingFileNum base.DiskFileNum + largestSeqNum uint64 + smallestSeqNum uint64 + syntheticPrefix []byte } func (v *tableCacheValue) load(loadInfo loadInfo, c *tableCacheShard, dbOpts *tableCacheOpts) { @@ -1125,7 +1131,7 @@ func (v *tableCacheValue) load(loadInfo loadInfo, c *tableCacheShard, dbOpts *ta ) if err == nil { cacheOpts := private.SSTableCacheOpts(dbOpts.cacheID, loadInfo.backingFileNum).(sstable.ReaderOption) - v.reader, err = sstable.NewReader(f, dbOpts.opts, cacheOpts, dbOpts.filterMetrics) + v.reader, err = sstable.NewReader(f, dbOpts.opts, cacheOpts, dbOpts.filterMetrics, sstable.WithSyntheticPrefix(loadInfo.syntheticPrefix)) } if err != nil { v.err = errors.Wrapf( diff --git 
a/testdata/event_listener b/testdata/event_listener index ad32671e53..b9bf39d4dd 100644 --- a/testdata/event_listener +++ b/testdata/event_listener @@ -222,7 +222,7 @@ Zombie tables: 0 (0B) Backing tables: 0 (0B) Virtual tables: 0 (0B) Block cache: 6 entries (1.1KB) hit rate: 11.1% -Table cache: 1 entries (808B) hit rate: 40.0% +Table cache: 1 entries (832B) hit rate: 40.0% Secondary cache: 0 entries (0B) hit rate: 0.0% Snapshots: 0 earliest seq num: 0 Table iters: 0 @@ -321,7 +321,7 @@ Zombie tables: 0 (0B) Backing tables: 0 (0B) Virtual tables: 0 (0B) Block cache: 12 entries (2.3KB) hit rate: 14.3% -Table cache: 1 entries (808B) hit rate: 50.0% +Table cache: 1 entries (832B) hit rate: 50.0% Secondary cache: 0 entries (0B) hit rate: 0.0% Snapshots: 0 earliest seq num: 0 Table iters: 0 diff --git a/testdata/ingest b/testdata/ingest index 22a7f14bd1..d917ee8538 100644 --- a/testdata/ingest +++ b/testdata/ingest @@ -52,7 +52,7 @@ Zombie tables: 0 (0B) Backing tables: 0 (0B) Virtual tables: 0 (0B) Block cache: 6 entries (1.2KB) hit rate: 35.7% -Table cache: 1 entries (808B) hit rate: 50.0% +Table cache: 1 entries (832B) hit rate: 50.0% Secondary cache: 0 entries (0B) hit rate: 0.0% Snapshots: 0 earliest seq num: 0 Table iters: 0 diff --git a/testdata/metrics b/testdata/metrics index 55a1beacc1..474f85a3e7 100644 --- a/testdata/metrics +++ b/testdata/metrics @@ -68,7 +68,7 @@ Zombie tables: 0 (0B) Backing tables: 0 (0B) Virtual tables: 0 (0B) Block cache: 3 entries (556B) hit rate: 0.0% -Table cache: 1 entries (808B) hit rate: 0.0% +Table cache: 1 entries (832B) hit rate: 0.0% Secondary cache: 0 entries (0B) hit rate: 0.0% Snapshots: 0 earliest seq num: 0 Table iters: 1 @@ -201,7 +201,7 @@ Zombie tables: 1 (661B) Backing tables: 0 (0B) Virtual tables: 0 (0B) Block cache: 3 entries (556B) hit rate: 42.9% -Table cache: 1 entries (808B) hit rate: 66.7% +Table cache: 1 entries (832B) hit rate: 66.7% Secondary cache: 0 entries (0B) hit rate: 0.0% Snapshots: 0 earliest seq 
num: 0 Table iters: 1 @@ -467,7 +467,7 @@ Zombie tables: 0 (0B) Backing tables: 0 (0B) Virtual tables: 0 (0B) Block cache: 12 entries (2.4KB) hit rate: 24.5% -Table cache: 1 entries (808B) hit rate: 60.0% +Table cache: 1 entries (832B) hit rate: 60.0% Secondary cache: 0 entries (0B) hit rate: 0.0% Snapshots: 0 earliest seq num: 0 Table iters: 0 @@ -528,7 +528,7 @@ Zombie tables: 0 (0B) Backing tables: 0 (0B) Virtual tables: 0 (0B) Block cache: 12 entries (2.4KB) hit rate: 24.5% -Table cache: 1 entries (808B) hit rate: 60.0% +Table cache: 1 entries (832B) hit rate: 60.0% Secondary cache: 0 entries (0B) hit rate: 0.0% Snapshots: 0 earliest seq num: 0 Table iters: 0