Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Replace the GroupID typedef with plain uint64. #132

Merged
merged 1 commit into from
Oct 17, 2014
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion multiraft/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ package multiraft
// An EventLeaderElection is broadcast when a group completes an election.
// TODO(bdarnell): emit EventLeaderElection from follower nodes as well.
type EventLeaderElection struct {
GroupID GroupID
GroupID uint64
NodeID uint64
}

Expand Down
25 changes: 11 additions & 14 deletions multiraft/multiraft.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,6 @@ import (
"github.com/coreos/etcd/raft/raftpb"
)

// GroupID is a unique identifier for a consensus group within the cluster.
type GroupID int64

// Config contains the parameters necessary to construct a MultiRaft object.
type Config struct {
Storage Storage
Expand Down Expand Up @@ -171,7 +168,7 @@ func (m *MultiRaft) sendEvent(event interface{}) {

// CreateGroup creates a new consensus group and joins it. The application should
// arrange to call CreateGroup on all nodes named in initialMembers.
func (m *MultiRaft) CreateGroup(groupID GroupID, initialMembers []uint64) error {
func (m *MultiRaft) CreateGroup(groupID uint64, initialMembers []uint64) error {
for _, id := range initialMembers {
if id == 0 {
return util.Error("Invalid NodeID")
Expand All @@ -190,7 +187,7 @@ func (m *MultiRaft) CreateGroup(groupID GroupID, initialMembers []uint64) error
// when the command has been successfully sent, not when it has been committed.
// TODO(bdarnell): should SubmitCommand wait until the commit?
// TODO(bdarnell): what do we do if we lose leadership before a command we proposed commits?
func (m *MultiRaft) SubmitCommand(groupID GroupID, command []byte) error {
func (m *MultiRaft) SubmitCommand(groupID uint64, command []byte) error {
op := &submitCommandOp{groupID, command, make(chan error, 1)}
m.ops <- op
return <-op.ch
Expand All @@ -201,7 +198,7 @@ func (m *MultiRaft) SubmitCommand(groupID GroupID, command []byte) error {
// TODO(bdarnell): do we expose ChangeMembershipAdd{Member,Observer} to the application
// level or does MultiRaft take care of the non-member -> observer -> full member
// cycle?
func (m *MultiRaft) ChangeGroupMembership(groupID GroupID, changeOp ChangeMembershipOperation,
func (m *MultiRaft) ChangeGroupMembership(groupID uint64, changeOp ChangeMembershipOperation,
nodeID uint64) error {
op := &changeGroupMembershipOp{
groupID,
Expand All @@ -225,7 +222,7 @@ type pendingCall struct {
type group struct {
node raft.Node

groupID GroupID
groupID uint64

// a List of *pendingCall
pendingCalls list.List
Expand All @@ -235,7 +232,7 @@ type group struct {
softState raft.SoftState
}

func (m *MultiRaft) newGroup(groupID GroupID, members []uint64) *group {
func (m *MultiRaft) newGroup(groupID uint64, members []uint64) *group {
peers := make([]raft.Peer, len(members))
for i, member := range members {
peers[i].ID = member
Expand All @@ -256,13 +253,13 @@ type createGroupOp struct {
}

type submitCommandOp struct {
groupID GroupID
groupID uint64
command []byte
ch chan error
}

type changeGroupMembershipOp struct {
groupID GroupID
groupID uint64
payload ChangeMembershipPayload
ch chan error
}
Expand All @@ -280,8 +277,8 @@ type node struct {
type state struct {
*MultiRaft
rand *rand.Rand
groups map[GroupID]*group
dirtyGroups map[GroupID]*group
groups map[uint64]*group
dirtyGroups map[uint64]*group
nodes map[uint64]*node
electionTimer *time.Timer
responses chan *rpc.Call
Expand All @@ -292,8 +289,8 @@ func newState(m *MultiRaft) *state {
return &state{
MultiRaft: m,
rand: util.NewPseudoRand(),
groups: make(map[GroupID]*group),
dirtyGroups: make(map[GroupID]*group),
groups: make(map[uint64]*group),
dirtyGroups: make(map[uint64]*group),
nodes: make(map[uint64]*node),
responses: make(chan *rpc.Call, 100),
writeTask: newWriteTask(m.Storage),
Expand Down
10 changes: 5 additions & 5 deletions multiraft/multiraft_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func (c *testCluster) stop() {
}

// createGroup replicates a group among the first numReplicas nodes in the cluster
func (c *testCluster) createGroup(groupID GroupID, numReplicas int) {
func (c *testCluster) createGroup(groupID uint64, numReplicas int) {
var replicaIDs []uint64
var replicaNodes []*state
for i := 0; i < numReplicas; i++ {
Expand Down Expand Up @@ -105,7 +105,7 @@ func TestInitialLeaderElection(t *testing.T) {
// The node that requests an election first should win.
for leaderIndex := 0; leaderIndex < 3; leaderIndex++ {
cluster := newTestCluster(3, t)
groupID := GroupID(1)
groupID := uint64(1)
cluster.createGroup(groupID, 3)

event := cluster.waitForElection(leaderIndex)
Expand All @@ -123,7 +123,7 @@ func TestInitialLeaderElection(t *testing.T) {
func TestCommand(t *testing.T) {
cluster := newTestCluster(3, t)
defer cluster.stop()
groupID := GroupID(1)
groupID := uint64(1)
cluster.createGroup(groupID, 3)
cluster.waitForElection(0)

Expand All @@ -144,7 +144,7 @@ func TestCommand(t *testing.T) {
func disabledTestSlowStorage(t *testing.T) {
cluster := newTestCluster(3, t)
defer cluster.stop()
groupID := GroupID(1)
groupID := uint64(1)
cluster.createGroup(groupID, 3)

cluster.waitForElection(0)
Expand Down Expand Up @@ -189,7 +189,7 @@ func TestMembershipChange(t *testing.T) {
defer cluster.stop()

// Create a group with a single member, cluster.nodes[0].
groupID := GroupID(1)
groupID := uint64(1)
cluster.createGroup(groupID, 1)
cluster.waitForElection(0)

Expand Down
36 changes: 18 additions & 18 deletions multiraft/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ type LogEntry struct {
// GroupPersistentState is a unified view of the readable data (except for log entries)
// about a group; used by Storage.LoadGroups.
type GroupPersistentState struct {
GroupID GroupID
GroupID uint64
HardState raftpb.HardState
}

Expand All @@ -80,23 +80,23 @@ type Storage interface {
//LoadGroups() <-chan *GroupPersistentState

// SetGroupState is called to update the persistent state for the given group.
SetGroupState(groupID GroupID, state *GroupPersistentState) error
SetGroupState(groupID uint64, state *GroupPersistentState) error

// AppendLogEntries is called to add entries to the log. The entries will always span
// a contiguous range of indices just after the current end of the log.
AppendLogEntries(groupID GroupID, entries []*LogEntry) error
AppendLogEntries(groupID uint64, entries []*LogEntry) error

// TruncateLog is called to delete all log entries with index > lastIndex.
//TruncateLog(groupID GroupID, lastIndex int) error
//TruncateLog(groupID uint64, lastIndex int) error

// GetLogEntry is called to synchronously retrieve an entry from the log.
//GetLogEntry(groupID GroupID, index int) (*LogEntry, error)
//GetLogEntry(groupID uint64, index int) (*LogEntry, error)

// GetLogEntries is called to asynchronously retrieve entries from the log,
// from firstIndex to lastIndex inclusive. If there is an error the storage
// layer should send one LogEntryState with a non-nil error and then close the
// channel.
//GetLogEntries(groupID GroupID, firstIndex, lastIndex int, ch chan<- *LogEntryState)
//GetLogEntries(groupID uint64, firstIndex, lastIndex int, ch chan<- *LogEntryState)
}

type memoryGroup struct {
Expand All @@ -106,15 +106,15 @@ type memoryGroup struct {

// MemoryStorage is an in-memory implementation of Storage for testing.
type MemoryStorage struct {
groups map[GroupID]*memoryGroup
groups map[uint64]*memoryGroup
}

// Verifying implementation of Storage interface.
var _ Storage = (*MemoryStorage)(nil)

// NewMemoryStorage creates a MemoryStorage.
func NewMemoryStorage() *MemoryStorage {
return &MemoryStorage{make(map[GroupID]*memoryGroup)}
return &MemoryStorage{make(map[uint64]*memoryGroup)}
}

// LoadGroups implements the Storage interface.
Expand All @@ -126,31 +126,31 @@ func NewMemoryStorage() *MemoryStorage {
}*/

// SetGroupState implements the Storage interface.
func (m *MemoryStorage) SetGroupState(groupID GroupID,
func (m *MemoryStorage) SetGroupState(groupID uint64,
state *GroupPersistentState) error {
m.getGroup(groupID).state = *state
return nil
}

// AppendLogEntries implements the Storage interface.
func (m *MemoryStorage) AppendLogEntries(groupID GroupID, entries []*LogEntry) error {
func (m *MemoryStorage) AppendLogEntries(groupID uint64, entries []*LogEntry) error {
g := m.getGroup(groupID)
g.entries = append(g.entries, entries...)
return nil
}

// TruncateLog implements the Storage interface.
/*func (m *MemoryStorage) TruncateLog(groupID GroupID, lastIndex int) error {
/*func (m *MemoryStorage) TruncateLog(groupID uint64, lastIndex int) error {
panic("unimplemented")
}*/

// GetLogEntry implements the Storage interface.
/*func (m *MemoryStorage) GetLogEntry(groupID GroupID, index int) (*LogEntry, error) {
/*func (m *MemoryStorage) GetLogEntry(groupID uint64, index int) (*LogEntry, error) {
panic("unimplemented")
}*/

// GetLogEntries implements the Storage interface.
/*func (m *MemoryStorage) GetLogEntries(groupID GroupID, firstIndex, lastIndex int,
/*func (m *MemoryStorage) GetLogEntries(groupID uint64, firstIndex, lastIndex int,
ch chan<- *LogEntryState) {
g := m.getGroup(groupID)
for i := firstIndex; i <= lastIndex; i++ {
Expand All @@ -160,7 +160,7 @@ func (m *MemoryStorage) AppendLogEntries(groupID GroupID, entries []*LogEntry) e
}*/

// getGroup returns a mutable memoryGroup object, creating if necessary.
func (m *MemoryStorage) getGroup(groupID GroupID) *memoryGroup {
func (m *MemoryStorage) getGroup(groupID uint64) *memoryGroup {
g, ok := m.groups[groupID]
if !ok {
g = &memoryGroup{
Expand All @@ -180,12 +180,12 @@ type groupWriteRequest struct {

// writeRequest is a collection of groupWriteRequests.
type writeRequest struct {
groups map[GroupID]*groupWriteRequest
groups map[uint64]*groupWriteRequest
}

// newWriteRequest creates a writeRequest.
func newWriteRequest() *writeRequest {
return &writeRequest{make(map[GroupID]*groupWriteRequest)}
return &writeRequest{make(map[uint64]*groupWriteRequest)}
}

// groupWriteResponse represents the final state of a persistent group.
Expand All @@ -201,7 +201,7 @@ type groupWriteResponse struct {

// writeResponse is a collection of groupWriteResponses.
type writeResponse struct {
groups map[GroupID]*groupWriteResponse
groups map[uint64]*groupWriteResponse
}

// writeTask manages a goroutine that interacts with the storage system.
Expand Down Expand Up @@ -241,7 +241,7 @@ func (w *writeTask) start() {
case request = <-w.in:
}
log.V(6).Infof("writeTask got request %#v", *request)
response := &writeResponse{make(map[GroupID]*groupWriteResponse)}
response := &writeResponse{make(map[uint64]*groupWriteResponse)}

for groupID, groupReq := range request.groups {
groupResp := &groupWriteResponse{nil, -1, -1, groupReq.entries}
Expand Down
10 changes: 5 additions & 5 deletions multiraft/storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,28 +36,28 @@ func (b *BlockableStorage) Unblock() {
return b.storage.LoadGroups()
}*/

func (b *BlockableStorage) SetGroupState(groupID GroupID,
func (b *BlockableStorage) SetGroupState(groupID uint64,
state *GroupPersistentState) error {
b.wait()
return b.storage.SetGroupState(groupID, state)
}

func (b *BlockableStorage) AppendLogEntries(groupID GroupID, entries []*LogEntry) error {
func (b *BlockableStorage) AppendLogEntries(groupID uint64, entries []*LogEntry) error {
b.wait()
return b.storage.AppendLogEntries(groupID, entries)
}

/*func (b *BlockableStorage) TruncateLog(groupID GroupID, lastIndex int) error {
/*func (b *BlockableStorage) TruncateLog(groupID uint64, lastIndex int) error {
b.wait()
return b.storage.TruncateLog(groupID, lastIndex)
}

func (b *BlockableStorage) GetLogEntry(groupID GroupID, index int) (*LogEntry, error) {
func (b *BlockableStorage) GetLogEntry(groupID uint64, index int) (*LogEntry, error) {
b.wait()
return b.storage.GetLogEntry(groupID, index)
}

func (b *BlockableStorage) GetLogEntries(groupID GroupID, firstIndex, lastIndex int,
func (b *BlockableStorage) GetLogEntries(groupID uint64, firstIndex, lastIndex int,
ch chan<- *LogEntryState) {
b.wait()
b.storage.GetLogEntries(groupID, firstIndex, lastIndex, ch)
Expand Down
2 changes: 1 addition & 1 deletion multiraft/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ type Transport interface {

// SendMessageRequest wraps a raft message.
type SendMessageRequest struct {
GroupID GroupID
GroupID uint64
Message raftpb.Message
}

Expand Down