From 559c53b202a961940cbd0d5be04001ecf47e5f0e Mon Sep 17 00:00:00 2001 From: Ben Darnell Date: Wed, 10 Dec 2014 17:09:28 -0500 Subject: [PATCH 1/2] Remove CmdID from InternalRaftCommand and pass it to raft separately. Multiraft needs a command identifier so it can track pending commands and resubmit them when an election has caused them to be dropped. --- _vendor/src/github.com/coreos/etcd | 2 +- multiraft/events.go | 27 ++++++++++++++++++++++++++- multiraft/multiraft.go | 19 +++++++++++-------- multiraft/multiraft_test.go | 14 +++++++++++--- proto/internal.proto | 2 +- storage/raft.go | 21 +++++++++++++-------- storage/range.go | 15 ++++++++------- storage/range_test.go | 4 ++-- storage/response_cache.go | 18 ++++++++++++------ storage/store.go | 11 ++++++----- 10 files changed, 91 insertions(+), 42 deletions(-) diff --git a/_vendor/src/github.com/coreos/etcd b/_vendor/src/github.com/coreos/etcd index c0060d47b9aa..d3435fa04a84 160000 --- a/_vendor/src/github.com/coreos/etcd +++ b/_vendor/src/github.com/coreos/etcd @@ -1 +1 @@ -Subproject commit c0060d47b9aa8f6dbe8bdc2bfa78d011c3f2722a +Subproject commit d3435fa04a84278c5ed019f30ed5d7606cf87c17 diff --git a/multiraft/events.go b/multiraft/events.go index 06f6efa17037..e22be1800d6f 100644 --- a/multiraft/events.go +++ b/multiraft/events.go @@ -17,6 +17,8 @@ package multiraft +import "log" + // An EventLeaderElection is broadcast when a group completes an election. type EventLeaderElection struct { GroupID uint64 @@ -25,5 +27,28 @@ type EventLeaderElection struct { // An EventCommandCommitted is broadcast whenever a command has been committed. type EventCommandCommitted struct { - Command []byte + CommandID []byte + Command []byte +} + +// Commands are encoded with a 1-byte version (currently 0), a 16-byte ID, +// followed by the payload. This inflexible encoding is used so we can efficiently +// parse the command id while processing the logs. +const commandIDLen = 16 + +func encodeCommand(commandID, command []byte) []byte { + if len(commandID) != commandIDLen { + log.Fatalf("invalid command ID length; %d != %d", len(commandID), commandIDLen) + } + x := make([]byte, 1, 1+commandIDLen+len(command)) + x = append(x, commandID...) + x = append(x, command...) + return x +} + +func decodeCommand(data []byte) (commandID, command []byte) { + if data[0] != 0 { + log.Fatalf("unknown command encoding version %v", data[0]) + } + return data[1 : 1+commandIDLen], data[1+commandIDLen:] } diff --git a/multiraft/multiraft.go b/multiraft/multiraft.go index 37274f0635eb..5d316e80c076 100644 --- a/multiraft/multiraft.go +++ b/multiraft/multiraft.go @@ -176,20 +176,21 @@ func (m *MultiRaft) CreateGroup(groupID uint64, initialMembers []uint64) error { // when the command has been successfully sent, not when it has been committed. // TODO(bdarnell): should SubmitCommand wait until the commit? // TODO(bdarnell): what do we do if we lose leadership before a command we proposed commits? -func (m *MultiRaft) SubmitCommand(groupID uint64, command []byte) error { +func (m *MultiRaft) SubmitCommand(commandID []byte, groupID uint64, command []byte) error { log.V(6).Infof("node %v submitting command to group %v", m.nodeID, groupID) - return m.multiNode.Propose(context.Background(), groupID, command) + return m.multiNode.Propose(context.Background(), groupID, encodeCommand(commandID, command)) } // ChangeGroupMembership submits a proposed membership change to the cluster. // TODO(bdarnell): same concerns as SubmitCommand -func (m *MultiRaft) ChangeGroupMembership(groupID uint64, changeType raftpb.ConfChangeType, - nodeID uint64) error { +func (m *MultiRaft) ChangeGroupMembership(commandID []byte, groupID uint64, + changeType raftpb.ConfChangeType, nodeID uint64) error { log.V(6).Infof("node %v proposing membership change to group %v", m.nodeID, groupID) return m.multiNode.ProposeConfChange(context.Background(), groupID, raftpb.ConfChange{ - Type: changeType, - NodeID: nodeID, + Type: changeType, + NodeID: nodeID, + Context: encodeCommand(commandID, nil), }) } @@ -403,9 +404,10 @@ func (s *state) handleWriteResponse(response *writeResponse, readyGroups map[uin for _, entry := range ready.CommittedEntries { switch entry.Type { case raftpb.EntryNormal: - // TODO(bdarnell): etcd raft adds a nil entry upon election; should this be given a different Type? + // etcd raft occasionally adds a nil entry (e.g. upon election); ignore these. if entry.Data != nil { - s.sendEvent(&EventCommandCommitted{entry.Data}) + commandID, command := decodeCommand(entry.Data) + s.sendEvent(&EventCommandCommitted{commandID, command}) } case raftpb.EntryConfChange: cc := raftpb.ConfChange{} @@ -414,6 +416,7 @@ func (s *state) handleWriteResponse(response *writeResponse, readyGroups map[uin log.Fatalf("invalid ConfChange data: %s", err) } log.V(3).Infof("node %v applying configuration change %v", s.nodeID, cc) + // TODO(bdarnell): dedupe by extracting commandID from cc.Context. s.multiNode.ApplyConfChange(groupID, cc) } } diff --git a/multiraft/multiraft_test.go b/multiraft/multiraft_test.go index 456ed07e1ef4..79f7ec04efcc 100644 --- a/multiraft/multiraft_test.go +++ b/multiraft/multiraft_test.go @@ -21,10 +21,17 @@ import ( "testing" "time" + "github.com/cockroachdb/cockroach/util" "github.com/cockroachdb/cockroach/util/log" "github.com/coreos/etcd/raft/raftpb" ) +var rand = util.NewPseudoRand() + +func makeCommandID() []byte { + return []byte(util.RandString(rand, commandIDLen)) +} + type testCluster struct { t *testing.T nodes []*state @@ -130,7 +137,7 @@ func TestCommand(t *testing.T) { cluster.waitForElection(0) // Submit a command to the leader - cluster.nodes[0].SubmitCommand(groupID, []byte("command")) + cluster.nodes[0].SubmitCommand(makeCommandID(), groupID, []byte("command")) // The command will be committed on each node. for i, events := range cluster.events { @@ -156,7 +163,7 @@ func TestSlowStorage(t *testing.T) { cluster.storages[2].Block() // Submit a command to the leader - cluster.nodes[0].SubmitCommand(groupID, []byte("command")) + cluster.nodes[0].SubmitCommand(makeCommandID(), groupID, []byte("command")) // Even with the third node blocked, the other nodes can make progress. for i := 0; i < 2; i++ { @@ -196,7 +203,8 @@ func TestMembershipChange(t *testing.T) { // Add each of the other three nodes to the cluster. for i := 1; i < 4; i++ { - err := cluster.nodes[0].ChangeGroupMembership(groupID, raftpb.ConfChangeAddNode, + err := cluster.nodes[0].ChangeGroupMembership(makeCommandID(), groupID, + raftpb.ConfChangeAddNode, cluster.nodes[i].nodeID) if err != nil { t.Fatal(err) diff --git a/proto/internal.proto b/proto/internal.proto index 8a8dfa456530..9a3857d19a6a 100644 --- a/proto/internal.proto +++ b/proto/internal.proto @@ -200,7 +200,7 @@ message InternalRaftCommand { // CmdID is used to identify a command when it has been // committed. If the client specified a CmdID in its RequestHeader, // that is used; otherwise the server generates an ID. - optional ClientCmdID cmd_id = 1 [(gogoproto.nullable) = false, (gogoproto.customname) = "CmdID"]; + //optional ClientCmdID cmd_id = 1 [(gogoproto.nullable) = false, (gogoproto.customname) = "CmdID"]; optional int64 raft_id = 2 [(gogoproto.nullable) = false, (gogoproto.customname) = "RaftID"]; optional InternalRaftCommandUnion cmd = 3 [(gogoproto.nullable) = false]; } diff --git a/storage/raft.go b/storage/raft.go index 6162a0c5e360..0b8cf4a6d1f2 100644 --- a/storage/raft.go +++ b/storage/raft.go @@ -27,17 +27,22 @@ import ( "github.com/cockroachdb/cockroach/proto" ) +type committedCommand struct { + cmdIDKey cmdIDKey + cmd proto.InternalRaftCommand +} + // raft is the interface exposed by a raft implementation. type raft interface { // propose a command to raft. If accepted by the consensus protocol it will // eventually appear in the committed channel, but this is not guaranteed // so callers may need to retry. - propose(proto.InternalRaftCommand) + propose(cmdIDKey, proto.InternalRaftCommand) // committed returns a channel that yields commands as they are // committed. Note that this includes commands proposed by this node // and others. - committed() <-chan proto.InternalRaftCommand + committed() <-chan committedCommand stop() } @@ -46,7 +51,7 @@ type singleNodeRaft struct { mr *multiraft.MultiRaft mu sync.Mutex groups map[int64]struct{} - commitCh chan proto.InternalRaftCommand + commitCh chan committedCommand stopper chan struct{} } @@ -64,7 +69,7 @@ func newSingleNodeRaft() *singleNodeRaft { snr := &singleNodeRaft{ mr: mr, groups: map[int64]struct{}{}, - commitCh: make(chan proto.InternalRaftCommand, 10), + commitCh: make(chan committedCommand, 10), stopper: make(chan struct{}), } mr.Start() @@ -74,7 +79,7 @@ func newSingleNodeRaft() *singleNodeRaft { var _ raft = (*singleNodeRaft)(nil) -func (snr *singleNodeRaft) propose(cmd proto.InternalRaftCommand) { +func (snr *singleNodeRaft) propose(cmdIDKey cmdIDKey, cmd proto.InternalRaftCommand) { snr.mu.Lock() defer snr.mu.Unlock() if _, ok := snr.groups[cmd.RaftID]; !ok { @@ -89,10 +94,10 @@ func (snr *singleNodeRaft) propose(cmd proto.InternalRaftCommand) { if err != nil { log.Fatal(err) } - snr.mr.SubmitCommand(uint64(cmd.RaftID), data) + snr.mr.SubmitCommand([]byte(cmdIDKey), uint64(cmd.RaftID), data) } -func (snr *singleNodeRaft) committed() <-chan proto.InternalRaftCommand { +func (snr *singleNodeRaft) committed() <-chan committedCommand { return snr.commitCh } @@ -111,7 +116,7 @@ func (snr *singleNodeRaft) run() { if err != nil { log.Fatal(err) } - snr.commitCh <- cmd + snr.commitCh <- committedCommand{cmdIDKey(e.CommandID), cmd} } case <-snr.stopper: snr.mr.Stop() diff --git a/storage/range.go b/storage/range.go index 1a39660546aa..bf0ed9f206fb 100644 --- a/storage/range.go +++ b/storage/range.go @@ -142,7 +142,7 @@ type RangeManager interface { AddRange(rng *Range) error RemoveRange(rng *Range) error CreateSnapshot() (string, error) - ProposeRaftCommand(proto.InternalRaftCommand) + ProposeRaftCommand(cmdIDKey, proto.InternalRaftCommand) } // A Range is a contiguous keyspace with writes managed via an @@ -427,21 +427,23 @@ func (r *Range) addReadWriteCmd(method string, args proto.Request, reply proto.R raftCmd := proto.InternalRaftCommand{ RaftID: r.Desc.RaftID, } + var cmdID proto.ClientCmdID if !args.Header().CmdID.IsEmpty() { - raftCmd.CmdID = args.Header().CmdID + cmdID = args.Header().CmdID } else { - raftCmd.CmdID = proto.ClientCmdID{ + cmdID = proto.ClientCmdID{ WallTime: r.rm.Clock().PhysicalNow(), Random: rand.Int63(), } } + idKey := makeCmdIDKey(cmdID) r.Lock() - r.pendingCmds[makeCmdIDKey(raftCmd.CmdID)] = pendingCmd + r.pendingCmds[idKey] = pendingCmd r.Unlock() // TODO(bdarnell): In certain raft failover scenarios, proposed // commands may be abandoned. We need to re-propose the command // if too much time passes with no response on the done channel. - r.rm.ProposeRaftCommand(raftCmd) + r.rm.ProposeRaftCommand(idKey, raftCmd) // Create a completion func for mandatory cleanups which we either // run synchronously if we're waiting or in a goroutine otherwise. @@ -474,8 +476,7 @@ func (r *Range) addReadWriteCmd(method string, args proto.Request, reply proto.R return nil } -func (r *Range) processRaftCommand(raftCmd proto.InternalRaftCommand) { - idKey := makeCmdIDKey(raftCmd.CmdID) +func (r *Range) processRaftCommand(idKey cmdIDKey, raftCmd proto.InternalRaftCommand) { r.Lock() cmd := r.pendingCmds[idKey] delete(r.pendingCmds, idKey) diff --git a/storage/range_test.go b/storage/range_test.go index 6bd7df21c90f..6cc8153a8f59 100644 --- a/storage/range_test.go +++ b/storage/range_test.go @@ -1548,12 +1548,12 @@ func TestRemoteRaftCommand(t *testing.T) { // Send an increment direct to raft. remoteIncArgs, _ := incrementArgs([]byte("a"), 2, 1, s.StoreID()) remoteIncArgs.Timestamp = proto.MinTimestamp + idKey := makeCmdIDKey(proto.ClientCmdID{WallTime: 1, Random: 1}) raftCmd := proto.InternalRaftCommand{ - CmdID: proto.ClientCmdID{WallTime: 1, Random: 1}, RaftID: r.Desc.RaftID, } raftCmd.Cmd.SetValue(remoteIncArgs) - r.rm.ProposeRaftCommand(raftCmd) + r.rm.ProposeRaftCommand(idKey, raftCmd) // Send an increment through the normal flow, since this is our // simplest way of waiting until this command (and all earlier ones) diff --git a/storage/response_cache.go b/storage/response_cache.go index 479062d0e113..45b2464c3cbc 100644 --- a/storage/response_cache.go +++ b/storage/response_cache.go @@ -18,6 +18,8 @@ package storage import ( + "bytes" + "encoding/binary" "fmt" "sync" @@ -29,15 +31,19 @@ import ( "github.com/cockroachdb/cockroach/util/log" ) -type cmdIDKey struct { - walltime, random int64 -} +type cmdIDKey string func makeCmdIDKey(cmdID proto.ClientCmdID) cmdIDKey { - return cmdIDKey{ - walltime: cmdID.WallTime, - random: cmdID.Random, + buf := bytes.NewBuffer(make([]byte, 0, 16)) + err := binary.Write(buf, binary.BigEndian, cmdID.WallTime) + if err != nil { + panic(err) + } + err = binary.Write(buf, binary.BigEndian, cmdID.Random) + if err != nil { + panic(err) } + return cmdIDKey(buf.String()) } // A ResponseCache provides idempotence for request retries. Each diff --git a/storage/store.go b/storage/store.go index 038878ba5dff..e32f20bf4945 100644 --- a/storage/store.go +++ b/storage/store.go @@ -790,13 +790,13 @@ func (s *Store) maybeResolveWriteIntentError(rng *Range, method string, args pro } // ProposeRaftCommand submits a command to raft. -func (s *Store) ProposeRaftCommand(cmd proto.InternalRaftCommand) { +func (s *Store) ProposeRaftCommand(idKey cmdIDKey, cmd proto.InternalRaftCommand) { // s.raft should be constant throughout the life of the store, but // the race detector reports a race between this method and s.Stop. s.mu.RLock() defer s.mu.RUnlock() - s.raft.propose(cmd) + s.raft.propose(idKey, cmd) } // processRaft processes read/write commands that have been committed @@ -826,12 +826,13 @@ func (s *Store) processRaft(r raft, closer chan struct{}) { select { case raftCmd := <-r.committed(): s.mu.Lock() - r, ok := s.ranges[raftCmd.RaftID] + r, ok := s.ranges[raftCmd.cmd.RaftID] s.mu.Unlock() if !ok { - log.Errorf("got committed raft command for %d but have no range with that ID", raftCmd.RaftID) + log.Errorf("got committed raft command for %d but have no range with that ID", + raftCmd.cmd.RaftID) } else { - r.processRaftCommand(raftCmd) + r.processRaftCommand(raftCmd.cmdIDKey, raftCmd.cmd) } case <-closer: From f935f92d9a378a841a3186fadfe26a843adb18c9 Mon Sep 17 00:00:00 2001 From: Ben Darnell Date: Wed, 10 Dec 2014 18:12:54 -0500 Subject: [PATCH 2/2] PR feedback for InternalRaftCommand.CmdID removal. --- multiraft/events.go | 10 +++++++--- multiraft/multiraft.go | 4 ++-- multiraft/multiraft_test.go | 6 +++--- proto/internal.proto | 4 ---- storage/raft.go | 2 +- storage/response_cache.go | 16 ++++------------ 6 files changed, 17 insertions(+), 25 deletions(-) diff --git a/multiraft/events.go b/multiraft/events.go index e22be1800d6f..06cd2b75a937 100644 --- a/multiraft/events.go +++ b/multiraft/events.go @@ -17,7 +17,7 @@ package multiraft -import "log" +import "github.com/cockroachdb/cockroach/util/log" // An EventLeaderElection is broadcast when a group completes an election. type EventLeaderElection struct { @@ -34,20 +34,24 @@ type EventCommandCommitted struct { // Commands are encoded with a 1-byte version (currently 0), a 16-byte ID, // followed by the payload. This inflexible encoding is used so we can efficiently // parse the command id while processing the logs. -const commandIDLen = 16 +const ( + commandIDLen = 16 + commandEncodingVersion = 0 +) func encodeCommand(commandID, command []byte) []byte { if len(commandID) != commandIDLen { log.Fatalf("invalid command ID length; %d != %d", len(commandID), commandIDLen) } x := make([]byte, 1, 1+commandIDLen+len(command)) + x[0] = commandEncodingVersion x = append(x, commandID...) x = append(x, command...) return x } func decodeCommand(data []byte) (commandID, command []byte) { - if data[0] != 0 { + if data[0] != commandEncodingVersion { log.Fatalf("unknown command encoding version %v", data[0]) } return data[1 : 1+commandIDLen], data[1+commandIDLen:] diff --git a/multiraft/multiraft.go b/multiraft/multiraft.go index 5d316e80c076..4f55376657d7 100644 --- a/multiraft/multiraft.go +++ b/multiraft/multiraft.go @@ -176,14 +176,14 @@ func (m *MultiRaft) CreateGroup(groupID uint64, initialMembers []uint64) error { // when the command has been successfully sent, not when it has been committed. // TODO(bdarnell): should SubmitCommand wait until the commit? // TODO(bdarnell): what do we do if we lose leadership before a command we proposed commits? -func (m *MultiRaft) SubmitCommand(commandID []byte, groupID uint64, command []byte) error { +func (m *MultiRaft) SubmitCommand(groupID uint64, commandID []byte, command []byte) error { log.V(6).Infof("node %v submitting command to group %v", m.nodeID, groupID) return m.multiNode.Propose(context.Background(), groupID, encodeCommand(commandID, command)) } // ChangeGroupMembership submits a proposed membership change to the cluster. // TODO(bdarnell): same concerns as SubmitCommand -func (m *MultiRaft) ChangeGroupMembership(commandID []byte, groupID uint64, +func (m *MultiRaft) ChangeGroupMembership(groupID uint64, commandID []byte, changeType raftpb.ConfChangeType, nodeID uint64) error { log.V(6).Infof("node %v proposing membership change to group %v", m.nodeID, groupID) return m.multiNode.ProposeConfChange(context.Background(), groupID, diff --git a/multiraft/multiraft_test.go b/multiraft/multiraft_test.go index 79f7ec04efcc..1012f1f345ee 100644 --- a/multiraft/multiraft_test.go +++ b/multiraft/multiraft_test.go @@ -137,7 +137,7 @@ func TestCommand(t *testing.T) { cluster.waitForElection(0) // Submit a command to the leader - cluster.nodes[0].SubmitCommand(makeCommandID(), groupID, []byte("command")) + cluster.nodes[0].SubmitCommand(groupID, makeCommandID(), []byte("command")) // The command will be committed on each node. for i, events := range cluster.events { @@ -163,7 +163,7 @@ func TestSlowStorage(t *testing.T) { cluster.storages[2].Block() // Submit a command to the leader - cluster.nodes[0].SubmitCommand(makeCommandID(), groupID, []byte("command")) + cluster.nodes[0].SubmitCommand(groupID, makeCommandID(), []byte("command")) // Even with the third node blocked, the other nodes can make progress. for i := 0; i < 2; i++ { @@ -203,7 +203,7 @@ func TestMembershipChange(t *testing.T) { // Add each of the other three nodes to the cluster. for i := 1; i < 4; i++ { - err := cluster.nodes[0].ChangeGroupMembership(makeCommandID(), groupID, + err := cluster.nodes[0].ChangeGroupMembership(groupID, makeCommandID(), raftpb.ConfChangeAddNode, cluster.nodes[i].nodeID) if err != nil { diff --git a/proto/internal.proto b/proto/internal.proto index 9a3857d19a6a..87520c24f407 100644 --- a/proto/internal.proto +++ b/proto/internal.proto @@ -197,10 +197,6 @@ message InternalRaftCommandUnion { // An InternalRaftCommand is a command which can be serialized and // sent via raft. message InternalRaftCommand { - // CmdID is used to identify a command when it has been - // committed. If the client specified a CmdID in its RequestHeader, - // that is used; otherwise the server generates an ID. - //optional ClientCmdID cmd_id = 1 [(gogoproto.nullable) = false, (gogoproto.customname) = "CmdID"]; optional int64 raft_id = 2 [(gogoproto.nullable) = false, (gogoproto.customname) = "RaftID"]; optional InternalRaftCommandUnion cmd = 3 [(gogoproto.nullable) = false]; } diff --git a/storage/raft.go b/storage/raft.go index 0b8cf4a6d1f2..591d67cabfee 100644 --- a/storage/raft.go +++ b/storage/raft.go @@ -94,7 +94,7 @@ func (snr *singleNodeRaft) propose(cmdIDKey cmdIDKey, cmd proto.InternalRaftComm if err != nil { log.Fatal(err) } - snr.mr.SubmitCommand([]byte(cmdIDKey), uint64(cmd.RaftID), data) + snr.mr.SubmitCommand(uint64(cmd.RaftID), []byte(cmdIDKey), data) } func (snr *singleNodeRaft) committed() <-chan committedCommand { diff --git a/storage/response_cache.go b/storage/response_cache.go index 45b2464c3cbc..549ed0acdbe2 100644 --- a/storage/response_cache.go +++ b/storage/response_cache.go @@ -18,8 +18,6 @@ package storage import ( - "bytes" - "encoding/binary" "fmt" "sync" @@ -34,16 +32,10 @@ import ( type cmdIDKey string func makeCmdIDKey(cmdID proto.ClientCmdID) cmdIDKey { - buf := bytes.NewBuffer(make([]byte, 0, 16)) - err := binary.Write(buf, binary.BigEndian, cmdID.WallTime) - if err != nil { - panic(err) - } - err = binary.Write(buf, binary.BigEndian, cmdID.Random) - if err != nil { - panic(err) - } - return cmdIDKey(buf.String()) + buf := make([]byte, 0, 16) + buf = encoding.EncodeUint64(buf, uint64(cmdID.WallTime)) + buf = encoding.EncodeUint64(buf, uint64(cmdID.Random)) + return cmdIDKey(string(buf)) } // A ResponseCache provides idempotence for request retries. Each