Skip to content

Commit

Permalink
sstable: hide obsolete range keys for shared ingested sstables
Browse files Browse the repository at this point in the history
This change updates the virtual sstable range key iterator to
hide obsolete range keys for shared ingested files. It does
this by reusing the ForeignSSTTransformer that was removed
in cockroachdb#2782.

This change also cleans up some dangling references to the older
`isForeign` way of determining whether to hide obsolete keys,
in lieu of the newer isSharedIngested approach to address
boomerang shared files.

Fixes cockroachdb#3174.
  • Loading branch information
itsbilal committed Jan 3, 2024
1 parent d25fed7 commit ab534cb
Show file tree
Hide file tree
Showing 15 changed files with 225 additions and 106 deletions.
2 changes: 1 addition & 1 deletion data_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1172,7 +1172,7 @@ func runSSTablePropertiesCmd(t *testing.T, td *datadriven.TestData, d *DB) strin
var v sstable.VirtualReader
props := r.Properties.String()
if m != nil && m.Virtual {
v = sstable.MakeVirtualReader(r, m.VirtualMeta(), false /* isForeign */)
v = sstable.MakeVirtualReader(r, m.VirtualMeta(), false /* isShared */)
props = v.Properties.String()
}
if len(td.Input) == 0 {
Expand Down
4 changes: 4 additions & 0 deletions event_listener_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"bytes"
"fmt"
"reflect"
"runtime"
"strings"
"sync"
"testing"
Expand All @@ -25,6 +26,9 @@ import (

// Verify event listener actions, as well as expected filesystem operations.
func TestEventListener(t *testing.T) {
if runtime.GOARCH == "386" {
t.Skip("skipped on 32-bit due to slightly varied output")
}
var d *DB
var memLog base.InMemLogger
mem := vfs.NewMem()
Expand Down
4 changes: 4 additions & 0 deletions ingest_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"math"
"os"
"path/filepath"
"runtime"
"slices"
"sort"
"strconv"
Expand Down Expand Up @@ -2116,6 +2117,9 @@ func TestIngest(t *testing.T) {
var mem vfs.FS
var d *DB
var flushed bool
if runtime.GOARCH == "386" {
t.Skip("skipped on 32-bit due to slightly varied output")
}
defer func() {
require.NoError(t, d.Close())
}()
Expand Down
52 changes: 17 additions & 35 deletions internal/rangekey/coalesce.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ package rangekey

import (
"bytes"
"fmt"
"math"
"sort"

Expand Down Expand Up @@ -381,16 +380,17 @@ func coalesce(
}

// ForeignSSTTransformer implements a keyspan.Transformer for range keys in
// foreign sstables (i.e. shared sstables not created by us). It is largely
// similar to the Transform function implemented in UserIteratorConfig in that
// it calls coalesce to remove range keys shadowed by other range keys, but also
// retains the range key that does the shadowing. In addition, it elides
// RangeKey unsets/dels in L6 as they are inapplicable when reading from a
// different Pebble instance.
// shared ingested sstables. It is largely similar to the Transform function
// implemented in UserIteratorConfig in that it calls coalesce to remove range
// keys shadowed by other range keys, but also retains the range key that does
// the shadowing. In addition, it elides RangeKey unsets/dels in L6 as they are
// inapplicable when reading from a different Pebble instance. Finally, it
// returns keys sorted in trailer order, not suffix order, as that's what the
// rest of the iterator stack expects.
type ForeignSSTTransformer struct {
Comparer *base.Comparer
Level int
sortBuf keyspan.KeysBySuffix
Equal base.Equal
SeqNum uint64
sortBuf keyspan.KeysBySuffix
}

// Transform implements the Transformer interface.
Expand All @@ -404,53 +404,35 @@ func (f *ForeignSSTTransformer) Transform(
Cmp: cmp,
Keys: f.sortBuf.Keys[:0],
}
if err := coalesce(f.Comparer.Equal, &f.sortBuf, math.MaxUint64, s.Keys); err != nil {
if err := coalesce(f.Equal, &f.sortBuf, math.MaxUint64, s.Keys); err != nil {
return err
}
keys := f.sortBuf.Keys
dst.Keys = dst.Keys[:0]
for i := range keys {
seqNum := keys[i].SeqNum()
switch keys[i].Kind() {
case base.InternalKeyKindRangeKeySet:
if invariants.Enabled && len(dst.Keys) > 0 && cmp(dst.Keys[len(dst.Keys)-1].Suffix, keys[i].Suffix) > 0 {
panic("pebble: keys unexpectedly not in ascending suffix order")
}
switch f.Level {
case 5:
fallthrough
case 6:
if seqNum != base.SeqNumForLevel(f.Level) {
panic(fmt.Sprintf("pebble: expected range key iter to return seqnum %d, got %d", base.SeqNumForLevel(f.Level), seqNum))
}
}
case base.InternalKeyKindRangeKeyUnset:
if invariants.Enabled && len(dst.Keys) > 0 && cmp(dst.Keys[len(dst.Keys)-1].Suffix, keys[i].Suffix) > 0 {
panic("pebble: keys unexpectedly not in ascending suffix order")
}
fallthrough
case base.InternalKeyKindRangeKeyDelete:
switch f.Level {
case 5:
// Emit this key.
if seqNum != base.SeqNumForLevel(f.Level) {
panic(fmt.Sprintf("pebble: expected range key iter to return seqnum %d, got %d", base.SeqNumForLevel(f.Level), seqNum))
}
case 6:
// Skip this key, as foreign sstable in L6 do not need to emit range key
// unsets/dels as they do not apply to any other sstables.
continue
}
// Nothing to do.
default:
return base.CorruptionErrorf("pebble: unrecognized range key kind %s", keys[i].Kind())
}
dst.Keys = append(dst.Keys, keyspan.Key{
Trailer: base.MakeTrailer(seqNum, keys[i].Kind()),
Trailer: base.MakeTrailer(f.SeqNum, keys[i].Kind()),
Suffix: keys[i].Suffix,
Value: keys[i].Value,
})
}
// coalesce results in dst.Keys being sorted by Suffix.
dst.KeysOrder = keyspan.BySuffixAsc
// coalesce results in dst.Keys being sorted by Suffix. Change it back to
// ByTrailerDesc, as that's what the iterator stack will expect.
keyspan.SortKeysByTrailer(&dst.Keys)
dst.KeysOrder = keyspan.ByTrailerDesc
return nil
}
6 changes: 3 additions & 3 deletions internal/rangekey/rangekey.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ func Decode(ik base.InternalKey, v []byte, keysDst []keyspan.Key) (keyspan.Span,
case base.InternalKeyKindRangeKeyUnset:
for len(v) > 0 {
var suffix []byte
suffix, v, ok = decodeSuffix(v)
suffix, v, ok = DecodeSuffix(v)
if !ok {
return keyspan.Span{}, base.CorruptionErrorf("pebble: unable to decode range key unset suffix")
}
Expand Down Expand Up @@ -370,10 +370,10 @@ func EncodeUnsetValue(dst []byte, endKey []byte, suffixes [][]byte) int {
return n
}

// decodeSuffix decodes a single suffix from the beginning of data. If decoding
// DecodeSuffix decodes a single suffix from the beginning of data. If decoding
// suffixes from a RangeKeyUnset's value, the end key must have already been
// stripped from the RangeKeyUnset's value (see DecodeEndKey).
func decodeSuffix(data []byte) (suffix, rest []byte, ok bool) {
func DecodeSuffix(data []byte) (suffix, rest []byte, ok bool) {
return decodeVarstring(data)
}

Expand Down
4 changes: 2 additions & 2 deletions internal/rangekey/rangekey_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ func TestUnsetSuffixes_RoundTrip(t *testing.T) {
// Decode.
var ss suffixes
for len(b) > 0 {
s, rest, ok := decodeSuffix(b)
s, rest, ok := DecodeSuffix(b)
require.True(t, ok)
ss = append(ss, s)
b = rest
Expand Down Expand Up @@ -192,7 +192,7 @@ func TestUnsetValue_Roundtrip(t *testing.T) {
for len(rest) > 0 {
var ok bool
var suffix []byte
suffix, rest, ok = decodeSuffix(rest)
suffix, rest, ok = DecodeSuffix(rest)
require.True(t, ok)
suffixes = append(suffixes, suffix)
}
Expand Down
4 changes: 4 additions & 0 deletions metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ package pebble
import (
"bytes"
"fmt"
"runtime"
"strconv"
"strings"
"testing"
Expand Down Expand Up @@ -93,6 +94,9 @@ func exampleMetrics() Metrics {
}

func TestMetrics(t *testing.T) {
if runtime.GOARCH == "386" {
t.Skip("skipped on 32-bit due to slightly varied output")
}
c := cache.New(cacheDefaultSize)
defer c.Unref()
opts := &Options{
Expand Down
38 changes: 28 additions & 10 deletions sstable/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ func (c Comparers) readerApply(r *Reader) {
}
if comparer, ok := c[r.Properties.ComparerName]; ok {
r.Compare = comparer.Compare
r.Equal = comparer.Equal
r.FormatKey = comparer.FormatKey
r.Split = comparer.Split
}
Expand Down Expand Up @@ -218,6 +219,7 @@ type Reader struct {
footerBH BlockHandle
opts ReaderOptions
Compare Compare
Equal Equal
FormatKey base.FormatKey
Split Split
tableFilter *tableFilterReader
Expand Down Expand Up @@ -380,7 +382,7 @@ func (r *Reader) newCompactionIter(
err := i.init(
context.Background(),
r, v, nil /* lower */, nil /* upper */, nil,
false /* useFilter */, v != nil && v.isForeign, /* hideObsoletePoints */
false /* useFilter */, v != nil && v.isSharedIngested, /* hideObsoletePoints */
nil /* stats */, categoryAndQoS, statsCollector, rp, bufferPool,
)
if err != nil {
Expand All @@ -395,7 +397,7 @@ func (r *Reader) newCompactionIter(
i := singleLevelIterPool.Get().(*singleLevelIterator)
err := i.init(
context.Background(), r, v, nil /* lower */, nil, /* upper */
nil, false /* useFilter */, v != nil && v.isForeign, /* hideObsoletePoints */
nil, false /* useFilter */, v != nil && v.isSharedIngested, /* hideObsoletePoints */
nil /* stats */, categoryAndQoS, statsCollector, rp, bufferPool,
)
if err != nil {
Expand Down Expand Up @@ -423,19 +425,17 @@ func (r *Reader) NewRawRangeDelIter() (keyspan.FragmentIterator, error) {
return nil, err
}
i := &fragmentBlockIter{elideSameSeqnum: true}
// It's okay for hideObsoletePoints to be false here, even for shared ingested
// sstables. This is because rangedels do not apply to points in the same
// sstable at the same sequence number anyway, so exposing obsolete rangedels
// is harmless.
if err := i.blockIter.initHandle(r.Compare, h, r.Properties.GlobalSeqNum, false); err != nil {
return nil, err
}
return i, nil
}

// NewRawRangeKeyIter returns an internal iterator for the contents of the
// range-key block for the table. Returns nil if the table does not contain any
// range keys.
//
// TODO(sumeer): plumb context.Context since this path is relevant in the user-facing
// iterator. Add WithContext methods since the existing ones are public.
func (r *Reader) NewRawRangeKeyIter() (keyspan.FragmentIterator, error) {
func (r *Reader) newRawRangeKeyIter(vState *virtualState) (keyspan.FragmentIterator, error) {
if r.rangeKeyBH.Length == 0 {
return nil, nil
}
Expand All @@ -444,12 +444,29 @@ func (r *Reader) NewRawRangeKeyIter() (keyspan.FragmentIterator, error) {
return nil, err
}
i := rangeKeyFragmentBlockIterPool.Get().(*rangeKeyFragmentBlockIter)
if err := i.blockIter.initHandle(r.Compare, h, r.Properties.GlobalSeqNum, false); err != nil {
var globalSeqNum uint64
// Don't pass a global sequence number for shared ingested sstables. The
// virtual reader needs to know the materialized sequence numbers, and will
// do the appropriate sequence number substitution.
if vState == nil || !vState.isSharedIngested {
globalSeqNum = r.Properties.GlobalSeqNum
}
if err := i.blockIter.initHandle(r.Compare, h, globalSeqNum, false /* hideObsoletePoints */); err != nil {
return nil, err
}
return i, nil
}

// NewRawRangeKeyIter returns an internal iterator for the contents of the
// range-key block for the table. Returns nil if the table does not contain any
// range keys.
//
// TODO(sumeer): plumb context.Context since this path is relevant in the user-facing
// iterator. Add WithContext methods since the existing ones are public.
func (r *Reader) NewRawRangeKeyIter() (keyspan.FragmentIterator, error) {
return r.newRawRangeKeyIter(nil /* vState */)
}

type rangeKeyFragmentBlockIter struct {
fragmentBlockIter
}
Expand Down Expand Up @@ -1178,6 +1195,7 @@ func NewReader(f objstorage.Readable, o ReaderOptions, extraOpts ...ReaderOption

if r.Properties.ComparerName == "" || o.Comparer.Name == r.Properties.ComparerName {
r.Compare = o.Comparer.Compare
r.Equal = o.Comparer.Equal
r.FormatKey = o.Comparer.FormatKey
r.Split = o.Comparer.Split
}
Expand Down
2 changes: 1 addition & 1 deletion sstable/reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -345,7 +345,7 @@ func TestVirtualReader(t *testing.T) {
vMeta.ValidateVirtual(meta.FileMetadata)

vMeta1 = vMeta.VirtualMeta()
v = MakeVirtualReader(r, vMeta1, false /* isForeign */)
v = MakeVirtualReader(r, vMeta1, false /* isSharedIngested */)
return formatVirtualReader(&v)

case "citer":
Expand Down
46 changes: 30 additions & 16 deletions sstable/reader_virtual.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/cockroachdb/pebble/internal/base"
"github.com/cockroachdb/pebble/internal/keyspan"
"github.com/cockroachdb/pebble/internal/manifest"
"github.com/cockroachdb/pebble/internal/rangekey"
)

// VirtualReader wraps Reader. Its purpose is to restrict functionality of the
Expand All @@ -28,12 +29,12 @@ type VirtualReader struct {

// Lightweight virtual sstable state which can be passed to sstable iterators.
type virtualState struct {
lower InternalKey
upper InternalKey
fileNum base.FileNum
Compare Compare
isForeign bool
prefixChange *manifest.PrefixReplacement
lower InternalKey
upper InternalKey
fileNum base.FileNum
Compare Compare
isSharedIngested bool
prefixChange *manifest.PrefixReplacement
}

func ceilDiv(a, b uint64) uint64 {
Expand All @@ -42,20 +43,18 @@ func ceilDiv(a, b uint64) uint64 {

// MakeVirtualReader is used to contruct a reader which can read from virtual
// sstables.
func MakeVirtualReader(
reader *Reader, meta manifest.VirtualFileMeta, isForeign bool,
) VirtualReader {
func MakeVirtualReader(reader *Reader, meta manifest.VirtualFileMeta, isShared bool) VirtualReader {
if reader.fileNum != meta.FileBacking.DiskFileNum {
panic("pebble: invalid call to MakeVirtualReader")
}

vState := virtualState{
lower: meta.Smallest,
upper: meta.Largest,
fileNum: meta.FileNum,
Compare: reader.Compare,
isForeign: isForeign,
prefixChange: meta.PrefixReplacement,
lower: meta.Smallest,
upper: meta.Largest,
fileNum: meta.FileNum,
Compare: reader.Compare,
isSharedIngested: isShared && reader.Properties.GlobalSeqNum != 0,
prefixChange: meta.PrefixReplacement,
}
v := VirtualReader{
vState: vState,
Expand Down Expand Up @@ -167,7 +166,7 @@ func (v *VirtualReader) NewRawRangeDelIter() (keyspan.FragmentIterator, error) {

// NewRawRangeKeyIter wraps Reader.NewRawRangeKeyIter.
func (v *VirtualReader) NewRawRangeKeyIter() (keyspan.FragmentIterator, error) {
iter, err := v.reader.NewRawRangeKeyIter()
iter, err := v.reader.newRawRangeKeyIter(&v.vState)
if err != nil {
return nil, err
}
Expand All @@ -177,6 +176,21 @@ func (v *VirtualReader) NewRawRangeKeyIter() (keyspan.FragmentIterator, error) {
lower := &v.vState.lower
upper := &v.vState.upper

if v.vState.isSharedIngested {
// We need to coalesce range keys within each sstable, and then apply the
// global sequence number. For this, we use ForeignSSTTransformer.
transform := &rangekey.ForeignSSTTransformer{
Equal: v.reader.Equal,
SeqNum: v.reader.Properties.GlobalSeqNum,
}
transformIter := &keyspan.TransformerIter{
FragmentIterator: iter,
Transformer: transform,
Compare: v.reader.Compare,
}
iter = transformIter
}

if v.vState.prefixChange != nil {
lower = &InternalKey{UserKey: v.vState.prefixChange.ReplaceArg(lower.UserKey), Trailer: lower.Trailer}
upper = &InternalKey{UserKey: v.vState.prefixChange.ReplaceArg(upper.UserKey), Trailer: upper.Trailer}
Expand Down
Loading

0 comments on commit ab534cb

Please sign in to comment.