From 4b595de954b1c9034a1a518c37f1db35431ab676 Mon Sep 17 00:00:00 2001 From: Jackson Owens Date: Mon, 18 Oct 2021 18:47:38 -0400 Subject: [PATCH] compaction: use sstable writer for detecting user key changes The `userKeyChangeSplitter` is responsible for avoiding splits within a user key during flushes. Previously, it worked by recording the current user key when a split is requested. This introduced a delay in splitting. When the split is requested, the current key might already be different than the last key written to the sstable. This change alters the `userKeyChangeSplitter` to read the last written point key from the current sstable writer and the last written range key from the range deletion fragmenter. This has a couple advantages: a) It avoids an extra copy of a user key. b) It may split sooner, closer to the target file size. This is a practical concern in writing unit tests that involves flushes and small target file sizes. If we prevent splitting user keys across outputs in compactions too, it will become a practical concern there too. c) It exposes the previous point key to the broader compaction loop, which is necessary for narrowing the conditions during which the compaction loop must flush all/additional range tombstones. Currently, we ignore the splitters' suggested split point during flushes because we may have already output a key with the user key to the current sstable. Future commits will be able to read this previous point key and use the splitters' suggestion if the previous point key's user key is not equal to the splitters' suggestion. --- compaction.go | 48 +++++++++++++++++++--------- compaction_test.go | 9 +++++- data_test.go | 9 ++++++ sstable/writer.go | 21 ++++++++++++ testdata/compaction_output_splitters | 4 --- 5 files changed, 71 insertions(+), 20 deletions(-) diff --git a/compaction.go b/compaction.go index 438ea93835..d56a19ab09 100644 --- a/compaction.go +++ b/compaction.go @@ -106,7 +106,9 @@ type compactionOutputSplitter interface { // shouldSplitBefore returns whether we should split outputs before the // specified "current key". The return value is splitNow or noSplit. // splitNow means a split is advised before the specified key, and noSplit - // means no split is advised. + // means no split is advised. If shouldSplitBefore(a) advises a split then + // shouldSplitBefore(b) should also advise a split given b >= a, until + // onNewOutput is called. shouldSplitBefore(key *InternalKey, tw *sstable.Writer) compactionSplitSuggestion // onNewOutput updates internal splitter state when the compaction switches // to a new sstable, and returns the next limit for the new output which @@ -323,26 +325,22 @@ func (a *splitterGroup) onNewOutput(key *InternalKey) []byte { // the boundary between atomic compaction units). Use this splitter to wrap // any splitters that don't guarantee user key splits (i.e. splitters that make // their determination in ways other than comparing the current key against a -// limit key. +// limit key.) If a wrapped splitter advises a split, it must continue +// to advise a split until a new output. type userKeyChangeSplitter struct { - cmp Compare - splitOnNextUserKey bool - savedKey []byte - splitter compactionOutputSplitter + cmp Compare + splitter compactionOutputSplitter + unsafePrevUserKey func() []byte } func (u *userKeyChangeSplitter) shouldSplitBefore( key *InternalKey, tw *sstable.Writer, ) compactionSplitSuggestion { - if u.splitOnNextUserKey && u.cmp(u.savedKey, key.UserKey) != 0 { - u.splitOnNextUserKey = false - u.savedKey = u.savedKey[:0] - return splitNow + if split := u.splitter.shouldSplitBefore(key, tw); split != splitNow { + return split } - if split := u.splitter.shouldSplitBefore(key, tw); split == splitNow { - u.splitOnNextUserKey = true - u.savedKey = append(u.savedKey[:0], key.UserKey...) - return noSplit + if u.cmp(key.UserKey, u.unsafePrevUserKey()) > 0 { + return splitNow } return noSplit } @@ -2110,6 +2108,16 @@ func (d *DB) runCompaction( writerOpts.BlockPropertyCollectors = nil } + // prevPointKey is a sstable.WriterOption that provides access to + // the last point key written to a writer's sstable. When a new + // output begins in newOutput, prevPointKey is updated to point to + // the new output's sstable.Writer. This allows the compaction loop + // to access the last written point key without requiring the + // compaction loop to make a copy of each key ahead of time. Users + // must be careful, because the byte slice returned by UnsafeKey + // points directly into the Writer's block buffer. + var prevPointKey sstable.PreviousPointKeyOpt + newOutput := func() error { fileMeta := &fileMetadata{} d.mu.Lock() @@ -2144,7 +2152,7 @@ func (d *DB) runCompaction( filenames = append(filenames, filename) cacheOpts := private.SSTableCacheOpts(d.cacheID, fileNum).(sstable.WriterOption) internalTableOpt := private.SSTableInternalTableOpt.(sstable.WriterOption) - tw = sstable.NewWriter(file, writerOpts, cacheOpts, internalTableOpt) + tw = sstable.NewWriter(file, writerOpts, cacheOpts, internalTableOpt, &prevPointKey) fileMeta.CreationTime = time.Now().Unix() ve.NewFiles = append(ve.NewFiles, newFileEntry{ @@ -2382,6 +2390,16 @@ func (d *DB) runCompaction( outputSplitters[0] = &userKeyChangeSplitter{ cmp: c.cmp, splitter: outputSplitters[0], + unsafePrevUserKey: func() []byte { + // Return the largest point key written to tw or the start of + // the current range deletion in the fragmenter, whichever is + // greater. + prevPoint := prevPointKey.UnsafeKey() + if c.cmp(prevPoint.UserKey, c.rangeDelFrag.Start()) > 0 { + return prevPoint.UserKey + } + return c.rangeDelFrag.Start() + }, } outputSplitters = append(outputSplitters, &l0LimitSplitter{c: c, ve: ve}) } diff --git a/compaction_test.go b/compaction_test.go index 9490be712e..2adbe206e3 100644 --- a/compaction_test.go +++ b/compaction_test.go @@ -2706,6 +2706,7 @@ func (m *mockSplitter) onNewOutput(key *InternalKey) []byte { func TestCompactionOutputSplitters(t *testing.T) { var main, child0, child1 compactionOutputSplitter + var prevUserKey []byte pickSplitter := func(input string) *compactionOutputSplitter { switch input { case "main": @@ -2742,7 +2743,10 @@ func TestCompactionOutputSplitters(t *testing.T) { *splitterToInit = &mockSplitter{} case "userkey": *splitterToInit = &userKeyChangeSplitter{ - cmp: base.DefaultComparer.Compare, + cmp: base.DefaultComparer.Compare, + unsafePrevUserKey: func() []byte { + return prevUserKey + }, splitter: child0, } } @@ -2770,6 +2774,9 @@ func TestCompactionOutputSplitters(t *testing.T) { shouldSplit := main.shouldSplitBefore(&key, nil) if shouldSplit == splitNow { main.onNewOutput(&key) + prevUserKey = nil + } else { + prevUserKey = key.UserKey } return shouldSplit.String() default: diff --git a/data_test.go b/data_test.go index eea06b4f25..551d65b843 100644 --- a/data_test.go +++ b/data_test.go @@ -7,6 +7,7 @@ package pebble import ( "bytes" "fmt" + "math" "math/rand" "strconv" "strings" @@ -483,6 +484,14 @@ func runDBDefineCmd(td *datadriven.TestData, opts *Options) (*DB, error) { c := newFlush(d.opts, d.mu.versions.currentVersion(), d.mu.versions.picker.getBaseLevel(), toFlush, &d.atomic.bytesFlushed) c.disableRangeTombstoneElision = true + // NB: define allows the test to exactly specify which keys go + // into which sstables. If the test has a small target file + // size to test grandparent limits, etc, the maxOutputFileSize + // can cause splitting /within/ the bounds specified to the + // test. Ignore the target size here, and split only according + // to the user-defined boundaries. + c.maxOutputFileSize = math.MaxUint64 + newVE, _, err := d.runCompaction(0, c, nilPacer) if err != nil { return err diff --git a/sstable/writer.go b/sstable/writer.go index bee276e464..bdbb7a7e5c 100644 --- a/sstable/writer.go +++ b/sstable/writer.go @@ -836,6 +836,27 @@ type WriterOption interface { writerApply(*Writer) } +// PreviousPointKeyOpt is a WriterOption that provides access to the last +// point key written to the writer while building a sstable. +type PreviousPointKeyOpt struct { + w *Writer +} + +// UnsafeKey returns the last point key written to the writer to which this +// option was passed during creation. The returned key points directly into +// a buffer belonging the Writer. The value's lifetime ends the next time a +// point key is added to the Writer. +func (o PreviousPointKeyOpt) UnsafeKey() base.InternalKey { + if o.w == nil { + return base.InvalidInternalKey + } + return o.w.meta.LargestPoint +} + +func (o *PreviousPointKeyOpt) writerApply(w *Writer) { + o.w = w +} + // internalTableOpt is a WriterOption that sets properties for sstables being // created by the db itself (i.e. through flushes and compactions), as opposed // to those meant for ingestion. diff --git a/testdata/compaction_output_splitters b/testdata/compaction_output_splitters index aaec60acae..237b1ce63f 100644 --- a/testdata/compaction_output_splitters +++ b/testdata/compaction_output_splitters @@ -107,10 +107,6 @@ should-split-before food.SET.4 ---- no-split -set-should-split child0 no-split ----- -ok - should-split-before food2.SET.4 ---- split-now