kvserver: Allow rebalances between stores on the same nodes.
Closes cockroachdb#6782

This change modifies the replicate queue to allow rebalances between multiple stores within a single node. To make this correct, we add a number of guard rails to the Allocator that prevent a rebalance from placing multiple replicas of a range on the same node; that placement is undesirable because, under simple 3x replication, a single node crash could make a whole range unavailable. These guard rails alone are not enough: in the naive implementation, the allocator ends up in an endless rebalance loop between the two stores on the same node. An additional set of changes to the allocator heuristics was needed to detect when the stores on a single node are balanced and to stop attempting to move ranges between them.

Release note (performance improvement): This change removes the last roadblock to running CockroachDB with multiple stores (i.e. disks) per node. The allocation algorithm now supports intra-node rebalances, which means CRDB can fully utilize the additional stores on the same node.
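As a quick orientation before the diff, the guard rail described above boils down to a same-node conflict check. The following is a simplified standalone sketch with stand-in types, not the commit's code (the committed version is the isSameNodeRebalanceAttempt helper in allocator.go below):

package main

import "fmt"

// Minimal stand-ins for the repo's descriptor types; illustrative only.
type replica struct{ nodeID, storeID int32 }
type store struct{ nodeID, storeID int32 }

// sameNodeConflict distills the guard rail: a rebalance target conflicts if
// its node already holds a replica of the range on a different store. In that
// case the allocator must ensure the replica it removes is the one on that
// same node, or the range would have two replicas behind one failure domain.
func sameNodeConflict(existing []replica, target store) bool {
	for _, r := range existing {
		if r.nodeID == target.nodeID && r.storeID != target.storeID {
			return true
		}
	}
	return false
}

func main() {
	existing := []replica{{1, 1}, {2, 3}, {3, 5}}
	fmt.Println(sameNodeConflict(existing, store{2, 4})) // true: n2 already has a replica on s3
	fmt.Println(sameNodeConflict(existing, store{4, 7})) // false: n4 holds no replica
}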
lunevalex committed Jul 18, 2020
1 parent 02ceadd commit 18b278a
Showing 6 changed files with 430 additions and 92 deletions.
124 changes: 81 additions & 43 deletions pkg/kv/kvserver/allocator.go
@@ -526,7 +526,8 @@ func (a Allocator) simulateRemoveTarget(
 	candidates []roachpb.ReplicaDescriptor,
 	existingReplicas []roachpb.ReplicaDescriptor,
 	rangeUsageInfo RangeUsageInfo,
-) (roachpb.ReplicaDescriptor, string, error) {
+	storeFilter roachpb.StoreDescriptorFilter,
+) (*candidate, string, error) {
 	// Update statistics first
 	// TODO(a-robinson): This could theoretically interfere with decisions made by other goroutines,
 	// but as of October 2017 calls to the Allocator are mostly serialized by the ReplicateQueue
@@ -537,7 +538,7 @@ func (a Allocator) simulateRemoveTarget(
 		a.storePool.updateLocalStoreAfterRebalance(targetStore, rangeUsageInfo, roachpb.REMOVE_REPLICA)
 	}()
 	log.VEventf(ctx, 3, "simulating which replica would be removed after adding s%d", targetStore)
-	return a.RemoveTarget(ctx, zone, candidates, existingReplicas)
+	return a.doRemoveTarget(ctx, zone, candidates, existingReplicas, storeFilter)
 }
 
 // RemoveTarget returns a suitable replica to remove from the provided replica
@@ -551,41 +552,16 @@ func (a Allocator) RemoveTarget(
 	candidates []roachpb.ReplicaDescriptor,
 	existingReplicas []roachpb.ReplicaDescriptor,
 ) (roachpb.ReplicaDescriptor, string, error) {
-	if len(candidates) == 0 {
-		return roachpb.ReplicaDescriptor{}, "", errors.Errorf("must supply at least one candidate replica to allocator.RemoveTarget()")
-	}
-
-	// Retrieve store descriptors for the provided candidates from the StorePool.
-	existingStoreIDs := make(roachpb.StoreIDSlice, len(candidates))
-	for i, exist := range candidates {
-		existingStoreIDs[i] = exist.StoreID
+	removeCandidate, removeDetails, err := a.doRemoveTarget(ctx, zone, candidates,
+		existingReplicas, roachpb.NoOpStoreDescriptorFilter{})
+	if err != nil {
+		return roachpb.ReplicaDescriptor{}, removeDetails, err
 	}
-	sl, _, _ := a.storePool.getStoreListFromIDs(existingStoreIDs, storeFilterNone)
-
-	analyzedConstraints := constraint.AnalyzeConstraints(
-		ctx, a.storePool.getStoreDescriptor, existingReplicas, zone)
-	options := a.scorerOptions()
-	rankedCandidates := removeCandidates(
-		sl,
-		analyzedConstraints,
-		a.storePool.getLocalities(existingReplicas),
-		options,
-	)
-	log.VEventf(ctx, 3, "remove candidates: %s", rankedCandidates)
-	if bad := rankedCandidates.selectBad(a.randGen); bad != nil {
-		for _, exist := range existingReplicas {
-			if exist.StoreID == bad.store.StoreID {
-				log.VEventf(ctx, 3, "remove target: %s", bad)
-				details := decisionDetails{Target: bad.compactString(options)}
-				detailsBytes, err := json.Marshal(details)
-				if err != nil {
-					log.Warningf(ctx, "failed to marshal details for choosing remove target: %+v", err)
-				}
-				return exist, string(detailsBytes), nil
-			}
+	for _, exist := range existingReplicas {
+		if exist.StoreID == removeCandidate.store.StoreID {
+			return exist, removeDetails, nil
 		}
 	}
 
 	return roachpb.ReplicaDescriptor{}, "", errors.New("could not select an appropriate replica to be removed")
 }

@@ -675,14 +651,13 @@ func (a Allocator) RebalanceTarget(
 	// pretty sure we won't want to remove immediately after adding it.
 	// If we would, we don't want to actually rebalance to that target.
 	var target *candidate
-	var removeReplica roachpb.ReplicaDescriptor
+	var removalTarget *candidate
 	var existingCandidates candidateList
 	for {
 		target, existingCandidates = bestRebalanceTarget(a.randGen, results)
 		if target == nil {
 			return zero, zero, "", false
 		}
-
 		// Add a fake new replica to our copy of the range descriptor so that we can
 		// simulate the removal logic. If we decide not to go with this target, note
 		// that this needs to be removed from desc before we try any other target.
@@ -710,23 +685,26 @@ func (a Allocator) RebalanceTarget(
 			return zero, zero, "", false
 		}
 
-		var removeDetails string
-		var err error
-		removeReplica, removeDetails, err = a.simulateRemoveTarget(
+		var removeDetails string
+		var err error
+		var storeFilter roachpb.StoreDescriptorFilter = roachpb.NoOpStoreDescriptorFilter{}
+		if isSameNodeRebalanceAttempt(existingPlusOneNew, target) {
+			storeFilter = roachpb.SameNodeStoreDescriptorFilter{NodeID: target.store.Node.NodeID}
+		}
+		removalTarget, removeDetails, err = a.simulateRemoveTarget(
 			ctx,
 			target.store.StoreID,
 			zone,
 			replicaCandidates,
 			existingPlusOneNew,
 			rangeUsageInfo,
+			storeFilter,
 		)
 		if err != nil {
 			log.Warningf(ctx, "simulating RemoveTarget failed: %+v", err)
 			return zero, zero, "", false
 		}
-		if target.store.StoreID != removeReplica.StoreID {
-			// Successfully populated these variables
-			_, _ = target, removeReplica
+		if target.store.StoreID != removalTarget.store.StoreID && removalTarget.less(*target) {
 			break
 		}
@@ -750,8 +728,8 @@ func (a Allocator) RebalanceTarget(
 		StoreID: target.store.StoreID,
 	}
 	removeTarget := roachpb.ReplicationTarget{
-		NodeID:  removeReplica.NodeID,
-		StoreID: removeReplica.StoreID,
+		NodeID:  removalTarget.store.Node.NodeID,
+		StoreID: removalTarget.store.StoreID,
 	}
 	return addTarget, removeTarget, string(detailsBytes), true
 }
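The new break condition in the loop above deserves a gloss: the move is accepted only when the simulated removal lands on a store other than the one just added and, going by the removalTarget.less(*target) check, that victim compares worse than the incoming target. Below is a standalone toy sketch of that decision with scores collapsed to a single float (an assumption; the real candidate comparison weighs diversity, convergence, and balance dimensions):

package main

import "fmt"

// acceptMove distills the loop's break condition: take the rebalance only
// when the simulated removal picks a store other than the one just added and
// that victim scores worse than the new target; otherwise the move would not
// improve the placement.
func acceptMove(targetStoreID, victimStoreID int32, targetScore, victimScore float64) bool {
	return victimStoreID != targetStoreID && victimScore < targetScore
}

func main() {
	fmt.Println(acceptMove(4, 2, 0.9, 0.3)) // true: removing s2 is a net win
	fmt.Println(acceptMove(4, 4, 0.9, 0.9)) // false: we'd remove what we just added
}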
@@ -1378,3 +1356,63 @@ func maxReplicaID(replicas []roachpb.ReplicaDescriptor) roachpb.ReplicaID {
 	}
 	return max
 }
+
+// doRemoveTarget returns a suitable replica to remove from the provided replica
+// set. It first attempts to randomly select a target from the set of stores
+// that have greater than the average number of replicas. Failing that, it
+// falls back to selecting a random target from any of the existing
+// replicas.
+func (a Allocator) doRemoveTarget(
+	ctx context.Context,
+	zone *zonepb.ZoneConfig,
+	candidates []roachpb.ReplicaDescriptor,
+	existingReplicas []roachpb.ReplicaDescriptor,
+	storeFilter roachpb.StoreDescriptorFilter,
+) (*candidate, string, error) {
+	if len(candidates) == 0 {
+		return nil, "", errors.Errorf("must supply at least one candidate replica to allocator.RemoveTarget()")
+	}
+
+	// Retrieve store descriptors for the provided candidates from the StorePool.
+	existingStoreIDs := make(roachpb.StoreIDSlice, len(candidates))
+	for i, exist := range candidates {
+		existingStoreIDs[i] = exist.StoreID
+	}
+	sl, _, _ := a.storePool.getStoreListFromIDs(existingStoreIDs, storeFilterNone)
+
+	analyzedConstraints := constraint.AnalyzeConstraints(
+		ctx, a.storePool.getStoreDescriptor, existingReplicas, zone)
+	options := a.scorerOptions()
+	rankedCandidates := removeCandidates(
+		sl,
+		analyzedConstraints,
+		a.storePool.getLocalities(existingReplicas),
+		options,
+		storeFilter,
+	)
+	log.VEventf(ctx, 3, "remove candidates: %s", rankedCandidates)
+	if bad := rankedCandidates.selectBad(a.randGen); bad != nil {
+		log.VEventf(ctx, 3, "remove target: %s", bad)
+		details := decisionDetails{Target: bad.compactString(options)}
+		detailsBytes, err := json.Marshal(details)
+		if err != nil {
+			log.Warningf(ctx, "failed to marshal details for choosing remove target: %+v", err)
+		}
+		return bad, string(detailsBytes), nil
+	}
+
+	return nil, "", errors.New("could not select an appropriate replica to be removed")
+}
+
+// isSameNodeRebalanceAttempt returns true if the target candidate is on the
+// same node as an existing replica.
+func isSameNodeRebalanceAttempt(
+	existingReplicas []roachpb.ReplicaDescriptor, target *candidate,
+) bool {
+	for _, repl := range existingReplicas {
+		if target.store.Node.NodeID == repl.NodeID && target.store.StoreID != repl.StoreID {
+			return true
+		}
+	}
+	return false
+}
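The storeFilter values threaded through the code above (roachpb.NoOpStoreDescriptorFilter, roachpb.SameNodeStoreDescriptorFilter) are defined in pkg/roachpb, one of the changed files not shown in this excerpt. Here is a minimal standalone sketch of what that interface plausibly looks like, inferred purely from its call sites; the names and fields are assumptions:

package main

import "fmt"

// These types sketch the StoreDescriptorFilter used above; the real
// definitions live in pkg/roachpb among the changed files not shown here, so
// treat exact names and fields as assumptions inferred from the call sites
// (storeFilter.Filter(s), NoOpStoreDescriptorFilter{},
// SameNodeStoreDescriptorFilter{NodeID: ...}).
type nodeID int32

type storeDescriptor struct {
	storeID int32
	node    nodeID
}

type storeDescriptorFilter interface {
	// Filter returns true if the store should remain under consideration.
	Filter(s storeDescriptor) bool
}

// noOpFilter keeps every store; this mirrors the default used by RemoveTarget.
type noOpFilter struct{}

func (noOpFilter) Filter(storeDescriptor) bool { return true }

// sameNodeFilter keeps only stores on one node, so that for a same-node
// rebalance the removal candidate must come from that node.
type sameNodeFilter struct{ node nodeID }

func (f sameNodeFilter) Filter(s storeDescriptor) bool { return s.node == f.node }

func main() {
	s := storeDescriptor{storeID: 3, node: 2}
	fmt.Println(noOpFilter{}.Filter(s))            // true: everything passes
	fmt.Println(sameNodeFilter{node: 2}.Filter(s)) // true: s3 is on n2
	fmt.Println(sameNodeFilter{node: 5}.Filter(s)) // false: removal is restricted to n5
}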
28 changes: 18 additions & 10 deletions pkg/kv/kvserver/allocator_scorer.go
@@ -209,6 +209,18 @@ func (c candidate) compare(o candidate) float64 {
 	if c.rangeCount == 0 && o.rangeCount == 0 {
 		return 0
 	}
+	// For same node rebalances we need to manually consider the
+	// minRangeRebalanceThreshold, since in lopsided clusters their under/over
+	// replication is not properly captured in the balanceScore.
+	if c.store.Node.NodeID == o.store.Node.NodeID && c.store.Node.NodeID != 0 {
+		if c.rangeCount < o.rangeCount {
+			if o.rangeCount-c.rangeCount <= minRangeRebalanceThreshold {
+				return 0
+			}
+		} else if c.rangeCount-o.rangeCount <= minRangeRebalanceThreshold {
+			return 0
+		}
+	}
 	if c.rangeCount < o.rangeCount {
 		return float64(o.rangeCount-c.rangeCount) / float64(o.rangeCount)
 	}
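Concrete numbers make the effect of this early exit clear: two stores on the same node whose range counts differ by no more than the threshold now compare as equal, so the allocator stops shuffling ranges between them. A standalone sketch of the same check follows; the threshold value 2 is an assumed stand-in for the repo's actual minRangeRebalanceThreshold constant:

package main

import "fmt"

// minRangeRebalanceThreshold mirrors the constant referenced in the diff; the
// value 2 here is an assumption for illustration, not the repo's source of truth.
const minRangeRebalanceThreshold = 2

// sameNodeBalanced mirrors the new early-exit in candidate.compare: two
// stores on the same node are treated as equally good (score 0) when their
// range counts differ by no more than the threshold.
func sameNodeBalanced(cRanges, oRanges int) bool {
	diff := cRanges - oRanges
	if diff < 0 {
		diff = -diff
	}
	return diff <= minRangeRebalanceThreshold
}

func main() {
	fmt.Println(sameNodeBalanced(100, 101)) // true: within threshold, no rebalance
	fmt.Println(sameNodeBalanced(100, 110)) // false: lopsided enough to move a range
}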
@@ -465,6 +477,7 @@ func removeCandidates(
 	constraints constraint.AnalyzedConstraints,
 	existingNodeLocalities map[roachpb.NodeID]roachpb.Locality,
 	options scorerOptions,
+	storeFilter roachpb.StoreDescriptorFilter,
 ) candidateList {
 	var candidates candidateList
 	for _, s := range sl.stores {
@@ -478,6 +491,11 @@ func removeCandidates(
 			})
 			continue
 		}
+		// If the candidate does not pass the filter then we cannot remove it,
+		// as in it must be a replica that remains.
+		if !storeFilter.Filter(s) {
+			continue
+		}
 		diversityScore := diversityRemovalScore(s.Node.NodeID, existingNodeLocalities)
 		balanceScore := balanceScore(sl, s.Capacity, options)
 		var convergesScore int
@@ -610,16 +628,6 @@ func rebalanceCandidates(
 		}
 		var comparableCands candidateList
 		for _, store := range allStores.stores {
-			// Nodes that already have a replica on one of their stores aren't valid
-			// rebalance targets. We do include stores that currently have a replica
-			// because we want them to be considered as valid stores in the
-			// ConvergesOnMean calculations below. This is subtle but important.
-			if nodeHasReplica(store.Node.NodeID, existingReplicas) &&
-				!storeHasReplica(store.StoreID, existingReplicas) {
-				log.VEventf(ctx, 2, "nodeHasReplica(n%d, %v)=true",
-					store.Node.NodeID, existingReplicas)
-				continue
-			}
 			constraintsOK, necessary := rebalanceFromConstraintsCheck(
 				store, existing.cand.store.StoreID, constraints)
 			maxCapacityOK := maxCapacityCheck(store)
