compaction: don't split outputs within a user key
During a compaction, if the current sstable hits the file size limit, defer
finishing the sstable if the next sstable would share a user key. This is the
current behavior of flushes, and this change brings parity between the two.
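To illustrate the deferral, here is a minimal, self-contained sketch of a
splitter wrapper that suppresses an inner splitter's suggestion until the
user key changes. The names and types below (splitter, sizeSplitter,
userKeyChangeSplitter as written here) are simplified stand-ins for
illustration, not Pebble's actual interfaces:

package main

import (
	"bytes"
	"fmt"
)

// splitter advises whether to finish the current output sstable before
// adding the given user key.
type splitter interface {
	shouldSplitBefore(userKey []byte) bool
}

// sizeSplitter asks for a split once the current output reaches its limit.
type sizeSplitter struct {
	size, limit int
}

func (s *sizeSplitter) shouldSplitBefore(userKey []byte) bool {
	return s.size >= s.limit
}

// userKeyChangeSplitter defers the inner splitter's suggestion while the
// incoming key still shares a user key with the previous key written, so
// no output ever splits a user key.
type userKeyChangeSplitter struct {
	inner       splitter
	prevUserKey []byte
}

func (u *userKeyChangeSplitter) shouldSplitBefore(userKey []byte) bool {
	return u.inner.shouldSplitBefore(userKey) && !bytes.Equal(userKey, u.prevUserKey)
}

func main() {
	size := &sizeSplitter{limit: 2}
	s := &userKeyChangeSplitter{inner: size}
	for _, k := range []string{"a", "b", "b", "b", "c"} {
		key := []byte(k)
		if s.shouldSplitBefore(key) {
			fmt.Printf("split before %q\n", k)
			size.size = 0 // start a new output
		}
		size.size++ // pretend each key adds one unit of data
		s.prevUserKey = key
	}
}

The size limit is reached while versions of "b" are still arriving, so the
split is deferred and fires only before "c".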

This change is motivated by the introduction of range keys (see cockroachdb#1339). It
ensures we can always cleanly truncate range keys that span sstable boundaries.

This commit also removes (keyspan.Fragmenter).FlushTo. Now that we prohibit
splitting sstables in the middle of a user key, the Fragmenter's FlushTo
function is unnecessary. Compactions and flushes always use the
TruncateAndFlushTo variant.
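For intuition, below is a toy model of the TruncateAndFlushTo semantics using
plain string keys. It ignores fragment bookkeeping and sequence numbers, both
of which the real keyspan.Fragmenter must track; the boundary key is treated
as exclusive, so anything at or above it is retained for the next output:

package main

import "fmt"

// span is a toy range tombstone over [start, end).
type span struct{ start, end string }

// truncateAndFlushTo flushes the portion of each pending span that lies
// strictly below the exclusive boundary `to`, retaining the remainder.
func truncateAndFlushTo(pending []span, to string) (flushed, retained []span) {
	for _, s := range pending {
		switch {
		case s.end <= to: // entirely below the boundary
			flushed = append(flushed, s)
		case s.start >= to: // entirely at or above the boundary
			retained = append(retained, s)
		default: // straddles the boundary: split at `to`
			flushed = append(flushed, span{s.start, to})
			retained = append(retained, span{to, s.end})
		}
	}
	return flushed, retained
}

func main() {
	flushed, retained := truncateAndFlushTo([]span{{"a", "z"}}, "m")
	fmt.Println(flushed, retained) // prints [{a m}] [{m z}]
}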

This change required a tweak to the way grandparent limits are applied, in
order to switch the grandparent splitter's comparison to a >= comparison.
This was necessary due to the shift to interpreting `splitterSuggestion`s as
exclusive boundaries.
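A short demonstration of the off-by-one this avoids, using bytes.Compare as a
stand-in for the user-key comparator (the keys here are hypothetical, chosen
only for illustration):

package main

import (
	"bytes"
	"fmt"
)

func main() {
	// Suppose the splitter's suggested (exclusive) boundary is "m" and the
	// next key's user key is also "m".
	limit, userKey := []byte("m"), []byte("m")

	// With >, "m" would be added to the current sstable, so "m" could no
	// longer serve as an exclusive end boundary for truncating the pending
	// range tombstones of this output.
	fmt.Println("split with > :", bytes.Compare(userKey, limit) > 0) // false

	// With >=, the current output is finished first and "m" opens the next
	// sstable, keeping every version of "m" out of the current output.
	fmt.Println("split with >=:", bytes.Compare(userKey, limit) >= 0) // true
}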
jbowens committed Jan 25, 2022
1 parent ca9b452 commit 28db1fc
Showing 15 changed files with 158 additions and 461 deletions.
184 changes: 62 additions & 122 deletions compaction.go
@@ -154,7 +154,12 @@ type grandparentLimitSplitter struct {
 func (g *grandparentLimitSplitter) shouldSplitBefore(
 	key *InternalKey, tw *sstable.Writer,
 ) compactionSplitSuggestion {
-	if g.limit != nil && g.c.cmp(key.UserKey, g.limit) > 0 {
+	// NB: The limit must be applied using >= since g.limit may be used as the
+	// `splitterSuggestion` ultimately passed to `compactionIter.Tombstones` to
+	// serve as an *exclusive* end boundary truncation point. If we used > then,
+	// we may have already added a key with the user key `g.limit` to the
+	// previous sstable.
+	if g.limit != nil && g.c.cmp(key.UserKey, g.limit) >= 0 {
 		return splitNow
 	}
 	return noSplit
@@ -854,14 +859,6 @@ func seekGT(iter *manifest.LevelIterator, cmp base.Compare, key []byte) *manifes
 // current heuristic stops output of a table if the addition of another key
 // would cause the table to overlap more than 10x the target file size at
 // level N. See maxGrandparentOverlapBytes.
-//
-// TODO(peter): Stopping compaction output in the middle of a user-key creates
-// 2 sstables that need to be compacted together as an "atomic compaction
-// unit". This is unfortunate as it removes the benefit of stopping output to
-// an sstable in order to prevent a large compaction with the next level. Seems
-// better to adjust findGrandparentLimit to not stop output in the middle of a
-// user-key. Perhaps this isn't a problem if the compaction picking heuristics
-// always pick the right (older) sibling for compaction first.
 func (c *compaction) findGrandparentLimit(start []byte) []byte {
 	iter := c.grandparents.Iter()
 	var overlappedBytes uint64
@@ -870,8 +867,8 @@ func (c *compaction) findGrandparentLimit(start []byte) []byte {
 		// To ensure forward progress we always return a larger user
 		// key than where we started. See comments above clients of
 		// this function for how this is used.
-		if overlappedBytes > c.maxOverlapBytes && c.cmp(start, f.Largest.UserKey) < 0 {
-			return f.Largest.UserKey
+		if overlappedBytes > c.maxOverlapBytes && c.cmp(start, f.Smallest.UserKey) < 0 {
+			return f.Smallest.UserKey
 		}
 	}
 	return nil
@@ -1056,13 +1053,15 @@ func (c *compaction) newInputIter(newIters tableNewIters) (_ internalIterator, r
 			c.closers = append(c.closers, rangeDelIter)
 			rangeDelIter = noCloseIter{rangeDelIter}

-			// Truncate the range tombstones returned by the iterator to the upper
-			// bound of the atomic compaction unit. Note that we need do this
-			// truncation at read time in order to handle RocksDB generated sstables
-			// which do not truncate range tombstones to atomic compaction unit
-			// boundaries at write time. Because we're doing the truncation at read
-			// time, we follow RocksDB's lead and do not truncate tombstones to
-			// atomic unit boundaries at compaction time.
+			// Truncate the range tombstones returned by the iterator to the
+			// upper bound of the atomic compaction unit. Note that we need do
+			// this truncation at read time in order to handle sstables
+			// generated by RocksDB and earlier versions of Pebble which do not
+			// truncate range tombstones to atomic compaction unit boundaries at
+			// write time.
+			//
+			// The current Pebble compaction logic DOES truncate tombstones to
+			// atomic unit boundaries at compaction time too.
 			atomicUnit, _ := expandToAtomicUnit(c.cmp, f.Slice(), true /* disableIsCompacting */)
 			lowerBound, upperBound := manifest.KeyRange(c.cmp, atomicUnit.Iter())
 			// Range deletion tombstones are often written to sstables
@@ -2205,7 +2204,7 @@ func (d *DB) runCompaction(
 		// compactionIter.Tombstones via keyspan.Fragmenter.FlushTo, and by the
 		// WriterMetadata.LargestRangeDel.UserKey.
 		splitKey = append([]byte(nil), splitKey...)
-		for _, v := range iter.Tombstones(splitKey, splitL0Outputs) {
+		for _, v := range iter.Tombstones(splitKey) {
 			if tw == nil {
 				if err := newOutput(); err != nil {
 					return err
@@ -2272,10 +2271,9 @@ func (d *DB) runCompaction(
 		outputMetrics.Size += int64(meta.Size)
 		outputMetrics.NumFiles++

-		// The handling of range boundaries is a bit complicated.
 		if n := len(ve.NewFiles); n > 1 {
-			// This is not the first output file. Bound the smallest range key by the
-			// previous tables largest key.
+			// This is not the first output file. Ensure the sstable boundaries
+			// are nonoverlapping.
 			prevMeta := ve.NewFiles[n-2].Meta
 			if writerMeta.SmallestRangeDel.UserKey != nil {
 				c := d.cmp(writerMeta.SmallestRangeDel.UserKey, prevMeta.Largest.UserKey)
@@ -2284,40 +2282,17 @@ func (d *DB) runCompaction(
 						"pebble: smallest range tombstone start key is less than previous sstable largest key: %s < %s",
 						writerMeta.SmallestRangeDel.Pretty(d.opts.Comparer.FormatKey),
 						prevMeta.Largest.Pretty(d.opts.Comparer.FormatKey))
-				}
-				if c == 0 && prevMeta.Largest.SeqNum() <= writerMeta.SmallestRangeDel.SeqNum() {
-					// The user key portion of the range boundary start key is equal to
-					// the previous table's largest key. We need the tables to be
-					// key-space partitioned, so force the boundary to a key that we know
-					// is larger than the previous table's largest key.
-					if prevMeta.Largest.SeqNum() == 0 {
-						// If the seqnum of the previous table's largest key is 0, we can't
-						// decrement it. This should never happen as we take care in the
-						// main compaction loop to avoid generating an sstable with a
-						// largest key containing a zero seqnum.
-						return errors.Errorf(
-							"pebble: previous sstable largest key unexpectedly has 0 seqnum: %s",
-							prevMeta.Largest.Pretty(d.opts.Comparer.FormatKey))
-					}
-					// TODO(peter): Technically, this produces a small gap with the
-					// previous sstable. The largest key of the previous table may be
-					// b#5.SET. The smallest key of the new sstable may then become
-					// b#4.RANGEDEL even though the tombstone is [b,z)#6. If iteration
-					// ever precisely truncates sstable boundaries, the key b#5.DEL at
-					// a lower level could slip through. Note that this can't ever
-					// actually happen, though, because the only way for two records to
-					// have the same seqnum is via ingestion. And even if it did happen
-					// revealing a deletion tombstone is not problematic. Slightly more
-					// worrisome is the combination of b#5.MERGE, b#5.SET and
-					// b#4.RANGEDEL, but we can't ever see b#5.MERGE and b#5.SET at
-					// different levels in the tree due to the ingestion argument and
-					// atomic compaction units.
-					//
-					// TODO(sumeer): Incorporate the the comment in
-					// https://github.com/cockroachdb/pebble/pull/479#pullrequestreview-340600654
-					// into docs/range_deletions.md and reference the correctness
-					// argument here. Note that that comment might be slightly incorrect.
-					writerMeta.SmallestRangeDel.SetSeqNum(prevMeta.Largest.SeqNum() - 1)
+				} else if c == 0 && !prevMeta.Largest.IsExclusiveSentinel() {
+					// The user key portion of the range boundary start key is
+					// equal to the previous table's largest key user key, and
+					// the previous table's largest key is not exclusive. This
+					// violates the invariant that tables are key-space
+					// partitioned.
+					return errors.Errorf(
+						"pebble: invariant violation: previous sstable largest key %s, current sstable smallest rangedel: %s",
+						prevMeta.Largest.Pretty(d.opts.Comparer.FormatKey),
+						writerMeta.SmallestRangeDel.Pretty(d.opts.Comparer.FormatKey),
+					)
 				}
 			}
 		}
@@ -2365,12 +2340,10 @@ func (d *DB) runCompaction(
 					c.largest.Pretty(d.opts.Comparer.FormatKey))
 			}
 		}
-		// Verify that when splitting an output to L0, we never split different
-		// revisions of the same user key across two different sstables.
-		if splitL0Outputs {
-			if err := c.errorOnUserKeyOverlap(ve); err != nil {
-				return err
-			}
+		// Verify that we never split different revisions of the same user key
+		// across two different sstables.
+		if err := c.errorOnUserKeyOverlap(ve); err != nil {
+			return err
 		}
 		if err := meta.Validate(d.cmp, d.opts.Comparer.FormatKey); err != nil {
 			return err
@@ -2384,40 +2357,29 @@ func (d *DB) runCompaction(
 	// the splitterGroup can be composed of multiple splitters. In this case,
 	// we start off with splitters for file sizes, grandparent limits, and (for
 	// L0 splits) L0 limits, before wrapping them in an splitterGroup.
-	//
-	// There is a complication here: We may not be able to switch SSTables right
-	// away when we are splitting an L0 output. We do not split the same user
-	// key across different sstables within one flush or intra-L0 compaction, so
-	// the userKeyChangeSplitter ensures we are at a user key change boundary
-	// when doing a split.
 	outputSplitters := []compactionOutputSplitter{
 		&fileSizeSplitter{maxFileSize: c.maxOutputFileSize},
 		&grandparentLimitSplitter{c: c, ve: ve},
 	}
-	var splitter compactionOutputSplitter
-	if splitL0Outputs {
-		// outputSplitters[0] is the file size splitter, which doesn't guarantee
-		// that all advised splits will be at user key change boundaries. Wrap
-		// it in a userKeyChangeSplitter.
-		outputSplitters[0] = &userKeyChangeSplitter{
-			cmp:      c.cmp,
-			splitter: outputSplitters[0],
-			unsafePrevUserKey: func() []byte {
-				// Return the largest point key written to tw or the start of
-				// the current range deletion in the fragmenter, whichever is
-				// greater.
-				prevPoint := prevPointKey.UnsafeKey()
-				if c.cmp(prevPoint.UserKey, c.rangeDelFrag.Start()) > 0 {
-					return prevPoint.UserKey
-				}
-				return c.rangeDelFrag.Start()
-			},
-		}
-		outputSplitters = append(outputSplitters, &l0LimitSplitter{c: c, ve: ve})
-	}
-	splitter = &splitterGroup{
-		cmp:       c.cmp,
-		splitters: outputSplitters,
+	// We do not split the same user key across different sstables within one
+	// flush or compaction, so the userKeyChangeSplitter ensures we are at a
+	// user key change boundary when doing a split.
+	var splitter compactionOutputSplitter = &userKeyChangeSplitter{
+		cmp:      c.cmp,
+		splitter: &splitterGroup{cmp: c.cmp, splitters: outputSplitters},
+		unsafePrevUserKey: func() []byte {
+			// Return the largest point key written to tw or the start of
+			// the current range deletion in the fragmenter, whichever is
+			// greater.
+			prevPoint := prevPointKey.UnsafeKey()
+			if c.cmp(prevPoint.UserKey, c.rangeDelFrag.Start()) > 0 {
+				return prevPoint.UserKey
+			}
+			return c.rangeDelFrag.Start()
+		},
	}

 	// NB: we avoid calling maybeThrottle on a nilPacer because the cost of
@@ -2440,7 +2402,6 @@ func (d *DB) runCompaction(
 		splitterSuggestion := splitter.onNewOutput(key)

 		// Each inner loop iteration processes one key from the input iterator.
-		prevPointSeqNum := InternalKeySeqNumMax
 		for ; key != nil; key, val = iter.Next() {
 			if split := splitter.shouldSplitBefore(key, tw); split == splitNow {
 				break
@@ -2468,21 +2429,11 @@ func (d *DB) runCompaction(
 			if err := tw.Add(*key, val); err != nil {
 				return nil, pendingOutputs, err
 			}
-			prevPointSeqNum = key.SeqNum()
 		}

 		// A splitter requested a split, and we're ready to finish the output.
 		// We need to choose the key at which to split any pending range
 		// tombstones.
-		//
-		// There's a complication here. We need to ensure that for a user key
-		// k we never end up with one output's largest key as k#0 and the
-		// next output's smallest key as k#RANGEDEL,#x where x > 0. This is a
-		// problem because k#RANGEDEL,#x sorts before k#0. Normally, we just
-		// adjust the seqnum of the next output's smallest boundary to be
-		// less, but that's not possible with the zero seqnum. We can avoid
-		// this case with careful picking of where to split pending range
-		// tombstones.
 		var splitKey []byte
 		switch {
 		case key != nil:
@@ -2516,30 +2467,19 @@ func (d *DB) runCompaction(
 			//
 			// TODO(jackson): This case is only problematic if splitKey equals
 			// the user key of the last point key added. We don't need to
-			// flush *all* range tombstones to the current sstable. We could
-			// flush up to the next grandparent limit greater than
-			// `splitterSuggestion` instead.
+			// flush *all* range tombstones to the current sstable. There are
+			// two alternatives:
+			//   1. We could flush up to the next grandparent limit greater than
+			//      `splitterSuggestion` instead.
+			//   2. We could change the flush split suggestion to be inclusive
+			//      (like the grandparent limit is), so that we're guaranteed
+			//      that splitterSuggestion is greater than the previous user key
+			//      added to the sstable.
 			splitKey = nil
-		case key == nil && prevPointSeqNum != 0:
-			// The last key added did not have a zero sequence number, so
-			// we'll always be able to adjust the next table's smallest key.
-			// NB: Because of the splitter's `onNewOutput` contract,
-			// `splitterSuggestion` must be >= any key previously added to the
-			// current output sstable.
+		case key == nil:
+			// NB: Because of the userKeyChangeSplitter, `splitterSuggestion`
+			// must be > any key previously added to the current output sstable.
 			splitKey = splitterSuggestion
-		case key == nil && prevPointSeqNum == 0:
-			// The last key added did have a zero sequence number. The
-			// splitters' suggested split point might have the same user key,
-			// which would cause the next output to have an unadjustable
-			// smallest key. To prevent that, we ignore the splitter's
-			// suggestion, leaving splitKey nil to flush all pending range
-			// tombstones.
-			// TODO(jackson): This case is only problematic if splitKey equals
-			// the user key of the last point key added. We don't need to
-			// flush *all* range tombstones to the current sstable. We could
-			// flush up to the next grandparent limit greater than
-			// `splitterSuggestion` instead.
-			splitKey = nil
 		default:
 			return nil, nil, errors.New("pebble: not reached")
 		}
9 changes: 3 additions & 6 deletions compaction_iter.go
@@ -769,17 +769,14 @@ func (i *compactionIter) Close() error {
 // exclude specifies if the specified key is exclusive or inclusive.
 // When exclude = true, all returned range tombstones are truncated to the
 // specified key.
-func (i *compactionIter) Tombstones(key []byte, exclude bool) []keyspan.Span {
-	switch {
-	case key == nil:
+func (i *compactionIter) Tombstones(key []byte) []keyspan.Span {
+	if key == nil {
 		i.rangeDelFrag.Finish()
-	case exclude:
+	} else {
 		// The specified end key is exclusive; no versions of the specified
 		// user key (including range tombstones covering that key) should
 		// be flushed yet.
 		i.rangeDelFrag.TruncateAndFlushTo(key)
-	default:
-		i.rangeDelFrag.FlushTo(key)
 	}
 	tombstones := i.tombstones
 	i.tombstones = nil
2 changes: 1 addition & 1 deletion compaction_iter_test.go
@@ -191,7 +191,7 @@ func TestCompactionIter(t *testing.T) {
 			if len(parts) == 2 {
 				key = []byte(parts[1])
 			}
-			for _, v := range iter.Tombstones(key, false) {
+			for _, v := range iter.Tombstones(key) {
 				fmt.Fprintf(&b, "%s-%s#%d\n",
 					v.Start.UserKey, v.End, v.Start.SeqNum())
 			}
5 changes: 5 additions & 0 deletions compaction_picker.go
@@ -374,6 +374,11 @@ func (pc *pickedCompaction) grow(sm, la InternalKey, maxExpandedBytes uint64) bo
 // disableIsCompacting is true, isCompacting always returns false. This helps
 // avoid spurious races from being detected when this method is used outside
 // of compaction picking code.
+//
+// TODO(jackson): Compactions and flushes no longer split a user key between two
+// sstables. We could perform a migration, re-compacting any sstables with split
+// user keys, which would allow us to remove atomic compaction unit expansion
+// code.
 func expandToAtomicUnit(
 	cmp Compare, inputs manifest.LevelSlice, disableIsCompacting bool,
 ) (slice manifest.LevelSlice, isCompacting bool) {
4 changes: 4 additions & 0 deletions docs/range_deletions.md
@@ -1,5 +1,9 @@
 # Range Deletions

+TODO: The following explanation of range deletions does not take into account
+the recent change to prohibit splitting of a user key between sstables. This
+change simplifies the logic, removing 'improperly truncated range tombstones.'
+
 TODO: The following explanation of range deletions ignores the
 kind/trailer that appears at the end of keys after the sequence
 number. This should be harmless but need to add a justification on why