From e03962990731caaaac7cb8fd892f856086506753 Mon Sep 17 00:00:00 2001 From: Tobias Schottdorf Date: Fri, 14 Jun 2019 21:47:49 +0200 Subject: [PATCH] raft: use half-populated joint quorum To ease a future transition into joint quorums, this commit removes the previous "ad-hoc" majority-based quorum and vote computations with that introduced in the `raft/quorum` package. More specifically, the progressTracker now uses a quorum.JointConfig for which the "second" majority quorum is always empty; in this case the quorum behaves like the one quorum.MajorityConfig that is actually present. Or, more briefly, this change is a no-op, but it will take the busywork out of actually starting to make use of joint quorums in the future. On a side node, I suspect that this might've fixed a bug regarding the read index though I haven't been able to explicitly come up with a counter-example. The problem was that the acks collected for the read index weren't taking into account membership changes, so they'd run the danger of using acks from nodes since removed to claim that a quorum of acks had been received. There's a chance that there isn't a counter-example (the only guarantee extracted from the "quorum" is that there isn't another leader, but even if there's another leader all that matters is that that leader doesn't have a divergent history from the stale leader in the hypothetical counter-example), but either way there is morally a bug here that is now fixed because VoteCommitted doesn't care about votes from members that are not voters known to the currently active configuration. --- raft/progress.go | 93 ++++++++++++++++++++++++----------------------- raft/raft.go | 27 ++++++-------- raft/raft_test.go | 5 ++- raft/read_only.go | 12 ++++-- 4 files changed, 69 insertions(+), 68 deletions(-) diff --git a/raft/progress.go b/raft/progress.go index b0bd817a9c9..37663562493 100644 --- a/raft/progress.go +++ b/raft/progress.go @@ -17,6 +17,8 @@ package raft import ( "fmt" "sort" + + "go.etcd.io/etcd/raft/quorum" ) const ( @@ -291,23 +293,25 @@ func (in *inflights) reset() { // known about the nodes and learners in it. In particular, it tracks the match // index for each peer which in turn allows reasoning about the committed index. type progressTracker struct { - nodes map[uint64]struct{} + voters quorum.JointConfig learners map[uint64]struct{} prs map[uint64]*Progress votes map[uint64]bool maxInflight int - matchBuf uint64Slice } func makeProgressTracker(maxInflight int) progressTracker { p := progressTracker{ maxInflight: maxInflight, - prs: map[uint64]*Progress{}, - nodes: map[uint64]struct{}{}, - learners: map[uint64]struct{}{}, - votes: map[uint64]bool{}, + voters: quorum.JointConfig{ + quorum.MajorityConfig{}, + quorum.MajorityConfig{}, + }, + learners: map[uint64]struct{}{}, + votes: map[uint64]bool{}, + prs: map[uint64]*Progress{}, } return p } @@ -315,40 +319,35 @@ func makeProgressTracker(maxInflight int) progressTracker { // isSingleton returns true if (and only if) there is only one voting member // (i.e. the leader) in the current configuration. func (p *progressTracker) isSingleton() bool { - return len(p.nodes) == 1 + return len(p.voters[0]) == 1 && len(p.voters[1]) == 0 } -func (p *progressTracker) quorum() int { - return len(p.nodes)/2 + 1 -} +type progressAckIndexer map[uint64]*Progress -func (p *progressTracker) hasQuorum(m map[uint64]struct{}) bool { - return len(m) >= p.quorum() +var _ quorum.AckedIndexer = progressAckIndexer(nil) + +func (l progressAckIndexer) AckedIndex(id uint64) (quorum.Index, bool) { + pr, ok := l[id] + if !ok { + return 0, false + } + return quorum.Index(pr.Match), true } // committed returns the largest log index known to be committed based on what // the voting members of the group have acknowledged. func (p *progressTracker) committed() uint64 { - // Preserving matchBuf across calls is an optimization - // used to avoid allocating a new slice on each call. - if cap(p.matchBuf) < len(p.nodes) { - p.matchBuf = make(uint64Slice, len(p.nodes)) - } - p.matchBuf = p.matchBuf[:len(p.nodes)] - idx := 0 - for id := range p.nodes { - p.matchBuf[idx] = p.prs[id].Match - idx++ - } - sort.Sort(&p.matchBuf) - return p.matchBuf[len(p.matchBuf)-p.quorum()] + return uint64(p.voters.CommittedIndex(progressAckIndexer(p.prs))) } func (p *progressTracker) removeAny(id uint64) { _, okPR := p.prs[id] - _, okV := p.nodes[id] + _, okV1 := p.voters[0][id] + _, okV2 := p.voters[1][id] _, okL := p.learners[id] + okV := okV1 || okV2 + if !okPR { panic("attempting to remove unknown peer %x") } else if !okV && !okL { @@ -357,7 +356,8 @@ func (p *progressTracker) removeAny(id uint64) { panic(fmt.Sprintf("peer %x is both voter and learner", id)) } - delete(p.nodes, id) + delete(p.voters[0], id) + delete(p.voters[1], id) delete(p.learners, id) delete(p.prs, id) } @@ -369,7 +369,7 @@ func (p *progressTracker) initProgress(id, match, next uint64, isLearner bool) { panic(fmt.Sprintf("peer %x already tracked as node %v", id, pr)) } if !isLearner { - p.nodes[id] = struct{}{} + p.voters[0][id] = struct{}{} } else { p.learners[id] = struct{}{} } @@ -391,19 +391,21 @@ func (p *progressTracker) visit(f func(id uint64, pr *Progress)) { // the view of the local raft state machine. Otherwise, it returns // false. func (p *progressTracker) quorumActive() bool { - var act int + votes := map[uint64]bool{} p.visit(func(id uint64, pr *Progress) { - if pr.RecentActive && !pr.IsLearner { - act++ + if pr.IsLearner { + return } + votes[id] = pr.RecentActive }) - return act >= p.quorum() + return p.voters.VoteResult(votes) == quorum.VoteWon } func (p *progressTracker) voterNodes() []uint64 { - nodes := make([]uint64, 0, len(p.nodes)) - for id := range p.nodes { + m := p.voters.IDs() + nodes := make([]uint64, 0, len(m)) + for id := range m { nodes = append(nodes, id) } sort.Sort(uint64Slice(nodes)) @@ -435,22 +437,21 @@ func (p *progressTracker) recordVote(id uint64, v bool) { // tallyVotes returns the number of granted and rejected votes, and whether the // election outcome is known. -func (p *progressTracker) tallyVotes() (granted int, rejected int, result electionResult) { - for _, v := range p.votes { - if v { +func (p *progressTracker) tallyVotes() (granted int, rejected int, _ quorum.VoteResult) { + // Make sure to populate granted/rejected correctly even if the votes slice + // contains members no longer part of the configuration. This doesn't really + // matter in the way the numbers are used (they're informational), but might + // as well get it right. + for id, pr := range p.prs { + if pr.IsLearner { + continue + } + if p.votes[id] { granted++ } else { rejected++ } } - - q := p.quorum() - - result = electionIndeterminate - if granted >= q { - result = electionWon - } else if rejected >= q { - result = electionLost - } + result := p.voters.VoteResult(p.votes) return granted, rejected, result } diff --git a/raft/raft.go b/raft/raft.go index 780a77b1552..06ba6bf12b2 100644 --- a/raft/raft.go +++ b/raft/raft.go @@ -24,6 +24,7 @@ import ( "sync" "time" + "go.etcd.io/etcd/raft/quorum" pb "go.etcd.io/etcd/raft/raftpb" ) @@ -744,7 +745,7 @@ func (r *raft) campaign(t CampaignType) { voteMsg = pb.MsgVote term = r.Term } - if _, _, res := r.poll(r.id, voteRespMsgType(voteMsg), true); res == electionWon { + if _, _, res := r.poll(r.id, voteRespMsgType(voteMsg), true); res == quorum.VoteWon { // We won the election after voting for ourselves (which must mean that // this is a single-node cluster). Advance to the next state. if t == campaignPreElection { @@ -754,7 +755,7 @@ func (r *raft) campaign(t CampaignType) { } return } - for id := range r.prs.nodes { + for id := range r.prs.voters.IDs() { if id == r.id { continue } @@ -769,15 +770,7 @@ func (r *raft) campaign(t CampaignType) { } } -type electionResult byte - -const ( - electionIndeterminate electionResult = iota - electionLost - electionWon -) - -func (r *raft) poll(id uint64, t pb.MessageType, v bool) (granted int, rejected int, result electionResult) { +func (r *raft) poll(id uint64, t pb.MessageType, v bool) (granted int, rejected int, result quorum.VoteResult) { if v { r.logger.Infof("%x received %s from %x at term %d", r.id, t, id, r.Term) } else { @@ -999,7 +992,9 @@ func stepLeader(r *raft, m pb.Message) error { r.bcastAppend() return nil case pb.MsgReadIndex: - if !r.prs.isSingleton() { // more than one voting member in cluster + // If more than the local vote is needed, go through a full broadcast, + // otherwise optimize. + if !r.prs.isSingleton() { if r.raftLog.zeroTermOnErrCompacted(r.raftLog.term(r.raftLog.committed)) != r.Term { // Reject read only request when this leader has not committed any log entry at its term. return nil @@ -1110,7 +1105,7 @@ func stepLeader(r *raft, m pb.Message) error { return nil } - if !r.prs.hasQuorum(r.readOnly.recvAck(m.From, m.Context)) { + if r.prs.voters.VoteResult(r.readOnly.recvAck(m.From, m.Context)) != quorum.VoteWon { return nil } @@ -1210,14 +1205,14 @@ func stepCandidate(r *raft, m pb.Message) error { gr, rj, res := r.poll(m.From, m.Type, !m.Reject) r.logger.Infof("%x has received %d %s votes and %d vote rejections", r.id, gr, m.Type, rj) switch res { - case electionWon: + case quorum.VoteWon: if r.state == StatePreCandidate { r.campaign(campaignElection) } else { r.becomeLeader() r.bcastAppend() } - case electionLost: + case quorum.VoteLost: // pb.MsgPreVoteResp contains future term of pre-candidate // m.Term > r.Term; reuse r.Term r.becomeFollower(r.Term, None) @@ -1417,7 +1412,7 @@ func (r *raft) removeNode(id uint64) { r.prs.removeAny(id) // Do not try to commit or abort transferring if the cluster is now empty. - if len(r.prs.nodes) == 0 && len(r.prs.learners) == 0 { + if len(r.prs.voters[0]) == 0 && len(r.prs.learners) == 0 { return } diff --git a/raft/raft_test.go b/raft/raft_test.go index 31ad3f1aedc..40be17cfc8f 100644 --- a/raft/raft_test.go +++ b/raft/raft_test.go @@ -4334,7 +4334,8 @@ func newNetworkWithConfig(configFunc func(*Config), peers ...stateMachine) *netw learners[i] = true } v.id = id - v.prs.nodes = make(map[uint64]struct{}) + v.prs.voters[0] = make(map[uint64]struct{}) + v.prs.voters[1] = make(map[uint64]struct{}) v.prs.learners = make(map[uint64]struct{}) v.prs.prs = make(map[uint64]*Progress) for i := 0; i < size; i++ { @@ -4343,7 +4344,7 @@ func newNetworkWithConfig(configFunc func(*Config), peers ...stateMachine) *netw pr.IsLearner = true v.prs.learners[peerAddrs[i]] = struct{}{} } else { - v.prs.nodes[peerAddrs[i]] = struct{}{} + v.prs.voters[0][peerAddrs[i]] = struct{}{} } v.prs.prs[peerAddrs[i]] = pr } diff --git a/raft/read_only.go b/raft/read_only.go index 39eb2b06515..6987f1bd7d7 100644 --- a/raft/read_only.go +++ b/raft/read_only.go @@ -29,7 +29,11 @@ type ReadState struct { type readIndexStatus struct { req pb.Message index uint64 - acks map[uint64]struct{} + // NB: this never records 'false', but it's more convenient to use this + // instead of a map[uint64]struct{} due to the API of quorum.VoteResult. If + // this becomes performance sensitive enough (doubtful), quorum.VoteResult + // can change to an API that is closer to that of CommittedIndex. + acks map[uint64]bool } type readOnly struct { @@ -54,20 +58,20 @@ func (ro *readOnly) addRequest(index uint64, m pb.Message) { if _, ok := ro.pendingReadIndex[s]; ok { return } - ro.pendingReadIndex[s] = &readIndexStatus{index: index, req: m, acks: make(map[uint64]struct{})} + ro.pendingReadIndex[s] = &readIndexStatus{index: index, req: m, acks: make(map[uint64]bool)} ro.readIndexQueue = append(ro.readIndexQueue, s) } // recvAck notifies the readonly struct that the raft state machine received // an acknowledgment of the heartbeat that attached with the read only request // context. -func (ro *readOnly) recvAck(id uint64, context []byte) map[uint64]struct{} { +func (ro *readOnly) recvAck(id uint64, context []byte) map[uint64]bool { rs, ok := ro.pendingReadIndex[string(context)] if !ok { return nil } - rs.acks[id] = struct{}{} + rs.acks[id] = true return rs.acks }