Skip to content

Commit

Permalink
*: add simpleLevelIterator, reduce merging levels in external iter
Browse files Browse the repository at this point in the history
Currently, we create a new merging iterator level for each file
passed into NewExternalIter. This is unnecessary for most use-cases
of creating ExternalIters around lots of sstables, as we can externally
guarantee that many of those sstables won't have overlapping points
with each other. We can have the caller pass this knowledge
by specifying a [][]sstable.ReadableFile where each sub-slice
obeys level invariants for files within it, and is also already
sorted by user keys.

This change makes the interface change to allow for the above
optimization, and also adds a `simpleLevelIter` that implements
forward iteration within a single "level". For files that don't
contain range deletes, we shove all the point iters into one
`simpleLevelIter`, greatly reducing merging iterator levels
and speeding up its operations by a lot.

Fixes cockroachdb/cockroach#83051.
  • Loading branch information
itsbilal committed Aug 17, 2022
1 parent c135b6d commit a370e71
Show file tree
Hide file tree
Showing 5 changed files with 406 additions and 56 deletions.
295 changes: 263 additions & 32 deletions external_iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,24 @@
package pebble

import (
"fmt"
"sort"

"github.com/cockroachdb/errors"
"github.com/cockroachdb/pebble/internal/base"
"github.com/cockroachdb/pebble/internal/keyspan"
"github.com/cockroachdb/pebble/internal/manifest"
"github.com/cockroachdb/pebble/sstable"
)

// NewExternalIter takes an input set of sstable files which may overlap
// arbitrarily and returns an Iterator over the merged contents of the sstables.
// Input sstables may contain point keys, range keys, range deletions, etc. The
// input files slice must be sorted in reverse chronological ordering. A key in
// a file at a lower index will shadow a key with an identical user key
// contained within a file at a higher index.
// NewExternalIter takes an input 2d array of sstable files which may overlap
// across subarrays but not within a subarray, and returns an Iterator over the
// merged contents of the sstables. Input sstables may contain point keys, range
// keys, range deletions, etc. The input files slice must be sorted in reverse
// chronological ordering. A key in a file at a lower index subarray will shadow
// a key with an identical user key contained within a file at a higher index
// subarray. Each subarray must be sorted in internal key order, where lower
// index files contain keys that sort left of files with higer indexes.
//
// Input sstables must only contain keys with the zero sequence number.
//
Expand All @@ -27,7 +32,7 @@ import (
func NewExternalIter(
o *Options,
iterOpts *IterOptions,
files []sstable.ReadableFile,
files [][]sstable.ReadableFile,
extraReaderOpts ...sstable.ReaderOption,
) (it *Iterator, err error) {
if iterOpts != nil {
Expand All @@ -36,17 +41,28 @@ func NewExternalIter(
}
}

var readers []*sstable.Reader
var readers [][]*sstable.Reader

// Ensure we close all the opened readers if we error out.
defer func() {
if err != nil {
for i := range readers {
_ = readers[i].Close()
for j := range readers[i] {
_ = readers[i][j].Close()
}
}
}
}()
readers, err = openExternalTables(o, files, o.MakeReaderOptions(), extraReaderOpts...)
seqNumOffset := 0
for _, levelFiles := range files {
seqNumOffset += len(levelFiles)
}
for _, levelFiles := range files {
var subReaders []*sstable.Reader
seqNumOffset -= len(levelFiles)
subReaders, err = openExternalTables(o, levelFiles, seqNumOffset, o.MakeReaderOptions(), extraReaderOpts...)
readers = append(readers, subReaders)
}
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -116,24 +132,46 @@ func finishInitializingExternal(it *Iterator) {
if len(it.externalReaders) > cap(mlevels) {
mlevels = make([]mergingIterLevel, 0, len(it.externalReaders))
}
for _, r := range it.externalReaders {
var (
rangeDelIter keyspan.FragmentIterator
pointIter internalIterator
err error
)
pointIter, err = r.NewIter(it.opts.LowerBound, it.opts.UpperBound)
if err == nil {
rangeDelIter, err = r.NewRawRangeDelIter()
for _, readers := range it.externalReaders {
var combinedIters []internalIterator
for _, r := range readers {
var (
rangeDelIter keyspan.FragmentIterator
pointIter internalIterator
err error
)
pointIter, err = r.NewIter(it.opts.LowerBound, it.opts.UpperBound)
if err == nil {
rangeDelIter, err = r.NewRawRangeDelIter()
}
if err != nil {
pointIter = &errorIter{err: err}
rangeDelIter = &errorKeyspanIter{err: err}
}
if err == nil && rangeDelIter == nil && pointIter != nil {
// TODO(bilal): Consider implementing range key pausing in
// simpleLevelIter so we can reduce mergingIterLevels even more by
// sending all sstable iterators to combinedIters, not just those
// corresponding to sstables without range deletes.
combinedIters = append(combinedIters, pointIter)
continue
}
mlevels = append(mlevels, mergingIterLevel{
iter: base.WrapIterWithStats(pointIter),
rangeDelIter: rangeDelIter,
})
}
if err != nil {
pointIter = &errorIter{err: err}
rangeDelIter = &errorKeyspanIter{err: err}
if len(combinedIters) > 0 {
sli := &simpleLevelIter{
cmp: it.cmp,
iters: combinedIters,
}
sli.init(it.opts)
mlevels = append(mlevels, mergingIterLevel{
iter: base.WrapIterWithStats(sli),
rangeDelIter: nil,
})
}
mlevels = append(mlevels, mergingIterLevel{
iter: base.WrapIterWithStats(pointIter),
rangeDelIter: rangeDelIter,
})
}
it.alloc.merging.init(&it.opts, it.cmp, it.split, mlevels...)
it.alloc.merging.snapshot = base.InternalKeySeqNumMax
Expand All @@ -152,11 +190,22 @@ func finishInitializingExternal(it *Iterator) {
base.InternalKeySeqNumMax,
it.opts.LowerBound, it.opts.UpperBound,
)
for _, r := range it.externalReaders {
if rki, err := r.NewRawRangeKeyIter(); err != nil {
it.rangeKey.iterConfig.AddLevel(&errorKeyspanIter{err: err})
} else if rki != nil {
it.rangeKey.iterConfig.AddLevel(rki)
// We could take advantage of the lack of overlaps in range keys within
// each slice in it.externalReaders, and generate keyspan.LevelIters
// out of those. However, since range keys are expected to be sparse to
// begin with, the performance gain might not be significant enough to
// warrant it.
//
// TODO(bilal): Explore adding a simpleRangeKeyLevelIter that does not
// operate on FileMetadatas (similar to simpleLevelIter), and implements
// this optimization.
for _, readers := range it.externalReaders {
for _, r := range readers {
if rki, err := r.NewRawRangeKeyIter(); err != nil {
it.rangeKey.iterConfig.AddLevel(&errorKeyspanIter{err: err})
} else if rki != nil {
it.rangeKey.iterConfig.AddLevel(rki)
}
}
}
}
Expand All @@ -169,6 +218,7 @@ func finishInitializingExternal(it *Iterator) {
func openExternalTables(
o *Options,
files []sstable.ReadableFile,
seqNumOffset int,
readerOpts sstable.ReaderOptions,
extraReaderOpts ...sstable.ReaderOption,
) (readers []*sstable.Reader, err error) {
Expand All @@ -180,8 +230,189 @@ func openExternalTables(
}
// Use the index of the file in files as the sequence number for all of
// its keys.
r.Properties.GlobalSeqNum = uint64(len(files) - i)
r.Properties.GlobalSeqNum = uint64(len(files) - i + seqNumOffset)
readers = append(readers, r)
}
return readers, err
}

// simpleLevelIter is similar to a levelIter in that it merges the points
// from multiple point iterators that are non-overlapping in the key ranges
// they return. It is only expected to support forward iteration and forward
// regular seeking; reverse iteration and prefix seeking is not supported.
// Intended to be a low-overhead, non-FileMetadata dependent option for
// NewExternalIter. To optimize seeking and forward iteration, it maintains
// two slices of child iterators; one of all iterators, and a subset of it that
// contains just the iterators that contain point keys within the current
// bounds.
//
// Note that this levelIter does not support pausing at file boundaries
// in case of range tombstones in this file that could apply to points outside
// of this file (and outside of this level). This is sufficient for optimizing
// the main use cases of NewExternalIter, however for completeness it would make
// sense to build this pausing functionality in.
type simpleLevelIter struct {
cmp Compare
opts IterOptions
boundsBuf []byte
iters []internalIterator
filtered []internalIterator
currentIdx int
}

// init initializes this simpleLevelIter.
func (s *simpleLevelIter) init(opts IterOptions) {
s.currentIdx = 0
s.boundsBuf = s.boundsBuf[:0]
if opts.LowerBound != nil {
s.boundsBuf = append(s.boundsBuf, opts.LowerBound...)
s.opts.LowerBound = s.boundsBuf[:len(opts.LowerBound)]
} else {
s.opts.LowerBound = nil
}
if opts.UpperBound != nil {
startN := len(s.boundsBuf)
s.boundsBuf = append(s.boundsBuf, opts.UpperBound...)
s.opts.UpperBound = s.boundsBuf[startN:]
} else {
s.opts.UpperBound = nil
}
s.resetFilteredIters()
}

func (s *simpleLevelIter) resetFilteredIters() {
s.filtered = s.filtered[:0]
for i := range s.iters {
var iterKey *base.InternalKey
if s.opts.LowerBound != nil {
iterKey, _ = s.iters[i].SeekGE(s.opts.LowerBound, base.SeekGEFlagsNone)
} else {
iterKey, _ = s.iters[i].First()
}
if iterKey != nil {
s.filtered = append(s.filtered, s.iters[i])
}
}
}

func (s *simpleLevelIter) SeekGE(key []byte, flags base.SeekGEFlags) (*base.InternalKey, []byte) {
// Find the first file that is entirely >= key. The file before that could
// contain the key we're looking for.
n := sort.Search(len(s.filtered), func(i int) bool {
iterKey, _ := s.filtered[i].SeekGE(key, flags)
if iterKey == nil {
return false
}
return s.cmp(key, iterKey.UserKey) <= 0
})
if n > 0 {
s.currentIdx = n - 1
} else {
s.currentIdx = n
}
if s.currentIdx < len(s.filtered) {
if iterKey, val := s.filtered[s.currentIdx].SeekGE(key, flags); iterKey != nil {
return iterKey, val
}
s.currentIdx++
}
return s.skipEmptyFileForward(key, flags)
}

func (s *simpleLevelIter) skipEmptyFileForward(
seekKey []byte, flags base.SeekGEFlags,
) (*base.InternalKey, []byte) {
var iterKey *base.InternalKey
var val []byte
for s.currentIdx >= 0 && s.currentIdx < len(s.filtered) {
if seekKey != nil {
iterKey, val = s.filtered[s.currentIdx].SeekGE(seekKey, flags)
} else if s.opts.LowerBound != nil {
iterKey, val = s.filtered[s.currentIdx].SeekGE(s.opts.LowerBound, flags)
} else {
iterKey, val = s.filtered[s.currentIdx].First()
}
if iterKey != nil {
return iterKey, val
}
s.currentIdx++
}
return nil, nil
}

func (s *simpleLevelIter) SeekPrefixGE(
prefix, key []byte, flags base.SeekGEFlags,
) (*base.InternalKey, []byte) {
panic("unimplemented")
}

func (s *simpleLevelIter) SeekLT(key []byte, flags base.SeekLTFlags) (*base.InternalKey, []byte) {
panic("unimplemented")
}

func (s *simpleLevelIter) First() (*base.InternalKey, []byte) {
s.currentIdx = 0
return s.skipEmptyFileForward(nil /* seekKey */, base.SeekGEFlagsNone)
}

func (s *simpleLevelIter) Last() (*base.InternalKey, []byte) {
panic("unimplemented")
}

func (s *simpleLevelIter) Next() (*base.InternalKey, []byte) {
if s.currentIdx < 0 || s.currentIdx >= len(s.filtered) {
return nil, nil
}
if iterKey, val := s.filtered[s.currentIdx].Next(); iterKey != nil {
return iterKey, val
}
s.currentIdx++
return s.skipEmptyFileForward(nil /* seekKey */, base.SeekGEFlagsNone)
}

func (s *simpleLevelIter) Prev() (*base.InternalKey, []byte) {
panic("unimplemented")
}

func (s *simpleLevelIter) Error() error {
return nil
}

func (s *simpleLevelIter) Close() error {
var err error
for i := range s.iters {
err = firstError(err, s.iters[i].Close())
}
return err
}

func (s *simpleLevelIter) SetBounds(lower, upper []byte) {
s.currentIdx = -1
s.boundsBuf = s.boundsBuf[:0]
if lower != nil {
s.boundsBuf = append(s.boundsBuf, lower...)
s.opts.LowerBound = s.boundsBuf[:len(lower)]
} else {
s.opts.LowerBound = nil
}
if upper != nil {
startN := len(s.boundsBuf)
s.boundsBuf = append(s.boundsBuf, upper...)
s.opts.UpperBound = s.boundsBuf[startN:]
} else {
s.opts.UpperBound = nil
}
for i := range s.iters {
s.iters[i].SetBounds(lower, upper)
}
s.resetFilteredIters()
}

func (s *simpleLevelIter) String() string {
if s.currentIdx < 0 || s.currentIdx >= len(s.filtered) {
return "simpleLevelIter: current=<nil>"
}
return fmt.Sprintf("simpleLevelIter: current=%s", s.filtered[s.currentIdx])
}

var _ internalIterator = &simpleLevelIter{}
Loading

0 comments on commit a370e71

Please sign in to comment.