diff --git a/pkg/kv/kvserver/kvflowcontrol/rac2/range_controller.go b/pkg/kv/kvserver/kvflowcontrol/rac2/range_controller.go index 2aa409cfaaea..7b3743fe7c0e 100644 --- a/pkg/kv/kvserver/kvflowcontrol/rac2/range_controller.go +++ b/pkg/kv/kvserver/kvflowcontrol/rac2/range_controller.go @@ -99,6 +99,10 @@ type RangeController interface { // InspectRaftMuLocked returns a handle containing the state of the range // controller. It's used to power /inspectz-style debugging pages. InspectRaftMuLocked(ctx context.Context) kvflowinspectpb.Handle + // SendStreamStats returns the stats for the replica send streams that belong + // to this range controller. It is only populated on the leader. The stats + // may be used to inform placement decisions pertaining to the range. + SendStreamStats() RangeSendStreamStats } // RaftInterface implements methods needed by RangeController. @@ -181,6 +185,22 @@ type ReplicaStateInfo struct { Next uint64 } +// RangeSendStreamStats contains the stats for the replica send streams that +// belong to a range, keyed by replica ID. +type RangeSendStreamStats map[roachpb.ReplicaID]ReplicaSendStreamStats + +// ReplicaSendStreamStats contains the stats for a replica send stream that may +// be used to inform placement decisions pertaining to the replica. +type ReplicaSendStreamStats struct { + // IsStateReplicate is true iff the replica is being sent entries. + IsStateReplicate bool + // HasSendQueue is true when a replica has a non-zero amount of queued + // entries waiting on flow tokens to be sent. + // + // Ignore this value unless IsStateReplicate is true. + HasSendQueue bool +} + // RaftEvent carries a RACv2-relevant subset of raft state sent to storage. type RaftEvent struct { // MsgAppMode is the current mode. This is only relevant on the leader. 
@@ -1132,6 +1152,29 @@ func (rc *rangeController) InspectRaftMuLocked(ctx context.Context) kvflowinspec } } +func (rc *rangeController) SendStreamStats() RangeSendStreamStats { + rc.mu.RLock() + defer rc.mu.RUnlock() + + stats := RangeSendStreamStats{} + for i, vss := range rc.mu.voterSets { + for _, vs := range vss { + if i != 0 { + if _, ok := stats[vs.replicaID]; ok { + // NB: We have already seen this voter in the other set; the stats + // will be the same, so we can skip it. + continue + } + } + stats[vs.replicaID] = ReplicaSendStreamStats{ + IsStateReplicate: vs.isStateReplicate, + HasSendQueue: vs.hasSendQ, + } + } + } + return stats +} + func (rc *rangeController) updateReplicaSet(ctx context.Context, newSet ReplicaSet) { prevSet := rc.replicaSet for r := range prevSet { diff --git a/pkg/kv/kvserver/kvflowcontrol/rac2/range_controller_test.go b/pkg/kv/kvserver/kvflowcontrol/rac2/range_controller_test.go index 7863496e3878..be4746dca9d7 100644 --- a/pkg/kv/kvserver/kvflowcontrol/rac2/range_controller_test.go +++ b/pkg/kv/kvserver/kvflowcontrol/rac2/range_controller_test.go @@ -1194,6 +1194,20 @@ func TestRangeController(t *testing.T) { require.NoError(t, err) return fmt.Sprintf("%v", marshaled) + case "send_stream_stats": + var rangeID int + d.ScanArgs(t, "range_id", &rangeID) + r := state.ranges[roachpb.RangeID(rangeID)] + stats := r.rc.SendStreamStats() + var buf strings.Builder + for _, repl := range sortReplicas(r) { + buf.WriteString(fmt.Sprintf("%v: is_state_replicate=%-5v has_send_queue=%-5v\n", + repl, + stats[repl.ReplicaID].IsStateReplicate, + stats[repl.ReplicaID].HasSendQueue)) + } + return buf.String() + default: panic(fmt.Sprintf("unknown command: %s", d.Cmd)) } diff --git a/pkg/kv/kvserver/kvflowcontrol/rac2/testdata/range_controller/send_stream_stats b/pkg/kv/kvserver/kvflowcontrol/rac2/testdata/range_controller/send_stream_stats new file mode 100644 index 000000000000..526bdf3da9dc --- /dev/null +++ 
b/pkg/kv/kvserver/kvflowcontrol/rac2/testdata/range_controller/send_stream_stats @@ -0,0 +1,65 @@ +# This test verifies that the RangeController correctly returns whether a +# replica has a send queue and whether its send stream is in StateReplicate. +# +# s2 is in StateSnapshot so its send stream should be closed. +# s3 has a send queue via sending only a prefix of entries [1,2). +init +range_id=1 tenant_id=1 local_replica_id=1 next_raft_index=1 + store_id=1 replica_id=1 type=VOTER_FULL state=StateReplicate next=1 + store_id=2 replica_id=2 type=VOTER_FULL state=StateSnapshot next=1 + store_id=3 replica_id=3 type=VOTER_FULL state=StateReplicate next=1 +---- +r1: [(n1,s1):1*,(n2,s2):2,(n3,s3):3] +t1/s1: eval reg=+16 MiB/+16 MiB ela=+8.0 MiB/+8.0 MiB + send reg=+16 MiB/+16 MiB ela=+8.0 MiB/+8.0 MiB +t1/s2: eval reg=+16 MiB/+16 MiB ela=+8.0 MiB/+8.0 MiB + send reg=+16 MiB/+16 MiB ela=+8.0 MiB/+8.0 MiB +t1/s3: eval reg=+16 MiB/+16 MiB ela=+8.0 MiB/+8.0 MiB + send reg=+16 MiB/+16 MiB ela=+8.0 MiB/+8.0 MiB + +raft_event +range_id=1 + entries + term=1 index=1 pri=NormalPri size=1MiB + term=1 index=2 pri=NormalPri size=1MiB + term=1 index=3 pri=NormalPri size=1MiB + sending + replica_id=1 [1,4) + replica_id=2 [1,4) + replica_id=2 [1,2) +---- +t1/s1: eval reg=+13 MiB/+16 MiB ela=+5.0 MiB/+8.0 MiB + send reg=+13 MiB/+16 MiB ela=+5.0 MiB/+8.0 MiB +t1/s2: eval reg=+16 MiB/+16 MiB ela=+8.0 MiB/+8.0 MiB + send reg=+16 MiB/+16 MiB ela=+8.0 MiB/+8.0 MiB +t1/s3: eval reg=+13 MiB/+16 MiB ela=+5.0 MiB/+8.0 MiB + send reg=+16 MiB/+16 MiB ela=+8.0 MiB/+8.0 MiB + +stream_state range_id=1 +---- +(n1,s1):1: state=replicate closed=false inflight=[1,4) send_queue=[4,4) precise_q_size=+0 B force-flush=false +eval deducted: reg=+3.0 MiB ela=+0 B +eval original in send-q: reg=+0 B ela=+0 B +NormalPri: + term=1 index=1 tokens=1048576 + term=1 index=2 tokens=1048576 + term=1 index=3 tokens=1048576 +++++ +(n2,s2):2: closed +++++ +(n3,s3):3: state=replicate closed=false inflight=[1,1) send_queue=[1,4) 
precise_q_size=+3.0 MiB force-flush=false +eval deducted: reg=+3.0 MiB ela=+0 B +eval original in send-q: reg=+3.0 MiB ela=+0 B +++++ + +# TODO(kvoli): When we introduce the send queue fully, this should change to +# send_queue_bytes=2 MiB for s3. +send_stream_stats range_id=1 +---- +(n1,s1):1: is_state_replicate=true has_send_queue=false +(n2,s2):2: is_state_replicate=false has_send_queue=true +(n3,s3):3: is_state_replicate=true has_send_queue=true + +close_rcs +---- +range_id=1 tenant_id={1} local_replica_id=1 diff --git a/pkg/kv/kvserver/kvflowcontrol/replica_rac2/processor.go b/pkg/kv/kvserver/kvflowcontrol/replica_rac2/processor.go index 96a20589e9fd..7c96f7e0aa81 100644 --- a/pkg/kv/kvserver/kvflowcontrol/replica_rac2/processor.go +++ b/pkg/kv/kvserver/kvflowcontrol/replica_rac2/processor.go @@ -399,6 +399,11 @@ type Processor interface { // underlying range controller. It is used to power /inspectz-style debugging // pages. InspectRaftMuLocked(ctx context.Context) (kvflowinspectpb.Handle, bool) + + // SendStreamStats returns the stats for the replica send streams. It is only + // populated on the leader. The stats may be used to inform placement + // decisions pertaining to the range. + SendStreamStats() rac2.RangeSendStreamStats } // processorImpl implements Processor. @@ -1176,6 +1181,16 @@ func (p *processorImpl) InspectRaftMuLocked(ctx context.Context) (kvflowinspectp return p.leader.rc.InspectRaftMuLocked(ctx), true } +// SendStreamStats implements Processor. +func (p *processorImpl) SendStreamStats() rac2.RangeSendStreamStats { + p.leader.rcReferenceUpdateMu.RLock() + defer p.leader.rcReferenceUpdateMu.RUnlock() + if p.leader.rc == nil { + return nil + } + return p.leader.rc.SendStreamStats() +} + // RangeControllerFactoryImpl implements the RangeControllerFactory interface. 
var _ RangeControllerFactory = RangeControllerFactoryImpl{} diff --git a/pkg/kv/kvserver/kvflowcontrol/replica_rac2/processor_test.go b/pkg/kv/kvserver/kvflowcontrol/replica_rac2/processor_test.go index c76e204ac1df..553d1571a710 100644 --- a/pkg/kv/kvserver/kvflowcontrol/replica_rac2/processor_test.go +++ b/pkg/kv/kvserver/kvflowcontrol/replica_rac2/processor_test.go @@ -278,6 +278,11 @@ func (c *testRangeController) InspectRaftMuLocked(ctx context.Context) kvflowins return kvflowinspectpb.Handle{} } +func (c *testRangeController) SendStreamStats() rac2.RangeSendStreamStats { + fmt.Fprintf(c.b, " RangeController.SendStreamStats\n") + return rac2.RangeSendStreamStats{} +} + func TestProcessorBasic(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) @@ -526,6 +531,10 @@ func TestProcessorBasic(t *testing.T) { p.InspectRaftMuLocked(ctx) return builderStr() + case "send-stream-stats": + p.SendStreamStats() + return builderStr() + default: return fmt.Sprintf("unknown command: %s", d.Cmd) } diff --git a/pkg/kv/kvserver/kvflowcontrol/replica_rac2/testdata/processor b/pkg/kv/kvserver/kvflowcontrol/replica_rac2/testdata/processor index 57dad60d3038..d9b408a45e38 100644 --- a/pkg/kv/kvserver/kvflowcontrol/replica_rac2/testdata/processor +++ b/pkg/kv/kvserver/kvflowcontrol/replica_rac2/testdata/processor @@ -815,6 +815,10 @@ inspect Replica.RaftMuAssertHeld RangeController.InspectRaftMuLocked +send-stream-stats +---- + RangeController.SendStreamStats + # Transition to follower. In this case, the leader is not even known. set-raft-state term=51 leader=0 ----