From e05894178560698423538c054fe7249495de488f Mon Sep 17 00:00:00 2001 From: Jackson Owens Date: Tue, 22 Feb 2022 18:01:24 -0500 Subject: [PATCH] db: add external sstable merging iterator Add a pebble.NewExternalIter function that may be used to construct a *pebble.Iterator that reads from a provided slice of sstables rather than committed database state. Input sstables are required to contain all zero-sequence number keys. Shadowing of keys is resolved by treating the files as ordered in reverse chronological order. This iterator is intended to replace the storage package's multiIterator. --- data_test.go | 36 +++++- external_iterator.go | 171 +++++++++++++++++++++++++ external_iterator_test.go | 66 ++++++++++ internal/rangekey/defragment.go | 5 + internal/rangekey/interleaving_iter.go | 4 +- internal/rangekey/iter.go | 13 +- iterator.go | 5 + sstable/block.go | 160 ++++++++++++++++++----- sstable/block_test.go | 12 +- sstable/reader.go | 37 +++--- testdata/external_iterator | 112 ++++++++++++++++ 11 files changed, 563 insertions(+), 58 deletions(-) create mode 100644 external_iterator.go create mode 100644 external_iterator_test.go create mode 100644 testdata/external_iterator diff --git a/data_test.go b/data_test.go index 99778e65b7..eeb6f2c4d0 100644 --- a/data_test.go +++ b/data_test.go @@ -17,6 +17,7 @@ import ( "github.com/cockroachdb/errors" "github.com/cockroachdb/pebble/internal/base" "github.com/cockroachdb/pebble/internal/datadriven" + "github.com/cockroachdb/pebble/internal/rangekey" "github.com/cockroachdb/pebble/internal/testkeys" "github.com/cockroachdb/pebble/sstable" "github.com/cockroachdb/pebble/vfs" @@ -413,11 +414,13 @@ func runBuildCmd(td *datadriven.TestData, d *DB, fs vfs.FS) error { } path := td.CmdArgs[0].String() + writeOpts := d.opts.MakeWriterOptions(0 /* level */, d.opts.FormatMajorVersion.MaxTableFormat()) + f, err := fs.Create(path) if err != nil { return err } - w := sstable.NewWriter(f, sstable.WriterOptions{}) + w := sstable.NewWriter(f, writeOpts) iters := []internalIterator{ b.newInternalIter(nil), b.newRangeDelIter(nil), @@ -437,6 +440,37 @@ func runBuildCmd(td *datadriven.TestData, d *DB, fs vfs.FS) error { return err } } + + if rki := b.newRangeKeyIter(nil); rki != nil { + for key, _ := rki.First(); key != nil; key, _ = rki.Next() { + s := rki.Current() + s.Start.SetSeqNum(0) + + var err error + switch s.Start.Kind() { + case base.InternalKeyKindRangeKeySet: + suffixValue, rest, ok := rangekey.DecodeSuffixValue(s.Value) + if !ok || len(rest) > 0 { + panic("expected single unset single suffix") + } + err = w.RangeKeySet(s.Start.UserKey, s.End, suffixValue.Suffix, suffixValue.Value) + case base.InternalKeyKindRangeKeyUnset: + suffix, rest, ok := rangekey.DecodeSuffix(s.Value) + if !ok || len(rest) > 0 { + panic("expected single unset single suffix") + } + err = w.RangeKeyUnset(s.Start.UserKey, s.End, suffix) + case base.InternalKeyKindRangeKeyDelete: + err = w.RangeKeyDelete(s.Start.UserKey, s.End) + default: + panic("not a range key") + } + if err != nil { + return err + } + } + } + return w.Close() } diff --git a/external_iterator.go b/external_iterator.go new file mode 100644 index 0000000000..e302e0c1e1 --- /dev/null +++ b/external_iterator.go @@ -0,0 +1,171 @@ +// Copyright 2022 The LevelDB-Go and Pebble Authors. All rights reserved. Use +// of this source code is governed by a BSD-style license that can be found in +// the LICENSE file. + +package pebble + +import ( + "github.com/cockroachdb/pebble/internal/base" + "github.com/cockroachdb/pebble/internal/keyspan" + "github.com/cockroachdb/pebble/internal/manifest" + "github.com/cockroachdb/pebble/internal/rangekey" + "github.com/cockroachdb/pebble/sstable" +) + +// NewExternalIter takes an input set of sstable files which may overlap +// arbitrarily and returns an Iterator over the merged contents of the sstables. +// Input sstables may contain point keys, range keys, range deletions, etc. The +// input files slice must be sorted in reverse chronological ordering. A key in +// a file at a lower index will shadow a key with an identical user key +// contained within a file at a higher index. +// +// Input sstables must only contain keys with the zero sequence number. +func NewExternalIter( + o *Options, + iterOpts *IterOptions, + files []sstable.ReadableFile, + extraReaderOpts ...sstable.ReaderOption, +) (it *Iterator, err error) { + var readers []*sstable.Reader + + // Ensure we close all the opened readers if we error out. + closeReaders := func() { + for i := range readers { + _ = readers[i].Close() + } + } + defer func() { + if err != nil { + closeReaders() + } + }() + readers, err = openExternalTables(o, files, o.MakeReaderOptions(), extraReaderOpts...) + if err != nil { + return nil, err + } + + buf := iterAllocPool.Get().(*iterAlloc) + dbi := &buf.dbi + *dbi = Iterator{ + alloc: buf, + cmp: o.Comparer.Compare, + equal: o.equal(), + iter: &buf.merging, + merge: o.Merger.Merge, + split: o.Comparer.Split, + readState: nil, + keyBuf: buf.keyBuf, + prefixOrFullSeekKey: buf.prefixOrFullSeekKey, + batch: nil, + newIters: func(f *manifest.FileMetadata, opts *IterOptions, bytesIterated *uint64) (internalIterator, keyspan.FragmentIterator, error) { + // NB: External iterators are currently constructed without any + // `levelIters`. newIters should never be called. When we support + // organizing multiple non-overlapping files into a single level + // (see TODO below), we'll need to adjust this tableNewIters + // implementation to open iterators by looking up f in a map + // of readers indexed by *fileMetadata. + panic("unreachable") + }, + seqNum: base.InternalKeySeqNumMax, + } + if iterOpts != nil { + dbi.opts = *iterOpts + } + dbi.opts.logger = o.Logger + + // TODO(jackson): In some instances we could generate fewer levels by using + // L0Sublevels code to organize nonoverlapping files into the same level. + // This would allow us to use levelIters and keep a smaller set of data and + // files in-memory. However, it would also require us to identify the bounds + // of all the files upfront. + + // Ensure we close all iters if error out early. + mlevels := buf.mlevels[:0] + var rangeKeyIters []keyspan.FragmentIterator + defer func() { + if err != nil { + for i := range rangeKeyIters { + _ = rangeKeyIters[i].Close() + } + for i := range mlevels { + if mlevels[i].iter != nil { + _ = mlevels[i].iter.Close() + } + if mlevels[i].rangeDelIter != nil { + _ = mlevels[i].rangeDelIter.Close() + } + } + } + }() + if iterOpts.pointKeys() { + if len(files) > cap(mlevels) { + mlevels = make([]mergingIterLevel, 0, len(files)) + } + for _, r := range readers { + pointIter, err := r.NewIter(dbi.opts.LowerBound, dbi.opts.UpperBound) + if err != nil { + return nil, err + } + rangeDelIter, err := r.NewRawRangeDelIter() + if err != nil { + _ = pointIter.Close() + return nil, err + } + mlevels = append(mlevels, mergingIterLevel{ + iter: pointIter, + rangeDelIter: rangeDelIter, + }) + } + } + buf.merging.init(&dbi.opts, dbi.cmp, dbi.split, mlevels...) + buf.merging.snapshot = base.InternalKeySeqNumMax + buf.merging.elideRangeTombstones = true + + if dbi.opts.rangeKeys() { + for _, r := range readers { + rki, err := r.NewRawRangeKeyIter() + if err != nil { + return nil, err + } + if rki != nil { + rangeKeyIters = append(rangeKeyIters, rki) + } + } + + // TODO(jackson): Pool range-key iterator objects. + dbi.rangeKey = &iteratorRangeKeyState{} + fragmentedIter := &rangekey.Iter{} + fragmentedIter.Init(o.Comparer.Compare, o.Comparer.FormatKey, base.InternalKeySeqNumMax, rangeKeyIters...) + iter := &rangekey.DefragmentingIter{} + iter.Init(o.Comparer.Compare, fragmentedIter, rangekey.DefragmentLogical) + dbi.rangeKey.rangeKeyIter = iter + + dbi.rangeKey.iter.Init(dbi.cmp, dbi.split, &buf.merging, iter, dbi.opts.RangeKeyMasking.Suffix) + dbi.iter = &dbi.rangeKey.iter + dbi.iter.SetBounds(dbi.opts.LowerBound, dbi.opts.UpperBound) + } + + // Close all the opened sstable.Readers when the Iterator is closed. + dbi.closeHook = closeReaders + return dbi, nil +} + +func openExternalTables( + o *Options, + files []sstable.ReadableFile, + readerOpts sstable.ReaderOptions, + extraReaderOpts ...sstable.ReaderOption, +) (readers []*sstable.Reader, err error) { + readers = make([]*sstable.Reader, 0, len(files)) + for i := range files { + r, err := sstable.NewReader(files[i], readerOpts, extraReaderOpts...) + if err != nil { + return readers, err + } + // Use the index of the file in files as the sequence number for all of + // its keys. + r.Properties.GlobalSeqNum = uint64(len(files) - i) + readers = append(readers, r) + } + return readers, err +} diff --git a/external_iterator_test.go b/external_iterator_test.go new file mode 100644 index 0000000000..134a4de4d8 --- /dev/null +++ b/external_iterator_test.go @@ -0,0 +1,66 @@ +// Copyright 2022 The LevelDB-Go and Pebble Authors. All rights reserved. Use +// of this source code is governed by a BSD-style license that can be found in +// the LICENSE file. + +package pebble + +import ( + "fmt" + "testing" + + "github.com/cockroachdb/pebble/internal/datadriven" + "github.com/cockroachdb/pebble/internal/testkeys" + "github.com/cockroachdb/pebble/sstable" + "github.com/cockroachdb/pebble/vfs" + "github.com/stretchr/testify/require" +) + +func TestExternalIterator(t *testing.T) { + mem := vfs.NewMem() + o := &Options{ + FS: mem, + Comparer: testkeys.Comparer, + FormatMajorVersion: FormatRangeKeys, + } + o.EnsureDefaults() + d, err := Open("", o) + require.NoError(t, err) + defer func() { require.NoError(t, d.Close()) }() + + datadriven.RunTest(t, "testdata/external_iterator", func(td *datadriven.TestData) string { + switch td.Cmd { + case "reset": + mem = vfs.NewMem() + return "" + case "build": + if err := runBuildCmd(td, d, mem); err != nil { + return err.Error() + } + return "" + case "iter": + opts := IterOptions{KeyTypes: IterKeyTypePointsAndRanges} + var files []sstable.ReadableFile + for _, arg := range td.CmdArgs { + switch arg.Key { + case "mask-suffix": + opts.RangeKeyMasking.Suffix = []byte(arg.Vals[0]) + case "lower": + opts.LowerBound = []byte(arg.Vals[0]) + case "upper": + opts.UpperBound = []byte(arg.Vals[0]) + case "files": + for _, v := range arg.Vals { + f, err := mem.Open(v) + require.NoError(t, err) + files = append(files, f) + } + } + } + it, err := NewExternalIter(o, &opts, files) + require.NoError(t, err) + return runIterCmd(td, it, true /* close iter */) + default: + return fmt.Sprintf("unknown command: %s", td.Cmd) + } + }) +} diff --git a/internal/rangekey/defragment.go b/internal/rangekey/defragment.go index 51b96fd9f4..ef7a9ce247 100644 --- a/internal/rangekey/defragment.go +++ b/internal/rangekey/defragment.go @@ -168,6 +168,11 @@ func (i *DefragmentingIter) Error() error { return i.iter.Error() } +// Close closes the underlying iterators. +func (i *DefragmentingIter) Close() error { + return i.iter.Close() +} + // Current returns the span at the iterator's current position. func (i *DefragmentingIter) Current() *CoalescedSpan { return &i.curr diff --git a/internal/rangekey/interleaving_iter.go b/internal/rangekey/interleaving_iter.go index 6bfd4b02b7..8fbf34868c 100644 --- a/internal/rangekey/interleaving_iter.go +++ b/internal/rangekey/interleaving_iter.go @@ -734,7 +734,9 @@ func (i *InterleavingIter) Error() error { // Close implements (base.InternalIterator).Close. func (i *InterleavingIter) Close() error { - return i.pointIter.Close() + perr := i.pointIter.Close() + rerr := i.rangeKeyIter.Close() + return firstError(perr, rerr) } // String implements (base.InternalIterator).String. diff --git a/internal/rangekey/iter.go b/internal/rangekey/iter.go index 5ec6d14f6c..0b4de60194 100644 --- a/internal/rangekey/iter.go +++ b/internal/rangekey/iter.go @@ -22,6 +22,7 @@ type Iterator interface { Prev() *CoalescedSpan Current() *CoalescedSpan Clone() Iterator + Close() error } // TODO(jackson): Consider modifying the interface to support returning 'empty' @@ -69,7 +70,12 @@ type Iter struct { var _ Iterator = (*Iter)(nil) // Init initializes an iterator over a set of fragmented, coalesced spans. -func (i *Iter) Init(cmp base.Compare, formatKey base.FormatKey, visibleSeqNum uint64, iters ...keyspan.FragmentIterator) { +func (i *Iter) Init( + cmp base.Compare, + formatKey base.FormatKey, + visibleSeqNum uint64, + iters ...keyspan.FragmentIterator, +) { *i = Iter{} i.miter.Init(cmp, iters...) i.coalescer.Init(cmp, formatKey, visibleSeqNum, func(span CoalescedSpan) { @@ -96,6 +102,11 @@ func (i *Iter) Error() error { return i.err } +// Close closes all underlying iterators. +func (i *Iter) Close() error { + return i.miter.Close() +} + func (i *Iter) coalesceForward() *CoalescedSpan { i.dir = +1 i.valid = false diff --git a/iterator.go b/iterator.go index 280d9237b9..168ee20efa 100644 --- a/iterator.go +++ b/iterator.go @@ -163,6 +163,7 @@ type Iterator struct { prefixOrFullSeekKey []byte readSampling readSampling stats IteratorStats + closeHook func() // Following fields are only used in Clone. // Non-nil if this Iterator includes a Batch. @@ -1509,6 +1510,10 @@ func (i *Iterator) Close() error { i.readState = nil } + if i.closeHook != nil { + i.closeHook() + } + // Close the closer for the current value if one was open. if i.valueCloser != nil { err = firstError(err, i.valueCloser.Close()) diff --git a/sstable/block.go b/sstable/block.go index 88adb377c0..a3697eae9c 100644 --- a/sstable/block.go +++ b/sstable/block.go @@ -6,13 +6,12 @@ package sstable import ( "encoding/binary" - "fmt" "unsafe" "github.com/cockroachdb/pebble/internal/base" "github.com/cockroachdb/pebble/internal/cache" - "github.com/cockroachdb/pebble/internal/invariants" "github.com/cockroachdb/pebble/internal/keyspan" + "github.com/cockroachdb/pebble/internal/rangekey" ) func uvarintLen(v uint32) int { @@ -616,7 +615,7 @@ func (i *blockIter) SeekGE(key []byte, trySeekUsingNext bool) (*InternalKey, []b i.decodeInternalKey(i.key) // Iterate from that restart point to somewhere >= the key sought. - for ; i.Valid(); i.Next() { + for ; i.valid(); i.Next() { if base.InternalCompare(i.cmp, i.ikey, ikey) >= 0 { return &i.ikey, i.val } @@ -765,7 +764,7 @@ func (i *blockIter) SeekLT(key []byte) (*InternalKey, []byte) { i.cacheEntry() } - if !i.Valid() { + if !i.valid() { return nil, nil } return &i.ikey, i.val @@ -775,7 +774,7 @@ func (i *blockIter) SeekLT(key []byte) (*InternalKey, []byte) { // package. func (i *blockIter) First() (*InternalKey, []byte) { i.offset = 0 - if !i.Valid() { + if !i.valid() { return nil, nil } i.clearCache() @@ -788,7 +787,7 @@ func (i *blockIter) First() (*InternalKey, []byte) { func (i *blockIter) Last() (*InternalKey, []byte) { // Seek forward from the last restart point. i.offset = int32(binary.LittleEndian.Uint32(i.data[i.restarts+4*(i.numRestarts-1):])) - if !i.Valid() { + if !i.valid() { return nil, nil } @@ -824,7 +823,7 @@ func (i *blockIter) Next() (*InternalKey, []byte) { } i.offset = i.nextOffset - if !i.Valid() { + if !i.valid() { return nil, nil } i.readEntry() @@ -944,47 +943,146 @@ func (i *blockIter) SetBounds(lower, upper []byte) { panic("pebble: SetBounds unimplemented") } -// Implement additional methods required to satisfy the keyspan.FragmentIterator -// interface. This implementation is intended to be used with range deletion -// tombstones only, because range deletions have no associated value and store -// the end key bound directly within the internal value. -// -// This blockIter implementation of keyspan.FragmentIterator should not be used -// with range keys. +func (i *blockIter) valid() bool { + return i.offset >= 0 && i.offset < i.restarts +} -// TODO(jackson): Is there a more robust way to prevent accidental misuse of -// this FragmentIterator implementation while avoiding extra indirection in the -// case of range deletions? +// fragmentBlockIter wraps a blockIter, implementing the +// keyspan.FragmentIterator interface. +type fragmentBlockIter struct { + blockIter blockIter + span keyspan.Span + err error +} -var _ keyspan.FragmentIterator = (*blockIter)(nil) +func (i *fragmentBlockIter) decodeSpan( + k *InternalKey, internalValue []byte, +) (*InternalKey, []byte) { + if k == nil { + i.span = keyspan.Span{} + i.err = nil + return nil, nil + } + + // decode the end key from a fragment's raw internal value. RANGEDELs store + // the end key directly as the value, whereas range keys require decoding to + // separate the end key from the rest of the range key state. + var endKey, decodedValue []byte + switch kind := k.Kind(); kind { + case base.InternalKeyKindRangeDelete: + endKey = internalValue + case base.InternalKeyKindRangeKeySet, base.InternalKeyKindRangeKeyUnset, base.InternalKeyKindRangeKeyDelete: + var ok bool + endKey, decodedValue, ok = rangekey.DecodeEndKey(kind, internalValue) + if !ok { + i.span = keyspan.Span{} + i.err = base.CorruptionErrorf("pebble: corrupt keyspan fragment of kind %d", kind) + return nil, nil + } + default: + i.span = keyspan.Span{} + i.err = base.CorruptionErrorf("pebble: corrupt keyspan fragment of kind %d", kind) + return nil, nil + } + i.err = nil + i.span = keyspan.Span{Start: *k, End: endKey, Value: decodedValue} + + // We have to return the internalValue, not the decodedValue, + // because there are still range-deletion usages that depend on it. + // TODO(jackson): Remove remaining dependencies on the internal + // value being propagated via positioning methods and refactor the + // FragmentIterator to be independent of base.InternalIterator. + return k, internalValue +} + +// Error implements (base.InternalIterator).Error, as documented in the +// internal/base package. +func (i *fragmentBlockIter) Error() error { + return i.err +} // Valid implements (keyspan.FragmentIterator).Valid, as documented in the // internal/keyspan package. -func (i *blockIter) Valid() bool { - return i.offset >= 0 && i.offset < i.restarts +func (i *fragmentBlockIter) Valid() bool { + return i.err == nil && i.blockIter.valid() } // End implements (keyspan.FragmentIterator).End, as documented in the // internal/keyspan package. -func (i *blockIter) End() []byte { - return i.val +func (i *fragmentBlockIter) End() []byte { + return i.span.End } // Current implements (keyspan.FragmentIterator).Current, as documented in the // internal/keyspan package. -func (i *blockIter) Current() keyspan.Span { - if !i.Valid() { - return keyspan.Span{} - } - if invariants.Enabled && i.ikey.Kind() != InternalKeyKindRangeDelete { - panic(fmt.Sprintf("pebble: blockIter's fragment iterator implementation used on non-RANGEDELs (kind %d)", i.ikey.Kind())) - } - return keyspan.Span{Start: i.ikey, End: i.val, Value: nil} +func (i *fragmentBlockIter) Current() keyspan.Span { + return i.span } // Clone implements (keyspan.FragmentIterator).Clone, as documented in the // internal/keyspan package. -func (i *blockIter) Clone() keyspan.FragmentIterator { +func (i *fragmentBlockIter) Clone() keyspan.FragmentIterator { // TODO(jackson): Remove keyspan.FragmentIterator.Clone. panic("unimplemented") } + +// Close implements (base.InternalIterator).Close, as documented in the +// internal/base package. +func (i *fragmentBlockIter) Close() error { + return i.blockIter.Close() +} + +// First implements (base.InternalIterator).First, as documented in the +// internal/base package. +func (i *fragmentBlockIter) First() (*InternalKey, []byte) { + return i.decodeSpan(i.blockIter.First()) +} + +// Last implements (base.InternalIterator).Last, as documented in the +// internal/base package. +func (i *fragmentBlockIter) Last() (*InternalKey, []byte) { + return i.decodeSpan(i.blockIter.Last()) +} + +// Next implements (base.InternalIterator).Next, as documented in the +// internal/base package. +func (i *fragmentBlockIter) Next() (*InternalKey, []byte) { + return i.decodeSpan(i.blockIter.Next()) +} + +// Prev implements (base.InternalIterator).Prev, as documented in the +// internal/base package. +func (i *fragmentBlockIter) Prev() (*InternalKey, []byte) { + return i.decodeSpan(i.blockIter.Prev()) +} + +// SeekGE implements (base.InternalIterator).SeekGE, as documented in the +// internal/base package. +func (i *fragmentBlockIter) SeekGE(k []byte, trySeekUsingNext bool) (*InternalKey, []byte) { + return i.decodeSpan(i.blockIter.SeekGE(k, trySeekUsingNext)) +} + +// SeekPrefixGE implements (base.InternalIterator).SeekPrefixGE, as +// documented in the internal/base package. +func (i *fragmentBlockIter) SeekPrefixGE( + prefix, k []byte, trySeekUsingNext bool, +) (*InternalKey, []byte) { + return i.decodeSpan(i.blockIter.SeekPrefixGE(prefix, k, trySeekUsingNext)) +} + +// SeekLT implements (base.InternalIterator).SeekLT, as documented in the +// internal/base package. +func (i *fragmentBlockIter) SeekLT(k []byte) (*InternalKey, []byte) { + return i.decodeSpan(i.blockIter.SeekLT(k)) +} + +// SetBounds implements (base.InternalIterator).SetBounds, as documented +// in the internal/base package. +func (i *fragmentBlockIter) SetBounds(lower, upper []byte) { + i.blockIter.SetBounds(lower, upper) +} + +// String implements fmt.Stringer. +func (i *fragmentBlockIter) String() string { + return "fragment-block-iter" +} diff --git a/sstable/block_test.go b/sstable/block_test.go index 743c051516..aa5333bb8e 100644 --- a/sstable/block_test.go +++ b/sstable/block_test.go @@ -198,7 +198,7 @@ func TestBlockIter2(t *testing.T) { case "prev": iter.Prev() } - if iter.Valid() { + if iter.valid() { fmt.Fprintf(&b, "<%s:%d>", iter.Key().UserKey, iter.Key().SeqNum()) } else if err := iter.Error(); err != nil { fmt.Fprintf(&b, "", err) @@ -340,7 +340,7 @@ func BenchmarkBlockIterSeekGE(b *testing.B) { k := keys[rng.Intn(len(keys))] it.SeekGE(k, false /* trySeekUsingNext */) if testing.Verbose() { - if !it.Valid() { + if !it.valid() { b.Fatal("expected to find key") } if !bytes.Equal(k, it.Key().UserKey) { @@ -383,11 +383,11 @@ func BenchmarkBlockIterSeekLT(b *testing.B) { it.SeekLT(keys[j]) if testing.Verbose() { if j == 0 { - if it.Valid() { + if it.valid() { b.Fatal("unexpected key") } } else { - if !it.Valid() { + if !it.valid() { b.Fatal("expected to find key") } k := keys[j-1] @@ -424,7 +424,7 @@ func BenchmarkBlockIterNext(b *testing.B) { b.ResetTimer() for i := 0; i < b.N; i++ { - if !it.Valid() { + if !it.valid() { it.First() } it.Next() @@ -456,7 +456,7 @@ func BenchmarkBlockIterPrev(b *testing.B) { b.ResetTimer() for i := 0; i < b.N; i++ { - if !it.Valid() { + if !it.valid() { it.Last() } it.Prev() diff --git a/sstable/reader.go b/sstable/reader.go index c71b80146b..179fd78284 100644 --- a/sstable/reader.go +++ b/sstable/reader.go @@ -332,7 +332,7 @@ const ( // unpositioned. If unsuccessful, it sets i.err to any error encountered, which // may be nil if we have simply exhausted the entire table. func (i *singleLevelIterator) loadBlock() loadBlockResult { - if !i.index.Valid() { + if !i.index.valid() { // Ensure the data block iterator is invalidated even if loading of the block // fails. i.data.invalidate() @@ -341,7 +341,7 @@ func (i *singleLevelIterator) loadBlock() loadBlockResult { // Load the next block. v := i.index.Value() bhp, err := decodeBlockHandleWithProperties(v) - if i.dataBH == bhp.BlockHandle && i.data.Valid() { + if i.dataBH == bhp.BlockHandle && i.data.valid() { // We're already at the data block we want to load. Reset bounds in case // they changed since the last seek, but don't reload the block from cache // or disk. @@ -456,7 +456,7 @@ func (i *singleLevelIterator) trySeekLTUsingPrevWithinBlock( func (i *singleLevelIterator) recordOffset() uint64 { offset := i.dataBH.Offset - if i.data.Valid() { + if i.data.valid() { // - i.dataBH.Length/len(i.data.data) is the compression ratio. If // uncompressed, this is 1. // - i.data.nextOffset is the uncompressed position of the current record @@ -497,7 +497,7 @@ func (i *singleLevelIterator) seekGEHelper( key []byte, boundsCmp int, trySeekUsingNext bool, ) (*InternalKey, []byte) { var dontSeekWithinBlock bool - if !i.data.isDataInvalidated() && !i.index.isDataInvalidated() && i.data.Valid() && i.index.Valid() && + if !i.data.isDataInvalidated() && !i.index.isDataInvalidated() && i.data.valid() && i.index.valid() && boundsCmp > 0 && i.cmp(key, i.index.Key().UserKey) <= 0 { // Fast-path: The bounds have moved forward and this SeekGE is // respecting the lower bound (guaranteed by Iterator). We know that @@ -657,7 +657,7 @@ func (i *singleLevelIterator) SeekLT(key []byte) (*InternalKey, []byte) { i.positionedUsingLatestBounds = true var dontSeekWithinBlock bool - if !i.data.isDataInvalidated() && !i.index.isDataInvalidated() && i.data.Valid() && i.index.Valid() && + if !i.data.isDataInvalidated() && !i.index.isDataInvalidated() && i.data.valid() && i.index.valid() && boundsCmp < 0 && i.cmp(i.data.firstKey.UserKey, key) < 0 { // Fast-path: The bounds have moved backward, and this SeekLT is // respecting the upper bound (guaranteed by Iterator). We know that @@ -1154,7 +1154,7 @@ func (i *twoLevelIterator) loadIndex() loadBlockResult { // Ensure the data block iterator is invalidated even if loading of the // index fails. i.data.invalidate() - if !i.topLevelIndex.Valid() { + if !i.topLevelIndex.valid() { i.index.offset = 0 i.index.restarts = 0 return loadBlockFailed @@ -1224,7 +1224,7 @@ func (i *twoLevelIterator) SeekGE(key []byte, trySeekUsingNext bool) (*InternalK i.err = nil // clear cached iteration error var dontSeekWithinSingleLevelIter bool - if i.topLevelIndex.isDataInvalidated() || !i.topLevelIndex.Valid() || (i.boundsCmp <= 0 && !trySeekUsingNext) || + if i.topLevelIndex.isDataInvalidated() || !i.topLevelIndex.valid() || (i.boundsCmp <= 0 && !trySeekUsingNext) || i.cmp(key, i.topLevelIndex.Key().UserKey) > 0 { // Slow-path: need to position the topLevelIndex. trySeekUsingNext = false @@ -1319,7 +1319,7 @@ func (i *twoLevelIterator) SeekPrefixGE( i.exhaustedBounds = 0 var dontSeekWithinSingleLevelIter bool - if i.topLevelIndex.isDataInvalidated() || !i.topLevelIndex.Valid() || i.boundsCmp <= 0 || + if i.topLevelIndex.isDataInvalidated() || !i.topLevelIndex.valid() || i.boundsCmp <= 0 || i.cmp(key, i.topLevelIndex.Key().UserKey) > 0 { // Slow-path: need to position the topLevelIndex. // @@ -1669,7 +1669,9 @@ func (i *twoLevelCompactionIterator) Close() error { return i.twoLevelIterator.Close() } -func (i *twoLevelCompactionIterator) SeekGE(key []byte, trySeekUsingNext bool) (*InternalKey, []byte) { +func (i *twoLevelCompactionIterator) SeekGE( + key []byte, trySeekUsingNext bool, +) (*InternalKey, []byte) { panic("pebble: SeekGE unimplemented") } @@ -2164,20 +2166,17 @@ func (r *Reader) NewRawRangeDelIter() (keyspan.FragmentIterator, error) { if err != nil { return nil, err } - i := &blockIter{} - if err := i.initHandle(r.Compare, h, r.Properties.GlobalSeqNum); err != nil { + i := &fragmentBlockIter{} + if err := i.blockIter.initHandle(r.Compare, h, r.Properties.GlobalSeqNum); err != nil { return nil, err } - // NB: *blockIter implements keyspan.FragmentIter, assuming the raw value is - // is the span's end key, and the span has no other value. This is - // sufficient for range deletion tombstones, but not for range keys. return i, nil } // NewRawRangeKeyIter returns an internal iterator for the contents of the // range-key block for the table. Returns nil if the table does not contain any // range keys. -func (r *Reader) NewRawRangeKeyIter() (base.InternalIterator, error) { +func (r *Reader) NewRawRangeKeyIter() (keyspan.FragmentIterator, error) { if r.rangeKeyBH.Length == 0 { return nil, nil } @@ -2185,8 +2184,8 @@ func (r *Reader) NewRawRangeKeyIter() (base.InternalIterator, error) { if err != nil { return nil, err } - i := &blockIter{} - if err := i.initHandle(r.Compare, h, r.Properties.GlobalSeqNum); err != nil { + i := &fragmentBlockIter{} + if err := i.blockIter.initHandle(r.Compare, h, r.Properties.GlobalSeqNum); err != nil { return nil, err } return i, nil @@ -2208,7 +2207,9 @@ func (r *Reader) readRangeKey() (cache.Handle, error) { return r.readBlock(r.rangeKeyBH, nil /* transform */, nil /* readaheadState */) } -func checkChecksum(checksumType ChecksumType, b []byte, bh BlockHandle, fileNum base.FileNum) error { +func checkChecksum( + checksumType ChecksumType, b []byte, bh BlockHandle, fileNum base.FileNum, +) error { expectedChecksum := binary.LittleEndian.Uint32(b[bh.Length+1:]) var computedChecksum uint32 switch checksumType { diff --git a/testdata/external_iterator b/testdata/external_iterator new file mode 100644 index 0000000000..ce257c7ebb --- /dev/null +++ b/testdata/external_iterator @@ -0,0 +1,112 @@ +build 1 +set b b +set c c +---- + +build 2 +del-range c z +---- + +# Test that a delete range in a more recent file shadows keys in an +# earlier file. + +iter files=(2, 1) +first +next +---- +b: (b, .) +. + +build 3 +set a a +set f f +---- + +# Test including an even more recent file with point keys overlapping +# the rangedel. Since the point keys are assigned a higher sequence +# number, they should NOT be shadowed by the rangedel. + +iter files=(3, 2, 1) +first +next +next +next +---- +a: (a, .) +b: (b, .) +f: (f, .) +. + +# Test including range keys, and merging the range key state across +# files. Range keys should be interleaved. + +build 4 +range-key-set a c @2 foo +range-key-set c e @3 bar +---- + +build 5 +range-key-del b d +---- + +iter files=(5, 4, 3, 2, 1) +first +next +next +next +next +---- +a: (a, [a-b) @2=foo) +b: (b, .) +d: (., [d-e) @3=bar) +f: (f, .) +. + +# Test range keys that overlap each other with identical state. These +# should be defragmented and exposed as a single range key. + +reset +---- + +build ag +range-key-set a g @5 foo +---- + +build ek +range-key-set e k @5 foo +---- + +iter files=(ag, ek) +first +next +---- +a: (., [a-k) @5=foo) +. + +# Test range-key masking by creating points, some with suffixes above +# the range key's suffix, some with suffixes below the range key's +# suffix. + +build points +set a@4 v +set c@2 v +set d@9 v +set e@5 v +set k@3 v +set p@4 v +---- + +iter files=(points, ag, ek) mask-suffix=@7 +first +next +next +next +next +next +---- +a: (., [a-k) @5=foo) +d@9: (v, [a-k) @5=foo) +e@5: (v, [a-k) @5=foo) +k@3: (v, .) +p@4: (v, .) +.