Skip to content

Commit

Permalink
kvserver: always transfer expiration-based leases
Browse files Browse the repository at this point in the history
Fixes cockroachdb#81764.

In addition to ranges that unconditionally require expiration-based
leases (node liveness and earlier), we also use them during lease
transfers for all other ranges. After acquiring such expiration-based
leases, the leaseholders are expected to soon upgrade them to the more
efficient epoch-based ones. By transferring an expiration-based lease,
we can limit the effect of an ill-advised lease transfer since the in
coming leaseholder needs to recognize itself as such within a few
seconds; if it doesn't (we accidentally sent the lease to a replica in
need of a snapshot), the lease is up for grabs. If we simply transferred
epoch based leases, it would be possible for the new leaseholder in need
of a snapshot to maintain its lease if the node it was on is able to
heartbeat its liveness record.

Release note: None.
  • Loading branch information
irfansharif committed Aug 10, 2022
1 parent d7cf6d2 commit b1a0828
Show file tree
Hide file tree
Showing 8 changed files with 147 additions and 15 deletions.
82 changes: 82 additions & 0 deletions pkg/kv/kvserver/client_lease_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,13 @@ import (
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/bootstrap"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/testutils/skip"
"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/errors"
"github.com/stretchr/testify/require"
)
Expand Down Expand Up @@ -1178,3 +1180,83 @@ func TestAlterRangeRelocate(t *testing.T) {
require.NoError(t, err)
require.NoError(t, tc.WaitForVoters(rhsDesc.StartKey.AsRawKey(), tc.Targets(0, 3, 4)...))
}

func TestLeaseTransfersUseExpirationLeasesAndBumpToEpochBasedOnes(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

ctx := context.Background()

manualClock := hlc.NewHybridManualClock()
mu := struct {
syncutil.Mutex
untrackedRangeID roachpb.RangeID
}{}
tci := serverutils.StartNewTestCluster(t, 2, base.TestClusterArgs{
ReplicationMode: base.ReplicationManual,
ServerArgs: base.TestServerArgs{
Knobs: base.TestingKnobs{
Server: &server.TestingKnobs{
WallClock: manualClock,
},
Store: &kvserver.StoreTestingKnobs{
LeaseRenewalDurationOverride: 10 * time.Millisecond, // speed up the test
LeaseRenewalUntrackCallback: func(id roachpb.RangeID) {
mu.Lock()
defer mu.Unlock()
mu.untrackedRangeID = id
},
},
},
},
})
tc := tci.(*testcluster.TestCluster)
defer tc.Stopper().Stop(ctx)

scratchKey := tc.ScratchRange(t)
// Add a replica; we're going to move the lease to it below.
desc := tc.AddVotersOrFatal(t, scratchKey, tc.Target(1))

n2 := tc.Server(1)
n2Target := tc.Target(1)

// Transfer the lease from n1 to n2. Expect it to be transferred as an
// expiration based lease.
tc.TransferRangeLeaseOrFatal(t, desc, n2Target)
testutils.SucceedsSoon(t, func() error {
li, _, err := tc.FindRangeLeaseEx(ctx, desc, nil)
require.NoError(t, err)
if !li.Current().OwnedBy(n2.GetFirstStoreID()) {
return errors.New("lease still owned by n1")
}
require.Equal(t, roachpb.LeaseExpiration, li.Current().Type())
return nil
})

// Run up the clock to force a lease renewal (and thus the change in lease
// types).
manualClock.Increment(
tc.GetFirstStoreFromServer(t, 1).GetStoreConfig().RangeLeaseRenewalDuration().Nanoseconds() +
time.Second.Nanoseconds(),
)

testutils.SucceedsSoon(t, func() error {
li, _, err := tc.FindRangeLeaseEx(ctx, desc, nil)
require.NoError(t, err)
if li.Current().Type() != roachpb.LeaseEpoch {
return errors.Errorf("lease still an expiration based lease")
}
require.Equal(t, int64(1), li.Current().Epoch)
return nil
})

testutils.SucceedsSoon(t, func() error {
mu.Lock()
defer mu.Unlock()

if mu.untrackedRangeID != desc.RangeID {
return errors.Errorf("range %s still tracked in set of renewable leases", desc.RangeID)
}
return nil
})
}
11 changes: 7 additions & 4 deletions pkg/kv/kvserver/client_split_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2748,10 +2748,13 @@ func TestStoreCapacityAfterSplit(t *testing.T) {

// Increment the manual clock and do a write to increase the qps above zero.
manualClock.Increment(int64(replicastats.MinStatsDuration))
pArgs := incrementArgs(key, 10)
if _, pErr := kv.SendWrapped(ctx, s.TestSender(), pArgs); pErr != nil {
t.Fatal(pErr)
}
testutils.SucceedsSoon(t, func() error {
pArgs := incrementArgs(key, 10)
if _, pErr := kv.SendWrapped(ctx, s.TestSender(), pArgs); pErr != nil {
return errors.Errorf("failed to increment data: %s", pErr)
}
return nil
})
// We want to make sure we can read the value through raft, so we know
// the stats are updated.
testutils.SucceedsSoon(t, func() error {
Expand Down
4 changes: 2 additions & 2 deletions pkg/kv/kvserver/replica_proposal.go
Original file line number Diff line number Diff line change
Expand Up @@ -348,7 +348,7 @@ func (r *Replica) leasePostApplyLocked(
// lease but not the updated merge or timestamp cache state, which can result
// in serializability violations.
r.mu.state.Lease = newLease
expirationBasedLease := r.requiresExpiringLeaseRLocked()
hasExpirationBasedLease := newLease.Type() == roachpb.LeaseExpiration

// Gossip the first range whenever its lease is acquired. We check to make
// sure the lease is active so that a trailing replica won't process an old
Expand All @@ -361,7 +361,7 @@ func (r *Replica) leasePostApplyLocked(
// Whenever we first acquire an expiration-based lease, notify the lease
// renewer worker that we want it to keep proactively renewing the lease
// before it expires.
if leaseChangingHands && iAmTheLeaseHolder && expirationBasedLease && r.ownsValidLeaseRLocked(ctx, now) {
if leaseChangingHands && iAmTheLeaseHolder && hasExpirationBasedLease && r.ownsValidLeaseRLocked(ctx, now) {
r.store.renewableLeases.Store(int64(r.RangeID), unsafe.Pointer(r))
select {
case r.store.renewableLeasesSignal <- struct{}{}:
Expand Down
4 changes: 2 additions & 2 deletions pkg/kv/kvserver/replica_raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -1922,8 +1922,8 @@ func shouldCampaignOnWake(
if raftStatus.Lead == raft.None {
return true
}
// Avoid a circular dependency on liveness and skip the is leader alive check for
// expiration based leases.
// Avoid a circular dependency on liveness and skip the is leader alive
// check for ranges that always use expiration based leases.
if requiresExpiringLease {
return false
}
Expand Down
27 changes: 21 additions & 6 deletions pkg/kv/kvserver/replica_range_lease.go
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,20 @@ func (p *pendingLeaseRequest) InitOrJoinRequest(
ProposedTS: &status.Now,
}

if p.repl.requiresExpiringLeaseRLocked() {
if p.repl.requiresExpiringLeaseRLocked() || transfer {
// In addition to ranges that unconditionally require expiration-based
// leases (node liveness and earlier), we also use them during lease
// transfers for all other ranges. After acquiring these expiration
// based leases, the leaseholders are expected to upgrade them to the
// more efficient epoch-based ones. But by transferring an
// expiration-based lease, we can limit the effect of an ill-advised
// lease transfer since the incoming leaseholder needs to recognize
// itself as such within a few seconds; if it doesn't (we accidentally
// sent the lease to a replica in need of a snapshot or far behind on
// its log), the lease is up for grabs. If we simply transferred epoch
// based leases, it's possible for the new leaseholder that's delayed
// in applying the lease transfer to maintain its lease (assuming the
// node it's on is able to heartbeat its liveness record).
reqLease.Expiration = &hlc.Timestamp{}
*reqLease.Expiration = status.Now.ToTimestamp().Add(int64(p.repl.store.cfg.RangeLeaseActiveDuration()), 0)
} else {
Expand Down Expand Up @@ -772,10 +785,12 @@ func (r *Replica) ownsValidLeaseRLocked(ctx context.Context, now hlc.ClockTimest
return st.IsValid() && st.OwnedBy(r.store.StoreID())
}

// requiresExpiringLeaseRLocked returns whether this range uses an
// expiration-based lease; false if epoch-based. Ranges located before or
// including the node liveness table must use expiration leases to avoid
// circular dependencies on the node liveness table.
// requiresExpiringLeaseRLocked returns whether this range unconditionally uses
// an expiration-based lease. Ranges located before or including the node
// liveness table must always use expiration leases to avoid circular
// dependencies on the node liveness table. All other ranges typically use
// epoch-based leases, but may temporarily use expiration based leases during
// lease transfers.
func (r *Replica) requiresExpiringLeaseRLocked() bool {
return r.store.cfg.NodeLiveness == nil ||
r.mu.state.Desc.StartKey.Less(roachpb.RKey(keys.NodeLivenessKeyMax))
Expand Down Expand Up @@ -1453,7 +1468,7 @@ func (r *Replica) redirectOnOrAcquireLeaseForRequestWithoutTimeout(
// returns true if this range uses expiration-based leases, the lease is
// in need of renewal, and there's not already an extension pending.
func (r *Replica) shouldExtendLeaseRLocked(st kvserverpb.LeaseStatus) bool {
if !r.requiresExpiringLeaseRLocked() {
if st.Lease.Type() != roachpb.LeaseExpiration {
return false
}
if _, ok := r.mu.pendingLeaseRequest.RequestPending(); ok {
Expand Down
16 changes: 16 additions & 0 deletions pkg/kv/kvserver/replica_rangefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1283,6 +1283,22 @@ func TestRangefeedCheckpointsRecoverFromLeaseExpiration(t *testing.T) {
return nil
})

// Run up the clock to upgrade the expiration based lease to an epoch based
// one. This test wants to later expire the epoch based lease by pausing
// liveness heartbeats.
manualClock.Increment(
tc.GetFirstStoreFromServer(t, 1).GetStoreConfig().RangeLeaseRenewalDuration().Nanoseconds() +
time.Second.Nanoseconds(),
)
testutils.SucceedsSoon(t, func() error {
repl := tc.GetFirstStoreFromServer(t, 1).LookupReplica(roachpb.RKey(scratchKey))
leaseStatus := repl.CurrentLeaseStatus(ctx)
if leaseStatus.Lease.Type() != roachpb.LeaseEpoch {
return errors.Errorf("lease still an expiration based lease")
}
return nil
})

// Expire the lease. Given that the Raft leadership is on n2, only n2 will be
// eligible to acquire a new lease.
log.Infof(ctx, "test expiring lease")
Expand Down
14 changes: 13 additions & 1 deletion pkg/kv/kvserver/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -2242,12 +2242,24 @@ func (s *Store) startLeaseRenewer(ctx context.Context) {
numRenewableLeases++
repl := (*Replica)(v)
annotatedCtx := repl.AnnotateCtx(ctx)
if _, pErr := repl.redirectOnOrAcquireLease(annotatedCtx); pErr != nil {
if leaseStatus, pErr := repl.redirectOnOrAcquireLease(annotatedCtx); pErr != nil {
if _, ok := pErr.GetDetail().(*roachpb.NotLeaseHolderError); !ok {
log.Warningf(annotatedCtx, "failed to proactively renew lease: %s", pErr)
}
s.renewableLeases.Delete(k)
} else if leaseStatus.Lease.Type() == roachpb.LeaseEpoch {
// We're dealing with an epoch based lease that was
// previously an expiration based one (we always transfer
// expiration based leases, and for ranges after the
// liveness table, they're upgraded to epoch based ones). We
// can stop tracking this range in our renewable map, its
// lease extensions take place through liveness heartbeats.
if cb := s.TestingKnobs().LeaseRenewalUntrackCallback; cb != nil {
cb(repl.RangeID)
}
s.renewableLeases.Delete(k)
}

return true
})

Expand Down
4 changes: 4 additions & 0 deletions pkg/kv/kvserver/testing_knobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -390,6 +390,10 @@ type StoreTestingKnobs struct {
LeaseRenewalSignalChan chan struct{}
// LeaseRenewalOnPostCycle is invoked after each lease renewal cycle.
LeaseRenewalOnPostCycle func()
// LeaseRenewalUntrackCallback, if set, is invoked with the range ID of a
// replica when it's no longer tracked in the set of replicas holding
// renewable expiration leases.
LeaseRenewalUntrackCallback func(roachpb.RangeID)
// LeaseRenewalDurationOverride replaces the timer duration for proactively
// renewing expiration based leases.
LeaseRenewalDurationOverride time.Duration
Expand Down

0 comments on commit b1a0828

Please sign in to comment.