Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove CmdID from InternalRaftCommand and pass it to raft separately. #211

Merged
merged 2 commits into from
Dec 10, 2014
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 30 additions & 1 deletion multiraft/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package multiraft

import "github.com/cockroachdb/cockroach/util/log"

// An EventLeaderElection is broadcast when a group completes an election.
type EventLeaderElection struct {
GroupID uint64
Expand All @@ -25,5 +27,32 @@ type EventLeaderElection struct {

// An EventCommandCommitted is broadcast whenever a command has been committed.
type EventCommandCommitted struct {
Command []byte
CommandID []byte
Command []byte
}

// Commands are encoded with a 1-byte version (currently 0), a 16-byte ID,
// followed by the payload. This inflexible encoding is used so we can efficiently
// parse the command id while processing the logs.
const (
// commandIDLen is the required length, in bytes, of a command ID;
// encodeCommand rejects IDs of any other length.
commandIDLen = 16
// commandEncodingVersion is the version byte stored at the start of every
// encoded command and checked by decodeCommand.
commandEncodingVersion = 0
)

func encodeCommand(commandID, command []byte) []byte {
if len(commandID) != commandIDLen {
log.Fatalf("invalid command ID length; %d != %d", len(commandID), commandIDLen)
}
x := make([]byte, 1, 1+commandIDLen+len(command))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's make the version a const, declared up alongside commandIDLen, and explicitly append it here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

x[0] = commandEncodingVersion
x = append(x, commandID...)
x = append(x, command...)
return x
}

func decodeCommand(data []byte) (commandID, command []byte) {
if data[0] != commandEncodingVersion {
log.Fatalf("unknown command encoding version %v", data[0])
}
return data[1 : 1+commandIDLen], data[1+commandIDLen:]
}
19 changes: 11 additions & 8 deletions multiraft/multiraft.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,20 +176,21 @@ func (m *MultiRaft) CreateGroup(groupID uint64, initialMembers []uint64) error {
// when the command has been successfully sent, not when it has been committed.
// TODO(bdarnell): should SubmitCommand wait until the commit?
// TODO(bdarnell): what do we do if we lose leadership before a command we proposed commits?
func (m *MultiRaft) SubmitCommand(groupID uint64, command []byte) error {
func (m *MultiRaft) SubmitCommand(groupID uint64, commandID []byte, command []byte) error {
log.V(6).Infof("node %v submitting command to group %v", m.nodeID, groupID)
return m.multiNode.Propose(context.Background(), groupID, command)
return m.multiNode.Propose(context.Background(), groupID, encodeCommand(commandID, command))
}

// ChangeGroupMembership submits a proposed membership change to the cluster.
// TODO(bdarnell): same concerns as SubmitCommand
func (m *MultiRaft) ChangeGroupMembership(groupID uint64, changeType raftpb.ConfChangeType,
nodeID uint64) error {
func (m *MultiRaft) ChangeGroupMembership(groupID uint64, commandID []byte,
changeType raftpb.ConfChangeType, nodeID uint64) error {
log.V(6).Infof("node %v proposing membership change to group %v", m.nodeID, groupID)
return m.multiNode.ProposeConfChange(context.Background(), groupID,
raftpb.ConfChange{
Type: changeType,
NodeID: nodeID,
Type: changeType,
NodeID: nodeID,
Context: encodeCommand(commandID, nil),
})
}

Expand Down Expand Up @@ -403,9 +404,10 @@ func (s *state) handleWriteResponse(response *writeResponse, readyGroups map[uin
for _, entry := range ready.CommittedEntries {
switch entry.Type {
case raftpb.EntryNormal:
// TODO(bdarnell): etcd raft adds a nil entry upon election; should this be given a different Type?
// etcd raft occasionally adds a nil entry (e.g. upon election); ignore these.
if entry.Data != nil {
s.sendEvent(&EventCommandCommitted{entry.Data})
commandID, command := decodeCommand(entry.Data)
s.sendEvent(&EventCommandCommitted{commandID, command})
}
case raftpb.EntryConfChange:
cc := raftpb.ConfChange{}
Expand All @@ -414,6 +416,7 @@ func (s *state) handleWriteResponse(response *writeResponse, readyGroups map[uin
log.Fatalf("invalid ConfChange data: %s", err)
}
log.V(3).Infof("node %v applying configuration change %v", s.nodeID, cc)
// TODO(bdarnell): dedupe by extracting commandID from cc.Context.
s.multiNode.ApplyConfChange(groupID, cc)
}
}
Expand Down
14 changes: 11 additions & 3 deletions multiraft/multiraft_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,17 @@ import (
"testing"
"time"

"github.com/cockroachdb/cockroach/util"
"github.com/cockroachdb/cockroach/util/log"
"github.com/coreos/etcd/raft/raftpb"
)

var rand = util.NewPseudoRand()

func makeCommandID() []byte {
return []byte(util.RandString(rand, commandIDLen))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

RandString actually leaves a lot of bits on the floor and is extraordinarily inefficient if all we're trying to do is generate 16 bytes. Let's just get two random int64s and append them using:

return encoding.EncodeUint64(encoding.EncodeUint64(nil, uint64(rand.Int63())), uint64(rand.Int63()))

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh forget it...this is just test code.

}

type testCluster struct {
t *testing.T
nodes []*state
Expand Down Expand Up @@ -130,7 +137,7 @@ func TestCommand(t *testing.T) {
cluster.waitForElection(0)

// Submit a command to the leader
cluster.nodes[0].SubmitCommand(groupID, []byte("command"))
cluster.nodes[0].SubmitCommand(groupID, makeCommandID(), []byte("command"))

// The command will be committed on each node.
for i, events := range cluster.events {
Expand All @@ -156,7 +163,7 @@ func TestSlowStorage(t *testing.T) {
cluster.storages[2].Block()

// Submit a command to the leader
cluster.nodes[0].SubmitCommand(groupID, []byte("command"))
cluster.nodes[0].SubmitCommand(groupID, makeCommandID(), []byte("command"))

// Even with the third node blocked, the other nodes can make progress.
for i := 0; i < 2; i++ {
Expand Down Expand Up @@ -196,7 +203,8 @@ func TestMembershipChange(t *testing.T) {

// Add each of the other three nodes to the cluster.
for i := 1; i < 4; i++ {
err := cluster.nodes[0].ChangeGroupMembership(groupID, raftpb.ConfChangeAddNode,
err := cluster.nodes[0].ChangeGroupMembership(groupID, makeCommandID(),
raftpb.ConfChangeAddNode,
cluster.nodes[i].nodeID)
if err != nil {
t.Fatal(err)
Expand Down
4 changes: 0 additions & 4 deletions proto/internal.proto
Original file line number Diff line number Diff line change
Expand Up @@ -197,10 +197,6 @@ message InternalRaftCommandUnion {
// An InternalRaftCommand is a command which can be serialized and
// sent via raft.
message InternalRaftCommand {
// CmdID is used to identify a command when it has been
// committed. If the client specified a CmdID in its RequestHeader,
// that is used; otherwise the server generates an ID.
optional ClientCmdID cmd_id = 1 [(gogoproto.nullable) = false, (gogoproto.customname) = "CmdID"];
optional int64 raft_id = 2 [(gogoproto.nullable) = false, (gogoproto.customname) = "RaftID"];
optional InternalRaftCommandUnion cmd = 3 [(gogoproto.nullable) = false];
}
Expand Down
21 changes: 13 additions & 8 deletions storage/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,17 +27,22 @@ import (
"github.com/cockroachdb/cockroach/proto"
)

// committedCommand pairs a committed raft command with the cmdIDKey that
// identifies it. It is the element type of the channel returned by
// raft.committed.
type committedCommand struct {
// cmdIDKey identifies the command; it travels alongside cmd rather than
// inside it.
cmdIDKey cmdIDKey
// cmd is the committed command itself.
cmd proto.InternalRaftCommand
}

// raft is the interface exposed by a raft implementation.
type raft interface {
// propose a command to raft. If accepted by the consensus protocol it will
// eventually appear in the committed channel, but this is not guaranteed
// so callers may need to retry.
propose(proto.InternalRaftCommand)
propose(cmdIDKey, proto.InternalRaftCommand)

// committed returns a channel that yields commands as they are
// committed. Note that this includes commands proposed by this node
// and others.
committed() <-chan proto.InternalRaftCommand
committed() <-chan committedCommand

stop()
}
Expand All @@ -46,7 +51,7 @@ type singleNodeRaft struct {
mr *multiraft.MultiRaft
mu sync.Mutex
groups map[int64]struct{}
commitCh chan proto.InternalRaftCommand
commitCh chan committedCommand
stopper chan struct{}
}

Expand All @@ -64,7 +69,7 @@ func newSingleNodeRaft() *singleNodeRaft {
snr := &singleNodeRaft{
mr: mr,
groups: map[int64]struct{}{},
commitCh: make(chan proto.InternalRaftCommand, 10),
commitCh: make(chan committedCommand, 10),
stopper: make(chan struct{}),
}
mr.Start()
Expand All @@ -74,7 +79,7 @@ func newSingleNodeRaft() *singleNodeRaft {

var _ raft = (*singleNodeRaft)(nil)

func (snr *singleNodeRaft) propose(cmd proto.InternalRaftCommand) {
func (snr *singleNodeRaft) propose(cmdIDKey cmdIDKey, cmd proto.InternalRaftCommand) {
snr.mu.Lock()
defer snr.mu.Unlock()
if _, ok := snr.groups[cmd.RaftID]; !ok {
Expand All @@ -89,10 +94,10 @@ func (snr *singleNodeRaft) propose(cmd proto.InternalRaftCommand) {
if err != nil {
log.Fatal(err)
}
snr.mr.SubmitCommand(uint64(cmd.RaftID), data)
snr.mr.SubmitCommand(uint64(cmd.RaftID), []byte(cmdIDKey), data)
}

func (snr *singleNodeRaft) committed() <-chan proto.InternalRaftCommand {
func (snr *singleNodeRaft) committed() <-chan committedCommand {
return snr.commitCh
}

Expand All @@ -111,7 +116,7 @@ func (snr *singleNodeRaft) run() {
if err != nil {
log.Fatal(err)
}
snr.commitCh <- cmd
snr.commitCh <- committedCommand{cmdIDKey(e.CommandID), cmd}
}
case <-snr.stopper:
snr.mr.Stop()
Expand Down
15 changes: 8 additions & 7 deletions storage/range.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ type RangeManager interface {
AddRange(rng *Range) error
RemoveRange(rng *Range) error
CreateSnapshot() (string, error)
ProposeRaftCommand(proto.InternalRaftCommand)
ProposeRaftCommand(cmdIDKey, proto.InternalRaftCommand)
}

// A Range is a contiguous keyspace with writes managed via an
Expand Down Expand Up @@ -427,21 +427,23 @@ func (r *Range) addReadWriteCmd(method string, args proto.Request, reply proto.R
raftCmd := proto.InternalRaftCommand{
RaftID: r.Desc.RaftID,
}
var cmdID proto.ClientCmdID
if !args.Header().CmdID.IsEmpty() {
raftCmd.CmdID = args.Header().CmdID
cmdID = args.Header().CmdID
} else {
raftCmd.CmdID = proto.ClientCmdID{
cmdID = proto.ClientCmdID{
WallTime: r.rm.Clock().PhysicalNow(),
Random: rand.Int63(),
}
}
idKey := makeCmdIDKey(cmdID)
r.Lock()
r.pendingCmds[makeCmdIDKey(raftCmd.CmdID)] = pendingCmd
r.pendingCmds[idKey] = pendingCmd
r.Unlock()
// TODO(bdarnell): In certain raft failover scenarios, proposed
// commands may be abandoned. We need to re-propose the command
// if too much time passes with no response on the done channel.
r.rm.ProposeRaftCommand(raftCmd)
r.rm.ProposeRaftCommand(idKey, raftCmd)

// Create a completion func for mandatory cleanups which we either
// run synchronously if we're waiting or in a goroutine otherwise.
Expand Down Expand Up @@ -474,8 +476,7 @@ func (r *Range) addReadWriteCmd(method string, args proto.Request, reply proto.R
return nil
}

func (r *Range) processRaftCommand(raftCmd proto.InternalRaftCommand) {
idKey := makeCmdIDKey(raftCmd.CmdID)
func (r *Range) processRaftCommand(idKey cmdIDKey, raftCmd proto.InternalRaftCommand) {
r.Lock()
cmd := r.pendingCmds[idKey]
delete(r.pendingCmds, idKey)
Expand Down
4 changes: 2 additions & 2 deletions storage/range_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1548,12 +1548,12 @@ func TestRemoteRaftCommand(t *testing.T) {
// Send an increment direct to raft.
remoteIncArgs, _ := incrementArgs([]byte("a"), 2, 1, s.StoreID())
remoteIncArgs.Timestamp = proto.MinTimestamp
idKey := makeCmdIDKey(proto.ClientCmdID{WallTime: 1, Random: 1})
raftCmd := proto.InternalRaftCommand{
CmdID: proto.ClientCmdID{WallTime: 1, Random: 1},
RaftID: r.Desc.RaftID,
}
raftCmd.Cmd.SetValue(remoteIncArgs)
r.rm.ProposeRaftCommand(raftCmd)
r.rm.ProposeRaftCommand(idKey, raftCmd)

// Send an increment through the normal flow, since this is our
// simplest way of waiting until this command (and all earlier ones)
Expand Down
12 changes: 5 additions & 7 deletions storage/response_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,13 @@ import (
"github.com/cockroachdb/cockroach/util/log"
)

type cmdIDKey struct {
walltime, random int64
}
type cmdIDKey string

func makeCmdIDKey(cmdID proto.ClientCmdID) cmdIDKey {
return cmdIDKey{
walltime: cmdID.WallTime,
random: cmdID.Random,
}
buf := make([]byte, 0, 16)
buf = encoding.EncodeUint64(buf, uint64(cmdID.WallTime))
buf = encoding.EncodeUint64(buf, uint64(cmdID.Random))
return cmdIDKey(string(buf))
}

// A ResponseCache provides idempotence for request retries. Each
Expand Down
11 changes: 6 additions & 5 deletions storage/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -790,13 +790,13 @@ func (s *Store) maybeResolveWriteIntentError(rng *Range, method string, args pro
}

// ProposeRaftCommand submits a command to raft.
func (s *Store) ProposeRaftCommand(cmd proto.InternalRaftCommand) {
func (s *Store) ProposeRaftCommand(idKey cmdIDKey, cmd proto.InternalRaftCommand) {
// s.raft should be constant throughout the life of the store, but
// the race detector reports a race between this method and s.Stop.
s.mu.RLock()
defer s.mu.RUnlock()

s.raft.propose(cmd)
s.raft.propose(idKey, cmd)
}

// processRaft processes read/write commands that have been committed
Expand Down Expand Up @@ -826,12 +826,13 @@ func (s *Store) processRaft(r raft, closer chan struct{}) {
select {
case raftCmd := <-r.committed():
s.mu.Lock()
r, ok := s.ranges[raftCmd.RaftID]
r, ok := s.ranges[raftCmd.cmd.RaftID]
s.mu.Unlock()
if !ok {
log.Errorf("got committed raft command for %d but have no range with that ID", raftCmd.RaftID)
log.Errorf("got committed raft command for %d but have no range with that ID",
raftCmd.cmd.RaftID)
} else {
r.processRaftCommand(raftCmd)
r.processRaftCommand(raftCmd.cmdIDKey, raftCmd.cmd)
}

case <-closer:
Expand Down