Skip to content

Commit

Permalink
Merge pull request #8959 from d4l3k/new-allocator
Browse files Browse the repository at this point in the history
storage: implemented RuleSolver into allocator
  • Loading branch information
d4l3k authored Sep 2, 2016
2 parents a4e2b2f + 27353a8 commit d86abd2
Show file tree
Hide file tree
Showing 14 changed files with 1,013 additions and 551 deletions.
9 changes: 0 additions & 9 deletions roachpb/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,15 +193,6 @@ func (sc StoreCapacity) FractionUsed() float64 {
return float64(sc.Capacity-sc.Available) / float64(sc.Capacity)
}

// CombinedAttrs builds a fresh Attributes value for the store whose list holds
// the node-level attributes first, followed by the store-level attributes.
func (s StoreDescriptor) CombinedAttrs() *Attributes {
	// Start from a nil slice so the result never shares a backing array with
	// the descriptor's own attribute slices.
	combined := append([]string(nil), s.Node.Attrs.Attrs...)
	combined = append(combined, s.Attrs.Attrs...)
	return &Attributes{Attrs: combined}
}

// String returns a string representation of the Tier.
func (t Tier) String() string {
return fmt.Sprintf("%s=%s", t.Key, t.Value)
Expand Down
207 changes: 87 additions & 120 deletions storage/allocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,13 @@ package storage

import (
"fmt"
"math/rand"

"golang.org/x/net/context"

"github.com/cockroachdb/cockroach/config"
"github.com/cockroachdb/cockroach/roachpb"
"github.com/cockroachdb/cockroach/util"
"github.com/cockroachdb/cockroach/util/log"
"github.com/cockroachdb/cockroach/util/syncutil"
"github.com/pkg/errors"
)

Expand Down Expand Up @@ -88,58 +86,28 @@ func (*allocatorError) purgatoryErrorMarker() {}

var _ purgatoryError = &allocatorError{}

// allocatorRand pairs a rand.Rand with a mutex so that users of the generator
// can serialize access to it.
// TODO: Allocator is typically only accessed from a single thread (the
// replication queue), but this assumption is broken in tests which force
// replication scans. If those tests can be modified to suspend the normal
// replication queue during the forced scan, then this rand could be used
// without a mutex.
type allocatorRand struct {
// Embedded by pointer: copies of an allocatorRand share the same lock.
*syncutil.Mutex
// Embedded by pointer: copies of an allocatorRand share the same generator.
*rand.Rand
}

// makeAllocatorRand returns an allocatorRand holding a newly allocated mutex
// and a new rand.Rand driven by the supplied source.
func makeAllocatorRand(source rand.Source) allocatorRand {
	var ar allocatorRand
	ar.Mutex = new(syncutil.Mutex)
	ar.Rand = rand.New(source)
	return ar
}

// AllocatorOptions are configurable options which affect the way that the
// replicate queue will handle rebalancing opportunities.
type AllocatorOptions struct {
// AllowRebalance allows this store to attempt to rebalance its own
// replicas to other stores. When it is false, RebalanceTarget returns
// without proposing a target.
AllowRebalance bool

// Deterministic makes allocation decisions deterministic, based on
// current cluster statistics. If this flag is not set, allocation operations
// will have random behavior. This flag is intended to be set for testing
// purposes only.
Deterministic bool
}

// Allocator tries to spread replicas as evenly as possible across the stores
// in the cluster.
type Allocator struct {
storePool *StorePool
randGen allocatorRand
options AllocatorOptions
storePool *StorePool
options AllocatorOptions
ruleSolver *ruleSolver
}

// MakeAllocator creates a new allocator using the specified StorePool.
func MakeAllocator(storePool *StorePool, options AllocatorOptions) Allocator {
var randSource rand.Source
if options.Deterministic {
randSource = rand.NewSource(777)
} else {
randSource = rand.NewSource(rand.Int63())
}
return Allocator{
storePool: storePool,
options: options,
randGen: makeAllocatorRand(randSource),
storePool: storePool,
options: options,
ruleSolver: makeDefaultRuleSolver(storePool),
}
}

Expand Down Expand Up @@ -186,62 +154,42 @@ func (a *Allocator) ComputeAction(zone config.ZoneConfig, desc *roachpb.RangeDes

// AllocateTarget returns a suitable store for a new allocation with the
// required attributes. Nodes already accommodating existing replicas are ruled
// out as targets. If relaxConstraints is true, then the required attributes
// will be relaxed as necessary, from least specific to most specific, in order
// to allocate a target.
// out as targets.
func (a *Allocator) AllocateTarget(
constraints config.Constraints,
existing []roachpb.ReplicaDescriptor,
relaxConstraints bool,
) (*roachpb.StoreDescriptor, error) {
existingNodes := make(nodeIDSet, len(existing))
for _, repl := range existing {
existingNodes[repl.NodeID] = struct{}{}
candidates, err := a.ruleSolver.Solve(constraints, existing)
if err != nil {
return nil, err
}

// Because more redundancy is better than less, if relaxConstraints, the
// matching here is lenient, and tries to find a target by relaxing an
// attribute constraint, from last attribute to first.
for attrs := append([]config.Constraint(nil), constraints.Constraints...); ; attrs = attrs[:len(attrs)-1] {
sl, aliveStoreCount, throttledStoreCount := a.storePool.getStoreList(
config.Constraints{Constraints: attrs},
a.options.Deterministic,
)
if target := a.selectGood(sl, existingNodes); target != nil {
return target, nil
}

// When there are throttled stores that do match, we shouldn't send
// the replica to purgatory or even consider relaxing the constraints.
if throttledStoreCount > 0 {
return nil, errors.Errorf("%d matching stores are currently throttled", throttledStoreCount)
}
if len(attrs) == 0 || !relaxConstraints {
return nil, &allocatorError{
required: constraints.Constraints,
relaxConstraints: relaxConstraints,
aliveStoreCount: aliveStoreCount,
}
if len(candidates) == 0 {
return nil, &allocatorError{
required: constraints.Constraints,
}
}
return &candidates[0].store, nil
}

// RemoveTarget returns a suitable replica to remove from the provided replica
// set. It attempts to consider which of the provided replicas would be the best
// candidate for removal. It also will exclude any replica that belongs to the
// range lease holder's store ID.
//
// TODO(mrtracy): removeTarget eventually needs to accept the attributes from
// the zone config associated with the provided replicas. This will allow it to
// make correct decisions in the case of ranges with heterogeneous replica
// requirements (i.e. multiple data centers).
func (a Allocator) RemoveTarget(existing []roachpb.ReplicaDescriptor, leaseStoreID roachpb.StoreID) (roachpb.ReplicaDescriptor, error) {
func (a Allocator) RemoveTarget(
constraints config.Constraints,
existing []roachpb.ReplicaDescriptor,
leaseStoreID roachpb.StoreID,
) (roachpb.ReplicaDescriptor, error) {
if len(existing) == 0 {
return roachpb.ReplicaDescriptor{}, errors.Errorf("must supply at least one replica to allocator.RemoveTarget()")
}

// Retrieve store descriptors for the provided replicas from the StorePool.
sl := StoreList{}
sl, _, _ := a.storePool.getStoreList()

found := false
var worst roachpb.ReplicaDescriptor
var worstScore float64
for _, exist := range existing {
if exist.StoreID == leaseStoreID {
continue
Expand All @@ -250,16 +198,26 @@ func (a Allocator) RemoveTarget(existing []roachpb.ReplicaDescriptor, leaseStore
if !ok {
continue
}
sl.add(desc)
// If it's not a valid candidate, score will be zero.
candidate, _ := a.ruleSolver.computeCandidate(solveState{
constraints: constraints,
store: desc,
existing: nil,
sl: sl,
tierOrder: canonicalTierOrder(sl),
tiers: storeTierMap(sl),
})
if !found || candidate.score < worstScore {
worstScore = candidate.score
worst = exist
found = true
}
}

if bad := a.selectBad(sl); bad != nil {
for _, exist := range existing {
if exist.StoreID == bad.StoreID {
return exist, nil
}
}
if found {
return worst, nil
}

return roachpb.ReplicaDescriptor{}, errors.Errorf("RemoveTarget() could not select an appropriate replica to be remove")
}

Expand All @@ -285,12 +243,12 @@ func (a Allocator) RebalanceTarget(
constraints config.Constraints,
existing []roachpb.ReplicaDescriptor,
leaseStoreID roachpb.StoreID,
) *roachpb.StoreDescriptor {
) (*roachpb.StoreDescriptor, error) {
if !a.options.AllowRebalance {
return nil
return nil, nil
}

sl, _, _ := a.storePool.getStoreList(constraints, a.options.Deterministic)
sl, _, _ := a.storePool.getStoreList()
if log.V(3) {
log.Infof(context.TODO(), "rebalance-target (lease-holder=%d):\n%s", leaseStoreID, sl)
}
Expand All @@ -307,51 +265,60 @@ func (a Allocator) RebalanceTarget(
}
}
if !shouldRebalance {
return nil
return nil, nil
}

existingNodes := make(nodeIDSet, len(existing))
for _, repl := range existing {
existingNodes[repl.NodeID] = struct{}{}
// Get candidate stores.
candidates, err := a.ruleSolver.Solve(constraints, nil)
if err != nil {
return nil, err
}
return a.improve(sl, existingNodes)
}

// selectGood hands the store list to a rangeCountBalancer built from the
// allocator's random generator and asks it for a store it considers 'Good'
// relative to the rest of the list. Nodes in the supplied 'excluded' set are
// passed along as disqualified. The balancer's pick is returned as-is, which
// is nil if no such store can be found.
func (a Allocator) selectGood(sl StoreList, excluded nodeIDSet) *roachpb.StoreDescriptor {
	balancer := rangeCountBalancer{a.randGen}
	return balancer.selectGood(sl, excluded)
}

// selectBad hands the store list to a rangeCountBalancer built from the
// allocator's random generator and asks it for a store it considers 'Bad'
// relative to the rest of the list. The balancer's pick is returned as-is,
// which is nil if no such store can be found.
func (a Allocator) selectBad(sl StoreList) *roachpb.StoreDescriptor {
	balancer := rangeCountBalancer{a.randGen}
	return balancer.selectBad(sl)
}

// improve attempts to select an improvement over the given store from the
// stores in the given store list. Any nodes in the supplied 'exclude' list
// will be disqualified from selection. Returns the selected store, or nil if
// no such store can be found.
func (a Allocator) improve(
sl StoreList, excluded nodeIDSet,
) *roachpb.StoreDescriptor {
rcb := rangeCountBalancer{a.randGen}
return rcb.improve(sl, excluded)
// Find a candidate that is better than one of the existing stores, otherwise
// return nil.
candidatesFound := 0
for _, candidate := range candidates {
store := candidate.store
found := false
for _, repl := range existing {
if repl.StoreID == store.StoreID {
found = true
break
}
}
if !found {
return &store, nil
}
candidatesFound++
if candidatesFound > len(existing) {
break
}
}
return nil, nil
}

// shouldRebalance returns whether the specified store is a candidate for
// having a replica removed from it given the candidate store list.
func (a Allocator) shouldRebalance(
store roachpb.StoreDescriptor, sl StoreList,
) bool {
rcb := rangeCountBalancer{a.randGen}
return rcb.shouldRebalance(store, sl)
const replicaInbalanceTolerance = 1

// Moving a replica from the given store makes its range count converge on
// the mean range count.
//
// TODO(peter,bram,cuong): The FractionUsed check seems suspicious. When a
// node becomes fuller than maxFractionUsedThreshold we will always select it
// for rebalancing. This is currently utilized by tests.
shouldRebalance := store.Capacity.FractionUsed() >= maxFractionUsedThreshold || (float64(store.Capacity.RangeCount)-sl.candidateCount.mean) >= replicaInbalanceTolerance

if log.V(2) {
log.Infof(context.TODO(),
"%d: should-rebalance=%t: fraction-used=%.2f range-count=%d (mean=%.1f)",
store.StoreID, shouldRebalance, store.Capacity.FractionUsed(),
store.Capacity.RangeCount, sl.candidateCount.mean)
}
return shouldRebalance
}

// computeQuorum computes the quorum value for the given number of nodes.
Expand Down
Loading

0 comments on commit d86abd2

Please sign in to comment.