Skip to content

Commit

Permalink
storage: replace remote proposal tracking with uncommitted log size p…
Browse files Browse the repository at this point in the history
…rotection

This change reverts most of the non-testing code from 03b116f and f2f3fd2
and replaces it with use of the MaxUncommittedEntriesSize config. This
configuration was added in etcd-io/etcd#10167 and provides protection against
unbounded Raft log growth when a Raft group stops being able to commit
entries. It makes proposals into Raft safer because proposers don't need
to verify before the fact that the proposal isn't a duplicate that might
be blowing up the size of the Raft group.

By default, the configuration is set to double the Replica's proposal quota.
The logic here is that the quotaPool should be responsible for throttling
proposals in all cases except for unbounded Raft re-proposals because it
queues efficiently instead of dropping proposals on the floor indiscriminately.

Release note (bug fix): Fix a bug where Raft proposals could get
stuck if forwarded to a leader who could not itself append a new
entry to its log.
  • Loading branch information
nvanbenschoten committed Oct 16, 2018
1 parent e4003e1 commit 58413cb
Show file tree
Hide file tree
Showing 8 changed files with 108 additions and 362 deletions.
29 changes: 29 additions & 0 deletions pkg/base/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -446,6 +446,23 @@ type RaftConfig struct {
// performing log truncations.
RaftLogMaxSize int64

// RaftProposalQuota controls the maximum aggregate size of Raft commands
// that a leader is allowed to propose concurrently.
//
// By default, the quota is set to a fraction of the RaftLogMaxSize. In
// doing so, we ensure all replicas have sufficiently up to date logs so
// that when the log gets truncated, the followers do not need
// non-preemptive snapshots. Changing this deserves care. Too low and
// everything comes to a grinding halt, too high and we're not really
// throttling anything (we'll still generate snapshots).
RaftProposalQuota int64

// RaftMaxUncommittedEntriesSize controls how large the uncommitted tail of
// the Raft log can grow. The limit is meant to provide protection against
// unbounded Raft log growth when quorum is lost and entries stop being
// committed but continue to be proposed.
RaftMaxUncommittedEntriesSize uint64

// RaftMaxSizePerMsg controls how many Raft log entries the leader will send to
// followers in a single MsgApp.
RaftMaxSizePerMsg uint64
Expand Down Expand Up @@ -474,6 +491,18 @@ func (cfg *RaftConfig) SetDefaults() {
if cfg.RaftLogMaxSize == 0 {
cfg.RaftLogMaxSize = defaultRaftLogMaxSize
}
if cfg.RaftProposalQuota == 0 {
// By default, set this to a fraction of RaftLogMaxSize. See comment
// above for the tradeoffs of setting this higher or lower.
cfg.RaftProposalQuota = cfg.RaftLogMaxSize / 4
}
if cfg.RaftMaxUncommittedEntriesSize == 0 {
// By default, set this to twice the RaftProposalQuota. The logic here
// is that the quotaPool should be responsible for throttling proposals
// in all cases except for unbounded Raft re-proposals because it queues
// efficiently instead of dropping proposals on the floor indiscriminately.
cfg.RaftMaxUncommittedEntriesSize = uint64(2 * cfg.RaftProposalQuota)
}
if cfg.RaftMaxSizePerMsg == 0 {
cfg.RaftMaxSizePerMsg = uint64(defaultRaftMaxSizePerMsg)
}
Expand Down
7 changes: 4 additions & 3 deletions pkg/storage/client_raft_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1155,6 +1155,8 @@ func TestLogGrowthWhenRefreshingPendingCommands(t *testing.T) {
sc.RaftTickInterval = 10 * time.Millisecond
// Don't timeout raft leader. We don't want leadership moving.
sc.RaftElectionTimeoutTicks = 1000000
// Reduce the max uncommitted entry size.
sc.RaftMaxUncommittedEntriesSize = 64 << 10 // 64 KB
// Disable leader transfers during leaseholder changes so that we
// can easily create leader-not-leaseholder scenarios.
sc.TestingKnobs.DisableLeaderFollowsLeaseholder = true
Expand Down Expand Up @@ -1233,7 +1235,7 @@ func TestLogGrowthWhenRefreshingPendingCommands(t *testing.T) {
// While a majority nodes are down, write some data.
putRes := make(chan *roachpb.Error)
go func() {
putArgs := putArgs([]byte("b"), make([]byte, 8<<10 /* 8 KB */))
putArgs := putArgs([]byte("b"), make([]byte, sc.RaftMaxUncommittedEntriesSize/8))
_, err := client.SendWrapped(context.Background(), propNode, putArgs)
putRes <- err
}()
Expand All @@ -1254,11 +1256,10 @@ func TestLogGrowthWhenRefreshingPendingCommands(t *testing.T) {
}

// Check raft log size.
const logSizeLimit = 64 << 10 // 64 KB
curlogSize := leaderRepl.GetRaftLogSize()
logSize := curlogSize - initLogSize
logSizeStr := humanizeutil.IBytes(logSize)
if logSize > logSizeLimit {
if uint64(logSize) > sc.RaftMaxUncommittedEntriesSize {
t.Fatalf("raft log size grew to %s", logSizeStr)
}
t.Logf("raft log size grew to %s", logSizeStr)
Expand Down
111 changes: 3 additions & 108 deletions pkg/storage/replica.go
Original file line number Diff line number Diff line change
Expand Up @@ -387,12 +387,7 @@ type Replica struct {
// map must only be referenced while Replica.mu is held, except if the
// element is removed from the map first. The notable exception is the
// contained RaftCommand, which we treat as immutable.
localProposals map[storagebase.CmdIDKey]*ProposalData
// remoteProposals is maintained by Raft leaders and stores in-flight
// commands that were forwarded to the leader during its current term.
// The set allows leaders to detect duplicate forwarded commands and
// avoid re-proposing the same forwarded command multiple times.
remoteProposals map[storagebase.CmdIDKey]struct{}
localProposals map[storagebase.CmdIDKey]*ProposalData
internalRaftGroup *raft.RawNode
// The ID of the replica within the Raft group. May be 0 if the replica has
// been created from a preemptive snapshot (i.e. before being added to the
Expand Down Expand Up @@ -883,7 +878,6 @@ func (r *Replica) cancelPendingCommandsLocked() {
r.cleanupFailedProposalLocked(p)
p.finishApplication(pr)
}
r.mu.remoteProposals = nil
}

// cleanupFailedProposalLocked cleans up after a proposal that has failed. It
Expand Down Expand Up @@ -1118,22 +1112,12 @@ func (r *Replica) updateProposalQuotaRaftMuLocked(
log.Fatalf(ctx, "len(r.mu.commandSizes) = %d, expected 0", commandSizesLen)
}

// We set the defaultProposalQuota to be less than RaftLogMaxSize,
// in doing so we ensure all replicas have sufficiently up to date
// logs so that when the log gets truncated, the followers do not
// need non-preemptive snapshots. Changing this deserves care. Too
// low and everything comes to a grinding halt, too high and we're
// not really throttling anything (we'll still generate snapshots).
//
// TODO(nvanbenschoten): clean this up in later commits.
proposalQuota := r.store.cfg.RaftLogMaxSize / 4

// Raft may propose commands itself (specifically the empty
// commands when leadership changes), and these commands don't go
// through the code paths where we acquire quota from the pool. To
// offset this we reset the quota pool whenever leadership changes
// hands.
r.mu.proposalQuota = newQuotaPool(proposalQuota)
r.mu.proposalQuota = newQuotaPool(r.store.cfg.RaftProposalQuota)
r.mu.lastUpdateTimes = make(map[roachpb.ReplicaID]time.Time)
r.mu.commandSizes = make(map[storagebase.CmdIDKey]int)
} else if r.mu.proposalQuota != nil {
Expand Down Expand Up @@ -1913,7 +1897,6 @@ func (r *Replica) State() storagepb.RangeInfo {
ri.ReplicaState = *(protoutil.Clone(&r.mu.state)).(*storagepb.ReplicaState)
ri.LastIndex = r.mu.lastIndex
ri.NumPending = uint64(len(r.mu.localProposals))
ri.NumRemotePending = uint64(len(r.mu.remoteProposals))
ri.RaftLogSize = r.mu.raftLogSize
ri.NumDropped = uint64(r.mu.droppedMessages)
if r.mu.proposalQuota != nil {
Expand Down Expand Up @@ -4042,20 +4025,7 @@ func (r *Replica) stepRaftGroup(req *RaftMessageRequest) error {
// we expect the originator to campaign instead.
r.unquiesceWithOptionsLocked(false /* campaignOnWake */)
r.refreshLastUpdateTimeForReplicaLocked(req.FromReplica.ReplicaID)

// Check if the message is a proposal that should be dropped.
if r.shouldDropForwardedProposalLocked(req) {
// If we could signal to the sender that its proposal was accepted
// or dropped then we wouldn't need to track anything.
return false /* unquiesceAndWakeLeader */, nil
}

err := raftGroup.Step(req.Message)
if err == nil {
// If we stepped successfully and the request is a proposal, consider
// tracking it so that we can ignore identical proposals in the future.
r.maybeTrackForwardedProposalLocked(raftGroup, req)
}
if err == raft.ErrProposalDropped {
// A proposal was forwarded to this replica but we couldn't propose it.
// Swallow the error since we don't have an effective way of signaling
Expand All @@ -4068,68 +4038,6 @@ func (r *Replica) stepRaftGroup(req *RaftMessageRequest) error {
})
}

func (r *Replica) shouldDropForwardedProposalLocked(req *RaftMessageRequest) bool {
if req.Message.Type != raftpb.MsgProp {
// Not a proposal.
return false
}

for _, e := range req.Message.Entries {
switch e.Type {
case raftpb.EntryNormal:
cmdID, _ := DecodeRaftCommand(e.Data)
if _, ok := r.mu.remoteProposals[cmdID]; !ok {
// Untracked remote proposal. Don't drop.
return false
}
case raftpb.EntryConfChange:
// Never drop EntryConfChange proposals.
return false
default:
log.Fatalf(context.TODO(), "unexpected Raft entry: %v", e)
}
}
// All entries tracked.
return true
}

func (r *Replica) maybeTrackForwardedProposalLocked(rg *raft.RawNode, req *RaftMessageRequest) {
if req.Message.Type != raftpb.MsgProp {
// Not a proposal.
return
}

if rg.Status().RaftState != raft.StateLeader {
// We're not the leader. We can't be sure that the proposal made it into
// the Raft log, so don't track it.
return
}

// Record that each of the proposal's entries was seen and appended. This
// allows us to catch duplicate forwarded proposals in the future and
// prevent them from being repeatedly appended to a leader's raft log.
for _, e := range req.Message.Entries {
switch e.Type {
case raftpb.EntryNormal:
cmdID, data := DecodeRaftCommand(e.Data)
if len(data) == 0 {
// An empty command is proposed to unquiesce a range and
// wake the leader. Don't keep track of these forwarded
// proposals because they will never be cleaned up.
} else {
if r.mu.remoteProposals == nil {
r.mu.remoteProposals = map[storagebase.CmdIDKey]struct{}{}
}
r.mu.remoteProposals[cmdID] = struct{}{}
}
case raftpb.EntryConfChange:
// Don't track EntryConfChanges.
default:
log.Fatalf(context.TODO(), "unexpected Raft entry: %v", e)
}
}
}

type handleRaftReadyStats struct {
processed int
}
Expand Down Expand Up @@ -4394,7 +4302,6 @@ func (r *Replica) handleRaftReadyRaftMuLocked(
r.mu.leaderID = leaderID
// Clear the remote proposal set. Would have been nil already if not
// previously the leader.
r.mu.remoteProposals = nil
becameLeader = r.mu.leaderID == r.mu.replicaID
}
r.mu.Unlock()
Expand Down Expand Up @@ -4599,22 +4506,13 @@ func (r *Replica) tick(livenessMap IsLiveMap) (bool, error) {
if knob := r.store.TestingKnobs().RefreshReasonTicksPeriod; knob > 0 {
refreshAtDelta = knob
}
if !r.store.TestingKnobs().DisableRefreshReasonTicks &&
r.mu.replicaID != r.mu.leaderID &&
r.mu.ticks%refreshAtDelta == 0 {
if !r.store.TestingKnobs().DisableRefreshReasonTicks && r.mu.ticks%refreshAtDelta == 0 {
// RaftElectionTimeoutTicks is a reasonable approximation of how long we
// should wait before deciding that our previous proposal didn't go
// through. Note that the combination of the above condition and passing
// RaftElectionTimeoutTicks to refreshProposalsLocked means that commands
// will be refreshed when they have been pending for 1 to 2 election
// cycles.
//
// However, we don't refresh proposals if we are the leader because
// doing so would be useless. The commands tracked by a leader replica
// were either all proposed when the replica was a leader or were
// re-proposed when the replica became a leader. Either way, they are
// guaranteed to be in the leader's Raft log so re-proposing won't do
// anything.
r.refreshProposalsLocked(refreshAtDelta, reasonTicks)
}
return true, nil
Expand Down Expand Up @@ -5407,9 +5305,6 @@ func (r *Replica) processRaftCommand(
delete(r.mu.localProposals, idKey)
}

// Delete the entry for a forwarded proposal set.
delete(r.mu.remoteProposals, idKey)

leaseIndex, proposalRetry, forcedErr := r.checkForcedErrLocked(ctx, idKey, raftCmd, proposal, proposedLocally)

r.mu.Unlock()
Expand Down
Loading

0 comments on commit 58413cb

Please sign in to comment.