Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
83667: kvserver: the timeout of queued items should consider the rates of al… r=shralex a=shralex

…l item types in the queue

When the kv.snapshot_recovery.max_rate and kv.snapshot_rebalance.max_rate
settings are given different values, if the recovery rate is high and the
rebalance rate is low, recovery snapshots can have a lower timeout than the
expected duration of a single rebalance snapshot. This means that any steady
rebalance load can starve recovery snapshots. To mitigate the issue, this PR
sets the timeout for a snapshot based on min(kv.snapshot_recovery.max_rate,
kv.snapshot_rebalance.max_rate).

Release note: None

84856: ui: set up eslint-plugin-crdb for db-console r=maryliag a=sjbarag

When the custom eslint-plugin-crdb was added to this repo [1], it was
accidentally only configured in the cluster-ui subproject. Add
eslint-plugin-crdb to the db-console subproject, fixing a lint failure
in the process.

[1] ba68179 (ui: use esbuild-loader in webpack configs, 2022-05-26)

Release note: None

Co-authored-by: shralex <[email protected]>
Co-authored-by: Sean Barag <[email protected]>
  • Loading branch information
3 people committed Jul 21, 2022
3 parents a3f3295 + ead518f + 227fb14 commit c73adb1
Show file tree
Hide file tree
Showing 10 changed files with 71 additions and 18 deletions.
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/merge_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ func newMergeQueue(store *Store, db *kv.DB) *mergeQueue {
// hard to determine ahead of time. An alternative would be to calculate
// the timeout with a function that additionally considers the replication
// factor.
processTimeoutFunc: makeRateLimitedTimeoutFunc(rebalanceSnapshotRate),
processTimeoutFunc: makeRateLimitedTimeoutFunc(rebalanceSnapshotRate, recoverySnapshotRate),
needsLease: true,
needsSystemConfig: true,
acceptsUnsplitRanges: false,
Expand Down
20 changes: 15 additions & 5 deletions pkg/kv/kvserver/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,22 +71,32 @@ func defaultProcessTimeoutFunc(cs *cluster.Settings, _ replicaInQueue) time.Dura
// or calculate a range checksum) while processing should have a timeout which
// is a function of the size of the range and the maximum allowed rate of data
// transfer that adheres to a minimum timeout specified in a cluster setting.
// When the queue contains different types of work items, with different rates,
// the timeout of all items is set according to the minimum rate of the
// different types, to prevent slower items from causing faster items that appear
// after them in the queue to time out.
//
// The parameter controls which rate to use.
func makeRateLimitedTimeoutFunc(rateSetting *settings.ByteSizeSetting) queueProcessTimeoutFunc {
// The parameter controls which rate(s) to use.
func makeRateLimitedTimeoutFunc(rateSettings ...*settings.ByteSizeSetting) queueProcessTimeoutFunc {
return func(cs *cluster.Settings, r replicaInQueue) time.Duration {
minimumTimeout := queueGuaranteedProcessingTimeBudget.Get(&cs.SV)
// NB: In production code this type assertion will always succeed.
// Some tests set up a fake implementation of replicaInQueue in which
// case we fall back to the configured minimum timeout.
repl, ok := r.(interface{ GetMVCCStats() enginepb.MVCCStats })
if !ok {
if !ok || len(rateSettings) == 0 {
return minimumTimeout
}
snapshotRate := rateSetting.Get(&cs.SV)
minSnapshotRate := rateSettings[0].Get(&cs.SV)
for i := 1; i < len(rateSettings); i++ {
snapshotRate := rateSettings[i].Get(&cs.SV)
if snapshotRate < minSnapshotRate {
minSnapshotRate = snapshotRate
}
}
stats := repl.GetMVCCStats()
totalBytes := stats.KeyBytes + stats.ValBytes + stats.IntentBytes + stats.SysBytes
estimatedDuration := time.Duration(totalBytes/snapshotRate) * time.Second
estimatedDuration := time.Duration(totalBytes/minSnapshotRate) * time.Second
timeout := estimatedDuration * permittedRangeScanSlowdown
if timeout < minimumTimeout {
timeout = minimumTimeout
Expand Down
52 changes: 43 additions & 9 deletions pkg/kv/kvserver/queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -984,16 +984,18 @@ func TestQueueRateLimitedTimeoutFunc(t *testing.T) {
ctx := context.Background()
type testCase struct {
guaranteedProcessingTime time.Duration
rateLimit int64 // bytes/s
recoverySnapshotRate int64 // bytes/s
rebalanceSnapshotRate int64 // bytes/s
replicaSize int64 // bytes
expectedTimeout time.Duration
}
makeTest := func(tc testCase) (string, func(t *testing.T)) {
return fmt.Sprintf("%+v", tc), func(t *testing.T) {
st := cluster.MakeTestingClusterSettings()
queueGuaranteedProcessingTimeBudget.Override(ctx, &st.SV, tc.guaranteedProcessingTime)
recoverySnapshotRate.Override(ctx, &st.SV, tc.rateLimit)
tf := makeRateLimitedTimeoutFunc(recoverySnapshotRate)
recoverySnapshotRate.Override(ctx, &st.SV, tc.recoverySnapshotRate)
rebalanceSnapshotRate.Override(ctx, &st.SV, tc.rebalanceSnapshotRate)
tf := makeRateLimitedTimeoutFunc(recoverySnapshotRate, rebalanceSnapshotRate)
repl := mvccStatsReplicaInQueue{
size: tc.replicaSize,
}
Expand All @@ -1003,25 +1005,57 @@ func TestQueueRateLimitedTimeoutFunc(t *testing.T) {
for _, tc := range []testCase{
{
guaranteedProcessingTime: time.Minute,
rateLimit: 1 << 30,
recoverySnapshotRate: 1 << 30,
rebalanceSnapshotRate: 1 << 20, // minimum rate for timeout calculation.
replicaSize: 1 << 20,
expectedTimeout: time.Minute,
expectedTimeout: time.Minute, // the minimum timeout (guaranteedProcessingTime).
},
{
guaranteedProcessingTime: time.Minute,
rateLimit: 1 << 20,
recoverySnapshotRate: 1 << 20, // minimum rate for timeout calculation.
rebalanceSnapshotRate: 1 << 30,
replicaSize: 1 << 20,
expectedTimeout: time.Minute, // the minimum timeout (guaranteedProcessingTime).
},
{
guaranteedProcessingTime: time.Minute,
recoverySnapshotRate: 1 << 20, // minimum rate for timeout calculation.
rebalanceSnapshotRate: 2 << 20,
replicaSize: 100 << 20,
expectedTimeout: 100 * time.Second * permittedRangeScanSlowdown,
},
{
guaranteedProcessingTime: time.Minute,
recoverySnapshotRate: 2 << 20,
rebalanceSnapshotRate: 1 << 20, // minimum rate for timeout calculation.
replicaSize: 100 << 20,
expectedTimeout: 100 * time.Second * permittedRangeScanSlowdown,
},
{
guaranteedProcessingTime: time.Hour,
rateLimit: 1 << 20,
recoverySnapshotRate: 1 << 20, // minimum rate for timeout calculation.
rebalanceSnapshotRate: 1 << 30,
replicaSize: 100 << 20,
expectedTimeout: time.Hour,
expectedTimeout: time.Hour, // the minimum timeout (guaranteedProcessingTime).
},
{
guaranteedProcessingTime: time.Hour,
recoverySnapshotRate: 1 << 30,
rebalanceSnapshotRate: 1 << 20, // minimum rate for timeout calculation.
replicaSize: 100 << 20,
expectedTimeout: time.Hour, // the minimum timeout (guaranteedProcessingTime).
},
{
guaranteedProcessingTime: time.Minute,
recoverySnapshotRate: 1 << 10, // minimum rate for timeout calculation.
rebalanceSnapshotRate: 1 << 20,
replicaSize: 100 << 20,
expectedTimeout: 100 * (1 << 10) * time.Second * permittedRangeScanSlowdown,
},
{
guaranteedProcessingTime: time.Minute,
rateLimit: 1 << 10,
recoverySnapshotRate: 1 << 20,
rebalanceSnapshotRate: 1 << 10, // minimum rate for timeout calculation.
replicaSize: 100 << 20,
expectedTimeout: 100 * (1 << 10) * time.Second * permittedRangeScanSlowdown,
},
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/raft_snapshot_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func newRaftSnapshotQueue(store *Store) *raftSnapshotQueue {
needsLease: false,
needsSystemConfig: false,
acceptsUnsplitRanges: true,
processTimeoutFunc: makeRateLimitedTimeoutFunc(recoverySnapshotRate),
processTimeoutFunc: makeRateLimitedTimeoutFunc(recoverySnapshotRate, rebalanceSnapshotRate),
successes: store.metrics.RaftSnapshotQueueSuccesses,
failures: store.metrics.RaftSnapshotQueueFailures,
pending: store.metrics.RaftSnapshotQueuePending,
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/replicate_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -388,7 +388,7 @@ func newReplicateQueue(store *Store, allocator allocatorimpl.Allocator) *replica
// so we use makeRateLimitedTimeoutFunc. This function sets a
// timeout based on the range size and the sending rate in addition
// to consulting the setting which controls the minimum timeout.
processTimeoutFunc: makeRateLimitedTimeoutFunc(rebalanceSnapshotRate),
processTimeoutFunc: makeRateLimitedTimeoutFunc(rebalanceSnapshotRate, recoverySnapshotRate),
successes: store.metrics.ReplicateQueueSuccesses,
failures: store.metrics.ReplicateQueueFailures,
pending: store.metrics.ReplicateQueuePending,
Expand Down
4 changes: 3 additions & 1 deletion pkg/ui/workspaces/db-console/.eslintrc.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
{
"root": true,
"extends": "@cockroachlabs/eslint-config",
"plugins": ["@cockroachlabs/eslint-plugin-crdb"],
"ignorePatterns": ["protos.d.ts", "protos.js"],
"rules": {
"@typescript-eslint/no-inferrable-types": "off",
Expand All @@ -12,6 +13,7 @@
"@typescript-eslint/no-explicit-any": "off",
"no-restricted-imports": ["error", {
"patterns": ["@cockroachlabs/cluster-ui/src/*"]
}]
}],
"@cockroachlabs/crdb/require-antd-style-import": "error"
}
}
1 change: 1 addition & 0 deletions pkg/ui/workspaces/db-console/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,7 @@ eslint_test(
"ccl",
"prettier.config.js",
"src",
"//pkg/ui/workspaces/eslint-plugin-crdb",
"@npm_db_console//@cockroachlabs/eslint-config",
"@npm_db_console//@typescript-eslint/eslint-plugin",
"@npm_db_console//@typescript-eslint/parser",
Expand Down
1 change: 1 addition & 0 deletions pkg/ui/workspaces/db-console/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
"@cockroachlabs/crdb-protobuf-client": "link:./src/js",
"@cockroachlabs/crdb-protobuf-client-ccl": "link:./ccl/src/js",
"@cockroachlabs/design-tokens": "0.4.5",
"@cockroachlabs/eslint-plugin-crdb": "link:../eslint-plugin-crdb",
"@cockroachlabs/icons": "0.5.2",
"@cockroachlabs/ui-components": "0.2.20",
"analytics-node": "^3.5.0",
Expand Down
1 change: 1 addition & 0 deletions pkg/ui/workspaces/db-console/src/views/tracez/tracez.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import {
} from "src/util/api";
import { CaretRight } from "@cockroachlabs/icons";
import { Switch } from "antd";
import "antd/lib/switch/style";
import ISnapshotInfo = cockroach.server.serverpb.ISnapshotInfo;
import ITracingSpan = cockroach.server.serverpb.ITracingSpan;
import GetTracingSnapshotRequest = cockroach.server.serverpb.GetTracingSnapshotRequest;
Expand Down
4 changes: 4 additions & 0 deletions pkg/ui/workspaces/db-console/yarn.lock
Original file line number Diff line number Diff line change
Expand Up @@ -1379,6 +1379,10 @@
"@typescript-eslint/parser" "^2.34.0"
eslint-config-prettier "^6.11.0"

"@cockroachlabs/eslint-plugin-crdb@link:../eslint-plugin-crdb":
version "0.0.0"
uid ""

"@cockroachlabs/icons@0.5.2":
version "0.5.2"
resolved "https://registry.yarnpkg.com/@cockroachlabs/icons/-/icons-0.5.2.tgz#ddf87eba8e173f7ee6c787e178f460a0a1872b80"
Expand Down

0 comments on commit c73adb1

Please sign in to comment.