Skip to content

Commit

Permalink
compaction: use sstable writer for detecting user key changes
Browse files Browse the repository at this point in the history
The `userKeyChangeSplitter` is responsible for avoiding splits within a
user key during flushes. Previously, it worked by recording the current
user key when a split is requested. This introduced a delay in
splitting. When the split is requested, the current key might already be
different than the last key written to the sstable. This change alters
the `userKeyChangeSplitter` to read the last written point key from the
current sstable writer and the last written range key from the range
deletion fragmenter.

This has a couple of advantages:
a) It avoids an extra copy of a user key.
b) It may split sooner, closer to the target file size. This is a
   practical concern in writing unit tests that involve flushes and
   small target file sizes. If we prevent splitting user keys across
   outputs in compactions too, it will become a practical concern there
   too.
c) It exposes the previous point key to the broader compaction loop,
   which is necessary for narrowing the conditions during which the
   compaction loop must flush all/additional range tombstones.
   Currently, we ignore the splitters' suggested split point during
   flushes because we may have already output a key with the user key to
   the current sstable. Future commits will be able to read this
   previous point key and use the splitters' suggestion if the previous
   point key's user key is not equal to the splitters' suggestion.
  • Loading branch information
jbowens committed Dec 2, 2021
1 parent afaa43d commit 4b595de
Show file tree
Hide file tree
Showing 5 changed files with 71 additions and 20 deletions.
48 changes: 33 additions & 15 deletions compaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,9 @@ type compactionOutputSplitter interface {
// shouldSplitBefore returns whether we should split outputs before the
// specified "current key". The return value is splitNow or noSplit.
// splitNow means a split is advised before the specified key, and noSplit
// means no split is advised.
// means no split is advised. If shouldSplitBefore(a) advises a split then
// shouldSplitBefore(b) should also advise a split given b >= a, until
// onNewOutput is called.
shouldSplitBefore(key *InternalKey, tw *sstable.Writer) compactionSplitSuggestion
// onNewOutput updates internal splitter state when the compaction switches
// to a new sstable, and returns the next limit for the new output which
Expand Down Expand Up @@ -323,26 +325,22 @@ func (a *splitterGroup) onNewOutput(key *InternalKey) []byte {
// the boundary between atomic compaction units). Use this splitter to wrap
// any splitters that don't guarantee user key splits (i.e. splitters that make
// their determination in ways other than comparing the current key against a
// limit key.
// limit key.) If a wrapped splitter advises a split, it must continue
// to advise a split until a new output.
type userKeyChangeSplitter struct {
cmp Compare
splitOnNextUserKey bool
savedKey []byte
splitter compactionOutputSplitter
cmp Compare
splitter compactionOutputSplitter
unsafePrevUserKey func() []byte
}

func (u *userKeyChangeSplitter) shouldSplitBefore(
key *InternalKey, tw *sstable.Writer,
) compactionSplitSuggestion {
if u.splitOnNextUserKey && u.cmp(u.savedKey, key.UserKey) != 0 {
u.splitOnNextUserKey = false
u.savedKey = u.savedKey[:0]
return splitNow
if split := u.splitter.shouldSplitBefore(key, tw); split != splitNow {
return split
}
if split := u.splitter.shouldSplitBefore(key, tw); split == splitNow {
u.splitOnNextUserKey = true
u.savedKey = append(u.savedKey[:0], key.UserKey...)
return noSplit
if u.cmp(key.UserKey, u.unsafePrevUserKey()) > 0 {
return splitNow
}
return noSplit
}
Expand Down Expand Up @@ -2110,6 +2108,16 @@ func (d *DB) runCompaction(
writerOpts.BlockPropertyCollectors = nil
}

// prevPointKey is a sstable.WriterOption that provides access to
// the last point key written to a writer's sstable. When a new
// output begins in newOutput, prevPointKey is updated to point to
// the new output's sstable.Writer. This allows the compaction loop
// to access the last written point key without requiring the
// compaction loop to make a copy of each key ahead of time. Users
// must be careful, because the byte slice returned by UnsafeKey
// points directly into the Writer's block buffer.
var prevPointKey sstable.PreviousPointKeyOpt

newOutput := func() error {
fileMeta := &fileMetadata{}
d.mu.Lock()
Expand Down Expand Up @@ -2144,7 +2152,7 @@ func (d *DB) runCompaction(
filenames = append(filenames, filename)
cacheOpts := private.SSTableCacheOpts(d.cacheID, fileNum).(sstable.WriterOption)
internalTableOpt := private.SSTableInternalTableOpt.(sstable.WriterOption)
tw = sstable.NewWriter(file, writerOpts, cacheOpts, internalTableOpt)
tw = sstable.NewWriter(file, writerOpts, cacheOpts, internalTableOpt, &prevPointKey)

fileMeta.CreationTime = time.Now().Unix()
ve.NewFiles = append(ve.NewFiles, newFileEntry{
Expand Down Expand Up @@ -2382,6 +2390,16 @@ func (d *DB) runCompaction(
outputSplitters[0] = &userKeyChangeSplitter{
cmp: c.cmp,
splitter: outputSplitters[0],
unsafePrevUserKey: func() []byte {
// Return the largest point key written to tw or the start of
// the current range deletion in the fragmenter, whichever is
// greater.
prevPoint := prevPointKey.UnsafeKey()
if c.cmp(prevPoint.UserKey, c.rangeDelFrag.Start()) > 0 {
return prevPoint.UserKey
}
return c.rangeDelFrag.Start()
},
}
outputSplitters = append(outputSplitters, &l0LimitSplitter{c: c, ve: ve})
}
Expand Down
9 changes: 8 additions & 1 deletion compaction_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2706,6 +2706,7 @@ func (m *mockSplitter) onNewOutput(key *InternalKey) []byte {

func TestCompactionOutputSplitters(t *testing.T) {
var main, child0, child1 compactionOutputSplitter
var prevUserKey []byte
pickSplitter := func(input string) *compactionOutputSplitter {
switch input {
case "main":
Expand Down Expand Up @@ -2742,7 +2743,10 @@ func TestCompactionOutputSplitters(t *testing.T) {
*splitterToInit = &mockSplitter{}
case "userkey":
*splitterToInit = &userKeyChangeSplitter{
cmp: base.DefaultComparer.Compare,
cmp: base.DefaultComparer.Compare,
unsafePrevUserKey: func() []byte {
return prevUserKey
},
splitter: child0,
}
}
Expand Down Expand Up @@ -2770,6 +2774,9 @@ func TestCompactionOutputSplitters(t *testing.T) {
shouldSplit := main.shouldSplitBefore(&key, nil)
if shouldSplit == splitNow {
main.onNewOutput(&key)
prevUserKey = nil
} else {
prevUserKey = key.UserKey
}
return shouldSplit.String()
default:
Expand Down
9 changes: 9 additions & 0 deletions data_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ package pebble
import (
"bytes"
"fmt"
"math"
"math/rand"
"strconv"
"strings"
Expand Down Expand Up @@ -483,6 +484,14 @@ func runDBDefineCmd(td *datadriven.TestData, opts *Options) (*DB, error) {
c := newFlush(d.opts, d.mu.versions.currentVersion(),
d.mu.versions.picker.getBaseLevel(), toFlush, &d.atomic.bytesFlushed)
c.disableRangeTombstoneElision = true
// NB: define allows the test to exactly specify which keys go
// into which sstables. If the test has a small target file
// size to test grandparent limits, etc, the maxOutputFileSize
// can cause splitting /within/ the bounds specified to the
// test. Ignore the target size here, and split only according
// to the user-defined boundaries.
c.maxOutputFileSize = math.MaxUint64

newVE, _, err := d.runCompaction(0, c, nilPacer)
if err != nil {
return err
Expand Down
21 changes: 21 additions & 0 deletions sstable/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -836,6 +836,27 @@ type WriterOption interface {
writerApply(*Writer)
}

// PreviousPointKeyOpt is a WriterOption that provides access to the last
// point key written to the writer while building a sstable.
type PreviousPointKeyOpt struct {
	// w is the Writer this option was applied to; it is nil until
	// writerApply is invoked by sstable.NewWriter.
	w *Writer
}

// UnsafeKey returns the last point key written to the writer to which this
// option was passed during creation. The returned key points directly into
// a buffer belonging to the Writer. The value's lifetime ends the next time
// a point key is added to the Writer.
func (o PreviousPointKeyOpt) UnsafeKey() base.InternalKey {
	if o.w != nil {
		// LargestPoint tracks the most recent point key the writer emitted.
		return o.w.meta.LargestPoint
	}
	// The option was never applied to a writer.
	return base.InvalidInternalKey
}

// writerApply satisfies the WriterOption interface by recording the Writer,
// allowing UnsafeKey to later read the writer's last written point key.
func (o *PreviousPointKeyOpt) writerApply(w *Writer) {
	o.w = w
}

// internalTableOpt is a WriterOption that sets properties for sstables being
// created by the db itself (i.e. through flushes and compactions), as opposed
// to those meant for ingestion.
Expand Down
4 changes: 0 additions & 4 deletions testdata/compaction_output_splitters
Original file line number Diff line number Diff line change
Expand Up @@ -107,10 +107,6 @@ should-split-before food.SET.4
----
no-split

set-should-split child0 no-split
----
ok

should-split-before food2.SET.4
----
split-now

0 comments on commit 4b595de

Please sign in to comment.