Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
72978: kvserver: simplify requestCanProceed and leaseGoodToGo r=nvanbenschoten a=tbg

I found myself having to make a change to this method in cockroachdb#72972 and was
really unsure about when this returned a nonzero lease status and
whether that even mattered. Also, the code was hard to grok due to
the large number of "fallthrough" if-else statements.

Now it follows a standard pattern of checking and immediately returning
errors as early as possible, and prefers early returns for the success
cases as well.

The method now clearly states what its contract (now) is: it will either
return a valid lease status and no error (leaseholder read), or a zero
lease status and no error (follower read or lease check skipped), or a
zero lease status and an error (can't serve request, for example invalid
lease, out of bounds, or pending merge, etc). Leases previously returned
a half-populated lease status up for the purpose of communicating the
sequence to use during proposal. However, it was cleaner to return a
zero status and to repurpose the existing special casing for leases
in `evalAndPropose` path to pull the sequence from the lease request.

I checked the callers to make sure that none of the now absent lease
statuses introduce a behavior change. The TL;DR is that callers always
ignored all but the valid lease status.

----

The simplest caller is `executeAdminBatch` which doesn't even assign
the returned lease status:

https://github.com/cockroachdb/cockroach/blob/55720d0a4c0dd8030cd19645461226d173eefc2e/pkg/kv/kvserver/replica_send.go#L759-L775

The second caller is `executeReadOnlyBatch`:

https://github.com/cockroachdb/cockroach/blob/86e2edece61091944d8575336b5de74fafc28b35/pkg/kv/kvserver/replica_read.go#L42-L46

`st` is used only if `err == nil`, in two places:

https://github.com/cockroachdb/cockroach/blob/64aadfec4b3877371b6f71bf5f9aa83bd97aa515/pkg/kv/kvserver/observedts/limit.go#L42-L50

This will return early in case of a zero lease status, i.e. is a no-op
in this case.

The second use is storing `st` into the `endCmds`:

https://github.com/cockroachdb/cockroach/blob/86e2edece61091944d8575336b5de74fafc28b35/pkg/kv/kvserver/replica_read.go#L152

which is accessed only under a similar lease validity check:

https://github.com/cockroachdb/cockroach/blob/55720d0a4c0dd8030cd19645461226d173eefc2e/pkg/kv/kvserver/replica_send.go#L1019-L1021

The third caller is `executeWriteBatch`:

https://github.com/cockroachdb/cockroach/blob/455cdddc6d75c03645f486b22970e5c6198a8d56/pkg/kv/kvserver/replica_write.go#L81-L89

We handled `ComputeLocalUncertaintyLimit` already. Other than that, `st`
is passed to `evalAndPropose`. In that method, since it is the write
path, we shouldn't see follower reads and so the remaining case are
leases, where the status will be zero and the special-casing in that
method takes over.

----

This commit also simplifies leaseGoodToGoRLocked. It was repeatedly
reassigning `status` which I found hard to follow.

Release note: None


73095: sql: reset indexedTypeFormatter on FmtCtx close r=RichardJCai a=RichardJCai

Previously indexedTypeFormatter wouldn't be reset when
FmtCtx.Close() was called causing the indexedTypeFormatter
function to potentially be reused.

Release note (enterprise change): Fix a bug where restore can sometimes map OIDs
to invalid types in certain circumstances containing UDTs.

Co-authored-by: Tobias Grieger <[email protected]>
Co-authored-by: richardjcai <[email protected]>
  • Loading branch information
3 people committed Nov 24, 2021
3 parents 51ae74a + 7cbdb8c + d0e09f1 commit 0331536
Show file tree
Hide file tree
Showing 5 changed files with 143 additions and 80 deletions.
128 changes: 85 additions & 43 deletions pkg/kv/kvserver/replica.go
Original file line number Diff line number Diff line change
Expand Up @@ -1273,8 +1273,11 @@ func (r *Replica) assertStateRaftMuLockedReplicaMuRLocked(
// checkExecutionCanProceed returns an error if a batch request cannot be
// executed by the Replica. An error indicates that the Replica is not live and
// able to serve traffic or that the request is not compatible with the state of
// the Range due to the range's key bounds, the range's lease, or the ranges GC
// threshold.
// the Range due to the range's key bounds, the range's lease, the range's GC
// threshold, or due to a pending merge. On success, returns nil and either a
// zero LeaseStatus (indicating that the request was permitted to skip the lease
// checks) or a LeaseStatus in LeaseState_VALID (indicating that the Replica is
// the leaseholder and able to serve this request).
//
// The method accepts a concurrency Guard, which is used to indicate whether the
// caller has acquired latches. When this condition is false, the batch request
Expand All @@ -1284,21 +1287,18 @@ func (r *Replica) assertStateRaftMuLockedReplicaMuRLocked(
func (r *Replica) checkExecutionCanProceed(
ctx context.Context, ba *roachpb.BatchRequest, g *concurrency.Guard,
) (kvserverpb.LeaseStatus, error) {
var st kvserverpb.LeaseStatus
var shouldExtend bool
defer func() {
if shouldExtend {
r.maybeExtendLeaseAsync(ctx, st)
}
}()
now := r.Clock().NowAsClockTimestamp()
rSpan, err := keys.Range(ba.Requests)
if err != nil {
return kvserverpb.LeaseStatus{}, err
}

var shouldExtend bool
postRUnlock := func() {}
r.mu.RLock()
defer r.mu.RUnlock()
defer func() {
r.mu.RUnlock()
postRUnlock()
}()

// Has the replica been initialized?
// NB: this should have already been checked in Store.Send, so we don't need
Expand All @@ -1323,41 +1323,16 @@ func (r *Replica) checkExecutionCanProceed(
return kvserverpb.LeaseStatus{}, err
}

// Is the lease valid?
if ba.ReadConsistency == roachpb.INCONSISTENT {
// For INCONSISTENT requests, we don't need the lease.
st = kvserverpb.LeaseStatus{
Now: now,
}
} else if ba.IsSingleSkipLeaseCheckRequest() {
// For lease commands, use the provided previous lease for verification.
st = kvserverpb.LeaseStatus{
Lease: ba.GetPrevLeaseForLeaseRequest(),
Now: now,
}
} else {
// If the request is a write or a consistent read, it requires the
// replica serving it to hold the range lease. We pass the write
// timestamp of the request because this is the maximum timestamp that
// the request will operate at, ignoring the uncertainty interval, which
// is already accounted for in LeaseStatus's stasis period handling.
st, shouldExtend, err = r.leaseGoodToGoRLocked(ctx, now, ba.WriteTimestamp())
if err != nil {
// If not, can we serve this request on a follower?
if !r.canServeFollowerReadRLocked(ctx, ba) {
return st, err
}
err = nil // ignore error
st = kvserverpb.LeaseStatus{} // already empty for follower reads, but be explicit
}
}

// Is the request below the GC threshold?
if err := r.checkTSAboveGCThresholdRLocked(ba.EarliestActiveTimestamp(), st, ba.IsAdmin()); err != nil {
st, shouldExtend, err := r.checkGCThresholdAndLeaseRLocked(ctx, ba)
if err != nil {
return kvserverpb.LeaseStatus{}, err
}

// Is there a merge in progress?
// Is there a merge in progress? We intentionally check this last to let requests error out
// for other reasons first, in case callers don't require this replica to service the request.
// Tests such as TestClosedTimestampFrozenAfterSubsumption also rely on this late-checking of
// merges by checking for a NotLeaseholderError on replicas in a critical phase for certain
// requests.
if r.mergeInProgressRLocked() && g.HoldingLatches() {
// We only check for a merge if we are holding latches. In practice,
// this means that any request where concurrency.shouldAcquireLatches()
Expand All @@ -1372,9 +1347,76 @@ func (r *Replica) checkExecutionCanProceed(
}
}

if shouldExtend {
// If we're asked to extend the lease, trigger (async) lease renewal.
// Kicking this off requires an exclusive lock, and we hold a read-only lock
// already, so we jump through a hoop to run it in a suitably positioned
// defer.
postRUnlock = func() { r.maybeExtendLeaseAsync(ctx, st) }
}
return st, nil
}

// checkGCThresholdAndLeaseRLocked checks the provided batch against the GC
// threshold and lease. A nil error indicates to go ahead with the batch, and
// is accompanied either by a valid or zero lease status, the latter case
// indicating that the request was permitted to bypass the lease check. The
// returned bool indicates whether the lease should be extended (only on nil
// error).
func (r *Replica) checkGCThresholdAndLeaseRLocked(
ctx context.Context, ba *roachpb.BatchRequest,
) (kvserverpb.LeaseStatus, bool, error) {
now := r.Clock().NowAsClockTimestamp()
// If the request is a write or a consistent read, it requires the
// replica serving it to hold the range lease. We pass the write
// timestamp of the request because this is the maximum timestamp that
// the request will operate at, ignoring the uncertainty interval, which
// is already accounted for in LeaseStatus's stasis period handling.
// For INCONSISTENT requests (which are always pure reads), this coincides
// with the read timestamp.
reqTS := ba.WriteTimestamp()
st := r.leaseStatusForRequestRLocked(ctx, now, reqTS)

var shouldExtend bool
// Write commands that skip the lease check in practice are exactly
// RequestLease and TransferLease. Both use the provided previous lease for
// verification below raft. We return a zero lease status from this method and
// task evalAndPropose with pulling the correct lease sequence number from the
// lease request.
//
// If the request is an INCONSISTENT request (and thus a read), it similarly
// doesn't check the lease.
leaseChecked := !ba.IsSingleSkipLeaseCheckRequest() && ba.ReadConsistency != roachpb.INCONSISTENT
if leaseChecked {
// Now check the lease.
var err error
shouldExtend, err = r.leaseGoodToGoForStatusRLocked(ctx, now, reqTS, st)
if err != nil {
// No valid lease, but if we can serve this request via follower reads,
// we may continue.
if !r.canServeFollowerReadRLocked(ctx, ba) {
// If not, return the error.
return kvserverpb.LeaseStatus{}, false, err
}
// Otherwise, suppress the error. Also, remember that we're not serving
// this under the lease.
leaseChecked, err = false, nil
}
}

// Check if request is below the GC threshold and if so, error out. Note that
// this uses the lease status no matter whether it's valid or not, and the
// method is set up to handle that.
if err := r.checkTSAboveGCThresholdRLocked(ba.EarliestActiveTimestamp(), st, ba.IsAdmin()); err != nil {
return kvserverpb.LeaseStatus{}, false, err
}

if !leaseChecked {
return kvserverpb.LeaseStatus{}, false, nil
}
return st, shouldExtend, nil
}

// checkExecutionCanProceedForRangeFeed returns an error if a rangefeed request
// cannot be executed by the Replica.
func (r *Replica) checkExecutionCanProceedForRangeFeed(
Expand Down
17 changes: 13 additions & 4 deletions pkg/kv/kvserver/replica_raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,11 +168,20 @@ func (r *Replica) evalAndPropose(
// Continue with proposal...
}

// Attach information about the proposer to the command.
proposal.command.ProposerLeaseSequence = st.Lease.Sequence
// Perform a sanity check that the lease is owned by this replica.
if !st.Lease.OwnedBy(r.store.StoreID()) && !ba.IsLeaseRequest() {
// Attach information about the proposer's lease to the command, for
// verification below raft. Lease requests are special since they are not
// necessarily proposed under a valid lease (by necessity). Instead, they
// reference the previous lease. Note that TransferLease also skip lease
// checks (for technical reasons, see `TransferLease.flags`) and uses the
// same mechanism.
if ba.IsSingleSkipLeaseCheckRequest() {
proposal.command.ProposerLeaseSequence = ba.GetPrevLeaseForLeaseRequest().Sequence
} else if !st.Lease.OwnedBy(r.store.StoreID()) {
// Perform a sanity check that the lease is owned by this replica. This must
// have been ascertained by the callers in checkExecutionCanProceed.
log.Fatalf(ctx, "cannot propose %s on follower with remotely owned lease %s", ba, st.Lease)
} else {
proposal.command.ProposerLeaseSequence = st.Lease.Sequence
}

// Once a command is written to the raft log, it must be loaded into memory
Expand Down
63 changes: 37 additions & 26 deletions pkg/kv/kvserver/replica_range_lease.go
Original file line number Diff line number Diff line change
Expand Up @@ -987,15 +987,24 @@ func (r *Replica) checkRequestTimeRLocked(now hlc.ClockTimestamp, reqTS hlc.Time
func (r *Replica) leaseGoodToGoRLocked(
ctx context.Context, now hlc.ClockTimestamp, reqTS hlc.Timestamp,
) (_ kvserverpb.LeaseStatus, shouldExtend bool, _ error) {
if err := r.checkRequestTimeRLocked(now, reqTS); err != nil {
// Case (1): invalid request.
st := r.leaseStatusForRequestRLocked(ctx, now, reqTS)
shouldExtend, err := r.leaseGoodToGoForStatusRLocked(ctx, now, reqTS, st)
if err != nil {
return kvserverpb.LeaseStatus{}, false, err
}
return st, shouldExtend, err
}

st := r.leaseStatusForRequestRLocked(ctx, now, reqTS)
func (r *Replica) leaseGoodToGoForStatusRLocked(
ctx context.Context, now hlc.ClockTimestamp, reqTS hlc.Timestamp, st kvserverpb.LeaseStatus,
) (shouldExtend bool, _ error) {
if err := r.checkRequestTimeRLocked(now, reqTS); err != nil {
// Case (1): invalid request.
return false, err
}
if !st.IsValid() {
// Case (2): invalid lease.
return kvserverpb.LeaseStatus{}, false, &roachpb.InvalidLeaseError{}
return false, &roachpb.InvalidLeaseError{}
}
if !st.Lease.OwnedBy(r.store.StoreID()) {
// Case (3): not leaseholder.
Expand Down Expand Up @@ -1039,12 +1048,12 @@ func (r *Replica) leaseGoodToGoRLocked(
}
// Otherwise, if the lease is currently held by another replica, redirect
// to the holder.
return kvserverpb.LeaseStatus{}, false, newNotLeaseHolderError(
return false, newNotLeaseHolderError(
st.Lease, r.store.StoreID(), r.descRLocked(), "lease held by different store",
)
}
// Case (4): all good.
return st, r.shouldExtendLeaseRLocked(st), nil
return r.shouldExtendLeaseRLocked(st), nil
}

// leaseGoodToGo is like leaseGoodToGoRLocked, but will acquire the replica read
Expand Down Expand Up @@ -1094,21 +1103,23 @@ func (r *Replica) redirectOnOrAcquireLeaseForRequest(
) (kvserverpb.LeaseStatus, *roachpb.Error) {
// Try fast-path.
now := r.store.Clock().NowAsClockTimestamp()
status, shouldExtend, err := r.leaseGoodToGo(ctx, now, reqTS)
if err == nil {
if shouldExtend {
r.maybeExtendLeaseAsync(ctx, status)
{
status, shouldExtend, err := r.leaseGoodToGo(ctx, now, reqTS)
if err == nil {
if shouldExtend {
r.maybeExtendLeaseAsync(ctx, status)
}
return status, nil
} else if !errors.HasType(err, (*roachpb.InvalidLeaseError)(nil)) {
return kvserverpb.LeaseStatus{}, roachpb.NewError(err)
}
return status, nil
} else if !errors.HasType(err, (*roachpb.InvalidLeaseError)(nil)) {
return kvserverpb.LeaseStatus{}, roachpb.NewError(err)
}

// Loop until the lease is held or the replica ascertains the actual lease
// holder. Returns also on context.Done() (timeout or cancellation).
for attempt := 1; ; attempt++ {
now = r.store.Clock().NowAsClockTimestamp()
llHandle, transfer, pErr := func() (*leaseRequestHandle, bool, *roachpb.Error) {
llHandle, status, transfer, pErr := func() (*leaseRequestHandle, kvserverpb.LeaseStatus, bool, *roachpb.Error) {
r.mu.Lock()
defer r.mu.Unlock()

Expand All @@ -1119,13 +1130,13 @@ func (r *Replica) redirectOnOrAcquireLeaseForRequest(
// unsuccessfully before redirecting or retrying.
repDesc, err := r.getReplicaDescriptorRLocked()
if err != nil {
return nil, false, roachpb.NewError(err)
return nil, kvserverpb.LeaseStatus{}, false, roachpb.NewError(err)
}
if ok := r.mu.pendingLeaseRequest.TransferInProgress(repDesc.ReplicaID); ok {
return r.mu.pendingLeaseRequest.JoinRequest(), true /* transfer */, nil
return r.mu.pendingLeaseRequest.JoinRequest(), kvserverpb.LeaseStatus{}, true /* transfer */, nil
}

status = r.leaseStatusForRequestRLocked(ctx, now, reqTS)
status := r.leaseStatusForRequestRLocked(ctx, now, reqTS)
switch status.State {
case kvserverpb.LeaseState_ERROR:
// Lease state couldn't be determined.
Expand All @@ -1134,7 +1145,7 @@ func (r *Replica) redirectOnOrAcquireLeaseForRequest(
msg = "lease state could not be determined"
}
log.VEventf(ctx, 2, "%s", msg)
return nil, false, roachpb.NewError(
return nil, kvserverpb.LeaseStatus{}, false, roachpb.NewError(
newNotLeaseHolderError(roachpb.Lease{}, r.store.StoreID(), r.mu.state.Desc, msg))

case kvserverpb.LeaseState_VALID, kvserverpb.LeaseState_UNUSABLE:
Expand All @@ -1147,39 +1158,39 @@ func (r *Replica) redirectOnOrAcquireLeaseForRequest(
}
// Otherwise, if the lease is currently held by another replica, redirect
// to the holder.
return nil, false, roachpb.NewError(
return nil, kvserverpb.LeaseStatus{}, false, roachpb.NewError(
newNotLeaseHolderError(status.Lease, r.store.StoreID(), r.mu.state.Desc,
"lease held by different store"))
}

// If the lease is in stasis, we can't serve requests until we've
// renewed the lease, so we return the handle to block on renewal.
if status.State == kvserverpb.LeaseState_UNUSABLE {
return r.requestLeaseLocked(ctx, status), false, nil
return r.requestLeaseLocked(ctx, status), kvserverpb.LeaseStatus{}, false, nil
}

// Return a nil handle to signal that we have a valid lease.
return nil, false, nil
// Return a nil handle and status to signal that we have a valid lease.
return nil, status, false, nil

case kvserverpb.LeaseState_EXPIRED:
// No active lease: Request renewal if a renewal is not already pending.
log.VEventf(ctx, 2, "request range lease (attempt #%d)", attempt)
return r.requestLeaseLocked(ctx, status), false, nil
return r.requestLeaseLocked(ctx, status), kvserverpb.LeaseStatus{}, false, nil

case kvserverpb.LeaseState_PROSCRIBED:
// Lease proposed timestamp is earlier than the min proposed
// timestamp limit this replica must observe. If this store
// owns the lease, re-request. Otherwise, redirect.
if status.Lease.OwnedBy(r.store.StoreID()) {
log.VEventf(ctx, 2, "request range lease (attempt #%d)", attempt)
return r.requestLeaseLocked(ctx, status), false, nil
return r.requestLeaseLocked(ctx, status), kvserverpb.LeaseStatus{}, false, nil
}
// If lease is currently held by another, redirect to holder.
return nil, false, roachpb.NewError(
return nil, kvserverpb.LeaseStatus{}, false, roachpb.NewError(
newNotLeaseHolderError(status.Lease, r.store.StoreID(), r.mu.state.Desc, "lease proscribed"))

default:
return nil, false, roachpb.NewErrorf("unknown lease status state %v", status)
return nil, kvserverpb.LeaseStatus{}, false, roachpb.NewErrorf("unknown lease status state %v", status)
}
}()
if pErr != nil {
Expand Down
6 changes: 5 additions & 1 deletion pkg/kv/kvserver/replica_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -509,8 +509,12 @@ func sendLeaseRequest(r *Replica, l *roachpb.Lease) error {
ctx := context.Background()
ba := roachpb.BatchRequest{}
ba.Timestamp = r.store.Clock().Now()
ba.Add(&roachpb.RequestLeaseRequest{Lease: *l})
st := r.CurrentLeaseStatus(ctx)
leaseReq := &roachpb.RequestLeaseRequest{
Lease: *l,
PrevLease: st.Lease,
}
ba.Add(leaseReq)
_, tok := r.mu.proposalBuf.TrackEvaluatingRequest(ctx, hlc.MinTimestamp)
ch, _, _, pErr := r.evalAndPropose(ctx, &ba, allSpansGuard(), st, hlc.Timestamp{}, tok.Move(ctx))
if pErr == nil {
Expand Down
9 changes: 3 additions & 6 deletions pkg/sql/sem/tree/format.go
Original file line number Diff line number Diff line change
Expand Up @@ -625,12 +625,9 @@ var fmtCtxPool = sync.Pool{
// recommended for performance-sensitive paths.
func (ctx *FmtCtx) Close() {
ctx.Buffer.Reset()
ctx.flags = 0
ctx.ann = nil
ctx.indexedVarFormat = nil
ctx.tableNameFormatter = nil
ctx.placeholderFormat = nil
ctx.dataConversionConfig = sessiondatapb.DataConversionConfig{}
*ctx = FmtCtx{
Buffer: ctx.Buffer,
}
fmtCtxPool.Put(ctx)
}

Expand Down

0 comments on commit 0331536

Please sign in to comment.