Skip to content

Commit

Permalink
sstable: hide obsolete range keys for shared ingested sstables
Browse files Browse the repository at this point in the history
This change updates the virtual sstable range key iterator to
hide obsolete range keys for shared ingested files. It also
updates the sstable writer to set the obsolete key bit on a
range key that sits underneath another range key for the same
span, when the two keys have matching suffixes or a
RangeKeyDelete has already been written for that span.

This change also cleans up some dangling references to the older
`isForeign` way of determining whether to hide obsolete keys,
replacing them with the newer isSharedIngested approach, which
addresses boomerang shared files.

Fixes cockroachdb#3174.
  • Loading branch information
itsbilal committed Dec 28, 2023
1 parent 1cce3d0 commit 82dc4ef
Show file tree
Hide file tree
Showing 9 changed files with 154 additions and 50 deletions.
2 changes: 1 addition & 1 deletion data_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1172,7 +1172,7 @@ func runSSTablePropertiesCmd(t *testing.T, td *datadriven.TestData, d *DB) strin
var v sstable.VirtualReader
props := r.Properties.String()
if m != nil && m.Virtual {
v = sstable.MakeVirtualReader(r, m.VirtualMeta(), false /* isForeign */)
v = sstable.MakeVirtualReader(r, m.VirtualMeta(), false /* isShared */)
props = v.Properties.String()
}
if len(td.Input) == 0 {
Expand Down
6 changes: 3 additions & 3 deletions internal/rangekey/rangekey.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ func Decode(ik base.InternalKey, v []byte, keysDst []keyspan.Key) (keyspan.Span,
case base.InternalKeyKindRangeKeyUnset:
for len(v) > 0 {
var suffix []byte
suffix, v, ok = decodeSuffix(v)
suffix, v, ok = DecodeSuffix(v)
if !ok {
return keyspan.Span{}, base.CorruptionErrorf("pebble: unable to decode range key unset suffix")
}
Expand Down Expand Up @@ -370,10 +370,10 @@ func EncodeUnsetValue(dst []byte, endKey []byte, suffixes [][]byte) int {
return n
}

// decodeSuffix decodes a single suffix from the beginning of data. If decoding
// DecodeSuffix decodes a single suffix from the beginning of data. If decoding
// suffixes from a RangeKeyUnset's value, the end key must have already been
// stripped from the RangeKeyUnset's value (see DecodeEndKey).
func decodeSuffix(data []byte) (suffix, rest []byte, ok bool) {
func DecodeSuffix(data []byte) (suffix, rest []byte, ok bool) {
return decodeVarstring(data)
}

Expand Down
4 changes: 2 additions & 2 deletions internal/rangekey/rangekey_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ func TestUnsetSuffixes_RoundTrip(t *testing.T) {
// Decode.
var ss suffixes
for len(b) > 0 {
s, rest, ok := decodeSuffix(b)
s, rest, ok := DecodeSuffix(b)
require.True(t, ok)
ss = append(ss, s)
b = rest
Expand Down Expand Up @@ -192,7 +192,7 @@ func TestUnsetValue_Roundtrip(t *testing.T) {
for len(rest) > 0 {
var ok bool
var suffix []byte
suffix, rest, ok = decodeSuffix(rest)
suffix, rest, ok = DecodeSuffix(rest)
require.True(t, ok)
suffixes = append(suffixes, suffix)
}
Expand Down
28 changes: 18 additions & 10 deletions sstable/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -380,7 +380,7 @@ func (r *Reader) newCompactionIter(
err := i.init(
context.Background(),
r, v, nil /* lower */, nil /* upper */, nil,
false /* useFilter */, v != nil && v.isForeign, /* hideObsoletePoints */
false /* useFilter */, v != nil && v.isSharedIngested, /* hideObsoletePoints */
nil /* stats */, categoryAndQoS, statsCollector, rp, bufferPool,
)
if err != nil {
Expand All @@ -395,7 +395,7 @@ func (r *Reader) newCompactionIter(
i := singleLevelIterPool.Get().(*singleLevelIterator)
err := i.init(
context.Background(), r, v, nil /* lower */, nil, /* upper */
nil, false /* useFilter */, v != nil && v.isForeign, /* hideObsoletePoints */
nil, false /* useFilter */, v != nil && v.isSharedIngested, /* hideObsoletePoints */
nil /* stats */, categoryAndQoS, statsCollector, rp, bufferPool,
)
if err != nil {
Expand Down Expand Up @@ -423,19 +423,17 @@ func (r *Reader) NewRawRangeDelIter() (keyspan.FragmentIterator, error) {
return nil, err
}
i := &fragmentBlockIter{elideSameSeqnum: true}
// It's okay for hideObsoletePoints to be false here, even for shared ingested
// sstables. This is because rangedels do not apply to points in the same
// sstable at the same sequence number anyway, so exposing obsolete rangedels
// is harmless.
if err := i.blockIter.initHandle(r.Compare, h, r.Properties.GlobalSeqNum, false); err != nil {
return nil, err
}
return i, nil
}

// NewRawRangeKeyIter returns an internal iterator for the contents of the
// range-key block for the table. Returns nil if the table does not contain any
// range keys.
//
// TODO(sumeer): plumb context.Context since this path is relevant in the user-facing
// iterator. Add WithContext methods since the existing ones are public.
func (r *Reader) NewRawRangeKeyIter() (keyspan.FragmentIterator, error) {
func (r *Reader) newRawRangeKeyIter(vState *virtualState) (keyspan.FragmentIterator, error) {
if r.rangeKeyBH.Length == 0 {
return nil, nil
}
Expand All @@ -444,12 +442,22 @@ func (r *Reader) NewRawRangeKeyIter() (keyspan.FragmentIterator, error) {
return nil, err
}
i := rangeKeyFragmentBlockIterPool.Get().(*rangeKeyFragmentBlockIter)
if err := i.blockIter.initHandle(r.Compare, h, r.Properties.GlobalSeqNum, false); err != nil {
if err := i.blockIter.initHandle(r.Compare, h, r.Properties.GlobalSeqNum, vState != nil && vState.isSharedIngested); err != nil {
return nil, err
}
return i, nil
}

// NewRawRangeKeyIter returns an internal iterator over the table's range-key
// block. The returned iterator is nil when the table does not contain any
// range keys. No virtual sstable state is passed to newRawRangeKeyIter.
//
// TODO(sumeer): plumb context.Context since this path is relevant in the user-facing
// iterator. Add WithContext methods since the existing ones are public.
func (r *Reader) NewRawRangeKeyIter() (keyspan.FragmentIterator, error) {
	// A physical (non-virtual) read has no virtual state to supply.
	var vState *virtualState
	return r.newRawRangeKeyIter(vState)
}

// rangeKeyFragmentBlockIter embeds fragmentBlockIter. Instances are obtained
// from rangeKeyFragmentBlockIterPool when a raw range-key iterator is created
// for a table's range-key block.
type rangeKeyFragmentBlockIter struct {
fragmentBlockIter
}
Expand Down
2 changes: 1 addition & 1 deletion sstable/reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -345,7 +345,7 @@ func TestVirtualReader(t *testing.T) {
vMeta.ValidateVirtual(meta.FileMetadata)

vMeta1 = vMeta.VirtualMeta()
v = MakeVirtualReader(r, vMeta1, false /* isForeign */)
v = MakeVirtualReader(r, vMeta1, false /* isSharedIngested */)
return formatVirtualReader(&v)

case "citer":
Expand Down
30 changes: 14 additions & 16 deletions sstable/reader_virtual.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,12 @@ type VirtualReader struct {

// Lightweight virtual sstable state which can be passed to sstable iterators.
type virtualState struct {
lower InternalKey
upper InternalKey
fileNum base.FileNum
Compare Compare
isForeign bool
prefixChange *manifest.PrefixReplacement
lower InternalKey
upper InternalKey
fileNum base.FileNum
Compare Compare
isSharedIngested bool
prefixChange *manifest.PrefixReplacement
}

func ceilDiv(a, b uint64) uint64 {
Expand All @@ -42,20 +42,18 @@ func ceilDiv(a, b uint64) uint64 {

// MakeVirtualReader is used to construct a reader which can read from virtual
// sstables.
func MakeVirtualReader(
reader *Reader, meta manifest.VirtualFileMeta, isForeign bool,
) VirtualReader {
func MakeVirtualReader(reader *Reader, meta manifest.VirtualFileMeta, isShared bool) VirtualReader {
if reader.fileNum != meta.FileBacking.DiskFileNum {
panic("pebble: invalid call to MakeVirtualReader")
}

vState := virtualState{
lower: meta.Smallest,
upper: meta.Largest,
fileNum: meta.FileNum,
Compare: reader.Compare,
isForeign: isForeign,
prefixChange: meta.PrefixReplacement,
lower: meta.Smallest,
upper: meta.Largest,
fileNum: meta.FileNum,
Compare: reader.Compare,
isSharedIngested: isShared && reader.Properties.GlobalSeqNum != 0,
prefixChange: meta.PrefixReplacement,
}
v := VirtualReader{
vState: vState,
Expand Down Expand Up @@ -167,7 +165,7 @@ func (v *VirtualReader) NewRawRangeDelIter() (keyspan.FragmentIterator, error) {

// NewRawRangeKeyIter wraps Reader.NewRawRangeKeyIter.
func (v *VirtualReader) NewRawRangeKeyIter() (keyspan.FragmentIterator, error) {
iter, err := v.reader.NewRawRangeKeyIter()
iter, err := v.reader.newRawRangeKeyIter(&v.vState)
if err != nil {
return nil, err
}
Expand Down
49 changes: 39 additions & 10 deletions sstable/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,10 +137,10 @@ type Writer struct {
cache *cache.Cache
restartInterval int
checksumType ChecksumType
// disableKeyOrderChecks disables the checks that keys are added to an
// testingDisableKeyOrderChecks disables the checks that keys are added to an
// sstable in order. It is intended for internal use only in the construction
// of invalid sstables for testing. See tool/make_test_sstables.go.
disableKeyOrderChecks bool
testingDisableKeyOrderChecks bool
// With two level indexes, the index/filter of a SST file is partitioned into
// smaller blocks with an additional top-level index on them. When reading an
// index/filter, only the top-level index is loaded into memory. The two level
Expand Down Expand Up @@ -204,6 +204,7 @@ type Writer struct {
// Information (other than the byte slice) about the last point key, to
// avoid extracting it again.
lastPointKeyInfo pointKeyInfo
lastSpanDeleted bool

// For value blocks.
shortAttributeExtractor base.ShortAttributeExtractor
Expand Down Expand Up @@ -783,7 +784,7 @@ func (w *Writer) makeAddPointDecisionV2(key InternalKey) error {
if w.dataBlockBuf.dataBlock.nEntries == 0 {
return nil
}
if !w.disableKeyOrderChecks {
if !w.testingDisableKeyOrderChecks {
prevPointUserKey := w.dataBlockBuf.dataBlock.getCurUserKey()
cmpUser := w.compare(prevPointUserKey, key.UserKey)
if cmpUser > 0 || (cmpUser == 0 && prevTrailer <= key.Trailer) {
Expand Down Expand Up @@ -901,7 +902,7 @@ func (w *Writer) makeAddPointDecisionV3(
// version (those should be ok). We have to ensure setHasSamePrefix is
// correctly initialized here etc.

if !w.disableKeyOrderChecks &&
if !w.testingDisableKeyOrderChecks &&
(cmpUser > 0 || (cmpUser == 0 && prevPointKeyInfo.trailer <= key.Trailer)) {
return false, false, false, errors.Errorf(
"pebble: keys must be added in strictly increasing order: %s, %s",
Expand Down Expand Up @@ -1064,7 +1065,7 @@ func (w *Writer) prettyTombstone(k InternalKey, value []byte) fmt.Formatter {
}

func (w *Writer) addTombstone(key InternalKey, value []byte) error {
if !w.disableKeyOrderChecks && !w.rangeDelV1Format && w.rangeDelBlock.nEntries > 0 {
if !w.testingDisableKeyOrderChecks && !w.rangeDelV1Format && w.rangeDelBlock.nEntries > 0 {
// Check that tombstones are being added in fragmented order. If the two
// tombstones overlap, their start and end keys must be identical.
prevKey := w.rangeDelBlock.getCurKey()
Expand Down Expand Up @@ -1261,10 +1262,13 @@ func (w *Writer) encodeRangeKeySpan(span keyspan.Span) {
w.err = firstError(w.err, w.rangeKeyEncoder.Encode(&w.rangeKeySpan))
}

// NB: Just like AddRangeKey(), this can only be called with fragmented range
// keys.
func (w *Writer) addRangeKey(key InternalKey, value []byte) error {
if !w.disableKeyOrderChecks && w.rangeKeyBlock.nEntries > 0 {
var isObsolete bool
if !w.testingDisableKeyOrderChecks && w.rangeKeyBlock.nEntries > 0 {
prevStartKey := w.rangeKeyBlock.getCurKey()
prevEndKey, _, ok := rangekey.DecodeEndKey(prevStartKey.Kind(), w.rangeKeyBlock.curValue)
prevEndKey, prevValue, ok := rangekey.DecodeEndKey(prevStartKey.Kind(), w.rangeKeyBlock.curValue)
if !ok {
// We panic here as we should have previously decoded and validated this
// key and value when it was first added to the range key block.
Expand All @@ -1273,7 +1277,7 @@ func (w *Writer) addRangeKey(key InternalKey, value []byte) error {
}

curStartKey := key
curEndKey, _, ok := rangekey.DecodeEndKey(curStartKey.Kind(), value)
curEndKey, curValue, ok := rangekey.DecodeEndKey(curStartKey.Kind(), value)
if !ok {
w.err = errors.Errorf("pebble: invalid end key for span: %s",
curStartKey.Pretty(w.formatKey))
Expand All @@ -1297,6 +1301,26 @@ func (w *Writer) addRangeKey(key InternalKey, value []byte) error {
curStartKey.Pretty(w.formatKey))
return w.err
}
// There are two cases in which the current internal key is obsolete.
// Either we've already written a RangeKeyDelete for this span (w.lastSpanDeleted)
// or the first suffix of the current key matches the first suffix of the
// previous key. The suffix comparison is only performed when neither key
// is a RangeKeyDelete.
isObsoleteC2 := false
if prevStartKey.Kind() != base.InternalKeyKindRangeKeyDelete && key.Kind() != base.InternalKeyKindRangeKeyDelete {
prevSuffix, _, ok := rangekey.DecodeSuffix(prevValue)
if !ok {
w.err = errors.Errorf("pebble: unexpected range key value: %q",
prevValue)
return w.err
}
curSuffix, _, ok := rangekey.DecodeSuffix(curValue)
if !ok {
w.err = errors.Errorf("pebble: unexpected range key value: %q",
curValue)
return w.err
}
isObsoleteC2 = bytes.Equal(prevSuffix, curSuffix)
}
isObsolete = w.lastSpanDeleted || isObsoleteC2
} else if w.compare(prevEndKey, curStartKey.UserKey) > 0 {
// If the start user keys are NOT equal, the spans must be disjoint (i.e.
// no overlap).
Expand All @@ -1307,6 +1331,9 @@ func (w *Writer) addRangeKey(key InternalKey, value []byte) error {
prevStartKey.Pretty(w.formatKey),
curStartKey.Pretty(w.formatKey))
return w.err
} else {
// The start key has changed. Reset lastSpanDeleted.
w.lastSpanDeleted = false
}
}

Expand Down Expand Up @@ -1347,7 +1374,9 @@ func (w *Writer) addRangeKey(key InternalKey, value []byte) error {
}

// Add the key to the block.
w.rangeKeyBlock.add(key, value)
w.rangeKeyBlock.addWithOptionalValuePrefix(
key, isObsolete, value, len(key.UserKey), false, 0, false)
w.lastSpanDeleted = w.lastSpanDeleted || key.Kind() == base.InternalKeyKindRangeKeyDelete
return nil
}

Expand Down Expand Up @@ -2362,7 +2391,7 @@ func internalGetProperties(w *Writer) *Properties {
func init() {
private.SSTableWriterDisableKeyOrderChecks = func(i interface{}) {
w := i.(*Writer)
w.disableKeyOrderChecks = true
w.testingDisableKeyOrderChecks = true
}
private.SSTableInternalProperties = internalGetProperties
}
Expand Down
14 changes: 7 additions & 7 deletions table_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,17 +201,17 @@ func (c *tableCacheContainer) estimateSize(
return size, nil
}

// createCommonReader creates a Reader for this file. isForeign, if true for
// createCommonReader creates a Reader for this file. isShared, if true for
// virtual sstables, is passed into the vSSTable reader so its iterators can
// collapse obsolete points accordingly.
func createCommonReader(
v *tableCacheValue, file *fileMetadata, isForeign bool,
v *tableCacheValue, file *fileMetadata, isShared bool,
) sstable.CommonReader {
// TODO(bananabrick): We suffer an allocation if file is a virtual sstable.
var cr sstable.CommonReader = v.reader
if file.Virtual {
virtualReader := sstable.MakeVirtualReader(
v.reader, file.VirtualMeta(), isForeign,
v.reader, file.VirtualMeta(), isShared,
)
cr = &virtualReader
}
Expand All @@ -232,7 +232,7 @@ func (c *tableCacheContainer) withCommonReader(
if err != nil {
return err
}
return fn(createCommonReader(v, meta, provider.IsSharedForeign(objMeta)))
return fn(createCommonReader(v, meta, objMeta.IsShared()))
}

func (c *tableCacheContainer) withReader(meta physicalMeta, fn func(*sstable.Reader) error) error {
Expand Down Expand Up @@ -260,7 +260,7 @@ func (c *tableCacheContainer) withVirtualReader(
if err != nil {
return err
}
return fn(sstable.MakeVirtualReader(v.reader, meta, provider.IsSharedForeign(objMeta)))
return fn(sstable.MakeVirtualReader(v.reader, meta, objMeta.IsShared()))
}

func (c *tableCacheContainer) iterCount() int64 {
Expand Down Expand Up @@ -491,7 +491,7 @@ func (c *tableCacheShard) newIters(
}

// Note: This suffers an allocation for virtual sstables.
cr := createCommonReader(v, file, provider.IsSharedForeign(objMeta))
cr := createCommonReader(v, file, objMeta.IsShared())

// NB: range-del iterator does not maintain a reference to the table, nor
// does it need to read from it after creation.
Expand Down Expand Up @@ -627,7 +627,7 @@ func (c *tableCacheShard) newRangeKeyIter(
objMeta, err = provider.Lookup(fileTypeTable, file.FileBacking.DiskFileNum)
if err == nil {
virtualReader := sstable.MakeVirtualReader(
v.reader, file.VirtualMeta(), provider.IsSharedForeign(objMeta),
v.reader, file.VirtualMeta(), objMeta.IsShared(),
)
iter, err = virtualReader.NewRawRangeKeyIter()
}
Expand Down
Loading

0 comments on commit 82dc4ef

Please sign in to comment.