kv: introduce a rate limiter for the range consistency checker
Closes #47290

This commit introduces a new cluster setting server.consistency_check.max_rate to control the rate at which the consistency checker may scan through a range to compute its checksum. Without a rate limit in place, the checker may overwhelm the cluster once nodes are sufficiently large. For example, on a 10TB node the checker is expected to produce roughly 120MB/second of disk reads. The setting is expressed in bytes/second and set to 8MB/s by default. We expect customers to use it in conjunction with server.consistency_check.interval, which gives them the ability to control both the frequency and the speed of checks.

Release note (performance improvement): Introduced a new cluster setting server.consistency_check.max_rate, expressed in bytes/second, to throttle the rate at which cockroach scans through the disk to perform a consistency check. This control is necessary to ensure smooth performance on a cluster with large node sizes (i.e., in the 10TB+ range).
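For intuition on the 120MB/second figure (a back-of-envelope estimate, not part of the commit itself): with the default server.consistency_check.interval of 24 hours, checking every range on a 10TB node once per interval requires scanning roughly 10 × 10¹² bytes / 86,400 s ≈ 116MB/s of sustained disk reads. Conversely, at the new 8MB/s default, and assuming the consistency queue scans one range at a time, a full pass over a 10TB store would stretch to about 10 × 10¹² / (8 × 10⁶) s ≈ 14.5 days.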
lunevalex committed Jun 3, 2020
1 parent b254299 commit 76c63de
Showing 12 changed files with 156 additions and 43 deletions.
1 change: 1 addition & 0 deletions docs/generated/settings/settings.html
@@ -32,6 +32,7 @@
<tr><td><code>server.auth_log.sql_sessions.enabled</code></td><td>boolean</td><td><code>false</code></td><td>if set, log SQL session login/disconnection events (note: may hinder performance on loaded nodes)</td></tr>
<tr><td><code>server.clock.forward_jump_check_enabled</code></td><td>boolean</td><td><code>false</code></td><td>if enabled, forward clock jumps > max_offset/2 will cause a panic</td></tr>
<tr><td><code>server.clock.persist_upper_bound_interval</code></td><td>duration</td><td><code>0s</code></td><td>the interval between persisting the wall time upper bound of the clock. The clock does not generate a wall time greater than the persisted timestamp and will panic if it sees a wall time greater than this value. When cockroach starts, it waits for the wall time to catch-up till this persisted timestamp. This guarantees monotonic wall time across server restarts. Not setting this or setting a value of 0 disables this feature.</td></tr>
<tr><td><code>server.consistency_check.max_rate</code></td><td>byte size</td><td><code>8.0 MiB</code></td><td>the rate limit (bytes/sec) to use for consistency checks; used in conjunction with server.consistency_check.interval to control the frequency of consistency checks. Note that setting this too high can negatively impact performance.</td></tr>
<tr><td><code>server.eventlog.ttl</code></td><td>duration</td><td><code>2160h0m0s</code></td><td>if nonzero, event log entries older than this duration are deleted every 10m0s. Should not be lowered below 24 hours.</td></tr>
<tr><td><code>server.host_based_authentication.configuration</code></td><td>string</td><td><code></code></td><td>host-based authentication configuration to use during connection authentication</td></tr>
<tr><td><code>server.rangelog.ttl</code></td><td>duration</td><td><code>720h0m0s</code></td><td>if nonzero, range log entries older than this duration are deleted every 10m0s. Should not be lowered below 24 hours.</td></tr>
11 changes: 11 additions & 0 deletions pkg/kv/kvserver/consistency_queue.go
@@ -31,6 +31,16 @@ var consistencyCheckInterval = settings.RegisterNonNegativeDurationSetting(
24*time.Hour,
)

var consistencyCheckRate = settings.RegisterPublicValidatedByteSizeSetting(
"server.consistency_check.max_rate",
"the rate limit (bytes/sec) to use for consistency checks; used in "+
"conjunction with server.consistency_check.interval to control the "+
"frequency of consistency checks. Note that setting this too high can "+
"negatively impact performance.",
8<<20, // 8MB
validatePositive,
)

var testingAggressiveConsistencyChecks = envutil.EnvOrDefaultBool("COCKROACH_CONSISTENCY_AGGRESSIVE", false)

type consistencyQueue struct {
@@ -58,6 +68,7 @@ func newConsistencyQueue(store *Store, gossip *gossip.Gossip) *consistencyQueue
failures: store.metrics.ConsistencyQueueFailures,
pending: store.metrics.ConsistencyQueuePending,
processingNanos: store.metrics.ConsistencyQueueProcessingNanos,
processTimeoutFunc: makeRateLimitedTimeoutFunc(consistencyCheckRate),
},
)
return q
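The validatePositive validator passed to the registration above is not shown in this diff; here is a minimal sketch of what such a validator plausibly looks like (only the name comes from the commit — the body below is an assumption):

```go
package kvserver

import "github.com/cockroachdb/errors"

// validatePositive rejects zero and negative values for byte-size settings.
// (Hypothetical sketch; the real helper is defined elsewhere in kvserver
// and may differ.)
func validatePositive(v int64) error {
	if v <= 0 {
		return errors.Errorf("%d is not a positive value", v)
	}
	return nil
}
```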
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/merge_queue.go
@@ -109,7 +109,7 @@ func newMergeQueue(store *Store, db *kv.DB, gossip *gossip.Gossip) *mergeQueue {
// hard to determine ahead of time. An alternative would be to calculate
// the timeout with a function that additionally considers the replication
// factor.
processTimeoutFunc: makeQueueSnapshotTimeoutFunc(rebalanceSnapshotRate),
processTimeoutFunc: makeRateLimitedTimeoutFunc(rebalanceSnapshotRate),
needsLease: true,
needsSystemConfig: true,
acceptsUnsplitRanges: false,
15 changes: 8 additions & 7 deletions pkg/kv/kvserver/queue.go
@@ -65,12 +65,13 @@ func defaultProcessTimeoutFunc(cs *cluster.Settings, _ replicaInQueue) time.Duration {
return queueGuaranteedProcessingTimeBudget.Get(&cs.SV)
}

// The queues which send snapshots while processing should have a timeout which
// The queues which traverse the data in the range (i.e. send a snapshot
// or calculate a range checksum) while processing should have a timeout which
// is a function of the size of the range and the maximum allowed rate of data
// transfer that adheres to a minimum timeout specified in a cluster setting.
//
// The parameter controls which rate to use.
func makeQueueSnapshotTimeoutFunc(rateSetting *settings.ByteSizeSetting) queueProcessTimeoutFunc {
func makeRateLimitedTimeoutFunc(rateSetting *settings.ByteSizeSetting) queueProcessTimeoutFunc {
return func(cs *cluster.Settings, r replicaInQueue) time.Duration {
minimumTimeout := queueGuaranteedProcessingTimeBudget.Get(&cs.SV)
// NB: In production code this type assertion will always succeed.
@@ -84,18 +85,18 @@ func makeQueueSnapshotTimeoutFunc(rateSetting *settings.ByteSizeSetting) queueProcessTimeoutFunc {
stats := repl.GetMVCCStats()
totalBytes := stats.KeyBytes + stats.ValBytes + stats.IntentBytes + stats.SysBytes
estimatedDuration := time.Duration(totalBytes/snapshotRate) * time.Second
timeout := estimatedDuration * permittedSnapshotSlowdown
timeout := estimatedDuration * permittedRangeScanSlowdown
if timeout < minimumTimeout {
timeout = minimumTimeout
}
return timeout
}
}

// permittedSnapshotSlowdown is the factor of the above the estimated duration
// for a snapshot given the configured snapshot rate which we use to configure
// the snapshot's timeout.
const permittedSnapshotSlowdown = 10
// permittedRangeScanSlowdown is the factor applied to the estimated duration
// for a range scan, given the configured rate, which we use to configure
// the operation's timeout.
const permittedRangeScanSlowdown = 10

// a purgatoryError indicates a replica processing failure which indicates
// the replica can be placed into purgatory for faster retries when the
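To make the timeout computation concrete, here is a self-contained sketch of the formula makeRateLimitedTimeoutFunc applies (names are illustrative; the real function pulls the rate from the cluster setting and the size from the replica's MVCC stats, as shown above):

```go
package main

import (
	"fmt"
	"time"
)

// The processing timeout may be up to 10x the estimated scan duration,
// mirroring permittedRangeScanSlowdown above.
const permittedRangeScanSlowdown = 10

// rateLimitedTimeout estimates how long a scan of totalBytes takes at
// rate bytes/sec, pads it by the slowdown factor, and enforces the
// guaranteed minimum processing budget.
func rateLimitedTimeout(totalBytes, rate int64, minTimeout time.Duration) time.Duration {
	estimated := time.Duration(totalBytes/rate) * time.Second
	timeout := estimated * permittedRangeScanSlowdown
	if timeout < minTimeout {
		timeout = minTimeout
	}
	return timeout
}

func main() {
	// A 100 MiB replica at 1 MiB/s: 100s estimated, padded to 1000s.
	fmt.Println(rateLimitedTimeout(100<<20, 1<<20, time.Minute)) // 16m40s
}
```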
20 changes: 10 additions & 10 deletions pkg/kv/kvserver/queue_test.go
@@ -897,20 +897,20 @@ func (r mvccStatsReplicaInQueue) GetMVCCStats() enginepb.MVCCStats {
return enginepb.MVCCStats{ValBytes: r.size}
}

func TestQueueSnapshotTimeoutFunc(t *testing.T) {
func TestQueueRateLimitedTimeoutFunc(t *testing.T) {
defer leaktest.AfterTest(t)()
type testCase struct {
guaranteedProcessingTime time.Duration
snapshotRate int64 // bytes/s
rateLimit int64 // bytes/s
replicaSize int64 // bytes
expectedTimeout time.Duration
}
makeTest := func(tc testCase) (string, func(t *testing.T)) {
return fmt.Sprintf("%+v", tc), func(t *testing.T) {
st := cluster.MakeTestingClusterSettings()
queueGuaranteedProcessingTimeBudget.Override(&st.SV, tc.guaranteedProcessingTime)
recoverySnapshotRate.Override(&st.SV, tc.snapshotRate)
tf := makeQueueSnapshotTimeoutFunc(recoverySnapshotRate)
recoverySnapshotRate.Override(&st.SV, tc.rateLimit)
tf := makeRateLimitedTimeoutFunc(recoverySnapshotRate)
repl := mvccStatsReplicaInQueue{
size: tc.replicaSize,
}
@@ -920,27 +920,27 @@ func TestQueueSnapshotTimeoutFunc(t *testing.T) {
for _, tc := range []testCase{
{
guaranteedProcessingTime: time.Minute,
snapshotRate: 1 << 30,
rateLimit: 1 << 30,
replicaSize: 1 << 20,
expectedTimeout: time.Minute,
},
{
guaranteedProcessingTime: time.Minute,
snapshotRate: 1 << 20,
rateLimit: 1 << 20,
replicaSize: 100 << 20,
expectedTimeout: 100 * time.Second * permittedSnapshotSlowdown,
expectedTimeout: 100 * time.Second * permittedRangeScanSlowdown,
},
{
guaranteedProcessingTime: time.Hour,
snapshotRate: 1 << 20,
rateLimit: 1 << 20,
replicaSize: 100 << 20,
expectedTimeout: time.Hour,
},
{
guaranteedProcessingTime: time.Minute,
snapshotRate: 1 << 10,
rateLimit: 1 << 10,
replicaSize: 100 << 20,
expectedTimeout: 100 * (1 << 10) * time.Second * permittedSnapshotSlowdown,
expectedTimeout: 100 * (1 << 10) * time.Second * permittedRangeScanSlowdown,
},
} {
t.Run(makeTest(tc))
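The expected timeouts follow directly from the formula in queue.go: in the second case, (100 << 20) bytes at (1 << 20) bytes/s gives an estimated 100 seconds, multiplied by the 10x slowdown factor to 1000 seconds, which exceeds the one-minute floor; in the third case the same 1000-second estimate falls below the one-hour floor, so the guaranteed processing time wins; and in the fourth case the 1 KiB/s rate stretches the estimate to 100 × 1024 seconds before the 10x padding.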
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/raft_snapshot_queue.go
@@ -51,7 +51,7 @@ func newRaftSnapshotQueue(store *Store, g *gossip.Gossip) *raftSnapshotQueue {
needsLease: false,
needsSystemConfig: false,
acceptsUnsplitRanges: true,
processTimeoutFunc: makeQueueSnapshotTimeoutFunc(recoverySnapshotRate),
processTimeoutFunc: makeRateLimitedTimeoutFunc(recoverySnapshotRate),
successes: store.metrics.RaftSnapshotQueueSuccesses,
failures: store.metrics.RaftSnapshotQueueFailures,
pending: store.metrics.RaftSnapshotQueuePending,
33 changes: 12 additions & 21 deletions pkg/kv/kvserver/replica_consistency.go
@@ -32,9 +32,9 @@ import (
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/storage/enginepb"
"github.com/cockroachdb/cockroach/pkg/util/bufalloc"
"github.com/cockroachdb/cockroach/pkg/util/contextutil"
"github.com/cockroachdb/cockroach/pkg/util/envutil"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/limit"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
@@ -56,15 +56,6 @@ import (
// know old CRDB versions (<19.1 at time of writing) were not involved.
var fatalOnStatsMismatch = envutil.EnvOrDefaultBool("COCKROACH_ENFORCE_CONSISTENT_STATS", false)

const (
// collectChecksumTimeout controls how long we'll wait to collect a checksum
// for a CheckConsistency request. We need to bound the time that we wait
// because the checksum might never be computed for a replica if that replica
// is caught up via a snapshot and never performs the ComputeChecksum
// operation.
collectChecksumTimeout = 15 * time.Second
)

// ReplicaChecksum contains progress on a replica checksum computation.
type ReplicaChecksum struct {
CollectChecksumResponse
@@ -372,17 +363,11 @@ func (r *Replica) RunConsistencyCheck(
func(ctx context.Context) {
defer wg.Done()

var resp CollectChecksumResponse
err := contextutil.RunWithTimeout(ctx, "collect checksum", collectChecksumTimeout,
func(ctx context.Context) error {
var masterChecksum []byte
if len(results) > 0 {
masterChecksum = results[0].Response.Checksum
}
var err error
resp, err = r.collectChecksumFromReplica(ctx, replica, ccRes.ChecksumID, masterChecksum)
return err
})
var masterChecksum []byte
if len(results) > 0 {
masterChecksum = results[0].Response.Checksum
}
resp, err := r.collectChecksumFromReplica(ctx, replica, ccRes.ChecksumID, masterChecksum)
resultCh <- ConsistencyCheckResult{
Replica: replica,
Response: resp,
@@ -505,6 +490,7 @@ func (r *Replica) sha512(
snap storage.Reader,
snapshot *roachpb.RaftSnapshotData,
mode roachpb.ChecksumMode,
limiter *limit.LimiterBurstDisabled,
) (*replicaHash, error) {
statsOnly := mode == roachpb.ChecksumMode_CHECK_STATS

@@ -519,6 +505,11 @@
hasher := sha512.New()

visitor := func(unsafeKey storage.MVCCKey, unsafeValue []byte) error {
// Rate limit the scan through the range.
if err := limiter.WaitN(ctx, len(unsafeKey.Key)+len(unsafeValue)); err != nil {
return err
}

if snapshot != nil {
// Add (a copy of) the kv pair into the debug message.
kv := roachpb.RaftSnapshotData_KeyValue{
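A self-contained sketch of the pattern the visitor above implements — paying the limiter for each key/value pair before feeding it to the hash, so the scan never exceeds the configured byte rate (the data and rate here are illustrative, and a bare rate.Limiter stands in for the LimiterBurstDisabled wrapper introduced below):

```go
package main

import (
	"context"
	"crypto/sha512"
	"fmt"

	"golang.org/x/time/rate"
)

func main() {
	ctx := context.Background()
	// 1 MiB/s, with burst pinned to the rate as in limit.NewLimiter.
	limiter := rate.NewLimiter(1<<20, 1<<20)
	hasher := sha512.New()

	kvs := []struct{ key, value []byte }{
		{[]byte("a"), []byte("apple")},
		{[]byte("b"), []byte("banana")},
	}
	for _, kv := range kvs {
		// Block until the limiter grants tokens covering the bytes we
		// are about to hash; this is what paces the scan.
		if err := limiter.WaitN(ctx, len(kv.key)+len(kv.value)); err != nil {
			panic(err)
		}
		hasher.Write(kv.key)
		hasher.Write(kv.value)
	}
	fmt.Printf("checksum: %x\n", hasher.Sum(nil)[:8])
}
```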
6 changes: 5 additions & 1 deletion pkg/kv/kvserver/replica_proposal.go
@@ -35,6 +35,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/storage/enginepb"
"github.com/cockroachdb/cockroach/pkg/util"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/limit"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/quotapool"
"github.com/cockroachdb/cockroach/pkg/util/sysutil"
@@ -228,6 +229,8 @@ func (r *Replica) computeChecksumPostApply(ctx context.Context, cc kvserverpb.ComputeChecksum)
}
}

limiter := limit.NewLimiter(rate.Limit(consistencyCheckRate.Get(&r.store.ClusterSettings().SV)))

// Compute SHA asynchronously and store it in a map by UUID.
if err := stopper.RunAsyncTask(ctx, "storage.Replica: computing checksum", func(ctx context.Context) {
func() {
@@ -236,7 +239,8 @@ func (r *Replica) computeChecksumPostApply(ctx context.Context, cc kvserverpb.ComputeChecksum)
if cc.SaveSnapshot {
snapshot = &roachpb.RaftSnapshotData{}
}
result, err := r.sha512(ctx, desc, snap, snapshot, cc.Mode)

result, err := r.sha512(ctx, desc, snap, snapshot, cc.Mode, limiter)
if err != nil {
log.Errorf(ctx, "%v", err)
result = nil
4 changes: 3 additions & 1 deletion pkg/kv/kvserver/replica_test.go
@@ -56,6 +56,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/limit"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/metric"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
@@ -75,6 +76,7 @@ import (
"go.etcd.io/etcd/raft/raftpb"
"go.etcd.io/etcd/raft/tracker"
"golang.org/x/net/trace"
"golang.org/x/time/rate"
)

// allSpans is a SpanSet that covers *everything* for use in tests that don't
@@ -9616,7 +9618,7 @@ func TestReplicaServersideRefreshes(t *testing.T) {
// Regression test for #31870.
snap := tc.engine.NewSnapshot()
defer snap.Close()
res, err := tc.repl.sha512(context.Background(), *tc.repl.Desc(), tc.engine, nil /* diff */, roachpb.ChecksumMode_CHECK_FULL)
res, err := tc.repl.sha512(context.Background(), *tc.repl.Desc(), tc.engine, nil /* diff */, roachpb.ChecksumMode_CHECK_FULL, limit.NewLimiter(rate.Inf))
if err != nil {
return hlc.Timestamp{}, err
}
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/replicate_queue.go
@@ -167,7 +167,7 @@ func newReplicateQueue(store *Store, g *gossip.Gossip, allocator Allocator) *replicateQueue {
// so we use the raftSnapshotQueueTimeoutFunc. This function sets a
// timeout based on the range size and the sending rate in addition
// to consulting the setting which controls the minimum timeout.
processTimeoutFunc: makeQueueSnapshotTimeoutFunc(rebalanceSnapshotRate),
processTimeoutFunc: makeRateLimitedTimeoutFunc(rebalanceSnapshotRate),
successes: store.metrics.ReplicateQueueSuccesses,
failures: store.metrics.ReplicateQueueFailures,
pending: store.metrics.ReplicateQueuePending,
72 changes: 72 additions & 0 deletions pkg/util/limit/rate_limiter.go
@@ -0,0 +1,72 @@
// Copyright 2020 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package limit

import (
"context"
"math/big"

"golang.org/x/time/rate"
)

// maxInt is the maximum int allowed by the architecture running this code,
// i.e. it could be int32 or int64; unfortunately golang does not have a
// built-in constant for this.
const maxInt = int(^uint(0) >> 1)

// LimiterBurstDisabled is used to solve a complication in rate.Limiter.
// The rate.Limiter requires a burst parameter, and if the throttled value
// exceeds the burst it just fails. This is not always the desired behavior;
// sometimes we want the limiter to apply the throttle without enforcing any
// hard limit on an arbitrarily large value. This feature is particularly
// useful in Cockroach when we want to throttle on a KV pair, the size of
// which is not strictly enforced.
type LimiterBurstDisabled struct {
// Avoid embedding, as most methods on the limiter take the parameter
// burst into consideration.
limiter *rate.Limiter
}

// NewLimiter returns a new LimiterBurstDisabled that allows events up to rate r.
func NewLimiter(r rate.Limit) *LimiterBurstDisabled {
// Unfortunately we can't disable the burst parameter on the
// limiter, so we have to provide some value to it. To remove the cognitive
// burden from the user, we set this value to be equal to the limit.
// Semantically the choice of burst parameter does not matter, since
// we will loop the limiter until all the tokens have been consumed. However,
// we want to minimize the number of loops for performance, which is why
// setting the burst parameter to the limit is a good trade-off.
var burst, _ = big.NewFloat(float64(r)).Int64()
if burst > int64(maxInt) {
burst = int64(maxInt)
}
return &LimiterBurstDisabled{
limiter: rate.NewLimiter(r, int(burst)),
}
}

// WaitN blocks until lim permits n events to happen.
//
// This function will only return an error if the Context is canceled; it
// should never in practice hit the burst check in the underlying limiter.
func (lim *LimiterBurstDisabled) WaitN(ctx context.Context, n int) error {
for n > 0 {
cur := n
if cur > lim.limiter.Burst() {
cur = lim.limiter.Burst()
}
if err := lim.limiter.WaitN(ctx, cur); err != nil {
return err
}
n -= cur
}
return nil
}
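A usage sketch for the new wrapper: because WaitN chunks its argument by the underlying burst, a single call may charge far more bytes than the configured rate without tripping rate.Limiter's n-exceeds-burst error (the rate and byte counts here are illustrative, and the timing is approximate):

```go
package main

import (
	"context"
	"fmt"
	"time"

	"github.com/cockroachdb/cockroach/pkg/util/limit"
	"golang.org/x/time/rate"
)

func main() {
	ctx := context.Background()
	// 1 MiB/s; NewLimiter pins the burst to the rate internally.
	lim := limit.NewLimiter(rate.Limit(1 << 20))

	start := time.Now()
	// Charge 4 MiB in one call. A bare rate.Limiter would error out
	// (n > burst); the wrapper loops internally and blocks ~3s instead.
	if err := lim.WaitN(ctx, 4<<20); err != nil {
		panic(err)
	}
	fmt.Println("elapsed:", time.Since(start).Round(time.Second))
}
```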
31 changes: 31 additions & 0 deletions pkg/util/limit/rate_limiter_test.go
@@ -0,0 +1,31 @@
// Copyright 2020 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package limit

import (
"context"
"math"
"testing"

"github.com/stretchr/testify/require"
"golang.org/x/time/rate"
)

func TestLimiterBurstDisabled(t *testing.T) {
limiter := NewLimiter(100)

if err := limiter.WaitN(context.Background(), 101); err != nil {
t.Fatal(err)
}

limiter = NewLimiter(rate.Limit(math.MaxFloat64))
require.Equal(t, maxInt, limiter.limiter.Burst())
}
