Skip to content

Commit

Permalink
NRG: Don't switch to candidate when waiting for pending applies
Browse files Browse the repository at this point in the history
Signed-off-by: Maurice van Veen <[email protected]>
  • Loading branch information
MauriceVanVeen committed Oct 30, 2024
1 parent ea1df00 commit 7af96d8
Show file tree
Hide file tree
Showing 2 changed files with 89 additions and 1 deletion.
4 changes: 3 additions & 1 deletion server/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -4228,7 +4228,9 @@ func (n *raft) switchToCandidate() {
defer n.Unlock()

// If we are catching up or are in observer mode we can not switch.
if n.observer || n.paused {
// Avoid petitioning to become leader if we're behind on applies.
if n.observer || n.paused || n.applied < n.commit {
n.resetElect(minElectionTimeout / 4)
return
}

Expand Down
86 changes: 86 additions & 0 deletions server/raft_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1499,3 +1499,89 @@ func TestNRGDontRemoveSnapshotIfTruncateToApplied(t *testing.T) {
require_NoError(t, err)
require_Equal(t, len(files), 1)
}

func TestNRGDontSwitchToCandidateWithInflightSnapshot(t *testing.T) {
n, cleanup := initSingleMemRaftNode(t)
defer cleanup()

// Create a sample snapshot entry, the content doesn't matter.
snapshotEntries := []*Entry{
newEntry(EntrySnapshot, nil),
newEntry(EntryPeerState, encodePeerState(&peerState{n.peerNames(), n.csz, n.extSt})),
}

nats0 := "S1Nunr6R" // "nats-0"

// Timeline.
aeTriggerCatchup := encode(t, &appendEntry{leader: nats0, term: 1, commit: 1, pterm: 1, pindex: 1, entries: nil})
aeCatchupSnapshot := encode(t, &appendEntry{leader: nats0, term: 1, commit: 1, pterm: 1, pindex: 1, entries: snapshotEntries})

// Switch follower into catchup.
n.processAppendEntry(aeTriggerCatchup, n.aesub)
require_True(t, n.catchup != nil)
require_Equal(t, n.catchup.pterm, 0) // n.pterm
require_Equal(t, n.catchup.pindex, 0) // n.pindex

// Follower receives a snapshot, marking a snapshot as inflight as the apply queue is async.
n.processAppendEntry(aeCatchupSnapshot, n.catchup.sub)
require_Equal(t, n.pindex, 1)
require_Equal(t, n.commit, 1)

// Try to switch to candidate, it should be blocked since the snapshot is not processed yet.
n.switchToCandidate()
require_Equal(t, n.State(), Follower)

// Simulate snapshot being processed by the upper layer.
n.Applied(1)

// Retry becoming candidate, snapshot is processed so can now do so.
n.switchToCandidate()
require_Equal(t, n.State(), Candidate)
}

func TestNRGDontSwitchToCandidateWithMultipleInflightSnapshots(t *testing.T) {
n, cleanup := initSingleMemRaftNode(t)
defer cleanup()

// Create a sample snapshot entry, the content doesn't matter.
snapshotEntries := []*Entry{
newEntry(EntrySnapshot, nil),
newEntry(EntryPeerState, encodePeerState(&peerState{n.peerNames(), n.csz, n.extSt})),
}

nats0 := "S1Nunr6R" // "nats-0"

// Timeline.
aeSnapshot1 := encode(t, &appendEntry{leader: nats0, term: 1, commit: 0, pterm: 0, pindex: 0, entries: snapshotEntries})
aeSnapshot2 := encode(t, &appendEntry{leader: nats0, term: 1, commit: 1, pterm: 1, pindex: 1, entries: snapshotEntries})
aeHeartbeat := encode(t, &appendEntry{leader: nats0, term: 1, commit: 2, pterm: 1, pindex: 2, entries: nil})

// Simulate snapshots being sent to us.
n.processAppendEntry(aeSnapshot1, n.aesub)
require_Equal(t, n.pindex, 1)
require_Equal(t, n.commit, 0)
require_Equal(t, n.applied, 0)

n.processAppendEntry(aeSnapshot2, n.aesub)
require_Equal(t, n.pindex, 2)
require_Equal(t, n.commit, 1)
require_Equal(t, n.applied, 0)

n.processAppendEntry(aeHeartbeat, n.aesub)
require_Equal(t, n.pindex, 2)
require_Equal(t, n.commit, 2)
require_Equal(t, n.applied, 0)

for i := uint64(1); i <= 2; i++ {
// Try to switch to candidate, it should be blocked since the snapshot is not processed yet.
n.switchToCandidate()
require_Equal(t, n.State(), Follower)

// Simulate snapshot being processed by the upper layer.
n.Applied(i)
}

// Retry becoming candidate, all snapshots processed so can now do so.
n.switchToCandidate()
require_Equal(t, n.State(), Candidate)
}

0 comments on commit 7af96d8

Please sign in to comment.