From 87c30eb8fa09987698a94621a109c5254a490692 Mon Sep 17 00:00:00 2001 From: Erik Grinaker Date: Fri, 17 Nov 2023 11:51:39 +0000 Subject: [PATCH] Add option to accept any snapshot that allows replication A leader will not take into account snapshots reported by a follower unless they match or exceed the tracked PendingSnapshot index (which is the leader's last index at the time of requesting the snapshot). This is too inflexible: the leader should take into account any snapshot that reconnects the follower to its log. This PR adds a config option ResumeReplicateBelowPendingSnapshot that enables this behavior. In doing so, it addresses long-standing problems that we've encountered in CockroachDB. Unless you create the snapshot immediately and locally when raft emits an MsgSnap, it's difficult/impossible to later synthesize a snapshot at the requested index. It is possible to get one above the requested index which raft always accepted, but CockroachDB delegates snapshots to followers who might be behind on applying the log, and it is awkward to have to wait for log application to send the snapshot just to satisfy an overly strict condition in raft. Additionally, CockroachDB also sends snapshots preemptively when adding a new replica since there are qualitative differences between an initial snapshot and one needed to reconnect to the log and one does not want to wait for raft to round-trip to the follower to realize that a snapshot is needed. In this case, the sent snapshot is commonly behind the PendingSnapshot since the leader transitions the follower into StateProbe when a snapshot is already in flight. Touches https://github.com/cockroachdb/cockroach/issues/84242. Touches https://github.com/cockroachdb/cockroach/issues/87553. Touches https://github.com/cockroachdb/cockroach/pull/87554. Touches https://github.com/cockroachdb/cockroach/issues/97971. Touches https://github.com/cockroachdb/cockroach/issues/114349. See also https://github.com/cockroachdb/cockroach/blob/2b91c3829270eb512c5380201c26a3d838fc567a/pkg/kv/kvserver/raft_snapshot_queue.go#L131-L143. Signed-off-by: Erik Grinaker Signed-off-by: Tobias Grieger --- raft.go | 66 +++++---- rafttest/interaction_env_handler_add_nodes.go | 2 + .../snapshot_succeed_via_app_resp_behind.txt | 139 ++++++++++++++++++ tracker/progress.go | 22 ++- 4 files changed, 197 insertions(+), 32 deletions(-) create mode 100644 testdata/snapshot_succeed_via_app_resp_behind.txt diff --git a/raft.go b/raft.go index 86916626..9e1e6e08 100644 --- a/raft.go +++ b/raft.go @@ -283,6 +283,22 @@ type Config struct { // This behavior will become unconditional in the future. See: // https://github.com/etcd-io/raft/issues/83 StepDownOnRemoval bool + + // ResumeReplicateBelowPendingSnapshot allows the leader to resume replication + // to a follower (moving it to StateReplicate) if the follower is currently in + // StateSnapshot and we receive a MsgAppResp from it below the PendingSnapshot + // index but within the leader's log. In other words, the follower may apply a + // snapshot below the leader's PendingSnapshot, and the leader will resume + // replication as long as it connects the follower to the leader's log. + // + // Consider that complex systems may delegate the sending of snapshots to + // alternative datasources (i.e. not the leader). In such setups, it is + // difficult to manufacture a snapshot at a particular index requested by raft + // and the actual index may be ahead or behind. This should be okay, as long + // as the snapshot allows replication to resume. + // + // TODO(erikgrinaker): Consider making this the default behavior. + ResumeReplicateBelowPendingSnapshot bool } func (c *Config) validate() error { @@ -413,9 +429,10 @@ type raft struct { // randomizedElectionTimeout is a random number between // [electiontimeout, 2 * electiontimeout - 1]. It gets reset // when raft changes its state to follower or candidate. - randomizedElectionTimeout int - disableProposalForwarding bool - stepDownOnRemoval bool + randomizedElectionTimeout int + disableProposalForwarding bool + stepDownOnRemoval bool + resumeReplicateBelowPendingSnapshot bool tick func() step stepFunc @@ -440,22 +457,23 @@ func newRaft(c *Config) *raft { } r := &raft{ - id: c.ID, - lead: None, - isLearner: false, - raftLog: raftlog, - maxMsgSize: entryEncodingSize(c.MaxSizePerMsg), - maxUncommittedSize: entryPayloadSize(c.MaxUncommittedEntriesSize), - prs: tracker.MakeProgressTracker(c.MaxInflightMsgs, c.MaxInflightBytes), - electionTimeout: c.ElectionTick, - heartbeatTimeout: c.HeartbeatTick, - logger: c.Logger, - checkQuorum: c.CheckQuorum, - preVote: c.PreVote, - readOnly: newReadOnly(c.ReadOnlyOption), - disableProposalForwarding: c.DisableProposalForwarding, - disableConfChangeValidation: c.DisableConfChangeValidation, - stepDownOnRemoval: c.StepDownOnRemoval, + id: c.ID, + lead: None, + isLearner: false, + raftLog: raftlog, + maxMsgSize: entryEncodingSize(c.MaxSizePerMsg), + maxUncommittedSize: entryPayloadSize(c.MaxUncommittedEntriesSize), + prs: tracker.MakeProgressTracker(c.MaxInflightMsgs, c.MaxInflightBytes), + electionTimeout: c.ElectionTick, + heartbeatTimeout: c.HeartbeatTick, + logger: c.Logger, + checkQuorum: c.CheckQuorum, + preVote: c.PreVote, + readOnly: newReadOnly(c.ReadOnlyOption), + disableProposalForwarding: c.DisableProposalForwarding, + disableConfChangeValidation: c.DisableConfChangeValidation, + stepDownOnRemoval: c.StepDownOnRemoval, + resumeReplicateBelowPendingSnapshot: c.ResumeReplicateBelowPendingSnapshot, } cfg, prs, err := confchange.Restore(confchange.Changer{ @@ -1478,10 +1496,8 @@ func stepLeader(r *raft, m pb.Message) error { switch { case pr.State == tracker.StateProbe: pr.BecomeReplicate() - case pr.State == tracker.StateSnapshot && pr.Match >= pr.PendingSnapshot: - // TODO(tbg): we should also enter this branch if a snapshot is - // received that is below pr.PendingSnapshot but which makes it - // possible to use the log again. + case pr.State == tracker.StateSnapshot && (pr.Match >= pr.PendingSnapshot || + r.resumeReplicateBelowPendingSnapshot && pr.Match >= max(0, r.raftLog.firstIndex()-1)): r.logger.Debugf("%x recovered from needing snapshot, resumed sending replication messages to %x [%s]", r.id, m.From, pr) // Transition back to replicating state via probing state // (which takes the snapshot into account). If we didn't @@ -1560,10 +1576,6 @@ func stepLeader(r *raft, m pb.Message) error { if pr.State != tracker.StateSnapshot { return nil } - // TODO(tbg): this code is very similar to the snapshot handling in - // MsgAppResp above. In fact, the code there is more correct than the - // code here and should likely be updated to match (or even better, the - // logic pulled into a newly created Progress state machine handler). if !m.Reject { pr.BecomeProbe() r.logger.Debugf("%x snapshot succeeded, resumed sending replication messages to %x [%s]", r.id, m.From, pr) diff --git a/rafttest/interaction_env_handler_add_nodes.go b/rafttest/interaction_env_handler_add_nodes.go index e68a295f..b4e4fe23 100644 --- a/rafttest/interaction_env_handler_add_nodes.go +++ b/rafttest/interaction_env_handler_add_nodes.go @@ -69,6 +69,8 @@ func (env *InteractionEnv) handleAddNodes(t *testing.T, d datadriven.TestData) e } case "step-down-on-removal": arg.Scan(t, i, &cfg.StepDownOnRemoval) + case "resume-replicate-below-pending-snapshot": + arg.Scan(t, i, &cfg.ResumeReplicateBelowPendingSnapshot) } } } diff --git a/testdata/snapshot_succeed_via_app_resp_behind.txt b/testdata/snapshot_succeed_via_app_resp_behind.txt new file mode 100644 index 00000000..89b94574 --- /dev/null +++ b/testdata/snapshot_succeed_via_app_resp_behind.txt @@ -0,0 +1,139 @@ +# This is a variant of snapshot_reject_via_app_resp in which the snapshot that +# is being sent is behind the PendingSnapshot index tracked by the leader. The +# test verifies that the leader will move the follower back to StateReplicate +# when ResumeReplicateBelowPendingSnapshot is enabled, since it is able to catch +# up the follower from the snapshot's index. + +# Turn off output during the setup of the test. +log-level none +---- +ok + +# Start with three nodes, but the third is disconnected from the log. +# Allow resuming replication below PendingSnapshot. +add-nodes 2 voters=(1,2,3) index=10 resume-replicate-below-pending-snapshot=true +---- +ok + +add-nodes 1 voters=(1,2,3) index=5 +---- +ok + +campaign 1 +---- +ok + +process-ready 1 +---- +ok + +stabilize 2 3 +---- +ok + +# Now we have a leader, but it hasn't appended the empty entry yet. +# Initiate a snapshot from 1 to 3, which will be at the initial index, 10. +send-snapshot 1 3 +---- +ok + +# 1 and 2 commit the empty entry. +stabilize 1 2 +---- +ok + +# They also commit an additional entry. The leader's last index +# is then strictly non-adjacent to index 10 (of the snapshot). +propose 1 "foo" +---- +ok + +stabilize 1 2 +---- +ok + +log-level debug +---- +ok + +status 1 +---- +1: StateReplicate match=12 next=13 +2: StateReplicate match=12 next=13 +3: StateProbe match=0 next=11 paused inactive + +# n3 gets the first MsgApp the leader originally sent, trying to append entry +# 11 but this is rejected because the follower's log would start at index 5. +deliver-msgs 3 type=MsgApp +---- +1->3 MsgApp Term:1 Log:1/10 Commit:10 Entries:[1/11 EntryNormal ""] +DEBUG 3 [logterm: 0, index: 10] rejected MsgApp [logterm: 1, index: 10] from 1 + +# Note that the RejectionHint is 5, which is below the first index 10 of the +# leader. Once the leader receives this, it will move 3 into StateSnapshot +# with PendingSnapshot=lastIndex=12. +process-ready 3 +---- +Ready MustSync=false: +Lead:1 State:StateFollower +Messages: +3->1 MsgAppResp Term:1 Log:1/10 Rejected (Hint: 5) + +# 3 receives the snapshot, but doesn't handle it yet. +deliver-msgs 3 +---- +1->3 MsgSnap Term:1 Log:0/0 Snapshot: Index:10 Term:1 ConfState:Voters:[1 2 3] VotersOutgoing:[] Learners:[] LearnersNext:[] AutoLeave:false +INFO log [committed=5, applied=5, applying=5, unstable.offset=6, unstable.offsetInProgress=6, len(unstable.Entries)=0] starts to restore snapshot [index: 10, term: 1] +INFO 3 switched to configuration voters=(1 2 3) +INFO 3 [commit: 10, lastindex: 10, lastterm: 1] restored snapshot [index: 10, term: 1] +INFO 3 [commit: 10] restored snapshot [index: 10, term: 1] + + +# 1 sees the rejection and asks for a snapshot at index 12 (which is 1's current +# last index). +stabilize 1 +---- +> 1 receiving messages + 3->1 MsgAppResp Term:1 Log:1/10 Rejected (Hint: 5) + DEBUG 1 received MsgAppResp(rejected, hint: (index 5, term 1)) from 3 for index 10 + DEBUG 1 decreased progress of 3 to [StateProbe match=0 next=6] + DEBUG 1 [firstindex: 11, commit: 12] sent snapshot[index: 12, term: 1] to 3 [StateProbe match=0 next=6] + DEBUG 1 paused sending replication messages to 3 [StateSnapshot match=0 next=6 paused pendingSnap=12] +> 1 handling Ready + Ready MustSync=false: + Messages: + 1->3 MsgSnap Term:1 Log:0/0 Snapshot: Index:12 Term:1 ConfState:Voters:[1 2 3] VotersOutgoing:[] Learners:[] LearnersNext:[] AutoLeave:false + +# Drop the extra MsgSnap(index=12) that 1 just sent, to keep the test tidy. +deliver-msgs drop=(3) +---- +dropped: 1->3 MsgSnap Term:1 Log:0/0 Snapshot: Index:12 Term:1 ConfState:Voters:[1 2 3] VotersOutgoing:[] Learners:[] LearnersNext:[] AutoLeave:false + +# 3 sends the affirmative MsgAppResp that resulted from applying the snapshot +# at index 10. +stabilize 3 +---- +> 3 handling Ready + Ready MustSync=false: + HardState Term:1 Vote:1 Commit:10 + Snapshot Index:10 Term:1 ConfState:Voters:[1 2 3] VotersOutgoing:[] Learners:[] LearnersNext:[] AutoLeave:false + Messages: + 3->1 MsgAppResp Term:1 Log:0/10 + +stabilize 1 +---- +> 1 receiving messages + 3->1 MsgAppResp Term:1 Log:0/10 + DEBUG 1 recovered from needing snapshot, resumed sending replication messages to 3 [StateSnapshot match=10 next=11 paused pendingSnap=12] +> 1 handling Ready + Ready MustSync=false: + Messages: + 1->3 MsgApp Term:1 Log:1/10 Commit:12 Entries:[1/11 EntryNormal "", 1/12 EntryNormal "\"foo\""] + +# 3 is in StateReplicate thanks to receiving the snapshot at index 10. +# This is despite its PendingSnapshot having been 12. +status 1 +---- +1: StateReplicate match=12 next=13 +2: StateReplicate match=12 next=13 +3: StateReplicate match=10 next=13 inflight=1 diff --git a/tracker/progress.go b/tracker/progress.go index 5948fadf..04bcff75 100644 --- a/tracker/progress.go +++ b/tracker/progress.go @@ -42,11 +42,23 @@ type Progress struct { // before and stops sending any replication message. State StateType - // PendingSnapshot is used in StateSnapshot. - // If there is a pending snapshot, the pendingSnapshot will be set to the - // index of the snapshot. If pendingSnapshot is set, the replication process of - // this Progress will be paused. raft will not resend snapshot until the pending one - // is reported to be failed. + // PendingSnapshot is used in StateSnapshot and tracks the last index of the + // leader at the time at which it realized a snapshot was necessary. This + // matches the index in the MsgSnap message emitted from raft. + // + // While there is a pending snapshot, replication to the follower is paused. + // The follower will transition back to StateReplicate if the leader + // receives an MsgAppResp from it at or above PendingSnapshot, or if + // ResumeReplicateBelowPendingSnapshot is enabled, one that reconnects the + // follower to the leader's log (such an MsgAppResp is emitted when the + // follower applies a snapshot). + // + // The follower will transition to StateProbe if ReportSnapshot is called on + // the leader; if SnapshotFinish is passed then PendingSnapshot becomes the + // basis for the next attempt to append. In practice, the first mechanism is + // the one that is relevant in most cases. However, if this MsgAppResp is + // lost (fallible network) then the second mechanism ensures that in this + // case the follower does not erroneously remain in StateSnapshot. PendingSnapshot uint64 // RecentActive is true if the progress is recently active. Receiving any messages