iterator: require prefix filtering in merging iterator #3299

Merged
db.go: 2 changes (1 addition, 1 deletion)
@@ -1483,7 +1483,7 @@ func (i *Iterator) constructPointIter(
buf.merging.snapshot = i.seqNum
buf.merging.batchSnapshot = i.batchSeqNum
buf.merging.combinedIterState = &i.lazyCombinedIter.combinedIterState
i.pointIter = invalidating.MaybeWrapIfInvariants(&buf.merging)
i.pointIter = invalidating.MaybeWrapIfInvariants(&buf.merging).(topLevelIterator)
i.merging = &buf.merging
}

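The one-line change above requires the point iterator to satisfy the wider topLevelIterator interface, hence the type assertion. As a hedged, self-contained illustration of why such an assertion is safe (toy string-keyed interfaces, not pebble's actual signatures), consider:

```go
package main

import (
	"fmt"
	"strings"
)

// InternalIterator and TopLevelIterator are simplified stand-ins for pebble's
// base.InternalIterator and base.TopLevelIterator.
type InternalIterator interface {
	SeekPrefixGE(prefix, key string) (string, bool)
}

type TopLevelIterator interface {
	InternalIterator
	// SeekPrefixGEStrict additionally guarantees the returned key has the prefix.
	SeekPrefixGEStrict(prefix, key string) (string, bool)
}

// fakeMergingIter stands in for the merging iterator: it implements both
// interfaces, which is what makes the assertion below safe.
type fakeMergingIter struct{ keys []string }

func (m *fakeMergingIter) SeekPrefixGE(prefix, key string) (string, bool) {
	return m.SeekPrefixGEStrict(prefix, key)
}

func (m *fakeMergingIter) SeekPrefixGEStrict(prefix, key string) (string, bool) {
	for _, k := range m.keys {
		if k >= key && strings.HasPrefix(k, prefix) {
			return k, true
		}
	}
	return "", false
}

func main() {
	var it InternalIterator = &fakeMergingIter{keys: []string{"apple@1", "banana@2"}}
	// Would panic if the concrete type did not implement the strict method.
	tli := it.(TopLevelIterator)
	fmt.Println(tli.SeekPrefixGEStrict("apple", "apple")) // apple@1 true
}
```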
error_iter.go: 6 changes (6 additions, 0 deletions)
@@ -24,6 +24,12 @@ func (c *errorIter) SeekGE(key []byte, flags base.SeekGEFlags) (*InternalKey, ba

func (c *errorIter) SeekPrefixGE(
prefix, key []byte, flags base.SeekGEFlags,
) (*base.InternalKey, base.LazyValue) {
return c.SeekPrefixGEStrict(prefix, key, flags)
}

func (c *errorIter) SeekPrefixGEStrict(
prefix, key []byte, flags base.SeekGEFlags,
) (*base.InternalKey, base.LazyValue) {
return nil, base.LazyValue{}
}
external_iterator.go: 11 changes (1 addition, 10 deletions)
@@ -166,7 +166,7 @@ func validateExternalIterOpts(iterOpts *IterOptions) error {
return nil
}

func createExternalPointIter(ctx context.Context, it *Iterator) (internalIterator, error) {
func createExternalPointIter(ctx context.Context, it *Iterator) (topLevelIterator, error) {
// TODO(jackson): In some instances we could generate fewer levels by using
// L0Sublevels code to organize nonoverlapping files into the same level.
// This would allow us to use levelIters and keep a smaller set of data and
@@ -236,15 +236,6 @@ func createExternalPointIter(ctx context.Context, it *Iterator) (internalIterato
})
}
}
if len(mlevels) == 1 && mlevels[0].rangeDelIter == nil {
// Set closePointIterOnce to true. This is because we're bypassing the
// merging iter, which turns Close()s on it idempotent for any child
// iterators. The outer Iterator could call Close() on a point iter twice,
// which sstable iterators do not support (as they release themselves to
// a pool).
it.closePointIterOnce = true
return mlevels[0].iter, nil
}

it.alloc.merging.init(&it.opts, &it.stats.InternalStats, it.comparer.Compare, it.comparer.Split, mlevels...)
it.alloc.merging.snapshot = base.InternalKeySeqNumMax
get_iter.go: 6 changes (6 additions, 0 deletions)
@@ -54,6 +54,12 @@ func (g *getIter) SeekGE(key []byte, flags base.SeekGEFlags) (*InternalKey, base

func (g *getIter) SeekPrefixGE(
prefix, key []byte, flags base.SeekGEFlags,
) (*base.InternalKey, base.LazyValue) {
return g.SeekPrefixGEStrict(prefix, key, flags)
}

func (g *getIter) SeekPrefixGEStrict(
prefix, key []byte, flags base.SeekGEFlags,
) (*base.InternalKey, base.LazyValue) {
panic("pebble: SeekPrefixGE unimplemented")
}
internal.go: 2 changes (2 additions, 0 deletions)
@@ -37,6 +37,8 @@ type InternalKey = base.InternalKey

type internalIterator = base.InternalIterator

type topLevelIterator = base.TopLevelIterator

// ErrCorruption is a marker to indicate that data in a file (WAL, MANIFEST,
// sstable) isn't in the expected format.
var ErrCorruption = base.ErrCorruption
internal/base/iterator.go: 10 changes (10 additions, 0 deletions)
@@ -214,6 +214,16 @@ type InternalIterator interface {
fmt.Stringer
}

// TopLevelIterator extends InternalIterator to include an additional absolute
// positioning method, SeekPrefixGEStrict.
type TopLevelIterator interface {
InternalIterator

// SeekPrefixGEStrict extends InternalIterator.SeekPrefixGE with a guarantee
// that the iterator only returns keys matching the prefix.
SeekPrefixGEStrict(prefix, key []byte, flags SeekGEFlags) (*InternalKey, LazyValue)
}

// SeekGEFlags holds flags that may configure the behavior of a forward seek.
// Not all flags are relevant to all iterators.
type SeekGEFlags uint8
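To make the Strict contract above concrete, here is a minimal, hypothetical sketch of applying the SeekPrefixGEStrict guarantee on top of a non-strict seek result by checking the returned key's prefix. The byte-slice keys and the split helper are simplified stand-ins for pebble's InternalKey and Comparer.Split, not the library's API:

```go
package main

import (
	"bytes"
	"fmt"
)

// split is a stand-in for a Comparer.Split: it returns the length of the
// prefix portion of a user key. Keys are assumed to look like "prefix@suffix".
func split(userKey []byte) int {
	if i := bytes.IndexByte(userKey, '@'); i >= 0 {
		return i
	}
	return len(userKey)
}

// strictify applies the strict guarantee to a non-strict result: a key that
// does not carry the seek prefix is reported as exhaustion (nil) rather than
// surfaced to the caller.
func strictify(prefix, nonStrictKey []byte) []byte {
	if nonStrictKey == nil {
		return nil
	}
	if n := split(nonStrictKey); !bytes.Equal(prefix, nonStrictKey[:n]) {
		return nil
	}
	return nonStrictKey
}

func main() {
	fmt.Printf("%q\n", strictify([]byte("apple"), []byte("apple@3")))   // "apple@3": matches
	fmt.Printf("%q\n", strictify([]byte("apple"), []byte("apricot@7"))) // "": filtered out
}
```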
internal/invalidating/iter.go: 8 changes (7 additions, 1 deletion)
@@ -55,7 +55,7 @@ func IgnoreKinds(kinds ...base.InternalKeyKind) Option {

// NewIter constructs a new invalidating iterator that wraps the provided
// iterator, trashing buffers for previously returned keys.
func NewIter(originalIterator base.InternalIterator, opts ...Option) base.InternalIterator {
func NewIter(originalIterator base.InternalIterator, opts ...Option) base.TopLevelIterator {
i := &iter{iter: originalIterator}
for _, opt := range opts {
opt.apply(i)
@@ -120,6 +120,12 @@ func (i *iter) SeekPrefixGE(
return i.update(i.iter.SeekPrefixGE(prefix, key, flags))
}

func (i *iter) SeekPrefixGEStrict(
prefix, key []byte, flags base.SeekGEFlags,
) (*base.InternalKey, base.LazyValue) {
return i.update(i.iter.SeekPrefixGE(prefix, key, flags))
}

func (i *iter) SeekLT(key []byte, flags base.SeekLTFlags) (*base.InternalKey, base.LazyValue) {
return i.update(i.iter.SeekLT(key, flags))
}
iterator.go: 35 changes (19 additions, 16 deletions)
@@ -187,7 +187,7 @@ type Iterator struct {
merge Merge
comparer base.Comparer
iter internalIterator
pointIter internalIterator
pointIter topLevelIterator
// Either readState or version is set, but not both.
readState *readState
version *version
@@ -302,16 +302,6 @@ type Iterator struct {
batchJustRefreshed bool
// batchOnlyIter is set to true for Batch.NewBatchOnlyIter.
batchOnlyIter bool
// closePointIterOnce is set to true if this point iter can only be Close()d
// once, _and_ closing i.iter and then i.pointIter would close i.pointIter
// twice. This is necessary to track if the point iter is an internal iterator
// that could release its resources to a pool on Close(), making it harder for
// that iterator to make its own closes idempotent.
//
// TODO(bilal): Update SetOptions to always close out point key iterators when
// they won't be used, so that Close() doesn't need to default to closing
// point iterators twice.
closePointIterOnce bool
// Used in some tests to disable the random disabling of seek optimizations.
forceEnableSeekOpt bool
// Set to true if NextPrefix is not currently permitted. Defaults to false
@@ -529,11 +519,19 @@ func (i *Iterator) findNextEntry(limit []byte) {
for i.iterKey != nil {
key := *i.iterKey

if i.hasPrefix {
if n := i.split(key.UserKey); !i.equal(i.prefixOrFullSeekKey, key.UserKey[:n]) {
return
// The topLevelIterator.SeekPrefixGEStrict contract requires that in
// prefix mode [i.hasPrefix=t], every point key returned by the internal
// iterator must have the current iteration prefix.
if invariants.Enabled && i.hasPrefix {
// Range keys are an exception to the contract and may return a different
// prefix. This case is explicitly handled in the switch statement below.
if key.Kind() != base.InternalKeyKindRangeKeySet {
if n := i.split(key.UserKey); !i.equal(i.prefixOrFullSeekKey, key.UserKey[:n]) {
i.opts.logger.Fatalf("pebble: prefix violation: key %q does not have prefix %q\n", key.UserKey, i.prefixOrFullSeekKey)
}
}
}

// Compare with limit every time we start at a different user key.
// Note that given the best-effort contract of limit, we could avoid a
// comparison in the common case by doing this only after
@@ -564,6 +562,11 @@

switch key.Kind() {
case InternalKeyKindRangeKeySet:
if i.hasPrefix {
if n := i.split(key.UserKey); !i.equal(i.prefixOrFullSeekKey, key.UserKey[:n]) {
return
}
}
// Save the current key.
i.keyBuf = append(i.keyBuf[:0], key.UserKey...)
i.key = i.keyBuf
@@ -1865,7 +1868,7 @@ func (i *Iterator) nextWithLimit(limit []byte) IterValidityState {
i.iterValidityState = IterExhausted
return i.iterValidityState
} else if i.iterValidityState == IterExhausted {
// No-op, already exhasuted. We avoid executing the Next because it
// No-op, already exhausted. We avoid executing the Next because it
// can break invariants: Specifically, a file that fails the bloom
// filter test may result in its level being removed from the
// merging iterator. The level's removal can cause a lazy combined
@@ -2304,7 +2307,7 @@ func (i *Iterator) Close() error {
// NB: If the iterators were still connected to i.iter, they may be
// closed, but calling Close on a closed internal iterator or fragment
// iterator is allowed.
if i.pointIter != nil && !i.closePointIterOnce {
if i.pointIter != nil {
i.err = firstError(i.err, i.pointIter.Close())
}
if i.rangeKey != nil && i.rangeKey.rangeKeyIter != nil {
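A hedged sketch of the shape of the invariant check added to findNextEntry above. Here invariantsEnabled, fatalf, and split are stand-ins for invariants.Enabled, opts.logger.Fatalf, and the comparer's Split; the point is that in prefix mode a point key without the iteration prefix is treated as a bug in a lower-level iterator, while RANGEKEYSET keys are exempt and filtered in the switch instead:

```go
package main

import (
	"bytes"
	"fmt"
)

const invariantsEnabled = true // stand-in for pebble's invariants build tag

func fatalf(format string, args ...interface{}) {
	panic(fmt.Sprintf(format, args...))
}

// split returns the prefix length of a user key; keys are assumed to look
// like "prefix@suffix" purely for this illustration.
func split(userKey []byte) int {
	if i := bytes.IndexByte(userKey, '@'); i >= 0 {
		return i
	}
	return len(userKey)
}

// assertPrefix mirrors the new check: in prefix iteration mode, a point key
// without the iteration prefix fails loudly in invariants builds. Range key
// sets are excluded because they may carry a different prefix and are handled
// separately.
func assertPrefix(prefix, userKey []byte, isRangeKeySet bool) {
	if !invariantsEnabled || isRangeKeySet {
		return
	}
	if n := split(userKey); !bytes.Equal(prefix, userKey[:n]) {
		fatalf("prefix violation: key %q does not have prefix %q", userKey, prefix)
	}
}

func main() {
	assertPrefix([]byte("a"), []byte("a@5"), false) // fine
	assertPrefix([]byte("a"), []byte("b@9"), true)  // range key set: exempt
	fmt.Println("invariants hold")
	// assertPrefix([]byte("a"), []byte("b@9"), false) // would panic
}
```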
iterator_test.go: 3 changes (2 additions, 1 deletion)
@@ -931,6 +931,7 @@ func TestIteratorSeekOpt(t *testing.T) {
}
return iters, err
}
d.opts.Comparer.Split = func(a []byte) int { return len(a) }
return s

case "iter":
@@ -944,7 +945,7 @@
}
iter, _ = snap.NewIter(nil)
iter.readSampling.forceReadSampling = true
iter.comparer.Split = func(a []byte) int { return len(a) }
iter.comparer.Split = d.opts.Comparer.Split
iter.forceEnableSeekOpt = true
iter.merging.forceEnableSeekOpt = true
}
merging_iter.go: 108 changes (52 additions, 56 deletions)
@@ -603,31 +603,8 @@ func (m *mergingIter) switchToMaxHeap() error {
return m.initMaxHeap()
}

// maybeNextEntryWithinPrefix steps to the next entry, as long as the iteration
// prefix has not already been exceeded. If it has, it exhausts the iterator by
// resetting the heap to empty.
func (m *mergingIter) maybeNextEntryWithinPrefix(l *mergingIterLevel) error {
if s := m.split(l.iterKey.UserKey); !bytes.Equal(m.prefix, l.iterKey.UserKey[:s]) {
// The item at the root of the heap already exceeds the iteration
// prefix. We should not advance any more. Clear the heap to reflect
// that the iterator is now exhausted (within this prefix, at
// least).
m.heap.items = m.heap.items[:0]
return nil
}
return m.nextEntry(l, nil /* succKey */)
}

// nextEntry unconditionally steps to the next entry. item is the current top
// item in the heap.
//
// nextEntry should be called directly when not in prefix-iteration mode, or by
// Next. During prefix iteration mode, all other callers should use
// maybeNextEntryWithinPrefix which will avoid advancing the iterator if the
// current iteration prefix has been exhausted. See the comment within
// nextEntry's body for an explanation of why other callers should call
// maybeNextEntryWithinPrefix, which will ensure the documented invariant is
// preserved.
func (m *mergingIter) nextEntry(l *mergingIterLevel, succKey []byte) error {
// INVARIANT: If in prefix iteration mode, item.iterKey must have a prefix equal
// to m.prefix. This invariant is important for ensuring TrySeekUsingNext
@@ -665,20 +642,26 @@ func (m *mergingIter) nextEntry(l *mergingIterLevel, succKey []byte) error {
l.iterKey, l.iterValue = l.iter.NextPrefix(succKey)
}

if l.iterKey != nil {
if m.heap.len() > 1 {
if l.iterKey == nil {
if err := l.iter.Error(); err != nil {
return err
}
m.heap.pop()
} else {
if m.prefix != nil && !bytes.Equal(m.prefix, l.iterKey.UserKey[:m.split(l.iterKey.UserKey)]) {
// Set keys without a matching prefix to their zero values when in prefix
// iteration mode and remove iterated level from heap.
l.iterKey = nil
l.iterValue = base.LazyValue{}
m.heap.pop()
} else if m.heap.len() > 1 {
m.heap.fix(0)
}
if l.rangeDelIter != oldRangeDelIter {
// The rangeDelIter changed which indicates that the l.iter moved to the
// next sstable. We have to update the tombstone for oldTopLevel as well.
oldTopLevel--
}
} else {
if err := l.iter.Error(); err != nil {
return err
}
m.heap.pop()
}

// The cached tombstones are only valid for the levels
@@ -797,13 +780,7 @@ func (m *mergingIter) isNextEntryDeleted(item *mergingIterLevel) (bool, error) {
return true, nil
}
if l.tombstone.CoversAt(m.snapshot, item.iterKey.SeqNum()) {
var err error
if m.prefix == nil {
err = m.nextEntry(item, nil /* succKey */)
} else {
err = m.maybeNextEntryWithinPrefix(item)
}
if err != nil {
if err := m.nextEntry(item, nil /* succKey */); err != nil {
return false, err
}
return true, nil
@@ -830,11 +807,7 @@ func (m *mergingIter) findNextEntry() (*InternalKey, base.LazyValue) {
// keep sstables open until we've surpassed their end boundaries so that
// their range deletions are visible.
if m.levels[item.index].isIgnorableBoundaryKey {
if m.prefix == nil {
m.err = m.nextEntry(item, nil /* succKey */)
} else {
m.err = m.maybeNextEntryWithinPrefix(item)
}
m.err = m.nextEntry(item, nil /* succKey */)
if m.err != nil {
return nil, base.LazyValue{}
}
@@ -855,11 +828,7 @@

// Check if the key is visible at the iterator sequence numbers.
if !item.iterKey.Visible(m.snapshot, m.batchSnapshot) {
if m.prefix == nil {
m.err = m.nextEntry(item, nil /* succKey */)
} else {
m.err = m.maybeNextEntryWithinPrefix(item)
}
m.err = m.nextEntry(item, nil /* succKey */)
if m.err != nil {
return nil, base.LazyValue{}
}
@@ -1064,6 +1033,14 @@ func (m *mergingIter) seekGE(key []byte, level int, flags base.SeekGEFlags) erro
l := &m.levels[level]
if m.prefix != nil {
l.iterKey, l.iterValue = l.iter.SeekPrefixGE(m.prefix, key, flags)
if l.iterKey != nil {
if n := m.split(l.iterKey.UserKey); !bytes.Equal(m.prefix, l.iterKey.UserKey[:n]) {
// Prevent keys without a matching prefix from being added to the heap by setting
// iterKey and iterValue to their zero values before calling initMinHeap.
l.iterKey = nil
l.iterValue = base.LazyValue{}
}
}
} else {
l.iterKey, l.iterValue = l.iter.SeekGE(key, flags)
}
@@ -1120,18 +1097,31 @@ func (m *mergingIter) SeekGE(key []byte, flags base.SeekGEFlags) (*InternalKey,
return m.findNextEntry()
}

// SeekPrefixGE implements base.InternalIterator.SeekPrefixGE. Note that
// SeekPrefixGE only checks the upper bound. It is up to the caller to ensure
// that key is greater than or equal to the lower bound.
// SeekPrefixGE implements base.InternalIterator.SeekPrefixGE.
func (m *mergingIter) SeekPrefixGE(
prefix, key []byte, flags base.SeekGEFlags,
) (*base.InternalKey, base.LazyValue) {
return m.SeekPrefixGEStrict(prefix, key, flags)
}

// SeekPrefixGEStrict implements topLevelIterator.SeekPrefixGEStrict. Note that
// SeekPrefixGEStrict explicitly checks that the key has a matching prefix.
func (m *mergingIter) SeekPrefixGEStrict(
prefix, key []byte, flags base.SeekGEFlags,
) (*base.InternalKey, base.LazyValue) {
m.prefix = prefix
m.err = m.seekGE(key, 0 /* start level */, flags)
if m.err != nil {
return nil, base.LazyValue{}
}
return m.findNextEntry()

iterKey, iterValue := m.findNextEntry()
if invariants.Enabled && iterKey != nil {
if n := m.split(iterKey.UserKey); !bytes.Equal(m.prefix, iterKey.UserKey[:n]) {
m.logger.Fatalf("mergingIter: prefix violation: returning key %q without prefix %q\n", iterKey, m.prefix)
}
}
return iterKey, iterValue
}

// Seeks levels >= level to < key. Additionally uses range tombstones to extend the seeks.
Expand Down Expand Up @@ -1258,14 +1248,20 @@ func (m *mergingIter) Next() (*InternalKey, base.LazyValue) {
}

// NB: It's okay to call nextEntry directly even during prefix iteration
// mode (as opposed to indirectly through maybeNextEntryWithinPrefix).
// During prefix iteration mode, we rely on the caller to not call Next if
// the iterator has already advanced beyond the iteration prefix. See the
// comment above the base.InternalIterator interface.
// mode. During prefix iteration mode, we rely on the caller to not call
// Next if the iterator has already advanced beyond the iteration prefix.
// See the comment above the base.InternalIterator interface.
if m.err = m.nextEntry(m.heap.items[0], nil /* succKey */); m.err != nil {
return nil, base.LazyValue{}
}
return m.findNextEntry()

iterKey, iterValue := m.findNextEntry()
if invariants.Enabled && m.prefix != nil && iterKey != nil {
if n := m.split(iterKey.UserKey); !bytes.Equal(m.prefix, iterKey.UserKey[:n]) {
m.logger.Fatalf("mergingIter: prefix violation: returning key %q without prefix %q\n", iterKey, m.prefix)
}
}
return iterKey, iterValue
}

func (m *mergingIter) NextPrefix(succKey []byte) (*InternalKey, LazyValue) {
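The net effect of the mergingIter changes above is that per-level keys outside the seek prefix never enter the heap, so the merged output satisfies the strict contract by construction rather than relying on callers to stop early. A toy model of that per-level filter follows; levelIter, the heap, and InternalKey are simplified away and the names are illustrative, not pebble's implementation:

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

// level is a toy stand-in for a mergingIterLevel: a sorted run of user keys
// plus the level's current key ("" meaning the level is exhausted).
type level struct {
	keys []string
	cur  string
}

// seekPrefixGE positions the level at the first key >= key, then applies the
// same filter the PR adds in mergingIter.seekGE: a key that does not carry the
// prefix is zeroed out so it never gets pushed onto the merging heap.
func (l *level) seekPrefixGE(prefix, key string) {
	l.cur = ""
	if i := sort.SearchStrings(l.keys, key); i < len(l.keys) && strings.HasPrefix(l.keys[i], prefix) {
		l.cur = l.keys[i]
	}
}

func main() {
	levels := []*level{
		{keys: []string{"apple@1", "avocado@4"}},
		{keys: []string{"banana@2"}},
	}
	for i, l := range levels {
		l.seekPrefixGE("apple", "apple")
		fmt.Printf("level %d current key: %q\n", i, l.cur)
	}
	// Only level 0 contributes a key; level 1's "banana@2" is filtered before
	// reaching the heap, which is what upholds SeekPrefixGEStrict's guarantee.
}
```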