Skip to content

Commit

Permalink
kvserver: refactor replicate queue allocator usage logic
Browse files Browse the repository at this point in the history
This change refactors parts of the replicate queue's `PlanOneChange(..)`
and `addOrRemove{Non}Voters(..)` functions to reusable helper functions
that simplify usage of the allocator and deduplicate repeated code
paths. The change also adds convenience methods to the `AllocatorAction`
enum, to move certain determinations (such as if a computed allocator
action is a remove or a replace) closer to the allocator type it is
based on. These changes move more of the logic needed to use the
allocator into the `allocatorimpl` package itself, enabling usage of the
allocator outside of the replicate queue.

Part of cockroachdb#91571.

Release note: None
  • Loading branch information
AlexTalks committed Dec 22, 2022
1 parent 15bc0c4 commit c62636b
Show file tree
Hide file tree
Showing 2 changed files with 245 additions and 115 deletions.
205 changes: 202 additions & 3 deletions pkg/kv/kvserver/allocator/allocatorimpl/allocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,67 @@ const (
AllocatorFinalizeAtomicReplicationChange
)

func (a AllocatorAction) Add() bool {
return a == AllocatorAddVoter || a == AllocatorAddNonVoter
}

func (a AllocatorAction) Replace() bool {
return a == AllocatorReplaceDeadVoter ||
a == AllocatorReplaceDeadNonVoter ||
a == AllocatorReplaceDecommissioningVoter ||
a == AllocatorReplaceDecommissioningNonVoter
}

func (a AllocatorAction) Remove() bool {
return a == AllocatorRemoveVoter ||
a == AllocatorRemoveNonVoter ||
a == AllocatorRemoveDeadVoter ||
a == AllocatorRemoveDeadNonVoter ||
a == AllocatorRemoveDecommissioningVoter ||
a == AllocatorRemoveDecommissioningNonVoter
}

func (a AllocatorAction) TargetReplicaType() TargetReplicaType {
var t TargetReplicaType
if a == AllocatorRemoveVoter ||
a == AllocatorAddVoter ||
a == AllocatorReplaceDeadVoter ||
a == AllocatorRemoveDeadVoter ||
a == AllocatorReplaceDecommissioningVoter ||
a == AllocatorRemoveDecommissioningVoter {
t = VoterTarget
} else if a == AllocatorRemoveNonVoter ||
a == AllocatorAddNonVoter ||
a == AllocatorReplaceDeadNonVoter ||
a == AllocatorRemoveDeadNonVoter ||
a == AllocatorReplaceDecommissioningNonVoter ||
a == AllocatorRemoveDecommissioningNonVoter {
t = NonVoterTarget
}
return t
}

func (a AllocatorAction) ReplicaStatus() ReplicaStatus {
var s ReplicaStatus
if a == AllocatorRemoveVoter ||
a == AllocatorRemoveNonVoter ||
a == AllocatorAddVoter ||
a == AllocatorAddNonVoter {
s = Alive
} else if a == AllocatorReplaceDeadVoter ||
a == AllocatorReplaceDeadNonVoter ||
a == AllocatorRemoveDeadVoter ||
a == AllocatorRemoveDeadNonVoter {
s = Dead
} else if a == AllocatorReplaceDecommissioningVoter ||
a == AllocatorReplaceDecommissioningNonVoter ||
a == AllocatorRemoveDecommissioningVoter ||
a == AllocatorRemoveDecommissioningNonVoter {
s = Decommissioning
}
return s
}

var allocatorActionNames = map[AllocatorAction]string{
AllocatorNoop: "noop",
AllocatorRemoveVoter: "remove voter",
Expand Down Expand Up @@ -268,6 +329,19 @@ func (t TargetReplicaType) String() string {
}
}

func (s ReplicaStatus) String() string {
switch s {
case Alive:
return "live"
case Dead:
return "dead"
case Decommissioning:
return "decommissioning"
default:
panic(fmt.Sprintf("unknown replicaStatus %d", s))
}
}

type transferDecision int

const (
Expand Down Expand Up @@ -595,6 +669,131 @@ func GetNeededNonVoters(numVoters, zoneConfigNonVoterCount, clusterNodes int) in
return need
}

// LiveAndDeadVoterAndNonVoterReplicas splits up the replica in the given range
// descriptor by voters vs non-voters and live replicas vs dead replicas.
func LiveAndDeadVoterAndNonVoterReplicas(
storePool storepool.AllocatorStorePool, desc *roachpb.RangeDescriptor,
) (
voterReplicas, nonVoterReplicas, liveVoterReplicas, deadVoterReplicas, liveNonVoterReplicas, deadNonVoterReplicas []roachpb.ReplicaDescriptor,
) {
voterReplicas = desc.Replicas().VoterDescriptors()
nonVoterReplicas = desc.Replicas().NonVoterDescriptors()
liveVoterReplicas, deadVoterReplicas = storePool.LiveAndDeadReplicas(
voterReplicas, true, /* includeSuspectAndDrainingStores */
)
liveNonVoterReplicas, deadNonVoterReplicas = storePool.LiveAndDeadReplicas(
nonVoterReplicas, true, /* includeSuspectAndDrainingStores */
)
return
}

// DetermineReplicaToReplaceAndFilter is used on add or replace allocator actions
// to filter the set of live voter and non-voter replicas to use in determining
// a new allocation target. It identifies a dead or decommissioning replica to
// replace from the list of voters or non-voters, depending on the replica
// status and target type, and returns the filtered live voters and non-voters
// along with the list of existing replicas and the index of the removal candidate.
// In case of an add action, no replicas are removed and a removeIdx of -1 is
// returned, and if no candidates for replacement can be found during a replace
// action, the returned nothingToDo flag will be set to true.
func DetermineReplicaToReplaceAndFilter(
storePool storepool.AllocatorStorePool,
action AllocatorAction,
voters, nonVoters []roachpb.ReplicaDescriptor,
liveVoterReplicas, deadVoterReplicas []roachpb.ReplicaDescriptor,
liveNonVoterReplicas, deadNonVoterReplicas []roachpb.ReplicaDescriptor,
) (
existing, remainingLiveVoters, remainingLiveNonVoters []roachpb.ReplicaDescriptor,
removeIdx int,
nothingToDo bool,
err error,
) {
removeIdx = -1
remainingLiveVoters = liveVoterReplicas
remainingLiveNonVoters = liveNonVoterReplicas
var deadReplicas, removalCandidates []roachpb.ReplicaDescriptor

if !(action.Add() || action.Replace()) {
err = errors.AssertionFailedf(
"unexpected attempt to filter replicas on non-add/non-replacement action %s",
action,
)
return
}

replicaType := action.TargetReplicaType()
replicaStatus := action.ReplicaStatus()

switch replicaType {
case VoterTarget:
existing = voters
deadReplicas = deadVoterReplicas
case NonVoterTarget:
existing = nonVoters
deadReplicas = deadNonVoterReplicas
default:
panic(fmt.Sprintf("unknown targetReplicaType: %s", replicaType))
}
switch replicaStatus {
case Alive:
// NB: Live replicas are not candidates for replacement.
return
case Dead:
removalCandidates = deadReplicas
case Decommissioning:
removalCandidates = storePool.DecommissioningReplicas(existing)
default:
panic(fmt.Sprintf("unknown replicaStatus: %s", replicaStatus))
}
if len(removalCandidates) == 0 {
nothingToDo = true
return
}

removeIdx = getRemoveIdx(existing, removalCandidates[0])
if removeIdx < 0 {
err = errors.AssertionFailedf(
"%s %s %v unexpectedly not found in %v",
replicaStatus, replicaType, removalCandidates[0], existing,
)
return
}

// TODO(sarkesian): Add comment on why this filtering only happens for voters.
if replicaType == VoterTarget {
if len(existing) == 1 {
// If only one replica remains, that replica is the leaseholder and
// we won't be able to swap it out. Ignore the removal and simply add
// a replica.
removeIdx = -1
}

if removeIdx >= 0 {
replToRemove := existing[removeIdx]
for i, r := range liveVoterReplicas {
if r.ReplicaID == replToRemove.ReplicaID {
remainingLiveVoters = append(liveVoterReplicas[:i:i], liveVoterReplicas[i+1:]...)
break
}
}
}
}
return
}

func getRemoveIdx(
repls []roachpb.ReplicaDescriptor, deadOrDecommissioningRepl roachpb.ReplicaDescriptor,
) (removeIdx int) {
removeIdx = -1
for i, rDesc := range repls {
if rDesc.StoreID == deadOrDecommissioningRepl.StoreID {
removeIdx = i
break
}
}
return removeIdx
}

// ComputeAction determines the exact operation needed to repair the
// supplied range, as governed by the supplied zone configuration. It
// returns the required action that should be taken and a priority.
Expand Down Expand Up @@ -925,7 +1124,7 @@ func (s *GoodCandidateSelector) selectOne(cl candidateList) *candidate {
return cl.selectGood(s.randGen)
}

func (a *Allocator) allocateTarget(
func (a *Allocator) AllocateTarget(
ctx context.Context,
storePool storepool.AllocatorStorePool,
conf roachpb.SpanConfig,
Expand Down Expand Up @@ -996,7 +1195,7 @@ func (a *Allocator) AllocateVoter(
existingVoters, existingNonVoters []roachpb.ReplicaDescriptor,
replicaStatus ReplicaStatus,
) (roachpb.ReplicationTarget, string, error) {
return a.allocateTarget(ctx, storePool, conf, existingVoters, existingNonVoters, replicaStatus, VoterTarget)
return a.AllocateTarget(ctx, storePool, conf, existingVoters, existingNonVoters, replicaStatus, VoterTarget)
}

// AllocateNonVoter returns a suitable store for a new allocation of a
Expand All @@ -1009,7 +1208,7 @@ func (a *Allocator) AllocateNonVoter(
existingVoters, existingNonVoters []roachpb.ReplicaDescriptor,
replicaStatus ReplicaStatus,
) (roachpb.ReplicationTarget, string, error) {
return a.allocateTarget(ctx, storePool, conf, existingVoters, existingNonVoters, replicaStatus, NonVoterTarget)
return a.AllocateTarget(ctx, storePool, conf, existingVoters, existingNonVoters, replicaStatus, NonVoterTarget)
}

// AllocateTargetFromList returns a suitable store for a new allocation of a
Expand Down
Loading

0 comments on commit c62636b

Please sign in to comment.