diff --git a/raft.go b/raft.go index cb2c4f7d..bb16f559 100644 --- a/raft.go +++ b/raft.go @@ -283,6 +283,22 @@ type Config struct { // This behavior will become unconditional in the future. See: // https://github.com/etcd-io/raft/issues/83 StepDownOnRemoval bool + + // ResumeReplicateBelowPendingSnapshot allows the leader to resume replication + // to a follower (moving it to StateReplicate) if the follower is currently in + // StateSnapshot and we receive a MsgAppResp from it below the PendingSnapshot + // index but within the leader's log. In other words, the follower may apply a + // snapshot below the leader's PendingSnapshot, and the leader will resume + // replication as long as it connects the follower to the leader's log. + // + // Consider that complex systems may delegate the sending of snapshots to + // alternative datasources (i.e. not the leader). In such setups, it is + // difficult to manufacture a snapshot at a particular index requested by raft + // and the actual index may be ahead or behind. This should be okay, as long + // as the snapshot allows replication to resume. + // + // TODO(erikgrinaker): Consider making this the default behavior. + ResumeReplicateBelowPendingSnapshot bool } func (c *Config) validate() error { @@ -413,9 +429,10 @@ type raft struct { // randomizedElectionTimeout is a random number between // [electiontimeout, 2 * electiontimeout - 1]. It gets reset // when raft changes its state to follower or candidate. - randomizedElectionTimeout int - disableProposalForwarding bool - stepDownOnRemoval bool + randomizedElectionTimeout int + disableProposalForwarding bool + stepDownOnRemoval bool + resumeReplicateBelowPendingSnapshot bool tick func() step stepFunc @@ -440,22 +457,23 @@ func newRaft(c *Config) *raft { } r := &raft{ - id: c.ID, - lead: None, - isLearner: false, - raftLog: raftlog, - maxMsgSize: entryEncodingSize(c.MaxSizePerMsg), - maxUncommittedSize: entryPayloadSize(c.MaxUncommittedEntriesSize), - trk: tracker.MakeProgressTracker(c.MaxInflightMsgs, c.MaxInflightBytes), - electionTimeout: c.ElectionTick, - heartbeatTimeout: c.HeartbeatTick, - logger: c.Logger, - checkQuorum: c.CheckQuorum, - preVote: c.PreVote, - readOnly: newReadOnly(c.ReadOnlyOption), - disableProposalForwarding: c.DisableProposalForwarding, - disableConfChangeValidation: c.DisableConfChangeValidation, - stepDownOnRemoval: c.StepDownOnRemoval, + id: c.ID, + lead: None, + isLearner: false, + raftLog: raftlog, + maxMsgSize: entryEncodingSize(c.MaxSizePerMsg), + maxUncommittedSize: entryPayloadSize(c.MaxUncommittedEntriesSize), + trk: tracker.MakeProgressTracker(c.MaxInflightMsgs, c.MaxInflightBytes), + electionTimeout: c.ElectionTick, + heartbeatTimeout: c.HeartbeatTick, + logger: c.Logger, + checkQuorum: c.CheckQuorum, + preVote: c.PreVote, + readOnly: newReadOnly(c.ReadOnlyOption), + disableProposalForwarding: c.DisableProposalForwarding, + disableConfChangeValidation: c.DisableConfChangeValidation, + stepDownOnRemoval: c.StepDownOnRemoval, + resumeReplicateBelowPendingSnapshot: c.ResumeReplicateBelowPendingSnapshot, } cfg, trk, err := confchange.Restore(confchange.Changer{ @@ -1478,10 +1496,8 @@ func stepLeader(r *raft, m pb.Message) error { switch { case pr.State == tracker.StateProbe: pr.BecomeReplicate() - case pr.State == tracker.StateSnapshot && pr.Match >= pr.PendingSnapshot: - // TODO(tbg): we should also enter this branch if a snapshot is - // received that is below pr.PendingSnapshot but which makes it - // possible to use the log again. + case pr.State == tracker.StateSnapshot && (pr.Match >= pr.PendingSnapshot || + r.resumeReplicateBelowPendingSnapshot && pr.Match+1 >= r.raftLog.firstIndex()): r.logger.Debugf("%x recovered from needing snapshot, resumed sending replication messages to %x [%s]", r.id, m.From, pr) // Transition back to replicating state via probing state // (which takes the snapshot into account). If we didn't @@ -1560,10 +1576,6 @@ func stepLeader(r *raft, m pb.Message) error { if pr.State != tracker.StateSnapshot { return nil } - // TODO(tbg): this code is very similar to the snapshot handling in - // MsgAppResp above. In fact, the code there is more correct than the - // code here and should likely be updated to match (or even better, the - // logic pulled into a newly created Progress state machine handler). if !m.Reject { pr.BecomeProbe() r.logger.Debugf("%x snapshot succeeded, resumed sending replication messages to %x [%s]", r.id, m.From, pr) diff --git a/rafttest/interaction_env_handler_add_nodes.go b/rafttest/interaction_env_handler_add_nodes.go index e68a295f..b4e4fe23 100644 --- a/rafttest/interaction_env_handler_add_nodes.go +++ b/rafttest/interaction_env_handler_add_nodes.go @@ -69,6 +69,8 @@ func (env *InteractionEnv) handleAddNodes(t *testing.T, d datadriven.TestData) e } case "step-down-on-removal": arg.Scan(t, i, &cfg.StepDownOnRemoval) + case "resume-replicate-below-pending-snapshot": + arg.Scan(t, i, &cfg.ResumeReplicateBelowPendingSnapshot) } } } diff --git a/testdata/snapshot_succeed_via_app_resp_behind.txt b/testdata/snapshot_succeed_via_app_resp_behind.txt new file mode 100644 index 00000000..75b5cadb --- /dev/null +++ b/testdata/snapshot_succeed_via_app_resp_behind.txt @@ -0,0 +1,141 @@ +# This is a variant of snapshot_reject_via_app_resp in which the snapshot that +# is being sent is behind the PendingSnapshot index tracked by the leader. The +# test verifies that the leader will move the follower back to StateReplicate +# when ResumeReplicateBelowPendingSnapshot is enabled, since it is able to catch +# up the follower from the snapshot's index. +# +# Any changes to snapshot_reject_via_app_resp should also be made here. + +# Turn off output during the setup of the test. +log-level none +---- +ok + +# Start with three nodes, but the third is disconnected from the log. +# Allow resuming replication below PendingSnapshot. +add-nodes 2 voters=(1,2,3) index=10 resume-replicate-below-pending-snapshot=true +---- +ok + +add-nodes 1 voters=(1,2,3) index=5 +---- +ok + +campaign 1 +---- +ok + +process-ready 1 +---- +ok + +stabilize 2 3 +---- +ok + +# Now we have a leader, but it hasn't appended the empty entry yet. +# Initiate a snapshot from 1 to 3, which will be at the initial index, 10. +send-snapshot 1 3 +---- +ok + +# 1 and 2 commit the empty entry. +stabilize 1 2 +---- +ok + +# They also commit an additional entry. The leader's last index +# is then strictly non-adjacent to index 10 (of the snapshot). +propose 1 "foo" +---- +ok + +stabilize 1 2 +---- +ok + +log-level debug +---- +ok + +status 1 +---- +1: StateReplicate match=12 next=13 +2: StateReplicate match=12 next=13 +3: StateProbe match=0 next=11 paused inactive + +# n3 gets the first MsgApp the leader originally sent, trying to append entry +# 11 but this is rejected because the follower's log would start at index 5. +deliver-msgs 3 type=MsgApp +---- +1->3 MsgApp Term:1 Log:1/10 Commit:10 Entries:[1/11 EntryNormal ""] +DEBUG 3 [logterm: 0, index: 10] rejected MsgApp [logterm: 1, index: 10] from 1 + +# Note that the RejectionHint is 5, which is below the first index 10 of the +# leader. Once the leader receives this, it will move 3 into StateSnapshot +# with PendingSnapshot=lastIndex=12. +process-ready 3 +---- +Ready MustSync=false: +Lead:1 State:StateFollower +Messages: +3->1 MsgAppResp Term:1 Log:1/10 Rejected (Hint: 5) + +# 3 receives the snapshot, but doesn't handle it yet. +deliver-msgs 3 +---- +1->3 MsgSnap Term:1 Log:0/0 Snapshot: Index:10 Term:1 ConfState:Voters:[1 2 3] VotersOutgoing:[] Learners:[] LearnersNext:[] AutoLeave:false +INFO log [committed=5, applied=5, applying=5, unstable.offset=6, unstable.offsetInProgress=6, len(unstable.Entries)=0] starts to restore snapshot [index: 10, term: 1] +INFO 3 switched to configuration voters=(1 2 3) +INFO 3 [commit: 10, lastindex: 10, lastterm: 1] restored snapshot [index: 10, term: 1] +INFO 3 [commit: 10] restored snapshot [index: 10, term: 1] + + +# 1 sees the rejection and asks for a snapshot at index 12 (which is 1's current +# last index). +stabilize 1 +---- +> 1 receiving messages + 3->1 MsgAppResp Term:1 Log:1/10 Rejected (Hint: 5) + DEBUG 1 received MsgAppResp(rejected, hint: (index 5, term 1)) from 3 for index 10 + DEBUG 1 decreased progress of 3 to [StateProbe match=0 next=6] + DEBUG 1 [firstindex: 11, commit: 12] sent snapshot[index: 12, term: 1] to 3 [StateProbe match=0 next=6] + DEBUG 1 paused sending replication messages to 3 [StateSnapshot match=0 next=6 paused pendingSnap=12] +> 1 handling Ready + Ready MustSync=false: + Messages: + 1->3 MsgSnap Term:1 Log:0/0 Snapshot: Index:12 Term:1 ConfState:Voters:[1 2 3] VotersOutgoing:[] Learners:[] LearnersNext:[] AutoLeave:false + +# Drop the extra MsgSnap(index=12) that 1 just sent, to keep the test tidy. +deliver-msgs drop=(3) +---- +dropped: 1->3 MsgSnap Term:1 Log:0/0 Snapshot: Index:12 Term:1 ConfState:Voters:[1 2 3] VotersOutgoing:[] Learners:[] LearnersNext:[] AutoLeave:false + +# 3 sends the affirmative MsgAppResp that resulted from applying the snapshot +# at index 10. +stabilize 3 +---- +> 3 handling Ready + Ready MustSync=false: + HardState Term:1 Vote:1 Commit:10 + Snapshot Index:10 Term:1 ConfState:Voters:[1 2 3] VotersOutgoing:[] Learners:[] LearnersNext:[] AutoLeave:false + Messages: + 3->1 MsgAppResp Term:1 Log:0/10 + +stabilize 1 +---- +> 1 receiving messages + 3->1 MsgAppResp Term:1 Log:0/10 + DEBUG 1 recovered from needing snapshot, resumed sending replication messages to 3 [StateSnapshot match=10 next=11 paused pendingSnap=12] +> 1 handling Ready + Ready MustSync=false: + Messages: + 1->3 MsgApp Term:1 Log:1/10 Commit:12 Entries:[1/11 EntryNormal "", 1/12 EntryNormal "\"foo\""] + +# 3 is in StateReplicate thanks to receiving the snapshot at index 10. +# This is despite its PendingSnapshot having been 12. +status 1 +---- +1: StateReplicate match=12 next=13 +2: StateReplicate match=12 next=13 +3: StateReplicate match=10 next=13 inflight=1 diff --git a/tracker/progress.go b/tracker/progress.go index 5948fadf..4e8cbbb3 100644 --- a/tracker/progress.go +++ b/tracker/progress.go @@ -42,11 +42,26 @@ type Progress struct { // before and stops sending any replication message. State StateType - // PendingSnapshot is used in StateSnapshot. - // If there is a pending snapshot, the pendingSnapshot will be set to the - // index of the snapshot. If pendingSnapshot is set, the replication process of - // this Progress will be paused. raft will not resend snapshot until the pending one - // is reported to be failed. + // PendingSnapshot is used in StateSnapshot and tracks the last index of the + // leader at the time at which it realized a snapshot was necessary. This + // matches the index in the MsgSnap message emitted from raft. + // + // While there is a pending snapshot, replication to the follower is paused. + // The follower will transition back to StateReplicate if the leader receives + // an MsgAppResp from it that either: + // + // - is at or above PendingSnapshot + // - if ResumeReplicateBelowPendingSnapshot is enabled, reconnects the + // follower to the leader's log + // + // Such an MsgAppResp is emitted when the follower applies a snapshot. + // + // The follower will transition to StateProbe if ReportSnapshot is called on + // the leader; if SnapshotFinish is passed then PendingSnapshot becomes the + // basis for the next attempt to append. In practice, the first mechanism is + // the one that is relevant in most cases. However, if this MsgAppResp is + // lost (fallible network) then the second mechanism ensures that in this + // case the follower does not erroneously remain in StateSnapshot. PendingSnapshot uint64 // RecentActive is true if the progress is recently active. Receiving any messages