Merge #123442
123442: kv: prevent lease interval regression during expiration-to-epoch promotion r=nvanbenschoten a=nvanbenschoten

Fixes #121480.
Fixes #122016.

This commit resolves a bug in the promotion of expiration-based leases to epoch-based leases, where the lease's effective expiration could regress. To prevent this, we detect when such a regression is about to occur and synchronously heartbeat the leaseholder's liveness record. This works because the liveness record interval and the expiration-based lease interval are the same, so a synchronous heartbeat ensures that the liveness record has a later expiration than the prior lease by the time the lease promotion goes into effect.
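
In sketch form, the decision this fix makes looks like the following. This is a minimal, self-contained illustration with simplified types, not the actual CockroachDB API; the real change is in the replica_range_lease.go diff below.

```go
package main

import (
	"fmt"
	"time"
)

// An epoch-based lease has no expiration of its own; its effective expiration
// is the expiration of the leaseholder's liveness record. When promoting an
// expiration-based lease, the liveness record must therefore not expire before
// the lease being replaced, or the effective expiration would regress.
func promotionWouldRegress(prevLeaseExp, livenessExp time.Time) bool {
	return livenessExp.Before(prevLeaseExp)
}

func main() {
	now := time.Now()
	prevLeaseExp := now.Add(6 * time.Second) // expiration-based lease, recently extended
	livenessExp := now.Add(1 * time.Second)  // liveness heartbeats have fallen behind

	if promotionWouldRegress(prevLeaseExp, livenessExp) {
		// The fix: synchronously heartbeat the liveness record before issuing
		// the promotion. The liveness interval equals the lease interval, so a
		// successful heartbeat pushes the liveness expiration past the lease's.
		livenessExp = now.Add(6 * time.Second) // stand-in for a successful heartbeat
	}
	fmt.Println("safe to promote:", !promotionWouldRegress(prevLeaseExp, livenessExp))
}
```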

The code structure here leaves a lot to be desired, but since we're going to be cleaning up and/or removing a lot of this code soon anyway, I'm prioritizing backportability. This is therefore more targeted and less general than it could be.

The resolution here also leaves something to be desired. A nicer fix would be to introduce a minimum_lease_expiration field on epoch-based leases so that we can locally ensure that the expiration does not regress. This is what we plan to do for leader leases in the upcoming release. We don't make that change here because it would require a version gate to avoid replica divergence, so it would not be backportable.
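
For comparison, here is a hypothetical sketch of that alternative. The minimum_lease_expiration field does not exist in this commit; the type and names below are illustrative only.

```go
package main

import (
	"fmt"
	"time"
)

// epochLease is a stand-in for an epoch-based lease carrying the hypothetical
// minimum expiration, seeded from the prior expiration-based lease's
// expiration at promotion time.
type epochLease struct {
	minExpiration time.Time
}

// effectiveExpiration never regresses below minExpiration, no matter how far
// behind the leaseholder's liveness record has fallen, so no synchronous
// liveness heartbeat would be needed during promotion.
func (l epochLease) effectiveExpiration(livenessExp time.Time) time.Time {
	if livenessExp.Before(l.minExpiration) {
		return l.minExpiration
	}
	return livenessExp
}

func main() {
	now := time.Now()
	l := epochLease{minExpiration: now.Add(6 * time.Second)}
	staleLiveness := now.Add(1 * time.Second) // liveness record lagging behind
	fmt.Println(l.effectiveExpiration(staleLiveness).Equal(l.minExpiration)) // true
}
```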

Release note (bug fix): Fixed a rare bug where a lease transfer could lead to a `side-transport update saw closed timestamp regression` panic. The bug could occur when a node was overloaded and failing to heartbeat its node liveness record.

Co-authored-by: Nathan VanBenschoten <[email protected]>
craig[bot] and nvanbenschoten committed May 3, 2024
2 parents 065119e + 58a0a17 commit b1d7489
Showing 6 changed files with 159 additions and 25 deletions.
10 changes: 10 additions & 0 deletions pkg/kv/kvserver/batcheval/cmd_lease_request.go
@@ -12,6 +12,7 @@ package batcheval

import (
"context"
"fmt"
"time"

"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
@@ -67,6 +68,15 @@ func RequestLease(
Requested: args.Lease,
}

// However, we verify that the current lease's sequence number and proposed
// timestamp match the provided PrevLease. This ensures that the validation
// here is consistent with the validation that was performed when the lease
// request was constructed.
if prevLease.Sequence != args.PrevLease.Sequence || !prevLease.ProposedTS.Equal(args.PrevLease.ProposedTS) {
rErr.Message = fmt.Sprintf("expected previous lease %s, found %s", args.PrevLease, prevLease)
return newFailedLeaseTrigger(false /* isTransfer */), rErr
}

// MIGRATION(tschottdorf): needed to apply Raft commands which got proposed
// before the StartStasis field was introduced.
newLease := args.Lease
82 changes: 82 additions & 0 deletions pkg/kv/kvserver/client_lease_test.go
@@ -29,6 +29,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness/livenesspb"
"github.com/cockroachdb/cockroach/pkg/raft"
@@ -1663,3 +1664,84 @@ func TestLeaseRequestBumpsEpoch(t *testing.T) {
require.Greater(t, liveness.Epoch, prevLease.Epoch)
})
}

// TestLeaseRequestFromExpirationToEpochDoesNotRegressExpiration tests that a
// promotion from an expiration-based lease to an epoch-based lease does not
// permit the expiration time of the lease to regress. This is enforced by
// detecting cases where the leaseholder's liveness record's expiration trails
// its expiration-based lease's expiration and synchronously heartbeating the
// leaseholder's liveness record before promoting the lease.
func TestLeaseRequestFromExpirationToEpochDoesNotRegressExpiration(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

ctx := context.Background()
st := cluster.MakeTestingClusterSettings()
kvserver.ExpirationLeasesOnly.Override(ctx, &st.SV, true) // override metamorphism

tc := testcluster.StartTestCluster(t, 1, base.TestClusterArgs{
ReplicationMode: base.ReplicationManual,
ServerArgs: base.TestServerArgs{
Settings: st,
},
})
defer tc.Stopper().Stop(ctx)

// Create scratch range.
key := tc.ScratchRange(t)
desc := tc.LookupRangeOrFatal(t, key)

// Pause n1's node liveness heartbeats, to allow its liveness expiration to
// fall behind.
l0 := tc.Server(0).NodeLiveness().(*liveness.NodeLiveness)
l0.PauseHeartbeatLoopForTest()
l, ok := l0.GetLiveness(tc.Server(0).NodeID())
require.True(t, ok)

// Make sure n1 has an expiration-based lease.
s0 := tc.GetFirstStoreFromServer(t, 0)
repl := s0.LookupReplica(desc.StartKey)
require.NotNil(t, repl)
expLease := repl.CurrentLeaseStatus(ctx)
require.True(t, expLease.IsValid())
require.Equal(t, roachpb.LeaseExpiration, expLease.Lease.Type())

// Wait for the expiration-based lease to have a later expiration than the
// expiration timestamp in n1's liveness record.
testutils.SucceedsSoon(t, func() error {
expLease = repl.CurrentLeaseStatus(ctx)
if expLease.Expiration().Less(l.Expiration.ToTimestamp()) {
return errors.Errorf("lease %v not extended beyond liveness %v", expLease, l)
}
return nil
})

// Enable epoch-based leases. This will cause automatic lease renewal to try
// to promote the expiration-based lease to an epoch-based lease.
//
// Since we have disabled the background node liveness heartbeat loop, it is
// critical that this lease promotion synchronously heartbeats node liveness
// before acquiring the epoch-based lease.
kvserver.ExpirationLeasesOnly.Override(ctx, &s0.ClusterSettings().SV, false)

// Wait for that lease promotion to occur.
var epochLease kvserverpb.LeaseStatus
testutils.SucceedsSoon(t, func() error {
epochLease = repl.CurrentLeaseStatus(ctx)
if epochLease.Lease.Type() != roachpb.LeaseEpoch {
return errors.Errorf("lease %v not upgraded to epoch-based", epochLease)
}
return nil
})

// Once the lease has been promoted to an epoch-based lease, the effective
// expiration (maintained indirectly in the liveness record) must be greater
// than that in the preceding expiration-based lease. If this were to regress,
// a non-cooperative lease failover to a third lease held by a different node
// could overlap in MVCC time with the first lease (i.e. its start time could
// precede expLease.Expiration), violating the lease disjointness property.
//
// If we disable the `expToEpochPromo` branch in replica_range_lease.go, this
// assertion fails.
require.True(t, expLease.Expiration().Less(epochLease.Expiration()))
}
8 changes: 6 additions & 2 deletions pkg/kv/kvserver/client_split_test.go
@@ -3236,6 +3236,12 @@ func TestStoreCapacityAfterSplit(t *testing.T) {
ReplicationMode: base.ReplicationManual,
ServerArgs: base.TestServerArgs{
Settings: st,
RaftConfig: base.RaftConfig{
// We plan to increment the manual clock by MinStatsDuration a few
// times below and would like for leases to not expire. Configure a
// longer lease duration to achieve this.
RangeLeaseDuration: 10 * replicastats.MinStatsDuration,
},
Knobs: base.TestingKnobs{
Server: &server.TestingKnobs{
WallClock: manualClock,
@@ -3253,8 +3259,6 @@
key := tc.ScratchRange(t)
desc := tc.AddVotersOrFatal(t, key, tc.Target(1))
tc.TransferRangeLeaseOrFatal(t, desc, tc.Target(1))

tc.IncrClockForLeaseUpgrade(t, manualClock)
tc.WaitForLeaseUpgrade(ctx, t, desc)

cap, err := s.Capacity(ctx, false /* useCached */)
68 changes: 58 additions & 10 deletions pkg/kv/kvserver/replica_range_lease.go
@@ -265,7 +265,6 @@ func (p *pendingLeaseRequest) InitOrJoinRequest(
nextLeaseHolder roachpb.ReplicaDescriptor,
status kvserverpb.LeaseStatus,
startKey roachpb.Key,
transfer bool,
bypassSafetyChecks bool,
limiter *quotapool.IntPool,
) *leaseRequestHandle {
@@ -284,10 +283,20 @@
nextLeaseHolder.ReplicaID, nextLease.Replica.ReplicaID))
}

acquisition := !status.Lease.OwnedBy(p.repl.store.StoreID())
extension := !transfer && !acquisition
// Who owns the previous and next lease?
prevLocal := status.Lease.OwnedBy(p.repl.store.StoreID())
nextLocal := nextLeaseHolder.StoreID == p.repl.store.StoreID()

// Assert that the lease acquisition, extension, or transfer is valid.
acquisition := !prevLocal && nextLocal
extension := prevLocal && nextLocal
transfer := prevLocal && !nextLocal
remote := !prevLocal && !nextLocal
_ = extension // not used, just documentation

if remote {
log.Fatalf(ctx, "cannot acquire/extend lease for remote replica: %v -> %v", status, nextLeaseHolder)
}
if acquisition {
// If this is a non-cooperative lease change (i.e. an acquisition), it
// is up to us to ensure that Lease.Start is greater than the end time
@@ -326,6 +335,7 @@ func (p *pendingLeaseRequest) InitOrJoinRequest(
ProposedTS: &status.Now,
}

var reqLeaseLiveness livenesspb.Liveness
if p.repl.shouldUseExpirationLeaseRLocked() ||
(transfer &&
TransferExpirationLeasesFirstEnabled.Get(&p.repl.store.ClusterSettings().SV)) {
@@ -356,6 +366,7 @@
return llHandle
}
reqLease.Epoch = l.Epoch
reqLeaseLiveness = l.Liveness
}

var leaseReq kvpb.Request
@@ -386,7 +397,7 @@
}
}

err := p.requestLeaseAsync(ctx, nextLeaseHolder, status, leaseReq, limiter)
err := p.requestLeaseAsync(ctx, status, reqLease, reqLeaseLiveness, leaseReq, limiter)
if err != nil {
if errors.Is(err, stop.ErrThrottled) {
llHandle.resolve(kvpb.NewError(err))
@@ -417,10 +428,14 @@
//
// The status argument is used as the expected value for liveness operations.
// leaseReq must be consistent with the LeaseStatus.
//
// The reqLeaseLiveness argument is provided when reqLease is an epoch-based
// lease.
func (p *pendingLeaseRequest) requestLeaseAsync(
parentCtx context.Context,
nextLeaseHolder roachpb.ReplicaDescriptor,
status kvserverpb.LeaseStatus,
reqLease roachpb.Lease,
reqLeaseLiveness livenesspb.Liveness,
leaseReq kvpb.Request,
limiter *quotapool.IntPool,
) error {
@@ -456,7 +471,7 @@
// RPC, but here we submit the request directly to the local replica.
growstack.Grow()

err := p.requestLease(ctx, nextLeaseHolder, status, leaseReq)
err := p.requestLease(ctx, status, reqLease, reqLeaseLiveness, leaseReq)
// Error will be handled below.

// We reset our state below regardless of whether we've gotten an error or
@@ -496,24 +511,57 @@ var logFailedHeartbeatOwnLiveness = log.Every(10 * time.Second)
// requestLease sends a synchronous transfer lease or lease request to the
// specified replica. It is only meant to be called from requestLeaseAsync,
// since it does not coordinate with other in-flight lease requests.
//
// The reqLeaseLiveness argument is provided when reqLease is an epoch-based
// lease.
func (p *pendingLeaseRequest) requestLease(
ctx context.Context,
nextLeaseHolder roachpb.ReplicaDescriptor,
status kvserverpb.LeaseStatus,
reqLease roachpb.Lease,
reqLeaseLiveness livenesspb.Liveness,
leaseReq kvpb.Request,
) error {
started := timeutil.Now()
defer func() {
p.repl.store.metrics.LeaseRequestLatency.RecordValue(timeutil.Since(started).Nanoseconds())
}()

nextLeaseHolder := reqLease.Replica
extension := status.OwnedBy(nextLeaseHolder.StoreID)

// If we are promoting an expiration-based lease to an epoch-based lease, we
// must make sure the expiration does not regress. We do this here because the
// expiration is stored directly in the lease for expiration-based leases but
// indirectly in the liveness record for epoch-based leases. To ensure this, we
// manually heartbeat our liveness record if necessary. This is expected to
// work because the liveness record interval and the expiration-based lease
// interval are the same.
expToEpochPromo := extension && status.Lease.Type() == roachpb.LeaseExpiration && reqLease.Type() == roachpb.LeaseEpoch
if expToEpochPromo && reqLeaseLiveness.Expiration.ToTimestamp().Less(status.Lease.GetExpiration()) {
err := p.repl.store.cfg.NodeLiveness.Heartbeat(ctx, reqLeaseLiveness)
if err != nil {
if logFailedHeartbeatOwnLiveness.ShouldLog() {
log.Errorf(ctx, "failed to heartbeat own liveness record: %s", err)
}
return kvpb.NewNotLeaseHolderError(roachpb.Lease{}, p.repl.store.StoreID(), p.repl.Desc(),
fmt.Sprintf("failed to manipulate liveness record: %s", err))
}
// Assert that the liveness record expiration is now greater than the
// expiration of the lease we're promoting.
l, ok := p.repl.store.cfg.NodeLiveness.GetLiveness(reqLeaseLiveness.NodeID)
if !ok || l.Expiration.ToTimestamp().Less(status.Lease.GetExpiration()) {
return errors.AssertionFailedf("expiration of liveness record %s is not greater than "+
"expiration of the previous lease %s after liveness heartbeat", l, status.Lease)
}
}

// If we're replacing an expired epoch-based lease, we must increment the
// epoch of the prior owner to invalidate its leases. If we were the owner,
// then we instead heartbeat to become live.
if status.Lease.Type() == roachpb.LeaseEpoch && status.State == kvserverpb.LeaseState_EXPIRED {
var err error
// If this replica is previous & next lease holder, manually heartbeat to become live.
if status.OwnedBy(nextLeaseHolder.StoreID) && p.repl.store.StoreID() == nextLeaseHolder.StoreID {
if extension {
if err = p.repl.store.cfg.NodeLiveness.Heartbeat(ctx, status.Liveness); err != nil && logFailedHeartbeatOwnLiveness.ShouldLog() {
log.Errorf(ctx, "failed to heartbeat own liveness record: %s", err)
}
@@ -921,7 +969,7 @@ func (r *Replica) requestLeaseLocked(
}
return r.mu.pendingLeaseRequest.InitOrJoinRequest(
ctx, repDesc, status, r.mu.state.Desc.StartKey.AsRawKey(),
false /* transfer */, false /* bypassSafetyChecks */, limiter)
false /* bypassSafetyChecks */, limiter)
}

// AdminTransferLease transfers the LeaderLease to another replica. Only the
@@ -1015,7 +1063,7 @@ func (r *Replica) AdminTransferLease(
}

transfer = r.mu.pendingLeaseRequest.InitOrJoinRequest(ctx, nextLeaseHolder, status,
desc.StartKey.AsRawKey(), true /* transfer */, bypassSafetyChecks, nil /* limiter */)
desc.StartKey.AsRawKey(), bypassSafetyChecks, nil /* limiter */)
return nil, transfer, nil
}

5 changes: 3 additions & 2 deletions pkg/kv/kvserver/replica_test.go
@@ -966,7 +966,8 @@ func TestReplicaLease(t *testing.T) {
kvpb.AdmissionHeader{},
),
Args: &kvpb.RequestLeaseRequest{
Lease: lease,
Lease: lease,
PrevLease: tc.repl.CurrentLeaseStatus(ctx).Lease,
},
}, &kvpb.RequestLeaseResponse{}); !testutils.IsError(err, "replica not found") {
t.Fatalf("unexpected error: %+v", err)
@@ -1317,7 +1318,7 @@ func TestReplicaLeaseRejectUnknownRaftNodeID(t *testing.T) {
st := tc.repl.CurrentLeaseStatus(ctx)
ba := &kvpb.BatchRequest{}
ba.Timestamp = tc.repl.store.Clock().Now()
ba.Add(&kvpb.RequestLeaseRequest{Lease: *lease})
ba.Add(&kvpb.RequestLeaseRequest{Lease: *lease, PrevLease: st.Lease})
_, tok := tc.repl.mu.proposalBuf.TrackEvaluatingRequest(ctx, hlc.MinTimestamp)
ch, _, _, _, pErr := tc.repl.evalAndPropose(ctx, ba, allSpansGuard(), &st, uncertainty.Interval{}, tok.Move(ctx))
if pErr == nil {
11 changes: 0 additions & 11 deletions pkg/testutils/testcluster/testcluster.go
@@ -1112,17 +1112,6 @@ func (tc *TestCluster) TransferRangeLeaseOrFatal(
}
}

// IncrClockForLeaseUpgrade run up the clock to force a lease renewal (and thus
// the change in lease types).
func (tc *TestCluster) IncrClockForLeaseUpgrade(
t serverutils.TestFataler, clock *hlc.HybridManualClock,
) {
clock.Increment(
tc.GetFirstStoreFromServer(t, 0).GetStoreConfig().RangeLeaseRenewalDuration().Nanoseconds() +
time.Second.Nanoseconds(),
)
}

// MaybeWaitForLeaseUpgrade waits until the lease held for the given range
// descriptor is upgraded to an epoch-based one, but only if we expect the lease
// to be upgraded.
Expand Down
