Skip to content

Commit

Permalink
raft: support to do multiple proposals in one message
Browse files Browse the repository at this point in the history
  • Loading branch information
yichengq committed Dec 11, 2014
1 parent 3961214 commit cb09579
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 16 deletions.
28 changes: 16 additions & 12 deletions raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -300,10 +300,13 @@ func (r *raft) reset(term uint64) {
r.pendingConf = false
}

func (r *raft) appendEntry(e pb.Entry) {
e.Term = r.Term
e.Index = r.raftLog.lastIndex() + 1
r.raftLog.append(e)
func (r *raft) appendEntry(es ...pb.Entry) {
li := r.raftLog.lastIndex()
for i := range es {
es[i].Term = r.Term
es[i].Index = li + 1 + uint64(i)
}
r.raftLog.append(es...)
r.prs[r.id].update(r.raftLog.lastIndex())
r.maybeCommit()
}
Expand Down Expand Up @@ -446,17 +449,18 @@ func stepLeader(r *raft, m pb.Message) {
case pb.MsgBeat:
r.bcastHeartbeat()
case pb.MsgProp:
if len(m.Entries) != 1 {
panic("unexpected length(entries) of a MsgProp")
if len(m.Entries) == 0 {
log.Panicf("raft: %x stepped empty MsgProp", r.id)
}
e := m.Entries[0]
if e.Type == pb.EntryConfChange {
if r.pendingConf {
return
for i, e := range m.Entries {
if e.Type == pb.EntryConfChange {
if r.pendingConf {
m.Entries[i] = pb.Entry{Type: pb.EntryNormal}
}
r.pendingConf = true
}
r.pendingConf = true
}
r.appendEntry(e)
r.appendEntry(m.Entries...)
r.bcastAppend()
case pb.MsgAppResp:
if m.Reject {
Expand Down
9 changes: 5 additions & 4 deletions raft_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1245,8 +1245,8 @@ func TestStepConfig(t *testing.T) {
}

// TestStepIgnoreConfig tests that if raft step the second msgProp in
// EntryConfChange type when the first one is uncommitted, the node will deny
// the proposal and keep its original state.
// EntryConfChange type when the first one is uncommitted, the node will set
// the proposal to noop and keep its original state.
func TestStepIgnoreConfig(t *testing.T) {
// a raft that cannot make progress
r := newRaft(1, []uint64{1, 2}, 10, 1, NewMemoryStorage())
Expand All @@ -1256,8 +1256,9 @@ func TestStepIgnoreConfig(t *testing.T) {
index := r.raftLog.lastIndex()
pendingConf := r.pendingConf
r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Type: pb.EntryConfChange}}})
if g := r.raftLog.lastIndex(); g != index {
t.Errorf("index = %d, want %d", g, index)
wents := []pb.Entry{{Type: pb.EntryNormal, Term: 1, Index: 3, Data: nil}}
if ents := r.raftLog.entries(index + 1); !reflect.DeepEqual(ents, wents) {
t.Errorf("ents = %+v, want %+v", ents, wents)
}
if r.pendingConf != pendingConf {
t.Errorf("pendingConf = %v, want %v", r.pendingConf, pendingConf)
Expand Down

0 comments on commit cb09579

Please sign in to comment.