Skip to content

Commit

Permalink
allocator: ensure full constraint conformance on replacement
Browse files Browse the repository at this point in the history
This changes the allocator to be fully aware of the replica that is
beingn replaced when allocating new target replicas due to a dead or
decommissioning node. This ensures that if constraints were respected
before the dead or decommissioning node left the cluster, they will
still be respected afterwards, particularly in the case at issue, which
is when partial constraints are set on a range (e.g. `num_replicas = 3,
<some_constraint>: 1`). This is achieved by rejecting candidate
stores to allocate the replacement on when they do not satisfy a
constraint that was satisfied by the existing store. In doing so, this
means that some node decommissions that would previously be able to
execute will now not allow the user to decommission the node and violate
the configured constraints.

Fixes cockroachdb#94809.

Release note (bug fix): Decommissions that would violate constraints set
on a subset of replicas for a range (e.g. `num_replicas = 3,
<constraint>: 1`) will no longer be able to execute, respecting
constraints during and after the decommission.
  • Loading branch information
AlexTalks committed Feb 6, 2023
1 parent f4b0a9f commit 80d6a10
Show file tree
Hide file tree
Showing 8 changed files with 393 additions and 108 deletions.
71 changes: 54 additions & 17 deletions pkg/kv/kvserver/allocator/allocatorimpl/allocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -820,26 +820,29 @@ func getRemoveIdx(
// voter and non-voter replicas needed to allocate a target for the given action.
// NB: This is a convenience method for callers of allocator.AllocateTarget(..).
func FilterReplicasForAction(
storePool storepool.AllocatorStorePool, desc *roachpb.RangeDescriptor, action AllocatorAction,
) (
filteredVoters, filteredNonVoters []roachpb.ReplicaDescriptor,
isReplacement, nothingToDo bool,
err error,
) {
storePool storepool.AllocatorStorePool,
desc *roachpb.RangeDescriptor,
action AllocatorAction,
) (filteredVoters, filteredNonVoters []roachpb.ReplicaDescriptor, replacing *roachpb.ReplicaDescriptor, nothingToDo bool, err error) {
voterReplicas, nonVoterReplicas,
liveVoterReplicas, deadVoterReplicas,
liveNonVoterReplicas, deadNonVoterReplicas := LiveAndDeadVoterAndNonVoterReplicas(storePool, desc)

removeIdx := -1
_, filteredVoters, filteredNonVoters, removeIdx, nothingToDo, err = DetermineReplicaToReplaceAndFilter(
var existing []roachpb.ReplicaDescriptor
existing, filteredVoters, filteredNonVoters, removeIdx, nothingToDo, err = DetermineReplicaToReplaceAndFilter(
storePool,
action,
voterReplicas, nonVoterReplicas,
liveVoterReplicas, deadVoterReplicas,
liveNonVoterReplicas, deadNonVoterReplicas,
)

return filteredVoters, filteredNonVoters, removeIdx >= 0, nothingToDo, err
if removeIdx >= 0 {
replacing = &existing[removeIdx]
}

return filteredVoters, filteredNonVoters, replacing, nothingToDo, err
}

// ComputeAction determines the exact operation needed to repair the
Expand Down Expand Up @@ -1181,6 +1184,7 @@ func (a *Allocator) AllocateTarget(
storePool storepool.AllocatorStorePool,
conf roachpb.SpanConfig,
existingVoters, existingNonVoters []roachpb.ReplicaDescriptor,
replacing *roachpb.ReplicaDescriptor,
replicaStatus ReplicaStatus,
targetType TargetReplicaType,
) (roachpb.ReplicationTarget, string, error) {
Expand All @@ -1205,6 +1209,7 @@ func (a *Allocator) AllocateTarget(
conf,
existingVoters,
existingNonVoters,
replacing,
a.ScorerOptions(ctx),
selector,
// When allocating a *new* replica, we explicitly disregard nodes with any
Expand Down Expand Up @@ -1274,7 +1279,7 @@ func (a *Allocator) CheckAvoidsFragileQuorum(
roachpb.ReplicaDescriptor{NodeID: newTarget.NodeID, StoreID: newTarget.StoreID},
)

_, _, err := a.AllocateVoter(ctx, storePool, conf, oldPlusNewReplicas, remainingLiveNonVoters, replicaStatus)
_, _, err := a.AllocateVoter(ctx, storePool, conf, oldPlusNewReplicas, remainingLiveNonVoters, nil /* replacing */, replicaStatus)
return err
}

Expand All @@ -1289,9 +1294,10 @@ func (a *Allocator) AllocateVoter(
storePool storepool.AllocatorStorePool,
conf roachpb.SpanConfig,
existingVoters, existingNonVoters []roachpb.ReplicaDescriptor,
replacing *roachpb.ReplicaDescriptor,
replicaStatus ReplicaStatus,
) (roachpb.ReplicationTarget, string, error) {
return a.AllocateTarget(ctx, storePool, conf, existingVoters, existingNonVoters, replicaStatus, VoterTarget)
return a.AllocateTarget(ctx, storePool, conf, existingVoters, existingNonVoters, replacing, replicaStatus, VoterTarget)
}

// AllocateNonVoter returns a suitable store for a new allocation of a
Expand All @@ -1302,9 +1308,10 @@ func (a *Allocator) AllocateNonVoter(
storePool storepool.AllocatorStorePool,
conf roachpb.SpanConfig,
existingVoters, existingNonVoters []roachpb.ReplicaDescriptor,
replacing *roachpb.ReplicaDescriptor,
replicaStatus ReplicaStatus,
) (roachpb.ReplicationTarget, string, error) {
return a.AllocateTarget(ctx, storePool, conf, existingVoters, existingNonVoters, replicaStatus, NonVoterTarget)
return a.AllocateTarget(ctx, storePool, conf, existingVoters, existingNonVoters, replacing, replicaStatus, NonVoterTarget)
}

// AllocateTargetFromList returns a suitable store for a new allocation of a
Expand All @@ -1316,13 +1323,14 @@ func (a *Allocator) AllocateTargetFromList(
candidateStores storepool.StoreList,
conf roachpb.SpanConfig,
existingVoters, existingNonVoters []roachpb.ReplicaDescriptor,
replacing *roachpb.ReplicaDescriptor,
options ScorerOptions,
selector CandidateSelector,
allowMultipleReplsPerNode bool,
targetType TargetReplicaType,
) (roachpb.ReplicationTarget, string) {
return a.allocateTargetFromList(ctx, storePool, candidateStores, conf, existingVoters,
existingNonVoters, options, selector, allowMultipleReplsPerNode, targetType)
existingNonVoters, replacing, options, selector, allowMultipleReplsPerNode, targetType)
}

func (a *Allocator) allocateTargetFromList(
Expand All @@ -1331,12 +1339,16 @@ func (a *Allocator) allocateTargetFromList(
candidateStores storepool.StoreList,
conf roachpb.SpanConfig,
existingVoters, existingNonVoters []roachpb.ReplicaDescriptor,
replacing *roachpb.ReplicaDescriptor,
options ScorerOptions,
selector CandidateSelector,
allowMultipleReplsPerNode bool,
targetType TargetReplicaType,
) (roachpb.ReplicationTarget, string) {
existingReplicas := append(existingVoters, existingNonVoters...)
if replacing != nil {
existingReplicas = append(existingReplicas, *replacing)
}
analyzedOverallConstraints := constraint.AnalyzeConstraints(
storePool,
existingReplicas,
Expand All @@ -1350,15 +1362,40 @@ func (a *Allocator) allocateTargetFromList(
conf.VoterConstraints,
)

var replacingStore roachpb.StoreDescriptor
var replacingStoreOK bool
if replacing != nil {
replacingStore, replacingStoreOK = storePool.GetStoreDescriptor(replacing.StoreID)
}

var constraintsChecker constraintsCheckFn
switch t := targetType; t {
case VoterTarget:
constraintsChecker = voterConstraintsCheckerForAllocation(
analyzedOverallConstraints,
analyzedVoterConstraints,
)
// If we are replacing an existing replica, make sure we check the
// constraints to ensure we are not going from a state in which a
// constraint is satisfied to one in which we are not. In this case, we
// consider no candidates to be valid, as no sorting of replicas would lead
// to a satisfying candidate being selected.
if replacing != nil && replacingStoreOK {
constraintsChecker = voterConstraintsCheckerForReplace(
analyzedOverallConstraints,
analyzedVoterConstraints,
replacingStore,
)
} else {
constraintsChecker = voterConstraintsCheckerForAllocation(
analyzedOverallConstraints,
analyzedVoterConstraints,
)
}
case NonVoterTarget:
constraintsChecker = nonVoterConstraintsCheckerForAllocation(analyzedOverallConstraints)
if replacing != nil && replacingStoreOK {
constraintsChecker = nonVoterConstraintsCheckerForReplace(
analyzedOverallConstraints, replacingStore,
)
} else {
constraintsChecker = nonVoterConstraintsCheckerForAllocation(analyzedOverallConstraints)
}
default:
log.KvDistribution.Fatalf(ctx, "unsupported targetReplicaType: %v", t)
}
Expand Down
114 changes: 105 additions & 9 deletions pkg/kv/kvserver/allocator/allocatorimpl/allocator_scorer.go
Original file line number Diff line number Diff line change
Expand Up @@ -975,6 +975,15 @@ func rankedCandidateListForAllocation(
}
constraintsOK, necessary := constraintsCheck(s)
if !constraintsOK {
if necessary {
log.KvDistribution.VEventf(
ctx,
3,
"cannot allocate necessary %s on s%d",
targetType,
s.StoreID,
)
}
continue
}

Expand Down Expand Up @@ -1789,15 +1798,43 @@ func nonVoterConstraintsCheckerForRebalance(
}
}

// voterConstraintsCheckerForReplace returns a constraintsCheckFn
// that determines whether a given store is a valid and/or necessary replacement
// candidate for the given store of an existing voting replica.
func voterConstraintsCheckerForReplace(
overallConstraints, voterConstraints constraint.AnalyzedConstraints,
existingStore roachpb.StoreDescriptor,
) constraintsCheckFn {
return func(s roachpb.StoreDescriptor) (valid, necessary bool) {
overallConstraintsOK, necessaryOverall := replaceConstraintsCheck(s, existingStore, overallConstraints)
voterConstraintsOK, necessaryForVoters := replaceConstraintsCheck(s, existingStore, voterConstraints)

return overallConstraintsOK && voterConstraintsOK, necessaryOverall || necessaryForVoters
}
}

// nonVoterConstraintsCheckerForReplace returns a constraintsCheckFn
// that determines whether a given store is a valid and/or necessary replacement
// candidate for the given store of an existing non-voting replica.
func nonVoterConstraintsCheckerForReplace(
overallConstraints constraint.AnalyzedConstraints,
existingStore roachpb.StoreDescriptor,
) constraintsCheckFn {
return func(s roachpb.StoreDescriptor) (valid, necessary bool) {
return replaceConstraintsCheck(s, existingStore, overallConstraints)
}
}

// allocateConstraintsCheck checks the potential allocation target store
// against all the constraints. If it matches a constraint at all, it's valid.
// If it matches a constraint that is not already fully satisfied by existing
// replicas, then it's necessary.
//
// NB: This assumes that the sum of all constraints.NumReplicas is equal to
// configured number of replicas for the range, or that there's just one set of
// constraints with NumReplicas set to 0. This is meant to be enforced in the
// config package.
// NB: Formerly there was an assumption that the sum of all
// constraints.NumReplicas was equal to the configured number of replicas for
// the range, or that there was just one set of constraints with NumReplicas
// set to 0, however this is not enforced by the config package and this
// no longer holds, as we may have unconstrained replicas.
func allocateConstraintsCheck(
store roachpb.StoreDescriptor, analyzed constraint.AnalyzedConstraints,
) (valid bool, necessary bool) {
Expand Down Expand Up @@ -1828,6 +1865,57 @@ func allocateConstraintsCheck(
return valid, false
}

// replaceConstraintsCheck checks the potential allocation target store
// for a replacement operation against all the constraints, including checking
// that the candidate store matches a constraint satisfied by the existing
// store. If it matches a constraint, it's valid. If it matches a constraint
// that is not already overly satisfied by existing replicas (other than the
// replacement), then it's necessary. If there are any necessary constraints
// that are not satisfied by the candidate when the existing store did satisfy
// that constraint, then the candidate is considered invalid entirely.
func replaceConstraintsCheck(store, existingStore roachpb.StoreDescriptor,
analyzed constraint.AnalyzedConstraints,
) (valid bool, necessary bool) {
// All stores are valid when there are no constraints.
if len(analyzed.Constraints) == 0 {
return true, false
}

for i, constraints := range analyzed.Constraints {
matchingStores := analyzed.SatisfiedBy[i]
satisfiedByExistingStore := containsStore(matchingStores, existingStore.StoreID)
satisfiedByCandidateStore := constraint.ConjunctionsCheck(
store, constraints.Constraints,
)
if satisfiedByCandidateStore {
valid = true
}

// If the constraint is not already satisfied, it's necessary.
// Additionally, if the constraint is only just satisfied by the existing
// store being replaced, since that store is going away, the constraint is
// also marked as necessary.
if len(matchingStores) < int(constraints.NumReplicas) ||
(len(matchingStores) == int(constraints.NumReplicas) &&
satisfiedByExistingStore) {
necessary = true
}

// Check if existing store matches a constraint that isn't overly satisfied.
// If so, then only replacing it with a satisfying store is valid to ensure
// that the constraint stays fully satisfied.
if necessary && satisfiedByExistingStore && !satisfiedByCandidateStore {
return false, necessary
}
}

if analyzed.UnconstrainedReplicas {
valid = true
}

return valid, necessary
}

// removeConstraintsCheck checks the existing store against the analyzed
// constraints, determining whether it's valid (matches some constraint) and
// necessary (matches some constraint that no other existing replica matches).
Expand Down Expand Up @@ -1867,6 +1955,19 @@ func removeConstraintsCheck(
// against the analyzed constraints, determining whether it's valid whether it
// will be necessary if fromStoreID (an existing replica) is removed from the
// range.
//
// NB: Formerly there was an assumption that the sum of all
// constraints.NumReplicas was equal to the configured number of replicas for
// the range, or that there was just one set of constraints with NumReplicas
// set to 0, however this is not enforced by the config package and this
// no longer holds, as we may have unconstrained replicas.
//
// Note that rebalance, while seemingly similar to replacement, is distinct
// because leaving the replica on the existing store is a valid option.
// Hence, when leaving the existing store (and using it to satisfy a particular
// constraint) is not a possibility such as in the case of a decommissioning or
// dead node, the specialized replacement check is required.
// See replaceConstraintsCheck(..).
func rebalanceFromConstraintsCheck(
store, fromStoreID roachpb.StoreDescriptor, analyzed constraint.AnalyzedConstraints,
) (valid bool, necessary bool) {
Expand All @@ -1879,11 +1980,6 @@ func rebalanceFromConstraintsCheck(
// all, it's valid. If it matches a constraint that is not already fully
// satisfied by existing replicas or that is only fully satisfied because of
// fromStoreID, then it's necessary.
//
// NB: This assumes that the sum of all constraints.NumReplicas is equal to
// configured number of replicas for the range, or that there's just one set
// of constraints with NumReplicas set to 0. This is meant to be enforced in
// the config package.
for i, constraints := range analyzed.Constraints {
if constraintsOK := constraint.ConjunctionsCheck(
store, constraints.Constraints,
Expand Down
Loading

0 comments on commit 80d6a10

Please sign in to comment.