Skip to content

Commit

Permalink
kvserver: teach allocator to prescribe non-voter addition/removal
Browse files Browse the repository at this point in the history
This commit adds basic logic in the allocator to spit out actions that
prescribe upreplication or downreplication of non-voting replicas based
on zone configs.

Note that this commit *does not* enable the `replicateQueue` or the
`StoreRebalancer` to actually act on these newly added
`AllocatorAction`s, not does it make the allocator smart enough to
prescribe _rebalancing_ non-voting replicas. This commit also does not
teach the allocator to rank non-voting replicas for addition or removal.

Release note: None
  • Loading branch information
aayushshah15 committed Jan 25, 2021
1 parent a28f71e commit 6ba4f15
Show file tree
Hide file tree
Showing 6 changed files with 277 additions and 56 deletions.
157 changes: 108 additions & 49 deletions pkg/kv/kvserver/allocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,14 +49,20 @@ const (
minReplicaWeight = 0.001

// Priorities for various repair operations.
// TODO DURING REVIEW: Do these priorities look sane? Note that these
// priorities only influence the replicateQueue's understanding of which
// ranges are to be dealt with before others. In other words, these priorities
// don't influence the relative order of actions taken on a given range.
finalizeAtomicReplicationChangePriority float64 = 12002
removeLearnerReplicaPriority float64 = 12001
addDeadReplacementVoterPriority float64 = 12000
addMissingVoterPriority float64 = 10000
addDecommissioningReplacementVoterPriority float64 = 5000
removeDeadVoterPriority float64 = 1000
removeDecommissioningVoterPriority float64 = 200
removeExtraVoterPriority float64 = 100
removeDecommissioningVoterPriority float64 = 900
removeExtraVoterPriority float64 = 800
addMissingNonVoterPriority float64 = 700
removeExtraNonVoterPriority float64 = 600
)

// MinLeaseTransferStatsDuration configures the minimum amount of time a
Expand Down Expand Up @@ -100,7 +106,9 @@ const (
_ AllocatorAction = iota
AllocatorNoop
AllocatorRemoveVoter
AllocatorRemoveNonVoter
AllocatorAddVoter
AllocatorAddNonVoter
AllocatorReplaceDeadVoter
AllocatorRemoveDeadVoter
AllocatorReplaceDecommissioningVoter
Expand All @@ -114,7 +122,9 @@ const (
var allocatorActionNames = map[AllocatorAction]string{
AllocatorNoop: "noop",
AllocatorRemoveVoter: "remove voter",
AllocatorRemoveNonVoter: "remove non-voter",
AllocatorAddVoter: "add voter",
AllocatorAddNonVoter: "add non-voter",
AllocatorReplaceDeadVoter: "replace dead voter",
AllocatorRemoveDeadVoter: "remove dead voter",
AllocatorReplaceDecommissioningVoter: "replace decommissioning voter",
Expand Down Expand Up @@ -145,7 +155,8 @@ const (
// can be retried quickly as soon as new stores come online, or additional
// space frees up.
type allocatorError struct {
constraints []zonepb.ConstraintsConjunction
constraints []zonepb.ConstraintsConjunction
// TODO(aayush): !!! This might need to be augmented.
existingReplicas int
aliveStores int
throttledStores int
Expand Down Expand Up @@ -262,11 +273,11 @@ func MakeAllocator(
}
}

// GetNeededReplicas calculates the number of replicas a range should
// have given its zone config and the number of nodes available for
// up-replication (i.e. not dead and not decommissioning).
func GetNeededReplicas(zoneConfigReplicaCount int32, clusterNodes int) int {
numZoneReplicas := int(zoneConfigReplicaCount)
// GetNeededVoters calculates the number of voters a range should have given its
// zone config and the number of nodes available for up-replication (i.e. not
// decommissioning).
func GetNeededVoters(zoneConfigVoterCount int32, clusterNodes int) int {
numZoneReplicas := int(zoneConfigVoterCount)
need := numZoneReplicas

// Adjust the replication factor for all ranges if there are fewer
Expand Down Expand Up @@ -299,6 +310,26 @@ func GetNeededReplicas(zoneConfigReplicaCount int32, clusterNodes int) int {
return need
}

// GetNeededNonVoters calculates the number of non-voters a range should have
// given the number of voting replicas the range has and the number of nodes
// available for up-replication.
//
// If `num_voters` aren't explicitly configured in the zone config, all replicas
// will be voting replicas and this method will always return 0.
//
// NB: This method assumes that we have exactly as many voters as we need, since
// this method should only be consulted after voting replicas have been
// upreplicated / rebalanced off of dead/decommissioning nodes.
func GetNeededNonVoters(numVoters, zoneConfigNumNonVoters, clusterNodes int) int {
need := zoneConfigNumNonVoters
if clusterNodes-numVoters < need {
// We only need non-voting replicas for the nodes that do not have a voting
// replica.
need = clusterNodes - numVoters
}
return need
}

// ComputeAction determines the exact operation needed to repair the
// supplied range, as governed by the supplied zone configuration. It
// returns the required action that should be taken and a priority.
Expand Down Expand Up @@ -357,71 +388,91 @@ func (a *Allocator) ComputeAction(
// removeLearnerReplicaPriority as the highest priority.
return AllocatorRemoveLearner, removeLearnerReplicaPriority
}
// computeAction expects to operate only on voters.
return a.computeAction(ctx, zone, desc.Replicas().VoterDescriptors())
return a.computeAction(ctx, zone, desc.Replicas().VoterDescriptors(),
desc.Replicas().NonVoterDescriptors())
}

func (a *Allocator) computeAction(
ctx context.Context, zone *zonepb.ZoneConfig, voterReplicas []roachpb.ReplicaDescriptor,
ctx context.Context,
zone *zonepb.ZoneConfig,
voterReplicas []roachpb.ReplicaDescriptor,
nonVoterReplicas []roachpb.ReplicaDescriptor,
) (AllocatorAction, float64) {
// TODO(mrtracy): Handle non-homogeneous and mismatched attribute sets.
have := len(voterReplicas)
decommissioningReplicas := a.storePool.decommissioningReplicas(voterReplicas)
haveVoters := len(voterReplicas)
decommissioningVoters := a.storePool.decommissioningReplicas(voterReplicas)
// Node count including dead nodes but excluding decommissioning/decommissioned
// nodes.
clusterNodes := a.storePool.ClusterNodeCount()
need := GetNeededReplicas(*zone.NumReplicas, clusterNodes)
desiredQuorum := computeQuorum(need)
quorum := computeQuorum(have)

if have < need {
// Range is under-replicated, and should add an additional replica.
// Priority is adjusted by the difference between the current replica
// count and the quorum of the desired replica count.
priority := addMissingVoterPriority + float64(desiredQuorum-have)
neededVoters := GetNeededVoters(zone.GetNumVoters(), clusterNodes)
desiredQuorum := computeQuorum(neededVoters)
quorum := computeQuorum(haveVoters)

// TODO(aayush): When haveVoters < neededVoters but we don't have quorum to
// actually execute the addition of a new replica, we should be returning a
// ALlocatorRangeUnavailable.
if haveVoters < neededVoters {
// Range is under-replicated, and should add an additional voter.
// Priority is adjusted by the difference between the current voter
// count and the quorum of the desired voter count.
priority := addMissingVoterPriority + float64(desiredQuorum-haveVoters)
action := AllocatorAddVoter
log.VEventf(ctx, 3, "%s - missing replica need=%d, have=%d, priority=%.2f",
action, need, have, priority)
log.VEventf(ctx, 3, "%s - missing voter need=%d, have=%d, priority=%.2f",
action, neededVoters, haveVoters, priority)
return action, priority
}

liveVoterReplicas, deadVoterReplicas := a.storePool.liveAndDeadReplicas(voterReplicas)

if len(liveVoterReplicas) < quorum {
// Do not take any replacement/removal action if we do not have a quorum of live
// replicas. If we're correctly assessing the unavailable state of the range, we
// also won't be able to add replicas as we try above, but hope springs eternal.
log.VEventf(ctx, 1, "unable to take action - live replicas %v don't meet quorum of %d",
// Do not take any replacement/removal action if we do not have a quorum of
// live voters. If we're correctly assessing the unavailable state of the
// range, we also won't be able to add replicas as we try above, but hope
// springs eternal.
log.VEventf(ctx, 1, "unable to take action - live voters %v don't meet quorum of %d",
liveVoterReplicas, quorum)
return AllocatorRangeUnavailable, 0
}

if have == need && len(deadVoterReplicas) > 0 {
// Range has dead replica(s). We should up-replicate to add another before
if haveVoters == neededVoters && len(deadVoterReplicas) > 0 {
// Range has dead voter(s). We should up-replicate to add another before
// before removing the dead one. This can avoid permanent data loss in cases
// where the node is only temporarily dead, but we remove it from the range
// and lose a second node before we can up-replicate (#25392).
// The dead replica(s) will be down-replicated later.
// The dead voter(s) will be down-replicated later.
priority := addDeadReplacementVoterPriority
action := AllocatorReplaceDeadVoter
log.VEventf(ctx, 3, "%s - replacement for %d dead replicas priority=%.2f",
log.VEventf(ctx, 3, "%s - replacement for %d dead voters priority=%.2f",
action, len(deadVoterReplicas), priority)
return action, priority
}

if have == need && len(decommissioningReplicas) > 0 {
// Range has decommissioning replica(s), which should be replaced.
if haveVoters == neededVoters && len(decommissioningVoters) > 0 {
// Range has decommissioning voter(s), which should be replaced.
priority := addDecommissioningReplacementVoterPriority
action := AllocatorReplaceDecommissioningVoter
log.VEventf(ctx, 3, "%s - replacement for %d decommissioning replicas priority=%.2f",
action, len(decommissioningReplicas), priority)
log.VEventf(ctx, 3, "%s - replacement for %d decommissioning voters priority=%.2f",
action, len(decommissioningVoters), priority)
return action, priority
}

// Non-voting replica addition.
//liveNonVoters, deadNonVoters := a.storePool.liveAndDeadReplicas(nonVoterReplicas)
haveNonVoters := len(nonVoterReplicas)
neededNonVoters := GetNeededNonVoters(haveVoters, int(zone.GetNumNonVoters()), clusterNodes)
if haveNonVoters < neededNonVoters {
priority := addMissingNonVoterPriority
action := AllocatorAddNonVoter
log.VEventf(ctx, 3, "%s - missing non-voter need=%d, have=%d, priority=%.2f",
action, neededNonVoters, haveNonVoters, priority)
return action, priority
}

// Removal actions follow.
// TODO(a-robinson): There's an additional case related to dead replicas that
// we should handle above. If there are one or more dead replicas, have <
// need, and there are no available stores to up-replicate to, then we should
// try to remove the dead replica(s) to get down to an odd number of
// replicas.
// TODO(aayush): There's an additional case related to dead voters that we
// should handle above. If there are one or more dead replicas, have < need,
// and there are no available stores to up-replicate to, then we should try to
// remove the dead replica(s) to get down to an odd number of replicas.
if len(deadVoterReplicas) > 0 {
// The range has dead replicas, which should be removed immediately.
priority := removeDeadVoterPriority + float64(quorum-len(liveVoterReplicas))
Expand All @@ -431,24 +482,32 @@ func (a *Allocator) computeAction(
return action, priority
}

if len(decommissioningReplicas) > 0 {
// Range is over-replicated, and has a decommissioning replica which
if len(decommissioningVoters) > 0 {
// Range is over-replicated, and has a decommissioning voter which
// should be removed.
priority := removeDecommissioningVoterPriority
action := AllocatorRemoveDecommissioningVoter
log.VEventf(ctx, 3,
"%s - need=%d, have=%d, num_decommissioning=%d, priority=%.2f",
action, need, have, len(decommissioningReplicas), priority)
action, neededVoters, haveVoters, len(decommissioningVoters), priority)
return action, priority
}

if have > need {
// Range is over-replicated, and should remove a replica.
// Ranges with an even number of replicas get extra priority because
if haveVoters > neededVoters {
// Range is over-replicated, and should remove a voter.
// Ranges with an even number of voters get extra priority because
// they have a more fragile quorum.
priority := removeExtraVoterPriority - float64(have%2)
priority := removeExtraVoterPriority - float64(haveVoters%2)
action := AllocatorRemoveVoter
log.VEventf(ctx, 3, "%s - need=%d, have=%d, priority=%.2f", action, need, have, priority)
log.VEventf(ctx, 3, "%s - need=%d, have=%d, priority=%.2f", action, neededVoters, haveVoters, priority)
return action, priority
}

if haveNonVoters > neededNonVoters {
// Like above, the range is over-replicated but should remove a non-voter.
priority := removeExtraNonVoterPriority
action := AllocatorRemoveNonVoter
log.VEventf(ctx, 3, "%s - need=%d, have=%d, priority=%.2f", action, neededNonVoters, haveNonVoters, priority)
return action, priority
}

Expand Down
Loading

0 comments on commit 6ba4f15

Please sign in to comment.