Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
61239: kvserver: handle range-level rebalancing of non-voting replicas r=aayushshah15 a=aayushshah15

First commit from cockroachdb#60029

This PR adds the necessary machinery within the allocator and the
plumbing within the replicateQueue to be able to rebalance non-voting
replicas.

A few things to note are:

Voting replicas are allowed to rebalance to nodes that already have a
non-voting replica. This will trigger what is, essentially, a metadata
operation: flipping the replica type of the corresponding non-voter to a
voter and the type of the voter to a non_voter. Notably, this PR changes
voter allocation code to also be able to do this.

Computation of diversity scores works slightly differently for voting
replicas and non-voting replicas. Non-voting replicas compute candidate
diversity scores based off of all existing replicas, whereas voting
replicas compute candidate diversity scores off of just the set of
voting replicas.

This PR does not yet add support in the store rebalancer to make rebalancing
decisions for non-voting replicas and we don't yet support removal/
replacement of dead and decommissioning non-voters. These things are
coming in follow-up PRs.

Release justification: necessary for non-voting replicas

Release note: None


61373: kvserver, timeutil: fix some Timer user-after-Stops r=andreimatei a=andreimatei

Two guys were continuing to use a Timer after Stop()ing it, which is
illegal.

Release note: None
Release justification: Bug fix.

Co-authored-by: Aayush Shah <[email protected]>
Co-authored-by: Andrei Matei <[email protected]>
  • Loading branch information
3 people committed Mar 4, 2021
3 parents cbdce1b + a75a287 + 1512a14 commit 37cdf01
Show file tree
Hide file tree
Showing 12 changed files with 1,001 additions and 351 deletions.
311 changes: 217 additions & 94 deletions pkg/kv/kvserver/allocator.go

Large diffs are not rendered by default.

86 changes: 64 additions & 22 deletions pkg/kv/kvserver/allocator_scorer.go
Original file line number Diff line number Diff line change
Expand Up @@ -523,18 +523,20 @@ type rebalanceOptions struct {
candidates candidateList
}

// rebalanceCandidates creates two candidate lists. The first contains all
// existing replica's stores, ordered from least qualified for rebalancing to
// most qualified. The second list is of all potential stores that could be
// used as rebalancing receivers, ordered from best to worst.
func rebalanceCandidates(
// rankedCandidateListForRebalancing creates two candidate lists. The first
// contains all existing replica's stores, ordered from least qualified for
// rebalancing to most qualified. The second list is of all potential stores
// that could be used as rebalancing receivers, ordered from best to worst.
func rankedCandidateListForRebalancing(
ctx context.Context,
allStores StoreList,
constraints constraint.AnalyzedConstraints,
existingReplicas []roachpb.ReplicaDescriptor,
removalConstraintsChecker constraintsCheckFn,
rebalanceConstraintsChecker rebalanceConstraintsCheckFn,
existingReplicasForType, replicasWithExcludedStores []roachpb.ReplicaDescriptor,
existingStoreLocalities map[roachpb.StoreID]roachpb.Locality,
isNodeValidForRoutineReplicaTransfer func(context.Context, roachpb.NodeID) bool,
options scorerOptions,
replicaType targetReplicaType,
) []rebalanceOptions {
// 1. Determine whether existing replicas are valid and/or necessary.
existingStores := make(map[roachpb.StoreID]candidate)
Expand All @@ -545,11 +547,11 @@ func rebalanceCandidates(
log.VEventf(ctx, 3, "not considering non-ready node n%d for rebalance", store.Node.NodeID)
continue
}
for _, repl := range existingReplicas {
for _, repl := range existingReplicasForType {
if store.StoreID != repl.StoreID {
continue
}
valid, necessary := removeConstraintsCheck(store, constraints)
valid, necessary := removalConstraintsChecker(store)
fullDisk := !maxCapacityCheck(store)
if !valid {
if !needRebalanceFrom {
Expand Down Expand Up @@ -617,8 +619,15 @@ func rebalanceCandidates(
}
var comparableCands candidateList
for _, store := range allStores.stores {
constraintsOK, necessary := rebalanceFromConstraintsCheck(
store, existing.store.StoreID, constraints)
// Ignore any stores that contain any of the replicas within
// `replicasWithExcludedStores`.
for _, excluded := range replicasWithExcludedStores {
if store.StoreID == excluded.StoreID {
continue
}
}

constraintsOK, necessary := rebalanceConstraintsChecker(store, existing.store)
maxCapacityOK := maxCapacityCheck(store)
diversityScore := diversityRebalanceFromScore(
store, existing.store.StoreID, existingStoreLocalities)
Expand All @@ -630,6 +639,7 @@ func rebalanceCandidates(
diversityScore: diversityScore,
}
if !cand.less(existing) {
// If `cand` is not worse than `existing`, add it to the list.
comparableCands = append(comparableCands, cand)
if !needRebalanceFrom && !needRebalanceTo && existing.less(cand) {
needRebalanceTo = true
Expand Down Expand Up @@ -676,9 +686,12 @@ func rebalanceCandidates(
}
}
}
// TODO(a-robinson): Some moderate refactoring could extract this logic out
// into the loop below, avoiding duplicate balanceScore calculations.
if shouldRebalance(ctx, existing.store, sl, options) {
// NB: Due to step 2 from above, we're guaranteed to have a non-empty `sl`
// at this point.
//
// TODO(a-robinson): Some moderate refactoring could extract this logic
// out into the loop below, avoiding duplicate balanceScore calculations.
if shouldRebalanceBasedOnRangeCount(ctx, existing.store, sl, options) {
shouldRebalanceCheck = true
break
}
Expand Down Expand Up @@ -742,7 +755,7 @@ func rebalanceCandidates(
// Only consider this candidate if we must rebalance due to constraint,
// disk fullness, or diversity reasons.
log.VEventf(ctx, 3, "not considering %+v as a candidate for range %+v: score=%s storeList=%+v",
s, existingReplicas, cand.balanceScore, comparable.sl)
s, existingReplicasForType, cand.balanceScore, comparable.sl)
continue
}
cand.rangeCount = int(s.Capacity.RangeCount)
Expand Down Expand Up @@ -840,9 +853,10 @@ func betterRebalanceTarget(target1, existing1, target2, existing2 *candidate) *c
return target1
}

// shouldRebalance returns whether the specified store is a candidate for
// having a replica removed from it given the candidate store list.
func shouldRebalance(
// shouldRebalanceBasedOnRangeCount returns whether the specified store is a
// candidate for having a replica removed from it given the candidate store
// list.
func shouldRebalanceBasedOnRangeCount(
ctx context.Context, store roachpb.StoreDescriptor, sl StoreList, options scorerOptions,
) bool {
overfullThreshold := int32(math.Ceil(overfullRangeThreshold(options, sl.candidateRanges.mean)))
Expand Down Expand Up @@ -911,6 +925,11 @@ func sameLocalityAndAttrs(s1, s2 roachpb.StoreDescriptor) bool {
// existing one.
type constraintsCheckFn func(roachpb.StoreDescriptor) (valid, necessary bool)

// rebalanceConstraintsCheckFn determines whether `toStore` is a valid and/or
// necessary replacement candidate for `fromStore` (which must contain an
// existing replica).
type rebalanceConstraintsCheckFn func(toStore, fromStore roachpb.StoreDescriptor) (valid, necessary bool)

// voterConstraintsCheckerForAllocation returns a constraintsCheckFn that
// determines whether a candidate for a new voting replica is valid and/or
// necessary as per the `voter_constraints` and `constraints` on the range.
Expand Down Expand Up @@ -978,6 +997,31 @@ func nonVoterConstraintsCheckerForRemoval(
}
}

// voterConstraintsCheckerForRebalance returns a rebalanceConstraintsCheckFn
// that determines whether a given store is a valid and/or necessary rebalance
// candidate from a given store of an existing voting replica.
func voterConstraintsCheckerForRebalance(
overallConstraints, voterConstraints constraint.AnalyzedConstraints,
) rebalanceConstraintsCheckFn {
return func(toStore, fromStore roachpb.StoreDescriptor) (valid, necessary bool) {
overallConstraintsOK, necessaryOverall := rebalanceFromConstraintsCheck(toStore, fromStore, overallConstraints)
voterConstraintsOK, necessaryForVoters := rebalanceFromConstraintsCheck(toStore, fromStore, voterConstraints)

return overallConstraintsOK && voterConstraintsOK, necessaryOverall || necessaryForVoters
}
}

// nonVoterConstraintsCheckerForRebalance returns a rebalanceConstraintsCheckFn
// that determines whether a given store is a valid and/or necessary rebalance
// candidate from a given store of an existing non-voting replica.
func nonVoterConstraintsCheckerForRebalance(
overallConstraints constraint.AnalyzedConstraints,
) rebalanceConstraintsCheckFn {
return func(toStore, fromStore roachpb.StoreDescriptor) (valid, necessary bool) {
return rebalanceFromConstraintsCheck(toStore, fromStore, overallConstraints)
}
}

// allocateConstraintsCheck checks the potential allocation target store
// against all the constraints. If it matches a constraint at all, it's valid.
// If it matches a constraint that is not already fully satisfied by existing
Expand Down Expand Up @@ -1057,9 +1101,7 @@ func removeConstraintsCheck(
// will be necessary if fromStoreID (an existing replica) is removed from the
// range.
func rebalanceFromConstraintsCheck(
store roachpb.StoreDescriptor,
fromStoreID roachpb.StoreID,
analyzed constraint.AnalyzedConstraints,
store, fromStoreID roachpb.StoreDescriptor, analyzed constraint.AnalyzedConstraints,
) (valid bool, necessary bool) {
// All stores are valid when there are no constraints.
if len(analyzed.Constraints) == 0 {
Expand All @@ -1083,7 +1125,7 @@ func rebalanceFromConstraintsCheck(
matchingStores := analyzed.SatisfiedBy[i]
if len(matchingStores) < int(constraints.NumReplicas) ||
(len(matchingStores) == int(constraints.NumReplicas) &&
containsStore(analyzed.SatisfiedBy[i], fromStoreID)) {
containsStore(analyzed.SatisfiedBy[i], fromStoreID.StoreID)) {
return true, true
}
}
Expand Down
36 changes: 27 additions & 9 deletions pkg/kv/kvserver/allocator_scorer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -315,9 +315,9 @@ func TestBetterThan(t *testing.T) {
}

// TestBestRebalanceTarget constructs a hypothetical output of
// rebalanceCandidates and verifies that bestRebalanceTarget properly returns
// the candidates in the ideal order of preference and omits any that aren't
// desirable.
// rankedCandidateListForRebalancing and verifies that bestRebalanceTarget
// properly returns the candidates in the ideal order of preference and omits
// any that aren't desirable.
func TestBestRebalanceTarget(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
Expand Down Expand Up @@ -1209,18 +1209,36 @@ func TestShouldRebalanceDiversity(t *testing.T) {
}
}

targets := rebalanceCandidates(
removalConstraintsChecker := voterConstraintsCheckerForRemoval(
constraint.EmptyAnalyzedConstraints,
constraint.EmptyAnalyzedConstraints,
)
rebalanceConstraintsChecker := voterConstraintsCheckerForRebalance(
constraint.EmptyAnalyzedConstraints,
constraint.EmptyAnalyzedConstraints,
)
targets := rankedCandidateListForRebalancing(
context.Background(),
filteredSL,
constraint.AnalyzedConstraints{},
removalConstraintsChecker,
rebalanceConstraintsChecker,
replicas,
nil,
existingStoreLocalities,
func(context.Context, roachpb.NodeID) bool { return true }, /* isNodeValidForRoutineReplicaTransfer */
options)
func(context.Context, roachpb.NodeID) bool { return true },
options,
voterTarget,
)
actual := len(targets) > 0
if actual != tc.expected {
t.Errorf("%d: shouldRebalance on s%d with replicas on %v got %t, expected %t",
i, tc.s.StoreID, tc.existingNodeIDs, actual, tc.expected)
t.Errorf(
"%d: shouldRebalanceBasedOnRangeCount on s%d with replicas on %v got %t, expected %t",
i,
tc.s.StoreID,
tc.existingNodeIDs,
actual,
tc.expected,
)
}
}
}
Expand Down
Loading

0 comments on commit 37cdf01

Please sign in to comment.