diff --git a/pkg/kv/kvserver/client_merge_test.go b/pkg/kv/kvserver/client_merge_test.go index 8feb60467878..cd9794011793 100644 --- a/pkg/kv/kvserver/client_merge_test.go +++ b/pkg/kv/kvserver/client_merge_test.go @@ -2000,6 +2000,117 @@ func TestStoreRangeMergeRHSLeaseTransfers(t *testing.T) { require.NoError(t, <-mergeErr) } +// TestStoreRangeMergeLHSLeaseTransfersAfterFreezeTime verifies that in cases +// where the lease start time on a LHS range is moved above the freeze time of a +// range merge, the combined range after the merge does not broadcast a closed +// timestamp that it then allows to be violated. +// +// This is a regression test for #60929. In that issue, which was discovered by +// kvnemesis, we found that a range merge and a lease transfer could race in +// such a way that the closed timestamp could later be violated by a write to +// the subsumed portion of the joint range. The root cause of this was an +// opportunistic optimization made in 7037b54 to consider a range's lease start +// time as an input to its closed timestamp computation. This optimization did +// not account for the possibility of serving writes to a newly subsumed +// keyspace below a range's lease start time if that keyspace was merged into a +// range under its current lease and with a freeze time below the current lease +// start time. This bug was fixed by removing the optimization, which was on its +// way out to allow for #61986 anyway. +func TestStoreRangeMergeLHSLeaseTransfersAfterFreezeTime(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + // Install a hook to control when the merge transaction subsumes the RHS. + // Put this in a sync.Once to ignore retries. + var once sync.Once + subsumeReceived := make(chan struct{}) + finishSubsume := make(chan struct{}) + testingResponseFilter := func(_ context.Context, ba roachpb.BatchRequest, _ *roachpb.BatchResponse) *roachpb.Error { + if ba.IsSingleSubsumeRequest() { + once.Do(func() { + subsumeReceived <- struct{}{} + <-finishSubsume + }) + } + return nil + } + + ctx := context.Background() + tc := testcluster.StartTestCluster(t, 2, + base.TestClusterArgs{ + ReplicationMode: base.ReplicationManual, + ServerArgs: base.TestServerArgs{ + Knobs: base.TestingKnobs{ + Store: &kvserver.StoreTestingKnobs{ + TestingResponseFilter: testingResponseFilter, + }, + }, + }, + }) + defer tc.Stopper().Stop(ctx) + + // Create the ranges to be merged. Put both ranges on both stores. Give the + // first store the lease on the LHS and the second store the lease on the + // RHS. Before the merge completes, we'll transfer the LHS's lease to the + // second store so that the two leaseholders are collocated. + lhsDesc, rhsDesc, err := tc.Servers[0].ScratchRangeEx() + require.NoError(t, err) + + tc.AddVotersOrFatal(t, lhsDesc.StartKey.AsRawKey(), tc.Target(1)) + tc.AddVotersOrFatal(t, rhsDesc.StartKey.AsRawKey(), tc.Target(1)) + tc.TransferRangeLeaseOrFatal(t, lhsDesc, tc.Target(0)) + tc.TransferRangeLeaseOrFatal(t, rhsDesc, tc.Target(1)) + + // Launch the merge. + mergeErr := make(chan error, 1) + _ = tc.Stopper().RunAsyncTask(ctx, "merge", func(context.Context) { + args := adminMergeArgs(lhsDesc.StartKey.AsRawKey()) + _, pErr := kv.SendWrapped(ctx, tc.Servers[0].DistSender(), args) + mergeErr <- pErr.GoError() + }) + + // Wait for the merge transaction to send its Subsume request. It won't + // be able to complete just yet, thanks to the hook we installed above. + <-subsumeReceived + + // Transfer the lease on the LHS to the second store. Doing this will + // increase the lease start time on the LHS past the freeze time of the + // range merge. + if err := tc.TransferRangeLease(lhsDesc, tc.Target(1)); err != nil { + close(finishSubsume) // don't abandon merge + t.Fatalf(`could transfer lease for range %s error is %+v`, lhsDesc, err) + } + + store1 := tc.GetFirstStoreFromServer(t, 1) + lhsLeaseholder := store1.LookupReplica(lhsDesc.StartKey) + testutils.SucceedsSoon(t, func() error { + // Wait for the new leaseholder to notice that it received the lease. + now := tc.Servers[1].Clock().NowAsClockTimestamp() + if !lhsLeaseholder.OwnsValidLease(ctx, now) { + return errors.New("not leaseholder") + } + return nil + }) + lhsClosedTS, ok := lhsLeaseholder.MaxClosed(ctx) + require.True(t, ok) + + // Finally, allow the merge to complete. It should complete successfully. + close(finishSubsume) + require.NoError(t, <-mergeErr) + + // Attempt to write below the closed timestamp, to the subsumed keyspace. + // The write's timestamp should be forwarded to after the closed timestamp. + // If it is not, we have violated the closed timestamp's promise! + var ba roachpb.BatchRequest + ba.Timestamp = lhsClosedTS.Prev() + ba.RangeID = lhsDesc.RangeID + ba.Add(incrementArgs(rhsDesc.StartKey.AsRawKey().Next(), 1)) + br, pErr := tc.Servers[1].DistSender().Send(ctx, ba) + require.Nil(t, pErr) + require.NotEqual(t, ba.Timestamp, br.Timestamp, "write timestamp not bumped") + require.True(t, lhsClosedTS.Less(br.Timestamp), "write timestamp not bumped above closed timestamp") +} + // TestStoreRangeMergeCheckConsistencyAfterSubsumption verifies the following: // 1. While a range is subsumed, ComputeChecksum requests wait until the merge // is complete before proceeding. diff --git a/pkg/kv/kvserver/closed_timestamp_test.go b/pkg/kv/kvserver/closed_timestamp_test.go index 9f88ffbd2f49..f509eed74dd7 100644 --- a/pkg/kv/kvserver/closed_timestamp_test.go +++ b/pkg/kv/kvserver/closed_timestamp_test.go @@ -411,8 +411,8 @@ func TestClosedTimestampCantServeBasedOnUncertaintyLimit(t *testing.T) { ctx := context.Background() // Set up the target duration to be very long and rely on lease transfers to // drive MaxClosed. - tc, db0, desc := setupClusterForClosedTSTesting(ctx, t, time.Hour, testingCloseFraction, - aggressiveResolvedTimestampClusterArgs, "cttest", "kv") + tc, db0, desc := setupClusterForClosedTSTesting(ctx, t, testingTargetDuration, + testingCloseFraction, aggressiveResolvedTimestampClusterArgs, "cttest", "kv") defer tc.Stopper().Stop(ctx) repls := replsForRange(ctx, t, tc, desc, numNodes) @@ -420,20 +420,15 @@ func TestClosedTimestampCantServeBasedOnUncertaintyLimit(t *testing.T) { t.Fatal(err) } - // Grab a timestamp before initiating a lease transfer, transfer the lease, - // then ensure that reads at that timestamp can occur from all the replicas. + // Verify that we can serve a follower read at a timestamp. Wait if necessary. ts := tc.Server(0).Clock().Now() - lh := getCurrentLeaseholder(t, tc, desc) - target := pickRandomTarget(tc, lh, desc) - require.Nil(t, tc.TransferRangeLease(desc, target)) baRead := makeTxnReadBatchForDesc(desc, ts) testutils.SucceedsSoon(t, func() error { return verifyCanReadFromAllRepls(ctx, t, baRead, repls, expectRows(1)) }) // Update the batch to simulate a transaction that has a global uncertainty - // limit after the lease transfer. Keep its read timestamp from before the - // lease transfer. + // limit after the lease transfer. Keep its read timestamp the same. baRead.Txn.GlobalUncertaintyLimit = tc.Server(0).Clock().Now().Add(time.Second.Nanoseconds(), 0) // Send the request to all three replicas. One should succeed and // the other two should return NotLeaseHolderErrors. diff --git a/pkg/kv/kvserver/replica_follower_read.go b/pkg/kv/kvserver/replica_follower_read.go index 965c97ff7245..17f86c3248ec 100644 --- a/pkg/kv/kvserver/replica_follower_read.go +++ b/pkg/kv/kvserver/replica_follower_read.go @@ -197,9 +197,17 @@ func (r *Replica) maxClosedRLocked( // Look at the legacy closed timestamp propagation mechanism. maxClosed := r.store.cfg.ClosedTimestamp.Provider.MaxClosed( lease.Replica.NodeID, r.RangeID, ctpb.Epoch(lease.Epoch), appliedLAI) - maxClosed.Forward(lease.Start.ToTimestamp()) maxClosed.Forward(initialMaxClosed) + // If the range has not upgraded to the new closed timestamp system, + // continue using the lease start time as an input to the range's closed + // timestamp. Otherwise, ignore it. We expect to delete this code soon, but + // we keep it around for now to avoid a regression in follower read + // availability in mixed v20.2/v21.1 clusters. + if replicaStateClosed.IsEmpty() { + maxClosed.Forward(lease.Start.ToTimestamp()) + } + // Look at the "new" closed timestamp propagation mechanism. maxClosed.Forward(replicaStateClosed) maxClosed.Forward(sideTransportClosed)