allocator: refactor allocator to accept StorePool arguments
This change adds methods to evaluate allocator actions and targets using a
passed-in `StorePool` object, allowing the allocator to consider potential
scenarios rather than only those based on the current liveness.

Depends on cockroachdb#91461, cockroachdb#91965.

Part of cockroachdb#91570.

Release note: None
AlexTalks committed Dec 21, 2022
1 parent 61f7c81 commit ef6db0c
Showing 9 changed files with 695 additions and 98 deletions.
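
To illustrate the shape of the refactor (this sketch is not part of the commit): allocator methods such as `ComputeAction`, `AllocateVoter`, and `RebalanceVoter` now take an `AllocatorStorePool` argument explicitly instead of always reading the allocator's own pool, so a caller can pass either the live `StorePool` or an `OverrideStorePool` that reports a hypothetical liveness view. The helper below is hypothetical; only the `ComputeAction` call shape is taken from the diff.

```go
package example

import (
	"context"

	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/allocatorimpl"
	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/storepool"
	"github.com/cockroachdb/cockroach/pkg/roachpb"
)

// computeActionForScenario is a hypothetical helper showing the new call
// shape: the store pool is an explicit argument, so the same allocator can be
// asked what it would do under the live pool (a.StorePool) or under an
// OverrideStorePool carrying a simulated liveness view.
func computeActionForScenario(
	ctx context.Context,
	a allocatorimpl.Allocator,
	pool storepool.AllocatorStorePool,
	conf roachpb.SpanConfig,
	desc *roachpb.RangeDescriptor,
) (allocatorimpl.AllocatorAction, float64) {
	return a.ComputeAction(ctx, pool, conf, desc)
}
```
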
179 changes: 106 additions & 73 deletions pkg/kv/kvserver/allocator/allocatorimpl/allocator.go

Large diffs are not rendered by default.

526 changes: 513 additions & 13 deletions pkg/kv/kvserver/allocator/allocatorimpl/allocator_test.go

Large diffs are not rendered by default.

15 changes: 15 additions & 0 deletions pkg/kv/kvserver/allocator/storepool/override_store_pool.go
@@ -89,6 +89,21 @@ func (o *OverrideStorePool) GetStoreListFromIDs(
return o.sp.getStoreListFromIDsLocked(storeIDs, o.overrideNodeLivenessFn, filter)
}

// GetStoreListForTargets implements the AllocatorStorePool interface.
func (o *OverrideStorePool) GetStoreListForTargets(
candidates []roachpb.ReplicationTarget, filter StoreFilter,
) (StoreList, int, ThrottledStoreReasons) {
o.sp.DetailsMu.Lock()
defer o.sp.DetailsMu.Unlock()

storeIDs := make(roachpb.StoreIDSlice, 0, len(candidates))
for _, tgt := range candidates {
storeIDs = append(storeIDs, tgt.StoreID)
}

return o.sp.getStoreListFromIDsLocked(storeIDs, o.overrideNodeLivenessFn, filter)
}

// LiveAndDeadReplicas implements the AllocatorStorePool interface.
func (o *OverrideStorePool) LiveAndDeadReplicas(
repls []roachpb.ReplicaDescriptor, includeSuspectAndDrainingStores bool,
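
For context (not part of the diff): the override pool's value comes from `overrideNodeLivenessFn`, so the store lists above are computed under a caller-supplied liveness view rather than gossiped liveness. A rough sketch of such an override function follows; the exact `NodeLivenessFunc` signature (node ID, current time, time-until-store-dead) is an assumption, so check the `storepool` package for the authoritative definition.

```go
package example

import (
	"time"

	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness/livenesspb"
	"github.com/cockroachdb/cockroach/pkg/roachpb"
)

// deadNodeOverride returns a hypothetical liveness override that reports the
// given node as dead and every other node as live. Supplied to an
// OverrideStorePool, it lets store lists (and therefore allocator decisions)
// be evaluated as if that node had already failed.
func deadNodeOverride(
	deadNode roachpb.NodeID,
) func(roachpb.NodeID, time.Time, time.Duration) livenesspb.NodeLivenessStatus {
	return func(
		nid roachpb.NodeID, now time.Time, timeUntilStoreDead time.Duration,
	) livenesspb.NodeLivenessStatus {
		if nid == deadNode {
			return livenesspb.NodeLivenessStatus_DEAD
		}
		return livenesspb.NodeLivenessStatus_LIVE
	}
}
```
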
26 changes: 26 additions & 0 deletions pkg/kv/kvserver/allocator/storepool/store_pool.go
@@ -385,6 +385,14 @@ type AllocatorStorePool interface {
filter StoreFilter,
) (StoreList, int, ThrottledStoreReasons)

// GetStoreListForTargets is the same as GetStoreList, but it returns only the
// stores present in the passed-in replication targets, converted to a
// StoreList.
GetStoreListForTargets(
candidates []roachpb.ReplicationTarget,
filter StoreFilter,
) (StoreList, int, ThrottledStoreReasons)

// GossipNodeIDAddress looks up the RPC address for the given node via gossip.
GossipNodeIDAddress(nodeID roachpb.NodeID) (*util.UnresolvedAddr, error)

@@ -1072,6 +1080,24 @@ func (sp *StorePool) GetStoreListFromIDs(
return sp.getStoreListFromIDsLocked(storeIDs, sp.NodeLivenessFn, filter)
}

// GetStoreListForTargets is the same as GetStoreList, but it returns only the
// stores present in the passed-in replication targets, converted to a
// StoreList.
func (sp *StorePool) GetStoreListForTargets(
candidates []roachpb.ReplicationTarget,
filter StoreFilter,
) (StoreList, int, ThrottledStoreReasons) {
sp.DetailsMu.Lock()
defer sp.DetailsMu.Unlock()

storeIDs := make(roachpb.StoreIDSlice, 0, len(candidates))
for _, tgt := range candidates {
storeIDs = append(storeIDs, tgt.StoreID)
}

return sp.getStoreListFromIDsLocked(storeIDs, sp.NodeLivenessFn, filter)
}

// getStoreListFromIDsLocked is the same function as GetStoreList but requires
// that the DetailsMu lock is held.
func (sp *StorePool) getStoreListFromIDsLocked(
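
A minimal usage sketch of the new method (the wrapper itself is hypothetical; the method signature and `StoreFilterNone` come from the diff — compare the `RelocateOne` call site further down):

```go
package example

import (
	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/storepool"
	"github.com/cockroachdb/cockroach/pkg/roachpb"
)

// storeListForTargets narrows an AllocatorStorePool to the stores named in
// the given replication targets. The ignored return values are the count of
// stores considered live and the reasons any stores were excluded for
// throttling.
func storeListForTargets(
	pool storepool.AllocatorStorePool, targets []roachpb.ReplicationTarget,
) storepool.StoreList {
	storeList, _, _ := pool.GetStoreListForTargets(targets, storepool.StoreFilterNone)
	return storeList
}
```
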
9 changes: 6 additions & 3 deletions pkg/kv/kvserver/allocator_impl_test.go
@@ -180,6 +180,7 @@ func TestAllocatorRebalanceTarget(t *testing.T) {
for i := 0; i < 10; i++ {
result, _, details, ok := a.RebalanceVoter(
ctx,
a.StorePool,
roachpb.SpanConfig{},
status,
replicas,
@@ -205,6 +206,7 @@ func TestAllocatorRebalanceTarget(t *testing.T) {
for i := 0; i < 10; i++ {
target, _, details, ok := a.RebalanceVoter(
ctx,
a.StorePool,
roachpb.SpanConfig{},
status,
replicas,
@@ -224,6 +226,7 @@ func TestAllocatorRebalanceTarget(t *testing.T) {
for i := 0; i < 10; i++ {
target, origin, details, ok := a.RebalanceVoter(
ctx,
a.StorePool,
roachpb.SpanConfig{},
status,
replicas,
@@ -252,14 +255,14 @@ func TestAllocatorThrottled(t *testing.T) {
defer stopper.Stop(ctx)

// First test to make sure we would send the replica to purgatory.
_, _, err := a.AllocateVoter(ctx, simpleSpanConfig, []roachpb.ReplicaDescriptor{}, nil, allocatorimpl.Dead)
_, _, err := a.AllocateVoter(ctx, a.StorePool, simpleSpanConfig, []roachpb.ReplicaDescriptor{}, nil, allocatorimpl.Dead)
if _, ok := IsPurgatoryError(err); !ok {
t.Fatalf("expected a purgatory error, got: %+v", err)
}

// Second, test the normal case in which we can allocate to the store.
gossiputil.NewStoreGossiper(g).GossipStores(singleStore, t)
result, _, err := a.AllocateVoter(ctx, simpleSpanConfig, []roachpb.ReplicaDescriptor{}, nil, allocatorimpl.Dead)
result, _, err := a.AllocateVoter(ctx, a.StorePool, simpleSpanConfig, []roachpb.ReplicaDescriptor{}, nil, allocatorimpl.Dead)
if err != nil {
t.Fatalf("unable to perform allocation: %+v", err)
}
@@ -277,7 +280,7 @@ func TestAllocatorThrottled(t *testing.T) {
}
storeDetail.ThrottledUntil = timeutil.Now().Add(24 * time.Hour)
storePool.DetailsMu.Unlock()
_, _, err = a.AllocateVoter(ctx, simpleSpanConfig, []roachpb.ReplicaDescriptor{}, nil, allocatorimpl.Dead)
_, _, err = a.AllocateVoter(ctx, a.StorePool, simpleSpanConfig, []roachpb.ReplicaDescriptor{}, nil, allocatorimpl.Dead)
if _, ok := IsPurgatoryError(err); ok {
t.Fatalf("expected a non purgatory error, got: %+v", err)
}
Expand Down
5 changes: 3 additions & 2 deletions pkg/kv/kvserver/asim/queue/replicate_queue.go
@@ -59,7 +59,7 @@ func (rq *replicateQueue) MaybeAdd(
return false
}

action, priority := rq.allocator.ComputeAction(ctx, rng.SpanConfig(), rng.Descriptor())
action, priority := rq.allocator.ComputeAction(ctx, rq.allocator.StorePool, rng.SpanConfig(), rng.Descriptor())
if action == allocatorimpl.AllocatorNoop {
return false
}
@@ -98,7 +98,7 @@ func (rq *replicateQueue) Tick(ctx context.Context, tick time.Time, s state.Stat
return
}

action, _ := rq.allocator.ComputeAction(ctx, rng.SpanConfig(), rng.Descriptor())
action, _ := rq.allocator.ComputeAction(ctx, rq.allocator.StorePool, rng.SpanConfig(), rng.Descriptor())

switch action {
case allocatorimpl.AllocatorConsiderRebalance:
@@ -125,6 +125,7 @@ func (rq *replicateQueue) considerRebalance(
) {
add, remove, _, ok := rq.allocator.RebalanceVoter(
ctx,
rq.allocator.StorePool,
rng.SpanConfig(),
nil, /* raftStatus */
rng.Descriptor().Replicas().VoterDescriptors(),
9 changes: 7 additions & 2 deletions pkg/kv/kvserver/replica_command.go
@@ -3339,6 +3339,7 @@ func RelocateOne(

additionTarget, _ = allocator.AllocateTargetFromList(
ctx,
allocator.StorePool,
candidateStoreList,
conf,
existingVoters,
@@ -3407,10 +3408,14 @@ func RelocateOne(
// (s1,s2,s3,s4) which is a reasonable request; that replica set is
// overreplicated. If we asked it instead to remove s3 from (s1,s2,s3) it
// may not want to do that due to constraints.
candidatesStoreList, _, _ := allocator.StorePool.GetStoreListForTargets(
args.targetsToRemove(), storepool.StoreFilterNone,
)
targetStore, _, err := allocator.RemoveTarget(
ctx,
allocator.StorePool,
conf,
allocator.StoreListForTargets(args.targetsToRemove()),
candidatesStoreList,
existingVoters,
existingNonVoters,
args.targetType,
@@ -3678,7 +3683,7 @@ func (r *Replica) adminScatter(
if args.RandomizeLeases && r.OwnsValidLease(ctx, r.store.Clock().NowAsClockTimestamp()) {
desc := r.Desc()
potentialLeaseTargets := r.store.allocator.ValidLeaseTargets(
ctx, r.SpanConfig(), desc.Replicas().VoterDescriptors(), r, allocator.TransferLeaseOptions{})
ctx, r.store.allocator.StorePool, r.SpanConfig(), desc.Replicas().VoterDescriptors(), r, allocator.TransferLeaseOptions{})
if len(potentialLeaseTargets) > 0 {
newLeaseholderIdx := rand.Intn(len(potentialLeaseTargets))
targetStoreID := potentialLeaseTargets[newLeaseholderIdx].StoreID
19 changes: 14 additions & 5 deletions pkg/kv/kvserver/replicate_queue.go
@@ -611,7 +611,7 @@ func (rq *replicateQueue) shouldQueue(
ctx context.Context, now hlc.ClockTimestamp, repl *Replica, _ spanconfig.StoreReader,
) (shouldQueue bool, priority float64) {
desc, conf := repl.DescAndSpanConfig()
action, priority := rq.allocator.ComputeAction(ctx, conf, desc)
action, priority := rq.allocator.ComputeAction(ctx, rq.allocator.StorePool, conf, desc)

if action == allocatorimpl.AllocatorNoop {
log.KvDistribution.VEventf(ctx, 2, "no action to take")
@@ -627,6 +627,7 @@ func (rq *replicateQueue) shouldQueue(
rangeUsageInfo := rangeUsageInfoForRepl(repl)
_, _, _, ok := rq.allocator.RebalanceVoter(
ctx,
rq.allocator.StorePool,
conf,
repl.RaftStatus(),
voterReplicas,
@@ -641,6 +642,7 @@ }
}
_, _, _, ok = rq.allocator.RebalanceNonVoter(
ctx,
rq.allocator.StorePool,
conf,
repl.RaftStatus(),
voterReplicas,
@@ -660,6 +662,7 @@ }
if rq.canTransferLeaseFrom(ctx, repl) &&
rq.allocator.ShouldTransferLease(
ctx,
rq.allocator.StorePool,
conf,
voterReplicas,
repl,
@@ -1036,7 +1039,7 @@ func (rq *replicateQueue) PlanOneChange(
// unavailability; see:
_ = execChangeReplicasTxn

action, allocatorPrio := rq.allocator.ComputeAction(ctx, conf, desc)
action, allocatorPrio := rq.allocator.ComputeAction(ctx, rq.allocator.StorePool, conf, desc)
log.KvDistribution.VEventf(ctx, 1, "next replica action: %s", action)

var err error
@@ -1249,7 +1252,7 @@ func (rq *replicateQueue) addOrReplaceVoters(
// we're removing it (i.e. dead or decommissioning). If we left the replica in
// the slice, the allocator would not be guaranteed to pick a replica that
// fills the gap removeRepl leaves once it's gone.
newVoter, details, err := rq.allocator.AllocateVoter(ctx, conf, remainingLiveVoters, remainingLiveNonVoters, replicaStatus)
newVoter, details, err := rq.allocator.AllocateVoter(ctx, rq.allocator.StorePool, conf, remainingLiveVoters, remainingLiveNonVoters, replicaStatus)
if err != nil {
return nil, err
}
@@ -1281,7 +1284,7 @@ func (rq *replicateQueue) addOrReplaceVoters(
oldPlusNewReplicas,
roachpb.ReplicaDescriptor{NodeID: newVoter.NodeID, StoreID: newVoter.StoreID},
)
_, _, err := rq.allocator.AllocateVoter(ctx, conf, oldPlusNewReplicas, remainingLiveNonVoters, replicaStatus)
_, _, err := rq.allocator.AllocateVoter(ctx, rq.allocator.StorePool, conf, oldPlusNewReplicas, remainingLiveNonVoters, replicaStatus)
if err != nil {
// It does not seem possible to go to the next odd replica state. Note
// that AllocateVoter returns an allocatorError (a PurgatoryError)
@@ -1362,7 +1365,7 @@ func (rq *replicateQueue) addOrReplaceNonVoters(
existingNonVoters := desc.Replicas().NonVoterDescriptors()
effects := effectBuilder{}

newNonVoter, details, err := rq.allocator.AllocateNonVoter(ctx, conf, liveVoterReplicas, liveNonVoterReplicas, replicaStatus)
newNonVoter, details, err := rq.allocator.AllocateNonVoter(ctx, rq.allocator.StorePool, conf, liveVoterReplicas, liveNonVoterReplicas, replicaStatus)
if err != nil {
return nil, err
}
@@ -1485,6 +1488,7 @@ func (rq *replicateQueue) findRemoveVoter(

return rq.allocator.RemoveVoter(
ctx,
rq.allocator.StorePool,
zone,
candidates,
existingVoters,
@@ -1527,6 +1531,7 @@ func (rq *replicateQueue) maybeTransferLeaseAwayTarget(
// a replica needs to be removed for constraint violations.
target := rq.allocator.TransferLeaseTarget(
ctx,
rq.allocator.StorePool,
conf,
desc.Replicas().VoterDescriptors(),
repl,
@@ -1606,6 +1611,7 @@ func (rq *replicateQueue) removeNonVoter(
_, conf := repl.DescAndSpanConfig()
removeNonVoter, details, err := rq.allocator.RemoveNonVoter(
ctx,
rq.allocator.StorePool,
conf,
existingNonVoters,
existingVoters,
@@ -1765,6 +1771,7 @@ func (rq *replicateQueue) considerRebalance(
rangeUsageInfo := rangeUsageInfoForRepl(repl)
addTarget, removeTarget, details, ok := rq.allocator.RebalanceVoter(
ctx,
rq.allocator.StorePool,
conf,
repl.RaftStatus(),
existingVoters,
@@ -1779,6 +1786,7 @@
log.KvDistribution.Infof(ctx, "no suitable rebalance target for voters")
addTarget, removeTarget, details, ok = rq.allocator.RebalanceNonVoter(
ctx,
rq.allocator.StorePool,
conf,
repl.RaftStatus(),
existingVoters,
@@ -1962,6 +1970,7 @@ func (rq *replicateQueue) shedLease(
// so only consider the `VoterDescriptors` replicas.
target := rq.allocator.TransferLeaseTarget(
ctx,
rq.allocator.StorePool,
conf,
desc.Replicas().VoterDescriptors(),
repl,
5 changes: 5 additions & 0 deletions pkg/kv/kvserver/store_rebalancer.go
@@ -669,6 +669,7 @@ func (sr *StoreRebalancer) chooseLeaseToTransfer(

candidate := sr.allocator.TransferLeaseTarget(
ctx,
sr.allocator.StorePool,
conf,
candidates,
candidateReplica,
@@ -694,6 +695,7 @@ func (sr *StoreRebalancer) chooseLeaseToTransfer(
filteredStoreList := rctx.allStoresList.ExcludeInvalid(conf.VoterConstraints)
if sr.allocator.FollowTheWorkloadPrefersLocal(
ctx,
sr.allocator.StorePool,
filteredStoreList,
rctx.LocalDesc,
candidate.StoreID,
@@ -864,6 +866,7 @@ func (sr *StoreRebalancer) chooseRangeToRebalance(
// misconfiguration.
validTargets := sr.allocator.ValidLeaseTargets(
ctx,
sr.allocator.StorePool,
rebalanceCtx.conf,
targetVoterRepls,
rebalanceCtx.candidateReplica,
@@ -935,6 +938,7 @@ func (sr *StoreRebalancer) getRebalanceTargetsBasedOnQPS(
// `AdminRelocateRange` so that these decisions show up in system.rangelog
add, remove, _, shouldRebalance := sr.allocator.RebalanceTarget(
ctx,
sr.allocator.StorePool,
rbCtx.conf,
rbCtx.candidateReplica.RaftStatus(),
finalVoterTargets,
@@ -999,6 +1003,7 @@ for i := 0; i < len(finalNonVoterTargets); i++ {
for i := 0; i < len(finalNonVoterTargets); i++ {
add, remove, _, shouldRebalance := sr.allocator.RebalanceTarget(
ctx,
sr.allocator.StorePool,
rbCtx.conf,
rbCtx.candidateReplica.RaftStatus(),
finalVoterTargets,
