diff --git a/contrib/raftexample/raft.go b/contrib/raftexample/raft.go index 9a270eac7190..e0077ff8bb65 100644 --- a/contrib/raftexample/raft.go +++ b/contrib/raftexample/raft.go @@ -274,12 +274,13 @@ func (rc *raftNode) startRaft() { rpeers[i] = raft.Peer{ID: uint64(i + 1)} } c := &raft.Config{ - ID: uint64(rc.id), - ElectionTick: 10, - HeartbeatTick: 1, - Storage: rc.raftStorage, - MaxSizePerMsg: 1024 * 1024, - MaxInflightMsgs: 256, + ID: uint64(rc.id), + ElectionTick: 10, + HeartbeatTick: 1, + Storage: rc.raftStorage, + MaxSizePerMsg: 1024 * 1024, + MaxInflightMsgs: 256, + MaxUncommittedEntries: 1024, } if oldwal { diff --git a/raft/node_test.go b/raft/node_test.go index b067aaad4d12..4e82f536c30d 100644 --- a/raft/node_test.go +++ b/raft/node_test.go @@ -997,3 +997,73 @@ func TestNodeCommitPaginationAfterRestart(t *testing.T) { ) } } + +// TestBoundedLogGrowthWithPartition tests a scenario where a leader is +// partitioned from a quorum of nodes. It verifies that the leader's log is +// protected from unbounded growth even as new entries continue to be proposed. +// This protection is provided by the MaxUncommittedEntries configuration. +func TestBoundedLogGrowthWithPartition(t *testing.T) { + peers := []uint64{1, 2, 3, 4, 5} + cfg1 := newTestConfig(1, peers, 10, 1, NewMemoryStorage()) + cfg1.MaxUncommittedEntries = 16 + r1 := newRaft(cfg1) + r2 := newTestRaft(2, peers, 10, 1, NewMemoryStorage()) + r3 := newTestRaft(3, peers, 10, 1, NewMemoryStorage()) + r4 := newTestRaft(4, peers, 10, 1, NewMemoryStorage()) + r5 := newTestRaft(5, peers, 10, 1, NewMemoryStorage()) + nt := newNetwork(r1, r2, r3, r4, r5) + + // Elect r1 as leader and commit to the tail of the log. + nt.send(raftpb.Message{From: 1, To: 1, Type: raftpb.MsgHup}) + + // Partition the network while we make our proposals. These proposals should + // not cause the leader's log to grow indefinitely. + nt.isolate(1) + testEntry := []raftpb.Entry{{Data: []byte("testdata")}} + testEntryMsg := raftpb.Message{From: 1, To: 1, Type: raftpb.MsgProp, Entries: testEntry} + for i := 0; i < 1024; i++ { + nt.send(testEntryMsg) + } + + // Check the size of leader's uncommitted log tail. It should not exceed the + // MaxUncommittedEntries limit. + checkUncommitted := func(exp int) { + t.Helper() + l := int(r1.raftLog.lastIndex() - r1.raftLog.committed) + if exp != l { + t.Fatalf("expected %d uncommitted entries, found %d", exp, l) + } + } + checkUncommitted(cfg1.MaxUncommittedEntries) + + // Recover from the partition and tick the clock to wake everything back up + // and send the messages. The uncommitted tail of the Raft log should + // disappear as entries are committed. + nt.recover() + nt.send(raftpb.Message{From: 1, To: 1, Type: raftpb.MsgBeat}) + checkUncommitted(0) + + // Create a second partition. This time, the leader is partitioned with a + // single follower. They still can't establish a quorum. + nt.isolate(1) + nt.isolate(2) + nt.drop(1, 2, 0.0) + nt.drop(2, 1, 0.0) + + // The follower now forwards proposals to the leader. Same deal. + testEntryMsg = raftpb.Message{From: 2, To: 2, Type: raftpb.MsgProp, Entries: testEntry} + for i := 0; i < 1024; i++ { + nt.send(testEntryMsg) + } + + // Check the size of leader's uncommitted log tail. It should not exceed the + // MaxUncommittedEntries limit. + checkUncommitted(cfg1.MaxUncommittedEntries) + + // Recover from the partition and tick the clock to wake everything back up + // and send the messages. The uncommitted tail of the Raft log should + // disappear as entries are committed. + nt.recover() + nt.send(raftpb.Message{From: 1, To: 1, Type: raftpb.MsgBeat}) + checkUncommitted(0) +} diff --git a/raft/raft.go b/raft/raft.go index 81bad3bec73b..6adf4232aeae 100644 --- a/raft/raft.go +++ b/raft/raft.go @@ -160,6 +160,10 @@ type Config struct { // overflowing that sending buffer. TODO (xiangli): feedback to application to // limit the proposal rate? MaxInflightMsgs int + // MaxUncommittedEntries limits the max number of uncommitted entries that + // may be appended to a leader's log. Once this limit is exceeded, proposals + // will begin to return ErrProposalDropped errors. Note: 0 for no limit. + MaxUncommittedEntries int // CheckQuorum specifies if the leader should check quorum activity. Leader // steps down when quorum is not active for an electionTimeout. @@ -219,6 +223,10 @@ func (c *Config) validate() error { return errors.New("max inflight messages must be greater than 0") } + if c.MaxUncommittedEntries < 0 { + return errors.New("max uncommitted entries cannot be less than 0") + } + if c.Logger == nil { c.Logger = raftLogger } @@ -241,11 +249,12 @@ type raft struct { // the log raftLog *raftLog - maxInflight int - maxMsgSize uint64 - prs map[uint64]*Progress - learnerPrs map[uint64]*Progress - matchBuf uint64Slice + maxMsgSize uint64 + maxInflight int + maxUncommitted int + prs map[uint64]*Progress + learnerPrs map[uint64]*Progress + matchBuf uint64Slice state StateType @@ -326,6 +335,7 @@ func newRaft(c *Config) *raft { raftLog: raftlog, maxMsgSize: c.MaxSizePerMsg, maxInflight: c.MaxInflightMsgs, + maxUncommitted: c.MaxUncommittedEntries, prs: make(map[uint64]*Progress), learnerPrs: make(map[uint64]*Progress), electionTimeout: c.ElectionTick, @@ -954,6 +964,17 @@ func stepLeader(r *raft, m pb.Message) error { r.logger.Debugf("%x [term %d] transfer leadership to %x is in progress; dropping proposal", r.id, r.Term, r.leadTransferee) return ErrProposalDropped } + if r.maxUncommitted > 0 { + // We're limiting the size of the uncommitted tail of the Raft log. + // If the uncommitted tail is equal to or above the limit, begin + // dropping proposals. If we're below the limit but the proposal + // would push us above it because it has multiple entries, we still + // allow it. This avoids cases where large proposals are starved by + // small proposals. + if int(r.raftLog.lastIndex()-r.raftLog.committed) >= r.maxUncommitted { + return ErrProposalDropped + } + } for i, e := range m.Entries { if e.Type == pb.EntryConfChange { diff --git a/raft/raft_test.go b/raft/raft_test.go index 8619692f302d..11ca6738589e 100644 --- a/raft/raft_test.go +++ b/raft/raft_test.go @@ -362,6 +362,64 @@ func TestProgressFlowControl(t *testing.T) { } } +func TestUncommittedEntryLimit(t *testing.T) { + cfg := newTestConfig(1, []uint64{1, 2, 3}, 5, 1, NewMemoryStorage()) + cfg.MaxUncommittedEntries = 5 + r := newRaft(cfg) + r.becomeCandidate() + r.becomeLeader() + + // Set the two followers to the replicate state. Commit to tail of log. + const followers = 2 + r.prs[2].becomeReplicate() + r.prs[3].becomeReplicate() + r.raftLog.commitTo(r.raftLog.lastIndex()) + + // Send proposals to r1. The first 5 entries should be appended to the log. + blob := []byte(strings.Repeat("a", 10)) + propMsg := pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: blob}}} + for i := 0; i < cfg.MaxUncommittedEntries; i++ { + if err := r.Step(propMsg); err != nil { + t.Fatalf("proposal resulted in error: %v", err) + } + } + + // Send one more proposal to r1. It should be rejected. + if err := r.Step(propMsg); err != ErrProposalDropped { + t.Fatalf("proposal not dropped: %v", err) + } + + // Read messages and commit to last index. + ms := r.readMessages() + if e := cfg.MaxUncommittedEntries * followers; len(ms) != e { + t.Fatalf("expected %d messages, got %d", e, len(ms)) + } + r.raftLog.commitTo(r.raftLog.lastIndex()) + + // Send a single large proposal to r1. Should be accepted even though it + // pushes us above the limit because we were beneath it before the proposal. + propEnts := make([]pb.Entry, 2*cfg.MaxUncommittedEntries) + for i := range propEnts { + propEnts[i] = pb.Entry{Data: blob} + } + propMsgLarge := pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: propEnts} + if err := r.Step(propMsgLarge); err != nil { + t.Fatalf("proposal resulted in error: %v", err) + } + + // Send one more proposal to r1. It should be rejected, again. + if err := r.Step(propMsg); err != ErrProposalDropped { + t.Fatalf("proposal not dropped: %v", err) + } + + // Read messages and commit to last index. + ms = r.readMessages() + if e := 1 * followers; len(ms) != e { + t.Fatalf("expected %d messages, got %d", e, len(ms)) + } + r.raftLog.commitTo(r.raftLog.lastIndex()) +} + func TestLeaderElection(t *testing.T) { testLeaderElection(t, false) } @@ -4138,8 +4196,8 @@ func (nw *network) drop(from, to uint64, perc float64) { } func (nw *network) cut(one, other uint64) { - nw.drop(one, other, 2.0) // always drop - nw.drop(other, one, 2.0) // always drop + nw.drop(one, other, 1.0) // always drop + nw.drop(other, one, 1.0) // always drop } func (nw *network) isolate(id uint64) { diff --git a/raft/rafttest/node.go b/raft/rafttest/node.go index 57ff9b262bc4..206df5ef910f 100644 --- a/raft/rafttest/node.go +++ b/raft/rafttest/node.go @@ -41,12 +41,13 @@ type node struct { func startNode(id uint64, peers []raft.Peer, iface iface) *node { st := raft.NewMemoryStorage() c := &raft.Config{ - ID: id, - ElectionTick: 10, - HeartbeatTick: 1, - Storage: st, - MaxSizePerMsg: 1024 * 1024, - MaxInflightMsgs: 256, + ID: id, + ElectionTick: 10, + HeartbeatTick: 1, + Storage: st, + MaxSizePerMsg: 1024 * 1024, + MaxInflightMsgs: 256, + MaxUncommittedEntries: 1024, } rn := raft.StartNode(c, peers) n := &node{ @@ -125,12 +126,13 @@ func (n *node) restart() { // wait for the shutdown <-n.stopc c := &raft.Config{ - ID: n.id, - ElectionTick: 10, - HeartbeatTick: 1, - Storage: n.storage, - MaxSizePerMsg: 1024 * 1024, - MaxInflightMsgs: 256, + ID: n.id, + ElectionTick: 10, + HeartbeatTick: 1, + Storage: n.storage, + MaxSizePerMsg: 1024 * 1024, + MaxInflightMsgs: 256, + MaxUncommittedEntries: 1024, } n.Node = raft.RestartNode(c) n.start()