storageccl: limit parallel Import requests to 1
We used to run 5, but this was overloading disks, causing contention in
RocksDB, which slowed down heartbeats, which caused mass lease transfers. The
new limit worked much better in our large-scale tests and doesn't seem to slow
restores down much (10-15%). A 2TB restore finished with only a handful of
missed heartbeats. A follow-up will further smooth out the WriteBatch work,
which helps even more.

For cockroachdb#14792.
danhhz committed Apr 27, 2017
1 parent 9052d7c commit d95fdc5
Showing 4 changed files with 41 additions and 24 deletions.
pkg/ccl/sqlccl/backup.go (1 addition, 1 deletion)
@@ -389,7 +389,7 @@ func Backup(
 	// TODO(dan): Make this limiting per node.
 	//
 	// TODO(dan): See if there's some better solution than rate-limiting #14798.
-	maxConcurrentExports := clusterNodeCount(p.ExecCfg().Gossip) * storageccl.ParallelRequestsLimit
+	maxConcurrentExports := clusterNodeCount(p.ExecCfg().Gossip) * storageccl.ExportRequestLimit
 	exportsSem := make(chan struct{}, maxConcurrentExports)

 	header := roachpb.Header{Timestamp: endTime}
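The acquire/release around each Export request sits below this hunk; exportsSem is a counting semaphore that caps in-flight exports cluster-wide at nodes × ExportRequestLimit. A minimal standalone sketch of that pattern, using hypothetical stand-ins (exportOneSpan and the constants) rather than the actual backup.go code:

package main

import (
	"fmt"
	"sync"
	"time"
)

// exportOneSpan stands in for sending one Export request; it and the
// constants below are illustrative stand-ins, not code from backup.go.
func exportOneSpan(span int) {
	time.Sleep(10 * time.Millisecond) // simulated export work
	fmt.Println("exported span", span)
}

func main() {
	const nodeCount = 3          // e.g. clusterNodeCount(p.ExecCfg().Gossip)
	const exportRequestLimit = 5 // storageccl.ExportRequestLimit
	exportsSem := make(chan struct{}, nodeCount*exportRequestLimit)

	var wg sync.WaitGroup
	for span := 0; span < 100; span++ {
		exportsSem <- struct{}{} // acquire a slot; blocks once at the cap
		wg.Add(1)
		go func(span int) {
			defer wg.Done()
			defer func() { <-exportsSem }() // release the slot
			exportOneSpan(span)
		}(span)
	}
	wg.Wait()
}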
pkg/ccl/storageccl/export.go (13 additions, 2 deletions)
@@ -28,6 +28,17 @@ import (
 	"github.com/pkg/errors"
 )

+// ExportRequestLimit is the number of Export requests that can run at once.
+// Each extracts data from RocksDB to a temp file and then uploads it to cloud
+// storage. To avoid exhausting the disk or memory, or saturating the network,
+// limit the number of these that can run in parallel. This number was chosen
+// by guessing. If SST files are unlikely to be over 200MB, then 5 parallel
+// workers hopefully won't use more than 1GB of space in the temp directory. It
+// could be improved by more measured heuristics.
+const ExportRequestLimit = 5
+
+var exportRequestLimiter = makeConcurrentRequestLimiter(ExportRequestLimit)
+
 func init() {
 	storage.SetExportCmd(storage.Command{
 		DeclareKeys: declareKeysExport,
@@ -69,10 +80,10 @@ func evalExport(
 		}
 	}

-	if err := beginLimitedRequest(ctx); err != nil {
+	if err := exportRequestLimiter.beginLimitedRequest(ctx); err != nil {
 		return storage.EvalResult{}, err
 	}
-	defer endLimitedRequest()
+	defer exportRequestLimiter.endLimitedRequest()
 	log.Infof(ctx, "export [%s,%s)", args.Key, args.EndKey)

 	exportStore, err := MakeExportStorage(ctx, args.Storage)
pkg/ccl/storageccl/import.go (13 additions, 2 deletions)
@@ -26,6 +26,17 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/util/tracing"
 )

+// importRequestLimit is the number of Import requests that can run at once.
+// Each downloads a file from cloud storage to a temp file, iterates it, and
+// sends WriteBatch requests to batch insert it. After accounting for write
+// amplification, a single ImportRequest and the resulting WriteBatch
+// requests are enough to keep an SSD busy. Any more and we risk contention
+// in RocksDB, which slows down heartbeats, which can cause mass lease
+// transfers.
+const importRequestLimit = 1
+
+var importRequestLimiter = makeConcurrentRequestLimiter(importRequestLimit)
+
 func init() {
 	storage.SetImportCmd(evalImport)
 }
@@ -52,10 +63,10 @@ func evalImport(ctx context.Context, cArgs storage.CommandArgs) (*roachpb.Import
 	ctx, span := tracing.ChildSpan(ctx, fmt.Sprintf("Import [%s,%s)", importStart, importEnd))
 	defer tracing.FinishSpan(span)

-	if err := beginLimitedRequest(ctx); err != nil {
+	if err := importRequestLimiter.beginLimitedRequest(ctx); err != nil {
 		return nil, err
 	}
-	defer endLimitedRequest()
+	defer importRequestLimiter.endLimitedRequest()
 	log.Infof(ctx, "import [%s,%s)", importStart, importEnd)

 	// Arrived at by tuning and watching the effect on BenchmarkRestore.
pkg/ccl/storageccl/limiter.go (14 additions, 19 deletions)
@@ -14,26 +14,21 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/util/tracing"
 )

-const (
-	// ParallelRequestsLimit is the number of Export or Import requests that can
-	// run at once. Both of these requests generally do some read/write from the
-	// network and cache results to a tmp file. In order to not exhaust the disk
-	// or memory, or saturate the network, limit the number of these that can be
-	// run in parallel. This number was chosen by a guess. If SST files are
-	// likely to not be over 200MB, then 5 parallel workers hopefully won't use
-	// more than 1GB of space in the tmp directory. It could be improved by more
-	// measured heuristics.
-	ParallelRequestsLimit = 5
-)
+// concurrentRequestLimiter allows a configurable number of requests to run at
+// once, while respecting context.Context cancellation and adding tracing spans
+// when a request has to block for the limiter.
+type concurrentRequestLimiter struct {
+	sem chan struct{}
+}

-var (
-	parallelRequestsLimiter = make(chan struct{}, ParallelRequestsLimit)
-)
+func makeConcurrentRequestLimiter(limit int) concurrentRequestLimiter {
+	return concurrentRequestLimiter{sem: make(chan struct{}, limit)}
+}

-func beginLimitedRequest(ctx context.Context) error {
+func (l *concurrentRequestLimiter) beginLimitedRequest(ctx context.Context) error {
 	// Check to see there's a slot immediately available.
 	select {
-	case parallelRequestsLimiter <- struct{}{}:
+	case l.sem <- struct{}{}:
 		return nil
 	default:
 	}
@@ -46,11 +41,11 @@ func beginLimitedRequest(ctx context.Context) error {
 	select {
 	case <-ctx.Done():
 		return ctx.Err()
-	case parallelRequestsLimiter <- struct{}{}:
+	case l.sem <- struct{}{}:
 		return nil
 	}
 }

-func endLimitedRequest() {
-	<-parallelRequestsLimiter
+func (l *concurrentRequestLimiter) endLimitedRequest() {
+	<-l.sem
 }
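For illustration, here is how the limiter behaves at importRequestLimit = 1: the first caller takes the slot on the fast path, and a second caller blocks on the slow path until its context expires rather than waiting forever. This is a minimal runnable sketch that reproduces the limiter from the diff above, minus the tracing span the real beginLimitedRequest opens while blocked:

package main

import (
	"context"
	"fmt"
	"time"
)

// concurrentRequestLimiter as in limiter.go above, without tracing.
type concurrentRequestLimiter struct {
	sem chan struct{}
}

func makeConcurrentRequestLimiter(limit int) concurrentRequestLimiter {
	return concurrentRequestLimiter{sem: make(chan struct{}, limit)}
}

func (l *concurrentRequestLimiter) beginLimitedRequest(ctx context.Context) error {
	// Fast path: a slot is immediately available.
	select {
	case l.sem <- struct{}{}:
		return nil
	default:
	}
	// Slow path: block until a slot frees up or the context is canceled.
	select {
	case <-ctx.Done():
		return ctx.Err()
	case l.sem <- struct{}{}:
		return nil
	}
}

func (l *concurrentRequestLimiter) endLimitedRequest() {
	<-l.sem
}

func main() {
	limiter := makeConcurrentRequestLimiter(1) // importRequestLimit

	// First request takes the only slot via the fast path.
	if err := limiter.beginLimitedRequest(context.Background()); err != nil {
		panic(err)
	}

	// Second request blocks on the slow path and gives up when its
	// context times out, instead of waiting indefinitely.
	ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
	defer cancel()
	if err := limiter.beginLimitedRequest(ctx); err != nil {
		fmt.Println("second request:", err) // context deadline exceeded
	}

	limiter.endLimitedRequest() // release the first slot
}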
