Merge #98460
98460: kvserver: don't cancel lease requests when callers cancel r=erikgrinaker a=erikgrinaker

Previously, asynchronous lease requests would be cancelled when all callers waiting for the request were cancelled. This could cause lease acquisitions to persistently fail in the face of IO delays if all callers timed out.

Furthermore, requests to ranges with expiration-based leases would trigger an asynchronous lease extension via
`maybeExtendLeaseAsyncLocked()` in the latter half of the lease interval, but because the originating request was likely to complete soon, this could end up cancelling the lease request before it completed.
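
For illustration, here is a minimal sketch of the pattern (assumed names such as `sharedRequest`, `join`, and `cancelWait`; not the actual kvserver code): callers coalesce onto a shared request by registering result channels, while the work itself runs on a detached `context.Background()`, so cancelling every waiter leaves the request running.

```go
// Sketch only: assumed types and names, not the CockroachDB implementation.
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

type sharedRequest struct {
	mu      sync.Mutex
	waiters map[chan error]struct{}
	pending bool
}

func newSharedRequest() *sharedRequest {
	return &sharedRequest{waiters: make(map[chan error]struct{})}
}

// join registers a waiter and starts the shared work if none is in flight.
// The work gets a background context, detached from the callers' contexts.
func (s *sharedRequest) join(work func(context.Context) error) chan error {
	s.mu.Lock()
	defer s.mu.Unlock()
	c := make(chan error, 1)
	s.waiters[c] = struct{}{}
	if !s.pending {
		s.pending = true
		go func() {
			err := work(context.Background()) // runs to completion regardless of callers
			s.mu.Lock()
			defer s.mu.Unlock()
			for w := range s.waiters {
				w <- err // buffered, never blocks
				delete(s.waiters, w)
			}
			s.pending = false
		}()
	}
	return c
}

// cancelWait detaches a waiter without cancelling the shared work.
func (s *sharedRequest) cancelWait(c chan error) {
	s.mu.Lock()
	defer s.mu.Unlock()
	delete(s.waiters, c)
}

func main() {
	s := newSharedRequest()
	c := s.join(func(ctx context.Context) error {
		time.Sleep(100 * time.Millisecond) // e.g. a slow lease acquisition
		return nil
	})
	s.cancelWait(c)                    // every caller goes away...
	time.Sleep(200 * time.Millisecond) // ...but the shared work still finishes
	fmt.Println("shared request completed despite caller cancellation")
}
```

In the actual change, `requestLeaseAsync` gets the same effect by deriving its context from `p.repl.AnnotateCtx(context.Background())` and dropping the caller-driven `cancelLocked` plumbing from `pendingLeaseRequest`.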

Resolves #98453.

Epic: none
Release note: None

Co-authored-by: Erik Grinaker <[email protected]>
craig[bot] and erikgrinaker committed Mar 14, 2023
2 parents 43f7cde + 81471b6 commit 0c6ccaf
Showing 2 changed files with 94 additions and 46 deletions.
60 changes: 14 additions & 46 deletions pkg/kv/kvserver/replica_range_lease.go
@@ -95,19 +95,17 @@ func (h *leaseRequestHandle) C() <-chan *kvpb.Error {
return h.c
}

// Cancel cancels the request handle. It also cancels the asynchronous
// lease request task if its reference count drops to zero.
// Cancel cancels the request handle. The asynchronous lease request will
// continue until it completes, to ensure leases can be acquired even if the
// client goes away (in particular in the face of IO delays which may trigger
// client timeouts).
func (h *leaseRequestHandle) Cancel() {
h.p.repl.mu.Lock()
defer h.p.repl.mu.Unlock()
if len(h.c) == 0 {
// Our lease request is ongoing...
// Unregister handle.
delete(h.p.llHandles, h)
// Cancel request, if necessary.
if len(h.p.llHandles) == 0 {
h.p.cancelLocked()
}
}
// Mark handle as canceled.
h.c = nil
@@ -141,13 +139,6 @@ type pendingLeaseRequest struct {
// Set of request handles attached to the lease acquisition.
// All accesses require repl.mu to be exclusively locked.
llHandles map[*leaseRequestHandle]struct{}
// cancelLocked is a context cancellation function for the async lease
// request, if one exists. It cancels an ongoing lease request and cleans up
// the requests state, including setting the cancelLocked function itself to
// nil. It will be called when a lease request is canceled because all
// handles cancel or when a lease request completes. If nil, then no request
// is in progress. repl.mu to be exclusively locked to call the function.
cancelLocked func()
// nextLease is the pending RequestLease request, if any. It can be used to
// figure out if we're in the process of extending our own lease, or
// transferring it to another replica.
@@ -166,11 +157,7 @@ func makePendingLeaseRequest(repl *Replica) pendingLeaseRequest {
//
// Requires repl.mu is read locked.
func (p *pendingLeaseRequest) RequestPending() (roachpb.Lease, bool) {
pending := p.cancelLocked != nil
if pending {
return p.nextLease, true
}
return roachpb.Lease{}, false
return p.nextLease, p.nextLease != roachpb.Lease{}
}

// InitOrJoinRequest executes a RequestLease command asynchronously and returns a
@@ -349,10 +336,9 @@ func (p *pendingLeaseRequest) requestLeaseAsync(
status kvserverpb.LeaseStatus,
leaseReq kvpb.Request,
) error {
// Create a new context. We multiplex the cancellation of all contexts onto
// this new one, canceling it if all coalesced requests timeout/cancel.
// p.cancelLocked (defined below) is the cancel function that must be called;
// calling just cancel is insufficient.
// Create a new context. We run the request to completion even if all callers
// go away, to ensure leases can be acquired e.g. in the face of IO delays
// which may trigger client timeouts.
ctx := p.repl.AnnotateCtx(context.Background())

const opName = "request range lease"
@@ -374,16 +360,6 @@ func (p *pendingLeaseRequest) requestLeaseAsync(
ctx, sp = tr.StartSpanCtx(ctx, opName, tagsOpt)
}

ctx, cancel := context.WithCancel(ctx)

// Make sure we clean up the context and request state. This will be called
// either when the request completes cleanly or when it is terminated early.
p.cancelLocked = func() {
cancel()
p.cancelLocked = nil
p.nextLease = roachpb.Lease{}
}

err := p.repl.store.Stopper().RunAsyncTaskEx(
ctx,
stop.TaskOpts{
@@ -410,12 +386,6 @@ func (p *pendingLeaseRequest) requestLeaseAsync(
// extend the existing lease in the future.
p.repl.mu.Lock()
defer p.repl.mu.Unlock()
if ctx.Err() != nil {
// We were canceled and this request was already cleaned up
// under lock. At this point, another async request could be
// active so we don't want to do anything else.
return
}
// Send result of lease to all waiter channels and cleanup request.
for llHandle := range p.llHandles {
// Don't send the same transaction object twice; this can lead to races.
@@ -429,10 +399,10 @@ func (p *pendingLeaseRequest) requestLeaseAsync(
}
delete(p.llHandles, llHandle)
}
p.cancelLocked()
p.nextLease = roachpb.Lease{}
})
if err != nil {
p.cancelLocked()
p.nextLease = roachpb.Lease{}
sp.Finish()
return err
}
@@ -1444,12 +1414,10 @@ func (r *Replica) maybeExtendLeaseAsyncLocked(ctx context.Context, st kvserverpb
if log.ExpensiveLogEnabled(ctx, 2) {
log.Infof(ctx, "extending lease %s at %s", st.Lease, st.Now)
}
// We explicitly ignore the returned handle as we won't block on it.
//
// TODO(tbg): this ctx is likely cancelled very soon, which will in turn
// cancel the lease acquisition (unless joined by another more long-lived
// ctx). So this possibly isn't working as advertised (which only plays a role
// for expiration-based leases, at least).
// We explicitly ignore the returned handle as we won't block on it. This
// context will likely be cancelled soon (when the originating request
// completes), but the lease request will continue to completion independently
// of the caller's context.
_ = r.requestLeaseLocked(ctx, st)
}

80 changes: 80 additions & 0 deletions pkg/kv/kvserver/replica_test.go
@@ -2001,6 +2001,86 @@ func TestLeaseConcurrent(t *testing.T) {
})
}

// TestLeaseCallerCancelled tests that lease requests continue to completion
// even when all callers have cancelled.
func TestLeaseCallerCancelled(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

const num = 5

ctx := context.Background()
stopper := stop.NewStopper()
defer stopper.Stop(ctx)

var active, seen int32
var wg sync.WaitGroup
errC := make(chan error, 1)

tc := testContext{manualClock: timeutil.NewManualTime(timeutil.Unix(0, 123))}
cfg := TestStoreConfig(hlc.NewClockForTesting(tc.manualClock))
// Disable reasonNewLeader and reasonNewLeaderOrConfigChange proposal
// refreshes so that our lease proposal does not risk being rejected
// with an AmbiguousResultError.
cfg.TestingKnobs.DisableRefreshReasonNewLeader = true
cfg.TestingKnobs.DisableRefreshReasonNewLeaderOrConfigChange = true
cfg.TestingKnobs.TestingProposalFilter = func(args kvserverbase.ProposalFilterArgs) *kvpb.Error {
ll, ok := args.Req.Requests[0].GetInner().(*kvpb.RequestLeaseRequest)
if !ok || atomic.LoadInt32(&active) == 0 {
return nil
}
if c := atomic.AddInt32(&seen, 1); c > 1 {
// Morally speaking, this is an error, but reproposals can happen and so
// we warn (in case this trips the test up in more unexpected ways).
t.Logf("reproposal of %+v", ll)
}
// Wait for all lease requests to join the same LeaseRequest command
// and cancel.
wg.Wait()

// The lease request's context should not be cancelled. Give it a second to
// make sure, then propagate ctx.Err() (which should be nil) to the main test.
select {
case <-args.Ctx.Done():
case <-time.After(time.Second):
}
select {
case errC <- args.Ctx.Err():
default:
}
return nil
}
tc.StartWithStoreConfig(ctx, t, stopper, cfg)

atomic.StoreInt32(&active, 1)
tc.manualClock.MustAdvanceTo(leaseExpiry(tc.repl))
now := tc.Clock().NowAsClockTimestamp()
var llHandles []*leaseRequestHandle
for i := 0; i < num; i++ {
wg.Add(1)
tc.repl.mu.Lock()
status := tc.repl.leaseStatusAtRLocked(ctx, now)
llHandles = append(llHandles, tc.repl.requestLeaseLocked(ctx, status))
tc.repl.mu.Unlock()
}
for _, llHandle := range llHandles {
select {
case <-llHandle.C():
t.Fatal("lease request unexpectedly completed")
default:
}
llHandle.Cancel()
wg.Done()
}

select {
case err := <-errC:
require.NoError(t, err)
case <-time.After(5 * time.Second):
t.Fatal("timed out waiting for lease request")
}
}

// TestReplicaUpdateTSCache verifies that reads and ranged writes update the
// timestamp cache. The test performs the operations with and without the use
// of synthetic timestamps.
