Skip to content

Commit

Permalink
allocator: refactor allocator to accept StorePool arguments
Browse files Browse the repository at this point in the history
This change adds methods to be to evaluate allocator actions and targets
utilizing a passed-in `StorePool` object, allowing for the allocator to
consider potential scenarios rather than those simply based on the
current liveness.

Depends on #91461, #91965.

Part of #91570.

Release note: None
  • Loading branch information
AlexTalks committed Dec 15, 2022
1 parent d6f98e9 commit 758b541
Show file tree
Hide file tree
Showing 2 changed files with 524 additions and 15 deletions.
87 changes: 72 additions & 15 deletions pkg/kv/kvserver/allocator/allocatorimpl/allocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -599,7 +599,20 @@ func GetNeededNonVoters(numVoters, zoneConfigNonVoterCount, clusterNodes int) in
func (a *Allocator) ComputeAction(
ctx context.Context, conf roachpb.SpanConfig, desc *roachpb.RangeDescriptor,
) (action AllocatorAction, priority float64) {
if a.StorePool == nil {
return a.ComputeActionWithStorePool(ctx, a.StorePool, conf, desc)
}

// ComputeActionWithStorePool determines the exact operation needed to repair the
// supplied range using the provided StorePool, as governed by the supplied zone
// configuration. It returns the required action that should be taken and a
// priority.
func (a *Allocator) ComputeActionWithStorePool(
ctx context.Context,
storePool storepool.AllocatorStorePool,
conf roachpb.SpanConfig,
desc *roachpb.RangeDescriptor,
) (action AllocatorAction, priority float64) {
if storePool == nil {
// Do nothing if storePool is nil for some unittests.
action = AllocatorNoop
return action, action.Priority()
Expand Down Expand Up @@ -657,13 +670,14 @@ func (a *Allocator) ComputeAction(
return action, action.Priority()
}

return a.computeAction(ctx, conf, desc.Replicas().VoterDescriptors(),
return a.computeAction(ctx, storePool, conf, desc.Replicas().VoterDescriptors(),
desc.Replicas().NonVoterDescriptors())

}

func (a *Allocator) computeAction(
ctx context.Context,
storePool storepool.AllocatorStorePool,
conf roachpb.SpanConfig,
voterReplicas []roachpb.ReplicaDescriptor,
nonVoterReplicas []roachpb.ReplicaDescriptor,
Expand All @@ -681,10 +695,10 @@ func (a *Allocator) computeAction(
// After that we handle rebalancing related actions, followed by removal
// actions.
haveVoters := len(voterReplicas)
decommissioningVoters := a.StorePool.DecommissioningReplicas(voterReplicas)
decommissioningVoters := storePool.DecommissioningReplicas(voterReplicas)
// Node count including dead nodes but excluding
// decommissioning/decommissioned nodes.
clusterNodes := a.StorePool.ClusterNodeCount()
clusterNodes := storePool.ClusterNodeCount()
neededVoters := GetNeededVoters(conf.GetNumVoters(), clusterNodes)
desiredQuorum := computeQuorum(neededVoters)
quorum := computeQuorum(haveVoters)
Expand All @@ -710,7 +724,7 @@ func (a *Allocator) computeAction(
// heartbeat in the recent past. This means we won't move those replicas
// elsewhere (for a regular rebalance or for decommissioning).
const includeSuspectAndDrainingStores = true
liveVoters, deadVoters := a.StorePool.LiveAndDeadReplicas(voterReplicas, includeSuspectAndDrainingStores)
liveVoters, deadVoters := storePool.LiveAndDeadReplicas(voterReplicas, includeSuspectAndDrainingStores)

if len(liveVoters) < quorum {
// Do not take any replacement/removal action if we do not have a quorum of
Expand Down Expand Up @@ -789,7 +803,7 @@ func (a *Allocator) computeAction(
return action, action.Priority()
}

liveNonVoters, deadNonVoters := a.StorePool.LiveAndDeadReplicas(
liveNonVoters, deadNonVoters := storePool.LiveAndDeadReplicas(
nonVoterReplicas, includeSuspectAndDrainingStores,
)
if haveNonVoters == neededNonVoters && len(deadNonVoters) > 0 {
Expand All @@ -800,7 +814,7 @@ func (a *Allocator) computeAction(
return action, action.Priority()
}

decommissioningNonVoters := a.StorePool.DecommissioningReplicas(nonVoterReplicas)
decommissioningNonVoters := storePool.DecommissioningReplicas(nonVoterReplicas)
if haveNonVoters == neededNonVoters && len(decommissioningNonVoters) > 0 {
// The range has non-voter(s) on a decommissioning node that we should
// replace.
Expand Down Expand Up @@ -922,12 +936,13 @@ func (s *GoodCandidateSelector) selectOne(cl candidateList) *candidate {

func (a *Allocator) allocateTarget(
ctx context.Context,
storePool storepool.AllocatorStorePool,
conf roachpb.SpanConfig,
existingVoters, existingNonVoters []roachpb.ReplicaDescriptor,
replicaStatus ReplicaStatus,
targetType TargetReplicaType,
) (roachpb.ReplicationTarget, string, error) {
candidateStoreList, aliveStoreCount, throttled := a.StorePool.GetStoreList(storepool.StoreFilterThrottled)
candidateStoreList, aliveStoreCount, throttled := storePool.GetStoreList(storepool.StoreFilterThrottled)

// If the replica is alive we are upreplicating, and in that case we want to
// allocate new replicas on the best possible store. Otherwise, the replica is
Expand All @@ -941,8 +956,9 @@ func (a *Allocator) allocateTarget(
selector = a.NewGoodCandidateSelector()
}

target, details := a.AllocateTargetFromList(
target, details := a.allocateTargetFromList(
ctx,
storePool,
candidateStoreList,
conf,
existingVoters,
Expand Down Expand Up @@ -988,7 +1004,20 @@ func (a *Allocator) AllocateVoter(
existingVoters, existingNonVoters []roachpb.ReplicaDescriptor,
replicaStatus ReplicaStatus,
) (roachpb.ReplicationTarget, string, error) {
return a.allocateTarget(ctx, conf, existingVoters, existingNonVoters, replicaStatus, VoterTarget)
return a.allocateTarget(ctx, a.StorePool, conf, existingVoters, existingNonVoters, replicaStatus, VoterTarget)
}

// AllocateVoterWithStorePool returns a suitable store for a new allocation of a voting
// replica with the required attributes using the given storePool. Nodes already
// accommodating existing voting replicas are ruled out as targets.
func (a *Allocator) AllocateVoterWithStorePool(
ctx context.Context,
storePool storepool.AllocatorStorePool,
conf roachpb.SpanConfig,
existingVoters, existingNonVoters []roachpb.ReplicaDescriptor,
replicaStatus ReplicaStatus,
) (roachpb.ReplicationTarget, string, error) {
return a.allocateTarget(ctx, storePool, conf, existingVoters, existingNonVoters, replicaStatus, VoterTarget)
}

// AllocateNonVoter returns a suitable store for a new allocation of a
Expand All @@ -1000,7 +1029,20 @@ func (a *Allocator) AllocateNonVoter(
existingVoters, existingNonVoters []roachpb.ReplicaDescriptor,
replicaStatus ReplicaStatus,
) (roachpb.ReplicationTarget, string, error) {
return a.allocateTarget(ctx, conf, existingVoters, existingNonVoters, replicaStatus, NonVoterTarget)
return a.allocateTarget(ctx, a.StorePool, conf, existingVoters, existingNonVoters, replicaStatus, NonVoterTarget)
}

// AllocateNonVoterWithStorePool returns a suitable store for a new allocation of a
// non-voting replica with the required attributes using the given storePool. Nodes already
// accommodating _any_ existing replicas are ruled out as targets.
func (a *Allocator) AllocateNonVoterWithStorePool(
ctx context.Context,
storePool storepool.AllocatorStorePool,
conf roachpb.SpanConfig,
existingVoters, existingNonVoters []roachpb.ReplicaDescriptor,
replicaStatus ReplicaStatus,
) (roachpb.ReplicationTarget, string, error) {
return a.allocateTarget(ctx, storePool, conf, existingVoters, existingNonVoters, replicaStatus, NonVoterTarget)
}

// AllocateTargetFromList returns a suitable store for a new allocation of a
Expand All @@ -1015,16 +1057,31 @@ func (a *Allocator) AllocateTargetFromList(
selector CandidateSelector,
allowMultipleReplsPerNode bool,
targetType TargetReplicaType,
) (roachpb.ReplicationTarget, string) {
return a.allocateTargetFromList(ctx, a.StorePool, candidateStores, conf, existingVoters,
existingNonVoters, options, selector, allowMultipleReplsPerNode, targetType)
}

func (a *Allocator) allocateTargetFromList(
ctx context.Context,
storePool storepool.AllocatorStorePool,
candidateStores storepool.StoreList,
conf roachpb.SpanConfig,
existingVoters, existingNonVoters []roachpb.ReplicaDescriptor,
options ScorerOptions,
selector CandidateSelector,
allowMultipleReplsPerNode bool,
targetType TargetReplicaType,
) (roachpb.ReplicationTarget, string) {
existingReplicas := append(existingVoters, existingNonVoters...)
analyzedOverallConstraints := constraint.AnalyzeConstraints(
a.StorePool,
storePool,
existingReplicas,
conf.NumReplicas,
conf.Constraints,
)
analyzedVoterConstraints := constraint.AnalyzeConstraints(
a.StorePool,
storePool,
existingVoters,
conf.GetNumVoters(),
conf.VoterConstraints,
Expand Down Expand Up @@ -1056,8 +1113,8 @@ func (a *Allocator) AllocateTargetFromList(
constraintsChecker,
existingReplicaSet,
existingNonVoters,
a.StorePool.GetLocalitiesByStore(existingReplicaSet),
a.StorePool.IsStoreReadyForRoutineReplicaTransfer,
storePool.GetLocalitiesByStore(existingReplicaSet),
storePool.IsStoreReadyForRoutineReplicaTransfer,
allowMultipleReplsPerNode,
options,
targetType,
Expand Down
Loading

0 comments on commit 758b541

Please sign in to comment.