Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

release-24.3: raft: assert on HardState field modifications #133581

Merged
merged 4 commits into from
Oct 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/client_raft_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6188,7 +6188,7 @@ func TestRaftForceCampaignPreVoteCheckQuorum(t *testing.T) {

// Force-campaign n3. It may not win or hold onto leadership, but it's enough
// to know that it bumped the term.
repl3.ForceCampaign(ctx)
repl3.ForceCampaign(ctx, initialStatus.BasicStatus)
t.Logf("n3 campaigning")

var leaderStatus *raft.Status
Expand Down
4 changes: 2 additions & 2 deletions pkg/kv/kvserver/helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -340,10 +340,10 @@ func (r *Replica) Campaign(ctx context.Context) {
}

// ForceCampaign force-campaigns the replica.
func (r *Replica) ForceCampaign(ctx context.Context) {
func (r *Replica) ForceCampaign(ctx context.Context, raftStatus raft.BasicStatus) {
r.mu.Lock()
defer r.mu.Unlock()
r.forceCampaignLocked(ctx)
r.forceCampaignLocked(ctx, raftStatus)
}

// LastAssignedLeaseIndexRLocked is like LastAssignedLeaseIndex, but requires
Expand Down
25 changes: 21 additions & 4 deletions pkg/kv/kvserver/replica_raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -1311,7 +1311,7 @@ func (r *Replica) handleRaftReadyRaftMuLocked(
raftStatus := raftGroup.BasicStatus()
if shouldCampaignAfterConfChange(ctx, r.store.ClusterSettings(), r.store.StoreID(),
r.descRLocked(), raftStatus, leaseStatus) {
r.forceCampaignLocked(ctx)
r.forceCampaignLocked(ctx, raftStatus)
}
}

Expand Down Expand Up @@ -2600,11 +2600,28 @@ func (r *Replica) campaignLocked(ctx context.Context) {
// simply partitioned away from it and/or liveness.
//
// TODO(nvanbenschoten): this is the remaining logic which needs work in order
// to complete #125254. See the comment in raft.go about how even a local
// to complete #129796. See the comment in raft.go about how even a local
// fortification check is not enough to make MsgTimeoutNow safe.
func (r *Replica) forceCampaignLocked(ctx context.Context) {
func (r *Replica) forceCampaignLocked(ctx context.Context, raftStatus raft.BasicStatus) {
log.VEventf(ctx, 3, "force campaigning")
msg := raftpb.Message{To: raftpb.PeerID(r.replicaID), Type: raftpb.MsgTimeoutNow}
msg := raftpb.Message{
To: raftpb.PeerID(r.replicaID),
// We pretend that the message was sent from the leader, who we know is set
// in the status because we checked in shouldCampaignAfterConfChange.
//
// As the TODO above implies, this is a hack and is borderline unsafe. Only
// the leader should be able to send a MsgTimeoutNow, because a "force"
// election gives the candidate permission to tell all voters to violate the
// leader's fortification. In this case, we lie to raft about this message
// coming from the leader, which in some ways it is because the leader is
// the one committing the entry which demotes itself to a learner.
//
// We should find a way to get rid of this, perhaps through the approach
// presented in #133308.
From: raftStatus.Lead,
Term: raftStatus.Term,
Type: raftpb.MsgTimeoutNow,
}
if err := r.mu.internalRaftGroup.Step(msg); err != nil {
log.VEventf(ctx, 1, "failed to campaign: %s", err)
}
Expand Down
131 changes: 79 additions & 52 deletions pkg/raft/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -351,8 +351,6 @@ type raft struct {
// leadEpoch, if set, corresponds to the StoreLiveness epoch that this peer
// has supported the leader in. It's unset if the peer hasn't supported the
// current leader.
//
// TODO(arul): This should be populated when responding to a MsgFortify.
leadEpoch pb.Epoch
// leadTransferee, if set, is the id of the leader transfer target during a
// pending leadership transfer. The value is set while the outgoing leader
Expand Down Expand Up @@ -444,7 +442,6 @@ func newRaft(c *Config) *raft {

r := &raft{
id: c.ID,
lead: None,
isLearner: false,
raftLog: raftlog,
maxMsgSize: entryEncodingSize(c.MaxSizePerMsg),
Expand Down Expand Up @@ -818,7 +815,7 @@ func (r *raft) sendFortify(to pb.PeerID) {
// Doing so avoids a self-addressed message.
epoch, live := r.storeLiveness.SupportFor(r.lead)
if live {
r.leadEpoch = epoch
r.setLeadEpoch(epoch)
// The leader needs to persist the LeadEpoch durably before it can start
// supporting itself. We do so by sending a self-addressed
// MsgFortifyLeaderResp message so that it is added to the msgsAfterAppend
Expand Down Expand Up @@ -1013,10 +1010,7 @@ func (r *raft) reset(term uint64) {
// de-fortification.
assertTrue(!r.supportingFortifiedLeader() || r.lead == r.id,
"should not be changing terms when supporting a fortified leader; leader exempted")
r.Term = term
r.Vote = None
r.lead = None
r.leadEpoch = 0
r.setTerm(term)
}

r.electionElapsed = 0
Expand All @@ -1043,6 +1037,51 @@ func (r *raft) reset(term uint64) {
r.uncommittedSize = 0
}

func (r *raft) setTerm(term uint64) {
if term == r.Term {
return
}
assertTrue(term > r.Term, "term cannot regress")
r.Term = term
r.Vote = None
r.lead = None
r.leadEpoch = 0
}

func (r *raft) setVote(id pb.PeerID) {
if id == r.Vote {
return
}
assertTrue(r.Vote == None, "cannot change vote")
r.Vote = id
}

func (r *raft) setLead(lead pb.PeerID) {
if lead == r.lead {
return
}
assertTrue(r.lead == None, "cannot change lead")
r.lead = lead
}

func (r *raft) resetLead() {
r.lead = None
r.leadEpoch = 0
}

func (r *raft) setLeadEpoch(leadEpoch pb.Epoch) {
if leadEpoch == r.leadEpoch {
return
}
assertTrue(r.lead != None, "leader must be set")
assertTrue(leadEpoch > r.leadEpoch, "leadEpoch cannot regress")
r.leadEpoch = leadEpoch
}

func (r *raft) resetLeadEpoch() {
r.leadEpoch = 0
}

func (r *raft) appendEntry(es ...pb.Entry) (accepted bool) {
last := r.raftLog.lastEntryID()
for i := range es {
Expand Down Expand Up @@ -1194,7 +1233,7 @@ func (r *raft) becomeFollower(term uint64, lead pb.PeerID) {
r.step = stepFollower
r.reset(term)
r.tick = r.tickElection
r.lead = lead
r.setLead(lead)
r.state = pb.StateFollower
r.logger.Infof("%x became follower at term %d", r.id, r.Term)
}
Expand All @@ -1207,7 +1246,7 @@ func (r *raft) becomeCandidate() {
r.step = stepCandidate
r.reset(r.Term + 1)
r.tick = r.tickElection
r.Vote = r.id
r.setVote(r.id)
r.state = pb.StateCandidate
r.logger.Infof("%x became candidate at term %d", r.id, r.Term)
}
Expand All @@ -1230,8 +1269,7 @@ func (r *raft) becomePreCandidate() {
// leader leases, this is fine, because we wouldn't be here unless we'd
// revoked StoreLiveness support for the leader's store to begin with. It's
// a bit weird from the perspective of raft though. See if we can avoid this.
r.lead = None
r.leadEpoch = 0
r.resetLead()
r.state = pb.StatePreCandidate
r.logger.Infof("%x became pre-candidate at term %d", r.id, r.Term)
}
Expand All @@ -1249,7 +1287,7 @@ func (r *raft) becomeLeader() {
// and not even when learning of a leader in a later term.
r.fortificationTracker.Reset(r.Term)
r.tick = r.tickHeartbeat
r.lead = r.id
r.setLead(r.id)
r.state = pb.StateLeader
// Followers enter replicate mode when they've been successfully probed
// (perhaps after having received a snapshot as a result). The leader is
Expand Down Expand Up @@ -1602,7 +1640,7 @@ func (r *raft) Step(m pb.Message) error {
if m.Type == pb.MsgVote {
// Only record real votes.
r.electionElapsed = 0
r.Vote = m.From
r.setVote(m.From)
}
} else {
r.logger.Infof("%x [logterm: %d, index: %d, vote: %x] rejected %s from %x [logterm: %d, index: %d] at term %d",
Expand Down Expand Up @@ -1997,6 +2035,13 @@ func stepLeader(r *raft, m pb.Message) error {
// stepCandidate is shared by StateCandidate and StatePreCandidate; the difference is
// whether they respond to MsgVoteResp or MsgPreVoteResp.
func stepCandidate(r *raft, m pb.Message) error {
if IsMsgFromLeader(m.Type) {
// If this is a message from a leader of r.Term, transition to a follower
// with the sender of the message as the leader, then process the message.
assertTrue(m.Term == r.Term, "message term should equal current term")
r.becomeFollower(m.Term, m.From)
return r.step(r, m) // stepFollower
}
// Only handle vote responses corresponding to our candidacy (while in
// StateCandidate, we may get stale MsgPreVoteResp messages in this term from
// our pre-candidate state).
Expand All @@ -2010,18 +2055,11 @@ func stepCandidate(r *raft, m pb.Message) error {
case pb.MsgProp:
r.logger.Infof("%x no leader at term %d; dropping proposal", r.id, r.Term)
return ErrProposalDropped
case pb.MsgApp:
r.becomeFollower(m.Term, m.From) // always m.Term == r.Term
r.handleAppendEntries(m)
case pb.MsgHeartbeat:
r.becomeFollower(m.Term, m.From) // always m.Term == r.Term
r.handleHeartbeat(m)
case pb.MsgSnap:
r.becomeFollower(m.Term, m.From) // always m.Term == r.Term
// TODO(nvanbenschoten): we can't consider MsgSnap to be from the leader of
// Message.Term until we address #127348 and #127349.
r.becomeFollower(m.Term, None)
r.handleSnapshot(m)
case pb.MsgFortifyLeader:
r.becomeFollower(m.Term, m.From) // always m.Term == r.Term
r.handleFortify(m)
case myVoteRespType:
gr, rj, res := r.poll(m.From, m.Type, !m.Reject)
r.logger.Infof("%x has received %d %s votes and %d vote rejections", r.id, gr, m.Type, rj)
Expand All @@ -2039,17 +2077,20 @@ func stepCandidate(r *raft, m pb.Message) error {
// m.Term > r.Term; reuse r.Term
r.becomeFollower(r.Term, r.lead)
}
case pb.MsgTimeoutNow:
r.becomeFollower(m.Term, m.From) // always m.Term == r.Term
// TODO(nvanbenschoten): this is temporarily duplicating logic from
// stepFollower. Unify.
r.logger.Infof("%x [term %d] received MsgTimeoutNow from %x and starts an election to get leadership", r.id, r.Term, m.From)
r.hup(campaignTransfer)
}
return nil
}

func stepFollower(r *raft, m pb.Message) error {
if IsMsgFromLeader(m.Type) {
r.setLead(m.From)
if m.Type != pb.MsgDeFortifyLeader {
// If we receive any message from the leader except a MsgDeFortifyLeader,
// we know that the leader is still alive and still acting as the leader,
// so reset the election timer.
r.electionElapsed = 0
}
}
switch m.Type {
case pb.MsgProp:
if r.lead == None {
Expand All @@ -2065,25 +2106,12 @@ func stepFollower(r *raft, m pb.Message) error {
m.To = r.lead
r.send(m)
case pb.MsgApp:
r.electionElapsed = 0
// TODO(arul): Once r.lead != None, we shouldn't need to update r.lead
// anymore within the course of a single term (in the context of which this
// function is always called). Instead, if r.lead != None, we should be able
// to assert that the leader hasn't changed within a given term. Maybe at
// the caller itself.
r.lead = m.From
r.handleAppendEntries(m)
case pb.MsgHeartbeat:
r.electionElapsed = 0
r.lead = m.From
r.handleHeartbeat(m)
case pb.MsgSnap:
r.electionElapsed = 0
r.lead = m.From
r.handleSnapshot(m)
case pb.MsgFortifyLeader:
r.electionElapsed = 0
r.lead = m.From
r.handleFortify(m)
case pb.MsgDeFortifyLeader:
r.handleDeFortify(m)
Expand All @@ -2110,8 +2138,7 @@ func stepFollower(r *raft, m pb.Message) error {
return nil
}
r.logger.Infof("%x forgetting leader %x at term %d", r.id, r.lead, r.Term)
r.lead = None
r.leadEpoch = 0
r.resetLead()
case pb.MsgTimeoutNow:
// TODO(nvanbenschoten): we will eventually want some kind of logic like
// this. However, even this may not be enough, because we're calling a
Expand All @@ -2132,7 +2159,7 @@ func stepFollower(r *raft, m pb.Message) error {
// be able to replace this leadEpoch assignment with a call to deFortify.
// Currently, it may panic because only the leader should be able to
// de-fortify without bumping the term.
r.leadEpoch = 0
r.resetLeadEpoch()
r.hup(campaignTransfer)
}
return nil
Expand Down Expand Up @@ -2300,7 +2327,7 @@ func (r *raft) handleFortify(m pb.Message) {
})
return
}
r.leadEpoch = epoch
r.setLeadEpoch(epoch)
r.send(pb.Message{
To: m.From,
Type: pb.MsgFortifyLeaderResp,
Expand Down Expand Up @@ -2362,7 +2389,7 @@ func (r *raft) deFortify(from pb.PeerID, term uint64) {
(term == r.Term && from == r.id && !r.supportingFortifiedLeader()),
"can only defortify at current term if told by the leader or if fortification has expired",
)
r.leadEpoch = 0
r.resetLeadEpoch()
}

// restore recovers the state machine from a snapshot. It restores the log and the
Expand Down Expand Up @@ -2568,10 +2595,10 @@ func (r *raft) loadState(state pb.HardState) {
r.logger.Panicf("%x state.commit %d is out of range [%d, %d]", r.id, state.Commit, r.raftLog.committed, r.raftLog.lastIndex())
}
r.raftLog.committed = state.Commit
r.Term = state.Term
r.Vote = state.Vote
r.lead = state.Lead
r.leadEpoch = state.LeadEpoch
r.setTerm(state.Term)
r.setVote(state.Vote)
r.setLead(state.Lead)
r.setLeadEpoch(state.LeadEpoch)
}

// pastElectionTimeout returns true if r.electionElapsed is greater
Expand Down
2 changes: 1 addition & 1 deletion pkg/raft/raft_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2615,7 +2615,7 @@ func TestRestoreFromSnapMsg(t *testing.T) {
sm := newTestRaft(2, 10, 1, newTestMemoryStorage(withPeers(1, 2)))
sm.Step(m)

assert.Equal(t, pb.PeerID(1), sm.lead)
assert.Equal(t, None, sm.lead)
// TODO(bdarnell): what should this test?
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/raft/rawnode_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func TestRawNodeStep(t *testing.T) {
rawNode, err := NewRawNode(newTestConfig(1, 10, 1, s))
require.NoError(t, err, "#%d", i)
msgt := pb.MessageType(i)
err = rawNode.Step(pb.Message{Type: msgt})
err = rawNode.Step(pb.Message{Type: msgt, From: 2})
// LocalMsg should be ignored.
if IsLocalMsg(msgt) {
assert.Equal(t, ErrStepLocalMsg, err, "#%d", i)
Expand Down
6 changes: 4 additions & 2 deletions pkg/raft/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,10 @@ var isResponseMsg = [...]bool{
}

var isMsgFromLeader = [...]bool{
pb.MsgApp: true,
pb.MsgSnap: true,
pb.MsgApp: true,
// TODO(nvanbenschoten): we can't consider MsgSnap to be from the leader of
// Message.Term until we address #127348 and #127349.
// pb.MsgSnap: true,
pb.MsgHeartbeat: true,
pb.MsgTimeoutNow: true,
pb.MsgFortifyLeader: true,
Expand Down
2 changes: 1 addition & 1 deletion pkg/raft/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ func TestMsgFromLeader(t *testing.T) {
{pb.MsgAppResp, false},
{pb.MsgVote, false},
{pb.MsgVoteResp, false},
{pb.MsgSnap, true},
{pb.MsgSnap, false},
{pb.MsgHeartbeat, true},
{pb.MsgHeartbeatResp, false},
{pb.MsgTimeoutNow, true},
Expand Down