Skip to content

Commit

Permalink
storageccl: rate limit the WriteBatch command
Browse files Browse the repository at this point in the history
Many of our RESTORE stability problems seem to be starting with
overloaded disks, which caused contention in RocksDB, which slowed down
heartbeats, which caused mass lease transfers. WriteBatch and the
resulting write amplification is the largest part of this, so add a rate
limit directly for that.

The WriteBatch limiting also lets us be less conservative about the
client-side Import request limiting in Restore.

For cockroachdb#15341.
  • Loading branch information
danhhz committed Apr 27, 2017
1 parent 3b24664 commit 14f045c
Show file tree
Hide file tree
Showing 3 changed files with 59 additions and 17 deletions.
10 changes: 6 additions & 4 deletions pkg/ccl/sqlccl/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -724,14 +724,16 @@ func Restore(
// transfers, poor performance on SQL workloads, etc) as well as log spam
// about slow distsender requests. Rate limit them here, too.
//
// Use the number of nodes in the cluster as the number of outstanding
// Import requests for the rate limiting. TODO(dan): This is very
// conservative, see if we can bump it back up by rate limiting WriteBatch.
// Use twice the number of nodes in the cluster as the number of outstanding
// Import requests for the rate limiting. We don't do anything smart here
// about per-node limits, but this was picked to balance between Import
// requests sitting in distsender for a long time and all nodes having work
// to do.
//
// TODO(dan): Make this limiting per node.
//
// TODO(dan): See if there's some better solution than rate-limiting #14798.
maxConcurrentImports := clusterNodeCount(p.ExecCfg().Gossip)
maxConcurrentImports := clusterNodeCount(p.ExecCfg().Gossip) * 2
importsSem := make(chan struct{}, maxConcurrentImports)

mu := struct {
Expand Down
26 changes: 14 additions & 12 deletions pkg/ccl/storageccl/import.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,19 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/tracing"
)

// importRequestLimit is the number of Import requests that can run at once.
// Each downloads a file from cloud storage to a temp file, iterates it, and
// sends WriteBatch requests to batch insert it. After accounting for write
// amplification, a single ImportRequest and the resulting WriteBatch
// requests is enough to keep an SSD busy. Any more and we risk contending
// RocksDB, which slows down heartbeats, which can cause mass lease
// transfers.
const importRequestLimit = 1
const (
// importRequestLimit is the number of Import requests that can run at once.
// Each downloads a file from cloud storage to a temp file, iterates it, and
// sends WriteBatch requests to batch insert it. After accounting for write
// amplification, a single ImportRequest and the resulting WriteBatch
// requests is enough to keep an SSD busy. Any more and we risk contending
// RocksDB, which slows down heartbeats, which can cause mass lease
// transfers.
importRequestLimit = 1

// Arrived at by tuning and watching the effect on BenchmarkRestore.
importBatchSizeBytes = 1000000
)

var importRequestLimiter = makeConcurrentRequestLimiter(importRequestLimit)

Expand Down Expand Up @@ -69,9 +74,6 @@ func evalImport(ctx context.Context, cArgs storage.CommandArgs) (*roachpb.Import
defer importRequestLimiter.endLimitedRequest()
log.Infof(ctx, "import [%s,%s)", importStart, importEnd)

// Arrived at by tuning and watching the effect on BenchmarkRestore.
const batchSizeBytes = 1000000

type batchBuilder struct {
batch engine.RocksDBBatchBuilder
batchStartKey []byte
Expand Down Expand Up @@ -219,7 +221,7 @@ func evalImport(ctx context.Context, cArgs storage.CommandArgs) (*roachpb.Import
b.batchEndKey = append(b.batchEndKey[:0], key.Key...)
}

if b.batch.Len() > batchSizeBytes {
if b.batch.Len() > importBatchSizeBytes {
sendWriteBatch()
}
}
Expand Down
40 changes: 39 additions & 1 deletion pkg/ccl/storageccl/writebatch.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,19 +9,33 @@
package storageccl

import (
"errors"
"fmt"
"sync"

"golang.org/x/net/context"
"golang.org/x/time/rate"

"github.com/cockroachdb/cockroach/pkg/ccl/storageccl/engineccl"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/storage/engine"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/pkg/errors"
)

// TODO(dan): This 2 MB/s default is quite conservative. There's more tuning to
// be done here.
var writeBatchLimit = settings.RegisterByteSizeSetting(
"enterprise.writebatch.max_rate",
"the rate limit (bytes/sec) to use for WriteBatch requests",
2*1024*1024, // 2 MB/s
)

var writeBatchLimiter = rate.NewLimiter(rate.Limit(writeBatchLimit.Get()), importBatchSizeBytes)
var writeBatchLimiterMu sync.Mutex

func init() {
storage.SetWriteBatchCmd(storage.Command{
DeclareKeys: storage.DefaultDeclareKeys,
Expand All @@ -42,6 +56,30 @@ func evalWriteBatch(

_, span := tracing.ChildSpan(ctx, fmt.Sprintf("WriteBatch [%s,%s)", args.Key, args.EndKey))
defer tracing.FinishSpan(span)

// TODO(dan): If settings had callbacks, we could use one here to adjust the
// rate limit when it changed.
{
l := rate.Limit(writeBatchLimit.Get())
writeBatchLimiterMu.Lock()
if l != writeBatchLimiter.Limit() {
writeBatchLimiter.SetLimit(l)
}
writeBatchLimiterMu.Unlock()
}

// TODO(dan): This limiting should be per-store and shared between any
// operations that need lots of disk throughput.
cost := len(args.Data)
if cost > importBatchSizeBytes {
// The limiter disallows anything greater than its burst (set to
// importBatchSizeBytes), so cap the batch size if it would overflow.
cost = importBatchSizeBytes
}
if err := writeBatchLimiter.WaitN(ctx, cost); err != nil {
return storage.EvalResult{}, errors.Wrap(err, "writebatch rate limit")
}

if log.V(1) {
log.Infof(ctx, "writebatch [%s,%s)", args.Key, args.EndKey)
}
Expand Down

0 comments on commit 14f045c

Please sign in to comment.