Skip to content

Commit

Permalink
raft/tracker: rename and comment MsgApp paused field
Browse files Browse the repository at this point in the history
Make the field name and comment clearer on the fact that it's used both in
StateProbe and StateReplicate. The old name ProbeSent was slightly confusing,
and also triggered thinking that it's used only in StateProbe.

Signed-off-by: Pavel Kalinnikov <[email protected]>
  • Loading branch information
pav-kv committed Nov 8, 2022
1 parent 467114e commit 1ea1349
Show file tree
Hide file tree
Showing 5 changed files with 55 additions and 56 deletions.
4 changes: 2 additions & 2 deletions raft/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -1304,7 +1304,7 @@ func stepLeader(r *raft, m pb.Message) error {
}
case pb.MsgHeartbeatResp:
pr.RecentActive = true
pr.ProbeSent = false
pr.MsgAppFlowPaused = false

// NB: if the follower is paused (full Inflights), this will still send an
// empty append, allowing it to recover from situations in which all the
Expand Down Expand Up @@ -1349,7 +1349,7 @@ func stepLeader(r *raft, m pb.Message) error {
// If snapshot finish, wait for the MsgAppResp from the remote node before sending
// out the next MsgApp.
// If snapshot failure, wait for a heartbeat interval before next try
pr.ProbeSent = true
pr.MsgAppFlowPaused = true
case pb.MsgUnreachable:
// During optimistic replication, if the remote becomes unreachable,
// there is huge probability that a MsgApp is lost.
Expand Down
8 changes: 4 additions & 4 deletions raft/raft_snap_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,8 @@ func TestSnapshotFailure(t *testing.T) {
if sm.prs.Progress[2].Next != 1 {
t.Fatalf("Next = %d, want 1", sm.prs.Progress[2].Next)
}
if !sm.prs.Progress[2].ProbeSent {
t.Errorf("ProbeSent = %v, want true", sm.prs.Progress[2].ProbeSent)
if !sm.prs.Progress[2].MsgAppFlowPaused {
t.Errorf("MsgAppFlowPaused = %v, want true", sm.prs.Progress[2].MsgAppFlowPaused)
}
}

Expand All @@ -106,8 +106,8 @@ func TestSnapshotSucceed(t *testing.T) {
if sm.prs.Progress[2].Next != 12 {
t.Fatalf("Next = %d, want 12", sm.prs.Progress[2].Next)
}
if !sm.prs.Progress[2].ProbeSent {
t.Errorf("ProbeSent = %v, want true", sm.prs.Progress[2].ProbeSent)
if !sm.prs.Progress[2].MsgAppFlowPaused {
t.Errorf("MsgAppFlowPaused = %v, want true", sm.prs.Progress[2].MsgAppFlowPaused)
}
}

Expand Down
28 changes: 14 additions & 14 deletions raft/raft_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,21 +94,21 @@ func TestProgressResumeByHeartbeatResp(t *testing.T) {
r.becomeCandidate()
r.becomeLeader()

r.prs.Progress[2].ProbeSent = true
r.prs.Progress[2].MsgAppFlowPaused = true

r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgBeat})
if !r.prs.Progress[2].ProbeSent {
t.Errorf("paused = %v, want true", r.prs.Progress[2].ProbeSent)
if !r.prs.Progress[2].MsgAppFlowPaused {
t.Errorf("paused = %v, want true", r.prs.Progress[2].MsgAppFlowPaused)
}

r.prs.Progress[2].BecomeReplicate()
if r.prs.Progress[2].ProbeSent {
t.Errorf("paused = %v, want false", r.prs.Progress[2].ProbeSent)
if r.prs.Progress[2].MsgAppFlowPaused {
t.Errorf("paused = %v, want false", r.prs.Progress[2].MsgAppFlowPaused)
}
r.prs.Progress[2].ProbeSent = true
r.prs.Progress[2].MsgAppFlowPaused = true
r.Step(pb.Message{From: 2, To: 1, Type: pb.MsgHeartbeatResp})
if r.prs.Progress[2].ProbeSent {
t.Errorf("paused = %v, want false", r.prs.Progress[2].ProbeSent)
if r.prs.Progress[2].MsgAppFlowPaused {
t.Errorf("paused = %v, want false", r.prs.Progress[2].MsgAppFlowPaused)
}
}

Expand Down Expand Up @@ -2658,8 +2658,8 @@ func TestSendAppendForProgressProbe(t *testing.T) {
}
}

if !r.prs.Progress[2].ProbeSent {
t.Errorf("paused = %v, want true", r.prs.Progress[2].ProbeSent)
if !r.prs.Progress[2].MsgAppFlowPaused {
t.Errorf("paused = %v, want true", r.prs.Progress[2].MsgAppFlowPaused)
}
for j := 0; j < 10; j++ {
mustAppendEntry(r, pb.Entry{Data: []byte("somedata")})
Expand All @@ -2673,8 +2673,8 @@ func TestSendAppendForProgressProbe(t *testing.T) {
for j := 0; j < r.heartbeatTimeout; j++ {
r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgBeat})
}
if !r.prs.Progress[2].ProbeSent {
t.Errorf("paused = %v, want true", r.prs.Progress[2].ProbeSent)
if !r.prs.Progress[2].MsgAppFlowPaused {
t.Errorf("paused = %v, want true", r.prs.Progress[2].MsgAppFlowPaused)
}

// consume the heartbeat
Expand All @@ -2696,8 +2696,8 @@ func TestSendAppendForProgressProbe(t *testing.T) {
if msg[0].Index != 0 {
t.Errorf("index = %d, want %d", msg[0].Index, 0)
}
if !r.prs.Progress[2].ProbeSent {
t.Errorf("paused = %v, want true", r.prs.Progress[2].ProbeSent)
if !r.prs.Progress[2].MsgAppFlowPaused {
t.Errorf("paused = %v, want true", r.prs.Progress[2].MsgAppFlowPaused)
}
}

Expand Down
33 changes: 16 additions & 17 deletions raft/tracker/progress.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,14 +55,13 @@ type Progress struct {
// This is always true on the leader.
RecentActive bool

// ProbeSent is true when a "probe" MsgApp was sent to this follower recently,
// and we haven't heard from it back yet. Used when the MsgApp flow is
// throttled, i.e. when State is StateProbe, or StateReplicate with saturated
// Inflights. In both cases, we need to continue sending MsgApp once in a
// while to guarantee progress, but we only do so when ProbeSent is false (it
// is reset on receiving a heartbeat response), to not overflow the receiver.
// See IsPaused().
ProbeSent bool
// MsgAppFlowPaused is used when the MsgApp flow to a node is throttled. This
// happens in StateProbe, or StateReplicate with saturated Inflights. In both
// cases, we need to continue sending MsgApp once in a while to guarantee
// progress, but we only do so when MsgAppFlowPaused is false (it is reset on
// receiving a heartbeat response), to not overflow the receiver. See
// IsPaused().
MsgAppFlowPaused bool

// Inflights is a sliding window for the inflight messages.
// Each inflight message contains one or more log entries.
Expand All @@ -82,10 +81,10 @@ type Progress struct {
IsLearner bool
}

// ResetState moves the Progress into the specified State, resetting ProbeSent,
// ResetState moves the Progress into the specified State, resetting MsgAppFlowPaused,
// PendingSnapshot, and Inflights.
func (pr *Progress) ResetState(state StateType) {
pr.ProbeSent = false
pr.MsgAppFlowPaused = false
pr.PendingSnapshot = 0
pr.State = state
pr.Inflights.reset()
Expand Down Expand Up @@ -146,13 +145,13 @@ func (pr *Progress) UpdateOnEntriesSend(entries int, nextIndex uint64) error {
}
// If this message overflows the in-flights tracker, or it was already full,
// consider this message being a probe, so that the flow is paused.
pr.ProbeSent = pr.Inflights.Full()
pr.MsgAppFlowPaused = pr.Inflights.Full()
case StateProbe:
// TODO(pavelkalinnikov): this condition captures the previous behaviour,
// but we should set ProbeSent unconditionally for simplicity, because any
// but we should set MsgAppFlowPaused unconditionally for simplicity, because any
// MsgApp in StateProbe is a probe, not only non-empty ones.
if entries > 0 {
pr.ProbeSent = true
pr.MsgAppFlowPaused = true
}
default:
return fmt.Errorf("sending append in unhandled state %s", pr.State)
Expand All @@ -168,7 +167,7 @@ func (pr *Progress) MaybeUpdate(n uint64) bool {
if pr.Match < n {
pr.Match = n
updated = true
pr.ProbeSent = false
pr.MsgAppFlowPaused = false
}
pr.Next = max(pr.Next, n+1)
return updated
Expand Down Expand Up @@ -210,7 +209,7 @@ func (pr *Progress) MaybeDecrTo(rejected, matchHint uint64) bool {
}

pr.Next = max(min(rejected, matchHint+1), 1)
pr.ProbeSent = false
pr.MsgAppFlowPaused = false
return true
}

Expand All @@ -223,9 +222,9 @@ func (pr *Progress) MaybeDecrTo(rejected, matchHint uint64) bool {
func (pr *Progress) IsPaused() bool {
switch pr.State {
case StateProbe:
return pr.ProbeSent
return pr.MsgAppFlowPaused
case StateReplicate:
return pr.ProbeSent
return pr.MsgAppFlowPaused
case StateSnapshot:
return true
default:
Expand Down
38 changes: 19 additions & 19 deletions raft/tracker/progress_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,14 @@ func TestProgressString(t *testing.T) {
ins := NewInflights(1)
ins.Add(123)
pr := &Progress{
Match: 1,
Next: 2,
State: StateSnapshot,
PendingSnapshot: 123,
RecentActive: false,
ProbeSent: true,
IsLearner: true,
Inflights: ins,
Match: 1,
Next: 2,
State: StateSnapshot,
PendingSnapshot: 123,
RecentActive: false,
MsgAppFlowPaused: true,
IsLearner: true,
Inflights: ins,
}
const exp = `StateSnapshot match=1 next=2 learner paused pendingSnap=123 inactive inflight=1[full]`
if act := pr.String(); act != exp {
Expand All @@ -53,9 +53,9 @@ func TestProgressIsPaused(t *testing.T) {
}
for i, tt := range tests {
p := &Progress{
State: tt.state,
ProbeSent: tt.paused,
Inflights: NewInflights(256),
State: tt.state,
MsgAppFlowPaused: tt.paused,
Inflights: NewInflights(256),
}
if g := p.IsPaused(); g != tt.w {
t.Errorf("#%d: paused= %t, want %t", i, g, tt.w)
Expand All @@ -64,20 +64,20 @@ func TestProgressIsPaused(t *testing.T) {
}

// TestProgressResume ensures that MaybeUpdate and MaybeDecrTo will reset
// ProbeSent.
// MsgAppFlowPaused.
func TestProgressResume(t *testing.T) {
p := &Progress{
Next: 2,
ProbeSent: true,
Next: 2,
MsgAppFlowPaused: true,
}
p.MaybeDecrTo(1, 1)
if p.ProbeSent {
t.Errorf("paused= %v, want false", p.ProbeSent)
if p.MsgAppFlowPaused {
t.Errorf("paused= %v, want false", p.MsgAppFlowPaused)
}
p.ProbeSent = true
p.MsgAppFlowPaused = true
p.MaybeUpdate(2)
if p.ProbeSent {
t.Errorf("paused= %v, want false", p.ProbeSent)
if p.MsgAppFlowPaused {
t.Errorf("paused= %v, want false", p.MsgAppFlowPaused)
}
}

Expand Down

0 comments on commit 1ea1349

Please sign in to comment.