Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Prevent sstable boundaries from expanding during compaction #67

Merged
merged 1 commit into from
Apr 17, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 61 additions & 3 deletions compaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"unsafe"

"github.com/petermattis/pebble/db"
"github.com/petermattis/pebble/internal/rangedel"
"github.com/petermattis/pebble/sstable"
"github.com/petermattis/pebble/storage"
)
Expand Down Expand Up @@ -121,7 +122,7 @@ func (c *compaction) expandInputs(inputs []fileMetadata) []fileMetadata {
end := start + len(inputs)
for ; end < len(files); end++ {
cur := &files[end-1]
next := files[end]
next := &files[end]
if c.cmp(cur.largest.UserKey, next.smallest.UserKey) < 0 {
break
}
Expand Down Expand Up @@ -232,6 +233,57 @@ func (c *compaction) elideRangeTombstone(start, end []byte) bool {
return true
}

// atomicUnitBounds reports the user-key bounds of the atomic compaction unit
// that contains the sstable f. The table is located by pointer identity, so f
// must point at an element of one of the c.inputs slices; when no element
// matches, (nil, nil) is returned.
//
// Starting from f, the unit is widened to the neighboring table in the same
// input slice for as long as the two tables at the seam overlap in user-key
// space and the earlier table's largest key is not a range deletion sentinel.
// The returned lower bound is the smallest user key of the first table in the
// unit and the returned upper bound is the largest user key of the last one.
func (c *compaction) atomicUnitBounds(f *fileMetadata) (lower, upper []byte) {
	for i := range c.inputs {
		level := c.inputs[i]
		for pos := range level {
			if f != &level[pos] {
				continue
			}

			// sameUnit reports whether two adjacent tables (left sorts
			// immediately before right) belong to one atomic compaction unit.
			sameUnit := func(left, right *fileMetadata) bool {
				if c.cmp(left.largest.UserKey, right.smallest.UserKey) < 0 {
					// The tables do not share a boundary user key.
					return false
				}
				// The range deletion sentinel key is set for the largest key
				// in a table when a range deletion tombstone straddles a
				// table. Such a seam does not join the two tables, because
				// left.largest.UserKey does not actually exist in left.
				return left.largest.Trailer != db.InternalKeyRangeDeleteSentinel
			}

			// Walk towards the front of the level, pulling in predecessors.
			lower = f.smallest.UserKey
			for k := pos; k > 0 && sameUnit(&level[k-1], &level[k]); k-- {
				lower = level[k-1].smallest.UserKey
			}

			// Walk towards the back of the level, pulling in successors.
			upper = f.largest.UserKey
			for k := pos + 1; k < len(level) && sameUnit(&level[k-1], &level[k]); k++ {
				upper = level[k].largest.UserKey
			}
			return lower, upper
		}
	}
	return nil, nil
}

// newInputIter returns an iterator over all the input tables in a compaction.
func (c *compaction) newInputIter(
newIters tableNewIters,
Expand Down Expand Up @@ -267,11 +319,17 @@ func (c *compaction) newInputIter(
rangeDelIter = nil
}
}
if rangeDelIter != nil {
// Truncate the range tombstones returned by the iterator to the lower and
// upper bounds of the atomic compaction unit.
lowerBound, upperBound := c.atomicUnitBounds(f)
if lowerBound != nil || upperBound != nil {
rangeDelIter = rangedel.Truncate(c.cmp, rangeDelIter, lowerBound, upperBound)
}
}
return rangeDelIter, nil, err
}

// TODO(peter,rangedel): test that range tombstones are properly included in
// the output sstable.
if c.level != 0 {
iters = append(iters, newLevelIter(nil, c.cmp, newIters, c.inputs[0]))
iters = append(iters, newLevelIter(nil, c.cmp, newRangeDelIter, c.inputs[0]))
Expand Down
53 changes: 53 additions & 0 deletions compaction_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -900,3 +900,56 @@ func TestCompactionExpandInputs(t *testing.T) {
}
})
}

// TestCompactionAtomicUnitBounds is a data-driven test for
// compaction.atomicUnitBounds, driven by
// testdata/compaction_atomic_unit_bounds.
//
// Supported commands:
//
//	define
//	  Replaces the current table set. Each input line has the form
//	  "<smallest>-<largest>", with both halves parsed by db.ParseInternalKey.
//	  Tables are numbered in input order and then sorted with bySmallest.
//
//	atomic-unit-bounds <index>
//	  Prints "<lower>-<upper>" for the table at <index> in the sorted set.
func TestCompactionAtomicUnitBounds(t *testing.T) {
	cmp := db.DefaultComparer.Compare
	var files []fileMetadata

	// parseMeta builds a fileMetadata from a "<smallest>-<largest>" spec and
	// aborts the test on a malformed spec.
	parseMeta := func(s string) fileMetadata {
		parts := strings.Split(s, "-")
		if len(parts) != 2 {
			t.Fatalf("malformed table spec: %s", s)
		}
		return fileMetadata{
			smallest: db.ParseInternalKey(parts[0]),
			largest:  db.ParseInternalKey(parts[1]),
		}
	}

	datadriven.RunTest(t, "testdata/compaction_atomic_unit_bounds",
		func(d *datadriven.TestData) string {
			switch d.Cmd {
			case "define":
				files = nil
				if len(d.Input) == 0 {
					return ""
				}
				for _, data := range strings.Split(d.Input, "\n") {
					meta := parseMeta(data)
					meta.fileNum = uint64(len(files))
					files = append(files, meta)
				}
				sort.Sort(bySmallest{files, cmp})
				return ""

			case "atomic-unit-bounds":
				c := &compaction{
					cmp: cmp,
				}
				c.inputs[0] = files
				if len(d.CmdArgs) != 1 {
					return fmt.Sprintf("%s expects 1 argument", d.Cmd)
				}
				index, err := strconv.ParseInt(d.CmdArgs[0].String(), 10, 64)
				if err != nil {
					return err.Error()
				}
				// Report a bad index through the test output rather than
				// panicking on the slice access below.
				if index < 0 || index >= int64(len(files)) {
					return fmt.Sprintf("index %d out of range [0,%d)", index, len(files))
				}

				lower, upper := c.atomicUnitBounds(&files[index])
				return fmt.Sprintf("%s-%s\n", lower, upper)

			default:
				return fmt.Sprintf("unknown command: %s", d.Cmd)
			}
		})
}
3 changes: 3 additions & 0 deletions internal/rangedel/get.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@ type iterator interface {
// than the given key.
SeekLT(key []byte) bool

// First moves the iterator to the first key/value pair.
First() bool

// Last moves the iterator to the last key/value pair.
Last() bool

Expand Down
166 changes: 166 additions & 0 deletions internal/rangedel/testdata/truncate
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
build
1: b-d
2: d-f
3: f-h
----
1: b-d
2: d-f
3: f-h


truncate a-b
----

truncate a-c
----
1: b-c

truncate a-d
----
1: b-d

truncate a-e
----
1: b-d
2: d-e

truncate a-f
----
1: b-d
2: d-f

truncate a-g
----
1: b-d
2: d-f
3: f-g

truncate a-h
----
1: b-d
2: d-f
3: f-h


truncate b-b
----

truncate b-c
----
1: b-c

truncate b-d
----
1: b-d

truncate b-e
----
1: b-d
2: d-e

truncate b-f
----
1: b-d
2: d-f

truncate b-g
----
1: b-d
2: d-f
3: f-g

truncate b-h
----
1: b-d
2: d-f
3: f-h


truncate c-c
----

truncate c-d
----
1: c-d

truncate c-e
----
1: c-d
2: d-e

truncate c-f
----
1: c-d
2: d-f

truncate c-g
----
1: c-d
2: d-f
3: f-g

truncate c-h
----
1: c-d
2: d-f
3: f-h


truncate d-d
----

truncate d-e
----
2: d-e

truncate d-f
----
2: d-f

truncate d-g
----
2: d-f
3: f-g

truncate d-h
----
2: d-f
3: f-h


truncate e-e
----

truncate e-f
----
2: e-f

truncate e-g
----
2: e-f
3: f-g

truncate e-h
----
2: e-f
3: f-h


truncate f-f
----

truncate f-g
----
3: f-g

truncate f-h
----
3: f-h


truncate g-g
----

truncate g-h
----
3: g-h
31 changes: 31 additions & 0 deletions internal/rangedel/truncate.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
// Copyright 2019 The LevelDB-Go and Pebble Authors. All rights reserved. Use
// of this source code is governed by a BSD-style license that can be found in
// the LICENSE file.

package rangedel

import (
"github.com/petermattis/pebble/db"
)

// Truncate builds a new iterator from the tombstones yielded by iter, with
// each tombstone clipped so that it lies within the range [lower, upper).
//
// The supplied iterator is walked once from First to the end. For every
// tombstone, a start user key that sorts before lower is replaced by lower,
// and an end key that sorts after upper is replaced by upper; the rest of the
// start key is left as returned by iter.Key. Tombstones that are empty after
// clipping (start user key not less than end) are omitted from the result.
//
// NOTE(review): nil bounds are not special-cased as "unbounded" here; they are
// passed straight to cmp. Presumably callers always supply both bounds —
// confirm before relying on a nil lower or upper.
func Truncate(cmp db.Compare, iter iterator, lower, upper []byte) *Iter {
	var kept []Tombstone
	for ok := iter.First(); ok; ok = iter.Next() {
		clipped := Tombstone{Start: iter.Key(), End: iter.Value()}

		// Pull the start forward to the lower bound.
		if cmp(clipped.Start.UserKey, lower) < 0 {
			clipped.Start.UserKey = lower
		}
		// Pull the end back to the upper bound.
		if cmp(clipped.End, upper) > 0 {
			clipped.End = upper
		}
		// Nothing of this tombstone falls inside the bounds.
		if cmp(clipped.Start.UserKey, clipped.End) >= 0 {
			continue
		}
		kept = append(kept, clipped)
	}
	return NewIter(cmp, kept)
}
Loading