diff --git a/glide.lock b/glide.lock
index 816ce434690e..f9643f684ebe 100644
--- a/glide.lock
+++ b/glide.lock
@@ -1,5 +1,5 @@
 hash: 620ed7b5a97e5e1c3f144f49863859862222390b5e00249714aab0b1d4c581c7
-updated: 2017-04-07T01:28:30.307271586-04:00
+updated: 2017-04-07T17:26:49.573117984-04:00
 imports:
 - name: cloud.google.com/go
   version: f4dfefc998d92c53913c2ae5bc7cd3fba18e9e4b
@@ -12,7 +12,7 @@ imports:
 - name: github.com/abourget/teamcity
   version: 6dde447fa54bc5b08b1a7bb1b85e39089cf27fb1
 - name: github.com/Azure/azure-sdk-for-go
-  version: 088007b3b08cc02b27f2eadfdcd870958460ce7e
+  version: 5b0aa1d05fbf518373abb366951e3587cc1b3154
   subpackages:
   - storage
 - name: github.com/Azure/go-ansiterm
@@ -408,6 +408,10 @@ imports:
   - transform
   - unicode/norm
   - width
+- name: golang.org/x/time
+  version: a4bde12657593d5e90d0533a3e4fd95e635124cb
+  subpackages:
+  - rate
 - name: golang.org/x/tools
   version: 24acc66eabead631b4e856255a9ad925549cee80
   subpackages:
diff --git a/pkg/storage/store.go b/pkg/storage/store.go
index 9c6b67f73fe0..add55ffe9388 100644
--- a/pkg/storage/store.go
+++ b/pkg/storage/store.go
@@ -32,6 +32,7 @@ import (
 	"github.com/opentracing/opentracing-go"
 	"github.com/pkg/errors"
 	"golang.org/x/net/context"
+	"golang.org/x/time/rate"
 
 	"github.com/cockroachdb/cockroach/pkg/base"
 	"github.com/cockroachdb/cockroach/pkg/build"
@@ -3234,6 +3235,11 @@ type SnapshotStorePool interface {
 	throttle(reason throttleReason, toStoreID roachpb.StoreID)
 }
 
+var preemptiveSnapshotRate = envutil.EnvOrDefaultBytes(
+	"COCKROACH_PREEMPTIVE_SNAPSHOT_RATE", 2<<20 /* 2 MB */)
+var raftSnapshotRate = envutil.EnvOrDefaultBytes(
+	"COCKROACH_RAFT_SNAPSHOT_RATE", 8<<20 /* 8 MB */)
+
 // sendSnapshot sends an outgoing snapshot via a pre-opened GRPC stream.
 func sendSnapshot(
 	ctx context.Context,
@@ -3281,12 +3287,26 @@ func sendSnapshot(
 			header.State.Desc.RangeID, resp.Status)
 	}
 
+	// The size of batches to send. This is the granularity of rate limiting.
+	const batchSize = 256 << 10 // 256 KB
+
+	// Convert the bytes/sec rate limit to batches/sec.
+	//
+	// TODO(peter): Using bytes/sec for rate limiting seems more natural but has
+	// practical difficulties. We either need to use a very large burst size
+	// which seems to disable the rate limiting, or call WaitN in smaller than
+	// burst size chunks which caused excessive slowness in testing. Would be
+	// nice to figure this out, but the batches/sec rate limit works for now.
+	targetRate := rate.Limit(raftSnapshotRate) / batchSize
+	if header.CanDecline {
+		targetRate = rate.Limit(preemptiveSnapshotRate) / batchSize
+	}
+	limiter := rate.NewLimiter(targetRate, 1 /* burst size */)
+
 	// Determine the unreplicated key prefix so we can drop any
 	// unreplicated keys from the snapshot.
 	unreplicatedPrefix := keys.MakeRangeIDUnreplicatedPrefix(header.State.Desc.RangeID)
 	var alloc bufalloc.ByteAllocator
-	// TODO(jordan) make this configurable. For now, 1MB.
-	const batchSize = 1 << 20
 	n := 0
 	var b engine.Batch
 	for ; ; snap.Iter.Next() {
@@ -3315,6 +3335,9 @@ func sendSnapshot(
 		}
 
 		if len(b.Repr()) >= batchSize {
+			if err := limiter.WaitN(ctx, 1); err != nil {
+				return err
+			}
 			if err := sendBatch(stream, b); err != nil {
 				return err
 			}
@@ -3325,6 +3348,9 @@
 		}
 	}
 	if b != nil {
+		if err := limiter.WaitN(ctx, 1); err != nil {
+			return err
+		}
 		if err := sendBatch(stream, b); err != nil {
 			return err
 		}
diff --git a/vendor b/vendor
index aa232f5d63f6..27b8c2d19fa8 160000
--- a/vendor
+++ b/vendor
@@ -1 +1 @@
-Subproject commit aa232f5d63f6b0256b16f08e9933067a7d90a883
+Subproject commit 27b8c2d19fa81333cd1a5d1f0550641c2efbbdec
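Side note on the rate-limiting pattern above: the patch converts a bytes/sec budget into batches/sec and calls the limiter once per 256 KB batch. Below is a minimal standalone sketch of that pattern, assuming only the vendored golang.org/x/time/rate package; the sendBatch stub and the hard-coded 2 MB/sec figure are illustrative stand-ins, not the real gRPC stream or the COCKROACH_* environment plumbing.

// ratelimit_sketch.go: illustrative only; sendBatch and the 2 MB/sec figure
// are assumptions for this sketch, not CockroachDB APIs.
package main

import (
	"context"
	"fmt"
	"time"

	"golang.org/x/time/rate"
)

// sendBatch is a hypothetical stand-in for streaming one batch over gRPC.
func sendBatch(batch []byte) error {
	fmt.Printf("sent %d bytes at %s\n", len(batch), time.Now().Format("15:04:05.000"))
	return nil
}

func main() {
	const batchSize = 256 << 10  // 256 KB: the granularity of rate limiting
	const snapshotRate = 2 << 20 // 2 MB/sec, e.g. the preemptive snapshot default

	// Convert bytes/sec into batches/sec, as in sendSnapshot above. With a
	// burst of 1, each Wait call admits exactly one batch.
	limiter := rate.NewLimiter(rate.Limit(snapshotRate)/batchSize, 1 /* burst */)

	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	batch := make([]byte, batchSize)
	for i := 0; i < 8; i++ {
		// Block until the limiter permits the next batch; a canceled or
		// expired context aborts the send, mirroring the error handling
		// in the diff.
		if err := limiter.Wait(ctx); err != nil {
			fmt.Println("rate limit wait aborted:", err)
			return
		}
		if err := sendBatch(batch); err != nil {
			return
		}
	}
}

The burst of 1 mirrors the diff's choice: each WaitN(ctx, 1)/Wait call admits exactly one batch, which sidesteps the large-burst problem described in the TODO(peter) comment.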