Skip to content

Commit

Permalink
keyspan: add contexts to keyspan iterators
Browse files Browse the repository at this point in the history
This change adds a `SetContext()` method to `FragmentIterator` which
updates any context stored in an iterator and its children. The
iterator context is now plumbed correctly through to the table range
key iterators.
  • Loading branch information
RaduBerinde committed Jul 24, 2024
1 parent 54e5ba6 commit 2752abb
Show file tree
Hide file tree
Showing 22 changed files with 170 additions and 26 deletions.
10 changes: 6 additions & 4 deletions compaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -807,9 +807,8 @@ func (c *compaction) newInputIters(
}
}
if hasRangeKeys {
li := &keyspanimpl.LevelIter{}
newRangeKeyIterWrapper := func(file *manifest.FileMetadata, iterOptions keyspan.SpanIterOptions) (keyspan.FragmentIterator, error) {
rangeKeyIter, err := newRangeKeyIter(file, iterOptions)
newRangeKeyIterWrapper := func(ctx context.Context, file *manifest.FileMetadata, iterOptions keyspan.SpanIterOptions) (keyspan.FragmentIterator, error) {
rangeKeyIter, err := newRangeKeyIter(ctx, file, iterOptions)
if err != nil {
return nil, err
} else if rangeKeyIter == nil {
Expand All @@ -829,7 +828,10 @@ func (c *compaction) newInputIters(
// in this sstable must wholly lie within the file's bounds.
return noCloseIter, err
}
li.Init(keyspan.SpanIterOptions{}, c.cmp, newRangeKeyIterWrapper, level.files.Iter(), l, manifest.KeyTypeRange)
li := keyspanimpl.NewLevelIter(
context.Background(), keyspan.SpanIterOptions{}, c.cmp,
newRangeKeyIterWrapper, level.files.Iter(), l, manifest.KeyTypeRange,
)
rangeKeyIters = append(rangeKeyIters, li)
}
return nil
Expand Down
1 change: 1 addition & 0 deletions error_iter.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ func (i *errorKeyspanIter) First() (*keyspan.Span, error) { return ni
func (i *errorKeyspanIter) Last() (*keyspan.Span, error) { return nil, i.err }
func (i *errorKeyspanIter) Next() (*keyspan.Span, error) { return nil, i.err }
func (i *errorKeyspanIter) Prev() (*keyspan.Span, error) { return nil, i.err }
func (i *errorKeyspanIter) SetContext(ctx context.Context) {}
func (i *errorKeyspanIter) Close() {}
func (*errorKeyspanIter) String() string { return "error" }
func (*errorKeyspanIter) WrapChildren(wrap keyspan.WrapFn) {}
Expand Down
6 changes: 4 additions & 2 deletions flushable.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,9 +233,9 @@ func (s *ingestedFlushable) newFlushIter(*IterOptions) internalIterator {
}

func (s *ingestedFlushable) constructRangeDelIter(
file *manifest.FileMetadata, _ keyspan.SpanIterOptions,
ctx context.Context, file *manifest.FileMetadata, _ keyspan.SpanIterOptions,
) (keyspan.FragmentIterator, error) {
iters, err := s.newIters(context.Background(), file, nil, internalIterOpts{}, iterRangeDeletions)
iters, err := s.newIters(ctx, file, nil, internalIterOpts{}, iterRangeDeletions)
if err != nil {
return nil, err
}
Expand All @@ -250,6 +250,7 @@ func (s *ingestedFlushable) constructRangeDelIter(
// the point iterator in constructRangeDeIter is not tracked.
func (s *ingestedFlushable) newRangeDelIter(_ *IterOptions) keyspan.FragmentIterator {
return keyspanimpl.NewLevelIter(
context.TODO(),
keyspan.SpanIterOptions{}, s.comparer.Compare,
s.constructRangeDelIter, s.slice.Iter(), manifest.Level(0),
manifest.KeyTypePoint,
Expand All @@ -263,6 +264,7 @@ func (s *ingestedFlushable) newRangeKeyIter(o *IterOptions) keyspan.FragmentIter
}

return keyspanimpl.NewLevelIter(
context.TODO(),
keyspan.SpanIterOptions{}, s.comparer.Compare, s.newRangeKeyIters,
s.slice.Iter(), manifest.Level(0), manifest.KeyTypeRange,
)
Expand Down
6 changes: 6 additions & 0 deletions internal/keyspan/assert_iter.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package keyspan

import (
"context"
"fmt"

"github.com/cockroachdb/errors"
Expand Down Expand Up @@ -165,6 +166,11 @@ func (i *assertIter) Prev() (*Span, error) {
return span, err
}

// SetContext is part of the FragmentIterator interface.
func (i *assertIter) SetContext(ctx context.Context) {
i.iter.SetContext(ctx)
}

// Close implements FragmentIterator.
func (i *assertIter) Close() {
i.iter.Close()
Expand Down
7 changes: 7 additions & 0 deletions internal/keyspan/bounded.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
package keyspan

import (
"context"

"github.com/cockroachdb/pebble/internal/base"
"github.com/cockroachdb/pebble/internal/treeprinter"
)
Expand Down Expand Up @@ -209,6 +211,11 @@ func (i *BoundedIter) Close() {
i.iter.Close()
}

// SetContext is part of the FragmentIterator interface.
func (i *BoundedIter) SetContext(ctx context.Context) {
i.iter.SetContext(ctx)
}

// SetBounds modifies the FragmentIterator's bounds.
func (i *BoundedIter) SetBounds(lower, upper []byte) {
i.lower, i.upper = lower, upper
Expand Down
6 changes: 6 additions & 0 deletions internal/keyspan/defragment.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package keyspan

import (
"bytes"
"context"

"github.com/cockroachdb/pebble/internal/base"
"github.com/cockroachdb/pebble/internal/bytealloc"
Expand Down Expand Up @@ -196,6 +197,11 @@ func (i *DefragmentingIter) Init(
}
}

// SetContext is part of the FragmentIterator interface.
func (i *DefragmentingIter) SetContext(ctx context.Context) {
i.iter.SetContext(ctx)
}

// Close closes the underlying iterators.
func (i *DefragmentingIter) Close() {
i.iter.Close()
Expand Down
7 changes: 7 additions & 0 deletions internal/keyspan/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
package keyspan

import (
"context"

"github.com/cockroachdb/pebble/internal/base"
"github.com/cockroachdb/pebble/internal/treeprinter"
)
Expand Down Expand Up @@ -93,6 +95,11 @@ func (i *filteringIter) Prev() (*Span, error) {
return i.filter(s, -1)
}

// SetContext is part of the FragmentIterator interface.
func (i *filteringIter) SetContext(ctx context.Context) {
i.iter.SetContext(ctx)
}

// Close implements FragmentIterator.
func (i *filteringIter) Close() {
i.iter.Close()
Expand Down
1 change: 1 addition & 0 deletions internal/keyspan/interleaving_iter.go
Original file line number Diff line number Diff line change
Expand Up @@ -1140,6 +1140,7 @@ func (i *InterleavingIter) SetBounds(lower, upper []byte) {
// SetContext implements (base.InternalIterator).SetContext.
func (i *InterleavingIter) SetContext(ctx context.Context) {
i.pointIter.SetContext(ctx)
i.keyspanIter.SetContext(ctx)
}

// DebugTree is part of the InternalIterator interface.
Expand Down
9 changes: 9 additions & 0 deletions internal/keyspan/iter.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
package keyspan

import (
"context"

"github.com/cockroachdb/pebble/internal/base"
"github.com/cockroachdb/pebble/internal/treeprinter"
)
Expand Down Expand Up @@ -69,6 +71,10 @@ type FragmentIterator interface {
// stack. Used only for debug logging.
WrapChildren(wrap WrapFn)

// SetContext replaces the context provided at iterator creation, or the last
// one provided by SetContext.
SetContext(ctx context.Context)

base.IteratorDebug
}

Expand Down Expand Up @@ -211,6 +217,9 @@ func (i *Iter) Prev() (*Span, error) {
return &i.spans[i.index], nil
}

// SetContext is part of the FragmentIterator interface.
func (i *Iter) SetContext(ctx context.Context) {}

// Close implements FragmentIterator.Close.
func (i *Iter) Close() {}

Expand Down
20 changes: 17 additions & 3 deletions internal/keyspan/keyspanimpl/level_iter.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package keyspanimpl

import (
"context"
"fmt"

"github.com/cockroachdb/pebble/internal/base"
Expand All @@ -17,7 +18,7 @@ import (
// TableNewSpanIter creates a new iterator for range key spans for the given
// file.
type TableNewSpanIter func(
file *manifest.FileMetadata, iterOptions keyspan.SpanIterOptions,
ctx context.Context, file *manifest.FileMetadata, iterOptions keyspan.SpanIterOptions,
) (keyspan.FragmentIterator, error)

// LevelIter provides a merged view of spans from sstables in an L1+ level or an
Expand Down Expand Up @@ -46,6 +47,8 @@ type LevelIter struct {
// newIter creates a range del iterator if keyType is KeyTypePoint or a range
// key iterator if keyType is KeyTypeRange.
newIter TableNewSpanIter
// ctx is passed to TableNewSpanIter.
ctx context.Context

// The options that were passed in.
tableOpts keyspan.SpanIterOptions
Expand Down Expand Up @@ -79,6 +82,7 @@ var _ keyspan.FragmentIterator = (*LevelIter)(nil)
// newIter must create a range del iterator for the given file if keyType is
// KeyTypePoint or a range key iterator if keyType is KeyTypeRange.
func NewLevelIter(
ctx context.Context,
opts keyspan.SpanIterOptions,
cmp base.Compare,
newIter TableNewSpanIter,
Expand All @@ -87,7 +91,7 @@ func NewLevelIter(
keyType manifest.KeyType,
) *LevelIter {
l := &LevelIter{}
l.Init(opts, cmp, newIter, files, level, keyType)
l.Init(ctx, opts, cmp, newIter, files, level, keyType)
return l
}

Expand All @@ -96,6 +100,7 @@ func NewLevelIter(
// newIter must create a range del iterator for the given file if keyType is
// KeyTypePoint or a range key iterator if keyType is KeyTypeRange.
func (l *LevelIter) Init(
ctx context.Context,
opts keyspan.SpanIterOptions,
cmp base.Compare,
newIter TableNewSpanIter,
Expand All @@ -111,6 +116,7 @@ func (l *LevelIter) Init(
keyType: keyType,
level: level,
newIter: newIter,
ctx: ctx,
tableOpts: opts,
files: files.Filter(keyType),
}
Expand Down Expand Up @@ -387,6 +393,14 @@ func (l *LevelIter) moveToPrevFile() (*keyspan.Span, error) {
}
}

// SetContext is part of the FragmentIterator interface.
func (l *LevelIter) SetContext(ctx context.Context) {
l.ctx = ctx
if l.lastIter != nil {
l.lastIter.SetContext(ctx)
}
}

// Close implements keyspan.FragmentIterator.
func (l *LevelIter) Close() {
l.file = nil
Expand Down Expand Up @@ -442,7 +456,7 @@ func (l *LevelIter) setPosAtFile(f *manifest.FileMetadata) error {
l.lastIter = nil
l.lastIterFile = nil
}
iter, err := l.newIter(l.file, l.tableOpts)
iter, err := l.newIter(l.ctx, l.file, l.tableOpts)
if err != nil {
return err
}
Expand Down
8 changes: 5 additions & 3 deletions internal/keyspan/keyspanimpl/level_iter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package keyspanimpl

import (
"context"
"fmt"
"strings"
"testing"
Expand Down Expand Up @@ -301,7 +302,7 @@ func TestLevelIterEquivalence(t *testing.T) {
metas = append(metas, meta)
}

tableNewIters := func(file *manifest.FileMetadata, iterOptions keyspan.SpanIterOptions) (keyspan.FragmentIterator, error) {
tableNewIters := func(ctx context.Context, file *manifest.FileMetadata, iterOptions keyspan.SpanIterOptions) (keyspan.FragmentIterator, error) {
return keyspan.NewIter(base.DefaultComparer.Compare, tc.levels[j][file.FileNum-1]), nil
}
// Add all the fileMetadatas to L6.
Expand All @@ -314,6 +315,7 @@ func TestLevelIterEquivalence(t *testing.T) {
v, err := b.Apply(nil, base.DefaultComparer, 0, 0)
require.NoError(t, err)
levelIter.Init(
context.Background(),
keyspan.SpanIterOptions{}, base.DefaultComparer.Compare, tableNewIters,
v.Levels[6].Iter(), 0, manifest.KeyTypeRange,
)
Expand Down Expand Up @@ -414,7 +416,7 @@ func TestLevelIter(t *testing.T) {
keyType = manifest.KeyTypePoint
}
}
tableNewIters := func(file *manifest.FileMetadata, _ keyspan.SpanIterOptions) (keyspan.FragmentIterator, error) {
tableNewIters := func(ctx context.Context, file *manifest.FileMetadata, _ keyspan.SpanIterOptions) (keyspan.FragmentIterator, error) {
f := files[file.FileNum-1]
if keyType == manifest.KeyTypePoint {
return keyspan.NewIter(cmp, f.rangeDels), nil
Expand All @@ -426,7 +428,7 @@ func TestLevelIter(t *testing.T) {
metas[i] = files[i].meta
}
lm := manifest.MakeLevelMetadata(cmp, 6, metas)
iter := NewLevelIter(keyspan.SpanIterOptions{}, cmp, tableNewIters, lm.Iter(), 6, keyType)
iter := NewLevelIter(context.Background(), keyspan.SpanIterOptions{}, cmp, tableNewIters, lm.Iter(), 6, keyType)
extraInfo := func() string {
return iter.String()
}
Expand Down
8 changes: 8 additions & 0 deletions internal/keyspan/keyspanimpl/merging_iter.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ package keyspanimpl
import (
"bytes"
"cmp"
"context"
"fmt"
"slices"

Expand Down Expand Up @@ -676,6 +677,13 @@ func (m *MergingIter) Prev() (*keyspan.Span, error) {
return m.findPrevFragmentSet()
}

// SetContext is part of the FragmentIterator interface.
func (m *MergingIter) SetContext(ctx context.Context) {
for i := range m.levels {
m.levels[i].iter.SetContext(ctx)
}
}

// Close closes the iterator, releasing all acquired resources.
func (m *MergingIter) Close() {
for i := range m.levels {
Expand Down
6 changes: 6 additions & 0 deletions internal/keyspan/logging_iter.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package keyspan

import (
"context"
"fmt"

"github.com/cockroachdb/pebble/internal/base"
Expand Down Expand Up @@ -132,6 +133,11 @@ func (i *loggingIter) Prev() (*Span, error) {
return span, err
}

// SetContext is part of the FragmentIterator interface.
func (i *loggingIter) SetContext(ctx context.Context) {
i.iter.SetContext(ctx)
}

// Close implements FragmentIterator.
func (i *loggingIter) Close() {
opEnd := i.opStartf("Close()")
Expand Down
11 changes: 11 additions & 0 deletions internal/keyspan/test_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package keyspan

import (
"bytes"
"context"
"fmt"
"go/token"
"io"
Expand Down Expand Up @@ -365,6 +366,11 @@ func (p *probeIterator) Prev() (*Span, error) {
return p.handleOp(op)
}

// SetContext is part of the FragmentIterator interface.
func (p *probeIterator) SetContext(ctx context.Context) {
p.iter.SetContext(ctx)
}

func (p *probeIterator) Close() {
op := op{Kind: OpClose}
if p.iter != nil {
Expand Down Expand Up @@ -561,6 +567,11 @@ func (i *invalidatingIter) Last() (*Span, error) { return i.invalida
func (i *invalidatingIter) Next() (*Span, error) { return i.invalidate(i.iter.Next()) }
func (i *invalidatingIter) Prev() (*Span, error) { return i.invalidate(i.iter.Prev()) }

// SetContext is part of the FragmentIterator interface.
func (i *invalidatingIter) SetContext(ctx context.Context) {
i.iter.SetContext(ctx)
}

func (i *invalidatingIter) Close() {
_, _ = i.invalidate(nil, nil)
i.iter.Close()
Expand Down
Loading

0 comments on commit 2752abb

Please sign in to comment.