Skip to content

Commit

Permalink
keyspanimpl: use base.Equal in MergingIter
Browse files Browse the repository at this point in the history
Adapt keyspanimpl.MergingIter to take a *base.Comparer and use its equals Equal
method when appropriate.
  • Loading branch information
jbowens committed May 15, 2024
1 parent c1cfad1 commit 18bbac5
Show file tree
Hide file tree
Showing 8 changed files with 24 additions and 32 deletions.
4 changes: 2 additions & 2 deletions compaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -883,15 +883,15 @@ func (c *compaction) newInputIters(
// boundaries.
if len(rangeDelIters) > 0 {
mi := &keyspanimpl.MergingIter{}
mi.Init(c.cmp, keyspan.NoopTransform, new(keyspanimpl.MergingBuffers), rangeDelIters...)
mi.Init(c.comparer, keyspan.NoopTransform, new(keyspanimpl.MergingBuffers), rangeDelIters...)
rangeDelIter = mi
}

// If there are range key iterators, we need to combine them using
// keyspanimpl.MergingIter, and then interleave them among the points.
if len(rangeKeyIters) > 0 {
mi := &keyspanimpl.MergingIter{}
mi.Init(c.cmp, keyspan.NoopTransform, new(keyspanimpl.MergingBuffers), rangeKeyIters...)
mi.Init(c.comparer, keyspan.NoopTransform, new(keyspanimpl.MergingBuffers), rangeKeyIters...)
// TODO(radu): why do we have a defragmenter here but not above?
di := &keyspan.DefragmentingIter{}
di.Init(c.comparer, mi, keyspan.DefragmentInternal, keyspan.StaticDefragmentReducer, new(keyspan.DefragmentingBuffers))
Expand Down
4 changes: 2 additions & 2 deletions internal/keyspan/keyspanimpl/level_iter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -320,8 +320,8 @@ func TestLevelIterEquivalence(t *testing.T) {
levelIters = append(levelIters, &levelIter)
}

iter1.Init(base.DefaultComparer.Compare, keyspan.VisibleTransform(base.InternalKeySeqNumMax), new(MergingBuffers), fileIters...)
iter2.Init(base.DefaultComparer.Compare, keyspan.VisibleTransform(base.InternalKeySeqNumMax), new(MergingBuffers), levelIters...)
iter1.Init(base.DefaultComparer, keyspan.VisibleTransform(base.InternalKeySeqNumMax), new(MergingBuffers), fileIters...)
iter2.Init(base.DefaultComparer, keyspan.VisibleTransform(base.InternalKeySeqNumMax), new(MergingBuffers), levelIters...)

// Check iter1 and iter2 for equivalence.
s1, err1 := iter1.First()
Expand Down
36 changes: 14 additions & 22 deletions internal/keyspan/keyspanimpl/merging_iter.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,6 @@ import (
// seeks would require introducing key comparisons to switchTo{Min,Max}Heap
// where there currently are none.

// TODO(jackson): There are several opportunities to use base.Equal in the
// MergingIter implementation, but will require a bit of plumbing to thread the
// Equal function.

// MergingIter merges spans across levels of the LSM, exposing an iterator over
// spans that yields sets of spans fragmented at unique user key boundaries.
//
Expand Down Expand Up @@ -170,6 +166,7 @@ import (
// accommodate this, find{Next,Prev}FragmentSet copy the initial boundary if the
// subsequent Next/Prev would move to the next span.
type MergingIter struct {
comparer *base.Comparer
*MergingBuffers
// start and end hold the bounds for the span currently under the
// iterator position.
Expand Down Expand Up @@ -294,16 +291,17 @@ func (l *mergingIterLevel) prev() error {

// Init initializes the merging iterator with the provided fragment iterators.
func (m *MergingIter) Init(
cmp base.Compare,
comparer *base.Comparer,
transformer keyspan.Transformer,
bufs *MergingBuffers,
iters ...keyspan.FragmentIterator,
) {
*m = MergingIter{
comparer: comparer,
MergingBuffers: bufs,
transformer: transformer,
}
m.heap.cmp = cmp
m.heap.cmp = comparer.Compare
levels, items := m.levels, m.heap.items

// Invariant: cap(levels) >= cap(items)
Expand Down Expand Up @@ -393,7 +391,7 @@ func (m *MergingIter) SeekGE(key []byte) (*keyspan.Span, error) {
return nil, err
case s == nil:
l.heapKey = boundKey{kind: boundKindInvalid}
case m.cmp(s.End, key) <= 0:
case m.comparer.Compare(s.End, key) <= 0:
l.heapKey = boundKey{
kind: boundKindFragmentEnd,
key: s.End,
Expand Down Expand Up @@ -459,7 +457,7 @@ func (m *MergingIter) SeekGE(key []byte) (*keyspan.Span, error) {
// equal to the seek key `key`. In this case, we want this key to be our
// start boundary.
if m.heap.items[0].boundKey.kind == boundKindFragmentStart &&
m.cmp(m.heap.items[0].boundKey.key, key) == 0 {
m.comparer.Equal(m.heap.items[0].boundKey.key, key) {
// Call findNextFragmentSet, which will set m.start to the heap root and
// proceed forward.
return m.findNextFragmentSet()
Expand Down Expand Up @@ -529,7 +527,7 @@ func (m *MergingIter) SeekLT(key []byte) (*keyspan.Span, error) {
return nil, err
case s == nil:
l.heapKey = boundKey{kind: boundKindInvalid}
case m.cmp(s.Start, key) >= 0:
case m.comparer.Compare(s.Start, key) >= 0:
l.heapKey = boundKey{
kind: boundKindFragmentStart,
key: s.Start,
Expand Down Expand Up @@ -594,7 +592,7 @@ func (m *MergingIter) SeekLT(key []byte) (*keyspan.Span, error) {
// equal to the seek key `key`. In this case, we want this key to be our end
// boundary.
if m.heap.items[0].boundKey.kind == boundKindFragmentEnd &&
m.cmp(m.heap.items[0].boundKey.key, key) == 0 {
m.comparer.Equal(m.heap.items[0].boundKey.key, key) {
// Call findPrevFragmentSet, which will set m.end to the heap root and
// proceed backwards.
return m.findPrevFragmentSet()
Expand Down Expand Up @@ -757,7 +755,7 @@ func (m *MergingIter) switchToMinHeap() error {
if invariants.Enabled {
for i := range m.levels {
l := &m.levels[i]
if l.heapKey.kind != boundKindInvalid && m.cmp(l.heapKey.key, m.start) > 0 {
if l.heapKey.kind != boundKindInvalid && m.comparer.Compare(l.heapKey.key, m.start) > 0 {
panic("pebble: invariant violation: max-heap key > m.start")
}
}
Expand Down Expand Up @@ -812,7 +810,7 @@ func (m *MergingIter) switchToMaxHeap() error {
if invariants.Enabled {
for i := range m.levels {
l := &m.levels[i]
if l.heapKey.kind != boundKindInvalid && m.cmp(l.heapKey.key, m.end) < 0 {
if l.heapKey.kind != boundKindInvalid && m.comparer.Compare(l.heapKey.key, m.end) < 0 {
panic("pebble: invariant violation: min-heap key < m.end")
}
}
Expand All @@ -827,10 +825,6 @@ func (m *MergingIter) switchToMaxHeap() error {
return nil
}

func (m *MergingIter) cmp(a, b []byte) int {
return m.heap.cmp(a, b)
}

func (m *MergingIter) findNextFragmentSet() (*keyspan.Span, error) {
// Each iteration of this loop considers a new merged span between unique
// user keys. An iteration may find that there exists no overlap for a given
Expand Down Expand Up @@ -880,7 +874,7 @@ func (m *MergingIter) findNextFragmentSet() (*keyspan.Span, error) {
if err := m.nextEntry(); err != nil {
return nil, err
}
for len(m.heap.items) > 0 && m.cmp(m.heapRoot(), m.start) == 0 {
for len(m.heap.items) > 0 && m.comparer.Equal(m.heapRoot(), m.start) {
if err := m.nextEntry(); err != nil {
return nil, err
}
Expand Down Expand Up @@ -962,7 +956,7 @@ func (m *MergingIter) findPrevFragmentSet() (*keyspan.Span, error) {
if err := m.prevEntry(); err != nil {
return nil, err
}
for len(m.heap.items) > 0 && m.cmp(m.heapRoot(), m.end) == 0 {
for len(m.heap.items) > 0 && m.comparer.Equal(m.heapRoot(), m.end) {
if err := m.prevEntry(); err != nil {
return nil, err
}
Expand Down Expand Up @@ -1015,7 +1009,7 @@ func (m *MergingIter) heapRoot() []byte {
// with a span returned by a child iterator.
func (m *MergingIter) synthesizeKeys(dir int8) (bool, *keyspan.Span, error) {
if invariants.Enabled {
if m.cmp(m.start, m.end) >= 0 {
if m.comparer.Compare(m.start, m.end) >= 0 {
panic(fmt.Sprintf("pebble: invariant violation: span start ≥ end: %s >= %s", m.start, m.end))
}
}
Expand Down Expand Up @@ -1046,9 +1040,7 @@ func (m *MergingIter) synthesizeKeys(dir int8) (bool, *keyspan.Span, error) {
Keys: m.keys,
KeysOrder: keyspan.ByTrailerDesc,
}
// NB: m.heap.cmp is a base.Compare, whereas m.cmp is a method on
// MergingIter.
if err := m.transformer.Transform(m.heap.cmp, m.span, &m.span); err != nil {
if err := m.transformer.Transform(m.comparer.Compare, m.span, &m.span); err != nil {
return false, nil, err
}
return found, &m.span, nil
Expand Down
4 changes: 2 additions & 2 deletions internal/keyspan/keyspanimpl/merging_iter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func TestMergingIter(t *testing.T) {
}
}
var iter MergingIter
iter.Init(cmp, keyspan.VisibleTransform(snapshot), new(MergingBuffers), iters...)
iter.Init(base.DefaultComparer, keyspan.VisibleTransform(snapshot), new(MergingBuffers), iters...)
keyspan.RunIterCmd(td.Input, &iter, &buf)
return buf.String()
default:
Expand Down Expand Up @@ -181,7 +181,7 @@ func testFragmenterEquivalenceOnce(t *testing.T, seed int64) {

fragmenterIter := keyspan.NewIter(f.Cmp, allFragmented)
mergingIter := &MergingIter{}
mergingIter.Init(f.Cmp, keyspan.VisibleTransform(base.InternalKeySeqNumMax), new(MergingBuffers), iters...)
mergingIter.Init(testkeys.Comparer, keyspan.VisibleTransform(base.InternalKeySeqNumMax), new(MergingBuffers), iters...)

// Position both so that it's okay to perform relative positioning
// operations immediately.
Expand Down
2 changes: 1 addition & 1 deletion internal/rangekeystack/user_iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ func (ui *UserIteratorConfig) Init(
ui.snapshot = snapshot
ui.comparer = comparer
ui.internalKeys = internalKeys
ui.miter.Init(comparer.Compare, ui, &bufs.merging, iters...)
ui.miter.Init(comparer, ui, &bufs.merging, iters...)
ui.biter.Init(comparer.Compare, comparer.Split, &ui.miter, lower, upper, hasPrefix, prefix)
if internalKeys {
ui.diter.Init(comparer, &ui.biter, keyspan.DefragmentInternal, keyspan.StaticDefragmentReducer, &bufs.defragmenting)
Expand Down
2 changes: 1 addition & 1 deletion internal/rangekeystack/user_iterator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func TestIter(t *testing.T) {
dst.End = s.End
return nil
})
iter.Init(cmp, transform, new(keyspanimpl.MergingBuffers), keyspan.NewIter(cmp, spans))
iter.Init(testkeys.Comparer, transform, new(keyspanimpl.MergingBuffers), keyspan.NewIter(cmp, spans))
return "OK"
case "iter":
buf.Reset()
Expand Down
2 changes: 1 addition & 1 deletion scan_internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -952,7 +952,7 @@ func (i *scanInternalIterator) constructPointIter(

buf.merging.init(&i.opts.IterOptions, &InternalIteratorStats{}, i.comparer.Compare, i.comparer.Split, mlevels...)
buf.merging.snapshot = i.seqNum
rangeDelMiter.Init(i.comparer.Compare, keyspan.VisibleTransform(i.seqNum), new(keyspanimpl.MergingBuffers), rangeDelIters...)
rangeDelMiter.Init(i.comparer, keyspan.VisibleTransform(i.seqNum), new(keyspanimpl.MergingBuffers), rangeDelIters...)

if i.opts.includeObsoleteKeys {
iiter := &keyspan.InterleavingIter{}
Expand Down
2 changes: 1 addition & 1 deletion table_stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -902,7 +902,7 @@ func newCombinedDeletionKeyspanIter(
keyspan.SortKeysByTrailer(&out.Keys)
return nil
})
mIter.Init(comparer.Compare, transform, new(keyspanimpl.MergingBuffers))
mIter.Init(comparer, transform, new(keyspanimpl.MergingBuffers))

iter, err := cr.NewRawRangeDelIter(m.IterTransforms())
if err != nil {
Expand Down

0 comments on commit 18bbac5

Please sign in to comment.