Skip to content

Commit

Permalink
c2c: deflake a few e2e tests
Browse files Browse the repository at this point in the history
This patch changes the TestTenantStreamingMultipleNodes and
TestTenantStreamingUnavailableStreamAddress tests to run on a single host
cluster, instead of two, which should reduce CPU contention and flakes.

Informs cockroachdb#112748
Fixes cockroachdb#112783
Fixes cockroachdb#109185

Release note: none
  • Loading branch information
msbutler committed Oct 22, 2023
1 parent ec82020 commit 2e5fe5c
Showing 1 changed file with 12 additions and 12 deletions.
24 changes: 12 additions & 12 deletions pkg/ccl/streamingccl/streamingest/replication_stream_e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ package streamingest
import (
"context"
"fmt"
"github.com/cockroachdb/cockroach/pkg/security/username"
"net/url"
"sync/atomic"
"testing"
"time"
Expand Down Expand Up @@ -488,14 +490,12 @@ func TestTenantStreamingUnavailableStreamAddress(t *testing.T) {

ctx := context.Background()
args := replicationtestutils.DefaultTenantStreamingClustersArgs
args.MultitenantSingleClusterNumNodes = 4

args.SrcNumNodes = 4
args.DestNumNodes = 3

c, cleanup := replicationtestutils.CreateTenantStreamingClusters(ctx, t, args)
c, cleanup := replicationtestutils.CreateMultiTenantStreamingCluster(ctx, t, args)
defer cleanup()

replicationtestutils.CreateScatteredTable(t, c, 3)
replicationtestutils.CreateScatteredTable(t, c, 4)
srcScatteredData := c.SrcTenantSQL.QueryStr(c.T, "SELECT * FROM d.scattered ORDER BY key")

producerJobID, ingestionJobID := c.StartStreamReplication(ctx)
Expand All @@ -521,15 +521,17 @@ func TestTenantStreamingUnavailableStreamAddress(t *testing.T) {
// Stop a server on the source cluster. Note that in this test we are trying
// to avoid using the source cluster after this point because, if we do, the
// test flakes; see #107499 for more info.
destroyedAddress := c.SrcURL.String()
destroyedURL, cleanupSinkCert := sqlutils.PGUrl(t, c.SrcCluster.Server(1).SystemLayer().AdvSQLAddr(), t.Name(), url.User(username.RootUser))
defer cleanupSinkCert()
destroyedAddress := destroyedURL.String()
require.NoError(t, c.SrcTenantConn.Close())
c.SrcTenantServer.AppStopper().Stop(ctx)
c.SrcCluster.StopServer(0)
c.SrcCluster.StopServer(1)

c.DestSysSQL.Exec(t, `RESUME JOB $1`, ingestionJobID)
jobutils.WaitForJobToRun(t, c.DestSysSQL, jobspb.JobID(ingestionJobID))

cutoverTime := c.SrcCluster.Server(1).Clock().Now().GoTime()
cutoverTime := c.SrcCluster.Server(0).Clock().Now().GoTime()
var cutoverStr string
c.DestSysSQL.QueryRow(c.T, `ALTER TENANT $1 COMPLETE REPLICATION TO SYSTEM TIME $2::string`,
c.Args.DestTenantName, cutoverTime).Scan(&cutoverStr)
Expand Down Expand Up @@ -655,12 +657,10 @@ func TestTenantStreamingMultipleNodes(t *testing.T) {
defer log.Scope(t).Close(t)

skip.UnderDeadlock(t, "multi-node may time out under deadlock")
skip.UnderRace(t, "takes too long with multiple nodes")

ctx := context.Background()
args := replicationtestutils.DefaultTenantStreamingClustersArgs
args.SrcNumNodes = 4
args.DestNumNodes = 3
args.MultitenantSingleClusterNumNodes = 3

// Track the number of unique addresses that were connected to
clientAddresses := make(map[string]struct{})
Expand All @@ -673,7 +673,7 @@ func TestTenantStreamingMultipleNodes(t *testing.T) {
},
}

c, cleanup := replicationtestutils.CreateTenantStreamingClusters(ctx, t, args)
c, cleanup := replicationtestutils.CreateMultiTenantStreamingCluster(ctx, t, args)
defer cleanup()

// Make sure we have data on all nodes, so that we will have multiple
Expand Down

0 comments on commit 2e5fe5c

Please sign in to comment.