Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
61221: kv: sync lease transfers and range merges with closed timestamp side-transport r=nvanbenschoten a=nvanbenschoten

Needed for the safety of cockroachdb#61137.

This commit updates the manner through which lease transfers (through `LeaseTransferRequest`) and range merges (through `SubsumeRequest`) handle the "transfer of power" from their outgoing leaseholder to their incoming leaseholder. Specifically, it updates the handling of these requests in order to rationalize the interaction between their evaluation and the closing of timestamps through the closed timestamp side-transport. It is now clearer when and how these requests instruct the outgoing leaseholder to relinquish its ability to advance the closed timestamp and, as a result, now possible for the requests to query and operate on the maximum closed timestamp published by the outgoing leaseholder.

For lease transfers, this commit begins by addressing an existing TODO to push the revocation of the outgoing lease out of `AdminTransferLease` and into the evaluation of `LeaseTransferRequest` through a new `RevokeLease` method on the `EvalContext`. Once a lease is revoked, the side-transport will no longer be able to advance the closed timestamp under it. This was made possible by cockroachdb#59086 and was suggested by @tbg during the code review. We generally like to keep replica state changes out of "admin" requests themselves, which are intended to coordinate changes through individual non-admin requests. Admin requests generally don't even need to evaluate on a leaseholder (though they try to), so having them perform any state changes is fragile.

For range merges, this commit removes the `MaybeWatchForMerge` flag from the `LocalResult` returned by `SubsumeRequest` and replaces it with a `WatchForMerge` method on the `EvalContext`. This allows the `SubsumeRequest` to launch the range merge watcher goroutine during it evaluation, which the side-transport checks for to see whether a leaseholder can advance its closed timestamp. In doing so, the `SubsumeRequest` is able to pause closed timestamps when it wants and is also able to observe and return the maximum closed timestamp _after_ the closed timestamp has stopped advancing. This is a stark improvement over the approach with the original closed timestamp system, which required a herculean effort in cockroachdb#50265 to make correct.

With these refactors complete, the closed timestamp side-transport should have a much easier and safer time checking whether a given leaseholder is able to advance its closed timestamp.

Release justification: Necessary for the safety of new functionality.

61237: util/log: ensure that all channel logs are displayed with `-show-logs` r=tbg a=knz

When `-show-logs` is specified, the `log.Scope` becomes a no-op and
the default configuration in the `log` package is used. This is the
only time ever when the default configuration is used.

Prior to this patch, only the logging for the DEV channel would
make its way to the standard error (and the test output) in that
case. This was unfortunate, since the intent (as spelled out in a
comment already) was to display everything.

This patch fixes that.

Release justification: non-production code changes

Release note: None

Co-authored-by: Nathan VanBenschoten <[email protected]>
Co-authored-by: Raphael 'kena' Poss <[email protected]>
  • Loading branch information
3 people committed Mar 1, 2021
3 parents 8a31338 + 9c3ed73 + a3d35c8 commit 3fe1992
Show file tree
Hide file tree
Showing 20 changed files with 303 additions and 299 deletions.
4 changes: 1 addition & 3 deletions pkg/kv/kvnemesis/validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -339,12 +339,10 @@ func (v *validator) processOp(txnID *string, op Operation) {
v.failIfError(op, t.Result)
}
case *TransferLeaseOperation:
if resultIsErrorStr(t.Result, `cannot transfer lease to replica of type (VOTER_INCOMING|VOTER_OUTGOING|VOTER_DEMOTING|LEARNER|NON_VOTER)`) {
if resultIsErrorStr(t.Result, `replica cannot hold lease`) {
// Only VOTER_FULL replicas can currently hold a range lease.
// Attempts to transfer to lease to any other replica type are
// rejected.
} else if resultIsErrorStr(t.Result, `replica cannot hold lease`) {
// Same case as above, but with a slightly different message.
} else if resultIsErrorStr(t.Result, `unable to find store \d+ in range`) {
// A lease transfer that races with a replica removal may find that
// the store it was targeting is no longer part of the range.
Expand Down
27 changes: 19 additions & 8 deletions pkg/kv/kvserver/batcheval/cmd_lease_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,10 @@ func TestLeaseTransferForwardsStartTime(t *testing.T) {
manual := hlc.NewManualClock(123)
clock := hlc.NewClock(manual.UnixNano, time.Nanosecond)

prevLease := roachpb.Lease{
Replica: replicas[0],
Sequence: 1,
}
nextLease := roachpb.Lease{
Replica: replicas[1],
Start: clock.NowAsClockTimestamp(),
Expand All @@ -209,16 +213,19 @@ func TestLeaseTransferForwardsStartTime(t *testing.T) {
}
currentReadSummary := rspb.FromTimestamp(maxPriorReadTS)

evalCtx := &MockEvalCtx{
ClusterSettings: cluster.MakeTestingClusterSettings(),
StoreID: 1,
Desc: &desc,
Clock: clock,
Lease: prevLease,
CurrentReadSummary: currentReadSummary,
}
cArgs := CommandArgs{
EvalCtx: (&MockEvalCtx{
ClusterSettings: cluster.MakeTestingClusterSettings(),
StoreID: 1,
Desc: &desc,
Clock: clock,
CurrentReadSummary: currentReadSummary,
}).EvalContext(),
EvalCtx: evalCtx.EvalContext(),
Args: &roachpb.TransferLeaseRequest{
Lease: nextLease,
Lease: nextLease,
PrevLease: prevLease,
},
}

Expand All @@ -233,6 +240,10 @@ func TestLeaseTransferForwardsStartTime(t *testing.T) {
require.NotNil(t, propLease)
require.True(t, nextLease.Start.Less(propLease.Start))
require.True(t, beforeEval.Less(propLease.Start))
require.Equal(t, prevLease.Sequence+1, propLease.Sequence)

// The previous lease should have been revoked.
require.Equal(t, prevLease.Sequence, evalCtx.RevokedLeaseSeq)

// The prior read summary should reflect the maximum read times
// served under the current leaseholder.
Expand Down
35 changes: 28 additions & 7 deletions pkg/kv/kvserver/batcheval/cmd_lease_transfer.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,11 +77,38 @@ func TransferLease(
newLease.Start.Forward(cArgs.EvalCtx.Clock().NowAsClockTimestamp())
args.Lease = roachpb.Lease{} // prevent accidental use below

// If this check is removed at some point, the filtering of learners on the
// sending side would have to be removed as well.
if err := roachpb.CheckCanReceiveLease(newLease.Replica, cArgs.EvalCtx.Desc()); err != nil {
return newFailedLeaseTrigger(true /* isTransfer */), err
}

// Stop using the current lease. All future calls to leaseStatus on this
// node with the current lease will now return a PROSCRIBED status. This
// includes calls to leaseStatus from the closed timestamp side-transport,
// meaning that the following call to GetCurrentReadSummary is guaranteed to
// observe the highest closed timestamp published under this lease.
//
// We perform this action during evaluation to ensure that the lease
// revocation takes place regardless of whether the corresponding Raft
// proposal succeeds, fails, or is ambiguous - in which case there's no
// guarantee that the transfer will not still apply. This means that if the
// proposal fails, we'll have relinquished the current lease but not managed
// to give the lease to someone else, so we'll have to re-acquire the lease
// again through a RequestLease request to recover. This situation is tested
// in TestBehaviorDuringLeaseTransfer/transferSucceeds=false.
//
// NOTE: RevokeLease will be a no-op if the lease has already changed. In
// such cases, we could detect that here and fail fast, but it's safe and
// easier to just let the TransferLease be proposed under the wrong lease
// and be rejected with the correct error below Raft.
cArgs.EvalCtx.RevokeLease(ctx, args.PrevLease.Sequence)

// Collect a read summary from the outgoing leaseholder to ship to the
// incoming leaseholder. This is used to instruct the new leaseholder on how
// to update its timestamp cache to ensure that no future writes are allowed
// to invalidate prior reads.
priorReadSum := cArgs.EvalCtx.GetCurrentReadSummary()
priorReadSum, _ := cArgs.EvalCtx.GetCurrentReadSummary()
// For now, forward this summary to the proposed lease's start time. This
// may appear to undermine the benefit of the read summary, but it doesn't
// entirely. Until we ship higher-resolution read summaries, the read
Expand All @@ -94,12 +121,6 @@ func TransferLease(
// think about.
priorReadSum.Merge(rspb.FromTimestamp(newLease.Start.ToTimestamp()))

// If this check is removed at some point, the filtering of learners on the
// sending side would have to be removed as well.
if err := roachpb.CheckCanReceiveLease(newLease.Replica, cArgs.EvalCtx.Desc()); err != nil {
return newFailedLeaseTrigger(true /* isTransfer */), err
}

log.VEventf(ctx, 2, "lease transfer: prev lease: %+v, new lease: %+v", prevLease, newLease)
return evalNewLease(ctx, cArgs.EvalCtx, readWriter, cArgs.Stats,
newLease, prevLease, &priorReadSum, false /* isExtension */, true /* isTransfer */)
Expand Down
51 changes: 33 additions & 18 deletions pkg/kv/kvserver/batcheval/cmd_subsume.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,21 @@ func Subsume(
return result.Result{}, errors.AssertionFailedf("non-deletion intent on local range descriptor")
}

// NOTE: the deletion intent on the range's meta2 descriptor is just as
// important to correctness as the deletion intent on the local descriptor,
// but the check is too expensive as it would involve a network roundtrip on
// most nodes.

// Freeze the range. Do so by blocking all requests while a newly launched
// async goroutine watches (pushes with low priority) the merge transaction.
// This will also block the closed timestamp side-transport from closing new
// timestamps, meaning that the following call to GetCurrentReadSummary is
// guaranteed to observe the highest closed timestamp ever published by this
// range (if the merge eventually completes).
if err := cArgs.EvalCtx.WatchForMerge(ctx); err != nil {
return result.Result{}, errors.Wrap(err, "watching for merge during subsume")
}

// We prevent followers of the RHS from being able to serve follower reads on
// timestamps that fall in the timestamp window representing the range's
// subsumed state (i.e. between the subsumption time (FreezeStart) and the
Expand Down Expand Up @@ -142,33 +157,25 @@ func Subsume(
// is subsumed, we ensure that the initial MLAI update broadcast by the new
// leaseholder respects the invariant in question, in much the same way we do
// here. Take a look at `EmitMLAI()` in replica_closedts.go for more details.
//
// TODO(nvanbenschoten): remove this in v21.2 when the rest of the v1 closed
// timestamp system disappears.
_, untrack := cArgs.EvalCtx.GetTracker().Track(ctx)
lease, _ := cArgs.EvalCtx.GetLease()
lai := cArgs.EvalCtx.GetLeaseAppliedIndex()
untrack(ctx, ctpb.Epoch(lease.Epoch), desc.RangeID, ctpb.LAI(lai+1))

// NOTE: the deletion intent on the range's meta2 descriptor is just as
// important to correctness as the deletion intent on the local descriptor,
// but the check is too expensive as it would involve a network roundtrip on
// most nodes.

// Now that the range is frozen, collect some information to ship to the LHS
// leaseholder through the merge trigger.
reply.MVCCStats = cArgs.EvalCtx.GetMVCCStats()
reply.LeaseAppliedIndex = lai
reply.FreezeStart = cArgs.EvalCtx.Clock().NowAsClockTimestamp()
// FrozenClosedTimestamp might return an empty timestamp if the Raft-based
// closed timestamp transport hasn't been enabled yet. That's OK because, if
// the new transport is not enabled, then ranges with leading closed
// timestamps can't exist yet, and so the closed timestamp must be below the
// FreezeStart. The FreezeStart is used by Store.MergeRange to bump the RHS'
// ts cache if LHS/RHS leases are not collocated. The case when the leases are
// collocated also works out because then the closed timestamp (according to
// the old mechanism) is the same for both ranges being merged.
reply.ClosedTimestamp = cArgs.EvalCtx.GetFrozenClosedTimestamp()

// Collect a read summary from the RHS leaseholder to ship to the LHS
// leaseholder. This is used to instruct the LHS on how to update its
// timestamp cache to ensure that no future writes are allowed to invalidate
// prior reads performed to this point on the RHS range.
priorReadSum := cArgs.EvalCtx.GetCurrentReadSummary()
priorReadSum, closedTS := cArgs.EvalCtx.GetCurrentReadSummary()
// For now, forward this summary to the freeze time. This may appear to
// undermine the benefit of the read summary, but it doesn't entirely. Until
// we ship higher-resolution read summaries, the read summary doesn't
Expand All @@ -181,8 +188,16 @@ func Subsume(
// think about.
priorReadSum.Merge(rspb.FromTimestamp(reply.FreezeStart.ToTimestamp()))
reply.ReadSummary = &priorReadSum
// NOTE FOR v21.1: GetCurrentReadSummary might return an empty timestamp if
// the Raft-based closed timestamp transport hasn't been enabled yet. That's
// OK because, if the new transport is not enabled, then ranges with leading
// closed timestamps can't exist yet, and so the closed timestamp must be
// below the FreezeStart. The FreezeStart is used by Store.MergeRange to
// bump the RHS' ts cache if LHS/RHS leases are not collocated. The case
// when the leases are collocated also works out because then the closed
// timestamp (according to the old mechanism) is the same for both ranges
// being merged.
reply.ClosedTimestamp = closedTS

return result.Result{
Local: result.LocalResult{FreezeStart: reply.FreezeStart.ToTimestamp()},
}, nil
return result.Result{}, nil
}
33 changes: 26 additions & 7 deletions pkg/kv/kvserver/batcheval/eval_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,18 +97,33 @@ type EvalContext interface {
GetLastReplicaGCTimestamp(context.Context) (hlc.Timestamp, error)
GetLease() (roachpb.Lease, roachpb.Lease)
GetRangeInfo(context.Context) roachpb.RangeInfo
GetFrozenClosedTimestamp() hlc.Timestamp

// GetCurrentReadSummary returns a new ReadSummary reflecting all reads
// served by the range to this point. The method requires a write latch
// across all keys in the range (see declareAllKeys), because it will only
// return a meaningful summary if the caller has serialized with all other
// requests on the range.
GetCurrentReadSummary() rspb.ReadSummary
//
// The method also returns the current closed timestamp on the range. This
// closed timestamp is already incorporated into the read summary, but some
// callers also need is separated out. It is expected that a caller will
// have performed some action (either calling RevokeLease or WatchForMerge)
// to freeze further progression of the closed timestamp before calling this
// method.
GetCurrentReadSummary() (rspb.ReadSummary, hlc.Timestamp)

GetExternalStorage(ctx context.Context, dest roachpb.ExternalStorage) (cloud.ExternalStorage, error)
GetExternalStorageFromURI(ctx context.Context, uri string, user security.SQLUsername) (cloud.ExternalStorage,
error)

// RevokeLease stops the replica from using its current lease, if that lease
// matches the provided lease sequence. All future calls to leaseStatus on
// this node with the current lease will now return a PROSCRIBED status.
RevokeLease(context.Context, roachpb.LeaseSequence)

// WatchForMerge arranges to block all requests until the in-progress merge
// completes. Returns an error if no in-progress merge is detected.
WatchForMerge(ctx context.Context) error
}

// MockEvalCtx is a dummy implementation of EvalContext for testing purposes.
Expand All @@ -126,6 +141,7 @@ type MockEvalCtx struct {
CanCreateTxn func() (bool, hlc.Timestamp, roachpb.TransactionAbortedReason)
Lease roachpb.Lease
CurrentReadSummary rspb.ReadSummary
RevokedLeaseSeq roachpb.LeaseSequence
}

// EvalContext returns the MockEvalCtx as an EvalContext. It will reflect future
Expand Down Expand Up @@ -194,9 +210,6 @@ func (m *mockEvalCtxImpl) GetLeaseAppliedIndex() uint64 {
func (m *mockEvalCtxImpl) GetTracker() closedts.TrackerI {
panic("unimplemented")
}
func (m *mockEvalCtxImpl) GetFrozenClosedTimestamp() hlc.Timestamp {
panic("unimplemented")
}
func (m *mockEvalCtxImpl) Desc() *roachpb.RangeDescriptor {
return m.MockEvalCtx.Desc
}
Expand Down Expand Up @@ -226,8 +239,8 @@ func (m *mockEvalCtxImpl) GetLease() (roachpb.Lease, roachpb.Lease) {
func (m *mockEvalCtxImpl) GetRangeInfo(ctx context.Context) roachpb.RangeInfo {
return roachpb.RangeInfo{Desc: *m.Desc(), Lease: m.Lease}
}
func (m *mockEvalCtxImpl) GetCurrentReadSummary() rspb.ReadSummary {
return m.CurrentReadSummary
func (m *mockEvalCtxImpl) GetCurrentReadSummary() (rspb.ReadSummary, hlc.Timestamp) {
return m.CurrentReadSummary, hlc.Timestamp{}
}
func (m *mockEvalCtxImpl) GetExternalStorage(
ctx context.Context, dest roachpb.ExternalStorage,
Expand All @@ -239,3 +252,9 @@ func (m *mockEvalCtxImpl) GetExternalStorageFromURI(
) (cloud.ExternalStorage, error) {
panic("unimplemented")
}
func (m *mockEvalCtxImpl) RevokeLease(_ context.Context, seq roachpb.LeaseSequence) {
m.RevokedLeaseSeq = seq
}
func (m *mockEvalCtxImpl) WatchForMerge(ctx context.Context) error {
panic("unimplemented")
}
1 change: 0 additions & 1 deletion pkg/kv/kvserver/batcheval/result/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ go_library(
"//pkg/kv/kvserver/concurrency/lock",
"//pkg/kv/kvserver/kvserverpb",
"//pkg/roachpb",
"//pkg/util/hlc",
"//pkg/util/log",
"@com_github_cockroachdb_errors//:errors",
"@com_github_kr_pretty//:pretty",
Expand Down
17 changes: 2 additions & 15 deletions pkg/kv/kvserver/batcheval/result/result.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import (

"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/errors"
"github.com/kr/pretty"
Expand Down Expand Up @@ -64,10 +63,6 @@ type LocalResult struct {
MaybeAddToSplitQueue bool
// Call MaybeGossipNodeLiveness with the specified Span, if set.
MaybeGossipNodeLiveness *roachpb.Span
// FreezeStart indicates the high water mark timestamp beyond which the range
// is guaranteed to not have served any requests. This value is only set when
// a range merge is in progress. If set, call maybeWatchForMerge.
FreezeStart hlc.Timestamp

// Metrics contains counters which are to be passed to the
// metrics subsystem.
Expand All @@ -87,7 +82,6 @@ func (lResult *LocalResult) IsZero() bool {
!lResult.MaybeGossipSystemConfig &&
!lResult.MaybeGossipSystemConfigIfHaveFailure &&
lResult.MaybeGossipNodeLiveness == nil &&
lResult.FreezeStart.IsEmpty() &&
lResult.Metrics == nil
}

Expand All @@ -100,13 +94,13 @@ func (lResult *LocalResult) String() string {
"#updated txns: %d #end txns: %d, "+
"GossipFirstRange:%t MaybeGossipSystemConfig:%t "+
"MaybeGossipSystemConfigIfHaveFailure:%t MaybeAddToSplitQueue:%t "+
"MaybeGossipNodeLiveness:%s FreezeStart:%s",
"MaybeGossipNodeLiveness:%s ",
lResult.Reply,
len(lResult.EncounteredIntents), len(lResult.AcquiredLocks), len(lResult.ResolvedLocks),
len(lResult.UpdatedTxns), len(lResult.EndTxns),
lResult.GossipFirstRange, lResult.MaybeGossipSystemConfig,
lResult.MaybeGossipSystemConfigIfHaveFailure, lResult.MaybeAddToSplitQueue,
lResult.MaybeGossipNodeLiveness, lResult.FreezeStart)
lResult.MaybeGossipNodeLiveness)
}

// DetachEncounteredIntents returns (and removes) those encountered
Expand Down Expand Up @@ -343,13 +337,6 @@ func (p *Result) MergeAndDestroy(q Result) error {
}
q.Local.MaybeGossipNodeLiveness = nil

if p.Local.FreezeStart.IsEmpty() {
p.Local.FreezeStart = q.Local.FreezeStart
} else if !q.Local.FreezeStart.IsEmpty() {
return errors.AssertionFailedf("conflicting FreezeStart")
}
q.Local.FreezeStart = hlc.Timestamp{}

coalesceBool(&p.Local.GossipFirstRange, &q.Local.GossipFirstRange)
coalesceBool(&p.Local.MaybeGossipSystemConfig, &q.Local.MaybeGossipSystemConfig)
coalesceBool(&p.Local.MaybeGossipSystemConfigIfHaveFailure, &q.Local.MaybeGossipSystemConfigIfHaveFailure)
Expand Down
Loading

0 comments on commit 3fe1992

Please sign in to comment.