From 82dc4ef0420f814d7293b5c8a9819de90ece897a Mon Sep 17 00:00:00 2001 From: Bilal Akhtar Date: Wed, 27 Dec 2023 14:15:02 -0500 Subject: [PATCH] sstable: hide obsolete range keys for shared ingested sstables This change updates the virtual sstable range key iterator to hide obsolete range keys for shared ingested files. It also updates the sstable writer to set the obsolete key bit for range keys that sit underneath an existing range key over the same span, either because the two keys have matching suffixes or because a range key delete has already been written above them. This change also cleans up some dangling references to the older `isForeign` way of determining whether to hide obsolete keys, in favor of the newer `isSharedIngested` approach to address boomerang shared files. Fixes #3174. --- data_test.go | 2 +- internal/rangekey/rangekey.go | 6 +-- internal/rangekey/rangekey_test.go | 4 +- sstable/reader.go | 28 +++++++----- sstable/reader_test.go | 2 +- sstable/reader_virtual.go | 30 ++++++------- sstable/writer.go | 49 ++++++++++++++++----- table_cache.go | 14 +++--- testdata/ingest_shared | 69 ++++++++++++++++++++++++++++++ 9 files changed, 154 insertions(+), 50 deletions(-) diff --git a/data_test.go b/data_test.go index 20973a5a31..a757c14ee9 100644 --- a/data_test.go +++ b/data_test.go @@ -1172,7 +1172,7 @@ func runSSTablePropertiesCmd(t *testing.T, td *datadriven.TestData, d *DB) strin var v sstable.VirtualReader props := r.Properties.String() if m != nil && m.Virtual { - v = sstable.MakeVirtualReader(r, m.VirtualMeta(), false /* isForeign */) + v = sstable.MakeVirtualReader(r, m.VirtualMeta(), false /* isShared */) props = v.Properties.String() } if len(td.Input) == 0 { diff --git a/internal/rangekey/rangekey.go b/internal/rangekey/rangekey.go index 2a99834a85..f76357f793 100644 --- a/internal/rangekey/rangekey.go +++ b/internal/rangekey/rangekey.go @@ -193,7 +193,7 @@ func Decode(ik base.InternalKey, v []byte, keysDst []keyspan.Key) (keyspan.Span, case base.InternalKeyKindRangeKeyUnset: for len(v) > 0 { var suffix []byte - 
suffix, v, ok = decodeSuffix(v) + suffix, v, ok = DecodeSuffix(v) if !ok { return keyspan.Span{}, base.CorruptionErrorf("pebble: unable to decode range key unset suffix") } @@ -370,10 +370,10 @@ func EncodeUnsetValue(dst []byte, endKey []byte, suffixes [][]byte) int { return n } -// decodeSuffix decodes a single suffix from the beginning of data. If decoding +// DecodeSuffix decodes a single suffix from the beginning of data. If decoding // suffixes from a RangeKeyUnset's value, the end key must have already been // stripped from the RangeKeyUnset's value (see DecodeEndKey). -func decodeSuffix(data []byte) (suffix, rest []byte, ok bool) { +func DecodeSuffix(data []byte) (suffix, rest []byte, ok bool) { return decodeVarstring(data) } diff --git a/internal/rangekey/rangekey_test.go b/internal/rangekey/rangekey_test.go index deec970db7..e67f4b8d16 100644 --- a/internal/rangekey/rangekey_test.go +++ b/internal/rangekey/rangekey_test.go @@ -137,7 +137,7 @@ func TestUnsetSuffixes_RoundTrip(t *testing.T) { // Decode. 
var ss suffixes for len(b) > 0 { - s, rest, ok := decodeSuffix(b) + s, rest, ok := DecodeSuffix(b) require.True(t, ok) ss = append(ss, s) b = rest @@ -192,7 +192,7 @@ func TestUnsetValue_Roundtrip(t *testing.T) { for len(rest) > 0 { var ok bool var suffix []byte - suffix, rest, ok = decodeSuffix(rest) + suffix, rest, ok = DecodeSuffix(rest) require.True(t, ok) suffixes = append(suffixes, suffix) } diff --git a/sstable/reader.go b/sstable/reader.go index 6af32d2eb9..01725a1b82 100644 --- a/sstable/reader.go +++ b/sstable/reader.go @@ -380,7 +380,7 @@ func (r *Reader) newCompactionIter( err := i.init( context.Background(), r, v, nil /* lower */, nil /* upper */, nil, - false /* useFilter */, v != nil && v.isForeign, /* hideObsoletePoints */ + false /* useFilter */, v != nil && v.isSharedIngested, /* hideObsoletePoints */ nil /* stats */, categoryAndQoS, statsCollector, rp, bufferPool, ) if err != nil { @@ -395,7 +395,7 @@ func (r *Reader) newCompactionIter( i := singleLevelIterPool.Get().(*singleLevelIterator) err := i.init( context.Background(), r, v, nil /* lower */, nil, /* upper */ - nil, false /* useFilter */, v != nil && v.isForeign, /* hideObsoletePoints */ + nil, false /* useFilter */, v != nil && v.isSharedIngested, /* hideObsoletePoints */ nil /* stats */, categoryAndQoS, statsCollector, rp, bufferPool, ) if err != nil { @@ -423,19 +423,17 @@ func (r *Reader) NewRawRangeDelIter() (keyspan.FragmentIterator, error) { return nil, err } i := &fragmentBlockIter{elideSameSeqnum: true} + // It's okay for hideObsoletePoints to be false here, even for shared ingested + // sstables. This is because rangedels do not apply to points in the same + // sstable at the same sequence number anyway, so exposing obsolete rangedels + // is harmless. 
if err := i.blockIter.initHandle(r.Compare, h, r.Properties.GlobalSeqNum, false); err != nil { return nil, err } return i, nil } -// NewRawRangeKeyIter returns an internal iterator for the contents of the -// range-key block for the table. Returns nil if the table does not contain any -// range keys. -// -// TODO(sumeer): plumb context.Context since this path is relevant in the user-facing -// iterator. Add WithContext methods since the existing ones are public. -func (r *Reader) NewRawRangeKeyIter() (keyspan.FragmentIterator, error) { +func (r *Reader) newRawRangeKeyIter(vState *virtualState) (keyspan.FragmentIterator, error) { if r.rangeKeyBH.Length == 0 { return nil, nil } @@ -444,12 +442,22 @@ func (r *Reader) NewRawRangeKeyIter() (keyspan.FragmentIterator, error) { return nil, err } i := rangeKeyFragmentBlockIterPool.Get().(*rangeKeyFragmentBlockIter) - if err := i.blockIter.initHandle(r.Compare, h, r.Properties.GlobalSeqNum, false); err != nil { + if err := i.blockIter.initHandle(r.Compare, h, r.Properties.GlobalSeqNum, vState != nil && vState.isSharedIngested); err != nil { return nil, err } return i, nil } +// NewRawRangeKeyIter returns an internal iterator for the contents of the +// range-key block for the table. Returns nil if the table does not contain any +// range keys. +// +// TODO(sumeer): plumb context.Context since this path is relevant in the user-facing +// iterator. Add WithContext methods since the existing ones are public. 
+func (r *Reader) NewRawRangeKeyIter() (keyspan.FragmentIterator, error) { + return r.newRawRangeKeyIter(nil /* vState */) +} + type rangeKeyFragmentBlockIter struct { fragmentBlockIter } diff --git a/sstable/reader_test.go b/sstable/reader_test.go index d2fb693c1a..b714b16779 100644 --- a/sstable/reader_test.go +++ b/sstable/reader_test.go @@ -345,7 +345,7 @@ func TestVirtualReader(t *testing.T) { vMeta.ValidateVirtual(meta.FileMetadata) vMeta1 = vMeta.VirtualMeta() - v = MakeVirtualReader(r, vMeta1, false /* isForeign */) + v = MakeVirtualReader(r, vMeta1, false /* isSharedIngested */) return formatVirtualReader(&v) case "citer": diff --git a/sstable/reader_virtual.go b/sstable/reader_virtual.go index fb74cdf23e..79be5f39ad 100644 --- a/sstable/reader_virtual.go +++ b/sstable/reader_virtual.go @@ -28,12 +28,12 @@ type VirtualReader struct { // Lightweight virtual sstable state which can be passed to sstable iterators. type virtualState struct { - lower InternalKey - upper InternalKey - fileNum base.FileNum - Compare Compare - isForeign bool - prefixChange *manifest.PrefixReplacement + lower InternalKey + upper InternalKey + fileNum base.FileNum + Compare Compare + isSharedIngested bool + prefixChange *manifest.PrefixReplacement } func ceilDiv(a, b uint64) uint64 { @@ -42,20 +42,18 @@ func ceilDiv(a, b uint64) uint64 { // MakeVirtualReader is used to contruct a reader which can read from virtual // sstables. 
-func MakeVirtualReader( - reader *Reader, meta manifest.VirtualFileMeta, isForeign bool, -) VirtualReader { +func MakeVirtualReader(reader *Reader, meta manifest.VirtualFileMeta, isShared bool) VirtualReader { if reader.fileNum != meta.FileBacking.DiskFileNum { panic("pebble: invalid call to MakeVirtualReader") } vState := virtualState{ - lower: meta.Smallest, - upper: meta.Largest, - fileNum: meta.FileNum, - Compare: reader.Compare, - isForeign: isForeign, - prefixChange: meta.PrefixReplacement, + lower: meta.Smallest, + upper: meta.Largest, + fileNum: meta.FileNum, + Compare: reader.Compare, + isSharedIngested: isShared && reader.Properties.GlobalSeqNum != 0, + prefixChange: meta.PrefixReplacement, } v := VirtualReader{ vState: vState, @@ -167,7 +165,7 @@ func (v *VirtualReader) NewRawRangeDelIter() (keyspan.FragmentIterator, error) { // NewRawRangeKeyIter wraps Reader.NewRawRangeKeyIter. func (v *VirtualReader) NewRawRangeKeyIter() (keyspan.FragmentIterator, error) { - iter, err := v.reader.NewRawRangeKeyIter() + iter, err := v.reader.newRawRangeKeyIter(&v.vState) if err != nil { return nil, err } diff --git a/sstable/writer.go b/sstable/writer.go index 883ee7cecd..3becb39009 100644 --- a/sstable/writer.go +++ b/sstable/writer.go @@ -137,10 +137,10 @@ type Writer struct { cache *cache.Cache restartInterval int checksumType ChecksumType - // disableKeyOrderChecks disables the checks that keys are added to an + // testingDisableKeyOrderChecks disables the checks that keys are added to an // sstable in order. It is intended for internal use only in the construction // of invalid sstables for testing. See tool/make_test_sstables.go. - disableKeyOrderChecks bool + testingDisableKeyOrderChecks bool // With two level indexes, the index/filter of a SST file is partitioned into // smaller blocks with an additional top-level index on them. When reading an // index/filter, only the top-level index is loaded into memory. 
The two level @@ -204,6 +204,7 @@ type Writer struct { // Information (other than the byte slice) about the last point key, to // avoid extracting it again. lastPointKeyInfo pointKeyInfo + lastSpanDeleted bool // For value blocks. shortAttributeExtractor base.ShortAttributeExtractor @@ -783,7 +784,7 @@ func (w *Writer) makeAddPointDecisionV2(key InternalKey) error { if w.dataBlockBuf.dataBlock.nEntries == 0 { return nil } - if !w.disableKeyOrderChecks { + if !w.testingDisableKeyOrderChecks { prevPointUserKey := w.dataBlockBuf.dataBlock.getCurUserKey() cmpUser := w.compare(prevPointUserKey, key.UserKey) if cmpUser > 0 || (cmpUser == 0 && prevTrailer <= key.Trailer) { @@ -901,7 +902,7 @@ func (w *Writer) makeAddPointDecisionV3( // version (those should be ok). We have to ensure setHasSamePrefix is // correctly initialized here etc. - if !w.disableKeyOrderChecks && + if !w.testingDisableKeyOrderChecks && (cmpUser > 0 || (cmpUser == 0 && prevPointKeyInfo.trailer <= key.Trailer)) { return false, false, false, errors.Errorf( "pebble: keys must be added in strictly increasing order: %s, %s", @@ -1064,7 +1065,7 @@ func (w *Writer) prettyTombstone(k InternalKey, value []byte) fmt.Formatter { } func (w *Writer) addTombstone(key InternalKey, value []byte) error { - if !w.disableKeyOrderChecks && !w.rangeDelV1Format && w.rangeDelBlock.nEntries > 0 { + if !w.testingDisableKeyOrderChecks && !w.rangeDelV1Format && w.rangeDelBlock.nEntries > 0 { // Check that tombstones are being added in fragmented order. If the two // tombstones overlap, their start and end keys must be identical. prevKey := w.rangeDelBlock.getCurKey() @@ -1261,10 +1262,13 @@ func (w *Writer) encodeRangeKeySpan(span keyspan.Span) { w.err = firstError(w.err, w.rangeKeyEncoder.Encode(&w.rangeKeySpan)) } +// NB: Just like AddRangeKey(), this can only be called with fragmented range +// keys. 
func (w *Writer) addRangeKey(key InternalKey, value []byte) error { - if !w.disableKeyOrderChecks && w.rangeKeyBlock.nEntries > 0 { + var isObsolete bool + if !w.testingDisableKeyOrderChecks && w.rangeKeyBlock.nEntries > 0 { prevStartKey := w.rangeKeyBlock.getCurKey() - prevEndKey, _, ok := rangekey.DecodeEndKey(prevStartKey.Kind(), w.rangeKeyBlock.curValue) + prevEndKey, prevValue, ok := rangekey.DecodeEndKey(prevStartKey.Kind(), w.rangeKeyBlock.curValue) if !ok { // We panic here as we should have previously decoded and validated this // key and value when it was first added to the range key block. @@ -1273,7 +1277,7 @@ func (w *Writer) addRangeKey(key InternalKey, value []byte) error { } curStartKey := key - curEndKey, _, ok := rangekey.DecodeEndKey(curStartKey.Kind(), value) + curEndKey, curValue, ok := rangekey.DecodeEndKey(curStartKey.Kind(), value) if !ok { w.err = errors.Errorf("pebble: invalid end key for span: %s", curStartKey.Pretty(w.formatKey)) @@ -1297,6 +1301,26 @@ func (w *Writer) addRangeKey(key InternalKey, value []byte) error { curStartKey.Pretty(w.formatKey)) return w.err } + // There are two cases in which the current internal key is obsolete. + // Either we've already written a RangeKeyDelete for this span (w.lastSpanDeleted) + // or the current key's suffix matches that of the previous key. 
+ isObsoleteC2 := false + if prevStartKey.Kind() != base.InternalKeyKindRangeKeyDelete && key.Kind() != base.InternalKeyKindRangeKeyDelete { + prevSuffix, _, ok := rangekey.DecodeSuffix(prevValue) + if !ok { + w.err = errors.Errorf("pebble: unexpected range key value: %q", + prevValue) + return w.err + } + curSuffix, _, ok := rangekey.DecodeSuffix(curValue) + if !ok { + w.err = errors.Errorf("pebble: unexpected range key value: %q", + curValue) + return w.err + } + isObsoleteC2 = bytes.Equal(prevSuffix, curSuffix) + } + isObsolete = w.lastSpanDeleted || isObsoleteC2 } else if w.compare(prevEndKey, curStartKey.UserKey) > 0 { // If the start user keys are NOT equal, the spans must be disjoint (i.e. // no overlap). @@ -1307,6 +1331,9 @@ func (w *Writer) addRangeKey(key InternalKey, value []byte) error { prevStartKey.Pretty(w.formatKey), curStartKey.Pretty(w.formatKey)) return w.err + } else { + // The start key has changed. Reset lastSpanDeleted. + w.lastSpanDeleted = false } } @@ -1347,7 +1374,9 @@ func (w *Writer) addRangeKey(key InternalKey, value []byte) error { } // Add the key to the block. - w.rangeKeyBlock.add(key, value) + w.rangeKeyBlock.addWithOptionalValuePrefix( + key, isObsolete, value, len(key.UserKey), false, 0, false) + w.lastSpanDeleted = w.lastSpanDeleted || key.Kind() == base.InternalKeyKindRangeKeyDelete return nil } @@ -2362,7 +2391,7 @@ func internalGetProperties(w *Writer) *Properties { func init() { private.SSTableWriterDisableKeyOrderChecks = func(i interface{}) { w := i.(*Writer) - w.disableKeyOrderChecks = true + w.testingDisableKeyOrderChecks = true } private.SSTableInternalProperties = internalGetProperties } diff --git a/table_cache.go b/table_cache.go index 75cf9b2919..b190942485 100644 --- a/table_cache.go +++ b/table_cache.go @@ -201,17 +201,17 @@ func (c *tableCacheContainer) estimateSize( return size, nil } -// createCommonReader creates a Reader for this file. 
isForeign, if true for +// createCommonReader creates a Reader for this file. isShared, if true for // virtual sstables, is passed into the vSSTable reader so its iterators can // collapse obsolete points accordingly. func createCommonReader( - v *tableCacheValue, file *fileMetadata, isForeign bool, + v *tableCacheValue, file *fileMetadata, isShared bool, ) sstable.CommonReader { // TODO(bananabrick): We suffer an allocation if file is a virtual sstable. var cr sstable.CommonReader = v.reader if file.Virtual { virtualReader := sstable.MakeVirtualReader( - v.reader, file.VirtualMeta(), isForeign, + v.reader, file.VirtualMeta(), isShared, ) cr = &virtualReader } @@ -232,7 +232,7 @@ func (c *tableCacheContainer) withCommonReader( if err != nil { return err } - return fn(createCommonReader(v, meta, provider.IsSharedForeign(objMeta))) + return fn(createCommonReader(v, meta, objMeta.IsShared())) } func (c *tableCacheContainer) withReader(meta physicalMeta, fn func(*sstable.Reader) error) error { @@ -260,7 +260,7 @@ func (c *tableCacheContainer) withVirtualReader( if err != nil { return err } - return fn(sstable.MakeVirtualReader(v.reader, meta, provider.IsSharedForeign(objMeta))) + return fn(sstable.MakeVirtualReader(v.reader, meta, objMeta.IsShared())) } func (c *tableCacheContainer) iterCount() int64 { @@ -491,7 +491,7 @@ func (c *tableCacheShard) newIters( } // Note: This suffers an allocation for virtual sstables. - cr := createCommonReader(v, file, provider.IsSharedForeign(objMeta)) + cr := createCommonReader(v, file, objMeta.IsShared()) // NB: range-del iterator does not maintain a reference to the table, nor // does it need to read from it after creation. 
@@ -627,7 +627,7 @@ func (c *tableCacheShard) newRangeKeyIter( objMeta, err = provider.Lookup(fileTypeTable, file.FileBacking.DiskFileNum) if err == nil { virtualReader := sstable.MakeVirtualReader( - v.reader, file.VirtualMeta(), provider.IsSharedForeign(objMeta), + v.reader, file.VirtualMeta(), objMeta.IsShared(), ) iter, err = virtualReader.NewRawRangeKeyIter() } diff --git a/testdata/ingest_shared b/testdata/ingest_shared index ca7c7dd9e7..e2ad640fdf 100644 --- a/testdata/ingest_shared +++ b/testdata/ingest_shared @@ -1123,3 +1123,72 @@ next b: (bar, .) c: (baz, .) . + +# Regression test for #3174. Obsolete range keys in shared ingested SSTs should +# not be surfaced. + +reset +---- + +switch 1 +---- +ok + +batch +range-key-set c h @5 foo +---- + +file-only-snapshot s9 + a z +---- +ok + +batch +range-key-del d f +---- + +iter snapshot=s9 +first +next +next +---- +c: (., [c-h) @5=foo UPDATED) +. +. + +compact a-z +---- +ok + +iter +first +next +next +---- +c: (., [c-d) @5=foo UPDATED) +f: (., [f-h) @5=foo UPDATED) +. + +lsm +---- +6: + 000005:[c#10,RANGEKEYSET-h#inf,RANGEKEYSET] + +replicate 1 2 a f +---- +replicated 1 shared SSTs + +switch 2 +---- +ok + +iter +first +next +next +next +---- +c: (., [c-d) @5=foo UPDATED) +. +. +.