diff --git a/pkg/kv/kvserver/abortspan/abortspan.go b/pkg/kv/kvserver/abortspan/abortspan.go index 357e51d3f205..8ecc24c9ef4f 100644 --- a/pkg/kv/kvserver/abortspan/abortspan.go +++ b/pkg/kv/kvserver/abortspan/abortspan.go @@ -134,6 +134,7 @@ func (sc *AbortSpan) Put( txnID uuid.UUID, entry *roachpb.AbortSpanEntry, ) error { + log.VEventf(ctx, 2, "writing abort span entry for %s", txnID.Short()) key := keys.AbortSpanKey(sc.rangeID, txnID) return storage.MVCCPutProto(ctx, readWriter, ms, key, hlc.Timestamp{}, nil /* txn */, entry) } diff --git a/pkg/kv/kvserver/batcheval/cmd_end_transaction.go b/pkg/kv/kvserver/batcheval/cmd_end_transaction.go index cfe507d7b448..fb0d47b432e0 100644 --- a/pkg/kv/kvserver/batcheval/cmd_end_transaction.go +++ b/pkg/kv/kvserver/batcheval/cmd_end_transaction.go @@ -594,7 +594,7 @@ func RunCommitTrigger( // timestamp was bumped after it acquired latches. if txn.WriteTimestamp.Synthetic && rec.Clock().Now().Less(txn.WriteTimestamp) { return result.Result{}, errors.AssertionFailedf("txn %s with %s commit trigger needs "+ - "commit wait. Was its timestamp bumped after acquiring latches?", txn, errors.Safe(ct.Kind())) + "commit wait. Was its timestamp bumped after acquiring latches?", txn, ct.Kind()) } // Stage the commit trigger's side-effects so that they will go into effect on diff --git a/pkg/kv/kvserver/batcheval/cmd_push_txn.go b/pkg/kv/kvserver/batcheval/cmd_push_txn.go index 698d60553e90..c50fe0ea5bbc 100644 --- a/pkg/kv/kvserver/batcheval/cmd_push_txn.go +++ b/pkg/kv/kvserver/batcheval/cmd_push_txn.go @@ -139,7 +139,7 @@ func PushTxn( if err != nil { return result.Result{}, err } else if !ok { - log.VEventf(ctx, 2, "pushee txn record not found") + log.VEventf(ctx, 2, "pushee txn record not found (pushee: %s)", args.PusheeTxn.Short()) // There are three cases in which there is no transaction record: // // * the pushee is still active but its transaction record has not diff --git a/pkg/kv/kvserver/client_merge_test.go b/pkg/kv/kvserver/client_merge_test.go index 1bf7c255f147..019b8fcea12f 100644 --- a/pkg/kv/kvserver/client_merge_test.go +++ b/pkg/kv/kvserver/client_merge_test.go @@ -4960,7 +4960,7 @@ func setupClusterWithSubsumedRange( errCh := make(chan error) blocker := filter.BlockNextMerge() go func() { - errCh <- mergeTxn(ctx, store, *lhsDesc) + errCh <- mergeWithRightNeighbor(ctx, store, *lhsDesc) }() defer func() { // Ensure that the request doesn't stay blocked if we fail. diff --git a/pkg/kv/kvserver/client_relocate_range_test.go b/pkg/kv/kvserver/client_relocate_range_test.go index 40a67fe9b364..fc7c2e337653 100644 --- a/pkg/kv/kvserver/client_relocate_range_test.go +++ b/pkg/kv/kvserver/client_relocate_range_test.go @@ -519,7 +519,7 @@ func setupReplicaRemovalTest( require.NoError(t, err) repl, err := tc.GetFirstStoreFromServer(t, 0).GetReplica(rangeDesc.RangeID) require.NoError(t, err) - err = tc.MoveRangeLeaseNonCooperatively(rangeDesc, tc.Target(1), manual) + _, err = tc.MoveRangeLeaseNonCooperatively(ctx, rangeDesc, tc.Target(1), manual) require.NoError(t, err) // Remove first store from raft group. diff --git a/pkg/kv/kvserver/client_replica_test.go b/pkg/kv/kvserver/client_replica_test.go index 3b8b8dff57bf..45d48dfa6ef4 100644 --- a/pkg/kv/kvserver/client_replica_test.go +++ b/pkg/kv/kvserver/client_replica_test.go @@ -3492,7 +3492,7 @@ func TestDiscoverIntentAcrossLeaseTransferAwayAndBack(t *testing.T) { // Transfer the lease to Server 1. 
Do so non-cooperatively instead of using // a lease transfer, because the cooperative lease transfer would get stuck // acquiring latches, which are held by txn2. - err = tc.MoveRangeLeaseNonCooperatively(rangeDesc, tc.Target(1), manual) + _, err = tc.MoveRangeLeaseNonCooperatively(ctx, rangeDesc, tc.Target(1), manual) require.NoError(t, err) // Send an arbitrary request to the range to update the range descriptor diff --git a/pkg/kv/kvserver/closed_timestamp_test.go b/pkg/kv/kvserver/closed_timestamp_test.go index 28199785096f..e262b20621dd 100644 --- a/pkg/kv/kvserver/closed_timestamp_test.go +++ b/pkg/kv/kvserver/closed_timestamp_test.go @@ -16,7 +16,6 @@ import ( "fmt" "math/rand" "strconv" - "sync" "sync/atomic" "testing" "time" @@ -26,8 +25,10 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv" "github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord" "github.com/cockroachdb/cockroach/pkg/kv/kvserver" - "github.com/cockroachdb/cockroach/pkg/kv/kvserver/closedts/ctpb" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/closedts" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/txnwait" "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/server" "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" "github.com/cockroachdb/cockroach/pkg/sql/rowenc" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" @@ -36,6 +37,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/testutils/skip" "github.com/cockroachdb/cockroach/pkg/testutils/testcluster" "github.com/cockroachdb/cockroach/pkg/util" + "github.com/cockroachdb/cockroach/pkg/util/ctxgroup" "github.com/cockroachdb/cockroach/pkg/util/encoding" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/leaktest" @@ -44,6 +46,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/syncutil" "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/errors" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "golang.org/x/sync/errgroup" ) @@ -557,284 +560,221 @@ func TestClosedTimestampCantServeForNonTransactionalBatch(t *testing.T) { verifyNotLeaseHolderErrors(t, baRead, repls, 2) } -// TestClosedTimestampInactiveAfterSubsumption verifies that, during a merge, -// replicas of the subsumed range (RHS) cannot serve follower reads for -// timestamps after the subsumption time. -func TestClosedTimestampInactiveAfterSubsumption(t *testing.T) { +// Test that, during a merge, the closed timestamp of the subsumed RHS doesn't +// go above the subsumption time. It'd be bad if it did, since this advanced +// closed timestamp would be lost when the merge finalizes. +func TestClosedTimestampFrozenAfterSubsumption(t *testing.T) { defer leaktest.AfterTest(t)() - skip.WithIssue(t, 59448, "flaky test") - // Skipping under short because this test pauses for a few seconds in order to - // trigger a node liveness expiration. - skip.UnderShort(t) - // TODO(aayush): After #51087, we're seeing some regression in the initial - // setup of this test that causes it to fail there. There are some - // improvements for that PR in-flight. Revisit at a later date and re-enable - // under race. 
- skip.UnderRace(t) - type postSubsumptionCallback func( - ctx context.Context, - t *testing.T, - tc serverutils.TestClusterInterface, - g *errgroup.Group, - rightDesc roachpb.RangeDescriptor, - rightLeaseholder roachpb.ReplicationTarget, - freezeStartTimestamp hlc.Timestamp, - leaseAcquisitionTrap *atomic.Value, - ) (roachpb.ReplicationTarget, hlc.Timestamp, error) - - type testCase struct { - name string - callback postSubsumptionCallback - } + defer log.Scope(t).Close(t) - tests := []testCase{ + for _, test := range []struct { + name string + // transferLease, if set, will be called while the RHS is subsumed in order + // to perform a RHS lease transfer. + transferLease func( + ctx context.Context, + t *testing.T, + tc serverutils.TestClusterInterface, + rhsDesc roachpb.RangeDescriptor, + rhsLeaseholder roachpb.ReplicationTarget, + clock *hlc.HybridManualClock, + ) (newLeaseholder roachpb.ReplicationTarget, leaseStart hlc.Timestamp) + }{ { - name: "without lease transfer", - callback: nil, + name: "basic", + transferLease: nil, }, { - name: "with intervening lease transfer", - callback: forceLeaseTransferOnSubsumedRange, + name: "rhs lease transfer while subsumed", + transferLease: func( + ctx context.Context, + t *testing.T, + tc serverutils.TestClusterInterface, + rhsDesc roachpb.RangeDescriptor, + rhsLeaseholder roachpb.ReplicationTarget, + clock *hlc.HybridManualClock, + ) (roachpb.ReplicationTarget, hlc.Timestamp) { + oldLeaseholderStore := getTargetStoreOrFatal(t, tc, rhsLeaseholder) + oldLease, _ := oldLeaseholderStore.LookupReplica(rhsDesc.StartKey).GetLease() + require.True(t, oldLease.Replica.StoreID == oldLeaseholderStore.StoreID()) + newLeaseholder := getFollowerReplicas(ctx, t, tc, rhsDesc, rhsLeaseholder)[0] + target := roachpb.ReplicationTarget{ + NodeID: newLeaseholder.NodeID(), + StoreID: newLeaseholder.StoreID(), + } + newLease, err := tc.MoveRangeLeaseNonCooperatively(ctx, rhsDesc, target, clock) + require.NoError(t, err) + return target, newLease.Start.ToTimestamp() + }, }, - } - - runTest := func(t *testing.T, callback postSubsumptionCallback) { - ctx := context.Background() - // Range merges can be internally retried by the coordinating node (the - // leaseholder of the left hand side range). If this happens, the right hand - // side can get re-subsumed. However, in the current implementation, even if - // the merge txn gets retried, the follower replicas should not be able to - // activate any closed timestamp updates succeeding the timestamp the RHS - // was subsumed _for the first time_. - st := mergeFilter{} - var leaseAcquisitionTrap atomic.Value - clusterArgs := base.TestClusterArgs{ - ServerArgs: base.TestServerArgs{ - RaftConfig: base.RaftConfig{ - // We set the raft election timeout to a small duration. This should - // result in the node liveness duration being ~3.6 seconds. Note that - // if we set this too low, the test may flake due to the test - // cluster's nodes frequently missing their liveness heartbeats. - RaftHeartbeatIntervalTicks: 5, - RaftElectionTimeoutTicks: 6, - }, - Knobs: base.TestingKnobs{ - Store: &kvserver.StoreTestingKnobs{ - // This test suspends the merge txn right before it can apply the - // commit trigger and can lead to the merge txn taking longer than - // the defaults specified in aggressiveResolvedTimestampPushKnobs(). - // We use really high values here in order to avoid the merge txn - // being pushed due to resolved timestamps. 
- RangeFeedPushTxnsInterval: 5 * time.Second, - RangeFeedPushTxnsAge: 60 * time.Second, - TestingRequestFilter: st.SuspendMergeTrigger, - LeaseRequestEvent: func(ts hlc.Timestamp, storeID roachpb.StoreID, rangeID roachpb.RangeID) *roachpb.Error { - val := leaseAcquisitionTrap.Load() - if val == nil { - return nil - } - leaseAcquisitionCallback := val.(func(storeID roachpb.StoreID, rangeID roachpb.RangeID) *roachpb.Error) - if err := leaseAcquisitionCallback(storeID, rangeID); err != nil { - return err - } - return nil + } { + t.Run(test.name, func(t *testing.T) { + ctx := context.Background() + // Set a long txn liveness threshold; we'll bump the clock to cause a + // lease to expire and we don't want that to cause transactions to be + // aborted (in particular, the merge txn that will be in progress when we + // bump the clock). + defer txnwait.TestingOverrideTxnLivenessThreshold(time.Hour)() + + // Range merges can be internally retried by the coordinating node (the + // leaseholder of the left hand side range). If this happens, the right hand + // side can get re-subsumed. However, in the current implementation, even if + // the merge txn gets retried, the follower replicas should not be able to + // activate any closed timestamp updates succeeding the timestamp the RHS + // was subsumed _for the first time_. + st := mergeFilter{} + manual := hlc.NewHybridManualClock() + pinnedLeases := kvserver.NewPinnedLeases() + clusterArgs := base.TestClusterArgs{ + ServerArgs: base.TestServerArgs{ + RaftConfig: base.RaftConfig{ + // We set the raft election timeout to a small duration. This should + // result in the node liveness duration being ~3.6 seconds. Note that + // if we set this too low, the test may flake due to the test + // cluster's nodes frequently missing their liveness heartbeats. + RaftHeartbeatIntervalTicks: 5, + RaftElectionTimeoutTicks: 6, + }, + Knobs: base.TestingKnobs{ + Server: &server.TestingKnobs{ + ClockSource: manual.UnixNano, + }, + Store: &kvserver.StoreTestingKnobs{ + // This test suspends the merge txn right before it can apply the + // commit trigger and can lead to the merge txn taking longer than + // the defaults specified in aggressiveResolvedTimestampPushKnobs(). + // We use really high values here in order to avoid the merge txn + // being pushed due to resolved timestamps. + RangeFeedPushTxnsInterval: 5 * time.Second, + RangeFeedPushTxnsAge: 60 * time.Second, + TestingRequestFilter: st.SuspendMergeTrigger, + DisableMergeQueue: true, + // A subtest wants to force a lease change by stopping the liveness + // heartbeats on the old leaseholder and sending a request to + // another replica. If we didn't use this knob, we'd have to + // architect a Raft leadership change too in order to let the + // replica get the lease. + AllowLeaseRequestProposalsWhenNotLeader: true, + PinnedLeases: pinnedLeases, }, - DisableMergeQueue: true, - // A subtest wants to force a lease change by stopping the liveness - // heartbeats on the old leaseholder and sending a request to - // another replica. If we didn't use this knob, we'd have to - // architect a Raft leadership change too in order to let the - // replica get the lease. - AllowLeaseRequestProposalsWhenNotLeader: true, }, }, - }, - } - // If the initial phase of the merge txn takes longer than the closed - // timestamp target duration, its initial CPuts can have their write - // timestamps bumped due to an intervening closed timestamp update. This - // causes the entire merge txn to retry. 
So we use a long closed timestamp - // duration at the beginning of the test until we have the merge txn - // suspended at its commit trigger, and then change it back down to - // `testingTargetDuration`. - tc, leftDesc, rightDesc := setupClusterForClosedTSTestingWithSplitRanges(ctx, t, 5*time.Second, - testingCloseFraction, clusterArgs) - defer tc.Stopper().Stop(ctx) - - leftLeaseholder := getCurrentLeaseholder(t, tc, leftDesc) - rightLeaseholder := getCurrentLeaseholder(t, tc, rightDesc) - - g, ctx := errgroup.WithContext(ctx) - // Merge the ranges back together. The LHS leaseholder should block right - // before the merge trigger request is sent. - leftLeaseholderStore := getTargetStoreOrFatal(t, tc, leftLeaseholder) - blocker := st.BlockNextMerge() - mergeErrCh := make(chan error, 1) - g.Go(func() error { - err := mergeTxn(ctx, leftLeaseholderStore, leftDesc) - mergeErrCh <- err - return err - }) - defer func() { - // Unblock the rightLeaseholder so it can finally commit the merge. - blocker.Unblock() - if err := g.Wait(); err != nil { - t.Error(err) } - }() - - var freezeStartTimestamp hlc.Timestamp - // We now have the RHS in its subsumed state. - select { - case freezeStartTimestamp = <-blocker.WaitCh(): - case err := <-mergeErrCh: - t.Fatal(err) - case <-time.After(45 * time.Second): - t.Fatal("did not receive merge commit trigger as expected") - } - // Reduce the closed timestamp target duration in order to make the rest of - // the test faster. - db := tc.ServerConn(0) - if _, err := db.Exec(fmt.Sprintf(`SET CLUSTER SETTING kv.closed_timestamp.target_duration = '%s';`, - testingTargetDuration)); err != nil { - t.Fatal(err) - } - // inactiveClosedTSBoundary indicates the low water mark for closed - // timestamp updates beyond which we expect none of the followers to be able - // to serve follower reads until the merge is complete. - inactiveClosedTSBoundary := freezeStartTimestamp - if callback != nil { - newRightLeaseholder, ts, err := callback(ctx, t, tc, g, rightDesc, rightLeaseholder, - freezeStartTimestamp, &leaseAcquisitionTrap) - if err != nil { + // If the initial phase of the merge txn takes longer than the closed + // timestamp target duration, its initial CPuts can have their write + // timestamps bumped due to an intervening closed timestamp update. This + // causes the entire merge txn to retry. So we use a long closed timestamp + // duration at the beginning of the test until we have the merge txn + // suspended at its commit trigger, and then change it back down to + // `testingTargetDuration`. + tc, leftDesc, rightDesc := setupClusterForClosedTSTestingWithSplitRanges(ctx, t, 5*time.Second, + testingCloseFraction, clusterArgs) + defer tc.Stopper().Stop(ctx) + + leftLeaseholder := getCurrentLeaseholder(t, tc, leftDesc) + rightLeaseholder := getCurrentLeaseholder(t, tc, rightDesc) + // Pin the lhs lease where it already is. We're going to bump the clock to + // expire the rhs lease, and we don't want the lhs lease to move to a + // different node. + pinnedLeases.PinLease(leftDesc.RangeID, leftLeaseholder.StoreID) + + g := ctxgroup.WithContext(ctx) + // Merge the ranges back together. The LHS leaseholder should block right + // before the merge trigger request is sent. 
+ leftLeaseholderStore := getTargetStoreOrFatal(t, tc, leftLeaseholder) + mergeBlocker := st.BlockNextMerge() + mergeErrCh := make(chan error, 1) + g.Go(func() error { + err := mergeWithRightNeighbor(ctx, leftLeaseholderStore, leftDesc) + mergeErrCh <- err + return err + }) + defer func() { + // Unblock the merge. + if mergeBlocker.Unblock() { + assert.NoError(t, g.Wait()) + } + }() + + var freezeStartTimestamp hlc.Timestamp + // Wait for the RHS to enter the subsumed state. + select { + case freezeStartTimestamp = <-mergeBlocker.WaitCh(): + log.Infof(ctx, "test: merge blocked. Freeze time: %s", freezeStartTimestamp) + case err := <-mergeErrCh: t.Fatal(err) + case <-time.After(45 * time.Second): + t.Fatal("did not receive merge commit trigger as expected") } - rightLeaseholder, inactiveClosedTSBoundary = newRightLeaseholder, ts - } - // Poll the store for closed timestamp updates for timestamps greater than - // our `inactiveClosedTSBoundary`. - closedTimestampCh := make(chan ctpb.Entry, 1) - g.Go(func() (e error) { - pollForGreaterClosedTimestamp(t, tc, rightLeaseholder, rightDesc, inactiveClosedTSBoundary, closedTimestampCh) - return - }) - // We expect that none of the closed timestamp updates greater than - // `inactiveClosedTSBoundary` will be actionable by the RHS follower - // replicas. - log.Infof(ctx, "waiting for next closed timestamp update for the RHS") - select { - case <-closedTimestampCh: - case <-time.After(30 * time.Second): - t.Fatal("failed to receive next closed timestamp update") - } - baReadAfterLeaseTransfer := makeTxnReadBatchForDesc(rightDesc, inactiveClosedTSBoundary.Next()) - rightReplFollowers := getFollowerReplicas(ctx, t, tc, rightDesc, rightLeaseholder) - log.Infof(ctx, "sending read requests from followers after the inactiveClosedTSBoundary") - verifyNotLeaseHolderErrors(t, baReadAfterLeaseTransfer, rightReplFollowers, 2 /* expectedNLEs */) - } - - for _, test := range tests { - t.Run(test.name, func(t *testing.T) { - runTest(t, test.callback) - }) - } -} + var rhsLeaseStart hlc.Timestamp + if test.transferLease != nil { + // Transfer the RHS lease while the RHS is subsumed. + rightLeaseholder, rhsLeaseStart = test.transferLease(ctx, t, tc, rightDesc, rightLeaseholder, manual) + // Sanity check. + require.True(t, freezeStartTimestamp.Less(rhsLeaseStart)) + } -// forceLeaseTransferOnSubsumedRange triggers a lease transfer on `rightDesc` by -// pausing the liveness heartbeats of the store that holds the lease for it. -func forceLeaseTransferOnSubsumedRange( - ctx context.Context, - t *testing.T, - tc serverutils.TestClusterInterface, - g *errgroup.Group, - rightDesc roachpb.RangeDescriptor, - rightLeaseholder roachpb.ReplicationTarget, - freezeStartTimestamp hlc.Timestamp, - leaseAcquisitionTrap *atomic.Value, -) (newLeaseholder roachpb.ReplicationTarget, leaseStart hlc.Timestamp, err error) { - oldLeaseholderStore := getTargetStoreOrFatal(t, tc, rightLeaseholder) - // Co-operative lease transfers will block while a range is subsumed, so we - // pause the node liveness heartbeats until a lease transfer occurs. - oldLease, _ := oldLeaseholderStore.LookupReplica(rightDesc.StartKey).GetLease() - require.True(t, oldLease.Replica.StoreID == oldLeaseholderStore.StoreID()) - // Instantiate the lease acquisition callback right before we pause the node - // liveness heartbeats. We do this here because leases may be requested at - // any time for any reason, even before we pause the heartbeats. 
- leaseAcquisitionCh := make(chan roachpb.StoreID) - newRightLeaseholder := getFollowerReplicas(ctx, t, tc, rightDesc, rightLeaseholder)[0] - var once sync.Once - leaseAcquisitionTrap.Store(func(storeID roachpb.StoreID, rangeID roachpb.RangeID) *roachpb.Error { - if rangeID == rightDesc.RangeID { - if expectedStoreID := newRightLeaseholder.StoreID(); expectedStoreID != storeID { - return roachpb.NewError(&roachpb.NotLeaseHolderError{ - CustomMsg: fmt.Sprintf("only store %d must acquire the RHS's lease", expectedStoreID), - }) + // Sleep a bit and assert that the closed timestamp has not advanced while + // we were sleeping. We need to sleep sufficiently to give the side + // transport a chance to publish updates. + log.Infof(ctx, "test: sleeping...") + time.Sleep(5 * closedts.SideTransportCloseInterval.Get(&tc.Server(0).ClusterSettings().SV)) + log.Infof(ctx, "test: sleeping... done") + + store, err := getTargetStore(tc, rightLeaseholder) + require.NoError(t, err) + r, err := store.GetReplica(rightDesc.RangeID) + require.NoError(t, err) + maxClosed, ok := r.MaxClosed(ctx) + require.True(t, ok) + // Note that maxClosed would not necessarily be below the freeze start if + // this was a LEAD_FOR_GLOBAL_READS range. + assert.True(t, maxClosed.LessEq(freezeStartTimestamp), + "expected closed %s to be <= freeze %s", maxClosed, freezeStartTimestamp) + + // Sanity check that follower reads are not served by the RHS at + // timestamps above the freeze (and also above the closed timestamp that + // we verified above). + scanTime := freezeStartTimestamp.Next() + scanReq := makeTxnReadBatchForDesc(rightDesc, scanTime) + follower := getFollowerReplicas(ctx, t, tc, rightDesc, roachpb.ReplicationTarget{ + NodeID: r.NodeID(), + StoreID: r.StoreID(), + })[0] + _, pErr := follower.Send(ctx, scanReq) + require.NotNil(t, pErr) + require.Regexp(t, "NotLeaseHolderError", pErr.String()) + + log.Infof(ctx, "test: unblocking merge") + mergeBlocker.Unblock() + require.NoError(t, g.Wait()) + + // Sanity check for the case where we've performed a lease transfer: make + // sure that we can write at a timestamp *lower* than that lease's start + // time. This shows that the test is not fooling itself and orchestrates + // the merge scenario that it wants; in this scenario the lease start time + // doesn't matter since the RHS is merged into its neighbor, which has a + // lower lease start time. If the closed timestamp would advance past the + // subsumption time (which we've asserted above that it doesn't), this + // write would be a violation of that closed timestamp. 
+ if !rhsLeaseStart.IsEmpty() { + mergedLeaseholder, err := leftLeaseholderStore.GetReplica(leftDesc.RangeID) + require.NoError(t, err) + writeTime := rhsLeaseStart.Prev() + var baWrite roachpb.BatchRequest + baWrite.Header.RangeID = leftDesc.RangeID + baWrite.Header.Timestamp = writeTime + put := &roachpb.PutRequest{} + put.Key = rightDesc.StartKey.AsRawKey() + baWrite.Add(put) + resp, pErr := mergedLeaseholder.Send(ctx, baWrite) + require.Nil(t, pErr) + require.Equal(t, writeTime, resp.Timestamp, + "response time %s different from request time %s", resp.Timestamp, writeTime) } - once.Do(func() { - log.Infof(ctx, "received lease request from store %d for RHS range %d", - storeID, rangeID) - leaseAcquisitionCh <- storeID - }) - } - return nil - }) - restartHeartbeats := oldLeaseholderStore.GetStoreConfig().NodeLiveness.PauseAllHeartbeatsForTest() - defer restartHeartbeats() - log.Infof(ctx, "test: paused RHS rightLeaseholder's liveness heartbeats") - time.Sleep(oldLeaseholderStore.GetStoreConfig().NodeLiveness.GetLivenessThreshold()) - - // Send a read request from one of the followers of RHS so that it notices - // that the current rightLeaseholder has stopped heartbeating. This will prompt - // it to acquire the range lease for itself. - g.Go(func() error { - leaseAcquisitionRequest := makeTxnReadBatchForDesc(rightDesc, freezeStartTimestamp) - log.Infof(ctx, - "sending a read request from a follower of RHS (store %d) in order to trigger lease acquisition", - newRightLeaseholder.StoreID()) - _, pErr := newRightLeaseholder.Send(ctx, leaseAcquisitionRequest) - log.Infof(ctx, "test: RHS read returned err: %v", pErr) - // After the merge commits, the RHS will cease to exist and this read - // request will return a RangeNotFoundError. But we cannot guarantee that - // the merge will always successfully commit on its first attempt - // (especially under race). In this case, this blocked read request might be - // let through and be successful. Thus, we cannot make any assertions about - // the result of this read request. 
- return nil - }) - select { - case storeID := <-leaseAcquisitionCh: - if storeID != newRightLeaseholder.StoreID() { - err = errors.Newf("expected store %d to try to acquire the lease; got a request from store %d instead", - newRightLeaseholder.StoreID(), storeID) - return roachpb.ReplicationTarget{}, hlc.Timestamp{}, err - } - case <-time.After(30 * time.Second): - err = errors.New("failed to receive lease acquisition request") - return roachpb.ReplicationTarget{}, hlc.Timestamp{}, err - } - rightLeaseholder = roachpb.ReplicationTarget{ - NodeID: newRightLeaseholder.NodeID(), - StoreID: newRightLeaseholder.StoreID(), - } - oldLeaseholderStore = getTargetStoreOrFatal(t, tc, rightLeaseholder) - err = retry.ForDuration(testutils.DefaultSucceedsSoonDuration, func() error { - newLease, _ := oldLeaseholderStore.LookupReplica(rightDesc.StartKey).GetLease() - if newLease.Sequence == oldLease.Sequence { - return errors.New("RHS lease not updated") - } - leaseStart = newLease.Start.ToTimestamp() - return nil - }) - if err != nil { - return - } - if !freezeStartTimestamp.LessEq(leaseStart) { - err = errors.New("freeze timestamp greater than the start time of the new lease") - return roachpb.ReplicationTarget{}, hlc.Timestamp{}, err + }) } - - return rightLeaseholder, leaseStart, nil } // mergeFilter provides a method (SuspendMergeTrigger) that can be installed as @@ -851,15 +791,21 @@ type mergeFilter struct { // blocker encapsulates the communication of a blocked merge to tests, and the // unblocking of that merge by the test. type mergeBlocker struct { - unblockCh chan struct{} - mu struct { + mu struct { syncutil.Mutex // mergeCh is the channel on which the merge is signaled. If nil, means that // the reader is not interested in receiving the notification any more. - mergeCh chan hlc.Timestamp + mergeCh chan hlc.Timestamp + unblockCh chan struct{} } } +func newMergeBlocker() *mergeBlocker { + m := &mergeBlocker{} + m.mu.unblockCh = make(chan struct{}) + return m +} + // WaitCh returns the channel on which the blocked merge will be signaled. The // channel will carry the freeze start timestamp for the RHS. func (mb *mergeBlocker) WaitCh() <-chan hlc.Timestamp { @@ -870,27 +816,37 @@ func (mb *mergeBlocker) WaitCh() <-chan hlc.Timestamp { // Unblock unblocks the blocked merge, if any. It's legal to call this even if // no merge is currently blocked, in which case the next merge trigger will no -// longer block. +// longer block. Unblock can be called multiple times; the first call returns +// true, subsequent ones return false and are no-ops. // // Calls to Unblock() need to be synchronized with reading from the channel // returned by WaitCh(). -func (mb *mergeBlocker) Unblock() { - close(mb.unblockCh) +func (mb *mergeBlocker) Unblock() bool { mb.mu.Lock() defer mb.mu.Unlock() + if mb.mu.unblockCh == nil { + // Unblock was already called. + return false + } + + close(mb.mu.unblockCh) mb.mu.mergeCh = nil + mb.mu.unblockCh = nil + return true } -// signal sends a freezeTs to someone waiting for a blocked merge. -func (mb *mergeBlocker) signal(freezeTs hlc.Timestamp) { +// signal sends a freezeTs to someone waiting for a blocked merge. Returns the +// channel to wait on for the merge to be unblocked. +func (mb *mergeBlocker) signal(freezeTs hlc.Timestamp) chan struct{} { mb.mu.Lock() defer mb.mu.Unlock() ch := mb.mu.mergeCh if ch == nil { // Nobody's waiting on this merge any more. 
- return + return nil } ch <- freezeTs + return mb.mu.unblockCh } // BlockNextMerge arms the merge filter state, installing a blocker for the next @@ -906,9 +862,7 @@ func (filter *mergeFilter) BlockNextMerge() *mergeBlocker { if filter.mu.blocker != nil { panic("next merge already blocked") } - filter.mu.blocker = &mergeBlocker{ - unblockCh: make(chan struct{}), - } + filter.mu.blocker = newMergeBlocker() // This channel is buffered because we don't force the caller to read from it; // the caller can call mergeBlocker.Unblock() instead. filter.mu.blocker.mu.mergeCh = make(chan hlc.Timestamp, 1) @@ -950,15 +904,19 @@ func (filter *mergeFilter) SuspendMergeTrigger( // We block the LHS leaseholder from applying the merge trigger. Note // that RHS followers will have already caught up to the leaseholder // well before this point. - blocker.signal(freezeStart.ToTimestamp()) + unblockCh := blocker.signal(freezeStart.ToTimestamp()) // Wait for the merge to be unblocked. - <-blocker.unblockCh + if unblockCh != nil { + <-unblockCh + } } } return nil } -func mergeTxn(ctx context.Context, store *kvserver.Store, leftDesc roachpb.RangeDescriptor) error { +func mergeWithRightNeighbor( + ctx context.Context, store *kvserver.Store, leftDesc roachpb.RangeDescriptor, +) error { mergeArgs := adminMergeArgs(leftDesc.StartKey.AsRawKey()) _, err := kv.SendWrapped(ctx, store.TestSender(), mergeArgs) return err.GoError() @@ -1033,56 +991,6 @@ func splitDummyRangeInTestCluster( return leftDesc, rightDesc } -func getCurrentMaxClosed( - t *testing.T, - tc serverutils.TestClusterInterface, - target roachpb.ReplicationTarget, - desc roachpb.RangeDescriptor, -) ctpb.Entry { - deadline := timeutil.Now().Add(2 * testingTargetDuration) - store := getTargetStoreOrFatal(t, tc, target) - var maxClosed ctpb.Entry - attempts := 0 - for attempts == 0 || timeutil.Now().Before(deadline) { - attempts++ - store.GetStoreConfig().ClosedTimestamp.Storage.VisitDescending(target.NodeID, func(entry ctpb.Entry) (done bool) { - if _, ok := entry.MLAI[desc.RangeID]; ok { - maxClosed = entry - return true - } - return false - }) - if _, ok := maxClosed.MLAI[desc.RangeID]; !ok { - // We ran out of closed timestamps to visit without finding one that - // corresponds to rightDesc. It is likely that no closed timestamps have - // been broadcast for desc yet, try again. 
- continue - } - return maxClosed - } - return ctpb.Entry{} -} - -func pollForGreaterClosedTimestamp( - t *testing.T, - tc serverutils.TestClusterInterface, - target roachpb.ReplicationTarget, - desc roachpb.RangeDescriptor, - lowerBound hlc.Timestamp, - returnCh chan<- ctpb.Entry, -) { - for { - if t.Failed() { - return - } - maxClosed := getCurrentMaxClosed(t, tc, target, desc) - if _, ok := maxClosed.MLAI[desc.RangeID]; ok && lowerBound.LessEq(maxClosed.ClosedTimestamp) { - returnCh <- maxClosed - return - } - } -} - func getFollowerReplicas( ctx context.Context, t *testing.T, @@ -1103,19 +1011,21 @@ func getFollowerReplicas( func getTargetStoreOrFatal( t *testing.T, tc serverutils.TestClusterInterface, target roachpb.ReplicationTarget, -) (store *kvserver.Store) { +) *kvserver.Store { + s, err := getTargetStore(tc, target) + require.NoError(t, err) + return s +} + +func getTargetStore( + tc serverutils.TestClusterInterface, target roachpb.ReplicationTarget, +) (_ *kvserver.Store, err error) { for i := 0; i < tc.NumServers(); i++ { - if server := tc.Server(i); server.NodeID() == target.NodeID && - server.GetStores().(*kvserver.Stores).HasStore(target.StoreID) { - store, err := server.GetStores().(*kvserver.Stores).GetStore(target.StoreID) - if err != nil { - t.Fatal(err) - } - return store + if server := tc.Server(i); server.NodeID() == target.NodeID { + return server.GetStores().(*kvserver.Stores).GetStore(target.StoreID) } } - t.Fatalf("Could not find store for replication target %+v\n", target) - return nil + return nil, errors.Errorf("could not find node for replication target %+v\n", target) } func verifyNotLeaseHolderErrors( diff --git a/pkg/kv/kvserver/helpers_test.go b/pkg/kv/kvserver/helpers_test.go index bcb9079be99e..65a2785a00b0 100644 --- a/pkg/kv/kvserver/helpers_test.go +++ b/pkg/kv/kvserver/helpers_test.go @@ -43,7 +43,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/randutil" "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/errors" - "github.com/cockroachdb/logtags" "go.etcd.io/etcd/raft/v3" ) @@ -547,11 +546,3 @@ func WatchForDisappearingReplicas(t testing.TB, store *Store) { } } } - -// AcquireLease is redirectOnOrAcquireLease exposed for tests. -func (r *Replica) AcquireLease(ctx context.Context) (kvserverpb.LeaseStatus, error) { - ctx = r.AnnotateCtx(ctx) - ctx = logtags.AddTag(ctx, "lease-acq", nil) - l, pErr := r.redirectOnOrAcquireLease(ctx) - return l, pErr.GoError() -} diff --git a/pkg/kv/kvserver/replica_closedts_test.go b/pkg/kv/kvserver/replica_closedts_test.go index bf33cf09988e..3eb50ef84910 100644 --- a/pkg/kv/kvserver/replica_closedts_test.go +++ b/pkg/kv/kvserver/replica_closedts_test.go @@ -601,7 +601,7 @@ func TestRejectedLeaseDoesntDictateClosedTimestamp(t *testing.T) { leaseAcqErrCh <- err return } - _, err = r.AcquireLease(ctx) + _, err = r.TestingAcquireLease(ctx) leaseAcqErrCh <- err }() // Wait for the lease acquisition to be blocked. 
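
The mergeBlocker rework above changes the handshake between the request filter and the test: signal now hands back the channel to wait on, and Unblock is idempotent, returning true only for the call that actually released the merge. A minimal sketch of how a test is expected to drive this contract, assuming it sits next to the helpers defined in this patch (mergeFilter, mergeBlocker, mergeWithRightNeighbor) with the filter installed as the store's TestingRequestFilter, and with ctx, store, and leftDesc prepared elsewhere; the function name driveBlockedMerge is illustrative only:

// driveBlockedMerge is a hypothetical fragment showing the intended use of the
// revised mergeBlocker API. It assumes filter.SuspendMergeTrigger was installed
// as the store's TestingRequestFilter when the cluster was started, and that
// the usual test imports (context, testing, kvserver, roachpb, require) are in
// scope.
func driveBlockedMerge(
	ctx context.Context,
	t *testing.T,
	filter *mergeFilter,
	store *kvserver.Store,
	leftDesc roachpb.RangeDescriptor,
) {
	blocker := filter.BlockNextMerge()
	mergeErrCh := make(chan error, 1)
	go func() {
		// SuspendMergeTrigger parks the merge txn on the blocker right before
		// the commit trigger is evaluated.
		mergeErrCh <- mergeWithRightNeighbor(ctx, store, leftDesc)
	}()

	select {
	case freezeTS := <-blocker.WaitCh():
		// The RHS is now subsumed; freezeTS is the subsumption timestamp.
		t.Logf("merge blocked; RHS frozen at %s", freezeTS)
	case err := <-mergeErrCh:
		t.Fatal(err) // the merge failed before it could block
	}

	// Unblock is idempotent: the first call returns true and releases the
	// merge, later calls are no-ops. This lets an explicit unblock coexist
	// with a deferred cleanup without double-closing a channel.
	if blocker.Unblock() {
		require.NoError(t, <-mergeErrCh)
	}
}

This is the same shape as TestClosedTimestampFrozenAfterSubsumption above, reduced to the blocker mechanics.
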
diff --git a/pkg/kv/kvserver/replica_command.go b/pkg/kv/kvserver/replica_command.go index 4b658128f7bd..5cf919c82465 100644 --- a/pkg/kv/kvserver/replica_command.go +++ b/pkg/kv/kvserver/replica_command.go @@ -784,6 +784,7 @@ func (r *Replica) AdminMerge( txn := kv.NewTxn(ctx, r.store.DB(), r.NodeID()) err := runMergeTxn(txn) if err != nil { + log.VEventf(ctx, 2, "merge txn failed: %s", err) txn.CleanupOnError(ctx, err) } if !errors.HasType(err, (*roachpb.TransactionRetryWithProtoRefreshError)(nil)) { diff --git a/pkg/kv/kvserver/replica_range_lease.go b/pkg/kv/kvserver/replica_range_lease.go index 650c7445e4d7..b20454dfb0e0 100644 --- a/pkg/kv/kvserver/replica_range_lease.go +++ b/pkg/kv/kvserver/replica_range_lease.go @@ -728,6 +728,10 @@ func (r *Replica) requestLeaseLocked( return r.mu.pendingLeaseRequest.newResolvedHandle(err) } } + if pErr := r.store.TestingKnobs().PinnedLeases.rejectLeaseIfPinnedElsewhere(r); pErr != nil { + return r.mu.pendingLeaseRequest.newResolvedHandle(pErr) + } + // If we're draining, we'd rather not take any new leases (since we're also // trying to move leases away elsewhere). But if we're the leader, we don't // really have a choice and we take the lease - there might not be any other @@ -1063,6 +1067,14 @@ func (r *Replica) redirectOnOrAcquireLease( return r.redirectOnOrAcquireLeaseForRequest(ctx, hlc.Timestamp{}) } +// TestingAcquireLease is redirectOnOrAcquireLease exposed for tests. +func (r *Replica) TestingAcquireLease(ctx context.Context) (kvserverpb.LeaseStatus, error) { + ctx = r.AnnotateCtx(ctx) + ctx = logtags.AddTag(ctx, "lease-acq", nil) + l, pErr := r.redirectOnOrAcquireLease(ctx) + return l, pErr.GoError() +} + // redirectOnOrAcquireLeaseForRequest is like redirectOnOrAcquireLease, // but it accepts a specific request timestamp instead of assuming that // the request is operating at the current time. diff --git a/pkg/kv/kvserver/replica_send.go b/pkg/kv/kvserver/replica_send.go index 00dbfdd47831..04eb0b1499f7 100644 --- a/pkg/kv/kvserver/replica_send.go +++ b/pkg/kv/kvserver/replica_send.go @@ -624,7 +624,7 @@ func (r *Replica) handleMergeInProgressError( if ba.IsSingleTransferLeaseRequest() { return roachpb.NewErrorf("cannot transfer lease while merge in progress") } - log.Event(ctx, "waiting on in-progress merge") + log.Event(ctx, "waiting on in-progress range merge") select { case <-mergeCompleteCh: // Merge complete. Retry the command. diff --git a/pkg/kv/kvserver/testing_knobs.go b/pkg/kv/kvserver/testing_knobs.go index 967f3c828dac..2c6048071d98 100644 --- a/pkg/kv/kvserver/testing_knobs.go +++ b/pkg/kv/kvserver/testing_knobs.go @@ -20,6 +20,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver/txnwait" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/cockroach/pkg/util/syncutil" ) // StoreTestingKnobs is a part of the context used to control parts of @@ -92,6 +93,8 @@ type StoreTestingKnobs struct { // called to acquire a new lease. This can be used to assert that a request // triggers a lease acquisition. LeaseRequestEvent func(ts hlc.Timestamp, storeID roachpb.StoreID, rangeID roachpb.RangeID) *roachpb.Error + // PinnedLeases can be used to prevent all but one store from acquiring leases on a given range. + PinnedLeases *PinnedLeasesKnob // LeaseTransferBlockedOnExtensionEvent, if set, is called when // replica.TransferLease() encounters an in-progress lease extension. // nextLeader is the replica that we're trying to transfer the lease to. 
@@ -362,3 +365,60 @@ var _ base.ModuleTestingKnobs = NodeLivenessTestingKnobs{}
 
 // ModuleTestingKnobs implements the base.ModuleTestingKnobs interface.
 func (NodeLivenessTestingKnobs) ModuleTestingKnobs() {}
+
+// PinnedLeasesKnob is a testing knob for controlling what store can acquire a
+// lease for specific ranges.
+type PinnedLeasesKnob struct {
+	mu struct {
+		syncutil.Mutex
+		pinned map[roachpb.RangeID]roachpb.StoreID
+	}
+}
+
+// NewPinnedLeases creates a PinnedLeasesKnob.
+func NewPinnedLeases() *PinnedLeasesKnob {
+	p := &PinnedLeasesKnob{}
+	p.mu.pinned = make(map[roachpb.RangeID]roachpb.StoreID)
+	return p
+}
+
+// PinLease makes it so that only the specified store can take a lease for the
+// specified range. Replicas on other stores attempting to acquire a lease will
+// get NotLeaseHolderErrors. Lease transfers away from the pinned store are
+// permitted.
+func (p *PinnedLeasesKnob) PinLease(rangeID roachpb.RangeID, storeID roachpb.StoreID) {
+	p.mu.Lock()
+	defer p.mu.Unlock()
+	p.mu.pinned[rangeID] = storeID
+}
+
+// rejectLeaseIfPinnedElsewhere is called when r is trying to acquire a lease.
+// It returns a NotLeaseHolderError if the lease is pinned on another store.
+// r.mu needs to be rlocked.
+func (p *PinnedLeasesKnob) rejectLeaseIfPinnedElsewhere(r *Replica) *roachpb.Error {
+	if p == nil {
+		return nil
+	}
+
+	p.mu.Lock()
+	defer p.mu.Unlock()
+	pinnedStore, ok := p.mu.pinned[r.RangeID]
+	if !ok || pinnedStore == r.StoreID() {
+		return nil
+	}
+
+	repDesc, err := r.getReplicaDescriptorRLocked()
+	if err != nil {
+		return roachpb.NewError(err)
+	}
+	var pinned *roachpb.ReplicaDescriptor
+	if pinnedRep, ok := r.descRLocked().GetReplicaDescriptor(pinnedStore); ok {
+		pinned = &pinnedRep
+	}
+	return roachpb.NewError(&roachpb.NotLeaseHolderError{
+		Replica:     repDesc,
+		LeaseHolder: pinned,
+		RangeID:     r.RangeID,
+		CustomMsg:   "injected: lease pinned to another store",
+	})
+}
diff --git a/pkg/roachpb/batch.go b/pkg/roachpb/batch.go
index 6cb9eaba9b79..3741ac359cf1 100644
--- a/pkg/roachpb/batch.go
+++ b/pkg/roachpb/batch.go
@@ -597,8 +597,11 @@ func (ba BatchRequest) SafeFormat(s redact.SafePrinter, _ rune) {
 		req := arg.GetInner()
 		if et, ok := req.(*EndTxnRequest); ok {
 			h := req.Header()
-			s.Printf("%s(commit:%t) [%s]",
-				req.Method(), et.Commit, h.Key)
+			s.Printf("%s(commit:%t", req.Method(), et.Commit)
+			if et.InternalCommitTrigger != nil {
+				s.Printf(" %s", et.InternalCommitTrigger.Kind())
+			}
+			s.Printf(") [%s]", h.Key)
 		} else {
 			h := req.Header()
 			if req.Method() == PushTxn {
diff --git a/pkg/roachpb/data.go b/pkg/roachpb/data.go
index fcaa36bc62f9..0746ee0a537d 100644
--- a/pkg/roachpb/data.go
+++ b/pkg/roachpb/data.go
@@ -871,7 +871,7 @@ func (v Value) PrettyPrint() string {
 }
 
 // Kind returns the kind of commit trigger as a string.
-func (ct InternalCommitTrigger) Kind() string {
+func (ct InternalCommitTrigger) Kind() redact.SafeString {
 	switch {
 	case ct.SplitTrigger != nil:
 		return "split"
diff --git a/pkg/testutils/serverutils/test_cluster_shim.go b/pkg/testutils/serverutils/test_cluster_shim.go
index 05c767286007..cf77fd0097b7 100644
--- a/pkg/testutils/serverutils/test_cluster_shim.go
+++ b/pkg/testutils/serverutils/test_cluster_shim.go
@@ -18,6 +18,7 @@
 package serverutils
 
 import (
+	"context"
 	gosql "database/sql"
 	"testing"
 
@@ -155,11 +156,17 @@ type TestClusterInterface interface {
 	// advancing the manual clock. The target is then instructed to acquire the
 	// ownerless lease. Most tests should use the cooperative version of this
 	// method, TransferRangeLease.
+ // + // Returns the new lease. + // + // If the lease starts out on dest, this is a no-op and the current lease is + // returned. MoveRangeLeaseNonCooperatively( + ctx context.Context, rangeDesc roachpb.RangeDescriptor, dest roachpb.ReplicationTarget, manual *hlc.HybridManualClock, - ) error + ) (*roachpb.Lease, error) // LookupRange returns the descriptor of the range containing key. LookupRange(key roachpb.Key) (roachpb.RangeDescriptor, error) diff --git a/pkg/testutils/testcluster/BUILD.bazel b/pkg/testutils/testcluster/BUILD.bazel index 528434ad5c29..22a6b5c28d96 100644 --- a/pkg/testutils/testcluster/BUILD.bazel +++ b/pkg/testutils/testcluster/BUILD.bazel @@ -9,7 +9,6 @@ go_library( "//pkg/base", "//pkg/gossip", "//pkg/keys", - "//pkg/kv", "//pkg/kv/kvserver", "//pkg/kv/kvserver/liveness/livenesspb", "//pkg/roachpb", diff --git a/pkg/testutils/testcluster/testcluster.go b/pkg/testutils/testcluster/testcluster.go index b5710bf77267..3b714673fd86 100644 --- a/pkg/testutils/testcluster/testcluster.go +++ b/pkg/testutils/testcluster/testcluster.go @@ -24,7 +24,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/gossip" "github.com/cockroachdb/cockroach/pkg/keys" - "github.com/cockroachdb/cockroach/pkg/kv" "github.com/cockroachdb/cockroach/pkg/kv/kvserver" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness/livenesspb" "github.com/cockroachdb/cockroach/pkg/roachpb" @@ -906,23 +905,26 @@ func (tc *TestCluster) RemoveLeaseHolderOrFatal( // MoveRangeLeaseNonCooperatively is part of the TestClusterInterface. func (tc *TestCluster) MoveRangeLeaseNonCooperatively( - rangeDesc roachpb.RangeDescriptor, dest roachpb.ReplicationTarget, manual *hlc.HybridManualClock, -) error { + ctx context.Context, + rangeDesc roachpb.RangeDescriptor, + dest roachpb.ReplicationTarget, + manual *hlc.HybridManualClock, +) (*roachpb.Lease, error) { knobs := tc.clusterArgs.ServerArgs.Knobs.Store.(*kvserver.StoreTestingKnobs) if !knobs.AllowLeaseRequestProposalsWhenNotLeader { // Without this knob, we'd have to architect a Raft leadership change // too in order to let the replica get the lease. It's easier to just // require that callers set it. - return errors.Errorf("must set StoreTestingKnobs.AllowLeaseRequestProposalsWhenNotLeader") + return nil, errors.Errorf("must set StoreTestingKnobs.AllowLeaseRequestProposalsWhenNotLeader") } destServer, err := tc.FindMemberServer(dest.StoreID) if err != nil { - return err + return nil, err } destStore, err := destServer.Stores().GetStore(dest.StoreID) if err != nil { - return err + return nil, err } // We are going to advance the manual clock so that the current lease @@ -931,13 +933,15 @@ func (tc *TestCluster) MoveRangeLeaseNonCooperatively( // when it's up for grabs. To handle that case, we wrap the entire operation // in an outer retry loop. const retryDur = testutils.DefaultSucceedsSoonDuration - return retry.ForDuration(retryDur, func() error { + var newLease *roachpb.Lease + err = retry.ForDuration(retryDur, func() error { // Find the current lease. 
prevLease, _, err := tc.FindRangeLease(rangeDesc, nil /* hint */) if err != nil { return err } if prevLease.Replica.StoreID == dest.StoreID { + newLease = &prevLease return nil } @@ -946,6 +950,7 @@ func (tc *TestCluster) MoveRangeLeaseNonCooperatively( if err != nil { return err } + log.Infof(ctx, "test: advancing clock to lease expiration") manual.Increment(lhStore.GetStoreConfig().LeaseExpiration()) // Heartbeat the destination server's liveness record so that if we are @@ -957,24 +962,20 @@ func (tc *TestCluster) MoveRangeLeaseNonCooperatively( // Issue a request to the target replica, which should notice that the // old lease has expired and that it can acquire the lease. - var newLease *roachpb.Lease - ctx := context.Background() - req := &roachpb.LeaseInfoRequest{ - RequestHeader: roachpb.RequestHeader{ - Key: rangeDesc.StartKey.AsRawKey(), - }, + r, err := destStore.GetReplica(rangeDesc.RangeID) + if err != nil { + return err } - h := roachpb.Header{RangeID: rangeDesc.RangeID} - reply, pErr := kv.SendWrappedWith(ctx, destStore, h, req) - if pErr != nil { - log.Infof(ctx, "LeaseInfoRequest failed: %v", pErr) - if lErr, ok := pErr.GetDetail().(*roachpb.NotLeaseHolderError); ok && lErr.Lease != nil { + ls, err := r.TestingAcquireLease(ctx) + if err != nil { + log.Infof(ctx, "TestingAcquireLease failed: %s", err) + if lErr := (*roachpb.NotLeaseHolderError)(nil); errors.As(err, &lErr) { newLease = lErr.Lease } else { - return pErr.GoError() + return err } } else { - newLease = &reply.(*roachpb.LeaseInfoResponse).Lease + newLease = &ls.Lease } // Is the lease in the right place? @@ -984,6 +985,8 @@ func (tc *TestCluster) MoveRangeLeaseNonCooperatively( } return nil }) + log.Infof(ctx, "MoveRangeLeaseNonCooperatively: acquired lease: %s. err: %v", newLease, err) + return newLease, err } // FindRangeLease is similar to FindRangeLeaseHolder but returns a Lease proto
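
Taken together, the PinnedLeasesKnob and the new MoveRangeLeaseNonCooperatively signature (context parameter, returned lease) compose in tests roughly as below. This is a sketch rather than part of the patch: the function name is made up, and it assumes a test package with the usual imports (context, testing, base, server, kvserver, hlc, testcluster, require); everything it calls is either introduced by this patch or existing TestCluster API.

// sketchNonCooperativeLeaseMove is a hypothetical test fragment combining the
// two pieces introduced in this patch: the PinnedLeases store knob and the
// lease-returning MoveRangeLeaseNonCooperatively.
func sketchNonCooperativeLeaseMove(t *testing.T) {
	ctx := context.Background()
	manual := hlc.NewHybridManualClock()
	pinned := kvserver.NewPinnedLeases()
	tc := testcluster.StartTestCluster(t, 3, base.TestClusterArgs{
		ReplicationMode: base.ReplicationManual,
		ServerArgs: base.TestServerArgs{
			Knobs: base.TestingKnobs{
				Server: &server.TestingKnobs{ClockSource: manual.UnixNano},
				Store: &kvserver.StoreTestingKnobs{
					// MoveRangeLeaseNonCooperatively refuses to run without
					// this, since it relies on lease requests proposed by
					// replicas that are not the Raft leader.
					AllowLeaseRequestProposalsWhenNotLeader: true,
					PinnedLeases:                            pinned,
				},
			},
		},
	})
	defer tc.Stopper().Stop(ctx)

	key := tc.ScratchRange(t)
	tc.AddVotersOrFatal(t, key, tc.Target(1), tc.Target(2))
	desc := tc.LookupRangeOrFatal(t, key)

	// Expire the current lease by advancing the manual clock and have the
	// target replica grab it. The helper now returns the lease it ended up
	// with instead of just an error.
	newLease, err := tc.MoveRangeLeaseNonCooperatively(ctx, desc, tc.Target(1), manual)
	require.NoError(t, err)
	require.Equal(t, tc.Target(1).StoreID, newLease.Replica.StoreID)

	// From here on, only the new leaseholder's store may (re)acquire this
	// range's lease; attempts from other stores get an injected
	// NotLeaseHolderError. Cooperative transfers away from the pinned store
	// remain allowed.
	pinned.PinLease(desc.RangeID, tc.Target(1).StoreID)
}

Pinning after the move mirrors what TestClosedTimestampFrozenAfterSubsumption does for the LHS lease: the knob only rejects acquisitions by other stores, so it keeps a lease where it is without blocking cooperative transfers away from the pinned store.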