kvserver: always return NLHE on lease acquisition timeouts
In ab74b97 we added internal timeouts for lease acquisitions. These
were wrapped in `RunWithTimeout()`, as mandated for context timeouts.
However, this would mask the returned `NotLeaseHolderError` as a
`TimeoutError`, preventing the DistSender from retrying it and instead
propagating it out to the client. Additionally, context cancellation
errors from the actual RPC call were never wrapped as a
`NotLeaseHolderError` in the first place.
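
As a rough, self-contained sketch of the masking (the helper below is a
simplified stand-in for `contextutil.RunWithTimeout()`, not its real
implementation, and the error type is a placeholder for
`roachpb.NotLeaseHolderError`): any error the closure returns after the
deadline has fired is replaced by a generic timeout error, so the typed
NLHE never reaches the caller.

    package main

    import (
        "context"
        "errors"
        "fmt"
        "time"
    )

    // notLeaseHolderError is a placeholder for roachpb.NotLeaseHolderError.
    type notLeaseHolderError struct{ msg string }

    func (e *notLeaseHolderError) Error() string { return e.msg }

    // runWithTimeout is a simplified stand-in for contextutil.RunWithTimeout:
    // it runs fn under a deadline and, once that deadline has fired, reports
    // its own timeout error instead of surfacing fn's error as-is.
    func runWithTimeout(
        ctx context.Context, op string, timeout time.Duration, fn func(context.Context) error,
    ) error {
        ctx, cancel := context.WithTimeout(ctx, timeout)
        defer cancel()
        err := fn(ctx)
        if err != nil && ctx.Err() != nil {
            return fmt.Errorf("operation %q timed out after %s: %v", op, timeout, err)
        }
        return err
    }

    func main() {
        // The closure returns an NLHE once its context is done, but the caller
        // only ever sees the wrapping timeout error.
        err := runWithTimeout(context.Background(), "requesting lease", 10*time.Millisecond,
            func(ctx context.Context) error {
                <-ctx.Done()
                return &notLeaseHolderError{msg: "lease acquisition canceled"}
            })
        var nlhe *notLeaseHolderError
        fmt.Println(errors.As(err, &nlhe)) // false: the NLHE has been masked
    }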

This masking only manifested in a very specific scenario: the outer
timeout added to the client context did not trigger, but the inner
timeout for the coalesced request context did trigger while the lease
request was in flight. Coincidentally, the outer `RunWithTimeout()` call
did not return the `roachpb.Error` from the closure but instead passed
it via a captured variable, bypassing the error wrapping.

This patch replaces the `RunWithTimeout()` calls with regular
`context.WithTimeout()` calls to avoid the error wrapping, and returns a
`NotLeaseHolderError` from `requestLease()` if the RPC request fails and
the context was cancelled (presumably causing the error). Another option
would be to extract an NLHE from the error chain, but this would require
correct propagation of the structured error chain across RPC boundaries,
so out of an abundance of caution and with an eye towards backports, we
instead choose to return a bare `NotLeaseHolderError`.

The empty lease in the returned error prevents the DistSender from
updating its caches on context cancellation.
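
The fix follows the pattern sketched below (again with stand-in types;
the real change, in the diff that follows, uses
`roachpb.NotLeaseHolderError` and the kvserver plumbing): a plain
`context.WithTimeout()` around the RPC, and a bare NLHE with an empty
lease whenever the RPC fails while the context is cancelled.

    package main

    import (
        "context"
        "errors"
        "fmt"
        "time"
    )

    // notLeaseHolderError is a placeholder for roachpb.NotLeaseHolderError;
    // the empty Lease field signals "leaseholder unknown", so the DistSender
    // can retry without updating its caches.
    type notLeaseHolderError struct {
        Lease string // left empty on context cancellation
        msg   string
    }

    func (e *notLeaseHolderError) Error() string { return e.msg }

    // acquireLease sketches the fixed control flow: a plain context.WithTimeout
    // (no error-wrapping helper), and a bare NotLeaseHolderError whenever the
    // RPC fails while the context is cancelled.
    func acquireLease(
        ctx context.Context, timeout time.Duration, send func(context.Context) error,
    ) error {
        ctx, cancel := context.WithTimeout(ctx, timeout)
        defer cancel()

        if err := send(ctx); err != nil {
            if ctx.Err() != nil {
                // The RPC failed and the context was cancelled, presumably
                // causing the failure: redirect instead of reporting a timeout.
                return &notLeaseHolderError{
                    msg: fmt.Sprintf("lease acquisition cancelled: %s", ctx.Err()),
                }
            }
            return err
        }
        return nil
    }

    func main() {
        // Simulate a lease request stuck on a lost quorum until the timeout fires.
        stuck := func(ctx context.Context) error {
            <-ctx.Done()
            return ctx.Err()
        }
        err := acquireLease(context.Background(), 10*time.Millisecond, stuck)
        var nlhe *notLeaseHolderError
        fmt.Println(errors.As(err, &nlhe), nlhe.Lease == "") // true true
    }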

Release note (bug fix): Fixed a bug where clients could sometimes
receive errors due to lease acquisition timeouts of the form
`operation "storage.pendingLeaseRequest: requesting lease" timed out after 6s`.
erikgrinaker committed Jul 27, 2022
1 parent 457d724 commit 8c690ca
Showing 2 changed files with 150 additions and 21 deletions.
130 changes: 130 additions & 0 deletions pkg/kv/kvserver/client_lease_test.go
@@ -29,6 +29,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/storepool"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/server"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
@@ -1178,3 +1179,132 @@ func TestAlterRangeRelocate(t *testing.T) {
require.NoError(t, err)
require.NoError(t, tc.WaitForVoters(rhsDesc.StartKey.AsRawKey(), tc.Targets(0, 3, 4)...))
}

// TestAcquireLeaseTimeout is a regression test verifying that lease acquisition
// timeouts always return a NotLeaseHolderError, both when the given context times
// out and when the internal context times out. It tests a very specific scenario, where
// the outer, uncoalesced request timeout (in redirectOnOrAcquireLeaseForRequest)
// does not fire but the inner coalesced request timeout (in requestLease) does
// fire, and does so while waiting for the request to return. Previously, this
// would return a DeadlineExceeded error which in turn got wrapped in a
// TimeoutError due to RunWithTimeout(), preventing the DistSender from handling
// it correctly and instead propagating it to the client.
func TestAcquireLeaseTimeout(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

// Set a timeout for the test context, to guard against the test getting stuck.
ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
defer cancel()

// blockRangeID, when non-zero, will signal the replica to delay lease
// requests for the given range until the request's context times out, and
// return the context error.
var blockRangeID int32

maybeBlockLeaseRequest := func(ctx context.Context, ba roachpb.BatchRequest) *roachpb.Error {
if ba.IsSingleRequestLeaseRequest() && int32(ba.RangeID) == atomic.LoadInt32(&blockRangeID) {
t.Logf("blocked lease request for r%d", ba.RangeID)
<-ctx.Done()
return roachpb.NewError(ctx.Err())
}
return nil
}

// The lease request timeout depends on the Raft election timeout, so we set
// it low to get faster timeouts (800 ms) and speed up the test.
var raftCfg base.RaftConfig
raftCfg.SetDefaults()
raftCfg.RaftHeartbeatIntervalTicks = 1
raftCfg.RaftElectionTimeoutTicks = 2

manualClock := hlc.NewHybridManualClock()

// Start a two-node cluster.
const numNodes = 2
tc := testcluster.StartTestCluster(t, numNodes, base.TestClusterArgs{
ReplicationMode: base.ReplicationManual,
ServerArgs: base.TestServerArgs{
RaftConfig: raftCfg,
Knobs: base.TestingKnobs{
Server: &server.TestingKnobs{
WallClock: manualClock,
},
Store: &kvserver.StoreTestingKnobs{
TestingRequestFilter: maybeBlockLeaseRequest,
AllowLeaseRequestProposalsWhenNotLeader: true,
},
},
},
})
defer tc.Stopper().Stop(ctx)
srv := tc.Server(0)

// Split off a range, upreplicate it to both servers, and move the lease
// from n1 to n2.
splitKey := roachpb.Key("a")
_, desc := tc.SplitRangeOrFatal(t, splitKey)
tc.AddVotersOrFatal(t, splitKey, tc.Target(1))
tc.TransferRangeLeaseOrFatal(t, desc, tc.Target(1))
repl, err := tc.GetFirstStoreFromServer(t, 0).GetReplica(desc.RangeID)
require.NoError(t, err)

// Stop n2 and increment its epoch to invalidate the lease.
lv, ok := tc.Server(1).NodeLiveness().(*liveness.NodeLiveness)
require.True(t, ok)
lvNode2, ok := lv.Self()
require.True(t, ok)
tc.StopServer(1)

manualClock.Forward(lvNode2.Expiration.WallTime)
lv, ok = srv.NodeLiveness().(*liveness.NodeLiveness)
require.True(t, ok)
testutils.SucceedsSoon(t, func() error {
err := lv.IncrementEpoch(context.Background(), lvNode2)
if errors.Is(err, liveness.ErrEpochAlreadyIncremented) {
return nil
}
return err
})
require.False(t, repl.CurrentLeaseStatus(ctx).IsValid())

// Trying to acquire the lease should error with an empty NLHE, since the
// range doesn't have quorum.
var nlhe *roachpb.NotLeaseHolderError
_, err = repl.TestingAcquireLease(ctx)
require.Error(t, err)
require.IsType(t, &roachpb.NotLeaseHolderError{}, err) // check exact type
require.ErrorAs(t, err, &nlhe)
require.Empty(t, nlhe.Lease)

// Now for the real test: block lease requests for the range, and send off a
// bunch of sequential lease requests with a small delay, which should join
// onto the same lease request internally. All of these should return an NLHE
// when they time out, either because of their own timeout or because the
// coalesced request timed out. We need to explicitly block lease requests
// while in flight rather than rely on the lost quorum here, to tickle the
// error return path that manifested the original bug.
atomic.StoreInt32(&blockRangeID, int32(desc.RangeID))

const attempts = 20
var wg sync.WaitGroup
errC := make(chan error, attempts)
wg.Add(attempts)
for i := 0; i < attempts; i++ {
time.Sleep(10 * time.Millisecond)
go func() {
_, err := repl.TestingAcquireLease(ctx)
errC <- err
wg.Done()
}()
}
wg.Wait()
close(errC)

for err := range errC {
require.Error(t, err)
require.IsType(t, &roachpb.NotLeaseHolderError{}, err) // check exact type
require.ErrorAs(t, err, &nlhe)
require.Empty(t, nlhe.Lease)
}
}
41 changes: 20 additions & 21 deletions pkg/kv/kvserver/replica_range_lease.go
@@ -54,7 +54,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/raftutil"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/util/contextutil"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/retry"
@@ -355,11 +354,10 @@ func (p *pendingLeaseRequest) requestLeaseAsync(
// command, which can take a couple of Raft election timeouts.
timeout := 2 * p.repl.store.cfg.RaftElectionTimeout()

const taskName = "pendingLeaseRequest: requesting lease"
err := p.repl.store.Stopper().RunAsyncTaskEx(
ctx,
stop.TaskOpts{
TaskName: taskName,
TaskName: "pendingLeaseRequest: requesting lease",
// Trace the lease acquisition as a child even though it might outlive the
// parent in case the parent's ctx is canceled. Other requests might
// later block on this lease acquisition too, and we can't include the
@@ -374,9 +372,12 @@
// return a NotLeaseHolderError rather than hanging, otherwise we could
// prevent the caller from nudging a different replica into acquiring the
// lease.
err := contextutil.RunWithTimeout(ctx, taskName, timeout, func(ctx context.Context) error {
return p.requestLease(ctx, nextLeaseHolder, reqLease, status, leaseReq)
})
//
// Does not use RunWithTimeout(), because we do not want to mask the
// NotLeaseHolderError on context cancellation.
requestLeaseCtx, requestLeaseCancel := context.WithTimeout(ctx, timeout) // nolint:context
defer requestLeaseCancel()
err := p.requestLease(requestLeaseCtx, nextLeaseHolder, reqLease, status, leaseReq)
// Error will be handled below.

// We reset our state below regardless of whether we've gotten an error or
@@ -526,6 +527,14 @@ func (p *pendingLeaseRequest) requestLease(
// up on the lease and thus worsening the situation.
ba.Add(leaseReq)
_, pErr := p.repl.Send(ctx, ba)
// If the lease request failed and the context was cancelled, return a
// NotLeaseHolderError. We expect requestLeaseAsync to pass a new context with
// a timeout, disconnected from the caller's context. The DistSender will
// check for client context cancellation when handling the error too.
if pErr != nil && ctx.Err() != nil {
return newNotLeaseHolderError(roachpb.Lease{}, p.repl.store.StoreID(), p.repl.Desc(),
fmt.Sprintf("lease acquisition cancelled: %s", ctx.Err()))
}
return pErr.GoError()
}

@@ -1228,22 +1237,12 @@ func (r *Replica) redirectOnOrAcquireLeaseForRequest(
// We may need to hold a Raft election and repropose the lease acquisition
// command, which can take a couple of Raft election timeouts.
timeout := 2 * r.store.cfg.RaftElectionTimeout()
if err := contextutil.RunWithTimeout(ctx, "acquire-lease", timeout,
func(ctx context.Context) error {
status, pErr = r.redirectOnOrAcquireLeaseForRequestWithoutTimeout(ctx, reqTS, brSig)
return nil
},
); err != nil {
return kvserverpb.LeaseStatus{}, roachpb.NewError(err)
}
return status, pErr
}

// redirectOnOrAcquireLeaseForRequestWithoutTimeout is like
// redirectOnOrAcquireLeaseForRequest, but runs without a timeout.
func (r *Replica) redirectOnOrAcquireLeaseForRequestWithoutTimeout(
ctx context.Context, reqTS hlc.Timestamp, brSig signaller,
) (kvserverpb.LeaseStatus, *roachpb.Error) {
// Does not use RunWithTimeout(), because we do not want to mask the
// NotLeaseHolderError on context cancellation.
ctx, cancel := context.WithTimeout(ctx, timeout) // nolint:context
defer cancel()

// Try fast-path.
now := r.store.Clock().NowAsClockTimestamp()
{