Skip to content

Commit

Permalink
kv: sync lease transfers and range merges with closedts side-transport
Browse files Browse the repository at this point in the history
Needed for #61137.

This commit updates the manner through which lease transfers (through
`LeaseTransferRequest`) and range merges (through `SubsumeRequest`)
handle the "transfer of power" from their outgoing leaseholder to their
incoming leaseholder. Specifically, it updates the handling of these
requests in order to rationalize the interaction between their
evaluation and the closing of timestamps through the closed timestamp
side-transport. It is now clearer when and how these requests instruct
the outgoing leaseholder to relinquish its ability to advance the closed
timestamp and, as a result, now possible for the requests to query and
operate on the maximum closed timestamp published by the outgoing leaseholder.

For lease transfers, this commit begins by addressing an existing TODO
to push the revocation of the outgoing lease out of `AdminTransferLease`
and into the evaluation of `LeaseTransferRequest` through a new
`RevokeLease` method on the `EvalContext`. Once a lease is revoked, the
side-transport will no longer be able to advance the closed timestamp
under it. This was made possible by #59086 and was suggested by @tbg
during the code review. We generally like to keep replica state changes
out of "admin" requests themselves, which are intended to coordinate
changes through individual non-admin requests. Admin requests generally
don't even need to evaluate on a leaseholder (though they try to), so
having them perform any state changes is fragile.

For range merges, this commit removes the `MaybeWatchForMerge` flag from
the `LocalResult` returned by `SubsumeRequest` and replaces it with a
`WatchForMerge` method on the `EvalContext`. This allows the
`SubsumeRequest` to launch the range merge watcher goroutine during it
evaluation, which the side-transport checks for to see whether a
leaseholder can advance its closed timestamp. In doing so, the
`SubsumeRequest` is able to pause closed timestamps when it wants and is
also able to observe and return the maximum closed timestamp _after_ the
closed timestamp has stopped advancing. This is a stark improvement over
the approach with the original closed timestamp system, which required a
herculean effort in #50265 to make correct.

With these refactors complete, the closed timestamp side-transport
should have a much easier and safer time checking whether a given
leaseholder is able to advance its closed timestamp.

Release justification: Necessary for the safety of new functionality.
  • Loading branch information
nvanbenschoten committed Mar 1, 2021
1 parent a3cb57c commit 9c3ed73
Show file tree
Hide file tree
Showing 14 changed files with 138 additions and 78 deletions.
4 changes: 1 addition & 3 deletions pkg/kv/kvnemesis/validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -339,12 +339,10 @@ func (v *validator) processOp(txnID *string, op Operation) {
v.failIfError(op, t.Result)
}
case *TransferLeaseOperation:
if resultIsErrorStr(t.Result, `cannot transfer lease to replica of type (VOTER_INCOMING|VOTER_OUTGOING|VOTER_DEMOTING|LEARNER|NON_VOTER)`) {
if resultIsErrorStr(t.Result, `replica cannot hold lease`) {
// Only VOTER_FULL replicas can currently hold a range lease.
// Attempts to transfer to lease to any other replica type are
// rejected.
} else if resultIsErrorStr(t.Result, `replica cannot hold lease`) {
// Same case as above, but with a slightly different message.
} else if resultIsErrorStr(t.Result, `unable to find store \d+ in range`) {
// A lease transfer that races with a replica removal may find that
// the store it was targeting is no longer part of the range.
Expand Down
27 changes: 19 additions & 8 deletions pkg/kv/kvserver/batcheval/cmd_lease_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,10 @@ func TestLeaseTransferForwardsStartTime(t *testing.T) {
manual := hlc.NewManualClock(123)
clock := hlc.NewClock(manual.UnixNano, time.Nanosecond)

prevLease := roachpb.Lease{
Replica: replicas[0],
Sequence: 1,
}
nextLease := roachpb.Lease{
Replica: replicas[1],
Start: clock.NowAsClockTimestamp(),
Expand All @@ -209,16 +213,19 @@ func TestLeaseTransferForwardsStartTime(t *testing.T) {
}
currentReadSummary := rspb.FromTimestamp(maxPriorReadTS)

evalCtx := &MockEvalCtx{
ClusterSettings: cluster.MakeTestingClusterSettings(),
StoreID: 1,
Desc: &desc,
Clock: clock,
Lease: prevLease,
CurrentReadSummary: currentReadSummary,
}
cArgs := CommandArgs{
EvalCtx: (&MockEvalCtx{
ClusterSettings: cluster.MakeTestingClusterSettings(),
StoreID: 1,
Desc: &desc,
Clock: clock,
CurrentReadSummary: currentReadSummary,
}).EvalContext(),
EvalCtx: evalCtx.EvalContext(),
Args: &roachpb.TransferLeaseRequest{
Lease: nextLease,
Lease: nextLease,
PrevLease: prevLease,
},
}

Expand All @@ -233,6 +240,10 @@ func TestLeaseTransferForwardsStartTime(t *testing.T) {
require.NotNil(t, propLease)
require.True(t, nextLease.Start.Less(propLease.Start))
require.True(t, beforeEval.Less(propLease.Start))
require.Equal(t, prevLease.Sequence+1, propLease.Sequence)

// The previous lease should have been revoked.
require.Equal(t, prevLease.Sequence, evalCtx.RevokedLeaseSeq)

// The prior read summary should reflect the maximum read times
// served under the current leaseholder.
Expand Down
33 changes: 27 additions & 6 deletions pkg/kv/kvserver/batcheval/cmd_lease_transfer.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,33 @@ func TransferLease(
newLease.Start.Forward(cArgs.EvalCtx.Clock().NowAsClockTimestamp())
args.Lease = roachpb.Lease{} // prevent accidental use below

// If this check is removed at some point, the filtering of learners on the
// sending side would have to be removed as well.
if err := roachpb.CheckCanReceiveLease(newLease.Replica, cArgs.EvalCtx.Desc()); err != nil {
return newFailedLeaseTrigger(true /* isTransfer */), err
}

// Stop using the current lease. All future calls to leaseStatus on this
// node with the current lease will now return a PROSCRIBED status. This
// includes calls to leaseStatus from the closed timestamp side-transport,
// meaning that the following call to GetCurrentReadSummary is guaranteed to
// observe the highest closed timestamp published under this lease.
//
// We perform this action during evaluation to ensure that the lease
// revocation takes place regardless of whether the corresponding Raft
// proposal succeeds, fails, or is ambiguous - in which case there's no
// guarantee that the transfer will not still apply. This means that if the
// proposal fails, we'll have relinquished the current lease but not managed
// to give the lease to someone else, so we'll have to re-acquire the lease
// again through a RequestLease request to recover. This situation is tested
// in TestBehaviorDuringLeaseTransfer/transferSucceeds=false.
//
// NOTE: RevokeLease will be a no-op if the lease has already changed. In
// such cases, we could detect that here and fail fast, but it's safe and
// easier to just let the TransferLease be proposed under the wrong lease
// and be rejected with the correct error below Raft.
cArgs.EvalCtx.RevokeLease(ctx, args.PrevLease.Sequence)

// Collect a read summary from the outgoing leaseholder to ship to the
// incoming leaseholder. This is used to instruct the new leaseholder on how
// to update its timestamp cache to ensure that no future writes are allowed
Expand All @@ -94,12 +121,6 @@ func TransferLease(
// think about.
priorReadSum.Merge(rspb.FromTimestamp(newLease.Start.ToTimestamp()))

// If this check is removed at some point, the filtering of learners on the
// sending side would have to be removed as well.
if err := roachpb.CheckCanReceiveLease(newLease.Replica, cArgs.EvalCtx.Desc()); err != nil {
return newFailedLeaseTrigger(true /* isTransfer */), err
}

log.VEventf(ctx, 2, "lease transfer: prev lease: %+v, new lease: %+v", prevLease, newLease)
return evalNewLease(ctx, cArgs.EvalCtx, readWriter, cArgs.Stats,
newLease, prevLease, &priorReadSum, false /* isExtension */, true /* isTransfer */)
Expand Down
14 changes: 11 additions & 3 deletions pkg/kv/kvserver/batcheval/cmd_subsume.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,16 @@ func Subsume(
// but the check is too expensive as it would involve a network roundtrip on
// most nodes.

// Freeze the range. Do so by blocking all requests while a newly launched
// async goroutine watches (pushes with low priority) the merge transaction.
// This will also block the closed timestamp side-transport from closing new
// timestamps, meaning that the following call to GetCurrentReadSummary is
// guaranteed to observe the highest closed timestamp ever published by this
// range (if the merge eventually completes).
if err := cArgs.EvalCtx.WatchForMerge(ctx); err != nil {
return result.Result{}, errors.Wrap(err, "watching for merge during subsume")
}

// We prevent followers of the RHS from being able to serve follower reads on
// timestamps that fall in the timestamp window representing the range's
// subsumed state (i.e. between the subsumption time (FreezeStart) and the
Expand Down Expand Up @@ -189,7 +199,5 @@ func Subsume(
// being merged.
reply.ClosedTimestamp = closedTS

return result.Result{
Local: result.LocalResult{MaybeWatchForMerge: true},
}, nil
return result.Result{}, nil
}
16 changes: 16 additions & 0 deletions pkg/kv/kvserver/batcheval/eval_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,15 @@ type EvalContext interface {
GetExternalStorage(ctx context.Context, dest roachpb.ExternalStorage) (cloud.ExternalStorage, error)
GetExternalStorageFromURI(ctx context.Context, uri string, user security.SQLUsername) (cloud.ExternalStorage,
error)

// RevokeLease stops the replica from using its current lease, if that lease
// matches the provided lease sequence. All future calls to leaseStatus on
// this node with the current lease will now return a PROSCRIBED status.
RevokeLease(context.Context, roachpb.LeaseSequence)

// WatchForMerge arranges to block all requests until the in-progress merge
// completes. Returns an error if no in-progress merge is detected.
WatchForMerge(ctx context.Context) error
}

// MockEvalCtx is a dummy implementation of EvalContext for testing purposes.
Expand All @@ -132,6 +141,7 @@ type MockEvalCtx struct {
CanCreateTxn func() (bool, hlc.Timestamp, roachpb.TransactionAbortedReason)
Lease roachpb.Lease
CurrentReadSummary rspb.ReadSummary
RevokedLeaseSeq roachpb.LeaseSequence
}

// EvalContext returns the MockEvalCtx as an EvalContext. It will reflect future
Expand Down Expand Up @@ -242,3 +252,9 @@ func (m *mockEvalCtxImpl) GetExternalStorageFromURI(
) (cloud.ExternalStorage, error) {
panic("unimplemented")
}
func (m *mockEvalCtxImpl) RevokeLease(_ context.Context, seq roachpb.LeaseSequence) {
m.RevokedLeaseSeq = seq
}
func (m *mockEvalCtxImpl) WatchForMerge(ctx context.Context) error {
panic("unimplemented")
}
1 change: 0 additions & 1 deletion pkg/kv/kvserver/batcheval/result/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ go_library(
"//pkg/kv/kvserver/concurrency/lock",
"//pkg/kv/kvserver/kvserverpb",
"//pkg/roachpb",
"//pkg/util/hlc",
"//pkg/util/log",
"@com_github_cockroachdb_errors//:errors",
"@com_github_kr_pretty//:pretty",
Expand Down
8 changes: 2 additions & 6 deletions pkg/kv/kvserver/batcheval/result/result.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,6 @@ type LocalResult struct {
MaybeAddToSplitQueue bool
// Call MaybeGossipNodeLiveness with the specified Span, if set.
MaybeGossipNodeLiveness *roachpb.Span
// Call maybeWatchForMerge.
MaybeWatchForMerge bool

// Metrics contains counters which are to be passed to the
// metrics subsystem.
Expand All @@ -84,7 +82,6 @@ func (lResult *LocalResult) IsZero() bool {
!lResult.MaybeGossipSystemConfig &&
!lResult.MaybeGossipSystemConfigIfHaveFailure &&
lResult.MaybeGossipNodeLiveness == nil &&
!lResult.MaybeWatchForMerge &&
lResult.Metrics == nil
}

Expand All @@ -97,13 +94,13 @@ func (lResult *LocalResult) String() string {
"#updated txns: %d #end txns: %d, "+
"GossipFirstRange:%t MaybeGossipSystemConfig:%t "+
"MaybeGossipSystemConfigIfHaveFailure:%t MaybeAddToSplitQueue:%t "+
"MaybeGossipNodeLiveness:%s MaybeWatchForMerge:%t",
"MaybeGossipNodeLiveness:%s ",
lResult.Reply,
len(lResult.EncounteredIntents), len(lResult.AcquiredLocks), len(lResult.ResolvedLocks),
len(lResult.UpdatedTxns), len(lResult.EndTxns),
lResult.GossipFirstRange, lResult.MaybeGossipSystemConfig,
lResult.MaybeGossipSystemConfigIfHaveFailure, lResult.MaybeAddToSplitQueue,
lResult.MaybeGossipNodeLiveness, lResult.MaybeWatchForMerge)
lResult.MaybeGossipNodeLiveness)
}

// DetachEncounteredIntents returns (and removes) those encountered
Expand Down Expand Up @@ -344,7 +341,6 @@ func (p *Result) MergeAndDestroy(q Result) error {
coalesceBool(&p.Local.MaybeGossipSystemConfig, &q.Local.MaybeGossipSystemConfig)
coalesceBool(&p.Local.MaybeGossipSystemConfigIfHaveFailure, &q.Local.MaybeGossipSystemConfigIfHaveFailure)
coalesceBool(&p.Local.MaybeAddToSplitQueue, &q.Local.MaybeAddToSplitQueue)
coalesceBool(&p.Local.MaybeWatchForMerge, &q.Local.MaybeWatchForMerge)

if p.Local.Metrics == nil {
p.Local.Metrics = q.Local.Metrics
Expand Down
44 changes: 33 additions & 11 deletions pkg/kv/kvserver/replica.go
Original file line number Diff line number Diff line change
Expand Up @@ -1444,31 +1444,53 @@ func (r *Replica) isNewerThanSplitRLocked(split *roachpb.SplitTrigger) bool {
r.mu.replicaID > rightDesc.ReplicaID
}

// maybeWatchForMerge checks whether a merge of this replica into its left
// WatchForMerge is like maybeWatchForMergeLocked, except it expects a merge to
// be in progress and returns an error if one is not.
//
// See docs/tech-notes/range-merges.md.
func (r *Replica) WatchForMerge(ctx context.Context) error {
ok, err := r.maybeWatchForMerge(ctx)
if err != nil {
return err
} else if !ok {
return errors.AssertionFailedf("range merge unexpectedly not in-progress")
}
return nil
}

// maybeWatchForMergeLocked checks whether a merge of this replica into its left
// neighbor is in its critical phase and, if so, arranges to block all requests
// until the merge completes.
func (r *Replica) maybeWatchForMerge(ctx context.Context) error {
// until the merge completes. Returns a boolean indicating whether a merge was
// found to be in progress.
//
// See docs/tech-notes/range-merges.md.
func (r *Replica) maybeWatchForMerge(ctx context.Context) (bool, error) {
r.mu.Lock()
defer r.mu.Unlock()
return r.maybeWatchForMergeLocked(ctx)
}

func (r *Replica) maybeWatchForMergeLocked(ctx context.Context) error {
func (r *Replica) maybeWatchForMergeLocked(ctx context.Context) (bool, error) {
// Checking for a deletion intent on the local range descriptor, which
// indicates that a merge is in progress and this range is currently in its
// critical phase of being subsumed by its left-hand side neighbor. Read
// inconsistently at the maximum timestamp to ensure that we see an intent
// if one exists, regardless of what timestamp it is written at.
desc := r.descRLocked()
descKey := keys.RangeDescriptorKey(desc.StartKey)
_, intent, err := storage.MVCCGet(ctx, r.Engine(), descKey, r.Clock().Now(),
_, intent, err := storage.MVCCGet(ctx, r.Engine(), descKey, hlc.MaxTimestamp,
storage.MVCCGetOptions{Inconsistent: true})
if err != nil {
return err
return false, err
} else if intent == nil {
return nil
return false, nil
}
val, _, err := storage.MVCCGetAsTxn(
ctx, r.Engine(), descKey, intent.Txn.WriteTimestamp, intent.Txn)
if err != nil {
return err
return false, err
} else if val != nil {
return nil
return false, nil
}

// At this point, we know we have a deletion intent on our range descriptor.
Expand All @@ -1481,7 +1503,7 @@ func (r *Replica) maybeWatchForMergeLocked(ctx context.Context) error {
// Another request already noticed the merge, installed a mergeComplete
// channel, and launched a goroutine to watch for the merge's completion.
// Nothing more to do.
return nil
return true, nil
}
r.mu.mergeComplete = mergeCompleteCh
// The RHS of a merge is not permitted to quiesce while a mergeComplete
Expand Down Expand Up @@ -1612,7 +1634,7 @@ func (r *Replica) maybeWatchForMergeLocked(ctx context.Context) error {
// requests will get dropped and retried on another node. Suppress the error.
err = nil
}
return err
return true, err
}

// maybeTransferRaftLeadershipToLeaseholderLocked attempts to transfer the
Expand Down
11 changes: 11 additions & 0 deletions pkg/kv/kvserver/replica_eval_context_span.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,3 +246,14 @@ func (rec *SpanSetReplicaEvalContext) GetExternalStorageFromURI(
) (cloud.ExternalStorage, error) {
return rec.i.GetExternalStorageFromURI(ctx, uri, user)
}

// RevokeLease stops the replica from using its current lease.
func (rec *SpanSetReplicaEvalContext) RevokeLease(ctx context.Context, seq roachpb.LeaseSequence) {
rec.i.RevokeLease(ctx, seq)
}

// WatchForMerge arranges to block all requests until the in-progress merge
// completes.
func (rec *SpanSetReplicaEvalContext) WatchForMerge(ctx context.Context) error {
return rec.i.WatchForMerge(ctx)
}
8 changes: 4 additions & 4 deletions pkg/kv/kvserver/replica_learner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -735,8 +735,9 @@ func TestLearnerNoAcceptLease(t *testing.T) {

desc := tc.LookupRangeOrFatal(t, scratchStartKey)
err := tc.TransferRangeLease(desc, tc.Target(1))
if !testutils.IsError(err, `cannot transfer lease to replica of type LEARNER`) {
t.Fatalf(`expected "cannot transfer lease to replica of type LEARNER" error got: %+v`, err)
exp := `replica cannot hold lease`
if !testutils.IsError(err, exp) {
t.Fatalf(`expected %q error got: %+v`, exp, err)
}
}

Expand All @@ -760,15 +761,14 @@ func TestJointConfigLease(t *testing.T) {
require.True(t, desc.Replicas().InAtomicReplicationChange(), desc)

err := tc.TransferRangeLease(desc, tc.Target(1))
exp := `cannot transfer lease to replica of type VOTER_INCOMING`
exp := `replica cannot hold lease`
require.True(t, testutils.IsError(err, exp), err)

// NB: we don't have to transition out of the previous joint config first
// because this is done automatically by ChangeReplicas before it does what
// it's asked to do.
desc = tc.RemoveVotersOrFatal(t, k, tc.Target(1))
err = tc.TransferRangeLease(desc, tc.Target(1))
exp = `cannot transfer lease to replica of type VOTER_DEMOTING_LEARNER`
require.True(t, testutils.IsError(err, exp), err)
}

Expand Down
5 changes: 1 addition & 4 deletions pkg/kv/kvserver/replica_proposal.go
Original file line number Diff line number Diff line change
Expand Up @@ -396,7 +396,7 @@ func (r *Replica) leasePostApplyLocked(
// progress, as only the old leaseholder would have been explicitly notified
// of the merge. If there is a merge in progress, maybeWatchForMerge will
// arrange to block all traffic to this replica unless the merge aborts.
if err := r.maybeWatchForMergeLocked(ctx); err != nil {
if _, err := r.maybeWatchForMergeLocked(ctx); err != nil {
// We were unable to determine whether a merge was in progress. We cannot
// safely proceed.
log.Fatalf(ctx, "failed checking for in-progress merge while installing new lease %s: %s",
Expand Down Expand Up @@ -623,9 +623,6 @@ func (r *Replica) handleReadWriteLocalEvalResult(ctx context.Context, lResult re
if lResult.EndTxns != nil {
log.Fatalf(ctx, "LocalEvalResult.EndTxns should be nil: %+v", lResult.EndTxns)
}
if lResult.MaybeWatchForMerge {
log.Fatalf(ctx, "LocalEvalResult.MaybeWatchForMerge should be false")
}

if lResult.AcquiredLocks != nil {
for i := range lResult.AcquiredLocks {
Expand Down
Loading

0 comments on commit 9c3ed73

Please sign in to comment.