diff --git a/pkg/kv/kvserver/replica_range_lease.go b/pkg/kv/kvserver/replica_range_lease.go
index f4fba68ee86a..3da558c36cfb 100644
--- a/pkg/kv/kvserver/replica_range_lease.go
+++ b/pkg/kv/kvserver/replica_range_lease.go
@@ -95,8 +95,10 @@ func (h *leaseRequestHandle) C() <-chan *kvpb.Error {
 	return h.c
 }

-// Cancel cancels the request handle. It also cancels the asynchronous
-// lease request task if its reference count drops to zero.
+// Cancel cancels the request handle. The asynchronous lease request will
+// continue until it completes, to ensure leases can be acquired even if the
+// client goes away (in particular in the face of IO delays which may trigger
+// client timeouts).
 func (h *leaseRequestHandle) Cancel() {
 	h.p.repl.mu.Lock()
 	defer h.p.repl.mu.Unlock()
@@ -104,10 +106,6 @@ func (h *leaseRequestHandle) Cancel() {
 		// Our lease request is ongoing...
 		// Unregister handle.
 		delete(h.p.llHandles, h)
-		// Cancel request, if necessary.
-		if len(h.p.llHandles) == 0 {
-			h.p.cancelLocked()
-		}
 	}
 	// Mark handle as canceled.
 	h.c = nil
@@ -141,13 +139,6 @@ type pendingLeaseRequest struct {
 	// Set of request handles attached to the lease acquisition.
 	// All accesses require repl.mu to be exclusively locked.
 	llHandles map[*leaseRequestHandle]struct{}
-	// cancelLocked is a context cancellation function for the async lease
-	// request, if one exists. It cancels an ongoing lease request and cleans up
-	// the requests state, including setting the cancelLocked function itself to
-	// nil. It will be called when a lease request is canceled because all
-	// handles cancel or when a lease request completes. If nil, then no request
-	// is in progress. repl.mu to be exclusively locked to call the function.
-	cancelLocked func()
 	// nextLease is the pending RequestLease request, if any. It can be used to
 	// figure out if we're in the process of extending our own lease, or
 	// transferring it to another replica.
@@ -166,11 +157,7 @@ func makePendingLeaseRequest(repl *Replica) pendingLeaseRequest {
 //
 // Requires repl.mu is read locked.
 func (p *pendingLeaseRequest) RequestPending() (roachpb.Lease, bool) {
-	pending := p.cancelLocked != nil
-	if pending {
-		return p.nextLease, true
-	}
-	return roachpb.Lease{}, false
+	return p.nextLease, p.nextLease != roachpb.Lease{}
 }

 // InitOrJoinRequest executes a RequestLease command asynchronously and returns a
@@ -349,10 +336,9 @@ func (p *pendingLeaseRequest) requestLeaseAsync(
 	status kvserverpb.LeaseStatus,
 	leaseReq kvpb.Request,
 ) error {
-	// Create a new context. We multiplex the cancellation of all contexts onto
-	// this new one, canceling it if all coalesced requests timeout/cancel.
-	// p.cancelLocked (defined below) is the cancel function that must be called;
-	// calling just cancel is insufficient.
+	// Create a new context. We run the request to completion even if all callers
+	// go away, to ensure leases can be acquired, e.g. in the face of IO delays
+	// which may trigger client timeouts.
 	ctx := p.repl.AnnotateCtx(context.Background())

 	const opName = "request range lease"
@@ -374,16 +360,6 @@ func (p *pendingLeaseRequest) requestLeaseAsync(
 		ctx, sp = tr.StartSpanCtx(ctx, opName, tagsOpt)
 	}

-	ctx, cancel := context.WithCancel(ctx)
-
-	// Make sure we clean up the context and request state. This will be called
-	// either when the request completes cleanly or when it is terminated early.
-	p.cancelLocked = func() {
-		cancel()
-		p.cancelLocked = nil
-		p.nextLease = roachpb.Lease{}
-	}
-
 	err := p.repl.store.Stopper().RunAsyncTaskEx(
 		ctx,
 		stop.TaskOpts{
@@ -410,12 +386,6 @@ func (p *pendingLeaseRequest) requestLeaseAsync(
 			// extend the existing lease in the future.
 			p.repl.mu.Lock()
 			defer p.repl.mu.Unlock()
-			if ctx.Err() != nil {
-				// We were canceled and this request was already cleaned up
-				// under lock. At this point, another async request could be
-				// active so we don't want to do anything else.
-				return
-			}
 			// Send result of lease to all waiter channels and cleanup request.
 			for llHandle := range p.llHandles {
 				// Don't send the same transaction object twice; this can lead to races.
@@ -429,10 +399,10 @@ func (p *pendingLeaseRequest) requestLeaseAsync(
 				}
 				delete(p.llHandles, llHandle)
 			}
-			p.cancelLocked()
+			p.nextLease = roachpb.Lease{}
 		})
 	if err != nil {
-		p.cancelLocked()
+		p.nextLease = roachpb.Lease{}
 		sp.Finish()
 		return err
 	}
@@ -1444,12 +1414,10 @@ func (r *Replica) maybeExtendLeaseAsyncLocked(ctx context.Context, st kvserverpb
 	if log.ExpensiveLogEnabled(ctx, 2) {
 		log.Infof(ctx, "extending lease %s at %s", st.Lease, st.Now)
 	}
-	// We explicitly ignore the returned handle as we won't block on it.
-	//
-	// TODO(tbg): this ctx is likely cancelled very soon, which will in turn
-	// cancel the lease acquisition (unless joined by another more long-lived
-	// ctx). So this possibly isn't working as advertised (which only plays a role
-	// for expiration-based leases, at least).
+	// We explicitly ignore the returned handle as we won't block on it. This
+	// context will likely be cancelled soon (when the originating request
+	// completes), but the lease request will continue to completion independently
+	// of the caller's context.
 	_ = r.requestLeaseLocked(ctx, st)
 }

diff --git a/pkg/kv/kvserver/replica_test.go b/pkg/kv/kvserver/replica_test.go
index e19e1cdcb197..51267ae8b2ec 100644
--- a/pkg/kv/kvserver/replica_test.go
+++ b/pkg/kv/kvserver/replica_test.go
@@ -2001,6 +2001,86 @@ func TestLeaseConcurrent(t *testing.T) {
 	})
 }

+// TestLeaseCallerCancelled tests that lease requests continue to completion
+// even when all callers have cancelled.
+func TestLeaseCallerCancelled(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	defer log.Scope(t).Close(t)
+
+	const num = 5
+
+	ctx := context.Background()
+	stopper := stop.NewStopper()
+	defer stopper.Stop(ctx)
+
+	var active, seen int32
+	var wg sync.WaitGroup
+	errC := make(chan error, 1)
+
+	tc := testContext{manualClock: timeutil.NewManualTime(timeutil.Unix(0, 123))}
+	cfg := TestStoreConfig(hlc.NewClockForTesting(tc.manualClock))
+	// Disable reasonNewLeader and reasonNewLeaderOrConfigChange proposal
+	// refreshes so that our lease proposal does not risk being rejected
+	// with an AmbiguousResultError.
+	cfg.TestingKnobs.DisableRefreshReasonNewLeader = true
+	cfg.TestingKnobs.DisableRefreshReasonNewLeaderOrConfigChange = true
+	cfg.TestingKnobs.TestingProposalFilter = func(args kvserverbase.ProposalFilterArgs) *kvpb.Error {
+		ll, ok := args.Req.Requests[0].GetInner().(*kvpb.RequestLeaseRequest)
+		if !ok || atomic.LoadInt32(&active) == 0 {
+			return nil
+		}
+		if c := atomic.AddInt32(&seen, 1); c > 1 {
+			// Morally speaking, this is an error, but reproposals can happen and so
+			// we log it instead (in case this trips the test up in more unexpected ways).
+			t.Logf("reproposal of %+v", ll)
+		}
+		// Wait for all lease requests to join the same RequestLease command
+		// and cancel their handles.
+		wg.Wait()
+
+		// The lease request's context should not be cancelled. Give it a second
+		// to (not) be cancelled, then propagate any context error up to the
+		// main test.
+		select {
+		case <-args.Ctx.Done():
+		case <-time.After(time.Second):
+		}
+		select {
+		case errC <- args.Ctx.Err():
+		default:
+		}
+		return nil
+	}
+	tc.StartWithStoreConfig(ctx, t, stopper, cfg)
+
+	atomic.StoreInt32(&active, 1)
+	tc.manualClock.MustAdvanceTo(leaseExpiry(tc.repl))
+	now := tc.Clock().NowAsClockTimestamp()
+	var llHandles []*leaseRequestHandle
+	for i := 0; i < num; i++ {
+		wg.Add(1)
+		tc.repl.mu.Lock()
+		status := tc.repl.leaseStatusAtRLocked(ctx, now)
+		llHandles = append(llHandles, tc.repl.requestLeaseLocked(ctx, status))
+		tc.repl.mu.Unlock()
+	}
+	for _, llHandle := range llHandles {
+		select {
+		case <-llHandle.C():
+			t.Fatal("lease request unexpectedly completed")
+		default:
+		}
+		llHandle.Cancel()
+		wg.Done()
+	}
+
+	select {
+	case err := <-errC:
+		require.NoError(t, err)
+	case <-time.After(5 * time.Second):
+		t.Fatal("timed out waiting for lease request")
+	}
+}
+
 // TestReplicaUpdateTSCache verifies that reads and ranged writes update the
 // timestamp cache. The test performs the operations with and without the use
 // of synthetic timestamps.
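
Note for reviewers: the core pattern in this change is decoupling the shared lease request's lifetime from its callers. The async task runs under a fresh background-derived context, and callers only attach and detach handles; cancelling a handle no longer cancels the task. A minimal self-contained sketch of that pattern, with hypothetical names (handle, pending, join) that are not part of this patch:

package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// handle lets one caller wait for the shared operation's result.
type handle struct{ c chan error }

// pending coalesces callers onto a single in-flight operation.
type pending struct {
	mu      sync.Mutex
	handles map[*handle]struct{}
	running bool
}

// join attaches a caller, starting the operation if none is in flight.
// The operation runs under context.Background(), so cancelling callers
// does not cancel it -- mirroring requestLeaseAsync above.
func (p *pending) join() *handle {
	p.mu.Lock()
	defer p.mu.Unlock()
	h := &handle{c: make(chan error, 1)}
	p.handles[h] = struct{}{}
	if !p.running {
		p.running = true
		go p.run(context.Background())
	}
	return h
}

// cancel detaches a handle; the shared operation keeps running.
func (p *pending) cancel(h *handle) {
	p.mu.Lock()
	defer p.mu.Unlock()
	delete(p.handles, h)
}

func (p *pending) run(ctx context.Context) {
	time.Sleep(100 * time.Millisecond) // stand-in for the RequestLease RPC
	p.mu.Lock()
	defer p.mu.Unlock()
	for h := range p.handles {
		h.c <- nil // deliver the result to any remaining waiters
		delete(p.handles, h)
	}
	p.running = false
}

func main() {
	p := &pending{handles: map[*handle]struct{}{}}
	h1, h2 := p.join(), p.join() // two callers coalesce onto one operation
	p.cancel(h1)                 // first caller goes away; operation continues
	fmt.Println(<-h2.c)          // second caller still gets the result: <nil>
}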