diff --git a/raft/raft.go b/raft/raft.go index c18100e897df..0d9b8b5b1622 100644 --- a/raft/raft.go +++ b/raft/raft.go @@ -441,22 +441,29 @@ func (r *raft) getProgress(id uint64) *Progress { return r.learnerPrs[id] } -// sendAppend sends RPC, with entries to the given peer. -func (r *raft) sendAppend(to uint64) { +// sendAppend sends an append RPC with new entries to the given peer, +// if necessary. Returns true if a message was sent. The sendIfEmpty +// argument controls whether messages with no entries will be sent +// ("empty" messages are useful to convey updated Commit indexes, but +// are undesirable when we're sending multiple messages in a batch). +func (r *raft) sendAppend(to uint64, sendIfEmpty bool) bool { pr := r.getProgress(to) if pr.IsPaused() { - return + return false } m := pb.Message{} m.To = to term, errt := r.raftLog.term(pr.Next - 1) ents, erre := r.raftLog.entries(pr.Next, r.maxMsgSize) + if len(ents) == 0 && !sendIfEmpty { + return false + } if errt != nil || erre != nil { // send snapshot if we failed to get term or entries if !pr.RecentActive { r.logger.Debugf("ignore sending snapshot to %x since it is not recently active", to) - return + return false } m.Type = pb.MsgSnap @@ -464,7 +471,7 @@ func (r *raft) sendAppend(to uint64) { if err != nil { if err == ErrSnapshotTemporarilyUnavailable { r.logger.Debugf("%x failed to send snapshot to %x because snapshot is temporarily unavailable", r.id, to) - return + return false } panic(err) // TODO(bdarnell) } @@ -498,6 +505,7 @@ func (r *raft) sendAppend(to uint64) { } } r.send(m) + return true } // sendHeartbeat sends an empty MsgApp @@ -537,7 +545,7 @@ func (r *raft) bcastAppend() { return } - r.sendAppend(id) + r.sendAppend(id, true) }) } @@ -1002,7 +1010,7 @@ func stepLeader(r *raft, m pb.Message) error { if pr.State == ProgressStateReplicate { pr.becomeProbe() } - r.sendAppend(m.From) + r.sendAppend(m.From, true) } } else { oldPaused := pr.IsPaused() @@ -1020,9 +1028,17 @@ func stepLeader(r *raft, m pb.Message) error { if r.maybeCommit() { r.bcastAppend() } else if oldPaused { - // update() reset the wait state on this node. If we had delayed sending - // an update before, send it now. - r.sendAppend(m.From) + // If we were paused before, this node may be missing the + // latest commit index, so send it. + r.sendAppend(m.From, true) + } + // We've updated flow control information above, which may + // allow us to send multiple (size-limited) in-flight messages + // at once (such as when transitioning from probe to + // replicate, or when freeTo() covers multiple messages). If + // we have more entries to send, send as many messages as we + // can (without sending empty messages for the commit index) + for r.sendAppend(m.From, false) { } // Transfer leadership is in progress. if m.From == r.leadTransferee && pr.Match == r.raftLog.lastIndex() { @@ -1040,7 +1056,7 @@ func stepLeader(r *raft, m pb.Message) error { pr.ins.freeFirstOne() } if pr.Match < r.raftLog.lastIndex() { - r.sendAppend(m.From) + r.sendAppend(m.From, true) } if r.readOnly.option != ReadOnlySafe || len(m.Context) == 0 { @@ -1113,7 +1129,7 @@ func stepLeader(r *raft, m pb.Message) error { r.sendTimeoutNow(leadTransferee) r.logger.Infof("%x sends MsgTimeoutNow to %x immediately as %x already has up-to-date log", r.id, leadTransferee, leadTransferee) } else { - r.sendAppend(leadTransferee) + r.sendAppend(leadTransferee, true) } } return nil diff --git a/raft/raft_test.go b/raft/raft_test.go index f086d2546853..2eaddca7a9f5 100644 --- a/raft/raft_test.go +++ b/raft/raft_test.go @@ -20,6 +20,7 @@ import ( "math" "math/rand" "reflect" + "strings" "testing" pb "github.com/coreos/etcd/raft/raftpb" @@ -293,6 +294,74 @@ func TestProgressPaused(t *testing.T) { } } +func TestProgressFlowControl(t *testing.T) { + cfg := newTestConfig(1, []uint64{1, 2}, 5, 1, NewMemoryStorage()) + cfg.MaxInflightMsgs = 3 + cfg.MaxSizePerMsg = 2048 + r := newRaft(cfg) + r.becomeCandidate() + r.becomeLeader() + + // Throw away all the messages relating to the initial election. + r.readMessages() + + // While node 2 is in probe state, propose a bunch of entries. + r.prs[2].becomeProbe() + blob := []byte(strings.Repeat("a", 1000)) + for i := 0; i < 10; i++ { + r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: blob}}}) + } + + ms := r.readMessages() + // First append has two entries: the empty entry to confirm the + // election, and the first proposal (only one proposal gets sent + // because we're in probe state). + if len(ms) != 1 || ms[0].Type != pb.MsgApp { + t.Fatalf("expected 1 MsgApp, got %v", ms) + } + if len(ms[0].Entries) != 2 { + t.Fatalf("expected 2 entries, got %d", len(ms[0].Entries)) + } + if len(ms[0].Entries[0].Data) != 0 || len(ms[0].Entries[1].Data) != 1000 { + t.Fatalf("unexpected entry sizes: %v", ms[0].Entries) + } + + // When this append is acked, we change to replicate state and can + // send multiple messages at once. + r.Step(pb.Message{From: 2, To: 1, Type: pb.MsgAppResp, Index: ms[0].Entries[1].Index}) + ms = r.readMessages() + if len(ms) != 3 { + t.Fatalf("expected 3 messages, got %d", len(ms)) + } + for i, m := range ms { + if m.Type != pb.MsgApp { + t.Errorf("%d: expected MsgApp, got %s", i, m.Type) + } + if len(m.Entries) != 2 { + t.Errorf("%d: expected 2 entries, got %d", i, len(m.Entries)) + } + } + + // Ack all three of those messages together and get the last two + // messages (containing three entries). + r.Step(pb.Message{From: 2, To: 1, Type: pb.MsgAppResp, Index: ms[2].Entries[1].Index}) + ms = r.readMessages() + if len(ms) != 2 { + t.Fatalf("expected 2 messages, got %d", len(ms)) + } + for i, m := range ms { + if m.Type != pb.MsgApp { + t.Errorf("%d: expected MsgApp, got %s", i, m.Type) + } + } + if len(ms[0].Entries) != 2 { + t.Errorf("%d: expected 2 entries, got %d", 0, len(ms[0].Entries)) + } + if len(ms[1].Entries) != 1 { + t.Errorf("%d: expected 1 entry, got %d", 1, len(ms[1].Entries)) + } +} + func TestLeaderElection(t *testing.T) { testLeaderElection(t, false) } @@ -2576,7 +2645,7 @@ func TestSendAppendForProgressProbe(t *testing.T) { // loop. After that, the follower is paused until a heartbeat response is // received. r.appendEntry(pb.Entry{Data: []byte("somedata")}) - r.sendAppend(2) + r.sendAppend(2, true) msg := r.readMessages() if len(msg) != 1 { t.Errorf("len(msg) = %d, want %d", len(msg), 1) @@ -2591,7 +2660,7 @@ func TestSendAppendForProgressProbe(t *testing.T) { } for j := 0; j < 10; j++ { r.appendEntry(pb.Entry{Data: []byte("somedata")}) - r.sendAppend(2) + r.sendAppend(2, true) if l := len(r.readMessages()); l != 0 { t.Errorf("len(msg) = %d, want %d", l, 0) } @@ -2638,7 +2707,7 @@ func TestSendAppendForProgressReplicate(t *testing.T) { for i := 0; i < 10; i++ { r.appendEntry(pb.Entry{Data: []byte("somedata")}) - r.sendAppend(2) + r.sendAppend(2, true) msgs := r.readMessages() if len(msgs) != 1 { t.Errorf("len(msg) = %d, want %d", len(msgs), 1) @@ -2655,7 +2724,7 @@ func TestSendAppendForProgressSnapshot(t *testing.T) { for i := 0; i < 10; i++ { r.appendEntry(pb.Entry{Data: []byte("somedata")}) - r.sendAppend(2) + r.sendAppend(2, true) msgs := r.readMessages() if len(msgs) != 0 { t.Errorf("len(msg) = %d, want %d", len(msgs), 0)