Skip to content

Commit

Permalink
sstable: remove global seq num property
Browse files Browse the repository at this point in the history
This change removes the "global seq num" sstable property and instead
passes it to iterators, as a `SyntheticSeqNum`. This cleans up a lot
of the code that manipulated the global seq num in the reader and
allows the same external file backing to be shared.

As part of this change, we create a new struct `IterTransforms` and
move the synthetic suffix as well as `hideObsoletePoints` to this
struct. This cleans up the call sites, and allows easily adding more
transforms in the future (e.g prefix replacement).

We add a method on `FileMetadata` that generates the proper
`IterTransforms` for that table.
  • Loading branch information
RaduBerinde committed Feb 16, 2024
1 parent 9077044 commit a5ccde4
Show file tree
Hide file tree
Showing 55 changed files with 657 additions and 601 deletions.
2 changes: 1 addition & 1 deletion compaction_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -910,7 +910,7 @@ func TestCompaction(t *testing.T) {
return "", "", errors.WithStack(err)
}
defer r.Close()
iter, err := r.NewIter(nil /* lower */, nil /* upper */)
iter, err := r.NewIter(sstable.NoTransforms, nil /* lower */, nil /* upper */)
if err != nil {
return "", "", errors.WithStack(err)
}
Expand Down
25 changes: 18 additions & 7 deletions external_iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,11 @@ func createExternalPointIter(ctx context.Context, it *Iterator) (internalIterato
if len(it.externalReaders) > cap(mlevels) {
mlevels = make([]mergingIterLevel, 0, len(it.externalReaders))
}
// We set a synthetic sequence number, with lower levels having higer numbers.
seqNum := 0
for _, readers := range it.externalReaders {
seqNum += len(readers)
}
for _, readers := range it.externalReaders {
var combinedIters []internalIterator
for _, r := range readers {
Expand All @@ -196,15 +201,17 @@ func createExternalPointIter(ctx context.Context, it *Iterator) (internalIterato
// not have obsolete points (so the performance optimization is
// unnecessary), and we don't want to bother constructing a
// BlockPropertiesFilterer that includes obsoleteKeyBlockPropertyFilter.
transforms := sstable.IterTransforms{SyntheticSeqNum: sstable.SyntheticSeqNum(seqNum)}
seqNum--
pointIter, err = r.NewIterWithBlockPropertyFiltersAndContextEtc(
ctx, it.opts.LowerBound, it.opts.UpperBound, nil, /* BlockPropertiesFilterer */
false /* hideObsoletePoints */, false, /* useFilterBlock */
ctx, transforms, it.opts.LowerBound, it.opts.UpperBound, nil, /* BlockPropertiesFilterer */
false, /* useFilterBlock */
&it.stats.InternalStats, it.opts.CategoryAndQoS, nil,
sstable.TrivialReaderProvider{Reader: r})
if err != nil {
return nil, err
}
rangeDelIter, err = r.NewRawRangeDelIter()
rangeDelIter, err = r.NewRawRangeDelIter(transforms)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -268,9 +275,16 @@ func finishInitializingExternal(ctx context.Context, it *Iterator) error {
// TODO(bilal): Explore adding a simpleRangeKeyLevelIter that does not
// operate on FileMetadatas (similar to simpleLevelIter), and implements
// this optimization.
// We set a synthetic sequence number, with lower levels having higer numbers.
seqNum := 0
for _, readers := range it.externalReaders {
seqNum += len(readers)
}
for _, readers := range it.externalReaders {
for _, r := range readers {
if rki, err := r.NewRawRangeKeyIter(); err != nil {
transforms := sstable.IterTransforms{SyntheticSeqNum: sstable.SyntheticSeqNum(seqNum)}
seqNum--
if rki, err := r.NewRawRangeKeyIter(transforms); err != nil {
return err
} else if rki != nil {
rangeKeyIters = append(rangeKeyIters, rki)
Expand Down Expand Up @@ -322,9 +336,6 @@ func openExternalTables(
if err != nil {
return readers, err
}
// Use the index of the file in files as the sequence number for all of
// its keys.
r.Properties.GlobalSeqNum = uint64(len(files) - i + seqNumOffset)
readers = append(readers, r)
}
return readers, err
Expand Down
4 changes: 2 additions & 2 deletions external_iterator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ func TestSimpleLevelIter(t *testing.T) {
}()
var internalIters []internalIterator
for i := range readers {
iter, err := readers[i].NewIter(nil, nil)
iter, err := readers[i].NewIter(sstable.NoTransforms, nil, nil)
require.NoError(t, err)
internalIters = append(internalIters, iter)
}
Expand Down Expand Up @@ -241,7 +241,7 @@ func TestIterRandomizedMaybeFilteredKeys(t *testing.T) {

var iter sstable.Iterator
iter, err = r.NewIterWithBlockPropertyFilters(
nil, nil, filterer, false /* useFilterBlock */, nil, /* stats */
sstable.NoTransforms, nil, nil, filterer, false /* useFilterBlock */, nil, /* stats */
sstable.CategoryAndQoS{}, nil, sstable.TrivialReaderProvider{Reader: r})
require.NoError(t, err)
defer iter.Close()
Expand Down
6 changes: 3 additions & 3 deletions ingest.go
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,7 @@ func ingestLoad1(
maybeSetStatsFromProperties(meta.PhysicalMeta(), &r.Properties)

{
iter, err := r.NewIter(nil /* lower */, nil /* upper */)
iter, err := r.NewIter(sstable.NoTransforms, nil /* lower */, nil /* upper */)
if err != nil {
return nil, err
}
Expand All @@ -311,7 +311,7 @@ func ingestLoad1(
}
}

iter, err := r.NewRawRangeDelIter()
iter, err := r.NewRawRangeDelIter(sstable.NoTransforms)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -341,7 +341,7 @@ func ingestLoad1(

// Update the range-key bounds for the table.
{
iter, err := r.NewRawRangeKeyIter()
iter, err := r.NewRawRangeKeyIter(sstable.NoTransforms)
if err != nil {
return nil, err
}
Expand Down
19 changes: 18 additions & 1 deletion internal/manifest/version.go
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,23 @@ func (m *FileMetadata) InternalKeyBounds() (InternalKey, InternalKey) {
return m.Smallest, m.Largest
}

// SyntheticSeqNum returns a SyntheticSeqNum which is set when SmallestSeqNum
// equals LargestSeqNum.
func (m *FileMetadata) SyntheticSeqNum() sstable.SyntheticSeqNum {
if m.SmallestSeqNum == m.LargestSeqNum {
return sstable.SyntheticSeqNum(m.SmallestSeqNum)
}
return sstable.NoSyntheticSeqNum
}

// IterTransforms returns an sstable.IterTransforms that has SyntheticSeqNum set as needed.
func (m *FileMetadata) IterTransforms() sstable.IterTransforms {
return sstable.IterTransforms{
SyntheticSeqNum: m.SyntheticSeqNum(),
SyntheticSuffix: m.SyntheticSuffix,
}
}

// PhysicalFileMeta is used by functions which want a guarantee that their input
// belongs to a physical sst and not a virtual sst.
//
Expand Down Expand Up @@ -335,7 +352,7 @@ func (m VirtualFileMeta) VirtualReaderParams(isShared bool) sstable.VirtualReade
Lower: m.Smallest,
Upper: m.Largest,
FileNum: m.FileNum,
IsShared: isShared,
IsSharedIngested: isShared && m.SyntheticSeqNum() != 0,
Size: m.Size,
BackingSize: m.FileBacking.Size,
PrefixReplacement: m.PrefixReplacement,
Expand Down
4 changes: 2 additions & 2 deletions level_checker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,11 +97,11 @@ func TestCheckLevelsCornerCases(t *testing.T) {
newIters :=
func(_ context.Context, file *manifest.FileMetadata, _ *IterOptions, _ internalIterOpts, _ iterKinds) (iterSet, error) {
r := readers[file.FileNum]
rangeDelIter, err := r.NewRawRangeDelIter()
rangeDelIter, err := r.NewRawRangeDelIter(sstable.NoTransforms)
if err != nil {
return iterSet{}, err
}
iter, err := r.NewIter(nil /* lower */, nil /* upper */)
iter, err := r.NewIter(sstable.NoTransforms, nil /* lower */, nil /* upper */)
if err != nil {
return iterSet{}, err
}
Expand Down
16 changes: 10 additions & 6 deletions level_iter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,13 +165,15 @@ func (lt *levelIterTest) newIters(
kinds iterKinds,
) (iterSet, error) {
lt.itersCreated++
transforms := file.IterTransforms()
iter, err := lt.readers[file.FileNum].NewIterWithBlockPropertyFiltersAndContextEtc(
ctx, opts.LowerBound, opts.UpperBound, nil, false, true, iio.stats, sstable.CategoryAndQoS{},
ctx, transforms,
opts.LowerBound, opts.UpperBound, nil, true /* useFilterBlock */, iio.stats, sstable.CategoryAndQoS{},
nil, sstable.TrivialReaderProvider{Reader: lt.readers[file.FileNum]})
if err != nil {
return iterSet{}, err
}
rangeDelIter, err := lt.readers[file.FileNum].NewRawRangeDelIter()
rangeDelIter, err := lt.readers[file.FileNum].NewRawRangeDelIter(transforms)
if err != nil {
return iterSet{}, err
}
Expand Down Expand Up @@ -514,7 +516,7 @@ func buildLevelIterTables(

meta := make([]*fileMetadata, len(readers))
for i := range readers {
iter, err := readers[i].NewIter(nil /* lower */, nil /* upper */)
iter, err := readers[i].NewIter(sstable.NoTransforms, nil /* lower */, nil /* upper */)
require.NoError(b, err)
smallest, _ := iter.First()
meta[i] = &fileMetadata{}
Expand All @@ -541,7 +543,7 @@ func BenchmarkLevelIterSeekGE(b *testing.B) {
newIters := func(
_ context.Context, file *manifest.FileMetadata, _ *IterOptions, _ internalIterOpts, _ iterKinds,
) (iterSet, error) {
iter, err := readers[file.FileNum].NewIter(nil /* lower */, nil /* upper */)
iter, err := readers[file.FileNum].NewIter(sstable.NoTransforms, nil /* lower */, nil /* upper */)
return iterSet{point: iter}, err
}
l := newLevelIter(context.Background(), IterOptions{}, DefaultComparer, newIters, metas.Iter(), manifest.Level(level), internalIterOpts{})
Expand Down Expand Up @@ -583,6 +585,7 @@ func BenchmarkLevelIterSeqSeekGEWithBounds(b *testing.B) {
_ context.Context, file *manifest.FileMetadata, opts *IterOptions, _ internalIterOpts, _ iterKinds,
) (iterSet, error) {
iter, err := readers[file.FileNum].NewIter(
sstable.NoTransforms,
opts.LowerBound, opts.UpperBound)
return iterSet{point: iter}, err
}
Expand Down Expand Up @@ -625,6 +628,7 @@ func BenchmarkLevelIterSeqSeekPrefixGE(b *testing.B) {
_ context.Context, file *manifest.FileMetadata, opts *IterOptions, _ internalIterOpts, _ iterKinds,
) (iterSet, error) {
iter, err := readers[file.FileNum].NewIter(
sstable.NoTransforms,
opts.LowerBound, opts.UpperBound)
return iterSet{point: iter}, err
}
Expand Down Expand Up @@ -675,7 +679,7 @@ func BenchmarkLevelIterNext(b *testing.B) {
newIters := func(
_ context.Context, file *manifest.FileMetadata, _ *IterOptions, _ internalIterOpts, _ iterKinds,
) (iterSet, error) {
iter, err := readers[file.FileNum].NewIter(nil /* lower */, nil /* upper */)
iter, err := readers[file.FileNum].NewIter(sstable.NoTransforms, nil /* lower */, nil /* upper */)
return iterSet{point: iter}, err
}
l := newLevelIter(context.Background(), IterOptions{}, testkeys.Comparer, newIters, metas.Iter(), manifest.Level(level), internalIterOpts{})
Expand Down Expand Up @@ -709,7 +713,7 @@ func BenchmarkLevelIterPrev(b *testing.B) {
newIters := func(
_ context.Context, file *manifest.FileMetadata, _ *IterOptions, _ internalIterOpts, _ iterKinds,
) (iterSet, error) {
iter, err := readers[file.FileNum].NewIter(nil /* lower */, nil /* upper */)
iter, err := readers[file.FileNum].NewIter(sstable.NoTransforms, nil /* lower */, nil /* upper */)
return iterSet{point: iter}, err
}
l := newLevelIter(context.Background(), IterOptions{}, DefaultComparer, newIters, metas.Iter(), manifest.Level(level), internalIterOpts{})
Expand Down
15 changes: 8 additions & 7 deletions merging_iter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,11 +166,12 @@ func TestMergingIterCornerCases(t *testing.T) {
func(_ context.Context, file *manifest.FileMetadata, opts *IterOptions, iio internalIterOpts, kinds iterKinds,
) (iterSet, error) {
r := readers[file.FileNum]
rangeDelIter, err := r.NewRawRangeDelIter()
rangeDelIter, err := r.NewRawRangeDelIter(sstable.NoTransforms)
if err != nil {
return iterSet{}, err
}
iter, err := r.NewIterWithBlockPropertyFilters(
sstable.NoTransforms,
opts.GetLowerBound(), opts.GetUpperBound(), nil, true /* useFilterBlock */, iio.stats,
sstable.CategoryAndQoS{}, nil, sstable.TrivialReaderProvider{Reader: r})
if err != nil {
Expand Down Expand Up @@ -408,7 +409,7 @@ func BenchmarkMergingIterSeekGE(b *testing.B) {
iters := make([]internalIterator, len(readers))
for i := range readers {
var err error
iters[i], err = readers[i].NewIter(nil /* lower */, nil /* upper */)
iters[i], err = readers[i].NewIter(sstable.NoTransforms, nil /* lower */, nil /* upper */)
require.NoError(b, err)
}
var stats base.InternalIteratorStats
Expand Down Expand Up @@ -441,7 +442,7 @@ func BenchmarkMergingIterNext(b *testing.B) {
iters := make([]internalIterator, len(readers))
for i := range readers {
var err error
iters[i], err = readers[i].NewIter(nil /* lower */, nil /* upper */)
iters[i], err = readers[i].NewIter(sstable.NoTransforms, nil /* lower */, nil /* upper */)
require.NoError(b, err)
}
var stats base.InternalIteratorStats
Expand Down Expand Up @@ -477,7 +478,7 @@ func BenchmarkMergingIterPrev(b *testing.B) {
iters := make([]internalIterator, len(readers))
for i := range readers {
var err error
iters[i], err = readers[i].NewIter(nil /* lower */, nil /* upper */)
iters[i], err = readers[i].NewIter(sstable.NoTransforms, nil /* lower */, nil /* upper */)
require.NoError(b, err)
}
var stats base.InternalIteratorStats
Expand Down Expand Up @@ -633,7 +634,7 @@ func buildLevelsForMergingIterSeqSeek(
for i := range readers {
meta := make([]*fileMetadata, len(readers[i]))
for j := range readers[i] {
iter, err := readers[i][j].NewIter(nil /* lower */, nil /* upper */)
iter, err := readers[i][j].NewIter(sstable.NoTransforms, nil /* lower */, nil /* upper */)
require.NoError(b, err)
smallest, _ := iter.First()
meta[j] = &fileMetadata{}
Expand All @@ -659,11 +660,11 @@ func buildMergingIter(readers [][]*sstable.Reader, levelSlices []manifest.LevelS
_ context.Context, file *manifest.FileMetadata, opts *IterOptions, _ internalIterOpts, _ iterKinds,
) (iterSet, error) {
iter, err := readers[levelIndex][file.FileNum].NewIter(
opts.LowerBound, opts.UpperBound)
sstable.NoTransforms, opts.LowerBound, opts.UpperBound)
if err != nil {
return iterSet{}, err
}
rdIter, err := readers[levelIndex][file.FileNum].NewRawRangeDelIter()
rdIter, err := readers[levelIndex][file.FileNum].NewRawRangeDelIter(sstable.NoTransforms)
if err != nil {
iter.Close()
return iterSet{}, err
Expand Down
6 changes: 3 additions & 3 deletions metamorphic/build.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,10 +198,10 @@ func openExternalObj(
reader, err = sstable.NewReader(objstorageprovider.NewRemoteReadable(objReader, objSize), opts)
panicIfErr(err)

pointIter, err = reader.NewIter(bounds.Start, bounds.End)
pointIter, err = reader.NewIter(sstable.NoTransforms, bounds.Start, bounds.End)
panicIfErr(err)

rangeDelIter, err = reader.NewRawRangeDelIter()
rangeDelIter, err = reader.NewRawRangeDelIter(sstable.NoTransforms)
panicIfErr(err)
if rangeDelIter != nil {
rangeDelIter = keyspan.Truncate(
Expand All @@ -212,7 +212,7 @@ func openExternalObj(
)
}

rangeKeyIter, err = reader.NewRawRangeKeyIter()
rangeKeyIter, err = reader.NewRawRangeKeyIter(sstable.NoTransforms)
panicIfErr(err)
if rangeKeyIter != nil {
rangeKeyIter = keyspan.Truncate(
Expand Down
6 changes: 3 additions & 3 deletions replay/replay.go
Original file line number Diff line number Diff line change
Expand Up @@ -999,7 +999,7 @@ func loadFlushedSSTableKeys(
defer r.Close()

// Load all the point keys.
iter, err := r.NewIter(nil, nil)
iter, err := r.NewIter(sstable.NoTransforms, nil, nil)
if err != nil {
return err
}
Expand All @@ -1019,7 +1019,7 @@ func loadFlushedSSTableKeys(
}

// Load all the range tombstones.
if iter, err := r.NewRawRangeDelIter(); err != nil {
if iter, err := r.NewRawRangeDelIter(sstable.NoTransforms); err != nil {
return err
} else if iter != nil {
defer iter.Close()
Expand All @@ -1042,7 +1042,7 @@ func loadFlushedSSTableKeys(
}

// Load all the range keys.
if iter, err := r.NewRawRangeKeyIter(); err != nil {
if iter, err := r.NewRawRangeKeyIter(sstable.NoTransforms); err != nil {
return err
} else if iter != nil {
defer iter.Close()
Expand Down
Loading

0 comments on commit a5ccde4

Please sign in to comment.