db: remove code related to atomic units
The concept of "atomic units" is related to expanding the boundary of
a compaction so that it does not cross a split user key; this can now
be removed.
RaduBerinde committed Dec 16, 2023
1 parent 49c386a commit 769e2ab
Showing 6 changed files with 34 additions and 332 deletions.
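
Before the diffs, the removed concept in one picture: an "atomic compaction unit" grouped adjacent sstables that split a single user key across their shared boundary, and the compaction picker had to grow its inputs to cover the whole group. Below is a minimal, self-contained Go sketch of that boundary condition, using a hypothetical bounds type rather than Pebble's manifest API:

package main

import (
	"bytes"
	"fmt"
)

// bounds models an sstable's user-key range. LargestExclusive marks a
// range-deletion sentinel largest bound, meaning the largest user key is
// not actually present in the table.
type bounds struct {
	Smallest, Largest []byte
	LargestExclusive  bool
}

// splitsUserKey reports whether two adjacent tables split one user key:
// the left table's inclusive largest user key reappears as the right
// table's smallest. Such neighbors had to be compacted as one atomic unit.
func splitsUserKey(left, right bounds) bool {
	if left.LargestExclusive {
		// The boundary user key does not actually exist in the left table.
		return false
	}
	return bytes.Equal(left.Largest, right.Smallest)
}

func main() {
	// Mirrors the example in the deleted comment below:
	// 000001:[b-f#200] and 000002:[f#190-g#inf] both contain user key f.
	fmt.Println(splitsUserKey(
		bounds{Smallest: []byte("b"), Largest: []byte("f")},
		bounds{Smallest: []byte("f"), Largest: []byte("g")},
	)) // true: they would have formed one atomic compaction unit
}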
49 changes: 0 additions & 49 deletions compaction.go
@@ -1568,55 +1568,6 @@ func (c *compaction) newRangeDelIter(
	closer := rangeDelIter
	rangeDelIter = noCloseIter{rangeDelIter}

-	// Truncate the range tombstones returned by the iterator to the
-	// upper bound of the atomic compaction unit of the file. We want to
-	// truncate the range tombstone to the bounds of the file, but files
-	// with split user keys pose an obstacle: The file's largest bound
-	// is inclusive whereas the range tombstone's end is exclusive.
-	//
-	// Consider the example:
-	//
-	//	000001:[b-f#200] range del [c,k)
-	//	000002:[f#190-g#inf] range del [c,k)
-	//	000003:[g#500-i#3]
-	//
-	// Files 000001 and 000002 contain the untruncated range tombstones
-	// [c,k). While the keyspace covered by 000003 was at one point
-	// deleted by the tombstone [c,k), the tombstone may have already
-	// been compacted away and the file does not contain an untruncated
-	// range tombstone. We want to bound 000001's tombstone to the file
-	// bounds, but it's not possible to encode a range tombstone with an
-	// end boundary within a user key (eg, between sequence numbers
-	// f#200 and f#190). Instead, we expand 000001 to its atomic
-	// compaction unit (000001 and 000002) and truncate the tombstone to
-	// g#inf.
-	//
-	// NB: We must not use the atomic compaction unit of the entire
-	// compaction, because the [c,k) tombstone contained in the file
-	// 000001 extends to keys ≥ g. If 000001, 000002 and 000003 are all
-	// included in the same compaction, the compaction's atomic
-	// compaction unit includes 000003. However 000003's keys must not
-	// be covered by 000001's untruncated range tombstone.
-	//
-	// Note that we need to do this truncation at read time in order to
-	// handle sstables generated by RocksDB and earlier versions of
-	// Pebble which do not truncate range tombstones to atomic
-	// compaction unit boundaries at write time.
-	//
-	// The current Pebble compaction logic DOES truncate tombstones to
-	// atomic unit boundaries at compaction time too.
-	atomicUnit, _ := expandToAtomicUnit(c.cmp, f.Slice(), true /* disableIsCompacting */)
-	lowerBound, upperBound := manifest.KeyRange(c.cmp, atomicUnit.Iter())
-	// Range deletion tombstones are often written to sstables
-	// untruncated on the end key side. However, they are still only
-	// valid within a given file's bounds. The logic for writing range
-	// tombstones to an output file sometimes has an incomplete view
-	// of range tombstones outside the file's internal key bounds. Skip
-	// any range tombstones completely outside file bounds.
-	rangeDelIter = keyspan.Truncate(
-		c.cmp, rangeDelIter, lowerBound.UserKey, upperBound.UserKey,
-		&f.Smallest, &f.Largest, false, /* panicOnUpperTruncate */
-	)
	return rangeDelIter, closer, nil
}

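The deleted comment above leans on internal-key ordering: keys sort by user key ascending, then by sequence number descending, so f#200 and f#190 are adjacent versions of one user key with no possible user-key boundary between them. A toy sketch of that ordering, with a hypothetical simplified key type rather than Pebble's actual InternalKey:

package main

import "fmt"

// ikey is a hypothetical, simplified internal key: a user key plus a
// sequence number (Pebble's real InternalKey also carries a kind).
type ikey struct {
	userKey string
	seqNum  uint64
}

// less orders internal keys the way an LSM does: user key ascending,
// then sequence number descending (newer versions sort first).
func less(a, b ikey) bool {
	if a.userKey != b.userKey {
		return a.userKey < b.userKey
	}
	return a.seqNum > b.seqNum
}

func main() {
	fmt.Println(less(ikey{"f", 200}, ikey{"f", 190})) // true: f#200 sorts first
	// A range tombstone's end is an exclusive *user* key, so no valid end
	// can separate f#200 from f#190; truncation must move to the next whole
	// user-key boundary (g#inf in the deleted example).
}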
158 changes: 20 additions & 138 deletions compaction_picker.go
@@ -410,12 +410,10 @@ func (pc *pickedCompaction) setupInputs(
		opts, adjustedOutputLevel(pc.outputLevel.level, pc.baseLevel), diskAvailBytes,
	)

-	// Expand the initial inputs to a clean cut.
-	var isCompacting bool
-	startLevel.files, isCompacting = expandToAtomicUnit(pc.cmp, startLevel.files, false /* disableIsCompacting */)
-	if isCompacting {
+	if anyTablesCompacting(startLevel.files) {
		return false
	}

	pc.maybeExpandBounds(manifest.KeyRange(pc.cmp, startLevel.files.Iter()))

	// Determine the sstables in the output level which overlap with the input
@@ -424,11 +422,10 @@
	if startLevel.level != pc.outputLevel.level {
		pc.outputLevel.files = pc.version.Overlaps(pc.outputLevel.level, pc.cmp, pc.smallest.UserKey,
			pc.largest.UserKey, pc.largest.IsExclusiveSentinel())
-		pc.outputLevel.files, isCompacting = expandToAtomicUnit(pc.cmp, pc.outputLevel.files,
-			false /* disableIsCompacting */)
-		if isCompacting {
+		if anyTablesCompacting(pc.outputLevel.files) {
			return false
		}

		pc.maybeExpandBounds(manifest.KeyRange(pc.cmp,
			startLevel.files.Iter(), pc.outputLevel.files.Iter()))
	}
@@ -519,8 +516,7 @@ func (pc *pickedCompaction) grow(
	}
	grow0 := pc.version.Overlaps(startLevel.level, pc.cmp, sm.UserKey,
		la.UserKey, la.IsExclusiveSentinel())
-	grow0, isCompacting := expandToAtomicUnit(pc.cmp, grow0, false /* disableIsCompacting */)
-	if isCompacting {
+	if anyTablesCompacting(grow0) {
		return false
	}
	if grow0.Len() <= startLevel.files.Len() {
@@ -534,8 +530,7 @@
	sm1, la1 := manifest.KeyRange(pc.cmp, grow0.Iter(), pc.outputLevel.files.Iter())
	grow1 := pc.version.Overlaps(pc.outputLevel.level, pc.cmp, sm1.UserKey,
		la1.UserKey, la1.IsExclusiveSentinel())
-	grow1, isCompacting = expandToAtomicUnit(pc.cmp, grow1, false /* disableIsCompacting */)
-	if isCompacting {
+	if anyTablesCompacting(grow1) {
		return false
	}
	if grow1.Len() != pc.outputLevel.files.Len() {
@@ -568,110 +563,16 @@ func (pc *pickedCompaction) setupMultiLevelCandidate(opts *Options, diskAvailByt
	return pc.setupInputs(opts, diskAvailBytes, pc.extraLevels[len(pc.extraLevels)-1])
}

-// expandToAtomicUnit expands the provided level slice within its level both
-// forwards and backwards to its "atomic compaction unit" boundaries, if
-// necessary.
-//
-// While picking compaction inputs, this is required to maintain the invariant
-// that the versions of keys at level+1 are older than the versions of keys at
-// level. Tables are added to the right of the current slice tables such that
-// the rightmost table has a "clean cut". A clean cut is either a change in
-// user keys, or when the largest key in the left sstable is a range tombstone
-// sentinel key (InternalKeyRangeDeleteSentinel).
-//
-// In addition to maintaining the seqnum invariant, expandToAtomicUnit is used
-// to provide clean boundaries for range tombstone truncation during
-// compaction. In order to achieve these clean boundaries, expandToAtomicUnit
-// needs to find a "clean cut" on the left edge of the compaction as well.
-// This is necessary in order for "atomic compaction units" to always be
-// compacted as a unit. Failure to do this leads to a subtle bug with
-// truncation of range tombstones to atomic compaction unit boundaries.
-// Consider the scenario:
-//
-//	L3:
-//	  12:[a#2,15-b#1,1]
-//	  13:[b#0,15-d#72057594037927935,15]
-//
-// These sstables contain a range tombstone [a-d)#2 which spans the two
-// sstables. The two sstables need to always be kept together. Compacting
-// sstable 13 independently of sstable 12 would result in:
-//
-//	L3:
-//	  12:[a#2,15-b#1,1]
-//	L4:
-//	  14:[b#0,15-d#72057594037927935,15]
-//
-// This state is still ok, but when sstable 12 is next compacted, its range
-// tombstones will be truncated at "b" (the largest key in its atomic
-// compaction unit). In the scenario here, that could result in b#1 becoming
-// visible when it should be deleted.
-//
-// isCompacting is returned true for any atomic units that contain files that
-// have in-progress compactions, i.e. FileMetadata.Compacting == true. If
-// disableIsCompacting is true, isCompacting always returns false. This helps
-// avoid spurious races from being detected when this method is used outside
-// of compaction picking code.
-//
-// TODO(jackson): Compactions and flushes no longer split a user key between two
-// sstables. We could perform a migration, re-compacting any sstables with split
-// user keys, which would allow us to remove atomic compaction unit expansion
-// code.
-func expandToAtomicUnit(
-	cmp Compare, inputs manifest.LevelSlice, disableIsCompacting bool,
-) (slice manifest.LevelSlice, isCompacting bool) {
-	// NB: Inputs for L0 can't be expanded and *version.Overlaps guarantees
-	// that we get a 'clean cut.' For L0, Overlaps will return a slice without
-	// access to the rest of the L0 files, so it's OK to try to reslice.
-	if inputs.Empty() {
-		// Nothing to expand.
-		return inputs, false
-	}
-
-	// TODO(jackson): Update to avoid use of LevelIterator.Current(). The
-	// Reslice interface will require some tweaking, because we currently rely
-	// on Reslice having already positioned the LevelIterator appropriately.
-
-	inputs = inputs.Reslice(func(start, end *manifest.LevelIterator) {
-		iter := start.Clone()
-		iter.Prev()
-		for cur, prev := start.Current(), iter.Current(); prev != nil; cur, prev = start.Prev(), iter.Prev() {
-			if cur.IsCompacting() {
-				isCompacting = true
-			}
-			if cmp(prev.Largest.UserKey, cur.Smallest.UserKey) < 0 {
-				break
-			}
-			if prev.Largest.IsExclusiveSentinel() {
-				// The table prev has a largest key indicating that the user key
-				// prev.largest.UserKey doesn't actually exist in the table.
-				break
-			}
-			// prev.Largest.UserKey == cur.Smallest.UserKey, so we need to
-			// include prev in the compaction.
-		}
-
-		iter = end.Clone()
-		iter.Next()
-		for cur, next := end.Current(), iter.Current(); next != nil; cur, next = end.Next(), iter.Next() {
-			if cur.IsCompacting() {
-				isCompacting = true
-			}
-			if cmp(cur.Largest.UserKey, next.Smallest.UserKey) < 0 {
-				break
-			}
-			if cur.Largest.IsExclusiveSentinel() {
-				// The table cur has a largest key indicating that the user key
-				// cur.largest.UserKey doesn't actually exist in the table.
-				break
-			}
-			// cur.Largest.UserKey == next.Smallest.UserKey, so we need to
-			// include next in the compaction.
-		}
-	})
-	inputIter := inputs.Iter()
-	isCompacting = !disableIsCompacting &&
-		(isCompacting || inputIter.First().IsCompacting() || inputIter.Last().IsCompacting())
-	return inputs, isCompacting
-}
+// anyTablesCompacting returns true if any tables in the level slice are
+// compacting.
+func anyTablesCompacting(inputs manifest.LevelSlice) bool {
+	it := inputs.Iter()
+	for f := it.First(); f != nil; f = it.Next() {
+		if f.IsCompacting() {
+			return true
+		}
+	}
+	return false
+}
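Why a plain scan suffices now: once compactions and flushes never split a user key across sstables, every slice the picker sees is already a clean cut, so the expansion step collapses to the compacting check above. A self-contained sketch of both properties, with hypothetical types rather than Pebble's manifest types:

package main

import "fmt"

type meta struct {
	smallestUser, largestUser string
	largestExclusive          bool // largest bound is a range-del sentinel
	compacting                bool
}

// cleanCut reports whether adjacent tables never share a boundary user key
// (the property expandToAtomicUnit used to enforce by growing the slice).
func cleanCut(level []meta) bool {
	for i := 1; i < len(level); i++ {
		prev, cur := level[i-1], level[i]
		if prev.largestUser == cur.smallestUser && !prev.largestExclusive {
			return false
		}
	}
	return true
}

// anyCompacting mirrors the shape of anyTablesCompacting above: scan the
// chosen inputs and short-circuit on the first in-progress compaction.
func anyCompacting(level []meta) bool {
	for _, m := range level {
		if m.compacting {
			return true
		}
	}
	return false
}

func main() {
	level := []meta{
		{"a", "f", false, false},
		{"g", "k", false, true},
	}
	fmt.Println(cleanCut(level), anyCompacting(level)) // true true
}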

func newCompactionPicker(
@@ -1578,9 +1479,8 @@ func (p *compactionPickerByScore) pickElisionOnlyCompaction(
	// compaction unit.
	pc = newPickedCompaction(p.opts, p.vers, numLevels-1, numLevels-1, p.baseLevel)
	pc.kind = compactionKindElisionOnly
-	var isCompacting bool
-	pc.startLevel.files, isCompacting = expandToAtomicUnit(p.opts.Comparer.Compare, lf.Slice(), false /* disableIsCompacting */)
-	if isCompacting {
+	pc.startLevel.files = lf.Slice()
+	if anyTablesCompacting(lf.Slice()) {
		return nil
	}
	pc.smallest, pc.largest = manifest.KeyRange(pc.cmp, pc.startLevel.files.Iter())
@@ -1614,27 +1514,9 @@ func (p *compactionPickerByScore) pickRewriteCompaction(env compactionEnv) (pc *
}

	inputs := lf.Slice()
-	// L0 files generated by a flush have never been split such that
-	// adjacent files can contain the same user key. So we do not need to
-	// rewrite an atomic compaction unit for L0. Note that there is nothing
-	// preventing two different flushes from producing files that are
-	// non-overlapping from an InternalKey perspective, but span the same
-	// user key. However, such files cannot be in the same L0 sublevel,
-	// since each sublevel requires non-overlapping user keys (unlike other
-	// levels).
-	if l > 0 {
-		// Find this file's atomic compaction unit. This is only relevant
-		// for levels L1+.
-		var isCompacting bool
-		inputs, isCompacting = expandToAtomicUnit(
-			p.opts.Comparer.Compare,
-			inputs,
-			false, /* disableIsCompacting */
-		)
-		if isCompacting {
-			// Try the next level.
-			continue
-		}
-	}
+	if anyTablesCompacting(inputs) {
+		// Try the next level.
+		continue
+	}

pc = newPickedCompaction(p.opts, p.vers, l, l, p.baseLevel)
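
The deleted L0 comment above makes a subtle distinction worth pinning down: two flushes can produce files that are disjoint as internal-key ranges yet overlap as user-key ranges, which is exactly what L0 sublevels forbid. A self-contained sketch of the two overlap notions (hypothetical types again, not Pebble code):

package main

import "fmt"

type ikey struct {
	user string
	seq  uint64
}

// ikLess orders internal keys: user key ascending, then seqnum descending.
func ikLess(a, b ikey) bool {
	if a.user != b.user {
		return a.user < b.user
	}
	return a.seq > b.seq
}

type file struct{ smallest, largest ikey }

// internalOverlap: the closed internal-key ranges intersect.
func internalOverlap(a, b file) bool {
	return !ikLess(a.largest, b.smallest) && !ikLess(b.largest, a.smallest)
}

// userOverlap: the same check projected onto user keys only.
func userOverlap(a, b file) bool {
	return a.smallest.user <= b.largest.user && b.smallest.user <= a.largest.user
}

func main() {
	a := file{ikey{"f", 200}, ikey{"f", 200}} // a single newer version of f
	b := file{ikey{"f", 190}, ikey{"g", 100}} // an older f, plus g
	fmt.Println(internalOverlap(a, b)) // false: disjoint internal-key ranges
	fmt.Println(userOverlap(a, b))     // true: both contain user key f
}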
4 changes: 1 addition & 3 deletions compaction_picker_test.go
@@ -1175,10 +1175,8 @@ func TestPickedCompactionExpandInputs(t *testing.T) {
		_ = iter.Next()
	}

-	inputs, _ := expandToAtomicUnit(cmp, iter.Take().Slice(), true /* disableIsCompacting */)
-
	var buf bytes.Buffer
-	inputs.Each(func(f *fileMetadata) {
+	iter.Take().Slice().Each(func(f *fileMetadata) {
		fmt.Fprintf(&buf, "%d: %s-%s\n", f.FileNum, f.Smallest, f.Largest)
	})
	return buf.String()
66 changes: 0 additions & 66 deletions compaction_test.go
@@ -1752,72 +1752,6 @@ func TestCompactionOutputLevel(t *testing.T) {
	})
}

-func TestCompactionAtomicUnitBounds(t *testing.T) {
-	cmp := DefaultComparer.Compare
-	var files manifest.LevelSlice
-
-	parseMeta := func(s string) *fileMetadata {
-		parts := strings.Split(s, "-")
-		if len(parts) != 2 {
-			t.Fatalf("malformed table spec: %s", s)
-		}
-		m := (&fileMetadata{}).ExtendPointKeyBounds(
-			cmp,
-			base.ParseInternalKey(parts[0]),
-			base.ParseInternalKey(parts[1]),
-		)
-		m.InitPhysicalBacking()
-		return m
-	}
-
-	datadriven.RunTest(t, "testdata/compaction_atomic_unit_bounds",
-		func(t *testing.T, d *datadriven.TestData) string {
-			switch d.Cmd {
-			case "define":
-				files = manifest.LevelSlice{}
-				if len(d.Input) == 0 {
-					return ""
-				}
-				var ff []*fileMetadata
-				for _, data := range strings.Split(d.Input, "\n") {
-					meta := parseMeta(data)
-					meta.FileNum = FileNum(len(ff))
-					ff = append(ff, meta)
-				}
-				files = manifest.NewLevelSliceKeySorted(cmp, ff)
-				return ""
-
-			case "atomic-unit-bounds":
-				c := &compaction{
-					cmp:      cmp,
-					equal:    DefaultComparer.Equal,
-					comparer: DefaultComparer,
-					inputs:   []compactionLevel{{files: files}, {}},
-				}
-				c.startLevel, c.outputLevel = &c.inputs[0], &c.inputs[1]
-				if len(d.CmdArgs) != 1 {
-					return fmt.Sprintf("%s expects 1 argument", d.Cmd)
-				}
-				index, err := strconv.ParseInt(d.CmdArgs[0].String(), 10, 64)
-				if err != nil {
-					return err.Error()
-				}
-				iter := files.Iter()
-				// Advance iter to `index`.
-				_ = iter.First()
-				for i := int64(0); i < index; i++ {
-					_ = iter.Next()
-				}
-				atomicUnit, _ := expandToAtomicUnit(c.cmp, iter.Take().Slice(), true /* disableIsCompacting */)
-				lower, upper := manifest.KeyRange(c.cmp, atomicUnit.Iter())
-				return fmt.Sprintf("%s-%s\n", lower.UserKey, upper.UserKey)
-
-			default:
-				return fmt.Sprintf("unknown command: %s", d.Cmd)
-			}
-		})
-}

func TestCompactionDeleteOnlyHints(t *testing.T) {
	parseUint64 := func(s string) uint64 {
		v, err := strconv.ParseUint(s, 10, 64)