Skip to content

Commit

Permalink
Merge pull request #29655 from nvanbenschoten/backport2.1-29618
Browse files Browse the repository at this point in the history
release-2.1: storage: don't keep track of empty remote proposals
  • Loading branch information
nvanbenschoten authored Sep 6, 2018
2 parents 56a68ea + 13b2645 commit 98acdd8
Show file tree
Hide file tree
Showing 2 changed files with 186 additions and 32 deletions.
86 changes: 54 additions & 32 deletions pkg/storage/replica.go
Original file line number Diff line number Diff line change
Expand Up @@ -3904,39 +3904,14 @@ func (r *Replica) stepRaftGroup(req *RaftMessageRequest) error {
// we expect the originator to campaign instead.
r.unquiesceWithOptionsLocked(false /* campaignOnWake */)
r.refreshLastUpdateTimeForReplicaLocked(req.FromReplica.ReplicaID)
if req.Message.Type == raftpb.MsgProp {
// A proposal was forwarded to this replica.
if r.mu.replicaID == r.mu.leaderID {
// This replica is the leader. Record that the proposal
// was seen and drop the proposal if it was already seen.
// This prevents duplicate forwarded proposals from each
// being appended to a leader's raft log.
allSeen := true
for _, e := range req.Message.Entries {
switch e.Type {
case raftpb.EntryNormal:
cmdID, _ := DecodeRaftCommand(e.Data)
if r.mu.remoteProposals == nil {
r.mu.remoteProposals = map[storagebase.CmdIDKey]struct{}{}
}
if _, ok := r.mu.remoteProposals[cmdID]; !ok {
r.mu.remoteProposals[cmdID] = struct{}{}
allSeen = false
}
case raftpb.EntryConfChange:
// We could peek into the EntryConfChange to find the
// command ID, but we don't expect follower-initiated
// conf changes.
allSeen = false
default:
log.Fatalf(context.TODO(), "unexpected Raft entry: %v", e)
}
}
if allSeen {
return false /* unquiesceAndWakeLeader */, nil
}
}

// Check if the message is a proposal that should be dropped.
if r.shouldDropForwardedProposalLocked(req) {
// If we could signal to the sender that it's proposal was
// accepted or dropped then we wouldn't need to track anything.
return false /* unquiesceAndWakeLeader */, nil
}

err := raftGroup.Step(req.Message)
if err == raft.ErrProposalDropped {
// A proposal was forwarded to this replica but we couldn't propose it.
Expand All @@ -3950,6 +3925,53 @@ func (r *Replica) stepRaftGroup(req *RaftMessageRequest) error {
})
}

func (r *Replica) shouldDropForwardedProposalLocked(req *RaftMessageRequest) bool {
if req.Message.Type != raftpb.MsgProp {
// Not a proposal.
return false
}

if r.mu.replicaID != r.mu.leaderID {
// Always continue to forward proposals if we're not the leader.
return false
}

// Record that the proposal was seen and drop the proposal if it was
// already seen. This prevents duplicate forwarded proposals from each
// being appended to a leader's raft log.
drop := true
for _, e := range req.Message.Entries {
switch e.Type {
case raftpb.EntryNormal:
cmdID, data := DecodeRaftCommand(e.Data)
if len(data) == 0 {
// An empty command is proposed to unquiesce a range and
// wake the leader. Don't keep track of these forwarded
// proposals because they will never be cleaned up.
drop = false
} else {
// Record that the proposal was seen so that we can catch
// duplicate proposals in the future.
if r.mu.remoteProposals == nil {
r.mu.remoteProposals = map[storagebase.CmdIDKey]struct{}{}
}
if _, ok := r.mu.remoteProposals[cmdID]; !ok {
r.mu.remoteProposals[cmdID] = struct{}{}
drop = false
}
}
case raftpb.EntryConfChange:
// We could peek into the EntryConfChange to find the
// command ID, but we don't expect follower-initiated
// conf changes.
drop = false
default:
log.Fatalf(context.TODO(), "unexpected Raft entry: %v", e)
}
}
return drop
}

type handleRaftReadyStats struct {
processed int
}
Expand Down
132 changes: 132 additions & 0 deletions pkg/storage/replica_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8356,6 +8356,138 @@ func TestReplicaRefreshPendingCommandsTicks(t *testing.T) {
}
}

func TestReplicaShouldDropForwardedProposal(t *testing.T) {
defer leaktest.AfterTest(t)()

var tc testContext
stopper := stop.NewStopper()
defer stopper.Stop(context.TODO())
tc.Start(t, stopper)

cmdSeen, cmdNotSeen := makeIDKey(), makeIDKey()
data, noData := []byte("data"), []byte("")

testCases := []struct {
name string
leader bool
msg raftpb.Message
expDrop bool
expRemotePropsAfter int
}{
{
name: "new proposal",
leader: true,
msg: raftpb.Message{
Type: raftpb.MsgProp,
Entries: []raftpb.Entry{
{Type: raftpb.EntryNormal, Data: encodeRaftCommandV1(cmdNotSeen, data)},
},
},
expDrop: false,
expRemotePropsAfter: 2,
},
{
name: "duplicate proposal",
leader: true,
msg: raftpb.Message{
Type: raftpb.MsgProp,
Entries: []raftpb.Entry{
{Type: raftpb.EntryNormal, Data: encodeRaftCommandV1(cmdSeen, data)},
},
},
expDrop: true,
expRemotePropsAfter: 1,
},
{
name: "partially new proposal",
leader: true,
msg: raftpb.Message{
Type: raftpb.MsgProp,
Entries: []raftpb.Entry{
{Type: raftpb.EntryNormal, Data: encodeRaftCommandV1(cmdNotSeen, data)},
{Type: raftpb.EntryNormal, Data: encodeRaftCommandV1(cmdSeen, data)},
},
},
expDrop: false,
expRemotePropsAfter: 2,
},
{
name: "empty proposal",
leader: true,
msg: raftpb.Message{
Type: raftpb.MsgProp,
Entries: []raftpb.Entry{
{Type: raftpb.EntryNormal, Data: encodeRaftCommandV1(cmdNotSeen, noData)},
},
},
expDrop: false,
expRemotePropsAfter: 1,
},
{
name: "conf change",
leader: true,
msg: raftpb.Message{
Type: raftpb.MsgProp,
Entries: []raftpb.Entry{
{Type: raftpb.EntryConfChange, Data: encodeRaftCommandV1(cmdNotSeen, data)},
},
},
expDrop: false,
expRemotePropsAfter: 1,
},
{
name: "non proposal",
leader: true,
msg: raftpb.Message{
Type: raftpb.MsgApp,
},
expDrop: false,
expRemotePropsAfter: 1,
},
{
name: "not leader",
leader: false,
msg: raftpb.Message{
Type: raftpb.MsgProp,
Entries: []raftpb.Entry{
{Type: raftpb.EntryNormal, Data: encodeRaftCommandV1(cmdNotSeen, data)},
},
},
expDrop: false,
expRemotePropsAfter: 0,
},
}
for _, c := range testCases {
t.Run(c.name, func(t *testing.T) {
tc.repl.mu.Lock()
defer tc.repl.mu.Unlock()

if c.leader {
// Reset the remoteProposals map to only contain cmdSeen.
tc.repl.mu.remoteProposals = map[storagebase.CmdIDKey]struct{}{
cmdSeen: {},
}
} else {
// Clear the remoteProposals map and set the leader ID to
// someone else.
tc.repl.mu.remoteProposals = nil
tc.repl.mu.leaderID = tc.repl.mu.replicaID + 1
defer func() { tc.repl.mu.leaderID = tc.repl.mu.replicaID }()
}

req := &RaftMessageRequest{Message: c.msg}
drop := tc.repl.shouldDropForwardedProposalLocked(req)

if c.expDrop != drop {
t.Errorf("expected drop=%t, found %t", c.expDrop, drop)
}
if l := len(tc.repl.mu.remoteProposals); c.expRemotePropsAfter != l {
t.Errorf("expected %d tracked remote proposals, found %d", c.expRemotePropsAfter, l)
}
})
}
}

// checkValue asserts that the value for a key is the expected one.
// The function will attempt to resolve the intent present on the key, if any.
func checkValue(ctx context.Context, tc *testContext, key []byte, expectedVal []byte) error {
Expand Down

0 comments on commit 98acdd8

Please sign in to comment.