Skip to content

Commit

Permalink
Merge pull request #190 from bdarnell/multiraft-storage
Browse files Browse the repository at this point in the history
Update raft submodule and start using the new raft.Storage interface.
  • Loading branch information
bdarnell committed Dec 2, 2014
2 parents f8ea3b6 + 67c4f4e commit 6396675
Show file tree
Hide file tree
Showing 5 changed files with 82 additions and 107 deletions.
2 changes: 1 addition & 1 deletion _vendor/src/github.com/coreos/etcd
Submodule etcd updated 129 files
15 changes: 5 additions & 10 deletions multiraft/multiraft.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import (

"github.com/cockroachdb/cockroach/util"
"github.com/cockroachdb/cockroach/util/log"
"github.com/coreos/etcd/Godeps/_workspace/src/code.google.com/p/go.net/context"
"github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
"github.com/coreos/etcd/raft"
"github.com/coreos/etcd/raft/raftpb"
)
Expand Down Expand Up @@ -376,6 +376,7 @@ func (s *state) start() {

case readyGroups = <-raftReady:
s.handleRaftReady(readyGroups)
s.multiNode.Advance()
}
}
}
Expand Down Expand Up @@ -413,7 +414,7 @@ func (s *state) createGroup(op *createGroupOp) {
}
s.nodes[member] = &node{member, 1, &asyncClient{member, conn, s.responses}}
}
s.multiNode.CreateGroup(op.groupID, peers)
s.multiNode.CreateGroup(op.groupID, peers, s.Storage.GroupStorage(op.groupID))
s.groups[op.groupID] = &group{
groupID: op.groupID,
}
Expand Down Expand Up @@ -462,16 +463,10 @@ func (s *state) handleWriteReady(readyGroups map[uint64]raft.Ready) {
for groupID, ready := range readyGroups {
gwr := &groupWriteRequest{}
if !raft.IsEmptyHardState(ready.HardState) {
gwr.state = &GroupPersistentState{
GroupID: groupID,
HardState: ready.HardState,
}
gwr.state = ready.HardState
}
if len(ready.Entries) > 0 {
gwr.entries = make([]*LogEntry, len(ready.Entries))
for i, ent := range ready.Entries {
gwr.entries[i] = &LogEntry{ent}
}
gwr.entries = ready.Entries
}
writeRequest.groups[groupID] = gwr
}
Expand Down
4 changes: 4 additions & 0 deletions multiraft/multiraft_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,10 @@ func TestMembershipChange(t *testing.T) {

// Add each of the other three nodes to the cluster.
for i := 1; i < 4; i++ {
// TODO(bdarnell): there's a race somewhere (which is not found by the race detector).
// Remove this hacky sleep once it's been fixed.
time.Sleep(time.Millisecond)

err := cluster.nodes[0].ChangeGroupMembership(groupID, ChangeMembershipAddMember,
cluster.nodes[i].nodeID)
if err != nil {
Expand Down
105 changes: 28 additions & 77 deletions multiraft/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package multiraft

import (
"github.com/cockroachdb/cockroach/util/log"
"github.com/coreos/etcd/raft"
"github.com/coreos/etcd/raft/raftpb"
)

Expand Down Expand Up @@ -72,49 +73,37 @@ type LogEntryState struct {
Error error
}

// WriteableGroupStorage represents a single group within a Storage.
// It is implemented by *raft.MemoryStorage.
type WriteableGroupStorage interface {
raft.Storage
Append(entries []raftpb.Entry)
SetHardState(st raftpb.HardState) error
}

var _ WriteableGroupStorage = (*raft.MemoryStorage)(nil)

// The Storage interface is supplied by the application to manage persistent storage
// of raft data.
type Storage interface {
// LoadGroups is called at startup to load all previously-existing groups.
// The returned channel should be closed once all groups have been loaded.
//LoadGroups() <-chan *GroupPersistentState

// SetGroupState is called to update the persistent state for the given group.
SetGroupState(groupID uint64, state *GroupPersistentState) error

// AppendLogEntries is called to add entries to the log. The entries will always span
// a contiguous range of indices just after the current end of the log.
AppendLogEntries(groupID uint64, entries []*LogEntry) error

// TruncateLog is called to delete all log entries with index > lastIndex.
//TruncateLog(groupID uint64, lastIndex int) error

// GetLogEntry is called to synchronously retrieve an entry from the log.
//GetLogEntry(groupID uint64, index int) (*LogEntry, error)

// GetLogEntries is called to asynchronously retrieve entries from the log,
// from firstIndex to lastIndex inclusive. If there is an error the storage
// layer should send one LogEntryState with a non-nil error and then close the
// channel.
//GetLogEntries(groupID uint64, firstIndex, lastIndex int, ch chan<- *LogEntryState)
}

type memoryGroup struct {
state GroupPersistentState
entries []*LogEntry
GroupStorage(groupID uint64) WriteableGroupStorage
}

// MemoryStorage is an in-memory implementation of Storage for testing.
type MemoryStorage struct {
groups map[uint64]*memoryGroup
groups map[uint64]WriteableGroupStorage
}

// Verifying implementation of Storage interface.
var _ Storage = (*MemoryStorage)(nil)

// NewMemoryStorage creates a MemoryStorage.
func NewMemoryStorage() *MemoryStorage {
return &MemoryStorage{make(map[uint64]*memoryGroup)}
return &MemoryStorage{make(map[uint64]WriteableGroupStorage)}
}

// LoadGroups implements the Storage interface.
Expand All @@ -125,57 +114,21 @@ func NewMemoryStorage() *MemoryStorage {
return ch
}*/

// SetGroupState implements the Storage interface.
func (m *MemoryStorage) SetGroupState(groupID uint64,
state *GroupPersistentState) error {
m.getGroup(groupID).state = *state
return nil
}

// AppendLogEntries implements the Storage interface.
func (m *MemoryStorage) AppendLogEntries(groupID uint64, entries []*LogEntry) error {
g := m.getGroup(groupID)
g.entries = append(g.entries, entries...)
return nil
}

// TruncateLog implements the Storage interface.
/*func (m *MemoryStorage) TruncateLog(groupID uint64, lastIndex int) error {
panic("unimplemented")
}*/

// GetLogEntry implements the Storage interface.
/*func (m *MemoryStorage) GetLogEntry(groupID uint64, index int) (*LogEntry, error) {
panic("unimplemented")
}*/

// GetLogEntries implements the Storage interface.
/*func (m *MemoryStorage) GetLogEntries(groupID uint64, firstIndex, lastIndex int,
ch chan<- *LogEntryState) {
g := m.getGroup(groupID)
for i := firstIndex; i <= lastIndex; i++ {
ch <- &LogEntryState{i, *g.entries[i], nil}
}
close(ch)
}*/

// getGroup returns a mutable memoryGroup object, creating if necessary.
func (m *MemoryStorage) getGroup(groupID uint64) *memoryGroup {
// GroupStorage implements the Storage interface.
func (m *MemoryStorage) GroupStorage(groupID uint64) WriteableGroupStorage {
g, ok := m.groups[groupID]
if !ok {
g = &memoryGroup{
// Start with a dummy entry because the raft paper uses 1-based indexing.
entries: []*LogEntry{nil},
}
g = raft.NewMemoryStorage()
m.groups[groupID] = g
}
return g
}

// groupWriteRequest represents a set of changes to make to a group.
type groupWriteRequest struct {
state *GroupPersistentState
entries []*LogEntry
state raftpb.HardState
entries []raftpb.Entry
snapshot raftpb.Snapshot
}

// writeRequest is a collection of groupWriteRequests.
Expand All @@ -193,10 +146,10 @@ func newWriteRequest() *writeRequest {
// state was not changed (which may be because there were no changes in the request
// or due to an error)
type groupWriteResponse struct {
state *GroupPersistentState
state raftpb.HardState
lastIndex int
lastTerm int
entries []*LogEntry
entries []raftpb.Entry
}

// writeResponse is a collection of groupWriteResponses.
Expand Down Expand Up @@ -244,20 +197,18 @@ func (w *writeTask) start() {
response := &writeResponse{make(map[uint64]*groupWriteResponse)}

for groupID, groupReq := range request.groups {
groupResp := &groupWriteResponse{nil, -1, -1, groupReq.entries}
group := w.storage.GroupStorage(groupID)
groupResp := &groupWriteResponse{raftpb.HardState{}, -1, -1, groupReq.entries}
response.groups[groupID] = groupResp
if groupReq.state != nil {
err := w.storage.SetGroupState(groupID, groupReq.state)
if !raft.IsEmptyHardState(groupReq.state) {
err := group.SetHardState(groupReq.state)
if err != nil {
continue
panic(err) // TODO(bdarnell): mark this node dead on storage errors
}
groupResp.state = groupReq.state
}
if len(groupReq.entries) > 0 {
err := w.storage.AppendLogEntries(groupID, groupReq.entries)
if err != nil {
continue
}
group.Append(groupReq.entries)
}
}
w.out <- response
Expand Down
63 changes: 44 additions & 19 deletions multiraft/storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,11 @@

package multiraft

import "sync"
import (
"sync"

"github.com/coreos/etcd/raft/raftpb"
)

// BlockableStorage is an implementation of Storage that can be blocked for testing
// to simulate slow storage devices.
Expand Down Expand Up @@ -36,30 +40,51 @@ func (b *BlockableStorage) Unblock() {
return b.storage.LoadGroups()
}*/

func (b *BlockableStorage) SetGroupState(groupID uint64,
state *GroupPersistentState) error {
b.wait()
return b.storage.SetGroupState(groupID, state)
func (b *BlockableStorage) GroupStorage(g uint64) WriteableGroupStorage {
return &blockableGroupStorage{b, b.storage.GroupStorage(g)}
}

func (b *BlockableStorage) AppendLogEntries(groupID uint64, entries []*LogEntry) error {
b.wait()
return b.storage.AppendLogEntries(groupID, entries)
type blockableGroupStorage struct {
b *BlockableStorage
s WriteableGroupStorage
}

/*func (b *BlockableStorage) TruncateLog(groupID uint64, lastIndex int) error {
b.wait()
return b.storage.TruncateLog(groupID, lastIndex)
func (b *blockableGroupStorage) Append(entries []raftpb.Entry) {
b.b.wait()
b.s.Append(entries)
}

func (b *BlockableStorage) GetLogEntry(groupID uint64, index int) (*LogEntry, error) {
b.wait()
return b.storage.GetLogEntry(groupID, index)
func (b *blockableGroupStorage) SetHardState(st raftpb.HardState) error {
b.b.wait()
return b.s.SetHardState(st)
}

func (b *BlockableStorage) GetLogEntries(groupID uint64, firstIndex, lastIndex int,
ch chan<- *LogEntryState) {
b.wait()
b.storage.GetLogEntries(groupID, firstIndex, lastIndex, ch)
func (b *blockableGroupStorage) InitialState() (raftpb.HardState, raftpb.ConfState, error) {
b.b.wait()
return b.s.InitialState()
}

func (b *blockableGroupStorage) Entries(lo, hi uint64) ([]raftpb.Entry, error) {
b.b.wait()
return b.s.Entries(lo, hi)
}

func (b *blockableGroupStorage) Term(i uint64) (uint64, error) {
b.b.wait()
return b.s.Term(i)
}

func (b *blockableGroupStorage) LastIndex() (uint64, error) {
b.b.wait()
return b.s.LastIndex()
}

func (b *blockableGroupStorage) FirstIndex() (uint64, error) {
b.b.wait()
return b.s.FirstIndex()
}

func (b *blockableGroupStorage) Snapshot() (raftpb.Snapshot, error) {
b.b.wait()
return b.s.Snapshot()
}
*/

0 comments on commit 6396675

Please sign in to comment.