Skip to content

Commit

Permalink
tracker: rename MsgAppFlowPaused
Browse files Browse the repository at this point in the history
Epic: none
Release note: none
  • Loading branch information
pav-kv committed May 28, 2024
1 parent 3244a7c commit 2b3ab55
Show file tree
Hide file tree
Showing 8 changed files with 57 additions and 57 deletions.
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/flow_control_replica.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ func (rf *replicaFlowControl) getBehindFollowers() map[roachpb.ReplicaID]struct{
// time for it to catch up and then later return those tokens to us.
// This is I3a again; do it as part of #95563.
_ = progress.RecentActive
_ = progress.MsgAppFlowPaused
_ = progress.MsgAppProbesPaused
_ = progress.Match
})
return behindFollowers
Expand Down
14 changes: 7 additions & 7 deletions pkg/kv/kvserver/flow_control_replica_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ import (
// follows: progress=(replid@match:<state>:<active>:<paused>,...).
// <state> is one of {probe,replicate,snapshot}, <active> is
// {active,!inactive}, and <paused> is {paused,!paused}. The latter controls
// MsgAppFlowPaused in the raft library, not the CRDB-level follower
// MsgAppProbesPaused in the raft library, not the CRDB-level follower
// pausing.
//
// B. For the raft transport, we can specify the set of replica IDs we're
Expand Down Expand Up @@ -169,12 +169,12 @@ func TestFlowControlReplicaIntegration(t *testing.T) {
paused := parts[3] == "paused"

progress[replID] = tracker.Progress{
Match: uint64(index),
State: state,
RecentActive: active,
MsgAppFlowPaused: paused,
Inflights: tracker.NewInflights(1, 0), // avoid NPE
IsLearner: false,
Match: uint64(index),
State: state,
RecentActive: active,
MsgAppProbesPaused: paused,
Inflights: tracker.NewInflights(1, 0), // avoid NPE
IsLearner: false,
}

case "descriptor", "paused", "inactive":
Expand Down
8 changes: 4 additions & 4 deletions pkg/kv/kvserver/split_delay_helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,10 +139,10 @@ func TestSplitDelayToAvoidSnapshot(t *testing.T) {
st := statusWithState(raft.StateLeader)
st.Progress = map[uint64]tracker.Progress{
2: {
State: state,
RecentActive: true,
MsgAppFlowPaused: true, // Unifies string output below.
Inflights: &tracker.Inflights{},
State: state,
RecentActive: true,
MsgAppProbesPaused: true, // Unifies string output below.
Inflights: &tracker.Inflights{},
},
// Healthy follower just for kicks.
3: {State: tracker.StateReplicate},
Expand Down
6 changes: 3 additions & 3 deletions pkg/raft/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -1494,7 +1494,7 @@ func stepLeader(r *raft, m pb.Message) error {
}
case pb.MsgHeartbeatResp:
pr.RecentActive = true
pr.MsgAppFlowPaused = false
pr.MsgAppProbesPaused = false
r.maybeSendAppend(m.From)

case pb.MsgSnapStatus:
Expand All @@ -1514,7 +1514,7 @@ func stepLeader(r *raft, m pb.Message) error {
// If snapshot finish, wait for the MsgAppResp from the remote node before sending
// out the next MsgApp.
// If snapshot failure, wait for a heartbeat interval before next try
pr.MsgAppFlowPaused = true
pr.MsgAppProbesPaused = true
case pb.MsgUnreachable:
// During optimistic replication, if the remote becomes unreachable,
// there is huge probability that a MsgApp is lost.
Expand Down Expand Up @@ -1551,7 +1551,7 @@ func stepLeader(r *raft, m pb.Message) error {
r.sendTimeoutNow(leadTransferee)
r.logger.Infof("%x sends MsgTimeoutNow to %x immediately as %x already has up-to-date log", r.id, leadTransferee, leadTransferee)
} else {
pr.MsgAppFlowPaused = false
pr.MsgAppProbesPaused = false
r.maybeSendAppend(leadTransferee)
}
}
Expand Down
8 changes: 4 additions & 4 deletions pkg/raft/raft_snap_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,8 @@ func TestSnapshotFailure(t *testing.T) {
if sm.trk.Progress[2].Next != 1 {
t.Fatalf("Next = %d, want 1", sm.trk.Progress[2].Next)
}
if !sm.trk.Progress[2].MsgAppFlowPaused {
t.Errorf("MsgAppFlowPaused = %v, want true", sm.trk.Progress[2].MsgAppFlowPaused)
if !sm.trk.Progress[2].MsgAppProbesPaused {
t.Errorf("MsgAppProbesPaused = %v, want true", sm.trk.Progress[2].MsgAppProbesPaused)
}
}

Expand All @@ -109,8 +109,8 @@ func TestSnapshotSucceed(t *testing.T) {
if sm.trk.Progress[2].Next != 12 {
t.Fatalf("Next = %d, want 12", sm.trk.Progress[2].Next)
}
if !sm.trk.Progress[2].MsgAppFlowPaused {
t.Errorf("MsgAppFlowPaused = %v, want true", sm.trk.Progress[2].MsgAppFlowPaused)
if !sm.trk.Progress[2].MsgAppProbesPaused {
t.Errorf("MsgAppProbesPaused = %v, want true", sm.trk.Progress[2].MsgAppProbesPaused)
}
}

Expand Down
16 changes: 8 additions & 8 deletions pkg/raft/raft_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,16 +125,16 @@ func TestProgressResumeByHeartbeatResp(t *testing.T) {
r.becomeCandidate()
r.becomeLeader()

r.trk.Progress[2].MsgAppFlowPaused = true
r.trk.Progress[2].MsgAppProbesPaused = true

r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgBeat})
assert.True(t, r.trk.Progress[2].MsgAppFlowPaused)
assert.True(t, r.trk.Progress[2].MsgAppProbesPaused)

r.trk.Progress[2].BecomeReplicate()
assert.False(t, r.trk.Progress[2].MsgAppFlowPaused)
r.trk.Progress[2].MsgAppFlowPaused = true
assert.False(t, r.trk.Progress[2].MsgAppProbesPaused)
r.trk.Progress[2].MsgAppProbesPaused = true
r.Step(pb.Message{From: 2, To: 1, Type: pb.MsgHeartbeatResp})
assert.True(t, r.trk.Progress[2].MsgAppFlowPaused)
assert.True(t, r.trk.Progress[2].MsgAppProbesPaused)
}

func TestProgressPaused(t *testing.T) {
Expand Down Expand Up @@ -2076,7 +2076,7 @@ func TestSendAppendForProgressProbe(t *testing.T) {
assert.Zero(t, msg[0].Index)
}

assert.True(t, r.trk.Progress[2].MsgAppFlowPaused)
assert.True(t, r.trk.Progress[2].MsgAppProbesPaused)
for j := 0; j < 10; j++ {
mustAppendEntry(r, pb.Entry{Data: []byte("somedata")})
r.maybeSendAppend(2)
Expand All @@ -2087,7 +2087,7 @@ func TestSendAppendForProgressProbe(t *testing.T) {
for j := 0; j < r.heartbeatTimeout; j++ {
r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgBeat})
}
assert.True(t, r.trk.Progress[2].MsgAppFlowPaused)
assert.True(t, r.trk.Progress[2].MsgAppProbesPaused)

// consume the heartbeat
msg := r.readMessages()
Expand All @@ -2100,7 +2100,7 @@ func TestSendAppendForProgressProbe(t *testing.T) {
msg := r.readMessages()
assert.Len(t, msg, 1)
assert.Zero(t, msg[0].Index)
assert.True(t, r.trk.Progress[2].MsgAppFlowPaused)
assert.True(t, r.trk.Progress[2].MsgAppProbesPaused)
}

func TestSendAppendForProgressReplicate(t *testing.T) {
Expand Down
26 changes: 13 additions & 13 deletions pkg/raft/tracker/progress.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,13 +96,13 @@ type Progress struct {
// This is always true on the leader.
RecentActive bool

// MsgAppFlowPaused is used when the MsgApp flow to a node is throttled. This
// MsgAppProbesPaused is used when the MsgApp flow to a node is throttled. This
// happens in StateProbe, or StateReplicate with saturated Inflights. In both
// cases, we need to continue sending MsgApp once in a while to guarantee
// progress, but we only do so when MsgAppFlowPaused is false (it is reset on
// progress, but we only do so when MsgAppProbesPaused is false (it is reset on
// receiving a heartbeat response), to not overflow the receiver. See
// IsPaused().
MsgAppFlowPaused bool
// IsPaused() and ShouldSendMsgApp().
MsgAppProbesPaused bool

// Inflights is a sliding window for the inflight messages.
// Each inflight message contains one or more log entries.
Expand All @@ -122,10 +122,10 @@ type Progress struct {
IsLearner bool
}

// ResetState moves the Progress into the specified State, resetting MsgAppFlowPaused,
// ResetState moves the Progress into the specified State, resetting MsgAppProbesPaused,
// PendingSnapshot, and Inflights.
func (pr *Progress) ResetState(state StateType) {
pr.MsgAppFlowPaused = false
pr.MsgAppProbesPaused = false
pr.PendingSnapshot = 0
pr.State = state
pr.Inflights.reset()
Expand Down Expand Up @@ -173,7 +173,7 @@ func (pr *Progress) SentEntries(entries int, bytes uint64) {
pr.Next += uint64(entries)
pr.Inflights.Add(pr.Next-1, bytes)
}
pr.MsgAppFlowPaused = true
pr.MsgAppProbesPaused = true
}

// CanSendEntries returns true if the flow control state allows sending at least
Expand Down Expand Up @@ -248,7 +248,7 @@ func (pr *Progress) MaybeDecrTo(rejected, matchHint uint64) bool {
pr.Next = max(min(rejected, matchHint+1), pr.Match+1)
// Regress the sentCommit since it unlikely has been applied.
pr.sentCommit = min(pr.sentCommit, pr.Next-1)
pr.MsgAppFlowPaused = false
pr.MsgAppProbesPaused = false
return true
}

Expand All @@ -261,9 +261,9 @@ func (pr *Progress) MaybeDecrTo(rejected, matchHint uint64) bool {
func (pr *Progress) IsPaused() bool {
switch pr.State {
case StateProbe:
return pr.MsgAppFlowPaused
return pr.MsgAppProbesPaused
case StateReplicate:
return pr.MsgAppFlowPaused && pr.Inflights.Full()
return pr.MsgAppProbesPaused && pr.Inflights.Full()
case StateSnapshot:
return true
default:
Expand Down Expand Up @@ -293,7 +293,7 @@ func (pr *Progress) IsPaused() bool {
func (pr *Progress) ShouldSendMsgApp(last, commit uint64) bool {
switch pr.State {
case StateProbe:
return !pr.MsgAppFlowPaused
return !pr.MsgAppProbesPaused

case StateReplicate:
// Send a MsgApp containing the latest commit index if:
Expand All @@ -307,14 +307,14 @@ func (pr *Progress) ShouldSendMsgApp(last, commit uint64) bool {
return false
}
// Don't send a MsgApp if we are in a throttled replication state, i.e.
// pr.Inflights.Full() && pr.MsgAppFlowPaused.
// pr.Inflights.Full() && pr.MsgAppProbesPaused.
if pr.IsPaused() {
return false
}
// We are here if the follower's log is not up-to-date, and the flow is not
// paused. We can always send a MsgApp, except when everything is already
// in-flight, and the last MsgApp was recent.
return pr.Next <= last || !pr.MsgAppFlowPaused
return pr.Next <= last || !pr.MsgAppProbesPaused

case StateSnapshot:
return false
Expand Down
34 changes: 17 additions & 17 deletions pkg/raft/tracker/progress_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,14 @@ func TestProgressString(t *testing.T) {
ins := NewInflights(1, 0)
ins.Add(123, 1)
pr := &Progress{
Match: 1,
Next: 2,
State: StateSnapshot,
PendingSnapshot: 123,
RecentActive: false,
MsgAppFlowPaused: true,
IsLearner: true,
Inflights: ins,
Match: 1,
Next: 2,
State: StateSnapshot,
PendingSnapshot: 123,
RecentActive: false,
MsgAppProbesPaused: true,
IsLearner: true,
Inflights: ins,
}
const exp = `StateSnapshot match=1 next=2 learner paused pendingSnap=123 inactive inflight=1[full]`
assert.Equal(t, exp, pr.String())
Expand All @@ -53,29 +53,29 @@ func TestProgressIsPaused(t *testing.T) {
}
for i, tt := range tests {
p := &Progress{
State: tt.state,
MsgAppFlowPaused: tt.paused,
Inflights: NewInflights(256, 0),
State: tt.state,
MsgAppProbesPaused: tt.paused,
Inflights: NewInflights(256, 0),
}
assert.Equal(t, tt.w, p.IsPaused(), i)
}
}

// TestProgressResume ensures that MaybeDecrTo resets MsgAppFlowPaused, and
// TestProgressResume ensures that MaybeDecrTo resets MsgAppProbesPaused, and
// MaybeUpdate does not.
//
// TODO(pav-kv): there is little sense in testing these micro-behaviours in the
// struct. We should test the visible behaviour instead.
func TestProgressResume(t *testing.T) {
p := &Progress{
Next: 2,
MsgAppFlowPaused: true,
Next: 2,
MsgAppProbesPaused: true,
}
p.MaybeDecrTo(1, 1)
assert.False(t, p.MsgAppFlowPaused)
p.MsgAppFlowPaused = true
assert.False(t, p.MsgAppProbesPaused)
p.MsgAppProbesPaused = true
p.MaybeUpdate(2)
assert.True(t, p.MsgAppFlowPaused)
assert.True(t, p.MsgAppProbesPaused)
}

func TestProgressBecomeProbe(t *testing.T) {
Expand Down

0 comments on commit 2b3ab55

Please sign in to comment.