Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

kvserver: teach allocator to prescribe non-voter addition/removal #59389

Merged
merged 3 commits into from
Feb 9, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 31 additions & 0 deletions pkg/config/zonepb/zone.go
Original file line number Diff line number Diff line change
Expand Up @@ -685,6 +685,37 @@ func (z ZoneConfig) GetSubzoneForKeySuffix(keySuffix []byte) (*Subzone, int32) {
return nil, -1
}

// GetNumVoters reports how many voting replicas this zone config asks for.
//
// An explicitly configured, non-zero NumVoters takes precedence. When
// NumVoters is unset (nil) or zero, every replica is a voter and NumReplicas
// is returned instead.
//
// This method panics if NumReplicas is nil.
func (z *ZoneConfig) GetNumVoters() int32 {
	replicas := z.NumReplicas
	if replicas == nil {
		panic("NumReplicas must not be nil")
	}
	if voters := z.NumVoters; voters != nil && *voters != 0 {
		return *voters
	}
	return *replicas
}

// GetNumNonVoters reports how many non-voting replicas this zone config asks
// for, computed as NumReplicas minus NumVoters.
//
// When NumVoters is unset (nil) or zero, `num_voters` hasn't been explicitly
// configured, so every replica is a voting replica and the result is 0.
//
// This method panics if NumReplicas is nil.
func (z *ZoneConfig) GetNumNonVoters() int32 {
	if z.NumReplicas == nil {
		panic("NumReplicas must not be nil")
	}
	voters := z.NumVoters
	if voters == nil || *voters == 0 {
		// No explicit voter count: all replicas vote.
		return 0
	}
	return *z.NumReplicas - *voters
}

// SetSubzone installs subzone into the ZoneConfig, overwriting any existing
// subzone with the same IndexID and PartitionName.
func (z *ZoneConfig) SetSubzone(subzone Subzone) {
Expand Down
249 changes: 161 additions & 88 deletions pkg/kv/kvserver/allocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,14 +49,23 @@ const (
minReplicaWeight = 0.001

// Priorities for various repair operations.
finalizeAtomicReplicationChangePriority float64 = 12002
removeLearnerReplicaPriority float64 = 12001
addDeadReplacementPriority float64 = 12000
addMissingReplicaPriority float64 = 10000
addDecommissioningReplacementPriority float64 = 5000
removeDeadReplicaPriority float64 = 1000
removeDecommissioningReplicaPriority float64 = 200
removeExtraReplicaPriority float64 = 100
//
// NB: These priorities only influence the replicateQueue's understanding of
// which ranges are to be dealt with before others. In other words, these
// priorities don't influence the relative order of actions taken on a given
// range. Within a given range, the ordering of the various checks inside
// `Allocator.computeAction` determines which repair/rebalancing actions are
// taken before the others.
finalizeAtomicReplicationChangePriority float64 = 12002
removeLearnerReplicaPriority float64 = 12001
addDeadReplacementVoterPriority float64 = 12000
addMissingVoterPriority float64 = 10000
addDecommissioningReplacementVoterPriority float64 = 5000
removeDeadVoterPriority float64 = 1000
removeDecommissioningVoterPriority float64 = 900
removeExtraVoterPriority float64 = 800
addMissingNonVoterPriority float64 = 700
removeExtraNonVoterPriority float64 = 600
)

// MinLeaseTransferStatsDuration configures the minimum amount of time a
Expand Down Expand Up @@ -99,27 +108,33 @@ type AllocatorAction int
const (
_ AllocatorAction = iota
AllocatorNoop
AllocatorRemove
AllocatorAdd
AllocatorReplaceDead
AllocatorRemoveDead
AllocatorReplaceDecommissioning
AllocatorRemoveDecommissioning
AllocatorRemoveVoter
AllocatorRemoveNonVoter
AllocatorAddVoter
AllocatorAddNonVoter
AllocatorReplaceDeadVoter
AllocatorRemoveDeadVoter
AllocatorReplaceDecommissioningVoter
AllocatorRemoveDecommissioningVoter
AllocatorRemoveLearner
AllocatorConsiderRebalance
AllocatorRangeUnavailable
AllocatorFinalizeAtomicReplicationChange
)

var allocatorActionNames = map[AllocatorAction]string{
AllocatorNoop: "noop",
AllocatorRemove: "remove",
AllocatorAdd: "add",
AllocatorReplaceDead: "replace dead",
AllocatorRemoveDead: "remove dead",
AllocatorReplaceDecommissioning: "replace decommissioning",
AllocatorRemoveDecommissioning: "remove decommissioning",
AllocatorRemoveLearner: "remove learner",
AllocatorNoop: "noop",
AllocatorRemoveVoter: "remove voter",
AllocatorRemoveNonVoter: "remove non-voter",
AllocatorAddVoter: "add voter",
AllocatorAddNonVoter: "add non-voter",
AllocatorReplaceDeadVoter: "replace dead voter",
AllocatorRemoveDeadVoter: "remove dead voter",
AllocatorReplaceDecommissioningVoter: "replace decommissioning voter",
AllocatorRemoveDecommissioningVoter: "remove decommissioning voter",
AllocatorRemoveLearner: "remove learner",
// TODO(aayush): Rationalize whether or not rebalancing of non-voters needs to
// be dictated by a distinct allocator action.
AllocatorConsiderRebalance: "consider rebalance",
AllocatorRangeUnavailable: "range unavailable",
AllocatorFinalizeAtomicReplicationChange: "finalize conf change",
Expand Down Expand Up @@ -260,11 +275,11 @@ func MakeAllocator(
}
}

// GetNeededReplicas calculates the number of replicas a range should
// have given its zone config and the number of nodes available for
// up-replication (i.e. not dead and not decommissioning).
func GetNeededReplicas(zoneConfigReplicaCount int32, clusterNodes int) int {
numZoneReplicas := int(zoneConfigReplicaCount)
// GetNeededVoters calculates the number of voters a range should have given its
// zone config and the number of nodes available for up-replication (i.e. not
// decommissioning).
func GetNeededVoters(zoneConfigVoterCount int32, clusterNodes int) int {
numZoneReplicas := int(zoneConfigVoterCount)
need := numZoneReplicas

// Adjust the replication factor for all ranges if there are fewer
Expand Down Expand Up @@ -297,6 +312,23 @@ func GetNeededReplicas(zoneConfigReplicaCount int32, clusterNodes int) int {
return need
}

// GetNeededNonVoters calculates the number of non-voters a range should have
// given the number of voting replicas the range has and the number of nodes
// available for up-replication.
//
// The result is zoneConfigNonVoterCount, capped at the number of nodes left
// over once each voter occupies a node of its own (clusterNodes - numVoters),
// and is never negative.
//
// NB: This method assumes that we have exactly as many voters as we need, since
// this method should only be consulted after voting replicas have been
// upreplicated / rebalanced off of dead/decommissioning nodes.
func GetNeededNonVoters(numVoters, zoneConfigNonVoterCount, clusterNodes int) int {
	need := zoneConfigNonVoterCount
	if spare := clusterNodes - numVoters; spare < need {
		// We only need non-voting replicas for the nodes that do not have a voting
		// replica.
		need = spare
	}
	if need < 0 {
		// If numVoters exceeds clusterNodes the subtraction above goes negative.
		// A negative target is meaningless as a replica count and would compare
		// as "fewer than" even an empty set of non-voters, so clamp it to zero.
		need = 0
	}
	return need
}

// ComputeAction determines the exact operation needed to repair the
// supplied range, as governed by the supplied zone configuration. It
// returns the required action that should be taken and a priority.
Expand Down Expand Up @@ -350,103 +382,144 @@ func (a *Allocator) ComputeAction(
// TODO(dan): Since this goes before anything else, the priority here should
// be influenced by whatever operations would happen right after the learner
// is removed. In the meantime, we don't want to block something important
// from happening (like addDeadReplacementPriority) by queueing this at a
// low priority so until this TODO is done, keep
// from happening (like addDeadReplacementVoterPriority) by queueing this at
// a low priority so until this TODO is done, keep
// removeLearnerReplicaPriority as the highest priority.
return AllocatorRemoveLearner, removeLearnerReplicaPriority
}
// computeAction expects to operate only on voters.
return a.computeAction(ctx, zone, desc.Replicas().VoterDescriptors())
return a.computeAction(ctx, zone, desc.Replicas().VoterDescriptors(),
desc.Replicas().NonVoterDescriptors())
}

func (a *Allocator) computeAction(
ctx context.Context, zone *zonepb.ZoneConfig, voterReplicas []roachpb.ReplicaDescriptor,
ctx context.Context,
zone *zonepb.ZoneConfig,
voterReplicas []roachpb.ReplicaDescriptor,
nonVoterReplicas []roachpb.ReplicaDescriptor,
) (AllocatorAction, float64) {
// TODO(mrtracy): Handle non-homogeneous and mismatched attribute sets.
have := len(voterReplicas)
decommissioningReplicas := a.storePool.decommissioningReplicas(voterReplicas)
// NB: The ordering of the checks in this method is intentional. The order in
// which these actions are returned by this method determines the relative
// priority of the actions taken on a given range. We want this to be
// symmetric with regards to the priorities defined at the top of this file
// (which influence the replicateQueue's decision of which range it'll pick to
// repair/rebalance before the others).
//
// In broad strokes, we first handle all voting replica-based actions and then
// the actions pertaining to non-voting replicas. Within each replica set, we
// first handle operations that correspond to repairing/recovering the range.
// After that we handle rebalancing related actions, followed by removal
// actions.
haveVoters := len(voterReplicas)
decommissioningVoters := a.storePool.decommissioningReplicas(voterReplicas)
// Node count including dead nodes but excluding decommissioning/decommissioned
// nodes.
clusterNodes := a.storePool.ClusterNodeCount()
need := GetNeededReplicas(*zone.NumReplicas, clusterNodes)
desiredQuorum := computeQuorum(need)
quorum := computeQuorum(have)

if have < need {
// Range is under-replicated, and should add an additional replica.
// Priority is adjusted by the difference between the current replica
// count and the quorum of the desired replica count.
priority := addMissingReplicaPriority + float64(desiredQuorum-have)
action := AllocatorAdd
log.VEventf(ctx, 3, "%s - missing replica need=%d, have=%d, priority=%.2f",
action, need, have, priority)
neededVoters := GetNeededVoters(zone.GetNumVoters(), clusterNodes)
desiredQuorum := computeQuorum(neededVoters)
quorum := computeQuorum(haveVoters)

// TODO(aayush): When haveVoters < neededVoters but we don't have quorum to
// actually execute the addition of a new replica, we should be returning an
// AllocatorRangeUnavailable.
if haveVoters < neededVoters {
// Range is under-replicated, and should add an additional voter.
// Priority is adjusted by the difference between the current voter
// count and the quorum of the desired voter count.
priority := addMissingVoterPriority + float64(desiredQuorum-haveVoters)
action := AllocatorAddVoter
log.VEventf(ctx, 3, "%s - missing voter need=%d, have=%d, priority=%.2f",
action, neededVoters, haveVoters, priority)
return action, priority
}

liveVoterReplicas, deadVoterReplicas := a.storePool.liveAndDeadReplicas(voterReplicas)
liveVoters, deadVoters := a.storePool.liveAndDeadReplicas(voterReplicas)

if len(liveVoterReplicas) < quorum {
// Do not take any replacement/removal action if we do not have a quorum of live
// replicas. If we're correctly assessing the unavailable state of the range, we
// also won't be able to add replicas as we try above, but hope springs eternal.
log.VEventf(ctx, 1, "unable to take action - live replicas %v don't meet quorum of %d",
liveVoterReplicas, quorum)
if len(liveVoters) < quorum {
// Do not take any replacement/removal action if we do not have a quorum of
// live voters. If we're correctly assessing the unavailable state of the
// range, we also won't be able to add replicas as we try above, but hope
// springs eternal.
log.VEventf(ctx, 1, "unable to take action - live voters %v don't meet quorum of %d",
liveVoters, quorum)
return AllocatorRangeUnavailable, 0
}

if have == need && len(deadVoterReplicas) > 0 {
// Range has dead replica(s). We should up-replicate to add another before
if haveVoters == neededVoters && len(deadVoters) > 0 {
// Range has dead voter(s). We should up-replicate to add another before
// removing the dead one. This can avoid permanent data loss in cases
// where the node is only temporarily dead, but we remove it from the range
// and lose a second node before we can up-replicate (#25392).
// The dead replica(s) will be down-replicated later.
priority := addDeadReplacementPriority
action := AllocatorReplaceDead
log.VEventf(ctx, 3, "%s - replacement for %d dead replicas priority=%.2f",
action, len(deadVoterReplicas), priority)
// The dead voter(s) will be down-replicated later.
priority := addDeadReplacementVoterPriority
action := AllocatorReplaceDeadVoter
log.VEventf(ctx, 3, "%s - replacement for %d dead voters priority=%.2f",
action, len(deadVoters), priority)
return action, priority
}

if have == need && len(decommissioningReplicas) > 0 {
// Range has decommissioning replica(s), which should be replaced.
priority := addDecommissioningReplacementPriority
action := AllocatorReplaceDecommissioning
log.VEventf(ctx, 3, "%s - replacement for %d decommissioning replicas priority=%.2f",
action, len(decommissioningReplicas), priority)
if haveVoters == neededVoters && len(decommissioningVoters) > 0 {
// Range has decommissioning voter(s), which should be replaced.
priority := addDecommissioningReplacementVoterPriority
action := AllocatorReplaceDecommissioningVoter
log.VEventf(ctx, 3, "%s - replacement for %d decommissioning voters priority=%.2f",
action, len(decommissioningVoters), priority)
return action, priority
}

// Removal actions follow.
// TODO(a-robinson): There's an additional case related to dead replicas that
// we should handle above. If there are one or more dead replicas, have <
// need, and there are no available stores to up-replicate to, then we should
// try to remove the dead replica(s) to get down to an odd number of
// replicas.
if len(deadVoterReplicas) > 0 {
// Voting replica removal actions follow.
// TODO(aayush): There's an additional case related to dead voters that we
// should handle above. If there are one or more dead replicas, have < need,
// and there are no available stores to up-replicate to, then we should try to
// remove the dead replica(s) to get down to an odd number of replicas.
if len(deadVoters) > 0 {
// The range has dead replicas, which should be removed immediately.
priority := removeDeadReplicaPriority + float64(quorum-len(liveVoterReplicas))
action := AllocatorRemoveDead
priority := removeDeadVoterPriority + float64(quorum-len(liveVoters))
action := AllocatorRemoveDeadVoter
log.VEventf(ctx, 3, "%s - dead=%d, live=%d, quorum=%d, priority=%.2f",
action, len(deadVoterReplicas), len(liveVoterReplicas), quorum, priority)
action, len(deadVoters), len(liveVoters), quorum, priority)
return action, priority
}

if len(decommissioningReplicas) > 0 {
// Range is over-replicated, and has a decommissioning replica which
if len(decommissioningVoters) > 0 {
// Range is over-replicated, and has a decommissioning voter which
// should be removed.
priority := removeDecommissioningReplicaPriority
action := AllocatorRemoveDecommissioning
priority := removeDecommissioningVoterPriority
action := AllocatorRemoveDecommissioningVoter
log.VEventf(ctx, 3,
"%s - need=%d, have=%d, num_decommissioning=%d, priority=%.2f",
action, need, have, len(decommissioningReplicas), priority)
action, neededVoters, haveVoters, len(decommissioningVoters), priority)
return action, priority
}

if have > need {
// Range is over-replicated, and should remove a replica.
// Ranges with an even number of replicas get extra priority because
if haveVoters > neededVoters {
// Range is over-replicated, and should remove a voter.
// Ranges with an even number of voters get extra priority because
// they have a more fragile quorum.
priority := removeExtraReplicaPriority - float64(have%2)
action := AllocatorRemove
log.VEventf(ctx, 3, "%s - need=%d, have=%d, priority=%.2f", action, need, have, priority)
priority := removeExtraVoterPriority - float64(haveVoters%2)
action := AllocatorRemoveVoter
log.VEventf(ctx, 3, "%s - need=%d, have=%d, priority=%.2f", action, neededVoters, haveVoters, priority)
return action, priority
}

// Non-voting replica actions follow.
//
// Non-voting replica addition.
haveNonVoters := len(nonVoterReplicas)
neededNonVoters := GetNeededNonVoters(haveVoters, int(zone.GetNumNonVoters()), clusterNodes)
if haveNonVoters < neededNonVoters {
priority := addMissingNonVoterPriority
action := AllocatorAddNonVoter
log.VEventf(ctx, 3, "%s - missing non-voter need=%d, have=%d, priority=%.2f",
action, neededNonVoters, haveNonVoters, priority)
return action, priority
}

// Non-voting replica removal.
if haveNonVoters > neededNonVoters {
// Like above, the range is over-replicated but should remove a non-voter.
priority := removeExtraNonVoterPriority
action := AllocatorRemoveNonVoter
log.VEventf(ctx, 3, "%s - need=%d, have=%d, priority=%.2f", action, neededNonVoters, haveNonVoters, priority)
return action, priority
}

Expand Down
Loading