From a3747752436870f5c24c62e535aed007d0b00168 Mon Sep 17 00:00:00 2001
From: Nathan VanBenschoten
Date: Fri, 10 Jun 2022 16:51:44 -0400
Subject: [PATCH] kv: don't allow lease transfers to replicas in need of snapshot
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Fixes #81763.
Part of #81561.

\### Background

When performing a lease transfer, the outgoing leaseholder revokes its lease before proposing the lease transfer request, meaning that it promises to stop using the previous lease to serve reads or writes. The lease transfer request is then proposed and committed to the Raft log, at which point the new lease officially becomes active. However, this new lease is not usable until the incoming leaseholder applies the Raft entry that contains the lease transfer and notices that it is now the leaseholder for the range.

The effect of this handoff is that there exists a "power vacuum" time period when the outgoing leaseholder has revoked its previous lease but the incoming leaseholder has not yet applied its new lease. During this time period, a range is effectively unavailable for strong reads and writes, because no replica will act as the leaseholder. Instead, requests that require the lease will be redirected back and forth between the outgoing leaseholder and the incoming leaseholder (the client backs off). To minimize the disruption caused by lease transfers, we need to minimize this time period.

We assume that if a lease transfer target is sufficiently caught up on its log such that it will be able to apply the lease transfer through log entry application, then this unavailability window will be acceptable. This may be a faulty assumption in cases with severe replication lag, but we must balance any heuristic here that attempts to determine "too much lag" with the possibility of starvation of lease transfers under sustained write load and a resulting sustained replication lag. See #38065 and #42379, which removed such a heuristic. For now, we don't try to make such a determination.

\### Patch Details

However, with this change, we now draw a distinction between lease transfer targets that will be able to apply the lease transfer through log entry application and those that will require a Raft snapshot to catch up and apply the lease transfer. Raft snapshots are more expensive than Raft entry replication. They are also significantly more likely to be delayed due to queueing behind other snapshot traffic in the system. This potential for delay makes transferring a lease to a replica that needs a snapshot very risky, as doing so has the effect of inducing range unavailability until the snapshot completes, which could take seconds, minutes, or hours.

In the future, we will likely get better at prioritizing snapshots to improve the responsiveness of snapshots that are needed to recover availability. However, even in this world, it is not worth inducing unavailability that can only be recovered through a Raft snapshot. It is better to catch the desired lease target up on the log first and then initiate the lease transfer once its log is connected to the leader's. For this reason, unless we can guarantee that the lease transfer target does not need a Raft snapshot, we don't let it through.

This commit adds protection against such risky lease transfers at two levels. First, it includes hardened protection in the Replica proposal buffer, immediately before handing the lease transfer proposal off to `etcd/raft`.
Second, it includes best-effort protection before a Replica begins to initiate a lease transfer in `AdminTransferLease`, which all lease transfer operations flow through. The change includes protection at two levels because rejecting a lease transfer in the proposal buffer after we have revoked our current lease is more disruptive than doing so earlier, before we have revoked our current lease. The best-effort protection is also able to respond more gracefully to invalid targets (e.g. by picking the next-best target). However, the check in the Replica proposal buffer is the only place where the protection is airtight against race conditions, because the check is performed:

1. by the current Raft leader, else the proposal will fail
2. while holding latches that prevent interleaving log truncation

\### Remaining Work

With this change, there is a single known race which can lead to an incoming leaseholder needing a snapshot. This is the case when a leader/leaseholder transfers the lease and then quickly loses Raft leadership to a peer that has a shorter log. Even if the older leader could have caught the incoming leaseholder up on its log, the new leader may not be able to, because its log may not go back as far. Such a scenario has been possible since we stopped ensuring that all replicas have logs that start at the same index. For more details, see the discussion about #35701 in #81561. This race is theoretical; we have not seen it in practice. It's not clear whether we will try to address it or rely on a mitigation like the one described in #81764 to limit its blast radius.

----

Release note (bug fix): Range lease transfers are no longer permitted to follower replicas that may require a Raft snapshot. This ensures that lease transfers are never delayed behind snapshots, which could previously create range unavailability until the snapshot completed. Lease transfers now err on the side of caution and are only allowed when the outgoing leaseholder can guarantee that the incoming leaseholder does not need a snapshot.
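To make the safety check concrete, here is a minimal, self-contained sketch of the decision that both `AdminTransferLease` and the proposal buffer delegate to `raftutil.ReplicaMayNeedSnapshot`. The function name `mayNeedSnapshot`, its signature, and the reason strings are illustrative assumptions, not the real helper's API (the real helper returns a `ReplicaNeedsSnapshotStatus`); the cases mirror those exercised by the new `TestProposalBufferRejectUnsafeLeaseTransfer` test below.

// Illustrative sketch; not CockroachDB source code.
package main

import (
	"fmt"

	"go.etcd.io/etcd/raft/v3"
	rafttracker "go.etcd.io/etcd/raft/v3/tracker"
)

// mayNeedSnapshot reports whether the lease transfer target (identified by its
// Raft peer ID) may need a Raft snapshot to apply a lease transfer proposed at
// or above the proposer's first log index, from the proposer's point of view.
func mayNeedSnapshot(st *raft.Status, firstIndex uint64, target uint64) (bool, string) {
	// Only the Raft leader tracks follower progress, so a non-leader cannot
	// prove anything about the target and must assume a snapshot may be needed.
	if st.RaftState != raft.StateLeader {
		return true, "local replica is not the Raft leader"
	}
	pr, ok := st.Progress[target]
	if !ok {
		return true, "target's progress is unknown to the leader"
	}
	switch pr.State {
	case rafttracker.StateSnapshot:
		return true, "leader has already determined that the target needs a snapshot"
	case rafttracker.StateProbe:
		return true, "leader is still probing the target's log position"
	}
	// The target is in StateReplicate: it has acknowledged entries up to
	// pr.Match, so the next entry it needs is pr.Match+1. If that entry has
	// already been truncated away (i.e. it falls below the leader's first
	// index), the target can only catch up via a snapshot.
	if pr.Match+1 < firstIndex {
		return true, "target's acknowledged log ends below the leader's first index"
	}
	return false, ""
}

func main() {
	var st raft.Status
	st.RaftState = raft.StateLeader
	st.Progress = map[uint64]rafttracker.Progress{
		2: {State: rafttracker.StateReplicate, Match: 3},
	}
	fmt.Println(mayNeedSnapshot(&st, 5, 2)) // true: Match+1 (4) < firstIndex (5)
	fmt.Println(mayNeedSnapshot(&st, 4, 2)) // false: Match+1 (4) == firstIndex (4)
}

The key design point reflected here is that only the Raft leader can prove safety: followers have no view of the target's progress, so lease transfers proposed by a non-leader are rejected outright.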
--- pkg/kv/kvserver/BUILD.bazel | 1 + pkg/kv/kvserver/client_relocate_range_test.go | 1 + pkg/kv/kvserver/client_replica_test.go | 213 +++++++++++++-- pkg/kv/kvserver/markers.go | 15 +- pkg/kv/kvserver/replica_init.go | 1 + pkg/kv/kvserver/replica_proposal_buf.go | 132 ++++++++- pkg/kv/kvserver/replica_proposal_buf_test.go | 258 +++++++++++++++--- pkg/kv/kvserver/replica_range_lease.go | 58 ++++ pkg/kv/kvserver/replica_test.go | 3 + pkg/kv/kvserver/testing_knobs.go | 9 + .../serverutils/test_cluster_shim.go | 10 +- pkg/testutils/testcluster/testcluster.go | 4 +- 12 files changed, 625 insertions(+), 80 deletions(-) diff --git a/pkg/kv/kvserver/BUILD.bazel b/pkg/kv/kvserver/BUILD.bazel index 87d796b3a683..8d637f92a039 100644 --- a/pkg/kv/kvserver/BUILD.bazel +++ b/pkg/kv/kvserver/BUILD.bazel @@ -348,6 +348,7 @@ go_test( "//pkg/kv/kvserver/protectedts/ptstorage", "//pkg/kv/kvserver/protectedts/ptutil", "//pkg/kv/kvserver/raftentry", + "//pkg/kv/kvserver/raftutil", "//pkg/kv/kvserver/rditer", "//pkg/kv/kvserver/readsummary/rspb", "//pkg/kv/kvserver/replicastats", diff --git a/pkg/kv/kvserver/client_relocate_range_test.go b/pkg/kv/kvserver/client_relocate_range_test.go index 842706216090..7732211baaad 100644 --- a/pkg/kv/kvserver/client_relocate_range_test.go +++ b/pkg/kv/kvserver/client_relocate_range_test.go @@ -43,6 +43,7 @@ func relocateAndCheck( voterTargets []roachpb.ReplicationTarget, nonVoterTargets []roachpb.ReplicationTarget, ) (retries int) { + t.Helper() every := log.Every(1 * time.Second) testutils.SucceedsSoon(t, func() error { err := tc.Servers[0].DB(). diff --git a/pkg/kv/kvserver/client_replica_test.go b/pkg/kv/kvserver/client_replica_test.go index a3d9a9386795..ac603d020bcc 100644 --- a/pkg/kv/kvserver/client_replica_test.go +++ b/pkg/kv/kvserver/client_replica_test.go @@ -1723,23 +1723,15 @@ func TestRangeLocalUncertaintyLimitAfterNewLease(t *testing.T) { // Up-replicate the data in the range to node2. tc.AddVotersOrFatal(t, keyA, tc.Target(1)) - replica1 := tc.GetFirstStoreFromServer(t, 0).LookupReplica(roachpb.RKey(keyA)) - - // Transfer the lease from node1 to node2. replica2 := tc.GetFirstStoreFromServer(t, 1).LookupReplica(roachpb.RKey(keyA)) - replica2Desc, err := replica2.GetReplicaDescriptor() - if err != nil { - t.Fatal(err) - } + // Transfer the lease from node1 to node2. node1Before := tc.Servers[0].Clock().Now() + tc.TransferRangeLeaseOrFatal(t, *replica2.Desc(), tc.Target(1)) testutils.SucceedsSoon(t, func() error { - if err := replica1.AdminTransferLease(ctx, replica2Desc.StoreID); err != nil { - t.Fatal(err) - } lease, _ := replica2.GetLease() if lease.Replica.NodeID != replica2.NodeID() { - return errors.Errorf("expected lease transfer to node2: %s", lease) + return errors.Errorf("expected lease transfer to apply on node2: %s", lease) } return nil }) @@ -2151,20 +2143,15 @@ func TestLeaseInfoRequest(t *testing.T) { // Transfer the lease to Servers[0] so we start in a known state. Otherwise, // there might be already a lease owned by a random node. - err = tc.TransferRangeLease(rangeDesc, tc.Target(0)) - if err != nil { - t.Fatal(err) - } + tc.TransferRangeLeaseOrFatal(t, rangeDesc, tc.Target(0)) // Now test the LeaseInfo. We might need to loop until the node we query has applied the lease. validateLeaseholderSoon(t, kvDB0, rangeDesc.StartKey.AsRawKey(), replicas[0], true) // Transfer the lease to Server 1 and check that LeaseInfoRequest gets the // right answer. 
- err = tc.TransferRangeLease(rangeDesc, tc.Target(1)) - if err != nil { - t.Fatal(err) - } + tc.TransferRangeLeaseOrFatal(t, rangeDesc, tc.Target(1)) + // An inconsistent LeaseInfoReqeust on the old lease holder should give us the // right answer immediately, since the old holder has definitely applied the // transfer before TransferRangeLease returned. @@ -2184,10 +2171,7 @@ func TestLeaseInfoRequest(t *testing.T) { // Transfer the lease to Server 2 and check that LeaseInfoRequest gets the // right answer. - err = tc.TransferRangeLease(rangeDesc, tc.Target(2)) - if err != nil { - t.Fatal(err) - } + tc.TransferRangeLeaseOrFatal(t, rangeDesc, tc.Target(2)) // We're now going to ask servers[1] for the lease info. We don't use kvDB1; // instead we go directly to the store because otherwise the DistSender might @@ -2593,6 +2577,15 @@ func TestClearRange(t *testing.T) { // possible and hard to prevent entirely. The Replica will only learn that it is // the new leaseholder when it applies the snapshot. When doing so, it should // make sure to apply the lease-related side-effects to its in-memory state. +// +// EDIT: as of June 2022, we have protection that should make this scenario +// significantly more rare. This test uses a knob to disable the new protection +// so that it can create the scenario where a replica learns that it holds the +// lease through a snapshot. We'll want to keep the test and the corresponding +// logic in applySnapshot around until we can eliminate the scenario entirely. +// See the commentary in github.com/cockroachdb/cockroach/issues/81561 about +// sending Raft logs in Raft snapshots for a discussion about why this may not +// be worth eliminating. func TestLeaseTransferInSnapshotUpdatesTimestampCache(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) @@ -2604,6 +2597,9 @@ func TestLeaseTransferInSnapshotUpdatesTimestampCache(t *testing.T) { ReplicationMode: base.ReplicationManual, ServerArgs: base.TestServerArgs{ Knobs: base.TestingKnobs{ + Store: &kvserver.StoreTestingKnobs{ + AllowLeaseTransfersWhenTargetMayNeedSnapshot: true, + }, Server: &server.TestingKnobs{ WallClock: manualClock, }, @@ -2660,12 +2656,15 @@ func TestLeaseTransferInSnapshotUpdatesTimestampCache(t *testing.T) { // Partition node 2 from the rest of its range. Once partitioned, perform // another write and truncate the Raft log on the two connected nodes. This - // ensures that that when node 2 comes back up it will require a snapshot - // from Raft. + // ensures that when node 2 comes back up it will require a snapshot from + // Raft. funcs := noopRaftHandlerFuncs() funcs.dropReq = func(*kvserverpb.RaftMessageRequest) bool { return true } + funcs.snapErr = func(*kvserverpb.SnapshotRequest_Header) error { + return errors.New("rejected") + } tc.Servers[2].RaftTransport().Listen(store2.StoreID(), &unreliableRaftHandler{ rangeID: repl0.GetRangeID(), RaftMessageHandler: store2, @@ -2690,9 +2689,7 @@ func TestLeaseTransferInSnapshotUpdatesTimestampCache(t *testing.T) { // Finally, transfer the lease to node 2 while it is still unavailable and // behind. We try to avoid this case when picking new leaseholders in practice, // but we're never 100% successful. - if err := repl0.AdminTransferLease(ctx, store2.Ident.StoreID); err != nil { - t.Fatal(err) - } + tc.TransferRangeLeaseOrFatal(t, *repl0.Desc(), tc.Target(2)) // Remove the partition. A snapshot to node 2 should follow. This snapshot // will inform node 2 that it is the new leaseholder for the range. 
Node 2 @@ -2717,6 +2714,166 @@ func TestLeaseTransferInSnapshotUpdatesTimestampCache(t *testing.T) { }) } +// TestLeaseTransferRejectedIfTargetNeedsSnapshot prevents a regression of +// #81561. It verifies that a replica will reject a lease transfer request if it +// can not guarantee that the lease target is sufficiently caught up on its Raft +// log such that it does not need a Raft snapshot. The test does so by inducing +// a partition, truncating the Raft log, and then trying to transfer the lease to +// the replica that is now cut off from the log. The lease transfer request must +// be rejected. +// +// The test has two variants. The !rejectAfterRevoke variant exercises the +// common but best-effort protection against unsafe lease transfers in +// Replica.AdminTransferLease. The rejectAfterRevoke variant exercises the +// uncommon but airtight protection against unsafe lease transfers in +// propBuf.maybeRejectUnsafeProposalLocked. +func TestLeaseTransferRejectedIfTargetNeedsSnapshot(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + testutils.RunTrueAndFalse(t, "reject-after-revoke", func(t *testing.T, rejectAfterRevoke bool) { + var transferLeaseReqBlockOnce sync.Once + transferLeaseReqBlockedC := make(chan struct{}) + transferLeaseReqUnblockedC := make(chan struct{}) + + ctx := context.Background() + tc := testcluster.StartTestCluster(t, 3, base.TestClusterArgs{ + ReplicationMode: base.ReplicationManual, + ServerArgs: base.TestServerArgs{ + Knobs: base.TestingKnobs{ + Store: &kvserver.StoreTestingKnobs{ + TestingRequestFilter: func(ctx context.Context, ba roachpb.BatchRequest) *roachpb.Error { + if rejectAfterRevoke && ba.IsSingleTransferLeaseRequest() { + transferLeaseReqBlockOnce.Do(func() { + close(transferLeaseReqBlockedC) + <-transferLeaseReqUnblockedC + }) + } + return nil + }, + // Speed up the lease transfer retry loop. + LeaseTransferRejectedRetryLoopCount: 2, + }, + }, + }, + }) + defer tc.Stopper().Stop(context.Background()) + store0 := tc.GetFirstStoreFromServer(t, 0) + store2 := tc.GetFirstStoreFromServer(t, 2) + + keyA := tc.ScratchRange(t) + keyB := keyA.Next() + keyC := keyB.Next() + + // First, do a couple of writes; we'll use these to determine when + // the dust has settled. + incA := incrementArgs(keyA, 1) + _, pErr := kv.SendWrapped(ctx, store0.TestSender(), incA) + require.Nil(t, pErr) + incC := incrementArgs(keyC, 2) + _, pErr = kv.SendWrapped(ctx, store0.TestSender(), incC) + require.Nil(t, pErr) + + tc.AddVotersOrFatal(t, keyA, tc.Targets(1, 2)...) + tc.WaitForValues(t, keyA, []int64{1, 1, 1}) + tc.WaitForValues(t, keyC, []int64{2, 2, 2}) + + repl0 := store0.LookupReplica(roachpb.RKey(keyA)) + + // Grab the current lease. We'll use it later. + preLease, _, err := tc.FindRangeLease(*repl0.Desc(), nil) + require.NoError(t, err) + require.Equal(t, store0.StoreID(), preLease.Replica.StoreID) + + // Partition node 2 from the rest of its range. Once partitioned, perform + // another write and truncate the Raft log on the two connected nodes. This + // ensures that when node 2 comes back up it will require a snapshot from + // Raft. 
+ funcs := noopRaftHandlerFuncs() + funcs.dropReq = func(*kvserverpb.RaftMessageRequest) bool { + return true + } + funcs.snapErr = func(*kvserverpb.SnapshotRequest_Header) error { + return errors.New("rejected") + } + tc.Servers[2].RaftTransport().Listen(store2.StoreID(), &unreliableRaftHandler{ + rangeID: repl0.GetRangeID(), + RaftMessageHandler: store2, + unreliableRaftHandlerFuncs: funcs, + }) + + _, pErr = kv.SendWrapped(ctx, store0.TestSender(), incC) + require.Nil(t, pErr) + tc.WaitForValues(t, keyC, []int64{4, 4, 2}) + + // If we want the lease transfer rejection to come after the leaseholder has + // revoked its lease, we launch the lease transfer before the log truncation + // and block it after it has passed through the best-effort protection in + // Replica.AdminTransferLease. + transferErrC := make(chan error, 1) + if rejectAfterRevoke { + _ = tc.Stopper().RunAsyncTask(ctx, "transfer leas", func(ctx context.Context) { + transferErrC <- tc.TransferRangeLease(*repl0.Desc(), tc.Target(2)) + }) + <-transferLeaseReqBlockedC + } + + // Truncate the log at index+1 (log entries < N are removed, so this + // includes the increment). This necessitates a snapshot when the + // partitioned replica rejoins the rest of the range. + index := repl0.GetLastIndex() + truncArgs := truncateLogArgs(index+1, repl0.GetRangeID()) + truncArgs.Key = keyA + _, pErr = kv.SendWrapped(ctx, store0.TestSender(), truncArgs) + require.Nil(t, pErr) + + // Complete or initiate the lease transfer attempt to node 2, which must not + // succeed because node 2 now needs a snapshot. + var transferErr error + if rejectAfterRevoke { + close(transferLeaseReqUnblockedC) + transferErr = <-transferErrC + } else { + transferErr = tc.TransferRangeLease(*repl0.Desc(), tc.Target(2)) + } + isRejectedErr := kvserver.IsLeaseTransferRejectedBecauseTargetMayNeedSnapshotError(transferErr) + require.True(t, isRejectedErr, "%+v", transferErr) + + // Remove the partition. A snapshot to node 2 should follow. + tc.Servers[2].RaftTransport().Listen(store2.Ident.StoreID, store2) + tc.WaitForValues(t, keyC, []int64{4, 4, 4}) + + // Now that node 2 caught up on the log through a snapshot, we should be + // able to transfer the lease to it successfully. + // NOTE: we used a testing knob to disable automatic lease transfer retries, + // so we use a SucceedsSoon loop. + testutils.SucceedsSoon(t, func() error { + if err := tc.TransferRangeLease(*repl0.Desc(), tc.Target(2)); err != nil { + if kvserver.IsLeaseTransferRejectedBecauseTargetMayNeedSnapshotError(err) { + return err + } + t.Fatal(err) + } + return nil + }) + + // Verify that the lease is now held by node 2. + postLease, _, err := tc.FindRangeLease(*repl0.Desc(), nil) + require.NoError(t, err) + require.Equal(t, store2.StoreID(), postLease.Replica.StoreID) + + // Additionally, verify that the lease has the expected sequence number. If + // the lease transfer rejection came after the previous lease was revoked, + // then node 0 must have re-acquired the lease (with a new sequence number) + // in order to transfer it to node 2. + expSeq := preLease.Sequence + 1 + if rejectAfterRevoke { + expSeq++ + } + require.Equal(t, expSeq, postLease.Sequence) + }) +} + // TestConcurrentAdminChangeReplicasRequests ensures that when two attempts to // change replicas for a range race, only one will succeed. 
func TestConcurrentAdminChangeReplicasRequests(t *testing.T) { diff --git a/pkg/kv/kvserver/markers.go b/pkg/kv/kvserver/markers.go index 9a4aca0fdde6..6d3d16a148c7 100644 --- a/pkg/kv/kvserver/markers.go +++ b/pkg/kv/kvserver/markers.go @@ -82,8 +82,21 @@ func IsIllegalReplicationChangeError(err error) bool { var errMarkReplicationChangeInProgress = errors.New("replication change in progress") // IsReplicationChangeInProgressError detects whether an error (assumed to have -// been emitted a replication change) indicates that the replication change +// been emitted by a replication change) indicates that the replication change // failed because another replication change was in progress on the range. func IsReplicationChangeInProgressError(err error) bool { return errors.Is(err, errMarkReplicationChangeInProgress) } + +var errMarkLeaseTransferRejectedBecauseTargetMayNeedSnapshot = errors.New( + "lease transfer rejected because the target may need a snapshot") + +// IsLeaseTransferRejectedBecauseTargetMayNeedSnapshotError detects whether an +// error (assumed to have been emitted by a lease transfer request) indicates +// that the lease transfer failed because the current leaseholder could not +// prove that the lease transfer target did not need a Raft snapshot. In order +// to prove this, the current leaseholder must also be the Raft leader, which is +// periodically requested in maybeTransferRaftLeadershipToLeaseholderLocked. +func IsLeaseTransferRejectedBecauseTargetMayNeedSnapshotError(err error) bool { + return errors.Is(err, errMarkLeaseTransferRejectedBecauseTargetMayNeedSnapshot) +} diff --git a/pkg/kv/kvserver/replica_init.go b/pkg/kv/kvserver/replica_init.go index ab713c5354eb..b1c17227483e 100644 --- a/pkg/kv/kvserver/replica_init.go +++ b/pkg/kv/kvserver/replica_init.go @@ -104,6 +104,7 @@ func newUnloadedReplica( r.mu.checksums = map[uuid.UUID]replicaChecksum{} r.mu.proposalBuf.Init((*replicaProposer)(r), tracker.NewLockfreeTracker(), r.Clock(), r.ClusterSettings()) r.mu.proposalBuf.testing.allowLeaseProposalWhenNotLeader = store.cfg.TestingKnobs.AllowLeaseRequestProposalsWhenNotLeader + r.mu.proposalBuf.testing.allowLeaseTransfersWhenTargetMayNeedSnapshot = store.cfg.TestingKnobs.AllowLeaseTransfersWhenTargetMayNeedSnapshot r.mu.proposalBuf.testing.dontCloseTimestamps = store.cfg.TestingKnobs.DontCloseTimestamps r.mu.proposalBuf.testing.submitProposalFilter = store.cfg.TestingKnobs.TestingProposalSubmitFilter diff --git a/pkg/kv/kvserver/replica_proposal_buf.go b/pkg/kv/kvserver/replica_proposal_buf.go index b5b0a0800b20..9603613d1ced 100644 --- a/pkg/kv/kvserver/replica_proposal_buf.go +++ b/pkg/kv/kvserver/replica_proposal_buf.go @@ -17,6 +17,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver/closedts/tracker" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/raftutil" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/util/errorutil" @@ -110,6 +111,10 @@ type propBuf struct { // heartbeats and then expect other replicas to take the lease without // worrying about Raft). allowLeaseProposalWhenNotLeader bool + // allowLeaseTransfersWhenTargetMayNeedSnapshot, if set, makes the proposal + // buffer allow lease request proposals even when the proposer cannot prove + // that the lease transfer target does not need a Raft snapshot. 
+ allowLeaseTransfersWhenTargetMayNeedSnapshot bool // dontCloseTimestamps inhibits the closing of timestamps. dontCloseTimestamps bool } @@ -137,6 +142,7 @@ type proposer interface { // The following require the proposer to hold (at least) a shared lock. getReplicaID() roachpb.ReplicaID destroyed() destroyStatus + firstIndex() uint64 leaseAppliedIndex() uint64 enqueueUpdateCheck() closedTimestampTarget() hlc.Timestamp @@ -158,6 +164,18 @@ type proposer interface { prop *ProposalData, redirectTo roachpb.ReplicaID, ) + // rejectProposalWithLeaseTransferRejectedLocked rejects a proposal for a + // lease transfer when the transfer is deemed to be unsafe. The intended + // consequence of the rejection is that the lease transfer attempt will be + // rejected. Higher levels that decide whether or not to attempt a lease + // transfer have weaker versions of the same check, so we don't expect to see + // repeated lease transfer rejections. + rejectProposalWithLeaseTransferRejectedLocked( + ctx context.Context, + prop *ProposalData, + lease *roachpb.Lease, + reason raftutil.ReplicaNeedsSnapshotStatus, + ) // leaseDebugRLocked returns info on the current lease. leaseDebugRLocked() string @@ -167,7 +185,7 @@ type proposer interface { // testing. type proposerRaft interface { Step(raftpb.Message) error - BasicStatus() raft.BasicStatus + Status() raft.Status ProposeConfChange(raftpb.ConfChangeI) error } @@ -536,6 +554,10 @@ func (b *propBuf) FlushLockedWithRaftGroup( // // Currently, the request types which may be rejected by this function are: // - RequestLease when the proposer is not the raft leader (with caveats). +// - TransferLease when the proposer cannot guarantee that the lease transfer +// target does not currently need a Raft snapshot and therefore will not +// need a Raft snapshot to catch up to and apply the lease transfer. This +// requires that the proposer is the raft leader. // // The function returns true if the proposal was rejected, and false if not. // If the proposal was rejected and true is returned, it will have been cleaned @@ -602,6 +624,79 @@ func (b *propBuf) maybeRejectUnsafeProposalLocked( } return false + case p.Request.IsSingleTransferLeaseRequest(): + // When performing a lease transfer, the outgoing leaseholder revokes its + // lease before proposing the lease transfer request, meaning that it + // promises to stop using the previous lease to serve reads or writes. The + // lease transfer request is then proposed and committed to the Raft log, at + // which point the new lease officially becomes active. However, this new + // lease is not usable until the incoming leaseholder applies the Raft entry + // that contains the lease transfer and notices that it is now the + // leaseholder for the range. + // + // The effect of this handoff is that there exists a "power vacuum" time + // period when the outgoing leaseholder has revoked its previous lease but + // the incoming leaseholder has not yet applied its new lease. During this + // time period, a range is effectively unavailable for strong reads and + // writes, because no replica will act as the leaseholder. Instead, requests + // that require the lease will be redirected back and forth between the + // outgoing leaseholder and the incoming leaseholder (the client backs off). + // To minimize the disruption caused by lease transfers, we need to minimize + // this time period. 
+ // + // We assume that if a lease transfer target is sufficiently caught up on + // its log such that it will be able to apply the lease transfer through log + // entry application then this unavailability window will be acceptable. + // This may be a faulty assumption in cases with severe replication lag, but + // we must balance any heuristics here that attempts to determine "too much + // lag" with the possibility of starvation of lease transfers under + // sustained write load and a resulting sustained replication lag. See + // #38065 and #42379, which removed such a heuristic. For now, we don't try + // to make such a determination. + // + // However, we draw a distinction between lease transfer targets that will + // be able to apply the lease transfer through log entry application and + // those that will require a Raft snapshot to catch up and apply the lease + // transfer. Raft snapshots are more expensive than Raft entry replication. + // They are also significantly more likely to be delayed due to queueing + // behind other snapshot traffic in the system. This potential for delay + // makes transferring a lease to a replica that needs a snapshot very risky, + // as doing so has the effect of inducing range unavailability until the + // snapshot completes, which could take seconds, minutes, or hours. + // + // In the future, we will likely get better at prioritizing snapshots to + // improve the responsiveness of snapshots that are needed to recover + // availability. However, even in this world, it is not worth inducing + // unavailability that can only be recovered through a Raft snapshot. It is + // better to catch the desired lease target up on the log first and then + // initiate the lease transfer once its log is connected to the leader's. + // + // For this reason, unless we can guarantee that the lease transfer target + // does not need a Raft snapshot, we don't let it through. This same check + // lives at higher levels in the stack as well (i.e. in the allocator). The + // higher level checks avoid wasted work and respond more gracefully to + // invalid targets (e.g. they pick the next best target). However, this is + // the only place where the protection is airtight against race conditions + // because the check is performed: + // 1. by the current Raft leader, else the proposal will fail + // 2. while holding latches that prevent interleaving log truncation + // + // If an error is thrown here, the outgoing leaseholder still won't be able + // to use its revoked lease. However, it will be able to immediately request + // a new lease. This may be disruptive, which is why we try to avoid hitting + // this airtight protection as much as possible by detecting the failure + // scenario before revoking the outgoing lease. 
+ status := raftGroup.Status() + firstIndex := b.p.firstIndex() + newLease := p.command.ReplicatedEvalResult.State.Lease + newLeaseTarget := newLease.Replica.ReplicaID + snapStatus := raftutil.ReplicaMayNeedSnapshot(&status, firstIndex, newLeaseTarget) + if snapStatus != raftutil.NoSnapshotNeeded && !b.testing.allowLeaseTransfersWhenTargetMayNeedSnapshot { + b.p.rejectProposalWithLeaseTransferRejectedLocked(ctx, p, newLease, snapStatus) + return true + } + return false + default: return false } @@ -619,7 +714,7 @@ func (b *propBuf) leaderStatusRLocked(ctx context.Context, raftGroup proposerRaf !leaderInfo.iAmTheLeader { log.Fatalf(ctx, "inconsistent Raft state: state %s while the current replica is also the lead: %d", - raftGroup.BasicStatus().RaftState, leaderInfo.leader) + raftGroup.Status().RaftState, leaderInfo.leader) } return leaderInfo } @@ -1045,6 +1140,10 @@ func (rp *replicaProposer) destroyed() destroyStatus { return rp.mu.destroyStatus } +func (rp *replicaProposer) firstIndex() uint64 { + return (*Replica)(rp).raftFirstIndexRLocked() +} + func (rp *replicaProposer) leaseAppliedIndex() uint64 { return rp.mu.state.LeaseAppliedIndex } @@ -1084,7 +1183,7 @@ func (rp *replicaProposer) registerProposalLocked(p *ProposalData) { func (rp *replicaProposer) leaderStatusRLocked(raftGroup proposerRaft) rangeLeaderInfo { r := (*Replica)(rp) - status := raftGroup.BasicStatus() + status := raftGroup.Status() iAmTheLeader := status.RaftState == raft.StateLeader leader := status.Lead leaderKnown := leader != raft.None @@ -1130,9 +1229,26 @@ func (rp *replicaProposer) rejectProposalWithRedirectLocked( Replica: redirectRep, } log.VEventf(ctx, 2, "redirecting proposal to node %s; request: %s", redirectRep.NodeID, prop.Request) - r.cleanupFailedProposalLocked(prop) - prop.finishApplication(ctx, proposalResult{ - Err: roachpb.NewError(newNotLeaseHolderError( - speculativeLease, storeID, rangeDesc, "refusing to acquire lease on follower")), - }) + rp.rejectProposalWithErrLocked(ctx, prop, roachpb.NewError(newNotLeaseHolderError( + speculativeLease, storeID, rangeDesc, "refusing to acquire lease on follower"))) +} + +func (rp *replicaProposer) rejectProposalWithLeaseTransferRejectedLocked( + ctx context.Context, + prop *ProposalData, + lease *roachpb.Lease, + reason raftutil.ReplicaNeedsSnapshotStatus, +) { + rp.store.metrics.LeaseTransferErrorCount.Inc(1) + log.VEventf(ctx, 2, "not proposing lease transfer because the target %s may "+ + "need a snapshot: %s", lease.Replica, reason) + err := newLeaseTransferRejectedBecauseTargetMayNeedSnapshotError(lease.Replica, reason) + rp.rejectProposalWithErrLocked(ctx, prop, roachpb.NewError(err)) +} + +func (rp *replicaProposer) rejectProposalWithErrLocked( + ctx context.Context, prop *ProposalData, pErr *roachpb.Error, +) { + (*Replica)(rp).cleanupFailedProposalLocked(prop) + prop.finishApplication(ctx, proposalResult{Err: pErr}) } diff --git a/pkg/kv/kvserver/replica_proposal_buf_test.go b/pkg/kv/kvserver/replica_proposal_buf_test.go index 8e1256528333..08b7efe6879d 100644 --- a/pkg/kv/kvserver/replica_proposal_buf_test.go +++ b/pkg/kv/kvserver/replica_proposal_buf_test.go @@ -12,6 +12,7 @@ package kvserver import ( "context" + "math" "math/rand" "sync" "testing" @@ -21,6 +22,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver/closedts/tracker" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/raftutil" 
"github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/util/hlc" @@ -33,6 +35,7 @@ import ( "github.com/stretchr/testify/require" "go.etcd.io/etcd/raft/v3" "go.etcd.io/etcd/raft/v3/raftpb" + rafttracker "go.etcd.io/etcd/raft/v3/tracker" "golang.org/x/sync/errgroup" ) @@ -41,6 +44,7 @@ type testProposer struct { syncutil.RWMutex clock *hlc.Clock ds destroyStatus + fi uint64 lai uint64 enqueued int registered int @@ -48,9 +52,13 @@ type testProposer struct { // If not nil, this can be a testProposerRaft used to mock the raft group // passed to FlushLockedWithRaftGroup(). raftGroup proposerRaft - // If not nil, this is called by RejectProposalWithRedirectLocked(). If nil, - // RejectProposalWithRedirectLocked() panics. - onRejectProposalWithRedirectLocked func(prop *ProposalData, redirectTo roachpb.ReplicaID) + // If not nil, this is called by rejectProposalWithRedirectLocked(). + // If nil, rejectProposalWithRedirectLocked() panics. + onRejectProposalWithRedirectLocked func(redirectTo roachpb.ReplicaID) + // If not nil, this is called by rejectProposalWithLeaseTransferRejectedLocked(). + // If nil, rejectProposalWithLeaseTransferRejectedLocked() panics. + onRejectProposalWithLeaseTransferRejectedLocked func( + lease *roachpb.Lease, reason raftutil.ReplicaNeedsSnapshotStatus) // ownsValidLease is returned by ownsValidLeaseRLocked() ownsValidLease bool @@ -71,7 +79,7 @@ var _ proposer = &testProposer{} // be used as the Raft group used by a testProposer, and it records the commands // being proposed. type testProposerRaft struct { - status raft.BasicStatus + status raft.Status // proposals are the commands that the propBuf flushed (i.e. passed to the // Raft group) and have not yet been consumed with consumeProposals(). 
proposals []kvserverpb.RaftCommand @@ -101,7 +109,7 @@ func (t *testProposerRaft) consumeProposals() []kvserverpb.RaftCommand { return res } -func (t testProposerRaft) BasicStatus() raft.BasicStatus { +func (t testProposerRaft) Status() raft.Status { return t.status } @@ -126,6 +134,10 @@ func (t *testProposer) destroyed() destroyStatus { return t.ds } +func (t *testProposer) firstIndex() uint64 { + return t.fi +} + func (t *testProposer) leaseAppliedIndex() uint64 { return t.lai } @@ -166,11 +178,12 @@ func (t *testProposer) ownsValidLeaseRLocked(ctx context.Context, now hlc.ClockT } func (t *testProposer) leaderStatusRLocked(raftGroup proposerRaft) rangeLeaderInfo { - leaderKnown := raftGroup.BasicStatus().Lead != raft.None + lead := raftGroup.Status().Lead + leaderKnown := lead != raft.None var leaderRep roachpb.ReplicaID var iAmTheLeader, leaderEligibleForLease bool if leaderKnown { - leaderRep = roachpb.ReplicaID(raftGroup.BasicStatus().Lead) + leaderRep = roachpb.ReplicaID(lead) iAmTheLeader = leaderRep == t.getReplicaID() repDesc := roachpb.ReplicaDescriptor{ ReplicaID: leaderRep, @@ -199,12 +212,24 @@ func (t *testProposer) leaderStatusRLocked(raftGroup proposerRaft) rangeLeaderIn } func (t *testProposer) rejectProposalWithRedirectLocked( - ctx context.Context, prop *ProposalData, redirectTo roachpb.ReplicaID, + _ context.Context, _ *ProposalData, redirectTo roachpb.ReplicaID, ) { if t.onRejectProposalWithRedirectLocked == nil { panic("unexpected rejectProposalWithRedirectLocked() call") } - t.onRejectProposalWithRedirectLocked(prop, redirectTo) + t.onRejectProposalWithRedirectLocked(redirectTo) +} + +func (t *testProposer) rejectProposalWithLeaseTransferRejectedLocked( + _ context.Context, + _ *ProposalData, + lease *roachpb.Lease, + reason raftutil.ReplicaNeedsSnapshotStatus, +) { + if t.onRejectProposalWithLeaseTransferRejectedLocked == nil { + panic("unexpected rejectProposalWithLeaseTransferRejectedLocked() call") + } + t.onRejectProposalWithLeaseTransferRejectedLocked(lease, reason) } // proposalCreator holds on to a lease and creates proposals using it. 
@@ -219,26 +244,35 @@ func (pc proposalCreator) newPutProposal(ts hlc.Timestamp) *ProposalData { return pc.newProposal(ba) } -func (pc proposalCreator) newLeaseProposal(lease roachpb.Lease) *ProposalData { +func (pc proposalCreator) newLeaseRequestProposal(lease roachpb.Lease) *ProposalData { var ba roachpb.BatchRequest - ba.Add(&roachpb.RequestLeaseRequest{Lease: lease}) - prop := pc.newProposal(ba) - prop.command.ReplicatedEvalResult.IsLeaseRequest = true - return prop + ba.Add(&roachpb.RequestLeaseRequest{Lease: lease, PrevLease: pc.lease.Lease}) + return pc.newProposal(ba) +} + +func (pc proposalCreator) newLeaseTransferProposal(lease roachpb.Lease) *ProposalData { + var ba roachpb.BatchRequest + ba.Add(&roachpb.TransferLeaseRequest{Lease: lease, PrevLease: pc.lease.Lease}) + return pc.newProposal(ba) } func (pc proposalCreator) newProposal(ba roachpb.BatchRequest) *ProposalData { var lease *roachpb.Lease - r, ok := ba.GetArg(roachpb.RequestLease) - if ok { - lease = &r.(*roachpb.RequestLeaseRequest).Lease + var isLeaseRequest bool + switch v := ba.Requests[0].GetInner().(type) { + case *roachpb.RequestLeaseRequest: + lease = &v.Lease + isLeaseRequest = true + case *roachpb.TransferLeaseRequest: + lease = &v.Lease } p := &ProposalData{ ctx: context.Background(), idKey: kvserverbase.CmdIDKey("test-cmd"), command: &kvserverpb.RaftCommand{ ReplicatedEvalResult: kvserverpb.ReplicatedEvalResult{ - State: &kvserverpb.ReplicaState{Lease: lease}, + IsLeaseRequest: isLeaseRequest, + State: &kvserverpb.ReplicaState{Lease: lease}, }, }, Request: &ba, @@ -282,7 +316,7 @@ func TestProposalBuffer(t *testing.T) { leaseReq := i == leaseReqIdx var pd *ProposalData if leaseReq { - pd = pc.newLeaseProposal(roachpb.Lease{}) + pd = pc.newLeaseRequestProposal(roachpb.Lease{}) } else { pd = pc.newPutProposal(hlc.Timestamp{}) } @@ -528,25 +562,22 @@ func TestProposalBufferRejectLeaseAcqOnFollower(t *testing.T) { var rejected roachpb.ReplicaID if tc.expRejection { - p.onRejectProposalWithRedirectLocked = func(_ *ProposalData, redirectTo roachpb.ReplicaID) { + p.onRejectProposalWithRedirectLocked = func(redirectTo roachpb.ReplicaID) { if rejected != 0 { t.Fatalf("unexpected 2nd rejection") } rejected = redirectTo } } else { - p.onRejectProposalWithRedirectLocked = func(_ *ProposalData, _ roachpb.ReplicaID) { + p.onRejectProposalWithRedirectLocked = func(_ roachpb.ReplicaID) { t.Fatalf("unexpected redirection") } } - raftStatus := raft.BasicStatus{ - ID: self, - SoftState: raft.SoftState{ - RaftState: tc.state, - Lead: tc.leader, - }, - } + var raftStatus raft.Status + raftStatus.ID = self + raftStatus.RaftState = tc.state + raftStatus.Lead = tc.leader r := &testProposerRaft{ status: raftStatus, } @@ -560,7 +591,7 @@ func TestProposalBufferRejectLeaseAcqOnFollower(t *testing.T) { tracker := tracker.NewLockfreeTracker() b.Init(&p, tracker, clock, cluster.MakeTestingClusterSettings()) - pd := pc.newLeaseProposal(roachpb.Lease{}) + pd := pc.newLeaseRequestProposal(roachpb.Lease{}) _, tok := b.TrackEvaluatingRequest(ctx, hlc.MinTimestamp) err := b.Insert(ctx, pd, tok.Move(ctx)) require.NoError(t, err) @@ -575,6 +606,157 @@ func TestProposalBufferRejectLeaseAcqOnFollower(t *testing.T) { } } +// Test that the proposal buffer rejects lease transfer proposals to replicas +// that it deems would be unsafe. 
+func TestProposalBufferRejectUnsafeLeaseTransfer(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + ctx := context.Background() + + proposer := uint64(1) + proposerFirstIndex := uint64(5) + target := uint64(2) + + // Each subtest will try to propose a lease transfer in a different Raft + // scenario. Some proposals should be allowed, some should be rejected. + for _, tc := range []struct { + name string + proposerState raft.StateType + // math.MaxUint64 if the target is not in the raft group. + targetState rafttracker.StateType + targetMatch uint64 + + expRejection bool + expRejectionReason raftutil.ReplicaNeedsSnapshotStatus + }{ + { + name: "follower", + proposerState: raft.StateFollower, + expRejection: true, + expRejectionReason: raftutil.LocalReplicaNotLeader, + }, + { + name: "candidate", + proposerState: raft.StateCandidate, + expRejection: true, + expRejectionReason: raftutil.LocalReplicaNotLeader, + }, + { + name: "leader, no progress for target", + proposerState: raft.StateLeader, + targetState: math.MaxUint64, + expRejection: true, + expRejectionReason: raftutil.ReplicaUnknown, + }, + { + name: "leader, target state probe", + proposerState: raft.StateLeader, + targetState: rafttracker.StateProbe, + expRejection: true, + expRejectionReason: raftutil.ReplicaStateProbe, + }, + { + name: "leader, target state snapshot", + proposerState: raft.StateLeader, + targetState: rafttracker.StateSnapshot, + expRejection: true, + expRejectionReason: raftutil.ReplicaStateSnapshot, + }, + { + name: "leader, target state replicate, match+1 < firstIndex", + proposerState: raft.StateLeader, + targetState: rafttracker.StateReplicate, + targetMatch: proposerFirstIndex - 2, + expRejection: true, + expRejectionReason: raftutil.ReplicaMatchBelowLeadersFirstIndex, + }, + { + name: "leader, target state replicate, match+1 == firstIndex", + proposerState: raft.StateLeader, + targetState: rafttracker.StateReplicate, + targetMatch: proposerFirstIndex - 1, + expRejection: false, + }, + { + name: "leader, target state replicate, match+1 > firstIndex", + proposerState: raft.StateLeader, + targetState: rafttracker.StateReplicate, + targetMatch: proposerFirstIndex, + expRejection: false, + }, + } { + t.Run(tc.name, func(t *testing.T) { + var p testProposer + var pc proposalCreator + require.Equal(t, proposer, uint64(p.getReplicaID())) + + var rejectedLease *roachpb.Lease + var rejectedReason raftutil.ReplicaNeedsSnapshotStatus + if tc.expRejection { + p.onRejectProposalWithLeaseTransferRejectedLocked = func(lease *roachpb.Lease, reason raftutil.ReplicaNeedsSnapshotStatus) { + if rejectedLease != nil { + t.Fatalf("unexpected 2nd rejection") + } + rejectedLease = lease + rejectedReason = reason + } + } else { + p.onRejectProposalWithLeaseTransferRejectedLocked = func(lease *roachpb.Lease, reason raftutil.ReplicaNeedsSnapshotStatus) { + t.Fatalf("unexpected rejection") + } + } + + var raftStatus raft.Status + raftStatus.ID = proposer + raftStatus.RaftState = tc.proposerState + if tc.proposerState == raft.StateLeader { + raftStatus.Lead = proposer + raftStatus.Progress = map[uint64]rafttracker.Progress{ + proposer: {State: rafttracker.StateReplicate, Match: proposerFirstIndex}, + } + if tc.targetState != math.MaxUint64 { + raftStatus.Progress[target] = rafttracker.Progress{ + State: tc.targetState, Match: tc.targetMatch, + } + } + } + r := &testProposerRaft{ + status: raftStatus, + } + p.raftGroup = r + p.fi = proposerFirstIndex + + var b propBuf + clock := 
hlc.NewClockWithSystemTimeSource(time.Nanosecond /* maxOffset */) + tracker := tracker.NewLockfreeTracker() + b.Init(&p, tracker, clock, cluster.MakeTestingClusterSettings()) + + nextLease := roachpb.Lease{ + Start: clock.NowAsClockTimestamp(), + Sequence: pc.lease.Lease.Sequence + 1, + Replica: roachpb.ReplicaDescriptor{ + ReplicaID: roachpb.ReplicaID(target), + }, + } + pd := pc.newLeaseTransferProposal(nextLease) + + _, tok := b.TrackEvaluatingRequest(ctx, hlc.MinTimestamp) + err := b.Insert(ctx, pd, tok.Move(ctx)) + require.NoError(t, err) + require.NoError(t, b.flushLocked(ctx)) + if tc.expRejection { + require.NotNil(t, rejectedLease) + require.Equal(t, nextLease, *rejectedLease) + require.Equal(t, tc.expRejectionReason, rejectedReason) + } else { + require.Nil(t, rejectedLease) + require.Zero(t, rejectedReason) + } + require.Zero(t, tracker.Count()) + }) + } +} + // Test that the propBuf properly assigns closed timestamps to proposals being // flushed out of it. Each subtest proposes one command and checks for the // expected closed timestamp being written to the proposal by the propBuf. @@ -622,6 +804,7 @@ func TestProposalBufferClosedTimestamp(t *testing.T) { Start: hlc.ClockTimestamp{}, // Expiration is filled by each test. Expiration: nil, + Replica: roachpb.ReplicaDescriptor{ReplicaID: 1}, } const ( @@ -743,6 +926,7 @@ func TestProposalBufferClosedTimestamp(t *testing.T) { lease: roachpb.Lease{ Sequence: curLease.Sequence + 1, Start: now, + Replica: curLease.Replica, }, trackerLowerBound: hlc.Timestamp{}, leaseExp: hlc.MaxTimestamp, @@ -765,6 +949,10 @@ func TestProposalBufferClosedTimestamp(t *testing.T) { } { t.Run(tc.name, func(t *testing.T) { r := &testProposerRaft{} + r.status.RaftState = raft.StateLeader + r.status.Progress = map[uint64]rafttracker.Progress{ + 1: {State: rafttracker.StateReplicate}, + } p := testProposer{ clock: clock, lai: 10, @@ -786,17 +974,9 @@ func TestProposalBufferClosedTimestamp(t *testing.T) { case regularWrite: pd = pc.newPutProposal(now.ToTimestamp()) case newLease: - pd = pc.newLeaseProposal(tc.lease) + pd = pc.newLeaseRequestProposal(tc.lease) case leaseTransfer: - var ba roachpb.BatchRequest - ba.Add(&roachpb.TransferLeaseRequest{ - Lease: roachpb.Lease{ - Start: now, - Sequence: pc.lease.Lease.Sequence + 1, - }, - PrevLease: pc.lease.Lease, - }) - pd = pc.newProposal(ba) + pd = pc.newLeaseTransferProposal(tc.lease) default: t.Fatalf("unknown req type %d", tc.reqType) } diff --git a/pkg/kv/kvserver/replica_range_lease.go b/pkg/kv/kvserver/replica_range_lease.go index da8ff1ac42c9..c57dfeac6fb4 100644 --- a/pkg/kv/kvserver/replica_range_lease.go +++ b/pkg/kv/kvserver/replica_range_lease.go @@ -52,10 +52,12 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver/constraint" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/raftutil" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/util/contextutil" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/retry" "github.com/cockroachdb/cockroach/pkg/util/stop" "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/cockroach/pkg/util/tracing" @@ -890,18 +892,63 @@ func (r *Replica) AdminTransferLease(ctx context.Context, target roachpb.StoreID "another transfer to a different store is in progress") } + // Verify that the lease 
transfer would be safe. This check is best-effort + // in that it can race with Raft leadership changes and log truncation. See + // propBuf.maybeRejectUnsafeProposalLocked for a non-racy version of this + // check, along with a full explanation of why it is important. We include + // both because rejecting a lease transfer in the propBuf after we have + // revoked our current lease is more disruptive than doing so here, before + // we have revoked our current lease. + raftStatus := r.raftStatusRLocked() + raftFirstIndex := r.raftFirstIndexRLocked() + snapStatus := raftutil.ReplicaMayNeedSnapshot(raftStatus, raftFirstIndex, nextLeaseHolder.ReplicaID) + if snapStatus != raftutil.NoSnapshotNeeded && !r.store.TestingKnobs().AllowLeaseTransfersWhenTargetMayNeedSnapshot { + r.store.metrics.LeaseTransferErrorCount.Inc(1) + log.VEventf(ctx, 2, "not initiating lease transfer because the target %s may "+ + "need a snapshot: %s", nextLeaseHolder, snapStatus) + err := newLeaseTransferRejectedBecauseTargetMayNeedSnapshotError(nextLeaseHolder, snapStatus) + return nil, nil, err + } + transfer = r.mu.pendingLeaseRequest.InitOrJoinRequest( ctx, nextLeaseHolder, status, desc.StartKey.AsRawKey(), true, /* transfer */ ) return nil, transfer, nil } + // Before transferring a lease, we ensure that the lease transfer is safe. If + // the leaseholder cannot guarantee this, we reject the lease transfer. To + // make such a claim, the leaseholder needs to become the Raft leader and + // probe the lease target's log. Doing so may take time, so we use a small + // exponential backoff loop with a maximum retry count before returning the + // rejection to the client. As configured, this retry loop should back off + // for about 6 seconds before returning an error. + retryOpts := retry.Options{ + InitialBackoff: 50 * time.Millisecond, + MaxBackoff: 1 * time.Second, + Multiplier: 2, + MaxRetries: 10, + } + if count := r.store.TestingKnobs().LeaseTransferRejectedRetryLoopCount; count != 0 { + retryOpts.MaxRetries = count + } + transferRejectedRetry := retry.StartWithCtx(ctx, retryOpts) + transferRejectedRetry.Next() // The first call to Next does not block. + // Loop while there's an extension in progress. for { // See if there's an extension in progress that we have to wait for. // If there isn't, request a transfer. extension, transfer, err := initTransferHelper() if err != nil { + if IsLeaseTransferRejectedBecauseTargetMayNeedSnapshotError(err) && transferRejectedRetry.Next() { + // If the lease transfer was rejected because the target may need a + // snapshot, try again. After the backoff, we may have become the Raft + // leader (through maybeTransferRaftLeadershipToLeaseholderLocked) or + // may have learned more about the state of the lease target's log. + log.VEventf(ctx, 2, "retrying lease transfer to store %d after rejection", target) + continue + } return err } if extension == nil { @@ -989,6 +1036,17 @@ func newNotLeaseHolderError( return err } +// newLeaseTransferRejectedBecauseTargetMayNeedSnapshotError return an error +// indicating that a lease transfer failed because the current leaseholder could +// not prove that the lease transfer target did not need a Raft snapshot. 
+func newLeaseTransferRejectedBecauseTargetMayNeedSnapshotError( + target roachpb.ReplicaDescriptor, snapStatus raftutil.ReplicaNeedsSnapshotStatus, +) error { + err := errors.Errorf("refusing to transfer lease to %d because target may need a Raft snapshot: %s", + target, snapStatus) + return errors.Mark(err, errMarkLeaseTransferRejectedBecauseTargetMayNeedSnapshot) +} + // checkRequestTimeRLocked checks that the provided request timestamp is not // too far in the future. We define "too far" as a time that would require a // lease extension even if we were perfectly proactive about extending our diff --git a/pkg/kv/kvserver/replica_test.go b/pkg/kv/kvserver/replica_test.go index 371e6463a3ea..afc00a7e7227 100644 --- a/pkg/kv/kvserver/replica_test.go +++ b/pkg/kv/kvserver/replica_test.go @@ -616,6 +616,9 @@ func TestBehaviorDuringLeaseTransfer(t *testing.T) { tsc := TestStoreConfig(clock) var leaseAcquisitionTrap atomic.Value tsc.TestingKnobs.DisableAutomaticLeaseRenewal = true + // We're transferring the lease to a bogus replica, so disable protection + // which would otherwise notice this and reject the lease transfer. + tsc.TestingKnobs.AllowLeaseTransfersWhenTargetMayNeedSnapshot = true tsc.TestingKnobs.LeaseRequestEvent = func(ts hlc.Timestamp, _ roachpb.StoreID, _ roachpb.RangeID) *roachpb.Error { val := leaseAcquisitionTrap.Load() if val == nil { diff --git a/pkg/kv/kvserver/testing_knobs.go b/pkg/kv/kvserver/testing_knobs.go index c18e374c819f..ff36aff7d8c2 100644 --- a/pkg/kv/kvserver/testing_knobs.go +++ b/pkg/kv/kvserver/testing_knobs.go @@ -340,6 +340,15 @@ type StoreTestingKnobs struct { // heartbeats and then expect other replicas to take the lease without // worrying about Raft). AllowLeaseRequestProposalsWhenNotLeader bool + // AllowLeaseTransfersWhenTargetMayNeedSnapshot, if set, makes the Replica + // and proposal buffer allow lease request proposals even when the proposer + // cannot prove that the lease transfer target does not need a Raft snapshot. + AllowLeaseTransfersWhenTargetMayNeedSnapshot bool + // LeaseTransferRejectedRetryLoopCount, if set, configures the maximum number + // of retries for the retry loop used during lease transfers. This retry loop + // retries after transfer attempts are rejected because the transfer is deemed + // to be unsafe. + LeaseTransferRejectedRetryLoopCount int // DontCloseTimestamps inhibits the propBuf's closing of timestamps. All Raft // commands will carry an empty closed timestamp. DontCloseTimestamps bool diff --git a/pkg/testutils/serverutils/test_cluster_shim.go b/pkg/testutils/serverutils/test_cluster_shim.go index 75c79650982d..0f2e40667834 100644 --- a/pkg/testutils/serverutils/test_cluster_shim.go +++ b/pkg/testutils/serverutils/test_cluster_shim.go @@ -114,7 +114,7 @@ type TestClusterInterface interface { // RebalanceVoterOrFatal rebalances a voting replica from src to dest but wil // fatal if it fails. RebalanceVoterOrFatal( - ctx context.Context, t *testing.T, startKey roachpb.Key, src, dest roachpb.ReplicationTarget, + ctx context.Context, t testing.TB, startKey roachpb.Key, src, dest roachpb.ReplicationTarget, ) *roachpb.RangeDescriptor // SwapVoterWithNonVoter atomically "swaps" the voting replica located on @@ -129,7 +129,7 @@ type TestClusterInterface interface { // SwapVoterWithNonVoterOrFatal is the same as SwapVoterWithNonVoter but will // fatal if it fails. 
SwapVoterWithNonVoterOrFatal( - t *testing.T, startKey roachpb.Key, voterTarget, nonVoterTarget roachpb.ReplicationTarget, + t testing.TB, startKey roachpb.Key, voterTarget, nonVoterTarget roachpb.ReplicationTarget, ) *roachpb.RangeDescriptor // FindRangeLeaseHolder returns the current lease holder for the given range. @@ -158,6 +158,12 @@ type TestClusterInterface interface { rangeDesc roachpb.RangeDescriptor, dest roachpb.ReplicationTarget, ) error + // TransferRangeLeaseOrFatal is the same as TransferRangeLease but will fatal + // if it fails. + TransferRangeLeaseOrFatal( + t testing.TB, rangeDesc roachpb.RangeDescriptor, dest roachpb.ReplicationTarget, + ) + // MoveRangeLeaseNonCooperatively performs a non-cooperative transfer of the // lease for a range from whoever has it to a particular store. That store // must already have a replica of the range. If that replica already has the diff --git a/pkg/testutils/testcluster/testcluster.go b/pkg/testutils/testcluster/testcluster.go index 1cff73bc27b0..cc324a601274 100644 --- a/pkg/testutils/testcluster/testcluster.go +++ b/pkg/testutils/testcluster/testcluster.go @@ -886,7 +886,7 @@ func (tc *TestCluster) SwapVoterWithNonVoter( // SwapVoterWithNonVoterOrFatal is part of TestClusterInterface. func (tc *TestCluster) SwapVoterWithNonVoterOrFatal( - t *testing.T, startKey roachpb.Key, voterTarget, nonVoterTarget roachpb.ReplicationTarget, + t testing.TB, startKey roachpb.Key, voterTarget, nonVoterTarget roachpb.ReplicationTarget, ) *roachpb.RangeDescriptor { afterDesc, err := tc.SwapVoterWithNonVoter(startKey, voterTarget, nonVoterTarget) @@ -922,7 +922,7 @@ func (tc *TestCluster) RebalanceVoter( // RebalanceVoterOrFatal is part of TestClusterInterface. func (tc *TestCluster) RebalanceVoterOrFatal( - ctx context.Context, t *testing.T, startKey roachpb.Key, src, dest roachpb.ReplicationTarget, + ctx context.Context, t testing.TB, startKey roachpb.Key, src, dest roachpb.ReplicationTarget, ) *roachpb.RangeDescriptor { afterDesc, err := tc.RebalanceVoter(ctx, startKey, src, dest) if err != nil {
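As a quick sanity check on the comment in `AdminTransferLease` that the rejection retry loop "should back off for about 6 seconds before returning an error", here is a small standalone calculation (not CockroachDB code) using the configured values: InitialBackoff 50ms, Multiplier 2, MaxBackoff 1s, MaxRetries 10. It ignores the randomization that the real `retry` package applies to each backoff.

// Back-of-the-envelope total for the lease transfer rejection retry loop.
package main

import (
	"fmt"
	"time"
)

func main() {
	const (
		initialBackoff = 50 * time.Millisecond
		maxBackoff     = 1 * time.Second
		multiplier     = 2
		maxRetries     = 10
	)

	total := time.Duration(0)
	backoff := initialBackoff
	for i := 0; i < maxRetries; i++ {
		total += backoff
		backoff *= multiplier
		if backoff > maxBackoff {
			backoff = maxBackoff
		}
	}
	// 50ms + 100ms + 200ms + 400ms + 800ms + five waits of 1s each = 6.55s,
	// i.e. roughly the "about 6 seconds" cited in the comment.
	fmt.Println(total)
}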