Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

tests: accelerate tests whose latency is sensitive to rangefeeds #111753

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions pkg/base/test_server_args.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,10 @@ type TestServerArgs struct {
// AutoConfigProvider provides auto-configuration tasks to apply on
// the cluster during server initialization.
AutoConfigProvider acprovider.Provider

// FastRangefeeds, if set, lowers the latency at which rangefeeds
// report changes.
Comment on lines +173 to +174
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: changes are always emitted ~immediately, this knob only affects checkpoint lag and frequency.

FastRangefeeds bool
}

// TestClusterArgs contains the parameters one can set when creating a test
Expand Down
3 changes: 2 additions & 1 deletion pkg/ccl/backupccl/alter_backup_schedule_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,14 +72,15 @@ func newAlterSchedulesTestHelper(t *testing.T) (*alterSchedulesTestHelper, func(
Knobs: base.TestingKnobs{
JobsTestingKnobs: knobs,
},
// Speeds up test.
FastRangefeeds: true,
}
s, db, _ := serverutils.StartServer(t, args)
require.NotNil(t, th.cfg)
th.sqlDB = sqlutils.MakeSQLRunner(db)
th.server = s
th.sqlDB.Exec(t, `SET CLUSTER SETTING bulkio.backup.merge_file_buffer_size = '1MiB'`)
sysDB := sqlutils.MakeSQLRunner(s.SystemLayer().SQLConn(t, ""))
sysDB.Exec(t, `SET CLUSTER SETTING kv.closed_timestamp.target_duration = '100ms'`) // speeds up test

return th, func() {
dirCleanupFn()
Expand Down
26 changes: 10 additions & 16 deletions pkg/ccl/backupccl/backup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5847,6 +5847,7 @@ func TestProtectedTimestampsDuringBackup(t *testing.T) {
dir, dirCleanupFn := testutils.TempDir(t)
defer dirCleanupFn()
params := base.TestClusterArgs{}
params.ServerArgs.FastRangefeeds = true
params.ServerArgs.ExternalIODir = dir
params.ServerArgs.Knobs.JobsTestingKnobs = jobs.NewTestingKnobsWithShortIntervals()
params.ServerArgs.DefaultTestTenant = base.TestControlsTenantsExplicitly
Expand Down Expand Up @@ -5889,10 +5890,6 @@ func TestProtectedTimestampsDuringBackup(t *testing.T) {
tablePrefix := tt.ExecutorConfig().(sql.ExecutorConfig).Codec.TablePrefix(uint32(tableID))
startPretty := tablePrefix.String()

// Speeds up the test.
systemTenantRunner.Exec(t, "SET CLUSTER SETTING kv.protectedts.poll_interval = '10ms';")
systemTenantRunner.Exec(t, "SET CLUSTER SETTING kv.closed_timestamp.target_duration = '100ms'")

// Run a full backup.
baseBackupURI := "userfile:///foo"
runner.Exec(t, fmt.Sprintf(`BACKUP TABLE foo INTO '%s'`, baseBackupURI))
Expand Down Expand Up @@ -9154,11 +9151,12 @@ func TestGCDropIndexSpanExpansion(t *testing.T) {
},
JobsTestingKnobs: jobs.NewTestingKnobsWithShortIntervals(),
},
// Speeds up the test.
FastRangefeeds: true,
}})
defer tc.Stopper().Stop(ctx)
conn := tc.Conns[0]
sqlRunner := sqlutils.MakeSQLRunner(conn)
sqlRunner.Exec(t, `SET CLUSTER SETTING kv.closed_timestamp.target_duration = '100ms'`) // speeds up the test

sqlRunner.Exec(t, `CREATE DATABASE test;`)
sqlRunner.Exec(t, ` USE test;`)
Expand Down Expand Up @@ -9313,6 +9311,8 @@ func TestExcludeDataFromBackupAndRestore(t *testing.T) {
tc, sqlDB, iodir, cleanupFn := backupRestoreTestSetupWithParams(t, singleNode, 10,
InitManualReplication, base.TestClusterArgs{
ServerArgs: base.TestServerArgs{
// Speeds up the test.
FastRangefeeds: true,
// Disabled to run within tenants because the function that sets up the restoring cluster
// has not been configured yet to run within tenants.
DefaultTestTenant: base.TODOTestTenantDisabled,
Expand Down Expand Up @@ -9348,7 +9348,6 @@ func TestExcludeDataFromBackupAndRestore(t *testing.T) {
defer cleanup()

sqlDB.Exec(t, `SET CLUSTER SETTING kv.rangefeed.enabled = true`)
sqlDB.Exec(t, `SET CLUSTER SETTING kv.closed_timestamp.target_duration = '100ms'`)
conn := tc.Conns[0]

sqlDB.Exec(t, `CREATE TABLE data.foo (id INT, INDEX bar(id))`)
Expand Down Expand Up @@ -9419,6 +9418,8 @@ func TestExportRequestBelowGCThresholdOnDataExcludedFromBackup(t *testing.T) {
// Test fails when run within a tenant as zone config
// updates are not allowed by default. Tracked with 73768.
DefaultTestTenant: base.TODOTestTenantDisabled,
// Make test faster.
FastRangefeeds: true,
},
}
args.ServerArgs.Knobs.Store = &kvserver.StoreTestingKnobs{
Expand Down Expand Up @@ -9447,12 +9448,6 @@ func TestExportRequestBelowGCThresholdOnDataExcludedFromBackup(t *testing.T) {
_, err := conn.Exec("CREATE TABLE foo (k INT PRIMARY KEY, v BYTES)")
require.NoError(t, err)

_, err = conn.Exec("SET CLUSTER SETTING kv.protectedts.poll_interval = '10ms';")
require.NoError(t, err)

_, err = conn.Exec("SET CLUSTER SETTING kv.closed_timestamp.target_duration = '100ms'") // speeds up the test
require.NoError(t, err)

const tableRangeMaxBytes = 100 << 20
_, err = conn.Exec("ALTER TABLE foo CONFIGURE ZONE USING "+
"gc.ttlseconds = 1, range_max_bytes = $1, range_min_bytes = 1<<10;", tableRangeMaxBytes)
Expand Down Expand Up @@ -9518,6 +9513,9 @@ func TestExcludeDataFromBackupDoesNotHoldupGC(t *testing.T) {
DisableLastProcessedCheck: true,
}
params.ServerArgs.Knobs.JobsTestingKnobs = jobs.NewTestingKnobsWithShortIntervals()
// Make test faster.
params.ServerArgs.FastRangefeeds = true

tc := testcluster.StartTestCluster(t, 1, params)
defer tc.Stopper().Stop(ctx)

Expand All @@ -9527,9 +9525,6 @@ func TestExcludeDataFromBackupDoesNotHoldupGC(t *testing.T) {
conn := tc.ServerConn(0)
runner := sqlutils.MakeSQLRunner(conn)
runner.Exec(t, `SET CLUSTER SETTING kv.rangefeed.enabled = true`)
// speeds up the test
runner.Exec(t, `SET CLUSTER SETTING kv.closed_timestamp.target_duration = '100ms'`)
runner.Exec(t, `SET CLUSTER SETTING kv.protectedts.poll_interval = '10ms'`)

runner.Exec(t, `CREATE DATABASE test;`)
runner.Exec(t, `CREATE TABLE test.foo (k INT PRIMARY KEY, v BYTES)`)
Expand Down Expand Up @@ -10563,7 +10558,6 @@ func TestBackupDBWithViewOnAdjacentDBRange(t *testing.T) {

// Speeds up the test.
sqlDB.Exec(t, `SET CLUSTER SETTING kv.rangefeed.enabled = true`)
sqlDB.Exec(t, `SET CLUSTER SETTING kv.closed_timestamp.target_duration = '100ms'`)

// Create two databases da and db, and tables in such a way that the view
// db.dbview is on the same range as db.t2. Set da to have a short GC TTL of 1
Expand Down
3 changes: 2 additions & 1 deletion pkg/ccl/backupccl/create_scheduled_backup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,14 +105,15 @@ func newTestHelper(t *testing.T) (*testHelper, func()) {
Knobs: base.TestingKnobs{
JobsTestingKnobs: knobs,
},
// Make test faster.
FastRangefeeds: true,
}
jobs.PollJobsMetricsInterval.Override(context.Background(), &args.Settings.SV, 250*time.Millisecond)
s, db, _ := serverutils.StartServer(t, args)
require.NotNil(t, th.cfg)
th.sqlDB = sqlutils.MakeSQLRunner(db)
th.server = s
th.sqlDB.Exec(t, `SET CLUSTER SETTING bulkio.backup.merge_file_buffer_size = '1MiB'`)
th.sqlDB.Exec(t, `SET CLUSTER SETTING kv.closed_timestamp.target_duration = '100ms'`) // speeds up test

return th, func() {
dirCleanupFn()
Expand Down
2 changes: 2 additions & 0 deletions pkg/ccl/backupccl/utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,8 @@ func backupRestoreTestSetup(
base.TestClusterArgs{
ServerArgs: base.TestServerArgs{
DefaultTestTenant: base.TODOTestTenantDisabled,
// Make tests faster.
FastRangefeeds: true,
}})
}

Expand Down
19 changes: 3 additions & 16 deletions pkg/ccl/jobsccl/jobsprotectedtsccl/jobs_protected_ts_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,21 +169,15 @@ func TestJobsProtectedTimestamp(t *testing.T) {
Knobs: base.TestingKnobs{
JobsTestingKnobs: jobs.NewTestingKnobsWithShortIntervals(),
},
// Make test faster.
FastRangefeeds: true,
})
defer s0.Stopper().Stop(ctx)

// Now I want to create some artifacts that should get reconciled away and
// then make sure that they do and others which should not do not.
hostRunner := sqlutils.MakeSQLRunner(db)

hostRunner.Exec(t, "SET CLUSTER SETTING kv.closed_timestamp.target_duration = '100ms'")
hostRunner.Exec(t, "SET CLUSTER SETTING kv.protectedts.reconciliation.interval = '1ms';")
// Also set what tenants see for these settings.
// TODO(radu): use ALTER TENANT statement when that is available.
hostRunner.Exec(t, `INSERT INTO system.tenant_settings (tenant_id, name, value, value_type)
SELECT 0, name, value, "valueType" FROM system.settings
WHERE name IN ('kv.closed_timestamp.target_duration', 'kv.protectedts.reconciliation.interval')`)

t.Run("secondary-tenant", func(t *testing.T) {
ten10, conn10 := serverutils.StartTenant(t, s0, base.TestTenantArgs{TenantID: roachpb.MustMakeTenantID(10)})
defer conn10.Close()
Expand Down Expand Up @@ -287,21 +281,14 @@ func TestSchedulesProtectedTimestamp(t *testing.T) {
ctx := context.Background()
s0, db, _ := serverutils.StartServer(t, base.TestServerArgs{
DefaultTestTenant: base.TestControlsTenantsExplicitly,
FastRangefeeds: true,
})
defer s0.Stopper().Stop(ctx)

// Now I want to create some artifacts that should get reconciled away and
// then make sure that they do and others which should not do not.
hostRunner := sqlutils.MakeSQLRunner(db)

hostRunner.Exec(t, "SET CLUSTER SETTING kv.closed_timestamp.target_duration = '100ms'")
hostRunner.Exec(t, "SET CLUSTER SETTING kv.protectedts.reconciliation.interval = '1ms';")
// Also set what tenants see for these settings.
// TODO(radu): use ALTER TENANT statement when that is available.
hostRunner.Exec(t, `INSERT INTO system.tenant_settings (tenant_id, name, value, value_type)
SELECT 0, name, value, "valueType" FROM system.settings
WHERE name IN ('kv.closed_timestamp.target_duration', 'kv.protectedts.reconciliation.interval')`)

t.Run("secondary-tenant", func(t *testing.T) {
ten10, conn10 := serverutils.StartTenant(t, s0, base.TestTenantArgs{TenantID: roachpb.MustMakeTenantID(10)})
defer conn10.Close()
Expand Down
13 changes: 6 additions & 7 deletions pkg/ccl/kvccl/kvfollowerreadsccl/followerreads_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -738,6 +738,9 @@ func TestFollowerReadsWithStaleDescriptor(t *testing.T) {
},
},
},
// Speed up closing of timestamps, in order to sleep less
// below before we can use follower_read_timestamp().
FastRangefeeds: true,
},
},
})
Expand All @@ -747,11 +750,6 @@ func TestFollowerReadsWithStaleDescriptor(t *testing.T) {
n1.Exec(t, `CREATE DATABASE t`)
n1.Exec(t, `CREATE TABLE test (k INT PRIMARY KEY)`)
n1.Exec(t, `ALTER TABLE test EXPERIMENTAL_RELOCATE VOTERS VALUES (ARRAY[1,2], 1)`)
// Speed up closing of timestamps, in order to sleep less below before we can
// use follower_read_timestamp(). follower_read_timestamp() uses the sum of
// the following settings.
n1.Exec(t, `SET CLUSTER SETTING kv.closed_timestamp.target_duration = '0.1s'`)
n1.Exec(t, `SET CLUSTER SETTING kv.closed_timestamp.side_transport_interval = '0.1s'`)
n1.Exec(t, `SET CLUSTER SETTING kv.closed_timestamp.propagation_slack = '0.1s'`)

// Sleep so that we can perform follower reads. The read timestamp needs to be
Expand Down Expand Up @@ -878,6 +876,7 @@ func TestFollowerReadsWithStaleDescriptor(t *testing.T) {
func TestSecondaryTenantFollowerReadsRouting(t *testing.T) {
defer leaktest.AfterTest(t)()
defer utilccl.TestingEnableEnterprise()()
defer log.Scope(t).Close(t)

skip.UnderStressRace(t, "times out")

Expand All @@ -900,6 +899,8 @@ func TestSecondaryTenantFollowerReadsRouting(t *testing.T) {
localities[i] = locality
serverArgs[i] = base.TestServerArgs{
Locality: localities[i],
// Make test faster.
FastRangefeeds: true,
}
}
tc := testcluster.StartTestCluster(t, numNodes, base.TestClusterArgs{
Expand Down Expand Up @@ -961,8 +962,6 @@ func TestSecondaryTenantFollowerReadsRouting(t *testing.T) {
// use follower_read_timestamp(). Note that we need to override the setting
// for the tenant as well, because the builtin is run in the tenant's sql pod.
systemSQL := sqlutils.MakeSQLRunner(tc.Conns[0])
systemSQL.Exec(t, `SET CLUSTER SETTING kv.closed_timestamp.target_duration = '0.1s'`)
systemSQL.Exec(t, `SET CLUSTER SETTING kv.closed_timestamp.side_transport_interval = '0.1s'`)
systemSQL.Exec(t, `SET CLUSTER SETTING kv.closed_timestamp.propagation_slack = '0.1s'`)
// We're making assertions on traces collected by the tenant using log lines
// in KV so we must ensure they're not redacted.
Expand Down
8 changes: 4 additions & 4 deletions pkg/ccl/multiregionccl/cold_start_latency_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,9 @@ func TestColdStartLatency(t *testing.T) {
},
}
args.Knobs.Server = serverKnobs
// Make span configs propagate more rapidly.
args.FastRangefeeds = true

perServerArgs[i] = args
}
tc := testcluster.NewTestCluster(t, numNodes, base.TestClusterArgs{
Expand Down Expand Up @@ -145,10 +148,7 @@ func TestColdStartLatency(t *testing.T) {
}
tdb := sqlutils.MakeSQLRunner(tc.ServerConn(1))

// Shorten the closed timestamp target duration so that span configs
// propagate more rapidly.
tdb.Exec(t, `SET CLUSTER SETTING kv.closed_timestamp.target_duration = '200ms'`)
tdb.Exec(t, `SET CLUSTER SETTING kv.rangefeed.closed_timestamp_refresh_interval = '200ms'`)
// Make the allocator faster.
tdb.Exec(t, "SET CLUSTER SETTING kv.allocator.load_based_rebalancing = off")
tdb.Exec(t, "SET CLUSTER SETTING kv.allocator.min_lease_transfer_interval = '10ms'")
// Lengthen the lead time for the global tables to prevent overload from
Expand Down
18 changes: 10 additions & 8 deletions pkg/ccl/multiregionccl/datadriven_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,9 @@ func TestMultiRegionDataDriven(t *testing.T) {
},
},
},
// Speed up closing of timestamps, in order to sleep less below before
// we can use follower_read_timestamp().
FastRangefeeds: true,
}
}
numServers := len(localityNames)
Expand All @@ -189,15 +192,14 @@ func TestMultiRegionDataDriven(t *testing.T) {
if err != nil {
return err.Error()
}
// Speed up closing of timestamps, in order to sleep less below before
// we can use follower_read_timestamp(). follower_read_timestamp() uses
// sum of the following settings. Also, disable all kvserver lease
// transfers other than those required to satisfy a lease preference.
// This prevents the lease shifting around too quickly, which leads to
// concurrent replication changes being proposed by prior leaseholders.
// The first setting accelerates follower_read_timestamp().
//
// Also disable all kvserver lease transfers other than those
// required to satisfy a lease preference. This prevents the
// lease shifting around too quickly, which leads to
// concurrent replication changes being proposed by prior
// leaseholders.
for _, stmt := range strings.Split(`
SET CLUSTER SETTING kv.closed_timestamp.target_duration = '0.4s';
SET CLUSTER SETTING kv.closed_timestamp.side_transport_interval = '0.1s';
SET CLUSTER SETTING kv.closed_timestamp.propagation_slack = '0.5s';
SET CLUSTER SETTING kv.allocator.load_based_rebalancing = 'off';
SET CLUSTER SETTING kv.allocator.load_based_lease_rebalancing.enabled = false;
Expand Down
3 changes: 0 additions & 3 deletions pkg/ccl/serverccl/statusccl/tenant_status_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,9 +69,6 @@ func TestTenantStatusAPI(t *testing.T) {
// Speed up propagation of tenant capability changes.
db := testHelper.HostCluster().ServerConn(0)
tdb := sqlutils.MakeSQLRunner(db)
tdb.Exec(t, "SET CLUSTER SETTING kv.closed_timestamp.target_duration = '10ms'")
tdb.Exec(t, "SET CLUSTER SETTING kv.closed_timestamp.side_transport_interval = '10 ms'")
tdb.Exec(t, "SET CLUSTER SETTING kv.rangefeed.closed_timestamp_refresh_interval = '10 ms'")

t.Run("reset_sql_stats", func(t *testing.T) {
skip.UnderDeadlockWithIssue(t, 99559)
Expand Down
3 changes: 3 additions & 0 deletions pkg/ccl/serverccl/tenant_test_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,9 @@ func NewTestTenantHelper(
ServerArgs: base.TestServerArgs{
Knobs: knobs,
DefaultTestTenant: base.TestControlsTenantsExplicitly,

// Make tests faster.
FastRangefeeds: true,
},
})
server := testCluster.Server(0)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,10 +77,13 @@ func TestDataDriven(t *testing.T) {
Knobs: base.TestingKnobs{
SpanConfig: scKnobs,
},
// Make tests faster overall.
FastRangefeeds: true,
},
})
defer tc.Stopper().Stop(ctx)
{
// Lower the closed ts target duration even further than what FastRangefeeds does.
sysDB := sqlutils.MakeSQLRunner(tc.SystemLayer(0).SQLConn(t, ""))
sysDB.Exec(t, `SET CLUSTER SETTING kv.closed_timestamp.target_duration = '20ms'`)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,14 +107,15 @@ func TestDataDriven(t *testing.T) {
JobsTestingKnobs: jobs.NewTestingKnobsWithShortIntervals(), // speeds up test
SpanConfig: scKnobs,
},
// Make tests faster.
FastRangefeeds: true,
},
})
defer tc.Stopper().Stop(ctx)

{
tdb := sqlutils.MakeSQLRunner(tc.ServerConn(0))
tdb.Exec(t, `SET CLUSTER SETTING kv.rangefeed.enabled = true`)
tdb.Exec(t, `SET CLUSTER SETTING kv.closed_timestamp.target_duration = '100ms'`)
}

spanConfigTestCluster := spanconfigtestcluster.NewHandle(t, tc, scKnobs)
Expand Down
Loading