Skip to content

Commit

Permalink
Merge #104969
Browse files Browse the repository at this point in the history
104969: kvserver: force-campaign leaseholder on leader removal r=erikgrinaker a=erikgrinaker

**kvserver: clarify Raft campaign behavior with PreVote+CheckQuorum**

This patch tweaks and clarifies explicit campaign behavior when Raft PreVote and CheckQuorum are enabled. In this case, followers will only grant prevotes if they haven't heard from a leader in the past election timeout. It also adds a test covering this.
  
**kvserver: add `Replica.forceCampaignLocked()`**

This patch adds `forceCampaignLocked()`, which can be used to force an election, transitioning the replica directly to candidate and bypassing PreVote+CheckQuorum.
  
**kvserver: force-campaign leaseholder on leader removal**

Previously, when the leader was removed from the range via a conf change, the first voter in the range descriptor would campaign to avoid waiting for an election timeout. This had a few drawbacks:

* If the first voter is unavailable or lags, noone will campaign.

* If the first voter isn't the leaseholder, it has to immediately transfer leadership to the leaseholder.

* It used Raft PreVote, so it wouldn't be able to win when CheckQuorum is enabled, since followers won't grant votes when they've recently heard from the leader.

This patch instead campaigns on the current leaseholder. We know there can only be one, avoiding election ties. The conf change is typically proposed by the leaseholder anyway so it's likely to be up-to-date. And we want it to be colocated with the leader. If there is no leaseholder then waiting out the election timeout is less problematic, since either we'll have to wait out the lease interval anyway, or the range is idle.

It also forces an election by transitioning directly to candidate, bypassing PreVote. This is ok, since we know the current leader is dead.


To avoid election ties in mixed 23.1/23.2 clusters, we retain the old voter designation until the upgrade is finalized, but always force an election instead of using pre-vote.

Resolves #104871.
Touches #92088.
Follows #104189.
Epic: none.

Release note: None

Co-authored-by: Erik Grinaker <[email protected]>
  • Loading branch information
craig[bot] and erikgrinaker committed Jun 16, 2023
2 parents 41a817f + 2959dda commit cf4e6d1
Show file tree
Hide file tree
Showing 6 changed files with 373 additions and 33 deletions.
3 changes: 2 additions & 1 deletion pkg/base/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,8 @@ var (
// partition from stealing away leadership from an established leader
// (assuming they have an up-to-date log, which they do with a read-only
// workload). With asymmetric or partial network partitions, this can allow an
// unreachable node to steal away leadership, leading to range unavailability.
// unreachable node to steal leadership away from the leaseholder, leading to
// range unavailability if the leaseholder can no longer reach the leader.
//
// The asymmetric partition concerns have largely been addressed by RPC
// dialback (see rpc.dialback.enabled), but the partial partition concerns
Expand Down
258 changes: 258 additions & 0 deletions pkg/kv/kvserver/client_raft_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5933,6 +5933,161 @@ func TestRaftSnapshotsWithMVCCRangeKeysEverywhere(t *testing.T) {
}
}

// TestRaftCampaignPreVoteCheckQuorum tests that campaignLocked() respects
// PreVote+CheckQuorum, by not granting prevotes if there is an active leader.
func TestRaftCampaignPreVoteCheckQuorum(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

// Timing-sensitive, so skip under deadlock detector and stressrace.
skip.UnderDeadlock(t)
skip.UnderStressRace(t)

ctx := context.Background()

tc := testcluster.StartTestCluster(t, 3, base.TestClusterArgs{
ReplicationMode: base.ReplicationManual,
ServerArgs: base.TestServerArgs{
RaftConfig: base.RaftConfig{
RaftEnableCheckQuorum: true,
RaftTickInterval: 100 * time.Millisecond, // speed up test
},
},
})
defer tc.Stopper().Stop(ctx)

logStatus := func(s *raft.Status) {
t.Helper()
require.NotNil(t, s)
t.Logf("n%d %s at term=%d commit=%d", s.ID, s.RaftState, s.Term, s.Commit)
}

sender := tc.GetFirstStoreFromServer(t, 0).TestSender()

// Create a range, upreplicate it, and replicate a write.
key := tc.ScratchRange(t)
desc := tc.AddVotersOrFatal(t, key, tc.Targets(1, 2)...)
_, pErr := kv.SendWrapped(ctx, sender, incrementArgs(key, 1))
require.NoError(t, pErr.GoError())
tc.WaitForValues(t, key, []int64{1, 1, 1})

repl1, err := tc.GetFirstStoreFromServer(t, 0).GetReplica(desc.RangeID)
require.NoError(t, err)
repl2, err := tc.GetFirstStoreFromServer(t, 1).GetReplica(desc.RangeID)
require.NoError(t, err)
repl3, err := tc.GetFirstStoreFromServer(t, 2).GetReplica(desc.RangeID)
require.NoError(t, err)
repls := []*kvserver.Replica{repl1, repl2, repl3}

// Make sure n1 is leader.
initialStatus := repl1.RaftStatus()
require.Equal(t, raft.StateLeader, initialStatus.RaftState)
logStatus(initialStatus)
t.Logf("n1 is leader")

// Campaign n3. It shouldn't win prevotes, reverting to follower
// in the current term.
repl3.Campaign(ctx)
t.Logf("n3 campaigning")

require.Eventually(t, func() bool {
status := repl3.RaftStatus()
logStatus(status)
return status.RaftState == raft.StateFollower
}, 10*time.Second, 500*time.Millisecond)
t.Logf("n3 reverted to follower")

// n1 should still be the leader in the same term, with n2 and n3 followers.
for _, repl := range repls {
st := repl.RaftStatus()
logStatus(st)
if st.ID == 1 {
require.Equal(t, raft.StateLeader, st.RaftState)
} else {
require.Equal(t, raft.StateFollower, st.RaftState)
}
require.Equal(t, initialStatus.Term, st.Term)
}
}

// TestRaftForceCampaignPreVoteCheckQuorum tests that forceCampaignLocked()
// ignores PreVote+CheckQuorum, transitioning directly to candidate and bumping
// the term. It may not actually win or hold onto leadership, but bumping the
// term is proof enough that it called an election.
func TestRaftForceCampaignPreVoteCheckQuorum(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

// Timing-sensitive, so skip under deadlock detector and stressrace.
skip.UnderDeadlock(t)
skip.UnderStressRace(t)

ctx := context.Background()

tc := testcluster.StartTestCluster(t, 3, base.TestClusterArgs{
ReplicationMode: base.ReplicationManual,
ServerArgs: base.TestServerArgs{
RaftConfig: base.RaftConfig{
RaftEnableCheckQuorum: true,
RaftTickInterval: 200 * time.Millisecond, // speed up test
RaftHeartbeatIntervalTicks: 10, // allow n3 to win the election
RaftElectionTimeoutTicks: 20,
},
},
})
defer tc.Stopper().Stop(ctx)

logStatus := func(s *raft.Status) {
t.Helper()
require.NotNil(t, s)
t.Logf("n%d %s at term=%d commit=%d", s.ID, s.RaftState, s.Term, s.Commit)
}

sender := tc.GetFirstStoreFromServer(t, 0).TestSender()

// Create a range, upreplicate it, and replicate a write.
key := tc.ScratchRange(t)
desc := tc.AddVotersOrFatal(t, key, tc.Targets(1, 2)...)
_, pErr := kv.SendWrapped(ctx, sender, incrementArgs(key, 1))
require.NoError(t, pErr.GoError())
tc.WaitForValues(t, key, []int64{1, 1, 1})

repl1, err := tc.GetFirstStoreFromServer(t, 0).GetReplica(desc.RangeID)
require.NoError(t, err)
repl2, err := tc.GetFirstStoreFromServer(t, 2).GetReplica(desc.RangeID)
require.NoError(t, err)
repl3, err := tc.GetFirstStoreFromServer(t, 2).GetReplica(desc.RangeID)
require.NoError(t, err)
repls := []*kvserver.Replica{repl1, repl2, repl3}

// Make sure n1 is leader.
initialStatus := repl1.RaftStatus()
require.Equal(t, raft.StateLeader, initialStatus.RaftState)
logStatus(initialStatus)
t.Logf("n1 is leader in term %d", initialStatus.Term)

// Force-campaign n3. It may not win or hold onto leadership, but it's enough
// to know that it bumped the term.
repl3.ForceCampaign(ctx)
t.Logf("n3 campaigning")

var leaderStatus *raft.Status
require.Eventually(t, func() bool {
for _, repl := range repls {
st := repl.RaftStatus()
logStatus(st)
if st.Term <= initialStatus.Term {
return false
}
if st.RaftState == raft.StateLeader {
leaderStatus = st
}
}
return leaderStatus != nil
}, 10*time.Second, 500*time.Millisecond)
t.Logf("n%d is leader, with bumped term %d", leaderStatus.ID, leaderStatus.Term)
}

// TestRaftPreVote tests that Raft PreVote works properly, including the recent
// leader check only enabled via CheckQuorum. Specifically, a replica that's
// partitioned away from the leader (or restarted) should not be able to call an
Expand Down Expand Up @@ -6324,3 +6479,106 @@ func TestRaftCheckQuorum(t *testing.T) {
})
})
}

// TestRaftLeaderRemovesItself tests that when a raft leader removes itself via
// a conf change the leaseholder campaigns for leadership, ignoring
// PreVote+CheckQuorum and transitioning directly to candidate.
//
// We set up three replicas:
//
// n1: Raft leader
// n2: follower
// n3: follower + leaseholder
//
// We disable leader following the leaseholder (which would otherwise transfer
// leadership if we happened to tick during the conf change), and then remove n1
// from the range. n3 should acquire leadership.
//
// We disable election timeouts, such that the only way n3 can become leader is
// by campaigning explicitly. Furthermore, it must skip pre-votes, since with
// PreVote+CheckQuorum n2 wouldn't vote for it (it would think n1 was still the
// leader).
func TestRaftLeaderRemovesItself(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

// Timing-sensitive, so skip under deadlock detector and stressrace.
skip.UnderDeadlock(t)
skip.UnderStressRace(t)

ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second)
defer cancel()

tc := testcluster.StartTestCluster(t, 3, base.TestClusterArgs{
ReplicationMode: base.ReplicationManual,
ServerArgs: base.TestServerArgs{
RaftConfig: base.RaftConfig{
RaftEnableCheckQuorum: true,
RaftTickInterval: 100 * time.Millisecond, // speed up test
// Set a large election timeout. We don't want replicas to call
// elections due to timeouts, we want them to campaign and obtain
// votes despite PreVote+CheckQuorum.
RaftElectionTimeoutTicks: 200,
},
Knobs: base.TestingKnobs{
Store: &kvserver.StoreTestingKnobs{
DisableLeaderFollowsLeaseholder: true, // the leader should stay put
},
},
},
})
defer tc.Stopper().Stop(ctx)

logStatus := func(s *raft.Status) {
t.Helper()
require.NotNil(t, s)
t.Logf("n%d %s at term=%d commit=%d", s.ID, s.RaftState, s.Term, s.Commit)
}

send1 := tc.GetFirstStoreFromServer(t, 0).TestSender()
send3 := tc.GetFirstStoreFromServer(t, 2).TestSender()

// Create a range, upreplicate it, and replicate a write.
key := tc.ScratchRange(t)
desc := tc.AddVotersOrFatal(t, key, tc.Targets(1, 2)...)
_, pErr := kv.SendWrapped(ctx, send1, incrementArgs(key, 1))
require.NoError(t, pErr.GoError())
tc.WaitForValues(t, key, []int64{1, 1, 1})

repl1, err := tc.GetFirstStoreFromServer(t, 0).GetReplica(desc.RangeID)
require.NoError(t, err)
repl2, err := tc.GetFirstStoreFromServer(t, 1).GetReplica(desc.RangeID)
require.NoError(t, err)
repl3, err := tc.GetFirstStoreFromServer(t, 2).GetReplica(desc.RangeID)
require.NoError(t, err)

// Move the lease to n3, and make sure everyone has applied it.
tc.TransferRangeLeaseOrFatal(t, desc, tc.Target(2))
require.Eventually(t, func() bool {
lease, _ := repl3.GetLease()
return lease.Replica.ReplicaID == repl3.ReplicaID()
}, 10*time.Second, 500*time.Millisecond)
_, pErr = kv.SendWrapped(ctx, send3, incrementArgs(key, 1))
require.NoError(t, pErr.GoError())
tc.WaitForValues(t, key, []int64{2, 2, 2})
t.Logf("n3 has lease")

// Make sure n1 is still leader.
st := repl1.RaftStatus()
require.Equal(t, raft.StateLeader, st.RaftState)
logStatus(st)

// Remove n1 and wait for n3 to become leader.
tc.RemoveVotersOrFatal(t, key, tc.Target(0))
t.Logf("n1 removed from range")

require.Eventually(t, func() bool {
logStatus(repl2.RaftStatus())
logStatus(repl3.RaftStatus())
if repl3.RaftStatus().RaftState == raft.StateLeader {
t.Logf("n3 is leader")
return true
}
return false
}, 10*time.Second, 500*time.Millisecond)
}
14 changes: 14 additions & 0 deletions pkg/kv/kvserver/helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,20 @@ func (r *Replica) LastAssignedLeaseIndex() kvpb.LeaseAppliedIndex {
return r.mu.proposalBuf.LastAssignedLeaseIndexRLocked()
}

// Campaign campaigns the replica.
func (r *Replica) Campaign(ctx context.Context) {
r.mu.Lock()
defer r.mu.Unlock()
r.campaignLocked(ctx)
}

// ForceCampaign force-campaigns the replica.
func (r *Replica) ForceCampaign(ctx context.Context) {
r.mu.Lock()
defer r.mu.Unlock()
r.forceCampaignLocked(ctx)
}

// LastAssignedLeaseIndexRLocked is like LastAssignedLeaseIndex, but requires
// b.mu to be held in read mode.
func (b *propBuf) LastAssignedLeaseIndexRLocked() kvpb.LeaseAppliedIndex {
Expand Down
9 changes: 6 additions & 3 deletions pkg/kv/kvserver/replica_proposal_buf.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@ type proposer interface {
// The following require the proposer to hold an exclusive lock.
withGroupLocked(func(proposerRaft) error) error
registerProposalLocked(*ProposalData)
campaignLocked(ctx context.Context)
// rejectProposalWithRedirectLocked rejects a proposal and redirects the
// proposer to try it on another node. This is used to sometimes reject lease
// acquisitions when another replica is the leader; the intended consequence
Expand Down Expand Up @@ -670,9 +671,7 @@ func (b *propBuf) maybeRejectUnsafeProposalLocked(
b.p.rejectProposalWithRedirectLocked(ctx, p, li.leader)
if b.p.shouldCampaignOnRedirect(raftGroup) {
log.VEventf(ctx, 2, "campaigning because Raft leader not live in node liveness map")
if err := raftGroup.Campaign(); err != nil {
log.VEventf(ctx, 1, "failed to campaign: %s", err)
}
b.p.campaignLocked(ctx)
}
return true
}
Expand Down Expand Up @@ -1340,6 +1339,10 @@ func (rp *replicaProposer) shouldCampaignOnRedirect(raftGroup proposerRaft) bool
)
}

func (rp *replicaProposer) campaignLocked(ctx context.Context) {
(*Replica)(rp).campaignLocked(ctx)
}

func (rp *replicaProposer) flowControlHandle(ctx context.Context) kvflowcontrol.Handle {
handle, found := rp.mu.replicaFlowControlIntegration.handle()
if !found {
Expand Down
6 changes: 6 additions & 0 deletions pkg/kv/kvserver/replica_proposal_buf_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,12 @@ func (t *testProposer) shouldCampaignOnRedirect(raftGroup proposerRaft) bool {
return t.leaderNotLive
}

func (t *testProposer) campaignLocked(ctx context.Context) {
if err := t.raftGroup.Campaign(); err != nil {
panic(err)
}
}

func (t *testProposer) rejectProposalWithRedirectLocked(
_ context.Context, _ *ProposalData, redirectTo roachpb.ReplicaID,
) {
Expand Down
Loading

0 comments on commit cf4e6d1

Please sign in to comment.