Skip to content

Commit

Permalink
kvserver: add setting to use expiration-based leases
Browse files Browse the repository at this point in the history
This patch adds the experimental cluster setting
`kv.expiration_leases_only.enabled`, which uses expiration-based leases
for all ranges. The setting is marked as experimental because, while we
believe the system will function correctly, it has performance
implications that need to be mapped out and optimized.

Expiration-based leases are compelling because they are much more robust
than epoch-based leases, and better handle failure modes such as
partial/asymmetric network partitions, disk stalls, deadlocks, etc. They
require a Raft roundtrip to extend a lease, which ensures that the lease
must be functional, while epoch leases only require an RPC request to
the liveness leaseholder regardless of whether the lease actually works.

Except for the meta and liveness ranges, expiration leases are only
extended when a request is processed in the last half of the lease
interval (i.e. in the last 3 seconds of the 6 second lease). Otherwise,
we allow the lease to expire. This reduces the cost of idle ranges in
large clusters, since we avoid the periodic lease extension writes for
every range, and can let the ranges quiesce as usual. However, it incurs
a synchronous lease acquisition on the next request to the range.

Because expiration leases incur one Raft write per range per lease
extension, as well as a lease acquisition for idle ranges, they
currently come with significant performance overhead. In TPC-E
experiments at 100.000 customers with various transaction types, p50
latencies increased 5-70%, p99 latencies increased 20-80%, and pMax
latencies increased 0-1000%. A kv95 workload on 10.000 ranges with
active leases showed a throughput reduction of about 10%, most likely
due to the ~3.000 lease extension writes per second.

When the setting is changed, leases are asynchronously switched to the
appropriate type (either expiration-based or epoch-based) via the
replicate queue. This can take up to the `ScanInterval` to complete, 10
minutes by default (more on nodes with large numbers or ranges).

Epic: none
Release note: None
  • Loading branch information
erikgrinaker committed Mar 16, 2023
1 parent a64df6f commit 04f4284
Show file tree
Hide file tree
Showing 7 changed files with 221 additions and 49 deletions.
24 changes: 13 additions & 11 deletions pkg/kv/kvserver/replica_proposal.go
Original file line number Diff line number Diff line change
Expand Up @@ -349,8 +349,6 @@ func (r *Replica) leasePostApplyLocked(
// lease but not the updated merge or timestamp cache state, which can result
// in serializability violations.
r.mu.state.Lease = newLease
requiresExpirationBasedLease := r.requiresExpirationLeaseRLocked()
hasExpirationBasedLease := newLease.Type() == roachpb.LeaseExpiration

now := r.store.Clock().NowAsClockTimestamp()

Expand All @@ -368,21 +366,25 @@ func (r *Replica) leasePostApplyLocked(
r.gossipFirstRangeLocked(ctx)
}

if leaseChangingHands && iAmTheLeaseHolder && hasExpirationBasedLease && r.ownsValidLeaseRLocked(ctx, now) {
if requiresExpirationBasedLease {
hasExpirationLease := newLease.Type() == roachpb.LeaseExpiration
if leaseChangingHands && iAmTheLeaseHolder && hasExpirationLease && r.ownsValidLeaseRLocked(ctx, now) {
if r.requiresExpirationLeaseRLocked() {
// Whenever we first acquire an expiration-based lease for a range that
// requires it, notify the lease renewer worker that we want it to keep
// proactively renewing the lease before it expires.
// requires it (i.e. the liveness or meta ranges), notify the lease
// renewer worker that we want it to keep proactively renewing the lease
// before it expires. We don't eagerly renew other expiration leases,
// because a more sophisticated scheduler is needed to handle large
// numbers of expiration leases.
r.store.renewableLeases.Store(int64(r.RangeID), unsafe.Pointer(r))
select {
case r.store.renewableLeasesSignal <- struct{}{}:
default:
}
} else {
// We received an expiration lease for a range that doesn't require it,
// i.e. comes after the liveness keyspan. We've also applied it before
// it has expired. Upgrade this lease to the more efficient epoch-based
// one.
} else if !r.shouldUseExpirationLeaseRLocked() {
// We received an expiration lease for a range that shouldn't keep using
// it, most likely as part of a lease transfer (which is always
// expiration-based). We've also applied it before it has expired. Upgrade
// this lease to the more efficient epoch-based one.
if log.V(1) {
log.VEventf(ctx, 1, "upgrading expiration lease %s to an epoch-based one", newLease)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/replica_proposal_buf.go
Original file line number Diff line number Diff line change
Expand Up @@ -1252,7 +1252,7 @@ func (rp *replicaProposer) shouldCampaignOnRedirect(raftGroup proposerRaft) bool
raftGroup.BasicStatus(),
livenessMap,
r.descRLocked(),
r.requiresExpirationLeaseRLocked(),
r.shouldUseExpirationLeaseRLocked(),
r.store.Clock().Now(),
)
}
Expand Down
22 changes: 12 additions & 10 deletions pkg/kv/kvserver/replica_raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -1997,8 +1997,11 @@ func shouldCampaignOnWake(
if raftStatus.Lead == raft.None {
return true
}
// Avoid a circular dependency on liveness and skip the is leader alive
// check for ranges that always use expiration based leases.
// Avoid a circular dependency on liveness and skip the is leader alive check
// for ranges that require expiration based leases (the meta and liveness
// ranges). We do want to check the liveness entry for other expiration
// leases, since in the case of a dead leader it allows us to campaign
// immediately without waiting out the election timeout.
if requiresExpirationLease {
return false
}
Expand Down Expand Up @@ -2061,7 +2064,7 @@ func shouldCampaignOnLeaseRequestRedirect(
raftStatus raft.BasicStatus,
livenessMap livenesspb.IsLiveMap,
desc *roachpb.RangeDescriptor,
requiresExpirationLease bool,
shouldUseExpirationLease bool,
now hlc.Timestamp,
) bool {
// If we're already campaigning don't start a new term.
Expand All @@ -2076,15 +2079,14 @@ func shouldCampaignOnLeaseRequestRedirect(
if raftStatus.Lead == raft.None {
return true
}
// Avoid a circular dependency on liveness and skip the is leader alive check
// for ranges that always use expiration based leases. These ranges don't need
// to campaign based on liveness state because there can never be a case where
// a node can retain Raft leadership but still be unable to acquire the lease.
// This is possible on ranges that use epoch-based leases because the Raft
// leader may be partitioned from the liveness range.
// If we should be using an expiration lease then we don't need to campaign
// based on liveness state because there can never be a case where a node can
// retain Raft leadership but still be unable to acquire the lease. This is
// possible on ranges that use epoch-based leases because the Raft leader may
// be partitioned from the liveness range.
// See TestRequestsOnFollowerWithNonLiveLeaseholder for an example of a test
// that demonstrates this case.
if requiresExpirationLease {
if shouldUseExpirationLease {
return false
}
// Determine if we think the leader is alive, if we don't have the leader in
Expand Down
86 changes: 71 additions & 15 deletions pkg/kv/kvserver/replica_range_lease.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,13 @@ var transferExpirationLeasesFirstEnabled = settings.RegisterBoolSetting(
true,
)

var expirationLeasesOnly = settings.RegisterBoolSetting(
settings.SystemOnly,
"kv.expiration_leases_only.enabled",
"only use expiration-based leases, never epoch-based ones (experimental, affects performance)",
false,
)

var leaseStatusLogLimiter = func() *log.EveryN {
e := log.Every(15 * time.Second)
e.ShouldLog() // waste the first shot
Expand Down Expand Up @@ -244,23 +251,23 @@ func (p *pendingLeaseRequest) InitOrJoinRequest(
ProposedTS: &status.Now,
}

if p.repl.requiresExpirationLeaseRLocked() ||
if p.repl.shouldUseExpirationLeaseRLocked() ||
(transfer &&
transferExpirationLeasesFirstEnabled.Get(&p.repl.store.ClusterSettings().SV) &&
p.repl.store.ClusterSettings().Version.IsActive(ctx, clusterversion.TODODelete_V22_2EnableLeaseUpgrade)) {
// In addition to ranges that unconditionally require expiration-based
// leases (node liveness and earlier), we also use them during lease
// transfers for all other ranges. After acquiring these expiration
// based leases, the leaseholders are expected to upgrade them to the
// more efficient epoch-based ones. But by transferring an
// expiration-based lease, we can limit the effect of an ill-advised
// lease transfer since the incoming leaseholder needs to recognize
// itself as such within a few seconds; if it doesn't (we accidentally
// sent the lease to a replica in need of a snapshot or far behind on
// its log), the lease is up for grabs. If we simply transferred epoch
// based leases, it's possible for the new leaseholder that's delayed
// in applying the lease transfer to maintain its lease (assuming the
// node it's on is able to heartbeat its liveness record).
// In addition to ranges that should be using expiration-based leases
// (typically the meta and liveness ranges), we also use them during lease
// transfers for all other ranges. After acquiring these expiration based
// leases, the leaseholders are expected to upgrade them to the more
// efficient epoch-based ones. But by transferring an expiration-based
// lease, we can limit the effect of an ill-advised lease transfer since the
// incoming leaseholder needs to recognize itself as such within a few
// seconds; if it doesn't (we accidentally sent the lease to a replica in
// need of a snapshot or far behind on its log), the lease is up for grabs.
// If we simply transferred epoch based leases, it's possible for the new
// leaseholder that's delayed in applying the lease transfer to maintain its
// lease (assuming the node it's on is able to heartbeat its liveness
// record).
reqLease.Expiration = &hlc.Timestamp{}
*reqLease.Expiration = status.Now.ToTimestamp().Add(int64(p.repl.store.cfg.RangeLeaseDuration), 0)
} else {
Expand Down Expand Up @@ -769,11 +776,22 @@ func (r *Replica) ownsValidLeaseRLocked(ctx context.Context, now hlc.ClockTimest
// dependencies on the node liveness table. All other ranges typically use
// epoch-based leases, but may temporarily use expiration based leases during
// lease transfers.
//
// TODO(erikgrinaker): It isn't always clear when to use this and when to use
// shouldUseExpirationLeaseRLocked. We can merge these once there are no more
// callers: when expiration leases don't quiesce and are always eagerly renewed.
func (r *Replica) requiresExpirationLeaseRLocked() bool {
return r.store.cfg.NodeLiveness == nil ||
r.mu.state.Desc.StartKey.Less(roachpb.RKey(keys.NodeLivenessKeyMax))
}

// shouldUseExpirationLeaseRLocked returns true if this range should be using an
// expiration-based lease, either because it requires one or because
// kv.expiration_leases_only.enabled is enabled.
func (r *Replica) shouldUseExpirationLeaseRLocked() bool {
return expirationLeasesOnly.Get(&r.ClusterSettings().SV) || r.requiresExpirationLeaseRLocked()
}

// requestLeaseLocked executes a request to obtain or extend a lease
// asynchronously and returns a channel on which the result will be posted. If
// there's already a request in progress, we join in waiting for the results of
Expand Down Expand Up @@ -1003,7 +1021,7 @@ func NewLeaseTransferRejectedBecauseTargetMayNeedSnapshotError(
// lease's expiration (and stasis period).
func (r *Replica) checkRequestTimeRLocked(now hlc.ClockTimestamp, reqTS hlc.Timestamp) error {
var leaseRenewal time.Duration
if r.requiresExpirationLeaseRLocked() {
if r.shouldUseExpirationLeaseRLocked() {
_, leaseRenewal = r.store.cfg.RangeLeaseDurations()
} else {
_, leaseRenewal = r.store.cfg.NodeLivenessDurations()
Expand Down Expand Up @@ -1428,6 +1446,44 @@ func (r *Replica) maybeExtendLeaseAsyncLocked(ctx context.Context, st kvserverpb
_ = r.requestLeaseLocked(ctx, st)
}

// maybeSwitchLeaseType will synchronously renew a lease using the appropriate
// type if it is (or was) owned by this replica and has an incorrect type. This
// typically happens when changing kv.expiration_leases_only.enabled.
func (r *Replica) maybeSwitchLeaseType(ctx context.Context, st kvserverpb.LeaseStatus) *kvpb.Error {
if !st.OwnedBy(r.store.StoreID()) {
return nil
}

var llHandle *leaseRequestHandle
r.mu.Lock()
if !r.hasCorrectLeaseTypeRLocked(st.Lease) {
llHandle = r.requestLeaseLocked(ctx, st)
}
r.mu.Unlock()

if llHandle != nil {
select {
case pErr := <-llHandle.C():
return pErr
case <-ctx.Done():
return kvpb.NewError(ctx.Err())
}
}
return nil
}

// hasCorrectLeaseType returns true if the lease type is correct for this replica.
func (r *Replica) hasCorrectLeaseType(lease roachpb.Lease) bool {
r.mu.RLock()
defer r.mu.RUnlock()
return r.hasCorrectLeaseTypeRLocked(lease)
}

func (r *Replica) hasCorrectLeaseTypeRLocked(lease roachpb.Lease) bool {
hasExpirationLease := lease.Type() == roachpb.LeaseExpiration
return hasExpirationLease == r.shouldUseExpirationLeaseRLocked()
}

// leaseViolatesPreferences checks if current replica owns the lease and if it
// violates the lease preferences defined in the span config. If there is an
// error or no preferences defined then it will return false and consider that
Expand Down
46 changes: 34 additions & 12 deletions pkg/kv/kvserver/replicate_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -687,17 +687,25 @@ func (rq *replicateQueue) shouldQueue(
log.KvDistribution.VEventf(ctx, 2, "lease transfer needed, enqueuing")
return true, 0
}
if !repl.LeaseStatusAt(ctx, now).IsValid() {
// The lease for this range is currently invalid, if this replica is
// the raft leader then it is necessary that it acquires the lease. We
// enqueue it regardless of being a leader or follower, where the
// leader at the time of processing will succeed. There is no
// requirement that the expired lease belongs to this replica, as
// regardless of the lease history, the current leader should hold the
// lease.

leaseStatus := repl.LeaseStatusAt(ctx, now)
if !leaseStatus.IsValid() && leaseStatus.Lease.Type() != roachpb.LeaseExpiration {
// The epoch lease for this range is currently invalid. If this replica is
// the raft leader then we'd like it to hold a valid lease. We enqueue it
// regardless of being a leader or follower, where the leader at the time of
// processing will succeed.
//
// We don't do this for expiration leases, because we'd like them to expire
// if the range is idle.
log.KvDistribution.VEventf(ctx, 2, "invalid lease, enqueuing")
return true, 0
}
if leaseStatus.OwnedBy(repl.StoreID()) && !repl.hasCorrectLeaseType(leaseStatus.Lease) {
// This replica holds (or held) an incorrect lease type, switch it to the
// correct type. Typically when changing kv.expiration_leases_only.enabled.
log.KvDistribution.VEventf(ctx, 2, "incorrect lease type, enqueueing")
return true, 0
}

return false, 0
}
Expand Down Expand Up @@ -1001,12 +1009,26 @@ func (rq *replicateQueue) PlanOneChange(
if _, err := repl.IsDestroyed(); err != nil {
return change, err
}

// Ensure ranges have a lease (returning NLHE if someone else has it), and
// switch the lease type if necessary (e.g. due to
// kv.expiration_leases_only.enabled).
//
// TODO(kvoli): This check should fail if not the leaseholder. In the case
// where we want to use the replicate queue to acquire leases, this should
// occur before planning or as a result. In order to return this in
// planning, it is necessary to simulate the prior change having succeeded
// to then plan this lease transfer.
if _, pErr := repl.redirectOnOrAcquireLease(ctx); pErr != nil {
// occur before planning or as a result. In order to return this in planning,
// it is necessary to simulate the prior change having succeeded to then plan
// this lease transfer.
//
// TODO(erikgrinaker): We shouldn't overload the replicate queue to also be
// responsible for lease maintenance, but it'll do for now. See:
// https://github.com/cockroachdb/cockroach/issues/98433
leaseStatus, pErr := repl.redirectOnOrAcquireLease(ctx)
if pErr != nil {
return change, pErr.GoError()
}
pErr = repl.maybeSwitchLeaseType(ctx, leaseStatus)
if pErr != nil {
return change, pErr.GoError()
}

Expand Down
83 changes: 83 additions & 0 deletions pkg/kv/kvserver/replicate_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
"github.com/cockroachdb/cockroach/pkg/util"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/log/logpb"
Expand Down Expand Up @@ -2350,3 +2351,85 @@ func TestPromoteNonVoterInAddVoter(t *testing.T) {
}
}
}

// TestReplicateQueueExpirationLeasesOnly tests that changing
// kv.expiration_leases_only.enabled switches all leases to the correct kind.
func TestReplicateQueueExpirationLeasesOnly(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

skip.UnderRace(t) // too slow under stressrace
skip.UnderShort(t)

ctx := context.Background()
tc := testcluster.StartTestCluster(t, 3, base.TestClusterArgs{
ServerArgs: base.TestServerArgs{
// Speed up the replicate queue, which switches the lease type.
ScanMinIdleTime: time.Millisecond,
ScanMaxIdleTime: time.Millisecond,
},
})
defer tc.Stopper().Stop(ctx)

require.NoError(t, tc.WaitForFullReplication())

db := tc.Server(0).DB()
sqlDB := tc.ServerConn(0)

// Split off a few ranges so we have something to work with.
scratchKey := tc.ScratchRange(t)
for i := 0; i <= 255; i++ {
splitKey := append(scratchKey.Clone(), byte(i))
require.NoError(t, db.AdminSplit(ctx, splitKey, hlc.MaxTimestamp))
}

countLeases := func() (epoch int64, expiration int64) {
for i := 0; i < tc.NumServers(); i++ {
require.NoError(t, tc.Server(i).GetStores().(*kvserver.Stores).VisitStores(func(s *kvserver.Store) error {
require.NoError(t, s.ComputeMetrics(ctx))
expiration += s.Metrics().LeaseExpirationCount.Value()
epoch += s.Metrics().LeaseEpochCount.Value()
return nil
}))
}
return
}

// We expect to have both expiration and epoch leases at the start, since the
// meta and liveness ranges require expiration leases. However, it's possible
// that there are a few other stray expiration leases too, since lease
// transfers use expiration leases as well.
epochLeases, expLeases := countLeases()
require.NotZero(t, epochLeases)
require.NotZero(t, expLeases)
initialExpLeases := expLeases
t.Logf("initial: epochLeases=%d expLeases=%d", epochLeases, expLeases)

// Switch to expiration leases and wait for them to change.
_, err := sqlDB.ExecContext(ctx, `SET CLUSTER SETTING kv.expiration_leases_only.enabled = true`)
require.NoError(t, err)
require.Eventually(t, func() bool {
epochLeases, expLeases = countLeases()
return epochLeases == 0 && expLeases > 0
}, 10*time.Second, 200*time.Millisecond)
t.Logf("enabled: epochLeases=%d expLeases=%d", epochLeases, expLeases)

// Run a scan across the ranges, just to make sure they work.
scanCtx, cancel := context.WithTimeout(ctx, 3*time.Second)
defer cancel()
_, err = db.Scan(scanCtx, scratchKey, scratchKey.PrefixEnd(), 1)
require.NoError(t, err)

// Switch back to epoch leases and wait for them to change. We still expect to
// have some required expiration leases, but they should be at or below the
// number of expiration leases we had at the start (primarily the meta and
// liveness ranges, but possibly a few more since lease transfers also use
// expiration leases).
_, err = sqlDB.ExecContext(ctx, `SET CLUSTER SETTING kv.expiration_leases_only.enabled = false`)
require.NoError(t, err)
require.Eventually(t, func() bool {
epochLeases, expLeases = countLeases()
return epochLeases > 0 && expLeases > 0 && expLeases <= initialExpLeases
}, 10*time.Second, 200*time.Millisecond)
t.Logf("disabled: epochLeases=%d expLeases=%d", epochLeases, expLeases)
}
7 changes: 7 additions & 0 deletions pkg/kv/kvserver/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -2124,6 +2124,13 @@ func (s *Store) GetConfReader(ctx context.Context) (spanconfig.StoreReader, erro
// This reduces user-visible latency when range lookups are needed to serve a
// request and reduces ping-ponging of r1's lease to different replicas as
// maybeGossipFirstRange is called on each (e.g. #24753).
//
// Currently, this is only used for ranges that _require_ expiration-based
// leases, as determined by Replica.requiresExpirationLeaseRLocked(), i.e. the
// meta and liveness ranges. For large numbers of expiration-based leases, e.g.
// with kv.expiration_leases_only.enabled, a more sophisticated scheduler is
// needed since the linear scan here can't keep up. See:
// https://github.com/cockroachdb/cockroach/issues/98433
func (s *Store) startLeaseRenewer(ctx context.Context) {
// Start a goroutine that watches and proactively renews certain
// expiration-based leases.
Expand Down

0 comments on commit 04f4284

Please sign in to comment.