diff --git a/pkg/kv/kvserver/client_raft_test.go b/pkg/kv/kvserver/client_raft_test.go index 15f8f17b6f5a..bdf8433f16aa 100644 --- a/pkg/kv/kvserver/client_raft_test.go +++ b/pkg/kv/kvserver/client_raft_test.go @@ -6188,7 +6188,7 @@ func TestRaftForceCampaignPreVoteCheckQuorum(t *testing.T) { // Force-campaign n3. It may not win or hold onto leadership, but it's enough // to know that it bumped the term. - repl3.ForceCampaign(ctx) + repl3.ForceCampaign(ctx, initialStatus.BasicStatus) t.Logf("n3 campaigning") var leaderStatus *raft.Status diff --git a/pkg/kv/kvserver/helpers_test.go b/pkg/kv/kvserver/helpers_test.go index f62e0c113a20..6dca122a3f37 100644 --- a/pkg/kv/kvserver/helpers_test.go +++ b/pkg/kv/kvserver/helpers_test.go @@ -340,10 +340,10 @@ func (r *Replica) Campaign(ctx context.Context) { } // ForceCampaign force-campaigns the replica. -func (r *Replica) ForceCampaign(ctx context.Context) { +func (r *Replica) ForceCampaign(ctx context.Context, raftStatus raft.BasicStatus) { r.mu.Lock() defer r.mu.Unlock() - r.forceCampaignLocked(ctx) + r.forceCampaignLocked(ctx, raftStatus) } // LastAssignedLeaseIndexRLocked is like LastAssignedLeaseIndex, but requires diff --git a/pkg/kv/kvserver/replica_raft.go b/pkg/kv/kvserver/replica_raft.go index ddffd1b587cc..65cd4ea8f7f3 100644 --- a/pkg/kv/kvserver/replica_raft.go +++ b/pkg/kv/kvserver/replica_raft.go @@ -1311,7 +1311,7 @@ func (r *Replica) handleRaftReadyRaftMuLocked( raftStatus := raftGroup.BasicStatus() if shouldCampaignAfterConfChange(ctx, r.store.ClusterSettings(), r.store.StoreID(), r.descRLocked(), raftStatus, leaseStatus) { - r.forceCampaignLocked(ctx) + r.forceCampaignLocked(ctx, raftStatus) } } @@ -2600,11 +2600,28 @@ func (r *Replica) campaignLocked(ctx context.Context) { // simply partitioned away from it and/or liveness. // // TODO(nvanbenschoten): this is the remaining logic which needs work in order -// to complete #125254. See the comment in raft.go about how even a local +// to complete #129796. See the comment in raft.go about how even a local // fortification check is not enough to make MsgTimeoutNow safe. -func (r *Replica) forceCampaignLocked(ctx context.Context) { +func (r *Replica) forceCampaignLocked(ctx context.Context, raftStatus raft.BasicStatus) { log.VEventf(ctx, 3, "force campaigning") - msg := raftpb.Message{To: raftpb.PeerID(r.replicaID), Type: raftpb.MsgTimeoutNow} + msg := raftpb.Message{ + To: raftpb.PeerID(r.replicaID), + // We pretend that the message was sent from the leader, who we know is set + // in the status because we checked in shouldCampaignAfterConfChange. + // + // As the TODO above implies, this is a hack and is borderline unsafe. Only + // the leader should be able to send a MsgTimeoutNow, because a "force" + // election gives the candidate permission to tell all voters to violate the + // leader's fortification. In this case, we lie to raft about this message + // coming from the leader, which in some ways it is because the leader is + // the one committing the entry which demotes itself to a learner. + // + // We should find a way to get rid of this, perhaps through the approach + // presented in #133308. + From: raftStatus.Lead, + Term: raftStatus.Term, + Type: raftpb.MsgTimeoutNow, + } if err := r.mu.internalRaftGroup.Step(msg); err != nil { log.VEventf(ctx, 1, "failed to campaign: %s", err) } diff --git a/pkg/raft/raft.go b/pkg/raft/raft.go index 119e3b51292c..c60fef529ca6 100644 --- a/pkg/raft/raft.go +++ b/pkg/raft/raft.go @@ -351,8 +351,6 @@ type raft struct { // leadEpoch, if set, corresponds to the StoreLiveness epoch that this peer // has supported the leader in. It's unset if the peer hasn't supported the // current leader. - // - // TODO(arul): This should be populated when responding to a MsgFortify. leadEpoch pb.Epoch // leadTransferee, if set, is the id of the leader transfer target during a // pending leadership transfer. The value is set while the outgoing leader @@ -444,7 +442,6 @@ func newRaft(c *Config) *raft { r := &raft{ id: c.ID, - lead: None, isLearner: false, raftLog: raftlog, maxMsgSize: entryEncodingSize(c.MaxSizePerMsg), @@ -818,7 +815,7 @@ func (r *raft) sendFortify(to pb.PeerID) { // Doing so avoids a self-addressed message. epoch, live := r.storeLiveness.SupportFor(r.lead) if live { - r.leadEpoch = epoch + r.setLeadEpoch(epoch) // The leader needs to persist the LeadEpoch durably before it can start // supporting itself. We do so by sending a self-addressed // MsgFortifyLeaderResp message so that it is added to the msgsAfterAppend @@ -1013,10 +1010,7 @@ func (r *raft) reset(term uint64) { // de-fortification. assertTrue(!r.supportingFortifiedLeader() || r.lead == r.id, "should not be changing terms when supporting a fortified leader; leader exempted") - r.Term = term - r.Vote = None - r.lead = None - r.leadEpoch = 0 + r.setTerm(term) } r.electionElapsed = 0 @@ -1043,6 +1037,51 @@ func (r *raft) reset(term uint64) { r.uncommittedSize = 0 } +func (r *raft) setTerm(term uint64) { + if term == r.Term { + return + } + assertTrue(term > r.Term, "term cannot regress") + r.Term = term + r.Vote = None + r.lead = None + r.leadEpoch = 0 +} + +func (r *raft) setVote(id pb.PeerID) { + if id == r.Vote { + return + } + assertTrue(r.Vote == None, "cannot change vote") + r.Vote = id +} + +func (r *raft) setLead(lead pb.PeerID) { + if lead == r.lead { + return + } + assertTrue(r.lead == None, "cannot change lead") + r.lead = lead +} + +func (r *raft) resetLead() { + r.lead = None + r.leadEpoch = 0 +} + +func (r *raft) setLeadEpoch(leadEpoch pb.Epoch) { + if leadEpoch == r.leadEpoch { + return + } + assertTrue(r.lead != None, "leader must be set") + assertTrue(leadEpoch > r.leadEpoch, "leadEpoch cannot regress") + r.leadEpoch = leadEpoch +} + +func (r *raft) resetLeadEpoch() { + r.leadEpoch = 0 +} + func (r *raft) appendEntry(es ...pb.Entry) (accepted bool) { last := r.raftLog.lastEntryID() for i := range es { @@ -1194,7 +1233,7 @@ func (r *raft) becomeFollower(term uint64, lead pb.PeerID) { r.step = stepFollower r.reset(term) r.tick = r.tickElection - r.lead = lead + r.setLead(lead) r.state = pb.StateFollower r.logger.Infof("%x became follower at term %d", r.id, r.Term) } @@ -1207,7 +1246,7 @@ func (r *raft) becomeCandidate() { r.step = stepCandidate r.reset(r.Term + 1) r.tick = r.tickElection - r.Vote = r.id + r.setVote(r.id) r.state = pb.StateCandidate r.logger.Infof("%x became candidate at term %d", r.id, r.Term) } @@ -1230,8 +1269,7 @@ func (r *raft) becomePreCandidate() { // leader leases, this is fine, because we wouldn't be here unless we'd // revoked StoreLiveness support for the leader's store to begin with. It's // a bit weird from the perspective of raft though. See if we can avoid this. - r.lead = None - r.leadEpoch = 0 + r.resetLead() r.state = pb.StatePreCandidate r.logger.Infof("%x became pre-candidate at term %d", r.id, r.Term) } @@ -1249,7 +1287,7 @@ func (r *raft) becomeLeader() { // and not even when learning of a leader in a later term. r.fortificationTracker.Reset(r.Term) r.tick = r.tickHeartbeat - r.lead = r.id + r.setLead(r.id) r.state = pb.StateLeader // Followers enter replicate mode when they've been successfully probed // (perhaps after having received a snapshot as a result). The leader is @@ -1602,7 +1640,7 @@ func (r *raft) Step(m pb.Message) error { if m.Type == pb.MsgVote { // Only record real votes. r.electionElapsed = 0 - r.Vote = m.From + r.setVote(m.From) } } else { r.logger.Infof("%x [logterm: %d, index: %d, vote: %x] rejected %s from %x [logterm: %d, index: %d] at term %d", @@ -1997,6 +2035,13 @@ func stepLeader(r *raft, m pb.Message) error { // stepCandidate is shared by StateCandidate and StatePreCandidate; the difference is // whether they respond to MsgVoteResp or MsgPreVoteResp. func stepCandidate(r *raft, m pb.Message) error { + if IsMsgFromLeader(m.Type) { + // If this is a message from a leader of r.Term, transition to a follower + // with the sender of the message as the leader, then process the message. + assertTrue(m.Term == r.Term, "message term should equal current term") + r.becomeFollower(m.Term, m.From) + return r.step(r, m) // stepFollower + } // Only handle vote responses corresponding to our candidacy (while in // StateCandidate, we may get stale MsgPreVoteResp messages in this term from // our pre-candidate state). @@ -2010,18 +2055,11 @@ func stepCandidate(r *raft, m pb.Message) error { case pb.MsgProp: r.logger.Infof("%x no leader at term %d; dropping proposal", r.id, r.Term) return ErrProposalDropped - case pb.MsgApp: - r.becomeFollower(m.Term, m.From) // always m.Term == r.Term - r.handleAppendEntries(m) - case pb.MsgHeartbeat: - r.becomeFollower(m.Term, m.From) // always m.Term == r.Term - r.handleHeartbeat(m) case pb.MsgSnap: - r.becomeFollower(m.Term, m.From) // always m.Term == r.Term + // TODO(nvanbenschoten): we can't consider MsgSnap to be from the leader of + // Message.Term until we address #127348 and #127349. + r.becomeFollower(m.Term, None) r.handleSnapshot(m) - case pb.MsgFortifyLeader: - r.becomeFollower(m.Term, m.From) // always m.Term == r.Term - r.handleFortify(m) case myVoteRespType: gr, rj, res := r.poll(m.From, m.Type, !m.Reject) r.logger.Infof("%x has received %d %s votes and %d vote rejections", r.id, gr, m.Type, rj) @@ -2039,17 +2077,20 @@ func stepCandidate(r *raft, m pb.Message) error { // m.Term > r.Term; reuse r.Term r.becomeFollower(r.Term, r.lead) } - case pb.MsgTimeoutNow: - r.becomeFollower(m.Term, m.From) // always m.Term == r.Term - // TODO(nvanbenschoten): this is temporarily duplicating logic from - // stepFollower. Unify. - r.logger.Infof("%x [term %d] received MsgTimeoutNow from %x and starts an election to get leadership", r.id, r.Term, m.From) - r.hup(campaignTransfer) } return nil } func stepFollower(r *raft, m pb.Message) error { + if IsMsgFromLeader(m.Type) { + r.setLead(m.From) + if m.Type != pb.MsgDeFortifyLeader { + // If we receive any message from the leader except a MsgDeFortifyLeader, + // we know that the leader is still alive and still acting as the leader, + // so reset the election timer. + r.electionElapsed = 0 + } + } switch m.Type { case pb.MsgProp: if r.lead == None { @@ -2065,25 +2106,12 @@ func stepFollower(r *raft, m pb.Message) error { m.To = r.lead r.send(m) case pb.MsgApp: - r.electionElapsed = 0 - // TODO(arul): Once r.lead != None, we shouldn't need to update r.lead - // anymore within the course of a single term (in the context of which this - // function is always called). Instead, if r.lead != None, we should be able - // to assert that the leader hasn't changed within a given term. Maybe at - // the caller itself. - r.lead = m.From r.handleAppendEntries(m) case pb.MsgHeartbeat: - r.electionElapsed = 0 - r.lead = m.From r.handleHeartbeat(m) case pb.MsgSnap: - r.electionElapsed = 0 - r.lead = m.From r.handleSnapshot(m) case pb.MsgFortifyLeader: - r.electionElapsed = 0 - r.lead = m.From r.handleFortify(m) case pb.MsgDeFortifyLeader: r.handleDeFortify(m) @@ -2110,8 +2138,7 @@ func stepFollower(r *raft, m pb.Message) error { return nil } r.logger.Infof("%x forgetting leader %x at term %d", r.id, r.lead, r.Term) - r.lead = None - r.leadEpoch = 0 + r.resetLead() case pb.MsgTimeoutNow: // TODO(nvanbenschoten): we will eventually want some kind of logic like // this. However, even this may not be enough, because we're calling a @@ -2132,7 +2159,7 @@ func stepFollower(r *raft, m pb.Message) error { // be able to replace this leadEpoch assignment with a call to deFortify. // Currently, it may panic because only the leader should be able to // de-fortify without bumping the term. - r.leadEpoch = 0 + r.resetLeadEpoch() r.hup(campaignTransfer) } return nil @@ -2300,7 +2327,7 @@ func (r *raft) handleFortify(m pb.Message) { }) return } - r.leadEpoch = epoch + r.setLeadEpoch(epoch) r.send(pb.Message{ To: m.From, Type: pb.MsgFortifyLeaderResp, @@ -2362,7 +2389,7 @@ func (r *raft) deFortify(from pb.PeerID, term uint64) { (term == r.Term && from == r.id && !r.supportingFortifiedLeader()), "can only defortify at current term if told by the leader or if fortification has expired", ) - r.leadEpoch = 0 + r.resetLeadEpoch() } // restore recovers the state machine from a snapshot. It restores the log and the @@ -2568,10 +2595,10 @@ func (r *raft) loadState(state pb.HardState) { r.logger.Panicf("%x state.commit %d is out of range [%d, %d]", r.id, state.Commit, r.raftLog.committed, r.raftLog.lastIndex()) } r.raftLog.committed = state.Commit - r.Term = state.Term - r.Vote = state.Vote - r.lead = state.Lead - r.leadEpoch = state.LeadEpoch + r.setTerm(state.Term) + r.setVote(state.Vote) + r.setLead(state.Lead) + r.setLeadEpoch(state.LeadEpoch) } // pastElectionTimeout returns true if r.electionElapsed is greater diff --git a/pkg/raft/raft_test.go b/pkg/raft/raft_test.go index ec5b091448f6..c7461e34496d 100644 --- a/pkg/raft/raft_test.go +++ b/pkg/raft/raft_test.go @@ -2615,7 +2615,7 @@ func TestRestoreFromSnapMsg(t *testing.T) { sm := newTestRaft(2, 10, 1, newTestMemoryStorage(withPeers(1, 2))) sm.Step(m) - assert.Equal(t, pb.PeerID(1), sm.lead) + assert.Equal(t, None, sm.lead) // TODO(bdarnell): what should this test? } diff --git a/pkg/raft/rawnode_test.go b/pkg/raft/rawnode_test.go index 97ba8b7c6988..f6be2cd5f0b0 100644 --- a/pkg/raft/rawnode_test.go +++ b/pkg/raft/rawnode_test.go @@ -49,7 +49,7 @@ func TestRawNodeStep(t *testing.T) { rawNode, err := NewRawNode(newTestConfig(1, 10, 1, s)) require.NoError(t, err, "#%d", i) msgt := pb.MessageType(i) - err = rawNode.Step(pb.Message{Type: msgt}) + err = rawNode.Step(pb.Message{Type: msgt, From: 2}) // LocalMsg should be ignored. if IsLocalMsg(msgt) { assert.Equal(t, ErrStepLocalMsg, err, "#%d", i) diff --git a/pkg/raft/util.go b/pkg/raft/util.go index dfad989b062b..f8d085cb1d94 100644 --- a/pkg/raft/util.go +++ b/pkg/raft/util.go @@ -50,8 +50,10 @@ var isResponseMsg = [...]bool{ } var isMsgFromLeader = [...]bool{ - pb.MsgApp: true, - pb.MsgSnap: true, + pb.MsgApp: true, + // TODO(nvanbenschoten): we can't consider MsgSnap to be from the leader of + // Message.Term until we address #127348 and #127349. + // pb.MsgSnap: true, pb.MsgHeartbeat: true, pb.MsgTimeoutNow: true, pb.MsgFortifyLeader: true, diff --git a/pkg/raft/util_test.go b/pkg/raft/util_test.go index a15a91ed564f..2dba8990920d 100644 --- a/pkg/raft/util_test.go +++ b/pkg/raft/util_test.go @@ -165,7 +165,7 @@ func TestMsgFromLeader(t *testing.T) { {pb.MsgAppResp, false}, {pb.MsgVote, false}, {pb.MsgVoteResp, false}, - {pb.MsgSnap, true}, + {pb.MsgSnap, false}, {pb.MsgHeartbeat, true}, {pb.MsgHeartbeatResp, false}, {pb.MsgTimeoutNow, true},