kvserver: add support for allocator range check via store #92367

Closed · wants to merge 3 commits
90 changes: 74 additions & 16 deletions pkg/kv/kvserver/allocator/allocatorimpl/allocator.go
@@ -350,7 +350,8 @@ func (ae *allocatorError) Error() string {
 	return b.String()
 }
 
-func (*allocatorError) PurgatoryErrorMarker() {}
+func (*allocatorError) AllocationErrorMarker() {}
+func (*allocatorError) PurgatoryErrorMarker() {}
 
 // allocatorRand pairs a rand.Rand with a mutex.
 // NOTE: Allocator is typically only accessed from a single thread (the
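The new AllocationErrorMarker follows the same marker-interface convention as the existing PurgatoryErrorMarker: an empty method whose only purpose is to let errors be classified by interface rather than by concrete type. Below is a minimal, self-contained sketch of how such a marker is typically consumed; the AllocationError interface and the helper are illustrative assumptions, not code from this diff.

package main

import (
	"errors"
	"fmt"
)

// AllocationError is a hypothetical marker interface: any error type
// carrying the AllocationErrorMarker method satisfies it.
type AllocationError interface {
	error
	AllocationErrorMarker()
}

type allocatorError struct{ msg string }

func (ae *allocatorError) Error() string       { return ae.msg }
func (*allocatorError) AllocationErrorMarker() {}

// isAllocationError reports whether err, or any error it wraps, was
// tagged via the marker method.
func isAllocationError(err error) bool {
	var ae AllocationError
	return errors.As(err, &ae)
}

func main() {
	err := fmt.Errorf("replica check: %w", &allocatorError{msg: "no valid target"})
	fmt.Println(isAllocationError(err)) // true
}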
@@ -599,7 +600,20 @@ func GetNeededNonVoters(numVoters, zoneConfigNonVoterCount, clusterNodes int) int
 func (a *Allocator) ComputeAction(
 	ctx context.Context, conf roachpb.SpanConfig, desc *roachpb.RangeDescriptor,
 ) (action AllocatorAction, priority float64) {
-	if a.StorePool == nil {
+	return a.ComputeActionWithStorePool(ctx, a.StorePool, conf, desc)
+}
+
+// ComputeActionWithStorePool determines the exact operation needed to repair the
+// supplied range using the provided StorePool, as governed by the supplied zone
+// configuration. It returns the required action that should be taken and a
+// priority.
+func (a *Allocator) ComputeActionWithStorePool(
+	ctx context.Context,
+	storePool storepool.AllocatorStorePool,
+	conf roachpb.SpanConfig,
+	desc *roachpb.RangeDescriptor,
+) (action AllocatorAction, priority float64) {
+	if storePool == nil {
 		// Do nothing if storePool is nil for some unittests.
 		action = AllocatorNoop
 		return action, action.Priority()
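This hunk turns ComputeAction into a thin wrapper over ComputeActionWithStorePool, so the repair decision can be computed against any AllocatorStorePool rather than only the allocator's own. A hedged sketch of the kind of caller the PR title suggests (a range check driven from a store); the package placement, helper name, and override pool argument are assumptions for illustration:

package kvserver // hypothetical placement, only to make the sketch self-contained

import (
	"context"

	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/allocatorimpl"
	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/storepool"
	"github.com/cockroachdb/cockroach/pkg/roachpb"
)

// checkRangeAction is a hypothetical helper: it evaluates what the
// allocator would do for a range, but sources store and liveness data
// from an explicitly supplied pool (e.g. one with overridden liveness)
// instead of a.StorePool.
func checkRangeAction(
	ctx context.Context,
	a *allocatorimpl.Allocator,
	overridePool storepool.AllocatorStorePool,
	conf roachpb.SpanConfig,
	desc *roachpb.RangeDescriptor,
) (allocatorimpl.AllocatorAction, float64) {
	return a.ComputeActionWithStorePool(ctx, overridePool, conf, desc)
}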
@@ -657,13 +671,14 @@ func (a *Allocator) ComputeAction(
 		return action, action.Priority()
 	}
 
-	return a.computeAction(ctx, conf, desc.Replicas().VoterDescriptors(),
+	return a.computeAction(ctx, storePool, conf, desc.Replicas().VoterDescriptors(),
 		desc.Replicas().NonVoterDescriptors())
 
 }
 
 func (a *Allocator) computeAction(
 	ctx context.Context,
+	storePool storepool.AllocatorStorePool,
 	conf roachpb.SpanConfig,
 	voterReplicas []roachpb.ReplicaDescriptor,
 	nonVoterReplicas []roachpb.ReplicaDescriptor,
@@ -681,10 +696,10 @@ func (a *Allocator) computeAction(
 	// After that we handle rebalancing related actions, followed by removal
 	// actions.
 	haveVoters := len(voterReplicas)
-	decommissioningVoters := a.StorePool.DecommissioningReplicas(voterReplicas)
+	decommissioningVoters := storePool.DecommissioningReplicas(voterReplicas)
 	// Node count including dead nodes but excluding
 	// decommissioning/decommissioned nodes.
-	clusterNodes := a.StorePool.ClusterNodeCount()
+	clusterNodes := storePool.ClusterNodeCount()
 	neededVoters := GetNeededVoters(conf.GetNumVoters(), clusterNodes)
 	desiredQuorum := computeQuorum(neededVoters)
 	quorum := computeQuorum(haveVoters)
@@ -710,7 +725,7 @@
 	// heartbeat in the recent past. This means we won't move those replicas
 	// elsewhere (for a regular rebalance or for decommissioning).
 	const includeSuspectAndDrainingStores = true
-	liveVoters, deadVoters := a.StorePool.LiveAndDeadReplicas(voterReplicas, includeSuspectAndDrainingStores)
+	liveVoters, deadVoters := storePool.LiveAndDeadReplicas(voterReplicas, includeSuspectAndDrainingStores)
 
 	if len(liveVoters) < quorum {
 		// Do not take any replacement/removal action if we do not have a quorum of
@@ -789,7 +804,7 @@ func (a *Allocator) computeAction(
 		return action, action.Priority()
 	}
 
-	liveNonVoters, deadNonVoters := a.StorePool.LiveAndDeadReplicas(
+	liveNonVoters, deadNonVoters := storePool.LiveAndDeadReplicas(
 		nonVoterReplicas, includeSuspectAndDrainingStores,
 	)
 	if haveNonVoters == neededNonVoters && len(deadNonVoters) > 0 {
@@ -800,7 +815,7 @@
 		return action, action.Priority()
 	}
 
-	decommissioningNonVoters := a.StorePool.DecommissioningReplicas(nonVoterReplicas)
+	decommissioningNonVoters := storePool.DecommissioningReplicas(nonVoterReplicas)
 	if haveNonVoters == neededNonVoters && len(decommissioningNonVoters) > 0 {
 		// The range has non-voter(s) on a decommissioning node that we should
 		// replace.
@@ -922,12 +937,13 @@ func (s *GoodCandidateSelector) selectOne(cl candidateList) *candidate {
 
 func (a *Allocator) allocateTarget(
 	ctx context.Context,
+	storePool storepool.AllocatorStorePool,
 	conf roachpb.SpanConfig,
 	existingVoters, existingNonVoters []roachpb.ReplicaDescriptor,
 	replicaStatus ReplicaStatus,
 	targetType TargetReplicaType,
 ) (roachpb.ReplicationTarget, string, error) {
-	candidateStoreList, aliveStoreCount, throttled := a.StorePool.GetStoreList(storepool.StoreFilterThrottled)
+	candidateStoreList, aliveStoreCount, throttled := storePool.GetStoreList(storepool.StoreFilterThrottled)
 
 	// If the replica is alive we are upreplicating, and in that case we want to
 	// allocate new replicas on the best possible store. Otherwise, the replica is
@@ -941,8 +957,9 @@
 		selector = a.NewGoodCandidateSelector()
 	}
 
-	target, details := a.AllocateTargetFromList(
+	target, details := a.allocateTargetFromList(
 		ctx,
+		storePool,
 		candidateStoreList,
 		conf,
 		existingVoters,
@@ -988,7 +1005,20 @@ func (a *Allocator) AllocateVoter(
 	existingVoters, existingNonVoters []roachpb.ReplicaDescriptor,
 	replicaStatus ReplicaStatus,
 ) (roachpb.ReplicationTarget, string, error) {
-	return a.allocateTarget(ctx, conf, existingVoters, existingNonVoters, replicaStatus, VoterTarget)
+	return a.allocateTarget(ctx, a.StorePool, conf, existingVoters, existingNonVoters, replicaStatus, VoterTarget)
+}
+
+// AllocateVoterWithStorePool returns a suitable store for a new allocation of a voting
+// replica with the required attributes using the given storePool. Nodes already
+// accommodating existing voting replicas are ruled out as targets.
+func (a *Allocator) AllocateVoterWithStorePool(
+	ctx context.Context,
+	storePool storepool.AllocatorStorePool,
+	conf roachpb.SpanConfig,
+	existingVoters, existingNonVoters []roachpb.ReplicaDescriptor,
+	replicaStatus ReplicaStatus,
+) (roachpb.ReplicationTarget, string, error) {
+	return a.allocateTarget(ctx, storePool, conf, existingVoters, existingNonVoters, replicaStatus, VoterTarget)
 }
 
 // AllocateNonVoter returns a suitable store for a new allocation of a
@@ -1000,7 +1030,20 @@ func (a *Allocator) AllocateNonVoter(
 	existingVoters, existingNonVoters []roachpb.ReplicaDescriptor,
 	replicaStatus ReplicaStatus,
 ) (roachpb.ReplicationTarget, string, error) {
-	return a.allocateTarget(ctx, conf, existingVoters, existingNonVoters, replicaStatus, NonVoterTarget)
+	return a.allocateTarget(ctx, a.StorePool, conf, existingVoters, existingNonVoters, replicaStatus, NonVoterTarget)
+}
+
+// AllocateNonVoterWithStorePool returns a suitable store for a new allocation of a
+// non-voting replica with the required attributes using the given storePool. Nodes already
+// accommodating _any_ existing replicas are ruled out as targets.
+func (a *Allocator) AllocateNonVoterWithStorePool(
+	ctx context.Context,
+	storePool storepool.AllocatorStorePool,
+	conf roachpb.SpanConfig,
+	existingVoters, existingNonVoters []roachpb.ReplicaDescriptor,
+	replicaStatus ReplicaStatus,
+) (roachpb.ReplicationTarget, string, error) {
+	return a.allocateTarget(ctx, storePool, conf, existingVoters, existingNonVoters, replicaStatus, NonVoterTarget)
 }
 
 // AllocateTargetFromList returns a suitable store for a new allocation of a
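AllocateVoter and AllocateNonVoter now follow the same pairing: the original method delegates with a.StorePool, while the *WithStorePool variant threads the caller's pool down through allocateTarget. A brief usage sketch, under the same illustrative imports as the earlier sketch; the helper name and the use of the Alive replica status are assumptions:

// planAdditions is a hypothetical helper: it asks where a new voter and
// a new non-voter would be placed if the cluster state looked the way
// pool describes it.
func planAdditions(
	ctx context.Context,
	a *allocatorimpl.Allocator,
	pool storepool.AllocatorStorePool,
	conf roachpb.SpanConfig,
	voters, nonVoters []roachpb.ReplicaDescriptor,
) (roachpb.ReplicationTarget, roachpb.ReplicationTarget, error) {
	voterTarget, _, err := a.AllocateVoterWithStorePool(
		ctx, pool, conf, voters, nonVoters, allocatorimpl.Alive)
	if err != nil {
		return roachpb.ReplicationTarget{}, roachpb.ReplicationTarget{}, err
	}
	nonVoterTarget, _, err := a.AllocateNonVoterWithStorePool(
		ctx, pool, conf, voters, nonVoters, allocatorimpl.Alive)
	if err != nil {
		return roachpb.ReplicationTarget{}, roachpb.ReplicationTarget{}, err
	}
	return voterTarget, nonVoterTarget, nil
}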
@@ -1015,16 +1058,31 @@ func (a *Allocator) AllocateTargetFromList(
 	selector CandidateSelector,
 	allowMultipleReplsPerNode bool,
 	targetType TargetReplicaType,
 ) (roachpb.ReplicationTarget, string) {
+	return a.allocateTargetFromList(ctx, a.StorePool, candidateStores, conf, existingVoters,
+		existingNonVoters, options, selector, allowMultipleReplsPerNode, targetType)
+}
+
+func (a *Allocator) allocateTargetFromList(
+	ctx context.Context,
+	storePool storepool.AllocatorStorePool,
+	candidateStores storepool.StoreList,
+	conf roachpb.SpanConfig,
+	existingVoters, existingNonVoters []roachpb.ReplicaDescriptor,
+	options ScorerOptions,
+	selector CandidateSelector,
+	allowMultipleReplsPerNode bool,
+	targetType TargetReplicaType,
+) (roachpb.ReplicationTarget, string) {
 	existingReplicas := append(existingVoters, existingNonVoters...)
 	analyzedOverallConstraints := constraint.AnalyzeConstraints(
-		a.StorePool,
+		storePool,
 		existingReplicas,
 		conf.NumReplicas,
 		conf.Constraints,
 	)
 	analyzedVoterConstraints := constraint.AnalyzeConstraints(
-		a.StorePool,
+		storePool,
 		existingVoters,
 		conf.GetNumVoters(),
 		conf.VoterConstraints,
@@ -1056,8 +1114,8 @@ func (a *Allocator) AllocateTargetFromList(
 		constraintsChecker,
 		existingReplicaSet,
 		existingNonVoters,
-		a.StorePool.GetLocalitiesByStore(existingReplicaSet),
-		a.StorePool.IsStoreReadyForRoutineReplicaTransfer,
+		storePool.GetLocalitiesByStore(existingReplicaSet),
+		storePool.IsStoreReadyForRoutineReplicaTransfer,
 		allowMultipleReplsPerNode,
 		options,
 		targetType,
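Taken together, the hunks apply one mechanical refactor: every a.StorePool.<Method>() call on the allocation path moves to an injected storepool.AllocatorStorePool parameter, and each public entry point survives as a thin wrapper that passes a.StorePool. In miniature (the method names below are hypothetical; ClusterNodeCount is taken from the diff above):

// Before: the logic is bound to the allocator's own pool.
func (a *Allocator) act(ctx context.Context) int {
	return a.StorePool.ClusterNodeCount()
}

// After: the wrapper preserves the old signature; the logic takes the
// pool as an argument so callers may substitute their own.
func (a *Allocator) actWrapped(ctx context.Context) int {
	return a.actWithStorePool(ctx, a.StorePool)
}

func (a *Allocator) actWithStorePool(
	ctx context.Context, sp storepool.AllocatorStorePool,
) int {
	return sp.ClusterNodeCount()
}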