Skip to content

Commit

Permalink
db: support SetOptions on iterators created through NewExternalIter
Browse files Browse the repository at this point in the history
Allow iterators constructed through NewExternalIter to be reconfigured using
SetOptions. This required some restructuring of the external iterator
initialization code. Also, document and enforce the limitations around
unsupported IterOptions for external iterators.
  • Loading branch information
jbowens committed Jun 13, 2022
1 parent f13de49 commit 9a8e474
Show file tree
Hide file tree
Showing 3 changed files with 110 additions and 66 deletions.
139 changes: 76 additions & 63 deletions external_iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package pebble

import (
"github.com/cockroachdb/errors"
"github.com/cockroachdb/pebble/internal/base"
"github.com/cockroachdb/pebble/internal/keyspan"
"github.com/cockroachdb/pebble/internal/manifest"
Expand All @@ -19,23 +20,30 @@ import (
// contained within a file at a higher index.
//
// Input sstables must only contain keys with the zero sequence number.
//
// Iterators constructed through NewExternalIter support only a subset of
// iterator options: block-property filters and table filters, among others,
// are unsupported. NewExternalIter returns an error if an incompatible option
// is set.
func NewExternalIter(
o *Options,
iterOpts *IterOptions,
files []sstable.ReadableFile,
extraReaderOpts ...sstable.ReaderOption,
) (it *Iterator, err error) {
if iterOpts != nil {
if err := validateExternalIterOpts(iterOpts); err != nil {
return nil, err
}
}

var readers []*sstable.Reader

// Ensure we close all the opened readers if we error out.
closeReaders := func() {
for i := range readers {
_ = readers[i].Close()
}
}
defer func() {
if err != nil {
closeReaders()
for i := range readers {
_ = readers[i].Close()
}
}
}()
readers, err = openExternalTables(o, files, o.MakeReaderOptions(), extraReaderOpts...)
Expand All @@ -56,6 +64,9 @@ func NewExternalIter(
prefixOrFullSeekKey: buf.prefixOrFullSeekKey,
boundsBuf: buf.boundsBuf,
batch: nil,
// Add the readers to the Iterator so that Close closes them, and
// SetOptions can re-construct iterators from them.
externalReaders: readers,
newIters: func(f *manifest.FileMetadata, opts *IterOptions, bytesIterated *uint64) (internalIterator, keyspan.FragmentIterator, error) {
// NB: External iterators are currently constructed without any
// `levelIters`. newIters should never be called. When we support
Expand All @@ -71,84 +82,86 @@ func NewExternalIter(
dbi.opts = *iterOpts
dbi.saveBounds(iterOpts.LowerBound, iterOpts.UpperBound)
}
finishInitializingExternal(dbi)
return dbi, nil
}

// validateExternalIterOpts checks that iterOpts only uses options that are
// supported by iterators constructed through NewExternalIter. It returns an
// error naming the first unsupported option that is set (TableFilter,
// PointKeyFilters, RangeKeyFilters, OnlyReadGuaranteedDurable or
// UseL6Filters, checked in that order) and nil when none of them is set.
//
// A nil iterOpts sets no options at all and is therefore accepted; this keeps
// the validator safe to call from paths that do not pre-check for nil.
func validateExternalIterOpts(iterOpts *IterOptions) error {
	if iterOpts == nil {
		return nil
	}
	switch {
	case iterOpts.TableFilter != nil:
		return errors.Errorf("pebble: external iterator: TableFilter unsupported")
	case iterOpts.PointKeyFilters != nil:
		return errors.Errorf("pebble: external iterator: PointKeyFilters unsupported")
	case iterOpts.RangeKeyFilters != nil:
		return errors.Errorf("pebble: external iterator: RangeKeyFilters unsupported")
	case iterOpts.OnlyReadGuaranteedDurable:
		return errors.Errorf("pebble: external iterator: OnlyReadGuaranteedDurable unsupported")
	case iterOpts.UseL6Filters:
		return errors.Errorf("pebble: external iterator: UseL6Filters unsupported")
	}
	return nil
}

func finishInitializingExternal(it *Iterator) {
// TODO(jackson): In some instances we could generate fewer levels by using
// L0Sublevels code to organize nonoverlapping files into the same level.
// This would allow us to use levelIters and keep a smaller set of data and
// files in-memory. However, it would also require us to identify the bounds
// of all the files upfront.

// Ensure we close all iters if we error out early.
mlevels := buf.mlevels[:0]
var rangeKeyIters []keyspan.FragmentIterator
defer func() {
if err != nil {
for i := range rangeKeyIters {
_ = rangeKeyIters[i].Close()
}
for i := range mlevels {
if mlevels[i].iter != nil {
_ = mlevels[i].iter.Close()
}
if mlevels[i].rangeDelIter != nil {
_ = mlevels[i].rangeDelIter.Close()
}
}
mlevels := it.alloc.mlevels[:0]
if !it.opts.pointKeys() {
it.pointIter = emptyIter
} else if it.pointIter == nil {
if len(it.externalReaders) > cap(mlevels) {
mlevels = make([]mergingIterLevel, 0, len(it.externalReaders))
}
}()
if iterOpts.pointKeys() {
if len(files) > cap(mlevels) {
mlevels = make([]mergingIterLevel, 0, len(files))
}
for _, r := range readers {
pointIter, err := r.NewIter(dbi.opts.LowerBound, dbi.opts.UpperBound)
if err != nil {
return nil, err
for _, r := range it.externalReaders {
var (
rangeDelIter keyspan.FragmentIterator
pointIter internalIterator
err error
)
pointIter, err = r.NewIter(it.opts.LowerBound, it.opts.UpperBound)
if err == nil {
rangeDelIter, err = r.NewRawRangeDelIter()
}
rangeDelIter, err := r.NewRawRangeDelIter()
if err != nil {
_ = pointIter.Close()
return nil, err
pointIter = &errorIter{err: err}
rangeDelIter = &errorKeyspanIter{err: err}
}
mlevels = append(mlevels, mergingIterLevel{
iter: base.WrapIterWithStats(pointIter),
rangeDelIter: rangeDelIter,
})
}
it.alloc.merging.init(&it.opts, it.cmp, it.split, mlevels...)
it.alloc.merging.snapshot = base.InternalKeySeqNumMax
it.alloc.merging.elideRangeTombstones = true
it.pointIter = &it.alloc.merging
}
buf.merging.init(&dbi.opts, dbi.cmp, dbi.split, mlevels...)
buf.merging.snapshot = base.InternalKeySeqNumMax
buf.merging.elideRangeTombstones = true
dbi.pointIter = &buf.merging
dbi.iter = dbi.pointIter
it.iter = it.pointIter

if dbi.opts.rangeKeys() {
for _, r := range readers {
rki, err := r.NewRawRangeKeyIter()
if err != nil {
return nil, err
}
if rki != nil {
rangeKeyIters = append(rangeKeyIters, rki)
if it.opts.rangeKeys() {
if it.rangeKey == nil {
it.rangeKey = iterRangeKeyStateAllocPool.Get().(*iteratorRangeKeyState)
it.rangeKey.init(it.cmp, it.split, &it.opts)
it.rangeKey.rangeKeyIter = it.rangeKey.iterConfig.Init(
it.cmp,
base.InternalKeySeqNumMax,
)
for _, r := range it.externalReaders {
if rki, err := r.NewRawRangeKeyIter(); err != nil {
it.rangeKey.iterConfig.AddLevel(&errorKeyspanIter{err: err})
} else if rki != nil {
it.rangeKey.iterConfig.AddLevel(rki)
}
}
}

dbi.rangeKey = iterRangeKeyStateAllocPool.Get().(*iteratorRangeKeyState)
dbi.rangeKey.init(o.Comparer.Compare, o.Comparer.Split, &dbi.opts)
dbi.rangeKey.rangeKeyIter = dbi.rangeKey.iterConfig.Init(
o.Comparer.Compare,
base.InternalKeySeqNumMax,
rangeKeyIters...,
)

dbi.rangeKey.iter.Init(dbi.cmp, &buf.merging, dbi.rangeKey.rangeKeyIter, dbi.rangeKey,
dbi.opts.LowerBound, dbi.opts.UpperBound)
dbi.iter = &dbi.rangeKey.iter
it.rangeKey.iter.Init(it.cmp, it.iter, it.rangeKey.rangeKeyIter, it.rangeKey,
it.opts.LowerBound, it.opts.UpperBound)
it.iter = &it.rangeKey.iter
}

// Close all the opened sstable.Readers when the Iterator is closed.
dbi.closeHook = closeReaders
return dbi, nil
}

func openExternalTables(
Expand Down
20 changes: 17 additions & 3 deletions iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/cockroachdb/pebble/internal/keyspan"
"github.com/cockroachdb/pebble/internal/manifest"
"github.com/cockroachdb/pebble/internal/rangekey"
"github.com/cockroachdb/pebble/sstable"
"github.com/cockroachdb/redact"
)

Expand Down Expand Up @@ -184,7 +185,7 @@ type Iterator struct {
prefixOrFullSeekKey []byte
readSampling readSampling
stats IteratorStats
closeHook func()
externalReaders []*sstable.Reader

// Following fields used when constructing an iterator stack, eg, in Clone
// and SetOptions or when re-fragmenting a batch's range keys/range dels.
Expand Down Expand Up @@ -1763,8 +1764,8 @@ func (i *Iterator) Close() error {
i.readState = nil
}

if i.closeHook != nil {
i.closeHook()
for _, r := range i.externalReaders {
err = firstError(err, r.Close())
}

// Close the closer for the current value if one was open.
Expand Down Expand Up @@ -1896,6 +1897,12 @@ func (i *Iterator) saveBounds(lower, upper []byte) {
//
// If only lower and upper bounds need to be modified, prefer SetBounds.
func (i *Iterator) SetOptions(o *IterOptions) {
if i.externalReaders != nil {
if err := validateExternalIterOpts(o); err != nil {
panic(err)
}
}

// Ensure that the Iterator appears exhausted, regardless of whether we
// actually have to invalidate the internal iterator. Optimizations that
// avoid exhaustion are an internal implementation detail that shouldn't
Expand Down Expand Up @@ -2023,6 +2030,13 @@ func (i *Iterator) SetOptions(o *IterOptions) {
// Even though this is not a positioning operation, the invalidation of the
// iterator stack means we cannot optimize Seeks by using Next.
i.invalidate()

// Iterators created through NewExternalIter have a different iterator
// initialization process.
if i.externalReaders != nil {
finishInitializingExternal(i)
return
}
finishInitializingIter(i.alloc)
}

Expand Down
17 changes: 17 additions & 0 deletions testdata/external_iterator
Original file line number Diff line number Diff line change
Expand Up @@ -160,3 +160,20 @@ next
----
a: (., [a-k) @5=foo, @4=bar, @1=bax)
a@4: (v, [a-k) @5=foo, @4=bar, @1=bax)

# Test mutating the external iterator's options through SetOptions.

iter files=(points, ag, ek)
set-options key-types=point
first
next
set-options lower=e upper=p
first
next
----
.
a@4:v
c@2:v
.
e@5:v
k@3:v

0 comments on commit 9a8e474

Please sign in to comment.