diff --git a/compaction.go b/compaction.go
index 438ea93835..d56a19ab09 100644
--- a/compaction.go
+++ b/compaction.go
@@ -106,7 +106,9 @@ type compactionOutputSplitter interface {
 	// shouldSplitBefore returns whether we should split outputs before the
 	// specified "current key". The return value is splitNow or noSplit.
 	// splitNow means a split is advised before the specified key, and noSplit
-	// means no split is advised.
+	// means no split is advised. If shouldSplitBefore(a) advises a split then
+	// shouldSplitBefore(b) should also advise a split given b >= a, until
+	// onNewOutput is called.
 	shouldSplitBefore(key *InternalKey, tw *sstable.Writer) compactionSplitSuggestion
 	// onNewOutput updates internal splitter state when the compaction switches
 	// to a new sstable, and returns the next limit for the new output which
@@ -323,26 +325,22 @@ func (a *splitterGroup) onNewOutput(key *InternalKey) []byte {
 // the boundary between atomic compaction units). Use this splitter to wrap
 // any splitters that don't guarantee user key splits (i.e. splitters that make
 // their determination in ways other than comparing the current key against a
-// limit key.
+// limit key.) If a wrapped splitter advises a split, it must continue
+// to advise a split until a new output.
 type userKeyChangeSplitter struct {
-	cmp                Compare
-	splitOnNextUserKey bool
-	savedKey           []byte
-	splitter           compactionOutputSplitter
+	cmp               Compare
+	splitter          compactionOutputSplitter
+	unsafePrevUserKey func() []byte
 }
 
 func (u *userKeyChangeSplitter) shouldSplitBefore(
 	key *InternalKey, tw *sstable.Writer,
 ) compactionSplitSuggestion {
-	if u.splitOnNextUserKey && u.cmp(u.savedKey, key.UserKey) != 0 {
-		u.splitOnNextUserKey = false
-		u.savedKey = u.savedKey[:0]
-		return splitNow
+	if split := u.splitter.shouldSplitBefore(key, tw); split != splitNow {
+		return split
 	}
-	if split := u.splitter.shouldSplitBefore(key, tw); split == splitNow {
-		u.splitOnNextUserKey = true
-		u.savedKey = append(u.savedKey[:0], key.UserKey...)
-		return noSplit
+	if u.cmp(key.UserKey, u.unsafePrevUserKey()) > 0 {
+		return splitNow
 	}
 	return noSplit
 }
@@ -2110,6 +2108,16 @@ func (d *DB) runCompaction(
 		writerOpts.BlockPropertyCollectors = nil
 	}
 
+	// prevPointKey is a sstable.WriterOption that provides access to
+	// the last point key written to a writer's sstable. When a new
+	// output begins in newOutput, prevPointKey is updated to point to
+	// the new output's sstable.Writer. This allows the compaction loop
+	// to access the last written point key without requiring the
+	// compaction loop to make a copy of each key ahead of time. Users
+	// must be careful, because the byte slice returned by UnsafeKey
+	// points directly into the Writer's block buffer.
+	var prevPointKey sstable.PreviousPointKeyOpt
+
 	newOutput := func() error {
 		fileMeta := &fileMetadata{}
 		d.mu.Lock()
@@ -2144,7 +2152,7 @@ func (d *DB) runCompaction(
 		filenames = append(filenames, filename)
 		cacheOpts := private.SSTableCacheOpts(d.cacheID, fileNum).(sstable.WriterOption)
 		internalTableOpt := private.SSTableInternalTableOpt.(sstable.WriterOption)
-		tw = sstable.NewWriter(file, writerOpts, cacheOpts, internalTableOpt)
+		tw = sstable.NewWriter(file, writerOpts, cacheOpts, internalTableOpt, &prevPointKey)
 
 		fileMeta.CreationTime = time.Now().Unix()
 		ve.NewFiles = append(ve.NewFiles, newFileEntry{
@@ -2382,6 +2390,16 @@ func (d *DB) runCompaction(
 		outputSplitters[0] = &userKeyChangeSplitter{
 			cmp:      c.cmp,
 			splitter: outputSplitters[0],
+			unsafePrevUserKey: func() []byte {
+				// Return the largest point key written to tw or the start of
+				// the current range deletion in the fragmenter, whichever is
+				// greater.
+				prevPoint := prevPointKey.UnsafeKey()
+				if c.cmp(prevPoint.UserKey, c.rangeDelFrag.Start()) > 0 {
+					return prevPoint.UserKey
+				}
+				return c.rangeDelFrag.Start()
+			},
 		}
 		outputSplitters = append(outputSplitters, &l0LimitSplitter{c: c, ve: ve})
 	}
diff --git a/compaction_test.go b/compaction_test.go
index 9490be712e..2adbe206e3 100644
--- a/compaction_test.go
+++ b/compaction_test.go
@@ -2706,6 +2706,7 @@ func (m *mockSplitter) onNewOutput(key *InternalKey) []byte {
 
 func TestCompactionOutputSplitters(t *testing.T) {
 	var main, child0, child1 compactionOutputSplitter
+	var prevUserKey []byte
 	pickSplitter := func(input string) *compactionOutputSplitter {
 		switch input {
 		case "main":
@@ -2742,7 +2743,10 @@ func TestCompactionOutputSplitters(t *testing.T) {
 					*splitterToInit = &mockSplitter{}
 				case "userkey":
 					*splitterToInit = &userKeyChangeSplitter{
-						cmp: base.DefaultComparer.Compare,
+						cmp: base.DefaultComparer.Compare,
+						unsafePrevUserKey: func() []byte {
+							return prevUserKey
+						},
 						splitter: child0,
 					}
 				}
@@ -2770,6 +2774,9 @@ func TestCompactionOutputSplitters(t *testing.T) {
 				shouldSplit := main.shouldSplitBefore(&key, nil)
 				if shouldSplit == splitNow {
 					main.onNewOutput(&key)
+					prevUserKey = nil
+				} else {
+					prevUserKey = key.UserKey
 				}
 				return shouldSplit.String()
 			default:
diff --git a/data_test.go b/data_test.go
index eea06b4f25..551d65b843 100644
--- a/data_test.go
+++ b/data_test.go
@@ -7,6 +7,7 @@ package pebble
 import (
 	"bytes"
 	"fmt"
+	"math"
 	"math/rand"
 	"strconv"
 	"strings"
@@ -483,6 +484,14 @@ func runDBDefineCmd(td *datadriven.TestData, opts *Options) (*DB, error) {
 			c := newFlush(d.opts, d.mu.versions.currentVersion(),
 				d.mu.versions.picker.getBaseLevel(), toFlush, &d.atomic.bytesFlushed)
 			c.disableRangeTombstoneElision = true
+			// NB: define allows the test to exactly specify which keys go
+			// into which sstables. If the test has a small target file
+			// size to test grandparent limits, etc, the maxOutputFileSize
+			// can cause splitting /within/ the bounds specified to the
+			// test. Ignore the target size here, and split only according
+			// to the user-defined boundaries.
+			c.maxOutputFileSize = math.MaxUint64
+
 			newVE, _, err := d.runCompaction(0, c, nilPacer)
 			if err != nil {
 				return err
diff --git a/sstable/writer.go b/sstable/writer.go
index bee276e464..bdbb7a7e5c 100644
--- a/sstable/writer.go
+++ b/sstable/writer.go
@@ -836,6 +836,27 @@ type WriterOption interface {
 	writerApply(*Writer)
 }
 
+// PreviousPointKeyOpt is a WriterOption that provides access to the last
+// point key written to the writer while building a sstable.
+type PreviousPointKeyOpt struct {
+	w *Writer
+}
+
+// UnsafeKey returns the last point key written to the writer to which this
+// option was passed during creation. The returned key points directly into
+// a buffer belonging to the Writer. The value's lifetime ends the next time a
+// point key is added to the Writer.
+func (o PreviousPointKeyOpt) UnsafeKey() base.InternalKey {
+	if o.w == nil {
+		return base.InvalidInternalKey
+	}
+	return o.w.meta.LargestPoint
+}
+
+func (o *PreviousPointKeyOpt) writerApply(w *Writer) {
+	o.w = w
+}
+
 // internalTableOpt is a WriterOption that sets properties for sstables being
 // created by the db itself (i.e. through flushes and compactions), as opposed
 // to those meant for ingestion.
diff --git a/testdata/compaction_output_splitters b/testdata/compaction_output_splitters
index aaec60acae..237b1ce63f 100644
--- a/testdata/compaction_output_splitters
+++ b/testdata/compaction_output_splitters
@@ -107,10 +107,6 @@ should-split-before food.SET.4
 ----
 no-split
 
-set-should-split child0 no-split
-----
-ok
-
 should-split-before food2.SET.4
 ----
 split-now
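A minimal, self-contained sketch of the refactored decision rule (illustrative
only; the types below are local stand-ins for the unexported ones in
compaction.go). It shows why the strengthened shouldSplitBefore contract
matters: the user-key splitter consults its wrapped splitter first, so a
wrapped splitter that advised splitNow only once would have that split
swallowed whenever the user key had not yet changed.

	package main

	import (
		"bytes"
		"fmt"
	)

	type suggestion int

	const (
		noSplit suggestion = iota
		splitNow
	)

	// latchedChild stands in for a wrapped splitter (e.g. a size-based one)
	// that, per the new contract, keeps advising splitNow until a new output.
	type latchedChild struct{ advise suggestion }

	func (c *latchedChild) shouldSplitBefore(key []byte) suggestion { return c.advise }

	// userKeySplitter mirrors the refactored logic: defer to the child, then
	// split only once the current user key exceeds the previous one.
	type userKeySplitter struct {
		child       *latchedChild
		prevUserKey func() []byte
	}

	func (u *userKeySplitter) shouldSplitBefore(key []byte) suggestion {
		if split := u.child.shouldSplitBefore(key); split != splitNow {
			return split
		}
		if bytes.Compare(key, u.prevUserKey()) > 0 {
			return splitNow
		}
		return noSplit
	}

	func main() {
		prev := []byte("c") // last user key written to the current output
		u := &userKeySplitter{
			child:       &latchedChild{advise: splitNow},
			prevUserKey: func() []byte { return prev },
		}
		fmt.Println(u.shouldSplitBefore([]byte("c")) == splitNow) // false: same user key
		fmt.Println(u.shouldSplitBefore([]byte("d")) == splitNow) // true: user key advanced
	}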
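A hedged usage sketch of the new option: it assumes the variadic WriterOption
parameter of sstable.NewWriter shown in the diff above and pebble's in-memory
vfs; the file name is arbitrary. The option must be passed by pointer so that
writerApply can capture the Writer, and the key returned by UnsafeKey must be
copied before the next point key is added.

	package main

	import (
		"fmt"

		"github.com/cockroachdb/pebble/sstable"
		"github.com/cockroachdb/pebble/vfs"
	)

	func main() {
		f, err := vfs.NewMem().Create("demo.sst")
		if err != nil {
			panic(err)
		}
		var prevPointKey sstable.PreviousPointKeyOpt
		w := sstable.NewWriter(f, sstable.WriterOptions{}, &prevPointKey)
		if err := w.Set([]byte("a"), []byte("1")); err != nil {
			panic(err)
		}
		// UnsafeKey points into the Writer's block buffer; copy the user key
		// if it must outlive the next addition to the Writer.
		saved := append([]byte(nil), prevPointKey.UnsafeKey().UserKey...)
		fmt.Printf("last point key written: %s\n", saved)
		if err := w.Close(); err != nil {
			panic(err)
		}
	}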