Skip to content

Commit

Permalink
Merge #65379
Browse files Browse the repository at this point in the history
65379: kvserver: actuate load-based replica rebalancing under heterogeneous localities r=aayushshah15 a=aayushshah15

This commit teaches the `StoreRebalancer` to make load-based rebalancing
decisions that are meaningful within the context of the replication constraints
placed on the ranges being relocated and the set of stores that can legally
receive replicas for such ranges.

Previously, the `StoreRebalancer` would compute the QPS underfull and overfull
thresholds based on the overall average QPS being served by all stores in the
cluster. Notably, this included stores that were in replication zones that
would not satisfy required constraints for the range being considered for
rebalancing. This meant that the store rebalancer would effectively never be
able to rebalance ranges within the stores inside heavily loaded replication
zones (since all the _valid_ stores would be above the overfull thresholds).

This patch is a move away from the bespoke relocation logic in the
`StoreRebalancer`. Instead, we have the `StoreRebalancer` rely on the
rebalancing logic used by the `replicateQueue` that already has the machinery
to compute load based signals for candidates _relative to other comparable
stores_. The main difference here is that the `StoreRebalancer` uses this
machinery to promote convergence of QPS across stores, whereas the
`replicateQueue` uses it to promote convergence of range counts. A series of
preceding commits in this patchset generalizes the existing replica rebalancing
logic, and this commit teaches the `StoreRebalancer` to use it.

This generalization also addresses another key limitation (see #62992) of the
`StoreRebalancer` regarding its inability to make partial improvements to a
range. Previously, if the `StoreRebalancer` couldn't move a range _entirely_
off of overfull stores, it would give up and not even move the subset of
replicas it could. This is no longer the case.

Resolves #61883
Resolves #62992
Resolves #31135

/cc @cockroachdb/kv

Release justification: fixes a set of major limitations behind numerous support escalations

Release note (performance improvement): QPS-based rebalancing is now
aware of different constraints placed on different replication zones. This
means that heterogeneously loaded replication zones (for instance, regions)
will achieve a more even distribution of QPS within the stores inside each
such zone.


Co-authored-by: Aayush Shah <[email protected]>
  • Loading branch information
craig[bot] and aayushshah15 committed Sep 8, 2021
2 parents 91311cf + d61f474 commit fa9acb4
Show file tree
Hide file tree
Showing 13 changed files with 2,165 additions and 1,253 deletions.
317 changes: 225 additions & 92 deletions pkg/kv/kvserver/allocator.go

Large diffs are not rendered by default.

502 changes: 334 additions & 168 deletions pkg/kv/kvserver/allocator_scorer.go

Large diffs are not rendered by default.

89 changes: 70 additions & 19 deletions pkg/kv/kvserver/allocator_scorer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -651,7 +651,7 @@ func TestConstraintsCheck(t *testing.T) {
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
for _, s := range testStores {
valid := constraintsCheck(s, tc.constraints)
valid := isStoreValid(s, tc.constraints)
ok := tc.expected[s.StoreID]
if valid != ok {
t.Errorf("expected store %d to be %t, but got %t", s.StoreID, ok, valid)
Expand Down Expand Up @@ -1036,7 +1036,7 @@ func TestShouldRebalanceDiversity(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

options := scorerOptions{}
options := rangeCountScorerOptions{}
newStore := func(id int, locality roachpb.Locality) roachpb.StoreDescriptor {
return roachpb.StoreDescriptor{
StoreID: roachpb.StoreID(id),
Expand Down Expand Up @@ -1193,7 +1193,7 @@ func TestShouldRebalanceDiversity(t *testing.T) {
actual := len(targets) > 0
if actual != tc.expected {
t.Errorf(
"%d: shouldRebalanceBasedOnRangeCount on s%d with replicas on %v got %t, expected %t",
"%d: shouldRebalanceBasedOnThresholds on s%d with replicas on %v got %t, expected %t",
i,
tc.s.StoreID,
tc.existingNodeIDs,
Expand Down Expand Up @@ -1462,11 +1462,13 @@ func TestDiversityScoreEquivalence(t *testing.T) {
}
}

func TestBalanceScore(t *testing.T) {
func TestBalanceScoreByRangeCount(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

options := scorerOptions{}
options := rangeCountScorerOptions{
rangeRebalanceThreshold: 0.1,
}
storeList := StoreList{
candidateRanges: stat{mean: 1000},
}
Expand All @@ -1486,24 +1488,68 @@ func TestBalanceScore(t *testing.T) {
sRangesOverfull.RangeCount = 1500
sRangesUnderfull := sMean
sRangesUnderfull.RangeCount = 500
sRangesLessThanMean := sMean
sRangesLessThanMean.RangeCount = 900
sRangesMoreThanMean := sMean
sRangesMoreThanMean.RangeCount = 1099

testCases := []struct {
sc roachpb.StoreCapacity
expected float64
expected balanceStatus
}{
{sEmpty, 1},
{sMean, 0},
{sRangesOverfull, -1},
{sRangesUnderfull, 1},
{sEmpty, underfull},
{sRangesLessThanMean, lessThanEqualToMean},
{sMean, lessThanEqualToMean},
{sRangesMoreThanMean, moreThanMean},
{sRangesOverfull, overfull},
{sRangesUnderfull, underfull},
}
for i, tc := range testCases {
if a, e := balanceScore(storeList, tc.sc, options), tc.expected; a.totalScore() != e {
t.Errorf("%d: balanceScore(storeList, %+v) got %s; want %.2f", i, tc.sc, a, e)
if a, e := options.balanceScore(storeList, tc.sc), tc.expected; a != e {
t.Errorf("%d: balanceScore(storeList, %+v) got %d; want %d", i, tc.sc, a, e)
}
}
}

func TestRebalanceConvergesOnMean(t *testing.T) {
// TestRebalanceBalanceScoreOnQPS verifies that balanceScore (under
// qpsScorerOptions with a 10% rebalance threshold) classifies a store into
// the expected balanceStatus bucket based on its QPS relative to the mean
// QPS (1000) of the candidate store list.
func TestRebalanceBalanceScoreOnQPS(t *testing.T) {
	defer leaktest.AfterTest(t)()
	defer log.Scope(t).Close(t)

	storeList := StoreList{
		candidateQueriesPerSecond: stat{mean: 1000},
	}

	testCases := []struct {
		QPS             float64
		expBalanceScore balanceStatus
	}{
		{0, underfull},
		{900, lessThanEqualToMean},
		{999, lessThanEqualToMean},
		{1000, lessThanEqualToMean},
		{1001, moreThanMean},
		{2000, overfull},
	}

	// options is identical for every case, so construct it once outside the
	// loop rather than per iteration.
	options := qpsScorerOptions{
		qpsRebalanceThreshold: 0.1,
	}
	for i, tc := range testCases {
		sc := roachpb.StoreCapacity{
			QueriesPerSecond: tc.QPS,
		}
		// NB: the previous version of this test ran this exact check twice,
		// with error messages naming rebalanceToConvergesScore and
		// rebalanceFromConvergesScore — a copy-paste artifact from the
		// range-count convergence test. Only balanceScore is exercised here,
		// so a single check with an accurate message suffices.
		if a, e := options.balanceScore(storeList, sc), tc.expBalanceScore; a != e {
			t.Errorf("%d: balanceScore(storeList, %+v) got %d; want %d", i, sc, a, e)
		}
	}
}

func TestRebalanceConvergesRangeCountOnMean(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

Expand All @@ -1518,23 +1564,28 @@ func TestRebalanceConvergesOnMean(t *testing.T) {
}{
{0, true, false},
{900, true, false},
{900, true, false},
{999, true, false},
{1000, false, false},
{1001, false, true},
{2000, false, true},
{900, true, false},
}

options := rangeCountScorerOptions{}
for i, tc := range testCases {
sc := roachpb.StoreCapacity{
RangeCount: tc.rangeCount,
}
if a, e := rebalanceToConvergesOnMean(storeList, sc), tc.toConverges; a != e {
t.Errorf("%d: rebalanceToConvergesOnMean(storeList, %+v) got %t; want %t", i, sc, a, e)
if a, e := options.rebalanceToConvergesScore(
storeList, sc,
) == 1, tc.toConverges; a != e {
t.Errorf("%d: rebalanceToConvergesScore(storeList, %+v) got %t; want %t", i, sc, a, e)
}
if a, e := rebalanceFromConvergesOnMean(storeList, sc), tc.fromConverges; a != e {
t.Errorf("%d: rebalanceFromConvergesOnMean(storeList, %+v) got %t; want %t", i, sc, a, e)
// NB: Any replica whose removal would not converge the range count to the
// mean is given a score of 1 to make it less attractive for removal.
if a, e := options.rebalanceFromConvergesScore(
storeList, sc,
) == 0, tc.fromConverges; a != e {
t.Errorf("%d: rebalanceFromConvergesScore(storeList, %+v) got %t; want %t", i, sc, a, e)
}
}
}
Expand Down
Loading

0 comments on commit fa9acb4

Please sign in to comment.