From 8c690ca3be41dea68a0ea1e5c508fb06b56eea43 Mon Sep 17 00:00:00 2001
From: Erik Grinaker
Date: Thu, 21 Jul 2022 13:07:04 +0000
Subject: [PATCH] kvserver: always return NLHE on lease acquisition timeouts

In ab74b974bd we added internal timeouts for lease acquisitions. These
were wrapped in `RunWithTimeout()`, as mandated for context timeouts.
However, this would mask the returned `NotLeaseHolderError` as a
`TimeoutError`, preventing the DistSender from retrying it and instead
propagating it out to the client. Additionally, context cancellation
errors from the actual RPC call were never wrapped as a
`NotLeaseHolderError` in the first place.

This ended up happening only in a very specific scenario, where the
outer timeout added to the client context did not trigger, but the
inner timeout for the coalesced request context did trigger while the
lease request was in flight. Incidentally, the outer `RunWithTimeout()`
call did not return the `roachpb.Error` from the closure but instead
passed it via a captured variable, bypassing the error wrapping.

This patch replaces the `RunWithTimeout()` calls with regular
`context.WithTimeout()` calls to avoid the error wrapping, and returns
a `NotLeaseHolderError` from `requestLease()` if the RPC request fails
and the context was cancelled (presumably causing the error). Another
option would be to extract an NLHE from the error chain, but this would
require correct propagation of the structured error chain across RPC
boundaries, so out of an abundance of caution and with an eye towards
backports, we instead choose to return a bare `NotLeaseHolderError`.
The empty lease in the returned error prevents the DistSender from
updating its caches on context cancellation.

Release note (bug fix): Fixed a bug where clients could sometimes
receive errors due to lease acquisition timeouts of the form
`operation "storage.pendingLeaseRequest: requesting lease" timed out
after 6s`.
---
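For illustration, a minimal, self-contained sketch of the masking
behavior described above. The runWithTimeout, timeoutError, and
errNotLeaseHolder names below are simplified stand-ins for
contextutil.RunWithTimeout, contextutil.TimeoutError, and
roachpb.NotLeaseHolderError, not the actual implementations:

package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// timeoutError is a stand-in for contextutil.TimeoutError.
type timeoutError struct{ cause error }

func (e *timeoutError) Error() string {
	return fmt.Sprintf("operation timed out: %v", e.cause)
}

// errNotLeaseHolder is a stand-in for roachpb.NotLeaseHolderError.
var errNotLeaseHolder = errors.New("not lease holder")

// runWithTimeout mimics the problematic pattern: when the derived context's
// deadline fires, the closure's error is replaced by a timeoutError, hiding
// its concrete type from callers that match on it.
func runWithTimeout(
	ctx context.Context, timeout time.Duration, fn func(context.Context) error,
) error {
	ctx, cancel := context.WithTimeout(ctx, timeout)
	defer cancel()
	err := fn(ctx)
	if err != nil && ctx.Err() != nil {
		return &timeoutError{cause: err} // the NotLeaseHolderError is masked here
	}
	return err
}

func main() {
	err := runWithTimeout(context.Background(), time.Millisecond, func(ctx context.Context) error {
		<-ctx.Done() // the lease request blocks until the inner timeout fires
		return errNotLeaseHolder
	})
	// The caller sees a timeoutError, not the NotLeaseHolderError it would
	// need in order to retry against another replica.
	fmt.Printf("%T: %v\n", err, err) // *main.timeoutError: operation timed out: not lease holder
}

By contrast, a plain context.WithTimeout() leaves the closure's error
untouched, which is what this patch switches to.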
 pkg/kv/kvserver/client_lease_test.go   | 130 +++++++++++++++++++++++++
 pkg/kv/kvserver/replica_range_lease.go |  41 ++++----
 2 files changed, 150 insertions(+), 21 deletions(-)

diff --git a/pkg/kv/kvserver/client_lease_test.go b/pkg/kv/kvserver/client_lease_test.go
index 6cdb9c56976b..bf2723d6715d 100644
--- a/pkg/kv/kvserver/client_lease_test.go
+++ b/pkg/kv/kvserver/client_lease_test.go
@@ -29,6 +29,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/storepool"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase"
+	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness"
 	"github.com/cockroachdb/cockroach/pkg/roachpb"
 	"github.com/cockroachdb/cockroach/pkg/server"
 	"github.com/cockroachdb/cockroach/pkg/settings/cluster"
@@ -1178,3 +1179,132 @@ func TestAlterRangeRelocate(t *testing.T) {
 	require.NoError(t, err)
 	require.NoError(t, tc.WaitForVoters(rhsDesc.StartKey.AsRawKey(), tc.Targets(0, 3, 4)...))
 }
+
+// TestAcquireLeaseTimeout is a regression test verifying that lease
+// acquisition timeouts always return a NotLeaseHolderError, both when the
+// given context times out and when the internal context times out. It tests
+// a very specific scenario, where the outer, uncoalesced request timeout (in
+// redirectOnOrAcquireLeaseForRequest) does not fire but the inner coalesced
+// request timeout (in requestLease) does fire, and does so while waiting for
+// the request to return. Previously, this would return a DeadlineExceeded
+// error which in turn got wrapped in a TimeoutError due to RunWithTimeout(),
+// preventing the DistSender from handling it correctly and instead
+// propagating it to the client.
+func TestAcquireLeaseTimeout(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	defer log.Scope(t).Close(t)
+
+	// Set a timeout for the test context, to guard against the test getting stuck.
+	ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
+	defer cancel()
+
+	// blockRangeID, when non-zero, will signal the replica to delay lease
+	// requests for the given range until the request's context times out, and
+	// return the context error.
+	var blockRangeID int32
+
+	maybeBlockLeaseRequest := func(ctx context.Context, ba roachpb.BatchRequest) *roachpb.Error {
+		if ba.IsSingleRequestLeaseRequest() && int32(ba.RangeID) == atomic.LoadInt32(&blockRangeID) {
+			t.Logf("blocked lease request for r%d", ba.RangeID)
+			<-ctx.Done()
+			return roachpb.NewError(ctx.Err())
+		}
+		return nil
+	}
+
+	// The lease request timeout depends on the Raft election timeout, so we set
+	// it low to get faster timeouts (800 ms) and speed up the test.
+	var raftCfg base.RaftConfig
+	raftCfg.SetDefaults()
+	raftCfg.RaftHeartbeatIntervalTicks = 1
+	raftCfg.RaftElectionTimeoutTicks = 2
+
+	manualClock := hlc.NewHybridManualClock()
+
+	// Start a two-node cluster.
+	const numNodes = 2
+	tc := testcluster.StartTestCluster(t, numNodes, base.TestClusterArgs{
+		ReplicationMode: base.ReplicationManual,
+		ServerArgs: base.TestServerArgs{
+			RaftConfig: raftCfg,
+			Knobs: base.TestingKnobs{
+				Server: &server.TestingKnobs{
+					WallClock: manualClock,
+				},
+				Store: &kvserver.StoreTestingKnobs{
+					TestingRequestFilter:                    maybeBlockLeaseRequest,
+					AllowLeaseRequestProposalsWhenNotLeader: true,
+				},
+			},
+		},
+	})
+	defer tc.Stopper().Stop(ctx)
+	srv := tc.Server(0)
+
+	// Split off a range, upreplicate it to both servers, and move the lease
+	// from n1 to n2.
+	splitKey := roachpb.Key("a")
+	_, desc := tc.SplitRangeOrFatal(t, splitKey)
+	tc.AddVotersOrFatal(t, splitKey, tc.Target(1))
+	tc.TransferRangeLeaseOrFatal(t, desc, tc.Target(1))
+	repl, err := tc.GetFirstStoreFromServer(t, 0).GetReplica(desc.RangeID)
+	require.NoError(t, err)
+
+	// Stop n2 and increment its epoch to invalidate the lease.
+	lv, ok := tc.Server(1).NodeLiveness().(*liveness.NodeLiveness)
+	require.True(t, ok)
+	lvNode2, ok := lv.Self()
+	require.True(t, ok)
+	tc.StopServer(1)
+
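+	// Advance the clock past n2's liveness expiration so that its epoch can
+	// be incremented, invalidating its now-expired lease.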
+	manualClock.Forward(lvNode2.Expiration.WallTime)
+	lv, ok = srv.NodeLiveness().(*liveness.NodeLiveness)
+	require.True(t, ok)
+	testutils.SucceedsSoon(t, func() error {
+		err := lv.IncrementEpoch(context.Background(), lvNode2)
+		if errors.Is(err, liveness.ErrEpochAlreadyIncremented) {
+			return nil
+		}
+		return err
+	})
+	require.False(t, repl.CurrentLeaseStatus(ctx).IsValid())
+
+	// Trying to acquire the lease should error with an empty NLHE, since the
+	// range doesn't have quorum.
+	var nlhe *roachpb.NotLeaseHolderError
+	_, err = repl.TestingAcquireLease(ctx)
+	require.Error(t, err)
+	require.IsType(t, &roachpb.NotLeaseHolderError{}, err) // check exact type
+	require.ErrorAs(t, err, &nlhe)
+	require.Empty(t, nlhe.Lease)
+
+	// Now for the real test: block lease requests for the range, and send off
+	// a bunch of sequential lease requests with a small delay, which should
+	// join onto the same lease request internally. All of these should return
+	// a NLHE when they time out, either because of their own timeout or
+	// because the coalesced request timed out. We need to explicitly block
+	// lease requests while in flight rather than rely on the lost quorum here,
+	// to tickle the error return path that manifested the original bug.
+	atomic.StoreInt32(&blockRangeID, int32(desc.RangeID))
+
+	const attempts = 20
+	var wg sync.WaitGroup
+	errC := make(chan error, attempts)
+	wg.Add(attempts)
+	for i := 0; i < attempts; i++ {
+		time.Sleep(10 * time.Millisecond)
+		go func() {
+			_, err := repl.TestingAcquireLease(ctx)
+			errC <- err
+			wg.Done()
+		}()
+	}
+	wg.Wait()
+	close(errC)
+
+	for err := range errC {
+		require.Error(t, err)
+		require.IsType(t, &roachpb.NotLeaseHolderError{}, err) // check exact type
+		require.ErrorAs(t, err, &nlhe)
+		require.Empty(t, nlhe.Lease)
+	}
+}
diff --git a/pkg/kv/kvserver/replica_range_lease.go b/pkg/kv/kvserver/replica_range_lease.go
index d6ac463daef1..d4a909653504 100644
--- a/pkg/kv/kvserver/replica_range_lease.go
+++ b/pkg/kv/kvserver/replica_range_lease.go
@@ -54,7 +54,6 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/raftutil"
 	"github.com/cockroachdb/cockroach/pkg/roachpb"
-	"github.com/cockroachdb/cockroach/pkg/util/contextutil"
 	"github.com/cockroachdb/cockroach/pkg/util/hlc"
 	"github.com/cockroachdb/cockroach/pkg/util/log"
 	"github.com/cockroachdb/cockroach/pkg/util/retry"
@@ -355,11 +354,10 @@ func (p *pendingLeaseRequest) requestLeaseAsync(
 	// command, which can take a couple of Raft election timeouts.
 	timeout := 2 * p.repl.store.cfg.RaftElectionTimeout()
 
-	const taskName = "pendingLeaseRequest: requesting lease"
 	err := p.repl.store.Stopper().RunAsyncTaskEx(
 		ctx,
 		stop.TaskOpts{
-			TaskName: taskName,
+			TaskName: "pendingLeaseRequest: requesting lease",
 			// Trace the lease acquisition as a child even though it might outlive the
 			// parent in case the parent's ctx is canceled. Other requests might
 			// later block on this lease acquisition too, and we can't include the
@@ -374,9 +372,12 @@
 			// return a NotLeaseHolderError rather than hanging, otherwise we could
 			// prevent the caller from nudging a different replica into acquiring the
 			// lease.
-			err := contextutil.RunWithTimeout(ctx, taskName, timeout, func(ctx context.Context) error {
-				return p.requestLease(ctx, nextLeaseHolder, reqLease, status, leaseReq)
-			})
+			//
+			// Does not use RunWithTimeout(), because we do not want to mask the
+			// NotLeaseHolderError on context cancellation.
+			requestLeaseCtx, requestLeaseCancel := context.WithTimeout(ctx, timeout) // nolint:context
+			defer requestLeaseCancel()
+			err := p.requestLease(requestLeaseCtx, nextLeaseHolder, reqLease, status, leaseReq)
 			// Error will be handled below.
 
 			// We reset our state below regardless of whether we've gotten an error or
@@ -526,6 +527,14 @@ func (p *pendingLeaseRequest) requestLease(
 	// up on the lease and thus worsening the situation.
 	ba.Add(leaseReq)
 	_, pErr := p.repl.Send(ctx, ba)
+	// If the lease request failed and the context was cancelled, return a
+	// NotLeaseHolderError. We expect requestLeaseAsync to pass a new context
+	// with a timeout, disconnected from the caller's context. The DistSender
+	// will check for client context cancellation when handling the error too.
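+	// The empty lease in the returned NotLeaseHolderError prevents the
+	// DistSender from updating its leaseholder caches on context cancellation.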
+	if pErr != nil && ctx.Err() != nil {
+		return newNotLeaseHolderError(roachpb.Lease{}, p.repl.store.StoreID(), p.repl.Desc(),
+			fmt.Sprintf("lease acquisition cancelled: %s", ctx.Err()))
+	}
 	return pErr.GoError()
 }
 
@@ -1228,22 +1237,12 @@ func (r *Replica) redirectOnOrAcquireLeaseForRequest(
 	// We may need to hold a Raft election and repropose the lease acquisition
 	// command, which can take a couple of Raft election timeouts.
 	timeout := 2 * r.store.cfg.RaftElectionTimeout()
-	if err := contextutil.RunWithTimeout(ctx, "acquire-lease", timeout,
-		func(ctx context.Context) error {
-			status, pErr = r.redirectOnOrAcquireLeaseForRequestWithoutTimeout(ctx, reqTS, brSig)
-			return nil
-		},
-	); err != nil {
-		return kvserverpb.LeaseStatus{}, roachpb.NewError(err)
-	}
-	return status, pErr
-}
 
-// redirectOnOrAcquireLeaseForRequestWithoutTimeout is like
-// redirectOnOrAcquireLeaseForRequest, but runs without a timeout.
-func (r *Replica) redirectOnOrAcquireLeaseForRequestWithoutTimeout(
-	ctx context.Context, reqTS hlc.Timestamp, brSig signaller,
-) (kvserverpb.LeaseStatus, *roachpb.Error) {
+	// Does not use RunWithTimeout(), because we do not want to mask the
+	// NotLeaseHolderError on context cancellation.
+	ctx, cancel := context.WithTimeout(ctx, timeout) // nolint:context
+	defer cancel()
+
 	// Try fast-path.
 	now := r.store.Clock().NowAsClockTimestamp()
 	{