From 58caec5d5e3063a4d9282ad9764cb3e36d1736b4 Mon Sep 17 00:00:00 2001 From: Tobias Schottdorf Date: Fri, 19 Oct 2018 19:42:50 +0200 Subject: [PATCH] raft: fix bug in unbounded log growth prevention mechanism The previous code was using the proto-generated `Size()` method to track the size of an incoming proposal at the leader. This includes the Index and Term, which were mutated after the call to `Size()` when appending to the log. Additionally, it was not taking into account that an ignored configuration change would ignore the original proposal and append an empty entry instead. As a result, a fully committed Raft group could end up with a non- zero tracked uncommitted Raft log counter that would eventually hit the ceiling and drop all future proposals indiscriminately. It would also immediately imply that proposals exceeding the threshold alone would get refused (as the "first uncommitted proposal" gets special treatment and is always allowed in). Track only the size of the payload actually appended to the Raft log instead. For context, see: https://github.com/cockroachdb/cockroach/issues/31618#issuecomment-431374938 --- raft/node_test.go | 2 +- raft/raft.go | 16 ++++++++++------ raft/raft_test.go | 14 ++++++++++++-- raft/rawnode_test.go | 2 +- raft/util.go | 6 ++++++ 5 files changed, 30 insertions(+), 10 deletions(-) diff --git a/raft/node_test.go b/raft/node_test.go index a729068bfc21..e977da6d6e1f 100644 --- a/raft/node_test.go +++ b/raft/node_test.go @@ -1006,7 +1006,7 @@ func TestNodeBoundedLogGrowthWithPartition(t *testing.T) { const maxEntries = 16 data := []byte("testdata") testEntry := raftpb.Entry{Data: data} - maxEntrySize := uint64(maxEntries * testEntry.Size()) + maxEntrySize := uint64(maxEntries * PayloadSize(testEntry)) s := NewMemoryStorage() cfg := newTestConfig(1, []uint64{1}, 10, 1, s) diff --git a/raft/raft.go b/raft/raft.go index bf0a8983c462..b9778a842fb4 100644 --- a/raft/raft.go +++ b/raft/raft.go @@ -970,10 +970,6 @@ func stepLeader(r *raft, m pb.Message) error { r.logger.Debugf("%x [term %d] transfer leadership to %x is in progress; dropping proposal", r.id, r.Term, r.leadTransferee) return ErrProposalDropped } - if !r.increaseUncommittedSize(m.Entries) { - r.logger.Debugf("%x appending new entries to log would exceed uncommitted entry size limit; dropping proposal", r.id) - return ErrProposalDropped - } for i, e := range m.Entries { if e.Type == pb.EntryConfChange { @@ -986,6 +982,10 @@ func stepLeader(r *raft, m pb.Message) error { } } } + if !r.increaseUncommittedSize(m.Entries) { + r.logger.Debugf("%x appending new entries to log would exceed uncommitted entry size limit; dropping proposal", r.id) + return ErrProposalDropped + } r.appendEntry(m.Entries...) r.bcastAppend() return nil @@ -1490,7 +1490,11 @@ func (r *raft) abortLeaderTransfer() { func (r *raft) increaseUncommittedSize(ents []pb.Entry) bool { var s uint64 for _, e := range ents { - s += uint64(e.Size()) + // NB: use only the Data as other properties of the Entry could change. + // For example, the Term and Index are set only later, and the + // XXX_unrecognized field could also introduce unexpected behavior + // should the protos ever change. + s += uint64(PayloadSize(e)) } if r.uncommittedSize > 0 && r.uncommittedSize+s > r.maxUncommittedSize { @@ -1513,7 +1517,7 @@ func (r *raft) reduceUncommittedSize(ents []pb.Entry) { var s uint64 for _, e := range ents { - s += uint64(e.Size()) + s += uint64(PayloadSize(e)) } if s > r.uncommittedSize { // uncommittedSize may underestimate the size of the uncommitted Raft diff --git a/raft/raft_test.go b/raft/raft_test.go index cac4bb6c2caa..a250dfe56a7c 100644 --- a/raft/raft_test.go +++ b/raft/raft_test.go @@ -363,12 +363,19 @@ func TestProgressFlowControl(t *testing.T) { } func TestUncommittedEntryLimit(t *testing.T) { - const maxEntries = 16 + // Use a relatively large number of entries here to prevent regression of a + // bug in which we'd use the Size() instead of the PayloadSize(). Size() + // changes when the Index and Term are assigned before putting the proposal + // in the log. This test would fail with the bug, either because we'd get + // dropped proposals earlier than we expect them, or because the final tally + // ends up nonzero. (At the time of writing, the former). + const maxEntries = 1024 testEntry := pb.Entry{Data: []byte("testdata")} - maxEntrySize := maxEntries * testEntry.Size() + maxEntrySize := maxEntries * PayloadSize(testEntry) cfg := newTestConfig(1, []uint64{1, 2, 3}, 5, 1, NewMemoryStorage()) cfg.MaxUncommittedEntriesSize = uint64(maxEntrySize) + cfg.MaxInflightMsgs = 1024 // avoid interference r := newRaft(cfg) r.becomeCandidate() r.becomeLeader() @@ -425,6 +432,9 @@ func TestUncommittedEntryLimit(t *testing.T) { t.Fatalf("expected %d messages, got %d", e, len(ms)) } r.reduceUncommittedSize(propEnts) + if n := r.uncommittedSize; n != 0 { + t.Fatalf("expected zero uncommitted size, got %d", n) + } } func TestLeaderElection(t *testing.T) { diff --git a/raft/rawnode_test.go b/raft/rawnode_test.go index 3e56733aa425..6348bb7e3248 100644 --- a/raft/rawnode_test.go +++ b/raft/rawnode_test.go @@ -493,7 +493,7 @@ func TestRawNodeBoundedLogGrowthWithPartition(t *testing.T) { const maxEntries = 16 data := []byte("testdata") testEntry := raftpb.Entry{Data: data} - maxEntrySize := uint64(maxEntries * testEntry.Size()) + maxEntrySize := uint64(maxEntries * PayloadSize(testEntry)) s := NewMemoryStorage() cfg := newTestConfig(1, []uint64{1}, 10, 1, s) diff --git a/raft/util.go b/raft/util.go index 1a7a1e9ac3a2..b8d4206c457d 100644 --- a/raft/util.go +++ b/raft/util.go @@ -101,6 +101,12 @@ func DescribeMessage(m pb.Message, f EntryFormatter) string { return buf.String() } +// PayloadSize is the size of the payload of this Entry. Notably, it does not +// depend on the its Index or Term. +func PayloadSize(e pb.Entry) int { + return len(e.Data) +} + // DescribeEntry returns a concise human-readable description of an // Entry for debugging. func DescribeEntry(e pb.Entry, f EntryFormatter) string {