Skip to content

Commit

Permalink
raft: use half-populated joint quorum
Browse files Browse the repository at this point in the history
To ease a future transition into joint quorums, this commit removes the
previous "ad-hoc" majority-based quorum and vote computations with that
introduced in the `raft/quorum` package.

More specifically, the progressTracker now uses a quorum.JointConfig for
which the "second" majority quorum is always empty; in this case the
quorum behaves like the one quorum.MajorityConfig that is actually
present. Or, more briefly, this change is a no-op, but it will take the
busywork out of actually starting to make use of joint quorums in the
future.

On a side node, I suspect that this might've fixed a bug regarding the
read index though I haven't been able to explicitly come up with a
counter-example. The problem was that the acks collected for the read
index weren't taking into account membership changes, so they'd run the
danger of using acks from nodes since removed to claim that a quorum of
acks had been received. There's a chance that there isn't a
counter-example (the only guarantee extracted from the "quorum" is that
there isn't another leader, but even if there's another leader all that
matters is that that leader doesn't have a divergent history from the
stale leader in the hypothetical counter-example), but either way there
is morally a bug here that is now fixed because VoteCommitted doesn't
care about votes from members that are not voters known to the currently
active configuration.
  • Loading branch information
tbg committed Jun 19, 2019
1 parent 0384c58 commit e039629
Show file tree
Hide file tree
Showing 4 changed files with 69 additions and 68 deletions.
93 changes: 47 additions & 46 deletions raft/progress.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ package raft
import (
"fmt"
"sort"

"go.etcd.io/etcd/raft/quorum"
)

const (
Expand Down Expand Up @@ -291,64 +293,61 @@ func (in *inflights) reset() {
// known about the nodes and learners in it. In particular, it tracks the match
// index for each peer which in turn allows reasoning about the committed index.
type progressTracker struct {
nodes map[uint64]struct{}
voters quorum.JointConfig
learners map[uint64]struct{}
prs map[uint64]*Progress

votes map[uint64]bool

maxInflight int
matchBuf uint64Slice
}

func makeProgressTracker(maxInflight int) progressTracker {
p := progressTracker{
maxInflight: maxInflight,
prs: map[uint64]*Progress{},
nodes: map[uint64]struct{}{},
learners: map[uint64]struct{}{},
votes: map[uint64]bool{},
voters: quorum.JointConfig{
quorum.MajorityConfig{},
quorum.MajorityConfig{},
},
learners: map[uint64]struct{}{},
votes: map[uint64]bool{},
prs: map[uint64]*Progress{},
}
return p
}

// isSingleton returns true if (and only if) there is only one voting member
// (i.e. the leader) in the current configuration.
func (p *progressTracker) isSingleton() bool {
return len(p.nodes) == 1
return len(p.voters[0]) == 1 && len(p.voters[1]) == 0
}

func (p *progressTracker) quorum() int {
return len(p.nodes)/2 + 1
}
type progressAckIndexer map[uint64]*Progress

func (p *progressTracker) hasQuorum(m map[uint64]struct{}) bool {
return len(m) >= p.quorum()
var _ quorum.AckedIndexer = progressAckIndexer(nil)

func (l progressAckIndexer) AckedIndex(id uint64) (quorum.Index, bool) {
pr, ok := l[id]
if !ok {
return 0, false
}
return quorum.Index(pr.Match), true
}

// committed returns the largest log index known to be committed based on what
// the voting members of the group have acknowledged.
func (p *progressTracker) committed() uint64 {
// Preserving matchBuf across calls is an optimization
// used to avoid allocating a new slice on each call.
if cap(p.matchBuf) < len(p.nodes) {
p.matchBuf = make(uint64Slice, len(p.nodes))
}
p.matchBuf = p.matchBuf[:len(p.nodes)]
idx := 0
for id := range p.nodes {
p.matchBuf[idx] = p.prs[id].Match
idx++
}
sort.Sort(&p.matchBuf)
return p.matchBuf[len(p.matchBuf)-p.quorum()]
return uint64(p.voters.CommittedIndex(progressAckIndexer(p.prs)))
}

func (p *progressTracker) removeAny(id uint64) {
_, okPR := p.prs[id]
_, okV := p.nodes[id]
_, okV1 := p.voters[0][id]
_, okV2 := p.voters[1][id]
_, okL := p.learners[id]

okV := okV1 || okV2

if !okPR {
panic("attempting to remove unknown peer %x")
} else if !okV && !okL {
Expand All @@ -357,7 +356,8 @@ func (p *progressTracker) removeAny(id uint64) {
panic(fmt.Sprintf("peer %x is both voter and learner", id))
}

delete(p.nodes, id)
delete(p.voters[0], id)
delete(p.voters[1], id)
delete(p.learners, id)
delete(p.prs, id)
}
Expand All @@ -369,7 +369,7 @@ func (p *progressTracker) initProgress(id, match, next uint64, isLearner bool) {
panic(fmt.Sprintf("peer %x already tracked as node %v", id, pr))
}
if !isLearner {
p.nodes[id] = struct{}{}
p.voters[0][id] = struct{}{}
} else {
p.learners[id] = struct{}{}
}
Expand All @@ -391,19 +391,21 @@ func (p *progressTracker) visit(f func(id uint64, pr *Progress)) {
// the view of the local raft state machine. Otherwise, it returns
// false.
func (p *progressTracker) quorumActive() bool {
var act int
votes := map[uint64]bool{}
p.visit(func(id uint64, pr *Progress) {
if pr.RecentActive && !pr.IsLearner {
act++
if pr.IsLearner {
return
}
votes[id] = pr.RecentActive
})

return act >= p.quorum()
return p.voters.VoteResult(votes) == quorum.VoteWon
}

func (p *progressTracker) voterNodes() []uint64 {
nodes := make([]uint64, 0, len(p.nodes))
for id := range p.nodes {
m := p.voters.IDs()
nodes := make([]uint64, 0, len(m))
for id := range m {
nodes = append(nodes, id)
}
sort.Sort(uint64Slice(nodes))
Expand Down Expand Up @@ -435,22 +437,21 @@ func (p *progressTracker) recordVote(id uint64, v bool) {

// tallyVotes returns the number of granted and rejected votes, and whether the
// election outcome is known.
func (p *progressTracker) tallyVotes() (granted int, rejected int, result electionResult) {
for _, v := range p.votes {
if v {
func (p *progressTracker) tallyVotes() (granted int, rejected int, _ quorum.VoteResult) {
// Make sure to populate granted/rejected correctly even if the votes slice
// contains members no longer part of the configuration. This doesn't really
// matter in the way the numbers are used (they're informational), but might
// as well get it right.
for id, pr := range p.prs {
if pr.IsLearner {
continue
}
if p.votes[id] {
granted++
} else {
rejected++
}
}

q := p.quorum()

result = electionIndeterminate
if granted >= q {
result = electionWon
} else if rejected >= q {
result = electionLost
}
result := p.voters.VoteResult(p.votes)
return granted, rejected, result
}
27 changes: 11 additions & 16 deletions raft/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"sync"
"time"

"go.etcd.io/etcd/raft/quorum"
pb "go.etcd.io/etcd/raft/raftpb"
)

Expand Down Expand Up @@ -744,7 +745,7 @@ func (r *raft) campaign(t CampaignType) {
voteMsg = pb.MsgVote
term = r.Term
}
if _, _, res := r.poll(r.id, voteRespMsgType(voteMsg), true); res == electionWon {
if _, _, res := r.poll(r.id, voteRespMsgType(voteMsg), true); res == quorum.VoteWon {
// We won the election after voting for ourselves (which must mean that
// this is a single-node cluster). Advance to the next state.
if t == campaignPreElection {
Expand All @@ -754,7 +755,7 @@ func (r *raft) campaign(t CampaignType) {
}
return
}
for id := range r.prs.nodes {
for id := range r.prs.voters.IDs() {
if id == r.id {
continue
}
Expand All @@ -769,15 +770,7 @@ func (r *raft) campaign(t CampaignType) {
}
}

type electionResult byte

const (
electionIndeterminate electionResult = iota
electionLost
electionWon
)

func (r *raft) poll(id uint64, t pb.MessageType, v bool) (granted int, rejected int, result electionResult) {
func (r *raft) poll(id uint64, t pb.MessageType, v bool) (granted int, rejected int, result quorum.VoteResult) {
if v {
r.logger.Infof("%x received %s from %x at term %d", r.id, t, id, r.Term)
} else {
Expand Down Expand Up @@ -999,7 +992,9 @@ func stepLeader(r *raft, m pb.Message) error {
r.bcastAppend()
return nil
case pb.MsgReadIndex:
if !r.prs.isSingleton() { // more than one voting member in cluster
// If more than the local vote is needed, go through a full broadcast,
// otherwise optimize.
if !r.prs.isSingleton() {
if r.raftLog.zeroTermOnErrCompacted(r.raftLog.term(r.raftLog.committed)) != r.Term {
// Reject read only request when this leader has not committed any log entry at its term.
return nil
Expand Down Expand Up @@ -1110,7 +1105,7 @@ func stepLeader(r *raft, m pb.Message) error {
return nil
}

if !r.prs.hasQuorum(r.readOnly.recvAck(m.From, m.Context)) {
if r.prs.voters.VoteResult(r.readOnly.recvAck(m.From, m.Context)) != quorum.VoteWon {
return nil
}

Expand Down Expand Up @@ -1210,14 +1205,14 @@ func stepCandidate(r *raft, m pb.Message) error {
gr, rj, res := r.poll(m.From, m.Type, !m.Reject)
r.logger.Infof("%x has received %d %s votes and %d vote rejections", r.id, gr, m.Type, rj)
switch res {
case electionWon:
case quorum.VoteWon:
if r.state == StatePreCandidate {
r.campaign(campaignElection)
} else {
r.becomeLeader()
r.bcastAppend()
}
case electionLost:
case quorum.VoteLost:
// pb.MsgPreVoteResp contains future term of pre-candidate
// m.Term > r.Term; reuse r.Term
r.becomeFollower(r.Term, None)
Expand Down Expand Up @@ -1417,7 +1412,7 @@ func (r *raft) removeNode(id uint64) {
r.prs.removeAny(id)

// Do not try to commit or abort transferring if the cluster is now empty.
if len(r.prs.nodes) == 0 && len(r.prs.learners) == 0 {
if len(r.prs.voters[0]) == 0 && len(r.prs.learners) == 0 {
return
}

Expand Down
5 changes: 3 additions & 2 deletions raft/raft_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4334,7 +4334,8 @@ func newNetworkWithConfig(configFunc func(*Config), peers ...stateMachine) *netw
learners[i] = true
}
v.id = id
v.prs.nodes = make(map[uint64]struct{})
v.prs.voters[0] = make(map[uint64]struct{})
v.prs.voters[1] = make(map[uint64]struct{})
v.prs.learners = make(map[uint64]struct{})
v.prs.prs = make(map[uint64]*Progress)
for i := 0; i < size; i++ {
Expand All @@ -4343,7 +4344,7 @@ func newNetworkWithConfig(configFunc func(*Config), peers ...stateMachine) *netw
pr.IsLearner = true
v.prs.learners[peerAddrs[i]] = struct{}{}
} else {
v.prs.nodes[peerAddrs[i]] = struct{}{}
v.prs.voters[0][peerAddrs[i]] = struct{}{}
}
v.prs.prs[peerAddrs[i]] = pr
}
Expand Down
12 changes: 8 additions & 4 deletions raft/read_only.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,11 @@ type ReadState struct {
type readIndexStatus struct {
req pb.Message
index uint64
acks map[uint64]struct{}
// NB: this never records 'false', but it's more convenient to use this
// instead of a map[uint64]struct{} due to the API of quorum.VoteResult. If
// this becomes performance sensitive enough (doubtful), quorum.VoteResult
// can change to an API that is closer to that of CommittedIndex.
acks map[uint64]bool
}

type readOnly struct {
Expand All @@ -54,20 +58,20 @@ func (ro *readOnly) addRequest(index uint64, m pb.Message) {
if _, ok := ro.pendingReadIndex[s]; ok {
return
}
ro.pendingReadIndex[s] = &readIndexStatus{index: index, req: m, acks: make(map[uint64]struct{})}
ro.pendingReadIndex[s] = &readIndexStatus{index: index, req: m, acks: make(map[uint64]bool)}
ro.readIndexQueue = append(ro.readIndexQueue, s)
}

// recvAck notifies the readonly struct that the raft state machine received
// an acknowledgment of the heartbeat that attached with the read only request
// context.
func (ro *readOnly) recvAck(id uint64, context []byte) map[uint64]struct{} {
func (ro *readOnly) recvAck(id uint64, context []byte) map[uint64]bool {
rs, ok := ro.pendingReadIndex[string(context)]
if !ok {
return nil
}

rs.acks[id] = struct{}{}
rs.acks[id] = true
return rs.acks
}

Expand Down

0 comments on commit e039629

Please sign in to comment.