Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

kvserver: teach replicateQueue to handle non-voter addition/removal #59403

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
288 changes: 227 additions & 61 deletions pkg/kv/kvserver/allocator.go

Large diffs are not rendered by default.

113 changes: 95 additions & 18 deletions pkg/kv/kvserver/allocator_scorer.go
Original file line number Diff line number Diff line change
Expand Up @@ -404,13 +404,13 @@ func (cl candidateList) removeCandidate(c candidate) candidateList {
return cl
}

// allocateCandidates creates a candidate list of all stores that can be used
// for allocating a new replica ordered from the best to the worst. Only
// stores that meet the criteria are included in the list.
func allocateCandidates(
// rankedCandidateListForAllocation creates a candidate list of all stores that
// can be used for allocating a new replica ordered from the best to the worst.
// Only stores that meet the criteria are included in the list.
func rankedCandidateListForAllocation(
ctx context.Context,
candidateStores StoreList,
constraints constraint.AnalyzedConstraints,
constraintsCheck constraintsCheckFn,
existingReplicas []roachpb.ReplicaDescriptor,
existingStoreLocalities map[roachpb.StoreID]roachpb.Locality,
isNodeValidForRoutineReplicaTransfer func(context.Context, roachpb.NodeID) bool,
Expand All @@ -425,22 +425,25 @@ func allocateCandidates(
log.VEventf(ctx, 3, "not considering non-ready node n%d for allocate", s.Node.NodeID)
continue
}
constraintsOK, necessary := allocateConstraintsCheck(s, constraints)
constraintsOK, necessary := constraintsCheck(s)
if !constraintsOK {
continue
}

if !maxCapacityCheck(s) {
continue
}
diversityScore := diversityAllocateScore(s, existingStoreLocalities)
balanceScore := balanceScore(candidateStores, s.Capacity, options)
var convergesScore int
if options.qpsRebalanceThreshold > 0 {
if s.Capacity.QueriesPerSecond < underfullThreshold(candidateStores.candidateQueriesPerSecond.mean, options.qpsRebalanceThreshold) {
if s.Capacity.QueriesPerSecond < underfullThreshold(
candidateStores.candidateQueriesPerSecond.mean, options.qpsRebalanceThreshold) {
convergesScore = 1
} else if s.Capacity.QueriesPerSecond < candidateStores.candidateQueriesPerSecond.mean {
convergesScore = 0
} else if s.Capacity.QueriesPerSecond < overfullThreshold(candidateStores.candidateQueriesPerSecond.mean, options.qpsRebalanceThreshold) {
} else if s.Capacity.QueriesPerSecond < overfullThreshold(
candidateStores.candidateQueriesPerSecond.mean, options.qpsRebalanceThreshold) {
convergesScore = -1
} else {
convergesScore = -2
Expand All @@ -464,18 +467,18 @@ func allocateCandidates(
return candidates
}

// removeCandidates creates a candidate list of all existing replicas' stores
// ordered from least qualified for removal to most qualified. Stores that are
// marked as not valid, are in violation of a required criteria.
func removeCandidates(
// rankedCandidateListForRemoval creates a candidate list of all existing
// replicas' stores ordered from least qualified for removal to most qualified.
// Stores that are marked as not valid, are in violation of a required criteria.
func rankedCandidateListForRemoval(
sl StoreList,
constraints constraint.AnalyzedConstraints,
constraintsCheck constraintsCheckFn,
existingStoreLocalities map[roachpb.StoreID]roachpb.Locality,
options scorerOptions,
) candidateList {
var candidates candidateList
for _, s := range sl.stores {
constraintsOK, necessary := removeConstraintsCheck(s, constraints)
constraintsOK, necessary := constraintsCheck(s)
if !constraintsOK {
candidates = append(candidates, candidate{
store: s,
Expand Down Expand Up @@ -707,10 +710,9 @@ func rebalanceCandidates(
balanceScore := balanceScore(comparable.sl, existing.store.Capacity, options)
var convergesScore int
if !rebalanceFromConvergesOnMean(comparable.sl, existing.store.Capacity) {
// Similarly to in removeCandidates, any replica whose removal
// would not converge the range stats to their means is given a
// constraint score boost of 1 to make it less attractive for
// removal.
// Similarly to in rankedCandidateListForRemoval, any replica whose
// removal would not converge the range stats to their means is given a
// constraint score boost of 1 to make it less attractive for removal.
convergesScore = 1
}
existing.convergesScore = convergesScore
Expand Down Expand Up @@ -904,6 +906,78 @@ func sameLocalityAndAttrs(s1, s2 roachpb.StoreDescriptor) bool {
return true
}

// constraintsCheckFn determines whether the given store is a valid and/or
// necessary candidate for an addition of a new replica or a removal of an
// existing one.
type constraintsCheckFn func(roachpb.StoreDescriptor) (valid, necessary bool)

// voterConstraintsCheckerForAllocation returns a constraintsCheckFn that
// determines whether a candidate for a new voting replica is valid and/or
// necessary as per the `voter_constraints` and `constraints` on the range.
//
// NB: Potential voting replica candidates are "valid" only if they satisfy both
// the `voter_constraints` as well as the overall `constraints`. Additionally,
// candidates are only marked "necessary" if they're required in order to
// satisfy either the `voter_constraints` set or the `constraints` set.
func voterConstraintsCheckerForAllocation(
overallConstraints, voterConstraints constraint.AnalyzedConstraints,
) constraintsCheckFn {
return func(s roachpb.StoreDescriptor) (valid, necessary bool) {
overallConstraintsOK, necessaryOverall := allocateConstraintsCheck(s, overallConstraints)
voterConstraintsOK, necessaryForVoters := allocateConstraintsCheck(s, voterConstraints)

return overallConstraintsOK && voterConstraintsOK, necessaryOverall || necessaryForVoters
}
}

// nonVoterConstraintsCheckerForAllocation returns a constraintsCheckFn that
// determines whether a candidate for a new non-voting replica is valid and/or
// necessary as per the `constraints` on the range.
//
// NB: Non-voting replicas don't care about `voter_constraints`, so that
// constraint set is entirely disregarded here.
func nonVoterConstraintsCheckerForAllocation(
overallConstraints constraint.AnalyzedConstraints,
) constraintsCheckFn {
return func(s roachpb.StoreDescriptor) (valid, necessary bool) {
return allocateConstraintsCheck(s, overallConstraints)
}
}

// voterConstraintsCheckerForRemoval returns a constraintsCheckFn that
// determines whether an existing voting replica is valid and/or necessary with
// respect to the `constraints` and `voter_constraints` on the range.
//
// NB: Candidates are marked invalid if their removal would result in a
// violation of `voter_constraints` or `constraints` on the range. They are
// marked necessary if constraints conformance (for either `constraints` or
// `voter_constraints`) is not possible without them.
func voterConstraintsCheckerForRemoval(
overallConstraints, voterConstraints constraint.AnalyzedConstraints,
) constraintsCheckFn {
return func(s roachpb.StoreDescriptor) (valid, necessary bool) {
overallConstraintsOK, necessaryOverall := removeConstraintsCheck(s, overallConstraints)
voterConstraintsOK, necessaryForVoters := removeConstraintsCheck(s, voterConstraints)

return overallConstraintsOK && voterConstraintsOK, necessaryOverall || necessaryForVoters
}
}

// nonVoterConstraintsCheckerForRemoval returns a constraintsCheckFn that
// determines whether an existing non-voting replica is valid and/or necessary
// with respect to the `constraints` on the range.
//
// NB: Candidates are marked invalid if their removal would result in a
// violation of `constraints` on the range. They are marked necessary if
// constraints conformance (for `constraints`) is not possible without them.
func nonVoterConstraintsCheckerForRemoval(
overallConstraints constraint.AnalyzedConstraints,
) constraintsCheckFn {
return func(s roachpb.StoreDescriptor) (valid, necessary bool) {
return removeConstraintsCheck(s, overallConstraints)
}
}

// allocateConstraintsCheck checks the potential allocation target store
// against all the constraints. If it matches a constraint at all, it's valid.
// If it matches a constraint that is not already fully satisfied by existing
Expand All @@ -927,6 +1001,9 @@ func allocateConstraintsCheck(
); constraintsOK {
valid = true
matchingStores := analyzed.SatisfiedBy[i]
// NB: We check for "<" here instead of "<=" because `matchingStores`
// doesn't include `store`. Thus, `store` is only marked necessary if we
// have strictly fewer matchingStores than we need.
if len(matchingStores) < int(constraints.NumReplicas) {
return true, true
}
Expand Down
5 changes: 3 additions & 2 deletions pkg/kv/kvserver/allocator_scorer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -918,7 +918,8 @@ func TestAllocateConstraintsCheck(t *testing.T) {
NumReplicas: proto.Int32(tc.zoneNumReplicas),
}
analyzed := constraint.AnalyzeConstraints(
context.Background(), getTestStoreDesc, testStoreReplicas(tc.existing), zone)
context.Background(), getTestStoreDesc, testStoreReplicas(tc.existing),
*zone.NumReplicas, zone.Constraints)
for _, s := range testStores {
valid, necessary := allocateConstraintsCheck(s, analyzed)
if e, a := tc.expectedValid[s.StoreID], valid; e != a {
Expand Down Expand Up @@ -1052,7 +1053,7 @@ func TestRemoveConstraintsCheck(t *testing.T) {
NumReplicas: proto.Int32(tc.zoneNumReplicas),
}
analyzed := constraint.AnalyzeConstraints(
context.Background(), getTestStoreDesc, existing, zone)
context.Background(), getTestStoreDesc, existing, *zone.NumReplicas, zone.Constraints)
for storeID, expected := range tc.expected {
valid, necessary := removeConstraintsCheck(testStores[storeID], analyzed)
if e, a := expected.valid, valid; e != a {
Expand Down
Loading