Skip to content

Commit

Permalink
tracker: consolidate MsgApp decisions in Progress
Browse files Browse the repository at this point in the history
This commit consolidates all decision-making about sending append
messages into a single maybeSendAppend method. Previously, the behaviour
depended on the sendIfEmpty flag which was set/unset depending on the
circumstances in which the method is called. This is unnecessary because
the Progress struct contains enough information about the
leader->follower flow state, so maybeSendAppend can be made stand-alone.

Signed-off-by: Pavel Kalinnikov <[email protected]>
  • Loading branch information
pav-kv committed Mar 6, 2024
1 parent 01d44ed commit 9d42a69
Show file tree
Hide file tree
Showing 8 changed files with 129 additions and 119 deletions.
8 changes: 4 additions & 4 deletions doc.go
Original file line number Diff line number Diff line change
Expand Up @@ -315,7 +315,7 @@ stale log entries:
rafthttp package.
'MsgApp' contains log entries to replicate. A leader calls bcastAppend,
which calls sendAppend, which sends soon-to-be-replicated logs in 'MsgApp'
which calls maybeSendAppend, which sends soon-to-be-replicated logs in 'MsgApp'
type. When 'MsgApp' is passed to candidate's Step method, candidate reverts
back to follower, because it indicates that there is a valid leader sending
'MsgApp' messages. Candidate and follower respond to this message in
Expand Down Expand Up @@ -353,8 +353,8 @@ stale log entries:
'MsgSnap' requests to install a snapshot message. When a node has just
become a leader or the leader receives 'MsgProp' message, it calls
'bcastAppend' method, which then calls 'sendAppend' method to each
follower. In 'sendAppend', if a leader fails to get term or entries,
'bcastAppend' method, which then calls 'maybeSendAppend' method to each
follower. In 'maybeSendAppend', if a leader fails to get term or entries,
the leader requests snapshot by sending 'MsgSnap' type message.
'MsgSnapStatus' tells the result of snapshot install message. When a
Expand All @@ -376,7 +376,7 @@ stale log entries:
'MsgHeartbeatResp' is a response to 'MsgHeartbeat'. When 'MsgHeartbeatResp'
is passed to leader's Step method, the leader knows which follower
responded. And only when the leader's last committed index is greater than
follower's Match index, the leader runs 'sendAppend` method.
follower's Match index, the leader runs 'maybeSendAppend` method.
'MsgUnreachable' tells that request(message) wasn't delivered. When
'MsgUnreachable' is passed to leader's Step method, the leader discovers
Expand Down
132 changes: 46 additions & 86 deletions raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -588,24 +588,24 @@ func (r *raft) send(m pb.Message) {
}
}

// sendAppend sends an append RPC with new entries (if any) and the
// current commit index to the given peer.
func (r *raft) sendAppend(to uint64) {
r.maybeSendAppend(to, true)
}

// maybeSendAppend sends an append RPC with new entries to the given peer,
// if necessary. Returns true if a message was sent. The sendIfEmpty
// argument controls whether messages with no entries will be sent
// ("empty" messages are useful to convey updated Commit indexes, but
// are undesirable when we're sending multiple messages in a batch).
// maybeSendAppend sends an append RPC with log entries (if any) that are not
// yet known to be replicated in the given peer's log, as well as the current
// commit index. Usually it sends a MsgApp message, but in some cases (e.g. the
// log has been compacted) it can send a MsgSnap.
//
// In some cases, the MsgApp message can have zero entries, and yet being sent.
// When the follower log is not fully up-to-date, we must send a MsgApp
// periodically so that eventually the flow is either accepted or rejected. Not
// doing so can result in replication stall, in cases when a MsgApp is dropped.
//
// TODO(pav-kv): make invocation of maybeSendAppend stateless. The Progress
// struct contains all the state necessary for deciding whether to send a
// message.
func (r *raft) maybeSendAppend(to uint64, sendIfEmpty bool) bool {
// Returns true if a message was sent, or false otherwise. A message is not sent
// if the follower log and commit index are up-to-date, the flow is paused (for
// reasons like in-flight limits), or the message could not be constructed.
func (r *raft) maybeSendAppend(to uint64) bool {
pr := r.trk.Progress[to]
if pr.IsPaused() {

last, commit := r.raftLog.lastIndex(), r.raftLog.committed
if !pr.ShouldSendMsgApp(last, commit) {
return false
}

Expand All @@ -617,35 +617,25 @@ func (r *raft) maybeSendAppend(to uint64, sendIfEmpty bool) bool {
return r.maybeSendSnapshot(to, pr)
}

var ents []pb.Entry
// In a throttled StateReplicate only send empty MsgApp, to ensure progress.
// Otherwise, if we had a full Inflights and all inflight messages were in
// fact dropped, replication to that follower would stall. Instead, an empty
// MsgApp will eventually reach the follower (heartbeats responses prompt the
// leader to send an append), allowing it to be acked or rejected, both of
// which will clear out Inflights.
if pr.State != tracker.StateReplicate || !pr.Inflights.Full() {
ents, err = r.raftLog.entries(pr.Next, r.maxMsgSize)
}
if len(ents) == 0 && !sendIfEmpty {
return false
}
// TODO(pav-kv): move this check up to where err is returned.
if err != nil { // send a snapshot if we failed to get the entries
return r.maybeSendSnapshot(to, pr)
var entries []pb.Entry
if pr.CanSendEntries(last) {
if entries, err = r.raftLog.entries(pr.Next, r.maxMsgSize); err != nil {
// Send a snapshot if we failed to get the entries.
return r.maybeSendSnapshot(to, pr)
}
}

// Send the actual MsgApp otherwise, and update the progress accordingly.
// Send the MsgApp, and update the progress accordingly.
r.send(pb.Message{
To: to,
Type: pb.MsgApp,
Index: prevIndex,
LogTerm: prevTerm,
Entries: ents,
Commit: r.raftLog.committed,
Entries: entries,
Commit: commit,
})
pr.SentEntries(len(ents), uint64(payloadsSize(ents)))
pr.SentCommit(r.raftLog.committed)
pr.SentEntries(len(entries), uint64(payloadsSize(entries)))
pr.SentCommit(commit)
return true
}

Expand Down Expand Up @@ -704,7 +694,7 @@ func (r *raft) bcastAppend() {
if id == r.id {
return
}
r.sendAppend(id)
r.maybeSendAppend(id)
})
}

Expand Down Expand Up @@ -1482,7 +1472,7 @@ func stepLeader(r *raft, m pb.Message) error {
if pr.State == tracker.StateReplicate {
pr.BecomeProbe()
}
r.sendAppend(m.From)
r.maybeSendAppend(m.From)
}
} else {
// We want to update our tracking if the response updates our
Expand Down Expand Up @@ -1523,21 +1513,13 @@ func stepLeader(r *raft, m pb.Message) error {
// to respond to pending read index requests
releasePendingReadIndexMessages(r)
r.bcastAppend()
} else if r.id != m.From && pr.CanBumpCommit(r.raftLog.committed) {
// This node may be missing the latest commit index, so send it.
// NB: this is not strictly necessary because the periodic heartbeat
// messages deliver commit indices too. However, a message sent now
// may arrive earlier than the next heartbeat fires.
r.sendAppend(m.From)
}
// We've updated flow control information above, which may
// allow us to send multiple (size-limited) in-flight messages
// at once (such as when transitioning from probe to
// replicate, or when freeTo() covers multiple messages). If
// we have more entries to send, send as many messages as we
// can (without sending empty messages for the commit index)
// We've updated flow control information above, which may allow us to
// send multiple (size-limited) in-flight messages at once (such as when
// transitioning from probe to replicate, or when freeTo() covers
// multiple messages). Send as many messages as we can.
if r.id != m.From {
for r.maybeSendAppend(m.From, false /* sendIfEmpty */) {
for r.maybeSendAppend(m.From) {
}
}
// Transfer leadership is in progress.
Expand All @@ -1549,24 +1531,8 @@ func stepLeader(r *raft, m pb.Message) error {
}
case pb.MsgHeartbeatResp:
pr.RecentActive = true
pr.MsgAppFlowPaused = false

// NB: if the follower is paused (full Inflights), this will still send an
// empty append, allowing it to recover from situations in which all the
// messages that filled up Inflights in the first place were dropped. Note
// also that the outgoing heartbeat already communicated the commit index.
//
// If the follower is fully caught up but also in StateProbe (as can happen
// if ReportUnreachable was called), we also want to send an append (it will
// be empty) to allow the follower to transition back to StateReplicate once
// it responds.
//
// Note that StateSnapshot typically satisfies pr.Match < lastIndex, but
// `pr.Paused()` is always true for StateSnapshot, so sendAppend is a
// no-op.
if pr.Match < r.raftLog.lastIndex() || pr.State == tracker.StateProbe {
r.sendAppend(m.From)
}
pr.PauseMsgAppProbes(false)
r.maybeSendAppend(m.From)

if r.readOnly.option != ReadOnlySafe || len(m.Context) == 0 {
return nil
Expand Down Expand Up @@ -1636,7 +1602,8 @@ func stepLeader(r *raft, m pb.Message) error {
r.sendTimeoutNow(leadTransferee)
r.logger.Infof("%x sends MsgTimeoutNow to %x immediately as %x already has up-to-date log", r.id, leadTransferee, leadTransferee)
} else {
r.sendAppend(leadTransferee)
pr.PauseMsgAppProbes(false)
r.maybeSendAppend(leadTransferee)
}
}
return nil
Expand Down Expand Up @@ -1984,21 +1951,14 @@ func (r *raft) switchToConfig(cfg tracker.Config, trk tracker.ProgressMap) pb.Co
return cs
}

if r.maybeCommit() {
// If the configuration change means that more entries are committed now,
// broadcast/append to everyone in the updated config.
r.bcastAppend()
} else {
// Otherwise, still probe the newly added replicas; there's no reason to
// let them wait out a heartbeat interval (or the next incoming
// proposal).
r.trk.Visit(func(id uint64, pr *tracker.Progress) {
if id == r.id {
return
}
r.maybeSendAppend(id, false /* sendIfEmpty */)
})
}
r.maybeCommit()
// If the configuration change means that more entries are committed now,
// broadcast/append to everyone in the updated config.
//
// Otherwise, still probe the newly added replicas; there's no reason to let
// them wait out a heartbeat interval (or the next incoming proposal).
r.bcastAppend()

// If the leadTransferee was removed or demoted, abort the leadership transfer.
if _, tOK := r.trk.Config.Voters.IDs()[r.leadTransferee]; !tOK && r.leadTransferee != 0 {
r.abortLeaderTransfer()
Expand Down
16 changes: 8 additions & 8 deletions raft_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,8 +141,8 @@ func TestProgressResumeByHeartbeatResp(t *testing.T) {
}
r.trk.Progress[2].MsgAppFlowPaused = true
r.Step(pb.Message{From: 2, To: 1, Type: pb.MsgHeartbeatResp})
if r.trk.Progress[2].MsgAppFlowPaused {
t.Errorf("paused = %v, want false", r.trk.Progress[2].MsgAppFlowPaused)
if !r.trk.Progress[2].MsgAppFlowPaused {
t.Errorf("paused = %v, want true", r.trk.Progress[2].MsgAppFlowPaused)
}
}

Expand Down Expand Up @@ -2773,7 +2773,7 @@ func TestSendAppendForProgressProbe(t *testing.T) {
// loop. After that, the follower is paused until a heartbeat response is
// received.
mustAppendEntry(r, pb.Entry{Data: []byte("somedata")})
r.sendAppend(2)
r.maybeSendAppend(2)
msg := r.readMessages()
if len(msg) != 1 {
t.Errorf("len(msg) = %d, want %d", len(msg), 1)
Expand All @@ -2788,7 +2788,7 @@ func TestSendAppendForProgressProbe(t *testing.T) {
}
for j := 0; j < 10; j++ {
mustAppendEntry(r, pb.Entry{Data: []byte("somedata")})
r.sendAppend(2)
r.maybeSendAppend(2)
if l := len(r.readMessages()); l != 0 {
t.Errorf("len(msg) = %d, want %d", l, 0)
}
Expand Down Expand Up @@ -2835,7 +2835,7 @@ func TestSendAppendForProgressReplicate(t *testing.T) {

for i := 0; i < 10; i++ {
mustAppendEntry(r, pb.Entry{Data: []byte("somedata")})
r.sendAppend(2)
r.maybeSendAppend(2)
msgs := r.readMessages()
if len(msgs) != 1 {
t.Errorf("len(msg) = %d, want %d", len(msgs), 1)
Expand All @@ -2852,7 +2852,7 @@ func TestSendAppendForProgressSnapshot(t *testing.T) {

for i := 0; i < 10; i++ {
mustAppendEntry(r, pb.Entry{Data: []byte("somedata")})
r.sendAppend(2)
r.maybeSendAppend(2)
msgs := r.readMessages()
if len(msgs) != 0 {
t.Errorf("len(msg) = %d, want %d", len(msgs), 0)
Expand Down Expand Up @@ -4677,10 +4677,10 @@ func TestLogReplicationWithReorderedMessage(t *testing.T) {

// r1 sends 2 MsgApp messages to r2.
mustAppendEntry(r1, pb.Entry{Data: []byte("somedata")})
r1.sendAppend(2)
r1.maybeSendAppend(2)
req1 := expectOneMessage(t, r1)
mustAppendEntry(r1, pb.Entry{Data: []byte("somedata")})
r1.sendAppend(2)
r1.maybeSendAppend(2)
req2 := expectOneMessage(t, r1)

// r2 receives the second MsgApp first due to reordering.
Expand Down
3 changes: 3 additions & 0 deletions testdata/replicate_pause.txt
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,9 @@ deliver-msgs drop=3
dropped: 1->3 MsgApp Term:1 Log:1/11 Commit:11 Entries:[1/12 EntryNormal "prop_1_12"]
dropped: 1->3 MsgApp Term:1 Log:1/12 Commit:11 Entries:[1/13 EntryNormal "prop_1_13"]
dropped: 1->3 MsgApp Term:1 Log:1/13 Commit:11 Entries:[1/14 EntryNormal "prop_1_14"]
dropped: 1->3 MsgApp Term:1 Log:1/14 Commit:12
dropped: 1->3 MsgApp Term:1 Log:1/14 Commit:13
dropped: 1->3 MsgApp Term:1 Log:1/14 Commit:14


# Repeat committing 3 entries.
Expand Down
2 changes: 2 additions & 0 deletions testdata/slow_follower_after_compaction.txt
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,8 @@ deliver-msgs drop=3
----
dropped: 1->3 MsgApp Term:1 Log:1/14 Commit:14 Entries:[1/15 EntryNormal "prop_1_15"]
dropped: 1->3 MsgApp Term:1 Log:1/15 Commit:14 Entries:[1/16 EntryNormal "prop_1_16"]
dropped: 1->3 MsgApp Term:1 Log:1/16 Commit:15
dropped: 1->3 MsgApp Term:1 Log:1/16 Commit:16

# Truncate the leader's log beyond node 3 log size.
compact 1 17
Expand Down
2 changes: 2 additions & 0 deletions tracker/inflights.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ type Inflights struct {
count int // number of inflight messages in the buffer
bytes uint64 // number of inflight bytes

// TODO(pav-kv): do not store the limits here, pass them to methods. For flow
// control, we need to support dynamic limits.
size int // the max number of inflight messages
maxBytes uint64 // the max total byte size of inflight messages

Expand Down
Loading

0 comments on commit 9d42a69

Please sign in to comment.