From 5c209d66d2ab8a5de7a33a61a1f90c4a4caefd7c Mon Sep 17 00:00:00 2001 From: Tobias Schottdorf Date: Fri, 23 Nov 2018 17:57:36 +0100 Subject: [PATCH] raft: ensure leader is in ProgressStateReplicate The leader perpetually kept itself in ProgressStateProbe even though of course it has perfect knowledge of its log. This wasn't usually an issue because it also doesn't care about its own Progress, but it's better to keep this data correctly maintained, especially since this is part of raft.Status and thus becomes visible to applications using the Raft library. (Concretely, in CockroachDB we use the Progress to inform log truncations). --- raft/raft.go | 5 +++++ raft/raft_test.go | 19 +++++++++++++++++++ 2 files changed, 24 insertions(+) diff --git a/raft/raft.go b/raft/raft.go index 865b36836047..56dd20810efe 100644 --- a/raft/raft.go +++ b/raft/raft.go @@ -750,6 +750,11 @@ func (r *raft) becomeLeader() { r.tick = r.tickHeartbeat r.lead = r.id r.state = StateLeader + // Followers enter replicate mode when they've been successfully probed + // (perhaps after having received a snapshot as a result). The leader is + // trivially in this state. Note that r.reset() has initialized this + // progress with the last index already. + r.prs[r.id].becomeReplicate() // Conservatively set the pendingConfIndex to the last index in the // log. There may or may not be a pending config change, but it's diff --git a/raft/raft_test.go b/raft/raft_test.go index f05543aaa8df..227581bf3514 100644 --- a/raft/raft_test.go +++ b/raft/raft_test.go @@ -267,11 +267,30 @@ func TestProgressResume(t *testing.T) { } } +func TestProgressLeader(t *testing.T) { + r := newTestRaft(1, []uint64{1, 2}, 5, 1, NewMemoryStorage()) + r.becomeCandidate() + r.becomeLeader() + r.prs[2].becomeReplicate() + + // Send proposals to r1. The first 5 entries should be appended to the log. + propMsg := pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("foo")}}} + for i := 0; i < 5; i++ { + if pr := r.prs[r.id]; pr.State != ProgressStateReplicate || pr.Match != uint64(i+1) || pr.Next != pr.Match+1 { + t.Errorf("unexpected progress %v", pr) + } + if err := r.Step(propMsg); err != nil { + t.Fatalf("proposal resulted in error: %v", err) + } + } +} + // TestProgressResumeByHeartbeatResp ensures raft.heartbeat reset progress.paused by heartbeat response. func TestProgressResumeByHeartbeatResp(t *testing.T) { r := newTestRaft(1, []uint64{1, 2}, 5, 1, NewMemoryStorage()) r.becomeCandidate() r.becomeLeader() + r.prs[2].Paused = true r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgBeat})