Skip to content

Commit

Permalink
kv/bulk: fix initial splits
Browse files Browse the repository at this point in the history
Previously, initial splits were missing at the first key due to how the loop
was initialized. Now the first key is used as a split key. This requires using
a different predicate key than usual: typically we check whether the key we are
splitting at is still in the same range as the prior split, to detect whether
another node has already split the span, but there is no prior split for
the first one. Instead, we use the next split key as the first split's predicate,
as this can also serve to show that the span is still 'wide' enough to split.

Release note: none.

Release justification: low-risk fix of new functionality.
  • Loading branch information
dt committed Mar 7, 2022
1 parent fc7b49f commit ea35476
Showing 1 changed file with 44 additions and 10 deletions.
54 changes: 44 additions & 10 deletions pkg/kv/bulk/buffering_adder.go
Original file line number Diff line number Diff line change
Expand Up @@ -353,38 +353,72 @@ func (b *BufferingAdder) doFlush(ctx context.Context, forSize bool) error {
}

func (b *BufferingAdder) createInitialSplits(ctx context.Context) error {
targetSize := b.curBuf.Len() / b.initialSplits
log.Infof(ctx, "%s adder creating up to %d initial splits from %d keys in %s buffer", b.name, b.initialSplits, b.curBuf.Len(), b.curBuf.MemSize)
log.Infof(ctx, "%s adder creating up to %d initial splits from %d KVs in %s buffer",
b.name, b.initialSplits, b.curBuf.Len(), b.curBuf.MemSize)

hour := hlc.Timestamp{WallTime: timeutil.Now().Add(time.Hour).UnixNano()}

before := timeutil.Now()

created := 0
for i := targetSize; i < b.curBuf.Len(); i += targetSize {
k := b.curBuf.Key(i)
prev := b.curBuf.Key(i - targetSize)
log.VEventf(ctx, 1, "%s adder pre-splitting at key %d of %d at %s", b.name, i, b.curBuf.Len(), k)
resp, err := b.sink.db.SplitAndScatter(ctx, k, hour, prev)
width := len(b.curBuf.entries) / b.initialSplits
for i := 0; i < b.initialSplits; i++ {
expire := hour
if i == 0 {
// If we over-split because our input is loosely ordered and we're just
// seeing a sample of the first span here vs a sample of all of it, then
// we may not fill enough for these splits to remain on their own. In that
// case we'd really prefer the other splits be merged away first rather
// than the first split, as it serves the important purpose of segregating
// this processor's span from the one below it when it is being constantly
// re-scattered by that processor, so give the first split an extra hour.
expire = hour.Add(time.Hour.Nanoseconds(), 0)
}

splitAt := i * width
if splitAt >= len(b.curBuf.entries) {
break
}
// Typically we split at splitAt if, and only if, its range still includes
// the prior split, indicating no other processor is also splitting this
// span. However, for the first split, there is no prior split, so we can
// use the next split instead, as it too proves the range that needs to be
// split still has enough "width" (i.e. between two splits) to indicate that
// another processor hasn't already split it.
predicateAt := splitAt - width
if predicateAt < 0 {
next := splitAt + width
if next > len(b.curBuf.entries)-1 {
next = len(b.curBuf.entries) - 1
}
predicateAt = next
}
splitKey := b.curBuf.Key(splitAt)
predicateKey := b.curBuf.Key(predicateAt)
log.VEventf(ctx, 1, "pre-splitting span %d of %d at %s", i, b.initialSplits, splitKey)
resp, err := b.sink.db.SplitAndScatter(ctx, splitKey, expire, predicateKey)
if err != nil {
// TODO(dt): a typed error would be nice here.
if strings.Contains(err.Error(), "predicate") {
log.VEventf(ctx, 1, "%s adder split at %s rejected, had previously split and no longer included %s", b.name, k, prev)
log.VEventf(ctx, 1, "%s adder split at %s rejected, had previously split and no longer included %s",
b.name, splitKey, predicateKey)
continue
}
return err
}

b.sink.flushCounts.splitWait += resp.Timing.Split
b.sink.flushCounts.scatterWait += resp.Timing.Scatter
if resp.ScatteredStats != nil {
moved := sz(resp.ScatteredStats.Total())
b.sink.flushCounts.scatterMoved += moved
if resp.ScatteredStats.Total() > 0 {
log.VEventf(ctx, 1, "pre-split scattered %s in non-empty range %s", moved, resp.ScatteredSpan)
log.VEventf(ctx, 1, "pre-split scattered %s in non-empty range %s",
moved, resp.ScatteredSpan)
}
}
created++
}

log.Infof(ctx, "%s adder created %d initial splits in %v from %d keys in %s buffer",
b.name, created, timing(timeutil.Since(before)), b.curBuf.Len(), b.curBuf.MemSize)

Expand Down

0 comments on commit ea35476

Please sign in to comment.