kvserver: make AdminRelocateRange work with non-voting replicas #56197

Merged
4 changes: 2 additions & 2 deletions pkg/ccl/kvccl/kvfollowerreadsccl/followerreads_test.go
@@ -275,7 +275,7 @@ func TestFollowerReadsWithStaleDescriptor(t *testing.T) {
require.Equal(t, []roachpb.ReplicaDescriptor{
{NodeID: 1, StoreID: 1, ReplicaID: 1},
{NodeID: 2, StoreID: 2, ReplicaID: 2},
-}, entry.Desc().Replicas().All())
+}, entry.Desc().Replicas().Descriptors())

// Relocate the follower. n2 will no longer have a replica.
n1.Exec(t, `ALTER TABLE test EXPERIMENTAL_RELOCATE VALUES (ARRAY[1,3], 1)`)
@@ -296,7 +296,7 @@ func TestFollowerReadsWithStaleDescriptor(t *testing.T) {
require.Equal(t, []roachpb.ReplicaDescriptor{
{NodeID: 1, StoreID: 1, ReplicaID: 1},
{NodeID: 3, StoreID: 3, ReplicaID: 3},
-}, entry.Desc().Replicas().All())
+}, entry.Desc().Replicas().Descriptors())

// Make a note of the follower reads metric on n3. We'll check that it was
// incremented.
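The accessor renames exercised by this test recur throughout the diff. A minimal sketch of the new names side by side with the old ones (the package and function here are illustrative, not part of this PR; `desc` is assumed to be a populated descriptor):

package kvserver_example

import "github.com/cockroachdb/cockroach/pkg/roachpb"

// replicaAccessors shows the renamed ReplicaSet accessors next to the
// names they replace in this PR.
func replicaAccessors(desc *roachpb.RangeDescriptor) {
	_ = desc.Replicas().Descriptors()        // previously desc.Replicas().All()
	_ = desc.Replicas().VoterDescriptors()   // previously desc.Replicas().Voters()
	_ = desc.Replicas().LearnerDescriptors() // previously desc.Replicas().Learners()
}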
4 changes: 2 additions & 2 deletions pkg/cli/debug.go
@@ -955,7 +955,7 @@ func removeDeadReplicas(
err = kvserver.IterateRangeDescriptors(ctx, db, func(desc roachpb.RangeDescriptor) error {
hasSelf := false
numDeadPeers := 0
-allReplicas := desc.Replicas().All()
+allReplicas := desc.Replicas().Descriptors()
maxLivePeer := roachpb.StoreID(-1)
for _, rep := range allReplicas {
if rep.StoreID == storeIdent.StoreID {
@@ -998,7 +998,7 @@ func removeDeadReplicas(
StoreID: storeIdent.StoreID,
ReplicaID: desc.NextReplicaID,
}}
-newDesc.SetReplicas(roachpb.MakeReplicaDescriptors(replicas))
+newDesc.SetReplicas(roachpb.MakeReplicaSet(replicas))
newDesc.NextReplicaID++
fmt.Printf("Replica %s -> %s\n", &desc, &newDesc)
newDescs = append(newDescs, newDesc)
7 changes: 5 additions & 2 deletions pkg/kv/batch.go
@@ -715,7 +715,9 @@ func (b *Batch) adminChangeReplicas(

// adminRelocateRange is only exported on DB. It is here for symmetry with the
// other operations.
-func (b *Batch) adminRelocateRange(key interface{}, targets []roachpb.ReplicationTarget) {
+func (b *Batch) adminRelocateRange(
+key interface{}, voterTargets, nonVoterTargets []roachpb.ReplicationTarget,
+) {
k, err := marshalKey(key)
if err != nil {
b.initResult(0, 0, notRaw, err)
@@ -725,7 +727,8 @@ func (b *Batch) adminRelocateRange(key interface{}, targets []roachpb.Replicatio
RequestHeader: roachpb.RequestHeader{
Key: k,
},
-Targets: targets,
+VoterTargets: voterTargets,
+NonVoterTargets: nonVoterTargets,
}
b.appendReqs(req)
b.initResult(1, 0, notRaw, nil)
4 changes: 2 additions & 2 deletions pkg/kv/db.go
@@ -649,10 +649,10 @@ func (db *DB) AdminChangeReplicas(
// AdminRelocateRange relocates the replicas for a range onto the specified
// list of stores.
func (db *DB) AdminRelocateRange(
-ctx context.Context, key interface{}, targets []roachpb.ReplicationTarget,
+ctx context.Context, key interface{}, voterTargets, nonVoterTargets []roachpb.ReplicationTarget,
) error {
b := &Batch{}
-b.adminRelocateRange(key, targets)
+b.adminRelocateRange(key, voterTargets, nonVoterTargets)
return getOneErr(db.Run(ctx, b), b)
}

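With the Batch and DB plumbing above in place, callers pass voter and non-voter targets as separate slices. A minimal usage sketch against the new signature (the surrounding function and the node/store IDs are illustrative only):

package kvserver_example

import (
	"context"

	"github.com/cockroachdb/cockroach/pkg/kv"
	"github.com/cockroachdb/cockroach/pkg/roachpb"
)

// relocate moves the range containing key so that its voters live on
// stores 1 and 2 while store 3 carries a non-voting replica.
func relocate(ctx context.Context, db *kv.DB, key roachpb.Key) error {
	voters := []roachpb.ReplicationTarget{
		{NodeID: 1, StoreID: 1},
		{NodeID: 2, StoreID: 2},
	}
	nonVoters := []roachpb.ReplicationTarget{
		{NodeID: 3, StoreID: 3},
	}
	return db.AdminRelocateRange(ctx, key, voters, nonVoters)
}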
18 changes: 9 additions & 9 deletions pkg/kv/kvclient/kvcoord/dist_sender_test.go
@@ -586,7 +586,7 @@ func TestRetryOnNotLeaseHolderError(t *testing.T) {
defer log.Scope(t).Close(t)
ctx := context.Background()

-recognizedLeaseHolder := testUserRangeDescriptor3Replicas.Replicas().Voters()[1]
+recognizedLeaseHolder := testUserRangeDescriptor3Replicas.Replicas().VoterDescriptors()[1]
unrecognizedLeaseHolder := roachpb.ReplicaDescriptor{
NodeID: 99,
StoreID: 999,
@@ -642,7 +642,7 @@ func TestRetryOnNotLeaseHolderError(t *testing.T) {
clock := hlc.NewClock(hlc.UnixNano, time.Nanosecond)
rpcContext := rpc.NewInsecureTestingContext(clock, stopper)
g := makeGossip(t, stopper, rpcContext)
-for _, n := range testUserRangeDescriptor3Replicas.Replicas().Voters() {
+for _, n := range testUserRangeDescriptor3Replicas.Replicas().VoterDescriptors() {
require.NoError(t, g.AddInfoProto(
gossip.MakeNodeIDKey(n.NodeID),
newNodeDesc(n.NodeID),
@@ -823,7 +823,7 @@ func TestDistSenderMovesOnFromReplicaWithStaleLease(t *testing.T) {
clock := hlc.NewClock(hlc.UnixNano, time.Nanosecond)
rpcContext := rpc.NewInsecureTestingContext(clock, stopper)
g := makeGossip(t, stopper, rpcContext)
-for _, n := range testUserRangeDescriptor3Replicas.Replicas().Voters() {
+for _, n := range testUserRangeDescriptor3Replicas.Replicas().VoterDescriptors() {
require.NoError(t, g.AddInfoProto(
gossip.MakeNodeIDKey(n.NodeID),
newNodeDesc(n.NodeID),
@@ -4329,17 +4329,17 @@ func TestDistSenderDescEvictionAfterLeaseUpdate(t *testing.T) {
br := &roachpb.BatchResponse{}
switch call {
case 0:
-expRepl := desc1.Replicas().All()[0]
+expRepl := desc1.Replicas().Descriptors()[0]
require.Equal(t, expRepl, ba.Replica)
br.Error = roachpb.NewError(&roachpb.NotLeaseHolderError{
-Lease: &roachpb.Lease{Replica: desc1.Replicas().All()[1]},
+Lease: &roachpb.Lease{Replica: desc1.Replicas().Descriptors()[1]},
})
case 1:
-expRep := desc1.Replicas().All()[1]
+expRep := desc1.Replicas().Descriptors()[1]
require.Equal(t, ba.Replica, expRep)
br.Error = roachpb.NewError(roachpb.NewRangeNotFoundError(ba.RangeID, ba.Replica.StoreID))
case 2:
-expRep := desc2.Replicas().All()[0]
+expRep := desc2.Replicas().Descriptors()[0]
require.Equal(t, ba.Replica, expRep)
br = ba.CreateReply()
default:
@@ -4420,7 +4420,7 @@ func TestDistSenderRPCMetrics(t *testing.T) {
br := &roachpb.BatchResponse{}
if call == 0 {
br.Error = roachpb.NewError(&roachpb.NotLeaseHolderError{
-Lease: &roachpb.Lease{Replica: desc.Replicas().All()[1]},
+Lease: &roachpb.Lease{Replica: desc.Replicas().Descriptors()[1]},
})
} else {
br.Error = roachpb.NewError(&roachpb.ConditionFailedError{})
@@ -4449,7 +4449,7 @@ func TestDistSenderRPCMetrics(t *testing.T) {
ds.rangeCache.Insert(ctx, roachpb.RangeInfo{
Desc: desc,
Lease: roachpb.Lease{
-Replica: desc.Replicas().All()[0],
+Replica: desc.Replicas().Descriptors()[0],
},
})
var ba roachpb.BatchRequest
6 changes: 3 additions & 3 deletions pkg/kv/kvclient/kvcoord/replica_slice.go
@@ -67,11 +67,11 @@ func NewReplicaSlice(
}

// Learner replicas won't serve reads/writes, so we'll send only to the
-// `Voters` replicas. This is just an optimization to save a network hop,
+// `VoterDescriptors` replicas. This is just an optimization to save a network hop,
// everything would still work if we had `All` here.
-voters := desc.Replicas().Voters()
+voters := desc.Replicas().VoterDescriptors()
// If we know a leaseholder, though, let's make sure we include it.
-if leaseholder != nil && len(voters) < len(desc.Replicas().All()) {
+if leaseholder != nil && len(voters) < len(desc.Replicas().Descriptors()) {
found := false
for _, v := range voters {
if v == *leaseholder {
2 changes: 1 addition & 1 deletion pkg/kv/kvnemesis/applier.go
@@ -259,7 +259,7 @@ func newGetReplicasFn(dbs ...*kv.DB) GetReplicasFn {
ctx := context.Background()
return func(key roachpb.Key) []roachpb.ReplicationTarget {
desc := getRangeDesc(ctx, key, dbs...)
-replicas := desc.Replicas().All()
+replicas := desc.Replicas().Descriptors()
targets := make([]roachpb.ReplicationTarget, len(replicas))
for i, replica := range replicas {
targets[i] = roachpb.ReplicationTarget{
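The loop above is cut off by the diff view; completed, it copies each descriptor's node and store IDs into a ReplicationTarget. A self-contained sketch under the renamed accessor (the package and function names are illustrative):

package kvserver_example

import "github.com/cockroachdb/cockroach/pkg/roachpb"

// descriptorsToTargets maps every replica of a range, voters and
// non-voters alike, to a ReplicationTarget.
func descriptorsToTargets(desc *roachpb.RangeDescriptor) []roachpb.ReplicationTarget {
	replicas := desc.Replicas().Descriptors()
	targets := make([]roachpb.ReplicationTarget, len(replicas))
	for i, replica := range replicas {
		targets[i] = roachpb.ReplicationTarget{
			NodeID:  replica.NodeID,
			StoreID: replica.StoreID,
		}
	}
	return targets
}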
1 change: 1 addition & 0 deletions pkg/kv/kvserver/BUILD.bazel
@@ -98,6 +98,7 @@ go_library(
visibility = ["//visibility:public"],
deps = [
"//pkg/base",
"//pkg/build",
"//pkg/clusterversion",
"//pkg/config",
"//pkg/config/zonepb",
22 changes: 11 additions & 11 deletions pkg/kv/kvserver/allocator.go
@@ -346,7 +346,7 @@ func (a *Allocator) ComputeAction(
// On the other hand if we get the race where a leaseholder starts adding a
// replica in the replicate queue and during this loses its lease, it should
// probably not retry.
-if learners := desc.Replicas().Learners(); len(learners) > 0 {
+if learners := desc.Replicas().LearnerDescriptors(); len(learners) > 0 {
// TODO(dan): Since this goes before anything else, the priority here should
// be influenced by whatever operations would happen right after the learner
// is removed. In the meantime, we don't want to block something important
@@ -356,7 +356,7 @@
return AllocatorRemoveLearner, removeLearnerReplicaPriority
}
// computeAction expects to operate only on voters.
-return a.computeAction(ctx, zone, desc.Replicas().Voters())
+return a.computeAction(ctx, zone, desc.Replicas().VoterDescriptors())
}

func (a *Allocator) computeAction(
@@ -495,17 +495,17 @@ func (a *Allocator) AllocateTarget(

func (a *Allocator) allocateTargetFromList(
ctx context.Context,
-sl StoreList,
+candidateStores StoreList,
zone *zonepb.ZoneConfig,
-candidateReplicas []roachpb.ReplicaDescriptor,
+existingReplicas []roachpb.ReplicaDescriptor,
options scorerOptions,
) (*roachpb.StoreDescriptor, string) {
analyzedConstraints := constraint.AnalyzeConstraints(
-ctx, a.storePool.getStoreDescriptor, candidateReplicas, zone)
+ctx, a.storePool.getStoreDescriptor, existingReplicas, zone)
candidates := allocateCandidates(
ctx,
-sl, analyzedConstraints, candidateReplicas,
-a.storePool.getLocalitiesByStore(candidateReplicas),
+candidateStores, analyzedConstraints, existingReplicas,
+a.storePool.getLocalitiesByStore(existingReplicas),
a.storePool.isNodeReadyForRoutineReplicaTransfer,
options,
)
@@ -560,17 +560,17 @@ func (a Allocator) RemoveTarget(
}

// Retrieve store descriptors for the provided candidates from the StorePool.
-existingStoreIDs := make(roachpb.StoreIDSlice, len(candidates))
+candidateStoreIDs := make(roachpb.StoreIDSlice, len(candidates))
for i, exist := range candidates {
-existingStoreIDs[i] = exist.StoreID
+candidateStoreIDs[i] = exist.StoreID
}
-sl, _, _ := a.storePool.getStoreListFromIDs(existingStoreIDs, storeFilterNone)
+candidateStoreList, _, _ := a.storePool.getStoreListFromIDs(candidateStoreIDs, storeFilterNone)

analyzedConstraints := constraint.AnalyzeConstraints(
ctx, a.storePool.getStoreDescriptor, existingReplicas, zone)
options := a.scorerOptions()
rankedCandidates := removeCandidates(
-sl,
+candidateStoreList,
analyzedConstraints,
a.storePool.getLocalitiesByStore(existingReplicas),
options,
16 changes: 8 additions & 8 deletions pkg/kv/kvserver/allocator_scorer.go
@@ -409,16 +409,16 @@ func (cl candidateList) removeCandidate(c candidate) candidateList {
// stores that meet the criteria are included in the list.
func allocateCandidates(
ctx context.Context,
-sl StoreList,
+candidateStores StoreList,
constraints constraint.AnalyzedConstraints,
-existing []roachpb.ReplicaDescriptor,
+existingReplicas []roachpb.ReplicaDescriptor,
existingStoreLocalities map[roachpb.StoreID]roachpb.Locality,
isNodeValidForRoutineReplicaTransfer func(context.Context, roachpb.NodeID) bool,
options scorerOptions,
) candidateList {
var candidates candidateList
-for _, s := range sl.stores {
-if nodeHasReplica(s.Node.NodeID, existing) {
+for _, s := range candidateStores.stores {
+if nodeHasReplica(s.Node.NodeID, existingReplicas) {
continue
}
if !isNodeValidForRoutineReplicaTransfer(ctx, s.Node.NodeID) {
@@ -433,14 +433,14 @@ func allocateCandidates(
continue
}
diversityScore := diversityAllocateScore(s, existingStoreLocalities)
-balanceScore := balanceScore(sl, s.Capacity, options)
+balanceScore := balanceScore(candidateStores, s.Capacity, options)
var convergesScore int
if options.qpsRebalanceThreshold > 0 {
-if s.Capacity.QueriesPerSecond < underfullThreshold(sl.candidateQueriesPerSecond.mean, options.qpsRebalanceThreshold) {
+if s.Capacity.QueriesPerSecond < underfullThreshold(candidateStores.candidateQueriesPerSecond.mean, options.qpsRebalanceThreshold) {
convergesScore = 1
-} else if s.Capacity.QueriesPerSecond < sl.candidateQueriesPerSecond.mean {
+} else if s.Capacity.QueriesPerSecond < candidateStores.candidateQueriesPerSecond.mean {
convergesScore = 0
-} else if s.Capacity.QueriesPerSecond < overfullThreshold(sl.candidateQueriesPerSecond.mean, options.qpsRebalanceThreshold) {
+} else if s.Capacity.QueriesPerSecond < overfullThreshold(candidateStores.candidateQueriesPerSecond.mean, options.qpsRebalanceThreshold) {
convergesScore = -1
} else {
convergesScore = -2
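The convergesScore ladder above ranks a candidate store by where its QPS falls relative to the candidate mean: well below the mean scores 1, below the mean 0, between the mean and the overfull threshold -1, and beyond that -2. A self-contained sketch of the same ladder (the threshold arithmetic here is an assumption for illustration; the real code goes through the underfullThreshold and overfullThreshold helpers):

package kvserver_example

// convergesScore mirrors the four-way QPS ranking in allocateCandidates.
func convergesScore(qps, mean, rebalanceThreshold float64) int {
	underfull := mean * (1 - rebalanceThreshold) // assumed threshold shape
	overfull := mean * (1 + rebalanceThreshold)  // assumed threshold shape
	switch {
	case qps < underfull:
		return 1
	case qps < mean:
		return 0
	case qps < overfull:
		return -1
	default:
		return -2
	}
}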
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/batcheval/cmd_lease_test.go
@@ -119,7 +119,7 @@ func TestLeaseCommandLearnerReplica(t *testing.T) {
{NodeID: 2, StoreID: learnerStoreID, Type: roachpb.ReplicaTypeLearner(), ReplicaID: 2},
}
desc := roachpb.RangeDescriptor{}
-desc.SetReplicas(roachpb.MakeReplicaDescriptors(replicas))
+desc.SetReplicas(roachpb.MakeReplicaSet(replicas))
cArgs := CommandArgs{
EvalCtx: (&MockEvalCtx{
StoreID: voterStoreID,
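For reference, the renamed constructor used in this test builds a descriptor's replica set in one call. A minimal sketch (the IDs and the learner replica are illustrative):

package kvserver_example

import "github.com/cockroachdb/cockroach/pkg/roachpb"

// makeDescriptor assembles a RangeDescriptor with one voter and one
// learner via roachpb.MakeReplicaSet (formerly MakeReplicaDescriptors).
func makeDescriptor() roachpb.RangeDescriptor {
	replicas := []roachpb.ReplicaDescriptor{
		{NodeID: 1, StoreID: 1, ReplicaID: 1},
		{NodeID: 2, StoreID: 2, ReplicaID: 2, Type: roachpb.ReplicaTypeLearner()},
	}
	var desc roachpb.RangeDescriptor
	desc.SetReplicas(roachpb.MakeReplicaSet(replicas))
	return desc
}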