Skip to content

Commit

Permalink
sstable: integrate columnar blocks into Reader
Browse files Browse the repository at this point in the history
Add support for the new TableFormatPebblev5 (which introduces columnar blocks)
into the sstable.Reader. This commit stops short of bumping TableFormatMax to
TableFormatPebblev5 because columnar blocks don't yet support the iterator
transforms.
  • Loading branch information
jbowens committed Sep 23, 2024
1 parent 65c3ae1 commit 3831e2b
Show file tree
Hide file tree
Showing 11 changed files with 480 additions and 212 deletions.
3 changes: 2 additions & 1 deletion sstable/colblk_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ type RawColumnWriter struct {
layout layoutWriter

separatorBuf []byte
tmp []byte
tmp [blockHandleLikelyMaxLen]byte
disableKeyOrderChecks bool
}

Expand Down Expand Up @@ -761,6 +761,7 @@ func (w *RawColumnWriter) Close() (err error) {
panic(errors.AssertionFailedf("pebble: %d of queued data blocks but layout offset is %d",
w.queuedDataSize, w.layout.offset))
}
w.props.DataSize = w.layout.offset
if _, err = w.flushBufferedIndexBlocks(); err != nil {
return err
}
Expand Down
23 changes: 23 additions & 0 deletions sstable/colblk_writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package sstable

import (
"context"
"fmt"
"testing"

Expand All @@ -21,12 +22,20 @@ import (
func TestColumnarWriter(t *testing.T) {
var meta *WriterMetadata
var obj *objstorage.MemObj
var r *Reader
defer func() {
if r != nil {
require.NoError(t, r.Close())
r = nil
}
}()
keySchema := colblk.DefaultKeySchema(testkeys.Comparer, 16)
datadriven.Walk(t, "testdata/columnar_writer", func(t *testing.T, path string) {
datadriven.RunTest(t, path, func(t *testing.T, td *datadriven.TestData) string {
switch td.Cmd {
case "build":
var writerOpts WriterOptions
writerOpts.Comparer = testkeys.Comparer
writerOpts.Compression = block.NoCompression
writerOpts.TableFormat = TableFormatPebblev5
writerOpts.KeySchema = keySchema
Expand All @@ -39,6 +48,20 @@ func TestColumnarWriter(t *testing.T) {
return fmt.Sprintf("error: %s", err)
}
return formatWriterMetadata(td, meta)
case "open":
if r != nil {
require.NoError(t, r.Close())
r = nil
}
var err error
r, err = NewReader(context.Background(), obj, ReaderOptions{
Comparer: testkeys.Comparer,
KeySchema: keySchema,
})
require.NoError(t, err)
return "ok"
case "props":
return r.Properties.String()
case "describe-binary":
f := binfmt.New(obj.Data())
if err := describeSSTableBinary(f, keySchema); err != nil {
Expand Down
8 changes: 8 additions & 0 deletions sstable/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,11 @@ type ReaderOptions struct {
Comparers Comparers
Mergers Mergers

// KeySchema describes the schema to use when interpreting columnar data
// blocks. Only used for sstables encoded in format TableFormatPebblev5 or
// higher.
KeySchema colblk.KeySchema

// Filters is a map from filter policy name to filter policy. Filters with
// policies that are not in this map will be ignored.
Filters map[string]FilterPolicy
Expand Down Expand Up @@ -315,5 +320,8 @@ func (o WriterOptions) ensureDefaults() WriterOptions {
if o.DeletionSizeRatioThreshold == 0 {
o.DeletionSizeRatioThreshold = DefaultDeletionSizeRatioThreshold
}
if len(o.KeySchema.ColumnTypes) == 0 && o.TableFormat.BlockColumnar() {
o.KeySchema = colblk.DefaultKeySchema(o.Comparer, 16 /* bundle size */)
}
return o
}
3 changes: 3 additions & 0 deletions sstable/random_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/cockroachdb/pebble/internal/bytealloc"
"github.com/cockroachdb/pebble/internal/testkeys"
"github.com/cockroachdb/pebble/objstorage/objstorageprovider"
"github.com/cockroachdb/pebble/sstable/colblk"
"github.com/cockroachdb/pebble/vfs"
"github.com/cockroachdb/pebble/vfs/errorfs"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -285,13 +286,15 @@ func (cfg *randomTableConfig) readerOpts() ReaderOptions {
func (cfg *randomTableConfig) randomize() {
if cfg.wopts == nil {
// Pick a random writer configuration. Every numeric knob is drawn from
// cfg.rng; the trailing comment on each field gives the set of values it can
// take.
//
// BlockSizeThreshold is a percentage: int(100*Float64()) lies in [0, 99], and
// it is clamped from below with max so the threshold is never 0. (Clamping
// with min, as was done previously, collapsed the value to 0 or 1.)
cfg.wopts = &WriterOptions{
	Comparer: testkeys.Comparer,
	// Test all table formats in [TableFormatLevelDB, TableFormatMax].
	TableFormat:             TableFormat(cfg.rng.Intn(int(TableFormatMax)) + 1),
	BlockRestartInterval:    (1 << cfg.rng.Intn(6)),             // {1, 2, 4, ..., 32}
	BlockSizeThreshold:      max(int(100*cfg.rng.Float64()), 1), // 1-99%
	BlockSize:               (1 << cfg.rng.Intn(18)),            // {1, 2, 4, ..., 128 KiB}
	IndexBlockSize:          (1 << cfg.rng.Intn(20)),            // {1, 2, 4, ..., 512 KiB}
	BlockPropertyCollectors: nil,
	KeySchema:               colblk.DefaultKeySchema(testkeys.Comparer, 16 /* bundle size */),
	WritingToLowestLevel:    cfg.rng.Intn(2) == 1,
	Parallelism:             cfg.rng.Intn(2) == 1,
}
Expand Down
62 changes: 46 additions & 16 deletions sstable/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/cockroachdb/pebble/objstorage/objstorageprovider"
"github.com/cockroachdb/pebble/objstorage/objstorageprovider/objiotracing"
"github.com/cockroachdb/pebble/sstable/block"
"github.com/cockroachdb/pebble/sstable/colblk"
"github.com/cockroachdb/pebble/sstable/rowblk"
"github.com/cockroachdb/pebble/vfs"
)
Expand All @@ -51,6 +52,7 @@ type Reader struct {

// The following fields are copied from the ReaderOptions.
cacheOpts sstableinternal.CacheOptions
keySchema colblk.KeySchema
loadBlockSema *fifo.Semaphore
deniedUserProperties map[string]struct{}
filterMetricsTracker *FilterMetricsTracker
Expand Down Expand Up @@ -163,13 +165,25 @@ func (r *Reader) newPointIter(
var res Iterator
var err error
if r.Properties.IndexType == twoLevelIndex {
res, err = newRowBlockTwoLevelIterator(
ctx, r, vState, transforms, lower, upper, filterer, filterBlockSizeLimit,
stats, categoryAndQoS, statsCollector, rp, nil /* bufferPool */)
if r.tableFormat.BlockColumnar() {
res, err = newColumnBlockTwoLevelIterator(
ctx, r, vState, transforms, lower, upper, filterer, filterBlockSizeLimit,
stats, categoryAndQoS, statsCollector, rp, nil /* bufferPool */)
} else {
res, err = newRowBlockTwoLevelIterator(
ctx, r, vState, transforms, lower, upper, filterer, filterBlockSizeLimit,
stats, categoryAndQoS, statsCollector, rp, nil /* bufferPool */)
}
} else {
res, err = newRowBlockSingleLevelIterator(
ctx, r, vState, transforms, lower, upper, filterer, filterBlockSizeLimit,
stats, categoryAndQoS, statsCollector, rp, nil /* bufferPool */)
if r.tableFormat.BlockColumnar() {
res, err = newColumnBlockSingleLevelIterator(
ctx, r, vState, transforms, lower, upper, filterer, filterBlockSizeLimit,
stats, categoryAndQoS, statsCollector, rp, nil /* bufferPool */)
} else {
res, err = newRowBlockSingleLevelIterator(
ctx, r, vState, transforms, lower, upper, filterer, filterBlockSizeLimit,
stats, categoryAndQoS, statsCollector, rp, nil /* bufferPool */)
}
}
if err != nil {
// Note: we don't want to return res here - it will be a nil
Expand Down Expand Up @@ -245,7 +259,7 @@ func (r *Reader) newCompactionIter(
// any range deletions.
func (r *Reader) NewRawRangeDelIter(
ctx context.Context, transforms FragmentIterTransforms,
) (keyspan.FragmentIterator, error) {
) (iter keyspan.FragmentIterator, err error) {
if r.rangeDelBH.Length == 0 {
return nil, nil
}
Expand All @@ -254,31 +268,39 @@ func (r *Reader) NewRawRangeDelIter(
return nil, err
}
transforms.ElideSameSeqNum = true
i, err := rowblk.NewFragmentIter(r.cacheOpts.FileNum, r.Compare, r.Comparer.CompareSuffixes, r.Split, h, transforms)
if err != nil {
return nil, err
if r.tableFormat.BlockColumnar() {
iter = colblk.NewKeyspanIter(r.Compare, h, transforms)
} else {
iter, err = rowblk.NewFragmentIter(r.cacheOpts.FileNum, r.Compare, r.Comparer.CompareSuffixes, r.Split, h, transforms)
if err != nil {
return nil, err
}
}
return keyspan.MaybeAssert(i, r.Compare), nil
return keyspan.MaybeAssert(iter, r.Compare), nil
}

// NewRawRangeKeyIter returns an internal iterator for the contents of the
// range-key block for the table. Returns nil if the table does not contain any
// range keys.
func (r *Reader) NewRawRangeKeyIter(
ctx context.Context, transforms FragmentIterTransforms,
) (keyspan.FragmentIterator, error) {
) (iter keyspan.FragmentIterator, err error) {
if r.rangeKeyBH.Length == 0 {
return nil, nil
}
h, err := r.readRangeKey(ctx, nil /* stats */, nil /* iterStats */)
if err != nil {
return nil, err
}
i, err := rowblk.NewFragmentIter(r.cacheOpts.FileNum, r.Compare, r.Comparer.CompareSuffixes, r.Split, h, transforms)
if err != nil {
return nil, err
if r.tableFormat.BlockColumnar() {
iter = colblk.NewKeyspanIter(r.Compare, h, transforms)
} else {
iter, err = rowblk.NewFragmentIter(r.cacheOpts.FileNum, r.Compare, r.Comparer.CompareSuffixes, r.Split, h, transforms)
if err != nil {
return nil, err
}
}
return keyspan.MaybeAssert(i, r.Compare), nil
return keyspan.MaybeAssert(iter, r.Compare), nil
}

func (r *Reader) readIndex(
Expand Down Expand Up @@ -836,6 +858,7 @@ func NewReader(ctx context.Context, f objstorage.Readable, o ReaderOptions) (*Re
r := &Reader{
readable: f,
cacheOpts: o.internal.CacheOpts,
keySchema: o.KeySchema,
loadBlockSema: o.LoadBlockSema,
deniedUserProperties: o.DeniedUserProperties,
filterMetricsTracker: o.FilterMetricsTracker,
Expand Down Expand Up @@ -871,6 +894,13 @@ func NewReader(ctx context.Context, f objstorage.Readable, o ReaderOptions) (*Re
r.metaIndexBH = footer.metaindexBH
r.footerBH = footer.footerBH

// If the table format indicates that blocks are encoded within the columnar
// format, we require a key schema to interpret it correctly.
if r.tableFormat.BlockColumnar() && len(r.keySchema.ColumnTypes) == 0 {
r.err = errors.Newf("pebble/table: key schema required for reading tables of format %s", r.tableFormat)
return nil, r.Close()
}

if r.Properties.ComparerName == "" || o.Comparer.Name == r.Properties.ComparerName {
r.Comparer = o.Comparer
r.Compare = o.Comparer.Compare
Expand Down
35 changes: 31 additions & 4 deletions sstable/reader_iter.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/cockroachdb/pebble/internal/base"
"github.com/cockroachdb/pebble/internal/invariants"
"github.com/cockroachdb/pebble/sstable/block"
"github.com/cockroachdb/pebble/sstable/colblk"
"github.com/cockroachdb/pebble/sstable/rowblk"
)

Expand Down Expand Up @@ -144,12 +145,18 @@ type Iterator interface {
//
// TODO(sumeer): remove the aforementioned defensive code.

type singleLevelIteratorRowBlocks = singleLevelIterator[rowblk.IndexIter, *rowblk.IndexIter, rowblk.Iter, *rowblk.Iter]
type twoLevelIteratorRowBlocks = twoLevelIterator[rowblk.IndexIter, *rowblk.IndexIter, rowblk.Iter, *rowblk.Iter]
// Aliases for the concrete instantiations of the generic singleLevelIterator
// and twoLevelIterator types. Each instantiation fixes the index-block iterator
// type and the data-block iterator type (value type followed by its pointer
// type).
type (
// Row-oriented blocks: rowblk.IndexIter for index blocks and rowblk.Iter for
// data blocks.
singleLevelIteratorRowBlocks = singleLevelIterator[rowblk.IndexIter, *rowblk.IndexIter, rowblk.Iter, *rowblk.Iter]
twoLevelIteratorRowBlocks = twoLevelIterator[rowblk.IndexIter, *rowblk.IndexIter, rowblk.Iter, *rowblk.Iter]
// Column-oriented blocks: colblk.IndexIter for index blocks and
// colblk.MultiDataBlockIter for data blocks.
singleLevelIteratorColumnBlocks = singleLevelIterator[colblk.IndexIter, *colblk.IndexIter, colblk.MultiDataBlockIter, *colblk.MultiDataBlockIter]
twoLevelIteratorColumnBlocks = twoLevelIterator[colblk.IndexIter, *colblk.IndexIter, colblk.MultiDataBlockIter, *colblk.MultiDataBlockIter]
)

// Pools from which sstable iterators are obtained. There is one pool per
// iterator instantiation; the trailing comment on each line names the pointer
// type stored in that pool. The pools' New functions are installed in init.
var (
singleLevelIterRowBlockPool sync.Pool // *singleLevelIteratorRowBlocks
twoLevelIterRowBlockPool sync.Pool // *twoLevelIteratorRowBlocks
singleLevelIterRowBlockPool sync.Pool // *singleLevelIteratorRowBlocks
twoLevelIterRowBlockPool sync.Pool // *twoLevelIteratorRowBlocks
singleLevelIterColumnBlockPool sync.Pool // *singleLevelIteratorColumnBlocks
twoLevelIterColumnBlockPool sync.Pool // *twoLevelIteratorColumnBlocks
)

func init() {
Expand All @@ -169,6 +176,26 @@ func init() {
return i
},
}
// Install the constructor for pooled single-level iterators over columnar
// blocks. Each new iterator records the pool it was created for.
singleLevelIterColumnBlockPool = sync.Pool{
	New: func() any {
		iter := &singleLevelIteratorColumnBlocks{pool: &singleLevelIterColumnBlockPool}
		// Note: SetFinalizer does nothing when invariants are disabled or
		// when the race detector is enabled.
		invariants.SetFinalizer(
			iter,
			checkSingleLevelIterator[colblk.IndexIter, *colblk.IndexIter, colblk.MultiDataBlockIter, *colblk.MultiDataBlockIter],
		)
		return iter
	},
}
// Install the constructor for pooled two-level iterators over columnar
// blocks, mirroring the single-level pool above.
twoLevelIterColumnBlockPool = sync.Pool{
	New: func() any {
		iter := &twoLevelIteratorColumnBlocks{pool: &twoLevelIterColumnBlockPool}
		// Note: SetFinalizer does nothing when invariants are disabled or
		// when the race detector is enabled.
		invariants.SetFinalizer(
			iter,
			checkTwoLevelIterator[colblk.IndexIter, *colblk.IndexIter, colblk.MultiDataBlockIter, *colblk.MultiDataBlockIter],
		)
		return iter
	},
}
}

func checkSingleLevelIterator[I any, PI indexBlockIterator[I], D any, PD dataBlockIterator[D]](
Expand Down
64 changes: 64 additions & 0 deletions sstable/reader_iter_single_lvl.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,70 @@ type singleLevelIterator[I any, PI indexBlockIterator[I], D any, PD dataBlockIte
// singleLevelIterator implements the base.InternalIterator interface.
var _ base.InternalIterator = (*singleLevelIteratorRowBlocks)(nil)

// newColumnBlockSingleLevelIterator builds a singleLevelIterator for an sstable
// whose data blocks are column-oriented. It takes an iterator from
// singleLevelIterColumnBlockPool, initializes it, reads the table's index
// block and initializes the index iterator over that block.
//
// lower and upper are the iterator's own bounds and are unrelated to virtual
// sstable bounds; virtual sstable bounds are enforced only when v is non-nil.
//
// It returns r.err if the Reader is already in an error state, and panics if
// the Reader's table format does not use columnar blocks. If the index block
// cannot be read or initialized, the iterator is closed and the error is
// returned.
func newColumnBlockSingleLevelIterator(
	ctx context.Context,
	r *Reader,
	v *virtualState,
	transforms IterTransforms,
	lower, upper []byte,
	filterer *BlockPropertiesFilterer,
	filterBlockSizeLimit FilterBlockSizeLimit,
	stats *base.InternalIteratorStats,
	categoryAndQoS CategoryAndQoS,
	statsCollector *CategoryStatsCollector,
	rp ReaderProvider,
	bufferPool *block.BufferPool,
) (*singleLevelIteratorColumnBlocks, error) {
	switch {
	case r.err != nil:
		return nil, r.err
	case !r.tableFormat.BlockColumnar():
		panic(errors.AssertionFailedf("table format %d should not use columnar block format", r.tableFormat))
	}

	iter := singleLevelIterColumnBlockPool.Get().(*singleLevelIteratorColumnBlocks)
	filterBlockEnabled := shouldUseFilterBlock(r, filterBlockSizeLimit)
	iter.init(
		ctx, r, v, transforms, lower, upper, filterer, filterBlockEnabled,
		stats, categoryAndQoS, statsCollector, bufferPool,
	)
	// The data block iterator needs the key schema to interpret columnar
	// blocks.
	iter.data.KeySchema = r.keySchema

	if r.Properties.NumValueBlocks > 0 {
		// NB: this ~248 byte allocation is unavoidable: the valueBlockReader
		// is embedded in a LazyValue and can therefore outlive the
		// singleLevelIterator. It accounts for ~2% of CPU in microbenchmark
		// profiles, but it is tricky to eliminate, so it should only be
		// optimized if it proves significant in end-to-end CockroachDB
		// benchmarks. One option: if many sstable iterators are only ever
		// positioned at the latest versions of keys, and so never expose a
		// separated LazyValue to their callers, they could return this
		// valueBlockReader to a sync.Pool.
		iter.vbReader = &valueBlockReader{
			bpOpen: iter,
			rp:     rp,
			vbih:   r.valueBIH,
			stats:  stats,
		}
		iter.data.GetLazyValuer = iter.vbReader
		iter.vbRH = objstorageprovider.UsePreallocatedReadHandle(r.readable, objstorage.NoReadBefore, &iter.vbRHPrealloc)
	}

	indexH, err := r.readIndex(ctx, iter.indexFilterRH, stats, &iter.iterStats)
	if err != nil {
		// The Close error is deliberately dropped; the read failure is the
		// error reported to the caller.
		_ = iter.Close()
		return nil, err
	}
	if err = iter.index.InitHandle(iter.cmp, r.Split, indexH, transforms); err != nil {
		// As above: report the initialization failure, not the Close result.
		_ = iter.Close()
		return nil, err
	}
	return iter, nil
}

// newRowBlockSingleLevelIterator reads the index block and creates and
// initializes a singleLevelIterator over an sstable with row-oriented data
// blocks.
Expand Down
Loading

0 comments on commit 3831e2b

Please sign in to comment.