diff --git a/pkg/ccl/streamingccl/replicationtestutils/testutils.go b/pkg/ccl/streamingccl/replicationtestutils/testutils.go
index f86326137e0d..b1633265a1c5 100644
--- a/pkg/ccl/streamingccl/replicationtestutils/testutils.go
+++ b/pkg/ccl/streamingccl/replicationtestutils/testutils.go
@@ -166,11 +166,11 @@ func (c *TenantStreamingClusters) init(ctx context.Context) {
 // will not yet be active. If the caller passes withTestingKnobs, the
 // destination tenant starts up via a testServer.StartSharedProcessTenant().
 func (c *TenantStreamingClusters) StartDestTenant(
-	ctx context.Context, withTestingKnobs *base.TestingKnobs,
+	ctx context.Context, withTestingKnobs *base.TestingKnobs, server int,
 ) func() error {
 	if withTestingKnobs != nil {
 		var err error
-		_, c.DestTenantConn, err = c.DestCluster.Server(0).StartSharedProcessTenant(ctx, base.TestSharedProcessTenantArgs{
+		_, c.DestTenantConn, err = c.DestCluster.Server(server).StartSharedProcessTenant(ctx, base.TestSharedProcessTenantArgs{
 			TenantID:   c.Args.DestTenantID,
 			TenantName: c.Args.DestTenantName,
 			Knobs:      *withTestingKnobs,
@@ -179,7 +179,7 @@ func (c *TenantStreamingClusters) StartDestTenant(
 		require.NoError(c.T, err)
 	} else {
 		c.DestSysSQL.Exec(c.T, `ALTER TENANT $1 START SERVICE SHARED`, c.Args.DestTenantName)
-		c.DestTenantConn = c.DestCluster.Server(0).SystemLayer().SQLConn(c.T, serverutils.DBName("cluster:"+string(c.Args.DestTenantName)+"/defaultdb"))
+		c.DestTenantConn = c.DestCluster.Server(server).SystemLayer().SQLConn(c.T, serverutils.DBName("cluster:"+string(c.Args.DestTenantName)+"/defaultdb"))
 	}
 
 	c.DestTenantSQL = sqlutils.MakeSQLRunner(c.DestTenantConn)
@@ -189,7 +189,7 @@ func (c *TenantStreamingClusters) StartDestTenant(
 	// TODO (msbutler): consider granting the new tenant some capabilities.
 	c.DestSysSQL.Exec(c.T, `ALTER TENANT $1 SET CLUSTER SETTING sql.virtual_cluster.feature_access.zone_configs.enabled=true`, c.Args.DestTenantName)
 	c.DestSysSQL.Exec(c.T, `ALTER TENANT $1 GRANT CAPABILITY can_use_nodelocal_storage`, c.Args.DestTenantName)
-	require.NoError(c.T, c.DestCluster.Server(0).WaitForTenantCapabilities(ctx, c.Args.DestTenantID, map[tenantcapabilities.ID]string{
+	require.NoError(c.T, c.DestCluster.Server(server).WaitForTenantCapabilities(ctx, c.Args.DestTenantID, map[tenantcapabilities.ID]string{
 		tenantcapabilities.CanUseNodelocalStorage: "true",
 	}, ""))
 	return func() error {
diff --git a/pkg/ccl/streamingccl/streamingest/alter_replication_job_test.go b/pkg/ccl/streamingccl/streamingest/alter_replication_job_test.go
index c55a7c41e5ff..92ff7541f777 100644
--- a/pkg/ccl/streamingccl/streamingest/alter_replication_job_test.go
+++ b/pkg/ccl/streamingccl/streamingest/alter_replication_job_test.go
@@ -114,7 +114,7 @@ func TestAlterTenantPauseResume(t *testing.T) {
 	cutoverOutput := replicationtestutils.DecimalTimeToHLC(t, cutoverStr)
 	require.Equal(t, cutoverTime, cutoverOutput.GoTime())
 	jobutils.WaitForJobToSucceed(c.T, c.DestSysSQL, jobspb.JobID(ingestionJobID))
-	cleanupTenant := c.StartDestTenant(ctx, nil)
+	cleanupTenant := c.StartDestTenant(ctx, nil, 0)
 	defer func() {
 		require.NoError(t, cleanupTenant())
 	}()
diff --git a/pkg/ccl/streamingccl/streamingest/datadriven_test.go b/pkg/ccl/streamingccl/streamingest/datadriven_test.go
index 061f2f63c419..a0d85b723f0d 100644
--- a/pkg/ccl/streamingccl/streamingest/datadriven_test.go
+++ b/pkg/ccl/streamingccl/streamingest/datadriven_test.go
@@ -155,7 +155,7 @@ func TestDataDriven(t *testing.T) {
 				jobspb.JobID(ds.ingestionJobID))
 		case "start-replicated-tenant":
 			testingKnobs := replicationtestutils.DefaultAppTenantTestingKnobs()
-			cleanupTenant := ds.replicationClusters.StartDestTenant(ctx, &testingKnobs)
+			cleanupTenant := ds.replicationClusters.StartDestTenant(ctx, &testingKnobs, 0)
 			ds.cleanupFns = append(ds.cleanupFns, cleanupTenant)
 		case "let":
 			if len(d.CmdArgs) == 0 {
diff --git a/pkg/ccl/streamingccl/streamingest/replication_stream_e2e_test.go b/pkg/ccl/streamingccl/streamingest/replication_stream_e2e_test.go
index e0593d333812..6b875c10bc09 100644
--- a/pkg/ccl/streamingccl/streamingest/replication_stream_e2e_test.go
+++ b/pkg/ccl/streamingccl/streamingest/replication_stream_e2e_test.go
@@ -485,17 +485,16 @@ func TestTenantStreamingUnavailableStreamAddress(t *testing.T) {
 
 	skip.UnderDeadlock(t, "multi-node may time out under deadlock")
 	skip.UnderRace(t, "takes too long with multiple nodes")
+	skip.UnderStress(t, "multi node test times out under stress")
 
 	ctx := context.Background()
 	args := replicationtestutils.DefaultTenantStreamingClustersArgs
+	args.MultitenantSingleClusterNumNodes = 4
-	args.SrcNumNodes = 4
-	args.DestNumNodes = 3
-
-	c, cleanup := replicationtestutils.CreateTenantStreamingClusters(ctx, t, args)
+	c, cleanup := replicationtestutils.CreateMultiTenantStreamingCluster(ctx, t, args)
 	defer cleanup()
 
-	replicationtestutils.CreateScatteredTable(t, c, 3)
+	replicationtestutils.CreateScatteredTable(t, c, 4)
 	srcScatteredData := c.SrcTenantSQL.QueryStr(c.T, "SELECT * FROM d.scattered ORDER BY key")
 
 	producerJobID, ingestionJobID := c.StartStreamReplication(ctx)
@@ -526,6 +525,9 @@ func TestTenantStreamingUnavailableStreamAddress(t *testing.T) {
 	c.SrcTenantServer.AppStopper().Stop(ctx)
 	c.SrcCluster.StopServer(0)
 
+	// Switch the SQL connection to a new node, as node 0 has shut down -- recall
+	// that the source and destination tenants are on the same cluster.
+	c.DestSysSQL = sqlutils.MakeSQLRunner(c.DestCluster.Conns[1])
 	c.DestSysSQL.Exec(t, `RESUME JOB $1`, ingestionJobID)
 	jobutils.WaitForJobToRun(t, c.DestSysSQL, jobspb.JobID(ingestionJobID))
 
@@ -537,7 +539,7 @@ func TestTenantStreamingUnavailableStreamAddress(t *testing.T) {
 	require.Equal(c.T, cutoverTime, cutoverOutput.GoTime())
 	jobutils.WaitForJobToSucceed(c.T, c.DestSysSQL, jobspb.JobID(ingestionJobID))
 
-	cleanUpTenant := c.StartDestTenant(ctx, nil)
+	cleanUpTenant := c.StartDestTenant(ctx, nil, 1)
 	defer func() {
 		require.NoError(t, cleanUpTenant())
 	}()
@@ -655,12 +657,11 @@ func TestTenantStreamingMultipleNodes(t *testing.T) {
 	defer log.Scope(t).Close(t)
 
 	skip.UnderDeadlock(t, "multi-node may time out under deadlock")
-	skip.UnderRace(t, "takes too long with multiple nodes")
+	skip.UnderRace(t, "multi-node test may time out under race")
 
 	ctx := context.Background()
 	args := replicationtestutils.DefaultTenantStreamingClustersArgs
-	args.SrcNumNodes = 4
-	args.DestNumNodes = 3
+	args.MultitenantSingleClusterNumNodes = 3
 
 	// Track the number of unique addresses that were connected to
 	clientAddresses := make(map[string]struct{})
@@ -673,7 +674,7 @@ func TestTenantStreamingMultipleNodes(t *testing.T) {
 		},
 	}
 
-	c, cleanup := replicationtestutils.CreateTenantStreamingClusters(ctx, t, args)
+	c, cleanup := replicationtestutils.CreateMultiTenantStreamingCluster(ctx, t, args)
 	defer cleanup()
 
 	// Make sure we have data on all nodes, so that we will have multiple
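
For context, a minimal sketch (not part of the patch) of how a caller might use the updated helpers: CreateMultiTenantStreamingCluster stands up a single multi-node cluster hosting both tenants, and StartDestTenant now takes the index of the server whose SQL connection serves the destination tenant. The test name, node count, comment about cutover, and the package/import scaffolding below are illustrative assumptions; only the helper names and signatures come from the diff above.

package streamingest

import (
	"context"
	"testing"

	"github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/replicationtestutils"
	"github.com/cockroachdb/cockroach/pkg/util/leaktest"
	"github.com/cockroachdb/cockroach/pkg/util/log"
	"github.com/stretchr/testify/require"
)

// TestStartDestTenantOnNonZeroServer is a hypothetical example, not part of
// this patch. It shows the updated StartDestTenant signature: the final
// argument selects which server in the shared cluster serves the destination
// tenant's SQL connection.
func TestStartDestTenantOnNonZeroServer(t *testing.T) {
	defer leaktest.AfterTest(t)()
	defer log.Scope(t).Close(t)

	ctx := context.Background()
	args := replicationtestutils.DefaultTenantStreamingClustersArgs
	// Source and destination tenants share one 3-node cluster instead of two
	// separate clusters (formerly SrcNumNodes/DestNumNodes).
	args.MultitenantSingleClusterNumNodes = 3

	c, cleanup := replicationtestutils.CreateMultiTenantStreamingCluster(ctx, t, args)
	defer cleanup()

	producerJobID, ingestionJobID := c.StartStreamReplication(ctx)
	_, _ = producerJobID, ingestionJobID

	// ... replicate data and cut over, as in the tests modified above ...

	// Start SQL service for the destination tenant via server 1 instead of the
	// previously hard-coded server 0.
	cleanupTenant := c.StartDestTenant(ctx, nil /* withTestingKnobs */, 1)
	defer func() { require.NoError(t, cleanupTenant()) }()
}

The explicit server index matters for tests like TestTenantStreamingUnavailableStreamAddress above, where node 0 is stopped mid-test and the destination tenant must be started from a surviving node.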