Skip to content

Commit

Permalink
raft: raft learners should be returned after applyConfChange
Browse files Browse the repository at this point in the history
  • Loading branch information
absolute8511 committed Jan 9, 2018
1 parent 6f76e46 commit 935fe85
Show file tree
Hide file tree
Showing 5 changed files with 79 additions and 10 deletions.
8 changes: 6 additions & 2 deletions raft/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -326,7 +326,9 @@ func (n *node) run(r *raft) {
if cc.NodeID == None {
r.resetPendingConf()
select {
case n.confstatec <- pb.ConfState{Nodes: r.nodes()}:
case n.confstatec <- pb.ConfState{
Nodes: r.nodes(),
Learners: r.learnerNodes()}:
case <-n.done:
}
break
Expand All @@ -349,7 +351,9 @@ func (n *node) run(r *raft) {
panic("unexpected conf type")
}
select {
case n.confstatec <- pb.ConfState{Nodes: r.nodes()}:
case n.confstatec <- pb.ConfState{
Nodes: r.nodes(),
Learners: r.learnerNodes()}:
case <-n.done:
}
case <-n.tickc:
Expand Down
51 changes: 51 additions & 0 deletions raft/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -728,3 +728,54 @@ func TestIsHardStateEqual(t *testing.T) {
}
}
}

func TestNodeProposeAddLearnerNode(t *testing.T) {
ticker := time.NewTicker(time.Millisecond * 100)
defer ticker.Stop()
n := newNode()
s := NewMemoryStorage()
r := newTestRaft(1, []uint64{1}, 10, 1, s)
go n.run(r)
n.Campaign(context.TODO())
stop := make(chan struct{})
done := make(chan struct{})
applyConfChan := make(chan struct{})
go func() {
defer close(done)
for {
select {
case <-stop:
return
case <-ticker.C:
n.Tick()
case rd := <-n.Ready():
s.Append(rd.Entries)
t.Logf("raft: %v", rd.Entries)
for _, ent := range rd.Entries {
if ent.Type == raftpb.EntryConfChange {
var cc raftpb.ConfChange
cc.Unmarshal(ent.Data)
state := n.ApplyConfChange(cc)
if len(state.Learners) == 0 ||
state.Learners[0] != cc.NodeID ||
cc.NodeID != 2 {
t.Fatalf("apply conf change should return new added learner: %v", state.String())
}

if len(state.Nodes) != 1 {
t.Fatalf("add learner should not change the nodes: %v", state.String())
}
t.Logf("apply raft conf %v changed to: %v", cc, state.String())
applyConfChan <- struct{}{}
}
}
n.Advance()
}
}
}()
cc := raftpb.ConfChange{Type: raftpb.ConfChangeAddLearnerNode, NodeID: 2}
n.ProposeConfChange(context.TODO(), cc)
<-applyConfChan
close(stop)
<-done
}
8 changes: 7 additions & 1 deletion raft/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -372,10 +372,16 @@ func (r *raft) hardState() pb.HardState {
func (r *raft) quorum() int { return len(r.prs)/2 + 1 }

func (r *raft) nodes() []uint64 {
nodes := make([]uint64, 0, len(r.prs)+len(r.learnerPrs))
nodes := make([]uint64, 0, len(r.prs))
for id := range r.prs {
nodes = append(nodes, id)
}
sort.Sort(uint64Slice(nodes))
return nodes
}

func (r *raft) learnerNodes() []uint64 {
nodes := make([]uint64, 0, len(r.learnerPrs))
for id := range r.learnerPrs {
nodes = append(nodes, id)
}
Expand Down
18 changes: 13 additions & 5 deletions raft/raft_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2475,8 +2475,12 @@ func TestRestoreWithLearner(t *testing.T) {
t.Errorf("log.lastTerm = %d, want %d", mustTerm(sm.raftLog.term(s.Metadata.Index)), s.Metadata.Term)
}
sg := sm.nodes()
if len(sg) != len(s.Metadata.ConfState.Nodes)+len(s.Metadata.ConfState.Learners) {
t.Errorf("sm.Nodes = %+v, length not equal with %+v", sg, s.Metadata.ConfState)
if len(sg) != len(s.Metadata.ConfState.Nodes) {
t.Errorf("sm.Nodes = %+v, length not equal with %+v", sg, s.Metadata.ConfState.Nodes)
}
lns := sm.learnerNodes()
if len(lns) != len(s.Metadata.ConfState.Learners) {
t.Errorf("sm.LearnerNodes = %+v, length not equal with %+v", sg, s.Metadata.ConfState.Learners)
}
for _, n := range s.Metadata.ConfState.Nodes {
if sm.prs[n].IsLearner {
Expand Down Expand Up @@ -2827,8 +2831,8 @@ func TestAddLearner(t *testing.T) {
if r.pendingConf {
t.Errorf("pendingConf = %v, want false", r.pendingConf)
}
nodes := r.nodes()
wnodes := []uint64{1, 2}
nodes := r.learnerNodes()
wnodes := []uint64{2}
if !reflect.DeepEqual(nodes, wnodes) {
t.Errorf("nodes = %v, want %v", nodes, wnodes)
}
Expand Down Expand Up @@ -2908,9 +2912,13 @@ func TestRemoveLearner(t *testing.T) {
t.Errorf("nodes = %v, want %v", g, w)
}

w = []uint64{}
if g := r.learnerNodes(); !reflect.DeepEqual(g, w) {
t.Errorf("nodes = %v, want %v", g, w)
}

// remove all nodes from cluster
r.removeNode(1)
w = []uint64{}
if g := r.nodes(); !reflect.DeepEqual(g, w) {
t.Errorf("nodes = %v, want %v", g, w)
}
Expand Down
4 changes: 2 additions & 2 deletions raft/rawnode.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ func (rn *RawNode) ProposeConfChange(cc pb.ConfChange) error {
func (rn *RawNode) ApplyConfChange(cc pb.ConfChange) *pb.ConfState {
if cc.NodeID == None {
rn.raft.resetPendingConf()
return &pb.ConfState{Nodes: rn.raft.nodes()}
return &pb.ConfState{Nodes: rn.raft.nodes(), Learners: rn.raft.learnerNodes()}
}
switch cc.Type {
case pb.ConfChangeAddNode:
Expand All @@ -184,7 +184,7 @@ func (rn *RawNode) ApplyConfChange(cc pb.ConfChange) *pb.ConfState {
default:
panic("unexpected conf type")
}
return &pb.ConfState{Nodes: rn.raft.nodes()}
return &pb.ConfState{Nodes: rn.raft.nodes(), Learners: rn.raft.learnerNodes()}
}

// Step advances the state machine using the given message.
Expand Down

0 comments on commit 935fe85

Please sign in to comment.