Skip to content

Commit

Permalink
colblk: add NewKeyspanIter, object pooling
Browse files Browse the repository at this point in the history
Refactor the exported interface surrounding the KeyspanIter to accommodate the
expected usage: constructing heap-allocated *KeyspanIters that read through a
cache/pool handle. The previous KeyspanIter type was refactored as an
unexported keyspanIter type, and a new KeyspanIter implements cache-awareness
as well as recycling of objects on Close.
  • Loading branch information
jbowens committed Sep 23, 2024
1 parent aba5868 commit 9d69d20
Show file tree
Hide file tree
Showing 2 changed files with 149 additions and 26 deletions.
113 changes: 92 additions & 21 deletions sstable/colblk/keyspan.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,16 @@ import (
"context"
"encoding/binary"
"fmt"
"os"
"sync"

"github.com/cockroachdb/errors"
"github.com/cockroachdb/pebble/internal/base"
"github.com/cockroachdb/pebble/internal/binfmt"
"github.com/cockroachdb/pebble/internal/invariants"
"github.com/cockroachdb/pebble/internal/keyspan"
"github.com/cockroachdb/pebble/internal/treeprinter"
"github.com/cockroachdb/pebble/sstable/block"
)

// the keyspan header encodes a 32-bit count of the number of unique boundary
Expand Down Expand Up @@ -285,12 +288,65 @@ func (r *KeyspanReader) searchBoundaryKeys(cmp base.Compare, key []byte) (index
return i, false
}

// A KeyspanIter is an iterator over a keyspan block. It implements the
// NewKeyspanIter constructs a new iterator over a keyspan columnar block.
func NewKeyspanIter(
cmp base.Compare, h block.BufferHandle, transforms block.FragmentIterTransforms,
) *KeyspanIter {
i := keyspanIterPool.Get().(*KeyspanIter)
i.closeCheck = invariants.CloseChecker{}
i.handle = h
// TODO(jackson): We can teach the block cache to stash a *KeyspanReader.
// Then all iters would use the same reader rather than needing to allocate
// their own KeyspanReader and parse the high-level block structure
// themselves.
i.allocReader.Init(h.Get())
i.init(cmp, &i.allocReader, transforms)
return i
}

var keyspanIterPool = sync.Pool{
New: func() interface{} {
i := &KeyspanIter{}
invariants.SetFinalizer(i, checkKeyspanIter)
return i
},
}

// A KeyspanIter is an iterator over a columnar keyspan block. It implements the
// keyspan.FragmentIterator interface.
type KeyspanIter struct {
r *KeyspanReader
cmp base.Compare
span keyspan.Span
keyspanIter
handle block.BufferHandle
allocReader KeyspanReader

closeCheck invariants.CloseChecker
}

// Close closes the iterator.
func (i *KeyspanIter) Close() {
i.handle.Release()
i.handle = block.BufferHandle{}

if invariants.Sometimes(25) {
// In invariants mode, sometimes don't add the object to the pool so
// that we can check for double closes that take longer than the object
// stays in the pool.
return
}

i.keyspanIter.Close()
i.allocReader = KeyspanReader{}
i.closeCheck.Close()
keyspanIterPool.Put(i)
}

// A keyspanIter is an iterator over a keyspan block. It implements the
// keyspan.FragmentIterator interface.
type keyspanIter struct {
r *KeyspanReader
cmp base.Compare
transforms block.FragmentIterTransforms
span keyspan.Span
// When positioned, the current span's start key is the user key at
// i.r.userKeys.At(i.startBoundIndex)
// and the current span's end key is the user key at
Expand All @@ -300,13 +356,16 @@ type KeyspanIter struct {
}

// Assert that KeyspanIter implements the FragmentIterator interface.
var _ keyspan.FragmentIterator = (*KeyspanIter)(nil)
var _ keyspan.FragmentIterator = (*keyspanIter)(nil)

// Init initializes the iterator with the given comparison function and keyspan
// init initializes the iterator with the given comparison function and keyspan
// reader.
func (i *KeyspanIter) Init(cmp base.Compare, r *KeyspanReader) {
func (i *keyspanIter) init(
cmp base.Compare, r *KeyspanReader, transforms block.FragmentIterTransforms,
) {
i.r = r
i.cmp = cmp
i.transforms = transforms
i.span.Start, i.span.End = nil, nil
i.startBoundIndex = -1
if i.span.Keys == nil {
Expand All @@ -317,7 +376,7 @@ func (i *KeyspanIter) Init(cmp base.Compare, r *KeyspanReader) {
// SeekGE moves the iterator to the first span covering a key greater than
// or equal to the given key. This is equivalent to seeking to the first
// span with an end key greater than the given key.
func (i *KeyspanIter) SeekGE(key []byte) (*keyspan.Span, error) {
func (i *keyspanIter) SeekGE(key []byte) (*keyspan.Span, error) {
// Seek among the boundary keys.
j, eq := i.r.searchBoundaryKeys(i.cmp, key)
// If the found boundary key does not exactly equal the given key, it's
Expand All @@ -332,7 +391,7 @@ func (i *KeyspanIter) SeekGE(key []byte) (*keyspan.Span, error) {
// SeekLT moves the iterator to the last span covering a key less than the
// given key. This is equivalent to seeking to the last span with a start
// key less than the given key.
func (i *KeyspanIter) SeekLT(key []byte) (*keyspan.Span, error) {
func (i *keyspanIter) SeekLT(key []byte) (*keyspan.Span, error) {
// Seek among the boundary keys.
j, eq := i.r.searchBoundaryKeys(i.cmp, key)
// If eq is true, the found boundary key exactly equals the given key. A
Expand All @@ -348,29 +407,29 @@ func (i *KeyspanIter) SeekLT(key []byte) (*keyspan.Span, error) {
}

// First moves the iterator to the first span.
func (i *KeyspanIter) First() (*keyspan.Span, error) {
func (i *keyspanIter) First() (*keyspan.Span, error) {
return i.gatherKeysForward(0), nil
}

// Last moves the iterator to the last span.
func (i *KeyspanIter) Last() (*keyspan.Span, error) {
func (i *keyspanIter) Last() (*keyspan.Span, error) {
return i.gatherKeysBackward(int(i.r.boundaryKeysCount) - 2), nil
}

// Next moves the iterator to the next span.
func (i *KeyspanIter) Next() (*keyspan.Span, error) {
func (i *keyspanIter) Next() (*keyspan.Span, error) {
return i.gatherKeysForward(i.startBoundIndex + 1), nil
}

// Prev moves the iterator to the previous span.
func (i *KeyspanIter) Prev() (*keyspan.Span, error) {
func (i *keyspanIter) Prev() (*keyspan.Span, error) {
return i.gatherKeysBackward(max(i.startBoundIndex-1, -1)), nil
}

// gatherKeysForward returns the first non-empty Span in the forward direction,
// starting with the span formed by using the boundary key at index
// [startBoundIndex] as the span's start boundary.
func (i *KeyspanIter) gatherKeysForward(startBoundIndex int) *keyspan.Span {
func (i *keyspanIter) gatherKeysForward(startBoundIndex int) *keyspan.Span {
if invariants.Enabled && startBoundIndex < 0 {
panic(errors.AssertionFailedf("out of bounds: i.startBoundIndex=%d", startBoundIndex))
}
Expand All @@ -394,7 +453,7 @@ func (i *KeyspanIter) gatherKeysForward(startBoundIndex int) *keyspan.Span {
// gatherKeysBackward returns the first non-empty Span in the backward direction,
// starting with the span formed by using the boundary key at index
// [startBoundIndex] as the span's start boundary.
func (i *KeyspanIter) gatherKeysBackward(startBoundIndex int) *keyspan.Span {
func (i *keyspanIter) gatherKeysBackward(startBoundIndex int) *keyspan.Span {
i.startBoundIndex = startBoundIndex
if i.startBoundIndex < 0 {
return nil
Expand All @@ -418,13 +477,15 @@ func (i *KeyspanIter) gatherKeysBackward(startBoundIndex int) *keyspan.Span {

// isNonemptySpan returns true if the span starting at i.startBoundIndex
// contains keys.
func (i *KeyspanIter) isNonemptySpan(startBoundIndex int) bool {
func (i *keyspanIter) isNonemptySpan(startBoundIndex int) bool {
return i.r.boundaryKeyIndices.At(startBoundIndex) < i.r.boundaryKeyIndices.At(startBoundIndex+1)
}

// materializeSpan constructs the current span from i.startBoundIndex and
// i.{start,end}KeyIndex.
func (i *KeyspanIter) materializeSpan() *keyspan.Span {
func (i *keyspanIter) materializeSpan() *keyspan.Span {
// TODO(jackson): Apply i.transforms.

i.span = keyspan.Span{
Start: i.r.boundaryKeys.At(i.startBoundIndex),
End: i.r.boundaryKeys.At(i.startBoundIndex + 1),
Expand All @@ -446,15 +507,25 @@ func (i *KeyspanIter) materializeSpan() *keyspan.Span {
}

// Close closes the iterator.
func (i *KeyspanIter) Close() {}
func (i *keyspanIter) Close() {
*i = keyspanIter{}
}

// SetContext implements keyspan.FragmentIterator.
func (i *KeyspanIter) SetContext(context.Context) {}
func (i *keyspanIter) SetContext(context.Context) {}

// WrapChildren implements keyspan.FragmentIterator.
func (i *KeyspanIter) WrapChildren(keyspan.WrapFn) {}
func (i *keyspanIter) WrapChildren(keyspan.WrapFn) {}

// DebugTree is part of the FragmentIterator interface.
func (i *KeyspanIter) DebugTree(tp treeprinter.Node) {
func (i *keyspanIter) DebugTree(tp treeprinter.Node) {
tp.Childf("%T(%p)", i, i)
}

func checkKeyspanIter(obj interface{}) {
i := obj.(*KeyspanIter)
if p := i.handle.Get(); p != nil {
fmt.Fprintf(os.Stderr, "KeyspanIter.handle is not nil: %p\n", p)
os.Exit(1)
}
}
62 changes: 57 additions & 5 deletions sstable/colblk/keyspan_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,17 @@ import (
"fmt"
"io"
"strings"
"sync"
"testing"
"time"

"github.com/cockroachdb/datadriven"
"github.com/cockroachdb/pebble/internal/base"
"github.com/cockroachdb/pebble/internal/cache"
"github.com/cockroachdb/pebble/internal/keyspan"
"github.com/cockroachdb/pebble/internal/testkeys"
"github.com/cockroachdb/pebble/sstable/block"
"github.com/stretchr/testify/require"
"golang.org/x/exp/rand"
)

Expand Down Expand Up @@ -48,15 +52,64 @@ func TestKeyspanBlock(t *testing.T) {
fmt.Fprint(&buf, kr.DebugString())
return buf.String()
case "iter":
var iter KeyspanIter
iter.Init(base.DefaultComparer.Compare, &kr)
var iter keyspanIter
iter.init(base.DefaultComparer.Compare, &kr, block.NoFragmentTransforms)
return keyspan.RunFragmentIteratorCmd(&iter, td.Input, nil)
default:
return fmt.Sprintf("unknown command: %s", td.Cmd)
}
})
}

// TestKeyspanBlockPooling exercises the NewKeyspanIter constructor of a
// KeyspanIter and the Close behavior that retains keyspan iters within a pool.
func TestKeyspanBlockPooling(t *testing.T) {
var w KeyspanBlockWriter
w.Init(testkeys.Comparer.Equal)
s1 := keyspan.ParseSpan("b-c:{(#100,RANGEDEL) (#20,RANGEDEL) (#0,RANGEDEL)}")
s2 := keyspan.ParseSpan("c-d:{(#100,RANGEDEL) (#0,RANGEDEL)}")
w.AddSpan(s1)
w.AddSpan(s2)
b := w.Finish()

c := cache.New(10 << 10)
defer c.Unref()
v := block.Alloc(len(b), nil)
copy(v.Get(), b)
v.MakeHandle(c, cache.ID(1), base.DiskFileNum(1), 0).Release()

getBlockAndIterate := func() {
h := c.Get(cache.ID(1), base.DiskFileNum(1), 0)
require.NotNil(t, h.Get())
it := NewKeyspanIter(testkeys.Comparer.Compare, block.CacheBufferHandle(h), block.NoFragmentTransforms)
defer it.Close()
s, err := it.First()
require.NoError(t, err)
require.NotNil(t, s)
require.Equal(t, s1.String(), s.String())
s, err = it.Next()
require.NoError(t, err)
require.NotNil(t, s)
require.Equal(t, s2.String(), s.String())
s, err = it.Next()
require.NoError(t, err)
require.Nil(t, s)
}

const workers = 8
var wg sync.WaitGroup
wg.Add(workers)
for w := 0; w < workers; w++ {
go func() {
defer wg.Done()
for i := 0; i < 10; i++ {
getBlockAndIterate()
}
}()
}
wg.Wait()
}

func BenchmarkKeyspanBlock_RangeDeletions(b *testing.B) {
for _, numSpans := range []int{1, 10, 100} {
for _, keysPerSpan := range []int{1, 2, 5} {
Expand Down Expand Up @@ -94,14 +147,13 @@ func benchmarkKeyspanBlockRangeDeletions(b *testing.B, numSpans, keysPerSpan, ke
}
w.AddSpan(s)
}
block := w.Finish()
avgRowSize := float64(w.Size()) / float64(numSpans*keysPerSpan)

var kr KeyspanReader
kr.Init(block)
kr.Init(w.Finish())

var it KeyspanIter
it.Init(base.DefaultComparer.Compare, &kr)
it.init(base.DefaultComparer.Compare, &kr, block.NoFragmentTransforms)
b.Run("SeekGE", func(b *testing.B) {
rng := rand.New(rand.NewSource(uint64(time.Now().UnixNano())))

Expand Down

0 comments on commit 9d69d20

Please sign in to comment.