diff --git a/pkg/ccl/backupccl/tenant_backup_nemesis_test.go b/pkg/ccl/backupccl/tenant_backup_nemesis_test.go
index 49f21856cf55..178728c4e443 100644
--- a/pkg/ccl/backupccl/tenant_backup_nemesis_test.go
+++ b/pkg/ccl/backupccl/tenant_backup_nemesis_test.go
@@ -143,7 +143,7 @@ func TestTenantBackupNemesis(t *testing.T) {
 	)
 	defer hostClusterCleanupFn()
 
-	tenant10, err := tc.Servers[0].StartTenant(ctx, base.TestTenantArgs{
+	tenant10, err := tc.Servers[0].TenantController().StartTenant(ctx, base.TestTenantArgs{
 		TenantID: roachpb.MustMakeTenantID(10),
 		TestingKnobs: base.TestingKnobs{
 			JobsTestingKnobs: jobs.NewTestingKnobsWithShortIntervals(),
@@ -244,7 +244,7 @@ func TestTenantBackupNemesis(t *testing.T) {
 	//
 	// We check bank.bank which has had the workload running against it
 	// and any table from a completed nemesis.
-	tenant11, err := tc.Servers[0].StartTenant(ctx, base.TestTenantArgs{
+	tenant11, err := tc.Servers[0].TenantController().StartTenant(ctx, base.TestTenantArgs{
 		TenantName:          "cluster-11",
 		DisableCreateTenant: true,
 	})
diff --git a/pkg/ccl/streamingccl/replicationtestutils/testutils.go b/pkg/ccl/streamingccl/replicationtestutils/testutils.go
index 1f228a5e7f47..506e653dda7d 100644
--- a/pkg/ccl/streamingccl/replicationtestutils/testutils.go
+++ b/pkg/ccl/streamingccl/replicationtestutils/testutils.go
@@ -104,7 +104,7 @@ type TenantStreamingClusters struct {
 	Args            TenantStreamingClustersArgs
 	SrcCluster      *testcluster.TestCluster
 	SrcTenantConn   *gosql.DB
-	SrcSysServer    serverutils.TestServerInterface
+	SrcSysServer    serverutils.ApplicationLayerInterface
 	SrcSysSQL       *sqlutils.SQLRunner
 	SrcTenantSQL    *sqlutils.SQLRunner
 	SrcTenantServer serverutils.ApplicationLayerInterface
@@ -112,7 +112,7 @@ type TenantStreamingClusters struct {
 	SrcCleanup      func()
 
 	DestCluster    *testcluster.TestCluster
-	DestSysServer  serverutils.TestServerInterface
+	DestSysServer  serverutils.ApplicationLayerInterface
 	DestSysSQL     *sqlutils.SQLRunner
 	DestTenantConn *gosql.DB
 	DestTenantSQL  *sqlutils.SQLRunner
@@ -330,9 +330,9 @@ func startC2CTestCluster(
 	}
 
 	c := testcluster.StartTestCluster(t, numNodes, params)
-	c.Server(0).Clock().Now()
+
 	// TODO(casper): support adding splits when we have multiple nodes.
-	pgURL, cleanupSinkCert := sqlutils.PGUrl(t, c.Server(0).AdvSQLAddr(), t.Name(), url.User(username.RootUser))
+	pgURL, cleanupSinkCert := sqlutils.PGUrl(t, c.Server(0).SystemLayer().AdvSQLAddr(), t.Name(), url.User(username.RootUser))
 	return c, pgURL, func() {
 		c.Stopper().Stop(ctx)
 		cleanupSinkCert()
 	}
@@ -355,12 +355,12 @@ func CreateMultiTenantStreamingCluster(
 		Args:          args,
 		SrcCluster:    cluster,
 		SrcSysSQL:     sqlutils.MakeSQLRunner(cluster.ServerConn(0)),
-		SrcSysServer:  cluster.Server(0),
+		SrcSysServer:  cluster.Server(0).SystemLayer(),
 		SrcURL:        url,
 		SrcCleanup:    cleanup,
 		DestCluster:   cluster,
 		DestSysSQL:    sqlutils.MakeSQLRunner(cluster.ServerConn(destNodeIdx)),
-		DestSysServer: cluster.Server(destNodeIdx),
+		DestSysServer: cluster.Server(destNodeIdx).SystemLayer(),
 		Rng:           rng,
 	}
 	tsc.setupSrcTenant()
@@ -403,12 +403,12 @@ func CreateTenantStreamingClusters(
 		Args:          args,
 		SrcCluster:    srcCluster,
 		SrcSysSQL:     sqlutils.MakeSQLRunner(srcCluster.ServerConn(0)),
-		SrcSysServer:  srcCluster.Server(0),
+		SrcSysServer:  srcCluster.Server(0).SystemLayer(),
 		SrcURL:        srcURL,
 		SrcCleanup:    srcCleanup,
 		DestCluster:   destCluster,
 		DestSysSQL:    sqlutils.MakeSQLRunner(destCluster.ServerConn(0)),
-		DestSysServer: destCluster.Server(0),
+		DestSysServer: destCluster.Server(0).SystemLayer(),
 		Rng:           rng,
 	}
 	tsc.setupSrcTenant()
diff --git a/pkg/ccl/streamingccl/streamingest/replication_stream_e2e_test.go b/pkg/ccl/streamingccl/streamingest/replication_stream_e2e_test.go
index b9257d56661f..3afefdc43c73 100644
--- a/pkg/ccl/streamingccl/streamingest/replication_stream_e2e_test.go
+++ b/pkg/ccl/streamingccl/streamingest/replication_stream_e2e_test.go
@@ -381,7 +381,7 @@ func TestTenantStreamingCancelIngestion(t *testing.T) {
 	jobutils.WaitForJobToFail(c.T, c.SrcSysSQL, jobspb.JobID(producerJobID))
 
 	// Check if the producer job has released protected timestamp.
-	requireReleasedProducerPTSRecord(t, ctx, c.SrcSysServer.ApplicationLayer(), jobspb.JobID(producerJobID))
+	requireReleasedProducerPTSRecord(t, ctx, c.SrcSysServer, jobspb.JobID(producerJobID))
 
 	// Check if dest tenant key ranges are not cleaned up.
 	destTenantSpan := keys.MakeTenantSpan(args.DestTenantID)
@@ -422,6 +422,8 @@ func TestTenantStreamingDropTenantCancelsStream(t *testing.T) {
 	defer leaktest.AfterTest(t)()
 	defer log.Scope(t).Close(t)
 
+	skip.UnderRace(t, "slow test") // takes >1mn under race
+
 	ctx := context.Background()
 	args := replicationtestutils.DefaultTenantStreamingClustersArgs
 
@@ -449,7 +451,7 @@ func TestTenantStreamingDropTenantCancelsStream(t *testing.T) {
 	jobutils.WaitForJobToFail(c.T, c.SrcSysSQL, jobspb.JobID(producerJobID))
 
 	// Check if the producer job has released protected timestamp.
-	requireReleasedProducerPTSRecord(t, ctx, c.SrcSysServer.ApplicationLayer(), jobspb.JobID(producerJobID))
+	requireReleasedProducerPTSRecord(t, ctx, c.SrcSysServer, jobspb.JobID(producerJobID))
 
 	// Wait for the GC job to finish
 	c.DestSysSQL.Exec(t, "SHOW JOBS WHEN COMPLETE SELECT job_id FROM [SHOW JOBS] WHERE job_type = 'SCHEMA CHANGE GC'")
@@ -788,6 +790,8 @@ func TestProtectedTimestampManagement(t *testing.T) {
 	defer leaktest.AfterTest(t)()
 	defer log.Scope(t).Close(t)
 
+	skip.UnderRace(t, "slow test") // takes >1mn under race.
+
 	ctx := context.Background()
 	args := replicationtestutils.DefaultTenantStreamingClustersArgs
 	// Override the replication job details ReplicationTTLSeconds to a small value
@@ -802,7 +806,7 @@ func TestProtectedTimestampManagement(t *testing.T) {
 	// greater or equal to the frontier we know we have replicated up until.
 	waitForProducerProtection := func(c *replicationtestutils.TenantStreamingClusters, frontier hlc.Timestamp, producerJobID int) {
 		testutils.SucceedsSoon(t, func() error {
-			srv := c.SrcSysServer.ApplicationLayer()
+			srv := c.SrcSysServer
 			job, err := srv.JobRegistry().(*jobs.Registry).LoadJob(ctx, jobspb.JobID(producerJobID))
 			if err != nil {
 				return err
 			}
@@ -830,7 +834,7 @@ func TestProtectedTimestampManagement(t *testing.T) {
 	// protecting the destination tenant.
 	checkNoDestinationProtection := func(c *replicationtestutils.TenantStreamingClusters, replicationJobID int) {
 		execCfg := c.DestSysServer.ExecutorConfig().(sql.ExecutorConfig)
-		require.NoError(t, c.DestCluster.Server(0).InternalDB().(isql.DB).Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
+		require.NoError(t, c.DestSysServer.InternalDB().(isql.DB).Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
 			j, err := execCfg.JobRegistry.LoadJobWithTxn(ctx, jobspb.JobID(replicationJobID), txn)
 			require.NoError(t, err)
 			payload := j.Payload()
@@ -844,7 +848,7 @@ func TestProtectedTimestampManagement(t *testing.T) {
 	checkDestinationProtection := func(c *replicationtestutils.TenantStreamingClusters, frontier hlc.Timestamp, replicationJobID int) {
 		execCfg := c.DestSysServer.ExecutorConfig().(sql.ExecutorConfig)
 		ptp := execCfg.ProtectedTimestampProvider
-		require.NoError(t, c.DestCluster.Server(0).InternalDB().(isql.DB).Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
+		require.NoError(t, c.DestSysServer.InternalDB().(isql.DB).Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
 			j, err := execCfg.JobRegistry.LoadJobWithTxn(ctx, jobspb.JobID(replicationJobID), txn)
 			if err != nil {
 				return err
 			}
@@ -886,7 +890,7 @@ func TestProtectedTimestampManagement(t *testing.T) {
 		// startup, and the first progress update (t2) is greater than 1s. This is
 		// important because if `frontier@t2 - ReplicationTTLSeconds < t1` then we
 		// will not update the PTS record.
-		now := c.SrcCluster.Server(0).Clock().Now().Add(int64(time.Second)*2, 0)
+		now := c.SrcCluster.Server(0).SystemLayer().Clock().Now().Add(int64(time.Second)*2, 0)
 		c.WaitUntilReplicatedTime(now, jobspb.JobID(replicationJobID))
 
 		// Check that the producer and replication job have written a protected
@@ -925,7 +929,7 @@ func TestProtectedTimestampManagement(t *testing.T) {
 		}
 
 		// Check if the producer job has released protected timestamp.
-		requireReleasedProducerPTSRecord(t, ctx, c.SrcSysServer.ApplicationLayer(), jobspb.JobID(producerJobID))
+		requireReleasedProducerPTSRecord(t, ctx, c.SrcSysServer, jobspb.JobID(producerJobID))
 
 		// Check if the replication job has released protected timestamp.
 		checkNoDestinationProtection(c, replicationJobID)
@@ -936,7 +940,7 @@ func TestProtectedTimestampManagement(t *testing.T) {
 
 		// Check if dest tenant key range is cleaned up.
 		destTenantSpan := keys.MakeTenantSpan(args.DestTenantID)
-		rows, err := c.DestCluster.Server(0).DB().
+		rows, err := c.DestSysServer.DB().
 			Scan(ctx, destTenantSpan.Key, destTenantSpan.EndKey, 10)
 		require.NoError(t, err)
 		require.Empty(t, rows)
diff --git a/pkg/cmd/github-pull-request-make/main.go b/pkg/cmd/github-pull-request-make/main.go
index 6f03df428816..f113b938fa29 100644
--- a/pkg/cmd/github-pull-request-make/main.go
+++ b/pkg/cmd/github-pull-request-make/main.go
@@ -276,7 +276,13 @@ func main() {
 		for name, pkg := range pkgs {
 			// 20 minutes total seems OK, but at least 2 minutes per test.
 			// This should be reduced. See #46941.
-			duration := (20 * time.Minute) / time.Duration(len(pkgs))
+			target, ok := os.LookupEnv(targetEnv)
+			var duration time.Duration
+			if ok && target == "stressrace" {
+				duration = (30 * time.Minute) / time.Duration(len(pkgs))
+			} else {
+				duration = (20 * time.Minute) / time.Duration(len(pkgs))
+			}
 			minDuration := (2 * time.Minute) * time.Duration(len(pkg.tests))
 			if duration < minDuration {
 				duration = minDuration
diff --git a/pkg/sql/logictest/logic.go b/pkg/sql/logictest/logic.go
index d30883fa3f4e..0525551cf47e 100644
--- a/pkg/sql/logictest/logic.go
+++ b/pkg/sql/logictest/logic.go
@@ -1555,7 +1555,7 @@ func (t *logicTest) newCluster(
 			opt.apply(&tenantArgs.TestingKnobs)
 		}
 
-		tenant, err := t.cluster.Server(i).StartTenant(context.Background(), tenantArgs)
+		tenant, err := t.cluster.Server(i).TenantController().StartTenant(context.Background(), tenantArgs)
 		if err != nil {
 			t.rootT.Fatalf("%+v", err)
 		}
diff --git a/pkg/testutils/serverutils/test_server_shim.go b/pkg/testutils/serverutils/test_server_shim.go
index acb4119ca460..0d0caa1bb227 100644
--- a/pkg/testutils/serverutils/test_server_shim.go
+++ b/pkg/testutils/serverutils/test_server_shim.go
@@ -317,7 +317,7 @@ func StartTenant(
 func StartSharedProcessTenant(
 	t TestFataler, ts TestServerInterface, params base.TestSharedProcessTenantArgs,
 ) (ApplicationLayerInterface, *gosql.DB) {
-	tenant, goDB, err := ts.StartSharedProcessTenant(context.Background(), params)
+	tenant, goDB, err := ts.TenantController().StartSharedProcessTenant(context.Background(), params)
 	if err != nil {
 		t.Fatalf("%+v", err)
 	}
diff --git a/pkg/testutils/testcluster/testcluster.go b/pkg/testutils/testcluster/testcluster.go
index cc57c60a63b0..6fa766d11d3a 100644
--- a/pkg/testutils/testcluster/testcluster.go
+++ b/pkg/testutils/testcluster/testcluster.go
@@ -1612,7 +1612,7 @@ func (tc *TestCluster) ReplicationMode() base.TestClusterReplicationMode {
 // ToggleReplicateQueues implements TestClusterInterface.
 func (tc *TestCluster) ToggleReplicateQueues(active bool) {
 	for _, s := range tc.Servers {
-		_ = s.GetStores().(*kvserver.Stores).VisitStores(func(store *kvserver.Store) error {
+		_ = s.StorageLayer().GetStores().(*kvserver.Stores).VisitStores(func(store *kvserver.Store) error {
 			store.SetReplicateQueueActive(active)
 			return nil
 		})