kvserver: always transfer expiration-based leases
Fixes cockroachdb#81764.

In addition to ranges that unconditionally require expiration-based
leases (node liveness and earlier), we now also use them during lease
transfers for all other ranges. After acquiring such expiration-based
leases, the leaseholders are expected to soon upgrade them to the more
efficient epoch-based ones. By transferring an expiration-based lease,
we can limit the effect of an ill-advised lease transfer, since the
incoming leaseholder needs to recognize itself as such within a few
seconds; if it doesn't (say, we accidentally sent the lease to a replica
in need of a snapshot), the lease is up for grabs. If we simply
transferred epoch-based leases, the new leaseholder in need of a
snapshot could maintain its lease as long as the node it is on is able
to heartbeat its liveness record.

Release note: None.
irfansharif committed Aug 6, 2022
1 parent d7cf6d2 commit b47f808
Showing 6 changed files with 111 additions and 20 deletions.
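
The heart of the change is a single new branch in
pendingLeaseRequest.InitOrJoinRequest. Below is a condensed sketch of the
new code path, abridged from the pkg/kv/kvserver/replica_range_lease.go
hunk further down (the explanatory comment is trimmed and the epoch-based
branch is elided):

if p.repl.requiresExpiringLeaseRLocked() || transfer {
	// Lease transfers always hand out an expiration-based lease, even for
	// ranges that otherwise use epoch-based leases; the incoming
	// leaseholder is expected to upgrade it to an epoch-based lease soon
	// after applying it.
	reqLease.Expiration = &hlc.Timestamp{}
	*reqLease.Expiration = status.Now.ToTimestamp().Add(int64(p.repl.store.cfg.RangeLeaseActiveDuration()), 0)
} else {
	// Epoch-based lease (this branch is unchanged by the commit).
}

The new TestLeaseTransfersUseExpirationLeasesAndBumpToEpochBasedOnes test
below exercises both halves: the transferred lease starts out as
roachpb.LeaseExpiration and, once the clock is run up past the renewal
duration, is upgraded to a roachpb.LeaseEpoch lease with epoch 1.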
57 changes: 57 additions & 0 deletions pkg/kv/kvserver/client_lease_test.go
@@ -34,6 +34,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/bootstrap"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/testutils/skip"
"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
@@ -1178,3 +1179,59 @@ func TestAlterRangeRelocate(t *testing.T) {
require.NoError(t, err)
require.NoError(t, tc.WaitForVoters(rhsDesc.StartKey.AsRawKey(), tc.Targets(0, 3, 4)...))
}

func TestLeaseTransfersUseExpirationLeasesAndBumpToEpochBasedOnes(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

ctx := context.Background()

manualClock := hlc.NewHybridManualClock()
tci := serverutils.StartNewTestCluster(t, 2, base.TestClusterArgs{
ReplicationMode: base.ReplicationManual,
ServerArgs: base.TestServerArgs{
Knobs: base.TestingKnobs{
Server: &server.TestingKnobs{
WallClock: manualClock,
},
},
},
})
tc := tci.(*testcluster.TestCluster)
defer tc.Stopper().Stop(ctx)

scratchKey := tc.ScratchRange(t)
// Add a replica; we're going to move the lease to it below.
desc := tc.AddVotersOrFatal(t, scratchKey, tc.Target(1))

n2 := tc.Server(1)
n2Target := tc.Target(1)

// Transfer the lease from n1 to n2. Expect it to be transferred as an
// expiration-based lease.
tc.TransferRangeLeaseOrFatal(t, desc, n2Target)
testutils.SucceedsSoon(t, func() error {
li, _, err := tc.FindRangeLeaseEx(ctx, desc, nil)
require.NoError(t, err)
if !li.Current().OwnedBy(n2.GetFirstStoreID()) {
return errors.New("lease still owned by n1")
}
require.Equal(t, roachpb.LeaseExpiration, li.Current().Type())
return nil
})

manualClock.Increment(
tc.GetFirstStoreFromServer(t, 1).GetStoreConfig().RangeLeaseRenewalDuration().Nanoseconds() +
time.Second.Nanoseconds(),
)

testutils.SucceedsSoon(t, func() error {
li, _, err := tc.FindRangeLeaseEx(ctx, desc, nil)
require.NoError(t, err)
if li.Current().Type() != roachpb.LeaseEpoch {
return errors.Errorf("lease still an expiration based lease")
}
require.Equal(t, int64(1), li.Current().Epoch)
return nil
})
}
11 changes: 7 additions & 4 deletions pkg/kv/kvserver/client_split_test.go
@@ -2748,10 +2748,13 @@ func TestStoreCapacityAfterSplit(t *testing.T) {

// Increment the manual clock and do a write to increase the qps above zero.
manualClock.Increment(int64(replicastats.MinStatsDuration))
pArgs := incrementArgs(key, 10)
if _, pErr := kv.SendWrapped(ctx, s.TestSender(), pArgs); pErr != nil {
t.Fatal(pErr)
}
testutils.SucceedsSoon(t, func() error {
pArgs := incrementArgs(key, 10)
if _, pErr := kv.SendWrapped(ctx, s.TestSender(), pArgs); pErr != nil {
return errors.Errorf("failed to increment data: %s", pErr)
}
return nil
})
// We want to make sure we can read the value through raft, so we know
// the stats are updated.
testutils.SucceedsSoon(t, func() error {
10 changes: 5 additions & 5 deletions pkg/kv/kvserver/replica_proposal.go
@@ -348,7 +348,7 @@ func (r *Replica) leasePostApplyLocked(
// lease but not the updated merge or timestamp cache state, which can result
// in serializability violations.
r.mu.state.Lease = newLease
expirationBasedLease := r.requiresExpiringLeaseRLocked()
requiresExpirationBasedLease := r.requiresExpiringLeaseRLocked()

// Gossip the first range whenever its lease is acquired. We check to make
// sure the lease is active so that a trailing replica won't process an old
@@ -358,10 +358,10 @@
r.gossipFirstRangeLocked(ctx)
}

// Whenever we first acquire an expiration-based lease, notify the lease
// renewer worker that we want it to keep proactively renewing the lease
// before it expires.
if leaseChangingHands && iAmTheLeaseHolder && expirationBasedLease && r.ownsValidLeaseRLocked(ctx, now) {
// Whenever we first acquire an expiration-based lease for ranges that
// require it, notify the lease renewer worker that we want it to keep
// proactively renewing the lease before it expires.
if leaseChangingHands && iAmTheLeaseHolder && requiresExpirationBasedLease && r.ownsValidLeaseRLocked(ctx, now) {
r.store.renewableLeases.Store(int64(r.RangeID), unsafe.Pointer(r))
select {
case r.store.renewableLeasesSignal <- struct{}{}:
4 changes: 2 additions & 2 deletions pkg/kv/kvserver/replica_raft.go
@@ -1922,8 +1922,8 @@ func shouldCampaignOnWake(
if raftStatus.Lead == raft.None {
return true
}
// Avoid a circular dependency on liveness and skip the is leader alive check for
// expiration based leases.
// Avoid a circular dependency on liveness and skip the is-leader-alive
// check for ranges that always use expiration-based leases.
if requiresExpiringLease {
return false
}
34 changes: 25 additions & 9 deletions pkg/kv/kvserver/replica_range_lease.go
@@ -248,7 +248,19 @@ func (p *pendingLeaseRequest) InitOrJoinRequest(
ProposedTS: &status.Now,
}

if p.repl.requiresExpiringLeaseRLocked() {
if p.repl.requiresExpiringLeaseRLocked() || transfer {
// In addition to ranges that unconditionally require expiration-based
// leases (node liveness and earlier), we also use them during lease
// transfers for all other ranges. After acquiring these expiration-
// based leases, the leaseholders are expected to upgrade them to the
// more efficient epoch-based ones. But by transferring an
// expiration-based lease, we can limit the effect of an ill-advised
// lease transfer since the incoming leaseholder needs to recognize
// itself as such within a few seconds; if it doesn't (we accidentally
// sent the lease to a replica in need of a snapshot), the lease is up
// for grabs. If we simply transferred epoch-based leases, it's possible
// for the new leaseholder in need of a snapshot to maintain its lease
// if the node it's on is able to heartbeat its liveness record.
reqLease.Expiration = &hlc.Timestamp{}
*reqLease.Expiration = status.Now.ToTimestamp().Add(int64(p.repl.store.cfg.RangeLeaseActiveDuration()), 0)
} else {
@@ -772,10 +784,12 @@ func (r *Replica) ownsValidLeaseRLocked(ctx context.Context, now hlc.ClockTimest
return st.IsValid() && st.OwnedBy(r.store.StoreID())
}

// requiresExpiringLeaseRLocked returns whether this range uses an
// expiration-based lease; false if epoch-based. Ranges located before or
// including the node liveness table must use expiration leases to avoid
// circular dependencies on the node liveness table.
// requiresExpiringLeaseRLocked returns whether this range unconditionally uses
// an expiration-based lease. Ranges located before or including the node
// liveness table must always use expiration leases to avoid circular
// dependencies on the node liveness table. All other ranges typically use
// epoch-based leases, but may temporarily use expiration-based leases during
// lease transfers.
func (r *Replica) requiresExpiringLeaseRLocked() bool {
return r.store.cfg.NodeLiveness == nil ||
r.mu.state.Desc.StartKey.Less(roachpb.RKey(keys.NodeLivenessKeyMax))
@@ -1062,9 +1076,11 @@ func newLeaseTransferRejectedBecauseTargetMayNeedSnapshotError(
// This serves as a stricter version of a check that if we were to perform
// a lease extension at now, the request would be contained within the new
// lease's expiration (and stasis period).
func (r *Replica) checkRequestTimeRLocked(now hlc.ClockTimestamp, reqTS hlc.Timestamp) error {
func (r *Replica) checkRequestTimeRLocked(
ctx context.Context, now hlc.ClockTimestamp, reqTS hlc.Timestamp,
) error {
var leaseRenewal time.Duration
if r.requiresExpiringLeaseRLocked() {
if r.leaseStatusAtRLocked(ctx, now).Lease.Type() == roachpb.LeaseExpiration {
_, leaseRenewal = r.store.cfg.RangeLeaseDurations()
} else {
_, leaseRenewal = r.store.cfg.NodeLivenessDurations()
@@ -1124,7 +1140,7 @@ func (r *Replica) leaseGoodToGoRLocked(
func (r *Replica) leaseGoodToGoForStatusRLocked(
ctx context.Context, now hlc.ClockTimestamp, reqTS hlc.Timestamp, st kvserverpb.LeaseStatus,
) (shouldExtend bool, _ error) {
if err := r.checkRequestTimeRLocked(now, reqTS); err != nil {
if err := r.checkRequestTimeRLocked(ctx, now, reqTS); err != nil {
// Case (1): invalid request.
return false, err
}
@@ -1453,7 +1469,7 @@ func (r *Replica) redirectOnOrAcquireLeaseForRequestWithoutTimeout(
// returns true if this range uses expiration-based leases, the lease is
// in need of renewal, and there's not already an extension pending.
func (r *Replica) shouldExtendLeaseRLocked(st kvserverpb.LeaseStatus) bool {
if !r.requiresExpiringLeaseRLocked() {
if st.Lease.Type() != roachpb.LeaseExpiration {
return false
}
if _, ok := r.mu.pendingLeaseRequest.RequestPending(); ok {
15 changes: 15 additions & 0 deletions pkg/kv/kvserver/replica_rangefeed_test.go
@@ -1283,6 +1283,21 @@ func TestRangefeedCheckpointsRecoverFromLeaseExpiration(t *testing.T) {
return nil
})

// Run up the clock to upgrade the expiration-based lease to an epoch-based
// one.
manualClock.Increment(
tc.GetFirstStoreFromServer(t, 1).GetStoreConfig().RangeLeaseRenewalDuration().Nanoseconds() +
time.Second.Nanoseconds(),
)
testutils.SucceedsSoon(t, func() error {
repl := tc.GetFirstStoreFromServer(t, 1).LookupReplica(roachpb.RKey(scratchKey))
leaseStatus := repl.CurrentLeaseStatus(ctx)
if leaseStatus.Lease.Type() != roachpb.LeaseEpoch {
return errors.Errorf("lease still an expiration based lease")
}
return nil
})

// Expire the lease. Given that the Raft leadership is on n2, only n2 will be
// eligible to acquire a new lease.
log.Infof(ctx, "test expiring lease")
