Skip to content

Commit

Permalink
c2c: deflake a few e2e tests
Browse files Browse the repository at this point in the history
This patch changes the TestTenantStreamingMultipleNodes and
TestTenantStreamingUnavailableStreamAddress tests to run on a single host
cluster instead of two, which should reduce CPU contention and flakes.

Informs #112748
Fixes #112783
Fixes #109185

Release note: none
  • Loading branch information
msbutler committed Oct 22, 2023
1 parent ec82020 commit 4e8d00c
Show file tree
Hide file tree
Showing 4 changed files with 17 additions and 16 deletions.
8 changes: 4 additions & 4 deletions pkg/ccl/streamingccl/replicationtestutils/testutils.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,11 +166,11 @@ func (c *TenantStreamingClusters) init(ctx context.Context) {
// will not yet be active. If the caller passes withTestingKnobs, the
// destination tenant starts up via a testServer.StartSharedProcessTenant().
func (c *TenantStreamingClusters) StartDestTenant(
ctx context.Context, withTestingKnobs *base.TestingKnobs,
ctx context.Context, withTestingKnobs *base.TestingKnobs, server int,
) func() error {
if withTestingKnobs != nil {
var err error
_, c.DestTenantConn, err = c.DestCluster.Server(0).StartSharedProcessTenant(ctx, base.TestSharedProcessTenantArgs{
_, c.DestTenantConn, err = c.DestCluster.Server(server).StartSharedProcessTenant(ctx, base.TestSharedProcessTenantArgs{
TenantID: c.Args.DestTenantID,
TenantName: c.Args.DestTenantName,
Knobs: *withTestingKnobs,
Expand All @@ -179,7 +179,7 @@ func (c *TenantStreamingClusters) StartDestTenant(
require.NoError(c.T, err)
} else {
c.DestSysSQL.Exec(c.T, `ALTER TENANT $1 START SERVICE SHARED`, c.Args.DestTenantName)
c.DestTenantConn = c.DestCluster.Server(0).SystemLayer().SQLConn(c.T, serverutils.DBName("cluster:"+string(c.Args.DestTenantName)+"/defaultdb"))
c.DestTenantConn = c.DestCluster.Server(server).SystemLayer().SQLConn(c.T, serverutils.DBName("cluster:"+string(c.Args.DestTenantName)+"/defaultdb"))
}

c.DestTenantSQL = sqlutils.MakeSQLRunner(c.DestTenantConn)
Expand All @@ -189,7 +189,7 @@ func (c *TenantStreamingClusters) StartDestTenant(
// TODO (msbutler): consider granting the new tenant some capabilities.
c.DestSysSQL.Exec(c.T, `ALTER TENANT $1 SET CLUSTER SETTING sql.virtual_cluster.feature_access.zone_configs.enabled=true`, c.Args.DestTenantName)
c.DestSysSQL.Exec(c.T, `ALTER TENANT $1 GRANT CAPABILITY can_use_nodelocal_storage`, c.Args.DestTenantName)
require.NoError(c.T, c.DestCluster.Server(0).WaitForTenantCapabilities(ctx, c.Args.DestTenantID, map[tenantcapabilities.ID]string{
require.NoError(c.T, c.DestCluster.Server(server).WaitForTenantCapabilities(ctx, c.Args.DestTenantID, map[tenantcapabilities.ID]string{
tenantcapabilities.CanUseNodelocalStorage: "true",
}, ""))
return func() error {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ func TestAlterTenantPauseResume(t *testing.T) {
cutoverOutput := replicationtestutils.DecimalTimeToHLC(t, cutoverStr)
require.Equal(t, cutoverTime, cutoverOutput.GoTime())
jobutils.WaitForJobToSucceed(c.T, c.DestSysSQL, jobspb.JobID(ingestionJobID))
cleanupTenant := c.StartDestTenant(ctx, nil)
cleanupTenant := c.StartDestTenant(ctx, nil, 0)
defer func() {
require.NoError(t, cleanupTenant())
}()
Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/streamingccl/streamingest/datadriven_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ func TestDataDriven(t *testing.T) {
jobspb.JobID(ds.ingestionJobID))
case "start-replicated-tenant":
testingKnobs := replicationtestutils.DefaultAppTenantTestingKnobs()
cleanupTenant := ds.replicationClusters.StartDestTenant(ctx, &testingKnobs)
cleanupTenant := ds.replicationClusters.StartDestTenant(ctx, &testingKnobs, 0)
ds.cleanupFns = append(ds.cleanupFns, cleanupTenant)
case "let":
if len(d.CmdArgs) == 0 {
Expand Down
21 changes: 11 additions & 10 deletions pkg/ccl/streamingccl/streamingest/replication_stream_e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -485,17 +485,16 @@ func TestTenantStreamingUnavailableStreamAddress(t *testing.T) {

skip.UnderDeadlock(t, "multi-node may time out under deadlock")
skip.UnderRace(t, "takes too long with multiple nodes")
skip.UnderStress(t, "multi node test times out under stress")

ctx := context.Background()
args := replicationtestutils.DefaultTenantStreamingClustersArgs
args.MultitenantSingleClusterNumNodes = 4

args.SrcNumNodes = 4
args.DestNumNodes = 3

c, cleanup := replicationtestutils.CreateTenantStreamingClusters(ctx, t, args)
c, cleanup := replicationtestutils.CreateMultiTenantStreamingCluster(ctx, t, args)
defer cleanup()

replicationtestutils.CreateScatteredTable(t, c, 3)
replicationtestutils.CreateScatteredTable(t, c, 4)
srcScatteredData := c.SrcTenantSQL.QueryStr(c.T, "SELECT * FROM d.scattered ORDER BY key")

producerJobID, ingestionJobID := c.StartStreamReplication(ctx)
Expand Down Expand Up @@ -526,6 +525,9 @@ func TestTenantStreamingUnavailableStreamAddress(t *testing.T) {
c.SrcTenantServer.AppStopper().Stop(ctx)
c.SrcCluster.StopServer(0)

// Switch the SQL connection to a new node, as node 0 has shut down. Recall
// that the source and destination tenants are on the same cluster.
c.DestSysSQL = sqlutils.MakeSQLRunner(c.DestCluster.Conns[1])
c.DestSysSQL.Exec(t, `RESUME JOB $1`, ingestionJobID)
jobutils.WaitForJobToRun(t, c.DestSysSQL, jobspb.JobID(ingestionJobID))

Expand All @@ -537,7 +539,7 @@ func TestTenantStreamingUnavailableStreamAddress(t *testing.T) {
require.Equal(c.T, cutoverTime, cutoverOutput.GoTime())
jobutils.WaitForJobToSucceed(c.T, c.DestSysSQL, jobspb.JobID(ingestionJobID))

cleanUpTenant := c.StartDestTenant(ctx, nil)
cleanUpTenant := c.StartDestTenant(ctx, nil, 1)
defer func() {
require.NoError(t, cleanUpTenant())
}()
Expand Down Expand Up @@ -655,12 +657,11 @@ func TestTenantStreamingMultipleNodes(t *testing.T) {
defer log.Scope(t).Close(t)

skip.UnderDeadlock(t, "multi-node may time out under deadlock")
skip.UnderRace(t, "takes too long with multiple nodes")
skip.UnderRace(t, "multi-node test may time out under race")

ctx := context.Background()
args := replicationtestutils.DefaultTenantStreamingClustersArgs
args.SrcNumNodes = 4
args.DestNumNodes = 3
args.MultitenantSingleClusterNumNodes = 3

// Track the number of unique addresses that were connected to
clientAddresses := make(map[string]struct{})
Expand All @@ -673,7 +674,7 @@ func TestTenantStreamingMultipleNodes(t *testing.T) {
},
}

c, cleanup := replicationtestutils.CreateTenantStreamingClusters(ctx, t, args)
c, cleanup := replicationtestutils.CreateMultiTenantStreamingCluster(ctx, t, args)
defer cleanup()

// Make sure we have data on all nodes, so that we will have multiple
Expand Down

0 comments on commit 4e8d00c

Please sign in to comment.