Skip to content

Commit

Permalink
iterator: require prefix filtering in merging iterator
Browse files Browse the repository at this point in the history
Currently for `SeekPrefixGE` calls we filter out keys without a matching
prefix at the top layer iterator. This causes the merging iterator to
build a larger heap containing entries without the required prefixes.
This commit shifts the prefix filtering responsibility to the merging
iterator before inserting keys in the heap.

Fix #2182.
  • Loading branch information
CheranMahalingam committed Mar 6, 2024
1 parent c779525 commit 1f7e8ee
Show file tree
Hide file tree
Showing 14 changed files with 190 additions and 124 deletions.
2 changes: 1 addition & 1 deletion db.go
Original file line number Diff line number Diff line change
Expand Up @@ -1486,7 +1486,7 @@ func (i *Iterator) constructPointIter(
buf.merging.snapshot = i.seqNum
buf.merging.batchSnapshot = i.batchSeqNum
buf.merging.combinedIterState = &i.lazyCombinedIter.combinedIterState
i.pointIter = invalidating.MaybeWrapIfInvariants(&buf.merging)
i.pointIter = invalidating.MaybeWrapIfInvariants(&buf.merging).(topLevelIterator)
i.merging = &buf.merging
}

Expand Down
6 changes: 6 additions & 0 deletions error_iter.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,12 @@ func (c *errorIter) SeekGE(key []byte, flags base.SeekGEFlags) (*InternalKey, ba

func (c *errorIter) SeekPrefixGE(
prefix, key []byte, flags base.SeekGEFlags,
) (*base.InternalKey, base.LazyValue) {
return c.SeekPrefixGEStrict(prefix, key, flags)
}

func (c *errorIter) SeekPrefixGEStrict(
prefix, key []byte, flags base.SeekGEFlags,
) (*base.InternalKey, base.LazyValue) {
return nil, base.LazyValue{}
}
Expand Down
11 changes: 1 addition & 10 deletions external_iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ func validateExternalIterOpts(iterOpts *IterOptions) error {
return nil
}

func createExternalPointIter(ctx context.Context, it *Iterator) (internalIterator, error) {
func createExternalPointIter(ctx context.Context, it *Iterator) (topLevelIterator, error) {
// TODO(jackson): In some instances we could generate fewer levels by using
// L0Sublevels code to organize nonoverlapping files into the same level.
// This would allow us to use levelIters and keep a smaller set of data and
Expand Down Expand Up @@ -236,15 +236,6 @@ func createExternalPointIter(ctx context.Context, it *Iterator) (internalIterato
})
}
}
if len(mlevels) == 1 && mlevels[0].rangeDelIter == nil {
// Set closePointIterOnce to true. This is because we're bypassing the
// merging iter, which turns Close()s on it idempotent for any child
// iterators. The outer Iterator could call Close() on a point iter twice,
// which sstable iterators do not support (as they release themselves to
// a pool).
it.closePointIterOnce = true
return mlevels[0].iter, nil
}

it.alloc.merging.init(&it.opts, &it.stats.InternalStats, it.comparer.Compare, it.comparer.Split, mlevels...)
it.alloc.merging.snapshot = base.InternalKeySeqNumMax
Expand Down
6 changes: 6 additions & 0 deletions get_iter.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,12 @@ func (g *getIter) SeekGE(key []byte, flags base.SeekGEFlags) (*InternalKey, base

func (g *getIter) SeekPrefixGE(
prefix, key []byte, flags base.SeekGEFlags,
) (*base.InternalKey, base.LazyValue) {
return g.SeekPrefixGEStrict(prefix, key, flags)
}

func (g *getIter) SeekPrefixGEStrict(
prefix, key []byte, flags base.SeekGEFlags,
) (*base.InternalKey, base.LazyValue) {
panic("pebble: SeekPrefixGE unimplemented")
}
Expand Down
2 changes: 2 additions & 0 deletions internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ type InternalKey = base.InternalKey

type internalIterator = base.InternalIterator

type topLevelIterator = base.TopLevelIterator

// ErrCorruption is a marker to indicate that data in a file (WAL, MANIFEST,
// sstable) isn't in the expected format.
var ErrCorruption = base.ErrCorruption
Expand Down
10 changes: 10 additions & 0 deletions internal/base/iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,16 @@ type InternalIterator interface {
fmt.Stringer
}

// TopLevelIterator extends InternalIterator to include an additional absolute
// positioning method, SeekPrefixGEStrict.
type TopLevelIterator interface {
InternalIterator

// SeekPrefixGEStrict extends InternalIterator.SeekPrefixGE with a guarantee
// that the iterator only returns keys matching the prefix.
SeekPrefixGEStrict(prefix, key []byte, flags SeekGEFlags) (*InternalKey, LazyValue)
}

// SeekGEFlags holds flags that may configure the behavior of a forward seek.
// Not all flags are relevant to all iterators.
type SeekGEFlags uint8
Expand Down
8 changes: 7 additions & 1 deletion internal/invalidating/iter.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func IgnoreKinds(kinds ...base.InternalKeyKind) Option {

// NewIter constructs a new invalidating iterator that wraps the provided
// iterator, trashing buffers for previously returned keys.
func NewIter(originalIterator base.InternalIterator, opts ...Option) base.InternalIterator {
func NewIter(originalIterator base.InternalIterator, opts ...Option) base.TopLevelIterator {
i := &iter{iter: originalIterator}
for _, opt := range opts {
opt.apply(i)
Expand Down Expand Up @@ -120,6 +120,12 @@ func (i *iter) SeekPrefixGE(
return i.update(i.iter.SeekPrefixGE(prefix, key, flags))
}

func (i *iter) SeekPrefixGEStrict(
prefix, key []byte, flags base.SeekGEFlags,
) (*base.InternalKey, base.LazyValue) {
return i.update(i.iter.SeekPrefixGE(prefix, key, flags))
}

func (i *iter) SeekLT(key []byte, flags base.SeekLTFlags) (*base.InternalKey, base.LazyValue) {
return i.update(i.iter.SeekLT(key, flags))
}
Expand Down
35 changes: 19 additions & 16 deletions iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ type Iterator struct {
merge Merge
comparer base.Comparer
iter internalIterator
pointIter internalIterator
pointIter topLevelIterator
// Either readState or version is set, but not both.
readState *readState
version *version
Expand Down Expand Up @@ -302,16 +302,6 @@ type Iterator struct {
batchJustRefreshed bool
// batchOnlyIter is set to true for Batch.NewBatchOnlyIter.
batchOnlyIter bool
// closePointIterOnce is set to true if this point iter can only be Close()d
// once, _and_ closing i.iter and then i.pointIter would close i.pointIter
// twice. This is necessary to track if the point iter is an internal iterator
// that could release its resources to a pool on Close(), making it harder for
// that iterator to make its own closes idempotent.
//
// TODO(bilal): Update SetOptions to always close out point key iterators when
// they won't be used, so that Close() doesn't need to default to closing
// point iterators twice.
closePointIterOnce bool
// Used in some tests to disable the random disabling of seek optimizations.
forceEnableSeekOpt bool
// Set to true if NextPrefix is not currently permitted. Defaults to false
Expand Down Expand Up @@ -529,11 +519,19 @@ func (i *Iterator) findNextEntry(limit []byte) {
for i.iterKey != nil {
key := *i.iterKey

if i.hasPrefix {
if n := i.split(key.UserKey); !i.equal(i.prefixOrFullSeekKey, key.UserKey[:n]) {
return
// The topLevelIterator.StrictSeekPrefixGE contract requires that in
// prefix mode [i.hasPrefix=t], every point key returned by the internal
// iterator must have the current iteration prefix.
if invariants.Enabled && i.hasPrefix {
// Range keys are an exception to the contract and may return a different
// prefix. This case is explicitly handled in the switch statement below.
if key.Kind() != base.InternalKeyKindRangeKeySet {
if n := i.split(key.UserKey); !i.equal(i.prefixOrFullSeekKey, key.UserKey[:n]) {
i.opts.logger.Fatalf("pebble: prefix violation: key %q does not have prefix %q\n", key.UserKey, i.prefixOrFullSeekKey)
}
}
}

// Compare with limit every time we start at a different user key.
// Note that given the best-effort contract of limit, we could avoid a
// comparison in the common case by doing this only after
Expand Down Expand Up @@ -564,6 +562,11 @@ func (i *Iterator) findNextEntry(limit []byte) {

switch key.Kind() {
case InternalKeyKindRangeKeySet:
if i.hasPrefix {
if n := i.split(key.UserKey); !i.equal(i.prefixOrFullSeekKey, key.UserKey[:n]) {
return
}
}
// Save the current key.
i.keyBuf = append(i.keyBuf[:0], key.UserKey...)
i.key = i.keyBuf
Expand Down Expand Up @@ -1865,7 +1868,7 @@ func (i *Iterator) nextWithLimit(limit []byte) IterValidityState {
i.iterValidityState = IterExhausted
return i.iterValidityState
} else if i.iterValidityState == IterExhausted {
// No-op, already exhasuted. We avoid executing the Next because it
// No-op, already exhausted. We avoid executing the Next because it
// can break invariants: Specifically, a file that fails the bloom
// filter test may result in its level being removed from the
// merging iterator. The level's removal can cause a lazy combined
Expand Down Expand Up @@ -2304,7 +2307,7 @@ func (i *Iterator) Close() error {
// NB: If the iterators were still connected to i.iter, they may be
// closed, but calling Close on a closed internal iterator or fragment
// iterator is allowed.
if i.pointIter != nil && !i.closePointIterOnce {
if i.pointIter != nil {
i.err = firstError(i.err, i.pointIter.Close())
}
if i.rangeKey != nil && i.rangeKey.rangeKeyIter != nil {
Expand Down
3 changes: 2 additions & 1 deletion iterator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -931,6 +931,7 @@ func TestIteratorSeekOpt(t *testing.T) {
}
return iters, err
}
d.opts.Comparer.Split = func(a []byte) int { return len(a) }
return s

case "iter":
Expand All @@ -944,7 +945,7 @@ func TestIteratorSeekOpt(t *testing.T) {
}
iter, _ = snap.NewIter(nil)
iter.readSampling.forceReadSampling = true
iter.comparer.Split = func(a []byte) int { return len(a) }
iter.comparer.Split = d.opts.Comparer.Split
iter.forceEnableSeekOpt = true
iter.merging.forceEnableSeekOpt = true
}
Expand Down
108 changes: 52 additions & 56 deletions merging_iter.go
Original file line number Diff line number Diff line change
Expand Up @@ -603,31 +603,8 @@ func (m *mergingIter) switchToMaxHeap() error {
return m.initMaxHeap()
}

// maybeNextEntryWithinPrefix steps to the next entry, as long as the iteration
// prefix has not already been exceeded. If it has, it exhausts the iterator by
// resetting the heap to empty.
func (m *mergingIter) maybeNextEntryWithinPrefix(l *mergingIterLevel) error {
if s := m.split(l.iterKey.UserKey); !bytes.Equal(m.prefix, l.iterKey.UserKey[:s]) {
// The item at the root of the heap already exceeds the iteration
// prefix. We should not advance any more. Clear the heap to reflect
// that the iterator is now exhausted (within this prefix, at
// least).
m.heap.items = m.heap.items[:0]
return nil
}
return m.nextEntry(l, nil /* succKey */)
}

// nextEntry unconditionally steps to the next entry. item is the current top
// item in the heap.
//
// nextEntry should be called directly when not in prefix-iteration mode, or by
// Next. During prefix iteration mode, all other callers should use
// maybeNextEntryWithinPrefix which will avoid advancing the iterator if the
// current iteration prefix has been exhausted. See the comment within
// nextEntry's body for an explanation of why other callers should call
// maybeNextEntryWithinPrefix, which will ensure the documented invariant is
// preserved.
func (m *mergingIter) nextEntry(l *mergingIterLevel, succKey []byte) error {
// INVARIANT: If in prefix iteration mode, item.iterKey must have a prefix equal
// to m.prefix. This invariant is important for ensuring TrySeekUsingNext
Expand Down Expand Up @@ -665,20 +642,26 @@ func (m *mergingIter) nextEntry(l *mergingIterLevel, succKey []byte) error {
l.iterKey, l.iterValue = l.iter.NextPrefix(succKey)
}

if l.iterKey != nil {
if m.heap.len() > 1 {
if l.iterKey == nil {
if err := l.iter.Error(); err != nil {
return err
}
m.heap.pop()
} else {
if m.prefix != nil && !bytes.Equal(m.prefix, l.iterKey.UserKey[:m.split(l.iterKey.UserKey)]) {
// Set keys without a matching prefix to their zero values when in prefix
// iteration mode and remove iterated level from heap.
l.iterKey = nil
l.iterValue = base.LazyValue{}
m.heap.pop()
} else if m.heap.len() > 1 {
m.heap.fix(0)
}
if l.rangeDelIter != oldRangeDelIter {
// The rangeDelIter changed which indicates that the l.iter moved to the
// next sstable. We have to update the tombstone for oldTopLevel as well.
oldTopLevel--
}
} else {
if err := l.iter.Error(); err != nil {
return err
}
m.heap.pop()
}

// The cached tombstones are only valid for the levels
Expand Down Expand Up @@ -797,13 +780,7 @@ func (m *mergingIter) isNextEntryDeleted(item *mergingIterLevel) (bool, error) {
return true, nil
}
if l.tombstone.CoversAt(m.snapshot, item.iterKey.SeqNum()) {
var err error
if m.prefix == nil {
err = m.nextEntry(item, nil /* succKey */)
} else {
err = m.maybeNextEntryWithinPrefix(item)
}
if err != nil {
if err := m.nextEntry(item, nil /* succKey */); err != nil {
return false, err
}
return true, nil
Expand All @@ -830,11 +807,7 @@ func (m *mergingIter) findNextEntry() (*InternalKey, base.LazyValue) {
// keep sstables open until we've surpassed their end boundaries so that
// their range deletions are visible.
if m.levels[item.index].isIgnorableBoundaryKey {
if m.prefix == nil {
m.err = m.nextEntry(item, nil /* succKey */)
} else {
m.err = m.maybeNextEntryWithinPrefix(item)
}
m.err = m.nextEntry(item, nil /* succKey */)
if m.err != nil {
return nil, base.LazyValue{}
}
Expand All @@ -855,11 +828,7 @@ func (m *mergingIter) findNextEntry() (*InternalKey, base.LazyValue) {

// Check if the key is visible at the iterator sequence numbers.
if !item.iterKey.Visible(m.snapshot, m.batchSnapshot) {
if m.prefix == nil {
m.err = m.nextEntry(item, nil /* succKey */)
} else {
m.err = m.maybeNextEntryWithinPrefix(item)
}
m.err = m.nextEntry(item, nil /* succKey */)
if m.err != nil {
return nil, base.LazyValue{}
}
Expand Down Expand Up @@ -1064,6 +1033,14 @@ func (m *mergingIter) seekGE(key []byte, level int, flags base.SeekGEFlags) erro
l := &m.levels[level]
if m.prefix != nil {
l.iterKey, l.iterValue = l.iter.SeekPrefixGE(m.prefix, key, flags)
if l.iterKey != nil {
if n := m.split(l.iterKey.UserKey); !bytes.Equal(m.prefix, l.iterKey.UserKey[:n]) {
// Prevent keys without a matching prefix from being added to the heap by setting
// iterKey and iterValue to their zero values before calling initMinHeap.
l.iterKey = nil
l.iterValue = base.LazyValue{}
}
}
} else {
l.iterKey, l.iterValue = l.iter.SeekGE(key, flags)
}
Expand Down Expand Up @@ -1120,18 +1097,31 @@ func (m *mergingIter) SeekGE(key []byte, flags base.SeekGEFlags) (*InternalKey,
return m.findNextEntry()
}

// SeekPrefixGE implements base.InternalIterator.SeekPrefixGE. Note that
// SeekPrefixGE only checks the upper bound. It is up to the caller to ensure
// that key is greater than or equal to the lower bound.
// SeekPrefixGE implements base.InternalIterator.SeekPrefixGE.
func (m *mergingIter) SeekPrefixGE(
prefix, key []byte, flags base.SeekGEFlags,
) (*base.InternalKey, base.LazyValue) {
return m.SeekPrefixGEStrict(prefix, key, flags)
}

// SeekPrefixGEStrict implements topLevelIterator.SeekPrefixGEStrict. Note that
// SeekPrefixGEStrict explicitly checks that the key has a matching prefix.
func (m *mergingIter) SeekPrefixGEStrict(
prefix, key []byte, flags base.SeekGEFlags,
) (*base.InternalKey, base.LazyValue) {
m.prefix = prefix
m.err = m.seekGE(key, 0 /* start level */, flags)
if m.err != nil {
return nil, base.LazyValue{}
}
return m.findNextEntry()

iterKey, iterValue := m.findNextEntry()
if invariants.Enabled && iterKey != nil {
if n := m.split(iterKey.UserKey); !bytes.Equal(m.prefix, iterKey.UserKey[:n]) {
m.logger.Fatalf("mergingIter: prefix violation: returning key %q without prefix %q\n", iterKey, m.prefix)
}
}
return iterKey, iterValue
}

// Seeks levels >= level to < key. Additionally uses range tombstones to extend the seeks.
Expand Down Expand Up @@ -1258,14 +1248,20 @@ func (m *mergingIter) Next() (*InternalKey, base.LazyValue) {
}

// NB: It's okay to call nextEntry directly even during prefix iteration
// mode (as opposed to indirectly through maybeNextEntryWithinPrefix).
// During prefix iteration mode, we rely on the caller to not call Next if
// the iterator has already advanced beyond the iteration prefix. See the
// comment above the base.InternalIterator interface.
// mode. During prefix iteration mode, we rely on the caller to not call
// Next if the iterator has already advanced beyond the iteration prefix.
// See the comment above the base.InternalIterator interface.
if m.err = m.nextEntry(m.heap.items[0], nil /* succKey */); m.err != nil {
return nil, base.LazyValue{}
}
return m.findNextEntry()

iterKey, iterValue := m.findNextEntry()
if invariants.Enabled && m.prefix != nil && iterKey != nil {
if n := m.split(iterKey.UserKey); !bytes.Equal(m.prefix, iterKey.UserKey[:n]) {
m.logger.Fatalf("mergingIter: prefix violation: returning key %q without prefix %q\n", iterKey, m.prefix)
}
}
return iterKey, iterValue
}

func (m *mergingIter) NextPrefix(succKey []byte) (*InternalKey, LazyValue) {
Expand Down
Loading

0 comments on commit 1f7e8ee

Please sign in to comment.