Skip to content

Commit

Permalink
sqlccl: rate limit Export and Import requests sent
Browse files Browse the repository at this point in the history
We're already limiting these on the server-side, but the BACKUP/RESTORE
gateway would fill up its distsender/grpc/something and cause all sorts
of badness (node liveness timeouts leading to mass leaseholder
transfers, poor performance on SQL workloads, etc) as well as log spam
about slow distsender requests.

There is likely some better fix post 1.0; this is being tracked in cockroachdb#14798.

For cockroachdb#14792.
  • Loading branch information
danhhz committed Apr 11, 2017
1 parent 61a77fd commit 213f0e9
Show file tree
Hide file tree
Showing 3 changed files with 73 additions and 10 deletions.
27 changes: 27 additions & 0 deletions pkg/ccl/sqlccl/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -343,11 +343,38 @@ func Backup(
return BackupDescriptor{}, err
}

// We're already limiting these on the server-side, but sending all the
// Export requests at once would fill up distsender/grpc/something and cause
// all sorts of badness (node liveness timeouts leading to mass leaseholder
// transfers, poor performance on SQL workloads, etc) as well as log spam
// about slow distsender requests. Rate limit them here, too.
//
// TODO(dan): See if there's some better solution #14798.
var maxConcurrentExports int
{
maxReplicas := 1
for _, r := range ranges {
if numReplicas := len(r.Replicas); numReplicas > maxReplicas {
maxReplicas = numReplicas
}
}
maxConcurrentExports = maxReplicas * storageccl.ParallelRequestsLimit
}
exportsSem := make(chan struct{}, maxConcurrentExports)

header := roachpb.Header{Timestamp: endTime}
g, gCtx := errgroup.WithContext(ctx)
for i := range spans {
select {
case exportsSem <- struct{}{}:
case <-ctx.Done():
return BackupDescriptor{}, ctx.Err()
}

span := spans[i]
g.Go(func() error {
defer func() { <-exportsSem }()

req := &roachpb.ExportRequest{
Span: span,
Storage: storageConf,
Expand Down
36 changes: 36 additions & 0 deletions pkg/ccl/sqlccl/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -675,10 +675,46 @@ func Restore(
// TODO(dan): Wait for the newly created ranges (and leaseholders) to
// rebalance.

// We're already limiting these on the server-side, but sending all the
// Import requests at once would fill up distsender/grpc/something and cause
// all sorts of badness (node liveness timeouts leading to mass leaseholder
// transfers, poor performance on SQL workloads, etc) as well as log spam
// about slow distsender requests. Rate limit them here, too.
//
// TODO(dan): See if there's some better solution #14798.
var maxConcurrentImports int
{
var ranges []roachpb.RangeDescriptor
if err := db.Txn(ctx, func(ctx context.Context, txn *client.Txn) error {
var err error
ranges, err = allRangeDescriptors(ctx, txn)
return err
}); err != nil {
return err
}

maxReplicas := 1
for _, r := range ranges {
if numReplicas := len(r.Replicas); numReplicas > maxReplicas {
maxReplicas = numReplicas
}
}
maxConcurrentImports = maxReplicas * storageccl.ParallelRequestsLimit
}
importsSem := make(chan struct{}, maxConcurrentImports)

g, gCtx := errgroup.WithContext(ctx)
for i := range importRequests {
select {
case importsSem <- struct{}{}:
case <-ctx.Done():
return ctx.Err()
}

ir := importRequests[i]
g.Go(func() error {
defer func() { <-importsSem }()

if err := Import(gCtx, db, ir.Key, ir.EndKey, ir.files, kr); err != nil {
return err
}
Expand Down
20 changes: 10 additions & 10 deletions pkg/ccl/storageccl/limiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,19 +15,19 @@ import (
)

const (
// The number of ExportKeys or Ingest requests that can run at once. Both
// of these requests generally do some read/write from the network and
// cache results to a tmp file. In order to not exhaust the disk or memory,
// or saturate the network, limit the number of these that can be run in
// parallel. This number was chosen by a guess. If SST files are likely to
// not be over 200MB, then 5 parallel workers hopefully won't use more than
// 1GB of space in the tmp directory. It could be improved by more measured
// heuristics.
parallelRequestsLimit = 5
// ParallelRequestsLimit is the number of Export or Import requests that can
// run at once. Both of these requests generally do some read/write from the
// network and cache results to a tmp file. In order to not exhaust the disk
// or memory, or saturate the network, limit the number of these that can be
// run in parallel. This number was chosen by a guess. If SST files are
// likely to not be over 200MB, then 5 parallel workers hopefully won't use
// more than 1GB of space in the tmp directory. It could be improved by more
// measured heuristics.
ParallelRequestsLimit = 5
)

var (
parallelRequestsLimiter = make(chan struct{}, parallelRequestsLimit)
parallelRequestsLimiter = make(chan struct{}, ParallelRequestsLimit)
)

func beginLimitedRequest(ctx context.Context) error {
Expand Down

0 comments on commit 213f0e9

Please sign in to comment.