-
Notifications
You must be signed in to change notification settings - Fork 3.8k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Remove CmdID from InternalRaftCommand and pass it to raft separately. #211
Changes from 1 commit
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
+13 −0 | Documentation/0.5/admin_guide.md | |
+28 −5 | Documentation/0.5/other_apis.md | |
+6 −2 | Documentation/0.5/runtime-configuration.md | |
+3 −3 | etcdserver/etcdhttp/client_test.go | |
+34 −29 | etcdserver/server.go | |
+12 −23 | etcdserver/server_test.go | |
+5 −3 | raft/node.go | |
+7 −8 | raft/node_test.go | |
+91 −105 | raft/raft.go | |
+11 −5 | snap/snapshotter.go | |
+20 −0 | snap/snapshotter_test.go |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -17,6 +17,8 @@ | |
|
||
package multiraft | ||
|
||
import "log" | ||
|
||
// An EventLeaderElection is broadcast when a group completes an election. | ||
type EventLeaderElection struct { | ||
GroupID uint64 | ||
|
@@ -25,5 +27,28 @@ type EventLeaderElection struct { | |
|
||
// An EventCommandCommitted is broadcast whenever a command has been committed. | ||
type EventCommandCommitted struct { | ||
Command []byte | ||
CommandID []byte | ||
Command []byte | ||
} | ||
|
||
// Commands are encoded with a 1-byte version (currently 0), a 16-byte ID, | ||
// followed by the payload. This inflexible encoding is used so we can efficiently | ||
// parse the command id while processing the logs. | ||
const commandIDLen = 16 | ||
|
||
func encodeCommand(commandID, command []byte) []byte { | ||
if len(commandID) != commandIDLen { | ||
log.Fatalf("invalid command ID length; %d != %d", len(commandID), commandIDLen) | ||
} | ||
x := make([]byte, 1, 1+commandIDLen+len(command)) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Let's make the first version a const up with commandIDLen and explicitly append it here. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done |
||
x = append(x, commandID...) | ||
x = append(x, command...) | ||
return x | ||
} | ||
|
||
func decodeCommand(data []byte) (commandID, command []byte) { | ||
if data[0] != 0 { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Refer to currentVersion here. |
||
log.Fatalf("unknown command encoding version %v", data[0]) | ||
} | ||
return data[1 : 1+commandIDLen], data[1+commandIDLen:] | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -176,20 +176,21 @@ func (m *MultiRaft) CreateGroup(groupID uint64, initialMembers []uint64) error { | |
// when the command has been successfully sent, not when it has been committed. | ||
// TODO(bdarnell): should SubmitCommand wait until the commit? | ||
// TODO(bdarnell): what do we do if we lose leadership before a command we proposed commits? | ||
func (m *MultiRaft) SubmitCommand(groupID uint64, command []byte) error { | ||
func (m *MultiRaft) SubmitCommand(commandID []byte, groupID uint64, command []byte) error { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I feel like groupID should still come first, followed by commandID. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done |
||
log.V(6).Infof("node %v submitting command to group %v", m.nodeID, groupID) | ||
return m.multiNode.Propose(context.Background(), groupID, command) | ||
return m.multiNode.Propose(context.Background(), groupID, encodeCommand(commandID, command)) | ||
} | ||
|
||
// ChangeGroupMembership submits a proposed membership change to the cluster. | ||
// TODO(bdarnell): same concerns as SubmitCommand | ||
func (m *MultiRaft) ChangeGroupMembership(groupID uint64, changeType raftpb.ConfChangeType, | ||
nodeID uint64) error { | ||
func (m *MultiRaft) ChangeGroupMembership(commandID []byte, groupID uint64, | ||
changeType raftpb.ConfChangeType, nodeID uint64) error { | ||
log.V(6).Infof("node %v proposing membership change to group %v", m.nodeID, groupID) | ||
return m.multiNode.ProposeConfChange(context.Background(), groupID, | ||
raftpb.ConfChange{ | ||
Type: changeType, | ||
NodeID: nodeID, | ||
Type: changeType, | ||
NodeID: nodeID, | ||
Context: encodeCommand(commandID, nil), | ||
}) | ||
} | ||
|
||
|
@@ -403,9 +404,10 @@ func (s *state) handleWriteResponse(response *writeResponse, readyGroups map[uin | |
for _, entry := range ready.CommittedEntries { | ||
switch entry.Type { | ||
case raftpb.EntryNormal: | ||
// TODO(bdarnell): etcd raft adds a nil entry upon election; should this be given a different Type? | ||
// etcd raft occasionally adds a nil entry (e.g. upon election); ignore these. | ||
if entry.Data != nil { | ||
s.sendEvent(&EventCommandCommitted{entry.Data}) | ||
commandID, command := decodeCommand(entry.Data) | ||
s.sendEvent(&EventCommandCommitted{commandID, command}) | ||
} | ||
case raftpb.EntryConfChange: | ||
cc := raftpb.ConfChange{} | ||
|
@@ -414,6 +416,7 @@ func (s *state) handleWriteResponse(response *writeResponse, readyGroups map[uin | |
log.Fatalf("invalid ConfChange data: %s", err) | ||
} | ||
log.V(3).Infof("node %v applying configuration change %v", s.nodeID, cc) | ||
// TODO(bdarnell): dedupe by extracting commandID from cc.Context. | ||
s.multiNode.ApplyConfChange(groupID, cc) | ||
} | ||
} | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -21,10 +21,17 @@ import ( | |
"testing" | ||
"time" | ||
|
||
"github.com/cockroachdb/cockroach/util" | ||
"github.com/cockroachdb/cockroach/util/log" | ||
"github.com/coreos/etcd/raft/raftpb" | ||
) | ||
|
||
var rand = util.NewPseudoRand() | ||
|
||
func makeCommandID() []byte { | ||
return []byte(util.RandString(rand, commandIDLen)) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. RandString actually leaves a lot of bits on the floor and is extraordinarily inefficient if all we're trying to do is generate 16 bytes. Let's just get two random int64s and append them using:
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Oh forget it...this is just test code. |
||
} | ||
|
||
type testCluster struct { | ||
t *testing.T | ||
nodes []*state | ||
|
@@ -130,7 +137,7 @@ func TestCommand(t *testing.T) { | |
cluster.waitForElection(0) | ||
|
||
// Submit a command to the leader | ||
cluster.nodes[0].SubmitCommand(groupID, []byte("command")) | ||
cluster.nodes[0].SubmitCommand(makeCommandID(), groupID, []byte("command")) | ||
|
||
// The command will be committed on each node. | ||
for i, events := range cluster.events { | ||
|
@@ -156,7 +163,7 @@ func TestSlowStorage(t *testing.T) { | |
cluster.storages[2].Block() | ||
|
||
// Submit a command to the leader | ||
cluster.nodes[0].SubmitCommand(groupID, []byte("command")) | ||
cluster.nodes[0].SubmitCommand(makeCommandID(), groupID, []byte("command")) | ||
|
||
// Even with the third node blocked, the other nodes can make progress. | ||
for i := 0; i < 2; i++ { | ||
|
@@ -196,7 +203,8 @@ func TestMembershipChange(t *testing.T) { | |
|
||
// Add each of the other three nodes to the cluster. | ||
for i := 1; i < 4; i++ { | ||
err := cluster.nodes[0].ChangeGroupMembership(groupID, raftpb.ConfChangeAddNode, | ||
err := cluster.nodes[0].ChangeGroupMembership(makeCommandID(), groupID, | ||
raftpb.ConfChangeAddNode, | ||
cluster.nodes[i].nodeID) | ||
if err != nil { | ||
t.Fatal(err) | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -200,7 +200,7 @@ message InternalRaftCommand { | |
// CmdID is used to identify a command when it has been | ||
// committed. If the client specified a CmdID in its RequestHeader, | ||
// that is used; otherwise the server generates an ID. | ||
optional ClientCmdID cmd_id = 1 [(gogoproto.nullable) = false, (gogoproto.customname) = "CmdID"]; | ||
//optional ClientCmdID cmd_id = 1 [(gogoproto.nullable) = false, (gogoproto.customname) = "CmdID"]; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What's the plan here? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I've lost track of where cockroach will consume membership changes. Is that just not added yet? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I meant to remove the cmd_id field completely. The events for membership changes haven't been added yet. |
||
optional int64 raft_id = 2 [(gogoproto.nullable) = false, (gogoproto.customname) = "RaftID"]; | ||
optional InternalRaftCommandUnion cmd = 3 [(gogoproto.nullable) = false]; | ||
} | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -18,6 +18,8 @@ | |
package storage | ||
|
||
import ( | ||
"bytes" | ||
"encoding/binary" | ||
"fmt" | ||
"sync" | ||
|
||
|
@@ -29,15 +31,19 @@ import ( | |
"github.com/cockroachdb/cockroach/util/log" | ||
) | ||
|
||
type cmdIDKey struct { | ||
walltime, random int64 | ||
} | ||
type cmdIDKey string | ||
|
||
func makeCmdIDKey(cmdID proto.ClientCmdID) cmdIDKey { | ||
return cmdIDKey{ | ||
walltime: cmdID.WallTime, | ||
random: cmdID.Random, | ||
buf := bytes.NewBuffer(make([]byte, 0, 16)) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ah, here you could use the simpler encoding.EncodeUint64 method we have in util/encoding. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ah, cool, I missed that on the first read and just saw the varint versions. |
||
err := binary.Write(buf, binary.BigEndian, cmdID.WallTime) | ||
if err != nil { | ||
panic(err) | ||
} | ||
err = binary.Write(buf, binary.BigEndian, cmdID.Random) | ||
if err != nil { | ||
panic(err) | ||
} | ||
return cmdIDKey(buf.String()) | ||
} | ||
|
||
// A ResponseCache provides idempotence for request retries. Each | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should be using github.com/cockroachdb/cockroach/util/log
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done