diff --git a/pkg/kv/kvserver/closed_timestamp_test.go b/pkg/kv/kvserver/closed_timestamp_test.go index 28199785096f..188b21cd0df3 100644 --- a/pkg/kv/kvserver/closed_timestamp_test.go +++ b/pkg/kv/kvserver/closed_timestamp_test.go @@ -26,7 +26,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv" "github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord" "github.com/cockroachdb/cockroach/pkg/kv/kvserver" - "github.com/cockroachdb/cockroach/pkg/kv/kvserver/closedts/ctpb" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" "github.com/cockroachdb/cockroach/pkg/sql/rowenc" @@ -36,6 +35,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/testutils/skip" "github.com/cockroachdb/cockroach/pkg/testutils/testcluster" "github.com/cockroachdb/cockroach/pkg/util" + "github.com/cockroachdb/cockroach/pkg/util/ctxgroup" "github.com/cockroachdb/cockroach/pkg/util/encoding" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/leaktest" @@ -557,12 +557,11 @@ func TestClosedTimestampCantServeForNonTransactionalBatch(t *testing.T) { verifyNotLeaseHolderErrors(t, baRead, repls, 2) } -// TestClosedTimestampInactiveAfterSubsumption verifies that, during a merge, -// replicas of the subsumed range (RHS) cannot serve follower reads for -// timestamps after the subsumption time. -func TestClosedTimestampInactiveAfterSubsumption(t *testing.T) { +// Test that, during a merge, the closed timestamp of the subsumed RHS doesn't +// go above the subsumption time. It'd be bad if it did, since this advanced +// closed timestamp would be lost when the merge finalizes. +func TestClosedTimestampFrozenAfterSubsumption(t *testing.T) { defer leaktest.AfterTest(t)() - skip.WithIssue(t, 59448, "flaky test") // Skipping under short because this test pauses for a few seconds in order to // trigger a node liveness expiration. skip.UnderShort(t) @@ -575,7 +574,7 @@ func TestClosedTimestampInactiveAfterSubsumption(t *testing.T) { ctx context.Context, t *testing.T, tc serverutils.TestClusterInterface, - g *errgroup.Group, + g *ctxgroup.Group, rightDesc roachpb.RangeDescriptor, rightLeaseholder roachpb.ReplicationTarget, freezeStartTimestamp hlc.Timestamp, @@ -587,7 +586,7 @@ func TestClosedTimestampInactiveAfterSubsumption(t *testing.T) { callback postSubsumptionCallback } - tests := []testCase{ + for _, test := range []testCase{ { name: "without lease transfer", callback: nil, @@ -596,149 +595,140 @@ func TestClosedTimestampInactiveAfterSubsumption(t *testing.T) { name: "with intervening lease transfer", callback: forceLeaseTransferOnSubsumedRange, }, - } - - runTest := func(t *testing.T, callback postSubsumptionCallback) { - ctx := context.Background() - // Range merges can be internally retried by the coordinating node (the - // leaseholder of the left hand side range). If this happens, the right hand - // side can get re-subsumed. However, in the current implementation, even if - // the merge txn gets retried, the follower replicas should not be able to - // activate any closed timestamp updates succeeding the timestamp the RHS - // was subsumed _for the first time_. - st := mergeFilter{} - var leaseAcquisitionTrap atomic.Value - clusterArgs := base.TestClusterArgs{ - ServerArgs: base.TestServerArgs{ - RaftConfig: base.RaftConfig{ - // We set the raft election timeout to a small duration. This should - // result in the node liveness duration being ~3.6 seconds. 
Note that - // if we set this too low, the test may flake due to the test - // cluster's nodes frequently missing their liveness heartbeats. - RaftHeartbeatIntervalTicks: 5, - RaftElectionTimeoutTicks: 6, - }, - Knobs: base.TestingKnobs{ - Store: &kvserver.StoreTestingKnobs{ - // This test suspends the merge txn right before it can apply the - // commit trigger and can lead to the merge txn taking longer than - // the defaults specified in aggressiveResolvedTimestampPushKnobs(). - // We use really high values here in order to avoid the merge txn - // being pushed due to resolved timestamps. - RangeFeedPushTxnsInterval: 5 * time.Second, - RangeFeedPushTxnsAge: 60 * time.Second, - TestingRequestFilter: st.SuspendMergeTrigger, - LeaseRequestEvent: func(ts hlc.Timestamp, storeID roachpb.StoreID, rangeID roachpb.RangeID) *roachpb.Error { - val := leaseAcquisitionTrap.Load() - if val == nil { + } { + t.Run(test.name, func(t *testing.T) { + ctx := context.Background() + // Range merges can be internally retried by the coordinating node (the + // leaseholder of the left hand side range). If this happens, the right hand + // side can get re-subsumed. However, in the current implementation, even if + // the merge txn gets retried, the follower replicas should not be able to + // activate any closed timestamp updates succeeding the timestamp the RHS + // was subsumed _for the first time_. + st := mergeFilter{} + var leaseAcquisitionTrap atomic.Value + clusterArgs := base.TestClusterArgs{ + ServerArgs: base.TestServerArgs{ + RaftConfig: base.RaftConfig{ + // We set the raft election timeout to a small duration. This should + // result in the node liveness duration being ~3.6 seconds. Note that + // if we set this too low, the test may flake due to the test + // cluster's nodes frequently missing their liveness heartbeats. + RaftHeartbeatIntervalTicks: 5, + RaftElectionTimeoutTicks: 6, + }, + Knobs: base.TestingKnobs{ + Store: &kvserver.StoreTestingKnobs{ + // This test suspends the merge txn right before it can apply the + // commit trigger and can lead to the merge txn taking longer than + // the defaults specified in aggressiveResolvedTimestampPushKnobs(). + // We use really high values here in order to avoid the merge txn + // being pushed due to resolved timestamps. + RangeFeedPushTxnsInterval: 5 * time.Second, + RangeFeedPushTxnsAge: 60 * time.Second, + TestingRequestFilter: st.SuspendMergeTrigger, + LeaseRequestEvent: func(ts hlc.Timestamp, storeID roachpb.StoreID, rangeID roachpb.RangeID) *roachpb.Error { + val := leaseAcquisitionTrap.Load() + if val == nil { + return nil + } + leaseAcquisitionCallback := val.(func(storeID roachpb.StoreID, rangeID roachpb.RangeID) *roachpb.Error) + if err := leaseAcquisitionCallback(storeID, rangeID); err != nil { + return err + } return nil - } - leaseAcquisitionCallback := val.(func(storeID roachpb.StoreID, rangeID roachpb.RangeID) *roachpb.Error) - if err := leaseAcquisitionCallback(storeID, rangeID); err != nil { - return err - } - return nil + }, + DisableMergeQueue: true, + // A subtest wants to force a lease change by stopping the liveness + // heartbeats on the old leaseholder and sending a request to + // another replica. If we didn't use this knob, we'd have to + // architect a Raft leadership change too in order to let the + // replica get the lease. 
+ AllowLeaseRequestProposalsWhenNotLeader: true, }, - DisableMergeQueue: true, - // A subtest wants to force a lease change by stopping the liveness - // heartbeats on the old leaseholder and sending a request to - // another replica. If we didn't use this knob, we'd have to - // architect a Raft leadership change too in order to let the - // replica get the lease. - AllowLeaseRequestProposalsWhenNotLeader: true, }, }, - }, - } - // If the initial phase of the merge txn takes longer than the closed - // timestamp target duration, its initial CPuts can have their write - // timestamps bumped due to an intervening closed timestamp update. This - // causes the entire merge txn to retry. So we use a long closed timestamp - // duration at the beginning of the test until we have the merge txn - // suspended at its commit trigger, and then change it back down to - // `testingTargetDuration`. - tc, leftDesc, rightDesc := setupClusterForClosedTSTestingWithSplitRanges(ctx, t, 5*time.Second, - testingCloseFraction, clusterArgs) - defer tc.Stopper().Stop(ctx) - - leftLeaseholder := getCurrentLeaseholder(t, tc, leftDesc) - rightLeaseholder := getCurrentLeaseholder(t, tc, rightDesc) - - g, ctx := errgroup.WithContext(ctx) - // Merge the ranges back together. The LHS leaseholder should block right - // before the merge trigger request is sent. - leftLeaseholderStore := getTargetStoreOrFatal(t, tc, leftLeaseholder) - blocker := st.BlockNextMerge() - mergeErrCh := make(chan error, 1) - g.Go(func() error { - err := mergeTxn(ctx, leftLeaseholderStore, leftDesc) - mergeErrCh <- err - return err - }) - defer func() { - // Unblock the rightLeaseholder so it can finally commit the merge. - blocker.Unblock() - if err := g.Wait(); err != nil { - t.Error(err) } - }() + // If the initial phase of the merge txn takes longer than the closed + // timestamp target duration, its initial CPuts can have their write + // timestamps bumped due to an intervening closed timestamp update. This + // causes the entire merge txn to retry. So we use a long closed timestamp + // duration at the beginning of the test until we have the merge txn + // suspended at its commit trigger, and then change it back down to + // `testingTargetDuration`. + tc, leftDesc, rightDesc := setupClusterForClosedTSTestingWithSplitRanges(ctx, t, 5*time.Second, + testingCloseFraction, clusterArgs) + defer tc.Stopper().Stop(ctx) + + leftLeaseholder := getCurrentLeaseholder(t, tc, leftDesc) + rightLeaseholder := getCurrentLeaseholder(t, tc, rightDesc) + + g := ctxgroup.WithContext(ctx) + // Merge the ranges back together. The LHS leaseholder should block right + // before the merge trigger request is sent. + leftLeaseholderStore := getTargetStoreOrFatal(t, tc, leftLeaseholder) + blocker := st.BlockNextMerge() + mergeErrCh := make(chan error, 1) + g.Go(func() error { + err := mergeTxn(ctx, leftLeaseholderStore, leftDesc) + mergeErrCh <- err + return err + }) + defer func() { + // Unblock the rightLeaseholder so it can finally commit the merge. + blocker.Unblock() + if err := g.Wait(); err != nil { + t.Error(err) + } + }() - var freezeStartTimestamp hlc.Timestamp - // We now have the RHS in its subsumed state. - select { - case freezeStartTimestamp = <-blocker.WaitCh(): - case err := <-mergeErrCh: - t.Fatal(err) - case <-time.After(45 * time.Second): - t.Fatal("did not receive merge commit trigger as expected") - } - // Reduce the closed timestamp target duration in order to make the rest of - // the test faster. 
- db := tc.ServerConn(0) - if _, err := db.Exec(fmt.Sprintf(`SET CLUSTER SETTING kv.closed_timestamp.target_duration = '%s';`, - testingTargetDuration)); err != nil { - t.Fatal(err) - } - // inactiveClosedTSBoundary indicates the low water mark for closed - // timestamp updates beyond which we expect none of the followers to be able - // to serve follower reads until the merge is complete. - inactiveClosedTSBoundary := freezeStartTimestamp - if callback != nil { - newRightLeaseholder, ts, err := callback(ctx, t, tc, g, rightDesc, rightLeaseholder, - freezeStartTimestamp, &leaseAcquisitionTrap) - if err != nil { + var freezeStartTimestamp hlc.Timestamp + // We now have the RHS in its subsumed state. + select { + case freezeStartTimestamp = <-blocker.WaitCh(): + case err := <-mergeErrCh: t.Fatal(err) + case <-time.After(45 * time.Second): + t.Fatal("did not receive merge commit trigger as expected") + } + // Reduce the closed timestamp target duration in order to make the rest of + // the test faster. + db := tc.ServerConn(0) + if _, err := db.Exec(fmt.Sprintf(`SET CLUSTER SETTING kv.closed_timestamp.target_duration = '%s';`, + testingTargetDuration)); err != nil { + t.Fatal(err) + } + // inactiveClosedTSBoundary indicates the low water mark for closed + // timestamp updates beyond which we expect none of the followers to be able + // to serve follower reads until the merge is complete. + inactiveClosedTSBoundary := freezeStartTimestamp + if test.callback != nil { + newRightLeaseholder, ts, err := test.callback(ctx, t, tc, &g, rightDesc, rightLeaseholder, + freezeStartTimestamp, &leaseAcquisitionTrap) + if err != nil { + t.Fatal(err) + } + rightLeaseholder, inactiveClosedTSBoundary = newRightLeaseholder, ts } - rightLeaseholder, inactiveClosedTSBoundary = newRightLeaseholder, ts - } - // Poll the store for closed timestamp updates for timestamps greater than - // our `inactiveClosedTSBoundary`. - closedTimestampCh := make(chan ctpb.Entry, 1) - g.Go(func() (e error) { - pollForGreaterClosedTimestamp(t, tc, rightLeaseholder, rightDesc, inactiveClosedTSBoundary, closedTimestampCh) - return - }) - // We expect that none of the closed timestamp updates greater than - // `inactiveClosedTSBoundary` will be actionable by the RHS follower - // replicas. - log.Infof(ctx, "waiting for next closed timestamp update for the RHS") - select { - case <-closedTimestampCh: - case <-time.After(30 * time.Second): - t.Fatal("failed to receive next closed timestamp update") - } - baReadAfterLeaseTransfer := makeTxnReadBatchForDesc(rightDesc, inactiveClosedTSBoundary.Next()) - rightReplFollowers := getFollowerReplicas(ctx, t, tc, rightDesc, rightLeaseholder) - log.Infof(ctx, "sending read requests from followers after the inactiveClosedTSBoundary") - verifyNotLeaseHolderErrors(t, baReadAfterLeaseTransfer, rightReplFollowers, 2 /* expectedNLEs */) - } - for _, test := range tests { - t.Run(test.name, func(t *testing.T) { - runTest(t, test.callback) + // Sleep a bit and assert that the closed timestamp has not advanced while + // we were sleeping. + time.Sleep(time.Second) + + store, err := getTargetStore(tc, rightLeaseholder) + require.NoError(t, err) + r, err := store.GetReplica(rightDesc.RangeID) + require.NoError(t, err) + maxClosed, ok := r.MaxClosed(ctx) + require.True(t, ok) + require.True(t, maxClosed.LessEq(inactiveClosedTSBoundary)) + + // As a sanity check, verify that the followers cannot serve follower reads. 
+ baReadAfterLeaseTransfer := makeTxnReadBatchForDesc(rightDesc, inactiveClosedTSBoundary.Next()) + rightReplFollowers := getFollowerReplicas(ctx, t, tc, rightDesc, rightLeaseholder) + log.Infof(ctx, "test sending read requests from followers after the inactiveClosedTSBoundary") + verifyNotLeaseHolderErrors(t, baReadAfterLeaseTransfer, rightReplFollowers, 2 /* expectedNLEs */) }) } - } // forceLeaseTransferOnSubsumedRange triggers a lease transfer on `rightDesc` by @@ -747,7 +737,7 @@ func forceLeaseTransferOnSubsumedRange( ctx context.Context, t *testing.T, tc serverutils.TestClusterInterface, - g *errgroup.Group, + g *ctxgroup.Group, rightDesc roachpb.RangeDescriptor, rightLeaseholder roachpb.ReplicationTarget, freezeStartTimestamp hlc.Timestamp, @@ -1033,56 +1023,6 @@ func splitDummyRangeInTestCluster( return leftDesc, rightDesc } -func getCurrentMaxClosed( - t *testing.T, - tc serverutils.TestClusterInterface, - target roachpb.ReplicationTarget, - desc roachpb.RangeDescriptor, -) ctpb.Entry { - deadline := timeutil.Now().Add(2 * testingTargetDuration) - store := getTargetStoreOrFatal(t, tc, target) - var maxClosed ctpb.Entry - attempts := 0 - for attempts == 0 || timeutil.Now().Before(deadline) { - attempts++ - store.GetStoreConfig().ClosedTimestamp.Storage.VisitDescending(target.NodeID, func(entry ctpb.Entry) (done bool) { - if _, ok := entry.MLAI[desc.RangeID]; ok { - maxClosed = entry - return true - } - return false - }) - if _, ok := maxClosed.MLAI[desc.RangeID]; !ok { - // We ran out of closed timestamps to visit without finding one that - // corresponds to rightDesc. It is likely that no closed timestamps have - // been broadcast for desc yet, try again. - continue - } - return maxClosed - } - return ctpb.Entry{} -} - -func pollForGreaterClosedTimestamp( - t *testing.T, - tc serverutils.TestClusterInterface, - target roachpb.ReplicationTarget, - desc roachpb.RangeDescriptor, - lowerBound hlc.Timestamp, - returnCh chan<- ctpb.Entry, -) { - for { - if t.Failed() { - return - } - maxClosed := getCurrentMaxClosed(t, tc, target, desc) - if _, ok := maxClosed.MLAI[desc.RangeID]; ok && lowerBound.LessEq(maxClosed.ClosedTimestamp) { - returnCh <- maxClosed - return - } - } -} - func getFollowerReplicas( ctx context.Context, t *testing.T, @@ -1103,19 +1043,21 @@ func getFollowerReplicas( func getTargetStoreOrFatal( t *testing.T, tc serverutils.TestClusterInterface, target roachpb.ReplicationTarget, -) (store *kvserver.Store) { +) *kvserver.Store { + s, err := getTargetStore(tc, target) + require.NoError(t, err) + return s +} + +func getTargetStore( + tc serverutils.TestClusterInterface, target roachpb.ReplicationTarget, +) (_ *kvserver.Store, err error) { for i := 0; i < tc.NumServers(); i++ { - if server := tc.Server(i); server.NodeID() == target.NodeID && - server.GetStores().(*kvserver.Stores).HasStore(target.StoreID) { - store, err := server.GetStores().(*kvserver.Stores).GetStore(target.StoreID) - if err != nil { - t.Fatal(err) - } - return store + if server := tc.Server(i); server.NodeID() == target.NodeID { + return server.GetStores().(*kvserver.Stores).GetStore(target.StoreID) } } - t.Fatalf("Could not find store for replication target %+v\n", target) - return nil + return nil, errors.Errorf("could not find node for replication target %+v\n", target) } func verifyNotLeaseHolderErrors(
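
One note on the errgroup-to-ctxgroup swap in the test above: ctxgroup is CockroachDB's thin wrapper around errgroup that threads the group's derived, cancelable context through to each task. A minimal, self-contained sketch of that pattern (assuming the pkg/util/ctxgroup API of WithContext/GoCtx/Go/Wait; the task bodies below are illustrative only and not part of this diff):

```go
package main

import (
	"context"
	"fmt"

	"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
)

func main() {
	ctx := context.Background()
	g := ctxgroup.WithContext(ctx)
	// GoCtx hands the task the group's derived context, which is canceled
	// as soon as any task in the group returns an error.
	g.GoCtx(func(ctx context.Context) error {
		<-ctx.Done() // unblocks once the sibling task below fails
		return ctx.Err()
	})
	g.Go(func() error {
		return fmt.Errorf("boom") // cancels the derived context above
	})
	// Wait blocks until all tasks have finished and returns the first error.
	if err := g.Wait(); err != nil {
		fmt.Println("group failed:", err)
	}
}
```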