diff --git a/docs/generated/settings/settings.html b/docs/generated/settings/settings.html
index ec5eab9c965f..8f305f05b16e 100644
--- a/docs/generated/settings/settings.html
+++ b/docs/generated/settings/settings.html
@@ -32,6 +32,7 @@
server.auth_log.sql_sessions.enabled | boolean | false | if set, log SQL session login/disconnection events (note: may hinder performance on loaded nodes) |
server.clock.forward_jump_check_enabled | boolean | false | if enabled, forward clock jumps > max_offset/2 will cause a panic |
server.clock.persist_upper_bound_interval | duration | 0s | the interval between persisting the wall time upper bound of the clock. The clock does not generate a wall time greater than the persisted timestamp and will panic if it sees a wall time greater than this value. When cockroach starts, it waits for the wall time to catch-up till this persisted timestamp. This guarantees monotonic wall time across server restarts. Not setting this or setting a value of 0 disables this feature. |
+server.consistency_check.max_rate | byte size | 8.0 MiB | the rate limit (bytes/sec) to use for consistency checks; used in conjunction with server.consistency_check.interval to control the frequency of consistency checks. Note that setting this too high can negatively impact performance. |
server.eventlog.ttl | duration | 2160h0m0s | if nonzero, event log entries older than this duration are deleted every 10m0s. Should not be lowered below 24 hours. |
server.host_based_authentication.configuration | string |
| host-based authentication configuration to use during connection authentication |
server.rangelog.ttl | duration | 720h0m0s | if nonzero, range log entries older than this duration are deleted every 10m0s. Should not be lowered below 24 hours. |
diff --git a/pkg/kv/kvserver/consistency_queue.go b/pkg/kv/kvserver/consistency_queue.go
index 380078b3bec2..40e0bb3e3328 100644
--- a/pkg/kv/kvserver/consistency_queue.go
+++ b/pkg/kv/kvserver/consistency_queue.go
@@ -31,6 +31,16 @@ var consistencyCheckInterval = settings.RegisterNonNegativeDurationSetting(
24*time.Hour,
)
+var consistencyCheckRate = settings.RegisterPublicValidatedByteSizeSetting(
+ "server.consistency_check.max_rate",
+ "the rate limit (bytes/sec) to use for consistency checks; used in "+
+ "conjunction with server.consistency_check.interval to control the "+
+ "frequency of consistency checks. Note that setting this too high can "+
+ "negatively impact performance.",
+ 8<<20, // 8MB
+ validatePositive,
+)
+
var testingAggressiveConsistencyChecks = envutil.EnvOrDefaultBool("COCKROACH_CONSISTENCY_AGGRESSIVE", false)
type consistencyQueue struct {
@@ -58,6 +68,7 @@ func newConsistencyQueue(store *Store, gossip *gossip.Gossip) *consistencyQueue
failures: store.metrics.ConsistencyQueueFailures,
pending: store.metrics.ConsistencyQueuePending,
processingNanos: store.metrics.ConsistencyQueueProcessingNanos,
+ processTimeoutFunc: makeRateLimitedTimeoutFunc(consistencyCheckRate),
},
)
return q
diff --git a/pkg/kv/kvserver/merge_queue.go b/pkg/kv/kvserver/merge_queue.go
index 9c53238c40c1..1d649bc45327 100644
--- a/pkg/kv/kvserver/merge_queue.go
+++ b/pkg/kv/kvserver/merge_queue.go
@@ -109,7 +109,7 @@ func newMergeQueue(store *Store, db *kv.DB, gossip *gossip.Gossip) *mergeQueue {
// hard to determine ahead of time. An alternative would be to calculate
// the timeout with a function that additionally considers the replication
// factor.
- processTimeoutFunc: makeQueueSnapshotTimeoutFunc(rebalanceSnapshotRate),
+ processTimeoutFunc: makeRateLimitedTimeoutFunc(rebalanceSnapshotRate),
needsLease: true,
needsSystemConfig: true,
acceptsUnsplitRanges: false,
diff --git a/pkg/kv/kvserver/queue.go b/pkg/kv/kvserver/queue.go
index 0622dafa049d..05f288d02651 100644
--- a/pkg/kv/kvserver/queue.go
+++ b/pkg/kv/kvserver/queue.go
@@ -65,12 +65,13 @@ func defaultProcessTimeoutFunc(cs *cluster.Settings, _ replicaInQueue) time.Dura
return queueGuaranteedProcessingTimeBudget.Get(&cs.SV)
}
-// The queues which send snapshots while processing should have a timeout which
+// The queues which traverse through the data in the range (i.e. send a snapshot
+// or calculate a range checksum) while processing should have a timeout which
// is a function of the size of the range and the maximum allowed rate of data
// transfer that adheres to a minimum timeout specified in a cluster setting.
//
// The parameter controls which rate to use.
-func makeQueueSnapshotTimeoutFunc(rateSetting *settings.ByteSizeSetting) queueProcessTimeoutFunc {
+func makeRateLimitedTimeoutFunc(rateSetting *settings.ByteSizeSetting) queueProcessTimeoutFunc {
return func(cs *cluster.Settings, r replicaInQueue) time.Duration {
minimumTimeout := queueGuaranteedProcessingTimeBudget.Get(&cs.SV)
// NB: In production code this will type assertion will always succeed.
@@ -84,7 +85,7 @@ func makeQueueSnapshotTimeoutFunc(rateSetting *settings.ByteSizeSetting) queuePr
stats := repl.GetMVCCStats()
totalBytes := stats.KeyBytes + stats.ValBytes + stats.IntentBytes + stats.SysBytes
estimatedDuration := time.Duration(totalBytes/snapshotRate) * time.Second
- timeout := estimatedDuration * permittedSnapshotSlowdown
+ timeout := estimatedDuration * permittedRangeScanSlowdown
if timeout < minimumTimeout {
timeout = minimumTimeout
}
@@ -92,10 +93,10 @@ func makeQueueSnapshotTimeoutFunc(rateSetting *settings.ByteSizeSetting) queuePr
}
}
-// permittedSnapshotSlowdown is the factor of the above the estimated duration
-// for a snapshot given the configured snapshot rate which we use to configure
-// the snapshot's timeout.
-const permittedSnapshotSlowdown = 10
+// permittedRangeScanSlowdown is the factor of the above the estimated duration
+// for a range scan given the configured rate which we use to configure
+// the operations's timeout.
+const permittedRangeScanSlowdown = 10
// a purgatoryError indicates a replica processing failure which indicates
// the replica can be placed into purgatory for faster retries when the
diff --git a/pkg/kv/kvserver/queue_test.go b/pkg/kv/kvserver/queue_test.go
index 88e89e8b6e20..1aa708e0fade 100644
--- a/pkg/kv/kvserver/queue_test.go
+++ b/pkg/kv/kvserver/queue_test.go
@@ -897,11 +897,11 @@ func (r mvccStatsReplicaInQueue) GetMVCCStats() enginepb.MVCCStats {
return enginepb.MVCCStats{ValBytes: r.size}
}
-func TestQueueSnapshotTimeoutFunc(t *testing.T) {
+func TestQueueRateLimitedTimeoutFunc(t *testing.T) {
defer leaktest.AfterTest(t)()
type testCase struct {
guaranteedProcessingTime time.Duration
- snapshotRate int64 // bytes/s
+ rateLimit int64 // bytes/s
replicaSize int64 // bytes
expectedTimeout time.Duration
}
@@ -909,8 +909,8 @@ func TestQueueSnapshotTimeoutFunc(t *testing.T) {
return fmt.Sprintf("%+v", tc), func(t *testing.T) {
st := cluster.MakeTestingClusterSettings()
queueGuaranteedProcessingTimeBudget.Override(&st.SV, tc.guaranteedProcessingTime)
- recoverySnapshotRate.Override(&st.SV, tc.snapshotRate)
- tf := makeQueueSnapshotTimeoutFunc(recoverySnapshotRate)
+ recoverySnapshotRate.Override(&st.SV, tc.rateLimit)
+ tf := makeRateLimitedTimeoutFunc(recoverySnapshotRate)
repl := mvccStatsReplicaInQueue{
size: tc.replicaSize,
}
@@ -920,27 +920,27 @@ func TestQueueSnapshotTimeoutFunc(t *testing.T) {
for _, tc := range []testCase{
{
guaranteedProcessingTime: time.Minute,
- snapshotRate: 1 << 30,
+ rateLimit: 1 << 30,
replicaSize: 1 << 20,
expectedTimeout: time.Minute,
},
{
guaranteedProcessingTime: time.Minute,
- snapshotRate: 1 << 20,
+ rateLimit: 1 << 20,
replicaSize: 100 << 20,
- expectedTimeout: 100 * time.Second * permittedSnapshotSlowdown,
+ expectedTimeout: 100 * time.Second * permittedRangeScanSlowdown,
},
{
guaranteedProcessingTime: time.Hour,
- snapshotRate: 1 << 20,
+ rateLimit: 1 << 20,
replicaSize: 100 << 20,
expectedTimeout: time.Hour,
},
{
guaranteedProcessingTime: time.Minute,
- snapshotRate: 1 << 10,
+ rateLimit: 1 << 10,
replicaSize: 100 << 20,
- expectedTimeout: 100 * (1 << 10) * time.Second * permittedSnapshotSlowdown,
+ expectedTimeout: 100 * (1 << 10) * time.Second * permittedRangeScanSlowdown,
},
} {
t.Run(makeTest(tc))
diff --git a/pkg/kv/kvserver/raft_snapshot_queue.go b/pkg/kv/kvserver/raft_snapshot_queue.go
index 07f006bb69cf..3bba46fecf00 100644
--- a/pkg/kv/kvserver/raft_snapshot_queue.go
+++ b/pkg/kv/kvserver/raft_snapshot_queue.go
@@ -51,7 +51,7 @@ func newRaftSnapshotQueue(store *Store, g *gossip.Gossip) *raftSnapshotQueue {
needsLease: false,
needsSystemConfig: false,
acceptsUnsplitRanges: true,
- processTimeoutFunc: makeQueueSnapshotTimeoutFunc(recoverySnapshotRate),
+ processTimeoutFunc: makeRateLimitedTimeoutFunc(recoverySnapshotRate),
successes: store.metrics.RaftSnapshotQueueSuccesses,
failures: store.metrics.RaftSnapshotQueueFailures,
pending: store.metrics.RaftSnapshotQueuePending,
diff --git a/pkg/kv/kvserver/replica_consistency.go b/pkg/kv/kvserver/replica_consistency.go
index e670c2d0ac98..71e3f9075db9 100644
--- a/pkg/kv/kvserver/replica_consistency.go
+++ b/pkg/kv/kvserver/replica_consistency.go
@@ -32,9 +32,9 @@ import (
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/storage/enginepb"
"github.com/cockroachdb/cockroach/pkg/util/bufalloc"
- "github.com/cockroachdb/cockroach/pkg/util/contextutil"
"github.com/cockroachdb/cockroach/pkg/util/envutil"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
+ "github.com/cockroachdb/cockroach/pkg/util/limit"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
@@ -56,15 +56,6 @@ import (
// know old CRDB versions (<19.1 at time of writing) were not involved.
var fatalOnStatsMismatch = envutil.EnvOrDefaultBool("COCKROACH_ENFORCE_CONSISTENT_STATS", false)
-const (
- // collectChecksumTimeout controls how long we'll wait to collect a checksum
- // for a CheckConsistency request. We need to bound the time that we wait
- // because the checksum might never be computed for a replica if that replica
- // is caught up via a snapshot and never performs the ComputeChecksum
- // operation.
- collectChecksumTimeout = 15 * time.Second
-)
-
// ReplicaChecksum contains progress on a replica checksum computation.
type ReplicaChecksum struct {
CollectChecksumResponse
@@ -372,17 +363,11 @@ func (r *Replica) RunConsistencyCheck(
func(ctx context.Context) {
defer wg.Done()
- var resp CollectChecksumResponse
- err := contextutil.RunWithTimeout(ctx, "collect checksum", collectChecksumTimeout,
- func(ctx context.Context) error {
- var masterChecksum []byte
- if len(results) > 0 {
- masterChecksum = results[0].Response.Checksum
- }
- var err error
- resp, err = r.collectChecksumFromReplica(ctx, replica, ccRes.ChecksumID, masterChecksum)
- return err
- })
+ var masterChecksum []byte
+ if len(results) > 0 {
+ masterChecksum = results[0].Response.Checksum
+ }
+ resp, err := r.collectChecksumFromReplica(ctx, replica, ccRes.ChecksumID, masterChecksum)
resultCh <- ConsistencyCheckResult{
Replica: replica,
Response: resp,
@@ -505,6 +490,7 @@ func (r *Replica) sha512(
snap storage.Reader,
snapshot *roachpb.RaftSnapshotData,
mode roachpb.ChecksumMode,
+ limiter *limit.LimiterBurstDisabled,
) (*replicaHash, error) {
statsOnly := mode == roachpb.ChecksumMode_CHECK_STATS
@@ -519,6 +505,11 @@ func (r *Replica) sha512(
hasher := sha512.New()
visitor := func(unsafeKey storage.MVCCKey, unsafeValue []byte) error {
+ // Rate Limit the scan through the range
+ if err := limiter.WaitN(ctx, len(unsafeKey.Key)+len(unsafeValue)); err != nil {
+ return err
+ }
+
if snapshot != nil {
// Add (a copy of) the kv pair into the debug message.
kv := roachpb.RaftSnapshotData_KeyValue{
diff --git a/pkg/kv/kvserver/replica_proposal.go b/pkg/kv/kvserver/replica_proposal.go
index 51082d56082d..9dea22aa6d01 100644
--- a/pkg/kv/kvserver/replica_proposal.go
+++ b/pkg/kv/kvserver/replica_proposal.go
@@ -35,6 +35,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/storage/enginepb"
"github.com/cockroachdb/cockroach/pkg/util"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
+ "github.com/cockroachdb/cockroach/pkg/util/limit"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/quotapool"
"github.com/cockroachdb/cockroach/pkg/util/sysutil"
@@ -228,6 +229,8 @@ func (r *Replica) computeChecksumPostApply(ctx context.Context, cc kvserverpb.Co
}
}
+ limiter := limit.NewLimiter(rate.Limit(consistencyCheckRate.Get(&r.store.ClusterSettings().SV)))
+
// Compute SHA asynchronously and store it in a map by UUID.
if err := stopper.RunAsyncTask(ctx, "storage.Replica: computing checksum", func(ctx context.Context) {
func() {
@@ -236,7 +239,8 @@ func (r *Replica) computeChecksumPostApply(ctx context.Context, cc kvserverpb.Co
if cc.SaveSnapshot {
snapshot = &roachpb.RaftSnapshotData{}
}
- result, err := r.sha512(ctx, desc, snap, snapshot, cc.Mode)
+
+ result, err := r.sha512(ctx, desc, snap, snapshot, cc.Mode, limiter)
if err != nil {
log.Errorf(ctx, "%v", err)
result = nil
diff --git a/pkg/kv/kvserver/replica_test.go b/pkg/kv/kvserver/replica_test.go
index e538d269f4e8..2f191e5f0bb9 100644
--- a/pkg/kv/kvserver/replica_test.go
+++ b/pkg/kv/kvserver/replica_test.go
@@ -56,6 +56,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
+ "github.com/cockroachdb/cockroach/pkg/util/limit"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/metric"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
@@ -75,6 +76,7 @@ import (
"go.etcd.io/etcd/raft/raftpb"
"go.etcd.io/etcd/raft/tracker"
"golang.org/x/net/trace"
+ "golang.org/x/time/rate"
)
// allSpans is a SpanSet that covers *everything* for use in tests that don't
@@ -9616,7 +9618,7 @@ func TestReplicaServersideRefreshes(t *testing.T) {
// Regression test for #31870.
snap := tc.engine.NewSnapshot()
defer snap.Close()
- res, err := tc.repl.sha512(context.Background(), *tc.repl.Desc(), tc.engine, nil /* diff */, roachpb.ChecksumMode_CHECK_FULL)
+ res, err := tc.repl.sha512(context.Background(), *tc.repl.Desc(), tc.engine, nil /* diff */, roachpb.ChecksumMode_CHECK_FULL, limit.NewLimiter(rate.Inf))
if err != nil {
return hlc.Timestamp{}, err
}
diff --git a/pkg/kv/kvserver/replicate_queue.go b/pkg/kv/kvserver/replicate_queue.go
index b7d37114e649..a2b48678b866 100644
--- a/pkg/kv/kvserver/replicate_queue.go
+++ b/pkg/kv/kvserver/replicate_queue.go
@@ -167,7 +167,7 @@ func newReplicateQueue(store *Store, g *gossip.Gossip, allocator Allocator) *rep
// so we use the raftSnapshotQueueTimeoutFunc. This function sets a
// timeout based on the range size and the sending rate in addition
// to consulting the setting which controls the minimum timeout.
- processTimeoutFunc: makeQueueSnapshotTimeoutFunc(rebalanceSnapshotRate),
+ processTimeoutFunc: makeRateLimitedTimeoutFunc(rebalanceSnapshotRate),
successes: store.metrics.ReplicateQueueSuccesses,
failures: store.metrics.ReplicateQueueFailures,
pending: store.metrics.ReplicateQueuePending,
diff --git a/pkg/util/limit/rate_limiter.go b/pkg/util/limit/rate_limiter.go
new file mode 100644
index 000000000000..fa69259264b3
--- /dev/null
+++ b/pkg/util/limit/rate_limiter.go
@@ -0,0 +1,72 @@
+// Copyright 2020 The Cockroach Authors.
+//
+// Use of this software is governed by the Business Source License
+// included in the file licenses/BSL.txt.
+//
+// As of the Change Date specified in that file, in accordance with
+// the Business Source License, use of this software will be governed
+// by the Apache License, Version 2.0, included in the file
+// licenses/APL.txt.
+
+package limit
+
+import (
+ "context"
+ "math/big"
+
+ "golang.org/x/time/rate"
+)
+
+// maxInt is the maximum int allowed by the architecture running this code i.e.
+// it could be int32 or int64, unfortunately golang does not have a built in
+// field for this.
+const maxInt = int(^uint(0) >> 1)
+
+// LimiterBurstDisabled is used to solve a complication in rate.Limiter.
+// The rate.Limiter requires a burst parameter and if the throttled value
+// exceeds the burst it just fails. This not always the desired behavior,
+// sometimes we want the limiter to apply the throttle and not enforce any
+// hard limits on an arbitrarily large value. This feature is particularly
+// useful in Cockroach, when we want to throttle on the KV pair, the size
+// of which is not strictly enforced.
+type LimiterBurstDisabled struct {
+ // Avoid embedding, as most methods on the limiter take the parameter
+ // burst into consideration.
+ limiter *rate.Limiter
+}
+
+// NewLimiter returns a new LimiterBurstDisabled that allows events up to rate r.
+func NewLimiter(r rate.Limit) *LimiterBurstDisabled {
+ // Unfortunately we can't disable the burst parameter on the
+ // limiter, so we have to provide some value to it. To remove the cognitive
+ // burden from the user, we set this value to be equal to the limit.
+ // Semantically the choice of burst parameter does not matter, since
+ // we will loop the limiter until all the tokens have been consumed. However
+ // we want to minimize the number of loops for performance, which is why
+ // setting the burst parameter to the limit is a good trade off.
+ var burst, _ = big.NewFloat(float64(r)).Int64()
+ if burst > int64(maxInt) {
+ burst = int64(maxInt)
+ }
+ return &LimiterBurstDisabled{
+ limiter: rate.NewLimiter(r, int(burst)),
+ }
+}
+
+// WaitN blocks until lim permits n events to happen.
+//
+// This function will now only return an error if the Context is canceled and
+// should never in practice hit the burst check in the underlying limiter.
+func (lim *LimiterBurstDisabled) WaitN(ctx context.Context, n int) error {
+ for n > 0 {
+ cur := n
+ if cur > lim.limiter.Burst() {
+ cur = lim.limiter.Burst()
+ }
+ if err := lim.limiter.WaitN(ctx, cur); err != nil {
+ return err
+ }
+ n -= cur
+ }
+ return nil
+}
diff --git a/pkg/util/limit/rate_limiter_test.go b/pkg/util/limit/rate_limiter_test.go
new file mode 100644
index 000000000000..4dab6521c0ce
--- /dev/null
+++ b/pkg/util/limit/rate_limiter_test.go
@@ -0,0 +1,31 @@
+// Copyright 2020 The Cockroach Authors.
+//
+// Use of this software is governed by the Business Source License
+// included in the file licenses/BSL.txt.
+//
+// As of the Change Date specified in that file, in accordance with
+// the Business Source License, use of this software will be governed
+// by the Apache License, Version 2.0, included in the file
+// licenses/APL.txt.
+
+package limit
+
+import (
+ "context"
+ "math"
+ "testing"
+
+ "github.com/stretchr/testify/require"
+ "golang.org/x/time/rate"
+)
+
+func TestLimiterBurstDisabled(t *testing.T) {
+ limiter := NewLimiter(100)
+
+ if err := limiter.WaitN(context.Background(), 101); err != nil {
+ t.Fatal(err)
+ }
+
+ limiter = NewLimiter(rate.Limit(math.MaxFloat64))
+ require.Equal(t, maxInt, limiter.limiter.Burst())
+}