Skip to content

Commit

Permalink
raft: send empty appends when replication is paused
Browse files Browse the repository at this point in the history
When Inflights to a particular node is full, i.e. MaxInflightMsgs for the
append messages flow is saturated, it is still necessary to continue sending
MsgApp to ensure progress. Currently this is achieved by "forgetting" the first
in-flight message in the window, which frees up quota for one new MsgApp.

This new message is constructed in such a way that it potentially has multiple
entries, or a large entry. The effect of this is that the in-flight limitations
can be exceeded arbitrarily, for as long as the flow to this node continues
being saturated. In particular, if a follower is stuck, the leader will keep
sending entries to it.

This commit makes the MsgApp empty when Inflights is saturated, and prevents
the described leakage of Entries to slow followers.

Signed-off-by: Pavel Kalinnikov <[email protected]>
  • Loading branch information
pav-kv committed Oct 26, 2022
1 parent e24402d commit 98f1084
Showing 1 changed file with 11 additions and 8 deletions.
19 changes: 11 additions & 8 deletions raft/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -434,14 +434,21 @@ func (r *raft) sendAppend(to uint64) {
// are undesirable when we're sending multiple messages in a batch).
func (r *raft) maybeSendAppend(to uint64, sendIfEmpty bool) bool {
pr := r.prs.Progress[to]
if pr.IsPaused() {
// Don't send appends in paused state. The exception is StateReplicate, in
// which we want to continue sending empty MsgApp even if Inflights is full,
// for ensuring progress.
if pr.IsPaused() && (pr.State != tracker.StateReplicate || !sendIfEmpty) {
return false
}
m := pb.Message{}
m.To = to
m := pb.Message{To: to}

term, errt := r.raftLog.term(pr.Next - 1)
ents, erre := r.raftLog.entries(pr.Next, r.maxMsgSize)

var ents []pb.Entry
var erre error
if !pr.IsPaused() {
ents, erre = r.raftLog.entries(pr.Next, r.maxMsgSize)
}
if len(ents) == 0 && !sendIfEmpty {
return false
}
Expand Down Expand Up @@ -1299,10 +1306,6 @@ func stepLeader(r *raft, m pb.Message) error {
pr.RecentActive = true
pr.ProbeSent = false

// free one slot for the full inflights window to allow progress.
if pr.State == tracker.StateReplicate && pr.Inflights.Full() {
pr.Inflights.FreeFirstOne()
}
if pr.Match < r.raftLog.lastIndex() {
r.sendAppend(m.From)
}
Expand Down

0 comments on commit 98f1084

Please sign in to comment.