Skip to content

Commit

Permalink
storage: replace the rule solver
Browse files Browse the repository at this point in the history
Remove the concept of rules and replace it with clean direct functions.
  • Loading branch information
BramGruneir committed Dec 12, 2016
1 parent 095b5ec commit b51aaef
Show file tree
Hide file tree
Showing 4 changed files with 757 additions and 626 deletions.
125 changes: 43 additions & 82 deletions pkg/storage/allocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,38 +212,37 @@ func (a *Allocator) AllocateTarget(
rangeID roachpb.RangeID,
relaxConstraints bool,
) (*roachpb.StoreDescriptor, error) {
sl, aliveStoreCount, throttledStoreCount := a.storePool.getStoreList(rangeID)
if a.options.UseRuleSolver {
sl, _, throttledStoreCount := a.storePool.getStoreList(rangeID)
// When there are throttled stores that do match, we shouldn't send
// the replica to purgatory.
if throttledStoreCount > 0 {
return nil, errors.Errorf("%d matching stores are currently throttled", throttledStoreCount)
}

candidates := allocateRuleSolver.Solve(
candidates := allocateCandidates(
sl,
constraints,
existing,
a.storePool.getNodeLocalities(existing),
a.storePool.deterministic,
)
candidates = candidates.onlyValid()
if len(candidates) == 0 {
return nil, &allocatorError{
required: constraints.Constraints,
}
if log.V(3) {
log.Infof(context.TODO(), "allocate candidates: %s", candidates)
}
if target := candidates.selectGood(a.randGen); target != nil {
return target, nil
}

// When there are throttled stores that do match, we shouldn't send
// the replica to purgatory.
if throttledStoreCount > 0 {
return nil, errors.Errorf("%d matching stores are currently throttled", throttledStoreCount)
}
return nil, &allocatorError{
required: constraints.Constraints,
}
chosenCandidate := candidates.selectGood(a.randGen).store
return &chosenCandidate, nil
}

existingNodes := make(nodeIDSet, len(existing))
for _, repl := range existing {
existingNodes[repl.NodeID] = struct{}{}
}

sl, aliveStoreCount, throttledStoreCount := a.storePool.getStoreList(rangeID)

// Because more redundancy is better than less, if relaxConstraints, the
// matching here is lenient, and tries to find a target by relaxing an
// attribute constraint, from last attribute to first.
Expand Down Expand Up @@ -273,11 +272,6 @@ func (a *Allocator) AllocateTarget(
// that have greater than the average number of replicas. Failing that, it
// falls back to selecting a random target from any of the existing
// replicas. It also will exclude any replica that lives on leaseStoreID.
//
// TODO(mrtracy): removeTarget eventually needs to accept the attributes from
// the zone config associated with the provided replicas. This will allow it to
// make correct decisions in the case of ranges with heterogeneous replica
// requirements (i.e. multiple data centers).
func (a Allocator) RemoveTarget(
constraints config.Constraints,
existing []roachpb.ReplicaDescriptor,
Expand All @@ -298,29 +292,26 @@ func (a Allocator) RemoveTarget(
}
}
sl := makeStoreList(descriptors)
var badStoreID roachpb.StoreID
var bad *roachpb.StoreDescriptor

if a.options.UseRuleSolver {
candidates := removeRuleSolver.Solve(
candidates := removeCandidates(
sl,
constraints,
existing,
a.storePool.getNodeLocalities(existing),
a.storePool.deterministic,
)
if len(candidates) != 0 {
badStoreID = candidates.selectBad(a.randGen).store.StoreID
if log.V(3) {
log.Infof(context.TODO(), "remove candidates: %s", candidates)
}
bad = candidates.selectBad(a.randGen)
} else {
bad := a.selectBad(sl)
if bad != nil {
badStoreID = bad.StoreID
}
bad = a.selectBad(sl)
}

if badStoreID != 0 {
if bad != nil {
for _, exist := range existing {
if exist.StoreID == badStoreID {
if exist.StoreID == bad.StoreID {
return exist, nil
}
}
Expand Down Expand Up @@ -358,13 +349,13 @@ func (a Allocator) RebalanceTarget(
if !a.options.AllowRebalance {
return nil, nil
}
sl, _, _ := a.storePool.getStoreList(rangeID)

if a.options.UseRuleSolver {
sl, _, _ := a.storePool.getStoreList(rangeID)
if log.V(3) {
log.Infof(context.TODO(), "rebalance-target (lease-holder=%d):\n%s", leaseStoreID, sl)
}

// TODO(bram): ShouldRebalance should be part of rebalanceCandidates
// and decision made afterward, not it's own function. It is
// performing the same operations as rebalanceCandidates and any
// missing functionality can be added.
var shouldRebalance bool
for _, repl := range existing {
if leaseStoreID == repl.StoreID {
Expand All @@ -380,59 +371,29 @@ func (a Allocator) RebalanceTarget(
return nil, nil
}

// Load the exiting storesIDs into a map so to eliminate having to loop
// through the existing descriptors more than once.
existingStoreIDs := make(map[roachpb.StoreID]struct{})
for _, repl := range existing {
existingStoreIDs[repl.StoreID] = struct{}{}
}

// Split the store list into existing and candidate stores lists so that
// we can call solve independently on both store lists.
var existingDescs []roachpb.StoreDescriptor
var candidateDescs []roachpb.StoreDescriptor
for _, desc := range sl.stores {
if _, ok := existingStoreIDs[desc.StoreID]; ok {
existingDescs = append(existingDescs, desc)
} else {
candidateDescs = append(candidateDescs, desc)
}
}

existingStoreList := makeStoreList(existingDescs)
candidateStoreList := makeStoreList(candidateDescs)

localities := a.storePool.getNodeLocalities(existing)
existingCandidates := rebalanceExisting.Solve(
existingStoreList,
constraints,
existing,
localities,
a.storePool.deterministic,
)
candidates := rebalance.Solve(
candidateStoreList,
existingCandidates, candidates := rebalanceCandidates(
sl,
constraints,
existing,
localities,
a.storePool.getNodeLocalities(existing),
a.storePool.deterministic,
)

/*
fmt.Printf("existing: %s\n", existingCandidates)
fmt.Printf("candidates: %s\n", candidates)
*/
if len(existingCandidates) == 0 {
return nil, errors.Errorf(
"all existing replicas' stores are not present in the store pool: %v\n%s", existing, sl)
}

if log.V(3) {
log.Infof(context.TODO(), "existing replicas: %s", existingCandidates)
log.Infof(context.TODO(), "candidates: %s", candidates)
}

// Find all candidates that are better than the worst existing.
// Find all candidates that are better than the worst existing replica.
targets := candidates.betterThan(existingCandidates[len(existingCandidates)-1])
if len(targets) != 0 {
target := targets.selectGood(a.randGen).store
return &target, nil
}
return nil, nil
return targets.selectGood(a.randGen), nil
}

sl, _, _ := a.storePool.getStoreList(rangeID)
sl = sl.filter(constraints)
if log.V(3) {
log.Infof(context.TODO(), "rebalance-target (lease-holder=%d):\n%s", leaseStoreID, sl)
Expand Down
Loading

0 comments on commit b51aaef

Please sign in to comment.