diff --git a/pkg/kv/kvserver/client_lease_test.go b/pkg/kv/kvserver/client_lease_test.go
index f0053a21d3d7..5dabf0f03395 100644
--- a/pkg/kv/kvserver/client_lease_test.go
+++ b/pkg/kv/kvserver/client_lease_test.go
@@ -35,6 +35,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/settings/cluster"
 	"github.com/cockroachdb/cockroach/pkg/sql/catalog/bootstrap"
 	"github.com/cockroachdb/cockroach/pkg/testutils"
+	"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
 	"github.com/cockroachdb/cockroach/pkg/testutils/skip"
 	"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
 	"github.com/cockroachdb/cockroach/pkg/util/hlc"
@@ -1298,3 +1299,64 @@ func TestAcquireLeaseTimeout(t *testing.T) {
 		require.Empty(t, nlhe.Lease)
 	}
 }
+
+func TestLeaseTransfersUseExpirationLeasesAndBumpToEpochBasedOnes(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	defer log.Scope(t).Close(t)
+
+	ctx := context.Background()
+
+	manualClock := hlc.NewHybridManualClock()
+	tci := serverutils.StartNewTestCluster(t, 2, base.TestClusterArgs{
+		ReplicationMode: base.ReplicationManual,
+		ServerArgs: base.TestServerArgs{
+			Knobs: base.TestingKnobs{
+				Server: &server.TestingKnobs{
+					WallClock: manualClock,
+				},
+				Store: &kvserver.StoreTestingKnobs{
+					LeaseRenewalDurationOverride: 10 * time.Millisecond, // speed up the test
+				},
+			},
+		},
+	})
+	tc := tci.(*testcluster.TestCluster)
+	defer tc.Stopper().Stop(ctx)
+
+	scratchKey := tc.ScratchRange(t)
+	// Add a replica; we're going to move the lease to it below.
+	desc := tc.AddVotersOrFatal(t, scratchKey, tc.Target(1))
+
+	n2 := tc.Server(1)
+	n2Target := tc.Target(1)
+
+	// Transfer the lease from n1 to n2. Expect it to be transferred as an
+	// expiration based lease.
+	tc.TransferRangeLeaseOrFatal(t, desc, n2Target)
+	testutils.SucceedsSoon(t, func() error {
+		li, _, err := tc.FindRangeLeaseEx(ctx, desc, nil)
+		require.NoError(t, err)
+		if !li.Current().OwnedBy(n2.GetFirstStoreID()) {
+			return errors.New("lease still owned by n1")
+		}
+		require.Equal(t, roachpb.LeaseExpiration, li.Current().Type())
+		return nil
+	})
+
+	// Run up the clock to force a lease renewal (and thus the change in lease
+	// types).
+	manualClock.Increment(
+		tc.GetFirstStoreFromServer(t, 1).GetStoreConfig().RangeLeaseRenewalDuration().Nanoseconds() +
+			time.Second.Nanoseconds(),
+	)
+
+	testutils.SucceedsSoon(t, func() error {
+		li, _, err := tc.FindRangeLeaseEx(ctx, desc, nil)
+		require.NoError(t, err)
+		if li.Current().Type() != roachpb.LeaseEpoch {
+			return errors.Errorf("lease still an expiration based lease")
+		}
+		require.Equal(t, int64(1), li.Current().Epoch)
+		return nil
+	})
+}
diff --git a/pkg/kv/kvserver/client_split_test.go b/pkg/kv/kvserver/client_split_test.go
index 3c11c033b0c4..0569232976b6 100644
--- a/pkg/kv/kvserver/client_split_test.go
+++ b/pkg/kv/kvserver/client_split_test.go
@@ -2749,10 +2749,13 @@ func TestStoreCapacityAfterSplit(t *testing.T) {
 
 	// Increment the manual clock and do a write to increase the qps above zero.
 	manualClock.Increment(int64(replicastats.MinStatsDuration))
-	pArgs := incrementArgs(key, 10)
-	if _, pErr := kv.SendWrapped(ctx, s.TestSender(), pArgs); pErr != nil {
-		t.Fatal(pErr)
-	}
+	testutils.SucceedsSoon(t, func() error {
+		pArgs := incrementArgs(key, 10)
+		if _, pErr := kv.SendWrapped(ctx, s.TestSender(), pArgs); pErr != nil {
+			return errors.Errorf("failed to increment data: %s", pErr)
+		}
+		return nil
+	})
 	// We want to make sure we can read the value through raft, so we know
 	// the stats are updated.
 	testutils.SucceedsSoon(t, func() error {
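The two SucceedsSoon closures in the new lease-transfer test perform the same lookup (find the lease, check owner and type). If more tests grow this pattern, it could be folded into a small helper. The sketch below is hypothetical (the name, signature, and placement are not part of this diff) and assumes only the testcluster APIs the test already uses:

```go
package kvserver_test

import (
	"context"
	"testing"

	"github.com/cockroachdb/cockroach/pkg/roachpb"
	"github.com/cockroachdb/cockroach/pkg/testutils"
	"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
	"github.com/cockroachdb/errors"
)

// waitForLease polls until the range described by desc has a lease owned by
// storeID and of the given type. It wraps the lookup the new test performs in
// both of its SucceedsSoon closures.
func waitForLease(
	ctx context.Context,
	t *testing.T,
	tc *testcluster.TestCluster,
	desc roachpb.RangeDescriptor,
	storeID roachpb.StoreID,
	typ roachpb.LeaseType,
) {
	t.Helper()
	testutils.SucceedsSoon(t, func() error {
		li, _, err := tc.FindRangeLeaseEx(ctx, desc, nil)
		if err != nil {
			return err
		}
		if !li.Current().OwnedBy(storeID) {
			return errors.Errorf("lease not yet owned by s%d", storeID)
		}
		if lt := li.Current().Type(); lt != typ {
			return errors.Errorf("lease type is %v, want %v", lt, typ)
		}
		return nil
	})
}
```

With such a helper, the first wait above would read `waitForLease(ctx, t, tc, desc, n2.GetFirstStoreID(), roachpb.LeaseExpiration)`; the second wait would additionally assert the epoch.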
diff --git a/pkg/kv/kvserver/replica_proposal.go b/pkg/kv/kvserver/replica_proposal.go
index a97e4f96cbee..7ab9b1f74ffc 100644
--- a/pkg/kv/kvserver/replica_proposal.go
+++ b/pkg/kv/kvserver/replica_proposal.go
@@ -29,6 +29,8 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/storage"
 	"github.com/cockroachdb/cockroach/pkg/storage/enginepb"
 	"github.com/cockroachdb/cockroach/pkg/util"
+	"github.com/cockroachdb/cockroach/pkg/util/buildutil"
+	"github.com/cockroachdb/cockroach/pkg/util/hlc"
 	"github.com/cockroachdb/cockroach/pkg/util/humanizeutil"
 	"github.com/cockroachdb/cockroach/pkg/util/log"
 	"github.com/cockroachdb/cockroach/pkg/util/quotapool"
@@ -344,7 +346,8 @@ func (r *Replica) leasePostApplyLocked(
 	// lease but not the updated merge or timestamp cache state, which can result
 	// in serializability violations.
 	r.mu.state.Lease = newLease
-	expirationBasedLease := r.requiresExpiringLeaseRLocked()
+	requiresExpirationBasedLease := r.requiresExpiringLeaseRLocked()
+	hasExpirationBasedLease := newLease.Type() == roachpb.LeaseExpiration
 
 	// Gossip the first range whenever its lease is acquired. We check to make
 	// sure the lease is active so that a trailing replica won't process an old
@@ -354,10 +357,10 @@ func (r *Replica) leasePostApplyLocked(
 		r.gossipFirstRangeLocked(ctx)
 	}
 
-	// Whenever we first acquire an expiration-based lease, notify the lease
-	// renewer worker that we want it to keep proactively renewing the lease
-	// before it expires.
-	if leaseChangingHands && iAmTheLeaseHolder && expirationBasedLease && r.ownsValidLeaseRLocked(ctx, now) {
+	// Whenever we first acquire an expiration-based lease for a range that
+	// requires it, notify the lease renewer worker that we want it to keep
+	// proactively renewing the lease before it expires.
+	if leaseChangingHands && iAmTheLeaseHolder && requiresExpirationBasedLease && r.ownsValidLeaseRLocked(ctx, now) {
 		r.store.renewableLeases.Store(int64(r.RangeID), unsafe.Pointer(r))
 		select {
 		case r.store.renewableLeasesSignal <- struct{}{}:
@@ -365,6 +368,21 @@ func (r *Replica) leasePostApplyLocked(
 		}
 	}
 
+	if hasExpirationBasedLease && !requiresExpirationBasedLease && r.ownsValidLeaseRLocked(ctx, now) {
+		if buildutil.CrdbTestBuild && !iAmTheLeaseHolder {
+			log.Fatalf(ctx, "replica owns valid lease but is not a leaseholder")
+		}
+		// We received an expiration lease for a range that doesn't require it,
+		// i.e. comes after the liveness keyspan. We've also applied it before
+		// it has expired. Upgrade this lease to the more efficient epoch-based
+		// one.
+		if log.V(1) {
+			log.VEventf(ctx, 1, "upgrading expiration lease %s to an epoch-based one", newLease)
+		}
+		st := r.leaseStatusForRequestRLocked(ctx, now, hlc.Timestamp{})
+		r.maybeExtendLeaseAsyncLocked(ctx, st)
+	}
+
 	// If we're the current raft leader, may want to transfer the leadership to
 	// the new leaseholder. Note that this condition is also checked periodically
 	// when ticking the replica.
diff --git a/pkg/kv/kvserver/replica_raft.go b/pkg/kv/kvserver/replica_raft.go
index fc14c073a265..2d49ae205d21 100644
--- a/pkg/kv/kvserver/replica_raft.go
+++ b/pkg/kv/kvserver/replica_raft.go
@@ -1878,8 +1878,8 @@ func shouldCampaignOnWake(
 	if raftStatus.Lead == raft.None {
 		return true
 	}
-	// Avoid a circular dependency on liveness and skip the is leader alive check for
-	// expiration based leases.
+	// Avoid a circular dependency on liveness and skip the is leader alive
+	// check for ranges that always use expiration based leases.
 	if requiresExpiringLease {
 		return false
 	}
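For clarity, the post-apply behavior touched in leasePostApplyLocked reduces to a small decision over a few booleans. The standalone sketch below restates it with illustrative names; it is not code from this diff:

```go
// Restatement of the two post-apply branches above: ranges that require
// expiration based leases keep feeding the lease renewer worker, while other
// ranges that just received an expiration lease (e.g. via a transfer) upgrade
// it to an epoch based one.
package main

import "fmt"

type postApplyAction string

const (
	registerWithRenewer postApplyAction = "register with lease renewer"
	upgradeToEpoch      postApplyAction = "upgrade to epoch based lease"
	noAction            postApplyAction = "nothing"
)

func postApplyLeaseAction(
	leaseChangingHands, iAmTheLeaseHolder, ownsValidLease bool,
	requiresExpirationLease, hasExpirationLease bool,
) postApplyAction {
	if leaseChangingHands && iAmTheLeaseHolder && requiresExpirationLease && ownsValidLease {
		return registerWithRenewer
	}
	if hasExpirationLease && !requiresExpirationLease && ownsValidLease {
		return upgradeToEpoch
	}
	return noAction
}

func main() {
	// A user range that just received a transferred (expiration based) lease.
	fmt.Println(postApplyLeaseAction(true, true, true, false, true)) // upgrade to epoch based lease
	// A liveness range acquiring its (always expiration based) lease.
	fmt.Println(postApplyLeaseAction(true, true, true, true, true)) // register with lease renewer
}
```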
diff --git a/pkg/kv/kvserver/replica_range_lease.go b/pkg/kv/kvserver/replica_range_lease.go
index 6fef0e76b1b0..6f9b16bc0e4c 100644
--- a/pkg/kv/kvserver/replica_range_lease.go
+++ b/pkg/kv/kvserver/replica_range_lease.go
@@ -247,7 +247,20 @@ func (p *pendingLeaseRequest) InitOrJoinRequest(
 		ProposedTS: &status.Now,
 	}
 
-	if p.repl.requiresExpiringLeaseRLocked() {
+	if p.repl.requiresExpiringLeaseRLocked() || transfer {
+		// In addition to ranges that unconditionally require expiration-based
+		// leases (node liveness and earlier), we also use them during lease
+		// transfers for all other ranges. After acquiring these expiration
+		// based leases, the leaseholders are expected to upgrade them to the
+		// more efficient epoch-based ones. But by transferring an
+		// expiration-based lease, we can limit the effect of an ill-advised
+		// lease transfer since the incoming leaseholder needs to recognize
+		// itself as such within a few seconds; if it doesn't (we accidentally
+		// sent the lease to a replica in need of a snapshot or far behind on
+		// its log), the lease is up for grabs. If we simply transferred epoch
+		// based leases, it's possible for the new leaseholder that's delayed
+		// in applying the lease transfer to maintain its lease (assuming the
+		// node it's on is able to heartbeat its liveness record).
 		reqLease.Expiration = &hlc.Timestamp{}
 		*reqLease.Expiration = status.Now.ToTimestamp().Add(int64(p.repl.store.cfg.RangeLeaseActiveDuration()), 0)
 	} else {
@@ -760,10 +773,12 @@ func (r *Replica) ownsValidLeaseRLocked(ctx context.Context, now hlc.ClockTimest
 	return st.IsValid() && st.OwnedBy(r.store.StoreID())
 }
 
-// requiresExpiringLeaseRLocked returns whether this range uses an
-// expiration-based lease; false if epoch-based. Ranges located before or
-// including the node liveness table must use expiration leases to avoid
-// circular dependencies on the node liveness table.
+// requiresExpiringLeaseRLocked returns whether this range unconditionally uses
+// an expiration-based lease. Ranges located before or including the node
+// liveness table must always use expiration leases to avoid circular
+// dependencies on the node liveness table. All other ranges typically use
+// epoch-based leases, but may temporarily use expiration based leases during
+// lease transfers.
 func (r *Replica) requiresExpiringLeaseRLocked() bool {
 	return r.store.cfg.NodeLiveness == nil ||
 		r.mu.state.Desc.StartKey.Less(roachpb.RKey(keys.NodeLivenessKeyMax))
@@ -1431,7 +1446,7 @@ func (r *Replica) redirectOnOrAcquireLeaseForRequest(
 // returns true if this range uses expiration-based leases, the lease is
 // in need of renewal, and there's not already an extension pending.
 func (r *Replica) shouldExtendLeaseRLocked(st kvserverpb.LeaseStatus) bool {
-	if !r.requiresExpiringLeaseRLocked() {
+	if st.Lease.Type() != roachpb.LeaseExpiration {
 		return false
 	}
 	if _, ok := r.mu.pendingLeaseRequest.RequestPending(); ok {
@@ -1447,6 +1462,11 @@ func (r *Replica) shouldExtendLeaseRLocked(st kvserverpb.LeaseStatus) bool {
 func (r *Replica) maybeExtendLeaseAsync(ctx context.Context, st kvserverpb.LeaseStatus) {
 	r.mu.Lock()
 	defer r.mu.Unlock()
+
+	r.maybeExtendLeaseAsyncLocked(ctx, st)
+}
+
+func (r *Replica) maybeExtendLeaseAsyncLocked(ctx context.Context, st kvserverpb.LeaseStatus) {
 	// Check shouldExtendLeaseRLocked again, because others may have raced to
 	// extend the lease and beaten us here after we made the determination
 	// (under a shared lock) that the extension was needed.
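The comment added in InitOrJoinRequest is the heart of the change: lease transfers always hand out expiration based leases, and the recipient upgrades them after applying. A standalone restatement of that type selection, using local stand-ins for the roachpb lease types (illustrative only, not code from this diff):

```go
package main

import "fmt"

type leaseType string

const (
	expirationLease leaseType = "expiration"
	epochLease      leaseType = "epoch"
)

// requestedLeaseType mirrors `p.repl.requiresExpiringLeaseRLocked() || transfer`.
func requestedLeaseType(requiresExpirationLease, isTransfer bool) leaseType {
	if requiresExpirationLease || isTransfer {
		return expirationLease
	}
	return epochLease
}

func main() {
	fmt.Println(requestedLeaseType(false, true))  // expiration: any lease transfer
	fmt.Println(requestedLeaseType(true, false))  // expiration: liveness range and earlier
	fmt.Println(requestedLeaseType(false, false)) // epoch: regular acquisition past the liveness keyspan
}
```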
diff --git a/pkg/kv/kvserver/replica_rangefeed_test.go b/pkg/kv/kvserver/replica_rangefeed_test.go
index 5c9832cbc36c..92d96d36fd09 100644
--- a/pkg/kv/kvserver/replica_rangefeed_test.go
+++ b/pkg/kv/kvserver/replica_rangefeed_test.go
@@ -1285,6 +1285,22 @@ func TestRangefeedCheckpointsRecoverFromLeaseExpiration(t *testing.T) {
 		return nil
 	})
 
+	// Run up the clock to upgrade the expiration based lease to an epoch based
+	// one. This test wants to later expire the epoch based lease by pausing
+	// liveness heartbeats.
+	manualClock.Increment(
+		tc.GetFirstStoreFromServer(t, 1).GetStoreConfig().RangeLeaseRenewalDuration().Nanoseconds() +
+			time.Second.Nanoseconds(),
+	)
+	testutils.SucceedsSoon(t, func() error {
+		repl := tc.GetFirstStoreFromServer(t, 1).LookupReplica(roachpb.RKey(scratchKey))
+		leaseStatus := repl.CurrentLeaseStatus(ctx)
+		if leaseStatus.Lease.Type() != roachpb.LeaseEpoch {
+			return errors.Errorf("lease still an expiration based lease")
+		}
+		return nil
+	})
+
 	// Expire the lease. Given that the Raft leadership is on n2, only n2 will be
 	// eligible to acquire a new lease.
 	log.Infof(ctx, "test expiring lease")
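The "run up the clock, then wait for the lease to become epoch based" sequence now appears both in the new client_lease_test.go test and in this rangefeed test. A hypothetical helper (not part of this diff) that captures it, assuming only the APIs these tests already call:

```go
package kvserver_test

import (
	"context"
	"testing"
	"time"

	"github.com/cockroachdb/cockroach/pkg/roachpb"
	"github.com/cockroachdb/cockroach/pkg/testutils"
	"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
	"github.com/cockroachdb/cockroach/pkg/util/hlc"
	"github.com/cockroachdb/errors"
)

// upgradeToEpochLease runs the hybrid manual clock past the lease renewal
// duration and then waits for the lease on the range containing key to become
// epoch based, mirroring the sequence used in both tests above.
func upgradeToEpochLease(
	ctx context.Context,
	t *testing.T,
	tc *testcluster.TestCluster,
	manualClock *hlc.HybridManualClock,
	serverIdx int,
	key roachpb.Key,
) {
	t.Helper()
	store := tc.GetFirstStoreFromServer(t, serverIdx)
	// Jump past the renewal duration so the next extension switches lease types.
	manualClock.Increment(
		store.GetStoreConfig().RangeLeaseRenewalDuration().Nanoseconds() + time.Second.Nanoseconds(),
	)
	testutils.SucceedsSoon(t, func() error {
		repl := store.LookupReplica(roachpb.RKey(key))
		if lt := repl.CurrentLeaseStatus(ctx).Lease.Type(); lt != roachpb.LeaseEpoch {
			return errors.Errorf("lease still of type %v, not epoch based", lt)
		}
		return nil
	})
}
```

If adopted, the block added to this test would become a single call, e.g. `upgradeToEpochLease(ctx, t, tc, manualClock, 1, scratchKey)`.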