Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

compaction: use sstable writer for detecting user key changes #1340

Merged
merged 1 commit into from
Dec 2, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 33 additions & 15 deletions compaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,9 @@ type compactionOutputSplitter interface {
// shouldSplitBefore returns whether we should split outputs before the
// specified "current key". The return value is splitNow or noSplit.
// splitNow means a split is advised before the specified key, and noSplit
// means no split is advised.
// means no split is advised. If shouldSplitBefore(a) advises a split then
// shouldSplitBefore(b) should also advise a split given b >= a, until
// onNewOutput is called.
shouldSplitBefore(key *InternalKey, tw *sstable.Writer) compactionSplitSuggestion
// onNewOutput updates internal splitter state when the compaction switches
// to a new sstable, and returns the next limit for the new output which
Expand Down Expand Up @@ -323,26 +325,22 @@ func (a *splitterGroup) onNewOutput(key *InternalKey) []byte {
// the boundary between atomic compaction units). Use this splitter to wrap
// any splitters that don't guarantee user key splits (i.e. splitters that make
// their determination in ways other than comparing the current key against a
// limit key.
// limit key.) If a wrapped splitter advises a split, it must continue
// to advise a split until a new output.
type userKeyChangeSplitter struct {
cmp Compare
splitOnNextUserKey bool
savedKey []byte
splitter compactionOutputSplitter
cmp Compare
splitter compactionOutputSplitter
unsafePrevUserKey func() []byte
}

func (u *userKeyChangeSplitter) shouldSplitBefore(
key *InternalKey, tw *sstable.Writer,
) compactionSplitSuggestion {
if u.splitOnNextUserKey && u.cmp(u.savedKey, key.UserKey) != 0 {
u.splitOnNextUserKey = false
u.savedKey = u.savedKey[:0]
return splitNow
if split := u.splitter.shouldSplitBefore(key, tw); split != splitNow {
return split
}
if split := u.splitter.shouldSplitBefore(key, tw); split == splitNow {
u.splitOnNextUserKey = true
u.savedKey = append(u.savedKey[:0], key.UserKey...)
return noSplit
if u.cmp(key.UserKey, u.unsafePrevUserKey()) > 0 {
return splitNow
}
return noSplit
}
Expand Down Expand Up @@ -2110,6 +2108,16 @@ func (d *DB) runCompaction(
writerOpts.BlockPropertyCollectors = nil
}

// prevPointKey is a sstable.WriterOption that provides access to
// the last point key written to a writer's sstable. When a new
// output begins in newOutput, prevPointKey is updated to point to
// the new output's sstable.Writer. This allows the compaction loop
// to access the last written point key without requiring the
// compaction loop to make a copy of each key ahead of time. Users
// must be careful, because the byte slice returned by UnsafeKey
// points directly into the Writer's block buffer.
var prevPointKey sstable.PreviousPointKeyOpt

newOutput := func() error {
fileMeta := &fileMetadata{}
d.mu.Lock()
Expand Down Expand Up @@ -2144,7 +2152,7 @@ func (d *DB) runCompaction(
filenames = append(filenames, filename)
cacheOpts := private.SSTableCacheOpts(d.cacheID, fileNum).(sstable.WriterOption)
internalTableOpt := private.SSTableInternalTableOpt.(sstable.WriterOption)
tw = sstable.NewWriter(file, writerOpts, cacheOpts, internalTableOpt)
tw = sstable.NewWriter(file, writerOpts, cacheOpts, internalTableOpt, &prevPointKey)

fileMeta.CreationTime = time.Now().Unix()
ve.NewFiles = append(ve.NewFiles, newFileEntry{
Expand Down Expand Up @@ -2382,6 +2390,16 @@ func (d *DB) runCompaction(
outputSplitters[0] = &userKeyChangeSplitter{
cmp: c.cmp,
splitter: outputSplitters[0],
unsafePrevUserKey: func() []byte {
// Return the largest point key written to tw or the start of
// the current range deletion in the fragmenter, whichever is
// greater.
prevPoint := prevPointKey.UnsafeKey()
if c.cmp(prevPoint.UserKey, c.rangeDelFrag.Start()) > 0 {
return prevPoint.UserKey
}
return c.rangeDelFrag.Start()
},
}
outputSplitters = append(outputSplitters, &l0LimitSplitter{c: c, ve: ve})
}
Expand Down
9 changes: 8 additions & 1 deletion compaction_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2706,6 +2706,7 @@ func (m *mockSplitter) onNewOutput(key *InternalKey) []byte {

func TestCompactionOutputSplitters(t *testing.T) {
var main, child0, child1 compactionOutputSplitter
var prevUserKey []byte
pickSplitter := func(input string) *compactionOutputSplitter {
switch input {
case "main":
Expand Down Expand Up @@ -2742,7 +2743,10 @@ func TestCompactionOutputSplitters(t *testing.T) {
*splitterToInit = &mockSplitter{}
case "userkey":
*splitterToInit = &userKeyChangeSplitter{
cmp: base.DefaultComparer.Compare,
cmp: base.DefaultComparer.Compare,
unsafePrevUserKey: func() []byte {
return prevUserKey
},
splitter: child0,
}
}
Expand Down Expand Up @@ -2770,6 +2774,9 @@ func TestCompactionOutputSplitters(t *testing.T) {
shouldSplit := main.shouldSplitBefore(&key, nil)
if shouldSplit == splitNow {
main.onNewOutput(&key)
prevUserKey = nil
} else {
prevUserKey = key.UserKey
}
return shouldSplit.String()
default:
Expand Down
9 changes: 9 additions & 0 deletions data_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ package pebble
import (
"bytes"
"fmt"
"math"
"math/rand"
"strconv"
"strings"
Expand Down Expand Up @@ -483,6 +484,14 @@ func runDBDefineCmd(td *datadriven.TestData, opts *Options) (*DB, error) {
c := newFlush(d.opts, d.mu.versions.currentVersion(),
d.mu.versions.picker.getBaseLevel(), toFlush, &d.atomic.bytesFlushed)
c.disableRangeTombstoneElision = true
// NB: define allows the test to exactly specify which keys go
// into which sstables. If the test has a small target file
// size to test grandparent limits, etc, the maxOutputFileSize
// can cause splitting /within/ the bounds specified to the
// test. Ignore the target size here, and split only according
// to the user-defined boundaries.
c.maxOutputFileSize = math.MaxUint64

newVE, _, err := d.runCompaction(0, c, nilPacer)
if err != nil {
return err
Expand Down
21 changes: 21 additions & 0 deletions sstable/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -836,6 +836,27 @@ type WriterOption interface {
writerApply(*Writer)
}

// PreviousPointKeyOpt is a WriterOption that provides access to the last
// point key written to the writer while building a sstable. The zero value
// is usable but yields an invalid key until writerApply binds it to a
// Writer.
type PreviousPointKeyOpt struct {
	// w is the Writer this option was most recently applied to; nil until
	// writerApply has been called.
	w *Writer
}

// UnsafeKey returns the last point key written to the writer to which this
// option was passed during creation. The returned key points directly into
// a buffer belonging to the Writer, so its lifetime ends the next time a
// point key is added; callers must copy it if it needs to live longer. If
// the option was never applied to a Writer, an invalid key is returned.
func (o PreviousPointKeyOpt) UnsafeKey() base.InternalKey {
	if o.w != nil {
		return o.w.meta.LargestPoint
	}
	return base.InvalidInternalKey
}

// writerApply implements the WriterOption interface, binding o to w so that
// subsequent UnsafeKey calls read the last point key from w's metadata.
func (o *PreviousPointKeyOpt) writerApply(w *Writer) {
	o.w = w
}

// internalTableOpt is a WriterOption that sets properties for sstables being
// created by the db itself (i.e. through flushes and compactions), as opposed
// to those meant for ingestion.
Expand Down
4 changes: 0 additions & 4 deletions testdata/compaction_output_splitters
Original file line number Diff line number Diff line change
Expand Up @@ -107,10 +107,6 @@ should-split-before food.SET.4
----
no-split

set-should-split child0 no-split
----
ok

should-split-before food2.SET.4
----
split-now