Skip to content

Commit

Permalink
raft: Avoid scanning raft log in becomeLeader
Browse files Browse the repository at this point in the history
Scanning the uncommitted portion of the raft log to determine whether
there are any pending config changes can be expensive. In
cockroachdb/cockroach#18601, we've seen that a new leader can spend so
much time scanning its log post-election that it fails to send
its first heartbeats in time to prevent a second election from
starting immediately.

Instead of tracking whether a pending config change exists with a
boolean, this commit tracks the latest log index at which a pending
config change *could* exist. This is a less expensive solution to
the problem, and the impact of false positives should be minimal since
a newly-elected leader should be able to quickly commit the tail of
its log.
  • Loading branch information
bdarnell authored and gyuho committed Jan 9, 2018
1 parent 1ae0c0b commit bc705f1
Show file tree
Hide file tree
Showing 5 changed files with 46 additions and 73 deletions.
2 changes: 0 additions & 2 deletions raft/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -324,7 +324,6 @@ func (n *node) run(r *raft) {
}
case cc := <-n.confc:
if cc.NodeID == None {
r.resetPendingConf()
select {
case n.confstatec <- pb.ConfState{Nodes: r.nodes()}:
case <-n.done:
Expand All @@ -344,7 +343,6 @@ func (n *node) run(r *raft) {
}
r.removeNode(cc.NodeID)
case pb.ConfChangeUpdateNode:
r.resetPendingConf()
default:
panic("unexpected conf type")
}
Expand Down
6 changes: 5 additions & 1 deletion raft/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -348,6 +348,7 @@ func TestNodeProposeAddDuplicateNode(t *testing.T) {
n.Tick()
case rd := <-n.Ready():
s.Append(rd.Entries)
applied := false
for _, e := range rd.Entries {
rdyEntries = append(rdyEntries, e)
switch e.Type {
Expand All @@ -356,10 +357,13 @@ func TestNodeProposeAddDuplicateNode(t *testing.T) {
var cc raftpb.ConfChange
cc.Unmarshal(e.Data)
n.ApplyConfChange(cc)
applyConfChan <- struct{}{}
applied = true
}
}
n.Advance()
if applied {
applyConfChan <- struct{}{}
}
}
}
}()
Expand Down
36 changes: 20 additions & 16 deletions raft/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -255,8 +255,13 @@ type raft struct {
// leadTransferee is id of the leader transfer target when its value is not zero.
// Follow the procedure defined in raft thesis 3.10.
leadTransferee uint64
// New configuration is ignored if there exists unapplied configuration.
pendingConf bool
// Only one conf change may be pending (in the log, but not yet
// applied) at a time. This is enforced via pendingConfIndex, which
// is set to a value >= the log index of the latest pending
// configuration change (if any). Config changes are only allowed to
// be proposed if the leader's applied index is greater than this
// value.
pendingConfIndex uint64

readOnly *readOnly

Expand Down Expand Up @@ -578,7 +583,7 @@ func (r *raft) reset(term uint64) {
}
})

r.pendingConf = false
r.pendingConfIndex = 0
r.readOnly = newReadOnly(r.readOnly.option)
}

Expand Down Expand Up @@ -682,12 +687,13 @@ func (r *raft) becomeLeader() {
r.logger.Panicf("unexpected error getting uncommitted entries (%v)", err)
}

nconf := numOfPendingConf(ents)
if nconf > 1 {
panic("unexpected multiple uncommitted config entry")
}
if nconf == 1 {
r.pendingConf = true
// Conservatively set the pendingConfIndex to the last index in the
// log. There may or may not be a pending config change, but it's
// safe to delay any future proposals until we commit all our
// pending log entries, and scanning the entire tail of the log
// could be expensive.
if len(ents) > 0 {
r.pendingConfIndex = ents[len(ents)-1].Index
}

r.appendEntry(pb.Entry{Data: nil})
Expand Down Expand Up @@ -901,11 +907,13 @@ func stepLeader(r *raft, m pb.Message) {

for i, e := range m.Entries {
if e.Type == pb.EntryConfChange {
if r.pendingConf {
r.logger.Infof("propose conf %s ignored since pending unapplied configuration", e.String())
if r.pendingConfIndex > r.raftLog.applied {
r.logger.Infof("propose conf %s ignored since pending unapplied configuration [index %d, applied %d]",
e.String(), r.pendingConfIndex, r.raftLog.applied)
m.Entries[i] = pb.Entry{Type: pb.EntryNormal}
} else {
r.pendingConfIndex = r.raftLog.lastIndex() + uint64(i) + 1
}
r.pendingConf = true
}
}
r.appendEntry(m.Entries...)
Expand Down Expand Up @@ -1270,7 +1278,6 @@ func (r *raft) addLearner(id uint64) {
}

func (r *raft) addNodeOrLearnerNode(id uint64, isLearner bool) {
r.pendingConf = false
pr := r.getProgress(id)
if pr == nil {
r.setProgress(id, 0, r.raftLog.lastIndex()+1, isLearner)
Expand Down Expand Up @@ -1306,7 +1313,6 @@ func (r *raft) addNodeOrLearnerNode(id uint64, isLearner bool) {

func (r *raft) removeNode(id uint64) {
r.delProgress(id)
r.pendingConf = false

// do not try to commit or abort transferring if there is no nodes in the cluster.
if len(r.prs) == 0 && len(r.learnerPrs) == 0 {
Expand All @@ -1324,8 +1330,6 @@ func (r *raft) removeNode(id uint64) {
}
}

func (r *raft) resetPendingConf() { r.pendingConf = false }

func (r *raft) setProgress(id, match, next uint64, isLearner bool) {
if !isLearner {
delete(r.learnerPrs, id)
Expand Down
73 changes: 21 additions & 52 deletions raft/raft_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2736,8 +2736,8 @@ func TestStepConfig(t *testing.T) {
if g := r.raftLog.lastIndex(); g != index+1 {
t.Errorf("index = %d, want %d", g, index+1)
}
if !r.pendingConf {
t.Errorf("pendingConf = %v, want true", r.pendingConf)
if r.pendingConfIndex != index+1 {
t.Errorf("pendingConfIndex = %d, want %d", r.pendingConfIndex, index+1)
}
}

Expand All @@ -2751,7 +2751,7 @@ func TestStepIgnoreConfig(t *testing.T) {
r.becomeLeader()
r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Type: pb.EntryConfChange}}})
index := r.raftLog.lastIndex()
pendingConf := r.pendingConf
pendingConfIndex := r.pendingConfIndex
r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Type: pb.EntryConfChange}}})
wents := []pb.Entry{{Type: pb.EntryNormal, Term: 1, Index: 3, Data: nil}}
ents, err := r.raftLog.entries(index+1, noLimit)
Expand All @@ -2761,72 +2761,50 @@ func TestStepIgnoreConfig(t *testing.T) {
if !reflect.DeepEqual(ents, wents) {
t.Errorf("ents = %+v, want %+v", ents, wents)
}
if r.pendingConf != pendingConf {
t.Errorf("pendingConf = %v, want %v", r.pendingConf, pendingConf)
if r.pendingConfIndex != pendingConfIndex {
t.Errorf("pendingConfIndex = %d, want %d", r.pendingConfIndex, pendingConfIndex)
}
}

// TestRecoverPendingConfig tests that new leader recovers its pendingConf flag
// TestNewLeaderPendingConfig tests that new leader sets its pendingConfigIndex
// based on uncommitted entries.
func TestRecoverPendingConfig(t *testing.T) {
func TestNewLeaderPendingConfig(t *testing.T) {
tests := []struct {
entType pb.EntryType
wpending bool
addEntry bool
wpendingIndex uint64
}{
{pb.EntryNormal, false},
{pb.EntryConfChange, true},
{false, 0},
{true, 1},
}
for i, tt := range tests {
r := newTestRaft(1, []uint64{1, 2}, 10, 1, NewMemoryStorage())
r.appendEntry(pb.Entry{Type: tt.entType})
if tt.addEntry {
r.appendEntry(pb.Entry{Type: pb.EntryNormal})
}
r.becomeCandidate()
r.becomeLeader()
if r.pendingConf != tt.wpending {
t.Errorf("#%d: pendingConf = %v, want %v", i, r.pendingConf, tt.wpending)
if r.pendingConfIndex != tt.wpendingIndex {
t.Errorf("#%d: pendingConfIndex = %d, want %d",
i, r.pendingConfIndex, tt.wpendingIndex)
}
}
}

// TestRecoverDoublePendingConfig tests that new leader will panic if
// there exist two uncommitted config entries.
func TestRecoverDoublePendingConfig(t *testing.T) {
func() {
defer func() {
if err := recover(); err == nil {
t.Errorf("expect panic, but nothing happens")
}
}()
r := newTestRaft(1, []uint64{1, 2}, 10, 1, NewMemoryStorage())
r.appendEntry(pb.Entry{Type: pb.EntryConfChange})
r.appendEntry(pb.Entry{Type: pb.EntryConfChange})
r.becomeCandidate()
r.becomeLeader()
}()
}

// TestAddNode tests that addNode could update pendingConf and nodes correctly.
// TestAddNode tests that addNode could update nodes correctly.
func TestAddNode(t *testing.T) {
r := newTestRaft(1, []uint64{1}, 10, 1, NewMemoryStorage())
r.pendingConf = true
r.addNode(2)
if r.pendingConf {
t.Errorf("pendingConf = %v, want false", r.pendingConf)
}
nodes := r.nodes()
wnodes := []uint64{1, 2}
if !reflect.DeepEqual(nodes, wnodes) {
t.Errorf("nodes = %v, want %v", nodes, wnodes)
}
}

// TestAddLearner tests that addLearner could update pendingConf and nodes correctly.
// TestAddLearner tests that addLearner could update nodes correctly.
func TestAddLearner(t *testing.T) {
r := newTestRaft(1, []uint64{1}, 10, 1, NewMemoryStorage())
r.pendingConf = true
r.addLearner(2)
if r.pendingConf {
t.Errorf("pendingConf = %v, want false", r.pendingConf)
}
nodes := r.nodes()
wnodes := []uint64{1, 2}
if !reflect.DeepEqual(nodes, wnodes) {
Expand All @@ -2841,7 +2819,6 @@ func TestAddLearner(t *testing.T) {
// immediately when checkQuorum is set.
func TestAddNodeCheckQuorum(t *testing.T) {
r := newTestRaft(1, []uint64{1}, 10, 1, NewMemoryStorage())
r.pendingConf = true
r.checkQuorum = true

r.becomeCandidate()
Expand Down Expand Up @@ -2872,15 +2849,11 @@ func TestAddNodeCheckQuorum(t *testing.T) {
}
}

// TestRemoveNode tests that removeNode could update pendingConf, nodes and
// TestRemoveNode tests that removeNode could update nodes and
// and removed list correctly.
func TestRemoveNode(t *testing.T) {
r := newTestRaft(1, []uint64{1, 2}, 10, 1, NewMemoryStorage())
r.pendingConf = true
r.removeNode(2)
if r.pendingConf {
t.Errorf("pendingConf = %v, want false", r.pendingConf)
}
w := []uint64{1}
if g := r.nodes(); !reflect.DeepEqual(g, w) {
t.Errorf("nodes = %v, want %v", g, w)
Expand All @@ -2894,15 +2867,11 @@ func TestRemoveNode(t *testing.T) {
}
}

// TestRemoveLearner tests that removeNode could update pendingConf, nodes and
// TestRemoveLearner tests that removeNode could update nodes and
// and removed list correctly.
func TestRemoveLearner(t *testing.T) {
r := newTestLearnerRaft(1, []uint64{1}, []uint64{2}, 10, 1, NewMemoryStorage())
r.pendingConf = true
r.removeNode(2)
if r.pendingConf {
t.Errorf("pendingConf = %v, want false", r.pendingConf)
}
w := []uint64{1}
if g := r.nodes(); !reflect.DeepEqual(g, w) {
t.Errorf("nodes = %v, want %v", g, w)
Expand Down
2 changes: 0 additions & 2 deletions raft/rawnode.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,6 @@ func (rn *RawNode) ProposeConfChange(cc pb.ConfChange) error {
// ApplyConfChange applies a config change to the local node.
func (rn *RawNode) ApplyConfChange(cc pb.ConfChange) *pb.ConfState {
if cc.NodeID == None {
rn.raft.resetPendingConf()
return &pb.ConfState{Nodes: rn.raft.nodes()}
}
switch cc.Type {
Expand All @@ -180,7 +179,6 @@ func (rn *RawNode) ApplyConfChange(cc pb.ConfChange) *pb.ConfState {
case pb.ConfChangeRemoveNode:
rn.raft.removeNode(cc.NodeID)
case pb.ConfChangeUpdateNode:
rn.raft.resetPendingConf()
default:
panic("unexpected conf type")
}
Expand Down

0 comments on commit bc705f1

Please sign in to comment.