From 9d69d204389d8ddf06209e444632e1ec5e568998 Mon Sep 17 00:00:00 2001 From: Jackson Owens Date: Tue, 17 Sep 2024 17:35:45 -0400 Subject: [PATCH] colblk: add NewKeyspanIter, object pooling Refactor the exported interface surrounding the KeyspanIter to accommodate the expected usage: constructing heap-allocated *KeyspanIters that read through a cache/pool handle. The previous KeyspanIter type was refactored as an unexported keyspanIter type, and a new KeyspanIter implements cache-awareness as well as recycling of objects on Close. --- sstable/colblk/keyspan.go | 113 +++++++++++++++++++++++++++------ sstable/colblk/keyspan_test.go | 62 ++++++++++++++++-- 2 files changed, 149 insertions(+), 26 deletions(-) diff --git a/sstable/colblk/keyspan.go b/sstable/colblk/keyspan.go index e9d845858a..dac1cad183 100644 --- a/sstable/colblk/keyspan.go +++ b/sstable/colblk/keyspan.go @@ -9,6 +9,8 @@ import ( "context" "encoding/binary" "fmt" + "os" + "sync" "github.com/cockroachdb/errors" "github.com/cockroachdb/pebble/internal/base" @@ -16,6 +18,7 @@ import ( "github.com/cockroachdb/pebble/internal/invariants" "github.com/cockroachdb/pebble/internal/keyspan" "github.com/cockroachdb/pebble/internal/treeprinter" + "github.com/cockroachdb/pebble/sstable/block" ) // the keyspan header encodes a 32-bit count of the number of unique boundary @@ -285,12 +288,65 @@ func (r *KeyspanReader) searchBoundaryKeys(cmp base.Compare, key []byte) (index return i, false } -// A KeyspanIter is an iterator over a keyspan block. It implements the +// NewKeyspanIter constructs a new iterator over a keyspan columnar block. +func NewKeyspanIter( + cmp base.Compare, h block.BufferHandle, transforms block.FragmentIterTransforms, +) *KeyspanIter { + i := keyspanIterPool.Get().(*KeyspanIter) + i.closeCheck = invariants.CloseChecker{} + i.handle = h + // TODO(jackson): We can teach the block cache to stash a *KeyspanReader. + // Then all iters would use the same reader rather than needing to allocate + // their own KeyspanReader and parse the high-level block structure + // themselves. + i.allocReader.Init(h.Get()) + i.init(cmp, &i.allocReader, transforms) + return i +} + +var keyspanIterPool = sync.Pool{ + New: func() interface{} { + i := &KeyspanIter{} + invariants.SetFinalizer(i, checkKeyspanIter) + return i + }, +} + +// A KeyspanIter is an iterator over a columnar keyspan block. It implements the // keyspan.FragmentIterator interface. type KeyspanIter struct { - r *KeyspanReader - cmp base.Compare - span keyspan.Span + keyspanIter + handle block.BufferHandle + allocReader KeyspanReader + + closeCheck invariants.CloseChecker +} + +// Close closes the iterator. +func (i *KeyspanIter) Close() { + i.handle.Release() + i.handle = block.BufferHandle{} + + if invariants.Sometimes(25) { + // In invariants mode, sometimes don't add the object to the pool so + // that we can check for double closes that take longer than the object + // stays in the pool. + return + } + + i.keyspanIter.Close() + i.allocReader = KeyspanReader{} + i.closeCheck.Close() + keyspanIterPool.Put(i) +} + +// A keyspanIter is an iterator over a keyspan block. It implements the +// keyspan.FragmentIterator interface. +type keyspanIter struct { + r *KeyspanReader + cmp base.Compare + transforms block.FragmentIterTransforms + span keyspan.Span // When positioned, the current span's start key is the user key at // i.r.userKeys.At(i.startBoundIndex) // and the current span's end key is the user key at @@ -300,13 +356,16 @@ type KeyspanIter struct { } // Assert that KeyspanIter implements the FragmentIterator interface. -var _ keyspan.FragmentIterator = (*KeyspanIter)(nil) +var _ keyspan.FragmentIterator = (*keyspanIter)(nil) -// Init initializes the iterator with the given comparison function and keyspan +// init initializes the iterator with the given comparison function and keyspan // reader. -func (i *KeyspanIter) Init(cmp base.Compare, r *KeyspanReader) { +func (i *keyspanIter) init( + cmp base.Compare, r *KeyspanReader, transforms block.FragmentIterTransforms, +) { i.r = r i.cmp = cmp + i.transforms = transforms i.span.Start, i.span.End = nil, nil i.startBoundIndex = -1 if i.span.Keys == nil { @@ -317,7 +376,7 @@ func (i *KeyspanIter) Init(cmp base.Compare, r *KeyspanReader) { // SeekGE moves the iterator to the first span covering a key greater than // or equal to the given key. This is equivalent to seeking to the first // span with an end key greater than the given key. -func (i *KeyspanIter) SeekGE(key []byte) (*keyspan.Span, error) { +func (i *keyspanIter) SeekGE(key []byte) (*keyspan.Span, error) { // Seek among the boundary keys. j, eq := i.r.searchBoundaryKeys(i.cmp, key) // If the found boundary key does not exactly equal the given key, it's @@ -332,7 +391,7 @@ func (i *KeyspanIter) SeekGE(key []byte) (*keyspan.Span, error) { // SeekLT moves the iterator to the last span covering a key less than the // given key. This is equivalent to seeking to the last span with a start // key less than the given key. -func (i *KeyspanIter) SeekLT(key []byte) (*keyspan.Span, error) { +func (i *keyspanIter) SeekLT(key []byte) (*keyspan.Span, error) { // Seek among the boundary keys. j, eq := i.r.searchBoundaryKeys(i.cmp, key) // If eq is true, the found boundary key exactly equals the given key. A @@ -348,29 +407,29 @@ func (i *KeyspanIter) SeekLT(key []byte) (*keyspan.Span, error) { } // First moves the iterator to the first span. -func (i *KeyspanIter) First() (*keyspan.Span, error) { +func (i *keyspanIter) First() (*keyspan.Span, error) { return i.gatherKeysForward(0), nil } // Last moves the iterator to the last span. -func (i *KeyspanIter) Last() (*keyspan.Span, error) { +func (i *keyspanIter) Last() (*keyspan.Span, error) { return i.gatherKeysBackward(int(i.r.boundaryKeysCount) - 2), nil } // Next moves the iterator to the next span. -func (i *KeyspanIter) Next() (*keyspan.Span, error) { +func (i *keyspanIter) Next() (*keyspan.Span, error) { return i.gatherKeysForward(i.startBoundIndex + 1), nil } // Prev moves the iterator to the previous span. -func (i *KeyspanIter) Prev() (*keyspan.Span, error) { +func (i *keyspanIter) Prev() (*keyspan.Span, error) { return i.gatherKeysBackward(max(i.startBoundIndex-1, -1)), nil } // gatherKeysForward returns the first non-empty Span in the forward direction, // starting with the span formed by using the boundary key at index // [startBoundIndex] as the span's start boundary. -func (i *KeyspanIter) gatherKeysForward(startBoundIndex int) *keyspan.Span { +func (i *keyspanIter) gatherKeysForward(startBoundIndex int) *keyspan.Span { if invariants.Enabled && startBoundIndex < 0 { panic(errors.AssertionFailedf("out of bounds: i.startBoundIndex=%d", startBoundIndex)) } @@ -394,7 +453,7 @@ func (i *KeyspanIter) gatherKeysForward(startBoundIndex int) *keyspan.Span { // gatherKeysBackward returns the first non-empty Span in the backward direction, // starting with the span formed by using the boundary key at index // [startBoundIndex] as the span's start boundary. -func (i *KeyspanIter) gatherKeysBackward(startBoundIndex int) *keyspan.Span { +func (i *keyspanIter) gatherKeysBackward(startBoundIndex int) *keyspan.Span { i.startBoundIndex = startBoundIndex if i.startBoundIndex < 0 { return nil @@ -418,13 +477,15 @@ func (i *KeyspanIter) gatherKeysBackward(startBoundIndex int) *keyspan.Span { // isNonemptySpan returns true if the span starting at i.startBoundIndex // contains keys. -func (i *KeyspanIter) isNonemptySpan(startBoundIndex int) bool { +func (i *keyspanIter) isNonemptySpan(startBoundIndex int) bool { return i.r.boundaryKeyIndices.At(startBoundIndex) < i.r.boundaryKeyIndices.At(startBoundIndex+1) } // materializeSpan constructs the current span from i.startBoundIndex and // i.{start,end}KeyIndex. -func (i *KeyspanIter) materializeSpan() *keyspan.Span { +func (i *keyspanIter) materializeSpan() *keyspan.Span { + // TODO(jackson): Apply i.transforms. + i.span = keyspan.Span{ Start: i.r.boundaryKeys.At(i.startBoundIndex), End: i.r.boundaryKeys.At(i.startBoundIndex + 1), @@ -446,15 +507,25 @@ func (i *KeyspanIter) materializeSpan() *keyspan.Span { } // Close closes the iterator. -func (i *KeyspanIter) Close() {} +func (i *keyspanIter) Close() { + *i = keyspanIter{} +} // SetContext implements keyspan.FragmentIterator. -func (i *KeyspanIter) SetContext(context.Context) {} +func (i *keyspanIter) SetContext(context.Context) {} // WrapChildren implements keyspan.FragmentIterator. -func (i *KeyspanIter) WrapChildren(keyspan.WrapFn) {} +func (i *keyspanIter) WrapChildren(keyspan.WrapFn) {} // DebugTree is part of the FragmentIterator interface. -func (i *KeyspanIter) DebugTree(tp treeprinter.Node) { +func (i *keyspanIter) DebugTree(tp treeprinter.Node) { tp.Childf("%T(%p)", i, i) } + +func checkKeyspanIter(obj interface{}) { + i := obj.(*KeyspanIter) + if p := i.handle.Get(); p != nil { + fmt.Fprintf(os.Stderr, "KeyspanIter.handle is not nil: %p\n", p) + os.Exit(1) + } +} diff --git a/sstable/colblk/keyspan_test.go b/sstable/colblk/keyspan_test.go index b62512df64..7712f288db 100644 --- a/sstable/colblk/keyspan_test.go +++ b/sstable/colblk/keyspan_test.go @@ -9,13 +9,17 @@ import ( "fmt" "io" "strings" + "sync" "testing" "time" "github.com/cockroachdb/datadriven" "github.com/cockroachdb/pebble/internal/base" + "github.com/cockroachdb/pebble/internal/cache" "github.com/cockroachdb/pebble/internal/keyspan" "github.com/cockroachdb/pebble/internal/testkeys" + "github.com/cockroachdb/pebble/sstable/block" + "github.com/stretchr/testify/require" "golang.org/x/exp/rand" ) @@ -48,8 +52,8 @@ func TestKeyspanBlock(t *testing.T) { fmt.Fprint(&buf, kr.DebugString()) return buf.String() case "iter": - var iter KeyspanIter - iter.Init(base.DefaultComparer.Compare, &kr) + var iter keyspanIter + iter.init(base.DefaultComparer.Compare, &kr, block.NoFragmentTransforms) return keyspan.RunFragmentIteratorCmd(&iter, td.Input, nil) default: return fmt.Sprintf("unknown command: %s", td.Cmd) @@ -57,6 +61,55 @@ func TestKeyspanBlock(t *testing.T) { }) } +// TestKeyspanBlockPooling exercises the NewKeyspanIter constructor of a +// KeyspanIter and the Close behavior that retains keyspan iters within a pool. +func TestKeyspanBlockPooling(t *testing.T) { + var w KeyspanBlockWriter + w.Init(testkeys.Comparer.Equal) + s1 := keyspan.ParseSpan("b-c:{(#100,RANGEDEL) (#20,RANGEDEL) (#0,RANGEDEL)}") + s2 := keyspan.ParseSpan("c-d:{(#100,RANGEDEL) (#0,RANGEDEL)}") + w.AddSpan(s1) + w.AddSpan(s2) + b := w.Finish() + + c := cache.New(10 << 10) + defer c.Unref() + v := block.Alloc(len(b), nil) + copy(v.Get(), b) + v.MakeHandle(c, cache.ID(1), base.DiskFileNum(1), 0).Release() + + getBlockAndIterate := func() { + h := c.Get(cache.ID(1), base.DiskFileNum(1), 0) + require.NotNil(t, h.Get()) + it := NewKeyspanIter(testkeys.Comparer.Compare, block.CacheBufferHandle(h), block.NoFragmentTransforms) + defer it.Close() + s, err := it.First() + require.NoError(t, err) + require.NotNil(t, s) + require.Equal(t, s1.String(), s.String()) + s, err = it.Next() + require.NoError(t, err) + require.NotNil(t, s) + require.Equal(t, s2.String(), s.String()) + s, err = it.Next() + require.NoError(t, err) + require.Nil(t, s) + } + + const workers = 8 + var wg sync.WaitGroup + wg.Add(workers) + for w := 0; w < workers; w++ { + go func() { + defer wg.Done() + for i := 0; i < 10; i++ { + getBlockAndIterate() + } + }() + } + wg.Wait() +} + func BenchmarkKeyspanBlock_RangeDeletions(b *testing.B) { for _, numSpans := range []int{1, 10, 100} { for _, keysPerSpan := range []int{1, 2, 5} { @@ -94,14 +147,13 @@ func benchmarkKeyspanBlockRangeDeletions(b *testing.B, numSpans, keysPerSpan, ke } w.AddSpan(s) } - block := w.Finish() avgRowSize := float64(w.Size()) / float64(numSpans*keysPerSpan) var kr KeyspanReader - kr.Init(block) + kr.Init(w.Finish()) var it KeyspanIter - it.Init(base.DefaultComparer.Compare, &kr) + it.init(base.DefaultComparer.Compare, &kr, block.NoFragmentTransforms) b.Run("SeekGE", func(b *testing.B) { rng := rand.New(rand.NewSource(uint64(time.Now().UnixNano())))