diff --git a/pkg/kv/kvserver/BUILD.bazel b/pkg/kv/kvserver/BUILD.bazel index e026941900d0..6ab024da131d 100644 --- a/pkg/kv/kvserver/BUILD.bazel +++ b/pkg/kv/kvserver/BUILD.bazel @@ -349,6 +349,7 @@ go_test( "//pkg/kv/kvserver/protectedts/ptstorage", "//pkg/kv/kvserver/protectedts/ptutil", "//pkg/kv/kvserver/raftentry", + "//pkg/kv/kvserver/raftutil", "//pkg/kv/kvserver/rditer", "//pkg/kv/kvserver/readsummary/rspb", "//pkg/kv/kvserver/replicastats", diff --git a/pkg/kv/kvserver/client_relocate_range_test.go b/pkg/kv/kvserver/client_relocate_range_test.go index 842706216090..7732211baaad 100644 --- a/pkg/kv/kvserver/client_relocate_range_test.go +++ b/pkg/kv/kvserver/client_relocate_range_test.go @@ -43,6 +43,7 @@ func relocateAndCheck( voterTargets []roachpb.ReplicationTarget, nonVoterTargets []roachpb.ReplicationTarget, ) (retries int) { + t.Helper() every := log.Every(1 * time.Second) testutils.SucceedsSoon(t, func() error { err := tc.Servers[0].DB(). diff --git a/pkg/kv/kvserver/client_replica_test.go b/pkg/kv/kvserver/client_replica_test.go index 060946253e66..22507b8ee39b 100644 --- a/pkg/kv/kvserver/client_replica_test.go +++ b/pkg/kv/kvserver/client_replica_test.go @@ -1724,23 +1724,15 @@ func TestRangeLocalUncertaintyLimitAfterNewLease(t *testing.T) { // Up-replicate the data in the range to node2. tc.AddVotersOrFatal(t, keyA, tc.Target(1)) - replica1 := tc.GetFirstStoreFromServer(t, 0).LookupReplica(roachpb.RKey(keyA)) - - // Transfer the lease from node1 to node2. replica2 := tc.GetFirstStoreFromServer(t, 1).LookupReplica(roachpb.RKey(keyA)) - replica2Desc, err := replica2.GetReplicaDescriptor() - if err != nil { - t.Fatal(err) - } + // Transfer the lease from node1 to node2. node1Before := tc.Servers[0].Clock().Now() + tc.TransferRangeLeaseOrFatal(t, *replica2.Desc(), tc.Target(1)) testutils.SucceedsSoon(t, func() error { - if err := replica1.AdminTransferLease(ctx, replica2Desc.StoreID); err != nil { - t.Fatal(err) - } lease, _ := replica2.GetLease() if lease.Replica.NodeID != replica2.NodeID() { - return errors.Errorf("expected lease transfer to node2: %s", lease) + return errors.Errorf("expected lease transfer to apply on node2: %s", lease) } return nil }) @@ -2152,20 +2144,15 @@ func TestLeaseInfoRequest(t *testing.T) { // Transfer the lease to Servers[0] so we start in a known state. Otherwise, // there might already be a lease owned by a random node. - err = tc.TransferRangeLease(rangeDesc, tc.Target(0)) - if err != nil { - t.Fatal(err) - } + tc.TransferRangeLeaseOrFatal(t, rangeDesc, tc.Target(0)) // Now test the LeaseInfo. We might need to loop until the node we query has applied the lease. validateLeaseholderSoon(t, kvDB0, rangeDesc.StartKey.AsRawKey(), replicas[0], true) // Transfer the lease to Server 1 and check that LeaseInfoRequest gets the // right answer. - err = tc.TransferRangeLease(rangeDesc, tc.Target(1)) - if err != nil { - t.Fatal(err) - } + tc.TransferRangeLeaseOrFatal(t, rangeDesc, tc.Target(1)) + // An inconsistent LeaseInfoRequest on the old lease holder should give us the // right answer immediately, since the old holder has definitely applied the // transfer before TransferRangeLease returned. @@ -2185,10 +2172,7 @@ func TestLeaseInfoRequest(t *testing.T) { // Transfer the lease to Server 2 and check that LeaseInfoRequest gets the // right answer. - err = tc.TransferRangeLease(rangeDesc, tc.Target(2)) - if err != nil { - t.Fatal(err) - } + tc.TransferRangeLeaseOrFatal(t, rangeDesc, tc.Target(2)) // We're now going to ask servers[1] for the lease info.
We don't use kvDB1; // instead we go directly to the store because otherwise the DistSender might @@ -2594,6 +2578,15 @@ func TestClearRange(t *testing.T) { // possible and hard to prevent entirely. The Replica will only learn that it is // the new leaseholder when it applies the snapshot. When doing so, it should // make sure to apply the lease-related side-effects to its in-memory state. +// +// EDIT: as of June 2022, we have protection that should make this scenario +// significantly rarer. This test uses a knob to disable the new protection +// so that it can create the scenario where a replica learns that it holds the +// lease through a snapshot. We'll want to keep the test and the corresponding +// logic in applySnapshot around until we can eliminate the scenario entirely. +// See the commentary in github.com/cockroachdb/cockroach/issues/81561 about +// sending Raft logs in Raft snapshots for a discussion about why this may not +// be worth eliminating. func TestLeaseTransferInSnapshotUpdatesTimestampCache(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) @@ -2605,6 +2598,9 @@ func TestLeaseTransferInSnapshotUpdatesTimestampCache(t *testing.T) { ReplicationMode: base.ReplicationManual, ServerArgs: base.TestServerArgs{ Knobs: base.TestingKnobs{ + Store: &kvserver.StoreTestingKnobs{ + AllowLeaseTransfersWhenTargetMayNeedSnapshot: true, + }, Server: &server.TestingKnobs{ WallClock: manualClock, }, @@ -2661,12 +2657,15 @@ func TestLeaseTransferInSnapshotUpdatesTimestampCache(t *testing.T) { // Partition node 2 from the rest of its range. Once partitioned, perform // another write and truncate the Raft log on the two connected nodes. This - // ensures that that when node 2 comes back up it will require a snapshot - // from Raft. + // ensures that when node 2 comes back up it will require a snapshot from + // Raft. funcs := noopRaftHandlerFuncs() funcs.dropReq = func(*kvserverpb.RaftMessageRequest) bool { return true } + funcs.snapErr = func(*kvserverpb.SnapshotRequest_Header) error { + return errors.New("rejected") + } tc.Servers[2].RaftTransport().Listen(store2.StoreID(), &unreliableRaftHandler{ rangeID: repl0.GetRangeID(), RaftMessageHandler: store2, @@ -2691,9 +2690,7 @@ func TestLeaseTransferInSnapshotUpdatesTimestampCache(t *testing.T) { // Finally, transfer the lease to node 2 while it is still unavailable and // behind. We try to avoid this case when picking new leaseholders in practice, // but we're never 100% successful. - if err := repl0.AdminTransferLease(ctx, store2.Ident.StoreID); err != nil { - t.Fatal(err) - } + tc.TransferRangeLeaseOrFatal(t, *repl0.Desc(), tc.Target(2)) // Remove the partition. A snapshot to node 2 should follow. This snapshot // will inform node 2 that it is the new leaseholder for the range. Node 2 @@ -2718,6 +2715,166 @@ func TestLeaseTransferInSnapshotUpdatesTimestampCache(t *testing.T) { }) } +// TestLeaseTransferRejectedIfTargetNeedsSnapshot prevents a regression of +// #81561. It verifies that a replica will reject a lease transfer request if it +// cannot guarantee that the lease target is sufficiently caught up on its Raft +// log such that it does not need a Raft snapshot. The test does so by inducing +// a partition, truncating the Raft log, and then trying to transfer the lease to +// the replica that is now cut off from the log. The lease transfer request must +// be rejected. +// +// The test has two variants.
The !rejectAfterRevoke variant exercises the +// common but best-effort protection against unsafe lease transfers in +// Replica.AdminTransferLease. The rejectAfterRevoke variant exercises the +// uncommon but airtight protection against unsafe lease transfers in +// propBuf.maybeRejectUnsafeProposalLocked. +func TestLeaseTransferRejectedIfTargetNeedsSnapshot(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + testutils.RunTrueAndFalse(t, "reject-after-revoke", func(t *testing.T, rejectAfterRevoke bool) { + var transferLeaseReqBlockOnce sync.Once + transferLeaseReqBlockedC := make(chan struct{}) + transferLeaseReqUnblockedC := make(chan struct{}) + + ctx := context.Background() + tc := testcluster.StartTestCluster(t, 3, base.TestClusterArgs{ + ReplicationMode: base.ReplicationManual, + ServerArgs: base.TestServerArgs{ + Knobs: base.TestingKnobs{ + Store: &kvserver.StoreTestingKnobs{ + TestingRequestFilter: func(ctx context.Context, ba roachpb.BatchRequest) *roachpb.Error { + if rejectAfterRevoke && ba.IsSingleTransferLeaseRequest() { + transferLeaseReqBlockOnce.Do(func() { + close(transferLeaseReqBlockedC) + <-transferLeaseReqUnblockedC + }) + } + return nil + }, + // Speed up the lease transfer retry loop. + LeaseTransferRejectedRetryLoopCount: 2, + }, + }, + }, + }) + defer tc.Stopper().Stop(context.Background()) + store0 := tc.GetFirstStoreFromServer(t, 0) + store2 := tc.GetFirstStoreFromServer(t, 2) + + keyA := tc.ScratchRange(t) + keyB := keyA.Next() + keyC := keyB.Next() + + // First, do a couple of writes; we'll use these to determine when + // the dust has settled. + incA := incrementArgs(keyA, 1) + _, pErr := kv.SendWrapped(ctx, store0.TestSender(), incA) + require.Nil(t, pErr) + incC := incrementArgs(keyC, 2) + _, pErr = kv.SendWrapped(ctx, store0.TestSender(), incC) + require.Nil(t, pErr) + + tc.AddVotersOrFatal(t, keyA, tc.Targets(1, 2)...) + tc.WaitForValues(t, keyA, []int64{1, 1, 1}) + tc.WaitForValues(t, keyC, []int64{2, 2, 2}) + + repl0 := store0.LookupReplica(roachpb.RKey(keyA)) + + // Grab the current lease. We'll use it later. + preLease, _, err := tc.FindRangeLease(*repl0.Desc(), nil) + require.NoError(t, err) + require.Equal(t, store0.StoreID(), preLease.Replica.StoreID) + + // Partition node 2 from the rest of its range. Once partitioned, perform + // another write and truncate the Raft log on the two connected nodes. This + // ensures that when node 2 comes back up it will require a snapshot from + // Raft. + funcs := noopRaftHandlerFuncs() + funcs.dropReq = func(*kvserverpb.RaftMessageRequest) bool { + return true + } + funcs.snapErr = func(*kvserverpb.SnapshotRequest_Header) error { + return errors.New("rejected") + } + tc.Servers[2].RaftTransport().Listen(store2.StoreID(), &unreliableRaftHandler{ + rangeID: repl0.GetRangeID(), + RaftMessageHandler: store2, + unreliableRaftHandlerFuncs: funcs, + }) + + _, pErr = kv.SendWrapped(ctx, store0.TestSender(), incC) + require.Nil(t, pErr) + tc.WaitForValues(t, keyC, []int64{4, 4, 2}) + + // If we want the lease transfer rejection to come after the leaseholder has + // revoked its lease, we launch the lease transfer before the log truncation + // and block it after it has passed through the best-effort protection in + // Replica.AdminTransferLease. 
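+ // (Editor's note: the TestingRequestFilter installed above provides this
+ // blocking. It parks the single TransferLease request on
+ // transferLeaseReqBlockedC after Replica.AdminTransferLease's best-effort
+ // check has passed, and holds it until transferLeaseReqUnblockedC is closed
+ // below, giving the test a window in which to truncate the log.)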
+ transferErrC := make(chan error, 1) + if rejectAfterRevoke { + _ = tc.Stopper().RunAsyncTask(ctx, "transfer lease", func(ctx context.Context) { + transferErrC <- tc.TransferRangeLease(*repl0.Desc(), tc.Target(2)) + }) + <-transferLeaseReqBlockedC + } + + // Truncate the log at index+1 (log entries < N are removed, so this + // includes the increment). This necessitates a snapshot when the + // partitioned replica rejoins the rest of the range. + index := repl0.GetLastIndex() + truncArgs := truncateLogArgs(index+1, repl0.GetRangeID()) + truncArgs.Key = keyA + _, pErr = kv.SendWrapped(ctx, store0.TestSender(), truncArgs) + require.Nil(t, pErr) + + // Complete or initiate the lease transfer attempt to node 2, which must not + // succeed because node 2 now needs a snapshot. + var transferErr error + if rejectAfterRevoke { + close(transferLeaseReqUnblockedC) + transferErr = <-transferErrC + } else { + transferErr = tc.TransferRangeLease(*repl0.Desc(), tc.Target(2)) + } + isRejectedErr := kvserver.IsLeaseTransferRejectedBecauseTargetMayNeedSnapshotError(transferErr) + require.True(t, isRejectedErr, "%+v", transferErr) + + // Remove the partition. A snapshot to node 2 should follow. + tc.Servers[2].RaftTransport().Listen(store2.Ident.StoreID, store2) + tc.WaitForValues(t, keyC, []int64{4, 4, 4}) + + // Now that node 2 caught up on the log through a snapshot, we should be + // able to transfer the lease to it successfully. + // NOTE: we used a testing knob to limit the automatic lease transfer + // retries, so we use a SucceedsSoon loop. + testutils.SucceedsSoon(t, func() error { + if err := tc.TransferRangeLease(*repl0.Desc(), tc.Target(2)); err != nil { + if kvserver.IsLeaseTransferRejectedBecauseTargetMayNeedSnapshotError(err) { + return err + } + t.Fatal(err) + } + return nil + }) + + // Verify that the lease is now held by node 2. + postLease, _, err := tc.FindRangeLease(*repl0.Desc(), nil) + require.NoError(t, err) + require.Equal(t, store2.StoreID(), postLease.Replica.StoreID) + + // Additionally, verify that the lease has the expected sequence number. If + // the lease transfer rejection came after the previous lease was revoked, + // then node 0 must have re-acquired the lease (with a new sequence number) + // in order to transfer it to node 2. + expSeq := preLease.Sequence + 1 + if rejectAfterRevoke { + expSeq++ + } + require.Equal(t, expSeq, postLease.Sequence) + }) +} + // TestConcurrentAdminChangeReplicasRequests ensures that when two attempts to // change replicas for a range race, only one will succeed. func TestConcurrentAdminChangeReplicasRequests(t *testing.T) { diff --git a/pkg/kv/kvserver/markers.go b/pkg/kv/kvserver/markers.go index 9a4aca0fdde6..6d3d16a148c7 100644 --- a/pkg/kv/kvserver/markers.go +++ b/pkg/kv/kvserver/markers.go @@ -82,8 +82,21 @@ func IsIllegalReplicationChangeError(err error) bool { var errMarkReplicationChangeInProgress = errors.New("replication change in progress") // IsReplicationChangeInProgressError detects whether an error (assumed to have -// been emitted a replication change) indicates that the replication change +// been emitted by a replication change) indicates that the replication change // failed because another replication change was in progress on the range.
func IsReplicationChangeInProgressError(err error) bool { return errors.Is(err, errMarkReplicationChangeInProgress) } + +var errMarkLeaseTransferRejectedBecauseTargetMayNeedSnapshot = errors.New( + "lease transfer rejected because the target may need a snapshot") + +// IsLeaseTransferRejectedBecauseTargetMayNeedSnapshotError detects whether an +// error (assumed to have been emitted by a lease transfer request) indicates +// that the lease transfer failed because the current leaseholder could not +// prove that the lease transfer target did not need a Raft snapshot. In order +// to prove this, the current leaseholder must also be the Raft leader; Raft +// leadership is periodically requested in maybeTransferRaftLeadershipToLeaseholderLocked. +func IsLeaseTransferRejectedBecauseTargetMayNeedSnapshotError(err error) bool { + return errors.Is(err, errMarkLeaseTransferRejectedBecauseTargetMayNeedSnapshot) +} diff --git a/pkg/kv/kvserver/replica_init.go b/pkg/kv/kvserver/replica_init.go index ab713c5354eb..b1c17227483e 100644 --- a/pkg/kv/kvserver/replica_init.go +++ b/pkg/kv/kvserver/replica_init.go @@ -104,6 +104,7 @@ func newUnloadedReplica( r.mu.checksums = map[uuid.UUID]replicaChecksum{} r.mu.proposalBuf.Init((*replicaProposer)(r), tracker.NewLockfreeTracker(), r.Clock(), r.ClusterSettings()) r.mu.proposalBuf.testing.allowLeaseProposalWhenNotLeader = store.cfg.TestingKnobs.AllowLeaseRequestProposalsWhenNotLeader + r.mu.proposalBuf.testing.allowLeaseTransfersWhenTargetMayNeedSnapshot = store.cfg.TestingKnobs.AllowLeaseTransfersWhenTargetMayNeedSnapshot r.mu.proposalBuf.testing.dontCloseTimestamps = store.cfg.TestingKnobs.DontCloseTimestamps r.mu.proposalBuf.testing.submitProposalFilter = store.cfg.TestingKnobs.TestingProposalSubmitFilter diff --git a/pkg/kv/kvserver/replica_proposal_buf.go b/pkg/kv/kvserver/replica_proposal_buf.go index b5b0a0800b20..f1f35bb5877e 100644 --- a/pkg/kv/kvserver/replica_proposal_buf.go +++ b/pkg/kv/kvserver/replica_proposal_buf.go @@ -17,6 +17,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver/closedts/tracker" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/raftutil" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/util/errorutil" @@ -110,6 +111,10 @@ type propBuf struct { // heartbeats and then expect other replicas to take the lease without // worrying about Raft). allowLeaseProposalWhenNotLeader bool + // allowLeaseTransfersWhenTargetMayNeedSnapshot, if set, makes the proposal + // buffer allow lease transfer proposals even when the proposer cannot prove + // that the lease transfer target does not need a Raft snapshot. + allowLeaseTransfersWhenTargetMayNeedSnapshot bool // dontCloseTimestamps inhibits the closing of timestamps. dontCloseTimestamps bool } @@ -137,6 +142,7 @@ type proposer interface { // The following require the proposer to hold (at least) a shared lock. getReplicaID() roachpb.ReplicaID destroyed() destroyStatus + firstIndex() uint64 leaseAppliedIndex() uint64 enqueueUpdateCheck() closedTimestampTarget() hlc.Timestamp @@ -158,6 +164,18 @@ type proposer interface { prop *ProposalData, redirectTo roachpb.ReplicaID, ) + // rejectProposalWithLeaseTransferRejectedLocked rejects a proposal for a + // lease transfer when the transfer is deemed to be unsafe. The intended + // consequence of the rejection is that the lease transfer attempt will + // fail.
Higher levels that decide whether or not to attempt a lease + // transfer have weaker versions of the same check, so we don't expect to see + // repeated lease transfer rejections. + rejectProposalWithLeaseTransferRejectedLocked( + ctx context.Context, + prop *ProposalData, + lease *roachpb.Lease, + reason raftutil.ReplicaNeedsSnapshotStatus, + ) // leaseDebugRLocked returns info on the current lease. leaseDebugRLocked() string @@ -167,7 +185,7 @@ type proposer interface { // testing. type proposerRaft interface { Step(raftpb.Message) error - BasicStatus() raft.BasicStatus + Status() raft.Status ProposeConfChange(raftpb.ConfChangeI) error } @@ -536,6 +554,11 @@ func (b *propBuf) FlushLockedWithRaftGroup( // // Currently, the request types which may be rejected by this function are: // - RequestLease when the proposer is not the raft leader (with caveats). +// - TransferLease when the proposer cannot guarantee that the lease transfer +// target does not currently need a Raft snapshot to catch up on its Raft log. +// In such cases, the proposer cannot guarantee that the lease transfer target +// will not need a Raft snapshot to catch up to and apply the lease transfer. +// This requires that the proposer is the raft leader. // // The function returns true if the proposal was rejected, and false if not. // If the proposal was rejected and true is returned, it will have been cleaned @@ -602,6 +625,79 @@ func (b *propBuf) maybeRejectUnsafeProposalLocked( } return false + case p.Request.IsSingleTransferLeaseRequest(): + // When performing a lease transfer, the outgoing leaseholder revokes its + // lease before proposing the lease transfer request, meaning that it + // promises to stop using the previous lease to serve reads or writes. The + // lease transfer request is then proposed and committed to the Raft log, at + // which point the new lease officially becomes active. However, this new + // lease is not usable until the incoming leaseholder applies the Raft entry + // that contains the lease transfer and notices that it is now the + // leaseholder for the range. + // + // The effect of this handoff is that there exists a "power vacuum" time + // period when the outgoing leaseholder has revoked its previous lease but + // the incoming leaseholder has not yet applied its new lease. During this + // time period, a range is effectively unavailable for strong reads and + // writes, because no replica will act as the leaseholder. Instead, requests + // that require the lease will be redirected back and forth between the + // outgoing leaseholder and the incoming leaseholder (the client backs off). + // To minimize the disruption caused by lease transfers, we need to minimize + // this time period. + // + // We assume that if a lease transfer target is sufficiently caught up on + // its log such that it will be able to apply the lease transfer through log + // entry application, then this unavailability window will be acceptable. + // This may be a faulty assumption in cases with severe replication lag, but + // we must balance any heuristic here that attempts to determine "too much + // lag" with the possibility of starvation of lease transfers under + // sustained write load and a resulting sustained replication lag. See + // #38065 and #42379, which removed such a heuristic. For now, we don't try + // to make such a determination.
+ // + // However, we draw a distinction between lease transfer targets that will + // be able to apply the lease transfer through log entry application and + // those that will require a Raft snapshot to catch up and apply the lease + // transfer. Raft snapshots are more expensive than Raft entry replication. + // They are also significantly more likely to be delayed due to queueing + // behind other snapshot traffic in the system. This potential for delay + // makes transferring a lease to a replica that needs a snapshot very risky, + // as doing so has the effect of inducing range unavailability until the + // snapshot completes, which could take seconds, minutes, or hours. + // + // In the future, we will likely get better at prioritizing snapshots to + // improve the responsiveness of snapshots that are needed to recover + // availability. However, even in this world, it is not worth inducing + // unavailability that can only be recovered through a Raft snapshot. It is + // better to catch the desired lease target up on the log first and then + // initiate the lease transfer once its log is connected to the leader's. + // + // For this reason, unless we can guarantee that the lease transfer target + // does not need a Raft snapshot, we don't let it through. This same check + // lives at higher levels in the stack as well (i.e. in the allocator). The + // higher level checks avoid wasted work and respond more gracefully to + // invalid targets (e.g. they pick the next best target). However, this is + // the only place where the protection is airtight against race conditions + // because the check is performed: + // 1. by the current Raft leader, else the proposal will fail + // 2. while holding latches that prevent interleaving log truncation + // + // If an error is returned here, the outgoing leaseholder still won't be able + // to use its revoked lease. However, it will be able to immediately request + // a new lease. This may be disruptive, which is why we try to avoid hitting + // this airtight protection as much as possible by detecting the failure + // scenario before revoking the outgoing lease.
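+ //
+ // (Editor's note, illustration only: the body of raftutil.ReplicaMayNeedSnapshot
+ // is not part of this diff. Judging from the statuses asserted in
+ // TestProposalBufferRejectUnsafeLeaseTransfer later in this patch, the
+ // decision it makes is roughly the following hypothetical sketch:
+ //
+ //   func ReplicaMayNeedSnapshot(
+ //       st *raft.Status, firstIndex uint64, replID roachpb.ReplicaID,
+ //   ) ReplicaNeedsSnapshotStatus {
+ //       if st == nil || st.RaftState != raft.StateLeader {
+ //           // Only the Raft leader tracks follower progress.
+ //           return LocalReplicaNotLeader
+ //       }
+ //       pr, ok := st.Progress[uint64(replID)]
+ //       if !ok {
+ //           // No progress is tracked for the target.
+ //           return ReplicaUnknown
+ //       }
+ //       switch pr.State {
+ //       case tracker.StateProbe:
+ //           return ReplicaStateProbe
+ //       case tracker.StateSnapshot:
+ //           return ReplicaStateSnapshot
+ //       }
+ //       if pr.Match+1 < firstIndex {
+ //           // The next entry the target needs is already truncated away.
+ //           return ReplicaMatchBelowLeadersFirstIndex
+ //       }
+ //       return NoSnapshotNeeded
+ //   }
+ //
+ // The real helper lives in pkg/kv/kvserver/raftutil and is authoritative.)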
+ status := raftGroup.Status() + firstIndex := b.p.firstIndex() + newLease := p.command.ReplicatedEvalResult.State.Lease + newLeaseTarget := newLease.Replica.ReplicaID + snapStatus := raftutil.ReplicaMayNeedSnapshot(&status, firstIndex, newLeaseTarget) + if snapStatus != raftutil.NoSnapshotNeeded && !b.testing.allowLeaseTransfersWhenTargetMayNeedSnapshot { + b.p.rejectProposalWithLeaseTransferRejectedLocked(ctx, p, newLease, snapStatus) + return true + } + return false + default: return false } @@ -619,7 +715,7 @@ func (b *propBuf) leaderStatusRLocked(ctx context.Context, raftGroup proposerRaf !leaderInfo.iAmTheLeader { log.Fatalf(ctx, "inconsistent Raft state: state %s while the current replica is also the lead: %d", - raftGroup.BasicStatus().RaftState, leaderInfo.leader) + raftGroup.Status().RaftState, leaderInfo.leader) } return leaderInfo } @@ -1045,6 +1141,10 @@ func (rp *replicaProposer) destroyed() destroyStatus { return rp.mu.destroyStatus } +func (rp *replicaProposer) firstIndex() uint64 { + return (*Replica)(rp).raftFirstIndexRLocked() +} + func (rp *replicaProposer) leaseAppliedIndex() uint64 { return rp.mu.state.LeaseAppliedIndex } @@ -1084,7 +1184,7 @@ func (rp *replicaProposer) registerProposalLocked(p *ProposalData) { func (rp *replicaProposer) leaderStatusRLocked(raftGroup proposerRaft) rangeLeaderInfo { r := (*Replica)(rp) - status := raftGroup.BasicStatus() + status := raftGroup.Status() iAmTheLeader := status.RaftState == raft.StateLeader leader := status.Lead leaderKnown := leader != raft.None @@ -1130,9 +1230,26 @@ func (rp *replicaProposer) rejectProposalWithRedirectLocked( Replica: redirectRep, } log.VEventf(ctx, 2, "redirecting proposal to node %s; request: %s", redirectRep.NodeID, prop.Request) - r.cleanupFailedProposalLocked(prop) - prop.finishApplication(ctx, proposalResult{ - Err: roachpb.NewError(newNotLeaseHolderError( - speculativeLease, storeID, rangeDesc, "refusing to acquire lease on follower")), - }) + rp.rejectProposalWithErrLocked(ctx, prop, roachpb.NewError(newNotLeaseHolderError( + speculativeLease, storeID, rangeDesc, "refusing to acquire lease on follower"))) +} + +func (rp *replicaProposer) rejectProposalWithLeaseTransferRejectedLocked( + ctx context.Context, + prop *ProposalData, + lease *roachpb.Lease, + reason raftutil.ReplicaNeedsSnapshotStatus, +) { + rp.store.metrics.LeaseTransferErrorCount.Inc(1) + log.VEventf(ctx, 2, "not proposing lease transfer because the target %s may "+ + "need a snapshot: %s", lease.Replica, reason) + err := newLeaseTransferRejectedBecauseTargetMayNeedSnapshotError(lease.Replica, reason) + rp.rejectProposalWithErrLocked(ctx, prop, roachpb.NewError(err)) +} + +func (rp *replicaProposer) rejectProposalWithErrLocked( + ctx context.Context, prop *ProposalData, pErr *roachpb.Error, +) { + (*Replica)(rp).cleanupFailedProposalLocked(prop) + prop.finishApplication(ctx, proposalResult{Err: pErr}) } diff --git a/pkg/kv/kvserver/replica_proposal_buf_test.go b/pkg/kv/kvserver/replica_proposal_buf_test.go index 8e1256528333..08b7efe6879d 100644 --- a/pkg/kv/kvserver/replica_proposal_buf_test.go +++ b/pkg/kv/kvserver/replica_proposal_buf_test.go @@ -12,6 +12,7 @@ package kvserver import ( "context" + "math" "math/rand" "sync" "testing" @@ -21,6 +22,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver/closedts/tracker" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/raftutil" 
"github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/util/hlc" @@ -33,6 +35,7 @@ import ( "github.com/stretchr/testify/require" "go.etcd.io/etcd/raft/v3" "go.etcd.io/etcd/raft/v3/raftpb" + rafttracker "go.etcd.io/etcd/raft/v3/tracker" "golang.org/x/sync/errgroup" ) @@ -41,6 +44,7 @@ type testProposer struct { syncutil.RWMutex clock *hlc.Clock ds destroyStatus + fi uint64 lai uint64 enqueued int registered int @@ -48,9 +52,13 @@ type testProposer struct { // If not nil, this can be a testProposerRaft used to mock the raft group // passed to FlushLockedWithRaftGroup(). raftGroup proposerRaft - // If not nil, this is called by RejectProposalWithRedirectLocked(). If nil, - // RejectProposalWithRedirectLocked() panics. - onRejectProposalWithRedirectLocked func(prop *ProposalData, redirectTo roachpb.ReplicaID) + // If not nil, this is called by rejectProposalWithRedirectLocked(). + // If nil, rejectProposalWithRedirectLocked() panics. + onRejectProposalWithRedirectLocked func(redirectTo roachpb.ReplicaID) + // If not nil, this is called by rejectProposalWithLeaseTransferRejectedLocked(). + // If nil, rejectProposalWithLeaseTransferRejectedLocked() panics. + onRejectProposalWithLeaseTransferRejectedLocked func( + lease *roachpb.Lease, reason raftutil.ReplicaNeedsSnapshotStatus) // ownsValidLease is returned by ownsValidLeaseRLocked() ownsValidLease bool @@ -71,7 +79,7 @@ var _ proposer = &testProposer{} // be used as the Raft group used by a testProposer, and it records the commands // being proposed. type testProposerRaft struct { - status raft.BasicStatus + status raft.Status // proposals are the commands that the propBuf flushed (i.e. passed to the // Raft group) and have not yet been consumed with consumeProposals(). 
proposals []kvserverpb.RaftCommand @@ -101,7 +109,7 @@ func (t *testProposerRaft) consumeProposals() []kvserverpb.RaftCommand { return res } -func (t testProposerRaft) BasicStatus() raft.BasicStatus { +func (t testProposerRaft) Status() raft.Status { return t.status } @@ -126,6 +134,10 @@ func (t *testProposer) destroyed() destroyStatus { return t.ds } +func (t *testProposer) firstIndex() uint64 { + return t.fi +} + func (t *testProposer) leaseAppliedIndex() uint64 { return t.lai } @@ -166,11 +178,12 @@ func (t *testProposer) ownsValidLeaseRLocked(ctx context.Context, now hlc.ClockT } func (t *testProposer) leaderStatusRLocked(raftGroup proposerRaft) rangeLeaderInfo { - leaderKnown := raftGroup.BasicStatus().Lead != raft.None + lead := raftGroup.Status().Lead + leaderKnown := lead != raft.None var leaderRep roachpb.ReplicaID var iAmTheLeader, leaderEligibleForLease bool if leaderKnown { - leaderRep = roachpb.ReplicaID(raftGroup.BasicStatus().Lead) + leaderRep = roachpb.ReplicaID(lead) iAmTheLeader = leaderRep == t.getReplicaID() repDesc := roachpb.ReplicaDescriptor{ ReplicaID: leaderRep, @@ -199,12 +212,24 @@ func (t *testProposer) leaderStatusRLocked(raftGroup proposerRaft) rangeLeaderIn } func (t *testProposer) rejectProposalWithRedirectLocked( - ctx context.Context, prop *ProposalData, redirectTo roachpb.ReplicaID, + _ context.Context, _ *ProposalData, redirectTo roachpb.ReplicaID, ) { if t.onRejectProposalWithRedirectLocked == nil { panic("unexpected rejectProposalWithRedirectLocked() call") } - t.onRejectProposalWithRedirectLocked(prop, redirectTo) + t.onRejectProposalWithRedirectLocked(redirectTo) +} + +func (t *testProposer) rejectProposalWithLeaseTransferRejectedLocked( + _ context.Context, + _ *ProposalData, + lease *roachpb.Lease, + reason raftutil.ReplicaNeedsSnapshotStatus, +) { + if t.onRejectProposalWithLeaseTransferRejectedLocked == nil { + panic("unexpected rejectProposalWithLeaseTransferRejectedLocked() call") + } + t.onRejectProposalWithLeaseTransferRejectedLocked(lease, reason) } // proposalCreator holds on to a lease and creates proposals using it. 
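// (Editor's note: as the next hunk shows, newLeaseRequestProposal and
// newLeaseTransferProposal build single-request RequestLease and TransferLease
// batches, and newProposal now derives both the lease and the IsLeaseRequest
// flag from the first request in the batch.)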
@@ -219,26 +244,35 @@ func (pc proposalCreator) newPutProposal(ts hlc.Timestamp) *ProposalData { return pc.newProposal(ba) } -func (pc proposalCreator) newLeaseProposal(lease roachpb.Lease) *ProposalData { +func (pc proposalCreator) newLeaseRequestProposal(lease roachpb.Lease) *ProposalData { var ba roachpb.BatchRequest - ba.Add(&roachpb.RequestLeaseRequest{Lease: lease}) - prop := pc.newProposal(ba) - prop.command.ReplicatedEvalResult.IsLeaseRequest = true - return prop + ba.Add(&roachpb.RequestLeaseRequest{Lease: lease, PrevLease: pc.lease.Lease}) + return pc.newProposal(ba) +} + +func (pc proposalCreator) newLeaseTransferProposal(lease roachpb.Lease) *ProposalData { + var ba roachpb.BatchRequest + ba.Add(&roachpb.TransferLeaseRequest{Lease: lease, PrevLease: pc.lease.Lease}) + return pc.newProposal(ba) } func (pc proposalCreator) newProposal(ba roachpb.BatchRequest) *ProposalData { var lease *roachpb.Lease - r, ok := ba.GetArg(roachpb.RequestLease) - if ok { - lease = &r.(*roachpb.RequestLeaseRequest).Lease + var isLeaseRequest bool + switch v := ba.Requests[0].GetInner().(type) { + case *roachpb.RequestLeaseRequest: + lease = &v.Lease + isLeaseRequest = true + case *roachpb.TransferLeaseRequest: + lease = &v.Lease } p := &ProposalData{ ctx: context.Background(), idKey: kvserverbase.CmdIDKey("test-cmd"), command: &kvserverpb.RaftCommand{ ReplicatedEvalResult: kvserverpb.ReplicatedEvalResult{ - State: &kvserverpb.ReplicaState{Lease: lease}, + IsLeaseRequest: isLeaseRequest, + State: &kvserverpb.ReplicaState{Lease: lease}, }, }, Request: &ba, @@ -282,7 +316,7 @@ func TestProposalBuffer(t *testing.T) { leaseReq := i == leaseReqIdx var pd *ProposalData if leaseReq { - pd = pc.newLeaseProposal(roachpb.Lease{}) + pd = pc.newLeaseRequestProposal(roachpb.Lease{}) } else { pd = pc.newPutProposal(hlc.Timestamp{}) } @@ -528,25 +562,22 @@ func TestProposalBufferRejectLeaseAcqOnFollower(t *testing.T) { var rejected roachpb.ReplicaID if tc.expRejection { - p.onRejectProposalWithRedirectLocked = func(_ *ProposalData, redirectTo roachpb.ReplicaID) { + p.onRejectProposalWithRedirectLocked = func(redirectTo roachpb.ReplicaID) { if rejected != 0 { t.Fatalf("unexpected 2nd rejection") } rejected = redirectTo } } else { - p.onRejectProposalWithRedirectLocked = func(_ *ProposalData, _ roachpb.ReplicaID) { + p.onRejectProposalWithRedirectLocked = func(_ roachpb.ReplicaID) { t.Fatalf("unexpected redirection") } } - raftStatus := raft.BasicStatus{ - ID: self, - SoftState: raft.SoftState{ - RaftState: tc.state, - Lead: tc.leader, - }, - } + var raftStatus raft.Status + raftStatus.ID = self + raftStatus.RaftState = tc.state + raftStatus.Lead = tc.leader r := &testProposerRaft{ status: raftStatus, } @@ -560,7 +591,7 @@ func TestProposalBufferRejectLeaseAcqOnFollower(t *testing.T) { tracker := tracker.NewLockfreeTracker() b.Init(&p, tracker, clock, cluster.MakeTestingClusterSettings()) - pd := pc.newLeaseProposal(roachpb.Lease{}) + pd := pc.newLeaseRequestProposal(roachpb.Lease{}) _, tok := b.TrackEvaluatingRequest(ctx, hlc.MinTimestamp) err := b.Insert(ctx, pd, tok.Move(ctx)) require.NoError(t, err) @@ -575,6 +606,157 @@ func TestProposalBufferRejectLeaseAcqOnFollower(t *testing.T) { } } +// Test that the proposal buffer rejects lease transfer proposals to replicas +// that it deems would be unsafe. 
+func TestProposalBufferRejectUnsafeLeaseTransfer(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + ctx := context.Background() + + proposer := uint64(1) + proposerFirstIndex := uint64(5) + target := uint64(2) + + // Each subtest will try to propose a lease transfer in a different Raft + // scenario. Some proposals should be allowed, some should be rejected. + for _, tc := range []struct { + name string + proposerState raft.StateType + // math.MaxUint64 if the target is not in the raft group. + targetState rafttracker.StateType + targetMatch uint64 + + expRejection bool + expRejectionReason raftutil.ReplicaNeedsSnapshotStatus + }{ + { + name: "follower", + proposerState: raft.StateFollower, + expRejection: true, + expRejectionReason: raftutil.LocalReplicaNotLeader, + }, + { + name: "candidate", + proposerState: raft.StateCandidate, + expRejection: true, + expRejectionReason: raftutil.LocalReplicaNotLeader, + }, + { + name: "leader, no progress for target", + proposerState: raft.StateLeader, + targetState: math.MaxUint64, + expRejection: true, + expRejectionReason: raftutil.ReplicaUnknown, + }, + { + name: "leader, target state probe", + proposerState: raft.StateLeader, + targetState: rafttracker.StateProbe, + expRejection: true, + expRejectionReason: raftutil.ReplicaStateProbe, + }, + { + name: "leader, target state snapshot", + proposerState: raft.StateLeader, + targetState: rafttracker.StateSnapshot, + expRejection: true, + expRejectionReason: raftutil.ReplicaStateSnapshot, + }, + { + name: "leader, target state replicate, match+1 < firstIndex", + proposerState: raft.StateLeader, + targetState: rafttracker.StateReplicate, + targetMatch: proposerFirstIndex - 2, + expRejection: true, + expRejectionReason: raftutil.ReplicaMatchBelowLeadersFirstIndex, + }, + { + name: "leader, target state replicate, match+1 == firstIndex", + proposerState: raft.StateLeader, + targetState: rafttracker.StateReplicate, + targetMatch: proposerFirstIndex - 1, + expRejection: false, + }, + { + name: "leader, target state replicate, match+1 > firstIndex", + proposerState: raft.StateLeader, + targetState: rafttracker.StateReplicate, + targetMatch: proposerFirstIndex, + expRejection: false, + }, + } { + t.Run(tc.name, func(t *testing.T) { + var p testProposer + var pc proposalCreator + require.Equal(t, proposer, uint64(p.getReplicaID())) + + var rejectedLease *roachpb.Lease + var rejectedReason raftutil.ReplicaNeedsSnapshotStatus + if tc.expRejection { + p.onRejectProposalWithLeaseTransferRejectedLocked = func(lease *roachpb.Lease, reason raftutil.ReplicaNeedsSnapshotStatus) { + if rejectedLease != nil { + t.Fatalf("unexpected 2nd rejection") + } + rejectedLease = lease + rejectedReason = reason + } + } else { + p.onRejectProposalWithLeaseTransferRejectedLocked = func(lease *roachpb.Lease, reason raftutil.ReplicaNeedsSnapshotStatus) { + t.Fatalf("unexpected rejection") + } + } + + var raftStatus raft.Status + raftStatus.ID = proposer + raftStatus.RaftState = tc.proposerState + if tc.proposerState == raft.StateLeader { + raftStatus.Lead = proposer + raftStatus.Progress = map[uint64]rafttracker.Progress{ + proposer: {State: rafttracker.StateReplicate, Match: proposerFirstIndex}, + } + if tc.targetState != math.MaxUint64 { + raftStatus.Progress[target] = rafttracker.Progress{ + State: tc.targetState, Match: tc.targetMatch, + } + } + } + r := &testProposerRaft{ + status: raftStatus, + } + p.raftGroup = r + p.fi = proposerFirstIndex + + var b propBuf + clock := 
hlc.NewClockWithSystemTimeSource(time.Nanosecond /* maxOffset */) + tracker := tracker.NewLockfreeTracker() + b.Init(&p, tracker, clock, cluster.MakeTestingClusterSettings()) + + nextLease := roachpb.Lease{ + Start: clock.NowAsClockTimestamp(), + Sequence: pc.lease.Lease.Sequence + 1, + Replica: roachpb.ReplicaDescriptor{ + ReplicaID: roachpb.ReplicaID(target), + }, + } + pd := pc.newLeaseTransferProposal(nextLease) + + _, tok := b.TrackEvaluatingRequest(ctx, hlc.MinTimestamp) + err := b.Insert(ctx, pd, tok.Move(ctx)) + require.NoError(t, err) + require.NoError(t, b.flushLocked(ctx)) + if tc.expRejection { + require.NotNil(t, rejectedLease) + require.Equal(t, nextLease, *rejectedLease) + require.Equal(t, tc.expRejectionReason, rejectedReason) + } else { + require.Nil(t, rejectedLease) + require.Zero(t, rejectedReason) + } + require.Zero(t, tracker.Count()) + }) + } +} + // Test that the propBuf properly assigns closed timestamps to proposals being // flushed out of it. Each subtest proposes one command and checks for the // expected closed timestamp being written to the proposal by the propBuf. @@ -622,6 +804,7 @@ func TestProposalBufferClosedTimestamp(t *testing.T) { Start: hlc.ClockTimestamp{}, // Expiration is filled by each test. Expiration: nil, + Replica: roachpb.ReplicaDescriptor{ReplicaID: 1}, } const ( @@ -743,6 +926,7 @@ func TestProposalBufferClosedTimestamp(t *testing.T) { lease: roachpb.Lease{ Sequence: curLease.Sequence + 1, Start: now, + Replica: curLease.Replica, }, trackerLowerBound: hlc.Timestamp{}, leaseExp: hlc.MaxTimestamp, @@ -765,6 +949,10 @@ func TestProposalBufferClosedTimestamp(t *testing.T) { } { t.Run(tc.name, func(t *testing.T) { r := &testProposerRaft{} + r.status.RaftState = raft.StateLeader + r.status.Progress = map[uint64]rafttracker.Progress{ + 1: {State: rafttracker.StateReplicate}, + } p := testProposer{ clock: clock, lai: 10, @@ -786,17 +974,9 @@ func TestProposalBufferClosedTimestamp(t *testing.T) { case regularWrite: pd = pc.newPutProposal(now.ToTimestamp()) case newLease: - pd = pc.newLeaseProposal(tc.lease) + pd = pc.newLeaseRequestProposal(tc.lease) case leaseTransfer: - var ba roachpb.BatchRequest - ba.Add(&roachpb.TransferLeaseRequest{ - Lease: roachpb.Lease{ - Start: now, - Sequence: pc.lease.Lease.Sequence + 1, - }, - PrevLease: pc.lease.Lease, - }) - pd = pc.newProposal(ba) + pd = pc.newLeaseTransferProposal(tc.lease) default: t.Fatalf("unknown req type %d", tc.reqType) } diff --git a/pkg/kv/kvserver/replica_range_lease.go b/pkg/kv/kvserver/replica_range_lease.go index da8ff1ac42c9..c57dfeac6fb4 100644 --- a/pkg/kv/kvserver/replica_range_lease.go +++ b/pkg/kv/kvserver/replica_range_lease.go @@ -52,10 +52,12 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver/constraint" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/raftutil" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/util/contextutil" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/retry" "github.com/cockroachdb/cockroach/pkg/util/stop" "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/cockroach/pkg/util/tracing" @@ -890,18 +892,63 @@ func (r *Replica) AdminTransferLease(ctx context.Context, target roachpb.StoreID "another transfer to a different store is in progress") } + // Verify that the lease 
transfer would be safe. This check is best-effort + // in that it can race with Raft leadership changes and log truncation. See + // propBuf.maybeRejectUnsafeProposalLocked for a non-racy version of this + // check, along with a full explanation of why it is important. We include + // both because rejecting a lease transfer in the propBuf after we have + // revoked our current lease is more disruptive than doing so here, before + // we have revoked our current lease. + raftStatus := r.raftStatusRLocked() + raftFirstIndex := r.raftFirstIndexRLocked() + snapStatus := raftutil.ReplicaMayNeedSnapshot(raftStatus, raftFirstIndex, nextLeaseHolder.ReplicaID) + if snapStatus != raftutil.NoSnapshotNeeded && !r.store.TestingKnobs().AllowLeaseTransfersWhenTargetMayNeedSnapshot { + r.store.metrics.LeaseTransferErrorCount.Inc(1) + log.VEventf(ctx, 2, "not initiating lease transfer because the target %s may "+ + "need a snapshot: %s", nextLeaseHolder, snapStatus) + err := newLeaseTransferRejectedBecauseTargetMayNeedSnapshotError(nextLeaseHolder, snapStatus) + return nil, nil, err + } + transfer = r.mu.pendingLeaseRequest.InitOrJoinRequest( ctx, nextLeaseHolder, status, desc.StartKey.AsRawKey(), true, /* transfer */ ) return nil, transfer, nil } + // Before transferring a lease, we ensure that the lease transfer is safe. If + // the leaseholder cannot guarantee this, we reject the lease transfer. To + // make such a claim, the leaseholder needs to become the Raft leader and + // probe the lease target's log. Doing so may take time, so we use a small + // exponential backoff loop with a maximum retry count before returning the + // rejection to the client. As configured, this retry loop should back off + // for about 6 seconds (50ms + 100ms + 200ms + 400ms + 800ms, then 1s for each + // of the remaining five retries) before returning an error. + retryOpts := retry.Options{ + InitialBackoff: 50 * time.Millisecond, + MaxBackoff: 1 * time.Second, + Multiplier: 2, + MaxRetries: 10, + } + if count := r.store.TestingKnobs().LeaseTransferRejectedRetryLoopCount; count != 0 { + retryOpts.MaxRetries = count + } + transferRejectedRetry := retry.StartWithCtx(ctx, retryOpts) + transferRejectedRetry.Next() // The first call to Next does not block. + // Loop while there's an extension in progress. for { // See if there's an extension in progress that we have to wait for. // If there isn't, request a transfer. extension, transfer, err := initTransferHelper() if err != nil { + if IsLeaseTransferRejectedBecauseTargetMayNeedSnapshotError(err) && transferRejectedRetry.Next() { + // If the lease transfer was rejected because the target may need a + // snapshot, try again. After the backoff, we may have become the Raft + // leader (through maybeTransferRaftLeadershipToLeaseholderLocked) or + // may have learned more about the state of the lease target's log. + log.VEventf(ctx, 2, "retrying lease transfer to store %d after rejection", target) + continue + } return err } if extension == nil { @@ -989,6 +1036,17 @@ func newNotLeaseHolderError( return err } +// newLeaseTransferRejectedBecauseTargetMayNeedSnapshotError returns an error +// indicating that a lease transfer failed because the current leaseholder could +// not prove that the lease transfer target did not need a Raft snapshot.
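+// The returned error is marked with errors.Mark so that
+// IsLeaseTransferRejectedBecauseTargetMayNeedSnapshotError in markers.go can
+// detect it with errors.Is even after wrapping.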
+func newLeaseTransferRejectedBecauseTargetMayNeedSnapshotError( + target roachpb.ReplicaDescriptor, snapStatus raftutil.ReplicaNeedsSnapshotStatus, +) error { + err := errors.Errorf("refusing to transfer lease to %d because target may need a Raft snapshot: %s", + target, snapStatus) + return errors.Mark(err, errMarkLeaseTransferRejectedBecauseTargetMayNeedSnapshot) +} + // checkRequestTimeRLocked checks that the provided request timestamp is not // too far in the future. We define "too far" as a time that would require a // lease extension even if we were perfectly proactive about extending our diff --git a/pkg/kv/kvserver/replica_test.go b/pkg/kv/kvserver/replica_test.go index 371e6463a3ea..afc00a7e7227 100644 --- a/pkg/kv/kvserver/replica_test.go +++ b/pkg/kv/kvserver/replica_test.go @@ -616,6 +616,9 @@ func TestBehaviorDuringLeaseTransfer(t *testing.T) { tsc := TestStoreConfig(clock) var leaseAcquisitionTrap atomic.Value tsc.TestingKnobs.DisableAutomaticLeaseRenewal = true + // We're transferring the lease to a bogus replica, so disable the protection + // that would otherwise notice this and reject the lease transfer. + tsc.TestingKnobs.AllowLeaseTransfersWhenTargetMayNeedSnapshot = true tsc.TestingKnobs.LeaseRequestEvent = func(ts hlc.Timestamp, _ roachpb.StoreID, _ roachpb.RangeID) *roachpb.Error { val := leaseAcquisitionTrap.Load() if val == nil { diff --git a/pkg/kv/kvserver/testing_knobs.go b/pkg/kv/kvserver/testing_knobs.go index c18e374c819f..ff36aff7d8c2 100644 --- a/pkg/kv/kvserver/testing_knobs.go +++ b/pkg/kv/kvserver/testing_knobs.go @@ -340,6 +340,15 @@ type StoreTestingKnobs struct { // heartbeats and then expect other replicas to take the lease without // worrying about Raft). AllowLeaseRequestProposalsWhenNotLeader bool + // AllowLeaseTransfersWhenTargetMayNeedSnapshot, if set, makes the Replica + // and proposal buffer allow lease transfer proposals even when the proposer + // cannot prove that the lease transfer target does not need a Raft snapshot. + AllowLeaseTransfersWhenTargetMayNeedSnapshot bool + // LeaseTransferRejectedRetryLoopCount, if set, configures the maximum number + // of retries for the retry loop used during lease transfers. This retry loop + // retries after transfer attempts are rejected because the transfer is deemed + // to be unsafe. + LeaseTransferRejectedRetryLoopCount int // DontCloseTimestamps inhibits the propBuf's closing of timestamps. All Raft // commands will carry an empty closed timestamp. DontCloseTimestamps bool diff --git a/pkg/testutils/serverutils/test_cluster_shim.go b/pkg/testutils/serverutils/test_cluster_shim.go index 2842a031a800..991eca0d3ceb 100644 --- a/pkg/testutils/serverutils/test_cluster_shim.go +++ b/pkg/testutils/serverutils/test_cluster_shim.go @@ -114,7 +114,7 @@ type TestClusterInterface interface { // RebalanceVoterOrFatal rebalances a voting replica from src to dest but will // fatal if it fails. RebalanceVoterOrFatal( - ctx context.Context, t *testing.T, startKey roachpb.Key, src, dest roachpb.ReplicationTarget, + ctx context.Context, t testing.TB, startKey roachpb.Key, src, dest roachpb.ReplicationTarget, ) *roachpb.RangeDescriptor // SwapVoterWithNonVoter atomically "swaps" the voting replica located on @@ -129,7 +129,7 @@ type TestClusterInterface interface { // SwapVoterWithNonVoterOrFatal is the same as SwapVoterWithNonVoter but will // fatal if it fails.
SwapVoterWithNonVoterOrFatal( - t *testing.T, startKey roachpb.Key, voterTarget, nonVoterTarget roachpb.ReplicationTarget, + t testing.TB, startKey roachpb.Key, voterTarget, nonVoterTarget roachpb.ReplicationTarget, ) *roachpb.RangeDescriptor // FindRangeLeaseHolder returns the current lease holder for the given range. @@ -158,6 +158,12 @@ type TestClusterInterface interface { rangeDesc roachpb.RangeDescriptor, dest roachpb.ReplicationTarget, ) error + // TransferRangeLeaseOrFatal is the same as TransferRangeLease but will fatal + // if it fails. + TransferRangeLeaseOrFatal( + t testing.TB, rangeDesc roachpb.RangeDescriptor, dest roachpb.ReplicationTarget, + ) + // MoveRangeLeaseNonCooperatively performs a non-cooperative transfer of the // lease for a range from whoever has it to a particular store. That store // must already have a replica of the range. If that replica already has the diff --git a/pkg/testutils/testcluster/testcluster.go b/pkg/testutils/testcluster/testcluster.go index cb7a2367aa34..73d5fa8628b6 100644 --- a/pkg/testutils/testcluster/testcluster.go +++ b/pkg/testutils/testcluster/testcluster.go @@ -975,7 +975,7 @@ func (tc *TestCluster) SwapVoterWithNonVoter( // SwapVoterWithNonVoterOrFatal is part of TestClusterInterface. func (tc *TestCluster) SwapVoterWithNonVoterOrFatal( - t *testing.T, startKey roachpb.Key, voterTarget, nonVoterTarget roachpb.ReplicationTarget, + t testing.TB, startKey roachpb.Key, voterTarget, nonVoterTarget roachpb.ReplicationTarget, ) *roachpb.RangeDescriptor { afterDesc, err := tc.SwapVoterWithNonVoter(startKey, voterTarget, nonVoterTarget) @@ -1011,7 +1011,7 @@ func (tc *TestCluster) RebalanceVoter( // RebalanceVoterOrFatal is part of TestClusterInterface. func (tc *TestCluster) RebalanceVoterOrFatal( - ctx context.Context, t *testing.T, startKey roachpb.Key, src, dest roachpb.ReplicationTarget, + ctx context.Context, t testing.TB, startKey roachpb.Key, src, dest roachpb.ReplicationTarget, ) *roachpb.RangeDescriptor { afterDesc, err := tc.RebalanceVoter(ctx, startKey, src, dest) if err != nil {
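Usage note (editor's addition, not part of the patch): callers are expected to detect the new rejection via the exported predicate and decide whether to retry, exactly as the new test does. A minimal caller-side sketch in Go, where tc is a *testcluster.TestCluster and desc/target are placeholder values like those in the test above:

	// Attempt a lease transfer, tolerating rejections that indicate the
	// target may still need a Raft snapshot to catch up.
	testutils.SucceedsSoon(t, func() error {
		if err := tc.TransferRangeLease(desc, target); err != nil {
			if kvserver.IsLeaseTransferRejectedBecauseTargetMayNeedSnapshotError(err) {
				// Expected while the target is behind on its log; retry.
				return err
			}
			t.Fatal(err) // any other error is a real test failure
		}
		return nil
	})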