
Commit

Merge pull request cockroachdb#80487 from cockroachdb/blathers/backport-release-22.1.0-80386

release-22.1.0: release-22.1: kv/bulk: parallelize sending SSTs due to range bounds
dt authored Apr 25, 2022
2 parents 5a628cb + 180a222 commit b7681be
Showing 15 changed files with 410 additions and 217 deletions.
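
The commit title is the only prose summary of the change, so the following is a minimal, self-contained Go sketch of the pattern it describes: when a sorted buffer of KVs straddles several range boundaries, cut the buffer at each boundary and send the resulting SSTs concurrently instead of serially. This is an illustration only, not the pkg/kv/bulk implementation (which works through SSTBatcher and, per the BUILD.bazel hunk below, pulls in pkg/util/ctxgroup); the names `kv`, `splitAtBounds`, and `sendParallel` are hypothetical.

```go
package main

import (
	"context"
	"fmt"
	"sort"
	"sync"
)

type kv struct{ key, val string }

// splitAtBounds cuts a sorted batch wherever it crosses a range boundary.
func splitAtBounds(batch []kv, bounds []string) [][]kv {
	var chunks [][]kv
	start := 0
	for _, bound := range bounds {
		i := sort.Search(len(batch), func(i int) bool { return batch[i].key >= bound })
		if i > start {
			chunks = append(chunks, batch[start:i])
			start = i
		}
	}
	if start < len(batch) {
		chunks = append(chunks, batch[start:])
	}
	return chunks
}

// sendParallel ships each chunk as its own SST on its own goroutine and
// returns the first error, if any.
func sendParallel(ctx context.Context, chunks [][]kv, send func(context.Context, []kv) error) error {
	var wg sync.WaitGroup
	errs := make(chan error, len(chunks))
	for _, chunk := range chunks {
		chunk := chunk
		wg.Add(1)
		go func() {
			defer wg.Done()
			errs <- send(ctx, chunk)
		}()
	}
	wg.Wait()
	close(errs)
	for err := range errs {
		if err != nil {
			return err
		}
	}
	return nil
}

func main() {
	batch := []kv{{"a", "1"}, {"c", "2"}, {"m", "3"}, {"t", "4"}}
	// Two range boundaries at "k" and "p" force the batch into three SSTs.
	chunks := splitAtBounds(batch, []string{"k", "p"})
	err := sendParallel(context.Background(), chunks, func(_ context.Context, c []kv) error {
		fmt.Printf("sending SST with %d keys\n", len(c))
		return nil
	})
	if err != nil {
		fmt.Println("flush failed:", err)
	}
}
```
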
5 changes: 2 additions & 3 deletions docs/generated/redact_safe.md
@@ -7,9 +7,8 @@ pkg/base/node_id.go | `*SQLIDContainer`
pkg/base/node_id.go | `*StoreIDContainer`
pkg/cli/exit/exit.go | `Code`
pkg/jobs/jobspb/wrap.go | `Type`
pkg/kv/bulk/sst_batcher.go | `sorted`
pkg/kv/bulk/sst_batcher.go | `sz`
pkg/kv/bulk/sst_batcher.go | `timing`
pkg/kv/bulk/stats.go | `sz`
pkg/kv/bulk/stats.go | `timing`
pkg/kv/kvserver/closedts/ctpb/service.go | `LAI`
pkg/kv/kvserver/closedts/ctpb/service.go | `SeqNum`
pkg/kv/kvserver/concurrency/lock/locking.go | `Durability`
3 changes: 2 additions & 1 deletion pkg/ccl/backupccl/restore_data_processor.go
@@ -436,11 +436,12 @@ func (rd *restoreDataProcessor) processRestoreSpanEntry(
disallowShadowingBelow,
writeAtBatchTS,
false, /* splitFilledRanges */
rd.flowCtx.Cfg.BackupMonitor.MakeBoundAccount(),
)
if err != nil {
return summary, err
}
defer batcher.Close()
defer batcher.Close(ctx)

var keyScratch, valueScratch []byte

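
This hunk also shows the memory-accounting wiring the change threads through every batcher: the processor now hands the batcher a BoundAccount drawn from flowCtx.Cfg.BackupMonitor, and Close takes a ctx so the reservation can be released. Below is a rough sketch of that pattern using the util/mon calls visible in this diff (NewUnlimitedMonitor, MakeBoundAccount) plus Grow/Close/Stop, which are assumed to match the 22.1 mon API; the `toyBatcher` type is hypothetical and is not the real SSTBatcher.

```go
package main

import (
	"context"
	"fmt"

	"github.com/cockroachdb/cockroach/pkg/settings/cluster"
	"github.com/cockroachdb/cockroach/pkg/util/mon"
)

// toyBatcher stands in for SSTBatcher: it reserves memory from a bound
// account before buffering and releases it all on Close.
type toyBatcher struct {
	mem mon.BoundAccount
	buf []byte
}

func (b *toyBatcher) Add(ctx context.Context, kv []byte) error {
	// Reserve before buffering so ingestion errors out instead of OOMing.
	if err := b.mem.Grow(ctx, int64(len(kv))); err != nil {
		return err
	}
	b.buf = append(b.buf, kv...)
	return nil
}

func (b *toyBatcher) Close(ctx context.Context) {
	b.mem.Close(ctx) // release everything this batcher reserved
}

func main() {
	ctx := context.Background()
	st := cluster.MakeTestingClusterSettings()
	monitor := mon.NewUnlimitedMonitor(ctx, "toy", mon.MemoryResource, nil, nil, 0, st)
	defer monitor.Stop(ctx)

	b := &toyBatcher{mem: monitor.MakeBoundAccount()}
	defer b.Close(ctx)

	if err := b.Add(ctx, []byte("k=v")); err != nil {
		fmt.Println("add failed:", err)
	}
}
```
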
6 changes: 4 additions & 2 deletions pkg/ccl/backupccl/restore_data_processor_test.go
@@ -42,6 +42,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/encoding"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/mon"
"github.com/cockroachdb/pebble/vfs"
"github.com/stretchr/testify/require"
)
@@ -248,8 +249,9 @@ func runTestIngest(t *testing.T, init func(*cluster.Settings)) {
return cloud.MakeExternalStorage(ctx, dest, base.ExternalIODirConfig{},
s.ClusterSettings(), blobs.TestBlobServiceClient(s.ClusterSettings().ExternalIODir), nil, nil, nil)
},
Settings: s.ClusterSettings(),
Codec: keys.SystemSQLCodec,
Settings: s.ClusterSettings(),
Codec: keys.SystemSQLCodec,
BackupMonitor: mon.NewUnlimitedMonitor(ctx, "test", mon.MemoryResource, nil, nil, 0, s.ClusterSettings()),
},
EvalCtx: &tree.EvalContext{
Codec: keys.SystemSQLCodec,
@@ -201,7 +201,7 @@ func (sip *streamIngestionProcessor) Start(ctx context.Context) {
evalCtx := sip.FlowCtx.EvalCtx
db := sip.FlowCtx.Cfg.DB
var err error
sip.batcher, err = bulk.MakeStreamSSTBatcher(ctx, db, evalCtx.Settings)
sip.batcher, err = bulk.MakeStreamSSTBatcher(ctx, db, evalCtx.Settings, sip.flowCtx.Cfg.BackupMonitor.MakeBoundAccount())
if err != nil {
sip.MoveToDraining(errors.Wrap(err, "creating stream sst batcher"))
return
@@ -321,7 +321,7 @@ func (sip *streamIngestionProcessor) close() {
_ = client.Close()
}
if sip.batcher != nil {
sip.batcher.Close()
sip.batcher.Close(sip.Ctx)
}
if sip.maxFlushRateTimer != nil {
sip.maxFlushRateTimer.Stop()
4 changes: 4 additions & 0 deletions pkg/kv/bulk/BUILD.bazel
@@ -8,6 +8,7 @@ go_library(
"kv_buf.go",
"setting.go",
"sst_batcher.go",
"stats.go",
],
importpath = "github.com/cockroachdb/cockroach/pkg/kv/bulk",
visibility = ["//visibility:public"],
@@ -23,13 +24,16 @@
"//pkg/storage",
"//pkg/storage/enginepb",
"//pkg/util/admission",
"//pkg/util/ctxgroup",
"//pkg/util/hlc",
"//pkg/util/humanizeutil",
"//pkg/util/log",
"//pkg/util/metric",
"//pkg/util/mon",
"//pkg/util/syncutil",
"//pkg/util/timeutil",
"@com_github_cockroachdb_errors//:errors",
"@com_github_cockroachdb_redact//:redact",
],
)

137 changes: 41 additions & 96 deletions pkg/kv/bulk/buffering_adder.go
@@ -26,6 +26,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/mon"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/errors"
)
@@ -51,19 +52,6 @@ type BufferingAdder struct {

lastFlush time.Time

// flushCounts accumulates performance and debug info for logging.
flushCounts struct {
total int // number of flushes.
bufferSize int // number of flushes due to buffer size.
totalSort time.Duration
totalFlush time.Duration
totalFilling time.Duration
// span tracks the total span into which this batcher has flushed. It is
// only maintained if log.V(1), so if vmodule is upped mid-ingest it may be
// incomplete.
span roachpb.Span
}

// name of the BufferingAdder for the purpose of logging only.
name string

@@ -109,6 +97,7 @@ func MakeBulkAdder(
disallowShadowingBelow: opts.DisallowShadowingBelow,
batchTS: opts.BatchTimestamp,
writeAtBatchTS: opts.WriteAtBatchTimestamp,
mem: bulkMon.MakeBoundAccount(),
},
timestamp: timestamp,
maxBufferLimit: opts.MaxBufferSize,
@@ -118,6 +107,7 @@
lastFlush: timeutil.Now(),
}

b.sink.mem.Mu = &syncutil.Mutex{}
// At minimum a bulk adder needs enough space to store a buffer of
// curBufferSize, and a subsequent SST of SSTSize in-memory. If the memory
// account is unable to reserve this minimum threshold we cannot continue.
@@ -142,39 +132,18 @@ func (b *BufferingAdder) SetOnFlush(fn func(summary roachpb.BulkOpSummary)) {

// Close closes the underlying SST builder.
func (b *BufferingAdder) Close(ctx context.Context) {
if b.flushCounts.total > 0 {
log.VEventf(ctx, 1,
"%s adder closing; ingested %s (%s): %s filling; %v sorting; %v / %v flushing; %v sending; %v splitting; %d; %v scattering, %d, %v; %v commit-wait",
b.name,
sz(b.sink.totalRows.DataSize),
sorted(b.sorted),
timing(b.flushCounts.totalFilling),
timing(b.flushCounts.totalSort),
timing(b.flushCounts.totalFlush),
timing(b.sink.flushCounts.flushWait),
timing(b.sink.flushCounts.sendWait),
timing(b.sink.flushCounts.splitWait),
b.sink.flushCounts.splits,
timing(b.sink.flushCounts.scatterWait),
b.sink.flushCounts.scatters,
b.sink.flushCounts.scatterMoved,
timing(b.sink.flushCounts.commitWait),
)
log.VEventf(ctx, 2, "%s adder closing; flushed into %s %d times, %d due to buffer size (%s); flushing chunked into %d files (%d for ranges, %d for sst size, +%d after split-retries)",
b.name,
b.flushCounts.span,
b.flushCounts.total,
b.flushCounts.bufferSize,
sz(b.memAcc.Used()),
b.sink.flushCounts.total,
b.sink.flushCounts.dueToRange,
b.sink.flushCounts.dueToSize,
b.sink.flushCounts.files-b.sink.flushCounts.total,
)
} else {
log.VEventf(ctx, 3, "%s adder closing; ingested nothing", b.name)
if log.V(1) {
if b.sink.stats.bufferFlushes > 0 {
b.sink.stats.LogTimings(ctx, b.name, "closing")
if log.V(3) {
b.sink.stats.LogPerStoreTimings(ctx, b.name)
}
b.sink.stats.LogFlushes(ctx, b.name, "closing", sz(b.memAcc.Used()))
} else {
log.Infof(ctx, "%s adder closing; ingested nothing", b.name)
}
}
b.sink.Close()
b.sink.Close(ctx)

if b.bulkMon != nil {
b.memAcc.Close(ctx)
@@ -196,7 +165,7 @@ func (b *BufferingAdder) Add(ctx context.Context, key roachpb.Key, value []byte)
return b.curBuf.append(key, value)
}

b.flushCounts.bufferSize++
b.sink.stats.flushesDueToSize++
log.VEventf(ctx, 3, "%s adder triggering flush of %s of KVs in %s buffer",
b.name, b.curBuf.KVSize(), b.bufferedMemSize())

@@ -246,7 +215,7 @@ func (b *BufferingAdder) Flush(ctx context.Context) error {
}

func (b *BufferingAdder) doFlush(ctx context.Context, forSize bool) error {
b.flushCounts.totalFilling += timeutil.Since(b.lastFlush)
b.sink.stats.fillWait += timeutil.Since(b.lastFlush)

if b.bufferedKeys() == 0 {
if b.onFlush != nil {
@@ -258,10 +227,10 @@
if err := b.sink.Reset(ctx); err != nil {
return err
}
b.flushCounts.total++
b.sink.stats.bufferFlushes++

before := b.sink.flushCounts
beforeSize := b.sink.totalRows.DataSize
before := b.sink.stats
beforeSize := b.sink.mu.totalRows.DataSize

beforeSort := timeutil.Now()

@@ -271,7 +240,7 @@
mvccKey := storage.MVCCKey{Timestamp: b.timestamp}

beforeFlush := timeutil.Now()
b.flushCounts.totalSort += beforeFlush.Sub(beforeSort)
b.sink.stats.sortWait += beforeFlush.Sub(beforeSort)

// If this is the first flush and is due to size, if it was unsorted then
// create initial splits if requested before flushing.
@@ -286,8 +255,8 @@
}

if log.V(1) {
if len(b.flushCounts.span.Key) == 0 || b.curBuf.Key(0).Compare(b.flushCounts.span.Key) < 0 {
b.flushCounts.span.Key = b.curBuf.Key(0).Clone()
if len(b.sink.stats.span.Key) == 0 || b.curBuf.Key(0).Compare(b.sink.stats.span.Key) < 0 {
b.sink.stats.span.Key = b.curBuf.Key(0).Clone()
}
}

Expand All @@ -302,18 +271,18 @@ func (b *BufferingAdder) doFlush(ctx context.Context, forSize bool) error {
}

if log.V(1) {
if b.flushCounts.span.EndKey.Compare(mvccKey.Key) < 0 {
b.flushCounts.span.EndKey = mvccKey.Key.Clone()
if b.sink.stats.span.EndKey.Compare(mvccKey.Key) < 0 {
b.sink.stats.span.EndKey = mvccKey.Key.Clone()
}
}

b.flushCounts.totalFlush += timeutil.Since(beforeFlush)
b.sink.stats.flushWait += timeutil.Since(beforeFlush)

if log.V(3) {
written := b.sink.totalRows.DataSize - beforeSize
files := b.sink.flushCounts.total - before.total
dueToSplits := b.sink.flushCounts.dueToRange - before.dueToRange
dueToSize := b.sink.flushCounts.dueToSize - before.dueToSize
written := b.sink.mu.totalRows.DataSize - beforeSize
files := b.sink.stats.batches - before.batches
dueToSplits := b.sink.stats.batchesDueToRange - before.batchesDueToRange
dueToSize := b.sink.stats.batchesDueToSize - before.batchesDueToSize

log.Infof(ctx,
"%s adder flushing %s (%s buffered/%0.2gx) wrote %d SSTs (avg: %s) with %d for splits, %d for size, took %v",
@@ -329,39 +298,15 @@
)
}

if log.V(4) {
log.Infof(ctx,
"%s adder has ingested %s (%s): %s filling; %v sorting; %v / %v flushing; %v sending; %v splitting; %d; %v scattering, %d, %v; %v commit-wait",
b.name,
sz(b.sink.totalRows.DataSize),
sorted(b.sorted),
timing(b.flushCounts.totalFilling),
timing(b.flushCounts.totalSort),
timing(b.flushCounts.totalFlush),
timing(b.sink.flushCounts.flushWait),
timing(b.sink.flushCounts.sendWait),
timing(b.sink.flushCounts.splitWait),
b.sink.flushCounts.splits,
timing(b.sink.flushCounts.scatterWait),
b.sink.flushCounts.scatters,
b.sink.flushCounts.scatterMoved,
timing(b.sink.flushCounts.commitWait),
)
if log.V(2) {
b.sink.stats.LogTimings(ctx, b.name, "flushed")
if log.V(3) {
b.sink.stats.LogPerStoreTimings(ctx, b.name)
}
}

if log.V(5) {
log.Infof(ctx,
"%s adder has flushed into %s %d times, %d due to buffer size (%s), chunked as %d files (%d for ranges, %d for sst size, +%d for split-retries)",
b.name,
b.flushCounts.span,
b.flushCounts.total,
b.flushCounts.bufferSize,
sz(b.memAcc.Used()),
b.sink.flushCounts.total,
b.sink.flushCounts.dueToRange,
b.sink.flushCounts.dueToSize,
b.sink.flushCounts.files-b.sink.flushCounts.total,
)
if log.V(3) {
b.sink.stats.LogFlushes(ctx, b.name, "flushed", sz(b.memAcc.Used()))
}

if b.onFlush != nil {
Expand Down Expand Up @@ -436,27 +381,27 @@ func (b *BufferingAdder) createInitialSplits(ctx context.Context) error {
splitsWait := beforeScatters.Sub(beforeSplits)
log.Infof(ctx, "%s adder created %d initial splits in %v from %d keys in %s buffer",
b.name, len(toScatter), timing(splitsWait), b.curBuf.Len(), b.curBuf.MemSize())
b.sink.flushCounts.splits += len(toScatter)
b.sink.flushCounts.splitWait += splitsWait
b.sink.stats.splits += len(toScatter)
b.sink.stats.splitWait += splitsWait

for _, splitKey := range toScatter {
resp, err := b.sink.db.AdminScatter(ctx, splitKey, 0 /* maxSize */)
if err != nil {
log.Warningf(ctx, "failed to scatter: %v", err)
continue
}
b.sink.flushCounts.scatters++
b.sink.stats.scatters++
if resp.MVCCStats != nil {
moved := sz(resp.MVCCStats.Total())
b.sink.flushCounts.scatterMoved += moved
b.sink.stats.scatterMoved += moved
if moved > 0 {
log.VEventf(ctx, 1, "pre-split scattered %s in non-empty range %s",
moved, resp.RangeInfos[0].Desc.KeySpan().AsRawSpanWithNoLocals())
}
}
}
scattersWait := timeutil.Since(beforeScatters)
b.sink.flushCounts.scatterWait += scattersWait
b.sink.stats.scatterWait += scattersWait
log.Infof(ctx, "%s adder scattered %d initial split spans in %v",
b.name, len(toScatter), timing(scattersWait))

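
Most of this file's diff replaces BufferingAdder's private flushCounts struct with counters kept on the sink's shared stats value (defined in the new pkg/kv/bulk/stats.go added in the BUILD.bazel hunk) and logged through helpers such as LogTimings and LogFlushes. The sketch below shows the rough shape of that consolidation; the field names and method signatures are illustrative guesses, not the repo's actual stats.go.

```go
package main

import (
	"context"
	"log"
	"time"
)

// ingestionStats gathers the counters both the adder and the sink touch, so
// one type owns the logging format instead of each flush site.
type ingestionStats struct {
	bufferFlushes     int           // flushes of the in-memory buffer
	flushesDueToSize  int           // buffer flushes forced by memory pressure
	batches           int           // SSTs sent
	batchesDueToRange int           // SSTs cut at a range boundary
	batchesDueToSize  int           // SSTs cut at the size limit
	fillWait          time.Duration // time spent filling the buffer
	sortWait          time.Duration // time spent sorting
	flushWait         time.Duration // time spent flushing
}

// LogTimings plays the role of stats.LogTimings in the diff: one place that
// formats the accumulated wait times.
func (s *ingestionStats) LogTimings(ctx context.Context, name, action string) {
	log.Printf("%s adder %s: %v filling; %v sorting; %v flushing",
		name, action, s.fillWait, s.sortWait, s.flushWait)
}

// LogFlushes plays the role of stats.LogFlushes: flush counts and why each
// batch was cut.
func (s *ingestionStats) LogFlushes(ctx context.Context, name, action string) {
	log.Printf("%s adder %s: %d buffer flushes (%d for size); %d SSTs (%d for ranges, %d for size)",
		name, action, s.bufferFlushes, s.flushesDueToSize,
		s.batches, s.batchesDueToRange, s.batchesDueToSize)
}

func main() {
	s := &ingestionStats{bufferFlushes: 2, flushesDueToSize: 1, batches: 5, batchesDueToRange: 3, fillWait: time.Second}
	s.LogTimings(context.Background(), "test", "closing")
	s.LogFlushes(context.Background(), "test", "closing")
}
```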