From 382574c057b9c21f7026b9df74d496df4390b9c6 Mon Sep 17 00:00:00 2001 From: Peter Mattis Date: Thu, 29 Nov 2018 09:42:36 -0500 Subject: [PATCH] Implement clean cuts for compactions --- compaction.go | 46 ++++++++++++++++++ compaction_picker.go | 10 +--- compaction_test.go | 62 +++++++++++++++++++++++++ testdata/compaction_expand_inputs | 77 +++++++++++++++++++++++++++++++ 4 files changed, 186 insertions(+), 9 deletions(-) create mode 100644 testdata/compaction_expand_inputs diff --git a/compaction.go b/compaction.go index 4e2d44404e..a357b23dd8 100644 --- a/compaction.go +++ b/compaction.go @@ -9,6 +9,7 @@ import ( "fmt" "path/filepath" "sort" + "unsafe" "github.com/petermattis/pebble/db" "github.com/petermattis/pebble/sstable" @@ -74,6 +75,7 @@ func newCompaction(opts *db.Options, cur *version, level int) *compaction { // setupOtherInputs fills in the rest of the compaction inputs, regardless of // whether the compaction was automatically scheduled or user initiated. func (c *compaction) setupOtherInputs() { + c.inputs[0] = c.expandInputs(c.inputs[0]) smallest0, largest0 := ikeyRange(c.cmp, c.inputs[0], nil) c.inputs[1] = c.version.overlaps(c.level+1, c.cmp, smallest0.UserKey, largest0.UserKey) smallest01, largest01 := ikeyRange(c.cmp, c.inputs[0], c.inputs[1]) @@ -89,6 +91,49 @@ func (c *compaction) setupOtherInputs() { } } +// expandInputs expands the files in inputs[0] in order to maintain the +// invariant that the versions of keys at level+1 are older than the versions +// of keys at level. This is achieved by adding tables to the right of the +// current input tables such that the rightmost table has a "clean cut". A +// clean cut is either a change in user keys, or a rightmost table whose largest key is the range deletion sentinel. +func (c *compaction) expandInputs(inputs []fileMetadata) []fileMetadata { + if c.level == 0 { + // We already call version.overlaps for L0 and that call guarantees that we + // get a "clean cut". 
+ return inputs + } + files := c.version.files[c.level] + // Pointer arithmetic to figure out the index of inputs[0] relative to + // files[0]. This requires that the inputs slice is a sub-slice of + // files. This is true for non-L0 files returned from version.overlaps. + if uintptr(unsafe.Pointer(&inputs[0])) < uintptr(unsafe.Pointer(&files[0])) { + panic("pebble/db: invalid input slice") + } + start := int((uintptr(unsafe.Pointer(&inputs[0])) - + uintptr(unsafe.Pointer(&files[0]))) / unsafe.Sizeof(inputs[0])) + if start >= len(files) { + panic("pebble/db: invalid input slice") + } + end := start + len(inputs) + for ; end < len(files); end++ { + cur := &files[end-1] + next := files[end] + if c.cmp(cur.largest.UserKey, next.smallest.UserKey) < 0 { + break + } + if cur.largest.Trailer == db.InternalKeyRangeDeleteSentinel { + // The range deletion sentinel key is set for the largest key in a table + // when a range deletion tombstone straddles a table. It isn't necessary + // to include the next table in the compaction as cur.largest.UserKey + // does not actually exist in the table. + break + } + // cur.largest.UserKey == next.smallest.UserKey, so we need to include next + // in the compaction. + } + return files[start:end] +} + // grow grows the number of inputs at c.level without changing the number of // c.level+1 files in the compaction, and returns whether the inputs grew. sm // and la are the smallest and largest InternalKeys in all of the inputs. 
@@ -97,6 +142,7 @@ func (c *compaction) grow(sm, la db.InternalKey) bool { return false } grow0 := c.version.overlaps(c.level, c.cmp, sm.UserKey, la.UserKey) + grow0 = c.expandInputs(grow0) if len(grow0) <= len(c.inputs[0]) { return false } diff --git a/compaction_picker.go b/compaction_picker.go index 83c11e8248..bc7fadfb19 100644 --- a/compaction_picker.go +++ b/compaction_picker.go @@ -233,15 +233,7 @@ func (p *compactionPicker) pickAuto(opts *db.Options) (c *compaction) { vers := p.vers c = newCompaction(opts, vers, p.level) - // TODO(peter): Expand to clean cut. We need to guarantee that no newer - // version of a key is compacted to level+1 while leaving an older key at - // level. Doing so violates an invariant: for any key in the tree, newer - // levels will contain newer versions of the key. This invariant is required - // for correct operation by Get and Iterators. A clean cut is only required - // for the right side of a compaction because the left side will contain - // newer versions of a key that straddles tables and therefore is fine to - // leave at level. - c.inputs[0] = []fileMetadata{vers.files[c.level][p.file]} + c.inputs[0] = vers.files[c.level][p.file : p.file+1] // Files in level 0 may overlap each other, so pick up all overlapping ones. 
if c.level == 0 { diff --git a/compaction_test.go b/compaction_test.go index 382aa19700..9b0ca7d2f5 100644 --- a/compaction_test.go +++ b/compaction_test.go @@ -873,3 +873,65 @@ func TestCompactionShouldStopBefore(t *testing.T) { } }) } + +func TestCompactionExpandInputs(t *testing.T) { + cmp := db.DefaultComparer.Compare + var files []fileMetadata + + parseMeta := func(s string) fileMetadata { + parts := strings.Split(s, "-") + if len(parts) != 2 { + t.Fatalf("malformed table spec: %s", s) + } + return fileMetadata{ + smallest: db.ParseInternalKey(parts[0]), + largest: db.ParseInternalKey(parts[1]), + } + } + + datadriven.RunTest(t, "testdata/compaction_expand_inputs", + func(d *datadriven.TestData) string { + switch d.Cmd { + case "define": + files = nil + if len(d.Input) == 0 { + return "" + } + for _, data := range strings.Split(d.Input, "\n") { + meta := parseMeta(data) + meta.fileNum = uint64(len(files)) + files = append(files, meta) + } + sort.Sort(bySmallest{files, cmp}) + return "" + + case "expand-inputs": + c := &compaction{ + cmp: cmp, + version: &version{}, + level: 1, + } + c.version.files[c.level] = files + if len(d.CmdArgs) != 1 { + return fmt.Sprintf("%s expects 1 argument", d.Cmd) + } + index, err := strconv.ParseInt(d.CmdArgs[0].String(), 10, 64) + if err != nil { + return err.Error() + } + + inputs := c.expandInputs(files[index : index+1]) + + var buf bytes.Buffer + for i := range inputs { + f := &inputs[i] + fmt.Fprintf(&buf, "%d: %s-%s\n", f.fileNum, f.smallest, f.largest) + } + return buf.String() + + default: + t.Fatalf("unknown command: %s", d.Cmd) + return "" + } + }) +} diff --git a/testdata/compaction_expand_inputs b/testdata/compaction_expand_inputs new file mode 100644 index 0000000000..71df7deb74 --- /dev/null +++ b/testdata/compaction_expand_inputs @@ -0,0 +1,77 @@ +define +a.SET.1-b.SET.2 +---- + +expand-inputs 0 +---- +0: a#1,1-b#2,1 + +define +a.SET.1-b.SET.2 +c.SET.3-d.SET.4 +e.SET.5-f.SET.6 +---- + +expand-inputs 0 +---- +0: 
a#1,1-b#2,1 + +expand-inputs 1 +---- +1: c#3,1-d#4,1 + +expand-inputs 2 +---- +2: e#5,1-f#6,1 + +define +a.SET.1-b.SET.2 +b.SET.1-d.SET.4 +e.SET.5-f.SET.6 +---- + +expand-inputs 0 +---- +0: a#1,1-b#2,1 +1: b#1,1-d#4,1 + +expand-inputs 1 +---- +1: b#1,1-d#4,1 + +expand-inputs 2 +---- +2: e#5,1-f#6,1 + +define +a.SET.1-b.SET.2 +b.SET.1-d.SET.4 +d.SET.2-f.SET.6 +---- + +expand-inputs 0 +---- +0: a#1,1-b#2,1 +1: b#1,1-d#4,1 +2: d#2,1-f#6,1 + +expand-inputs 1 +---- +1: b#1,1-d#4,1 +2: d#2,1-f#6,1 + +define +a.SET.1-b.RANGEDEL.72057594037927935 +b.SET.1-d.SET.4 +d.SET.2-f.SET.6 +---- + +expand-inputs 0 +---- +0: a#1,1-b#72057594037927935,15 + +expand-inputs 1 +---- +1: b#1,1-d#4,1 +2: d#2,1-f#6,1 +