Skip to content

Commit

Permalink
kvserver: the timeout of queued items should consider the rates of al…
Browse files Browse the repository at this point in the history
…l item types in the queue

When the kv.snapshot_recovery.max_rate and kv.snapshot_rebalance.max_rate
settings are given different values, if the recovery rate is high and the
rebalance rate is low, recovery snapshots can have a lower timeout than the
expected duration of a single rebalance snapshot. This means that any steady
rebalance load can starve recovery snapshots. To mitigate the issue, this PR
sets the timeout for a snapshot based on min(kv.snapshot_recovery.max_rate,
kv.snapshot_rebalance.max_rate).

Release note: None
  • Loading branch information
shralex committed Jul 21, 2022
1 parent 82923fc commit ead518f
Show file tree
Hide file tree
Showing 5 changed files with 61 additions and 17 deletions.
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/merge_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ func newMergeQueue(store *Store, db *kv.DB) *mergeQueue {
// hard to determine ahead of time. An alternative would be to calculate
// the timeout with a function that additionally considers the replication
// factor.
processTimeoutFunc: makeRateLimitedTimeoutFunc(rebalanceSnapshotRate),
processTimeoutFunc: makeRateLimitedTimeoutFunc(rebalanceSnapshotRate, recoverySnapshotRate),
needsLease: true,
needsSystemConfig: true,
acceptsUnsplitRanges: false,
Expand Down
20 changes: 15 additions & 5 deletions pkg/kv/kvserver/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,22 +71,32 @@ func defaultProcessTimeoutFunc(cs *cluster.Settings, _ replicaInQueue) time.Dura
// or calculate a range checksum) while processing should have a timeout which
// is a function of the size of the range and the maximum allowed rate of data
// transfer that adheres to a minimum timeout specified in a cluster setting.
// When the queue contains different types of work items, with different rates,
// the timeout of all items is set according to the minimum rate of the
// different types, to prevent slower items from causing faster items appearing
// after them in the queue to time-out.
//
// The parameter controls which rate to use.
func makeRateLimitedTimeoutFunc(rateSetting *settings.ByteSizeSetting) queueProcessTimeoutFunc {
// The parameter controls which rate(s) to use.
func makeRateLimitedTimeoutFunc(rateSettings ...*settings.ByteSizeSetting) queueProcessTimeoutFunc {
return func(cs *cluster.Settings, r replicaInQueue) time.Duration {
minimumTimeout := queueGuaranteedProcessingTimeBudget.Get(&cs.SV)
// NB: In production code this will type assertion will always succeed.
// Some tests set up a fake implementation of replicaInQueue in which
// case we fall back to the configured minimum timeout.
repl, ok := r.(interface{ GetMVCCStats() enginepb.MVCCStats })
if !ok {
if !ok || len(rateSettings) == 0 {
return minimumTimeout
}
snapshotRate := rateSetting.Get(&cs.SV)
minSnapshotRate := rateSettings[0].Get(&cs.SV)
for i := 1; i < len(rateSettings); i++ {
snapshotRate := rateSettings[i].Get(&cs.SV)
if snapshotRate < minSnapshotRate {
minSnapshotRate = snapshotRate
}
}
stats := repl.GetMVCCStats()
totalBytes := stats.KeyBytes + stats.ValBytes + stats.IntentBytes + stats.SysBytes
estimatedDuration := time.Duration(totalBytes/snapshotRate) * time.Second
estimatedDuration := time.Duration(totalBytes/minSnapshotRate) * time.Second
timeout := estimatedDuration * permittedRangeScanSlowdown
if timeout < minimumTimeout {
timeout = minimumTimeout
Expand Down
52 changes: 43 additions & 9 deletions pkg/kv/kvserver/queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -984,16 +984,18 @@ func TestQueueRateLimitedTimeoutFunc(t *testing.T) {
ctx := context.Background()
type testCase struct {
guaranteedProcessingTime time.Duration
rateLimit int64 // bytes/s
recoverySnapshotRate int64 // bytes/s
rebalanceSnapshotRate int64 // bytes/s
replicaSize int64 // bytes
expectedTimeout time.Duration
}
makeTest := func(tc testCase) (string, func(t *testing.T)) {
return fmt.Sprintf("%+v", tc), func(t *testing.T) {
st := cluster.MakeTestingClusterSettings()
queueGuaranteedProcessingTimeBudget.Override(ctx, &st.SV, tc.guaranteedProcessingTime)
recoverySnapshotRate.Override(ctx, &st.SV, tc.rateLimit)
tf := makeRateLimitedTimeoutFunc(recoverySnapshotRate)
recoverySnapshotRate.Override(ctx, &st.SV, tc.recoverySnapshotRate)
rebalanceSnapshotRate.Override(ctx, &st.SV, tc.rebalanceSnapshotRate)
tf := makeRateLimitedTimeoutFunc(recoverySnapshotRate, rebalanceSnapshotRate)
repl := mvccStatsReplicaInQueue{
size: tc.replicaSize,
}
Expand All @@ -1003,25 +1005,57 @@ func TestQueueRateLimitedTimeoutFunc(t *testing.T) {
for _, tc := range []testCase{
{
guaranteedProcessingTime: time.Minute,
rateLimit: 1 << 30,
recoverySnapshotRate: 1 << 30,
rebalanceSnapshotRate: 1 << 20, // minimum rate for timeout calculation.
replicaSize: 1 << 20,
expectedTimeout: time.Minute,
expectedTimeout: time.Minute, // the minimum timeout (guaranteedProcessingTime).
},
{
guaranteedProcessingTime: time.Minute,
rateLimit: 1 << 20,
recoverySnapshotRate: 1 << 20, // minimum rate for timeout calculation.
rebalanceSnapshotRate: 1 << 30,
replicaSize: 1 << 20,
expectedTimeout: time.Minute, // the minimum timeout (guaranteedProcessingTime).
},
{
guaranteedProcessingTime: time.Minute,
recoverySnapshotRate: 1 << 20, // minimum rate for timeout calculation.
rebalanceSnapshotRate: 2 << 20,
replicaSize: 100 << 20,
expectedTimeout: 100 * time.Second * permittedRangeScanSlowdown,
},
{
guaranteedProcessingTime: time.Minute,
recoverySnapshotRate: 2 << 20,
rebalanceSnapshotRate: 1 << 20, // minimum rate for timeout calculation.
replicaSize: 100 << 20,
expectedTimeout: 100 * time.Second * permittedRangeScanSlowdown,
},
{
guaranteedProcessingTime: time.Hour,
rateLimit: 1 << 20,
recoverySnapshotRate: 1 << 20, // minimum rate for timeout calculation.
rebalanceSnapshotRate: 1 << 30,
replicaSize: 100 << 20,
expectedTimeout: time.Hour,
expectedTimeout: time.Hour, // the minimum timeout (guaranteedProcessingTime).
},
{
guaranteedProcessingTime: time.Hour,
recoverySnapshotRate: 1 << 30,
rebalanceSnapshotRate: 1 << 20, // minimum rate for timeout calculation.
replicaSize: 100 << 20,
expectedTimeout: time.Hour, // the minimum timeout (guaranteedProcessingTime).
},
{
guaranteedProcessingTime: time.Minute,
recoverySnapshotRate: 1 << 10, // minimum rate for timeout calculation.
rebalanceSnapshotRate: 1 << 20,
replicaSize: 100 << 20,
expectedTimeout: 100 * (1 << 10) * time.Second * permittedRangeScanSlowdown,
},
{
guaranteedProcessingTime: time.Minute,
rateLimit: 1 << 10,
recoverySnapshotRate: 1 << 20,
rebalanceSnapshotRate: 1 << 10, // minimum rate for timeout calculation.
replicaSize: 100 << 20,
expectedTimeout: 100 * (1 << 10) * time.Second * permittedRangeScanSlowdown,
},
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/raft_snapshot_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func newRaftSnapshotQueue(store *Store) *raftSnapshotQueue {
needsLease: false,
needsSystemConfig: false,
acceptsUnsplitRanges: true,
processTimeoutFunc: makeRateLimitedTimeoutFunc(recoverySnapshotRate),
processTimeoutFunc: makeRateLimitedTimeoutFunc(recoverySnapshotRate, rebalanceSnapshotRate),
successes: store.metrics.RaftSnapshotQueueSuccesses,
failures: store.metrics.RaftSnapshotQueueFailures,
pending: store.metrics.RaftSnapshotQueuePending,
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/replicate_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -388,7 +388,7 @@ func newReplicateQueue(store *Store, allocator allocatorimpl.Allocator) *replica
// so we use the raftSnapshotQueueTimeoutFunc. This function sets a
// timeout based on the range size and the sending rate in addition
// to consulting the setting which controls the minimum timeout.
processTimeoutFunc: makeRateLimitedTimeoutFunc(rebalanceSnapshotRate),
processTimeoutFunc: makeRateLimitedTimeoutFunc(rebalanceSnapshotRate, recoverySnapshotRate),
successes: store.metrics.ReplicateQueueSuccesses,
failures: store.metrics.ReplicateQueueFailures,
pending: store.metrics.ReplicateQueuePending,
Expand Down

0 comments on commit ead518f

Please sign in to comment.