Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

server: cleanup span download function #117017

Merged
merged 1 commit into from
Jan 9, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
184 changes: 99 additions & 85 deletions pkg/server/span_download.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/server/authserver"
"github.com/cockroachdb/cockroach/pkg/server/serverpb"
"github.com/cockroachdb/cockroach/pkg/server/srverrors"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/errors"
Expand Down Expand Up @@ -84,99 +85,112 @@ func (s *systemStatusServer) DownloadSpan(
// localDownloadSpan runs the download of req.Spans against each store handed
// to the s.stores.VisitStores callback and returns the error from that visit.
//
// For each store, three functions are run together via ctxgroup.GoAndWait: one
// tunes download concurrency, one performs the downloads on
// store.StateEngine(), and one feeds req.Spans to the downloaders.
func (s *systemStatusServer) localDownloadSpan(
ctx context.Context, req *serverpb.DownloadSpanRequest,
) error {

return s.stores.VisitStores(func(store *kvserver.Store) error {
// NOTE(review): the spanCh/grp feeder below looks like removed-side residue
// of the diff rendering; nothing in this function receives from spanCh or
// waits on grp. Confirm against the merged file.
spanCh := make(chan roachpb.Span)

grp := ctxgroup.WithContext(ctx)
grp.GoCtx(func(ctx context.Context) error {
defer close(spanCh)
ctxDone := ctx.Done()

for _, sp := range req.Spans {
select {
case spanCh <- sp:
case <-ctxDone:
return ctx.Err()
}
}
return nil
})
// downloadSpansCh carries spans from the sender to the downloaders.
// stopTuningCh is closed when the download function returns and is the
// done signal passed to the tuner.
downloadSpansCh, stopTuningCh := make(chan roachpb.Span), make(chan struct{})
return ctxgroup.GoAndWait(ctx,
// Tune download concurrency until stopTuningCh closes.
func(ctx context.Context) error {
// The ticker supplies the tuner's tick channel every 15 seconds and is
// stopped when this function returns.
t := time.NewTicker(time.Second * 15)
defer t.Stop()
return tuneDownloadCurrency(ctx, t.C, stopTuningCh, store.StateEngine(),
s.sqlServer.cfg.RuntimeStatSampler.GetCPUCombinedPercentNorm)
},
// Download until downloadSpansCh closes, then close stopTuningCh.
func(ctx context.Context) error {
defer close(stopTuningCh)
return downloadSpans(ctx, store.StateEngine(), downloadSpansCh)
},
// Send spans to downloadSpansCh.
func(ctx context.Context) error {
// Closing downloadSpansCh on exit ends the downloaders' range loops.
defer close(downloadSpansCh)
return sendDownloadSpans(ctx, req.Spans, downloadSpansCh)
},
)
})
}

const downloadWaiters = 16
downloadersDone := make(chan struct{}, downloadWaiters)
// sendDownloadSpans delivers each span in spans, in order, on out. It returns
// ctx.Err() if ctx is done while a send is pending and nil once every span has
// been sent. It does not close out; that is left to the caller.
func sendDownloadSpans(ctx context.Context, spans roachpb.Spans, out chan roachpb.Span) error {
	for _, span := range spans {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case out <- span:
		}
	}
	return nil
}

downloader := func(ctx context.Context) error {
for sp := range spanCh {
if err := store.TODOEngine().Download(ctx, sp); err != nil {
return err
}
// downloadSpans instructs the passed engine to download, in parallel, the
// spans received on the passed channel until it closes.
//
// It starts downloadWaiters workers via ctxgroup.GroupWorkers; each worker
// ranges over spans and calls eng.Download for every span it receives. A worker
// returns the error from eng.Download if one occurs.
func downloadSpans(ctx context.Context, eng storage.Engine, spans chan roachpb.Span) error {
const downloadWaiters = 16
return ctxgroup.GroupWorkers(ctx, downloadWaiters, func(ctx context.Context, _ int) error {
for sp := range spans {
if err := eng.Download(ctx, sp); err != nil {
return err
}
// NOTE(review): the next two lines look like removed-side residue of the
// diff rendering; downloadersDone is not declared in this function.
// Confirm against the merged file.
downloadersDone <- struct{}{}
return nil
}
return nil
})
}

for i := 0; i < downloadWaiters; i++ {
grp.GoCtx(downloader)
// tuneDownloadCurrency periodically adjusts download concurrency up or down on
// the passed engine based on the cpu load value read from the passed func until
// the passed done ch closes or is signaled.
func tuneDownloadCurrency(
ctx context.Context,
tick <-chan time.Time,
done <-chan struct{},
eng storage.Engine,
readCPU func() float64,
) error {
var added int64
// Remove any additional concurrency we've added when we exit.
//
// TODO(dt,radu): Ideally we'd adjust a separate limit that applies only
// to download compactions, so that this does not fight with manual calls
// to SetConcurrentCompactions.
defer func() {
if added != 0 {
adjusted := eng.AdjustCompactionConcurrency(-added)
log.Infof(ctx, "downloads complete; reset compaction concurrency to %d", adjusted)
}
}()

grp.GoCtx(func(ctx context.Context) error {
var added int64
// Remove any additional concurrency we've added when we exit.
//
// TODO(dt,radu): Ideally we'd adjust a separate limit that applies only
// to download compactions, so that this does not fight with manual calls
// to SetConcurrentCompactions.
defer func() {
if added != 0 {
adjusted := store.TODOEngine().AdjustCompactionConcurrency(-added)
log.Infof(ctx, "downloads complete; reset compaction concurrency to %d", adjusted)
}
}()

const maxAddedConcurrency, lowCPU, highCPU, initialIncrease = 16, 0.65, 0.8, 8

// Begin by bumping up the concurrency by 8, then start watching the CPU
// usage and adjusting up or down based on CPU until downloading finishes.
store.TODOEngine().AdjustCompactionConcurrency(initialIncrease)
added += initialIncrease

t := time.NewTicker(time.Second * 15)
defer t.Stop()
ctxDone := ctx.Done()

var waitersExited int
for {
select {
case <-ctxDone:
return ctx.Err()
case <-downloadersDone:
waitersExited++
// Return and stop managing added concurrency if the workers are done.
if waitersExited >= downloadWaiters {
return nil
}
case <-t.C:
cpu := s.sqlServer.cfg.RuntimeStatSampler.GetCPUCombinedPercentNorm()
if cpu > highCPU && added > 0 {
// If CPU is high and we have added any additional concurrency, we
// should reduce our added concurrency to make sure CPU is available
// for the execution of foreground traffic.
adjusted := store.TODOEngine().AdjustCompactionConcurrency(-1)
added--
log.Infof(ctx, "decreasing additional compaction concurrency to %d (%d total) due cpu usage %.0f%% > %.0f%%", added, adjusted, cpu*100, highCPU*100)
} else if cpu < lowCPU {
// If CPU is low, we should use it to do additional downloading.
if added < maxAddedConcurrency {
adjusted := store.TODOEngine().AdjustCompactionConcurrency(1)
added++
log.Infof(ctx, "increasing additional compaction concurrency to %d (%d total) due cpu usage %.0f%% < %.0f%%", added, adjusted, cpu*100, lowCPU*100)
}
}
const maxAddedConcurrency, lowCPU, highCPU, initialIncrease = 16, 0.65, 0.8, 8

// Begin by bumping up the concurrency by 8, then start watching the CPU
// usage and adjusting up or down based on CPU until downloading finishes.
eng.AdjustCompactionConcurrency(initialIncrease)
added += initialIncrease

ctxDone := ctx.Done()

for {
select {
case <-ctxDone:
return ctx.Err()
case <-done:
return nil
case <-tick:
cpu := readCPU()
if cpu > highCPU && added > 0 {
// If CPU is high and we have added any additional concurrency, we
// should reduce our added concurrency to make sure CPU is available
// for the execution of foreground traffic.
adjusted := eng.AdjustCompactionConcurrency(-1)
added--
log.Infof(ctx, "decreasing additional compaction concurrency to %d (%d total) due cpu usage %.0f%% > %.0f%%", added, adjusted, cpu*100, highCPU*100)
} else if cpu < lowCPU {
// If CPU is low, we should use it to do additional downloading.
if added < maxAddedConcurrency {
adjusted := eng.AdjustCompactionConcurrency(1)
added++
log.Infof(ctx, "increasing additional compaction concurrency to %d (%d total) due cpu usage %.0f%% < %.0f%%", added, adjusted, cpu*100, lowCPU*100)
}
}
})

return grp.Wait()
})
}
}
}