streamingccl: reduce server count in multinode tests
While these tests would pass under stress locally, they would fail under CI
stress, possibly because we were starting more server processes than ever
before: 4 source nodes, 4 source tenant pods, and 4 destination nodes.

This PR reduces the node count to 3 (any lower and scatter doesn't correctly
distribute ranges) and starts only a single tenant pod for the source
cluster.

Release justification: test-only change
Release note: None
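
To make the message's arithmetic concrete, a quick sketch of the per-test
process counts (destination tenant pods are not counted in the message, so
they are omitted here too):

    package main

    import "fmt"

    func main() {
    	// Server processes per test, as enumerated in the commit message.
    	before := 4 + 4 + 4 // 4 source nodes + 4 source tenant pods + 4 dest nodes
    	after := 3 + 1 + 3  // 3 source nodes + 1 source tenant pod + 3 dest nodes
    	fmt.Printf("%d -> %d server processes per test\n", before, after) // 12 -> 7
    }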
samiskin committed Aug 29, 2022
1 parent cb57def commit 9994122
Showing 1 changed file with 12 additions and 27 deletions.
pkg/ccl/streamingccl/streamingest/stream_replication_e2e_test.go (12 additions, 27 deletions)
@@ -214,26 +214,15 @@ func createTenantStreamingClusters(
 	// Start the source cluster.
 	srcCluster, srcURL, srcCleanup := startTestCluster(ctx, t, serverArgs, args.srcNumNodes)
 
-	// Start the src cluster tenant with tenant pods on every node in the cluster,
-	// ensuring they're all active beofre proceeding.
 	tenantArgs := base.TestTenantArgs{
 		TenantID: args.srcTenantID,
 		TestingKnobs: base.TestingKnobs{
 			TenantTestingKnobs: &sql.TenantTestingKnobs{
 				AllowSplitAndScatter: true,
 			}},
 	}
-	tenantConns := make([]*gosql.DB, 0)
 	srcTenantServer, srcTenantConn := serverutils.StartTenant(t, srcCluster.Server(0), tenantArgs)
-	tenantConns = append(tenantConns, srcTenantConn)
-	for i := 1; i < args.srcNumNodes; i++ {
-		tenantPodArgs := tenantArgs
-		tenantPodArgs.DisableCreateTenant = true
-		tenantPodArgs.SkipTenantCheck = true
-		_, srcTenantPodConn := serverutils.StartTenant(t, srcCluster.Server(i), tenantPodArgs)
-		tenantConns = append(tenantConns, srcTenantPodConn)
-	}
-	waitForTenantPodsActive(t, srcTenantServer, args.srcNumNodes)
+	waitForTenantPodsActive(t, srcTenantServer, 1)
 
 	// Start the destination cluster.
 	destCluster, _, destCleanup := startTestCluster(ctx, t, serverArgs, args.destNumNodes)
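
For readers outside the CockroachDB tree, here is a minimal, self-contained
sketch of the single-pod pattern the hunk above converges on. It assumes the
stock testcluster/serverutils harness and a made-up tenant ID; it is not an
excerpt from the test file.

    package streamingest_test

    import (
    	"context"
    	"testing"

    	"github.com/cockroachdb/cockroach/pkg/base"
    	"github.com/cockroachdb/cockroach/pkg/roachpb"
    	"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
    	"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
    	"github.com/stretchr/testify/require"
    )

    func TestSingleTenantPodSketch(t *testing.T) {
    	ctx := context.Background()
    	// 3 host nodes: per the commit message, the minimum at which scatter
    	// still distributes ranges correctly.
    	tc := testcluster.StartTestCluster(t, 3, base.TestClusterArgs{})
    	defer tc.Stopper().Stop(ctx)

    	// One tenant pod on node 0 replaces the old pod-per-node loop, so the
    	// cleanup path now has exactly one connection to close.
    	_, tenantDB := serverutils.StartTenant(t, tc.Server(0), base.TestTenantArgs{
    		TenantID: roachpb.MakeTenantID(10), // hypothetical tenant ID
    	})
    	defer func() { require.NoError(t, tenantDB.Close()) }()

    	var n int
    	require.NoError(t, tenantDB.QueryRow(`SELECT 1`).Scan(&n))
    	require.Equal(t, 1, n)
    }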
@@ -265,11 +254,7 @@ func createTenantStreamingClusters(
 	// Enable stream replication on dest by default.
 	tsc.destSysSQL.Exec(t, `SET enable_experimental_stream_replication = true;`)
 	return tsc, func() {
-		for _, tenantConn := range tenantConns {
-			if tenantConn != nil {
-				require.NoError(t, tenantConn.Close())
-			}
-		}
+		require.NoError(t, srcTenantConn.Close())
 		destCleanup()
 		srcCleanup()
 	}
@@ -279,7 +264,7 @@ func (c *tenantStreamingClusters) srcExec(exec srcInitExecFunc) {
 	exec(c.t, c.srcSysSQL, c.srcTenantSQL)
 }
 
-func createScatteredTable(t *testing.T, c *tenantStreamingClusters) {
+func createScatteredTable(t *testing.T, c *tenantStreamingClusters, numNodes int) {
 	// Create a source table with multiple ranges spread across multiple nodes
 	numRanges := 50
 	rowsPerRange := 20
@@ -289,7 +274,7 @@ func createScatteredTable(t *testing.T, c *tenantStreamingClusters) {
 	ALTER TABLE d.scattered SPLIT AT (SELECT * FROM generate_series(%d, %d, %d));
 	ALTER TABLE d.scattered SCATTER;
 	`, numRanges*rowsPerRange, rowsPerRange, (numRanges-1)*rowsPerRange, rowsPerRange))
-	c.srcSysSQL.CheckQueryResultsRetry(t, "SELECT count(distinct lease_holder) from crdb_internal.ranges", [][]string{{"4"}})
+	c.srcSysSQL.CheckQueryResultsRetry(t, "SELECT count(distinct lease_holder) from crdb_internal.ranges", [][]string{{fmt.Sprint(numNodes)}})
 }
 
 var defaultSrcClusterSetting = map[string]string{
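
With the constants above (numRanges = 50, rowsPerRange = 20), the visible
SPLIT AT line expands to generate_series(20, 980, 20): 49 split keys, hence
50 ranges of 20 rows each, and the retry loop now waits until every node
holds at least one lease. The first Sprintf argument (numRanges*rowsPerRange
= 1000) feeds a %d on a line hidden above this hunk, presumably the INSERT
that populates the table. A standalone sketch of the expansion:

    package main

    import "fmt"

    func main() {
    	numRanges, rowsPerRange := 50, 20
    	// Mirrors the trailing three Sprintf arguments in createScatteredTable;
    	// the first argument (1000) belongs to a line outside the visible hunk.
    	fmt.Printf("ALTER TABLE d.scattered SPLIT AT (SELECT * FROM generate_series(%d, %d, %d));\n",
    		rowsPerRange, (numRanges-1)*rowsPerRange, rowsPerRange) // 20, 980, 20
    }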
Expand Down Expand Up @@ -743,16 +728,17 @@ func TestTenantStreamingUnavailableStreamAddress(t *testing.T) {
defer log.Scope(t).Close(t)

skip.UnderRace(t, "takes too long with multiple nodes")
skip.WithIssue(t, 86287)

ctx := context.Background()
args := defaultTenantStreamingClustersArgs
args.srcNumNodes = 4
args.destNumNodes = 4

args.srcNumNodes = 3
args.destNumNodes = 3

c, cleanup := createTenantStreamingClusters(ctx, t, args)
defer cleanup()

createScatteredTable(t, c)
createScatteredTable(t, c, 3)
srcScatteredData := c.srcTenantSQL.QueryStr(c.t, "SELECT * FROM d.scattered ORDER BY key")

producerJobID, ingestionJobID := c.startStreamReplication()
Expand Down Expand Up @@ -931,12 +917,11 @@ func TestTenantStreamingMultipleNodes(t *testing.T) {
defer log.Scope(t).Close(t)

skip.UnderRace(t, "takes too long with multiple nodes")
skip.WithIssue(t, 86206)

ctx := context.Background()
args := defaultTenantStreamingClustersArgs
args.srcNumNodes = 4
args.destNumNodes = 4
args.srcNumNodes = 3
args.destNumNodes = 3

// Track the number of unique addresses that were connected to
clientAddresses := make(map[string]struct{})
@@ -952,7 +937,7 @@ func TestTenantStreamingMultipleNodes(t *testing.T) {
 	c, cleanup := createTenantStreamingClusters(ctx, t, args)
 	defer cleanup()
 
-	createScatteredTable(t, c)
+	createScatteredTable(t, c, 3)
 
 	producerJobID, ingestionJobID := c.startStreamReplication()
 	jobutils.WaitForJobToRun(c.t, c.srcSysSQL, jobspb.JobID(producerJobID))
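
Only the clientAddresses map is visible in the hunk above; the testing knob
that feeds it sits in the elided lines. As a sketch of the pattern (the hook
name below is hypothetical, not the actual knob), recording distinct client
addresses safely across goroutines looks like:

    package main

    import (
    	"fmt"
    	"sync"
    )

    func main() {
    	// Track the number of unique addresses that were connected to.
    	var mu sync.Mutex
    	clientAddresses := make(map[string]struct{})
    	onClientDial := func(addr string) { // hypothetical hook
    		mu.Lock()
    		defer mu.Unlock()
    		clientAddresses[addr] = struct{}{}
    	}

    	for _, addr := range []string{"n1:26257", "n2:26257", "n1:26257", "n3:26257"} {
    		onClientDial(addr)
    	}
    	// With 3 source nodes, the test expects connections to all 3 addresses.
    	fmt.Println(len(clientAddresses)) // 3
    }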