Skip to content

Commit

Permalink
Merge pull request #211 from bdarnell/raft-cmd-id
Browse files Browse the repository at this point in the history
Remove CmdID from InternalRaftCommand and pass it to raft separately.
  • Loading branch information
bdarnell committed Dec 10, 2014
2 parents 049a292 + f935f92 commit a610c9a
Show file tree
Hide file tree
Showing 10 changed files with 87 additions and 46 deletions.
31 changes: 30 additions & 1 deletion multiraft/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package multiraft

import "github.com/cockroachdb/cockroach/util/log"

// An EventLeaderElection is broadcast when a group completes an election.
type EventLeaderElection struct {
GroupID uint64
Expand All @@ -25,5 +27,32 @@ type EventLeaderElection struct {

// An EventCommandCommitted is broadcast whenever a command has been committed.
type EventCommandCommitted struct {
Command []byte
CommandID []byte
Command []byte
}

// Commands are encoded with a 1-byte version (currently 0), a 16-byte ID,
// followed by the payload. This inflexible encoding is used so we can efficiently
// parse the command id while processing the logs.
const (
commandIDLen = 16
commandEncodingVersion = 0
)

func encodeCommand(commandID, command []byte) []byte {
if len(commandID) != commandIDLen {
log.Fatalf("invalid command ID length; %d != %d", len(commandID), commandIDLen)
}
x := make([]byte, 1, 1+commandIDLen+len(command))
x[0] = commandEncodingVersion
x = append(x, commandID...)
x = append(x, command...)
return x
}

func decodeCommand(data []byte) (commandID, command []byte) {
if data[0] != commandEncodingVersion {
log.Fatalf("unknown command encoding version %v", data[0])
}
return data[1 : 1+commandIDLen], data[1+commandIDLen:]
}
19 changes: 11 additions & 8 deletions multiraft/multiraft.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,20 +176,21 @@ func (m *MultiRaft) CreateGroup(groupID uint64, initialMembers []uint64) error {
// when the command has been successfully sent, not when it has been committed.
// TODO(bdarnell): should SubmitCommand wait until the commit?
// TODO(bdarnell): what do we do if we lose leadership before a command we proposed commits?
func (m *MultiRaft) SubmitCommand(groupID uint64, command []byte) error {
func (m *MultiRaft) SubmitCommand(groupID uint64, commandID []byte, command []byte) error {
log.V(6).Infof("node %v submitting command to group %v", m.nodeID, groupID)
return m.multiNode.Propose(context.Background(), groupID, command)
return m.multiNode.Propose(context.Background(), groupID, encodeCommand(commandID, command))
}

// ChangeGroupMembership submits a proposed membership change to the cluster.
// TODO(bdarnell): same concerns as SubmitCommand
func (m *MultiRaft) ChangeGroupMembership(groupID uint64, changeType raftpb.ConfChangeType,
nodeID uint64) error {
func (m *MultiRaft) ChangeGroupMembership(groupID uint64, commandID []byte,
changeType raftpb.ConfChangeType, nodeID uint64) error {
log.V(6).Infof("node %v proposing membership change to group %v", m.nodeID, groupID)
return m.multiNode.ProposeConfChange(context.Background(), groupID,
raftpb.ConfChange{
Type: changeType,
NodeID: nodeID,
Type: changeType,
NodeID: nodeID,
Context: encodeCommand(commandID, nil),
})
}

Expand Down Expand Up @@ -403,9 +404,10 @@ func (s *state) handleWriteResponse(response *writeResponse, readyGroups map[uin
for _, entry := range ready.CommittedEntries {
switch entry.Type {
case raftpb.EntryNormal:
// TODO(bdarnell): etcd raft adds a nil entry upon election; should this be given a different Type?
// etcd raft occasionally adds a nil entry (e.g. upon election); ignore these.
if entry.Data != nil {
s.sendEvent(&EventCommandCommitted{entry.Data})
commandID, command := decodeCommand(entry.Data)
s.sendEvent(&EventCommandCommitted{commandID, command})
}
case raftpb.EntryConfChange:
cc := raftpb.ConfChange{}
Expand All @@ -414,6 +416,7 @@ func (s *state) handleWriteResponse(response *writeResponse, readyGroups map[uin
log.Fatalf("invalid ConfChange data: %s", err)
}
log.V(3).Infof("node %v applying configuration change %v", s.nodeID, cc)
// TODO(bdarnell): dedupe by extracting commandID from cc.Context.
s.multiNode.ApplyConfChange(groupID, cc)
}
}
Expand Down
14 changes: 11 additions & 3 deletions multiraft/multiraft_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,17 @@ import (
"testing"
"time"

"github.com/cockroachdb/cockroach/util"
"github.com/cockroachdb/cockroach/util/log"
"github.com/coreos/etcd/raft/raftpb"
)

var rand = util.NewPseudoRand()

func makeCommandID() []byte {
return []byte(util.RandString(rand, commandIDLen))
}

type testCluster struct {
t *testing.T
nodes []*state
Expand Down Expand Up @@ -130,7 +137,7 @@ func TestCommand(t *testing.T) {
cluster.waitForElection(0)

// Submit a command to the leader
cluster.nodes[0].SubmitCommand(groupID, []byte("command"))
cluster.nodes[0].SubmitCommand(groupID, makeCommandID(), []byte("command"))

// The command will be committed on each node.
for i, events := range cluster.events {
Expand All @@ -156,7 +163,7 @@ func TestSlowStorage(t *testing.T) {
cluster.storages[2].Block()

// Submit a command to the leader
cluster.nodes[0].SubmitCommand(groupID, []byte("command"))
cluster.nodes[0].SubmitCommand(groupID, makeCommandID(), []byte("command"))

// Even with the third node blocked, the other nodes can make progress.
for i := 0; i < 2; i++ {
Expand Down Expand Up @@ -196,7 +203,8 @@ func TestMembershipChange(t *testing.T) {

// Add each of the other three nodes to the cluster.
for i := 1; i < 4; i++ {
err := cluster.nodes[0].ChangeGroupMembership(groupID, raftpb.ConfChangeAddNode,
err := cluster.nodes[0].ChangeGroupMembership(groupID, makeCommandID(),
raftpb.ConfChangeAddNode,
cluster.nodes[i].nodeID)
if err != nil {
t.Fatal(err)
Expand Down
4 changes: 0 additions & 4 deletions proto/internal.proto
Original file line number Diff line number Diff line change
Expand Up @@ -197,10 +197,6 @@ message InternalRaftCommandUnion {
// An InternalRaftCommand is a command which can be serialized and
// sent via raft.
message InternalRaftCommand {
// CmdID is used to identify a command when it has been
// committed. If the client specified a CmdID in its RequestHeader,
// that is used; otherwise the server generates an ID.
optional ClientCmdID cmd_id = 1 [(gogoproto.nullable) = false, (gogoproto.customname) = "CmdID"];
optional int64 raft_id = 2 [(gogoproto.nullable) = false, (gogoproto.customname) = "RaftID"];
optional InternalRaftCommandUnion cmd = 3 [(gogoproto.nullable) = false];
}
Expand Down
21 changes: 13 additions & 8 deletions storage/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,17 +27,22 @@ import (
"github.com/cockroachdb/cockroach/proto"
)

type committedCommand struct {
cmdIDKey cmdIDKey
cmd proto.InternalRaftCommand
}

// raft is the interface exposed by a raft implementation.
type raft interface {
// propose a command to raft. If accepted by the consensus protocol it will
// eventually appear in the committed channel, but this is not guaranteed
// so callers may need to retry.
propose(proto.InternalRaftCommand)
propose(cmdIDKey, proto.InternalRaftCommand)

// committed returns a channel that yields commands as they are
// committed. Note that this includes commands proposed by this node
// and others.
committed() <-chan proto.InternalRaftCommand
committed() <-chan committedCommand

stop()
}
Expand All @@ -46,7 +51,7 @@ type singleNodeRaft struct {
mr *multiraft.MultiRaft
mu sync.Mutex
groups map[int64]struct{}
commitCh chan proto.InternalRaftCommand
commitCh chan committedCommand
stopper chan struct{}
}

Expand All @@ -64,7 +69,7 @@ func newSingleNodeRaft() *singleNodeRaft {
snr := &singleNodeRaft{
mr: mr,
groups: map[int64]struct{}{},
commitCh: make(chan proto.InternalRaftCommand, 10),
commitCh: make(chan committedCommand, 10),
stopper: make(chan struct{}),
}
mr.Start()
Expand All @@ -74,7 +79,7 @@ func newSingleNodeRaft() *singleNodeRaft {

var _ raft = (*singleNodeRaft)(nil)

func (snr *singleNodeRaft) propose(cmd proto.InternalRaftCommand) {
func (snr *singleNodeRaft) propose(cmdIDKey cmdIDKey, cmd proto.InternalRaftCommand) {
snr.mu.Lock()
defer snr.mu.Unlock()
if _, ok := snr.groups[cmd.RaftID]; !ok {
Expand All @@ -89,10 +94,10 @@ func (snr *singleNodeRaft) propose(cmd proto.InternalRaftCommand) {
if err != nil {
log.Fatal(err)
}
snr.mr.SubmitCommand(uint64(cmd.RaftID), data)
snr.mr.SubmitCommand(uint64(cmd.RaftID), []byte(cmdIDKey), data)
}

func (snr *singleNodeRaft) committed() <-chan proto.InternalRaftCommand {
func (snr *singleNodeRaft) committed() <-chan committedCommand {
return snr.commitCh
}

Expand All @@ -111,7 +116,7 @@ func (snr *singleNodeRaft) run() {
if err != nil {
log.Fatal(err)
}
snr.commitCh <- cmd
snr.commitCh <- committedCommand{cmdIDKey(e.CommandID), cmd}
}
case <-snr.stopper:
snr.mr.Stop()
Expand Down
15 changes: 8 additions & 7 deletions storage/range.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ type RangeManager interface {
AddRange(rng *Range) error
RemoveRange(rng *Range) error
CreateSnapshot() (string, error)
ProposeRaftCommand(proto.InternalRaftCommand)
ProposeRaftCommand(cmdIDKey, proto.InternalRaftCommand)
}

// A Range is a contiguous keyspace with writes managed via an
Expand Down Expand Up @@ -427,21 +427,23 @@ func (r *Range) addReadWriteCmd(method string, args proto.Request, reply proto.R
raftCmd := proto.InternalRaftCommand{
RaftID: r.Desc.RaftID,
}
var cmdID proto.ClientCmdID
if !args.Header().CmdID.IsEmpty() {
raftCmd.CmdID = args.Header().CmdID
cmdID = args.Header().CmdID
} else {
raftCmd.CmdID = proto.ClientCmdID{
cmdID = proto.ClientCmdID{
WallTime: r.rm.Clock().PhysicalNow(),
Random: rand.Int63(),
}
}
idKey := makeCmdIDKey(cmdID)
r.Lock()
r.pendingCmds[makeCmdIDKey(raftCmd.CmdID)] = pendingCmd
r.pendingCmds[idKey] = pendingCmd
r.Unlock()
// TODO(bdarnell): In certain raft failover scenarios, proposed
// commands may be abandoned. We need to re-propose the command
// if too much time passes with no response on the done channel.
r.rm.ProposeRaftCommand(raftCmd)
r.rm.ProposeRaftCommand(idKey, raftCmd)

// Create a completion func for mandatory cleanups which we either
// run synchronously if we're waiting or in a goroutine otherwise.
Expand Down Expand Up @@ -474,8 +476,7 @@ func (r *Range) addReadWriteCmd(method string, args proto.Request, reply proto.R
return nil
}

func (r *Range) processRaftCommand(raftCmd proto.InternalRaftCommand) {
idKey := makeCmdIDKey(raftCmd.CmdID)
func (r *Range) processRaftCommand(idKey cmdIDKey, raftCmd proto.InternalRaftCommand) {
r.Lock()
cmd := r.pendingCmds[idKey]
delete(r.pendingCmds, idKey)
Expand Down
4 changes: 2 additions & 2 deletions storage/range_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1548,12 +1548,12 @@ func TestRemoteRaftCommand(t *testing.T) {
// Send an increment direct to raft.
remoteIncArgs, _ := incrementArgs([]byte("a"), 2, 1, s.StoreID())
remoteIncArgs.Timestamp = proto.MinTimestamp
idKey := makeCmdIDKey(proto.ClientCmdID{WallTime: 1, Random: 1})
raftCmd := proto.InternalRaftCommand{
CmdID: proto.ClientCmdID{WallTime: 1, Random: 1},
RaftID: r.Desc.RaftID,
}
raftCmd.Cmd.SetValue(remoteIncArgs)
r.rm.ProposeRaftCommand(raftCmd)
r.rm.ProposeRaftCommand(idKey, raftCmd)

// Send an increment through the normal flow, since this is our
// simplest way of waiting until this command (and all earlier ones)
Expand Down
12 changes: 5 additions & 7 deletions storage/response_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,13 @@ import (
"github.com/cockroachdb/cockroach/util/log"
)

type cmdIDKey struct {
walltime, random int64
}
type cmdIDKey string

func makeCmdIDKey(cmdID proto.ClientCmdID) cmdIDKey {
return cmdIDKey{
walltime: cmdID.WallTime,
random: cmdID.Random,
}
buf := make([]byte, 0, 16)
buf = encoding.EncodeUint64(buf, uint64(cmdID.WallTime))
buf = encoding.EncodeUint64(buf, uint64(cmdID.Random))
return cmdIDKey(string(buf))
}

// A ResponseCache provides idempotence for request retries. Each
Expand Down
11 changes: 6 additions & 5 deletions storage/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -790,13 +790,13 @@ func (s *Store) maybeResolveWriteIntentError(rng *Range, method string, args pro
}

// ProposeRaftCommand submits a command to raft.
func (s *Store) ProposeRaftCommand(cmd proto.InternalRaftCommand) {
func (s *Store) ProposeRaftCommand(idKey cmdIDKey, cmd proto.InternalRaftCommand) {
// s.raft should be constant throughout the life of the store, but
// the race detector reports a race between this method and s.Stop.
s.mu.RLock()
defer s.mu.RUnlock()

s.raft.propose(cmd)
s.raft.propose(idKey, cmd)
}

// processRaft processes read/write commands that have been committed
Expand Down Expand Up @@ -826,12 +826,13 @@ func (s *Store) processRaft(r raft, closer chan struct{}) {
select {
case raftCmd := <-r.committed():
s.mu.Lock()
r, ok := s.ranges[raftCmd.RaftID]
r, ok := s.ranges[raftCmd.cmd.RaftID]
s.mu.Unlock()
if !ok {
log.Errorf("got committed raft command for %d but have no range with that ID", raftCmd.RaftID)
log.Errorf("got committed raft command for %d but have no range with that ID",
raftCmd.cmd.RaftID)
} else {
r.processRaftCommand(raftCmd)
r.processRaftCommand(raftCmd.cmdIDKey, raftCmd.cmd)
}

case <-closer:
Expand Down

0 comments on commit a610c9a

Please sign in to comment.