Skip to content

Commit

Permalink
raft: use membership sets in progress tracking
Browse files Browse the repository at this point in the history
Instead of having disjoint mappings of ID to *Progress for voters and
learners, use a map[id]struct{} for each and share a map of *Progress
among them.

This is easier to handle when joint quorums are introduced, at which
point a node may be a voting member of two quorums.
  • Loading branch information
tbg committed Jun 3, 2019
1 parent 0ff9ff8 commit dcc85aa
Show file tree
Hide file tree
Showing 4 changed files with 115 additions and 115 deletions.
50 changes: 23 additions & 27 deletions raft/progress.go
Original file line number Diff line number Diff line change
Expand Up @@ -291,8 +291,9 @@ func (in *inflights) reset() {
// known about the nodes and learners in it. In particular, it tracks the match
// index for each peer which in turn allows reasoning about the committed index.
type progressTracker struct {
nodes map[uint64]*Progress
learners map[uint64]*Progress
nodes map[uint64]struct{}
learners map[uint64]struct{}
prs map[uint64]*Progress

votes map[uint64]bool

Expand All @@ -303,8 +304,9 @@ type progressTracker struct {
func makePRS(maxInflight int) progressTracker {
p := progressTracker{
maxInflight: maxInflight,
nodes: map[uint64]*Progress{},
learners: map[uint64]*Progress{},
prs: map[uint64]*Progress{},
nodes: map[uint64]struct{}{},
learners: map[uint64]struct{}{},
votes: map[uint64]bool{},
}
return p
Expand Down Expand Up @@ -334,59 +336,53 @@ func (p *progressTracker) committed() uint64 {
}
p.matchBuf = p.matchBuf[:len(p.nodes)]
idx := 0
for _, pr := range p.nodes {
p.matchBuf[idx] = pr.Match
for id := range p.nodes {
p.matchBuf[idx] = p.prs[id].Match
idx++
}
sort.Sort(&p.matchBuf)
return p.matchBuf[len(p.matchBuf)-p.quorum()]
}

// removeAny stops tracking the peer with the given id, regardless of whether
// it is tracked as a voter (p.nodes) or as a learner (p.learners). The id is
// deleted from the voter set, the learner set and the progress map.
//
// It panics if the id is not tracked, and also if the id is recorded as both
// a voter and a learner at the same time.
//
// NOTE: this listing is a rendered diff without +/- markers, so the
// pre-change lines (pN/pL lookups) and the post-change lines (okPR/okV/okL
// lookups) appear interleaved.
func (p *progressTracker) removeAny(id uint64) {
pN := p.nodes[id]
pL := p.learners[id]
_, okPR := p.prs[id]
_, okV := p.nodes[id]
_, okL := p.learners[id]

if pN == nil && pL == nil {
if !okPR {
// NOTE(review): the "%x" verb is never expanded here because the string is
// handed to panic directly instead of going through fmt.Sprintf (contrast
// with the "both voter and learner" branch below), so the panic message
// does not contain the peer id.
panic("attempting to remove unknown peer %x")
} else if !okV && !okL {
// NOTE(review): same unformatted "%x" as in the branch above.
panic("attempting to remove unknown peer %x")
} else if pN != nil && pL != nil {
} else if okV && okL {
panic(fmt.Sprintf("peer %x is both voter and learner", id))
}

// delete is a no-op for a missing key, so all three maps can be cleaned
// unconditionally.
delete(p.nodes, id)
delete(p.learners, id)
delete(p.prs, id)
}

// initProgress starts tracking a new peer under the given id. It creates a
// Progress with the supplied match and next indexes and a fresh inflights
// window sized by p.maxInflight. isLearner selects whether the id is recorded
// in the learner set or in the voter set.
//
// The id must not be tracked already, either as a voter or as a learner;
// otherwise initProgress panics.
func (p *progressTracker) initProgress(id, match, next uint64, isLearner bool) {
if pr := p.nodes[id]; pr != nil {
// NOTE(review): p.prs holds the Progress of voters and learners alike, so
// the "as node" wording in the message below may be inaccurate for an id
// that is tracked as a learner.
if pr := p.prs[id]; pr != nil {
panic(fmt.Sprintf("peer %x already tracked as node %v", id, pr))
}
if pr := p.learners[id]; pr != nil {
panic(fmt.Sprintf("peer %x already tracked as learner %v", id, pr))
}
if !isLearner {
p.nodes[id] = &Progress{Next: next, Match: match, ins: newInflights(p.maxInflight)}
return
p.nodes[id] = struct{}{}
} else {
p.learners[id] = struct{}{}
}
p.learners[id] = &Progress{Next: next, Match: match, ins: newInflights(p.maxInflight), IsLearner: true}
p.prs[id] = &Progress{Next: next, Match: match, ins: newInflights(p.maxInflight), IsLearner: isLearner}
}

// getProgress returns the Progress tracked for id, whether the peer is a
// voter or a learner. The result is nil when the id is not tracked, because
// indexing a map of pointers with a missing key yields the zero value.
func (p *progressTracker) getProgress(id uint64) *Progress {
if pr, ok := p.nodes[id]; ok {
return pr
}

return p.learners[id]
return p.prs[id]
}

// visit invokes the supplied closure once for every tracked Progress, passing
// the peer id together with its Progress. The progresses are read by ranging
// over a map, so the order in which peers are visited is unspecified and may
// differ between calls.
func (p *progressTracker) visit(f func(id uint64, pr *Progress)) {
for id, pr := range p.nodes {
f(id, pr)
}

for id, pr := range p.learners {
for id, pr := range p.prs {
f(id, pr)
}
}
Expand Down
6 changes: 3 additions & 3 deletions raft/raft_flow_control_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ func TestMsgAppFlowControlFull(t *testing.T) {
r.becomeCandidate()
r.becomeLeader()

pr2 := r.prs.nodes[2]
pr2 := r.prs.prs[2]
// force the progress to be in replicate state
pr2.becomeReplicate()
// fill in the inflights window
Expand Down Expand Up @@ -65,7 +65,7 @@ func TestMsgAppFlowControlMoveForward(t *testing.T) {
r.becomeCandidate()
r.becomeLeader()

pr2 := r.prs.nodes[2]
pr2 := r.prs.prs[2]
// force the progress to be in replicate state
pr2.becomeReplicate()
// fill in the inflights window
Expand Down Expand Up @@ -110,7 +110,7 @@ func TestMsgAppFlowControlRecvHeartbeat(t *testing.T) {
r.becomeCandidate()
r.becomeLeader()

pr2 := r.prs.nodes[2]
pr2 := r.prs.prs[2]
// force the progress to be in replicate state
pr2.becomeReplicate()
// fill in the inflights window
Expand Down
58 changes: 29 additions & 29 deletions raft/raft_snap_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,11 @@ func TestSendingSnapshotSetPendingSnapshot(t *testing.T) {

// force set the next of node 2, so that
// node 2 needs a snapshot
sm.prs.nodes[2].Next = sm.raftLog.firstIndex()
sm.prs.prs[2].Next = sm.raftLog.firstIndex()

sm.Step(pb.Message{From: 2, To: 1, Type: pb.MsgAppResp, Index: sm.prs.nodes[2].Next - 1, Reject: true})
if sm.prs.nodes[2].PendingSnapshot != 11 {
t.Fatalf("PendingSnapshot = %d, want 11", sm.prs.nodes[2].PendingSnapshot)
sm.Step(pb.Message{From: 2, To: 1, Type: pb.MsgAppResp, Index: sm.prs.prs[2].Next - 1, Reject: true})
if sm.prs.prs[2].PendingSnapshot != 11 {
t.Fatalf("PendingSnapshot = %d, want 11", sm.prs.prs[2].PendingSnapshot)
}
}

Expand All @@ -56,7 +56,7 @@ func TestPendingSnapshotPauseReplication(t *testing.T) {
sm.becomeCandidate()
sm.becomeLeader()

sm.prs.nodes[2].becomeSnapshot(11)
sm.prs.prs[2].becomeSnapshot(11)

sm.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}})
msgs := sm.readMessages()
Expand All @@ -73,18 +73,18 @@ func TestSnapshotFailure(t *testing.T) {
sm.becomeCandidate()
sm.becomeLeader()

sm.prs.nodes[2].Next = 1
sm.prs.nodes[2].becomeSnapshot(11)
sm.prs.prs[2].Next = 1
sm.prs.prs[2].becomeSnapshot(11)

sm.Step(pb.Message{From: 2, To: 1, Type: pb.MsgSnapStatus, Reject: true})
if sm.prs.nodes[2].PendingSnapshot != 0 {
t.Fatalf("PendingSnapshot = %d, want 0", sm.prs.nodes[2].PendingSnapshot)
if sm.prs.prs[2].PendingSnapshot != 0 {
t.Fatalf("PendingSnapshot = %d, want 0", sm.prs.prs[2].PendingSnapshot)
}
if sm.prs.nodes[2].Next != 1 {
t.Fatalf("Next = %d, want 1", sm.prs.nodes[2].Next)
if sm.prs.prs[2].Next != 1 {
t.Fatalf("Next = %d, want 1", sm.prs.prs[2].Next)
}
if !sm.prs.nodes[2].Paused {
t.Errorf("Paused = %v, want true", sm.prs.nodes[2].Paused)
if !sm.prs.prs[2].Paused {
t.Errorf("Paused = %v, want true", sm.prs.prs[2].Paused)
}
}

Expand All @@ -96,18 +96,18 @@ func TestSnapshotSucceed(t *testing.T) {
sm.becomeCandidate()
sm.becomeLeader()

sm.prs.nodes[2].Next = 1
sm.prs.nodes[2].becomeSnapshot(11)
sm.prs.prs[2].Next = 1
sm.prs.prs[2].becomeSnapshot(11)

sm.Step(pb.Message{From: 2, To: 1, Type: pb.MsgSnapStatus, Reject: false})
if sm.prs.nodes[2].PendingSnapshot != 0 {
t.Fatalf("PendingSnapshot = %d, want 0", sm.prs.nodes[2].PendingSnapshot)
if sm.prs.prs[2].PendingSnapshot != 0 {
t.Fatalf("PendingSnapshot = %d, want 0", sm.prs.prs[2].PendingSnapshot)
}
if sm.prs.nodes[2].Next != 12 {
t.Fatalf("Next = %d, want 12", sm.prs.nodes[2].Next)
if sm.prs.prs[2].Next != 12 {
t.Fatalf("Next = %d, want 12", sm.prs.prs[2].Next)
}
if !sm.prs.nodes[2].Paused {
t.Errorf("Paused = %v, want true", sm.prs.nodes[2].Paused)
if !sm.prs.prs[2].Paused {
t.Errorf("Paused = %v, want true", sm.prs.prs[2].Paused)
}
}

Expand Down Expand Up @@ -206,7 +206,7 @@ func TestSnapshotSucceedViaAppResp(t *testing.T) {
mustSend(n2, n1, pb.MsgAppResp)

// Leader has correct state for follower.
pr := n1.prs.nodes[2]
pr := n1.prs.prs[2]
if pr.State != ProgressStateReplicate {
t.Fatalf("unexpected state %v", pr)
}
Expand All @@ -227,23 +227,23 @@ func TestSnapshotAbort(t *testing.T) {
sm.becomeCandidate()
sm.becomeLeader()

sm.prs.nodes[2].Next = 1
sm.prs.nodes[2].becomeSnapshot(11)
sm.prs.prs[2].Next = 1
sm.prs.prs[2].becomeSnapshot(11)

// A successful msgAppResp that has a higher/equal index than the
// pending snapshot should abort the pending snapshot.
sm.Step(pb.Message{From: 2, To: 1, Type: pb.MsgAppResp, Index: 11})
if sm.prs.nodes[2].PendingSnapshot != 0 {
t.Fatalf("PendingSnapshot = %d, want 0", sm.prs.nodes[2].PendingSnapshot)
if sm.prs.prs[2].PendingSnapshot != 0 {
t.Fatalf("PendingSnapshot = %d, want 0", sm.prs.prs[2].PendingSnapshot)
}
// The follower entered ProgressStateReplicate and the leader sent an append
// and optimistically updated the progress (so we see 13 instead of 12).
// There is something to append because the leader appended an empty entry
// to the log at index 12 when it assumed leadership.
if sm.prs.nodes[2].Next != 13 {
t.Fatalf("Next = %d, want 13", sm.prs.nodes[2].Next)
if sm.prs.prs[2].Next != 13 {
t.Fatalf("Next = %d, want 13", sm.prs.prs[2].Next)
}
if n := sm.prs.nodes[2].ins.count; n != 1 {
if n := sm.prs.prs[2].ins.count; n != 1 {
t.Fatalf("expected an inflight message, got %d", n)
}
}
Loading

0 comments on commit dcc85aa

Please sign in to comment.