storage: Re-apply the rule solver.
Instead of applying 1ef40f3
or cockroachdb#10252 directly, this finishes the reapplication of the rule
solver. It also puts the rule solver behind the environment flag
COCKROACH_ENABLE_RULE_SOLVER for ease of testing; the flag defaults to
disabled.
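As a rough sketch of how that gate might be wired (hypothetical: the diff
below shows only the envutil import, the new UseRuleSolver option, and a
sibling helper, EnvOrDefaultFloat, so the exact call here is an assumption):

    // Hypothetical sketch, not from the commit: read the flag once, using
    // the envutil helper family this diff also uses for
    // COCKROACH_REBALANCE_THRESHOLD. Defaults to disabled.
    var enableRuleSolver = envutil.EnvOrDefaultBool(
        "COCKROACH_ENABLE_RULE_SOLVER", false)

    // The value would then feed the new option when constructing the
    // allocator:
    a := MakeAllocator(storePool, AllocatorOptions{
        AllowRebalance: true,
        UseRuleSolver:  enableRuleSolver,
    })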

This commit re-applies the rule solver, specifically the following commits:

1) 4446345
storage: add constraint rule solver for allocation

Each rule is represented as a single function that returns both whether the
store is a candidate and a float score. The scores from all rules are then
aggregated, and the stores are returned sorted by their aggregate scores (a
sketch follows the rule list below).

Current rules:
- ruleReplicasUniqueNodes ensures that no two replicas are put on the same node.
- ruleConstraints enforces that required and prohibited constraints are
  followed, and that stores with more positive constraints are ranked higher.
- ruleDiversity ensures that nodes that have the fewest locality tiers in common
  are given higher priority.
- ruleCapacity prioritizes placing data on empty nodes when the choice is
  available and prevents data from going onto mostly full nodes.
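
The shape this describes can be sketched as follows. This is a self-contained
illustration, not the commit's code: every name in it (store, rule, candidate,
solve) is an assumption standing in for the real rule solver types.

    package main

    import (
        "fmt"
        "sort"
    )

    // Illustrative stand-in for a store descriptor.
    type store struct {
        id         int
        rangeCount int
    }

    // A rule reports whether a store remains a candidate and contributes
    // a score.
    type rule func(s store) (valid bool, score float64)

    type candidate struct {
        store store
        score float64
    }

    // solve aggregates every rule's score per store, drops stores any rule
    // rejects, and returns the rest sorted by descending total score.
    func solve(rules []rule, stores []store) []candidate {
        var out []candidate
        for _, s := range stores {
            total, ok := 0.0, true
            for _, r := range rules {
                valid, score := r(s)
                if !valid {
                    ok = false
                    break
                }
                total += score
            }
            if ok {
                out = append(out, candidate{store: s, score: total})
            }
        }
        sort.Slice(out, func(i, j int) bool { return out[i].score > out[j].score })
        return out
    }

    func main() {
        // A ruleCapacity-style rule: emptier stores score higher, and
        // mostly full stores are rejected outright.
        capacity := func(s store) (bool, float64) {
            if s.rangeCount > 100 {
                return false, 0
            }
            return true, 1 - float64(s.rangeCount)/100
        }
        // Store 2 (emptiest) sorts first; store 3 is rejected entirely.
        fmt.Println(solve([]rule{capacity}, []store{{1, 90}, {2, 10}, {3, 200}}))
    }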

2) dd3229a
storage: implemented RuleSolver into allocator

The follow-up to this commit is cockroachdb#10275, along with extensive
testing to ensure that the rule solver does indeed perform as expected.

Closes cockroachdb#9336
BramGruneir committed Nov 9, 2016
1 parent 28a31c0 commit e651674
Showing 8 changed files with 1,619 additions and 472 deletions.
217 changes: 203 additions & 14 deletions pkg/storage/allocator.go
@@ -20,13 +20,15 @@ package storage

import (
"fmt"
"math"
"math/rand"

"golang.org/x/net/context"

"github.com/cockroachdb/cockroach/pkg/config"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/util"
"github.com/cockroachdb/cockroach/pkg/util/envutil"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/pkg/errors"
@@ -123,28 +125,37 @@ type AllocatorOptions struct {
// AllowRebalance allows this store to attempt to rebalance its own
// replicas to other stores.
AllowRebalance bool

// UseRuleSolver enables this store to use the updated rules based
// constraint solver instead of the original rebalancer.
UseRuleSolver bool
}

// Allocator tries to spread replicas as evenly as possible across the stores
// in the cluster.
type Allocator struct {
storePool *StorePool
randGen allocatorRand
options AllocatorOptions
storePool *StorePool
randGen allocatorRand
options AllocatorOptions
ruleSolver ruleSolver
}

// MakeAllocator creates a new allocator using the specified StorePool.
func MakeAllocator(storePool *StorePool, options AllocatorOptions) Allocator {
var randSource rand.Source
// There are a number of test cases that make a test store but don't add
// gossip or a store pool. So we can't rely on the existence of the
// store pool in those cases.
if storePool != nil && storePool.deterministic {
randSource = rand.NewSource(777)
} else {
randSource = rand.NewSource(rand.Int63())
}
return Allocator{
storePool: storePool,
options: options,
randGen: makeAllocatorRand(randSource),
storePool: storePool,
options: options,
randGen: makeAllocatorRand(randSource),
ruleSolver: makeRuleSolver(defaultSolverRules),
}
}

@@ -203,6 +214,28 @@ func (a *Allocator) AllocateTarget(
rangeID roachpb.RangeID,
relaxConstraints bool,
) (*roachpb.StoreDescriptor, error) {
if a.options.UseRuleSolver {
sl, _, throttledStoreCount := a.storePool.getStoreList(rangeID)
// When there are throttled stores that do match, we shouldn't send
// the replica to purgatory.
if throttledStoreCount > 0 {
return nil, errors.Errorf("%d matching stores are currently throttled", throttledStoreCount)
}

candidates, err := a.ruleSolver.Solve(sl, constraints, existing)
if err != nil {
return nil, err
}

if len(candidates) == 0 {
return nil, &allocatorError{
required: constraints.Constraints,
}
}
// TODO(bram): #10275 Need some randomness here!
return &candidates[0].store, nil
}

existingNodes := make(nodeIDSet, len(existing))
for _, repl := range existing {
existingNodes[repl.NodeID] = struct{}{}
@@ -244,12 +277,55 @@
// make correct decisions in the case of ranges with heterogeneous replica
// requirements (i.e. multiple data centers).
func (a Allocator) RemoveTarget(
existing []roachpb.ReplicaDescriptor, leaseStoreID roachpb.StoreID,
constraints config.Constraints,
existing []roachpb.ReplicaDescriptor,
leaseStoreID roachpb.StoreID,
) (roachpb.ReplicaDescriptor, error) {
if len(existing) == 0 {
return roachpb.ReplicaDescriptor{}, errors.Errorf("must supply at least one replica to allocator.RemoveTarget()")
}

if a.options.UseRuleSolver {
// TODO(bram): #10275 Is this getStoreList call required? computeCandidate
// requires a store list, but we should be able to create one using only
// the stores that belong to the range.
// Use an invalid range ID since we don't care about corrupt replicas here:
// we are removing a replica, not trying to add one.
sl, _, _ := a.storePool.getStoreList(roachpb.RangeID(0))

var worst roachpb.ReplicaDescriptor
worstScore := math.Inf(0)
for _, exist := range existing {
if exist.StoreID == leaseStoreID {
continue
}
desc, ok := a.storePool.getStoreDescriptor(exist.StoreID)
if !ok {
continue
}

// When a candidate is not valid, the score will be 0 and it should
// be chosen for removal.
if candidate, _ := a.ruleSolver.computeCandidate(solveState{
constraints: constraints,
store: desc,
existing: nil,
sl: sl,
tierOrder: canonicalTierOrder(sl),
tiers: storeTierMap(sl),
}); candidate.score < worstScore {
worstScore = candidate.score
worst = exist
}
}

if !math.IsInf(worstScore, 0) {
return worst, nil
}

return roachpb.ReplicaDescriptor{}, errors.New("could not select an appropriate replica to be removed")
}

// Retrieve store descriptors for the provided replicas from the StorePool.
var descriptors []roachpb.StoreDescriptor
for _, exist := range existing {
@@ -269,7 +345,7 @@ func (a Allocator) RemoveTarget(
}
}
}
return roachpb.ReplicaDescriptor{}, errors.Errorf("RemoveTarget() could not select an appropriate replica to be remove")
return roachpb.ReplicaDescriptor{}, errors.New("could not select an appropriate replica to be removed")
}

// RebalanceTarget returns a suitable store for a rebalance target with
@@ -297,9 +373,80 @@ func (a Allocator) RebalanceTarget(
existing []roachpb.ReplicaDescriptor,
leaseStoreID roachpb.StoreID,
rangeID roachpb.RangeID,
) *roachpb.StoreDescriptor {
) (*roachpb.StoreDescriptor, error) {
if !a.options.AllowRebalance {
return nil
return nil, nil
}

if a.options.UseRuleSolver {
sl, _, _ := a.storePool.getStoreList(rangeID)
if log.V(3) {
log.Infof(context.TODO(), "rebalance-target (lease-holder=%d):\n%s", leaseStoreID, sl)
}

var shouldRebalance bool
for _, repl := range existing {
if leaseStoreID == repl.StoreID {
continue
}
storeDesc, ok := a.storePool.getStoreDescriptor(repl.StoreID)
if ok && a.shouldRebalance(storeDesc, sl) {
shouldRebalance = true
break
}
}
if !shouldRebalance {
return nil, nil
}

// Load the existing store IDs into a map to avoid looping through the
// existing descriptors more than once.
existingStoreIDs := make(map[roachpb.StoreID]struct{})
for _, repl := range existing {
existingStoreIDs[repl.StoreID] = struct{}{}
}

// Split the store list into existing and candidate store lists so that
// we can call Solve independently on both.
var existingDescs []roachpb.StoreDescriptor
var candidateDescs []roachpb.StoreDescriptor
for _, desc := range sl.stores {
if _, ok := existingStoreIDs[desc.StoreID]; ok {
existingDescs = append(existingDescs, desc)
} else {
candidateDescs = append(candidateDescs, desc)
}
}

existingStoreList := makeStoreList(existingDescs)
candidateStoreList := makeStoreList(candidateDescs)

existingCandidates, err := a.ruleSolver.Solve(existingStoreList, constraints, nil)
if err != nil {
return nil, err
}
candidates, err := a.ruleSolver.Solve(candidateStoreList, constraints, nil)
if err != nil {
return nil, err
}

// Find all candidates that are better than the worst existing store.
var worstCandidateStore float64
// If any store from existing is not included in existingCandidates, it
// is because it no longer meets the constraints. If so, its score is
// considered to be 0.
if len(existingCandidates) == len(existing) {
worstCandidateStore = existingCandidates[len(existingCandidates)-1].score
}

// TODO(bram): #10275 Need some randomness here!
for _, cand := range candidates {
if cand.score > worstCandidateStore {
return &candidates[0].store, nil
}
}

return nil, nil
}

sl, _, _ := a.storePool.getStoreList(rangeID)
@@ -320,14 +467,14 @@
}
}
if !shouldRebalance {
return nil
return nil, nil
}

existingNodes := make(nodeIDSet, len(existing))
for _, repl := range existing {
existingNodes[repl.NodeID] = struct{}{}
}
return a.improve(sl, existingNodes)
return a.improve(sl, existingNodes), nil
}

// selectGood attempts to select a store from the supplied store list that it
@@ -356,11 +503,53 @@ func (a Allocator) improve(sl StoreList, excluded nodeIDSet) *roachpb.StoreDescriptor
return rcb.improve(sl, excluded)
}

// rebalanceThreshold is the minimum ratio of a store's range surplus to the
// mean range count that permits rebalances away from that store.
var rebalanceThreshold = envutil.EnvOrDefaultFloat("COCKROACH_REBALANCE_THRESHOLD", 0.05)

// shouldRebalance returns whether the specified store is a candidate for
// having a replica removed from it given the candidate store list.
func (a Allocator) shouldRebalance(store roachpb.StoreDescriptor, sl StoreList) bool {
rcb := rangeCountBalancer{a.randGen}
return rcb.shouldRebalance(store, sl)
// TODO(peter,bram,cuong): The FractionUsed check seems suspicious. When a
// node becomes fuller than maxFractionUsedThreshold we will always select it
// for rebalancing.
maxCapacityUsed := store.Capacity.FractionUsed() >= maxFractionUsedThreshold

// Rebalance if we're above the rebalance target, which is
// mean*(1+rebalanceThreshold).
target := int32(math.Ceil(sl.candidateCount.mean * (1 + rebalanceThreshold)))
rangeCountAboveTarget := store.Capacity.RangeCount > target

// Rebalance if the candidate store has a range count above the mean, and
// there exists another store that is underfull: its range count is smaller
// than mean*(1-rebalanceThreshold).
var rebalanceToUnderfullStore bool
if float64(store.Capacity.RangeCount) > sl.candidateCount.mean {
underfullThreshold := int32(math.Floor(sl.candidateCount.mean * (1 - rebalanceThreshold)))
for _, desc := range sl.stores {
if desc.Capacity.RangeCount < underfullThreshold {
rebalanceToUnderfullStore = true
break
}
}
}

// Require that moving a replica from the given store makes its range count
// converge on the mean range count. This only affects clusters with a
// small number of ranges.
rebalanceConvergesOnMean := rebalanceFromConvergesOnMean(sl, store)

result :=
(maxCapacityUsed || rangeCountAboveTarget || rebalanceToUnderfullStore) && rebalanceConvergesOnMean
if log.V(2) {
log.Infof(context.TODO(),
"%d: should-rebalance=%t: fraction-used=%.2f range-count=%d "+
"(mean=%.1f, target=%d, fraction-used=%t, above-target=%t, underfull=%t, converges=%t)",
store.StoreID, result, store.Capacity.FractionUsed(), store.Capacity.RangeCount,
sl.candidateCount.mean, target, maxCapacityUsed, rangeCountAboveTarget,
rebalanceToUnderfullStore, rebalanceConvergesOnMean)
}
return result
}

// computeQuorum computes the quorum value for the given number of nodes.
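
For intuition on the thresholds computed in shouldRebalance above, here is a
small self-contained example with assumed numbers (a mean of 100 ranges per
store and the default COCKROACH_REBALANCE_THRESHOLD of 0.05; these values are
illustrative, not from the commit):

    package main

    import (
        "fmt"
        "math"
    )

    func main() {
        mean, threshold := 100.0, 0.05

        // A store is above the rebalance target once it exceeds
        // ceil(100 * 1.05) = 105 ranges.
        target := int32(math.Ceil(mean * (1 + threshold)))

        // Another store counts as underfull below
        // floor(100 * 0.95) = 95 ranges.
        underfull := int32(math.Floor(mean * (1 - threshold)))

        fmt.Println(target, underfull) // 105 95
    }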