Skip to content

Commit

Permalink
kvserver: make AdminRelocateRange work with non-voting replicas
Browse files Browse the repository at this point in the history
This commit teaches `RelocateRange` to work with non-voters. This lets
the merge queue rebalance a range that has non-voters so the merge can
actually proceed, as range merges require that replica sets of the LHS
and RHS ranges must be aligned.

Release note: None
  • Loading branch information
aayushshah15 committed Dec 15, 2020
1 parent 56668ce commit 1247a2f
Show file tree
Hide file tree
Showing 19 changed files with 1,230 additions and 848 deletions.
7 changes: 5 additions & 2 deletions pkg/kv/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -685,7 +685,9 @@ func (b *Batch) adminChangeReplicas(

// adminRelocateRange is only exported on DB. It is here for symmetry with the
// other operations.
func (b *Batch) adminRelocateRange(key interface{}, targets []roachpb.ReplicationTarget) {
func (b *Batch) adminRelocateRange(
key interface{}, voterTargets, nonVoterTargets []roachpb.ReplicationTarget,
) {
k, err := marshalKey(key)
if err != nil {
b.initResult(0, 0, notRaw, err)
Expand All @@ -695,7 +697,8 @@ func (b *Batch) adminRelocateRange(key interface{}, targets []roachpb.Replicatio
RequestHeader: roachpb.RequestHeader{
Key: k,
},
Targets: targets,
VoterTargets: voterTargets,
NonVoterTargets: nonVoterTargets,
}
b.appendReqs(req)
b.initResult(1, 0, notRaw, nil)
Expand Down
4 changes: 2 additions & 2 deletions pkg/kv/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -608,10 +608,10 @@ func (db *DB) AdminChangeReplicas(
// AdminRelocateRange relocates the replicas for a range onto the specified
// list of stores.
func (db *DB) AdminRelocateRange(
ctx context.Context, key interface{}, targets []roachpb.ReplicationTarget,
ctx context.Context, key interface{}, voterTargets, nonVoterTargets []roachpb.ReplicationTarget,
) error {
b := &Batch{}
b.adminRelocateRange(key, targets)
b.adminRelocateRange(key, voterTargets, nonVoterTargets)
return getOneErr(db.Run(ctx, b), b)
}

Expand Down
18 changes: 9 additions & 9 deletions pkg/kv/kvserver/allocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -494,17 +494,17 @@ func (a *Allocator) AllocateTarget(

func (a *Allocator) allocateTargetFromList(
ctx context.Context,
sl StoreList,
candidateStores StoreList,
zone *zonepb.ZoneConfig,
candidateReplicas []roachpb.ReplicaDescriptor,
existingReplicas []roachpb.ReplicaDescriptor,
options scorerOptions,
) (*roachpb.StoreDescriptor, string) {
analyzedConstraints := constraint.AnalyzeConstraints(
ctx, a.storePool.getStoreDescriptor, candidateReplicas, zone)
ctx, a.storePool.getStoreDescriptor, existingReplicas, zone)
candidates := allocateCandidates(
ctx,
sl, analyzedConstraints, candidateReplicas,
a.storePool.getLocalitiesByStore(candidateReplicas),
candidateStores, analyzedConstraints, existingReplicas,
a.storePool.getLocalitiesByStore(existingReplicas),
a.storePool.isNodeReadyForRoutineReplicaTransfer,
options,
)
Expand Down Expand Up @@ -559,17 +559,17 @@ func (a Allocator) RemoveTarget(
}

// Retrieve store descriptors for the provided candidates from the StorePool.
existingStoreIDs := make(roachpb.StoreIDSlice, len(candidates))
candidateStoreIDs := make(roachpb.StoreIDSlice, len(candidates))
for i, exist := range candidates {
existingStoreIDs[i] = exist.StoreID
candidateStoreIDs[i] = exist.StoreID
}
sl, _, _ := a.storePool.getStoreListFromIDs(existingStoreIDs, storeFilterNone)
candidateStoreList, _, _ := a.storePool.getStoreListFromIDs(candidateStoreIDs, storeFilterNone)

analyzedConstraints := constraint.AnalyzeConstraints(
ctx, a.storePool.getStoreDescriptor, existingReplicas, zone)
options := a.scorerOptions()
rankedCandidates := removeCandidates(
sl,
candidateStoreList,
analyzedConstraints,
a.storePool.getLocalitiesByStore(existingReplicas),
options,
Expand Down
16 changes: 8 additions & 8 deletions pkg/kv/kvserver/allocator_scorer.go
Original file line number Diff line number Diff line change
Expand Up @@ -408,16 +408,16 @@ func (cl candidateList) removeCandidate(c candidate) candidateList {
// stores that meet the criteria are included in the list.
func allocateCandidates(
ctx context.Context,
sl StoreList,
candidateStores StoreList,
constraints constraint.AnalyzedConstraints,
existing []roachpb.ReplicaDescriptor,
existingReplicas []roachpb.ReplicaDescriptor,
existingStoreLocalities map[roachpb.StoreID]roachpb.Locality,
isNodeValidForRoutineReplicaTransfer func(context.Context, roachpb.NodeID) bool,
options scorerOptions,
) candidateList {
var candidates candidateList
for _, s := range sl.stores {
if nodeHasReplica(s.Node.NodeID, existing) {
for _, s := range candidateStores.stores {
if nodeHasReplica(s.Node.NodeID, existingReplicas) {
continue
}
if !isNodeValidForRoutineReplicaTransfer(ctx, s.Node.NodeID) {
Expand All @@ -432,14 +432,14 @@ func allocateCandidates(
continue
}
diversityScore := diversityAllocateScore(s, existingStoreLocalities)
balanceScore := balanceScore(sl, s.Capacity, options)
balanceScore := balanceScore(candidateStores, s.Capacity, options)
var convergesScore int
if options.qpsRebalanceThreshold > 0 {
if s.Capacity.QueriesPerSecond < underfullThreshold(sl.candidateQueriesPerSecond.mean, options.qpsRebalanceThreshold) {
if s.Capacity.QueriesPerSecond < underfullThreshold(candidateStores.candidateQueriesPerSecond.mean, options.qpsRebalanceThreshold) {
convergesScore = 1
} else if s.Capacity.QueriesPerSecond < sl.candidateQueriesPerSecond.mean {
} else if s.Capacity.QueriesPerSecond < candidateStores.candidateQueriesPerSecond.mean {
convergesScore = 0
} else if s.Capacity.QueriesPerSecond < overfullThreshold(sl.candidateQueriesPerSecond.mean, options.qpsRebalanceThreshold) {
} else if s.Capacity.QueriesPerSecond < overfullThreshold(candidateStores.candidateQueriesPerSecond.mean, options.qpsRebalanceThreshold) {
convergesScore = -1
} else {
convergesScore = -2
Expand Down
Loading

0 comments on commit 1247a2f

Please sign in to comment.