diff --git a/pkg/kv/bulk/buffering_adder.go b/pkg/kv/bulk/buffering_adder.go
index 3946d8aeac59..c84695a96c89 100644
--- a/pkg/kv/bulk/buffering_adder.go
+++ b/pkg/kv/bulk/buffering_adder.go
@@ -353,38 +353,72 @@ func (b *BufferingAdder) doFlush(ctx context.Context, forSize bool) error {
 }
 
 func (b *BufferingAdder) createInitialSplits(ctx context.Context) error {
-	targetSize := b.curBuf.Len() / b.initialSplits
-	log.Infof(ctx, "%s adder creating up to %d initial splits from %d keys in %s buffer", b.name, b.initialSplits, b.curBuf.Len(), b.curBuf.MemSize)
+	log.Infof(ctx, "%s adder creating up to %d initial splits from %d KVs in %s buffer",
+		b.name, b.initialSplits, b.curBuf.Len(), b.curBuf.MemSize)
 
 	hour := hlc.Timestamp{WallTime: timeutil.Now().Add(time.Hour).UnixNano()}
-
 	before := timeutil.Now()
 	created := 0
-	for i := targetSize; i < b.curBuf.Len(); i += targetSize {
-		k := b.curBuf.Key(i)
-		prev := b.curBuf.Key(i - targetSize)
-		log.VEventf(ctx, 1, "%s adder pre-splitting at key %d of %d at %s", b.name, i, b.curBuf.Len(), k)
-		resp, err := b.sink.db.SplitAndScatter(ctx, k, hour, prev)
+	width := len(b.curBuf.entries) / b.initialSplits
+	for i := 0; i < b.initialSplits; i++ {
+		expire := hour
+		if i == 0 {
+			// If we over-split because our input is loosely ordered and we're just
+			// seeing a sample of the first span here vs a sample of all of it, then
+			// we may not fill enough for these splits to remain on their own. In that
+			// case we'd really prefer the other splits be merged away first rather
+			// than the first split, as it serves the important purpose of segregating
+			// this processor's span from the one below it when it is being constantly
+			// re-scattered by that processor, so give the first split an extra hour.
+			expire = hour.Add(time.Hour.Nanoseconds(), 0)
+		}
+
+		splitAt := i * width
+		if splitAt >= len(b.curBuf.entries) {
+			break
+		}
+		// Typically we split at splitAt if, and only if, its range still includes
+		// the prior split, indicating no other processor is also splitting this
+		// span. However, for the first split, there is no prior split, so we can
+		// use the next split instead, as it too proves the range that needs to be
+		// split still has enough "width" (i.e. between two splits) to indicate that
+		// another processor hasn't already split it.
+		predicateAt := splitAt - width
+		if predicateAt < 0 {
+			next := splitAt + width
+			if next > len(b.curBuf.entries)-1 {
+				next = len(b.curBuf.entries) - 1
+			}
+			predicateAt = next
+		}
+		splitKey := b.curBuf.Key(splitAt)
+		predicateKey := b.curBuf.Key(predicateAt)
+		log.VEventf(ctx, 1, "pre-splitting span %d of %d at %s", i, b.initialSplits, splitKey)
+		resp, err := b.sink.db.SplitAndScatter(ctx, splitKey, expire, predicateKey)
 		if err != nil {
 			// TODO(dt): a typed error would be nice here.
 			if strings.Contains(err.Error(), "predicate") {
-				log.VEventf(ctx, 1, "%s adder split at %s rejected, had previously split and no longer included %s", b.name, k, prev)
+				log.VEventf(ctx, 1, "%s adder split at %s rejected, had previously split and no longer included %s",
+					b.name, splitKey, predicateKey)
 				continue
 			}
 			return err
 		}
+		b.sink.flushCounts.splitWait += resp.Timing.Split
 		b.sink.flushCounts.scatterWait += resp.Timing.Scatter
 		if resp.ScatteredStats != nil {
 			moved := sz(resp.ScatteredStats.Total())
 			b.sink.flushCounts.scatterMoved += moved
 			if resp.ScatteredStats.Total() > 0 {
-				log.VEventf(ctx, 1, "pre-split scattered %s in non-empty range %s", moved, resp.ScatteredSpan)
+				log.VEventf(ctx, 1, "pre-split scattered %s in non-empty range %s",
+					moved, resp.ScatteredSpan)
 			}
 		}
 		created++
 	}
+
 	log.Infof(ctx, "%s adder created %d initial splits in %v from %d keys in %s buffer", b.name, created, timing(timeutil.Since(before)), b.curBuf.Len(), b.curBuf.MemSize)