diff --git a/pkg/ccl/streamingccl/streamingest/replication_stream_e2e_test.go b/pkg/ccl/streamingccl/streamingest/replication_stream_e2e_test.go
index 669ae9053a15..1a7e4bacf0ed 100644
--- a/pkg/ccl/streamingccl/streamingest/replication_stream_e2e_test.go
+++ b/pkg/ccl/streamingccl/streamingest/replication_stream_e2e_test.go
@@ -11,6 +11,8 @@ package streamingest
 import (
 	"context"
 	"fmt"
+	"github.com/cockroachdb/cockroach/pkg/security/username"
+	"net/url"
 	"sync/atomic"
 	"testing"
 	"time"
@@ -488,14 +490,12 @@ func TestTenantStreamingUnavailableStreamAddress(t *testing.T) {
 
 	ctx := context.Background()
 	args := replicationtestutils.DefaultTenantStreamingClustersArgs
-	args.SrcNumNodes = 4
-	args.DestNumNodes = 3
-
-	c, cleanup := replicationtestutils.CreateTenantStreamingClusters(ctx, t, args)
+	args.MultitenantSingleClusterNumNodes = 4
+	c, cleanup := replicationtestutils.CreateMultiTenantStreamingCluster(ctx, t, args)
 	defer cleanup()
 
-	replicationtestutils.CreateScatteredTable(t, c, 3)
+	replicationtestutils.CreateScatteredTable(t, c, 4)
 	srcScatteredData := c.SrcTenantSQL.QueryStr(c.T, "SELECT * FROM d.scattered ORDER BY key")
 
 	producerJobID, ingestionJobID := c.StartStreamReplication(ctx)
@@ -521,15 +521,17 @@ func TestTenantStreamingUnavailableStreamAddress(t *testing.T) {
 	// Stop a server on the source cluster. Note that in this test we are trying
 	// to avoid using the source cluster after this point because if we do the
 	// test flakes, see #107499 for more info.
-	destroyedAddress := c.SrcURL.String()
+	destroyedURL, cleanupSinkCert := sqlutils.PGUrl(t, c.SrcCluster.Server(1).SystemLayer().AdvSQLAddr(), t.Name(), url.User(username.RootUser))
+	defer cleanupSinkCert()
+	destroyedAddress := destroyedURL.String()
 
 	require.NoError(t, c.SrcTenantConn.Close())
 	c.SrcTenantServer.AppStopper().Stop(ctx)
-	c.SrcCluster.StopServer(0)
+	c.SrcCluster.StopServer(1)
 
 	c.DestSysSQL.Exec(t, `RESUME JOB $1`, ingestionJobID)
 	jobutils.WaitForJobToRun(t, c.DestSysSQL, jobspb.JobID(ingestionJobID))
 
-	cutoverTime := c.SrcCluster.Server(1).Clock().Now().GoTime()
+	cutoverTime := c.SrcCluster.Server(0).Clock().Now().GoTime()
 	var cutoverStr string
 	c.DestSysSQL.QueryRow(c.T, `ALTER TENANT $1 COMPLETE REPLICATION TO SYSTEM TIME $2::string`, c.Args.DestTenantName, cutoverTime).Scan(&cutoverStr)
@@ -655,12 +657,10 @@ func TestTenantStreamingMultipleNodes(t *testing.T) {
 	defer log.Scope(t).Close(t)
 
 	skip.UnderDeadlock(t, "multi-node may time out under deadlock")
-	skip.UnderRace(t, "takes too long with multiple nodes")
 
 	ctx := context.Background()
 	args := replicationtestutils.DefaultTenantStreamingClustersArgs
-	args.SrcNumNodes = 4
-	args.DestNumNodes = 3
+	args.MultitenantSingleClusterNumNodes = 3
 
 	// Track the number of unique addresses that were connected to
 	clientAddresses := make(map[string]struct{})
@@ -673,7 +673,7 @@ func TestTenantStreamingMultipleNodes(t *testing.T) {
 		},
 	}
 
-	c, cleanup := replicationtestutils.CreateTenantStreamingClusters(ctx, t, args)
+	c, cleanup := replicationtestutils.CreateMultiTenantStreamingCluster(ctx, t, args)
 	defer cleanup()
 
 	// Make sure we have data on all nodes, so that we will have multiple