Skip to content

Commit

Permalink
storage: add replacement ops to allocator
Browse files Browse the repository at this point in the history
Teach the allocator that there is a concept of "replacing" dead and
decommissioning replicas.
Add a stub handler for them to the replicate queue that does exactly what
AllocatorAdd does, i.e., maintains the previous behavior.

A follow-up commit will then teach the replicate queue to actually issue
replica replacement operations both for these two new options as well as
in AllocatorConsiderRebalance.

When that is established, we should be in a place where the replicate
queue never puts ranges into fragile intermediate configurations, except
that it will still transition through even-numbered replica
configurations (which have a more fragile quorum) when upreplicating;
this can be tackled separately.

Release note: None
  • Loading branch information
tbg committed Aug 28, 2019
1 parent dbf36b8 commit 8d8e8ff
Show file tree
Hide file tree
Showing 3 changed files with 231 additions and 158 deletions.
64 changes: 38 additions & 26 deletions pkg/storage/allocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,9 @@ const (
AllocatorNoop
AllocatorRemove
AllocatorAdd
AllocatorReplaceDead
AllocatorRemoveDead
AllocatorReplaceDecommissioning
AllocatorRemoveDecommissioning
AllocatorRemoveLearner
AllocatorConsiderRebalance
Expand All @@ -111,7 +113,9 @@ var allocatorActionNames = map[AllocatorAction]string{
AllocatorNoop: "noop",
AllocatorRemove: "remove",
AllocatorAdd: "add",
AllocatorReplaceDead: "replace dead",
AllocatorRemoveDead: "remove dead",
AllocatorReplaceDecommissioning: "replace decommissioning",
AllocatorRemoveDecommissioning: "remove decommissioning",
AllocatorRemoveLearner: "remove learner",
AllocatorConsiderRebalance: "consider rebalance",
Expand Down Expand Up @@ -372,25 +376,18 @@ func (a *Allocator) computeAction(
// Priority is adjusted by the difference between the current replica
// count and the quorum of the desired replica count.
priority := addMissingReplicaPriority + float64(desiredQuorum-have)
log.VEventf(ctx, 3, "AllocatorAdd - missing replica need=%d, have=%d, priority=%.2f",
need, have, priority)
return AllocatorAdd, priority
}

if have == need && len(decommissioningReplicas) > 0 {
// Range has decommissioning replica(s). We should up-replicate to add
// another replica. The decommissioning replica(s) will be down-replicated
// later.
priority := addDecommissioningReplacementPriority
log.VEventf(ctx, 3, "AllocatorAdd - replacement for %d decommissioning replicas priority=%.2f",
len(decommissioningReplicas), priority)
return AllocatorAdd, priority
action := AllocatorAdd
log.VEventf(ctx, 3, "%s - missing replica need=%d, have=%d, priority=%.2f",
action, need, have, priority)
return action, priority
}

liveVoterReplicas, deadVoterReplicas := a.storePool.liveAndDeadReplicas(rangeID, voterReplicas)

if len(liveVoterReplicas) < quorum {
// Do not take any removal action if we do not have a quorum of live
// replicas.
// Do not take any replacement/removal action if we do not have a quorum of live
// replicas. If we're correctly assessing the unavailable state of the range, we
// also won't be able to add replicas as we try above, but hope springs eternal.
log.VEventf(ctx, 1, "unable to take action - live replicas %v don't meet quorum of %d",
liveVoterReplicas, quorum)
return AllocatorRangeUnavailable, 0
Expand All @@ -403,9 +400,19 @@ func (a *Allocator) computeAction(
// and lose a second node before we can up-replicate (#25392).
// The dead replica(s) will be down-replicated later.
priority := addDeadReplacementPriority
log.VEventf(ctx, 3, "AllocatorAdd - replacement for %d dead replicas priority=%.2f",
len(deadVoterReplicas), priority)
return AllocatorAdd, priority
action := AllocatorReplaceDead
log.VEventf(ctx, 3, "%s - replacement for %d dead replicas priority=%.2f",
action, len(deadVoterReplicas), priority)
return action, priority
}

if have == need && len(decommissioningReplicas) > 0 {
// Range has decommissioning replica(s), which should be replaced.
priority := addDecommissioningReplacementPriority
action := AllocatorReplaceDecommissioning
log.VEventf(ctx, 3, "%s - replacement for %d decommissioning replicas priority=%.2f",
action, len(decommissioningReplicas), priority)
return action, priority
}

// Removal actions follow.
Expand All @@ -417,28 +424,31 @@ func (a *Allocator) computeAction(
if len(deadVoterReplicas) > 0 {
// The range has dead replicas, which should be removed immediately.
priority := removeDeadReplicaPriority + float64(quorum-len(liveVoterReplicas))
log.VEventf(ctx, 3, "AllocatorRemoveDead - dead=%d, live=%d, quorum=%d, priority=%.2f",
len(deadVoterReplicas), len(liveVoterReplicas), quorum, priority)
return AllocatorRemoveDead, priority
action := AllocatorRemoveDead
log.VEventf(ctx, 3, "%s - dead=%d, live=%d, quorum=%d, priority=%.2f",
action, len(deadVoterReplicas), len(liveVoterReplicas), quorum, priority)
return action, priority
}

if len(decommissioningReplicas) > 0 {
// Range is over-replicated, and has a decommissioning replica which
// should be removed.
priority := removeDecommissioningReplicaPriority
action := AllocatorRemoveDecommissioning
log.VEventf(ctx, 3,
"AllocatorRemoveDecommissioning - need=%d, have=%d, num_decommissioning=%d, priority=%.2f",
need, have, len(decommissioningReplicas), priority)
return AllocatorRemoveDecommissioning, priority
"%s - need=%d, have=%d, num_decommissioning=%d, priority=%.2f",
action, need, have, len(decommissioningReplicas), priority)
return action, priority
}

if have > need {
// Range is over-replicated, and should remove a replica.
// Ranges with an even number of replicas get extra priority because
// they have a more fragile quorum.
priority := removeExtraReplicaPriority - float64(have%2)
log.VEventf(ctx, 3, "AllocatorRemove - need=%d, have=%d, priority=%.2f", need, have, priority)
return AllocatorRemove, priority
action := AllocatorRemove
log.VEventf(ctx, 3, "%s - need=%d, have=%d, priority=%.2f", action, need, have, priority)
return action, priority
}

// Nothing needs to be done, but we may want to rebalance.
Expand All @@ -455,6 +465,8 @@ type decisionDetails struct {
// out as targets. The range ID of the replica being allocated for is also
// passed in to ensure that we don't try to replace an existing dead replica on
// a store.
//
// TODO(tbg): AllocateReplacement?
func (a *Allocator) AllocateTarget(
ctx context.Context,
zone *config.ZoneConfig,
Expand Down
Loading

0 comments on commit 8d8e8ff

Please sign in to comment.