kvserver: version gate locality-aware load-based rebalancing
This commit introduces a set of deprecated store rebalancer methods
corresponding to the pre-22.1 load-based rebalancing scheme. Until a store
detects that the cluster version upgrade to 22.1 has been finalized, it falls
back to the old, pre-22.1 load-based rebalancing logic, which was not
locality-aware.

This is done to minimize the risk of unexpected behavior in mixed-version
clusters.
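
For context, this kind of gate follows CockroachDB's usual cluster-version
pattern: code checks whether the relevant version has been activated before
choosing a code path. A minimal sketch of the fallback, assuming an
illustrative version key and method names (none of the identifiers below are
taken from this commit):

	// Sketch only. `EnableNewStoreRebalancer` is a hypothetical
	// clusterversion key; `sr` is the store rebalancer.
	if !sr.st.Version.IsActive(ctx, clusterversion.EnableNewStoreRebalancer) {
		// The 22.1 upgrade may not be finalized, so some nodes could still
		// be running 21.2: use the deprecated, non-locality-aware logic so
		// that all stores make mutually compatible decisions.
		sr.deprecatedRebalanceStore(ctx, mode)
		return
	}
	// Every node is known to be running 22.1+: safe to use the new,
	// locality-aware logic.
	sr.rebalanceStore(ctx, mode)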

Resolves cockroachdb#76702

Release note: None
aayushshah15 committed Apr 26, 2022
1 parent 1ec69d8 commit 58dde3c
Showing 9 changed files with 1,452 additions and 20 deletions.
2 changes: 2 additions & 0 deletions pkg/kv/kvserver/BUILD.bazel
@@ -12,6 +12,7 @@ go_library(
         "compact_span_client.go",
         "consistency_queue.go",
         "debug_print.go",
+        "deprecated_store_rebalancer.go",
         "doc.go",
         "lease_history.go",
         "log.go",
@@ -239,6 +240,7 @@ go_test(
         "closed_timestamp_test.go",
         "consistency_queue_test.go",
         "debug_print_test.go",
+        "deprecated_store_rebalancer_test.go",
         "gossip_test.go",
         "helpers_test.go",
         "intent_resolver_integration_test.go",
8 changes: 5 additions & 3 deletions pkg/kv/kvserver/allocator.go
@@ -957,7 +957,7 @@ func (a *Allocator) allocateTargetFromList(
 	candidateStores StoreList,
 	conf roachpb.SpanConfig,
 	existingVoters, existingNonVoters []roachpb.ReplicaDescriptor,
-	options *rangeCountScorerOptions,
+	options scorerOptions,
 	allowMultipleReplsPerNode bool,
 	targetType targetReplicaType,
 ) (roachpb.ReplicationTarget, string) {
@@ -1454,7 +1454,7 @@ func (a Allocator) RebalanceNonVoter(
 func (a *Allocator) scorerOptions() *rangeCountScorerOptions {
 	return &rangeCountScorerOptions{
 		deterministic:           a.storePool.deterministic,
-		rangeRebalanceThreshold: rangeRebalanceThreshold.Get(&a.storePool.st.SV),
+		rangeRebalanceThreshold: RangeRebalanceThreshold.Get(&a.storePool.st.SV),
 	}
 }

@@ -1470,7 +1470,7 @@ func (a *Allocator) scorerOptionsForScatter() *scatterScorerOptions {
 		// made by the replicateQueue during normal course of operations. In other
 		// words, we don't want stores that are too far away from the mean to be
 		// affected by the jitter.
-		jitter: rangeRebalanceThreshold.Get(&a.storePool.st.SV),
+		jitter: RangeRebalanceThreshold.Get(&a.storePool.st.SV),
 	}
 }

@@ -2093,6 +2093,8 @@ func (a Allocator) shouldTransferLeaseForLeaseCountConvergence(
 	return false
 }

+// preferredLeaseholders returns a slice of replica descriptors corresponding to
+// replicas that meet lease preferences (among the `existing` replicas).
 func (a Allocator) preferredLeaseholders(
 	conf roachpb.SpanConfig, existing []roachpb.ReplicaDescriptor,
 ) []roachpb.ReplicaDescriptor {
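
The body of `preferredLeaseholders` is collapsed in this view. As a rough,
hedged sketch of what "meeting lease preferences" typically involves
(preferences are ordered, and the first preference that any existing replica
satisfies determines the returned set; `storeSatisfies` below is a
hypothetical stand-in, not the allocator's actual constraint-check helper):

	// Sketch only; not the implementation from this commit.
	for _, preference := range conf.LeasePreferences {
		var preferred []roachpb.ReplicaDescriptor
		for _, repl := range existing {
			if storeSatisfies(repl.StoreID, preference.Constraints) {
				preferred = append(preferred, repl)
			}
		}
		// Preferences are ordered by priority; the first preference with
		// any matching replicas wins.
		if len(preferred) > 0 {
			return preferred
		}
	}
	return nil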
60 changes: 53 additions & 7 deletions pkg/kv/kvserver/allocator_scorer.go
@@ -72,10 +72,10 @@ const (
 	minRangeRebalanceThreshold = 2
 )

-// rangeRebalanceThreshold is the minimum ratio of a store's range count to
+// RangeRebalanceThreshold is the minimum ratio of a store's range count to
 // the mean range count at which that store is considered overfull or underfull
 // of ranges.
-var rangeRebalanceThreshold = func() *settings.FloatSetting {
+var RangeRebalanceThreshold = func() *settings.FloatSetting {
 	s := settings.RegisterFloatSetting(
 		settings.SystemOnly,
 		"kv.allocator.range_rebalance_threshold",
@@ -133,13 +133,15 @@ type scorerOptions interface {
 	// (relative to the equivalence class `eqClass`). This makes it more likely
 	// for us to pick this store as the rebalance target.
 	rebalanceToConvergesScore(eqClass equivalenceClass, candidate roachpb.StoreDescriptor) int
-	// removalConvergesScore is similar to `rebalanceFromConvergesScore` (both
-	// deal with computing a converges score for existing stores that might
+	// removalMaximallyConvergesScore is similar to `rebalanceFromConvergesScore`
+	// (both deal with computing a converges score for existing stores that might
 	// relinquish a replica). removalConvergesScore assigns a negative convergence
 	// score to the existing store (or multiple replicas, if there are multiple
 	// with the same QPS) that would converge the range's existing stores' QPS the
 	// most.
 	removalMaximallyConvergesScore(removalCandStoreList StoreList, existing roachpb.StoreDescriptor) int
+	// getRangeRebalanceThreshold returns the current range rebalance threshold.
+	getRangeRebalanceThreshold() float64
 }

 func jittered(val float64, jitter float64, rand allocatorRand) float64 {
@@ -290,13 +292,22 @@ func (o *rangeCountScorerOptions) removalMaximallyConvergesScore(
 	return 0
 }

+func (o *rangeCountScorerOptions) getRangeRebalanceThreshold() float64 {
+	return o.rangeRebalanceThreshold
+}
+
 // qpsScorerOptions is used by the StoreRebalancer to tell the Allocator's
 // rebalancing machinery to base its balance/convergence scores on
-// queries-per-second. This means that the resulting rebalancing decisions will
-// further the goal of converging QPS across stores in the cluster.
+// queries-per-second. This means that the resulting rebalancing decisions will further the goal of
+// converging QPS across stores in the cluster.
 type qpsScorerOptions struct {
 	deterministic                             bool
 	qpsRebalanceThreshold, minRequiredQPSDiff float64
+	// NB: For mixed version compatibility with 21.2, we need to include the range
+	// count based rebalance threshold here. This is because in 21.2, the store
+	// rebalancer took range count into account when trying to rank candidate
+	// stores.
+	deprecatedRangeRebalanceThreshold float64

 	// QPS-based rebalancing assumes that:
 	// 1. Every replica of a range currently receives the same level of traffic.
@@ -315,6 +326,10 @@ type qpsScorerOptions struct {
 	qpsPerReplica float64
 }

+func (o *qpsScorerOptions) getRangeRebalanceThreshold() float64 {
+	return o.deprecatedRangeRebalanceThreshold
+}
+
 func (o *qpsScorerOptions) maybeJitterStoreStats(sl StoreList, _ allocatorRand) StoreList {
 	return sl
 }
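
Presumably, whoever constructs a `qpsScorerOptions` now also threads the
range-count threshold through the deprecated field. A hedged sketch of that
wiring (the surrounding variables and field paths are assumptions, not code
from this diff):

	// Sketch: carrying the range-count threshold alongside the QPS options
	// purely for mixed-version parity with 21.2.
	options := &qpsScorerOptions{
		deterministic:                     storePool.deterministic,
		qpsRebalanceThreshold:             qpsRebalanceThreshold.Get(&st.SV),
		deprecatedRangeRebalanceThreshold: RangeRebalanceThreshold.Get(&st.SV),
	}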
@@ -745,7 +760,7 @@ func rankedCandidateListForAllocation(
 	existingStoreLocalities map[roachpb.StoreID]roachpb.Locality,
 	isStoreValidForRoutineReplicaTransfer func(context.Context, roachpb.StoreID) bool,
 	allowMultipleReplsPerNode bool,
-	options *rangeCountScorerOptions,
+	options scorerOptions,
 ) candidateList {
 	var candidates candidateList
 	existingReplTargets := roachpb.MakeReplicaSet(existingReplicas).ReplicationTargets()
@@ -779,11 +794,42 @@
 		}
 		diversityScore := diversityAllocateScore(s, existingStoreLocalities)
 		balanceScore := options.balanceScore(candidateStores, s.Capacity)
+		// NB: This is only applicable in mixed version (21.2 along with 22.1)
+		// clusters. `rankedCandidateListForAllocation` will never be called in 22.1
+		// with a `qpsScorerOptions`.
+		//
+		// TODO(aayush): Remove this some time in the 22.2 cycle.
+		var convergesScore int
+		if qpsOpts, ok := options.(*qpsScorerOptions); ok {
+			if qpsOpts.qpsRebalanceThreshold > 0 {
+				if s.Capacity.QueriesPerSecond < underfullQPSThreshold(
+					qpsOpts, candidateStores.candidateQueriesPerSecond.mean,
+				) {
+					convergesScore = 1
+				} else if s.Capacity.QueriesPerSecond < candidateStores.candidateQueriesPerSecond.mean {
+					convergesScore = 0
+				} else if s.Capacity.QueriesPerSecond < overfullQPSThreshold(
+					qpsOpts, candidateStores.candidateQueriesPerSecond.mean,
+				) {
+					convergesScore = -1
+				} else {
+					convergesScore = -2
+				}
+
+				// NB: Maintain parity with the 21.2 implementation, which computed the
+				// `balanceScore` using range-counts instead of QPS even during
+				// load-based rebalancing.
+				balanceScore = (&rangeCountScorerOptions{
+					rangeRebalanceThreshold: options.getRangeRebalanceThreshold(),
+				}).balanceScore(candidateStores, s.Capacity)
+			}
+		}
 		candidates = append(candidates, candidate{
 			store:          s,
 			valid:          constraintsOK,
 			necessary:      necessary,
 			diversityScore: diversityScore,
+			convergesScore: convergesScore,
 			balanceScore:   balanceScore,
 			rangeCount:     int(s.Capacity.RangeCount),
 		})
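
The `underfullQPSThreshold` and `overfullQPSThreshold` helpers used in the
converges-score block above are not shown in this diff. As a simplified,
hedged sketch of their semantics (the real helpers also enforce a minimum
absolute gap around the mean, so treat this as approximate):

	// Approximate thresholds bracketing the mean QPS.
	func underfullQPSThreshold(o *qpsScorerOptions, mean float64) float64 {
		return mean * (1 - o.qpsRebalanceThreshold)
	}

	func overfullQPSThreshold(o *qpsScorerOptions, mean float64) float64 {
		return mean * (1 + o.qpsRebalanceThreshold)
	}

For example, with a threshold of 0.25 and a mean of 1000 QPS, a store below
750 QPS gets a convergesScore of 1, one between 750 and 1000 gets 0, one
between 1000 and 1250 gets -1, and anything hotter gets -2.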
6 changes: 3 additions & 3 deletions pkg/kv/kvserver/allocator_scorer_test.go
@@ -1447,7 +1447,7 @@ func TestDiversityScoreEquivalence(t *testing.T) {
 		{[]roachpb.StoreID{testStoreUSa15, testStoreUSa15Dupe, testStoreUSa1, testStoreUSb, testStoreEurope}, 13.0 / 20.0},
 	}

-	// Ensure that rangeDiversityScore and diversityRebalanceFromScore return
+	// Ensure that RangeDiversityScore and diversityRebalanceFromScore return
 	// the same results for the same configurations, enabling their results
 	// to be directly compared with each other. The same is not true for
 	// diversityAllocateScore and diversityRemovalScore as of their initial
@@ -1460,7 +1460,7 @@
 		}
 		rangeScore := rangeDiversityScore(existingLocalities)
 		if a, e := rangeScore, tc.expected; !scoresAlmostEqual(a, e) {
-			t.Errorf("rangeDiversityScore(%v) got %f, want %f", existingLocalities, a, e)
+			t.Errorf("RangeDiversityScore(%v) got %f, want %f", existingLocalities, a, e)
 		}
 		for _, storeID := range tc.stores {
 			s := testStores[storeID]
@@ -1472,7 +1472,7 @@
 					s, fromStoreID, existingLocalities, a, e)
 			}
 			if a, e := rebalanceScore, rangeScore; !scoresAlmostEqual(a, e) {
-				t.Errorf("diversityRebalanceFromScore(%v, %d, %v)=%f not equal to rangeDiversityScore(%v)=%f",
+				t.Errorf("diversityRebalanceFromScore(%v, %d, %v)=%f not equal to RangeDiversityScore(%v)=%f",
 					s, fromStoreID, existingLocalities, a, existingLocalities, e)
 			}
4 changes: 2 additions & 2 deletions pkg/kv/kvserver/allocator_test.go
@@ -1236,7 +1236,7 @@ func TestAllocatorRebalanceThrashing(t *testing.T) {
 		for i := range stores {
 			stores[i].rangeCount = mean
 		}
-		surplus := int32(math.Ceil(float64(mean)*rangeRebalanceThreshold.Get(&st.SV) + 1))
+		surplus := int32(math.Ceil(float64(mean)*RangeRebalanceThreshold.Get(&st.SV) + 1))
 		stores[0].rangeCount += surplus
 		stores[0].shouldRebalanceFrom = true
 		for i := 1; i < len(stores); i++ {
@@ -1257,7 +1257,7 @@
 		// Subtract enough ranges from the first store to make it a suitable
 		// rebalance target. To maintain the specified mean, we then add that delta
 		// back to the rest of the replicas.
-		deficit := int32(math.Ceil(float64(mean)*rangeRebalanceThreshold.Get(&st.SV) + 1))
+		deficit := int32(math.Ceil(float64(mean)*RangeRebalanceThreshold.Get(&st.SV) + 1))
 		stores[0].rangeCount -= deficit
 		for i := 1; i < len(stores); i++ {
 			stores[i].rangeCount += int32(math.Ceil(float64(deficit) / float64(len(stores)-1)))
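
To make the surplus/deficit arithmetic concrete: with mean = 1000 and
`kv.allocator.range_rebalance_threshold` at its default of 0.05, surplus =
ceil(1000*0.05 + 1) = 51 ranges, just past the overfull cutoff, which is
exactly what the test needs to mark stores[0] as a rebalance source (and,
symmetrically, the deficit case marks it as a rebalance target).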
(The remaining four changed files are not rendered in this view.)
