From 965bc61b49d0ac3e2ae4eb24f8af97994e256fe4 Mon Sep 17 00:00:00 2001 From: Andrei Matei Date: Tue, 18 May 2021 20:09:23 -0400 Subject: [PATCH] kvserver: simplify TestClosedTimestampInactiveAfterSubsumption Fixes #59448 This patch moves the test away from using the "old closed timestamp mechanism" to using the new one. The test becomes simpler in the process, and does not appear to be flaky any more (the test was skipped). Release note: None --- pkg/kv/kvserver/abortspan/abortspan.go | 1 + .../kvserver/batcheval/cmd_end_transaction.go | 2 +- pkg/kv/kvserver/batcheval/cmd_push_txn.go | 2 +- pkg/kv/kvserver/client_merge_test.go | 2 +- pkg/kv/kvserver/client_relocate_range_test.go | 2 +- pkg/kv/kvserver/client_replica_test.go | 2 +- pkg/kv/kvserver/closed_timestamp_test.go | 588 ++++++++---------- pkg/kv/kvserver/helpers_test.go | 9 - pkg/kv/kvserver/replica_closedts_test.go | 2 +- pkg/kv/kvserver/replica_command.go | 1 + pkg/kv/kvserver/replica_range_lease.go | 12 + pkg/kv/kvserver/replica_send.go | 2 +- pkg/kv/kvserver/testing_knobs.go | 60 ++ pkg/roachpb/batch.go | 7 +- pkg/roachpb/data.go | 2 +- .../serverutils/test_cluster_shim.go | 9 +- pkg/testutils/testcluster/BUILD.bazel | 1 - pkg/testutils/testcluster/testcluster.go | 43 +- 18 files changed, 367 insertions(+), 380 deletions(-) diff --git a/pkg/kv/kvserver/abortspan/abortspan.go b/pkg/kv/kvserver/abortspan/abortspan.go index 357e51d3f205..8ecc24c9ef4f 100644 --- a/pkg/kv/kvserver/abortspan/abortspan.go +++ b/pkg/kv/kvserver/abortspan/abortspan.go @@ -134,6 +134,7 @@ func (sc *AbortSpan) Put( txnID uuid.UUID, entry *roachpb.AbortSpanEntry, ) error { + log.VEventf(ctx, 2, "writing abort span entry for %s", txnID.Short()) key := keys.AbortSpanKey(sc.rangeID, txnID) return storage.MVCCPutProto(ctx, readWriter, ms, key, hlc.Timestamp{}, nil /* txn */, entry) } diff --git a/pkg/kv/kvserver/batcheval/cmd_end_transaction.go b/pkg/kv/kvserver/batcheval/cmd_end_transaction.go index cfe507d7b448..fb0d47b432e0 100644 --- a/pkg/kv/kvserver/batcheval/cmd_end_transaction.go +++ b/pkg/kv/kvserver/batcheval/cmd_end_transaction.go @@ -594,7 +594,7 @@ func RunCommitTrigger( // timestamp was bumped after it acquired latches. if txn.WriteTimestamp.Synthetic && rec.Clock().Now().Less(txn.WriteTimestamp) { return result.Result{}, errors.AssertionFailedf("txn %s with %s commit trigger needs "+ - "commit wait. Was its timestamp bumped after acquiring latches?", txn, errors.Safe(ct.Kind())) + "commit wait. 
Was its timestamp bumped after acquiring latches?", txn, ct.Kind()) } // Stage the commit trigger's side-effects so that they will go into effect on diff --git a/pkg/kv/kvserver/batcheval/cmd_push_txn.go b/pkg/kv/kvserver/batcheval/cmd_push_txn.go index 698d60553e90..c50fe0ea5bbc 100644 --- a/pkg/kv/kvserver/batcheval/cmd_push_txn.go +++ b/pkg/kv/kvserver/batcheval/cmd_push_txn.go @@ -139,7 +139,7 @@ func PushTxn( if err != nil { return result.Result{}, err } else if !ok { - log.VEventf(ctx, 2, "pushee txn record not found") + log.VEventf(ctx, 2, "pushee txn record not found (pushee: %s)", args.PusheeTxn.Short()) // There are three cases in which there is no transaction record: // // * the pushee is still active but its transaction record has not diff --git a/pkg/kv/kvserver/client_merge_test.go b/pkg/kv/kvserver/client_merge_test.go index 1bf7c255f147..019b8fcea12f 100644 --- a/pkg/kv/kvserver/client_merge_test.go +++ b/pkg/kv/kvserver/client_merge_test.go @@ -4960,7 +4960,7 @@ func setupClusterWithSubsumedRange( errCh := make(chan error) blocker := filter.BlockNextMerge() go func() { - errCh <- mergeTxn(ctx, store, *lhsDesc) + errCh <- mergeWithRightNeighbor(ctx, store, *lhsDesc) }() defer func() { // Ensure that the request doesn't stay blocked if we fail. diff --git a/pkg/kv/kvserver/client_relocate_range_test.go b/pkg/kv/kvserver/client_relocate_range_test.go index 40a67fe9b364..fc7c2e337653 100644 --- a/pkg/kv/kvserver/client_relocate_range_test.go +++ b/pkg/kv/kvserver/client_relocate_range_test.go @@ -519,7 +519,7 @@ func setupReplicaRemovalTest( require.NoError(t, err) repl, err := tc.GetFirstStoreFromServer(t, 0).GetReplica(rangeDesc.RangeID) require.NoError(t, err) - err = tc.MoveRangeLeaseNonCooperatively(rangeDesc, tc.Target(1), manual) + _, err = tc.MoveRangeLeaseNonCooperatively(ctx, rangeDesc, tc.Target(1), manual) require.NoError(t, err) // Remove first store from raft group. diff --git a/pkg/kv/kvserver/client_replica_test.go b/pkg/kv/kvserver/client_replica_test.go index 3b8b8dff57bf..45d48dfa6ef4 100644 --- a/pkg/kv/kvserver/client_replica_test.go +++ b/pkg/kv/kvserver/client_replica_test.go @@ -3492,7 +3492,7 @@ func TestDiscoverIntentAcrossLeaseTransferAwayAndBack(t *testing.T) { // Transfer the lease to Server 1. Do so non-cooperatively instead of using // a lease transfer, because the cooperative lease transfer would get stuck // acquiring latches, which are held by txn2. 
- err = tc.MoveRangeLeaseNonCooperatively(rangeDesc, tc.Target(1), manual) + _, err = tc.MoveRangeLeaseNonCooperatively(ctx, rangeDesc, tc.Target(1), manual) require.NoError(t, err) // Send an arbitrary request to the range to update the range descriptor diff --git a/pkg/kv/kvserver/closed_timestamp_test.go b/pkg/kv/kvserver/closed_timestamp_test.go index 28199785096f..e262b20621dd 100644 --- a/pkg/kv/kvserver/closed_timestamp_test.go +++ b/pkg/kv/kvserver/closed_timestamp_test.go @@ -16,7 +16,6 @@ import ( "fmt" "math/rand" "strconv" - "sync" "sync/atomic" "testing" "time" @@ -26,8 +25,10 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv" "github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord" "github.com/cockroachdb/cockroach/pkg/kv/kvserver" - "github.com/cockroachdb/cockroach/pkg/kv/kvserver/closedts/ctpb" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/closedts" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/txnwait" "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/server" "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" "github.com/cockroachdb/cockroach/pkg/sql/rowenc" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" @@ -36,6 +37,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/testutils/skip" "github.com/cockroachdb/cockroach/pkg/testutils/testcluster" "github.com/cockroachdb/cockroach/pkg/util" + "github.com/cockroachdb/cockroach/pkg/util/ctxgroup" "github.com/cockroachdb/cockroach/pkg/util/encoding" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/leaktest" @@ -44,6 +46,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/syncutil" "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/errors" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "golang.org/x/sync/errgroup" ) @@ -557,284 +560,221 @@ func TestClosedTimestampCantServeForNonTransactionalBatch(t *testing.T) { verifyNotLeaseHolderErrors(t, baRead, repls, 2) } -// TestClosedTimestampInactiveAfterSubsumption verifies that, during a merge, -// replicas of the subsumed range (RHS) cannot serve follower reads for -// timestamps after the subsumption time. -func TestClosedTimestampInactiveAfterSubsumption(t *testing.T) { +// Test that, during a merge, the closed timestamp of the subsumed RHS doesn't +// go above the subsumption time. It'd be bad if it did, since this advanced +// closed timestamp would be lost when the merge finalizes. +func TestClosedTimestampFrozenAfterSubsumption(t *testing.T) { defer leaktest.AfterTest(t)() - skip.WithIssue(t, 59448, "flaky test") - // Skipping under short because this test pauses for a few seconds in order to - // trigger a node liveness expiration. - skip.UnderShort(t) - // TODO(aayush): After #51087, we're seeing some regression in the initial - // setup of this test that causes it to fail there. There are some - // improvements for that PR in-flight. Revisit at a later date and re-enable - // under race. 
- skip.UnderRace(t) - type postSubsumptionCallback func( - ctx context.Context, - t *testing.T, - tc serverutils.TestClusterInterface, - g *errgroup.Group, - rightDesc roachpb.RangeDescriptor, - rightLeaseholder roachpb.ReplicationTarget, - freezeStartTimestamp hlc.Timestamp, - leaseAcquisitionTrap *atomic.Value, - ) (roachpb.ReplicationTarget, hlc.Timestamp, error) - - type testCase struct { - name string - callback postSubsumptionCallback - } + defer log.Scope(t).Close(t) - tests := []testCase{ + for _, test := range []struct { + name string + // transferLease, if set, will be called while the RHS is subsumed in order + // to perform a RHS lease transfer. + transferLease func( + ctx context.Context, + t *testing.T, + tc serverutils.TestClusterInterface, + rhsDesc roachpb.RangeDescriptor, + rhsLeaseholder roachpb.ReplicationTarget, + clock *hlc.HybridManualClock, + ) (newLeaseholder roachpb.ReplicationTarget, leaseStart hlc.Timestamp) + }{ { - name: "without lease transfer", - callback: nil, + name: "basic", + transferLease: nil, }, { - name: "with intervening lease transfer", - callback: forceLeaseTransferOnSubsumedRange, + name: "rhs lease transfer while subsumed", + transferLease: func( + ctx context.Context, + t *testing.T, + tc serverutils.TestClusterInterface, + rhsDesc roachpb.RangeDescriptor, + rhsLeaseholder roachpb.ReplicationTarget, + clock *hlc.HybridManualClock, + ) (roachpb.ReplicationTarget, hlc.Timestamp) { + oldLeaseholderStore := getTargetStoreOrFatal(t, tc, rhsLeaseholder) + oldLease, _ := oldLeaseholderStore.LookupReplica(rhsDesc.StartKey).GetLease() + require.True(t, oldLease.Replica.StoreID == oldLeaseholderStore.StoreID()) + newLeaseholder := getFollowerReplicas(ctx, t, tc, rhsDesc, rhsLeaseholder)[0] + target := roachpb.ReplicationTarget{ + NodeID: newLeaseholder.NodeID(), + StoreID: newLeaseholder.StoreID(), + } + newLease, err := tc.MoveRangeLeaseNonCooperatively(ctx, rhsDesc, target, clock) + require.NoError(t, err) + return target, newLease.Start.ToTimestamp() + }, }, - } - - runTest := func(t *testing.T, callback postSubsumptionCallback) { - ctx := context.Background() - // Range merges can be internally retried by the coordinating node (the - // leaseholder of the left hand side range). If this happens, the right hand - // side can get re-subsumed. However, in the current implementation, even if - // the merge txn gets retried, the follower replicas should not be able to - // activate any closed timestamp updates succeeding the timestamp the RHS - // was subsumed _for the first time_. - st := mergeFilter{} - var leaseAcquisitionTrap atomic.Value - clusterArgs := base.TestClusterArgs{ - ServerArgs: base.TestServerArgs{ - RaftConfig: base.RaftConfig{ - // We set the raft election timeout to a small duration. This should - // result in the node liveness duration being ~3.6 seconds. Note that - // if we set this too low, the test may flake due to the test - // cluster's nodes frequently missing their liveness heartbeats. - RaftHeartbeatIntervalTicks: 5, - RaftElectionTimeoutTicks: 6, - }, - Knobs: base.TestingKnobs{ - Store: &kvserver.StoreTestingKnobs{ - // This test suspends the merge txn right before it can apply the - // commit trigger and can lead to the merge txn taking longer than - // the defaults specified in aggressiveResolvedTimestampPushKnobs(). - // We use really high values here in order to avoid the merge txn - // being pushed due to resolved timestamps. 
- RangeFeedPushTxnsInterval: 5 * time.Second, - RangeFeedPushTxnsAge: 60 * time.Second, - TestingRequestFilter: st.SuspendMergeTrigger, - LeaseRequestEvent: func(ts hlc.Timestamp, storeID roachpb.StoreID, rangeID roachpb.RangeID) *roachpb.Error { - val := leaseAcquisitionTrap.Load() - if val == nil { - return nil - } - leaseAcquisitionCallback := val.(func(storeID roachpb.StoreID, rangeID roachpb.RangeID) *roachpb.Error) - if err := leaseAcquisitionCallback(storeID, rangeID); err != nil { - return err - } - return nil + } { + t.Run(test.name, func(t *testing.T) { + ctx := context.Background() + // Set a long txn liveness threshold; we'll bump the clock to cause a + // lease to expire and we don't want that to cause transactions to be + // aborted (in particular, the merge txn that will be in progress when we + // bump the clock). + defer txnwait.TestingOverrideTxnLivenessThreshold(time.Hour)() + + // Range merges can be internally retried by the coordinating node (the + // leaseholder of the left hand side range). If this happens, the right hand + // side can get re-subsumed. However, in the current implementation, even if + // the merge txn gets retried, the follower replicas should not be able to + // activate any closed timestamp updates succeeding the timestamp the RHS + // was subsumed _for the first time_. + st := mergeFilter{} + manual := hlc.NewHybridManualClock() + pinnedLeases := kvserver.NewPinnedLeases() + clusterArgs := base.TestClusterArgs{ + ServerArgs: base.TestServerArgs{ + RaftConfig: base.RaftConfig{ + // We set the raft election timeout to a small duration. This should + // result in the node liveness duration being ~3.6 seconds. Note that + // if we set this too low, the test may flake due to the test + // cluster's nodes frequently missing their liveness heartbeats. + RaftHeartbeatIntervalTicks: 5, + RaftElectionTimeoutTicks: 6, + }, + Knobs: base.TestingKnobs{ + Server: &server.TestingKnobs{ + ClockSource: manual.UnixNano, + }, + Store: &kvserver.StoreTestingKnobs{ + // This test suspends the merge txn right before it can apply the + // commit trigger and can lead to the merge txn taking longer than + // the defaults specified in aggressiveResolvedTimestampPushKnobs(). + // We use really high values here in order to avoid the merge txn + // being pushed due to resolved timestamps. + RangeFeedPushTxnsInterval: 5 * time.Second, + RangeFeedPushTxnsAge: 60 * time.Second, + TestingRequestFilter: st.SuspendMergeTrigger, + DisableMergeQueue: true, + // A subtest wants to force a lease change by stopping the liveness + // heartbeats on the old leaseholder and sending a request to + // another replica. If we didn't use this knob, we'd have to + // architect a Raft leadership change too in order to let the + // replica get the lease. + AllowLeaseRequestProposalsWhenNotLeader: true, + PinnedLeases: pinnedLeases, }, - DisableMergeQueue: true, - // A subtest wants to force a lease change by stopping the liveness - // heartbeats on the old leaseholder and sending a request to - // another replica. If we didn't use this knob, we'd have to - // architect a Raft leadership change too in order to let the - // replica get the lease. - AllowLeaseRequestProposalsWhenNotLeader: true, }, }, - }, - } - // If the initial phase of the merge txn takes longer than the closed - // timestamp target duration, its initial CPuts can have their write - // timestamps bumped due to an intervening closed timestamp update. This - // causes the entire merge txn to retry. 
So we use a long closed timestamp - // duration at the beginning of the test until we have the merge txn - // suspended at its commit trigger, and then change it back down to - // `testingTargetDuration`. - tc, leftDesc, rightDesc := setupClusterForClosedTSTestingWithSplitRanges(ctx, t, 5*time.Second, - testingCloseFraction, clusterArgs) - defer tc.Stopper().Stop(ctx) - - leftLeaseholder := getCurrentLeaseholder(t, tc, leftDesc) - rightLeaseholder := getCurrentLeaseholder(t, tc, rightDesc) - - g, ctx := errgroup.WithContext(ctx) - // Merge the ranges back together. The LHS leaseholder should block right - // before the merge trigger request is sent. - leftLeaseholderStore := getTargetStoreOrFatal(t, tc, leftLeaseholder) - blocker := st.BlockNextMerge() - mergeErrCh := make(chan error, 1) - g.Go(func() error { - err := mergeTxn(ctx, leftLeaseholderStore, leftDesc) - mergeErrCh <- err - return err - }) - defer func() { - // Unblock the rightLeaseholder so it can finally commit the merge. - blocker.Unblock() - if err := g.Wait(); err != nil { - t.Error(err) } - }() - - var freezeStartTimestamp hlc.Timestamp - // We now have the RHS in its subsumed state. - select { - case freezeStartTimestamp = <-blocker.WaitCh(): - case err := <-mergeErrCh: - t.Fatal(err) - case <-time.After(45 * time.Second): - t.Fatal("did not receive merge commit trigger as expected") - } - // Reduce the closed timestamp target duration in order to make the rest of - // the test faster. - db := tc.ServerConn(0) - if _, err := db.Exec(fmt.Sprintf(`SET CLUSTER SETTING kv.closed_timestamp.target_duration = '%s';`, - testingTargetDuration)); err != nil { - t.Fatal(err) - } - // inactiveClosedTSBoundary indicates the low water mark for closed - // timestamp updates beyond which we expect none of the followers to be able - // to serve follower reads until the merge is complete. - inactiveClosedTSBoundary := freezeStartTimestamp - if callback != nil { - newRightLeaseholder, ts, err := callback(ctx, t, tc, g, rightDesc, rightLeaseholder, - freezeStartTimestamp, &leaseAcquisitionTrap) - if err != nil { + // If the initial phase of the merge txn takes longer than the closed + // timestamp target duration, its initial CPuts can have their write + // timestamps bumped due to an intervening closed timestamp update. This + // causes the entire merge txn to retry. So we use a long closed timestamp + // duration at the beginning of the test until we have the merge txn + // suspended at its commit trigger, and then change it back down to + // `testingTargetDuration`. + tc, leftDesc, rightDesc := setupClusterForClosedTSTestingWithSplitRanges(ctx, t, 5*time.Second, + testingCloseFraction, clusterArgs) + defer tc.Stopper().Stop(ctx) + + leftLeaseholder := getCurrentLeaseholder(t, tc, leftDesc) + rightLeaseholder := getCurrentLeaseholder(t, tc, rightDesc) + // Pin the lhs lease where it already is. We're going to bump the clock to + // expire the rhs lease, and we don't want the lhs lease to move to a + // different node. + pinnedLeases.PinLease(leftDesc.RangeID, leftLeaseholder.StoreID) + + g := ctxgroup.WithContext(ctx) + // Merge the ranges back together. The LHS leaseholder should block right + // before the merge trigger request is sent. 
+ leftLeaseholderStore := getTargetStoreOrFatal(t, tc, leftLeaseholder) + mergeBlocker := st.BlockNextMerge() + mergeErrCh := make(chan error, 1) + g.Go(func() error { + err := mergeWithRightNeighbor(ctx, leftLeaseholderStore, leftDesc) + mergeErrCh <- err + return err + }) + defer func() { + // Unblock the merge. + if mergeBlocker.Unblock() { + assert.NoError(t, g.Wait()) + } + }() + + var freezeStartTimestamp hlc.Timestamp + // Wait for the RHS to enter the subsumed state. + select { + case freezeStartTimestamp = <-mergeBlocker.WaitCh(): + log.Infof(ctx, "test: merge blocked. Freeze time: %s", freezeStartTimestamp) + case err := <-mergeErrCh: t.Fatal(err) + case <-time.After(45 * time.Second): + t.Fatal("did not receive merge commit trigger as expected") } - rightLeaseholder, inactiveClosedTSBoundary = newRightLeaseholder, ts - } - // Poll the store for closed timestamp updates for timestamps greater than - // our `inactiveClosedTSBoundary`. - closedTimestampCh := make(chan ctpb.Entry, 1) - g.Go(func() (e error) { - pollForGreaterClosedTimestamp(t, tc, rightLeaseholder, rightDesc, inactiveClosedTSBoundary, closedTimestampCh) - return - }) - // We expect that none of the closed timestamp updates greater than - // `inactiveClosedTSBoundary` will be actionable by the RHS follower - // replicas. - log.Infof(ctx, "waiting for next closed timestamp update for the RHS") - select { - case <-closedTimestampCh: - case <-time.After(30 * time.Second): - t.Fatal("failed to receive next closed timestamp update") - } - baReadAfterLeaseTransfer := makeTxnReadBatchForDesc(rightDesc, inactiveClosedTSBoundary.Next()) - rightReplFollowers := getFollowerReplicas(ctx, t, tc, rightDesc, rightLeaseholder) - log.Infof(ctx, "sending read requests from followers after the inactiveClosedTSBoundary") - verifyNotLeaseHolderErrors(t, baReadAfterLeaseTransfer, rightReplFollowers, 2 /* expectedNLEs */) - } - - for _, test := range tests { - t.Run(test.name, func(t *testing.T) { - runTest(t, test.callback) - }) - } -} + var rhsLeaseStart hlc.Timestamp + if test.transferLease != nil { + // Transfer the RHS lease while the RHS is subsumed. + rightLeaseholder, rhsLeaseStart = test.transferLease(ctx, t, tc, rightDesc, rightLeaseholder, manual) + // Sanity check. + require.True(t, freezeStartTimestamp.Less(rhsLeaseStart)) + } -// forceLeaseTransferOnSubsumedRange triggers a lease transfer on `rightDesc` by -// pausing the liveness heartbeats of the store that holds the lease for it. -func forceLeaseTransferOnSubsumedRange( - ctx context.Context, - t *testing.T, - tc serverutils.TestClusterInterface, - g *errgroup.Group, - rightDesc roachpb.RangeDescriptor, - rightLeaseholder roachpb.ReplicationTarget, - freezeStartTimestamp hlc.Timestamp, - leaseAcquisitionTrap *atomic.Value, -) (newLeaseholder roachpb.ReplicationTarget, leaseStart hlc.Timestamp, err error) { - oldLeaseholderStore := getTargetStoreOrFatal(t, tc, rightLeaseholder) - // Co-operative lease transfers will block while a range is subsumed, so we - // pause the node liveness heartbeats until a lease transfer occurs. - oldLease, _ := oldLeaseholderStore.LookupReplica(rightDesc.StartKey).GetLease() - require.True(t, oldLease.Replica.StoreID == oldLeaseholderStore.StoreID()) - // Instantiate the lease acquisition callback right before we pause the node - // liveness heartbeats. We do this here because leases may be requested at - // any time for any reason, even before we pause the heartbeats. 
- leaseAcquisitionCh := make(chan roachpb.StoreID) - newRightLeaseholder := getFollowerReplicas(ctx, t, tc, rightDesc, rightLeaseholder)[0] - var once sync.Once - leaseAcquisitionTrap.Store(func(storeID roachpb.StoreID, rangeID roachpb.RangeID) *roachpb.Error { - if rangeID == rightDesc.RangeID { - if expectedStoreID := newRightLeaseholder.StoreID(); expectedStoreID != storeID { - return roachpb.NewError(&roachpb.NotLeaseHolderError{ - CustomMsg: fmt.Sprintf("only store %d must acquire the RHS's lease", expectedStoreID), - }) + // Sleep a bit and assert that the closed timestamp has not advanced while + // we were sleeping. We need to sleep sufficiently to give the side + // transport a chance to publish updates. + log.Infof(ctx, "test: sleeping...") + time.Sleep(5 * closedts.SideTransportCloseInterval.Get(&tc.Server(0).ClusterSettings().SV)) + log.Infof(ctx, "test: sleeping... done") + + store, err := getTargetStore(tc, rightLeaseholder) + require.NoError(t, err) + r, err := store.GetReplica(rightDesc.RangeID) + require.NoError(t, err) + maxClosed, ok := r.MaxClosed(ctx) + require.True(t, ok) + // Note that maxClosed would not necessarily be below the freeze start if + // this was a LEAD_FOR_GLOBAL_READS range. + assert.True(t, maxClosed.LessEq(freezeStartTimestamp), + "expected closed %s to be <= freeze %s", maxClosed, freezeStartTimestamp) + + // Sanity check that follower reads are not served by the RHS at + // timestamps above the freeze (and also above the closed timestamp that + // we verified above). + scanTime := freezeStartTimestamp.Next() + scanReq := makeTxnReadBatchForDesc(rightDesc, scanTime) + follower := getFollowerReplicas(ctx, t, tc, rightDesc, roachpb.ReplicationTarget{ + NodeID: r.NodeID(), + StoreID: r.StoreID(), + })[0] + _, pErr := follower.Send(ctx, scanReq) + require.NotNil(t, pErr) + require.Regexp(t, "NotLeaseHolderError", pErr.String()) + + log.Infof(ctx, "test: unblocking merge") + mergeBlocker.Unblock() + require.NoError(t, g.Wait()) + + // Sanity check for the case where we've performed a lease transfer: make + // sure that we can write at a timestamp *lower* than that lease's start + // time. This shows that the test is not fooling itself and orchestrates + // the merge scenario that it wants; in this scenario the lease start time + // doesn't matter since the RHS is merged into its neighbor, which has a + // lower lease start time. If the closed timestamp would advance past the + // subsumption time (which we've asserted above that it doesn't), this + // write would be a violation of that closed timestamp. 
+ if !rhsLeaseStart.IsEmpty() { + mergedLeaseholder, err := leftLeaseholderStore.GetReplica(leftDesc.RangeID) + require.NoError(t, err) + writeTime := rhsLeaseStart.Prev() + var baWrite roachpb.BatchRequest + baWrite.Header.RangeID = leftDesc.RangeID + baWrite.Header.Timestamp = writeTime + put := &roachpb.PutRequest{} + put.Key = rightDesc.StartKey.AsRawKey() + baWrite.Add(put) + resp, pErr := mergedLeaseholder.Send(ctx, baWrite) + require.Nil(t, pErr) + require.Equal(t, writeTime, resp.Timestamp, + "response time %s different from request time %s", resp.Timestamp, writeTime) } - once.Do(func() { - log.Infof(ctx, "received lease request from store %d for RHS range %d", - storeID, rangeID) - leaseAcquisitionCh <- storeID - }) - } - return nil - }) - restartHeartbeats := oldLeaseholderStore.GetStoreConfig().NodeLiveness.PauseAllHeartbeatsForTest() - defer restartHeartbeats() - log.Infof(ctx, "test: paused RHS rightLeaseholder's liveness heartbeats") - time.Sleep(oldLeaseholderStore.GetStoreConfig().NodeLiveness.GetLivenessThreshold()) - - // Send a read request from one of the followers of RHS so that it notices - // that the current rightLeaseholder has stopped heartbeating. This will prompt - // it to acquire the range lease for itself. - g.Go(func() error { - leaseAcquisitionRequest := makeTxnReadBatchForDesc(rightDesc, freezeStartTimestamp) - log.Infof(ctx, - "sending a read request from a follower of RHS (store %d) in order to trigger lease acquisition", - newRightLeaseholder.StoreID()) - _, pErr := newRightLeaseholder.Send(ctx, leaseAcquisitionRequest) - log.Infof(ctx, "test: RHS read returned err: %v", pErr) - // After the merge commits, the RHS will cease to exist and this read - // request will return a RangeNotFoundError. But we cannot guarantee that - // the merge will always successfully commit on its first attempt - // (especially under race). In this case, this blocked read request might be - // let through and be successful. Thus, we cannot make any assertions about - // the result of this read request. 
- return nil - }) - select { - case storeID := <-leaseAcquisitionCh: - if storeID != newRightLeaseholder.StoreID() { - err = errors.Newf("expected store %d to try to acquire the lease; got a request from store %d instead", - newRightLeaseholder.StoreID(), storeID) - return roachpb.ReplicationTarget{}, hlc.Timestamp{}, err - } - case <-time.After(30 * time.Second): - err = errors.New("failed to receive lease acquisition request") - return roachpb.ReplicationTarget{}, hlc.Timestamp{}, err - } - rightLeaseholder = roachpb.ReplicationTarget{ - NodeID: newRightLeaseholder.NodeID(), - StoreID: newRightLeaseholder.StoreID(), - } - oldLeaseholderStore = getTargetStoreOrFatal(t, tc, rightLeaseholder) - err = retry.ForDuration(testutils.DefaultSucceedsSoonDuration, func() error { - newLease, _ := oldLeaseholderStore.LookupReplica(rightDesc.StartKey).GetLease() - if newLease.Sequence == oldLease.Sequence { - return errors.New("RHS lease not updated") - } - leaseStart = newLease.Start.ToTimestamp() - return nil - }) - if err != nil { - return - } - if !freezeStartTimestamp.LessEq(leaseStart) { - err = errors.New("freeze timestamp greater than the start time of the new lease") - return roachpb.ReplicationTarget{}, hlc.Timestamp{}, err + }) } - - return rightLeaseholder, leaseStart, nil } // mergeFilter provides a method (SuspendMergeTrigger) that can be installed as @@ -851,15 +791,21 @@ type mergeFilter struct { // blocker encapsulates the communication of a blocked merge to tests, and the // unblocking of that merge by the test. type mergeBlocker struct { - unblockCh chan struct{} - mu struct { + mu struct { syncutil.Mutex // mergeCh is the channel on which the merge is signaled. If nil, means that // the reader is not interested in receiving the notification any more. - mergeCh chan hlc.Timestamp + mergeCh chan hlc.Timestamp + unblockCh chan struct{} } } +func newMergeBlocker() *mergeBlocker { + m := &mergeBlocker{} + m.mu.unblockCh = make(chan struct{}) + return m +} + // WaitCh returns the channel on which the blocked merge will be signaled. The // channel will carry the freeze start timestamp for the RHS. func (mb *mergeBlocker) WaitCh() <-chan hlc.Timestamp { @@ -870,27 +816,37 @@ func (mb *mergeBlocker) WaitCh() <-chan hlc.Timestamp { // Unblock unblocks the blocked merge, if any. It's legal to call this even if // no merge is currently blocked, in which case the next merge trigger will no -// longer block. +// longer block. Unblock can be called multiple times; the first call returns +// true, subsequent ones return false and are no-ops. // // Calls to Unblock() need to be synchronized with reading from the channel // returned by WaitCh(). -func (mb *mergeBlocker) Unblock() { - close(mb.unblockCh) +func (mb *mergeBlocker) Unblock() bool { mb.mu.Lock() defer mb.mu.Unlock() + if mb.mu.unblockCh == nil { + // Unblock was already called. + return false + } + + close(mb.mu.unblockCh) mb.mu.mergeCh = nil + mb.mu.unblockCh = nil + return true } -// signal sends a freezeTs to someone waiting for a blocked merge. -func (mb *mergeBlocker) signal(freezeTs hlc.Timestamp) { +// signal sends a freezeTs to someone waiting for a blocked merge. Returns the +// channel to wait on for the merge to be unblocked. +func (mb *mergeBlocker) signal(freezeTs hlc.Timestamp) chan struct{} { mb.mu.Lock() defer mb.mu.Unlock() ch := mb.mu.mergeCh if ch == nil { // Nobody's waiting on this merge any more. 
- return + return nil } ch <- freezeTs + return mb.mu.unblockCh } // BlockNextMerge arms the merge filter state, installing a blocker for the next @@ -906,9 +862,7 @@ func (filter *mergeFilter) BlockNextMerge() *mergeBlocker { if filter.mu.blocker != nil { panic("next merge already blocked") } - filter.mu.blocker = &mergeBlocker{ - unblockCh: make(chan struct{}), - } + filter.mu.blocker = newMergeBlocker() // This channel is buffered because we don't force the caller to read from it; // the caller can call mergeBlocker.Unblock() instead. filter.mu.blocker.mu.mergeCh = make(chan hlc.Timestamp, 1) @@ -950,15 +904,19 @@ func (filter *mergeFilter) SuspendMergeTrigger( // We block the LHS leaseholder from applying the merge trigger. Note // that RHS followers will have already caught up to the leaseholder // well before this point. - blocker.signal(freezeStart.ToTimestamp()) + unblockCh := blocker.signal(freezeStart.ToTimestamp()) // Wait for the merge to be unblocked. - <-blocker.unblockCh + if unblockCh != nil { + <-unblockCh + } } } return nil } -func mergeTxn(ctx context.Context, store *kvserver.Store, leftDesc roachpb.RangeDescriptor) error { +func mergeWithRightNeighbor( + ctx context.Context, store *kvserver.Store, leftDesc roachpb.RangeDescriptor, +) error { mergeArgs := adminMergeArgs(leftDesc.StartKey.AsRawKey()) _, err := kv.SendWrapped(ctx, store.TestSender(), mergeArgs) return err.GoError() @@ -1033,56 +991,6 @@ func splitDummyRangeInTestCluster( return leftDesc, rightDesc } -func getCurrentMaxClosed( - t *testing.T, - tc serverutils.TestClusterInterface, - target roachpb.ReplicationTarget, - desc roachpb.RangeDescriptor, -) ctpb.Entry { - deadline := timeutil.Now().Add(2 * testingTargetDuration) - store := getTargetStoreOrFatal(t, tc, target) - var maxClosed ctpb.Entry - attempts := 0 - for attempts == 0 || timeutil.Now().Before(deadline) { - attempts++ - store.GetStoreConfig().ClosedTimestamp.Storage.VisitDescending(target.NodeID, func(entry ctpb.Entry) (done bool) { - if _, ok := entry.MLAI[desc.RangeID]; ok { - maxClosed = entry - return true - } - return false - }) - if _, ok := maxClosed.MLAI[desc.RangeID]; !ok { - // We ran out of closed timestamps to visit without finding one that - // corresponds to rightDesc. It is likely that no closed timestamps have - // been broadcast for desc yet, try again. 
- continue - } - return maxClosed - } - return ctpb.Entry{} -} - -func pollForGreaterClosedTimestamp( - t *testing.T, - tc serverutils.TestClusterInterface, - target roachpb.ReplicationTarget, - desc roachpb.RangeDescriptor, - lowerBound hlc.Timestamp, - returnCh chan<- ctpb.Entry, -) { - for { - if t.Failed() { - return - } - maxClosed := getCurrentMaxClosed(t, tc, target, desc) - if _, ok := maxClosed.MLAI[desc.RangeID]; ok && lowerBound.LessEq(maxClosed.ClosedTimestamp) { - returnCh <- maxClosed - return - } - } -} - func getFollowerReplicas( ctx context.Context, t *testing.T, @@ -1103,19 +1011,21 @@ func getFollowerReplicas( func getTargetStoreOrFatal( t *testing.T, tc serverutils.TestClusterInterface, target roachpb.ReplicationTarget, -) (store *kvserver.Store) { +) *kvserver.Store { + s, err := getTargetStore(tc, target) + require.NoError(t, err) + return s +} + +func getTargetStore( + tc serverutils.TestClusterInterface, target roachpb.ReplicationTarget, +) (_ *kvserver.Store, err error) { for i := 0; i < tc.NumServers(); i++ { - if server := tc.Server(i); server.NodeID() == target.NodeID && - server.GetStores().(*kvserver.Stores).HasStore(target.StoreID) { - store, err := server.GetStores().(*kvserver.Stores).GetStore(target.StoreID) - if err != nil { - t.Fatal(err) - } - return store + if server := tc.Server(i); server.NodeID() == target.NodeID { + return server.GetStores().(*kvserver.Stores).GetStore(target.StoreID) } } - t.Fatalf("Could not find store for replication target %+v\n", target) - return nil + return nil, errors.Errorf("could not find node for replication target %+v\n", target) } func verifyNotLeaseHolderErrors( diff --git a/pkg/kv/kvserver/helpers_test.go b/pkg/kv/kvserver/helpers_test.go index bcb9079be99e..65a2785a00b0 100644 --- a/pkg/kv/kvserver/helpers_test.go +++ b/pkg/kv/kvserver/helpers_test.go @@ -43,7 +43,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/randutil" "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/errors" - "github.com/cockroachdb/logtags" "go.etcd.io/etcd/raft/v3" ) @@ -547,11 +546,3 @@ func WatchForDisappearingReplicas(t testing.TB, store *Store) { } } } - -// AcquireLease is redirectOnOrAcquireLease exposed for tests. -func (r *Replica) AcquireLease(ctx context.Context) (kvserverpb.LeaseStatus, error) { - ctx = r.AnnotateCtx(ctx) - ctx = logtags.AddTag(ctx, "lease-acq", nil) - l, pErr := r.redirectOnOrAcquireLease(ctx) - return l, pErr.GoError() -} diff --git a/pkg/kv/kvserver/replica_closedts_test.go b/pkg/kv/kvserver/replica_closedts_test.go index bf33cf09988e..3eb50ef84910 100644 --- a/pkg/kv/kvserver/replica_closedts_test.go +++ b/pkg/kv/kvserver/replica_closedts_test.go @@ -601,7 +601,7 @@ func TestRejectedLeaseDoesntDictateClosedTimestamp(t *testing.T) { leaseAcqErrCh <- err return } - _, err = r.AcquireLease(ctx) + _, err = r.TestingAcquireLease(ctx) leaseAcqErrCh <- err }() // Wait for the lease acquisition to be blocked. 
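As an illustrative sketch (not part of the diff above) of how the reworked mergeFilter/mergeBlocker helpers are meant to be driven by a test: this assumes it sits alongside those helpers in closed_timestamp_test.go (so mergeWithRightNeighbor, kvserver, roachpb, and testify's require are in scope) and that the filter has been installed as the store's TestingRequestFilter via SuspendMergeTrigger.

    // driveBlockedMerge arms the blocker, kicks off a merge, waits for the
    // freeze timestamp, and then releases the merge. Reading from WaitCh() is
    // synchronized with Unblock(): we only unblock after the timestamp has
    // been received (or the merge has already failed).
    func driveBlockedMerge(
        ctx context.Context,
        t *testing.T,
        store *kvserver.Store,
        leftDesc roachpb.RangeDescriptor,
        filter *mergeFilter,
    ) {
        blocker := filter.BlockNextMerge()
        mergeErrCh := make(chan error, 1)
        go func() {
            mergeErrCh <- mergeWithRightNeighbor(ctx, store, leftDesc)
        }()
        select {
        case freezeStart := <-blocker.WaitCh():
            t.Logf("merge blocked; RHS frozen at %s", freezeStart)
        case err := <-mergeErrCh:
            t.Fatalf("merge finished without blocking: %v", err)
        }
        // Unblock is idempotent; only the first call returns true.
        if blocker.Unblock() {
            require.NoError(t, <-mergeErrCh)
        }
    }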
diff --git a/pkg/kv/kvserver/replica_command.go b/pkg/kv/kvserver/replica_command.go index 4b658128f7bd..5cf919c82465 100644 --- a/pkg/kv/kvserver/replica_command.go +++ b/pkg/kv/kvserver/replica_command.go @@ -784,6 +784,7 @@ func (r *Replica) AdminMerge( txn := kv.NewTxn(ctx, r.store.DB(), r.NodeID()) err := runMergeTxn(txn) if err != nil { + log.VEventf(ctx, 2, "merge txn failed: %s", err) txn.CleanupOnError(ctx, err) } if !errors.HasType(err, (*roachpb.TransactionRetryWithProtoRefreshError)(nil)) { diff --git a/pkg/kv/kvserver/replica_range_lease.go b/pkg/kv/kvserver/replica_range_lease.go index 650c7445e4d7..b20454dfb0e0 100644 --- a/pkg/kv/kvserver/replica_range_lease.go +++ b/pkg/kv/kvserver/replica_range_lease.go @@ -728,6 +728,10 @@ func (r *Replica) requestLeaseLocked( return r.mu.pendingLeaseRequest.newResolvedHandle(err) } } + if pErr := r.store.TestingKnobs().PinnedLeases.rejectLeaseIfPinnedElsewhere(r); pErr != nil { + return r.mu.pendingLeaseRequest.newResolvedHandle(pErr) + } + // If we're draining, we'd rather not take any new leases (since we're also // trying to move leases away elsewhere). But if we're the leader, we don't // really have a choice and we take the lease - there might not be any other @@ -1063,6 +1067,14 @@ func (r *Replica) redirectOnOrAcquireLease( return r.redirectOnOrAcquireLeaseForRequest(ctx, hlc.Timestamp{}) } +// TestingAcquireLease is redirectOnOrAcquireLease exposed for tests. +func (r *Replica) TestingAcquireLease(ctx context.Context) (kvserverpb.LeaseStatus, error) { + ctx = r.AnnotateCtx(ctx) + ctx = logtags.AddTag(ctx, "lease-acq", nil) + l, pErr := r.redirectOnOrAcquireLease(ctx) + return l, pErr.GoError() +} + // redirectOnOrAcquireLeaseForRequest is like redirectOnOrAcquireLease, // but it accepts a specific request timestamp instead of assuming that // the request is operating at the current time. diff --git a/pkg/kv/kvserver/replica_send.go b/pkg/kv/kvserver/replica_send.go index 00dbfdd47831..04eb0b1499f7 100644 --- a/pkg/kv/kvserver/replica_send.go +++ b/pkg/kv/kvserver/replica_send.go @@ -624,7 +624,7 @@ func (r *Replica) handleMergeInProgressError( if ba.IsSingleTransferLeaseRequest() { return roachpb.NewErrorf("cannot transfer lease while merge in progress") } - log.Event(ctx, "waiting on in-progress merge") + log.Event(ctx, "waiting on in-progress range merge") select { case <-mergeCompleteCh: // Merge complete. Retry the command. diff --git a/pkg/kv/kvserver/testing_knobs.go b/pkg/kv/kvserver/testing_knobs.go index 967f3c828dac..2c6048071d98 100644 --- a/pkg/kv/kvserver/testing_knobs.go +++ b/pkg/kv/kvserver/testing_knobs.go @@ -20,6 +20,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver/txnwait" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/cockroach/pkg/util/syncutil" ) // StoreTestingKnobs is a part of the context used to control parts of @@ -92,6 +93,8 @@ type StoreTestingKnobs struct { // called to acquire a new lease. This can be used to assert that a request // triggers a lease acquisition. LeaseRequestEvent func(ts hlc.Timestamp, storeID roachpb.StoreID, rangeID roachpb.RangeID) *roachpb.Error + // PinnedLeases can be used to prevent all but one store from acquiring leases on a given range. + PinnedLeases *PinnedLeasesKnob // LeaseTransferBlockedOnExtensionEvent, if set, is called when // replica.TransferLease() encounters an in-progress lease extension. // nextLeader is the replica that we're trying to transfer the lease to. 
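The new StoreTestingKnobs.PinnedLeases field above is backed by the PinnedLeasesKnob type added in the next hunk. As a usage sketch — a fragment rather than a complete test, mirroring how the rewritten TestClosedTimestampFrozenAfterSubsumption wires it up; leftDesc and leftLeaseholder stand in for a range descriptor and its current leaseholder obtained during setup:

    // Pin the range's lease to its current leaseholder so that later clock
    // manipulation can't move it to another store.
    pinnedLeases := kvserver.NewPinnedLeases()
    clusterArgs := base.TestClusterArgs{
        ServerArgs: base.TestServerArgs{
            Knobs: base.TestingKnobs{
                Store: &kvserver.StoreTestingKnobs{
                    PinnedLeases: pinnedLeases,
                },
            },
        },
    }
    tc := testcluster.StartTestCluster(t, 3, clusterArgs)
    defer tc.Stopper().Stop(ctx)
    // Other stores now get NotLeaseHolderErrors when they try to acquire this
    // range's lease; cooperative transfers away from the pinned store still work.
    pinnedLeases.PinLease(leftDesc.RangeID, leftLeaseholder.StoreID)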
@@ -362,3 +365,60 @@ var _ base.ModuleTestingKnobs = NodeLivenessTestingKnobs{}

 // ModuleTestingKnobs implements the base.ModuleTestingKnobs interface.
 func (NodeLivenessTestingKnobs) ModuleTestingKnobs() {}
+
+// PinnedLeasesKnob is a testing knob for controlling what store can acquire a
+// lease for specific ranges.
+type PinnedLeasesKnob struct {
+	mu struct {
+		syncutil.Mutex
+		pinned map[roachpb.RangeID]roachpb.StoreID
+	}
+}
+
+// NewPinnedLeases creates a PinnedLeasesKnob.
+func NewPinnedLeases() *PinnedLeasesKnob {
+	p := &PinnedLeasesKnob{}
+	p.mu.pinned = make(map[roachpb.RangeID]roachpb.StoreID)
+	return p
+}
+
+// PinLease makes it so that only the specified store can take a lease for the
+// specified range. Replicas on other stores attempting to acquire a lease will
+// get NotLeaseHolderErrors. Lease transfers away from the pinned store are
+// permitted.
+func (p *PinnedLeasesKnob) PinLease(rangeID roachpb.RangeID, storeID roachpb.StoreID) {
+	p.mu.Lock()
+	defer p.mu.Unlock()
+	p.mu.pinned[rangeID] = storeID
+}
+
+// rejectLeaseIfPinnedElsewhere is called when r is trying to acquire a lease.
+// It returns a NotLeaseHolderError if the lease is pinned on another store.
+// r.mu needs to be rlocked.
+func (p *PinnedLeasesKnob) rejectLeaseIfPinnedElsewhere(r *Replica) *roachpb.Error {
+	if p == nil {
+		return nil
+	}
+
+	p.mu.Lock()
+	defer p.mu.Unlock()
+	pinnedStore, ok := p.mu.pinned[r.RangeID]
+	if !ok || pinnedStore == r.StoreID() {
+		return nil
+	}
+
+	repDesc, err := r.getReplicaDescriptorRLocked()
+	if err != nil {
+		return roachpb.NewError(err)
+	}
+	var pinned *roachpb.ReplicaDescriptor
+	if pinnedRep, ok := r.descRLocked().GetReplicaDescriptor(pinnedStore); ok {
+		pinned = &pinnedRep
+	}
+	return roachpb.NewError(&roachpb.NotLeaseHolderError{
+		Replica:     repDesc,
+		LeaseHolder: pinned,
+		RangeID:     r.RangeID,
+		CustomMsg:   "injected: lease pinned to another store",
+	})
+}
diff --git a/pkg/roachpb/batch.go b/pkg/roachpb/batch.go
index 6cb9eaba9b79..3741ac359cf1 100644
--- a/pkg/roachpb/batch.go
+++ b/pkg/roachpb/batch.go
@@ -597,8 +597,11 @@ func (ba BatchRequest) SafeFormat(s redact.SafePrinter, _ rune) {
 		req := arg.GetInner()
 		if et, ok := req.(*EndTxnRequest); ok {
 			h := req.Header()
-			s.Printf("%s(commit:%t) [%s]",
-				req.Method(), et.Commit, h.Key)
+			s.Printf("%s(commit:%t", req.Method(), et.Commit)
+			if et.InternalCommitTrigger != nil {
+				s.Printf(" %s", et.InternalCommitTrigger.Kind())
+			}
+			s.Printf(") [%s]", h.Key)
 		} else {
 			h := req.Header()
 			if req.Method() == PushTxn {
diff --git a/pkg/roachpb/data.go b/pkg/roachpb/data.go
index fcaa36bc62f9..0746ee0a537d 100644
--- a/pkg/roachpb/data.go
+++ b/pkg/roachpb/data.go
@@ -871,7 +871,7 @@ func (v Value) PrettyPrint() string {
 }
 
 // Kind returns the kind of commit trigger as a string.
-func (ct InternalCommitTrigger) Kind() string {
+func (ct InternalCommitTrigger) Kind() redact.SafeString {
 	switch {
 	case ct.SplitTrigger != nil:
 		return "split"
diff --git a/pkg/testutils/serverutils/test_cluster_shim.go b/pkg/testutils/serverutils/test_cluster_shim.go
index 05c767286007..cf77fd0097b7 100644
--- a/pkg/testutils/serverutils/test_cluster_shim.go
+++ b/pkg/testutils/serverutils/test_cluster_shim.go
@@ -18,6 +18,7 @@ package serverutils
 
 import (
+	"context"
 	gosql "database/sql"
 	"testing"
 
@@ -155,11 +156,17 @@ type TestClusterInterface interface {
 	// advancing the manual clock. The target is then instructed to acquire the
 	// ownerless lease. Most tests should use the cooperative version of this
 	// method, TransferRangeLease.
+ // + // Returns the new lease. + // + // If the lease starts out on dest, this is a no-op and the current lease is + // returned. MoveRangeLeaseNonCooperatively( + ctx context.Context, rangeDesc roachpb.RangeDescriptor, dest roachpb.ReplicationTarget, manual *hlc.HybridManualClock, - ) error + ) (*roachpb.Lease, error) // LookupRange returns the descriptor of the range containing key. LookupRange(key roachpb.Key) (roachpb.RangeDescriptor, error) diff --git a/pkg/testutils/testcluster/BUILD.bazel b/pkg/testutils/testcluster/BUILD.bazel index 528434ad5c29..22a6b5c28d96 100644 --- a/pkg/testutils/testcluster/BUILD.bazel +++ b/pkg/testutils/testcluster/BUILD.bazel @@ -9,7 +9,6 @@ go_library( "//pkg/base", "//pkg/gossip", "//pkg/keys", - "//pkg/kv", "//pkg/kv/kvserver", "//pkg/kv/kvserver/liveness/livenesspb", "//pkg/roachpb", diff --git a/pkg/testutils/testcluster/testcluster.go b/pkg/testutils/testcluster/testcluster.go index b5710bf77267..3b714673fd86 100644 --- a/pkg/testutils/testcluster/testcluster.go +++ b/pkg/testutils/testcluster/testcluster.go @@ -24,7 +24,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/gossip" "github.com/cockroachdb/cockroach/pkg/keys" - "github.com/cockroachdb/cockroach/pkg/kv" "github.com/cockroachdb/cockroach/pkg/kv/kvserver" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness/livenesspb" "github.com/cockroachdb/cockroach/pkg/roachpb" @@ -906,23 +905,26 @@ func (tc *TestCluster) RemoveLeaseHolderOrFatal( // MoveRangeLeaseNonCooperatively is part of the TestClusterInterface. func (tc *TestCluster) MoveRangeLeaseNonCooperatively( - rangeDesc roachpb.RangeDescriptor, dest roachpb.ReplicationTarget, manual *hlc.HybridManualClock, -) error { + ctx context.Context, + rangeDesc roachpb.RangeDescriptor, + dest roachpb.ReplicationTarget, + manual *hlc.HybridManualClock, +) (*roachpb.Lease, error) { knobs := tc.clusterArgs.ServerArgs.Knobs.Store.(*kvserver.StoreTestingKnobs) if !knobs.AllowLeaseRequestProposalsWhenNotLeader { // Without this knob, we'd have to architect a Raft leadership change // too in order to let the replica get the lease. It's easier to just // require that callers set it. - return errors.Errorf("must set StoreTestingKnobs.AllowLeaseRequestProposalsWhenNotLeader") + return nil, errors.Errorf("must set StoreTestingKnobs.AllowLeaseRequestProposalsWhenNotLeader") } destServer, err := tc.FindMemberServer(dest.StoreID) if err != nil { - return err + return nil, err } destStore, err := destServer.Stores().GetStore(dest.StoreID) if err != nil { - return err + return nil, err } // We are going to advance the manual clock so that the current lease @@ -931,13 +933,15 @@ func (tc *TestCluster) MoveRangeLeaseNonCooperatively( // when it's up for grabs. To handle that case, we wrap the entire operation // in an outer retry loop. const retryDur = testutils.DefaultSucceedsSoonDuration - return retry.ForDuration(retryDur, func() error { + var newLease *roachpb.Lease + err = retry.ForDuration(retryDur, func() error { // Find the current lease. 
prevLease, _, err := tc.FindRangeLease(rangeDesc, nil /* hint */) if err != nil { return err } if prevLease.Replica.StoreID == dest.StoreID { + newLease = &prevLease return nil } @@ -946,6 +950,7 @@ func (tc *TestCluster) MoveRangeLeaseNonCooperatively( if err != nil { return err } + log.Infof(ctx, "test: advancing clock to lease expiration") manual.Increment(lhStore.GetStoreConfig().LeaseExpiration()) // Heartbeat the destination server's liveness record so that if we are @@ -957,24 +962,20 @@ func (tc *TestCluster) MoveRangeLeaseNonCooperatively( // Issue a request to the target replica, which should notice that the // old lease has expired and that it can acquire the lease. - var newLease *roachpb.Lease - ctx := context.Background() - req := &roachpb.LeaseInfoRequest{ - RequestHeader: roachpb.RequestHeader{ - Key: rangeDesc.StartKey.AsRawKey(), - }, + r, err := destStore.GetReplica(rangeDesc.RangeID) + if err != nil { + return err } - h := roachpb.Header{RangeID: rangeDesc.RangeID} - reply, pErr := kv.SendWrappedWith(ctx, destStore, h, req) - if pErr != nil { - log.Infof(ctx, "LeaseInfoRequest failed: %v", pErr) - if lErr, ok := pErr.GetDetail().(*roachpb.NotLeaseHolderError); ok && lErr.Lease != nil { + ls, err := r.TestingAcquireLease(ctx) + if err != nil { + log.Infof(ctx, "TestingAcquireLease failed: %s", err) + if lErr := (*roachpb.NotLeaseHolderError)(nil); errors.As(err, &lErr) { newLease = lErr.Lease } else { - return pErr.GoError() + return err } } else { - newLease = &reply.(*roachpb.LeaseInfoResponse).Lease + newLease = &ls.Lease } // Is the lease in the right place? @@ -984,6 +985,8 @@ func (tc *TestCluster) MoveRangeLeaseNonCooperatively( } return nil }) + log.Infof(ctx, "MoveRangeLeaseNonCooperatively: acquired lease: %s. err: %v", newLease, err) + return newLease, err } // FindRangeLease is similar to FindRangeLeaseHolder but returns a Lease proto
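One closing note on the new MoveRangeLeaseNonCooperatively signature: callers now get back the lease that ended up being acquired (or the pre-existing lease, if it was already on dest). A minimal sketch of the updated call pattern, matching the call sites touched earlier in this patch — ctx, rangeDesc, manual (the cluster's HybridManualClock), and tc are placeholders from a typical test setup, and StoreTestingKnobs.AllowLeaseRequestProposalsWhenNotLeader must be set on the cluster:

    // Non-cooperatively move the lease to the replica on server 1 and use the
    // returned lease, e.g. to reason about timestamps relative to its start.
    newLease, err := tc.MoveRangeLeaseNonCooperatively(ctx, rangeDesc, tc.Target(1), manual)
    require.NoError(t, err)
    leaseStart := newLease.Start.ToTimestamp()
    log.Infof(ctx, "lease moved non-cooperatively; new lease starts at %s", leaseStart)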