diff --git a/raft/multinode.go b/raft/multinode.go new file mode 100644 index 00000000000..ba7677f66ae --- /dev/null +++ b/raft/multinode.go @@ -0,0 +1,268 @@ +package raft + +import ( + "github.com/coreos/etcd/Godeps/_workspace/src/code.google.com/p/go.net/context" + pb "github.com/coreos/etcd/raft/raftpb" +) + +// MultiNode represents a node that is participating in multiple consensus groups. +type MultiNode interface { + CreateGroup(group uint64, peers []Peer) error + Tick() + Propose(ctx context.Context, group uint64, data []byte) error + ProposeConfChange(ctx context.Context, group uint64, cc pb.ConfChange) error + ApplyConfChange(group uint64, cc pb.ConfChange) + Step(ctx context.Context, group uint64, msg pb.Message) error + Ready() <-chan map[uint64]Ready + Stop() + // TODO: Add Compact. Is Campaign necessary? +} + +// StartMultiNode creates a MultiNode and starts its background goroutine. +func StartMultiNode(id uint64, election, heartbeat int) MultiNode { + mn := newMultiNode(id, election, heartbeat) + go mn.run() + return &mn +} + +// TODO(bdarnell): add group ID to the underlying protos? +type multiMessage struct { + group uint64 + msg pb.Message +} + +type multiConfChange struct { + group uint64 + msg pb.ConfChange +} + +type groupCreation struct { + id uint64 + peers []Peer + // TODO(bdarnell): do we really need the done channel here? It's + // unlike the rest of this package, but we need the group creation + // to be complete before any Propose or other calls. + done chan struct{} +} + +type multiNode struct { + id uint64 + election int + heartbeat int + groupc chan groupCreation + propc chan multiMessage + recvc chan multiMessage + confc chan multiConfChange + readyc chan map[uint64]Ready + tickc chan struct{} + done chan struct{} +} + +func newMultiNode(id uint64, election, heartbeat int) multiNode { + return multiNode{ + id: id, + election: election, + heartbeat: heartbeat, + groupc: make(chan groupCreation), + propc: make(chan multiMessage), + recvc: make(chan multiMessage), + confc: make(chan multiConfChange), + readyc: make(chan map[uint64]Ready), + tickc: make(chan struct{}), + done: make(chan struct{}), + } +} + +type groupState struct { + id uint64 + raft *raft + prevSoftSt *SoftState + prevHardSt pb.HardState + prevSnapi uint64 +} + +func (g *groupState) newReady() Ready { + return newReady(g.raft, g.prevSoftSt, g.prevHardSt, g.prevSnapi) +} + +func (g *groupState) commitReady(rd Ready) { + if rd.SoftState != nil { + g.prevSoftSt = rd.SoftState + } + if !IsEmptyHardState(rd.HardState) { + g.prevHardSt = rd.HardState + } + if !IsEmptySnap(rd.Snapshot) { + g.prevSnapi = rd.Snapshot.Index + } + g.raft.raftLog.resetNextEnts() + g.raft.raftLog.resetUnstable() + g.raft.msgs = nil +} + +func (mn *multiNode) run() { + groups := map[uint64]*groupState{} + rds := map[uint64]Ready{} + for { + readyc := mn.readyc + if len(rds) == 0 { + readyc = nil + } + + var group *groupState + select { + case gc := <-mn.groupc: + r := newRaft(mn.id, nil, mn.election, mn.heartbeat) + group = &groupState{ + id: gc.id, + raft: r, + prevSoftSt: r.softState(), + prevHardSt: r.HardState, + prevSnapi: r.raftLog.snapshot.Index, + } + groups[gc.id] = group + ents := make([]pb.Entry, len(gc.peers)) + for i, peer := range gc.peers { + cc := pb.ConfChange{Type: pb.ConfChangeAddNode, NodeID: peer.ID, Context: peer.Context} + data, err := cc.Marshal() + if err != nil { + panic("unexpected marshal error") + } + ents[i] = pb.Entry{Type: pb.EntryConfChange, Term: 1, Index: uint64(i + 1), Data: data} + } + r.raftLog.append(0, ents...) + r.raftLog.committed = uint64(len(ents)) + close(gc.done) + case mm := <-mn.propc: + // TODO(bdarnell): single-node impl doesn't read from propc unless the group + // has a leader; we can't do that since we have one propc for many groups. + // We'll have to buffer somewhere on a group-by-group basis. + mm.msg.From = mn.id + group = groups[mm.group] + group.raft.Step(mm.msg) + case mm := <-mn.recvc: + group = groups[mm.group] + group.raft.Step(mm.msg) + case mcc := <-mn.confc: + group = groups[mcc.group] + switch mcc.msg.Type { + case pb.ConfChangeAddNode: + group.raft.addNode(mcc.msg.NodeID) + case pb.ConfChangeRemoveNode: + group.raft.removeNode(mcc.msg.NodeID) + default: + panic("unexpected conf type") + } + case <-mn.tickc: + // TODO(bdarnell): instead of calling every group on every tick, + // we should have a priority queue of groups based on their next + // time-based event. + for _, g := range groups { + g.raft.tick() + rd := g.newReady() + if rd.containsUpdates() { + rds[g.id] = rd + } + } + case readyc <- rds: + for group, rd := range rds { + groups[group].commitReady(rd) + } + rds = map[uint64]Ready{} + case <-mn.done: + return + } + if group != nil { + rd := group.newReady() + if rd.containsUpdates() { + rds[group.id] = rd + } + } + } +} + +func (mn *multiNode) CreateGroup(id uint64, peers []Peer) error { + gc := groupCreation{ + id: id, + peers: peers, + done: make(chan struct{}), + } + mn.groupc <- gc + select { + case <-gc.done: + return nil + case <-mn.done: + return ErrStopped + } +} + +func (mn *multiNode) Stop() { + close(mn.done) +} + +func (mn *multiNode) Tick() { + select { + case mn.tickc <- struct{}{}: + case <-mn.done: + } +} + +func (mn *multiNode) Propose(ctx context.Context, group uint64, data []byte) error { + return mn.step(ctx, multiMessage{group, + pb.Message{ + Type: pb.MsgProp, + Entries: []pb.Entry{ + {Data: data}, + }, + }}) +} + +func (mn *multiNode) ProposeConfChange(ctx context.Context, group uint64, cc pb.ConfChange) error { + data, err := cc.Marshal() + if err != nil { + return err + } + return mn.Step(ctx, group, + pb.Message{ + Type: pb.MsgProp, + Entries: []pb.Entry{ + {Type: pb.EntryConfChange, Data: data}, + }, + }) +} + +func (mn *multiNode) step(ctx context.Context, m multiMessage) error { + ch := mn.recvc + if m.msg.Type == pb.MsgProp { + ch = mn.propc + } + + select { + case ch <- m: + return nil + case <-ctx.Done(): + return ctx.Err() + case <-mn.done: + return ErrStopped + } +} + +func (mn *multiNode) ApplyConfChange(group uint64, cc pb.ConfChange) { + select { + case mn.confc <- multiConfChange{group, cc}: + case <-mn.done: + } +} + +func (mn *multiNode) Step(ctx context.Context, group uint64, m pb.Message) error { + // ignore unexpected local messages receiving over network + if m.Type == pb.MsgHup || m.Type == pb.MsgBeat { + // TODO: return an error? + return nil + } + return mn.step(ctx, multiMessage{group, m}) +} + +func (mn *multiNode) Ready() <-chan map[uint64]Ready { + return mn.readyc +}