Skip to content

Commit

Permalink
keyspan: remove error from FragmentIterator.Close
Browse files Browse the repository at this point in the history
None of the actual `FragmentIterator` implementations generate an
error during Close. This change removes the error return, simplifying
relevant code.
  • Loading branch information
RaduBerinde committed Jun 19, 2024
1 parent b17532f commit 41441bc
Show file tree
Hide file tree
Showing 30 changed files with 105 additions and 145 deletions.
38 changes: 17 additions & 21 deletions compaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"bytes"
"context"
"fmt"
"io"
"math"
"runtime/pprof"
"slices"
Expand Down Expand Up @@ -77,9 +76,7 @@ type noCloseIter struct {
keyspan.FragmentIterator
}

func (i noCloseIter) Close() error {
return nil
}
func (i *noCloseIter) Close() {}

type compactionLevel struct {
level int
Expand Down Expand Up @@ -228,10 +225,10 @@ type compaction struct {
smallest InternalKey
largest InternalKey

// A list of objects to close when the compaction finishes. Used by input
// iteration to keep rangeDelIters open for the lifetime of the compaction,
// and only close them when the compaction finishes.
closers []io.Closer
// A list of fragment iterators to close when the compaction finishes. Used by
// input iteration to keep rangeDelIters open for the lifetime of the
// compaction, and only close them when the compaction finishes.
closers []*noCloseIter

// grandparents are the tables in level+2 that overlap with the files being
// compacted. Used to determine output table boundaries. Do not assume that the actual files
Expand Down Expand Up @@ -788,8 +785,7 @@ func (c *compaction) newInputIters(
// mergingIter.
iter := level.files.Iter()
for f := iter.First(); f != nil; f = iter.Next() {
rangeDelIter, closer, err := c.newRangeDelIter(
newIters, iter.Take(), iterOpts, l)
rangeDelIter, err := c.newRangeDelIter(newIters, iter.Take(), iterOpts, l)
if err != nil {
// The error will already be annotated with the BackingFileNum, so
// we annotate it with the FileNum.
Expand All @@ -799,7 +795,7 @@ func (c *compaction) newInputIters(
continue
}
rangeDelIters = append(rangeDelIters, rangeDelIter)
c.closers = append(c.closers, closer)
c.closers = append(c.closers, rangeDelIter)
}

// Check if this level has any range keys.
Expand All @@ -813,25 +809,25 @@ func (c *compaction) newInputIters(
if hasRangeKeys {
li := &keyspanimpl.LevelIter{}
newRangeKeyIterWrapper := func(file *manifest.FileMetadata, iterOptions keyspan.SpanIterOptions) (keyspan.FragmentIterator, error) {
iter, err := newRangeKeyIter(file, iterOptions)
rangeKeyIter, err := newRangeKeyIter(file, iterOptions)
if err != nil {
return nil, err
} else if iter == nil {
} else if rangeKeyIter == nil {
return emptyKeyspanIter, nil
}
// Ensure that the range key iter is not closed until the compaction is
// finished. This is necessary because range key processing
// requires the range keys to be held in memory for up to the
// lifetime of the compaction.
c.closers = append(c.closers, iter)
iter = noCloseIter{iter}
noCloseIter := &noCloseIter{rangeKeyIter}
c.closers = append(c.closers, noCloseIter)

// We do not need to truncate range keys to sstable boundaries, or
// only read within the file's atomic compaction units, unlike with
// range tombstones. This is because range keys were added after we
// stopped splitting user keys across sstables, so all the range keys
// in this sstable must wholly lie within the file's bounds.
return iter, err
return noCloseIter, err
}
li.Init(keyspan.SpanIterOptions{}, c.cmp, newRangeKeyIterWrapper, level.files.Iter(), l, manifest.KeyTypeRange)
rangeKeyIters = append(rangeKeyIters, li)
Expand Down Expand Up @@ -902,24 +898,24 @@ func (c *compaction) newInputIters(

func (c *compaction) newRangeDelIter(
newIters tableNewIters, f manifest.LevelFile, opts IterOptions, l manifest.Level,
) (keyspan.FragmentIterator, io.Closer, error) {
) (*noCloseIter, error) {
opts.level = l
iterSet, err := newIters(context.Background(), f.FileMetadata, &opts,
internalIterOpts{
compaction: true,
bufferPool: &c.bufferPool,
}, iterRangeDeletions)
if err != nil {
return nil, nil, err
return nil, err
} else if iterSet.rangeDeletion == nil {
// The file doesn't contain any range deletions.
return nil, nil, nil
return nil, nil
}
// Ensure that rangeDelIter is not closed until the compaction is
// finished. This is necessary because range tombstone processing
// requires the range tombstones to be held in memory for up to the
// lifetime of the compaction.
return noCloseIter{iterSet.rangeDeletion}, iterSet.rangeDeletion, nil
return &noCloseIter{iterSet.rangeDeletion}, nil
}

func (c *compaction) String() string {
Expand Down Expand Up @@ -2497,7 +2493,7 @@ func (d *DB) compactAndWrite(
pointIter, rangeDelIter, rangeKeyIter, err := c.newInputIters(d.newIters, d.tableNewRangeKeyIter)
defer func() {
for _, closer := range c.closers {
result.Err = firstError(result.Err, closer.Close())
closer.FragmentIterator.Close()
}
}()
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion error_iter.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,6 @@ func (i *errorKeyspanIter) First() (*keyspan.Span, error) { return ni
func (i *errorKeyspanIter) Last() (*keyspan.Span, error) { return nil, i.err }
func (i *errorKeyspanIter) Next() (*keyspan.Span, error) { return nil, i.err }
func (i *errorKeyspanIter) Prev() (*keyspan.Span, error) { return nil, i.err }
func (i *errorKeyspanIter) Close() error { return i.err }
func (i *errorKeyspanIter) Close() {}
func (*errorKeyspanIter) String() string { return "error" }
func (*errorKeyspanIter) WrapChildren(wrap keyspan.WrapFn) {}
23 changes: 13 additions & 10 deletions flushable.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ package pebble
import (
"context"
"fmt"
"io"
"sync/atomic"
"time"

Expand Down Expand Up @@ -347,9 +346,9 @@ func computePossibleOverlapsGenericImpl[F flushable](
) {
iter := f.newIter(nil)
rangeDelIter := f.newRangeDelIter(nil)
rkeyIter := f.newRangeKeyIter(nil)
rangeKeyIter := f.newRangeKeyIter(nil)
for _, b := range bounded {
overlap, err := determineOverlapAllIters(cmp, b.UserKeyBounds(), iter, rangeDelIter, rkeyIter)
overlap, err := determineOverlapAllIters(cmp, b.UserKeyBounds(), iter, rangeDelIter, rangeKeyIter)
if invariants.Enabled && err != nil {
panic(errors.AssertionFailedf("expected iterator to be infallible: %v", err))
}
Expand All @@ -360,15 +359,19 @@ func computePossibleOverlapsGenericImpl[F flushable](
}
}

for _, c := range [3]io.Closer{iter, rangeDelIter, rkeyIter} {
if c != nil {
if err := c.Close(); err != nil {
// This implementation must be used in circumstances where
// reading through the iterator is infallible.
panic(err)
}
if iter != nil {
if err := iter.Close(); err != nil {
// This implementation must be used in circumstances where
// reading through the iterator is infallible.
panic(err)
}
}
if rangeDelIter != nil {
rangeDelIter.Close()
}
if rangeKeyIter != nil {
rangeKeyIter.Close()
}
}

// determineOverlapAllIters checks for overlap in a point iterator, range
Expand Down
4 changes: 2 additions & 2 deletions flushable_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ func TestIngestedSSTFlushableAPI(t *testing.T) {
buf.WriteString(span.String())
buf.WriteString("\n")
}
err = firstError(err, iter.Close())
iter.Close()
if err != nil {
fmt.Fprintf(&buf, "err=%q", err.Error())
}
Expand All @@ -153,7 +153,7 @@ func TestIngestedSSTFlushableAPI(t *testing.T) {
buf.WriteString(span.String())
buf.WriteString("\n")
}
err = firstError(err, iter.Close())
iter.Close()
if err != nil {
fmt.Fprintf(&buf, "err=%q", err.Error())
}
Expand Down
4 changes: 1 addition & 3 deletions get_iter.go
Original file line number Diff line number Diff line change
Expand Up @@ -288,8 +288,6 @@ func (g *getIter) maybeSetTombstone(rangeDelIter keyspan.FragmentIterator) (ok b
// care about the most recent range deletion that's visible because it's the
// "most powerful."
g.tombstonedSeqNum, g.tombstoned = t.LargestVisibleSeqNum(g.snapshot)
if g.err = firstError(g.err, rangeDelIter.Close()); g.err != nil {
return false
}
rangeDelIter.Close()
return true
}
4 changes: 2 additions & 2 deletions internal/keyspan/assert_iter.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,8 +163,8 @@ func (i *assertIter) Prev() (*Span, error) {
}

// Close implements FragmentIterator.
func (i *assertIter) Close() error {
return i.iter.Close()
func (i *assertIter) Close() {
i.iter.Close()
}

// WrapChildren implements FragmentIterator.
Expand Down
4 changes: 2 additions & 2 deletions internal/keyspan/bounded.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,8 +202,8 @@ func (i *BoundedIter) Prev() (*Span, error) {
}

// Close implements FragmentIterator.
func (i *BoundedIter) Close() error {
return i.iter.Close()
func (i *BoundedIter) Close() {
i.iter.Close()
}

// SetBounds modifies the FragmentIterator's bounds.
Expand Down
4 changes: 2 additions & 2 deletions internal/keyspan/defragment.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,8 +196,8 @@ func (i *DefragmentingIter) Init(
}

// Close closes the underlying iterators.
func (i *DefragmentingIter) Close() error {
return i.iter.Close()
func (i *DefragmentingIter) Close() {
i.iter.Close()
}

// SeekGE moves the iterator to the first span covering a key greater than or
Expand Down
4 changes: 2 additions & 2 deletions internal/keyspan/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,8 @@ func (i *filteringIter) Prev() (*Span, error) {
}

// Close implements FragmentIterator.
func (i *filteringIter) Close() error {
return i.iter.Close()
func (i *filteringIter) Close() {
i.iter.Close()
}

// filter uses the filterFn (if configured) to filter and possibly mutate the
Expand Down
6 changes: 3 additions & 3 deletions internal/keyspan/interleaving_iter.go
Original file line number Diff line number Diff line change
Expand Up @@ -1152,9 +1152,9 @@ func (i *InterleavingIter) Error() error {

// Close implements (base.InternalIterator).Close.
func (i *InterleavingIter) Close() error {
perr := i.pointIter.Close()
rerr := i.keyspanIter.Close()
return firstError(perr, rerr)
err := i.pointIter.Close()
i.keyspanIter.Close()
return err
}

// String implements (base.InternalIterator).String.
Expand Down
12 changes: 4 additions & 8 deletions internal/keyspan/iter.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,11 +55,9 @@ type FragmentIterator interface {
// previous call to SeekLT or Prev returned an invalid span.
Prev() (*Span, error)

// Close closes the iterator and returns any accumulated error. Exhausting
// the iterator is not considered to be an error. It is valid to call Close
// multiple times. Other methods should not be called after the iterator has
// been closed.
Close() error
// Close closes the iterator. It is valid to call Close multiple times. Other
// methods should not be called after the iterator has been closed.
Close()

// WrapChildren wraps any child iterators using the given function. The
// function can call WrapChildren to recursively wrap an entire iterator
Expand Down Expand Up @@ -207,9 +205,7 @@ func (i *Iter) Prev() (*Span, error) {
}

// Close implements FragmentIterator.Close.
func (i *Iter) Close() error {
return nil
}
func (i *Iter) Close() {}

func (i *Iter) String() string {
return "fragmented-spans"
Expand Down
25 changes: 7 additions & 18 deletions internal/keyspan/keyspanimpl/level_iter.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,9 +146,7 @@ func (l *LevelIter) loadFile(file *manifest.FileMetadata, dir int) loadFileRetur
}

// Note that LevelIter.Close() can be called multiple times.
if err := l.Close(); err != nil {
return noFileLoaded
}
l.Close()

l.iterFile = file
l.iter = nil
Expand Down Expand Up @@ -197,9 +195,7 @@ func (l *LevelIter) SeekGE(key []byte) (*keyspan.Span, error) {
// cases similar to the above, while still retaining correctness.
// Return a straddling key instead of loading the file.
l.iterFile = f
if l.err = l.Close(); l.err != nil {
return l.verify(nil, l.err)
}
l.Close()
l.straddleDir = +1
l.straddle = keyspan.Span{
Start: prevFile.LargestRangeKey.UserKey,
Expand Down Expand Up @@ -250,9 +246,7 @@ func (l *LevelIter) SeekLT(key []byte) (*keyspan.Span, error) {
// cases similar to the above, while still retaining correctness.
// Return a straddling key instead of loading the file.
l.iterFile = f
if l.err = l.Close(); l.err != nil {
return l.verify(nil, l.err)
}
l.Close()
l.straddleDir = -1
l.straddle = keyspan.Span{
Start: f.LargestRangeKey.UserKey,
Expand Down Expand Up @@ -360,9 +354,7 @@ func (l *LevelIter) skipEmptyFileForward() (*keyspan.Span, error) {
// a "straddle span" in l.straddle and return that.
//
// Straddle spans are not created in rangedel mode.
if l.err = l.Close(); l.err != nil {
return l.verify(nil, l.err)
}
l.Close()
startKey := l.iterFile.LargestRangeKey.UserKey
// Resetting l.iterFile without loading the file into l.iter is okay and
// does not change the logic in loadFile() as long as l.iter is also nil;
Expand Down Expand Up @@ -423,9 +415,7 @@ func (l *LevelIter) skipEmptyFileBackward() (*keyspan.Span, error) {
// Straddle spans are not created in rangedel mode.
if l.straddleDir == 0 && l.keyType == manifest.KeyTypeRange &&
l.iterFile != nil && l.iter != nil {
if l.err = l.Close(); l.err != nil {
return l.verify(nil, l.err)
}
l.Close()
endKey := l.iterFile.SmallestRangeKey.UserKey
// Resetting l.iterFile without loading the file into l.iter is okay and
// does not change the logic in loadFile() as long as l.iter is also nil;
Expand Down Expand Up @@ -503,12 +493,11 @@ func (l *LevelIter) Error() error {
}

// Close implements keyspan.FragmentIterator.
func (l *LevelIter) Close() error {
func (l *LevelIter) Close() {
if l.iter != nil {
l.err = l.iter.Close()
l.iter.Close()
l.iter = nil
}
return l.err
}

// String implements keyspan.FragmentIterator.
Expand Down
2 changes: 1 addition & 1 deletion internal/keyspan/keyspanimpl/level_iter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -428,7 +428,7 @@ func TestLevelIter(t *testing.T) {
case "num-files":
return fmt.Sprintf("%d", len(level))
case "close-iter":
_ = iter.Close()
iter.Close()
iter = nil
return "ok"
case "iter":
Expand Down
13 changes: 2 additions & 11 deletions internal/keyspan/keyspanimpl/merging_iter.go
Original file line number Diff line number Diff line change
Expand Up @@ -676,14 +676,12 @@ func (m *MergingIter) Prev() (*keyspan.Span, error) {
}

// Close closes the iterator, releasing all acquired resources.
func (m *MergingIter) Close() error {
var err error
func (m *MergingIter) Close() {
for i := range m.levels {
err = firstError(err, m.levels[i].iter.Close())
m.levels[i].iter.Close()
}
m.levels = nil
m.heap.items = m.heap.items[:0]
return err
}

// String implements fmt.Stringer.
Expand Down Expand Up @@ -1240,10 +1238,3 @@ func (k boundKey) String() string {
fmt.Fprint(&buf, "]")
return buf.String()
}

func firstError(e1, e2 error) error {
if e1 == nil {
return e2
}
return e1
}
Loading

0 comments on commit 41441bc

Please sign in to comment.