Skip to content

Commit

Permalink
streamingest: unskip TestTenantStreamingUnavailableStreamAddress
Browse files Browse the repository at this point in the history
Changing a few things to get this test to pass under stress:
- use 50 ranges instead of 10, because there are already 50-ish system ranges,
  so if we write only 10 more ranges, those might not get distributed across
  all servers.
- avoid reading from the source cluster after stopping a node, because doing
  so is flaky; see #107499 for more info.

Epic: none
Fixes: #107023
Fixes: #106865

Release note: None
  • Loading branch information
lidorcarmel committed Jul 28, 2023
1 parent 167da65 commit ce27952
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 35 deletions.
7 changes: 5 additions & 2 deletions pkg/ccl/streamingccl/replicationtestutils/testutils.go
Original file line number Diff line number Diff line change
Expand Up @@ -420,8 +420,11 @@ func requireReplicatedTime(targetTime hlc.Timestamp, progress *jobspb.Progress)
}

func CreateScatteredTable(t *testing.T, c *TenantStreamingClusters, numNodes int) {
// Create a source table with multiple ranges spread across multiple nodes
numRanges := 10
// Create a source table with multiple ranges spread across multiple nodes. We
// need around 50 or more ranges because there are already over 50 system
// ranges, so if we write just a few ranges those might all be on a single
// server, which will cause the test to flake.
numRanges := 50
rowsPerRange := 20
c.SrcTenantSQL.Exec(t, "CREATE TABLE d.scattered (key INT PRIMARY KEY)")
c.SrcTenantSQL.Exec(t, "INSERT INTO d.scattered (key) SELECT * FROM generate_series(1, $1)",
Expand Down
48 changes: 15 additions & 33 deletions pkg/ccl/streamingccl/streamingest/replication_stream_e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (
"testing"
"time"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/replicationtestutils"
"github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/replicationutils"
_ "github.com/cockroachdb/cockroach/pkg/cloud/impl"
Expand Down Expand Up @@ -460,11 +459,12 @@ func TestTenantStreamingDropTenantCancelsStream(t *testing.T) {
})
}

// TestTenantStreamingUnavailableStreamAddress verifies that after a
// pause/resume (replan) we will not use a dead server as a source.
func TestTenantStreamingUnavailableStreamAddress(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

skip.WithIssue(t, 106865)
skip.UnderDeadlock(t, "multi-node may time out under deadlock")
skip.UnderRace(t, "takes too long with multiple nodes")

Expand Down Expand Up @@ -495,38 +495,23 @@ func TestTenantStreamingUnavailableStreamAddress(t *testing.T) {
streamAddresses := progress.GetStreamIngest().StreamAddresses
require.Greater(t, len(streamAddresses), 1)

destroyedAddress := c.SrcURL.String()
// Write something to the source cluster, note that the job is paused - and
// therefore not replicated for now.
c.SrcTenantSQL.Exec(t, "CREATE TABLE d.x (id INT PRIMARY KEY, n INT)")
c.SrcTenantSQL.Exec(t, `INSERT INTO d.x VALUES (3);`)

// Stop a server on the source cluster. Note that in this test we are trying
// to avoid using the source cluster after this point because if we do the
// test flakes, see #107499 for more info.
destroyedAddress := c.SrcURL.String()
require.NoError(t, c.SrcTenantConn.Close())
c.SrcTenantServer.Stopper().Stop(ctx)
c.SrcCluster.StopServer(0)

// Once SrcCluster.Server(0) is shut down queries must be ran against a different server
alternateSrcSysSQL := sqlutils.MakeSQLRunner(c.SrcCluster.ServerConn(1))
_, alternateSrcTenantConn := serverutils.StartTenant(t, c.SrcCluster.Server(1),
base.TestTenantArgs{
TenantID: c.Args.SrcTenantID,
TenantName: c.Args.SrcTenantName,
DisableCreateTenant: true,
})
defer alternateSrcTenantConn.Close()
alternateSrcTenantSQL := sqlutils.MakeSQLRunner(alternateSrcTenantConn)

alternateCompareResult := func(query string) {
sourceData := alternateSrcTenantSQL.QueryStr(c.T, query)
destData := c.DestTenantSQL.QueryStr(c.T, query)
require.Equal(c.T, sourceData, destData)
}

c.DestSysSQL.Exec(t, `RESUME JOB $1`, ingestionJobID)
jobutils.WaitForJobToRun(t, c.DestSysSQL, jobspb.JobID(ingestionJobID))

alternateSrcTenantSQL.Exec(t, "CREATE TABLE d.x (id INT PRIMARY KEY, n INT)")
alternateSrcTenantSQL.Exec(t, `INSERT INTO d.x VALUES (3);`)

var cutoverTime time.Time
alternateSrcSysSQL.QueryRow(t, "SELECT clock_timestamp()").Scan(&cutoverTime)

cutoverTime := c.SrcCluster.Server(1).Clock().Now().GoTime()
var cutoverStr string
c.DestSysSQL.QueryRow(c.T, `ALTER TENANT $1 COMPLETE REPLICATION TO SYSTEM TIME $2::string`,
c.Args.DestTenantName, cutoverTime).Scan(&cutoverStr)
Expand All @@ -539,18 +524,15 @@ func TestTenantStreamingUnavailableStreamAddress(t *testing.T) {
require.NoError(t, cleanUpTenant())
}()

// The destroyed address should have been removed from the topology
// The destroyed address should have been removed from the topology.
progress = jobutils.GetJobProgress(c.T, c.DestSysSQL, jobspb.JobID(ingestionJobID))
newStreamAddresses := progress.GetStreamIngest().StreamAddresses
require.Contains(t, streamAddresses, destroyedAddress)
require.NotContains(t, newStreamAddresses, destroyedAddress)

alternateCompareResult("SELECT * FROM d.t1")
alternateCompareResult("SELECT * FROM d.t2")
alternateCompareResult("SELECT * FROM d.x")

// We can't use alternateCompareResult because it'll try to contact the deceased
// n1 even if the lease holders for d.scattered have all moved to other nodes
// Verify the destination tenant is fully replicated.
destData := c.DestTenantSQL.QueryStr(c.T, "SELECT * FROM d.x")
require.Equal(c.T, [][]string{{"3", "NULL"}}, destData)
dstScatteredData := c.DestTenantSQL.QueryStr(c.T, "SELECT * FROM d.scattered ORDER BY key")
require.Equal(t, srcScatteredData, dstScatteredData)
}
Expand Down

0 comments on commit ce27952

Please sign in to comment.