From f89b06dc6d9d9cc98228c2bdd6968a3f137ef26f Mon Sep 17 00:00:00 2001 From: Nathan VanBenschoten Date: Tue, 9 Oct 2018 12:40:58 -0400 Subject: [PATCH 1/3] raft: provide protection against unbounded Raft log growth The suggested pattern for Raft proposals is that they be retried periodically until they succeed. This turns out to be an issue when a leader cannot commit entries because the leader will continue to append re-proposed entries to its log without committing anything. This can result in the uncommitted tail of a leader's log growing without bound until it is able to commit entries. This change add a safeguard to protect against this case where a leader's log can grow without bound during loss of quorum scenarios. It does so by introducing a new, optional ``MaxUncommittedEntriesSize configuration. This config limits the max aggregate size of uncommitted entries that may be appended to a leader's log. Once this limit is exceeded, proposals will begin to return ErrProposalDropped errors. See cockroachdb/cockroach#27772 --- contrib/raftexample/raft.go | 13 +++--- raft/README.md | 1 + raft/node.go | 1 + raft/node_test.go | 54 ++++++++++++++++++++++++ raft/raft.go | 83 ++++++++++++++++++++++++++++++++----- raft/raft_test.go | 65 +++++++++++++++++++++++++++++ raft/rafttest/node.go | 26 ++++++------ raft/rawnode.go | 1 + raft/rawnode_test.go | 61 +++++++++++++++++++++++++++ 9 files changed, 277 insertions(+), 28 deletions(-) diff --git a/contrib/raftexample/raft.go b/contrib/raftexample/raft.go index 9a270eac719..a76fb9d7454 100644 --- a/contrib/raftexample/raft.go +++ b/contrib/raftexample/raft.go @@ -274,12 +274,13 @@ func (rc *raftNode) startRaft() { rpeers[i] = raft.Peer{ID: uint64(i + 1)} } c := &raft.Config{ - ID: uint64(rc.id), - ElectionTick: 10, - HeartbeatTick: 1, - Storage: rc.raftStorage, - MaxSizePerMsg: 1024 * 1024, - MaxInflightMsgs: 256, + ID: uint64(rc.id), + ElectionTick: 10, + HeartbeatTick: 1, + Storage: rc.raftStorage, + MaxSizePerMsg: 1024 * 1024, + MaxInflightMsgs: 256, + MaxUncommittedEntriesSize: 1 << 30, } if oldwal { diff --git a/raft/README.md b/raft/README.md index 0ddf3f48f8a..a78e5f720ba 100644 --- a/raft/README.md +++ b/raft/README.md @@ -41,6 +41,7 @@ This raft implementation also includes a few optional enhancements: - Writing to leader's disk in parallel - Internal proposal redirection from followers to leader - Automatic stepping down when the leader loses quorum +- Protection against unbounded log growth when quorum is lost ## Notable Users diff --git a/raft/node.go b/raft/node.go index 9e7c209ce66..f67628fd361 100644 --- a/raft/node.go +++ b/raft/node.go @@ -401,6 +401,7 @@ func (n *node) run(r *raft) { r.msgs = nil r.readStates = nil + r.reduceUncommittedSize(rd.CommittedEntries) advancec = n.advancec case <-advancec: if applyingToI != 0 { diff --git a/raft/node_test.go b/raft/node_test.go index b067aaad4d1..a729068bfc2 100644 --- a/raft/node_test.go +++ b/raft/node_test.go @@ -997,3 +997,57 @@ func TestNodeCommitPaginationAfterRestart(t *testing.T) { ) } } + +// TestNodeBoundedLogGrowthWithPartition tests a scenario where a leader is +// partitioned from a quorum of nodes. It verifies that the leader's log is +// protected from unbounded growth even as new entries continue to be proposed. +// This protection is provided by the MaxUncommittedEntriesSize configuration. +func TestNodeBoundedLogGrowthWithPartition(t *testing.T) { + const maxEntries = 16 + data := []byte("testdata") + testEntry := raftpb.Entry{Data: data} + maxEntrySize := uint64(maxEntries * testEntry.Size()) + + s := NewMemoryStorage() + cfg := newTestConfig(1, []uint64{1}, 10, 1, s) + cfg.MaxUncommittedEntriesSize = maxEntrySize + r := newRaft(cfg) + n := newNode() + go n.run(r) + defer n.Stop() + n.Campaign(context.TODO()) + + rd := readyWithTimeout(&n) + if len(rd.CommittedEntries) != 1 { + t.Fatalf("expected 1 (empty) entry, got %d", len(rd.CommittedEntries)) + } + s.Append(rd.Entries) + n.Advance() + + // Simulate a network partition while we make our proposals by never + // committing anything. These proposals should not cause the leader's + // log to grow indefinitely. + for i := 0; i < 1024; i++ { + n.Propose(context.TODO(), data) + } + + // Check the size of leader's uncommitted log tail. It should not exceed the + // MaxUncommittedEntriesSize limit. + checkUncommitted := func(exp uint64) { + t.Helper() + if a := r.uncommittedSize; exp != a { + t.Fatalf("expected %d uncommitted entry bytes, found %d", exp, a) + } + } + checkUncommitted(maxEntrySize) + + // Recover from the partition. The uncommitted tail of the Raft log should + // disappear as entries are committed. + rd = readyWithTimeout(&n) + if len(rd.CommittedEntries) != maxEntries { + t.Fatalf("expected %d entries, got %d", maxEntries, len(rd.CommittedEntries)) + } + s.Append(rd.Entries) + n.Advance() + checkUncommitted(0) +} diff --git a/raft/raft.go b/raft/raft.go index 81bad3bec73..5e69b862ac3 100644 --- a/raft/raft.go +++ b/raft/raft.go @@ -148,12 +148,17 @@ type Config struct { // applied entries. This is a very application dependent configuration. Applied uint64 - // MaxSizePerMsg limits the max size of each append message. Smaller value - // lowers the raft recovery cost(initial probing and message lost during normal - // operation). On the other side, it might affect the throughput during normal - // replication. Note: math.MaxUint64 for unlimited, 0 for at most one entry per - // message. + // MaxSizePerMsg limits the max byte size of each append message. Smaller + // value lowers the raft recovery cost(initial probing and message lost + // during normal operation). On the other side, it might affect the + // throughput during normal replication. Note: math.MaxUint64 for unlimited, + // 0 for at most one entry per message. MaxSizePerMsg uint64 + // MaxUncommittedEntriesSize limits the aggregate byte size of the + // uncommitted entries that may be appended to a leader's log. Once this + // limit is exceeded, proposals will begin to return ErrProposalDropped + // errors. Note: 0 for no limit. + MaxUncommittedEntriesSize uint64 // MaxInflightMsgs limits the max number of in-flight append messages during // optimistic replication phase. The application transportation layer usually // has its own sending buffer over TCP/UDP. Setting MaxInflightMsgs to avoid @@ -215,6 +220,10 @@ func (c *Config) validate() error { return errors.New("storage cannot be nil") } + if c.MaxUncommittedEntriesSize == 0 { + c.MaxUncommittedEntriesSize = noLimit + } + if c.MaxInflightMsgs <= 0 { return errors.New("max inflight messages must be greater than 0") } @@ -241,11 +250,12 @@ type raft struct { // the log raftLog *raftLog - maxInflight int - maxMsgSize uint64 - prs map[uint64]*Progress - learnerPrs map[uint64]*Progress - matchBuf uint64Slice + maxMsgSize uint64 + maxUncommittedSize uint64 + maxInflight int + prs map[uint64]*Progress + learnerPrs map[uint64]*Progress + matchBuf uint64Slice state StateType @@ -268,6 +278,10 @@ type raft struct { // be proposed if the leader's applied index is greater than this // value. pendingConfIndex uint64 + // an estimate of the size of the uncommitted tail of the Raft log. Used to + // prevent unbounded log growth. Only maintained by the leader. Reset on + // term changes. + uncommittedSize uint64 readOnly *readOnly @@ -326,6 +340,7 @@ func newRaft(c *Config) *raft { raftLog: raftlog, maxMsgSize: c.MaxSizePerMsg, maxInflight: c.MaxInflightMsgs, + maxUncommittedSize: c.MaxUncommittedEntriesSize, prs: make(map[uint64]*Progress), learnerPrs: make(map[uint64]*Progress), electionTimeout: c.ElectionTick, @@ -616,6 +631,7 @@ func (r *raft) reset(term uint64) { }) r.pendingConfIndex = 0 + r.uncommittedSize = 0 r.readOnly = newReadOnly(r.readOnly.option) } @@ -954,6 +970,10 @@ func stepLeader(r *raft, m pb.Message) error { r.logger.Debugf("%x [term %d] transfer leadership to %x is in progress; dropping proposal", r.id, r.Term, r.leadTransferee) return ErrProposalDropped } + if !r.increaseUncommittedSize(m.Entries) { + r.logger.Debugf("%x appending new entries to log would exceed uncommitted entry size limit; dropping proposal", r.id) + return ErrProposalDropped + } for i, e := range m.Entries { if e.Type == pb.EntryConfChange { @@ -1462,6 +1482,49 @@ func (r *raft) abortLeaderTransfer() { r.leadTransferee = None } +// increaseUncommittedSize computes the size of the proposed entries and +// determines whether they would push leader over its maxUncommittedSize limit. +// If the new entries would exceed the limit, the method returns false. If not, +// the increase in uncommitted entry size is recorded and the method returns +// true. +func (r *raft) increaseUncommittedSize(ents []pb.Entry) bool { + var s uint64 + for _, e := range ents { + s += uint64(e.Size()) + } + + if r.uncommittedSize > 0 && r.uncommittedSize+s > r.maxUncommittedSize { + // If the uncommitted tail of the Raft log is empty, allow any size + // proposal. Otherwise, limit the size of the uncommitted tail of the + // log and drop any proposal that would push the size over the limit. + return false + } + r.uncommittedSize += s + return true +} + +// reduceUncommittedSize accounts for the newly committed entries by decreasing +// the uncommitted entry size limit. +func (r *raft) reduceUncommittedSize(ents []pb.Entry) { + if r.uncommittedSize == 0 { + // Fast-path for followers, who do not track or enforce the limit. + return + } + + var s uint64 + for _, e := range ents { + s += uint64(e.Size()) + } + if s > r.uncommittedSize { + // uncommittedSize may underestimate the size of the uncommitted Raft + // log tail but will never overestimate it. Saturate at 0 instead of + // allowing overflow. + r.uncommittedSize = 0 + } else { + r.uncommittedSize -= s + } +} + func numOfPendingConf(ents []pb.Entry) int { n := 0 for i := range ents { diff --git a/raft/raft_test.go b/raft/raft_test.go index 8619692f302..cac4bb6c2ca 100644 --- a/raft/raft_test.go +++ b/raft/raft_test.go @@ -362,6 +362,71 @@ func TestProgressFlowControl(t *testing.T) { } } +func TestUncommittedEntryLimit(t *testing.T) { + const maxEntries = 16 + testEntry := pb.Entry{Data: []byte("testdata")} + maxEntrySize := maxEntries * testEntry.Size() + + cfg := newTestConfig(1, []uint64{1, 2, 3}, 5, 1, NewMemoryStorage()) + cfg.MaxUncommittedEntriesSize = uint64(maxEntrySize) + r := newRaft(cfg) + r.becomeCandidate() + r.becomeLeader() + + // Set the two followers to the replicate state. Commit to tail of log. + const numFollowers = 2 + r.prs[2].becomeReplicate() + r.prs[3].becomeReplicate() + r.uncommittedSize = 0 + + // Send proposals to r1. The first 5 entries should be appended to the log. + propMsg := pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{testEntry}} + propEnts := make([]pb.Entry, maxEntries) + for i := 0; i < maxEntries; i++ { + if err := r.Step(propMsg); err != nil { + t.Fatalf("proposal resulted in error: %v", err) + } + propEnts[i] = testEntry + } + + // Send one more proposal to r1. It should be rejected. + if err := r.Step(propMsg); err != ErrProposalDropped { + t.Fatalf("proposal not dropped: %v", err) + } + + // Read messages and reduce the uncommitted size as if we had committed + // these entries. + ms := r.readMessages() + if e := maxEntries * numFollowers; len(ms) != e { + t.Fatalf("expected %d messages, got %d", e, len(ms)) + } + r.reduceUncommittedSize(propEnts) + + // Send a single large proposal to r1. Should be accepted even though it + // pushes us above the limit because we were beneath it before the proposal. + propEnts = make([]pb.Entry, 2*maxEntries) + for i := range propEnts { + propEnts[i] = testEntry + } + propMsgLarge := pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: propEnts} + if err := r.Step(propMsgLarge); err != nil { + t.Fatalf("proposal resulted in error: %v", err) + } + + // Send one more proposal to r1. It should be rejected, again. + if err := r.Step(propMsg); err != ErrProposalDropped { + t.Fatalf("proposal not dropped: %v", err) + } + + // Read messages and reduce the uncommitted size as if we had committed + // these entries. + ms = r.readMessages() + if e := 1 * numFollowers; len(ms) != e { + t.Fatalf("expected %d messages, got %d", e, len(ms)) + } + r.reduceUncommittedSize(propEnts) +} + func TestLeaderElection(t *testing.T) { testLeaderElection(t, false) } diff --git a/raft/rafttest/node.go b/raft/rafttest/node.go index 57ff9b262bc..add21da0b56 100644 --- a/raft/rafttest/node.go +++ b/raft/rafttest/node.go @@ -41,12 +41,13 @@ type node struct { func startNode(id uint64, peers []raft.Peer, iface iface) *node { st := raft.NewMemoryStorage() c := &raft.Config{ - ID: id, - ElectionTick: 10, - HeartbeatTick: 1, - Storage: st, - MaxSizePerMsg: 1024 * 1024, - MaxInflightMsgs: 256, + ID: id, + ElectionTick: 10, + HeartbeatTick: 1, + Storage: st, + MaxSizePerMsg: 1024 * 1024, + MaxInflightMsgs: 256, + MaxUncommittedEntriesSize: 1 << 30, } rn := raft.StartNode(c, peers) n := &node{ @@ -125,12 +126,13 @@ func (n *node) restart() { // wait for the shutdown <-n.stopc c := &raft.Config{ - ID: n.id, - ElectionTick: 10, - HeartbeatTick: 1, - Storage: n.storage, - MaxSizePerMsg: 1024 * 1024, - MaxInflightMsgs: 256, + ID: n.id, + ElectionTick: 10, + HeartbeatTick: 1, + Storage: n.storage, + MaxSizePerMsg: 1024 * 1024, + MaxInflightMsgs: 256, + MaxUncommittedEntriesSize: 1 << 30, } n.Node = raft.RestartNode(c) n.start() diff --git a/raft/rawnode.go b/raft/rawnode.go index 5f8a116dd63..4a4ec2e9463 100644 --- a/raft/rawnode.go +++ b/raft/rawnode.go @@ -198,6 +198,7 @@ func (rn *RawNode) Step(m pb.Message) error { func (rn *RawNode) Ready() Ready { rd := rn.newReady() rn.raft.msgs = nil + rn.raft.reduceUncommittedSize(rd.CommittedEntries) return rd } diff --git a/raft/rawnode_test.go b/raft/rawnode_test.go index f941cbf7b60..3e56733aa42 100644 --- a/raft/rawnode_test.go +++ b/raft/rawnode_test.go @@ -484,3 +484,64 @@ func TestRawNodeCommitPaginationAfterRestart(t *testing.T) { }) } } + +// TestRawNodeBoundedLogGrowthWithPartition tests a scenario where a leader is +// partitioned from a quorum of nodes. It verifies that the leader's log is +// protected from unbounded growth even as new entries continue to be proposed. +// This protection is provided by the MaxUncommittedEntriesSize configuration. +func TestRawNodeBoundedLogGrowthWithPartition(t *testing.T) { + const maxEntries = 16 + data := []byte("testdata") + testEntry := raftpb.Entry{Data: data} + maxEntrySize := uint64(maxEntries * testEntry.Size()) + + s := NewMemoryStorage() + cfg := newTestConfig(1, []uint64{1}, 10, 1, s) + cfg.MaxUncommittedEntriesSize = maxEntrySize + rawNode, err := NewRawNode(cfg, []Peer{{ID: 1}}) + if err != nil { + t.Fatal(err) + } + rd := rawNode.Ready() + s.Append(rd.Entries) + rawNode.Advance(rd) + + // Become the leader. + rawNode.Campaign() + for { + rd = rawNode.Ready() + s.Append(rd.Entries) + if rd.SoftState.Lead == rawNode.raft.id { + rawNode.Advance(rd) + break + } + rawNode.Advance(rd) + } + + // Simulate a network partition while we make our proposals by never + // committing anything. These proposals should not cause the leader's + // log to grow indefinitely. + for i := 0; i < 1024; i++ { + rawNode.Propose(data) + } + + // Check the size of leader's uncommitted log tail. It should not exceed the + // MaxUncommittedEntriesSize limit. + checkUncommitted := func(exp uint64) { + t.Helper() + if a := rawNode.raft.uncommittedSize; exp != a { + t.Fatalf("expected %d uncommitted entry bytes, found %d", exp, a) + } + } + checkUncommitted(maxEntrySize) + + // Recover from the partition. The uncommitted tail of the Raft log should + // disappear as entries are committed. + rd = rawNode.Ready() + if len(rd.CommittedEntries) != maxEntries { + t.Fatalf("expected %d entries, got %d", maxEntries, len(rd.CommittedEntries)) + } + s.Append(rd.Entries) + rawNode.Advance(rd) + checkUncommitted(0) +} From 7be7ac5a5d261e7169e009aa691fcb92331ea919 Mon Sep 17 00:00:00 2001 From: Nathan VanBenschoten Date: Mon, 8 Oct 2018 19:31:54 -0400 Subject: [PATCH 2/3] raft: Fix spelling in doc.go --- raft/doc.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/raft/doc.go b/raft/doc.go index 2c10c0f5dc4..c30d88445f2 100644 --- a/raft/doc.go +++ b/raft/doc.go @@ -87,7 +87,7 @@ large). Note: Marshalling messages is not thread-safe; it is important that you make sure that no new entries are persisted while marshalling. -The easiest way to achieve this is to serialise the messages directly inside +The easiest way to achieve this is to serialize the messages directly inside your main raft loop. 3. Apply Snapshot (if any) and CommittedEntries to the state machine. @@ -153,7 +153,7 @@ If the proposal is committed, data will appear in committed entries with type raftpb.EntryNormal. There is no guarantee that a proposed command will be committed; you may have to re-propose after a timeout. -To add or remove node in a cluster, build ConfChange struct 'cc' and call: +To add or remove a node in a cluster, build ConfChange struct 'cc' and call: n.ProposeConfChange(ctx, cc) @@ -260,7 +260,7 @@ stale log entries: 'MsgPreVote' and 'MsgPreVoteResp' are used in an optional two-phase election protocol. When Config.PreVote is true, a pre-election is carried out first (using the same rules as a regular election), and no node increases its term - number unless the pre-election indicates that the campaigining node would win. + number unless the pre-election indicates that the campaigning node would win. This minimizes disruption when a partitioned node rejoins the cluster. 'MsgSnap' requests to install a snapshot message. When a node has just From 73c20cc1b7dbf155b6153502e6dc71dd948762bc Mon Sep 17 00:00:00 2001 From: Nathan VanBenschoten Date: Mon, 8 Oct 2018 19:42:06 -0400 Subject: [PATCH 3/3] raft: Fix comment on sendHeartbeat --- raft/raft.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/raft/raft.go b/raft/raft.go index 5e69b862ac3..bf0a8983c46 100644 --- a/raft/raft.go +++ b/raft/raft.go @@ -529,7 +529,7 @@ func (r *raft) maybeSendAppend(to uint64, sendIfEmpty bool) bool { return true } -// sendHeartbeat sends an empty MsgApp +// sendHeartbeat sends a heartbeat RPC to the given peer. func (r *raft) sendHeartbeat(to uint64, ctx []byte) { // Attach the commit as min(to.matched, r.committed). // When the leader sends out heartbeat message,