Skip to content

Commit

Permalink
compaction: Invalidate limit when a splitter defers a split suggestion
Browse files Browse the repository at this point in the history
Currently, it's possible for the nonZeroSeqNumSplitter to "withhold"
a compaction split suggestion in such a way that the `limit` variable
in the compaction loop is exceeded, only to advise a compaction split
at a later point.

This is usually not a concern as we just reset `limit`
when that compaction split is actually advised, but if the compaction
were to run out of keys in this narrow window, we would leave the limit
at a non-nil past key, which would violate an invariant in the rangedel
fragmenter as it can't truncate range tombstones to a passed key.

Similar logic already existed in older iteration of this code, which
is in use in the crl-release-20.2 branch. A refactor here introduced
this bug.

This change allows for a 3-way return value from shouldSplitBefore;
in the case where limit has been exceeded, we resort to resetting the
limit like before.

Will address cockroachdb/cockroach#54284 when this lands in cockroach
master.
  • Loading branch information
itsbilal committed Sep 15, 2020
1 parent 315b317 commit 81aa14d
Show file tree
Hide file tree
Showing 4 changed files with 189 additions and 86 deletions.
120 changes: 87 additions & 33 deletions compaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,15 +66,41 @@ type compactionLevel struct {
files manifest.LevelSlice
}

// Return output from compactionOutputSplitters. See comment on
// compactionOutputSplitter.shouldSplitBefore() on how this value is used.
type compactionSplitSuggestion int

const(
noSplit compactionSplitSuggestion = iota
splitSoon
splitNow
)

// String implements the Stringer interface.
func (c compactionSplitSuggestion) String() string {
switch c {
case noSplit:
return "no-split"
case splitSoon:
return "split-soon"
}
return "split-now"
}

// compactionOutputSplitter is an interface for encapsulating logic around
// switching the output of a compaction to a new output file. Additional
// constraints around switching compaction outputs that are specific to that
// compaction type (eg. flush splits) are implemented in
// compactionOutputSplitters that compose other child compactionOutputSplitters.
type compactionOutputSplitter interface {
// shouldSplitBefore returns whether we should split outputs before the
// specified "current key".
shouldSplitBefore(key *InternalKey, tw *sstable.Writer) bool
// specified "current key". The return value is one of splitNow, splitSoon,
// or noSlit. splitNow means a split is advised before the specified key,
// splitSoon means no split is advised yet but the limit returned in
// onNewOutput can be considered invalidated and a splitNow suggestion will
// be made on an upcoming key shortly, and noSplit means no split is
// advised.
shouldSplitBefore(key *InternalKey, tw *sstable.Writer) compactionSplitSuggestion
// onNewOutput updates internal splitter state when the compaction switches
// to a new sstable, and returns the next limit for the new output which
// would get used to truncate range tombstones if the compaction iterator
Expand All @@ -93,12 +119,15 @@ type fileSizeSplitter struct {
maxFileSize uint64
}

func (f *fileSizeSplitter) shouldSplitBefore(key *InternalKey, tw *sstable.Writer) bool {
func (f *fileSizeSplitter) shouldSplitBefore(key *InternalKey, tw *sstable.Writer) compactionSplitSuggestion {
// The Kind != RangeDelete part exists because EstimatedSize doesn't grow
// rightaway when a range tombstone is added to the fragmenter. It's always
// better to make a sequence of range tombstones visible to the fragmenter.
return key.Kind() != InternalKeyKindRangeDelete && tw != nil &&
tw.EstimatedSize() >= f.maxFileSize
if key.Kind() != InternalKeyKindRangeDelete && tw != nil &&
tw.EstimatedSize() >= f.maxFileSize {
return splitNow
}
return noSplit
}

func (f *fileSizeSplitter) onNewOutput(key *InternalKey) []byte {
Expand All @@ -111,8 +140,11 @@ type grandparentLimitSplitter struct {
limit []byte
}

func (g *grandparentLimitSplitter) shouldSplitBefore(key *InternalKey, tw *sstable.Writer) bool {
return g.limit != nil && g.c.cmp(key.UserKey, g.limit) > 0
func (g *grandparentLimitSplitter) shouldSplitBefore(key *InternalKey, tw *sstable.Writer) compactionSplitSuggestion {
if g.limit != nil && g.c.cmp(key.UserKey, g.limit) > 0 {
return splitNow
}
return noSplit
}

func (g *grandparentLimitSplitter) onNewOutput(key *InternalKey) []byte {
Expand Down Expand Up @@ -203,8 +235,11 @@ type l0LimitSplitter struct {
limit []byte
}

func (l *l0LimitSplitter) shouldSplitBefore(key *InternalKey, tw *sstable.Writer) bool {
return l.limit != nil && l.c.cmp(key.UserKey, l.limit) > 0
func (l *l0LimitSplitter) shouldSplitBefore(key *InternalKey, tw *sstable.Writer) compactionSplitSuggestion {
if l.limit != nil && l.c.cmp(key.UserKey, l.limit) > 0 {
return splitNow
}
return noSplit
}

func (l *l0LimitSplitter) onNewOutput(key *InternalKey) []byte {
Expand Down Expand Up @@ -244,13 +279,17 @@ type splitterGroup struct {
splitters []compactionOutputSplitter
}

func (a *splitterGroup) shouldSplitBefore(key *InternalKey, tw *sstable.Writer) bool {
func (a *splitterGroup) shouldSplitBefore(key *InternalKey, tw *sstable.Writer) (suggestion compactionSplitSuggestion) {
suggestion = noSplit
for _, splitter := range a.splitters {
if splitter.shouldSplitBefore(key, tw) {
return true
switch splitter.shouldSplitBefore(key, tw) {
case splitNow:
return splitNow
case splitSoon:
suggestion = splitSoon
}
}
return false
return suggestion
}

func (a *splitterGroup) onNewOutput(key *InternalKey) []byte {
Expand Down Expand Up @@ -281,17 +320,18 @@ type userKeyChangeSplitter struct {
splitter compactionOutputSplitter
}

func (u *userKeyChangeSplitter) shouldSplitBefore(key *InternalKey, tw *sstable.Writer) bool {
func (u *userKeyChangeSplitter) shouldSplitBefore(key *InternalKey, tw *sstable.Writer) compactionSplitSuggestion {
if u.splitOnNextUserKey && u.cmp(u.savedKey, key.UserKey) != 0 {
u.splitOnNextUserKey = false
u.savedKey = u.savedKey[:0]
return true
return splitNow
}
if u.splitter.shouldSplitBefore(key, tw) {
if split := u.splitter.shouldSplitBefore(key, tw); split == splitNow {
u.splitOnNextUserKey = true
u.savedKey = append(u.savedKey[:0], key.UserKey...)
return noSplit
}
return false
return noSplit
}

func (u *userKeyChangeSplitter) onNewOutput(key *InternalKey) []byte {
Expand All @@ -309,7 +349,7 @@ type nonZeroSeqNumSplitter struct {
splitOnNonZeroSeqNum bool
}

func (n *nonZeroSeqNumSplitter) shouldSplitBefore(key *InternalKey, tw *sstable.Writer) bool {
func (n *nonZeroSeqNumSplitter) shouldSplitBefore(key *InternalKey, tw *sstable.Writer) compactionSplitSuggestion {
curSeqNum := key.SeqNum()
keyKind := key.Kind()
prevPointSeqNum := n.prevPointSeqNum
Expand All @@ -320,16 +360,17 @@ func (n *nonZeroSeqNumSplitter) shouldSplitBefore(key *InternalKey, tw *sstable.
if n.splitOnNonZeroSeqNum {
if prevPointSeqNum > 0 || n.c.rangeDelFrag.Empty() {
n.splitOnNonZeroSeqNum = false
return true
return splitNow
}
} else if n.splitter.shouldSplitBefore(key, tw) {
} else if split := n.splitter.shouldSplitBefore(key, tw); split == splitNow {
userKeyChange := curSeqNum > prevPointSeqNum
if prevPointSeqNum > 0 || n.c.rangeDelFrag.Empty() || userKeyChange {
return true
return splitNow
}
n.splitOnNonZeroSeqNum = true
return splitSoon
}
return false
return noSplit
}

func (n *nonZeroSeqNumSplitter) onNewOutput(key *InternalKey) []byte {
Expand Down Expand Up @@ -2146,18 +2187,31 @@ func (d *DB) runCompaction(
// Each inner loop iteration processes one key from the input iterator.
prevPointSeqNum := InternalKeySeqNumMax
for ; key != nil; key, val = iter.Next() {
if splitter.shouldSplitBefore(key, tw) {
limit = key.UserKey
if splittingFlush {
// Flush all tombstones up until key.UserKey, and
// truncate them at that key.
//
// The fragmenter could save the passed-in key. As this
// key could live beyond the write into the current
// sstable output file, make a copy.
c.rangeDelFrag.TruncateAndFlushTo(key.Clone().UserKey)
if split := splitter.shouldSplitBefore(key, tw); split != noSplit {
if split == splitNow {
limit = key.UserKey
if splittingFlush {
// Flush all tombstones up until key.UserKey, and
// truncate them at that key.
//
// The fragmenter could save the passed-in key. As this
// key could live beyond the write into the current
// sstable output file, make a copy.
c.rangeDelFrag.TruncateAndFlushTo(key.Clone().UserKey)
}
break
}
break
// split == splitSoon
//
// Invalidate the limit here. It has probably been exceeded
// by the current key, but we can't split just yet, such as to
// maintain the nonzero sequence number invariant mentioned
// above. Setting limit to nil is okay as it's just a transient
// setting, as when split eventually equals splitNow, we will
// set the limit to the key after that. If the compaction were
// to run out of keys before we get to that point, limit would
// be nil as it should be for all end-of-compaction cases.
limit = nil
}

atomic.StoreUint64(c.atomicBytesIterated, c.bytesIterated)
Expand Down
23 changes: 14 additions & 9 deletions compaction_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1959,10 +1959,10 @@ func TestCompactionCheckOrdering(t *testing.T) {
}

type mockSplitter struct {
guaranteesUserKeyChangeVal, shouldSplitVal bool
shouldSplitVal compactionSplitSuggestion
}

func (m *mockSplitter) shouldSplitBefore(key *InternalKey, tw *sstable.Writer) bool {
func (m *mockSplitter) shouldSplitBefore(key *InternalKey, tw *sstable.Writer) compactionSplitSuggestion {
return m.shouldSplitVal
}

Expand Down Expand Up @@ -2037,23 +2037,28 @@ func TestCompactionOutputSplitters(t *testing.T) {
return "expected at least 2 args"
}
splitterToSet := (*pickSplitter(d.CmdArgs[0].Key)).(*mockSplitter)
val, err := strconv.ParseBool(d.CmdArgs[1].Key)
if err != nil {
t.Fatal(err)
var val compactionSplitSuggestion
switch d.CmdArgs[1].Key {
case "split-now":
val = splitNow
case "split-soon":
val = splitSoon
case "no-split":
val = noSplit
default:
t.Fatalf("unexpected value for should-split: %s", d.CmdArgs[1].Key)
}
splitterToSet.shouldSplitVal = val
case "set-user-key-change":
return "TODO: remove"
case "should-split-before":
if len(d.CmdArgs) < 1 {
return "expected at least 1 arg"
}
key := base.ParseInternalKey(d.CmdArgs[0].Key)
shouldSplit := main.shouldSplitBefore(&key, nil)
if shouldSplit {
if shouldSplit == splitNow {
main.onNewOutput(&key)
}
return fmt.Sprintf("%t", shouldSplit)
return fmt.Sprintf("%s", shouldSplit)
default:
return fmt.Sprintf("unknown command: %s", d.Cmd)
}
Expand Down
Loading

0 comments on commit 81aa14d

Please sign in to comment.