diff --git a/pkg/kv/kvserver/client_lease_test.go b/pkg/kv/kvserver/client_lease_test.go
index 6cdb9c56976b..bf2723d6715d 100644
--- a/pkg/kv/kvserver/client_lease_test.go
+++ b/pkg/kv/kvserver/client_lease_test.go
@@ -29,6 +29,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/storepool"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase"
+	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness"
 	"github.com/cockroachdb/cockroach/pkg/roachpb"
 	"github.com/cockroachdb/cockroach/pkg/server"
 	"github.com/cockroachdb/cockroach/pkg/settings/cluster"
@@ -1178,3 +1179,132 @@ func TestAlterRangeRelocate(t *testing.T) {
 	require.NoError(t, err)
 	require.NoError(t, tc.WaitForVoters(rhsDesc.StartKey.AsRawKey(), tc.Targets(0, 3, 4)...))
 }
+
+// TestAcquireLeaseTimeout is a regression test verifying that lease
+// acquisition timeouts always return a NotLeaseHolderError, both when the
+// given context times out and when the internal context times out. It tests a
+// very specific scenario, where the outer, uncoalesced request timeout (in
+// redirectOnOrAcquireLeaseForRequest) does not fire, but the inner, coalesced
+// request timeout (in requestLease) does fire while waiting for the request to
+// return. Previously, this returned a DeadlineExceeded error that was in turn
+// wrapped in a TimeoutError by RunWithTimeout(), which prevented the
+// DistSender from handling it correctly and instead propagated it to the
+// client.
+func TestAcquireLeaseTimeout(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	defer log.Scope(t).Close(t)
+
+	// Set a timeout for the test context, to guard against the test getting stuck.
+	ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
+	defer cancel()
+
+	// blockRangeID, when non-zero, instructs the request filter below to block
+	// lease requests for the given range until the request's context times out,
+	// and to return the context error.
+	var blockRangeID int32
+
+	maybeBlockLeaseRequest := func(ctx context.Context, ba roachpb.BatchRequest) *roachpb.Error {
+		if ba.IsSingleRequestLeaseRequest() && int32(ba.RangeID) == atomic.LoadInt32(&blockRangeID) {
+			t.Logf("blocked lease request for r%d", ba.RangeID)
+			<-ctx.Done()
+			return roachpb.NewError(ctx.Err())
+		}
+		return nil
+	}
+
+	// The lease request timeout depends on the Raft election timeout, so we set
+	// it low to get faster timeouts (800 ms) and speed up the test.
+	var raftCfg base.RaftConfig
+	raftCfg.SetDefaults()
+	raftCfg.RaftHeartbeatIntervalTicks = 1
+	raftCfg.RaftElectionTimeoutTicks = 2
+
+	manualClock := hlc.NewHybridManualClock()
+
+	// Start a two-node cluster.
+	const numNodes = 2
+	tc := testcluster.StartTestCluster(t, numNodes, base.TestClusterArgs{
+		ReplicationMode: base.ReplicationManual,
+		ServerArgs: base.TestServerArgs{
+			RaftConfig: raftCfg,
+			Knobs: base.TestingKnobs{
+				Server: &server.TestingKnobs{
+					WallClock: manualClock,
+				},
+				Store: &kvserver.StoreTestingKnobs{
+					TestingRequestFilter:                    maybeBlockLeaseRequest,
+					AllowLeaseRequestProposalsWhenNotLeader: true,
+				},
+			},
+		},
+	})
+	defer tc.Stopper().Stop(ctx)
+	srv := tc.Server(0)
+
+	// Split off a range, upreplicate it to both servers, and move the lease
+	// from n1 to n2.
+	splitKey := roachpb.Key("a")
+	_, desc := tc.SplitRangeOrFatal(t, splitKey)
+	tc.AddVotersOrFatal(t, splitKey, tc.Target(1))
+	tc.TransferRangeLeaseOrFatal(t, desc, tc.Target(1))
+	repl, err := tc.GetFirstStoreFromServer(t, 0).GetReplica(desc.RangeID)
+	require.NoError(t, err)
+
+	// Stop n2 and increment its epoch to invalidate the lease.
+	lv, ok := tc.Server(1).NodeLiveness().(*liveness.NodeLiveness)
+	require.True(t, ok)
+	lvNode2, ok := lv.Self()
+	require.True(t, ok)
+	tc.StopServer(1)
+
+	manualClock.Forward(lvNode2.Expiration.WallTime)
+	lv, ok = srv.NodeLiveness().(*liveness.NodeLiveness)
+	require.True(t, ok)
+	testutils.SucceedsSoon(t, func() error {
+		err := lv.IncrementEpoch(context.Background(), lvNode2)
+		if errors.Is(err, liveness.ErrEpochAlreadyIncremented) {
+			return nil
+		}
+		return err
+	})
+	require.False(t, repl.CurrentLeaseStatus(ctx).IsValid())
+
+	// Trying to acquire the lease should error with an empty NLHE, since the
+	// range doesn't have quorum.
+	var nlhe *roachpb.NotLeaseHolderError
+	_, err = repl.TestingAcquireLease(ctx)
+	require.Error(t, err)
+	require.IsType(t, &roachpb.NotLeaseHolderError{}, err) // check exact type
+	require.ErrorAs(t, err, &nlhe)
+	require.Empty(t, nlhe.Lease)
+
+	// Now for the real test: block lease requests for the range, and send off a
+	// bunch of sequential lease requests with a small delay, which should join
+	// onto the same coalesced lease request internally. All of these should
+	// return an NLHE when they time out, either because of their own timeout or
+	// because the coalesced request timed out. We need to explicitly block lease
+	// requests while in flight rather than rely on the lost quorum here, to
+	// tickle the error return path where the original bug manifested.
+	atomic.StoreInt32(&blockRangeID, int32(desc.RangeID))
+
+	const attempts = 20
+	var wg sync.WaitGroup
+	errC := make(chan error, attempts)
+	wg.Add(attempts)
+	for i := 0; i < attempts; i++ {
+		time.Sleep(10 * time.Millisecond)
+		go func() {
+			_, err := repl.TestingAcquireLease(ctx)
+			errC <- err
+			wg.Done()
+		}()
+	}
+	wg.Wait()
+	close(errC)
+
+	for err := range errC {
+		require.Error(t, err)
+		require.IsType(t, &roachpb.NotLeaseHolderError{}, err) // check exact type
+		require.ErrorAs(t, err, &nlhe)
+		require.Empty(t, nlhe.Lease)
+	}
+}
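Before the replica_range_lease.go changes below, here is a minimal, stdlib-only Go sketch of the failure mode this test guards against. It is a simplified model, not CockroachDB code: `runWithTimeout` stands in for the error-wrapping behavior of `contextutil.RunWithTimeout`, and `nlhe` stands in for `roachpb.NotLeaseHolderError`; the names and the wrapping format are illustrative assumptions.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// nlhe stands in for roachpb.NotLeaseHolderError.
type nlhe struct{}

func (*nlhe) Error() string { return "not lease holder" }

// runWithTimeout models the old wrapper's behavior: on timeout it wraps the
// callback's error in a generic timeout error whose message (via %v) discards
// the concrete error type.
func runWithTimeout(ctx context.Context, timeout time.Duration, fn func(context.Context) error) error {
	ctx, cancel := context.WithTimeout(ctx, timeout)
	defer cancel()
	err := fn(ctx)
	if err != nil && ctx.Err() != nil {
		return fmt.Errorf("operation timed out after %s: %v", timeout, err)
	}
	return err
}

func main() {
	// The lease request blocks until the inner context expires and returns the
	// raw context error, as the blocked replica in the test above does.
	err := runWithTimeout(context.Background(), 10*time.Millisecond, func(ctx context.Context) error {
		<-ctx.Done()
		return ctx.Err()
	})
	var e *nlhe
	fmt.Println(errors.As(err, &e)) // false: the caller cannot recognize an NLHE
}
```

The last line is the point: the blocked request surfaces only the raw context error, the wrapper then hides even that behind a generic timeout error, and no NotLeaseHolderError ever reaches the caller, which is what the DistSender needs in order to try another replica.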
diff --git a/pkg/kv/kvserver/replica_range_lease.go b/pkg/kv/kvserver/replica_range_lease.go
index d6ac463daef1..d4a909653504 100644
--- a/pkg/kv/kvserver/replica_range_lease.go
+++ b/pkg/kv/kvserver/replica_range_lease.go
@@ -54,7 +54,6 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/raftutil"
 	"github.com/cockroachdb/cockroach/pkg/roachpb"
-	"github.com/cockroachdb/cockroach/pkg/util/contextutil"
 	"github.com/cockroachdb/cockroach/pkg/util/hlc"
 	"github.com/cockroachdb/cockroach/pkg/util/log"
 	"github.com/cockroachdb/cockroach/pkg/util/retry"
@@ -355,11 +354,10 @@ func (p *pendingLeaseRequest) requestLeaseAsync(
 	// command, which can take a couple of Raft election timeouts.
 	timeout := 2 * p.repl.store.cfg.RaftElectionTimeout()
 
-	const taskName = "pendingLeaseRequest: requesting lease"
 	err := p.repl.store.Stopper().RunAsyncTaskEx(
 		ctx,
 		stop.TaskOpts{
-			TaskName: taskName,
+			TaskName: "pendingLeaseRequest: requesting lease",
 			// Trace the lease acquisition as a child even though it might outlive the
 			// parent in case the parent's ctx is canceled. Other requests might
 			// later block on this lease acquisition too, and we can't include the
@@ -374,9 +372,12 @@ func (p *pendingLeaseRequest) requestLeaseAsync(
 			// return a NotLeaseHolderError rather than hanging, otherwise we could
 			// prevent the caller from nudging a different replica into acquiring the
 			// lease.
-			err := contextutil.RunWithTimeout(ctx, taskName, timeout, func(ctx context.Context) error {
-				return p.requestLease(ctx, nextLeaseHolder, reqLease, status, leaseReq)
-			})
+			//
+			// We do not use RunWithTimeout(), because we do not want to mask the
+			// NotLeaseHolderError on context cancellation.
+			requestLeaseCtx, requestLeaseCancel := context.WithTimeout(ctx, timeout) // nolint:context
+			defer requestLeaseCancel()
+			err := p.requestLease(requestLeaseCtx, nextLeaseHolder, reqLease, status, leaseReq)
 			// Error will be handled below.
 
 			// We reset our state below regardless of whether we've gotten an error or
@@ -526,6 +527,14 @@ func (p *pendingLeaseRequest) requestLease(
 	// up on the lease and thus worsening the situation.
 	ba.Add(leaseReq)
 	_, pErr := p.repl.Send(ctx, ba)
+	// If the lease request failed and the context was canceled, return a
+	// NotLeaseHolderError. We expect requestLeaseAsync to pass a new context
+	// with a timeout, disconnected from the caller's context. The DistSender
+	// will also check for client context cancellation when handling the error.
+	if pErr != nil && ctx.Err() != nil {
+		return newNotLeaseHolderError(roachpb.Lease{}, p.repl.store.StoreID(), p.repl.Desc(),
+			fmt.Sprintf("lease acquisition canceled: %s", ctx.Err()))
+	}
 	return pErr.GoError()
 }
 
@@ -1228,22 +1237,12 @@ func (r *Replica) redirectOnOrAcquireLeaseForRequest(
 	// We may need to hold a Raft election and repropose the lease acquisition
 	// command, which can take a couple of Raft election timeouts.
 	timeout := 2 * r.store.cfg.RaftElectionTimeout()
-	if err := contextutil.RunWithTimeout(ctx, "acquire-lease", timeout,
-		func(ctx context.Context) error {
-			status, pErr = r.redirectOnOrAcquireLeaseForRequestWithoutTimeout(ctx, reqTS, brSig)
-			return nil
-		},
-	); err != nil {
-		return kvserverpb.LeaseStatus{}, roachpb.NewError(err)
-	}
-	return status, pErr
-}
-
-// redirectOnOrAcquireLeaseForRequestWithoutTimeout is like
-// redirectOnOrAcquireLeaseForRequest, but runs without a timeout.
-func (r *Replica) redirectOnOrAcquireLeaseForRequestWithoutTimeout(
-	ctx context.Context, reqTS hlc.Timestamp, brSig signaller,
-) (kvserverpb.LeaseStatus, *roachpb.Error) {
+
+	// We do not use RunWithTimeout(), because we do not want to mask the
+	// NotLeaseHolderError on context cancellation.
+	ctx, cancel := context.WithTimeout(ctx, timeout) // nolint:context
+	defer cancel()
+
 	// Try fast-path.
 	now := r.store.Clock().NowAsClockTimestamp()
 	{
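And the shape of the fix, again as a stdlib-only sketch under the same stand-in names rather than the actual CockroachDB implementation: a plain `context.WithTimeout` with no wrapping helper, plus converting the context error into a stand-in NLHE at the point where the request fails, mirroring the new branch in `requestLease`.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// nlhe stands in for roachpb.NotLeaseHolderError.
type nlhe struct{ msg string }

func (e *nlhe) Error() string { return e.msg }

// requestLease models the patched error path: if the request failed and the
// timeout-derived context is done, return an NLHE rather than the raw context
// error, so callers can handle it like any other lease acquisition failure.
func requestLease(ctx context.Context) error {
	<-ctx.Done() // models the lease request blocked by the test's filter
	if err := ctx.Err(); err != nil {
		return &nlhe{msg: fmt.Sprintf("lease acquisition canceled: %s", err)}
	}
	return nil
}

func main() {
	// Models requestLeaseAsync: a plain timeout context, with no helper that
	// could re-wrap the returned error.
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
	defer cancel()

	err := requestLease(ctx)
	var e *nlhe
	fmt.Println(errors.As(err, &e), err) // true: the NLHE reaches the caller intact
}
```

Converting at the innermost error-return site means every waiter on the coalesced request, regardless of which timeout fired, observes the same retriable error type.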