Skip to content

Commit

Permalink
storage: add constraint rule solver for allocation
Browse files Browse the repository at this point in the history
Each rule is represented as a single function that returns the candidacy of the
store as well as a float value representing the score. The scores from all rules
are then aggregated, and the solver returns the stores sorted by the result.

Current rules:
* ruleReplicasUniqueNodes ensures that no two replicas are put on the same node.
* ruleNoProhibitedConstraints ensures that the candidate store has no prohibited constraints.
* ruleRequiredConstraints ensures that the candidate store has the required constraints.
* rulePositiveConstraints ensures that nodes that match more of the positive constraints are given higher priority.
* ruleDiversity ensures that nodes that have the fewest locality tiers in common are given higher priority.
* ruleCapacity prioritizes placing data on empty nodes when the choice is available and prevents data from going onto mostly full nodes.
  • Loading branch information
d4l3k committed Aug 30, 2016
1 parent 9b41a2d commit 907c60d
Show file tree
Hide file tree
Showing 3 changed files with 510 additions and 2 deletions.
10 changes: 8 additions & 2 deletions storage/allocator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,13 +180,19 @@ func mockStorePool(storePool *StorePool, aliveStoreIDs, deadStoreIDs []roachpb.S
storePool.mu.stores = make(map[roachpb.StoreID]*storeDetail)
for _, storeID := range aliveStoreIDs {
detail := newStoreDetail()
detail.desc = &roachpb.StoreDescriptor{StoreID: storeID}
detail.desc = &roachpb.StoreDescriptor{
StoreID: storeID,
Node: roachpb.NodeDescriptor{NodeID: roachpb.NodeID(storeID)},
}
storePool.mu.stores[storeID] = detail
}
for _, storeID := range deadStoreIDs {
detail := newStoreDetail()
detail.dead = true
detail.desc = &roachpb.StoreDescriptor{StoreID: storeID}
detail.desc = &roachpb.StoreDescriptor{
StoreID: storeID,
Node: roachpb.NodeDescriptor{NodeID: roachpb.NodeID(storeID)},
}
storePool.mu.stores[storeID] = detail
}
for storeID, detail := range storePool.mu.stores {
Expand Down
258 changes: 258 additions & 0 deletions storage/rule_solver.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,258 @@
// Copyright 2016 The Cockroach Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
// implied. See the License for the specific language governing
// permissions and limitations under the License.
//
// Author: Tristan Rice ([email protected])

package storage

import (
"math"
"sort"

"github.com/cockroachdb/cockroach/config"
"github.com/cockroachdb/cockroach/roachpb"
"github.com/pkg/errors"
)

// candidate pairs a store with the aggregate score that the rule solver
// computed for it.
type candidate struct {
// store is the descriptor of the store that was evaluated.
store roachpb.StoreDescriptor
// score is the sum of the weighted scores of all rules for this store;
// higher is better.
score float64
}

// rule is a generic rule that can be used to solve a constraint problem.
// Returning false will remove the store from the list of candidate stores. The
// score will be weighted and then summed together with the other rule scores to
// create a store ranking (higher is better).
type rule struct {
// weight is the factor the score returned by run is multiplied by before it
// is added to a store's total score.
weight float64
// run evaluates the rule for store, given the constraints c, the existing
// replicas and the list of stores sl. It reports whether store is still a
// candidate and the unweighted score the rule assigns to it.
run func(
c config.Constraints,
store roachpb.StoreDescriptor,
existing []roachpb.ReplicaDescriptor,
sl StoreList,
) (candidate bool, score float64)
}

// defaultRules is the default rule set to use. The rules are evaluated in the
// order listed here, and each weight scales the score of its rule before the
// score is added to a store's total.
var defaultRules = []rule{
// Pure filter: ruleReplicasUniqueNodes always scores 0, so its weight has no
// effect on the ranking.
{
weight: 1.0,
run: ruleReplicasUniqueNodes,
},
// Filters on required and prohibited constraints and scores by the positive
// constraints that the store matches.
{
weight: 1.0,
run: ruleConstraints,
},
// Filters out stores that are too full and scores stores with fewer ranges
// higher.
{
weight: 0.01,
run: ruleCapacity,
},
// Never filters; scores stores by how much their locality differs from that
// of the existing replicas.
{
weight: 0.1,
run: ruleDiversity,
},
}

// makeDefaultRuleSolver builds a ruleSolver for storePool that evaluates the
// package-level defaultRules.
func makeDefaultRuleSolver(storePool *StorePool) *ruleSolver {
	rules := defaultRules
	return makeRuleSolver(storePool, rules)
}

// makeRuleSolver constructs a ruleSolver over storePool that evaluates rules
// in the order given. Because a failing rule stops evaluation of a store,
// cheaper rules should be listed first so that they can eliminate candidates
// before the more expensive rules run.
func makeRuleSolver(storePool *StorePool, rules []rule) *ruleSolver {
	rs := new(ruleSolver)
	rs.storePool = storePool
	rs.rules = rules
	return rs
}

// ruleSolver solves a set of rules for a store.
type ruleSolver struct {
// storePool is the pool the list of stores to evaluate is read from.
storePool *StorePool
// rules are the rules applied to every store, in evaluation order.
rules []rule
}

// solve evaluates the rules for every store and returns the descriptors of
// the stores that remain candidates, ordered from the highest to the lowest
// score. It is a thin wrapper around (*ruleSolver).solveScores that drops the
// scores; any error from solveScores is returned unchanged.
func (rs *ruleSolver) solve(
	c config.Constraints, existing []roachpb.ReplicaDescriptor,
) ([]roachpb.StoreDescriptor, error) {
	scored, err := rs.solveScores(c, existing)
	if err != nil {
		return nil, err
	}

	stores := make([]roachpb.StoreDescriptor, 0, len(scored))
	for i := range scored {
		stores = append(stores, scored[i].store)
	}
	return stores, nil
}

// solveScores evaluates the rules for every store in the store pool and
// returns the stores that remain candidates together with their scores,
// sorted from the highest to the lowest score.
//
// The store list is requested with an empty constraint set; the constraints
// in c are applied by the rules instead. An error is returned if any store is
// reported as throttled.
func (rs *ruleSolver) solveScores(
	c config.Constraints, existing []roachpb.ReplicaDescriptor,
) ([]candidate, error) {
	storeList, _, throttled := rs.storePool.getStoreList(config.Constraints{}, false)

	// Throttled stores that would otherwise match mean that the replica should
	// neither be sent to purgatory nor have its constraints relaxed, so bail
	// out with an error instead.
	if throttled > 0 {
		return nil, errors.Errorf("%d matching stores are currently throttled", throttled)
	}

	scored := make([]candidate, 0, len(storeList.stores))
	for _, s := range storeList.stores {
		cand, ok := rs.computeCandidate(c, s, existing, storeList)
		if !ok {
			continue
		}
		scored = append(scored, cand)
	}

	sort.Sort(byScore(scored))
	return scored, nil
}

// computeCandidate runs every rule, in order, against store. As soon as one
// rule rejects the store, evaluation stops and false is returned with a zero
// candidate. Otherwise the returned candidate carries the sum of each rule's
// score multiplied by the rule's weight; NaN scores are left out of the sum.
func (rs *ruleSolver) computeCandidate(
	constraints config.Constraints,
	store roachpb.StoreDescriptor,
	existing []roachpb.ReplicaDescriptor,
	sl StoreList,
) (candidate, bool) {
	total := 0.0
	for i := range rs.rules {
		r := rs.rules[i]
		ok, s := r.run(constraints, store, existing, sl)
		if !ok {
			return candidate{}, false
		}
		if math.IsNaN(s) {
			// A rule that could not produce a meaningful score contributes
			// nothing to the total.
			continue
		}
		total += s * r.weight
	}
	return candidate{store: store, score: total}, true
}

// ruleReplicasUniqueNodes rejects store when one of the existing replicas
// already lives on the same node, so that no two replicas share a node. The
// rule only filters: the score is always 0.
func ruleReplicasUniqueNodes(
	_ config.Constraints,
	store roachpb.StoreDescriptor,
	existing []roachpb.ReplicaDescriptor,
	_ StoreList,
) (candidate bool, score float64) {
	nodeID := store.Node.NodeID
	for i := range existing {
		if existing[i].NodeID == nodeID {
			return false, 0
		}
	}
	return true, 0
}

// storeHasConstraint reports whether store matches the constraint c. The type
// of the constraint (required, prohibited, positive) is not considered here.
//
// A constraint with an empty key is matched by value against the attributes
// of the store and of the store's node. A constraint with a key is matched
// against the store's locality tiers, where both the key and the value have
// to be equal.
func storeHasConstraint(store roachpb.StoreDescriptor, c config.Constraint) bool {
	if c.Key == "" {
		for _, attrs := range []roachpb.Attributes{store.Attrs, store.Node.Attrs} {
			for _, attr := range attrs.Attrs {
				if attr == c.Value {
					return true
				}
			}
		}
		return false
	}

	for _, tier := range store.Locality.Tiers {
		if c.Key == tier.Key && c.Value == tier.Value {
			return true
		}
	}
	return false
}

// ruleConstraints enforces that required and prohibited constraints are
// followed, and that stores with more positive constraints are ranked higher.
//
// The store is rejected if it lacks a required constraint or has a prohibited
// one. Otherwise the score is the number of positive constraints the store
// matches divided by the total number of constraints (of any type), or 0 when
// there are no constraints at all.
func ruleConstraints(
	constraints config.Constraints,
	store roachpb.StoreDescriptor,
	_ []roachpb.ReplicaDescriptor,
	_ StoreList,
) (candidate bool, score float64) {
	// With no constraints there is nothing to match. Returning early keeps the
	// ratio below from being computed as 0/0, which would yield NaN.
	if len(constraints.Constraints) == 0 {
		return true, 0
	}

	matched := 0
	for _, c := range constraints.Constraints {
		hasConstraint := storeHasConstraint(store, c)
		switch {
		case c.Type == config.Constraint_POSITIVE && hasConstraint:
			matched++
		case c.Type == config.Constraint_REQUIRED && !hasConstraint:
			return false, 0
		case c.Type == config.Constraint_PROHIBITED && hasConstraint:
			return false, 0
		}
	}

	return true, float64(matched) / float64(len(constraints.Constraints))
}

// ruleDiversity ensures that nodes that have the fewest locality tiers in
// common are given higher priority.
//
// Each locality tier of store is compared with the tier at the same position
// of every existing replica's store. Tiers carry a weight of 2^(n-i-1), where
// n is the number of tiers and i the tier's index, so earlier tiers count
// more. The score is the weight of all differing tiers divided by the weight
// of all compared tiers, i.e. a value in [0, 1]. When nothing can be compared
// (no existing replicas or no locality tiers) the score is 0. The rule never
// rejects a store.
//
// It panics if the store of an existing replica does not have a tier with the
// same key at the same position. That includes replicas whose store is not
// present in sl, because the lookup then yields a descriptor without tiers.
func ruleDiversity(
	_ config.Constraints,
	store roachpb.StoreDescriptor,
	existing []roachpb.ReplicaDescriptor,
	sl StoreList,
) (candidate bool, score float64) {
	// Index the known stores by ID to look up the locality of the replicas.
	stores := make(map[roachpb.StoreID]roachpb.StoreDescriptor, len(sl.stores))
	for _, s := range sl.stores {
		stores[s.StoreID] = s
	}

	var maxScore float64
	tiers := store.Locality.Tiers
	for i, tier := range tiers {
		tierScore := float64(int(1) << uint(len(tiers)-i-1))
		for _, replica := range existing {
			replicaTiers := stores[replica.StoreID].Locality.Tiers
			// The length check has to cover index i itself; otherwise the
			// access below fails with an index out of range error instead of
			// the explicit panic.
			if len(replicaTiers) <= i || replicaTiers[i].Key != tier.Key {
				panic("TODO(d4l3k): Node locality configurations are not equivalent")
			}
			if replicaTiers[i].Value != tier.Value {
				score += tierScore
			}
			maxScore += tierScore
		}
	}

	// Nothing was compared, so avoid dividing 0 by 0, which would yield NaN.
	if maxScore == 0 {
		return true, 0
	}
	return true, score / maxScore
}

// ruleCapacity favors stores that hold few ranges and keeps data away from
// stores that are close to full. A store whose used fraction exceeds
// maxFractionUsedThreshold is rejected; any other store is scored with
// 1/(RangeCount+1), so that emptier stores rank higher.
func ruleCapacity(
	_ config.Constraints,
	store roachpb.StoreDescriptor,
	_ []roachpb.ReplicaDescriptor,
	_ StoreList,
) (candidate bool, score float64) {
	// Stores that are too full are never candidates.
	if used := store.Capacity.FractionUsed(); used > maxFractionUsedThreshold {
		return false, 0
	}

	// The +1 keeps the divisor non-zero for a store without any ranges.
	divisor := float64(store.Capacity.RangeCount + 1)
	return true, 1 / divisor
}

// byScore implements sort.Interface for a slice of candidates, ordering them
// by descending score so that the best candidate comes first.
type byScore []candidate

// Len returns the number of candidates.
func (c byScore) Len() int {
	return len(c)
}

// Less reports whether the candidate at index i has a strictly higher score
// than the candidate at index j.
func (c byScore) Less(i, j int) bool {
	return c[j].score < c[i].score
}

// Swap exchanges the candidates at indexes i and j.
func (c byScore) Swap(i, j int) {
	c[j], c[i] = c[i], c[j]
}
Loading

0 comments on commit 907c60d

Please sign in to comment.