kv: campaign on rejected lease request when leader not live in node liveness

Fixes cockroachdb#84655.
Related to cockroachdb#49220.

This commit extends the logic introduced in 8aa1c14 to simultaneously campaign
for Raft leadership when rejecting a lease request on a Raft follower that
observes that the current Raft leader is not live according to node liveness.
These kinds of cases are often associated with asymmetric network partitions.

In such cases, the Raft leader will be unable to acquire an epoch-based lease
until it heartbeats its liveness record. As a result, the range can only regain
availability if a different replica acquires the lease. However, the protection
added in 8aa1c14 prevents followers from acquiring leases to protect against a
different form of unavailability.
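
To make that dependency concrete, here is a minimal Go sketch, using assumed and
heavily simplified types (not CockroachDB's actual lease or liveness records), of
why an epoch-based lease is only usable while its holder's liveness record is live:
```
package main

import (
	"fmt"
	"time"
)

// livenessRecord and epochLease are hypothetical, stripped-down stand-ins for
// the real node liveness and lease records; they exist only to illustrate the
// dependency described above.
type livenessRecord struct {
	epoch      int64
	expiration time.Time
}

type epochLease struct {
	holderNodeID int64
	epoch        int64
}

// leaseUsable captures the core point: an epoch-based lease is usable only
// while the holder's liveness record is live at the matching epoch. A leader
// that is partitioned away from the liveness range cannot heartbeat its
// record, so the record expires and the leader cannot hold or reacquire the
// lease; some other replica must.
func leaseUsable(l epochLease, liveness map[int64]livenessRecord, now time.Time) bool {
	rec, ok := liveness[l.holderNodeID]
	return ok && rec.epoch == l.epoch && now.Before(rec.expiration)
}

func main() {
	now := time.Now()
	liveness := map[int64]livenessRecord{
		// Node 1's liveness heartbeats are being dropped, so its record has expired.
		1: {epoch: 5, expiration: now.Add(-time.Second)},
	}
	fmt.Println(leaseUsable(epochLease{holderNodeID: 1, epoch: 5}, liveness, now)) // false
}
```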

After this commit, followers that detect such cases will campaign for Raft
leadership. This allows these ranges to recover
availability once Raft leadership moves off the partitioned Raft leader to one
of the followers that can reach node liveness and can subsequently acquire the
lease.

Campaigning for Raft leadership is safer than blindly allowing the lease request
to be proposed (through a redirected proposal). This is because the follower may
be arbitrarily far behind on its Raft log and acquiring the lease in such cases
could cause unavailability (the kind we saw in cockroachdb#37906). By instead calling a
Raft pre-vote election, the follower can determine whether it is behind on its
log without risking disruption. If so, we don't want it to acquire the lease —
one of the other followers that is caught up on its log can. If not, it will
eventually become leader and can proceed with a future attempt to acquire the
lease.
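
In code, the shape of that decision is roughly the following sketch. It is
illustrative only: the authoritative check added by this commit is the
shouldCampaignOnLeaseRequestRedirect helper wired into the proposal buffer in the
diff below, and the precise conditions there may differ.
```
package main

import "fmt"

// campaignOnRedirectSketch approximates when a follower that has just
// rejected (redirected) a lease acquisition request should also call a Raft
// pre-vote election. The parameter names and exact conditions are assumptions
// made for illustration.
func campaignOnRedirectSketch(
	isFollower bool, // lease requests are only redirected by followers
	leaderKnown bool, // redirecting requires a known leader
	usesEpochLease bool, // expiration-based leases don't depend on node liveness
	leaderLiveInLivenessMap bool,
) bool {
	if !isFollower || !leaderKnown || !usesEpochLease {
		return false
	}
	// Campaign only when node liveness says the leader is not live. Because
	// this is a pre-vote election, a follower with a short log cannot win and
	// cannot disturb a healthy leader, so campaigning here is safe.
	return !leaderLiveInLivenessMap
}

func main() {
	// A follower behind an asymmetric partition: the leader still holds Raft
	// leadership, but the follower sees it as non-live in node liveness.
	fmt.Println(campaignOnRedirectSketch(true, true, true, false)) // true
}
```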

The commit adds a test that reproduces the failure mode described in cockroachdb#84655. It
creates an asymmetric network partition scenario that looks like:
```
        [0]       raft leader / initial leaseholder
         ^
        / \
       /   \
      v     v
    [1]<--->[2]   raft followers
      ^     ^
       \   /
        \ /
         v
        [3]       liveness range
```
It then waits for the Raft leader's lease to expire and demonstrates that one of
the Raft followers will now call a Raft election, which allows it to safely grab
Raft leadership, acquire the lease, and recover availability. Without this change,
the test fails.

Release justification: None. Too risky for the stability period. Potential
backport candidate after sufficient baking on master.

Release note (bug fix): A bug causing ranges to remain without a leaseholder in
cases of asymmetric network partitions has been resolved.
nvanbenschoten committed Jan 26, 2023
1 parent 0a81ddb commit 8bacca2
Showing 5 changed files with 378 additions and 10 deletions.
134 changes: 134 additions & 0 deletions pkg/kv/kvserver/client_raft_test.go
@@ -1287,6 +1287,140 @@ func TestRequestsOnLaggingReplica(t *testing.T) {
require.Equal(t, leaderReplicaID, nlhe.Lease.Replica.ReplicaID)
}

// TestRequestsOnFollowerWithNonLiveLeaseholder tests the availability of a
// range that has an expired epoch-based lease and a live Raft leader that is
// unable to heartbeat its liveness record. Such a range should recover once
// Raft leadership moves off the partitioned Raft leader to one of the followers
// that can reach node liveness.
//
// This test relies on follower replicas campaigning for Raft leadership in
// certain cases when refusing to forward lease acquisition requests to the
// leader. When they determine that the leader is non-live according to node
// liveness, they will attempt to steal Raft leadership and, if successful,
// will be able to perform future lease acquisition attempts.
func TestRequestsOnFollowerWithNonLiveLeaseholder(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
ctx := context.Background()

var installPartition int32
partitionFilter := func(_ context.Context, ba roachpb.BatchRequest) *roachpb.Error {
if atomic.LoadInt32(&installPartition) == 0 {
return nil
}
if ba.GatewayNodeID == 1 && ba.Replica.NodeID == 4 {
return roachpb.NewError(context.Canceled)
}
return nil
}

clusterArgs := base.TestClusterArgs{
ReplicationMode: base.ReplicationManual,
ServerArgs: base.TestServerArgs{
// Reduce the election timeout some to speed up the test.
RaftConfig: base.RaftConfig{RaftElectionTimeoutTicks: 10},
Knobs: base.TestingKnobs{
NodeLiveness: kvserver.NodeLivenessTestingKnobs{
// This test waits for an epoch-based lease to expire, so we're
// setting the liveness duration as low as possible while still
// keeping the test stable.
LivenessDuration: 3000 * time.Millisecond,
RenewalDuration: 1500 * time.Millisecond,
},
Store: &kvserver.StoreTestingKnobs{
// Eliminate clock offsets to remove the lease stasis period and speed up
// the test.
MaxOffset: time.Nanosecond,
TestingRequestFilter: partitionFilter,
},
},
},
}

tc := testcluster.StartTestCluster(t, 4, clusterArgs)
defer tc.Stopper().Stop(ctx)

{
// Move the liveness range to node 4.
desc := tc.LookupRangeOrFatal(t, keys.NodeLivenessPrefix)
tc.RebalanceVoterOrFatal(ctx, t, desc.StartKey.AsRawKey(), tc.Target(0), tc.Target(3))
}

// Create a new range.
_, rngDesc, err := tc.Servers[0].ScratchRangeEx()
require.NoError(t, err)
key := rngDesc.StartKey.AsRawKey()
// Add replicas on all the stores.
tc.AddVotersOrFatal(t, rngDesc.StartKey.AsRawKey(), tc.Target(1), tc.Target(2))

// Store 0 holds the lease.
store0 := tc.GetFirstStoreFromServer(t, 0)
store0Repl, err := store0.GetReplica(rngDesc.RangeID)
require.NoError(t, err)
leaseStatus := store0Repl.CurrentLeaseStatus(ctx)
require.True(t, leaseStatus.OwnedBy(store0.StoreID()))

{
// Write a value so that the respective key is present in all stores and we
// can increment it again later.
_, err := tc.Server(0).DB().Inc(ctx, key, 1)
require.NoError(t, err)
log.Infof(ctx, "test: waiting for initial values...")
tc.WaitForValues(t, key, []int64{1, 1, 1, 0})
log.Infof(ctx, "test: waiting for initial values... done")
}

// Begin dropping all node liveness heartbeats from the original raft leader
// while allowing the leader to maintain Raft leadership and otherwise behave
// normally. This mimics cases where the raft leader is partitioned away from
// the liveness range but can otherwise reach its followers. In these cases,
// it is still possible that the followers can reach the liveness range and
// see that the leader becomes non-live. For example, the configuration could
// look like:
//
//         [0]       raft leader
//          ^
//         / \
//        /   \
//       v     v
//     [1]<--->[2]   raft followers
//       ^     ^
//        \   /
//         \ /
//          v
//         [3]       liveness range
//
log.Infof(ctx, "test: partitioning node")
atomic.StoreInt32(&installPartition, 1)

// Wait until the lease expires.
log.Infof(ctx, "test: waiting for lease expiration")
testutils.SucceedsSoon(t, func() error {
leaseStatus = store0Repl.CurrentLeaseStatus(ctx)
if leaseStatus.IsValid() {
return errors.New("lease still valid")
}
return nil
})
log.Infof(ctx, "test: lease expired")

{
// Increment the initial value again, which requires range availability. To
// get there, the request will need to trigger a lease request on a follower
// replica, which will call a Raft election, acquire Raft leadership, then
// acquire the range lease.
_, err := tc.Server(0).DB().Inc(ctx, key, 1)
require.NoError(t, err)
log.Infof(ctx, "test: waiting for new lease...")
tc.WaitForValues(t, key, []int64{2, 2, 2, 0})
log.Infof(ctx, "test: waiting for new lease... done")
}

// Store 0 no longer holds the lease.
leaseStatus = store0Repl.CurrentLeaseStatus(ctx)
require.False(t, leaseStatus.OwnedBy(store0.StoreID()))
}

type fakeSnapshotStream struct {
nextReq *kvserverpb.SnapshotRequest
nextErr error
33 changes: 27 additions & 6 deletions pkg/kv/kvserver/replica_proposal_buf.go
@@ -17,6 +17,7 @@ import (

"github.com/cockroachdb/cockroach/pkg/kv/kvserver/closedts/tracker"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/raftutil"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
@@ -117,15 +118,14 @@ type propBuf struct {
}

type rangeLeaderInfo struct {
// iAmTheLeader is set if the local replica is the leader.
iAmTheLeader bool
// leaderKnown is set if the local Raft machinery knows who the leader is. If
// not set, all other fields are empty.
leaderKnown bool

// leader represents the Raft group's leader. Not set if leaderKnown is not
// set.
leader roachpb.ReplicaID
// iAmTheLeader is set if the local replica is the leader.
iAmTheLeader bool
// leaderEligibleForLease is set if the leader is known and its type of
// replica allows it to acquire a lease.
leaderEligibleForLease bool
@@ -144,6 +144,7 @@ type proposer interface {
closedTimestampTarget() hlc.Timestamp
leaderStatus(ctx context.Context, raftGroup proposerRaft) rangeLeaderInfo
ownsValidLease(ctx context.Context, now hlc.ClockTimestamp) bool
shouldCampaignOnRedirect(raftGroup proposerRaft) bool

// The following require the proposer to hold an exclusive lock.
withGroupLocked(func(proposerRaft) error) error
@@ -182,7 +183,9 @@ type proposer interface {
type proposerRaft interface {
Step(raftpb.Message) error
Status() raft.Status
BasicStatus() raft.BasicStatus
ProposeConfChange(raftpb.ConfChangeI) error
Campaign() error
}

// Init initializes the proposal buffer and binds it to the provided proposer.
@@ -614,6 +617,12 @@ func (b *propBuf) maybeRejectUnsafeProposalLocked(
log.VEventf(ctx, 2, "not proposing lease acquisition because we're not the leader; replica %d is",
li.leader)
b.p.rejectProposalWithRedirectLocked(ctx, p, li.leader)
if b.p.shouldCampaignOnRedirect(raftGroup) {
log.VEventf(ctx, 2, "campaigning because Raft leader not live in node liveness map")
if err := raftGroup.Campaign(); err != nil {
log.VEventf(ctx, 1, "failed to campaign: %s", err)
}
}
return true
}
// If the leader is not known, or if it is known but it's ineligible
@@ -715,7 +724,7 @@ func (b *propBuf) leaderStatusRLocked(ctx context.Context, raftGroup proposerRaf
!leaderInfo.iAmTheLeader {
log.Fatalf(ctx,
"inconsistent Raft state: state %s while the current replica is also the lead: %d",
raftGroup.Status().RaftState, leaderInfo.leader)
raftGroup.BasicStatus().RaftState, leaderInfo.leader)
}
return leaderInfo
}
Expand Down Expand Up @@ -1186,7 +1195,7 @@ func (rp *replicaProposer) leaderStatus(
) rangeLeaderInfo {
r := (*Replica)(rp)

status := raftGroup.Status()
status := raftGroup.BasicStatus()
iAmTheLeader := status.RaftState == raft.StateLeader
leader := status.Lead
leaderKnown := leader != raft.None
@@ -1217,9 +1226,9 @@
}
}
return rangeLeaderInfo{
iAmTheLeader: iAmTheLeader,
leaderKnown: leaderKnown,
leader: roachpb.ReplicaID(leader),
iAmTheLeader: iAmTheLeader,
leaderEligibleForLease: leaderEligibleForLease,
}
}
@@ -1228,6 +1237,18 @@ func (rp *replicaProposer) ownsValidLease(ctx context.Context, now hlc.ClockTime
return (*Replica)(rp).ownsValidLeaseRLocked(ctx, now)
}

func (rp *replicaProposer) shouldCampaignOnRedirect(raftGroup proposerRaft) bool {
r := (*Replica)(rp)
livenessMap, _ := r.store.livenessMap.Load().(liveness.IsLiveMap)
return shouldCampaignOnLeaseRequestRedirect(
raftGroup.BasicStatus(),
livenessMap,
r.descRLocked(),
r.requiresExpiringLeaseRLocked(),
r.store.Clock().Now(),
)
}

// rejectProposalWithRedirectLocked is part of the proposer interface.
func (rp *replicaProposer) rejectProposalWithRedirectLocked(
ctx context.Context, prop *ProposalData, redirectTo roachpb.ReplicaID,
39 changes: 36 additions & 3 deletions pkg/kv/kvserver/replica_proposal_buf_test.go
@@ -61,6 +61,8 @@ type testProposer struct {
lease *roachpb.Lease, reason raftutil.ReplicaNeedsSnapshotStatus)
// validLease is returned by ownsValidLease()
validLease bool
// leaderNotLive is returned from shouldCampaignOnRedirect().
leaderNotLive bool

// leaderReplicaInDescriptor is set if the leader (as indicated by raftGroup)
// is known, and that leader is part of the range's descriptor (as seen by the
@@ -82,7 +84,8 @@ type testProposerRaft struct {
status raft.Status
// proposals are the commands that the propBuf flushed (i.e. passed to the
// Raft group) and have not yet been consumed with consumeProposals().
proposals []kvserverpb.RaftCommand
proposals []kvserverpb.RaftCommand
campaigned bool
}

var _ proposerRaft = &testProposerRaft{}
@@ -113,11 +116,20 @@ func (t testProposerRaft) Status() raft.Status {
return t.status
}

func (t testProposerRaft) BasicStatus() raft.BasicStatus {
return t.status.BasicStatus
}

func (t testProposerRaft) ProposeConfChange(i raftpb.ConfChangeI) error {
// TODO(andrei, nvanbenschoten): Capture the message and test against it.
return nil
}

func (t *testProposerRaft) Campaign() error {
t.campaigned = true
return nil
}

func (t *testProposer) locker() sync.Locker {
return &t.RWMutex
}
@@ -174,7 +186,7 @@ func (t *testProposer) registerProposalLocked(p *ProposalData) {
}

func (t *testProposer) leaderStatus(ctx context.Context, raftGroup proposerRaft) rangeLeaderInfo {
lead := raftGroup.Status().Lead
lead := raftGroup.BasicStatus().Lead
leaderKnown := lead != raft.None
var leaderRep roachpb.ReplicaID
var iAmTheLeader, leaderEligibleForLease bool
@@ -200,9 +212,9 @@ func (t *testProposer) leaderStatus(ctx context.Context, raftGroup proposerRaft)
}
}
return rangeLeaderInfo{
iAmTheLeader: iAmTheLeader,
leaderKnown: leaderKnown,
leader: leaderRep,
iAmTheLeader: iAmTheLeader,
leaderEligibleForLease: leaderEligibleForLease,
}
}
@@ -211,6 +223,10 @@ func (t *testProposer) ownsValidLease(ctx context.Context, now hlc.ClockTimestam
return t.validLease
}

func (t *testProposer) shouldCampaignOnRedirect(raftGroup proposerRaft) bool {
return t.leaderNotLive
}

func (t *testProposer) rejectProposalWithRedirectLocked(
_ context.Context, _ *ProposalData, redirectTo roachpb.ReplicaID,
) {
@@ -493,10 +509,14 @@ func TestProposalBufferRejectLeaseAcqOnFollower(t *testing.T) {
// Set to simulate situations where the local replica is so behind that the
// leader is not even part of the range descriptor.
leaderNotInRngDesc bool
// Set to simulate situations where the Raft leader is not live in the node
// liveness map.
leaderNotLive bool
// If true, the follower has a valid lease.
ownsValidLease bool

expRejection bool
expCampaign bool
}{
{
name: "leader",
@@ -552,6 +572,17 @@ func TestProposalBufferRejectLeaseAcqOnFollower(t *testing.T) {
// FlushLockedWithRaftGroup().
expRejection: false,
},
{
name: "follower, known eligible non-live leader",
state: raft.StateFollower,
// Someone else is leader.
leader: self + 1,
leaderNotLive: true,
// Rejection - a follower can't request a lease.
expRejection: true,
// The leader is non-live, so we should campaign.
expCampaign: true,
},
} {
t.Run(tc.name, func(t *testing.T) {
var p testProposer
@@ -585,6 +616,7 @@ func TestProposalBufferRejectLeaseAcqOnFollower(t *testing.T) {
p.leaderReplicaInDescriptor = !tc.leaderNotInRngDesc
p.leaderReplicaType = tc.leaderRepType
p.validLease = tc.ownsValidLease
p.leaderNotLive = tc.leaderNotLive

var b propBuf
clock := hlc.NewClockWithSystemTimeSource(time.Nanosecond /* maxOffset */)
@@ -601,6 +633,7 @@ func TestProposalBufferRejectLeaseAcqOnFollower(t *testing.T) {
} else {
require.Equal(t, roachpb.ReplicaID(0), rejected)
}
require.Equal(t, tc.expCampaign, r.campaigned)
require.Zero(t, tracker.Count())
})
}