Skip to content

Commit

Permalink
iterator: require prefix filtering in merging iterator
Browse files Browse the repository at this point in the history
Currently, for `SeekPrefixGE` calls, keys without a matching prefix are
filtered out by the top-level iterator. As a result, the merging iterator
builds a larger heap than necessary, containing entries that lack the
required prefix. This commit shifts the prefix-filtering responsibility to
the merging iterator, which now drops such keys before inserting them into
the heap.

Fix #2182.
  • Loading branch information
CheranMahalingam committed Mar 5, 2024
1 parent f1528d7 commit 507b74f
Show file tree
Hide file tree
Showing 14 changed files with 179 additions and 117 deletions.
2 changes: 1 addition & 1 deletion db.go
Original file line number Diff line number Diff line change
Expand Up @@ -1483,7 +1483,7 @@ func (i *Iterator) constructPointIter(
buf.merging.snapshot = i.seqNum
buf.merging.batchSnapshot = i.batchSeqNum
buf.merging.combinedIterState = &i.lazyCombinedIter.combinedIterState
i.pointIter = invalidating.MaybeWrapIfInvariants(&buf.merging)
i.pointIter = invalidating.MaybeWrapIfInvariants(&buf.merging).(topLevelIterator)
i.merging = &buf.merging
}

Expand Down
6 changes: 6 additions & 0 deletions error_iter.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,12 @@ func (c *errorIter) SeekGE(key []byte, flags base.SeekGEFlags) (*InternalKey, ba

// SeekPrefixGE forwards to SeekPrefixGEStrict with the same arguments and
// returns its result unchanged.
func (c *errorIter) SeekPrefixGE(
prefix, key []byte, flags base.SeekGEFlags,
) (*base.InternalKey, base.LazyValue) {
return c.SeekPrefixGEStrict(prefix, key, flags)
}

// SeekPrefixGEStrict ignores its arguments and always returns a nil key
// together with a zero base.LazyValue, i.e. it never yields an entry.
func (c *errorIter) SeekPrefixGEStrict(
prefix, key []byte, flags base.SeekGEFlags,
) (*base.InternalKey, base.LazyValue) {
return nil, base.LazyValue{}
}
Expand Down
11 changes: 1 addition & 10 deletions external_iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ func validateExternalIterOpts(iterOpts *IterOptions) error {
return nil
}

func createExternalPointIter(ctx context.Context, it *Iterator) (internalIterator, error) {
func createExternalPointIter(ctx context.Context, it *Iterator) (topLevelIterator, error) {
// TODO(jackson): In some instances we could generate fewer levels by using
// L0Sublevels code to organize nonoverlapping files into the same level.
// This would allow us to use levelIters and keep a smaller set of data and
Expand Down Expand Up @@ -236,15 +236,6 @@ func createExternalPointIter(ctx context.Context, it *Iterator) (internalIterato
})
}
}
if len(mlevels) == 1 && mlevels[0].rangeDelIter == nil {
// Set closePointIterOnce to true. This is because we're bypassing the
// merging iter, which turns Close()s on it idempotent for any child
// iterators. The outer Iterator could call Close() on a point iter twice,
// which sstable iterators do not support (as they release themselves to
// a pool).
it.closePointIterOnce = true
return mlevels[0].iter, nil
}

it.alloc.merging.init(&it.opts, &it.stats.InternalStats, it.comparer.Compare, it.comparer.Split, mlevels...)
it.alloc.merging.snapshot = base.InternalKeySeqNumMax
Expand Down
6 changes: 6 additions & 0 deletions get_iter.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,12 @@ func (g *getIter) SeekGE(key []byte, flags base.SeekGEFlags) (*InternalKey, base

// SeekPrefixGE forwards to SeekPrefixGEStrict with the same arguments. Since
// SeekPrefixGEStrict panics on a getIter, calling this method panics as well.
func (g *getIter) SeekPrefixGE(
prefix, key []byte, flags base.SeekGEFlags,
) (*base.InternalKey, base.LazyValue) {
return g.SeekPrefixGEStrict(prefix, key, flags)
}

// SeekPrefixGEStrict is not implemented for getIter: it unconditionally
// panics and never returns a key or value.
func (g *getIter) SeekPrefixGEStrict(
prefix, key []byte, flags base.SeekGEFlags,
) (*base.InternalKey, base.LazyValue) {
panic("pebble: SeekPrefixGE unimplemented")
}
Expand Down
2 changes: 2 additions & 0 deletions internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ type InternalKey = base.InternalKey

type internalIterator = base.InternalIterator

type topLevelIterator = base.TopLevelIterator

// ErrCorruption is a marker to indicate that data in a file (WAL, MANIFEST,
// sstable) isn't in the expected format.
var ErrCorruption = base.ErrCorruption
Expand Down
10 changes: 10 additions & 0 deletions internal/base/iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,16 @@ type InternalIterator interface {
fmt.Stringer
}

// TopLevelIterator extends InternalIterator to include an additional absolute
// positioning method, SeekPrefixGEStrict. An implementation takes on the
// responsibility for prefix filtering, so a caller using SeekPrefixGEStrict
// does not need to re-check the prefix of the keys it gets back.
type TopLevelIterator interface {
InternalIterator

// SeekPrefixGEStrict extends InternalIterator.SeekPrefixGE with a guarantee
// that the iterator only returns keys matching the prefix. The prefix, key
// and flags arguments are the same as those of SeekPrefixGE.
SeekPrefixGEStrict(prefix, key []byte, flags SeekGEFlags) (*InternalKey, LazyValue)
}

// SeekGEFlags holds flags that may configure the behavior of a forward seek.
// Not all flags are relevant to all iterators.
type SeekGEFlags uint8
Expand Down
8 changes: 7 additions & 1 deletion internal/invalidating/iter.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func IgnoreKinds(kinds ...base.InternalKeyKind) Option {

// NewIter constructs a new invalidating iterator that wraps the provided
// iterator, trashing buffers for previously returned keys.
func NewIter(originalIterator base.InternalIterator, opts ...Option) base.InternalIterator {
func NewIter(originalIterator base.InternalIterator, opts ...Option) base.TopLevelIterator {
i := &iter{iter: originalIterator}
for _, opt := range opts {
opt.apply(i)
Expand Down Expand Up @@ -120,6 +120,12 @@ func (i *iter) SeekPrefixGE(
return i.update(i.iter.SeekPrefixGE(prefix, key, flags))
}

// SeekPrefixGEStrict satisfies base.TopLevelIterator. It forwards to the
// wrapped iterator's SeekPrefixGE and passes the result through i.update.
//
// NOTE(review): the wrapped iterator is only typed as base.InternalIterator
// and this wrapper performs no prefix filtering of its own, so the strict
// guarantee presumably depends on the wrapped iterator's SeekPrefixGE already
// being strict — confirm for every iterator handed to NewIter.
func (i *iter) SeekPrefixGEStrict(
prefix, key []byte, flags base.SeekGEFlags,
) (*base.InternalKey, base.LazyValue) {
return i.update(i.iter.SeekPrefixGE(prefix, key, flags))
}

// SeekLT forwards to the wrapped iterator's SeekLT with the same key and
// flags, and passes the resulting key/value pair through i.update before
// returning it.
func (i *iter) SeekLT(key []byte, flags base.SeekLTFlags) (*base.InternalKey, base.LazyValue) {
return i.update(i.iter.SeekLT(key, flags))
}
Expand Down
17 changes: 8 additions & 9 deletions iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ type Iterator struct {
merge Merge
comparer base.Comparer
iter internalIterator
pointIter internalIterator
pointIter topLevelIterator
// Either readState or version is set, but not both.
readState *readState
version *version
Expand Down Expand Up @@ -527,13 +527,6 @@ func (i *Iterator) findNextEntry(limit []byte) {
}

for i.iterKey != nil {
key := *i.iterKey

if i.hasPrefix {
if n := i.split(key.UserKey); !i.equal(i.prefixOrFullSeekKey, key.UserKey[:n]) {
return
}
}
// Compare with limit every time we start at a different user key.
// Note that given the best-effort contract of limit, we could avoid a
// comparison in the common case by doing this only after
Expand All @@ -547,6 +540,7 @@ func (i *Iterator) findNextEntry(limit []byte) {
return
}

key := *i.iterKey
// If the user has configured a SkipPoint function, invoke it to see
// whether we should skip over the current user key.
if i.opts.SkipPoint != nil && key.Kind() != InternalKeyKindRangeKeySet && i.opts.SkipPoint(i.iterKey.UserKey) {
Expand All @@ -564,6 +558,11 @@ func (i *Iterator) findNextEntry(limit []byte) {

switch key.Kind() {
case InternalKeyKindRangeKeySet:
if i.hasPrefix {
if n := i.split(key.UserKey); !i.equal(i.prefixOrFullSeekKey, key.UserKey[:n]) {
return
}
}
// Save the current key.
i.keyBuf = append(i.keyBuf[:0], key.UserKey...)
i.key = i.keyBuf
Expand Down Expand Up @@ -1865,7 +1864,7 @@ func (i *Iterator) nextWithLimit(limit []byte) IterValidityState {
i.iterValidityState = IterExhausted
return i.iterValidityState
} else if i.iterValidityState == IterExhausted {
// No-op, already exhasuted. We avoid executing the Next because it
// No-op, already exhausted. We avoid executing the Next because it
// can break invariants: Specifically, a file that fails the bloom
// filter test may result in its level being removed from the
// merging iterator. The level's removal can cause a lazy combined
Expand Down
3 changes: 2 additions & 1 deletion iterator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -931,6 +931,7 @@ func TestIteratorSeekOpt(t *testing.T) {
}
return iters, err
}
d.opts.Comparer.Split = func(a []byte) int { return len(a) }
return s

case "iter":
Expand All @@ -944,7 +945,7 @@ func TestIteratorSeekOpt(t *testing.T) {
}
iter, _ = snap.NewIter(nil)
iter.readSampling.forceReadSampling = true
iter.comparer.Split = func(a []byte) int { return len(a) }
iter.comparer.Split = d.opts.Comparer.Split
iter.forceEnableSeekOpt = true
iter.merging.forceEnableSeekOpt = true
}
Expand Down
108 changes: 52 additions & 56 deletions merging_iter.go
Original file line number Diff line number Diff line change
Expand Up @@ -603,31 +603,8 @@ func (m *mergingIter) switchToMaxHeap() error {
return m.initMaxHeap()
}

// maybeNextEntryWithinPrefix steps to the next entry, as long as the iteration
// prefix has not already been exceeded. If it has, it exhausts the iterator by
// resetting the heap to empty.
func (m *mergingIter) maybeNextEntryWithinPrefix(l *mergingIterLevel) error {
if s := m.split(l.iterKey.UserKey); !bytes.Equal(m.prefix, l.iterKey.UserKey[:s]) {
// The item at the root of the heap already exceeds the iteration
// prefix. We should not advance any more. Clear the heap to reflect
// that the iterator is now exhausted (within this prefix, at
// least).
m.heap.items = m.heap.items[:0]
return nil
}
return m.nextEntry(l, nil /* succKey */)
}

// nextEntry unconditionally steps to the next entry. item is the current top
// item in the heap.
//
// nextEntry should be called directly when not in prefix-iteration mode, or by
// Next. During prefix iteration mode, all other callers should use
// maybeNextEntryWithinPrefix which will avoid advancing the iterator if the
// current iteration prefix has been exhausted. See the comment within
// nextEntry's body for an explanation of why other callers should call
// maybeNextEntryWithinPrefix, which will ensure the documented invariant is
// preserved.
func (m *mergingIter) nextEntry(l *mergingIterLevel, succKey []byte) error {
// INVARIANT: If in prefix iteration mode, item.iterKey must have a prefix equal
// to m.prefix. This invariant is important for ensuring TrySeekUsingNext
Expand Down Expand Up @@ -665,20 +642,26 @@ func (m *mergingIter) nextEntry(l *mergingIterLevel, succKey []byte) error {
l.iterKey, l.iterValue = l.iter.NextPrefix(succKey)
}

if l.iterKey != nil {
if m.heap.len() > 1 {
if l.iterKey == nil {
if err := l.iter.Error(); err != nil {
return err
}
m.heap.pop()
} else {
if m.prefix != nil && !bytes.Equal(m.prefix, l.iterKey.UserKey[:m.split(l.iterKey.UserKey)]) {
// Set keys without a matching prefix to their zero values when in prefix
// iteration mode and remove iterated level from heap.
l.iterKey = nil
l.iterValue = base.LazyValue{}
m.heap.pop()
} else if m.heap.len() > 1 {
m.heap.fix(0)
}
if l.rangeDelIter != oldRangeDelIter {
// The rangeDelIter changed which indicates that the l.iter moved to the
// next sstable. We have to update the tombstone for oldTopLevel as well.
oldTopLevel--
}
} else {
if err := l.iter.Error(); err != nil {
return err
}
m.heap.pop()
}

// The cached tombstones are only valid for the levels
Expand Down Expand Up @@ -797,13 +780,7 @@ func (m *mergingIter) isNextEntryDeleted(item *mergingIterLevel) (bool, error) {
return true, nil
}
if l.tombstone.CoversAt(m.snapshot, item.iterKey.SeqNum()) {
var err error
if m.prefix == nil {
err = m.nextEntry(item, nil /* succKey */)
} else {
err = m.maybeNextEntryWithinPrefix(item)
}
if err != nil {
if err := m.nextEntry(item, nil /* succKey */); err != nil {
return false, err
}
return true, nil
Expand All @@ -830,11 +807,7 @@ func (m *mergingIter) findNextEntry() (*InternalKey, base.LazyValue) {
// keep sstables open until we've surpassed their end boundaries so that
// their range deletions are visible.
if m.levels[item.index].isIgnorableBoundaryKey {
if m.prefix == nil {
m.err = m.nextEntry(item, nil /* succKey */)
} else {
m.err = m.maybeNextEntryWithinPrefix(item)
}
m.err = m.nextEntry(item, nil /* succKey */)
if m.err != nil {
return nil, base.LazyValue{}
}
Expand All @@ -855,11 +828,7 @@ func (m *mergingIter) findNextEntry() (*InternalKey, base.LazyValue) {

// Check if the key is visible at the iterator sequence numbers.
if !item.iterKey.Visible(m.snapshot, m.batchSnapshot) {
if m.prefix == nil {
m.err = m.nextEntry(item, nil /* succKey */)
} else {
m.err = m.maybeNextEntryWithinPrefix(item)
}
m.err = m.nextEntry(item, nil /* succKey */)
if m.err != nil {
return nil, base.LazyValue{}
}
Expand Down Expand Up @@ -1064,6 +1033,14 @@ func (m *mergingIter) seekGE(key []byte, level int, flags base.SeekGEFlags) erro
l := &m.levels[level]
if m.prefix != nil {
l.iterKey, l.iterValue = l.iter.SeekPrefixGE(m.prefix, key, flags)
if l.iterKey != nil {
if n := m.split(l.iterKey.UserKey); !bytes.Equal(m.prefix, l.iterKey.UserKey[:n]) {
// Prevent keys without a matching prefix from being added to the heap by setting
// iterKey and iterValue to their zero values before calling initMinHeap.
l.iterKey = nil
l.iterValue = base.LazyValue{}
}
}
} else {
l.iterKey, l.iterValue = l.iter.SeekGE(key, flags)
}
Expand Down Expand Up @@ -1120,18 +1097,31 @@ func (m *mergingIter) SeekGE(key []byte, flags base.SeekGEFlags) (*InternalKey,
return m.findNextEntry()
}

// SeekPrefixGE implements base.InternalIterator.SeekPrefixGE. Note that
// SeekPrefixGE only checks the upper bound. It is up to the caller to ensure
// that key is greater than or equal to the lower bound.
// SeekPrefixGE implements base.InternalIterator.SeekPrefixGE. It delegates to
// SeekPrefixGEStrict with the same arguments, so on a mergingIter the two
// methods behave identically.
func (m *mergingIter) SeekPrefixGE(
prefix, key []byte, flags base.SeekGEFlags,
) (*base.InternalKey, base.LazyValue) {
return m.SeekPrefixGEStrict(prefix, key, flags)
}

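// SeekPrefixGEStrict implements topLevelIterator.SeekPrefixGEStrict. Levels
// positioned at a key without a matching prefix are kept out of the heap by
// seekGE, and when invariants are enabled the returned key is explicitly
// checked to have a matching prefix.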
func (m *mergingIter) SeekPrefixGEStrict(
prefix, key []byte, flags base.SeekGEFlags,
) (*base.InternalKey, base.LazyValue) {
m.prefix = prefix
m.err = m.seekGE(key, 0 /* start level */, flags)
if m.err != nil {
return nil, base.LazyValue{}
}
return m.findNextEntry()

iterKey, iterValue := m.findNextEntry()
if invariants.Enabled && iterKey != nil {
if n := m.split(iterKey.UserKey); !bytes.Equal(m.prefix, iterKey.UserKey[:n]) {
m.logger.Fatalf("mergingIter: prefix violation: returning key %q without prefix %s\n", iterKey, m.prefix)
}
}
return iterKey, iterValue
}

// Seeks levels >= level to < key. Additionally uses range tombstones to extend the seeks.
Expand Down Expand Up @@ -1258,14 +1248,20 @@ func (m *mergingIter) Next() (*InternalKey, base.LazyValue) {
}

// NB: It's okay to call nextEntry directly even during prefix iteration
// mode (as opposed to indirectly through maybeNextEntryWithinPrefix).
// During prefix iteration mode, we rely on the caller to not call Next if
// the iterator has already advanced beyond the iteration prefix. See the
// comment above the base.InternalIterator interface.
// mode. During prefix iteration mode, we rely on the caller to not call
// Next if the iterator has already advanced beyond the iteration prefix.
// See the comment above the base.InternalIterator interface.
if m.err = m.nextEntry(m.heap.items[0], nil /* succKey */); m.err != nil {
return nil, base.LazyValue{}
}
return m.findNextEntry()

iterKey, iterValue := m.findNextEntry()
if invariants.Enabled && m.prefix != nil && iterKey != nil {
if n := m.split(iterKey.UserKey); !bytes.Equal(m.prefix, iterKey.UserKey[:n]) {
m.logger.Fatalf("mergingIter: prefix violation: returning key %q without prefix %s\n", iterKey, m.prefix)
}
}
return iterKey, iterValue
}

func (m *mergingIter) NextPrefix(succKey []byte) (*InternalKey, LazyValue) {
Expand Down
23 changes: 23 additions & 0 deletions testdata/iter_histories/range_keys_simple
Original file line number Diff line number Diff line change
Expand Up @@ -486,3 +486,26 @@ b: (., [b-d) @2=foo UPDATED)
b: (., [b-d) @2=foo UPDATED)
b: (., [b-d) @2=foo)
.

# Test Next-ing while in prefix iteration mode when using the keyspan iterator prefix
# seek optimization (after a SeekGE(a) the keyspan iterator can avoid reseeking for
# a SeekPrefixGE if the prefix falls within the previous span's bounds).

reset
----

batch commit
set a a
range-key-set a b @1 foo
range-key-set b d @2 bar
----
committed 3 keys

combined-iter
seek-ge a
seek-prefix-ge a
next
----
a: (a, [a-b) @1=foo UPDATED)
a: (a, [a-"a\x00") @1=foo UPDATED)
.
Loading

0 comments on commit 507b74f

Please sign in to comment.