Skip to content

Commit

Permalink
sstable: plumb context to range iterator constructors
Browse files Browse the repository at this point in the history
This context covers the read of the range del or range key block.

Informs #3728
  • Loading branch information
RaduBerinde committed Jul 20, 2024
1 parent de0e731 commit 4f01ec4
Show file tree
Hide file tree
Showing 19 changed files with 63 additions and 54 deletions.
4 changes: 2 additions & 2 deletions external_iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ func createExternalPointIter(ctx context.Context, it *Iterator) (topLevelIterato
if err != nil {
return nil, err
}
rangeDelIter, err = r.NewRawRangeDelIter(sstable.FragmentIterTransforms{
rangeDelIter, err = r.NewRawRangeDelIter(ctx, sstable.FragmentIterTransforms{
SyntheticSeqNum: sstable.SyntheticSeqNum(seqNum),
})
if err != nil {
Expand Down Expand Up @@ -217,7 +217,7 @@ func finishInitializingExternal(ctx context.Context, it *Iterator) error {
for _, r := range readers {
transforms := sstable.FragmentIterTransforms{SyntheticSeqNum: sstable.SyntheticSeqNum(seqNum)}
seqNum--
if rki, err := r.NewRawRangeKeyIter(transforms); err != nil {
if rki, err := r.NewRawRangeKeyIter(ctx, transforms); err != nil {
return err
} else if rki != nil {
rangeKeyIters = append(rangeKeyIters, rki)
Expand Down
4 changes: 2 additions & 2 deletions ingest.go
Original file line number Diff line number Diff line change
Expand Up @@ -317,7 +317,7 @@ func ingestLoad1(
}
}

iter, err := r.NewRawRangeDelIter(sstable.NoFragmentTransforms)
iter, err := r.NewRawRangeDelIter(ctx, sstable.NoFragmentTransforms)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -347,7 +347,7 @@ func ingestLoad1(

// Update the range-key bounds for the table.
{
iter, err := r.NewRawRangeKeyIter(sstable.NoFragmentTransforms)
iter, err := r.NewRawRangeKeyIter(ctx, sstable.NoFragmentTransforms)
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion level_checker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ func TestCheckLevelsCornerCases(t *testing.T) {
newIters :=
func(_ context.Context, file *manifest.FileMetadata, _ *IterOptions, _ internalIterOpts, _ iterKinds) (iterSet, error) {
r := readers[file.FileNum]
rangeDelIter, err := r.NewRawRangeDelIter(sstable.NoFragmentTransforms)
rangeDelIter, err := r.NewRawRangeDelIter(context.Background(), sstable.NoFragmentTransforms)
if err != nil {
return iterSet{}, err
}
Expand Down
2 changes: 1 addition & 1 deletion level_iter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ func (lt *levelIterTest) newIters(
set.point = iter
}
if kinds.RangeDeletion() {
rangeDelIter, err := lt.readers[file.FileNum].NewRawRangeDelIter(file.FragmentIterTransforms())
rangeDelIter, err := lt.readers[file.FileNum].NewRawRangeDelIter(context.Background(), file.FragmentIterTransforms())
if err != nil {
return iterSet{}, errors.CombineErrors(err, set.CloseAll())
}
Expand Down
4 changes: 2 additions & 2 deletions merging_iter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ func TestMergingIterCornerCases(t *testing.T) {
var err error
r := readers[file.FileNum]
if kinds.RangeDeletion() {
set.rangeDeletion, err = r.NewRawRangeDelIter(sstable.NoFragmentTransforms)
set.rangeDeletion, err = r.NewRawRangeDelIter(context.Background(), sstable.NoFragmentTransforms)
if err != nil {
return iterSet{}, errors.CombineErrors(err, set.CloseAll())
}
Expand Down Expand Up @@ -678,7 +678,7 @@ func buildMergingIter(readers [][]*sstable.Reader, levelSlices []manifest.LevelS
if err != nil {
return iterSet{}, err
}
rdIter, err := readers[levelIndex][file.FileNum].NewRawRangeDelIter(sstable.NoFragmentTransforms)
rdIter, err := readers[levelIndex][file.FileNum].NewRawRangeDelIter(context.Background(), sstable.NoFragmentTransforms)
if err != nil {
iter.Close()
return iterSet{}, err
Expand Down
4 changes: 2 additions & 2 deletions metamorphic/build.go
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,7 @@ func openExternalObj(
pointIter, err = reader.NewIter(sstable.NoTransforms, start, end)
panicIfErr(err)

rangeDelIter, err = reader.NewRawRangeDelIter(sstable.NoFragmentTransforms)
rangeDelIter, err = reader.NewRawRangeDelIter(context.Background(), sstable.NoFragmentTransforms)
panicIfErr(err)
if rangeDelIter != nil {
rangeDelIter = keyspan.Truncate(
Expand All @@ -268,7 +268,7 @@ func openExternalObj(
)
}

rangeKeyIter, err = reader.NewRawRangeKeyIter(sstable.NoFragmentTransforms)
rangeKeyIter, err = reader.NewRawRangeKeyIter(context.Background(), sstable.NoFragmentTransforms)
panicIfErr(err)
if rangeKeyIter != nil {
rangeKeyIter = keyspan.Truncate(
Expand Down
4 changes: 2 additions & 2 deletions replay/replay.go
Original file line number Diff line number Diff line change
Expand Up @@ -1020,7 +1020,7 @@ func loadFlushedSSTableKeys(
}

// Load all the range tombstones.
if iter, err := r.NewRawRangeDelIter(sstable.NoFragmentTransforms); err != nil {
if iter, err := r.NewRawRangeDelIter(context.Background(), sstable.NoFragmentTransforms); err != nil {
return err
} else if iter != nil {
defer iter.Close()
Expand All @@ -1043,7 +1043,7 @@ func loadFlushedSSTableKeys(
}

// Load all the range keys.
if iter, err := r.NewRawRangeKeyIter(sstable.NoFragmentTransforms); err != nil {
if iter, err := r.NewRawRangeKeyIter(context.Background(), sstable.NoFragmentTransforms); err != nil {
return err
} else if iter != nil {
defer iter.Close()
Expand Down
33 changes: 15 additions & 18 deletions sstable/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -301,16 +301,13 @@ func (r *Reader) newCompactionIter(
// NewRawRangeDelIter returns an internal iterator for the contents of the
// range-del block for the table. Returns nil if the table does not contain
// any range deletions.
//
// TODO(sumeer): plumb context.Context since this path is relevant in the user-facing
// iterator. Add WithContext methods since the existing ones are public.
func (r *Reader) NewRawRangeDelIter(
transforms FragmentIterTransforms,
ctx context.Context, transforms FragmentIterTransforms,
) (keyspan.FragmentIterator, error) {
if r.rangeDelBH.Length == 0 {
return nil, nil
}
h, err := r.readRangeDel(nil /* stats */, nil /* iterStats */)
h, err := r.readRangeDel(ctx, nil /* stats */, nil /* iterStats */)
if err != nil {
return nil, err
}
Expand All @@ -325,16 +322,13 @@ func (r *Reader) NewRawRangeDelIter(
// NewRawRangeKeyIter returns an internal iterator for the contents of the
// range-key block for the table. Returns nil if the table does not contain any
// range keys.
//
// TODO(sumeer): plumb context.Context since this path is relevant in the user-facing
// iterator. Add WithContext methods since the existing ones are public.
func (r *Reader) NewRawRangeKeyIter(
transforms FragmentIterTransforms,
ctx context.Context, transforms FragmentIterTransforms,
) (keyspan.FragmentIterator, error) {
if r.rangeKeyBH.Length == 0 {
return nil, nil
}
h, err := r.readRangeKey(nil /* stats */, nil /* iterStats */)
h, err := r.readRangeKey(ctx, nil /* stats */, nil /* iterStats */)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -366,16 +360,16 @@ func (r *Reader) readFilter(
}

func (r *Reader) readRangeDel(
stats *base.InternalIteratorStats, iterStats *iterStatsAccumulator,
ctx context.Context, stats *base.InternalIteratorStats, iterStats *iterStatsAccumulator,
) (block.BufferHandle, error) {
ctx := objiotracing.WithBlockType(context.Background(), objiotracing.MetadataBlock)
ctx = objiotracing.WithBlockType(ctx, objiotracing.MetadataBlock)
return r.readBlock(ctx, r.rangeDelBH, nil /* transform */, nil /* readHandle */, stats, iterStats, nil /* buffer pool */)
}

func (r *Reader) readRangeKey(
stats *base.InternalIteratorStats, iterStats *iterStatsAccumulator,
ctx context.Context, stats *base.InternalIteratorStats, iterStats *iterStatsAccumulator,
) (block.BufferHandle, error) {
ctx := objiotracing.WithBlockType(context.Background(), objiotracing.MetadataBlock)
ctx = objiotracing.WithBlockType(ctx, objiotracing.MetadataBlock)
return r.readBlock(ctx, r.rangeKeyBH, nil /* transform */, nil /* readHandle */, stats, iterStats, nil /* buffer pool */)
}

Expand Down Expand Up @@ -528,7 +522,10 @@ func (r *Reader) readBlock(
}

func (r *Reader) readMetaindex(
metaindexBH block.Handle, readHandle objstorage.ReadHandle, filters map[string]FilterPolicy,
ctx context.Context,
metaindexBH block.Handle,
readHandle objstorage.ReadHandle,
filters map[string]FilterPolicy,
) error {
// We use a BufferPool when reading metaindex blocks in order to avoid
// populating the block cache with these blocks. In heavy-write workloads,
Expand All @@ -544,7 +541,7 @@ func (r *Reader) readMetaindex(
defer r.metaBufferPool.Release()

b, err := r.readBlock(
context.Background(), metaindexBH, nil /* transform */, readHandle, nil, /* stats */
ctx, metaindexBH, nil /* transform */, readHandle, nil, /* stats */
nil /* iterStats */, &r.metaBufferPool)
if err != nil {
return err
Expand Down Expand Up @@ -588,7 +585,7 @@ func (r *Reader) readMetaindex(

if bh, ok := meta[metaPropertiesName]; ok {
b, err = r.readBlock(
context.Background(), bh, nil /* transform */, readHandle, nil, /* stats */
ctx, bh, nil /* transform */, readHandle, nil, /* stats */
nil /* iterStats */, nil /* buffer pool */)
if err != nil {
return err
Expand Down Expand Up @@ -988,7 +985,7 @@ func NewReader(ctx context.Context, f objstorage.Readable, o ReaderOptions) (*Re
r.checksumType = footer.checksum
r.tableFormat = footer.format
// Read the metaindex and properties blocks.
if err := r.readMetaindex(footer.metaindexBH, rh, o.Filters); err != nil {
if err := r.readMetaindex(ctx, footer.metaindexBH, rh, o.Filters); err != nil {
r.err = err
return nil, r.Close()
}
Expand Down
8 changes: 6 additions & 2 deletions sstable/reader_common.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,13 @@ import (
// can be used by code which doesn't care to distinguish between a reader and a
// virtual reader.
type CommonReader interface {
NewRawRangeKeyIter(transforms FragmentIterTransforms) (keyspan.FragmentIterator, error)
NewRawRangeKeyIter(
ctx context.Context, transforms FragmentIterTransforms,
) (keyspan.FragmentIterator, error)

NewRawRangeDelIter(transforms FragmentIterTransforms) (keyspan.FragmentIterator, error)
NewRawRangeDelIter(
ctx context.Context, transforms FragmentIterTransforms,
) (keyspan.FragmentIterator, error)

NewIterWithBlockPropertyFiltersAndContextEtc(
ctx context.Context,
Expand Down
6 changes: 3 additions & 3 deletions sstable/reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -369,7 +369,7 @@ func runVirtualReaderTest(t *testing.T, path string, blockSize, indexBlockSize i
return "virtualize must be called before scan-range-del"
}
transforms := FragmentIterTransforms{} // TODO(radu): SyntheticSuffix: syntheticSuffix
iter, err := v.NewRawRangeDelIter(transforms)
iter, err := v.NewRawRangeDelIter(context.Background(), transforms)
if err != nil {
return err.Error()
}
Expand All @@ -390,10 +390,10 @@ func runVirtualReaderTest(t *testing.T, path string, blockSize, indexBlockSize i

case "scan-range-key":
if v == nil {
return "virtualize mupst be called before scan-range-key"
return "virtualize must be called before scan-range-key"
}
transforms := FragmentIterTransforms{} // TODO(radu): SyntheticSuffix: syntheticSuffix
iter, err := v.NewRawRangeKeyIter(transforms)
iter, err := v.NewRawRangeKeyIter(context.Background(), transforms)
if err != nil {
return err.Error()
}
Expand Down
8 changes: 4 additions & 4 deletions sstable/reader_virtual.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,9 +129,9 @@ func (v *VirtualReader) ValidateBlockChecksumsOnBacking() error {

// NewRawRangeDelIter wraps Reader.NewRawRangeDelIter.
func (v *VirtualReader) NewRawRangeDelIter(
transforms FragmentIterTransforms,
ctx context.Context, transforms FragmentIterTransforms,
) (keyspan.FragmentIterator, error) {
iter, err := v.reader.NewRawRangeDelIter(transforms)
iter, err := v.reader.NewRawRangeDelIter(ctx, transforms)
if err != nil {
return nil, err
}
Expand All @@ -155,7 +155,7 @@ func (v *VirtualReader) NewRawRangeDelIter(

// NewRawRangeKeyIter wraps Reader.NewRawRangeKeyIter.
func (v *VirtualReader) NewRawRangeKeyIter(
transforms FragmentIterTransforms,
ctx context.Context, transforms FragmentIterTransforms,
) (keyspan.FragmentIterator, error) {
syntheticSeqNum := transforms.SyntheticSeqNum
if v.vState.isSharedIngested {
Expand All @@ -164,7 +164,7 @@ func (v *VirtualReader) NewRawRangeKeyIter(
// appropriate sequence number substitution below.
transforms.SyntheticSeqNum = 0
}
iter, err := v.reader.NewRawRangeKeyIter(transforms)
iter, err := v.reader.NewRawRangeKeyIter(ctx, transforms)
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion sstable/suffix_rewriter.go
Original file line number Diff line number Diff line change
Expand Up @@ -390,7 +390,7 @@ func rewriteDataBlocksToWriter(
}

func rewriteRangeKeyBlockToWriter(r *Reader, w *Writer, from, to []byte) error {
iter, err := r.NewRawRangeKeyIter(NoFragmentTransforms)
iter, err := r.NewRawRangeKeyIter(context.TODO(), NoFragmentTransforms)
if err != nil {
return err
}
Expand Down
5 changes: 3 additions & 2 deletions sstable/test_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,15 @@ func ReadAll(
})
}

if rangeDelIter := testutils.CheckErr(reader.NewRawRangeDelIter(NoFragmentTransforms)); rangeDelIter != nil {
ctx := context.Background()
if rangeDelIter := testutils.CheckErr(reader.NewRawRangeDelIter(ctx, NoFragmentTransforms)); rangeDelIter != nil {
defer rangeDelIter.Close()
for s := testutils.CheckErr(rangeDelIter.First()); s != nil; s = testutils.CheckErr(rangeDelIter.Next()) {
rangeDels = append(rangeDels, s.Clone())
}
}

if rangeKeyIter := testutils.CheckErr(reader.NewRawRangeKeyIter(NoFragmentTransforms)); rangeKeyIter != nil {
if rangeKeyIter := testutils.CheckErr(reader.NewRawRangeKeyIter(ctx, NoFragmentTransforms)); rangeKeyIter != nil {
defer rangeKeyIter.Close()
for s := testutils.CheckErr(rangeKeyIter.First()); s != nil; s = testutils.CheckErr(rangeKeyIter.Next()) {
rangeKeys = append(rangeKeys, s.Clone())
Expand Down
3 changes: 2 additions & 1 deletion sstable/writer_rangekey_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package sstable

import (
"bytes"
"context"
"crypto/rand"
"fmt"
"strings"
Expand Down Expand Up @@ -105,7 +106,7 @@ func TestWriter_RangeKeys(t *testing.T) {
return err.Error()
}

iter, err := r.NewRawRangeKeyIter(NoFragmentTransforms)
iter, err := r.NewRawRangeKeyIter(context.Background(), NoFragmentTransforms)
if err != nil {
return err.Error()
}
Expand Down
5 changes: 3 additions & 2 deletions sstable/writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package sstable

import (
"bytes"
"context"
"encoding/binary"
"fmt"
"math/rand"
Expand Down Expand Up @@ -170,7 +171,7 @@ func runDataDriven(t *testing.T, file string, tableFormat TableFormat, paralleli
return buf.String()

case "scan-range-del":
iter, err := r.NewRawRangeDelIter(NoFragmentTransforms)
iter, err := r.NewRawRangeDelIter(context.Background(), NoFragmentTransforms)
if err != nil {
return err.Error()
}
Expand All @@ -190,7 +191,7 @@ func runDataDriven(t *testing.T, file string, tableFormat TableFormat, paralleli
return buf.String()

case "scan-range-key":
iter, err := r.NewRawRangeKeyIter(NoFragmentTransforms)
iter, err := r.NewRawRangeKeyIter(context.Background(), NoFragmentTransforms)
if err != nil {
return err.Error()
}
Expand Down
12 changes: 8 additions & 4 deletions table_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -480,7 +480,7 @@ func (c *tableCacheShard) newIters(
var iters iterSet
var err error
if kinds.RangeKey() && file.HasRangeKeys {
iters.rangeKey, err = c.newRangeKeyIter(v, file, cr, opts.SpanIterOptions())
iters.rangeKey, err = c.newRangeKeyIter(ctx, v, file, cr, opts.SpanIterOptions())
}
if kinds.RangeDeletion() && file.HasPointKeys && err == nil {
iters.rangeDeletion, err = c.newRangeDelIter(ctx, file, cr, dbOpts)
Expand Down Expand Up @@ -620,7 +620,7 @@ func (c *tableCacheShard) newRangeDelIter(
) (keyspan.FragmentIterator, error) {
// NB: range-del iterator does not maintain a reference to the table, nor
// does it need to read from it after creation.
rangeDelIter, err := cr.NewRawRangeDelIter(file.FragmentIterTransforms())
rangeDelIter, err := cr.NewRawRangeDelIter(ctx, file.FragmentIterTransforms())
if err != nil {
return nil, err
}
Expand All @@ -641,7 +641,11 @@ func (c *tableCacheShard) newRangeDelIter(
// sstable's range keys. This function is for table-cache internal use only, and
// callers should use newIters instead.
func (c *tableCacheShard) newRangeKeyIter(
v *tableCacheValue, file *fileMetadata, cr sstable.CommonReader, opts keyspan.SpanIterOptions,
ctx context.Context,
v *tableCacheValue,
file *fileMetadata,
cr sstable.CommonReader,
opts keyspan.SpanIterOptions,
) (keyspan.FragmentIterator, error) {
transforms := file.FragmentIterTransforms()
// Don't filter a table's range keys if the file contains RANGEKEYDELs.
Expand All @@ -658,7 +662,7 @@ func (c *tableCacheShard) newRangeKeyIter(
}
}
// TODO(radu): wrap in an AssertBounds.
return cr.NewRawRangeKeyIter(transforms)
return cr.NewRawRangeKeyIter(ctx, transforms)
}

type tableCacheShardReaderProvider struct {
Expand Down
Loading

0 comments on commit 4f01ec4

Please sign in to comment.