Skip to content

Commit

Permalink
raft: fix bug in unbounded log growth prevention mechanism
Browse files Browse the repository at this point in the history
The previous code was using the proto-generated `Size()` method to
track the size of an incoming proposal at the leader. This includes
the Index and Term, which were mutated after the call to `Size()`
when appending to the log. Additionally, it was not taking into
account that when a configuration change is ignored, the original
proposal is discarded and an empty entry is appended in its place.

As a result, a fully committed Raft group could end up with a nonzero
tracked uncommitted Raft log counter that would eventually hit the
ceiling and drop all future proposals indiscriminately. A counter that
never returns to zero also means that any proposal which by itself
exceeds the threshold gets refused, because only the "first uncommitted
proposal" (one arriving while the counter is zero) gets special
treatment and is always allowed in.

Track only the size of the payload actually appended to the Raft log
instead.

For context, see:
cockroachdb/cockroach#31618 (comment)
  • Loading branch information
tbg committed Oct 19, 2018
1 parent 8c80efb commit 58caec5
Show file tree
Hide file tree
Showing 5 changed files with 30 additions and 10 deletions.
2 changes: 1 addition & 1 deletion raft/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1006,7 +1006,7 @@ func TestNodeBoundedLogGrowthWithPartition(t *testing.T) {
const maxEntries = 16
data := []byte("testdata")
testEntry := raftpb.Entry{Data: data}
maxEntrySize := uint64(maxEntries * testEntry.Size())
maxEntrySize := uint64(maxEntries * PayloadSize(testEntry))

s := NewMemoryStorage()
cfg := newTestConfig(1, []uint64{1}, 10, 1, s)
Expand Down
16 changes: 10 additions & 6 deletions raft/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -970,10 +970,6 @@ func stepLeader(r *raft, m pb.Message) error {
r.logger.Debugf("%x [term %d] transfer leadership to %x is in progress; dropping proposal", r.id, r.Term, r.leadTransferee)
return ErrProposalDropped
}
if !r.increaseUncommittedSize(m.Entries) {
r.logger.Debugf("%x appending new entries to log would exceed uncommitted entry size limit; dropping proposal", r.id)
return ErrProposalDropped
}

for i, e := range m.Entries {
if e.Type == pb.EntryConfChange {
Expand All @@ -986,6 +982,10 @@ func stepLeader(r *raft, m pb.Message) error {
}
}
}
if !r.increaseUncommittedSize(m.Entries) {
r.logger.Debugf("%x appending new entries to log would exceed uncommitted entry size limit; dropping proposal", r.id)
return ErrProposalDropped
}
r.appendEntry(m.Entries...)
r.bcastAppend()
return nil
Expand Down Expand Up @@ -1490,7 +1490,11 @@ func (r *raft) abortLeaderTransfer() {
func (r *raft) increaseUncommittedSize(ents []pb.Entry) bool {
var s uint64
for _, e := range ents {
s += uint64(e.Size())
// NB: use only the Data as other properties of the Entry could change.
// For example, the Term and Index are set only later, and the
// XXX_unrecognized field could also introduce unexpected behavior
// should the protos ever change.
s += uint64(PayloadSize(e))
}

if r.uncommittedSize > 0 && r.uncommittedSize+s > r.maxUncommittedSize {
Expand All @@ -1513,7 +1517,7 @@ func (r *raft) reduceUncommittedSize(ents []pb.Entry) {

var s uint64
for _, e := range ents {
s += uint64(e.Size())
s += uint64(PayloadSize(e))
}
if s > r.uncommittedSize {
// uncommittedSize may underestimate the size of the uncommitted Raft
Expand Down
14 changes: 12 additions & 2 deletions raft/raft_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -363,12 +363,19 @@ func TestProgressFlowControl(t *testing.T) {
}

func TestUncommittedEntryLimit(t *testing.T) {
const maxEntries = 16
// Use a relatively large number of entries here to prevent regression of a
// bug in which we'd use the Size() instead of the PayloadSize(). Size()
// changes when the Index and Term are assigned before putting the proposal
// in the log. This test would fail with the bug, either because we'd get
// dropped proposals earlier than we expect them, or because the final tally
// ends up nonzero. (At the time of writing, the former).
const maxEntries = 1024
testEntry := pb.Entry{Data: []byte("testdata")}
maxEntrySize := maxEntries * testEntry.Size()
maxEntrySize := maxEntries * PayloadSize(testEntry)

cfg := newTestConfig(1, []uint64{1, 2, 3}, 5, 1, NewMemoryStorage())
cfg.MaxUncommittedEntriesSize = uint64(maxEntrySize)
cfg.MaxInflightMsgs = 1024 // avoid interference
r := newRaft(cfg)
r.becomeCandidate()
r.becomeLeader()
Expand Down Expand Up @@ -425,6 +432,9 @@ func TestUncommittedEntryLimit(t *testing.T) {
t.Fatalf("expected %d messages, got %d", e, len(ms))
}
r.reduceUncommittedSize(propEnts)
if n := r.uncommittedSize; n != 0 {
t.Fatalf("expected zero uncommitted size, got %d", n)
}
}

func TestLeaderElection(t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion raft/rawnode_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -493,7 +493,7 @@ func TestRawNodeBoundedLogGrowthWithPartition(t *testing.T) {
const maxEntries = 16
data := []byte("testdata")
testEntry := raftpb.Entry{Data: data}
maxEntrySize := uint64(maxEntries * testEntry.Size())
maxEntrySize := uint64(maxEntries * PayloadSize(testEntry))

s := NewMemoryStorage()
cfg := newTestConfig(1, []uint64{1}, 10, 1, s)
Expand Down
6 changes: 6 additions & 0 deletions raft/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,12 @@ func DescribeMessage(m pb.Message, f EntryFormatter) string {
return buf.String()
}

// PayloadSize is the size of the payload of this Entry, measured as the
// length in bytes of its Data field. Notably, it does not depend on the
// Entry's Index or Term, so the result is the same before and after those
// fields are assigned.
func PayloadSize(e pb.Entry) int {
return len(e.Data)
}

// DescribeEntry returns a concise human-readable description of an
// Entry for debugging.
func DescribeEntry(e pb.Entry, f EntryFormatter) string {
Expand Down

0 comments on commit 58caec5

Please sign in to comment.