*: Wire up L0SubLevels with iterator and version creation
This change effectively lowers read amplification in L0 by using the
sublevel information in Version.L0SubLevels to create one levelIter per
sublevel, instead of directly adding one iterator per L0 file to the
mergingIter. getIter (in get_iter.go) is updated to take advantage of
the same information.
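A rough sketch of the shape of this change (simplified, hypothetical
types; buildL0Iters and newLevelIter are illustrative stand-ins, not
Pebble's actual API):

// Sketch only: build one level iterator per non-empty L0 sublevel,
// newest first, instead of one iterator per L0 file. Files within a
// sublevel are non-overlapping, which is what makes a levelIter valid.
package sketch

type fileMetadata struct{}

type iterator interface{}

func buildL0Iters(
    sublevels [][]*fileMetadata, // ordered oldest to newest
    newLevelIter func(files []*fileMetadata) iterator,
) []iterator {
    iters := make([]iterator, 0, len(sublevels))
    // L0 must be merged newest to oldest, so walk the sublevels backwards.
    for i := len(sublevels) - 1; i >= 0; i-- {
        if len(sublevels[i]) == 0 {
            continue
        }
        iters = append(iters, newLevelIter(sublevels[i]))
    }
    return iters
}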

As a result of this change, the Version struct now holds a reference
to an L0SubLevels instance, initialized when the Version is constructed
in version_edit.
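In sketch form (field names simplified; not the verbatim Pebble
definitions):

// Sketch of the resulting shapes. L0SubLevels.Files[i] holds the files
// of sublevel i; the version caches the decomposition of its Files[0].
package sketch

type fileMetadata struct{}

const numLevels = 7

type L0SubLevels struct {
    Files [][]*fileMetadata
}

type version struct {
    Files [numLevels][]*fileMetadata
    // Populated when the version is constructed (see InitL0Sublevels
    // and NewL0SubLevels in the diffs below).
    L0SubLevels *L0SubLevels
}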

Most of the test changes are a result of always printing the sublevel
alongside the level for L0.
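For instance, where L0 previously printed as "0:", test output now uses
per-sublevel headings such as "0.0:" and "0.1:". A hypothetical helper
showing the labeling convention (not code from this commit):

package sketch

import "fmt"

// levelLabel renders the level heading used in test output: L0 levels
// include their sublevel ("0.1"); other levels are unchanged ("6").
func levelLabel(level, sublevel int) string {
    if level == 0 {
        return fmt.Sprintf("%d.%d", level, sublevel)
    }
    return fmt.Sprintf("%d", level)
}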
itsbilal committed Apr 21, 2020
1 parent a128988 commit 3ca6ed7
Showing 29 changed files with 552 additions and 300 deletions.
4 changes: 2 additions & 2 deletions compaction.go
@@ -650,10 +650,10 @@ func (c *compaction) newInputIter(newIters tableNewIters) (_ internalIterator, r

// Check that the LSM ordering invariants are ok in order to prevent
// generating corrupted sstables due to a violation of those invariants.
if err := manifest.CheckOrdering(c.cmp, c.format, c.startLevel, c.inputs[0]); err != nil {
if err := manifest.CheckOrdering(c.cmp, c.format, c.startLevel, manifest.InvalidSublevel, c.inputs[0]); err != nil {
c.logger.Fatalf("%s", err)
}
if err := manifest.CheckOrdering(c.cmp, c.format, c.outputLevel, c.inputs[1]); err != nil {
if err := manifest.CheckOrdering(c.cmp, c.format, c.outputLevel, manifest.InvalidSublevel, c.inputs[1]); err != nil {
c.logger.Fatalf("%s", err)
}

56 changes: 24 additions & 32 deletions db.go
@@ -17,6 +17,7 @@ import (
"github.com/cockroachdb/pebble/internal/arenaskl"
"github.com/cockroachdb/pebble/internal/base"
"github.com/cockroachdb/pebble/internal/invariants"
"github.com/cockroachdb/pebble/internal/manifest"
"github.com/cockroachdb/pebble/internal/manual"
"github.com/cockroachdb/pebble/internal/record"
"github.com/cockroachdb/pebble/vfs"
@@ -365,7 +366,7 @@ func (d *DB) getInternal(key []byte, b *Batch, s *Snapshot) ([]byte, io.Closer,
get.key = key
get.batch = b
get.mem = readState.memtables
get.l0 = readState.current.Files[0]
get.l0 = readState.current.L0SubLevels.Files
get.version = readState.current

// Strip off memtables which cannot possibly contain the seqNum being read
@@ -615,7 +616,7 @@ type iterAlloc struct {
dbi Iterator
merging mergingIter
mlevels [3 + numLevels]mergingIterLevel
levels [numLevels]levelIter
levels [3 + numLevels]levelIter
}

var iterAllocPool = sync.Pool{
@@ -687,34 +688,16 @@ func (d *DB) newIterInternal(
})
}

// The level 0 files need to be added from newest to oldest.
//
// Note that level 0 files do not contain untruncated tombstones, even in the presence of
// L0=>L0 compactions since such compactions output a single file. Therefore, we do not
// need to wrap level 0 files individually in level iterators.
current := readState.current
for i := len(current.Files[0]) - 1; i >= 0; i-- {
f := current.Files[0][i]
iter, rangeDelIter, err := d.newIters(f, &dbi.opts, nil)
if err != nil {
// Ensure the mergingIter is initialized so Iterator.Close will properly
// close any sstable iterators that have been opened.
buf.merging.init(&dbi.opts, d.cmp, mlevels...)
_ = dbi.Close()
// Return a new alloced Iterator structure, because dbi.Close will
// return dbi to a sync.Pool.
return &Iterator{err: err}
}
mlevels = append(mlevels, mergingIterLevel{
iter: iter,
rangeDelIter: rangeDelIter,
})
}

// Determine the final size for mlevels so that we can avoid any more
// reallocations. This is important because each levelIter will hold a
// reference to elements in mlevels.
start := len(mlevels)
current := readState.current
for sl := 0; sl < len(current.L0SubLevels.Files); sl++ {
if len(current.L0SubLevels.Files[sl]) > 0 {
mlevels = append(mlevels, mergingIterLevel{})
}
}
for level := 1; level < len(current.Files); level++ {
if len(current.Files[level]) == 0 {
continue
@@ -724,13 +707,11 @@ func (d *DB) newIterInternal(
finalMLevels := mlevels
mlevels = mlevels[start:]

// Add level iterators for the remaining files.
levels := buf.levels[:]
for level := 1; level < len(current.Files); level++ {
if len(current.Files[level]) == 0 {
continue
addLevelIterForFiles := func(files []*manifest.FileMetadata, level int) {
if len(files) == 0 {
return
}

var li *levelIter
if len(levels) > 0 {
li = &levels[0]
@@ -739,14 +720,25 @@ func (d *DB) newIterInternal(
li = &levelIter{}
}

li.init(dbi.opts, d.cmp, d.newIters, current.Files[level], level, nil)
li.init(dbi.opts, d.cmp, d.newIters, files, level, nil)
li.initRangeDel(&mlevels[0].rangeDelIter)
li.initSmallestLargestUserKey(&mlevels[0].smallestUserKey, &mlevels[0].largestUserKey,
&mlevels[0].isLargestUserKeyRangeDelSentinel)
mlevels[0].iter = li
mlevels = mlevels[1:]
}

// Add level iterators for the L0 sublevels, iterating from newest to
// oldest.
for i := len(current.L0SubLevels.Files) - 1; i >= 0; i-- {
addLevelIterForFiles(current.L0SubLevels.Files[i], 0)
}

// Add level iterators for the non-empty non-L0 levels.
for level := 1; level < len(current.Files); level++ {
addLevelIterForFiles(current.Files[level], level)
}

buf.merging.init(&dbi.opts, d.cmp, finalMLevels...)
buf.merging.snapshot = seqNum
buf.merging.elideRangeTombstones = true
4 changes: 2 additions & 2 deletions db_test.go
@@ -413,13 +413,13 @@ func TestLargeBatch(t *testing.T) {

// Verify this results in one L0 table being created.
require.NoError(t, try(100*time.Microsecond, 20*time.Second,
verifyLSM("0:\n 000005:[a-a]\n")))
verifyLSM("0.0:\n 000005:[a-a]\n")))

require.NoError(t, d.Set([]byte("b"), bytes.Repeat([]byte("b"), 512), nil))

// Verify this results in a second L0 table being created.
require.NoError(t, try(100*time.Microsecond, 20*time.Second,
verifyLSM("0:\n 000005:[a-a]\n 000007:[b-b]\n")))
verifyLSM("0.0:\n 000005:[a-a]\n 000007:[b-b]\n")))

// Allocate a bunch of batches to exhaust the batchPool. None of these
// batches should have a non-zero count.
4 changes: 2 additions & 2 deletions error_test.go
@@ -189,7 +189,7 @@ func TestRequireReadError(t *testing.T) {
require.NoError(t, d.Set(key1, value, nil))
require.NoError(t, d.Flush())
expectLSM(`
0:
0.0:
000007:[a1#4,SET-a2#72057594037927935,RANGEDEL]
6:
000005:[a1#1,SET-a2#2,SET]
@@ -274,7 +274,7 @@ func TestCorruptReadError(t *testing.T) {
require.NoError(t, d.Set(key1, value, nil))
require.NoError(t, d.Flush())
expectLSM(`
0:
0.0:
000007:[a1#4,SET-a2#72057594037927935,RANGEDEL]
6:
000005:[a1#1,SET-a2#2,SET]
15 changes: 6 additions & 9 deletions get_iter.go
@@ -29,7 +29,7 @@ type getIter struct {
level int
batch *Batch
mem flushableList
l0 []*fileMetadata
l0 [][]*fileMetadata
version *version
iterKey *InternalKey
iterValue []byte
@@ -139,15 +139,12 @@ func (g *getIter) Next() (*InternalKey, []byte) {
if g.level == 0 {
// Create iterators from L0 from newest to oldest.
if n := len(g.l0); n > 0 {
l := g.l0[n-1]
g.iter, g.rangeDelIter, g.err = g.newIters(
l,
nil, /* iter options */
nil /* bytes iterated */)
if g.err != nil {
return nil, nil
}
files := g.l0[n-1]
g.l0 = g.l0[:n-1]
iterOpts := IterOptions{logger: g.logger}
g.levelIter.init(iterOpts, g.cmp, g.newIters, files, 0, nil)
g.levelIter.initRangeDel(&g.rangeDelIter)
g.iter = &g.levelIter
g.iterKey, g.iterValue = g.iter.SeekGE(g.key)
continue
}
5 changes: 4 additions & 1 deletion get_iter_test.go
@@ -519,6 +519,9 @@ func TestGetIter(t *testing.T) {
v.Files[tt.level] = append(v.Files[tt.level], meta)
}

if err := v.InitL0Sublevels(cmp, base.DefaultFormatter); err != nil {
t.Fatalf("desc=%q: internal error: %s", desc, err.Error())
}
err := v.CheckOrdering(cmp, base.DefaultFormatter)
if tc.badOrdering && err == nil {
t.Errorf("desc=%q: want bad ordering, got nil error", desc)
@@ -539,7 +542,7 @@
get.equal = equal
get.newIters = newIter
get.key = ikey.UserKey
get.l0 = v.Files[0]
get.l0 = v.L0SubLevels.Files
get.version = v
get.snapshot = ikey.SeqNum() + 1

4 changes: 2 additions & 2 deletions ingest_test.go
@@ -763,7 +763,7 @@ func TestConcurrentIngestCompact(t *testing.T) {
ingest("c")

expectLSM(`
0:
0.0:
000005:[a#2,SET-a#2,SET]
000007:[c#4,SET-c#4,SET]
6:
@@ -790,7 +790,7 @@ func TestConcurrentIngestCompact(t *testing.T) {
compact("a", "z")

expectLSM(`
0:
0.0:
000009:[b#5,SET-b#5,SET]
6:
000008:[a#0,SET-c#0,SET]
56 changes: 49 additions & 7 deletions internal/manifest/l0_sublevels_test.go
@@ -96,22 +96,35 @@ func TestL0SubLevels(t *testing.T) {
var level int
var err error
var fileMetas [NumLevels][]*FileMetadata
var explicitSublevels [][]*FileMetadata
var sublevels *L0SubLevels
baseLevel := NumLevels - 1

datadriven.RunTest(t, "testdata/l0_sublevels", func(td *datadriven.TestData) string {
switch td.Cmd {
case "define":
fileMetas = [NumLevels][]*FileMetadata{}
explicitSublevels = [][]*FileMetadata{}
baseLevel = NumLevels - 1
sublevel := -1
sublevels = nil
for _, data := range strings.Split(td.Input, "\n") {
data = strings.TrimSpace(data)
switch data {
switch data[:2] {
case "L0", "L1", "L2", "L3", "L4", "L5", "L6":
level, err = strconv.Atoi(data[1:])
level, err = strconv.Atoi(data[1:2])
if err != nil {
return err.Error()
}
if level == 0 && len(data) > 3 {
// Sublevel was specified.
sublevel, err = strconv.Atoi(data[3:])
if err != nil {
return err.Error()
}
} else {
sublevel = -1
}
default:
meta, err := parseMeta(data)
if err != nil {
@@ -121,28 +134,50 @@ func TestL0SubLevels(t *testing.T) {
baseLevel = level
}
fileMetas[level] = append(fileMetas[level], meta)
if sublevel != -1 {
for len(explicitSublevels) <= sublevel {
explicitSublevels = append(explicitSublevels, []*FileMetadata{})
}
explicitSublevels[sublevel] = append(explicitSublevels[sublevel], meta)
}
}
}

flushSplitMaxBytes := 64
initialize := true
for _, arg := range td.CmdArgs {
switch arg.Key {
case "flush_split_max_bytes":
flushSplitMaxBytes, err = strconv.Atoi(arg.Vals[0])
if err != nil {
t.Fatal(err)
}
case "no_initialize":
// This case is for use with explicitly-specified sublevels
// only.
initialize = false
}
}
for i := 0; i < NumLevels; i++ {
SortBySeqNum(fileMetas[i])
}

sublevels, err = NewL0SubLevels(
fileMetas[0],
base.DefaultComparer.Compare,
base.DefaultFormatter,
uint64(flushSplitMaxBytes))
if initialize {
sublevels, err = NewL0SubLevels(
fileMetas[0],
base.DefaultComparer.Compare,
base.DefaultFormatter,
uint64(flushSplitMaxBytes))
} else {
// This case is for use with explicitly-specified sublevels
// only.
sublevels = &L0SubLevels{
Files: explicitSublevels,
cmp: base.DefaultComparer.Compare,
format: base.DefaultFormatter,
filesByAge: fileMetas[0],
}
}

if err != nil {
t.Fatal(err)
@@ -166,6 +201,13 @@ func TestL0SubLevels(t *testing.T) {
return builder.String()
case "max-depth-after-ongoing-compactions":
return strconv.Itoa(sublevels.MaxDepthAfterOngoingCompactions())
case "l0-check-ordering":
for sublevel, files := range sublevels.Files {
if err := CheckOrdering(base.DefaultComparer.Compare, base.DefaultFormatter, 0, sublevel, files); err != nil {
return err.Error()
}
}
return "OK"
}
return fmt.Sprintf("unrecognized command: %s", td.Cmd)
})
42 changes: 42 additions & 0 deletions internal/manifest/testdata/l0_sublevels
@@ -14,6 +14,48 @@ flush split keys(3): [b, e, j]
000003:e#5,1-j#7,1
compacting file count: 0, base compacting intervals:

define no_initialize
L0.2
0009:a.SET.10-b.SET.10
L0.1
0003:e.SET.5-j.SET.7
L0.0
0007:b.SET.6-j.SET.8
----
file count: 3, sublevels: 3, intervals: 0
flush split keys(0): []
0.2: file count: 1, bytes: 256, width (mean, max): 1.0, 1, interval range: [0, 0]
000009:a#10,1-b#10,1
0.1: file count: 1, bytes: 256, width (mean, max): 1.0, 1, interval range: [0, 0]
000003:e#5,1-j#7,1
0.0: file count: 1, bytes: 256, width (mean, max): 1.0, 1, interval range: [0, 0]
000007:b#6,1-j#8,1
compacting file count: 0, base compacting intervals:

l0-check-ordering
----
OK

define no_initialize
L0.1
0009:a.SET.10-b.SET.10
L0.0
0007:b.SET.6-j.SET.8
0003:e.SET.5-j.SET.7
----
file count: 3, sublevels: 2, intervals: 0
flush split keys(0): []
0.1: file count: 1, bytes: 256, width (mean, max): 1.0, 1, interval range: [0, 0]
000009:a#10,1-b#10,1
0.0: file count: 2, bytes: 512, width (mean, max): 1.0, 1, interval range: [0, 0]
000007:b#6,1-j#8,1
000003:e#5,1-j#7,1
compacting file count: 0, base compacting intervals:

l0-check-ordering
----
L0.0 files 000007 and 000003 have overlapping ranges: [b#6,SET-j#8,SET] vs [e#5,SET-j#7,SET]

define
L0
0001:a.SET.2-b.SET.3