Skip to content

Commit

Permalink
iterator: require prefix filtering in merging iterator
Browse files Browse the repository at this point in the history
Currently for `SeekPrefixGE` calls we filter out keys without a matching
prefix at the top layer iterator. This causes the merging iterator to
build a larger heap containing entries without the required prefixes.
This commit shifts the prefix filtering responsibility to the merging
iterator before inserting keys in the heap.

Fix #2182.
  • Loading branch information
CheranMahalingam committed Feb 15, 2024
1 parent 38e3430 commit a97977f
Show file tree
Hide file tree
Showing 7 changed files with 132 additions and 102 deletions.
15 changes: 7 additions & 8 deletions iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -526,13 +526,6 @@ func (i *Iterator) findNextEntry(limit []byte) {
}

for i.iterKey != nil {
key := *i.iterKey

if i.hasPrefix {
if n := i.split(key.UserKey); !i.equal(i.prefixOrFullSeekKey, key.UserKey[:n]) {
return
}
}
// Compare with limit every time we start at a different user key.
// Note that given the best-effort contract of limit, we could avoid a
// comparison in the common case by doing this only after
Expand All @@ -546,6 +539,7 @@ func (i *Iterator) findNextEntry(limit []byte) {
return
}

key := *i.iterKey
// If the user has configured a SkipPoint function, invoke it to see
// whether we should skip over the current user key.
if i.opts.SkipPoint != nil && key.Kind() != InternalKeyKindRangeKeySet && i.opts.SkipPoint(i.iterKey.UserKey) {
Expand All @@ -563,6 +557,11 @@ func (i *Iterator) findNextEntry(limit []byte) {

switch key.Kind() {
case InternalKeyKindRangeKeySet:
if i.hasPrefix {
if n := i.split(key.UserKey); !i.equal(i.prefixOrFullSeekKey, key.UserKey[:n]) {
return
}
}
// Save the current key.
i.keyBuf = append(i.keyBuf[:0], key.UserKey...)
i.key = i.keyBuf
Expand Down Expand Up @@ -1867,7 +1866,7 @@ func (i *Iterator) nextWithLimit(limit []byte) IterValidityState {
i.iterValidityState = IterExhausted
return i.iterValidityState
} else if i.iterValidityState == IterExhausted {
// No-op, already exhasuted. We avoid executing the Next because it
// No-op, already exhausted. We avoid executing the Next because it
// can break invariants: Specifically, a file that fails the bloom
// filter test may result in its level being removed from the
// merging iterator. The level's removal can cause a lazy combined
Expand Down
3 changes: 2 additions & 1 deletion iterator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -927,6 +927,7 @@ func TestIteratorSeekOpt(t *testing.T) {
}
return iters, err
}
d.opts.Comparer.Split = func(a []byte) int { return len(a) }
return s

case "iter":
Expand All @@ -940,7 +941,7 @@ func TestIteratorSeekOpt(t *testing.T) {
}
iter, _ = snap.NewIter(nil)
iter.readSampling.forceReadSampling = true
iter.comparer.Split = func(a []byte) int { return len(a) }
iter.comparer.Split = d.opts.Comparer.Split
iter.forceEnableSeekOpt = true
iter.merging.forceEnableSeekOpt = true
}
Expand Down
93 changes: 39 additions & 54 deletions merging_iter.go
Original file line number Diff line number Diff line change
Expand Up @@ -603,31 +603,8 @@ func (m *mergingIter) switchToMaxHeap() error {
return m.initMaxHeap()
}

// maybeNextEntryWithinPrefix steps to the next entry, as long as the iteration
// prefix has not already been exceeded. If it has, it exhausts the iterator by
// resetting the heap to empty.
func (m *mergingIter) maybeNextEntryWithinPrefix(l *mergingIterLevel) error {
if s := m.split(l.iterKey.UserKey); !bytes.Equal(m.prefix, l.iterKey.UserKey[:s]) {
// The item at the root of the heap already exceeds the iteration
// prefix. We should not advance any more. Clear the heap to reflect
// that the iterator is now exhausted (within this prefix, at
// least).
m.heap.items = m.heap.items[:0]
return nil
}
return m.nextEntry(l, nil /* succKey */)
}

// nextEntry unconditionally steps to the next entry. item is the current top
// item in the heap.
//
// nextEntry should be called directly when not in prefix-iteration mode, or by
// Next. During prefix iteration mode, all other callers should use
// maybeNextEntryWithinPrefix which will avoid advancing the iterator if the
// current iteration prefix has been exhausted. See the comment within
// nextEntry's body for an explanation of why other callers should call
// maybeNextEntryWithinPrefix, which will ensure the documented invariant is
// preserved.
func (m *mergingIter) nextEntry(l *mergingIterLevel, succKey []byte) error {
// INVARIANT: If in prefix iteration mode, item.iterKey must have a prefix equal
// to m.prefix. This invariant is important for ensuring TrySeekUsingNext
Expand Down Expand Up @@ -665,20 +642,26 @@ func (m *mergingIter) nextEntry(l *mergingIterLevel, succKey []byte) error {
l.iterKey, l.iterValue = l.iter.NextPrefix(succKey)
}

if l.iterKey != nil {
if m.heap.len() > 1 {
if l.iterKey == nil {
if err := l.iter.Error(); err != nil {
return err
}
m.heap.pop()
} else {
if m.prefix != nil && !bytes.Equal(m.prefix, l.iterKey.UserKey[:m.split(l.iterKey.UserKey)]) {
// Set keys without a matching prefix to their zero values when in prefix
// iteration mode and remove iterated level from heap.
l.iterKey = nil
l.iterValue = base.LazyValue{}
m.heap.pop()
} else if m.heap.len() > 1 {
m.heap.fix(0)
}
if l.rangeDelIter != oldRangeDelIter {
// The rangeDelIter changed which indicates that the l.iter moved to the
// next sstable. We have to update the tombstone for oldTopLevel as well.
oldTopLevel--
}
} else {
if err := l.iter.Error(); err != nil {
return err
}
m.heap.pop()
}

// The cached tombstones are only valid for the levels
Expand Down Expand Up @@ -797,13 +780,7 @@ func (m *mergingIter) isNextEntryDeleted(item *mergingIterLevel) (bool, error) {
return true, nil
}
if l.tombstone.CoversAt(m.snapshot, item.iterKey.SeqNum()) {
var err error
if m.prefix == nil {
err = m.nextEntry(item, nil /* succKey */)
} else {
err = m.maybeNextEntryWithinPrefix(item)
}
if err != nil {
if err := m.nextEntry(item, nil /* succKey */); err != nil {
return false, err
}
return true, nil
Expand All @@ -830,11 +807,7 @@ func (m *mergingIter) findNextEntry() (*InternalKey, base.LazyValue) {
// keep sstables open until we've surpassed their end boundaries so that
// their range deletions are visible.
if m.levels[item.index].isIgnorableBoundaryKey {
if m.prefix == nil {
m.err = m.nextEntry(item, nil /* succKey */)
} else {
m.err = m.maybeNextEntryWithinPrefix(item)
}
m.err = m.nextEntry(item, nil /* succKey */)
if m.err != nil {
return nil, base.LazyValue{}
}
Expand All @@ -855,11 +828,7 @@ func (m *mergingIter) findNextEntry() (*InternalKey, base.LazyValue) {

// Check if the key is visible at the iterator sequence numbers.
if !item.iterKey.Visible(m.snapshot, m.batchSnapshot) {
if m.prefix == nil {
m.err = m.nextEntry(item, nil /* succKey */)
} else {
m.err = m.maybeNextEntryWithinPrefix(item)
}
m.err = m.nextEntry(item, nil /* succKey */)
if m.err != nil {
return nil, base.LazyValue{}
}
Expand Down Expand Up @@ -1064,6 +1033,14 @@ func (m *mergingIter) seekGE(key []byte, level int, flags base.SeekGEFlags) erro
l := &m.levels[level]
if m.prefix != nil {
l.iterKey, l.iterValue = l.iter.SeekPrefixGE(m.prefix, key, flags)
if l.iterKey != nil {
if n := m.split(l.iterKey.UserKey); !bytes.Equal(m.prefix, l.iterKey.UserKey[:n]) {
// Prevent keys without a matching prefix from being added to the heap by setting
// iterKey and iterValue to their zero values before calling initMinHeap.
l.iterKey = nil
l.iterValue = base.LazyValue{}
}
}
} else {
l.iterKey, l.iterValue = l.iter.SeekGE(key, flags)
}
Expand Down Expand Up @@ -1131,7 +1108,13 @@ func (m *mergingIter) SeekPrefixGE(
if m.err != nil {
return nil, base.LazyValue{}
}
return m.findNextEntry()
iterKey, iterValue := m.findNextEntry()
if invariants.Enabled && iterKey != nil {
if n := m.split(iterKey.UserKey); !bytes.Equal(m.prefix, iterKey.UserKey[:n]) {
m.logger.Fatalf("mergingIter: prefix violation: returning key %q without prefix %s\n", iterKey, m.prefix)
}
}
return iterKey, iterValue
}

// Seeks levels >= level to < key. Additionally uses range tombstones to extend the seeks.
Expand Down Expand Up @@ -1257,15 +1240,17 @@ func (m *mergingIter) Next() (*InternalKey, base.LazyValue) {
return nil, base.LazyValue{}
}

// NB: It's okay to call nextEntry directly even during prefix iteration
// mode (as opposed to indirectly through maybeNextEntryWithinPrefix).
// During prefix iteration mode, we rely on the caller to not call Next if
// the iterator has already advanced beyond the iteration prefix. See the
// comment above the base.InternalIterator interface.
if m.err = m.nextEntry(m.heap.items[0], nil /* succKey */); m.err != nil {
return nil, base.LazyValue{}
}
return m.findNextEntry()

iterKey, iterValue := m.findNextEntry()
if invariants.Enabled && m.prefix != nil && iterKey != nil {
if n := m.split(iterKey.UserKey); !bytes.Equal(m.prefix, iterKey.UserKey[:n]) {
m.logger.Fatalf("mergingIter: prefix violation: returning key %q without prefix %s\n", iterKey, m.prefix)
}
}
return iterKey, iterValue
}

func (m *mergingIter) NextPrefix(succKey []byte) (*InternalKey, LazyValue) {
Expand Down
23 changes: 23 additions & 0 deletions testdata/iter_histories/range_keys_simple
Original file line number Diff line number Diff line change
Expand Up @@ -486,3 +486,26 @@ b: (., [b-d) @2=foo UPDATED)
b: (., [b-d) @2=foo UPDATED)
b: (., [b-d) @2=foo)
.

# Test Next-ing while in prefix iteration mode when using the keyspan iterator prefix
# seek optimization (after a SeekGE(a) the keyspan iterator can avoid reseeking for
# a SeekPrefixGE if the prefix falls within the previous span's bounds).

reset
----

batch commit
set a a
range-key-set a b @1 foo
range-key-set b d @2 bar
----
committed 3 keys

combined-iter
seek-ge a
seek-prefix-ge a
next
----
a: (a, [a-b) @1=foo UPDATED)
a: (a, [a-"a\x00") @1=foo UPDATED)
.
Loading

0 comments on commit a97977f

Please sign in to comment.