Skip to content

Commit

Permalink
rac2: improve TestRangeController wrt match and inflight-bytes
Browse files Browse the repository at this point in the history
This is in preparation for RangeController needing the per-replica
inflight bytes. The existing test was unnecessarily and implicitly
changing match. This happened in SendMsgAppRaftMuLocked and in the handling
of set_replicas and raft_event. This is now changed to only adjust
match explicitly via admit. And set_replicas will set match to 0,
but there is now a provision to explicitly provide a match value,
which is used to preserve an existing match value.

This permits maintaining ReplicaStateInfo.InflightBytes (which is
currently only relevant to the test) in the test code, by adjusting
it when match and next are changed. This is done via looking at
entries in MemoryStorage. The wait_for_eval_send_q is slightly modified,
since it needs to actually send the entry at index 1, so that it
can be retrieved from MemoryStorage.

Informs cockroachdb#135814

Epic: CRDB-37515

Release note: None
  • Loading branch information
sumeerbhola committed Dec 10, 2024
1 parent 28d9b53 commit 4fdc203
Show file tree
Hide file tree
Showing 18 changed files with 239 additions and 128 deletions.
1 change: 1 addition & 0 deletions pkg/kv/kvserver/kvflowcontrol/rac2/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ go_test(
"//pkg/util/syncutil",
"//pkg/util/timeutil",
"@com_github_cockroachdb_datadriven//:datadriven",
"@com_github_cockroachdb_errors//:errors",
"@com_github_dustin_go_humanize//:go-humanize",
"@com_github_gogo_protobuf//jsonpb",
"@com_github_guptarohit_asciigraph//:asciigraph",
Expand Down
3 changes: 3 additions & 0 deletions pkg/kv/kvserver/kvflowcontrol/rac2/range_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,9 @@ type ReplicaStateInfo struct {
// (Match, Next) is in-flight.
Match uint64
Next uint64
// InflightBytes are the bytes that have been sent but not yet persisted. It
// corresponds to tracker.Inflights.bytes.
InflightBytes uint64
}

// sendQueueStatRefreshInterval is the interval at which the send queue stats
Expand Down
118 changes: 92 additions & 26 deletions pkg/kv/kvserver/kvflowcontrol/rac2/range_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"cmp"
"context"
"fmt"
"math"
"slices"
"sort"
"strconv"
Expand Down Expand Up @@ -39,6 +40,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/datadriven"
"github.com/cockroachdb/errors"
"github.com/gogo/protobuf/jsonpb"
"github.com/stretchr/testify/require"
)
Expand Down Expand Up @@ -370,15 +372,20 @@ func (s *testingRCState) getOrInitRange(
s.ranges[r.rangeID] = testRC
}
s.maybeSetInitialTokens(r)
// Send through an empty raft event to trigger creating necessary replica
// send streams for the range.
event := testRC.makeRaftEventWithReplicasState()
event.MsgAppMode = mode
func() {
testRC.rc.opts.ReplicaMutexAsserter.RaftMu.Lock()
defer testRC.rc.opts.ReplicaMutexAsserter.RaftMu.Unlock()
require.NoError(t, testRC.rc.HandleRaftEventRaftMuLocked(s.testCtx, event))
}()
if !ok {
// Send through an empty raft event to trigger creating necessary replica
// send streams for the range.
event := testRC.makeRaftEventWithReplicasState()
event.MsgAppMode = mode
func() {
testRC.rc.opts.ReplicaMutexAsserter.RaftMu.Lock()
defer testRC.rc.opts.ReplicaMutexAsserter.RaftMu.Unlock()
require.NoError(t, testRC.rc.HandleRaftEventRaftMuLocked(s.testCtx, event))
}()
}
// Else, this is an existing testingRCRange. The caller may want to send an
// empty raft event too, to enact some changes.

return testRC
}

Expand Down Expand Up @@ -484,8 +491,7 @@ func (r *testingRCRange) SendMsgAppRaftMuLocked(
if !ok {
panic("unknown replica")
}
testR.info.Match = max(msg.Entries[0].Index-1, testR.info.Match)
testR.info.Next = msg.Entries[len(msg.Entries)-1].Index + 1
testR = updateNext(r, testR, msg.Entries[len(msg.Entries)-1].Index+1)
r.mu.r.replicaSet[replicaID] = testR
return msg, true
}
Expand Down Expand Up @@ -529,7 +535,7 @@ func (r *testingRCRange) admit(ctx context.Context, storeID roachpb.StoreID, av
for _, v := range av.Admitted {
// Ensure that Match doesn't lag behind the highest index in the
// AdmittedVector.
replica.info.Match = max(replica.info.Match, v)
replica = tryUpdateMatch(r, replica, v)
}
replicaID = replica.desc.ReplicaID
r.mu.r.replicaSet[replicaID] = replica
Expand All @@ -548,6 +554,23 @@ func (r *testingRCRange) admit(ctx context.Context, storeID roachpb.StoreID, av
r.rc.AdmitRaftMuLocked(ctx, replicaID, av)
}

// updateReplicas replaces the range state held in r.mu.r with tr while
// holding r.mu. Before installing tr, it fills in InflightBytes for every
// replica in tr.replicaSet by summing the Data sizes of the raft log entries
// in [Match+1, Next), as read from r.raftLog.
//
// Each incoming replica is required to start with InflightBytes == 0, and
// reading the entries from r.raftLog is required to succeed.
func (r *testingRCRange) updateReplicas(t *testing.T, tr testingRange) {
	r.mu.Lock()
	defer r.mu.Unlock()
	for id, rep := range tr.replicaSet {
		require.Equal(t, uint64(0), rep.info.InflightBytes)
		lo, hi := rep.info.Match+1, rep.info.Next
		if lo >= hi {
			// Nothing lies between Match and Next, so nothing is in flight.
			continue
		}
		inflight, err := r.raftLog.Entries(lo, hi, math.MaxUint64)
		require.NoError(t, err)
		for _, e := range inflight {
			rep.info.InflightBytes += uint64(len(e.Data))
		}
		// rep is a copy of the map value; write the updated copy back.
		tr.replicaSet[id] = rep
	}
	r.mu.r = tr
}

type testingRange struct {
rangeID roachpb.RangeID
tenantID roachpb.TenantID
Expand Down Expand Up @@ -598,6 +621,45 @@ type testingReplica struct {
info ReplicaStateInfo
}

// tryUpdateMatch advances replica's Match index to match and returns the
// updated replica. It returns replica unchanged when match does not exceed
// the current Match.
//
// The entries in [replica.info.Match+1, match+1) are read from r.raftLog and
// the size of each entry's Data is subtracted from replica.info.InflightBytes.
//
// It panics if the entries cannot be read from r.raftLog, or if an entry is
// larger than the remaining InflightBytes (which would underflow the counter).
func tryUpdateMatch(r *testingRCRange, replica testingReplica, match uint64) testingReplica {
	if match <= replica.info.Match {
		return replica
	}
	entries, err := r.raftLog.Entries(replica.info.Match+1, match+1, math.MaxUint64)
	if err != nil {
		panic(err)
	}
	for i := range entries {
		entrySize := uint64(len(entries[i].Data))
		if replica.info.InflightBytes < entrySize {
			// Report the entry's raft log index rather than its offset within
			// the returned slice, so the message identifies the actual entry.
			panic(errors.Errorf(
				"inflight-bytes %d < entrySize %d at index %d",
				replica.info.InflightBytes, entrySize, entries[i].Index))
		}
		replica.info.InflightBytes -= entrySize
	}
	replica.info.Match = match
	return replica
}

// updateNext moves replica's Next index forward to next and returns the
// updated replica. When next equals the current Next, replica is returned
// unchanged.
//
// The entries in [replica.info.Next, next) are read from r.raftLog and the
// size of each entry's Data is added to replica.info.InflightBytes.
//
// It panics if next is below the current Next, or if the entries cannot be
// read from r.raftLog.
func updateNext(r *testingRCRange, replica testingReplica, next uint64) testingReplica {
	cur := replica.info.Next
	switch {
	case next < cur:
		panic(errors.Errorf("next %d < replica.info.Next %d", next, cur))
	case next == cur:
		return replica
	}
	sent, err := r.raftLog.Entries(cur, next, math.MaxUint64)
	if err != nil {
		panic(err)
	}
	for _, e := range sent {
		replica.info.InflightBytes += uint64(len(e.Data))
	}
	replica.info.Next = next
	return replica
}

func scanRanges(t *testing.T, input string) []testingRange {
replicas := []testingRange{}

Expand Down Expand Up @@ -694,11 +756,12 @@ func scanReplica(t *testing.T, line string) testingReplica {
}

next := uint64(0)
match := uint64(0)
// The fourth field is optional, if set it contains the tracker state of the
// replica on the leader replica (localReplicaID). The valid states are
// Probe, Replicate, and Snapshot.
if len(parts) > 3 {
require.Equal(t, 5, len(parts))
require.LessOrEqual(t, 5, len(parts))
parts[3] = strings.TrimSpace(parts[3])
require.True(t, strings.HasPrefix(parts[3], "state="))
parts[3] = strings.TrimPrefix(strings.TrimSpace(parts[3]), "state=")
Expand All @@ -718,6 +781,15 @@ func scanReplica(t *testing.T, line string) testingReplica {
nextInt, err := strconv.Atoi(parts[4])
require.NoError(t, err)
next = uint64(nextInt)
if len(parts) > 5 {
require.Equal(t, 6, len(parts))
parts[5] = strings.TrimSpace(parts[5])
require.True(t, strings.HasPrefix(parts[5], "match="))
parts[5] = strings.TrimPrefix(strings.TrimSpace(parts[5]), "match=")
matchInt, err := strconv.Atoi(parts[5])
require.NoError(t, err)
match = uint64(matchInt)
}
}

return testingReplica{
Expand All @@ -727,7 +799,7 @@ func scanReplica(t *testing.T, line string) testingReplica {
ReplicaID: roachpb.ReplicaID(replicaID),
Type: replicaType,
},
info: ReplicaStateInfo{State: state, Next: next},
info: ReplicaStateInfo{State: state, Match: match, Next: next},
}
}

Expand Down Expand Up @@ -1165,11 +1237,7 @@ func TestRangeController(t *testing.T) {
}
for _, r := range scanRanges(t, d.Input) {
testRC := state.getOrInitRange(t, r, mode)
func() {
testRC.mu.Lock()
defer testRC.mu.Unlock()
testRC.mu.r = r
}()
testRC.updateReplicas(t, r)
func() {
testRC.rc.opts.ReplicaMutexAsserter.RaftMu.Lock()
defer testRC.rc.opts.ReplicaMutexAsserter.RaftMu.Unlock()
Expand Down Expand Up @@ -1356,20 +1424,18 @@ func TestRangeController(t *testing.T) {
// Else MsgAppPull mode, so raftEvent.MsgApps is unpopulated.

if len(msgApp.Entries) > 0 {
// Bump the Next and Index fields for replicas that have
// MsgApps being sent to them. The Match index is only updated
// if it increases.
testR.info.Match = max(msgApp.Entries[0].Index-1, testR.info.Match)
testR.info.Next = msgApp.Entries[len(msgApp.Entries)-1].Index + 1
// Bump the Next field for replicas that have MsgApps being
// sent to them.
testR = updateNext(testRC, testR, msgApp.Entries[len(msgApp.Entries)-1].Index+1)
testRC.mu.r.replicaSet[replicaID] = testR
} else if testR.desc.ReplicaID == testRC.mu.r.localReplicaID &&
len(raftEvent.Entries) > 0 {
// Leader does not see MsgApps, but the Next needs to be bumped.
//
// TODO(sumeer): many of the test cases are sending MsgApps to
// the leader. Stop doing it.
testR.info.Match = max(raftEvent.Entries[0].Index-1, testR.info.Match)
testR.info.Next = raftEvent.Entries[len(raftEvent.Entries)-1].Index + 1
testR = updateNext(
testRC, testR, raftEvent.Entries[len(raftEvent.Entries)-1].Index+1)
testRC.mu.r.replicaSet[replicaID] = testR
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,21 +114,21 @@ NormalPri:
# tracked deductions for entries in [1,3].
set_replicas
range_id=1 tenant_id=1 local_replica_id=1 next_raft_index=4
store_id=1 replica_id=1 type=VOTER_FULL state=StateReplicate next=4
store_id=2 replica_id=2 type=VOTER_FULL state=StateReplicate next=4
store_id=1 replica_id=1 type=VOTER_FULL state=StateReplicate next=4 match=2
store_id=2 replica_id=2 type=VOTER_FULL state=StateReplicate next=4 match=1
store_id=3 replica_id=3 type=VOTER_FULL state=StateSnapshot next=4
----
r1: [(n1,s1):1*,(n2,s2):2,(n3,s3):3]

stream_state range_id=1
----
(n1,s1):1: state=replicate closed=false inflight=[1,4) send_queue=[4,4) precise_q_size=+0 B
(n1,s1):1: state=replicate closed=false inflight=[3,4) send_queue=[4,4) precise_q_size=+0 B
eval deducted: reg=+1.0 MiB ela=+0 B
eval original in send-q: reg=+0 B ela=+0 B
NormalPri:
term=1 index=3 tokens=1048576
++++
(n2,s2):2: state=replicate closed=false inflight=[1,4) send_queue=[4,4) precise_q_size=+0 B
(n2,s2):2: state=replicate closed=false inflight=[2,4) send_queue=[4,4) precise_q_size=+0 B
eval deducted: reg=+2.0 MiB ela=+0 B
eval original in send-q: reg=+0 B ela=+0 B
NormalPri:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ t1/s3: eval reg=-4.5 MiB/+16 MiB ela=-4.5 MiB/+8.0 MiB

stream_state range_id=1
----
(n1,s1):1: state=replicate closed=false inflight=[4,7) send_queue=[7,7) precise_q_size=+0 B
(n1,s1):1: state=replicate closed=false inflight=[1,7) send_queue=[7,7) precise_q_size=+0 B
eval deducted: reg=+7.5 MiB ela=+0 B
eval original in send-q: reg=+0 B ela=+0 B
NormalPri:
Expand All @@ -183,7 +183,7 @@ LowPri:
term=1 index=2 tokens=1048576
term=1 index=3 tokens=1048576
++++
(n3,s3):3: state=replicate closed=false inflight=[4,7) send_queue=[7,7) precise_q_size=+0 B
(n3,s3):3: state=replicate closed=false inflight=[1,7) send_queue=[7,7) precise_q_size=+0 B
eval deducted: reg=+4.5 MiB ela=+0 B
eval original in send-q: reg=+0 B ela=+0 B
NormalPri:
Expand Down Expand Up @@ -247,7 +247,7 @@ NormalPri:
term=1 index=5 tokens=1572864
term=1 index=6 tokens=1572864
++++
(n2,s2):2: state=replicate closed=false inflight=[4,6) send_queue=[6,7) precise_q_size=+1.5 MiB force-flushing
(n2,s2):2: state=replicate closed=false inflight=[1,6) send_queue=[6,7) precise_q_size=+1.5 MiB force-flushing
eval deducted: reg=+0 B ela=+7.5 MiB
eval original in send-q: reg=+1.5 MiB ela=+0 B
LowPri:
Expand Down Expand Up @@ -279,7 +279,7 @@ NormalPri:
term=1 index=5 tokens=1572864
term=1 index=6 tokens=1572864
++++
(n2,s2):2: state=replicate closed=false inflight=[6,7) send_queue=[7,7) precise_q_size=+0 B
(n2,s2):2: state=replicate closed=false inflight=[1,7) send_queue=[7,7) precise_q_size=+0 B
eval deducted: reg=+0 B ela=+7.5 MiB
eval original in send-q: reg=+0 B ela=+0 B
LowPri:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ NormalPri:
term=1 index=4 tokens=6291456
term=1 index=5 tokens=6291456
++++
(n2,s2):2: state=replicate closed=false inflight=[2,3) send_queue=[3,6) precise_q_size=+18 MiB watching-for-tokens
(n2,s2):2: state=replicate closed=false inflight=[1,3) send_queue=[3,6) precise_q_size=+18 MiB watching-for-tokens
eval deducted: reg=+0 B ela=+30 MiB
eval original in send-q: reg=+18 MiB ela=+0 B
LowPri:
Expand Down Expand Up @@ -176,7 +176,7 @@ NormalPri:
term=1 index=4 tokens=6291456
term=1 index=5 tokens=6291456
++++
(n2,s2):2: state=replicate closed=false inflight=[2,3) send_queue=[3,6) precise_q_size=+18 MiB force-flushing (stop=3)
(n2,s2):2: state=replicate closed=false inflight=[1,3) send_queue=[3,6) precise_q_size=+18 MiB force-flushing (stop=3)
eval deducted: reg=+0 B ela=+30 MiB
eval original in send-q: reg=+18 MiB ela=+0 B
LowPri:
Expand Down Expand Up @@ -210,7 +210,7 @@ NormalPri:
term=1 index=4 tokens=6291456
term=1 index=5 tokens=6291456
++++
(n2,s2):2: state=replicate closed=false inflight=[3,4) send_queue=[4,6) precise_q_size=+12 MiB watching-for-tokens
(n2,s2):2: state=replicate closed=false inflight=[1,4) send_queue=[4,6) precise_q_size=+12 MiB watching-for-tokens
eval deducted: reg=+0 B ela=+30 MiB
eval original in send-q: reg=+12 MiB ela=+0 B
LowPri:
Expand Down Expand Up @@ -246,7 +246,7 @@ NormalPri:
term=1 index=4 tokens=6291456
term=1 index=5 tokens=6291456
++++
(n2,s2):2: state=replicate closed=false inflight=[3,4) send_queue=[4,6) precise_q_size=+12 MiB watching-for-tokens
(n2,s2):2: state=replicate closed=false inflight=[1,4) send_queue=[4,6) precise_q_size=+12 MiB watching-for-tokens
eval deducted: reg=+0 B ela=+30 MiB
eval original in send-q: reg=+12 MiB ela=+0 B
LowPri:
Expand Down Expand Up @@ -279,7 +279,7 @@ NormalPri:
term=1 index=4 tokens=6291456
term=1 index=5 tokens=6291456
++++
(n2,s2):2: state=replicate closed=false inflight=[3,4) send_queue=[4,6) precise_q_size=+12 MiB force-flushing (stop=4)
(n2,s2):2: state=replicate closed=false inflight=[1,4) send_queue=[4,6) precise_q_size=+12 MiB force-flushing (stop=4)
eval deducted: reg=+0 B ela=+30 MiB
eval original in send-q: reg=+12 MiB ela=+0 B
LowPri:
Expand Down Expand Up @@ -349,7 +349,7 @@ NormalPri:
term=1 index=4 tokens=6291456
term=1 index=5 tokens=6291456
++++
(n2,s2):2: state=replicate closed=false inflight=[4,5) send_queue=[5,6) precise_q_size=+6.0 MiB watching-for-tokens
(n2,s2):2: state=replicate closed=false inflight=[1,5) send_queue=[5,6) precise_q_size=+6.0 MiB watching-for-tokens
eval deducted: reg=+0 B ela=+30 MiB
eval original in send-q: reg=+6.0 MiB ela=+0 B
LowPri:
Expand Down Expand Up @@ -388,7 +388,7 @@ NormalPri:
term=1 index=4 tokens=6291456
term=1 index=5 tokens=6291456
++++
(n2,s2):2: state=replicate closed=false inflight=[4,5) send_queue=[5,6) precise_q_size=+6.0 MiB force-flushing
(n2,s2):2: state=replicate closed=false inflight=[1,5) send_queue=[5,6) precise_q_size=+6.0 MiB force-flushing
eval deducted: reg=+0 B ela=+30 MiB
eval original in send-q: reg=+6.0 MiB ela=+0 B
LowPri:
Expand All @@ -415,7 +415,7 @@ NormalPri:
term=1 index=4 tokens=6291456
term=1 index=5 tokens=6291456
++++
(n2,s2):2: state=replicate closed=false inflight=[5,6) send_queue=[6,6) precise_q_size=+0 B
(n2,s2):2: state=replicate closed=false inflight=[1,6) send_queue=[6,6) precise_q_size=+0 B
eval deducted: reg=+0 B ela=+30 MiB
eval original in send-q: reg=+0 B ela=+0 B
LowPri:
Expand Down Expand Up @@ -447,7 +447,7 @@ t1/s3: eval reg=+0 B/+16 MiB ela=+0 B/+8.0 MiB

stream_state range_id=1
----
(n1,s1):1: state=replicate closed=false inflight=[6,7) send_queue=[7,7) precise_q_size=+0 B
(n1,s1):1: state=replicate closed=false inflight=[1,7) send_queue=[7,7) precise_q_size=+0 B
eval deducted: reg=+36 MiB ela=+0 B
eval original in send-q: reg=+0 B ela=+0 B
NormalPri:
Expand All @@ -458,7 +458,7 @@ NormalPri:
term=1 index=5 tokens=6291456
term=1 index=6 tokens=6291456
++++
(n2,s2):2: state=replicate closed=false inflight=[6,7) send_queue=[7,7) precise_q_size=+0 B
(n2,s2):2: state=replicate closed=false inflight=[1,7) send_queue=[7,7) precise_q_size=+0 B
eval deducted: reg=+6.0 MiB ela=+30 MiB
eval original in send-q: reg=+0 B ela=+0 B
LowPri:
Expand Down
Loading

0 comments on commit 4fdc203

Please sign in to comment.