Skip to content

Commit

Permalink
Merge pull request #12165 from BramGruneir/removerules
Browse files Browse the repository at this point in the history
storage: replace the rule solver
  • Loading branch information
BramGruneir authored Dec 13, 2016
2 parents 40eafec + bbe17fa commit eeb5276
Show file tree
Hide file tree
Showing 4 changed files with 1,040 additions and 458 deletions.
179 changes: 57 additions & 122 deletions pkg/storage/allocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ import (
"math"
"math/rand"

"github.com/coreos/etcd/raft"
"github.com/pkg/errors"
"golang.org/x/net/context"

"github.com/cockroachdb/cockroach/pkg/config"
Expand All @@ -31,8 +33,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/envutil"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/coreos/etcd/raft"
"github.com/pkg/errors"
)

const (
Expand Down Expand Up @@ -212,40 +212,37 @@ func (a *Allocator) AllocateTarget(
rangeID roachpb.RangeID,
relaxConstraints bool,
) (*roachpb.StoreDescriptor, error) {
sl, aliveStoreCount, throttledStoreCount := a.storePool.getStoreList(rangeID)
if a.options.UseRuleSolver {
sl, _, throttledStoreCount := a.storePool.getStoreList(rangeID)
// When there are throttled stores that do match, we shouldn't send
// the replica to purgatory.
if throttledStoreCount > 0 {
return nil, errors.Errorf("%d matching stores are currently throttled", throttledStoreCount)
}

candidates, err := allocateRuleSolver.Solve(
candidates := allocateCandidates(
sl,
constraints,
existing,
a.storePool.getNodeLocalities(existing),
a.storePool.deterministic,
)
if err != nil {
return nil, err
if log.V(3) {
log.Infof(context.TODO(), "allocate candidates: %s", candidates)
}
if target := candidates.selectGood(a.randGen); target != nil {
return target, nil
}

if len(candidates) == 0 {
return nil, &allocatorError{
required: constraints.Constraints,
}
// When there are throttled stores that do match, we shouldn't send
// the replica to purgatory.
if throttledStoreCount > 0 {
return nil, errors.Errorf("%d matching stores are currently throttled", throttledStoreCount)
}
return nil, &allocatorError{
required: constraints.Constraints,
}
// TODO(bram): #10275 Need some randomness here!
return &candidates[0].store, nil
}

existingNodes := make(nodeIDSet, len(existing))
for _, repl := range existing {
existingNodes[repl.NodeID] = struct{}{}
}

sl, aliveStoreCount, throttledStoreCount := a.storePool.getStoreList(rangeID)

// Because more redundancy is better than less, if relaxConstraints, the
// matching here is lenient, and tries to find a target by relaxing an
// attribute constraint, from last attribute to first.
Expand Down Expand Up @@ -275,11 +272,6 @@ func (a *Allocator) AllocateTarget(
// that have greater than the average number of replicas. Failing that, it
// falls back to selecting a random target from any of the existing
// replicas. It also will exclude any replica that lives on leaseStoreID.
//
// TODO(mrtracy): removeTarget eventually needs to accept the attributes from
// the zone config associated with the provided replicas. This will allow it to
// make correct decisions in the case of ranges with heterogeneous replica
// requirements (i.e. multiple data centers).
func (a Allocator) RemoveTarget(
constraints config.Constraints,
existing []roachpb.ReplicaDescriptor,
Expand All @@ -289,52 +281,6 @@ func (a Allocator) RemoveTarget(
return roachpb.ReplicaDescriptor{}, errors.Errorf("must supply at least one replica to allocator.RemoveTarget()")
}

if a.options.UseRuleSolver {
// TODO(bram): #10275 Is this getStoreList call required? Compute candidate
// requires a store list, but we should be able to create one using only
// the stores that belong to the range.
// Use an invalid range ID, as we don't care about corrupt replicas since
// we are removing a replica and not trying to add one.
sl, _, _ := a.storePool.getStoreList(roachpb.RangeID(0))

var worst roachpb.ReplicaDescriptor
worstScore := math.Inf(0)
for _, exist := range existing {
if exist.StoreID == leaseStoreID {
continue
}
desc, ok := a.storePool.getStoreDescriptor(exist.StoreID)
if !ok {
continue
}

candidate, valid := removeRuleSolver.computeCandidate(solveState{
constraints: constraints,
store: desc,
sl: sl,
existing: nil,
existingNodeLocalities: a.storePool.getNodeLocalities(existing),
})
// When a candidate is not valid, it means that it can be
// considered the worst existing replica.
if !valid {
return exist, nil
}

if candidate.score < worstScore {
worstScore = candidate.score
worst = exist
}

}

if !math.IsInf(worstScore, 0) {
return worst, nil
}

return roachpb.ReplicaDescriptor{}, errors.New("could not select an appropriate replica to be removed")
}

// Retrieve store descriptors for the provided replicas from the StorePool.
descriptors := make([]roachpb.StoreDescriptor, 0, len(existing))
for _, exist := range existing {
Expand All @@ -345,15 +291,32 @@ func (a Allocator) RemoveTarget(
descriptors = append(descriptors, desc)
}
}

sl := makeStoreList(descriptors)
if bad := a.selectBad(sl); bad != nil {
var bad *roachpb.StoreDescriptor

if a.options.UseRuleSolver {
candidates := removeCandidates(
sl,
constraints,
a.storePool.getNodeLocalities(existing),
a.storePool.deterministic,
)
if log.V(3) {
log.Infof(context.TODO(), "remove candidates: %s", candidates)
}
bad = candidates.selectBad(a.randGen)
} else {
bad = a.selectBad(sl)
}

if bad != nil {
for _, exist := range existing {
if exist.StoreID == bad.StoreID {
return exist, nil
}
}
}

return roachpb.ReplicaDescriptor{}, errors.New("could not select an appropriate replica to be removed")
}

Expand Down Expand Up @@ -386,13 +349,13 @@ func (a Allocator) RebalanceTarget(
if !a.options.AllowRebalance {
return nil, nil
}
sl, _, _ := a.storePool.getStoreList(rangeID)

if a.options.UseRuleSolver {
sl, _, _ := a.storePool.getStoreList(rangeID)
if log.V(3) {
log.Infof(context.TODO(), "rebalance-target (lease-holder=%d):\n%s", leaseStoreID, sl)
}

// TODO(bram): ShouldRebalance should be part of rebalanceCandidates
// and the decision made afterward, not its own function. It is
// performing the same operations as rebalanceCandidates and any
// missing functionality can be added.
var shouldRebalance bool
for _, repl := range existing {
if leaseStoreID == repl.StoreID {
Expand All @@ -408,57 +371,29 @@ func (a Allocator) RebalanceTarget(
return nil, nil
}

// Load the existing store IDs into a map so as to eliminate having to loop
// through the existing descriptors more than once.
existingStoreIDs := make(map[roachpb.StoreID]struct{})
for _, repl := range existing {
existingStoreIDs[repl.StoreID] = struct{}{}
}

// Split the store list into existing and candidate stores lists so that
// we can call solve independently on both store lists.
var existingDescs []roachpb.StoreDescriptor
var candidateDescs []roachpb.StoreDescriptor
for _, desc := range sl.stores {
if _, ok := existingStoreIDs[desc.StoreID]; ok {
existingDescs = append(existingDescs, desc)
} else {
candidateDescs = append(candidateDescs, desc)
}
}

existingStoreList := makeStoreList(existingDescs)
candidateStoreList := makeStoreList(candidateDescs)

existingCandidates, err := removeRuleSolver.Solve(existingStoreList, constraints, nil, nil)
if err != nil {
return nil, err
}
candidates, err := allocateRuleSolver.Solve(candidateStoreList, constraints, nil, nil)
if err != nil {
return nil, err
}
existingCandidates, candidates := rebalanceCandidates(
sl,
constraints,
existing,
a.storePool.getNodeLocalities(existing),
a.storePool.deterministic,
)

// Find all candidates that are better than the worst existing store.
var worstCandidateStore float64
// If any store from existing is not included in existingCandidates, it
// is because it no longer meets the constraints. If so, its score is
// considered to be 0.
if len(existingCandidates) == len(existing) {
worstCandidateStore = existingCandidates[len(existingCandidates)-1].score
if len(existingCandidates) == 0 {
return nil, errors.Errorf(
"all existing replicas' stores are not present in the store pool: %v\n%s", existing, sl)
}

// TODO(bram): #10275 Need some randomness here!
for _, cand := range candidates {
if cand.score > worstCandidateStore {
return &candidates[0].store, nil
}
if log.V(3) {
log.Infof(context.TODO(), "existing replicas: %s", existingCandidates)
log.Infof(context.TODO(), "candidates: %s", candidates)
}

return nil, nil
// Find all candidates that are better than the worst existing replica.
targets := candidates.betterThan(existingCandidates[len(existingCandidates)-1])
return targets.selectGood(a.randGen), nil
}

sl, _, _ := a.storePool.getStoreList(rangeID)
sl = sl.filter(constraints)
if log.V(3) {
log.Infof(context.TODO(), "rebalance-target (lease-holder=%d):\n%s", leaseStoreID, sl)
Expand Down
Loading

0 comments on commit eeb5276

Please sign in to comment.