diff --git a/raft/progress.go b/raft/progress.go index b0bd817a9c95..564d31304c74 100644 --- a/raft/progress.go +++ b/raft/progress.go @@ -17,6 +17,8 @@ package raft import ( "fmt" "sort" + + "go.etcd.io/etcd/raft/quorum" ) const ( @@ -291,23 +293,25 @@ func (in *inflights) reset() { // known about the nodes and learners in it. In particular, it tracks the match // index for each peer which in turn allows reasoning about the committed index. type progressTracker struct { - nodes map[uint64]struct{} + voters quorum.JointConfig learners map[uint64]struct{} prs map[uint64]*Progress votes map[uint64]bool maxInflight int - matchBuf uint64Slice } func makeProgressTracker(maxInflight int) progressTracker { p := progressTracker{ maxInflight: maxInflight, - prs: map[uint64]*Progress{}, - nodes: map[uint64]struct{}{}, - learners: map[uint64]struct{}{}, - votes: map[uint64]bool{}, + voters: quorum.JointConfig{ + quorum.MajorityConfig{}, + quorum.MajorityConfig{}, + }, + learners: map[uint64]struct{}{}, + votes: map[uint64]bool{}, + prs: map[uint64]*Progress{}, } return p } @@ -315,40 +319,35 @@ func makeProgressTracker(maxInflight int) progressTracker { // isSingleton returns true if (and only if) there is only one voting member // (i.e. the leader) in the current configuration. func (p *progressTracker) isSingleton() bool { - return len(p.nodes) == 1 + return len(p.voters[0]) == 1 && len(p.voters[1]) == 0 } -func (p *progressTracker) quorum() int { - return len(p.nodes)/2 + 1 -} +type progressAckIndexer map[uint64]*Progress -func (p *progressTracker) hasQuorum(m map[uint64]struct{}) bool { - return len(m) >= p.quorum() +var _ quorum.AckedIndexer = progressAckIndexer(nil) + +func (l progressAckIndexer) AckedIndex(id uint64) (uint64, bool) { + pr, ok := l[id] + if !ok { + return 0, false + } + return pr.Match, true } // committed returns the largest log index known to be committed based on what // the voting members of the group have acknowledged. func (p *progressTracker) committed() uint64 { - // Preserving matchBuf across calls is an optimization - // used to avoid allocating a new slice on each call. - if cap(p.matchBuf) < len(p.nodes) { - p.matchBuf = make(uint64Slice, len(p.nodes)) - } - p.matchBuf = p.matchBuf[:len(p.nodes)] - idx := 0 - for id := range p.nodes { - p.matchBuf[idx] = p.prs[id].Match - idx++ - } - sort.Sort(&p.matchBuf) - return p.matchBuf[len(p.matchBuf)-p.quorum()] + return p.voters.CommittedIndex(progressAckIndexer(p.prs)).Definitely } func (p *progressTracker) removeAny(id uint64) { _, okPR := p.prs[id] - _, okV := p.nodes[id] + _, okV1 := p.voters[0][id] + _, okV2 := p.voters[1][id] _, okL := p.learners[id] + okV := okV1 || okV2 + if !okPR { panic("attempting to remove unknown peer %x") } else if !okV && !okL { @@ -357,7 +356,8 @@ func (p *progressTracker) removeAny(id uint64) { panic(fmt.Sprintf("peer %x is both voter and learner", id)) } - delete(p.nodes, id) + delete(p.voters[0], id) + delete(p.voters[1], id) delete(p.learners, id) delete(p.prs, id) } @@ -369,7 +369,7 @@ func (p *progressTracker) initProgress(id, match, next uint64, isLearner bool) { panic(fmt.Sprintf("peer %x already tracked as node %v", id, pr)) } if !isLearner { - p.nodes[id] = struct{}{} + p.voters[0][id] = struct{}{} } else { p.learners[id] = struct{}{} } @@ -391,19 +391,21 @@ func (p *progressTracker) visit(f func(id uint64, pr *Progress)) { // the view of the local raft state machine. Otherwise, it returns // false. func (p *progressTracker) quorumActive() bool { - var act int + votes := map[uint64]bool{} p.visit(func(id uint64, pr *Progress) { - if pr.RecentActive && !pr.IsLearner { - act++ + if pr.IsLearner { + return } + votes[id] = pr.RecentActive }) - return act >= p.quorum() + return p.voters.VoteResult(votes) == quorum.VoteWon } func (p *progressTracker) voterNodes() []uint64 { - nodes := make([]uint64, 0, len(p.nodes)) - for id := range p.nodes { + m := p.voters.IDs() + nodes := make([]uint64, 0, len(m)) + for id := range m { nodes = append(nodes, id) } sort.Sort(uint64Slice(nodes)) @@ -435,22 +437,21 @@ func (p *progressTracker) recordVote(id uint64, v bool) { // tallyVotes returns the number of granted and rejected votes, and whether the // election outcome is known. -func (p *progressTracker) tallyVotes() (granted int, rejected int, result electionResult) { - for _, v := range p.votes { - if v { +func (p *progressTracker) tallyVotes() (granted int, rejected int, _ quorum.VoteResult) { + // Make sure to populate granted/rejected correctly even if the votes slice + // contains members no longer part of the configuration. This doesn't really + // matter in the way the numbers are used (they're informational), but might + // as well get it right. + for id, pr := range p.prs { + if pr.IsLearner { + continue + } + if p.votes[id] { granted++ } else { rejected++ } } - - q := p.quorum() - - result = electionIndeterminate - if granted >= q { - result = electionWon - } else if rejected >= q { - result = electionLost - } + result := p.voters.VoteResult(p.votes) return granted, rejected, result } diff --git a/raft/raft.go b/raft/raft.go index 3372134ae992..cb003b80d1b4 100644 --- a/raft/raft.go +++ b/raft/raft.go @@ -24,6 +24,7 @@ import ( "sync" "time" + "go.etcd.io/etcd/raft/quorum" pb "go.etcd.io/etcd/raft/raftpb" ) @@ -737,7 +738,7 @@ func (r *raft) campaign(t CampaignType) { voteMsg = pb.MsgVote term = r.Term } - if _, _, res := r.poll(r.id, voteRespMsgType(voteMsg), true); res == electionWon { + if _, _, res := r.poll(r.id, voteRespMsgType(voteMsg), true); res == quorum.VoteWon { // We won the election after voting for ourselves (which must mean that // this is a single-node cluster). Advance to the next state. if t == campaignPreElection { @@ -747,7 +748,7 @@ func (r *raft) campaign(t CampaignType) { } return } - for id := range r.prs.nodes { + for id := range r.prs.voters.IDs() { if id == r.id { continue } @@ -762,15 +763,7 @@ func (r *raft) campaign(t CampaignType) { } } -type electionResult byte - -const ( - electionIndeterminate electionResult = iota - electionLost - electionWon -) - -func (r *raft) poll(id uint64, t pb.MessageType, v bool) (granted int, rejected int, result electionResult) { +func (r *raft) poll(id uint64, t pb.MessageType, v bool) (granted int, rejected int, result quorum.VoteResult) { if v { r.logger.Infof("%x received %s from %x at term %d", r.id, t, id, r.Term) } else { @@ -988,7 +981,9 @@ func stepLeader(r *raft, m pb.Message) error { r.bcastAppend() return nil case pb.MsgReadIndex: - if !r.prs.isSingleton() { // more than one voting member in cluster + // If more than the local vote is needed, go through a full broadcast, + // otherwise optimize. + if !r.prs.isSingleton() { if r.raftLog.zeroTermOnErrCompacted(r.raftLog.term(r.raftLog.committed)) != r.Term { // Reject read only request when this leader has not committed any log entry at its term. return nil @@ -1099,7 +1094,7 @@ func stepLeader(r *raft, m pb.Message) error { return nil } - if !r.prs.hasQuorum(r.readOnly.recvAck(m.From, m.Context)) { + if r.prs.voters.VoteResult(r.readOnly.recvAck(m.From, m.Context)) != quorum.VoteWon { return nil } @@ -1199,14 +1194,14 @@ func stepCandidate(r *raft, m pb.Message) error { gr, rj, res := r.poll(m.From, m.Type, !m.Reject) r.logger.Infof("%x has received %d %s votes and %d vote rejections", r.id, gr, m.Type, rj) switch res { - case electionWon: + case quorum.VoteWon: if r.state == StatePreCandidate { r.campaign(campaignElection) } else { r.becomeLeader() r.bcastAppend() } - case electionLost: + case quorum.VoteLost: // pb.MsgPreVoteResp contains future term of pre-candidate // m.Term > r.Term; reuse r.Term r.becomeFollower(r.Term, None) @@ -1406,7 +1401,7 @@ func (r *raft) removeNode(id uint64) { r.prs.removeAny(id) // Do not try to commit or abort transferring if the cluster is now empty. - if len(r.prs.nodes) == 0 && len(r.prs.learners) == 0 { + if len(r.prs.voters[0]) == 0 && len(r.prs.learners) == 0 { return } diff --git a/raft/raft_test.go b/raft/raft_test.go index ab1037bea54b..ad34e07aafb0 100644 --- a/raft/raft_test.go +++ b/raft/raft_test.go @@ -4300,7 +4300,8 @@ func newNetworkWithConfig(configFunc func(*Config), peers ...stateMachine) *netw learners[i] = true } v.id = id - v.prs.nodes = make(map[uint64]struct{}) + v.prs.voters[0] = make(map[uint64]struct{}) + v.prs.voters[1] = make(map[uint64]struct{}) v.prs.learners = make(map[uint64]struct{}) v.prs.prs = make(map[uint64]*Progress) for i := 0; i < size; i++ { @@ -4309,7 +4310,7 @@ func newNetworkWithConfig(configFunc func(*Config), peers ...stateMachine) *netw pr.IsLearner = true v.prs.learners[peerAddrs[i]] = struct{}{} } else { - v.prs.nodes[peerAddrs[i]] = struct{}{} + v.prs.voters[0][peerAddrs[i]] = struct{}{} } v.prs.prs[peerAddrs[i]] = pr } diff --git a/raft/read_only.go b/raft/read_only.go index 39eb2b065156..6987f1bd7d7e 100644 --- a/raft/read_only.go +++ b/raft/read_only.go @@ -29,7 +29,11 @@ type ReadState struct { type readIndexStatus struct { req pb.Message index uint64 - acks map[uint64]struct{} + // NB: this never records 'false', but it's more convenient to use this + // instead of a map[uint64]struct{} due to the API of quorum.VoteResult. If + // this becomes performance sensitive enough (doubtful), quorum.VoteResult + // can change to an API that is closer to that of CommittedIndex. + acks map[uint64]bool } type readOnly struct { @@ -54,20 +58,20 @@ func (ro *readOnly) addRequest(index uint64, m pb.Message) { if _, ok := ro.pendingReadIndex[s]; ok { return } - ro.pendingReadIndex[s] = &readIndexStatus{index: index, req: m, acks: make(map[uint64]struct{})} + ro.pendingReadIndex[s] = &readIndexStatus{index: index, req: m, acks: make(map[uint64]bool)} ro.readIndexQueue = append(ro.readIndexQueue, s) } // recvAck notifies the readonly struct that the raft state machine received // an acknowledgment of the heartbeat that attached with the read only request // context. -func (ro *readOnly) recvAck(id uint64, context []byte) map[uint64]struct{} { +func (ro *readOnly) recvAck(id uint64, context []byte) map[uint64]bool { rs, ok := ro.pendingReadIndex[string(context)] if !ok { return nil } - rs.acks[id] = struct{}{} + rs.acks[id] = true return rs.acks }