Merge #59389

59389: kvserver: teach allocator to prescribe non-voter addition/removal r=aayushshah15 a=aayushshah15

This PR adds basic logic in the allocator to spit out actions that
prescribe upreplication or downreplication of non-voting replicas based
on zone configs.

Note that this commit *does not* enable the `replicateQueue` or the
`StoreRebalancer` to actually act on these newly added
`AllocatorAction`s, nor does it make the allocator smart enough to
prescribe _rebalancing_ non-voting replicas. This commit also does not
teach the allocator to rank non-voting replicas for addition or removal.

Release note: None


Co-authored-by: Aayush Shah <[email protected]>
craig[bot] and aayushshah15 committed Feb 9, 2021
2 parents a8532bc + dd4ff5d commit 7853fd3
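
For a concrete sense of the zone-config semantics this PR consumes, here is a minimal, self-contained Go sketch that mirrors the two getters added in `pkg/config/zonepb/zone.go` below. The `zoneCfg` type and method names are local stand-ins for illustration, not the real `ZoneConfig`:

```go
package main

import "fmt"

// zoneCfg is a hypothetical stand-in mirroring the relevant ZoneConfig fields.
type zoneCfg struct {
	numReplicas int32  // like ZoneConfig.NumReplicas (assumed non-nil here)
	numVoters   *int32 // like ZoneConfig.NumVoters; nil or 0 means "unset"
}

// getNumVoters mirrors ZoneConfig.GetNumVoters from the diff below.
func (z zoneCfg) getNumVoters() int32 {
	if z.numVoters != nil && *z.numVoters != 0 {
		return *z.numVoters
	}
	return z.numReplicas // num_voters unset: every replica votes
}

// getNumNonVoters mirrors ZoneConfig.GetNumNonVoters from the diff below.
func (z zoneCfg) getNumNonVoters() int32 {
	if z.numVoters != nil && *z.numVoters != 0 {
		return z.numReplicas - *z.numVoters
	}
	return 0
}

func main() {
	three := int32(3)
	mixed := zoneCfg{numReplicas: 5, numVoters: &three}
	plain := zoneCfg{numReplicas: 3}
	fmt.Println(mixed.getNumVoters(), mixed.getNumNonVoters()) // 3 2
	fmt.Println(plain.getNumVoters(), plain.getNumNonVoters()) // 3 0
}
```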
Showing 7 changed files with 398 additions and 132 deletions.
31 changes: 31 additions & 0 deletions pkg/config/zonepb/zone.go
@@ -685,6 +685,37 @@ func (z ZoneConfig) GetSubzoneForKeySuffix(keySuffix []byte) (*Subzone, int32) {
return nil, -1
}

+// GetNumVoters returns the number of voting replicas for the given zone config.
+//
+// This method will panic if called on a ZoneConfig with an uninitialized
+// NumReplicas attribute.
+func (z *ZoneConfig) GetNumVoters() int32 {
+if z.NumReplicas == nil {
+panic("NumReplicas must not be nil")
+}
+if z.NumVoters != nil && *z.NumVoters != 0 {
+return *z.NumVoters
+}
+return *z.NumReplicas
+}
+
+// GetNumNonVoters returns the number of non-voting replicas as defined in the
+// zone config.
+//
+// This method will panic if called on a ZoneConfig with an uninitialized
+// NumReplicas attribute.
+func (z *ZoneConfig) GetNumNonVoters() int32 {
+if z.NumReplicas == nil {
+panic("NumReplicas must not be nil")
+}
+if z.NumVoters != nil && *z.NumVoters != 0 {
+return *z.NumReplicas - *z.NumVoters
+}
+// `num_voters` hasn't been explicitly configured. Every replica should be a
+// voting replica.
+return 0
+}

// SetSubzone installs subzone into the ZoneConfig, overwriting any existing
// subzone with the same IndexID and PartitionName.
func (z *ZoneConfig) SetSubzone(subzone Subzone) {
249 changes: 161 additions & 88 deletions pkg/kv/kvserver/allocator.go
Expand Up @@ -49,14 +49,23 @@ const (
minReplicaWeight = 0.001

// Priorities for various repair operations.
-finalizeAtomicReplicationChangePriority float64 = 12002
-removeLearnerReplicaPriority float64 = 12001
-addDeadReplacementPriority float64 = 12000
-addMissingReplicaPriority float64 = 10000
-addDecommissioningReplacementPriority float64 = 5000
-removeDeadReplicaPriority float64 = 1000
-removeDecommissioningReplicaPriority float64 = 200
-removeExtraReplicaPriority float64 = 100
+//
+// NB: These priorities only influence the replicateQueue's understanding of
+// which ranges are to be dealt with before others. In other words, these
+// priorities don't influence the relative order of actions taken on a given
+// range. Within a given range, the ordering of the various checks inside
+// `Allocator.computeAction` determines which repair/rebalancing actions are
+// taken before the others.
+finalizeAtomicReplicationChangePriority float64 = 12002
+removeLearnerReplicaPriority float64 = 12001
+addDeadReplacementVoterPriority float64 = 12000
+addMissingVoterPriority float64 = 10000
+addDecommissioningReplacementVoterPriority float64 = 5000
+removeDeadVoterPriority float64 = 1000
+removeDecommissioningVoterPriority float64 = 900
+removeExtraVoterPriority float64 = 800
+addMissingNonVoterPriority float64 = 700
+removeExtraNonVoterPriority float64 = 600
)
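
As the NB comment above notes, these values only order repairs across ranges. A small self-contained sketch, with the values copied from the block above, to illustrate that both new non-voter actions deliberately rank below every voter repair and removal:

```go
package main

import (
	"fmt"
	"sort"
)

// Sketch only: priority values copied from the constant block above. The
// replicateQueue picks higher-priority ranges first, so non-voter work
// queues behind all voter repairs.
func main() {
	prio := []struct {
		name  string
		value float64
	}{
		{"addMissingVoterPriority", 10000},
		{"addMissingNonVoterPriority", 700},
		{"removeExtraNonVoterPriority", 600},
		{"removeDeadVoterPriority", 1000},
		{"removeDecommissioningVoterPriority", 900},
		{"removeExtraVoterPriority", 800},
	}
	sort.Slice(prio, func(i, j int) bool { return prio[i].value > prio[j].value })
	for _, p := range prio {
		fmt.Println(p.name, p.value) // the two non-voter actions print last
	}
}
```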

// MinLeaseTransferStatsDuration configures the minimum amount of time a
@@ -99,27 +108,33 @@ type AllocatorAction int
const (
_ AllocatorAction = iota
AllocatorNoop
-AllocatorRemove
-AllocatorAdd
-AllocatorReplaceDead
-AllocatorRemoveDead
-AllocatorReplaceDecommissioning
-AllocatorRemoveDecommissioning
+AllocatorRemoveVoter
+AllocatorRemoveNonVoter
+AllocatorAddVoter
+AllocatorAddNonVoter
+AllocatorReplaceDeadVoter
+AllocatorRemoveDeadVoter
+AllocatorReplaceDecommissioningVoter
+AllocatorRemoveDecommissioningVoter
AllocatorRemoveLearner
AllocatorConsiderRebalance
AllocatorRangeUnavailable
AllocatorFinalizeAtomicReplicationChange
)

var allocatorActionNames = map[AllocatorAction]string{
-AllocatorNoop: "noop",
-AllocatorRemove: "remove",
-AllocatorAdd: "add",
-AllocatorReplaceDead: "replace dead",
-AllocatorRemoveDead: "remove dead",
-AllocatorReplaceDecommissioning: "replace decommissioning",
-AllocatorRemoveDecommissioning: "remove decommissioning",
-AllocatorRemoveLearner: "remove learner",
+AllocatorNoop: "noop",
+AllocatorRemoveVoter: "remove voter",
+AllocatorRemoveNonVoter: "remove non-voter",
+AllocatorAddVoter: "add voter",
+AllocatorAddNonVoter: "add non-voter",
+AllocatorReplaceDeadVoter: "replace dead voter",
+AllocatorRemoveDeadVoter: "remove dead voter",
+AllocatorReplaceDecommissioningVoter: "replace decommissioning voter",
+AllocatorRemoveDecommissioningVoter: "remove decommissioning voter",
+AllocatorRemoveLearner: "remove learner",
+// TODO(aayush): Rationalize whether or not rebalancing of non-voters needs to
+// be dictated by a distinct allocator action.
AllocatorConsiderRebalance: "consider rebalance",
AllocatorRangeUnavailable: "range unavailable",
AllocatorFinalizeAtomicReplicationChange: "finalize conf change",
@@ -260,11 +275,11 @@ func MakeAllocator(
}
}

-// GetNeededReplicas calculates the number of replicas a range should
-// have given its zone config and the number of nodes available for
-// up-replication (i.e. not dead and not decommissioning).
-func GetNeededReplicas(zoneConfigReplicaCount int32, clusterNodes int) int {
-numZoneReplicas := int(zoneConfigReplicaCount)
+// GetNeededVoters calculates the number of voters a range should have given its
+// zone config and the number of nodes available for up-replication (i.e. not
+// decommissioning).
+func GetNeededVoters(zoneConfigVoterCount int32, clusterNodes int) int {
+numZoneReplicas := int(zoneConfigVoterCount)
need := numZoneReplicas

// Adjust the replication factor for all ranges if there are fewer
@@ -297,6 +312,23 @@ func GetNeededReplicas(zoneConfigReplicaCount int32, clusterNodes int) int {
return need
}

+// GetNeededNonVoters calculates the number of non-voters a range should have
+// given the number of voting replicas the range has and the number of nodes
+// available for up-replication.
+//
+// NB: This method assumes that we have exactly as many voters as we need, since
+// this method should only be consulted after voting replicas have been
+// upreplicated / rebalanced off of dead/decommissioning nodes.
+func GetNeededNonVoters(numVoters, zoneConfigNonVoterCount, clusterNodes int) int {
+need := zoneConfigNonVoterCount
+if clusterNodes-numVoters < need {
+// We only need non-voting replicas for the nodes that do not have a voting
+// replica.
+need = clusterNodes - numVoters
+}
+return need
+}
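
A quick worked check of the clamp above, as a self-contained copy of the function body with illustrative inputs:

```go
package main

import "fmt"

// Standalone copy of GetNeededNonVoters' clamp, for a worked example.
func neededNonVoters(numVoters, zoneConfigNonVoterCount, clusterNodes int) int {
	need := zoneConfigNonVoterCount
	if clusterNodes-numVoters < need {
		// Only nodes without a voting replica can host a non-voter.
		need = clusterNodes - numVoters
	}
	return need
}

func main() {
	// 5 nodes, 3 voters, zone asks for 2 non-voters: both fit.
	fmt.Println(neededNonVoters(3, 2, 5)) // 2
	// 4 nodes, 3 voters: only one node lacks a voting replica.
	fmt.Println(neededNonVoters(3, 2, 4)) // 1
}
```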

// ComputeAction determines the exact operation needed to repair the
// supplied range, as governed by the supplied zone configuration. It
// returns the required action that should be taken and a priority.
@@ -350,103 +382,144 @@ func (a *Allocator) ComputeAction(
// TODO(dan): Since this goes before anything else, the priority here should
// be influenced by whatever operations would happen right after the learner
// is removed. In the meantime, we don't want to block something important
-// from happening (like addDeadReplacementPriority) by queueing this at a
-// low priority so until this TODO is done, keep
+// from happening (like addDeadReplacementVoterPriority) by queueing this at
+// a low priority so until this TODO is done, keep
// removeLearnerReplicaPriority as the highest priority.
return AllocatorRemoveLearner, removeLearnerReplicaPriority
}
-// computeAction expects to operate only on voters.
-return a.computeAction(ctx, zone, desc.Replicas().VoterDescriptors())
+return a.computeAction(ctx, zone, desc.Replicas().VoterDescriptors(),
+desc.Replicas().NonVoterDescriptors())
}

func (a *Allocator) computeAction(
-ctx context.Context, zone *zonepb.ZoneConfig, voterReplicas []roachpb.ReplicaDescriptor,
+ctx context.Context,
+zone *zonepb.ZoneConfig,
+voterReplicas []roachpb.ReplicaDescriptor,
+nonVoterReplicas []roachpb.ReplicaDescriptor,
) (AllocatorAction, float64) {
-// TODO(mrtracy): Handle non-homogeneous and mismatched attribute sets.
-have := len(voterReplicas)
-decommissioningReplicas := a.storePool.decommissioningReplicas(voterReplicas)
+// NB: The ordering of the checks in this method is intentional. The order in
+// which these actions are returned by this method determines the relative
+// priority of the actions taken on a given range. We want this to be
+// symmetric with regards to the priorities defined at the top of this file
+// (which influence the replicateQueue's decision of which range it'll pick to
+// repair/rebalance before the others).
+//
+// In broad strokes, we first handle all voting replica-based actions and then
+// the actions pertaining to non-voting replicas. Within each replica set, we
+// first handle operations that correspond to repairing/recovering the range.
+// After that we handle rebalancing related actions, followed by removal
+// actions.
+haveVoters := len(voterReplicas)
+decommissioningVoters := a.storePool.decommissioningReplicas(voterReplicas)
// Node count including dead nodes but excluding decommissioning/decommissioned
// nodes.
clusterNodes := a.storePool.ClusterNodeCount()
-need := GetNeededReplicas(*zone.NumReplicas, clusterNodes)
-desiredQuorum := computeQuorum(need)
-quorum := computeQuorum(have)

-if have < need {
-// Range is under-replicated, and should add an additional replica.
-// Priority is adjusted by the difference between the current replica
-// count and the quorum of the desired replica count.
-priority := addMissingReplicaPriority + float64(desiredQuorum-have)
-action := AllocatorAdd
-log.VEventf(ctx, 3, "%s - missing replica need=%d, have=%d, priority=%.2f",
-action, need, have, priority)
+neededVoters := GetNeededVoters(zone.GetNumVoters(), clusterNodes)
+desiredQuorum := computeQuorum(neededVoters)
+quorum := computeQuorum(haveVoters)

+// TODO(aayush): When haveVoters < neededVoters but we don't have quorum to
+// actually execute the addition of a new replica, we should be returning a
+// AllocatorRangeUnavailable.
+if haveVoters < neededVoters {
+// Range is under-replicated, and should add an additional voter.
+// Priority is adjusted by the difference between the current voter
+// count and the quorum of the desired voter count.
+priority := addMissingVoterPriority + float64(desiredQuorum-haveVoters)
+action := AllocatorAddVoter
+log.VEventf(ctx, 3, "%s - missing voter need=%d, have=%d, priority=%.2f",
+action, neededVoters, haveVoters, priority)
return action, priority
}
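
The priority arithmetic above rewards ranges that sit furthest below their desired quorum. A worked example, assuming `computeQuorum` is the conventional `(n/2)+1` majority helper (consistent with its use here, but not shown in this diff):

```go
package main

import "fmt"

func computeQuorum(nodes int) int { return nodes/2 + 1 } // assumed definition

func main() {
	const addMissingVoterPriority = 10000.0 // from the constants above
	neededVoters, haveVoters := 5, 2
	desiredQuorum := computeQuorum(neededVoters)                            // 3
	priority := addMissingVoterPriority + float64(desiredQuorum-haveVoters) // 10001
	// A range with only 1 live voter would score 10002 and sort ahead of this one.
	fmt.Println(priority)
}
```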

-liveVoterReplicas, deadVoterReplicas := a.storePool.liveAndDeadReplicas(voterReplicas)
+liveVoters, deadVoters := a.storePool.liveAndDeadReplicas(voterReplicas)

-if len(liveVoterReplicas) < quorum {
-// Do not take any replacement/removal action if we do not have a quorum of live
-// replicas. If we're correctly assessing the unavailable state of the range, we
-// also won't be able to add replicas as we try above, but hope springs eternal.
-log.VEventf(ctx, 1, "unable to take action - live replicas %v don't meet quorum of %d",
-liveVoterReplicas, quorum)
+if len(liveVoters) < quorum {
+// Do not take any replacement/removal action if we do not have a quorum of
+// live voters. If we're correctly assessing the unavailable state of the
+// range, we also won't be able to add replicas as we try above, but hope
+// springs eternal.
+log.VEventf(ctx, 1, "unable to take action - live voters %v don't meet quorum of %d",
+liveVoters, quorum)
return AllocatorRangeUnavailable, 0
}

-if have == need && len(deadVoterReplicas) > 0 {
-// Range has dead replica(s). We should up-replicate to add another before
+if haveVoters == neededVoters && len(deadVoters) > 0 {
+// Range has dead voter(s). We should up-replicate to add another before
// removing the dead one. This can avoid permanent data loss in cases
// where the node is only temporarily dead, but we remove it from the range
// and lose a second node before we can up-replicate (#25392).
-// The dead replica(s) will be down-replicated later.
-priority := addDeadReplacementPriority
-action := AllocatorReplaceDead
-log.VEventf(ctx, 3, "%s - replacement for %d dead replicas priority=%.2f",
-action, len(deadVoterReplicas), priority)
+// The dead voter(s) will be down-replicated later.
+priority := addDeadReplacementVoterPriority
+action := AllocatorReplaceDeadVoter
+log.VEventf(ctx, 3, "%s - replacement for %d dead voters priority=%.2f",
+action, len(deadVoters), priority)
return action, priority
}

-if have == need && len(decommissioningReplicas) > 0 {
-// Range has decommissioning replica(s), which should be replaced.
-priority := addDecommissioningReplacementPriority
-action := AllocatorReplaceDecommissioning
-log.VEventf(ctx, 3, "%s - replacement for %d decommissioning replicas priority=%.2f",
-action, len(decommissioningReplicas), priority)
+if haveVoters == neededVoters && len(decommissioningVoters) > 0 {
+// Range has decommissioning voter(s), which should be replaced.
+priority := addDecommissioningReplacementVoterPriority
+action := AllocatorReplaceDecommissioningVoter
+log.VEventf(ctx, 3, "%s - replacement for %d decommissioning voters priority=%.2f",
+action, len(decommissioningVoters), priority)
return action, priority
}

-// Removal actions follow.
-// TODO(a-robinson): There's an additional case related to dead replicas that
-// we should handle above. If there are one or more dead replicas, have <
-// need, and there are no available stores to up-replicate to, then we should
-// try to remove the dead replica(s) to get down to an odd number of
-// replicas.
-if len(deadVoterReplicas) > 0 {
+// Voting replica removal actions follow.
+// TODO(aayush): There's an additional case related to dead voters that we
+// should handle above. If there are one or more dead replicas, have < need,
+// and there are no available stores to up-replicate to, then we should try to
+// remove the dead replica(s) to get down to an odd number of replicas.
+if len(deadVoters) > 0 {
// The range has dead replicas, which should be removed immediately.
-priority := removeDeadReplicaPriority + float64(quorum-len(liveVoterReplicas))
-action := AllocatorRemoveDead
+priority := removeDeadVoterPriority + float64(quorum-len(liveVoters))
+action := AllocatorRemoveDeadVoter
log.VEventf(ctx, 3, "%s - dead=%d, live=%d, quorum=%d, priority=%.2f",
-action, len(deadVoterReplicas), len(liveVoterReplicas), quorum, priority)
+action, len(deadVoters), len(liveVoters), quorum, priority)
return action, priority
}

-if len(decommissioningReplicas) > 0 {
-// Range is over-replicated, and has a decommissioning replica which
+if len(decommissioningVoters) > 0 {
+// Range is over-replicated, and has a decommissioning voter which
// should be removed.
-priority := removeDecommissioningReplicaPriority
-action := AllocatorRemoveDecommissioning
+priority := removeDecommissioningVoterPriority
+action := AllocatorRemoveDecommissioningVoter
log.VEventf(ctx, 3,
"%s - need=%d, have=%d, num_decommissioning=%d, priority=%.2f",
-action, need, have, len(decommissioningReplicas), priority)
+action, neededVoters, haveVoters, len(decommissioningVoters), priority)
return action, priority
}

-if have > need {
-// Range is over-replicated, and should remove a replica.
-// Ranges with an even number of replicas get extra priority because
+if haveVoters > neededVoters {
+// Range is over-replicated, and should remove a voter.
+// Ranges with an even number of voters get extra priority because
// they have a more fragile quorum.
-priority := removeExtraReplicaPriority - float64(have%2)
-action := AllocatorRemove
-log.VEventf(ctx, 3, "%s - need=%d, have=%d, priority=%.2f", action, need, have, priority)
+priority := removeExtraVoterPriority - float64(haveVoters%2)
+action := AllocatorRemoveVoter
+log.VEventf(ctx, 3, "%s - need=%d, have=%d, priority=%.2f", action, neededVoters, haveVoters, priority)
return action, priority
}

+// Non-voting replica actions follow.
+//
+// Non-voting replica addition.
+haveNonVoters := len(nonVoterReplicas)
+neededNonVoters := GetNeededNonVoters(haveVoters, int(zone.GetNumNonVoters()), clusterNodes)
+if haveNonVoters < neededNonVoters {
+priority := addMissingNonVoterPriority
+action := AllocatorAddNonVoter
+log.VEventf(ctx, 3, "%s - missing non-voter need=%d, have=%d, priority=%.2f",
+action, neededNonVoters, haveNonVoters, priority)
+return action, priority
+}
+
+// Non-voting replica removal.
+if haveNonVoters > neededNonVoters {
+// Like above, the range is over-replicated but should remove a non-voter.
+priority := removeExtraNonVoterPriority
+action := AllocatorRemoveNonVoter
+log.VEventf(ctx, 3, "%s - need=%d, have=%d, priority=%.2f", action, neededNonVoters, haveNonVoters, priority)
+return action, priority
+}

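Putting the method together, here is a simplified, self-contained mirror of `computeAction`'s check order. The learner and rebalance branches are elided, and the real store-pool inputs are reduced to plain counts, so this is a sketch of the control flow rather than the actual implementation:

```go
package main

import "fmt"

// rangeState reduces the allocator's inputs to plain counts for illustration.
type rangeState struct {
	haveVoters, neededVoters       int
	liveVoters, deadVoters         int
	decommissioningVoters          int
	haveNonVoters, neededNonVoters int
}

// action mirrors the order of the checks in computeAction above.
func action(r rangeState) string {
	quorum := r.haveVoters/2 + 1
	switch {
	case r.haveVoters < r.neededVoters:
		return "add voter"
	case r.liveVoters < quorum:
		return "range unavailable"
	case r.haveVoters == r.neededVoters && r.deadVoters > 0:
		return "replace dead voter"
	case r.haveVoters == r.neededVoters && r.decommissioningVoters > 0:
		return "replace decommissioning voter"
	case r.deadVoters > 0:
		return "remove dead voter"
	case r.decommissioningVoters > 0:
		return "remove decommissioning voter"
	case r.haveVoters > r.neededVoters:
		return "remove voter"
	case r.haveNonVoters < r.neededNonVoters:
		return "add non-voter"
	case r.haveNonVoters > r.neededNonVoters:
		return "remove non-voter"
	}
	return "consider rebalance"
}

func main() {
	// Zone wants 3 voters and 2 non-voters; the voters are healthy but only
	// one non-voter exists, so the new tail of computeAction prescribes the
	// non-voter addition.
	fmt.Println(action(rangeState{
		haveVoters: 3, neededVoters: 3, liveVoters: 3,
		haveNonVoters: 1, neededNonVoters: 2,
	})) // add non-voter
}
```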
