Skip to content

Commit

Permalink
(replica_)rac2: add send stream stats method
Browse files Browse the repository at this point in the history
Add a new method to the `RangeController`,
`SendStreamStats()`, which returns a map of replica IDs -> replica send
stream stats:

```
// ReplicaSendStreamStats contains the stats for a replica send stream that may
// be used to inform placement decisions pertaining to the replica.
type ReplicaSendStreamStats struct {
	// IsStateReplicate is true iff the replica is being sent entries.
	IsStateReplicate bool
	// HasSendQueue is true when a replica has a non-zero amount of queued
	// entries waiting on flow tokens to be sent.
	//
	// This is only populated when IsStateReplicate is true.
	HasSendQueue bool
}
```

This method can be used to determine appropriate lease transfer targets.

Part of: #128028
Release note: None
  • Loading branch information
kvoli committed Oct 1, 2024
1 parent 5e8869e commit 778d94a
Show file tree
Hide file tree
Showing 6 changed files with 150 additions and 0 deletions.
43 changes: 43 additions & 0 deletions pkg/kv/kvserver/kvflowcontrol/rac2/range_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,10 @@ type RangeController interface {
// InspectRaftMuLocked returns a handle containing the state of the range
// controller. It's used to power /inspectz-style debugging pages.
InspectRaftMuLocked(ctx context.Context) kvflowinspectpb.Handle
// SendStreamStats returns the stats for the replica send streams that belong
// to this range controller. It is only populated on the leader. The stats
// may be used to inform placement decisions pertaining to the range.
SendStreamStats() RangeSendStreamStats
}

// RaftInterface implements methods needed by RangeController.
Expand Down Expand Up @@ -181,6 +185,22 @@ type ReplicaStateInfo struct {
Next uint64
}

// RangeSendStreamStats contains the stats for the replica send streams that
// belong to a range, keyed by replica ID.
type RangeSendStreamStats map[roachpb.ReplicaID]ReplicaSendStreamStats

// ReplicaSendStreamStats contains the stats for a replica send stream that may
// be used to inform placement decisions pertaining to the replica. One value
// per replica is returned by RangeController.SendStreamStats, as the values
// of a RangeSendStreamStats map.
type ReplicaSendStreamStats struct {
// IsStateReplicate is true iff the replica is being sent entries.
IsStateReplicate bool
// HasSendQueue is true when a replica has a non-zero amount of queued
// entries waiting on flow tokens to be sent.
//
// Ignore this value unless IsStateReplicate is true.
HasSendQueue bool
}

// RaftEvent carries a RACv2-relevant subset of raft state sent to storage.
type RaftEvent struct {
// MsgAppMode is the current mode. This is only relevant on the leader.
Expand Down Expand Up @@ -1132,6 +1152,29 @@ func (rc *rangeController) InspectRaftMuLocked(ctx context.Context) kvflowinspec
}
}

// SendStreamStats implements the RangeController interface. It holds rc.mu
// for reading and returns a newly built map with one entry per replica found
// in rc.mu.voterSets.
func (rc *rangeController) SendStreamStats() RangeSendStreamStats {
	rc.mu.RLock()
	defer rc.mu.RUnlock()

	result := RangeSendStreamStats{}
	for setIdx, voters := range rc.mu.voterSets {
		for _, voter := range voters {
			// NB: A voter may be present in more than one set. For every set
			// after the first, skip replicas that were already recorded, since
			// the stats will be the same.
			if _, seen := result[voter.replicaID]; seen && setIdx != 0 {
				continue
			}
			result[voter.replicaID] = ReplicaSendStreamStats{
				IsStateReplicate: voter.isStateReplicate,
				HasSendQueue:     voter.hasSendQ,
			}
		}
	}
	return result
}

func (rc *rangeController) updateReplicaSet(ctx context.Context, newSet ReplicaSet) {
prevSet := rc.replicaSet
for r := range prevSet {
Expand Down
14 changes: 14 additions & 0 deletions pkg/kv/kvserver/kvflowcontrol/rac2/range_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1194,6 +1194,20 @@ func TestRangeController(t *testing.T) {
require.NoError(t, err)
return fmt.Sprintf("%v", marshaled)

case "send_stream_stats":
var rangeID int
d.ScanArgs(t, "range_id", &rangeID)
r := state.ranges[roachpb.RangeID(rangeID)]
stats := r.rc.SendStreamStats()
var buf strings.Builder
for _, repl := range sortReplicas(r) {
buf.WriteString(fmt.Sprintf("%v: is_state_replicate=%-5v has_send_queue=%-5v\n",
repl,
stats[repl.ReplicaID].IsStateReplicate,
stats[repl.ReplicaID].HasSendQueue))
}
return buf.String()

default:
panic(fmt.Sprintf("unknown command: %s", d.Cmd))
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
# This test verifies that the RangeController correctly reports whether a
# replica has a send queue and whether its send stream is closed.
#
# s2 is in StateSnapshot so its send stream should be closed.
# s3 has a send queue via sending only a prefix of entries [1,2).
init
range_id=1 tenant_id=1 local_replica_id=1 next_raft_index=1
store_id=1 replica_id=1 type=VOTER_FULL state=StateReplicate next=1
store_id=2 replica_id=2 type=VOTER_FULL state=StateSnapshot next=1
store_id=3 replica_id=3 type=VOTER_FULL state=StateReplicate next=1
----
r1: [(n1,s1):1*,(n2,s2):2,(n3,s3):3]
t1/s1: eval reg=+16 MiB/+16 MiB ela=+8.0 MiB/+8.0 MiB
send reg=+16 MiB/+16 MiB ela=+8.0 MiB/+8.0 MiB
t1/s2: eval reg=+16 MiB/+16 MiB ela=+8.0 MiB/+8.0 MiB
send reg=+16 MiB/+16 MiB ela=+8.0 MiB/+8.0 MiB
t1/s3: eval reg=+16 MiB/+16 MiB ela=+8.0 MiB/+8.0 MiB
send reg=+16 MiB/+16 MiB ela=+8.0 MiB/+8.0 MiB

raft_event
range_id=1
entries
term=1 index=1 pri=NormalPri size=1MiB
term=1 index=2 pri=NormalPri size=1MiB
term=1 index=3 pri=NormalPri size=1MiB
sending
replica_id=1 [1,4)
replica_id=2 [1,4)
replica_id=2 [1,2)
----
t1/s1: eval reg=+13 MiB/+16 MiB ela=+5.0 MiB/+8.0 MiB
send reg=+13 MiB/+16 MiB ela=+5.0 MiB/+8.0 MiB
t1/s2: eval reg=+16 MiB/+16 MiB ela=+8.0 MiB/+8.0 MiB
send reg=+16 MiB/+16 MiB ela=+8.0 MiB/+8.0 MiB
t1/s3: eval reg=+13 MiB/+16 MiB ela=+5.0 MiB/+8.0 MiB
send reg=+16 MiB/+16 MiB ela=+8.0 MiB/+8.0 MiB

stream_state range_id=1
----
(n1,s1):1: state=replicate closed=false inflight=[1,4) send_queue=[4,4) precise_q_size=+0 B force-flush=false
eval deducted: reg=+3.0 MiB ela=+0 B
eval original in send-q: reg=+0 B ela=+0 B
NormalPri:
term=1 index=1 tokens=1048576
term=1 index=2 tokens=1048576
term=1 index=3 tokens=1048576
++++
(n2,s2):2: closed
++++
(n3,s3):3: state=replicate closed=false inflight=[1,1) send_queue=[1,4) precise_q_size=+3.0 MiB force-flush=false
eval deducted: reg=+3.0 MiB ela=+0 B
eval original in send-q: reg=+3.0 MiB ela=+0 B
++++

# TODO(kvoli): When we introduce the send queue fully, this should change to
# send_queue_bytes=2 MiB for s3.
send_stream_stats range_id=1
----
(n1,s1):1: is_state_replicate=true has_send_queue=false
(n2,s2):2: is_state_replicate=false has_send_queue=true
(n3,s3):3: is_state_replicate=true has_send_queue=true

close_rcs
----
range_id=1 tenant_id={1} local_replica_id=1
15 changes: 15 additions & 0 deletions pkg/kv/kvserver/kvflowcontrol/replica_rac2/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -399,6 +399,11 @@ type Processor interface {
// underlying range controller. It is used to power /inspectz-style debugging
// pages.
InspectRaftMuLocked(ctx context.Context) (kvflowinspectpb.Handle, bool)

// SendStreamStats returns the stats for the replica send streams. It is only
// populated on the leader. The stats may be used to inform placement
// decisions pertaining to the range.
SendStreamStats() rac2.RangeSendStreamStats
}

// processorImpl implements Processor.
Expand Down Expand Up @@ -1176,6 +1181,16 @@ func (p *processorImpl) InspectRaftMuLocked(ctx context.Context) (kvflowinspectp
return p.leader.rc.InspectRaftMuLocked(ctx), true
}

// SendStreamStats implements Processor. It holds rcReferenceUpdateMu for
// reading while delegating to the range controller, and returns nil when no
// range controller is set.
func (p *processorImpl) SendStreamStats() rac2.RangeSendStreamStats {
	p.leader.rcReferenceUpdateMu.RLock()
	defer p.leader.rcReferenceUpdateMu.RUnlock()
	if rc := p.leader.rc; rc != nil {
		return rc.SendStreamStats()
	}
	return nil
}

// RangeControllerFactoryImpl implements the RangeControllerFactory interface.
var _ RangeControllerFactory = RangeControllerFactoryImpl{}

Expand Down
9 changes: 9 additions & 0 deletions pkg/kv/kvserver/kvflowcontrol/replica_rac2/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,11 @@ func (c *testRangeController) InspectRaftMuLocked(ctx context.Context) kvflowins
return kvflowinspectpb.Handle{}
}

// SendStreamStats is the test double's version of the range controller
// method: it writes a line naming the call to c.b and returns an empty,
// non-nil stats map.
func (c *testRangeController) SendStreamStats() rac2.RangeSendStreamStats {
	stats := rac2.RangeSendStreamStats{}
	fmt.Fprint(c.b, " RangeController.SendStreamStats\n")
	return stats
}

func TestProcessorBasic(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
Expand Down Expand Up @@ -526,6 +531,10 @@ func TestProcessorBasic(t *testing.T) {
p.InspectRaftMuLocked(ctx)
return builderStr()

case "send-stream-stats":
p.SendStreamStats()
return builderStr()

default:
return fmt.Sprintf("unknown command: %s", d.Cmd)
}
Expand Down
4 changes: 4 additions & 0 deletions pkg/kv/kvserver/kvflowcontrol/replica_rac2/testdata/processor
Original file line number Diff line number Diff line change
Expand Up @@ -815,6 +815,10 @@ inspect
Replica.RaftMuAssertHeld
RangeController.InspectRaftMuLocked

send-stream-stats
----
RangeController.SendStreamStats

# Transition to follower. In this case, the leader is not even known.
set-raft-state term=51 leader=0
----
Expand Down

0 comments on commit 778d94a

Please sign in to comment.