Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

raft: extract progress tracking into own component #10683

Merged
merged 9 commits into from
May 22, 2019
10 changes: 5 additions & 5 deletions raft/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -353,15 +353,15 @@ func (n *node) run(r *raft) {
}
case m := <-n.recvc:
// filter out response message from unknown From.
if pr := r.getProgress(m.From); pr != nil || !IsResponseMsg(m.Type) {
if pr := r.prs.getProgress(m.From); pr != nil || !IsResponseMsg(m.Type) {
r.Step(m)
}
case cc := <-n.confc:
if cc.NodeID == None {
select {
case n.confstatec <- pb.ConfState{
Nodes: r.nodes(),
Learners: r.learnerNodes()}:
Nodes: r.prs.voterNodes(),
Learners: r.prs.learnerNodes()}:
case <-n.done:
}
break
Expand All @@ -384,8 +384,8 @@ func (n *node) run(r *raft) {
}
select {
case n.confstatec <- pb.ConfState{
Nodes: r.nodes(),
Learners: r.learnerNodes()}:
Nodes: r.prs.voterNodes(),
Learners: r.prs.learnerNodes()}:
case <-n.done:
}
case <-n.tickc:
Expand Down
177 changes: 176 additions & 1 deletion raft/progress.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,10 @@

package raft

import "fmt"
import (
"fmt"
"sort"
)

const (
ProgressStateProbe ProgressStateType = iota
Expand Down Expand Up @@ -283,3 +286,175 @@ func (in *inflights) reset() {
in.count = 0
in.start = 0
}

// progressTracker tracks the currently active configuration and the information
// known about the nodes and learners in it. In particular, it tracks the match
// index for each peer which in turn allows reasoning about the committed index.
type progressTracker struct {
nodes map[uint64]*Progress
learners map[uint64]*Progress

votes map[uint64]bool

maxInflight int
matchBuf uint64Slice
}

func makePRS(maxInflight int) progressTracker {
p := progressTracker{
maxInflight: maxInflight,
nodes: map[uint64]*Progress{},
learners: map[uint64]*Progress{},
votes: map[uint64]bool{},
}
return p
}

// isSingleton returns true if (and only if) there is only one voting member
// (i.e. the leader) in the current configuration.
func (p *progressTracker) isSingleton() bool {
return len(p.nodes) == 1
}

func (p *progressTracker) quorum() int {
return len(p.nodes)/2 + 1
}

func (p *progressTracker) hasQuorum(m map[uint64]struct{}) bool {
return len(m) >= p.quorum()
}

// committed returns the largest log index known to be committed based on what
// the voting members of the group have acknowledged.
func (p *progressTracker) committed() uint64 {
// Preserving matchBuf across calls is an optimization
// used to avoid allocating a new slice on each call.
if cap(p.matchBuf) < len(p.nodes) {
p.matchBuf = make(uint64Slice, len(p.nodes))
}
p.matchBuf = p.matchBuf[:len(p.nodes)]
idx := 0
for _, pr := range p.nodes {
p.matchBuf[idx] = pr.Match
idx++
}
sort.Sort(&p.matchBuf)
return p.matchBuf[len(p.matchBuf)-p.quorum()]
}

func (p *progressTracker) removeAny(id uint64) {
pN := p.nodes[id]
pL := p.learners[id]

if pN == nil && pL == nil {
panic("attempting to remove unknown peer %x")
} else if pN != nil && pL != nil {
panic(fmt.Sprintf("peer %x is both voter and learner", id))
}

delete(p.nodes, id)
delete(p.learners, id)
}

// initProgress initializes a new progress for the given node or learner. The
// node may not exist yet in either form or a panic will ensue.
func (p *progressTracker) initProgress(id, match, next uint64, isLearner bool) {
if pr := p.nodes[id]; pr != nil {
panic(fmt.Sprintf("peer %x already tracked as node %v", id, pr))
}
if pr := p.learners[id]; pr != nil {
panic(fmt.Sprintf("peer %x already tracked as learner %v", id, pr))
}
if !isLearner {
p.nodes[id] = &Progress{Next: next, Match: match, ins: newInflights(p.maxInflight)}
return
}
p.learners[id] = &Progress{Next: next, Match: match, ins: newInflights(p.maxInflight), IsLearner: true}
}

func (p *progressTracker) getProgress(id uint64) *Progress {
if pr, ok := p.nodes[id]; ok {
return pr
}

return p.learners[id]
}

// visit invokes the supplied closure for all tracked progresses.
func (p *progressTracker) visit(f func(id uint64, pr *Progress)) {
for id, pr := range p.nodes {
f(id, pr)
}

for id, pr := range p.learners {
f(id, pr)
}
}

// checkQuorumActive returns true if the quorum is active from
// the view of the local raft state machine. Otherwise, it returns
// false.
func (p *progressTracker) quorumActive() bool {
var act int
p.visit(func(id uint64, pr *Progress) {
if pr.RecentActive && !pr.IsLearner {
act++
}
})

return act >= p.quorum()
}

func (p *progressTracker) voterNodes() []uint64 {
nodes := make([]uint64, 0, len(p.nodes))
for id := range p.nodes {
nodes = append(nodes, id)
}
sort.Sort(uint64Slice(nodes))
return nodes
}

func (p *progressTracker) learnerNodes() []uint64 {
nodes := make([]uint64, 0, len(p.learners))
for id := range p.learners {
nodes = append(nodes, id)
}
sort.Sort(uint64Slice(nodes))
return nodes
}

// resetVotes prepares for a new round of vote counting via recordVote.
func (p *progressTracker) resetVotes() {
p.votes = map[uint64]bool{}
}

// recordVote records that the node with the given id voted for this Raft
// instance if v == true (and declined it otherwise).
func (p *progressTracker) recordVote(id uint64, v bool) {
_, ok := p.votes[id]
if !ok {
p.votes[id] = v
}
}

// tallyVotes returns the number of granted and rejected votes, and whether the
// election outcome is known.
func (p *progressTracker) tallyVotes() (granted int, rejected int, result electionResult) {
for _, v := range p.votes {
if v {
granted++
} else {
rejected++
}
}

q := p.quorum()

result = electionIndeterminate
if granted >= q {
result = electionWon
} else if rejected >= q {
result = electionLost
}
return granted, rejected, result
}
Loading