Skip to content

Commit

Permalink
Merge #29618
Browse files Browse the repository at this point in the history
29618: storage: don't keep track of empty remote proposals r=nvanbenschoten a=nvanbenschoten

I observed a very slow memory leak on a long-running cluster
that pointed to a number of replicas' `remoteProposals` maps.
After reproducing locally I found that some forwarded proposals
were not being cleaned up. The reason for this was that we don't
treat empty proposals (those generated to wake up a quiesced leader)
the same as normal proposals during application time. This meant
that we were never clearing their command IDs from the remote
proposals map. This commit fixes that issue.

It will need to be backported to 2.0 and 2.1.

Release note: None

Co-authored-by: Nathan VanBenschoten <[email protected]>
  • Loading branch information
craig[bot] and nvanbenschoten committed Sep 6, 2018
2 parents acc8f83 + b846568 commit 1907546
Show file tree
Hide file tree
Showing 2 changed files with 186 additions and 32 deletions.
86 changes: 54 additions & 32 deletions pkg/storage/replica.go
Original file line number Diff line number Diff line change
Expand Up @@ -3904,39 +3904,14 @@ func (r *Replica) stepRaftGroup(req *RaftMessageRequest) error {
// we expect the originator to campaign instead.
r.unquiesceWithOptionsLocked(false /* campaignOnWake */)
r.refreshLastUpdateTimeForReplicaLocked(req.FromReplica.ReplicaID)
if req.Message.Type == raftpb.MsgProp {
// A proposal was forwarded to this replica.
if r.mu.replicaID == r.mu.leaderID {
// This replica is the leader. Record that the proposal
// was seen and drop the proposal if it was already seen.
// This prevents duplicate forwarded proposals from each
// being appended to a leader's raft log.
allSeen := true
for _, e := range req.Message.Entries {
switch e.Type {
case raftpb.EntryNormal:
cmdID, _ := DecodeRaftCommand(e.Data)
if r.mu.remoteProposals == nil {
r.mu.remoteProposals = map[storagebase.CmdIDKey]struct{}{}
}
if _, ok := r.mu.remoteProposals[cmdID]; !ok {
r.mu.remoteProposals[cmdID] = struct{}{}
allSeen = false
}
case raftpb.EntryConfChange:
// We could peek into the EntryConfChange to find the
// command ID, but we don't expect follower-initiated
// conf changes.
allSeen = false
default:
log.Fatalf(context.TODO(), "unexpected Raft entry: %v", e)
}
}
if allSeen {
return false /* unquiesceAndWakeLeader */, nil
}
}

// Check if the message is a proposal that should be dropped.
if r.shouldDropForwardedProposalLocked(req) {
// If we could signal to the sender that its proposal was
// accepted or dropped then we wouldn't need to track anything.
return false /* unquiesceAndWakeLeader */, nil
}

err := raftGroup.Step(req.Message)
if err == raft.ErrProposalDropped {
// A proposal was forwarded to this replica but we couldn't propose it.
Expand All @@ -3950,6 +3925,53 @@ func (r *Replica) stepRaftGroup(req *RaftMessageRequest) error {
})
}

// shouldDropForwardedProposalLocked reports whether the Raft message in req
// is a forwarded proposal made up entirely of commands this replica has
// already recorded in r.mu.remoteProposals, in which case the caller can
// discard it instead of appending duplicates to the leader's Raft log.
//
// The result is false when the message is not a MsgProp or when this
// replica's ID differs from the known leader ID; in both cases
// r.mu.remoteProposals is left untouched. Otherwise every entry is visited:
// previously unseen, non-empty normal commands are added to
// r.mu.remoteProposals as a side effect, and any such command, any empty
// command, or any conf change forces a false result. A MsgProp carrying no
// entries at all yields true.
//
// NOTE(review): the "Locked" suffix and the accompanying unit test suggest
// that r.mu must be held by the caller — confirm.
func (r *Replica) shouldDropForwardedProposalLocked(req *RaftMessageRequest) bool {
	// Only proposals arriving at the leader are candidates for dropping;
	// anything else (including proposals reaching a non-leader, which must
	// keep being forwarded) passes through.
	if req.Message.Type != raftpb.MsgProp || r.mu.replicaID != r.mu.leaderID {
		return false
	}

	// Assume everything is a duplicate until an entry proves otherwise.
	allSeen := true
	for _, entry := range req.Message.Entries {
		switch entry.Type {
		case raftpb.EntryNormal:
			id, payload := DecodeRaftCommand(entry.Data)
			if len(payload) == 0 {
				// Empty commands exist only to unquiesce a range and wake
				// the leader. They are deliberately not tracked, since a
				// tracked empty command would never be cleaned up.
				allSeen = false
				continue
			}
			// Lazily allocate the tracking set on first use.
			if r.mu.remoteProposals == nil {
				r.mu.remoteProposals = make(map[storagebase.CmdIDKey]struct{})
			}
			if _, seen := r.mu.remoteProposals[id]; seen {
				// Duplicate of an earlier forwarded proposal.
				continue
			}
			// First sighting: remember it so later duplicates are caught.
			r.mu.remoteProposals[id] = struct{}{}
			allSeen = false
		case raftpb.EntryConfChange:
			// The command ID could be extracted from the conf change, but
			// follower-initiated conf changes are not expected, so these
			// are simply never dropped.
			allSeen = false
		default:
			log.Fatalf(context.TODO(), "unexpected Raft entry: %v", entry)
		}
	}
	return allSeen
}

// handleRaftReadyStats carries a single integer counter, processed.
// NOTE(review): presumably reports work done while handling a Raft Ready —
// confirm against the code that populates and consumes it.
type handleRaftReadyStats struct {
// processed is a count; what exactly is being counted is not visible here.
processed int
}
Expand Down
132 changes: 132 additions & 0 deletions pkg/storage/replica_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8377,6 +8377,138 @@ func TestReplicaRefreshPendingCommandsTicks(t *testing.T) {
}
}

// TestReplicaShouldDropForwardedProposal exercises
// shouldDropForwardedProposalLocked with a table of Raft messages. For each
// case it checks both the drop decision and how many command IDs remain in
// the replica's remoteProposals map afterwards.
//
// Leader cases start from a map holding only cmdSeen; the non-leader case
// starts from a nil map with the leader ID pointed at a different replica.
func TestReplicaShouldDropForwardedProposal(t *testing.T) {
	defer leaktest.AfterTest(t)()

	stopper := stop.NewStopper()
	defer stopper.Stop(context.TODO())
	var tc testContext
	tc.Start(t, stopper)

	// cmdSeen is pre-registered before every leader case; cmdNotSeen never is.
	cmdSeen, cmdNotSeen := makeIDKey(), makeIDKey()
	data, noData := []byte("data"), []byte("")

	// normal builds an EntryNormal entry wrapping the encoded command.
	normal := func(id storagebase.CmdIDKey, payload []byte) raftpb.Entry {
		return raftpb.Entry{Type: raftpb.EntryNormal, Data: encodeRaftCommandV1(id, payload)}
	}
	// proposal builds a MsgProp message carrying the given entries.
	proposal := func(entries ...raftpb.Entry) raftpb.Message {
		return raftpb.Message{Type: raftpb.MsgProp, Entries: entries}
	}

	cases := []struct {
		name                string
		leader              bool
		msg                 raftpb.Message
		expDrop             bool
		expRemotePropsAfter int
	}{
		{
			// An unseen command is tracked and let through.
			name:                "new proposal",
			leader:              true,
			msg:                 proposal(normal(cmdNotSeen, data)),
			expDrop:             false,
			expRemotePropsAfter: 2,
		},
		{
			// A command already in the map is dropped.
			name:                "duplicate proposal",
			leader:              true,
			msg:                 proposal(normal(cmdSeen, data)),
			expDrop:             true,
			expRemotePropsAfter: 1,
		},
		{
			// One unseen entry is enough to keep the whole message.
			name:                "partially new proposal",
			leader:              true,
			msg:                 proposal(normal(cmdNotSeen, data), normal(cmdSeen, data)),
			expDrop:             false,
			expRemotePropsAfter: 2,
		},
		{
			// Empty commands pass through without being tracked.
			name:                "empty proposal",
			leader:              true,
			msg:                 proposal(normal(cmdNotSeen, noData)),
			expDrop:             false,
			expRemotePropsAfter: 1,
		},
		{
			// Conf changes pass through without being tracked.
			name:   "conf change",
			leader: true,
			msg: proposal(raftpb.Entry{
				Type: raftpb.EntryConfChange,
				Data: encodeRaftCommandV1(cmdNotSeen, data),
			}),
			expDrop:             false,
			expRemotePropsAfter: 1,
		},
		{
			// Messages that are not proposals are ignored entirely.
			name:                "non proposal",
			leader:              true,
			msg:                 raftpb.Message{Type: raftpb.MsgApp},
			expDrop:             false,
			expRemotePropsAfter: 1,
		},
		{
			// A non-leader neither drops nor tracks anything.
			name:                "not leader",
			leader:              false,
			msg:                 proposal(normal(cmdNotSeen, data)),
			expDrop:             false,
			expRemotePropsAfter: 0,
		},
	}

	for _, tt := range cases {
		t.Run(tt.name, func(t *testing.T) {
			tc.repl.mu.Lock()
			defer tc.repl.mu.Unlock()

			if !tt.leader {
				// Start with no tracked proposals and make some other
				// replica the leader; restore leadership when the subtest
				// finishes.
				tc.repl.mu.remoteProposals = nil
				tc.repl.mu.leaderID = tc.repl.mu.replicaID + 1
				defer func() { tc.repl.mu.leaderID = tc.repl.mu.replicaID }()
			} else {
				// Start every leader case from a map holding only cmdSeen.
				tc.repl.mu.remoteProposals = map[storagebase.CmdIDKey]struct{}{
					cmdSeen: {},
				}
			}

			got := tc.repl.shouldDropForwardedProposalLocked(&RaftMessageRequest{Message: tt.msg})
			if got != tt.expDrop {
				t.Errorf("expected drop=%t, found %t", tt.expDrop, got)
			}

			tracked := len(tc.repl.mu.remoteProposals)
			if tracked != tt.expRemotePropsAfter {
				t.Errorf("expected %d tracked remote proposals, found %d", tt.expRemotePropsAfter, tracked)
			}
		})
	}
}

// checkValue asserts that the value for a key is the expected one.
// The function will attempt to resolve the intent present on the key, if any.
func checkValue(ctx context.Context, tc *testContext, key []byte, expectedVal []byte) error {
Expand Down

0 comments on commit 1907546

Please sign in to comment.