Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

kvserver: always return NLHE on lease acquisition timeouts #84865

Merged
merged 1 commit into from
Aug 1, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
120 changes: 120 additions & 0 deletions pkg/kv/kvserver/client_lease_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/storepool"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/server"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
Expand Down Expand Up @@ -1178,3 +1179,122 @@ func TestAlterRangeRelocate(t *testing.T) {
require.NoError(t, err)
require.NoError(t, tc.WaitForVoters(rhsDesc.StartKey.AsRawKey(), tc.Targets(0, 3, 4)...))
}

// TestAcquireLeaseTimeout is a regression test that lease acquisition timeouts
// always return a NotLeaseHolderError.
func TestAcquireLeaseTimeout(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

// Set a timeout for the test context, to guard against the test getting stuck.
ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
defer cancel()

// blockRangeID, when non-zero, will signal the replica to delay lease
// requests for the given range until the request's context is cancelled, and
// return the context error.
var blockRangeID int32

maybeBlockLeaseRequest := func(ctx context.Context, ba roachpb.BatchRequest) *roachpb.Error {
if ba.IsSingleRequestLeaseRequest() && int32(ba.RangeID) == atomic.LoadInt32(&blockRangeID) {
t.Logf("blocked lease request for r%d", ba.RangeID)
<-ctx.Done()
return roachpb.NewError(ctx.Err())
}
return nil
}

// The lease request timeout depends on the Raft election timeout, so we set
// it low to get faster timeouts (800 ms) and speed up the test.
var raftCfg base.RaftConfig
raftCfg.SetDefaults()
raftCfg.RaftHeartbeatIntervalTicks = 1
raftCfg.RaftElectionTimeoutTicks = 2

manualClock := hlc.NewHybridManualClock()

// Start a two-node cluster.
const numNodes = 2
tc := testcluster.StartTestCluster(t, numNodes, base.TestClusterArgs{
ReplicationMode: base.ReplicationManual,
ServerArgs: base.TestServerArgs{
RaftConfig: raftCfg,
Knobs: base.TestingKnobs{
Server: &server.TestingKnobs{
WallClock: manualClock,
},
Store: &kvserver.StoreTestingKnobs{
TestingRequestFilter: maybeBlockLeaseRequest,
AllowLeaseRequestProposalsWhenNotLeader: true,
},
},
},
})
defer tc.Stopper().Stop(ctx)
srv := tc.Server(0)

// Split off a range, upreplicate it to both servers, and move the lease
// from n1 to n2.
splitKey := roachpb.Key("a")
_, desc := tc.SplitRangeOrFatal(t, splitKey)
tc.AddVotersOrFatal(t, splitKey, tc.Target(1))
tc.TransferRangeLeaseOrFatal(t, desc, tc.Target(1))
repl, err := tc.GetFirstStoreFromServer(t, 0).GetReplica(desc.RangeID)
require.NoError(t, err)

// Stop n2 and increment its epoch to invalidate the lease.
lv, ok := tc.Server(1).NodeLiveness().(*liveness.NodeLiveness)
require.True(t, ok)
lvNode2, ok := lv.Self()
require.True(t, ok)
tc.StopServer(1)

manualClock.Forward(lvNode2.Expiration.WallTime)
lv, ok = srv.NodeLiveness().(*liveness.NodeLiveness)
require.True(t, ok)
testutils.SucceedsSoon(t, func() error {
err := lv.IncrementEpoch(context.Background(), lvNode2)
if errors.Is(err, liveness.ErrEpochAlreadyIncremented) {
return nil
}
return err
})
require.False(t, repl.CurrentLeaseStatus(ctx).IsValid())

// Trying to acquire the lease should error with an empty NLHE, since the
// range doesn't have quorum.
var nlhe *roachpb.NotLeaseHolderError
_, err = repl.TestingAcquireLease(ctx)
require.Error(t, err)
require.IsType(t, &roachpb.NotLeaseHolderError{}, err) // check exact type
require.ErrorAs(t, err, &nlhe)
require.Empty(t, nlhe.Lease)

// Now for the real test: block lease requests for the range, and send off a
// bunch of sequential lease requests with a small delay, which should join
// onto the same lease request internally. All of these should return a NLHE
// when they time out, regardless of the internal mechanics.
atomic.StoreInt32(&blockRangeID, int32(desc.RangeID))

const attempts = 20
var wg sync.WaitGroup
errC := make(chan error, attempts)
wg.Add(attempts)
for i := 0; i < attempts; i++ {
time.Sleep(10 * time.Millisecond)
go func() {
_, err := repl.TestingAcquireLease(ctx)
errC <- err
wg.Done()
}()
}
wg.Wait()
close(errC)

for err := range errC {
require.Error(t, err)
require.IsType(t, &roachpb.NotLeaseHolderError{}, err) // check exact type
require.ErrorAs(t, err, &nlhe)
require.Empty(t, nlhe.Lease)
}
}
36 changes: 7 additions & 29 deletions pkg/kv/kvserver/replica_range_lease.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/raftutil"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/util/contextutil"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/retry"
Expand Down Expand Up @@ -351,15 +350,10 @@ func (p *pendingLeaseRequest) requestLeaseAsync(
p.nextLease = roachpb.Lease{}
}

// We may need to hold a Raft election and repropose the lease acquisition
// command, which can take a couple of Raft election timeouts.
timeout := 2 * p.repl.store.cfg.RaftElectionTimeout()

const taskName = "pendingLeaseRequest: requesting lease"
err := p.repl.store.Stopper().RunAsyncTaskEx(
ctx,
stop.TaskOpts{
TaskName: taskName,
TaskName: "pendingLeaseRequest: requesting lease",
// Trace the lease acquisition as a child even though it might outlive the
// parent in case the parent's ctx is canceled. Other requests might
// later block on this lease acquisition too, and we can't include the
Expand All @@ -370,13 +364,7 @@ func (p *pendingLeaseRequest) requestLeaseAsync(
func(ctx context.Context) {
defer sp.Finish()

// Run the lease acquisition request with a timeout. We must eventually
// return a NotLeaseHolderError rather than hanging, otherwise we could
// prevent the caller from nudging a different replica into acquiring the
// lease.
err := contextutil.RunWithTimeout(ctx, taskName, timeout, func(ctx context.Context) error {
return p.requestLease(ctx, nextLeaseHolder, reqLease, status, leaseReq)
})
err := p.requestLease(ctx, nextLeaseHolder, reqLease, status, leaseReq)
// Error will be handled below.

// We reset our state below regardless of whether we've gotten an error or
Expand Down Expand Up @@ -1230,22 +1218,12 @@ func (r *Replica) redirectOnOrAcquireLeaseForRequest(
// We may need to hold a Raft election and repropose the lease acquisition
// command, which can take a couple of Raft election timeouts.
timeout := 2 * r.store.cfg.RaftElectionTimeout()
if err := contextutil.RunWithTimeout(ctx, "acquire-lease", timeout,
func(ctx context.Context) error {
status, pErr = r.redirectOnOrAcquireLeaseForRequestWithoutTimeout(ctx, reqTS, brSig)
return nil
},
); err != nil {
return kvserverpb.LeaseStatus{}, roachpb.NewError(err)
}
return status, pErr
}

// redirectOnOrAcquireLeaseForRequestWithoutTimeout is like
// redirectOnOrAcquireLeaseForRequest, but runs without a timeout.
func (r *Replica) redirectOnOrAcquireLeaseForRequestWithoutTimeout(
ctx context.Context, reqTS hlc.Timestamp, brSig signaller,
) (kvserverpb.LeaseStatus, *roachpb.Error) {
// Does not use RunWithTimeout(), because we do not want to mask the
// NotLeaseHolderError on context cancellation.
ctx, cancel := context.WithTimeout(ctx, timeout) // nolint:context
defer cancel()

// Try fast-path.
now := r.store.Clock().NowAsClockTimestamp()
{
Expand Down