From 7af3d79141fc0f47729d525d6dfb45bb910cfb27 Mon Sep 17 00:00:00 2001 From: Daniel Harrison Date: Thu, 27 Apr 2017 13:11:47 -0400 Subject: [PATCH] storageccl: limit parallel Import requests to 1 We used to run 5, but this was overloading disks, which caused contention in RocksDB, which slowed down heartbeats, which caused mass lease transfers. This worked much better in our large scale tests and doesn't seem to slow it down much (10-15%). A 2TB restore finished with a handful of missed heartbeats. A followup will more smooth out the WriteBatch work, which helps even more. For #14792. --- pkg/ccl/sqlccl/backup.go | 2 +- pkg/ccl/storageccl/export.go | 16 ++++++++++++++-- pkg/ccl/storageccl/import.go | 16 ++++++++++++++-- pkg/ccl/storageccl/limiter.go | 33 ++++++++++++++------------------- 4 files changed, 43 insertions(+), 24 deletions(-) diff --git a/pkg/ccl/sqlccl/backup.go b/pkg/ccl/sqlccl/backup.go index 4abeb47e99a7..5d84f6ef826a 100644 --- a/pkg/ccl/sqlccl/backup.go +++ b/pkg/ccl/sqlccl/backup.go @@ -389,7 +389,7 @@ func Backup( // TODO(dan): Make this limiting per node. // // TODO(dan): See if there's some better solution than rate-limiting #14798. - maxConcurrentExports := clusterNodeCount(p.ExecCfg().Gossip) * storageccl.ParallelRequestsLimit + maxConcurrentExports := clusterNodeCount(p.ExecCfg().Gossip) * storageccl.ExportRequestLimit exportsSem := make(chan struct{}, maxConcurrentExports) header := roachpb.Header{Timestamp: endTime} diff --git a/pkg/ccl/storageccl/export.go b/pkg/ccl/storageccl/export.go index dfc8ead6314f..38bb6b4a6bc8 100644 --- a/pkg/ccl/storageccl/export.go +++ b/pkg/ccl/storageccl/export.go @@ -28,11 +28,23 @@ import ( "github.com/pkg/errors" ) +// ExportRequestLimit is the number of Export requests that can run at once. +// Each extracts data from RocksDB to a temp file and then uploads it to +// cloud storage. In order to not exhaust the disk or memory, or saturate +// the network, limit the number of these that can be run in parallel. This +// number was chosen by a guess. If SST files are likely to not be over +// 200MB, then 5 parallel workers hopefully won't use more than 1GB of space +// in the temp directory. It could be improved by more measured heuristics. +const ExportRequestLimit = 5 + +var exportRequestLimiter *concurrentRequestLimiter + func init() { storage.SetExportCmd(storage.Command{ DeclareKeys: declareKeysExport, Eval: evalExport, }) + exportRequestLimiter = makeConcurrentRequestLimiter(ExportRequestLimit) } func declareKeysExport( @@ -69,10 +81,10 @@ func evalExport( } } - if err := beginLimitedRequest(ctx); err != nil { + if err := exportRequestLimiter.beginLimitedRequest(ctx); err != nil { return storage.EvalResult{}, err } - defer endLimitedRequest() + defer exportRequestLimiter.endLimitedRequest() log.Infof(ctx, "export [%s,%s)", args.Key, args.EndKey) exportStore, err := MakeExportStorage(ctx, args.Storage) diff --git a/pkg/ccl/storageccl/import.go b/pkg/ccl/storageccl/import.go index f438add88fd8..a98ab6e9d76f 100644 --- a/pkg/ccl/storageccl/import.go +++ b/pkg/ccl/storageccl/import.go @@ -26,8 +26,20 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/tracing" ) +var importRequestLimiter *concurrentRequestLimiter + func init() { storage.SetImportCmd(evalImport) + + // importRequestLimit is the number of Import requests that can run at once. + // Each downloads a file from cloud storage to a temp file, iterates it, and + // sends WriteBatch requests to batch insert it. After accounting for write + // amplification, a single ImportRequest and the resulting WriteBatch + // requests is enough to keep an SSD busy. Any more and we risk contending + // RocksDB, which slows down heartbeats, which can cause mass lease + // transfers. + const importRequestLimit = 1 + importRequestLimiter = makeConcurrentRequestLimiter(importRequestLimit) } // evalImport bulk loads key/value entries. @@ -52,10 +64,10 @@ func evalImport(ctx context.Context, cArgs storage.CommandArgs) (*roachpb.Import ctx, span := tracing.ChildSpan(ctx, fmt.Sprintf("Import [%s,%s)", importStart, importEnd)) defer tracing.FinishSpan(span) - if err := beginLimitedRequest(ctx); err != nil { + if err := importRequestLimiter.beginLimitedRequest(ctx); err != nil { return nil, err } - defer endLimitedRequest() + defer importRequestLimiter.endLimitedRequest() log.Infof(ctx, "import [%s,%s)", importStart, importEnd) // Arrived at by tuning and watching the effect on BenchmarkRestore. diff --git a/pkg/ccl/storageccl/limiter.go b/pkg/ccl/storageccl/limiter.go index 22197fb130e4..1e74ab7d2d23 100644 --- a/pkg/ccl/storageccl/limiter.go +++ b/pkg/ccl/storageccl/limiter.go @@ -14,26 +14,21 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/tracing" ) -const ( - // ParallelRequestsLimit is the number of Export or Import requests that can - // run at once. Both of these requests generally do some read/write from the - // network and cache results to a tmp file. In order to not exhaust the disk - // or memory, or saturate the network, limit the number of these that can be - // run in parallel. This number was chosen by a guess. If SST files are - // likely to not be over 200MB, then 5 parallel workers hopefully won't use - // more than 1GB of space in the tmp directory. It could be improved by more - // measured heuristics. - ParallelRequestsLimit = 5 -) +// concurrentRequestLimiter allows a configurable number of requests to run at +// once, while respecting context.Context cancellation and adding tracing spans +// when a request has to block for the limiter. +type concurrentRequestLimiter struct { + sem chan struct{} +} -var ( - parallelRequestsLimiter = make(chan struct{}, ParallelRequestsLimit) -) +func makeConcurrentRequestLimiter(limit int) *concurrentRequestLimiter { + return &concurrentRequestLimiter{sem: make(chan struct{}, limit)} +} -func beginLimitedRequest(ctx context.Context) error { +func (l *concurrentRequestLimiter) beginLimitedRequest(ctx context.Context) error { // Check to see there's a slot immediately available. select { - case parallelRequestsLimiter <- struct{}{}: + case l.sem <- struct{}{}: return nil default: } @@ -46,11 +41,11 @@ func beginLimitedRequest(ctx context.Context) error { select { case <-ctx.Done(): return ctx.Err() - case parallelRequestsLimiter <- struct{}{}: + case l.sem <- struct{}{}: return nil } } -func endLimitedRequest() { - <-parallelRequestsLimiter +func (l *concurrentRequestLimiter) endLimitedRequest() { + <-l.sem }