Skip to content

Commit

Permalink
raft: use half-populated joint quorum
Browse files Browse the repository at this point in the history
To ease a future transition into joint quorums, this commit removes the
previous "ad-hoc" majority-based quorum and vote computations with that
introduced in the `raft/quorum` package.

More specifically, the progressTracker now uses a quorum.JointConfig for
which the "second" majority quorum is always empty; in this case the
quorum behaves like the one quorum.MajorityConfig that is actually
present. Or, more briefly, this change is a no-op, but it will take the
busywork out of actually starting to make use of joint quorums in the
future.
  • Loading branch information
tbg committed Jun 3, 2019
1 parent c225ba6 commit d218d94
Show file tree
Hide file tree
Showing 4 changed files with 69 additions and 68 deletions.
93 changes: 47 additions & 46 deletions raft/progress.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ package raft
import (
"fmt"
"sort"

"go.etcd.io/etcd/v3/raft/quorum"
)

const (
Expand Down Expand Up @@ -291,64 +293,61 @@ func (in *inflights) reset() {
// known about the nodes and learners in it. In particular, it tracks the match
// index for each peer which in turn allows reasoning about the committed index.
type progressTracker struct {
nodes map[uint64]struct{}
voters quorum.JointConfig
learners map[uint64]struct{}
prs map[uint64]*Progress

votes map[uint64]bool

maxInflight int
matchBuf uint64Slice
}

func makeProgressTracker(maxInflight int) progressTracker {
p := progressTracker{
maxInflight: maxInflight,
prs: map[uint64]*Progress{},
nodes: map[uint64]struct{}{},
learners: map[uint64]struct{}{},
votes: map[uint64]bool{},
voters: quorum.JointConfig{
quorum.MajorityConfig{},
quorum.MajorityConfig{},
},
learners: map[uint64]struct{}{},
votes: map[uint64]bool{},
prs: map[uint64]*Progress{},
}
return p
}

// isSingleton returns true if (and only if) there is only one voting member
// (i.e. the leader) in the current configuration.
func (p *progressTracker) isSingleton() bool {
return len(p.nodes) == 1
return len(p.voters[0]) == 1 && len(p.voters[1]) == 0
}

func (p *progressTracker) quorum() int {
return len(p.nodes)/2 + 1
}
type matchLookuper map[uint64]*Progress

func (p *progressTracker) hasQuorum(m map[uint64]struct{}) bool {
return len(m) >= p.quorum()
var _ quorum.IndexLookuper = matchLookuper(nil)

func (l matchLookuper) Index(id uint64) (uint64, bool) {
pr, ok := l[id]
if !ok {
return 0, false
}
return pr.Match, true
}

// committed returns the largest log index known to be committed based on what
// the voting members of the group have acknowledged.
func (p *progressTracker) committed() uint64 {
// Preserving matchBuf across calls is an optimization
// used to avoid allocating a new slice on each call.
if cap(p.matchBuf) < len(p.nodes) {
p.matchBuf = make(uint64Slice, len(p.nodes))
}
p.matchBuf = p.matchBuf[:len(p.nodes)]
idx := 0
for id := range p.nodes {
p.matchBuf[idx] = p.prs[id].Match
idx++
}
sort.Sort(&p.matchBuf)
return p.matchBuf[len(p.matchBuf)-p.quorum()]
return p.voters.CommittedIndex(matchLookuper(p.prs)).Definitely
}

func (p *progressTracker) removeAny(id uint64) {
_, okPR := p.prs[id]
_, okV := p.nodes[id]
_, okV1 := p.voters[0][id]
_, okV2 := p.voters[1][id]
_, okL := p.learners[id]

okV := okV1 || okV2

if !okPR {
panic("attempting to remove unknown peer %x")
} else if !okV && !okL {
Expand All @@ -357,7 +356,8 @@ func (p *progressTracker) removeAny(id uint64) {
panic(fmt.Sprintf("peer %x is both voter and learner", id))
}

delete(p.nodes, id)
delete(p.voters[0], id)
delete(p.voters[1], id)
delete(p.learners, id)
delete(p.prs, id)
}
Expand All @@ -369,7 +369,7 @@ func (p *progressTracker) initProgress(id, match, next uint64, isLearner bool) {
panic(fmt.Sprintf("peer %x already tracked as node %v", id, pr))
}
if !isLearner {
p.nodes[id] = struct{}{}
p.voters[0][id] = struct{}{}
} else {
p.learners[id] = struct{}{}
}
Expand All @@ -391,19 +391,21 @@ func (p *progressTracker) visit(f func(id uint64, pr *Progress)) {
// the view of the local raft state machine. Otherwise, it returns
// false.
func (p *progressTracker) quorumActive() bool {
var act int
votes := map[uint64]bool{}
p.visit(func(id uint64, pr *Progress) {
if pr.RecentActive && !pr.IsLearner {
act++
if pr.IsLearner {
return
}
votes[id] = pr.RecentActive
})

return act >= p.quorum()
return p.voters.VoteResult(votes) == quorum.VoteWon
}

func (p *progressTracker) voterNodes() []uint64 {
nodes := make([]uint64, 0, len(p.nodes))
for id := range p.nodes {
m := p.voters.IDs()
nodes := make([]uint64, 0, len(m))
for id := range m {
nodes = append(nodes, id)
}
sort.Sort(uint64Slice(nodes))
Expand Down Expand Up @@ -435,22 +437,21 @@ func (p *progressTracker) recordVote(id uint64, v bool) {

// tallyVotes returns the number of granted and rejected votes, and whether the
// election outcome is known.
func (p *progressTracker) tallyVotes() (granted int, rejected int, result electionResult) {
for _, v := range p.votes {
if v {
func (p *progressTracker) tallyVotes() (granted int, rejected int, _ quorum.VoteResult) {
// Make sure to populate granted/rejected correctly even if the votes slice
// contains members no longer part of the configuration. This doesn't really
// matter in the way the numbers are used (they're informational), but might
// as well get it right.
for id, pr := range p.prs {
if pr.IsLearner {
continue
}
if p.votes[id] {
granted++
} else {
rejected++
}
}

q := p.quorum()

result = electionIndeterminate
if granted >= q {
result = electionWon
} else if rejected >= q {
result = electionLost
}
result := p.voters.VoteResult(p.votes)
return granted, rejected, result
}
27 changes: 11 additions & 16 deletions raft/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"sync"
"time"

"go.etcd.io/etcd/raft/quorum"
pb "go.etcd.io/etcd/raft/raftpb"
)

Expand Down Expand Up @@ -737,7 +738,7 @@ func (r *raft) campaign(t CampaignType) {
voteMsg = pb.MsgVote
term = r.Term
}
if _, _, res := r.poll(r.id, voteRespMsgType(voteMsg), true); res == electionWon {
if _, _, res := r.poll(r.id, voteRespMsgType(voteMsg), true); res == quorum.VoteWon {
// We won the election after voting for ourselves (which must mean that
// this is a single-node cluster). Advance to the next state.
if t == campaignPreElection {
Expand All @@ -747,7 +748,7 @@ func (r *raft) campaign(t CampaignType) {
}
return
}
for id := range r.prs.nodes {
for id := range r.prs.voters.IDs() {
if id == r.id {
continue
}
Expand All @@ -762,15 +763,7 @@ func (r *raft) campaign(t CampaignType) {
}
}

type electionResult byte

const (
electionIndeterminate electionResult = iota
electionLost
electionWon
)

func (r *raft) poll(id uint64, t pb.MessageType, v bool) (granted int, rejected int, result electionResult) {
func (r *raft) poll(id uint64, t pb.MessageType, v bool) (granted int, rejected int, result quorum.VoteResult) {
if v {
r.logger.Infof("%x received %s from %x at term %d", r.id, t, id, r.Term)
} else {
Expand Down Expand Up @@ -988,7 +981,9 @@ func stepLeader(r *raft, m pb.Message) error {
r.bcastAppend()
return nil
case pb.MsgReadIndex:
if !r.prs.isSingleton() { // more than one voting member in cluster
// If more than the local vote is needed, go through a full broadcast,
// otherwise optimize.
if !r.prs.isSingleton() {
if r.raftLog.zeroTermOnErrCompacted(r.raftLog.term(r.raftLog.committed)) != r.Term {
// Reject read only request when this leader has not committed any log entry at its term.
return nil
Expand Down Expand Up @@ -1099,7 +1094,7 @@ func stepLeader(r *raft, m pb.Message) error {
return nil
}

if !r.prs.hasQuorum(r.readOnly.recvAck(m.From, m.Context)) {
if r.prs.voters.VoteResult(r.readOnly.recvAck(m.From, m.Context)) != quorum.VoteWon {
return nil
}

Expand Down Expand Up @@ -1199,14 +1194,14 @@ func stepCandidate(r *raft, m pb.Message) error {
gr, rj, res := r.poll(m.From, m.Type, !m.Reject)
r.logger.Infof("%x has received %d %s votes and %d vote rejections", r.id, gr, m.Type, rj)
switch res {
case electionWon:
case quorum.VoteWon:
if r.state == StatePreCandidate {
r.campaign(campaignElection)
} else {
r.becomeLeader()
r.bcastAppend()
}
case electionLost:
case quorum.VoteLost:
// pb.MsgPreVoteResp contains future term of pre-candidate
// m.Term > r.Term; reuse r.Term
r.becomeFollower(r.Term, None)
Expand Down Expand Up @@ -1406,7 +1401,7 @@ func (r *raft) removeNode(id uint64) {
r.prs.removeAny(id)

// Do not try to commit or abort transferring if the cluster is now empty.
if len(r.prs.nodes) == 0 && len(r.prs.learners) == 0 {
if len(r.prs.voters[0]) == 0 && len(r.prs.learners) == 0 {
return
}

Expand Down
5 changes: 3 additions & 2 deletions raft/raft_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4300,7 +4300,8 @@ func newNetworkWithConfig(configFunc func(*Config), peers ...stateMachine) *netw
learners[i] = true
}
v.id = id
v.prs.nodes = make(map[uint64]struct{})
v.prs.voters[0] = make(map[uint64]struct{})
v.prs.voters[1] = make(map[uint64]struct{})
v.prs.learners = make(map[uint64]struct{})
v.prs.prs = make(map[uint64]*Progress)
for i := 0; i < size; i++ {
Expand All @@ -4309,7 +4310,7 @@ func newNetworkWithConfig(configFunc func(*Config), peers ...stateMachine) *netw
pr.IsLearner = true
v.prs.learners[peerAddrs[i]] = struct{}{}
} else {
v.prs.nodes[peerAddrs[i]] = struct{}{}
v.prs.voters[0][peerAddrs[i]] = struct{}{}
}
v.prs.prs[peerAddrs[i]] = pr
}
Expand Down
12 changes: 8 additions & 4 deletions raft/read_only.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,11 @@ type ReadState struct {
type readIndexStatus struct {
req pb.Message
index uint64
acks map[uint64]struct{}
// NB: this never records 'false', but it's more convenient to use this
// instead of a map[uint64]struct{} due to the API of quorum.VoteResult. If
// this becomes performance sensitive enough (doubtful), quorum.VoteResult
// can change to an API that is closer to that of CommittedIndex.
acks map[uint64]bool
}

type readOnly struct {
Expand All @@ -54,20 +58,20 @@ func (ro *readOnly) addRequest(index uint64, m pb.Message) {
if _, ok := ro.pendingReadIndex[s]; ok {
return
}
ro.pendingReadIndex[s] = &readIndexStatus{index: index, req: m, acks: make(map[uint64]struct{})}
ro.pendingReadIndex[s] = &readIndexStatus{index: index, req: m, acks: make(map[uint64]bool)}
ro.readIndexQueue = append(ro.readIndexQueue, s)
}

// recvAck notifies the readonly struct that the raft state machine received
// an acknowledgment of the heartbeat that attached with the read only request
// context.
func (ro *readOnly) recvAck(id uint64, context []byte) map[uint64]struct{} {
func (ro *readOnly) recvAck(id uint64, context []byte) map[uint64]bool {
rs, ok := ro.pendingReadIndex[string(context)]
if !ok {
return nil
}

rs.acks[id] = struct{}{}
rs.acks[id] = true
return rs.acks
}

Expand Down

0 comments on commit d218d94

Please sign in to comment.