From ac03eb0adfc3303637e3bca865adb74c925a7bfa Mon Sep 17 00:00:00 2001 From: Ben Darnell Date: Fri, 17 Oct 2014 15:40:44 -0400 Subject: [PATCH] Replace the GroupID typedef with plain uint64. This mirrors the earlier removal of NodeID and prepares for moving to etcd/raft (which uses raw integer types instead of named types everywhere). --- multiraft/events.go | 2 +- multiraft/multiraft.go | 25 +++++++++++-------------- multiraft/multiraft_test.go | 10 +++++----- multiraft/storage.go | 36 ++++++++++++++++++------------------ multiraft/storage_test.go | 10 +++++----- multiraft/transport.go | 2 +- 6 files changed, 41 insertions(+), 44 deletions(-) diff --git a/multiraft/events.go b/multiraft/events.go index e281eddbd020..0ad15a482c2a 100644 --- a/multiraft/events.go +++ b/multiraft/events.go @@ -20,7 +20,7 @@ package multiraft // An EventLeaderElection is broadcast when a group completes an election. // TODO(bdarnell): emit EventLeaderElection from follower nodes as well. type EventLeaderElection struct { - GroupID GroupID + GroupID uint64 NodeID uint64 } diff --git a/multiraft/multiraft.go b/multiraft/multiraft.go index 3011ce874a6c..22709141499b 100644 --- a/multiraft/multiraft.go +++ b/multiraft/multiraft.go @@ -30,9 +30,6 @@ import ( "github.com/coreos/etcd/raft/raftpb" ) -// GroupID is a unique identifier for a consensus group within the cluster. -type GroupID int64 - // Config contains the parameters necessary to construct a MultiRaft object. type Config struct { Storage Storage @@ -171,7 +168,7 @@ func (m *MultiRaft) sendEvent(event interface{}) { // CreateGroup creates a new consensus group and joins it. The application should // arrange to call CreateGroup on all nodes named in initialMembers. -func (m *MultiRaft) CreateGroup(groupID GroupID, initialMembers []uint64) error { +func (m *MultiRaft) CreateGroup(groupID uint64, initialMembers []uint64) error { for _, id := range initialMembers { if id == 0 { return util.Error("Invalid NodeID") @@ -190,7 +187,7 @@ func (m *MultiRaft) CreateGroup(groupID GroupID, initialMembers []uint64) error // when the command has been successfully sent, not when it has been committed. // TODO(bdarnell): should SubmitCommand wait until the commit? // TODO(bdarnell): what do we do if we lose leadership before a command we proposed commits? -func (m *MultiRaft) SubmitCommand(groupID GroupID, command []byte) error { +func (m *MultiRaft) SubmitCommand(groupID uint64, command []byte) error { op := &submitCommandOp{groupID, command, make(chan error, 1)} m.ops <- op return <-op.ch @@ -201,7 +198,7 @@ func (m *MultiRaft) SubmitCommand(groupID GroupID, command []byte) error { // TODO(bdarnell): do we expose ChangeMembershipAdd{Member,Observer} to the application // level or does MultiRaft take care of the non-member -> observer -> full member // cycle? -func (m *MultiRaft) ChangeGroupMembership(groupID GroupID, changeOp ChangeMembershipOperation, +func (m *MultiRaft) ChangeGroupMembership(groupID uint64, changeOp ChangeMembershipOperation, nodeID uint64) error { op := &changeGroupMembershipOp{ groupID, @@ -225,7 +222,7 @@ type pendingCall struct { type group struct { node raft.Node - groupID GroupID + groupID uint64 // a List of *pendingCall pendingCalls list.List @@ -235,7 +232,7 @@ type group struct { softState raft.SoftState } -func (m *MultiRaft) newGroup(groupID GroupID, members []uint64) *group { +func (m *MultiRaft) newGroup(groupID uint64, members []uint64) *group { peers := make([]raft.Peer, len(members)) for i, member := range members { peers[i].ID = member @@ -256,13 +253,13 @@ type createGroupOp struct { } type submitCommandOp struct { - groupID GroupID + groupID uint64 command []byte ch chan error } type changeGroupMembershipOp struct { - groupID GroupID + groupID uint64 payload ChangeMembershipPayload ch chan error } @@ -280,8 +277,8 @@ type node struct { type state struct { *MultiRaft rand *rand.Rand - groups map[GroupID]*group - dirtyGroups map[GroupID]*group + groups map[uint64]*group + dirtyGroups map[uint64]*group nodes map[uint64]*node electionTimer *time.Timer responses chan *rpc.Call @@ -292,8 +289,8 @@ func newState(m *MultiRaft) *state { return &state{ MultiRaft: m, rand: util.NewPseudoRand(), - groups: make(map[GroupID]*group), - dirtyGroups: make(map[GroupID]*group), + groups: make(map[uint64]*group), + dirtyGroups: make(map[uint64]*group), nodes: make(map[uint64]*node), responses: make(chan *rpc.Call, 100), writeTask: newWriteTask(m.Storage), diff --git a/multiraft/multiraft_test.go b/multiraft/multiraft_test.go index c4b59820d99e..0fd57b416711 100644 --- a/multiraft/multiraft_test.go +++ b/multiraft/multiraft_test.go @@ -76,7 +76,7 @@ func (c *testCluster) stop() { } // createGroup replicates a group among the first numReplicas nodes in the cluster -func (c *testCluster) createGroup(groupID GroupID, numReplicas int) { +func (c *testCluster) createGroup(groupID uint64, numReplicas int) { var replicaIDs []uint64 var replicaNodes []*state for i := 0; i < numReplicas; i++ { @@ -105,7 +105,7 @@ func TestInitialLeaderElection(t *testing.T) { // The node that requests an election first should win. for leaderIndex := 0; leaderIndex < 3; leaderIndex++ { cluster := newTestCluster(3, t) - groupID := GroupID(1) + groupID := uint64(1) cluster.createGroup(groupID, 3) event := cluster.waitForElection(leaderIndex) @@ -123,7 +123,7 @@ func TestInitialLeaderElection(t *testing.T) { func TestCommand(t *testing.T) { cluster := newTestCluster(3, t) defer cluster.stop() - groupID := GroupID(1) + groupID := uint64(1) cluster.createGroup(groupID, 3) cluster.waitForElection(0) @@ -144,7 +144,7 @@ func TestCommand(t *testing.T) { func disabledTestSlowStorage(t *testing.T) { cluster := newTestCluster(3, t) defer cluster.stop() - groupID := GroupID(1) + groupID := uint64(1) cluster.createGroup(groupID, 3) cluster.waitForElection(0) @@ -189,7 +189,7 @@ func TestMembershipChange(t *testing.T) { defer cluster.stop() // Create a group with a single member, cluster.nodes[0]. - groupID := GroupID(1) + groupID := uint64(1) cluster.createGroup(groupID, 1) cluster.waitForElection(0) diff --git a/multiraft/storage.go b/multiraft/storage.go index dcfdf8c5d645..f3c588d75a5c 100644 --- a/multiraft/storage.go +++ b/multiraft/storage.go @@ -60,7 +60,7 @@ type LogEntry struct { // GroupPersistentState is a unified view of the readable data (except for log entries) // about a group; used by Storage.LoadGroups. type GroupPersistentState struct { - GroupID GroupID + GroupID uint64 HardState raftpb.HardState } @@ -80,23 +80,23 @@ type Storage interface { //LoadGroups() <-chan *GroupPersistentState // SetGroupState is called to update the persistent state for the given group. - SetGroupState(groupID GroupID, state *GroupPersistentState) error + SetGroupState(groupID uint64, state *GroupPersistentState) error // AppendLogEntries is called to add entries to the log. The entries will always span // a contiguous range of indices just after the current end of the log. - AppendLogEntries(groupID GroupID, entries []*LogEntry) error + AppendLogEntries(groupID uint64, entries []*LogEntry) error // TruncateLog is called to delete all log entries with index > lastIndex. - //TruncateLog(groupID GroupID, lastIndex int) error + //TruncateLog(groupID uint64, lastIndex int) error // GetLogEntry is called to synchronously retrieve an entry from the log. - //GetLogEntry(groupID GroupID, index int) (*LogEntry, error) + //GetLogEntry(groupID uint64, index int) (*LogEntry, error) // GetLogEntries is called to asynchronously retrieve entries from the log, // from firstIndex to lastIndex inclusive. If there is an error the storage // layer should send one LogEntryState with a non-nil error and then close the // channel. - //GetLogEntries(groupID GroupID, firstIndex, lastIndex int, ch chan<- *LogEntryState) + //GetLogEntries(groupID uint64, firstIndex, lastIndex int, ch chan<- *LogEntryState) } type memoryGroup struct { @@ -106,7 +106,7 @@ type memoryGroup struct { // MemoryStorage is an in-memory implementation of Storage for testing. type MemoryStorage struct { - groups map[GroupID]*memoryGroup + groups map[uint64]*memoryGroup } // Verifying implementation of Storage interface. @@ -114,7 +114,7 @@ var _ Storage = (*MemoryStorage)(nil) // NewMemoryStorage creates a MemoryStorage. func NewMemoryStorage() *MemoryStorage { - return &MemoryStorage{make(map[GroupID]*memoryGroup)} + return &MemoryStorage{make(map[uint64]*memoryGroup)} } // LoadGroups implements the Storage interface. @@ -126,31 +126,31 @@ func NewMemoryStorage() *MemoryStorage { }*/ // SetGroupState implements the Storage interface. -func (m *MemoryStorage) SetGroupState(groupID GroupID, +func (m *MemoryStorage) SetGroupState(groupID uint64, state *GroupPersistentState) error { m.getGroup(groupID).state = *state return nil } // AppendLogEntries implements the Storage interface. -func (m *MemoryStorage) AppendLogEntries(groupID GroupID, entries []*LogEntry) error { +func (m *MemoryStorage) AppendLogEntries(groupID uint64, entries []*LogEntry) error { g := m.getGroup(groupID) g.entries = append(g.entries, entries...) return nil } // TruncateLog implements the Storage interface. -/*func (m *MemoryStorage) TruncateLog(groupID GroupID, lastIndex int) error { +/*func (m *MemoryStorage) TruncateLog(groupID uint64, lastIndex int) error { panic("unimplemented") }*/ // GetLogEntry implements the Storage interface. -/*func (m *MemoryStorage) GetLogEntry(groupID GroupID, index int) (*LogEntry, error) { +/*func (m *MemoryStorage) GetLogEntry(groupID uint64, index int) (*LogEntry, error) { panic("unimplemented") }*/ // GetLogEntries implements the Storage interface. -/*func (m *MemoryStorage) GetLogEntries(groupID GroupID, firstIndex, lastIndex int, +/*func (m *MemoryStorage) GetLogEntries(groupID uint64, firstIndex, lastIndex int, ch chan<- *LogEntryState) { g := m.getGroup(groupID) for i := firstIndex; i <= lastIndex; i++ { @@ -160,7 +160,7 @@ func (m *MemoryStorage) AppendLogEntries(groupID GroupID, entries []*LogEntry) e }*/ // getGroup returns a mutable memoryGroup object, creating if necessary. -func (m *MemoryStorage) getGroup(groupID GroupID) *memoryGroup { +func (m *MemoryStorage) getGroup(groupID uint64) *memoryGroup { g, ok := m.groups[groupID] if !ok { g = &memoryGroup{ @@ -180,12 +180,12 @@ type groupWriteRequest struct { // writeRequest is a collection of groupWriteRequests. type writeRequest struct { - groups map[GroupID]*groupWriteRequest + groups map[uint64]*groupWriteRequest } // newWriteRequest creates a writeRequest. func newWriteRequest() *writeRequest { - return &writeRequest{make(map[GroupID]*groupWriteRequest)} + return &writeRequest{make(map[uint64]*groupWriteRequest)} } // groupWriteResponse represents the final state of a persistent group. @@ -201,7 +201,7 @@ type groupWriteResponse struct { // writeResponse is a collection of groupWriteResponses. type writeResponse struct { - groups map[GroupID]*groupWriteResponse + groups map[uint64]*groupWriteResponse } // writeTask manages a goroutine that interacts with the storage system. @@ -241,7 +241,7 @@ func (w *writeTask) start() { case request = <-w.in: } log.V(6).Infof("writeTask got request %#v", *request) - response := &writeResponse{make(map[GroupID]*groupWriteResponse)} + response := &writeResponse{make(map[uint64]*groupWriteResponse)} for groupID, groupReq := range request.groups { groupResp := &groupWriteResponse{nil, -1, -1, groupReq.entries} diff --git a/multiraft/storage_test.go b/multiraft/storage_test.go index df105f1dcd27..cf49f272f495 100644 --- a/multiraft/storage_test.go +++ b/multiraft/storage_test.go @@ -36,28 +36,28 @@ func (b *BlockableStorage) Unblock() { return b.storage.LoadGroups() }*/ -func (b *BlockableStorage) SetGroupState(groupID GroupID, +func (b *BlockableStorage) SetGroupState(groupID uint64, state *GroupPersistentState) error { b.wait() return b.storage.SetGroupState(groupID, state) } -func (b *BlockableStorage) AppendLogEntries(groupID GroupID, entries []*LogEntry) error { +func (b *BlockableStorage) AppendLogEntries(groupID uint64, entries []*LogEntry) error { b.wait() return b.storage.AppendLogEntries(groupID, entries) } -/*func (b *BlockableStorage) TruncateLog(groupID GroupID, lastIndex int) error { +/*func (b *BlockableStorage) TruncateLog(groupID uint64, lastIndex int) error { b.wait() return b.storage.TruncateLog(groupID, lastIndex) } -func (b *BlockableStorage) GetLogEntry(groupID GroupID, index int) (*LogEntry, error) { +func (b *BlockableStorage) GetLogEntry(groupID uint64, index int) (*LogEntry, error) { b.wait() return b.storage.GetLogEntry(groupID, index) } -func (b *BlockableStorage) GetLogEntries(groupID GroupID, firstIndex, lastIndex int, +func (b *BlockableStorage) GetLogEntries(groupID uint64, firstIndex, lastIndex int, ch chan<- *LogEntryState) { b.wait() b.storage.GetLogEntries(groupID, firstIndex, lastIndex, ch) diff --git a/multiraft/transport.go b/multiraft/transport.go index 1b8fc1cc59ec..94704228e251 100644 --- a/multiraft/transport.go +++ b/multiraft/transport.go @@ -43,7 +43,7 @@ type Transport interface { // SendMessageRequest wraps a raft message. type SendMessageRequest struct { - GroupID GroupID + GroupID uint64 Message raftpb.Message }