storage: Avoid adding all replicas at once in RelocateRange
Switches to a more gradual process that adds one replica, then removes one,
repeating until all the desired changes have been made, and uses the existing
allocator code to pick a reasonable order for those steps.

Fixes #29130

Release note: None
a-robinson committed Sep 6, 2018
1 parent 922422c commit 2e09efa
Showing 3 changed files with 229 additions and 10 deletions.
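
As orientation before the diff, here is a minimal, self-contained Go sketch (not part of this commit; Target, relocateGradually, and diff are hypothetical stand-ins for the roachpb types) of the add-one-then-remove-one loop the commit message describes. The real implementation in replica_command.go below drives the same loop through the allocator and AdminChangeReplicas.

package main

import "fmt"

// Target is a stand-in for roachpb.ReplicationTarget in this sketch.
type Target struct{ NodeID, StoreID int }

// relocateGradually shows the control flow only: interleave single additions
// and removals so the replica count never overshoots the desired count by
// more than one.
func relocateGradually(current, desired []Target) []Target {
	toAdd := diff(desired, current)    // targets that still need a replica
	toRemove := diff(current, desired) // replicas that have to go away
	for len(toAdd) > 0 || len(toRemove) > 0 {
		if len(toAdd) > 0 && len(toAdd) >= len(toRemove) {
			// The real code asks the allocator which of toAdd to add next.
			current = append(current, toAdd[0])
			toAdd = toAdd[1:]
		} else {
			// The real code asks the allocator which of toRemove to drop,
			// and transfers the lease away from the victim first.
			current = diff(current, toRemove[:1])
			toRemove = toRemove[1:]
		}
	}
	return current
}

// diff returns the elements of a that are not in b.
func diff(a, b []Target) []Target {
	var out []Target
	for _, x := range a {
		keep := true
		for _, y := range b {
			if x == y {
				keep = false
				break
			}
		}
		if keep {
			out = append(out, x)
		}
	}
	return out
}

func main() {
	current := []Target{{1, 1}, {2, 2}, {3, 3}}
	desired := []Target{{1, 1}, {4, 4}, {5, 5}}
	fmt.Println(relocateGradually(current, desired)) // [{1 1} {4 4} {5 5}]
}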
10 changes: 3 additions & 7 deletions pkg/storage/merge_queue.go
@@ -247,13 +247,9 @@ func (mq *mergeQueue) process(
break
}
}
// TODO(benesch): RelocateRange needs to be made more robust. It cannot
// currently handle certain edge cases, like multiple stores on one node. It
// also adds all new replicas before removing any old replicas, rather than
// performing interleaved adds/removes, resulting in a moment where the
// number of replicas is potentially double the configured replication
// factor.
if err := RelocateRange(ctx, mq.db, rhsDesc, targets); err != nil {
// TODO(benesch): RelocateRange can sometimes fail if it needs to move a replica
// from one store to another store on the same node.
if err := mq.store.RelocateRangeThatDoesntSuck(ctx, rhsDesc, targets); err != nil {
return err
}
}
222 changes: 222 additions & 0 deletions pkg/storage/replica_command.go
@@ -1012,6 +1012,212 @@ func updateRangeDescriptor(
return nil
}

// RelocateRangeThatDoesntSuck relocates a given range to a given set of
// stores. The first store in the slice becomes the new leaseholder.
//
// This is best-effort; it's possible that the replicate queue on the
// leaseholder could take action at the same time, causing errors.
//
// TODO(a-robinson): Reconcile this with the RelocateRange used by the sql
// package. Unfortunately, doing this well requires an Allocator and StorePool,
// which in practice requires a Store, which hasn't historically been
// accessible from sql-land AFAIK.
func (s *Store) RelocateRangeThatDoesntSuck(
ctx context.Context, rangeDesc roachpb.RangeDescriptor, targets []roachpb.ReplicationTarget,
) error {
// Step 1: Compute which replicas are to be added and which are to be removed.
//
// TODO(radu): we can't have multiple replicas on different stores on the
// same node, and this code doesn't do anything to specifically avoid that
// case (although the allocator will avoid even trying to send snapshots to
// such stores), so it could cause some failures.

var addTargets []roachpb.ReplicaDescriptor
for _, t := range targets {
found := false
for _, replicaDesc := range rangeDesc.Replicas {
if replicaDesc.StoreID == t.StoreID && replicaDesc.NodeID == t.NodeID {
found = true
break
}
}
if !found {
addTargets = append(addTargets, roachpb.ReplicaDescriptor{
NodeID: t.NodeID,
StoreID: t.StoreID,
})
}
}

var removeTargets []roachpb.ReplicaDescriptor
for _, replicaDesc := range rangeDesc.Replicas {
found := false
for _, t := range targets {
if replicaDesc.StoreID == t.StoreID && replicaDesc.NodeID == t.NodeID {
found = true
break
}
}
if !found {
removeTargets = append(removeTargets, roachpb.ReplicaDescriptor{
NodeID: replicaDesc.NodeID,
StoreID: replicaDesc.StoreID,
})
}
}

canRetry := func(err error) bool {
whitelist := []string{
snapshotApplySemBusyMsg,
IntersectingSnapshotMsg,
}
for _, substr := range whitelist {
if strings.Contains(err.Error(), substr) {
return true
}
}
return false
}

transferLease := func() {
if err := s.DB().AdminTransferLease(
ctx, rangeDesc.StartKey.AsRawKey(), targets[0].StoreID,
); err != nil {
log.Warningf(ctx, "while transferring lease: %s", err)
}
}

sysCfg, cfgOk := s.cfg.Gossip.GetSystemConfig()
if !cfgOk {
return fmt.Errorf("no system config available, unable to perform RelocateRange")
}
zone, err := sysCfg.GetZoneConfigForKey(rangeDesc.StartKey)
if err != nil {
return err
}

storeList, _, _ := s.allocator.storePool.getStoreList(rangeDesc.RangeID, storeFilterNone)
storeMap := storeListToMap(storeList)

// Deep-copy the Replicas slice (in our shallow copy of the RangeDescriptor)
// since we'll mutate it in the loop below.
desc := rangeDesc
desc.Replicas = append([]roachpb.ReplicaDescriptor(nil), desc.Replicas...)
rangeInfo := RangeInfo{Desc: &desc}

// Step 2: Repeatedly add a replica then remove a replica until we reach the
// desired state.
every := log.Every(time.Minute)
re := retry.StartWithCtx(ctx, retry.Options{MaxBackoff: 5 * time.Second})
for len(addTargets) > 0 || len(removeTargets) > 0 {
if err := ctx.Err(); err != nil {
return err
}

if len(addTargets) > 0 && len(addTargets) >= len(removeTargets) {
// Each iteration, pick the most desirable replica to add. However,
// prefer the first target if it doesn't yet have a replica so that we
// can always transfer the lease to it before removing a replica below.
// This makes it easier to avoid removing a replica that's still
// leaseholder without needing to bounce the lease around a bunch.
candidates := addTargets
if storeHasReplica(targets[0].StoreID, candidates) {
candidates = []roachpb.ReplicaDescriptor{
{NodeID: targets[0].NodeID, StoreID: targets[0].StoreID},
}
}

// The storeList's list of stores is used to constrain which stores the
// allocator considers putting a new replica on. We want it to only
// consider the stores in candidates.
storeList.stores = storeList.stores[:0]
for _, candidate := range candidates {
store, ok := storeMap[candidate.StoreID]
if !ok {
return fmt.Errorf("cannot up-repliate to s%d; missing gossiped StoreDescriptor", candidate.StoreID)
}
storeList.stores = append(storeList.stores, *store)
}

targetStore, _ := s.allocator.allocateTargetFromList(
ctx,
storeList,
zone,
rangeInfo.Desc.Replicas,
rangeInfo,
s.allocator.scorerOptions())
if targetStore == nil {
return fmt.Errorf("none of the remaining targets %v are legal additions to %v",
addTargets, rangeInfo.Desc.Replicas)
}

target := roachpb.ReplicationTarget{
NodeID: targetStore.Node.NodeID,
StoreID: targetStore.StoreID,
}
if err := s.DB().AdminChangeReplicas(
ctx, rangeDesc.StartKey.AsRawKey(), roachpb.ADD_REPLICA, []roachpb.ReplicationTarget{target},
); err != nil {
returnErr := errors.Wrapf(err, "while adding target %v", target)
if !canRetry(err) {
return returnErr
}
if every.ShouldLog() {
log.Warning(ctx, returnErr)
}
re.Next()
continue
}

// Upon success, remove the target from our to-do list and add it to our
// local copy of the range descriptor such that future allocator
// decisions take it into account.
addTargets = removeTargetFromSlice(addTargets, target)
rangeInfo.Desc.Replicas = append(rangeInfo.Desc.Replicas, roachpb.ReplicaDescriptor{
NodeID: target.NodeID,
StoreID: target.StoreID,
})
}

if len(removeTargets) > 0 && len(removeTargets) > len(addTargets) {
targetStore, _, err := s.allocator.RemoveTarget(ctx, zone, removeTargets, rangeInfo)
if err != nil {
return errors.Wrapf(err, "unable to select removal target from %v; current replicas %v",
removeTargets, rangeInfo.Desc.Replicas)
}
target := roachpb.ReplicationTarget{
NodeID: targetStore.NodeID,
StoreID: targetStore.StoreID,
}
// Note that attempting to remove the leaseholder won't work, so transfer
// the lease first in such scenarios. The first specified target should have
// a replica by this point (the add step above prefers it), so we can always
// transfer the lease to it.
transferLease()
if err := s.DB().AdminChangeReplicas(
ctx, rangeDesc.StartKey.AsRawKey(), roachpb.REMOVE_REPLICA, []roachpb.ReplicationTarget{target},
); err != nil {
log.Warningf(ctx, "while removing target %v: %s", target, err)
if !canRetry(err) {
return err
}
re.Next()
continue
}

// Upon success, remove the target from our to-do list and from our local
// copy of the range descriptor such that future allocator decisions take
// its absence into account.
removeTargets = removeTargetFromSlice(removeTargets, target)
rangeInfo.Desc.Replicas = removeTargetFromSlice(rangeInfo.Desc.Replicas, target)
}
}

// Step 3: Transfer the lease to the first listed target, as the API specifies.
transferLease()

return ctx.Err()
}
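
For reference, a hedged usage sketch (illustrative only, not part of this commit; relocateExample and the node/store IDs are made up, and the function assumes it lives in the storage package so that *Store and roachpb are in scope). The real call sites are the merge queue and store rebalancer hunks elsewhere in this diff; the first target ends up holding the lease.

// relocateExample shows the calling convention only.
func relocateExample(ctx context.Context, s *Store, desc roachpb.RangeDescriptor) error {
	targets := []roachpb.ReplicationTarget{
		{NodeID: 4, StoreID: 4}, // will hold the lease once relocation finishes
		{NodeID: 5, StoreID: 5},
		{NodeID: 6, StoreID: 6},
	}
	// Best-effort: the leaseholder's replicate queue can race with this call,
	// so callers generally treat failures as retryable.
	return s.RelocateRangeThatDoesntSuck(ctx, desc, targets)
}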

// RelocateRange relocates a given range to a given set of stores. The first
// store in the slice becomes the new leaseholder.
//
@@ -1135,6 +1341,22 @@ func RelocateRange(
return ctx.Err()
}

// removeTargetFromSlice modifies the underlying storage of the slice rather
// than copying it. Don't use it on a shared slice where the order matters.
func removeTargetFromSlice(
targets []roachpb.ReplicaDescriptor, target roachpb.ReplicationTarget,
) []roachpb.ReplicaDescriptor {
for i, t := range targets {
if t.NodeID == target.NodeID && t.StoreID == target.StoreID {
// Swap the removed target with the last element in the slice and return
// a slice that's 1 element shorter than before.
targets[i], targets[len(targets)-1] = targets[len(targets)-1], targets[i]
return targets[:len(targets)-1]
}
}
return targets
}

// adminScatter moves replicas and leaseholders for a selection of ranges.
func (r *Replica) adminScatter(
ctx context.Context, args roachpb.AdminScatterRequest,
7 changes: 4 additions & 3 deletions pkg/storage/store_rebalancer.go
@@ -307,7 +307,9 @@ func (sr *StoreRebalancer) rebalanceStore(
replCtx, cancel := context.WithTimeout(replWithStats.repl.AnnotateCtx(ctx), sr.rq.processTimeout)
// TODO(a-robinson): Either make RelocateRange production-ready or do the
// rebalancing another way.
if err := RelocateRange(replCtx, sr.rq.store.DB(), *descBeforeRebalance, targets); err != nil {
if err := sr.rq.store.RelocateRangeThatDoesntSuck(
replCtx, *descBeforeRebalance, targets,
); err != nil {
cancel()
log.Errorf(replCtx, "unable to relocate range to %v: %v", targets, err)
continue
@@ -596,8 +598,7 @@ func (sr *StoreRebalancer) chooseReplicaToRebalance(
}

// Pick the replica with the least QPS to be leaseholder;
// TestingRelocateRange transfers the lease to the first provided
// target.
// RelocateRange transfers the lease to the first provided target.
newLeaseIdx := 0
newLeaseQPS := math.MaxFloat64
for i := 0; i < len(targets); i++ {
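
The diff view truncates the body of that loop. As a sketch of the selection it performs (moveLowestQPSTargetFirst and the qps map are hypothetical, not the fields the real code reads), the lowest-QPS target is moved to index 0 so that RelocateRange hands that store the lease:

// moveLowestQPSTargetFirst is a sketch, not the elided code: it swaps the
// target with the lowest queries-per-second into position 0, because the
// relocation API transfers the lease to the first target.
func moveLowestQPSTargetFirst(targets []roachpb.ReplicationTarget, qps map[roachpb.StoreID]float64) {
	if len(targets) == 0 {
		return
	}
	newLeaseIdx := 0
	newLeaseQPS := math.MaxFloat64
	for i := range targets {
		if q := qps[targets[i].StoreID]; q < newLeaseQPS {
			newLeaseIdx, newLeaseQPS = i, q
		}
	}
	targets[0], targets[newLeaseIdx] = targets[newLeaseIdx], targets[0]
}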
