kvserver: always return NLHE on lease acquisition timeouts
In ab74b97 we added internal timeouts for lease acquisitions. These
were wrapped in `RunWithTimeout()`, as mandated for context timeouts.
However, this would mask the returned `NotLeaseHolderError` as a
`TimeoutError`, preventing the DistSender from retrying it and instead
propagating it out to the client. Additionally, context cancellation
errors from the actual RPC call were never wrapped as a
`NotLeaseHolderError` in the first place.
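
To make the failure mode concrete, here is a minimal, stdlib-only sketch of the masking. runWithTimeout and NotLeaseHolderError are simplified stand-ins for contextutil.RunWithTimeout and roachpb.NotLeaseHolderError, not the real code; note also that in the real system the structured error chain does not survive RPC boundaries, so the DistSender cannot recover the NLHE the way errors.As does below.

package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// NotLeaseHolderError stands in for roachpb.NotLeaseHolderError.
type NotLeaseHolderError struct{}

func (e *NotLeaseHolderError) Error() string { return "not lease holder" }

// runWithTimeout mimics the masking: once the derived context has expired,
// the closure's error is wrapped in a generic timeout error.
func runWithTimeout(
	ctx context.Context, timeout time.Duration, fn func(context.Context) error,
) error {
	ctx, cancel := context.WithTimeout(ctx, timeout)
	defer cancel()
	err := fn(ctx)
	if err != nil && errors.Is(ctx.Err(), context.DeadlineExceeded) {
		return fmt.Errorf("operation timed out after %s: %w", timeout, err)
	}
	return err
}

func main() {
	err := runWithTimeout(context.Background(), 10*time.Millisecond, func(ctx context.Context) error {
		<-ctx.Done() // the lease request blocks until the timeout fires
		return &NotLeaseHolderError{}
	})
	var nlhe *NotLeaseHolderError
	fmt.Println(errors.As(err, &nlhe)) // true: the NLHE is still in the local chain
	fmt.Printf("%T\n", err)            // but the error itself is now a timeout wrapper
}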

This only ended up happening in a very specific scenario: the outer
timeout added to the client context did not trigger, but the inner
timeout for the coalesced request context did trigger while the lease
request was in flight. By accident, the outer `RunWithTimeout()` call
did not return the `roachpb.Error` from the closure, but instead passed
it out via a captured variable, bypassing the error wrapping.
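
A hypothetical sketch of that bypass, reusing runWithTimeout and the NotLeaseHolderError stand-in from the sketch above (acquireLease and redirectOnOrAcquireLease are likewise illustrative names, not the real kvserver identifiers):

// acquireLease simulates a coalesced lease request that blocks until its
// context is cancelled and returns the bare context error, which is never
// converted to a NotLeaseHolderError.
func acquireLease(ctx context.Context) error {
	<-ctx.Done()
	return ctx.Err()
}

func redirectOnOrAcquireLease(ctx context.Context, timeout time.Duration) error {
	var pErr error
	err := runWithTimeout(ctx, timeout, func(ctx context.Context) error {
		pErr = acquireLease(ctx) // captured, not returned...
		return nil               // ...so the wrapper sees no error to wrap
	})
	if err != nil {
		return err // never taken here: the closure always returns nil
	}
	// context.DeadlineExceeded escapes here with no NLHE anywhere in its chain.
	return pErr
}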

This patch replaces the `RunWithTimeout()` calls with regular
`context.WithTimeout()` calls to avoid the error wrapping. Another
option would be to extract an NLHE from the error chain, but this would
require correct propagation of the structured error chain across RPC
boundaries, so with an eye towards backports we instead choose to return
a bare `NotLeaseHolderError`.
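
For illustration, the rejected alternative would have been an errors.As walk over the chain, roughly as below (extractNLHE is a hypothetical helper, again using the NotLeaseHolderError stand-in). It only helps if the structured chain is preserved end to end, which is exactly what the RPC boundary breaks:

// extractNLHE recovers a NotLeaseHolderError from anywhere in the error
// chain, rather than constructing a bare one.
func extractNLHE(err error) (*NotLeaseHolderError, bool) {
	var nlhe *NotLeaseHolderError
	if errors.As(err, &nlhe) {
		return nlhe, true
	}
	return nil, false
}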

The patch also removes the inner lease request timeout, since it has
questionable benefits and the outer timeout is sufficient to prevent
leases getting stuck for the overall range.

Release note (bug fix): Fixed a bug where clients could sometimes
receive errors due to lease acquisition timeouts of the form
`operation "storage.pendingLeaseRequest: requesting lease" timed out after 6s`.
erikgrinaker committed Aug 1, 2022
1 parent 94b9091 commit 067e740
Showing 2 changed files with 127 additions and 29 deletions.
120 changes: 120 additions & 0 deletions pkg/kv/kvserver/client_lease_test.go
@@ -29,6 +29,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/storepool"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase"
+	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness"
 	"github.com/cockroachdb/cockroach/pkg/roachpb"
 	"github.com/cockroachdb/cockroach/pkg/server"
 	"github.com/cockroachdb/cockroach/pkg/settings/cluster"
@@ -1178,3 +1179,122 @@ func TestAlterRangeRelocate(t *testing.T) {
 	require.NoError(t, err)
 	require.NoError(t, tc.WaitForVoters(rhsDesc.StartKey.AsRawKey(), tc.Targets(0, 3, 4)...))
 }
+
+// TestAcquireLeaseTimeout is a regression test verifying that lease
+// acquisition timeouts always return a NotLeaseHolderError.
+func TestAcquireLeaseTimeout(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	defer log.Scope(t).Close(t)
+
+	// Set a timeout for the test context, to guard against the test getting stuck.
+	ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
+	defer cancel()
+
+	// blockRangeID, when non-zero, will signal the replica to delay lease
+	// requests for the given range until the request's context is cancelled,
+	// and to return the context error.
+	var blockRangeID int32
+
+	maybeBlockLeaseRequest := func(ctx context.Context, ba roachpb.BatchRequest) *roachpb.Error {
+		if ba.IsSingleRequestLeaseRequest() && int32(ba.RangeID) == atomic.LoadInt32(&blockRangeID) {
+			t.Logf("blocked lease request for r%d", ba.RangeID)
+			<-ctx.Done()
+			return roachpb.NewError(ctx.Err())
+		}
+		return nil
+	}
+
+	// The lease request timeout depends on the Raft election timeout, so we set
+	// it low to get faster timeouts (800 ms) and speed up the test.
+	var raftCfg base.RaftConfig
+	raftCfg.SetDefaults()
+	raftCfg.RaftHeartbeatIntervalTicks = 1
+	raftCfg.RaftElectionTimeoutTicks = 2
+
+	manualClock := hlc.NewHybridManualClock()
+
+	// Start a two-node cluster.
+	const numNodes = 2
+	tc := testcluster.StartTestCluster(t, numNodes, base.TestClusterArgs{
+		ReplicationMode: base.ReplicationManual,
+		ServerArgs: base.TestServerArgs{
+			RaftConfig: raftCfg,
+			Knobs: base.TestingKnobs{
+				Server: &server.TestingKnobs{
+					WallClock: manualClock,
+				},
+				Store: &kvserver.StoreTestingKnobs{
+					TestingRequestFilter:                    maybeBlockLeaseRequest,
+					AllowLeaseRequestProposalsWhenNotLeader: true,
+				},
+			},
+		},
+	})
+	defer tc.Stopper().Stop(ctx)
+	srv := tc.Server(0)
+
+	// Split off a range, upreplicate it to both servers, and move the lease
+	// from n1 to n2.
+	splitKey := roachpb.Key("a")
+	_, desc := tc.SplitRangeOrFatal(t, splitKey)
+	tc.AddVotersOrFatal(t, splitKey, tc.Target(1))
+	tc.TransferRangeLeaseOrFatal(t, desc, tc.Target(1))
+	repl, err := tc.GetFirstStoreFromServer(t, 0).GetReplica(desc.RangeID)
+	require.NoError(t, err)
+
+	// Stop n2 and increment its epoch to invalidate the lease.
+	lv, ok := tc.Server(1).NodeLiveness().(*liveness.NodeLiveness)
+	require.True(t, ok)
+	lvNode2, ok := lv.Self()
+	require.True(t, ok)
+	tc.StopServer(1)
+
+	manualClock.Forward(lvNode2.Expiration.WallTime)
+	lv, ok = srv.NodeLiveness().(*liveness.NodeLiveness)
+	require.True(t, ok)
+	testutils.SucceedsSoon(t, func() error {
+		err := lv.IncrementEpoch(context.Background(), lvNode2)
+		if errors.Is(err, liveness.ErrEpochAlreadyIncremented) {
+			return nil
+		}
+		return err
+	})
+	require.False(t, repl.CurrentLeaseStatus(ctx).IsValid())
+
+	// Trying to acquire the lease should error with an empty NLHE, since the
+	// range doesn't have quorum.
+	var nlhe *roachpb.NotLeaseHolderError
+	_, err = repl.TestingAcquireLease(ctx)
+	require.Error(t, err)
+	require.IsType(t, &roachpb.NotLeaseHolderError{}, err) // check exact type
+	require.ErrorAs(t, err, &nlhe)
+	require.Empty(t, nlhe.Lease)
+
+	// Now for the real test: block lease requests for the range, and send off a
+	// bunch of sequential lease requests with a small delay, which should join
+	// onto the same lease request internally. All of these should return an NLHE
+	// when they time out, regardless of the internal mechanics.
+	atomic.StoreInt32(&blockRangeID, int32(desc.RangeID))
+
+	const attempts = 20
+	var wg sync.WaitGroup
+	errC := make(chan error, attempts)
+	wg.Add(attempts)
+	for i := 0; i < attempts; i++ {
+		time.Sleep(10 * time.Millisecond)
+		go func() {
+			_, err := repl.TestingAcquireLease(ctx)
+			errC <- err
+			wg.Done()
+		}()
+	}
+	wg.Wait()
+	close(errC)
+
+	for err := range errC {
+		require.Error(t, err)
+		require.IsType(t, &roachpb.NotLeaseHolderError{}, err) // check exact type
+		require.ErrorAs(t, err, &nlhe)
+		require.Empty(t, nlhe.Lease)
+	}
+}
36 changes: 7 additions & 29 deletions pkg/kv/kvserver/replica_range_lease.go
@@ -54,7 +54,6 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/raftutil"
 	"github.com/cockroachdb/cockroach/pkg/roachpb"
-	"github.com/cockroachdb/cockroach/pkg/util/contextutil"
 	"github.com/cockroachdb/cockroach/pkg/util/hlc"
 	"github.com/cockroachdb/cockroach/pkg/util/log"
 	"github.com/cockroachdb/cockroach/pkg/util/retry"
@@ -351,15 +350,10 @@ func (p *pendingLeaseRequest) requestLeaseAsync(
 		p.nextLease = roachpb.Lease{}
 	}
 
-	// We may need to hold a Raft election and repropose the lease acquisition
-	// command, which can take a couple of Raft election timeouts.
-	timeout := 2 * p.repl.store.cfg.RaftElectionTimeout()
-
-	const taskName = "pendingLeaseRequest: requesting lease"
 	err := p.repl.store.Stopper().RunAsyncTaskEx(
 		ctx,
 		stop.TaskOpts{
-			TaskName: taskName,
+			TaskName: "pendingLeaseRequest: requesting lease",
 			// Trace the lease acquisition as a child even though it might outlive the
 			// parent in case the parent's ctx is canceled. Other requests might
 			// later block on this lease acquisition too, and we can't include the
@@ -370,13 +364,7 @@
 		func(ctx context.Context) {
 			defer sp.Finish()
 
-			// Run the lease acquisition request with a timeout. We must eventually
-			// return a NotLeaseHolderError rather than hanging, otherwise we could
-			// prevent the caller from nudging a different replica into acquiring the
-			// lease.
-			err := contextutil.RunWithTimeout(ctx, taskName, timeout, func(ctx context.Context) error {
-				return p.requestLease(ctx, nextLeaseHolder, reqLease, status, leaseReq)
-			})
+			err := p.requestLease(ctx, nextLeaseHolder, reqLease, status, leaseReq)
 			// Error will be handled below.
 
 			// We reset our state below regardless of whether we've gotten an error or
@@ -1230,22 +1218,12 @@ func (r *Replica) redirectOnOrAcquireLeaseForRequest(
 	// We may need to hold a Raft election and repropose the lease acquisition
 	// command, which can take a couple of Raft election timeouts.
 	timeout := 2 * r.store.cfg.RaftElectionTimeout()
-	if err := contextutil.RunWithTimeout(ctx, "acquire-lease", timeout,
-		func(ctx context.Context) error {
-			status, pErr = r.redirectOnOrAcquireLeaseForRequestWithoutTimeout(ctx, reqTS, brSig)
-			return nil
-		},
-	); err != nil {
-		return kvserverpb.LeaseStatus{}, roachpb.NewError(err)
-	}
-	return status, pErr
-}
-
-// redirectOnOrAcquireLeaseForRequestWithoutTimeout is like
-// redirectOnOrAcquireLeaseForRequest, but runs without a timeout.
-func (r *Replica) redirectOnOrAcquireLeaseForRequestWithoutTimeout(
-	ctx context.Context, reqTS hlc.Timestamp, brSig signaller,
-) (kvserverpb.LeaseStatus, *roachpb.Error) {
+	// Does not use RunWithTimeout(), because we do not want to mask the
+	// NotLeaseHolderError on context cancellation.
+	ctx, cancel := context.WithTimeout(ctx, timeout) // nolint:context
+	defer cancel()
+
 	// Try fast-path.
 	now := r.store.Clock().NowAsClockTimestamp()
 	{
