Skip to content

Commit

Permalink
internal/keyspan: move MergingIter from internal/rangekey
Browse files Browse the repository at this point in the history
Move the rangekey.MergingIter iterator into the internal/keyspan
package. The MergingIter wraps keyspan.FragmentIterators, implements
keyspan.FragmentIterator itself, and has no logic directly dependent on
range keys.
  • Loading branch information
jbowens committed Feb 24, 2022
1 parent 09203fd commit 0e0d279
Show file tree
Hide file tree
Showing 7 changed files with 106 additions and 99 deletions.
21 changes: 10 additions & 11 deletions internal/rangekey/bound_iter.go → internal/keyspan/bound_iter.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,14 @@
// of this source code is governed by a BSD-style license that can be found in
// the LICENSE file.

package rangekey
package keyspan

import (
"bytes"
"fmt"

"github.com/cockroachdb/pebble/internal/base"
"github.com/cockroachdb/pebble/internal/invariants"
"github.com/cockroachdb/pebble/internal/keyspan"
)

type boundKind int8
Expand Down Expand Up @@ -38,7 +37,7 @@ type boundKey struct {
// Start keys have user keys equal to the boundKey's key field. If the
// boundKind is boundKindFragmentEnd, all fragments' End keys equal the
// boundKey's key field.
fragments []keyspan.Span
fragments []Span
}

func (k boundKey) valid() bool {
Expand Down Expand Up @@ -83,31 +82,31 @@ func (k boundKey) String() string {
//
// Along with the key and bound kind, fragmentBoundIterator surfaces the set of
// all the fragments that share that bound. fragmentBoundIterator is used
// internally by the range-key merging iterator.
// internally by the span merging iterator.
//
// Note that fragmentBoundIterator's interface differs from FragmentIterator.
// The fragmentBoundIterator iterates over individual bounds, including end
// boundaries, whereas FragmentIterator returns whole keyspan.Spans ordered by
// boundaries, whereas FragmentIterator returns whole Spans ordered by
// Start key. The fragmentBoundIterator's interface is designed this way to
// simplify the mergingIter implementation, which must include both start and
// end boundaries in its heap.
type fragmentBoundIterator struct {
// iter holds the underlying fragment iterator. iterSpan is always set to
// the span at iter's current position.
iter keyspan.FragmentIterator
iterSpan keyspan.Span
iter FragmentIterator
iterSpan Span
// start and end hold the user keys for the iterator's current fragment. One
// of either the start or the end is the current iterator position,
// indicated by boundKind. All of the underlying iterators fragments with
// the bounds [start, end) are contained within the fragments field.
start []byte
end []byte
fragments []keyspan.Span
fragments []Span
boundKind boundKind
dir int8
}

func (i *fragmentBoundIterator) init(fragmentIter keyspan.FragmentIterator) {
func (i *fragmentBoundIterator) init(fragmentIter FragmentIterator) {
*i = fragmentBoundIterator{iter: fragmentIter}
}

Expand Down Expand Up @@ -291,8 +290,8 @@ func (i *fragmentBoundIterator) initBackwardSpan(cmp base.Compare) boundKey {

func (i *fragmentBoundIterator) clearFragments() {
for j := range i.fragments {
// Clear any pointers into range key blocks to avoid retaining them.
i.fragments[j] = keyspan.Span{}
// Clear any pointers into blocks to avoid retaining them.
i.fragments[j] = Span{}
}
i.fragments = i.fragments[:0]
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
// of this source code is governed by a BSD-style license that can be found in
// the LICENSE file.

package rangekey
package keyspan

import (
"bytes"
Expand All @@ -12,8 +12,6 @@ import (

"github.com/cockroachdb/pebble/internal/base"
"github.com/cockroachdb/pebble/internal/datadriven"
"github.com/cockroachdb/pebble/internal/keyspan"
"github.com/stretchr/testify/require"
)

func TestFragmentBoundIterator(t *testing.T) {
Expand All @@ -32,19 +30,12 @@ func TestFragmentBoundIterator(t *testing.T) {
datadriven.RunTest(t, "testdata/fragment_bound_iterator", func(td *datadriven.TestData) string {
switch td.Cmd {
case "define":
var spans []keyspan.Span
var spans []Span
lines := strings.Split(strings.TrimSpace(td.Input), "\n")
for _, line := range lines {
startKey, value := Parse(line)
endKey, v, ok := DecodeEndKey(startKey.Kind(), value)
require.True(t, ok)
spans = append(spans, keyspan.Span{
Start: startKey,
End: endKey,
Value: v,
})
spans = append(spans, parseSpanWithKind(t, line))
}
iter.init(keyspan.NewIter(cmp, spans))
iter.init(NewIter(cmp, spans))
return "OK"
case "iter":
buf.Reset()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
// of this source code is governed by a BSD-style license that can be found in
// the LICENSE file.

package rangekey
package keyspan

import (
"bytes"
Expand All @@ -11,7 +11,6 @@ import (

"github.com/cockroachdb/pebble/internal/base"
"github.com/cockroachdb/pebble/internal/invariants"
"github.com/cockroachdb/pebble/internal/keyspan"
)

// TODO(jackson): Document memory safety requirements. Once we're reading from
Expand All @@ -28,11 +27,6 @@ import (
// seeks would require introducing key comparisons to switchTo{Min,Max}Heap
// where there currently are none.

// TODO(jackson): Consider moving this to the internal/keyspan package and
// removing any explicit mention of range keys. The algorithm works for all key
// span types, and could be used to fragment RANGEDELs across levels during a
// compaction.

// Fragments holds a set of fragments all with the bounds [Start, End).
// Individual, sorted fragments may be retrieved using the At method.
type Fragments struct {
Expand All @@ -51,17 +45,17 @@ func (f *Fragments) Count() int {
}

// At retrieves the i-th span.
func (f *Fragments) At(i int) keyspan.Span {
func (f *Fragments) At(i int) Span {
// Construct the fragment with f.spans[i].Start's trailer, so it adopts its
// sequence number and kind, and with f.spans[i]'s Value.
return keyspan.Span{
return Span{
Start: base.InternalKey{UserKey: f.Start, Trailer: f.spans[i].Start.Trailer},
End: f.End,
Value: f.spans[i].Value,
}
}

type bySeqKind []*keyspan.Span
type bySeqKind []*Span

// Len reports how many span pointers the slice holds. Together with Swap and
// Less it gives bySeqKind the method set used for sorting.
func (s bySeqKind) Len() int {
	return len(s)
}
// Swap exchanges the span pointers stored at positions i and j.
func (s bySeqKind) Swap(i, j int) {
	held := s[i]
	s[i] = s[j]
	s[j] = held
}
Expand All @@ -76,9 +70,8 @@ func (s bySeqKind) Less(i, j int) bool {
}
}

// MergingIter merges range keys across levels of the LSM, exposing a fragment
// iterator that yields range key spans fragmented at unique range key
// boundaries.
// MergingIter merges fragments across levels of the LSM, exposing a fragment
// iterator that yields sets of spans fragmented at unique user key boundaries.
//
// A MergingIter is initialized with an arbitrary number of child iterators over
// fragmented spans. Each child iterator exposes fragmented key spans, such that
Expand All @@ -94,7 +87,7 @@ func (s bySeqKind) Less(i, j int) bool {
// The MergingIter configured with c1 and c2 further fragments the key spans and
// surfaces the key spans:
//
// a-b a-b b-c b-c b-c c--e c--e c--e c--e e-g h-j
// [a-b a-b] [b-c b-c b-c] [c--e c--e c--e c--e] [e-g] [h-j]
//
// This is the result of fragmenting all of c1 and c2's fragments at every
// unique key bound (a, b, c, e, g, h, j).
Expand Down Expand Up @@ -218,9 +211,9 @@ func (s bySeqKind) Less(i, j int) bool {
// part of a boundKey. In the above example, i1 and i2 are positioned at end
// boundaries, so findNextFragmentSet collects [a,c) and [a,p). These spans
// contain the merging iterator's [m.start, m.end) span, but they may also
// extend beyond the m.start and m.end. Before returning the spans to the
// caller, these spans are truncated to m.start and m.end user keys, preserving
// the existing spans' sequence numbers, key kinds and values.
// extend beyond the m.start and m.end. The merging iterator returns the
// fragments with the merging iter's m.start and m.end bounds, preserving the
// underlying key spans' sequence numbers, key kinds and values.
//
// It may be the case that findNextFragmentSet finds no levels positioned at end
// boundaries, in which case the span [m.start, m.end) overlaps with nothing. In
Expand All @@ -236,11 +229,11 @@ type MergingIter struct {
// Invariant: None of the levels' iterators contain spans with a bound
// between start and end. For all bounds b, b ≤ start || b ≥ end.
start, end []byte
// fragments holds all of the range key fragments across all levels that
// overlap the key span [start, end), sorted by sequence number and kind
// descending. The fragments are not truncated to start and end. This slice
// is reconstituted in synthesizeFragments from each mergingIterLevel's
// fragments every time the [start, end) bounds change.
// fragments holds all of the fragments across all levels that overlap the
// key span [start, end), sorted by sequence number and kind descending. The
// fragments are not truncated to start and end. This slice is reconstituted
// in synthesizeFragments from each mergingIterLevel's fragments every time
// the [start, end) bounds change.
//
// Each element points into a child iterator's memory, so the spans may not
// be directly modified.
Expand All @@ -258,7 +251,7 @@ type mergingIterLevel struct {
}

// Init initializes the merging iterator with the provided fragment iterators.
func (m *MergingIter) Init(cmp base.Compare, iters ...keyspan.FragmentIterator) {
func (m *MergingIter) Init(cmp base.Compare, iters ...FragmentIterator) {
levels, items := m.levels, m.heap.items

*m = MergingIter{
Expand All @@ -278,9 +271,8 @@ func (m *MergingIter) Init(cmp base.Compare, iters ...keyspan.FragmentIterator)
}
}

// SeekGE implements base.InternalIterator.SeekGE. Like other implementations of
// keyspan.FragmentIterator, MergingIter seeks based on the span's Start key,
// returning the fragment with the smallest Start ≥ key.
// SeekGE moves the iterator to the first set of fragments with a start key
// greater than or equal to key.
func (m *MergingIter) SeekGE(key []byte, trySeekUsingNext bool) Fragments {
m.invalidate() // clear state about current position
for i := range m.levels {
Expand All @@ -291,9 +283,8 @@ func (m *MergingIter) SeekGE(key []byte, trySeekUsingNext bool) Fragments {
return m.findNextFragmentSet()
}

// SeekLT implements base.InternalIterator.SeekLT. Like other implementations of
// keyspan.FragmentIterator, MergingIter seeks based on fragments' Start keys,
// returning the fragment with the largest Start that's < key.
// SeekLT moves the iterator to the last set of fragments with a start key
// less than key.
func (m *MergingIter) SeekLT(key []byte) Fragments {
// TODO(jackson): Evaluate whether there's an implementation of SeekLT
// independent of SeekGE that is more efficient. It's tricky, because the
Expand Down Expand Up @@ -321,7 +312,7 @@ func (m *MergingIter) SeekLT(key []byte) Fragments {
return m.Prev()
}

// First implements base.InternalIterator.First.
// First seeks the iterator to the first set of fragments.
func (m *MergingIter) First() Fragments {
m.invalidate() // clear state about current position
for i := range m.levels {
Expand All @@ -332,7 +323,7 @@ func (m *MergingIter) First() Fragments {
return m.findNextFragmentSet()
}

// Last implements base.InternalIterator.Last.
// Last seeks the iterator to the last set of fragments.
func (m *MergingIter) Last() Fragments {
m.invalidate() // clear state about current position
for i := range m.levels {
Expand All @@ -343,7 +334,7 @@ func (m *MergingIter) Last() Fragments {
return m.findPrevFragmentSet()
}

// Next implements base.InternalIterator.Next.
// Next advances the iterator to the next set of fragments.
func (m *MergingIter) Next() Fragments {
if m.err != nil {
return Fragments{}
Expand All @@ -360,7 +351,7 @@ func (m *MergingIter) Next() Fragments {
return m.findNextFragmentSet()
}

// Prev implements base.InternalIterator.Prev.
// Prev moves the iterator back to the previous set of fragments.
func (m *MergingIter) Prev() Fragments {
if m.err != nil {
return Fragments{}
Expand All @@ -377,22 +368,24 @@ func (m *MergingIter) Prev() Fragments {
return m.findPrevFragmentSet()
}

// Error implements (base.InternalIterator).Error.
// Error returns any accumulated error. When the merging iterator itself has
// recorded no error and its heap is non-empty, the error reported by the
// child iterator of the level at the root of the heap is returned instead.
func (m *MergingIter) Error() error {
	if m.err == nil && m.heap.len() != 0 {
		// No error recorded locally: defer to the level at the heap root.
		root := m.heap.items[0].index
		return m.levels[root].iter.iter.Error()
	}
	return m.err
}

// SetBounds implements (base.InternalIterator).SetBounds.
// SetBounds sets the lower and upper bounds for the iterator by forwarding
// them to every level's underlying fragment iterator. Note that the result of
// Next and Prev will be undefined until the iterator has been repositioned
// with SeekGE, SeekLT, First, or Last.
func (m *MergingIter) SetBounds(lower, upper []byte) {
	for idx := range m.levels {
		child := m.levels[idx].iter.iter
		child.SetBounds(lower, upper)
	}
}

// Close implements (base.InternalIterator).Close.
// Close closes the iterator, releasing all acquired resources.
func (m *MergingIter) Close() error {
for i := range m.levels {
if err := m.levels[i].iter.iter.Close(); err != nil && m.err == nil {
Expand All @@ -404,6 +397,7 @@ func (m *MergingIter) Close() error {
return m.err
}

// String implements fmt.Stringer. It returns a fixed label identifying the
// merging iterator; the label does not depend on the iterator's state.
func (m *MergingIter) String() string {
	const label = "merging-keyspan"
	return label
}
Expand Down Expand Up @@ -778,11 +772,11 @@ func (m *MergingIter) prevEntry() {
}
}

// clonedIters makes a clone of the merging iterator's underlying iterators and
// ClonedIters makes a clone of the merging iterator's underlying iterators and
// returns them.
func (m *MergingIter) clonedIters() []keyspan.FragmentIterator {
func (m *MergingIter) ClonedIters() []FragmentIterator {
// TODO(jackson): Remove when range-key state is included in readState.
var iters []keyspan.FragmentIterator
var iters []FragmentIterator
for l := range m.levels {
iters = append(iters, m.levels[l].iter.iter.Clone())
}
Expand Down Expand Up @@ -892,3 +886,12 @@ func (h *mergingIterHeap) down(i0, n int) bool {
}
return i > i0
}

// firstError picks the first non-nil error among its arguments, preferring
// err0 over err1. It returns nil only when both arguments are nil.
func firstError(err0, err1 error) error {
	switch {
	case err0 != nil:
		return err0
	default:
		return err1
	}
}
Loading

0 comments on commit 0e0d279

Please sign in to comment.