Skip to content

Commit

Permalink
sstable: hide obsolete range keys for shared ingested sstables
Browse files Browse the repository at this point in the history
This change updates the virtual sstable range key iterator to
hide obsolete range keys for shared ingested files. It does
this by reusing the ForeignSSTTransformer that was removed
in #2782.

This change also cleans up some dangling references to the older
`isForeign` way of determining whether to hide obsolete keys,
in favor of the newer `isSharedIngested` approach, which addresses
boomerang shared files.

Fixes #3174.
  • Loading branch information
itsbilal committed Jan 3, 2024
1 parent dd884ef commit 110330e
Show file tree
Hide file tree
Showing 13 changed files with 223 additions and 101 deletions.
2 changes: 1 addition & 1 deletion data_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1172,7 +1172,7 @@ func runSSTablePropertiesCmd(t *testing.T, td *datadriven.TestData, d *DB) strin
var v sstable.VirtualReader
props := r.Properties.String()
if m != nil && m.Virtual {
v = sstable.MakeVirtualReader(r, m.VirtualMeta(), false /* isForeign */)
v = sstable.MakeVirtualReader(r, m.VirtualMeta(), false /* isShared */)
props = v.Properties.String()
}
if len(td.Input) == 0 {
Expand Down
4 changes: 4 additions & 0 deletions event_listener_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"bytes"
"fmt"
"reflect"
"runtime"
"strings"
"sync"
"testing"
Expand All @@ -25,6 +26,9 @@ import (

// Verify event listener actions, as well as expected filesystem operations.
func TestEventListener(t *testing.T) {
if runtime.GOARCH == "386" {
t.Skip("skipped on 32-bit due to slightly varied output")
}
var d *DB
var memLog base.InMemLogger
mem := vfs.NewMem()
Expand Down
4 changes: 4 additions & 0 deletions ingest_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"math"
"os"
"path/filepath"
"runtime"
"slices"
"sort"
"strconv"
Expand Down Expand Up @@ -2116,6 +2117,9 @@ func TestIngest(t *testing.T) {
var mem vfs.FS
var d *DB
var flushed bool
if runtime.GOARCH == "386" {
t.Skip("skipped on 32-bit due to slightly varied output")
}
defer func() {
require.NoError(t, d.Close())
}()
Expand Down
52 changes: 17 additions & 35 deletions internal/rangekey/coalesce.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ package rangekey

import (
"bytes"
"fmt"
"math"
"sort"

Expand Down Expand Up @@ -381,16 +380,17 @@ func coalesce(
}

// ForeignSSTTransformer implements a keyspan.Transformer for range keys in
// foreign sstables (i.e. shared sstables not created by us). It is largely
// similar to the Transform function implemented in UserIteratorConfig in that
// it calls coalesce to remove range keys shadowed by other range keys, but also
// retains the range key that does the shadowing. In addition, it elides
// RangeKey unsets/dels in L6 as they are inapplicable when reading from a
// different Pebble instance.
// shared ingested sstables. It is largely similar to the Transform function
// implemented in UserIteratorConfig in that it calls coalesce to remove range
// keys shadowed by other range keys, but also retains the range key that does
// the shadowing. In addition, it rewrites the sequence number of every emitted
// key to SeqNum; range key unsets and dels are emitted as well, not elided.
// Finally, it returns keys sorted in trailer order, not suffix order, as
// that's what the rest of the iterator stack expects.
type ForeignSSTTransformer struct {
Comparer *base.Comparer
Level int
sortBuf keyspan.KeysBySuffix
Equal base.Equal
SeqNum uint64
sortBuf keyspan.KeysBySuffix
}

// Transform implements the Transformer interface.
Expand All @@ -404,53 +404,35 @@ func (f *ForeignSSTTransformer) Transform(
Cmp: cmp,
Keys: f.sortBuf.Keys[:0],
}
if err := coalesce(f.Comparer.Equal, &f.sortBuf, math.MaxUint64, s.Keys); err != nil {
if err := coalesce(f.Equal, &f.sortBuf, math.MaxUint64, s.Keys); err != nil {
return err
}
keys := f.sortBuf.Keys
dst.Keys = dst.Keys[:0]
for i := range keys {
seqNum := keys[i].SeqNum()
switch keys[i].Kind() {
case base.InternalKeyKindRangeKeySet:
if invariants.Enabled && len(dst.Keys) > 0 && cmp(dst.Keys[len(dst.Keys)-1].Suffix, keys[i].Suffix) > 0 {
panic("pebble: keys unexpectedly not in ascending suffix order")
}
switch f.Level {
case 5:
fallthrough
case 6:
if seqNum != base.SeqNumForLevel(f.Level) {
panic(fmt.Sprintf("pebble: expected range key iter to return seqnum %d, got %d", base.SeqNumForLevel(f.Level), seqNum))
}
}
case base.InternalKeyKindRangeKeyUnset:
if invariants.Enabled && len(dst.Keys) > 0 && cmp(dst.Keys[len(dst.Keys)-1].Suffix, keys[i].Suffix) > 0 {
panic("pebble: keys unexpectedly not in ascending suffix order")
}
fallthrough
case base.InternalKeyKindRangeKeyDelete:
switch f.Level {
case 5:
// Emit this key.
if seqNum != base.SeqNumForLevel(f.Level) {
panic(fmt.Sprintf("pebble: expected range key iter to return seqnum %d, got %d", base.SeqNumForLevel(f.Level), seqNum))
}
case 6:
// Skip this key, as foreign sstable in L6 do not need to emit range key
// unsets/dels as they do not apply to any other sstables.
continue
}
// Nothing to do.
default:
return base.CorruptionErrorf("pebble: unrecognized range key kind %s", keys[i].Kind())
}
dst.Keys = append(dst.Keys, keyspan.Key{
Trailer: base.MakeTrailer(seqNum, keys[i].Kind()),
Trailer: base.MakeTrailer(f.SeqNum, keys[i].Kind()),
Suffix: keys[i].Suffix,
Value: keys[i].Value,
})
}
// coalesce results in dst.Keys being sorted by Suffix.
dst.KeysOrder = keyspan.BySuffixAsc
// coalesce results in dst.Keys being sorted by Suffix. Change it back to
// ByTrailerDesc, as that's what the iterator stack will expect.
keyspan.SortKeysByTrailer(&dst.Keys)
dst.KeysOrder = keyspan.ByTrailerDesc
return nil
}
4 changes: 4 additions & 0 deletions metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ package pebble
import (
"bytes"
"fmt"
"runtime"
"strconv"
"strings"
"testing"
Expand Down Expand Up @@ -93,6 +94,9 @@ func exampleMetrics() Metrics {
}

func TestMetrics(t *testing.T) {
if runtime.GOARCH == "386" {
t.Skip("skipped on 32-bit due to slightly varied output")
}
c := cache.New(cacheDefaultSize)
defer c.Unref()
opts := &Options{
Expand Down
38 changes: 28 additions & 10 deletions sstable/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ func (c Comparers) readerApply(r *Reader) {
}
if comparer, ok := c[r.Properties.ComparerName]; ok {
r.Compare = comparer.Compare
r.Equal = comparer.Equal
r.FormatKey = comparer.FormatKey
r.Split = comparer.Split
}
Expand Down Expand Up @@ -218,6 +219,7 @@ type Reader struct {
footerBH BlockHandle
opts ReaderOptions
Compare Compare
Equal Equal
FormatKey base.FormatKey
Split Split
tableFilter *tableFilterReader
Expand Down Expand Up @@ -380,7 +382,7 @@ func (r *Reader) newCompactionIter(
err := i.init(
context.Background(),
r, v, nil /* lower */, nil /* upper */, nil,
false /* useFilter */, v != nil && v.isForeign, /* hideObsoletePoints */
false /* useFilter */, v != nil && v.isSharedIngested, /* hideObsoletePoints */
nil /* stats */, categoryAndQoS, statsCollector, rp, bufferPool,
)
if err != nil {
Expand All @@ -395,7 +397,7 @@ func (r *Reader) newCompactionIter(
i := singleLevelIterPool.Get().(*singleLevelIterator)
err := i.init(
context.Background(), r, v, nil /* lower */, nil, /* upper */
nil, false /* useFilter */, v != nil && v.isForeign, /* hideObsoletePoints */
nil, false /* useFilter */, v != nil && v.isSharedIngested, /* hideObsoletePoints */
nil /* stats */, categoryAndQoS, statsCollector, rp, bufferPool,
)
if err != nil {
Expand Down Expand Up @@ -423,19 +425,17 @@ func (r *Reader) NewRawRangeDelIter() (keyspan.FragmentIterator, error) {
return nil, err
}
i := &fragmentBlockIter{elideSameSeqnum: true}
// It's okay for hideObsoletePoints to be false here, even for shared ingested
// sstables. This is because rangedels do not apply to points in the same
// sstable at the same sequence number anyway, so exposing obsolete rangedels
// is harmless.
if err := i.blockIter.initHandle(r.Compare, h, r.Properties.GlobalSeqNum, false); err != nil {
return nil, err
}
return i, nil
}

// NewRawRangeKeyIter returns an internal iterator for the contents of the
// range-key block for the table. Returns nil if the table does not contain any
// range keys.
//
// TODO(sumeer): plumb context.Context since this path is relevant in the user-facing
// iterator. Add WithContext methods since the existing ones are public.
func (r *Reader) NewRawRangeKeyIter() (keyspan.FragmentIterator, error) {
func (r *Reader) newRawRangeKeyIter(vState *virtualState) (keyspan.FragmentIterator, error) {
if r.rangeKeyBH.Length == 0 {
return nil, nil
}
Expand All @@ -444,12 +444,29 @@ func (r *Reader) NewRawRangeKeyIter() (keyspan.FragmentIterator, error) {
return nil, err
}
i := rangeKeyFragmentBlockIterPool.Get().(*rangeKeyFragmentBlockIter)
if err := i.blockIter.initHandle(r.Compare, h, r.Properties.GlobalSeqNum, false); err != nil {
var globalSeqNum uint64
// Don't pass a global sequence number for shared ingested sstables. The
// virtual reader needs to know the materialized sequence numbers, and will
// do the appropriate sequence number substitution.
if vState == nil || !vState.isSharedIngested {
globalSeqNum = r.Properties.GlobalSeqNum
}
if err := i.blockIter.initHandle(r.Compare, h, globalSeqNum, false /* hideObsoletePoints */); err != nil {
return nil, err
}
return i, nil
}

// NewRawRangeKeyIter returns an internal iterator for the contents of the
// range-key block for the table. Returns nil if the table does not contain any
// range keys.
//
// It delegates to newRawRangeKeyIter with a nil virtualState, so the table's
// Properties.GlobalSeqNum is the sequence number handed to the block iterator.
//
// TODO(sumeer): plumb context.Context since this path is relevant in the user-facing
// iterator. Add WithContext methods since the existing ones are public.
func (r *Reader) NewRawRangeKeyIter() (keyspan.FragmentIterator, error) {
return r.newRawRangeKeyIter(nil /* vState */)
}

// rangeKeyFragmentBlockIter embeds fragmentBlockIter as a distinct type for
// iterating a table's range-key block. Instances are obtained from
// rangeKeyFragmentBlockIterPool in newRawRangeKeyIter.
type rangeKeyFragmentBlockIter struct {
fragmentBlockIter
}
Expand Down Expand Up @@ -1178,6 +1195,7 @@ func NewReader(f objstorage.Readable, o ReaderOptions, extraOpts ...ReaderOption

if r.Properties.ComparerName == "" || o.Comparer.Name == r.Properties.ComparerName {
r.Compare = o.Comparer.Compare
r.Equal = o.Comparer.Equal
r.FormatKey = o.Comparer.FormatKey
r.Split = o.Comparer.Split
}
Expand Down
2 changes: 1 addition & 1 deletion sstable/reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -345,7 +345,7 @@ func TestVirtualReader(t *testing.T) {
vMeta.ValidateVirtual(meta.FileMetadata)

vMeta1 = vMeta.VirtualMeta()
v = MakeVirtualReader(r, vMeta1, false /* isForeign */)
v = MakeVirtualReader(r, vMeta1, false /* isSharedIngested */)
return formatVirtualReader(&v)

case "citer":
Expand Down
49 changes: 33 additions & 16 deletions sstable/reader_virtual.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/cockroachdb/pebble/internal/base"
"github.com/cockroachdb/pebble/internal/keyspan"
"github.com/cockroachdb/pebble/internal/manifest"
"github.com/cockroachdb/pebble/internal/rangekey"
)

// VirtualReader wraps Reader. Its purpose is to restrict functionality of the
Expand All @@ -28,12 +29,12 @@ type VirtualReader struct {

// Lightweight virtual sstable state which can be passed to sstable iterators.
type virtualState struct {
lower InternalKey
upper InternalKey
fileNum base.FileNum
Compare Compare
isForeign bool
prefixChange *manifest.PrefixReplacement
lower InternalKey
upper InternalKey
fileNum base.FileNum
Compare Compare
isSharedIngested bool
prefixChange *manifest.PrefixReplacement
}

func ceilDiv(a, b uint64) uint64 {
Expand All @@ -42,20 +43,18 @@ func ceilDiv(a, b uint64) uint64 {

// MakeVirtualReader is used to construct a reader which can read from virtual
// sstables.
func MakeVirtualReader(
reader *Reader, meta manifest.VirtualFileMeta, isForeign bool,
) VirtualReader {
func MakeVirtualReader(reader *Reader, meta manifest.VirtualFileMeta, isShared bool) VirtualReader {
if reader.fileNum != meta.FileBacking.DiskFileNum {
panic("pebble: invalid call to MakeVirtualReader")
}

vState := virtualState{
lower: meta.Smallest,
upper: meta.Largest,
fileNum: meta.FileNum,
Compare: reader.Compare,
isForeign: isForeign,
prefixChange: meta.PrefixReplacement,
lower: meta.Smallest,
upper: meta.Largest,
fileNum: meta.FileNum,
Compare: reader.Compare,
isSharedIngested: isShared && reader.Properties.GlobalSeqNum != 0,
prefixChange: meta.PrefixReplacement,
}
v := VirtualReader{
vState: vState,
Expand Down Expand Up @@ -167,7 +166,7 @@ func (v *VirtualReader) NewRawRangeDelIter() (keyspan.FragmentIterator, error) {

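// NewRawRangeKeyIter wraps Reader.newRawRangeKeyIter. For shared ingested
// sstables it additionally coalesces range keys and applies the global
// sequence number via a rangekey.ForeignSSTTransformer.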
func (v *VirtualReader) NewRawRangeKeyIter() (keyspan.FragmentIterator, error) {
iter, err := v.reader.NewRawRangeKeyIter()
iter, err := v.reader.newRawRangeKeyIter(&v.vState)
if err != nil {
return nil, err
}
Expand All @@ -177,6 +176,24 @@ func (v *VirtualReader) NewRawRangeKeyIter() (keyspan.FragmentIterator, error) {
lower := &v.vState.lower
upper := &v.vState.upper

if v.vState.isSharedIngested {
// We need to coalesce range keys within each sstable, and then apply the
// global sequence number. For this, we use ForeignSSTTransformer.
//
// TODO(bilal): Avoid these allocations by hoisting the transformer and
// transform iter into VirtualReader.
transform := &rangekey.ForeignSSTTransformer{
Equal: v.reader.Equal,
SeqNum: v.reader.Properties.GlobalSeqNum,
}
transformIter := &keyspan.TransformerIter{
FragmentIterator: iter,
Transformer: transform,
Compare: v.reader.Compare,
}
iter = transformIter
}

if v.vState.prefixChange != nil {
lower = &InternalKey{UserKey: v.vState.prefixChange.ReplaceArg(lower.UserKey), Trailer: lower.Trailer}
upper = &InternalKey{UserKey: v.vState.prefixChange.ReplaceArg(upper.UserKey), Trailer: upper.Trailer}
Expand Down
Loading

0 comments on commit 110330e

Please sign in to comment.