From c874b0dbfbabdfb36fc7dfbb78edb062ec0c93b6 Mon Sep 17 00:00:00 2001
From: Erik Grinaker
Date: Thu, 21 Jul 2022 13:07:04 +0000
Subject: [PATCH] kvserver: always return NLHE on lease acquisition timeouts

In ab74b974bd we added internal timeouts for lease acquisitions. These were
wrapped in `RunWithTimeout()`, as mandated for context timeouts. However, this
would mask the returned `NotLeaseHolderError` as a `TimeoutError`, preventing
the DistSender from retrying it and instead propagating it out to the client.
Additionally, context cancellation errors from the actual RPC call were never
wrapped as a `NotLeaseHolderError` in the first place.

This ended up only happening in a very specific scenario where the outer
timeout added to the client context did not trigger, but the inner timeout for
the coalesced request context did trigger while the lease request was in
flight. Accidentally, the outer `RunWithTimeout()` call did not return the
`roachpb.Error` from the closure but instead passed it via a captured
variable, bypassing the error wrapping.

This patch replaces the `RunWithTimeout()` calls with regular
`context.WithTimeout()` calls to avoid the error wrapping. Another option
would be to extract an NLHE from the error chain, but this would require
correct propagation of the structured error chain across RPC boundaries, so
with an eye towards backports we instead choose to return a bare
`NotLeaseHolderError`.

The patch also removes the inner lease request timeout, since it has
questionable benefits and the outer timeout is sufficient to prevent leases
getting stuck for the overall range.

Release note (bug fix): Fixed a bug where clients could sometimes receive
errors due to lease acquisition timeouts of the form `operation
"storage.pendingLeaseRequest: requesting lease" timed out after 6s`.
---
 pkg/kv/kvserver/client_lease_test.go   | 121 +++++++++++++++++++++++++
 pkg/kv/kvserver/replica_range_lease.go |  36 ++------
 2 files changed, 128 insertions(+), 29 deletions(-)

diff --git a/pkg/kv/kvserver/client_lease_test.go b/pkg/kv/kvserver/client_lease_test.go
index 8850bd24dbf5..512d40a79de0 100644
--- a/pkg/kv/kvserver/client_lease_test.go
+++ b/pkg/kv/kvserver/client_lease_test.go
@@ -30,6 +30,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/kv"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase"
+	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness/livenesspb"
 	"github.com/cockroachdb/cockroach/pkg/roachpb"
 	"github.com/cockroachdb/cockroach/pkg/server"
@@ -1309,3 +1310,123 @@ func TestAlterRangeRelocate(t *testing.T) {
 	require.NoError(t, err)
 	require.NoError(t, tc.WaitForVoters(rhsDesc.StartKey.AsRawKey(), tc.Targets(0, 3, 4)...))
 }
+
+// TestAcquireLeaseTimeout is a regression test that lease acquisition timeouts
+// always return a NotLeaseHolderError.
+func TestAcquireLeaseTimeout(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	defer log.Scope(t).Close(t)
+
+	// Set a timeout for the test context, to guard against the test getting stuck.
+	ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
+	defer cancel()
+
+	// blockRangeID, when non-zero, will signal the replica to delay lease
+	// requests for the given range until the request's context is cancelled, and
+	// return the context error.
+	var blockRangeID int32
+
+	maybeBlockLeaseRequest := func(ctx context.Context, ba roachpb.BatchRequest) *roachpb.Error {
+		if ba.IsSingleRequest() && ba.Requests[0].GetInner().Method() == roachpb.RequestLease &&
+			int32(ba.RangeID) == atomic.LoadInt32(&blockRangeID) {
+			t.Logf("blocked lease request for r%d", ba.RangeID)
+			<-ctx.Done()
+			return roachpb.NewError(ctx.Err())
+		}
+		return nil
+	}
+
+	// The lease request timeout depends on the Raft election timeout, so we set
+	// it low to get faster timeouts (800 ms) and speed up the test.
+	var raftCfg base.RaftConfig
+	raftCfg.SetDefaults()
+	raftCfg.RaftHeartbeatIntervalTicks = 1
+	raftCfg.RaftElectionTimeoutTicks = 2
+
+	manualClock := hlc.NewHybridManualClock()
+
+	// Start a two-node cluster.
+	const numNodes = 2
+	tc := testcluster.StartTestCluster(t, numNodes, base.TestClusterArgs{
+		ReplicationMode: base.ReplicationManual,
+		ServerArgs: base.TestServerArgs{
+			RaftConfig: raftCfg,
+			Knobs: base.TestingKnobs{
+				Server: &server.TestingKnobs{
+					ClockSource: manualClock.UnixNano,
+				},
+				Store: &kvserver.StoreTestingKnobs{
+					TestingRequestFilter:                    maybeBlockLeaseRequest,
+					AllowLeaseRequestProposalsWhenNotLeader: true,
+				},
+			},
+		},
+	})
+	defer tc.Stopper().Stop(ctx)
+	srv := tc.Server(0)
+
+	// Split off a range, upreplicate it to both servers, and move the lease
+	// from n1 to n2.
+	splitKey := roachpb.Key("a")
+	_, desc := tc.SplitRangeOrFatal(t, splitKey)
+	tc.AddVotersOrFatal(t, splitKey, tc.Target(1))
+	tc.TransferRangeLeaseOrFatal(t, desc, tc.Target(1))
+	repl, err := tc.GetFirstStoreFromServer(t, 0).GetReplica(desc.RangeID)
+	require.NoError(t, err)
+
+	// Stop n2 and increment its epoch to invalidate the lease.
+	lv, ok := tc.Server(1).NodeLiveness().(*liveness.NodeLiveness)
+	require.True(t, ok)
+	lvNode2, ok := lv.Self()
+	require.True(t, ok)
+	tc.StopServer(1)
+
+	manualClock.Forward(lvNode2.Expiration.WallTime)
+	lv, ok = srv.NodeLiveness().(*liveness.NodeLiveness)
+	require.True(t, ok)
+	testutils.SucceedsSoon(t, func() error {
+		err := lv.IncrementEpoch(context.Background(), lvNode2)
+		if errors.Is(err, liveness.ErrEpochAlreadyIncremented) {
+			return nil
+		}
+		return err
+	})
+	require.False(t, repl.CurrentLeaseStatus(ctx).IsValid())
+
+	// Trying to acquire the lease should error with an empty NLHE, since the
+	// range doesn't have quorum.
+	var nlhe *roachpb.NotLeaseHolderError
+	_, err = repl.TestingAcquireLease(ctx)
+	require.Error(t, err)
+	require.IsType(t, &roachpb.NotLeaseHolderError{}, err) // check exact type
+	require.ErrorAs(t, err, &nlhe)
+	require.Empty(t, nlhe.Lease)
+
+	// Now for the real test: block lease requests for the range, and send off a
+	// bunch of sequential lease requests with a small delay, which should join
+	// onto the same lease request internally. All of these should return a NLHE
+	// when they time out, regardless of the internal mechanics.
+	atomic.StoreInt32(&blockRangeID, int32(desc.RangeID))
+
+	const attempts = 20
+	var wg sync.WaitGroup
+	errC := make(chan error, attempts)
+	wg.Add(attempts)
+	for i := 0; i < attempts; i++ {
+		time.Sleep(10 * time.Millisecond)
+		go func() {
+			_, err := repl.TestingAcquireLease(ctx)
+			errC <- err
+			wg.Done()
+		}()
+	}
+	wg.Wait()
+	close(errC)
+
+	for err := range errC {
+		require.Error(t, err)
+		require.IsType(t, &roachpb.NotLeaseHolderError{}, err) // check exact type
+		require.ErrorAs(t, err, &nlhe)
+		require.Empty(t, nlhe.Lease)
+	}
+}
diff --git a/pkg/kv/kvserver/replica_range_lease.go b/pkg/kv/kvserver/replica_range_lease.go
index 2388bdc31d53..4ec35570aa30 100644
--- a/pkg/kv/kvserver/replica_range_lease.go
+++ b/pkg/kv/kvserver/replica_range_lease.go
@@ -53,7 +53,6 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness"
 	"github.com/cockroachdb/cockroach/pkg/roachpb"
-	"github.com/cockroachdb/cockroach/pkg/util/contextutil"
 	"github.com/cockroachdb/cockroach/pkg/util/hlc"
 	"github.com/cockroachdb/cockroach/pkg/util/log"
 	"github.com/cockroachdb/cockroach/pkg/util/stop"
@@ -349,15 +348,10 @@ func (p *pendingLeaseRequest) requestLeaseAsync(
 		p.nextLease = roachpb.Lease{}
 	}
 
-	// We may need to hold a Raft election and repropose the lease acquisition
-	// command, which can take a couple of Raft election timeouts.
-	timeout := 2 * p.repl.store.cfg.RaftElectionTimeout()
-
-	const taskName = "pendingLeaseRequest: requesting lease"
 	err := p.repl.store.Stopper().RunAsyncTaskEx(
 		ctx,
 		stop.TaskOpts{
-			TaskName: taskName,
+			TaskName: "pendingLeaseRequest: requesting lease",
 			// Trace the lease acquisition as a child even though it might outlive the
 			// parent in case the parent's ctx is canceled. Other requests might
 			// later block on this lease acquisition too, and we can't include the
@@ -368,13 +362,7 @@
 		func(ctx context.Context) {
 			defer sp.Finish()
 
-			// Run the lease acquisition request with a timeout. We must eventually
-			// return a NotLeaseHolderError rather than hanging, otherwise we could
-			// prevent the caller from nudging a different replica into acquiring the
-			// lease.
-			err := contextutil.RunWithTimeout(ctx, taskName, timeout, func(ctx context.Context) error {
-				return p.requestLease(ctx, nextLeaseHolder, reqLease, status, leaseReq)
-			})
+			err := p.requestLease(ctx, nextLeaseHolder, reqLease, status, leaseReq)
 			// Error will be handled below.
 
 			// We reset our state below regardless of whether we've gotten an error or
@@ -1167,22 +1155,12 @@ func (r *Replica) redirectOnOrAcquireLeaseForRequest(
 	// We may need to hold a Raft election and repropose the lease acquisition
 	// command, which can take a couple of Raft election timeouts.
 	timeout := 2 * r.store.cfg.RaftElectionTimeout()
-	if err := contextutil.RunWithTimeout(ctx, "acquire-lease", timeout,
-		func(ctx context.Context) error {
-			status, pErr = r.redirectOnOrAcquireLeaseForRequestWithoutTimeout(ctx, reqTS, brSig)
-			return nil
-		},
-	); err != nil {
-		return kvserverpb.LeaseStatus{}, roachpb.NewError(err)
-	}
-	return status, pErr
-}
-
-// redirectOnOrAcquireLeaseForRequestWithoutTimeout is like
-// redirectOnOrAcquireLeaseForRequest, but runs without a timeout.
-func (r *Replica) redirectOnOrAcquireLeaseForRequestWithoutTimeout(
-	ctx context.Context, reqTS hlc.Timestamp, brSig signaller,
-) (kvserverpb.LeaseStatus, *roachpb.Error) {
+	// Does not use RunWithTimeout(), because we do not want to mask the
+	// NotLeaseHolderError on context cancellation.
+	ctx, cancel := context.WithTimeout(ctx, timeout) // nolint:context
+	defer cancel()
+
 	// Try fast-path.
 	now := r.store.Clock().NowAsClockTimestamp()
 	{
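
Below is a small, self-contained Go sketch of the failure mode the commit message describes. It is illustrative only and not part of the patch: notLeaseHolderError, runWithTimeout, and requestLease are hypothetical stand-ins, not the real roachpb.NotLeaseHolderError, contextutil.RunWithTimeout, or lease request code. It only demonstrates that wrapping the closure's error on timeout hides the concrete error type from a caller that checks for it, whereas a plain context.WithTimeout lets the closure return its structured error unchanged.

	// Sketch only: stand-in types and helpers, not CockroachDB APIs.
	package main

	import (
		"context"
		"fmt"
		"time"
	)

	// notLeaseHolderError stands in for roachpb.NotLeaseHolderError.
	type notLeaseHolderError struct{}

	func (notLeaseHolderError) Error() string { return "not lease holder" }

	// requestLease stands in for the coalesced lease request: it blocks until
	// its context is done and then reports a NotLeaseHolderError.
	func requestLease(ctx context.Context) error {
		<-ctx.Done()
		return notLeaseHolderError{}
	}

	// runWithTimeout mimics the wrapping behavior described above: on timeout,
	// the closure's error is wrapped in a generic timeout error, so a caller
	// that checks the error's concrete type no longer sees an NLHE.
	func runWithTimeout(parent context.Context, timeout time.Duration, fn func(context.Context) error) error {
		ctx, cancel := context.WithTimeout(parent, timeout)
		defer cancel()
		err := fn(ctx)
		if err != nil && ctx.Err() != nil {
			return fmt.Errorf("operation timed out after %s: %w", timeout, err)
		}
		return err
	}

	func main() {
		const timeout = 10 * time.Millisecond

		// Old behavior: the NLHE comes back wrapped, so a plain type check fails.
		err := runWithTimeout(context.Background(), timeout, requestLease)
		_, isNLHE := err.(notLeaseHolderError)
		fmt.Printf("wrapped: %v (is NLHE: %t)\n", err, isNLHE) // is NLHE: false

		// New behavior: a plain context.WithTimeout returns the bare NLHE.
		ctx, cancel := context.WithTimeout(context.Background(), timeout)
		defer cancel()
		err = requestLease(ctx)
		_, isNLHE = err.(notLeaseHolderError)
		fmt.Printf("bare: %v (is NLHE: %t)\n", err, isNLHE) // is NLHE: true
	}

This mirrors the patch's stated choice of returning a bare NotLeaseHolderError rather than relying on the structured error chain surviving RPC boundaries.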