From 31f9993f1411c3ed48020f75a5032039721c2c26 Mon Sep 17 00:00:00 2001
From: Nathan VanBenschoten
Date: Fri, 26 Feb 2021 23:48:55 -0500
Subject: [PATCH] kv: sync lease transfers and range merges with closedts side-transport

Needed for #61137.

This commit updates the manner in which lease transfers (through
`TransferLeaseRequest`) and range merges (through `SubsumeRequest`) handle
the "transfer of power" from the outgoing leaseholder to the incoming
leaseholder. Specifically, it updates the handling of these requests in
order to rationalize the interaction between their evaluation and the
closing of timestamps through the closed timestamp side-transport. It is
now clearer when and how these requests instruct the outgoing leaseholder
to relinquish its ability to advance the closed timestamp and, as a
result, it is now possible for the requests to query and operate on the
maximum closed timestamp published by the outgoing leaseholder.

For lease transfers, this commit begins by addressing an existing TODO to
push the revocation of the outgoing lease out of `AdminTransferLease` and
into the evaluation of `TransferLeaseRequest`, through a new `RevokeLease`
method on the `EvalContext`. Once a lease is revoked, the side-transport
will no longer be able to advance the closed timestamp under it. This was
made possible by #59086 and was suggested by @tbg during the code review.
We generally like to keep replica state changes out of "admin" requests
themselves, which are intended to coordinate changes through individual
non-admin requests. Admin requests generally don't even need to evaluate
on a leaseholder (though they try to), so having them perform any state
changes is fragile. A toy model of this revoke-then-observe ordering is
sketched at the end of this message.

For range merges, this commit removes the `MaybeWatchForMerge` flag from
the `LocalResult` returned by `SubsumeRequest` and replaces it with a
`WatchForMerge` method on the `EvalContext`. This allows the
`SubsumeRequest` to launch the range merge watcher goroutine during its
evaluation, which the side-transport checks for to see whether a
leaseholder can advance its closed timestamp. In doing so, the
`SubsumeRequest` is able to pause closed timestamps when it wants to and
is also able to observe and return the maximum closed timestamp _after_
the closed timestamp has stopped advancing (see the second sketch below).
This is a stark improvement over the approach taken with the original
closed timestamp system, which required a herculean effort in #50265 to
make correct.

With these refactors complete, the closed timestamp side-transport should
have a much easier and safer time checking whether a given leaseholder is
able to advance its closed timestamp.

Release justification: Necessary for the safety of new functionality.
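For concreteness, here is a minimal, runnable Go sketch of the
revoke-then-observe ordering that lease transfers now follow. This is an
illustration only, not code from this patch: `replica`, `closeTimestamp`,
and `maxClosed` are hypothetical stand-ins for the real `Replica`, the
side-transport's publishing path, and `GetCurrentReadSummary`.

    package main

    import (
    	"fmt"
    	"sync"
    )

    type LeaseSequence int64

    // replica is a toy stand-in for a leaseholder replica: it tracks the
    // current lease sequence, whether that lease has been revoked, and the
    // highest closed timestamp published under the lease.
    type replica struct {
    	mu       sync.Mutex
    	leaseSeq LeaseSequence
    	revoked  bool
    	closedTS int64
    }

    // RevokeLease mirrors the new EvalContext method: once the current lease
    // is revoked, no new timestamps may be closed under it.
    func (r *replica) RevokeLease(seq LeaseSequence) {
    	r.mu.Lock()
    	defer r.mu.Unlock()
    	if r.leaseSeq == seq {
    		r.revoked = true
    	}
    }

    // closeTimestamp models the side-transport attempting to advance the
    // closed timestamp; it fails once the lease is revoked (PROSCRIBED).
    func (r *replica) closeTimestamp(ts int64) bool {
    	r.mu.Lock()
    	defer r.mu.Unlock()
    	if r.revoked {
    		return false
    	}
    	if ts > r.closedTS {
    		r.closedTS = ts
    	}
    	return true
    }

    // maxClosed stands in for GetCurrentReadSummary: because TransferLease
    // revokes the lease before reading, the value it observes is final.
    func (r *replica) maxClosed() int64 {
    	r.mu.Lock()
    	defer r.mu.Unlock()
    	return r.closedTS
    }

    func main() {
    	r := &replica{leaseSeq: 1}
    	r.closeTimestamp(10)              // side-transport closes ts=10 under lease 1
    	r.RevokeLease(1)                  // TransferLease evaluation revokes first...
    	fmt.Println(r.maxClosed())        // ...so 10 is the final closed timestamp
    	fmt.Println(r.closeTimestamp(11)) // false: nothing more closes under lease 1
    }

The point of the ordering is that revocation happens during evaluation, so
it takes place regardless of whether the subsequent Raft proposal succeeds,
fails, or is ambiguous.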
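The `SubsumeRequest` ordering is analogous: freeze first, then read. Again
a self-contained sketch under the same caveat: `rhsReplica` and `subsume`
are hypothetical stand-ins for the real `Replica` and `batcheval.Subsume`,
and the real `WatchForMerge` also spawns the goroutine that watches the
merge transaction.

    package main

    import (
    	"errors"
    	"fmt"
    	"sync"
    )

    // rhsReplica is a toy model of the right-hand side of a merge. Only the
    // ordering inside subsume matters: freeze the range first, then read the
    // (now final) closed timestamp to hand to the left-hand side.
    type rhsReplica struct {
    	mu       sync.Mutex
    	merging  bool  // a deletion intent on the range descriptor was found
    	frozen   bool  // requests are blocked; the closed timestamp is pinned
    	closedTS int64
    }

    // WatchForMerge mirrors the new EvalContext method: it requires a merge
    // to be in progress and freezes the range.
    func (r *rhsReplica) WatchForMerge() error {
    	r.mu.Lock()
    	defer r.mu.Unlock()
    	if !r.merging {
    		return errors.New("range merge unexpectedly not in-progress")
    	}
    	r.frozen = true
    	return nil
    }

    // closeTimestamp models the side-transport; it refuses to advance the
    // closed timestamp once the range is frozen.
    func (r *rhsReplica) closeTimestamp(ts int64) bool {
    	r.mu.Lock()
    	defer r.mu.Unlock()
    	if r.frozen {
    		return false
    	}
    	r.closedTS = ts
    	return true
    }

    // subsume models the tail of Subsume's evaluation: freeze, then return
    // the final closed timestamp in the response.
    func (r *rhsReplica) subsume() (int64, error) {
    	if err := r.WatchForMerge(); err != nil {
    		return 0, err
    	}
    	r.mu.Lock()
    	defer r.mu.Unlock()
    	return r.closedTS, nil
    }

    func main() {
    	r := &rhsReplica{merging: true}
    	r.closeTimestamp(20)
    	closed, err := r.subsume()
    	fmt.Println(closed, err)          // 20 <nil>: the final closed timestamp
    	fmt.Println(r.closeTimestamp(21)) // false: frozen, no further advancement
    }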
---
 pkg/kv/kvnemesis/validator.go                 |  4 +-
 pkg/kv/kvserver/batcheval/cmd_lease_test.go   | 27 ++++++++----
 .../kvserver/batcheval/cmd_lease_transfer.go  | 29 +++++++++---
 pkg/kv/kvserver/batcheval/cmd_subsume.go      | 14 ++++--
 pkg/kv/kvserver/batcheval/eval_context.go     | 16 +++++++
 pkg/kv/kvserver/batcheval/result/BUILD.bazel  |  1 -
 pkg/kv/kvserver/batcheval/result/result.go    |  8 +---
 pkg/kv/kvserver/replica.go                    | 44 ++++++++++++++-----
 pkg/kv/kvserver/replica_eval_context_span.go  | 11 +++++
 pkg/kv/kvserver/replica_learner_test.go       |  8 ++--
 pkg/kv/kvserver/replica_proposal.go           |  5 +--
 pkg/kv/kvserver/replica_range_lease.go        | 33 +++++---------
 pkg/kv/kvserver/replica_read.go               | 10 -----
 pkg/kv/kvserver/replica_test.go               |  2 +-
 14 files changed, 134 insertions(+), 78 deletions(-)

diff --git a/pkg/kv/kvnemesis/validator.go b/pkg/kv/kvnemesis/validator.go
index 108d1f90ec2a..5ad450fd5e7c 100644
--- a/pkg/kv/kvnemesis/validator.go
+++ b/pkg/kv/kvnemesis/validator.go
@@ -339,12 +339,10 @@ func (v *validator) processOp(txnID *string, op Operation) {
 			v.failIfError(op, t.Result)
 		}
 	case *TransferLeaseOperation:
-		if resultIsErrorStr(t.Result, `cannot transfer lease to replica of type (VOTER_INCOMING|VOTER_OUTGOING|VOTER_DEMOTING|LEARNER|NON_VOTER)`) {
+		if resultIsErrorStr(t.Result, `replica cannot hold lease`) {
 			// Only VOTER_FULL replicas can currently hold a range lease.
 			// Attempts to transfer the lease to any other replica type are
 			// rejected.
-		} else if resultIsErrorStr(t.Result, `replica cannot hold lease`) {
-			// Same case as above, but with a slightly different message.
 		} else if resultIsErrorStr(t.Result, `unable to find store \d+ in range`) {
 			// A lease transfer that races with a replica removal may find that
 			// the store it was targeting is no longer part of the range.
diff --git a/pkg/kv/kvserver/batcheval/cmd_lease_test.go b/pkg/kv/kvserver/batcheval/cmd_lease_test.go
index 1e4bfdc7eaaf..d5cd5efa023e 100644
--- a/pkg/kv/kvserver/batcheval/cmd_lease_test.go
+++ b/pkg/kv/kvserver/batcheval/cmd_lease_test.go
@@ -190,6 +190,10 @@ func TestLeaseTransferForwardsStartTime(t *testing.T) {
 			manual := hlc.NewManualClock(123)
 			clock := hlc.NewClock(manual.UnixNano, time.Nanosecond)
 
+			prevLease := roachpb.Lease{
+				Replica:  replicas[0],
+				Sequence: 1,
+			}
 			nextLease := roachpb.Lease{
 				Replica: replicas[1],
 				Start:   clock.NowAsClockTimestamp(),
@@ -209,16 +213,19 @@ func TestLeaseTransferForwardsStartTime(t *testing.T) {
 			}
 			currentReadSummary := rspb.FromTimestamp(maxPriorReadTS)
 
+			evalCtx := &MockEvalCtx{
+				ClusterSettings:    cluster.MakeTestingClusterSettings(),
+				StoreID:            1,
+				Desc:               &desc,
+				Clock:              clock,
+				Lease:              prevLease,
+				CurrentReadSummary: currentReadSummary,
+			}
 			cArgs := CommandArgs{
-				EvalCtx: (&MockEvalCtx{
-					ClusterSettings:    cluster.MakeTestingClusterSettings(),
-					StoreID:            1,
-					Desc:               &desc,
-					Clock:              clock,
-					CurrentReadSummary: currentReadSummary,
-				}).EvalContext(),
+				EvalCtx: evalCtx.EvalContext(),
 				Args: &roachpb.TransferLeaseRequest{
-					Lease: nextLease,
+					Lease:     nextLease,
+					PrevLease: prevLease,
 				},
 			}
 
@@ -233,6 +240,10 @@ func TestLeaseTransferForwardsStartTime(t *testing.T) {
 			require.NotNil(t, propLease)
 			require.True(t, nextLease.Start.Less(propLease.Start))
 			require.True(t, beforeEval.Less(propLease.Start))
+			require.Equal(t, prevLease.Sequence+1, propLease.Sequence)
+
+			// The previous lease should have been revoked.
+			require.Equal(t, prevLease.Sequence, evalCtx.RevokedLeaseSeq)
 
 			// The prior read summary should reflect the maximum read times
 			// served under the current leaseholder.
diff --git a/pkg/kv/kvserver/batcheval/cmd_lease_transfer.go b/pkg/kv/kvserver/batcheval/cmd_lease_transfer.go
index d744aa79d61d..fce187c5ec54 100644
--- a/pkg/kv/kvserver/batcheval/cmd_lease_transfer.go
+++ b/pkg/kv/kvserver/batcheval/cmd_lease_transfer.go
@@ -77,6 +77,29 @@ func TransferLease(
 	newLease.Start.Forward(cArgs.EvalCtx.Clock().NowAsClockTimestamp())
 	args.Lease = roachpb.Lease{} // prevent accidental use below
 
+	// If this check is removed at some point, the filtering of learners on the
+	// sending side would have to be removed as well.
+	if err := roachpb.CheckCanReceiveLease(newLease.Replica, cArgs.EvalCtx.Desc()); err != nil {
+		return newFailedLeaseTrigger(true /* isTransfer */), err
+	}
+
+	// Stop using the current lease. All future calls to leaseStatus on this
+	// node with the current lease will now return a PROSCRIBED status. This
+	// includes calls to leaseStatus from the closed timestamp side-transport,
+	// meaning that the following call to GetCurrentReadSummary is guaranteed to
+	// observe the highest closed timestamp published under this lease.
+	//
+	// We perform this action during evaluation to ensure that the lease
+	// revocation takes place regardless of whether the corresponding Raft
+	// proposal succeeds, fails, or is ambiguous - in which case there's no
+	// guarantee that the transfer will not still apply.
+	//
+	// NOTE: RevokeLease will be a no-op if the lease has already changed. In
+	// such cases, we could detect that here and fail fast, but it's safe and
+	// easier to just let the TransferLease be proposed under the wrong lease
+	// and be rejected with the correct error below Raft.
+	cArgs.EvalCtx.RevokeLease(ctx, args.PrevLease.Sequence)
+
 	// Collect a read summary from the outgoing leaseholder to ship to the
 	// incoming leaseholder. This is used to instruct the new leaseholder on how
 	// to update its timestamp cache to ensure that no future writes are allowed
@@ -94,12 +117,6 @@ func TransferLease(
 	// think about.
 	priorReadSum.Merge(rspb.FromTimestamp(newLease.Start.ToTimestamp()))
 
-	// If this check is removed at some point, the filtering of learners on the
-	// sending side would have to be removed as well.
-	if err := roachpb.CheckCanReceiveLease(newLease.Replica, cArgs.EvalCtx.Desc()); err != nil {
-		return newFailedLeaseTrigger(true /* isTransfer */), err
-	}
-
 	log.VEventf(ctx, 2, "lease transfer: prev lease: %+v, new lease: %+v", prevLease, newLease)
 	return evalNewLease(ctx, cArgs.EvalCtx, readWriter, cArgs.Stats, newLease,
 		prevLease, &priorReadSum, false /* isExtension */, true /* isTransfer */)
diff --git a/pkg/kv/kvserver/batcheval/cmd_subsume.go b/pkg/kv/kvserver/batcheval/cmd_subsume.go
index c278afe57eb2..18cb3c5a5dd5 100644
--- a/pkg/kv/kvserver/batcheval/cmd_subsume.go
+++ b/pkg/kv/kvserver/batcheval/cmd_subsume.go
@@ -115,6 +115,16 @@ func Subsume(
 	// but the check is too expensive as it would involve a network roundtrip on
 	// most nodes.
 
+	// Freeze the range. Do so by blocking all requests while a newly launched
+	// async goroutine watches (pushes with low priority) the merge transaction.
+	// This will also block the closed timestamp side-transport from closing new
+	// timestamps, meaning that the following call to GetCurrentReadSummary is
+	// guaranteed to observe the highest closed timestamp ever published by this
+	// range (if the merge eventually completes).
+	if err := cArgs.EvalCtx.WatchForMerge(ctx); err != nil {
+		return result.Result{}, errors.Wrap(err, "watching for merge during subsume")
+	}
+
 	// We prevent followers of the RHS from being able to serve follower reads on
 	// timestamps that fall in the timestamp window representing the range's
 	// subsumed state (i.e. between the subsumption time (FreezeStart) and the
@@ -189,7 +199,5 @@ func Subsume(
 	// being merged.
 	reply.ClosedTimestamp = closedTS
 
-	return result.Result{
-		Local: result.LocalResult{MaybeWatchForMerge: true},
-	}, nil
+	return result.Result{}, nil
 }
diff --git a/pkg/kv/kvserver/batcheval/eval_context.go b/pkg/kv/kvserver/batcheval/eval_context.go
index 2f1b5bb6f978..66af76e7bf11 100644
--- a/pkg/kv/kvserver/batcheval/eval_context.go
+++ b/pkg/kv/kvserver/batcheval/eval_context.go
@@ -114,6 +114,15 @@ type EvalContext interface {
 	GetExternalStorage(ctx context.Context, dest roachpb.ExternalStorage) (cloud.ExternalStorage, error)
 	GetExternalStorageFromURI(ctx context.Context, uri string, user security.SQLUsername) (cloud.ExternalStorage, error)
+
+	// RevokeLease stops the replica from using its current lease, if that lease
+	// matches the provided lease sequence. All future calls to leaseStatus on
+	// this node with the current lease will now return a PROSCRIBED status.
+	RevokeLease(context.Context, roachpb.LeaseSequence)
+
+	// WatchForMerge arranges to block all requests until the in-progress merge
+	// completes. Returns an error if no in-progress merge is detected.
+	WatchForMerge(ctx context.Context) error
 }
 
 // MockEvalCtx is a dummy implementation of EvalContext for testing purposes.
@@ -131,6 +140,7 @@ type MockEvalCtx struct {
 	CanCreateTxn       func() (bool, hlc.Timestamp, roachpb.TransactionAbortedReason)
 	Lease              roachpb.Lease
 	CurrentReadSummary rspb.ReadSummary
+	RevokedLeaseSeq    roachpb.LeaseSequence
 }
 
 // EvalContext returns the MockEvalCtx as an EvalContext. It will reflect future
@@ -241,3 +251,9 @@ func (m *mockEvalCtxImpl) GetExternalStorageFromURI(
 ) (cloud.ExternalStorage, error) {
 	panic("unimplemented")
 }
+func (m *mockEvalCtxImpl) RevokeLease(_ context.Context, seq roachpb.LeaseSequence) {
+	m.RevokedLeaseSeq = seq
+}
+func (m *mockEvalCtxImpl) WatchForMerge(ctx context.Context) error {
+	panic("unimplemented")
+}
diff --git a/pkg/kv/kvserver/batcheval/result/BUILD.bazel b/pkg/kv/kvserver/batcheval/result/BUILD.bazel
index 83fd2b90b2eb..57423b77f80b 100644
--- a/pkg/kv/kvserver/batcheval/result/BUILD.bazel
+++ b/pkg/kv/kvserver/batcheval/result/BUILD.bazel
@@ -13,7 +13,6 @@ go_library(
         "//pkg/kv/kvserver/concurrency/lock",
         "//pkg/kv/kvserver/kvserverpb",
         "//pkg/roachpb",
-        "//pkg/util/hlc",
         "//pkg/util/log",
         "@com_github_cockroachdb_errors//:errors",
         "@com_github_kr_pretty//:pretty",
diff --git a/pkg/kv/kvserver/batcheval/result/result.go b/pkg/kv/kvserver/batcheval/result/result.go
index 448bee0f2630..3ac37d887c40 100644
--- a/pkg/kv/kvserver/batcheval/result/result.go
+++ b/pkg/kv/kvserver/batcheval/result/result.go
@@ -63,8 +63,6 @@ type LocalResult struct {
 	MaybeAddToSplitQueue bool
 	// Call MaybeGossipNodeLiveness with the specified Span, if set.
 	MaybeGossipNodeLiveness *roachpb.Span
-	// Call maybeWatchForMerge.
-	MaybeWatchForMerge bool
 
 	// Metrics contains counters which are to be passed to the
 	// metrics subsystem.
@@ -84,7 +82,6 @@ func (lResult *LocalResult) IsZero() bool {
 		!lResult.MaybeGossipSystemConfig &&
 		!lResult.MaybeGossipSystemConfigIfHaveFailure &&
 		lResult.MaybeGossipNodeLiveness == nil &&
-		!lResult.MaybeWatchForMerge &&
 		lResult.Metrics == nil
 }
 
@@ -97,13 +94,13 @@ func (lResult *LocalResult) String() string {
 		"#updated txns: %d #end txns: %d, "+
 		"GossipFirstRange:%t MaybeGossipSystemConfig:%t "+
 		"MaybeGossipSystemConfigIfHaveFailure:%t MaybeAddToSplitQueue:%t "+
-		"MaybeGossipNodeLiveness:%s MaybeWatchForMerge:%t",
+		"MaybeGossipNodeLiveness:%s ",
 		lResult.Reply,
 		len(lResult.EncounteredIntents), len(lResult.AcquiredLocks), len(lResult.ResolvedLocks),
 		len(lResult.UpdatedTxns), len(lResult.EndTxns),
 		lResult.GossipFirstRange, lResult.MaybeGossipSystemConfig,
 		lResult.MaybeGossipSystemConfigIfHaveFailure, lResult.MaybeAddToSplitQueue,
-		lResult.MaybeGossipNodeLiveness, lResult.MaybeWatchForMerge)
+		lResult.MaybeGossipNodeLiveness)
 }
 
 // DetachEncounteredIntents returns (and removes) those encountered
@@ -344,7 +341,6 @@ func (p *Result) MergeAndDestroy(q Result) error {
 	coalesceBool(&p.Local.MaybeGossipSystemConfig, &q.Local.MaybeGossipSystemConfig)
 	coalesceBool(&p.Local.MaybeGossipSystemConfigIfHaveFailure, &q.Local.MaybeGossipSystemConfigIfHaveFailure)
 	coalesceBool(&p.Local.MaybeAddToSplitQueue, &q.Local.MaybeAddToSplitQueue)
-	coalesceBool(&p.Local.MaybeWatchForMerge, &q.Local.MaybeWatchForMerge)
 
 	if p.Local.Metrics == nil {
 		p.Local.Metrics = q.Local.Metrics
diff --git a/pkg/kv/kvserver/replica.go b/pkg/kv/kvserver/replica.go
index 2afd08c63a60..8ce3e9689bae 100644
--- a/pkg/kv/kvserver/replica.go
+++ b/pkg/kv/kvserver/replica.go
@@ -1452,31 +1452,53 @@ func (r *Replica) isNewerThanSplitRLocked(split *roachpb.SplitTrigger) bool {
 		r.mu.replicaID > rightDesc.ReplicaID
 }
 
-// maybeWatchForMerge checks whether a merge of this replica into its left
+// WatchForMerge is like maybeWatchForMerge, except it expects a merge to
+// be in progress and returns an error if one is not.
+//
+// See docs/tech-notes/range-merges.md.
+func (r *Replica) WatchForMerge(ctx context.Context) error {
+	ok, err := r.maybeWatchForMerge(ctx)
+	if err != nil {
+		return err
+	} else if !ok {
+		return errors.AssertionFailedf("range merge unexpectedly not in-progress")
+	}
+	return nil
+}
+
+// maybeWatchForMerge checks whether a merge of this replica into its left
 // neighbor is in its critical phase and, if so, arranges to block all requests
-// until the merge completes.
-func (r *Replica) maybeWatchForMerge(ctx context.Context) error {
+// until the merge completes. Returns a boolean indicating whether a merge was
+// found to be in progress.
+//
+// See docs/tech-notes/range-merges.md.
+func (r *Replica) maybeWatchForMerge(ctx context.Context) (bool, error) {
 	r.mu.Lock()
 	defer r.mu.Unlock()
 	return r.maybeWatchForMergeLocked(ctx)
 }
 
-func (r *Replica) maybeWatchForMergeLocked(ctx context.Context) error {
+func (r *Replica) maybeWatchForMergeLocked(ctx context.Context) (bool, error) {
+	// Check for a deletion intent on the local range descriptor, which
+	// indicates that a merge is in progress and this range is currently in its
+	// critical phase of being subsumed by its left-hand side neighbor. Read
+	// inconsistently at the maximum timestamp to ensure that we see an intent
+	// if one exists, regardless of what timestamp it is written at.
 	desc := r.descRLocked()
 	descKey := keys.RangeDescriptorKey(desc.StartKey)
-	_, intent, err := storage.MVCCGet(ctx, r.Engine(), descKey, r.Clock().Now(),
+	_, intent, err := storage.MVCCGet(ctx, r.Engine(), descKey, hlc.MaxTimestamp,
 		storage.MVCCGetOptions{Inconsistent: true})
 	if err != nil {
-		return err
+		return false, err
 	} else if intent == nil {
-		return nil
+		return false, nil
 	}
 
 	val, _, err := storage.MVCCGetAsTxn(
 		ctx, r.Engine(), descKey, intent.Txn.WriteTimestamp, intent.Txn)
 	if err != nil {
-		return err
+		return false, err
 	} else if val != nil {
-		return nil
+		return false, nil
 	}
 
 	// At this point, we know we have a deletion intent on our range descriptor.
@@ -1489,7 +1511,7 @@
 		// Another request already noticed the merge, installed a mergeComplete
 		// channel, and launched a goroutine to watch for the merge's completion.
 		// Nothing more to do.
-		return nil
+		return true, nil
 	}
 	r.mu.mergeComplete = mergeCompleteCh
 	// The RHS of a merge is not permitted to quiesce while a mergeComplete
@@ -1620,7 +1642,7 @@
 		// requests will get dropped and retried on another node. Suppress the error.
 		err = nil
 	}
-	return err
+	return true, err
 }
 
 // maybeTransferRaftLeadershipToLeaseholderLocked attempts to transfer the
diff --git a/pkg/kv/kvserver/replica_eval_context_span.go b/pkg/kv/kvserver/replica_eval_context_span.go
index 7ea060d51333..93e7e97e8b9c 100644
--- a/pkg/kv/kvserver/replica_eval_context_span.go
+++ b/pkg/kv/kvserver/replica_eval_context_span.go
@@ -246,3 +246,14 @@ func (rec *SpanSetReplicaEvalContext) GetExternalStorageFromURI(
 ) (cloud.ExternalStorage, error) {
 	return rec.i.GetExternalStorageFromURI(ctx, uri, user)
 }
+
+// RevokeLease stops the replica from using its current lease.
+func (rec *SpanSetReplicaEvalContext) RevokeLease(ctx context.Context, seq roachpb.LeaseSequence) {
+	rec.i.RevokeLease(ctx, seq)
+}
+
+// WatchForMerge arranges to block all requests until the in-progress merge
+// completes.
+func (rec *SpanSetReplicaEvalContext) WatchForMerge(ctx context.Context) error {
+	return rec.i.WatchForMerge(ctx)
+}
diff --git a/pkg/kv/kvserver/replica_learner_test.go b/pkg/kv/kvserver/replica_learner_test.go
index 42c01b2f1438..19f399b314f8 100644
--- a/pkg/kv/kvserver/replica_learner_test.go
+++ b/pkg/kv/kvserver/replica_learner_test.go
@@ -735,8 +735,9 @@ func TestLearnerNoAcceptLease(t *testing.T) {
 	desc := tc.LookupRangeOrFatal(t, scratchStartKey)
 	err := tc.TransferRangeLease(desc, tc.Target(1))
-	if !testutils.IsError(err, `cannot transfer lease to replica of type LEARNER`) {
-		t.Fatalf(`expected "cannot transfer lease to replica of type LEARNER" error got: %+v`, err)
+	exp := `replica cannot hold lease`
+	if !testutils.IsError(err, exp) {
+		t.Fatalf(`expected %q error got: %+v`, exp, err)
 	}
 }
 
@@ -760,7 +761,7 @@ func TestJointConfigLease(t *testing.T) {
 	require.True(t, desc.Replicas().InAtomicReplicationChange(), desc)
 
 	err := tc.TransferRangeLease(desc, tc.Target(1))
-	exp := `cannot transfer lease to replica of type VOTER_INCOMING`
+	exp := `replica cannot hold lease`
 	require.True(t, testutils.IsError(err, exp), err)
 
 	// NB: we don't have to transition out of the previous joint config first
 	// because this is done automatically by ChangeReplicas before it does what
 	// it's asked to do.
 	desc = tc.RemoveVotersOrFatal(t, k, tc.Target(1))
 	err = tc.TransferRangeLease(desc, tc.Target(1))
-	exp = `cannot transfer lease to replica of type VOTER_DEMOTING_LEARNER`
 	require.True(t, testutils.IsError(err, exp), err)
 }
diff --git a/pkg/kv/kvserver/replica_proposal.go b/pkg/kv/kvserver/replica_proposal.go
index f072339edc39..5b25354c6104 100644
--- a/pkg/kv/kvserver/replica_proposal.go
+++ b/pkg/kv/kvserver/replica_proposal.go
@@ -396,7 +396,7 @@ func (r *Replica) leasePostApplyLocked(
 	// progress, as only the old leaseholder would have been explicitly notified
 	// of the merge. If there is a merge in progress, maybeWatchForMerge will
 	// arrange to block all traffic to this replica unless the merge aborts.
-	if err := r.maybeWatchForMergeLocked(ctx); err != nil {
+	if _, err := r.maybeWatchForMergeLocked(ctx); err != nil {
 		// We were unable to determine whether a merge was in progress. We cannot
 		// safely proceed.
 		log.Fatalf(ctx, "failed checking for in-progress merge while installing new lease %s: %s",
@@ -623,9 +623,6 @@ func (r *Replica) handleReadWriteLocalEvalResult(ctx context.Context, lResult re
 	if lResult.EndTxns != nil {
 		log.Fatalf(ctx, "LocalEvalResult.EndTxns should be nil: %+v", lResult.EndTxns)
 	}
-	if lResult.MaybeWatchForMerge {
-		log.Fatalf(ctx, "LocalEvalResult.MaybeWatchForMerge should be false")
-	}
 
 	if lResult.AcquiredLocks != nil {
 		for i := range lResult.AcquiredLocks {
diff --git a/pkg/kv/kvserver/replica_range_lease.go b/pkg/kv/kvserver/replica_range_lease.go
index 97cfffd64cbe..8557f23eed74 100644
--- a/pkg/kv/kvserver/replica_range_lease.go
+++ b/pkg/kv/kvserver/replica_range_lease.go
@@ -785,18 +785,6 @@ func (r *Replica) AdminTransferLease(ctx context.Context, target roachpb.StoreID
 		return nil, nil, errors.Errorf("unable to find store %d in range %+v", target, desc)
 	}
 
-	// For now, don't allow replicas of type LEARNER to be leaseholders, see
-	// comments in RequestLease and TransferLease for why.
-	//
-	// TODO(dan): We shouldn't need this, the checks in RequestLease and
-	// TransferLease are the canonical ones and should be sufficient. Sadly, the
-	// `r.mu.minLeaseProposedTS = status.Timestamp` line below will likely play
-	// badly with that. This would be an issue even without learners, but
-	// omitting this check would make it worse. Fixme.
-	if t := nextLeaseHolder.GetType(); t != roachpb.VOTER_FULL {
-		return nil, nil, errors.Errorf(`cannot transfer lease to replica of type %s`, t)
-	}
-
 	if nextLease, ok := r.mu.pendingLeaseRequest.RequestPending(); ok &&
 		nextLease.Replica != nextLeaseHolder {
 		repDesc, err := r.getReplicaDescriptorRLocked()
@@ -813,15 +801,7 @@ func (r *Replica) AdminTransferLease(ctx context.Context, target roachpb.StoreID
 			return nil, nil, newNotLeaseHolderError(nextLease, r.store.StoreID(), desc,
 				"another transfer to a different store is in progress")
 		}
-		// Stop using the current lease. All future calls to leaseStatus on this
-		// node with the current lease will now return a PROSCRIBED status.
-		//
-		// TODO(nvanbenschoten): since we aren't pulling the transfer time here
-		// anymore, we could also move this below latching as well, similar to
-		// how a Subsume request sets the FreezeStart time and pauses closed
-		// timestamps during evaluation and communicates this back up using the
-		// LocalResult.
-		r.mu.minLeaseProposedTS = now
+
 		transfer = r.mu.pendingLeaseRequest.InitOrJoinRequest(
 			ctx, nextLeaseHolder, status, desc.StartKey.AsRawKey(), true, /* transfer */
 		)
@@ -877,6 +857,17 @@ func (r *Replica) getLeaseRLocked() (roachpb.Lease, roachpb.Lease) {
 	return *r.mu.state.Lease, roachpb.Lease{}
 }
 
+// RevokeLease stops the replica from using its current lease, if that lease
+// matches the provided lease sequence. All future calls to leaseStatus on this
+// node with the current lease will now return a PROSCRIBED status.
+func (r *Replica) RevokeLease(ctx context.Context, seq roachpb.LeaseSequence) {
+	r.mu.Lock()
+	defer r.mu.Unlock()
+	if r.mu.state.Lease.Sequence == seq {
+		r.mu.minLeaseProposedTS = r.Clock().NowAsClockTimestamp()
+	}
+}
+
 // newNotLeaseHolderError returns a NotLeaseHolderError initialized with the
 // replica for the holder (if any) of the given lease.
 //
diff --git a/pkg/kv/kvserver/replica_read.go b/pkg/kv/kvserver/replica_read.go
index 190154442eae..5b77e3caad10 100644
--- a/pkg/kv/kvserver/replica_read.go
+++ b/pkg/kv/kvserver/replica_read.go
@@ -180,16 +180,6 @@ func (r *Replica) handleReadOnlyLocalEvalResult(
 		lResult.AcquiredLocks = nil
 	}
 
-	if lResult.MaybeWatchForMerge {
-		// A merge is (likely) about to be carried out, and this replica needs
-		// to block all traffic until the merge either commits or aborts. See
-		// docs/tech-notes/range-merges.md.
-		if err := r.maybeWatchForMerge(ctx); err != nil {
-			return roachpb.NewError(err)
-		}
-		lResult.MaybeWatchForMerge = false
-	}
-
 	if !lResult.IsZero() {
 		log.Fatalf(ctx, "unhandled field in LocalEvalResult: %s", pretty.Diff(lResult, result.LocalResult{}))
 	}
diff --git a/pkg/kv/kvserver/replica_test.go b/pkg/kv/kvserver/replica_test.go
index 8a4cb17102cc..6d7fc678e039 100644
--- a/pkg/kv/kvserver/replica_test.go
+++ b/pkg/kv/kvserver/replica_test.go
@@ -741,7 +741,7 @@ func TestBehaviorDuringLeaseTransfer(t *testing.T) {
 		return nil
 	}
 	transferSem := make(chan struct{})
-	tsc.TestingKnobs.EvalKnobs.TestingEvalFilter =
+	tsc.TestingKnobs.EvalKnobs.TestingPostEvalFilter =
 		func(filterArgs kvserverbase.FilterArgs) *roachpb.Error {
 			if _, ok := filterArgs.Req.(*roachpb.TransferLeaseRequest); ok {
 				// Notify the test that the transfer has been trapped.