Skip to content

Commit

Permalink
kvserver: make Allocator methods return ReplicationTargets
Browse files Browse the repository at this point in the history
Previously they would return `ReplicaDescriptor`s, which was a vestige. These
return values were almost immediately getting cast into `ReplicationTarget` in
every single case anyway. This makes the return types of these allocator
methods a little more consistent.

Release note: None
  • Loading branch information
aayushshah15 committed Feb 11, 2022
1 parent bf24f02 commit d792adf
Show file tree
Hide file tree
Showing 5 changed files with 63 additions and 64 deletions.
45 changes: 26 additions & 19 deletions pkg/kv/kvserver/allocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -736,7 +736,7 @@ func (a *Allocator) allocateTarget(
conf roachpb.SpanConfig,
existingVoters, existingNonVoters []roachpb.ReplicaDescriptor,
targetType targetReplicaType,
) (*roachpb.StoreDescriptor, string, error) {
) (roachpb.ReplicationTarget, string, error) {
candidateStoreList, aliveStoreCount, throttled := a.storePool.getStoreList(storeFilterThrottled)

target, details := a.allocateTargetFromList(
Expand All @@ -754,18 +754,19 @@ func (a *Allocator) allocateTarget(
false, /* allowMultipleReplsPerNode */
targetType,
)
if target != nil {

if !roachpb.Empty(target) {
return target, details, nil
}

// When there are throttled stores that do match, we shouldn't send
// the replica to purgatory.
if len(throttled) > 0 {
return nil, "", errors.Errorf(
return roachpb.ReplicationTarget{}, "", errors.Errorf(
"%d matching stores are currently throttled: %v", len(throttled), throttled,
)
}
return nil, "", &allocatorError{
return roachpb.ReplicationTarget{}, "", &allocatorError{
voterConstraints: conf.VoterConstraints,
constraints: conf.Constraints,
existingVoterCount: len(existingVoters),
Expand All @@ -782,7 +783,7 @@ func (a *Allocator) AllocateVoter(
ctx context.Context,
conf roachpb.SpanConfig,
existingVoters, existingNonVoters []roachpb.ReplicaDescriptor,
) (*roachpb.StoreDescriptor, string, error) {
) (roachpb.ReplicationTarget, string, error) {
return a.allocateTarget(ctx, conf, existingVoters, existingNonVoters, voterTarget)
}

Expand All @@ -793,7 +794,7 @@ func (a *Allocator) AllocateNonVoter(
ctx context.Context,
conf roachpb.SpanConfig,
existingVoters, existingNonVoters []roachpb.ReplicaDescriptor,
) (*roachpb.StoreDescriptor, string, error) {
) (roachpb.ReplicationTarget, string, error) {
return a.allocateTarget(ctx, conf, existingVoters, existingNonVoters, nonVoterTarget)
}

Expand All @@ -805,7 +806,7 @@ func (a *Allocator) allocateTargetFromList(
options *rangeCountScorerOptions,
allowMultipleReplsPerNode bool,
targetType targetReplicaType,
) (*roachpb.StoreDescriptor, string) {
) (roachpb.ReplicationTarget, string) {
existingReplicas := append(existingVoters, existingNonVoters...)
analyzedOverallConstraints := constraint.AnalyzeConstraints(ctx, a.storePool.getStoreDescriptor,
existingReplicas, conf.NumReplicas, conf.Constraints)
Expand Down Expand Up @@ -851,10 +852,12 @@ func (a *Allocator) allocateTargetFromList(
if err != nil {
log.Warningf(ctx, "failed to marshal details for choosing allocate target: %+v", err)
}
return &target.store, string(detailsBytes)
return roachpb.ReplicationTarget{
NodeID: target.store.Node.NodeID, StoreID: target.store.StoreID,
}, string(detailsBytes)
}

return nil, ""
return roachpb.ReplicationTarget{}, ""
}

func (a Allocator) simulateRemoveTarget(
Expand All @@ -868,7 +871,7 @@ func (a Allocator) simulateRemoveTarget(
rangeUsageInfo RangeUsageInfo,
targetType targetReplicaType,
options scorerOptions,
) (roachpb.ReplicaDescriptor, string, error) {
) (roachpb.ReplicationTarget, string, error) {
candidateStores := make([]roachpb.StoreDescriptor, 0, len(candidates))
for _, cand := range candidates {
for _, store := range sl.stores {
Expand Down Expand Up @@ -912,7 +915,7 @@ func (a Allocator) simulateRemoveTarget(
}
}

func (a Allocator) storeListForCandidates(candidates []roachpb.ReplicationTarget) StoreList {
func (a Allocator) storeListForTargets(candidates []roachpb.ReplicationTarget) StoreList {
result := make([]roachpb.StoreDescriptor, 0, len(candidates))
sl, _, _ := a.storePool.getStoreList(storeFilterNone)
for _, cand := range candidates {
Expand All @@ -933,10 +936,12 @@ func (a Allocator) removeTarget(
existingNonVoters []roachpb.ReplicaDescriptor,
targetType targetReplicaType,
options scorerOptions,
) (roachpb.ReplicaDescriptor, string, error) {
) (roachpb.ReplicationTarget, string, error) {
if len(candidateStoreList.stores) == 0 {
return roachpb.ReplicaDescriptor{}, "", errors.Errorf("must supply at least one" +
" candidate replica to allocator.removeTarget()")
return roachpb.ReplicationTarget{}, "", errors.Errorf(
"must supply at least one" +
" candidate replica to allocator.removeTarget()",
)
}

existingReplicas := append(existingVoters, existingNonVoters...)
Expand Down Expand Up @@ -979,12 +984,14 @@ func (a Allocator) removeTarget(
if err != nil {
log.Warningf(ctx, "failed to marshal details for choosing remove target: %+v", err)
}
return exist, string(detailsBytes), nil
return roachpb.ReplicationTarget{
StoreID: exist.StoreID, NodeID: exist.NodeID,
}, string(detailsBytes), nil
}
}
}

return roachpb.ReplicaDescriptor{}, "", errors.New("could not select an appropriate replica to be removed")
return roachpb.ReplicationTarget{}, "", errors.New("could not select an appropriate replica to be removed")
}

// RemoveVoter returns a suitable replica to remove from the provided replica
Expand All @@ -998,7 +1005,7 @@ func (a Allocator) RemoveVoter(
existingVoters []roachpb.ReplicaDescriptor,
existingNonVoters []roachpb.ReplicaDescriptor,
options scorerOptions,
) (roachpb.ReplicaDescriptor, string, error) {
) (roachpb.ReplicationTarget, string, error) {
// Retrieve store descriptors for the provided candidates from the StorePool.
candidateStoreIDs := make(roachpb.StoreIDSlice, len(voterCandidates))
for i, exist := range voterCandidates {
Expand Down Expand Up @@ -1029,7 +1036,7 @@ func (a Allocator) RemoveNonVoter(
existingVoters []roachpb.ReplicaDescriptor,
existingNonVoters []roachpb.ReplicaDescriptor,
options scorerOptions,
) (roachpb.ReplicaDescriptor, string, error) {
) (roachpb.ReplicationTarget, string, error) {
// Retrieve store descriptors for the provided candidates from the StorePool.
candidateStoreIDs := make(roachpb.StoreIDSlice, len(nonVoterCandidates))
for i, exist := range nonVoterCandidates {
Expand Down Expand Up @@ -1124,7 +1131,7 @@ func (a Allocator) rebalanceTarget(
// pretty sure we won't want to remove immediately after adding it. If we
// would, we don't want to actually rebalance to that target.
var target, existingCandidate *candidate
var removeReplica roachpb.ReplicaDescriptor
var removeReplica roachpb.ReplicationTarget
for {
target, existingCandidate = bestRebalanceTarget(a.randGen, results)
if target == nil {
Expand Down
24 changes: 12 additions & 12 deletions pkg/kv/kvserver/allocator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ import (
"github.com/olekukonko/tablewriter"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
raft "go.etcd.io/etcd/raft/v3"
"go.etcd.io/etcd/raft/v3"
"go.etcd.io/etcd/raft/v3/tracker"
)

Expand Down Expand Up @@ -399,7 +399,7 @@ func createTestAllocatorWithKnobs(

// checkReplExists checks whether the given `repl` exists on any of the
// `stores`.
func checkReplExists(repl roachpb.ReplicaDescriptor, stores []roachpb.StoreID) (found bool) {
func checkReplExists(repl roachpb.ReplicationTarget, stores []roachpb.StoreID) (found bool) {
for _, storeID := range stores {
if repl.StoreID == storeID {
found = true
Expand Down Expand Up @@ -503,7 +503,7 @@ func TestAllocatorSimpleRetrieval(t *testing.T) {
if err != nil {
t.Fatalf("Unable to perform allocation: %+v", err)
}
if result.Node.NodeID != 1 || result.StoreID != 1 {
if result.NodeID != 1 || result.StoreID != 1 {
t.Errorf("expected NodeID 1 and StoreID 1: %+v", result)
}
}
Expand All @@ -520,7 +520,7 @@ func TestAllocatorNoAvailableDisks(t *testing.T) {
simpleSpanConfig,
nil /* existingVoters */, nil, /* existingNonVoters */
)
if result != nil {
if !roachpb.Empty(result) {
t.Errorf("expected nil result: %+v", result)
}
if err == nil {
Expand Down Expand Up @@ -548,29 +548,29 @@ func TestAllocatorTwoDatacenters(t *testing.T) {
ctx,
multiDCConfigSSD,
[]roachpb.ReplicaDescriptor{{
NodeID: result1.Node.NodeID,
NodeID: result1.NodeID,
StoreID: result1.StoreID,
}}, nil, /* existingNonVoters */
)
if err != nil {
t.Fatalf("Unable to perform allocation: %+v", err)
}
ids := []int{int(result1.Node.NodeID), int(result2.Node.NodeID)}
ids := []int{int(result1.NodeID), int(result2.NodeID)}
sort.Ints(ids)
if expected := []int{1, 2}; !reflect.DeepEqual(ids, expected) {
t.Errorf("Expected nodes %+v: %+v vs %+v", expected, result1.Node, result2.Node)
t.Errorf("Expected nodes %+v: %+v vs %+v", expected, result1.NodeID, result2.NodeID)
}
// Verify that no result is forthcoming if we already have a replica.
result3, _, err := a.AllocateVoter(
ctx,
multiDCConfigSSD,
[]roachpb.ReplicaDescriptor{
{
NodeID: result1.Node.NodeID,
NodeID: result1.NodeID,
StoreID: result1.StoreID,
},
{
NodeID: result2.Node.NodeID,
NodeID: result2.NodeID,
StoreID: result2.StoreID,
},
}, nil, /* existingNonVoters */
Expand Down Expand Up @@ -711,7 +711,7 @@ func TestAllocatorMultipleStoresPerNode(t *testing.T) {
result, _, err := a.AllocateVoter(
ctx, emptySpanConfig(), tc.existing, nil,
)
if e, a := tc.expectTargetAllocate, result != nil; e != a {
if e, a := tc.expectTargetAllocate, !roachpb.Empty(result); e != a {
t.Errorf(
"AllocateVoter(%v) got target %v, err %v; expectTarget=%v",
tc.existing, result, err, tc.expectTargetAllocate,
Expand Down Expand Up @@ -5428,7 +5428,7 @@ func TestAllocatorRemoveTargetBasedOnCapacity(t *testing.T) {
if err != nil {
t.Fatal(err)
}
if a, e1, e2 := targetRepl, replicas[1], replicas[2]; a != e1 && a != e2 {
if a, e1, e2 := targetRepl, replicas[1], replicas[2]; a.StoreID != e1.StoreID && a.StoreID != e2.StoreID {
t.Fatalf("%d: RemoveVoter did not select either expected replica; expected %v or %v, got %v",
i, e1, e2, a)
}
Expand Down Expand Up @@ -6989,7 +6989,7 @@ func TestAllocatorThrottled(t *testing.T) {
if err != nil {
t.Fatalf("unable to perform allocation: %+v", err)
}
if result.Node.NodeID != 1 || result.StoreID != 1 {
if result.NodeID != 1 || result.StoreID != 1 {
t.Errorf("expected NodeID 1 and StoreID 1: %+v", result)
}

Expand Down
11 changes: 3 additions & 8 deletions pkg/kv/kvserver/replica_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -2908,7 +2908,7 @@ func (s *Store) relocateOne(
}
candidateStoreList := makeStoreList(candidateDescs)

targetStore, _ := s.allocator.allocateTargetFromList(
additionTarget, _ = s.allocator.allocateTargetFromList(
ctx,
candidateStoreList,
conf,
Expand All @@ -2921,18 +2921,13 @@ func (s *Store) relocateOne(
true, /* allowMultipleReplsPerNode */
args.targetType,
)
if targetStore == nil {
if roachpb.Empty(additionTarget) {
return nil, nil, fmt.Errorf(
"none of the remaining %ss %v are legal additions to %v",
args.targetType, args.targetsToAdd(), desc.Replicas(),
)
}

additionTarget = roachpb.ReplicationTarget{
NodeID: targetStore.Node.NodeID,
StoreID: targetStore.StoreID,
}

// Pretend the new replica is already there so that the removal logic below
// will take it into account when deciding which replica to remove.
if args.targetType == voterTarget {
Expand Down Expand Up @@ -2984,7 +2979,7 @@ func (s *Store) relocateOne(
targetStore, _, err := s.allocator.removeTarget(
ctx,
conf,
s.allocator.storeListForCandidates(args.targetsToRemove()),
s.allocator.storeListForTargets(args.targetsToRemove()),
existingVoters,
existingNonVoters,
args.targetType,
Expand Down
42 changes: 17 additions & 25 deletions pkg/kv/kvserver/replicate_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -566,16 +566,12 @@ func (rq *replicateQueue) addOrReplaceVoters(
// we're removing it (i.e. dead or decommissioning). If we left the replica in
// the slice, the allocator would not be guaranteed to pick a replica that
// fills the gap removeRepl leaves once it's gone.
newStore, details, err := rq.allocator.AllocateVoter(ctx, conf, remainingLiveVoters, remainingLiveNonVoters)
newVoter, details, err := rq.allocator.AllocateVoter(ctx, conf, remainingLiveVoters, remainingLiveNonVoters)
if err != nil {
return false, err
}
if removeIdx >= 0 && newStore.StoreID == existingVoters[removeIdx].StoreID {
return false, errors.AssertionFailedf("allocator suggested to replace replica on s%d with itself", newStore.StoreID)
}
newVoter := roachpb.ReplicationTarget{
NodeID: newStore.Node.NodeID,
StoreID: newStore.StoreID,
if removeIdx >= 0 && newVoter.StoreID == existingVoters[removeIdx].StoreID {
return false, errors.AssertionFailedf("allocator suggested to replace replica on s%d with itself", newVoter.StoreID)
}

clusterNodes := rq.allocator.storePool.ClusterNodeCount()
Expand All @@ -598,10 +594,10 @@ func (rq *replicateQueue) addOrReplaceVoters(
// This means we are going to up-replicate to an even replica state.
// Check if it is possible to go to an odd replica state beyond it.
oldPlusNewReplicas := append([]roachpb.ReplicaDescriptor(nil), existingVoters...)
oldPlusNewReplicas = append(oldPlusNewReplicas, roachpb.ReplicaDescriptor{
NodeID: newStore.Node.NodeID,
StoreID: newStore.StoreID,
})
oldPlusNewReplicas = append(
oldPlusNewReplicas,
roachpb.ReplicaDescriptor{NodeID: newVoter.NodeID, StoreID: newVoter.StoreID},
)
_, _, err := rq.allocator.AllocateVoter(ctx, conf, oldPlusNewReplicas, remainingLiveNonVoters)
if err != nil {
// It does not seem possible to go to the next odd replica state. Note
Expand Down Expand Up @@ -676,16 +672,12 @@ func (rq *replicateQueue) addOrReplaceNonVoters(
desc, conf := repl.DescAndSpanConfig()
existingNonVoters := desc.Replicas().NonVoterDescriptors()

newStore, details, err := rq.allocator.AllocateNonVoter(ctx, conf, liveVoterReplicas, liveNonVoterReplicas)
newNonVoter, details, err := rq.allocator.AllocateNonVoter(ctx, conf, liveVoterReplicas, liveNonVoterReplicas)
if err != nil {
return false, err
}
rq.metrics.AddReplicaCount.Inc(1)

newNonVoter := roachpb.ReplicationTarget{
NodeID: newStore.Node.NodeID,
StoreID: newStore.StoreID,
}
ops := roachpb.MakeReplicationChanges(roachpb.ADD_NON_VOTER, newNonVoter)
if removeIdx < 0 {
log.VEventf(ctx, 1, "adding non-voter %+v: %s",
Expand Down Expand Up @@ -734,7 +726,7 @@ func (rq *replicateQueue) findRemoveVoter(
RaftStatus() *raft.Status
},
existingVoters, existingNonVoters []roachpb.ReplicaDescriptor,
) (roachpb.ReplicaDescriptor, string, error) {
) (roachpb.ReplicationTarget, string, error) {
_, zone := repl.DescAndSpanConfig()
// This retry loop involves quick operations on local state, so a
// small MaxBackoff is good (but those local variables change on
Expand All @@ -759,7 +751,7 @@ func (rq *replicateQueue) findRemoveVoter(
raftStatus := repl.RaftStatus()
if raftStatus == nil || raftStatus.RaftState != raft.StateLeader {
// If we've lost raft leadership, we're unlikely to regain it so give up immediately.
return roachpb.ReplicaDescriptor{}, "", &benignError{errors.Errorf("not raft leader while range needs removal")}
return roachpb.ReplicationTarget{}, "", &benignError{errors.Errorf("not raft leader while range needs removal")}
}
candidates = filterUnremovableReplicas(ctx, raftStatus, existingVoters, lastReplAdded)
log.VEventf(ctx, 3, "filtered unremovable replicas from %v to get %v as candidates for removal: %s",
Expand Down Expand Up @@ -790,8 +782,12 @@ func (rq *replicateQueue) findRemoveVoter(
}
if len(candidates) == 0 {
// If we timed out and still don't have any valid candidates, give up.
return roachpb.ReplicaDescriptor{}, "", &benignError{errors.Errorf("no removable replicas from range that needs a removal: %s",
rangeRaftProgress(repl.RaftStatus(), existingVoters))}
return roachpb.ReplicationTarget{}, "", &benignError{
errors.Errorf(
"no removable replicas from range that needs a removal: %s",
rangeRaftProgress(repl.RaftStatus(), existingVoters),
),
}
}

return rq.allocator.RemoveVoter(
Expand Down Expand Up @@ -873,10 +869,6 @@ func (rq *replicateQueue) removeVoter(
rq.metrics.RemoveReplicaCount.Inc(1)
log.VEventf(ctx, 1, "removing voting replica %+v due to over-replication: %s",
removeVoter, rangeRaftProgress(repl.RaftStatus(), existingVoters))
target := roachpb.ReplicationTarget{
NodeID: removeVoter.NodeID,
StoreID: removeVoter.StoreID,
}
desc := repl.Desc()
// TODO(aayush): Directly removing the voter here is a bit of a missed
// opportunity since we could potentially be 1 non-voter short and the
Expand All @@ -886,7 +878,7 @@ func (rq *replicateQueue) removeVoter(
if err := rq.changeReplicas(
ctx,
repl,
roachpb.MakeReplicationChanges(roachpb.REMOVE_VOTER, target),
roachpb.MakeReplicationChanges(roachpb.REMOVE_VOTER, removeVoter),
desc,
kvserverpb.SnapshotRequest_UNKNOWN, // unused
kvserverpb.ReasonRangeOverReplicated,
Expand Down
Loading

0 comments on commit d792adf

Please sign in to comment.