From 3b7c2eda375df018d612623e462f93ee214532a7 Mon Sep 17 00:00:00 2001 From: Pavel Kalinnikov Date: Tue, 28 May 2024 14:43:35 +0100 Subject: [PATCH] tracker: rename MsgAppFlowPaused Epic: none Release note: none --- pkg/kv/kvserver/flow_control_replica.go | 2 +- .../flow_control_replica_integration_test.go | 14 ++++---- pkg/kv/kvserver/split_delay_helper_test.go | 8 ++--- pkg/raft/raft.go | 2 +- pkg/raft/raft_snap_test.go | 8 ++--- pkg/raft/raft_test.go | 16 ++++----- pkg/raft/tracker/progress.go | 22 ++++++------ pkg/raft/tracker/progress_test.go | 34 +++++++++---------- 8 files changed, 53 insertions(+), 53 deletions(-) diff --git a/pkg/kv/kvserver/flow_control_replica.go b/pkg/kv/kvserver/flow_control_replica.go index 97af2458ab5d..10b5649e97e9 100644 --- a/pkg/kv/kvserver/flow_control_replica.go +++ b/pkg/kv/kvserver/flow_control_replica.go @@ -85,7 +85,7 @@ func (rf *replicaFlowControl) getBehindFollowers() map[roachpb.ReplicaID]struct{ // time for it to catch up and then later return those tokens to us. // This is I3a again; do it as part of #95563. _ = progress.RecentActive - _ = progress.MsgAppFlowPaused + _ = progress.MsgAppProbesPaused _ = progress.Match }) return behindFollowers diff --git a/pkg/kv/kvserver/flow_control_replica_integration_test.go b/pkg/kv/kvserver/flow_control_replica_integration_test.go index 31ceb8831be3..38e9f7e8859e 100644 --- a/pkg/kv/kvserver/flow_control_replica_integration_test.go +++ b/pkg/kv/kvserver/flow_control_replica_integration_test.go @@ -52,7 +52,7 @@ import ( // follows: progress=(replid@match:::,...). // is one of {probe,replicate,snapshot}, is // {active,!inactive}, and is {paused,!paused}. The latter controls -// MsgAppFlowPaused in the raft library, not the CRDB-level follower +// MsgAppProbesPaused in the raft library, not the CRDB-level follower // pausing. // // B. For the raft transport, we can specify the set of replica IDs we're @@ -169,12 +169,12 @@ func TestFlowControlReplicaIntegration(t *testing.T) { paused := parts[3] == "paused" progress[replID] = tracker.Progress{ - Match: uint64(index), - State: state, - RecentActive: active, - MsgAppFlowPaused: paused, - Inflights: tracker.NewInflights(1, 0), // avoid NPE - IsLearner: false, + Match: uint64(index), + State: state, + RecentActive: active, + MsgAppProbesPaused: paused, + Inflights: tracker.NewInflights(1, 0), // avoid NPE + IsLearner: false, } case "descriptor", "paused", "inactive": diff --git a/pkg/kv/kvserver/split_delay_helper_test.go b/pkg/kv/kvserver/split_delay_helper_test.go index 1be11a7b7ec9..884f5cbc1495 100644 --- a/pkg/kv/kvserver/split_delay_helper_test.go +++ b/pkg/kv/kvserver/split_delay_helper_test.go @@ -139,10 +139,10 @@ func TestSplitDelayToAvoidSnapshot(t *testing.T) { st := statusWithState(raft.StateLeader) st.Progress = map[uint64]tracker.Progress{ 2: { - State: state, - RecentActive: true, - MsgAppFlowPaused: true, // Unifies string output below. - Inflights: &tracker.Inflights{}, + State: state, + RecentActive: true, + MsgAppProbesPaused: true, // Unifies string output below. + Inflights: &tracker.Inflights{}, }, // Healthy follower just for kicks. 3: {State: tracker.StateReplicate}, diff --git a/pkg/raft/raft.go b/pkg/raft/raft.go index 5b3d438f3be2..76b4467e433e 100644 --- a/pkg/raft/raft.go +++ b/pkg/raft/raft.go @@ -1514,7 +1514,7 @@ func stepLeader(r *raft, m pb.Message) error { // If snapshot finish, wait for the MsgAppResp from the remote node before sending // out the next MsgApp. // If snapshot failure, wait for a heartbeat interval before next try - pr.MsgAppFlowPaused = true + pr.MsgAppProbesPaused = true case pb.MsgUnreachable: // During optimistic replication, if the remote becomes unreachable, // there is huge probability that a MsgApp is lost. diff --git a/pkg/raft/raft_snap_test.go b/pkg/raft/raft_snap_test.go index 4e8718916242..ff591b4b3634 100644 --- a/pkg/raft/raft_snap_test.go +++ b/pkg/raft/raft_snap_test.go @@ -86,8 +86,8 @@ func TestSnapshotFailure(t *testing.T) { if sm.trk.Progress[2].Next != 1 { t.Fatalf("Next = %d, want 1", sm.trk.Progress[2].Next) } - if !sm.trk.Progress[2].MsgAppFlowPaused { - t.Errorf("MsgAppFlowPaused = %v, want true", sm.trk.Progress[2].MsgAppFlowPaused) + if !sm.trk.Progress[2].MsgAppProbesPaused { + t.Errorf("MsgAppProbesPaused = %v, want true", sm.trk.Progress[2].MsgAppProbesPaused) } } @@ -109,8 +109,8 @@ func TestSnapshotSucceed(t *testing.T) { if sm.trk.Progress[2].Next != 12 { t.Fatalf("Next = %d, want 12", sm.trk.Progress[2].Next) } - if !sm.trk.Progress[2].MsgAppFlowPaused { - t.Errorf("MsgAppFlowPaused = %v, want true", sm.trk.Progress[2].MsgAppFlowPaused) + if !sm.trk.Progress[2].MsgAppProbesPaused { + t.Errorf("MsgAppProbesPaused = %v, want true", sm.trk.Progress[2].MsgAppProbesPaused) } } diff --git a/pkg/raft/raft_test.go b/pkg/raft/raft_test.go index 630fd215a5e9..1c4b413b3726 100644 --- a/pkg/raft/raft_test.go +++ b/pkg/raft/raft_test.go @@ -125,16 +125,16 @@ func TestProgressResumeByHeartbeatResp(t *testing.T) { r.becomeCandidate() r.becomeLeader() - r.trk.Progress[2].MsgAppFlowPaused = true + r.trk.Progress[2].MsgAppProbesPaused = true r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgBeat}) - assert.True(t, r.trk.Progress[2].MsgAppFlowPaused) + assert.True(t, r.trk.Progress[2].MsgAppProbesPaused) r.trk.Progress[2].BecomeReplicate() - assert.False(t, r.trk.Progress[2].MsgAppFlowPaused) - r.trk.Progress[2].MsgAppFlowPaused = true + assert.False(t, r.trk.Progress[2].MsgAppProbesPaused) + r.trk.Progress[2].MsgAppProbesPaused = true r.Step(pb.Message{From: 2, To: 1, Type: pb.MsgHeartbeatResp}) - assert.True(t, r.trk.Progress[2].MsgAppFlowPaused) + assert.True(t, r.trk.Progress[2].MsgAppProbesPaused) } func TestProgressPaused(t *testing.T) { @@ -2076,7 +2076,7 @@ func TestSendAppendForProgressProbe(t *testing.T) { assert.Zero(t, msg[0].Index) } - assert.True(t, r.trk.Progress[2].MsgAppFlowPaused) + assert.True(t, r.trk.Progress[2].MsgAppProbesPaused) for j := 0; j < 10; j++ { mustAppendEntry(r, pb.Entry{Data: []byte("somedata")}) r.maybeSendAppend(2) @@ -2087,7 +2087,7 @@ func TestSendAppendForProgressProbe(t *testing.T) { for j := 0; j < r.heartbeatTimeout; j++ { r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgBeat}) } - assert.True(t, r.trk.Progress[2].MsgAppFlowPaused) + assert.True(t, r.trk.Progress[2].MsgAppProbesPaused) // consume the heartbeat msg := r.readMessages() @@ -2100,7 +2100,7 @@ func TestSendAppendForProgressProbe(t *testing.T) { msg := r.readMessages() assert.Len(t, msg, 1) assert.Zero(t, msg[0].Index) - assert.True(t, r.trk.Progress[2].MsgAppFlowPaused) + assert.True(t, r.trk.Progress[2].MsgAppProbesPaused) } func TestSendAppendForProgressReplicate(t *testing.T) { diff --git a/pkg/raft/tracker/progress.go b/pkg/raft/tracker/progress.go index 9dab69dfef77..7f8cca96838b 100644 --- a/pkg/raft/tracker/progress.go +++ b/pkg/raft/tracker/progress.go @@ -96,13 +96,13 @@ type Progress struct { // This is always true on the leader. RecentActive bool - // MsgAppFlowPaused is used when the MsgApp flow to a node is throttled. This + // MsgAppProbesPaused is used when the MsgApp flow to a node is throttled. This // happens in StateProbe, or StateReplicate with saturated Inflights. In both // cases, we need to continue sending MsgApp once in a while to guarantee - // progress, but we only do so when MsgAppFlowPaused is false (it is reset on + // progress, but we only do so when MsgAppProbesPaused is false (it is reset on // receiving a heartbeat response), to not overflow the receiver. See - // IsPaused(). - MsgAppFlowPaused bool + // IsPaused() and ShouldSendMsgApp(). + MsgAppProbesPaused bool // Inflights is a sliding window for the inflight messages. // Each inflight message contains one or more log entries. @@ -122,7 +122,7 @@ type Progress struct { IsLearner bool } -// ResetState moves the Progress into the specified State, resetting MsgAppFlowPaused, +// ResetState moves the Progress into the specified State, resetting MsgAppProbesPaused, // PendingSnapshot, and Inflights. func (pr *Progress) ResetState(state StateType) { pr.PauseMsgAppProbes(false) @@ -179,7 +179,7 @@ func (pr *Progress) SentEntries(entries int, bytes uint64) { // PauseMsgAppProbes pauses or unpauses empty MsgApp messages flow, depending on // the passed-in bool. func (pr *Progress) PauseMsgAppProbes(pause bool) { - pr.MsgAppFlowPaused = pause + pr.MsgAppProbesPaused = pause } // CanSendEntries returns true if the flow control state allows sending at least @@ -267,9 +267,9 @@ func (pr *Progress) MaybeDecrTo(rejected, matchHint uint64) bool { func (pr *Progress) IsPaused() bool { switch pr.State { case StateProbe: - return pr.MsgAppFlowPaused + return pr.MsgAppProbesPaused case StateReplicate: - return pr.MsgAppFlowPaused && pr.Inflights.Full() + return pr.MsgAppProbesPaused && pr.Inflights.Full() case StateSnapshot: return true default: @@ -299,7 +299,7 @@ func (pr *Progress) IsPaused() bool { func (pr *Progress) ShouldSendMsgApp(last, commit uint64) bool { switch pr.State { case StateProbe: - return !pr.MsgAppFlowPaused + return !pr.MsgAppProbesPaused case StateReplicate: // Send a MsgApp containing the latest commit index if: // - our commit index exceeds the in-flight commit index, and @@ -312,14 +312,14 @@ func (pr *Progress) ShouldSendMsgApp(last, commit uint64) bool { return false } // Don't send a MsgApp if we are in a throttled replication state, i.e. - // pr.Inflights.Full() && pr.MsgAppFlowPaused. + // pr.Inflights.Full() && pr.MsgAppProbesPaused. if pr.IsPaused() { return false } // We are here if the follower's log is not up-to-date, and the flow is not // paused We can always send a MsgApp, except when everything is already // in-flight, and the last MsgApp was recent. - return pr.Next <= last || !pr.MsgAppFlowPaused + return pr.Next <= last || !pr.MsgAppProbesPaused case StateSnapshot: return false diff --git a/pkg/raft/tracker/progress_test.go b/pkg/raft/tracker/progress_test.go index 5ceaa59fd15c..c9485c21960b 100644 --- a/pkg/raft/tracker/progress_test.go +++ b/pkg/raft/tracker/progress_test.go @@ -24,14 +24,14 @@ func TestProgressString(t *testing.T) { ins := NewInflights(1, 0) ins.Add(123, 1) pr := &Progress{ - Match: 1, - Next: 2, - State: StateSnapshot, - PendingSnapshot: 123, - RecentActive: false, - MsgAppFlowPaused: true, - IsLearner: true, - Inflights: ins, + Match: 1, + Next: 2, + State: StateSnapshot, + PendingSnapshot: 123, + RecentActive: false, + MsgAppProbesPaused: true, + IsLearner: true, + Inflights: ins, } const exp = `StateSnapshot match=1 next=2 learner paused pendingSnap=123 inactive inflight=1[full]` assert.Equal(t, exp, pr.String()) @@ -53,29 +53,29 @@ func TestProgressIsPaused(t *testing.T) { } for i, tt := range tests { p := &Progress{ - State: tt.state, - MsgAppFlowPaused: tt.paused, - Inflights: NewInflights(256, 0), + State: tt.state, + MsgAppProbesPaused: tt.paused, + Inflights: NewInflights(256, 0), } assert.Equal(t, tt.w, p.IsPaused(), i) } } -// TestProgressResume ensures that MaybeDecrTo resets MsgAppFlowPaused, and +// TestProgressResume ensures that MaybeDecrTo resets MsgAppProbesPaused, and // MaybeUpdate does not. // // TODO(pav-kv): there is little sense in testing these micro-behaviours in the // struct. We should test the visible behaviour instead. func TestProgressResume(t *testing.T) { p := &Progress{ - Next: 2, - MsgAppFlowPaused: true, + Next: 2, + MsgAppProbesPaused: true, } p.MaybeDecrTo(1, 1) - assert.False(t, p.MsgAppFlowPaused) - p.MsgAppFlowPaused = true + assert.False(t, p.MsgAppProbesPaused) + p.MsgAppProbesPaused = true p.MaybeUpdate(2) - assert.True(t, p.MsgAppFlowPaused) + assert.True(t, p.MsgAppProbesPaused) } func TestProgressBecomeProbe(t *testing.T) {