diff --git a/pkg/kv/kvserver/merge_queue.go b/pkg/kv/kvserver/merge_queue.go index bb8d7357eab2..ee6a8e27fb3f 100644 --- a/pkg/kv/kvserver/merge_queue.go +++ b/pkg/kv/kvserver/merge_queue.go @@ -110,7 +110,7 @@ func newMergeQueue(store *Store, db *kv.DB) *mergeQueue { // hard to determine ahead of time. An alternative would be to calculate // the timeout with a function that additionally considers the replication // factor. - processTimeoutFunc: makeRateLimitedTimeoutFunc(rebalanceSnapshotRate), + processTimeoutFunc: makeRateLimitedTimeoutFunc(rebalanceSnapshotRate, recoverySnapshotRate), needsLease: true, needsSystemConfig: true, acceptsUnsplitRanges: false, diff --git a/pkg/kv/kvserver/queue.go b/pkg/kv/kvserver/queue.go index 1ce0c969d38f..57ea80245d39 100644 --- a/pkg/kv/kvserver/queue.go +++ b/pkg/kv/kvserver/queue.go @@ -71,22 +71,32 @@ func defaultProcessTimeoutFunc(cs *cluster.Settings, _ replicaInQueue) time.Dura // or calculate a range checksum) while processing should have a timeout which // is a function of the size of the range and the maximum allowed rate of data // transfer that adheres to a minimum timeout specified in a cluster setting. +// When the queue contains different types of work items, with different rates, +// the timeout of all items is set according to the minimum rate of the +// different types, to prevent slower items from causing faster items appearing +// after them in the queue to time-out. // -// The parameter controls which rate to use. -func makeRateLimitedTimeoutFunc(rateSetting *settings.ByteSizeSetting) queueProcessTimeoutFunc { +// The parameter controls which rate(s) to use. +func makeRateLimitedTimeoutFunc(rateSettings ...*settings.ByteSizeSetting) queueProcessTimeoutFunc { return func(cs *cluster.Settings, r replicaInQueue) time.Duration { minimumTimeout := queueGuaranteedProcessingTimeBudget.Get(&cs.SV) // NB: In production code this will type assertion will always succeed. // Some tests set up a fake implementation of replicaInQueue in which // case we fall back to the configured minimum timeout. repl, ok := r.(interface{ GetMVCCStats() enginepb.MVCCStats }) - if !ok { + if !ok || len(rateSettings) == 0 { return minimumTimeout } - snapshotRate := rateSetting.Get(&cs.SV) + minSnapshotRate := rateSettings[0].Get(&cs.SV) + for i := 1; i < len(rateSettings); i++ { + snapshotRate := rateSettings[i].Get(&cs.SV) + if snapshotRate < minSnapshotRate { + minSnapshotRate = snapshotRate + } + } stats := repl.GetMVCCStats() totalBytes := stats.KeyBytes + stats.ValBytes + stats.IntentBytes + stats.SysBytes - estimatedDuration := time.Duration(totalBytes/snapshotRate) * time.Second + estimatedDuration := time.Duration(totalBytes/minSnapshotRate) * time.Second timeout := estimatedDuration * permittedRangeScanSlowdown if timeout < minimumTimeout { timeout = minimumTimeout diff --git a/pkg/kv/kvserver/queue_test.go b/pkg/kv/kvserver/queue_test.go index 26913f0e25b9..7e9235d03a13 100644 --- a/pkg/kv/kvserver/queue_test.go +++ b/pkg/kv/kvserver/queue_test.go @@ -984,7 +984,8 @@ func TestQueueRateLimitedTimeoutFunc(t *testing.T) { ctx := context.Background() type testCase struct { guaranteedProcessingTime time.Duration - rateLimit int64 // bytes/s + recoverySnapshotRate int64 // bytes/s + rebalanceSnapshotRate int64 // bytes/s replicaSize int64 // bytes expectedTimeout time.Duration } @@ -992,8 +993,9 @@ func TestQueueRateLimitedTimeoutFunc(t *testing.T) { return fmt.Sprintf("%+v", tc), func(t *testing.T) { st := cluster.MakeTestingClusterSettings() queueGuaranteedProcessingTimeBudget.Override(ctx, &st.SV, tc.guaranteedProcessingTime) - recoverySnapshotRate.Override(ctx, &st.SV, tc.rateLimit) - tf := makeRateLimitedTimeoutFunc(recoverySnapshotRate) + recoverySnapshotRate.Override(ctx, &st.SV, tc.recoverySnapshotRate) + rebalanceSnapshotRate.Override(ctx, &st.SV, tc.rebalanceSnapshotRate) + tf := makeRateLimitedTimeoutFunc(recoverySnapshotRate, rebalanceSnapshotRate) repl := mvccStatsReplicaInQueue{ size: tc.replicaSize, } @@ -1003,25 +1005,57 @@ func TestQueueRateLimitedTimeoutFunc(t *testing.T) { for _, tc := range []testCase{ { guaranteedProcessingTime: time.Minute, - rateLimit: 1 << 30, + recoverySnapshotRate: 1 << 30, + rebalanceSnapshotRate: 1 << 20, // minimum rate for timeout calculation. replicaSize: 1 << 20, - expectedTimeout: time.Minute, + expectedTimeout: time.Minute, // the minimum timeout (guaranteedProcessingTime). }, { guaranteedProcessingTime: time.Minute, - rateLimit: 1 << 20, + recoverySnapshotRate: 1 << 20, // minimum rate for timeout calculation. + rebalanceSnapshotRate: 1 << 30, + replicaSize: 1 << 20, + expectedTimeout: time.Minute, // the minimum timeout (guaranteedProcessingTime). + }, + { + guaranteedProcessingTime: time.Minute, + recoverySnapshotRate: 1 << 20, // minimum rate for timeout calculation. + rebalanceSnapshotRate: 2 << 20, + replicaSize: 100 << 20, + expectedTimeout: 100 * time.Second * permittedRangeScanSlowdown, + }, + { + guaranteedProcessingTime: time.Minute, + recoverySnapshotRate: 2 << 20, + rebalanceSnapshotRate: 1 << 20, // minimum rate for timeout calculation. replicaSize: 100 << 20, expectedTimeout: 100 * time.Second * permittedRangeScanSlowdown, }, { guaranteedProcessingTime: time.Hour, - rateLimit: 1 << 20, + recoverySnapshotRate: 1 << 20, // minimum rate for timeout calculation. + rebalanceSnapshotRate: 1 << 30, replicaSize: 100 << 20, - expectedTimeout: time.Hour, + expectedTimeout: time.Hour, // the minimum timeout (guaranteedProcessingTime). + }, + { + guaranteedProcessingTime: time.Hour, + recoverySnapshotRate: 1 << 30, + rebalanceSnapshotRate: 1 << 20, // minimum rate for timeout calculation. + replicaSize: 100 << 20, + expectedTimeout: time.Hour, // the minimum timeout (guaranteedProcessingTime). + }, + { + guaranteedProcessingTime: time.Minute, + recoverySnapshotRate: 1 << 10, // minimum rate for timeout calculation. + rebalanceSnapshotRate: 1 << 20, + replicaSize: 100 << 20, + expectedTimeout: 100 * (1 << 10) * time.Second * permittedRangeScanSlowdown, }, { guaranteedProcessingTime: time.Minute, - rateLimit: 1 << 10, + recoverySnapshotRate: 1 << 20, + rebalanceSnapshotRate: 1 << 10, // minimum rate for timeout calculation. replicaSize: 100 << 20, expectedTimeout: 100 * (1 << 10) * time.Second * permittedRangeScanSlowdown, }, diff --git a/pkg/kv/kvserver/raft_snapshot_queue.go b/pkg/kv/kvserver/raft_snapshot_queue.go index bb51c0ab94b7..cab8fbb416b9 100644 --- a/pkg/kv/kvserver/raft_snapshot_queue.go +++ b/pkg/kv/kvserver/raft_snapshot_queue.go @@ -50,7 +50,7 @@ func newRaftSnapshotQueue(store *Store) *raftSnapshotQueue { needsLease: false, needsSystemConfig: false, acceptsUnsplitRanges: true, - processTimeoutFunc: makeRateLimitedTimeoutFunc(recoverySnapshotRate), + processTimeoutFunc: makeRateLimitedTimeoutFunc(recoverySnapshotRate, rebalanceSnapshotRate), successes: store.metrics.RaftSnapshotQueueSuccesses, failures: store.metrics.RaftSnapshotQueueFailures, pending: store.metrics.RaftSnapshotQueuePending, diff --git a/pkg/kv/kvserver/replicate_queue.go b/pkg/kv/kvserver/replicate_queue.go index 0276920e28f7..310ef779ced6 100644 --- a/pkg/kv/kvserver/replicate_queue.go +++ b/pkg/kv/kvserver/replicate_queue.go @@ -388,7 +388,7 @@ func newReplicateQueue(store *Store, allocator allocatorimpl.Allocator) *replica // so we use the raftSnapshotQueueTimeoutFunc. This function sets a // timeout based on the range size and the sending rate in addition // to consulting the setting which controls the minimum timeout. - processTimeoutFunc: makeRateLimitedTimeoutFunc(rebalanceSnapshotRate), + processTimeoutFunc: makeRateLimitedTimeoutFunc(rebalanceSnapshotRate, recoverySnapshotRate), successes: store.metrics.ReplicateQueueSuccesses, failures: store.metrics.ReplicateQueueFailures, pending: store.metrics.ReplicateQueuePending,