Skip to content

Commit

Permalink
Merge pull request cockroachdb#31023 from nvanbenschoten/backport2.0-…
Browse files Browse the repository at this point in the history
…30990

release-2.0: storage: only track forwarded proposals that were successfully appended
  • Loading branch information
nvanbenschoten authored Oct 5, 2018
2 parents dfa0c88 + c1aee03 commit 7cad30f
Show file tree
Hide file tree
Showing 2 changed files with 65 additions and 32 deletions.
62 changes: 41 additions & 21 deletions pkg/storage/replica.go
Original file line number Diff line number Diff line change
Expand Up @@ -3353,12 +3353,17 @@ func (r *Replica) stepRaftGroup(req *RaftMessageRequest) error {

// Check if the message is a proposal that should be dropped.
if r.shouldDropForwardedProposalLocked(req) {
// If we could signal to the sender that it's proposal was
// accepted or dropped then we wouldn't need to track anything.
// If we could signal to the sender that its proposal was accepted
// or dropped then we wouldn't need to track anything.
return false /* unquiesceAndWakeLeader */, nil
}

err := raftGroup.Step(req.Message)
if err == nil {
// If we stepped successfully and the request is a proposal, consider
// tracking it so that we can ignore identical proposals in the future.
r.maybeTrackForwardedProposalLocked(raftGroup, req)
}
if err == raft.ErrProposalDropped {
// A proposal was forwarded to this replica but we couldn't propose it.
// Swallow the error since we don't have an effective way of signaling
Expand All @@ -3377,15 +3382,40 @@ func (r *Replica) shouldDropForwardedProposalLocked(req *RaftMessageRequest) boo
return false
}

if r.mu.replicaID != r.mu.leaderID {
// Always continue to forward proposals if we're not the leader.
return false
for _, e := range req.Message.Entries {
switch e.Type {
case raftpb.EntryNormal:
cmdID, _ := DecodeRaftCommand(e.Data)
if _, ok := r.mu.remoteProposals[cmdID]; !ok {
// Untracked remote proposal. Don't drop.
return false
}
case raftpb.EntryConfChange:
// Never drop EntryConfChange proposals.
return false
default:
log.Fatalf(context.TODO(), "unexpected Raft entry: %v", e)
}
}
// All entries tracked.
return true
}

// Record that the proposal was seen and drop the proposal if it was
// already seen. This prevents duplicate forwarded proposals from each
// being appended to a leader's raft log.
drop := true
func (r *Replica) maybeTrackForwardedProposalLocked(rg *raft.RawNode, req *RaftMessageRequest) {
if req.Message.Type != raftpb.MsgProp {
// Not a proposal.
return
}

if rg.Status().RaftState != raft.StateLeader {
// We're not the leader. We can't be sure that the proposal made it into
// the Raft log, so don't track it.
return
}

// Record that each of the proposal's entries was seen and appended. This
// allows us to catch duplicate forwarded proposals in the future and
// prevent them from being repeatedly appended to a leader's raft log.
for _, e := range req.Message.Entries {
switch e.Type {
case raftpb.EntryNormal:
Expand All @@ -3394,28 +3424,18 @@ func (r *Replica) shouldDropForwardedProposalLocked(req *RaftMessageRequest) boo
// An empty command is proposed to unquiesce a range and
// wake the leader. Don't keep track of these forwarded
// proposals because they will never be cleaned up.
drop = false
} else {
// Record that the proposal was seen so that we can catch
// duplicate proposals in the future.
if r.mu.remoteProposals == nil {
r.mu.remoteProposals = map[storagebase.CmdIDKey]struct{}{}
}
if _, ok := r.mu.remoteProposals[cmdID]; !ok {
r.mu.remoteProposals[cmdID] = struct{}{}
drop = false
}
r.mu.remoteProposals[cmdID] = struct{}{}
}
case raftpb.EntryConfChange:
// We could peek into the EntryConfChange to find the
// command ID, but we don't expect follower-initiated
// conf changes.
drop = false
// Don't track EntryConfChanges.
default:
log.Fatalf(context.TODO(), "unexpected Raft entry: %v", e)
}
}
return drop
}

type handleRaftReadyStats struct {
Expand Down
35 changes: 24 additions & 11 deletions pkg/storage/replica_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8052,11 +8052,6 @@ func TestReplicaRefreshPendingCommandsTicks(t *testing.T) {
func TestReplicaShouldDropForwardedProposal(t *testing.T) {
defer leaktest.AfterTest(t)()

var tc testContext
stopper := stop.NewStopper()
defer stopper.Stop(context.TODO())
tc.Start(t, stopper)

cmdSeen, cmdNotSeen := makeIDKey(), makeIDKey()
data, noData := []byte("data"), []byte("")

Expand Down Expand Up @@ -8152,28 +8147,46 @@ func TestReplicaShouldDropForwardedProposal(t *testing.T) {
}
for _, c := range testCases {
t.Run(c.name, func(t *testing.T) {
var tc testContext
stopper := stop.NewStopper()
defer stopper.Stop(context.TODO())
tc.Start(t, stopper)
tc.repl.mu.Lock()
defer tc.repl.mu.Unlock()

rg := tc.repl.mu.internalRaftGroup
if c.leader {
// Reset the remoteProposals map to only contain cmdSeen.
// Set the remoteProposals map to only contain cmdSeen.
tc.repl.mu.remoteProposals = map[storagebase.CmdIDKey]struct{}{
cmdSeen: {},
}
// Make sure the replica is the leader.
if s := rg.Status(); s.RaftState != raft.StateLeader {
t.Errorf("Replica not leader: %v", s)
}
} else {
// Clear the remoteProposals map and set the leader ID to
// someone else.
// Clear the remoteProposals map.
tc.repl.mu.remoteProposals = nil
tc.repl.mu.leaderID = tc.repl.mu.replicaID + 1
defer func() { tc.repl.mu.leaderID = tc.repl.mu.replicaID }()
// Force the replica to step down as the leader by sending it a
// heartbeat at a high term.
if err := rg.Step(raftpb.Message{
Type: raftpb.MsgHeartbeat,
Term: 999,
}); err != nil {
t.Error(err)
}
if s := rg.Status(); s.RaftState != raft.StateFollower {
t.Errorf("Replica not follower: %v", s)
}
}

req := &RaftMessageRequest{Message: c.msg}
drop := tc.repl.shouldDropForwardedProposalLocked(req)

if c.expDrop != drop {
t.Errorf("expected drop=%t, found %t", c.expDrop, drop)
}

tc.repl.maybeTrackForwardedProposalLocked(rg, req)
if l := len(tc.repl.mu.remoteProposals); c.expRemotePropsAfter != l {
t.Errorf("expected %d tracked remote proposals, found %d", c.expRemotePropsAfter, l)
}
Expand Down

0 comments on commit 7cad30f

Please sign in to comment.