diff --git a/compaction.go b/compaction.go index f37e889bc11..dcd238dce53 100644 --- a/compaction.go +++ b/compaction.go @@ -1028,8 +1028,8 @@ func (c *compaction) newInputIter(newIters tableNewIters) (_ internalIterator, r iterOpts := IterOptions{logger: c.logger} addItersForLevel := func(iters []internalIterator, level *compactionLevel) ([]internalIterator, error) { - iters = append(iters, newLevelIter(iterOpts, c.cmp, newIters, level.files.Iter(), - manifest.Level(level.level), &c.bytesIterated)) + iters = append(iters, newLevelIter(iterOpts, c.cmp, nil /* split */, newIters, + level.files.Iter(), manifest.Level(level.level), &c.bytesIterated)) // Add the range deletion iterator for each file as an independent level // in mergingIter, as opposed to making a levelIter out of those. This // is safer as levelIter expects all keys coming from underlying diff --git a/db.go b/db.go index 5561e54348a..218df6c5857 100644 --- a/db.go +++ b/db.go @@ -793,7 +793,7 @@ func finishInitializingIter(buf *iterAlloc) *Iterator { li = &levelIter{} } - li.init(dbi.opts, dbi.cmp, dbi.newIters, files, level, nil) + li.init(dbi.opts, dbi.cmp, dbi.split, dbi.newIters, files, level, nil) li.initRangeDel(&mlevels[0].rangeDelIter) li.initSmallestLargestUserKey(&mlevels[0].smallestUserKey, &mlevels[0].largestUserKey, &mlevels[0].isLargestUserKeyRangeDelSentinel) diff --git a/get_iter.go b/get_iter.go index 87b241bea17..61791808774 100644 --- a/get_iter.go +++ b/get_iter.go @@ -145,7 +145,7 @@ func (g *getIter) Next() (*InternalKey, []byte) { files := g.l0[n-1].Iter() g.l0 = g.l0[:n-1] iterOpts := IterOptions{logger: g.logger} - g.levelIter.init(iterOpts, g.cmp, g.newIters, files, manifest.L0Sublevel(n), nil) + g.levelIter.init(iterOpts, g.cmp, nil, g.newIters, files, manifest.L0Sublevel(n), nil) g.levelIter.initRangeDel(&g.rangeDelIter) g.iter = &g.levelIter g.iterKey, g.iterValue = g.iter.SeekGE(g.key) @@ -163,8 +163,7 @@ func (g *getIter) Next() (*InternalKey, []byte) { } iterOpts := IterOptions{logger: g.logger} - g.levelIter.init(iterOpts, g.cmp, g.newIters, - g.version.Levels[g.level].Iter(), manifest.Level(g.level), nil) + g.levelIter.init(iterOpts, g.cmp, nil, g.newIters, g.version.Levels[g.level].Iter(), manifest.Level(g.level), nil) g.levelIter.initRangeDel(&g.rangeDelIter) g.level++ g.iter = &g.levelIter diff --git a/ingest.go b/ingest.go index a0bff276544..fe219d7ee95 100644 --- a/ingest.go +++ b/ingest.go @@ -417,7 +417,8 @@ func ingestTargetLevel( level := baseLevel for ; level < numLevels; level++ { - levelIter := newLevelIter(iterOps, cmp, newIters, v.Levels[level].Iter(), manifest.Level(level), nil) + levelIter := newLevelIter(iterOps, cmp, nil /* split */, newIters, + v.Levels[level].Iter(), manifest.Level(level), nil) var rangeDelIter internalIterator // Pass in a non-nil pointer to rangeDelIter so that levelIter.findFileGE sets it up for the target file. levelIter.initRangeDel(&rangeDelIter) diff --git a/iterator_test.go b/iterator_test.go index e37acaff17a..797ac38f5d4 100644 --- a/iterator_test.go +++ b/iterator_test.go @@ -16,6 +16,7 @@ import ( "github.com/cockroachdb/errors" "github.com/cockroachdb/pebble/internal/base" "github.com/cockroachdb/pebble/internal/datadriven" + "github.com/cockroachdb/pebble/internal/manifest" "github.com/cockroachdb/pebble/sstable" "github.com/cockroachdb/pebble/vfs" "github.com/stretchr/testify/require" @@ -1003,10 +1004,31 @@ func BenchmarkIteratorSeqSeekPrefixGENotFound(b *testing.B) { const restartInterval = 16 const levelCount = 5 const keyOffset = 100000 - readers, levelSlices, _ := buildLevelsForMergingIterSeqSeek( - b, blockSize, restartInterval, levelCount, keyOffset, false) - readersWithTombstone, levelSlicesWithTombstone, _ := buildLevelsForMergingIterSeqSeek( - b, blockSize, restartInterval, 1, keyOffset, true) + + var readers [4][][]*sstable.Reader + var levelSlices [4][]manifest.LevelSlice + indexFunc := func(bloom bool, withTombstone bool) int { + index := 0 + if bloom { + index = 2 + } + if withTombstone { + index++ + } + return index + } + for _, bloom := range []bool{false, true} { + for _, withTombstone := range []bool{false, true} { + index := indexFunc(bloom, withTombstone) + levels := levelCount + if withTombstone { + levels = 1 + } + readers[index], levelSlices[index], _ = buildLevelsForMergingIterSeqSeek( + b, blockSize, restartInterval, levels, keyOffset, withTombstone, bloom) + + } + } // We will not be seeking to the keys that were written but instead to // keys before the written keys. This is to validate that the optimization // to use Next still functions when mergingIter checks for the prefix @@ -1018,53 +1040,52 @@ func BenchmarkIteratorSeqSeekPrefixGENotFound(b *testing.B) { keys = append(keys, []byte(fmt.Sprintf("%08d", i))) } for _, skip := range []int{1, 2, 4} { - for _, withTombstone := range []bool{false, true} { - b.Run(fmt.Sprintf("skip=%d/with-tombstone=%t", skip, withTombstone), - func(b *testing.B) { - readers := readers - levelSlices := levelSlices - if withTombstone { - readers = readersWithTombstone - levelSlices = levelSlicesWithTombstone - } - m := buildMergingIter(readers, levelSlices) - iter := Iterator{ - cmp: DefaultComparer.Compare, - equal: DefaultComparer.Equal, - split: func(a []byte) int { return len(a) }, - merge: DefaultMerger.Merge, - iter: m, - } - pos := 0 - b.ResetTimer() - for i := 0; i < b.N; i++ { - // When withTombstone=true, and prior to the - // optimization to stop early due to a range - // tombstone, the iteration would continue into the - // next file, and not be able to use Next at the lower - // level in the next SeekPrefixGE call. So we would - // incur the cost of iterating over all the deleted - // keys for every seek. Note that it is not possible - // to do a noop optimization in Iterator for the - // prefix case, unlike SeekGE/SeekLT, since we don't - // know if the iterators inside mergingIter are all - // appropriately positioned -- some may not be due to - // bloom filters not matching. - valid := iter.SeekPrefixGE(keys[pos]) - if valid { - b.Fatalf("key should not be found") + for _, bloom := range []bool{false, true} { + for _, withTombstone := range []bool{false, true} { + b.Run(fmt.Sprintf("skip=%d/bloom=%t/with-tombstone=%t", skip, bloom, withTombstone), + func(b *testing.B) { + index := indexFunc(bloom, withTombstone) + readers := readers[index] + levelSlices := levelSlices[index] + m := buildMergingIter(readers, levelSlices) + iter := Iterator{ + cmp: DefaultComparer.Compare, + equal: DefaultComparer.Equal, + split: func(a []byte) int { return len(a) }, + merge: DefaultMerger.Merge, + iter: m, } - pos += skip - if pos >= keyOffset { - pos = 0 + pos := 0 + b.ResetTimer() + for i := 0; i < b.N; i++ { + // When withTombstone=true, and prior to the + // optimization to stop early due to a range + // tombstone, the iteration would continue into the + // next file, and not be able to use Next at the lower + // level in the next SeekPrefixGE call. So we would + // incur the cost of iterating over all the deleted + // keys for every seek. Note that it is not possible + // to do a noop optimization in Iterator for the + // prefix case, unlike SeekGE/SeekLT, since we don't + // know if the iterators inside mergingIter are all + // appropriately positioned -- some may not be due to + // bloom filters not matching. + valid := iter.SeekPrefixGE(keys[pos]) + if valid { + b.Fatalf("key should not be found") + } + pos += skip + if pos >= keyOffset { + pos = 0 + } } - } - b.StopTimer() - iter.Close() - }) + b.StopTimer() + iter.Close() + }) + } } } - for _, r := range [][][]*sstable.Reader{readers, readersWithTombstone} { + for _, r := range readers { for i := range r { for j := range r[i] { r[i][j].Close() @@ -1081,7 +1102,7 @@ func BenchmarkIteratorSeqSeekGEWithBounds(b *testing.B) { const restartInterval = 16 const levelCount = 5 readers, levelSlices, keys := buildLevelsForMergingIterSeqSeek( - b, blockSize, restartInterval, levelCount, 0 /* keyOffset */, false) + b, blockSize, restartInterval, levelCount, 0 /* keyOffset */, false, false) m := buildMergingIter(readers, levelSlices) iter := Iterator{ cmp: DefaultComparer.Compare, @@ -1118,7 +1139,7 @@ func BenchmarkIteratorSeekGENoop(b *testing.B) { const levelCount = 5 const keyOffset = 10000 readers, levelSlices, _ := buildLevelsForMergingIterSeqSeek( - b, blockSize, restartInterval, levelCount, keyOffset, false) + b, blockSize, restartInterval, levelCount, keyOffset, false, false) var keys [][]byte for i := 0; i < keyOffset; i++ { keys = append(keys, []byte(fmt.Sprintf("%08d", i))) diff --git a/level_checker.go b/level_checker.go index 6a4cae2eb44..02c42d947c9 100644 --- a/level_checker.go +++ b/level_checker.go @@ -636,8 +636,7 @@ func checkLevelsInternal(c *checkConfig) (err error) { manifestIter := current.L0Sublevels.Levels[sublevel].Iter() iterOpts := IterOptions{logger: c.logger} li := &levelIter{} - li.init(iterOpts, c.cmp, c.newIters, manifestIter, - manifest.L0Sublevel(sublevel), nil) + li.init(iterOpts, c.cmp, nil, c.newIters, manifestIter, manifest.L0Sublevel(sublevel), nil) li.initRangeDel(&mlevelAlloc[0].rangeDelIter) li.initSmallestLargestUserKey(&mlevelAlloc[0].smallestUserKey, nil, nil) mlevelAlloc[0].iter = li @@ -650,7 +649,7 @@ func checkLevelsInternal(c *checkConfig) (err error) { iterOpts := IterOptions{logger: c.logger} li := &levelIter{} - li.init(iterOpts, c.cmp, c.newIters, current.Levels[level].Iter(), manifest.Level(level), nil) + li.init(iterOpts, c.cmp, nil, c.newIters, current.Levels[level].Iter(), manifest.Level(level), nil) li.initRangeDel(&mlevelAlloc[0].rangeDelIter) li.initSmallestLargestUserKey(&mlevelAlloc[0].smallestUserKey, nil, nil) mlevelAlloc[0].iter = li diff --git a/level_iter.go b/level_iter.go index cc599b42e5d..35e8814ab96 100644 --- a/level_iter.go +++ b/level_iter.go @@ -45,6 +45,7 @@ type tableNewIters func( type levelIter struct { logger Logger cmp Compare + split Split // The lower/upper bounds for iteration as specified at creation or the most // recent call to SetBounds. lower []byte @@ -153,22 +154,26 @@ type levelIter struct { // levelIter implements the base.InternalIterator interface. var _ base.InternalIterator = (*levelIter)(nil) +// newLevelIter returns a levelIter. It is permissible to pass a nil split +// parameter if the caller is never going to call SeekPrefixGE. func newLevelIter( opts IterOptions, cmp Compare, + split Split, newIters tableNewIters, files manifest.LevelIterator, level manifest.Level, bytesIterated *uint64, ) *levelIter { l := &levelIter{} - l.init(opts, cmp, newIters, files, level, bytesIterated) + l.init(opts, cmp, split, newIters, files, level, bytesIterated) return l } func (l *levelIter) init( opts IterOptions, cmp Compare, + split Split, newIters tableNewIters, files manifest.LevelIterator, level manifest.Level, @@ -181,6 +186,7 @@ func (l *levelIter) init( l.upper = opts.UpperBound l.tableOpts.TableFilter = opts.TableFilter l.cmp = cmp + l.split = split l.iterFile = nil l.newIters = newIters l.files = files @@ -435,6 +441,17 @@ func (l *levelIter) SeekPrefixGE( } return l.verify(l.largestBoundary, nil) } + // It is possible that we are here because bloom filter matching failed. + // In that case it is likely that all keys matching the prefix are wholly + // within the current file and cannot be in the subsequent file. In that + // case we don't want to go to the next file, since loading and seeking in + // there has some cost. Additionally, for sparse key spaces, loading the + // next file will defeat the optimization for the next SeekPrefixGE that + // is called with trySeekUsingNext=true, since for sparse key spaces it is + // likely that the next key will also be contained in the current file. + if n := l.split(l.iterFile.Largest.UserKey); l.cmp(prefix, l.iterFile.Largest.UserKey[:n]) < 0 { + return nil, nil + } return l.verify(l.skipEmptyFileForward()) } diff --git a/level_iter_test.go b/level_iter_test.go index 75d7f051107..c9110feab24 100644 --- a/level_iter_test.go +++ b/level_iter_test.go @@ -81,7 +81,8 @@ func TestLevelIter(t *testing.T) { } iter := newLevelIter(opts, DefaultComparer.Compare, - newIters, files.Iter(), manifest.Level(level), nil) + func(a []byte) int { return len(a) }, newIters, files.Iter(), manifest.Level(level), + nil) defer iter.Close() // Fake up the range deletion initialization. iter.initRangeDel(new(internalIterator)) @@ -122,7 +123,8 @@ func TestLevelIter(t *testing.T) { } iter := newLevelIter(opts, DefaultComparer.Compare, - newIters2, files.Iter(), manifest.Level(level), nil) + func(a []byte) int { return len(a) }, newIters2, files.Iter(), + manifest.Level(level), nil) iter.SeekGE([]byte(key)) lower, upper := tableOpts.GetLowerBound(), tableOpts.GetUpperBound() return fmt.Sprintf("[%s,%s]\n", lower, upper) @@ -283,8 +285,9 @@ func TestLevelIterBoundaries(t *testing.T) { } if iter == nil { slice := manifest.NewLevelSliceKeySorted(lt.cmp.Compare, lt.metas) - iter = newLevelIter(IterOptions{}, DefaultComparer.Compare, lt.newIters, - slice.Iter(), manifest.Level(level), nil) + iter = newLevelIter(IterOptions{}, DefaultComparer.Compare, + func(a []byte) int { return len(a) }, lt.newIters, slice.Iter(), + manifest.Level(level), nil) // Fake up the range deletion initialization. iter.initRangeDel(new(internalIterator)) } @@ -377,8 +380,9 @@ func TestLevelIterSeek(t *testing.T) { case "iter": slice := manifest.NewLevelSliceKeySorted(lt.cmp.Compare, lt.metas) iter := &levelIterTestIter{ - levelIter: newLevelIter(IterOptions{}, DefaultComparer.Compare, lt.newIters, - slice.Iter(), manifest.Level(level), nil), + levelIter: newLevelIter(IterOptions{}, DefaultComparer.Compare, + func(a []byte) int { return len(a) }, lt.newIters, slice.Iter(), + manifest.Level(level), nil), } defer iter.Close() iter.initRangeDel(&iter.rangeDelIter) @@ -479,8 +483,7 @@ func BenchmarkLevelIterSeekGE(b *testing.B) { iter, err := readers[file.FileNum].NewIter(nil /* lower */, nil /* upper */) return iter, nil, err } - l := newLevelIter(IterOptions{}, DefaultComparer.Compare, - newIters, metas.Iter(), manifest.Level(level), nil) + l := newLevelIter(IterOptions{}, DefaultComparer.Compare, nil, newIters, metas.Iter(), manifest.Level(level), nil) rng := rand.New(rand.NewSource(uint64(time.Now().UnixNano()))) b.ResetTimer() @@ -522,8 +525,7 @@ func BenchmarkLevelIterSeqSeekGEWithBounds(b *testing.B) { opts.LowerBound, opts.UpperBound) return iter, nil, err } - l := newLevelIter(IterOptions{}, DefaultComparer.Compare, - newIters, metas.Iter(), manifest.Level(level), nil) + l := newLevelIter(IterOptions{}, DefaultComparer.Compare, nil, newIters, metas.Iter(), manifest.Level(level), nil) // Fake up the range deletion initialization, to resemble the usage // in a mergingIter. l.initRangeDel(new(internalIterator)) @@ -571,7 +573,8 @@ func BenchmarkLevelIterSeqSeekPrefixGE(b *testing.B) { b.Run(fmt.Sprintf("skip=%d/use-next=%t", skip, useNext), func(b *testing.B) { l := newLevelIter(IterOptions{}, DefaultComparer.Compare, - newIters, metas.Iter(), manifest.Level(level), nil) + func(a []byte) int { return len(a) }, newIters, metas.Iter(), + manifest.Level(level), nil) // Fake up the range deletion initialization, to resemble the usage // in a mergingIter. l.initRangeDel(new(internalIterator)) @@ -613,8 +616,7 @@ func BenchmarkLevelIterNext(b *testing.B) { iter, err := readers[file.FileNum].NewIter(nil /* lower */, nil /* upper */) return iter, nil, err } - l := newLevelIter(IterOptions{}, DefaultComparer.Compare, - newIters, metas.Iter(), manifest.Level(level), nil) + l := newLevelIter(IterOptions{}, DefaultComparer.Compare, nil, newIters, metas.Iter(), manifest.Level(level), nil) b.ResetTimer() for i := 0; i < b.N; i++ { @@ -648,8 +650,7 @@ func BenchmarkLevelIterPrev(b *testing.B) { iter, err := readers[file.FileNum].NewIter(nil /* lower */, nil /* upper */) return iter, nil, err } - l := newLevelIter(IterOptions{}, DefaultComparer.Compare, - newIters, metas.Iter(), manifest.Level(level), nil) + l := newLevelIter(IterOptions{}, DefaultComparer.Compare, nil, newIters, metas.Iter(), manifest.Level(level), nil) b.ResetTimer() for i := 0; i < b.N; i++ { diff --git a/merging_iter_test.go b/merging_iter_test.go index 37272cac7d1..7ec1a31be6a 100644 --- a/merging_iter_test.go +++ b/merging_iter_test.go @@ -10,6 +10,7 @@ import ( "testing" "time" + "github.com/cockroachdb/pebble/bloom" "github.com/cockroachdb/pebble/internal/base" "github.com/cockroachdb/pebble/internal/datadriven" "github.com/cockroachdb/pebble/internal/manifest" @@ -245,7 +246,8 @@ func TestMergingIterCornerCases(t *testing.T) { continue } li := &levelIter{} - li.init(IterOptions{}, cmp, newIters, slice.Iter(), manifest.Level(i), nil) + li.init(IterOptions{}, cmp, func(a []byte) int { return len(a) }, newIters, + slice.Iter(), manifest.Level(i), nil) i := len(levelIters) levelIters = append(levelIters, mergingIterLevel{iter: li}) li.initRangeDel(&levelIters[i].rangeDelIter) @@ -449,6 +451,7 @@ func buildLevelsForMergingIterSeqSeek( blockSize, restartInterval, levelCount int, keyOffset int, writeRangeTombstoneToLowestLevel bool, + writeBloomFilters bool, ) ([][]*sstable.Reader, []manifest.LevelSlice, [][]byte) { mem := vfs.NewMem() if writeRangeTombstoneToLowestLevel && levelCount != 1 { @@ -466,13 +469,20 @@ func buildLevelsForMergingIterSeqSeek( } writers := make([][]*sstable.Writer, levelCount) + // A policy unlikely to have false positives. + filterPolicy := bloom.FilterPolicy(100) for i := range files { for j := range files[i] { - writers[i] = append(writers[i], sstable.NewWriter(files[i][j], sstable.WriterOptions{ + writerOptions := sstable.WriterOptions{ BlockRestartInterval: restartInterval, BlockSize: blockSize, Compression: NoCompression, - })) + } + if writeBloomFilters { + writerOptions.FilterPolicy = filterPolicy + writerOptions.FilterType = base.TableFilter + } + writers[i] = append(writers[i], sstable.NewWriter(files[i][j], writerOptions)) } } @@ -511,6 +521,10 @@ func buildLevelsForMergingIterSeqSeek( } opts := sstable.ReaderOptions{Cache: NewCache(128 << 20)} + if writeBloomFilters { + opts.Filters = make(map[string]FilterPolicy) + opts.Filters[filterPolicy.Name()] = filterPolicy + } defer opts.Cache.Unref() readers := make([][]*sstable.Reader, levelCount) @@ -569,7 +583,8 @@ func buildMergingIter(readers [][]*sstable.Reader, levelSlices []manifest.LevelS return iter, rdIter, err } l := newLevelIter(IterOptions{}, DefaultComparer.Compare, - newIters, levelSlices[i].Iter(), manifest.Level(level), nil) + func(a []byte) int { return len(a) }, newIters, levelSlices[i].Iter(), + manifest.Level(level), nil) l.initRangeDel(&mils[level].rangeDelIter) l.initSmallestLargestUserKey( &mils[level].smallestUserKey, &mils[level].largestUserKey, @@ -598,7 +613,7 @@ func BenchmarkMergingIterSeqSeekGEWithBounds(b *testing.B) { b.Run(fmt.Sprintf("levelCount=%d", levelCount), func(b *testing.B) { readers, levelSlices, keys := buildLevelsForMergingIterSeqSeek( - b, blockSize, restartInterval, levelCount, 0 /* keyOffset */, false) + b, blockSize, restartInterval, levelCount, 0 /* keyOffset */, false, false) m := buildMergingIter(readers, levelSlices) keyCount := len(keys) b.ResetTimer() @@ -626,7 +641,7 @@ func BenchmarkMergingIterSeqSeekPrefixGE(b *testing.B) { const restartInterval = 16 const levelCount = 5 readers, levelSlices, keys := buildLevelsForMergingIterSeqSeek( - b, blockSize, restartInterval, levelCount, 0 /* keyOffset */, false) + b, blockSize, restartInterval, levelCount, 0 /* keyOffset */, false, false) for _, skip := range []int{1, 2, 4, 8, 16} { for _, useNext := range []bool{false, true} {