diff --git a/pkg/kv/kvnemesis/validator.go b/pkg/kv/kvnemesis/validator.go
index 108d1f90ec2a..5ad450fd5e7c 100644
--- a/pkg/kv/kvnemesis/validator.go
+++ b/pkg/kv/kvnemesis/validator.go
@@ -339,12 +339,10 @@ func (v *validator) processOp(txnID *string, op Operation) {
 			v.failIfError(op, t.Result)
 		}
 	case *TransferLeaseOperation:
-		if resultIsErrorStr(t.Result, `cannot transfer lease to replica of type (VOTER_INCOMING|VOTER_OUTGOING|VOTER_DEMOTING|LEARNER|NON_VOTER)`) {
+		if resultIsErrorStr(t.Result, `replica cannot hold lease`) {
 			// Only VOTER_FULL replicas can currently hold a range lease.
 			// Attempts to transfer the lease to any other replica type are
 			// rejected.
-		} else if resultIsErrorStr(t.Result, `replica cannot hold lease`) {
-			// Same case as above, but with a slightly different message.
 		} else if resultIsErrorStr(t.Result, `unable to find store \d+ in range`) {
 			// A lease transfer that races with a replica removal may find that
 			// the store it was targeting is no longer part of the range.
diff --git a/pkg/kv/kvserver/batcheval/cmd_lease_test.go b/pkg/kv/kvserver/batcheval/cmd_lease_test.go
index 1e4bfdc7eaaf..d5cd5efa023e 100644
--- a/pkg/kv/kvserver/batcheval/cmd_lease_test.go
+++ b/pkg/kv/kvserver/batcheval/cmd_lease_test.go
@@ -190,6 +190,10 @@ func TestLeaseTransferForwardsStartTime(t *testing.T) {
 			manual := hlc.NewManualClock(123)
 			clock := hlc.NewClock(manual.UnixNano, time.Nanosecond)
 
+			prevLease := roachpb.Lease{
+				Replica:  replicas[0],
+				Sequence: 1,
+			}
 			nextLease := roachpb.Lease{
 				Replica: replicas[1],
 				Start:   clock.NowAsClockTimestamp(),
@@ -209,16 +213,19 @@ func TestLeaseTransferForwardsStartTime(t *testing.T) {
 			}
 			currentReadSummary := rspb.FromTimestamp(maxPriorReadTS)
 
+			evalCtx := &MockEvalCtx{
+				ClusterSettings:    cluster.MakeTestingClusterSettings(),
+				StoreID:            1,
+				Desc:               &desc,
+				Clock:              clock,
+				Lease:              prevLease,
+				CurrentReadSummary: currentReadSummary,
+			}
 			cArgs := CommandArgs{
-				EvalCtx: (&MockEvalCtx{
-					ClusterSettings:    cluster.MakeTestingClusterSettings(),
-					StoreID:            1,
-					Desc:               &desc,
-					Clock:              clock,
-					CurrentReadSummary: currentReadSummary,
-				}).EvalContext(),
+				EvalCtx: evalCtx.EvalContext(),
 				Args: &roachpb.TransferLeaseRequest{
-					Lease: nextLease,
+					Lease:     nextLease,
+					PrevLease: prevLease,
 				},
 			}
 
@@ -233,6 +240,10 @@ func TestLeaseTransferForwardsStartTime(t *testing.T) {
 			require.NotNil(t, propLease)
 			require.True(t, nextLease.Start.Less(propLease.Start))
 			require.True(t, beforeEval.Less(propLease.Start))
+			require.Equal(t, prevLease.Sequence+1, propLease.Sequence)
+
+			// The previous lease should have been revoked.
+			require.Equal(t, prevLease.Sequence, evalCtx.RevokedLeaseSeq)
 
 			// The prior read summary should reflect the maximum read times
 			// served under the current leaseholder.
diff --git a/pkg/kv/kvserver/batcheval/cmd_lease_transfer.go b/pkg/kv/kvserver/batcheval/cmd_lease_transfer.go
index d744aa79d61d..fce187c5ec54 100644
--- a/pkg/kv/kvserver/batcheval/cmd_lease_transfer.go
+++ b/pkg/kv/kvserver/batcheval/cmd_lease_transfer.go
@@ -77,6 +77,29 @@ func TransferLease(
 	newLease.Start.Forward(cArgs.EvalCtx.Clock().NowAsClockTimestamp())
 	args.Lease = roachpb.Lease{} // prevent accidental use below
 
+	// If this check is removed at some point, the filtering of learners on the
+	// sending side would have to be removed as well.
+	if err := roachpb.CheckCanReceiveLease(newLease.Replica, cArgs.EvalCtx.Desc()); err != nil {
+		return newFailedLeaseTrigger(true /* isTransfer */), err
+	}
+
+	// Stop using the current lease. All future calls to leaseStatus on this
+	// node with the current lease will now return a PROSCRIBED status. This
+	// includes calls to leaseStatus from the closed timestamp side-transport,
+	// meaning that the following call to GetCurrentReadSummary is guaranteed to
+	// observe the highest closed timestamp published under this lease.
+	//
+	// We perform this action during evaluation to ensure that the lease
+	// revocation takes place regardless of whether the corresponding Raft
+	// proposal succeeds, fails, or is ambiguous - in which case there's no
+	// guarantee that the transfer will not still apply.
+	//
+	// NOTE: RevokeLease will be a no-op if the lease has already changed. In
+	// such cases, we could detect that here and fail fast, but it's safe and
+	// easier to just let the TransferLease be proposed under the wrong lease
+	// and be rejected with the correct error below Raft.
+	cArgs.EvalCtx.RevokeLease(ctx, args.PrevLease.Sequence)
+
 	// Collect a read summary from the outgoing leaseholder to ship to the
 	// incoming leaseholder. This is used to instruct the new leaseholder on how
 	// to update its timestamp cache to ensure that no future writes are allowed
@@ -94,12 +117,6 @@ func TransferLease(
 	// think about.
 	priorReadSum.Merge(rspb.FromTimestamp(newLease.Start.ToTimestamp()))
 
-	// If this check is removed at some point, the filtering of learners on the
-	// sending side would have to be removed as well.
-	if err := roachpb.CheckCanReceiveLease(newLease.Replica, cArgs.EvalCtx.Desc()); err != nil {
-		return newFailedLeaseTrigger(true /* isTransfer */), err
-	}
-
 	log.VEventf(ctx, 2, "lease transfer: prev lease: %+v, new lease: %+v", prevLease, newLease)
 	return evalNewLease(ctx, cArgs.EvalCtx, readWriter, cArgs.Stats, newLease,
 		prevLease, &priorReadSum, false /* isExtension */, true /* isTransfer */)
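The hunk above reorders TransferLease evaluation so that the outgoing lease is revoked before the read summary is collected. The following self-contained Go sketch (hypothetical `replica` and `lease` types, not the CockroachDB API) illustrates why that ordering matters: once the lease is revoked, no further reads can be served under it, so the read-summary snapshot taken afterwards is complete.

```go
package main

import "fmt"

// lease models just the field the sketch needs.
type lease struct {
	sequence int64
}

// replica models the outgoing leaseholder's state.
type replica struct {
	lease     lease
	revoked   bool
	maxReadTS int64 // highest timestamp served under the current lease
}

// revokeLease is a no-op unless seq matches the current lease, mirroring the
// sequence check described in the TransferLease comment above.
func (r *replica) revokeLease(seq int64) {
	if r.lease.sequence == seq {
		r.revoked = true
	}
}

// serveRead succeeds only under an unrevoked lease.
func (r *replica) serveRead(ts int64) bool {
	if r.revoked {
		return false
	}
	if ts > r.maxReadTS {
		r.maxReadTS = ts
	}
	return true
}

func main() {
	r := &replica{lease: lease{sequence: 7}}
	r.serveRead(10)

	// Transfer: revoke first, then snapshot the read summary. Because no new
	// reads can be served after revocation, the snapshot is complete.
	r.revokeLease(r.lease.sequence)
	summary := r.maxReadTS

	fmt.Println(r.serveRead(20)) // false: lease is revoked
	fmt.Println(summary)         // 10: covers every read served under the lease
}
```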
diff --git a/pkg/kv/kvserver/batcheval/cmd_subsume.go b/pkg/kv/kvserver/batcheval/cmd_subsume.go
index c278afe57eb2..18cb3c5a5dd5 100644
--- a/pkg/kv/kvserver/batcheval/cmd_subsume.go
+++ b/pkg/kv/kvserver/batcheval/cmd_subsume.go
@@ -115,6 +115,16 @@ func Subsume(
 	// but the check is too expensive as it would involve a network roundtrip on
 	// most nodes.
 
+	// Freeze the range. Do so by blocking all requests while a newly launched
+	// async goroutine watches (pushes with low priority) the merge transaction.
+	// This will also block the closed timestamp side-transport from closing new
+	// timestamps, meaning that the following call to GetCurrentReadSummary is
+	// guaranteed to observe the highest closed timestamp ever published by this
+	// range (if the merge eventually completes).
+	if err := cArgs.EvalCtx.WatchForMerge(ctx); err != nil {
+		return result.Result{}, errors.Wrap(err, "watching for merge during subsume")
+	}
+
 	// We prevent followers of the RHS from being able to serve follower reads on
 	// timestamps that fall in the timestamp window representing the range's
 	// subsumed state (i.e. between the subsumption time (FreezeStart) and the
@@ -189,7 +199,5 @@ func Subsume(
 	// being merged.
 	reply.ClosedTimestamp = closedTS
 
-	return result.Result{
-		Local: result.LocalResult{MaybeWatchForMerge: true},
-	}, nil
+	return result.Result{}, nil
}
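For intuition, here is a minimal sketch of the freeze that WatchForMerge installs, using a hypothetical `rhsReplica` type rather than the real `Replica`: requests block on a `mergeComplete` channel until the merge transaction resolves. The real code additionally launches a goroutine that pushes the merge transaction at low priority; that part is only noted in a comment here.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// rhsReplica sketches the freeze that WatchForMerge arranges: once a merge is
// detected, all requests block on mergeComplete until the merge commits or
// aborts. This is a simplification of Replica.maybeWatchForMergeLocked.
type rhsReplica struct {
	mu            sync.Mutex
	mergeComplete chan struct{}
}

func (r *rhsReplica) watchForMerge() {
	r.mu.Lock()
	defer r.mu.Unlock()
	if r.mergeComplete == nil {
		r.mergeComplete = make(chan struct{})
		// The real code launches a goroutine here that pushes the merge txn
		// at low priority and closes the channel once the merge resolves.
	}
}

func (r *rhsReplica) serveRequest() string {
	r.mu.Lock()
	ch := r.mergeComplete
	r.mu.Unlock()
	if ch != nil {
		<-ch // blocked while the range is subsumed
	}
	return "served"
}

func main() {
	r := &rhsReplica{}
	r.watchForMerge()
	go func() {
		time.Sleep(10 * time.Millisecond)
		close(r.mergeComplete) // the merge resolved; unblock requests
	}()
	fmt.Println(r.serveRequest())
}
```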
diff --git a/pkg/kv/kvserver/batcheval/eval_context.go b/pkg/kv/kvserver/batcheval/eval_context.go
index 2f1b5bb6f978..66af76e7bf11 100644
--- a/pkg/kv/kvserver/batcheval/eval_context.go
+++ b/pkg/kv/kvserver/batcheval/eval_context.go
@@ -114,6 +114,15 @@ type EvalContext interface {
 	GetExternalStorage(ctx context.Context, dest roachpb.ExternalStorage) (cloud.ExternalStorage, error)
 	GetExternalStorageFromURI(ctx context.Context, uri string, user security.SQLUsername) (cloud.ExternalStorage, error)
+
+	// RevokeLease stops the replica from using its current lease, if that lease
+	// matches the provided lease sequence. All future calls to leaseStatus on
+	// this node with the current lease will now return a PROSCRIBED status.
+	RevokeLease(context.Context, roachpb.LeaseSequence)
+
+	// WatchForMerge arranges to block all requests until the in-progress merge
+	// completes. Returns an error if no in-progress merge is detected.
+	WatchForMerge(ctx context.Context) error
 }
 
 // MockEvalCtx is a dummy implementation of EvalContext for testing purposes.
@@ -131,6 +140,7 @@ type MockEvalCtx struct {
 	CanCreateTxn       func() (bool, hlc.Timestamp, roachpb.TransactionAbortedReason)
 	Lease              roachpb.Lease
 	CurrentReadSummary rspb.ReadSummary
+	RevokedLeaseSeq    roachpb.LeaseSequence
 }
 
 // EvalContext returns the MockEvalCtx as an EvalContext. It will reflect future
@@ -241,3 +251,9 @@ func (m *mockEvalCtxImpl) GetExternalStorageFromURI(
 ) (cloud.ExternalStorage, error) {
 	panic("unimplemented")
 }
+func (m *mockEvalCtxImpl) RevokeLease(_ context.Context, seq roachpb.LeaseSequence) {
+	m.RevokedLeaseSeq = seq
+}
+func (m *mockEvalCtxImpl) WatchForMerge(ctx context.Context) error {
+	panic("unimplemented")
+}
diff --git a/pkg/kv/kvserver/batcheval/result/BUILD.bazel b/pkg/kv/kvserver/batcheval/result/BUILD.bazel
index 83fd2b90b2eb..57423b77f80b 100644
--- a/pkg/kv/kvserver/batcheval/result/BUILD.bazel
+++ b/pkg/kv/kvserver/batcheval/result/BUILD.bazel
@@ -13,7 +13,6 @@ go_library(
         "//pkg/kv/kvserver/concurrency/lock",
         "//pkg/kv/kvserver/kvserverpb",
         "//pkg/roachpb",
-        "//pkg/util/hlc",
         "//pkg/util/log",
         "@com_github_cockroachdb_errors//:errors",
         "@com_github_kr_pretty//:pretty",
diff --git a/pkg/kv/kvserver/batcheval/result/result.go b/pkg/kv/kvserver/batcheval/result/result.go
index 448bee0f2630..3ac37d887c40 100644
--- a/pkg/kv/kvserver/batcheval/result/result.go
+++ b/pkg/kv/kvserver/batcheval/result/result.go
@@ -63,8 +63,6 @@ type LocalResult struct {
 	MaybeAddToSplitQueue bool
 	// Call MaybeGossipNodeLiveness with the specified Span, if set.
 	MaybeGossipNodeLiveness *roachpb.Span
-	// Call maybeWatchForMerge.
-	MaybeWatchForMerge bool
 
 	// Metrics contains counters which are to be passed to the
 	// metrics subsystem.
@@ -84,7 +82,6 @@ func (lResult *LocalResult) IsZero() bool {
 		!lResult.MaybeGossipSystemConfig &&
 		!lResult.MaybeGossipSystemConfigIfHaveFailure &&
 		lResult.MaybeGossipNodeLiveness == nil &&
-		!lResult.MaybeWatchForMerge &&
 		lResult.Metrics == nil
 }
 
@@ -97,13 +94,13 @@ func (lResult *LocalResult) String() string {
 		"#updated txns: %d #end txns: %d, "+
 		"GossipFirstRange:%t MaybeGossipSystemConfig:%t "+
 		"MaybeGossipSystemConfigIfHaveFailure:%t MaybeAddToSplitQueue:%t "+
-		"MaybeGossipNodeLiveness:%s MaybeWatchForMerge:%t",
+		"MaybeGossipNodeLiveness:%s",
 		lResult.Reply,
 		len(lResult.EncounteredIntents), len(lResult.AcquiredLocks), len(lResult.ResolvedLocks),
 		len(lResult.UpdatedTxns), len(lResult.EndTxns),
 		lResult.GossipFirstRange, lResult.MaybeGossipSystemConfig,
 		lResult.MaybeGossipSystemConfigIfHaveFailure, lResult.MaybeAddToSplitQueue,
-		lResult.MaybeGossipNodeLiveness, lResult.MaybeWatchForMerge)
+		lResult.MaybeGossipNodeLiveness)
}
 
 // DetachEncounteredIntents returns (and removes) those encountered
@@ -344,7 +341,6 @@ func (p *Result) MergeAndDestroy(q Result) error {
 	coalesceBool(&p.Local.MaybeGossipSystemConfig, &q.Local.MaybeGossipSystemConfig)
 	coalesceBool(&p.Local.MaybeGossipSystemConfigIfHaveFailure, &q.Local.MaybeGossipSystemConfigIfHaveFailure)
 	coalesceBool(&p.Local.MaybeAddToSplitQueue, &q.Local.MaybeAddToSplitQueue)
-	coalesceBool(&p.Local.MaybeWatchForMerge, &q.Local.MaybeWatchForMerge)
 
 	if p.Local.Metrics == nil {
 		p.Local.Metrics = q.Local.Metrics
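The new interface methods keep TransferLease and Subsume testable against MockEvalCtx. Below is a self-contained analogue (hypothetical names, mirroring the MockEvalCtx.RevokedLeaseSeq pattern above, not the actual batcheval package) of how a mock records the revocation so a test can assert on it:

```go
package main

import "fmt"

// LeaseSequence mirrors roachpb.LeaseSequence for this sketch.
type LeaseSequence int64

// evalContext is a pared-down analogue of batcheval.EvalContext, showing why
// RevokeLease belongs on the interface: command evaluation can be exercised
// against a mock without a real Replica.
type evalContext interface {
	RevokeLease(seq LeaseSequence)
}

// mockEvalCtx records the revoked sequence, like MockEvalCtx.RevokedLeaseSeq.
type mockEvalCtx struct {
	revokedLeaseSeq LeaseSequence
}

func (m *mockEvalCtx) RevokeLease(seq LeaseSequence) { m.revokedLeaseSeq = seq }

// transferLease stands in for the TransferLease command: it revokes the
// previous lease as a side effect of evaluation.
func transferLease(ctx evalContext, prevSeq LeaseSequence) LeaseSequence {
	ctx.RevokeLease(prevSeq)
	return prevSeq + 1 // next lease sequence
}

func main() {
	mock := &mockEvalCtx{}
	next := transferLease(mock, 1)
	fmt.Println(next, mock.revokedLeaseSeq) // 2 1, as asserted in cmd_lease_test.go
}
```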
diff --git a/pkg/kv/kvserver/replica.go b/pkg/kv/kvserver/replica.go
index 2afd08c63a60..8ce3e9689bae 100644
--- a/pkg/kv/kvserver/replica.go
+++ b/pkg/kv/kvserver/replica.go
@@ -1452,31 +1452,53 @@ func (r *Replica) isNewerThanSplitRLocked(split *roachpb.SplitTrigger) bool {
 		r.mu.replicaID > rightDesc.ReplicaID
 }
 
-// maybeWatchForMerge checks whether a merge of this replica into its left
+// WatchForMerge is like maybeWatchForMerge, except it expects a merge to
+// be in progress and returns an error if one is not.
+//
+// See docs/tech-notes/range-merges.md.
+func (r *Replica) WatchForMerge(ctx context.Context) error {
+	ok, err := r.maybeWatchForMerge(ctx)
+	if err != nil {
+		return err
+	} else if !ok {
+		return errors.AssertionFailedf("range merge unexpectedly not in-progress")
+	}
+	return nil
+}
+
+// maybeWatchForMerge checks whether a merge of this replica into its left
 // neighbor is in its critical phase and, if so, arranges to block all requests
-// until the merge completes.
-func (r *Replica) maybeWatchForMerge(ctx context.Context) error {
+// until the merge completes. Returns a boolean indicating whether a merge was
+// found to be in progress.
+//
+// See docs/tech-notes/range-merges.md.
+func (r *Replica) maybeWatchForMerge(ctx context.Context) (bool, error) {
 	r.mu.Lock()
 	defer r.mu.Unlock()
 	return r.maybeWatchForMergeLocked(ctx)
 }
 
-func (r *Replica) maybeWatchForMergeLocked(ctx context.Context) error {
+func (r *Replica) maybeWatchForMergeLocked(ctx context.Context) (bool, error) {
+	// Check for a deletion intent on the local range descriptor, which
+	// indicates that a merge is in progress and this range is currently in its
+	// critical phase of being subsumed by its left-hand side neighbor. Read
+	// inconsistently at the maximum timestamp to ensure that we see an intent
+	// if one exists, regardless of what timestamp it is written at.
 	desc := r.descRLocked()
 	descKey := keys.RangeDescriptorKey(desc.StartKey)
-	_, intent, err := storage.MVCCGet(ctx, r.Engine(), descKey, r.Clock().Now(),
+	_, intent, err := storage.MVCCGet(ctx, r.Engine(), descKey, hlc.MaxTimestamp,
 		storage.MVCCGetOptions{Inconsistent: true})
 	if err != nil {
-		return err
+		return false, err
 	} else if intent == nil {
-		return nil
+		return false, nil
 	}
 
 	val, _, err := storage.MVCCGetAsTxn(
 		ctx, r.Engine(), descKey, intent.Txn.WriteTimestamp, intent.Txn)
 	if err != nil {
-		return err
+		return false, err
 	} else if val != nil {
-		return nil
+		return false, nil
 	}
 
 	// At this point, we know we have a deletion intent on our range descriptor.
@@ -1489,7 +1511,7 @@ func (r *Replica) maybeWatchForMergeLocked(ctx context.Context) error {
 		// Another request already noticed the merge, installed a mergeComplete
 		// channel, and launched a goroutine to watch for the merge's completion.
 		// Nothing more to do.
-		return nil
+		return true, nil
 	}
 	r.mu.mergeComplete = mergeCompleteCh
 	// The RHS of a merge is not permitted to quiesce while a mergeComplete
@@ -1620,7 +1642,7 @@ func (r *Replica) maybeWatchForMergeLocked(ctx context.Context) error {
 		// requests will get dropped and retried on another node. Suppress the error.
 		err = nil
 	}
-	return err
+	return true, err
 }
 
 // maybeTransferRaftLeadershipToLeaseholderLocked attempts to transfer the
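The switch from `r.Clock().Now()` to `hlc.MaxTimestamp` is subtle: an inconsistent read at timestamp ts only reports intents visible at or below ts, so reading at the local clock could miss a merge intent written above it. A toy model (hypothetical types, not the actual storage package) of the difference:

```go
package main

import "fmt"

// intent stands in for an MVCC write intent at some timestamp.
type intent struct{ writeTS int64 }

// store is a toy MVCC map: each key has at most one intent.
type store map[string]intent

// getIntent models an inconsistent MVCCGet that only reports intents visible
// at or below the read timestamp ts.
func (s store) getIntent(key string, ts int64) (intent, bool) {
	in, ok := s[key]
	if !ok || in.writeTS > ts {
		return intent{}, false
	}
	return in, true
}

const maxTimestamp = int64(1 << 62) // stand-in for hlc.MaxTimestamp

func main() {
	s := store{"range-descriptor": {writeTS: 150}}

	// Reading at the clock's current time (the old behavior) can miss an
	// intent written at a higher timestamp; the maximum timestamp cannot.
	_, sawAtClock := s.getIntent("range-descriptor", 100)
	_, sawAtMax := s.getIntent("range-descriptor", maxTimestamp)
	fmt.Println(sawAtClock, sawAtMax) // false true
}
```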
diff --git a/pkg/kv/kvserver/replica_eval_context_span.go b/pkg/kv/kvserver/replica_eval_context_span.go
index 7ea060d51333..93e7e97e8b9c 100644
--- a/pkg/kv/kvserver/replica_eval_context_span.go
+++ b/pkg/kv/kvserver/replica_eval_context_span.go
@@ -246,3 +246,14 @@ func (rec *SpanSetReplicaEvalContext) GetExternalStorageFromURI(
 ) (cloud.ExternalStorage, error) {
 	return rec.i.GetExternalStorageFromURI(ctx, uri, user)
 }
+
+// RevokeLease stops the replica from using its current lease.
+func (rec *SpanSetReplicaEvalContext) RevokeLease(ctx context.Context, seq roachpb.LeaseSequence) {
+	rec.i.RevokeLease(ctx, seq)
+}
+
+// WatchForMerge arranges to block all requests until the in-progress merge
+// completes.
+func (rec *SpanSetReplicaEvalContext) WatchForMerge(ctx context.Context) error {
+	return rec.i.WatchForMerge(ctx)
+}
diff --git a/pkg/kv/kvserver/replica_learner_test.go b/pkg/kv/kvserver/replica_learner_test.go
index 42c01b2f1438..19f399b314f8 100644
--- a/pkg/kv/kvserver/replica_learner_test.go
+++ b/pkg/kv/kvserver/replica_learner_test.go
@@ -735,8 +735,9 @@ func TestLearnerNoAcceptLease(t *testing.T) {
 	desc := tc.LookupRangeOrFatal(t, scratchStartKey)
 	err := tc.TransferRangeLease(desc, tc.Target(1))
-	if !testutils.IsError(err, `cannot transfer lease to replica of type LEARNER`) {
-		t.Fatalf(`expected "cannot transfer lease to replica of type LEARNER" error got: %+v`, err)
+	exp := `replica cannot hold lease`
+	if !testutils.IsError(err, exp) {
+		t.Fatalf(`expected %q error got: %+v`, exp, err)
 	}
 }
 
@@ -760,7 +761,7 @@ func TestJointConfigLease(t *testing.T) {
 	require.True(t, desc.Replicas().InAtomicReplicationChange(), desc)
 
 	err := tc.TransferRangeLease(desc, tc.Target(1))
-	exp := `cannot transfer lease to replica of type VOTER_INCOMING`
+	exp := `replica cannot hold lease`
 	require.True(t, testutils.IsError(err, exp), err)
 
 	// NB: we don't have to transition out of the previous joint config first
 	// because this is done automatically by ChangeReplicas before it does what
 	// it's asked to do.
 	desc = tc.RemoveVotersOrFatal(t, k, tc.Target(1))
 	err = tc.TransferRangeLease(desc, tc.Target(1))
-	exp = `cannot transfer lease to replica of type VOTER_DEMOTING_LEARNER`
 	require.True(t, testutils.IsError(err, exp), err)
 }
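All three tests now expect the single error produced by the check below Raft. A simplified analogue of roachpb.CheckCanReceiveLease (the real function takes a ReplicaDescriptor and a RangeDescriptor; this sketch reduces it to just the replica type) shows why one message now covers every non-VOTER_FULL type:

```go
package main

import (
	"errors"
	"fmt"
)

// replicaType mirrors the roachpb replica types relevant to the tests above.
type replicaType int

const (
	voterFull replicaType = iota
	voterIncoming
	voterDemoting
	learner
	nonVoter
)

// checkCanReceiveLease is a simplified analogue of roachpb.CheckCanReceiveLease:
// one check with one error message, which is why the tests above all expect
// `replica cannot hold lease` regardless of the rejected replica's type.
func checkCanReceiveLease(t replicaType) error {
	if t != voterFull {
		return errors.New("replica cannot hold lease")
	}
	return nil
}

func main() {
	for _, t := range []replicaType{voterFull, learner, voterIncoming} {
		fmt.Println(t, checkCanReceiveLease(t))
	}
}
```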
diff --git a/pkg/kv/kvserver/replica_proposal.go b/pkg/kv/kvserver/replica_proposal.go
index f072339edc39..5b25354c6104 100644
--- a/pkg/kv/kvserver/replica_proposal.go
+++ b/pkg/kv/kvserver/replica_proposal.go
@@ -396,7 +396,7 @@ func (r *Replica) leasePostApplyLocked(
 	// progress, as only the old leaseholder would have been explicitly notified
 	// of the merge. If there is a merge in progress, maybeWatchForMerge will
 	// arrange to block all traffic to this replica unless the merge aborts.
-	if err := r.maybeWatchForMergeLocked(ctx); err != nil {
+	if _, err := r.maybeWatchForMergeLocked(ctx); err != nil {
 		// We were unable to determine whether a merge was in progress. We cannot
 		// safely proceed.
 		log.Fatalf(ctx, "failed checking for in-progress merge while installing new lease %s: %s",
@@ -623,9 +623,6 @@ func (r *Replica) handleReadWriteLocalEvalResult(ctx context.Context, lResult re
 	if lResult.EndTxns != nil {
 		log.Fatalf(ctx, "LocalEvalResult.EndTxns should be nil: %+v", lResult.EndTxns)
 	}
-	if lResult.MaybeWatchForMerge {
-		log.Fatalf(ctx, "LocalEvalResult.MaybeWatchForMerge should be false")
-	}
 
 	if lResult.AcquiredLocks != nil {
 		for i := range lResult.AcquiredLocks {
diff --git a/pkg/kv/kvserver/replica_range_lease.go b/pkg/kv/kvserver/replica_range_lease.go
index 97cfffd64cbe..8557f23eed74 100644
--- a/pkg/kv/kvserver/replica_range_lease.go
+++ b/pkg/kv/kvserver/replica_range_lease.go
@@ -785,18 +785,6 @@ func (r *Replica) AdminTransferLease(ctx context.Context, target roachpb.StoreID
 			return nil, nil, errors.Errorf("unable to find store %d in range %+v", target, desc)
 		}
 
-		// For now, don't allow replicas of type LEARNER to be leaseholders, see
-		// comments in RequestLease and TransferLease for why.
-		//
-		// TODO(dan): We shouldn't need this, the checks in RequestLease and
-		// TransferLease are the canonical ones and should be sufficient. Sadly, the
-		// `r.mu.minLeaseProposedTS = status.Timestamp` line below will likely play
-		// badly with that. This would be an issue even without learners, but
-		// omitting this check would make it worse. Fixme.
-		if t := nextLeaseHolder.GetType(); t != roachpb.VOTER_FULL {
-			return nil, nil, errors.Errorf(`cannot transfer lease to replica of type %s`, t)
-		}
-
 		if nextLease, ok := r.mu.pendingLeaseRequest.RequestPending(); ok &&
 			nextLease.Replica != nextLeaseHolder {
 			repDesc, err := r.getReplicaDescriptorRLocked()
@@ -813,15 +801,7 @@ func (r *Replica) AdminTransferLease(ctx context.Context, target roachpb.StoreID
 			return nil, nil, newNotLeaseHolderError(nextLease, r.store.StoreID(), desc,
 				"another transfer to a different store is in progress")
 		}
-		// Stop using the current lease. All future calls to leaseStatus on this
-		// node with the current lease will now return a PROSCRIBED status.
-		//
-		// TODO(nvanbenschoten): since we aren't pulling the transfer time here
-		// anymore, we could also move this below latching as well, similar to
-		// how a Subsume request sets the FreezeStart time and pauses closed
-		// timestamps during evaluation and communicates this back up using the
-		// LocalResult.
-		r.mu.minLeaseProposedTS = now
+
 		transfer = r.mu.pendingLeaseRequest.InitOrJoinRequest(
 			ctx, nextLeaseHolder, status, desc.StartKey.AsRawKey(), true, /* transfer */
 		)
@@ -877,6 +857,17 @@ func (r *Replica) getLeaseRLocked() (roachpb.Lease, roachpb.Lease) {
 	return *r.mu.state.Lease, roachpb.Lease{}
 }
 
+// RevokeLease stops the replica from using its current lease, if that lease
+// matches the provided lease sequence. All future calls to leaseStatus on this
+// node with the current lease will now return a PROSCRIBED status.
+func (r *Replica) RevokeLease(ctx context.Context, seq roachpb.LeaseSequence) {
+	r.mu.Lock()
+	defer r.mu.Unlock()
+	if r.mu.state.Lease.Sequence == seq {
+		r.mu.minLeaseProposedTS = r.Clock().NowAsClockTimestamp()
+	}
+}
+
 // newNotLeaseHolderError returns a NotLeaseHolderError initialized with the
 // replica for the holder (if any) of the given lease.
 //
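RevokeLease works by forwarding minLeaseProposedTS. Under the simplifying assumption that leaseStatus compares a lease's proposal time against minLeaseProposedTS (a sketch of the mechanism, not the actual kvserver logic), the effect looks like this:

```go
package main

import "fmt"

// leaseState sketches the two statuses this change cares about.
type leaseState int

const (
	valid leaseState = iota
	proscribed
)

// replica carries just the two timestamps the sketch needs.
type replica struct {
	leaseProposedTS    int64
	minLeaseProposedTS int64
}

// leaseStatus treats any lease proposed before minLeaseProposedTS as
// proscribed, which is how a revoked lease becomes unusable.
func (r *replica) leaseStatus() leaseState {
	if r.leaseProposedTS < r.minLeaseProposedTS {
		return proscribed // e.g. after RevokeLease above
	}
	return valid
}

func main() {
	r := &replica{leaseProposedTS: 100, minLeaseProposedTS: 50}
	fmt.Println(r.leaseStatus() == valid) // true

	// RevokeLease forwards minLeaseProposedTS past the lease's proposal time.
	r.minLeaseProposedTS = 200
	fmt.Println(r.leaseStatus() == proscribed) // true
}
```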
diff --git a/pkg/kv/kvserver/replica_read.go b/pkg/kv/kvserver/replica_read.go
index 190154442eae..5b77e3caad10 100644
--- a/pkg/kv/kvserver/replica_read.go
+++ b/pkg/kv/kvserver/replica_read.go
@@ -180,16 +180,6 @@ func (r *Replica) handleReadOnlyLocalEvalResult(
 		lResult.AcquiredLocks = nil
 	}
 
-	if lResult.MaybeWatchForMerge {
-		// A merge is (likely) about to be carried out, and this replica needs
-		// to block all traffic until the merge either commits or aborts. See
-		// docs/tech-notes/range-merges.md.
-		if err := r.maybeWatchForMerge(ctx); err != nil {
-			return roachpb.NewError(err)
-		}
-		lResult.MaybeWatchForMerge = false
-	}
-
 	if !lResult.IsZero() {
 		log.Fatalf(ctx, "unhandled field in LocalEvalResult: %s", pretty.Diff(lResult, result.LocalResult{}))
 	}
diff --git a/pkg/kv/kvserver/replica_test.go b/pkg/kv/kvserver/replica_test.go
index 8a4cb17102cc..6d7fc678e039 100644
--- a/pkg/kv/kvserver/replica_test.go
+++ b/pkg/kv/kvserver/replica_test.go
@@ -741,7 +741,7 @@ func TestBehaviorDuringLeaseTransfer(t *testing.T) {
 		return nil
 	}
 	transferSem := make(chan struct{})
-	tsc.TestingKnobs.EvalKnobs.TestingEvalFilter =
+	tsc.TestingKnobs.EvalKnobs.TestingPostEvalFilter =
 		func(filterArgs kvserverbase.FilterArgs) *roachpb.Error {
 			if _, ok := filterArgs.Req.(*roachpb.TransferLeaseRequest); ok {
 				// Notify the test that the transfer has been trapped.
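The test switches to TestingPostEvalFilter because the interesting side effect (lease revocation) now happens during evaluation, so the command must be trapped after it evaluates rather than before. A self-contained analogue (hypothetical `evaluator` and `filter` types, not the actual testing-knob machinery) of that pattern:

```go
package main

import "fmt"

// request stands in for a batch request hitting evaluation.
type request struct{ method string }

// filter stands in for a testing filter knob.
type filter func(req request) error

// evaluator runs its post-eval filter after the command has evaluated, so the
// filter observes a command whose evaluation-time side effects have happened.
type evaluator struct {
	postEvalFilter filter
}

func (e *evaluator) evaluate(req request) (string, error) {
	result := "evaluated " + req.method // evaluation happens first
	if e.postEvalFilter != nil {
		if err := e.postEvalFilter(req); err != nil {
			return "", err // the filter can still reject or trap the command
		}
	}
	return result, nil
}

func main() {
	trapped := make(chan struct{}, 1)
	e := &evaluator{postEvalFilter: func(req request) error {
		if req.method == "TransferLease" {
			trapped <- struct{}{} // notify the test, as in the diff above
		}
		return nil
	}}
	res, _ := e.evaluate(request{method: "TransferLease"})
	<-trapped
	fmt.Println(res)
}
```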