From be280f0d4265cbf6017baf13441ad7548edfe5f0 Mon Sep 17 00:00:00 2001 From: Maurice van Veen Date: Wed, 30 Oct 2024 17:12:42 +0100 Subject: [PATCH] [FIXED] Don't become candidate while processing snapshot Signed-off-by: Maurice van Veen --- server/raft.go | 4 ++- server/raft_test.go | 86 +++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 89 insertions(+), 1 deletion(-) diff --git a/server/raft.go b/server/raft.go index 76597d8d660..f6aa1147304 100644 --- a/server/raft.go +++ b/server/raft.go @@ -4228,7 +4228,9 @@ func (n *raft) switchToCandidate() { defer n.Unlock() // If we are catching up or are in observer mode we can not switch. - if n.observer || n.paused { + // Avoid petitioning to become leader if we're behind on applies. + if n.observer || n.paused || n.applied < n.commit { + n.resetElect(minElectionTimeout / 4) return } diff --git a/server/raft_test.go b/server/raft_test.go index 1a6c5e83878..86882b7d9a4 100644 --- a/server/raft_test.go +++ b/server/raft_test.go @@ -1499,3 +1499,89 @@ func TestNRGDontRemoveSnapshotIfTruncateToApplied(t *testing.T) { require_NoError(t, err) require_Equal(t, len(files), 1) } + +func TestNRGDontSwitchToCandidateWithInflightSnapshot(t *testing.T) { + n, cleanup := initSingleMemRaftNode(t) + defer cleanup() + + // Create a sample snapshot entry, the content doesn't matter. + snapshotEntries := []*Entry{ + newEntry(EntrySnapshot, nil), + newEntry(EntryPeerState, encodePeerState(&peerState{n.peerNames(), n.csz, n.extSt})), + } + + nats0 := "S1Nunr6R" // "nats-0" + + // Timeline. + aeTriggerCatchup := encode(t, &appendEntry{leader: nats0, term: 1, commit: 1, pterm: 1, pindex: 1, entries: nil}) + aeCatchupSnapshot := encode(t, &appendEntry{leader: nats0, term: 1, commit: 1, pterm: 1, pindex: 1, entries: snapshotEntries}) + + // Switch follower into catchup. + n.processAppendEntry(aeTriggerCatchup, n.aesub) + require_True(t, n.catchup != nil) + require_Equal(t, n.catchup.pterm, 0) // n.pterm + require_Equal(t, n.catchup.pindex, 0) // n.pindex + + // Follower receives a snapshot, marking a snapshot as inflight as the apply queue is async. + n.processAppendEntry(aeCatchupSnapshot, n.catchup.sub) + require_Equal(t, n.pindex, 1) + require_Equal(t, n.commit, 1) + + // Try to switch to candidate, it should be blocked since the snapshot is not processed yet. + n.switchToCandidate() + require_Equal(t, n.State(), Follower) + + // Simulate snapshot being processed by the upper layer. + n.Applied(1) + + // Retry becoming candidate, snapshot is processed so can now do so. + n.switchToCandidate() + require_Equal(t, n.State(), Candidate) +} + +func TestNRGDontSwitchToCandidateWithMultipleInflightSnapshots(t *testing.T) { + n, cleanup := initSingleMemRaftNode(t) + defer cleanup() + + // Create a sample snapshot entry, the content doesn't matter. + snapshotEntries := []*Entry{ + newEntry(EntrySnapshot, nil), + newEntry(EntryPeerState, encodePeerState(&peerState{n.peerNames(), n.csz, n.extSt})), + } + + nats0 := "S1Nunr6R" // "nats-0" + + // Timeline. + aeSnapshot1 := encode(t, &appendEntry{leader: nats0, term: 1, commit: 0, pterm: 0, pindex: 0, entries: snapshotEntries}) + aeSnapshot2 := encode(t, &appendEntry{leader: nats0, term: 1, commit: 1, pterm: 1, pindex: 1, entries: snapshotEntries}) + aeHeartbeat := encode(t, &appendEntry{leader: nats0, term: 1, commit: 2, pterm: 1, pindex: 2, entries: nil}) + + // Simulate snapshots being sent to us. + n.processAppendEntry(aeSnapshot1, n.aesub) + require_Equal(t, n.pindex, 1) + require_Equal(t, n.commit, 0) + require_Equal(t, n.applied, 0) + + n.processAppendEntry(aeSnapshot2, n.aesub) + require_Equal(t, n.pindex, 2) + require_Equal(t, n.commit, 1) + require_Equal(t, n.applied, 0) + + n.processAppendEntry(aeHeartbeat, n.aesub) + require_Equal(t, n.pindex, 2) + require_Equal(t, n.commit, 2) + require_Equal(t, n.applied, 0) + + for i := uint64(1); i <= 2; i++ { + // Try to switch to candidate, it should be blocked since the snapshot is not processed yet. + n.switchToCandidate() + require_Equal(t, n.State(), Follower) + + // Simulate snapshot being processed by the upper layer. + n.Applied(i) + } + + // Retry becoming candidate, all snapshots processed so can now do so. + n.switchToCandidate() + require_Equal(t, n.State(), Candidate) +}