Skip to content

Commit

Permalink
kvserver: exclude lease transfer targets with a send queue
Browse files Browse the repository at this point in the history
Previously, it was possible to transfer a lease to a replica which had a
raft send queue, likely unable to keep up once acquiring the lease.

As a result, the leaseholder would throttle incoming replication traffic
before evaluation because it waited in the store work queue, or for
elastic work, waited for available flow tokens.

Using the new `RangeController` method exposing information about
replicas' send queue, `SendStreamStats()`, exclude voter candidates as
lease transfer targets if:

```
HasSendQueue is true OR
IsStateReplicate is false
```

Resolves: #128028
Release note: None
  • Loading branch information
kvoli committed Oct 1, 2024
1 parent 778d94a commit 2d803cf
Show file tree
Hide file tree
Showing 11 changed files with 255 additions and 3 deletions.
2 changes: 2 additions & 0 deletions pkg/kv/kvserver/allocator/allocatorimpl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ go_library(
"//pkg/kv/kvserver/allocator/load",
"//pkg/kv/kvserver/allocator/storepool",
"//pkg/kv/kvserver/constraint",
"//pkg/kv/kvserver/kvflowcontrol/rac2",
"//pkg/kv/kvserver/liveness",
"//pkg/kv/kvserver/liveness/livenesspb",
"//pkg/kv/kvserver/raftutil",
Expand Down Expand Up @@ -52,6 +53,7 @@ go_test(
"//pkg/kv/kvserver/allocator/load",
"//pkg/kv/kvserver/allocator/storepool",
"//pkg/kv/kvserver/constraint",
"//pkg/kv/kvserver/kvflowcontrol/rac2",
"//pkg/kv/kvserver/liveness",
"//pkg/kv/kvserver/liveness/livenesspb",
"//pkg/kv/kvserver/replicastats",
Expand Down
45 changes: 45 additions & 0 deletions pkg/kv/kvserver/allocator/allocatorimpl/allocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/load"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/storepool"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/constraint"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol/rac2"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/raftutil"
"github.com/cockroachdb/cockroach/pkg/raft"
"github.com/cockroachdb/cockroach/pkg/raft/raftpb"
Expand Down Expand Up @@ -2019,6 +2020,7 @@ func (a *Allocator) ValidLeaseTargets(
StoreID() roachpb.StoreID
RaftStatus() *raft.Status
GetFirstIndex() kvpb.RaftIndex
SendStreamStats() rac2.RangeSendStreamStats
},
opts allocator.TransferLeaseOptions,
) []roachpb.ReplicaDescriptor {
Expand Down Expand Up @@ -2082,6 +2084,8 @@ func (a *Allocator) ValidLeaseTargets(

candidates = append(validSnapshotCandidates, excludeReplicasInNeedOfSnapshots(
ctx, status, leaseRepl.GetFirstIndex(), candidates)...)
candidates = excludeReplicasInNeedOfCatchup(
ctx, leaseRepl.SendStreamStats(), candidates)
}

// Determine which store(s) is preferred based on user-specified preferences.
Expand Down Expand Up @@ -2190,6 +2194,7 @@ func (a *Allocator) LeaseholderShouldMoveDueToPreferences(
StoreID() roachpb.StoreID
RaftStatus() *raft.Status
GetFirstIndex() kvpb.RaftIndex
SendStreamStats() rac2.RangeSendStreamStats
},
allExistingReplicas []roachpb.ReplicaDescriptor,
exclReplsInNeedOfSnapshots bool,
Expand Down Expand Up @@ -2222,6 +2227,8 @@ func (a *Allocator) LeaseholderShouldMoveDueToPreferences(
if exclReplsInNeedOfSnapshots {
preferred = excludeReplicasInNeedOfSnapshots(
ctx, leaseRepl.RaftStatus(), leaseRepl.GetFirstIndex(), preferred)
preferred = excludeReplicasInNeedOfCatchup(
ctx, leaseRepl.SendStreamStats(), preferred)
}
if len(preferred) == 0 {
return false
Expand Down Expand Up @@ -2280,6 +2287,7 @@ func (a *Allocator) TransferLeaseTarget(
GetRangeID() roachpb.RangeID
RaftStatus() *raft.Status
GetFirstIndex() kvpb.RaftIndex
SendStreamStats() rac2.RangeSendStreamStats
},
usageInfo allocator.RangeUsageInfo,
forceDecisionWithoutStats bool,
Expand Down Expand Up @@ -2648,6 +2656,7 @@ func (a *Allocator) ShouldTransferLease(
StoreID() roachpb.StoreID
RaftStatus() *raft.Status
GetFirstIndex() kvpb.RaftIndex
SendStreamStats() rac2.RangeSendStreamStats
},
usageInfo allocator.RangeUsageInfo,
) TransferLeaseDecision {
Expand Down Expand Up @@ -3050,6 +3059,42 @@ func excludeReplicasInNeedOfSnapshots(
return replicas[:filled]
}

// excludeReplicasInNeedOfCatchup filters out the `replicas` that may be in
// need of a catchup messages before able to apply the lease, based on the
// provided RangeSendStreamStats.
func excludeReplicasInNeedOfCatchup(
ctx context.Context,
sendStreamStats rac2.RangeSendStreamStats,
replicas []roachpb.ReplicaDescriptor,
) []roachpb.ReplicaDescriptor {
if sendStreamStats == nil {
// When we don't have stats, we can't make an informed decision about which
// replicas are behind. We'll just return the replicas as is. This can
// occur if the current leaseholder is not yet the raft leader, or only
// recently became one (concurrent to the lease transfer decision).
return replicas
}
filled := 0
for _, repl := range replicas {
if stats, ok := sendStreamStats[repl.ReplicaID]; ok &&
(!stats.IsStateReplicate || stats.HasSendQueue) {
log.KvDistribution.VEventf(ctx, 5,
"not considering %s as a potential candidate for a lease transfer "+
"because the replica requires catchup: "+
"[is_state_replicate=%v has_send_queue=%v]",
repl, stats.IsStateReplicate, stats.HasSendQueue)
continue
}
// We are also not excluding any replicas which weren't included in the
// stats here. If they weren't included it indicates that they were either
// recently added or removed and in either case we don't know enough to
// preclude them as lease transfer targets.
replicas[filled] = repl
filled++
}
return replicas[:filled]
}

// simulateFilterUnremovableReplicas removes any unremovable replicas from the
// supplied slice. Unlike FilterUnremovableReplicas, brandNewReplicaID is
// considered up-to-date (and thus can participate in quorum), but is not
Expand Down
182 changes: 179 additions & 3 deletions pkg/kv/kvserver/allocator/allocatorimpl/allocator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/load"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/storepool"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/constraint"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol/rac2"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness/livenesspb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/replicastats"
Expand Down Expand Up @@ -1910,9 +1911,11 @@ func TestAllocatorRebalanceByCount(t *testing.T) {
// mockRepl satisfies the interface for the `leaseRepl` passed into
// `Allocator.TransferLeaseTarget()` for these tests.
type mockRepl struct {
replicationFactor int32
storeID roachpb.StoreID
replsInNeedOfSnapshot map[roachpb.ReplicaID]struct{}
replicationFactor int32
storeID roachpb.StoreID
replsInNeedOfSnapshot map[roachpb.ReplicaID]struct{}
replsWithSendQueue map[roachpb.ReplicaID]struct{}
replsNotInStateReplicate map[roachpb.ReplicaID]struct{}
}

func (r *mockRepl) RaftStatus() *raft.Status {
Expand Down Expand Up @@ -1945,13 +1948,42 @@ func (r *mockRepl) GetRangeID() roachpb.RangeID {
return roachpb.RangeID(0)
}

func (r *mockRepl) SendStreamStats() rac2.RangeSendStreamStats {
rangeStats := rac2.RangeSendStreamStats{}
for i := int32(1); i <= r.replicationFactor; i++ {
replStats := rac2.ReplicaSendStreamStats{}
if _, ok := r.replsWithSendQueue[roachpb.ReplicaID(i)]; ok {
replStats.HasSendQueue = true
}
if _, ok := r.replsNotInStateReplicate[roachpb.ReplicaID(i)]; !ok {
replStats.IsStateReplicate = true
}
rangeStats[roachpb.ReplicaID(i)] = replStats
}
return rangeStats
}

func (r *mockRepl) markReplAsNeedingSnapshot(id roachpb.ReplicaID) {
if r.replsInNeedOfSnapshot == nil {
r.replsInNeedOfSnapshot = make(map[roachpb.ReplicaID]struct{})
}
r.replsInNeedOfSnapshot[id] = struct{}{}
}

func (r *mockRepl) markReplAsHavingSendQueue(id roachpb.ReplicaID) {
if r.replsWithSendQueue == nil {
r.replsWithSendQueue = make(map[roachpb.ReplicaID]struct{})
}
r.replsWithSendQueue[id] = struct{}{}
}

func (r *mockRepl) markReplAsNotInStateReplicate(id roachpb.ReplicaID) {
if r.replsNotInStateReplicate == nil {
r.replsNotInStateReplicate = make(map[roachpb.ReplicaID]struct{})
}
r.replsNotInStateReplicate[id] = struct{}{}
}

func TestAllocatorTransferLeaseTarget(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
Expand Down Expand Up @@ -2268,6 +2300,150 @@ func TestAllocatorTransferLeaseToReplicasNeedingSnapshot(t *testing.T) {
}
}

func rIDs(replicaIDs ...int) []roachpb.ReplicaID {
repls := make([]roachpb.ReplicaID, len(replicaIDs))
for i, id := range replicaIDs {
repls[i] = roachpb.ReplicaID(id)
}
return repls
}

func TestAllocatorTransferLeaseToReplicasNeedingCatchup(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

existing := []roachpb.ReplicaDescriptor{
{StoreID: 1, NodeID: 1, ReplicaID: 1},
{StoreID: 2, NodeID: 2, ReplicaID: 2},
{StoreID: 3, NodeID: 3, ReplicaID: 3},
{StoreID: 4, NodeID: 4, ReplicaID: 4},
}
ctx := context.Background()
stopper, g, sp, a, _ := CreateTestAllocator(ctx, 10, true /* deterministic */)
defer stopper.Stop(ctx)

// 4 stores where the lease count for each store is equal to 10x the store
// ID.
var stores []*roachpb.StoreDescriptor
for i := 1; i <= 4; i++ {
stores = append(stores, &roachpb.StoreDescriptor{
StoreID: roachpb.StoreID(i),
Node: roachpb.NodeDescriptor{NodeID: roachpb.NodeID(i)},
Capacity: roachpb.StoreCapacity{LeaseCount: int32(10 * i)},
})
}
sg := gossiputil.NewStoreGossiper(g)
sg.GossipStores(stores, t)

testCases := []struct {
existing []roachpb.ReplicaDescriptor
replsWithSendQueue, replsNotInStateReplicate []roachpb.ReplicaID
leaseholder roachpb.StoreID
excludeLeaseRepl bool
transferTarget roachpb.StoreID
}{
{
existing: existing,
replsWithSendQueue: rIDs(1),
replsNotInStateReplicate: rIDs(1),
leaseholder: 3,
excludeLeaseRepl: false,
transferTarget: 0,
},
{
existing: existing,
replsWithSendQueue: rIDs(1),
leaseholder: 3,
excludeLeaseRepl: false,
transferTarget: 0,
},
{
existing: existing,
replsWithSendQueue: rIDs(1),
leaseholder: 3,
excludeLeaseRepl: true,
transferTarget: 2,
},
{
existing: existing,
replsNotInStateReplicate: rIDs(1),
leaseholder: 3,
excludeLeaseRepl: true,
transferTarget: 2,
},
{
existing: existing,
replsWithSendQueue: rIDs(1),
leaseholder: 4,
excludeLeaseRepl: false,
transferTarget: 2,
},
{
existing: existing,
replsWithSendQueue: rIDs(1),
leaseholder: 4,
excludeLeaseRepl: true,
transferTarget: 2,
},
{
existing: existing,
replsWithSendQueue: rIDs(1),
replsNotInStateReplicate: rIDs(2),
leaseholder: 4,
excludeLeaseRepl: true,
transferTarget: 3,
},
{
existing: existing,
replsWithSendQueue: rIDs(1),
replsNotInStateReplicate: rIDs(2),
leaseholder: 4,
excludeLeaseRepl: false,
transferTarget: 0,
},
{
existing: existing,
replsWithSendQueue: rIDs(1),
replsNotInStateReplicate: rIDs(2, 3),
leaseholder: 4,
excludeLeaseRepl: false,
transferTarget: 0,
},
}

for _, c := range testCases {
repl := &mockRepl{
replicationFactor: 4,
storeID: c.leaseholder,
}
for _, r := range c.replsWithSendQueue {
repl.markReplAsHavingSendQueue(r)
}
for _, r := range c.replsNotInStateReplicate {
repl.markReplAsNotInStateReplicate(r)
}
t.Run("", func(t *testing.T) {
target := a.TransferLeaseTarget(
ctx,
sp,
&roachpb.RangeDescriptor{},
emptySpanConfig(),
c.existing,
repl,
allocator.RangeUsageInfo{},
false, /* alwaysAllowDecisionWithoutStats */
allocator.TransferLeaseOptions{
ExcludeLeaseRepl: c.excludeLeaseRepl,
CheckCandidateFullness: true,
},
)
if c.transferTarget != target.StoreID {
t.Fatalf("expected %d, but found %d", c.transferTarget, target.StoreID)
}
})
}
}

func TestAllocatorTransferLeaseTargetConstraints(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
Expand Down
1 change: 1 addition & 0 deletions pkg/kv/kvserver/allocator/plan/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ go_library(
"//pkg/kv/kvserver/allocator/allocatorimpl",
"//pkg/kv/kvserver/allocator/storepool",
"//pkg/kv/kvserver/benignerror",
"//pkg/kv/kvserver/kvflowcontrol/rac2",
"//pkg/kv/kvserver/kvserverpb",
"//pkg/raft",
"//pkg/raft/raftpb",
Expand Down
2 changes: 2 additions & 0 deletions pkg/kv/kvserver/allocator/plan/replicate.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/allocatorimpl"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/storepool"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/benignerror"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol/rac2"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb"
"github.com/cockroachdb/cockroach/pkg/raft"
"github.com/cockroachdb/cockroach/pkg/roachpb"
Expand Down Expand Up @@ -104,6 +105,7 @@ type AllocatorReplica interface {
LastReplicaAdded() (roachpb.ReplicaID, time.Time)
StoreID() roachpb.StoreID
GetRangeID() roachpb.RangeID
SendStreamStats() rac2.RangeSendStreamStats
}

// ReplicaPlanner implements the ReplicationPlanner interface.
Expand Down
1 change: 1 addition & 0 deletions pkg/kv/kvserver/asim/queue/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ go_library(
"//pkg/kv/kvserver/asim/config",
"//pkg/kv/kvserver/asim/state",
"//pkg/kv/kvserver/constraint",
"//pkg/kv/kvserver/kvflowcontrol/rac2",
"//pkg/kv/kvserver/kvserverpb",
"//pkg/raft",
"//pkg/roachpb",
Expand Down
5 changes: 5 additions & 0 deletions pkg/kv/kvserver/asim/queue/allocator_replica.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/plan"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/asim/state"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/constraint"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol/rac2"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb"
"github.com/cockroachdb/cockroach/pkg/raft"
"github.com/cockroachdb/cockroach/pkg/roachpb"
Expand Down Expand Up @@ -162,3 +163,7 @@ func (sr *SimulatorReplica) Desc() *roachpb.RangeDescriptor {
func (sr *SimulatorReplica) RangeUsageInfo() allocator.RangeUsageInfo {
return sr.usage
}

func (sr *SimulatorReplica) SendStreamStats() rac2.RangeSendStreamStats {
return nil
}
1 change: 1 addition & 0 deletions pkg/kv/kvserver/asim/storerebalancer/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ go_library(
"//pkg/kv/kvserver/asim/config",
"//pkg/kv/kvserver/asim/op",
"//pkg/kv/kvserver/asim/state",
"//pkg/kv/kvserver/kvflowcontrol/rac2",
"//pkg/raft",
"//pkg/roachpb",
"//pkg/util/hlc",
Expand Down
Loading

0 comments on commit 2d803cf

Please sign in to comment.