From dbac67e7a80cb165a6c4de70c03353702319fc5e Mon Sep 17 00:00:00 2001 From: Tobias Schottdorf Date: Fri, 26 Apr 2019 11:09:14 +0200 Subject: [PATCH 1/9] raft: extract progress tracking into own component The Progress maps contain both the active configuration and information about the replication status. By pulling it into its own component, this becomes easier to unit test and also clarifies the code, which will see changes as etcd-io/etcd#7625 is addressed. More functionality will move into `prs` in self-contained follow-up commits. --- raft/progress.go | 46 ++++++++++++- raft/raft.go | 90 ++++++++++--------------- raft/raft_flow_control_test.go | 6 +- raft/raft_snap_test.go | 58 ++++++++-------- raft/raft_test.go | 118 ++++++++++++++++----------------- raft/rawnode.go | 4 +- raft/status.go | 4 +- 7 files changed, 174 insertions(+), 152 deletions(-) diff --git a/raft/progress.go b/raft/progress.go index 54e0b21b01a..45dff6a2e24 100644 --- a/raft/progress.go +++ b/raft/progress.go @@ -14,7 +14,10 @@ package raft -import "fmt" +import ( + "fmt" + "sort" +) const ( ProgressStateProbe ProgressStateType = iota @@ -283,3 +286,44 @@ func (in *inflights) reset() { in.count = 0 in.start = 0 } + +// prs tracks the currently active configuration and the information known about +// the nodes and learners in it. In particular, it tracks the match index for +// each peer which in turn allows reasoning about the committed index. +type prs struct { + nodes map[uint64]*Progress + learners map[uint64]*Progress + matchBuf uint64Slice +} + +func makePRS() prs { + return prs{ + nodes: map[uint64]*Progress{}, + learners: map[uint64]*Progress{}, + } +} + +func (p *prs) quorum() int { + return len(p.nodes)/2 + 1 +} + +func (p *prs) committed() uint64 { + // Preserving matchBuf across calls is an optimization + // used to avoid allocating a new slice on each call. + if cap(p.matchBuf) < len(p.nodes) { + p.matchBuf = make(uint64Slice, len(p.nodes)) + } + p.matchBuf = p.matchBuf[:len(p.nodes)] + idx := 0 + for _, pr := range p.nodes { + p.matchBuf[idx] = pr.Match + idx++ + } + sort.Sort(&p.matchBuf) + return p.matchBuf[len(p.matchBuf)-p.quorum()] +} + +func (p *prs) removeAny(id uint64) { + delete(p.nodes, id) + delete(p.learners, id) +} diff --git a/raft/raft.go b/raft/raft.go index 76f95f3cf39..edf48860641 100644 --- a/raft/raft.go +++ b/raft/raft.go @@ -262,9 +262,7 @@ type raft struct { maxMsgSize uint64 maxUncommittedSize uint64 maxInflight int - prs map[uint64]*Progress - learnerPrs map[uint64]*Progress - matchBuf uint64Slice + prs prs state StateType @@ -350,8 +348,7 @@ func newRaft(c *Config) *raft { maxMsgSize: c.MaxSizePerMsg, maxInflight: c.MaxInflightMsgs, maxUncommittedSize: c.MaxUncommittedEntriesSize, - prs: make(map[uint64]*Progress), - learnerPrs: make(map[uint64]*Progress), + prs: makePRS(), electionTimeout: c.ElectionTick, heartbeatTimeout: c.HeartbeatTick, logger: c.Logger, @@ -361,13 +358,13 @@ func newRaft(c *Config) *raft { disableProposalForwarding: c.DisableProposalForwarding, } for _, p := range peers { - r.prs[p] = &Progress{Next: 1, ins: newInflights(r.maxInflight)} + r.prs.nodes[p] = &Progress{Next: 1, ins: newInflights(r.maxInflight)} } for _, p := range learners { - if _, ok := r.prs[p]; ok { + if _, ok := r.prs.nodes[p]; ok { panic(fmt.Sprintf("node %x is in both learner and peer list", p)) } - r.learnerPrs[p] = &Progress{Next: 1, ins: newInflights(r.maxInflight), IsLearner: true} + r.prs.learners[p] = &Progress{Next: 1, ins: newInflights(r.maxInflight), IsLearner: true} if r.id == p { r.isLearner = true } @@ -403,11 +400,9 @@ func (r *raft) hardState() pb.HardState { } } -func (r *raft) quorum() int { return len(r.prs)/2 + 1 } - func (r *raft) nodes() []uint64 { - nodes := make([]uint64, 0, len(r.prs)) - for id := range r.prs { + nodes := make([]uint64, 0, len(r.prs.nodes)) + for id := range r.prs.nodes { nodes = append(nodes, id) } sort.Sort(uint64Slice(nodes)) @@ -415,8 +410,8 @@ func (r *raft) nodes() []uint64 { } func (r *raft) learnerNodes() []uint64 { - nodes := make([]uint64, 0, len(r.learnerPrs)) - for id := range r.learnerPrs { + nodes := make([]uint64, 0, len(r.prs.learners)) + for id := range r.prs.learners { nodes = append(nodes, id) } sort.Sort(uint64Slice(nodes)) @@ -458,11 +453,11 @@ func (r *raft) send(m pb.Message) { } func (r *raft) getProgress(id uint64) *Progress { - if pr, ok := r.prs[id]; ok { + if pr, ok := r.prs.nodes[id]; ok { return pr } - return r.learnerPrs[id] + return r.prs.learners[id] } // sendAppend sends an append RPC with new entries (if any) and the @@ -558,11 +553,11 @@ func (r *raft) sendHeartbeat(to uint64, ctx []byte) { } func (r *raft) forEachProgress(f func(id uint64, pr *Progress)) { - for id, pr := range r.prs { + for id, pr := range r.prs.nodes { f(id, pr) } - for id, pr := range r.learnerPrs { + for id, pr := range r.prs.learners { f(id, pr) } } @@ -602,19 +597,7 @@ func (r *raft) bcastHeartbeatWithCtx(ctx []byte) { // the commit index changed (in which case the caller should call // r.bcastAppend). func (r *raft) maybeCommit() bool { - // Preserving matchBuf across calls is an optimization - // used to avoid allocating a new slice on each call. - if cap(r.matchBuf) < len(r.prs) { - r.matchBuf = make(uint64Slice, len(r.prs)) - } - r.matchBuf = r.matchBuf[:len(r.prs)] - idx := 0 - for _, p := range r.prs { - r.matchBuf[idx] = p.Match - idx++ - } - sort.Sort(&r.matchBuf) - mci := r.matchBuf[len(r.matchBuf)-r.quorum()] + mci := r.prs.committed() return r.raftLog.maybeCommit(mci, r.Term) } @@ -755,7 +738,7 @@ func (r *raft) becomeLeader() { // (perhaps after having received a snapshot as a result). The leader is // trivially in this state. Note that r.reset() has initialized this // progress with the last index already. - r.prs[r.id].becomeReplicate() + r.prs.nodes[r.id].becomeReplicate() // Conservatively set the pendingConfIndex to the last index in the // log. There may or may not be a pending config change, but it's @@ -790,7 +773,7 @@ func (r *raft) campaign(t CampaignType) { voteMsg = pb.MsgVote term = r.Term } - if r.quorum() == r.poll(r.id, voteRespMsgType(voteMsg), true) { + if r.prs.quorum() == r.poll(r.id, voteRespMsgType(voteMsg), true) { // We won the election after voting for ourselves (which must mean that // this is a single-node cluster). Advance to the next state. if t == campaignPreElection { @@ -800,7 +783,7 @@ func (r *raft) campaign(t CampaignType) { } return } - for id := range r.prs { + for id := range r.prs.nodes { if id == r.id { continue } @@ -994,7 +977,7 @@ func stepLeader(r *raft, m pb.Message) error { if len(m.Entries) == 0 { r.logger.Panicf("%x stepped empty MsgProp", r.id) } - if _, ok := r.prs[r.id]; !ok { + if _, ok := r.prs.nodes[r.id]; !ok { // If we are not currently a member of the range (i.e. this node // was removed from the configuration while serving as leader), // drop any new proposals. @@ -1024,7 +1007,7 @@ func stepLeader(r *raft, m pb.Message) error { r.bcastAppend() return nil case pb.MsgReadIndex: - if r.quorum() > 1 { + if r.prs.quorum() > 1 { if r.raftLog.zeroTermOnErrCompacted(r.raftLog.term(r.raftLog.committed)) != r.Term { // Reject read only request when this leader has not committed any log entry at its term. return nil @@ -1134,7 +1117,7 @@ func stepLeader(r *raft, m pb.Message) error { } ackCount := r.readOnly.recvAck(m) - if ackCount < r.quorum() { + if ackCount < r.prs.quorum() { return nil } @@ -1232,8 +1215,8 @@ func stepCandidate(r *raft, m pb.Message) error { r.handleSnapshot(m) case myVoteRespType: gr := r.poll(m.From, m.Type, !m.Reject) - r.logger.Infof("%x [quorum:%d] has received %d %s votes and %d vote rejections", r.id, r.quorum(), gr, m.Type, len(r.votes)-gr) - switch r.quorum() { + r.logger.Infof("%x [quorum:%d] has received %d %s votes and %d vote rejections", r.id, r.prs.quorum(), gr, m.Type, len(r.votes)-gr) + switch r.prs.quorum() { case gr: if r.state == StatePreCandidate { r.campaign(campaignElection) @@ -1370,8 +1353,8 @@ func (r *raft) restore(s pb.Snapshot) bool { r.id, r.raftLog.committed, r.raftLog.lastIndex(), r.raftLog.lastTerm(), s.Metadata.Index, s.Metadata.Term) r.raftLog.restore(s) - r.prs = make(map[uint64]*Progress) - r.learnerPrs = make(map[uint64]*Progress) + r.prs.nodes = make(map[uint64]*Progress) + r.prs.learners = make(map[uint64]*Progress) r.restoreNode(s.Metadata.ConfState.Nodes, false) r.restoreNode(s.Metadata.ConfState.Learners, true) return true @@ -1392,7 +1375,7 @@ func (r *raft) restoreNode(nodes []uint64, isLearner bool) { // promotable indicates whether state machine can be promoted to leader, // which is true when its own id is in progress list. func (r *raft) promotable() bool { - _, ok := r.prs[r.id] + _, ok := r.prs.nodes[r.id] return ok } @@ -1422,9 +1405,9 @@ func (r *raft) addNodeOrLearnerNode(id uint64, isLearner bool) { } // change Learner to Voter, use origin Learner progress - delete(r.learnerPrs, id) + delete(r.prs.learners, id) pr.IsLearner = false - r.prs[id] = pr + r.prs.nodes[id] = pr } if r.id == id { @@ -1439,10 +1422,10 @@ func (r *raft) addNodeOrLearnerNode(id uint64, isLearner bool) { } func (r *raft) removeNode(id uint64) { - r.delProgress(id) + r.prs.removeAny(id) // do not try to commit or abort transferring if there is no nodes in the cluster. - if len(r.prs) == 0 && len(r.learnerPrs) == 0 { + if len(r.prs.nodes) == 0 && len(r.prs.learners) == 0 { return } @@ -1459,20 +1442,15 @@ func (r *raft) removeNode(id uint64) { func (r *raft) setProgress(id, match, next uint64, isLearner bool) { if !isLearner { - delete(r.learnerPrs, id) - r.prs[id] = &Progress{Next: next, Match: match, ins: newInflights(r.maxInflight)} + delete(r.prs.learners, id) + r.prs.nodes[id] = &Progress{Next: next, Match: match, ins: newInflights(r.maxInflight)} return } - if _, ok := r.prs[id]; ok { + if _, ok := r.prs.nodes[id]; ok { panic(fmt.Sprintf("%x unexpected changing from voter to learner for %x", r.id, id)) } - r.learnerPrs[id] = &Progress{Next: next, Match: match, ins: newInflights(r.maxInflight), IsLearner: true} -} - -func (r *raft) delProgress(id uint64) { - delete(r.prs, id) - delete(r.learnerPrs, id) + r.prs.learners[id] = &Progress{Next: next, Match: match, ins: newInflights(r.maxInflight), IsLearner: true} } func (r *raft) loadState(state pb.HardState) { @@ -1515,7 +1493,7 @@ func (r *raft) checkQuorumActive() bool { pr.RecentActive = false }) - return act >= r.quorum() + return act >= r.prs.quorum() } func (r *raft) sendTimeoutNow(to uint64) { diff --git a/raft/raft_flow_control_test.go b/raft/raft_flow_control_test.go index 070ed8d9551..15934844574 100644 --- a/raft/raft_flow_control_test.go +++ b/raft/raft_flow_control_test.go @@ -29,7 +29,7 @@ func TestMsgAppFlowControlFull(t *testing.T) { r.becomeCandidate() r.becomeLeader() - pr2 := r.prs[2] + pr2 := r.prs.nodes[2] // force the progress to be in replicate state pr2.becomeReplicate() // fill in the inflights window @@ -65,7 +65,7 @@ func TestMsgAppFlowControlMoveForward(t *testing.T) { r.becomeCandidate() r.becomeLeader() - pr2 := r.prs[2] + pr2 := r.prs.nodes[2] // force the progress to be in replicate state pr2.becomeReplicate() // fill in the inflights window @@ -110,7 +110,7 @@ func TestMsgAppFlowControlRecvHeartbeat(t *testing.T) { r.becomeCandidate() r.becomeLeader() - pr2 := r.prs[2] + pr2 := r.prs.nodes[2] // force the progress to be in replicate state pr2.becomeReplicate() // fill in the inflights window diff --git a/raft/raft_snap_test.go b/raft/raft_snap_test.go index ed545b4b51c..358c85b8e29 100644 --- a/raft/raft_snap_test.go +++ b/raft/raft_snap_test.go @@ -40,11 +40,11 @@ func TestSendingSnapshotSetPendingSnapshot(t *testing.T) { // force set the next of node 2, so that // node 2 needs a snapshot - sm.prs[2].Next = sm.raftLog.firstIndex() + sm.prs.nodes[2].Next = sm.raftLog.firstIndex() - sm.Step(pb.Message{From: 2, To: 1, Type: pb.MsgAppResp, Index: sm.prs[2].Next - 1, Reject: true}) - if sm.prs[2].PendingSnapshot != 11 { - t.Fatalf("PendingSnapshot = %d, want 11", sm.prs[2].PendingSnapshot) + sm.Step(pb.Message{From: 2, To: 1, Type: pb.MsgAppResp, Index: sm.prs.nodes[2].Next - 1, Reject: true}) + if sm.prs.nodes[2].PendingSnapshot != 11 { + t.Fatalf("PendingSnapshot = %d, want 11", sm.prs.nodes[2].PendingSnapshot) } } @@ -56,7 +56,7 @@ func TestPendingSnapshotPauseReplication(t *testing.T) { sm.becomeCandidate() sm.becomeLeader() - sm.prs[2].becomeSnapshot(11) + sm.prs.nodes[2].becomeSnapshot(11) sm.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}}) msgs := sm.readMessages() @@ -73,18 +73,18 @@ func TestSnapshotFailure(t *testing.T) { sm.becomeCandidate() sm.becomeLeader() - sm.prs[2].Next = 1 - sm.prs[2].becomeSnapshot(11) + sm.prs.nodes[2].Next = 1 + sm.prs.nodes[2].becomeSnapshot(11) sm.Step(pb.Message{From: 2, To: 1, Type: pb.MsgSnapStatus, Reject: true}) - if sm.prs[2].PendingSnapshot != 0 { - t.Fatalf("PendingSnapshot = %d, want 0", sm.prs[2].PendingSnapshot) + if sm.prs.nodes[2].PendingSnapshot != 0 { + t.Fatalf("PendingSnapshot = %d, want 0", sm.prs.nodes[2].PendingSnapshot) } - if sm.prs[2].Next != 1 { - t.Fatalf("Next = %d, want 1", sm.prs[2].Next) + if sm.prs.nodes[2].Next != 1 { + t.Fatalf("Next = %d, want 1", sm.prs.nodes[2].Next) } - if !sm.prs[2].Paused { - t.Errorf("Paused = %v, want true", sm.prs[2].Paused) + if !sm.prs.nodes[2].Paused { + t.Errorf("Paused = %v, want true", sm.prs.nodes[2].Paused) } } @@ -96,18 +96,18 @@ func TestSnapshotSucceed(t *testing.T) { sm.becomeCandidate() sm.becomeLeader() - sm.prs[2].Next = 1 - sm.prs[2].becomeSnapshot(11) + sm.prs.nodes[2].Next = 1 + sm.prs.nodes[2].becomeSnapshot(11) sm.Step(pb.Message{From: 2, To: 1, Type: pb.MsgSnapStatus, Reject: false}) - if sm.prs[2].PendingSnapshot != 0 { - t.Fatalf("PendingSnapshot = %d, want 0", sm.prs[2].PendingSnapshot) + if sm.prs.nodes[2].PendingSnapshot != 0 { + t.Fatalf("PendingSnapshot = %d, want 0", sm.prs.nodes[2].PendingSnapshot) } - if sm.prs[2].Next != 12 { - t.Fatalf("Next = %d, want 12", sm.prs[2].Next) + if sm.prs.nodes[2].Next != 12 { + t.Fatalf("Next = %d, want 12", sm.prs.nodes[2].Next) } - if !sm.prs[2].Paused { - t.Errorf("Paused = %v, want true", sm.prs[2].Paused) + if !sm.prs.nodes[2].Paused { + t.Errorf("Paused = %v, want true", sm.prs.nodes[2].Paused) } } @@ -206,7 +206,7 @@ func TestSnapshotSucceedViaAppResp(t *testing.T) { mustSend(n2, n1, pb.MsgAppResp) // Leader has correct state for follower. - pr := n1.prs[2] + pr := n1.prs.nodes[2] if pr.State != ProgressStateReplicate { t.Fatalf("unexpected state %v", pr) } @@ -227,23 +227,23 @@ func TestSnapshotAbort(t *testing.T) { sm.becomeCandidate() sm.becomeLeader() - sm.prs[2].Next = 1 - sm.prs[2].becomeSnapshot(11) + sm.prs.nodes[2].Next = 1 + sm.prs.nodes[2].becomeSnapshot(11) // A successful msgAppResp that has a higher/equal index than the // pending snapshot should abort the pending snapshot. sm.Step(pb.Message{From: 2, To: 1, Type: pb.MsgAppResp, Index: 11}) - if sm.prs[2].PendingSnapshot != 0 { - t.Fatalf("PendingSnapshot = %d, want 0", sm.prs[2].PendingSnapshot) + if sm.prs.nodes[2].PendingSnapshot != 0 { + t.Fatalf("PendingSnapshot = %d, want 0", sm.prs.nodes[2].PendingSnapshot) } // The follower entered ProgressStateReplicate and the leader send an append // and optimistically updated the progress (so we see 13 instead of 12). // There is something to append because the leader appended an empty entry // to the log at index 12 when it assumed leadership. - if sm.prs[2].Next != 13 { - t.Fatalf("Next = %d, want 13", sm.prs[2].Next) + if sm.prs.nodes[2].Next != 13 { + t.Fatalf("Next = %d, want 13", sm.prs.nodes[2].Next) } - if n := sm.prs[2].ins.count; n != 1 { + if n := sm.prs.nodes[2].ins.count; n != 1 { t.Fatalf("expected an inflight message, got %d", n) } } diff --git a/raft/raft_test.go b/raft/raft_test.go index df808eb6f1c..7fdcdb28fd6 100644 --- a/raft/raft_test.go +++ b/raft/raft_test.go @@ -271,12 +271,12 @@ func TestProgressLeader(t *testing.T) { r := newTestRaft(1, []uint64{1, 2}, 5, 1, NewMemoryStorage()) r.becomeCandidate() r.becomeLeader() - r.prs[2].becomeReplicate() + r.prs.nodes[2].becomeReplicate() // Send proposals to r1. The first 5 entries should be appended to the log. propMsg := pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("foo")}}} for i := 0; i < 5; i++ { - if pr := r.prs[r.id]; pr.State != ProgressStateReplicate || pr.Match != uint64(i+1) || pr.Next != pr.Match+1 { + if pr := r.prs.nodes[r.id]; pr.State != ProgressStateReplicate || pr.Match != uint64(i+1) || pr.Next != pr.Match+1 { t.Errorf("unexpected progress %v", pr) } if err := r.Step(propMsg); err != nil { @@ -291,17 +291,17 @@ func TestProgressResumeByHeartbeatResp(t *testing.T) { r.becomeCandidate() r.becomeLeader() - r.prs[2].Paused = true + r.prs.nodes[2].Paused = true r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgBeat}) - if !r.prs[2].Paused { - t.Errorf("paused = %v, want true", r.prs[2].Paused) + if !r.prs.nodes[2].Paused { + t.Errorf("paused = %v, want true", r.prs.nodes[2].Paused) } - r.prs[2].becomeReplicate() + r.prs.nodes[2].becomeReplicate() r.Step(pb.Message{From: 2, To: 1, Type: pb.MsgHeartbeatResp}) - if r.prs[2].Paused { - t.Errorf("paused = %v, want false", r.prs[2].Paused) + if r.prs.nodes[2].Paused { + t.Errorf("paused = %v, want false", r.prs.nodes[2].Paused) } } @@ -331,7 +331,7 @@ func TestProgressFlowControl(t *testing.T) { r.readMessages() // While node 2 is in probe state, propose a bunch of entries. - r.prs[2].becomeProbe() + r.prs.nodes[2].becomeProbe() blob := []byte(strings.Repeat("a", 1000)) for i := 0; i < 10; i++ { r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: blob}}}) @@ -409,8 +409,8 @@ func TestUncommittedEntryLimit(t *testing.T) { // Set the two followers to the replicate state. Commit to tail of log. const numFollowers = 2 - r.prs[2].becomeReplicate() - r.prs[3].becomeReplicate() + r.prs.nodes[2].becomeReplicate() + r.prs.nodes[3].becomeReplicate() r.uncommittedSize = 0 // Send proposals to r1. The first 5 entries should be appended to the log. @@ -2137,7 +2137,7 @@ func TestNonPromotableVoterWithCheckQuorum(t *testing.T) { nt := newNetwork(a, b) setRandomizedElectionTimeout(b, b.electionTimeout+1) // Need to remove 2 again to make it a non-promotable node since newNetwork overwritten some internal states - b.delProgress(2) + b.prs.removeAny(2) if b.promotable() { t.Fatalf("promotable = %v, want false", b.promotable()) @@ -2631,7 +2631,7 @@ func TestLeaderAppResp(t *testing.T) { sm.readMessages() sm.Step(pb.Message{From: 2, Type: pb.MsgAppResp, Index: tt.index, Term: sm.Term, Reject: tt.reject, RejectHint: tt.index}) - p := sm.prs[2] + p := sm.prs.nodes[2] if p.Match != tt.wmatch { t.Errorf("#%d match = %d, want %d", i, p.Match, tt.wmatch) } @@ -2678,9 +2678,9 @@ func TestBcastBeat(t *testing.T) { mustAppendEntry(sm, pb.Entry{Index: uint64(i) + 1}) } // slow follower - sm.prs[2].Match, sm.prs[2].Next = 5, 6 + sm.prs.nodes[2].Match, sm.prs.nodes[2].Next = 5, 6 // normal follower - sm.prs[3].Match, sm.prs[3].Next = sm.raftLog.lastIndex(), sm.raftLog.lastIndex()+1 + sm.prs.nodes[3].Match, sm.prs.nodes[3].Next = sm.raftLog.lastIndex(), sm.raftLog.lastIndex()+1 sm.Step(pb.Message{Type: pb.MsgBeat}) msgs := sm.readMessages() @@ -2688,8 +2688,8 @@ func TestBcastBeat(t *testing.T) { t.Fatalf("len(msgs) = %v, want 2", len(msgs)) } wantCommitMap := map[uint64]uint64{ - 2: min(sm.raftLog.committed, sm.prs[2].Match), - 3: min(sm.raftLog.committed, sm.prs[3].Match), + 2: min(sm.raftLog.committed, sm.prs.nodes[2].Match), + 3: min(sm.raftLog.committed, sm.prs.nodes[3].Match), } for i, m := range msgs { if m.Type != pb.MsgHeartbeat { @@ -2775,11 +2775,11 @@ func TestLeaderIncreaseNext(t *testing.T) { sm.raftLog.append(previousEnts...) sm.becomeCandidate() sm.becomeLeader() - sm.prs[2].State = tt.state - sm.prs[2].Next = tt.next + sm.prs.nodes[2].State = tt.state + sm.prs.nodes[2].Next = tt.next sm.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}}) - p := sm.prs[2] + p := sm.prs.nodes[2] if p.Next != tt.wnext { t.Errorf("#%d next = %d, want %d", i, p.Next, tt.wnext) } @@ -2791,7 +2791,7 @@ func TestSendAppendForProgressProbe(t *testing.T) { r.becomeCandidate() r.becomeLeader() r.readMessages() - r.prs[2].becomeProbe() + r.prs.nodes[2].becomeProbe() // each round is a heartbeat for i := 0; i < 3; i++ { @@ -2810,8 +2810,8 @@ func TestSendAppendForProgressProbe(t *testing.T) { } } - if !r.prs[2].Paused { - t.Errorf("paused = %v, want true", r.prs[2].Paused) + if !r.prs.nodes[2].Paused { + t.Errorf("paused = %v, want true", r.prs.nodes[2].Paused) } for j := 0; j < 10; j++ { mustAppendEntry(r, pb.Entry{Data: []byte("somedata")}) @@ -2825,8 +2825,8 @@ func TestSendAppendForProgressProbe(t *testing.T) { for j := 0; j < r.heartbeatTimeout; j++ { r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgBeat}) } - if !r.prs[2].Paused { - t.Errorf("paused = %v, want true", r.prs[2].Paused) + if !r.prs.nodes[2].Paused { + t.Errorf("paused = %v, want true", r.prs.nodes[2].Paused) } // consume the heartbeat @@ -2848,8 +2848,8 @@ func TestSendAppendForProgressProbe(t *testing.T) { if msg[0].Index != 0 { t.Errorf("index = %d, want %d", msg[0].Index, 0) } - if !r.prs[2].Paused { - t.Errorf("paused = %v, want true", r.prs[2].Paused) + if !r.prs.nodes[2].Paused { + t.Errorf("paused = %v, want true", r.prs.nodes[2].Paused) } } @@ -2858,7 +2858,7 @@ func TestSendAppendForProgressReplicate(t *testing.T) { r.becomeCandidate() r.becomeLeader() r.readMessages() - r.prs[2].becomeReplicate() + r.prs.nodes[2].becomeReplicate() for i := 0; i < 10; i++ { mustAppendEntry(r, pb.Entry{Data: []byte("somedata")}) @@ -2875,7 +2875,7 @@ func TestSendAppendForProgressSnapshot(t *testing.T) { r.becomeCandidate() r.becomeLeader() r.readMessages() - r.prs[2].becomeSnapshot(10) + r.prs.nodes[2].becomeSnapshot(10) for i := 0; i < 10; i++ { mustAppendEntry(r, pb.Entry{Data: []byte("somedata")}) @@ -2896,17 +2896,17 @@ func TestRecvMsgUnreachable(t *testing.T) { r.becomeLeader() r.readMessages() // set node 2 to state replicate - r.prs[2].Match = 3 - r.prs[2].becomeReplicate() - r.prs[2].optimisticUpdate(5) + r.prs.nodes[2].Match = 3 + r.prs.nodes[2].becomeReplicate() + r.prs.nodes[2].optimisticUpdate(5) r.Step(pb.Message{From: 2, To: 1, Type: pb.MsgUnreachable}) - if r.prs[2].State != ProgressStateProbe { - t.Errorf("state = %s, want %s", r.prs[2].State, ProgressStateProbe) + if r.prs.nodes[2].State != ProgressStateProbe { + t.Errorf("state = %s, want %s", r.prs.nodes[2].State, ProgressStateProbe) } - if wnext := r.prs[2].Match + 1; r.prs[2].Next != wnext { - t.Errorf("next = %d, want %d", r.prs[2].Next, wnext) + if wnext := r.prs.nodes[2].Match + 1; r.prs.nodes[2].Next != wnext { + t.Errorf("next = %d, want %d", r.prs.nodes[2].Next, wnext) } } @@ -2972,13 +2972,13 @@ func TestRestoreWithLearner(t *testing.T) { t.Errorf("sm.LearnerNodes = %+v, length not equal with %+v", sg, s.Metadata.ConfState.Learners) } for _, n := range s.Metadata.ConfState.Nodes { - if sm.prs[n].IsLearner { - t.Errorf("sm.Node %x isLearner = %s, want %t", n, sm.prs[n], false) + if sm.prs.nodes[n].IsLearner { + t.Errorf("sm.Node %x isLearner = %s, want %t", n, sm.prs.nodes[n], false) } } for _, n := range s.Metadata.ConfState.Learners { - if !sm.learnerPrs[n].IsLearner { - t.Errorf("sm.Node %x isLearner = %s, want %t", n, sm.prs[n], true) + if !sm.prs.learners[n].IsLearner { + t.Errorf("sm.Node %x isLearner = %s, want %t", n, sm.prs.nodes[n], true) } } @@ -3120,8 +3120,8 @@ func TestProvideSnap(t *testing.T) { sm.becomeLeader() // force set the next of node 2, so that node 2 needs a snapshot - sm.prs[2].Next = sm.raftLog.firstIndex() - sm.Step(pb.Message{From: 2, To: 1, Type: pb.MsgAppResp, Index: sm.prs[2].Next - 1, Reject: true}) + sm.prs.nodes[2].Next = sm.raftLog.firstIndex() + sm.Step(pb.Message{From: 2, To: 1, Type: pb.MsgAppResp, Index: sm.prs.nodes[2].Next - 1, Reject: true}) msgs := sm.readMessages() if len(msgs) != 1 { @@ -3151,8 +3151,8 @@ func TestIgnoreProvidingSnap(t *testing.T) { // force set the next of node 2, so that node 2 needs a snapshot // change node 2 to be inactive, expect node 1 ignore sending snapshot to 2 - sm.prs[2].Next = sm.raftLog.firstIndex() - 1 - sm.prs[2].RecentActive = false + sm.prs.nodes[2].Next = sm.raftLog.firstIndex() - 1 + sm.prs.nodes[2].RecentActive = false sm.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}}) @@ -3200,7 +3200,7 @@ func TestSlowNodeRestore(t *testing.T) { // node 3 will only be considered as active when node 1 receives a reply from it. for { nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgBeat}) - if lead.prs[3].RecentActive { + if lead.prs.nodes[3].RecentActive { break } } @@ -3303,8 +3303,8 @@ func TestAddLearner(t *testing.T) { if !reflect.DeepEqual(nodes, wnodes) { t.Errorf("nodes = %v, want %v", nodes, wnodes) } - if !r.learnerPrs[2].IsLearner { - t.Errorf("node 2 is learner %t, want %t", r.prs[2].IsLearner, true) + if !r.prs.learners[2].IsLearner { + t.Errorf("node 2 is learner %t, want %t", r.prs.nodes[2].IsLearner, true) } } @@ -3618,8 +3618,8 @@ func TestLeaderTransferToSlowFollower(t *testing.T) { nt.recover() lead := nt.peers[1].(*raft) - if lead.prs[3].Match != 1 { - t.Fatalf("node 1 has match %x for node 3, want %x", lead.prs[3].Match, 1) + if lead.prs.nodes[3].Match != 1 { + t.Fatalf("node 1 has match %x for node 3, want %x", lead.prs.nodes[3].Match, 1) } // Transfer leadership to 3 when node 3 is lack of log. @@ -3641,8 +3641,8 @@ func TestLeaderTransferAfterSnapshot(t *testing.T) { nt.storage[1].Compact(lead.raftLog.applied) nt.recover() - if lead.prs[3].Match != 1 { - t.Fatalf("node 1 has match %x for node 3, want %x", lead.prs[3].Match, 1) + if lead.prs.nodes[3].Match != 1 { + t.Fatalf("node 1 has match %x for node 3, want %x", lead.prs.nodes[3].Match, 1) } // Transfer leadership to 3 when node 3 is lack of snapshot. @@ -3721,8 +3721,8 @@ func TestLeaderTransferIgnoreProposal(t *testing.T) { t.Fatalf("should return drop proposal error while transferring") } - if lead.prs[1].Match != 1 { - t.Fatalf("node 1 has match %x, want %x", lead.prs[1].Match, 1) + if lead.prs.nodes[1].Match != 1 { + t.Fatalf("node 1 has match %x, want %x", lead.prs.nodes[1].Match, 1) } } @@ -4294,18 +4294,18 @@ func newNetworkWithConfig(configFunc func(*Config), peers ...stateMachine) *netw sm := newRaft(cfg) npeers[id] = sm case *raft: - learners := make(map[uint64]bool, len(v.learnerPrs)) - for i := range v.learnerPrs { + learners := make(map[uint64]bool, len(v.prs.learners)) + for i := range v.prs.learners { learners[i] = true } v.id = id - v.prs = make(map[uint64]*Progress) - v.learnerPrs = make(map[uint64]*Progress) + v.prs.nodes = make(map[uint64]*Progress) + v.prs.learners = make(map[uint64]*Progress) for i := 0; i < size; i++ { if _, ok := learners[peerAddrs[i]]; ok { - v.learnerPrs[peerAddrs[i]] = &Progress{IsLearner: true} + v.prs.learners[peerAddrs[i]] = &Progress{IsLearner: true} } else { - v.prs[peerAddrs[i]] = &Progress{} + v.prs.nodes[peerAddrs[i]] = &Progress{} } } v.reset(v.Term) diff --git a/raft/rawnode.go b/raft/rawnode.go index 7ab42ca1c25..8a2c3d16b58 100644 --- a/raft/rawnode.go +++ b/raft/rawnode.go @@ -257,12 +257,12 @@ const ( // WithProgress is a helper to introspect the Progress for this node and its // peers. func (rn *RawNode) WithProgress(visitor func(id uint64, typ ProgressType, pr Progress)) { - for id, pr := range rn.raft.prs { + for id, pr := range rn.raft.prs.nodes { pr := *pr pr.ins = nil visitor(id, ProgressTypePeer, pr) } - for id, pr := range rn.raft.learnerPrs { + for id, pr := range rn.raft.prs.learners { pr := *pr pr.ins = nil visitor(id, ProgressTypeLearner, pr) diff --git a/raft/status.go b/raft/status.go index 68a9b1a4487..3e06c343676 100644 --- a/raft/status.go +++ b/raft/status.go @@ -34,11 +34,11 @@ type Status struct { func getProgressCopy(r *raft) map[uint64]Progress { prs := make(map[uint64]Progress) - for id, p := range r.prs { + for id, p := range r.prs.nodes { prs[id] = *p } - for id, p := range r.learnerPrs { + for id, p := range r.prs.learners { prs[id] = *p } return prs From ea82b2b7584475007465ce4d2fbcb5972df9d24f Mon Sep 17 00:00:00 2001 From: Tobias Schottdorf Date: Fri, 26 Apr 2019 12:07:29 +0200 Subject: [PATCH 2/9] raft: move more methods onto the progress tracker Continues what was initiated in the last commit. --- raft/node.go | 10 +-- raft/progress.go | 69 +++++++++++++++++++-- raft/raft.go | 110 +++++++++------------------------ raft/raft_flow_control_test.go | 8 +-- raft/raft_test.go | 32 +++++----- raft/rawnode.go | 6 +- raft/status.go | 19 +++--- 7 files changed, 131 insertions(+), 123 deletions(-) diff --git a/raft/node.go b/raft/node.go index 19ba37600fa..5b2600fde26 100644 --- a/raft/node.go +++ b/raft/node.go @@ -353,15 +353,15 @@ func (n *node) run(r *raft) { } case m := <-n.recvc: // filter out response message from unknown From. - if pr := r.getProgress(m.From); pr != nil || !IsResponseMsg(m.Type) { + if pr := r.prs.getProgress(m.From); pr != nil || !IsResponseMsg(m.Type) { r.Step(m) } case cc := <-n.confc: if cc.NodeID == None { select { case n.confstatec <- pb.ConfState{ - Nodes: r.nodes(), - Learners: r.learnerNodes()}: + Nodes: r.prs.voterNodes(), + Learners: r.prs.learnerNodes()}: case <-n.done: } break @@ -384,8 +384,8 @@ func (n *node) run(r *raft) { } select { case n.confstatec <- pb.ConfState{ - Nodes: r.nodes(), - Learners: r.learnerNodes()}: + Nodes: r.prs.voterNodes(), + Learners: r.prs.learnerNodes()}: case <-n.done: } case <-n.tickc: diff --git a/raft/progress.go b/raft/progress.go index 45dff6a2e24..1e5e4a3c1ba 100644 --- a/raft/progress.go +++ b/raft/progress.go @@ -291,15 +291,17 @@ func (in *inflights) reset() { // the nodes and learners in it. In particular, it tracks the match index for // each peer which in turn allows reasoning about the committed index. type prs struct { - nodes map[uint64]*Progress - learners map[uint64]*Progress - matchBuf uint64Slice + nodes map[uint64]*Progress + learners map[uint64]*Progress + maxInflight int + matchBuf uint64Slice } -func makePRS() prs { +func makePRS(maxInflight int) prs { return prs{ - nodes: map[uint64]*Progress{}, - learners: map[uint64]*Progress{}, + nodes: map[uint64]*Progress{}, + learners: map[uint64]*Progress{}, + maxInflight: maxInflight, } } @@ -307,6 +309,8 @@ func (p *prs) quorum() int { return len(p.nodes)/2 + 1 } +// committed returns the largest log index known to be committed based on what +// the voting members of the group have acknowledged. func (p *prs) committed() uint64 { // Preserving matchBuf across calls is an optimization // used to avoid allocating a new slice on each call. @@ -327,3 +331,56 @@ func (p *prs) removeAny(id uint64) { delete(p.nodes, id) delete(p.learners, id) } + +func (p *prs) getProgress(id uint64) *Progress { + if pr, ok := p.nodes[id]; ok { + return pr + } + + return p.learners[id] +} + +// initProgress initializes a new progress for the given node, replacing any that +// may exist. It is invalid to replace a voter by a learner and attempts to do so +// will result in a panic. +func (p *prs) initProgress(id, match, next uint64, isLearner bool) { + if !isLearner { + delete(p.learners, id) + p.nodes[id] = &Progress{Next: next, Match: match, ins: newInflights(p.maxInflight)} + return + } + + if _, ok := p.nodes[id]; ok { + panic(fmt.Sprintf("changing from voter to learner for %x", id)) + } + p.learners[id] = &Progress{Next: next, Match: match, ins: newInflights(p.maxInflight), IsLearner: true} +} + +func (p *prs) voterNodes() []uint64 { + nodes := make([]uint64, 0, len(p.nodes)) + for id := range p.nodes { + nodes = append(nodes, id) + } + sort.Sort(uint64Slice(nodes)) + return nodes +} + +func (p *prs) learnerNodes() []uint64 { + nodes := make([]uint64, 0, len(p.learners)) + for id := range p.learners { + nodes = append(nodes, id) + } + sort.Sort(uint64Slice(nodes)) + return nodes +} + +// visit invokes the supplied closure for all tracked progresses. +func (p *prs) visit(f func(id uint64, pr *Progress)) { + for id, pr := range p.nodes { + f(id, pr) + } + + for id, pr := range p.learners { + f(id, pr) + } +} diff --git a/raft/raft.go b/raft/raft.go index edf48860641..21346e36775 100644 --- a/raft/raft.go +++ b/raft/raft.go @@ -20,7 +20,6 @@ import ( "fmt" "math" "math/rand" - "sort" "strings" "sync" "time" @@ -261,7 +260,6 @@ type raft struct { maxMsgSize uint64 maxUncommittedSize uint64 - maxInflight int prs prs state StateType @@ -346,9 +344,8 @@ func newRaft(c *Config) *raft { isLearner: false, raftLog: raftlog, maxMsgSize: c.MaxSizePerMsg, - maxInflight: c.MaxInflightMsgs, maxUncommittedSize: c.MaxUncommittedEntriesSize, - prs: makePRS(), + prs: makePRS(c.MaxInflightMsgs), electionTimeout: c.ElectionTick, heartbeatTimeout: c.HeartbeatTick, logger: c.Logger, @@ -358,13 +355,13 @@ func newRaft(c *Config) *raft { disableProposalForwarding: c.DisableProposalForwarding, } for _, p := range peers { - r.prs.nodes[p] = &Progress{Next: 1, ins: newInflights(r.maxInflight)} + // Add node to active config. + r.prs.initProgress(p, 0 /* match */, 1 /* next */, false /* isLearner */) } for _, p := range learners { - if _, ok := r.prs.nodes[p]; ok { - panic(fmt.Sprintf("node %x is in both learner and peer list", p)) - } - r.prs.learners[p] = &Progress{Next: 1, ins: newInflights(r.maxInflight), IsLearner: true} + // Add learner to active config. + r.prs.initProgress(p, 0 /* match */, 1 /* next */, true /* isLearner */) + if r.id == p { r.isLearner = true } @@ -379,7 +376,7 @@ func newRaft(c *Config) *raft { r.becomeFollower(r.Term, None) var nodesStrs []string - for _, n := range r.nodes() { + for _, n := range r.prs.voterNodes() { nodesStrs = append(nodesStrs, fmt.Sprintf("%x", n)) } @@ -400,24 +397,6 @@ func (r *raft) hardState() pb.HardState { } } -func (r *raft) nodes() []uint64 { - nodes := make([]uint64, 0, len(r.prs.nodes)) - for id := range r.prs.nodes { - nodes = append(nodes, id) - } - sort.Sort(uint64Slice(nodes)) - return nodes -} - -func (r *raft) learnerNodes() []uint64 { - nodes := make([]uint64, 0, len(r.prs.learners)) - for id := range r.prs.learners { - nodes = append(nodes, id) - } - sort.Sort(uint64Slice(nodes)) - return nodes -} - // send persists state to stable storage and then sends to its mailbox. func (r *raft) send(m pb.Message) { m.From = r.id @@ -452,14 +431,6 @@ func (r *raft) send(m pb.Message) { r.msgs = append(r.msgs, m) } -func (r *raft) getProgress(id uint64) *Progress { - if pr, ok := r.prs.nodes[id]; ok { - return pr - } - - return r.prs.learners[id] -} - // sendAppend sends an append RPC with new entries (if any) and the // current commit index to the given peer. func (r *raft) sendAppend(to uint64) { @@ -472,7 +443,7 @@ func (r *raft) sendAppend(to uint64) { // ("empty" messages are useful to convey updated Commit indexes, but // are undesirable when we're sending multiple messages in a batch). func (r *raft) maybeSendAppend(to uint64, sendIfEmpty bool) bool { - pr := r.getProgress(to) + pr := r.prs.getProgress(to) if pr.IsPaused() { return false } @@ -541,7 +512,7 @@ func (r *raft) sendHeartbeat(to uint64, ctx []byte) { // or it might not have all the committed entries. // The leader MUST NOT forward the follower's commit to // an unmatched index. - commit := min(r.getProgress(to).Match, r.raftLog.committed) + commit := min(r.prs.getProgress(to).Match, r.raftLog.committed) m := pb.Message{ To: to, Type: pb.MsgHeartbeat, @@ -552,20 +523,10 @@ func (r *raft) sendHeartbeat(to uint64, ctx []byte) { r.send(m) } -func (r *raft) forEachProgress(f func(id uint64, pr *Progress)) { - for id, pr := range r.prs.nodes { - f(id, pr) - } - - for id, pr := range r.prs.learners { - f(id, pr) - } -} - // bcastAppend sends RPC, with entries to all peers that are not up-to-date // according to the progress recorded in r.prs. func (r *raft) bcastAppend() { - r.forEachProgress(func(id uint64, _ *Progress) { + r.prs.visit(func(id uint64, _ *Progress) { if id == r.id { return } @@ -585,7 +546,7 @@ func (r *raft) bcastHeartbeat() { } func (r *raft) bcastHeartbeatWithCtx(ctx []byte) { - r.forEachProgress(func(id uint64, _ *Progress) { + r.prs.visit(func(id uint64, _ *Progress) { if id == r.id { return } @@ -615,8 +576,8 @@ func (r *raft) reset(term uint64) { r.abortLeaderTransfer() r.votes = make(map[uint64]bool) - r.forEachProgress(func(id uint64, pr *Progress) { - *pr = Progress{Next: r.raftLog.lastIndex() + 1, ins: newInflights(r.maxInflight), IsLearner: pr.IsLearner} + r.prs.visit(func(id uint64, pr *Progress) { + *pr = Progress{Next: r.raftLog.lastIndex() + 1, ins: newInflights(r.prs.maxInflight), IsLearner: pr.IsLearner} if id == r.id { pr.Match = r.raftLog.lastIndex() } @@ -644,7 +605,7 @@ func (r *raft) appendEntry(es ...pb.Entry) (accepted bool) { } // use latest "last" index after truncate/append li = r.raftLog.append(es...) - r.getProgress(r.id).maybeUpdate(li) + r.prs.getProgress(r.id).maybeUpdate(li) // Regardless of maybeCommit's return, our caller will call bcastAppend. r.maybeCommit() return true @@ -738,7 +699,7 @@ func (r *raft) becomeLeader() { // (perhaps after having received a snapshot as a result). The leader is // trivially in this state. Note that r.reset() has initialized this // progress with the last index already. - r.prs.nodes[r.id].becomeReplicate() + r.prs.getProgress(r.id).becomeReplicate() // Conservatively set the pendingConfIndex to the last index in the // log. There may or may not be a pending config change, but it's @@ -1040,7 +1001,7 @@ func stepLeader(r *raft, m pb.Message) error { } // All other message types require a progress for m.From (pr). - pr := r.getProgress(m.From) + pr := r.prs.getProgress(m.From) if pr == nil { r.logger.Debugf("%x no progress available for %x", r.id, m.From) return nil @@ -1367,16 +1328,16 @@ func (r *raft) restoreNode(nodes []uint64, isLearner bool) { match = next - 1 r.isLearner = isLearner } - r.setProgress(n, match, next, isLearner) - r.logger.Infof("%x restored progress of %x [%s]", r.id, n, r.getProgress(n)) + r.prs.initProgress(n, match, next, isLearner) + r.logger.Infof("%x restored progress of %x [%s]", r.id, n, r.prs.getProgress(n)) } } // promotable indicates whether state machine can be promoted to leader, // which is true when its own id is in progress list. func (r *raft) promotable() bool { - _, ok := r.prs.nodes[r.id] - return ok + pr := r.prs.getProgress(r.id) + return pr != nil && !pr.IsLearner } func (r *raft) addNode(id uint64) { @@ -1388,12 +1349,12 @@ func (r *raft) addLearner(id uint64) { } func (r *raft) addNodeOrLearnerNode(id uint64, isLearner bool) { - pr := r.getProgress(id) + pr := r.prs.getProgress(id) if pr == nil { - r.setProgress(id, 0, r.raftLog.lastIndex()+1, isLearner) + r.prs.initProgress(id, 0, r.raftLog.lastIndex()+1, isLearner) } else { if isLearner && !pr.IsLearner { - // can only change Learner to Voter + // Can only change Learner to Voter. r.logger.Infof("%x ignored addLearner: do not support changing %x from raft peer to learner.", r.id, id) return } @@ -1404,10 +1365,11 @@ func (r *raft) addNodeOrLearnerNode(id uint64, isLearner bool) { return } - // change Learner to Voter, use origin Learner progress - delete(r.prs.learners, id) + // Change Learner to Voter, use origin Learner progress. + r.prs.removeAny(id) + r.prs.initProgress(id, 0 /* match */, 1 /* next */, false /* isLearner */) pr.IsLearner = false - r.prs.nodes[id] = pr + *r.prs.getProgress(id) = *pr } if r.id == id { @@ -1417,8 +1379,7 @@ func (r *raft) addNodeOrLearnerNode(id uint64, isLearner bool) { // When a node is first added, we should mark it as recently active. // Otherwise, CheckQuorum may cause us to step down if it is invoked // before the added node has a chance to communicate with us. - pr = r.getProgress(id) - pr.RecentActive = true + r.prs.getProgress(id).RecentActive = true } func (r *raft) removeNode(id uint64) { @@ -1440,19 +1401,6 @@ func (r *raft) removeNode(id uint64) { } } -func (r *raft) setProgress(id, match, next uint64, isLearner bool) { - if !isLearner { - delete(r.prs.learners, id) - r.prs.nodes[id] = &Progress{Next: next, Match: match, ins: newInflights(r.maxInflight)} - return - } - - if _, ok := r.prs.nodes[id]; ok { - panic(fmt.Sprintf("%x unexpected changing from voter to learner for %x", r.id, id)) - } - r.prs.learners[id] = &Progress{Next: next, Match: match, ins: newInflights(r.maxInflight), IsLearner: true} -} - func (r *raft) loadState(state pb.HardState) { if state.Commit < r.raftLog.committed || state.Commit > r.raftLog.lastIndex() { r.logger.Panicf("%x state.commit %d is out of range [%d, %d]", r.id, state.Commit, r.raftLog.committed, r.raftLog.lastIndex()) @@ -1480,7 +1428,7 @@ func (r *raft) resetRandomizedElectionTimeout() { func (r *raft) checkQuorumActive() bool { var act int - r.forEachProgress(func(id uint64, pr *Progress) { + r.prs.visit(func(id uint64, pr *Progress) { if id == r.id { // self is always active act++ return diff --git a/raft/raft_flow_control_test.go b/raft/raft_flow_control_test.go index 15934844574..01d4dd7a66f 100644 --- a/raft/raft_flow_control_test.go +++ b/raft/raft_flow_control_test.go @@ -33,7 +33,7 @@ func TestMsgAppFlowControlFull(t *testing.T) { // force the progress to be in replicate state pr2.becomeReplicate() // fill in the inflights window - for i := 0; i < r.maxInflight; i++ { + for i := 0; i < r.prs.maxInflight; i++ { r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}}) ms := r.readMessages() if len(ms) != 1 { @@ -69,14 +69,14 @@ func TestMsgAppFlowControlMoveForward(t *testing.T) { // force the progress to be in replicate state pr2.becomeReplicate() // fill in the inflights window - for i := 0; i < r.maxInflight; i++ { + for i := 0; i < r.prs.maxInflight; i++ { r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}}) r.readMessages() } // 1 is noop, 2 is the first proposal we just sent. // so we start with 2. - for tt := 2; tt < r.maxInflight; tt++ { + for tt := 2; tt < r.prs.maxInflight; tt++ { // move forward the window r.Step(pb.Message{From: 2, To: 1, Type: pb.MsgAppResp, Index: uint64(tt)}) r.readMessages() @@ -114,7 +114,7 @@ func TestMsgAppFlowControlRecvHeartbeat(t *testing.T) { // force the progress to be in replicate state pr2.becomeReplicate() // fill in the inflights window - for i := 0; i < r.maxInflight; i++ { + for i := 0; i < r.prs.maxInflight; i++ { r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}}) r.readMessages() } diff --git a/raft/raft_test.go b/raft/raft_test.go index 7fdcdb28fd6..48441fba328 100644 --- a/raft/raft_test.go +++ b/raft/raft_test.go @@ -889,7 +889,7 @@ func TestLearnerLogReplication(t *testing.T) { t.Errorf("peer 2 wants committed to %d, but still %d", n1.raftLog.committed, n2.raftLog.committed) } - match := n1.getProgress(2).Match + match := n1.prs.getProgress(2).Match if match != n2.raftLog.committed { t.Errorf("progress 2 of leader 1 wants match %d, but got %d", n2.raftLog.committed, match) } @@ -1352,7 +1352,7 @@ func TestCommit(t *testing.T) { sm := newTestRaft(1, []uint64{1}, 10, 2, storage) for j := 0; j < len(tt.matches); j++ { - sm.setProgress(uint64(j)+1, tt.matches[j], tt.matches[j]+1, false) + sm.prs.initProgress(uint64(j)+1, tt.matches[j], tt.matches[j]+1, false) } sm.maybeCommit() if g := sm.raftLog.committed; g != tt.w { @@ -2931,7 +2931,7 @@ func TestRestore(t *testing.T) { if mustTerm(sm.raftLog.term(s.Metadata.Index)) != s.Metadata.Term { t.Errorf("log.lastTerm = %d, want %d", mustTerm(sm.raftLog.term(s.Metadata.Index)), s.Metadata.Term) } - sg := sm.nodes() + sg := sm.prs.voterNodes() if !reflect.DeepEqual(sg, s.Metadata.ConfState.Nodes) { t.Errorf("sm.Nodes = %+v, want %+v", sg, s.Metadata.ConfState.Nodes) } @@ -2963,11 +2963,11 @@ func TestRestoreWithLearner(t *testing.T) { if mustTerm(sm.raftLog.term(s.Metadata.Index)) != s.Metadata.Term { t.Errorf("log.lastTerm = %d, want %d", mustTerm(sm.raftLog.term(s.Metadata.Index)), s.Metadata.Term) } - sg := sm.nodes() + sg := sm.prs.voterNodes() if len(sg) != len(s.Metadata.ConfState.Nodes) { t.Errorf("sm.Nodes = %+v, length not equal with %+v", sg, s.Metadata.ConfState.Nodes) } - lns := sm.learnerNodes() + lns := sm.prs.learnerNodes() if len(lns) != len(s.Metadata.ConfState.Learners) { t.Errorf("sm.LearnerNodes = %+v, length not equal with %+v", sg, s.Metadata.ConfState.Learners) } @@ -3192,7 +3192,7 @@ func TestSlowNodeRestore(t *testing.T) { } lead := nt.peers[1].(*raft) nextEnts(lead, nt.storage[1]) - nt.storage[1].CreateSnapshot(lead.raftLog.applied, &pb.ConfState{Nodes: lead.nodes()}, nil) + nt.storage[1].CreateSnapshot(lead.raftLog.applied, &pb.ConfState{Nodes: lead.prs.voterNodes()}, nil) nt.storage[1].Compact(lead.raftLog.applied) nt.recover() @@ -3287,7 +3287,7 @@ func TestNewLeaderPendingConfig(t *testing.T) { func TestAddNode(t *testing.T) { r := newTestRaft(1, []uint64{1}, 10, 1, NewMemoryStorage()) r.addNode(2) - nodes := r.nodes() + nodes := r.prs.voterNodes() wnodes := []uint64{1, 2} if !reflect.DeepEqual(nodes, wnodes) { t.Errorf("nodes = %v, want %v", nodes, wnodes) @@ -3298,7 +3298,7 @@ func TestAddNode(t *testing.T) { func TestAddLearner(t *testing.T) { r := newTestRaft(1, []uint64{1}, 10, 1, NewMemoryStorage()) r.addLearner(2) - nodes := r.learnerNodes() + nodes := r.prs.learnerNodes() wnodes := []uint64{2} if !reflect.DeepEqual(nodes, wnodes) { t.Errorf("nodes = %v, want %v", nodes, wnodes) @@ -3348,14 +3348,14 @@ func TestRemoveNode(t *testing.T) { r := newTestRaft(1, []uint64{1, 2}, 10, 1, NewMemoryStorage()) r.removeNode(2) w := []uint64{1} - if g := r.nodes(); !reflect.DeepEqual(g, w) { + if g := r.prs.voterNodes(); !reflect.DeepEqual(g, w) { t.Errorf("nodes = %v, want %v", g, w) } // remove all nodes from cluster r.removeNode(1) w = []uint64{} - if g := r.nodes(); !reflect.DeepEqual(g, w) { + if g := r.prs.voterNodes(); !reflect.DeepEqual(g, w) { t.Errorf("nodes = %v, want %v", g, w) } } @@ -3366,18 +3366,18 @@ func TestRemoveLearner(t *testing.T) { r := newTestLearnerRaft(1, []uint64{1}, []uint64{2}, 10, 1, NewMemoryStorage()) r.removeNode(2) w := []uint64{1} - if g := r.nodes(); !reflect.DeepEqual(g, w) { + if g := r.prs.voterNodes(); !reflect.DeepEqual(g, w) { t.Errorf("nodes = %v, want %v", g, w) } w = []uint64{} - if g := r.learnerNodes(); !reflect.DeepEqual(g, w) { + if g := r.prs.learnerNodes(); !reflect.DeepEqual(g, w) { t.Errorf("nodes = %v, want %v", g, w) } // remove all nodes from cluster r.removeNode(1) - if g := r.nodes(); !reflect.DeepEqual(g, w) { + if g := r.prs.voterNodes(); !reflect.DeepEqual(g, w) { t.Errorf("nodes = %v, want %v", g, w) } } @@ -3416,8 +3416,8 @@ func TestRaftNodes(t *testing.T) { } for i, tt := range tests { r := newTestRaft(1, tt.ids, 10, 1, NewMemoryStorage()) - if !reflect.DeepEqual(r.nodes(), tt.wids) { - t.Errorf("#%d: nodes = %+v, want %+v", i, r.nodes(), tt.wids) + if !reflect.DeepEqual(r.prs.voterNodes(), tt.wids) { + t.Errorf("#%d: nodes = %+v, want %+v", i, r.prs.voterNodes(), tt.wids) } } } @@ -3637,7 +3637,7 @@ func TestLeaderTransferAfterSnapshot(t *testing.T) { nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{}}}) lead := nt.peers[1].(*raft) nextEnts(lead, nt.storage[1]) - nt.storage[1].CreateSnapshot(lead.raftLog.applied, &pb.ConfState{Nodes: lead.nodes()}, nil) + nt.storage[1].CreateSnapshot(lead.raftLog.applied, &pb.ConfState{Nodes: lead.prs.voterNodes()}, nil) nt.storage[1].Compact(lead.raftLog.applied) nt.recover() diff --git a/raft/rawnode.go b/raft/rawnode.go index 8a2c3d16b58..0505c9db30b 100644 --- a/raft/rawnode.go +++ b/raft/rawnode.go @@ -166,7 +166,7 @@ func (rn *RawNode) ProposeConfChange(cc pb.ConfChange) error { // ApplyConfChange applies a config change to the local node. func (rn *RawNode) ApplyConfChange(cc pb.ConfChange) *pb.ConfState { if cc.NodeID == None { - return &pb.ConfState{Nodes: rn.raft.nodes(), Learners: rn.raft.learnerNodes()} + return &pb.ConfState{Nodes: rn.raft.prs.voterNodes(), Learners: rn.raft.prs.learnerNodes()} } switch cc.Type { case pb.ConfChangeAddNode: @@ -179,7 +179,7 @@ func (rn *RawNode) ApplyConfChange(cc pb.ConfChange) *pb.ConfState { default: panic("unexpected conf type") } - return &pb.ConfState{Nodes: rn.raft.nodes(), Learners: rn.raft.learnerNodes()} + return &pb.ConfState{Nodes: rn.raft.prs.voterNodes(), Learners: rn.raft.prs.learnerNodes()} } // Step advances the state machine using the given message. @@ -188,7 +188,7 @@ func (rn *RawNode) Step(m pb.Message) error { if IsLocalMsg(m.Type) { return ErrStepLocalMsg } - if pr := rn.raft.getProgress(m.From); pr != nil || !IsResponseMsg(m.Type) { + if pr := rn.raft.prs.getProgress(m.From); pr != nil || !IsResponseMsg(m.Type) { return rn.raft.Step(m) } return ErrStepPeerNotFound diff --git a/raft/status.go b/raft/status.go index 3e06c343676..f894716e190 100644 --- a/raft/status.go +++ b/raft/status.go @@ -33,15 +33,18 @@ type Status struct { } func getProgressCopy(r *raft) map[uint64]Progress { - prs := make(map[uint64]Progress) - for id, p := range r.prs.nodes { - prs[id] = *p - } + m := make(map[uint64]Progress) + r.prs.visit(func(id uint64, pr *Progress) { + var p Progress + p, pr = *pr, nil /* avoid accidental reuse below */ - for id, p := range r.prs.learners { - prs[id] = *p - } - return prs + // The inflight buffer is tricky to copy and besides, it isn't exposed + // to the client, so pretend it's nil. + p.ins = nil + + m[id] = p + }) + return m } func getStatusWithoutProgress(r *raft) Status { From a11563737c58f5d67de6e29c472e1360b4128c6d Mon Sep 17 00:00:00 2001 From: Tobias Schottdorf Date: Fri, 26 Apr 2019 13:16:00 +0200 Subject: [PATCH 3/9] raft: use progress tracker APIs in more places This doesn't completely eliminate access to prs.nodes, but that's not really necessary. This commit uses the existing APIs in a few more places where it's convenient, and also sprinkles some assertions. --- raft/progress.go | 59 +++++++++++++++++++++++++++++------------------ raft/raft.go | 14 +++++++---- raft/raft_test.go | 1 + raft/rawnode.go | 19 ++++++++------- 4 files changed, 56 insertions(+), 37 deletions(-) diff --git a/raft/progress.go b/raft/progress.go index 1e5e4a3c1ba..f88cedecc24 100644 --- a/raft/progress.go +++ b/raft/progress.go @@ -328,10 +328,35 @@ func (p *prs) committed() uint64 { } func (p *prs) removeAny(id uint64) { + pN := p.nodes[id] + pL := p.learners[id] + + if pN == nil && pL == nil { + panic("attempting to remove unknown peer %x") + } else if pN != nil && pL != nil { + panic(fmt.Sprintf("peer %x is both voter and learner", id)) + } + delete(p.nodes, id) delete(p.learners, id) } +// initProgress initializes a new progress for the given node or learner. The +// node may not exist yet in either form or a panic will ensue. +func (p *prs) initProgress(id, match, next uint64, isLearner bool) { + if pr := p.nodes[id]; pr != nil { + panic(fmt.Sprintf("peer %x already tracked as node %v", id, pr)) + } + if pr := p.learners[id]; pr != nil { + panic(fmt.Sprintf("peer %x already tracked as learner %v", id, pr)) + } + if !isLearner { + p.nodes[id] = &Progress{Next: next, Match: match, ins: newInflights(p.maxInflight)} + return + } + p.learners[id] = &Progress{Next: next, Match: match, ins: newInflights(p.maxInflight), IsLearner: true} +} + func (p *prs) getProgress(id uint64) *Progress { if pr, ok := p.nodes[id]; ok { return pr @@ -340,20 +365,21 @@ func (p *prs) getProgress(id uint64) *Progress { return p.learners[id] } -// initProgress initializes a new progress for the given node, replacing any that -// may exist. It is invalid to replace a voter by a learner and attempts to do so -// will result in a panic. -func (p *prs) initProgress(id, match, next uint64, isLearner bool) { - if !isLearner { - delete(p.learners, id) - p.nodes[id] = &Progress{Next: next, Match: match, ins: newInflights(p.maxInflight)} - return +// visit invokes the supplied closure for all tracked progresses. +func (p *prs) visit(f func(id uint64, pr *Progress)) { + for id, pr := range p.nodes { + f(id, pr) } - if _, ok := p.nodes[id]; ok { - panic(fmt.Sprintf("changing from voter to learner for %x", id)) + for id, pr := range p.learners { + f(id, pr) } - p.learners[id] = &Progress{Next: next, Match: match, ins: newInflights(p.maxInflight), IsLearner: true} +} + +func (p *prs) reset() { + p.nodes = map[uint64]*Progress{} + p.learners = map[uint64]*Progress{} + p.matchBuf = nil } func (p *prs) voterNodes() []uint64 { @@ -373,14 +399,3 @@ func (p *prs) learnerNodes() []uint64 { sort.Sort(uint64Slice(nodes)) return nodes } - -// visit invokes the supplied closure for all tracked progresses. -func (p *prs) visit(f func(id uint64, pr *Progress)) { - for id, pr := range p.nodes { - f(id, pr) - } - - for id, pr := range p.learners { - f(id, pr) - } -} diff --git a/raft/raft.go b/raft/raft.go index 21346e36775..e7f3c06167e 100644 --- a/raft/raft.go +++ b/raft/raft.go @@ -577,7 +577,12 @@ func (r *raft) reset(term uint64) { r.votes = make(map[uint64]bool) r.prs.visit(func(id uint64, pr *Progress) { - *pr = Progress{Next: r.raftLog.lastIndex() + 1, ins: newInflights(r.prs.maxInflight), IsLearner: pr.IsLearner} + *pr = Progress{ + Match: 0, + Next: r.raftLog.lastIndex() + 1, + ins: newInflights(r.prs.maxInflight), + IsLearner: pr.IsLearner, + } if id == r.id { pr.Match = r.raftLog.lastIndex() } @@ -938,7 +943,7 @@ func stepLeader(r *raft, m pb.Message) error { if len(m.Entries) == 0 { r.logger.Panicf("%x stepped empty MsgProp", r.id) } - if _, ok := r.prs.nodes[r.id]; !ok { + if r.prs.getProgress(r.id) == nil { // If we are not currently a member of the range (i.e. this node // was removed from the configuration while serving as leader), // drop any new proposals. @@ -1314,8 +1319,7 @@ func (r *raft) restore(s pb.Snapshot) bool { r.id, r.raftLog.committed, r.raftLog.lastIndex(), r.raftLog.lastTerm(), s.Metadata.Index, s.Metadata.Term) r.raftLog.restore(s) - r.prs.nodes = make(map[uint64]*Progress) - r.prs.learners = make(map[uint64]*Progress) + r.prs.reset() r.restoreNode(s.Metadata.ConfState.Nodes, false) r.restoreNode(s.Metadata.ConfState.Learners, true) return true @@ -1385,7 +1389,7 @@ func (r *raft) addNodeOrLearnerNode(id uint64, isLearner bool) { func (r *raft) removeNode(id uint64) { r.prs.removeAny(id) - // do not try to commit or abort transferring if there is no nodes in the cluster. + // Do not try to commit or abort transferring if the cluster is now empty. if len(r.prs.nodes) == 0 && len(r.prs.learners) == 0 { return } diff --git a/raft/raft_test.go b/raft/raft_test.go index 48441fba328..93c61735cba 100644 --- a/raft/raft_test.go +++ b/raft/raft_test.go @@ -1351,6 +1351,7 @@ func TestCommit(t *testing.T) { storage.hardState = pb.HardState{Term: tt.smTerm} sm := newTestRaft(1, []uint64{1}, 10, 2, storage) + sm.prs.removeAny(1) for j := 0; j < len(tt.matches); j++ { sm.prs.initProgress(uint64(j)+1, tt.matches[j], tt.matches[j]+1, false) } diff --git a/raft/rawnode.go b/raft/rawnode.go index 0505c9db30b..139a084211c 100644 --- a/raft/rawnode.go +++ b/raft/rawnode.go @@ -257,16 +257,15 @@ const ( // WithProgress is a helper to introspect the Progress for this node and its // peers. func (rn *RawNode) WithProgress(visitor func(id uint64, typ ProgressType, pr Progress)) { - for id, pr := range rn.raft.prs.nodes { - pr := *pr - pr.ins = nil - visitor(id, ProgressTypePeer, pr) - } - for id, pr := range rn.raft.prs.learners { - pr := *pr - pr.ins = nil - visitor(id, ProgressTypeLearner, pr) - } + rn.raft.prs.visit(func(id uint64, pr *Progress) { + typ := ProgressTypePeer + if pr.IsLearner { + typ = ProgressTypeLearner + } + p := *pr + p.ins = nil + visitor(id, typ, p) + }) } // ReportUnreachable reports the given node is not reachable for the last send. From 26eaadb1d176ba2a619e66b115adde527c024437 Mon Sep 17 00:00:00 2001 From: Tobias Schottdorf Date: Fri, 26 Apr 2019 14:13:05 +0200 Subject: [PATCH 4/9] raft: move votes into prs This is purely mechanical. Cleanup deferred to the next commit. --- raft/progress.go | 7 +++++-- raft/raft.go | 16 +++++++--------- raft/raft_paper_test.go | 2 +- 3 files changed, 13 insertions(+), 12 deletions(-) diff --git a/raft/progress.go b/raft/progress.go index f88cedecc24..21c04fcca37 100644 --- a/raft/progress.go +++ b/raft/progress.go @@ -291,8 +291,11 @@ func (in *inflights) reset() { // the nodes and learners in it. In particular, it tracks the match index for // each peer which in turn allows reasoning about the committed index. type prs struct { - nodes map[uint64]*Progress - learners map[uint64]*Progress + nodes map[uint64]*Progress + learners map[uint64]*Progress + + votes map[uint64]bool + maxInflight int matchBuf uint64Slice } diff --git a/raft/raft.go b/raft/raft.go index e7f3c06167e..d1b62312721 100644 --- a/raft/raft.go +++ b/raft/raft.go @@ -267,8 +267,6 @@ type raft struct { // isLearner is true if the local raft node is a learner. isLearner bool - votes map[uint64]bool - msgs []pb.Message // the leader id @@ -575,7 +573,7 @@ func (r *raft) reset(term uint64) { r.abortLeaderTransfer() - r.votes = make(map[uint64]bool) + r.prs.votes = make(map[uint64]bool) r.prs.visit(func(id uint64, pr *Progress) { *pr = Progress{ Match: 0, @@ -683,7 +681,7 @@ func (r *raft) becomePreCandidate() { // but doesn't change anything else. In particular it does not increase // r.Term or change r.Vote. r.step = stepCandidate - r.votes = make(map[uint64]bool) + r.prs.votes = make(map[uint64]bool) r.tick = r.tickElection r.lead = None r.state = StatePreCandidate @@ -770,10 +768,10 @@ func (r *raft) poll(id uint64, t pb.MessageType, v bool) (granted int) { } else { r.logger.Infof("%x received %s rejection from %x at term %d", r.id, t, id, r.Term) } - if _, ok := r.votes[id]; !ok { - r.votes[id] = v + if _, ok := r.prs.votes[id]; !ok { + r.prs.votes[id] = v } - for _, vv := range r.votes { + for _, vv := range r.prs.votes { if vv { granted++ } @@ -1181,7 +1179,7 @@ func stepCandidate(r *raft, m pb.Message) error { r.handleSnapshot(m) case myVoteRespType: gr := r.poll(m.From, m.Type, !m.Reject) - r.logger.Infof("%x [quorum:%d] has received %d %s votes and %d vote rejections", r.id, r.prs.quorum(), gr, m.Type, len(r.votes)-gr) + r.logger.Infof("%x [quorum:%d] has received %d %s votes and %d vote rejections", r.id, r.prs.quorum(), gr, m.Type, len(r.prs.votes)-gr) switch r.prs.quorum() { case gr: if r.state == StatePreCandidate { @@ -1190,7 +1188,7 @@ func stepCandidate(r *raft, m pb.Message) error { r.becomeLeader() r.bcastAppend() } - case len(r.votes) - gr: + case len(r.prs.votes) - gr: // pb.MsgPreVoteResp contains future term of pre-candidate // m.Term > r.Term; reuse r.Term r.becomeFollower(r.Term, None) diff --git a/raft/raft_paper_test.go b/raft/raft_paper_test.go index c40f279c20a..3b767b4fa0b 100644 --- a/raft/raft_paper_test.go +++ b/raft/raft_paper_test.go @@ -169,7 +169,7 @@ func testNonleaderStartElection(t *testing.T, state StateType) { if r.state != StateCandidate { t.Errorf("state = %s, want %s", r.state, StateCandidate) } - if !r.votes[r.id] { + if !r.prs.votes[r.id] { t.Errorf("vote for self = false, want true") } msgs := r.readMessages() From a6f222e62d250df227274a53d4bbd301d776c400 Mon Sep 17 00:00:00 2001 From: Tobias Schottdorf Date: Fri, 26 Apr 2019 14:47:48 +0200 Subject: [PATCH 5/9] raft: establish an interface around vote counting This cleans up the mechanical refactor in the last commit and will help with etcd-io/etcd#7625 as well. --- raft/progress.go | 48 ++++++++++++++++++++++++++++++++++++++++-------- raft/raft.go | 39 ++++++++++++++++++++------------------- 2 files changed, 60 insertions(+), 27 deletions(-) diff --git a/raft/progress.go b/raft/progress.go index 21c04fcca37..c6434da3e00 100644 --- a/raft/progress.go +++ b/raft/progress.go @@ -301,11 +301,13 @@ type prs struct { } func makePRS(maxInflight int) prs { - return prs{ + p := prs{ + maxInflight: maxInflight, nodes: map[uint64]*Progress{}, learners: map[uint64]*Progress{}, - maxInflight: maxInflight, + votes: map[uint64]bool{}, } + return p } func (p *prs) quorum() int { @@ -379,12 +381,6 @@ func (p *prs) visit(f func(id uint64, pr *Progress)) { } } -func (p *prs) reset() { - p.nodes = map[uint64]*Progress{} - p.learners = map[uint64]*Progress{} - p.matchBuf = nil -} - func (p *prs) voterNodes() []uint64 { nodes := make([]uint64, 0, len(p.nodes)) for id := range p.nodes { @@ -402,3 +398,39 @@ func (p *prs) learnerNodes() []uint64 { sort.Sort(uint64Slice(nodes)) return nodes } + +// resetVotes prepares for a new round of vote counting via recordVote. +func (p *prs) resetVotes() { + p.votes = map[uint64]bool{} +} + +// recordVote records that the node with the given id voted for this Raft +// instance if v == true (and declined it otherwise). +func (p *prs) recordVote(id uint64, v bool) { + _, ok := p.votes[id] + if !ok { + p.votes[id] = v + } +} + +// tallyVotes returns the number of granted and rejected votes, and whether the +// election outcome is known. +func (p *prs) tallyVotes() (granted int, rejected int, result electionResult) { + for _, v := range p.votes { + if v { + granted++ + } else { + rejected++ + } + } + + q := p.quorum() + + result = electionIndeterminate + if granted >= q { + result = electionWon + } else if rejected >= q { + result = electionLost + } + return granted, rejected, result +} diff --git a/raft/raft.go b/raft/raft.go index d1b62312721..645eb1c5f16 100644 --- a/raft/raft.go +++ b/raft/raft.go @@ -573,7 +573,7 @@ func (r *raft) reset(term uint64) { r.abortLeaderTransfer() - r.prs.votes = make(map[uint64]bool) + r.prs.resetVotes() r.prs.visit(func(id uint64, pr *Progress) { *pr = Progress{ Match: 0, @@ -681,7 +681,7 @@ func (r *raft) becomePreCandidate() { // but doesn't change anything else. In particular it does not increase // r.Term or change r.Vote. r.step = stepCandidate - r.prs.votes = make(map[uint64]bool) + r.prs.resetVotes() r.tick = r.tickElection r.lead = None r.state = StatePreCandidate @@ -737,7 +737,7 @@ func (r *raft) campaign(t CampaignType) { voteMsg = pb.MsgVote term = r.Term } - if r.prs.quorum() == r.poll(r.id, voteRespMsgType(voteMsg), true) { + if _, _, res := r.poll(r.id, voteRespMsgType(voteMsg), true); res == electionWon { // We won the election after voting for ourselves (which must mean that // this is a single-node cluster). Advance to the next state. if t == campaignPreElection { @@ -762,21 +762,22 @@ func (r *raft) campaign(t CampaignType) { } } -func (r *raft) poll(id uint64, t pb.MessageType, v bool) (granted int) { +type electionResult byte + +const ( + electionIndeterminate electionResult = iota + electionLost + electionWon +) + +func (r *raft) poll(id uint64, t pb.MessageType, v bool) (granted int, rejected int, result electionResult) { if v { r.logger.Infof("%x received %s from %x at term %d", r.id, t, id, r.Term) } else { r.logger.Infof("%x received %s rejection from %x at term %d", r.id, t, id, r.Term) } - if _, ok := r.prs.votes[id]; !ok { - r.prs.votes[id] = v - } - for _, vv := range r.prs.votes { - if vv { - granted++ - } - } - return granted + r.prs.recordVote(id, v) + return r.prs.tallyVotes() } func (r *raft) Step(m pb.Message) error { @@ -1178,17 +1179,17 @@ func stepCandidate(r *raft, m pb.Message) error { r.becomeFollower(m.Term, m.From) // always m.Term == r.Term r.handleSnapshot(m) case myVoteRespType: - gr := r.poll(m.From, m.Type, !m.Reject) - r.logger.Infof("%x [quorum:%d] has received %d %s votes and %d vote rejections", r.id, r.prs.quorum(), gr, m.Type, len(r.prs.votes)-gr) - switch r.prs.quorum() { - case gr: + gr, rj, res := r.poll(m.From, m.Type, !m.Reject) + r.logger.Infof("%x has received %d %s votes and %d vote rejections", r.id, gr, m.Type, rj) + switch res { + case electionWon: if r.state == StatePreCandidate { r.campaign(campaignElection) } else { r.becomeLeader() r.bcastAppend() } - case len(r.prs.votes) - gr: + case electionLost: // pb.MsgPreVoteResp contains future term of pre-candidate // m.Term > r.Term; reuse r.Term r.becomeFollower(r.Term, None) @@ -1317,7 +1318,7 @@ func (r *raft) restore(s pb.Snapshot) bool { r.id, r.raftLog.committed, r.raftLog.lastIndex(), r.raftLog.lastTerm(), s.Metadata.Index, s.Metadata.Term) r.raftLog.restore(s) - r.prs.reset() + r.prs = makePRS(r.prs.maxInflight) r.restoreNode(s.Metadata.ConfState.Nodes, false) r.restoreNode(s.Metadata.ConfState.Learners, true) return true From bc828e939a6ab917fbff32597f609a2e42fa7fdb Mon Sep 17 00:00:00 2001 From: Tobias Schottdorf Date: Fri, 26 Apr 2019 15:16:12 +0200 Subject: [PATCH 6/9] raft: pull checkQuorumActive into prs It's looking at each voter's Progress and needs to know how quorums work, so this is the ideal new home for it. --- raft/progress.go | 14 ++++++++++++++ raft/raft.go | 44 ++++++++++++++++++++------------------------ 2 files changed, 34 insertions(+), 24 deletions(-) diff --git a/raft/progress.go b/raft/progress.go index c6434da3e00..0df0c2a3044 100644 --- a/raft/progress.go +++ b/raft/progress.go @@ -381,6 +381,20 @@ func (p *prs) visit(f func(id uint64, pr *Progress)) { } } +// checkQuorumActive returns true if the quorum is active from +// the view of the local raft state machine. Otherwise, it returns +// false. +func (p *prs) quorumActive() bool { + var act int + p.visit(func(id uint64, pr *Progress) { + if pr.RecentActive && !pr.IsLearner { + act++ + } + }) + + return act >= p.quorum() +} + func (p *prs) voterNodes() []uint64 { nodes := make([]uint64, 0, len(p.nodes)) for id := range p.nodes { diff --git a/raft/raft.go b/raft/raft.go index 645eb1c5f16..750d7665c9d 100644 --- a/raft/raft.go +++ b/raft/raft.go @@ -933,10 +933,26 @@ func stepLeader(r *raft, m pb.Message) error { r.bcastHeartbeat() return nil case pb.MsgCheckQuorum: - if !r.checkQuorumActive() { + // The leader should always see itself as active. As a precaution, handle + // the case in which the leader isn't in the configuration any more (for + // example if it just removed itself). + // + // TODO(tbg): I added a TODO in removeNode, it doesn't seem that the + // leader steps down when removing itself. I might be missing something. + if pr := r.prs.getProgress(r.id); pr != nil { + pr.RecentActive = true + } + if !r.prs.quorumActive() { r.logger.Warningf("%x stepped down to follower since quorum is not active", r.id) r.becomeFollower(r.Term, None) } + // Mark everyone (but ourselves) as inactive in preparation for the next + // CheckQuorum. + r.prs.visit(func(id uint64, pr *Progress) { + if id != r.id { + pr.RecentActive = false + } + }) return nil case pb.MsgProp: if len(m.Entries) == 0 { @@ -1393,6 +1409,9 @@ func (r *raft) removeNode(id uint64) { return } + // TODO(tbg): won't bad (or at least unfortunate) things happen if the + // leader just removed itself? + // The quorum size is now smaller, so see if any pending entries can // be committed. if r.maybeCommit() { @@ -1424,29 +1443,6 @@ func (r *raft) resetRandomizedElectionTimeout() { r.randomizedElectionTimeout = r.electionTimeout + globalRand.Intn(r.electionTimeout) } -// checkQuorumActive returns true if the quorum is active from -// the view of the local raft state machine. Otherwise, it returns -// false. -// checkQuorumActive also resets all RecentActive to false. -func (r *raft) checkQuorumActive() bool { - var act int - - r.prs.visit(func(id uint64, pr *Progress) { - if id == r.id { // self is always active - act++ - return - } - - if pr.RecentActive && !pr.IsLearner { - act++ - } - - pr.RecentActive = false - }) - - return act >= r.prs.quorum() -} - func (r *raft) sendTimeoutNow(to uint64) { r.send(pb.Message{To: to, Type: pb.MsgTimeoutNow}) } From 57a1b39fcd3b02ea63008fb0708f4ea47e2d7a52 Mon Sep 17 00:00:00 2001 From: Tobias Schottdorf Date: Fri, 26 Apr 2019 15:24:05 +0200 Subject: [PATCH 7/9] raft: avoid another call to quorum() This particular caller just wanted to know whether it was in a single-voter cluster configuration, which is now a question prs can answer. --- raft/progress.go | 6 ++++++ raft/raft.go | 4 ++-- 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/raft/progress.go b/raft/progress.go index 0df0c2a3044..2833730c610 100644 --- a/raft/progress.go +++ b/raft/progress.go @@ -310,6 +310,12 @@ func makePRS(maxInflight int) prs { return p } +// isSingleton returns true if (and only if) there is only one voting member +// (i.e. the leader) in the current configuration. +func (p *prs) isSingleton() bool { + return len(p.nodes) == 1 +} + func (p *prs) quorum() int { return len(p.nodes)/2 + 1 } diff --git a/raft/raft.go b/raft/raft.go index 750d7665c9d..4c630dfd823 100644 --- a/raft/raft.go +++ b/raft/raft.go @@ -988,7 +988,7 @@ func stepLeader(r *raft, m pb.Message) error { r.bcastAppend() return nil case pb.MsgReadIndex: - if r.prs.quorum() > 1 { + if !r.prs.isSingleton() { // more than one voting member in cluster if r.raftLog.zeroTermOnErrCompacted(r.raftLog.term(r.raftLog.committed)) != r.Term { // Reject read only request when this leader has not committed any log entry at its term. return nil @@ -1009,7 +1009,7 @@ func stepLeader(r *raft, m pb.Message) error { r.send(pb.Message{To: m.From, Type: pb.MsgReadIndexResp, Index: ri, Entries: m.Entries}) } } - } else { // there is only one voting member (the leader) in the cluster + } else { // only one voting member (the leader) in the cluster if m.From == None || m.From == r.id { // from leader itself r.readStates = append(r.readStates, ReadState{Index: r.raftLog.committed, RequestCtx: m.Entries[0].Data}) } else { // from learner member From 02b0d8023421616bf77732a0e706662017fe063d Mon Sep 17 00:00:00 2001 From: Tobias Schottdorf Date: Fri, 26 Apr 2019 15:56:39 +0200 Subject: [PATCH 8/9] raft: remove quorum() dependency from readOnly This now delegates the quorum computation to r.prs, which will allow it to generalize in a straightforward way when etcd-io/etcd#7625 is addressed. --- raft/progress.go | 4 ++++ raft/raft.go | 5 +++-- raft/read_only.go | 19 +++++++++---------- 3 files changed, 16 insertions(+), 12 deletions(-) diff --git a/raft/progress.go b/raft/progress.go index 2833730c610..cd48c8277cc 100644 --- a/raft/progress.go +++ b/raft/progress.go @@ -320,6 +320,10 @@ func (p *prs) quorum() int { return len(p.nodes)/2 + 1 } +func (p *prs) hasQuorum(m map[uint64]struct{}) bool { + return len(m) >= p.quorum() +} + // committed returns the largest log index known to be committed based on what // the voting members of the group have acknowledged. func (p *prs) committed() uint64 { diff --git a/raft/raft.go b/raft/raft.go index 4c630dfd823..5bf372f1876 100644 --- a/raft/raft.go +++ b/raft/raft.go @@ -1000,6 +1000,8 @@ func stepLeader(r *raft, m pb.Message) error { switch r.readOnly.option { case ReadOnlySafe: r.readOnly.addRequest(r.raftLog.committed, m) + // The local node automatically acks the request. + r.readOnly.recvAck(r.id, m.Entries[0].Data) r.bcastHeartbeatWithCtx(m.Entries[0].Data) case ReadOnlyLeaseBased: ri := r.raftLog.committed @@ -1097,8 +1099,7 @@ func stepLeader(r *raft, m pb.Message) error { return nil } - ackCount := r.readOnly.recvAck(m) - if ackCount < r.prs.quorum() { + if !r.prs.hasQuorum(r.readOnly.recvAck(m.From, m.Context)) { return nil } diff --git a/raft/read_only.go b/raft/read_only.go index 38053383186..955fe798669 100644 --- a/raft/read_only.go +++ b/raft/read_only.go @@ -50,26 +50,25 @@ func newReadOnly(option ReadOnlyOption) *readOnly { // the read only request. // `m` is the original read only request message from the local or remote node. func (ro *readOnly) addRequest(index uint64, m pb.Message) { - ctx := string(m.Entries[0].Data) - if _, ok := ro.pendingReadIndex[ctx]; ok { + s := string(m.Entries[0].Data) + if _, ok := ro.pendingReadIndex[s]; ok { return } - ro.pendingReadIndex[ctx] = &readIndexStatus{index: index, req: m, acks: make(map[uint64]struct{})} - ro.readIndexQueue = append(ro.readIndexQueue, ctx) + ro.pendingReadIndex[s] = &readIndexStatus{index: index, req: m, acks: make(map[uint64]struct{})} + ro.readIndexQueue = append(ro.readIndexQueue, s) } // recvAck notifies the readonly struct that the raft state machine received // an acknowledgment of the heartbeat that attached with the read only request // context. -func (ro *readOnly) recvAck(m pb.Message) int { - rs, ok := ro.pendingReadIndex[string(m.Context)] +func (ro *readOnly) recvAck(id uint64, context []byte) map[uint64]struct{} { + rs, ok := ro.pendingReadIndex[string(context)] if !ok { - return 0 + return nil } - rs.acks[m.From] = struct{}{} - // add one to include an ack from local node - return len(rs.acks) + 1 + rs.acks[id] = struct{}{} + return rs.acks } // advance advances the read only request queue kept by the readonly struct. From 5dd45011d67afab5a8b15ed0e19ac9308fb38e54 Mon Sep 17 00:00:00 2001 From: Tobias Schottdorf Date: Wed, 1 May 2019 18:32:02 +0200 Subject: [PATCH 9/9] raft: rename prs to progressTracker --- raft/progress.go | 40 ++++++++++++++++++++-------------------- raft/raft.go | 2 +- 2 files changed, 21 insertions(+), 21 deletions(-) diff --git a/raft/progress.go b/raft/progress.go index cd48c8277cc..fa4d63edfba 100644 --- a/raft/progress.go +++ b/raft/progress.go @@ -287,10 +287,10 @@ func (in *inflights) reset() { in.start = 0 } -// prs tracks the currently active configuration and the information known about -// the nodes and learners in it. In particular, it tracks the match index for -// each peer which in turn allows reasoning about the committed index. -type prs struct { +// progressTracker tracks the currently active configuration and the information +// known about the nodes and learners in it. In particular, it tracks the match +// index for each peer which in turn allows reasoning about the committed index. +type progressTracker struct { nodes map[uint64]*Progress learners map[uint64]*Progress @@ -300,8 +300,8 @@ type prs struct { matchBuf uint64Slice } -func makePRS(maxInflight int) prs { - p := prs{ +func makePRS(maxInflight int) progressTracker { + p := progressTracker{ maxInflight: maxInflight, nodes: map[uint64]*Progress{}, learners: map[uint64]*Progress{}, @@ -312,21 +312,21 @@ func makePRS(maxInflight int) prs { // isSingleton returns true if (and only if) there is only one voting member // (i.e. the leader) in the current configuration. -func (p *prs) isSingleton() bool { +func (p *progressTracker) isSingleton() bool { return len(p.nodes) == 1 } -func (p *prs) quorum() int { +func (p *progressTracker) quorum() int { return len(p.nodes)/2 + 1 } -func (p *prs) hasQuorum(m map[uint64]struct{}) bool { +func (p *progressTracker) hasQuorum(m map[uint64]struct{}) bool { return len(m) >= p.quorum() } // committed returns the largest log index known to be committed based on what // the voting members of the group have acknowledged. -func (p *prs) committed() uint64 { +func (p *progressTracker) committed() uint64 { // Preserving matchBuf across calls is an optimization // used to avoid allocating a new slice on each call. if cap(p.matchBuf) < len(p.nodes) { @@ -342,7 +342,7 @@ func (p *prs) committed() uint64 { return p.matchBuf[len(p.matchBuf)-p.quorum()] } -func (p *prs) removeAny(id uint64) { +func (p *progressTracker) removeAny(id uint64) { pN := p.nodes[id] pL := p.learners[id] @@ -358,7 +358,7 @@ func (p *prs) removeAny(id uint64) { // initProgress initializes a new progress for the given node or learner. The // node may not exist yet in either form or a panic will ensue. -func (p *prs) initProgress(id, match, next uint64, isLearner bool) { +func (p *progressTracker) initProgress(id, match, next uint64, isLearner bool) { if pr := p.nodes[id]; pr != nil { panic(fmt.Sprintf("peer %x already tracked as node %v", id, pr)) } @@ -372,7 +372,7 @@ func (p *prs) initProgress(id, match, next uint64, isLearner bool) { p.learners[id] = &Progress{Next: next, Match: match, ins: newInflights(p.maxInflight), IsLearner: true} } -func (p *prs) getProgress(id uint64) *Progress { +func (p *progressTracker) getProgress(id uint64) *Progress { if pr, ok := p.nodes[id]; ok { return pr } @@ -381,7 +381,7 @@ func (p *prs) getProgress(id uint64) *Progress { } // visit invokes the supplied closure for all tracked progresses. -func (p *prs) visit(f func(id uint64, pr *Progress)) { +func (p *progressTracker) visit(f func(id uint64, pr *Progress)) { for id, pr := range p.nodes { f(id, pr) } @@ -394,7 +394,7 @@ func (p *prs) visit(f func(id uint64, pr *Progress)) { // checkQuorumActive returns true if the quorum is active from // the view of the local raft state machine. Otherwise, it returns // false. -func (p *prs) quorumActive() bool { +func (p *progressTracker) quorumActive() bool { var act int p.visit(func(id uint64, pr *Progress) { if pr.RecentActive && !pr.IsLearner { @@ -405,7 +405,7 @@ func (p *prs) quorumActive() bool { return act >= p.quorum() } -func (p *prs) voterNodes() []uint64 { +func (p *progressTracker) voterNodes() []uint64 { nodes := make([]uint64, 0, len(p.nodes)) for id := range p.nodes { nodes = append(nodes, id) @@ -414,7 +414,7 @@ func (p *prs) voterNodes() []uint64 { return nodes } -func (p *prs) learnerNodes() []uint64 { +func (p *progressTracker) learnerNodes() []uint64 { nodes := make([]uint64, 0, len(p.learners)) for id := range p.learners { nodes = append(nodes, id) @@ -424,13 +424,13 @@ func (p *prs) learnerNodes() []uint64 { } // resetVotes prepares for a new round of vote counting via recordVote. -func (p *prs) resetVotes() { +func (p *progressTracker) resetVotes() { p.votes = map[uint64]bool{} } // recordVote records that the node with the given id voted for this Raft // instance if v == true (and declined it otherwise). -func (p *prs) recordVote(id uint64, v bool) { +func (p *progressTracker) recordVote(id uint64, v bool) { _, ok := p.votes[id] if !ok { p.votes[id] = v @@ -439,7 +439,7 @@ func (p *prs) recordVote(id uint64, v bool) { // tallyVotes returns the number of granted and rejected votes, and whether the // election outcome is known. -func (p *prs) tallyVotes() (granted int, rejected int, result electionResult) { +func (p *progressTracker) tallyVotes() (granted int, rejected int, result electionResult) { for _, v := range p.votes { if v { granted++ diff --git a/raft/raft.go b/raft/raft.go index 5bf372f1876..24a6c01784b 100644 --- a/raft/raft.go +++ b/raft/raft.go @@ -260,7 +260,7 @@ type raft struct { maxMsgSize uint64 maxUncommittedSize uint64 - prs prs + prs progressTracker state StateType