diff --git a/pkg/server/span_download.go b/pkg/server/span_download.go
index 57502b9f3c14..845cecb697a4 100644
--- a/pkg/server/span_download.go
+++ b/pkg/server/span_download.go
@@ -19,6 +19,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/server/authserver"
 	"github.com/cockroachdb/cockroach/pkg/server/serverpb"
 	"github.com/cockroachdb/cockroach/pkg/server/srverrors"
+	"github.com/cockroachdb/cockroach/pkg/storage"
 	"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
 	"github.com/cockroachdb/cockroach/pkg/util/log"
 	"github.com/cockroachdb/errors"
@@ -84,99 +85,112 @@ func (s *systemStatusServer) DownloadSpan(
 func (s *systemStatusServer) localDownloadSpan(
 	ctx context.Context, req *serverpb.DownloadSpanRequest,
 ) error {
 	return s.stores.VisitStores(func(store *kvserver.Store) error {
-		spanCh := make(chan roachpb.Span)
-
-		grp := ctxgroup.WithContext(ctx)
-		grp.GoCtx(func(ctx context.Context) error {
-			defer close(spanCh)
-			ctxDone := ctx.Done()
-
-			for _, sp := range req.Spans {
-				select {
-				case spanCh <- sp:
-				case <-ctxDone:
-					return ctx.Err()
-				}
-			}
-			return nil
-		})
-
-		const downloadWaiters = 16
-		downloadersDone := make(chan struct{}, downloadWaiters)
-
-		downloader := func(ctx context.Context) error {
-			for sp := range spanCh {
-				if err := store.TODOEngine().Download(ctx, sp); err != nil {
-					return err
-				}
-			}
-			downloadersDone <- struct{}{}
-			return nil
-		}
-
-		for i := 0; i < downloadWaiters; i++ {
-			grp.GoCtx(downloader)
-		}
-
-		grp.GoCtx(func(ctx context.Context) error {
-			var added int64
-			// Remove any additional concurrency we've added when we exit.
-			//
-			// TODO(dt,radu): Ideally we'd adjust a separate limit that applies only
-			// to download compactions, so that this does not fight with manual calls
-			// to SetConcurrentCompactions.
-			defer func() {
-				if added != 0 {
-					adjusted := store.TODOEngine().AdjustCompactionConcurrency(-added)
-					log.Infof(ctx, "downloads complete; reset compaction concurrency to %d", adjusted)
-				}
-			}()
-
-			const maxAddedConcurrency, lowCPU, highCPU, initialIncrease = 16, 0.65, 0.8, 8
-
-			// Begin by bumping up the concurrency by 8, then start watching the CPU
-			// usage and adjusting up or down based on CPU until downloading finishes.
-			store.TODOEngine().AdjustCompactionConcurrency(initialIncrease)
-			added += initialIncrease
-
-			t := time.NewTicker(time.Second * 15)
-			defer t.Stop()
-			ctxDone := ctx.Done()
-
-			var waitersExited int
-			for {
-				select {
-				case <-ctxDone:
-					return ctx.Err()
-				case <-downloadersDone:
-					waitersExited++
-					// Return and stop managing added concurrency if the workers are done.
-					if waitersExited >= downloadWaiters {
-						return nil
-					}
-				case <-t.C:
-					cpu := s.sqlServer.cfg.RuntimeStatSampler.GetCPUCombinedPercentNorm()
-					if cpu > highCPU && added > 0 {
-						// If CPU is high and we have added any additional concurrency, we
-						// should reduce our added concurrency to make sure CPU is available
-						// for the execution of foreground traffic.
-						adjusted := store.TODOEngine().AdjustCompactionConcurrency(-1)
-						added--
-						log.Infof(ctx, "decreasing additional compaction concurrency to %d (%d total) due cpu usage %.0f%% > %.0f%%", added, adjusted, cpu*100, highCPU*100)
-					} else if cpu < lowCPU {
-						// If CPU is low, we should use it to do additional downloading.
-						if added < maxAddedConcurrency {
-							adjusted := store.TODOEngine().AdjustCompactionConcurrency(1)
-							added++
-							log.Infof(ctx, "increasing additional compaction concurrency to %d (%d total) due cpu usage %.0f%% < %.0f%%", added, adjusted, cpu*100, lowCPU*100)
-						}
-					}
-				}
-			}
-		})
-
-		return grp.Wait()
+		downloadSpansCh, stopTuningCh := make(chan roachpb.Span), make(chan struct{})
+		return ctxgroup.GoAndWait(ctx,
+			// Tune download concurrency until stopTuningCh closes.
+			func(ctx context.Context) error {
+				t := time.NewTicker(time.Second * 15)
+				defer t.Stop()
+				return tuneDownloadCurrency(ctx, t.C, stopTuningCh, store.StateEngine(),
+					s.sqlServer.cfg.RuntimeStatSampler.GetCPUCombinedPercentNorm)
+			},
+			// Download until downloadSpansCh closes, then close stopTuningCh.
+			func(ctx context.Context) error {
+				defer close(stopTuningCh)
+				return downloadSpans(ctx, store.StateEngine(), downloadSpansCh)
+			},
+			// Send spans to downloadSpansCh.
+			func(ctx context.Context) error {
+				defer close(downloadSpansCh)
+				return sendDownloadSpans(ctx, req.Spans, downloadSpansCh)
+			},
+		)
 	})
 }
+
+// sendDownloadSpans sends spans that cover all spans in spans to the passed ch.
+func sendDownloadSpans(ctx context.Context, spans roachpb.Spans, out chan roachpb.Span) error {
+	ctxDone := ctx.Done()
+	for _, sp := range spans {
+		select {
+		case out <- sp:
+		case <-ctxDone:
+			return ctx.Err()
+		}
+	}
+	return nil
+}
+
+// downloadSpans instructs the passed engine, in parallel, to download spans
+// received on the passed ch until it closes.
+func downloadSpans(ctx context.Context, eng storage.Engine, spans chan roachpb.Span) error {
+	const downloadWaiters = 16
+	return ctxgroup.GroupWorkers(ctx, downloadWaiters, func(ctx context.Context, _ int) error {
+		for sp := range spans {
+			if err := eng.Download(ctx, sp); err != nil {
+				return err
+			}
+		}
+		return nil
+	})
+}
+
+// tuneDownloadCurrency periodically adjusts download concurrency up or down on
+// the passed engine based on the cpu load value read from the passed func until
+// the passed done ch closes or is signaled.
+func tuneDownloadCurrency(
+	ctx context.Context,
+	tick <-chan time.Time,
+	done <-chan struct{},
+	eng storage.Engine,
+	readCPU func() float64,
+) error {
+	var added int64
+	// Remove any additional concurrency we've added when we exit.
+	//
+	// TODO(dt,radu): Ideally we'd adjust a separate limit that applies only
+	// to download compactions, so that this does not fight with manual calls
+	// to SetConcurrentCompactions.
+	defer func() {
+		if added != 0 {
+			adjusted := eng.AdjustCompactionConcurrency(-added)
+			log.Infof(ctx, "downloads complete; reset compaction concurrency to %d", adjusted)
+		}
+	}()
+
+	const maxAddedConcurrency, lowCPU, highCPU, initialIncrease = 16, 0.65, 0.8, 8
+
+	// Begin by bumping up the concurrency by 8, then start watching the CPU
+	// usage and adjusting up or down based on CPU until downloading finishes.
+	eng.AdjustCompactionConcurrency(initialIncrease)
+	added += initialIncrease
+
+	ctxDone := ctx.Done()
+
+	for {
+		select {
+		case <-ctxDone:
+			return ctx.Err()
+		case <-done:
+			return nil
+		case <-tick:
+			cpu := readCPU()
+			if cpu > highCPU && added > 0 {
+				// If CPU is high and we have added any additional concurrency, we
+				// should reduce our added concurrency to make sure CPU is available
+				// for the execution of foreground traffic.
+				adjusted := eng.AdjustCompactionConcurrency(-1)
+				added--
+				log.Infof(ctx, "decreasing additional compaction concurrency to %d (%d total) due cpu usage %.0f%% > %.0f%%", added, adjusted, cpu*100, highCPU*100)
+			} else if cpu < lowCPU {
+				// If CPU is low, we should use it to do additional downloading.
+				if added < maxAddedConcurrency {
+					adjusted := eng.AdjustCompactionConcurrency(1)
+					added++
+					log.Infof(ctx, "increasing additional compaction concurrency to %d (%d total) due cpu usage %.0f%% < %.0f%%", added, adjusted, cpu*100, lowCPU*100)
+				}
+			}
+		}
+	}
+}
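The practical payoff of this refactor is that the tuning loop is now driven entirely by injected dependencies (a tick channel, a done channel, an engine, and a CPU-reading func), so it can be exercised deterministically without a real ticker or CPU sampler. The standalone sketch below only illustrates that pattern; it is not code from this change, and tuneLoop, compactionAdjuster, and fakeEngine are hypothetical stand-ins for tuneDownloadCurrency, the single storage.Engine method the loop uses, and a test double.

package main

import (
	"context"
	"fmt"
	"time"
)

// compactionAdjuster is a hypothetical stand-in for the one engine method the
// tuning loop relies on.
type compactionAdjuster interface {
	AdjustCompactionConcurrency(delta int64) int64
}

// fakeEngine is a hypothetical test double that records the net adjustment.
type fakeEngine struct{ total int64 }

func (f *fakeEngine) AdjustCompactionConcurrency(delta int64) int64 {
	f.total += delta
	return f.total
}

// tuneLoop mirrors the shape of the tuning loop above: bump concurrency up
// front, raise it while CPU is low, lower it while CPU is high, and undo the
// whole adjustment on exit.
func tuneLoop(
	ctx context.Context,
	tick <-chan time.Time,
	done <-chan struct{},
	eng compactionAdjuster,
	readCPU func() float64,
) error {
	const lowCPU, highCPU, maxAdded, initial = 0.65, 0.8, 16, 8
	var added int64
	defer func() { eng.AdjustCompactionConcurrency(-added) }()
	eng.AdjustCompactionConcurrency(initial)
	added += initial
	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-done:
			return nil
		case <-tick:
			switch cpu := readCPU(); {
			case cpu > highCPU && added > 0:
				eng.AdjustCompactionConcurrency(-1)
				added--
			case cpu < lowCPU && added < maxAdded:
				eng.AdjustCompactionConcurrency(1)
				added++
			}
		}
	}
}

func main() {
	eng := &fakeEngine{}
	tick := make(chan time.Time)
	done := make(chan struct{})

	// Canned CPU readings: one "idle" sample, then one "busy" sample. Feeding
	// them over a channel keeps the example free of data races.
	cpuCh := make(chan float64, 2)
	cpuCh <- 0.50 // below lowCPU: the loop should add a slot
	cpuCh <- 0.90 // above highCPU: the loop should take it back

	errCh := make(chan error, 1)
	go func() {
		errCh <- tuneLoop(context.Background(), tick, done, eng,
			func() float64 { return <-cpuCh })
	}()

	tick <- time.Time{} // drive the first adjustment
	tick <- time.Time{} // drive the second adjustment
	close(done)         // stop tuning, triggering the deferred reset

	fmt.Println(<-errCh, "net adjustment after exit:", eng.total) // <nil> ... 0
}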