From 29f85dac9eb0355f854b83789231d272f040d218 Mon Sep 17 00:00:00 2001 From: Michael Butler Date: Mon, 22 Jan 2024 13:32:52 -0600 Subject: [PATCH 1/4] sstable: add block level suffix replacement support MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This patch teaches the block iterator to replace the suffix of all surfaced keys with a replacement suffix passed during block initiation. In other words, the iterator will read the block as if all underlying keys actually had the replacement suffix. For correctness, this patch relies on two invariants: for all keys in the in the sst: 1. no two keys in the sst share the same prefix 2. pebble.Compare(replacementSuffix,originalSuffix) > 0 During the normal readpath, i.e. for blocks without suffix rewriting, this patch does not seem to slow things down: ``` ❯ benchstat pre.txt post.txt goos: linux goarch: amd64 pkg: github.com/cockroachdb/pebble/sstable cpu: Intel(R) Xeon(R) CPU @ 2.80GHz │ pre.txt │ post.txt │ │ sec/op │ sec/op vs base │ BlockIterSeekGE/restart=16-24 351.5n ± 0% 348.8n ± 0% -0.75% (p=0.000 n=10) BlockIterSeekLT/restart=16-24 397.9n ± 0% 408.5n ± 0% +2.68% (p=0.000 n=10) BlockIterNext/restart=16-24 15.92n ± 0% 15.24n ± 0% -4.30% (p=0.000 n=10) BlockIterPrev/restart=16-24 30.51n ± 0% 30.40n ± 0% -0.33% (p=0.001 n=10) geomean 90.78n 90.14n -0.71% ``` --- sstable/block.go | 170 +++++++++++++++++++- sstable/block_property_test.go | 7 +- sstable/block_test.go | 137 +++++++++++++++- sstable/layout.go | 4 +- sstable/reader.go | 29 ++-- sstable/reader_iter_single_lvl.go | 15 +- sstable/reader_iter_two_lvl.go | 4 +- sstable/reader_test.go | 249 +++++++++++++++++++++++++++++- sstable/reader_virtual.go | 1 + sstable/suffix_rewriter.go | 2 +- 10 files changed, 578 insertions(+), 40 deletions(-) diff --git a/sstable/block.go b/sstable/block.go index c0abd4c2c8..cfe782d93f 100644 --- a/sstable/block.go +++ b/sstable/block.go @@ -341,7 +341,8 @@ type blockEntry struct { // // We 
have picked the first option here. type blockIter struct { - cmp Compare + cmp Compare + split Split // offset is the byte index that marks where the current key/value is // encoded in the block. offset int32 @@ -410,14 +411,18 @@ type blockIter struct { hasValuePrefix bool } hideObsoletePoints bool + syntheticSuffix SyntheticSuffix + synthSuffixBuf []byte } // blockIter implements the base.InternalIterator interface. var _ base.InternalIterator = (*blockIter)(nil) -func newBlockIter(cmp Compare, block block) (*blockIter, error) { +func newBlockIter( + cmp Compare, split Split, block block, syntheticSuffix SyntheticSuffix, +) (*blockIter, error) { i := &blockIter{} - return i, i.init(cmp, block, 0, false) + return i, i.init(cmp, split, block, 0, false, syntheticSuffix) } func (i *blockIter) String() string { @@ -425,12 +430,22 @@ func (i *blockIter) String() string { } func (i *blockIter) init( - cmp Compare, block block, globalSeqNum uint64, hideObsoletePoints bool, + cmp Compare, + split Split, + block block, + globalSeqNum uint64, + hideObsoletePoints bool, + syntheticSuffix SyntheticSuffix, ) error { numRestarts := int32(binary.LittleEndian.Uint32(block[len(block)-4:])) if numRestarts == 0 { return base.CorruptionErrorf("pebble/table: invalid table (block has no restart points)") } + i.syntheticSuffix = syntheticSuffix + if i.syntheticSuffix != nil { + i.synthSuffixBuf = []byte{} + } + i.split = split i.cmp = cmp i.restarts = int32(len(block)) - 4*(1+numRestarts) i.numRestarts = numRestarts @@ -457,11 +472,16 @@ func (i *blockIter) init( // ingested. // - Foreign sstable iteration: globalSeqNum is always set. 
func (i *blockIter) initHandle( - cmp Compare, block bufferHandle, globalSeqNum uint64, hideObsoletePoints bool, + cmp Compare, + split Split, + block bufferHandle, + globalSeqNum uint64, + hideObsoletePoints bool, + syntheticSuffix SyntheticSuffix, ) error { i.handle.Release() i.handle = block - return i.init(cmp, block.Get(), globalSeqNum, hideObsoletePoints) + return i.init(cmp, split, block.Get(), globalSeqNum, hideObsoletePoints, syntheticSuffix) } func (i *blockIter) invalidate() { @@ -660,6 +680,25 @@ func (i *blockIter) decodeInternalKey(key []byte) (hiddenPoint bool) { return hiddenPoint } +// maybeReplaceSuffix replaces the suffix in i.ikey.UserKey with +// i.syntheticSuffix. allowInPlace is set to false if there's a chance that +// i.ikey.UserKey points to the same buffer as i.cachedBuf (i.e. during reverse +// iteration). +func (i *blockIter) maybeReplaceSuffix(allowInPlace bool) { + if i.syntheticSuffix != nil && i.ikey.UserKey != nil { + prefixLen := i.split(i.ikey.UserKey) + if allowInPlace && cap(i.ikey.UserKey) >= prefixLen+len(i.syntheticSuffix) { + i.ikey.UserKey = append(i.ikey.UserKey[:prefixLen], i.syntheticSuffix...) + return + } + // If ikey is cached or may get cached, we must copy + // UserKey to a new buffer before prefix replacement. + i.synthSuffixBuf = append(i.synthSuffixBuf[:0], i.ikey.UserKey[:prefixLen]...) + i.synthSuffixBuf = append(i.synthSuffixBuf, i.syntheticSuffix...) + i.ikey.UserKey = i.synthSuffixBuf + } +} + func (i *blockIter) clearCache() { i.cached = i.cached[:0] i.cachedBuf = i.cachedBuf[:0] @@ -789,6 +828,37 @@ func (i *blockIter) SeekGE(key []byte, flags base.SeekGEFlags) (*InternalKey, ba if !i.valid() { return nil, base.LazyValue{} } + + // A note on seeking in a block with a suffix replacement rule: even though + // the binary search above was conducted on keys without suffix replacement, + // Seek will still return the correct suffix replaced key. 
A binary + // search without suffix replacement will land on a key that is _less_ than + // the key the search would have landed on if all keys were already suffix + // replaced. Since Seek then conducts forward iteration to the first suffix + // replaced user key that is greater than or equal to the search key, the + // correct key is still returned. + // + // As an example, consider the following block with a restart interval of 1, + // with a replacement suffix of "4": + // - Pre-suffix replacement: apple@1, banana@3 + // - Post-suffix replacement: apple@4, banana@4 + // + // Suppose the client seeks with apple@3. Assuming suffixes sort in reverse + // chronological order (i.e. apple@1>apple@3), the binary search without + // suffix replacement would return apple@1. A binary search with suffix + // replacement would return banana@4. After beginning forward iteration from + // either returned restart point, forward iteration would + // always return the correct key, banana@4. + // + // Further, if the user searched with apple@0 (i.e. a suffix less than the + // pre replacement suffix) or with apple@5 (a suffix larger than the post + // replacement suffix), the binary search with or without suffix replacement + // would land on the same key, as we assume the following: + // (1) no two keys in the sst share the same prefix. + // (2) pebble.Compare(replacementSuffix,originalSuffix) > 0 + + i.maybeReplaceSuffix(true /*allowInPlace*/) + if !hiddenPoint && i.cmp(i.ikey.UserKey, key) >= 0 { // Initialize i.lazyValue if !i.lazyValueHandling.hasValuePrefix || @@ -920,6 +990,16 @@ func (i *blockIter) SeekLT(key []byte, flags base.SeekLTFlags) (*InternalKey, ba targetOffset = decodeRestart(i.data[i.restarts+4*(index):]) } } else if index == 0 { + if i.syntheticSuffix != nil { + // The binary search was conducted on keys without suffix replacement, + // implying the first key in the block may be less than the search key. 
To + // double check, get the first key in the block with suffix replacement + // and compare to the search key. + ikey, lazyVal := i.First() + if i.cmp(ikey.UserKey, key) < 0 { + return ikey, lazyVal + } + } // If index == 0 then all keys in this block are larger than the key // sought. i.offset = -1 @@ -946,6 +1026,7 @@ func (i *blockIter) SeekLT(key []byte, flags base.SeekLTFlags) (*InternalKey, ba // of hidden keys we will be able to skip whole blocks (using block // property filters) so we don't bother optimizing. hiddenPoint := i.decodeInternalKey(i.key) + i.maybeReplaceSuffix(false /*allowInPlace*/) // NB: we don't use the hiddenPoint return value of decodeInternalKey // since we want to stop as soon as we reach a key >= ikey.UserKey, so @@ -970,9 +1051,42 @@ func (i *blockIter) SeekLT(key []byte, flags base.SeekLTFlags) (*InternalKey, ba if hiddenPoint { return i.Prev() } + if i.syntheticSuffix != nil { + // The binary search was conducted on keys without suffix replacement, + // implying the returned restart point may be less than the search key. + // + // For example: consider this block with a replacement ts of 4, and + // restart interval of 1: + // - pre replacement: a3,b2,c3 + // - post replacement: a4,b4,c4 + // + // Suppose the client calls SeekLT(b3), SeekLT must return b4. + // + // If the client calls SeekLT(b3), the binary search would return b2, + // the lowest key geq to b3, pre-suffix replacement. Then, SeekLT will + // begin forward iteration from a3, the previous restart point, to + // b{suffix}. The iteration stops when it encounters a key geq to the + // search key or if it reaches the upper bound. Without suffix replacement, we can assume + // that the upper bound of this forward iteration, b{suffix}, is greater + // than the search key, as implied by the binary search. 
+ // + // If we naively hold this assumption with suffix replacement, the + // iteration would terminate at the upper bound, b4, call i.Prev, and + // incorrectly return a4. Instead, we must continue forward iteration + // past the upper bound, until we find a key geq the search key. With + // this correction, SeekLT would correctly return b4 in this example. + for i.cmp(i.ikey.UserKey, key) < 0 { + i.Next() + if !i.valid() { + break + } + } + // The current key is greater than or equal to our search key. Back up to + // the previous key which was less than our search key. + return i.Prev() + } break } - i.cacheEntry() } @@ -1007,6 +1121,7 @@ func (i *blockIter) First() (*InternalKey, base.LazyValue) { if hiddenPoint { return i.Next() } + i.maybeReplaceSuffix(true /*allowInPlace*/) if !i.lazyValueHandling.hasValuePrefix || base.TrailerKind(i.ikey.Trailer) != InternalKeyKindSet { i.lazyValue = base.MakeInPlaceValue(i.val) @@ -1049,6 +1164,7 @@ func (i *blockIter) Last() (*InternalKey, base.LazyValue) { if hiddenPoint { return i.Prev() } + i.maybeReplaceSuffix(false /*allowInPlace*/) if !i.lazyValueHandling.hasValuePrefix || base.TrailerKind(i.ikey.Trailer) != InternalKeyKindSet { i.lazyValue = base.MakeInPlaceValue(i.val) @@ -1097,6 +1213,17 @@ start: if hiddenPoint { goto start } + if i.syntheticSuffix != nil { + // Inlined version of i.maybeReplaceSuffix(true /* allowInPlace */) + prefixLen := i.split(i.ikey.UserKey) + if cap(i.ikey.UserKey) >= prefixLen+len(i.syntheticSuffix) { + i.ikey.UserKey = append(i.ikey.UserKey[:prefixLen], i.syntheticSuffix...) + } else { + i.synthSuffixBuf = append(i.synthSuffixBuf[:0], i.ikey.UserKey[:prefixLen]...) + i.synthSuffixBuf = append(i.synthSuffixBuf, i.syntheticSuffix...) 
+ i.ikey.UserKey = i.synthSuffixBuf + } + } } else { i.ikey.Trailer = uint64(InternalKeyKindInvalid) i.ikey.UserKey = nil @@ -1364,6 +1491,17 @@ func (i *blockIter) nextPrefixV3(succKey []byte) (*InternalKey, base.LazyValue) if i.globalSeqNum != 0 { i.ikey.SetSeqNum(i.globalSeqNum) } + if i.syntheticSuffix != nil { + // Inlined version of i.maybeReplaceSuffix(true /* allowInPlace */) + prefixLen := i.split(i.ikey.UserKey) + if cap(i.ikey.UserKey) >= prefixLen+len(i.syntheticSuffix) { + i.ikey.UserKey = append(i.ikey.UserKey[:prefixLen], i.syntheticSuffix...) + } else { + i.synthSuffixBuf = append(i.synthSuffixBuf[:0], i.ikey.UserKey[:prefixLen]...) + i.synthSuffixBuf = append(i.synthSuffixBuf, i.syntheticSuffix...) + i.ikey.UserKey = i.synthSuffixBuf + } + } } else { i.ikey.Trailer = uint64(InternalKeyKindInvalid) i.ikey.UserKey = nil @@ -1422,6 +1560,15 @@ start: if i.globalSeqNum != 0 { i.ikey.SetSeqNum(i.globalSeqNum) } + if i.syntheticSuffix != nil { + // Inlined version of i.maybeReplaceSuffix(false /* allowInPlace */) + prefixLen := i.split(i.ikey.UserKey) + // If ikey is cached or may get cached, we must de-reference + // UserKey before prefix replacement. + i.synthSuffixBuf = append(i.synthSuffixBuf[:0], i.ikey.UserKey[:prefixLen]...) + i.synthSuffixBuf = append(i.synthSuffixBuf, i.syntheticSuffix...) + i.ikey.UserKey = i.synthSuffixBuf + } } else { i.ikey.Trailer = uint64(InternalKeyKindInvalid) i.ikey.UserKey = nil @@ -1499,6 +1646,15 @@ start: // Use the cache. goto start } + if i.syntheticSuffix != nil { + // Inlined version of i.maybeReplaceSuffix(false /* allowInPlace */) + prefixLen := i.split(i.ikey.UserKey) + // If ikey is cached or may get cached, we must de-reference + // UserKey before prefix replacement. + i.synthSuffixBuf = append(i.synthSuffixBuf[:0], i.ikey.UserKey[:prefixLen]...) + i.synthSuffixBuf = append(i.synthSuffixBuf, i.syntheticSuffix...) 
+ i.ikey.UserKey = i.synthSuffixBuf + } if !i.lazyValueHandling.hasValuePrefix || base.TrailerKind(i.ikey.Trailer) != InternalKeyKindSet { i.lazyValue = base.MakeInPlaceValue(i.val) diff --git a/sstable/block_property_test.go b/sstable/block_property_test.go index 0f30205403..b5b52584a6 100644 --- a/sstable/block_property_test.go +++ b/sstable/block_property_test.go @@ -938,7 +938,7 @@ func TestBlockProperties(t *testing.T) { var blocks []int var i int - iter, _ := newBlockIter(r.Compare, indexH.Get()) + iter, _ := newBlockIter(r.Compare, r.Split, indexH.Get(), nil /* syntheticSuffix */) for key, value := iter.First(); key != nil; key, value = iter.Next() { bh, err := decodeBlockHandleWithProperties(value.InPlaceValue()) if err != nil { @@ -1274,7 +1274,7 @@ func runBlockPropsCmd(r *Reader, td *datadriven.TestData) string { return err.Error() } twoLevelIndex := r.Properties.IndexPartitions > 0 - i, err := newBlockIter(r.Compare, bh.Get()) + i, err := newBlockIter(r.Compare, r.Split, bh.Get(), nil /* syntheticSuffix */) if err != nil { return err.Error() } @@ -1321,8 +1321,7 @@ func runBlockPropsCmd(r *Reader, td *datadriven.TestData) string { if err != nil { return err.Error() } - if err := subiter.init( - r.Compare, subIndex.Get(), 0 /* globalSeqNum */, false); err != nil { + if err := subiter.init(r.Compare, r.Split, subIndex.Get(), 0, false, nil); err != nil { return err.Error() } for key, value := subiter.First(); key != nil; key, value = subiter.Next() { diff --git a/sstable/block_test.go b/sstable/block_test.go index 14e6f7ff8a..3b2e06f78a 100644 --- a/sstable/block_test.go +++ b/sstable/block_test.go @@ -16,6 +16,7 @@ import ( "github.com/cockroachdb/datadriven" "github.com/cockroachdb/pebble/internal/base" "github.com/cockroachdb/pebble/internal/itertest" + "github.com/cockroachdb/pebble/internal/testkeys" "github.com/stretchr/testify/require" "golang.org/x/exp/rand" ) @@ -245,7 +246,7 @@ func TestBlockIter2(t *testing.T) { return "" case "iter": - iter, 
err := newBlockIter(bytes.Compare, block) + iter, err := newBlockIter(bytes.Compare, nil, block, nil /* syntheticSuffix */) if err != nil { return err.Error() } @@ -276,7 +277,7 @@ func TestBlockIterKeyStability(t *testing.T) { } block := w.finish() - i, err := newBlockIter(bytes.Compare, block) + i, err := newBlockIter(bytes.Compare, nil, block, nil /* syntheticSuffix */) require.NoError(t, err) // Check that the supplied slice resides within the bounds of the block. @@ -336,7 +337,7 @@ func TestBlockIterReverseDirections(t *testing.T) { for targetPos := 0; targetPos < w.restartInterval; targetPos++ { t.Run("", func(t *testing.T) { - i, err := newBlockIter(bytes.Compare, block) + i, err := newBlockIter(bytes.Compare, nil, block, nil /* syntheticSuffix */) require.NoError(t, err) pos := 3 @@ -357,6 +358,128 @@ func TestBlockIterReverseDirections(t *testing.T) { } } +// checker is a test helper that verifies that two different iterators running +// the same sequence of operations return the same result. To use correctly, pass +// the iter call directly as an arg to check(), i.e.: +// +// c.check(expect.SeekGE([]byte("apricot@2"), base.SeekGEFlagsNone))(got.SeekGE([]byte("apricot@2"), base.SeekGEFlagsNone)) +// c.check(expect.Next())(got.Next()) +// +// NB: the signature to check is not simply `check(eKey,eVal,gKey,gVal)` because +// `check(expect.Next(),got.Next())` does not compile. 
+type checker struct { + t *testing.T + notValid bool +} + +func (c *checker) check( + eKey *base.InternalKey, eVal base.LazyValue, +) func(gKey *base.InternalKey, gVal base.LazyValue) { + return func(gKey *base.InternalKey, gVal base.LazyValue) { + c.t.Helper() + if eKey != nil { + require.NotNil(c.t, gKey, "expected %q", eKey.UserKey) + c.t.Logf("expected %q, got %q", eKey.UserKey, gKey.UserKey) + require.Equal(c.t, eKey, gKey) + require.Equal(c.t, eVal, gVal) + } else { + c.t.Logf("expected nil, got %q", gKey) + require.Nil(c.t, gKey) + c.notValid = true + } + } +} + +func TestBlockSyntheticSuffix(t *testing.T) { + // TODO(msbutler): add test where replacement suffix has fewer bytes than previous suffix + expectedSuffix := "15" + synthSuffix := []byte("@" + expectedSuffix) + + // Use testkeys.Comparer.Compare which approximates EngineCompare by ordering + // multiple keys with same prefix in descending suffix order. + cmp := testkeys.Comparer.Compare + split := testkeys.Comparer.Split + + for _, restarts := range []int{1, 2, 3, 4, 10} { + t.Run(fmt.Sprintf("restarts=%d", restarts), func(t *testing.T) { + + suffixWriter, expectedSuffixWriter := &blockWriter{restartInterval: restarts}, &blockWriter{restartInterval: restarts} + keys := []string{ + "apple@2", "apricot@2", "banana@13", + "grape@2", "orange@14", "peach@4", + "pear@1", "persimmon@4", + } + for _, key := range keys { + suffixWriter.add(ikey(key), nil) + replacedKey := strings.Split(key, "@")[0] + "@" + expectedSuffix + expectedSuffixWriter.add(ikey(replacedKey), nil) + } + + suffixReplacedBlock := suffixWriter.finish() + expectedBlock := expectedSuffixWriter.finish() + + expect, err := newBlockIter(cmp, split, expectedBlock, nil /* syntheticSuffix */) + require.NoError(t, err) + + got, err := newBlockIter(cmp, split, suffixReplacedBlock, synthSuffix) + require.NoError(t, err) + + c := checker{t: t} + + c.check(expect.First())(got.First()) + c.check(expect.Next())(got.Next()) + 
c.check(expect.Prev())(got.Prev()) + + // Try searching with a key that matches the target key before replacement + c.check(expect.SeekGE([]byte("apricot@2"), base.SeekGEFlagsNone))(got.SeekGE([]byte("apricot@2"), base.SeekGEFlagsNone)) + c.check(expect.Next())(got.Next()) + + // Try searching with a key that matches the target key after replacement + c.check(expect.SeekGE([]byte("orange@15"), base.SeekGEFlagsNone))(got.SeekGE([]byte("orange@15"), base.SeekGEFlagsNone)) + c.check(expect.Next())(got.Next()) + c.check(expect.Next())(got.Next()) + + // Try searching with a key that results in off by one handling + c.check(expect.First())(got.First()) + c.check(expect.SeekGE([]byte("grape@10"), base.SeekGEFlagsNone))(got.SeekGE([]byte("grape@10"), base.SeekGEFlagsNone)) + c.check(expect.Next())(got.Next()) + c.check(expect.Next())(got.Next()) + + // Try searching with a key with suffix greater than the replacement + c.check(expect.SeekGE([]byte("orange@16"), base.SeekGEFlagsNone))(got.SeekGE([]byte("orange@16"), base.SeekGEFlagsNone)) + c.check(expect.Next())(got.Next()) + + // Exhaust the iterator via searching + c.check(expect.SeekGE([]byte("persimmon@17"), base.SeekGEFlagsNone))(got.SeekGE([]byte("persimmon@17"), base.SeekGEFlagsNone)) + + // Ensure off by one handling works at end of iterator + c.check(expect.SeekGE([]byte("persimmon@10"), base.SeekGEFlagsNone))(got.SeekGE([]byte("persimmon@10"), base.SeekGEFlagsNone)) + + // Try reverse search with a key that matches the target key before replacement + c.check(expect.SeekLT([]byte("banana@13"), base.SeekLTFlagsNone))(got.SeekLT([]byte("banana@13"), base.SeekLTFlagsNone)) + c.check(expect.Prev())(got.Prev()) + c.check(expect.Next())(got.Next()) + + // Try reverse searching with a key that matches the target key after replacement + c.check(expect.Last())(got.Last()) + c.check(expect.SeekLT([]byte("apricot@15"), base.SeekLTFlagsNone))(got.SeekLT([]byte("apricot@15"), base.SeekLTFlagsNone)) + + // Try reverse 
searching with a key with suffix in between original and target replacement + c.check(expect.Last())(got.Last()) + c.check(expect.SeekLT([]byte("peach@10"), base.SeekLTFlagsNone))(got.SeekLT([]byte("peach@10"), base.SeekLTFlagsNone)) + c.check(expect.Prev())(got.Prev()) + c.check(expect.Next())(got.Next()) + + // Exhaust the iterator via reverse searching + c.check(expect.SeekLT([]byte("apple@17"), base.SeekLTFlagsNone))(got.SeekLT([]byte("apple@17"), base.SeekLTFlagsNone)) + + // Ensure off by one handling works at end of iterator + c.check(expect.Last())(got.Last()) + c.check(expect.SeekLT([]byte("apple@10"), base.SeekLTFlagsNone))(got.SeekLT([]byte("apple@10"), base.SeekLTFlagsNone)) + }) + } +} + func BenchmarkBlockIterSeekGE(b *testing.B) { const blockSize = 32 << 10 @@ -376,7 +499,7 @@ func BenchmarkBlockIterSeekGE(b *testing.B) { w.add(ikey, nil) } - it, err := newBlockIter(bytes.Compare, w.finish()) + it, err := newBlockIter(bytes.Compare, nil, w.finish(), nil /* syntheticSuffix */) if err != nil { b.Fatal(err) } @@ -418,7 +541,7 @@ func BenchmarkBlockIterSeekLT(b *testing.B) { w.add(ikey, nil) } - it, err := newBlockIter(bytes.Compare, w.finish()) + it, err := newBlockIter(bytes.Compare, nil, w.finish(), nil /* syntheticSuffix */) if err != nil { b.Fatal(err) } @@ -464,7 +587,7 @@ func BenchmarkBlockIterNext(b *testing.B) { w.add(ikey, nil) } - it, err := newBlockIter(bytes.Compare, w.finish()) + it, err := newBlockIter(bytes.Compare, nil, w.finish(), nil /* syntheticSuffix */) if err != nil { b.Fatal(err) } @@ -496,7 +619,7 @@ func BenchmarkBlockIterPrev(b *testing.B) { w.add(ikey, nil) } - it, err := newBlockIter(bytes.Compare, w.finish()) + it, err := newBlockIter(bytes.Compare, nil, w.finish(), nil /* syntheticSuffix */) if err != nil { b.Fatal(err) } diff --git a/sstable/layout.go b/sstable/layout.go index 8c375741de..7c7a74dd66 100644 --- a/sstable/layout.go +++ b/sstable/layout.go @@ -186,7 +186,7 @@ func (l *Layout) Describe( var lastKey 
InternalKey switch b.name { case "data", "range-del", "range-key": - iter, _ := newBlockIter(r.Compare, h.Get()) + iter, _ := newBlockIter(r.Compare, r.Split, h.Get(), nil /* syntheticSuffix */) for key, value := iter.First(); key != nil; key, value = iter.Next() { ptr := unsafe.Pointer(uintptr(iter.ptr) + uintptr(iter.offset)) shared, ptr := decodeVarint(ptr) @@ -238,7 +238,7 @@ func (l *Layout) Describe( formatRestarts(iter.data, iter.restarts, iter.numRestarts) formatTrailer() case "index", "top-index": - iter, _ := newBlockIter(r.Compare, h.Get()) + iter, _ := newBlockIter(r.Compare, r.Split, h.Get(), nil /* syntheticSuffix */) for key, value := iter.First(); key != nil; key, value = iter.Next() { bh, err := decodeBlockHandleWithProperties(value.InPlaceValue()) if err != nil { diff --git a/sstable/reader.go b/sstable/reader.go index ce8f1dac76..3fcf3259a8 100644 --- a/sstable/reader.go +++ b/sstable/reader.go @@ -157,6 +157,13 @@ func (c *cacheOpts) writerApply(w *Writer) { } } +// SyntheticSuffix will replace every suffix of every key surfaced during block +// iteration. The client should only initiate a reader with SuffixReplacement if: +// +// (1) no two keys in the sst share the same prefix +// (2) pebble.Compare(replacementSuffix,originalSuffix) > 0, for all keys +type SyntheticSuffix []byte + // rawTombstonesOpt is a Reader open option for specifying that range // tombstones returned by Reader.NewRangeDelIter() should not be // fragmented. Used by debug tools to get a raw view of the tombstones @@ -429,7 +436,7 @@ func (r *Reader) NewRawRangeDelIter() (keyspan.FragmentIterator, error) { // sstables. This is because rangedels do not apply to points in the same // sstable at the same sequence number anyway, so exposing obsolete rangedels // is harmless. 
- if err := i.blockIter.initHandle(r.Compare, h, r.Properties.GlobalSeqNum, false); err != nil { + if err := i.blockIter.initHandle(r.Compare, r.Split, h, r.Properties.GlobalSeqNum, false, nil); err != nil { return nil, err } return i, nil @@ -451,7 +458,7 @@ func (r *Reader) newRawRangeKeyIter(vState *virtualState) (keyspan.FragmentItera if vState == nil || !vState.isSharedIngested { globalSeqNum = r.Properties.GlobalSeqNum } - if err := i.blockIter.initHandle(r.Compare, h, globalSeqNum, false /* hideObsoletePoints */); err != nil { + if err := i.blockIter.initHandle(r.Compare, r.Split, h, globalSeqNum, false, nil); err != nil { return nil, err } return i, nil @@ -706,7 +713,7 @@ func (r *Reader) transformRangeDelV1(b []byte) ([]byte, error) { // tombstones. We need properly fragmented and sorted range tombstones in // order to serve from them directly. iter := &blockIter{} - if err := iter.init(r.Compare, b, r.Properties.GlobalSeqNum, false); err != nil { + if err := iter.init(r.Compare, r.Split, b, r.Properties.GlobalSeqNum, false, nil); err != nil { return nil, err } var tombstones []keyspan.Span @@ -886,7 +893,7 @@ func (r *Reader) Layout() (*Layout, error) { if r.Properties.IndexPartitions == 0 { l.Index = append(l.Index, r.indexBH) - iter, _ := newBlockIter(r.Compare, indexH.Get()) + iter, _ := newBlockIter(r.Compare, r.Split, indexH.Get(), nil /* syntheticSuffix */) for key, value := iter.First(); key != nil; key, value = iter.Next() { dataBH, err := decodeBlockHandleWithProperties(value.InPlaceValue()) if err != nil { @@ -899,7 +906,7 @@ func (r *Reader) Layout() (*Layout, error) { } } else { l.TopIndex = r.indexBH - topIter, _ := newBlockIter(r.Compare, indexH.Get()) + topIter, _ := newBlockIter(r.Compare, r.Split, indexH.Get(), nil /* syntheticSuffix */) iter := &blockIter{} for key, value := topIter.First(); key != nil; key, value = topIter.Next() { indexBH, err := decodeBlockHandleWithProperties(value.InPlaceValue()) @@ -913,8 +920,8 @@ func (r 
*Reader) Layout() (*Layout, error) { if err != nil { return nil, err } - if err := iter.init(r.Compare, subIndex.Get(), 0, /* globalSeqNum */ - false /* hideObsoletePoints */); err != nil { + // TODO(msbutler): figure out how to pass virtualState to layout call. + if err := iter.init(r.Compare, r.Split, subIndex.Get(), 0, false, nil); err != nil { return nil, err } for key, value := iter.First(); key != nil; key, value = iter.Next() { @@ -1049,14 +1056,14 @@ func (r *Reader) EstimateDiskUsage(start, end []byte) (uint64, error) { // to the same blockIter over the single index in the unpartitioned case. var startIdxIter, endIdxIter *blockIter if r.Properties.IndexPartitions == 0 { - iter, err := newBlockIter(r.Compare, indexH.Get()) + iter, err := newBlockIter(r.Compare, r.Split, indexH.Get(), nil /* syntheticSuffix */) if err != nil { return 0, err } startIdxIter = iter endIdxIter = iter } else { - topIter, err := newBlockIter(r.Compare, indexH.Get()) + topIter, err := newBlockIter(r.Compare, r.Split, indexH.Get(), nil /* syntheticSuffix */) if err != nil { return 0, err } @@ -1076,7 +1083,7 @@ func (r *Reader) EstimateDiskUsage(start, end []byte) (uint64, error) { return 0, err } defer startIdxBlock.Release() - startIdxIter, err = newBlockIter(r.Compare, startIdxBlock.Get()) + startIdxIter, err = newBlockIter(r.Compare, r.Split, startIdxBlock.Get(), nil /* syntheticSuffix */) if err != nil { return 0, err } @@ -1097,7 +1104,7 @@ func (r *Reader) EstimateDiskUsage(start, end []byte) (uint64, error) { return 0, err } defer endIdxBlock.Release() - endIdxIter, err = newBlockIter(r.Compare, endIdxBlock.Get()) + endIdxIter, err = newBlockIter(r.Compare, r.Split, endIdxBlock.Get(), nil /* syntheticSuffix */) if err != nil { return 0, err } diff --git a/sstable/reader_iter_single_lvl.go b/sstable/reader_iter_single_lvl.go index c950a59447..628d1157d8 100644 --- a/sstable/reader_iter_single_lvl.go +++ b/sstable/reader_iter_single_lvl.go @@ -218,7 +218,7 @@ func (i 
*singleLevelIterator) init( i.stats = stats i.hideObsoletePoints = hideObsoletePoints i.bufferPool = bufferPool - err = i.index.initHandle(i.cmp, indexH, r.Properties.GlobalSeqNum, false) + err = i.index.initHandle(i.cmp, r.Split, indexH, r.Properties.GlobalSeqNum, false, i.getSyntheticSuffx()) if err != nil { // blockIter.Close releases indexH and always returns a nil error _ = i.index.Close() @@ -250,19 +250,26 @@ func (i *singleLevelIterator) init( return nil } +func (i *singleLevelIterator) getSyntheticSuffx() SyntheticSuffix { + if i.vState != nil { + return i.vState.syntheticSuffix + } + return nil +} + // Helper function to check if keys returned from iterator are within global and virtual bounds. func (i *singleLevelIterator) maybeVerifyKey( iKey *InternalKey, val base.LazyValue, ) (*InternalKey, base.LazyValue) { // maybeVerify key is only used for virtual sstable iterators. - if invariants.Enabled && i.vState != nil && iKey != nil { + if invariants.Enabled && i.vState != nil && iKey != nil && i.upper != nil && i.lower != nil { key := iKey.UserKey uc, vuc := i.cmp(key, i.upper), i.cmp(key, i.vState.upper.UserKey) lc, vlc := i.cmp(key, i.lower), i.cmp(key, i.vState.lower.UserKey) if (i.vState.upper.IsExclusiveSentinel() && vuc == 0) || (!i.endKeyInclusive && uc == 0) || uc > 0 || vuc > 0 || lc < 0 || vlc < 0 { - panic(fmt.Sprintf("key: %s out of bounds of singleLevelIterator", key)) + panic(fmt.Sprintf("key: %s out of bounds of singleLevelIterator: i.upper %s, i.lower %s, vstate upper %s, vstate lower %s", key, i.upper, i.lower, i.vState.upper.UserKey, i.vState.lower.UserKey)) } } return iKey, val @@ -441,7 +448,7 @@ func (i *singleLevelIterator) loadBlock(dir int8) loadBlockResult { i.err = err return loadBlockFailed } - i.err = i.data.initHandle(i.cmp, block, i.reader.Properties.GlobalSeqNum, i.hideObsoletePoints) + i.err = i.data.initHandle(i.cmp, i.reader.Split, block, i.reader.Properties.GlobalSeqNum, i.hideObsoletePoints, i.getSyntheticSuffx()) if 
i.err != nil { // The block is partially loaded, and we don't want it to appear valid. i.data.invalidate() diff --git a/sstable/reader_iter_two_lvl.go b/sstable/reader_iter_two_lvl.go index 24ef0e3256..e486316295 100644 --- a/sstable/reader_iter_two_lvl.go +++ b/sstable/reader_iter_two_lvl.go @@ -68,7 +68,7 @@ func (i *twoLevelIterator) loadIndex(dir int8) loadBlockResult { i.err = err return loadBlockFailed } - if i.err = i.index.initHandle(i.cmp, indexBlock, i.reader.Properties.GlobalSeqNum, false); i.err == nil { + if i.err = i.index.initHandle(i.cmp, i.reader.Split, indexBlock, i.reader.Properties.GlobalSeqNum, false, i.getSyntheticSuffx()); i.err == nil { return loadBlockOK } return loadBlockFailed @@ -177,7 +177,7 @@ func (i *twoLevelIterator) init( i.stats = stats i.hideObsoletePoints = hideObsoletePoints i.bufferPool = bufferPool - err = i.topLevelIndex.initHandle(i.cmp, topLevelIndexH, r.Properties.GlobalSeqNum, false) + err = i.topLevelIndex.initHandle(i.cmp, i.reader.Split, topLevelIndexH, r.Properties.GlobalSeqNum, false, i.getSyntheticSuffx()) if err != nil { // blockIter.Close releases topLevelIndexH and always returns a nil error _ = i.topLevelIndex.Close() diff --git a/sstable/reader_test.go b/sstable/reader_test.go index c94083eca8..c4f024b270 100644 --- a/sstable/reader_test.go +++ b/sstable/reader_test.go @@ -14,6 +14,7 @@ import ( "os" "path" "path/filepath" + "strconv" "strings" "testing" "time" @@ -715,7 +716,7 @@ func indexLayoutString(t *testing.T, r *Reader) string { var buf strings.Builder twoLevelIndex := r.Properties.IndexType == twoLevelIndex buf.WriteString("index entries:\n") - iter, err := newBlockIter(r.Compare, indexH.Get()) + iter, err := newBlockIter(r.Compare, r.Split, indexH.Get(), nil /* syntheticSuffix */) defer func() { require.NoError(t, iter.Close()) }() @@ -729,7 +730,7 @@ func indexLayoutString(t *testing.T, r *Reader) string { context.Background(), bh.BlockHandle, nil, nil, nil, nil, nil) require.NoError(t, err) defer 
b.Release() - iter2, err := newBlockIter(r.Compare, b.Get()) + iter2, err := newBlockIter(r.Compare, r.Split, b.Get(), nil /* syntheticSuffix */) defer func() { require.NoError(t, iter2.Close()) }() @@ -1092,6 +1093,250 @@ func TestReadaheadSetupForV3TablesWithMultipleVersions(t *testing.T) { } } +type readCallType int + +const ( + SeekGE readCallType = iota + Next + Prev + SeekLT + First + Last +) + +// readCall represents an Iterator call. For seek calls, the seekKey must be +// set. For Next and Prev, the repeatCount instructs the iterator to repeat the +// call. +type readCall struct { + callType readCallType + seekKey []byte + repeatCount int +} + +// readerWorkload creates a random sequence of Iterator calls. If the iterator +// becomes invalid, the user can call handleInvalid to move the iterator so +// random part in the keyspace. The readerWorkload assumes the underlying file +// contains keys throughout the keyspace. +// +// TODO(msbutler): eventually merge this randomized reader helper with the utility +// in sstable/random_test.go. This will require teaching `buildRandomSSTable()` to +// build a "treatment" sst with randomized timestamp suffixes and a "control" sst +// with fixed timestamp suffixes. 
+type readerWorkload struct { + ks testkeys.Keyspace + t *testing.T + calls []readCall + maxTs int64 + seekKeyAfterInvalid []byte + rng *rand.Rand +} + +func (rw *readerWorkload) setSeekKeyAfterInvalid() { + idx := rw.rng.Int63n(rw.ks.Count()) + ts := rw.rng.Int63n(rw.maxTs) + rw.seekKeyAfterInvalid = testkeys.KeyAt(rw.ks, idx, ts) +} + +func (rw *readerWorkload) handleInvalid( + call readCall, iter Iterator, +) (*InternalKey, base.LazyValue) { + switch { + case SeekGE == call.callType || Next == call.callType || Last == call.callType: + return iter.SeekLT(rw.seekKeyAfterInvalid, base.SeekLTFlagsNone) + case SeekLT == call.callType || Prev == call.callType || First == call.callType: + return iter.SeekGE(rw.seekKeyAfterInvalid, base.SeekGEFlagsNone) + default: + rw.t.Fatalf("unkown call") + return nil, base.LazyValue{} + } +} + +func (rw *readerWorkload) read(call readCall, iter Iterator) (*InternalKey, base.LazyValue) { + switch call.callType { + case SeekGE: + return iter.SeekGE(call.seekKey, base.SeekGEFlagsNone) + case Next: + return rw.repeatRead(call, iter) + case SeekLT: + return iter.SeekLT(call.seekKey, base.SeekLTFlagsNone) + case Prev: + return rw.repeatRead(call, iter) + case First: + return iter.First() + case Last: + return iter.Last() + default: + rw.t.Fatalf("unkown call") + return nil, base.LazyValue{} + } +} + +func (rw *readerWorkload) repeatRead(call readCall, iter Iterator) (*InternalKey, base.LazyValue) { + var repeatCall func() (*InternalKey, base.LazyValue) + + switch call.callType { + case Next: + repeatCall = iter.Next + case Prev: + repeatCall = iter.Prev + default: + rw.t.Fatalf("unknown repeat read call") + } + for i := 0; i < call.repeatCount; i++ { + key, val := repeatCall() + if key != nil { + return key, val + } + } + return repeatCall() +} + +func createReadWorkload( + t *testing.T, rng *rand.Rand, callCount int, ks testkeys.Keyspace, maxTS int64, +) readerWorkload { + calls := make([]readCall, 0, callCount) + + for i := 0; i < 
callCount; i++ { + key := make([]byte, 0) + callType := readCallType(rng.Intn(int(Last + 1))) + repeatCount := 0 + if callType == First || callType == Last { + // Sqrt the likelihood of calling First and Last as they're not very interesting. + callType = readCallType(rng.Intn(int(Last + 1))) + } + if callType == SeekLT || callType == SeekGE { + idx := rng.Int63n(int64(ks.MaxLen())) + ts := rng.Int63n(maxTS) + 1 + key = testkeys.KeyAt(ks, idx, ts) + } + if callType == Next || callType == Prev { + repeatCount = rng.Intn(100) + } + calls = append(calls, readCall{callType: callType, seekKey: key, repeatCount: repeatCount}) + } + return readerWorkload{ + calls: calls, + t: t, + ks: ks, + maxTs: maxTS, + rng: rng, + } +} + +// TestRandomizedSuffixRewriter runs a random sequence of iterator calls +// on an sst with keys with a fixed timestamp and asserts that all calls +// return the same sequence of keys as another iterator initialized with +// a suffix replacement rule to that fixed timestamp which reads an sst +// with the same keys and randomized suffixes. In other words, this is a +// randomized version of TestBlockSyntheticSuffix. 
+func TestRandomizedSuffixRewriter(t *testing.T) { + + ks := testkeys.Alpha(3) + + callCount := 500 + maxTs := int64(10) + suffix := int64(12) + syntheticSuffix := []byte("@" + strconv.Itoa(int(suffix))) + + potentialBlockSize := []int{32, 64, 128, 256} + potentialRestartInterval := []int{1, 4, 8, 16} + + seed := uint64(time.Now().UnixNano()) + rng := rand.New(rand.NewSource(seed)) + mem := vfs.NewMem() + + blockSize := potentialBlockSize[rng.Intn(len(potentialBlockSize))] + restartInterval := potentialRestartInterval[rng.Intn(len(potentialRestartInterval))] + t.Logf("Configured Block Size %d, Restart Interval %d, Seed %d", blockSize, restartInterval, seed) + + createIter := func(fileName string, syntheticSuffix SyntheticSuffix) (Iterator, func()) { + f, err := mem.Open(fileName) + require.NoError(t, err) + eReader, err := newReader(f, ReaderOptions{Comparer: testkeys.Comparer}) + require.NoError(t, err) + iter, err := eReader.newIterWithBlockPropertyFiltersAndContext( + context.Background(), + nil, nil, nil, false, + true, nil, CategoryAndQoS{}, nil, + TrivialReaderProvider{Reader: eReader}, &virtualState{syntheticSuffix: syntheticSuffix}) + require.NoError(t, err) + return iter, func() { + require.NoError(t, iter.Close()) + require.NoError(t, eReader.Close()) + } + } + + for _, twoLevelIndex := range []bool{false, true} { + + testCaseName := "single-level" + if twoLevelIndex == true { + testCaseName = "two-level" + } + t.Run(testCaseName, func(t *testing.T) { + indexBlockSize := 4096 + if twoLevelIndex { + indexBlockSize = 1 + } + + createFile := func(randSuffix bool) string { + // initialize a new rng so that every createFile + // call generates the same sequence of random numbers. 
+ localRng := rand.New(rand.NewSource(seed)) + name := "randTS" + if !randSuffix { + name = "fixedTS" + } + name = name + testCaseName + f, err := mem.Create(name) + require.NoError(t, err) + + w := NewWriter(objstorageprovider.NewFileWritable(f), WriterOptions{ + BlockRestartInterval: restartInterval, + BlockSize: blockSize, + IndexBlockSize: indexBlockSize, + Comparer: testkeys.Comparer, + }) + + keyIdx := int64(0) + maxIdx := ks.Count() + for keyIdx < maxIdx { + // We need to call rng here even if we don't actually use the ts + // because the sequence of rng calls must be identical for the treatment + // and control sst. + ts := localRng.Int63n(maxTs) + if !randSuffix { + ts = suffix + } + key := testkeys.KeyAt(ks, keyIdx, ts) + require.NoError(t, w.Set(key, []byte(fmt.Sprint(keyIdx)))) + skipIdx := localRng.Int63n(5) + 1 + keyIdx += skipIdx + } + require.NoError(t, w.Close()) + return name + } + + eFileName := createFile(false) + eIter, eCleanup := createIter(eFileName, nil) + defer eCleanup() + + fileName := createFile(true) + iter, cleanup := createIter(fileName, syntheticSuffix) + defer cleanup() + + w := createReadWorkload(t, rng, callCount, ks, maxTs+2) + workloadChecker := checker{t: t} + for _, call := range w.calls { + workloadChecker.check(w.read(call, eIter))(w.read(call, iter)) + if workloadChecker.notValid { + w.setSeekKeyAfterInvalid() + workloadChecker.check(w.handleInvalid(call, eIter))(w.handleInvalid(call, iter)) + } + } + }) + } +} + func TestReaderChecksumErrors(t *testing.T) { for _, checksumType := range []ChecksumType{ChecksumTypeCRC32c, ChecksumTypeXXHash64} { t.Run(fmt.Sprintf("checksum-type=%d", checksumType), func(t *testing.T) { diff --git a/sstable/reader_virtual.go b/sstable/reader_virtual.go index 1a06f88e8b..d2d09714a9 100644 --- a/sstable/reader_virtual.go +++ b/sstable/reader_virtual.go @@ -35,6 +35,7 @@ type virtualState struct { Compare Compare isSharedIngested bool prefixChange *manifest.PrefixReplacement + syntheticSuffix 
SyntheticSuffix } func ceilDiv(a, b uint64) uint64 { diff --git a/sstable/suffix_rewriter.go b/sstable/suffix_rewriter.go index 359d2e5d5a..480bad0818 100644 --- a/sstable/suffix_rewriter.go +++ b/sstable/suffix_rewriter.go @@ -201,7 +201,7 @@ func rewriteBlocks( if err != nil { return err } - if err := iter.init(r.Compare, inputBlock, r.Properties.GlobalSeqNum, false); err != nil { + if err := iter.init(r.Compare, r.Split, inputBlock, r.Properties.GlobalSeqNum, false, nil); err != nil { return err } From e37a9ae9fbd47f7d28fe167adb4cedbb200fb291 Mon Sep 17 00:00:00 2001 From: Michael Butler Date: Mon, 5 Feb 2024 13:00:23 -0700 Subject: [PATCH 2/4] sstable: add synthetic suffix replacement block level benchmarks MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The microbenchmark reveals that block iter operations with suffix replacement are between 15 and 50% slower than without suffix replacement. ``` ❯ go test -benchmem -run=^$ -bench ^BenchmarkBlockIter github.com/cockroachdb/pebble/sstable goos: linux goarch: amd64 pkg: github.com/cockroachdb/pebble/sstable cpu: Intel(R) Xeon(R) CPU @ 2.80GHz BenchmarkBlockIterSeekGE/syntheticSuffix=false;restart=16-24 2405984 498.9 ns/op 0 B/op 0 allocs/op BenchmarkBlockIterSeekGE/syntheticSuffix=true;restart=16-24 2096414 572.7 ns/op 0 B/op 0 allocs/op BenchmarkBlockIterSeekLT/syntheticSuffix=false;restart=16-24 2198102 547.7 ns/op 0 B/op 0 allocs/op BenchmarkBlockIterSeekLT/syntheticSuffix=true;restart=16-24 1677667 715.6 ns/op 0 B/op 0 allocs/op BenchmarkBlockIterNext/syntheticSuffix=false;restart=16-24 74066689 16.33 ns/op 0 B/op 0 allocs/op BenchmarkBlockIterNext/syntheticSuffix=true;restart=16-24 53286625 22.43 ns/op 0 B/op 0 allocs/op BenchmarkBlockIterPrev/syntheticSuffix=false;restart=16-24 38461141 31.13 ns/op 0 B/op 0 allocs/op BenchmarkBlockIterPrev/syntheticSuffix=true;restart=16-24 27656383 43.49 ns/op 0 B/op 0 allocs/op PASS ok github.com/cockroachdb/pebble/sstable 12.142s 
``` Note that this patch changed the workload run by the baseline block iter benchmark (i.e. without suffix replacement). --- sstable/block_test.go | 266 +++++++++++++++++++++++------------------- 1 file changed, 148 insertions(+), 118 deletions(-) diff --git a/sstable/block_test.go b/sstable/block_test.go index 3b2e06f78a..282ce094d7 100644 --- a/sstable/block_test.go +++ b/sstable/block_test.go @@ -480,157 +480,187 @@ func TestBlockSyntheticSuffix(t *testing.T) { } } -func BenchmarkBlockIterSeekGE(b *testing.B) { - const blockSize = 32 << 10 +var ( + benchSynthSuffix = []byte("@15") - for _, restartInterval := range []int{16} { - b.Run(fmt.Sprintf("restart=%d", restartInterval), - func(b *testing.B) { - w := &blockWriter{ - restartInterval: restartInterval, - } + // Use testkeys.Comparer.Compare which approximates EngineCompare by ordering + // multiple keys with same prefix in descending suffix order. + benchCmp = testkeys.Comparer.Compare + benchSplit = testkeys.Comparer.Split +) - var ikey InternalKey - var keys [][]byte - for i := 0; w.estimatedSize() < blockSize; i++ { - key := []byte(fmt.Sprintf("%05d", i)) - keys = append(keys, key) - ikey.UserKey = key - w.add(ikey, nil) - } +// choosOrigSuffix randomly chooses a suffix that is either 1 or 2 bytes large. +// This ensures we benchmark when suffix replacement adds a larger suffix. 
+func chooseOrigSuffix(rng *rand.Rand) []byte { + origSuffix := []byte("@10") + if rng.Intn(10)%2 == 0 { + origSuffix = []byte("@9") + } + return origSuffix +} - it, err := newBlockIter(bytes.Compare, nil, w.finish(), nil /* syntheticSuffix */) - if err != nil { - b.Fatal(err) - } - rng := rand.New(rand.NewSource(uint64(time.Now().UnixNano()))) +func createBenchBlock(blockSize int, w *blockWriter, rng *rand.Rand) [][]byte { + origSuffix := chooseOrigSuffix(rng) + var ikey InternalKey + var keys [][]byte + for i := 0; w.estimatedSize() < blockSize; i++ { + key := []byte(fmt.Sprintf("%05d%s", i, origSuffix)) + ikey.UserKey = key + w.add(ikey, nil) + keys = append(keys, key) + } + return keys +} - b.ResetTimer() - for i := 0; i < b.N; i++ { - k := keys[rng.Intn(len(keys))] - it.SeekGE(k, base.SeekGEFlagsNone) - if testing.Verbose() { - if !it.valid() { - b.Fatal("expected to find key") - } - if !bytes.Equal(k, it.Key().UserKey) { - b.Fatalf("expected %s, but found %s", k, it.Key().UserKey) +func BenchmarkBlockIterSeekGE(b *testing.B) { + const blockSize = 32 << 10 + for _, withSyntheticSuffix := range []bool{false, true} { + for _, restartInterval := range []int{16} { + b.Run(fmt.Sprintf("syntheticSuffix=%t;restart=%d", withSyntheticSuffix, restartInterval), + func(b *testing.B) { + w := &blockWriter{ + restartInterval: restartInterval, + } + rng := rand.New(rand.NewSource(uint64(time.Now().UnixNano()))) + + keys := createBenchBlock(blockSize, w, rng) + var syntheticSuffix []byte + if withSyntheticSuffix { + syntheticSuffix = benchSynthSuffix + } + + it, err := newBlockIter(benchCmp, benchSplit, w.finish(), syntheticSuffix) + if err != nil { + b.Fatal(err) + } + b.ResetTimer() + for i := 0; i < b.N; i++ { + k := keys[rng.Intn(len(keys))] + it.SeekGE(k, base.SeekGEFlagsNone) + if testing.Verbose() { + if !it.valid() && !withSyntheticSuffix { + b.Fatal("expected to find key") + } + if !bytes.Equal(k, it.Key().UserKey) && !withSyntheticSuffix { + b.Fatalf("expected %s, 
but found %s", k, it.Key().UserKey) + } } } - } - }) + }) + } } } func BenchmarkBlockIterSeekLT(b *testing.B) { const blockSize = 32 << 10 + for _, withSyntheticSuffix := range []bool{false, true} { + for _, restartInterval := range []int{16} { + b.Run(fmt.Sprintf("syntheticSuffix=%t;restart=%d", withSyntheticSuffix, restartInterval), + func(b *testing.B) { + w := &blockWriter{ + restartInterval: restartInterval, + } + rng := rand.New(rand.NewSource(uint64(time.Now().UnixNano()))) - for _, restartInterval := range []int{16} { - b.Run(fmt.Sprintf("restart=%d", restartInterval), - func(b *testing.B) { - w := &blockWriter{ - restartInterval: restartInterval, - } - - var ikey InternalKey - var keys [][]byte - for i := 0; w.estimatedSize() < blockSize; i++ { - key := []byte(fmt.Sprintf("%05d", i)) - keys = append(keys, key) - ikey.UserKey = key - w.add(ikey, nil) - } + keys := createBenchBlock(blockSize, w, rng) + var syntheticSuffix []byte + if withSyntheticSuffix { + syntheticSuffix = benchSynthSuffix + } - it, err := newBlockIter(bytes.Compare, nil, w.finish(), nil /* syntheticSuffix */) - if err != nil { - b.Fatal(err) - } - rng := rand.New(rand.NewSource(uint64(time.Now().UnixNano()))) - - b.ResetTimer() - for i := 0; i < b.N; i++ { - j := rng.Intn(len(keys)) - it.SeekLT(keys[j], base.SeekLTFlagsNone) - if testing.Verbose() { - if j == 0 { - if it.valid() { - b.Fatal("unexpected key") - } - } else { - if !it.valid() { - b.Fatal("expected to find key") - } - k := keys[j-1] - if !bytes.Equal(k, it.Key().UserKey) { - b.Fatalf("expected %s, but found %s", k, it.Key().UserKey) + it, err := newBlockIter(benchCmp, benchSplit, w.finish(), syntheticSuffix) + if err != nil { + b.Fatal(err) + } + b.ResetTimer() + for i := 0; i < b.N; i++ { + j := rng.Intn(len(keys)) + it.SeekLT(keys[j], base.SeekLTFlagsNone) + if testing.Verbose() { + if j == 0 { + if it.valid() && !withSyntheticSuffix { + b.Fatal("unexpected key") + } + } else { + if !it.valid() && !withSyntheticSuffix { + 
b.Fatal("expected to find key") + } + k := keys[j-1] + if !bytes.Equal(k, it.Key().UserKey) && !withSyntheticSuffix { + b.Fatalf("expected %s, but found %s", k, it.Key().UserKey) + } } } } - } - }) + }) + } } } func BenchmarkBlockIterNext(b *testing.B) { const blockSize = 32 << 10 + for _, withSyntheticSuffix := range []bool{false, true} { + for _, restartInterval := range []int{16} { + b.Run(fmt.Sprintf("syntheticSuffix=%t;restart=%d", withSyntheticSuffix, restartInterval), + func(b *testing.B) { + w := &blockWriter{ + restartInterval: restartInterval, + } + rng := rand.New(rand.NewSource(uint64(time.Now().UnixNano()))) - for _, restartInterval := range []int{16} { - b.Run(fmt.Sprintf("restart=%d", restartInterval), - func(b *testing.B) { - w := &blockWriter{ - restartInterval: restartInterval, - } - - var ikey InternalKey - for i := 0; w.estimatedSize() < blockSize; i++ { - ikey.UserKey = []byte(fmt.Sprintf("%05d", i)) - w.add(ikey, nil) - } + createBenchBlock(blockSize, w, rng) + var syntheticSuffix []byte + if withSyntheticSuffix { + syntheticSuffix = benchSynthSuffix + } - it, err := newBlockIter(bytes.Compare, nil, w.finish(), nil /* syntheticSuffix */) - if err != nil { - b.Fatal(err) - } + it, err := newBlockIter(benchCmp, benchSplit, w.finish(), syntheticSuffix) + if err != nil { + b.Fatal(err) + } - b.ResetTimer() - for i := 0; i < b.N; i++ { - if !it.valid() { - it.First() + b.ResetTimer() + for i := 0; i < b.N; i++ { + if !it.valid() { + it.First() + } + it.Next() } - it.Next() - } - }) + }) + } } } func BenchmarkBlockIterPrev(b *testing.B) { const blockSize = 32 << 10 + for _, withSyntheticSuffix := range []bool{false, true} { + for _, restartInterval := range []int{16} { + b.Run(fmt.Sprintf("syntheticSuffix=%t;restart=%d", withSyntheticSuffix, restartInterval), + func(b *testing.B) { + w := &blockWriter{ + restartInterval: restartInterval, + } + rng := rand.New(rand.NewSource(uint64(time.Now().UnixNano()))) - for _, restartInterval := range []int{16} 
{ - b.Run(fmt.Sprintf("restart=%d", restartInterval), - func(b *testing.B) { - w := &blockWriter{ - restartInterval: restartInterval, - } - - var ikey InternalKey - for i := 0; w.estimatedSize() < blockSize; i++ { - ikey.UserKey = []byte(fmt.Sprintf("%05d", i)) - w.add(ikey, nil) - } + createBenchBlock(blockSize, w, rng) + var syntheticSuffix []byte + if withSyntheticSuffix { + syntheticSuffix = benchSynthSuffix + } - it, err := newBlockIter(bytes.Compare, nil, w.finish(), nil /* syntheticSuffix */) - if err != nil { - b.Fatal(err) - } + it, err := newBlockIter(benchCmp, benchSplit, w.finish(), syntheticSuffix) + if err != nil { + b.Fatal(err) + } - b.ResetTimer() - for i := 0; i < b.N; i++ { - if !it.valid() { - it.Last() + b.ResetTimer() + for i := 0; i < b.N; i++ { + if !it.valid() { + it.Last() + } + it.Prev() } - it.Prev() - } - }) + }) + } } } From a7ed317e622da96949e77a8e25837d58cd3c5f2e Mon Sep 17 00:00:00 2001 From: Michael Butler Date: Fri, 26 Jan 2024 11:32:30 -0600 Subject: [PATCH 3/4] sstable/manifest: plumb synthetic suffix to virtual sst ingestion --- format_major_version.go | 2 ++ ingest.go | 5 +++++ internal/manifest/version.go | 7 ++++++ internal/manifest/version_edit.go | 12 +++++++++++ sstable/reader_test.go | 20 +++++++++++++---- sstable/reader_virtual.go | 1 + sstable/testdata/virtual_reader | 36 +++++++++++++++++++++++++++++++ table_cache.go | 6 ++++-- 8 files changed, 83 insertions(+), 6 deletions(-) diff --git a/format_major_version.go b/format_major_version.go index fc6c8dd17b..f3df40da8d 100644 --- a/format_major_version.go +++ b/format_major_version.go @@ -185,6 +185,8 @@ const ( // requires a format major version. FormatSyntheticPrefixes + // TODO(msbutler): add major version for synthetic suffixes + // -- Add new versions here -- // FormatNewest is the most recent format major version. 
diff --git a/ingest.go b/ingest.go index ef802473ad..6f736f5912 100644 --- a/ingest.go +++ b/ingest.go @@ -226,6 +226,7 @@ func ingestLoad1External( SyntheticPrefix: e.SyntheticPrefix, } } + meta.SyntheticSuffix = e.SyntheticSuffix if err := meta.Validate(opts.Comparer.Compare, opts.Comparer.FormatKey); err != nil { return nil, err @@ -1125,6 +1126,10 @@ type ExternalFile struct { // is accessed as if those keys all instead have prefix SyntheticPrefix. // SyntheticPrefix must be a prefix of both SmallestUserKey and LargestUserKey. ContentPrefix, SyntheticPrefix []byte + // SyntheticSuffix will replace the suffix of every key in the file during + // iteration. Note that the file itself is not modifed, rather, every key + // returned by an iterator will have the synthetic suffix. + SyntheticSuffix []byte } // IngestWithStats does the same as Ingest, and additionally returns diff --git a/internal/manifest/version.go b/internal/manifest/version.go index dab434448c..5666438080 100644 --- a/internal/manifest/version.go +++ b/internal/manifest/version.go @@ -273,6 +273,7 @@ type FileMetadata struct { // PrefixReplacement is used for virtual files where the backing file has a // different prefix on its keys than the span in which it is being exposed. PrefixReplacement *PrefixReplacement + SyntheticSuffix []byte } // InternalKeyBounds returns the set of overall table bounds. 
@@ -900,6 +901,12 @@ func (m *FileMetadata) Validate(cmp Compare, formatKey base.FormatKey) error { } } + if m.SyntheticSuffix != nil { + if !m.Virtual { + return base.CorruptionErrorf("suffix replacement rule set with non-virtual file") + } + } + return nil } diff --git a/internal/manifest/version_edit.go b/internal/manifest/version_edit.go index e10e2e0294..3ba23f9910 100644 --- a/internal/manifest/version_edit.go +++ b/internal/manifest/version_edit.go @@ -64,6 +64,7 @@ const ( customTagNonSafeIgnoreMask = 1 << 6 customTagVirtual = 66 customTagPrefixRewrite = 67 + customTagSuffixRewrite = 68 ) // DeletedFileEntry holds the state for a file deletion from a level. The file @@ -338,6 +339,7 @@ func (v *VersionEdit) Decode(r io.Reader) error { backingFileNum uint64 }{} var virtualPrefix *PrefixReplacement + var syntheticSuffix []byte if tag == tagNewFile4 || tag == tagNewFile5 { for { customTag, err := d.readUvarint() @@ -368,6 +370,11 @@ func (v *VersionEdit) Decode(r io.Reader) error { SyntheticPrefix: synthetic, } continue + } else if customTag == customTagSuffixRewrite { + syntheticSuffix, err = d.readBytes() + if err != nil { + return err + } } field, err := d.readBytes() @@ -407,6 +414,7 @@ func (v *VersionEdit) Decode(r io.Reader) error { MarkedForCompaction: markedForCompaction, Virtual: virtualState.virtual, PrefixReplacement: virtualPrefix, + SyntheticSuffix: syntheticSuffix, } if tag != tagNewFile5 { // no range keys present m.SmallestPointKey = base.DecodeInternalKey(smallestPointKey) @@ -628,6 +636,10 @@ func (v *VersionEdit) Encode(w io.Writer) error { e.writeBytes(x.Meta.PrefixReplacement.ContentPrefix) e.writeBytes(x.Meta.PrefixReplacement.SyntheticPrefix) } + if x.Meta.SyntheticSuffix != nil { + e.writeUvarint(customTagSuffixRewrite) + e.writeBytes(x.Meta.SyntheticSuffix) + } e.writeUvarint(customTagTerminate) } } diff --git a/sstable/reader_test.go b/sstable/reader_test.go index c4f024b270..bd2dba3968 100644 --- a/sstable/reader_test.go +++ 
b/sstable/reader_test.go @@ -299,6 +299,9 @@ func TestVirtualReader(t *testing.T) { writerOpts.BlockSize = 1 } } + if td.HasArg("with-suffix") { + writerOpts.Comparer = testkeys.Comparer + } wMeta, r, err = runBuildCmd(td, writerOpts, 0) if err != nil { return err.Error() @@ -327,11 +330,20 @@ func TestVirtualReader(t *testing.T) { vMeta1.FileMetadata = nil v = VirtualReader{} } + + var syntheticSuffix []byte + if td.HasArg("suffix") { + var synthSuffixStr string + td.ScanArgs(t, "suffix", &synthSuffixStr) + syntheticSuffix = []byte(synthSuffixStr) + } + vMeta := &manifest.FileMetadata{ - FileBacking: meta.FileBacking, - SmallestSeqNum: meta.SmallestSeqNum, - LargestSeqNum: meta.LargestSeqNum, - Virtual: true, + FileBacking: meta.FileBacking, + SmallestSeqNum: meta.SmallestSeqNum, + LargestSeqNum: meta.LargestSeqNum, + Virtual: true, + SyntheticSuffix: syntheticSuffix, } // Parse the virtualization bounds. bounds := strings.Split(td.CmdArgs[0].String(), "-") diff --git a/sstable/reader_virtual.go b/sstable/reader_virtual.go index d2d09714a9..ee30c735a0 100644 --- a/sstable/reader_virtual.go +++ b/sstable/reader_virtual.go @@ -56,6 +56,7 @@ func MakeVirtualReader(reader *Reader, meta manifest.VirtualFileMeta, isShared b Compare: reader.Compare, isSharedIngested: isShared && reader.Properties.GlobalSeqNum != 0, prefixChange: meta.PrefixReplacement, + syntheticSuffix: meta.SyntheticSuffix, } v := VirtualReader{ vState: vState, diff --git a/sstable/testdata/virtual_reader b/sstable/testdata/virtual_reader index 1fa1669206..7f0ef064cd 100644 --- a/sstable/testdata/virtual_reader +++ b/sstable/testdata/virtual_reader @@ -690,3 +690,39 @@ virtualize a.SET.1-b.SET.5 bounds: [a#1,1-b#5,1] filenum: 000026 props: NumEntries: 1, RawKeySize: 3, RawValueSize: 1, RawPointTombstoneKeySize: 0, RawPointTombstoneValueSize: 0, NumSizedDeletions: 0, NumDeletions: 1, NumRangeDeletions: 1, NumRangeKeyDels: 0, NumRangeKeySets: 1, ValueBlocksSize: 0 + +# Test that a virtual reader with a 
suffix replacement rule replaces the +# suffixes from the backing file during iteration. +build with-suffix +a@2.SET.1:a +b@4.SET.2:b +c@3.SET.3:c +d@5.SET.4:d +e@1.SET.5:e +f@2.SET.6:f +g@2.SET.8:g +h@3.SET.9:h +---- +point: [a@2#1,1-h@3#9,1] +seqnums: [1-9] + +# Set bounds c@7-f@2 for the virtual sstable. Notice that we correctly elide c +# because post suffix replacement, it is not in the bounds. Further, notice that +# we do surface f because post suffix replacement, it is within the bounds. +virtualize c@7.SET.3-f@4.SET.8 suffix=@8 +---- +bounds: [c@7#3,1-f@4#8,1] +filenum: 000028 +props: NumEntries: 2, RawKeySize: 13, RawValueSize: 2, RawPointTombstoneKeySize: 0, RawPointTombstoneValueSize: 0, NumSizedDeletions: 0, NumDeletions: 0, NumRangeDeletions: 0, NumRangeKeyDels: 0, NumRangeKeySets: 0, ValueBlocksSize: 0 + +# Just test a basic iterator once virtual sstable bounds have been set. +iter +first +next +next +next +---- +:d +:e +:f +. diff --git a/table_cache.go b/table_cache.go index c1ce0b305a..81b5c8782e 100644 --- a/table_cache.go +++ b/table_cache.go @@ -829,7 +829,8 @@ func (c *tableCacheShard) findNode(meta *fileMetadata, dbOpts *tableCacheOpts) * // the same backing will use the same reader from the cache; so no information // that can differ among these virtual tables can be plumbed into loadInfo. info := loadInfo{ - backingFileNum: meta.FileBacking.DiskFileNum, + backingFileNum: meta.FileBacking.DiskFileNum, + syntheticSuffix: meta.SyntheticSuffix, } // All virtual tables sharing an ingested backing will have the same // SmallestSeqNum=LargestSeqNum value. We assert that below. @@ -1167,7 +1168,8 @@ type tableCacheValue struct { type loadInfo struct { backingFileNum base.DiskFileNum // See sstable.Properties.GlobalSeqNum. 
- globalSeqNum uint64 + globalSeqNum uint64 + syntheticSuffix []byte } func (v *tableCacheValue) load(loadInfo loadInfo, c *tableCacheShard, dbOpts *tableCacheOpts) { From b94edb656bd9e2201c7b3814e5b96c43d8826650 Mon Sep 17 00:00:00 2001 From: Michael Butler Date: Sat, 27 Jan 2024 13:03:58 -0600 Subject: [PATCH 4/4] sstable: inject suffix replacement into block property filtering This patch teaches block property filters to update the passed in block property at read time via the new `SyntheticSuffixIntersects()` interface method. The BlockPropertyFilterer will only call this method if the underlying reader was initialized with a synthetic suffix. This patch assumes that the underlying sst with a suffix replacement rule only contains point keys, implying the only BlockPropertyFilter implementation that needs to implement this new method is the BlockPropertyIntervalFilter. When the filterer calls `SyntheticIntersects` with a `BlockPropertyIntervalFilter`, the interval bounds of the passed in block prop may update, depending on the new `BlockIntervalSyntheticReplacer` that the `BlockPropertyIntervalFilter` is initialized with. The production `BlockIntervalSyntheticReplacer` will live in the cockroach repository, as it's aware of the cockroach key encoding. 
--- external_iterator_test.go | 2 +- internal/base/options.go | 5 ++ iterator_test.go | 13 +-- range_keys.go | 8 ++ sstable/block_property.go | 85 ++++++++++++++++--- sstable/block_property_test.go | 63 ++++++++++++-- sstable/block_property_test_utils.go | 7 +- sstable/random_test.go | 2 +- sstable/reader_test.go | 4 +- sstable/testdata/block_properties | 42 +++++++++ .../testdata/block_properties_boundlimited | 24 ++++++ sstable/writer.go | 15 ++++ table_cache.go | 6 +- 13 files changed, 242 insertions(+), 34 deletions(-) diff --git a/external_iterator_test.go b/external_iterator_test.go index ebb984ea17..6d2c87ee6f 100644 --- a/external_iterator_test.go +++ b/external_iterator_test.go @@ -235,7 +235,7 @@ func TestIterRandomizedMaybeFilteredKeys(t *testing.T) { defer r.Close() filter := sstable.NewTestKeysBlockPropertyFilter(uint64(tsSeparator), math.MaxUint64) - filterer, err := sstable.IntersectsTable([]BlockPropertyFilter{filter}, nil, r.Properties.UserProperties) + filterer, err := sstable.IntersectsTable([]BlockPropertyFilter{filter}, nil, r.Properties.UserProperties, nil) require.NoError(t, err) require.NotNil(t, filterer) diff --git a/internal/base/options.go b/internal/base/options.go index 316717e8f3..eff22c7fac 100644 --- a/internal/base/options.go +++ b/internal/base/options.go @@ -73,4 +73,9 @@ type BlockPropertyFilter interface { // Intersects returns true if the set represented by prop intersects with // the set in the filter. Intersects(prop []byte) (bool, error) + // SyntheticSuffixIntersects runs Intersects, but only after using the passed in + // suffix arg to modify a decoded copy of the passed in prop. This method only + // needs to be implemented for filters which that will be used with suffix + // replacement. 
+ SyntheticSuffixIntersects(prop []byte, suffix []byte) (bool, error) } diff --git a/iterator_test.go b/iterator_test.go index c4823a2cb4..8e77c8012b 100644 --- a/iterator_test.go +++ b/iterator_test.go @@ -509,6 +509,10 @@ func (f *minSeqNumFilter) Intersects(prop []byte) (bool, error) { return minSeqNum < f.seqNumUpperBound, nil } +func (f *minSeqNumFilter) SyntheticSuffixIntersects(prop []byte, suffix []byte) (bool, error) { + panic("unimplemented") +} + func TestReadSampling(t *testing.T) { var d *DB defer func() { @@ -1254,8 +1258,7 @@ func TestIteratorBlockIntervalFilter(t *testing.T) { return err.Error() } opts.PointKeyFilters = append(opts.PointKeyFilters, - sstable.NewBlockIntervalFilter(fmt.Sprintf("%d", id), - uint64(lower), uint64(upper))) + sstable.NewBlockIntervalFilter(fmt.Sprintf("%d", id), uint64(lower), uint64(upper), nil)) default: return fmt.Sprintf("unknown key: %s", arg.Key) } @@ -1347,8 +1350,7 @@ func TestIteratorRandomizedBlockIntervalFilter(t *testing.T) { var iterOpts IterOptions iterOpts.PointKeyFilters = []BlockPropertyFilter{ - sstable.NewBlockIntervalFilter("0", - uint64(lower), uint64(upper)), + sstable.NewBlockIntervalFilter("0", uint64(lower), uint64(upper), nil), } iter, _ := d.NewIter(&iterOpts) defer func() { @@ -2217,8 +2219,7 @@ func BenchmarkBlockPropertyFilter(b *testing.B) { var iterOpts IterOptions if filter { iterOpts.PointKeyFilters = []BlockPropertyFilter{ - sstable.NewBlockIntervalFilter("0", - uint64(0), uint64(1)), + sstable.NewBlockIntervalFilter("0", uint64(0), uint64(1), nil), } } iter, _ := d.NewIter(&iterOpts) diff --git a/range_keys.go b/range_keys.go index eb39ae4779..2399bd6c81 100644 --- a/range_keys.go +++ b/range_keys.go @@ -403,6 +403,14 @@ func (m *rangeKeyMasking) Intersects(prop []byte) (bool, error) { return m.filter.Intersects(prop) } +func (m *rangeKeyMasking) SyntheticSuffixIntersects(prop []byte, suffix []byte) (bool, error) { + if m.maskSpan == nil { + // No span is actively masking. 
+ return true, nil + } + return m.filter.SyntheticSuffixIntersects(prop, suffix) +} + // KeyIsWithinLowerBound implements the limitedBlockPropertyFilter interface // defined in the sstable package. It's used to restrict the masking block // property filter to only applying within the bounds of the active range key. diff --git a/sstable/block_property.go b/sstable/block_property.go index d85a2b2c31..1b3ed04ada 100644 --- a/sstable/block_property.go +++ b/sstable/block_property.go @@ -11,6 +11,7 @@ import ( "sync" "unsafe" + "github.com/cockroachdb/errors" "github.com/cockroachdb/pebble/internal/base" "github.com/cockroachdb/pebble/internal/rangekey" ) @@ -413,12 +414,23 @@ func (w *suffixReplacementBlockCollectorWrapper) UpdateKeySuffixes( return w.BlockIntervalCollector.points.(SuffixReplaceableBlockCollector).UpdateKeySuffixes(oldProp, from, to) } +// BlockIntervalSyntheticReplacer provides methods to conduct just in time +// adjustments of a passed in block prop interval before filtering. +type BlockIntervalSyntheticReplacer interface { + // AdjustIntervalWithSyntheticSuffix adjusts the [lower, upper) range for a data + // block (specifically, the range returned by the corresponding + // DataBlockIntervalCollector.FinishDataBlock) to what it would have been if + // all the input keys had the given synthetic suffix. + AdjustIntervalWithSyntheticSuffix(lower uint64, upper uint64, suffix []byte) (adjustedLower uint64, adjustedUpper uint64, err error) +} + // BlockIntervalFilter is an implementation of BlockPropertyFilter when the // corresponding collector is a BlockIntervalCollector. That is, the set is of // the form [lower, upper). 
type BlockIntervalFilter struct { - name string - filterInterval interval + name string + filterInterval interval + syntheticReplacer BlockIntervalSyntheticReplacer } var _ BlockPropertyFilter = (*BlockIntervalFilter)(nil) @@ -427,9 +439,11 @@ var _ BlockPropertyFilter = (*BlockIntervalFilter)(nil) // based on an interval property collected by BlockIntervalCollector and the // given [lower, upper) bounds. The given name specifies the // BlockIntervalCollector's properties to read. -func NewBlockIntervalFilter(name string, lower uint64, upper uint64) *BlockIntervalFilter { +func NewBlockIntervalFilter( + name string, lower uint64, upper uint64, syntheticReplacer BlockIntervalSyntheticReplacer, +) *BlockIntervalFilter { b := new(BlockIntervalFilter) - b.Init(name, lower, upper) + b.Init(name, lower, upper, syntheticReplacer) return b } @@ -437,10 +451,13 @@ func NewBlockIntervalFilter(name string, lower uint64, upper uint64) *BlockInter // BLockPropertyFilter to filter blocks based on an interval property collected // by BlockIntervalCollector and the given [lower, upper) bounds. The given name // specifies the BlockIntervalCollector's properties to read. -func (b *BlockIntervalFilter) Init(name string, lower, upper uint64) { +func (b *BlockIntervalFilter) Init( + name string, lower, upper uint64, syntheticReplacer BlockIntervalSyntheticReplacer, +) { *b = BlockIntervalFilter{ - name: name, - filterInterval: interval{lower: lower, upper: upper}, + name: name, + filterInterval: interval{lower: lower, upper: upper}, + syntheticReplacer: syntheticReplacer, } } @@ -458,6 +475,28 @@ func (b *BlockIntervalFilter) Intersects(prop []byte) (bool, error) { return i.intersects(b.filterInterval), nil } +// SyntheticSuffixIntersects implements the BlockPropertyFilter interface. 
+func (b *BlockIntervalFilter) SyntheticSuffixIntersects(prop []byte, suffix []byte) (bool, error) { + if b.syntheticReplacer == nil { + return false, errors.AssertionFailedf("missing SyntheticReplacer for SyntheticSuffixIntersects()") + } + var i interval + if err := i.decode(prop); err != nil { + return false, err + } + + newLower, newUpper, err := b.syntheticReplacer.AdjustIntervalWithSyntheticSuffix(i.lower, i.upper, suffix) + if err != nil { + return false, err + } + if newLower < i.upper { + // This isn't an =< comparator because the original i.upper bound is exclusive. + return false, base.CorruptionErrorf(fmt.Sprintf("the synthetic suffix %d is less than the property upper bound %d", newLower, i.upper)) + } + newInterval := interval{lower: newLower, upper: newUpper} + return newInterval.intersects(b.filterInterval), nil +} + // SetInterval adjusts the [lower, upper) bounds used by the filter. It is not // generally safe to alter the filter while it's in use, except as part of the // implementation of BlockPropertyFilterMask.SetSuffix used for range-key @@ -580,6 +619,8 @@ type BlockPropertiesFilterer struct { // collected when the table was built. boundLimitedFilter BoundLimitedBlockPropertyFilter boundLimitedShortID int + + syntheticSuffix SyntheticSuffix } var blockPropertiesFiltererPool = sync.Pool{ @@ -591,7 +632,9 @@ var blockPropertiesFiltererPool = sync.Pool{ // newBlockPropertiesFilterer returns a partially initialized filterer. To complete // initialization, call IntersectsUserPropsAndFinishInit. 
func newBlockPropertiesFilterer( - filters []BlockPropertyFilter, limited BoundLimitedBlockPropertyFilter, + filters []BlockPropertyFilter, + limited BoundLimitedBlockPropertyFilter, + syntheticSuffix SyntheticSuffix, ) *BlockPropertiesFilterer { filterer := blockPropertiesFiltererPool.Get().(*BlockPropertiesFilterer) *filterer = BlockPropertiesFilterer{ @@ -599,6 +642,7 @@ func newBlockPropertiesFilterer( shortIDToFiltersIndex: filterer.shortIDToFiltersIndex[:0], boundLimitedFilter: limited, boundLimitedShortID: -1, + syntheticSuffix: syntheticSuffix, } return filterer } @@ -620,8 +664,9 @@ func IntersectsTable( filters []BlockPropertyFilter, limited BoundLimitedBlockPropertyFilter, userProperties map[string]string, + syntheticSuffix SyntheticSuffix, ) (*BlockPropertiesFilterer, error) { - f := newBlockPropertiesFilterer(filters, limited) + f := newBlockPropertiesFilterer(filters, limited, syntheticSuffix) ok, err := f.intersectsUserPropsAndFinishInit(userProperties) if !ok || err != nil { releaseBlockPropertiesFilterer(f) @@ -655,7 +700,13 @@ func (f *BlockPropertiesFilterer) intersectsUserPropsAndFinishInit( // Note that unsafe.StringData only works if the string is not empty // (which we already checked). 
byteProps := unsafe.Slice(unsafe.StringData(props), len(props)) - intersects, err := f.filters[i].Intersects(byteProps[1:]) + var intersects bool + var err error + if len(f.syntheticSuffix) == 0 { + intersects, err = f.filters[i].Intersects(byteProps[1:]) + } else { + intersects, err = f.filters[i].SyntheticSuffixIntersects(byteProps[1:], f.syntheticSuffix) + } if err != nil || !intersects { return false, err } @@ -792,8 +843,14 @@ func (f *BlockPropertiesFilterer) intersects(props []byte) (ret intersectsResult } func (f *BlockPropertiesFilterer) intersectsFilter(i int, prop []byte) (intersectsResult, error) { + var intersects bool + var err error if f.shortIDToFiltersIndex[i] >= 0 { - intersects, err := f.filters[f.shortIDToFiltersIndex[i]].Intersects(prop) + if len(f.syntheticSuffix) == 0 { + intersects, err = f.filters[f.shortIDToFiltersIndex[i]].Intersects(prop) + } else { + intersects, err = f.filters[f.shortIDToFiltersIndex[i]].SyntheticSuffixIntersects(prop, f.syntheticSuffix) + } if err != nil { return blockIntersects, err } @@ -809,7 +866,11 @@ func (f *BlockPropertiesFilterer) intersectsFilter(i int, prop []byte) (intersec // Intersects determines that there is no intersection, we return // `blockMaybeExcluded` if no other bpf unconditionally excludes the // block. 
- intersects, err := f.boundLimitedFilter.Intersects(prop) + if len(f.syntheticSuffix) == 0 { + intersects, err = f.boundLimitedFilter.Intersects(prop) + } else { + intersects, err = f.boundLimitedFilter.SyntheticSuffixIntersects(prop, f.syntheticSuffix) + } if err != nil { return blockIntersects, err } else if !intersects { diff --git a/sstable/block_property_test.go b/sstable/block_property_test.go index b5b52584a6..cf144a64fa 100644 --- a/sstable/block_property_test.go +++ b/sstable/block_property_test.go @@ -7,6 +7,7 @@ package sstable import ( "bytes" "context" + "encoding/binary" "fmt" "io" "math" @@ -271,7 +272,7 @@ func TestBlockIntervalFilter(t *testing.T) { var points testDataBlockIntervalCollector name := "foo" bic := NewBlockIntervalCollector(name, &points, nil) - bif := NewBlockIntervalFilter(name, tc.filter.lower, tc.filter.upper) + bif := NewBlockIntervalFilter(name, tc.filter.lower, tc.filter.upper, nil) points.i = tc.prop prop, _ := bic.FinishDataBlock(nil) intersects, err := bif.Intersects(prop) @@ -343,6 +344,12 @@ func (b filterWithTrueForEmptyProp) Intersects(prop []byte) (bool, error) { return b.BlockPropertyFilter.Intersects(prop) } +func (b filterWithTrueForEmptyProp) SyntheticSuffixIntersects( + prop []byte, suffix []byte, +) (bool, error) { + panic("unimplemented") +} + func TestBlockPropertiesFilterer_IntersectsUserPropsAndFinishInit(t *testing.T) { // props with id=0, interval [10, 20); id=10, interval [110, 120). 
var dbic testDataBlockIntervalCollector @@ -475,10 +482,10 @@ func TestBlockPropertiesFilterer_IntersectsUserPropsAndFinishInit(t *testing.T) t.Run(tc.name, func(t *testing.T) { var filters []BlockPropertyFilter for _, f := range tc.filters { - filter := NewBlockIntervalFilter(f.name, f.i.lower, f.i.upper) + filter := NewBlockIntervalFilter(f.name, f.i.lower, f.i.upper, nil) filters = append(filters, filter) } - filterer := newBlockPropertiesFilterer(filters, nil) + filterer := newBlockPropertiesFilterer(filters, nil, nil) intersects, err := filterer.intersectsUserPropsAndFinishInit(tc.userProps) require.NoError(t, err) require.Equal(t, tc.intersects, intersects) @@ -719,7 +726,7 @@ func TestBlockPropertiesFilterer_Intersects(t *testing.T) { } } for _, f := range tc.filters { - filter := NewBlockIntervalFilter("", f.i.lower, f.i.upper) + filter := NewBlockIntervalFilter("", f.i.lower, f.i.upper, nil) bpf := BlockPropertyFilter(filter) if f.intersectsForEmptyProp { bpf = filterWithTrueForEmptyProp{filter} @@ -895,8 +902,15 @@ func TestBlockProperties(t *testing.T) { return runBlockPropsCmd(r, td) case "filter": + syntheticSuffix := make([]byte, 0, binary.MaxVarintLen64) var points, ranges []BlockPropertyFilter for _, cmd := range td.CmdArgs { + if cmd.Key == "synthetic" { + var suffix uint64 + td.ScanArgs(t, "synthetic", &suffix) + syntheticSuffix = binary.AppendUvarint(nil, suffix) + continue + } filter, err := parseIntervalFilter(cmd) if err != nil { return err.Error() @@ -916,7 +930,7 @@ func TestBlockProperties(t *testing.T) { var f *BlockPropertiesFilterer buf.WriteString("points: ") if len(points) > 0 { - f = newBlockPropertiesFilterer(points, nil) + f = newBlockPropertiesFilterer(points, nil, syntheticSuffix) ok, err := f.intersectsUserPropsAndFinishInit(r.Properties.UserProperties) if err != nil { return err.Error() @@ -970,7 +984,7 @@ func TestBlockProperties(t *testing.T) { // Range key filter matches. 
buf.WriteString("ranges: ") if len(ranges) > 0 { - f := newBlockPropertiesFilterer(ranges, nil) + f := newBlockPropertiesFilterer(ranges, nil, nil) ok, err := f.intersectsUserPropsAndFinishInit(r.Properties.UserProperties) if err != nil { return err.Error() @@ -1001,7 +1015,7 @@ func TestBlockProperties(t *testing.T) { filters = append(filters, f) } } - filterer := newBlockPropertiesFilterer(filters, nil) + filterer := newBlockPropertiesFilterer(filters, nil, nil) ok, err := filterer.intersectsUserPropsAndFinishInit(r.Properties.UserProperties) if err != nil { return err.Error() @@ -1053,6 +1067,7 @@ func TestBlockProperties_BoundLimited(t *testing.T) { case "iter": var buf bytes.Buffer var lower, upper []byte + syntheticSuffix := make([]byte, 0, binary.MaxVarintLen64) filter := boundLimitedWrapper{ w: &buf, cmp: testkeys.Comparer.Compare, @@ -1075,13 +1090,17 @@ func TestBlockProperties_BoundLimited(t *testing.T) { case "filter-lower": ik := base.MakeInternalKey([]byte(arg.Vals[0]), 0, base.InternalKeyKindSet) filter.lower = &ik + case "synthetic": + var suffix uint64 + td.ScanArgs(t, "synthetic", &suffix) + syntheticSuffix = binary.AppendUvarint(nil, suffix) } } if filter.inner == nil { return "missing block property filter" } - filterer := newBlockPropertiesFilterer(nil, &filter) + filterer := newBlockPropertiesFilterer(nil, &filter, syntheticSuffix) ok, err := filterer.intersectsUserPropsAndFinishInit(r.Properties.UserProperties) if err != nil { return err.Error() @@ -1135,6 +1154,21 @@ func (bl *boundLimitedWrapper) Intersects(prop []byte) (bool, error) { return v, err } +func (bl *boundLimitedWrapper) SyntheticSuffixIntersects(prop []byte, suffix []byte) (bool, error) { + propString := fmt.Sprintf("%x", prop) + var i interval + if err := i.decode(prop); err == nil { + // If it decodes as an interval, pretty print it as an interval. 
+ propString = fmt.Sprintf("[%d, %d)", i.lower, i.upper) + } + + v, err := bl.inner.SyntheticSuffixIntersects(prop, suffix) + if bl.w != nil { + fmt.Fprintf(bl.w, " filter.SyntheticSuffixIntersects(%s) = (%t, %v)\n", propString, v, err) + } + return v, err +} + func (bl *boundLimitedWrapper) KeyIsWithinLowerBound(key []byte) (ret bool) { if bl.lower == nil { ret = true @@ -1159,6 +1193,17 @@ func (bl *boundLimitedWrapper) KeyIsWithinUpperBound(key []byte) (ret bool) { return ret } +var _ BlockIntervalSyntheticReplacer = testingBlockIntervalSyntheticReplacer{} + +type testingBlockIntervalSyntheticReplacer struct{} + +func (sr testingBlockIntervalSyntheticReplacer) AdjustIntervalWithSyntheticSuffix( + lower uint64, upper uint64, suffix []byte, +) (adjustedLower uint64, adjustedUpper uint64, err error) { + synthDecoded, _ := binary.Uvarint(suffix) + return synthDecoded, synthDecoded + 1, nil +} + func parseIntervalFilter(cmd datadriven.CmdArg) (BlockPropertyFilter, error) { name := cmd.Vals[0] minS, maxS := cmd.Vals[1], cmd.Vals[2] @@ -1170,7 +1215,7 @@ func parseIntervalFilter(cmd datadriven.CmdArg) (BlockPropertyFilter, error) { if err != nil { return nil, err } - return NewBlockIntervalFilter(name, min, max), nil + return NewBlockIntervalFilter(name, min, max, testingBlockIntervalSyntheticReplacer{}), nil } func runCollectorsCmd(r *Reader, td *datadriven.TestData) string { diff --git a/sstable/block_property_test_utils.go b/sstable/block_property_test_utils.go index 0ade68ff9b..38b32528e7 100644 --- a/sstable/block_property_test_utils.go +++ b/sstable/block_property_test_utils.go @@ -35,7 +35,7 @@ func NewTestKeysBlockPropertyCollector() BlockPropertyCollector { // and keys with suffixes within the range [filterMin, filterMax). For keys with // suffixes outside the range, iteration is nondeterministic. 
func NewTestKeysBlockPropertyFilter(filterMin, filterMax uint64) *BlockIntervalFilter { - return NewBlockIntervalFilter(testKeysBlockPropertyName, filterMin, filterMax) + return NewBlockIntervalFilter(testKeysBlockPropertyName, filterMin, filterMax, nil) } // NewTestKeysMaskingFilter constructs a TestKeysMaskingFilter that implements @@ -68,6 +68,11 @@ func (f TestKeysMaskingFilter) Intersects(prop []byte) (bool, error) { return f.BlockIntervalFilter.Intersects(prop) } +// SyntheticSuffixIntersects implements the BlockPropertyFilter interface. +func (f TestKeysMaskingFilter) SyntheticSuffixIntersects(prop []byte, suffix []byte) (bool, error) { + panic("unimplemented") +} + var _ DataBlockIntervalCollector = (*testKeysSuffixIntervalCollector)(nil) // testKeysSuffixIntervalCollector maintains an interval over the timestamps in diff --git a/sstable/random_test.go b/sstable/random_test.go index 83e83caa3a..f45ba08695 100644 --- a/sstable/random_test.go +++ b/sstable/random_test.go @@ -88,7 +88,7 @@ func runErrorInjectionTest(t *testing.T, seed int64) { } filterer = newBlockPropertiesFilterer([]BlockPropertyFilter{ NewTestKeysBlockPropertyFilter(low, high), - }, nil) + }, nil, nil) } // TOOD(jackson): NewIterWithBlockPropertyFilters returns an iterator over diff --git a/sstable/reader_test.go b/sstable/reader_test.go index bd2dba3968..6d32a83a1c 100644 --- a/sstable/reader_test.go +++ b/sstable/reader_test.go @@ -815,7 +815,7 @@ func runTestReader(t *testing.T, o WriterOptions, dir string, r *Reader, printVa } var filterer *BlockPropertiesFilterer if len(bpfs) > 0 { - filterer = newBlockPropertiesFilterer(bpfs, nil) + filterer = newBlockPropertiesFilterer(bpfs, nil, nil) intersects, err := filterer.intersectsUserPropsAndFinishInit(r.Properties.UserProperties) if err != nil { @@ -2315,7 +2315,7 @@ func BenchmarkIteratorScanObsolete(b *testing.B) { var filterer *BlockPropertiesFilterer if format == TableFormatPebblev4 && hideObsoletePoints { filterer = 
newBlockPropertiesFilterer( - []BlockPropertyFilter{obsoleteKeyBlockPropertyFilter{}}, nil) + []BlockPropertyFilter{obsoleteKeyBlockPropertyFilter{}}, nil, nil) intersects, err := filterer.intersectsUserPropsAndFinishInit(r.Properties.UserProperties) if err != nil { diff --git a/sstable/testdata/block_properties b/sstable/testdata/block_properties index 8cf2db12e3..de3b04b661 100644 --- a/sstable/testdata/block_properties +++ b/sstable/testdata/block_properties @@ -669,3 +669,45 @@ seek-ge ee . MaybeFilteredKeys()=true . MaybeFilteredKeys()=true . MaybeFilteredKeys()=true + +# Create a table with a single block of point keys to test suffix replacement +# injection during block property filtering + +build block-size=1 collectors=(suffix-point-keys-only) +a@5.SET.1:foo +b@8.SET.2:bar +c@7.SET.3:baz +---- +point: [a@5#1,1,c@7#3,1] +rangedel: [#0,0,#0,0] +rangekey: [#0,0,#0,0] +seqnums: [1,3] + +collectors +---- +0: suffix-point-keys-only + +block-props +---- +b#72057594037927935,17: + 0: [5, 6) +c#72057594037927935,17: + 0: [8, 9) +d#72057594037927935,17: + 0: [7, 8) + +table-props +---- +0: [5, 9) + +# Blocks only match with synthetic replacement + +filter point-filter=(suffix-point-keys-only,10,20) synthetic=12 +---- +points: true, blocks=[0,1,2] +ranges: true (no filters provided) + +filter point-filter=(suffix-point-keys-only,10,20) +---- +points: false +ranges: true (no filters provided) diff --git a/sstable/testdata/block_properties_boundlimited b/sstable/testdata/block_properties_boundlimited index d67907ebb4..f6decc90b2 100644 --- a/sstable/testdata/block_properties_boundlimited +++ b/sstable/testdata/block_properties_boundlimited @@ -114,6 +114,30 @@ next MaybeFilteredKeys()=false . 
MaybeFilteredKeys()=false +# Test the same case above but inject a synthetic suffix +# that causes block 3 to intersect in the filter + +iter filter=(suffix,3,20) synthetic=10 +first +next +next +next +next +next +next +---- + filter.SyntheticSuffixIntersects([2, 6)) = (true, ) + MaybeFilteredKeys()=false + MaybeFilteredKeys()=false + filter.SyntheticSuffixIntersects([3, 10)) = (true, ) + MaybeFilteredKeys()=false + MaybeFilteredKeys()=false + filter.SyntheticSuffixIntersects([0, 3)) = (true, ) + MaybeFilteredKeys()=false + MaybeFilteredKeys()=false + filter.SyntheticSuffixIntersects([3, 9)) = (true, ) + MaybeFilteredKeys()=false + # Test a case that filters the first two blocks. The third block is not filtered # due to block-property intersection. The fourth block is not filtered due to # the upper bound. diff --git a/sstable/writer.go b/sstable/writer.go index 0b2a193d9f..80a578480f 100644 --- a/sstable/writer.go +++ b/sstable/writer.go @@ -2413,6 +2413,21 @@ func (o obsoleteKeyBlockPropertyFilter) Intersects(prop []byte) (bool, error) { return propToIsObsolete(prop) } +func (o obsoleteKeyBlockPropertyFilter) SyntheticSuffixIntersects( + prop []byte, suffix []byte, +) (bool, error) { + // A block with suffix replacement should never be + // obsolete, so return an assertion error if it is. 
+ isNotObsolete, err := o.Intersects(prop) + if err != nil { + return false, err + } + if !isNotObsolete { + return false, errors.AssertionFailedf("block with synthetic suffix is obsolete") + } + return true, nil +} + func propToIsObsolete(prop []byte) (bool, error) { if len(prop) == 0 { return true, nil diff --git a/table_cache.go b/table_cache.go index 81b5c8782e..74f12960bc 100644 --- a/table_cache.go +++ b/table_cache.go @@ -458,6 +458,7 @@ func (c *tableCacheShard) checkAndIntersectFilters( tableFilter func(userProps map[string]string) bool, blockPropertyFilters []BlockPropertyFilter, boundLimitedFilter sstable.BoundLimitedBlockPropertyFilter, + syntheticSuffix sstable.SyntheticSuffix, ) (ok bool, filterer *sstable.BlockPropertiesFilterer, err error) { if tableFilter != nil && !tableFilter(v.reader.Properties.UserProperties) { @@ -469,6 +470,7 @@ func (c *tableCacheShard) checkAndIntersectFilters( blockPropertyFilters, boundLimitedFilter, v.reader.Properties.UserProperties, + syntheticSuffix, ) // NB: IntersectsTable will return a nil filterer if the table-level // properties indicate there's no intersection with the provided filters. @@ -583,7 +585,7 @@ func (c *tableCacheShard) newPointIter( var ok bool var err error ok, filterer, err = c.checkAndIntersectFilters(v, opts.TableFilter, - pointKeyFilters, internalOpts.boundLimitedFilter) + pointKeyFilters, internalOpts.boundLimitedFilter, file.SyntheticSuffix) if err != nil { return nil, err } else if !ok { @@ -687,7 +689,7 @@ func (c *tableCacheShard) newRangeKeyIter( // done here, rather than deferring to the block-property collector in order // to maintain parity with point keys and the treatment of RANGEDELs. if v.reader.Properties.NumRangeKeyDels == 0 && len(opts.RangeKeyFilters) > 0 { - ok, _, err := c.checkAndIntersectFilters(v, nil, opts.RangeKeyFilters, nil) + ok, _, err := c.checkAndIntersectFilters(v, nil, opts.RangeKeyFilters, nil, nil) if err != nil { return nil, err } else if !ok {