From 62e1fc6b680d1f4780d46b86e29e786d5513ad8f Mon Sep 17 00:00:00 2001 From: Bilal Akhtar Date: Wed, 17 Aug 2022 14:21:01 -0400 Subject: [PATCH] *: add simpleLevelIterator, reduce merging levels in external iter Currently, we create a new merging iterator level for each file passed into NewExternalIter. This is unnecessary for most use-cases of creating ExternalIters around lots of sstables, as we can externally guarantee that many of those sstables won't have overlapping points with each other. We can have the caller pass this knowledge by specifying a [][]sstable.ReadableFile where each sub-slice obeys level invariants for files within it, and is also already sorted by user keys. This change makes the interface change to allow for the above optimization, and also adds a `simpleLevelIter` that implements forward iteration within a single "level". For files that don't contain range deletes, we shove all the point iters into one `simpleLevelIter`, greatly reducing merging iterator levels and speeding up its operations by a lot. Fixes cockroachdb/cockroach#83051. --- external_iterator.go | 347 +++++++++++++++++++++++++++++++++---- external_iterator_test.go | 63 ++++++- iterator.go | 11 +- testdata/external_iterator | 19 -- testdata/simple_level_iter | 77 ++++++++ 5 files changed, 461 insertions(+), 56 deletions(-) create mode 100644 testdata/simple_level_iter diff --git a/external_iterator.go b/external_iterator.go index 102c442fff..cca9a27648 100644 --- a/external_iterator.go +++ b/external_iterator.go @@ -5,6 +5,9 @@ package pebble import ( + "fmt" + "sort" + "github.com/cockroachdb/errors" "github.com/cockroachdb/pebble/internal/base" "github.com/cockroachdb/pebble/internal/keyspan" @@ -12,12 +15,59 @@ import ( "github.com/cockroachdb/pebble/sstable" ) -// NewExternalIter takes an input set of sstable files which may overlap -// arbitrarily and returns an Iterator over the merged contents of the sstables. 
+// ExternalIterOption provides an interface to specify open-time options to +// NewExternalIter. +type ExternalIterOption interface { + // iterApply is called on the iterator during opening in order to set internal + // parameters. + iterApply(*Iterator) + // readerOptions returns any reader options added by this iter option. + readerOptions() []sstable.ReaderOption +} + +type externalIterReaderOptions struct { + opts []sstable.ReaderOption +} + +func (e *externalIterReaderOptions) iterApply(iterator *Iterator) { + // Do nothing. +} + +func (e *externalIterReaderOptions) readerOptions() []sstable.ReaderOption { + return e.opts +} + +// ExternalIterReaderOptions returns an ExternalIterOption that specifies +// sstable.ReaderOptions to be applied on sstable readers in NewExternalIter. +func ExternalIterReaderOptions(opts ...sstable.ReaderOption) ExternalIterOption { + return &externalIterReaderOptions{opts: opts} +} + +// ExternalIterForwardOnly is an ExternalIterOption that specifies this iterator +// will only be used for forward positioning operations (First, SeekGE, Next). +// This could enable optimizations that take advantage of this invariant. +// Behaviour when a reverse positioning operation is done on an iterator opened +// with this option is unpredictable, though in most cases it should panic. +type ExternalIterForwardOnly struct{} + +func (e *ExternalIterForwardOnly) iterApply(iter *Iterator) { + iter.forwardOnly = true +} + +func (e *ExternalIterForwardOnly) readerOptions() []sstable.ReaderOption { + return nil +} + +// NewExternalIter takes an input 2d array of sstable files which may overlap +// across subarrays but not within a subarray (at least as far as points are +// concerned; range keys are allowed to overlap arbitrarily even within a +// subarray), and returns an Iterator over the merged contents of the sstables. // Input sstables may contain point keys, range keys, range deletions, etc. 
The -// input files slice must be sorted in reverse chronological ordering. A key in -// a file at a lower index will shadow a key with an identical user key -// contained within a file at a higher index. +// input files slice must be sorted in reverse chronological ordering. A key in a +// file at a lower index subarray will shadow a key with an identical user key +// contained within a file at a higher index subarray. Each subarray must be +// sorted in internal key order, where lower index files contain keys that sort +// left of files with higher indexes. // // Input sstables must only contain keys with the zero sequence number. // @@ -27,8 +77,8 @@ import ( func NewExternalIter( o *Options, iterOpts *IterOptions, - files []sstable.ReadableFile, - extraReaderOpts ...sstable.ReaderOption, + files [][]sstable.ReadableFile, + extraOpts ...ExternalIterOption, ) (it *Iterator, err error) { if iterOpts != nil { if err := validateExternalIterOpts(iterOpts); err != nil { @@ -36,17 +86,32 @@ func NewExternalIter( } } - var readers []*sstable.Reader + var readers [][]*sstable.Reader // Ensure we close all the opened readers if we error out. defer func() { if err != nil { for i := range readers { - _ = readers[i].Close() + for j := range readers[i] { + _ = readers[i][j].Close() + } } } }() - readers, err = openExternalTables(o, files, o.MakeReaderOptions(), extraReaderOpts...) + seqNumOffset := 0 + var extraReaderOpts []sstable.ReaderOption + for i := range extraOpts { + extraReaderOpts = append(extraReaderOpts, extraOpts[i].readerOptions()...) + } + for _, levelFiles := range files { + seqNumOffset += len(levelFiles) + } + for _, levelFiles := range files { + var subReaders []*sstable.Reader + seqNumOffset -= len(levelFiles) + subReaders, err = openExternalTables(o, levelFiles, seqNumOffset, o.MakeReaderOptions(), extraReaderOpts...) 
+ readers = append(readers, subReaders) + } if err != nil { return nil, err } @@ -82,6 +147,9 @@ func NewExternalIter( dbi.opts = *iterOpts dbi.saveBounds(iterOpts.LowerBound, iterOpts.UpperBound) } + for i := range extraOpts { + extraOpts[i].iterApply(dbi) + } finishInitializingExternal(dbi) return dbi, nil } @@ -116,24 +184,46 @@ func finishInitializingExternal(it *Iterator) { if len(it.externalReaders) > cap(mlevels) { mlevels = make([]mergingIterLevel, 0, len(it.externalReaders)) } - for _, r := range it.externalReaders { - var ( - rangeDelIter keyspan.FragmentIterator - pointIter internalIterator - err error - ) - pointIter, err = r.NewIter(it.opts.LowerBound, it.opts.UpperBound) - if err == nil { - rangeDelIter, err = r.NewRawRangeDelIter() + for _, readers := range it.externalReaders { + var combinedIters []internalIterator + for _, r := range readers { + var ( + rangeDelIter keyspan.FragmentIterator + pointIter internalIterator + err error + ) + pointIter, err = r.NewIter(it.opts.LowerBound, it.opts.UpperBound) + if err == nil { + rangeDelIter, err = r.NewRawRangeDelIter() + } + if err != nil { + pointIter = &errorIter{err: err} + rangeDelIter = &errorKeyspanIter{err: err} + } + if err == nil && rangeDelIter == nil && pointIter != nil && it.forwardOnly { + // TODO(bilal): Consider implementing range key pausing in + // simpleLevelIter so we can reduce mergingIterLevels even more by + // sending all sstable iterators to combinedIters, not just those + // corresponding to sstables without range deletes. 
+ combinedIters = append(combinedIters, pointIter) + continue + } + mlevels = append(mlevels, mergingIterLevel{ + iter: base.WrapIterWithStats(pointIter), + rangeDelIter: rangeDelIter, + }) } - if err != nil { - pointIter = &errorIter{err: err} - rangeDelIter = &errorKeyspanIter{err: err} + if len(combinedIters) > 0 { + sli := &simpleLevelIter{ + cmp: it.cmp, + iters: combinedIters, + } + sli.init(it.opts) + mlevels = append(mlevels, mergingIterLevel{ + iter: base.WrapIterWithStats(sli), + rangeDelIter: nil, + }) } - mlevels = append(mlevels, mergingIterLevel{ - iter: base.WrapIterWithStats(pointIter), - rangeDelIter: rangeDelIter, - }) } it.alloc.merging.init(&it.opts, it.cmp, it.split, mlevels...) it.alloc.merging.snapshot = base.InternalKeySeqNumMax @@ -152,11 +242,22 @@ func finishInitializingExternal(it *Iterator) { base.InternalKeySeqNumMax, it.opts.LowerBound, it.opts.UpperBound, ) - for _, r := range it.externalReaders { - if rki, err := r.NewRawRangeKeyIter(); err != nil { - it.rangeKey.iterConfig.AddLevel(&errorKeyspanIter{err: err}) - } else if rki != nil { - it.rangeKey.iterConfig.AddLevel(rki) + // We could take advantage of the lack of overlaps in range keys within + // each slice in it.externalReaders, and generate keyspan.LevelIters + // out of those. However, since range keys are expected to be sparse to + // begin with, the performance gain might not be significant enough to + // warrant it. + // + // TODO(bilal): Explore adding a simpleRangeKeyLevelIter that does not + // operate on FileMetadatas (similar to simpleLevelIter), and implements + // this optimization. 
+ for _, readers := range it.externalReaders { + for _, r := range readers { + if rki, err := r.NewRawRangeKeyIter(); err != nil { + it.rangeKey.iterConfig.AddLevel(&errorKeyspanIter{err: err}) + } else if rki != nil { + it.rangeKey.iterConfig.AddLevel(rki) + } } } } @@ -169,6 +270,7 @@ func finishInitializingExternal(it *Iterator) { func openExternalTables( o *Options, files []sstable.ReadableFile, + seqNumOffset int, readerOpts sstable.ReaderOptions, extraReaderOpts ...sstable.ReaderOption, ) (readers []*sstable.Reader, err error) { @@ -180,8 +282,189 @@ func openExternalTables( } // Use the index of the file in files as the sequence number for all of // its keys. - r.Properties.GlobalSeqNum = uint64(len(files) - i) + r.Properties.GlobalSeqNum = uint64(len(files) - i + seqNumOffset) readers = append(readers, r) } return readers, err } + +// simpleLevelIter is similar to a levelIter in that it merges the points +// from multiple point iterators that are non-overlapping in the key ranges +// they return. It is only expected to support forward iteration and forward +// regular seeking; reverse iteration and prefix seeking are not supported. +// Intended to be a low-overhead, non-FileMetadata dependent option for +// NewExternalIter. To optimize seeking and forward iteration, it maintains +// two slices of child iterators; one of all iterators, and a subset of it that +// contains just the iterators that contain point keys within the current +// bounds. +// +// Note that this levelIter does not support pausing at file boundaries +// in case of range tombstones in this file that could apply to points outside +// of this file (and outside of this level). This is sufficient for optimizing +// the main use cases of NewExternalIter, however for completeness it would make +// sense to build this pausing functionality in. 
+type simpleLevelIter struct { + cmp Compare + opts IterOptions + boundsBuf []byte + iters []internalIterator + filtered []internalIterator + currentIdx int +} + +// init initializes this simpleLevelIter. +func (s *simpleLevelIter) init(opts IterOptions) { + s.currentIdx = 0 + s.boundsBuf = s.boundsBuf[:0] + if opts.LowerBound != nil { + s.boundsBuf = append(s.boundsBuf, opts.LowerBound...) + s.opts.LowerBound = s.boundsBuf[:len(opts.LowerBound)] + } else { + s.opts.LowerBound = nil + } + if opts.UpperBound != nil { + startN := len(s.boundsBuf) + s.boundsBuf = append(s.boundsBuf, opts.UpperBound...) + s.opts.UpperBound = s.boundsBuf[startN:] + } else { + s.opts.UpperBound = nil + } + s.resetFilteredIters() +} + +func (s *simpleLevelIter) resetFilteredIters() { + s.filtered = s.filtered[:0] + for i := range s.iters { + var iterKey *base.InternalKey + if s.opts.LowerBound != nil { + iterKey, _ = s.iters[i].SeekGE(s.opts.LowerBound, base.SeekGEFlagsNone) + } else { + iterKey, _ = s.iters[i].First() + } + if iterKey != nil { + s.filtered = append(s.filtered, s.iters[i]) + } + } +} + +func (s *simpleLevelIter) SeekGE(key []byte, flags base.SeekGEFlags) (*base.InternalKey, []byte) { + // Find the first file that is entirely >= key. The file before that could + // contain the key we're looking for. 
+ n := sort.Search(len(s.filtered), func(i int) bool { + iterKey, _ := s.filtered[i].SeekGE(key, flags) + if iterKey == nil { + return false + } + return s.cmp(key, iterKey.UserKey) <= 0 + }) + if n > 0 { + s.currentIdx = n - 1 + } else { + s.currentIdx = n + } + if s.currentIdx < len(s.filtered) { + if iterKey, val := s.filtered[s.currentIdx].SeekGE(key, flags); iterKey != nil { + return iterKey, val + } + s.currentIdx++ + } + return s.skipEmptyFileForward(key, flags) +} + +func (s *simpleLevelIter) skipEmptyFileForward( + seekKey []byte, flags base.SeekGEFlags, +) (*base.InternalKey, []byte) { + var iterKey *base.InternalKey + var val []byte + for s.currentIdx >= 0 && s.currentIdx < len(s.filtered) { + if seekKey != nil { + iterKey, val = s.filtered[s.currentIdx].SeekGE(seekKey, flags) + } else if s.opts.LowerBound != nil { + iterKey, val = s.filtered[s.currentIdx].SeekGE(s.opts.LowerBound, flags) + } else { + iterKey, val = s.filtered[s.currentIdx].First() + } + if iterKey != nil { + return iterKey, val + } + s.currentIdx++ + } + return nil, nil +} + +func (s *simpleLevelIter) SeekPrefixGE( + prefix, key []byte, flags base.SeekGEFlags, +) (*base.InternalKey, []byte) { + panic("unimplemented") +} + +func (s *simpleLevelIter) SeekLT(key []byte, flags base.SeekLTFlags) (*base.InternalKey, []byte) { + panic("unimplemented") +} + +func (s *simpleLevelIter) First() (*base.InternalKey, []byte) { + s.currentIdx = 0 + return s.skipEmptyFileForward(nil /* seekKey */, base.SeekGEFlagsNone) +} + +func (s *simpleLevelIter) Last() (*base.InternalKey, []byte) { + panic("unimplemented") +} + +func (s *simpleLevelIter) Next() (*base.InternalKey, []byte) { + if s.currentIdx < 0 || s.currentIdx >= len(s.filtered) { + return nil, nil + } + if iterKey, val := s.filtered[s.currentIdx].Next(); iterKey != nil { + return iterKey, val + } + s.currentIdx++ + return s.skipEmptyFileForward(nil /* seekKey */, base.SeekGEFlagsNone) +} + +func (s *simpleLevelIter) Prev() (*base.InternalKey, 
[]byte) { + panic("unimplemented") +} + +func (s *simpleLevelIter) Error() error { + return nil +} + +func (s *simpleLevelIter) Close() error { + var err error + for i := range s.iters { + err = firstError(err, s.iters[i].Close()) + } + return err +} + +func (s *simpleLevelIter) SetBounds(lower, upper []byte) { + s.currentIdx = -1 + s.boundsBuf = s.boundsBuf[:0] + if lower != nil { + s.boundsBuf = append(s.boundsBuf, lower...) + s.opts.LowerBound = s.boundsBuf[:len(lower)] + } else { + s.opts.LowerBound = nil + } + if upper != nil { + startN := len(s.boundsBuf) + s.boundsBuf = append(s.boundsBuf, upper...) + s.opts.UpperBound = s.boundsBuf[startN:] + } else { + s.opts.UpperBound = nil + } + for i := range s.iters { + s.iters[i].SetBounds(lower, upper) + } + s.resetFilteredIters() +} + +func (s *simpleLevelIter) String() string { + if s.currentIdx < 0 || s.currentIdx >= len(s.filtered) { + return "simpleLevelIter: current=" + } + return fmt.Sprintf("simpleLevelIter: current=%s", s.filtered[s.currentIdx]) +} + +var _ internalIterator = &simpleLevelIter{} diff --git a/external_iterator_test.go b/external_iterator_test.go index 134a4de4d8..b6e57dc8f5 100644 --- a/external_iterator_test.go +++ b/external_iterator_test.go @@ -39,7 +39,7 @@ func TestExternalIterator(t *testing.T) { return "" case "iter": opts := IterOptions{KeyTypes: IterKeyTypePointsAndRanges} - var files []sstable.ReadableFile + var files [][]sstable.ReadableFile for _, arg := range td.CmdArgs { switch arg.Key { case "mask-suffix": @@ -52,7 +52,7 @@ func TestExternalIterator(t *testing.T) { for _, v := range arg.Vals { f, err := mem.Open(v) require.NoError(t, err) - files = append(files, f) + files = append(files, []sstable.ReadableFile{f}) } } } @@ -64,3 +64,62 @@ func TestExternalIterator(t *testing.T) { } }) } + +func TestSimpleLevelIter(t *testing.T) { + mem := vfs.NewMem() + o := &Options{ + FS: mem, + Comparer: testkeys.Comparer, + FormatMajorVersion: FormatRangeKeys, + } + o.EnsureDefaults() + d, 
err := Open("", o) + require.NoError(t, err) + defer func() { require.NoError(t, d.Close()) }() + + datadriven.RunTest(t, "testdata/simple_level_iter", func(td *datadriven.TestData) string { + switch td.Cmd { + case "reset": + mem = vfs.NewMem() + return "" + case "build": + if err := runBuildCmd(td, d, mem); err != nil { + return err.Error() + } + return "" + case "iter": + var files []sstable.ReadableFile + for _, arg := range td.CmdArgs { + switch arg.Key { + case "files": + for _, v := range arg.Vals { + f, err := mem.Open(v) + require.NoError(t, err) + files = append(files, f) + } + } + } + readers, err := openExternalTables(o, files, 0, o.MakeReaderOptions()) + require.NoError(t, err) + defer func() { + for i := range readers { + _ = readers[i].Close() + } + }() + var internalIters []internalIterator + for i := range readers { + iter, err := readers[i].NewIter(nil, nil) + require.NoError(t, err) + internalIters = append(internalIters, iter) + } + it := &simpleLevelIter{cmp: o.Comparer.Compare, iters: internalIters} + it.init(IterOptions{}) + + response := runInternalIterCmd(td, it) + require.NoError(t, it.Close()) + return response + default: + return fmt.Sprintf("unknown command: %s", td.Cmd) + } + }) +} diff --git a/iterator.go b/iterator.go index 35fc2785a2..c4aece33f1 100644 --- a/iterator.go +++ b/iterator.go @@ -186,7 +186,7 @@ type Iterator struct { prefixOrFullSeekKey []byte readSampling readSampling stats IteratorStats - externalReaders []*sstable.Reader + externalReaders [][]*sstable.Reader // Following fields used when constructing an iterator stack, eg, in Clone // and SetOptions or when re-fragmenting a batch's range keys/range dels. @@ -234,6 +234,9 @@ type Iterator struct { // Used for deriving the value of SeekPrefixGE(..., trySeekUsingNext), // and SeekGE/SeekLT optimizations lastPositioningOp lastPositioningOpKind + // Used for an optimization in external iterators to reduce the number of + // merging levels. 
+ forwardOnly bool // Used in some tests to disable the random disabling of seek optimizations. forceEnableSeekOpt bool } @@ -1735,8 +1738,10 @@ func (i *Iterator) Close() error { i.readState = nil } - for _, r := range i.externalReaders { - err = firstError(err, r.Close()) + for _, readers := range i.externalReaders { + for _, r := range readers { + err = firstError(err, r.Close()) + } } // Close the closer for the current value if one was open. diff --git a/testdata/external_iterator b/testdata/external_iterator index f694e8f032..4786fda540 100644 --- a/testdata/external_iterator +++ b/testdata/external_iterator @@ -69,32 +69,13 @@ build 6 merge bb ac ---- -iter files=(6, 5, 4, 3, 2, 1) -seek-lt c -prev -next -next ----- -bb: (ac, .) -b: (b, .) -bb: (ac, .) -d: (., [d-e) @3=bar UPDATED) - iter files=(6, 5, 4, 3, 2, 1) seek-ge b next -prev -prev -next -next next ---- b: (b, .) bb: (ac, .) -b: (b, .) -a: (a, [a-b) @2=foo UPDATED) -b: (b, . UPDATED) -bb: (ac, .) d: (., [d-e) @3=bar UPDATED) # Test range keys that overlap each other with identical state. These diff --git a/testdata/simple_level_iter b/testdata/simple_level_iter new file mode 100644 index 0000000000..76cd65825a --- /dev/null +++ b/testdata/simple_level_iter @@ -0,0 +1,77 @@ +build 1 +set b b +set c c +---- + + +iter files=(1) +first +next +next +---- +b:b +c:c +. + +build 2 +set d d +set f f +---- + +iter files=(1, 2) +first +next +next +next +---- +b:b +c:c +d:d +f:f + +# Test seeks within files. + +iter files=(1, 2) +seek-ge bb +next +next +next +---- +c:c +d:d +f:f +. + +iter files=(1, 2) +seek-ge a +next +next +next +---- +b:b +c:c +d:d +f:f + +iter files=(1, 2) +seek-ge d +next +next +---- +d:d +f:f +. + +iter files=(1, 2) +seek-ge f +next +---- +f:f +. + +iter files=(1, 2) +seek-ge ff +next +---- +. +.