Skip to content

Commit

Permalink
valblk: move reader, fetcher
Browse files Browse the repository at this point in the history
Move the facilities around reading value blocks into the valblk package.
  • Loading branch information
jbowens committed Nov 18, 2024
1 parent bd62887 commit 7ce2628
Show file tree
Hide file tree
Showing 12 changed files with 164 additions and 148 deletions.
9 changes: 9 additions & 0 deletions sstable/internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ package sstable
import (
"github.com/cockroachdb/pebble/internal/base"
"github.com/cockroachdb/pebble/internal/keyspan"
"github.com/cockroachdb/pebble/sstable/valblk"
)

// These constants are part of the file format, and should not be changed.
Expand All @@ -28,3 +29,11 @@ type InternalKey = base.InternalKey

// Span exports the keyspan.Span type.
type Span = keyspan.Span

const valueBlocksIndexHandleMaxLen = blockHandleMaxLenWithoutProperties + 3

// Assert blockHandleLikelyMaxLen >= valueBlocksIndexHandleMaxLen.
const _ = uint(blockHandleLikelyMaxLen - valueBlocksIndexHandleMaxLen)

// Assert blockHandleLikelyMaxLen >= valblk.HandleMaxLen.
const _ = uint(blockHandleLikelyMaxLen - valblk.HandleMaxLen)
32 changes: 1 addition & 31 deletions sstable/layout.go
Original file line number Diff line number Diff line change
Expand Up @@ -527,7 +527,7 @@ func decodeLayout(comparer *base.Comparer, data []byte) (Layout, error) {
if err != nil {
return Layout{}, errors.Wrap(err, "decompressing value index")
}
layout.ValueBlock, err = decodeValueBlockIndex(vbiBlock, vbih)
layout.ValueBlock, err = valblk.DecodeIndex(vbiBlock, vbih)
if err != nil {
return Layout{}, err
}
Expand Down Expand Up @@ -617,36 +617,6 @@ func decodeMetaindex(
return meta, vbih, nil
}

func decodeValueBlockIndex(data []byte, vbih valblk.IndexHandle) ([]block.Handle, error) {
var valueBlocks []block.Handle
indexEntryLen := int(vbih.BlockNumByteLength + vbih.BlockOffsetByteLength +
vbih.BlockLengthByteLength)
i := 0
for len(data) != 0 {
if len(data) < indexEntryLen {
return nil, errors.Errorf(
"remaining value index block %d does not contain a full entry of length %d",
len(data), indexEntryLen)
}
n := int(vbih.BlockNumByteLength)
bn := int(littleEndianGet(data, n))
if bn != i {
return nil, errors.Errorf("unexpected block num %d, expected %d",
bn, i)
}
i++
data = data[n:]
n = int(vbih.BlockOffsetByteLength)
blockOffset := littleEndianGet(data, n)
data = data[n:]
n = int(vbih.BlockLengthByteLength)
blockLen := littleEndianGet(data, n)
data = data[n:]
valueBlocks = append(valueBlocks, block.Handle{Offset: blockOffset, Length: blockLen})
}
return valueBlocks, nil
}

// layoutWriter writes the structure of an sstable to durable storage. It
// accepts serialized blocks, writes them to storage and returns a block handle
// describing the offset and length of the block.
Expand Down
43 changes: 38 additions & 5 deletions sstable/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ func (r *Reader) NewPointIter(
filterBlockSizeLimit FilterBlockSizeLimit,
stats *base.InternalIteratorStats,
statsAccum IterStatsAccumulator,
rp ReaderProvider,
rp valblk.ReaderProvider,
) (Iterator, error) {
return r.newPointIter(
ctx, transforms, lower, upper, filterer, filterBlockSizeLimit,
Expand Down Expand Up @@ -154,7 +154,7 @@ func (r *Reader) newPointIter(
filterBlockSizeLimit FilterBlockSizeLimit,
stats *base.InternalIteratorStats,
statsAccum IterStatsAccumulator,
rp ReaderProvider,
rp valblk.ReaderProvider,
vState *virtualState,
) (Iterator, error) {
// NB: pebble.tableCache wraps the returned iterator with one which performs
Expand Down Expand Up @@ -211,7 +211,7 @@ func (r *Reader) NewIter(transforms IterTransforms, lower, upper []byte) (Iterat
func (r *Reader) NewCompactionIter(
transforms IterTransforms,
statsAccum IterStatsAccumulator,
rp ReaderProvider,
rp valblk.ReaderProvider,
bufferPool *block.BufferPool,
) (Iterator, error) {
return r.newCompactionIter(transforms, statsAccum, rp, nil, bufferPool)
Expand All @@ -220,7 +220,7 @@ func (r *Reader) NewCompactionIter(
func (r *Reader) newCompactionIter(
transforms IterTransforms,
statsAccum IterStatsAccumulator,
rp ReaderProvider,
rp valblk.ReaderProvider,
vState *virtualState,
bufferPool *block.BufferPool,
) (Iterator, error) {
Expand Down Expand Up @@ -448,6 +448,14 @@ func (r *Reader) initKeyspanBlockMetadata(metadata *block.Metadata, data []byte)
return nil
}

// ReadValueBlockExternal implements valblk.ExternalBlockReader, allowing a
// base.LazyValue to read a value block.
func (r *Reader) ReadValueBlockExternal(
ctx context.Context, bh block.Handle,
) (block.BufferHandle, error) {
return r.readValueBlock(ctx, noEnv, noReadHandle, bh)
}

func (r *Reader) readValueBlock(
ctx context.Context, env readBlockEnv, readHandle objstorage.ReadHandle, bh block.Handle,
) (block.BufferHandle, error) {
Expand Down Expand Up @@ -749,7 +757,7 @@ func (r *Reader) Layout() (*Layout, error) {
return nil, err
}
defer vbiH.Release()
l.ValueBlock, err = decodeValueBlockIndex(vbiH.BlockData(), r.valueBIH)
l.ValueBlock, err = valblk.DecodeIndex(vbiH.BlockData(), r.valueBIH)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -1160,3 +1168,28 @@ func (w deterministicStopwatchForTesting) stop() time.Duration {
}
return dur
}

// MakeTrivialReaderProvider creates a valblk.ReaderProvider which always
// returns the given reader. It should be used when the Reader will outlive the
// iterator tree.
func MakeTrivialReaderProvider(r *Reader) valblk.ReaderProvider {
return (*trivialReaderProvider)(r)
}

// trivialReaderProvider implements valblk.ReaderProvider for a Reader that will
// outlive the top-level iterator in the iterator tree.
//
// Defining the type in this manner (as opposed to a struct) avoids allocation.
type trivialReaderProvider Reader

var _ valblk.ReaderProvider = (*trivialReaderProvider)(nil)

// GetReader implements ReaderProvider.
func (trp *trivialReaderProvider) GetReader(
ctx context.Context,
) (valblk.ExternalBlockReader, error) {
return (*Reader)(trp), nil
}

// Close implements ReaderProvider.
func (trp *trivialReaderProvider) Close() {}
5 changes: 3 additions & 2 deletions sstable/reader_common.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/cockroachdb/pebble/internal/base"
"github.com/cockroachdb/pebble/internal/keyspan"
"github.com/cockroachdb/pebble/sstable/block"
"github.com/cockroachdb/pebble/sstable/valblk"
)

// CommonReader abstracts functionality over a Reader or a VirtualReader. This
Expand All @@ -33,13 +34,13 @@ type CommonReader interface {
filterBlockSizeLimit FilterBlockSizeLimit,
stats *base.InternalIteratorStats,
statsAccum IterStatsAccumulator,
rp ReaderProvider,
rp valblk.ReaderProvider,
) (Iterator, error)

NewCompactionIter(
transforms IterTransforms,
statsAccum IterStatsAccumulator,
rp ReaderProvider,
rp valblk.ReaderProvider,
bufferPool *block.BufferPool,
) (Iterator, error)

Expand Down
29 changes: 10 additions & 19 deletions sstable/reader_iter_single_lvl.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/cockroachdb/pebble/objstorage"
"github.com/cockroachdb/pebble/objstorage/objstorageprovider"
"github.com/cockroachdb/pebble/sstable/block"
"github.com/cockroachdb/pebble/sstable/valblk"
)

// singleLevelIterator iterates over an entire table of data. To seek for a given
Expand Down Expand Up @@ -64,7 +65,7 @@ type singleLevelIterator[I any, PI indexBlockIterator[I], D any, PD dataBlockIte
// loading. It may not actually have loaded the block, due to an error or
// because it was considered irrelevant.
dataBH block.Handle
vbReader valueBlockReader
vbReader valblk.Reader
// vbRH is the read handle for value blocks, which are in a different
// part of the sstable than data blocks.
vbRH objstorage.ReadHandle
Expand Down Expand Up @@ -211,7 +212,7 @@ func newColumnBlockSingleLevelIterator(
filterBlockSizeLimit FilterBlockSizeLimit,
stats *base.InternalIteratorStats,
statsAccum IterStatsAccumulator,
rp ReaderProvider,
rp valblk.ReaderProvider,
bufferPool *block.BufferPool,
) (*singleLevelIteratorColumnBlocks, error) {
if r.err != nil {
Expand All @@ -228,12 +229,7 @@ func newColumnBlockSingleLevelIterator(
)
var getLazyValuer block.GetLazyValueForPrefixAndValueHandler
if r.Properties.NumValueBlocks > 0 {
i.vbReader = valueBlockReader{
bpOpen: i,
rp: rp,
vbih: r.valueBIH,
stats: stats,
}
i.vbReader = valblk.MakeReader(i, rp, r.valueBIH, stats)
getLazyValuer = &i.vbReader
i.vbRH = objstorageprovider.UsePreallocatedReadHandle(r.readable, objstorage.NoReadBefore, &i.vbRHPrealloc)
}
Expand Down Expand Up @@ -266,7 +262,7 @@ func newRowBlockSingleLevelIterator(
filterBlockSizeLimit FilterBlockSizeLimit,
stats *base.InternalIteratorStats,
statsAccum IterStatsAccumulator,
rp ReaderProvider,
rp valblk.ReaderProvider,
bufferPool *block.BufferPool,
) (*singleLevelIteratorRowBlocks, error) {
if r.err != nil {
Expand All @@ -283,12 +279,7 @@ func newRowBlockSingleLevelIterator(
)
if r.tableFormat >= TableFormatPebblev3 {
if r.Properties.NumValueBlocks > 0 {
i.vbReader = valueBlockReader{
bpOpen: i,
rp: rp,
vbih: r.valueBIH,
stats: stats,
}
i.vbReader = valblk.MakeReader(i, rp, r.valueBIH, stats)
(&i.data).SetGetLazyValuer(&i.vbReader)
i.vbRH = objstorageprovider.UsePreallocatedReadHandle(r.readable, objstorage.NoReadBefore, &i.vbRHPrealloc)
}
Expand Down Expand Up @@ -561,9 +552,9 @@ func (i *singleLevelIterator[I, PI, P, PD]) loadDataBlock(dir int8) loadBlockRes
return loadBlockOK
}

// readBlockForVBR implements the blockProviderWhenOpen interface for use by
// the valueBlockReader.
func (i *singleLevelIterator[I, PI, D, PD]) readBlockForVBR(
// ReadValueBlock implements the valblk.BlockProviderWhenOpen interface for use
// by the valblk.Reader.
func (i *singleLevelIterator[I, PI, D, PD]) ReadValueBlock(
bh block.Handle, stats *base.InternalIteratorStats,
) (block.BufferHandle, error) {
env := i.readBlockEnv
Expand Down Expand Up @@ -1582,7 +1573,7 @@ func (i *singleLevelIterator[I, PI, D, PD]) closeInternal() error {
if i.bpfs != nil {
releaseBlockPropertiesFilterer(i.bpfs)
}
i.vbReader.close()
i.vbReader.Close()
if i.vbRH != nil {
err = firstError(err, i.vbRH.Close())
i.vbRH = nil
Expand Down
19 changes: 5 additions & 14 deletions sstable/reader_iter_two_lvl.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/cockroachdb/pebble/objstorage"
"github.com/cockroachdb/pebble/objstorage/objstorageprovider"
"github.com/cockroachdb/pebble/sstable/block"
"github.com/cockroachdb/pebble/sstable/valblk"
)

type twoLevelIterator[I any, PI indexBlockIterator[I], D any, PD dataBlockIterator[D]] struct {
Expand Down Expand Up @@ -162,7 +163,7 @@ func newColumnBlockTwoLevelIterator(
filterBlockSizeLimit FilterBlockSizeLimit,
stats *base.InternalIteratorStats,
statsAccum IterStatsAccumulator,
rp ReaderProvider,
rp valblk.ReaderProvider,
bufferPool *block.BufferPool,
) (*twoLevelIteratorColumnBlocks, error) {
if r.err != nil {
Expand All @@ -186,12 +187,7 @@ func newColumnBlockTwoLevelIterator(
// versions of keys, and therefore never expose a LazyValue that is
// separated to their callers, they can put this valueBlockReader into a
// sync.Pool.
i.secondLevel.vbReader = valueBlockReader{
bpOpen: &i.secondLevel,
rp: rp,
vbih: r.valueBIH,
stats: stats,
}
i.secondLevel.vbReader = valblk.MakeReader(&i.secondLevel, rp, r.valueBIH, stats)
getLazyValuer = &i.secondLevel.vbReader
i.secondLevel.vbRH = objstorageprovider.UsePreallocatedReadHandle(r.readable, objstorage.NoReadBefore, &i.secondLevel.vbRHPrealloc)
}
Expand Down Expand Up @@ -225,7 +221,7 @@ func newRowBlockTwoLevelIterator(
filterBlockSizeLimit FilterBlockSizeLimit,
stats *base.InternalIteratorStats,
statsAccum IterStatsAccumulator,
rp ReaderProvider,
rp valblk.ReaderProvider,
bufferPool *block.BufferPool,
) (*twoLevelIteratorRowBlocks, error) {
if r.err != nil {
Expand All @@ -249,12 +245,7 @@ func newRowBlockTwoLevelIterator(
// versions of keys, and therefore never expose a LazyValue that is
// separated to their callers, they can put this valueBlockReader into a
// sync.Pool.
i.secondLevel.vbReader = valueBlockReader{
bpOpen: &i.secondLevel,
rp: rp,
vbih: r.valueBIH,
stats: stats,
}
i.secondLevel.vbReader = valblk.MakeReader(&i.secondLevel, rp, r.valueBIH, stats)
i.secondLevel.data.SetGetLazyValuer(&i.secondLevel.vbReader)
i.secondLevel.vbRH = objstorageprovider.UsePreallocatedReadHandle(r.readable, objstorage.NoReadBefore, &i.secondLevel.vbRHPrealloc)
}
Expand Down
3 changes: 2 additions & 1 deletion sstable/reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"github.com/cockroachdb/pebble/objstorage"
"github.com/cockroachdb/pebble/objstorage/objstorageprovider"
"github.com/cockroachdb/pebble/sstable/block"
"github.com/cockroachdb/pebble/sstable/valblk"
"github.com/cockroachdb/pebble/vfs"
"github.com/cockroachdb/pebble/vfs/errorfs"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -230,7 +231,7 @@ func runVirtualReaderTest(t *testing.T, path string, blockSize, indexBlockSize i
return "virtualize must be called before creating compaction iters"
}

var rp ReaderProvider
var rp valblk.ReaderProvider
transforms := IterTransforms{
SyntheticPrefixAndSuffix: block.MakeSyntheticPrefixAndSuffix(nil, syntheticSuffix),
}
Expand Down
5 changes: 3 additions & 2 deletions sstable/reader_virtual.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/cockroachdb/pebble/internal/keyspan"
"github.com/cockroachdb/pebble/internal/rangekey"
"github.com/cockroachdb/pebble/sstable/block"
"github.com/cockroachdb/pebble/sstable/valblk"
)

// VirtualReader wraps Reader. Its purpose is to restrict functionality of the
Expand Down Expand Up @@ -95,7 +96,7 @@ func MakeVirtualReader(reader *Reader, p VirtualReaderParams) VirtualReader {
func (v *VirtualReader) NewCompactionIter(
transforms IterTransforms,
statsAccum IterStatsAccumulator,
rp ReaderProvider,
rp valblk.ReaderProvider,
bufferPool *block.BufferPool,
) (Iterator, error) {
return v.reader.newCompactionIter(
Expand All @@ -119,7 +120,7 @@ func (v *VirtualReader) NewPointIter(
filterBlockSizeLimit FilterBlockSizeLimit,
stats *base.InternalIteratorStats,
statsAccum IterStatsAccumulator,
rp ReaderProvider,
rp valblk.ReaderProvider,
) (Iterator, error) {
return v.reader.newPointIter(
ctx, transforms, lower, upper, filterer, filterBlockSizeLimit,
Expand Down
Loading

0 comments on commit 7ce2628

Please sign in to comment.