Skip to content

Commit

Permalink
kv: campaign on rejected lease request when leader not live in node l…
Browse files Browse the repository at this point in the history
…iveness

Fixes #84655.
Related to #49220.

This commit extends the logic introduced in 8aa1c14 to simultaneously campaign
for Raft leadership when rejecting a lease request on a Raft follower that
observes that the current Raft leader is not live according to node liveness.
These kinds of cases are often associated with asymmetric network partitions.

In such cases, the Raft leader will be unable to acquire an epoch-based lease
until it heartbeats its liveness record. As a result, the range can only regain
availability if a different replica acquires the lease. However, the protection
added in 8aa1c14 prevents followers from acquiring leases to protect against a
different form of unavailability.

After this commit, the followers will attempt to acquire Raft leadership when
detecting such cases by campaigning. This allows these ranges to recover
availability once Raft leadership moves off the partitioned Raft leader to one
of the followers that can reach node liveness and can subsequently acquire the
lease.

Campaigning for Raft leadership is safer than blindly allowing the lease request
to be proposed (through a redirected proposal). This is because the follower may
be arbitrarily far behind on its Raft log and acquiring the lease in such cases
could cause unavailability (the kind we saw in #37906). By instead calling a
Raft pre-vote election, the follower can determine whether it is behind on its
log without risking disruption. If so, we don't want it to acquire the lease —
one of the other followers that is caught up on its log can. If not, it will
eventually become leader and can proceed with a future attempt to acquire the
lease.

The commit adds a test that reproduces the failure mode described in #84655. It
creates an asymmetric network partition scenario that looks like:
```
        [0]       raft leader / initial leaseholder
         ^
        / \
       /   \
      v     v
    [1]<--->[2]   raft followers
      ^     ^
       \   /
        \ /
         v
        [3]       liveness range
```
It then waits for the raft leader's lease to expire and demonstrates that one of
the raft followers will now call a Raft election, which allows it to safely grab
Raft leadership, acquire the lease, and recover availability. Without the change,
the test failed.

Release justification: None. Too risky for the stability period. Potential
backport candidate after sufficient baking on master.

Release note (bug fix): A bug causing ranges to remain without a leaseholder in
cases of asymmetric network partitions has been resolved.
  • Loading branch information
nvanbenschoten authored and Nathan Stilwell committed Feb 2, 2023
1 parent d34e860 commit fbecbbc
Show file tree
Hide file tree
Showing 7 changed files with 441 additions and 38 deletions.
193 changes: 163 additions & 30 deletions pkg/kv/kvserver/client_raft_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1297,6 +1297,140 @@ func TestRequestsOnLaggingReplica(t *testing.T) {
require.Equal(t, leaderReplicaID, nlhe.LeaseHolder.ReplicaID)
}

// TestRequestsOnFollowerWithNonLiveLeaseholder tests the availability of a
// range that has an expired epoch-based lease and a live Raft leader that is
// unable to heartbeat its liveness record. Such a range should recover once
// Raft leadership moves off the partitioned Raft leader to one of the followers
// that can reach node liveness.
//
// This test relies on follower replicas campaigning for Raft leadership in
// certain cases when refusing to forward lease acquisition requests to the
// leader. In these cases where they determine that the leader is non-live
// according to node liveness, they will attempt to steal Raft leadership and,
// if successful, will be able to perform future lease acquisition attempts.
func TestRequestsOnFollowerWithNonLiveLeaseholder(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
ctx := context.Background()

var installPartition int32
partitionFilter := func(_ context.Context, ba roachpb.BatchRequest) *roachpb.Error {
if atomic.LoadInt32(&installPartition) == 0 {
return nil
}
if ba.GatewayNodeID == 1 && ba.Replica.NodeID == 4 {
return roachpb.NewError(context.Canceled)
}
return nil
}

clusterArgs := base.TestClusterArgs{
ReplicationMode: base.ReplicationManual,
ServerArgs: base.TestServerArgs{
// Reduce the election timeout some to speed up the test.
RaftConfig: base.RaftConfig{RaftElectionTimeoutTicks: 10},
Knobs: base.TestingKnobs{
NodeLiveness: kvserver.NodeLivenessTestingKnobs{
// This test waits for an epoch-based lease to expire, so we're
// setting the liveness duration as low as possible while still
// keeping the test stable.
LivenessDuration: 3000 * time.Millisecond,
RenewalDuration: 1500 * time.Millisecond,
},
Store: &kvserver.StoreTestingKnobs{
// We eliminate clock offsets in order to eliminate the stasis period
// of leases, in order to speed up the test.
MaxOffset: time.Nanosecond,
TestingRequestFilter: partitionFilter,
},
},
},
}

tc := testcluster.StartTestCluster(t, 4, clusterArgs)
defer tc.Stopper().Stop(ctx)

{
// Move the liveness range to node 4.
desc := tc.LookupRangeOrFatal(t, keys.NodeLivenessPrefix)
tc.RebalanceVoterOrFatal(ctx, t, desc.StartKey.AsRawKey(), tc.Target(0), tc.Target(3))
}

// Create a new range.
_, rngDesc, err := tc.Servers[0].ScratchRangeEx()
require.NoError(t, err)
key := rngDesc.StartKey.AsRawKey()
// Add replicas on all the stores.
tc.AddVotersOrFatal(t, rngDesc.StartKey.AsRawKey(), tc.Target(1), tc.Target(2))

// Store 0 holds the lease.
store0 := tc.GetFirstStoreFromServer(t, 0)
store0Repl, err := store0.GetReplica(rngDesc.RangeID)
require.NoError(t, err)
leaseStatus := store0Repl.CurrentLeaseStatus(ctx)
require.True(t, leaseStatus.OwnedBy(store0.StoreID()))

{
// Write a value so that the respective key is present in all stores and we
// can increment it again later.
_, err := tc.Server(0).DB().Inc(ctx, key, 1)
require.NoError(t, err)
log.Infof(ctx, "test: waiting for initial values...")
tc.WaitForValues(t, key, []int64{1, 1, 1, 0})
log.Infof(ctx, "test: waiting for initial values... done")
}

// Begin dropping all node liveness heartbeats from the original raft leader
// while allowing the leader to maintain Raft leadership and otherwise behave
// normally. This mimics cases where the raft leader is partitioned away from
// the liveness range but can otherwise reach its followers. In these cases,
// it is still possible that the followers can reach the liveness range and
// see that the leader becomes non-live. For example, the configuration could
// look like:
//
// [0] raft leader
// ^
// / \
// / \
// v v
// [1]<--->[2] raft followers
// ^ ^
// \ /
// \ /
// v
// [3] liveness range
//
log.Infof(ctx, "test: partitioning node")
atomic.StoreInt32(&installPartition, 1)

// Wait until the lease expires.
log.Infof(ctx, "test: waiting for lease expiration")
testutils.SucceedsSoon(t, func() error {
leaseStatus = store0Repl.CurrentLeaseStatus(ctx)
if leaseStatus.IsValid() {
return errors.New("lease still valid")
}
return nil
})
log.Infof(ctx, "test: lease expired")

{
// Increment the initial value again, which requires range availability. To
// get there, the request will need to trigger a lease request on a follower
// replica, which will call a Raft election, acquire Raft leadership, then
// acquire the range lease.
_, err := tc.Server(0).DB().Inc(ctx, key, 1)
require.NoError(t, err)
log.Infof(ctx, "test: waiting for new lease...")
tc.WaitForValues(t, key, []int64{2, 2, 2, 0})
log.Infof(ctx, "test: waiting for new lease... done")
}

// Store 0 no longer holds the lease.
leaseStatus = store0Repl.CurrentLeaseStatus(ctx)
require.False(t, leaseStatus.OwnedBy(store0.StoreID()))
}

type fakeSnapshotStream struct {
nextReq *kvserverpb.SnapshotRequest
nextErr error
Expand Down Expand Up @@ -1529,11 +1663,11 @@ func TestReplicateAfterRemoveAndSplit(t *testing.T) {
// Test that when a Raft group is not able to establish a quorum, its Raft log
// does not grow without bound. It tests two different scenarios where this used
// to be possible (see #27772):
// 1. The leader proposes a command and cannot establish a quorum. The leader
// continually re-proposes the command.
// 2. The follower proposes a command and forwards it to the leader, who cannot
// establish a quorum. The follower continually re-proposes and forwards the
// command to the leader.
// 1. The leader proposes a command and cannot establish a quorum. The leader
// continually re-proposes the command.
// 2. The follower proposes a command and forwards it to the leader, who cannot
// establish a quorum. The follower continually re-proposes and forwards the
// command to the leader.
func TestLogGrowthWhenRefreshingPendingCommands(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
Expand Down Expand Up @@ -4930,25 +5064,25 @@ func TestAckWriteBeforeApplication(t *testing.T) {
//
// Given this behavior there are 4 troubling cases with regards to splits.
//
// * In all cases we begin with s1 processing a presplit snapshot for
// r20. After the split the store should have r21/3.
// - In all cases we begin with s1 processing a presplit snapshot for
// r20. After the split the store should have r21/3.
//
// In the first two cases the following occurs:
//
// * s1 receives a message for r21/3 prior to acquiring the split lock
// in r21. This will create an uninitialized r21/3 which may write
// HardState.
// - s1 receives a message for r21/3 prior to acquiring the split lock
// in r21. This will create an uninitialized r21/3 which may write
// HardState.
//
// * Before the r20 processes the split r21 is removed and re-added to
// s1 as r21/4. s1 receives a raft message destined for r21/4 and proceeds
// to destroy its uninitialized r21/3, laying down a tombstone at 4 in the
// process.
// - Before the r20 processes the split r21 is removed and re-added to
// s1 as r21/4. s1 receives a raft message destined for r21/4 and proceeds
// to destroy its uninitialized r21/3, laying down a tombstone at 4 in the
// process.
//
// (1) s1 processes the split and finds the RHS to be an uninitialized replica
// with a higher replica ID.
// (1) s1 processes the split and finds the RHS to be an uninitialized replica
// with a higher replica ID.
//
// (2) s1 crashes before processing the split, forgetting the replica ID of the
// RHS but retaining its tombstone.
// (2) s1 crashes before processing the split, forgetting the replica ID of the
// RHS but retaining its tombstone.
//
// In both cases we know that the RHS could not have committed anything because
// it cannot have gotten a snapshot but we want to be sure to not synthesize a
Expand All @@ -4957,28 +5091,27 @@ func TestAckWriteBeforeApplication(t *testing.T) {
//
// In the third and fourth cases:
//
// * s1 never receives a message for r21/3.
// - s1 never receives a message for r21/3.
//
// * Before the r20 processes the split r21 is removed and re-added to
// s1 as r21/4. s1 receives a raft message destined for r21/4 and has never
// heard about r21/3.
// - Before the r20 processes the split r21 is removed and re-added to
// s1 as r21/4. s1 receives a raft message destined for r21/4 and has never
// heard about r21/3.
//
// (3) s1 processes the split and finds the RHS to be an uninitialized replica
// with a higher replica ID (but without a tombstone). This case is very
// similar to (1)
// (3) s1 processes the split and finds the RHS to be an uninitialized replica
// with a higher replica ID (but without a tombstone). This case is very
// similar to (1)
//
// (4) s1 crashes still before processing the split, forgetting that it had
// known about r21/4. When it reboots r21/4 is totally partitioned and
// r20 becomes unpartitioned.
// (4) s1 crashes still before processing the split, forgetting that it had
// known about r21/4. When it reboots r21/4 is totally partitioned and
// r20 becomes unpartitioned.
//
// * r20 processes the split successfully and initialized r21/3.
// - r20 processes the split successfully and initialized r21/3.
//
// In the 4th case we find that until we unpartition r21/4 (the RHS) and let it
// learn about its removal with a ReplicaTooOldError that it will be initialized
// with a CommitIndex at 10 as r21/3, the split's value. After r21/4 becomes
// unpartitioned it will learn it is removed by either catching up on its
// its log or receiving a ReplicaTooOldError which will lead to a tombstone.
//
func TestProcessSplitAfterRightHandSideHasBeenRemoved(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
Expand Down
28 changes: 24 additions & 4 deletions pkg/kv/kvserver/replica_proposal_buf.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/clusterversion"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/closedts/tracker"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/util/errorutil"
Expand Down Expand Up @@ -117,15 +118,14 @@ type propBuf struct {
}

type rangeLeaderInfo struct {
// iAmTheLeader is set if the local replica is the leader.
iAmTheLeader bool
// leaderKnown is set if the local Raft machinery knows who the leader is. If
// not set, all other fields are empty.
leaderKnown bool

// leader represents the Raft group's leader. Not set if leaderKnown is not
// set.
leader roachpb.ReplicaID
// iAmTheLeader is set if the local replica is the leader.
iAmTheLeader bool
// leaderEligibleForLease is set if the leader is known and its type of
// replica allows it to acquire a lease.
leaderEligibleForLease bool
Expand All @@ -143,6 +143,7 @@ type proposer interface {
closedTimestampTarget() hlc.Timestamp
leaderStatus(ctx context.Context, raftGroup proposerRaft) rangeLeaderInfo
ownsValidLease(ctx context.Context, now hlc.ClockTimestamp) bool
shouldCampaignOnRedirect(raftGroup proposerRaft) bool

// The following require the proposer to hold an exclusive lock.
withGroupLocked(func(proposerRaft) error) error
Expand Down Expand Up @@ -170,6 +171,7 @@ type proposerRaft interface {
Step(raftpb.Message) error
BasicStatus() raft.BasicStatus
ProposeConfChange(raftpb.ConfChangeI) error
Campaign() error
}

// Init initializes the proposal buffer and binds it to the provided proposer.
Expand Down Expand Up @@ -596,6 +598,12 @@ func (b *propBuf) maybeRejectUnsafeProposalLocked(
log.VEventf(ctx, 2, "not proposing lease acquisition because we're not the leader; replica %d is",
li.leader)
b.p.rejectProposalWithRedirectLocked(ctx, p, li.leader)
if b.p.shouldCampaignOnRedirect(raftGroup) {
log.VEventf(ctx, 2, "campaigning because Raft leader not live in node liveness map")
if err := raftGroup.Campaign(); err != nil {
log.VEventf(ctx, 1, "failed to campaign: %s", err)
}
}
return true
}
// If the leader is not known, or if it is known but it's ineligible
Expand Down Expand Up @@ -1125,9 +1133,9 @@ func (rp *replicaProposer) leaderStatus(
}
}
return rangeLeaderInfo{
iAmTheLeader: iAmTheLeader,
leaderKnown: leaderKnown,
leader: roachpb.ReplicaID(leader),
iAmTheLeader: iAmTheLeader,
leaderEligibleForLease: leaderEligibleForLease,
}
}
Expand All @@ -1136,6 +1144,18 @@ func (rp *replicaProposer) ownsValidLease(ctx context.Context, now hlc.ClockTime
return (*Replica)(rp).ownsValidLeaseRLocked(ctx, now)
}

func (rp *replicaProposer) shouldCampaignOnRedirect(raftGroup proposerRaft) bool {
r := (*Replica)(rp)
livenessMap, _ := r.store.livenessMap.Load().(liveness.IsLiveMap)
return shouldCampaignOnLeaseRequestRedirect(
raftGroup.BasicStatus(),
livenessMap,
r.descRLocked(),
r.requiresExpiringLeaseRLocked(),
r.store.Clock().Now(),
)
}

// rejectProposalWithRedirectLocked is part of the proposer interface.
func (rp *replicaProposer) rejectProposalWithRedirectLocked(
ctx context.Context, prop *ProposalData, redirectTo roachpb.ReplicaID,
Expand Down
Loading

0 comments on commit fbecbbc

Please sign in to comment.