kvserver: always transfer expiration-based leases
Fixes cockroachdb#81764.

In addition to ranges that unconditionally require expiration-based
leases (node liveness and earlier), we now also use them during lease
transfers for all other ranges. After acquiring such an expiration-based
lease, the new leaseholder is expected to soon upgrade it to the more
efficient epoch-based kind. Transferring an expiration-based lease
limits the effect of an ill-advised lease transfer, since the incoming
leaseholder needs to recognize itself as such within a few seconds; if
it doesn't (say we accidentally sent the lease to a replica in need of a
snapshot), the lease is up for grabs. If we simply transferred
epoch-based leases, a new leaseholder in need of a snapshot could
maintain its lease as long as the node it is on keeps heartbeating its
liveness record.

Release note: None.

Release justification:
irfansharif committed Aug 19, 2022
1 parent 04c6a1a commit 463258d
Showing 6 changed files with 136 additions and 17 deletions.
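To orient the reader before the per-file diffs, here is a condensed, illustrative sketch of the two pieces of logic described in the commit message above. The names (leaseType, chooseLeaseType, shouldUpgradeToEpoch) are stand-ins invented for this sketch, not identifiers from the codebase; the real logic lives in InitOrJoinRequest (pkg/kv/kvserver/replica_range_lease.go) and leasePostApplyLocked (pkg/kv/kvserver/replica_proposal.go) in the diffs below.

package main

import "fmt"

type leaseType int

const (
	leaseExpiration leaseType = iota
	leaseEpoch
)

// chooseLeaseType mirrors the new condition in InitOrJoinRequest: ranges that
// unconditionally require expiration-based leases keep them, and every lease
// transfer now also hands out an expiration-based lease, regardless of range.
func chooseLeaseType(requiresExpirationLease, isTransfer bool) leaseType {
	if requiresExpirationLease || isTransfer {
		return leaseExpiration
	}
	return leaseEpoch
}

// shouldUpgradeToEpoch mirrors the new check in leasePostApplyLocked: if the
// applied lease is expiration-based, the range doesn't require that kind, and
// we still own a valid lease, kick off an async extension that replaces it
// with an epoch-based lease.
func shouldUpgradeToEpoch(applied leaseType, requiresExpirationLease, ownsValidLease bool) bool {
	return applied == leaseExpiration && !requiresExpirationLease && ownsValidLease
}

func main() {
	// A transfer onto an ordinary (non-liveness) range is granted as an
	// expiration-based lease...
	got := chooseLeaseType(false /* requiresExpirationLease */, true /* isTransfer */)
	fmt.Println(got == leaseExpiration) // true
	// ...and is upgraded to an epoch-based lease once the new leaseholder
	// applies it and confirms it owns a valid lease.
	fmt.Println(shouldUpgradeToEpoch(got, false, true)) // true
}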
62 changes: 62 additions & 0 deletions pkg/kv/kvserver/client_lease_test.go
@@ -35,6 +35,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/bootstrap"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/testutils/skip"
"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
@@ -1298,3 +1299,64 @@ func TestAcquireLeaseTimeout(t *testing.T) {
require.Empty(t, nlhe.Lease)
}
}

func TestLeaseTransfersUseExpirationLeasesAndBumpToEpochBasedOnes(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

ctx := context.Background()

manualClock := hlc.NewHybridManualClock()
tci := serverutils.StartNewTestCluster(t, 2, base.TestClusterArgs{
ReplicationMode: base.ReplicationManual,
ServerArgs: base.TestServerArgs{
Knobs: base.TestingKnobs{
Server: &server.TestingKnobs{
WallClock: manualClock,
},
Store: &kvserver.StoreTestingKnobs{
LeaseRenewalDurationOverride: 10 * time.Millisecond, // speed up the test
},
},
},
})
tc := tci.(*testcluster.TestCluster)
defer tc.Stopper().Stop(ctx)

scratchKey := tc.ScratchRange(t)
// Add a replica; we're going to move the lease to it below.
desc := tc.AddVotersOrFatal(t, scratchKey, tc.Target(1))

n2 := tc.Server(1)
n2Target := tc.Target(1)

// Transfer the lease from n1 to n2. Expect it to be transferred as an
// expiration based lease.
tc.TransferRangeLeaseOrFatal(t, desc, n2Target)
testutils.SucceedsSoon(t, func() error {
li, _, err := tc.FindRangeLeaseEx(ctx, desc, nil)
require.NoError(t, err)
if !li.Current().OwnedBy(n2.GetFirstStoreID()) {
return errors.New("lease still owned by n1")
}
require.Equal(t, roachpb.LeaseExpiration, li.Current().Type())
return nil
})

// Run up the clock to force a lease renewal (and thus the change in lease
// types).
manualClock.Increment(
tc.GetFirstStoreFromServer(t, 1).GetStoreConfig().RangeLeaseRenewalDuration().Nanoseconds() +
time.Second.Nanoseconds(),
)

testutils.SucceedsSoon(t, func() error {
li, _, err := tc.FindRangeLeaseEx(ctx, desc, nil)
require.NoError(t, err)
if li.Current().Type() != roachpb.LeaseEpoch {
return errors.Errorf("lease still an expiration based lease")
}
require.Equal(t, int64(1), li.Current().Epoch)
return nil
})
}
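
The clock-bump-then-poll pattern in the second half of this test reappears in the rangefeed test at the bottom of this commit. If one wanted to share it, a helper along these lines would do; this is a hypothetical sketch, not part of the commit, reusing only calls that already appear in the test above and assuming the imports already present in client_lease_test.go (testing, context, time, errors, hlc, roachpb, testutils, testcluster).

// waitForEpochLease is a hypothetical helper, not part of this commit. It
// bumps the manual clock past the store's lease renewal duration and polls
// until the range's lease has been upgraded to an epoch-based one. It assumes
// the lease (and the store being inspected) lives on server index 1, as in
// the test above.
func waitForEpochLease(
	ctx context.Context,
	t *testing.T,
	tc *testcluster.TestCluster,
	manualClock *hlc.HybridManualClock,
	desc roachpb.RangeDescriptor,
) {
	manualClock.Increment(
		tc.GetFirstStoreFromServer(t, 1).GetStoreConfig().RangeLeaseRenewalDuration().Nanoseconds() +
			time.Second.Nanoseconds(),
	)
	testutils.SucceedsSoon(t, func() error {
		li, _, err := tc.FindRangeLeaseEx(ctx, desc, nil)
		if err != nil {
			return err
		}
		if li.Current().Type() != roachpb.LeaseEpoch {
			return errors.New("lease still an expiration based lease")
		}
		return nil
	})
}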
11 changes: 7 additions & 4 deletions pkg/kv/kvserver/client_split_test.go
@@ -2749,10 +2749,13 @@ func TestStoreCapacityAfterSplit(t *testing.T) {

// Increment the manual clock and do a write to increase the qps above zero.
manualClock.Increment(int64(replicastats.MinStatsDuration))
pArgs := incrementArgs(key, 10)
if _, pErr := kv.SendWrapped(ctx, s.TestSender(), pArgs); pErr != nil {
t.Fatal(pErr)
}
testutils.SucceedsSoon(t, func() error {
pArgs := incrementArgs(key, 10)
if _, pErr := kv.SendWrapped(ctx, s.TestSender(), pArgs); pErr != nil {
return errors.Errorf("failed to increment data: %s", pErr)
}
return nil
})
// We want to make sure we can read the value through raft, so we know
// the stats are updated.
testutils.SucceedsSoon(t, func() error {
28 changes: 23 additions & 5 deletions pkg/kv/kvserver/replica_proposal.go
@@ -29,6 +29,8 @@ import (
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/storage/enginepb"
"github.com/cockroachdb/cockroach/pkg/util"
"github.com/cockroachdb/cockroach/pkg/util/buildutil"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/humanizeutil"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/quotapool"
@@ -344,7 +346,8 @@ func (r *Replica) leasePostApplyLocked(
// lease but not the updated merge or timestamp cache state, which can result
// in serializability violations.
r.mu.state.Lease = newLease
expirationBasedLease := r.requiresExpiringLeaseRLocked()
requiresExpirationBasedLease := r.requiresExpiringLeaseRLocked()
hasExpirationBasedLease := newLease.Type() == roachpb.LeaseExpiration

// Gossip the first range whenever its lease is acquired. We check to make
// sure the lease is active so that a trailing replica won't process an old
@@ -354,17 +357,32 @@
r.gossipFirstRangeLocked(ctx)
}

// Whenever we first acquire an expiration-based lease, notify the lease
// renewer worker that we want it to keep proactively renewing the lease
// before it expires.
if leaseChangingHands && iAmTheLeaseHolder && expirationBasedLease && r.ownsValidLeaseRLocked(ctx, now) {
// Whenever we first acquire an expiration-based lease for a range that
// requires it, notify the lease renewer worker that we want it to keep
// proactively renewing the lease before it expires.
if leaseChangingHands && iAmTheLeaseHolder && requiresExpirationBasedLease && r.ownsValidLeaseRLocked(ctx, now) {
r.store.renewableLeases.Store(int64(r.RangeID), unsafe.Pointer(r))
select {
case r.store.renewableLeasesSignal <- struct{}{}:
default:
}
}

if hasExpirationBasedLease && !requiresExpirationBasedLease && r.ownsValidLeaseRLocked(ctx, now) {
if buildutil.CrdbTestBuild && !iAmTheLeaseHolder {
log.Fatalf(ctx, "replica owns valid lease but is not a leaseholder")
}
// We received an expiration lease for a range that doesn't require it,
// i.e. comes after the liveness keyspan. We've also applied it before
// it has expired. Upgrade this lease to the more efficient epoch-based
// one.
if log.V(1) {
log.VEventf(ctx, 1, "upgrading expiration lease %s to an epoch-based one", newLease)
}
st := r.leaseStatusForRequestRLocked(ctx, now, hlc.Timestamp{})
r.maybeExtendLeaseAsyncLocked(ctx, st)
}

// If we're the current raft leader, may want to transfer the leadership to
// the new leaseholder. Note that this condition is also checked periodically
// when ticking the replica.
4 changes: 2 additions & 2 deletions pkg/kv/kvserver/replica_raft.go
@@ -1878,8 +1878,8 @@ func shouldCampaignOnWake(
if raftStatus.Lead == raft.None {
return true
}
// Avoid a circular dependency on liveness and skip the is leader alive check for
// expiration based leases.
// Avoid a circular dependency on liveness and skip the is leader alive
// check for ranges that always use expiration based leases.
if requiresExpiringLease {
return false
}
32 changes: 26 additions & 6 deletions pkg/kv/kvserver/replica_range_lease.go
@@ -247,7 +247,20 @@ func (p *pendingLeaseRequest) InitOrJoinRequest(
ProposedTS: &status.Now,
}

if p.repl.requiresExpiringLeaseRLocked() {
if p.repl.requiresExpiringLeaseRLocked() || transfer {
// In addition to ranges that unconditionally require expiration-based
// leases (node liveness and earlier), we also use them during lease
// transfers for all other ranges. After acquiring these expiration
// based leases, the leaseholders are expected to upgrade them to the
// more efficient epoch-based ones. But by transferring an
// expiration-based lease, we can limit the effect of an ill-advised
// lease transfer since the incoming leaseholder needs to recognize
// itself as such within a few seconds; if it doesn't (we accidentally
// sent the lease to a replica in need of a snapshot or far behind on
// its log), the lease is up for grabs. If we simply transferred epoch
// based leases, it's possible for the new leaseholder that's delayed
// in applying the lease transfer to maintain its lease (assuming the
// node it's on is able to heartbeat its liveness record).
reqLease.Expiration = &hlc.Timestamp{}
*reqLease.Expiration = status.Now.ToTimestamp().Add(int64(p.repl.store.cfg.RangeLeaseActiveDuration()), 0)
} else {
@@ -760,10 +773,12 @@ func (r *Replica) ownsValidLeaseRLocked(ctx context.Context, now hlc.ClockTimest
return st.IsValid() && st.OwnedBy(r.store.StoreID())
}

// requiresExpiringLeaseRLocked returns whether this range uses an
// expiration-based lease; false if epoch-based. Ranges located before or
// including the node liveness table must use expiration leases to avoid
// circular dependencies on the node liveness table.
// requiresExpiringLeaseRLocked returns whether this range unconditionally uses
// an expiration-based lease. Ranges located before or including the node
// liveness table must always use expiration leases to avoid circular
// dependencies on the node liveness table. All other ranges typically use
// epoch-based leases, but may temporarily use expiration based leases during
// lease transfers.
func (r *Replica) requiresExpiringLeaseRLocked() bool {
return r.store.cfg.NodeLiveness == nil ||
r.mu.state.Desc.StartKey.Less(roachpb.RKey(keys.NodeLivenessKeyMax))
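
An aside, not part of the diff: a small illustration of the boundary the comment above describes, using the same keys and roachpb packages referenced in the function. The helper name and the table ID are made up for the example.

// exampleLeaseBoundary is illustrative only. Ranges whose start key sorts
// before keys.NodeLivenessKeyMax (the meta ranges, the liveness range itself)
// must use expiration-based leases; ordinary table data does not.
func exampleLeaseBoundary() {
	boundary := roachpb.RKey(keys.NodeLivenessKeyMax)
	liveness := roachpb.RKey(keys.NodeLivenessPrefix)
	table := roachpb.RKey(keys.SystemSQLCodec.TablePrefix(50)) // arbitrary table ID
	fmt.Println(liveness.Less(boundary)) // true: expiration-based lease required
	fmt.Println(table.Less(boundary))    // false: eligible for an epoch-based lease
}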
@@ -1431,7 +1446,7 @@ func (r *Replica) redirectOnOrAcquireLeaseForRequest(
// returns true if this range uses expiration-based leases, the lease is
// in need of renewal, and there's not already an extension pending.
func (r *Replica) shouldExtendLeaseRLocked(st kvserverpb.LeaseStatus) bool {
if !r.requiresExpiringLeaseRLocked() {
if st.Lease.Type() != roachpb.LeaseExpiration {
return false
}
if _, ok := r.mu.pendingLeaseRequest.RequestPending(); ok {
@@ -1447,6 +1462,11 @@ func (r *Replica) shouldExtendLeaseRLocked(st kvserverpb.LeaseStatus) {
func (r *Replica) maybeExtendLeaseAsync(ctx context.Context, st kvserverpb.LeaseStatus) {
r.mu.Lock()
defer r.mu.Unlock()

r.maybeExtendLeaseAsyncLocked(ctx, st)
}

func (r *Replica) maybeExtendLeaseAsyncLocked(ctx context.Context, st kvserverpb.LeaseStatus) {
// Check shouldExtendLeaseRLocked again, because others may have raced to
// extend the lease and beaten us here after we made the determination
// (under a shared lock) that the extension was needed.
16 changes: 16 additions & 0 deletions pkg/kv/kvserver/replica_rangefeed_test.go
@@ -1285,6 +1285,22 @@ func TestRangefeedCheckpointsRecoverFromLeaseExpiration(t *testing.T) {
return nil
})

// Run up the clock to upgrade the expiration based lease to an epoch based
// one. This test wants to later expire the epoch based lease by pausing
// liveness heartbeats.
manualClock.Increment(
tc.GetFirstStoreFromServer(t, 1).GetStoreConfig().RangeLeaseRenewalDuration().Nanoseconds() +
time.Second.Nanoseconds(),
)
testutils.SucceedsSoon(t, func() error {
repl := tc.GetFirstStoreFromServer(t, 1).LookupReplica(roachpb.RKey(scratchKey))
leaseStatus := repl.CurrentLeaseStatus(ctx)
if leaseStatus.Lease.Type() != roachpb.LeaseEpoch {
return errors.Errorf("lease still an expiration based lease")
}
return nil
})

// Expire the lease. Given that the Raft leadership is on n2, only n2 will be
// eligible to acquire a new lease.
log.Infof(ctx, "test expiring lease")