diff --git a/compaction.go b/compaction.go
index 53d237b2d2..3a6a131026 100644
--- a/compaction.go
+++ b/compaction.go
@@ -1009,7 +1009,7 @@ func (c *compaction) elideRangeKey(start, end []byte) bool {
 // newInputIter returns an iterator over all the input tables in a compaction.
 func (c *compaction) newInputIter(
-	newIters tableNewIters, newSpanIter keyspan.TableNewSpanIter, snapshots []uint64,
+	newIters tableNewIters, newRangeKeyIter keyspan.TableNewSpanIter, snapshots []uint64,
 ) (_ internalIterator, retErr error) {
 	var rangeDelIters []keyspan.FragmentIterator
 	var rangeKeyIters []keyspan.FragmentIterator
@@ -1198,7 +1198,28 @@ func (c *compaction) newInputIter(
 		}
 		if hasRangeKeys {
 			li := &keyspan.LevelIter{}
-			li.Init(keyspan.SpanIterOptions{}, c.cmp, newSpanIter, level.files.Iter(), l, c.logger, manifest.KeyTypeRange)
+			newRangeKeyIterWrapper := func(file *manifest.FileMetadata, iterOptions *keyspan.SpanIterOptions) (keyspan.FragmentIterator, error) {
+				iter, err := newRangeKeyIter(file, iterOptions)
+				if iter != nil {
+					// Ensure that the range key iter is not closed until the compaction is
+					// finished. This is necessary because range key processing
+					// requires the range keys to be held in memory for up to the
+					// lifetime of the compaction.
+					c.closers = append(c.closers, iter)
+					iter = noCloseIter{iter}
+
+					// We do not need to truncate range keys to sstable boundaries, or
+					// only read within the file's atomic compaction units, unlike with
+					// range tombstones. This is because range keys were added after we
+					// stopped splitting user keys across sstables, so all the range keys
+					// in this sstable must wholly lie within the file's bounds.
+				}
+				if iter == nil {
+					iter = emptyKeyspanIter
+				}
+				return iter, err
+			}
+			li.Init(keyspan.SpanIterOptions{}, c.cmp, newRangeKeyIterWrapper, level.files.Iter(), l, c.logger, manifest.KeyTypeRange)
 			rangeKeyIters = append(rangeKeyIters, li)
 		}
 		return nil
@@ -1645,11 +1666,7 @@ func (d *DB) flush1() (bytesFlushed uint64, err error) {
 	if err == nil {
 		flushed = d.mu.mem.queue[:n]
 		d.mu.mem.queue = d.mu.mem.queue[n:]
-		d.updateReadStateLocked(d.opts.DebugCheck, func() {
-			// TODO(jackson): Remove this, plus this updateReadStateLocked
-			// parameter when range keys are persisted to sstables.
-			err = d.applyFlushedRangeKeys(flushed)
-		})
+		d.updateReadStateLocked(d.opts.DebugCheck)
 		d.updateTableStatsLocked(ve.NewFiles)
 	}
 	// Signal FlushEnd after installing the new readState. This helps for unit
@@ -1974,6 +1991,11 @@ func checkDeleteCompactionHints(
 		if m.Compacting || !h.canDelete(cmp, m, snapshots) || files[m] {
 			continue
 		}
+		if m.HasRangeKeys {
+			// TODO(bilal): Remove this conditional when deletion hints work well
+			// with sstables containing range keys.
+			continue
+		}

 		if files == nil {
 			// Construct files lazily, assuming most calls will not
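The newRangeKeyIterWrapper above keeps every per-file range-key iterator open until the compaction finishes: each iterator is recorded in c.closers, and callers receive a wrapper whose Close is a no-op. A self-contained sketch of that defer-the-Close pattern, using simplified stand-in types rather than Pebble's keyspan interfaces:

```go
package main

import "fmt"

// closer is a stand-in for keyspan.FragmentIterator: something that must
// eventually be closed, but not by intermediate consumers.
type closer interface {
	Close() error
}

// noCloseIter forwards everything except Close, so code further up the stack
// cannot close the underlying iterator early.
type noCloseIter struct{ closer }

func (noCloseIter) Close() error { return nil }

// compactionState stands in for *compaction: it owns the deferred closers.
type compactionState struct{ closers []closer }

// wrapConstructor wraps a per-file iterator constructor so that every iterator
// it opens stays alive until finish() runs.
func (c *compactionState) wrapConstructor(newIter func(file string) (closer, error)) func(string) (closer, error) {
	return func(file string) (closer, error) {
		it, err := newIter(file)
		if it != nil {
			c.closers = append(c.closers, it)
			it = noCloseIter{it}
		}
		return it, err
	}
}

// finish closes everything that was opened during the compaction.
func (c *compactionState) finish() {
	for _, it := range c.closers {
		_ = it.Close()
	}
	c.closers = nil
}

type fileIter struct{ name string }

func (f *fileIter) Close() error { fmt.Println("closing", f.name); return nil }

func main() {
	c := &compactionState{}
	open := c.wrapConstructor(func(file string) (closer, error) { return &fileIter{name: file}, nil })
	it, _ := open("000001.sst")
	_ = it.Close() // no-op; the real close is deferred
	c.finish()     // prints "closing 000001.sst"
}
```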
@@ -2080,7 +2102,7 @@ func (d *DB) compact1(c *compaction, errChannel chan error) (err error) {
 	// there are no references obsolete tables will be added to the obsolete
 	// table list.
 	if err == nil {
-		d.updateReadStateLocked(d.opts.DebugCheck, nil)
+		d.updateReadStateLocked(d.opts.DebugCheck)
 		d.updateTableStatsLocked(ve.NewFiles)
 	}
 	d.deleteObsoleteFiles(jobID, true /* waitForOngoing */)
@@ -2329,6 +2351,12 @@ func (d *DB) runCompaction(
 		if len(iter.tombstones) > 0 {
 			startKey = iter.tombstones[0].Start
 		}
+		if startKey == nil {
+			startKey = c.rangeKeyFrag.Start()
+			if len(iter.rangeKeys) > 0 {
+				startKey = iter.rangeKeys[0].Start
+			}
+		}
 		if splitKey != nil && d.cmp(startKey, splitKey) == 0 {
 			return nil
 		}
diff --git a/compaction_iter.go b/compaction_iter.go
index 3a92b4bb04..7a3a0c59e3 100644
--- a/compaction_iter.go
+++ b/compaction_iter.go
@@ -506,6 +506,10 @@ func (i *compactionIter) nextInStripe() stripeChangeType {
 			return sameStripeNonSkippable
 		}
 		return newStripe
+	case InternalKeyKindRangeKeySet, InternalKeyKindRangeKeyUnset, InternalKeyKindRangeKeyDelete:
+		// Range keys are interleaved at the max sequence number for a given user
+		// key, so we should not see any more range keys in this stripe.
+		panic("unreachable")
 	case InternalKeyKindInvalid:
 		if i.curSnapshotIdx == origSnapshotIdx {
 			return sameStripeNonSkippable
 		}
diff --git a/data_test.go b/data_test.go
index cab4f9808d..fde2fb6ffb 100644
--- a/data_test.go
+++ b/data_test.go
@@ -756,7 +756,7 @@ func runDBDefineCmd(td *datadriven.TestData, opts *Options) (*DB, error) {
 			entry := d.newFlushableEntry(d.mu.mem.mutable, 0, 0)
 			entry.readerRefs++
 			d.mu.mem.queue = append(d.mu.mem.queue, entry)
-			d.updateReadStateLocked(nil, nil)
+			d.updateReadStateLocked(nil)
 		}
 		mem = d.mu.mem.mutable
 		start, end = nil, nil
@@ -845,7 +845,7 @@ func runDBDefineCmd(td *datadriven.TestData, opts *Options) (*DB, error) {
 		}); err != nil {
 			return nil, err
 		}
-		d.updateReadStateLocked(nil, nil)
+		d.updateReadStateLocked(nil)
 		d.updateTableStatsLocked(ve.NewFiles)
 	}
diff --git a/db.go b/db.go
index 3f088a4c82..320d828b6a 100644
--- a/db.go
+++ b/db.go
@@ -1983,7 +1983,7 @@ func (d *DB) makeRoomForWrite(b *Batch) error {
 		var entry *flushableEntry
 		d.mu.mem.mutable, entry = d.newMemTable(newLogNum, logSeqNum)
 		d.mu.mem.queue = append(d.mu.mem.queue, entry)
-		d.updateReadStateLocked(nil, nil)
+		d.updateReadStateLocked(nil)
 		if immMem.writerUnref() {
 			d.maybeScheduleFlush()
 		}
diff --git a/flush_external.go b/flush_external.go
index 8bda7b97d5..179beb2127 100644
--- a/flush_external.go
+++ b/flush_external.go
@@ -85,7 +85,7 @@ func flushExternalTable(untypedDB interface{}, path string, originalMeta *fileMe
 		}
 		return err
 	}
-	d.updateReadStateLocked(d.opts.DebugCheck, nil)
+	d.updateReadStateLocked(d.opts.DebugCheck)
 	d.updateTableStatsLocked(ve.NewFiles)
 	d.deleteObsoleteFiles(jobID, true /* waitForOngoing */)
 	d.maybeScheduleCompaction()
diff --git a/ingest.go b/ingest.go
index 737430a210..f3fc8e8406 100644
--- a/ingest.go
+++ b/ingest.go
@@ -849,7 +849,7 @@ func (d *DB) ingestApply(
 	}); err != nil {
 		return nil, err
 	}
-	d.updateReadStateLocked(d.opts.DebugCheck, nil)
+	d.updateReadStateLocked(d.opts.DebugCheck)
 	d.updateTableStatsLocked(ve.NewFiles)
 	d.deleteObsoleteFiles(jobID, false /* waitForOngoing */)
 	// The ingestion may have pushed a level over the threshold for compaction,
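The runCompaction hunk above extends the empty-output check to range keys: when no point key or range tombstone is pending, the first pending range key's start stands in as the output's start key, and an output that would begin exactly at the split key is skipped. A rough, self-contained sketch of that selection, with illustrative parameter names rather than Pebble's actual fields:

```go
package main

import (
	"bytes"
	"fmt"
)

// firstPendingStart mirrors the shape of the check in runCompaction: pick the
// start key of whatever output content is pending, preferring point keys,
// then range tombstones, then range keys.
func firstPendingStart(pointStart, tombstoneStart, rangeKeyFragStart, firstRangeKeyStart []byte) []byte {
	startKey := pointStart
	if startKey == nil {
		startKey = tombstoneStart
	}
	if startKey == nil {
		startKey = rangeKeyFragStart
		if firstRangeKeyStart != nil {
			startKey = firstRangeKeyStart
		}
	}
	return startKey
}

func main() {
	splitKey := []byte("m")
	// Only a range key starting exactly at the split key is pending, so the
	// current output table would be empty and the caller can skip finishing it.
	start := firstPendingStart(nil, nil, []byte("m"), nil)
	fmt.Println(bytes.Equal(start, splitKey)) // true
}
```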
diff --git a/internal/keyspan/level_iter.go b/internal/keyspan/level_iter.go
index a55c1efa8d..c65c022ec9 100644
--- a/internal/keyspan/level_iter.go
+++ b/internal/keyspan/level_iter.go
@@ -178,7 +178,7 @@ func (l *LevelIter) loadFile(file *manifest.FileMetadata, dir int) loadFileRetur
 		return noFileLoaded
 	}
 	if indicator != fileAlreadyLoaded {
-		l.iter, l.err = l.newIter(l.files.Current(), &l.tableOpts)
+		l.iter, l.err = l.newIter(file, &l.tableOpts)
 		indicator = newFileLoaded
 	}
 	if l.err != nil {
@@ -190,33 +190,42 @@
 // SeekGE implements keyspan.FragmentIterator.
 func (l *LevelIter) SeekGE(key []byte) *Span {
 	l.dir = +1
+	l.straddle = Span{}
+	l.straddleDir = 0
 	l.err = nil // clear cached iteration error

 	f := l.findFileGE(key)
 	if f != nil && l.keyType == manifest.KeyTypeRange && l.cmp(key, f.SmallestRangeKey.UserKey) < 0 {
-		// Return a straddling key instead of loading the file.
-		l.iterFile = f
-		l.iter = nil
-		l.straddleDir = +1
-		// The synthetic span that we are creating starts at the seeked key. This
-		// is an optimization as it prevents us from loading the adjacent file's
-		// bounds, at the expense of this iterator appearing "inconsistent" to its
-		// callers i.e.:
-		//
-		// SeekGE(bb) -> {bb-c, empty}
-		// Next() -> {c-d, RANGEKEYSET}
-		// Prev() -> {a-c, empty}
-		//
-		// Seeing as the inconsistency will only be around empty spans, which are
-		// expected to be elided by one of the higher-level iterators (either
-		// top-level Iterator or the defragmenting iter), the entire iterator should
-		// still appear consistent to the user.
-		l.straddle = Span{
-			Start: key,
-			End:   f.SmallestRangeKey.UserKey,
-			Keys:  nil,
+		prevFile := l.files.Prev()
+		if prevFile != nil {
+			// We could unconditionally return an empty span between the seek key and
+			// f.SmallestRangeKey, however if this span is to the left of all range
+			// keys on this level, it could lead to inconsistent behaviour in relative
+			// positioning operations. Consider this example, with a b-c range key:
+			//
+			// SeekGE(a) -> a-b:{}
+			// Next() -> b-c{(#5,RANGEKEYSET,@4,foo)}
+			// Prev() -> nil
+			//
+			// Iterators higher up in the iterator stack rely on this sort of relative
+			// positioning consistency.
+			//
+			// TODO(bilal): Investigate ways to be able to return straddle spans in
+			// cases similar to the above, while still retaining correctness.
+			l.files.Next()
+			// Return a straddling key instead of loading the file.
+			l.iterFile = f
+			if err := l.Close(); err != nil {
+				return nil
+			}
+			l.straddleDir = +1
+			l.straddle = Span{
+				Start: prevFile.LargestRangeKey.UserKey,
+				End:   f.SmallestRangeKey.UserKey,
+				Keys:  nil,
+			}
+			return &l.straddle
 		}
-		return &l.straddle
 	}
 	loadFileIndicator := l.loadFile(f, +1)
 	if loadFileIndicator == noFileLoaded {
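The reworked SeekGE only synthesizes a straddle span when a previous file exists to anchor its start, so that a following Prev still lands on real data instead of nothing. A self-contained sketch of that policy over simplified file bounds (stand-in types, not Pebble's LevelIter):

```go
package main

import (
	"fmt"
	"sort"
)

// fileBounds stands in for a file's SmallestRangeKey/LargestRangeKey user keys.
type fileBounds struct{ smallest, largest string }

// span stands in for keyspan.Span; an empty keys slice marks a synthetic
// straddle span between two files.
type span struct {
	start, end string
	keys       []string
}

// seekGE mirrors the revised policy: if the seek key lands in the gap before
// file f, synthesize a straddle only when a previous file anchors its start;
// otherwise fall through and surface f's own span. Anchoring the straddle at
// the seek key itself (the old behavior) could leave a later Prev with nothing
// to return even though the straddle was never a real span.
func seekGE(files []fileBounds, key string) *span {
	i := sort.Search(len(files), func(i int) bool { return files[i].largest >= key })
	if i == len(files) {
		return nil
	}
	f := files[i]
	if key < f.smallest && i > 0 {
		return &span{start: files[i-1].largest, end: f.smallest}
	}
	return &span{start: f.smallest, end: f.largest, keys: []string{"RANGEKEYSET"}}
}

func main() {
	files := []fileBounds{{"b", "c"}, {"e", "f"}}
	fmt.Printf("%+v\n", seekGE(files, "a")) // loads b-c: no previous file to straddle from
	fmt.Printf("%+v\n", seekGE(files, "d")) // c-e straddle, anchored to real file bounds
}
```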
@@ -231,33 +240,42 @@
 // SeekLT implements keyspan.FragmentIterator.
 func (l *LevelIter) SeekLT(key []byte) *Span {
 	l.dir = -1
+	l.straddle = Span{}
+	l.straddleDir = 0
 	l.err = nil // clear cached iteration error

 	f := l.findFileLT(key)
 	if f != nil && l.keyType == manifest.KeyTypeRange && l.cmp(f.LargestRangeKey.UserKey, key) < 0 {
-		// Return a straddling key instead of loading the file.
-		l.iterFile = f
-		l.iter = nil
-		l.straddleDir = -1
-		// The synthetic span that we are creating ends at the seeked key. This
-		// is an optimization as it prevents us from loading the adjacent file's
-		// bounds, at the expense of this iterator appearing "inconsistent" to its
-		// callers i.e.:
-		//
-		// SeekLT(dd) -> {d-dd, empty}
-		// Prev() -> {c-d, RANGEKEYSET}
-		// Next() -> {d-e, empty}
-		//
-		// Seeing as the inconsistency will only be around empty spans, which are
-		// expected to be elided by one of the higher-level iterators (either
-		// top-level Iterator or the defragmenting iter), the entire iterator should
-		// still appear consistent to the user.
-		l.straddle = Span{
-			Start: f.LargestRangeKey.UserKey,
-			End:   key,
-			Keys:  nil,
+		nextFile := l.files.Next()
+		if nextFile != nil {
+			// We could unconditionally return an empty span between f.LargestRangeKey
+			// and the seek key, however if this span is to the right of all range keys
+			// on this level, it could lead to inconsistent behaviour in relative
+			// positioning operations. Consider this example, with a b-c range key:
+			//
+			// SeekLT(d) -> c-d:{}
+			// Prev() -> b-c{(#5,RANGEKEYSET,@4,foo)}
+			// Next() -> nil
+			//
+			// Iterators higher up in the iterator stack rely on this sort of relative
+			// positioning consistency.
+			//
+			// TODO(bilal): Investigate ways to be able to return straddle spans in
+			// cases similar to the above, while still retaining correctness.
+			l.files.Prev()
+			// Return a straddling key instead of loading the file.
+			l.iterFile = f
+			if err := l.Close(); err != nil {
+				return nil
+			}
+			l.straddleDir = -1
+			l.straddle = Span{
+				Start: f.LargestRangeKey.UserKey,
+				End:   nextFile.SmallestRangeKey.UserKey,
+				Keys:  nil,
+			}
+			return &l.straddle
 		}
-		return &l.straddle
 	}
 	if l.loadFile(l.findFileLT(key), -1) == noFileLoaded {
 		return nil
 	}
@@ -271,6 +289,8 @@
 // First implements keyspan.FragmentIterator.
 func (l *LevelIter) First() *Span {
 	l.dir = +1
+	l.straddle = Span{}
+	l.straddleDir = 0
 	l.err = nil // clear cached iteration error

 	if l.loadFile(l.files.First(), +1) == noFileLoaded {
@@ -285,6 +305,8 @@
 // Last implements keyspan.FragmentIterator.
 func (l *LevelIter) Last() *Span {
 	l.dir = -1
+	l.straddle = Span{}
+	l.straddleDir = 0
 	l.err = nil // clear cached iteration error

 	if l.loadFile(l.files.Last(), -1) == noFileLoaded {
@@ -298,10 +320,14 @@
 // Next implements keyspan.FragmentIterator.
 func (l *LevelIter) Next() *Span {
-	l.dir = +1
-	if l.err != nil || (l.iter == nil && l.iterFile == nil) {
+	if l.err != nil || (l.iter == nil && l.iterFile == nil && l.dir > 0) {
 		return nil
 	}
+	if l.iter == nil && l.iterFile == nil {
+		// l.dir <= 0
+		return l.First()
+	}
+	l.dir = +1

 	if l.iter != nil {
 		if span := l.iter.Next(); span != nil {
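With the Next and Prev changes here, an iterator that has run off one end re-enters the level at First or Last when stepped in the opposite direction, instead of staying exhausted. A minimal sketch of that positioning rule over a plain slice, with stand-in types:

```go
package main

import "fmt"

// cursor sketches the revised positioning rule: the position remembers which
// end the iterator ran off, and stepping the other way re-enters the sequence
// at the corresponding extreme. Items stand in for per-file span iterators.
type cursor struct {
	items []string
	pos   int // -1 = exhausted backward, len(items) = exhausted forward
}

func (c *cursor) First() *string {
	if len(c.items) == 0 {
		return nil
	}
	c.pos = 0
	return &c.items[0]
}

func (c *cursor) Last() *string {
	if len(c.items) == 0 {
		return nil
	}
	c.pos = len(c.items) - 1
	return &c.items[c.pos]
}

func (c *cursor) Next() *string {
	if c.pos >= len(c.items) {
		return nil // already exhausted forward
	}
	if c.pos < 0 {
		return c.First() // mirrors "if l.iter == nil && l.iterFile == nil { return l.First() }"
	}
	c.pos++
	if c.pos == len(c.items) {
		return nil
	}
	return &c.items[c.pos]
}

func (c *cursor) Prev() *string {
	if c.pos < 0 {
		return nil // already exhausted backward
	}
	if c.pos >= len(c.items) {
		return c.Last()
	}
	c.pos--
	if c.pos < 0 {
		return nil
	}
	return &c.items[c.pos]
}

func main() {
	c := &cursor{items: []string{"a-b", "c-d"}}
	c.Last()
	fmt.Println(c.Next())  // <nil>: exhausted forward
	fmt.Println(*c.Prev()) // c-d: reversing direction re-enters at Last
}
```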
@@ -313,10 +339,14 @@
 // Prev implements keyspan.FragmentIterator.
 func (l *LevelIter) Prev() *Span {
-	l.dir = -1
-	if l.err != nil || (l.iter == nil && l.iterFile == nil) {
+	if l.err != nil || (l.iter == nil && l.iterFile == nil && l.dir < 0) {
 		return nil
 	}
+	if l.iter == nil && l.iterFile == nil {
+		// l.dir >= 0
+		return l.Last()
+	}
+	l.dir = -1

 	if l.iter != nil {
 		if span := l.iter.Prev(); span != nil {
@@ -355,7 +385,7 @@ func (l *LevelIter) skipEmptyFileForward() *Span {
 				Start: startKey,
 				End:   endKey,
 			}
-			l.straddleDir = l.dir
+			l.straddleDir = +1
 			return &l.straddle
 		}
 	} else if l.straddleDir < 0 {
@@ -416,7 +446,7 @@ func (l *LevelIter) skipEmptyFileBackward() *Span {
 				Start: startKey,
 				End:   endKey,
 			}
-			l.straddleDir = l.dir
+			l.straddleDir = -1
 			return &l.straddle
 		}
 	} else if l.straddleDir > 0 {
diff --git a/internal/keyspan/merging_iter.go b/internal/keyspan/merging_iter.go
index d521b3043d..6a68f27c0a 100644
--- a/internal/keyspan/merging_iter.go
+++ b/internal/keyspan/merging_iter.go
@@ -811,7 +811,7 @@ func (m *MergingIter) synthesizeKeys(dir int8) (bool, *Span) {
 	}
 	sort.Sort(&m.keys)

-	// Apply the configured transform. See VisibleTransform.
+	// Apply the configured transform. See visibleTransform.
 	s := Span{
 		Start: m.start,
 		End:   m.end,
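Both skipEmptyFile hunks now record the physical direction in which a straddle span was synthesized. The straddles themselves stay keyless, which is what keeps them harmless: higher layers (the defragmenting iterator and the top-level Iterator) drop spans that carry no keys. A tiny illustrative sketch of that elision, with a simplified Span stand-in:

```go
package main

import "fmt"

// span is a simplified stand-in for keyspan.Span; a straddle span carries no
// keys and exists only to describe a gap between two files.
type span struct {
	start, end string
	keys       []string
}

// elideEmpty mirrors what the higher-level iterators do with keyless spans:
// they are skipped, so synthetic straddles never become visible to users.
func elideEmpty(spans []span) []span {
	out := spans[:0]
	for _, s := range spans {
		if len(s.keys) == 0 {
			continue
		}
		out = append(out, s)
	}
	return out
}

func main() {
	spans := []span{
		{"b", "c", []string{"RANGEKEYSET"}},
		{"c", "e", nil}, // straddle between two files
		{"e", "f", []string{"RANGEKEYSET"}},
	}
	fmt.Println(len(elideEmpty(spans))) // 2
}
```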
diff --git a/internal/keyspan/testdata/level_iter b/internal/keyspan/testdata/level_iter
index a4aaf9c4b3..eaa3cb5c19 100644
--- a/internal/keyspan/testdata/level_iter
+++ b/internal/keyspan/testdata/level_iter
@@ -184,7 +184,7 @@ next
 prev
 prev
 ----
-bb-c:{} (file = 000001.sst)
+b-c:{} (file = 000001.sst)
 c-d:{(#2,RANGEKEYSET,@3,foo) (#1,RANGEKEYSET,@1,bar)} (file = 000002.sst)
 b-c:{} (file = 000002.sst)
 c-d:{(#2,RANGEKEYSET,@3,foo) (#1,RANGEKEYSET,@1,bar)} (file = 000002.sst)
@@ -201,7 +201,7 @@ prev
 next
 next
 ----
-d-dd:{} (file = 000001.sst)
+d-e:{} (file = 000001.sst)
 c-d:{(#2,RANGEKEYSET,@3,foo) (#1,RANGEKEYSET,@1,bar)} (file = 000002.sst)
 d-e:{} (file = 000002.sst)
 c-d:{(#2,RANGEKEYSET,@3,foo) (#1,RANGEKEYSET,@1,bar)} (file = 000002.sst)
@@ -216,7 +216,7 @@ prev
 next
 next
 ----
-d-dd:{} (file = 000003.sst)
+d-e:{} (file = 000003.sst)
 c-d:{(#2,RANGEKEYSET,@3,foo) (#1,RANGEKEYSET,@1,bar)} (file = 000002.sst)
 d-e:{} (file = 000002.sst)
 c-d:{(#2,RANGEKEYSET,@3,foo) (#1,RANGEKEYSET,@1,bar)} (file = 000002.sst)
@@ -228,12 +228,12 @@ e-f:{(#2,RANGEKEYSET,@3,foo) (#1,RANGEKEYSET,@1,bar)} (file = 000003.sst)
 iter
 seek-lt bb
 ----
-b-bb:{} (file = 000003.sst)
+b-c:{} (file = 000003.sst)

 iter
 seek-ge dd
 ----
-dd-e:{} (file = 000003.sst)
+d-e:{} (file = 000003.sst)

 iter
 seek-lt d
 prev
 next
 prev
 prev
+prev
 next
 next
 ----
 c-d:{(#2,RANGEKEYSET,@3,foo) (#1,RANGEKEYSET,@1,bar)} (file = 000002.sst)
+b-c:{} (file = 000002.sst)
 c-d:{(#2,RANGEKEYSET,@3,foo) (#1,RANGEKEYSET,@1,bar)} (file = 000002.sst)
 b-c:{} (file = 000002.sst)
 a-b:{(#2,RANGEKEYSET,@3,foo) (#1,RANGEKEYSET,@1,bar)} (file = 000001.sst)
 .
-.
-.
+a-b:{(#2,RANGEKEYSET,@3,foo) (#1,RANGEKEYSET,@1,bar)} (file = 000001.sst)
+b-c:{} (file = 000001.sst)

 # A bunch of files with point keys only should not fragment straddles.
@@ -393,3 +395,49 @@ e-g:{} (file = 000002.sst)
 g-h:{(#2,RANGEKEYSET,@3,foo) (#1,RANGEKEYSET,@1,bar)} (file = 000003.sst)
 i-j:{(#2,RANGEKEYSET,@3,foo) (#1,RANGEKEYSET,@1,bar)} (file = 000003.sst)
 .
+
+# Test seeking outside of bounds with straddles.
+
+define
+file
+  c-d:{(#2,RANGEKEYSET,@3,foo) (#1,RANGEKEYSET,@1,bar)}
+file
+  e-f:{(#2,RANGEKEYSET,@3,foo) (#1,RANGEKEYSET,@1,bar)}
+file
+  g-h:{(#2,RANGEKEYSET,@3,foo) (#1,RANGEKEYSET,@1,bar)}
+----
+
+iter
+seek-lt j
+next
+prev
+prev
+----
+g-h:{(#2,RANGEKEYSET,@3,foo) (#1,RANGEKEYSET,@1,bar)} (file = 000003.sst)
+.
+g-h:{(#2,RANGEKEYSET,@3,foo) (#1,RANGEKEYSET,@1,bar)} (file = 000003.sst)
+f-g:{} (file = 000003.sst)
+
+iter
+seek-lt j
+prev
+prev
+next
+next
+----
+g-h:{(#2,RANGEKEYSET,@3,foo) (#1,RANGEKEYSET,@1,bar)} (file = 000003.sst)
+f-g:{} (file = 000003.sst)
+e-f:{(#2,RANGEKEYSET,@3,foo) (#1,RANGEKEYSET,@1,bar)} (file = 000002.sst)
+f-g:{} (file = 000002.sst)
+g-h:{(#2,RANGEKEYSET,@3,foo) (#1,RANGEKEYSET,@1,bar)} (file = 000003.sst)
+
+iter
+seek-ge a
+prev
+next
+next
+----
+c-d:{(#2,RANGEKEYSET,@3,foo) (#1,RANGEKEYSET,@1,bar)} (file = 000001.sst)
+.
+c-d:{(#2,RANGEKEYSET,@3,foo) (#1,RANGEKEYSET,@1,bar)} (file = 000001.sst)
+d-e:{} (file = 000001.sst)
diff --git a/internal/manifest/level_metadata.go b/internal/manifest/level_metadata.go
index 193dc76903..b3a5038e7b 100644
--- a/internal/manifest/level_metadata.go
+++ b/internal/manifest/level_metadata.go
@@ -481,6 +481,21 @@ func (i *LevelIterator) SeekGE(cmp Compare, userKey []byte) *FileMetadata {
 	meta := i.seek(func(m *FileMetadata) bool {
 		return cmp(m.Largest.UserKey, userKey) >= 0
 	})
+	for meta != nil {
+		switch i.filter {
+		case KeyTypePointAndRange:
+			return meta
+		case KeyTypePoint:
+			if meta.HasPointKeys && cmp(meta.LargestPointKey.UserKey, userKey) >= 0 {
+				return meta
+			}
+		case KeyTypeRange:
+			if meta.HasRangeKeys && cmp(meta.LargestRangeKey.UserKey, userKey) >= 0 {
+				return meta
+			}
+		}
+		meta = i.Next()
+	}
 	return i.filteredNextFile(meta)
 }

@@ -497,6 +512,21 @@ func (i *LevelIterator) SeekLT(cmp Compare, userKey []byte) *FileMetadata {
 		return cmp(m.Smallest.UserKey, userKey) >= 0
 	})
 	meta := i.Prev()
+	for meta != nil {
+		switch i.filter {
+		case KeyTypePointAndRange:
+			return meta
+		case KeyTypePoint:
+			if meta.HasPointKeys && cmp(meta.SmallestPointKey.UserKey, userKey) < 0 {
+				return meta
+			}
+		case KeyTypeRange:
+			if meta.HasRangeKeys && cmp(meta.SmallestRangeKey.UserKey, userKey) < 0 {
+				return meta
+			}
+		}
+		meta = i.Prev()
+	}
 	return i.filteredPrevFile(meta)
 }
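The filtered SeekGE and SeekLT above only accept a candidate file if the bounds for the requested key type actually reach the seek key, since a file's overall bounds may be satisfied purely by keys of the other type. A self-contained sketch of the forward-direction check (simplified metadata, not manifest.FileMetadata):

```go
package main

import "fmt"

// fileMeta is a simplified stand-in for a file's metadata: a file can hold
// point keys, range keys, or both, each with their own largest user key.
type fileMeta struct {
	hasPoint, hasRange         bool
	largestPoint, largestRange string
}

type keyType int

const (
	keyTypePointAndRange keyType = iota
	keyTypePoint
	keyTypeRange
)

// satisfiesSeekGE mirrors the check the filtered iterator now performs: a
// candidate found by overall bounds only counts if the bounds for the
// requested key type reach the seek key; otherwise keep advancing.
func satisfiesSeekGE(m fileMeta, filter keyType, userKey string) bool {
	switch filter {
	case keyTypePoint:
		return m.hasPoint && m.largestPoint >= userKey
	case keyTypeRange:
		return m.hasRange && m.largestRange >= userKey
	default:
		return true
	}
}

func main() {
	// A file whose overall bounds cover "m" only because of its point keys.
	m := fileMeta{hasPoint: true, hasRange: true, largestPoint: "z", largestRange: "c"}
	fmt.Println(satisfiesSeekGE(m, keyTypeRange, "m")) // false: range keys end at c
	fmt.Println(satisfiesSeekGE(m, keyTypePoint, "m")) // true
}
```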
diff --git a/internal/metamorphic/ops.go b/internal/metamorphic/ops.go
index 5e2a0cd663..d3780ee368 100644
--- a/internal/metamorphic/ops.go
+++ b/internal/metamorphic/ops.go
@@ -341,26 +341,16 @@ func (o *ingestOp) run(t *test, h *history) {
 		id := o.batchIDs[0]
 		b := t.getBatch(id)
 		iter, rangeDelIter, rangeKeyIter := private.BatchSort(b)
-		// Ingests currently discard range keys. Using apply as an alternative
-		// to ingestion would create a divergence, since batch applications do
-		// commit range keys. Only allow the ingest to be applied as a batch if
-		// it doesn't contain any range keys.
-		// TODO(jackson): When range keys are properly persisted, allow
-		// tables containing range keys to be applied as batches.
-		if rangeKeyIter != nil {
-			closeIters(iter, rangeDelIter, rangeKeyIter)
-		} else {
-			c, err := o.collapseBatch(t, iter, rangeDelIter, rangeKeyIter)
-			if err == nil {
-				w := t.getWriter(makeObjID(dbTag, 0))
-				err = w.Apply(c, t.writeOpts)
-			}
-			_ = b.Close()
-			_ = c.Close()
-			t.clearObj(id)
-			h.Recordf("%s // %v", o, err)
-			return
+		c, err := o.collapseBatch(t, iter, rangeDelIter, rangeKeyIter)
+		if err == nil {
+			w := t.getWriter(makeObjID(dbTag, 0))
+			err = w.Apply(c, t.writeOpts)
 		}
+		_ = b.Close()
+		_ = c.Close()
+		t.clearObj(id)
+		h.Recordf("%s // %v", o, err)
+		return
 	}

 	var paths []string
diff --git a/iterator.go b/iterator.go
index a98ed58742..1d71bebeb1 100644
--- a/iterator.go
+++ b/iterator.go
@@ -613,10 +613,10 @@ func (i *Iterator) sampleRead() {
 			var containsKey bool
 			if i.pos == iterPosNext || i.pos == iterPosCurForward ||
 				i.pos == iterPosCurForwardPaused {
-				containsKey = i.cmp(file.Smallest.UserKey, i.key) <= 0
+				containsKey = i.cmp(file.SmallestPointKey.UserKey, i.key) <= 0
 			} else if i.pos == iterPosPrev || i.pos == iterPosCurReverse ||
 				i.pos == iterPosCurReversePaused {
-				containsKey = i.cmp(file.Largest.UserKey, i.key) >= 0
+				containsKey = i.cmp(file.LargestPointKey.UserKey, i.key) >= 0
 			}
 			// Do nothing if the current key is not contained in file's
 			// bounds. We could seek the LevelIterator at this level
@@ -653,8 +653,8 @@ func (i *Iterator) sampleRead() {
 		atomic.AddInt64(&topFile.Atomic.AllowedSeeks, topFile.InitAllowedSeeks)

 		read := readCompaction{
-			start:   topFile.Smallest.UserKey,
-			end:     topFile.Largest.UserKey,
+			start:   topFile.SmallestPointKey.UserKey,
+			end:     topFile.LargestPointKey.UserKey,
 			level:   topLevel,
 			fileNum: topFile.FileNum,
 		}
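Read sampling above now consults SmallestPointKey and LargestPointKey rather than the overall bounds, so files whose bounds are widened only by range keys are not charged seeks for point reads. A short sketch of the containment check, using stand-in fields:

```go
package main

import "fmt"

// pointBounds stands in for the SmallestPointKey/LargestPointKey user keys of
// a file's metadata. Charging a seek against a file that holds no point keys
// near the sampled key would skew read-triggered compaction decisions.
type pointBounds struct {
	smallestPoint, largestPoint string
}

// containsSampledKey mirrors the forward/backward checks in sampleRead, using
// point-key bounds only.
func containsSampledKey(b pointBounds, key string, forward bool) bool {
	if forward {
		return b.smallestPoint <= key
	}
	return b.largestPoint >= key
}

func main() {
	b := pointBounds{smallestPoint: "d", largestPoint: "f"}
	fmt.Println(containsSampledKey(b, "e", true))  // true
	fmt.Println(containsSampledKey(b, "z", false)) // false: largest point key is f
}
```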
diff --git a/level_iter.go b/level_iter.go
index 65b41fbbca..d3dedeaae0 100644
--- a/level_iter.go
+++ b/level_iter.go
@@ -248,11 +248,11 @@ func (l *levelIter) findFileLT(key []byte) *fileMetadata {
 func (l *levelIter) initTableBounds(f *fileMetadata) int {
 	l.tableOpts.LowerBound = l.lower
 	if l.tableOpts.LowerBound != nil {
-		if l.cmp(f.Largest.UserKey, l.tableOpts.LowerBound) < 0 {
+		if l.cmp(f.LargestPointKey.UserKey, l.tableOpts.LowerBound) < 0 {
 			// The largest key in the sstable is smaller than the lower bound.
 			return -1
 		}
-		if l.cmp(l.tableOpts.LowerBound, f.Smallest.UserKey) <= 0 {
+		if l.cmp(l.tableOpts.LowerBound, f.SmallestPointKey.UserKey) <= 0 {
 			// The lower bound is smaller or equal to the smallest key in the
 			// table. Iteration within the table does not need to check the lower
 			// bound.
@@ -261,15 +261,16 @@ func (l *levelIter) initTableBounds(f *fileMetadata) int {
 	}
 	l.tableOpts.UpperBound = l.upper
 	if l.tableOpts.UpperBound != nil {
-		if l.cmp(f.Smallest.UserKey, l.tableOpts.UpperBound) >= 0 {
+		if l.cmp(f.SmallestPointKey.UserKey, l.tableOpts.UpperBound) >= 0 {
 			// The smallest key in the sstable is greater than or equal to the upper
 			// bound.
 			return 1
 		}
-		if l.cmp(l.tableOpts.UpperBound, f.Largest.UserKey) > 0 {
+		if l.cmp(l.tableOpts.UpperBound, f.LargestPointKey.UserKey) > 0 {
 			// The upper bound is greater than the largest key in the
 			// table. Iteration within the table does not need to check the upper
-			// bound. NB: tableOpts.UpperBound is exclusive and f.Largest is inclusive.
+			// bound. NB: tableOpts.UpperBound is exclusive and f.LargestPointKey is
+			// inclusive.
 			l.tableOpts.UpperBound = nil
 		}
 	}
@@ -354,13 +355,13 @@ func (l *levelIter) loadFile(file *fileMetadata, dir int) loadFileReturnIndicato
 		rangeDelIter.Close()
 	}
 	if l.smallestUserKey != nil {
-		*l.smallestUserKey = file.Smallest.UserKey
+		*l.smallestUserKey = file.SmallestPointKey.UserKey
 	}
 	if l.largestUserKey != nil {
-		*l.largestUserKey = file.Largest.UserKey
+		*l.largestUserKey = file.LargestPointKey.UserKey
 	}
 	if l.isLargestUserKeyRangeDelSentinel != nil {
-		*l.isLargestUserKeyRangeDelSentinel = file.Largest.IsExclusiveSentinel()
+		*l.isLargestUserKeyRangeDelSentinel = file.LargestPointKey.IsExclusiveSentinel()
 	}
 	return newFileLoaded
 }
@@ -447,7 +448,7 @@ func (l *levelIter) SeekPrefixGE(
 			}
 			return l.verify(l.largestBoundary, nil)
 		}
-		l.syntheticBoundary = l.iterFile.Largest
+		l.syntheticBoundary = l.iterFile.LargestPointKey
 		l.syntheticBoundary.SetKind(InternalKeyKindRangeDelete)
 		l.largestBoundary = &l.syntheticBoundary
 		if l.isSyntheticIterBoundsKey != nil {
@@ -463,7 +464,7 @@ func (l *levelIter) SeekPrefixGE(
 	// next file will defeat the optimization for the next SeekPrefixGE that
 	// is called with trySeekUsingNext=true, since for sparse key spaces it is
 	// likely that the next key will also be contained in the current file.
-	if n := l.split(l.iterFile.Largest.UserKey); l.cmp(prefix, l.iterFile.Largest.UserKey[:n]) < 0 {
+	if n := l.split(l.iterFile.LargestPointKey.UserKey); l.cmp(prefix, l.iterFile.LargestPointKey.UserKey[:n]) < 0 {
 		return nil, nil
 	}
 	return l.verify(l.skipEmptyFileForward())
@@ -648,8 +649,8 @@ func (l *levelIter) skipEmptyFileForward() (*InternalKey, []byte) {
 				return nil, nil
 			}
 			// If the boundary is a range deletion tombstone, return that key.
-			if l.iterFile.Largest.Kind() == InternalKeyKindRangeDelete {
-				l.largestBoundary = &l.iterFile.Largest
+			if l.iterFile.LargestPointKey.Kind() == InternalKeyKindRangeDelete {
+				l.largestBoundary = &l.iterFile.LargestPointKey
 				return l.largestBoundary, nil
 			}
 		}
@@ -709,8 +710,8 @@ func (l *levelIter) skipEmptyFileBackward() (*InternalKey, []byte) {
 				return nil, nil
 			}
 			// If the boundary is a range deletion tombstone, return that key.
-			if l.iterFile.Smallest.Kind() == InternalKeyKindRangeDelete {
-				l.smallestBoundary = &l.iterFile.Smallest
+			if l.iterFile.SmallestPointKey.Kind() == InternalKeyKindRangeDelete {
+				l.smallestBoundary = &l.iterFile.SmallestPointKey
 				return l.smallestBoundary, nil
 			}
 		}
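initTableBounds above applies the same idea to per-table iteration bounds: a global bound can be dropped for a table whose point keys cannot violate it, saving a comparison on every key inside the sstable iterator. An illustrative, self-contained rendering of that clamping (field and function names are assumptions, not Pebble's API):

```go
package main

import (
	"bytes"
	"fmt"
)

// tableOpts stands in for the per-table IterOptions that the level iterator
// computes from the file's point-key bounds.
type tableOpts struct {
	lower, upper []byte
}

// clampBounds returns ok=false when the table's point keys lie entirely
// outside the requested bounds, and otherwise keeps a bound only if it can
// still exclude keys inside this table.
func clampBounds(cmp func(a, b []byte) int, lower, upper, smallestPoint, largestPoint []byte) (o tableOpts, ok bool) {
	if lower != nil {
		if cmp(largestPoint, lower) < 0 {
			return o, false // whole table below the lower bound
		}
		if cmp(lower, smallestPoint) > 0 {
			o.lower = lower // the bound still bites inside this table
		}
	}
	if upper != nil {
		if cmp(smallestPoint, upper) >= 0 {
			return o, false // whole table at or above the exclusive upper bound
		}
		if cmp(upper, largestPoint) <= 0 {
			o.upper = upper
		}
	}
	return o, true
}

func main() {
	o, ok := clampBounds(bytes.Compare, []byte("a"), []byte("z"), []byte("d"), []byte("f"))
	fmt.Println(ok, o.lower == nil, o.upper == nil) // true true true: neither bound can bite
}
```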
diff --git a/open.go b/open.go
index bf6ce373d1..31d427e428 100644
--- a/open.go
+++ b/open.go
@@ -430,7 +430,7 @@ func Open(dirname string, opts *Options) (db *DB, _ error) {
 		d.mu.log.LogWriter.SetMinSyncInterval(d.opts.WALMinSyncInterval)
 		d.mu.versions.metrics.WAL.Files++
 	}
-	d.updateReadStateLocked(d.opts.DebugCheck, nil)
+	d.updateReadStateLocked(d.opts.DebugCheck)

 	if !d.opts.ReadOnly {
 		// Write the current options to disk.
@@ -712,12 +712,6 @@ func (d *DB) replayWAL(
 		if err != nil {
 			return 0, err
 		}
-		// TODO(jackson): Remove the below call to applyFlushedRangeKeys once
-		// flushes actually persist range keys to sstables.
-		err = d.applyFlushedRangeKeys(toFlush)
-		if err != nil {
-			return 0, err
-		}
 		ve.NewFiles = append(ve.NewFiles, newVE.NewFiles...)
 		for i := range toFlush {
 			toFlush[i].readerUnref()
diff --git a/range_keys.go b/range_keys.go
index 2c6d889930..dfe5b57e5d 100644
--- a/range_keys.go
+++ b/range_keys.go
@@ -7,10 +7,9 @@ package pebble
 import (
 	"sync"

-	"github.com/cockroachdb/errors"
 	"github.com/cockroachdb/pebble/internal/arenaskl"
-	"github.com/cockroachdb/pebble/internal/base"
 	"github.com/cockroachdb/pebble/internal/keyspan"
+	"github.com/cockroachdb/pebble/internal/manifest"
 	"github.com/cockroachdb/pebble/internal/rangekey"
 )
@@ -19,6 +18,9 @@ const rangeKeyArenaSize = 1 << 20
 // RangeKeysArena is an in-memory arena in which range keys are stored.
 //
 // This is a temporary type that will eventually be removed.
+//
+// TODO(bilal): This type should mostly be unused now. Clean up the last few
+// uses and remove it.
 type RangeKeysArena struct {
 	once      sync.Once
 	skl       arenaskl.Skiplist
@@ -26,61 +28,6 @@ type RangeKeysArena struct {
 	fragCache keySpanCache
 }

-// applyFlushedRangeKeys is a temporary hack to support in-memory only range
-// keys. We use much of the same code that we will use for the memtable, but we
-// use a separate arena that exists beyond the lifetime of any individual
-// memtable. For as long as we're relying on this hack, a single *pebble.DB may
-// only store as many range keys as fit in this arena.
-//
-// TODO(jackson): Remove applyFlushedRangeKeys when range keys are persisted.
-func (d *DB) applyFlushedRangeKeys(flushable []*flushableEntry) error {
-	var added uint32
-	for i := 0; i < len(flushable); i++ {
-		iter := flushable[i].newRangeKeyIter(nil)
-		if iter == nil {
-			continue
-		}
-		d.maybeInitializeRangeKeys()
-
-		for s := iter.First(); s != nil; s = iter.Next() {
-			// flushable.newRangeKeyIter provides a FragmentIterator, which
-			// iterates over parsed keyspan.Spans.
-			//
-			// While we're faking a flush, we just want to write the original
-			// key into the global arena, so we need to re-encode the span's
-			// keys into internal key-value pairs.
-			//
-			// This awkward recombination will be removed when flushes implement
-			// range-key logic, coalescing range keys and constructing internal
-			// keys.
-			err := rangekey.Encode(s, func(k base.InternalKey, v []byte) error {
-				err := d.rangeKeys.skl.Add(k, v)
-				switch {
-				case err == nil:
-					added++
-					return nil
-				case errors.Is(err, arenaskl.ErrRecordExists):
-					// It's possible that we'll try to add a key to the arena twice
-					// during metamorphic tests that reset the synced state. Ignore.
-					// When range keys are actually flushed to stable storage, this
-					// will go away.
-					return nil
-				default:
-					// err != nil
-					return err
-				}
-			})
-			if err != nil {
-				return err
-			}
-		}
-	}
-	if added > 0 {
-		d.rangeKeys.fragCache.invalidate(added)
-	}
-	return nil
-}
-
 func (d *DB) maybeInitializeRangeKeys() {
 	// Lazily construct the global range key arena, so that tests that
 	// don't use range keys don't need to allocate this long-lived
@@ -133,12 +80,36 @@ func (d *DB) newRangeKeyIter(
 		}
 	}

-	// For now while range keys are not fully integrated into Pebble, all range
-	// keys ever written to the DB are persisted in the d.rangeKeys arena.
-	frags := d.rangeKeys.fragCache.get()
-	if len(frags) > 0 {
-		it.rangeKey.iterConfig.AddLevel(keyspan.NewIter(d.cmp, frags))
+	current := readState.current
+	// TODO(bilal): Roll the LevelIter allocation into it.rangeKey.iterConfig.
+	levelIters := make([]keyspan.LevelIter, 0)
+	// Next are the file levels: L0 sub-levels followed by lower levels.
+	addLevelIterForFiles := func(files manifest.LevelIterator, level manifest.Level) {
+		rangeIter := files.Filter(manifest.KeyTypeRange)
+		if rangeIter.First() == nil {
+			// No files with range keys.
+			return
+		}
+		levelIters = append(levelIters, keyspan.LevelIter{})
+		li := &levelIters[len(levelIters)-1]
+		spanIterOpts := keyspan.SpanIterOptions{RangeKeyFilters: it.opts.RangeKeyFilters}
+
+		li.Init(spanIterOpts, it.cmp, d.tableNewRangeKeyIter, files, level, d.opts.Logger, manifest.KeyTypeRange)
+		it.rangeKey.iterConfig.AddLevel(li)
 	}
+	// Add level iterators for the L0 sublevels, iterating from newest to
+	// oldest.
+	for i := len(current.L0SublevelFiles) - 1; i >= 0; i-- {
+		addLevelIterForFiles(current.L0SublevelFiles[i].Iter(), manifest.L0Sublevel(i))
+	}
+
+	// Add level iterators for the non-empty non-L0 levels.
+	for level := 1; level < len(current.Levels); level++ {
+		if current.Levels[level].Empty() {
+			continue
+		}
+		addLevelIterForFiles(current.Levels[level].Iter(), manifest.Level(level))
+	}
 	return it.rangeKey.rangeKeyIter
 }
diff --git a/read_state.go b/read_state.go
index 6364bc6b93..67d5a7ea5c 100644
--- a/read_state.go
+++ b/read_state.go
@@ -80,7 +80,7 @@ func (d *DB) loadReadState() *readState {
 // list of memtables. Requires DB.mu is held. If checker is not nil, it is
 // called after installing the new readState. If atomicFunc is not nil, it is
 // executed atomically with the transition to the new read state.
-func (d *DB) updateReadStateLocked(checker func(*DB) error, atomicFunc func()) {
+func (d *DB) updateReadStateLocked(checker func(*DB) error) {
 	s := &readState{
 		db:     d,
 		refcnt: 1,
@@ -95,11 +95,6 @@ func (d *DB) updateReadStateLocked(checker func(*DB) error, atomicFunc func()) {
 	d.readState.Lock()
 	old := d.readState.val
 	d.readState.val = s
-	// TODO(jackson): Remove atomicFunc. It's temporary to allow us to copy
-	// range keys from flushed memtables into the global arena atomically.
-	if atomicFunc != nil {
-		atomicFunc()
-	}
 	d.readState.Unlock()
 	if checker != nil {
 		if err := checker(d); err != nil {
diff --git a/read_state_test.go b/read_state_test.go
index 899677de1c..4f9d5f57ab 100644
--- a/read_state_test.go
+++ b/read_state_test.go
@@ -29,7 +29,7 @@ func BenchmarkReadState(b *testing.B) {
 		for pb.Next() {
 			if rng.Float32() < updateFrac {
 				d.mu.Lock()
-				d.updateReadStateLocked(nil, nil)
+				d.updateReadStateLocked(nil)
 				d.mu.Unlock()
 			} else {
 				s := d.loadReadState()
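newRangeKeyIter above now builds one keyspan.LevelIter per L0 sub-level (newest first) and per non-empty lower level, skipping levels with no range-key files. A self-contained sketch of that assembly order, using simplified stand-in types:

```go
package main

import "fmt"

// level is a simplified stand-in for a manifest level: just the names of the
// files in it that contain range keys.
type level struct {
	name          string
	rangeKeyFiles []string
}

// buildRangeKeyLevels mirrors the shape of newRangeKeyIter: newest data first
// (L0 sub-levels from highest index down), then the lower levels, adding an
// iterator only for levels that actually contain range keys.
func buildRangeKeyLevels(l0Sublevels []level, lower []level) []string {
	var iters []string
	for i := len(l0Sublevels) - 1; i >= 0; i-- {
		if len(l0Sublevels[i].rangeKeyFiles) == 0 {
			continue
		}
		iters = append(iters, "LevelIter("+l0Sublevels[i].name+")")
	}
	for _, lvl := range lower {
		if len(lvl.rangeKeyFiles) == 0 {
			continue
		}
		iters = append(iters, "LevelIter("+lvl.name+")")
	}
	return iters
}

func main() {
	l0 := []level{{"L0.0", []string{"000007.sst"}}, {"L0.1", nil}, {"L0.2", []string{"000012.sst"}}}
	lower := []level{{"L1", nil}, {"L2", []string{"000004.sst"}}}
	fmt.Println(buildRangeKeyLevels(l0, lower))
	// [LevelIter(L0.2) LevelIter(L0.0) LevelIter(L2)]
}
```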
diff --git a/table_cache.go b/table_cache.go
index 0f4928ad13..c5f53e41b8 100644
--- a/table_cache.go
+++ b/table_cache.go
@@ -436,27 +436,14 @@ func (c *tableCacheShard) newRangeKeyIter(
 	}

 	var iter sstable.FragmentIterator
-	// TODO(bilal): We are currently passing through the raw blockIter for range
-	// keys. This iter does not support bounds (eg. SetBounds will panic).
-	// Any future users of the iter returned by this function need to make any
-	// bounds-specific optimizations themselves.
 	iter, err = v.reader.NewRawRangeKeyIter()
+	// iter is a block iter that holds the entire value of the block in memory.
+	// No need to hold onto a ref of the cache value.
+	c.unrefValue(v)
+
 	if err != nil || iter == nil {
-		c.unrefValue(v)
 		return nil, err
 	}
-	// NB: v.closeHook takes responsibility for calling unrefValue(v) here.
-	iter.SetCloseHook(func(i keyspan.FragmentIterator) error {
-		return v.closeHook(i)
-	})
-
-	atomic.AddInt32(&c.atomic.iterCount, 1)
-	atomic.AddInt32(dbOpts.atomic.iterCount, 1)
-	if invariants.RaceEnabled {
-		c.mu.Lock()
-		c.mu.iters[iter] = debug.Stack()
-		c.mu.Unlock()
-	}
 	return iter, nil
 }
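Finally, the table-cache change above drops the cache reference as soon as the raw range-key iterator is built, because that iterator keeps the whole fragment block in memory, so no close hook or open-iterator accounting is needed. A sketch of that lifecycle with illustrative stand-in types, not Pebble's table cache API:

```go
package main

import "fmt"

type cacheValue struct{ refs int }

func (v *cacheValue) unref() { v.refs-- }

type blockIter struct{ data []byte }

// newRangeKeyIter conceptually materializes the range-key block and releases
// the cache reference immediately; the returned iterator is self-contained
// and needs no close hook to drop the reference later.
func newRangeKeyIter(v *cacheValue, block []byte) *blockIter {
	it := &blockIter{data: append([]byte(nil), block...)}
	v.unref() // safe: the iterator no longer depends on the cached value
	return it
}

func main() {
	v := &cacheValue{refs: 1}
	it := newRangeKeyIter(v, []byte("range-key block"))
	fmt.Println(v.refs, len(it.data)) // 0 15
}
```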