From 14f045c969bddfc1c06c1a0349d71419d57cff10 Mon Sep 17 00:00:00 2001 From: Daniel Harrison Date: Thu, 27 Apr 2017 15:37:07 -0400 Subject: [PATCH] storageccl: rate limit the WriteBatch command Many of our RESTORE stability problems seem to be starting with overloaded disks, which caused contention in RocksDB, which slowed down heartbeats, which caused mass lease transfers. WriteBatch and the resulting write amplification are the largest part of this, so add a rate limit directly for that. The WriteBatch limiting also lets us be less conservative about the client-side Import request limiting in Restore. For #15341. --- pkg/ccl/sqlccl/restore.go | 10 ++++---- pkg/ccl/storageccl/import.go | 26 +++++++++++---------- pkg/ccl/storageccl/writebatch.go | 40 +++++++++++++++++++++++++++++++- 3 files changed, 59 insertions(+), 17 deletions(-) diff --git a/pkg/ccl/sqlccl/restore.go b/pkg/ccl/sqlccl/restore.go index fef50d86d595..6bf2e160f977 100644 --- a/pkg/ccl/sqlccl/restore.go +++ b/pkg/ccl/sqlccl/restore.go @@ -724,14 +724,16 @@ func Restore( // transfers, poor performance on SQL workloads, etc) as well as log spam // about slow distsender requests. Rate limit them here, too. // - // Use the number of nodes in the cluster as the number of outstanding - // Import requests for the rate limiting. TODO(dan): This is very - // conservative, see if we can bump it back up by rate limiting WriteBatch. + // Use twice the number of nodes in the cluster as the number of outstanding + // Import requests for the rate limiting. We don't do anything smart here + // about per-node limits, but this was picked to balance between Import + // requests sitting in distsender for a long time and all nodes having work + // to do. + // // TODO(dan): Make this limiting per node. // // TODO(dan): See if there's some better solution than rate-limiting #14798. 
- maxConcurrentImports := clusterNodeCount(p.ExecCfg().Gossip) + maxConcurrentImports := clusterNodeCount(p.ExecCfg().Gossip) * 2 importsSem := make(chan struct{}, maxConcurrentImports) mu := struct { diff --git a/pkg/ccl/storageccl/import.go b/pkg/ccl/storageccl/import.go index 8976c7ab711a..9705267a9b74 100644 --- a/pkg/ccl/storageccl/import.go +++ b/pkg/ccl/storageccl/import.go @@ -26,14 +26,19 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/tracing" ) -// importRequestLimit is the number of Import requests that can run at once. -// Each downloads a file from cloud storage to a temp file, iterates it, and -// sends WriteBatch requests to batch insert it. After accounting for write -// amplification, a single ImportRequest and the resulting WriteBatch -// requests is enough to keep an SSD busy. Any more and we risk contending -// RocksDB, which slows down heartbeats, which can cause mass lease -// transfers. -const importRequestLimit = 1 +const ( + // importRequestLimit is the number of Import requests that can run at once. + // Each downloads a file from cloud storage to a temp file, iterates it, and + // sends WriteBatch requests to batch insert it. After accounting for write + // amplification, a single ImportRequest and the resulting WriteBatch + // requests is enough to keep an SSD busy. Any more and we risk contending + // RocksDB, which slows down heartbeats, which can cause mass lease + // transfers. + importRequestLimit = 1 + + // Arrived at by tuning and watching the effect on BenchmarkRestore. + importBatchSizeBytes = 1000000 +) var importRequestLimiter = makeConcurrentRequestLimiter(importRequestLimit) @@ -69,9 +74,6 @@ func evalImport(ctx context.Context, cArgs storage.CommandArgs) (*roachpb.Import defer importRequestLimiter.endLimitedRequest() log.Infof(ctx, "import [%s,%s)", importStart, importEnd) - // Arrived at by tuning and watching the effect on BenchmarkRestore. 
- const batchSizeBytes = 1000000 - type batchBuilder struct { batch engine.RocksDBBatchBuilder batchStartKey []byte @@ -219,7 +221,7 @@ func evalImport(ctx context.Context, cArgs storage.CommandArgs) (*roachpb.Import b.batchEndKey = append(b.batchEndKey[:0], key.Key...) } - if b.batch.Len() > batchSizeBytes { + if b.batch.Len() > importBatchSizeBytes { sendWriteBatch() } } diff --git a/pkg/ccl/storageccl/writebatch.go b/pkg/ccl/storageccl/writebatch.go index 7c6ed8f6f5a4..e6e2c99cea4a 100644 --- a/pkg/ccl/storageccl/writebatch.go +++ b/pkg/ccl/storageccl/writebatch.go @@ -9,19 +9,33 @@ package storageccl import ( - "errors" "fmt" + "sync" "golang.org/x/net/context" + "golang.org/x/time/rate" "github.com/cockroachdb/cockroach/pkg/ccl/storageccl/engineccl" "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/settings" "github.com/cockroachdb/cockroach/pkg/storage" "github.com/cockroachdb/cockroach/pkg/storage/engine" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/tracing" + "github.com/pkg/errors" ) +// TODO(dan): This 2 MB/s default is quite conservative. There's more tuning to +// be done here. +var writeBatchLimit = settings.RegisterByteSizeSetting( + "enterprise.writebatch.max_rate", + "the rate limit (bytes/sec) to use for WriteBatch requests", + 2*1024*1024, // 2 MB/s +) + +var writeBatchLimiter = rate.NewLimiter(rate.Limit(writeBatchLimit.Get()), importBatchSizeBytes) +var writeBatchLimiterMu sync.Mutex + func init() { storage.SetWriteBatchCmd(storage.Command{ DeclareKeys: storage.DefaultDeclareKeys, @@ -42,6 +56,30 @@ func evalWriteBatch( _, span := tracing.ChildSpan(ctx, fmt.Sprintf("WriteBatch [%s,%s)", args.Key, args.EndKey)) defer tracing.FinishSpan(span) + + // TODO(dan): If settings had callbacks, we could use one here to adjust the + // rate limit when it changed. 
+ { + l := rate.Limit(writeBatchLimit.Get()) + writeBatchLimiterMu.Lock() + if l != writeBatchLimiter.Limit() { + writeBatchLimiter.SetLimit(l) + } + writeBatchLimiterMu.Unlock() + } + + // TODO(dan): This limiting should be per-store and shared between any + // operations that need lots of disk throughput. + cost := len(args.Data) + if cost > importBatchSizeBytes { + // The limiter disallows anything greater than its burst (set to + // importBatchSizeBytes), so cap the batch size if it would overflow. + cost = importBatchSizeBytes + } + if err := writeBatchLimiter.WaitN(ctx, cost); err != nil { + return storage.EvalResult{}, errors.Wrap(err, "writebatch rate limit") + } + if log.V(1) { log.Infof(ctx, "writebatch [%s,%s)", args.Key, args.EndKey) }