From 2d803cfa48c0f47abebe4d1104eead010561c266 Mon Sep 17 00:00:00 2001 From: Austen McClernon Date: Thu, 26 Sep 2024 15:22:52 -0400 Subject: [PATCH] kvserver: exclude lease transfer targets with a send queue Previously, it was possible to transfer a lease to a replica which had a raft send queue, likely unable to keep up once acquiring the lease. As a result, the leaseholder would throttle incoming replication traffic before evaluation because it waited in the store work queue, or for elastic work, waited for available flow tokens. Using the new `RangeController` method exposing information about replicas' send queue, `SendStreamStats()`, exclude voter candidates as lease transfer targets if: ``` HasSendQueue is true OR IsStateReplicate is false ``` Resolves: #128028 Release note: None --- .../allocator/allocatorimpl/BUILD.bazel | 2 + .../allocator/allocatorimpl/allocator.go | 45 +++++ .../allocator/allocatorimpl/allocator_test.go | 182 +++++++++++++++++- pkg/kv/kvserver/allocator/plan/BUILD.bazel | 1 + pkg/kv/kvserver/allocator/plan/replicate.go | 2 + pkg/kv/kvserver/asim/queue/BUILD.bazel | 1 + .../kvserver/asim/queue/allocator_replica.go | 5 + .../kvserver/asim/storerebalancer/BUILD.bazel | 1 + .../asim/storerebalancer/candidate_replica.go | 5 + pkg/kv/kvserver/replica.go | 10 + pkg/kv/kvserver/replica_rankings.go | 4 + 11 files changed, 255 insertions(+), 3 deletions(-) diff --git a/pkg/kv/kvserver/allocator/allocatorimpl/BUILD.bazel b/pkg/kv/kvserver/allocator/allocatorimpl/BUILD.bazel index 4fbd67304ae6..d432c584b732 100644 --- a/pkg/kv/kvserver/allocator/allocatorimpl/BUILD.bazel +++ b/pkg/kv/kvserver/allocator/allocatorimpl/BUILD.bazel @@ -17,6 +17,7 @@ go_library( "//pkg/kv/kvserver/allocator/load", "//pkg/kv/kvserver/allocator/storepool", "//pkg/kv/kvserver/constraint", + "//pkg/kv/kvserver/kvflowcontrol/rac2", "//pkg/kv/kvserver/liveness", "//pkg/kv/kvserver/liveness/livenesspb", "//pkg/kv/kvserver/raftutil", @@ -52,6 +53,7 @@ go_test( "//pkg/kv/kvserver/allocator/load", "//pkg/kv/kvserver/allocator/storepool", "//pkg/kv/kvserver/constraint", + "//pkg/kv/kvserver/kvflowcontrol/rac2", "//pkg/kv/kvserver/liveness", "//pkg/kv/kvserver/liveness/livenesspb", "//pkg/kv/kvserver/replicastats", diff --git a/pkg/kv/kvserver/allocator/allocatorimpl/allocator.go b/pkg/kv/kvserver/allocator/allocatorimpl/allocator.go index f470321d279f..dfcd8398890b 100644 --- a/pkg/kv/kvserver/allocator/allocatorimpl/allocator.go +++ b/pkg/kv/kvserver/allocator/allocatorimpl/allocator.go @@ -23,6 +23,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/load" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/storepool" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/constraint" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol/rac2" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/raftutil" "github.com/cockroachdb/cockroach/pkg/raft" "github.com/cockroachdb/cockroach/pkg/raft/raftpb" @@ -2019,6 +2020,7 @@ func (a *Allocator) ValidLeaseTargets( StoreID() roachpb.StoreID RaftStatus() *raft.Status GetFirstIndex() kvpb.RaftIndex + SendStreamStats() rac2.RangeSendStreamStats }, opts allocator.TransferLeaseOptions, ) []roachpb.ReplicaDescriptor { @@ -2082,6 +2084,8 @@ func (a *Allocator) ValidLeaseTargets( candidates = append(validSnapshotCandidates, excludeReplicasInNeedOfSnapshots( ctx, status, leaseRepl.GetFirstIndex(), candidates)...) + candidates = excludeReplicasInNeedOfCatchup( + ctx, leaseRepl.SendStreamStats(), candidates) } // Determine which store(s) is preferred based on user-specified preferences. @@ -2190,6 +2194,7 @@ func (a *Allocator) LeaseholderShouldMoveDueToPreferences( StoreID() roachpb.StoreID RaftStatus() *raft.Status GetFirstIndex() kvpb.RaftIndex + SendStreamStats() rac2.RangeSendStreamStats }, allExistingReplicas []roachpb.ReplicaDescriptor, exclReplsInNeedOfSnapshots bool, @@ -2222,6 +2227,8 @@ func (a *Allocator) LeaseholderShouldMoveDueToPreferences( if exclReplsInNeedOfSnapshots { preferred = excludeReplicasInNeedOfSnapshots( ctx, leaseRepl.RaftStatus(), leaseRepl.GetFirstIndex(), preferred) + preferred = excludeReplicasInNeedOfCatchup( + ctx, leaseRepl.SendStreamStats(), preferred) } if len(preferred) == 0 { return false @@ -2280,6 +2287,7 @@ func (a *Allocator) TransferLeaseTarget( GetRangeID() roachpb.RangeID RaftStatus() *raft.Status GetFirstIndex() kvpb.RaftIndex + SendStreamStats() rac2.RangeSendStreamStats }, usageInfo allocator.RangeUsageInfo, forceDecisionWithoutStats bool, @@ -2648,6 +2656,7 @@ func (a *Allocator) ShouldTransferLease( StoreID() roachpb.StoreID RaftStatus() *raft.Status GetFirstIndex() kvpb.RaftIndex + SendStreamStats() rac2.RangeSendStreamStats }, usageInfo allocator.RangeUsageInfo, ) TransferLeaseDecision { @@ -3050,6 +3059,42 @@ func excludeReplicasInNeedOfSnapshots( return replicas[:filled] } +// excludeReplicasInNeedOfCatchup filters out the `replicas` that may be in +// need of a catchup messages before able to apply the lease, based on the +// provided RangeSendStreamStats. +func excludeReplicasInNeedOfCatchup( + ctx context.Context, + sendStreamStats rac2.RangeSendStreamStats, + replicas []roachpb.ReplicaDescriptor, +) []roachpb.ReplicaDescriptor { + if sendStreamStats == nil { + // When we don't have stats, we can't make an informed decision about which + // replicas are behind. We'll just return the replicas as is. This can + // occur if the current leaseholder is not yet the raft leader, or only + // recently became one (concurrent to the lease transfer decision). + return replicas + } + filled := 0 + for _, repl := range replicas { + if stats, ok := sendStreamStats[repl.ReplicaID]; ok && + (!stats.IsStateReplicate || stats.HasSendQueue) { + log.KvDistribution.VEventf(ctx, 5, + "not considering %s as a potential candidate for a lease transfer "+ + "because the replica requires catchup: "+ + "[is_state_replicate=%v has_send_queue=%v]", + repl, stats.IsStateReplicate, stats.HasSendQueue) + continue + } + // We are also not excluding any replicas which weren't included in the + // stats here. If they weren't included it indicates that they were either + // recently added or removed and in either case we don't know enough to + // preclude them as lease transfer targets. + replicas[filled] = repl + filled++ + } + return replicas[:filled] +} + // simulateFilterUnremovableReplicas removes any unremovable replicas from the // supplied slice. Unlike FilterUnremovableReplicas, brandNewReplicaID is // considered up-to-date (and thus can participate in quorum), but is not diff --git a/pkg/kv/kvserver/allocator/allocatorimpl/allocator_test.go b/pkg/kv/kvserver/allocator/allocatorimpl/allocator_test.go index 6de19b7f1542..b97f1693199d 100644 --- a/pkg/kv/kvserver/allocator/allocatorimpl/allocator_test.go +++ b/pkg/kv/kvserver/allocator/allocatorimpl/allocator_test.go @@ -29,6 +29,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/load" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/storepool" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/constraint" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol/rac2" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness/livenesspb" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/replicastats" @@ -1910,9 +1911,11 @@ func TestAllocatorRebalanceByCount(t *testing.T) { // mockRepl satisfies the interface for the `leaseRepl` passed into // `Allocator.TransferLeaseTarget()` for these tests. type mockRepl struct { - replicationFactor int32 - storeID roachpb.StoreID - replsInNeedOfSnapshot map[roachpb.ReplicaID]struct{} + replicationFactor int32 + storeID roachpb.StoreID + replsInNeedOfSnapshot map[roachpb.ReplicaID]struct{} + replsWithSendQueue map[roachpb.ReplicaID]struct{} + replsNotInStateReplicate map[roachpb.ReplicaID]struct{} } func (r *mockRepl) RaftStatus() *raft.Status { @@ -1945,6 +1948,21 @@ func (r *mockRepl) GetRangeID() roachpb.RangeID { return roachpb.RangeID(0) } +func (r *mockRepl) SendStreamStats() rac2.RangeSendStreamStats { + rangeStats := rac2.RangeSendStreamStats{} + for i := int32(1); i <= r.replicationFactor; i++ { + replStats := rac2.ReplicaSendStreamStats{} + if _, ok := r.replsWithSendQueue[roachpb.ReplicaID(i)]; ok { + replStats.HasSendQueue = true + } + if _, ok := r.replsNotInStateReplicate[roachpb.ReplicaID(i)]; !ok { + replStats.IsStateReplicate = true + } + rangeStats[roachpb.ReplicaID(i)] = replStats + } + return rangeStats +} + func (r *mockRepl) markReplAsNeedingSnapshot(id roachpb.ReplicaID) { if r.replsInNeedOfSnapshot == nil { r.replsInNeedOfSnapshot = make(map[roachpb.ReplicaID]struct{}) @@ -1952,6 +1970,20 @@ func (r *mockRepl) markReplAsNeedingSnapshot(id roachpb.ReplicaID) { r.replsInNeedOfSnapshot[id] = struct{}{} } +func (r *mockRepl) markReplAsHavingSendQueue(id roachpb.ReplicaID) { + if r.replsWithSendQueue == nil { + r.replsWithSendQueue = make(map[roachpb.ReplicaID]struct{}) + } + r.replsWithSendQueue[id] = struct{}{} +} + +func (r *mockRepl) markReplAsNotInStateReplicate(id roachpb.ReplicaID) { + if r.replsNotInStateReplicate == nil { + r.replsNotInStateReplicate = make(map[roachpb.ReplicaID]struct{}) + } + r.replsNotInStateReplicate[id] = struct{}{} +} + func TestAllocatorTransferLeaseTarget(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) @@ -2268,6 +2300,150 @@ func TestAllocatorTransferLeaseToReplicasNeedingSnapshot(t *testing.T) { } } +func rIDs(replicaIDs ...int) []roachpb.ReplicaID { + repls := make([]roachpb.ReplicaID, len(replicaIDs)) + for i, id := range replicaIDs { + repls[i] = roachpb.ReplicaID(id) + } + return repls +} + +func TestAllocatorTransferLeaseToReplicasNeedingCatchup(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + existing := []roachpb.ReplicaDescriptor{ + {StoreID: 1, NodeID: 1, ReplicaID: 1}, + {StoreID: 2, NodeID: 2, ReplicaID: 2}, + {StoreID: 3, NodeID: 3, ReplicaID: 3}, + {StoreID: 4, NodeID: 4, ReplicaID: 4}, + } + ctx := context.Background() + stopper, g, sp, a, _ := CreateTestAllocator(ctx, 10, true /* deterministic */) + defer stopper.Stop(ctx) + + // 4 stores where the lease count for each store is equal to 10x the store + // ID. + var stores []*roachpb.StoreDescriptor + for i := 1; i <= 4; i++ { + stores = append(stores, &roachpb.StoreDescriptor{ + StoreID: roachpb.StoreID(i), + Node: roachpb.NodeDescriptor{NodeID: roachpb.NodeID(i)}, + Capacity: roachpb.StoreCapacity{LeaseCount: int32(10 * i)}, + }) + } + sg := gossiputil.NewStoreGossiper(g) + sg.GossipStores(stores, t) + + testCases := []struct { + existing []roachpb.ReplicaDescriptor + replsWithSendQueue, replsNotInStateReplicate []roachpb.ReplicaID + leaseholder roachpb.StoreID + excludeLeaseRepl bool + transferTarget roachpb.StoreID + }{ + { + existing: existing, + replsWithSendQueue: rIDs(1), + replsNotInStateReplicate: rIDs(1), + leaseholder: 3, + excludeLeaseRepl: false, + transferTarget: 0, + }, + { + existing: existing, + replsWithSendQueue: rIDs(1), + leaseholder: 3, + excludeLeaseRepl: false, + transferTarget: 0, + }, + { + existing: existing, + replsWithSendQueue: rIDs(1), + leaseholder: 3, + excludeLeaseRepl: true, + transferTarget: 2, + }, + { + existing: existing, + replsNotInStateReplicate: rIDs(1), + leaseholder: 3, + excludeLeaseRepl: true, + transferTarget: 2, + }, + { + existing: existing, + replsWithSendQueue: rIDs(1), + leaseholder: 4, + excludeLeaseRepl: false, + transferTarget: 2, + }, + { + existing: existing, + replsWithSendQueue: rIDs(1), + leaseholder: 4, + excludeLeaseRepl: true, + transferTarget: 2, + }, + { + existing: existing, + replsWithSendQueue: rIDs(1), + replsNotInStateReplicate: rIDs(2), + leaseholder: 4, + excludeLeaseRepl: true, + transferTarget: 3, + }, + { + existing: existing, + replsWithSendQueue: rIDs(1), + replsNotInStateReplicate: rIDs(2), + leaseholder: 4, + excludeLeaseRepl: false, + transferTarget: 0, + }, + { + existing: existing, + replsWithSendQueue: rIDs(1), + replsNotInStateReplicate: rIDs(2, 3), + leaseholder: 4, + excludeLeaseRepl: false, + transferTarget: 0, + }, + } + + for _, c := range testCases { + repl := &mockRepl{ + replicationFactor: 4, + storeID: c.leaseholder, + } + for _, r := range c.replsWithSendQueue { + repl.markReplAsHavingSendQueue(r) + } + for _, r := range c.replsNotInStateReplicate { + repl.markReplAsNotInStateReplicate(r) + } + t.Run("", func(t *testing.T) { + target := a.TransferLeaseTarget( + ctx, + sp, + &roachpb.RangeDescriptor{}, + emptySpanConfig(), + c.existing, + repl, + allocator.RangeUsageInfo{}, + false, /* alwaysAllowDecisionWithoutStats */ + allocator.TransferLeaseOptions{ + ExcludeLeaseRepl: c.excludeLeaseRepl, + CheckCandidateFullness: true, + }, + ) + if c.transferTarget != target.StoreID { + t.Fatalf("expected %d, but found %d", c.transferTarget, target.StoreID) + } + }) + } +} + func TestAllocatorTransferLeaseTargetConstraints(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) diff --git a/pkg/kv/kvserver/allocator/plan/BUILD.bazel b/pkg/kv/kvserver/allocator/plan/BUILD.bazel index b8acc612a70f..7a90f18450d8 100644 --- a/pkg/kv/kvserver/allocator/plan/BUILD.bazel +++ b/pkg/kv/kvserver/allocator/plan/BUILD.bazel @@ -18,6 +18,7 @@ go_library( "//pkg/kv/kvserver/allocator/allocatorimpl", "//pkg/kv/kvserver/allocator/storepool", "//pkg/kv/kvserver/benignerror", + "//pkg/kv/kvserver/kvflowcontrol/rac2", "//pkg/kv/kvserver/kvserverpb", "//pkg/raft", "//pkg/raft/raftpb", diff --git a/pkg/kv/kvserver/allocator/plan/replicate.go b/pkg/kv/kvserver/allocator/plan/replicate.go index e461c1ace0fe..f2338eed8535 100644 --- a/pkg/kv/kvserver/allocator/plan/replicate.go +++ b/pkg/kv/kvserver/allocator/plan/replicate.go @@ -20,6 +20,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/allocatorimpl" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/storepool" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/benignerror" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol/rac2" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb" "github.com/cockroachdb/cockroach/pkg/raft" "github.com/cockroachdb/cockroach/pkg/roachpb" @@ -104,6 +105,7 @@ type AllocatorReplica interface { LastReplicaAdded() (roachpb.ReplicaID, time.Time) StoreID() roachpb.StoreID GetRangeID() roachpb.RangeID + SendStreamStats() rac2.RangeSendStreamStats } // ReplicaPlanner implements the ReplicationPlanner interface. diff --git a/pkg/kv/kvserver/asim/queue/BUILD.bazel b/pkg/kv/kvserver/asim/queue/BUILD.bazel index a696f92ea214..77bcf96eba90 100644 --- a/pkg/kv/kvserver/asim/queue/BUILD.bazel +++ b/pkg/kv/kvserver/asim/queue/BUILD.bazel @@ -21,6 +21,7 @@ go_library( "//pkg/kv/kvserver/asim/config", "//pkg/kv/kvserver/asim/state", "//pkg/kv/kvserver/constraint", + "//pkg/kv/kvserver/kvflowcontrol/rac2", "//pkg/kv/kvserver/kvserverpb", "//pkg/raft", "//pkg/roachpb", diff --git a/pkg/kv/kvserver/asim/queue/allocator_replica.go b/pkg/kv/kvserver/asim/queue/allocator_replica.go index a6c8ffa56f69..5249268ed25e 100644 --- a/pkg/kv/kvserver/asim/queue/allocator_replica.go +++ b/pkg/kv/kvserver/asim/queue/allocator_replica.go @@ -20,6 +20,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/plan" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/asim/state" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/constraint" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol/rac2" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb" "github.com/cockroachdb/cockroach/pkg/raft" "github.com/cockroachdb/cockroach/pkg/roachpb" @@ -162,3 +163,7 @@ func (sr *SimulatorReplica) Desc() *roachpb.RangeDescriptor { func (sr *SimulatorReplica) RangeUsageInfo() allocator.RangeUsageInfo { return sr.usage } + +func (sr *SimulatorReplica) SendStreamStats() rac2.RangeSendStreamStats { + return nil +} diff --git a/pkg/kv/kvserver/asim/storerebalancer/BUILD.bazel b/pkg/kv/kvserver/asim/storerebalancer/BUILD.bazel index d4d72a57fef8..85f11df40daa 100644 --- a/pkg/kv/kvserver/asim/storerebalancer/BUILD.bazel +++ b/pkg/kv/kvserver/asim/storerebalancer/BUILD.bazel @@ -19,6 +19,7 @@ go_library( "//pkg/kv/kvserver/asim/config", "//pkg/kv/kvserver/asim/op", "//pkg/kv/kvserver/asim/state", + "//pkg/kv/kvserver/kvflowcontrol/rac2", "//pkg/raft", "//pkg/roachpb", "//pkg/util/hlc", diff --git a/pkg/kv/kvserver/asim/storerebalancer/candidate_replica.go b/pkg/kv/kvserver/asim/storerebalancer/candidate_replica.go index 55e327971282..fbdf4b5f446c 100644 --- a/pkg/kv/kvserver/asim/storerebalancer/candidate_replica.go +++ b/pkg/kv/kvserver/asim/storerebalancer/candidate_replica.go @@ -17,6 +17,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/asim/state" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol/rac2" "github.com/cockroachdb/cockroach/pkg/raft" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/util/hlc" @@ -114,6 +115,10 @@ func (sr *simulatorReplica) AdminTransferLease( return nil } +func (sr *simulatorReplica) SendStreamStats() rac2.RangeSendStreamStats { + return rac2.RangeSendStreamStats{} +} + // Replica returns the underlying kvserver replica, however when called from // the simulator it only returns nil. func (sr *simulatorReplica) Repl() *kvserver.Replica { diff --git a/pkg/kv/kvserver/replica.go b/pkg/kv/kvserver/replica.go index 98332e0490cb..7daf30441daa 100644 --- a/pkg/kv/kvserver/replica.go +++ b/pkg/kv/kvserver/replica.go @@ -30,6 +30,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/gc" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol/rac2" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol/replica_rac2" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb" @@ -2584,3 +2585,12 @@ func (r *Replica) maybeEnqueueProblemRange( r.store.replicateQueue.AddAsync(ctx, r, allocatorimpl.AllocatorReplaceDecommissioningVoter.Priority()) } + +// SendStreamStats returns the range's flow control send stream stats iff the +// replica is the raft leader and RACv2 is enabled, otherwise nil. +func (r *Replica) SendStreamStats() rac2.RangeSendStreamStats { + if r.flowControlV2 == nil { + return nil + } + return r.flowControlV2.SendStreamStats() +} diff --git a/pkg/kv/kvserver/replica_rankings.go b/pkg/kv/kvserver/replica_rankings.go index 14516d4dc74f..de54803d9f75 100644 --- a/pkg/kv/kvserver/replica_rankings.go +++ b/pkg/kv/kvserver/replica_rankings.go @@ -17,6 +17,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvpb" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/load" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol/rac2" "github.com/cockroachdb/cockroach/pkg/raft" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/util/hlc" @@ -53,6 +54,9 @@ type CandidateReplica interface { // RangeUsageInfo returns usage information (sizes and traffic) needed by // the allocator to make rebalancing decisions for a given range. RangeUsageInfo() allocator.RangeUsageInfo + // SendStreamStats returns the range's flow control send stream stats iff the + // replica is the raft leader and RACv2 is enabled, otherwise nil. + SendStreamStats() rac2.RangeSendStreamStats // AdminTransferLease transfers the LeaderLease to another replica. AdminTransferLease(ctx context.Context, target roachpb.StoreID, bypassSafetyChecks bool) error // Repl returns the underlying replica for this CandidateReplica. It is