diff --git a/pkg/kv/kvserver/client_lease_test.go b/pkg/kv/kvserver/client_lease_test.go
index 8850bd24dbf5..512d40a79de0 100644
--- a/pkg/kv/kvserver/client_lease_test.go
+++ b/pkg/kv/kvserver/client_lease_test.go
@@ -30,6 +30,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/kv"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase"
+	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness/livenesspb"
 	"github.com/cockroachdb/cockroach/pkg/roachpb"
 	"github.com/cockroachdb/cockroach/pkg/server"
@@ -1309,3 +1310,123 @@ func TestAlterRangeRelocate(t *testing.T) {
 	require.NoError(t, err)
 	require.NoError(t, tc.WaitForVoters(rhsDesc.StartKey.AsRawKey(), tc.Targets(0, 3, 4)...))
 }
+
+// TestAcquireLeaseTimeout is a regression test that lease acquisition timeouts
+// always return a NotLeaseHolderError.
+func TestAcquireLeaseTimeout(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	defer log.Scope(t).Close(t)
+
+	// Set a timeout for the test context, to guard against the test getting stuck.
+	ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
+	defer cancel()
+
+	// blockRangeID, when non-zero, will signal the replica to delay lease
+	// requests for the given range until the request's context is cancelled, and
+	// return the context error.
+	var blockRangeID int32
+
+	maybeBlockLeaseRequest := func(ctx context.Context, ba roachpb.BatchRequest) *roachpb.Error {
+		if ba.IsSingleRequest() && ba.Requests[0].GetInner().Method() == roachpb.RequestLease &&
+			int32(ba.RangeID) == atomic.LoadInt32(&blockRangeID) {
+			t.Logf("blocked lease request for r%d", ba.RangeID)
+			<-ctx.Done()
+			return roachpb.NewError(ctx.Err())
+		}
+		return nil
+	}
+
+	// The lease request timeout depends on the Raft election timeout, so we set
+	// it low to get faster timeouts (800 ms) and speed up the test.
+	var raftCfg base.RaftConfig
+	raftCfg.SetDefaults()
+	raftCfg.RaftHeartbeatIntervalTicks = 1
+	raftCfg.RaftElectionTimeoutTicks = 2
+
+	manualClock := hlc.NewHybridManualClock()
+
+	// Start a two-node cluster.
+	const numNodes = 2
+	tc := testcluster.StartTestCluster(t, numNodes, base.TestClusterArgs{
+		ReplicationMode: base.ReplicationManual,
+		ServerArgs: base.TestServerArgs{
+			RaftConfig: raftCfg,
+			Knobs: base.TestingKnobs{
+				Server: &server.TestingKnobs{
+					ClockSource: manualClock.UnixNano,
+				},
+				Store: &kvserver.StoreTestingKnobs{
+					TestingRequestFilter:                    maybeBlockLeaseRequest,
+					AllowLeaseRequestProposalsWhenNotLeader: true,
+				},
+			},
+		},
+	})
+	defer tc.Stopper().Stop(ctx)
+	srv := tc.Server(0)
+
+	// Split off a range, upreplicate it to both servers, and move the lease
+	// from n1 to n2.
+	splitKey := roachpb.Key("a")
+	_, desc := tc.SplitRangeOrFatal(t, splitKey)
+	tc.AddVotersOrFatal(t, splitKey, tc.Target(1))
+	tc.TransferRangeLeaseOrFatal(t, desc, tc.Target(1))
+	repl, err := tc.GetFirstStoreFromServer(t, 0).GetReplica(desc.RangeID)
+	require.NoError(t, err)
+
+	// Stop n2 and increment its epoch to invalidate the lease.
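+	// (Epoch-based leases remain valid only as long as the leaseholder's node
+	// liveness epoch is unchanged, so once n2's liveness record expires and its
+	// epoch is incremented below, the lease is no longer usable and n1 must
+	// reacquire it.)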
+	lv, ok := tc.Server(1).NodeLiveness().(*liveness.NodeLiveness)
+	require.True(t, ok)
+	lvNode2, ok := lv.Self()
+	require.True(t, ok)
+	tc.StopServer(1)
+
+	manualClock.Forward(lvNode2.Expiration.WallTime)
+	lv, ok = srv.NodeLiveness().(*liveness.NodeLiveness)
+	require.True(t, ok)
+	testutils.SucceedsSoon(t, func() error {
+		err := lv.IncrementEpoch(context.Background(), lvNode2)
+		if errors.Is(err, liveness.ErrEpochAlreadyIncremented) {
+			return nil
+		}
+		return err
+	})
+	require.False(t, repl.CurrentLeaseStatus(ctx).IsValid())
+
+	// Trying to acquire the lease should error with an empty NLHE, since the
+	// range doesn't have quorum.
+	var nlhe *roachpb.NotLeaseHolderError
+	_, err = repl.TestingAcquireLease(ctx)
+	require.Error(t, err)
+	require.IsType(t, &roachpb.NotLeaseHolderError{}, err) // check exact type
+	require.ErrorAs(t, err, &nlhe)
+	require.Empty(t, nlhe.Lease)
+
+	// Now for the real test: block lease requests for the range, and send off a
+	// bunch of sequential lease requests with a small delay, which should join
+	// onto the same lease request internally. All of these should return a NLHE
+	// when they time out, regardless of the internal mechanics.
+	atomic.StoreInt32(&blockRangeID, int32(desc.RangeID))
+
+	const attempts = 20
+	var wg sync.WaitGroup
+	errC := make(chan error, attempts)
+	wg.Add(attempts)
+	for i := 0; i < attempts; i++ {
+		time.Sleep(10 * time.Millisecond)
+		go func() {
+			_, err := repl.TestingAcquireLease(ctx)
+			errC <- err
+			wg.Done()
+		}()
+	}
+	wg.Wait()
+	close(errC)
+
+	for err := range errC {
+		require.Error(t, err)
+		require.IsType(t, &roachpb.NotLeaseHolderError{}, err) // check exact type
+		require.ErrorAs(t, err, &nlhe)
+		require.Empty(t, nlhe.Lease)
+	}
+}
diff --git a/pkg/kv/kvserver/replica_range_lease.go b/pkg/kv/kvserver/replica_range_lease.go
index 2388bdc31d53..4ec35570aa30 100644
--- a/pkg/kv/kvserver/replica_range_lease.go
+++ b/pkg/kv/kvserver/replica_range_lease.go
@@ -53,7 +53,6 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness"
 	"github.com/cockroachdb/cockroach/pkg/roachpb"
-	"github.com/cockroachdb/cockroach/pkg/util/contextutil"
 	"github.com/cockroachdb/cockroach/pkg/util/hlc"
 	"github.com/cockroachdb/cockroach/pkg/util/log"
 	"github.com/cockroachdb/cockroach/pkg/util/stop"
@@ -349,15 +348,10 @@ func (p *pendingLeaseRequest) requestLeaseAsync(
 		p.nextLease = roachpb.Lease{}
 	}
 
-	// We may need to hold a Raft election and repropose the lease acquisition
-	// command, which can take a couple of Raft election timeouts.
-	timeout := 2 * p.repl.store.cfg.RaftElectionTimeout()
-
-	const taskName = "pendingLeaseRequest: requesting lease"
 	err := p.repl.store.Stopper().RunAsyncTaskEx(
 		ctx,
 		stop.TaskOpts{
-			TaskName: taskName,
+			TaskName: "pendingLeaseRequest: requesting lease",
 			// Trace the lease acquisition as a child even though it might outlive the
 			// parent in case the parent's ctx is canceled. Other requests might
 			// later block on this lease acquisition too, and we can't include the
@@ -368,13 +362,7 @@ func (p *pendingLeaseRequest) requestLeaseAsync(
 		func(ctx context.Context) {
 			defer sp.Finish()
 
-			// Run the lease acquisition request with a timeout. We must eventually
-			// return a NotLeaseHolderError rather than hanging, otherwise we could
-			// prevent the caller from nudging a different replica into acquiring the
-			// lease.
-			err := contextutil.RunWithTimeout(ctx, taskName, timeout, func(ctx context.Context) error {
-				return p.requestLease(ctx, nextLeaseHolder, reqLease, status, leaseReq)
-			})
+			err := p.requestLease(ctx, nextLeaseHolder, reqLease, status, leaseReq)
 			// Error will be handled below.
 
 			// We reset our state below regardless of whether we've gotten an error or
@@ -1167,22 +1155,12 @@ func (r *Replica) redirectOnOrAcquireLeaseForRequest(
 	// We may need to hold a Raft election and repropose the lease acquisition
 	// command, which can take a couple of Raft election timeouts.
 	timeout := 2 * r.store.cfg.RaftElectionTimeout()
-	if err := contextutil.RunWithTimeout(ctx, "acquire-lease", timeout,
-		func(ctx context.Context) error {
-			status, pErr = r.redirectOnOrAcquireLeaseForRequestWithoutTimeout(ctx, reqTS, brSig)
-			return nil
-		},
-	); err != nil {
-		return kvserverpb.LeaseStatus{}, roachpb.NewError(err)
-	}
-	return status, pErr
-}
-
-// redirectOnOrAcquireLeaseForRequestWithoutTimeout is like
-// redirectOnOrAcquireLeaseForRequest, but runs without a timeout.
-func (r *Replica) redirectOnOrAcquireLeaseForRequestWithoutTimeout(
-	ctx context.Context, reqTS hlc.Timestamp, brSig signaller,
-) (kvserverpb.LeaseStatus, *roachpb.Error) {
+	// Does not use RunWithTimeout(), because we do not want to mask the
+	// NotLeaseHolderError on context cancellation.
+	ctx, cancel := context.WithTimeout(ctx, timeout) // nolint:context
+	defer cancel()
+
 	// Try fast-path.
 	now := r.store.Clock().NowAsClockTimestamp()
 	{