Skip to content

Commit

Permalink
server: use group for download workers
Browse files Browse the repository at this point in the history
Release note: none.
Epic: none.
  • Loading branch information
dt committed Jan 9, 2024
1 parent 01da063 commit 82baeca
Showing 1 changed file with 99 additions and 85 deletions.
184 changes: 99 additions & 85 deletions pkg/server/span_download.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/server/authserver"
"github.com/cockroachdb/cockroach/pkg/server/serverpb"
"github.com/cockroachdb/cockroach/pkg/server/srverrors"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/errors"
Expand Down Expand Up @@ -84,99 +85,112 @@ func (s *systemStatusServer) DownloadSpan(
func (s *systemStatusServer) localDownloadSpan(
ctx context.Context, req *serverpb.DownloadSpanRequest,
) error {

return s.stores.VisitStores(func(store *kvserver.Store) error {
spanCh := make(chan roachpb.Span)

grp := ctxgroup.WithContext(ctx)
grp.GoCtx(func(ctx context.Context) error {
defer close(spanCh)
ctxDone := ctx.Done()

for _, sp := range req.Spans {
select {
case spanCh <- sp:
case <-ctxDone:
return ctx.Err()
}
}
return nil
})
downloadSpansCh, stopTuningCh := make(chan roachpb.Span), make(chan struct{})
return ctxgroup.GoAndWait(ctx,
// Tune download concurrency until stopTuningCh closes.
func(ctx context.Context) error {
t := time.NewTicker(time.Second * 15)
defer t.Stop()
return tuneDownloadCurrency(ctx, t.C, stopTuningCh, store.StateEngine(),
s.sqlServer.cfg.RuntimeStatSampler.GetCPUCombinedPercentNorm)
},
// Download until downloadSpansCh closes, then close stopTuningCh.
func(ctx context.Context) error {
defer close(stopTuningCh)
return downloadSpans(ctx, store.StateEngine(), downloadSpansCh)
},
// Send spans to downloadSpansCh.
func(ctx context.Context) error {
defer close(downloadSpansCh)
return sendDownloadSpans(ctx, req.Spans, downloadSpansCh)
},
)
})
}

const downloadWaiters = 16
downloadersDone := make(chan struct{}, downloadWaiters)
// sendDownloadSpans sends spans that cover all spans in spans to the passed ch.
func sendDownloadSpans(ctx context.Context, spans roachpb.Spans, out chan roachpb.Span) error {
ctxDone := ctx.Done()
for _, sp := range spans {
select {
case out <- sp:
case <-ctxDone:
return ctx.Err()
}
}
return nil
}

downloader := func(ctx context.Context) error {
for sp := range spanCh {
if err := store.TODOEngine().Download(ctx, sp); err != nil {
return err
}
// downloadSpans instructs the passed engine, in parallel, to downloads spans
// received on the passed ch until it closes.
func downloadSpans(ctx context.Context, eng storage.Engine, spans chan roachpb.Span) error {
const downloadWaiters = 16
return ctxgroup.GroupWorkers(ctx, downloadWaiters, func(ctx context.Context, _ int) error {
for sp := range spans {
if err := eng.Download(ctx, sp); err != nil {
return err
}
downloadersDone <- struct{}{}
return nil
}
return nil
})
}

for i := 0; i < downloadWaiters; i++ {
grp.GoCtx(downloader)
// tuneDownloadCurrency periodically adjusts download concurrency up or down on
// the passed engine based on the cpu load value read from the passed func until
// the passed done ch closes or is signaled.
func tuneDownloadCurrency(
ctx context.Context,
tick <-chan time.Time,
done <-chan struct{},
eng storage.Engine,
readCPU func() float64,
) error {
var added int64
// Remove any additional concurrency we've added when we exit.
//
// TODO(dt,radu): Ideally we'd adjust a separate limit that applies only
// to download compactions, so that this does not fight with manual calls
// to SetConcurrentCompactions.
defer func() {
if added != 0 {
adjusted := eng.AdjustCompactionConcurrency(-added)
log.Infof(ctx, "downloads complete; reset compaction concurrency to %d", adjusted)
}
}()

grp.GoCtx(func(ctx context.Context) error {
var added int64
// Remove any additional concurrency we've added when we exit.
//
// TODO(dt,radu): Ideally we'd adjust a separate limit that applies only
// to download compactions, so that this does not fight with manual calls
// to SetConcurrentCompactions.
defer func() {
if added != 0 {
adjusted := store.TODOEngine().AdjustCompactionConcurrency(-added)
log.Infof(ctx, "downloads complete; reset compaction concurrency to %d", adjusted)
}
}()

const maxAddedConcurrency, lowCPU, highCPU, initialIncrease = 16, 0.65, 0.8, 8

// Begin by bumping up the concurrency by 8, then start watching the CPU
// usage and adjusting up or down based on CPU until downloading finishes.
store.TODOEngine().AdjustCompactionConcurrency(initialIncrease)
added += initialIncrease

t := time.NewTicker(time.Second * 15)
defer t.Stop()
ctxDone := ctx.Done()

var waitersExited int
for {
select {
case <-ctxDone:
return ctx.Err()
case <-downloadersDone:
waitersExited++
// Return and stop managing added concurrency if the workers are done.
if waitersExited >= downloadWaiters {
return nil
}
case <-t.C:
cpu := s.sqlServer.cfg.RuntimeStatSampler.GetCPUCombinedPercentNorm()
if cpu > highCPU && added > 0 {
// If CPU is high and we have added any additional concurrency, we
// should reduce our added concurrency to make sure CPU is available
// for the execution of foreground traffic.
adjusted := store.TODOEngine().AdjustCompactionConcurrency(-1)
added--
log.Infof(ctx, "decreasing additional compaction concurrency to %d (%d total) due cpu usage %.0f%% > %.0f%%", added, adjusted, cpu*100, highCPU*100)
} else if cpu < lowCPU {
// If CPU is low, we should use it to do additional downloading.
if added < maxAddedConcurrency {
adjusted := store.TODOEngine().AdjustCompactionConcurrency(1)
added++
log.Infof(ctx, "increasing additional compaction concurrency to %d (%d total) due cpu usage %.0f%% < %.0f%%", added, adjusted, cpu*100, lowCPU*100)
}
}
const maxAddedConcurrency, lowCPU, highCPU, initialIncrease = 16, 0.65, 0.8, 8

// Begin by bumping up the concurrency by 8, then start watching the CPU
// usage and adjusting up or down based on CPU until downloading finishes.
eng.AdjustCompactionConcurrency(initialIncrease)
added += initialIncrease

ctxDone := ctx.Done()

for {
select {
case <-ctxDone:
return ctx.Err()
case <-done:
return nil
case <-tick:
cpu := readCPU()
if cpu > highCPU && added > 0 {
// If CPU is high and we have added any additional concurrency, we
// should reduce our added concurrency to make sure CPU is available
// for the execution of foreground traffic.
adjusted := eng.AdjustCompactionConcurrency(-1)
added--
log.Infof(ctx, "decreasing additional compaction concurrency to %d (%d total) due cpu usage %.0f%% > %.0f%%", added, adjusted, cpu*100, highCPU*100)
} else if cpu < lowCPU {
// If CPU is low, we should use it to do additional downloading.
if added < maxAddedConcurrency {
adjusted := eng.AdjustCompactionConcurrency(1)
added++
log.Infof(ctx, "increasing additional compaction concurrency to %d (%d total) due cpu usage %.0f%% < %.0f%%", added, adjusted, cpu*100, lowCPU*100)
}
}
})

return grp.Wait()
})
}
}
}

0 comments on commit 82baeca

Please sign in to comment.