Skip to content

Commit

Permalink
*: Make sublevel compactions default, remove legacy code
Browse files Browse the repository at this point in the history
Now that sublevel compactions (and flush splits) have shipped
in one stable release, it's worthwhile to move the `FlushSplitBytes`
parameter out of the Experimental sub-struct, have it default to
twice the L0 target file size (matching Cockroach), and make
sublevel compactions the only supported option.

Many tests were updated to account for sublevel compactions and
flush splits now being the default. The L0 compaction threshold was
bumped up in some tests because L0 scoring now doubles the sublevel
count (instead of just counting files), so the old thresholds would
trigger compactions earlier than the tests expect.
  • Loading branch information
itsbilal committed Nov 24, 2020
1 parent 2da6278 commit 32a5180
Show file tree
Hide file tree
Showing 21 changed files with 91 additions and 180 deletions.
3 changes: 2 additions & 1 deletion checkpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@ func TestCheckpoint(t *testing.T) {
var buf syncedBuffer
mem := vfs.NewMem()
opts := &Options{
FS: loggingFS{mem, &buf},
FS: loggingFS{mem, &buf},
L0CompactionThreshold: 10,
}

datadriven.RunTest(t, "testdata/checkpoint", func(td *datadriven.TestData) string {
Expand Down
3 changes: 1 addition & 2 deletions cmd/pebble/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,6 @@ func newPebbleDB(dir string) DB {
Name: "cockroach_merge_operator",
},
}
opts.Experimental.L0SublevelCompactions = true

for i := 0; i < len(opts.Levels); i++ {
l := &opts.Levels[i]
Expand All @@ -82,7 +81,7 @@ func newPebbleDB(dir string) DB {
l.EnsureDefaults()
}
opts.Levels[6].FilterPolicy = nil
opts.Experimental.FlushSplitBytes = opts.Levels[0].TargetFileSize
opts.FlushSplitBytes = opts.Levels[0].TargetFileSize

opts.EnsureDefaults()

Expand Down
4 changes: 2 additions & 2 deletions compaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -665,7 +665,7 @@ func newFlush(
}
}

if opts.Experimental.FlushSplitBytes > 0 {
if opts.FlushSplitBytes > 0 {
c.maxOutputFileSize = uint64(opts.Level(0).TargetFileSize)
c.maxOverlapBytes = maxGrandparentOverlapBytes(opts, 0)
c.grandparents = c.version.Overlaps(baseLevel, c.cmp,
Expand Down Expand Up @@ -1968,7 +1968,7 @@ func (d *DB) runCompaction(
return nil
}

splitL0Outputs := c.outputLevel.level == 0 && d.opts.Experimental.FlushSplitBytes > 0
splitL0Outputs := c.outputLevel.level == 0 && d.opts.FlushSplitBytes > 0

// finishOutput is called for an sstable with the first key of the next sstable, and for the
// last sstable with an empty key.
Expand Down
88 changes: 7 additions & 81 deletions compaction_picker.go
Original file line number Diff line number Diff line change
Expand Up @@ -762,80 +762,11 @@ func (p *compactionPickerByScore) calculateL0Score(
var info candidateLevelInfo
info.outputLevel = p.baseLevel

if p.opts.Experimental.L0SublevelCompactions {
// If L0Sublevels are present, we use the sublevel count as opposed to
// the L0 file count to score this level. The base vs intra-L0
// compaction determination happens in pickAuto, not here.
info.score = float64(2*p.vers.L0Sublevels.MaxDepthAfterOngoingCompactions()) /
float64(p.opts.L0CompactionThreshold)
return info
}

// TODO(peter): The current scoring logic precludes concurrent L0->Lbase
// compactions in most cases because if there is an in-progress L0->Lbase
// compaction we'll instead preferentially score an intra-L0 compaction. One
// possible way out is to score both by increasing the size of the "scores"
// array by one and adding entries for both L0->Lbase and intra-L0
// compactions.

// We treat level-0 specially by bounding the number of files instead of
// number of bytes for two reasons:
//
// (1) With larger write-buffer sizes, it is nice not to do too many
// level-0 compactions.
//
// (2) The files in level-0 are merged on every read and therefore we
// wish to avoid too many files when the individual file size is small
// (perhaps because of a small write-buffer setting, or very high
// compression ratios, or lots of overwrites/deletions).

// Score an L0->Lbase compaction by counting the number of idle
// (non-compacting) files in L0.
var idleL0Count, totalL0Count, intraL0Count int
iter := p.vers.Levels[0].Iter()
for f := iter.First(); f != nil; f = iter.Next() {
if f.Compacting {
intraL0Count = 0
} else {
idleL0Count++
intraL0Count++
}
totalL0Count++
}
info.score = float64(idleL0Count) / float64(p.opts.L0CompactionThreshold)

// Only start an intra-L0 compaction if there is an existing L0->Lbase
// compaction.
var l0Compaction bool
for i := range inProgressCompactions {
if inProgressCompactions[i].inputs[0].level == 0 &&
inProgressCompactions[i].outputLevel != 0 {
l0Compaction = true
break
}
}
if !l0Compaction {
return info
}

if totalL0Count < p.opts.L0CompactionThreshold+2 {
// If L0 isn't accumulating many files beyond the regular L0 trigger,
// don't resort to an intra-L0 compaction yet. This matches the RocksDB
// heuristic.
return info
}
if intraL0Count < minIntraL0Count {
// Not enough idle L0 files to perform an intra-L0 compaction. This
// matches the RocksDB heuristic. Note that if another file is flushed
// or ingested to L0, a new compaction picker will be created and we'll
// reexamine the intra-L0 score.
return info
}

// Score the intra-L0 compaction using the number of files that are
// possibly in the compaction.
info.score = float64(intraL0Count) / float64(p.opts.L0CompactionThreshold)
info.outputLevel = 0
// If L0Sublevels are present, we use the sublevel count as opposed to
// the L0 file count to score this level. The base vs intra-L0
// compaction determination happens in pickAuto, not here.
info.score = float64(2*p.vers.L0Sublevels.MaxDepthAfterOngoingCompactions()) /
float64(p.opts.L0CompactionThreshold)
return info
}

Expand Down Expand Up @@ -942,12 +873,7 @@ func (p *compactionPickerByScore) pickAuto(env compactionEnv) (pc *pickedCompact
// significantly right after a base compaction finishes, and before those
// bytes have been compacted further down the LSM.
if n := len(env.inProgressCompactions); n > 0 {
var l0ReadAmp int
if p.opts.Experimental.L0SublevelCompactions {
l0ReadAmp = p.vers.L0Sublevels.MaxDepthAfterOngoingCompactions()
} else {
l0ReadAmp = p.vers.Levels[0].Len()
}
l0ReadAmp := p.vers.L0Sublevels.MaxDepthAfterOngoingCompactions()
compactionDebt := int(p.estimatedCompactionDebt(0))
ccSignal1 := n * p.opts.Experimental.L0CompactionConcurrency
ccSignal2 := n * p.opts.Experimental.CompactionDebtConcurrency
Expand Down Expand Up @@ -1020,7 +946,7 @@ func (p *compactionPickerByScore) pickAuto(env compactionEnv) (pc *pickedCompact
continue
}

if info.level == 0 && p.opts.Experimental.L0SublevelCompactions {
if info.level == 0 {
pc = pickL0(env, p.opts, p.vers, p.baseLevel)
// Fail-safe to protect against compacting the same sstable
// concurrently.
Expand Down
10 changes: 7 additions & 3 deletions compaction_picker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,13 @@ func loadVersion(d *datadriven.TestData) (*version, *Options, [numLevels]int64,
return nil, nil, sizes, err.Error()
}
for i := uint64(1); sizes[level] < int64(size); i++ {
key := base.MakeInternalKey([]byte(fmt.Sprintf("%04d", i)), i, InternalKeyKindSet)
var key InternalKey
if level == 0 {
// For L0, make `size` overlapping files.
key = base.MakeInternalKey([]byte(fmt.Sprintf("%04d", 1)), i, InternalKeyKindSet)
} else {
key = base.MakeInternalKey([]byte(fmt.Sprintf("%04d", i)), i, InternalKeyKindSet)
}
m := &fileMetadata{
Smallest: key,
Largest: key,
Expand Down Expand Up @@ -458,7 +464,6 @@ func TestCompactionPickerL0(t *testing.T) {
}

opts := (*Options)(nil).EnsureDefaults()
opts.Experimental.L0SublevelCompactions = true
opts.Experimental.L0CompactionConcurrency = 1
var picker *compactionPickerByScore
var inProgressCompactions []compactionInfo
Expand Down Expand Up @@ -690,7 +695,6 @@ func TestCompactionPickerConcurrency(t *testing.T) {
}

opts := (*Options)(nil).EnsureDefaults()
opts.Experimental.L0SublevelCompactions = true
opts.Experimental.L0CompactionConcurrency = 1
var picker *compactionPickerByScore
var inProgressCompactions []compactionInfo
Expand Down
22 changes: 12 additions & 10 deletions compaction_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func newVersion(opts *Options, files [numLevels][]*fileMetadata) *version {
return manifest.NewVersion(
opts.Comparer.Compare,
opts.Comparer.FormatKey,
opts.Experimental.FlushSplitBytes,
opts.FlushSplitBytes,
files)
}

Expand Down Expand Up @@ -798,9 +798,10 @@ func TestCompaction(t *testing.T) {

mem := vfs.NewMem()
opts := &Options{
FS: mem,
MemTableSize: memTableSize,
DebugCheck: DebugCheckLevels,
FS: mem,
MemTableSize: memTableSize,
DebugCheck: DebugCheckLevels,
L0CompactionThreshold: 8,
}
opts.private.enablePacing = true
d, err := Open("", opts)
Expand Down Expand Up @@ -867,12 +868,13 @@ func TestCompaction(t *testing.T) {
{"+D", "D", "Aa.BC.Bb."},
{"-a", "Da", "Aa.BC.Bb."},
{"+d", "Dad", "Aa.BC.Bb."},
// The next addition creates the fourth level-0 table, and l0CompactionTrigger == 4,
// so this triggers a non-trivial compaction into one level-1 table. Note
// that the keys in this one larger table are interleaved from the four smaller ones.
{"+E", "E", "ABCDbd."},
{"+e", "Ee", "ABCDbd."},
{"+F", "F", "ABCDbd.Ee."},
{"+E", "E", "Aa.BC.Bb.Dad."},
{"+e", "Ee", "Aa.BC.Bb.Dad."},
// The next addition creates the fourth level-0 table, and l0CompactionTrigger == 8,
// but since the sublevel count is doubled when comparing with l0CompactionTrigger,
// the addition of the 4th sublevel triggers a non-trivial compaction into one level-1 table.
// Note that the keys in this one larger table are interleaved from the four smaller ones.
{"+F", "F", "ABCDEbde."},
}
for _, tc := range testCases {
if key := tc.key[1:]; tc.key[0] == '+' {
Expand Down
7 changes: 1 addition & 6 deletions db.go
Original file line number Diff line number Diff line change
Expand Up @@ -1331,12 +1331,7 @@ func (d *DB) makeRoomForWrite(b *Batch) error {
continue
}
}
var l0ReadAmp int
if d.opts.Experimental.L0SublevelCompactions {
l0ReadAmp = d.mu.versions.currentVersion().L0Sublevels.ReadAmplification()
} else {
l0ReadAmp = d.mu.versions.currentVersion().Levels[0].Len()
}
l0ReadAmp := d.mu.versions.currentVersion().L0Sublevels.ReadAmplification()
if l0ReadAmp >= d.opts.L0StopWritesThreshold {
// There are too many level-0 files, so we wait.
if !stalled {
Expand Down
3 changes: 2 additions & 1 deletion db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -645,7 +645,8 @@ func TestSingleDeleteFlush(t *testing.T) {

func TestUnremovableSingleDelete(t *testing.T) {
d, err := Open("", &Options{
FS: vfs.NewMem(),
FS: vfs.NewMem(),
L0CompactionThreshold: 8,
})
require.NoError(t, err)
defer func() {
Expand Down
9 changes: 5 additions & 4 deletions event_listener_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,10 +142,11 @@ func TestEventListener(t *testing.T) {
case "open":
buf.Reset()
opts := &Options{
FS: loggingFS{mem, &buf},
EventListener: MakeLoggingEventListener(&buf),
MaxManifestFileSize: 1,
WALDir: "wal",
FS: loggingFS{mem, &buf},
EventListener: MakeLoggingEventListener(&buf),
MaxManifestFileSize: 1,
L0CompactionThreshold: 10,
WALDir: "wal",
}
// The table stats collector runs asynchronously and its
// timing is less predictable. It increments nextJobID, which
Expand Down
16 changes: 10 additions & 6 deletions flush_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,15 @@ import (
)

func TestManualFlush(t *testing.T) {
d, err := Open("", &Options{
FS: vfs.NewMem(),
})
getOptions := func() *Options {
opts := &Options{
FS: vfs.NewMem(),
L0CompactionThreshold: 10,
}
opts.private.disableAutomaticCompactions = true
return opts
}
d, err := Open("", getOptions())
require.NoError(t, err)
defer func() {
require.NoError(t, d.Close())
Expand Down Expand Up @@ -75,9 +81,7 @@ func TestManualFlush(t *testing.T) {
if err := d.Close(); err != nil {
return err.Error()
}
d, err = Open("", &Options{
FS: vfs.NewMem(),
})
d, err = Open("", getOptions())
if err != nil {
return err.Error()
}
Expand Down
5 changes: 3 additions & 2 deletions ingest_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -596,8 +596,9 @@ func TestIngestError(t *testing.T) {

inj := errorfs.OnIndex(-1)
d, err := Open("", &Options{
FS: errorfs.Wrap(mem, inj),
Logger: panicLogger{},
FS: errorfs.Wrap(mem, inj),
Logger: panicLogger{},
L0CompactionThreshold: 8,
})
require.NoError(t, err)
// Force the creation of an L0 sstable that overlaps with the tables
Expand Down
3 changes: 1 addition & 2 deletions internal/metamorphic/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,9 +184,8 @@ func randomOptions(rng *rand.Rand) *testOptions {
opts.BytesPerSync = 1 << uint(rng.Intn(28)) // 1B - 256MB
opts.Cache = cache.New(1 << uint(rng.Intn(30))) // 1B - 1GB
opts.DisableWAL = rng.Intn(2) == 0
opts.Experimental.FlushSplitBytes = 1 << rng.Intn(20) // 1B - 1MB
opts.FlushSplitBytes = 1 << rng.Intn(20) // 1B - 1MB
opts.Experimental.L0CompactionConcurrency = 1 + rng.Intn(4) // 1-4
opts.Experimental.L0SublevelCompactions = rng.Intn(2) == 0
opts.Experimental.MinDeletionRate = 1 << uint(20 + rng.Intn(10)) // 1MB - 1GB
opts.L0CompactionThreshold = 1 + rng.Intn(100) // 1 - 100
opts.L0StopWritesThreshold = 1 + rng.Intn(100) // 1 - 100
Expand Down
3 changes: 2 additions & 1 deletion metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,8 @@ zmemtbl 14 13 B

func TestMetrics(t *testing.T) {
d, err := Open("", &Options{
FS: vfs.NewMem(),
FS: vfs.NewMem(),
L0CompactionThreshold: 8,
})
require.NoError(t, err)
defer func() {
Expand Down
Loading

0 comments on commit 32a5180

Please sign in to comment.