diff --git a/pkg/ccl/sqlccl/restore.go b/pkg/ccl/sqlccl/restore.go
index fef50d86d595..6bf2e160f977 100644
--- a/pkg/ccl/sqlccl/restore.go
+++ b/pkg/ccl/sqlccl/restore.go
@@ -724,14 +724,16 @@ func Restore(
 	// transfers, poor performance on SQL workloads, etc) as well as log spam
 	// about slow distsender requests. Rate limit them here, too.
 	//
-	// Use the number of nodes in the cluster as the number of outstanding
-	// Import requests for the rate limiting. TODO(dan): This is very
-	// conservative, see if we can bump it back up by rate limiting WriteBatch.
+	// Use twice the number of nodes in the cluster as the number of outstanding
+	// Import requests for the rate limiting. We don't do anything smart here
+	// about per-node limits, but this was picked to balance between Import
+	// requests sitting in distsender for a long time and all nodes having work
+	// to do.
 	//
 	// TODO(dan): Make this limiting per node.
 	//
 	// TODO(dan): See if there's some better solution than rate-limiting #14798.
-	maxConcurrentImports := clusterNodeCount(p.ExecCfg().Gossip)
+	maxConcurrentImports := clusterNodeCount(p.ExecCfg().Gossip) * 2
 	importsSem := make(chan struct{}, maxConcurrentImports)
 
 	mu := struct {
diff --git a/pkg/ccl/storageccl/import.go b/pkg/ccl/storageccl/import.go
index 8976c7ab711a..9705267a9b74 100644
--- a/pkg/ccl/storageccl/import.go
+++ b/pkg/ccl/storageccl/import.go
@@ -26,14 +26,19 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/util/tracing"
 )
 
-// importRequestLimit is the number of Import requests that can run at once.
-// Each downloads a file from cloud storage to a temp file, iterates it, and
-// sends WriteBatch requests to batch insert it. After accounting for write
-// amplification, a single ImportRequest and the resulting WriteBatch
-// requests is enough to keep an SSD busy. Any more and we risk contending
-// RocksDB, which slows down heartbeats, which can cause mass lease
-// transfers.
-const importRequestLimit = 1
+const (
+	// importRequestLimit is the number of Import requests that can run at once.
+	// Each downloads a file from cloud storage to a temp file, iterates it, and
+	// sends WriteBatch requests to batch insert it. After accounting for write
+	// amplification, a single ImportRequest and the resulting WriteBatch
+	// requests is enough to keep an SSD busy. Any more and we risk contending
+	// RocksDB, which slows down heartbeats, which can cause mass lease
+	// transfers.
+	importRequestLimit = 1
+
+	// Arrived at by tuning and watching the effect on BenchmarkRestore.
+	importBatchSizeBytes = 1000000
+)
 
 var importRequestLimiter = makeConcurrentRequestLimiter(importRequestLimit)
 
@@ -69,9 +74,6 @@ func evalImport(ctx context.Context, cArgs storage.CommandArgs) (*roachpb.Import
 	defer importRequestLimiter.endLimitedRequest()
 	log.Infof(ctx, "import [%s,%s)", importStart, importEnd)
 
-	// Arrived at by tuning and watching the effect on BenchmarkRestore.
-	const batchSizeBytes = 1000000
-
 	type batchBuilder struct {
 		batch         engine.RocksDBBatchBuilder
 		batchStartKey []byte
@@ -219,7 +221,7 @@ func evalImport(ctx context.Context, cArgs storage.CommandArgs) (*roachpb.Import
 			b.batchEndKey = append(b.batchEndKey[:0], key.Key...)
 		}
-		if b.batch.Len() > batchSizeBytes {
+		if b.batch.Len() > importBatchSizeBytes {
 			sendWriteBatch()
 		}
 	}
diff --git a/pkg/ccl/storageccl/writebatch.go b/pkg/ccl/storageccl/writebatch.go
index 7c6ed8f6f5a4..e6e2c99cea4a 100644
--- a/pkg/ccl/storageccl/writebatch.go
+++ b/pkg/ccl/storageccl/writebatch.go
@@ -9,19 +9,33 @@ package storageccl
 
 import (
-	"errors"
 	"fmt"
+	"sync"
 
 	"golang.org/x/net/context"
+	"golang.org/x/time/rate"
 
 	"github.com/cockroachdb/cockroach/pkg/ccl/storageccl/engineccl"
 	"github.com/cockroachdb/cockroach/pkg/roachpb"
+	"github.com/cockroachdb/cockroach/pkg/settings"
 	"github.com/cockroachdb/cockroach/pkg/storage"
 	"github.com/cockroachdb/cockroach/pkg/storage/engine"
 	"github.com/cockroachdb/cockroach/pkg/util/log"
 	"github.com/cockroachdb/cockroach/pkg/util/tracing"
+	"github.com/pkg/errors"
 )
 
+// TODO(dan): This 2 MB/s default is quite conservative. There's more tuning to
+// be done here.
+var writeBatchLimit = settings.RegisterByteSizeSetting(
+	"enterprise.writebatch.max_rate",
+	"the rate limit (bytes/sec) to use for WriteBatch requests",
+	2*1024*1024, // 2 MB/s
+)
+
+var writeBatchLimiter = rate.NewLimiter(rate.Limit(writeBatchLimit.Get()), importBatchSizeBytes)
+var writeBatchLimiterMu sync.Mutex
+
 func init() {
 	storage.SetWriteBatchCmd(storage.Command{
 		DeclareKeys: storage.DefaultDeclareKeys,
@@ -42,6 +56,30 @@ func evalWriteBatch(
 	_, span := tracing.ChildSpan(ctx, fmt.Sprintf("WriteBatch [%s,%s)", args.Key, args.EndKey))
 	defer tracing.FinishSpan(span)
+
+	// TODO(dan): If settings had callbacks, we could use one here to adjust the
+	// rate limit when it changed.
+	{
+		l := rate.Limit(writeBatchLimit.Get())
+		writeBatchLimiterMu.Lock()
+		if l != writeBatchLimiter.Limit() {
+			writeBatchLimiter.SetLimit(l)
+		}
+		writeBatchLimiterMu.Unlock()
+	}
+
+	// TODO(dan): This limiting should be per-store and shared between any
+	// operations that need lots of disk throughput.
+	cost := len(args.Data)
+	if cost > importBatchSizeBytes {
+		// The limiter disallows anything greater than its burst (set to
+		// importBatchSizeBytes), so cap the batch size if it would overflow.
+		cost = importBatchSizeBytes
+	}
+	if err := writeBatchLimiter.WaitN(ctx, cost); err != nil {
+		return storage.EvalResult{}, errors.Wrap(err, "writebatch rate limit")
+	}
+
 	if log.V(1) {
 		log.Infof(ctx, "writebatch [%s,%s)", args.Key, args.EndKey)
 	}
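
For reference, a minimal standalone sketch of the token-bucket pattern the writebatch.go hunk applies in evalWriteBatch, using golang.org/x/time/rate: the cost of each batch is capped at the limiter's burst before calling WaitN, since WaitN rejects requests larger than the burst. The constants burstBytes and maxRateBytes and the batch sizes below are illustrative stand-ins for importBatchSizeBytes and the enterprise.writebatch.max_rate default, not code from the repository.

package main

import (
	"context"
	"fmt"
	"time"

	"golang.org/x/time/rate"
)

const (
	burstBytes   = 1000000         // stand-in for importBatchSizeBytes
	maxRateBytes = 2 * 1024 * 1024 // stand-in for the 2 MB/s default rate
)

func main() {
	// Token bucket: refills at maxRateBytes per second, holds at most burstBytes.
	limiter := rate.NewLimiter(rate.Limit(maxRateBytes), burstBytes)

	ctx := context.Background()
	batchSizes := []int{500000, 1200000, 800000} // the second exceeds the burst

	start := time.Now()
	for i, size := range batchSizes {
		// WaitN returns an error for requests larger than the burst, so cap the
		// cost the same way the diff caps oversized WriteBatch payloads.
		cost := size
		if cost > burstBytes {
			cost = burstBytes
		}
		if err := limiter.WaitN(ctx, cost); err != nil {
			fmt.Println("rate limit wait failed:", err)
			return
		}
		fmt.Printf("batch %d (%d bytes) admitted after %s\n", i, size, time.Since(start))
	}
}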