
Commit

107613: kvserver: avoid moving parts in closedts tests r=erikgrinaker a=tbg

Many tests (two in just the last week[^1][^2]) are flaky because the replicate queue
can make rebalancing decisions that undermine the state the test is trying to
set up. Often, this happens "accidentally" because ReplicationAuto is our default
ReplicationMode.

This PR improves the situation at least for closed timestamp integration tests
by switching them all over to `ReplicationManual` (and preventing any new ones
from accidentally using `ReplicationAuto` in the future).
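As a rough sketch of the pattern these tests now follow (illustrative only; the test name and scratch key below are made up, but `ReplicationManual`, `AddVotersOrFatal`, and the other helpers are the ones used in the diff):

```go
package kvserver_test

import (
	"context"
	"testing"

	"github.com/cockroachdb/cockroach/pkg/base"
	"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
)

// Sketch: with ReplicationManual, the replicate queue stays out of the way,
// so whatever replica placement the test sets up stays put.
func TestClosedTSSketch(t *testing.T) {
	ctx := context.Background()
	tc := testcluster.StartTestCluster(t, 3, base.TestClusterArgs{
		ReplicationMode: base.ReplicationManual,
	})
	defer tc.Stopper().Stop(ctx)

	// The test places replicas itself and can rely on them staying there.
	scratchKey := tc.ScratchRange(t)
	desc := tc.AddVotersOrFatal(t, scratchKey, tc.Target(1), tc.Target(2))
	_ = desc // assert against a placement that nothing will rebalance away
}
```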

This can be seen as a small step towards cockroachdb#107528, which I am increasingly
convinced is an ocean worth boiling.

[^1]: cockroachdb#107179
[^2]: cockroachdb#101824

Epic: None
Release note: None

Co-authored-by: Tobias Grieger <[email protected]>
craig[bot] and tbg committed Jul 27, 2023
2 parents 57b6b70 + e3e9b77 commit 310951b
Showing 3 changed files with 56 additions and 37 deletions.
15 changes: 9 additions & 6 deletions pkg/kv/kvserver/client_rangefeed_test.go
@@ -221,15 +221,17 @@ func TestRangefeedIsRoutedToNonVoter(t *testing.T) {
defer log.Scope(t).Close(t)

ctx := context.Background()
clusterArgs := aggressiveResolvedTimestampClusterArgs
// We want to manually add a non-voter to a range in this test, so disable
// the replicateQueue to prevent it from disrupting the test.
clusterArgs.ReplicationMode = base.ReplicationManual
clusterArgs := aggressiveResolvedTimestampManuallyReplicatedClusterArgs
// NB: setupClusterForClosedTSTesting sets a low closed timestamp target
// duration.
// NB: the replicate queue is disabled in this test, so we can manually add a
// non-voter to a range without being disrupted.
tc, _, desc := setupClusterForClosedTSTesting(ctx, t, testingTargetDuration, 0, clusterArgs, "cttest", "kv")
defer tc.Stopper().Stop(ctx)
tc.AddNonVotersOrFatal(t, desc.StartKey.AsRawKey(), tc.Target(1))
// This test doesn't want the default voters on s2 and s3. We want only
// the voter on s1 and a non-voter on s2.
desc = tc.RemoveVotersOrFatal(t, desc.StartKey.AsRawKey(), tc.Target(1), tc.Target(2))
desc = tc.AddNonVotersOrFatal(t, desc.StartKey.AsRawKey(), tc.Target(1))

db := tc.Server(1).DB()
ds := tc.Server(1).DistSenderI().(*kvcoord.DistSender)
@@ -269,7 +271,8 @@ func TestRangefeedIsRoutedToNonVoter(t *testing.T) {
}
rangefeedCancel()
require.Regexp(t, "context canceled", <-rangefeedErrChan)
require.Regexp(t, "attempting to create a RangeFeed over replica.*2NON_VOTER", getRecAndFinish().String())
require.Regexp(t, `attempting to create a RangeFeed over replica .n2,s2.:\dNON_VOTER`,
getRecAndFinish().String())
}

// TestRangefeedWorksOnLivenessRange ensures that a rangefeed works as expected
69 changes: 43 additions & 26 deletions pkg/kv/kvserver/closed_timestamp_test.go
@@ -56,7 +56,8 @@ import (
"golang.org/x/sync/errgroup"
)

var aggressiveResolvedTimestampClusterArgs = base.TestClusterArgs{
var aggressiveResolvedTimestampManuallyReplicatedClusterArgs = base.TestClusterArgs{
ReplicationMode: base.ReplicationManual,
ServerArgs: base.TestServerArgs{
RaftConfig: base.RaftConfig{
// With expiration-based leases, we may be unable to maintain leases under
@@ -81,12 +82,13 @@ func TestClosedTimestampCanServe(t *testing.T) {
testutils.RunTrueAndFalse(t, "withNonVoters", func(t *testing.T, withNonVoters bool) {
ctx := context.Background()
dbName, tableName := "cttest", "kv"
clusterArgs := aggressiveResolvedTimestampClusterArgs
// Disable the replicateQueue so that it doesn't interfere with replica
// membership ranges.
clusterArgs.ReplicationMode = base.ReplicationManual
// NB: replicate queue is disabled here, so won't interfere with our manual
// changes.
clusterArgs := aggressiveResolvedTimestampManuallyReplicatedClusterArgs
tc, db0, desc := setupClusterForClosedTSTesting(ctx, t, testingTargetDuration, 0, clusterArgs, dbName, tableName)
defer tc.Stopper().Stop(ctx)
// This test doesn't want the range 3x replicated. Move back to 1x.
desc = tc.RemoveVotersOrFatal(t, desc.StartKey.AsRawKey(), tc.Target(1), tc.Target(2))

if _, err := db0.Exec(`INSERT INTO cttest.kv VALUES(1, $1)`, "foo"); err != nil {
t.Fatal(err)
@@ -152,12 +154,14 @@ func TestClosedTimestampCanServeOnVoterIncoming(t *testing.T) {

ctx := context.Background()
dbName, tableName := "cttest", "kv"
clusterArgs := aggressiveResolvedTimestampClusterArgs
clusterArgs.ReplicationMode = base.ReplicationManual
clusterArgs := aggressiveResolvedTimestampManuallyReplicatedClusterArgs
knobs, ltk := makeReplicationTestKnobs()
clusterArgs.ServerArgs.Knobs = knobs
tc, db0, desc := setupClusterForClosedTSTesting(ctx, t, testingTargetDuration, 0, clusterArgs, dbName, tableName)
defer tc.Stopper().Stop(ctx)
// This test actually doesn't want the range to be 3x replicated. Move it
// back down to one replica.
desc = tc.RemoveVotersOrFatal(t, desc.StartKey.AsRawKey(), tc.Target(1), tc.Target(2))

if _, err := db0.Exec(`INSERT INTO cttest.kv VALUES(1, $1)`, "foo"); err != nil {
t.Fatal(err)
@@ -192,7 +196,8 @@ func TestClosedTimestampCanServeThroughoutLeaseTransfer(t *testing.T) {
skip.UnderRace(t)

ctx := context.Background()
tc, db0, desc := setupClusterForClosedTSTesting(ctx, t, testingTargetDuration, 0, aggressiveResolvedTimestampClusterArgs, "cttest", "kv")
cArgs := aggressiveResolvedTimestampManuallyReplicatedClusterArgs
tc, db0, desc := setupClusterForClosedTSTesting(ctx, t, testingTargetDuration, 0, cArgs, "cttest", "kv")
defer tc.Stopper().Stop(ctx)
repls := replsForRange(ctx, t, tc, desc)

Expand Down Expand Up @@ -270,7 +275,8 @@ func TestClosedTimestampCantServeWithConflictingIntent(t *testing.T) {
defer txnwait.TestingOverrideTxnLivenessThreshold(time.Hour)()

ctx := context.Background()
tc, _, desc := setupClusterForClosedTSTesting(ctx, t, testingTargetDuration, 0, aggressiveResolvedTimestampClusterArgs, "cttest", "kv")
cArgs := aggressiveResolvedTimestampManuallyReplicatedClusterArgs
tc, _, desc := setupClusterForClosedTSTesting(ctx, t, testingTargetDuration, 0, cArgs, "cttest", "kv")
defer tc.Stopper().Stop(ctx)
repls := replsForRange(ctx, t, tc, desc)
ds := tc.Server(0).DistSenderI().(*kvcoord.DistSender)
Expand Down Expand Up @@ -377,7 +383,8 @@ func TestClosedTimestampCanServeAfterSplitAndMerges(t *testing.T) {
skip.UnderRace(t)

ctx := context.Background()
tc, db0, desc := setupClusterForClosedTSTesting(ctx, t, testingTargetDuration, 0, aggressiveResolvedTimestampClusterArgs, "cttest", "kv")
cArgs := aggressiveResolvedTimestampManuallyReplicatedClusterArgs
tc, db0, desc := setupClusterForClosedTSTesting(ctx, t, testingTargetDuration, 0, cArgs, "cttest", "kv")
repls := replsForRange(ctx, t, tc, desc)
// Disable the automatic merging.
if _, err := db0.Exec("SET CLUSTER SETTING kv.range_merge.queue_enabled = false"); err != nil {
Expand Down Expand Up @@ -457,7 +464,8 @@ func TestClosedTimestampCantServeBasedOnUncertaintyLimit(t *testing.T) {
ctx := context.Background()
// Set up the target duration to be very long and rely on lease transfers to
// drive MaxClosed.
tc, db0, desc := setupClusterForClosedTSTesting(ctx, t, testingTargetDuration, 0, aggressiveResolvedTimestampClusterArgs, "cttest", "kv")
cArgs := aggressiveResolvedTimestampManuallyReplicatedClusterArgs
tc, db0, desc := setupClusterForClosedTSTesting(ctx, t, testingTargetDuration, 0, cArgs, "cttest", "kv")
defer tc.Stopper().Stop(ctx)
repls := replsForRange(ctx, t, tc, desc)

Expand Down Expand Up @@ -490,7 +498,8 @@ func TestClosedTimestampCanServeForWritingTransaction(t *testing.T) {
skip.UnderRace(t)

ctx := context.Background()
tc, db0, desc := setupClusterForClosedTSTesting(ctx, t, testingTargetDuration, 0, aggressiveResolvedTimestampClusterArgs, "cttest", "kv")
cArgs := aggressiveResolvedTimestampManuallyReplicatedClusterArgs
tc, db0, desc := setupClusterForClosedTSTesting(ctx, t, testingTargetDuration, 0, cArgs, "cttest", "kv")
defer tc.Stopper().Stop(ctx)
repls := replsForRange(ctx, t, tc, desc)

Expand Down Expand Up @@ -537,7 +546,8 @@ func TestClosedTimestampCantServeForNonTransactionalReadRequest(t *testing.T) {
skip.UnderRace(t)

ctx := context.Background()
tc, db0, desc := setupClusterForClosedTSTesting(ctx, t, testingTargetDuration, 0, aggressiveResolvedTimestampClusterArgs, "cttest", "kv")
cArgs := aggressiveResolvedTimestampManuallyReplicatedClusterArgs
tc, db0, desc := setupClusterForClosedTSTesting(ctx, t, testingTargetDuration, 0, cArgs, "cttest", "kv")
defer tc.Stopper().Stop(ctx)
repls := replsForRange(ctx, t, tc, desc)

Expand Down Expand Up @@ -579,7 +589,8 @@ func TestClosedTimestampCantServeForNonTransactionalBatch(t *testing.T) {

testutils.RunTrueAndFalse(t, "tsFromServer", func(t *testing.T, tsFromServer bool) {
ctx := context.Background()
tc, db0, desc := setupClusterForClosedTSTesting(ctx, t, testingTargetDuration, 0, aggressiveResolvedTimestampClusterArgs, "cttest", "kv")
cArgs := aggressiveResolvedTimestampManuallyReplicatedClusterArgs
tc, db0, desc := setupClusterForClosedTSTesting(ctx, t, testingTargetDuration, 0, cArgs, "cttest", "kv")
defer tc.Stopper().Stop(ctx)
repls := replsForRange(ctx, t, tc, desc)

Expand Down Expand Up @@ -683,6 +694,7 @@ func TestClosedTimestampFrozenAfterSubsumption(t *testing.T) {
kvserver.ExpirationLeasesOnly.Override(ctx, &cs.SV, false) // override metamorphism

clusterArgs := base.TestClusterArgs{
ReplicationMode: base.ReplicationManual,
ServerArgs: base.TestServerArgs{
Settings: cs,
RaftConfig: base.RaftConfig{
@@ -1158,9 +1170,13 @@ func pickRandomTarget(
tc serverutils.TestClusterInterface, lh roachpb.ReplicationTarget, desc roachpb.RangeDescriptor,
) (t roachpb.ReplicationTarget) {
for {
if t = tc.Target(rand.Intn(len(desc.InternalReplicas))); t != lh {
n := len(desc.InternalReplicas)
if t = tc.Target(rand.Intn(n)); t != lh {
return t
}
if n <= 1 {
panic(fmt.Sprintf("the only target in %v is the leaseholder %v", desc, lh))
}
}
}

@@ -1184,20 +1200,23 @@ func aggressiveResolvedTimestampPushKnobs() *kvserver.StoreTestingKnobs {
}
}

// setupClusterForClosedTSTesting creates a test cluster that is prepared to
// exercise follower reads. The returned test cluster has follower reads enabled
// using the given targetDuration and testingCloseFraction. In addition to the
// newly minted test cluster, this function returns a db handle to node 0, a
// range descriptor for the range used by the table `{dbName}.{tableName}`. It
// is the caller's responsibility to Stop the Stopper on the returned test
// cluster when done.
// setupClusterForClosedTSTesting creates a three node test cluster that is
// prepared to exercise follower reads. The returned test cluster has follower
// reads enabled using the given targetDuration and testingCloseFraction. In
// addition to the newly minted test cluster, this function returns a db handle
// to node 0, a range descriptor for the range used by the table
// `{dbName}.{tableName}`. It is the caller's responsibility to Stop the Stopper
// on the returned test cluster when done. The range represented by kvTableDesc
// will be fully replicated; other ranges won't be by default.
func setupClusterForClosedTSTesting(
ctx context.Context,
t *testing.T,
targetDuration, sideTransportInterval time.Duration,
clusterArgs base.TestClusterArgs,
dbName, tableName string,
) (tc serverutils.TestClusterInterface, db0 *gosql.DB, kvTableDesc roachpb.RangeDescriptor) {
require.Equal(t, base.ReplicationManual, clusterArgs.ReplicationMode)

const numNodes = 3
if sideTransportInterval == 0 {
sideTransportInterval = targetDuration / 4
@@ -1212,10 +1231,8 @@ SET CLUSTER SETTING kv.allocator.load_based_rebalancing = 'off';
`, targetDuration, sideTransportInterval),
";")...)

// Disable replicate queues to avoid errant lease transfers.
//
// See: https://github.com/cockroachdb/cockroach/issues/101824.
tc.ToggleReplicateQueues(false)
desc = tc.AddVotersOrFatal(t, desc.StartKey.AsRawKey(), tc.Target(1), tc.Target(2))
require.EqualValues(t, 3, numNodes) // reminder to update the AddVotersOrFatal if numNodes ever changed

return tc, tc.ServerConn(0), desc
}
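To make the new contract concrete — a sketch assembled from the calls visible in the diff above, not additional code from this commit — callers pass the manually-replicated args, get the `{dbName}.{tableName}` range back already replicated to all three nodes, and reshape the replication themselves if they want a different layout:

```go
// Sketch of a caller after this change. The helper now asserts
// ReplicationManual and up-replicates the table's range 3x itself.
cArgs := aggressiveResolvedTimestampManuallyReplicatedClusterArgs
tc, db0, desc := setupClusterForClosedTSTesting(ctx, t, testingTargetDuration, 0, cArgs, "cttest", "kv")
defer tc.Stopper().Stop(ctx)

// A test that wants only a single voter trims the range back down; with the
// replicate queue disabled, nothing re-adds the removed replicas behind its back.
desc = tc.RemoveVotersOrFatal(t, desc.StartKey.AsRawKey(), tc.Target(1), tc.Target(2))
_ = db0
```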
9 changes: 4 additions & 5 deletions pkg/kv/kvserver/replica_rangefeed_test.go
@@ -1116,7 +1116,8 @@ func TestReplicaRangefeedPushesTransactions(t *testing.T) {
defer log.Scope(t).Close(t)

ctx := context.Background()
tc, db, desc := setupClusterForClosedTSTesting(ctx, t, testingTargetDuration, 0, aggressiveResolvedTimestampClusterArgs, "cttest", "kv")
cArgs := aggressiveResolvedTimestampManuallyReplicatedClusterArgs
tc, db, desc := setupClusterForClosedTSTesting(ctx, t, testingTargetDuration, 0, cArgs, "cttest", "kv")
defer tc.Stopper().Stop(ctx)
repls := replsForRange(ctx, t, tc, desc)

@@ -1245,8 +1246,7 @@ func TestRangefeedCheckpointsRecoverFromLeaseExpiration(t *testing.T) {
st := cluster.MakeTestingClusterSettings()
kvserver.ExpirationLeasesOnly.Override(ctx, &st.SV, false) // override metamorphism

cargs := aggressiveResolvedTimestampClusterArgs
cargs.ReplicationMode = base.ReplicationManual
cargs := aggressiveResolvedTimestampManuallyReplicatedClusterArgs
manualClock := hlc.NewHybridManualClock()
cargs.ServerArgs = base.TestServerArgs{
Settings: st,
@@ -1430,8 +1430,7 @@ func TestNewRangefeedForceLeaseRetry(t *testing.T) {

var timeoutSimulated bool

cargs := aggressiveResolvedTimestampClusterArgs
cargs.ReplicationMode = base.ReplicationManual
cargs := aggressiveResolvedTimestampManuallyReplicatedClusterArgs
manualClock := hlc.NewHybridManualClock()
cargs.ServerArgs = base.TestServerArgs{
Knobs: base.TestingKnobs{
