Skip to content

Commit

Permalink
kvserver: teach replicateQueue to handle non-voter addition/removal
Browse files Browse the repository at this point in the history
This commit primarily teaches the allocator to be able to rank
non-voting replica candidates for addition and removal. This allows us
to have the replicateQueue execute upon the allocator's actions to add
or remove non-voting replicas to a range.

Note that this commit does not deal with _rebalancing_ of non-voting
replicas, just simple additions and removals when a range is over or
under-replicated.

Release note: None
  • Loading branch information
aayushshah15 committed Feb 6, 2021
1 parent 06b8019 commit 5634c53
Show file tree
Hide file tree
Showing 10 changed files with 646 additions and 288 deletions.
223 changes: 169 additions & 54 deletions pkg/kv/kvserver/allocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,18 +158,21 @@ const (
// can be retried quickly as soon as new stores come online, or additional
// space frees up.
type allocatorError struct {
constraints []zonepb.ConstraintsConjunction
existingReplicas int
aliveStores int
throttledStores int
constraints []zonepb.ConstraintsConjunction
voterConstraints []zonepb.ConstraintsConjunction
existingVoterCount int
existingNonVoterCount int
aliveStores int
throttledStores int
}

// TODO(aayush): !!! Add the appropriate string for non-voters.
func (ae *allocatorError) Error() string {
var existingReplsStr string
if ae.existingReplicas == 1 {
existingReplsStr = "1 already has a replica"
if ae.existingVoterCount == 1 {
existingReplsStr = "1 already has a voter"
} else {
existingReplsStr = fmt.Sprintf("%d already have a replica", ae.existingReplicas)
existingReplsStr = fmt.Sprintf("%d already have a voter", ae.existingVoterCount)
}

var baseMsg string
Expand Down Expand Up @@ -538,21 +541,23 @@ type decisionDetails struct {
Existing string `json:",omitempty"`
}

// AllocateTarget returns a suitable store for a new allocation with the
// required attributes. Nodes already accommodating existing replicas are ruled
// out as targets. The range ID of the replica being allocated for is also
// passed in to ensure that we don't try to replace an existing dead replica on
// a store.
//
// TODO(tbg): AllocateReplacement?
func (a *Allocator) AllocateTarget(
ctx context.Context, zone *zonepb.ZoneConfig, existingReplicas []roachpb.ReplicaDescriptor,
) (*roachpb.StoreDescriptor, string, error) {
sl, aliveStoreCount, throttled := a.storePool.getStoreList(storeFilterThrottled)
type chooseReplicaToAddFn func(
ctx context.Context,
candidateStores StoreList,
zone *zonepb.ZoneConfig,
existingVoters, existingNonVoters []roachpb.ReplicaDescriptor,
options scorerOptions) (*roachpb.StoreDescriptor, string)

target, details := a.allocateTargetFromList(
ctx, sl, zone, existingReplicas, a.scorerOptions())
func (a *Allocator) allocateTarget(
ctx context.Context,
zone *zonepb.ZoneConfig,
allocateFromList chooseReplicaToAddFn,
existingVoters, existingNonVoters []roachpb.ReplicaDescriptor,
) (*roachpb.StoreDescriptor, string, error) {
candidateStoreList, aliveStoreCount, throttled := a.storePool.getStoreList(storeFilterThrottled)

target, details := allocateFromList(ctx, candidateStoreList, zone, existingVoters,
existingNonVoters, a.scorerOptions())
if target != nil {
return target, details, nil
}
Expand All @@ -565,30 +570,100 @@ func (a *Allocator) AllocateTarget(
)
}
return nil, "", &allocatorError{
constraints: zone.Constraints,
existingReplicas: len(existingReplicas),
aliveStores: aliveStoreCount,
throttledStores: len(throttled),
voterConstraints: zone.VoterConstraints,
constraints: zone.Constraints,
existingVoterCount: len(existingVoters),
existingNonVoterCount: len(existingNonVoters),
aliveStores: aliveStoreCount,
throttledStores: len(throttled),
}
}

func (a *Allocator) allocateTargetFromList(
// AllocateVoterTarget returns a suitable store for a new allocation of a voting
// replica with the required attributes. Nodes already accommodating existing
// replicas are ruled out as targets.
//
// TODO(aayush): This method currently rules out all the stores that have any
// replica for the given range. However, we want to get to a place where we'll
// consider the targets that have a non-voter as feasible
// relocation/up-replication targets for existing/new voting replicas, since it
// is cheaper to promote a non-voter than it is to add a new voter via the
// LEARNER->VOTER_FULL path.
func (a *Allocator) AllocateVoterTarget(
ctx context.Context,
zone *zonepb.ZoneConfig,
existingVoters, existingNonVoters []roachpb.ReplicaDescriptor,
) (*roachpb.StoreDescriptor, string, error) {
return a.allocateTarget(ctx, zone, a.allocateVotersFromList, existingVoters, existingNonVoters)
}

// AllocateNonVoterTarget returns a suitable store for a new allocation of a
// non-voting replica with the required attributes. Nodes already accommodating
// _any_ existing replicas are ruled out as targets.
func (a *Allocator) AllocateNonVoterTarget(
ctx context.Context,
zone *zonepb.ZoneConfig,
existingVoters, existingNonVoters []roachpb.ReplicaDescriptor,
) (*roachpb.StoreDescriptor, string, error) {
return a.allocateTarget(ctx, zone, a.allocateNonVotersFromList, existingVoters, existingNonVoters)
}

func (a *Allocator) allocateVotersFromList(
ctx context.Context,
candidateStores StoreList,
zone *zonepb.ZoneConfig,
existingReplicas []roachpb.ReplicaDescriptor,
// TODO(aayush): This method needs to be taught to make decisions to swap a
// voter with a non-voter when "appropriate" to do so. We would want to prefer
// promoting an existing non-voter to a voter over adding a new voting
// replica.
existingVoters, existingNonVoters []roachpb.ReplicaDescriptor,
options scorerOptions,
) (*roachpb.StoreDescriptor, string) {
existingReplicas := append(existingVoters, existingNonVoters...)
// Voting replicas have to abide by both the overall `constraints` (which
// apply to all replicas) and `voter_constraints` which apply only to voting
// replicas.
analyzedConstraints := constraint.AnalyzeConstraints(
ctx, a.storePool.getStoreDescriptor, existingReplicas, zone)
candidates := allocateCandidates(
ctx,
candidateStores, analyzedConstraints, existingReplicas,
ctx, a.storePool.getStoreDescriptor, existingReplicas, int(*zone.NumReplicas), zone.Constraints)
analyzedVoterConstraints := constraint.AnalyzeConstraints(
ctx, a.storePool.getStoreDescriptor, existingVoters, int(zone.GetNumVoters()), zone.VoterConstraints)

candidates := allocateVoterCandidates(
ctx, candidateStores, analyzedConstraints, analyzedVoterConstraints, existingReplicas,
a.storePool.getLocalitiesByStore(existingReplicas),
a.storePool.isNodeReadyForRoutineReplicaTransfer,
options,
a.storePool.isNodeReadyForRoutineReplicaTransfer, options)
log.VEventf(ctx, 3, "allocate voter candidates: %s", candidates)
if target := candidates.selectGood(a.randGen); target != nil {
log.VEventf(ctx, 3, "add target: %s", target)
details := decisionDetails{Target: target.compactString(options)}
detailsBytes, err := json.Marshal(details)
if err != nil {
log.Warningf(ctx, "failed to marshal details for choosing allocate target: %+v", err)
}
return &target.store, string(detailsBytes)
}

return nil, ""
}

func (a *Allocator) allocateNonVotersFromList(
ctx context.Context,
candidateStores StoreList,
zone *zonepb.ZoneConfig,
existingVoters, existingNonVoters []roachpb.ReplicaDescriptor,
options scorerOptions,
) (*roachpb.StoreDescriptor, string) {
existingReplicas := append(existingVoters, existingNonVoters...)
analyzedConstraints := constraint.AnalyzeConstraints(ctx, a.storePool.getStoreDescriptor,
existingReplicas, int(*zone.NumReplicas), nil)

candidates := allocateNonVoterCandidates(
ctx, candidateStores, analyzedConstraints, constraint.EmptyAnalyzedConstraints, existingReplicas,
a.storePool.getLocalitiesByStore(existingReplicas),
a.storePool.isNodeReadyForRoutineReplicaTransfer, options,
)
log.VEventf(ctx, 3, "allocate candidates: %s", candidates)

log.VEventf(ctx, 3, "allocate non-voter candidates: %s", candidates)
if target := candidates.selectGood(a.randGen); target != nil {
log.VEventf(ctx, 3, "add target: %s", target)
details := decisionDetails{Target: target.compactString(options)}
Expand All @@ -602,12 +677,13 @@ func (a *Allocator) allocateTargetFromList(
return nil, ""
}

// TODO(aayush): Generalize this to work for non-voting replicas as well.
func (a Allocator) simulateRemoveTarget(
ctx context.Context,
targetStore roachpb.StoreID,
zone *zonepb.ZoneConfig,
candidates []roachpb.ReplicaDescriptor,
existingReplicas []roachpb.ReplicaDescriptor,
existingVoters []roachpb.ReplicaDescriptor,
rangeUsageInfo RangeUsageInfo,
) (roachpb.ReplicaDescriptor, string, error) {
// Update statistics first
Expand All @@ -620,40 +696,41 @@ func (a Allocator) simulateRemoveTarget(
a.storePool.updateLocalStoreAfterRebalance(targetStore, rangeUsageInfo, roachpb.REMOVE_VOTER)
}()
log.VEventf(ctx, 3, "simulating which replica would be removed after adding s%d", targetStore)
return a.RemoveTarget(ctx, zone, candidates, existingReplicas)
return a.RemoveVoter(ctx, zone, candidates, existingVoters, nil)
}

// RemoveTarget returns a suitable replica to remove from the provided replica
// set. It first attempts to randomly select a target from the set of stores
// that have greater than the average number of replicas. Failing that, it
// falls back to selecting a random target from any of the existing
// replicas.
func (a Allocator) RemoveTarget(
func (a Allocator) removeTarget(
ctx context.Context,
zone *zonepb.ZoneConfig,
candidates []roachpb.ReplicaDescriptor,
existingReplicas []roachpb.ReplicaDescriptor,
existingVoters []roachpb.ReplicaDescriptor,
existingNonVoters []roachpb.ReplicaDescriptor,
constraintsCheck constraintsCheckFn,
) (roachpb.ReplicaDescriptor, string, error) {
if len(candidates) == 0 {
return roachpb.ReplicaDescriptor{}, "", errors.Errorf("must supply at least one candidate replica to allocator.RemoveTarget()")
return roachpb.ReplicaDescriptor{}, "", errors.Errorf("must supply at least one candidate replica to allocator.RemoveVoter()")
}

existingReplicas := append(existingVoters, existingNonVoters...)
// Retrieve store descriptors for the provided candidates from the StorePool.
candidateStoreIDs := make(roachpb.StoreIDSlice, len(candidates))
for i, exist := range candidates {
candidateStoreIDs[i] = exist.StoreID
}
candidateStoreList, _, _ := a.storePool.getStoreListFromIDs(candidateStoreIDs, storeFilterNone)

analyzedConstraints := constraint.AnalyzeConstraints(
ctx, a.storePool.getStoreDescriptor, existingReplicas, zone)
analyzedOverallConstraints := constraint.AnalyzeConstraints(ctx, a.storePool.getStoreDescriptor,
existingReplicas, int(*zone.NumReplicas), zone.Constraints)
analyzedVoterConstraints := constraint.AnalyzeConstraints(ctx, a.storePool.getStoreDescriptor,
existingVoters, int(zone.GetNumVoters()), zone.VoterConstraints)
options := a.scorerOptions()
rankedCandidates := removeCandidates(
rankedCandidates := rankedCandidateListForRemoval(
candidateStoreList,
analyzedConstraints,
a.storePool.getLocalitiesByStore(existingReplicas),
analyzedOverallConstraints, analyzedVoterConstraints,
constraintsCheck,
a.storePool.getLocalitiesByStore(existingVoters),
options,
)

log.VEventf(ctx, 3, "remove candidates: %s", rankedCandidates)
if bad := rankedCandidates.selectBad(a.randGen); bad != nil {
for _, exist := range existingReplicas {
Expand All @@ -672,9 +749,48 @@ func (a Allocator) RemoveTarget(
return roachpb.ReplicaDescriptor{}, "", errors.New("could not select an appropriate replica to be removed")
}

type chooseReplicaToRemoveFn func(
ctx context.Context,
zone *zonepb.ZoneConfig,
nonVoterCandidates []roachpb.ReplicaDescriptor,
existingVoters []roachpb.ReplicaDescriptor,
existingNonVoters []roachpb.ReplicaDescriptor,
) (roachpb.ReplicaDescriptor, string, error)

// RemoveVoter returns a suitable replica to remove from the provided replica
// set. It first attempts to randomly select a target from the set of stores
// that have greater than the average number of replicas. Failing that, it
// falls back to selecting a random target from any of the existing
// replicas.
func (a Allocator) RemoveVoter(
ctx context.Context,
zone *zonepb.ZoneConfig,
voterCandidates []roachpb.ReplicaDescriptor,
existingVoters []roachpb.ReplicaDescriptor,
existingNonVoters []roachpb.ReplicaDescriptor,
) (roachpb.ReplicaDescriptor, string, error) {
return a.removeTarget(ctx, zone, voterCandidates, existingVoters, existingNonVoters,
checkVoterConstraintsForRemoval)
}

// RemoveNonVoter returns a suitable non-voting replica to remove from the
// provided set. It first attempts to randomly select a target from the set of
// stores that have greater than the average number of replicas. Failing that,
// it falls back to selecting a random target from any of the existing replicas.
func (a Allocator) RemoveNonVoter(
ctx context.Context,
zone *zonepb.ZoneConfig,
nonVoterCandidates []roachpb.ReplicaDescriptor,
existingVoters []roachpb.ReplicaDescriptor,
existingNonVoters []roachpb.ReplicaDescriptor,
) (roachpb.ReplicaDescriptor, string, error) {
return a.removeTarget(ctx, zone, nonVoterCandidates, existingVoters, existingNonVoters,
checkNonVoterConstraintsForRemoval)
}

// RebalanceTarget returns a suitable store for a rebalance target with
// required attributes. Rebalance targets are selected via the same mechanism
// as AllocateTarget(), except the chosen target must follow some additional
// as AllocateVoterTarget(), except the chosen target must follow some additional
// criteria. Namely, if chosen, it must further the goal of balancing the
// cluster.
//
Expand All @@ -686,7 +802,7 @@ func (a Allocator) RemoveTarget(
// replica to the range, then removing the most undesirable replica.
//
// Simply ignoring a rebalance opportunity in the event that the target chosen
// by AllocateTarget() doesn't fit balancing criteria is perfectly fine, as
// by AllocateVoterTarget() doesn't fit balancing criteria is perfectly fine, as
// other stores in the cluster will also be doing their probabilistic best to
// rebalance. This helps prevent a stampeding herd targeting an abnormally
// under-utilized store.
Expand Down Expand Up @@ -737,9 +853,8 @@ func (a Allocator) RebalanceTarget(
return zero, zero, "", false
}
}

analyzedConstraints := constraint.AnalyzeConstraints(
ctx, a.storePool.getStoreDescriptor, existingReplicas, zone)
ctx, a.storePool.getStoreDescriptor, existingReplicas, int(*zone.NumReplicas), zone.Constraints)
options := a.scorerOptions()
results := rebalanceCandidates(
ctx,
Expand Down Expand Up @@ -804,7 +919,7 @@ func (a Allocator) RebalanceTarget(
rangeUsageInfo,
)
if err != nil {
log.Warningf(ctx, "simulating RemoveTarget failed: %+v", err)
log.Warningf(ctx, "simulating RemoveVoter failed: %+v", err)
return zero, zero, "", false
}
if target.store.StoreID != removeReplica.StoreID {
Expand Down
Loading

0 comments on commit 5634c53

Please sign in to comment.