From 7ce2628fbfe92a4e8aa5bfdc8753acbc78a02b5c Mon Sep 17 00:00:00 2001 From: Jackson Owens Date: Fri, 15 Nov 2024 13:04:54 -0500 Subject: [PATCH] valblk: move reader, fetcher Move the facilities around reading value blocks into the valblk package. --- sstable/internal.go | 9 ++ sstable/layout.go | 32 +---- sstable/reader.go | 43 ++++++- sstable/reader_common.go | 5 +- sstable/reader_iter_single_lvl.go | 29 ++--- sstable/reader_iter_two_lvl.go | 19 +-- sstable/reader_test.go | 3 +- sstable/reader_virtual.go | 5 +- sstable/{value_block.go => valblk/reader.go} | 126 ++++++++----------- sstable/valblk/valblk.go | 32 +++++ sstable/writer_test.go | 2 +- table_cache.go | 7 +- 12 files changed, 164 insertions(+), 148 deletions(-) rename sstable/{value_block.go => valblk/reader.go} (76%) diff --git a/sstable/internal.go b/sstable/internal.go index 55787e3e4b..df8a36fc03 100644 --- a/sstable/internal.go +++ b/sstable/internal.go @@ -7,6 +7,7 @@ package sstable import ( "github.com/cockroachdb/pebble/internal/base" "github.com/cockroachdb/pebble/internal/keyspan" + "github.com/cockroachdb/pebble/sstable/valblk" ) // These constants are part of the file format, and should not be changed. @@ -28,3 +29,11 @@ type InternalKey = base.InternalKey // Span exports the keyspan.Span type. type Span = keyspan.Span + +const valueBlocksIndexHandleMaxLen = blockHandleMaxLenWithoutProperties + 3 + +// Assert blockHandleLikelyMaxLen >= valueBlocksIndexHandleMaxLen. +const _ = uint(blockHandleLikelyMaxLen - valueBlocksIndexHandleMaxLen) + +// Assert blockHandleLikelyMaxLen >= valblk.HandleMaxLen. +const _ = uint(blockHandleLikelyMaxLen - valblk.HandleMaxLen) diff --git a/sstable/layout.go b/sstable/layout.go index e330d73956..f9769f6465 100644 --- a/sstable/layout.go +++ b/sstable/layout.go @@ -527,7 +527,7 @@ func decodeLayout(comparer *base.Comparer, data []byte) (Layout, error) { if err != nil { return Layout{}, errors.Wrap(err, "decompressing value index") } - layout.ValueBlock, err = decodeValueBlockIndex(vbiBlock, vbih) + layout.ValueBlock, err = valblk.DecodeIndex(vbiBlock, vbih) if err != nil { return Layout{}, err } @@ -617,36 +617,6 @@ func decodeMetaindex( return meta, vbih, nil } -func decodeValueBlockIndex(data []byte, vbih valblk.IndexHandle) ([]block.Handle, error) { - var valueBlocks []block.Handle - indexEntryLen := int(vbih.BlockNumByteLength + vbih.BlockOffsetByteLength + - vbih.BlockLengthByteLength) - i := 0 - for len(data) != 0 { - if len(data) < indexEntryLen { - return nil, errors.Errorf( - "remaining value index block %d does not contain a full entry of length %d", - len(data), indexEntryLen) - } - n := int(vbih.BlockNumByteLength) - bn := int(littleEndianGet(data, n)) - if bn != i { - return nil, errors.Errorf("unexpected block num %d, expected %d", - bn, i) - } - i++ - data = data[n:] - n = int(vbih.BlockOffsetByteLength) - blockOffset := littleEndianGet(data, n) - data = data[n:] - n = int(vbih.BlockLengthByteLength) - blockLen := littleEndianGet(data, n) - data = data[n:] - valueBlocks = append(valueBlocks, block.Handle{Offset: blockOffset, Length: blockLen}) - } - return valueBlocks, nil -} - // layoutWriter writes the structure of an sstable to durable storage. It // accepts serialized blocks, writes them to storage and returns a block handle // describing the offset and length of the block. diff --git a/sstable/reader.go b/sstable/reader.go index 8955b054eb..e1fbaab584 100644 --- a/sstable/reader.go +++ b/sstable/reader.go @@ -123,7 +123,7 @@ func (r *Reader) NewPointIter( filterBlockSizeLimit FilterBlockSizeLimit, stats *base.InternalIteratorStats, statsAccum IterStatsAccumulator, - rp ReaderProvider, + rp valblk.ReaderProvider, ) (Iterator, error) { return r.newPointIter( ctx, transforms, lower, upper, filterer, filterBlockSizeLimit, @@ -154,7 +154,7 @@ func (r *Reader) newPointIter( filterBlockSizeLimit FilterBlockSizeLimit, stats *base.InternalIteratorStats, statsAccum IterStatsAccumulator, - rp ReaderProvider, + rp valblk.ReaderProvider, vState *virtualState, ) (Iterator, error) { // NB: pebble.tableCache wraps the returned iterator with one which performs @@ -211,7 +211,7 @@ func (r *Reader) NewIter(transforms IterTransforms, lower, upper []byte) (Iterat func (r *Reader) NewCompactionIter( transforms IterTransforms, statsAccum IterStatsAccumulator, - rp ReaderProvider, + rp valblk.ReaderProvider, bufferPool *block.BufferPool, ) (Iterator, error) { return r.newCompactionIter(transforms, statsAccum, rp, nil, bufferPool) @@ -220,7 +220,7 @@ func (r *Reader) NewCompactionIter( func (r *Reader) newCompactionIter( transforms IterTransforms, statsAccum IterStatsAccumulator, - rp ReaderProvider, + rp valblk.ReaderProvider, vState *virtualState, bufferPool *block.BufferPool, ) (Iterator, error) { @@ -448,6 +448,14 @@ func (r *Reader) initKeyspanBlockMetadata(metadata *block.Metadata, data []byte) return nil } +// ReadValueBlockExternal implements valblk.ExternalBlockReader, allowing a +// base.LazyValue to read a value block. +func (r *Reader) ReadValueBlockExternal( + ctx context.Context, bh block.Handle, +) (block.BufferHandle, error) { + return r.readValueBlock(ctx, noEnv, noReadHandle, bh) +} + func (r *Reader) readValueBlock( ctx context.Context, env readBlockEnv, readHandle objstorage.ReadHandle, bh block.Handle, ) (block.BufferHandle, error) { @@ -749,7 +757,7 @@ func (r *Reader) Layout() (*Layout, error) { return nil, err } defer vbiH.Release() - l.ValueBlock, err = decodeValueBlockIndex(vbiH.BlockData(), r.valueBIH) + l.ValueBlock, err = valblk.DecodeIndex(vbiH.BlockData(), r.valueBIH) if err != nil { return nil, err } @@ -1160,3 +1168,28 @@ func (w deterministicStopwatchForTesting) stop() time.Duration { } return dur } + +// MakeTrivialReaderProvider creates a valblk.ReaderProvider which always +// returns the given reader. It should be used when the Reader will outlive the +// iterator tree. +func MakeTrivialReaderProvider(r *Reader) valblk.ReaderProvider { + return (*trivialReaderProvider)(r) +} + +// trivialReaderProvider implements valblk.ReaderProvider for a Reader that will +// outlive the top-level iterator in the iterator tree. +// +// Defining the type in this manner (as opposed to a struct) avoids allocation. +type trivialReaderProvider Reader + +var _ valblk.ReaderProvider = (*trivialReaderProvider)(nil) + +// GetReader implements ReaderProvider. +func (trp *trivialReaderProvider) GetReader( + ctx context.Context, +) (valblk.ExternalBlockReader, error) { + return (*Reader)(trp), nil +} + +// Close implements ReaderProvider. +func (trp *trivialReaderProvider) Close() {} diff --git a/sstable/reader_common.go b/sstable/reader_common.go index f8db26278c..790d2d5d32 100644 --- a/sstable/reader_common.go +++ b/sstable/reader_common.go @@ -11,6 +11,7 @@ import ( "github.com/cockroachdb/pebble/internal/base" "github.com/cockroachdb/pebble/internal/keyspan" "github.com/cockroachdb/pebble/sstable/block" + "github.com/cockroachdb/pebble/sstable/valblk" ) // CommonReader abstracts functionality over a Reader or a VirtualReader. This @@ -33,13 +34,13 @@ type CommonReader interface { filterBlockSizeLimit FilterBlockSizeLimit, stats *base.InternalIteratorStats, statsAccum IterStatsAccumulator, - rp ReaderProvider, + rp valblk.ReaderProvider, ) (Iterator, error) NewCompactionIter( transforms IterTransforms, statsAccum IterStatsAccumulator, - rp ReaderProvider, + rp valblk.ReaderProvider, bufferPool *block.BufferPool, ) (Iterator, error) diff --git a/sstable/reader_iter_single_lvl.go b/sstable/reader_iter_single_lvl.go index ec1cd76ee5..5c7986f508 100644 --- a/sstable/reader_iter_single_lvl.go +++ b/sstable/reader_iter_single_lvl.go @@ -18,6 +18,7 @@ import ( "github.com/cockroachdb/pebble/objstorage" "github.com/cockroachdb/pebble/objstorage/objstorageprovider" "github.com/cockroachdb/pebble/sstable/block" + "github.com/cockroachdb/pebble/sstable/valblk" ) // singleLevelIterator iterates over an entire table of data. To seek for a given @@ -64,7 +65,7 @@ type singleLevelIterator[I any, PI indexBlockIterator[I], D any, PD dataBlockIte // loading. It may not actually have loaded the block, due to an error or // because it was considered irrelevant. dataBH block.Handle - vbReader valueBlockReader + vbReader valblk.Reader // vbRH is the read handle for value blocks, which are in a different // part of the sstable than data blocks. vbRH objstorage.ReadHandle @@ -211,7 +212,7 @@ func newColumnBlockSingleLevelIterator( filterBlockSizeLimit FilterBlockSizeLimit, stats *base.InternalIteratorStats, statsAccum IterStatsAccumulator, - rp ReaderProvider, + rp valblk.ReaderProvider, bufferPool *block.BufferPool, ) (*singleLevelIteratorColumnBlocks, error) { if r.err != nil { @@ -228,12 +229,7 @@ func newColumnBlockSingleLevelIterator( ) var getLazyValuer block.GetLazyValueForPrefixAndValueHandler if r.Properties.NumValueBlocks > 0 { - i.vbReader = valueBlockReader{ - bpOpen: i, - rp: rp, - vbih: r.valueBIH, - stats: stats, - } + i.vbReader = valblk.MakeReader(i, rp, r.valueBIH, stats) getLazyValuer = &i.vbReader i.vbRH = objstorageprovider.UsePreallocatedReadHandle(r.readable, objstorage.NoReadBefore, &i.vbRHPrealloc) } @@ -266,7 +262,7 @@ func newRowBlockSingleLevelIterator( filterBlockSizeLimit FilterBlockSizeLimit, stats *base.InternalIteratorStats, statsAccum IterStatsAccumulator, - rp ReaderProvider, + rp valblk.ReaderProvider, bufferPool *block.BufferPool, ) (*singleLevelIteratorRowBlocks, error) { if r.err != nil { @@ -283,12 +279,7 @@ func newRowBlockSingleLevelIterator( ) if r.tableFormat >= TableFormatPebblev3 { if r.Properties.NumValueBlocks > 0 { - i.vbReader = valueBlockReader{ - bpOpen: i, - rp: rp, - vbih: r.valueBIH, - stats: stats, - } + i.vbReader = valblk.MakeReader(i, rp, r.valueBIH, stats) (&i.data).SetGetLazyValuer(&i.vbReader) i.vbRH = objstorageprovider.UsePreallocatedReadHandle(r.readable, objstorage.NoReadBefore, &i.vbRHPrealloc) } @@ -561,9 +552,9 @@ func (i *singleLevelIterator[I, PI, P, PD]) loadDataBlock(dir int8) loadBlockRes return loadBlockOK } -// readBlockForVBR implements the blockProviderWhenOpen interface for use by -// the valueBlockReader. -func (i *singleLevelIterator[I, PI, D, PD]) readBlockForVBR( +// ReadValueBlock implements the valblk.BlockProviderWhenOpen interface for use +// by the valblk.Reader. +func (i *singleLevelIterator[I, PI, D, PD]) ReadValueBlock( bh block.Handle, stats *base.InternalIteratorStats, ) (block.BufferHandle, error) { env := i.readBlockEnv @@ -1582,7 +1573,7 @@ func (i *singleLevelIterator[I, PI, D, PD]) closeInternal() error { if i.bpfs != nil { releaseBlockPropertiesFilterer(i.bpfs) } - i.vbReader.close() + i.vbReader.Close() if i.vbRH != nil { err = firstError(err, i.vbRH.Close()) i.vbRH = nil diff --git a/sstable/reader_iter_two_lvl.go b/sstable/reader_iter_two_lvl.go index 0d75b20224..46d6669cd7 100644 --- a/sstable/reader_iter_two_lvl.go +++ b/sstable/reader_iter_two_lvl.go @@ -17,6 +17,7 @@ import ( "github.com/cockroachdb/pebble/objstorage" "github.com/cockroachdb/pebble/objstorage/objstorageprovider" "github.com/cockroachdb/pebble/sstable/block" + "github.com/cockroachdb/pebble/sstable/valblk" ) type twoLevelIterator[I any, PI indexBlockIterator[I], D any, PD dataBlockIterator[D]] struct { @@ -162,7 +163,7 @@ func newColumnBlockTwoLevelIterator( filterBlockSizeLimit FilterBlockSizeLimit, stats *base.InternalIteratorStats, statsAccum IterStatsAccumulator, - rp ReaderProvider, + rp valblk.ReaderProvider, bufferPool *block.BufferPool, ) (*twoLevelIteratorColumnBlocks, error) { if r.err != nil { @@ -186,12 +187,7 @@ func newColumnBlockTwoLevelIterator( // versions of keys, and therefore never expose a LazyValue that is // separated to their callers, they can put this valueBlockReader into a // sync.Pool. - i.secondLevel.vbReader = valueBlockReader{ - bpOpen: &i.secondLevel, - rp: rp, - vbih: r.valueBIH, - stats: stats, - } + i.secondLevel.vbReader = valblk.MakeReader(&i.secondLevel, rp, r.valueBIH, stats) getLazyValuer = &i.secondLevel.vbReader i.secondLevel.vbRH = objstorageprovider.UsePreallocatedReadHandle(r.readable, objstorage.NoReadBefore, &i.secondLevel.vbRHPrealloc) } @@ -225,7 +221,7 @@ func newRowBlockTwoLevelIterator( filterBlockSizeLimit FilterBlockSizeLimit, stats *base.InternalIteratorStats, statsAccum IterStatsAccumulator, - rp ReaderProvider, + rp valblk.ReaderProvider, bufferPool *block.BufferPool, ) (*twoLevelIteratorRowBlocks, error) { if r.err != nil { @@ -249,12 +245,7 @@ func newRowBlockTwoLevelIterator( // versions of keys, and therefore never expose a LazyValue that is // separated to their callers, they can put this valueBlockReader into a // sync.Pool. - i.secondLevel.vbReader = valueBlockReader{ - bpOpen: &i.secondLevel, - rp: rp, - vbih: r.valueBIH, - stats: stats, - } + i.secondLevel.vbReader = valblk.MakeReader(&i.secondLevel, rp, r.valueBIH, stats) i.secondLevel.data.SetGetLazyValuer(&i.secondLevel.vbReader) i.secondLevel.vbRH = objstorageprovider.UsePreallocatedReadHandle(r.readable, objstorage.NoReadBefore, &i.secondLevel.vbRHPrealloc) } diff --git a/sstable/reader_test.go b/sstable/reader_test.go index c244a83383..1ec4951072 100644 --- a/sstable/reader_test.go +++ b/sstable/reader_test.go @@ -33,6 +33,7 @@ import ( "github.com/cockroachdb/pebble/objstorage" "github.com/cockroachdb/pebble/objstorage/objstorageprovider" "github.com/cockroachdb/pebble/sstable/block" + "github.com/cockroachdb/pebble/sstable/valblk" "github.com/cockroachdb/pebble/vfs" "github.com/cockroachdb/pebble/vfs/errorfs" "github.com/stretchr/testify/require" @@ -230,7 +231,7 @@ func runVirtualReaderTest(t *testing.T, path string, blockSize, indexBlockSize i return "virtualize must be called before creating compaction iters" } - var rp ReaderProvider + var rp valblk.ReaderProvider transforms := IterTransforms{ SyntheticPrefixAndSuffix: block.MakeSyntheticPrefixAndSuffix(nil, syntheticSuffix), } diff --git a/sstable/reader_virtual.go b/sstable/reader_virtual.go index c37c8963a1..e714f5fcfd 100644 --- a/sstable/reader_virtual.go +++ b/sstable/reader_virtual.go @@ -11,6 +11,7 @@ import ( "github.com/cockroachdb/pebble/internal/keyspan" "github.com/cockroachdb/pebble/internal/rangekey" "github.com/cockroachdb/pebble/sstable/block" + "github.com/cockroachdb/pebble/sstable/valblk" ) // VirtualReader wraps Reader. Its purpose is to restrict functionality of the @@ -95,7 +96,7 @@ func MakeVirtualReader(reader *Reader, p VirtualReaderParams) VirtualReader { func (v *VirtualReader) NewCompactionIter( transforms IterTransforms, statsAccum IterStatsAccumulator, - rp ReaderProvider, + rp valblk.ReaderProvider, bufferPool *block.BufferPool, ) (Iterator, error) { return v.reader.newCompactionIter( @@ -119,7 +120,7 @@ func (v *VirtualReader) NewPointIter( filterBlockSizeLimit FilterBlockSizeLimit, stats *base.InternalIteratorStats, statsAccum IterStatsAccumulator, - rp ReaderProvider, + rp valblk.ReaderProvider, ) (Iterator, error) { return v.reader.newPointIter( ctx, transforms, lower, upper, filterer, filterBlockSizeLimit, diff --git a/sstable/value_block.go b/sstable/valblk/reader.go similarity index 76% rename from sstable/value_block.go rename to sstable/valblk/reader.go index 9694c14ae6..ef70c50b50 100644 --- a/sstable/value_block.go +++ b/sstable/valblk/reader.go @@ -2,7 +2,7 @@ // of this source code is governed by a BSD-style license that can be found in // the LICENSE file. -package sstable +package valblk import ( "context" @@ -14,26 +14,29 @@ import ( "github.com/cockroachdb/pebble/internal/invariants" "github.com/cockroachdb/pebble/objstorage/objstorageprovider/objiotracing" "github.com/cockroachdb/pebble/sstable/block" - "github.com/cockroachdb/pebble/sstable/valblk" ) -const valueBlocksIndexHandleMaxLen = blockHandleMaxLenWithoutProperties + 3 - -// Assert blockHandleLikelyMaxLen >= valueBlocksIndexHandleMaxLen. -const _ = uint(blockHandleLikelyMaxLen - valueBlocksIndexHandleMaxLen) +// ReaderProvider supports the implementation of blockProviderWhenClosed. +// GetReader and Close can be called multiple times in pairs. +type ReaderProvider interface { + GetReader(context.Context) (ExternalBlockReader, error) + Close() +} -// Assert blockHandleLikelyMaxLen >= valblk.HandleMaxLen. -const _ = uint(blockHandleLikelyMaxLen - valblk.HandleMaxLen) +// ExternalBlockReader is used to read value blocks from a file outside the +// context of a open sstable iterator reading the file. +type ExternalBlockReader interface { + ReadValueBlockExternal(context.Context, block.Handle) (block.BufferHandle, error) +} -type blockProviderWhenOpen interface { - readBlockForVBR( - h block.Handle, stats *base.InternalIteratorStats, - ) (block.BufferHandle, error) +// IteratorBlockReader is used to read value blocks from within an open file. +type IteratorBlockReader interface { + ReadValueBlock(block.Handle, *base.InternalIteratorStats) (block.BufferHandle, error) } type blockProviderWhenClosed struct { rp ReaderProvider - r *Reader + r ExternalBlockReader } func (bpwc *blockProviderWhenClosed) open(ctx context.Context) error { @@ -47,7 +50,7 @@ func (bpwc *blockProviderWhenClosed) close() { bpwc.r = nil } -func (bpwc blockProviderWhenClosed) readBlockForVBR( +func (bpwc blockProviderWhenClosed) ReadValueBlock( h block.Handle, stats *base.InternalIteratorStats, ) (block.BufferHandle, error) { // This is rare, since most block reads happen when the corresponding @@ -61,48 +64,18 @@ func (bpwc blockProviderWhenClosed) readBlockForVBR( // TODO(jackson,sumeer): Consider whether to use a buffer pool in this case. // The bpwc is not allowed to outlive the iterator tree, so it cannot // outlive the buffer pool. - return bpwc.r.readValueBlock(ctx, noEnv, noReadHandle, h) -} - -// ReaderProvider supports the implementation of blockProviderWhenClosed. -// GetReader and Close can be called multiple times in pairs. -type ReaderProvider interface { - GetReader(ctx context.Context) (r *Reader, err error) - Close() -} - -// MakeTrivialReaderProvider creates a ReaderProvider which always returns the -// given reader. It should be used when the Reader will outlive the iterator -// tree. -func MakeTrivialReaderProvider(r *Reader) ReaderProvider { - return (*trivialReaderProvider)(r) + return bpwc.r.ReadValueBlockExternal(ctx, h) } -// trivialReaderProvider implements ReaderProvider for a Reader that will -// outlive the top-level iterator in the iterator tree. -// -// Defining the type in this manner (as opposed to a struct) avoids allocation. -type trivialReaderProvider Reader - -var _ ReaderProvider = (*trivialReaderProvider)(nil) - -// GetReader implements ReaderProvider. -func (trp *trivialReaderProvider) GetReader(ctx context.Context) (*Reader, error) { - return (*Reader)(trp), nil -} - -// Close implements ReaderProvider. -func (trp *trivialReaderProvider) Close() {} - -// valueBlockReader implements GetLazyValueForPrefixAndValueHandler; it is used -// to create LazyValues (each of which can can be used to retrieve a value in a -// value block). It is used when the sstable was written with +// Reader implements GetLazyValueForPrefixAndValueHandler; it is used to create +// LazyValues (each of which can can be used to retrieve a value in a value +// block). It is used when the sstable was written with // Properties.ValueBlocksAreEnabled. The lifetime of this object is tied to the // lifetime of the sstable iterator. -type valueBlockReader struct { - bpOpen blockProviderWhenOpen +type Reader struct { + bpOpen IteratorBlockReader rp ReaderProvider - vbih valblk.IndexHandle + vbih IndexHandle stats *base.InternalIteratorStats // fetcher is allocated lazily the first time we create a LazyValue, in order @@ -111,9 +84,28 @@ type valueBlockReader struct { fetcher *valueBlockFetcher } -var _ block.GetLazyValueForPrefixAndValueHandler = (*valueBlockReader)(nil) +// MakeReader constructs a Reader. +func MakeReader( + i IteratorBlockReader, rp ReaderProvider, vbih IndexHandle, stats *base.InternalIteratorStats, +) Reader { + return Reader{ + bpOpen: i, + rp: rp, + vbih: vbih, + stats: stats, + } +} -func (r *valueBlockReader) GetLazyValueForPrefixAndValueHandle(handle []byte) base.LazyValue { +var _ block.GetLazyValueForPrefixAndValueHandler = (*Reader)(nil) + +// GetLazyValueForPrefixAndValueHandle returns a LazyValue for the given value +// prefix and value. +// +// The result is only valid until the next call to +// GetLazyValueForPrefixAndValueHandle. Use LazyValue.Clone if the lifetime of +// the LazyValue needs to be extended. For more details, see the "memory +// management" comment where LazyValue is declared. +func (r *Reader) GetLazyValueForPrefixAndValueHandle(handle []byte) base.LazyValue { if r.fetcher == nil { // NB: we cannot avoid this allocation, since the valueBlockFetcher // can outlive the singleLevelIterator due to be being embedded in a @@ -125,7 +117,7 @@ func (r *valueBlockReader) GetLazyValueForPrefixAndValueHandle(handle []byte) ba r.fetcher = newValueBlockFetcher(r.bpOpen, r.rp, r.vbih, r.stats) } lazyFetcher := &r.fetcher.lazyFetcher - valLen, h := valblk.DecodeLenFromHandle(handle[1:]) + valLen, h := DecodeLenFromHandle(handle[1:]) *lazyFetcher = base.LazyFetcher{ Fetcher: r.fetcher, Attribute: base.AttributeAndLen{ @@ -143,7 +135,8 @@ func (r *valueBlockReader) GetLazyValueForPrefixAndValueHandle(handle []byte) ba } } -func (r *valueBlockReader) close() { +// Close closes the Reader. +func (r *Reader) Close() { r.bpOpen = nil if r.fetcher != nil { r.fetcher.close() @@ -155,9 +148,9 @@ func (r *valueBlockReader) close() { // to fetch a value from a value block. The lifetime of this object is not tied // to the lifetime of the iterator - a LazyValue can be accessed later. type valueBlockFetcher struct { - bpOpen blockProviderWhenOpen + bpOpen IteratorBlockReader rp ReaderProvider - vbih valblk.IndexHandle + vbih IndexHandle stats *base.InternalIteratorStats // The value blocks index is lazily retrieved the first time the reader // needs to read a value that resides in a value block. @@ -183,9 +176,9 @@ type valueBlockFetcher struct { var _ base.ValueFetcher = (*valueBlockFetcher)(nil) func newValueBlockFetcher( - bpOpen blockProviderWhenOpen, + bpOpen IteratorBlockReader, rp ReaderProvider, - vbih valblk.IndexHandle, + vbih IndexHandle, stats *base.InternalIteratorStats, ) *valueBlockFetcher { return &valueBlockFetcher{ @@ -259,10 +252,10 @@ func (f *valueBlockFetcher) doValueMangling(v []byte) []byte { } func (f *valueBlockFetcher) getValueInternal(handle []byte, valLen int32) (val []byte, err error) { - vh := valblk.DecodeRemainingHandle(handle) + vh := DecodeRemainingHandle(handle) vh.ValueLen = uint32(valLen) if f.vbiBlock == nil { - ch, err := f.bpOpen.readBlockForVBR(f.vbih.Handle, f.stats) + ch, err := f.bpOpen.ReadValueBlock(f.vbih.Handle, f.stats) if err != nil { return nil, err } @@ -274,7 +267,7 @@ func (f *valueBlockFetcher) getValueInternal(handle []byte, valLen int32) (val [ if err != nil { return nil, err } - vbCacheHandle, err := f.bpOpen.readBlockForVBR(vbh, f.stats) + vbCacheHandle, err := f.bpOpen.ReadValueBlock(vbh, f.stats) if err != nil { return nil, err } @@ -314,12 +307,3 @@ func (f *valueBlockFetcher) getBlockHandle(blockNum uint32) (block.Handle, error blockLen := littleEndianGet(b, n) return block.Handle{Offset: blockOffset, Length: blockLen}, nil } - -func littleEndianGet(b []byte, n int) uint64 { - _ = b[n-1] // bounds check - v := uint64(b[0]) - for i := 1; i < n; i++ { - v |= uint64(b[i]) << (8 * i) - } - return v -} diff --git a/sstable/valblk/valblk.go b/sstable/valblk/valblk.go index 45c2329012..2021e58283 100644 --- a/sstable/valblk/valblk.go +++ b/sstable/valblk/valblk.go @@ -322,6 +322,38 @@ func DecodeIndexHandle(src []byte) (IndexHandle, int, error) { return vbih, n + 3, nil } +// DecodeIndex decodes the entirety of a value block index, returning a slice of +// the contained block handles. +func DecodeIndex(data []byte, vbih IndexHandle) ([]block.Handle, error) { + var valueBlocks []block.Handle + indexEntryLen := int(vbih.BlockNumByteLength + vbih.BlockOffsetByteLength + + vbih.BlockLengthByteLength) + i := 0 + for len(data) != 0 { + if len(data) < indexEntryLen { + return nil, errors.Errorf( + "remaining value index block %d does not contain a full entry of length %d", + len(data), indexEntryLen) + } + n := int(vbih.BlockNumByteLength) + bn := int(littleEndianGet(data, n)) + if bn != i { + return nil, errors.Errorf("unexpected block num %d, expected %d", + bn, i) + } + i++ + data = data[n:] + n = int(vbih.BlockOffsetByteLength) + blockOffset := littleEndianGet(data, n) + data = data[n:] + n = int(vbih.BlockLengthByteLength) + blockLen := littleEndianGet(data, n) + data = data[n:] + valueBlocks = append(valueBlocks, block.Handle{Offset: blockOffset, Length: blockLen}) + } + return valueBlocks, nil +} + // littleEndianPut writes v to b using little endian encoding, under the // assumption that v can be represented using n bytes. func littleEndianPut(v uint64, b []byte, n int) { diff --git a/sstable/writer_test.go b/sstable/writer_test.go index c5c275171f..f85a9e4451 100644 --- a/sstable/writer_test.go +++ b/sstable/writer_test.go @@ -419,7 +419,7 @@ func TestWriterWithValueBlocks(t *testing.T) { return err.Error() } forceRowIterIgnoreValueBlocks := func(i *singleLevelIteratorRowBlocks) { - i.vbReader = valueBlockReader{} + i.vbReader = valblk.Reader{} i.data.SetGetLazyValuer(nil) i.data.SetHasValuePrefix(false) } diff --git a/table_cache.go b/table_cache.go index 100a5c68ac..6d6bd1993f 100644 --- a/table_cache.go +++ b/table_cache.go @@ -26,6 +26,7 @@ import ( "github.com/cockroachdb/pebble/objstorage" "github.com/cockroachdb/pebble/objstorage/objstorageprovider/objiotracing" "github.com/cockroachdb/pebble/sstable" + "github.com/cockroachdb/pebble/sstable/valblk" ) var emptyIter = &errorIter{err: nil} @@ -666,7 +667,7 @@ type tableCacheShardReaderProvider struct { } } -var _ sstable.ReaderProvider = &tableCacheShardReaderProvider{} +var _ valblk.ReaderProvider = &tableCacheShardReaderProvider{} func (rp *tableCacheShardReaderProvider) init( c *tableCacheShard, dbOpts *tableCacheOpts, backingFileNum base.DiskFileNum, @@ -692,7 +693,9 @@ func (rp *tableCacheShardReaderProvider) init( // // TODO(bananabrick): We could return a wrapper over the Reader to ensure // that the reader isn't used for other purposes. -func (rp *tableCacheShardReaderProvider) GetReader(ctx context.Context) (*sstable.Reader, error) { +func (rp *tableCacheShardReaderProvider) GetReader( + ctx context.Context, +) (valblk.ExternalBlockReader, error) { rp.mu.Lock() defer rp.mu.Unlock()