From 999412243cddb95cc0a9571dd98c87da61edfd01 Mon Sep 17 00:00:00 2001
From: Shiranka Miskin
Date: Mon, 29 Aug 2022 14:45:30 +0000
Subject: [PATCH] streamingccl: reduce server count in multinode tests

While these tests would pass under stress locally, they would fail under
CI stress, which may be because we were starting more server processes
than ever before: 4 source nodes, 4 source tenant pods, and 4
destination nodes. This PR reduces the node count to 3 (any lower and
scatter doesn't correctly distribute ranges) and starts only a single
tenant pod for the source cluster.

Release justification: test-only change

Release note: None
---
 .../stream_replication_e2e_test.go           | 39 ++++++-------------
 1 file changed, 12 insertions(+), 27 deletions(-)

diff --git a/pkg/ccl/streamingccl/streamingest/stream_replication_e2e_test.go b/pkg/ccl/streamingccl/streamingest/stream_replication_e2e_test.go
index d48c16bd03dc..eef01b619f89 100644
--- a/pkg/ccl/streamingccl/streamingest/stream_replication_e2e_test.go
+++ b/pkg/ccl/streamingccl/streamingest/stream_replication_e2e_test.go
@@ -214,8 +214,6 @@ func createTenantStreamingClusters(
 	// Start the source cluster.
 	srcCluster, srcURL, srcCleanup := startTestCluster(ctx, t, serverArgs, args.srcNumNodes)
 
-	// Start the src cluster tenant with tenant pods on every node in the cluster,
-	// ensuring they're all active beofre proceeding.
 	tenantArgs := base.TestTenantArgs{
 		TenantID: args.srcTenantID,
 		TestingKnobs: base.TestingKnobs{
@@ -223,17 +221,8 @@
 			AllowSplitAndScatter: true,
 		}},
 	}
-	tenantConns := make([]*gosql.DB, 0)
 	srcTenantServer, srcTenantConn := serverutils.StartTenant(t, srcCluster.Server(0), tenantArgs)
-	tenantConns = append(tenantConns, srcTenantConn)
-	for i := 1; i < args.srcNumNodes; i++ {
-		tenantPodArgs := tenantArgs
-		tenantPodArgs.DisableCreateTenant = true
-		tenantPodArgs.SkipTenantCheck = true
-		_, srcTenantPodConn := serverutils.StartTenant(t, srcCluster.Server(i), tenantPodArgs)
-		tenantConns = append(tenantConns, srcTenantPodConn)
-	}
-	waitForTenantPodsActive(t, srcTenantServer, args.srcNumNodes)
+	waitForTenantPodsActive(t, srcTenantServer, 1)
 
 	// Start the destination cluster.
 	destCluster, _, destCleanup := startTestCluster(ctx, t, serverArgs, args.destNumNodes)
@@ -265,11 +254,7 @@
 	// Enable stream replication on dest by default.
 	tsc.destSysSQL.Exec(t, `SET enable_experimental_stream_replication = true;`)
 	return tsc, func() {
-		for _, tenantConn := range tenantConns {
-			if tenantConn != nil {
-				require.NoError(t, tenantConn.Close())
-			}
-		}
+		require.NoError(t, srcTenantConn.Close())
 		destCleanup()
 		srcCleanup()
 	}
@@ -279,7 +264,7 @@ func (c *tenantStreamingClusters) srcExec(exec srcInitExecFunc) {
 	exec(c.t, c.srcSysSQL, c.srcTenantSQL)
 }
 
-func createScatteredTable(t *testing.T, c *tenantStreamingClusters) {
+func createScatteredTable(t *testing.T, c *tenantStreamingClusters, numNodes int) {
 	// Create a source table with multiple ranges spread across multiple nodes
 	numRanges := 50
 	rowsPerRange := 20
@@ -289,7 +274,7 @@
 ALTER TABLE d.scattered SPLIT AT (SELECT * FROM generate_series(%d, %d, %d));
 ALTER TABLE d.scattered SCATTER;
 `, numRanges*rowsPerRange, rowsPerRange, (numRanges-1)*rowsPerRange, rowsPerRange))
-	c.srcSysSQL.CheckQueryResultsRetry(t, "SELECT count(distinct lease_holder) from crdb_internal.ranges", [][]string{{"4"}})
+	c.srcSysSQL.CheckQueryResultsRetry(t, "SELECT count(distinct lease_holder) from crdb_internal.ranges", [][]string{{fmt.Sprint(numNodes)}})
 }
 
 var defaultSrcClusterSetting = map[string]string{
@@ -743,16 +728,17 @@ func TestTenantStreamingUnavailableStreamAddress(t *testing.T) {
 	defer log.Scope(t).Close(t)
 
 	skip.UnderRace(t, "takes too long with multiple nodes")
-	skip.WithIssue(t, 86287)
 
 	ctx := context.Background()
 	args := defaultTenantStreamingClustersArgs
-	args.srcNumNodes = 4
-	args.destNumNodes = 4
+
+	args.srcNumNodes = 3
+	args.destNumNodes = 3
+
 	c, cleanup := createTenantStreamingClusters(ctx, t, args)
 	defer cleanup()
 
-	createScatteredTable(t, c)
+	createScatteredTable(t, c, 3)
 	srcScatteredData := c.srcTenantSQL.QueryStr(c.t, "SELECT * FROM d.scattered ORDER BY key")
 
 	producerJobID, ingestionJobID := c.startStreamReplication()
@@ -931,12 +917,11 @@ func TestTenantStreamingMultipleNodes(t *testing.T) {
 	defer log.Scope(t).Close(t)
 
 	skip.UnderRace(t, "takes too long with multiple nodes")
-	skip.WithIssue(t, 86206)
 
 	ctx := context.Background()
 	args := defaultTenantStreamingClustersArgs
-	args.srcNumNodes = 4
-	args.destNumNodes = 4
+	args.srcNumNodes = 3
+	args.destNumNodes = 3
 
 	// Track the number of unique addresses that were connected to
 	clientAddresses := make(map[string]struct{})
@@ -952,7 +937,7 @@ func TestTenantStreamingMultipleNodes(t *testing.T) {
 	c, cleanup := createTenantStreamingClusters(ctx, t, args)
 	defer cleanup()
 
-	createScatteredTable(t, c)
+	createScatteredTable(t, c, 3)
 
 	producerJobID, ingestionJobID := c.startStreamReplication()
 	jobutils.WaitForJobToRun(c.t, c.srcSysSQL, jobspb.JobID(producerJobID))
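
For context on the lease_holder assertion above: after the SPLIT/SCATTER
statements, the test expects every node to end up holding at least one range
lease, which is why 3 nodes is the floor for scatter to be observable. Below
is a minimal standalone sketch of that check, assuming a locally running
insecure cluster on the default port, the lib/pq driver, and a hypothetical
main wiring; the real test issues the same query through the test cluster's
SQL runner, with retries.

    // Standalone sketch of the check createScatteredTable relies on: after
    // SPLIT/SCATTER, count(distinct lease_holder) should equal the node
    // count. The connection URL and driver choice here are illustrative
    // assumptions, not part of the test harness.
    package main

    import (
    	"database/sql"
    	"fmt"
    	"log"

    	_ "github.com/lib/pq" // Postgres-wire driver; CockroachDB speaks pgwire.
    )

    func main() {
    	db, err := sql.Open("postgres",
    		"postgresql://root@localhost:26257/defaultdb?sslmode=disable")
    	if err != nil {
    		log.Fatal(err)
    	}
    	defer db.Close()

    	const numNodes = 3 // mirrors args.srcNumNodes in the patched tests

    	var leaseholders int
    	if err := db.QueryRow(
    		"SELECT count(DISTINCT lease_holder) FROM crdb_internal.ranges",
    	).Scan(&leaseholders); err != nil {
    		log.Fatal(err)
    	}

    	// The test retries this comparison (CheckQueryResultsRetry) because
    	// scatter is asynchronous; a one-shot check may observe fewer nodes.
    	fmt.Printf("ranges have leaseholders on %d of %d nodes\n",
    		leaseholders, numNodes)
    }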