From 4446345bf7a9e1a6eb7c7a3c71ec9c8e3b3d17fc Mon Sep 17 00:00:00 2001
From: Tristan Rice
Date: Thu, 18 Aug 2016 18:06:31 -0500
Subject: [PATCH] storage: add constraint rule solver for allocation

Rules are represented as a single function that returns the candidacy of the
store as well as a float value representing the score. These scores are then
aggregated across all rules, and the stores are returned sorted by their
aggregate score.

Current rules:
- ruleReplicasUniqueNodes ensures that no two replicas are put on the same
  node.
- ruleConstraints enforces that required and prohibited constraints are
  followed, and that stores with more positive constraints are ranked higher.
- ruleDiversity ensures that nodes that have the fewest locality tiers in
  common are given higher priority.
- ruleCapacity prioritizes placing data on empty nodes when the choice is
  available and prevents data from going onto mostly full nodes.
---
 storage/allocator_test.go   |  10 +-
 storage/rule_solver.go      | 282 ++++++++++++++++++++++++++++++
 storage/rule_solver_test.go | 330 ++++++++++++++++++++++++++++++++++++
 3 files changed, 620 insertions(+), 2 deletions(-)
 create mode 100644 storage/rule_solver.go
 create mode 100644 storage/rule_solver_test.go

diff --git a/storage/allocator_test.go b/storage/allocator_test.go
index d34b50935dd0..25b50c939413 100644
--- a/storage/allocator_test.go
+++ b/storage/allocator_test.go
@@ -180,13 +180,19 @@ func mockStorePool(storePool *StorePool, aliveStoreIDs, deadStoreIDs []roachpb.S
 	storePool.mu.stores = make(map[roachpb.StoreID]*storeDetail)
 	for _, storeID := range aliveStoreIDs {
 		detail := newStoreDetail()
-		detail.desc = &roachpb.StoreDescriptor{StoreID: storeID}
+		detail.desc = &roachpb.StoreDescriptor{
+			StoreID: storeID,
+			Node:    roachpb.NodeDescriptor{NodeID: roachpb.NodeID(storeID)},
+		}
 		storePool.mu.stores[storeID] = detail
 	}
 	for _, storeID := range deadStoreIDs {
 		detail := newStoreDetail()
 		detail.dead = true
-		detail.desc = &roachpb.StoreDescriptor{StoreID: storeID}
+		detail.desc = &roachpb.StoreDescriptor{
+			StoreID: storeID,
+			Node:    roachpb.NodeDescriptor{NodeID: roachpb.NodeID(storeID)},
+		}
 		storePool.mu.stores[storeID] = detail
 	}
 	for storeID, detail := range storePool.mu.stores {
diff --git a/storage/rule_solver.go b/storage/rule_solver.go
new file mode 100644
index 000000000000..4b08e5b91e83
--- /dev/null
+++ b/storage/rule_solver.go
@@ -0,0 +1,282 @@
+// Copyright 2016 The Cockroach Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+// implied. See the License for the specific language governing
+// permissions and limitations under the License.
+//
+// Author: Tristan Rice (rice@fn.lc)
+
+package storage
+
+import (
+	"math"
+	"sort"
+
+	"github.com/cockroachdb/cockroach/config"
+	"github.com/cockroachdb/cockroach/roachpb"
+	"github.com/pkg/errors"
+)
+
+// candidate represents a candidate for allocation.
+type candidate struct {
+	store roachpb.StoreDescriptor
+	score float64
+}
+
+// solveState is used to pass solution state information into a rule.
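+// A single solveState is constructed per Solve call and shared by all rules;
+// only the store field changes as each candidate store is evaluated.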
+type solveState struct { + constraints config.Constraints + store roachpb.StoreDescriptor + existing []roachpb.ReplicaDescriptor + sl StoreList + tiers map[roachpb.StoreID]map[string]roachpb.Tier + tierOrder []roachpb.Tier +} + +// rule is a generic rule that can be used to solve a constraint problem. +// Returning false will remove the store from the list of candidate stores. The +// score will be weighted and then summed together with the other rule scores to +// create a store ranking (higher is better). +type rule struct { + weight float64 + run func(state solveState) (candidate bool, score float64) +} + +// defaultRules is the default rule set to use. +var defaultRules = []rule{ + { + weight: 1.0, + run: ruleReplicasUniqueNodes, + }, + { + weight: 1.0, + run: ruleConstraints, + }, + { + weight: 0.01, + run: ruleCapacity, + }, + { + weight: 0.1, + run: ruleDiversity, + }, +} + +// makeDefaultRuleSolver returns a ruleSolver with defaultRules. +func makeDefaultRuleSolver(storePool *StorePool) *ruleSolver { + return makeRuleSolver(storePool, defaultRules) +} + +// makeRuleSolver makes a new ruleSolver. The order of the rules is the order in +// which they are run. For optimization purposes, less computationally intense +// rules should run first to eliminate candidates. +func makeRuleSolver(storePool *StorePool, rules []rule) *ruleSolver { + return &ruleSolver{ + storePool: storePool, + rules: rules, + } +} + +// ruleSolver solves a set of rules for a store. +type ruleSolver struct { + storePool *StorePool + rules []rule +} + +// solve given constraints and return the score. +func (rs *ruleSolver) Solve( + c config.Constraints, existing []roachpb.ReplicaDescriptor, +) ([]candidate, error) { + sl, _, throttledStoreCount := rs.storePool.getStoreList(config.Constraints{}, false) + + // When there are throttled stores that do match, we shouldn't send + // the replica to purgatory or even consider relaxing the constraints. + if throttledStoreCount > 0 { + return nil, errors.Errorf("%d matching stores are currently throttled", throttledStoreCount) + } + + candidates := make([]candidate, 0, len(sl.stores)) + state := solveState{ + constraints: c, + existing: existing, + sl: sl, + tierOrder: canonicalTierOrder(sl), + tiers: storeTierMap(sl), + } + + for _, store := range sl.stores { + state.store = store + if cand, ok := rs.computeCandidate(state); ok { + candidates = append(candidates, cand) + } + } + sort.Sort(byScore(candidates)) + return candidates, nil +} + +// computeCandidate runs all the rules for the store and returns the candidacy +// information. Returns false if not a candidate. +func (rs *ruleSolver) computeCandidate( + state solveState, +) (candidate, bool) { + var totalScore float64 + for _, rule := range rs.rules { + isCandidate, score := rule.run(state) + if !isCandidate { + return candidate{}, false + } + if !math.IsNaN(score) { + totalScore += score * rule.weight + } + } + return candidate{store: state.store, score: totalScore}, true +} + +// ruleReplicasUniqueNodes ensures that no two replicas are put on the same +// node. +func ruleReplicasUniqueNodes(state solveState) (candidate bool, score float64) { + for _, r := range state.existing { + if r.NodeID == state.store.Node.NodeID { + return false, 0 + } + } + return true, 0 +} + +// storeHasConstraint returns whether a store descriptor attributes or locality +// matches the key value pair in the constraint. 
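+// For example, an attribute constraint such as {Value: "b"} (empty key) is
+// checked against both the store and node attributes, while a locality
+// constraint such as {Key: "datacenter", Value: "us"} is checked against the
+// store's locality tiers.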
+func storeHasConstraint(store roachpb.StoreDescriptor, c config.Constraint) bool { + var found bool + if c.Key == "" { + for _, attrs := range []roachpb.Attributes{store.Attrs, store.Node.Attrs} { + for _, attr := range attrs.Attrs { + if attr == c.Value { + return true + } + } + } + } else { + for _, tier := range store.Locality.Tiers { + if c.Key == tier.Key && c.Value == tier.Value { + return true + } + } + } + return found +} + +// ruleConstraints enforces that required and prohibited constraints are +// followed, and that stores with more positive constraints are ranked higher. +func ruleConstraints(state solveState) (candidate bool, score float64) { + matched := 0 + for _, c := range state.constraints.Constraints { + hasConstraint := storeHasConstraint(state.store, c) + switch { + case c.Type == config.Constraint_POSITIVE && hasConstraint: + matched++ + case c.Type == config.Constraint_REQUIRED && !hasConstraint: + return false, 0 + case c.Type == config.Constraint_PROHIBITED && hasConstraint: + return false, 0 + } + } + + return true, float64(matched) / float64(len(state.constraints.Constraints)) +} + +// ruleDiversity ensures that nodes that have the fewest locality tiers in +// common are given higher priority. +func ruleDiversity(state solveState) (candidate bool, score float64) { + storeTiers := state.tiers[state.store.StoreID] + var maxScore float64 + for i, tier := range state.tierOrder { + storeTier, ok := storeTiers[tier.Key] + if !ok { + continue + } + tierScore := 1 / (float64(i) + 1) + for _, existing := range state.existing { + existingTier, ok := state.tiers[existing.StoreID][tier.Key] + if ok && existingTier.Value != storeTier.Value { + score += tierScore + } + maxScore += tierScore + } + } + return true, score / maxScore +} + +// ruleCapacity prioritizes placing data on empty nodes when the choice is +// available and prevents data from going onto mostly full nodes. +func ruleCapacity(state solveState) (candidate bool, score float64) { + // Don't overfill stores. + if state.store.Capacity.FractionUsed() > maxFractionUsedThreshold { + return false, 0 + } + + return true, 1 / float64(state.store.Capacity.RangeCount+1) +} + +// canonicalTierOrder returns the most common key at each tier level. +func canonicalTierOrder(sl StoreList) []roachpb.Tier { + maxTierCount := 0 + for _, store := range sl.stores { + if count := len(store.Locality.Tiers); maxTierCount < count { + maxTierCount = count + } + } + + // Might have up to maxTierCount of tiers. + tiers := make([]roachpb.Tier, 0, maxTierCount) + for i := 0; i < maxTierCount; i++ { + // At each tier, count the number of occurrences of each key. + counts := map[string]int{} + maxKey := "" + for _, store := range sl.stores { + key := "" + if i < len(store.Locality.Tiers) { + key = store.Locality.Tiers[i].Key + } + counts[key]++ + if counts[key] > counts[maxKey] { + maxKey = key + } + } + // Don't add the tier if most nodes don't have that many tiers. + if maxKey != "" { + tiers = append(tiers, roachpb.Tier{Key: maxKey}) + } + } + return tiers +} + +// storeTierMap indexes a store list so you can look up the locality tier +// value from store ID and tier key. 
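+// For example, a store with locality "datacenter=us,rack=1" yields the entries
+// "datacenter" -> Tier{Key: "datacenter", Value: "us"} and
+// "rack" -> Tier{Key: "rack", Value: "1"}.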
+func storeTierMap(sl StoreList) map[roachpb.StoreID]map[string]roachpb.Tier { + m := map[roachpb.StoreID]map[string]roachpb.Tier{} + for _, store := range sl.stores { + sm := map[string]roachpb.Tier{} + m[store.StoreID] = sm + for _, tier := range store.Locality.Tiers { + sm[tier.Key] = tier + } + } + return m +} + +// byScore implements sort.Interface for candidate slices. +type byScore []candidate + +var _ sort.Interface = byScore(nil) + +func (c byScore) Len() int { return len(c) } +func (c byScore) Less(i, j int) bool { return c[i].score > c[j].score } +func (c byScore) Swap(i, j int) { c[i], c[j] = c[j], c[i] } diff --git a/storage/rule_solver_test.go b/storage/rule_solver_test.go new file mode 100644 index 000000000000..2cea11aeb5e5 --- /dev/null +++ b/storage/rule_solver_test.go @@ -0,0 +1,330 @@ +// Copyright 2016 The Cockroach Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +// implied. See the License for the specific language governing +// permissions and limitations under the License. +// +// Author: Tristan Rice (rice@fn.lc) + +package storage + +import ( + "reflect" + "sort" + "testing" + + "github.com/cockroachdb/cockroach/config" + "github.com/cockroachdb/cockroach/roachpb" + "github.com/cockroachdb/cockroach/util/leaktest" +) + +type byScoreAndID []candidate + +func (c byScoreAndID) Len() int { return len(c) } +func (c byScoreAndID) Less(i, j int) bool { + if c[i].score == c[j].score { + return c[i].store.StoreID < c[j].store.StoreID + } + return c[i].score > c[j].score +} +func (c byScoreAndID) Swap(i, j int) { c[i], c[j] = c[j], c[i] } + +// TestRuleSolver tests the mechanics of ruleSolver. 
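+// It sets up four live stores (plus one dead store) with distinct attributes,
+// localities, and capacities, then checks the candidate ordering produced by
+// each rule.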
+func TestRuleSolver(t *testing.T) { + defer leaktest.AfterTest(t)() + stopper, _, _, storePool := createTestStorePool(TestTimeUntilStoreDeadOff) + defer stopper.Stop() + // 3 alive replicas, 1 dead + mockStorePool(storePool, []roachpb.StoreID{1, 2, 3, 5}, []roachpb.StoreID{4}, nil) + + storePool.mu.Lock() + storePool.mu.stores[1].desc.Attrs.Attrs = []string{"a"} + storePool.mu.stores[2].desc.Attrs.Attrs = []string{"a", "b"} + storePool.mu.stores[3].desc.Attrs.Attrs = []string{"a", "b", "c"} + + storePool.mu.stores[1].desc.Locality.Tiers = []roachpb.Tier{ + {Key: "datacenter", Value: "us"}, + {Key: "rack", Value: "1"}, + {Key: "slot", Value: "5"}, + } + storePool.mu.stores[2].desc.Locality.Tiers = []roachpb.Tier{ + {Key: "datacenter", Value: "us"}, + {Key: "rack", Value: "1"}, + } + storePool.mu.stores[3].desc.Locality.Tiers = []roachpb.Tier{ + {Key: "datacenter", Value: "us"}, + {Key: "floor", Value: "1"}, + {Key: "rack", Value: "2"}, + } + storePool.mu.stores[5].desc.Locality.Tiers = []roachpb.Tier{ + {Key: "datacenter", Value: "eur"}, + {Key: "rack", Value: "1"}, + } + + storePool.mu.stores[1].desc.Capacity = roachpb.StoreCapacity{ + Capacity: 100, + Available: 1, + RangeCount: 99, + } + storePool.mu.stores[2].desc.Capacity = roachpb.StoreCapacity{ + Capacity: 100, + Available: 100, + RangeCount: 0, + } + storePool.mu.stores[3].desc.Capacity = roachpb.StoreCapacity{ + Capacity: 100, + Available: 50, + RangeCount: 50, + } + storePool.mu.stores[5].desc.Capacity = roachpb.StoreCapacity{ + Capacity: 100, + Available: 60, + RangeCount: 40, + } + storePool.mu.Unlock() + + testCases := []struct { + rules []rule + c config.Constraints + existing []roachpb.ReplicaDescriptor + expected []roachpb.StoreID + }{ + // No constraints or rules. + { + expected: []roachpb.StoreID{1, 2, 3, 5}, + }, + // Store 1: score 0; Store 3: score 1; everything else fails. + { + rules: []rule{ + { + weight: 1, + run: func(state solveState) (candidate bool, score float64) { + switch state.store.StoreID { + case 1: + return true, 0 + case 3: + return true, 1 + default: + return false, 0 + } + }, + }, + }, + expected: []roachpb.StoreID{3, 1}, + }, + // Don't put a replica on the same node as another. + { + rules: []rule{{weight: 1, run: ruleReplicasUniqueNodes}}, + existing: []roachpb.ReplicaDescriptor{ + {NodeID: 1}, + {NodeID: 3}, + }, + expected: []roachpb.StoreID{2, 5}, + }, + { + rules: []rule{{weight: 1, run: ruleReplicasUniqueNodes}}, + existing: []roachpb.ReplicaDescriptor{ + {NodeID: 1}, + {NodeID: 2}, + {NodeID: 3}, + {NodeID: 5}, + }, + expected: nil, + }, + // Only put replicas on nodes with required constraints. + { + rules: []rule{{weight: 1, run: ruleConstraints}}, + c: config.Constraints{ + Constraints: []config.Constraint{ + {Value: "b", Type: config.Constraint_REQUIRED}, + }, + }, + expected: []roachpb.StoreID{2, 3}, + }, + // Required locality constraints. + { + rules: []rule{{weight: 1, run: ruleConstraints}}, + c: config.Constraints{ + Constraints: []config.Constraint{ + {Key: "datacenter", Value: "us", Type: config.Constraint_REQUIRED}, + }, + }, + expected: []roachpb.StoreID{1, 2, 3}, + }, + // Don't put a replica on a node with a prohibited constraint. + { + rules: []rule{{weight: 1, run: ruleConstraints}}, + c: config.Constraints{ + Constraints: []config.Constraint{ + {Value: "b", Type: config.Constraint_PROHIBITED}, + }, + }, + expected: []roachpb.StoreID{1, 5}, + }, + // Prohibited locality constraints. 
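+		// Only store 5 (datacenter=eur) avoids the prohibited datacenter=us tier.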
+ { + rules: []rule{{weight: 1, run: ruleConstraints}}, + c: config.Constraints{ + Constraints: []config.Constraint{ + {Key: "datacenter", Value: "us", Type: config.Constraint_PROHIBITED}, + }, + }, + expected: []roachpb.StoreID{5}, + }, + // Positive constraints ordered by number of matches. + { + rules: []rule{{weight: 1, run: ruleConstraints}}, + c: config.Constraints{ + Constraints: []config.Constraint{ + {Value: "a"}, + {Value: "b"}, + {Value: "c"}, + }, + }, + expected: []roachpb.StoreID{3, 2, 1, 5}, + }, + // Positive locality constraints. + { + rules: []rule{{weight: 1, run: ruleConstraints}}, + c: config.Constraints{ + Constraints: []config.Constraint{ + {Key: "datacenter", Value: "eur"}, + }, + }, + expected: []roachpb.StoreID{5, 1, 2, 3}, + }, + // Diversity with no existing. + { + rules: []rule{{weight: 1, run: ruleDiversity}}, + existing: nil, + expected: []roachpb.StoreID{1, 2, 3, 5}, + }, + // Diversity with one existing. + { + rules: []rule{{weight: 1, run: ruleDiversity}}, + existing: []roachpb.ReplicaDescriptor{ + {StoreID: 1}, + }, + expected: []roachpb.StoreID{5, 3, 1, 2}, + }, + // Prioritize lower capacity nodes, and don't overfill. + { + rules: []rule{{weight: 1, run: ruleCapacity}}, + expected: []roachpb.StoreID{2, 5, 3}, + }, + } + + for i, tc := range testCases { + solver := makeRuleSolver(storePool, tc.rules) + candidates, err := solver.Solve(tc.c, tc.existing) + if err != nil { + t.Fatal(err) + } + sort.Sort(byScoreAndID(candidates)) + if len(candidates) != len(tc.expected) { + t.Errorf("%d: length of %+v should match %+v", i, candidates, tc.expected) + continue + } + for j, expected := range tc.expected { + if out := candidates[j].store.StoreID; out != expected { + t.Errorf("%d: candidates[%d].store.StoreID = %d; not %d; %+v", i, j, out, expected, candidates) + } + } + } +} + +func TestCanonicalTierOrder(t *testing.T) { + defer leaktest.AfterTest(t)() + + testCases := []struct { + stores [][]roachpb.Tier + want []roachpb.Tier + }{ + { + nil, + []roachpb.Tier{}, + }, + { + [][]roachpb.Tier{nil, nil}, + []roachpb.Tier{}, + }, + { + [][]roachpb.Tier{ + { + {Key: "a"}, + {Key: "b"}, + {Key: "c"}, + }, + }, + []roachpb.Tier{ + {Key: "a"}, + {Key: "b"}, + {Key: "c"}, + }, + }, + { + [][]roachpb.Tier{ + {{Key: "a"}, + {Key: "b"}, + {Key: "c"}, + }, + { + {Key: "a"}, + {Key: "b"}, + {Key: "c"}, + }, + { + {Key: "b"}, + {Key: "c"}, + {Key: "a"}, + {Key: "d"}, + }, + }, + []roachpb.Tier{ + {Key: "a"}, + {Key: "b"}, + {Key: "c"}, + }, + }, + { + [][]roachpb.Tier{ + { + {Key: "a"}, + {Key: "b"}, + {Key: "c"}, + }, + { + {Key: "e"}, + {Key: "f"}, + {Key: "g"}, + }, + }, + []roachpb.Tier{ + {Key: "a"}, + {Key: "b"}, + {Key: "c"}, + }, + }, + } + + for i, tc := range testCases { + sl := StoreList{} + for _, tiers := range tc.stores { + sl.stores = append(sl.stores, roachpb.StoreDescriptor{ + Locality: roachpb.Locality{Tiers: tiers}, + }) + } + + if out := canonicalTierOrder(sl); !reflect.DeepEqual(out, tc.want) { + t.Errorf("%d: canonicalTierOrder(%+v) = %+v; not %+v", i, tc.stores, out, tc.want) + } + } +}