*: start reading range keys from sstables
The previous commit, which enabled flushing and compaction of range keys,
made the in-memory range key arena obsolete for reading range keys. This
change sets up iterators to read range keys from sstable range key blocks,
and makes many (far too many) minor fixes in other range key code to make
that happen correctly.
itsbilal committed Jun 10, 2022
1 parent d72083d commit ae99f4f
Showing 18 changed files with 280 additions and 202 deletions.
44 changes: 36 additions & 8 deletions compaction.go
@@ -1009,7 +1009,7 @@ func (c *compaction) elideRangeKey(start, end []byte) bool {

// newInputIter returns an iterator over all the input tables in a compaction.
func (c *compaction) newInputIter(
newIters tableNewIters, newSpanIter keyspan.TableNewSpanIter, snapshots []uint64,
newIters tableNewIters, newRangeKeyIter keyspan.TableNewSpanIter, snapshots []uint64,
) (_ internalIterator, retErr error) {
var rangeDelIters []keyspan.FragmentIterator
var rangeKeyIters []keyspan.FragmentIterator
@@ -1198,7 +1198,28 @@ func (c *compaction) newInputIter(
}
if hasRangeKeys {
li := &keyspan.LevelIter{}
li.Init(keyspan.SpanIterOptions{}, c.cmp, newSpanIter, level.files.Iter(), l, c.logger, manifest.KeyTypeRange)
newRangeKeyIterWrapper := func(file *manifest.FileMetadata, iterOptions *keyspan.SpanIterOptions) (keyspan.FragmentIterator, error) {
iter, err := newRangeKeyIter(file, iterOptions)
if iter != nil {
// Ensure that the range key iter is not closed until the compaction is
// finished. This is necessary because range key processing
// requires the range keys to be held in memory for up to the
// lifetime of the compaction.
c.closers = append(c.closers, iter)
iter = noCloseIter{iter}

// We do not need to truncate range keys to sstable boundaries, or
// only read within the file's atomic compaction units, unlike with
// range tombstones. This is because range keys were added after we
// stopped splitting user keys across sstables, so all the range keys
// in this sstable must wholly lie within the file's bounds.
}
if iter == nil {
iter = emptyKeyspanIter
}
return iter, err
}
li.Init(keyspan.SpanIterOptions{}, c.cmp, newRangeKeyIterWrapper, level.files.Iter(), l, c.logger, manifest.KeyTypeRange)
rangeKeyIters = append(rangeKeyIters, li)
}
return nil
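The wrapper above keeps each table's range-key iterator open for the lifetime of the compaction (it is closed only via c.closers at teardown) and substitutes an empty iterator when a table yields none. Below is a minimal sketch of that pattern using stand-in types rather than Pebble's actual keyspan interfaces; spanIter, noCloseSpanIter, emptySpanIter, and compactionState here are hypothetical.

package main

import "fmt"

// spanIter stands in for a fragment iterator that must eventually be closed.
type spanIter interface {
	Next() (string, bool)
	Close() error
}

// noCloseSpanIter wraps a spanIter and turns Close into a no-op, so callers
// further up the iterator stack cannot close the underlying iterator early.
type noCloseSpanIter struct{ spanIter }

func (noCloseSpanIter) Close() error { return nil }

// emptySpanIter is handed out when a table has no spans to expose.
type emptySpanIter struct{}

func (emptySpanIter) Next() (string, bool) { return "", false }
func (emptySpanIter) Close() error         { return nil }

// compactionState collects the real closers so they are closed exactly once,
// when the whole compaction finishes (mirroring c.closers in the hunk above).
type compactionState struct{ closers []spanIter }

func (c *compactionState) wrapNewIter(newIter func() (spanIter, error)) (spanIter, error) {
	iter, err := newIter()
	if iter != nil {
		c.closers = append(c.closers, iter) // closed later, at compaction teardown
		iter = noCloseSpanIter{iter}
	}
	if iter == nil {
		iter = emptySpanIter{}
	}
	return iter, err
}

func (c *compactionState) finish() {
	for _, cl := range c.closers {
		_ = cl.Close() // the only place the real iterators are closed
	}
}

func main() {
	c := &compactionState{}
	it, _ := c.wrapNewIter(func() (spanIter, error) { return emptySpanIter{}, nil })
	_ = it.Close() // no-op: the wrapped iterator stays open
	c.finish()
	fmt.Printf("real iterators closed at teardown: %d\n", len(c.closers))
}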
@@ -1645,11 +1666,7 @@ func (d *DB) flush1() (bytesFlushed uint64, err error) {
if err == nil {
flushed = d.mu.mem.queue[:n]
d.mu.mem.queue = d.mu.mem.queue[n:]
d.updateReadStateLocked(d.opts.DebugCheck, func() {
// TODO(jackson): Remove this, plus this updateReadStateLocked
// parameter when range keys are persisted to sstables.
err = d.applyFlushedRangeKeys(flushed)
})
d.updateReadStateLocked(d.opts.DebugCheck)
d.updateTableStatsLocked(ve.NewFiles)
}
// Signal FlushEnd after installing the new readState. This helps for unit
@@ -1974,6 +1991,11 @@ func checkDeleteCompactionHints(
if m.Compacting || !h.canDelete(cmp, m, snapshots) || files[m] {
continue
}
if m.HasRangeKeys {
// TODO(bilal): Remove this conditional when deletion hints work well
// with sstables containing range keys.
continue
}

if files == nil {
// Construct files lazily, assuming most calls will not
@@ -2080,7 +2102,7 @@ func (d *DB) compact1(c *compaction, errChannel chan error) (err error) {
// there are no references obsolete tables will be added to the obsolete
// table list.
if err == nil {
d.updateReadStateLocked(d.opts.DebugCheck, nil)
d.updateReadStateLocked(d.opts.DebugCheck)
d.updateTableStatsLocked(ve.NewFiles)
}
d.deleteObsoleteFiles(jobID, true /* waitForOngoing */)
@@ -2329,6 +2351,12 @@ func (d *DB) runCompaction(
if len(iter.tombstones) > 0 {
startKey = iter.tombstones[0].Start
}
if startKey == nil {
startKey = c.rangeKeyFrag.Start()
if len(iter.rangeKeys) > 0 {
startKey = iter.rangeKeys[0].Start
}
}
if splitKey != nil && d.cmp(startKey, splitKey) == 0 {
return nil
}
4 changes: 4 additions & 0 deletions compaction_iter.go
@@ -506,6 +506,10 @@ func (i *compactionIter) nextInStripe() stripeChangeType {
return sameStripeNonSkippable
}
return newStripe
case InternalKeyKindRangeKeySet, InternalKeyKindRangeKeyUnset, InternalKeyKindRangeKeyDelete:
// Range keys are interleaved at the max sequence number for a given user
// key, so we should not see any more range keys in this stripe.
panic("unreachable")
case InternalKeyKindInvalid:
if i.curSnapshotIdx == origSnapshotIdx {
return sameStripeNonSkippable
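The new panic case above relies on internal key ordering: entries for the same user key sort by descending sequence number, and range keys are interleaved at the maximum sequence number, so a range key is always surfaced before any point key with that user key and can never be reached while stepping within a stripe. A simplified illustration of that ordering follows; the key and kind types are hypothetical stand-ins, not Pebble's InternalKey.

package main

import (
	"fmt"
	"math"
	"sort"
)

type kind uint8

const (
	kindSet kind = iota
	kindRangeKeySet
)

// key is a simplified stand-in for an internal key: user key plus sequence number.
type key struct {
	userKey string
	seqNum  uint64
	kind    kind
}

// less orders keys the way an LSM iterator surfaces them: ascending user key,
// then descending sequence number (newer entries first).
func less(a, b key) bool {
	if a.userKey != b.userKey {
		return a.userKey < b.userKey
	}
	return a.seqNum > b.seqNum
}

func main() {
	keys := []key{
		{"b", 5, kindSet},
		{"b", 2, kindSet},
		// A range key starting at "b", interleaved at the maximum sequence
		// number, as described in the comment above.
		{"b", math.MaxUint64, kindRangeKeySet},
		{"a", 7, kindSet},
	}
	sort.Slice(keys, func(i, j int) bool { return less(keys[i], keys[j]) })
	for _, k := range keys {
		fmt.Printf("%s seq=%d kind=%d\n", k.userKey, k.seqNum, k.kind)
	}
	// Output order: a@7, then b@MaxUint64 (the range key), then b@5, b@2.
	// A loop that steps through further entries for user key "b" therefore
	// begins after the range key and never lands on another one.
}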
4 changes: 2 additions & 2 deletions data_test.go
@@ -756,7 +756,7 @@ func runDBDefineCmd(td *datadriven.TestData, opts *Options) (*DB, error) {
entry := d.newFlushableEntry(d.mu.mem.mutable, 0, 0)
entry.readerRefs++
d.mu.mem.queue = append(d.mu.mem.queue, entry)
d.updateReadStateLocked(nil, nil)
d.updateReadStateLocked(nil)
}
mem = d.mu.mem.mutable
start, end = nil, nil
Expand Down Expand Up @@ -845,7 +845,7 @@ func runDBDefineCmd(td *datadriven.TestData, opts *Options) (*DB, error) {
}); err != nil {
return nil, err
}
d.updateReadStateLocked(nil, nil)
d.updateReadStateLocked(nil)
d.updateTableStatsLocked(ve.NewFiles)
}

2 changes: 1 addition & 1 deletion db.go
@@ -1983,7 +1983,7 @@ func (d *DB) makeRoomForWrite(b *Batch) error {
var entry *flushableEntry
d.mu.mem.mutable, entry = d.newMemTable(newLogNum, logSeqNum)
d.mu.mem.queue = append(d.mu.mem.queue, entry)
d.updateReadStateLocked(nil, nil)
d.updateReadStateLocked(nil)
if immMem.writerUnref() {
d.maybeScheduleFlush()
}
2 changes: 1 addition & 1 deletion flush_external.go
@@ -85,7 +85,7 @@ func flushExternalTable(untypedDB interface{}, path string, originalMeta *fileMe
}
return err
}
d.updateReadStateLocked(d.opts.DebugCheck, nil)
d.updateReadStateLocked(d.opts.DebugCheck)
d.updateTableStatsLocked(ve.NewFiles)
d.deleteObsoleteFiles(jobID, true /* waitForOngoing */)
d.maybeScheduleCompaction()
2 changes: 1 addition & 1 deletion ingest.go
@@ -849,7 +849,7 @@ func (d *DB) ingestApply(
}); err != nil {
return nil, err
}
d.updateReadStateLocked(d.opts.DebugCheck, nil)
d.updateReadStateLocked(d.opts.DebugCheck)
d.updateTableStatsLocked(ve.NewFiles)
d.deleteObsoleteFiles(jobID, false /* waitForOngoing */)
// The ingestion may have pushed a level over the threshold for compaction,
132 changes: 81 additions & 51 deletions internal/keyspan/level_iter.go
@@ -178,7 +178,7 @@ func (l *LevelIter) loadFile(file *manifest.FileMetadata, dir int) loadFileRetur
return noFileLoaded
}
if indicator != fileAlreadyLoaded {
l.iter, l.err = l.newIter(l.files.Current(), &l.tableOpts)
l.iter, l.err = l.newIter(file, &l.tableOpts)
indicator = newFileLoaded
}
if l.err != nil {
@@ -190,33 +190,42 @@
// SeekGE implements keyspan.FragmentIterator.
func (l *LevelIter) SeekGE(key []byte) *Span {
l.dir = +1
l.straddle = Span{}
l.straddleDir = 0
l.err = nil // clear cached iteration error

f := l.findFileGE(key)
if f != nil && l.keyType == manifest.KeyTypeRange && l.cmp(key, f.SmallestRangeKey.UserKey) < 0 {
// Return a straddling key instead of loading the file.
l.iterFile = f
l.iter = nil
l.straddleDir = +1
// The synthetic span that we are creating starts at the seeked key. This
// is an optimization as it prevents us from loading the adjacent file's
// bounds, at the expense of this iterator appearing "inconsistent" to its
// callers i.e.:
//
// SeekGE(bb) -> {bb-c, empty}
// Next() -> {c-d, RANGEKEYSET}
// Prev() -> {a-c, empty}
//
// Seeing as the inconsistency will only be around empty spans, which are
// expected to be elided by one of the higher-level iterators (either
// top-level Iterator or the defragmenting iter), the entire iterator should
// still appear consistent to the user.
l.straddle = Span{
Start: key,
End: f.SmallestRangeKey.UserKey,
Keys: nil,
prevFile := l.files.Prev()
if prevFile != nil {
// We could unconditionally return an empty span between the seek key and
// f.SmallestRangeKey, however if this span is to the left of all range
// keys on this level, it could lead to inconsistent behaviour in relative
// positioning operations. Consider this example, with a b-c range key:
//
// SeekGE(a) -> a-b:{}
// Next() -> b-c{(#5,RANGEKEYSET,@4,foo)}
// Prev() -> nil
//
// Iterators higher up in the iterator stack rely on this sort of relative
// positioning consistency.
//
// TODO(bilal): Investigate ways to be able to return straddle spans in
// cases similar to the above, while still retaining correctness.
l.files.Next()
// Return a straddling key instead of loading the file.
l.iterFile = f
if err := l.Close(); err != nil {
return nil
}
l.straddleDir = +1
l.straddle = Span{
Start: prevFile.LargestRangeKey.UserKey,
End: f.SmallestRangeKey.UserKey,
Keys: nil,
}
return &l.straddle
}
return &l.straddle
}
loadFileIndicator := l.loadFile(f, +1)
if loadFileIndicator == noFileLoaded {
@@ -231,33 +240,42 @@ func (l *LevelIter) SeekGE(key []byte) *Span {
// SeekLT implements keyspan.FragmentIterator.
func (l *LevelIter) SeekLT(key []byte) *Span {
l.dir = -1
l.straddle = Span{}
l.straddleDir = 0
l.err = nil // clear cached iteration error

f := l.findFileLT(key)
if f != nil && l.keyType == manifest.KeyTypeRange && l.cmp(f.LargestRangeKey.UserKey, key) < 0 {
// Return a straddling key instead of loading the file.
l.iterFile = f
l.iter = nil
l.straddleDir = -1
// The synthetic span that we are creating ends at the seeked key. This
// is an optimization as it prevents us from loading the adjacent file's
// bounds, at the expense of this iterator appearing "inconsistent" to its
// callers i.e.:
//
// SeekLT(dd) -> {d-dd, empty}
// Prev() -> {c-d, RANGEKEYSET}
// Next() -> {d-e, empty}
//
// Seeing as the inconsistency will only be around empty spans, which are
// expected to be elided by one of the higher-level iterators (either
// top-level Iterator or the defragmenting iter), the entire iterator should
// still appear consistent to the user.
l.straddle = Span{
Start: f.LargestRangeKey.UserKey,
End: key,
Keys: nil,
nextFile := l.files.Next()
if nextFile != nil {
// We could unconditionally return an empty span between f.LargestRangeKey
// and the seek key, however if this span is to the right of all range keys
// on this level, it could lead to inconsistent behaviour in relative
// positioning operations. Consider this example, with a b-c range key:
//
// SeekLT(d) -> c-d:{}
// Prev() -> b-c{(#5,RANGEKEYSET,@4,foo)}
// Next() -> nil
//
// Iterators higher up in the iterator stack rely on this sort of relative
// positioning consistency.
//
// TODO(bilal): Investigate ways to be able to return straddle spans in
// cases similar to the above, while still retaining correctness.
l.files.Prev()
// Return a straddling key instead of loading the file.
l.iterFile = f
if err := l.Close(); err != nil {
return nil
}
l.straddleDir = -1
l.straddle = Span{
Start: f.LargestRangeKey.UserKey,
End: nextFile.SmallestRangeKey.UserKey,
Keys: nil,
}
return &l.straddle
}
return &l.straddle
}
if l.loadFile(l.findFileLT(key), -1) == noFileLoaded {
return nil
@@ -271,6 +289,8 @@ func (l *LevelIter) SeekLT(key []byte) *Span {
// First implements keyspan.FragmentIterator.
func (l *LevelIter) First() *Span {
l.dir = +1
l.straddle = Span{}
l.straddleDir = 0
l.err = nil // clear cached iteration error

if l.loadFile(l.files.First(), +1) == noFileLoaded {
@@ -285,6 +305,8 @@ func (l *LevelIter) First() *Span {
// Last implements keyspan.FragmentIterator.
func (l *LevelIter) Last() *Span {
l.dir = -1
l.straddle = Span{}
l.straddleDir = 0
l.err = nil // clear cached iteration error

if l.loadFile(l.files.Last(), -1) == noFileLoaded {
@@ -298,10 +320,14 @@ func (l *LevelIter) Last() *Span {

// Next implements keyspan.FragmentIterator.
func (l *LevelIter) Next() *Span {
l.dir = +1
if l.err != nil || (l.iter == nil && l.iterFile == nil) {
if l.err != nil || (l.iter == nil && l.iterFile == nil && l.dir > 0) {
return nil
}
if l.iter == nil && l.iterFile == nil {
// l.dir <= 0
return l.First()
}
l.dir = +1

if l.iter != nil {
if span := l.iter.Next(); span != nil {
@@ -313,10 +339,14 @@

// Prev implements keyspan.FragmentIterator.
func (l *LevelIter) Prev() *Span {
l.dir = -1
if l.err != nil || (l.iter == nil && l.iterFile == nil) {
if l.err != nil || (l.iter == nil && l.iterFile == nil && l.dir < 0) {
return nil
}
if l.iter == nil && l.iterFile == nil {
// l.dir >= 0
return l.Last()
}
l.dir = -1

if l.iter != nil {
if span := l.iter.Prev(); span != nil {
@@ -355,7 +385,7 @@ func (l *LevelIter) skipEmptyFileForward() *Span {
Start: startKey,
End: endKey,
}
l.straddleDir = l.dir
l.straddleDir = +1
return &l.straddle
}
} else if l.straddleDir < 0 {
@@ -416,7 +446,7 @@ func (l *LevelIter) skipEmptyFileBackward() *Span {
Start: startKey,
End: endKey,
}
l.straddleDir = l.dir
l.straddleDir = -1
return &l.straddle
}
} else if l.straddleDir > 0 {
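The reworked SeekGE and SeekLT above anchor the synthetic empty "straddle" span to the gap between two adjacent files' range-key bounds, rather than starting it at the seek key, so that Next and Prev stay consistent with what a sequential scan of the level would return. Below is a small sketch of that idea over plain string bounds; fileBounds, span, and seekGE are simplified hypothetical stand-ins, not the real keyspan.LevelIter.

package main

import (
	"fmt"
	"sort"
)

// fileBounds stands in for one table's range-key bounds within a level.
type fileBounds struct {
	smallest, largest string
}

// span stands in for keyspan.Span; empty marks a synthetic straddle span
// covering a gap between files.
type span struct {
	start, end string
	empty      bool
}

// seekGE either "loads" the first file whose bounds reach key or, when key
// falls in the gap between two files, returns a straddle span anchored to the
// neighboring files' bounds instead of to the seek key. Anchoring to file
// bounds keeps SeekGE, Next, and Prev mutually consistent: the gap span
// returned here is the same span a forward or backward scan would produce.
func seekGE(files []fileBounds, key string) span {
	i := sort.Search(len(files), func(i int) bool { return files[i].largest >= key })
	if i == len(files) {
		return span{} // past the last file; nothing to return
	}
	f := files[i]
	if key >= f.smallest || i == 0 {
		// The key is inside f's bounds (or before the first file): load f.
		return span{start: f.smallest, end: f.largest}
	}
	// key lies strictly between files[i-1] and files[i]: return the whole gap.
	return span{start: files[i-1].largest, end: f.smallest, empty: true}
}

func main() {
	files := []fileBounds{{"a", "c"}, {"f", "h"}}
	fmt.Println(seekGE(files, "d")) // gap span {c f true}, not {d f true}
	fmt.Println(seekGE(files, "g")) // inside the second file: {f h false}
}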
2 changes: 1 addition & 1 deletion internal/keyspan/merging_iter.go
@@ -811,7 +811,7 @@ func (m *MergingIter) synthesizeKeys(dir int8) (bool, *Span) {
}
sort.Sort(&m.keys)

// Apply the configured transform. See VisibleTransform.
// Apply the configured transform. See visibleTransform.
s := Span{
Start: m.start,
End: m.end,
(Diffs for the remaining 10 changed files are not shown.)
